diff --git a/fizz-core/src/main/java/com/fizzgate/stats/circuitbreaker/CircuitBreaker.java b/fizz-core/src/main/java/com/fizzgate/stats/circuitbreaker/CircuitBreaker.java index 839a8a2..4416b96 100644 --- a/fizz-core/src/main/java/com/fizzgate/stats/circuitbreaker/CircuitBreaker.java +++ b/fizz-core/src/main/java/com/fizzgate/stats/circuitbreaker/CircuitBreaker.java @@ -25,7 +25,6 @@ import com.fizzgate.stats.TimeSlot; import com.fizzgate.stats.TimeWindowStat; import com.fizzgate.util.JacksonUtils; import com.fizzgate.util.ResourceIdUtils; - import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +34,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -159,6 +157,8 @@ public class CircuitBreaker { public final AtomicReference stateRef = new AtomicReference<>(State.CLOSED); + public final AtomicBoolean noProbe = new AtomicBoolean(true); + public long stateStartTime; public CircuitBreaker() { @@ -262,7 +262,7 @@ public class CircuitBreaker { } private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) { - long nThSecond = getStateDuration(currentTimeWindow); + long nThSecond = getStateDuration(currentTimeWindow) / 1000; GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond); ResourceStat resourceStat = flowStat.getResourceStat(resource); return ctx.permit(resourceStat, currentTimeWindow); @@ -282,9 +282,20 @@ public class CircuitBreaker { transit(s, State.CLOSED, currentTimeWindow, flowStat); } else if (s == State.OPEN && stateDuration > breakDuration) { - LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state", - currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); - transit(s, State.CLOSED, currentTimeWindow, flowStat); + if (resumeStrategy == ResumeStrategy.IMMEDIATE) { + LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state", + currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); + transit(s, State.CLOSED, currentTimeWindow, flowStat); + } else if (resumeStrategy == ResumeStrategy.DETECTIVE) { + LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective", + currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); + noProbe.set(true); + transit(s, State.RESUME_DETECTIVE, currentTimeWindow, flowStat); + } else if (resumeStrategy == ResumeStrategy.GRADUAL) { + LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual", + currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); + transit(s, State.RESUME_GRADUALLY, currentTimeWindow, flowStat); + } } else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) { LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state", @@ -293,36 +304,9 @@ public class CircuitBreaker { } } - /*public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) { - if (stateRef.get() == State.CLOSED) { - long endTimeWindow = currentTimeWindow + 1000; - // TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow); - TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow); - long reqCount = timeWindowStat.getCompReqs(); - long errCount = timeWindowStat.getErrors(); - - if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) { - LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, correct to OPEN state as error", - resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold); - transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat); - } else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount >= minRequests) { - BigDecimal errors = new BigDecimal(errCount); - BigDecimal requests = new BigDecimal(reqCount); - float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue(); - if (p - errorRatioThreshold >= 0) { - LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, correct to OPEN state as error", - resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold); - transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat); - } - } - } - }*/ - public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) { if (stateRef.compareAndSet(current, target)) { ResourceStat resourceStat = flowStat.getResourceStat(resource); - /*AtomicLong circuitBreakNum = resourceStat.getTimeSlot(currentTimeWindow).getCircuitBreakNum(); - circuitBreakNum.set(0);*/ resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0); resourceStat.updateCircuitBreakState(currentTimeWindow, current, target); LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target); @@ -333,19 +317,25 @@ public class CircuitBreaker { } public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) { - correctState(currentTimeWindow, flowStat); if (stateRef.get() == State.CLOSED) { return permitCallInClosedState(currentTimeWindow, flowStat); } if (stateRef.get() == State.OPEN) { - return permitCallInOpenState(exchange, currentTimeWindow, flowStat); - } - if (stateRef.get() == State.RESUME_DETECTIVE) { flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); + LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime); return false; } + if (stateRef.get() == State.RESUME_DETECTIVE) { + if (noProbe.compareAndSet(true, false)) { + exchange.getAttributes().put(DETECT_REQUEST, this); + return true; + } else { + flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); + return false; + } + } if (stateRef.get() == State.RESUME_GRADUALLY) { - return permitCallInResumeGraduallyState(currentTimeWindow, flowStat); + return isResumeTraffic(currentTimeWindow, flowStat); } return true; } @@ -353,13 +343,12 @@ public class CircuitBreaker { private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) { long endTimeWindow = currentTimeWindow + 1000; - // TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow); TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow); long reqCount = timeWindowStat.getCompReqs(); long errCount = timeWindowStat.getErrors(); if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) { - LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}", + LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, reject request", resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold); transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); @@ -370,7 +359,7 @@ public class CircuitBreaker { BigDecimal requests = new BigDecimal(reqCount); float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue(); if (p - errorRatioThreshold >= 0) { - LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}", + LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, reject request", resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold); transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); @@ -378,54 +367,11 @@ public class CircuitBreaker { } } - LOGGER.debug("{} current time window {} in {} which start at {}, permit current request", resource, currentTimeWindow, stateRef.get(), stateStartTime); + LOGGER.debug("{} current time window {} in {} which start at {}, permit request", resource, currentTimeWindow, stateRef.get(), stateStartTime); return true; } - private boolean permitCallInOpenState(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) { - long stateDuration = getStateDuration(currentTimeWindow); - if (stateDuration > breakDuration) { - if (resumeStrategy == ResumeStrategy.IMMEDIATE) { - LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume immediately", - currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); - transit(State.OPEN, State.CLOSED, currentTimeWindow, flowStat); - return true; - } - if (resumeStrategy == ResumeStrategy.DETECTIVE) { - LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective", - currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); - if (transit(State.OPEN, State.RESUME_DETECTIVE, currentTimeWindow, flowStat)) { - exchange.getAttributes().put(DETECT_REQUEST, this); - return true; - } - flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); - return false; - } - if (resumeStrategy == ResumeStrategy.GRADUAL) { - LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual", - currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); - transit(State.OPEN, State.RESUME_GRADUALLY, currentTimeWindow, flowStat); - return isResumeTraffic(currentTimeWindow, flowStat); - } - } - - flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); - LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime); - return false; - } - - private boolean permitCallInResumeGraduallyState(long currentTimeWindow, FlowStat flowStat) { - long stateDuration = getStateDuration(currentTimeWindow); - if (stateDuration > resumeDuration) { - LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, resume immediately", - currentTimeWindow, resource, stateDuration, stateRef.get(), resumeDuration); - transit(State.RESUME_GRADUALLY, State.CLOSED, currentTimeWindow, flowStat); - return true; - } - return isResumeTraffic(currentTimeWindow, flowStat); - } - @Override public String toString() { return JacksonUtils.writeValueAsString(this); diff --git a/fizz-core/src/test/java/com/fizzgate/stats/circuitbreaker/CircuitBreakManagerTests.java b/fizz-core/src/test/java/com/fizzgate/stats/circuitbreaker/CircuitBreakManagerTests.java index 812a562..e5b0685 100644 --- a/fizz-core/src/test/java/com/fizzgate/stats/circuitbreaker/CircuitBreakManagerTests.java +++ b/fizz-core/src/test/java/com/fizzgate/stats/circuitbreaker/CircuitBreakManagerTests.java @@ -27,6 +27,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import javax.annotation.Resource; import java.util.HashMap; +import java.util.List; import java.util.Map; @TestPropertySource("/application.properties") @@ -108,4 +109,119 @@ public class CircuitBreakManagerTests { Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get()); Assertions.assertEquals(2, timeSlot.getCircuitBreakNum()); } + + @Test + void detectiveResumeTest() throws InterruptedException { + FlowStat flowStat = new FlowStat(circuitBreakManager); + flowStat.cleanResource = false; + flowStat.createTimeSlotOnlyTraffic = false; + long currentTimeWindow = flowStat.currentTimeSlotId(); + + MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build(); + MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest); + + String service = "xservice"; + String path = "ypath"; + + CircuitBreaker cb = new CircuitBreaker(); + cb.service = service; + cb.path = path; + cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path); + cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS; + cb.monitorDuration = 5 * 1000; + cb.minRequests = 100; + cb.totalErrorThreshold = 10; + cb.breakDuration = 2 * 1000; + cb.resumeStrategy = CircuitBreaker.ResumeStrategy.DETECTIVE; + cb.stateStartTime = currentTimeWindow; + Map circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap(); + circuitBreakerMap.put(cb.resource, cb); + + ResourceStat resourceStat = flowStat.getResourceStat(cb.resource); + TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow); + timeSlot.setCompReqs(200); + timeSlot.setErrors(11); + + boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path); + Assertions.assertFalse(permit); + Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get()); + Thread.sleep(3000); + Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get()); + + currentTimeWindow = flowStat.currentTimeSlotId(); + mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build(); + mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest); + permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path); + Assertions.assertTrue(permit); + Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get()); + Assertions.assertFalse(cb.noProbe.get()); + + currentTimeWindow = flowStat.currentTimeSlotId(); + mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build(); + mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest); + permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path); + Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get()); + Assertions.assertFalse(permit); + + cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeWindow, flowStat); // mock probe request success + + currentTimeWindow = flowStat.currentTimeSlotId(); + mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build(); + mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest); + permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path); + Assertions.assertEquals(CircuitBreaker.State.CLOSED, cb.stateRef.get()); + Assertions.assertTrue(permit); + } + + @Test + void gradualResumeTest() throws InterruptedException { + FlowStat flowStat = new FlowStat(circuitBreakManager); + flowStat.cleanResource = false; + flowStat.createTimeSlotOnlyTraffic = false; + long currentTimeWindow = flowStat.currentTimeSlotId(); + + MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build(); + MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest); + + String service = "xservice"; + String path = "ypath"; + + CircuitBreaker cb = new CircuitBreaker(); + cb.service = service; + cb.path = path; + cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path); + cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS; + cb.monitorDuration = 5 * 1000; + cb.minRequests = 100; + cb.totalErrorThreshold = 10; + cb.breakDuration = 2 * 1000; + cb.resumeStrategy = CircuitBreaker.ResumeStrategy.GRADUAL; + cb.resumeDuration = 3 * 1000; + cb.stateStartTime = currentTimeWindow; + cb.initGradualResumeTimeWindowContext(); + + Map circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap(); + circuitBreakerMap.put(cb.resource, cb); + + ResourceStat resourceStat = flowStat.getResourceStat(cb.resource); + TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow); + timeSlot.setCompReqs(200); + timeSlot.setErrors(11); + + boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path); + Assertions.assertFalse(permit); + Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get()); + Thread.sleep(3000); + Assertions.assertEquals(CircuitBreaker.State.RESUME_GRADUALLY, cb.stateRef.get()); + + currentTimeWindow = flowStat.currentTimeSlotId(); + mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build(); + mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest); + permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path); + Assertions.assertTrue(permit); + for (int i = 0; i < 68; i++) { + permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path); + } + Assertions.assertFalse(permit); + } }