From 687f9fe7caff4cec7939a7a82b9c509797dcb940 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Thu, 12 Aug 2021 18:58:14 +0800 Subject: [PATCH] Optimize request body access in gateway #274 --- .../ext/FizzServerHttpRequestDecorator.java | 76 +++++++++++++++++++ ...ConvertedRequestBodyDataBufferWrapper.java | 33 -------- .../java/we/util/NettyDataBufferUtils.java | 2 + .../main/java/we/filter/AggregateFilter.java | 34 ++++----- .../main/java/we/filter/CallbackFilter.java | 4 +- .../main/java/we/filter/PreprocessFilter.java | 42 ++-------- .../src/main/java/we/filter/RouteFilter.java | 26 +++---- .../java/we/plugin/FizzPluginFilterChain.java | 7 +- .../src/main/java/we/proxy/FizzWebClient.java | 25 +----- fizz-core/src/main/java/we/util/WebUtils.java | 41 ---------- 10 files changed, 120 insertions(+), 170 deletions(-) create mode 100644 fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java delete mode 100644 fizz-common/src/main/java/we/util/ConvertedRequestBodyDataBufferWrapper.java diff --git a/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java new file mode 100644 index 0000000..e98a051 --- /dev/null +++ b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.spring.http.server.reactive.ext; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.PooledDataBuffer; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; +import reactor.core.publisher.Flux; +import we.util.NettyDataBufferUtils; + +import java.nio.charset.StandardCharsets; + +/** + * @author hongqiaowei + */ + +public class FizzServerHttpRequestDecorator extends ServerHttpRequestDecorator { + + private Flux body = Flux.empty(); + + public FizzServerHttpRequestDecorator(ServerHttpRequest delegate) { + super(delegate); + } + + public void setBody(DataBuffer body) { + if (body instanceof PooledDataBuffer) { + byte[] bytes = new byte[body.readableByteCount()]; + body.read(bytes); + setBody(bytes); + } else { + this.body = Flux.just(body); + } + } + + public void setBody(String body) { + byte[] bytes = body.getBytes(StandardCharsets.UTF_8); + setBody(bytes); + } + + public void setBody(byte[] body) { + NettyDataBuffer from = NettyDataBufferUtils.from(body); + this.body = Flux.just(from); + } + + @Override + public Flux getBody() { + return body; + } + + public DataBuffer getRawBody() { + final DataBuffer[] raw = {null}; + body.subscribe( + dataBuffer -> { + raw[0] = dataBuffer; + } + ); + return raw[0]; + } +} diff --git a/fizz-common/src/main/java/we/util/ConvertedRequestBodyDataBufferWrapper.java b/fizz-common/src/main/java/we/util/ConvertedRequestBodyDataBufferWrapper.java deleted file mode 100644 index 113bf3e..0000000 --- a/fizz-common/src/main/java/we/util/ConvertedRequestBodyDataBufferWrapper.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2020 the original author or authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package we.util; - -import org.springframework.core.io.buffer.DataBuffer; - -/** - * @author hongqiaowei - */ - -public class ConvertedRequestBodyDataBufferWrapper { - - public DataBuffer body; - - public ConvertedRequestBodyDataBufferWrapper(DataBuffer body) { - this.body = body; - } -} diff --git a/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java b/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java index 8b36a3f..440a377 100644 --- a/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java +++ b/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java @@ -41,6 +41,8 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b private static NettyDataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); + public static final DataBuffer EMPTY_DATA_BUFFER = from(new byte[0]); + public static NettyDataBuffer from(String s) { return from(s.getBytes(StandardCharsets.UTF_8)); } diff --git a/fizz-core/src/main/java/we/filter/AggregateFilter.java b/fizz-core/src/main/java/we/filter/AggregateFilter.java index c072d70..5acdbc3 100644 --- a/fizz-core/src/main/java/we/filter/AggregateFilter.java +++ b/fizz-core/src/main/java/we/filter/AggregateFilter.java @@ -17,36 +17,21 @@ package we.filter; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import javax.annotation.Resource; - +import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.codec.multipart.FilePart; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; - -import com.alibaba.fastjson.JSON; - -import io.netty.buffer.UnpooledByteBufAllocator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -59,11 +44,18 @@ import we.fizz.Pipeline; import we.fizz.input.Input; import we.flume.clients.log4j2appender.LogService; import we.plugin.auth.ApiConfig; -import we.util.Constants; +import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; import we.util.MapUtil; -import we.util.NettyDataBufferUtils; import we.util.WebUtils; +import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + /** * @author Francis Dong */ @@ -105,7 +97,7 @@ public class AggregateFilter implements WebFilter { } long start = System.currentTimeMillis(); - ServerHttpRequest request = exchange.getRequest(); + FizzServerHttpRequestDecorator request = (FizzServerHttpRequestDecorator) exchange.getRequest(); ServerHttpResponse serverHttpResponse = exchange.getResponse(); String clientReqPathPrefix = WebUtils.getClientReqPathPrefix(exchange); @@ -170,7 +162,7 @@ public class AggregateFilter implements WebFilter { }); } else { if (HttpMethod.POST.name().equalsIgnoreCase(method)) { - DataBuffer buf = WebUtils.getRequestBody(exchange); + DataBuffer buf = request.getRawBody(); if (buf != null) { clientInput.put("body", buf.toString(StandardCharsets.UTF_8)); } diff --git a/fizz-core/src/main/java/we/filter/CallbackFilter.java b/fizz-core/src/main/java/we/filter/CallbackFilter.java index 7fd71c6..82d7ed4 100644 --- a/fizz-core/src/main/java/we/filter/CallbackFilter.java +++ b/fizz-core/src/main/java/we/filter/CallbackFilter.java @@ -41,6 +41,7 @@ import we.plugin.auth.Receiver; import we.proxy.CallbackService; import we.proxy.DiscoveryClientUriSelector; import we.proxy.ServiceInstance; +import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; import we.util.Constants; import we.util.ThreadContext; import we.util.WebUtils; @@ -87,7 +88,8 @@ public class CallbackFilter extends FizzWebFilter { ApiConfig ac = WebUtils.getApiConfig(exchange); if (ac != null && ac.type == ApiConfig.Type.CALLBACK) { CallbackConfig cc = ac.callbackConfig; - DataBuffer body = WebUtils.getRequestBody(exchange); + FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest(); + DataBuffer body = req.getRawBody(); HashMap service2instMap = getService2instMap(ac); HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange); pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next()); diff --git a/fizz-core/src/main/java/we/filter/PreprocessFilter.java b/fizz-core/src/main/java/we/filter/PreprocessFilter.java index 0da67de..197a422 100644 --- a/fizz-core/src/main/java/we/filter/PreprocessFilter.java +++ b/fizz-core/src/main/java/we/filter/PreprocessFilter.java @@ -17,24 +17,15 @@ package we.filter; -import com.google.common.collect.BoundType; -import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import we.flume.clients.log4j2appender.LogService; import we.plugin.FixedPluginFilter; import we.plugin.FizzPluginFilterChain; import we.plugin.PluginFilter; @@ -42,14 +33,12 @@ import we.plugin.auth.ApiConfig; import we.plugin.auth.ApiConfigService; import we.plugin.auth.AuthPluginFilter; import we.plugin.stat.StatPluginFilter; -import we.util.ConvertedRequestBodyDataBufferWrapper; +import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; import we.util.NettyDataBufferUtils; import we.util.ReactorUtils; import we.util.WebUtils; import javax.annotation.Resource; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,35 +73,20 @@ public class PreprocessFilter extends FizzWebFilter { eas.put(WebUtils.APPEND_HEADERS, appendHdrs); ServerHttpRequest req = exchange.getRequest(); - return NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY) + return NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) .flatMap( body -> { - if (body != WebUtils.EMPTY_BODY && body.readableByteCount() > 0) { + FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req); + if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) { try { - byte[] bytes = new byte[body.readableByteCount()]; - body.read(bytes); - DataBuffer retain = NettyDataBufferUtils.from(bytes); - eas.put(WebUtils.REQUEST_BODY, retain); + requestDecorator.setBody(body); } finally { NettyDataBufferUtils.release(body); } } - Mono vm = statPluginFilter.filter(exchange, null, null); - return process(exchange, chain, eas, vm); - } - ) - .doFinally( - s -> { - Object convertedRequestBody = WebUtils.getConvertedRequestBody(exchange); - if (convertedRequestBody instanceof ConvertedRequestBodyDataBufferWrapper) { - DataBuffer b = ((ConvertedRequestBodyDataBufferWrapper) convertedRequestBody).body; - if (b != null) { - boolean release = NettyDataBufferUtils.release(req.getId(), b); - if (log.isDebugEnabled()) { - log.debug("release converted request body databuffer " + release, LogService.BIZ_ID, req.getId()); - } - } - } + ServerWebExchange newExchange = exchange.mutate().request(requestDecorator).build(); + Mono vm = statPluginFilter.filter(newExchange, null, null); + return process(newExchange, chain, eas, vm); } ); } diff --git a/fizz-core/src/main/java/we/filter/RouteFilter.java b/fizz-core/src/main/java/we/filter/RouteFilter.java index 49b792a..a7a7daf 100644 --- a/fizz-core/src/main/java/we/filter/RouteFilter.java +++ b/fizz-core/src/main/java/we/filter/RouteFilter.java @@ -22,7 +22,6 @@ import org.slf4j.LoggerFactory; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -40,6 +39,7 @@ import we.plugin.auth.ApiConfig; import we.proxy.FizzWebClient; import we.proxy.dubbo.ApacheDubboGenericService; import we.proxy.dubbo.DubboInterfaceDeclaration; +import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; import we.util.Constants; import we.util.JacksonUtils; import we.util.ThreadContext; @@ -47,7 +47,10 @@ import we.util.WebUtils; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.function.Function; /** @@ -92,7 +95,7 @@ public class RouteFilter extends FizzWebFilter { private Mono doFilter0(ServerWebExchange exchange, WebFilterChain chain) { - ServerHttpRequest req = exchange.getRequest(); + FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest(); String rid = req.getId(); ApiConfig ac = WebUtils.getApiConfig(exchange); HttpHeaders hdrs = null; @@ -112,11 +115,7 @@ public class RouteFilter extends FizzWebFilter { String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort()) .append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange)) .toString(); - Object requestBody = WebUtils.getConvertedRequestBody(exchange); - if (requestBody == null) { - requestBody = WebUtils.getRequestBody(exchange); - } - return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, requestBody).flatMap(genServerResponse(exchange)); + return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getRawBody()).flatMap(genServerResponse(exchange)); } else if (ac.type == ApiConfig.Type.DUBBO) { return dubboRpc(exchange, ac); @@ -131,12 +130,8 @@ public class RouteFilter extends FizzWebFilter { } private Mono send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) { - ServerHttpRequest clientReq = exchange.getRequest(); - Object requestBody = WebUtils.getConvertedRequestBody(exchange); - if (requestBody == null) { - requestBody = WebUtils.getRequestBody(exchange); - } - return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, requestBody).flatMap(genServerResponse(exchange)); + FizzServerHttpRequestDecorator clientReq = (FizzServerHttpRequestDecorator) exchange.getRequest(); + return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getRawBody()).flatMap(genServerResponse(exchange)); } private Function> genServerResponse(ServerWebExchange exchange) { @@ -180,7 +175,8 @@ public class RouteFilter extends FizzWebFilter { private Mono dubboRpc(ServerWebExchange exchange, ApiConfig ac) { - DataBuffer b = WebUtils.getRequestBody(exchange); + FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest(); + DataBuffer b = req.getRawBody(); HashMap parameters = null; String json = Constants.Symbol.EMPTY; if (b != null) { diff --git a/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java b/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java index b3ba1e3..2f7c420 100644 --- a/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java +++ b/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java @@ -34,9 +34,9 @@ import java.util.Map; public final class FizzPluginFilterChain { - private static final String pluginConfigsIt = "pcsit"; + private static final String pluginConfigsIt = "@pcsit"; - public static final String WEB_FILTER_CHAIN = "wfc"; + public static final String WEB_FILTER_CHAIN = "@wfc"; private FizzPluginFilterChain() { } @@ -69,7 +69,7 @@ public final class FizzPluginFilterChain { break; } } - if (!it.hasNext() && !f) { + if (!f && !it.hasNext()) { WebFilterChain chain = (WebFilterChain) attris.get(WEB_FILTER_CHAIN); m = m.defaultIfEmpty(ReactorUtils.NULL).flatMap( v -> { @@ -80,7 +80,6 @@ public final class FizzPluginFilterChain { } return m; } else { - // attris.remove(pluginFilterConfigsIt); WebFilterChain chain = (WebFilterChain) attris.get(WEB_FILTER_CHAIN); return chain.filter(exchange); } diff --git a/fizz-core/src/main/java/we/proxy/FizzWebClient.java b/fizz-core/src/main/java/we/proxy/FizzWebClient.java index ce82d32..d63ce47 100644 --- a/fizz-core/src/main/java/we/proxy/FizzWebClient.java +++ b/fizz-core/src/main/java/we/proxy/FizzWebClient.java @@ -21,7 +21,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.lang.Nullable; @@ -35,7 +34,9 @@ import reactor.core.publisher.Mono; import we.config.ProxyWebClientConfig; import we.config.SystemConfig; import we.flume.clients.log4j2appender.LogService; -import we.util.*; +import we.util.Constants; +import we.util.ThreadContext; +import we.util.WebUtils; import javax.annotation.Resource; import java.time.Duration; @@ -116,8 +117,6 @@ public class FizzWebClient { } ); - boolean b = false; - DataBuffer d = null; if (body != null) { if (body instanceof BodyInserter) { req.body((BodyInserter) body); @@ -125,16 +124,9 @@ public class FizzWebClient { Flux db = (Flux) body; req.body(BodyInserters.fromDataBuffers(db)); } else { - if (body instanceof ConvertedRequestBodyDataBufferWrapper) { - d = ((ConvertedRequestBodyDataBufferWrapper) body).body; - body = d; - b = true; - } req.bodyValue(body); } } - boolean finalB = b; - DataBuffer finalD = d; Mono cr = req.exchange(); if (timeout == 0) { @@ -146,16 +138,7 @@ public class FizzWebClient { cr = cr.timeout(Duration.ofMillis(timeout)); } - return cr.doFinally( - s -> { - if (finalB) { - boolean release = NettyDataBufferUtils.release(clientReqId, finalD); - if (log.isDebugEnabled()) { - log.debug("release converted request body databuffer " + release, LogService.BIZ_ID, clientReqId); - } - } - } - ); + return cr; } private void setHostHeader(String uri, HttpHeaders headers) { diff --git a/fizz-core/src/main/java/we/util/WebUtils.java b/fizz-core/src/main/java/we/util/WebUtils.java index 41a7972..e61a63a 100644 --- a/fizz-core/src/main/java/we/util/WebUtils.java +++ b/fizz-core/src/main/java/we/util/WebUtils.java @@ -17,13 +17,9 @@ package we.util; -import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -101,43 +97,6 @@ public abstract class WebUtils { public static Set LOG_HEADER_SET = Collections.EMPTY_SET; - public static final DataBuffer EMPTY_BODY = NettyDataBufferUtils.from(Constants.Symbol.EMPTY); - - public static final String REQUEST_BODY = "@rb"; - - public static final String CONVERTED_REQUEST_BODY = "@crb"; - - - public static DataBuffer getRequestBody(ServerWebExchange exchange) { - return exchange.getAttribute(REQUEST_BODY); - } - - /** - * @param convertedRequestBody can be DataBuffer or String type - */ - public static void setConvertedRequestBody(ServerWebExchange exchange, Object convertedRequestBody) { - Object prev = exchange.getAttribute(CONVERTED_REQUEST_BODY); - if (prev instanceof ConvertedRequestBodyDataBufferWrapper) { - ConvertedRequestBodyDataBufferWrapper p = (ConvertedRequestBodyDataBufferWrapper) prev; - if (p.body != null) { - DataBufferUtils.release(p.body); - } - } - if (convertedRequestBody instanceof DataBuffer) { - DataBuffer d = (DataBuffer) convertedRequestBody; - DataBufferUtils.retain(d); - exchange.getAttributes().put(CONVERTED_REQUEST_BODY, new ConvertedRequestBodyDataBufferWrapper(d)); - } else { - exchange.getAttributes().put(CONVERTED_REQUEST_BODY, convertedRequestBody); - } - } - - /** - * @return result may be String or ConvertedRequestBodyDataBufferWrapper type - */ - public static Object getConvertedRequestBody(ServerWebExchange exchange) { - return exchange.getAttribute(CONVERTED_REQUEST_BODY); - } public static void setGatewayPrefix(String p) { gatewayPrefix = p;