refactor: flow control filters v1

This commit is contained in:
hongqiaowei
2021-01-14 13:34:59 +08:00
parent f12e21b572
commit d81f178b35
7 changed files with 474 additions and 186 deletions

View File

@@ -26,7 +26,8 @@ import org.springframework.context.annotation.DependsOn;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import we.filter.FlowControlFilter;
// import we.filter.FlowControlFilter;
import we.filter.GlobalFlowControlFilter;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
@@ -51,7 +52,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Configuration
// @ConditionalOnProperty(name="flowControl",havingValue = "true")
@DependsOn(FlowControlFilter.FLOW_CONTROL_FILTER)
@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@EnableScheduling
// @ConfigurationProperties(prefix = "flow-stat-sched")
public class FlowStatSchedConfig extends SchedConfig {
@@ -78,8 +79,8 @@ public class FlowStatSchedConfig extends SchedConfig {
@Value("${flowControl:false}")
private boolean flowControl;
@Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER)
private FlowControlFilter flowControlFilter;
// @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER)
private FlowStat flowStat = GlobalFlowControlFilter.flowStat;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
@@ -107,7 +108,7 @@ public class FlowStatSchedConfig extends SchedConfig {
if (!flowControl) {
return;
}
FlowStat flowStat = flowControlFilter.getFlowStat();
// FlowStat flowStat = flowControlFilter.getFlowStat();
if (startTimeSlot == 0) {
startTimeSlot = getRecentEndTimeSlot(flowStat);
return;

View File

@@ -26,7 +26,8 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.filter.FlowControlFilter;
// import we.filter.FlowControlFilter;
import we.filter.GlobalFlowControlFilter;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
import we.stats.TimeWindowStat;
@@ -46,14 +47,16 @@ import java.util.Map;
*/
@RestController
@DependsOn(FlowControlFilter.FLOW_CONTROL_FILTER)
@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@RequestMapping("/admin/flowStat")
public class FlowControlController {
private static final Logger log = LoggerFactory.getLogger(FlowControlController.class);
@Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER)
private FlowControlFilter flowControlFilter;
// @Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER)
// private FlowControlFilter flowControlFilter;
private FlowStat flowStat = GlobalFlowControlFilter.flowStat;
@GetMapping("/globalConcurrentsRps")
public Mono<String> globalConcurrentsRps(ServerWebExchange exchange, @RequestParam(value = "recent", required = false, defaultValue = "3") int recent) {
@@ -64,7 +67,7 @@ public class FlowControlController {
result.put("rps", rps);
try {
FlowStat flowStat = flowControlFilter.getFlowStat();
// FlowStat flowStat = flowControlFilter.getFlowStat();
long currentTimeSlot = flowStat.currentTimeSlotId();
long startTimeSlot = currentTimeSlot - recent * 1000;
TimeWindowStat timeWindowStat = null;

View File

@@ -1,176 +1,220 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
/**
* @author hongqiaowei
*/
@Component(FlowControlFilter.FLOW_CONTROL_FILTER)
@Order(-1)
public class FlowControlFilter extends ProxyAggrFilter {
public static final String FLOW_CONTROL_FILTER = "flowControlFilter";
private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class);
private static final String exceed = " exceed ";
private static final String concurrents = " concurrents ";
private static final String orQps = " or qps ";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
private FlowStat flowStat = new FlowStat();
public FlowStat getFlowStat() {
return flowStat;
}
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
String service = WebUtils.getClientService(exchange);
String reqPath = WebUtils.getClientReqPath(exchange);
long currentTimeSlot = flowStat.currentTimeSlotId();
ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
ResourceRateLimitConfig globalConfig = rlc;
boolean concurrentOrRpsExceed = false;
boolean globalExceed = concurrentOrRpsExceed;
if (rlc.isEnable()) {
concurrentOrRpsExceed = !flowStat.incrRequest(rlc.resource, currentTimeSlot, rlc.concurrents, rlc.qps);
globalExceed = concurrentOrRpsExceed;
}
if (!concurrentOrRpsExceed) {
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(reqPath);
if (rlc == null) {
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
if (rlc == null) {
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
if (rlc == null || !rlc.isEnable()) {
} else {
concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps);
// if (!concurrentOrRpsExceed) {
// flowStat.incrRequest(reqPath, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
// }
}
} else {
concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps);
// if (!concurrentOrRpsExceed) {
// flowStat.incrRequest(reqPath, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
// }
}
} else { // should not reach here for now
concurrentOrRpsExceed = !flowStat.incrRequest(reqPath, currentTimeSlot, rlc.concurrents, rlc.qps);
if (!concurrentOrRpsExceed) {
flowStat.incrRequest(service, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
}
}
}
if ( !globalConfig.isEnable() && ( rlc == null || (rlc.type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT && !rlc.isEnable()) ) ) {
flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
flowStat.incrRequest(service, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
// flowStat.incrRequest(reqPath, currentTimeSlot, Long.MAX_VALUE, Long.MAX_VALUE);
} else {
log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId());
}
if (concurrentOrRpsExceed) {
if (!globalExceed) {
flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
}
StringBuilder b = ThreadContext.getStringBuilder();
b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
b.append(exceed) .append(rlc.resource) .append(concurrents).append(rlc.concurrents).append(orQps).append(rlc.qps);
log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
} else {
long start = System.currentTimeMillis();
return chain.filter(exchange)
.doOnSuccess(
r -> {
inTheEnd(exchange, start, currentTimeSlot, true);
}
)
.doOnError(
t -> {
inTheEnd(exchange, start, currentTimeSlot, false);
}
)
.doOnCancel(
() -> {
inTheEnd(exchange, start, currentTimeSlot, false);
}
);
}
}
return chain.filter(exchange);
}
private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) {
long spend = System.currentTimeMillis() - start;
flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success);
String service = WebUtils.getClientService(exchange);
flowStat.decrConcurrentRequest(service, currentTimeSlot);
flowStat.addRequestRT(service, currentTimeSlot, spend, success);
// String reqPath = WebUtils.getClientReqPath(exchange);
// flowStat.decrConcurrentRequest(reqPath, currentTimeSlot);
// flowStat.addRequestRT(reqPath, currentTimeSlot, spend, success);
}
}
// /*
// * Copyright (C) 2020 the original author or authors.
// *
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or
// * any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details.
// *
// * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <https://www.gnu.org/licenses/>.
// */
//
// package we.filter;
//
// import com.alibaba.nacos.api.config.annotation.NacosValue;
// import org.apache.commons.lang.text.StrBuilder;
// import org.slf4j.Logger;
// import org.slf4j.LoggerFactory;
// import org.springframework.beans.factory.annotation.Value;
// import org.springframework.core.annotation.Order;
// import org.springframework.http.HttpHeaders;
// import org.springframework.http.HttpStatus;
// import org.springframework.http.server.reactive.ServerHttpResponse;
// import org.springframework.stereotype.Component;
// import org.springframework.web.server.ServerWebExchange;
// import org.springframework.web.server.WebFilterChain;
// import reactor.core.publisher.Mono;
// import we.flume.clients.log4j2appender.LogService;
// import we.stats.FlowStat;
// import we.stats.ratelimit.ResourceRateLimitConfig;
// import we.stats.ratelimit.ResourceRateLimitConfigService;
// import we.util.Constants;
// import we.util.JacksonUtils;
// import we.util.ThreadContext;
// import we.util.WebUtils;
//
// import javax.annotation.Resource;
// import java.util.HashMap;
// import java.util.Map;
//
// /**
// * @author hongqiaowei
// */
//
// @Component(FlowControlFilter.FLOW_CONTROL_FILTER)
// @Order(-1)
// public class FlowControlFilter extends ProxyAggrFilter {
//
// public static final String FLOW_CONTROL_FILTER = "flowControlFilter";
//
// private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class);
//
// private static final String exceed = " exceed ";
// private static final String concurrents = " concurrents ";
// private static final String orQps = " or qps ";
//
// @NacosValue(value = "${flowControl:false}", autoRefreshed = true)
// @Value("${flowControl:false}")
// private boolean flowControl;
//
// @Resource
// private ResourceRateLimitConfigService resourceRateLimitConfigService;
//
// private FlowStat flowStat = new FlowStat();
//
// public FlowStat getFlowStat() {
// return flowStat;
// }
//
// @Override
// public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
//
// if (flowControl) {
//
// Map<String, Object> m = new HashMap<>();
//
// String service = WebUtils.getClientService(exchange);
// String reqPath = WebUtils.getClientReqPath(exchange);
// long currentTimeSlot = flowStat.currentTimeSlotId();
// ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
// ResourceRateLimitConfig globalConfig = rlc;
//
// boolean concurrentOrRpsExceed = false;
// boolean globalExceed = concurrentOrRpsExceed;
// if (rlc.isEnable()) {
//
// m.put("global enable", "global enable");
//
// concurrentOrRpsExceed = !flowStat.incrRequest(rlc.resource, currentTimeSlot, rlc.concurrents, rlc.qps);
// globalExceed = concurrentOrRpsExceed;
// }
//
// if (!concurrentOrRpsExceed) {
//
// m.put("aaa", "aaa");
//
// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(reqPath);
// if (rlc == null) {
// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
// if (rlc == null) {
// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
// if (rlc == null || !rlc.isEnable()) {
// m.put("ccc", "ccc");
// } else {
// m.put("ddd", "ddd");
// concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps);
// // if (!concurrentOrRpsExceed) {
// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null);
// // }
// }
// } else {
// m.put("eee", "eee");
// concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps);
// // if (!concurrentOrRpsExceed) {
// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null);
// // }
// }
// } else { // should not reach here for now
// m.put("fff", "fff");
// concurrentOrRpsExceed = !flowStat.incrRequest(reqPath, currentTimeSlot, rlc.concurrents, rlc.qps);
// if (!concurrentOrRpsExceed) {
// flowStat.incrRequest(service, currentTimeSlot, null, null);
// }
// }
// }
//
// if ( !globalConfig.isEnable() && ( rlc == null || (rlc.type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT && !rlc.isEnable()) ) ) {
//
// m.put("bbb", "bbb");
//
// flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null);
// flowStat.incrRequest(service, currentTimeSlot, null, null);
// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null);
// } else {
// log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + globalConfig + " or " + rlc, LogService.BIZ_ID, exchange.getRequest().getId());
// }
//
// m.put("concurrentOrRpsExceed", concurrentOrRpsExceed);
// m.put("globalExceed", globalExceed);
//
// log.info(JacksonUtils.writeValueAsString(m), LogService.BIZ_ID, exchange.getRequest().getId());
//
// if (concurrentOrRpsExceed) {
// if (!globalExceed) {
//
// StringBuilder b = new StringBuilder();
// WebUtils.request2stringBuilder(exchange, b);
// b.append("\n aa22");
// log.info(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
//
// flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
// }
//
// StringBuilder b = ThreadContext.getStringBuilder();
// b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
// b.append(exceed) .append(rlc.resource) .append(concurrents).append(rlc.concurrents).append(orQps).append(rlc.qps);
// log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
//
// ServerHttpResponse resp = exchange.getResponse();
// resp.setStatusCode(HttpStatus.OK);
// resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
// return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
//
// // } else {
//
//
// StringBuilder b = new StringBuilder();
// WebUtils.request2stringBuilder(exchange, b);
// b.append('\n');
//
// long start = System.currentTimeMillis();
// return chain.filter(exchange)
// .doOnSuccess(
// r -> {
// b.append(" succ ");
// inTheEnd(exchange, start, currentTimeSlot, true);
// }
// )
// .doOnError(
// t -> {
// b.append(" errs ");
// inTheEnd(exchange, start, currentTimeSlot, false);
// }
// )
// .doOnCancel(
// () -> {
// b.append(" cans ");
// inTheEnd(exchange, start, currentTimeSlot, false);
// }
// )
// .doFinally(
// s -> {
// log.info(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
// }
// );
// }
// }
//
// return chain.filter(exchange);
// }
//
// private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) {
// long spend = System.currentTimeMillis() - start;
// flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
// flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success);
//
// String service = WebUtils.getClientService(exchange);
// flowStat.decrConcurrentRequest(service, currentTimeSlot);
// flowStat.addRequestRT(service, currentTimeSlot, spend, success);
//
// // String reqPath = WebUtils.getClientReqPath(exchange);
// // flowStat.decrConcurrentRequest(reqPath, currentTimeSlot);
// // flowStat.addRequestRT(reqPath, currentTimeSlot, spend, success);
// }
// }

