diff --git a/src/main/java/we/config/FlowStatSchedConfig.java b/src/main/java/we/config/FlowStatSchedConfig.java index cd82a50..9141a7c 100644 --- a/src/main/java/we/config/FlowStatSchedConfig.java +++ b/src/main/java/we/config/FlowStatSchedConfig.java @@ -99,6 +99,8 @@ public class FlowStatSchedConfig extends SchedConfig { private long startTimeSlot = 0; + private Map key2totalBlockMap = new HashMap<>(); + @Scheduled(cron = "${flow-stat-sched.cron}") public void sched() { @@ -118,11 +120,11 @@ public class FlowStatSchedConfig extends SchedConfig { return; } - Map key2TotalBlockMap = new HashMap<>(8); + key2totalBlockMap.clear(); resourceTimeWindowStats.forEach(rtws -> { List wins = rtws.getWindows(); wins.forEach(w -> { - AtomicLong totalBlock = key2TotalBlockMap.computeIfAbsent(String.format("%s%s", + AtomicLong totalBlock = key2totalBlockMap.computeIfAbsent(String.format("%s%s", ResourceRateLimitConfig.GLOBAL, w.getStartTime()), key -> new AtomicLong(0)); totalBlock.addAndGet(w.getBlockRequests()); }); @@ -154,7 +156,7 @@ public class FlowStatSchedConfig extends SchedConfig { qps = rps.doubleValue(); } - AtomicLong totalBlock = key2TotalBlockMap.get(String.format("%s%s", resource, winStart)); + AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart)); Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests(); b.append(Constants.Symbol.LEFT_BRACE); @@ -176,7 +178,7 @@ public class FlowStatSchedConfig extends SchedConfig { b.append(Constants.Symbol.RIGHT_BRACE); String msg = b.toString(); if ("kafka".equals(dest)) { - log.info(msg, LogService.HANDLE_STGY, LogService.toKF(queue)); + log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue)); } else { rt.convertAndSend(queue, msg).subscribe(); } diff --git a/src/main/java/we/controller/FlowControlController.java b/src/main/java/we/controller/FlowControlController.java index 05d1353..0a1ae75 100644 --- a/src/main/java/we/controller/FlowControlController.java +++ b/src/main/java/we/controller/FlowControlController.java @@ -66,10 +66,8 @@ public class FlowControlController { try { FlowStat flowStat = flowControlFilter.getFlowStat(); long currentTimeSlot = flowStat.currentTimeSlotId(); - // recent = 30; long startTimeSlot = currentTimeSlot - recent * 1000; TimeWindowStat timeWindowStat = null; - // System.err.println("startTimeSlot: " + startTimeSlot + ", currentTimeSlot: " + currentTimeSlot + ", recent: " + recent); List wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent); if (wins == null || wins.isEmpty()) { result.put("rps", 0); diff --git a/src/main/java/we/filter/FlowControlFilter.java b/src/main/java/we/filter/FlowControlFilter.java index d892ab0..48f634c 100644 --- a/src/main/java/we/filter/FlowControlFilter.java +++ b/src/main/java/we/filter/FlowControlFilter.java @@ -115,7 +115,9 @@ public class FlowControlFilter extends ProxyAggrFilter { if (concurrentOrRpsExceed) { if (rlc.type == ResourceRateLimitConfig.Type.API || rlc.type == ResourceRateLimitConfig.Type.SERVICE || rlc.type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT) { - flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); + if (globalConfig != null && globalConfig.isEnable()) { + flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); + } } StringBuilder b = ThreadContext.getStringBuilder(); @@ -146,10 +148,6 @@ public class FlowControlFilter extends ProxyAggrFilter { () -> { inTheEnd(exchange, start, currentTimeSlot, false); } - ) - .doOnDiscard( - Object.class, - o -> { inTheEnd(exchange, start, currentTimeSlot, false); } ); } }