Fix flow control problem

This commit is contained in:
hongqiaowei
2023-03-04 11:43:05 +08:00
committed by linwaiwai
parent b6b3bc474a
commit 8c9ba12db6
18 changed files with 231 additions and 178 deletions

View File

@@ -6,7 +6,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.1</version> <version>2.7.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
@@ -18,7 +18,7 @@
<spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version> <spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version>
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version> <reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version> <lettuce.version>5.3.7.RELEASE</lettuce.version>
<netty.version>4.1.89.Final</netty.version> <netty.version>4.1.90.Final</netty.version>
<httpcore.version>4.4.16</httpcore.version> <httpcore.version>4.4.16</httpcore.version>
<log4j2.version>2.17.2</log4j2.version> <log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version> <slf4j.version>1.7.36</slf4j.version>

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.1</version> <version>2.7.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.1</version> <version>2.7.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -56,13 +56,19 @@ public class FilterExceptionHandlerConfig {
public static class FilterExceptionHandler implements WebExceptionHandler { public static class FilterExceptionHandler implements WebExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(FilterExceptionHandler.class); private static final Logger LOGGER = LoggerFactory.getLogger(FilterExceptionHandler.class);
private static final String filterExceptionHandler = "filterExceptionHandler"; private static final String filterExceptionHandler = "filterExceptionHandler";
@Override @Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable t) { public Mono<Void> handle(ServerWebExchange exchange, Throwable t) {
exchange.getAttributes().put(WebUtils.ORIGINAL_ERROR, t); exchange.getAttributes().put(WebUtils.ORIGINAL_ERROR, t);
String traceId = WebUtils.getTraceId(exchange); String traceId = WebUtils.getTraceId(exchange);
if (LOGGER.isDebugEnabled()) {
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
LOGGER.debug(Consts.S.EMPTY, t);
}
ServerHttpResponse resp = exchange.getResponse(); ServerHttpResponse resp = exchange.getResponse();
if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) { if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) {
if (t instanceof ResponseStatusException) { if (t instanceof ResponseStatusException) {
@@ -113,9 +119,8 @@ public class FilterExceptionHandlerConfig {
if (t instanceof FizzRuntimeException) { if (t instanceof FizzRuntimeException) {
FizzRuntimeException ex = (FizzRuntimeException) t; FizzRuntimeException ex = (FizzRuntimeException) t;
// log.error(traceId + ' ' + tMsg, LogService.BIZ_ID, traceId, ex);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(traceId + ' ' + tMsg, ex); LOGGER.error(tMsg, ex);
respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null; RespEntity rs = null;
if (ex.getStepContext() != null && ex.getStepContext().returnContext()) { if (ex.getStepContext() != null && ex.getStepContext().returnContext()) {
@@ -132,9 +137,8 @@ public class FilterExceptionHandlerConfig {
if (fc == null) { // t came from flow control filter if (fc == null) { // t came from flow control filter
StringBuilder b = ThreadContext.getStringBuilder(); StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b); WebUtils.request2stringBuilder(exchange, b);
// log.error(b.toString(), LogService.BIZ_ID, traceId, t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString(), t); LOGGER.error(b.toString(), t);
String s = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), tMsg, traceId); String s = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), tMsg, traceId);
respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes()))); vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes())));

View File

@@ -51,14 +51,14 @@ public class FizzLogFilter implements WebFilter {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
return chain.filter(exchange).doFinally( return chain.filter(exchange).doFinally(
(c) -> { (c) -> {
if (log.isInfoEnabled()) { if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder(); StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b); WebUtils.request2stringBuilder(exchange, b);
b.append(resp).append(exchange.getResponse().getStatusCode()) b.append(resp).append(exchange.getResponse().getStatusCode())
.append(in) .append(System.currentTimeMillis() - start); .append(in) .append(System.currentTimeMillis() - start);
// log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange)); // log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange));
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
log.info(b.toString()); log.debug(b.toString());
} }
} }
); );

View File

