diff --git a/src/main/java/we/filter/AbsFlowControlFilter.java b/src/main/java/we/filter/AbsFlowControlFilter.java index 26598db..042d6ef 100644 --- a/src/main/java/we/filter/AbsFlowControlFilter.java +++ b/src/main/java/we/filter/AbsFlowControlFilter.java @@ -42,7 +42,7 @@ import javax.annotation.Resource; public abstract class AbsFlowControlFilter extends ProxyAggrFilter { - private static final Logger log = LoggerFactory.getLogger(AbsFlowControlFilter.class); + protected static final Logger log = LoggerFactory.getLogger(AbsFlowControlFilter.class); protected static final String exceed = " exceed "; protected static final String concurrents = " concurrents "; diff --git a/src/main/java/we/filter/GlobalFlowControlFilter.java b/src/main/java/we/filter/GlobalFlowControlFilter.java index baaef36..cf23ea2 100644 --- a/src/main/java/we/filter/GlobalFlowControlFilter.java +++ b/src/main/java/we/filter/GlobalFlowControlFilter.java @@ -22,7 +22,14 @@ import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; +import we.flume.clients.log4j2appender.LogService; import we.stats.ratelimit.ResourceRateLimitConfig; +import we.util.JacksonUtils; +import we.util.WebUtils; + +import java.util.HashMap; +import java.util.Map; /** * @author hongqiaowei @@ -38,34 +45,66 @@ public class GlobalFlowControlFilter extends AbsFlowControlFilter { public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { if (flowControl) { + + // Map traceMap = new HashMap<>(); + LogService.setBizId(exchange.getRequest().getId()); long currentTimeSlot = flowStat.currentTimeSlotId(); + // traceMap.put("currentTimeSlot", currentTimeSlot); + exchange.getAttributes().put(AbsFlowControlFilter.currentTimeSlot, currentTimeSlot); ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); if (config.isEnable()) { + // traceMap.put("globalConfig", "enable conns " + config.concurrents + " and incr now"); boolean concurrentOrRpsExceed = !flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, config.concurrents, config.qps); if (concurrentOrRpsExceed) { + // traceMap.put("globalConfigExceed", "true"); return generateExceedResponse(exchange, config); } } else { + // traceMap.put("noGlobalConfig", "incr now"); flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null); } + // if (log.isDebugEnabled()) { + // log.debug(JacksonUtils.writeValueAsString(traceMap), LogService.BIZ_ID, exchange.getRequest().getId()); + // } + // StringBuilder b = new StringBuilder(); + // WebUtils.request2stringBuilder(exchange, b); + // b.append('\n'); + long start = System.currentTimeMillis(); exchange.getAttributes().put(AbsFlowControlFilter.start, start); return chain.filter(exchange) - .doOnSuccess( - r -> { - inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true); - } - ) - .doOnError( - t -> { - inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false); - } - ) - .doOnCancel( - () -> { - inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false); + // .doOnSuccess( + // r -> { + // // b.append(" succ "); + // // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true); + // } + // ) + // .doOnError( + // t -> { + // // b.append(" errs "); + // // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false); + // } + // ) + // .doOnCancel( + // () -> { + // // b.append(" cans "); + // // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false); + // } + // ) + .doFinally( + s -> { + if (s == SignalType.ON_COMPLETE) { + // b.append(" comps "); + inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true); + } else { + // b.append(" " + s); + inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false); + } + // if (log.isDebugEnabled()) { + // log.debug(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); + // } } ); } diff --git a/src/main/java/we/filter/ServiceFlowControlFilter.java b/src/main/java/we/filter/ServiceFlowControlFilter.java index 605e4d2..3164cb6 100644 --- a/src/main/java/we/filter/ServiceFlowControlFilter.java +++ b/src/main/java/we/filter/ServiceFlowControlFilter.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import we.stats.ratelimit.ResourceRateLimitConfig; import we.util.WebUtils; @@ -57,19 +58,28 @@ public class ServiceFlowControlFilter extends AbsFlowControlFilter { long start = exchange.getAttribute(AbsFlowControlFilter.start); return chain.filter(exchange) - .doOnSuccess( - r -> { - inTheEnd(exchange, service, start, currentTimeSlot, true); - } - ) - .doOnError( - t -> { - inTheEnd(exchange, service, start, currentTimeSlot, false); - } - ) - .doOnCancel( - () -> { - inTheEnd(exchange, service, start, currentTimeSlot, false); + // .doOnSuccess( + // r -> { + // inTheEnd(exchange, service, start, currentTimeSlot, true); + // } + // ) + // .doOnError( + // t -> { + // inTheEnd(exchange, service, start, currentTimeSlot, false); + // } + // ) + // .doOnCancel( + // () -> { + // inTheEnd(exchange, service, start, currentTimeSlot, false); + // } + // ) + .doFinally( + s -> { + if (s == SignalType.ON_COMPLETE) { + inTheEnd(exchange, service, start, currentTimeSlot, true); + } else { + inTheEnd(exchange, service, start, currentTimeSlot, false); + } } ); } diff --git a/src/main/java/we/stats/FlowStat.java b/src/main/java/we/stats/FlowStat.java index a1e47cf..e875659 100644 --- a/src/main/java/we/stats/FlowStat.java +++ b/src/main/java/we/stats/FlowStat.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import we.util.Utils; /** * Flow Statistic @@ -132,6 +133,17 @@ public class FlowStat { return; } ResourceStat resourceStat = getResourceStat(resourceId); + + long conns = resourceStat.getConcurrentRequests().get(); + if (conns == 0) { + if (log.isDebugEnabled()) { + StringBuilder b = new StringBuilder(256); + b.append(timeSlotId + " " + resourceId + " conns 0 before decr it").append('\n'); + Utils.threadCurrentStack2stringBuilder(b); + log.debug(b.toString()); + } + } + resourceStat.decrConcurrentRequest(timeSlotId); } @@ -158,6 +170,9 @@ public class FlowStat { resourceStat = resourceStats.get(resourceId); } else { resourceStat = new ResourceStat(resourceId); + if (log.isDebugEnabled()) { + log.debug("no resource stat for " + resourceId + ", create one " + resourceStat); + } ResourceStat rs = resourceStats.putIfAbsent(resourceId, resourceStat); if (rs != null) { resourceStat = rs; diff --git a/src/test/java/we/stats/ratelimit/RateLimitTests.java b/src/test/java/we/stats/ratelimit/RateLimitTests.java index 576c6bd..2689a57 100644 --- a/src/test/java/we/stats/ratelimit/RateLimitTests.java +++ b/src/test/java/we/stats/ratelimit/RateLimitTests.java @@ -52,7 +52,7 @@ public class RateLimitTests { private ConnectionProvider getConnectionProvider() { return ConnectionProvider .builder("flow-control-cp") - .maxConnections(100) + .maxConnections(500) .pendingAcquireTimeout(Duration.ofMillis(6_000)) .maxIdleTime(Duration.ofMillis(40_000)) .build(); @@ -96,12 +96,15 @@ public class RateLimitTests { } @Test - public void flowControlTests() { + public void flowControlTests() throws InterruptedException { WebClient webClient = getWebClient(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100_000; i++) { + // String uri = "http://12.5.3.8:8600/proxy/fizz" + i + "/fntrol/mock"; + String uri = "http://12.5.3.8:8600/proxy/fizz/fntrol/mock" + i; + System.err.println(i); webClient .method(HttpMethod.GET) - .uri("") + .uri(uri) .headers(hdrs -> {}) .body(Mono.just(""), String.class) .exchange().name("") @@ -110,19 +113,22 @@ public class RateLimitTests { .doOnError(t -> { t.printStackTrace(); }) - .timeout(Duration.ofMillis(6_000)) + .timeout(Duration.ofMillis(16_000)) .flatMap( remoteResp -> { remoteResp.bodyToMono(String.class) .doOnSuccess( s -> { - System.out.println(s); + // System.out.println(s); } ); return Mono.empty(); } - ); + ) + .subscribe() + ; } + Thread.currentThread().join(); } @Test @@ -130,12 +136,12 @@ public class RateLimitTests { FlowStat flowStat = new FlowStat(); - long incrTime = DateTimeUtils.toMillis("2021-01-08 21:28:42.000", Constants.DatetimePattern.DP23); + long incrTime = DateTimeUtils.toMillis("2021-01-08 21:28:42.000", Constants.DatetimePattern.DP23); boolean success = flowStat.incrRequest("resourceX", incrTime, Long.MAX_VALUE, Long.MAX_VALUE); System.err.println("incrTime: " + incrTime + ", success: " + success); long startTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:41.000", Constants.DatetimePattern.DP23); - long endTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:44.000", Constants.DatetimePattern.DP23); + long endTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:44.000", Constants.DatetimePattern.DP23); List resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, endTimeSlot, 3); if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {