Merge branch 'develop' into feature/v2.7.3
This commit is contained in:
@@ -56,11 +56,8 @@ import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author Francis Dong
|
||||
@@ -160,6 +157,9 @@ public class AggregateFilter implements WebFilter {
|
||||
clientInput.put("headers", headers);
|
||||
clientInput.put("params", MapUtil.toHashMap(request.getQueryParams()));
|
||||
clientInput.put("contentType", request.getHeaders().getFirst(CommonConstants.HEADER_CONTENT_TYPE));
|
||||
Map<String, Object> pathParams = (Map<String, Object>) com.fizzgate.util.ThreadContext.get("pathParams");
|
||||
clientInput.put("pathParams", pathParams == null ? Collections.emptyMap() : pathParams);
|
||||
com.fizzgate.util.ThreadContext.remove("pathParams");
|
||||
|
||||
Mono<AggregateResult> result = null;
|
||||
MediaType contentType = request.getHeaders().getContentType();
|
||||
|
||||
@@ -17,6 +17,25 @@
|
||||
|
||||
package com.fizzgate.filter;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.WebFilterChain;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.fizzgate.config.AggregateRedisConfig;
|
||||
import com.fizzgate.plugin.auth.ApiConfig;
|
||||
@@ -33,26 +52,8 @@ import com.fizzgate.util.Consts;
|
||||
import com.fizzgate.util.NettyDataBufferUtils;
|
||||
import com.fizzgate.util.ThreadContext;
|
||||
import com.fizzgate.util.WebUtils;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.WebFilterChain;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
@@ -88,6 +89,7 @@ public class CallbackFilter extends FizzWebFilter {
|
||||
@Resource
|
||||
private GatewayGroupService gatewayGroupService;
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
|
||||
String traceId = WebUtils.getTraceId(exchange);
|
||||
@@ -228,8 +230,8 @@ public class CallbackFilter extends FizzWebFilter {
|
||||
|
||||
if (body != null) {
|
||||
b.append(Consts.S.COMMA);
|
||||
// String bodyStr = body.toString(StandardCharsets.UTF_8);
|
||||
String bodyStr = body.toString();
|
||||
// String bodyStr = body.toString(StandardCharsets.UTF_8);
|
||||
String bodyStr = body.toString();
|
||||
MediaType contentType = req.getHeaders().getContentType();
|
||||
if (contentType != null && contentType.getSubtype().equalsIgnoreCase(json)) {
|
||||
b.append(_body); b.append(JSON.toJSONString(bodyStr));
|
||||
|
||||
@@ -27,6 +27,7 @@ import com.fizzgate.fizz.input.InputType;
|
||||
import com.fizzgate.util.Consts;
|
||||
import com.fizzgate.util.ReactorUtils;
|
||||
|
||||
import com.fizzgate.util.UrlTransformUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.logging.log4j.ThreadContext;
|
||||
import org.noear.snack.ONode;
|
||||
@@ -50,6 +51,7 @@ import java.lang.ref.SoftReference;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.fizzgate.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE;
|
||||
import static com.fizzgate.util.Consts.S.FORWARD_SLASH;
|
||||
@@ -388,6 +390,47 @@ public class ConfigLoader {
|
||||
ClientInputConfig cfg = (ClientInputConfig) input.getConfig();
|
||||
return new AggregateResource(pipeline, input);
|
||||
}
|
||||
} else {
|
||||
|
||||
String aggrMethodPath = null;
|
||||
try {
|
||||
for (Map.Entry<String, String> entry : aggregateResources.entrySet()) {
|
||||
aggrMethodPath = entry.getKey();
|
||||
boolean match = UrlTransformUtils.ANT_PATH_MATCHER.match(aggrMethodPath, key);
|
||||
if (match) {
|
||||
String configStr = aggregateResources.get(aggrMethodPath);
|
||||
Input input = createInput(configStr);
|
||||
Pipeline pipeline = createPipeline(configStr);
|
||||
if (pipeline != null && input != null) {
|
||||
Map<String, String> pathVariables = UrlTransformUtils.ANT_PATH_MATCHER.extractUriTemplateVariables(aggrMethodPath, key);
|
||||
Map<String, Object> map = Collections.emptyMap();
|
||||
if (!CollectionUtils.isEmpty(pathVariables)) {
|
||||
map = pathVariables.entrySet().stream().filter(
|
||||
e -> {
|
||||
return e.getKey().indexOf('$') == -1;
|
||||
}
|
||||
)
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
e -> {
|
||||
return (Object) e.getValue();
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
com.fizzgate.util.ThreadContext.set("pathParams", map);
|
||||
return new AggregateResource(pipeline, input);
|
||||
} else {
|
||||
LOGGER.warn("request {} match {}, input {} pipeline {}", key, aggrMethodPath, input, pipeline);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOGGER.warn("request {} match {}, create input or pipeline error", key, aggrMethodPath, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -242,6 +242,7 @@ public class Pipeline {
|
||||
inputRequest.put("method", clientInput.get("method"));
|
||||
inputRequest.put("headers", clientInput.get("headers"));
|
||||
inputRequest.put("params", clientInput.get("params"));
|
||||
inputRequest.put("pathParams", clientInput.get("pathParams"));
|
||||
stepContext.addFilePartMap((Map<String, FilePart>) clientInput.get("filePartMap"));
|
||||
|
||||
if (CONTENT_TYPE_XML.equals(config.getContentType()) || (StringUtils.isEmpty(config.getContentType())
|
||||
|
||||
@@ -25,7 +25,6 @@ import com.fizzgate.stats.TimeSlot;
|
||||
import com.fizzgate.stats.TimeWindowStat;
|
||||
import com.fizzgate.util.JacksonUtils;
|
||||
import com.fizzgate.util.ResourceIdUtils;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -35,8 +34,7 @@ import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
@@ -159,6 +157,8 @@ public class CircuitBreaker {
|
||||
|
||||
public final AtomicReference<State> stateRef = new AtomicReference<>(State.CLOSED);
|
||||
|
||||
public final AtomicBoolean noProbe = new AtomicBoolean(true);
|
||||
|
||||
public long stateStartTime;
|
||||
|
||||
public CircuitBreaker() {
|
||||
@@ -262,7 +262,7 @@ public class CircuitBreaker {
|
||||
}
|
||||
|
||||
private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) {
|
||||
long nThSecond = getStateDuration(currentTimeWindow);
|
||||
long nThSecond = getStateDuration(currentTimeWindow) / 1000;
|
||||
GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond);
|
||||
ResourceStat resourceStat = flowStat.getResourceStat(resource);
|
||||
return ctx.permit(resourceStat, currentTimeWindow);
|
||||
@@ -282,9 +282,20 @@ public class CircuitBreaker {
|
||||
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
||||
|
||||
} else if (s == State.OPEN && stateDuration > breakDuration) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
||||
if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
transit(s, State.CLOSED, currentTimeWindow, flowStat);
|
||||
} else if (resumeStrategy == ResumeStrategy.DETECTIVE) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
noProbe.set(true);
|
||||
transit(s, State.RESUME_DETECTIVE, currentTimeWindow, flowStat);
|
||||
} else if (resumeStrategy == ResumeStrategy.GRADUAL) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
transit(s, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
|
||||
}
|
||||
|
||||
} else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state",
|
||||
@@ -293,36 +304,9 @@ public class CircuitBreaker {
|
||||
}
|
||||
}
|
||||
|
||||
/*public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) {
|
||||
if (stateRef.get() == State.CLOSED) {
|
||||
long endTimeWindow = currentTimeWindow + 1000;
|
||||
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
||||
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
|
||||
long reqCount = timeWindowStat.getCompReqs();
|
||||
long errCount = timeWindowStat.getErrors();
|
||||
|
||||
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
|
||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, correct to OPEN state as error",
|
||||
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||
} else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount >= minRequests) {
|
||||
BigDecimal errors = new BigDecimal(errCount);
|
||||
BigDecimal requests = new BigDecimal(reqCount);
|
||||
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
||||
if (p - errorRatioThreshold >= 0) {
|
||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, correct to OPEN state as error",
|
||||
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||
}
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) {
|
||||
if (stateRef.compareAndSet(current, target)) {
|
||||
ResourceStat resourceStat = flowStat.getResourceStat(resource);
|
||||
/*AtomicLong circuitBreakNum = resourceStat.getTimeSlot(currentTimeWindow).getCircuitBreakNum();
|
||||
circuitBreakNum.set(0);*/
|
||||
resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0);
|
||||
resourceStat.updateCircuitBreakState(currentTimeWindow, current, target);
|
||||
LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target);
|
||||
@@ -333,19 +317,25 @@ public class CircuitBreaker {
|
||||
}
|
||||
|
||||
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
|
||||
correctState(currentTimeWindow, flowStat);
|
||||
if (stateRef.get() == State.CLOSED) {
|
||||
return permitCallInClosedState(currentTimeWindow, flowStat);
|
||||
}
|
||||
if (stateRef.get() == State.OPEN) {
|
||||
return permitCallInOpenState(exchange, currentTimeWindow, flowStat);
|
||||
}
|
||||
if (stateRef.get() == State.RESUME_DETECTIVE) {
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
||||
return false;
|
||||
}
|
||||
if (stateRef.get() == State.RESUME_DETECTIVE) {
|
||||
if (noProbe.compareAndSet(true, false)) {
|
||||
exchange.getAttributes().put(DETECT_REQUEST, this);
|
||||
return true;
|
||||
} else {
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (stateRef.get() == State.RESUME_GRADUALLY) {
|
||||
return permitCallInResumeGraduallyState(currentTimeWindow, flowStat);
|
||||
return isResumeTraffic(currentTimeWindow, flowStat);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -353,13 +343,12 @@ public class CircuitBreaker {
|
||||
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
|
||||
|
||||
long endTimeWindow = currentTimeWindow + 1000;
|
||||
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
|
||||
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
|
||||
long reqCount = timeWindowStat.getCompReqs();
|
||||
long errCount = timeWindowStat.getErrors();
|
||||
|
||||
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
|
||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}",
|
||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, reject request",
|
||||
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
|
||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
@@ -370,7 +359,7 @@ public class CircuitBreaker {
|
||||
BigDecimal requests = new BigDecimal(reqCount);
|
||||
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
|
||||
if (p - errorRatioThreshold >= 0) {
|
||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}",
|
||||
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, reject request",
|
||||
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
|
||||
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
@@ -378,54 +367,11 @@ public class CircuitBreaker {
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.debug("{} current time window {} in {} which start at {}, permit current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
||||
LOGGER.debug("{} current time window {} in {} which start at {}, permit request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean permitCallInOpenState(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
|
||||
long stateDuration = getStateDuration(currentTimeWindow);
|
||||
if (stateDuration > breakDuration) {
|
||||
if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume immediately",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
transit(State.OPEN, State.CLOSED, currentTimeWindow, flowStat);
|
||||
return true;
|
||||
}
|
||||
if (resumeStrategy == ResumeStrategy.DETECTIVE) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
if (transit(State.OPEN, State.RESUME_DETECTIVE, currentTimeWindow, flowStat)) {
|
||||
exchange.getAttributes().put(DETECT_REQUEST, this);
|
||||
return true;
|
||||
}
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
return false;
|
||||
}
|
||||
if (resumeStrategy == ResumeStrategy.GRADUAL) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
|
||||
transit(State.OPEN, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
|
||||
return isResumeTraffic(currentTimeWindow, flowStat);
|
||||
}
|
||||
}
|
||||
|
||||
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
|
||||
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean permitCallInResumeGraduallyState(long currentTimeWindow, FlowStat flowStat) {
|
||||
long stateDuration = getStateDuration(currentTimeWindow);
|
||||
if (stateDuration > resumeDuration) {
|
||||
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, resume immediately",
|
||||
currentTimeWindow, resource, stateDuration, stateRef.get(), resumeDuration);
|
||||
transit(State.RESUME_GRADUALLY, State.CLOSED, currentTimeWindow, flowStat);
|
||||
return true;
|
||||
}
|
||||
return isResumeTraffic(currentTimeWindow, flowStat);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JacksonUtils.writeValueAsString(this);
|
||||
|
||||
@@ -101,7 +101,8 @@ public class CallbackServiceTests {
|
||||
)
|
||||
);
|
||||
|
||||
Mono<Void> vm = callbackService.requestBackends(exchange, headers, body, callbackConfig, Collections.EMPTY_MAP);
|
||||
// Mono<Void> vm = callbackService.requestBackends(exchange, headers, body, callbackConfig, Collections.EMPTY_MAP);
|
||||
Mono<Void> vm = callbackService.requestBackends(exchange, headers, null, callbackConfig, Collections.EMPTY_MAP);
|
||||
vm.subscribe();
|
||||
Thread.sleep(2000);
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@TestPropertySource("/application.properties")
|
||||
@@ -108,4 +109,119 @@ public class CircuitBreakManagerTests {
|
||||
Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get());
|
||||
Assertions.assertEquals(2, timeSlot.getCircuitBreakNum());
|
||||
}
|
||||
|
||||
@Test
|
||||
void detectiveResumeTest() throws InterruptedException {
|
||||
FlowStat flowStat = new FlowStat(circuitBreakManager);
|
||||
flowStat.cleanResource = false;
|
||||
flowStat.createTimeSlotOnlyTraffic = false;
|
||||
long currentTimeWindow = flowStat.currentTimeSlotId();
|
||||
|
||||
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||
|
||||
String service = "xservice";
|
||||
String path = "ypath";
|
||||
|
||||
CircuitBreaker cb = new CircuitBreaker();
|
||||
cb.service = service;
|
||||
cb.path = path;
|
||||
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
|
||||
cb.monitorDuration = 5 * 1000;
|
||||
cb.minRequests = 100;
|
||||
cb.totalErrorThreshold = 10;
|
||||
cb.breakDuration = 2 * 1000;
|
||||
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.DETECTIVE;
|
||||
cb.stateStartTime = currentTimeWindow;
|
||||
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
|
||||
circuitBreakerMap.put(cb.resource, cb);
|
||||
|
||||
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
|
||||
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
|
||||
timeSlot.setCompReqs(200);
|
||||
timeSlot.setErrors(11);
|
||||
|
||||
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
Assertions.assertFalse(permit);
|
||||
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
|
||||
Thread.sleep(3000);
|
||||
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
|
||||
|
||||
currentTimeWindow = flowStat.currentTimeSlotId();
|
||||
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
Assertions.assertTrue(permit);
|
||||
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
|
||||
Assertions.assertFalse(cb.noProbe.get());
|
||||
|
||||
currentTimeWindow = flowStat.currentTimeSlotId();
|
||||
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
|
||||
Assertions.assertFalse(permit);
|
||||
|
||||
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeWindow, flowStat); // mock probe request success
|
||||
|
||||
currentTimeWindow = flowStat.currentTimeSlotId();
|
||||
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
Assertions.assertEquals(CircuitBreaker.State.CLOSED, cb.stateRef.get());
|
||||
Assertions.assertTrue(permit);
|
||||
}
|
||||
|
||||
@Test
|
||||
void gradualResumeTest() throws InterruptedException {
|
||||
FlowStat flowStat = new FlowStat(circuitBreakManager);
|
||||
flowStat.cleanResource = false;
|
||||
flowStat.createTimeSlotOnlyTraffic = false;
|
||||
long currentTimeWindow = flowStat.currentTimeSlotId();
|
||||
|
||||
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||
|
||||
String service = "xservice";
|
||||
String path = "ypath";
|
||||
|
||||
CircuitBreaker cb = new CircuitBreaker();
|
||||
cb.service = service;
|
||||
cb.path = path;
|
||||
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
|
||||
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
|
||||
cb.monitorDuration = 5 * 1000;
|
||||
cb.minRequests = 100;
|
||||
cb.totalErrorThreshold = 10;
|
||||
cb.breakDuration = 2 * 1000;
|
||||
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.GRADUAL;
|
||||
cb.resumeDuration = 3 * 1000;
|
||||
cb.stateStartTime = currentTimeWindow;
|
||||
cb.initGradualResumeTimeWindowContext();
|
||||
|
||||
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
|
||||
circuitBreakerMap.put(cb.resource, cb);
|
||||
|
||||
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
|
||||
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
|
||||
timeSlot.setCompReqs(200);
|
||||
timeSlot.setErrors(11);
|
||||
|
||||
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
Assertions.assertFalse(permit);
|
||||
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
|
||||
Thread.sleep(3000);
|
||||
Assertions.assertEquals(CircuitBreaker.State.RESUME_GRADUALLY, cb.stateRef.get());
|
||||
|
||||
currentTimeWindow = flowStat.currentTimeSlotId();
|
||||
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
|
||||
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
|
||||
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
Assertions.assertTrue(permit);
|
||||
for (int i = 0; i < 68; i++) {
|
||||
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
|
||||
}
|
||||
Assertions.assertFalse(permit);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user