optimize: scheduler executors initialization
This commit is contained in:
@@ -99,6 +99,8 @@ public class FlowStatSchedConfig extends SchedConfig {
|
||||
|
||||
private long startTimeSlot = 0;
|
||||
|
||||
private Map<String, AtomicLong> key2totalBlockMap = new HashMap<>();
|
||||
|
||||
@Scheduled(cron = "${flow-stat-sched.cron}")
|
||||
public void sched() {
|
||||
|
||||
@@ -118,11 +120,11 @@ public class FlowStatSchedConfig extends SchedConfig {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, AtomicLong> key2TotalBlockMap = new HashMap<>(8);
|
||||
key2totalBlockMap.clear();
|
||||
resourceTimeWindowStats.forEach(rtws -> {
|
||||
List<TimeWindowStat> wins = rtws.getWindows();
|
||||
wins.forEach(w -> {
|
||||
AtomicLong totalBlock = key2TotalBlockMap.computeIfAbsent(String.format("%s%s",
|
||||
AtomicLong totalBlock = key2totalBlockMap.computeIfAbsent(String.format("%s%s",
|
||||
ResourceRateLimitConfig.GLOBAL, w.getStartTime()), key -> new AtomicLong(0));
|
||||
totalBlock.addAndGet(w.getBlockRequests());
|
||||
});
|
||||
@@ -154,7 +156,7 @@ public class FlowStatSchedConfig extends SchedConfig {
|
||||
qps = rps.doubleValue();
|
||||
}
|
||||
|
||||
AtomicLong totalBlock = key2TotalBlockMap.get(String.format("%s%s", resource, winStart));
|
||||
AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart));
|
||||
Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests();
|
||||
|
||||
b.append(Constants.Symbol.LEFT_BRACE);
|
||||
@@ -176,7 +178,7 @@ public class FlowStatSchedConfig extends SchedConfig {
|
||||
b.append(Constants.Symbol.RIGHT_BRACE);
|
||||
String msg = b.toString();
|
||||
if ("kafka".equals(dest)) {
|
||||
log.info(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
|
||||
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
|
||||
} else {
|
||||
rt.convertAndSend(queue, msg).subscribe();
|
||||
}
|
||||
|
||||
@@ -66,10 +66,8 @@ public class FlowControlController {
|
||||
try {
|
||||
FlowStat flowStat = flowControlFilter.getFlowStat();
|
||||
long currentTimeSlot = flowStat.currentTimeSlotId();
|
||||
// recent = 30;
|
||||
long startTimeSlot = currentTimeSlot - recent * 1000;
|
||||
TimeWindowStat timeWindowStat = null;
|
||||
// System.err.println("startTimeSlot: " + startTimeSlot + ", currentTimeSlot: " + currentTimeSlot + ", recent: " + recent);
|
||||
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent);
|
||||
if (wins == null || wins.isEmpty()) {
|
||||
result.put("rps", 0);
|
||||
|
||||
@@ -115,7 +115,9 @@ public class FlowControlFilter extends ProxyAggrFilter {
|
||||
if (concurrentOrRpsExceed) {
|
||||
|
||||
if (rlc.type == ResourceRateLimitConfig.Type.API || rlc.type == ResourceRateLimitConfig.Type.SERVICE || rlc.type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT) {
|
||||
flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
|
||||
if (globalConfig != null && globalConfig.isEnable()) {
|
||||
flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
|
||||
}
|
||||
}
|
||||
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
@@ -146,10 +148,6 @@ public class FlowControlFilter extends ProxyAggrFilter {
|
||||
() -> {
|
||||
inTheEnd(exchange, start, currentTimeSlot, false);
|
||||
}
|
||||
)
|
||||
.doOnDiscard(
|
||||
Object.class,
|
||||
o -> { inTheEnd(exchange, start, currentTimeSlot, false); }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user