flow control filter supports resource chain

This commit is contained in:
Francis Dong
2021-01-18 11:32:33 +08:00
parent d3fd8efa8d
commit 6e55ed8059
7 changed files with 180 additions and 238 deletions

View File

@@ -1,218 +1,140 @@
// /* /*
// * Copyright (C) 2020 the original author or authors. * Copyright (C) 2020 the original author or authors.
// * *
// * This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or * the Free Software Foundation, either version 3 of the License, or
// * any later version. * any later version.
// * *
// * This program is distributed in the hope that it will be useful, * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details. * GNU General Public License for more details.
// * *
// * You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
// */ */
//
// package we.filter; package we.filter;
//
// import com.alibaba.nacos.api.config.annotation.NacosValue; import java.util.ArrayList;
// import org.slf4j.Logger; import java.util.List;
// import org.slf4j.LoggerFactory;
// import org.springframework.beans.factory.annotation.Value; import javax.annotation.Resource;
// import org.springframework.core.annotation.Order;
// import org.springframework.http.HttpHeaders; import org.slf4j.Logger;
// import org.springframework.http.HttpStatus; import org.slf4j.LoggerFactory;
// import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.beans.factory.annotation.Value;
// import org.springframework.stereotype.Component; import org.springframework.core.annotation.Order;
// import org.springframework.web.server.ServerWebExchange; import org.springframework.http.HttpHeaders;
// import org.springframework.web.server.WebFilterChain; import org.springframework.http.HttpStatus;
// import reactor.core.publisher.Mono; import org.springframework.http.server.reactive.ServerHttpResponse;
// import we.flume.clients.log4j2appender.LogService; import org.springframework.stereotype.Component;
// import we.stats.FlowStat; import org.springframework.web.server.ServerWebExchange;
// import we.stats.ratelimit.ResourceRateLimitConfig; import org.springframework.web.server.WebFilterChain;
// import we.stats.ratelimit.ResourceRateLimitConfigService;
// import we.util.Constants; import com.alibaba.nacos.api.config.annotation.NacosValue;
// import we.util.JacksonUtils;
// import we.util.ThreadContext; import reactor.core.publisher.Mono;
// import we.util.WebUtils; import reactor.core.publisher.SignalType;
// import we.flume.clients.log4j2appender.LogService;
// import javax.annotation.Resource; import we.stats.BlockType;
// import java.util.HashMap; import we.stats.FlowStat;
// import java.util.Map; import we.stats.IncrRequestResult;
// import we.stats.ResourceConfig;
// /** import we.stats.ratelimit.ResourceRateLimitConfig;
// * @author hongqiaowei import we.stats.ratelimit.ResourceRateLimitConfigService;
// */ import we.util.WebUtils;
//
// @Component(FlowControlFilter.FLOW_CONTROL_FILTER) /**
// @Order(-1) * @author hongqiaowei
// public class FlowControlFilter extends ProxyAggrFilter { */
//
// public static final String FLOW_CONTROL_FILTER = "flowControlFilter"; @Component(FlowControlFilter.FLOW_CONTROL_FILTER)
// @Order(-3)
// private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class); public class FlowControlFilter extends ProxyAggrFilter {
//
// private static final String exceed = " exceed "; public static final String FLOW_CONTROL_FILTER = "flowControlFilter";
// private static final String concurrents = " concurrents ";
// private static final String orQps = " or qps "; private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class);
//
// @NacosValue(value = "${flowControl:false}", autoRefreshed = true) @NacosValue(value = "${flowControl:false}", autoRefreshed = true)
// @Value("${flowControl:false}") @Value("${flowControl:false}")
// private boolean flowControl; private boolean flowControl;
//
// @Resource @Resource
// private ResourceRateLimitConfigService resourceRateLimitConfigService; private ResourceRateLimitConfigService resourceRateLimitConfigService;
//
// private FlowStat flowStat = new FlowStat(); private FlowStat flowStat = new FlowStat();
//
// public FlowStat getFlowStat() { public FlowStat getFlowStat() {
// return flowStat; return flowStat;
// } }
//
// @Override @Override
// public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) { public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
//
// if (flowControl) { if (flowControl) {
// String service = WebUtils.getClientService(exchange);
// Map<String, Object> traceMap = new HashMap<>();
//
// String service = WebUtils.getClientService(exchange);
// String reqPath = WebUtils.getClientReqPath(exchange); // String reqPath = WebUtils.getClientReqPath(exchange);
// long currentTimeSlot = flowStat.currentTimeSlotId(); long currentTimeSlot = flowStat.currentTimeSlotId();
// ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); ResourceRateLimitConfig globalConfig = resourceRateLimitConfigService
// ResourceRateLimitConfig globalConfig = rlc; .getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
// ResourceRateLimitConfig serviceConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
// boolean concurrentOrRpsExceed = false; if (serviceConfig == null) {
// boolean globalExceed = concurrentOrRpsExceed; serviceConfig = resourceRateLimitConfigService
// if (rlc.isEnable()) { .getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
// }
// traceMap.put("global enable", null); // TODO remove
// // global
// concurrentOrRpsExceed = !flowStat.incrRequest(rlc.resource, currentTimeSlot, rlc.concurrents, rlc.qps); List<ResourceConfig> resourceConfigs = new ArrayList<>();
// globalExceed = concurrentOrRpsExceed; ResourceConfig globalResCfg = new ResourceConfig(ResourceRateLimitConfig.GLOBAL, 0, 0);
// } if (globalConfig != null && globalConfig.isEnable()) {
// globalResCfg.setMaxCon(globalConfig.concurrents);
// if (!concurrentOrRpsExceed) { globalResCfg.setMaxQPS(globalConfig.qps);
// }
// traceMap.put("api config", null); resourceConfigs.add(globalResCfg);
//
// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(reqPath); // service
// if (rlc == null) { ResourceConfig serviceResCfg = new ResourceConfig(service, 0, 0);
// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(service); if (serviceConfig != null && serviceConfig.isEnable()) {
// if (rlc == null) { serviceResCfg.setMaxCon(serviceConfig.concurrents);
// rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT); serviceResCfg.setMaxQPS(serviceConfig.qps);
// if (rlc == null || !rlc.isEnable()) { }
// traceMap.put("rlc is null or unable", null); resourceConfigs.add(serviceResCfg);
// } else {
// traceMap.put("service default enable", null); IncrRequestResult result = flowStat.incrRequest(resourceConfigs, currentTimeSlot);
// concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps);
// // if (!concurrentOrRpsExceed) { if (result != null && !result.isSuccess()) {
// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null); if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
// // } log.info("exceed {} flow limit, blocked by maximum concurrent requests",
// } result.getBlockedResourceId(), LogService.BIZ_ID, exchange.getRequest().getId());
// } else { } else {
// traceMap.put("have service config", null); log.info("exceed {} flow limit, blocked by maximum QPS", result.getBlockedResourceId(),
// concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps); LogService.BIZ_ID, exchange.getRequest().getId());
// // if (!concurrentOrRpsExceed) { }
// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null);
// // } // ResourceRateLimitConfig config = result.getBlockedResourceId().equals(globalConfig.resource)
// } // ? globalConfig
// } else { // should not reach here for now // : serviceConfig;
// traceMap.put("have api config", null);
// concurrentOrRpsExceed = !flowStat.incrRequest(reqPath, currentTimeSlot, rlc.concurrents, rlc.qps); ServerHttpResponse resp = exchange.getResponse();
// if (!concurrentOrRpsExceed) { resp.setStatusCode(HttpStatus.OK);
// flowStat.incrRequest(service, currentTimeSlot, null, null); resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
// } return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
// } } else {
// } long start = System.currentTimeMillis();
// return chain.filter(exchange).doFinally(s -> {
// if ( !globalConfig.isEnable() && ( rlc == null || (rlc.type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT && !rlc.isEnable()) ) ) { long rt = System.currentTimeMillis() - start;
// if (s == SignalType.ON_COMPLETE) {
// traceMap.put("no any rate limit config", null); flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true);
// } else {
// flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null); flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false);
// flowStat.incrRequest(service, currentTimeSlot, null, null); }
// // flowStat.incrRequest(reqPath, currentTimeSlot, null, null); });
// } else { }
// log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + globalConfig + " or " + rlc, LogService.BIZ_ID, exchange.getRequest().getId()); }
// }
// return chain.filter(exchange);
// traceMap.put("concurrentOrRpsExceed", concurrentOrRpsExceed); }
// traceMap.put("globalExceed", globalExceed); }
//
// log.info(JacksonUtils.writeValueAsString(traceMap), LogService.BIZ_ID, exchange.getRequest().getId());
//
// if (concurrentOrRpsExceed) {
// if (!globalExceed) {
//
// StringBuilder b = new StringBuilder();
// WebUtils.request2stringBuilder(exchange, b);
// b.append("\n concurrentOrRpsExceed is true but globalExceed is false");
// log.info(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
//
// flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
// }
//
// StringBuilder b = ThreadContext.getStringBuilder();
// b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
// b.append(exceed) .append(rlc.resource) .append(concurrents).append(rlc.concurrents).append(orQps).append(rlc.qps);
// log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
//
// ServerHttpResponse resp = exchange.getResponse();
// resp.setStatusCode(HttpStatus.OK);
// resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
// return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
//
// } else {
//
// StringBuilder b = new StringBuilder();
// WebUtils.request2stringBuilder(exchange, b);
// b.append('\n');
//
// long start = System.currentTimeMillis();
// return chain.filter(exchange)
// .doOnSuccess(
// r -> {
// b.append(" succs ");
// inTheEnd(exchange, start, currentTimeSlot, true);
// }
// )
// .doOnError(
// t -> {
// b.append(" errs ");
// inTheEnd(exchange, start, currentTimeSlot, false);
// }
// )
// .doOnCancel(
// () -> {
// b.append(" cans ");
// inTheEnd(exchange, start, currentTimeSlot, false);
// }
// )
// .doFinally(
// s -> {
// log.info(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
// }
// );
// }
// }
//
// return chain.filter(exchange);
// }
//
// private void inTheEnd(ServerWebExchange exchange, long start, long currentTimeSlot, boolean success) {
// long spend = System.currentTimeMillis() - start;
// flowStat.decrConcurrentRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot);
// flowStat.addRequestRT(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, spend, success);
//
// String service = WebUtils.getClientService(exchange);
// flowStat.decrConcurrentRequest(service, currentTimeSlot);
// flowStat.addRequestRT(service, currentTimeSlot, spend, success);
//
// // String reqPath = WebUtils.getClientReqPath(exchange);
// // flowStat.decrConcurrentRequest(reqPath, currentTimeSlot);
// // flowStat.addRequestRT(reqPath, currentTimeSlot, spend, success);
// }
// }

