diff --git a/src/main/java/we/config/FlowStatSchedConfig.java b/src/main/java/we/config/FlowStatSchedConfig.java index 49c9c5d..18b63d1 100644 --- a/src/main/java/we/config/FlowStatSchedConfig.java +++ b/src/main/java/we/config/FlowStatSchedConfig.java @@ -126,7 +126,6 @@ public class FlowStatSchedConfig extends SchedConfig { } long recentEndTimeSlot = currentTimeSlot - interval * 1000; long startTimeSlot = recentEndTimeSlot - 10 * 1000; - // System.err.println(toDP19(startTimeSlot) + " - " + toDP19(recentEndTimeSlot)); List resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, recentEndTimeSlot, 10); if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) { log.info(toDP19(startTimeSlot) + " - " + toDP19(recentEndTimeSlot) + " no flow stat data"); @@ -136,7 +135,8 @@ public class FlowStatSchedConfig extends SchedConfig { resourceTimeWindowStats.forEach( rtws -> { String resource = rtws.getResourceId(); - int id = resourceRateLimitConfigService.getResourceRateLimitConfig(resource).id; + ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(resource); + int id = (config == null ? 0 : config.id); int type; if (ResourceRateLimitConfig.GLOBAL.equals(resource)) { type = ResourceRateLimitConfig.Type.GLOBAL; diff --git a/src/main/java/we/filter/FlowControlFilter.java b/src/main/java/we/filter/FlowControlFilter.java index bfb6a95..4eb06cf 100644 --- a/src/main/java/we/filter/FlowControlFilter.java +++ b/src/main/java/we/filter/FlowControlFilter.java @@ -72,6 +72,8 @@ public class FlowControlFilter extends ProxyAggrFilter { public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { if (flowControl) { + String service = WebUtils.getClientService(exchange); + String reqPath = WebUtils.getClientReqPath(exchange); long currentTimeSlot = flowStat.currentTimeSlotId(); ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); ResourceRateLimitConfig globalConfig = rlc; @@ -82,10 +84,8 @@ public class FlowControlFilter extends ProxyAggrFilter { } if (!concurrentOrRpsExceed) { - String reqPath = WebUtils.getClientReqPath(exchange); rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(reqPath); if (rlc == null) { - String service = WebUtils.getClientService(exchange); rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(service); if (rlc == null) { rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT); @@ -98,15 +98,19 @@ public class FlowControlFilter extends ProxyAggrFilter { } } else { // should not reach here for now concurrentOrRpsExceed = !flowStat.incrRequest(reqPath, currentTimeSlot, rlc.concurrents, rlc.qps); + if (!concurrentOrRpsExceed) { + flowStat.incrRequest(service, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); + } } } - if (log.isDebugEnabled()) { + if (rlc == null || !rlc.isEnable()) { + flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); + flowStat.incrRequest(service, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); + // flowStat.incrRequest(reqPath, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); + } else { log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId()); } - if (rlc == null) { - return chain.filter(exchange); - } if (concurrentOrRpsExceed) { @@ -123,32 +127,34 @@ public class FlowControlFilter extends ProxyAggrFilter { } else { long start = System.currentTimeMillis(); - ResourceRateLimitConfig rlcCopy = rlc; return chain.filter(exchange) .doAfterTerminate( () -> { - inTheEnd(start, globalConfig, rlcCopy, currentTimeSlot, true); + inTheEnd(exchange, start, currentTimeSlot, true); } ) .doOnError( t -> { - inTheEnd(start, globalConfig, rlcCopy, currentTimeSlot, false); + inTheEnd(exchange, start, currentTimeSlot, false); } ); } } + return chain.filter(exchange); } - private void inTheEnd(long start, ResourceRateLimitConfig globalConfig, ResourceRateLimitConfig apiOrServiceConfig, long currentTimeSlot, boolean success) { + private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) { long spend = System.currentTimeMillis() - start; - if (globalConfig != null && globalConfig.isEnable()) { - flowStat.decrConcurrentRequest(globalConfig.resource, currentTimeSlot); - flowStat.addRequestRT(globalConfig.resource, currentTimeSlot, spend, success); - } - if (apiOrServiceConfig.type != ResourceRateLimitConfig.Type.GLOBAL && apiOrServiceConfig.type != ResourceRateLimitConfig.Type.SERVICE_DEFAULT) { - flowStat.decrConcurrentRequest(apiOrServiceConfig.resource, currentTimeSlot); - flowStat.addRequestRT(apiOrServiceConfig.resource, currentTimeSlot, spend, success); - } + flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); + flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success); + + String service = WebUtils.getClientService(exchange); + flowStat.decrConcurrentRequest(service, currentTimeSlot); + flowStat.addRequestRT(service, currentTimeSlot, spend, success); + + // String reqPath = WebUtils.getClientReqPath(exchange); + // flowStat.decrConcurrentRequest(reqPath, currentTimeSlot); + // flowStat.addRequestRT(reqPath, currentTimeSlot, spend, success); } }