View File

@@ -0,0 +1,106 @@
package we.filter;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
@Component(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@Order(-4)
public class GlobalFlowControlFilter extends ProxyAggrFilter {
private static final Logger log = LoggerFactory.getLogger(GlobalFlowControlFilter.class);
public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter";
private static final String exceed = " exceed ";
private static final String concurrents = " concurrents ";
private static final String orQps = " or qps ";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
public static FlowStat flowStat = new FlowStat();
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
long currentTimeSlot = flowStat.currentTimeSlotId();
exchange.getAttributes().put("currentTimeSlot", currentTimeSlot);
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
if (config.isEnable()) {
// 有流控配置
boolean concurrentOrRpsExceed = !flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, config.concurrents, config.qps);
if (concurrentOrRpsExceed) {
// 如果超了,直接响应
StringBuilder b = ThreadContext.getStringBuilder();
b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
b.append(exceed) .append(config.resource) .append(concurrents).append(config.concurrents).append(orQps).append(config.qps);
log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, config.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(config.responseContent.getBytes())));
}
} else {
flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null);
}
// 没配置或没超配置
long start = System.currentTimeMillis();
exchange.getAttributes().put("start", start);
return chain.filter(exchange)
.doOnSuccess(
r -> {
inTheEnd(exchange, start, currentTimeSlot, true);
}
)
.doOnError(
t -> {
inTheEnd(exchange, start, currentTimeSlot, false);
}
)
.doOnCancel(
() -> {
inTheEnd(exchange, start, currentTimeSlot, false);
}
);
}
return chain.filter(exchange);
}
private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) {
long spend = System.currentTimeMillis() - start;
flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success);
}
}