View File

@@ -35,8 +35,8 @@ import java.util.Map;
* @author hongqiaowei * @author hongqiaowei
*/ */
@Component(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER) //@Component(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
@Order(-4) //@Order(-4)
public class GlobalFlowControlFilter extends AbsFlowControlFilter { public class GlobalFlowControlFilter extends AbsFlowControlFilter {
public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter"; public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter";

View File

@@ -30,8 +30,8 @@ import we.util.WebUtils;
* @author hongqiaowei * @author hongqiaowei
*/ */
@Component(ServiceFlowControlFilter.SERVICE_FLOW_CONTROL_FILTER) //@Component(ServiceFlowControlFilter.SERVICE_FLOW_CONTROL_FILTER)
@Order(-3) //@Order(-3)
public class ServiceFlowControlFilter extends AbsFlowControlFilter { public class ServiceFlowControlFilter extends AbsFlowControlFilter {
public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter"; public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter";

View File

@@ -0,0 +1,35 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
/**
*
* @author Francis Dong
*
*/
public enum BlockType {
/**
* Blocked by concurrent request rule
*/
CONCURRENT_REQUEST,
/**
* Blocked by QPS
*/
QPS;
}

View File

@@ -32,7 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import we.stats.IncrRequestResult.BlockType; import we.stats.BlockType;
import we.util.Utils; import we.util.Utils;
/** /**

View File

@@ -24,18 +24,6 @@ package we.stats;
*/ */
public class IncrRequestResult { public class IncrRequestResult {
enum BlockType {
/**
* Blocked by concurrent request rule
*/
CONCURRENT_REQUEST,
/**
* Blocked by QPS
*/
QPS;
}
/** /**
* true if success, otherwise false * true if success, otherwise false
*/ */

View File

@@ -27,11 +27,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.validation.constraints.AssertTrue;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import we.stats.IncrRequestResult.BlockType;
import we.util.JacksonUtils; import we.util.JacksonUtils;
/** /**