@@ -17,19 +17,6 @@
package com.fizzgate.filter; package com.fizzgate.filter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import com.fizzgate.config.SystemConfig; import com.fizzgate.config.SystemConfig;
import com.fizzgate.monitor.FizzMonitorService; import com.fizzgate.monitor.FizzMonitorService;
import com.fizzgate.plugin.auth.ApiConfigService; import com.fizzgate.plugin.auth.ApiConfigService;
@@ -44,7 +31,19 @@ import com.fizzgate.stats.degrade.DegradeRule;
import com.fizzgate.stats.ratelimit.ResourceRateLimitConfig; import com.fizzgate.stats.ratelimit.ResourceRateLimitConfig;
import com.fizzgate.stats.ratelimit.ResourceRateLimitConfigService; import com.fizzgate.stats.ratelimit.ResourceRateLimitConfigService;
import com.fizzgate.util.*; import com.fizzgate.util.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType; import reactor.core.publisher.SignalType;
@@ -63,7 +62,7 @@ public class FlowControlFilter extends FizzWebFilter {
public static final String FLOW_CONTROL_FILTER = "flowControlFilter"; public static final String FLOW_CONTROL_FILTER = "flowControlFilter";
private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class); private static final Logger LOGGER = LoggerFactory.getLogger(FlowControlFilter.class);
private static final String admin = "admin"; private static final String admin = "admin";
@@ -160,7 +159,6 @@ public class FlowControlFilter extends FizzWebFilter {
if (!favReq && flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq && !fizzApiReq) { if (!favReq && flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq && !fizzApiReq) {
String traceId = WebUtils.getTraceId(exchange); String traceId = WebUtils.getTraceId(exchange);
// LogService.setBizId(traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
if (!apiConfigService.serviceConfigMap.containsKey(service)) { if (!apiConfigService.serviceConfigMap.containsKey(service)) {
String json = WebUtils.jsonRespBody(HttpStatus.FORBIDDEN.value(), "no service " + service + " in flow config", traceId); String json = WebUtils.jsonRespBody(HttpStatus.FORBIDDEN.value(), "no service " + service + " in flow config", traceId);
@@ -178,12 +176,10 @@ public class FlowControlFilter extends FizzWebFilter {
}); });
if (result != null && !result.isSuccess()) { if (result != null && !result.isSuccess()) {
long currentTimeMillis = System.currentTimeMillis();
String blockedResourceId = result.getBlockedResourceId(); String blockedResourceId = result.getBlockedResourceId();
if (BlockType.CIRCUIT_BREAK == result.getBlockType()) { if (BlockType.CIRCUIT_BREAK == result.getBlockType()) {
fizzMonitorService.alarm(service, path, FizzMonitorService.CIRCUIT_BREAK_ALARM, null); fizzMonitorService.alarm(service, path, FizzMonitorService.CIRCUIT_BREAK_ALARM, null);
// log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId); LOGGER.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId);
log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId);
String responseContentType = flowControlFilterProperties.getDegradeDefaultResponseContentType(); String responseContentType = flowControlFilterProperties.getDegradeDefaultResponseContentType();
String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent(); String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent();
@@ -214,12 +210,10 @@ public class FlowControlFilter extends FizzWebFilter {
} else { } else {
if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) { if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
fizzMonitorService.alarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, concurrents); fizzMonitorService.alarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, concurrents);
// log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId); LOGGER.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId);
log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId);
} else { } else {
fizzMonitorService.alarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, qps); fizzMonitorService.alarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, qps);
// log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId); LOGGER.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId);
log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId);
} }
ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.NODE_RESOURCE); ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.NODE_RESOURCE);
@@ -240,42 +234,50 @@ public class FlowControlFilter extends FizzWebFilter {
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rc.getBytes()))); return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rc.getBytes())));
} }
} else { } else {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
setTraceId(exchange); String finalService = service;
String finalService = service; String finalPath = path;
String finalPath = path; return chain.filter(exchange).doFinally(s -> {
return chain.filter(exchange).doFinally(s -> {
long rt = System.currentTimeMillis() - start; org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
CircuitBreaker cb = exchange.getAttribute(CircuitBreaker.DETECT_REQUEST); long rt = System.currentTimeMillis() - start;
HttpStatus statusCode = exchange.getResponse().getStatusCode(); CircuitBreaker cb = exchange.getAttribute(CircuitBreaker.DETECT_REQUEST);
Throwable t = exchange.getAttribute(WebUtils.ORIGINAL_ERROR); HttpStatus statusCode = exchange.getResponse().getStatusCode();
if (t instanceof TimeoutException) { Throwable t = exchange.getAttribute(WebUtils.ORIGINAL_ERROR);
statusCode = HttpStatus.GATEWAY_TIMEOUT; if (t instanceof TimeoutException) {
} statusCode = HttpStatus.GATEWAY_TIMEOUT;
// if (s == SignalType.ON_ERROR || statusCode.is4xxClientError() || statusCode.is5xxServerError()) { }
if (s == SignalType.ON_ERROR || statusCode.is5xxServerError()) {
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false, statusCode); if (s == SignalType.ON_ERROR || statusCode.is5xxServerError()) {
if (cb != null) { flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false, statusCode);
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat); if (cb != null) {
} cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat);
if (statusCode == HttpStatus.GATEWAY_TIMEOUT) { }
fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.TIMEOUT_ALARM, t.getMessage()); if (statusCode == HttpStatus.GATEWAY_TIMEOUT) {
} else if (statusCode.is5xxServerError()) { fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.TIMEOUT_ALARM, t.getMessage());
fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, String.valueOf(statusCode.value())); } else if (statusCode.is5xxServerError()) {
} else if (s == SignalType.ON_ERROR && t != null) { fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, String.valueOf(statusCode.value()));
fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, t.getMessage()); } else if (s == SignalType.ON_ERROR && t != null) {
} fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, t.getMessage());
} else { }
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true, statusCode); } else {
if (cb != null) { flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true, statusCode);
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeSlot, flowStat); if (cb != null) {
} cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeSlot, flowStat);
} }
}); }
if (s == SignalType.CANCEL) {
ClientResponse remoteResp = exchange.getAttribute("remoteResp");
if (remoteResp != null) {
remoteResp.bodyToMono(Void.class).subscribe();
LOGGER.warn("client cancel, and dispose remote response");
}
}
});
} }
} }
// setTraceId(exchange);
return chain.filter(exchange); return chain.filter(exchange);
} }
@@ -340,15 +342,15 @@ public class FlowControlFilter extends FizzWebFilter {
check = true; check = true;
} }
} }
if (log.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
log.debug("getResourceConfigItselfAndParents:\n" + JacksonUtils.writeValueAsString(rc) + '\n' + JacksonUtils.writeValueAsString(result)); LOGGER.debug("getResourceConfigItselfAndParents:\n" + JacksonUtils.writeValueAsString(rc) + '\n' + JacksonUtils.writeValueAsString(result));
} }
return result; return result;
} }
private List<ResourceConfig> getFlowControlConfigs(String app, String ip, String node, String service, String path) { private List<ResourceConfig> getFlowControlConfigs(String app, String ip, String node, String service, String path) {
if (log.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
log.debug("get flow control configs by app={}, ip={}, node={}, service={}, path={}", app, ip, node, service, path); LOGGER.debug("get flow control configs by app={}, ip={}, node={}, service={}, path={}", app, ip, node, service, path);
} }
boolean hasHost = (StringUtils.isNotBlank(node) && !node.equals(ResourceIdUtils.NODE)); boolean hasHost = (StringUtils.isNotBlank(node) && !node.equals(ResourceIdUtils.NODE));
int sz = hasHost ? 10 : 9; int sz = hasHost ? 10 : 9;
@@ -377,8 +379,8 @@ public class FlowControlFilter extends FizzWebFilter {
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, service, path, null); checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, service, path, null);
} }
if (log.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
log.debug("resource configs: " + JacksonUtils.writeValueAsString(resourceConfigs)); LOGGER.debug("resource configs: " + JacksonUtils.writeValueAsString(resourceConfigs));
} }
return resourceConfigs; return resourceConfigs;
} }

