diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml
index dedc737..73d8755 100644
--- a/fizz-bootstrap/pom.xml
+++ b/fizz-bootstrap/pom.xml
@@ -20,9 +20,9 @@
Dragonfruit-SR3
Dysprosium-SR25
5.3.7.RELEASE
- 4.1.72.Final
+ 4.1.73.Final
4.4.15
- 2.17.0
+ 2.17.1
1.7.32
3.12.0
1.18.22
diff --git a/fizz-bootstrap/src/main/resources/application.yml b/fizz-bootstrap/src/main/resources/application.yml
index d29cc87..c1efce3 100644
--- a/fizz-bootstrap/src/main/resources/application.yml
+++ b/fizz-bootstrap/src/main/resources/application.yml
@@ -81,7 +81,7 @@ sched:
executors: 2
flowControl: true
flow-stat-sched:
- cron: 2/10 * * * * ?
+ cron: 8/10 * * * * ?
dest: redis
queue: fizz_resource_access_stat
diff --git a/fizz-core/src/main/java/we/config/FlowControlConfig.java b/fizz-core/src/main/java/we/config/FlowControlConfig.java
index 4a087d8..84fd2b0 100644
--- a/fizz-core/src/main/java/we/config/FlowControlConfig.java
+++ b/fizz-core/src/main/java/we/config/FlowControlConfig.java
@@ -21,6 +21,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import we.stats.FlowStat;
+import we.stats.circuitbreaker.CircuitBreakManager;
/**
* @author hongqiaowei
@@ -31,7 +32,7 @@ import we.stats.FlowStat;
public class FlowControlConfig {
@Bean
- public FlowStat flowStat() {
- return new FlowStat();
+ public FlowStat flowStat(CircuitBreakManager circuitBreakManager) {
+ return new FlowStat(circuitBreakManager);
}
}
diff --git a/fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java b/fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java
index 496a708..c2155fd 100644
--- a/fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java
+++ b/fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java
@@ -1,158 +1,162 @@
-/*
- * Copyright (C) 2021 the original author or authors.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-package we.config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.annotation.Scheduled;
-import we.fizz.ConfigLoader;
-import we.plugin.auth.ApiConfigService;
-import we.plugin.auth.ApiConfig2appsService;
-import we.plugin.auth.AppService;
-import we.plugin.auth.GatewayGroupService;
-import we.proxy.RpcInstanceService;
-import we.stats.degrade.DegradeRuleService;
-import we.stats.ratelimit.ResourceRateLimitConfigService;
-
-import javax.annotation.Resource;
-
-/**
- * refresh config local cache config
- * @see ApiConfigService#refreshLocalCache() refresh api config local cache
- * @see ApiConfig2appsService#refreshLocalCache() refresh api config to apps local cache
- * @see ConfigLoader#refreshLocalCache() refresh aggregate config local cache
- * @see GatewayGroupService#refreshLocalCache() refresh gateway group local cache
- * @see AppService#refreshLocalCache() refresh app local cache
- * @see ResourceRateLimitConfigService#refreshLocalCache() refresh flow control rule local cache
- * @see RpcInstanceService#refreshLocalCache() refresh rpc service local cache
- * @see DegradeRuleService#refreshLocalCache() refresh degrade rule local cache
- *
- * @author zhongjie
- */
-@Configuration
-public class RefreshLocalCacheConfig {
- private static final Logger LOGGER = LoggerFactory.getLogger(RefreshLocalCacheConfig.class);
-
- @Resource
- private RefreshLocalCacheConfigProperties refreshLocalCacheConfigProperties;
-
- @Resource
- private ConfigLoader configLoader;
-
- @Resource
- private ApiConfigService apiConfigService;
-
- @Resource
- private ApiConfig2appsService apiConfig2AppsService;
-
- @Resource
- private GatewayGroupService gatewayGroupService;
-
- @Resource
- private AppService appService;
-
- @Resource
- private ResourceRateLimitConfigService resourceRateLimitConfigService;
-
- @Resource
- private RpcInstanceService rpcInstanceService;
-
- @Resource
- private FizzMangerConfig fizzMangerConfig;
-
- @Resource
- private DegradeRuleService degradeRuleService;
-
- @Scheduled(initialDelayString = "${refresh-local-cache.initial-delay-millis:300000}",
- fixedRateString = "${refresh-local-cache.fixed-rate-millis:300000}")
- public void refreshLocalCache() {
- if (refreshLocalCacheConfigProperties.isApiConfigCacheRefreshEnabled()) {
- LOGGER.debug("refresh api config local cache");
- try {
- apiConfigService.refreshLocalCache();
- } catch (Throwable t) {
- LOGGER.warn("refresh api config local cache exception", t);
- }
- }
-
- if (refreshLocalCacheConfigProperties.isApiConfig2AppsCacheRefreshEnabled()) {
- LOGGER.debug("refresh api config to apps local cache");
- try {
- apiConfig2AppsService.refreshLocalCache();
- } catch (Throwable t) {
- LOGGER.warn("refresh api config to apps local cache exception", t);
- }
- }
-
- if (refreshLocalCacheConfigProperties.isAggregateConfigCacheRefreshEnabled()) {
- LOGGER.debug("refresh aggregate config local cache");
- try {
- configLoader.refreshLocalCache();
- } catch (Exception e) {
- LOGGER.warn("refresh aggregate config local cache exception", e);
- }
- }
-
- if (refreshLocalCacheConfigProperties.isGatewayGroupCacheRefreshEnabled()) {
- LOGGER.debug("refresh gateway group local cache");
- try {
- gatewayGroupService.refreshLocalCache();
- } catch (Throwable t) {
- LOGGER.warn("refresh gateway group local cache exception", t);
- }
- }
-
- if (refreshLocalCacheConfigProperties.isAppAuthCacheRefreshEnabled()) {
- LOGGER.debug("refresh app auth local cache");
- try {
- appService.refreshLocalCache();
- } catch (Throwable t) {
- LOGGER.warn("refresh app auth local cache exception", t);
- }
- }
-
- if (refreshLocalCacheConfigProperties.isFlowControlRuleCacheRefreshEnabled()) {
- LOGGER.debug("refresh flow control rule local cache");
- try {
- resourceRateLimitConfigService.refreshLocalCache();
- } catch (Throwable t) {
- LOGGER.warn("refresh flow control rule local cache exception", t);
- }
- }
-
- if (refreshLocalCacheConfigProperties.isRpcServiceCacheRefreshEnabled()) {
- LOGGER.debug("refresh rpc service local cache");
- try {
- rpcInstanceService.refreshLocalCache();
- } catch (Throwable t) {
- LOGGER.warn("refresh rpc service local cache exception", t);
- }
- }
-
- if (refreshLocalCacheConfigProperties.isDegradeRuleCacheRefreshEnabled()) {
- LOGGER.debug("refresh degrade rule local cache");
- try {
- degradeRuleService.refreshLocalCache();
- } catch (Throwable t) {
- LOGGER.warn("refresh degrade rule local cache exception", t);
- }
- }
-
- fizzMangerConfig.updateMangerUrl();
- }
-}
+/*
+ * Copyright (C) 2021 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package we.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.Scheduled;
+import we.fizz.ConfigLoader;
+import we.plugin.auth.ApiConfigService;
+import we.plugin.auth.ApiConfig2appsService;
+import we.plugin.auth.AppService;
+import we.plugin.auth.GatewayGroupService;
+import we.proxy.RpcInstanceService;
+import we.stats.degrade.DegradeRuleService;
+import we.stats.ratelimit.ResourceRateLimitConfigService;
+
+import javax.annotation.Resource;
+
+/**
+ * refresh config local cache config
+ * @see ApiConfigService#refreshLocalCache() refresh api config local cache
+ * @see ApiConfig2appsService#refreshLocalCache() refresh api config to apps local cache
+ * @see ConfigLoader#refreshLocalCache() refresh aggregate config local cache
+ * @see GatewayGroupService#refreshLocalCache() refresh gateway group local cache
+ * @see AppService#refreshLocalCache() refresh app local cache
+ * @see ResourceRateLimitConfigService#refreshLocalCache() refresh flow control rule local cache
+ * @see RpcInstanceService#refreshLocalCache() refresh rpc service local cache
+ * @see DegradeRuleService#refreshLocalCache() refresh degrade rule local cache
+ *
+ * @author zhongjie
+ */
+@Configuration
+public class RefreshLocalCacheConfig {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RefreshLocalCacheConfig.class);
+
+ @Resource
+ private RefreshLocalCacheConfigProperties refreshLocalCacheConfigProperties;
+
+ @Resource
+ private ConfigLoader configLoader;
+
+ @Resource
+ private ApiConfigService apiConfigService;
+
+ @Resource
+ private ApiConfig2appsService apiConfig2AppsService;
+
+ @Resource
+ private GatewayGroupService gatewayGroupService;
+
+ @Resource
+ private AppService appService;
+
+ @Resource
+ private ResourceRateLimitConfigService resourceRateLimitConfigService;
+
+ @Resource
+ private RpcInstanceService rpcInstanceService;
+
+ @Resource
+ private FizzMangerConfig fizzMangerConfig;
+
+
+// @Resource
+// private DegradeRuleService degradeRuleService;
+
+
+ @Scheduled(initialDelayString = "${refresh-local-cache.initial-delay-millis:300000}",
+ fixedRateString = "${refresh-local-cache.fixed-rate-millis:300000}")
+ public void refreshLocalCache() {
+ if (refreshLocalCacheConfigProperties.isApiConfigCacheRefreshEnabled()) {
+ LOGGER.debug("refresh api config local cache");
+ try {
+ apiConfigService.refreshLocalCache();
+ } catch (Throwable t) {
+ LOGGER.warn("refresh api config local cache exception", t);
+ }
+ }
+
+ if (refreshLocalCacheConfigProperties.isApiConfig2AppsCacheRefreshEnabled()) {
+ LOGGER.debug("refresh api config to apps local cache");
+ try {
+ apiConfig2AppsService.refreshLocalCache();
+ } catch (Throwable t) {
+ LOGGER.warn("refresh api config to apps local cache exception", t);
+ }
+ }
+
+ if (refreshLocalCacheConfigProperties.isAggregateConfigCacheRefreshEnabled()) {
+ LOGGER.debug("refresh aggregate config local cache");
+ try {
+ configLoader.refreshLocalCache();
+ } catch (Exception e) {
+ LOGGER.warn("refresh aggregate config local cache exception", e);
+ }
+ }
+
+ if (refreshLocalCacheConfigProperties.isGatewayGroupCacheRefreshEnabled()) {
+ LOGGER.debug("refresh gateway group local cache");
+ try {
+ gatewayGroupService.refreshLocalCache();
+ } catch (Throwable t) {
+ LOGGER.warn("refresh gateway group local cache exception", t);
+ }
+ }
+
+ if (refreshLocalCacheConfigProperties.isAppAuthCacheRefreshEnabled()) {
+ LOGGER.debug("refresh app auth local cache");
+ try {
+ appService.refreshLocalCache();
+ } catch (Throwable t) {
+ LOGGER.warn("refresh app auth local cache exception", t);
+ }
+ }
+
+ if (refreshLocalCacheConfigProperties.isFlowControlRuleCacheRefreshEnabled()) {
+ LOGGER.debug("refresh flow control rule local cache");
+ try {
+ resourceRateLimitConfigService.refreshLocalCache();
+ } catch (Throwable t) {
+ LOGGER.warn("refresh flow control rule local cache exception", t);
+ }
+ }
+
+ if (refreshLocalCacheConfigProperties.isRpcServiceCacheRefreshEnabled()) {
+ LOGGER.debug("refresh rpc service local cache");
+ try {
+ rpcInstanceService.refreshLocalCache();
+ } catch (Throwable t) {
+ LOGGER.warn("refresh rpc service local cache exception", t);
+ }
+ }
+
+
+// if (refreshLocalCacheConfigProperties.isDegradeRuleCacheRefreshEnabled()) {
+// LOGGER.debug("refresh degrade rule local cache");
+// try {
+// degradeRuleService.refreshLocalCache();
+// } catch (Throwable t) {
+// LOGGER.warn("refresh degrade rule local cache exception", t);
+// }
+// }
+
+
+ fizzMangerConfig.updateMangerUrl();
+ }
+}
diff --git a/fizz-core/src/main/java/we/controller/CacheCheckController.java b/fizz-core/src/main/java/we/controller/CacheCheckController.java
index 766d78b..eb5c798 100644
--- a/fizz-core/src/main/java/we/controller/CacheCheckController.java
+++ b/fizz-core/src/main/java/we/controller/CacheCheckController.java
@@ -27,6 +27,7 @@ import we.plugin.auth.ApiConfig2appsService;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AppService;
import we.plugin.auth.GatewayGroupService;
+import we.stats.circuitbreaker.CircuitBreakManager;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.JacksonUtils;
@@ -58,6 +59,9 @@ public class CacheCheckController {
@Resource
private GlobalResourceService globalResourceService;
+ @Resource
+ private CircuitBreakManager circuitBreakManager;
+
@GetMapping("/gatewayGroups")
public Mono gatewayGroups(ServerWebExchange exchange) {
return Mono.just(JacksonUtils.writeValueAsString(gatewayGroupService.gatewayGroupMap));
@@ -92,4 +96,9 @@ public class CacheCheckController {
public Mono globalResources(ServerWebExchange exchange) {
return Mono.just(JacksonUtils.writeValueAsString(globalResourceService.getResourceMap()));
}
+
+ @GetMapping("/circuitBreakers")
+ public Mono circuitBreakers(ServerWebExchange exchange) {
+ return Mono.just(JacksonUtils.writeValueAsString(circuitBreakManager.getCircuitBreakerMap()));
+ }
}
diff --git a/fizz-core/src/main/java/we/filter/FlowControlFilter.java b/fizz-core/src/main/java/we/filter/FlowControlFilter.java
index b388984..1d3e8ee 100644
--- a/fizz-core/src/main/java/we/filter/FlowControlFilter.java
+++ b/fizz-core/src/main/java/we/filter/FlowControlFilter.java
@@ -38,8 +38,11 @@ import we.stats.BlockType;
import we.stats.FlowStat;
import we.stats.IncrRequestResult;
import we.stats.ResourceConfig;
+
+import we.stats.circuitbreaker.CircuitBreakManager;
+import we.stats.circuitbreaker.CircuitBreaker;
import we.stats.degrade.DegradeRule;
-import we.stats.degrade.DegradeRuleService;
+
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.*;
@@ -81,13 +84,19 @@ public class FlowControlFilter extends FizzWebFilter {
private FlowStat flowStat;
@Resource
- private ApiConfigService apiConfigService;
+ private ApiConfigService apiConfigService;
@Resource
- private AppService appService;
+ private AppService appService;
@Resource
- private SystemConfig systemConfig;
+ private SystemConfig systemConfig;
+
+ /*@Resource
+ private DegradeRuleService degradeRuleService;*/
+
+ @Resource
+ private CircuitBreakManager circuitBreakManager;
@Resource
DegradeRuleService degradeRuleService;
@@ -128,18 +137,58 @@ public class FlowControlFilter extends FizzWebFilter {
long currentTimeSlot = flowStat.currentTimeSlotId();
List resourceConfigs = getFlowControlConfigs(app, ip, null, service, path);
- IncrRequestResult result = flowStat.incrRequest(resourceConfigs, currentTimeSlot, (rc, rcs) -> {
+ IncrRequestResult result = flowStat.incrRequest(exchange, resourceConfigs, currentTimeSlot, (rc, rcs) -> {
return getResourceConfigItselfAndParents(rc, rcs);
});
if (result != null && !result.isSuccess()) {
String blockedResourceId = result.getBlockedResourceId();
- if (BlockType.DEGRADE == result.getBlockType()) {
- log.info("{} exceed {} degrade limit, trigger degrade", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
+
+ if (BlockType.CIRCUIT_BREAK == result.getBlockType()) {
+ log.info("{} exceed {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
+
String responseContentType = flowControlFilterProperties.getDegradeDefaultResponseContentType();
String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent();
+
+ CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(blockedResourceId);
+ if (cb.responseContentType != null) {
+ responseContentType = cb.responseContentType;
+ responseContent = cb.responseContent;
+ } else {
+ cb = circuitBreakManager.getCircuitBreaker(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
+ if (cb.responseContentType != null) {
+ responseContentType = cb.responseContentType;
+ responseContent = cb.responseContent;
+ }
+ }
+
+ ServerHttpResponse resp = exchange.getResponse();
+ resp.setStatusCode(HttpStatus.OK);
+ resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, responseContentType);
+ return resp.writeWith(Mono.just(resp.bufferFactory().wrap(responseContent.getBytes())));
+
+ } else {
+ if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
+ log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
+ } else {
+ log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
+ }
+
+ ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.NODE_RESOURCE);
+ String rt = c.responseType, rc = c.responseContent;
+ c = resourceRateLimitConfigService.getResourceRateLimitConfig(blockedResourceId);
+ if (c != null) {
+ if (StringUtils.isNotBlank(c.responseType)) {
+ rt = c.responseType;
+ }
+ if (StringUtils.isNotBlank(c.responseContent)) {
+ rc = c.responseContent;
+ }
+ }
+
+
DegradeRule degradeRule = degradeRuleService.getDegradeRule(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
if (degradeRule != null) {
responseContentType = degradeRule.getResponseContentType();
@@ -179,6 +228,7 @@ public class FlowControlFilter extends FizzWebFilter {
}
}
+
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, rt);
@@ -187,12 +237,21 @@ public class FlowControlFilter extends FizzWebFilter {
} else {
long start = System.currentTimeMillis();
setTraceId(exchange);
+ String finalService = service;
+ String finalPath = path;
return chain.filter(exchange).doFinally(s -> {
long rt = System.currentTimeMillis() - start;
+ CircuitBreaker cb = exchange.getAttribute(CircuitBreaker.DETECT_REQUEST);
if (s == SignalType.ON_ERROR || exchange.getResponse().getStatusCode().is5xxServerError()) {
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false);
+ if (cb != null) {
+ cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat);
+ }
} else {
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true);
+ if (cb != null) {
+ cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeSlot, flowStat);
+ }
}
});
}
@@ -337,7 +396,9 @@ public class FlowControlFilter extends FizzWebFilter {
}
}
- if (checkDegradeRule) {
+
+ /*if (checkDegradeRule) {
+
DegradeRule degradeRule = degradeRuleService.getDegradeRule(resource);
if (degradeRule != null && degradeRule.isEnable()) {
if (rc == null) {
@@ -353,6 +414,18 @@ public class FlowControlFilter extends FizzWebFilter {
}
}
}
+
+ }*/
+
+ if (checkDegradeRule) {
+ CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(resource);
+ if (cb != null) {
+ if (cb.type == CircuitBreaker.Type.SERVICE_DEFAULT && !cb.serviceDefaultEnable) {
+ } else {
+ rc = new ResourceConfig(resource, 0, 0);
+ resourceConfigs.add(rc);
+ }
+
}
}
diff --git a/fizz-core/src/main/java/we/stats/BlockType.java b/fizz-core/src/main/java/we/stats/BlockType.java
index 7b7a861..e83d0bc 100644
--- a/fizz-core/src/main/java/we/stats/BlockType.java
+++ b/fizz-core/src/main/java/we/stats/BlockType.java
@@ -36,5 +36,9 @@ public enum BlockType {
/**
* Blocked by degrade
*/
- DEGRADE
+
+ DEGRADE,
+
+ CIRCUIT_BREAK
+
}
diff --git a/fizz-core/src/main/java/we/stats/FlowStat.java b/fizz-core/src/main/java/we/stats/FlowStat.java
index 6ad7bcb..8b343b2 100644
--- a/fizz-core/src/main/java/we/stats/FlowStat.java
+++ b/fizz-core/src/main/java/we/stats/FlowStat.java
@@ -33,7 +33,13 @@ import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.web.server.ServerWebExchange;
+import we.Fizz;
+import we.stats.circuitbreaker.CircuitBreakManager;
+import we.stats.circuitbreaker.CircuitBreaker;
+import we.util.ResourceIdUtils;
import we.util.Utils;
+import we.util.WebUtils;
/**
* Flow Statistic
@@ -65,10 +71,21 @@ public class FlowStat {
private ExecutorService pool = Executors.newFixedThreadPool(2);
+ private CircuitBreakManager circuitBreakManager;
+
public FlowStat() {
runScheduleJob();
}
+ public FlowStat(CircuitBreakManager circuitBreakManager) {
+ this.circuitBreakManager = circuitBreakManager;
+ runScheduleJob();
+ }
+
+ public void setCircuitBreakManager(CircuitBreakManager circuitBreakManager) {
+ this.circuitBreakManager = circuitBreakManager;
+ }
+
private void runScheduleJob() {
pool.submit(new HousekeepJob(this));
pool.submit(new PeakConcurrentJob(this));
@@ -191,6 +208,81 @@ public class FlowStat {
}
}
+ public IncrRequestResult incrRequest(ServerWebExchange exchange, List resourceConfigs, long curTimeSlotId,
+ BiFunction, List> totalBlockFunc) {
+ if (resourceConfigs == null || resourceConfigs.size() == 0) {
+ return null;
+ }
+ w.lock();
+ try {
+ // check if exceed limit
+ for (ResourceConfig resourceConfig : resourceConfigs) {
+ long maxCon = resourceConfig.getMaxCon();
+ long maxQPS = resourceConfig.getMaxQPS();
+ if (maxCon > 0 || maxQPS > 0) {
+ ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
+ // check concurrent request
+ if (maxCon > 0) {
+ long n = resourceStat.getConcurrentRequests().get();
+ if (n >= maxCon) {
+ resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
+ if (totalBlockFunc != null) {
+ List parentResCfgs = totalBlockFunc.apply(resourceConfig,
+ resourceConfigs);
+ if (parentResCfgs != null && parentResCfgs.size() > 0) {
+ for (ResourceConfig pResCfg : parentResCfgs) {
+ getResourceStat(pResCfg.getResourceId())
+ .incrTotalBlockRequestToTimeSlot(curTimeSlotId);
+ }
+ }
+ }
+ return IncrRequestResult.block(resourceConfig.getResourceId(),
+ BlockType.CONCURRENT_REQUEST);
+ }
+ }
+
+ // check QPS
+ if (maxQPS > 0) {
+ long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter().get();
+ if (total >= maxQPS) {
+ resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
+ if (totalBlockFunc != null) {
+ List parentResCfgs = totalBlockFunc.apply(resourceConfig,
+ resourceConfigs);
+ if (parentResCfgs != null && parentResCfgs.size() > 0) {
+ for (ResourceConfig pResCfg : parentResCfgs) {
+ getResourceStat(pResCfg.getResourceId())
+ .incrTotalBlockRequestToTimeSlot(curTimeSlotId);
+ }
+ }
+ }
+ return IncrRequestResult.block(resourceConfig.getResourceId(), BlockType.QPS);
+ }
+ }
+ }
+ }
+
+ String service = WebUtils.getClientService(exchange);
+ String path = WebUtils.getClientReqPath(exchange);
+ String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
+ boolean permit = circuitBreakManager.permit(exchange, curTimeSlotId, this, resource);
+ if (!permit) {
+ return IncrRequestResult.block(resource, BlockType.CIRCUIT_BREAK);
+ }
+
+ // increase request and concurrent request
+ for (ResourceConfig resourceConfig : resourceConfigs) {
+ ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
+ long cons = resourceStat.getConcurrentRequests().incrementAndGet();
+ resourceStat.getTimeSlot(curTimeSlotId).updatePeakConcurrentReqeusts(cons);
+ resourceStat.getTimeSlot(curTimeSlotId).incr();
+ }
+ return IncrRequestResult.success();
+ } finally {
+ w.unlock();
+ }
+ }
+
/**
* Add request RT and Decrease concurrent request for given resources chain
*
@@ -210,6 +302,21 @@ public class FlowStat {
}
}
+ public void addRequestRT(ServerWebExchange exchange, List resourceConfigs, long timeSlotId, long rt, boolean isSuccess) {
+ if (resourceConfigs == null || resourceConfigs.size() == 0) {
+ return;
+ }
+ for (int i = resourceConfigs.size() - 1; i >= 0; i--) {
+ ResourceStat resourceStat = getResourceStat(resourceConfigs.get(i).getResourceId());
+ resourceStat.decrConcurrentRequest(timeSlotId);
+ resourceStat.addRequestRT(timeSlotId, rt, isSuccess);
+ }
+
+ String service = WebUtils.getClientService(exchange);
+ String path = WebUtils.getClientReqPath(exchange);
+ circuitBreakManager.correctCircuitBreakerState4error(exchange, timeSlotId, this, service, path);
+ }
+
/**
* Increase concurrent request counter of the specified resource
*
@@ -485,7 +592,14 @@ public class FlowStat {
String resourceId = entry.getKey();
// log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId,
// curTimeSlotId);
- entry.getValue().getTimeSlot(curTimeSlotId);
+ ResourceStat resourceStat = entry.getValue();
+ resourceStat.getTimeSlot(curTimeSlotId);
+
+ String resource = resourceStat.getResourceId();
+ CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(resource);
+ if (cb != null) {
+ cb.correctState(curTimeSlotId, stat);
+ }
}
lastTimeSlotId = curTimeSlotId;
// log.debug("PeakConcurrentJob done");
diff --git a/fizz-core/src/main/java/we/stats/ResourceStat.java b/fizz-core/src/main/java/we/stats/ResourceStat.java
index 752ba60..2f2834c 100644
--- a/fizz-core/src/main/java/we/stats/ResourceStat.java
+++ b/fizz-core/src/main/java/we/stats/ResourceStat.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import we.stats.circuitbreaker.CircuitBreaker;
/**
*
@@ -173,6 +174,34 @@ public class ResourceStat {
}
}
+ public void updateCircuitBreakState(long timeSlot, CircuitBreaker.State current, CircuitBreaker.State target) {
+ getTimeSlot(timeSlot).getCircuitBreakState().compareAndSet(current, target);
+ }
+
+ public void incrCircuitBreakNum(long timeSlot) {
+ getTimeSlot(timeSlot).getCircuitBreakNum().incrementAndGet();
+ }
+
+ public void decrCircuitBreakNum(long timeSlot) {
+ getTimeSlot(timeSlot).getCircuitBreakNum().decrementAndGet();
+ }
+
+ public void incrGradualResumeNum(long timeSlot) {
+ getTimeSlot(timeSlot).getGradualResumeNum().incrementAndGet();
+ }
+
+ public void decrGradualResumeNum(long timeSlot) {
+ getTimeSlot(timeSlot).getGradualResumeNum().decrementAndGet();
+ }
+
+ public void incrGradualRejectNum(long timeSlot) {
+ getTimeSlot(timeSlot).getGradualRejectNum().incrementAndGet();
+ }
+
+ public void decrGradualRejectNum(long timeSlot) {
+ getTimeSlot(timeSlot).getGradualRejectNum().decrementAndGet();
+ }
+
/**
* Add request RT to the specified time slot
*
diff --git a/fizz-core/src/main/java/we/stats/TimeSlot.java b/fizz-core/src/main/java/we/stats/TimeSlot.java
index 765eed4..433bff1 100644
--- a/fizz-core/src/main/java/we/stats/TimeSlot.java
+++ b/fizz-core/src/main/java/we/stats/TimeSlot.java
@@ -16,7 +16,11 @@
*/
package we.stats;
+import we.stats.circuitbreaker.CircuitBreaker;
+
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
*
@@ -76,6 +80,38 @@ public class TimeSlot {
*/
private AtomicLong totalBlockRequests = new AtomicLong(0);
+
+ private AtomicReference circuitBreakState = new AtomicReference<>(CircuitBreaker.State.CLOSED);
+
+ private AtomicLong circuitBreakNum = new AtomicLong(0);
+
+ private AtomicLong gradualResumeNum = new AtomicLong(0);
+
+ private AtomicInteger resumeTrafficFactor = new AtomicInteger(1);
+
+ private AtomicLong gradualRejectNum = new AtomicLong(0);
+
+ public AtomicReference getCircuitBreakState() {
+ return circuitBreakState;
+ }
+
+ public AtomicLong getCircuitBreakNum() {
+ return circuitBreakNum;
+ }
+
+ public AtomicLong getGradualResumeNum() {
+ return gradualResumeNum;
+ }
+
+ public AtomicInteger getResumeTrafficFactor() {
+ return resumeTrafficFactor;
+ }
+
+ public AtomicLong getGradualRejectNum() {
+ return gradualRejectNum;
+ }
+
+
public TimeSlot(long id) {
this.id = id;
}
diff --git a/fizz-core/src/main/java/we/stats/circuitbreaker/CircuitBreakManager.java b/fizz-core/src/main/java/we/stats/circuitbreaker/CircuitBreakManager.java
new file mode 100644
index 0000000..480ad9d
--- /dev/null
+++ b/fizz-core/src/main/java/we/stats/circuitbreaker/CircuitBreakManager.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats.circuitbreaker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import we.config.AggregateRedisConfig;
+import we.stats.FlowStat;
+import we.util.JacksonUtils;
+import we.util.ResourceIdUtils;
+import we.util.Result;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author hongqiaowei
+ */
+
+@Component
+public class CircuitBreakManager {
+
+ private static final Logger log = LoggerFactory.getLogger(CircuitBreakManager.class);
+
+ private Map parentResourceMap = new HashMap<>(128);
+
+ private Map circuitBreakerMap = new HashMap<>(64);
+
+ @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
+ private ReactiveStringRedisTemplate rt;
+
+ @PostConstruct
+ public void init() {
+ Result> result = initCircuitBreakers();
+ if (result.code == Result.FAIL) {
+ throw new RuntimeException(result.msg, result.t);
+ }
+ result = lsnCircuitBreakerChange();
+ if (result.code == Result.FAIL) {
+ throw new RuntimeException(result.msg, result.t);
+ }
+ log.info("init parentResourceMap: {}", parentResourceMap);
+ // schedule();
+ }
+
+ private Result> initCircuitBreakers() {
+ Result> result = Result.succ();
+ Flux> circuitBreakerConfigs = rt.opsForHash().entries("fizz_degrade_rule");
+ circuitBreakerConfigs.collectList()
+ .defaultIfEmpty(Collections.emptyList())
+ .flatMap(
+ es -> {
+ if (!es.isEmpty()) {
+ String json = null;
+ try {
+ for (Map.Entry