Fix circuit breaker bug
This commit is contained in:
@@ -25,7 +25,6 @@ import com.fizzgate.stats.TimeSlot;
|
|||||||
import com.fizzgate.stats.TimeWindowStat;
|
import com.fizzgate.stats.TimeWindowStat;
|
||||||
import com.fizzgate.util.JacksonUtils;
|
import com.fizzgate.util.JacksonUtils;
|
||||||
import com.fizzgate.util.ResourceIdUtils;
|
import com.fizzgate.util.ResourceIdUtils;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@@ -35,8 +34,7 @@ import java.math.BigDecimal;
|
|||||||
import java.math.RoundingMode;
|
import java.math.RoundingMode;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -159,6 +157,8 @@ public class CircuitBreaker {
|
|||||||
|
|
||||||
public final AtomicReference<State> stateRef = new AtomicReference<>(State.CLOSED);
|
public final AtomicReference<State> stateRef = new AtomicReference<>(State.CLOSED);
|
||||||
|
|
||||||
|
public final AtomicBoolean noProbe = new AtomicBoolean(true);
|
||||||
|
|
||||||
public long stateStartTime;
|
public long stateStartTime;
|
||||||
|
|
||||||
public CircuitBreaker() {
|
public CircuitBreaker() {
|
||||||
@@ -262,7 +262,7 @@ public class CircuitBreaker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) {
|
private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) {
|
||||||
long nThSecond = getStateDuration(currentTimeWindow);
|
long nThSecond = getStateDuration(currentTimeWindow) / 1000;
|
||||||
GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond);
|
GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond);
|
||||||
ResourceStat resourceStat = flowStat.getResourceStat(resource);
|
ResourceStat resourceStat = flowStat.getResourceStat(resource);
|
||||||
return ctx.permit(resourceStat, currentTimeWindow);
|
return ctx.permit(resourceStat, currentTimeWindow);
|
||||||
@@ -282,9 +282,20 @@ public class CircuitBreaker {
|
|||||||
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
||||||
|
|
||||||
} else if (s == State.OPEN && stateDuration > breakDuration) {
|
} else if (s == State.OPEN && stateDuration > breakDuration) {
|
||||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
|
if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
|
||||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
|
||||||
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||||
|
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
||||||
|
} else if (resumeStrategy == ResumeStrategy.DETECTIVE) {
|
||||||
|
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
|
||||||
|
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||||
|
noProbe.set(true);
|
||||||
|
transit(s, State.RESUME_DETECTIVE, currentTimeWindow, flowStat);
|
||||||
|
} else if (resumeStrategy == ResumeStrategy.GRADUAL) {
|
||||||
|
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
|
||||||
|
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||||
|
transit(s, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
|
||||||
|
}
|
||||||
|
|
||||||
} else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) {
|
} else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) {
|
||||||
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state",
|
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state",
|
||||||
@@ -293,36 +304,9 @@ public class CircuitBreaker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) {
|
|
||||||
if (stateRef.get() == State.CLOSED) {
|
|
||||||
long endTimeWindow = currentTimeWindow + 1000;
|
|
||||||
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
|
||||||
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
|
|
||||||
long reqCount = timeWindowStat.getCompReqs();
|
|
||||||
long errCount = timeWindowStat.getErrors();
|
|
||||||
|
|
||||||
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
|
|
||||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, correct to OPEN state as error",
|
|
||||||
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
|
||||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
|
||||||
} else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount >= minRequests) {
|
|
||||||
BigDecimal errors = new BigDecimal(errCount);
|
|
||||||
BigDecimal requests = new BigDecimal(reqCount);
|
|
||||||
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
|
||||||
if (p - errorRatioThreshold >= 0) {
|
|
||||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, correct to OPEN state as error",
|
|
||||||
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
|
||||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
|
|
||||||
public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) {
|
public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) {
|
||||||
if (stateRef.compareAndSet(current, target)) {
|
if (stateRef.compareAndSet(current, target)) {
|
||||||
ResourceStat resourceStat = flowStat.getResourceStat(resource);
|
ResourceStat resourceStat = flowStat.getResourceStat(resource);
|
||||||
/*AtomicLong circuitBreakNum = resourceStat.getTimeSlot(currentTimeWindow).getCircuitBreakNum();
|
|
||||||
circuitBreakNum.set(0);*/
|
|
||||||
resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0);
|
resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0);
|
||||||
resourceStat.updateCircuitBreakState(currentTimeWindow, current, target);
|
resourceStat.updateCircuitBreakState(currentTimeWindow, current, target);
|
||||||
LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target);
|
LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target);
|
||||||
@@ -333,19 +317,25 @@ public class CircuitBreaker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
|
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
|
||||||
correctState(currentTimeWindow, flowStat);
|
|
||||||
if (stateRef.get() == State.CLOSED) {
|
if (stateRef.get() == State.CLOSED) {
|
||||||
return permitCallInClosedState(currentTimeWindow, flowStat);
|
return permitCallInClosedState(currentTimeWindow, flowStat);
|
||||||
}
|
}
|
||||||
if (stateRef.get() == State.OPEN) {
|
if (stateRef.get() == State.OPEN) {
|
||||||
return permitCallInOpenState(exchange, currentTimeWindow, flowStat);
|
|
||||||
}
|
|
||||||
if (stateRef.get() == State.RESUME_DETECTIVE) {
|
|
||||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||||
|
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (stateRef.get() == State.RESUME_DETECTIVE) {
|
||||||
|
if (noProbe.compareAndSet(true, false)) {
|
||||||
|
exchange.getAttributes().put(DETECT_REQUEST, this);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (stateRef.get() == State.RESUME_GRADUALLY) {
|
if (stateRef.get() == State.RESUME_GRADUALLY) {
|
||||||
return permitCallInResumeGraduallyState(currentTimeWindow, flowStat);
|
return isResumeTraffic(currentTimeWindow, flowStat);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -353,13 +343,12 @@ public class CircuitBreaker {
|
|||||||
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
|
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
|
||||||
|
|
||||||
long endTimeWindow = currentTimeWindow + 1000;
|
long endTimeWindow = currentTimeWindow + 1000;
|
||||||
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
|
||||||
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
|
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
|
||||||
long reqCount = timeWindowStat.getCompReqs();
|
long reqCount = timeWindowStat.getCompReqs();
|
||||||
long errCount = timeWindowStat.getErrors();
|
long errCount = timeWindowStat.getErrors();
|
||||||
|
|
||||||
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
|
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
|
||||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}",
|
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, reject request",
|
||||||
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
||||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||||
@@ -370,7 +359,7 @@ public class CircuitBreaker {
|
|||||||
BigDecimal requests = new BigDecimal(reqCount);
|
BigDecimal requests = new BigDecimal(reqCount);
|
||||||
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
||||||
if (p - errorRatioThreshold >= 0) {
|
if (p - errorRatioThreshold >= 0) {
|
||||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}",
|
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, reject request",
|
||||||
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
||||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||||
@@ -378,54 +367,11 @@ public class CircuitBreaker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.debug("{} current time window {} in {} which start at {}, permit current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
LOGGER.debug("{} current time window {} in {} which start at {}, permit request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean permitCallInOpenState(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
|
|
||||||
long stateDuration = getStateDuration(currentTimeWindow);
|
|
||||||
if (stateDuration > breakDuration) {
|
|
||||||
if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
|
|
||||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume immediately",
|
|
||||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
|
||||||
transit(State.OPEN, State.CLOSED, currentTimeWindow, flowStat);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (resumeStrategy == ResumeStrategy.DETECTIVE) {
|
|
||||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
|
|
||||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
|
||||||
if (transit(State.OPEN, State.RESUME_DETECTIVE, currentTimeWindow, flowStat)) {
|
|
||||||
exchange.getAttributes().put(DETECT_REQUEST, this);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (resumeStrategy == ResumeStrategy.GRADUAL) {
|
|
||||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
|
|
||||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
|
||||||
transit(State.OPEN, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
|
|
||||||
return isResumeTraffic(currentTimeWindow, flowStat);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
|
||||||
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean permitCallInResumeGraduallyState(long currentTimeWindow, FlowStat flowStat) {
|
|
||||||
long stateDuration = getStateDuration(currentTimeWindow);
|
|
||||||
if (stateDuration > resumeDuration) {
|
|
||||||
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, resume immediately",
|
|
||||||
currentTimeWindow, resource, stateDuration, stateRef.get(), resumeDuration);
|
|
||||||
transit(State.RESUME_GRADUALLY, State.CLOSED, currentTimeWindow, flowStat);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return isResumeTraffic(currentTimeWindow, flowStat);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return JacksonUtils.writeValueAsString(this);
|
return JacksonUtils.writeValueAsString(this);
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
|
|||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@TestPropertySource("/application.properties")
|
@TestPropertySource("/application.properties")
|
||||||
@@ -108,4 +109,119 @@ public class CircuitBreakManagerTests {
|
|||||||
Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get());
|
Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get());
|
||||||
Assertions.assertEquals(2, timeSlot.getCircuitBreakNum());
|
Assertions.assertEquals(2, timeSlot.getCircuitBreakNum());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void detectiveResumeTest() throws InterruptedException {
|
||||||
|
FlowStat flowStat = new FlowStat(circuitBreakManager);
|
||||||
|
flowStat.cleanResource = false;
|
||||||
|
flowStat.createTimeSlotOnlyTraffic = false;
|
||||||
|
long currentTimeWindow = flowStat.currentTimeSlotId();
|
||||||
|
|
||||||
|
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||||
|
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||||
|
|
||||||
|
String service = "xservice";
|
||||||
|
String path = "ypath";
|
||||||
|
|
||||||
|
CircuitBreaker cb = new CircuitBreaker();
|
||||||
|
cb.service = service;
|
||||||
|
cb.path = path;
|
||||||
|
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||||
|
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
|
||||||
|
cb.monitorDuration = 5 * 1000;
|
||||||
|
cb.minRequests = 100;
|
||||||
|
cb.totalErrorThreshold = 10;
|
||||||
|
cb.breakDuration = 2 * 1000;
|
||||||
|
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.DETECTIVE;
|
||||||
|
cb.stateStartTime = currentTimeWindow;
|
||||||
|
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
|
||||||
|
circuitBreakerMap.put(cb.resource, cb);
|
||||||
|
|
||||||
|
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
|
||||||
|
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
|
||||||
|
timeSlot.setCompReqs(200);
|
||||||
|
timeSlot.setErrors(11);
|
||||||
|
|
||||||
|
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||||
|
Assertions.assertFalse(permit);
|
||||||
|
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
|
||||||
|
Thread.sleep(3000);
|
||||||
|
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
|
||||||
|
|
||||||
|
currentTimeWindow = flowStat.currentTimeSlotId();
|
||||||
|
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||||
|
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||||
|
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||||
|
Assertions.assertTrue(permit);
|
||||||
|
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
|
||||||
|
Assertions.assertFalse(cb.noProbe.get());
|
||||||
|
|
||||||
|
currentTimeWindow = flowStat.currentTimeSlotId();
|
||||||
|
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||||
|
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||||
|
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||||
|
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
|
||||||
|
Assertions.assertFalse(permit);
|
||||||
|
|
||||||
|
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeWindow, flowStat); // mock probe request success
|
||||||
|
|
||||||
|
currentTimeWindow = flowStat.currentTimeSlotId();
|
||||||
|
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||||
|
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||||
|
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||||
|
Assertions.assertEquals(CircuitBreaker.State.CLOSED, cb.stateRef.get());
|
||||||
|
Assertions.assertTrue(permit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void gradualResumeTest() throws InterruptedException {
|
||||||
|
FlowStat flowStat = new FlowStat(circuitBreakManager);
|
||||||
|
flowStat.cleanResource = false;
|
||||||
|
flowStat.createTimeSlotOnlyTraffic = false;
|
||||||
|
long currentTimeWindow = flowStat.currentTimeSlotId();
|
||||||
|
|
||||||
|
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||||
|
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||||
|
|
||||||
|
String service = "xservice";
|
||||||
|
String path = "ypath";
|
||||||
|
|
||||||
|
CircuitBreaker cb = new CircuitBreaker();
|
||||||
|
cb.service = service;
|
||||||
|
cb.path = path;
|
||||||
|
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||||
|
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
|
||||||
|
cb.monitorDuration = 5 * 1000;
|
||||||
|
cb.minRequests = 100;
|
||||||
|
cb.totalErrorThreshold = 10;
|
||||||
|
cb.breakDuration = 2 * 1000;
|
||||||
|
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.GRADUAL;
|
||||||
|
cb.resumeDuration = 3 * 1000;
|
||||||
|
cb.stateStartTime = currentTimeWindow;
|
||||||
|
cb.initGradualResumeTimeWindowContext();
|
||||||
|
|
||||||
|
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
|
||||||
|
circuitBreakerMap.put(cb.resource, cb);
|
||||||
|
|
||||||
|
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
|
||||||
|
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
|
||||||
|
timeSlot.setCompReqs(200);
|
||||||
|
timeSlot.setErrors(11);
|
||||||
|
|
||||||
|
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||||
|
Assertions.assertFalse(permit);
|
||||||
|
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
|
||||||
|
Thread.sleep(3000);
|
||||||
|
Assertions.assertEquals(CircuitBreaker.State.RESUME_GRADUALLY, cb.stateRef.get());
|
||||||
|
|
||||||
|
currentTimeWindow = flowStat.currentTimeSlotId();
|
||||||
|
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||||
|
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||||
|
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||||
|
Assertions.assertTrue(permit);
|
||||||
|
for (int i = 0; i < 68; i++) {
|
||||||
|
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||||
|
}
|
||||||
|
Assertions.assertFalse(permit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user