diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml
index 3c3de67..f119d23 100644
--- a/fizz-bootstrap/pom.xml
+++ b/fizz-bootstrap/pom.xml
@@ -12,13 +12,13 @@
com.fizzgate
fizz-bootstrap
- 2.2.3
+ 2.2.4-SNAPSHOT
1.8
5.2.13.RELEASE
Dragonfruit-SR3
- Dysprosium-SR21
+ Dysprosium-SR22
5.3.7.RELEASE
4.1.66.Final
4.4.14
diff --git a/fizz-common/pom.xml b/fizz-common/pom.xml
index 8c26568..2225981 100644
--- a/fizz-common/pom.xml
+++ b/fizz-common/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.2.3
+ 2.2.4-SNAPSHOT
../pom.xml
4.0.0
diff --git a/fizz-core/src/main/java/we/constants/CommonConstants.java b/fizz-common/src/main/java/we/constants/CommonConstants.java
similarity index 100%
rename from fizz-core/src/main/java/we/constants/CommonConstants.java
rename to fizz-common/src/main/java/we/constants/CommonConstants.java
diff --git a/fizz-core/src/main/java/we/flume/clients/log4j2appender/LogService.java b/fizz-common/src/main/java/we/flume/clients/log4j2appender/LogService.java
similarity index 100%
rename from fizz-core/src/main/java/we/flume/clients/log4j2appender/LogService.java
rename to fizz-common/src/main/java/we/flume/clients/log4j2appender/LogService.java
diff --git a/fizz-core/src/main/java/we/flume/clients/log4j2appender/ThreadContext.java b/fizz-common/src/main/java/we/flume/clients/log4j2appender/ThreadContext.java
similarity index 100%
rename from fizz-core/src/main/java/we/flume/clients/log4j2appender/ThreadContext.java
rename to fizz-common/src/main/java/we/flume/clients/log4j2appender/ThreadContext.java
diff --git a/fizz-common/src/main/java/we/util/ConvertedRequestBodyDataBufferWrapper.java b/fizz-common/src/main/java/we/util/ConvertedRequestBodyDataBufferWrapper.java
new file mode 100644
index 0000000..113bf3e
--- /dev/null
+++ b/fizz-common/src/main/java/we/util/ConvertedRequestBodyDataBufferWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.util;
+
+import org.springframework.core.io.buffer.DataBuffer;
+
+/**
+ * @author hongqiaowei
+ */
+
+public class ConvertedRequestBodyDataBufferWrapper {
+
+ public DataBuffer body;
+
+ public ConvertedRequestBodyDataBufferWrapper(DataBuffer body) {
+ this.body = body;
+ }
+}
diff --git a/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java b/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java
new file mode 100644
index 0000000..8b36a3f
--- /dev/null
+++ b/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.util;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.NettyDataBuffer;
+import org.springframework.core.io.buffer.NettyDataBufferFactory;
+import org.springframework.core.io.buffer.PooledDataBuffer;
+import org.springframework.lang.Nullable;
+import we.flume.clients.log4j2appender.LogService;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * @author hongqiaowei
+ */
+
+public abstract class NettyDataBufferUtils extends org.springframework.core.io.buffer.DataBufferUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(NettyDataBufferUtils.class);
+
+ private static NettyDataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
+
+ public static NettyDataBuffer from(String s) {
+ return from(s.getBytes(StandardCharsets.UTF_8));
+ }
+
+ public static NettyDataBuffer from(byte[] bytes) {
+ return (NettyDataBuffer) dataBufferFactory.wrap(bytes);
+ }
+
+ public static NettyDataBuffer from(ByteBuffer byteBuffer) {
+ return dataBufferFactory.wrap(byteBuffer);
+ }
+
+ public static NettyDataBuffer from(ByteBuf byteBuf) {
+ return dataBufferFactory.wrap(byteBuf);
+ }
+
+ public static boolean release(@Nullable String reqId, @Nullable DataBuffer dataBuffer) {
+ if (dataBuffer instanceof PooledDataBuffer) {
+ PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
+ if (pooledDataBuffer.isAllocated()) {
+ if (pooledDataBuffer instanceof NettyDataBuffer) {
+ NettyDataBuffer ndb = (NettyDataBuffer) pooledDataBuffer;
+ ByteBuf nativeBuffer = ndb.getNativeBuffer();
+ int refCnt = nativeBuffer.refCnt();
+ if (refCnt < 1) {
+ if (log.isDebugEnabled()) {
+ log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, reqId);
+ }
+ return false;
+ }
+ }
+ return pooledDataBuffer.release();
+ }
+ }
+ return false;
+ }
+}
diff --git a/fizz-core/pom.xml b/fizz-core/pom.xml
index 60cab3e..298dd92 100644
--- a/fizz-core/pom.xml
+++ b/fizz-core/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.2.3
+ 2.2.4-SNAPSHOT
../pom.xml
4.0.0
diff --git a/fizz-core/src/main/java/we/filter/AggregateFilter.java b/fizz-core/src/main/java/we/filter/AggregateFilter.java
index c22926d..c072d70 100644
--- a/fizz-core/src/main/java/we/filter/AggregateFilter.java
+++ b/fizz-core/src/main/java/we/filter/AggregateFilter.java
@@ -61,6 +61,7 @@ import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.util.Constants;
import we.util.MapUtil;
+import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
/**
@@ -72,8 +73,6 @@ public class AggregateFilter implements WebFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(AggregateFilter.class);
- private static final DataBuffer emptyBody = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false, true)).wrap(Constants.Symbol.EMPTY.getBytes());
-
@Resource
private ConfigLoader configLoader;
@@ -171,19 +170,12 @@ public class AggregateFilter implements WebFilter {
});
} else {
if (HttpMethod.POST.name().equalsIgnoreCase(method)) {
- result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(emptyBody).flatMap(buf -> {
- if (buf != null && buf != emptyBody) {
- try {
- clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
- } finally {
- DataBufferUtils.release(buf);
- }
- }
- return pipeline.run(input, clientInput, traceId);
- });
- } else {
- result = pipeline.run(input, clientInput, traceId);
+ DataBuffer buf = WebUtils.getRequestBody(exchange);
+ if (buf != null) {
+ clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
+ }
}
+ result = pipeline.run(input, clientInput, traceId);
}
return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> {
LogService.setBizId(traceId);
diff --git a/fizz-core/src/main/java/we/filter/CallbackFilter.java b/fizz-core/src/main/java/we/filter/CallbackFilter.java
index 4639689..7fd71c6 100644
--- a/fizz-core/src/main/java/we/filter/CallbackFilter.java
+++ b/fizz-core/src/main/java/we/filter/CallbackFilter.java
@@ -18,14 +18,11 @@
package we.filter;
import com.alibaba.fastjson.JSON;
-import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferUtils;
-import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
@@ -90,33 +87,15 @@ public class CallbackFilter extends FizzWebFilter {
ApiConfig ac = WebUtils.getApiConfig(exchange);
if (ac != null && ac.type == ApiConfig.Type.CALLBACK) {
CallbackConfig cc = ac.callbackConfig;
- ServerHttpRequest req = exchange.getRequest();
- DataBuffer[] body = {null};
- return
- DataBufferUtils.join(req.getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY)
- .flatMap(
- b -> {
- if (b != WebUtils.EMPTY_BODY) {
- body[0] = b;
- }
- HashMap service2instMap = getService2instMap(ac);
- HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
- pushReq2manager(exchange, headers, body[0], service2instMap, cc.id, ac.gatewayGroups.iterator().next());
- if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
- return directResponse(exchange, cc);
- } else {
- return callbackService.requestBackends(exchange, headers, body[0], cc, service2instMap);
- }
- }
- )
- .doFinally(
- s -> {
- if (body[0] != null) {
- DataBufferUtils.release(body[0]);
- }
- }
- )
- ;
+ DataBuffer body = WebUtils.getRequestBody(exchange);
+ HashMap service2instMap = getService2instMap(ac);
+ HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
+ pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next());
+ if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
+ return directResponse(exchange, cc);
+ } else {
+ return callbackService.requestBackends(exchange, headers, body, cc, service2instMap);
+ }
}
return chain.filter(exchange);
}
diff --git a/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java b/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java
index e1bfd49..bdcfb82 100644
--- a/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java
+++ b/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
+import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@@ -35,9 +36,7 @@ import we.exception.StopAndResponseException;
import we.fizz.exception.FizzRuntimeException;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
-import we.util.JacksonUtils;
-import we.util.ThreadContext;
-import we.util.WebUtils;
+import we.util.*;
import java.net.URI;
@@ -54,7 +53,7 @@ public class FilterExceptionHandlerConfig {
@Override
public Mono handle(ServerWebExchange exchange, Throwable t) {
ServerHttpResponse resp = exchange.getResponse();
- if (t instanceof StopAndResponseException) {
+ if (t instanceof StopAndResponseException) {
StopAndResponseException ex = (StopAndResponseException) t;
if (ex.getData() != null) {
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
@@ -83,8 +82,8 @@ public class FilterExceptionHandlerConfig {
}
}
if (t instanceof FizzRuntimeException) {
- FizzRuntimeException ex = (FizzRuntimeException) t;
- log.error(ex.getMessage(), LogService.BIZ_ID, exchange.getRequest().getId(), ex);
+ FizzRuntimeException ex = (FizzRuntimeException) t;
+ log.error(ex.getMessage(), LogService.BIZ_ID, exchange.getRequest().getId(), ex);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
String reqId = exchange.getRequest().getId();
diff --git a/fizz-core/src/main/java/we/filter/PreprocessFilter.java b/fizz-core/src/main/java/we/filter/PreprocessFilter.java
index 82e9dfd..0da67de 100644
--- a/fizz-core/src/main/java/we/filter/PreprocessFilter.java
+++ b/fizz-core/src/main/java/we/filter/PreprocessFilter.java
@@ -17,12 +17,24 @@
package we.filter;
+import com.google.common.collect.BoundType;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.HttpStatus;
+import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import we.flume.clients.log4j2appender.LogService;
import we.plugin.FixedPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.plugin.PluginFilter;
@@ -30,10 +42,14 @@ import we.plugin.auth.ApiConfig;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AuthPluginFilter;
import we.plugin.stat.StatPluginFilter;
+import we.util.ConvertedRequestBodyDataBufferWrapper;
+import we.util.NettyDataBufferUtils;
import we.util.ReactorUtils;
import we.util.WebUtils;
import javax.annotation.Resource;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,6 +63,8 @@ import java.util.function.Function;
@Order(10)
public class PreprocessFilter extends FizzWebFilter {
+ private static final Logger log = LoggerFactory.getLogger(PreprocessFilter.class);
+
public static final String PREPROCESS_FILTER = "preprocessFilter";
private static final FilterResult succFr = FilterResult.SUCCESS(PREPROCESS_FILTER);
@@ -65,7 +83,42 @@ public class PreprocessFilter extends FizzWebFilter {
Map eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
- Mono vm = statPluginFilter.filter(exchange, null, null);
+ ServerHttpRequest req = exchange.getRequest();
+ return NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY)
+ .flatMap(
+ body -> {
+ if (body != WebUtils.EMPTY_BODY && body.readableByteCount() > 0) {
+ try {
+ byte[] bytes = new byte[body.readableByteCount()];
+ body.read(bytes);
+ DataBuffer retain = NettyDataBufferUtils.from(bytes);
+ eas.put(WebUtils.REQUEST_BODY, retain);
+ } finally {
+ NettyDataBufferUtils.release(body);
+ }
+ }
+ Mono vm = statPluginFilter.filter(exchange, null, null);
+ return process(exchange, chain, eas, vm);
+ }
+ )
+ .doFinally(
+ s -> {
+ Object convertedRequestBody = WebUtils.getConvertedRequestBody(exchange);
+ if (convertedRequestBody instanceof ConvertedRequestBodyDataBufferWrapper) {
+ DataBuffer b = ((ConvertedRequestBodyDataBufferWrapper) convertedRequestBody).body;
+ if (b != null) {
+ boolean release = NettyDataBufferUtils.release(req.getId(), b);
+ if (log.isDebugEnabled()) {
+ log.debug("release converted request body databuffer " + release, LogService.BIZ_ID, req.getId());
+ }
+ }
+ }
+ }
+ );
+ }
+
+ // TODO
+ private Mono process(ServerWebExchange exchange, WebFilterChain chain, Map eas, Mono vm) {
return chain(exchange, vm, authPluginFilter).defaultIfEmpty(ReactorUtils.NULL)
.flatMap(
v -> {
diff --git a/fizz-core/src/main/java/we/filter/RouteFilter.java b/fizz-core/src/main/java/we/filter/RouteFilter.java
index dc2e302..49b792a 100644
--- a/fizz-core/src/main/java/we/filter/RouteFilter.java
+++ b/fizz-core/src/main/java/we/filter/RouteFilter.java
@@ -112,7 +112,11 @@ public class RouteFilter extends FizzWebFilter {
String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
.append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
.toString();
- return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
+ Object requestBody = WebUtils.getConvertedRequestBody(exchange);
+ if (requestBody == null) {
+ requestBody = WebUtils.getRequestBody(exchange);
+ }
+ return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, requestBody).flatMap(genServerResponse(exchange));
} else if (ac.type == ApiConfig.Type.DUBBO) {
return dubboRpc(exchange, ac);
@@ -128,7 +132,11 @@ public class RouteFilter extends FizzWebFilter {
private Mono send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) {
ServerHttpRequest clientReq = exchange.getRequest();
- return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
+ Object requestBody = WebUtils.getConvertedRequestBody(exchange);
+ if (requestBody == null) {
+ requestBody = WebUtils.getRequestBody(exchange);
+ }
+ return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, requestBody).flatMap(genServerResponse(exchange));
}
private Function> genServerResponse(ServerWebExchange exchange) {
@@ -171,64 +179,52 @@ public class RouteFilter extends FizzWebFilter {
}
private Mono dubboRpc(ServerWebExchange exchange, ApiConfig ac) {
- DataBuffer[] body = {null};
- return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY)
- .flatMap(
- b -> {
- HashMap parameters = null;
- if (b != WebUtils.EMPTY_BODY) {
- body[0] = b;
- String json = body[0].toString(StandardCharsets.UTF_8).trim();
- if (json.charAt(0) == '[') {
- ArrayList