Circuit breaker init
This commit is contained in:
@@ -21,6 +21,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import we.stats.FlowStat;
|
||||
import we.stats.circuitbreaker.CircuitBreakManager;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
@@ -31,7 +32,7 @@ import we.stats.FlowStat;
|
||||
public class FlowControlConfig {
|
||||
|
||||
@Bean
|
||||
public FlowStat flowStat() {
|
||||
return new FlowStat();
|
||||
public FlowStat flowStat(CircuitBreakManager circuitBreakManager) {
|
||||
return new FlowStat(circuitBreakManager);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,158 +1,162 @@
|
||||
/*
|
||||
* Copyright (C) 2021 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package we.config;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import we.fizz.ConfigLoader;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.ApiConfig2appsService;
|
||||
import we.plugin.auth.AppService;
|
||||
import we.plugin.auth.GatewayGroupService;
|
||||
import we.proxy.RpcInstanceService;
|
||||
import we.stats.degrade.DegradeRuleService;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfigService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* refresh config local cache config
|
||||
* @see ApiConfigService#refreshLocalCache() refresh api config local cache
|
||||
* @see ApiConfig2appsService#refreshLocalCache() refresh api config to apps local cache
|
||||
* @see ConfigLoader#refreshLocalCache() refresh aggregate config local cache
|
||||
* @see GatewayGroupService#refreshLocalCache() refresh gateway group local cache
|
||||
* @see AppService#refreshLocalCache() refresh app local cache
|
||||
* @see ResourceRateLimitConfigService#refreshLocalCache() refresh flow control rule local cache
|
||||
* @see RpcInstanceService#refreshLocalCache() refresh rpc service local cache
|
||||
* @see DegradeRuleService#refreshLocalCache() refresh degrade rule local cache
|
||||
*
|
||||
* @author zhongjie
|
||||
*/
|
||||
@Configuration
|
||||
public class RefreshLocalCacheConfig {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RefreshLocalCacheConfig.class);
|
||||
|
||||
@Resource
|
||||
private RefreshLocalCacheConfigProperties refreshLocalCacheConfigProperties;
|
||||
|
||||
@Resource
|
||||
private ConfigLoader configLoader;
|
||||
|
||||
@Resource
|
||||
private ApiConfigService apiConfigService;
|
||||
|
||||
@Resource
|
||||
private ApiConfig2appsService apiConfig2AppsService;
|
||||
|
||||
@Resource
|
||||
private GatewayGroupService gatewayGroupService;
|
||||
|
||||
@Resource
|
||||
private AppService appService;
|
||||
|
||||
@Resource
|
||||
private ResourceRateLimitConfigService resourceRateLimitConfigService;
|
||||
|
||||
@Resource
|
||||
private RpcInstanceService rpcInstanceService;
|
||||
|
||||
@Resource
|
||||
private FizzMangerConfig fizzMangerConfig;
|
||||
|
||||
@Resource
|
||||
private DegradeRuleService degradeRuleService;
|
||||
|
||||
@Scheduled(initialDelayString = "${refresh-local-cache.initial-delay-millis:300000}",
|
||||
fixedRateString = "${refresh-local-cache.fixed-rate-millis:300000}")
|
||||
public void refreshLocalCache() {
|
||||
if (refreshLocalCacheConfigProperties.isApiConfigCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh api config local cache");
|
||||
try {
|
||||
apiConfigService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh api config local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isApiConfig2AppsCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh api config to apps local cache");
|
||||
try {
|
||||
apiConfig2AppsService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh api config to apps local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isAggregateConfigCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh aggregate config local cache");
|
||||
try {
|
||||
configLoader.refreshLocalCache();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("refresh aggregate config local cache exception", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isGatewayGroupCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh gateway group local cache");
|
||||
try {
|
||||
gatewayGroupService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh gateway group local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isAppAuthCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh app auth local cache");
|
||||
try {
|
||||
appService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh app auth local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isFlowControlRuleCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh flow control rule local cache");
|
||||
try {
|
||||
resourceRateLimitConfigService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh flow control rule local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isRpcServiceCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh rpc service local cache");
|
||||
try {
|
||||
rpcInstanceService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh rpc service local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isDegradeRuleCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh degrade rule local cache");
|
||||
try {
|
||||
degradeRuleService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh degrade rule local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
fizzMangerConfig.updateMangerUrl();
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Copyright (C) 2021 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package we.config;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import we.fizz.ConfigLoader;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.ApiConfig2appsService;
|
||||
import we.plugin.auth.AppService;
|
||||
import we.plugin.auth.GatewayGroupService;
|
||||
import we.proxy.RpcInstanceService;
|
||||
import we.stats.degrade.DegradeRuleService;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfigService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* refresh config local cache config
|
||||
* @see ApiConfigService#refreshLocalCache() refresh api config local cache
|
||||
* @see ApiConfig2appsService#refreshLocalCache() refresh api config to apps local cache
|
||||
* @see ConfigLoader#refreshLocalCache() refresh aggregate config local cache
|
||||
* @see GatewayGroupService#refreshLocalCache() refresh gateway group local cache
|
||||
* @see AppService#refreshLocalCache() refresh app local cache
|
||||
* @see ResourceRateLimitConfigService#refreshLocalCache() refresh flow control rule local cache
|
||||
* @see RpcInstanceService#refreshLocalCache() refresh rpc service local cache
|
||||
* @see DegradeRuleService#refreshLocalCache() refresh degrade rule local cache
|
||||
*
|
||||
* @author zhongjie
|
||||
*/
|
||||
@Configuration
|
||||
public class RefreshLocalCacheConfig {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RefreshLocalCacheConfig.class);
|
||||
|
||||
@Resource
|
||||
private RefreshLocalCacheConfigProperties refreshLocalCacheConfigProperties;
|
||||
|
||||
@Resource
|
||||
private ConfigLoader configLoader;
|
||||
|
||||
@Resource
|
||||
private ApiConfigService apiConfigService;
|
||||
|
||||
@Resource
|
||||
private ApiConfig2appsService apiConfig2AppsService;
|
||||
|
||||
@Resource
|
||||
private GatewayGroupService gatewayGroupService;
|
||||
|
||||
@Resource
|
||||
private AppService appService;
|
||||
|
||||
@Resource
|
||||
private ResourceRateLimitConfigService resourceRateLimitConfigService;
|
||||
|
||||
@Resource
|
||||
private RpcInstanceService rpcInstanceService;
|
||||
|
||||
@Resource
|
||||
private FizzMangerConfig fizzMangerConfig;
|
||||
|
||||
|
||||
// @Resource
|
||||
// private DegradeRuleService degradeRuleService;
|
||||
|
||||
|
||||
@Scheduled(initialDelayString = "${refresh-local-cache.initial-delay-millis:300000}",
|
||||
fixedRateString = "${refresh-local-cache.fixed-rate-millis:300000}")
|
||||
public void refreshLocalCache() {
|
||||
if (refreshLocalCacheConfigProperties.isApiConfigCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh api config local cache");
|
||||
try {
|
||||
apiConfigService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh api config local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isApiConfig2AppsCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh api config to apps local cache");
|
||||
try {
|
||||
apiConfig2AppsService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh api config to apps local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isAggregateConfigCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh aggregate config local cache");
|
||||
try {
|
||||
configLoader.refreshLocalCache();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("refresh aggregate config local cache exception", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isGatewayGroupCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh gateway group local cache");
|
||||
try {
|
||||
gatewayGroupService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh gateway group local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isAppAuthCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh app auth local cache");
|
||||
try {
|
||||
appService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh app auth local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isFlowControlRuleCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh flow control rule local cache");
|
||||
try {
|
||||
resourceRateLimitConfigService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh flow control rule local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (refreshLocalCacheConfigProperties.isRpcServiceCacheRefreshEnabled()) {
|
||||
LOGGER.debug("refresh rpc service local cache");
|
||||
try {
|
||||
rpcInstanceService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh rpc service local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// if (refreshLocalCacheConfigProperties.isDegradeRuleCacheRefreshEnabled()) {
|
||||
// LOGGER.debug("refresh degrade rule local cache");
|
||||
// try {
|
||||
// degradeRuleService.refreshLocalCache();
|
||||
// } catch (Throwable t) {
|
||||
// LOGGER.warn("refresh degrade rule local cache exception", t);
|
||||
// }
|
||||
// }
|
||||
|
||||
|
||||
fizzMangerConfig.updateMangerUrl();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ import we.plugin.auth.ApiConfig2appsService;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.AppService;
|
||||
import we.plugin.auth.GatewayGroupService;
|
||||
import we.stats.circuitbreaker.CircuitBreakManager;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfigService;
|
||||
import we.util.JacksonUtils;
|
||||
|
||||
@@ -58,6 +59,9 @@ public class CacheCheckController {
|
||||
@Resource
|
||||
private GlobalResourceService globalResourceService;
|
||||
|
||||
@Resource
|
||||
private CircuitBreakManager circuitBreakManager;
|
||||
|
||||
@GetMapping("/gatewayGroups")
|
||||
public Mono<String> gatewayGroups(ServerWebExchange exchange) {
|
||||
return Mono.just(JacksonUtils.writeValueAsString(gatewayGroupService.gatewayGroupMap));
|
||||
@@ -92,4 +96,9 @@ public class CacheCheckController {
|
||||
public Mono<String> globalResources(ServerWebExchange exchange) {
|
||||
return Mono.just(JacksonUtils.writeValueAsString(globalResourceService.getResourceMap()));
|
||||
}
|
||||
|
||||
@GetMapping("/circuitBreakers")
|
||||
public Mono<String> circuitBreakers(ServerWebExchange exchange) {
|
||||
return Mono.just(JacksonUtils.writeValueAsString(circuitBreakManager.getCircuitBreakerMap()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,8 +38,11 @@ import we.stats.BlockType;
|
||||
import we.stats.FlowStat;
|
||||
import we.stats.IncrRequestResult;
|
||||
import we.stats.ResourceConfig;
|
||||
|
||||
import we.stats.circuitbreaker.CircuitBreakManager;
|
||||
import we.stats.circuitbreaker.CircuitBreaker;
|
||||
import we.stats.degrade.DegradeRule;
|
||||
import we.stats.degrade.DegradeRuleService;
|
||||
|
||||
import we.stats.ratelimit.ResourceRateLimitConfig;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfigService;
|
||||
import we.util.*;
|
||||
@@ -81,13 +84,19 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
private FlowStat flowStat;
|
||||
|
||||
@Resource
|
||||
private ApiConfigService apiConfigService;
|
||||
private ApiConfigService apiConfigService;
|
||||
|
||||
@Resource
|
||||
private AppService appService;
|
||||
private AppService appService;
|
||||
|
||||
@Resource
|
||||
private SystemConfig systemConfig;
|
||||
private SystemConfig systemConfig;
|
||||
|
||||
/*@Resource
|
||||
private DegradeRuleService degradeRuleService;*/
|
||||
|
||||
@Resource
|
||||
private CircuitBreakManager circuitBreakManager;
|
||||
|
||||
@Resource
|
||||
DegradeRuleService degradeRuleService;
|
||||
@@ -128,18 +137,58 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
|
||||
long currentTimeSlot = flowStat.currentTimeSlotId();
|
||||
List<ResourceConfig> resourceConfigs = getFlowControlConfigs(app, ip, null, service, path);
|
||||
IncrRequestResult result = flowStat.incrRequest(resourceConfigs, currentTimeSlot, (rc, rcs) -> {
|
||||
IncrRequestResult result = flowStat.incrRequest(exchange, resourceConfigs, currentTimeSlot, (rc, rcs) -> {
|
||||
return getResourceConfigItselfAndParents(rc, rcs);
|
||||
});
|
||||
|
||||
if (result != null && !result.isSuccess()) {
|
||||
String blockedResourceId = result.getBlockedResourceId();
|
||||
if (BlockType.DEGRADE == result.getBlockType()) {
|
||||
log.info("{} exceed {} degrade limit, trigger degrade", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
|
||||
|
||||
if (BlockType.CIRCUIT_BREAK == result.getBlockType()) {
|
||||
log.info("{} exceed {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
|
||||
|
||||
|
||||
String responseContentType = flowControlFilterProperties.getDegradeDefaultResponseContentType();
|
||||
String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent();
|
||||
|
||||
|
||||
CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(blockedResourceId);
|
||||
if (cb.responseContentType != null) {
|
||||
responseContentType = cb.responseContentType;
|
||||
responseContent = cb.responseContent;
|
||||
} else {
|
||||
cb = circuitBreakManager.getCircuitBreaker(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||
if (cb.responseContentType != null) {
|
||||
responseContentType = cb.responseContentType;
|
||||
responseContent = cb.responseContent;
|
||||
}
|
||||
}
|
||||
|
||||
ServerHttpResponse resp = exchange.getResponse();
|
||||
resp.setStatusCode(HttpStatus.OK);
|
||||
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, responseContentType);
|
||||
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(responseContent.getBytes())));
|
||||
|
||||
} else {
|
||||
if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
|
||||
log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
|
||||
} else {
|
||||
log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
|
||||
}
|
||||
|
||||
ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.NODE_RESOURCE);
|
||||
String rt = c.responseType, rc = c.responseContent;
|
||||
c = resourceRateLimitConfigService.getResourceRateLimitConfig(blockedResourceId);
|
||||
if (c != null) {
|
||||
if (StringUtils.isNotBlank(c.responseType)) {
|
||||
rt = c.responseType;
|
||||
}
|
||||
if (StringUtils.isNotBlank(c.responseContent)) {
|
||||
rc = c.responseContent;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
DegradeRule degradeRule = degradeRuleService.getDegradeRule(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||
if (degradeRule != null) {
|
||||
responseContentType = degradeRule.getResponseContentType();
|
||||
@@ -179,6 +228,7 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ServerHttpResponse resp = exchange.getResponse();
|
||||
resp.setStatusCode(HttpStatus.OK);
|
||||
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, rt);
|
||||
@@ -187,12 +237,21 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
} else {
|
||||
long start = System.currentTimeMillis();
|
||||
setTraceId(exchange);
|
||||
String finalService = service;
|
||||
String finalPath = path;
|
||||
return chain.filter(exchange).doFinally(s -> {
|
||||
long rt = System.currentTimeMillis() - start;
|
||||
CircuitBreaker cb = exchange.getAttribute(CircuitBreaker.DETECT_REQUEST);
|
||||
if (s == SignalType.ON_ERROR || exchange.getResponse().getStatusCode().is5xxServerError()) {
|
||||
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false);
|
||||
if (cb != null) {
|
||||
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat);
|
||||
}
|
||||
} else {
|
||||
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true);
|
||||
if (cb != null) {
|
||||
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeSlot, flowStat);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -337,7 +396,9 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
}
|
||||
}
|
||||
|
||||
if (checkDegradeRule) {
|
||||
|
||||
/*if (checkDegradeRule) {
|
||||
|
||||
DegradeRule degradeRule = degradeRuleService.getDegradeRule(resource);
|
||||
if (degradeRule != null && degradeRule.isEnable()) {
|
||||
if (rc == null) {
|
||||
@@ -353,6 +414,18 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}*/
|
||||
|
||||
if (checkDegradeRule) {
|
||||
CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(resource);
|
||||
if (cb != null) {
|
||||
if (cb.type == CircuitBreaker.Type.SERVICE_DEFAULT && !cb.serviceDefaultEnable) {
|
||||
} else {
|
||||
rc = new ResourceConfig(resource, 0, 0);
|
||||
resourceConfigs.add(rc);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -36,5 +36,9 @@ public enum BlockType {
|
||||
/**
|
||||
* Blocked by degrade
|
||||
*/
|
||||
DEGRADE
|
||||
|
||||
DEGRADE,
|
||||
|
||||
CIRCUIT_BREAK
|
||||
|
||||
}
|
||||
|
||||
@@ -33,7 +33,13 @@ import java.util.function.BiFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import we.Fizz;
|
||||
import we.stats.circuitbreaker.CircuitBreakManager;
|
||||
import we.stats.circuitbreaker.CircuitBreaker;
|
||||
import we.util.ResourceIdUtils;
|
||||
import we.util.Utils;
|
||||
import we.util.WebUtils;
|
||||
|
||||
/**
|
||||
* Flow Statistic
|
||||
@@ -65,10 +71,21 @@ public class FlowStat {
|
||||
|
||||
private ExecutorService pool = Executors.newFixedThreadPool(2);
|
||||
|
||||
private CircuitBreakManager circuitBreakManager;
|
||||
|
||||
public FlowStat() {
|
||||
runScheduleJob();
|
||||
}
|
||||
|
||||
public FlowStat(CircuitBreakManager circuitBreakManager) {
|
||||
this.circuitBreakManager = circuitBreakManager;
|
||||
runScheduleJob();
|
||||
}
|
||||
|
||||
public void setCircuitBreakManager(CircuitBreakManager circuitBreakManager) {
|
||||
this.circuitBreakManager = circuitBreakManager;
|
||||
}
|
||||
|
||||
private void runScheduleJob() {
|
||||
pool.submit(new HousekeepJob(this));
|
||||
pool.submit(new PeakConcurrentJob(this));
|
||||
@@ -191,6 +208,81 @@ public class FlowStat {
|
||||
}
|
||||
}
|
||||
|
||||
public IncrRequestResult incrRequest(ServerWebExchange exchange, List<ResourceConfig> resourceConfigs, long curTimeSlotId,
|
||||
BiFunction<ResourceConfig, List<ResourceConfig>, List<ResourceConfig>> totalBlockFunc) {
|
||||
if (resourceConfigs == null || resourceConfigs.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
w.lock();
|
||||
try {
|
||||
// check if exceed limit
|
||||
for (ResourceConfig resourceConfig : resourceConfigs) {
|
||||
long maxCon = resourceConfig.getMaxCon();
|
||||
long maxQPS = resourceConfig.getMaxQPS();
|
||||
if (maxCon > 0 || maxQPS > 0) {
|
||||
ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
|
||||
// check concurrent request
|
||||
if (maxCon > 0) {
|
||||
long n = resourceStat.getConcurrentRequests().get();
|
||||
if (n >= maxCon) {
|
||||
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
|
||||
if (totalBlockFunc != null) {
|
||||
List<ResourceConfig> parentResCfgs = totalBlockFunc.apply(resourceConfig,
|
||||
resourceConfigs);
|
||||
if (parentResCfgs != null && parentResCfgs.size() > 0) {
|
||||
for (ResourceConfig pResCfg : parentResCfgs) {
|
||||
getResourceStat(pResCfg.getResourceId())
|
||||
.incrTotalBlockRequestToTimeSlot(curTimeSlotId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return IncrRequestResult.block(resourceConfig.getResourceId(),
|
||||
BlockType.CONCURRENT_REQUEST);
|
||||
}
|
||||
}
|
||||
|
||||
// check QPS
|
||||
if (maxQPS > 0) {
|
||||
long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter().get();
|
||||
if (total >= maxQPS) {
|
||||
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
|
||||
if (totalBlockFunc != null) {
|
||||
List<ResourceConfig> parentResCfgs = totalBlockFunc.apply(resourceConfig,
|
||||
resourceConfigs);
|
||||
if (parentResCfgs != null && parentResCfgs.size() > 0) {
|
||||
for (ResourceConfig pResCfg : parentResCfgs) {
|
||||
getResourceStat(pResCfg.getResourceId())
|
||||
.incrTotalBlockRequestToTimeSlot(curTimeSlotId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return IncrRequestResult.block(resourceConfig.getResourceId(), BlockType.QPS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String service = WebUtils.getClientService(exchange);
|
||||
String path = WebUtils.getClientReqPath(exchange);
|
||||
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||
boolean permit = circuitBreakManager.permit(exchange, curTimeSlotId, this, resource);
|
||||
if (!permit) {
|
||||
return IncrRequestResult.block(resource, BlockType.CIRCUIT_BREAK);
|
||||
}
|
||||
|
||||
// increase request and concurrent request
|
||||
for (ResourceConfig resourceConfig : resourceConfigs) {
|
||||
ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
|
||||
long cons = resourceStat.getConcurrentRequests().incrementAndGet();
|
||||
resourceStat.getTimeSlot(curTimeSlotId).updatePeakConcurrentReqeusts(cons);
|
||||
resourceStat.getTimeSlot(curTimeSlotId).incr();
|
||||
}
|
||||
return IncrRequestResult.success();
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add request RT and Decrease concurrent request for given resources chain
|
||||
*
|
||||
@@ -210,6 +302,21 @@ public class FlowStat {
|
||||
}
|
||||
}
|
||||
|
||||
public void addRequestRT(ServerWebExchange exchange, List<ResourceConfig> resourceConfigs, long timeSlotId, long rt, boolean isSuccess) {
|
||||
if (resourceConfigs == null || resourceConfigs.size() == 0) {
|
||||
return;
|
||||
}
|
||||
for (int i = resourceConfigs.size() - 1; i >= 0; i--) {
|
||||
ResourceStat resourceStat = getResourceStat(resourceConfigs.get(i).getResourceId());
|
||||
resourceStat.decrConcurrentRequest(timeSlotId);
|
||||
resourceStat.addRequestRT(timeSlotId, rt, isSuccess);
|
||||
}
|
||||
|
||||
String service = WebUtils.getClientService(exchange);
|
||||
String path = WebUtils.getClientReqPath(exchange);
|
||||
circuitBreakManager.correctCircuitBreakerState4error(exchange, timeSlotId, this, service, path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase concurrent request counter of the specified resource
|
||||
*
|
||||
@@ -485,7 +592,14 @@ public class FlowStat {
|
||||
String resourceId = entry.getKey();
|
||||
// log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId,
|
||||
// curTimeSlotId);
|
||||
entry.getValue().getTimeSlot(curTimeSlotId);
|
||||
ResourceStat resourceStat = entry.getValue();
|
||||
resourceStat.getTimeSlot(curTimeSlotId);
|
||||
|
||||
String resource = resourceStat.getResourceId();
|
||||
CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(resource);
|
||||
if (cb != null) {
|
||||
cb.correctState(curTimeSlotId, stat);
|
||||
}
|
||||
}
|
||||
lastTimeSlotId = curTimeSlotId;
|
||||
// log.debug("PeakConcurrentJob done");
|
||||
|
||||
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import we.stats.circuitbreaker.CircuitBreaker;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -173,6 +174,34 @@ public class ResourceStat {
|
||||
}
|
||||
}
|
||||
|
||||
public void updateCircuitBreakState(long timeSlot, CircuitBreaker.State current, CircuitBreaker.State target) {
|
||||
getTimeSlot(timeSlot).getCircuitBreakState().compareAndSet(current, target);
|
||||
}
|
||||
|
||||
public void incrCircuitBreakNum(long timeSlot) {
|
||||
getTimeSlot(timeSlot).getCircuitBreakNum().incrementAndGet();
|
||||
}
|
||||
|
||||
public void decrCircuitBreakNum(long timeSlot) {
|
||||
getTimeSlot(timeSlot).getCircuitBreakNum().decrementAndGet();
|
||||
}
|
||||
|
||||
public void incrGradualResumeNum(long timeSlot) {
|
||||
getTimeSlot(timeSlot).getGradualResumeNum().incrementAndGet();
|
||||
}
|
||||
|
||||
public void decrGradualResumeNum(long timeSlot) {
|
||||
getTimeSlot(timeSlot).getGradualResumeNum().decrementAndGet();
|
||||
}
|
||||
|
||||
public void incrGradualRejectNum(long timeSlot) {
|
||||
getTimeSlot(timeSlot).getGradualRejectNum().incrementAndGet();
|
||||
}
|
||||
|
||||
public void decrGradualRejectNum(long timeSlot) {
|
||||
getTimeSlot(timeSlot).getGradualRejectNum().decrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add request RT to the specified time slot
|
||||
*
|
||||
|
||||
@@ -16,7 +16,11 @@
|
||||
*/
|
||||
package we.stats;
|
||||
|
||||
import we.stats.circuitbreaker.CircuitBreaker;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -76,6 +80,38 @@ public class TimeSlot {
|
||||
*/
|
||||
private AtomicLong totalBlockRequests = new AtomicLong(0);
|
||||
|
||||
|
||||
private AtomicReference<CircuitBreaker.State> circuitBreakState = new AtomicReference<>(CircuitBreaker.State.CLOSED);
|
||||
|
||||
private AtomicLong circuitBreakNum = new AtomicLong(0);
|
||||
|
||||
private AtomicLong gradualResumeNum = new AtomicLong(0);
|
||||
|
||||
private AtomicInteger resumeTrafficFactor = new AtomicInteger(1);
|
||||
|
||||
private AtomicLong gradualRejectNum = new AtomicLong(0);
|
||||
|
||||
public AtomicReference<CircuitBreaker.State> getCircuitBreakState() {
|
||||
return circuitBreakState;
|
||||
}
|
||||
|
||||
public AtomicLong getCircuitBreakNum() {
|
||||
return circuitBreakNum;
|
||||
}
|
||||
|
||||
public AtomicLong getGradualResumeNum() {
|
||||
return gradualResumeNum;
|
||||
}
|
||||
|
||||
public AtomicInteger getResumeTrafficFactor() {
|
||||
return resumeTrafficFactor;
|
||||
}
|
||||
|
||||
public AtomicLong getGradualRejectNum() {
|
||||
return gradualRejectNum;
|
||||
}
|
||||
|
||||
|
||||
public TimeSlot(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,227 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.stats.circuitbreaker;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.config.AggregateRedisConfig;
|
||||
import we.stats.FlowStat;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ResourceIdUtils;
|
||||
import we.util.Result;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
@Component
|
||||
public class CircuitBreakManager {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CircuitBreakManager.class);
|
||||
|
||||
private Map<String/*child resource*/, String/*parent resource*/> parentResourceMap = new HashMap<>(128);
|
||||
|
||||
private Map<String/*resource*/, CircuitBreaker> circuitBreakerMap = new HashMap<>(64);
|
||||
|
||||
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
|
||||
private ReactiveStringRedisTemplate rt;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
Result<?> result = initCircuitBreakers();
|
||||
if (result.code == Result.FAIL) {
|
||||
throw new RuntimeException(result.msg, result.t);
|
||||
}
|
||||
result = lsnCircuitBreakerChange();
|
||||
if (result.code == Result.FAIL) {
|
||||
throw new RuntimeException(result.msg, result.t);
|
||||
}
|
||||
log.info("init parentResourceMap: {}", parentResourceMap);
|
||||
// schedule();
|
||||
}
|
||||
|
||||
private Result<?> initCircuitBreakers() {
|
||||
Result<?> result = Result.succ();
|
||||
Flux<Map.Entry<Object, Object>> circuitBreakerConfigs = rt.opsForHash().entries("fizz_degrade_rule");
|
||||
circuitBreakerConfigs.collectList()
|
||||
.defaultIfEmpty(Collections.emptyList())
|
||||
.flatMap(
|
||||
es -> {
|
||||
if (!es.isEmpty()) {
|
||||
String json = null;
|
||||
try {
|
||||
for (Map.Entry<Object, Object> e : es) {
|
||||
json = (String) e.getValue();
|
||||
CircuitBreaker cb = JacksonUtils.readValue(json, CircuitBreaker.class);
|
||||
circuitBreakerMap.put(cb.resource, cb);
|
||||
updateParentResourceMap(cb);
|
||||
log.info("init circuit breaker {}", cb);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "init circuit breaker error, json: " + json;
|
||||
result.t = t;
|
||||
}
|
||||
} else {
|
||||
log.info("no circuit breaker config");
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
)
|
||||
.onErrorReturn(
|
||||
throwable -> {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "init circuit breaker error";
|
||||
result.t = throwable;
|
||||
return true;
|
||||
},
|
||||
result
|
||||
)
|
||||
.block();
|
||||
return result;
|
||||
}
|
||||
|
||||
private Result<?> lsnCircuitBreakerChange() {
|
||||
Result<?> result = Result.succ();
|
||||
String channel = "fizz_degrade_rule_channel";
|
||||
rt.listenToChannel(channel)
|
||||
.doOnError(
|
||||
t -> {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "lsn error, channel: " + channel;
|
||||
result.t = t;
|
||||
log.error("lsn channel {} error", channel, t);
|
||||
}
|
||||
)
|
||||
.doOnSubscribe(
|
||||
s -> {
|
||||
log.info("success to lsn on {}", channel);
|
||||
}
|
||||
)
|
||||
.doOnNext(
|
||||
msg -> {
|
||||
String message = msg.getMessage();
|
||||
try {
|
||||
CircuitBreaker cb = JacksonUtils.readValue(message, CircuitBreaker.class);
|
||||
if (cb.isDeleted) {
|
||||
circuitBreakerMap.remove(cb.resource);
|
||||
log.info("remove circuit breaker: {}", cb);
|
||||
} else {
|
||||
circuitBreakerMap.put(cb.resource, cb);
|
||||
log.info("update circuit breaker: {}", cb);
|
||||
}
|
||||
updateParentResourceMap(cb);
|
||||
log.info("update parentResourceMap: {}", parentResourceMap);
|
||||
} catch (Throwable t) {
|
||||
log.error("update circuit breaker error, {}", message, t);
|
||||
}
|
||||
}
|
||||
)
|
||||
.subscribe();
|
||||
return result;
|
||||
}
|
||||
|
||||
private void updateParentResourceMap(CircuitBreaker cb) {
|
||||
String parentResource = null;
|
||||
if (cb.isDeleted) {
|
||||
if (cb.type == CircuitBreaker.Type.PATH) {
|
||||
parentResourceMap.remove(cb.resource);
|
||||
parentResource = ResourceIdUtils.buildResourceId(null, null, null, cb.service, null);
|
||||
parentResourceMap.remove(parentResource);
|
||||
} else if (cb.type == CircuitBreaker.Type.SERVICE) {
|
||||
parentResourceMap.remove(cb.resource);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (cb.type == CircuitBreaker.Type.PATH) {
|
||||
parentResource = ResourceIdUtils.buildResourceId(null, null, null, cb.service, null);
|
||||
parentResourceMap.put(cb.resource, parentResource);
|
||||
}
|
||||
if (cb.type != CircuitBreaker.Type.SERVICE_DEFAULT) {
|
||||
if (parentResource == null) {
|
||||
parentResourceMap.put(cb.resource, ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||
} else {
|
||||
parentResourceMap.put(parentResource, ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String service, String path) {
|
||||
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||
return permit(exchange, currentTimeWindow, flowStat, resource);
|
||||
}
|
||||
|
||||
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String resource) {
|
||||
while (true) {
|
||||
CircuitBreaker cb = circuitBreakerMap.get(resource);
|
||||
if (cb != null) {
|
||||
if (cb.type != CircuitBreaker.Type.SERVICE_DEFAULT || cb.serviceDefaultEnable) {
|
||||
return cb.permit(exchange, currentTimeWindow, flowStat);
|
||||
}
|
||||
}
|
||||
resource = parentResourceMap.get(resource);
|
||||
if (resource == null) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void correctCircuitBreakerState4error(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String service, String path) {
|
||||
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||
correctCircuitBreakerState4error(exchange, currentTimeWindow, flowStat, resource);
|
||||
}
|
||||
|
||||
public void correctCircuitBreakerState4error(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String resource) {
|
||||
while (true) {
|
||||
CircuitBreaker cb = circuitBreakerMap.get(resource);
|
||||
if (cb != null) {
|
||||
if (cb.type != CircuitBreaker.Type.SERVICE_DEFAULT || cb.serviceDefaultEnable) {
|
||||
cb.correctCircuitBreakerStateAsError(currentTimeWindow, flowStat);
|
||||
return;
|
||||
}
|
||||
}
|
||||
resource = parentResourceMap.get(resource);
|
||||
if (resource == null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public CircuitBreaker getCircuitBreaker(String resource) {
|
||||
return circuitBreakerMap.get(resource);
|
||||
}
|
||||
|
||||
public Map<String, CircuitBreaker> getCircuitBreakerMap() {
|
||||
return circuitBreakerMap;
|
||||
}
|
||||
|
||||
public Map<String, String> getParentResourceMap() {
|
||||
return parentResourceMap;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,422 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.stats.circuitbreaker;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import we.stats.FlowStat;
|
||||
import we.stats.ResourceStat;
|
||||
import we.stats.TimeSlot;
|
||||
import we.stats.TimeWindowStat;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ResourceIdUtils;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* inaccuracy is acceptable
|
||||
*
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
public class CircuitBreaker {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CircuitBreaker.class);
|
||||
|
||||
public enum Type {
|
||||
SERVICE_DEFAULT, SERVICE, PATH
|
||||
}
|
||||
|
||||
// not use strategy pattern
|
||||
public enum BreakStrategy {
|
||||
TOTAL_ERRORS, ERRORS_RATIO
|
||||
}
|
||||
|
||||
public enum ResumeStrategy {
|
||||
IMMEDIATE, GRADUAL, DETECTIVE
|
||||
}
|
||||
|
||||
public enum State {
|
||||
CLOSED/* and monitoring*/, OPEN, RESUME_GRADUALLY, RESUME_DETECTIVE
|
||||
}
|
||||
|
||||
private static class GradualResumeTimeWindowContext {
|
||||
private final int resumeTraffic;
|
||||
private final int rejectTraffic;
|
||||
|
||||
public GradualResumeTimeWindowContext(int resumeTraffic) {
|
||||
this.resumeTraffic = resumeTraffic;
|
||||
rejectTraffic = 100 - this.resumeTraffic;
|
||||
}
|
||||
|
||||
public boolean permit(ResourceStat resourceStat, long currentTimeWindow) {
|
||||
|
||||
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
|
||||
AtomicLong resumeCount = timeSlot.getGradualResumeNum();
|
||||
AtomicInteger resumeTrafficFactor = timeSlot.getResumeTrafficFactor();
|
||||
long n = resumeTrafficFactor.get();
|
||||
if (resumeCount.incrementAndGet() <= resumeTraffic * n) {
|
||||
LOGGER.debug("{} current time window {}, resume traffic {}, resume traffic factor {}, resume count {}, resume current request",
|
||||
resourceStat.getResourceId(), currentTimeWindow, resumeTraffic, n, resumeCount.get());
|
||||
return true;
|
||||
}
|
||||
AtomicLong rejectCount = timeSlot.getGradualRejectNum();
|
||||
if (rejectCount.incrementAndGet() <= rejectTraffic * n) {
|
||||
resumeCount.decrementAndGet();
|
||||
LOGGER.debug("{} current time window {}, reject traffic {}, resume traffic factor {}, reject count {}, reject current request",
|
||||
resourceStat.getResourceId(), currentTimeWindow, rejectTraffic, n, rejectCount.get());
|
||||
return false;
|
||||
}
|
||||
rejectCount.decrementAndGet();
|
||||
resumeTrafficFactor.incrementAndGet();
|
||||
LOGGER.debug("{} current time window {}, resume traffic {}, reject traffic {}, resume traffic factor {}, resume count {}, reject count {}, resume current request",
|
||||
resourceStat.getResourceId(), currentTimeWindow, resumeTraffic, rejectTraffic, n, resumeCount.get(), rejectCount.get());
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "GRTWC{resumeTraffic=" + resumeTraffic + ",rejectTraffic=" + rejectTraffic + '}';
|
||||
}
|
||||
}
|
||||
|
||||
public static final String DETECT_REQUEST = "detectReq@";
|
||||
|
||||
|
||||
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
|
||||
public boolean isDeleted = false;
|
||||
|
||||
public Type type;
|
||||
|
||||
public boolean serviceDefaultEnable = false;
|
||||
|
||||
public long id;
|
||||
|
||||
public String service;
|
||||
|
||||
public String path;
|
||||
|
||||
public String resource;
|
||||
|
||||
|
||||
public BreakStrategy breakStrategy;
|
||||
|
||||
public float errorRatioThreshold;
|
||||
|
||||
public int totalErrorThreshold;
|
||||
|
||||
public int monitorDuration;
|
||||
|
||||
public int minRequests;
|
||||
|
||||
public int breakDuration;
|
||||
|
||||
|
||||
public ResumeStrategy resumeStrategy;
|
||||
|
||||
public int resumeDuration;
|
||||
|
||||
private List<GradualResumeTimeWindowContext> gradualResumeTimeWindowContexts;
|
||||
|
||||
public int initialResumeTraffic;
|
||||
|
||||
|
||||
public String responseContentType;
|
||||
|
||||
public String responseContent;
|
||||
|
||||
public final AtomicReference<State> stateRef = new AtomicReference<>(State.CLOSED);
|
||||
|
||||
public long stateStartTime;
|
||||
|
||||
public CircuitBreaker() {
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public CircuitBreaker(
|
||||
@JsonProperty("isDeleted") int isDeleted,
|
||||
@JsonProperty("type") int type,
|
||||
@JsonProperty("enable") int enable,
|
||||
@JsonProperty("id") long id,
|
||||
@JsonProperty("service") String service,
|
||||
@JsonProperty("path") String path,
|
||||
@JsonProperty("strategy") int strategy,
|
||||
@JsonProperty("ratioThreshold") float ratioThreshold,
|
||||
@JsonProperty("exceptionCount") int exceptionCount,
|
||||
@JsonProperty("minRequestCount") int minRequestCount,
|
||||
@JsonProperty("timeWindow") int timeWindow,
|
||||
@JsonProperty("statInterval") int statInterval,
|
||||
@JsonProperty("recoveryStrategy") int recoveryStrategy,
|
||||
@JsonProperty("recoveryTimeWindow") int recoveryTimeWindow,
|
||||
@JsonProperty("responseContentType") String responseContentType,
|
||||
@JsonProperty("responseContent") String responseContent) {
|
||||
|
||||
if (isDeleted == 1) {
|
||||
this.isDeleted = true;
|
||||
}
|
||||
|
||||
if (type == 1) {
|
||||
this.type = Type.SERVICE_DEFAULT;
|
||||
this.service = ResourceIdUtils.SERVICE_DEFAULT;
|
||||
if (enable == 1) {
|
||||
this.serviceDefaultEnable = true;
|
||||
}
|
||||
} else if (type == 2) {
|
||||
this.type = Type.SERVICE;
|
||||
} else {
|
||||
this.type = Type.PATH;
|
||||
}
|
||||
|
||||
this.id = id;
|
||||
if (this.type != Type.SERVICE_DEFAULT) {
|
||||
this.service = service;
|
||||
}
|
||||
if (StringUtils.isNotBlank(path)) {
|
||||
this.path = path;
|
||||
}
|
||||
resource = ResourceIdUtils.buildResourceId(null, null, null, this.service, this.path);
|
||||
|
||||
if (strategy == 1) {
|
||||
breakStrategy = BreakStrategy.ERRORS_RATIO;
|
||||
errorRatioThreshold = ratioThreshold;
|
||||
} else {
|
||||
breakStrategy = BreakStrategy.TOTAL_ERRORS;
|
||||
totalErrorThreshold = exceptionCount;
|
||||
}
|
||||
minRequests = minRequestCount;
|
||||
breakDuration = timeWindow;
|
||||
monitorDuration = statInterval;
|
||||
|
||||
if (recoveryStrategy == 1) {
|
||||
resumeStrategy = ResumeStrategy.DETECTIVE;
|
||||
} else if (recoveryStrategy == 2) {
|
||||
resumeStrategy = ResumeStrategy.GRADUAL;
|
||||
resumeDuration = recoveryTimeWindow;
|
||||
initGradualResumeTimeWindowContext();
|
||||
} else {
|
||||
resumeStrategy = ResumeStrategy.IMMEDIATE;
|
||||
}
|
||||
|
||||
this.responseContentType = responseContentType;
|
||||
this.responseContent = responseContent;
|
||||
|
||||
stateStartTime = currentTimeWindow();
|
||||
}
|
||||
|
||||
private static long currentTimeWindow() {
|
||||
return timeWindow(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
private static long timeWindow(long timeMills) {
|
||||
return timeMills / 1000 * 1000;
|
||||
}
|
||||
|
||||
private void initGradualResumeTimeWindowContext() {
|
||||
BigDecimal totalTraffic = new BigDecimal(100);
|
||||
BigDecimal duration = new BigDecimal(resumeDuration);
|
||||
initialResumeTraffic = totalTraffic.divide(duration, 0, RoundingMode.HALF_UP).intValue();
|
||||
if (initialResumeTraffic == 0) {
|
||||
initialResumeTraffic = 1;
|
||||
}
|
||||
|
||||
gradualResumeTimeWindowContexts = new ArrayList<>(resumeDuration);
|
||||
for (int i = 1; i <= resumeDuration; i++) {
|
||||
int resumeTraffic = initialResumeTraffic * i;
|
||||
GradualResumeTimeWindowContext ctx = new GradualResumeTimeWindowContext(resumeTraffic);
|
||||
gradualResumeTimeWindowContexts.add(ctx);
|
||||
}
|
||||
LOGGER.info("{} gradualResumeTimeWindowContexts: {}", resource, gradualResumeTimeWindowContexts);
|
||||
}
|
||||
|
||||
private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) {
|
||||
long nThSecond = getStateDuration(currentTimeWindow);
|
||||
GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond);
|
||||
ResourceStat resourceStat = flowStat.getResourceStat(resource);
|
||||
return ctx.permit(resourceStat, currentTimeWindow);
|
||||
}
|
||||
|
||||
private long getStateDuration(long currentTimeWindow) {
|
||||
return (currentTimeWindow - stateStartTime) / 1000 + 1;
|
||||
}
|
||||
|
||||
public void correctState(long currentTimeWindow, FlowStat flowStat) {
|
||||
State s = stateRef.get();
|
||||
long stateDuration = getStateDuration(currentTimeWindow);
|
||||
|
||||
if (s == State.CLOSED && stateDuration > monitorDuration) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than monitor duration {}, correct to CLOSED state",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), monitorDuration);
|
||||
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
||||
|
||||
} else if (s == State.OPEN && stateDuration > breakDuration) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
||||
|
||||
} else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), resumeDuration);
|
||||
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
||||
}
|
||||
}
|
||||
|
||||
public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) {
|
||||
if (stateRef.get() == State.CLOSED) {
|
||||
long endTimeWindow = currentTimeWindow + 1000;
|
||||
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
||||
long reqCount = timeWindowStat.getCompReqs();
|
||||
long errCount = timeWindowStat.getErrors();
|
||||
|
||||
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount > minRequests && errCount > totalErrorThreshold) {
|
||||
LOGGER.debug("{} current time window {} request count {} > min requests {} error count {} > total error threshold {}, correct to OPEN state as error",
|
||||
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||
} else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount > minRequests) {
|
||||
BigDecimal errors = new BigDecimal(errCount);
|
||||
BigDecimal requests = new BigDecimal(reqCount);
|
||||
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
||||
if (p - errorRatioThreshold > 0) {
|
||||
LOGGER.debug("{} current time window {} request count {} > min requests {} error ratio {} > error ratio threshold {}, correct to OPEN state as error",
|
||||
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) {
|
||||
if (stateRef.compareAndSet(current, target)) {
|
||||
stateStartTime = currentTimeWindow;
|
||||
ResourceStat resourceStat = flowStat.getResourceStat(resource);
|
||||
AtomicLong circuitBreakNum = resourceStat.getTimeSlot(currentTimeWindow).getCircuitBreakNum();
|
||||
circuitBreakNum.set(0);
|
||||
resourceStat.updateCircuitBreakState(currentTimeWindow, current, target);
|
||||
LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
|
||||
correctState(currentTimeWindow, flowStat);
|
||||
if (stateRef.get() == State.CLOSED) {
|
||||
return permitCallInClosedState(currentTimeWindow, flowStat);
|
||||
}
|
||||
if (stateRef.get() == State.OPEN) {
|
||||
return permitCallInOpenState(exchange, currentTimeWindow, flowStat);
|
||||
}
|
||||
if (stateRef.get() == State.RESUME_DETECTIVE) {
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
return false;
|
||||
}
|
||||
if (stateRef.get() == State.RESUME_GRADUALLY) {
|
||||
return permitCallInResumeGraduallyState(currentTimeWindow, flowStat);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
|
||||
|
||||
long endTimeWindow = currentTimeWindow + 1000;
|
||||
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
||||
long reqCount = timeWindowStat.getCompReqs();
|
||||
long errCount = timeWindowStat.getErrors();
|
||||
|
||||
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount > minRequests && errCount > totalErrorThreshold) {
|
||||
LOGGER.debug("{} current time window {} request count {} > min requests {} error count {} > total error threshold {}",
|
||||
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
return false;
|
||||
}
|
||||
if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount > minRequests) {
|
||||
BigDecimal errors = new BigDecimal(errCount);
|
||||
BigDecimal requests = new BigDecimal(reqCount);
|
||||
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
||||
if (p - errorRatioThreshold > 0) {
|
||||
LOGGER.debug("{} current time window {} request count {} > min requests {} error ratio {} > error ratio threshold {}",
|
||||
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.debug("{} current time window {} in {} which start at {}, permit current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean permitCallInOpenState(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
|
||||
long stateDuration = getStateDuration(currentTimeWindow);
|
||||
if (stateDuration > breakDuration) {
|
||||
if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume immediately",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
transit(State.OPEN, State.CLOSED, currentTimeWindow, flowStat);
|
||||
return true;
|
||||
}
|
||||
if (resumeStrategy == ResumeStrategy.DETECTIVE) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
if (transit(State.OPEN, State.RESUME_DETECTIVE, currentTimeWindow, flowStat)) {
|
||||
exchange.getAttributes().put(DETECT_REQUEST, this);
|
||||
return true;
|
||||
}
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
return false;
|
||||
}
|
||||
if (resumeStrategy == ResumeStrategy.GRADUAL) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
transit(State.OPEN, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
|
||||
return isResumeTraffic(currentTimeWindow, flowStat);
|
||||
}
|
||||
}
|
||||
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean permitCallInResumeGraduallyState(long currentTimeWindow, FlowStat flowStat) {
|
||||
long stateDuration = getStateDuration(currentTimeWindow);
|
||||
if (stateDuration > resumeDuration) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, resume immediately",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), resumeDuration);
|
||||
transit(State.RESUME_GRADUALLY, State.CLOSED, currentTimeWindow, flowStat);
|
||||
return true;
|
||||
}
|
||||
return isResumeTraffic(currentTimeWindow, flowStat);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JacksonUtils.writeValueAsString(this);
|
||||
}
|
||||
}
|
||||
@@ -1,168 +1,179 @@
|
||||
/*
|
||||
* Copyright (C) 2021 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package we.stats.degrade;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.config.AggregateRedisConfig;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.Result;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Degrade rule service
|
||||
*
|
||||
* @author zhongjie
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class DegradeRuleService {
|
||||
|
||||
/**
|
||||
* Redis degrade rule change channel
|
||||
*/
|
||||
private static final String DEGRADE_RULE_CHANNEL = "fizz_degrade_rule_channel";
|
||||
/**
|
||||
* redis degrade rule info hash key
|
||||
*/
|
||||
private static final String DEGRADE_RULE_HASH_KEY = "fizz_degrade_rule";
|
||||
|
||||
|
||||
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
|
||||
private ReactiveStringRedisTemplate rt;
|
||||
|
||||
private Map<String, DegradeRule> resourceId2DegradeRuleMap = new ConcurrentHashMap<>(32);
|
||||
private Map<Long, DegradeRule> id2DegradeRuleMap = new ConcurrentHashMap<>(32);
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
Result<?> result = initDegradeRule();
|
||||
if (result.code == Result.FAIL) {
|
||||
throw new RuntimeException(result.msg, result.t);
|
||||
}
|
||||
result = lsnDegradeRuleChange();
|
||||
if (result.code == Result.FAIL) {
|
||||
throw new RuntimeException(result.msg, result.t);
|
||||
}
|
||||
}
|
||||
|
||||
public DegradeRule getDegradeRule(String resourceId) {
|
||||
return resourceId2DegradeRuleMap.get(resourceId);
|
||||
}
|
||||
|
||||
public void refreshLocalCache() throws Throwable {
|
||||
this.initDegradeRule();
|
||||
}
|
||||
|
||||
private Result<?> initDegradeRule() {
|
||||
Result<?> result = Result.succ();
|
||||
|
||||
Map<String, DegradeRule> resourceId2DegradeRuleMapTmp = new ConcurrentHashMap<>(32);
|
||||
Map<Long, DegradeRule> id2DegradeRuleMapTmp = new ConcurrentHashMap<>(32);
|
||||
|
||||
Flux<Map.Entry<Object, Object>> degradeRuleEntries = rt.opsForHash().entries(DEGRADE_RULE_HASH_KEY);
|
||||
degradeRuleEntries.collectList()
|
||||
.defaultIfEmpty(Collections.emptyList())
|
||||
.flatMap(es -> {
|
||||
if (!es.isEmpty()) {
|
||||
String json = null;
|
||||
try {
|
||||
for (Map.Entry<Object, Object> e : es) {
|
||||
json = (String) e.getValue();
|
||||
DegradeRule degradeRule = JacksonUtils.readValue(json, DegradeRule.class);
|
||||
resourceId2DegradeRuleMapTmp.put(degradeRule.getResourceId(), degradeRule);
|
||||
id2DegradeRuleMapTmp.put(degradeRule.getId(), degradeRule);
|
||||
log.info("init degrade rule: {}", json);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "init degrade rule error, json: " + json;
|
||||
result.t = t;
|
||||
}
|
||||
} else {
|
||||
log.info("no degrade rule");
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
)
|
||||
.onErrorReturn(
|
||||
throwable -> {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "init degrade rule error";
|
||||
result.t = throwable;
|
||||
return true;
|
||||
},
|
||||
result
|
||||
)
|
||||
.block();
|
||||
resourceId2DegradeRuleMap = resourceId2DegradeRuleMapTmp;
|
||||
id2DegradeRuleMap = id2DegradeRuleMapTmp;
|
||||
return result;
|
||||
}
|
||||
|
||||
private Result<?> lsnDegradeRuleChange() {
|
||||
Result<?> result = Result.succ();
|
||||
rt.listenToChannel(DEGRADE_RULE_CHANNEL)
|
||||
.doOnError(
|
||||
t -> {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "lsn error, channel: " + DEGRADE_RULE_CHANNEL;
|
||||
result.t = t;
|
||||
log.error("lsn channel {} error", DEGRADE_RULE_CHANNEL, t);
|
||||
}
|
||||
)
|
||||
.doOnSubscribe(
|
||||
s -> log.info("success to lsn on {}", DEGRADE_RULE_CHANNEL)
|
||||
)
|
||||
.doOnNext(
|
||||
msg -> {
|
||||
String message = msg.getMessage();
|
||||
try {
|
||||
DegradeRule degradeRule = JacksonUtils.readValue(message, DegradeRule.class);
|
||||
if (degradeRule.isDeleted()) {
|
||||
DegradeRule remove = id2DegradeRuleMap.remove(degradeRule.getId());
|
||||
if (remove != null) {
|
||||
resourceId2DegradeRuleMap.remove(remove.getResourceId());
|
||||
}
|
||||
log.info("remove degrade rule {}", message);
|
||||
} else {
|
||||
DegradeRule previous = id2DegradeRuleMap.put(degradeRule.getId(), degradeRule);
|
||||
if (previous != null) {
|
||||
if (!previous.getResourceId().equals(degradeRule.getResourceId())) {
|
||||
resourceId2DegradeRuleMap.remove(previous.getResourceId());
|
||||
}
|
||||
}
|
||||
resourceId2DegradeRuleMap.put(degradeRule.getResourceId(), degradeRule);
|
||||
log.info("update degrade rule {}", message);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.error("update degrade rule error, {}", message, t);
|
||||
}
|
||||
}
|
||||
)
|
||||
.subscribe();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Copyright (C) 2021 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package we.stats.degrade;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.config.AggregateRedisConfig;
|
||||
|
||||
import we.stats.circuitbreaker.CircuitBreakManager;
|
||||
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.Result;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Degrade rule service
|
||||
*
|
||||
* @author zhongjie
|
||||
*/
|
||||
|
||||
//@Service
|
||||
//@Slf4j
|
||||
public class DegradeRuleService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DegradeRuleService.class);
|
||||
|
||||
|
||||
/**
|
||||
* Redis degrade rule change channel
|
||||
*/
|
||||
private static final String DEGRADE_RULE_CHANNEL = "fizz_degrade_rule_channel";
|
||||
/**
|
||||
* redis degrade rule info hash key
|
||||
*/
|
||||
private static final String DEGRADE_RULE_HASH_KEY = "fizz_degrade_rule";
|
||||
|
||||
|
||||
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
|
||||
private ReactiveStringRedisTemplate rt;
|
||||
|
||||
private Map<String, DegradeRule> resourceId2DegradeRuleMap = new ConcurrentHashMap<>(32);
|
||||
private Map<Long, DegradeRule> id2DegradeRuleMap = new ConcurrentHashMap<>(32);
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
Result<?> result = initDegradeRule();
|
||||
if (result.code == Result.FAIL) {
|
||||
throw new RuntimeException(result.msg, result.t);
|
||||
}
|
||||
result = lsnDegradeRuleChange();
|
||||
if (result.code == Result.FAIL) {
|
||||
throw new RuntimeException(result.msg, result.t);
|
||||
}
|
||||
}
|
||||
|
||||
public DegradeRule getDegradeRule(String resourceId) {
|
||||
return resourceId2DegradeRuleMap.get(resourceId);
|
||||
}
|
||||
|
||||
public void refreshLocalCache() throws Throwable {
|
||||
this.initDegradeRule();
|
||||
}
|
||||
|
||||
private Result<?> initDegradeRule() {
|
||||
Result<?> result = Result.succ();
|
||||
|
||||
Map<String, DegradeRule> resourceId2DegradeRuleMapTmp = new ConcurrentHashMap<>(32);
|
||||
Map<Long, DegradeRule> id2DegradeRuleMapTmp = new ConcurrentHashMap<>(32);
|
||||
|
||||
Flux<Map.Entry<Object, Object>> degradeRuleEntries = rt.opsForHash().entries(DEGRADE_RULE_HASH_KEY);
|
||||
degradeRuleEntries.collectList()
|
||||
.defaultIfEmpty(Collections.emptyList())
|
||||
.flatMap(es -> {
|
||||
if (!es.isEmpty()) {
|
||||
String json = null;
|
||||
try {
|
||||
for (Map.Entry<Object, Object> e : es) {
|
||||
json = (String) e.getValue();
|
||||
DegradeRule degradeRule = JacksonUtils.readValue(json, DegradeRule.class);
|
||||
resourceId2DegradeRuleMapTmp.put(degradeRule.getResourceId(), degradeRule);
|
||||
id2DegradeRuleMapTmp.put(degradeRule.getId(), degradeRule);
|
||||
log.info("init degrade rule: {}", json);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "init degrade rule error, json: " + json;
|
||||
result.t = t;
|
||||
}
|
||||
} else {
|
||||
log.info("no degrade rule");
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
)
|
||||
.onErrorReturn(
|
||||
throwable -> {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "init degrade rule error";
|
||||
result.t = throwable;
|
||||
return true;
|
||||
},
|
||||
result
|
||||
)
|
||||
.block();
|
||||
resourceId2DegradeRuleMap = resourceId2DegradeRuleMapTmp;
|
||||
id2DegradeRuleMap = id2DegradeRuleMapTmp;
|
||||
return result;
|
||||
}
|
||||
|
||||
private Result<?> lsnDegradeRuleChange() {
|
||||
Result<?> result = Result.succ();
|
||||
rt.listenToChannel(DEGRADE_RULE_CHANNEL)
|
||||
.doOnError(
|
||||
t -> {
|
||||
result.code = Result.FAIL;
|
||||
result.msg = "lsn error, channel: " + DEGRADE_RULE_CHANNEL;
|
||||
result.t = t;
|
||||
log.error("lsn channel {} error", DEGRADE_RULE_CHANNEL, t);
|
||||
}
|
||||
)
|
||||
.doOnSubscribe(
|
||||
s -> log.info("success to lsn on {}", DEGRADE_RULE_CHANNEL)
|
||||
)
|
||||
.doOnNext(
|
||||
msg -> {
|
||||
String message = msg.getMessage();
|
||||
try {
|
||||
DegradeRule degradeRule = JacksonUtils.readValue(message, DegradeRule.class);
|
||||
if (degradeRule.isDeleted()) {
|
||||
DegradeRule remove = id2DegradeRuleMap.remove(degradeRule.getId());
|
||||
if (remove != null) {
|
||||
resourceId2DegradeRuleMap.remove(remove.getResourceId());
|
||||
}
|
||||
log.info("remove degrade rule {}", message);
|
||||
} else {
|
||||
DegradeRule previous = id2DegradeRuleMap.put(degradeRule.getId(), degradeRule);
|
||||
if (previous != null) {
|
||||
if (!previous.getResourceId().equals(degradeRule.getResourceId())) {
|
||||
resourceId2DegradeRuleMap.remove(previous.getResourceId());
|
||||
}
|
||||
}
|
||||
resourceId2DegradeRuleMap.put(degradeRule.getResourceId(), degradeRule);
|
||||
log.info("update degrade rule {}", message);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.error("update degrade rule error, {}", message, t);
|
||||
}
|
||||
}
|
||||
)
|
||||
.subscribe();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -630,7 +630,7 @@ public abstract class WebUtils {
|
||||
if (v != null) {
|
||||
b.append(Consts.S.EQUAL);
|
||||
if (!Consts.S.EMPTY.equals(v)) {
|
||||
if (StringUtils.indexOfAny(v, Consts.S.LEFT_BRACE, Consts.S.FORWARD_SLASH, Consts.S.HASH) > 0) {
|
||||
if (StringUtils.indexOfAny(v, Consts.S.LEFT_BRACE, Consts.S.FORWARD_SLASH, Consts.S.HASH, Consts.S.EQUAL) > 0) {
|
||||
b.append(URLEncoder.encode(v, Consts.C.UTF8));
|
||||
} else {
|
||||
b.append(v);
|
||||
|
||||
@@ -241,7 +241,7 @@ public class FlowStatTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
public void testPeakConcurrentJob() throws Throwable {
|
||||
long curTimeSlotId = stat.currentTimeSlotId();
|
||||
long nextSlotId = curTimeSlotId + 1000;
|
||||
|
||||
@@ -0,0 +1,106 @@
|
||||
package we.stats.circuitbreaker;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
|
||||
import org.springframework.mock.web.server.MockServerWebExchange;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
|
||||
import we.Fizz;
|
||||
import we.redis.RedisProperties;
|
||||
import we.redis.RedisServerConfiguration;
|
||||
import we.redis.RedisTemplateConfiguration;
|
||||
import we.stats.FlowStat;
|
||||
import we.stats.ResourceStat;
|
||||
import we.stats.TimeSlot;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ReflectionUtils;
|
||||
import we.util.ResourceIdUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@TestPropertySource("/application.properties")
|
||||
@SpringJUnitConfig(classes = {RedisProperties.class, RedisTemplateConfiguration.class, RedisServerConfiguration.class})
|
||||
public class CircuitBreakManagerTests {
|
||||
|
||||
@Resource
|
||||
StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
ReactiveStringRedisTemplate reactiveStringRedisTemplate;
|
||||
|
||||
CircuitBreakManager circuitBreakManager;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws NoSuchFieldException {
|
||||
circuitBreakManager = new CircuitBreakManager();
|
||||
ReflectionUtils.set(circuitBreakManager, "rt", reactiveStringRedisTemplate);
|
||||
}
|
||||
|
||||
@Test
|
||||
void constructTest() throws JsonProcessingException {
|
||||
String json = "{\"id\":123456789012345,\"type\":1,\"service\":\"xservice\",\"path\":\"/ypath\",\"strategy\":1,\"ratioThreshold\":0.1,\"exceptionCount\":10,\"minRequestCount\":20,\"timeWindow\":5,\"statInterval\":5,\"recoveryStrategy\":2,\"recoveryTimeWindow\":5,\"responseContentType\":\"application/json\",\"responseContent\":\"error\",\"enable\":1,\"isDeleted\":0}";
|
||||
CircuitBreaker cb = JacksonUtils.readValue(json, CircuitBreaker.class);
|
||||
System.err.println("CircuitBreaker: " + cb);
|
||||
}
|
||||
|
||||
@Test
|
||||
void initTest() throws Throwable {
|
||||
|
||||
Fizz.context = new GenericApplicationContext();
|
||||
Fizz.context.refresh();
|
||||
|
||||
Map<String, String> circuitBreakerMap = new HashMap<>();
|
||||
circuitBreakerMap.put("123456789012345", "{\"id\":123456789012345,\"type\":3,\"service\":\"xservice\",\"path\":\"/ypath\",\"strategy\":2,\"exceptionCount\":10,\"minRequestCount\":20,\"timeWindow\":5,\"statInterval\":5,\"recoveryStrategy\":3,\"responseContentType\":\"application/json\",\"responseContent\":\"error\",\"enable\":1,\"isDeleted\":0}");
|
||||
circuitBreakerMap.put("123456789012346", "{\"id\":123456789012346,\"type\":1,\"service\":\"service_default\",\"strategy\":2,\"exceptionCount\":20,\"minRequestCount\":40,\"timeWindow\":5,\"statInterval\":5,\"recoveryStrategy\":3,\"responseContentType\":\"application/json\",\"responseContent\":\"error\",\"enable\":1,\"isDeleted\":0}");
|
||||
stringRedisTemplate.opsForHash().putAll("fizz_degrade_rule", circuitBreakerMap);
|
||||
|
||||
circuitBreakManager.init();
|
||||
}
|
||||
|
||||
@Test
|
||||
void permitTest() {
|
||||
FlowStat flowStat = new FlowStat(circuitBreakManager);
|
||||
long currentTimeWindow = flowStat.currentTimeSlotId();
|
||||
|
||||
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||
|
||||
String service = "xservice";
|
||||
String path = "ypath";
|
||||
|
||||
CircuitBreaker cb = new CircuitBreaker();
|
||||
cb.service = service;
|
||||
cb.path = path;
|
||||
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
|
||||
cb.monitorDuration = 5;
|
||||
cb.minRequests = 100;
|
||||
cb.totalErrorThreshold = 10;
|
||||
cb.breakDuration = 5;
|
||||
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.IMMEDIATE;
|
||||
cb.stateStartTime = currentTimeWindow;
|
||||
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getCircuitBreakerMap();
|
||||
circuitBreakerMap.put(cb.resource, cb);
|
||||
|
||||
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
|
||||
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
|
||||
timeSlot.getCompReqs().set(200);
|
||||
timeSlot.getErrors().set(11);
|
||||
|
||||
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
Assertions.assertFalse(permit);
|
||||
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
|
||||
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
Assertions.assertFalse(permit);
|
||||
Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get());
|
||||
Assertions.assertEquals(2, timeSlot.getCircuitBreakNum().get());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user