fix: handle case which resource may not decr in some error

This commit is contained in:
hongqiaowei
2021-01-09 20:41:44 +08:00
parent 2ecc63642a
commit c27235e9ef

View File

@@ -77,10 +77,8 @@ public class FlowControlFilter extends ProxyAggrFilter {
long currentTimeSlot = flowStat.currentTimeSlotId(); long currentTimeSlot = flowStat.currentTimeSlotId();
ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
ResourceRateLimitConfig globalConfig = rlc; ResourceRateLimitConfig globalConfig = rlc;
long start = System.currentTimeMillis();
boolean incr = false;
boolean concurrentOrRpsExceed = false; boolean concurrentOrRpsExceed = false;
try {
if (rlc != null && rlc.isEnable()) { if (rlc != null && rlc.isEnable()) {
concurrentOrRpsExceed = !flowStat.incrRequest(rlc.resource, currentTimeSlot, rlc.concurrents, rlc.qps); concurrentOrRpsExceed = !flowStat.incrRequest(rlc.resource, currentTimeSlot, rlc.concurrents, rlc.qps);
} }
@@ -114,8 +112,6 @@ try {
log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId()); log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId());
} }
incr = true;
if (concurrentOrRpsExceed) { if (concurrentOrRpsExceed) {
StringBuilder b = ThreadContext.getStringBuilder(); StringBuilder b = ThreadContext.getStringBuilder();
@@ -130,10 +126,10 @@ incr = true;
} else { } else {
long start = System.currentTimeMillis();
return chain.filter(exchange) return chain.filter(exchange)
.doAfterTerminate( .doOnSuccess(
() -> { r -> {
inTheEnd(exchange, start, currentTimeSlot, true); inTheEnd(exchange, start, currentTimeSlot, true);
} }
) )
@@ -141,15 +137,17 @@ incr = true;
t -> { t -> {
inTheEnd(exchange, start, currentTimeSlot, false); inTheEnd(exchange, start, currentTimeSlot, false);
} }
).doOnCancel(
() -> {
inTheEnd(exchange, start, currentTimeSlot, false);
}
)
.doOnDiscard(
Object.class,
o -> { inTheEnd(exchange, start, currentTimeSlot, false); }
); );
} }
} }
catch (Throwable t) {
if (incr) {
inTheEnd(exchange, start, currentTimeSlot, false);
}
}
}
return chain.filter(exchange); return chain.filter(exchange);
} }