From d7b5fb784a3109aecbec656ff4f13b7dc2e37e49 Mon Sep 17 00:00:00 2001 From: Francis Dong Date: Thu, 29 Jun 2023 11:27:25 +0800 Subject: [PATCH] Fix repeated read issue of request body of CallbackFilter #478 --- .../com/fizzgate/filter/CallbackFilter.java | 68 +++++++++++-------- .../com/fizzgate/proxy/CallbackService.java | 9 ++- 2 files changed, 45 insertions(+), 32 deletions(-) diff --git a/fizz-core/src/main/java/com/fizzgate/filter/CallbackFilter.java b/fizz-core/src/main/java/com/fizzgate/filter/CallbackFilter.java index 212f9ed..51ae47c 100644 --- a/fizz-core/src/main/java/com/fizzgate/filter/CallbackFilter.java +++ b/fizz-core/src/main/java/com/fizzgate/filter/CallbackFilter.java @@ -27,6 +27,8 @@ import com.fizzgate.proxy.CallbackService; import com.fizzgate.proxy.DiscoveryClientUriSelector; import com.fizzgate.proxy.ServiceInstance; import com.fizzgate.service_registry.RegistryCenterService; +import com.fizzgate.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; +import com.fizzgate.spring.web.server.ext.FizzServerWebExchangeDecorator; import com.fizzgate.util.Consts; import com.fizzgate.util.NettyDataBufferUtils; import com.fizzgate.util.ThreadContext; @@ -36,9 +38,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; @@ -90,33 +89,47 @@ public class CallbackFilter extends FizzWebFilter { private GatewayGroupService gatewayGroupService; @Override - public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { + public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { + String traceId = WebUtils.getTraceId(exchange); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); - FilterResult pfr = WebUtils.getPrevFilterResult(exchange); - if (!pfr.success) { - return WebUtils.getDirectResponse(exchange); + ServerHttpRequest req = exchange.getRequest(); + if (req instanceof FizzServerHttpRequestDecorator) { + return doFilter(exchange, chain); } - + return + NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) + .flatMap( + body -> { + FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req); + if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) { + try { + requestDecorator.setBody(body); + } finally { + NettyDataBufferUtils.release(body); + } + } + ServerWebExchange mutatedExchange = exchange.mutate().request(requestDecorator).build(); + ServerWebExchange newExchange = mutatedExchange; + MediaType contentType = req.getHeaders().getContentType(); + if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) { + newExchange = new FizzServerWebExchangeDecorator(mutatedExchange); + } + return doFilter(newExchange, chain); + } + ); + } + + public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { + String traceId = WebUtils.getTraceId(exchange); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + ApiConfig ac = WebUtils.getApiConfig(exchange); if (ac != null && ac.type == ApiConfig.Type.CALLBACK) { CallbackConfig cc = ac.callbackConfig; - ServerHttpRequest req = exchange.getRequest(); - return - DataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) - .flatMap( - b -> { - DataBuffer body = null; - if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) { - if (b instanceof PooledDataBuffer) { - try { - body = NettyDataBufferUtils.copy2heap(b); - } finally { - NettyDataBufferUtils.release(b); - } - } else { - body = b; - } - } + FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest(); + return req.getBody().defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER).single().flatMap(b -> { + String body = b.toString(StandardCharsets.UTF_8); HashMap service2instMap = getService2instMap(ac); HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange); pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next()); @@ -175,7 +188,7 @@ public class CallbackFilter extends FizzWebFilter { private static final String _receivers = "\"receivers\":"; private static final String _gatewayGroup = "\"gatewayGroup\":"; - private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, HashMap service2instMap, int callbackConfigId, + private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, Object body, HashMap service2instMap, int callbackConfigId, String gatewayGroup) { ServerHttpRequest req = exchange.getRequest(); @@ -215,7 +228,8 @@ public class CallbackFilter extends FizzWebFilter { if (body != null) { b.append(Consts.S.COMMA); - String bodyStr = body.toString(StandardCharsets.UTF_8); + // String bodyStr = body.toString(StandardCharsets.UTF_8); + String bodyStr = body.toString(); MediaType contentType = req.getHeaders().getContentType(); if (contentType != null && contentType.getSubtype().equalsIgnoreCase(json)) { b.append(_body); b.append(JSON.toJSONString(bodyStr)); diff --git a/fizz-core/src/main/java/com/fizzgate/proxy/CallbackService.java b/fizz-core/src/main/java/com/fizzgate/proxy/CallbackService.java index dd14621..5440a98 100644 --- a/fizz-core/src/main/java/com/fizzgate/proxy/CallbackService.java +++ b/fizz-core/src/main/java/com/fizzgate/proxy/CallbackService.java @@ -19,7 +19,6 @@ package com.fizzgate.proxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -81,7 +80,7 @@ public class CallbackService { aggrConfigPrefix = systemConfig.getGatewayPrefix() + '/'; } - public Mono requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, Map service2instMap) { + public Mono requestBackends(ServerWebExchange exchange, HttpHeaders headers, String body, CallbackConfig cc, Map service2instMap) { ServerHttpRequest req = exchange.getRequest(); String traceId = WebUtils.getTraceId(exchange); HttpMethod method = req.getMethod(); @@ -140,21 +139,21 @@ public class CallbackService { ; } - private Function> crError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body) { + private Function> crError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body) { return t -> { log(exchange, r, method, headers, body, t); return Mono.just(new FizzFailClientResponse(t)); }; } - private Function> arError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body) { + private Function> arError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body) { return t -> { log(exchange, r, method, headers, body, t); return Mono.just(new FailAggregateResult(t)); }; } - private void log(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body, Throwable t) { + private void log(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body, Throwable t) { StringBuilder b = ThreadContext.getStringBuilder(); WebUtils.request2stringBuilder(exchange, b); b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR);