diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml
index f119d23..f55d660 100644
--- a/fizz-bootstrap/pom.xml
+++ b/fizz-bootstrap/pom.xml
@@ -12,7 +12,7 @@
com.fizzgate
fizz-bootstrap
- 2.2.4-SNAPSHOT
+ 2.2.4-beta1
1.8
@@ -32,6 +32,7 @@
4.0.1
3.5.9
1.15
+ 2.11.0
diff --git a/fizz-common/pom.xml b/fizz-common/pom.xml
index 2225981..492d019 100644
--- a/fizz-common/pom.xml
+++ b/fizz-common/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.2.4-SNAPSHOT
+ 2.2.4-beta1
../pom.xml
4.0.0
diff --git a/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java
index e98a051..a660e76 100644
--- a/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java
+++ b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java
@@ -20,6 +20,7 @@ package we.spring.http.server.reactive.ext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.PooledDataBuffer;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;
@@ -33,10 +34,18 @@ import java.nio.charset.StandardCharsets;
public class FizzServerHttpRequestDecorator extends ServerHttpRequestDecorator {
+ private HttpHeaders headers;
+
private Flux body = Flux.empty();
public FizzServerHttpRequestDecorator(ServerHttpRequest delegate) {
super(delegate);
+ headers = HttpHeaders.writableHttpHeaders(delegate.getHeaders());
+ }
+
+ @Override
+ public HttpHeaders getHeaders() {
+ return headers;
}
public void setBody(DataBuffer body) {
diff --git a/fizz-core/pom.xml b/fizz-core/pom.xml
index 298dd92..2b5cf48 100644
--- a/fizz-core/pom.xml
+++ b/fizz-core/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.2.4-SNAPSHOT
+ 2.2.4-beta1
../pom.xml
4.0.0
diff --git a/fizz-core/src/main/java/we/filter/AggregateFilter.java b/fizz-core/src/main/java/we/filter/AggregateFilter.java
index 5acdbc3..25f3a6a 100644
--- a/fizz-core/src/main/java/we/filter/AggregateFilter.java
+++ b/fizz-core/src/main/java/we/filter/AggregateFilter.java
@@ -23,10 +23,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
+import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
@@ -46,6 +48,7 @@ import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
import we.util.MapUtil;
+import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
import javax.annotation.Resource;
@@ -97,7 +100,7 @@ public class AggregateFilter implements WebFilter {
}
long start = System.currentTimeMillis();
- FizzServerHttpRequestDecorator request = (FizzServerHttpRequestDecorator) exchange.getRequest();
+ ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse serverHttpResponse = exchange.getResponse();
String clientReqPathPrefix = WebUtils.getClientReqPathPrefix(exchange);
@@ -162,12 +165,19 @@ public class AggregateFilter implements WebFilter {
});
} else {
if (HttpMethod.POST.name().equalsIgnoreCase(method)) {
- DataBuffer buf = request.getRawBody();
- if (buf != null) {
- clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
- }
+ result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER).flatMap(buf -> {
+ if (buf != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
+ try {
+ clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
+ } finally {
+ DataBufferUtils.release(buf);
+ }
+ }
+ return pipeline.run(input, clientInput, traceId);
+ });
+ } else {
+ result = pipeline.run(input, clientInput, traceId);
}
- result = pipeline.run(input, clientInput, traceId);
}
return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> {
LogService.setBizId(traceId);
diff --git a/fizz-core/src/main/java/we/filter/CallbackFilter.java b/fizz-core/src/main/java/we/filter/CallbackFilter.java
index 82d7ed4..40c733a 100644
--- a/fizz-core/src/main/java/we/filter/CallbackFilter.java
+++ b/fizz-core/src/main/java/we/filter/CallbackFilter.java
@@ -23,6 +23,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
@@ -41,8 +43,8 @@ import we.plugin.auth.Receiver;
import we.proxy.CallbackService;
import we.proxy.DiscoveryClientUriSelector;
import we.proxy.ServiceInstance;
-import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
import we.util.Constants;
+import we.util.NettyDataBufferUtils;
import we.util.ThreadContext;
import we.util.WebUtils;
@@ -88,16 +90,36 @@ public class CallbackFilter extends FizzWebFilter {
ApiConfig ac = WebUtils.getApiConfig(exchange);
if (ac != null && ac.type == ApiConfig.Type.CALLBACK) {
CallbackConfig cc = ac.callbackConfig;
- FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest();
- DataBuffer body = req.getRawBody();
- HashMap service2instMap = getService2instMap(ac);
- HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
- pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next());
- if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
- return directResponse(exchange, cc);
- } else {
- return callbackService.requestBackends(exchange, headers, body, cc, service2instMap);
- }
+ ServerHttpRequest req = exchange.getRequest();
+ return
+ DataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
+ .flatMap(
+ b -> {
+ DataBuffer body = null;
+ if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
+ if (b instanceof PooledDataBuffer) {
+ byte[] bytes = new byte[b.readableByteCount()];
+ try {
+ b.read(bytes);
+ body = NettyDataBufferUtils.from(bytes);
+ } finally {
+ NettyDataBufferUtils.release(b);
+ }
+ } else {
+ body = b;
+ }
+ }
+ HashMap service2instMap = getService2instMap(ac);
+ HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
+ pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next());
+ if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
+ return directResponse(exchange, cc);
+ } else {
+ return callbackService.requestBackends(exchange, headers, body, cc, service2instMap);
+ }
+ }
+ )
+ ;
}
return chain.filter(exchange);
}
diff --git a/fizz-core/src/main/java/we/filter/PreprocessFilter.java b/fizz-core/src/main/java/we/filter/PreprocessFilter.java
index 197a422..771373e 100644
--- a/fizz-core/src/main/java/we/filter/PreprocessFilter.java
+++ b/fizz-core/src/main/java/we/filter/PreprocessFilter.java
@@ -21,7 +21,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
-import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
@@ -33,8 +32,6 @@ import we.plugin.auth.ApiConfig;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AuthPluginFilter;
import we.plugin.stat.StatPluginFilter;
-import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
-import we.util.NettyDataBufferUtils;
import we.util.ReactorUtils;
import we.util.WebUtils;
@@ -72,23 +69,8 @@ public class PreprocessFilter extends FizzWebFilter {
Map eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
- ServerHttpRequest req = exchange.getRequest();
- return NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
- .flatMap(
- body -> {
- FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req);
- if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
- try {
- requestDecorator.setBody(body);
- } finally {
- NettyDataBufferUtils.release(body);
- }
- }
- ServerWebExchange newExchange = exchange.mutate().request(requestDecorator).build();
- Mono vm = statPluginFilter.filter(newExchange, null, null);
- return process(newExchange, chain, eas, vm);
- }
- );
+ Mono vm = statPluginFilter.filter(exchange, null, null);
+ return process(exchange, chain, eas, vm);
}
// TODO
diff --git a/fizz-core/src/main/java/we/filter/RouteFilter.java b/fizz-core/src/main/java/we/filter/RouteFilter.java
index a7a7daf..36be31b 100644
--- a/fizz-core/src/main/java/we/filter/RouteFilter.java
+++ b/fizz-core/src/main/java/we/filter/RouteFilter.java
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
-import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -39,11 +39,7 @@ import we.plugin.auth.ApiConfig;
import we.proxy.FizzWebClient;
import we.proxy.dubbo.ApacheDubboGenericService;
import we.proxy.dubbo.DubboInterfaceDeclaration;
-import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
-import we.util.Constants;
-import we.util.JacksonUtils;
-import we.util.ThreadContext;
-import we.util.WebUtils;
+import we.util.*;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@@ -95,7 +91,7 @@ public class RouteFilter extends FizzWebFilter {
private Mono doFilter0(ServerWebExchange exchange, WebFilterChain chain) {
- FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest();
+ ServerHttpRequest req = exchange.getRequest();
String rid = req.getId();
ApiConfig ac = WebUtils.getApiConfig(exchange);
HttpHeaders hdrs = null;
@@ -115,7 +111,7 @@ public class RouteFilter extends FizzWebFilter {
String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
.append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
.toString();
- return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getRawBody()).flatMap(genServerResponse(exchange));
+ return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
} else if (ac.type == ApiConfig.Type.DUBBO) {
return dubboRpc(exchange, ac);
@@ -130,8 +126,8 @@ public class RouteFilter extends FizzWebFilter {
}
private Mono send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) {
- FizzServerHttpRequestDecorator clientReq = (FizzServerHttpRequestDecorator) exchange.getRequest();
- return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getRawBody()).flatMap(genServerResponse(exchange));
+ ServerHttpRequest clientReq = exchange.getRequest();
+ return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
}
private Function> genServerResponse(ServerWebExchange exchange) {
@@ -174,53 +170,58 @@ public class RouteFilter extends FizzWebFilter {
}
private Mono dubboRpc(ServerWebExchange exchange, ApiConfig ac) {
+ final String[] ls = {null};
+ return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
+ .flatMap(
+ b -> {
+ HashMap parameters = null;
+ if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
+ String json = b.toString(StandardCharsets.UTF_8).trim();
+ ls[0] = json;
+ NettyDataBufferUtils.release(b);
+ if (json.charAt(0) == '[') {
+ ArrayList