diff --git a/README.en-us.md b/README.en-us.md
index 8596284..e8d4f44 100644
--- a/README.en-us.md
+++ b/README.en-us.md
@@ -4,7 +4,7 @@ English | [简体中文](./README.md)
-
+
@@ -107,10 +107,8 @@ Starting from v1.3.0, the frontend and backend of the management backend are mer
|------------------------|---------------------------|
| v1.3.0 | v1.3.0 |
| ... | ... |
-| v2.6.6 | v2.6.6 |
-| v2.7.0 | v2.7.0 |
-| v2.7.1 | v2.7.1 |
| v2.7.2 | v2.7.2 |
+| v2.7.3 | v2.7.3 |
Please download the corresponding management backend version according to the version of the community version
diff --git a/README.md b/README.md
index 92459f3..6f4a840 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
-
+
@@ -115,33 +115,9 @@ API地址:https://demo.fizzgate.com/proxy/[服务名]/[API_Path]
| fizz-gateway-node | fizz-manager-professional |
|------------------------|---------------------------|
| v1.3.0 | v1.3.0 |
-| v1.4.0 | v1.4.0 |
-| v1.4.1 | v1.4.1 |
-| v1.5.0 | v1.5.0 |
-| v1.5.1 | v1.5.1 |
-| v2.0.0 | v2.0.0 |
-| v2.1.0 | v2.1.0 |
-| v2.2.0 | v2.2.0 |
-| v2.2.1 | v2.2.1 |
-| v2.2.3 | v2.2.3 |
-| v2.3.0 | v2.3.0 |
-| v2.3.2 | v2.3.2 |
-| v2.3.3 | v2.3.3 |
-| v2.4.0 | v2.4.0 |
-| v2.4.1 | v2.4.1 |
-| v2.5.0 | v2.5.0 |
-| v2.5.1 | v2.5.1 |
-| v2.5.2 | v2.5.2 |
-| v2.6.0 | v2.6.0 |
-| v2.6.1 | v2.6.1 |
-| v2.6.2 | v2.6.2 |
-| v2.6.3 | v2.6.3 |
-| v2.6.4 | v2.6.4 |
-| v2.6.5 | v2.6.5 |
-| v2.6.6 | v2.6.6 |
-| v2.7.0 | v2.7.0 |
-| v2.7.1 | v2.7.1 |
+| ... | ... |
| v2.7.2 | v2.7.2 |
+| v2.7.3 | v2.7.3 |
请根据节点端的版本下载对应的管理后台版本
diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml
index 7e46f76..45d2dcd 100644
--- a/fizz-bootstrap/pom.xml
+++ b/fizz-bootstrap/pom.xml
@@ -6,7 +6,7 @@
fizz-gateway-community
com.fizzgate
- 2.7.2
+ 2.7.3
../pom.xml
@@ -14,11 +14,11 @@
diff --git a/fizz-bootstrap/src/main/resources/application.yml b/fizz-bootstrap/src/main/resources/application.yml
index 9bd105e..28f67c1 100644
--- a/fizz-bootstrap/src/main/resources/application.yml
+++ b/fizz-bootstrap/src/main/resources/application.yml
@@ -93,7 +93,7 @@ flow-stat-sched:
queue: fizz_resource_access_stat
gateway:
- prefix: /proxy
+ prefix: /
aggr:
# set headers when calling the backend API
proxy_set_headers: X-Real-IP,X-Forwarded-Proto,X-Forwarded-For
diff --git a/fizz-common/pom.xml b/fizz-common/pom.xml
index b109203..84c2ad7 100644
--- a/fizz-common/pom.xml
+++ b/fizz-common/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.7.2
+ 2.7.3
../pom.xml
4.0.0
diff --git a/fizz-core/pom.xml b/fizz-core/pom.xml
index d35ef94..5e8d49b 100644
--- a/fizz-core/pom.xml
+++ b/fizz-core/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.7.2
+ 2.7.3
../pom.xml
4.0.0
diff --git a/fizz-core/src/main/java/com/fizzgate/filter/AggregateFilter.java b/fizz-core/src/main/java/com/fizzgate/filter/AggregateFilter.java
index 96a73fc..9120834 100644
--- a/fizz-core/src/main/java/com/fizzgate/filter/AggregateFilter.java
+++ b/fizz-core/src/main/java/com/fizzgate/filter/AggregateFilter.java
@@ -56,11 +56,8 @@ import reactor.core.scheduler.Schedulers;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
/**
* @author Francis Dong
@@ -160,6 +157,9 @@ public class AggregateFilter implements WebFilter {
clientInput.put("headers", headers);
clientInput.put("params", MapUtil.toHashMap(request.getQueryParams()));
clientInput.put("contentType", request.getHeaders().getFirst(CommonConstants.HEADER_CONTENT_TYPE));
+ Map pathParams = (Map) com.fizzgate.util.ThreadContext.get("pathParams");
+ clientInput.put("pathParams", pathParams == null ? Collections.emptyMap() : pathParams);
+ com.fizzgate.util.ThreadContext.remove("pathParams");
Mono result = null;
MediaType contentType = request.getHeaders().getContentType();
diff --git a/fizz-core/src/main/java/com/fizzgate/filter/CallbackFilter.java b/fizz-core/src/main/java/com/fizzgate/filter/CallbackFilter.java
index 212f9ed..206e81d 100644
--- a/fizz-core/src/main/java/com/fizzgate/filter/CallbackFilter.java
+++ b/fizz-core/src/main/java/com/fizzgate/filter/CallbackFilter.java
@@ -17,6 +17,25 @@
package com.fizzgate.filter;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.annotation.Resource;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.stereotype.Component;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilterChain;
+
import com.alibaba.fastjson.JSON;
import com.fizzgate.config.AggregateRedisConfig;
import com.fizzgate.plugin.auth.ApiConfig;
@@ -27,33 +46,14 @@ import com.fizzgate.proxy.CallbackService;
import com.fizzgate.proxy.DiscoveryClientUriSelector;
import com.fizzgate.proxy.ServiceInstance;
import com.fizzgate.service_registry.RegistryCenterService;
+import com.fizzgate.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
+import com.fizzgate.spring.web.server.ext.FizzServerWebExchangeDecorator;
import com.fizzgate.util.Consts;
import com.fizzgate.util.NettyDataBufferUtils;
import com.fizzgate.util.ThreadContext;
import com.fizzgate.util.WebUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.core.annotation.Order;
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferUtils;
-import org.springframework.core.io.buffer.PooledDataBuffer;
-import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.stereotype.Component;
-import org.springframework.web.server.ServerWebExchange;
-import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
-import javax.annotation.Resource;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-
/**
* @author hongqiaowei
*/
@@ -89,34 +89,49 @@ public class CallbackFilter extends FizzWebFilter {
@Resource
private GatewayGroupService gatewayGroupService;
+
@Override
- public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) {
+ public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {
+ String traceId = WebUtils.getTraceId(exchange);
+ org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
- FilterResult pfr = WebUtils.getPrevFilterResult(exchange);
- if (!pfr.success) {
- return WebUtils.getDirectResponse(exchange);
+ ServerHttpRequest req = exchange.getRequest();
+ if (req instanceof FizzServerHttpRequestDecorator) {
+ return doFilter(exchange, chain);
}
-
+ return
+ NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
+ .flatMap(
+ body -> {
+ FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req);
+ if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
+ try {
+ requestDecorator.setBody(body);
+ } finally {
+ NettyDataBufferUtils.release(body);
+ }
+ }
+ ServerWebExchange mutatedExchange = exchange.mutate().request(requestDecorator).build();
+ ServerWebExchange newExchange = mutatedExchange;
+ MediaType contentType = req.getHeaders().getContentType();
+ if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
+ newExchange = new FizzServerWebExchangeDecorator(mutatedExchange);
+ }
+ return doFilter(newExchange, chain);
+ }
+ );
+ }
+
+ public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) {
+ String traceId = WebUtils.getTraceId(exchange);
+ org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
+
ApiConfig ac = WebUtils.getApiConfig(exchange);
if (ac != null && ac.type == ApiConfig.Type.CALLBACK) {
CallbackConfig cc = ac.callbackConfig;
- ServerHttpRequest req = exchange.getRequest();
- return
- DataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
- .flatMap(
- b -> {
- DataBuffer body = null;
- if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
- if (b instanceof PooledDataBuffer) {
- try {
- body = NettyDataBufferUtils.copy2heap(b);
- } finally {
- NettyDataBufferUtils.release(b);
- }
- } else {
- body = b;
- }
- }
+ FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest();
+ return req.getBody().defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER).single().flatMap(b -> {
+ String body = b.toString(StandardCharsets.UTF_8);
HashMap service2instMap = getService2instMap(ac);
HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next());
@@ -175,7 +190,7 @@ public class CallbackFilter extends FizzWebFilter {
private static final String _receivers = "\"receivers\":";
private static final String _gatewayGroup = "\"gatewayGroup\":";
- private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, HashMap service2instMap, int callbackConfigId,
+ private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, Object body, HashMap service2instMap, int callbackConfigId,
String gatewayGroup) {
ServerHttpRequest req = exchange.getRequest();
@@ -215,7 +230,8 @@ public class CallbackFilter extends FizzWebFilter {
if (body != null) {
b.append(Consts.S.COMMA);
- String bodyStr = body.toString(StandardCharsets.UTF_8);
+// String bodyStr = body.toString(StandardCharsets.UTF_8);
+ String bodyStr = body.toString();
MediaType contentType = req.getHeaders().getContentType();
if (contentType != null && contentType.getSubtype().equalsIgnoreCase(json)) {
b.append(_body); b.append(JSON.toJSONString(bodyStr));
diff --git a/fizz-core/src/main/java/com/fizzgate/filter/FlowControlFilter.java b/fizz-core/src/main/java/com/fizzgate/filter/FlowControlFilter.java
index 8eb7bd3..e68843a 100644
--- a/fizz-core/src/main/java/com/fizzgate/filter/FlowControlFilter.java
+++ b/fizz-core/src/main/java/com/fizzgate/filter/FlowControlFilter.java
@@ -360,7 +360,7 @@ public class FlowControlFilter extends FizzWebFilter {
if (hasHost) {
// String resourceId = ResourceIdUtils.buildResourceId(app, ip, node, service, path);
String resourceId = ResourceIdUtils.buildResourceId(null, null, node, null, null);
- ResourceConfig resourceConfig = new ResourceConfig(resourceId, 0, 0);
+ ResourceConfig resourceConfig = new ResourceConfig(resourceId, -1L, -1L);
resourceConfigs.add(resourceConfig);
}
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, ResourceIdUtils.NODE, null, null, null);
@@ -407,11 +407,11 @@ public class FlowControlFilter extends FizzWebFilter {
} else {
String node = ResourceIdUtils.getNode(resource);
if (node != null && node.equals(ResourceIdUtils.NODE)) {
- rc = new ResourceConfig(resource, 0, 0);
+ rc = new ResourceConfig(resource, -1L, -1L);
}
if (defaultRateLimitConfigId != null) {
if (defaultRateLimitConfigId.equals(ResourceIdUtils.SERVICE_DEFAULT)) {
- rc = new ResourceConfig(resource, 0, 0);
+ rc = new ResourceConfig(resource, -1L, -1L);
rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
if (rateLimitConfig != null && rateLimitConfig.isEnable()) {
rc.setMaxCon(rateLimitConfig.concurrents);
@@ -441,7 +441,7 @@ public class FlowControlFilter extends FizzWebFilter {
}
}*/
if (cb != null) {
- rc = new ResourceConfig(resource, 0, 0);
+ rc = new ResourceConfig(resource, -1L, -1L);
resourceConfigs.add(rc);
}
}
@@ -508,7 +508,7 @@ public class FlowControlFilter extends FizzWebFilter {
private void something4(List resourceConfigs, String app, String ip, String service) {
String r = ResourceIdUtils.buildResourceId(app, ip, null, service, null);
- ResourceConfig rc = new ResourceConfig(r, 0, 0);
+ ResourceConfig rc = new ResourceConfig(r, -1L, -1L);
resourceConfigs.add(rc);
}
diff --git a/fizz-core/src/main/java/com/fizzgate/fizz/ConfigLoader.java b/fizz-core/src/main/java/com/fizzgate/fizz/ConfigLoader.java
index 54565a1..7cf7d1a 100644
--- a/fizz-core/src/main/java/com/fizzgate/fizz/ConfigLoader.java
+++ b/fizz-core/src/main/java/com/fizzgate/fizz/ConfigLoader.java
@@ -27,6 +27,7 @@ import com.fizzgate.fizz.input.InputType;
import com.fizzgate.util.Consts;
import com.fizzgate.util.ReactorUtils;
+import com.fizzgate.util.UrlTransformUtils;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
@@ -50,6 +51,7 @@ import java.lang.ref.SoftReference;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import static com.fizzgate.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE;
import static com.fizzgate.util.Consts.S.FORWARD_SLASH;
@@ -388,6 +390,47 @@ public class ConfigLoader {
ClientInputConfig cfg = (ClientInputConfig) input.getConfig();
return new AggregateResource(pipeline, input);
}
+ } else {
+
+ String aggrMethodPath = null;
+ try {
+ for (Map.Entry entry : aggregateResources.entrySet()) {
+ aggrMethodPath = entry.getKey();
+ boolean match = UrlTransformUtils.ANT_PATH_MATCHER.match(aggrMethodPath, key);
+ if (match) {
+ String configStr = aggregateResources.get(aggrMethodPath);
+ Input input = createInput(configStr);
+ Pipeline pipeline = createPipeline(configStr);
+ if (pipeline != null && input != null) {
+ Map pathVariables = UrlTransformUtils.ANT_PATH_MATCHER.extractUriTemplateVariables(aggrMethodPath, key);
+ Map map = Collections.emptyMap();
+ if (!CollectionUtils.isEmpty(pathVariables)) {
+ map = pathVariables.entrySet().stream().filter(
+ e -> {
+ return e.getKey().indexOf('$') == -1;
+ }
+ )
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ e -> {
+ return (Object) e.getValue();
+ }
+ )
+ );
+ }
+ com.fizzgate.util.ThreadContext.set("pathParams", map);
+ return new AggregateResource(pipeline, input);
+ } else {
+ LOGGER.warn("request {} match {}, input {} pipeline {}", key, aggrMethodPath, input, pipeline);
+ return null;
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.warn("request {} match {}, create input or pipeline error", key, aggrMethodPath, e);
+ return null;
+ }
}
return null;
}
diff --git a/fizz-core/src/main/java/com/fizzgate/fizz/Pipeline.java b/fizz-core/src/main/java/com/fizzgate/fizz/Pipeline.java
index db5e4c9..b7bf643 100644
--- a/fizz-core/src/main/java/com/fizzgate/fizz/Pipeline.java
+++ b/fizz-core/src/main/java/com/fizzgate/fizz/Pipeline.java
@@ -242,6 +242,7 @@ public class Pipeline {
inputRequest.put("method", clientInput.get("method"));
inputRequest.put("headers", clientInput.get("headers"));
inputRequest.put("params", clientInput.get("params"));
+ inputRequest.put("pathParams", clientInput.get("pathParams"));
stepContext.addFilePartMap((Map) clientInput.get("filePartMap"));
if (CONTENT_TYPE_XML.equals(config.getContentType()) || (StringUtils.isEmpty(config.getContentType())
diff --git a/fizz-core/src/main/java/com/fizzgate/fizz/function/CommonFunc.java b/fizz-core/src/main/java/com/fizzgate/fizz/function/CommonFunc.java
index 9b59ffc..4ce0827 100644
--- a/fizz-core/src/main/java/com/fizzgate/fizz/function/CommonFunc.java
+++ b/fizz-core/src/main/java/com/fizzgate/fizz/function/CommonFunc.java
@@ -18,7 +18,10 @@
package com.fizzgate.fizz.function;
import java.lang.reflect.Array;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -65,6 +68,8 @@ public class CommonFunc implements IFunc {
FuncExecutor.register(NAME_SPACE_PREFIX + "common.and", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "common.or", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "common.not", this);
+ FuncExecutor.register(NAME_SPACE_PREFIX + "common.emptyMap", this);
+ FuncExecutor.register(NAME_SPACE_PREFIX + "common.emptyList", this);
}
/**
@@ -174,4 +179,22 @@ public class CommonFunc implements IFunc {
return !(obj == null ? false : obj);
}
+ /**
+ * Return an empty map
+ *
+ * @return
+ */
+ public Map emptyMap() {
+ return new HashMap<>();
+ }
+
+ /**
+ * Return an empty list
+ *
+ * @return
+ */
+ public List> emptyList() {
+ return new ArrayList<>();
+ }
+
}
diff --git a/fizz-core/src/main/java/com/fizzgate/fizz/function/FuncExecutor.java b/fizz-core/src/main/java/com/fizzgate/fizz/function/FuncExecutor.java
index 4292196..e2e5fab 100644
--- a/fizz-core/src/main/java/com/fizzgate/fizz/function/FuncExecutor.java
+++ b/fizz-core/src/main/java/com/fizzgate/fizz/function/FuncExecutor.java
@@ -312,6 +312,26 @@ public class FuncExecutor {
argsStrContainer = this.trimArgStr(argsStrContainer, 5, isVarArgs, paramTypes.length, funcExpression);
argsStr = argsStrContainer.getArgsStr();
i = argsStrContainer.getIndex();
+ } else if (argsStr.matches("^\\[\\]\\s*,.*") || argsStr.matches("^\\[\\]\\s*\\).*")) { // []
+ if (isVarArgs && i == paramTypes.length - 1) {
+ varArgs.add(new ArrayList());
+ args[i] = varArgs.toArray(new ArrayList[varArgs.size()]);
+ } else {
+ args[i] = new ArrayList();
+ }
+ argsStrContainer = this.trimArgStr(argsStrContainer, 2, isVarArgs, paramTypes.length, funcExpression);
+ argsStr = argsStrContainer.getArgsStr();
+ i = argsStrContainer.getIndex();
+ } else if (argsStr.matches("^\\{\\}\\s*,.*") || argsStr.matches("^\\{\\}\\s*\\).*")) { // {}
+ if (isVarArgs && i == paramTypes.length - 1) {
+ varArgs.add(new HashMap());
+ args[i] = varArgs.toArray(new HashMap[varArgs.size()]);
+ } else {
+ args[i] = new HashMap();
+ }
+ argsStrContainer = this.trimArgStr(argsStrContainer, 2, isVarArgs, paramTypes.length, funcExpression);
+ argsStr = argsStrContainer.getArgsStr();
+ i = argsStrContainer.getIndex();
} else if (argsStr.startsWith("{")) { // reference value
int pos = argsStr.indexOf("}", 1);
if (pos != -1) {
diff --git a/fizz-core/src/main/java/com/fizzgate/fizz/function/ListFunc.java b/fizz-core/src/main/java/com/fizzgate/fizz/function/ListFunc.java
index 7c41a49..2ceeb67 100644
--- a/fizz-core/src/main/java/com/fizzgate/fizz/function/ListFunc.java
+++ b/fizz-core/src/main/java/com/fizzgate/fizz/function/ListFunc.java
@@ -62,6 +62,7 @@ public class ListFunc implements IFunc {
FuncExecutor.register(NAME_SPACE_PREFIX + "list.join", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.rename", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.removeFields", this);
+ FuncExecutor.register(NAME_SPACE_PREFIX + "list.emptyList", this);
}
/**
@@ -227,5 +228,14 @@ public class ListFunc implements IFunc {
}
return data;
}
+
+ /**
+ * Return an empty list
+ *
+ * @return
+ */
+ public List> emptyList() {
+ return new ArrayList<>();
+ }
}
diff --git a/fizz-core/src/main/java/com/fizzgate/proxy/CallbackService.java b/fizz-core/src/main/java/com/fizzgate/proxy/CallbackService.java
index dd14621..5440a98 100644
--- a/fizz-core/src/main/java/com/fizzgate/proxy/CallbackService.java
+++ b/fizz-core/src/main/java/com/fizzgate/proxy/CallbackService.java
@@ -19,7 +19,6 @@ package com.fizzgate.proxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -81,7 +80,7 @@ public class CallbackService {
aggrConfigPrefix = systemConfig.getGatewayPrefix() + '/';
}
- public Mono requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, Map service2instMap) {
+ public Mono requestBackends(ServerWebExchange exchange, HttpHeaders headers, String body, CallbackConfig cc, Map service2instMap) {
ServerHttpRequest req = exchange.getRequest();
String traceId = WebUtils.getTraceId(exchange);
HttpMethod method = req.getMethod();
@@ -140,21 +139,21 @@ public class CallbackService {
;
}
- private Function> crError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body) {
+ private Function> crError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body) {
return t -> {
log(exchange, r, method, headers, body, t);
return Mono.just(new FizzFailClientResponse(t));
};
}
- private Function> arError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body) {
+ private Function> arError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body) {
return t -> {
log(exchange, r, method, headers, body, t);
return Mono.just(new FailAggregateResult(t));
};
}
- private void log(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body, Throwable t) {
+ private void log(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body, Throwable t) {
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR);
diff --git a/fizz-core/src/main/java/com/fizzgate/stats/FlowStat.java b/fizz-core/src/main/java/com/fizzgate/stats/FlowStat.java
index 919177a..b87c81d 100644
--- a/fizz-core/src/main/java/com/fizzgate/stats/FlowStat.java
+++ b/fizz-core/src/main/java/com/fizzgate/stats/FlowStat.java
@@ -161,10 +161,10 @@ public class FlowStat {
for (ResourceConfig resourceConfig : resourceConfigs) {
long maxCon = resourceConfig.getMaxCon();
long maxQPS = resourceConfig.getMaxQPS();
- if (maxCon > 0 || maxQPS > 0) {
+ if (maxCon >= 0 || maxQPS >= 0) {
ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
// check concurrent request
- if (maxCon > 0) {
+ if (maxCon >= 0) {
long n = resourceStat.getConcurrentRequests().get();
if (n >= maxCon) {
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
@@ -184,7 +184,7 @@ public class FlowStat {
}
// check QPS
- if (maxQPS > 0) {
+ if (maxQPS >= 0) {
long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter();
if (total >= maxQPS) {
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
diff --git a/fizz-core/src/main/java/com/fizzgate/stats/ResourceConfig.java b/fizz-core/src/main/java/com/fizzgate/stats/ResourceConfig.java
index 8c2b22f..651a185 100644
--- a/fizz-core/src/main/java/com/fizzgate/stats/ResourceConfig.java
+++ b/fizz-core/src/main/java/com/fizzgate/stats/ResourceConfig.java
@@ -46,14 +46,14 @@ public class ResourceConfig {
// Flow control rule
//---------------------------------------------------------------------
/**
- * Maximum concurrent request, zero or negative for no limit
+ * Maximum concurrent request, negative for no limit
*/
- private long maxCon;
+ private long maxCon = -1L;
/**
- * Maximum QPS, zero or negative for no limit
+ * Maximum QPS, negative for no limit
*/
- private long maxQPS;
+ private long maxQPS = -1L;
//---------------------------------------------------------------------
diff --git a/fizz-core/src/main/java/com/fizzgate/stats/circuitbreaker/CircuitBreaker.java b/fizz-core/src/main/java/com/fizzgate/stats/circuitbreaker/CircuitBreaker.java
index 839a8a2..4416b96 100644
--- a/fizz-core/src/main/java/com/fizzgate/stats/circuitbreaker/CircuitBreaker.java
+++ b/fizz-core/src/main/java/com/fizzgate/stats/circuitbreaker/CircuitBreaker.java
@@ -25,7 +25,6 @@ import com.fizzgate.stats.TimeSlot;
import com.fizzgate.stats.TimeWindowStat;
import com.fizzgate.util.JacksonUtils;
import com.fizzgate.util.ResourceIdUtils;
-
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,8 +34,7 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -159,6 +157,8 @@ public class CircuitBreaker {
public final AtomicReference stateRef = new AtomicReference<>(State.CLOSED);
+ public final AtomicBoolean noProbe = new AtomicBoolean(true);
+
public long stateStartTime;
public CircuitBreaker() {
@@ -262,7 +262,7 @@ public class CircuitBreaker {
}
private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) {
- long nThSecond = getStateDuration(currentTimeWindow);
+ long nThSecond = getStateDuration(currentTimeWindow) / 1000;
GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond);
ResourceStat resourceStat = flowStat.getResourceStat(resource);
return ctx.permit(resourceStat, currentTimeWindow);
@@ -282,9 +282,20 @@ public class CircuitBreaker {
transit(s, State.CLOSED, currentTimeWindow, flowStat);
} else if (s == State.OPEN && stateDuration > breakDuration) {
- LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
- currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
- transit(s, State.CLOSED, currentTimeWindow, flowStat);
+ if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
+ LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
+ currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
+ transit(s, State.CLOSED, currentTimeWindow, flowStat);
+ } else if (resumeStrategy == ResumeStrategy.DETECTIVE) {
+ LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
+ currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
+ noProbe.set(true);
+ transit(s, State.RESUME_DETECTIVE, currentTimeWindow, flowStat);
+ } else if (resumeStrategy == ResumeStrategy.GRADUAL) {
+ LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
+ currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
+ transit(s, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
+ }
} else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) {
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state",
@@ -293,36 +304,9 @@ public class CircuitBreaker {
}
}
- /*public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) {
- if (stateRef.get() == State.CLOSED) {
- long endTimeWindow = currentTimeWindow + 1000;
- // TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
- TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
- long reqCount = timeWindowStat.getCompReqs();
- long errCount = timeWindowStat.getErrors();
-
- if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
- LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, correct to OPEN state as error",
- resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
- transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
- } else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount >= minRequests) {
- BigDecimal errors = new BigDecimal(errCount);
- BigDecimal requests = new BigDecimal(reqCount);
- float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
- if (p - errorRatioThreshold >= 0) {
- LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, correct to OPEN state as error",
- resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
- transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
- }
- }
- }
- }*/
-
public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) {
if (stateRef.compareAndSet(current, target)) {
ResourceStat resourceStat = flowStat.getResourceStat(resource);
- /*AtomicLong circuitBreakNum = resourceStat.getTimeSlot(currentTimeWindow).getCircuitBreakNum();
- circuitBreakNum.set(0);*/
resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0);
resourceStat.updateCircuitBreakState(currentTimeWindow, current, target);
LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target);
@@ -333,19 +317,25 @@ public class CircuitBreaker {
}
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
- correctState(currentTimeWindow, flowStat);
if (stateRef.get() == State.CLOSED) {
return permitCallInClosedState(currentTimeWindow, flowStat);
}
if (stateRef.get() == State.OPEN) {
- return permitCallInOpenState(exchange, currentTimeWindow, flowStat);
- }
- if (stateRef.get() == State.RESUME_DETECTIVE) {
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
+ LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
return false;
}
+ if (stateRef.get() == State.RESUME_DETECTIVE) {
+ if (noProbe.compareAndSet(true, false)) {
+ exchange.getAttributes().put(DETECT_REQUEST, this);
+ return true;
+ } else {
+ flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
+ return false;
+ }
+ }
if (stateRef.get() == State.RESUME_GRADUALLY) {
- return permitCallInResumeGraduallyState(currentTimeWindow, flowStat);
+ return isResumeTraffic(currentTimeWindow, flowStat);
}
return true;
}
@@ -353,13 +343,12 @@ public class CircuitBreaker {
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
long endTimeWindow = currentTimeWindow + 1000;
- // TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
long reqCount = timeWindowStat.getCompReqs();
long errCount = timeWindowStat.getErrors();
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
- LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}",
+ LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, reject request",
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
@@ -370,7 +359,7 @@ public class CircuitBreaker {
BigDecimal requests = new BigDecimal(reqCount);
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
if (p - errorRatioThreshold >= 0) {
- LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}",
+ LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, reject request",
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
@@ -378,54 +367,11 @@ public class CircuitBreaker {
}
}
- LOGGER.debug("{} current time window {} in {} which start at {}, permit current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
+ LOGGER.debug("{} current time window {} in {} which start at {}, permit request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
return true;
}
- private boolean permitCallInOpenState(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
- long stateDuration = getStateDuration(currentTimeWindow);
- if (stateDuration > breakDuration) {
- if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
- LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume immediately",
- currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
- transit(State.OPEN, State.CLOSED, currentTimeWindow, flowStat);
- return true;
- }
- if (resumeStrategy == ResumeStrategy.DETECTIVE) {
- LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
- currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
- if (transit(State.OPEN, State.RESUME_DETECTIVE, currentTimeWindow, flowStat)) {
- exchange.getAttributes().put(DETECT_REQUEST, this);
- return true;
- }
- flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
- return false;
- }
- if (resumeStrategy == ResumeStrategy.GRADUAL) {
- LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
- currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
- transit(State.OPEN, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
- return isResumeTraffic(currentTimeWindow, flowStat);
- }
- }
-
- flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
- LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
- return false;
- }
-
- private boolean permitCallInResumeGraduallyState(long currentTimeWindow, FlowStat flowStat) {
- long stateDuration = getStateDuration(currentTimeWindow);
- if (stateDuration > resumeDuration) {
- LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, resume immediately",
- currentTimeWindow, resource, stateDuration, stateRef.get(), resumeDuration);
- transit(State.RESUME_GRADUALLY, State.CLOSED, currentTimeWindow, flowStat);
- return true;
- }
- return isResumeTraffic(currentTimeWindow, flowStat);
- }
-
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
diff --git a/fizz-core/src/main/java/com/fizzgate/stats/ratelimit/ResourceRateLimitConfig.java b/fizz-core/src/main/java/com/fizzgate/stats/ratelimit/ResourceRateLimitConfig.java
index 9eaf39b..24453e1 100644
--- a/fizz-core/src/main/java/com/fizzgate/stats/ratelimit/ResourceRateLimitConfig.java
+++ b/fizz-core/src/main/java/com/fizzgate/stats/ratelimit/ResourceRateLimitConfig.java
@@ -63,9 +63,9 @@ public class ResourceRateLimitConfig {
public byte type;
- public long qps;
+ public long qps = -1L;
- public long concurrents;
+ public long concurrents = -1L;
public String responseType;
diff --git a/fizz-core/src/test/java/com/fizzgate/fizz/function/CommonFuncTests.java b/fizz-core/src/test/java/com/fizzgate/fizz/function/CommonFuncTests.java
index a195e6a..339fc35 100644
--- a/fizz-core/src/test/java/com/fizzgate/fizz/function/CommonFuncTests.java
+++ b/fizz-core/src/test/java/com/fizzgate/fizz/function/CommonFuncTests.java
@@ -16,6 +16,7 @@
*/
package com.fizzgate.fizz.function;
+import static org.junit.Assert.assertNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList;
@@ -116,6 +117,46 @@ class CommonFuncTests {
assertEquals(true, result);
}
+ @Test
+ void testIif6() {
+ // test []
+ String funcExpression = "fn.common.iif(false, \"abc\", [])";
+ List result = (List)FuncExecutor.getInstance().exec(null, funcExpression);
+ assertEquals(true, result.size() == 0);
+ }
+
+ @Test
+ void testIif7() {
+ // test {}
+ String funcExpression = "fn.common.iif(false, \"abc\", {})";
+ Map result = (Map)FuncExecutor.getInstance().exec(null, funcExpression);
+ assertEquals(true, result.size() == 0);
+ }
+
+ @Test
+ void testIif8() {
+ // test []
+ String funcExpression = "fn.common.iif(false, \"abc\", fn.common.emptyList())";
+ List result = (List)FuncExecutor.getInstance().exec(null, funcExpression);
+ assertEquals(true, result.size() == 0);
+ }
+
+ @Test
+ void testIif9() {
+ // test {}
+ String funcExpression = "fn.common.iif(false, \"abc\", fn.common.emptyMap())";
+ Map result = (Map)FuncExecutor.getInstance().exec(null, funcExpression);
+ assertEquals(true, result.size() == 0);
+ }
+
+ @Test
+ void testIif10() {
+ // test null
+ String funcExpression = "fn.common.iif(false, \"abc\", null)";
+ Map result = (Map)FuncExecutor.getInstance().exec(null, funcExpression);
+ assertNull(result);
+ }
+
@Test
void testEquals() {
String funcExpression = "fn.common.equals(\"abc\", true)";
diff --git a/fizz-core/src/test/java/com/fizzgate/fizz/function/ListFuncTests.java b/fizz-core/src/test/java/com/fizzgate/fizz/function/ListFuncTests.java
index 81957fa..c2be563 100644
--- a/fizz-core/src/test/java/com/fizzgate/fizz/function/ListFuncTests.java
+++ b/fizz-core/src/test/java/com/fizzgate/fizz/function/ListFuncTests.java
@@ -106,6 +106,29 @@ class ListFuncTests {
assertEquals("a2", ((Map) result.get(1)).get("a").toString());
assertEquals("a4", ((Map) result.get(3)).get("a").toString());
}
+
+ @Test
+ void testMerge2() {
+ List subList1 = new ArrayList<>();
+ subList1.add(createRecord("a", "a1"));
+ subList1.add(createRecord("a", "a2"));
+ subList1.add(createRecord("a", "a3"));
+
+ List subList2 = new ArrayList<>();
+ subList2.add(createRecord("a", "a4"));
+ subList2.add(createRecord("a", "a5"));
+ subList2.add(createRecord("a", "a6"));
+
+ ONode ctxNode = ONode.load(new HashMap());
+ PathMapping.setByPath(ctxNode, "test.data1", subList1, true);
+ PathMapping.setByPath(ctxNode, "test.data2", subList2, true);
+
+ String funcExpression = "fn.list.merge({test.data1}, [] , {test.data2})";
+ List result = (List) FuncExecutor.getInstance().exec(ctxNode, funcExpression);
+ assertEquals(6, result.size());
+ assertEquals("a2", ((Map) result.get(1)).get("a").toString());
+ assertEquals("a4", ((Map) result.get(3)).get("a").toString());
+ }
@Test
void testExtract() {
diff --git a/fizz-core/src/test/java/com/fizzgate/proxy/CallbackServiceTests.java b/fizz-core/src/test/java/com/fizzgate/proxy/CallbackServiceTests.java
index b0e16e4..a6e7e04 100644
--- a/fizz-core/src/test/java/com/fizzgate/proxy/CallbackServiceTests.java
+++ b/fizz-core/src/test/java/com/fizzgate/proxy/CallbackServiceTests.java
@@ -23,6 +23,7 @@ import com.fizzgate.util.ReflectionUtils;
import reactor.core.publisher.Mono;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@@ -62,7 +63,7 @@ public class CallbackServiceTests {
HttpHeaders headers = new HttpHeaders();
headers.add("h1", "v1");
- DataBuffer body = null;
+ String body = null;
CallbackConfig callbackConfig = new CallbackConfig();
callbackConfig.receivers = new ArrayList<>();
@@ -100,7 +101,8 @@ public class CallbackServiceTests {
)
);
- Mono vm = callbackService.requestBackends(exchange, headers, body, callbackConfig, Collections.EMPTY_MAP);
+ // Mono vm = callbackService.requestBackends(exchange, headers, body, callbackConfig, Collections.EMPTY_MAP);
+ Mono vm = callbackService.requestBackends(exchange, headers, null, callbackConfig, Collections.EMPTY_MAP);
vm.subscribe();
Thread.sleep(2000);
diff --git a/fizz-core/src/test/java/com/fizzgate/stats/FlowStatTests.java b/fizz-core/src/test/java/com/fizzgate/stats/FlowStatTests.java
index 0b797cc..95ac803 100644
--- a/fizz-core/src/test/java/com/fizzgate/stats/FlowStatTests.java
+++ b/fizz-core/src/test/java/com/fizzgate/stats/FlowStatTests.java
@@ -115,8 +115,8 @@ public class FlowStatTests {
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c5 = new FlowRuleCase();
- c5.resourceConfigs.add(new ResourceConfig("_global5", 0, 0));
- c5.resourceConfigs.add(new ResourceConfig("service5", 0, 0));
+ c5.resourceConfigs.add(new ResourceConfig("_global5", -1L, -1L));
+ c5.resourceConfigs.add(new ResourceConfig("service5", -1L, -1L));
c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
c5.expectResult = IncrRequestResult.success();
@@ -124,8 +124,8 @@ public class FlowStatTests {
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c6 = new FlowRuleCase();
- c6.resourceConfigs.add(new ResourceConfig("_global6", 20, 0));
- c6.resourceConfigs.add(new ResourceConfig("service6", 20, 0));
+ c6.resourceConfigs.add(new ResourceConfig("_global6", 20, -1L));
+ c6.resourceConfigs.add(new ResourceConfig("service6", 20, -1L));
c6.resourceExpects.add(new ResourceExpect(20, 20, 20, c6.totalReqs - 20));
c6.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
c6.expectResult = IncrRequestResult.block("_global6", BlockType.CONCURRENT_REQUEST);
@@ -133,8 +133,8 @@ public class FlowStatTests {
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c7 = new FlowRuleCase();
- c7.resourceConfigs.add(new ResourceConfig("_global7", 0, 0));
- c7.resourceConfigs.add(new ResourceConfig("service7", 0, 20));
+ c7.resourceConfigs.add(new ResourceConfig("_global7", -1L, -1L));
+ c7.resourceConfigs.add(new ResourceConfig("service7", -1L, 20));
c7.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
c7.resourceExpects.add(new ResourceExpect(20, 20, 20, c7.totalReqs - 20));
c7.expectResult = IncrRequestResult.block("service7", BlockType.QPS);
diff --git a/fizz-core/src/test/java/com/fizzgate/stats/circuitbreaker/CircuitBreakManagerTests.java b/fizz-core/src/test/java/com/fizzgate/stats/circuitbreaker/CircuitBreakManagerTests.java
index 812a562..e5b0685 100644
--- a/fizz-core/src/test/java/com/fizzgate/stats/circuitbreaker/CircuitBreakManagerTests.java
+++ b/fizz-core/src/test/java/com/fizzgate/stats/circuitbreaker/CircuitBreakManagerTests.java
@@ -27,6 +27,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import javax.annotation.Resource;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
@TestPropertySource("/application.properties")
@@ -108,4 +109,119 @@ public class CircuitBreakManagerTests {
Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get());
Assertions.assertEquals(2, timeSlot.getCircuitBreakNum());
}
+
+ @Test
+ void detectiveResumeTest() throws InterruptedException {
+ FlowStat flowStat = new FlowStat(circuitBreakManager);
+ flowStat.cleanResource = false;
+ flowStat.createTimeSlotOnlyTraffic = false;
+ long currentTimeWindow = flowStat.currentTimeSlotId();
+
+ MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
+ MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
+
+ String service = "xservice";
+ String path = "ypath";
+
+ CircuitBreaker cb = new CircuitBreaker();
+ cb.service = service;
+ cb.path = path;
+ cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
+ cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
+ cb.monitorDuration = 5 * 1000;
+ cb.minRequests = 100;
+ cb.totalErrorThreshold = 10;
+ cb.breakDuration = 2 * 1000;
+ cb.resumeStrategy = CircuitBreaker.ResumeStrategy.DETECTIVE;
+ cb.stateStartTime = currentTimeWindow;
+ Map circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
+ circuitBreakerMap.put(cb.resource, cb);
+
+ ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
+ TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
+ timeSlot.setCompReqs(200);
+ timeSlot.setErrors(11);
+
+ boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
+ Assertions.assertFalse(permit);
+ Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
+ Thread.sleep(3000);
+ Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
+
+ currentTimeWindow = flowStat.currentTimeSlotId();
+ mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
+ mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
+ permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
+ Assertions.assertTrue(permit);
+ Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
+ Assertions.assertFalse(cb.noProbe.get());
+
+ currentTimeWindow = flowStat.currentTimeSlotId();
+ mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
+ mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
+ permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
+ Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
+ Assertions.assertFalse(permit);
+
+ cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeWindow, flowStat); // mock probe request success
+
+ currentTimeWindow = flowStat.currentTimeSlotId();
+ mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
+ mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
+ permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
+ Assertions.assertEquals(CircuitBreaker.State.CLOSED, cb.stateRef.get());
+ Assertions.assertTrue(permit);
+ }
+
+ @Test
+ void gradualResumeTest() throws InterruptedException {
+ FlowStat flowStat = new FlowStat(circuitBreakManager);
+ flowStat.cleanResource = false;
+ flowStat.createTimeSlotOnlyTraffic = false;
+ long currentTimeWindow = flowStat.currentTimeSlotId();
+
+ MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
+ MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
+
+ String service = "xservice";
+ String path = "ypath";
+
+ CircuitBreaker cb = new CircuitBreaker();
+ cb.service = service;
+ cb.path = path;
+ cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
+ cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
+ cb.monitorDuration = 5 * 1000;
+ cb.minRequests = 100;
+ cb.totalErrorThreshold = 10;
+ cb.breakDuration = 2 * 1000;
+ cb.resumeStrategy = CircuitBreaker.ResumeStrategy.GRADUAL;
+ cb.resumeDuration = 3 * 1000;
+ cb.stateStartTime = currentTimeWindow;
+ cb.initGradualResumeTimeWindowContext();
+
+ Map circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
+ circuitBreakerMap.put(cb.resource, cb);
+
+ ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
+ TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
+ timeSlot.setCompReqs(200);
+ timeSlot.setErrors(11);
+
+ boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
+ Assertions.assertFalse(permit);
+ Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
+ Thread.sleep(3000);
+ Assertions.assertEquals(CircuitBreaker.State.RESUME_GRADUALLY, cb.stateRef.get());
+
+ currentTimeWindow = flowStat.currentTimeSlotId();
+ mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
+ mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
+ permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
+ Assertions.assertTrue(permit);
+ for (int i = 0; i < 68; i++) {
+ permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
+ }
+ Assertions.assertFalse(permit);
+ }
}
diff --git a/fizz-plugin/pom.xml b/fizz-plugin/pom.xml
index c07b832..afda84c 100644
--- a/fizz-plugin/pom.xml
+++ b/fizz-plugin/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.7.2
+ 2.7.3
../pom.xml
4.0.0
diff --git a/fizz-spring-boot-starter/pom.xml b/fizz-spring-boot-starter/pom.xml
index a73333b..c98ef29 100644
--- a/fizz-spring-boot-starter/pom.xml
+++ b/fizz-spring-boot-starter/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.7.2
+ 2.7.3
../pom.xml
4.0.0
diff --git a/pom.xml b/pom.xml
index c1a0cd4..355fc68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,11 +6,11 @@
2.2.13.RELEASE
- 5.2.23.RELEASE
+ 5.2.24.RELEASE
Dysprosium-SR25
5.3.7.RELEASE
2.2.7.RELEASE
- 4.1.91.Final
+ 4.1.93.Final
4.4.16
2.17.2
1.7.36
@@ -22,7 +22,7 @@
0.8.2
0.9.11
2.11.1
- 2.0.59.Final
+ 2.0.61.Final
2.2.9.RELEASE
1.33
Moore-SR13
@@ -38,7 +38,7 @@
fizz-gateway-community
${project.artifactId}
fizz gateway community
- 2.7.2
+ 2.7.3
pom
fizz-common