From 6b7110546d30bad74e85408b5d9b620f38df1a89 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Tue, 2 Feb 2021 11:37:16 +0800 Subject: [PATCH] update: add ApiConifg2appsServiceTests --- src/main/java/we/filter/CallbackFilter.java | 16 +++---- src/main/java/we/proxy/CallbackService.java | 50 ++++++++------------- src/main/java/we/util/ReactorUtils.java | 2 +- src/main/java/we/util/Utils.java | 16 ++++++- src/main/java/we/util/WebUtils.java | 47 ++++++++++--------- 5 files changed, 66 insertions(+), 65 deletions(-) diff --git a/src/main/java/we/filter/CallbackFilter.java b/src/main/java/we/filter/CallbackFilter.java index 8a23558..d4214ae 100644 --- a/src/main/java/we/filter/CallbackFilter.java +++ b/src/main/java/we/filter/CallbackFilter.java @@ -45,7 +45,6 @@ import we.plugin.auth.GatewayGroupService; import we.plugin.auth.Receiver; import we.proxy.CallbackService; import we.proxy.DiscoveryClientUriSelector; -import we.proxy.FizzWebClient; import we.proxy.ServiceInstance; import we.util.Constants; import we.util.ThreadContext; @@ -70,16 +69,11 @@ public class CallbackFilter extends FizzWebFilter { private static final String s2im = "$s2im"; - private static final DataBuffer emptyBody = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false, true)).wrap(Constants.Symbol.EMPTY.getBytes()); - private static final String json = "json"; @Resource private DiscoveryClientUriSelector discoveryClientSelector; - @Resource - private FizzWebClient fizzWebClient; - @NacosValue(value = "${callback.push.dest:redis}", autoRefreshed = true) @Value("${callback.push.dest:redis}") private String dest; @@ -106,16 +100,15 @@ public class CallbackFilter extends FizzWebFilter { ServerHttpRequest req = exchange.getRequest(); DataBuffer[] body = {null}; return - DataBufferUtils.join(req.getBody()).defaultIfEmpty(emptyBody) + DataBufferUtils.join(req.getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY) .flatMap( b -> { - if (b != emptyBody) { + if (b != WebUtils.EMPTY_BODY) { body[0] = b; } - String bodyStr = body[0].toString(StandardCharsets.UTF_8); HashMap service2instMap = getService2instMap(ac); HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange); - pushReq2manager(exchange, headers, bodyStr, service2instMap); + pushReq2manager(exchange, headers, body[0], service2instMap); if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) { return directResponse(exchange, cc); } else { @@ -170,7 +163,7 @@ public class CallbackFilter extends FizzWebFilter { private static final String _receivers = "\"receivers\":"; private static final String _gatewayGroup = "\"gatewayGroup\":"; - private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, String bodyStr, HashMap service2instMap) { + private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, HashMap service2instMap) { ServerHttpRequest req = exchange.getRequest(); StringBuilder b = ThreadContext.getStringBuilder(); @@ -196,6 +189,7 @@ public class CallbackFilter extends FizzWebFilter { String gg = gatewayGroupService.currentGatewayGroupSet.iterator().next(); b.append(_gatewayGroup); toJsonStringValue(b, gg); b.append(Constants.Symbol.COMMA); + String bodyStr = body.toString(StandardCharsets.UTF_8); MediaType contentType = req.getHeaders().getContentType(); if (contentType != null && contentType.getSubtype().equalsIgnoreCase(json)) { b.append(_body); b.append(JSON.toJSONString(bodyStr)); diff --git a/src/main/java/we/proxy/CallbackService.java b/src/main/java/we/proxy/CallbackService.java index 66424bc..eec11e0 100644 --- a/src/main/java/we/proxy/CallbackService.java +++ b/src/main/java/we/proxy/CallbackService.java @@ -71,8 +71,7 @@ public class CallbackService { @Resource private ApiConfigService apiConfigService; - public Mono requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, - HashMap service2instMap) { + public Mono requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, HashMap service2instMap) { ServerHttpRequest req = exchange.getRequest(); String reqId = req.getId(); HttpMethod method = req.getMethod(); @@ -101,31 +100,23 @@ public class CallbackService { sends[i] = send; } return Flux.mergeSequential(sends) - .reduce( - new ArrayList(rs), - (respCollector, resp) -> { - respCollector.add(resp); - return respCollector; - } - ) + .collectList() .flatMap( - resps -> { + sendResults -> { Object r = null; - for (int i = 1; i < resps.size(); i++) { - r = resps.get(i); + for (int i = 1; i < sendResults.size(); i++) { + r = sendResults.get(i); if (r instanceof ClientResponse && r != fcr) { clean((ClientResponse) r); } } - r = resps.get(0); + r = sendResults.get(0); if (r == fcr || r == far) { - return Mono.error(new RuntimeException("cant response client with " + r, null, false, false) {}); + return Mono.error(Utils.runtimeExceptionWithoutStack("cant response client with " + r)); } else if (r instanceof ClientResponse) { - ClientResponse cr = (ClientResponse) r; - return genServerResponse(exchange, cr); + return genServerResponse(exchange, (ClientResponse) r); } else { - AggregateResult ar = (AggregateResult) r; - return aggregateService.genAggregateResponse(exchange, ar); + return aggregateService.genAggregateResponse(exchange, (AggregateResult) r); } } ) @@ -200,7 +191,11 @@ public class CallbackService { public Mono replay(CallbackReplayReq req) { - CallbackConfig cc = apiConfigService.getApiConfig(req.service, req.method, req.path, req.gatewayGroup, req.app).callbackConfig; + ApiConfig ac = apiConfigService.getApiConfig(req.service, req.method, req.path, req.gatewayGroup, req.app); + if (ac == null) { + return Mono.just(ReactiveResult.fail("no api config for " + req.path)); + } + CallbackConfig cc = ac.callbackConfig; if (req.headers.getContentType().getSubtype().equalsIgnoreCase("json")) { req.body = (String) JSON.parse(req.body); } @@ -234,31 +229,24 @@ public class CallbackService { if (stp.type == ApiConfig.Type.SERVICE_DISCOVERY) { send = fizzWebClient.proxySend2service(req.id, req.method, stp.service, stp.path, req.headers, req.body) .onErrorResume( crError(req, stp.service, stp.path) ); - sends.add(send); } else { String traceId = CommonConstants.TRACE_ID_PREFIX + req.id; send = aggregateService.request(traceId, "/proxy/", req.method.name(), stp.service, stp.path, null, req.headers, req.body) .onErrorResume( arError(req, stp.service, stp.path) ); - sends.add(send); } + sends.add(send); } } int ss = sends.size(); Mono[] sendArr = sends.toArray(new Mono[ss]); return Flux.mergeSequential(sendArr) - .reduce( - new ArrayList(ss), - (respCollector, resp) -> { - respCollector.add(resp); - return respCollector; - } - ) + .collectList() .map( - resps -> { + sendResults -> { int c = ReactiveResult.SUCC; - for (int i = 0; i < resps.size(); i++) { - Object r = resps.get(i); + for (int i = 0; i < sendResults.size(); i++) { + Object r = sendResults.get(i); if (r == fcr || r == far) { c = ReactiveResult.FAIL; } else if (r instanceof ClientResponse) { diff --git a/src/main/java/we/util/ReactorUtils.java b/src/main/java/we/util/ReactorUtils.java index 388868f..42e63a7 100644 --- a/src/main/java/we/util/ReactorUtils.java +++ b/src/main/java/we/util/ReactorUtils.java @@ -30,7 +30,7 @@ public interface ReactorUtils { static final Object NULL = OBJ; - static final Throwable EMPTY_THROWABLE = new Throwable(null, null, false, false) {}; // XXX + static final Throwable EMPTY_THROWABLE = Utils.throwableWithoutStack(null); // XXX static Mono getInitiateMono() { return Mono.just(OBJ); diff --git a/src/main/java/we/util/Utils.java b/src/main/java/we/util/Utils.java index 17c5f1f..ef6f3a6 100644 --- a/src/main/java/we/util/Utils.java +++ b/src/main/java/we/util/Utils.java @@ -18,7 +18,6 @@ package we.util; import org.apache.commons.lang3.StringUtils; -import org.apache.http.Consts; import java.time.LocalDate; import java.time.LocalDateTime; @@ -93,4 +92,19 @@ public abstract class Utils { } } } + + public static RuntimeException runtimeExceptionWithoutStack(String msg) { + return new RuntimeException(msg, null, false, false) { + }; + } + + public static Exception exceptionWithoutStack(String msg) { + return new Exception(msg, null, false, false) { + }; + } + + public static Throwable throwableWithoutStack(String msg) { + return new Throwable(null, null, false, false) { + }; + } } diff --git a/src/main/java/we/util/WebUtils.java b/src/main/java/we/util/WebUtils.java index dab6b37..c8df184 100644 --- a/src/main/java/we/util/WebUtils.java +++ b/src/main/java/we/util/WebUtils.java @@ -23,9 +23,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -55,47 +58,49 @@ import java.util.Set; public abstract class WebUtils { - private static final Logger log = LoggerFactory.getLogger(WebUtils.class); + private static final Logger log = LoggerFactory.getLogger(WebUtils.class); - private static final String clientService = "clientService"; + private static final String clientService = "clientService"; - public static final String BACKEND_SERVICE = "backendService"; + public static final String BACKEND_SERVICE = "backendService"; - private static final String xForwardedFor = "X-FORWARDED-FOR"; + private static final String xForwardedFor = "X-FORWARDED-FOR"; - private static final String unknown = "unknown"; + private static final String unknown = "unknown"; - private static final String loopBack = "127.0.0.1"; + private static final String loopBack = "127.0.0.1"; - private static final String binaryAddress = "0:0:0:0:0:0:0:1"; + private static final String binaryAddress = "0:0:0:0:0:0:0:1"; - private static final String directResponse = "directResponse"; + private static final String directResponse = "directResponse"; - private static final String response = " response "; + private static final String response = " response "; - private static final String originIp = "originIp"; + private static final String originIp = "originIp"; - public static final String APP_HEADER = "fizz-appid"; + public static final String APP_HEADER = "fizz-appid"; - public static final String FILTER_CONTEXT = "filterContext"; + public static final String FILTER_CONTEXT = "filterContext"; - public static final String APPEND_HEADERS = "appendHeaders"; + public static final String APPEND_HEADERS = "appendHeaders"; - public static final String PREV_FILTER_RESULT = "prevFilterResult"; + public static final String PREV_FILTER_RESULT = "prevFilterResult"; - private static final String CLIENT_REQUEST_PATH = "clientRequestPath"; + private static final String CLIENT_REQUEST_PATH = "clientRequestPath"; - private static final String CLIENT_REQUEST_QUERY = "clientRequestQuery"; + private static final String CLIENT_REQUEST_QUERY = "clientRequestQuery"; - private static final String traceId = "traceId"; + private static final String traceId = "traceId"; - public static final String BACKEND_PATH = "backendPath"; + public static final String BACKEND_PATH = "backendPath"; - public static boolean logResponseBody = false; + public static boolean logResponseBody = false; - public static Set logHeaderSet = Collections.EMPTY_SET; + public static Set logHeaderSet = Collections.EMPTY_SET; - public static final String PATH_PREFIX = "/proxy/"; + public static final String PATH_PREFIX = "/proxy/"; + + public static final DataBuffer EMPTY_BODY = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false, true)).wrap(Constants.Symbol.EMPTY.getBytes()); public static String getHeaderValue(ServerWebExchange exchange, String header) { return exchange.getRequest().getHeaders().getFirst(header);