View File

@@ -0,0 +1,108 @@
package we.filter;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
@Component(ServiceFlowControlFilter.SERVICE_FLOW_CONTROL_FILTER)
@DependsOn(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@Order(-3)
public class ServiceFlowControlFilter extends ProxyAggrFilter {
private static final Logger log = LoggerFactory.getLogger(ServiceFlowControlFilter.class);
public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter";
private static final String exceed = " exceed ";
private static final String concurrents = " concurrents ";
private static final String orQps = " or qps ";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
private FlowStat flowStat = GlobalFlowControlFilter.flowStat;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
long currentTimeSlot = exchange.getAttribute("currentTimeSlot");
String service = WebUtils.getClientService(exchange);
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
if (config == null) {
config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
}
if (config == null || !config.isEnable()) {
// 无服务流控配置
flowStat.incrRequest(service, currentTimeSlot, null, null);
} else {
boolean concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, config.concurrents, config.qps);
if (concurrentOrRpsExceed ) {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(service).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
b.append(exceed) .append(config.resource) .append(concurrents).append(config.concurrents).append(orQps).append(config.qps);
log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
ResourceRateLimitConfig gc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, gc.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(gc.responseContent.getBytes())));
}
}
// 没配置或没超配置
long start = exchange.getAttribute("start");
return chain.filter(exchange)
.doOnSuccess(
r -> {
inTheEnd(exchange, start, currentTimeSlot, true);
}
)
.doOnError(
t -> {
inTheEnd(exchange, start, currentTimeSlot, false);
}
)
.doOnCancel(
() -> {
inTheEnd(exchange, start, currentTimeSlot, false);
}
);
}
return chain.filter(exchange);
}
private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) {
long spend = System.currentTimeMillis() - start;
String service = WebUtils.getClientService(exchange);
flowStat.decrConcurrentRequest(service, currentTimeSlot);
flowStat.addRequestRT(service, currentTimeSlot, spend, success);
}
}