View File

@@ -17,6 +17,7 @@
package com.fizzgate.filter; package com.fizzgate.filter;
import com.fizzgate.util.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
@@ -35,10 +36,6 @@ import com.fizzgate.plugin.auth.GatewayGroup;
import com.fizzgate.plugin.auth.GatewayGroupService; import com.fizzgate.plugin.auth.GatewayGroupService;
import com.fizzgate.plugin.stat.StatPluginFilter; import com.fizzgate.plugin.stat.StatPluginFilter;
import com.fizzgate.proxy.Route; import com.fizzgate.proxy.Route;
import com.fizzgate.util.ReactorUtils;
import com.fizzgate.util.Result;
import com.fizzgate.util.ThreadContext;
import com.fizzgate.util.WebUtils;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@@ -54,7 +51,7 @@ import java.util.function.Function;
@Order(10) @Order(10)
public class PreprocessFilter extends FizzWebFilter { public class PreprocessFilter extends FizzWebFilter {
private static final Logger log = LoggerFactory.getLogger(PreprocessFilter.class); private static final Logger LOGGER = LoggerFactory.getLogger(PreprocessFilter.class);
public static final String PREPROCESS_FILTER = "preprocessFilter"; public static final String PREPROCESS_FILTER = "preprocessFilter";
@@ -72,6 +69,12 @@ public class PreprocessFilter extends FizzWebFilter {
@Override @Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) { public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
String traceId = WebUtils.getTraceId(exchange);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("preprocess filter start");
}
Map<String, FilterResult> fc = new HashMap<>(); fc.put(WebUtils.PREV_FILTER_RESULT, succFr); Map<String, FilterResult> fc = new HashMap<>(); fc.put(WebUtils.PREV_FILTER_RESULT, succFr);
Map<String, String> appendHdrs = new HashMap<>(8); Map<String, String> appendHdrs = new HashMap<>(8);
Map<String, Object> eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc); Map<String, Object> eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
@@ -85,8 +88,14 @@ public class PreprocessFilter extends FizzWebFilter {
.thenReturn(ReactorUtils.Void) .thenReturn(ReactorUtils.Void)
.flatMap( .flatMap(
v -> { v -> {
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
Result<ApiConfig> auth = (Result<ApiConfig>) WebUtils.getFilterResultDataItem(exchange, AuthPluginFilter.AUTH_PLUGIN_FILTER, AuthPluginFilter.RESULT); Result<ApiConfig> auth = (Result<ApiConfig>) WebUtils.getFilterResultDataItem(exchange, AuthPluginFilter.AUTH_PLUGIN_FILTER, AuthPluginFilter.RESULT);
if (auth.code == Result.FAIL) { if (auth.code == Result.FAIL) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("preprocess filter end 403");
}
return WebUtils.responseError(exchange, HttpStatus.FORBIDDEN.value(), auth.msg); return WebUtils.responseError(exchange, HttpStatus.FORBIDDEN.value(), auth.msg);
} }
ApiConfig ac = auth.data; ApiConfig ac = auth.data;

View File

@@ -17,11 +17,22 @@
package com.fizzgate.filter; package com.fizzgate.filter;
import com.fizzgate.config.SystemConfig;
import com.fizzgate.plugin.auth.ApiConfig;
import com.fizzgate.proxy.FizzWebClient;
import com.fizzgate.proxy.Route;
import com.fizzgate.proxy.dubbo.ApacheDubboGenericService;
import com.fizzgate.proxy.dubbo.DubboInterfaceDeclaration;
import com.fizzgate.service_registry.RegistryCenterService;
import com.fizzgate.stats.FlowStat;
import com.fizzgate.stats.ResourceConfig;
import com.fizzgate.util.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -32,22 +43,13 @@ import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain; import org.springframework.web.server.WebFilterChain;
import com.fizzgate.config.SystemConfig;
import com.fizzgate.plugin.auth.ApiConfig;
import com.fizzgate.proxy.FizzWebClient;
import com.fizzgate.proxy.Route;
import com.fizzgate.proxy.dubbo.ApacheDubboGenericService;
import com.fizzgate.proxy.dubbo.DubboInterfaceDeclaration;
import com.fizzgate.service_registry.RegistryCenterService;
import com.fizzgate.util.*;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors;
/** /**
* @author hongqiaowei * @author hongqiaowei
@@ -57,7 +59,7 @@ import java.util.function.Function;
@Order(Ordered.LOWEST_PRECEDENCE) @Order(Ordered.LOWEST_PRECEDENCE)
public class RouteFilter extends FizzWebFilter { public class RouteFilter extends FizzWebFilter {
private static final Logger log = LoggerFactory.getLogger(RouteFilter.class); private static final Logger LOGGER = LoggerFactory.getLogger(RouteFilter.class);
@Resource @Resource
private FizzWebClient fizzWebClient; private FizzWebClient fizzWebClient;
@@ -68,6 +70,9 @@ public class RouteFilter extends FizzWebFilter {
@Resource @Resource
private SystemConfig systemConfig; private SystemConfig systemConfig;
@Resource
private FlowControlFilter flowControlFilter;
@Override @Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) { public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -77,16 +82,13 @@ public class RouteFilter extends FizzWebFilter {
} else { } else {
Mono<Void> resp = WebUtils.getDirectResponse(exchange); Mono<Void> resp = WebUtils.getDirectResponse(exchange);
if (resp == null) { // should not reach here if (resp == null) { // should not reach here
ServerHttpRequest clientReq = exchange.getRequest();
String traceId = WebUtils.getTraceId(exchange); String traceId = WebUtils.getTraceId(exchange);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
String msg = traceId + ' ' + pfr.id + " fail"; String msg = traceId + ' ' + pfr.id + " fail";
if (pfr.cause == null) { if (pfr.cause == null) {
// log.error(msg, LogService.BIZ_ID, traceId); LOGGER.error(msg);
log.error(msg);
} else { } else {
// log.error(msg, LogService.BIZ_ID, traceId, pfr.cause); LOGGER.error(msg, pfr.cause);
log.error(msg, pfr.cause);
} }
HttpStatus s = HttpStatus.INTERNAL_SERVER_ERROR; HttpStatus s = HttpStatus.INTERNAL_SERVER_ERROR;
if (!SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) { if (!SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) {
@@ -101,8 +103,13 @@ public class RouteFilter extends FizzWebFilter {
private Mono<Void> doFilter0(ServerWebExchange exchange, WebFilterChain chain) { private Mono<Void> doFilter0(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest req = exchange.getRequest();
String traceId = WebUtils.getTraceId(exchange); String traceId = WebUtils.getTraceId(exchange);
if (LOGGER.isDebugEnabled()) {
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
LOGGER.debug("route filter start");
}
ServerHttpRequest req = exchange.getRequest();
Route route = exchange.getAttribute(WebUtils.ROUTE); Route route = exchange.getAttribute(WebUtils.ROUTE);
HttpHeaders hdrs = null; HttpHeaders hdrs = null;
@@ -112,32 +119,17 @@ public class RouteFilter extends FizzWebFilter {
} }
if (route == null) { if (route == null) {
/*String pathQuery = WebUtils.getClientReqPathQuery(exchange);
return fizzWebClient.send2service(traceId, req.getMethod(), WebUtils.getClientService(exchange), pathQuery, hdrs, req.getBody(), 0, 0, 0)
.flatMap(genServerResponse(exchange));*/
Map.Entry<String, List<String>> pathQueryTemplate = WebUtils.getClientReqPathQueryTemplate(exchange).entrySet().iterator().next(); Map.Entry<String, List<String>> pathQueryTemplate = WebUtils.getClientReqPathQueryTemplate(exchange).entrySet().iterator().next();
return fizzWebClient.send2service(traceId, req.getMethod(), WebUtils.getClientService(exchange), pathQueryTemplate.getKey(), hdrs, req.getBody(), 0, 0, 0, pathQueryTemplate.getValue().toArray(new String[0])) return fizzWebClient.send2service(traceId, req.getMethod(), WebUtils.getClientService(exchange), pathQueryTemplate.getKey(), hdrs, req.getBody(), 0, 0, 0, pathQueryTemplate.getValue().toArray(new String[0]))
.flatMap(genServerResponse(exchange)); .flatMap(genServerResponse(exchange));
} else if (route.type == ApiConfig.Type.SERVICE_DISCOVERY) { } else if (route.type == ApiConfig.Type.SERVICE_DISCOVERY) {
/*String pathQuery = getBackendPathQuery(req, route);
String svc = RegistryCenterService.getServiceNameSpace(route.registryCenter, route.backendService);
return fizzWebClient.send2service(traceId, route.method, svc, pathQuery, hdrs, req.getBody(), route.timeout, route.retryCount, route.retryInterval)
.flatMap(genServerResponse(exchange));*/
Map.Entry<String, List<String>> pathQueryTemplate = getBackendPathQueryTemplate(req, route).entrySet().iterator().next(); Map.Entry<String, List<String>> pathQueryTemplate = getBackendPathQueryTemplate(req, route).entrySet().iterator().next();
String svc = RegistryCenterService.getServiceNameSpace(route.registryCenter, route.backendService); String svc = RegistryCenterService.getServiceNameSpace(route.registryCenter, route.backendService);
return fizzWebClient.send2service(traceId, route.method, svc, pathQueryTemplate.getKey(), hdrs, req.getBody(), route.timeout, route.retryCount, route.retryInterval, pathQueryTemplate.getValue().toArray(new String[0])) return fizzWebClient.send2service(traceId, route.method, svc, pathQueryTemplate.getKey(), hdrs, req.getBody(), route.timeout, route.retryCount, route.retryInterval, pathQueryTemplate.getValue().toArray(new String[0]))
.flatMap(genServerResponse(exchange)); .flatMap(genServerResponse(exchange));
} else if (route.type == ApiConfig.Type.REVERSE_PROXY) { } else if (route.type == ApiConfig.Type.REVERSE_PROXY) {
/*String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort)
.append(getBackendPathQuery(req, route))
.toString();
return fizzWebClient.send(traceId, route.method, uri, hdrs, req.getBody(), route.timeout, route.retryCount, route.retryInterval)
.flatMap(genServerResponse(exchange));*/
Map.Entry<String, List<String>> pathQueryTemplate = getBackendPathQueryTemplate(req, route).entrySet().iterator().next(); Map.Entry<String, List<String>> pathQueryTemplate = getBackendPathQueryTemplate(req, route).entrySet().iterator().next();
String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort) String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort)
.append(pathQueryTemplate.getKey()) .append(pathQueryTemplate.getKey())
@@ -150,20 +142,6 @@ public class RouteFilter extends FizzWebFilter {
} }
} }
private String getBackendPathQuery(ServerHttpRequest request, Route route) {
String qry = route.query;
if (qry == null) {
MultiValueMap<String, String> queryParams = request.getQueryParams();
if (queryParams.isEmpty()) {
return route.backendPath;
} else {
return route.backendPath + Consts.S.QUESTION + WebUtils.toQueryString(queryParams);
}
} else {
return route.backendPath + Consts.S.QUESTION + qry;
}
}
private Map<String, List<String>> getBackendPathQueryTemplate(ServerHttpRequest request, Route route) { private Map<String, List<String>> getBackendPathQueryTemplate(ServerHttpRequest request, Route route) {
String qry = route.query; String qry = route.query;
if (qry == null) { if (qry == null) {
@@ -183,6 +161,7 @@ public class RouteFilter extends FizzWebFilter {
private Function<ClientResponse, Mono<? extends Void>> genServerResponse(ServerWebExchange exchange) { private Function<ClientResponse, Mono<? extends Void>> genServerResponse(ServerWebExchange exchange) {
return remoteResp -> { return remoteResp -> {
String traceId = WebUtils.getTraceId(exchange);
ServerHttpResponse clientResp = exchange.getResponse(); ServerHttpResponse clientResp = exchange.getResponse();
clientResp.setStatusCode(remoteResp.statusCode()); clientResp.setStatusCode(remoteResp.statusCode());
HttpHeaders clientRespHeaders = clientResp.getHeaders(); HttpHeaders clientRespHeaders = clientResp.getHeaders();
@@ -203,24 +182,36 @@ public class RouteFilter extends FizzWebFilter {
} }
} }
); );
if (log.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder(); StringBuilder b = ThreadContext.getStringBuilder();
String traceId = WebUtils.getTraceId(exchange);
WebUtils.response2stringBuilder(traceId, remoteResp, b); WebUtils.response2stringBuilder(traceId, remoteResp, b);
// log.debug(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(b.toString()); LOGGER.debug(b.toString());
} }
return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers())) return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers()))
.doOnError(throwable -> cleanup(remoteResp)).doOnCancel(() -> cleanup(remoteResp)); .doOnError(
t -> {
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
exchange.getAttributes().put("remoteResp", remoteResp);
LOGGER.error("response client error", t);
}
)
.doOnCancel(
() -> {
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
exchange.getAttributes().put("remoteResp", remoteResp);
LOGGER.error("client signal cancel");
// cleanup(remoteResp);
}
);
}; };
} }
private void cleanup(ClientResponse clientResponse) { // private void cleanup(ClientResponse clientResponse) {
if (clientResponse != null) { // if (clientResponse != null) {
clientResponse.bodyToMono(Void.class).subscribe(); // clientResponse.bodyToMono(Void.class).subscribe();
} // }
} // }
private Mono<Void> dubboRpc(ServerWebExchange exchange, Route route) { private Mono<Void> dubboRpc(ServerWebExchange exchange, Route route) {
final String[] ls = {null}; final String[] ls = {null};
@@ -272,9 +263,8 @@ public class RouteFilter extends FizzWebFilter {
if (ls[0] != null) { if (ls[0] != null) {
b.append('\n').append(ls[0]); b.append('\n').append(ls[0]);
} }
// log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
log.error(b.toString(), t); LOGGER.error(b.toString(), t);
} }
) )
; ;

