diff --git a/fizz-common/src/main/java/we/util/NetworkUtils.java b/fizz-common/src/main/java/we/util/NetworkUtils.java index dc6a936..01a151b 100644 --- a/fizz-common/src/main/java/we/util/NetworkUtils.java +++ b/fizz-common/src/main/java/we/util/NetworkUtils.java @@ -42,7 +42,7 @@ public class NetworkUtils { private static String serverIp; - private static final String SERVER_IP = "SERVER_IP"; + private static final String SERVER_IP = "SERVER_IP"; public static String getServerIp() { try { diff --git a/fizz-common/src/main/java/we/util/Utils.java b/fizz-common/src/main/java/we/util/Utils.java index 8b64755..1d9976c 100644 --- a/fizz-common/src/main/java/we/util/Utils.java +++ b/fizz-common/src/main/java/we/util/Utils.java @@ -71,6 +71,28 @@ public abstract class Utils { b.append(k).append(c).append(v).append(separator); } + public static String extract(String str, char separator, int nx) { + int begin = 0, end = 0, n = 0, ny = nx + 1, l = str.length(); + for (int i = 0; i < l; i++) { + char c = str.charAt(i); + if (c == separator) { + n++; + if (n == nx) { + begin = i + 1; + } else if (n == ny) { + end = i; + break; + } + } + } + if (begin == 0) { + return Constants.Symbol.EMPTY; + } else if (end == 0) { + end = l; + } + return str.substring(begin, end); + } + public static String initials2lowerCase(String s) { if (StringUtils.isBlank(s)) { return s; diff --git a/fizz-core/src/main/java/we/config/FlowStatSchedConfig.java b/fizz-core/src/main/java/we/config/FlowStatSchedConfig.java index 017322a..2ab16df 100644 --- a/fizz-core/src/main/java/we/config/FlowStatSchedConfig.java +++ b/fizz-core/src/main/java/we/config/FlowStatSchedConfig.java @@ -68,6 +68,13 @@ public class FlowStatSchedConfig extends SchedConfig { private static final String _minRespTime = "\"minRespTime\":"; private static final String _maxRespTime = "\"maxRespTime\":"; + private static final String _app = "\"app\":"; + private static final String _sourceIp = "\"sourceIp\":"; + private static final String _service = "\"service\":"; + private static final String _path = "\"path\":"; + + private static final String parentResourceList = "$prl"; + @Resource private FlowStatSchedConfigProperties flowStatSchedConfigProperties; @@ -85,7 +92,7 @@ public class FlowStatSchedConfig extends SchedConfig { private long startTimeSlot = 0; - private Map key2totalBlockMap = new HashMap<>(); + private Map resourceTimeWindow2totalBlockRequestsMap = new HashMap<>(128); @Scheduled(cron = "${flow-stat-sched.cron}") public void sched() { @@ -105,78 +112,139 @@ public class FlowStatSchedConfig extends SchedConfig { return; } - key2totalBlockMap.clear(); + resourceTimeWindow2totalBlockRequestsMap.clear(); resourceTimeWindowStats.forEach(rtws -> { + String resource = rtws.getResourceId(); List wins = rtws.getWindows(); wins.forEach(w -> { - AtomicLong totalBlock = key2totalBlockMap.computeIfAbsent(String.format("%s%s", - ResourceRateLimitConfig.GLOBAL, w.getStartTime()), key -> new AtomicLong(0)); - totalBlock.addAndGet(w.getBlockRequests()); + long t = w.getStartTime(); + long blockRequests = w.getBlockRequests(); + resourceTimeWindow2totalBlockRequestsMap.put(resource + t, new AtomicLong(blockRequests)); + }); + }); + + resourceTimeWindowStats.forEach(rtws -> { + String resource = rtws.getResourceId(); + List wins = rtws.getWindows(); + wins.forEach(w -> { + accumulateParents(resource, w.getStartTime(), w.getBlockRequests()); }); }); resourceTimeWindowStats.forEach( rtws -> { String resource = rtws.getResourceId(); - ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(resource); - int id = (config == null ? 0 : config.id); - int type; - if (ResourceRateLimitConfig.GLOBAL.equals(resource)) { - type = ResourceRateLimitConfig.Type.GLOBAL; - } else if (resource.charAt(0) == '/') { - type = ResourceRateLimitConfig.Type.API; - } else { - type = ResourceRateLimitConfig.Type.SERVICE; - } - List wins = rtws.getWindows(); - wins.forEach( - w -> { - StringBuilder b = ThreadContext.getStringBuilder(); - Long winStart = w.getStartTime(); - BigDecimal rps = w.getRps(); - double qps; - if (rps == null) { - qps = 0.00; - } else { - qps = rps.doubleValue(); - } - - AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart)); - Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests(); - - b.append(Constants.Symbol.LEFT_BRACE); - b.append(_ip); toJsonStringValue(b, ip); b.append(Constants.Symbol.COMMA); - b.append(_id); b.append(id); b.append(Constants.Symbol.COMMA); - b.append(_resource); toJsonStringValue(b, resource); b.append(Constants.Symbol.COMMA); - b.append(_type); b.append(type); b.append(Constants.Symbol.COMMA); - b.append(_start); b.append(winStart); b.append(Constants.Symbol.COMMA); - b.append(_reqs); b.append(w.getTotal()); b.append(Constants.Symbol.COMMA); - b.append(_completeReqs); b.append(w.getCompReqs()); b.append(Constants.Symbol.COMMA); - b.append(_peakConcurrents); b.append(w.getPeakConcurrentReqeusts()); b.append(Constants.Symbol.COMMA); - b.append(_reqPerSec); b.append(qps); b.append(Constants.Symbol.COMMA); - b.append(_blockReqs); b.append(w.getBlockRequests()); b.append(Constants.Symbol.COMMA); - b.append(_totalBlockReqs); b.append(totalBlockReqs); b.append(Constants.Symbol.COMMA); - b.append(_errors); b.append(w.getErrors()); b.append(Constants.Symbol.COMMA); - b.append(_avgRespTime); b.append(w.getAvgRt()); b.append(Constants.Symbol.COMMA); - b.append(_maxRespTime); b.append(w.getMax()); b.append(Constants.Symbol.COMMA); - b.append(_minRespTime); b.append(w.getMin()); - b.append(Constants.Symbol.RIGHT_BRACE); - String msg = b.toString(); - if ("kafka".equals(flowStatSchedConfigProperties.getDest())) { // for internal use - log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue())); - } else { - rt.convertAndSend(flowStatSchedConfigProperties.getQueue(), msg).subscribe(); - } - if (log.isDebugEnabled()) { - log.debug("report " + toDP19(winStart) + " win10: " + msg); - } + String app = null, pi = null, node = ResourceRateLimitConfig.NODE, service = null, path = null; + int type = ResourceRateLimitConfig.Type.NODE, id = 0; + ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(resource); + if (c == null) { + service = ResourceRateLimitConfig.getService(resource); + if (service != null) { + type = ResourceRateLimitConfig.Type.SERVICE_DEFAULT; + } else { + app = ResourceRateLimitConfig.getApp(resource); + if (app != null) { + type = ResourceRateLimitConfig.Type.APP_DEFAULT; } - ); + } + } else { + app = c.app; + pi = c.ip; + service = c.service; + path = c.path; + type = c.type; + id = c.id; + } + + List wins = rtws.getWindows(); + for (int i = 0; i < wins.size(); i++) { + TimeWindowStat w = wins.get(i); + StringBuilder b = ThreadContext.getStringBuilder(); + long timeWin = w.getStartTime(); + BigDecimal rps = w.getRps(); + double qps; + if (rps == null) { + qps = 0.00; + } else { + qps = rps.doubleValue(); + } + + AtomicLong totalBlockRequests = resourceTimeWindow2totalBlockRequestsMap.get(resource + timeWin); + long tbrs = (totalBlockRequests == null ? w.getBlockRequests() : totalBlockRequests.longValue()); + + b.append(Constants.Symbol.LEFT_BRACE); + b.append(_ip); toJsonStringValue(b, ip); b.append(Constants.Symbol.COMMA); + b.append(_id); b.append(id); b.append(Constants.Symbol.COMMA); + + String r = null; + if (type == ResourceRateLimitConfig.Type.NODE) { + r = ResourceRateLimitConfig.NODE; + } else if (type == ResourceRateLimitConfig.Type.SERVICE_DEFAULT || type == ResourceRateLimitConfig.Type.SERVICE) { + r = service; + } + if (r != null) { + b.append(_resource); toJsonStringValue(b, r); b.append(Constants.Symbol.COMMA); + } + + b.append(_type); b.append(type); b.append(Constants.Symbol.COMMA); + + if (app != null) { + b.append(_app); toJsonStringValue(b, app); b.append(Constants.Symbol.COMMA); + } + + if (pi != null) { + b.append(_sourceIp); toJsonStringValue(b, pi); b.append(Constants.Symbol.COMMA); + } + + if (service != null) { + b.append(_service); toJsonStringValue(b, service); b.append(Constants.Symbol.COMMA); + } + + if (path != null) { + b.append(_path); toJsonStringValue(b, path); b.append(Constants.Symbol.COMMA); + } + + b.append(_start); b.append(timeWin); b.append(Constants.Symbol.COMMA); + b.append(_reqs); b.append(w.getTotal()); b.append(Constants.Symbol.COMMA); + b.append(_completeReqs); b.append(w.getCompReqs()); b.append(Constants.Symbol.COMMA); + b.append(_peakConcurrents); b.append(w.getPeakConcurrentReqeusts()); b.append(Constants.Symbol.COMMA); + b.append(_reqPerSec); b.append(qps); b.append(Constants.Symbol.COMMA); + b.append(_blockReqs); b.append(w.getBlockRequests()); b.append(Constants.Symbol.COMMA); + b.append(_totalBlockReqs); b.append(tbrs); b.append(Constants.Symbol.COMMA); + b.append(_errors); b.append(w.getErrors()); b.append(Constants.Symbol.COMMA); + b.append(_avgRespTime); b.append(w.getAvgRt()); b.append(Constants.Symbol.COMMA); + b.append(_maxRespTime); b.append(w.getMax()); b.append(Constants.Symbol.COMMA); + b.append(_minRespTime); b.append(w.getMin()); + b.append(Constants.Symbol.RIGHT_BRACE); + String msg = b.toString(); + if ("kafka".equals(flowStatSchedConfigProperties.getDest())) { // for internal use + log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue())); + } else { + rt.convertAndSend(flowStatSchedConfigProperties.getQueue(), msg).subscribe(); + } + if (log.isDebugEnabled()) { + log.debug("report " + toDP19(timeWin) + " win10: " + msg); + } + } } ); startTimeSlot = recentEndTimeSlot; - log.info(toDP23(st) + " fss " + toDP23(System.currentTimeMillis())); + if (log.isInfoEnabled()) { + log.info(toDP23(st) + " fss " + toDP23(System.currentTimeMillis())); + } + } + + private void accumulateParents(String resource, long timeWin, long blockRequests) { + List prl = ThreadContext.getArrayList(parentResourceList, String.class); + resourceRateLimitConfigService.getParentsTo(resource, prl); + for (int i = 0; i < prl.size(); i++) { + String parentResource = prl.get(i); + AtomicLong parentTotalBlockRequests = resourceTimeWindow2totalBlockRequestsMap.get(parentResource + timeWin); + if (parentTotalBlockRequests != null) { + parentTotalBlockRequests.addAndGet(blockRequests); + } + } } private long getRecentEndTimeSlot(FlowStat flowStat) { @@ -198,8 +266,7 @@ public class FlowStatSchedConfig extends SchedConfig { } else { interval = 0; } - long recentEndTimeSlot = currentTimeSlot - interval * 1000 - 10 * 1000; - return recentEndTimeSlot; + return currentTimeSlot - interval * 1000 - 10 * 1000; } private String toDP19(long startTimeSlot) { diff --git a/fizz-core/src/main/java/we/config/SystemConfig.java b/fizz-core/src/main/java/we/config/SystemConfig.java index 2c559b8..e3fbe53 100644 --- a/fizz-core/src/main/java/we/config/SystemConfig.java +++ b/fizz-core/src/main/java/we/config/SystemConfig.java @@ -50,6 +50,8 @@ public class SystemConfig { public static final String DEFAULT_GATEWAY_TEST_PREFIX = "/_proxytest"; + public static final String DEFAULT_GATEWAY_TEST = "_proxytest"; + public static final String DEFAULT_GATEWAY_TEST_PREFIX0 = "/_proxytest/"; private String gatewayPrefix = DEFAULT_GATEWAY_PREFIX; diff --git a/fizz-core/src/main/java/we/controller/FlowControlController.java b/fizz-core/src/main/java/we/controller/FlowControlController.java index 0ea8944..780e777 100644 --- a/fizz-core/src/main/java/we/controller/FlowControlController.java +++ b/fizz-core/src/main/java/we/controller/FlowControlController.java @@ -72,11 +72,11 @@ public class FlowControlController { long currentTimeSlot = flowStat.currentTimeSlotId(); long startTimeSlot = currentTimeSlot - recent * 1000; TimeWindowStat timeWindowStat = null; - List wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent); + List wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.NODE_RESOURCE, startTimeSlot, currentTimeSlot, recent); if (wins == null || wins.isEmpty()) { result.put("rps", 0); } else { - concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL); + concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.NODE_RESOURCE); result.put("concurrents", concurrents); timeWindowStat = wins.get(0).getWindows().get(0); BigDecimal winrps = timeWindowStat.getRps(); diff --git a/fizz-core/src/main/java/we/filter/FlowControlFilter.java b/fizz-core/src/main/java/we/filter/FlowControlFilter.java index 37f2832..55a66fa 100644 --- a/fizz-core/src/main/java/we/filter/FlowControlFilter.java +++ b/fizz-core/src/main/java/we/filter/FlowControlFilter.java @@ -22,6 +22,7 @@ import java.util.List; import javax.annotation.Resource; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -35,7 +36,11 @@ import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; +import we.config.SystemConfig; import we.flume.clients.log4j2appender.LogService; +import we.legacy.RespEntity; +import we.plugin.auth.ApiConfigService; +import we.plugin.auth.AppService; import we.stats.BlockType; import we.stats.FlowStat; import we.stats.IncrRequestResult; @@ -43,6 +48,8 @@ import we.stats.ResourceConfig; import we.stats.ratelimit.ResourceRateLimitConfig; import we.stats.ratelimit.ResourceRateLimitConfigService; import we.util.Constants; +import we.util.JacksonUtils; +import we.util.ThreadContext; import we.util.WebUtils; /** @@ -69,10 +76,15 @@ public class FlowControlFilter extends FizzWebFilter { @Resource private ResourceRateLimitConfigService resourceRateLimitConfigService; - // @Resource @Autowired(required = false) private FlowStat flowStat; + @Resource + private ApiConfigService apiConfigService; + + @Resource + private AppService appService; + @Override public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { @@ -81,61 +93,60 @@ public class FlowControlFilter extends FizzWebFilter { if (secFS == -1) { return WebUtils.responseError(exchange, HttpStatus.INTERNAL_SERVER_ERROR.value(), "request path should like /optional-prefix/service-name/real-biz-path"); } - String svc = path.substring(1, secFS); - boolean adminReq = false; - if (svc.equals(admin) || svc.equals(actuator)) { + String service = path.substring(1, secFS); + boolean adminReq = false, proxyTestReq = false; + if (service.equals(admin) || service.equals(actuator)) { adminReq = true; exchange.getAttributes().put(ADMIN_REQUEST, Constants.Symbol.EMPTY); + } else if (service.equals(SystemConfig.DEFAULT_GATEWAY_TEST)) { + proxyTestReq = true; + } else { + service = WebUtils.getClientService(exchange); } - if (flowControlFilterProperties.isFlowControl() && !adminReq) { - String service = WebUtils.getClientService(exchange); -// String reqPath = WebUtils.getClientReqPath(exchange); + if (flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq) { + LogService.setBizId(exchange.getRequest().getId()); + if (!apiConfigService.serviceConfigMap.containsKey(service)) { + String json = RespEntity.toJson(HttpStatus.FORBIDDEN.value(), "no service " + service, exchange.getRequest().getId()); + return WebUtils.buildJsonDirectResponse(exchange, HttpStatus.FORBIDDEN, null, json); + } + String app = WebUtils.getAppId(exchange); + if (app != null && !appService.getAppMap().containsKey(app)) { + String json = RespEntity.toJson(HttpStatus.FORBIDDEN.value(), "no app " + app, exchange.getRequest().getId()); + return WebUtils.buildJsonDirectResponse(exchange, HttpStatus.FORBIDDEN, null, json); + } + path = WebUtils.getClientReqPath(exchange); + String ip = WebUtils.getOriginIp(exchange); + long currentTimeSlot = flowStat.currentTimeSlotId(); - ResourceRateLimitConfig globalConfig = resourceRateLimitConfigService - .getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL); - ResourceRateLimitConfig serviceConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(service); - if (serviceConfig == null) { - serviceConfig = resourceRateLimitConfigService - .getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT); - } - - // global - List resourceConfigs = new ArrayList<>(); - ResourceConfig globalResCfg = new ResourceConfig(ResourceRateLimitConfig.GLOBAL, 0, 0); - if (globalConfig != null && globalConfig.isEnable()) { - globalResCfg.setMaxCon(globalConfig.concurrents); - globalResCfg.setMaxQPS(globalConfig.qps); - } - resourceConfigs.add(globalResCfg); - - // service - ResourceConfig serviceResCfg = new ResourceConfig(service, 0, 0); - if (serviceConfig != null && serviceConfig.isEnable()) { - serviceResCfg.setMaxCon(serviceConfig.concurrents); - serviceResCfg.setMaxQPS(serviceConfig.qps); - } - resourceConfigs.add(serviceResCfg); - + List resourceConfigs = getFlowControlConfigs(app, ip, null, service, path); IncrRequestResult result = flowStat.incrRequest(resourceConfigs, currentTimeSlot); if (result != null && !result.isSuccess()) { + String blockedResourceId = result.getBlockedResourceId(); if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) { - log.info("exceed {} flow limit, blocked by maximum concurrent requests", - result.getBlockedResourceId(), LogService.BIZ_ID, exchange.getRequest().getId()); + log.info("exceed {} flow limit, blocked by maximum concurrent requests", blockedResourceId, LogService.BIZ_ID, exchange.getRequest().getId()); } else { - log.info("exceed {} flow limit, blocked by maximum QPS", result.getBlockedResourceId(), - LogService.BIZ_ID, exchange.getRequest().getId()); + log.info("exceed {} flow limit, blocked by maximum QPS", blockedResourceId, LogService.BIZ_ID, exchange.getRequest().getId()); } -// ResourceRateLimitConfig config = result.getBlockedResourceId().equals(globalConfig.resource) -// ? globalConfig -// : serviceConfig; + ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.NODE_RESOURCE); + String rt = c.responseType, rc = c.responseContent; + c = resourceRateLimitConfigService.getResourceRateLimitConfig(blockedResourceId); + if (c != null) { + if (StringUtils.isNotBlank(c.responseType)) { + rt = c.responseType; + } + if (StringUtils.isNotBlank(c.responseContent)) { + rc = c.responseContent; + } + } ServerHttpResponse resp = exchange.getResponse(); resp.setStatusCode(HttpStatus.OK); - resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType); - return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes()))); + resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, rt); + return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rc.getBytes()))); + } else { long start = System.currentTimeMillis(); return chain.filter(exchange).doFinally(s -> { @@ -151,4 +162,73 @@ public class FlowControlFilter extends FizzWebFilter { return chain.filter(exchange); } + + private List getFlowControlConfigs(String app, String ip, String node, String service, String path) { + if (log.isDebugEnabled()) { + log.debug("get flow control config by app={}, ip={}, node={}, service={}, path={}", app, ip, node, service, path); + } + List resourceConfigs = new ArrayList<>(9); + StringBuilder b = ThreadContext.getStringBuilder(); + + checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, ResourceRateLimitConfig.NODE, null, null, null); + checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, null, service, null, ResourceRateLimitConfig.SERVICE_DEFAULT); + checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, null, service, path, null); + + if (app != null) { + checkRateLimitConfigAndAddTo(resourceConfigs, b, app, null, null, null, null, ResourceRateLimitConfig.APP_DEFAULT); + checkRateLimitConfigAndAddTo(resourceConfigs, b, app, null, null, service, null, null); + checkRateLimitConfigAndAddTo(resourceConfigs, b, app, null, null, service, path, null); + } + + if (ip != null) { + checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, null, null, null); + checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, service, null, null); + checkRateLimitConfigAndAddTo(resourceConfigs, b, null, ip, null, service, path, null); + } + + if (log.isDebugEnabled()) { + log.debug("resource configs: " + JacksonUtils.writeValueAsString(resourceConfigs)); + } + return resourceConfigs; + } + + private void checkRateLimitConfigAndAddTo(List resourceConfigs, StringBuilder b, String app, String ip, String node, String service, String path, String defaultRateLimitConfigId) { + ResourceRateLimitConfig.buildResourceIdTo(b, app, ip, node, service, path); + String resourceId = b.toString(); + checkRateLimitConfigAndAddTo(resourceConfigs, resourceId, defaultRateLimitConfigId); + b.delete(0, b.length()); + } + + private void checkRateLimitConfigAndAddTo(List resourceConfigs, String resource, String defaultRateLimitConfigId) { + ResourceConfig rc = null; + ResourceRateLimitConfig rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(resource); + if (rateLimitConfig != null && rateLimitConfig.isEnable()) { + rc = new ResourceConfig(resource, rateLimitConfig.concurrents, rateLimitConfig.qps); + resourceConfigs.add(rc); + } else { + String node = ResourceRateLimitConfig.getNode(resource); + if (node != null && node.equals(ResourceRateLimitConfig.NODE)) { + rc = new ResourceConfig(resource, 0, 0); + } + if (defaultRateLimitConfigId != null) { + if (defaultRateLimitConfigId.equals(ResourceRateLimitConfig.SERVICE_DEFAULT)) { + rc = new ResourceConfig(resource, 0, 0); + rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT_RESOURCE); + if (rateLimitConfig != null && rateLimitConfig.isEnable()) { + rc.setMaxCon(rateLimitConfig.concurrents); + rc.setMaxQPS(rateLimitConfig.qps); + } + } + if (defaultRateLimitConfigId.equals(ResourceRateLimitConfig.APP_DEFAULT)) { + rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.APP_DEFAULT_RESOURCE); + if (rateLimitConfig != null && rateLimitConfig.isEnable()) { + rc = new ResourceConfig(resource, rateLimitConfig.concurrents, rateLimitConfig.qps); + } + } + } + if (rc != null) { + resourceConfigs.add(rc); + } + } + } } diff --git a/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java b/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java index 596538a..58a499c 100644 --- a/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java +++ b/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java @@ -17,7 +17,10 @@ package we.stats.ratelimit; +import com.fasterxml.jackson.annotation.JsonIgnore; +import we.util.Constants; import we.util.JacksonUtils; +import we.util.Utils; /** * @author hongqiaowei @@ -26,30 +29,51 @@ import we.util.JacksonUtils; public class ResourceRateLimitConfig { public static interface Type { - static final byte GLOBAL = 1; + static final byte NODE = 1; static final byte SERVICE_DEFAULT = 2; static final byte SERVICE = 3; static final byte API = 4; + static final byte APP_DEFAULT = 5; + static final byte APP = 6; + static final byte IP = 7; } - public static final int DELETED = 1; + public static final int DELETED = 1; - public static final String GLOBAL = "_global"; + public static final String NODE = "_global"; - public static final String SERVICE_DEFAULT = "service_default"; + public static final String NODE_RESOURCE = buildResourceId(null, null, NODE, null, null); - private static final int ENABLE = 1; + public static final String SERVICE_DEFAULT = "service_default"; - private static final int UNABLE = 0; + public static final String SERVICE_DEFAULT_RESOURCE = buildResourceId(null, null, null, SERVICE_DEFAULT, null); + + public static final String APP_DEFAULT = "app_default"; + + public static final String APP_DEFAULT_RESOURCE = buildResourceId(APP_DEFAULT, null, null, null, null); + + private static final int ENABLE = 1; + + private static final int UNABLE = 0; public int isDeleted = 0; public int id; - private boolean enable = true; + private boolean enable = true; public String resource; + public String service; + + public String path; + + public String app; + + public String ip; + + public String node; + public byte type; public long qps; @@ -72,6 +96,96 @@ public class ResourceRateLimitConfig { } } + public void setResource(String r) { + resource = r; + if (!resource.equals(NODE)) { + service = resource; + } + } + + public void setType(byte t) { + type = t; + if (type == Type.NODE) { + node = NODE; + } else if (type == Type.SERVICE_DEFAULT) { + service = SERVICE_DEFAULT; + } else if (type == Type.APP_DEFAULT) { + app = APP_DEFAULT; + } + } + + private String resourceId = null; + + @JsonIgnore + public String getResourceId() { + if (resourceId == null) { + resourceId = + (app == null ? "" : app) + '@' + + (ip == null ? "" : ip) + '@' + + (node == null ? "" : node) + '@' + + (service == null ? "" : service) + '@' + + (path == null ? "" : path) + ; + } + return resourceId; + } + + public static String buildResourceId(String app, String ip, String node, String service, String path) { + StringBuilder b = new StringBuilder(32); + buildResourceIdTo(b, app, ip, node, service, path); + return b.toString(); + } + + public static void buildResourceIdTo(StringBuilder b, String app, String ip, String node, String service, String path) { + b.append(app == null ? Constants.Symbol.EMPTY : app) .append(Constants.Symbol.AT); + b.append(ip == null ? Constants.Symbol.EMPTY : ip) .append(Constants.Symbol.AT); + b.append(node == null ? Constants.Symbol.EMPTY : node) .append(Constants.Symbol.AT); + b.append(service == null ? Constants.Symbol.EMPTY : service) .append(Constants.Symbol.AT); + b.append(path == null ? Constants.Symbol.EMPTY : path); + } + + public static String getApp(String resource) { + int i = resource.indexOf(Constants.Symbol.AT); + if (i == 0) { + return null; + } else { + return resource.substring(0, i); + } + } + + public static String getIp(String resource) { + String extract = Utils.extract(resource, Constants.Symbol.AT, 1); + if (extract.equals(Constants.Symbol.EMPTY)) { + return null; + } + return extract; + } + + public static String getNode(String resource) { + String extract = Utils.extract(resource, Constants.Symbol.AT, 2); + if (extract.equals(Constants.Symbol.EMPTY)) { + return null; + } + return extract; + } + + public static String getService(String resource) { + String extract = Utils.extract(resource, Constants.Symbol.AT, 3); + if (extract.equals(Constants.Symbol.EMPTY)) { + return null; + } + return extract; + } + + public static String getPath(String resource) { + int i = resource.lastIndexOf(Constants.Symbol.AT); + if (i == resource.length() - 1) { + return null; + } else { + return resource.substring(i); + } + } + @Override public String toString() { return JacksonUtils.writeValueAsString(this); diff --git a/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java b/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java index 9d84f9b..6f046da 100644 --- a/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java +++ b/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java @@ -27,13 +27,11 @@ import we.config.AggregateRedisConfig; import we.flume.clients.log4j2appender.LogService; import we.util.JacksonUtils; import we.util.ReactorUtils; +import we.util.ThreadContext; import javax.annotation.PostConstruct; import javax.annotation.Resource; -import java.util.AbstractMap; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -130,7 +128,7 @@ public class ResourceRateLimitConfigService { ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class); ResourceRateLimitConfig r = oldResourceRateLimitConfigMap.remove(rrlc.id); if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED && r != null) { - resourceRateLimitConfigMap.remove(r.resource); + resourceRateLimitConfigMap.remove(r.getResourceId()); } updateResourceRateLimitConfigMap(rrlc, resourceRateLimitConfigMap); if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED) { @@ -158,11 +156,11 @@ public class ResourceRateLimitConfigService { private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc, Map resourceRateLimitConfigMap) { if (rrlc.isDeleted == ResourceRateLimitConfig.DELETED) { - ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.resource); + ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.getResourceId()); log.info("remove " + removedRrlc); } else { - ResourceRateLimitConfig existRrlc = resourceRateLimitConfigMap.get(rrlc.resource); - resourceRateLimitConfigMap.put(rrlc.resource, rrlc); + ResourceRateLimitConfig existRrlc = resourceRateLimitConfigMap.get(rrlc.getResourceId()); + resourceRateLimitConfigMap.put(rrlc.getResourceId(), rrlc); if (existRrlc == null) { log.info("add " + rrlc); } else { @@ -182,4 +180,71 @@ public class ResourceRateLimitConfigService { public Map getResourceRateLimitConfigMap() { return resourceRateLimitConfigMap; } + + public void getParentsTo(String resource, List parentList) { + String app = null, ip = null, node = null, service = null, path = null; + ResourceRateLimitConfig c = resourceRateLimitConfigMap.get(resource); + if (c == null) { + service = ResourceRateLimitConfig.getService(resource); + if (service != null) { + parentList.add(ResourceRateLimitConfig.NODE_RESOURCE); + } + return; + } else { + if (c.type == ResourceRateLimitConfig.Type.NODE) { + return; + } + if (c.type == ResourceRateLimitConfig.Type.SERVICE) { + parentList.add(ResourceRateLimitConfig.NODE_RESOURCE); + return; + } + app = c.app; + ip = c.ip; + service = c.service; + path = c.path; + } + + StringBuilder b = ThreadContext.getStringBuilder(); + + if (app != null) { + if (path != null) { + ResourceRateLimitConfig.buildResourceIdTo(b, app, null, null, service, null); + checkRateLimitConfigAndAddTo(b, parentList); + ResourceRateLimitConfig.buildResourceIdTo(b, app, null, null, null, null); + checkRateLimitConfigAndAddTo(b, parentList); + } else if (service != null) { + ResourceRateLimitConfig.buildResourceIdTo(b, app, null, null, null, null); + checkRateLimitConfigAndAddTo(b, parentList); + } + } + + if (ip != null) { + if (path != null) { + ResourceRateLimitConfig.buildResourceIdTo(b, null, ip, null, service, null); + checkRateLimitConfigAndAddTo(b, parentList); + ResourceRateLimitConfig.buildResourceIdTo(b, null, ip, null, null, null); + checkRateLimitConfigAndAddTo(b, parentList); + } else if (service != null) { + ResourceRateLimitConfig.buildResourceIdTo(b, null, ip, null, null, null); + checkRateLimitConfigAndAddTo(b, parentList); + } + } + + if (path != null) { + ResourceRateLimitConfig.buildResourceIdTo(b, null, null, null, service, null); + parentList.add(b.toString()); + b.delete(0, b.length()); + } + + parentList.add(ResourceRateLimitConfig.NODE_RESOURCE); + } + + private void checkRateLimitConfigAndAddTo(StringBuilder resourceStringBuilder, List resourceList) { + String r = resourceStringBuilder.toString(); + ResourceRateLimitConfig c = resourceRateLimitConfigMap.get(r); + if (c != null) { + resourceList.add(r); + } + resourceStringBuilder.delete(0, resourceStringBuilder.length()); + } } diff --git a/fizz-core/src/test/java/we/filter/FlowControlFilterTests.java b/fizz-core/src/test/java/we/filter/FlowControlFilterTests.java index 5886e8f..e85f741 100644 --- a/fizz-core/src/test/java/we/filter/FlowControlFilterTests.java +++ b/fizz-core/src/test/java/we/filter/FlowControlFilterTests.java @@ -52,16 +52,24 @@ public class FlowControlFilterTests { // @Test void flowControlFilterTest() throws NoSuchFieldException, InterruptedException { - FlowControlFilter flowControlFilter = new FlowControlFilter(); - ReflectionUtils.set(flowControlFilter, "flowControl", true); + FlowControlFilter filter = new FlowControlFilter(); + FlowControlFilterProperties flowControlFilterProperties = new FlowControlFilterProperties(); + ReflectionUtils.set(flowControlFilterProperties, "flowControl", true); + ReflectionUtils.set(filter, "flowControlFilterProperties", flowControlFilterProperties); + FlowStat flowStat = new FlowStat(); - ReflectionUtils.set(flowControlFilter, "flowStat", flowStat); + ReflectionUtils.set(filter, "flowStat", flowStat); ResourceRateLimitConfigService resourceRateLimitConfigService = new ResourceRateLimitConfigService(); Map map = resourceRateLimitConfigService.getResourceRateLimitConfigMap(); + ResourceRateLimitConfig config = JacksonUtils.readValue("{\"concurrents\":66,\"enable\":1,\"id\":1,\"isDeleted\":0,\"resource\":\"_global\",\"type\":1}", ResourceRateLimitConfig.class); - map.put(ResourceRateLimitConfig.GLOBAL, config); - ReflectionUtils.set(flowControlFilter, "resourceRateLimitConfigService", resourceRateLimitConfigService); + map.put(ResourceRateLimitConfig.NODE_RESOURCE, config); + + config = JacksonUtils.readValue("{\"concurrents\":33,\"enable\":1,\"id\":2,\"isDeleted\":0, \"service\":\"xservice\", \"path\":\"/ypath\", \"type\":4}", ResourceRateLimitConfig.class); + map.put(config.getResourceId(), config); + + ReflectionUtils.set(filter, "resourceRateLimitConfigService", resourceRateLimitConfigService); WebTestClient client = WebTestClient.bindToWebHandler( new WebHandler() { @@ -74,14 +82,24 @@ public class FlowControlFilterTests { } } ) - .webFilter(flowControlFilter) + .webFilter(filter) .build(); + client.get().uri("/proxy/xservice/ypath").exchange(); Thread.sleep(1000); long currentTimeSlot = flowStat.currentTimeSlotId(); long startTimeSlot = currentTimeSlot - 10 * 1000; - List resourceTimeWindowStats = flowStat.getResourceTimeWindowStats("xservice", startTimeSlot, currentTimeSlot, 10); + + // System.err.println(JacksonUtils.writeValueAsString(flowStat.resourceStats)); + + String xservice = ResourceRateLimitConfig.buildResourceId(null, null, null, "xservice", null); + List resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(xservice, startTimeSlot, currentTimeSlot, 10); TimeWindowStat win = resourceTimeWindowStats.get(0).getWindows().get(0); assertEquals(win.getCompReqs(), 1); + + String xserviceYpath = ResourceRateLimitConfig.buildResourceId(null, null, null, "xservice", "/ypath"); + resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(xserviceYpath, startTimeSlot, currentTimeSlot, 10); + win = resourceTimeWindowStats.get(0).getWindows().get(0); + assertEquals(win.getCompReqs(), 1); } } diff --git a/fizz-core/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigServiceTests.java b/fizz-core/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigServiceTests.java index 8b59782..8c2bce0 100644 --- a/fizz-core/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigServiceTests.java +++ b/fizz-core/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigServiceTests.java @@ -9,9 +9,12 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import we.redis.RedisProperties; import we.redis.RedisServerConfiguration; import we.redis.RedisTemplateConfiguration; +import we.util.JacksonUtils; import javax.annotation.Resource; +import java.util.Map; + import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -46,11 +49,17 @@ public class ResourceRateLimitConfigServiceTests { @Test void initTest() throws Throwable { stringRedisTemplate.opsForHash().put("fizz_rate_limit", "2", "{\"concurrents\":66,\"enable\":1,\"id\":2,\"isDeleted\":0,\"resource\":\"service_default\",\"type\":2}"); + stringRedisTemplate.opsForHash().put("fizz_rate_limit", "3", "{\"concurrents\":88,\"enable\":1,\"id\":3,\"isDeleted\":0, \"type\":6, \"app\":\"xapp\", \"service\":\"yservice\" }"); resourceRateLimitConfigService.init(); - ResourceRateLimitConfig resourceRateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig("service_default"); + ResourceRateLimitConfig resourceRateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT_RESOURCE); + // Map resourceRateLimitConfigMap = resourceRateLimitConfigService.getResourceRateLimitConfigMap(); + // System.err.println(JacksonUtils.writeValueAsString(resourceRateLimitConfigMap)); assertEquals(resourceRateLimitConfig.concurrents, 66); // System.err.println(resourceRateLimitConfig); // Thread.currentThread().join(); + resourceRateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig("xapp@@@yservice@"); + assertEquals(resourceRateLimitConfig.concurrents, 88); + Thread.sleep(4000); // System.err.println("init test end"); } diff --git a/fizz-core/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigTests.java b/fizz-core/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigTests.java index d8fb086..80aaac2 100644 --- a/fizz-core/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigTests.java +++ b/fizz-core/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigTests.java @@ -17,4 +17,31 @@ public class ResourceRateLimitConfigTests { ResourceRateLimitConfig resourceRateLimitConfig = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class); assertEquals("application/json; charset=UTF-8", resourceRateLimitConfig.responseType); } + + @Test + void resourceIdTest() { + String resourceRateLimitConfigJson = "{\"concurrents\":1000,\"enable\":1,\"id\":1,\"isDeleted\":0,\"qps\":500, \"type\":1, \"resource\":\"_global\" }"; + ResourceRateLimitConfig c = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class); + String resourceId = c.getResourceId(); + assertEquals("@@_global@@", resourceId); + + String node = ResourceRateLimitConfig.getNode(resourceId); + assertEquals("_global", node); + + resourceRateLimitConfigJson = "{\"concurrents\":1000,\"enable\":1,\"id\":1,\"isDeleted\":0,\"qps\":500, \"type\":2, \"resource\":\"service_default\" }"; + c = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class); + resourceId = c.getResourceId(); + assertEquals("@@@service_default@", resourceId); + + resourceRateLimitConfigJson = "{\"concurrents\":1000,\"enable\":1,\"id\":1,\"isDeleted\":0,\"qps\":500, \"type\":3, \"resource\":\"xservice\" }"; + c = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class); + resourceId = c.getResourceId(); + assertEquals("@@@xservice@", resourceId); + + resourceId = ResourceRateLimitConfig.buildResourceId(null, null, ResourceRateLimitConfig.NODE, null, null); + assertEquals("@@_global@@", resourceId); + + resourceId = ResourceRateLimitConfig.buildResourceId(null, "192.168.1.1", null, "xservice", null); + assertEquals("@192.168.1.1@@xservice@", resourceId); + } }