From 8c9ba12db6b5f2f003c4be47f8552501b770a9c7 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Sat, 4 Mar 2023 11:43:05 +0800 Subject: [PATCH] Fix flow control problem --- fizz-bootstrap/pom.xml | 4 +- fizz-common/pom.xml | 2 +- fizz-core/pom.xml | 2 +- .../filter/FilterExceptionHandlerConfig.java | 14 +- .../com/fizzgate/filter/FizzLogFilter.java | 4 +- .../fizzgate/filter/FlowControlFilter.java | 126 +++++++++--------- .../com/fizzgate/filter/PreprocessFilter.java | 19 ++- .../java/com/fizzgate/filter/RouteFilter.java | 106 +++++++-------- .../plugin/FizzPluginFilterChain.java | 12 ++ .../com/fizzgate/plugin/PluginFilter.java | 11 +- .../plugin/auth/AuthPluginFilter.java | 1 - .../plugin/requestbody/RequestBodyPlugin.java | 23 +++- .../plugin/stat/StatPluginFilter.java | 1 + .../com/fizzgate/proxy/FizzWebClient.java | 10 +- .../java/com/fizzgate/stats/FlowStat.java | 32 ++--- fizz-plugin/pom.xml | 2 +- fizz-spring-boot-starter/pom.xml | 2 +- pom.xml | 38 +++++- 18 files changed, 231 insertions(+), 178 deletions(-) diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml index f30a1a4..b95c7f0 100644 --- a/fizz-bootstrap/pom.xml +++ b/fizz-bootstrap/pom.xml @@ -6,7 +6,7 @@ fizz-gateway-community com.fizzgate - 2.7.1 + 2.7.2-SNAPSHOT ../pom.xml @@ -18,7 +18,7 @@ Dragonfruit-SR3 Dysprosium-SR25 5.3.7.RELEASE - 4.1.89.Final + 4.1.90.Final 4.4.16 2.17.2 1.7.36 diff --git a/fizz-common/pom.xml b/fizz-common/pom.xml index 9953932..e3e6140 100644 --- a/fizz-common/pom.xml +++ b/fizz-common/pom.xml @@ -5,7 +5,7 @@ fizz-gateway-community com.fizzgate - 2.7.1 + 2.7.2-SNAPSHOT ../pom.xml 4.0.0 diff --git a/fizz-core/pom.xml b/fizz-core/pom.xml index d4278cd..4f67eed 100644 --- a/fizz-core/pom.xml +++ b/fizz-core/pom.xml @@ -5,7 +5,7 @@ fizz-gateway-community com.fizzgate - 2.7.1 + 2.7.2-SNAPSHOT ../pom.xml 4.0.0 diff --git a/fizz-core/src/main/java/com/fizzgate/filter/FilterExceptionHandlerConfig.java b/fizz-core/src/main/java/com/fizzgate/filter/FilterExceptionHandlerConfig.java index c71bf75..e773488 100644 --- a/fizz-core/src/main/java/com/fizzgate/filter/FilterExceptionHandlerConfig.java +++ b/fizz-core/src/main/java/com/fizzgate/filter/FilterExceptionHandlerConfig.java @@ -56,13 +56,19 @@ public class FilterExceptionHandlerConfig { public static class FilterExceptionHandler implements WebExceptionHandler { - private static final Logger log = LoggerFactory.getLogger(FilterExceptionHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FilterExceptionHandler.class); private static final String filterExceptionHandler = "filterExceptionHandler"; @Override public Mono handle(ServerWebExchange exchange, Throwable t) { exchange.getAttributes().put(WebUtils.ORIGINAL_ERROR, t); String traceId = WebUtils.getTraceId(exchange); + + if (LOGGER.isDebugEnabled()) { + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + LOGGER.debug(Consts.S.EMPTY, t); + } + ServerHttpResponse resp = exchange.getResponse(); if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) { if (t instanceof ResponseStatusException) { @@ -113,9 +119,8 @@ public class FilterExceptionHandlerConfig { if (t instanceof FizzRuntimeException) { FizzRuntimeException ex = (FizzRuntimeException) t; - // log.error(traceId + ' ' + tMsg, LogService.BIZ_ID, traceId, ex); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); - log.error(traceId + ' ' + tMsg, ex); + LOGGER.error(tMsg, ex); respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); RespEntity rs = null; if (ex.getStepContext() != null && ex.getStepContext().returnContext()) { @@ -132,9 +137,8 @@ public class FilterExceptionHandlerConfig { if (fc == null) { // t came from flow control filter StringBuilder b = ThreadContext.getStringBuilder(); WebUtils.request2stringBuilder(exchange, b); - // log.error(b.toString(), LogService.BIZ_ID, traceId, t); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); - log.error(b.toString(), t); + LOGGER.error(b.toString(), t); String s = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), tMsg, traceId); respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes()))); diff --git a/fizz-core/src/main/java/com/fizzgate/filter/FizzLogFilter.java b/fizz-core/src/main/java/com/fizzgate/filter/FizzLogFilter.java index 9878ce4..9fa7e0b 100644 --- a/fizz-core/src/main/java/com/fizzgate/filter/FizzLogFilter.java +++ b/fizz-core/src/main/java/com/fizzgate/filter/FizzLogFilter.java @@ -51,14 +51,14 @@ public class FizzLogFilter implements WebFilter { long start = System.currentTimeMillis(); return chain.filter(exchange).doFinally( (c) -> { - if (log.isInfoEnabled()) { + if (log.isDebugEnabled()) { StringBuilder b = ThreadContext.getStringBuilder(); WebUtils.request2stringBuilder(exchange, b); b.append(resp).append(exchange.getResponse().getStatusCode()) .append(in) .append(System.currentTimeMillis() - start); // log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange)); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); - log.info(b.toString()); + log.debug(b.toString()); } } ); diff --git a/fizz-core/src/main/java/com/fizzgate/filter/FlowControlFilter.java b/fizz-core/src/main/java/com/fizzgate/filter/FlowControlFilter.java index 6446d11..8eb7bd3 100644 --- a/fizz-core/src/main/java/com/fizzgate/filter/FlowControlFilter.java +++ b/fizz-core/src/main/java/com/fizzgate/filter/FlowControlFilter.java @@ -17,19 +17,6 @@ package com.fizzgate.filter; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.annotation.Order; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; -import org.springframework.stereotype.Component; -import org.springframework.web.server.ServerWebExchange; -import org.springframework.web.server.WebFilterChain; - import com.fizzgate.config.SystemConfig; import com.fizzgate.monitor.FizzMonitorService; import com.fizzgate.plugin.auth.ApiConfigService; @@ -44,7 +31,19 @@ import com.fizzgate.stats.degrade.DegradeRule; import com.fizzgate.stats.ratelimit.ResourceRateLimitConfig; import com.fizzgate.stats.ratelimit.ResourceRateLimitConfigService; import com.fizzgate.util.*; - +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.annotation.Order; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; @@ -63,7 +62,7 @@ public class FlowControlFilter extends FizzWebFilter { public static final String FLOW_CONTROL_FILTER = "flowControlFilter"; - private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FlowControlFilter.class); private static final String admin = "admin"; @@ -160,7 +159,6 @@ public class FlowControlFilter extends FizzWebFilter { if (!favReq && flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq && !fizzApiReq) { String traceId = WebUtils.getTraceId(exchange); - // LogService.setBizId(traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); if (!apiConfigService.serviceConfigMap.containsKey(service)) { String json = WebUtils.jsonRespBody(HttpStatus.FORBIDDEN.value(), "no service " + service + " in flow config", traceId); @@ -178,12 +176,10 @@ public class FlowControlFilter extends FizzWebFilter { }); if (result != null && !result.isSuccess()) { - long currentTimeMillis = System.currentTimeMillis(); String blockedResourceId = result.getBlockedResourceId(); if (BlockType.CIRCUIT_BREAK == result.getBlockType()) { fizzMonitorService.alarm(service, path, FizzMonitorService.CIRCUIT_BREAK_ALARM, null); - // log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId); - log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId); + LOGGER.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId); String responseContentType = flowControlFilterProperties.getDegradeDefaultResponseContentType(); String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent(); @@ -214,12 +210,10 @@ public class FlowControlFilter extends FizzWebFilter { } else { if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) { fizzMonitorService.alarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, concurrents); - // log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId); - log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId); + LOGGER.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId); } else { fizzMonitorService.alarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, qps); - // log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId); - log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId); + LOGGER.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId); } ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.NODE_RESOURCE); @@ -240,42 +234,50 @@ public class FlowControlFilter extends FizzWebFilter { return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rc.getBytes()))); } } else { - long start = System.currentTimeMillis(); - setTraceId(exchange); - String finalService = service; - String finalPath = path; - return chain.filter(exchange).doFinally(s -> { - long rt = System.currentTimeMillis() - start; - CircuitBreaker cb = exchange.getAttribute(CircuitBreaker.DETECT_REQUEST); - HttpStatus statusCode = exchange.getResponse().getStatusCode(); - Throwable t = exchange.getAttribute(WebUtils.ORIGINAL_ERROR); - if (t instanceof TimeoutException) { - statusCode = HttpStatus.GATEWAY_TIMEOUT; - } - // if (s == SignalType.ON_ERROR || statusCode.is4xxClientError() || statusCode.is5xxServerError()) { - if (s == SignalType.ON_ERROR || statusCode.is5xxServerError()) { - flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false, statusCode); - if (cb != null) { - cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat); - } - if (statusCode == HttpStatus.GATEWAY_TIMEOUT) { - fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.TIMEOUT_ALARM, t.getMessage()); - } else if (statusCode.is5xxServerError()) { - fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, String.valueOf(statusCode.value())); - } else if (s == SignalType.ON_ERROR && t != null) { - fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, t.getMessage()); - } - } else { - flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true, statusCode); - if (cb != null) { - cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeSlot, flowStat); - } - } - }); + long start = System.currentTimeMillis(); + String finalService = service; + String finalPath = path; + return chain.filter(exchange).doFinally(s -> { + + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + long rt = System.currentTimeMillis() - start; + CircuitBreaker cb = exchange.getAttribute(CircuitBreaker.DETECT_REQUEST); + HttpStatus statusCode = exchange.getResponse().getStatusCode(); + Throwable t = exchange.getAttribute(WebUtils.ORIGINAL_ERROR); + if (t instanceof TimeoutException) { + statusCode = HttpStatus.GATEWAY_TIMEOUT; + } + + if (s == SignalType.ON_ERROR || statusCode.is5xxServerError()) { + flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false, statusCode); + if (cb != null) { + cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat); + } + if (statusCode == HttpStatus.GATEWAY_TIMEOUT) { + fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.TIMEOUT_ALARM, t.getMessage()); + } else if (statusCode.is5xxServerError()) { + fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, String.valueOf(statusCode.value())); + } else if (s == SignalType.ON_ERROR && t != null) { + fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, t.getMessage()); + } + } else { + flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true, statusCode); + if (cb != null) { + cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeSlot, flowStat); + } + } + + if (s == SignalType.CANCEL) { + ClientResponse remoteResp = exchange.getAttribute("remoteResp"); + if (remoteResp != null) { + remoteResp.bodyToMono(Void.class).subscribe(); + LOGGER.warn("client cancel, and dispose remote response"); + } + } + }); } } - // setTraceId(exchange); return chain.filter(exchange); } @@ -340,15 +342,15 @@ public class FlowControlFilter extends FizzWebFilter { check = true; } } - if (log.isDebugEnabled()) { - log.debug("getResourceConfigItselfAndParents:\n" + JacksonUtils.writeValueAsString(rc) + '\n' + JacksonUtils.writeValueAsString(result)); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("getResourceConfigItselfAndParents:\n" + JacksonUtils.writeValueAsString(rc) + '\n' + JacksonUtils.writeValueAsString(result)); } return result; } private List getFlowControlConfigs(String app, String ip, String node, String service, String path) { - if (log.isDebugEnabled()) { - log.debug("get flow control configs by app={}, ip={}, node={}, service={}, path={}", app, ip, node, service, path); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("get flow control configs by app={}, ip={}, node={}, service={}, path={}", app, ip, node, service, path); } boolean hasHost = (StringUtils.isNotBlank(node) && !node.equals(ResourceIdUtils.NODE)); int sz = hasHost ? 10 : 9; @@ -377,8 +379,8 @@ public class FlowControlFilter extends FizzWebFilter { checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, service, path, null); } - if (log.isDebugEnabled()) { - log.debug("resource configs: " + JacksonUtils.writeValueAsString(resourceConfigs)); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("resource configs: " + JacksonUtils.writeValueAsString(resourceConfigs)); } return resourceConfigs; } diff --git a/fizz-core/src/main/java/com/fizzgate/filter/PreprocessFilter.java b/fizz-core/src/main/java/com/fizzgate/filter/PreprocessFilter.java index 0a30972..1060b32 100644 --- a/fizz-core/src/main/java/com/fizzgate/filter/PreprocessFilter.java +++ b/fizz-core/src/main/java/com/fizzgate/filter/PreprocessFilter.java @@ -17,6 +17,7 @@ package com.fizzgate.filter; +import com.fizzgate.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; @@ -35,10 +36,6 @@ import com.fizzgate.plugin.auth.GatewayGroup; import com.fizzgate.plugin.auth.GatewayGroupService; import com.fizzgate.plugin.stat.StatPluginFilter; import com.fizzgate.proxy.Route; -import com.fizzgate.util.ReactorUtils; -import com.fizzgate.util.Result; -import com.fizzgate.util.ThreadContext; -import com.fizzgate.util.WebUtils; import reactor.core.publisher.Mono; @@ -54,7 +51,7 @@ import java.util.function.Function; @Order(10) public class PreprocessFilter extends FizzWebFilter { - private static final Logger log = LoggerFactory.getLogger(PreprocessFilter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PreprocessFilter.class); public static final String PREPROCESS_FILTER = "preprocessFilter"; @@ -72,6 +69,12 @@ public class PreprocessFilter extends FizzWebFilter { @Override public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { + String traceId = WebUtils.getTraceId(exchange); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("preprocess filter start"); + } + Map fc = new HashMap<>(); fc.put(WebUtils.PREV_FILTER_RESULT, succFr); Map appendHdrs = new HashMap<>(8); Map eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc); @@ -85,8 +88,14 @@ public class PreprocessFilter extends FizzWebFilter { .thenReturn(ReactorUtils.Void) .flatMap( v -> { + + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + Result auth = (Result) WebUtils.getFilterResultDataItem(exchange, AuthPluginFilter.AUTH_PLUGIN_FILTER, AuthPluginFilter.RESULT); if (auth.code == Result.FAIL) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("preprocess filter end 403"); + } return WebUtils.responseError(exchange, HttpStatus.FORBIDDEN.value(), auth.msg); } ApiConfig ac = auth.data; diff --git a/fizz-core/src/main/java/com/fizzgate/filter/RouteFilter.java b/fizz-core/src/main/java/com/fizzgate/filter/RouteFilter.java index 65d9863..8ea9fdd 100644 --- a/fizz-core/src/main/java/com/fizzgate/filter/RouteFilter.java +++ b/fizz-core/src/main/java/com/fizzgate/filter/RouteFilter.java @@ -17,11 +17,22 @@ package com.fizzgate.filter; +import com.fizzgate.config.SystemConfig; +import com.fizzgate.plugin.auth.ApiConfig; +import com.fizzgate.proxy.FizzWebClient; +import com.fizzgate.proxy.Route; +import com.fizzgate.proxy.dubbo.ApacheDubboGenericService; +import com.fizzgate.proxy.dubbo.DubboInterfaceDeclaration; +import com.fizzgate.service_registry.RegistryCenterService; +import com.fizzgate.stats.FlowStat; +import com.fizzgate.stats.ResourceConfig; +import com.fizzgate.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -32,22 +43,13 @@ import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; - -import com.fizzgate.config.SystemConfig; -import com.fizzgate.plugin.auth.ApiConfig; -import com.fizzgate.proxy.FizzWebClient; -import com.fizzgate.proxy.Route; -import com.fizzgate.proxy.dubbo.ApacheDubboGenericService; -import com.fizzgate.proxy.dubbo.DubboInterfaceDeclaration; -import com.fizzgate.service_registry.RegistryCenterService; -import com.fizzgate.util.*; - import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.function.Function; +import java.util.stream.Collectors; /** * @author hongqiaowei @@ -57,7 +59,7 @@ import java.util.function.Function; @Order(Ordered.LOWEST_PRECEDENCE) public class RouteFilter extends FizzWebFilter { - private static final Logger log = LoggerFactory.getLogger(RouteFilter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RouteFilter.class); @Resource private FizzWebClient fizzWebClient; @@ -68,6 +70,9 @@ public class RouteFilter extends FizzWebFilter { @Resource private SystemConfig systemConfig; + @Resource + private FlowControlFilter flowControlFilter; + @Override public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { @@ -77,16 +82,13 @@ public class RouteFilter extends FizzWebFilter { } else { Mono resp = WebUtils.getDirectResponse(exchange); if (resp == null) { // should not reach here - ServerHttpRequest clientReq = exchange.getRequest(); String traceId = WebUtils.getTraceId(exchange); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); String msg = traceId + ' ' + pfr.id + " fail"; if (pfr.cause == null) { - // log.error(msg, LogService.BIZ_ID, traceId); - log.error(msg); + LOGGER.error(msg); } else { - // log.error(msg, LogService.BIZ_ID, traceId, pfr.cause); - log.error(msg, pfr.cause); + LOGGER.error(msg, pfr.cause); } HttpStatus s = HttpStatus.INTERNAL_SERVER_ERROR; if (!SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) { @@ -101,8 +103,13 @@ public class RouteFilter extends FizzWebFilter { private Mono doFilter0(ServerWebExchange exchange, WebFilterChain chain) { - ServerHttpRequest req = exchange.getRequest(); String traceId = WebUtils.getTraceId(exchange); + if (LOGGER.isDebugEnabled()) { + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + LOGGER.debug("route filter start"); + } + + ServerHttpRequest req = exchange.getRequest(); Route route = exchange.getAttribute(WebUtils.ROUTE); HttpHeaders hdrs = null; @@ -112,32 +119,17 @@ public class RouteFilter extends FizzWebFilter { } if (route == null) { - /*String pathQuery = WebUtils.getClientReqPathQuery(exchange); - return fizzWebClient.send2service(traceId, req.getMethod(), WebUtils.getClientService(exchange), pathQuery, hdrs, req.getBody(), 0, 0, 0) - .flatMap(genServerResponse(exchange));*/ - Map.Entry> pathQueryTemplate = WebUtils.getClientReqPathQueryTemplate(exchange).entrySet().iterator().next(); return fizzWebClient.send2service(traceId, req.getMethod(), WebUtils.getClientService(exchange), pathQueryTemplate.getKey(), hdrs, req.getBody(), 0, 0, 0, pathQueryTemplate.getValue().toArray(new String[0])) .flatMap(genServerResponse(exchange)); } else if (route.type == ApiConfig.Type.SERVICE_DISCOVERY) { - /*String pathQuery = getBackendPathQuery(req, route); - String svc = RegistryCenterService.getServiceNameSpace(route.registryCenter, route.backendService); - return fizzWebClient.send2service(traceId, route.method, svc, pathQuery, hdrs, req.getBody(), route.timeout, route.retryCount, route.retryInterval) - .flatMap(genServerResponse(exchange));*/ - Map.Entry> pathQueryTemplate = getBackendPathQueryTemplate(req, route).entrySet().iterator().next(); String svc = RegistryCenterService.getServiceNameSpace(route.registryCenter, route.backendService); return fizzWebClient.send2service(traceId, route.method, svc, pathQueryTemplate.getKey(), hdrs, req.getBody(), route.timeout, route.retryCount, route.retryInterval, pathQueryTemplate.getValue().toArray(new String[0])) .flatMap(genServerResponse(exchange)); } else if (route.type == ApiConfig.Type.REVERSE_PROXY) { - /*String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort) - .append(getBackendPathQuery(req, route)) - .toString(); - return fizzWebClient.send(traceId, route.method, uri, hdrs, req.getBody(), route.timeout, route.retryCount, route.retryInterval) - .flatMap(genServerResponse(exchange));*/ - Map.Entry> pathQueryTemplate = getBackendPathQueryTemplate(req, route).entrySet().iterator().next(); String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort) .append(pathQueryTemplate.getKey()) @@ -150,20 +142,6 @@ public class RouteFilter extends FizzWebFilter { } } - private String getBackendPathQuery(ServerHttpRequest request, Route route) { - String qry = route.query; - if (qry == null) { - MultiValueMap queryParams = request.getQueryParams(); - if (queryParams.isEmpty()) { - return route.backendPath; - } else { - return route.backendPath + Consts.S.QUESTION + WebUtils.toQueryString(queryParams); - } - } else { - return route.backendPath + Consts.S.QUESTION + qry; - } - } - private Map> getBackendPathQueryTemplate(ServerHttpRequest request, Route route) { String qry = route.query; if (qry == null) { @@ -183,6 +161,7 @@ public class RouteFilter extends FizzWebFilter { private Function> genServerResponse(ServerWebExchange exchange) { return remoteResp -> { + String traceId = WebUtils.getTraceId(exchange); ServerHttpResponse clientResp = exchange.getResponse(); clientResp.setStatusCode(remoteResp.statusCode()); HttpHeaders clientRespHeaders = clientResp.getHeaders(); @@ -203,24 +182,36 @@ public class RouteFilter extends FizzWebFilter { } } ); - if (log.isDebugEnabled()) { + if (LOGGER.isDebugEnabled()) { StringBuilder b = ThreadContext.getStringBuilder(); - String traceId = WebUtils.getTraceId(exchange); WebUtils.response2stringBuilder(traceId, remoteResp, b); - // log.debug(b.toString(), LogService.BIZ_ID, traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); - log.debug(b.toString()); + LOGGER.debug(b.toString()); } return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers())) - .doOnError(throwable -> cleanup(remoteResp)).doOnCancel(() -> cleanup(remoteResp)); + .doOnError( + t -> { + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + exchange.getAttributes().put("remoteResp", remoteResp); + LOGGER.error("response client error", t); + } + ) + .doOnCancel( + () -> { + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + exchange.getAttributes().put("remoteResp", remoteResp); + LOGGER.error("client signal cancel"); + // cleanup(remoteResp); + } + ); }; } - private void cleanup(ClientResponse clientResponse) { - if (clientResponse != null) { - clientResponse.bodyToMono(Void.class).subscribe(); - } - } + // private void cleanup(ClientResponse clientResponse) { + // if (clientResponse != null) { + // clientResponse.bodyToMono(Void.class).subscribe(); + // } + // } private Mono dubboRpc(ServerWebExchange exchange, Route route) { final String[] ls = {null}; @@ -272,9 +263,8 @@ public class RouteFilter extends FizzWebFilter { if (ls[0] != null) { b.append('\n').append(ls[0]); } - // log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); - log.error(b.toString(), t); + LOGGER.error(b.toString(), t); } ) ; diff --git a/fizz-core/src/main/java/com/fizzgate/plugin/FizzPluginFilterChain.java b/fizz-core/src/main/java/com/fizzgate/plugin/FizzPluginFilterChain.java index ff50658..9fc6995 100644 --- a/fizz-core/src/main/java/com/fizzgate/plugin/FizzPluginFilterChain.java +++ b/fizz-core/src/main/java/com/fizzgate/plugin/FizzPluginFilterChain.java @@ -17,6 +17,9 @@ package com.fizzgate.plugin; +import com.fizzgate.util.Consts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; @@ -38,6 +41,8 @@ import java.util.Map; public final class FizzPluginFilterChain { + private static final Logger LOGGER = LoggerFactory.getLogger(FizzPluginFilterChain.class); + private static final String pluginConfigsIt = "pcsit@"; public static final String WEB_FILTER_CHAIN = "wfc@"; @@ -62,6 +67,13 @@ public final class FizzPluginFilterChain { if (it.hasNext()) { PluginConfig pc = it.next(); FizzPluginFilter pf = Fizz.context.getBean(pc.plugin, FizzPluginFilter.class); + + String traceId = WebUtils.getTraceId(exchange); + if (LOGGER.isDebugEnabled()) { + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + LOGGER.debug("{} start", pc.plugin); + } + Mono m = pf.filter(exchange, pc.config); if (pf instanceof PluginFilter) { boolean f = false; diff --git a/fizz-core/src/main/java/com/fizzgate/plugin/PluginFilter.java b/fizz-core/src/main/java/com/fizzgate/plugin/PluginFilter.java index de70d66..2665019 100644 --- a/fizz-core/src/main/java/com/fizzgate/plugin/PluginFilter.java +++ b/fizz-core/src/main/java/com/fizzgate/plugin/PluginFilter.java @@ -43,7 +43,7 @@ import java.util.Map; @Deprecated public abstract class PluginFilter implements FizzPluginFilter { - private static final Logger log = LoggerFactory.getLogger(PluginFilter.class); + private final Logger LOGGER = LoggerFactory.getLogger(getClass()); @Override public Mono filter(ServerWebExchange exchange, Map config) { @@ -55,9 +55,8 @@ public abstract class PluginFilter implements FizzPluginFilter { FilterResult pfr = WebUtils.getPrevFilterResult(exchange); String traceId = WebUtils.getTraceId(exchange); ThreadContext.put(Consts.TRACE_ID, traceId); - if (log.isDebugEnabled()) { - // log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, traceId); - log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail")); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} execute {}", pfr.id, pfr.success ? "success" : "fail"); } if (pfr.success) { return doFilter(exchange, config, fixedConfig); @@ -65,9 +64,9 @@ public abstract class PluginFilter implements FizzPluginFilter { if (WebUtils.getDirectResponse(exchange) == null) { // should not reach here String msg = traceId + ' ' + pfr.id + " fail"; if (pfr.cause == null) { - log.error(msg); + LOGGER.error(msg); } else { - log.error(msg, pfr.cause); + LOGGER.error(msg, pfr.cause); } HttpStatus s = HttpStatus.OK; if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) { diff --git a/fizz-core/src/main/java/com/fizzgate/plugin/auth/AuthPluginFilter.java b/fizz-core/src/main/java/com/fizzgate/plugin/auth/AuthPluginFilter.java index 87312ca..1048548 100644 --- a/fizz-core/src/main/java/com/fizzgate/plugin/auth/AuthPluginFilter.java +++ b/fizz-core/src/main/java/com/fizzgate/plugin/auth/AuthPluginFilter.java @@ -56,7 +56,6 @@ public class AuthPluginFilter extends PluginFilter { r -> { if (log.isDebugEnabled()) { String traceId = WebUtils.getTraceId(exchange); - // log.debug("{} req auth: {}", traceId, r, LogService.BIZ_ID, traceId); ThreadContext.put(Consts.TRACE_ID, traceId); log.debug("{} req auth: {}", traceId, r); } diff --git a/fizz-core/src/main/java/com/fizzgate/plugin/requestbody/RequestBodyPlugin.java b/fizz-core/src/main/java/com/fizzgate/plugin/requestbody/RequestBodyPlugin.java index 542b034..2a53c12 100644 --- a/fizz-core/src/main/java/com/fizzgate/plugin/requestbody/RequestBodyPlugin.java +++ b/fizz-core/src/main/java/com/fizzgate/plugin/requestbody/RequestBodyPlugin.java @@ -46,13 +46,19 @@ import java.util.Map; @Component(RequestBodyPlugin.REQUEST_BODY_PLUGIN) public class RequestBodyPlugin implements FizzPluginFilter { - private static final Logger log = LoggerFactory.getLogger(RequestBodyPlugin.class); + private final Logger LOGGER = LoggerFactory.getLogger(getClass()); public static final String REQUEST_BODY_PLUGIN = "requestBodyPlugin"; @Override public Mono filter(ServerWebExchange exchange, Map config) { + // String traceId = WebUtils.getTraceId(exchange); + // if (LOGGER.isDebugEnabled()) { + // org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + // LOGGER.debug("{} start", getClass().getSimpleName()); + // } + ServerHttpRequest req = exchange.getRequest(); if (req instanceof FizzServerHttpRequestDecorator) { return doFilter(exchange, config); @@ -76,18 +82,21 @@ public class RequestBodyPlugin implements FizzPluginFilter { if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) { newExchange = new FizzServerWebExchangeDecorator(mutatedExchange); } - if (log.isDebugEnabled()) { - String traceId = WebUtils.getTraceId(exchange); - // log.debug("{} request is decorated", traceId, LogService.BIZ_ID, traceId); - ThreadContext.put(Consts.TRACE_ID, traceId); - log.debug("{} request is decorated", traceId); - } + // if (LOGGER.isDebugEnabled()) { + // ThreadContext.put(Consts.TRACE_ID, traceId); + // LOGGER.debug("{} request is decorated", traceId); + // } return doFilter(newExchange, config); } ); } public Mono doFilter(ServerWebExchange exchange, Map config) { + String traceId = WebUtils.getTraceId(exchange); + if (LOGGER.isDebugEnabled()) { + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + LOGGER.debug("{} end", getClass().getSimpleName()); + } return FizzPluginFilterChain.next(exchange); } } diff --git a/fizz-core/src/main/java/com/fizzgate/plugin/stat/StatPluginFilter.java b/fizz-core/src/main/java/com/fizzgate/plugin/stat/StatPluginFilter.java index e5ed774..c12a54f 100644 --- a/fizz-core/src/main/java/com/fizzgate/plugin/stat/StatPluginFilter.java +++ b/fizz-core/src/main/java/com/fizzgate/plugin/stat/StatPluginFilter.java @@ -90,6 +90,7 @@ public class StatPluginFilter extends PluginFilter { accessStat.reqTime = System.currentTimeMillis(); accessStat.reqs++; if (LOGGER.isDebugEnabled()) { + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); LOGGER.debug("update access stat: {}, which request at {}", accessStat, DateTimeUtils.convert(accessStat.reqTime, Consts.DP.DP19)); } } diff --git a/fizz-core/src/main/java/com/fizzgate/proxy/FizzWebClient.java b/fizz-core/src/main/java/com/fizzgate/proxy/FizzWebClient.java index a29fc01..a430b74 100644 --- a/fizz-core/src/main/java/com/fizzgate/proxy/FizzWebClient.java +++ b/fizz-core/src/main/java/com/fizzgate/proxy/FizzWebClient.java @@ -220,7 +220,6 @@ public class FizzWebClient { if (log.isDebugEnabled()) { StringBuilder b = ThreadContext.getStringBuilder(); WebUtils.request2stringBuilder(traceId, method, uri, headers, null, b); - // log.debug(b.toString(), LogService.BIZ_ID, traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); log.debug(b.toString()); } @@ -243,10 +242,6 @@ public class FizzWebClient { } setHostHeader(uri, hdrs); if (systemConfig.isFizzWebClientXForwardedForEnable()) { - List values = hdrs.get(X_FORWARDED_FOR); - /* if (CollectionUtils.isEmpty(values)) { - hdrs.add(X_FORWARDED_FOR, WebUtils.getOriginIp(null)); - } */ if (systemConfig.isFizzWebClientXForwardedForAppendGatewayIp()) { hdrs.add(X_FORWARDED_FOR, NetworkUtils.getServerIp()); } @@ -269,8 +264,9 @@ public class FizzWebClient { Mono cr = req.exchange(); if (timeout == 0) { - if (systemConfig.getRouteTimeout() != 0) { - timeout = systemConfig.getRouteTimeout(); + long systemConfigRouteTimeout = systemConfig.getRouteTimeout(); + if (systemConfigRouteTimeout != 0) { + timeout = systemConfigRouteTimeout; } } if (timeout > 0) { diff --git a/fizz-core/src/main/java/com/fizzgate/stats/FlowStat.java b/fizz-core/src/main/java/com/fizzgate/stats/FlowStat.java index 16417ec..919177a 100644 --- a/fizz-core/src/main/java/com/fizzgate/stats/FlowStat.java +++ b/fizz-core/src/main/java/com/fizzgate/stats/FlowStat.java @@ -17,8 +17,20 @@ package com.fizzgate.stats; -import java.util.*; +import com.fizzgate.stats.circuitbreaker.CircuitBreakManager; +import com.fizzgate.stats.circuitbreaker.CircuitBreaker; +import com.fizzgate.util.Consts; +import com.fizzgate.util.ResourceIdUtils; +import com.fizzgate.util.WebUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.server.ServerWebExchange; + +import java.util.ArrayList; +import java.util.List; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -26,17 +38,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.http.HttpStatus; -import org.springframework.web.server.ServerWebExchange; - -import com.fizzgate.stats.circuitbreaker.CircuitBreakManager; -import com.fizzgate.stats.circuitbreaker.CircuitBreaker; -import com.fizzgate.util.ResourceIdUtils; -import com.fizzgate.util.WebUtils; +import java.util.stream.Collectors; /** * Flow Statistic @@ -524,7 +526,7 @@ public class FlowStat { long slotInterval = slotIntervalInSec * 1000; if (resourceId == null) { - Set> entrys = resourceStats.entrySet(); + Set> entrys = resourceStats.entrySet(); for (Entry entry : entrys) { String rid = entry.getKey(); ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(rid); @@ -585,7 +587,7 @@ public class FlowStat { } }*/ for (long i = lastSlotId; i < slotId;) { - Set> entrys = stat.resourceStats.entrySet(); + Set> entrys = stat.resourceStats.entrySet(); for (Entry entry : entrys) { String resourceId = entry.getKey(); ConcurrentMap timeSlots = entry.getValue().getTimeSlots(); @@ -649,7 +651,7 @@ public class FlowStat { long curTimeSlotId = stat.currentTimeSlotId(); if (lastTimeSlotId == null || lastTimeSlotId.longValue() != curTimeSlotId) { // log.debug("PeakConcurrentJob start"); - Set> entrys = stat.resourceStats.entrySet(); + Set> entrys = stat.resourceStats.entrySet(); for (Entry entry : entrys) { String resource = entry.getKey(); // log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId, diff --git a/fizz-plugin/pom.xml b/fizz-plugin/pom.xml index 1c192eb..52ec30d 100644 --- a/fizz-plugin/pom.xml +++ b/fizz-plugin/pom.xml @@ -5,7 +5,7 @@ fizz-gateway-community com.fizzgate - 2.7.1 + 2.7.2-SNAPSHOT ../pom.xml 4.0.0 diff --git a/fizz-spring-boot-starter/pom.xml b/fizz-spring-boot-starter/pom.xml index 6cae080..eecfae5 100644 --- a/fizz-spring-boot-starter/pom.xml +++ b/fizz-spring-boot-starter/pom.xml @@ -5,7 +5,7 @@ fizz-gateway-community com.fizzgate - 2.7.1 + 2.7.2-SNAPSHOT ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index 142c19f..121ca24 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ Dysprosium-SR25 5.3.7.RELEASE 2.2.7.RELEASE - 4.1.89.Final + 4.1.90.Final 4.4.16 2.17.2 1.7.36 @@ -38,7 +38,7 @@ fizz-gateway-community ${project.artifactId} fizz gateway community - 2.7.1 + 2.7.2-SNAPSHOT pom fizz-common @@ -418,7 +418,7 @@ io.netty - netty-tcnative + netty-tcnative-classes ${netty-tcnative.version} @@ -428,7 +428,37 @@ io.netty - netty-tcnative-classes + netty-tcnative-boringssl-static + ${netty-tcnative.version} + linux-x86_64 + + + io.netty + netty-tcnative-boringssl-static + ${netty-tcnative.version} + linux-aarch_64 + + + io.netty + netty-tcnative-boringssl-static + ${netty-tcnative.version} + osx-x86_64 + + + io.netty + netty-tcnative-boringssl-static + ${netty-tcnative.version} + osx-aarch_64 + + + io.netty + netty-tcnative-boringssl-static + ${netty-tcnative.version} + windows-x86_64 + + + io.netty + netty-tcnative ${netty-tcnative.version}