View File

@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,6 +133,20 @@ public class FlowStat {
return;
}
ResourceStat resourceStat = getResourceStat(resourceId);
long concurrents = resourceStat.getConcurrentRequests().get();
if (concurrents == 0) {
// RuntimeException e = new RuntimeException();
// log.warn(timeSlotId + " " + resourceId + " concurrents is zero", e);
StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace();
StringBuilder b = new StringBuilder(timeSlotId + " " + resourceId + " concurrents is zero\n");
for (int i = 0; i < stackTraces.length; i++) {
b.append(stackTraces[i].toString()).append('\n');
}
log.warn(b.toString());
}
resourceStat.decrConcurrentRequest(timeSlotId);
}
@@ -158,6 +173,7 @@ public class FlowStat {
resourceStat = resourceStats.get(resourceId);
} else {
resourceStat = new ResourceStat(resourceId);
log.info("no resource stat for " + resourceId + " and create one");
ResourceStat rs = resourceStats.putIfAbsent(resourceId, resourceStat);
if (rs != null) {
resourceStat = rs;

View File

@@ -16,6 +16,9 @@
*/
package we.stats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -30,6 +33,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class ResourceStat {
private static final Logger log = LoggerFactory.getLogger(ResourceStat.class);
/**
* Resource ID
*/
@@ -112,6 +117,11 @@ public class ResourceStat {
*/
public void decrConcurrentRequest(long timeSlotId) {
long conns = this.concurrentRequests.decrementAndGet();
if (conns == -1) {
log.warn(timeSlotId + " concurrents is one");
}
this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
}