Fix multi eureka and circuit breaker bug
This commit is contained in:
@@ -34,7 +34,7 @@
|
|||||||
<commons-codec.version>1.15</commons-codec.version>
|
<commons-codec.version>1.15</commons-codec.version>
|
||||||
<commons-pool2.version>2.11.1</commons-pool2.version>
|
<commons-pool2.version>2.11.1</commons-pool2.version>
|
||||||
<gson.version>2.8.9</gson.version>
|
<gson.version>2.8.9</gson.version>
|
||||||
<netty-tcnative.version>2.0.47.Final</netty-tcnative.version>
|
<netty-tcnative.version>2.0.48.Final</netty-tcnative.version>
|
||||||
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
|
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
|
||||||
<resilience4j.version>1.7.1</resilience4j.version>
|
<resilience4j.version>1.7.1</resilience4j.version>
|
||||||
<snakeyaml.version>1.30</snakeyaml.version>
|
<snakeyaml.version>1.30</snakeyaml.version>
|
||||||
|
|||||||
@@ -119,6 +119,8 @@ public class FlowControlFilter extends FizzWebFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setTraceId(exchange);
|
||||||
|
|
||||||
if (flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq && !fizzApiReq) {
|
if (flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq && !fizzApiReq) {
|
||||||
String traceId = WebUtils.getTraceId(exchange);
|
String traceId = WebUtils.getTraceId(exchange);
|
||||||
LogService.setBizId(traceId);
|
LogService.setBizId(traceId);
|
||||||
@@ -145,6 +147,10 @@ public class FlowControlFilter extends FizzWebFilter {
|
|||||||
String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent();
|
String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent();
|
||||||
|
|
||||||
CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(blockedResourceId);
|
CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(blockedResourceId);
|
||||||
|
if (cb == null) {
|
||||||
|
cb = circuitBreakManager.getCircuitBreaker(ResourceIdUtils.buildResourceId(null, null, null, service, null));
|
||||||
|
}
|
||||||
|
|
||||||
if (cb.responseContentType != null) {
|
if (cb.responseContentType != null) {
|
||||||
responseContentType = cb.responseContentType;
|
responseContentType = cb.responseContentType;
|
||||||
responseContent = cb.responseContent;
|
responseContent = cb.responseContent;
|
||||||
@@ -208,7 +214,7 @@ public class FlowControlFilter extends FizzWebFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setTraceId(exchange);
|
// setTraceId(exchange);
|
||||||
return chain.filter(exchange);
|
return chain.filter(exchange);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -315,6 +321,7 @@ public class FlowControlFilter extends FizzWebFilter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void checkRateLimitConfigAndAddTo(List<ResourceConfig> resourceConfigs, String resource, String defaultRateLimitConfigId, boolean checkDegradeRule) {
|
private void checkRateLimitConfigAndAddTo(List<ResourceConfig> resourceConfigs, String resource, String defaultRateLimitConfigId, boolean checkDegradeRule) {
|
||||||
|
int prevSize = resourceConfigs.size();
|
||||||
ResourceConfig rc = null;
|
ResourceConfig rc = null;
|
||||||
ResourceRateLimitConfig rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
|
ResourceRateLimitConfig rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
|
||||||
if (rateLimitConfig != null && rateLimitConfig.isEnable()) {
|
if (rateLimitConfig != null && rateLimitConfig.isEnable()) {
|
||||||
@@ -365,17 +372,22 @@ public class FlowControlFilter extends FizzWebFilter {
|
|||||||
}
|
}
|
||||||
}*/
|
}*/
|
||||||
|
|
||||||
if (checkDegradeRule) {
|
if (checkDegradeRule && resourceConfigs.size() == prevSize) {
|
||||||
CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(resource);
|
CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(resource);
|
||||||
|
if (cb == null) {
|
||||||
|
if (defaultRateLimitConfigId != null && defaultRateLimitConfigId.equals(ResourceIdUtils.SERVICE_DEFAULT)) {
|
||||||
|
cb = circuitBreakManager.getCircuitBreaker(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||||
|
if (cb == null || !cb.serviceDefaultEnable) {
|
||||||
|
cb = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if (cb != null) {
|
if (cb != null) {
|
||||||
if (cb.type == CircuitBreaker.Type.SERVICE_DEFAULT && !cb.serviceDefaultEnable) {
|
|
||||||
} else {
|
|
||||||
rc = new ResourceConfig(resource, 0, 0);
|
rc = new ResourceConfig(resource, 0, 0);
|
||||||
resourceConfigs.add(rc);
|
resourceConfigs.add(rc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private void something4appAndIp(List<ResourceConfig> resourceConfigs, ResourceRateLimitConfig rateLimitConfig) {
|
private void something4appAndIp(List<ResourceConfig> resourceConfigs, ResourceRateLimitConfig rateLimitConfig) {
|
||||||
int sz = resourceConfigs.size();
|
int sz = resourceConfigs.size();
|
||||||
|
|||||||
@@ -83,6 +83,7 @@ public class RegistryCenterService implements ApplicationListener<ContextRefresh
|
|||||||
registryCenterMap.put(rc.name, rc);
|
registryCenterMap.put(rc.name, rc);
|
||||||
log.info("init registry center {}", rc.name);
|
log.info("init registry center {}", rc.name);
|
||||||
rc.initFizzServiceRegistration(applicationContext);
|
rc.initFizzServiceRegistration(applicationContext);
|
||||||
|
rc.getFizzServiceRegistration().register();
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
result.code = Result.FAIL;
|
result.code = Result.FAIL;
|
||||||
|
|||||||
@@ -79,15 +79,14 @@ public abstract class FizzEurekaHelper {
|
|||||||
PropertiesUtils.setBeanPropertyValue(eurekaInstanceConfig, eurekaProps);
|
PropertiesUtils.setBeanPropertyValue(eurekaInstanceConfig, eurekaProps);
|
||||||
|
|
||||||
String appname = eurekaInstanceConfig.getAppname();
|
String appname = eurekaInstanceConfig.getAppname();
|
||||||
if (appname == null) {
|
if (appname == null || appname.equals("unknown")) {
|
||||||
appname = applicationContext.getApplicationName();
|
appname = applicationContext.getEnvironment().getProperty("spring.application.name");
|
||||||
eurekaInstanceConfig.setAppname(appname);
|
eurekaInstanceConfig.setAppname(appname);
|
||||||
}
|
}
|
||||||
|
|
||||||
// VirtualHostName
|
// VirtualHostName
|
||||||
String virtualHostName = eurekaInstanceConfig.getVirtualHostName();
|
String virtualHostName = eurekaInstanceConfig.getVirtualHostName();
|
||||||
|
if (virtualHostName == null || virtualHostName.equals("unknown")) {
|
||||||
if (virtualHostName.equals("unknown")) {
|
|
||||||
eurekaInstanceConfig.setVirtualHostName(appname);
|
eurekaInstanceConfig.setVirtualHostName(appname);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -34,11 +34,9 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.springframework.web.server.ServerWebExchange;
|
import org.springframework.web.server.ServerWebExchange;
|
||||||
import we.Fizz;
|
|
||||||
import we.stats.circuitbreaker.CircuitBreakManager;
|
import we.stats.circuitbreaker.CircuitBreakManager;
|
||||||
import we.stats.circuitbreaker.CircuitBreaker;
|
import we.stats.circuitbreaker.CircuitBreaker;
|
||||||
import we.util.ResourceIdUtils;
|
import we.util.ResourceIdUtils;
|
||||||
import we.util.Utils;
|
|
||||||
import we.util.WebUtils;
|
import we.util.WebUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -262,10 +260,14 @@ public class FlowStat {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (ResourceConfig resourceConfig : resourceConfigs) {
|
||||||
|
getResourceStat(resourceConfig.getResourceId());
|
||||||
|
}
|
||||||
|
|
||||||
String service = WebUtils.getClientService(exchange);
|
String service = WebUtils.getClientService(exchange);
|
||||||
String path = WebUtils.getClientReqPath(exchange);
|
String path = WebUtils.getClientReqPath(exchange);
|
||||||
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||||
boolean permit = circuitBreakManager.permit(exchange, curTimeSlotId, this, resource);
|
boolean permit = circuitBreakManager.permit(exchange, curTimeSlotId, this, service, path);
|
||||||
if (!permit) {
|
if (!permit) {
|
||||||
return IncrRequestResult.block(resource, BlockType.CIRCUIT_BREAK);
|
return IncrRequestResult.block(resource, BlockType.CIRCUIT_BREAK);
|
||||||
}
|
}
|
||||||
@@ -314,7 +316,7 @@ public class FlowStat {
|
|||||||
|
|
||||||
String service = WebUtils.getClientService(exchange);
|
String service = WebUtils.getClientService(exchange);
|
||||||
String path = WebUtils.getClientReqPath(exchange);
|
String path = WebUtils.getClientReqPath(exchange);
|
||||||
circuitBreakManager.correctCircuitBreakerState4error(exchange, timeSlotId, this, service, path);
|
circuitBreakManager.correctCircuitBreakerStateAsError(exchange, timeSlotId, this, service, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -25,16 +25,16 @@ import org.springframework.web.server.ServerWebExchange;
|
|||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import we.config.AggregateRedisConfig;
|
import we.config.AggregateRedisConfig;
|
||||||
|
import we.flume.clients.log4j2appender.LogService;
|
||||||
import we.stats.FlowStat;
|
import we.stats.FlowStat;
|
||||||
import we.util.JacksonUtils;
|
import we.util.JacksonUtils;
|
||||||
import we.util.ResourceIdUtils;
|
import we.util.ResourceIdUtils;
|
||||||
import we.util.Result;
|
import we.util.Result;
|
||||||
|
import we.util.WebUtils;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.Collections;
|
import java.util.*;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author hongqiaowei
|
* @author hongqiaowei
|
||||||
@@ -43,11 +43,13 @@ import java.util.Map;
|
|||||||
@Component
|
@Component
|
||||||
public class CircuitBreakManager {
|
public class CircuitBreakManager {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(CircuitBreakManager.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(CircuitBreakManager.class);
|
||||||
|
|
||||||
private Map<String/*child resource*/, String/*parent resource*/> parentResourceMap = new HashMap<>(128);
|
// private Map<String/*child resource*/, String/*parent resource*/> parentResourceMap = new HashMap<>(128);
|
||||||
|
|
||||||
private Map<String/*resource*/, CircuitBreaker> circuitBreakerMap = new HashMap<>(64);
|
private final Map<String/*resource*/, CircuitBreaker> circuitBreakerMap = new HashMap<>(64);
|
||||||
|
|
||||||
|
private final Set<String> circuitBreakersFromServiceDefault = new HashSet<>(64);
|
||||||
|
|
||||||
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
|
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
|
||||||
private ReactiveStringRedisTemplate rt;
|
private ReactiveStringRedisTemplate rt;
|
||||||
@@ -62,7 +64,7 @@ public class CircuitBreakManager {
|
|||||||
if (result.code == Result.FAIL) {
|
if (result.code == Result.FAIL) {
|
||||||
throw new RuntimeException(result.msg, result.t);
|
throw new RuntimeException(result.msg, result.t);
|
||||||
}
|
}
|
||||||
log.info("init parentResourceMap: {}", parentResourceMap);
|
// LOGGER.info("init parentResourceMap: {}", parentResourceMap);
|
||||||
// schedule();
|
// schedule();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,8 +82,8 @@ public class CircuitBreakManager {
|
|||||||
json = (String) e.getValue();
|
json = (String) e.getValue();
|
||||||
CircuitBreaker cb = JacksonUtils.readValue(json, CircuitBreaker.class);
|
CircuitBreaker cb = JacksonUtils.readValue(json, CircuitBreaker.class);
|
||||||
circuitBreakerMap.put(cb.resource, cb);
|
circuitBreakerMap.put(cb.resource, cb);
|
||||||
updateParentResourceMap(cb);
|
// updateParentResourceMap(cb);
|
||||||
log.info("init circuit breaker {}", cb);
|
LOGGER.info("init circuit breaker {}", cb);
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
result.code = Result.FAIL;
|
result.code = Result.FAIL;
|
||||||
@@ -89,7 +91,7 @@ public class CircuitBreakManager {
|
|||||||
result.t = t;
|
result.t = t;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.info("no circuit breaker config");
|
LOGGER.info("no circuit breaker config");
|
||||||
}
|
}
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
@@ -116,12 +118,12 @@ public class CircuitBreakManager {
|
|||||||
result.code = Result.FAIL;
|
result.code = Result.FAIL;
|
||||||
result.msg = "lsn error, channel: " + channel;
|
result.msg = "lsn error, channel: " + channel;
|
||||||
result.t = t;
|
result.t = t;
|
||||||
log.error("lsn channel {} error", channel, t);
|
LOGGER.error("lsn channel {} error", channel, t);
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
.doOnSubscribe(
|
.doOnSubscribe(
|
||||||
s -> {
|
s -> {
|
||||||
log.info("success to lsn on {}", channel);
|
LOGGER.info("success to lsn on {}", channel);
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
.doOnNext(
|
.doOnNext(
|
||||||
@@ -131,15 +133,23 @@ public class CircuitBreakManager {
|
|||||||
CircuitBreaker cb = JacksonUtils.readValue(message, CircuitBreaker.class);
|
CircuitBreaker cb = JacksonUtils.readValue(message, CircuitBreaker.class);
|
||||||
if (cb.isDeleted) {
|
if (cb.isDeleted) {
|
||||||
circuitBreakerMap.remove(cb.resource);
|
circuitBreakerMap.remove(cb.resource);
|
||||||
log.info("remove circuit breaker: {}", cb);
|
LOGGER.info("remove circuit breaker: {}", cb);
|
||||||
} else {
|
} else {
|
||||||
circuitBreakerMap.put(cb.resource, cb);
|
circuitBreakerMap.put(cb.resource, cb);
|
||||||
log.info("update circuit breaker: {}", cb);
|
LOGGER.info("update circuit breaker: {}", cb);
|
||||||
|
}
|
||||||
|
// updateParentResourceMap(cb);
|
||||||
|
// LOGGER.info("update parentResourceMap: {}", parentResourceMap);
|
||||||
|
if (cb.type == CircuitBreaker.Type.SERVICE_DEFAULT) {
|
||||||
|
if (cb.isDeleted || !cb.serviceDefaultEnable) {
|
||||||
|
for (String resource : circuitBreakersFromServiceDefault) {
|
||||||
|
circuitBreakerMap.remove(resource);
|
||||||
|
}
|
||||||
|
circuitBreakersFromServiceDefault.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
updateParentResourceMap(cb);
|
|
||||||
log.info("update parentResourceMap: {}", parentResourceMap);
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
log.error("update circuit breaker error, {}", message, t);
|
LOGGER.error("update circuit breaker error, {}", message, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@@ -147,7 +157,7 @@ public class CircuitBreakManager {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateParentResourceMap(CircuitBreaker cb) {
|
/*private void updateParentResourceMap(CircuitBreaker cb) {
|
||||||
String parentResource = null;
|
String parentResource = null;
|
||||||
if (cb.isDeleted) {
|
if (cb.isDeleted) {
|
||||||
if (cb.type == CircuitBreaker.Type.PATH) {
|
if (cb.type == CircuitBreaker.Type.PATH) {
|
||||||
@@ -170,14 +180,64 @@ public class CircuitBreakManager {
|
|||||||
parentResourceMap.put(parentResource, ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
parentResourceMap.put(parentResource, ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}*/
|
||||||
|
|
||||||
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String service, String path) {
|
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String service, String path) {
|
||||||
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||||
return permit(exchange, currentTimeWindow, flowStat, resource);
|
// return permit(exchange, currentTimeWindow, flowStat, resource);
|
||||||
|
CircuitBreaker cb = circuitBreakerMap.get(resource);
|
||||||
|
if (cb == null) {
|
||||||
|
resource = ResourceIdUtils.buildResourceId(null, null, null, service, null);
|
||||||
|
cb = circuitBreakerMap.get(resource);
|
||||||
|
if (cb == null) {
|
||||||
|
cb = circuitBreakerMap.get(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||||
|
if (cb != null && cb.serviceDefaultEnable) {
|
||||||
|
cb = buildCircuitBreakerFromServiceDefault(service, resource);
|
||||||
|
} else {
|
||||||
|
cb = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (cb == null) {
|
||||||
|
if (LOGGER.isDebugEnabled()) {
|
||||||
|
LOGGER.debug("no circuit breaker for {} {}", service, path, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (LOGGER.isDebugEnabled()) {
|
||||||
|
LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
|
||||||
|
}
|
||||||
|
return cb.permit(exchange, currentTimeWindow, flowStat);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String resource) {
|
private CircuitBreaker buildCircuitBreakerFromServiceDefault(String service, String resource) {
|
||||||
|
CircuitBreaker serviceDefaultCircuitBreaker = circuitBreakerMap.get(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||||
|
|
||||||
|
CircuitBreaker cb = new CircuitBreaker();
|
||||||
|
cb.type = CircuitBreaker.Type.SERVICE;
|
||||||
|
cb.service = service;
|
||||||
|
cb.serviceDefaultEnable = true;
|
||||||
|
cb.resource = resource;
|
||||||
|
cb.breakStrategy = serviceDefaultCircuitBreaker.breakStrategy;
|
||||||
|
cb.errorRatioThreshold = serviceDefaultCircuitBreaker.errorRatioThreshold;
|
||||||
|
cb.totalErrorThreshold = serviceDefaultCircuitBreaker.totalErrorThreshold;
|
||||||
|
cb.minRequests = serviceDefaultCircuitBreaker.minRequests;
|
||||||
|
cb.monitorDuration = serviceDefaultCircuitBreaker.monitorDuration;
|
||||||
|
cb.breakDuration = serviceDefaultCircuitBreaker.breakDuration;
|
||||||
|
cb.resumeStrategy = serviceDefaultCircuitBreaker.resumeStrategy;
|
||||||
|
if (cb.resumeStrategy == CircuitBreaker.ResumeStrategy.GRADUAL) {
|
||||||
|
cb.resumeDuration = serviceDefaultCircuitBreaker.resumeDuration;
|
||||||
|
cb.initGradualResumeTimeWindowContext();
|
||||||
|
}
|
||||||
|
cb.stateStartTime = serviceDefaultCircuitBreaker.stateStartTime;
|
||||||
|
|
||||||
|
circuitBreakerMap.put(resource, cb);
|
||||||
|
circuitBreakersFromServiceDefault.add(resource);
|
||||||
|
|
||||||
|
return cb;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String resource) {
|
||||||
while (true) {
|
while (true) {
|
||||||
CircuitBreaker cb = circuitBreakerMap.get(resource);
|
CircuitBreaker cb = circuitBreakerMap.get(resource);
|
||||||
if (cb != null) {
|
if (cb != null) {
|
||||||
@@ -190,14 +250,37 @@ public class CircuitBreakManager {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}*/
|
||||||
|
|
||||||
public void correctCircuitBreakerState4error(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String service, String path) {
|
public void correctCircuitBreakerStateAsError(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String service, String path) {
|
||||||
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
String resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||||
correctCircuitBreakerState4error(exchange, currentTimeWindow, flowStat, resource);
|
// correctCircuitBreakerState4error(exchange, currentTimeWindow, flowStat, resource);
|
||||||
|
CircuitBreaker cb = circuitBreakerMap.get(resource);
|
||||||
|
if (cb == null) {
|
||||||
|
resource = ResourceIdUtils.buildResourceId(null, null, null, service, null);
|
||||||
|
cb = circuitBreakerMap.get(resource);
|
||||||
|
if (cb == null) {
|
||||||
|
cb = circuitBreakerMap.get(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||||
|
if (cb != null && cb.serviceDefaultEnable) {
|
||||||
|
cb = buildCircuitBreakerFromServiceDefault(service, resource);
|
||||||
|
} else {
|
||||||
|
cb = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (cb == null) {
|
||||||
|
if (LOGGER.isDebugEnabled()) {
|
||||||
|
LOGGER.debug("no circuit breaker for {} {}", service, path, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (LOGGER.isDebugEnabled()) {
|
||||||
|
LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
|
||||||
|
}
|
||||||
|
cb.correctCircuitBreakerStateAsError(currentTimeWindow, flowStat);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void correctCircuitBreakerState4error(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String resource) {
|
/*public void correctCircuitBreakerState4error(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat, String resource) {
|
||||||
while (true) {
|
while (true) {
|
||||||
CircuitBreaker cb = circuitBreakerMap.get(resource);
|
CircuitBreaker cb = circuitBreakerMap.get(resource);
|
||||||
if (cb != null) {
|
if (cb != null) {
|
||||||
@@ -211,7 +294,7 @@ public class CircuitBreakManager {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}*/
|
||||||
|
|
||||||
public CircuitBreaker getCircuitBreaker(String resource) {
|
public CircuitBreaker getCircuitBreaker(String resource) {
|
||||||
return circuitBreakerMap.get(resource);
|
return circuitBreakerMap.get(resource);
|
||||||
@@ -221,7 +304,7 @@ public class CircuitBreakManager {
|
|||||||
return circuitBreakerMap;
|
return circuitBreakerMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, String> getParentResourceMap() {
|
/*public Map<String, String> getParentResourceMap() {
|
||||||
return parentResourceMap;
|
return parentResourceMap;
|
||||||
}
|
}*/
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -209,14 +209,14 @@ public class CircuitBreaker {
|
|||||||
totalErrorThreshold = exceptionCount;
|
totalErrorThreshold = exceptionCount;
|
||||||
}
|
}
|
||||||
minRequests = minRequestCount;
|
minRequests = minRequestCount;
|
||||||
breakDuration = timeWindow;
|
monitorDuration = statInterval * 1000;
|
||||||
monitorDuration = statInterval;
|
breakDuration = timeWindow * 1000;
|
||||||
|
|
||||||
if (recoveryStrategy == 1) {
|
if (recoveryStrategy == 1) {
|
||||||
resumeStrategy = ResumeStrategy.DETECTIVE;
|
resumeStrategy = ResumeStrategy.DETECTIVE;
|
||||||
} else if (recoveryStrategy == 2) {
|
} else if (recoveryStrategy == 2) {
|
||||||
resumeStrategy = ResumeStrategy.GRADUAL;
|
resumeStrategy = ResumeStrategy.GRADUAL;
|
||||||
resumeDuration = recoveryTimeWindow;
|
resumeDuration = recoveryTimeWindow * 1000;
|
||||||
initGradualResumeTimeWindowContext();
|
initGradualResumeTimeWindowContext();
|
||||||
} else {
|
} else {
|
||||||
resumeStrategy = ResumeStrategy.IMMEDIATE;
|
resumeStrategy = ResumeStrategy.IMMEDIATE;
|
||||||
@@ -236,16 +236,17 @@ public class CircuitBreaker {
|
|||||||
return timeMills / 1000 * 1000;
|
return timeMills / 1000 * 1000;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initGradualResumeTimeWindowContext() {
|
public void initGradualResumeTimeWindowContext() {
|
||||||
BigDecimal totalTraffic = new BigDecimal(100);
|
BigDecimal totalTraffic = new BigDecimal(100);
|
||||||
BigDecimal duration = new BigDecimal(resumeDuration);
|
int resumeDurationSecs = this.resumeDuration / 1000;
|
||||||
|
BigDecimal duration = new BigDecimal(resumeDurationSecs);
|
||||||
initialResumeTraffic = totalTraffic.divide(duration, 0, RoundingMode.HALF_UP).intValue();
|
initialResumeTraffic = totalTraffic.divide(duration, 0, RoundingMode.HALF_UP).intValue();
|
||||||
if (initialResumeTraffic == 0) {
|
if (initialResumeTraffic == 0) {
|
||||||
initialResumeTraffic = 1;
|
initialResumeTraffic = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
gradualResumeTimeWindowContexts = new ArrayList<>(resumeDuration);
|
gradualResumeTimeWindowContexts = new ArrayList<>(resumeDurationSecs);
|
||||||
for (int i = 1; i <= resumeDuration; i++) {
|
for (int i = 1; i <= resumeDurationSecs; i++) {
|
||||||
int resumeTraffic = initialResumeTraffic * i;
|
int resumeTraffic = initialResumeTraffic * i;
|
||||||
GradualResumeTimeWindowContext ctx = new GradualResumeTimeWindowContext(resumeTraffic);
|
GradualResumeTimeWindowContext ctx = new GradualResumeTimeWindowContext(resumeTraffic);
|
||||||
gradualResumeTimeWindowContexts.add(ctx);
|
gradualResumeTimeWindowContexts.add(ctx);
|
||||||
@@ -261,7 +262,7 @@ public class CircuitBreaker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private long getStateDuration(long currentTimeWindow) {
|
private long getStateDuration(long currentTimeWindow) {
|
||||||
return (currentTimeWindow - stateStartTime) / 1000 + 1;
|
return currentTimeWindow - stateStartTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void correctState(long currentTimeWindow, FlowStat flowStat) {
|
public void correctState(long currentTimeWindow, FlowStat flowStat) {
|
||||||
@@ -288,20 +289,21 @@ public class CircuitBreaker {
|
|||||||
public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) {
|
public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) {
|
||||||
if (stateRef.get() == State.CLOSED) {
|
if (stateRef.get() == State.CLOSED) {
|
||||||
long endTimeWindow = currentTimeWindow + 1000;
|
long endTimeWindow = currentTimeWindow + 1000;
|
||||||
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
||||||
|
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
|
||||||
long reqCount = timeWindowStat.getCompReqs();
|
long reqCount = timeWindowStat.getCompReqs();
|
||||||
long errCount = timeWindowStat.getErrors();
|
long errCount = timeWindowStat.getErrors();
|
||||||
|
|
||||||
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount > minRequests && errCount > totalErrorThreshold) {
|
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
|
||||||
LOGGER.debug("{} current time window {} request count {} > min requests {} error count {} > total error threshold {}, correct to OPEN state as error",
|
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, correct to OPEN state as error",
|
||||||
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
||||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||||
} else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount > minRequests) {
|
} else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount >= minRequests) {
|
||||||
BigDecimal errors = new BigDecimal(errCount);
|
BigDecimal errors = new BigDecimal(errCount);
|
||||||
BigDecimal requests = new BigDecimal(reqCount);
|
BigDecimal requests = new BigDecimal(reqCount);
|
||||||
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
||||||
if (p - errorRatioThreshold > 0) {
|
if (p - errorRatioThreshold >= 0) {
|
||||||
LOGGER.debug("{} current time window {} request count {} > min requests {} error ratio {} > error ratio threshold {}, correct to OPEN state as error",
|
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, correct to OPEN state as error",
|
||||||
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
||||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||||
}
|
}
|
||||||
@@ -343,23 +345,24 @@ public class CircuitBreaker {
|
|||||||
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
|
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
|
||||||
|
|
||||||
long endTimeWindow = currentTimeWindow + 1000;
|
long endTimeWindow = currentTimeWindow + 1000;
|
||||||
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
||||||
|
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
|
||||||
long reqCount = timeWindowStat.getCompReqs();
|
long reqCount = timeWindowStat.getCompReqs();
|
||||||
long errCount = timeWindowStat.getErrors();
|
long errCount = timeWindowStat.getErrors();
|
||||||
|
|
||||||
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount > minRequests && errCount > totalErrorThreshold) {
|
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
|
||||||
LOGGER.debug("{} current time window {} request count {} > min requests {} error count {} > total error threshold {}",
|
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}",
|
||||||
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
||||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount > minRequests) {
|
if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount >= minRequests) {
|
||||||
BigDecimal errors = new BigDecimal(errCount);
|
BigDecimal errors = new BigDecimal(errCount);
|
||||||
BigDecimal requests = new BigDecimal(reqCount);
|
BigDecimal requests = new BigDecimal(reqCount);
|
||||||
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
||||||
if (p - errorRatioThreshold > 0) {
|
if (p - errorRatioThreshold >= 0) {
|
||||||
LOGGER.debug("{} current time window {} request count {} > min requests {} error ratio {} > error ratio threshold {}",
|
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}",
|
||||||
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
||||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||||
|
|||||||
@@ -14,11 +14,14 @@ import we.redis.RedisProperties;
|
|||||||
import we.redis.RedisServerConfiguration;
|
import we.redis.RedisServerConfiguration;
|
||||||
import we.redis.RedisTemplateConfiguration;
|
import we.redis.RedisTemplateConfiguration;
|
||||||
import we.service_registry.eureka.FizzEurekaServiceRegistration;
|
import we.service_registry.eureka.FizzEurekaServiceRegistration;
|
||||||
|
import we.util.PropertiesUtils;
|
||||||
import we.util.ReflectionUtils;
|
import we.util.ReflectionUtils;
|
||||||
|
import we.util.YmlUtils;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author hongqiaowei
|
* @author hongqiaowei
|
||||||
@@ -60,4 +63,20 @@ public class RegistryCenterServiceTests {
|
|||||||
fizzServiceRegistration.register();
|
fizzServiceRegistration.register();
|
||||||
// Thread.currentThread().join();
|
// Thread.currentThread().join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// @Test
|
||||||
|
void twoEurekaTest() throws InterruptedException {
|
||||||
|
System.setProperty("server.port", "8866");
|
||||||
|
Fizz.context = new GenericApplicationContext();
|
||||||
|
Fizz.context.refresh();
|
||||||
|
|
||||||
|
String e1 = FileUtil.readString("eureka1.yml", CharsetUtil.CHARSET_UTF_8);
|
||||||
|
FizzServiceRegistration fizzServiceRegistration1 = FizzServiceRegistration.getFizzServiceRegistration(Fizz.context, FizzServiceRegistration.Type.EUREKA, FizzServiceRegistration.ConfigFormat.YML, e1);
|
||||||
|
fizzServiceRegistration1.register();
|
||||||
|
|
||||||
|
String e2 = FileUtil.readString("eureka2.yml", CharsetUtil.CHARSET_UTF_8);
|
||||||
|
FizzServiceRegistration fizzServiceRegistration2 = FizzServiceRegistration.getFizzServiceRegistration(Fizz.context, FizzServiceRegistration.Type.EUREKA, FizzServiceRegistration.ConfigFormat.YML, e2);
|
||||||
|
fizzServiceRegistration2.register();
|
||||||
|
Thread.currentThread().join();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -81,10 +81,10 @@ public class CircuitBreakManagerTests {
|
|||||||
cb.path = path;
|
cb.path = path;
|
||||||
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||||
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
|
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
|
||||||
cb.monitorDuration = 5;
|
cb.monitorDuration = 5 * 1000;
|
||||||
cb.minRequests = 100;
|
cb.minRequests = 100;
|
||||||
cb.totalErrorThreshold = 10;
|
cb.totalErrorThreshold = 10;
|
||||||
cb.breakDuration = 5;
|
cb.breakDuration = 5 * 1000;
|
||||||
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.IMMEDIATE;
|
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.IMMEDIATE;
|
||||||
cb.stateStartTime = currentTimeWindow;
|
cb.stateStartTime = currentTimeWindow;
|
||||||
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getCircuitBreakerMap();
|
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getCircuitBreakerMap();
|
||||||
|
|||||||
10
fizz-core/src/test/resources/eureka1.yml
Normal file
10
fizz-core/src/test/resources/eureka1.yml
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
|
||||||
|
eureka:
|
||||||
|
client:
|
||||||
|
enabled: true
|
||||||
|
serviceUrl:
|
||||||
|
defaultZone: http://1.1.1.1:6600/eureka
|
||||||
|
instance:
|
||||||
|
appname: abc
|
||||||
|
prefer-ip-address: true
|
||||||
|
lease-renewal-interval-in-seconds: 66
|
||||||
10
fizz-core/src/test/resources/eureka2.yml
Normal file
10
fizz-core/src/test/resources/eureka2.yml
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
|
||||||
|
eureka:
|
||||||
|
client:
|
||||||
|
enabled: true
|
||||||
|
serviceUrl:
|
||||||
|
defaultZone: http://2.2.2.2:6600/eureka
|
||||||
|
instance:
|
||||||
|
appname: abc
|
||||||
|
prefer-ip-address: true
|
||||||
|
lease-renewal-interval-in-seconds: 66
|
||||||
4
pom.xml
4
pom.xml
@@ -22,7 +22,7 @@
|
|||||||
<r2dbc-mysql.version>0.8.2</r2dbc-mysql.version>
|
<r2dbc-mysql.version>0.8.2</r2dbc-mysql.version>
|
||||||
<reflections.version>0.9.11</reflections.version>
|
<reflections.version>0.9.11</reflections.version>
|
||||||
<commons-pool2.version>2.11.1</commons-pool2.version>
|
<commons-pool2.version>2.11.1</commons-pool2.version>
|
||||||
<netty-tcnative.version>2.0.47.Final</netty-tcnative.version>
|
<netty-tcnative.version>2.0.48.Final</netty-tcnative.version>
|
||||||
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
|
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
|
||||||
<resilience4j.version>1.7.1</resilience4j.version>
|
<resilience4j.version>1.7.1</resilience4j.version>
|
||||||
<snakeyaml.version>1.30</snakeyaml.version>
|
<snakeyaml.version>1.30</snakeyaml.version>
|
||||||
@@ -443,7 +443,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>cn.hutool</groupId>
|
<groupId>cn.hutool</groupId>
|
||||||
<artifactId>hutool-crypto</artifactId>
|
<artifactId>hutool-crypto</artifactId>
|
||||||
<version>5.7.19</version>
|
<version>5.7.20</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|||||||
Reference in New Issue
Block a user