fix: global concurrents -1

This commit is contained in:
hongqiaowei
2021-01-15 10:56:18 +08:00
parent 7745c63ce3
commit 5843dcc651
5 changed files with 106 additions and 36 deletions

View File

@@ -42,7 +42,7 @@ import javax.annotation.Resource;
public abstract class AbsFlowControlFilter extends ProxyAggrFilter {
private static final Logger log = LoggerFactory.getLogger(AbsFlowControlFilter.class);
protected static final Logger log = LoggerFactory.getLogger(AbsFlowControlFilter.class);
protected static final String exceed = " exceed ";
protected static final String concurrents = " concurrents ";

View File

@@ -22,7 +22,14 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.flume.clients.log4j2appender.LogService;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.util.JacksonUtils;
import we.util.WebUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @author hongqiaowei
@@ -38,34 +45,66 @@ public class GlobalFlowControlFilter extends AbsFlowControlFilter {
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
// Map<String, Object> traceMap = new HashMap<>();
LogService.setBizId(exchange.getRequest().getId());
long currentTimeSlot = flowStat.currentTimeSlotId();
// traceMap.put("currentTimeSlot", currentTimeSlot);
exchange.getAttributes().put(AbsFlowControlFilter.currentTimeSlot, currentTimeSlot);
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
if (config.isEnable()) {
// traceMap.put("globalConfig", "enable conns " + config.concurrents + " and incr now");
boolean concurrentOrRpsExceed = !flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, config.concurrents, config.qps);
if (concurrentOrRpsExceed) {
// traceMap.put("globalConfigExceed", "true");
return generateExceedResponse(exchange, config);
}
} else {
// traceMap.put("noGlobalConfig", "incr now");
flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null);
}
// if (log.isDebugEnabled()) {
// log.debug(JacksonUtils.writeValueAsString(traceMap), LogService.BIZ_ID, exchange.getRequest().getId());
// }
// StringBuilder b = new StringBuilder();
// WebUtils.request2stringBuilder(exchange, b);
// b.append('\n');
long start = System.currentTimeMillis();
exchange.getAttributes().put(AbsFlowControlFilter.start, start);
return chain.filter(exchange)
.doOnSuccess(
r -> {
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true);
}
)
.doOnError(
t -> {
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
}
)
.doOnCancel(
() -> {
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
// .doOnSuccess(
// r -> {
// // b.append(" succ ");
// // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true);
// }
// )
// .doOnError(
// t -> {
// // b.append(" errs ");
// // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
// }
// )
// .doOnCancel(
// () -> {
// // b.append(" cans ");
// // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
// }
// )
.doFinally(
s -> {
if (s == SignalType.ON_COMPLETE) {
// b.append(" comps ");
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true);
} else {
// b.append(" " + s);
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
}
// if (log.isDebugEnabled()) {
// log.debug(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
// }
}
);
}

View File

@@ -22,6 +22,7 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.util.WebUtils;
@@ -57,19 +58,28 @@ public class ServiceFlowControlFilter extends AbsFlowControlFilter {
long start = exchange.getAttribute(AbsFlowControlFilter.start);
return chain.filter(exchange)
.doOnSuccess(
r -> {
inTheEnd(exchange, service, start, currentTimeSlot, true);
}
)
.doOnError(
t -> {
inTheEnd(exchange, service, start, currentTimeSlot, false);
}
)
.doOnCancel(
() -> {
inTheEnd(exchange, service, start, currentTimeSlot, false);
// .doOnSuccess(
// r -> {
// inTheEnd(exchange, service, start, currentTimeSlot, true);
// }
// )
// .doOnError(
// t -> {
// inTheEnd(exchange, service, start, currentTimeSlot, false);
// }
// )
// .doOnCancel(
// () -> {
// inTheEnd(exchange, service, start, currentTimeSlot, false);
// }
// )
.doFinally(
s -> {
if (s == SignalType.ON_COMPLETE) {
inTheEnd(exchange, service, start, currentTimeSlot, true);
} else {
inTheEnd(exchange, service, start, currentTimeSlot, false);
}
}
);
}

View File

@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import we.util.Utils;
/**
* Flow Statistic
@@ -132,6 +133,17 @@ public class FlowStat {
return;
}
ResourceStat resourceStat = getResourceStat(resourceId);
long conns = resourceStat.getConcurrentRequests().get();
if (conns == 0) {
if (log.isDebugEnabled()) {
StringBuilder b = new StringBuilder(256);
b.append(timeSlotId + " " + resourceId + " conns 0 before decr it").append('\n');
Utils.threadCurrentStack2stringBuilder(b);
log.debug(b.toString());
}
}
resourceStat.decrConcurrentRequest(timeSlotId);
}
@@ -158,6 +170,9 @@ public class FlowStat {
resourceStat = resourceStats.get(resourceId);
} else {
resourceStat = new ResourceStat(resourceId);
if (log.isDebugEnabled()) {
log.debug("no resource stat for " + resourceId + ", create one " + resourceStat);
}
ResourceStat rs = resourceStats.putIfAbsent(resourceId, resourceStat);
if (rs != null) {
resourceStat = rs;

View File

@@ -52,7 +52,7 @@ public class RateLimitTests {
private ConnectionProvider getConnectionProvider() {
return ConnectionProvider
.builder("flow-control-cp")
.maxConnections(100)
.maxConnections(500)
.pendingAcquireTimeout(Duration.ofMillis(6_000))
.maxIdleTime(Duration.ofMillis(40_000))
.build();
@@ -96,12 +96,15 @@ public class RateLimitTests {
}
@Test
public void flowControlTests() {
public void flowControlTests() throws InterruptedException {
WebClient webClient = getWebClient();
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 100_000; i++) {
// String uri = "http://12.5.3.8:8600/proxy/fizz" + i + "/fntrol/mock";
String uri = "http://12.5.3.8:8600/proxy/fizz/fntrol/mock" + i;
System.err.println(i);
webClient
.method(HttpMethod.GET)
.uri("")
.uri(uri)
.headers(hdrs -> {})
.body(Mono.just(""), String.class)
.exchange().name("")
@@ -110,19 +113,22 @@ public class RateLimitTests {
.doOnError(t -> {
t.printStackTrace();
})
.timeout(Duration.ofMillis(6_000))
.timeout(Duration.ofMillis(16_000))
.flatMap(
remoteResp -> {
remoteResp.bodyToMono(String.class)
.doOnSuccess(
s -> {
System.out.println(s);
// System.out.println(s);
}
);
return Mono.empty();
}
);
)
.subscribe()
;
}
Thread.currentThread().join();
}
@Test
@@ -130,12 +136,12 @@ public class RateLimitTests {
FlowStat flowStat = new FlowStat();
long incrTime = DateTimeUtils.toMillis("2021-01-08 21:28:42.000", Constants.DatetimePattern.DP23);
long incrTime = DateTimeUtils.toMillis("2021-01-08 21:28:42.000", Constants.DatetimePattern.DP23);
boolean success = flowStat.incrRequest("resourceX", incrTime, Long.MAX_VALUE, Long.MAX_VALUE);
System.err.println("incrTime: " + incrTime + ", success: " + success);
long startTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:41.000", Constants.DatetimePattern.DP23);
long endTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:44.000", Constants.DatetimePattern.DP23);
long endTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:44.000", Constants.DatetimePattern.DP23);
List<ResourceTimeWindowStat> resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, endTimeSlot, 3);
if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {