From 4bf1b5057928e5275376b0949481bcceafd92159 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Thu, 14 Jan 2021 15:41:37 +0800 Subject: [PATCH] refactor: flow control filters v2 --- .../java/we/config/FlowControlConfig.java | 37 ++++++++ .../java/we/config/FlowStatSchedConfig.java | 11 +-- .../we/controller/FlowControlController.java | 95 ++++++++++--------- .../java/we/filter/AbsFlowControlFilter.java | 80 ++++++++++++++++ .../we/filter/GlobalFlowControlFilter.java | 90 ++++++------------ .../we/filter/ServiceFlowControlFilter.java | 90 ++++++------------ 6 files changed, 227 insertions(+), 176 deletions(-) create mode 100644 src/main/java/we/config/FlowControlConfig.java create mode 100644 src/main/java/we/filter/AbsFlowControlFilter.java diff --git a/src/main/java/we/config/FlowControlConfig.java b/src/main/java/we/config/FlowControlConfig.java new file mode 100644 index 0000000..4a087d8 --- /dev/null +++ b/src/main/java/we/config/FlowControlConfig.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import we.stats.FlowStat; + +/** + * @author hongqiaowei + */ + +@ConditionalOnProperty(name = "flowControl", havingValue = "true") +@Configuration +public class FlowControlConfig { + + @Bean + public FlowStat flowStat() { + return new FlowStat(); + } +} diff --git a/src/main/java/we/config/FlowStatSchedConfig.java b/src/main/java/we/config/FlowStatSchedConfig.java index 50077ec..a38cb97 100644 --- a/src/main/java/we/config/FlowStatSchedConfig.java +++ b/src/main/java/we/config/FlowStatSchedConfig.java @@ -22,12 +22,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; -// import we.filter.FlowControlFilter; -import we.filter.GlobalFlowControlFilter; import we.flume.clients.log4j2appender.LogService; import we.stats.FlowStat; import we.stats.ResourceTimeWindowStat; @@ -51,10 +48,7 @@ import java.util.concurrent.atomic.AtomicLong; */ @Configuration -// @ConditionalOnProperty(name="flowControl",havingValue = "true") -@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) @EnableScheduling -// @ConfigurationProperties(prefix = "flow-stat-sched") public class FlowStatSchedConfig extends SchedConfig { private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class); @@ -79,8 +73,8 @@ public class FlowStatSchedConfig extends SchedConfig { @Value("${flowControl:false}") private boolean flowControl; - // @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER) - private FlowStat flowStat = GlobalFlowControlFilter.flowStat; + @Resource + private FlowStat flowStat; @Resource private ResourceRateLimitConfigService resourceRateLimitConfigService; @@ -108,7 +102,6 @@ public class FlowStatSchedConfig extends SchedConfig { if (!flowControl) { return; } - // FlowStat flowStat = flowControlFilter.getFlowStat(); if (startTimeSlot == 0) { startTimeSlot = getRecentEndTimeSlot(flowStat); return; diff --git a/src/main/java/we/controller/FlowControlController.java b/src/main/java/we/controller/FlowControlController.java index 8e0943e..b25563d 100644 --- a/src/main/java/we/controller/FlowControlController.java +++ b/src/main/java/we/controller/FlowControlController.java @@ -17,17 +17,16 @@ package we.controller; +import com.alibaba.nacos.api.config.annotation.NacosValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.DependsOn; +import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; -// import we.filter.FlowControlFilter; -import we.filter.GlobalFlowControlFilter; import we.stats.FlowStat; import we.stats.ResourceTimeWindowStat; import we.stats.TimeWindowStat; @@ -47,60 +46,64 @@ import java.util.Map; */ @RestController -@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) @RequestMapping("/admin/flowStat") public class FlowControlController { - private static final Logger log = LoggerFactory.getLogger(FlowControlController.class); + private static final Logger log = LoggerFactory.getLogger(FlowControlController.class); - // @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER) - // private FlowControlFilter flowControlFilter; + @NacosValue(value = "${flowControl:false}", autoRefreshed = true) + @Value("${flowControl:false}") + private boolean flowControl; - private FlowStat flowStat = GlobalFlowControlFilter.flowStat; + @Resource + private FlowStat flowStat; @GetMapping("/globalConcurrentsRps") public Mono globalConcurrentsRps(ServerWebExchange exchange, @RequestParam(value = "recent", required = false, defaultValue = "3") int recent) { - long concurrents = 0; double rps = 0; - Map result = new HashMap<>(); - result.put("concurrents", concurrents); - result.put("rps", rps); + long concurrents = 0; + double rps = 0; + Map result = new HashMap<>(); + result.put("concurrents", concurrents); + result.put("rps", rps); - try { - // FlowStat flowStat = flowControlFilter.getFlowStat(); - long currentTimeSlot = flowStat.currentTimeSlotId(); - long startTimeSlot = currentTimeSlot - recent * 1000; - TimeWindowStat timeWindowStat = null; - List wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent); - if (wins == null || wins.isEmpty()) { - result.put("rps", 0); - } else { - concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL); - result.put("concurrents", concurrents); - timeWindowStat = wins.get(0).getWindows().get(0); - BigDecimal winrps = timeWindowStat.getRps(); - if (winrps == null) { - rps = 0; - } else { - rps = winrps.doubleValue(); - } - result.put("rps", rps); - } - if (log.isDebugEnabled()) { - long compReqs = -1; - if (timeWindowStat != null) { - compReqs = timeWindowStat.getCompReqs(); - } - log.debug(toDP19(startTimeSlot) + " - " + toDP19(currentTimeSlot) + " result: " + JacksonUtils.writeValueAsString(result) + ", complete reqs: " + compReqs); - } + if (flowControl) { + try { + long currentTimeSlot = flowStat.currentTimeSlotId(); + long startTimeSlot = currentTimeSlot - recent * 1000; + TimeWindowStat timeWindowStat = null; + List wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent); + if (wins == null || wins.isEmpty()) { + result.put("rps", 0); + } else { + concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL); + result.put("concurrents", concurrents); + timeWindowStat = wins.get(0).getWindows().get(0); + BigDecimal winrps = timeWindowStat.getRps(); + if (winrps == null) { + rps = 0; + } else { + rps = winrps.doubleValue(); + } + result.put("rps", rps); + } + if (log.isDebugEnabled()) { + long compReqs = -1; + if (timeWindowStat != null) { + compReqs = timeWindowStat.getCompReqs(); + } + log.debug(toDP19(startTimeSlot) + " - " + toDP19(currentTimeSlot) + " result: " + JacksonUtils.writeValueAsString(result) + ", complete reqs: " + compReqs); + } - } catch (Throwable t) { - log.error("get current global concurrents and rps error", t); - } - return Mono.just(JacksonUtils.writeValueAsString(result)); + } catch (Throwable t) { + log.error("get current global concurrents and rps error", t); + } + } + + return Mono.just(JacksonUtils.writeValueAsString(result)); } - private String toDP19(long startTimeSlot) { - return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19); - } + private String toDP19(long startTimeSlot) { + return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19); + } } diff --git a/src/main/java/we/filter/AbsFlowControlFilter.java b/src/main/java/we/filter/AbsFlowControlFilter.java new file mode 100644 index 0000000..26598db --- /dev/null +++ b/src/main/java/we/filter/AbsFlowControlFilter.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.filter; + +import com.alibaba.nacos.api.config.annotation.NacosValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; +import we.flume.clients.log4j2appender.LogService; +import we.stats.FlowStat; +import we.stats.ratelimit.ResourceRateLimitConfig; +import we.stats.ratelimit.ResourceRateLimitConfigService; +import we.util.Constants; +import we.util.ThreadContext; +import we.util.WebUtils; + +import javax.annotation.Resource; + +/** + * @author hongqiaowei + */ + +public abstract class AbsFlowControlFilter extends ProxyAggrFilter { + + private static final Logger log = LoggerFactory.getLogger(AbsFlowControlFilter.class); + + protected static final String exceed = " exceed "; + protected static final String concurrents = " concurrents "; + protected static final String orQps = " or qps "; + protected static final String currentTimeSlot = "currentTimeSlot"; + protected static final String start = "start"; + + @NacosValue(value = "${flowControl:false}", autoRefreshed = true) + @Value("${flowControl:false}") + protected boolean flowControl; + + @Resource + protected ResourceRateLimitConfigService resourceRateLimitConfigService; + + @Resource + protected FlowStat flowStat; + + protected Mono generateExceedResponse(ServerWebExchange exchange, ResourceRateLimitConfig config) { + StringBuilder b = ThreadContext.getStringBuilder(); + b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange)); + b.append(exceed) .append(config.resource) .append(concurrents) .append(config.concurrents).append(orQps).append(config.qps); + log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); + + ServerHttpResponse resp = exchange.getResponse(); + resp.setStatusCode(HttpStatus.OK); + resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, config.responseType); + return resp.writeWith(Mono.just(resp.bufferFactory().wrap(config.responseContent.getBytes()))); + } + + protected void inTheEnd(ServerWebExchange exchange, String resource, long start, long currentTimeSlot, boolean success) { + long spend = System.currentTimeMillis() - start; + flowStat.decrConcurrentRequest(resource, currentTimeSlot); + flowStat.addRequestRT(resource, currentTimeSlot, spend, success); + } +} diff --git a/src/main/java/we/filter/GlobalFlowControlFilter.java b/src/main/java/we/filter/GlobalFlowControlFilter.java index a96db61..baaef36 100644 --- a/src/main/java/we/filter/GlobalFlowControlFilter.java +++ b/src/main/java/we/filter/GlobalFlowControlFilter.java @@ -1,106 +1,74 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + package we.filter; -import com.alibaba.nacos.api.config.annotation.NacosValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; import org.springframework.core.annotation.Order; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; -import we.flume.clients.log4j2appender.LogService; -import we.stats.FlowStat; import we.stats.ratelimit.ResourceRateLimitConfig; -import we.stats.ratelimit.ResourceRateLimitConfigService; -import we.util.Constants; -import we.util.ThreadContext; -import we.util.WebUtils; -import javax.annotation.Resource; +/** + * @author hongqiaowei + */ @Component(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) @Order(-4) -public class GlobalFlowControlFilter extends ProxyAggrFilter { - - private static final Logger log = LoggerFactory.getLogger(GlobalFlowControlFilter.class); - - public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter"; - - private static final String exceed = " exceed "; - private static final String concurrents = " concurrents "; - private static final String orQps = " or qps "; - - @NacosValue(value = "${flowControl:false}", autoRefreshed = true) - @Value("${flowControl:false}") - private boolean flowControl; - - @Resource - private ResourceRateLimitConfigService resourceRateLimitConfigService; - - public static FlowStat flowStat = new FlowStat(); +public class GlobalFlowControlFilter extends AbsFlowControlFilter { + public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter"; @Override public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { if (flowControl) { long currentTimeSlot = flowStat.currentTimeSlotId(); - exchange.getAttributes().put("currentTimeSlot", currentTimeSlot); + exchange.getAttributes().put(AbsFlowControlFilter.currentTimeSlot, currentTimeSlot); ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); if (config.isEnable()) { - // 有流控配置 - - boolean concurrentOrRpsExceed = !flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, config.concurrents, config.qps); if (concurrentOrRpsExceed) { - // 如果超了,直接响应 - - StringBuilder b = ThreadContext.getStringBuilder(); - b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange)); - b.append(exceed) .append(config.resource) .append(concurrents).append(config.concurrents).append(orQps).append(config.qps); - log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); - - ServerHttpResponse resp = exchange.getResponse(); - resp.setStatusCode(HttpStatus.OK); - resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, config.responseType); - return resp.writeWith(Mono.just(resp.bufferFactory().wrap(config.responseContent.getBytes()))); + return generateExceedResponse(exchange, config); } - } else { flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null); } - // 没配置或没超配置 - long start = System.currentTimeMillis(); - exchange.getAttributes().put("start", start); + long start = System.currentTimeMillis(); + exchange.getAttributes().put(AbsFlowControlFilter.start, start); return chain.filter(exchange) .doOnSuccess( r -> { - inTheEnd(exchange, start, currentTimeSlot, true); + inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true); } ) .doOnError( t -> { - inTheEnd(exchange, start, currentTimeSlot, false); + inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false); } ) .doOnCancel( () -> { - inTheEnd(exchange, start, currentTimeSlot, false); + inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false); } ); } - return chain.filter(exchange); } - - private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) { - long spend = System.currentTimeMillis() - start; - flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot); - flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success); - } } diff --git a/src/main/java/we/filter/ServiceFlowControlFilter.java b/src/main/java/we/filter/ServiceFlowControlFilter.java index d7d6748..605e4d2 100644 --- a/src/main/java/we/filter/ServiceFlowControlFilter.java +++ b/src/main/java/we/filter/ServiceFlowControlFilter.java @@ -1,55 +1,45 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + package we.filter; -import com.alibaba.nacos.api.config.annotation.NacosValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.DependsOn; import org.springframework.core.annotation.Order; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; -import we.flume.clients.log4j2appender.LogService; -import we.stats.FlowStat; import we.stats.ratelimit.ResourceRateLimitConfig; -import we.stats.ratelimit.ResourceRateLimitConfigService; -import we.util.Constants; -import we.util.ThreadContext; import we.util.WebUtils; -import javax.annotation.Resource; +/** + * @author hongqiaowei + */ @Component(ServiceFlowControlFilter.SERVICE_FLOW_CONTROL_FILTER) -@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) @Order(-3) -public class ServiceFlowControlFilter extends ProxyAggrFilter { +public class ServiceFlowControlFilter extends AbsFlowControlFilter { - private static final Logger log = LoggerFactory.getLogger(ServiceFlowControlFilter.class); - - public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter"; - - - private static final String exceed = " exceed "; - private static final String concurrents = " concurrents "; - private static final String orQps = " or qps "; - - @NacosValue(value = "${flowControl:false}", autoRefreshed = true) - @Value("${flowControl:false}") - private boolean flowControl; - - private FlowStat flowStat = GlobalFlowControlFilter.flowStat; - - @Resource - private ResourceRateLimitConfigService resourceRateLimitConfigService; + public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter"; @Override public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { + if (flowControl) { - long currentTimeSlot = exchange.getAttribute("currentTimeSlot"); + long currentTimeSlot = exchange.getAttribute(AbsFlowControlFilter.currentTimeSlot); String service = WebUtils.getClientService(exchange); ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(service); @@ -57,52 +47,32 @@ public class ServiceFlowControlFilter extends ProxyAggrFilter { config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT); } if (config == null || !config.isEnable()) { - // 无服务流控配置 flowStat.incrRequest(service, currentTimeSlot, null, null); } else { boolean concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, config.concurrents, config.qps); - if (concurrentOrRpsExceed ) { - StringBuilder b = ThreadContext.getStringBuilder(); - b.append(service).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange)); - b.append(exceed) .append(config.resource) .append(concurrents).append(config.concurrents).append(orQps).append(config.qps); - log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId()); - - ResourceRateLimitConfig gc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); - - ServerHttpResponse resp = exchange.getResponse(); - resp.setStatusCode(HttpStatus.OK); - resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, gc.responseType); - return resp.writeWith(Mono.just(resp.bufferFactory().wrap(gc.responseContent.getBytes()))); + if (concurrentOrRpsExceed) { + return generateExceedResponse(exchange, config); } } - // 没配置或没超配置 - long start = exchange.getAttribute("start"); + long start = exchange.getAttribute(AbsFlowControlFilter.start); return chain.filter(exchange) .doOnSuccess( r -> { - inTheEnd(exchange, start, currentTimeSlot, true); + inTheEnd(exchange, service, start, currentTimeSlot, true); } ) .doOnError( t -> { - inTheEnd(exchange, start, currentTimeSlot, false); + inTheEnd(exchange, service, start, currentTimeSlot, false); } ) .doOnCancel( () -> { - inTheEnd(exchange, start, currentTimeSlot, false); + inTheEnd(exchange, service, start, currentTimeSlot, false); } ); } - return chain.filter(exchange); } - - private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) { - long spend = System.currentTimeMillis() - start; - String service = WebUtils.getClientService(exchange); - flowStat.decrConcurrentRequest(service, currentTimeSlot); - flowStat.addRequestRT(service, currentTimeSlot, spend, success); - } }