update: 210109-02
This commit is contained in:
@@ -126,7 +126,6 @@ public class FlowStatSchedConfig extends SchedConfig {
|
||||
}
|
||||
long recentEndTimeSlot = currentTimeSlot - interval * 1000;
|
||||
long startTimeSlot = recentEndTimeSlot - 10 * 1000;
|
||||
// System.err.println(toDP19(startTimeSlot) + " - " + toDP19(recentEndTimeSlot));
|
||||
List<ResourceTimeWindowStat> resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, recentEndTimeSlot, 10);
|
||||
if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {
|
||||
log.info(toDP19(startTimeSlot) + " - " + toDP19(recentEndTimeSlot) + " no flow stat data");
|
||||
@@ -136,7 +135,8 @@ public class FlowStatSchedConfig extends SchedConfig {
|
||||
resourceTimeWindowStats.forEach(
|
||||
rtws -> {
|
||||
String resource = rtws.getResourceId();
|
||||
int id = resourceRateLimitConfigService.getResourceRateLimitConfig(resource).id;
|
||||
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
|
||||
int id = (config == null ? 0 : config.id);
|
||||
int type;
|
||||
if (ResourceRateLimitConfig.GLOBAL.equals(resource)) {
|
||||
type = ResourceRateLimitConfig.Type.GLOBAL;
|
||||
|
||||
@@ -72,6 +72,8 @@ public class FlowControlFilter extends ProxyAggrFilter {
|
||||
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
|
||||
|
||||
if (flowControl) {
|
||||
String service = WebUtils.getClientService(exchange);
|
||||
String reqPath = WebUtils.getClientReqPath(exchange);
|
||||
long currentTimeSlot = flowStat.currentTimeSlotId();
|
||||
ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
|
||||
ResourceRateLimitConfig globalConfig = rlc;
|
||||
@@ -82,10 +84,8 @@ public class FlowControlFilter extends ProxyAggrFilter {
|
||||
}
|
||||
|
||||
if (!concurrentOrRpsExceed) {
|
||||
String reqPath = WebUtils.getClientReqPath(exchange);
|
||||
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(reqPath);
|
||||
if (rlc == null) {
|
||||
String service = WebUtils.getClientService(exchange);
|
||||
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
|
||||
if (rlc == null) {
|
||||
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
|
||||
@@ -98,15 +98,19 @@ public class FlowControlFilter extends ProxyAggrFilter {
|
||||
}
|
||||
} else { // should not reach here for now
|
||||
concurrentOrRpsExceed = !flowStat.incrRequest(reqPath, currentTimeSlot, rlc.concurrents, rlc.qps);
|
||||
if (!concurrentOrRpsExceed) {
|
||||
flowStat.incrRequest(service, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
if (rlc == null || !rlc.isEnable()) {
|
||||
flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
|
||||
flowStat.incrRequest(service, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
|
||||
// flowStat.incrRequest(reqPath, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
|
||||
} else {
|
||||
log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId());
|
||||
}
|
||||
if (rlc == null) {
|
||||
return chain.filter(exchange);
|
||||
}
|
||||
|
||||
if (concurrentOrRpsExceed) {
|
||||
|
||||
@@ -123,32 +127,34 @@ public class FlowControlFilter extends ProxyAggrFilter {
|
||||
} else {
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ResourceRateLimitConfig rlcCopy = rlc;
|
||||
return chain.filter(exchange)
|
||||
.doAfterTerminate(
|
||||
() -> {
|
||||
inTheEnd(start, globalConfig, rlcCopy, currentTimeSlot, true);
|
||||
inTheEnd(exchange, start, currentTimeSlot, true);
|
||||
}
|
||||
)
|
||||
.doOnError(
|
||||
t -> {
|
||||
inTheEnd(start, globalConfig, rlcCopy, currentTimeSlot, false);
|
||||
inTheEnd(exchange, start, currentTimeSlot, false);
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return chain.filter(exchange);
|
||||
}
|
||||
|
||||
private void inTheEnd(long start, ResourceRateLimitConfig globalConfig, ResourceRateLimitConfig apiOrServiceConfig, long currentTimeSlot, boolean success) {
|
||||
private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) {
|
||||
long spend = System.currentTimeMillis() - start;
|
||||
if (globalConfig != null && globalConfig.isEnable()) {
|
||||
flowStat.decrConcurrentRequest(globalConfig.resource, currentTimeSlot);
|
||||
flowStat.addRequestRT(globalConfig.resource, currentTimeSlot, spend, success);
|
||||
}
|
||||
if (apiOrServiceConfig.type != ResourceRateLimitConfig.Type.GLOBAL && apiOrServiceConfig.type != ResourceRateLimitConfig.Type.SERVICE_DEFAULT) {
|
||||
flowStat.decrConcurrentRequest(apiOrServiceConfig.resource, currentTimeSlot);
|
||||
flowStat.addRequestRT(apiOrServiceConfig.resource, currentTimeSlot, spend, success);
|
||||
}
|
||||
flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
|
||||
flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success);
|
||||
|
||||
String service = WebUtils.getClientService(exchange);
|
||||
flowStat.decrConcurrentRequest(service, currentTimeSlot);
|
||||
flowStat.addRequestRT(service, currentTimeSlot, spend, success);
|
||||
|
||||
// String reqPath = WebUtils.getClientReqPath(exchange);
|
||||
// flowStat.decrConcurrentRequest(reqPath, currentTimeSlot);
|
||||
// flowStat.addRequestRT(reqPath, currentTimeSlot, spend, success);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user