Add plugin to access request body #275
This commit is contained in:
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>fizz-gateway-community</artifactId>
|
||||
<groupId>com.fizzgate</groupId>
|
||||
<version>2.2.4-SNAPSHOT</version>
|
||||
<version>2.2.4-beta1</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
@@ -23,10 +23,12 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.codec.multipart.FilePart;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
@@ -46,6 +48,7 @@ import we.flume.clients.log4j2appender.LogService;
|
||||
import we.plugin.auth.ApiConfig;
|
||||
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
|
||||
import we.util.MapUtil;
|
||||
import we.util.NettyDataBufferUtils;
|
||||
import we.util.WebUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
@@ -97,7 +100,7 @@ public class AggregateFilter implements WebFilter {
|
||||
}
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
FizzServerHttpRequestDecorator request = (FizzServerHttpRequestDecorator) exchange.getRequest();
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
ServerHttpResponse serverHttpResponse = exchange.getResponse();
|
||||
|
||||
String clientReqPathPrefix = WebUtils.getClientReqPathPrefix(exchange);
|
||||
@@ -162,12 +165,19 @@ public class AggregateFilter implements WebFilter {
|
||||
});
|
||||
} else {
|
||||
if (HttpMethod.POST.name().equalsIgnoreCase(method)) {
|
||||
DataBuffer buf = request.getRawBody();
|
||||
if (buf != null) {
|
||||
clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
|
||||
}
|
||||
result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER).flatMap(buf -> {
|
||||
if (buf != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
|
||||
try {
|
||||
clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
|
||||
} finally {
|
||||
DataBufferUtils.release(buf);
|
||||
}
|
||||
}
|
||||
return pipeline.run(input, clientInput, traceId);
|
||||
});
|
||||
} else {
|
||||
result = pipeline.run(input, clientInput, traceId);
|
||||
}
|
||||
result = pipeline.run(input, clientInput, traceId);
|
||||
}
|
||||
return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> {
|
||||
LogService.setBizId(traceId);
|
||||
|
||||
@@ -23,6 +23,8 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
@@ -41,8 +43,8 @@ import we.plugin.auth.Receiver;
|
||||
import we.proxy.CallbackService;
|
||||
import we.proxy.DiscoveryClientUriSelector;
|
||||
import we.proxy.ServiceInstance;
|
||||
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
|
||||
import we.util.Constants;
|
||||
import we.util.NettyDataBufferUtils;
|
||||
import we.util.ThreadContext;
|
||||
import we.util.WebUtils;
|
||||
|
||||
@@ -88,16 +90,36 @@ public class CallbackFilter extends FizzWebFilter {
|
||||
ApiConfig ac = WebUtils.getApiConfig(exchange);
|
||||
if (ac != null && ac.type == ApiConfig.Type.CALLBACK) {
|
||||
CallbackConfig cc = ac.callbackConfig;
|
||||
FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest();
|
||||
DataBuffer body = req.getRawBody();
|
||||
HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac);
|
||||
HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
|
||||
pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next());
|
||||
if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
|
||||
return directResponse(exchange, cc);
|
||||
} else {
|
||||
return callbackService.requestBackends(exchange, headers, body, cc, service2instMap);
|
||||
}
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
return
|
||||
DataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
|
||||
.flatMap(
|
||||
b -> {
|
||||
DataBuffer body = null;
|
||||
if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
|
||||
if (b instanceof PooledDataBuffer) {
|
||||
byte[] bytes = new byte[b.readableByteCount()];
|
||||
try {
|
||||
b.read(bytes);
|
||||
body = NettyDataBufferUtils.from(bytes);
|
||||
} finally {
|
||||
NettyDataBufferUtils.release(b);
|
||||
}
|
||||
} else {
|
||||
body = b;
|
||||
}
|
||||
}
|
||||
HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac);
|
||||
HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
|
||||
pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next());
|
||||
if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
|
||||
return directResponse(exchange, cc);
|
||||
} else {
|
||||
return callbackService.requestBackends(exchange, headers, body, cc, service2instMap);
|
||||
}
|
||||
}
|
||||
)
|
||||
;
|
||||
}
|
||||
return chain.filter(exchange);
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.WebFilterChain;
|
||||
@@ -33,8 +32,6 @@ import we.plugin.auth.ApiConfig;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.AuthPluginFilter;
|
||||
import we.plugin.stat.StatPluginFilter;
|
||||
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
|
||||
import we.util.NettyDataBufferUtils;
|
||||
import we.util.ReactorUtils;
|
||||
import we.util.WebUtils;
|
||||
|
||||
@@ -72,23 +69,8 @@ public class PreprocessFilter extends FizzWebFilter {
|
||||
Map<String, Object> eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
|
||||
eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
|
||||
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
return NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
|
||||
.flatMap(
|
||||
body -> {
|
||||
FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req);
|
||||
if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
|
||||
try {
|
||||
requestDecorator.setBody(body);
|
||||
} finally {
|
||||
NettyDataBufferUtils.release(body);
|
||||
}
|
||||
}
|
||||
ServerWebExchange newExchange = exchange.mutate().request(requestDecorator).build();
|
||||
Mono vm = statPluginFilter.filter(newExchange, null, null);
|
||||
return process(newExchange, chain, eas, vm);
|
||||
}
|
||||
);
|
||||
Mono vm = statPluginFilter.filter(exchange, null, null);
|
||||
return process(exchange, chain, eas, vm);
|
||||
}
|
||||
|
||||
// TODO
|
||||
|
||||
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
@@ -39,11 +39,7 @@ import we.plugin.auth.ApiConfig;
|
||||
import we.proxy.FizzWebClient;
|
||||
import we.proxy.dubbo.ApacheDubboGenericService;
|
||||
import we.proxy.dubbo.DubboInterfaceDeclaration;
|
||||
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
|
||||
import we.util.Constants;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ThreadContext;
|
||||
import we.util.WebUtils;
|
||||
import we.util.*;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@@ -95,7 +91,7 @@ public class RouteFilter extends FizzWebFilter {
|
||||
|
||||
private Mono<Void> doFilter0(ServerWebExchange exchange, WebFilterChain chain) {
|
||||
|
||||
FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest();
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
String rid = req.getId();
|
||||
ApiConfig ac = WebUtils.getApiConfig(exchange);
|
||||
HttpHeaders hdrs = null;
|
||||
@@ -115,7 +111,7 @@ public class RouteFilter extends FizzWebFilter {
|
||||
String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
|
||||
.append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
|
||||
.toString();
|
||||
return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getRawBody()).flatMap(genServerResponse(exchange));
|
||||
return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
|
||||
|
||||
} else if (ac.type == ApiConfig.Type.DUBBO) {
|
||||
return dubboRpc(exchange, ac);
|
||||
@@ -130,8 +126,8 @@ public class RouteFilter extends FizzWebFilter {
|
||||
}
|
||||
|
||||
private Mono<Void> send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) {
|
||||
FizzServerHttpRequestDecorator clientReq = (FizzServerHttpRequestDecorator) exchange.getRequest();
|
||||
return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getRawBody()).flatMap(genServerResponse(exchange));
|
||||
ServerHttpRequest clientReq = exchange.getRequest();
|
||||
return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
|
||||
}
|
||||
|
||||
private Function<ClientResponse, Mono<? extends Void>> genServerResponse(ServerWebExchange exchange) {
|
||||
@@ -174,53 +170,58 @@ public class RouteFilter extends FizzWebFilter {
|
||||
}
|
||||
|
||||
private Mono<Void> dubboRpc(ServerWebExchange exchange, ApiConfig ac) {
|
||||
final String[] ls = {null};
|
||||
return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
|
||||
.flatMap(
|
||||
b -> {
|
||||
HashMap<String, Object> parameters = null;
|
||||
if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
|
||||
String json = b.toString(StandardCharsets.UTF_8).trim();
|
||||
ls[0] = json;
|
||||
NettyDataBufferUtils.release(b);
|
||||
if (json.charAt(0) == '[') {
|
||||
ArrayList<Object> lst = JacksonUtils.readValue(json, ArrayList.class);
|
||||
parameters = new HashMap<>();
|
||||
for (int i = 0; i < lst.size(); i++) {
|
||||
parameters.put("p" + (i + 1), lst.get(i));
|
||||
}
|
||||
} else {
|
||||
parameters = JacksonUtils.readValue(json, HashMap.class);
|
||||
}
|
||||
}
|
||||
|
||||
FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest();
|
||||
DataBuffer b = req.getRawBody();
|
||||
HashMap<String, Object> parameters = null;
|
||||
String json = Constants.Symbol.EMPTY;
|
||||
if (b != null) {
|
||||
json = b.toString(StandardCharsets.UTF_8).trim();
|
||||
if (json.charAt(0) == '[') {
|
||||
ArrayList<Object> lst = JacksonUtils.readValue(json, ArrayList.class);
|
||||
parameters = new HashMap<>();
|
||||
for (int i = 0; i < lst.size(); i++) {
|
||||
parameters.put("p" + (i + 1), lst.get(i));
|
||||
}
|
||||
} else {
|
||||
parameters = JacksonUtils.readValue(json, HashMap.class);
|
||||
}
|
||||
}
|
||||
String finalJson = json;
|
||||
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
|
||||
declaration.setServiceName(ac.backendService);
|
||||
declaration.setVersion(ac.rpcVersion);
|
||||
declaration.setGroup(ac.rpcGroup);
|
||||
declaration.setMethod(ac.rpcMethod);
|
||||
declaration.setParameterTypes(ac.rpcParamTypes);
|
||||
int t = 20_000;
|
||||
if (ac.timeout != 0) {
|
||||
t = (int) ac.timeout;
|
||||
}
|
||||
declaration.setTimeout(t);
|
||||
|
||||
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
|
||||
declaration.setServiceName(ac.backendService);
|
||||
declaration.setVersion(ac.rpcVersion);
|
||||
declaration.setGroup(ac.rpcGroup);
|
||||
declaration.setMethod(ac.rpcMethod);
|
||||
declaration.setParameterTypes(ac.rpcParamTypes);
|
||||
int t = 20_000;
|
||||
if (ac.timeout != 0) {
|
||||
t = (int) ac.timeout;
|
||||
}
|
||||
declaration.setTimeout(t);
|
||||
|
||||
Map<String, String> attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange));
|
||||
return dubboGenericService.send(parameters, declaration, attachments)
|
||||
.flatMap(
|
||||
dubboRpcResponseBody -> {
|
||||
Mono<Void> m = WebUtils.buildJsonDirectResponse(exchange, HttpStatus.OK, null, JacksonUtils.writeValueAsString(dubboRpcResponseBody));
|
||||
return m;
|
||||
}
|
||||
)
|
||||
.doOnError(
|
||||
e -> {
|
||||
StringBuilder sb = ThreadContext.getStringBuilder();
|
||||
WebUtils.request2stringBuilder(exchange, sb);
|
||||
sb.append('\n').append(finalJson);
|
||||
log.error(sb.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), e);
|
||||
}
|
||||
)
|
||||
;
|
||||
Map<String, String> attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange));
|
||||
return dubboGenericService.send(parameters, declaration, attachments);
|
||||
}
|
||||
)
|
||||
.flatMap(
|
||||
dubboRpcResponseBody -> {
|
||||
Mono<Void> m = WebUtils.buildJsonDirectResponse(exchange, HttpStatus.OK, null, JacksonUtils.writeValueAsString(dubboRpcResponseBody));
|
||||
return m;
|
||||
}
|
||||
)
|
||||
.doOnError(
|
||||
t -> {
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
WebUtils.request2stringBuilder(exchange, b);
|
||||
if (ls[0] != null) {
|
||||
b.append('\n').append(ls[0]);
|
||||
}
|
||||
log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
|
||||
}
|
||||
)
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.plugin.requestbody;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
import we.plugin.FizzPluginFilter;
|
||||
import we.plugin.FizzPluginFilterChain;
|
||||
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
|
||||
import we.util.NettyDataBufferUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
@Component(RequestBodyPlugin.REQUEST_BODY_PLUGIN)
|
||||
public class RequestBodyPlugin implements FizzPluginFilter {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(RequestBodyPlugin.class);
|
||||
|
||||
public static final String REQUEST_BODY_PLUGIN = "requestBodyPlugin";
|
||||
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config) {
|
||||
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
return
|
||||
NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
|
||||
.flatMap(
|
||||
body -> {
|
||||
ServerWebExchange newExchange = exchange;
|
||||
if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
|
||||
FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req);
|
||||
try {
|
||||
requestDecorator.setBody(body);
|
||||
} finally {
|
||||
NettyDataBufferUtils.release(body);
|
||||
}
|
||||
newExchange = exchange.mutate().request(requestDecorator).build();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("retain body", LogService.BIZ_ID, req.getId());
|
||||
}
|
||||
}
|
||||
return FizzPluginFilterChain.next(newExchange);
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -377,14 +377,10 @@ public abstract class WebUtils {
|
||||
if (appendHeaders.isEmpty()) {
|
||||
return req.getHeaders();
|
||||
}
|
||||
boolean b = appendHeaders.containsKey(HttpHeaders.CONTENT_TYPE);
|
||||
HttpHeaders hdrs = new HttpHeaders();
|
||||
req.getHeaders().forEach(
|
||||
(h, vs) -> {
|
||||
if (b && h.equals(HttpHeaders.CONTENT_TYPE)) {
|
||||
} else {
|
||||
hdrs.addAll(h, vs);
|
||||
}
|
||||
hdrs.addAll(h, vs);
|
||||
}
|
||||
);
|
||||
appendHeaders.forEach(
|
||||
|
||||
Reference in New Issue
Block a user