Support appid level flow control #212

This commit is contained in:
hongqiaowei
2021-06-23 19:35:06 +08:00
committed by GitHub
parent 1ec59d51fc
commit 6733cf08c9
11 changed files with 532 additions and 128 deletions

View File

@@ -68,6 +68,13 @@ public class FlowStatSchedConfig extends SchedConfig {
private static final String _minRespTime = "\"minRespTime\":";
private static final String _maxRespTime = "\"maxRespTime\":";
private static final String _app = "\"app\":";
private static final String _sourceIp = "\"sourceIp\":";
private static final String _service = "\"service\":";
private static final String _path = "\"path\":";
private static final String parentResourceList = "$prl";
@Resource
private FlowStatSchedConfigProperties flowStatSchedConfigProperties;
@@ -85,7 +92,7 @@ public class FlowStatSchedConfig extends SchedConfig {
private long startTimeSlot = 0;
private Map<String, AtomicLong> key2totalBlockMap = new HashMap<>();
private Map<String, AtomicLong> resourceTimeWindow2totalBlockRequestsMap = new HashMap<>(128);
@Scheduled(cron = "${flow-stat-sched.cron}")
public void sched() {
@@ -105,78 +112,139 @@ public class FlowStatSchedConfig extends SchedConfig {
return;
}
key2totalBlockMap.clear();
resourceTimeWindow2totalBlockRequestsMap.clear();
resourceTimeWindowStats.forEach(rtws -> {
String resource = rtws.getResourceId();
List<TimeWindowStat> wins = rtws.getWindows();
wins.forEach(w -> {
AtomicLong totalBlock = key2totalBlockMap.computeIfAbsent(String.format("%s%s",
ResourceRateLimitConfig.GLOBAL, w.getStartTime()), key -> new AtomicLong(0));
totalBlock.addAndGet(w.getBlockRequests());
long t = w.getStartTime();
long blockRequests = w.getBlockRequests();
resourceTimeWindow2totalBlockRequestsMap.put(resource + t, new AtomicLong(blockRequests));
});
});
resourceTimeWindowStats.forEach(rtws -> {
String resource = rtws.getResourceId();
List<TimeWindowStat> wins = rtws.getWindows();
wins.forEach(w -> {
accumulateParents(resource, w.getStartTime(), w.getBlockRequests());
});
});
resourceTimeWindowStats.forEach(
rtws -> {
String resource = rtws.getResourceId();
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
int id = (config == null ? 0 : config.id);
int type;
if (ResourceRateLimitConfig.GLOBAL.equals(resource)) {
type = ResourceRateLimitConfig.Type.GLOBAL;
} else if (resource.charAt(0) == '/') {
type = ResourceRateLimitConfig.Type.API;
} else {
type = ResourceRateLimitConfig.Type.SERVICE;
}
List<TimeWindowStat> wins = rtws.getWindows();
wins.forEach(
w -> {
StringBuilder b = ThreadContext.getStringBuilder();
Long winStart = w.getStartTime();
BigDecimal rps = w.getRps();
double qps;
if (rps == null) {
qps = 0.00;
} else {
qps = rps.doubleValue();
}
AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart));
Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests();
b.append(Constants.Symbol.LEFT_BRACE);
b.append(_ip); toJsonStringValue(b, ip); b.append(Constants.Symbol.COMMA);
b.append(_id); b.append(id); b.append(Constants.Symbol.COMMA);
b.append(_resource); toJsonStringValue(b, resource); b.append(Constants.Symbol.COMMA);
b.append(_type); b.append(type); b.append(Constants.Symbol.COMMA);
b.append(_start); b.append(winStart); b.append(Constants.Symbol.COMMA);
b.append(_reqs); b.append(w.getTotal()); b.append(Constants.Symbol.COMMA);
b.append(_completeReqs); b.append(w.getCompReqs()); b.append(Constants.Symbol.COMMA);
b.append(_peakConcurrents); b.append(w.getPeakConcurrentReqeusts()); b.append(Constants.Symbol.COMMA);
b.append(_reqPerSec); b.append(qps); b.append(Constants.Symbol.COMMA);
b.append(_blockReqs); b.append(w.getBlockRequests()); b.append(Constants.Symbol.COMMA);
b.append(_totalBlockReqs); b.append(totalBlockReqs); b.append(Constants.Symbol.COMMA);
b.append(_errors); b.append(w.getErrors()); b.append(Constants.Symbol.COMMA);
b.append(_avgRespTime); b.append(w.getAvgRt()); b.append(Constants.Symbol.COMMA);
b.append(_maxRespTime); b.append(w.getMax()); b.append(Constants.Symbol.COMMA);
b.append(_minRespTime); b.append(w.getMin());
b.append(Constants.Symbol.RIGHT_BRACE);
String msg = b.toString();
if ("kafka".equals(flowStatSchedConfigProperties.getDest())) { // for internal use
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue()));
} else {
rt.convertAndSend(flowStatSchedConfigProperties.getQueue(), msg).subscribe();
}
if (log.isDebugEnabled()) {
log.debug("report " + toDP19(winStart) + " win10: " + msg);
}
String app = null, pi = null, node = ResourceRateLimitConfig.NODE, service = null, path = null;
int type = ResourceRateLimitConfig.Type.NODE, id = 0;
ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
if (c == null) {
service = ResourceRateLimitConfig.getService(resource);
if (service != null) {
type = ResourceRateLimitConfig.Type.SERVICE_DEFAULT;
} else {
app = ResourceRateLimitConfig.getApp(resource);
if (app != null) {
type = ResourceRateLimitConfig.Type.APP_DEFAULT;
}
);
}
} else {
app = c.app;
pi = c.ip;
service = c.service;
path = c.path;
type = c.type;
id = c.id;
}
List<TimeWindowStat> wins = rtws.getWindows();
for (int i = 0; i < wins.size(); i++) {
TimeWindowStat w = wins.get(i);
StringBuilder b = ThreadContext.getStringBuilder();
long timeWin = w.getStartTime();
BigDecimal rps = w.getRps();
double qps;
if (rps == null) {
qps = 0.00;
} else {
qps = rps.doubleValue();
}
AtomicLong totalBlockRequests = resourceTimeWindow2totalBlockRequestsMap.get(resource + timeWin);
long tbrs = (totalBlockRequests == null ? w.getBlockRequests() : totalBlockRequests.longValue());
b.append(Constants.Symbol.LEFT_BRACE);
b.append(_ip); toJsonStringValue(b, ip); b.append(Constants.Symbol.COMMA);
b.append(_id); b.append(id); b.append(Constants.Symbol.COMMA);
String r = null;
if (type == ResourceRateLimitConfig.Type.NODE) {
r = ResourceRateLimitConfig.NODE;
} else if (type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT || type == ResourceRateLimitConfig.Type.SERVICE) {
r = service;
}
if (r != null) {
b.append(_resource); toJsonStringValue(b, r); b.append(Constants.Symbol.COMMA);
}
b.append(_type); b.append(type); b.append(Constants.Symbol.COMMA);
if (app != null) {
b.append(_app); toJsonStringValue(b, app); b.append(Constants.Symbol.COMMA);
}
if (pi != null) {
b.append(_sourceIp); toJsonStringValue(b, pi); b.append(Constants.Symbol.COMMA);
}
if (service != null) {
b.append(_service); toJsonStringValue(b, service); b.append(Constants.Symbol.COMMA);
}
if (path != null) {
b.append(_path); toJsonStringValue(b, path); b.append(Constants.Symbol.COMMA);
}
b.append(_start); b.append(timeWin); b.append(Constants.Symbol.COMMA);
b.append(_reqs); b.append(w.getTotal()); b.append(Constants.Symbol.COMMA);
b.append(_completeReqs); b.append(w.getCompReqs()); b.append(Constants.Symbol.COMMA);
b.append(_peakConcurrents); b.append(w.getPeakConcurrentReqeusts()); b.append(Constants.Symbol.COMMA);
b.append(_reqPerSec); b.append(qps); b.append(Constants.Symbol.COMMA);
b.append(_blockReqs); b.append(w.getBlockRequests()); b.append(Constants.Symbol.COMMA);
b.append(_totalBlockReqs); b.append(tbrs); b.append(Constants.Symbol.COMMA);
b.append(_errors); b.append(w.getErrors()); b.append(Constants.Symbol.COMMA);
b.append(_avgRespTime); b.append(w.getAvgRt()); b.append(Constants.Symbol.COMMA);
b.append(_maxRespTime); b.append(w.getMax()); b.append(Constants.Symbol.COMMA);
b.append(_minRespTime); b.append(w.getMin());
b.append(Constants.Symbol.RIGHT_BRACE);
String msg = b.toString();
if ("kafka".equals(flowStatSchedConfigProperties.getDest())) { // for internal use
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue()));
} else {
rt.convertAndSend(flowStatSchedConfigProperties.getQueue(), msg).subscribe();
}
if (log.isDebugEnabled()) {
log.debug("report " + toDP19(timeWin) + " win10: " + msg);
}
}
}
);
startTimeSlot = recentEndTimeSlot;
log.info(toDP23(st) + " fss " + toDP23(System.currentTimeMillis()));
if (log.isInfoEnabled()) {
log.info(toDP23(st) + " fss " + toDP23(System.currentTimeMillis()));
}
}
private void accumulateParents(String resource, long timeWin, long blockRequests) {
List<String> prl = ThreadContext.getArrayList(parentResourceList, String.class);
resourceRateLimitConfigService.getParentsTo(resource, prl);
for (int i = 0; i < prl.size(); i++) {
String parentResource = prl.get(i);
AtomicLong parentTotalBlockRequests = resourceTimeWindow2totalBlockRequestsMap.get(parentResource + timeWin);
if (parentTotalBlockRequests != null) {
parentTotalBlockRequests.addAndGet(blockRequests);
}
}
}
private long getRecentEndTimeSlot(FlowStat flowStat) {
@@ -198,8 +266,7 @@ public class FlowStatSchedConfig extends SchedConfig {
} else {
interval = 0;
}
long recentEndTimeSlot = currentTimeSlot - interval * 1000 - 10 * 1000;
return recentEndTimeSlot;
return currentTimeSlot - interval * 1000 - 10 * 1000;
}
private String toDP19(long startTimeSlot) {

View File

@@ -50,6 +50,8 @@ public class SystemConfig {
public static final String DEFAULT_GATEWAY_TEST_PREFIX = "/_proxytest";
public static final String DEFAULT_GATEWAY_TEST = "_proxytest";
public static final String DEFAULT_GATEWAY_TEST_PREFIX0 = "/_proxytest/";
private String gatewayPrefix = DEFAULT_GATEWAY_PREFIX;

View File

@@ -72,11 +72,11 @@ public class FlowControlController {
long currentTimeSlot = flowStat.currentTimeSlotId();
long startTimeSlot = currentTimeSlot - recent * 1000;
TimeWindowStat timeWindowStat = null;
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent);
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.NODE_RESOURCE, startTimeSlot, currentTimeSlot, recent);
if (wins == null || wins.isEmpty()) {
result.put("rps", 0);
} else {
concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL);
concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.NODE_RESOURCE);
result.put("concurrents", concurrents);
timeWindowStat = wins.get(0).getWindows().get(0);
BigDecimal winrps = timeWindowStat.getRps();

