refactor: flow control filters v2

This commit is contained in:
hongqiaowei
2021-01-14 15:41:37 +08:00
parent 1c1c26a390
commit 4bf1b50579
6 changed files with 227 additions and 176 deletions

View File

@@ -0,0 +1,37 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import we.stats.FlowStat;
/**
* @author hongqiaowei
*/
@ConditionalOnProperty(name = "flowControl", havingValue = "true")
@Configuration
public class FlowControlConfig {
@Bean
public FlowStat flowStat() {
return new FlowStat();
}
}

View File

@@ -22,12 +22,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
// import we.filter.FlowControlFilter;
import we.filter.GlobalFlowControlFilter;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
@@ -51,10 +48,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
@Configuration
// @ConditionalOnProperty(name="flowControl",havingValue = "true")
@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@EnableScheduling
// @ConfigurationProperties(prefix = "flow-stat-sched")
public class FlowStatSchedConfig extends SchedConfig {
private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
@@ -79,8 +73,8 @@ public class FlowStatSchedConfig extends SchedConfig {
@Value("${flowControl:false}")
private boolean flowControl;
// @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER)
private FlowStat flowStat = GlobalFlowControlFilter.flowStat;
@Resource
private FlowStat flowStat;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
@@ -108,7 +102,6 @@ public class FlowStatSchedConfig extends SchedConfig {
if (!flowControl) {
return;
}
// FlowStat flowStat = flowControlFilter.getFlowStat();
if (startTimeSlot == 0) {
startTimeSlot = getRecentEndTimeSlot(flowStat);
return;

View File

@@ -17,17 +17,16 @@
package we.controller;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
// import we.filter.FlowControlFilter;
import we.filter.GlobalFlowControlFilter;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
import we.stats.TimeWindowStat;
@@ -47,60 +46,64 @@ import java.util.Map;
*/
@RestController
@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@RequestMapping("/admin/flowStat")
public class FlowControlController {
private static final Logger log = LoggerFactory.getLogger(FlowControlController.class);
private static final Logger log = LoggerFactory.getLogger(FlowControlController.class);
// @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER)
// private FlowControlFilter flowControlFilter;
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
private FlowStat flowStat = GlobalFlowControlFilter.flowStat;
@Resource
private FlowStat flowStat;
@GetMapping("/globalConcurrentsRps")
public Mono<String> globalConcurrentsRps(ServerWebExchange exchange, @RequestParam(value = "recent", required = false, defaultValue = "3") int recent) {
long concurrents = 0; double rps = 0;
Map<String, Object> result = new HashMap<>();
result.put("concurrents", concurrents);
result.put("rps", rps);
long concurrents = 0;
double rps = 0;
Map<String, Object> result = new HashMap<>();
result.put("concurrents", concurrents);
result.put("rps", rps);
try {
// FlowStat flowStat = flowControlFilter.getFlowStat();
long currentTimeSlot = flowStat.currentTimeSlotId();
long startTimeSlot = currentTimeSlot - recent * 1000;
TimeWindowStat timeWindowStat = null;
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent);
if (wins == null || wins.isEmpty()) {
result.put("rps", 0);
} else {
concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL);
result.put("concurrents", concurrents);
timeWindowStat = wins.get(0).getWindows().get(0);
BigDecimal winrps = timeWindowStat.getRps();
if (winrps == null) {
rps = 0;
} else {
rps = winrps.doubleValue();
}
result.put("rps", rps);
}
if (log.isDebugEnabled()) {
long compReqs = -1;
if (timeWindowStat != null) {
compReqs = timeWindowStat.getCompReqs();
}
log.debug(toDP19(startTimeSlot) + " - " + toDP19(currentTimeSlot) + " result: " + JacksonUtils.writeValueAsString(result) + ", complete reqs: " + compReqs);
}
if (flowControl) {
try {
long currentTimeSlot = flowStat.currentTimeSlotId();
long startTimeSlot = currentTimeSlot - recent * 1000;
TimeWindowStat timeWindowStat = null;
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent);
if (wins == null || wins.isEmpty()) {
result.put("rps", 0);
} else {
concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL);
result.put("concurrents", concurrents);
timeWindowStat = wins.get(0).getWindows().get(0);
BigDecimal winrps = timeWindowStat.getRps();
if (winrps == null) {
rps = 0;
} else {
rps = winrps.doubleValue();
}
result.put("rps", rps);
}
if (log.isDebugEnabled()) {
long compReqs = -1;
if (timeWindowStat != null) {
compReqs = timeWindowStat.getCompReqs();
}
log.debug(toDP19(startTimeSlot) + " - " + toDP19(currentTimeSlot) + " result: " + JacksonUtils.writeValueAsString(result) + ", complete reqs: " + compReqs);
}
} catch (Throwable t) {
log.error("get current global concurrents and rps error", t);
}
return Mono.just(JacksonUtils.writeValueAsString(result));
} catch (Throwable t) {
log.error("get current global concurrents and rps error", t);
}
}
return Mono.just(JacksonUtils.writeValueAsString(result));
}
private String toDP19(long startTimeSlot) {
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
}
private String toDP19(long startTimeSlot) {
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
/**
* @author hongqiaowei
*/
public abstract class AbsFlowControlFilter extends ProxyAggrFilter {
private static final Logger log = LoggerFactory.getLogger(AbsFlowControlFilter.class);
protected static final String exceed = " exceed ";
protected static final String concurrents = " concurrents ";
protected static final String orQps = " or qps ";
protected static final String currentTimeSlot = "currentTimeSlot";
protected static final String start = "start";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
protected boolean flowControl;
@Resource
protected ResourceRateLimitConfigService resourceRateLimitConfigService;
@Resource
protected FlowStat flowStat;
protected Mono<Void> generateExceedResponse(ServerWebExchange exchange, ResourceRateLimitConfig config) {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
b.append(exceed) .append(config.resource) .append(concurrents) .append(config.concurrents).append(orQps).append(config.qps);
log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, config.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(config.responseContent.getBytes())));
}
protected void inTheEnd(ServerWebExchange exchange, String resource, long start, long currentTimeSlot, boolean success) {
long spend = System.currentTimeMillis() - start;
flowStat.decrConcurrentRequest(resource, currentTimeSlot);
flowStat.addRequestRT(resource, currentTimeSlot, spend, success);
}
}

