From d81f178b35e08cb6cd5f00f5d8737107c7a3a0aa Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Thu, 14 Jan 2021 13:34:59 +0800 Subject: [PATCH] refactor: flow control filters v1 --- .../java/we/config/FlowStatSchedConfig.java | 11 +- .../we/controller/FlowControlController.java | 13 +- .../java/we/filter/FlowControlFilter.java | 396 ++++++++++-------- .../we/filter/GlobalFlowControlFilter.java | 106 +++++ .../we/filter/ServiceFlowControlFilter.java | 108 +++++ src/main/java/we/stats/FlowStat.java | 16 + src/main/java/we/stats/ResourceStat.java | 10 + 7 files changed, 474 insertions(+), 186 deletions(-) create mode 100644 src/main/java/we/filter/GlobalFlowControlFilter.java create mode 100644 src/main/java/we/filter/ServiceFlowControlFilter.java diff --git a/src/main/java/we/config/FlowStatSchedConfig.java b/src/main/java/we/config/FlowStatSchedConfig.java index d9b4196..50077ec 100644 --- a/src/main/java/we/config/FlowStatSchedConfig.java +++ b/src/main/java/we/config/FlowStatSchedConfig.java @@ -26,7 +26,8 @@ import org.springframework.context.annotation.DependsOn; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; -import we.filter.FlowControlFilter; +// import we.filter.FlowControlFilter; +import we.filter.GlobalFlowControlFilter; import we.flume.clients.log4j2appender.LogService; import we.stats.FlowStat; import we.stats.ResourceTimeWindowStat; @@ -51,7 +52,7 @@ import java.util.concurrent.atomic.AtomicLong; @Configuration // @ConditionalOnProperty(name="flowControl",havingValue = "true") -@DependsOn(FlowControlFilter.FLOW_CONTROL_FILTER) +@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) @EnableScheduling // @ConfigurationProperties(prefix = "flow-stat-sched") public class FlowStatSchedConfig extends SchedConfig { @@ -78,8 +79,8 @@ public class FlowStatSchedConfig extends SchedConfig { @Value("${flowControl:false}") private boolean flowControl; - @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER) - private FlowControlFilter flowControlFilter; + // @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER) + private FlowStat flowStat = GlobalFlowControlFilter.flowStat; @Resource private ResourceRateLimitConfigService resourceRateLimitConfigService; @@ -107,7 +108,7 @@ public class FlowStatSchedConfig extends SchedConfig { if (!flowControl) { return; } - FlowStat flowStat = flowControlFilter.getFlowStat(); + // FlowStat flowStat = flowControlFilter.getFlowStat(); if (startTimeSlot == 0) { startTimeSlot = getRecentEndTimeSlot(flowStat); return; diff --git a/src/main/java/we/controller/FlowControlController.java b/src/main/java/we/controller/FlowControlController.java index 0a1ae75..8e0943e 100644 --- a/src/main/java/we/controller/FlowControlController.java +++ b/src/main/java/we/controller/FlowControlController.java @@ -26,7 +26,8 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; -import we.filter.FlowControlFilter; +// import we.filter.FlowControlFilter; +import we.filter.GlobalFlowControlFilter; import we.stats.FlowStat; import we.stats.ResourceTimeWindowStat; import we.stats.TimeWindowStat; @@ -46,14 +47,16 @@ import java.util.Map; */ @RestController -@DependsOn(FlowControlFilter.FLOW_CONTROL_FILTER) +@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) @RequestMapping("/admin/flowStat") public class FlowControlController { private static final Logger log = LoggerFactory.getLogger(FlowControlController.class); - @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER) - private FlowControlFilter flowControlFilter; + // @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER) + // private FlowControlFilter flowControlFilter; + + private FlowStat flowStat = GlobalFlowControlFilter.flowStat; @GetMapping("/globalConcurrentsRps") public Mono globalConcurrentsRps(ServerWebExchange exchange, @RequestParam(value = "recent", required = false, defaultValue = "3") int recent) { @@ -64,7 +67,7 @@ public class FlowControlController { result.put("rps", rps); try { - FlowStat flowStat = flowControlFilter.getFlowStat(); + // FlowStat flowStat = flowControlFilter.getFlowStat(); long currentTimeSlot = flowStat.currentTimeSlotId(); long startTimeSlot = currentTimeSlot - recent * 1000; TimeWindowStat timeWindowStat = null; diff --git a/src/main/java/we/filter/FlowControlFilter.java b/src/main/java/we/filter/FlowControlFilter.java index 37236ad..2bc8b7a 100644 --- a/src/main/java/we/filter/FlowControlFilter.java +++ b/src/main/java/we/filter/FlowControlFilter.java @@ -1,176 +1,220 @@ -/* - * Copyright (C) 2020 the original author or authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package we.filter; - -import com.alibaba.nacos.api.config.annotation.NacosValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.annotation.Order; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.server.reactive.ServerHttpResponse; -import org.springframework.stereotype.Component; -import org.springframework.web.server.ServerWebExchange; -import org.springframework.web.server.WebFilterChain; -import reactor.core.publisher.Mono; -import we.flume.clients.log4j2appender.LogService; -import we.stats.FlowStat; -import we.stats.ratelimit.ResourceRateLimitConfig; -import we.stats.ratelimit.ResourceRateLimitConfigService; -import we.util.Constants; -import we.util.ThreadContext; -import we.util.WebUtils; - -import javax.annotation.Resource; - -/** - * @author hongqiaowei - */ - -@Component(FlowControlFilter.FLOW_CONTROL_FILTER) -@Order(-1) -public class FlowControlFilter extends ProxyAggrFilter { - - public static final String FLOW_CONTROL_FILTER = "flowControlFilter"; - - private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class); - - private static final String exceed = " exceed "; - private static final String concurrents = " concurrents "; - private static final String orQps = " or qps "; - - @NacosValue(value = "${flowControl:false}", autoRefreshed = true) - @Value("${flowControl:false}") - private boolean flowControl; - - @Resource - private ResourceRateLimitConfigService resourceRateLimitConfigService; - - private FlowStat flowStat = new FlowStat(); - - public FlowStat getFlowStat() { - return flowStat; - } - - @Override - public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { - - if (flowControl) { - String service = WebUtils.getClientService(exchange); - String reqPath = WebUtils.getClientReqPath(exchange); - long currentTimeSlot = flowStat.currentTimeSlotId(); - ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); - ResourceRateLimitConfig globalConfig = rlc; - - boolean concurrentOrRpsExceed = false; - boolean globalExceed = concurrentOrRpsExceed; - if (rlc.isEnable()) { - concurrentOrRpsExceed = !flowStat.incrRequest(rlc.resource, currentTimeSlot, rlc.concurrents, rlc.qps); - globalExceed = concurrentOrRpsExceed; - } - - if (!concurrentOrRpsExceed) { - rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(reqPath); - if (rlc == null) { - rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(service); - if (rlc == null) { - rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT); - if (rlc == null || !rlc.isEnable()) { - } else { - concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps); - // if (!concurrentOrRpsExceed) { - // flowStat.incrRequest(reqPath, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); - // } - } - } else { - concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps); - // if (!concurrentOrRpsExceed) { - // flowStat.incrRequest(reqPath, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); - // } - } - } else { // should not reach here for now - concurrentOrRpsExceed = !flowStat.incrRequest(reqPath, currentTimeSlot, rlc.concurrents, rlc.qps); - if (!concurrentOrRpsExceed) { - flowStat.incrRequest(service, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); - } - } - } - - if ( !globalConfig.isEnable() && ( rlc == null || (rlc.type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT && !rlc.isEnable()) ) ) { - flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); - flowStat.incrRequest(service, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); - // flowStat.incrRequest(reqPath, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE); - } else { - log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId()); - } - - if (concurrentOrRpsExceed) { - if (!globalExceed) { - flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); - } - - StringBuilder b = ThreadContext.getStringBuilder(); - b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange)); - b.append(exceed) .append(rlc.resource) .append(concurrents).append(rlc.concurrents).append(orQps).append(rlc.qps); - log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); - - ServerHttpResponse resp = exchange.getResponse(); - resp.setStatusCode(HttpStatus.OK); - resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType); - return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes()))); - - } else { - - long start = System.currentTimeMillis(); - return chain.filter(exchange) - .doOnSuccess( - r -> { - inTheEnd(exchange, start, currentTimeSlot, true); - } - ) - .doOnError( - t -> { - inTheEnd(exchange, start, currentTimeSlot, false); - } - ) - .doOnCancel( - () -> { - inTheEnd(exchange, start, currentTimeSlot, false); - } - ); - } - } - - return chain.filter(exchange); - } - - private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) { - long spend = System.currentTimeMillis() - start; - flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); - flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success); - - String service = WebUtils.getClientService(exchange); - flowStat.decrConcurrentRequest(service, currentTimeSlot); - flowStat.addRequestRT(service, currentTimeSlot, spend, success); - - // String reqPath = WebUtils.getClientReqPath(exchange); - // flowStat.decrConcurrentRequest(reqPath, currentTimeSlot); - // flowStat.addRequestRT(reqPath, currentTimeSlot, spend, success); - } -} +// /* +// * Copyright (C) 2020 the original author or authors. +// * +// * This program is free software: you can redistribute it and/or modify +// * it under the terms of the GNU General Public License as published by +// * the Free Software Foundation, either version 3 of the License, or +// * any later version. +// * +// * This program is distributed in the hope that it will be useful, +// * but WITHOUT ANY WARRANTY; without even the implied warranty of +// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// * GNU General Public License for more details. +// * +// * You should have received a copy of the GNU General Public License +// * along with this program. If not, see . +// */ +// +// package we.filter; +// +// import com.alibaba.nacos.api.config.annotation.NacosValue; +// import org.apache.commons.lang.text.StrBuilder; +// import org.slf4j.Logger; +// import org.slf4j.LoggerFactory; +// import org.springframework.beans.factory.annotation.Value; +// import org.springframework.core.annotation.Order; +// import org.springframework.http.HttpHeaders; +// import org.springframework.http.HttpStatus; +// import org.springframework.http.server.reactive.ServerHttpResponse; +// import org.springframework.stereotype.Component; +// import org.springframework.web.server.ServerWebExchange; +// import org.springframework.web.server.WebFilterChain; +// import reactor.core.publisher.Mono; +// import we.flume.clients.log4j2appender.LogService; +// import we.stats.FlowStat; +// import we.stats.ratelimit.ResourceRateLimitConfig; +// import we.stats.ratelimit.ResourceRateLimitConfigService; +// import we.util.Constants; +// import we.util.JacksonUtils; +// import we.util.ThreadContext; +// import we.util.WebUtils; +// +// import javax.annotation.Resource; +// import java.util.HashMap; +// import java.util.Map; +// +// /** +// * @author hongqiaowei +// */ +// +// @Component(FlowControlFilter.FLOW_CONTROL_FILTER) +// @Order(-1) +// public class FlowControlFilter extends ProxyAggrFilter { +// +// public static final String FLOW_CONTROL_FILTER = "flowControlFilter"; +// +// private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class); +// +// private static final String exceed = " exceed "; +// private static final String concurrents = " concurrents "; +// private static final String orQps = " or qps "; +// +// @NacosValue(value = "${flowControl:false}", autoRefreshed = true) +// @Value("${flowControl:false}") +// private boolean flowControl; +// +// @Resource +// private ResourceRateLimitConfigService resourceRateLimitConfigService; +// +// private FlowStat flowStat = new FlowStat(); +// +// public FlowStat getFlowStat() { +// return flowStat; +// } +// +// @Override +// public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { +// +// if (flowControl) { +// +// Map m = new HashMap<>(); +// +// String service = WebUtils.getClientService(exchange); +// String reqPath = WebUtils.getClientReqPath(exchange); +// long currentTimeSlot = flowStat.currentTimeSlotId(); +// ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); +// ResourceRateLimitConfig globalConfig = rlc; +// +// boolean concurrentOrRpsExceed = false; +// boolean globalExceed = concurrentOrRpsExceed; +// if (rlc.isEnable()) { +// +// m.put("global enable", "global enable"); +// +// concurrentOrRpsExceed = !flowStat.incrRequest(rlc.resource, currentTimeSlot, rlc.concurrents, rlc.qps); +// globalExceed = concurrentOrRpsExceed; +// } +// +// if (!concurrentOrRpsExceed) { +// +// m.put("aaa", "aaa"); +// +// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(reqPath); +// if (rlc == null) { +// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(service); +// if (rlc == null) { +// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT); +// if (rlc == null || !rlc.isEnable()) { +// m.put("ccc", "ccc"); +// } else { +// m.put("ddd", "ddd"); +// concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps); +// // if (!concurrentOrRpsExceed) { +// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null); +// // } +// } +// } else { +// m.put("eee", "eee"); +// concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps); +// // if (!concurrentOrRpsExceed) { +// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null); +// // } +// } +// } else { // should not reach here for now +// m.put("fff", "fff"); +// concurrentOrRpsExceed = !flowStat.incrRequest(reqPath, currentTimeSlot, rlc.concurrents, rlc.qps); +// if (!concurrentOrRpsExceed) { +// flowStat.incrRequest(service, currentTimeSlot, null, null); +// } +// } +// } +// +// if ( !globalConfig.isEnable() && ( rlc == null || (rlc.type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT && !rlc.isEnable()) ) ) { +// +// m.put("bbb", "bbb"); +// +// flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null); +// flowStat.incrRequest(service, currentTimeSlot, null, null); +// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null); +// } else { +// log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + globalConfig + " or " + rlc, LogService.BIZ_ID, exchange.getRequest().getId()); +// } +// +// m.put("concurrentOrRpsExceed", concurrentOrRpsExceed); +// m.put("globalExceed", globalExceed); +// +// log.info(JacksonUtils.writeValueAsString(m), LogService.BIZ_ID, exchange.getRequest().getId()); +// +// if (concurrentOrRpsExceed) { +// if (!globalExceed) { +// +// StringBuilder b = new StringBuilder(); +// WebUtils.request2stringBuilder(exchange, b); +// b.append("\n aa22"); +// log.info(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); +// +// flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); +// } +// +// StringBuilder b = ThreadContext.getStringBuilder(); +// b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange)); +// b.append(exceed) .append(rlc.resource) .append(concurrents).append(rlc.concurrents).append(orQps).append(rlc.qps); +// log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); +// +// ServerHttpResponse resp = exchange.getResponse(); +// resp.setStatusCode(HttpStatus.OK); +// resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType); +// return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes()))); +// +// // } else { +// +// +// StringBuilder b = new StringBuilder(); +// WebUtils.request2stringBuilder(exchange, b); +// b.append('\n'); +// +// long start = System.currentTimeMillis(); +// return chain.filter(exchange) +// .doOnSuccess( +// r -> { +// b.append(" succ "); +// inTheEnd(exchange, start, currentTimeSlot, true); +// } +// ) +// .doOnError( +// t -> { +// b.append(" errs "); +// inTheEnd(exchange, start, currentTimeSlot, false); +// } +// ) +// .doOnCancel( +// () -> { +// b.append(" cans "); +// inTheEnd(exchange, start, currentTimeSlot, false); +// } +// ) +// .doFinally( +// s -> { +// log.info(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); +// } +// ); +// } +// } +// +// return chain.filter(exchange); +// } +// +// private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) { +// long spend = System.currentTimeMillis() - start; +// flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); +// flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success); +// +// String service = WebUtils.getClientService(exchange); +// flowStat.decrConcurrentRequest(service, currentTimeSlot); +// flowStat.addRequestRT(service, currentTimeSlot, spend, success); +// +// // String reqPath = WebUtils.getClientReqPath(exchange); +// // flowStat.decrConcurrentRequest(reqPath, currentTimeSlot); +// // flowStat.addRequestRT(reqPath, currentTimeSlot, spend, success); +// } +// } diff --git a/src/main/java/we/filter/GlobalFlowControlFilter.java b/src/main/java/we/filter/GlobalFlowControlFilter.java new file mode 100644 index 0000000..a96db61 --- /dev/null +++ b/src/main/java/we/filter/GlobalFlowControlFilter.java @@ -0,0 +1,106 @@ +package we.filter; + +import com.alibaba.nacos.api.config.annotation.NacosValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.annotation.Order; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.stereotype.Component; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.server.WebFilterChain; +import reactor.core.publisher.Mono; +import we.flume.clients.log4j2appender.LogService; +import we.stats.FlowStat; +import we.stats.ratelimit.ResourceRateLimitConfig; +import we.stats.ratelimit.ResourceRateLimitConfigService; +import we.util.Constants; +import we.util.ThreadContext; +import we.util.WebUtils; + +import javax.annotation.Resource; + +@Component(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) +@Order(-4) +public class GlobalFlowControlFilter extends ProxyAggrFilter { + + private static final Logger log = LoggerFactory.getLogger(GlobalFlowControlFilter.class); + + public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter"; + + private static final String exceed = " exceed "; + private static final String concurrents = " concurrents "; + private static final String orQps = " or qps "; + + @NacosValue(value = "${flowControl:false}", autoRefreshed = true) + @Value("${flowControl:false}") + private boolean flowControl; + + @Resource + private ResourceRateLimitConfigService resourceRateLimitConfigService; + + public static FlowStat flowStat = new FlowStat(); + + + @Override + public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { + + if (flowControl) { + long currentTimeSlot = flowStat.currentTimeSlotId(); + exchange.getAttributes().put("currentTimeSlot", currentTimeSlot); + ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); + if (config.isEnable()) { + // 有流控配置 + + + boolean concurrentOrRpsExceed = !flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, config.concurrents, config.qps); + if (concurrentOrRpsExceed) { + // 如果超了,直接响应 + + StringBuilder b = ThreadContext.getStringBuilder(); + b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange)); + b.append(exceed) .append(config.resource) .append(concurrents).append(config.concurrents).append(orQps).append(config.qps); + log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); + + ServerHttpResponse resp = exchange.getResponse(); + resp.setStatusCode(HttpStatus.OK); + resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, config.responseType); + return resp.writeWith(Mono.just(resp.bufferFactory().wrap(config.responseContent.getBytes()))); + } + + } else { + flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null); + } + + // 没配置或没超配置 + long start = System.currentTimeMillis(); + exchange.getAttributes().put("start", start); + return chain.filter(exchange) + .doOnSuccess( + r -> { + inTheEnd(exchange, start, currentTimeSlot, true); + } + ) + .doOnError( + t -> { + inTheEnd(exchange, start, currentTimeSlot, false); + } + ) + .doOnCancel( + () -> { + inTheEnd(exchange, start, currentTimeSlot, false); + } + ); + } + + return chain.filter(exchange); + } + + private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) { + long spend = System.currentTimeMillis() - start; + flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); + flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success); + } +} diff --git a/src/main/java/we/filter/ServiceFlowControlFilter.java b/src/main/java/we/filter/ServiceFlowControlFilter.java new file mode 100644 index 0000000..d7d6748 --- /dev/null +++ b/src/main/java/we/filter/ServiceFlowControlFilter.java @@ -0,0 +1,108 @@ +package we.filter; + +import com.alibaba.nacos.api.config.annotation.NacosValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.DependsOn; +import org.springframework.core.annotation.Order; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.stereotype.Component; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.server.WebFilterChain; +import reactor.core.publisher.Mono; +import we.flume.clients.log4j2appender.LogService; +import we.stats.FlowStat; +import we.stats.ratelimit.ResourceRateLimitConfig; +import we.stats.ratelimit.ResourceRateLimitConfigService; +import we.util.Constants; +import we.util.ThreadContext; +import we.util.WebUtils; + +import javax.annotation.Resource; + +@Component(ServiceFlowControlFilter.SERVICE_FLOW_CONTROL_FILTER) +@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) +@Order(-3) +public class ServiceFlowControlFilter extends ProxyAggrFilter { + + private static final Logger log = LoggerFactory.getLogger(ServiceFlowControlFilter.class); + + public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter"; + + + private static final String exceed = " exceed "; + private static final String concurrents = " concurrents "; + private static final String orQps = " or qps "; + + @NacosValue(value = "${flowControl:false}", autoRefreshed = true) + @Value("${flowControl:false}") + private boolean flowControl; + + private FlowStat flowStat = GlobalFlowControlFilter.flowStat; + + @Resource + private ResourceRateLimitConfigService resourceRateLimitConfigService; + + @Override + public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { + if (flowControl) { + long currentTimeSlot = exchange.getAttribute("currentTimeSlot"); + String service = WebUtils.getClientService(exchange); + ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(service); + + if (config == null) { + config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT); + } + if (config == null || !config.isEnable()) { + // 无服务流控配置 + flowStat.incrRequest(service, currentTimeSlot, null, null); + } else { + boolean concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, config.concurrents, config.qps); + if (concurrentOrRpsExceed ) { + StringBuilder b = ThreadContext.getStringBuilder(); + b.append(service).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange)); + b.append(exceed) .append(config.resource) .append(concurrents).append(config.concurrents).append(orQps).append(config.qps); + log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); + + ResourceRateLimitConfig gc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); + + ServerHttpResponse resp = exchange.getResponse(); + resp.setStatusCode(HttpStatus.OK); + resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, gc.responseType); + return resp.writeWith(Mono.just(resp.bufferFactory().wrap(gc.responseContent.getBytes()))); + } + } + + // 没配置或没超配置 + long start = exchange.getAttribute("start"); + return chain.filter(exchange) + .doOnSuccess( + r -> { + inTheEnd(exchange, start, currentTimeSlot, true); + } + ) + .doOnError( + t -> { + inTheEnd(exchange, start, currentTimeSlot, false); + } + ) + .doOnCancel( + () -> { + inTheEnd(exchange, start, currentTimeSlot, false); + } + ); + } + + return chain.filter(exchange); + } + + private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) { + long spend = System.currentTimeMillis() - start; + String service = WebUtils.getClientService(exchange); + flowStat.decrConcurrentRequest(service, currentTimeSlot); + flowStat.addRequestRT(service, currentTimeSlot, spend, success); + } +} diff --git a/src/main/java/we/stats/FlowStat.java b/src/main/java/we/stats/FlowStat.java index 0edc865..9c183e2 100644 --- a/src/main/java/we/stats/FlowStat.java +++ b/src/main/java/we/stats/FlowStat.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,6 +133,20 @@ public class FlowStat { return; } ResourceStat resourceStat = getResourceStat(resourceId); + + long concurrents = resourceStat.getConcurrentRequests().get(); + if (concurrents == 0) { + // RuntimeException e = new RuntimeException(); + // log.warn(timeSlotId + " " + resourceId + " concurrents is zero", e); + + StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace(); + StringBuilder b = new StringBuilder(timeSlotId + " " + resourceId + " concurrents is zero\n"); + for (int i = 0; i < stackTraces.length; i++) { + b.append(stackTraces[i].toString()).append('\n'); + } + log.warn(b.toString()); + } + resourceStat.decrConcurrentRequest(timeSlotId); } @@ -158,6 +173,7 @@ public class FlowStat { resourceStat = resourceStats.get(resourceId); } else { resourceStat = new ResourceStat(resourceId); + log.info("no resource stat for " + resourceId + " and create one"); ResourceStat rs = resourceStats.putIfAbsent(resourceId, resourceStat); if (rs != null) { resourceStat = rs; diff --git a/src/main/java/we/stats/ResourceStat.java b/src/main/java/we/stats/ResourceStat.java index 2d2c241..7ac98f6 100644 --- a/src/main/java/we/stats/ResourceStat.java +++ b/src/main/java/we/stats/ResourceStat.java @@ -16,6 +16,9 @@ */ package we.stats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.math.BigDecimal; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -30,6 +33,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class ResourceStat { + private static final Logger log = LoggerFactory.getLogger(ResourceStat.class); + /** * Resource ID */ @@ -112,6 +117,11 @@ public class ResourceStat { */ public void decrConcurrentRequest(long timeSlotId) { long conns = this.concurrentRequests.decrementAndGet(); + + if (conns == -1) { + log.warn(timeSlotId + " concurrents is one"); + } + this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns); }