View File

@@ -22,6 +22,7 @@ import java.util.List;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,7 +36,11 @@ import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AppService;
import we.stats.BlockType;
import we.stats.FlowStat;
import we.stats.IncrRequestResult;
@@ -43,6 +48,8 @@ import we.stats.ResourceConfig;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.JacksonUtils;
import we.util.ThreadContext;
import we.util.WebUtils;
/**
@@ -69,10 +76,15 @@ public class FlowControlFilter extends FizzWebFilter {
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
// @Resource
@Autowired(required = false)
private FlowStat flowStat;
@Resource
private ApiConfigService apiConfigService;
@Resource
private AppService appService;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -81,61 +93,60 @@ public class FlowControlFilter extends FizzWebFilter {
if (secFS == -1) {
return WebUtils.responseError(exchange, HttpStatus.INTERNAL_SERVER_ERROR.value(), "request path should like /optional-prefix/service-name/real-biz-path");
}
String svc = path.substring(1, secFS);
boolean adminReq = false;
if (svc.equals(admin) || svc.equals(actuator)) {
String service = path.substring(1, secFS);
boolean adminReq = false, proxyTestReq = false;
if (service.equals(admin) || service.equals(actuator)) {
adminReq = true;
exchange.getAttributes().put(ADMIN_REQUEST, Constants.Symbol.EMPTY);
} else if (service.equals(SystemConfig.DEFAULT_GATEWAY_TEST)) {
proxyTestReq = true;
} else {
service = WebUtils.getClientService(exchange);
}
if (flowControlFilterProperties.isFlowControl() && !adminReq) {
String service = WebUtils.getClientService(exchange);
// String reqPath = WebUtils.getClientReqPath(exchange);
if (flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq) {
LogService.setBizId(exchange.getRequest().getId());
if (!apiConfigService.serviceConfigMap.containsKey(service)) {
String json = RespEntity.toJson(HttpStatus.FORBIDDEN.value(), "no service " + service, exchange.getRequest().getId());
return WebUtils.buildJsonDirectResponse(exchange, HttpStatus.FORBIDDEN, null, json);
}
String app = WebUtils.getAppId(exchange);
if (app != null && !appService.getAppMap().containsKey(app)) {
String json = RespEntity.toJson(HttpStatus.FORBIDDEN.value(), "no app " + app, exchange.getRequest().getId());
return WebUtils.buildJsonDirectResponse(exchange, HttpStatus.FORBIDDEN, null, json);
}
path = WebUtils.getClientReqPath(exchange);
String ip = WebUtils.getOriginIp(exchange);
long currentTimeSlot = flowStat.currentTimeSlotId();
ResourceRateLimitConfig globalConfig = resourceRateLimitConfigService
.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
ResourceRateLimitConfig serviceConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
if (serviceConfig == null) {
serviceConfig = resourceRateLimitConfigService
.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
}
// global
List<ResourceConfig> resourceConfigs = new ArrayList<>();
ResourceConfig globalResCfg = new ResourceConfig(ResourceRateLimitConfig.GLOBAL, 0, 0);
if (globalConfig != null && globalConfig.isEnable()) {
globalResCfg.setMaxCon(globalConfig.concurrents);
globalResCfg.setMaxQPS(globalConfig.qps);
}
resourceConfigs.add(globalResCfg);
// service
ResourceConfig serviceResCfg = new ResourceConfig(service, 0, 0);
if (serviceConfig != null && serviceConfig.isEnable()) {
serviceResCfg.setMaxCon(serviceConfig.concurrents);
serviceResCfg.setMaxQPS(serviceConfig.qps);
}
resourceConfigs.add(serviceResCfg);
List<ResourceConfig> resourceConfigs = getFlowControlConfigs(app, ip, null, service, path);
IncrRequestResult result = flowStat.incrRequest(resourceConfigs, currentTimeSlot);
if (result != null && !result.isSuccess()) {
String blockedResourceId = result.getBlockedResourceId();
if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
log.info("exceed {} flow limit, blocked by maximum concurrent requests",
result.getBlockedResourceId(), LogService.BIZ_ID, exchange.getRequest().getId());
log.info("exceed {} flow limit, blocked by maximum concurrent requests", blockedResourceId, LogService.BIZ_ID, exchange.getRequest().getId());
} else {
log.info("exceed {} flow limit, blocked by maximum QPS", result.getBlockedResourceId(),
LogService.BIZ_ID, exchange.getRequest().getId());
log.info("exceed {} flow limit, blocked by maximum QPS", blockedResourceId, LogService.BIZ_ID, exchange.getRequest().getId());
}
// ResourceRateLimitConfig config = result.getBlockedResourceId().equals(globalConfig.resource)
// ? globalConfig
// : serviceConfig;
ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.NODE_RESOURCE);
String rt = c.responseType, rc = c.responseContent;
c = resourceRateLimitConfigService.getResourceRateLimitConfig(blockedResourceId);
if (c != null) {
if (StringUtils.isNotBlank(c.responseType)) {
rt = c.responseType;
}
if (StringUtils.isNotBlank(c.responseContent)) {
rc = c.responseContent;
}
}
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, rt);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rc.getBytes())));
} else {
long start = System.currentTimeMillis();
return chain.filter(exchange).doFinally(s -> {
@@ -151,4 +162,73 @@ public class FlowControlFilter extends FizzWebFilter {
return chain.filter(exchange);
}
private List<ResourceConfig> getFlowControlConfigs(String app, String ip, String node, String service, String path) {
if (log.isDebugEnabled()) {
log.debug("get flow control config by app={}, ip={}, node={}, service={}, path={}", app, ip, node, service, path);
}
List<ResourceConfig> resourceConfigs = new ArrayList<>(9);
StringBuilder b = ThreadContext.getStringBuilder();
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, ResourceRateLimitConfig.NODE, null, null, null);
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, null, service, null, ResourceRateLimitConfig.SERVICE_DEFAULT);
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, null, service, path, null);
if (app != null) {
checkRateLimitConfigAndAddTo(resourceConfigs, b, app, null, null, null, null, ResourceRateLimitConfig.APP_DEFAULT);
checkRateLimitConfigAndAddTo(resourceConfigs, b, app, null, null, service, null, null);
checkRateLimitConfigAndAddTo(resourceConfigs, b, app, null, null, service, path, null);
}
if (ip != null) {
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, null, null, null);
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, service, null, null);
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, service, path, null);
}
if (log.isDebugEnabled()) {
log.debug("resource configs: " + JacksonUtils.writeValueAsString(resourceConfigs));
}
return resourceConfigs;
}
private void checkRateLimitConfigAndAddTo(List<ResourceConfig> resourceConfigs, StringBuilder b, String app, String ip, String node, String service, String path, String defaultRateLimitConfigId) {
ResourceRateLimitConfig.buildResourceIdTo(b, app, ip, node, service, path);
String resourceId = b.toString();
checkRateLimitConfigAndAddTo(resourceConfigs, resourceId, defaultRateLimitConfigId);
b.delete(0, b.length());
}
private void checkRateLimitConfigAndAddTo(List<ResourceConfig> resourceConfigs, String resource, String defaultRateLimitConfigId) {
ResourceConfig rc = null;
ResourceRateLimitConfig rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
if (rateLimitConfig != null && rateLimitConfig.isEnable()) {
rc = new ResourceConfig(resource, rateLimitConfig.concurrents, rateLimitConfig.qps);
resourceConfigs.add(rc);
} else {
String node = ResourceRateLimitConfig.getNode(resource);
if (node != null && node.equals(ResourceRateLimitConfig.NODE)) {
rc = new ResourceConfig(resource, 0, 0);
}
if (defaultRateLimitConfigId != null) {
if (defaultRateLimitConfigId.equals(ResourceRateLimitConfig.SERVICE_DEFAULT)) {
rc = new ResourceConfig(resource, 0, 0);
rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT_RESOURCE);
if (rateLimitConfig != null && rateLimitConfig.isEnable()) {
rc.setMaxCon(rateLimitConfig.concurrents);
rc.setMaxQPS(rateLimitConfig.qps);
}
}
if (defaultRateLimitConfigId.equals(ResourceRateLimitConfig.APP_DEFAULT)) {
rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.APP_DEFAULT_RESOURCE);
if (rateLimitConfig != null && rateLimitConfig.isEnable()) {
rc = new ResourceConfig(resource, rateLimitConfig.concurrents, rateLimitConfig.qps);
}
}
}
if (rc != null) {
resourceConfigs.add(rc);
}
}
}
}

View File

@@ -17,7 +17,10 @@
package we.stats.ratelimit;
import com.fasterxml.jackson.annotation.JsonIgnore;
import we.util.Constants;
import we.util.JacksonUtils;
import we.util.Utils;
/**
* @author hongqiaowei
@@ -26,30 +29,51 @@ import we.util.JacksonUtils;
public class ResourceRateLimitConfig {
public static interface Type {
static final byte GLOBAL = 1;
static final byte NODE = 1;
static final byte SERVICE_DEFAULT = 2;
static final byte SERVICE = 3;
static final byte API = 4;
static final byte APP_DEFAULT = 5;
static final byte APP = 6;
static final byte IP = 7;
}
public static final int DELETED = 1;
public static final int DELETED = 1;
public static final String GLOBAL = "_global";
public static final String NODE = "_global";
public static final String SERVICE_DEFAULT = "service_default";
public static final String NODE_RESOURCE = buildResourceId(null, null, NODE, null, null);
private static final int ENABLE = 1;
public static final String SERVICE_DEFAULT = "service_default";
private static final int UNABLE = 0;
public static final String SERVICE_DEFAULT_RESOURCE = buildResourceId(null, null, null, SERVICE_DEFAULT, null);
public static final String APP_DEFAULT = "app_default";
public static final String APP_DEFAULT_RESOURCE = buildResourceId(APP_DEFAULT, null, null, null, null);
private static final int ENABLE = 1;
private static final int UNABLE = 0;
public int isDeleted = 0;
public int id;
private boolean enable = true;
private boolean enable = true;
public String resource;
public String service;
public String path;
public String app;
public String ip;
public String node;
public byte type;
public long qps;
@@ -72,6 +96,96 @@ public class ResourceRateLimitConfig {
}
}
public void setResource(String r) {
resource = r;
if (!resource.equals(NODE)) {
service = resource;
}
}
public void setType(byte t) {
type = t;
if (type == Type.NODE) {
node = NODE;
} else if (type == Type.SERVICE_DEFAULT) {
service = SERVICE_DEFAULT;
} else if (type == Type.APP_DEFAULT) {
app = APP_DEFAULT;
}
}
private String resourceId = null;
@JsonIgnore
public String getResourceId() {
if (resourceId == null) {
resourceId =
(app == null ? "" : app) + '@' +
(ip == null ? "" : ip) + '@' +
(node == null ? "" : node) + '@' +
(service == null ? "" : service) + '@' +
(path == null ? "" : path)
;
}
return resourceId;
}
public static String buildResourceId(String app, String ip, String node, String service, String path) {
StringBuilder b = new StringBuilder(32);
buildResourceIdTo(b, app, ip, node, service, path);
return b.toString();
}
public static void buildResourceIdTo(StringBuilder b, String app, String ip, String node, String service, String path) {
b.append(app == null ? Constants.Symbol.EMPTY : app) .append(Constants.Symbol.AT);
b.append(ip == null ? Constants.Symbol.EMPTY : ip) .append(Constants.Symbol.AT);
b.append(node == null ? Constants.Symbol.EMPTY : node) .append(Constants.Symbol.AT);
b.append(service == null ? Constants.Symbol.EMPTY : service) .append(Constants.Symbol.AT);
b.append(path == null ? Constants.Symbol.EMPTY : path);
}
public static String getApp(String resource) {
int i = resource.indexOf(Constants.Symbol.AT);
if (i == 0) {
return null;
} else {
return resource.substring(0, i);
}
}
public static String getIp(String resource) {
String extract = Utils.extract(resource, Constants.Symbol.AT, 1);
if (extract.equals(Constants.Symbol.EMPTY)) {
return null;
}
return extract;
}
public static String getNode(String resource) {
String extract = Utils.extract(resource, Constants.Symbol.AT, 2);
if (extract.equals(Constants.Symbol.EMPTY)) {
return null;
}
return extract;
}
public static String getService(String resource) {
String extract = Utils.extract(resource, Constants.Symbol.AT, 3);
if (extract.equals(Constants.Symbol.EMPTY)) {
return null;
}
return extract;
}
public static String getPath(String resource) {
int i = resource.lastIndexOf(Constants.Symbol.AT);
if (i == resource.length() - 1) {
return null;
} else {
return resource.substring(i);
}
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);

View File

@@ -27,13 +27,11 @@ import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
import we.util.ThreadContext;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -130,7 +128,7 @@ public class ResourceRateLimitConfigService {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
ResourceRateLimitConfig r = oldResourceRateLimitConfigMap.remove(rrlc.id);
if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED && r != null) {
resourceRateLimitConfigMap.remove(r.resource);
resourceRateLimitConfigMap.remove(r.getResourceId());
}
updateResourceRateLimitConfigMap(rrlc, resourceRateLimitConfigMap);
if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED) {
@@ -158,11 +156,11 @@ public class ResourceRateLimitConfigService {
private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc,
Map<String, ResourceRateLimitConfig> resourceRateLimitConfigMap) {
if (rrlc.isDeleted == ResourceRateLimitConfig.DELETED) {
ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.resource);
ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.getResourceId());
log.info("remove " + removedRrlc);
} else {
ResourceRateLimitConfig existRrlc = resourceRateLimitConfigMap.get(rrlc.resource);
resourceRateLimitConfigMap.put(rrlc.resource, rrlc);
ResourceRateLimitConfig existRrlc = resourceRateLimitConfigMap.get(rrlc.getResourceId());
resourceRateLimitConfigMap.put(rrlc.getResourceId(), rrlc);
if (existRrlc == null) {
log.info("add " + rrlc);
} else {
@@ -182,4 +180,71 @@ public class ResourceRateLimitConfigService {
public Map<String, ResourceRateLimitConfig> getResourceRateLimitConfigMap() {
return resourceRateLimitConfigMap;
}
public void getParentsTo(String resource, List<String> parentList) {
String app = null, ip = null, node = null, service = null, path = null;
ResourceRateLimitConfig c = resourceRateLimitConfigMap.get(resource);
if (c == null) {
service = ResourceRateLimitConfig.getService(resource);
if (service != null) {
parentList.add(ResourceRateLimitConfig.NODE_RESOURCE);
}
return;
} else {
if (c.type == ResourceRateLimitConfig.Type.NODE) {
return;
}
if (c.type == ResourceRateLimitConfig.Type.SERVICE) {
parentList.add(ResourceRateLimitConfig.NODE_RESOURCE);
return;
}
app = c.app;
ip = c.ip;
service = c.service;
path = c.path;
}
StringBuilder b = ThreadContext.getStringBuilder();
if (app != null) {
if (path != null) {
ResourceRateLimitConfig.buildResourceIdTo(b, app, null, null, service, null);
checkRateLimitConfigAndAddTo(b, parentList);
ResourceRateLimitConfig.buildResourceIdTo(b, app, null, null, null, null);
checkRateLimitConfigAndAddTo(b, parentList);
} else if (service != null) {
ResourceRateLimitConfig.buildResourceIdTo(b, app, null, null, null, null);
checkRateLimitConfigAndAddTo(b, parentList);
}
}
if (ip != null) {
if (path != null) {
ResourceRateLimitConfig.buildResourceIdTo(b, null, ip, null, service, null);
checkRateLimitConfigAndAddTo(b, parentList);
ResourceRateLimitConfig.buildResourceIdTo(b, null, ip, null, null, null);
checkRateLimitConfigAndAddTo(b, parentList);
} else if (service != null) {
ResourceRateLimitConfig.buildResourceIdTo(b, null, ip, null, null, null);
checkRateLimitConfigAndAddTo(b, parentList);
}
}
if (path != null) {
ResourceRateLimitConfig.buildResourceIdTo(b, null, null, null, service, null);
parentList.add(b.toString());
b.delete(0, b.length());
}
parentList.add(ResourceRateLimitConfig.NODE_RESOURCE);
}
private void checkRateLimitConfigAndAddTo(StringBuilder resourceStringBuilder, List<String> resourceList) {
String r = resourceStringBuilder.toString();
ResourceRateLimitConfig c = resourceRateLimitConfigMap.get(r);
if (c != null) {
resourceList.add(r);
}
resourceStringBuilder.delete(0, resourceStringBuilder.length());
}
}

View File

@@ -52,16 +52,24 @@ public class FlowControlFilterTests {
// @Test
void flowControlFilterTest() throws NoSuchFieldException, InterruptedException {
FlowControlFilter flowControlFilter = new FlowControlFilter();
ReflectionUtils.set(flowControlFilter, "flowControl", true);
FlowControlFilter filter = new FlowControlFilter();
FlowControlFilterProperties flowControlFilterProperties = new FlowControlFilterProperties();
ReflectionUtils.set(flowControlFilterProperties, "flowControl", true);
ReflectionUtils.set(filter, "flowControlFilterProperties", flowControlFilterProperties);
FlowStat flowStat = new FlowStat();
ReflectionUtils.set(flowControlFilter, "flowStat", flowStat);
ReflectionUtils.set(filter, "flowStat", flowStat);
ResourceRateLimitConfigService resourceRateLimitConfigService = new ResourceRateLimitConfigService();
Map<String, ResourceRateLimitConfig> map = resourceRateLimitConfigService.getResourceRateLimitConfigMap();
ResourceRateLimitConfig config = JacksonUtils.readValue("{\"concurrents\":66,\"enable\":1,\"id\":1,\"isDeleted\":0,\"resource\":\"_global\",\"type\":1}", ResourceRateLimitConfig.class);
map.put(ResourceRateLimitConfig.GLOBAL, config);
ReflectionUtils.set(flowControlFilter, "resourceRateLimitConfigService", resourceRateLimitConfigService);
map.put(ResourceRateLimitConfig.NODE_RESOURCE, config);
config = JacksonUtils.readValue("{\"concurrents\":33,\"enable\":1,\"id\":2,\"isDeleted\":0, \"service\":\"xservice\", \"path\":\"/ypath\", \"type\":4}", ResourceRateLimitConfig.class);
map.put(config.getResourceId(), config);
ReflectionUtils.set(filter, "resourceRateLimitConfigService", resourceRateLimitConfigService);
WebTestClient client = WebTestClient.bindToWebHandler(
new WebHandler() {
@@ -74,14 +82,24 @@ public class FlowControlFilterTests {
}
}
)
.webFilter(flowControlFilter)
.webFilter(filter)
.build();
client.get().uri("/proxy/xservice/ypath").exchange();
Thread.sleep(1000);
long currentTimeSlot = flowStat.currentTimeSlotId();
long startTimeSlot = currentTimeSlot - 10 * 1000;
List<ResourceTimeWindowStat> resourceTimeWindowStats = flowStat.getResourceTimeWindowStats("xservice", startTimeSlot, currentTimeSlot, 10);
// System.err.println(JacksonUtils.writeValueAsString(flowStat.resourceStats));
String xservice = ResourceRateLimitConfig.buildResourceId(null, null, null, "xservice", null);
List<ResourceTimeWindowStat> resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(xservice, startTimeSlot, currentTimeSlot, 10);
TimeWindowStat win = resourceTimeWindowStats.get(0).getWindows().get(0);
assertEquals(win.getCompReqs(), 1);
String xserviceYpath = ResourceRateLimitConfig.buildResourceId(null, null, null, "xservice", "/ypath");
resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(xserviceYpath, startTimeSlot, currentTimeSlot, 10);
win = resourceTimeWindowStats.get(0).getWindows().get(0);
assertEquals(win.getCompReqs(), 1);
}
}

View File

@@ -9,9 +9,12 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import we.redis.RedisProperties;
import we.redis.RedisServerConfiguration;
import we.redis.RedisTemplateConfiguration;
import we.util.JacksonUtils;
import javax.annotation.Resource;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -46,11 +49,17 @@ public class ResourceRateLimitConfigServiceTests {
@Test
void initTest() throws Throwable {
stringRedisTemplate.opsForHash().put("fizz_rate_limit", "2", "{\"concurrents\":66,\"enable\":1,\"id\":2,\"isDeleted\":0,\"resource\":\"service_default\",\"type\":2}");
stringRedisTemplate.opsForHash().put("fizz_rate_limit", "3", "{\"concurrents\":88,\"enable\":1,\"id\":3,\"isDeleted\":0, \"type\":6, \"app\":\"xapp\", \"service\":\"yservice\" }");
resourceRateLimitConfigService.init();
ResourceRateLimitConfig resourceRateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig("service_default");
ResourceRateLimitConfig resourceRateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT_RESOURCE);
// Map<String, ResourceRateLimitConfig> resourceRateLimitConfigMap = resourceRateLimitConfigService.getResourceRateLimitConfigMap();
// System.err.println(JacksonUtils.writeValueAsString(resourceRateLimitConfigMap));
assertEquals(resourceRateLimitConfig.concurrents, 66);
// System.err.println(resourceRateLimitConfig);
// Thread.currentThread().join();
resourceRateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig("xapp@@@yservice@");
assertEquals(resourceRateLimitConfig.concurrents, 88);
Thread.sleep(4000);
// System.err.println("init test end");
}

View File

@@ -17,4 +17,31 @@ public class ResourceRateLimitConfigTests {
ResourceRateLimitConfig resourceRateLimitConfig = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class);
assertEquals("application/json; charset=UTF-8", resourceRateLimitConfig.responseType);
}
@Test
void resourceIdTest() {
String resourceRateLimitConfigJson = "{\"concurrents\":1000,\"enable\":1,\"id\":1,\"isDeleted\":0,\"qps\":500, \"type\":1, \"resource\":\"_global\" }";
ResourceRateLimitConfig c = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class);
String resourceId = c.getResourceId();
assertEquals("@@_global@@", resourceId);
String node = ResourceRateLimitConfig.getNode(resourceId);
assertEquals("_global", node);
resourceRateLimitConfigJson = "{\"concurrents\":1000,\"enable\":1,\"id\":1,\"isDeleted\":0,\"qps\":500, \"type\":2, \"resource\":\"service_default\" }";
c = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class);
resourceId = c.getResourceId();
assertEquals("@@@service_default@", resourceId);
resourceRateLimitConfigJson = "{\"concurrents\":1000,\"enable\":1,\"id\":1,\"isDeleted\":0,\"qps\":500, \"type\":3, \"resource\":\"xservice\" }";
c = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class);
resourceId = c.getResourceId();
assertEquals("@@@xservice@", resourceId);
resourceId = ResourceRateLimitConfig.buildResourceId(null, null, ResourceRateLimitConfig.NODE, null, null);
assertEquals("@@_global@@", resourceId);
resourceId = ResourceRateLimitConfig.buildResourceId(null, "192.168.1.1", null, "xservice", null);
assertEquals("@192.168.1.1@@xservice@", resourceId);
}
}