View File

@@ -17,6 +17,9 @@
package com.fizzgate.plugin; package com.fizzgate.plugin;
import com.fizzgate.util.Consts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain; import org.springframework.web.server.WebFilterChain;
@@ -38,6 +41,8 @@ import java.util.Map;
public final class FizzPluginFilterChain { public final class FizzPluginFilterChain {
private static final Logger LOGGER = LoggerFactory.getLogger(FizzPluginFilterChain.class);
private static final String pluginConfigsIt = "pcsit@"; private static final String pluginConfigsIt = "pcsit@";
public static final String WEB_FILTER_CHAIN = "wfc@"; public static final String WEB_FILTER_CHAIN = "wfc@";
@@ -62,6 +67,13 @@ public final class FizzPluginFilterChain {
if (it.hasNext()) { if (it.hasNext()) {
PluginConfig pc = it.next(); PluginConfig pc = it.next();
FizzPluginFilter pf = Fizz.context.getBean(pc.plugin, FizzPluginFilter.class); FizzPluginFilter pf = Fizz.context.getBean(pc.plugin, FizzPluginFilter.class);
String traceId = WebUtils.getTraceId(exchange);
if (LOGGER.isDebugEnabled()) {
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
LOGGER.debug("{} start", pc.plugin);
}
Mono<Void> m = pf.filter(exchange, pc.config); Mono<Void> m = pf.filter(exchange, pc.config);
if (pf instanceof PluginFilter) { if (pf instanceof PluginFilter) {
boolean f = false; boolean f = false;

View File

@@ -43,7 +43,7 @@ import java.util.Map;
@Deprecated @Deprecated
public abstract class PluginFilter implements FizzPluginFilter { public abstract class PluginFilter implements FizzPluginFilter {
private static final Logger log = LoggerFactory.getLogger(PluginFilter.class); private final Logger LOGGER = LoggerFactory.getLogger(getClass());
@Override @Override
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config) { public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config) {
@@ -55,9 +55,8 @@ public abstract class PluginFilter implements FizzPluginFilter {
FilterResult pfr = WebUtils.getPrevFilterResult(exchange); FilterResult pfr = WebUtils.getPrevFilterResult(exchange);
String traceId = WebUtils.getTraceId(exchange); String traceId = WebUtils.getTraceId(exchange);
ThreadContext.put(Consts.TRACE_ID, traceId); ThreadContext.put(Consts.TRACE_ID, traceId);
if (log.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
// log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, traceId); LOGGER.debug("{} execute {}", pfr.id, pfr.success ? "success" : "fail");
log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"));
} }
if (pfr.success) { if (pfr.success) {
return doFilter(exchange, config, fixedConfig); return doFilter(exchange, config, fixedConfig);
@@ -65,9 +64,9 @@ public abstract class PluginFilter implements FizzPluginFilter {
if (WebUtils.getDirectResponse(exchange) == null) { // should not reach here if (WebUtils.getDirectResponse(exchange) == null) { // should not reach here
String msg = traceId + ' ' + pfr.id + " fail"; String msg = traceId + ' ' + pfr.id + " fail";
if (pfr.cause == null) { if (pfr.cause == null) {
log.error(msg); LOGGER.error(msg);
} else { } else {
log.error(msg, pfr.cause); LOGGER.error(msg, pfr.cause);
} }
HttpStatus s = HttpStatus.OK; HttpStatus s = HttpStatus.OK;
if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) { if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) {

View File

@@ -56,7 +56,6 @@ public class AuthPluginFilter extends PluginFilter {
r -> { r -> {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
String traceId = WebUtils.getTraceId(exchange); String traceId = WebUtils.getTraceId(exchange);
// log.debug("{} req auth: {}", traceId, r, LogService.BIZ_ID, traceId);
ThreadContext.put(Consts.TRACE_ID, traceId); ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug("{} req auth: {}", traceId, r); log.debug("{} req auth: {}", traceId, r);
} }

View File

@@ -46,13 +46,19 @@ import java.util.Map;
@Component(RequestBodyPlugin.REQUEST_BODY_PLUGIN) @Component(RequestBodyPlugin.REQUEST_BODY_PLUGIN)
public class RequestBodyPlugin implements FizzPluginFilter { public class RequestBodyPlugin implements FizzPluginFilter {
private static final Logger log = LoggerFactory.getLogger(RequestBodyPlugin.class); private final Logger LOGGER = LoggerFactory.getLogger(getClass());
public static final String REQUEST_BODY_PLUGIN = "requestBodyPlugin"; public static final String REQUEST_BODY_PLUGIN = "requestBodyPlugin";
@Override @Override
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config) { public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config) {
// String traceId = WebUtils.getTraceId(exchange);
// if (LOGGER.isDebugEnabled()) {
// org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
// LOGGER.debug("{} start", getClass().getSimpleName());
// }
ServerHttpRequest req = exchange.getRequest(); ServerHttpRequest req = exchange.getRequest();
if (req instanceof FizzServerHttpRequestDecorator) { if (req instanceof FizzServerHttpRequestDecorator) {
return doFilter(exchange, config); return doFilter(exchange, config);
@@ -76,18 +82,21 @@ public class RequestBodyPlugin implements FizzPluginFilter {
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) { if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
newExchange = new FizzServerWebExchangeDecorator(mutatedExchange); newExchange = new FizzServerWebExchangeDecorator(mutatedExchange);
} }
if (log.isDebugEnabled()) { // if (LOGGER.isDebugEnabled()) {
String traceId = WebUtils.getTraceId(exchange); // ThreadContext.put(Consts.TRACE_ID, traceId);
// log.debug("{} request is decorated", traceId, LogService.BIZ_ID, traceId); // LOGGER.debug("{} request is decorated", traceId);
ThreadContext.put(Consts.TRACE_ID, traceId); // }
log.debug("{} request is decorated", traceId);
}
return doFilter(newExchange, config); return doFilter(newExchange, config);
} }
); );
} }
public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config) { public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config) {
String traceId = WebUtils.getTraceId(exchange);
if (LOGGER.isDebugEnabled()) {
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
LOGGER.debug("{} end", getClass().getSimpleName());
}
return FizzPluginFilterChain.next(exchange); return FizzPluginFilterChain.next(exchange);
} }
} }

View File

@@ -90,6 +90,7 @@ public class StatPluginFilter extends PluginFilter {
accessStat.reqTime = System.currentTimeMillis(); accessStat.reqTime = System.currentTimeMillis();
accessStat.reqs++; accessStat.reqs++;
if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
LOGGER.debug("update access stat: {}, which request at {}", accessStat, DateTimeUtils.convert(accessStat.reqTime, Consts.DP.DP19)); LOGGER.debug("update access stat: {}, which request at {}", accessStat, DateTimeUtils.convert(accessStat.reqTime, Consts.DP.DP19));
} }
} }

View File

@@ -220,7 +220,6 @@ public class FizzWebClient {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder(); StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(traceId, method, uri, headers, null, b); WebUtils.request2stringBuilder(traceId, method, uri, headers, null, b);
// log.debug(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(b.toString()); log.debug(b.toString());
} }
@@ -243,10 +242,6 @@ public class FizzWebClient {
} }
setHostHeader(uri, hdrs); setHostHeader(uri, hdrs);
if (systemConfig.isFizzWebClientXForwardedForEnable()) { if (systemConfig.isFizzWebClientXForwardedForEnable()) {
List<String> values = hdrs.get(X_FORWARDED_FOR);
/* if (CollectionUtils.isEmpty(values)) {
hdrs.add(X_FORWARDED_FOR, WebUtils.getOriginIp(null));
} */
if (systemConfig.isFizzWebClientXForwardedForAppendGatewayIp()) { if (systemConfig.isFizzWebClientXForwardedForAppendGatewayIp()) {
hdrs.add(X_FORWARDED_FOR, NetworkUtils.getServerIp()); hdrs.add(X_FORWARDED_FOR, NetworkUtils.getServerIp());
} }
@@ -269,8 +264,9 @@ public class FizzWebClient {
Mono<ClientResponse> cr = req.exchange(); Mono<ClientResponse> cr = req.exchange();
if (timeout == 0) { if (timeout == 0) {
if (systemConfig.getRouteTimeout() != 0) { long systemConfigRouteTimeout = systemConfig.getRouteTimeout();
timeout = systemConfig.getRouteTimeout(); if (systemConfigRouteTimeout != 0) {
timeout = systemConfigRouteTimeout;
} }
} }
if (timeout > 0) { if (timeout > 0) {

View File

@@ -17,8 +17,20 @@
package com.fizzgate.stats; package com.fizzgate.stats;
import java.util.*; import com.fizzgate.stats.circuitbreaker.CircuitBreakManager;
import com.fizzgate.stats.circuitbreaker.CircuitBreaker;
import com.fizzgate.util.Consts;
import com.fizzgate.util.ResourceIdUtils;
import com.fizzgate.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.server.ServerWebExchange;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@@ -26,17 +38,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.server.ServerWebExchange;
import com.fizzgate.stats.circuitbreaker.CircuitBreakManager;
import com.fizzgate.stats.circuitbreaker.CircuitBreaker;
import com.fizzgate.util.ResourceIdUtils;
import com.fizzgate.util.WebUtils;
/** /**
* Flow Statistic * Flow Statistic
@@ -524,7 +526,7 @@ public class FlowStat {
long slotInterval = slotIntervalInSec * 1000; long slotInterval = slotIntervalInSec * 1000;
if (resourceId == null) { if (resourceId == null) {
Set<Map.Entry<String, ResourceStat>> entrys = resourceStats.entrySet(); Set<Entry<String, ResourceStat>> entrys = resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) { for (Entry<String, ResourceStat> entry : entrys) {
String rid = entry.getKey(); String rid = entry.getKey();
ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(rid); ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(rid);
@@ -585,7 +587,7 @@ public class FlowStat {
} }
}*/ }*/
for (long i = lastSlotId; i < slotId;) { for (long i = lastSlotId; i < slotId;) {
Set<Map.Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet(); Set<Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) { for (Entry<String, ResourceStat> entry : entrys) {
String resourceId = entry.getKey(); String resourceId = entry.getKey();
ConcurrentMap<Long, TimeSlot> timeSlots = entry.getValue().getTimeSlots(); ConcurrentMap<Long, TimeSlot> timeSlots = entry.getValue().getTimeSlots();
@@ -649,7 +651,7 @@ public class FlowStat {
long curTimeSlotId = stat.currentTimeSlotId(); long curTimeSlotId = stat.currentTimeSlotId();
if (lastTimeSlotId == null || lastTimeSlotId.longValue() != curTimeSlotId) { if (lastTimeSlotId == null || lastTimeSlotId.longValue() != curTimeSlotId) {
// log.debug("PeakConcurrentJob start"); // log.debug("PeakConcurrentJob start");
Set<Map.Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet(); Set<Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) { for (Entry<String, ResourceStat> entry : entrys) {
String resource = entry.getKey(); String resource = entry.getKey();
// log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId, // log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId,

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.1</version> <version>2.7.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.1</version> <version>2.7.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

38
pom.xml
View File

@@ -10,7 +10,7 @@
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version> <reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version> <lettuce.version>5.3.7.RELEASE</lettuce.version>
<nacos.cloud.version>2.2.7.RELEASE</nacos.cloud.version> <nacos.cloud.version>2.2.7.RELEASE</nacos.cloud.version>
<netty.version>4.1.89.Final</netty.version> <netty.version>4.1.90.Final</netty.version>
<httpcore.version>4.4.16</httpcore.version> <httpcore.version>4.4.16</httpcore.version>
<log4j2.version>2.17.2</log4j2.version> <log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version> <slf4j.version>1.7.36</slf4j.version>
@@ -38,7 +38,7 @@
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<name>${project.artifactId}</name> <name>${project.artifactId}</name>
<description>fizz gateway community</description> <description>fizz gateway community</description>
<version>2.7.1</version> <version>2.7.2-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<modules> <modules>
<module>fizz-common</module> <module>fizz-common</module>
@@ -418,7 +418,7 @@
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId> <artifactId>netty-tcnative-classes</artifactId>
<version>${netty-tcnative.version}</version> <version>${netty-tcnative.version}</version>
</dependency> </dependency>
<dependency> <dependency>
@@ -428,7 +428,37 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-tcnative-classes</artifactId> <artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative.version}</version>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative.version}</version>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative.version}</version>
<classifier>osx-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative.version}</version>
<classifier>windows-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
<version>${netty-tcnative.version}</version> <version>${netty-tcnative.version}</version>
</dependency> </dependency>