From c796df2071925caeac30c5c643fb83641fa2fe41 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Fri, 13 Aug 2021 16:51:33 +0800 Subject: [PATCH] Add plugin to access request body #275 --- fizz-bootstrap/pom.xml | 3 +- fizz-common/pom.xml | 2 +- .../ext/FizzServerHttpRequestDecorator.java | 9 ++ fizz-core/pom.xml | 2 +- .../main/java/we/filter/AggregateFilter.java | 22 +++- .../main/java/we/filter/CallbackFilter.java | 44 +++++-- .../main/java/we/filter/PreprocessFilter.java | 22 +--- .../src/main/java/we/filter/RouteFilter.java | 113 +++++++++--------- .../plugin/requestbody/RequestBodyPlugin.java | 70 +++++++++++ fizz-core/src/main/java/we/util/WebUtils.java | 6 +- fizz-plugin/pom.xml | 2 +- fizz-spring-boot-starter/pom.xml | 2 +- pom.xml | 3 +- 13 files changed, 196 insertions(+), 104 deletions(-) create mode 100644 fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml index f119d23..f55d660 100644 --- a/fizz-bootstrap/pom.xml +++ b/fizz-bootstrap/pom.xml @@ -12,7 +12,7 @@ com.fizzgate fizz-bootstrap - 2.2.4-SNAPSHOT + 2.2.4-beta1 1.8 @@ -32,6 +32,7 @@ 4.0.1 3.5.9 1.15 + 2.11.0 diff --git a/fizz-common/pom.xml b/fizz-common/pom.xml index 2225981..492d019 100644 --- a/fizz-common/pom.xml +++ b/fizz-common/pom.xml @@ -5,7 +5,7 @@ fizz-gateway-community com.fizzgate - 2.2.4-SNAPSHOT + 2.2.4-beta1 ../pom.xml 4.0.0 diff --git a/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java index e98a051..a660e76 100644 --- a/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java +++ b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java @@ -20,6 +20,7 @@ package we.spring.http.server.reactive.ext; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.PooledDataBuffer; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import reactor.core.publisher.Flux; @@ -33,10 +34,18 @@ import java.nio.charset.StandardCharsets; public class FizzServerHttpRequestDecorator extends ServerHttpRequestDecorator { + private HttpHeaders headers; + private Flux body = Flux.empty(); public FizzServerHttpRequestDecorator(ServerHttpRequest delegate) { super(delegate); + headers = HttpHeaders.writableHttpHeaders(delegate.getHeaders()); + } + + @Override + public HttpHeaders getHeaders() { + return headers; } public void setBody(DataBuffer body) { diff --git a/fizz-core/pom.xml b/fizz-core/pom.xml index 298dd92..2b5cf48 100644 --- a/fizz-core/pom.xml +++ b/fizz-core/pom.xml @@ -5,7 +5,7 @@ fizz-gateway-community com.fizzgate - 2.2.4-SNAPSHOT + 2.2.4-beta1 ../pom.xml 4.0.0 diff --git a/fizz-core/src/main/java/we/filter/AggregateFilter.java b/fizz-core/src/main/java/we/filter/AggregateFilter.java index 5acdbc3..25f3a6a 100644 --- a/fizz-core/src/main/java/we/filter/AggregateFilter.java +++ b/fizz-core/src/main/java/we/filter/AggregateFilter.java @@ -23,10 +23,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.codec.multipart.FilePart; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; @@ -46,6 +48,7 @@ import we.flume.clients.log4j2appender.LogService; import we.plugin.auth.ApiConfig; import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; import we.util.MapUtil; +import we.util.NettyDataBufferUtils; import we.util.WebUtils; import javax.annotation.Resource; @@ -97,7 +100,7 @@ public class AggregateFilter implements WebFilter { } long start = System.currentTimeMillis(); - FizzServerHttpRequestDecorator request = (FizzServerHttpRequestDecorator) exchange.getRequest(); + ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse serverHttpResponse = exchange.getResponse(); String clientReqPathPrefix = WebUtils.getClientReqPathPrefix(exchange); @@ -162,12 +165,19 @@ public class AggregateFilter implements WebFilter { }); } else { if (HttpMethod.POST.name().equalsIgnoreCase(method)) { - DataBuffer buf = request.getRawBody(); - if (buf != null) { - clientInput.put("body", buf.toString(StandardCharsets.UTF_8)); - } + result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER).flatMap(buf -> { + if (buf != NettyDataBufferUtils.EMPTY_DATA_BUFFER) { + try { + clientInput.put("body", buf.toString(StandardCharsets.UTF_8)); + } finally { + DataBufferUtils.release(buf); + } + } + return pipeline.run(input, clientInput, traceId); + }); + } else { + result = pipeline.run(input, clientInput, traceId); } - result = pipeline.run(input, clientInput, traceId); } return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> { LogService.setBizId(traceId); diff --git a/fizz-core/src/main/java/we/filter/CallbackFilter.java b/fizz-core/src/main/java/we/filter/CallbackFilter.java index 82d7ed4..40c733a 100644 --- a/fizz-core/src/main/java/we/filter/CallbackFilter.java +++ b/fizz-core/src/main/java/we/filter/CallbackFilter.java @@ -23,6 +23,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; @@ -41,8 +43,8 @@ import we.plugin.auth.Receiver; import we.proxy.CallbackService; import we.proxy.DiscoveryClientUriSelector; import we.proxy.ServiceInstance; -import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; import we.util.Constants; +import we.util.NettyDataBufferUtils; import we.util.ThreadContext; import we.util.WebUtils; @@ -88,16 +90,36 @@ public class CallbackFilter extends FizzWebFilter { ApiConfig ac = WebUtils.getApiConfig(exchange); if (ac != null && ac.type == ApiConfig.Type.CALLBACK) { CallbackConfig cc = ac.callbackConfig; - FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest(); - DataBuffer body = req.getRawBody(); - HashMap service2instMap = getService2instMap(ac); - HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange); - pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next()); - if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) { - return directResponse(exchange, cc); - } else { - return callbackService.requestBackends(exchange, headers, body, cc, service2instMap); - } + ServerHttpRequest req = exchange.getRequest(); + return + DataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) + .flatMap( + b -> { + DataBuffer body = null; + if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) { + if (b instanceof PooledDataBuffer) { + byte[] bytes = new byte[b.readableByteCount()]; + try { + b.read(bytes); + body = NettyDataBufferUtils.from(bytes); + } finally { + NettyDataBufferUtils.release(b); + } + } else { + body = b; + } + } + HashMap service2instMap = getService2instMap(ac); + HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange); + pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next()); + if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) { + return directResponse(exchange, cc); + } else { + return callbackService.requestBackends(exchange, headers, body, cc, service2instMap); + } + } + ) + ; } return chain.filter(exchange); } diff --git a/fizz-core/src/main/java/we/filter/PreprocessFilter.java b/fizz-core/src/main/java/we/filter/PreprocessFilter.java index 197a422..771373e 100644 --- a/fizz-core/src/main/java/we/filter/PreprocessFilter.java +++ b/fizz-core/src/main/java/we/filter/PreprocessFilter.java @@ -21,7 +21,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.http.HttpStatus; -import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; @@ -33,8 +32,6 @@ import we.plugin.auth.ApiConfig; import we.plugin.auth.ApiConfigService; import we.plugin.auth.AuthPluginFilter; import we.plugin.stat.StatPluginFilter; -import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; -import we.util.NettyDataBufferUtils; import we.util.ReactorUtils; import we.util.WebUtils; @@ -72,23 +69,8 @@ public class PreprocessFilter extends FizzWebFilter { Map eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc); eas.put(WebUtils.APPEND_HEADERS, appendHdrs); - ServerHttpRequest req = exchange.getRequest(); - return NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) - .flatMap( - body -> { - FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req); - if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) { - try { - requestDecorator.setBody(body); - } finally { - NettyDataBufferUtils.release(body); - } - } - ServerWebExchange newExchange = exchange.mutate().request(requestDecorator).build(); - Mono vm = statPluginFilter.filter(newExchange, null, null); - return process(newExchange, chain, eas, vm); - } - ); + Mono vm = statPluginFilter.filter(exchange, null, null); + return process(exchange, chain, eas, vm); } // TODO diff --git a/fizz-core/src/main/java/we/filter/RouteFilter.java b/fizz-core/src/main/java/we/filter/RouteFilter.java index a7a7daf..36be31b 100644 --- a/fizz-core/src/main/java/we/filter/RouteFilter.java +++ b/fizz-core/src/main/java/we/filter/RouteFilter.java @@ -21,7 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; -import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -39,11 +39,7 @@ import we.plugin.auth.ApiConfig; import we.proxy.FizzWebClient; import we.proxy.dubbo.ApacheDubboGenericService; import we.proxy.dubbo.DubboInterfaceDeclaration; -import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; -import we.util.Constants; -import we.util.JacksonUtils; -import we.util.ThreadContext; -import we.util.WebUtils; +import we.util.*; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; @@ -95,7 +91,7 @@ public class RouteFilter extends FizzWebFilter { private Mono doFilter0(ServerWebExchange exchange, WebFilterChain chain) { - FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest(); + ServerHttpRequest req = exchange.getRequest(); String rid = req.getId(); ApiConfig ac = WebUtils.getApiConfig(exchange); HttpHeaders hdrs = null; @@ -115,7 +111,7 @@ public class RouteFilter extends FizzWebFilter { String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort()) .append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange)) .toString(); - return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getRawBody()).flatMap(genServerResponse(exchange)); + return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange)); } else if (ac.type == ApiConfig.Type.DUBBO) { return dubboRpc(exchange, ac); @@ -130,8 +126,8 @@ public class RouteFilter extends FizzWebFilter { } private Mono send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) { - FizzServerHttpRequestDecorator clientReq = (FizzServerHttpRequestDecorator) exchange.getRequest(); - return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getRawBody()).flatMap(genServerResponse(exchange)); + ServerHttpRequest clientReq = exchange.getRequest(); + return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange)); } private Function> genServerResponse(ServerWebExchange exchange) { @@ -174,53 +170,58 @@ public class RouteFilter extends FizzWebFilter { } private Mono dubboRpc(ServerWebExchange exchange, ApiConfig ac) { + final String[] ls = {null}; + return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) + .flatMap( + b -> { + HashMap parameters = null; + if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) { + String json = b.toString(StandardCharsets.UTF_8).trim(); + ls[0] = json; + NettyDataBufferUtils.release(b); + if (json.charAt(0) == '[') { + ArrayList lst = JacksonUtils.readValue(json, ArrayList.class); + parameters = new HashMap<>(); + for (int i = 0; i < lst.size(); i++) { + parameters.put("p" + (i + 1), lst.get(i)); + } + } else { + parameters = JacksonUtils.readValue(json, HashMap.class); + } + } - FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest(); - DataBuffer b = req.getRawBody(); - HashMap parameters = null; - String json = Constants.Symbol.EMPTY; - if (b != null) { - json = b.toString(StandardCharsets.UTF_8).trim(); - if (json.charAt(0) == '[') { - ArrayList lst = JacksonUtils.readValue(json, ArrayList.class); - parameters = new HashMap<>(); - for (int i = 0; i < lst.size(); i++) { - parameters.put("p" + (i + 1), lst.get(i)); - } - } else { - parameters = JacksonUtils.readValue(json, HashMap.class); - } - } - String finalJson = json; + DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration(); + declaration.setServiceName(ac.backendService); + declaration.setVersion(ac.rpcVersion); + declaration.setGroup(ac.rpcGroup); + declaration.setMethod(ac.rpcMethod); + declaration.setParameterTypes(ac.rpcParamTypes); + int t = 20_000; + if (ac.timeout != 0) { + t = (int) ac.timeout; + } + declaration.setTimeout(t); - DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration(); - declaration.setServiceName(ac.backendService); - declaration.setVersion(ac.rpcVersion); - declaration.setGroup(ac.rpcGroup); - declaration.setMethod(ac.rpcMethod); - declaration.setParameterTypes(ac.rpcParamTypes); - int t = 20_000; - if (ac.timeout != 0) { - t = (int) ac.timeout; - } - declaration.setTimeout(t); - - Map attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange)); - return dubboGenericService.send(parameters, declaration, attachments) - .flatMap( - dubboRpcResponseBody -> { - Mono m = WebUtils.buildJsonDirectResponse(exchange, HttpStatus.OK, null, JacksonUtils.writeValueAsString(dubboRpcResponseBody)); - return m; - } - ) - .doOnError( - e -> { - StringBuilder sb = ThreadContext.getStringBuilder(); - WebUtils.request2stringBuilder(exchange, sb); - sb.append('\n').append(finalJson); - log.error(sb.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), e); - } - ) - ; + Map attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange)); + return dubboGenericService.send(parameters, declaration, attachments); + } + ) + .flatMap( + dubboRpcResponseBody -> { + Mono m = WebUtils.buildJsonDirectResponse(exchange, HttpStatus.OK, null, JacksonUtils.writeValueAsString(dubboRpcResponseBody)); + return m; + } + ) + .doOnError( + t -> { + StringBuilder b = ThreadContext.getStringBuilder(); + WebUtils.request2stringBuilder(exchange, b); + if (ls[0] != null) { + b.append('\n').append(ls[0]); + } + log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t); + } + ) + ; } } diff --git a/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java b/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java new file mode 100644 index 0000000..ed0d23e --- /dev/null +++ b/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.plugin.requestbody; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.stereotype.Component; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; +import we.flume.clients.log4j2appender.LogService; +import we.plugin.FizzPluginFilter; +import we.plugin.FizzPluginFilterChain; +import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; +import we.util.NettyDataBufferUtils; + +import java.util.Map; + +/** + * @author hongqiaowei + */ + +@Component(RequestBodyPlugin.REQUEST_BODY_PLUGIN) +public class RequestBodyPlugin implements FizzPluginFilter { + + private static final Logger log = LoggerFactory.getLogger(RequestBodyPlugin.class); + + public static final String REQUEST_BODY_PLUGIN = "requestBodyPlugin"; + + @Override + public Mono filter(ServerWebExchange exchange, Map config) { + + ServerHttpRequest req = exchange.getRequest(); + return + NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) + .flatMap( + body -> { + ServerWebExchange newExchange = exchange; + if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) { + FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req); + try { + requestDecorator.setBody(body); + } finally { + NettyDataBufferUtils.release(body); + } + newExchange = exchange.mutate().request(requestDecorator).build(); + if (log.isDebugEnabled()) { + log.debug("retain body", LogService.BIZ_ID, req.getId()); + } + } + return FizzPluginFilterChain.next(newExchange); + } + ); + } +} diff --git a/fizz-core/src/main/java/we/util/WebUtils.java b/fizz-core/src/main/java/we/util/WebUtils.java index e61a63a..a0a2781 100644 --- a/fizz-core/src/main/java/we/util/WebUtils.java +++ b/fizz-core/src/main/java/we/util/WebUtils.java @@ -377,14 +377,10 @@ public abstract class WebUtils { if (appendHeaders.isEmpty()) { return req.getHeaders(); } - boolean b = appendHeaders.containsKey(HttpHeaders.CONTENT_TYPE); HttpHeaders hdrs = new HttpHeaders(); req.getHeaders().forEach( (h, vs) -> { - if (b && h.equals(HttpHeaders.CONTENT_TYPE)) { - } else { - hdrs.addAll(h, vs); - } + hdrs.addAll(h, vs); } ); appendHeaders.forEach( diff --git a/fizz-plugin/pom.xml b/fizz-plugin/pom.xml index e41a364..7aae6b4 100644 --- a/fizz-plugin/pom.xml +++ b/fizz-plugin/pom.xml @@ -5,7 +5,7 @@ fizz-gateway-community com.fizzgate - 2.2.4-SNAPSHOT + 2.2.4-beta1 ../pom.xml 4.0.0 diff --git a/fizz-spring-boot-starter/pom.xml b/fizz-spring-boot-starter/pom.xml index ca7f550..224d290 100644 --- a/fizz-spring-boot-starter/pom.xml +++ b/fizz-spring-boot-starter/pom.xml @@ -5,7 +5,7 @@ fizz-gateway-community com.fizzgate - 2.2.4-SNAPSHOT + 2.2.4-beta1 ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index 1c97a0a..1cd2759 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ 3.5.9 0.8.2 0.9.11 + 2.11.0 @@ -33,7 +34,7 @@ fizz-gateway-community ${project.artifactId} fizz gateway community - 2.2.4-SNAPSHOT + 2.2.4-beta1 pom fizz-common