Support request body access in plugin #261
This commit is contained in:
@@ -12,13 +12,13 @@
|
||||
|
||||
<groupId>com.fizzgate</groupId>
|
||||
<artifactId>fizz-bootstrap</artifactId>
|
||||
<version>2.2.3</version>
|
||||
<version>2.2.4-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<spring-framework.version>5.2.13.RELEASE</spring-framework.version>
|
||||
<spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version>
|
||||
<reactor-bom.version>Dysprosium-SR21</reactor-bom.version>
|
||||
<reactor-bom.version>Dysprosium-SR22</reactor-bom.version>
|
||||
<lettuce.version>5.3.7.RELEASE</lettuce.version>
|
||||
<netty.version>4.1.66.Final</netty.version>
|
||||
<httpcore.version>4.4.14</httpcore.version>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>fizz-gateway-community</artifactId>
|
||||
<groupId>com.fizzgate</groupId>
|
||||
<version>2.2.3</version>
|
||||
<version>2.2.4-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.util;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
public class ConvertedRequestBodyDataBufferWrapper {
|
||||
|
||||
public DataBuffer body;
|
||||
|
||||
public ConvertedRequestBodyDataBufferWrapper(DataBuffer body) {
|
||||
this.body = body;
|
||||
}
|
||||
}
|
||||
80
fizz-common/src/main/java/we/util/NettyDataBufferUtils.java
Normal file
80
fizz-common/src/main/java/we/util/NettyDataBufferUtils.java
Normal file
@@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.util;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||
import org.springframework.lang.Nullable;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
public abstract class NettyDataBufferUtils extends org.springframework.core.io.buffer.DataBufferUtils {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(NettyDataBufferUtils.class);
|
||||
|
||||
private static NettyDataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
|
||||
|
||||
public static NettyDataBuffer from(String s) {
|
||||
return from(s.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public static NettyDataBuffer from(byte[] bytes) {
|
||||
return (NettyDataBuffer) dataBufferFactory.wrap(bytes);
|
||||
}
|
||||
|
||||
public static NettyDataBuffer from(ByteBuffer byteBuffer) {
|
||||
return dataBufferFactory.wrap(byteBuffer);
|
||||
}
|
||||
|
||||
public static NettyDataBuffer from(ByteBuf byteBuf) {
|
||||
return dataBufferFactory.wrap(byteBuf);
|
||||
}
|
||||
|
||||
public static boolean release(@Nullable String reqId, @Nullable DataBuffer dataBuffer) {
|
||||
if (dataBuffer instanceof PooledDataBuffer) {
|
||||
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
|
||||
if (pooledDataBuffer.isAllocated()) {
|
||||
if (pooledDataBuffer instanceof NettyDataBuffer) {
|
||||
NettyDataBuffer ndb = (NettyDataBuffer) pooledDataBuffer;
|
||||
ByteBuf nativeBuffer = ndb.getNativeBuffer();
|
||||
int refCnt = nativeBuffer.refCnt();
|
||||
if (refCnt < 1) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, reqId);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return pooledDataBuffer.release();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>fizz-gateway-community</artifactId>
|
||||
<groupId>com.fizzgate</groupId>
|
||||
<version>2.2.3</version>
|
||||
<version>2.2.4-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
@@ -61,6 +61,7 @@ import we.flume.clients.log4j2appender.LogService;
|
||||
import we.plugin.auth.ApiConfig;
|
||||
import we.util.Constants;
|
||||
import we.util.MapUtil;
|
||||
import we.util.NettyDataBufferUtils;
|
||||
import we.util.WebUtils;
|
||||
|
||||
/**
|
||||
@@ -72,8 +73,6 @@ public class AggregateFilter implements WebFilter {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AggregateFilter.class);
|
||||
|
||||
private static final DataBuffer emptyBody = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false, true)).wrap(Constants.Symbol.EMPTY.getBytes());
|
||||
|
||||
@Resource
|
||||
private ConfigLoader configLoader;
|
||||
|
||||
@@ -171,19 +170,12 @@ public class AggregateFilter implements WebFilter {
|
||||
});
|
||||
} else {
|
||||
if (HttpMethod.POST.name().equalsIgnoreCase(method)) {
|
||||
result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(emptyBody).flatMap(buf -> {
|
||||
if (buf != null && buf != emptyBody) {
|
||||
try {
|
||||
clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
|
||||
} finally {
|
||||
DataBufferUtils.release(buf);
|
||||
}
|
||||
}
|
||||
return pipeline.run(input, clientInput, traceId);
|
||||
});
|
||||
} else {
|
||||
result = pipeline.run(input, clientInput, traceId);
|
||||
DataBuffer buf = WebUtils.getRequestBody(exchange);
|
||||
if (buf != null) {
|
||||
clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
result = pipeline.run(input, clientInput, traceId);
|
||||
}
|
||||
return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> {
|
||||
LogService.setBizId(traceId);
|
||||
|
||||
@@ -18,14 +18,11 @@
|
||||
package we.filter;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
@@ -90,33 +87,15 @@ public class CallbackFilter extends FizzWebFilter {
|
||||
ApiConfig ac = WebUtils.getApiConfig(exchange);
|
||||
if (ac != null && ac.type == ApiConfig.Type.CALLBACK) {
|
||||
CallbackConfig cc = ac.callbackConfig;
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
DataBuffer[] body = {null};
|
||||
return
|
||||
DataBufferUtils.join(req.getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY)
|
||||
.flatMap(
|
||||
b -> {
|
||||
if (b != WebUtils.EMPTY_BODY) {
|
||||
body[0] = b;
|
||||
}
|
||||
HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac);
|
||||
HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
|
||||
pushReq2manager(exchange, headers, body[0], service2instMap, cc.id, ac.gatewayGroups.iterator().next());
|
||||
if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
|
||||
return directResponse(exchange, cc);
|
||||
} else {
|
||||
return callbackService.requestBackends(exchange, headers, body[0], cc, service2instMap);
|
||||
}
|
||||
}
|
||||
)
|
||||
.doFinally(
|
||||
s -> {
|
||||
if (body[0] != null) {
|
||||
DataBufferUtils.release(body[0]);
|
||||
}
|
||||
}
|
||||
)
|
||||
;
|
||||
DataBuffer body = WebUtils.getRequestBody(exchange);
|
||||
HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac);
|
||||
HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
|
||||
pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next());
|
||||
if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
|
||||
return directResponse(exchange, cc);
|
||||
} else {
|
||||
return callbackService.requestBackends(exchange, headers, body, cc, service2instMap);
|
||||
}
|
||||
}
|
||||
return chain.filter(exchange);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
@@ -35,9 +36,7 @@ import we.exception.StopAndResponseException;
|
||||
import we.fizz.exception.FizzRuntimeException;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
import we.legacy.RespEntity;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ThreadContext;
|
||||
import we.util.WebUtils;
|
||||
import we.util.*;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
@@ -54,7 +53,7 @@ public class FilterExceptionHandlerConfig {
|
||||
@Override
|
||||
public Mono<Void> handle(ServerWebExchange exchange, Throwable t) {
|
||||
ServerHttpResponse resp = exchange.getResponse();
|
||||
if (t instanceof StopAndResponseException) {
|
||||
if (t instanceof StopAndResponseException) {
|
||||
StopAndResponseException ex = (StopAndResponseException) t;
|
||||
if (ex.getData() != null) {
|
||||
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
|
||||
@@ -83,8 +82,8 @@ public class FilterExceptionHandlerConfig {
|
||||
}
|
||||
}
|
||||
if (t instanceof FizzRuntimeException) {
|
||||
FizzRuntimeException ex = (FizzRuntimeException) t;
|
||||
log.error(ex.getMessage(), LogService.BIZ_ID, exchange.getRequest().getId(), ex);
|
||||
FizzRuntimeException ex = (FizzRuntimeException) t;
|
||||
log.error(ex.getMessage(), LogService.BIZ_ID, exchange.getRequest().getId(), ex);
|
||||
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
|
||||
RespEntity rs = null;
|
||||
String reqId = exchange.getRequest().getId();
|
||||
|
||||
@@ -17,12 +17,24 @@
|
||||
|
||||
package we.filter;
|
||||
|
||||
import com.google.common.collect.BoundType;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.WebFilterChain;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
import we.plugin.FixedPluginFilter;
|
||||
import we.plugin.FizzPluginFilterChain;
|
||||
import we.plugin.PluginFilter;
|
||||
@@ -30,10 +42,14 @@ import we.plugin.auth.ApiConfig;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.AuthPluginFilter;
|
||||
import we.plugin.stat.StatPluginFilter;
|
||||
import we.util.ConvertedRequestBodyDataBufferWrapper;
|
||||
import we.util.NettyDataBufferUtils;
|
||||
import we.util.ReactorUtils;
|
||||
import we.util.WebUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -47,6 +63,8 @@ import java.util.function.Function;
|
||||
@Order(10)
|
||||
public class PreprocessFilter extends FizzWebFilter {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PreprocessFilter.class);
|
||||
|
||||
public static final String PREPROCESS_FILTER = "preprocessFilter";
|
||||
|
||||
private static final FilterResult succFr = FilterResult.SUCCESS(PREPROCESS_FILTER);
|
||||
@@ -65,7 +83,42 @@ public class PreprocessFilter extends FizzWebFilter {
|
||||
Map<String, Object> eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
|
||||
eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
|
||||
|
||||
Mono vm = statPluginFilter.filter(exchange, null, null);
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
return NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY)
|
||||
.flatMap(
|
||||
body -> {
|
||||
if (body != WebUtils.EMPTY_BODY && body.readableByteCount() > 0) {
|
||||
try {
|
||||
byte[] bytes = new byte[body.readableByteCount()];
|
||||
body.read(bytes);
|
||||
DataBuffer retain = NettyDataBufferUtils.from(bytes);
|
||||
eas.put(WebUtils.REQUEST_BODY, retain);
|
||||
} finally {
|
||||
NettyDataBufferUtils.release(body);
|
||||
}
|
||||
}
|
||||
Mono vm = statPluginFilter.filter(exchange, null, null);
|
||||
return process(exchange, chain, eas, vm);
|
||||
}
|
||||
)
|
||||
.doFinally(
|
||||
s -> {
|
||||
Object convertedRequestBody = WebUtils.getConvertedRequestBody(exchange);
|
||||
if (convertedRequestBody instanceof ConvertedRequestBodyDataBufferWrapper) {
|
||||
DataBuffer b = ((ConvertedRequestBodyDataBufferWrapper) convertedRequestBody).body;
|
||||
if (b != null) {
|
||||
boolean release = NettyDataBufferUtils.release(req.getId(), b);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("release converted request body databuffer " + release, LogService.BIZ_ID, req.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// TODO
|
||||
private Mono<Void> process(ServerWebExchange exchange, WebFilterChain chain, Map<String, Object> eas, Mono vm) {
|
||||
return chain(exchange, vm, authPluginFilter).defaultIfEmpty(ReactorUtils.NULL)
|
||||
.flatMap(
|
||||
v -> {
|
||||
|
||||
@@ -112,7 +112,11 @@ public class RouteFilter extends FizzWebFilter {
|
||||
String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
|
||||
.append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
|
||||
.toString();
|
||||
return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
|
||||
Object requestBody = WebUtils.getConvertedRequestBody(exchange);
|
||||
if (requestBody == null) {
|
||||
requestBody = WebUtils.getRequestBody(exchange);
|
||||
}
|
||||
return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, requestBody).flatMap(genServerResponse(exchange));
|
||||
|
||||
} else if (ac.type == ApiConfig.Type.DUBBO) {
|
||||
return dubboRpc(exchange, ac);
|
||||
@@ -128,7 +132,11 @@ public class RouteFilter extends FizzWebFilter {
|
||||
|
||||
private Mono<Void> send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) {
|
||||
ServerHttpRequest clientReq = exchange.getRequest();
|
||||
return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
|
||||
Object requestBody = WebUtils.getConvertedRequestBody(exchange);
|
||||
if (requestBody == null) {
|
||||
requestBody = WebUtils.getRequestBody(exchange);
|
||||
}
|
||||
return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, requestBody).flatMap(genServerResponse(exchange));
|
||||
}
|
||||
|
||||
private Function<ClientResponse, Mono<? extends Void>> genServerResponse(ServerWebExchange exchange) {
|
||||
@@ -171,64 +179,52 @@ public class RouteFilter extends FizzWebFilter {
|
||||
}
|
||||
|
||||
private Mono<Void> dubboRpc(ServerWebExchange exchange, ApiConfig ac) {
|
||||
DataBuffer[] body = {null};
|
||||
return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY)
|
||||
.flatMap(
|
||||
b -> {
|
||||
HashMap<String, Object> parameters = null;
|
||||
if (b != WebUtils.EMPTY_BODY) {
|
||||
body[0] = b;
|
||||
String json = body[0].toString(StandardCharsets.UTF_8).trim();
|
||||
if (json.charAt(0) == '[') {
|
||||
ArrayList<Object> lst = JacksonUtils.readValue(json, ArrayList.class);
|
||||
parameters = new HashMap<>();
|
||||
for (int i = 0; i < lst.size(); i++) {
|
||||
parameters.put("p" + (i + 1), lst.get(i));
|
||||
}
|
||||
} else {
|
||||
parameters = JacksonUtils.readValue(json, HashMap.class);
|
||||
}
|
||||
}
|
||||
|
||||
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
|
||||
declaration.setServiceName(ac.backendService);
|
||||
declaration.setVersion(ac.rpcVersion);
|
||||
declaration.setGroup(ac.rpcGroup);
|
||||
declaration.setMethod(ac.rpcMethod);
|
||||
declaration.setParameterTypes(ac.rpcParamTypes);
|
||||
int t = 20_000;
|
||||
if (ac.timeout != 0) {
|
||||
t = (int) ac.timeout;
|
||||
}
|
||||
declaration.setTimeout(t);
|
||||
DataBuffer b = WebUtils.getRequestBody(exchange);
|
||||
HashMap<String, Object> parameters = null;
|
||||
String json = Constants.Symbol.EMPTY;
|
||||
if (b != null) {
|
||||
json = b.toString(StandardCharsets.UTF_8).trim();
|
||||
if (json.charAt(0) == '[') {
|
||||
ArrayList<Object> lst = JacksonUtils.readValue(json, ArrayList.class);
|
||||
parameters = new HashMap<>();
|
||||
for (int i = 0; i < lst.size(); i++) {
|
||||
parameters.put("p" + (i + 1), lst.get(i));
|
||||
}
|
||||
} else {
|
||||
parameters = JacksonUtils.readValue(json, HashMap.class);
|
||||
}
|
||||
}
|
||||
String finalJson = json;
|
||||
|
||||
Map<String, String> attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange));
|
||||
return dubboGenericService.send(parameters, declaration, attachments);
|
||||
}
|
||||
)
|
||||
.flatMap(
|
||||
dubboRpcResponseBody -> {
|
||||
Mono<Void> m = WebUtils.buildJsonDirectResponse(exchange, HttpStatus.OK, null, JacksonUtils.writeValueAsString(dubboRpcResponseBody));
|
||||
return m;
|
||||
}
|
||||
)
|
||||
.doOnError(
|
||||
t -> {
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
WebUtils.request2stringBuilder(exchange, b);
|
||||
if (body[0] != null) {
|
||||
b.append('\n').append(body[0].toString(StandardCharsets.UTF_8));
|
||||
}
|
||||
log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
|
||||
}
|
||||
)
|
||||
.doFinally(
|
||||
s -> {
|
||||
if (body[0] != null) {
|
||||
DataBufferUtils.release(body[0]);
|
||||
}
|
||||
}
|
||||
)
|
||||
;
|
||||
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
|
||||
declaration.setServiceName(ac.backendService);
|
||||
declaration.setVersion(ac.rpcVersion);
|
||||
declaration.setGroup(ac.rpcGroup);
|
||||
declaration.setMethod(ac.rpcMethod);
|
||||
declaration.setParameterTypes(ac.rpcParamTypes);
|
||||
int t = 20_000;
|
||||
if (ac.timeout != 0) {
|
||||
t = (int) ac.timeout;
|
||||
}
|
||||
declaration.setTimeout(t);
|
||||
|
||||
Map<String, String> attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange));
|
||||
return dubboGenericService.send(parameters, declaration, attachments)
|
||||
.flatMap(
|
||||
dubboRpcResponseBody -> {
|
||||
Mono<Void> m = WebUtils.buildJsonDirectResponse(exchange, HttpStatus.OK, null, JacksonUtils.writeValueAsString(dubboRpcResponseBody));
|
||||
return m;
|
||||
}
|
||||
)
|
||||
.doOnError(
|
||||
e -> {
|
||||
StringBuilder sb = ThreadContext.getStringBuilder();
|
||||
WebUtils.request2stringBuilder(exchange, sb);
|
||||
sb.append('\n').append(finalJson);
|
||||
log.error(sb.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), e);
|
||||
}
|
||||
)
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,19 +296,18 @@ public class ApiConfigService {
|
||||
// TODO: improve ...
|
||||
private Mono<Object> canAccess(ServerWebExchange exchange, String app, String ip, String timestamp, String sign, String service, HttpMethod method, String path) {
|
||||
|
||||
if (!systemConfig.isAggregateTestAuth()) {
|
||||
if (SystemConfig.DEFAULT_GATEWAY_TEST_PREFIX0.equals(WebUtils.getClientReqPathPrefix(exchange))) {
|
||||
return Mono.just(Access.YES);
|
||||
}
|
||||
}
|
||||
|
||||
ApiConfig ac = getApiConfig(app, service, method, path);
|
||||
if (ac == null) {
|
||||
String authMsg = (String) ThreadContext.remove(AUTH_MSG);
|
||||
if (authMsg == null) {
|
||||
authMsg = deny;
|
||||
}
|
||||
if (SystemConfig.DEFAULT_GATEWAY_TEST_PREFIX0.equals(WebUtils.getClientReqPathPrefix(exchange))) {
|
||||
if (systemConfig.isAggregateTestAuth()) {
|
||||
return logAndResult(authMsg);
|
||||
} else {
|
||||
return Mono.just(Access.YES);
|
||||
}
|
||||
}
|
||||
if (!apiConfigServiceProperties.isNeedAuth()) {
|
||||
return Mono.just(Access.YES);
|
||||
} else {
|
||||
|
||||
@@ -78,7 +78,7 @@ public class CallbackService {
|
||||
aggrConfigPrefix = systemConfig.getGatewayPrefix() + '/';
|
||||
}
|
||||
|
||||
public Mono<? extends Void> requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, Map<String, ServiceInstance> service2instMap) {
|
||||
public Mono<Void> requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, Map<String, ServiceInstance> service2instMap) {
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
String reqId = req.getId();
|
||||
HttpMethod method = req.getMethod();
|
||||
@@ -172,7 +172,7 @@ public class CallbackService {
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
private Mono<? extends Void> genServerResponse(ServerWebExchange exchange, ClientResponse remoteResp) {
|
||||
private Mono<Void> genServerResponse(ServerWebExchange exchange, ClientResponse remoteResp) {
|
||||
ServerHttpResponse clientResp = exchange.getResponse();
|
||||
clientResp.setStatusCode(remoteResp.statusCode());
|
||||
HttpHeaders clientRespHeaders = clientResp.getHeaders();
|
||||
|
||||
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.lang.Nullable;
|
||||
@@ -34,9 +35,7 @@ import reactor.core.publisher.Mono;
|
||||
import we.config.ProxyWebClientConfig;
|
||||
import we.config.SystemConfig;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
import we.util.Constants;
|
||||
import we.util.ThreadContext;
|
||||
import we.util.WebUtils;
|
||||
import we.util.*;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
@@ -117,19 +116,25 @@ public class FizzWebClient {
|
||||
}
|
||||
);
|
||||
|
||||
boolean b = false;
|
||||
DataBuffer d = null;
|
||||
if (body != null) {
|
||||
if (body instanceof BodyInserter) {
|
||||
req.body((BodyInserter) body);
|
||||
} else if (body instanceof Flux) {
|
||||
Flux<DataBuffer> db = (Flux<DataBuffer>) body;
|
||||
req.body(BodyInserters.fromDataBuffers(db));
|
||||
} else if (body instanceof String) {
|
||||
String s = (String) body;
|
||||
req.body(Mono.just(s), String.class);
|
||||
} else {
|
||||
if (body instanceof ConvertedRequestBodyDataBufferWrapper) {
|
||||
d = ((ConvertedRequestBodyDataBufferWrapper) body).body;
|
||||
body = d;
|
||||
b = true;
|
||||
}
|
||||
req.bodyValue(body);
|
||||
}
|
||||
}
|
||||
boolean finalB = b;
|
||||
DataBuffer finalD = d;
|
||||
|
||||
Mono<ClientResponse> cr = req.exchange();
|
||||
if (timeout == 0) {
|
||||
@@ -140,7 +145,17 @@ public class FizzWebClient {
|
||||
if (timeout != 0) {
|
||||
cr = cr.timeout(Duration.ofMillis(timeout));
|
||||
}
|
||||
return cr;
|
||||
|
||||
return cr.doFinally(
|
||||
s -> {
|
||||
if (finalB) {
|
||||
boolean release = NettyDataBufferUtils.release(clientReqId, finalD);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("release converted request body databuffer " + release, LogService.BIZ_ID, clientReqId);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private void setHostHeader(String uri, HttpHeaders headers) {
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
@@ -56,7 +57,7 @@ public abstract class WebUtils {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(WebUtils.class);
|
||||
|
||||
private static final String clientService = "clientService";
|
||||
private static final String clientService = "@cs";
|
||||
|
||||
private static final String xForwardedFor = "X-FORWARDED-FOR";
|
||||
|
||||
@@ -66,17 +67,17 @@ public abstract class WebUtils {
|
||||
|
||||
private static final String binaryAddress = "0:0:0:0:0:0:0:1";
|
||||
|
||||
private static final String directResponse = "directResponse";
|
||||
private static final String directResponse = "@dr";
|
||||
|
||||
private static final String response = " response ";
|
||||
|
||||
private static final String originIp = "originIp";
|
||||
private static final String originIp = "@oi";
|
||||
|
||||
private static final String clientRequestPath = "clientRequestPath";
|
||||
private static final String clientRequestPath = "@crp";
|
||||
|
||||
private static final String clientRequestPathPrefix = "clientRequestPathPrefix";
|
||||
private static final String clientRequestPathPrefix = "@crpp";
|
||||
|
||||
private static final String clientRequestQuery = "clientRequestQuery";
|
||||
private static final String clientRequestQuery = "@crq";
|
||||
|
||||
private static final String traceId = "traceId";
|
||||
|
||||
@@ -86,23 +87,57 @@ public abstract class WebUtils {
|
||||
|
||||
private static final String app = "app";
|
||||
|
||||
public static final String BACKEND_SERVICE = "backendService";
|
||||
public static final String BACKEND_SERVICE = "@bs";
|
||||
|
||||
public static final String FILTER_CONTEXT = "filterContext";
|
||||
public static final String FILTER_CONTEXT = "@fc";
|
||||
|
||||
public static final String APPEND_HEADERS = "appendHeaders";
|
||||
public static final String APPEND_HEADERS = "@ahs";
|
||||
|
||||
public static final String PREV_FILTER_RESULT = "prevFilterResult";
|
||||
public static final String PREV_FILTER_RESULT = "@pfr";
|
||||
|
||||
public static final String BACKEND_PATH = "backendPath";
|
||||
public static final String BACKEND_PATH = "@bp";
|
||||
|
||||
public static boolean LOG_RESPONSE_BODY = false;
|
||||
|
||||
public static Set<String> LOG_HEADER_SET = Collections.EMPTY_SET;
|
||||
|
||||
public static final DataBuffer EMPTY_BODY = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false, true)).wrap(Constants.Symbol.EMPTY.getBytes());
|
||||
public static final DataBuffer EMPTY_BODY = NettyDataBufferUtils.from(Constants.Symbol.EMPTY);
|
||||
|
||||
public static final String REQUEST_BODY = "@rb";
|
||||
|
||||
public static final String CONVERTED_REQUEST_BODY = "@crb";
|
||||
|
||||
|
||||
public static DataBuffer getRequestBody(ServerWebExchange exchange) {
|
||||
return exchange.getAttribute(REQUEST_BODY);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param convertedRequestBody can be DataBuffer or String type
|
||||
*/
|
||||
public static void setConvertedRequestBody(ServerWebExchange exchange, Object convertedRequestBody) {
|
||||
Object prev = exchange.getAttribute(CONVERTED_REQUEST_BODY);
|
||||
if (prev instanceof ConvertedRequestBodyDataBufferWrapper) {
|
||||
ConvertedRequestBodyDataBufferWrapper p = (ConvertedRequestBodyDataBufferWrapper) prev;
|
||||
if (p.body != null) {
|
||||
DataBufferUtils.release(p.body);
|
||||
}
|
||||
}
|
||||
if (convertedRequestBody instanceof DataBuffer) {
|
||||
DataBuffer d = (DataBuffer) convertedRequestBody;
|
||||
DataBufferUtils.retain(d);
|
||||
exchange.getAttributes().put(CONVERTED_REQUEST_BODY, new ConvertedRequestBodyDataBufferWrapper(d));
|
||||
} else {
|
||||
exchange.getAttributes().put(CONVERTED_REQUEST_BODY, convertedRequestBody);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return result may be String or ConvertedRequestBodyDataBufferWrapper type
|
||||
*/
|
||||
public static Object getConvertedRequestBody(ServerWebExchange exchange) {
|
||||
return exchange.getAttribute(CONVERTED_REQUEST_BODY);
|
||||
}
|
||||
|
||||
public static void setGatewayPrefix(String p) {
|
||||
gatewayPrefix = p;
|
||||
@@ -383,10 +418,14 @@ public abstract class WebUtils {
|
||||
if (appendHeaders.isEmpty()) {
|
||||
return req.getHeaders();
|
||||
}
|
||||
boolean b = appendHeaders.containsKey(HttpHeaders.CONTENT_TYPE);
|
||||
HttpHeaders hdrs = new HttpHeaders();
|
||||
req.getHeaders().forEach(
|
||||
(h, vs) -> {
|
||||
hdrs.addAll(h, vs);
|
||||
if (b && h.equals(HttpHeaders.CONTENT_TYPE)) {
|
||||
} else {
|
||||
hdrs.addAll(h, vs);
|
||||
}
|
||||
}
|
||||
);
|
||||
appendHeaders.forEach(
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>fizz-gateway-community</artifactId>
|
||||
<groupId>com.fizzgate</groupId>
|
||||
<version>2.2.3</version>
|
||||
<version>2.2.4-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>fizz-gateway-community</artifactId>
|
||||
<groupId>com.fizzgate</groupId>
|
||||
<version>2.2.3</version>
|
||||
<version>2.2.4-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
7
pom.xml
7
pom.xml
@@ -7,7 +7,7 @@
|
||||
<!--<java.version>1.8</java.version>-->
|
||||
<spring-boot.version>2.2.13.RELEASE</spring-boot.version>
|
||||
<spring-framework.version>5.2.13.RELEASE</spring-framework.version>
|
||||
<reactor-bom.version>Dysprosium-SR21</reactor-bom.version>
|
||||
<reactor-bom.version>Dysprosium-SR22</reactor-bom.version>
|
||||
<lettuce.version>5.3.7.RELEASE</lettuce.version>
|
||||
<nacos.cloud.version>2.2.5.RELEASE</nacos.cloud.version>
|
||||
<netty.version>4.1.66.Final</netty.version>
|
||||
@@ -33,7 +33,7 @@
|
||||
<artifactId>fizz-gateway-community</artifactId>
|
||||
<name>${project.artifactId}</name>
|
||||
<description>fizz gateway community</description>
|
||||
<version>2.2.3</version>
|
||||
<version>2.2.4-SNAPSHOT</version>
|
||||
<packaging>pom</packaging>
|
||||
<modules>
|
||||
<module>fizz-common</module>
|
||||
@@ -133,7 +133,7 @@
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<version>3.3.18.RELEASE</version>
|
||||
<version>3.3.19.RELEASE</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
@@ -502,5 +502,4 @@
|
||||
</distributionManagement>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
</project>
|
||||
|
||||
Reference in New Issue
Block a user