View File

@@ -1,106 +1,74 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
/**
* @author hongqiaowei
*/
@Component(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@Order(-4)
public class GlobalFlowControlFilter extends ProxyAggrFilter {
private static final Logger log = LoggerFactory.getLogger(GlobalFlowControlFilter.class);
public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter";
private static final String exceed = " exceed ";
private static final String concurrents = " concurrents ";
private static final String orQps = " or qps ";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
public static FlowStat flowStat = new FlowStat();
public class GlobalFlowControlFilter extends AbsFlowControlFilter {
public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter";
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
long currentTimeSlot = flowStat.currentTimeSlotId();
exchange.getAttributes().put("currentTimeSlot", currentTimeSlot);
exchange.getAttributes().put(AbsFlowControlFilter.currentTimeSlot, currentTimeSlot);
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
if (config.isEnable()) {
// 有流控配置
boolean concurrentOrRpsExceed = !flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, config.concurrents, config.qps);
if (concurrentOrRpsExceed) {
// 如果超了,直接响应
StringBuilder b = ThreadContext.getStringBuilder();
b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
b.append(exceed) .append(config.resource) .append(concurrents).append(config.concurrents).append(orQps).append(config.qps);
log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, config.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(config.responseContent.getBytes())));
return generateExceedResponse(exchange, config);
}
} else {
flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null);
}
// 没配置或没超配置
long start = System.currentTimeMillis();
exchange.getAttributes().put("start", start);
long start = System.currentTimeMillis();
exchange.getAttributes().put(AbsFlowControlFilter.start, start);
return chain.filter(exchange)
.doOnSuccess(
r -> {
inTheEnd(exchange, start, currentTimeSlot, true);
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true);
}
)
.doOnError(
t -> {
inTheEnd(exchange, start, currentTimeSlot, false);
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
}
)
.doOnCancel(
() -> {
inTheEnd(exchange, start, currentTimeSlot, false);
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
}
);
}
return chain.filter(exchange);
}
private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) {
long spend = System.currentTimeMillis() - start;
flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success);
}
}

View File

@@ -1,55 +1,45 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
/**
* @author hongqiaowei
*/
@Component(ServiceFlowControlFilter.SERVICE_FLOW_CONTROL_FILTER)
@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@Order(-3)
public class ServiceFlowControlFilter extends ProxyAggrFilter {
public class ServiceFlowControlFilter extends AbsFlowControlFilter {
private static final Logger log = LoggerFactory.getLogger(ServiceFlowControlFilter.class);
public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter";
private static final String exceed = " exceed ";
private static final String concurrents = " concurrents ";
private static final String orQps = " or qps ";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
private FlowStat flowStat = GlobalFlowControlFilter.flowStat;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter";
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
long currentTimeSlot = exchange.getAttribute("currentTimeSlot");
long currentTimeSlot = exchange.getAttribute(AbsFlowControlFilter.currentTimeSlot);
String service = WebUtils.getClientService(exchange);
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
@@ -57,52 +47,32 @@ public class ServiceFlowControlFilter extends ProxyAggrFilter {
config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
}
if (config == null || !config.isEnable()) {
// 无服务流控配置
flowStat.incrRequest(service, currentTimeSlot, null, null);
} else {
boolean concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, config.concurrents, config.qps);
if (concurrentOrRpsExceed ) {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(service).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
b.append(exceed) .append(config.resource) .append(concurrents).append(config.concurrents).append(orQps).append(config.qps);
log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
ResourceRateLimitConfig gc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, gc.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(gc.responseContent.getBytes())));
if (concurrentOrRpsExceed) {
return generateExceedResponse(exchange, config);
}
}
// 没配置或没超配置
long start = exchange.getAttribute("start");
long start = exchange.getAttribute(AbsFlowControlFilter.start);
return chain.filter(exchange)
.doOnSuccess(
r -> {
inTheEnd(exchange, start, currentTimeSlot, true);
inTheEnd(exchange, service, start, currentTimeSlot, true);
}
)
.doOnError(
t -> {
inTheEnd(exchange, start, currentTimeSlot, false);
inTheEnd(exchange, service, start, currentTimeSlot, false);
}
)
.doOnCancel(
() -> {
inTheEnd(exchange, start, currentTimeSlot, false);
inTheEnd(exchange, service, start, currentTimeSlot, false);
}
);
}
return chain.filter(exchange);
}
private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) {
long spend = System.currentTimeMillis() - start;
String service = WebUtils.getClientService(exchange);
flowStat.decrConcurrentRequest(service, currentTimeSlot);
flowStat.addRequestRT(service, currentTimeSlot, spend, success);
}
}