Merge branch 'develop' into feature/v3.0.0

This commit is contained in:
Francis Dong
2023-06-28 17:05:57 +08:00
2 changed files with 148 additions and 86 deletions

View File

@@ -25,7 +25,6 @@ import com.fizzgate.stats.TimeSlot;
import com.fizzgate.stats.TimeWindowStat; import com.fizzgate.stats.TimeWindowStat;
import com.fizzgate.util.JacksonUtils; import com.fizzgate.util.JacksonUtils;
import com.fizzgate.util.ResourceIdUtils; import com.fizzgate.util.ResourceIdUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -35,8 +34,7 @@ import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@@ -159,6 +157,8 @@ public class CircuitBreaker {
public final AtomicReference<State> stateRef = new AtomicReference<>(State.CLOSED); public final AtomicReference<State> stateRef = new AtomicReference<>(State.CLOSED);
public final AtomicBoolean noProbe = new AtomicBoolean(true);
public long stateStartTime; public long stateStartTime;
public CircuitBreaker() { public CircuitBreaker() {
@@ -262,7 +262,7 @@ public class CircuitBreaker {
} }
private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) { private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) {
long nThSecond = getStateDuration(currentTimeWindow); long nThSecond = getStateDuration(currentTimeWindow) / 1000;
GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond); GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond);
ResourceStat resourceStat = flowStat.getResourceStat(resource); ResourceStat resourceStat = flowStat.getResourceStat(resource);
return ctx.permit(resourceStat, currentTimeWindow); return ctx.permit(resourceStat, currentTimeWindow);
@@ -282,9 +282,20 @@ public class CircuitBreaker {
transit(s, State.CLOSED, currentTimeWindow, flowStat); transit(s, State.CLOSED, currentTimeWindow, flowStat);
} else if (s == State.OPEN && stateDuration > breakDuration) { } else if (s == State.OPEN && stateDuration > breakDuration) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state", if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
transit(s, State.CLOSED, currentTimeWindow, flowStat); currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
transit(s, State.CLOSED, currentTimeWindow, flowStat);
} else if (resumeStrategy == ResumeStrategy.DETECTIVE) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
noProbe.set(true);
transit(s, State.RESUME_DETECTIVE, currentTimeWindow, flowStat);
} else if (resumeStrategy == ResumeStrategy.GRADUAL) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
transit(s, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
}
} else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) { } else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) {
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state", LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state",
@@ -293,36 +304,9 @@ public class CircuitBreaker {
} }
} }
/*public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) {
if (stateRef.get() == State.CLOSED) {
long endTimeWindow = currentTimeWindow + 1000;
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
long reqCount = timeWindowStat.getCompReqs();
long errCount = timeWindowStat.getErrors();
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, correct to OPEN state as error",
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
} else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount >= minRequests) {
BigDecimal errors = new BigDecimal(errCount);
BigDecimal requests = new BigDecimal(reqCount);
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
if (p - errorRatioThreshold >= 0) {
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, correct to OPEN state as error",
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
}
}
}
}*/
public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) { public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) {
if (stateRef.compareAndSet(current, target)) { if (stateRef.compareAndSet(current, target)) {
ResourceStat resourceStat = flowStat.getResourceStat(resource); ResourceStat resourceStat = flowStat.getResourceStat(resource);
/*AtomicLong circuitBreakNum = resourceStat.getTimeSlot(currentTimeWindow).getCircuitBreakNum();
circuitBreakNum.set(0);*/
resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0); resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0);
resourceStat.updateCircuitBreakState(currentTimeWindow, current, target); resourceStat.updateCircuitBreakState(currentTimeWindow, current, target);
LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target); LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target);
@@ -333,19 +317,25 @@ public class CircuitBreaker {
} }
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) { public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
correctState(currentTimeWindow, flowStat);
if (stateRef.get() == State.CLOSED) { if (stateRef.get() == State.CLOSED) {
return permitCallInClosedState(currentTimeWindow, flowStat); return permitCallInClosedState(currentTimeWindow, flowStat);
} }
if (stateRef.get() == State.OPEN) { if (stateRef.get() == State.OPEN) {
return permitCallInOpenState(exchange, currentTimeWindow, flowStat);
}
if (stateRef.get() == State.RESUME_DETECTIVE) {
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
return false; return false;
} }
if (stateRef.get() == State.RESUME_DETECTIVE) {
if (noProbe.compareAndSet(true, false)) {
exchange.getAttributes().put(DETECT_REQUEST, this);
return true;
} else {
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
return false;
}
}
if (stateRef.get() == State.RESUME_GRADUALLY) { if (stateRef.get() == State.RESUME_GRADUALLY) {
return permitCallInResumeGraduallyState(currentTimeWindow, flowStat); return isResumeTraffic(currentTimeWindow, flowStat);
} }
return true; return true;
} }
@@ -353,13 +343,12 @@ public class CircuitBreaker {
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) { private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
long endTimeWindow = currentTimeWindow + 1000; long endTimeWindow = currentTimeWindow + 1000;
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow); TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
long reqCount = timeWindowStat.getCompReqs(); long reqCount = timeWindowStat.getCompReqs();
long errCount = timeWindowStat.getErrors(); long errCount = timeWindowStat.getErrors();
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) { if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}", LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, reject request",
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold); resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat); transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
@@ -370,7 +359,7 @@ public class CircuitBreaker {
BigDecimal requests = new BigDecimal(reqCount); BigDecimal requests = new BigDecimal(reqCount);
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue(); float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
if (p - errorRatioThreshold >= 0) { if (p - errorRatioThreshold >= 0) {
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}", LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, reject request",
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold); resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat); transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
@@ -378,54 +367,11 @@ public class CircuitBreaker {
} }
} }
LOGGER.debug("{} current time window {} in {} which start at {}, permit current request", resource, currentTimeWindow, stateRef.get(), stateStartTime); LOGGER.debug("{} current time window {} in {} which start at {}, permit request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
return true; return true;
} }
private boolean permitCallInOpenState(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
long stateDuration = getStateDuration(currentTimeWindow);
if (stateDuration > breakDuration) {
if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume immediately",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
transit(State.OPEN, State.CLOSED, currentTimeWindow, flowStat);
return true;
}
if (resumeStrategy == ResumeStrategy.DETECTIVE) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
if (transit(State.OPEN, State.RESUME_DETECTIVE, currentTimeWindow, flowStat)) {
exchange.getAttributes().put(DETECT_REQUEST, this);
return true;
}
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
return false;
}
if (resumeStrategy == ResumeStrategy.GRADUAL) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
transit(State.OPEN, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
return isResumeTraffic(currentTimeWindow, flowStat);
}
}
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
return false;
}
private boolean permitCallInResumeGraduallyState(long currentTimeWindow, FlowStat flowStat) {
long stateDuration = getStateDuration(currentTimeWindow);
if (stateDuration > resumeDuration) {
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, resume immediately",
currentTimeWindow, resource, stateDuration, stateRef.get(), resumeDuration);
transit(State.RESUME_GRADUALLY, State.CLOSED, currentTimeWindow, flowStat);
return true;
}
return isResumeTraffic(currentTimeWindow, flowStat);
}
@Override @Override
public String toString() { public String toString() {
return JacksonUtils.writeValueAsString(this); return JacksonUtils.writeValueAsString(this);

View File

@@ -27,6 +27,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
@TestPropertySource("/application.properties") @TestPropertySource("/application.properties")
@@ -108,4 +109,119 @@ public class CircuitBreakManagerTests {
Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get()); Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get());
Assertions.assertEquals(2, timeSlot.getCircuitBreakNum()); Assertions.assertEquals(2, timeSlot.getCircuitBreakNum());
} }
@Test
void detectiveResumeTest() throws InterruptedException {
FlowStat flowStat = new FlowStat(circuitBreakManager);
flowStat.cleanResource = false;
flowStat.createTimeSlotOnlyTraffic = false;
long currentTimeWindow = flowStat.currentTimeSlotId();
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
String service = "xservice";
String path = "ypath";
CircuitBreaker cb = new CircuitBreaker();
cb.service = service;
cb.path = path;
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
cb.monitorDuration = 5 * 1000;
cb.minRequests = 100;
cb.totalErrorThreshold = 10;
cb.breakDuration = 2 * 1000;
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.DETECTIVE;
cb.stateStartTime = currentTimeWindow;
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
circuitBreakerMap.put(cb.resource, cb);
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
timeSlot.setCompReqs(200);
timeSlot.setErrors(11);
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertFalse(permit);
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
Thread.sleep(3000);
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
currentTimeWindow = flowStat.currentTimeSlotId();
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertTrue(permit);
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
Assertions.assertFalse(cb.noProbe.get());
currentTimeWindow = flowStat.currentTimeSlotId();
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
Assertions.assertFalse(permit);
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeWindow, flowStat); // mock probe request success
currentTimeWindow = flowStat.currentTimeSlotId();
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertEquals(CircuitBreaker.State.CLOSED, cb.stateRef.get());
Assertions.assertTrue(permit);
}
@Test
void gradualResumeTest() throws InterruptedException {
FlowStat flowStat = new FlowStat(circuitBreakManager);
flowStat.cleanResource = false;
flowStat.createTimeSlotOnlyTraffic = false;
long currentTimeWindow = flowStat.currentTimeSlotId();
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
String service = "xservice";
String path = "ypath";
CircuitBreaker cb = new CircuitBreaker();
cb.service = service;
cb.path = path;
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
cb.monitorDuration = 5 * 1000;
cb.minRequests = 100;
cb.totalErrorThreshold = 10;
cb.breakDuration = 2 * 1000;
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.GRADUAL;
cb.resumeDuration = 3 * 1000;
cb.stateStartTime = currentTimeWindow;
cb.initGradualResumeTimeWindowContext();
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
circuitBreakerMap.put(cb.resource, cb);
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
timeSlot.setCompReqs(200);
timeSlot.setErrors(11);
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertFalse(permit);
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
Thread.sleep(3000);
Assertions.assertEquals(CircuitBreaker.State.RESUME_GRADUALLY, cb.stateRef.get());
currentTimeWindow = flowStat.currentTimeSlotId();
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertTrue(permit);
for (int i = 0; i < 68; i++) {
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
}
Assertions.assertFalse(permit);
}
} }