Merge pull request #484 from fizzgate/develop

Release v2.7.3
This commit is contained in:
dxfeng10
2023-07-02 13:57:10 +08:00
committed by GitHub
27 changed files with 420 additions and 206 deletions

View File

@@ -4,7 +4,7 @@ English | [简体中文](./README.md)
<a href="https://www.fizzgate.com"><img src="https://www.fizzgate.com/fizz/nav-bar/logo.png?v=1" width="70%"></a> <a href="https://www.fizzgate.com"><img src="https://www.fizzgate.com/fizz/nav-bar/logo.png?v=1" width="70%"></a>
</p> </p>
<p> <p>
<img alt="Version" src="https://img.shields.io/badge/version-2.7.2-blue.svg?cacheSeconds=2592000" /> <img alt="Version" src="https://img.shields.io/badge/version-2.7.3-blue.svg?cacheSeconds=2592000" />
<a href="http://www.fizzgate.com/fizz-gateway-node/" target="_blank"> <a href="http://www.fizzgate.com/fizz-gateway-node/" target="_blank">
<img alt="Documentation" src="https://img.shields.io/badge/documentation-yes-brightgreen.svg" /> <img alt="Documentation" src="https://img.shields.io/badge/documentation-yes-brightgreen.svg" />
</a> </a>
@@ -107,10 +107,8 @@ Starting from v1.3.0, the frontend and backend of the management backend are mer
|------------------------|---------------------------| |------------------------|---------------------------|
| v1.3.0 | v1.3.0 | | v1.3.0 | v1.3.0 |
| ... | ... | | ... | ... |
| v2.6.6 | v2.6.6 |
| v2.7.0 | v2.7.0 |
| v2.7.1 | v2.7.1 |
| v2.7.2 | v2.7.2 | | v2.7.2 | v2.7.2 |
| v2.7.3 | v2.7.3 |
Please download the corresponding management backend version according to the version of the community version Please download the corresponding management backend version according to the version of the community version

View File

@@ -3,7 +3,7 @@
<a href="https://www.fizzgate.com"><img src="https://www.fizzgate.com/fizz/nav-bar/logo.png?v=1" width="70%"></a> <a href="https://www.fizzgate.com"><img src="https://www.fizzgate.com/fizz/nav-bar/logo.png?v=1" width="70%"></a>
</p> </p>
<p> <p>
<img alt="Version" src="https://img.shields.io/badge/version-2.7.1-blue.svg?cacheSeconds=2592000" /> <img alt="Version" src="https://img.shields.io/badge/version-2.7.3-blue.svg?cacheSeconds=2592000" />
<a href="http://www.fizzgate.com/fizz-gateway-node/" target="_blank"> <a href="http://www.fizzgate.com/fizz-gateway-node/" target="_blank">
<img alt="Documentation" src="https://img.shields.io/badge/documentation-yes-brightgreen.svg" /> <img alt="Documentation" src="https://img.shields.io/badge/documentation-yes-brightgreen.svg" />
</a> </a>
@@ -115,33 +115,9 @@ API地址https://demo.fizzgate.com/proxy/[服务名]/[API_Path]
| fizz-gateway-node | fizz-manager-professional | | fizz-gateway-node | fizz-manager-professional |
|------------------------|---------------------------| |------------------------|---------------------------|
| v1.3.0 | v1.3.0 | | v1.3.0 | v1.3.0 |
| v1.4.0 | v1.4.0 | | ... | ... |
| v1.4.1 | v1.4.1 |
| v1.5.0 | v1.5.0 |
| v1.5.1 | v1.5.1 |
| v2.0.0 | v2.0.0 |
| v2.1.0 | v2.1.0 |
| v2.2.0 | v2.2.0 |
| v2.2.1 | v2.2.1 |
| v2.2.3 | v2.2.3 |
| v2.3.0 | v2.3.0 |
| v2.3.2 | v2.3.2 |
| v2.3.3 | v2.3.3 |
| v2.4.0 | v2.4.0 |
| v2.4.1 | v2.4.1 |
| v2.5.0 | v2.5.0 |
| v2.5.1 | v2.5.1 |
| v2.5.2 | v2.5.2 |
| v2.6.0 | v2.6.0 |
| v2.6.1 | v2.6.1 |
| v2.6.2 | v2.6.2 |
| v2.6.3 | v2.6.3 |
| v2.6.4 | v2.6.4 |
| v2.6.5 | v2.6.5 |
| v2.6.6 | v2.6.6 |
| v2.7.0 | v2.7.0 |
| v2.7.1 | v2.7.1 |
| v2.7.2 | v2.7.2 | | v2.7.2 | v2.7.2 |
| v2.7.3 | v2.7.3 |
请根据节点端的版本下载对应的管理后台版本 请根据节点端的版本下载对应的管理后台版本

View File

@@ -6,7 +6,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.2</version> <version>2.7.3</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
@@ -14,11 +14,11 @@
<properties> <properties>
<!--<java.version>1.8</java.version> <!--<java.version>1.8</java.version>
<spring-framework.version>5.2.23.RELEASE</spring-framework.version> <spring-framework.version>5.2.24.RELEASE</spring-framework.version>
<spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version> <spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version>
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version> <reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version> <lettuce.version>5.3.7.RELEASE</lettuce.version>
<netty.version>4.1.91.Final</netty.version> <netty.version>4.1.93.Final</netty.version>
<httpcore.version>4.4.16</httpcore.version> <httpcore.version>4.4.16</httpcore.version>
<log4j2.version>2.17.2</log4j2.version> <log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version> <slf4j.version>1.7.36</slf4j.version>
@@ -32,7 +32,7 @@
<commons-codec.version>1.15</commons-codec.version> <commons-codec.version>1.15</commons-codec.version>
<commons-pool2.version>2.11.1</commons-pool2.version> <commons-pool2.version>2.11.1</commons-pool2.version>
<gson.version>2.8.9</gson.version> <gson.version>2.8.9</gson.version>
<netty-tcnative.version>2.0.59.Final</netty-tcnative.version> <netty-tcnative.version>2.0.61.Final</netty-tcnative.version>
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version> <spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
<snakeyaml.version>1.33</snakeyaml.version> <snakeyaml.version>1.33</snakeyaml.version>
<spring-data-releasetrain.version>Moore-SR13</spring-data-releasetrain.version>--> <spring-data-releasetrain.version>Moore-SR13</spring-data-releasetrain.version>-->

View File

@@ -93,7 +93,7 @@ flow-stat-sched:
queue: fizz_resource_access_stat queue: fizz_resource_access_stat
gateway: gateway:
prefix: /proxy prefix: /
aggr: aggr:
# set headers when calling the backend API # set headers when calling the backend API
proxy_set_headers: X-Real-IP,X-Forwarded-Proto,X-Forwarded-For proxy_set_headers: X-Real-IP,X-Forwarded-Proto,X-Forwarded-For

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.2</version> <version>2.7.3</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.2</version> <version>2.7.3</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -56,11 +56,8 @@ import reactor.core.scheduler.Schedulers;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
/** /**
* @author Francis Dong * @author Francis Dong
@@ -160,6 +157,9 @@ public class AggregateFilter implements WebFilter {
clientInput.put("headers", headers); clientInput.put("headers", headers);
clientInput.put("params", MapUtil.toHashMap(request.getQueryParams())); clientInput.put("params", MapUtil.toHashMap(request.getQueryParams()));
clientInput.put("contentType", request.getHeaders().getFirst(CommonConstants.HEADER_CONTENT_TYPE)); clientInput.put("contentType", request.getHeaders().getFirst(CommonConstants.HEADER_CONTENT_TYPE));
Map<String, Object> pathParams = (Map<String, Object>) com.fizzgate.util.ThreadContext.get("pathParams");
clientInput.put("pathParams", pathParams == null ? Collections.emptyMap() : pathParams);
com.fizzgate.util.ThreadContext.remove("pathParams");
Mono<AggregateResult> result = null; Mono<AggregateResult> result = null;
MediaType contentType = request.getHeaders().getContentType(); MediaType contentType = request.getHeaders().getContentType();

View File

@@ -17,6 +17,25 @@
package com.fizzgate.filter; package com.fizzgate.filter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.fizzgate.config.AggregateRedisConfig; import com.fizzgate.config.AggregateRedisConfig;
import com.fizzgate.plugin.auth.ApiConfig; import com.fizzgate.plugin.auth.ApiConfig;
@@ -27,33 +46,14 @@ import com.fizzgate.proxy.CallbackService;
import com.fizzgate.proxy.DiscoveryClientUriSelector; import com.fizzgate.proxy.DiscoveryClientUriSelector;
import com.fizzgate.proxy.ServiceInstance; import com.fizzgate.proxy.ServiceInstance;
import com.fizzgate.service_registry.RegistryCenterService; import com.fizzgate.service_registry.RegistryCenterService;
import com.fizzgate.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
import com.fizzgate.spring.web.server.ext.FizzServerWebExchangeDecorator;
import com.fizzgate.util.Consts; import com.fizzgate.util.Consts;
import com.fizzgate.util.NettyDataBufferUtils; import com.fizzgate.util.NettyDataBufferUtils;
import com.fizzgate.util.ThreadContext; import com.fizzgate.util.ThreadContext;
import com.fizzgate.util.WebUtils; import com.fizzgate.util.WebUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
/** /**
* @author hongqiaowei * @author hongqiaowei
*/ */
@@ -89,34 +89,49 @@ public class CallbackFilter extends FizzWebFilter {
@Resource @Resource
private GatewayGroupService gatewayGroupService; private GatewayGroupService gatewayGroupService;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
FilterResult pfr = WebUtils.getPrevFilterResult(exchange); @Override
if (!pfr.success) { public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return WebUtils.getDirectResponse(exchange); String traceId = WebUtils.getTraceId(exchange);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
ServerHttpRequest req = exchange.getRequest();
if (req instanceof FizzServerHttpRequestDecorator) {
return doFilter(exchange, chain);
} }
return
NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
.flatMap(
body -> {
FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req);
if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
try {
requestDecorator.setBody(body);
} finally {
NettyDataBufferUtils.release(body);
}
}
ServerWebExchange mutatedExchange = exchange.mutate().request(requestDecorator).build();
ServerWebExchange newExchange = mutatedExchange;
MediaType contentType = req.getHeaders().getContentType();
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
newExchange = new FizzServerWebExchangeDecorator(mutatedExchange);
}
return doFilter(newExchange, chain);
}
);
}
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
String traceId = WebUtils.getTraceId(exchange);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
ApiConfig ac = WebUtils.getApiConfig(exchange); ApiConfig ac = WebUtils.getApiConfig(exchange);
if (ac != null && ac.type == ApiConfig.Type.CALLBACK) { if (ac != null && ac.type == ApiConfig.Type.CALLBACK) {
CallbackConfig cc = ac.callbackConfig; CallbackConfig cc = ac.callbackConfig;
ServerHttpRequest req = exchange.getRequest(); FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) exchange.getRequest();
return return req.getBody().defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER).single().flatMap(b -> {
DataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) String body = b.toString(StandardCharsets.UTF_8);
.flatMap(
b -> {
DataBuffer body = null;
if (b != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
if (b instanceof PooledDataBuffer) {
try {
body = NettyDataBufferUtils.copy2heap(b);
} finally {
NettyDataBufferUtils.release(b);
}
} else {
body = b;
}
}
HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac); HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac);
HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange); HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next()); pushReq2manager(exchange, headers, body, service2instMap, cc.id, ac.gatewayGroups.iterator().next());
@@ -175,7 +190,7 @@ public class CallbackFilter extends FizzWebFilter {
private static final String _receivers = "\"receivers\":"; private static final String _receivers = "\"receivers\":";
private static final String _gatewayGroup = "\"gatewayGroup\":"; private static final String _gatewayGroup = "\"gatewayGroup\":";
private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, HashMap<String, ServiceInstance> service2instMap, int callbackConfigId, private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, Object body, HashMap<String, ServiceInstance> service2instMap, int callbackConfigId,
String gatewayGroup) { String gatewayGroup) {
ServerHttpRequest req = exchange.getRequest(); ServerHttpRequest req = exchange.getRequest();
@@ -215,7 +230,8 @@ public class CallbackFilter extends FizzWebFilter {
if (body != null) { if (body != null) {
b.append(Consts.S.COMMA); b.append(Consts.S.COMMA);
String bodyStr = body.toString(StandardCharsets.UTF_8); // String bodyStr = body.toString(StandardCharsets.UTF_8);
String bodyStr = body.toString();
MediaType contentType = req.getHeaders().getContentType(); MediaType contentType = req.getHeaders().getContentType();
if (contentType != null && contentType.getSubtype().equalsIgnoreCase(json)) { if (contentType != null && contentType.getSubtype().equalsIgnoreCase(json)) {
b.append(_body); b.append(JSON.toJSONString(bodyStr)); b.append(_body); b.append(JSON.toJSONString(bodyStr));

View File

@@ -360,7 +360,7 @@ public class FlowControlFilter extends FizzWebFilter {
if (hasHost) { if (hasHost) {
// String resourceId = ResourceIdUtils.buildResourceId(app, ip, node, service, path); // String resourceId = ResourceIdUtils.buildResourceId(app, ip, node, service, path);
String resourceId = ResourceIdUtils.buildResourceId(null, null, node, null, null); String resourceId = ResourceIdUtils.buildResourceId(null, null, node, null, null);
ResourceConfig resourceConfig = new ResourceConfig(resourceId, 0, 0); ResourceConfig resourceConfig = new ResourceConfig(resourceId, -1L, -1L);
resourceConfigs.add(resourceConfig); resourceConfigs.add(resourceConfig);
} }
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, ResourceIdUtils.NODE, null, null, null); checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, ResourceIdUtils.NODE, null, null, null);
@@ -407,11 +407,11 @@ public class FlowControlFilter extends FizzWebFilter {
} else { } else {
String node = ResourceIdUtils.getNode(resource); String node = ResourceIdUtils.getNode(resource);
if (node != null && node.equals(ResourceIdUtils.NODE)) { if (node != null && node.equals(ResourceIdUtils.NODE)) {
rc = new ResourceConfig(resource, 0, 0); rc = new ResourceConfig(resource, -1L, -1L);
} }
if (defaultRateLimitConfigId != null) { if (defaultRateLimitConfigId != null) {
if (defaultRateLimitConfigId.equals(ResourceIdUtils.SERVICE_DEFAULT)) { if (defaultRateLimitConfigId.equals(ResourceIdUtils.SERVICE_DEFAULT)) {
rc = new ResourceConfig(resource, 0, 0); rc = new ResourceConfig(resource, -1L, -1L);
rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE); rateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
if (rateLimitConfig != null && rateLimitConfig.isEnable()) { if (rateLimitConfig != null && rateLimitConfig.isEnable()) {
rc.setMaxCon(rateLimitConfig.concurrents); rc.setMaxCon(rateLimitConfig.concurrents);
@@ -441,7 +441,7 @@ public class FlowControlFilter extends FizzWebFilter {
} }
}*/ }*/
if (cb != null) { if (cb != null) {
rc = new ResourceConfig(resource, 0, 0); rc = new ResourceConfig(resource, -1L, -1L);
resourceConfigs.add(rc); resourceConfigs.add(rc);
} }
} }
@@ -508,7 +508,7 @@ public class FlowControlFilter extends FizzWebFilter {
private void something4(List<ResourceConfig> resourceConfigs, String app, String ip, String service) { private void something4(List<ResourceConfig> resourceConfigs, String app, String ip, String service) {
String r = ResourceIdUtils.buildResourceId(app, ip, null, service, null); String r = ResourceIdUtils.buildResourceId(app, ip, null, service, null);
ResourceConfig rc = new ResourceConfig(r, 0, 0); ResourceConfig rc = new ResourceConfig(r, -1L, -1L);
resourceConfigs.add(rc); resourceConfigs.add(rc);
} }

View File

@@ -27,6 +27,7 @@ import com.fizzgate.fizz.input.InputType;
import com.fizzgate.util.Consts; import com.fizzgate.util.Consts;
import com.fizzgate.util.ReactorUtils; import com.fizzgate.util.ReactorUtils;
import com.fizzgate.util.UrlTransformUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode; import org.noear.snack.ONode;
@@ -50,6 +51,7 @@ import java.lang.ref.SoftReference;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.fizzgate.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE; import static com.fizzgate.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE;
import static com.fizzgate.util.Consts.S.FORWARD_SLASH; import static com.fizzgate.util.Consts.S.FORWARD_SLASH;
@@ -388,6 +390,47 @@ public class ConfigLoader {
ClientInputConfig cfg = (ClientInputConfig) input.getConfig(); ClientInputConfig cfg = (ClientInputConfig) input.getConfig();
return new AggregateResource(pipeline, input); return new AggregateResource(pipeline, input);
} }
} else {
String aggrMethodPath = null;
try {
for (Map.Entry<String, String> entry : aggregateResources.entrySet()) {
aggrMethodPath = entry.getKey();
boolean match = UrlTransformUtils.ANT_PATH_MATCHER.match(aggrMethodPath, key);
if (match) {
String configStr = aggregateResources.get(aggrMethodPath);
Input input = createInput(configStr);
Pipeline pipeline = createPipeline(configStr);
if (pipeline != null && input != null) {
Map<String, String> pathVariables = UrlTransformUtils.ANT_PATH_MATCHER.extractUriTemplateVariables(aggrMethodPath, key);
Map<String, Object> map = Collections.emptyMap();
if (!CollectionUtils.isEmpty(pathVariables)) {
map = pathVariables.entrySet().stream().filter(
e -> {
return e.getKey().indexOf('$') == -1;
}
)
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> {
return (Object) e.getValue();
}
)
);
}
com.fizzgate.util.ThreadContext.set("pathParams", map);
return new AggregateResource(pipeline, input);
} else {
LOGGER.warn("request {} match {}, input {} pipeline {}", key, aggrMethodPath, input, pipeline);
return null;
}
}
}
} catch (IOException e) {
LOGGER.warn("request {} match {}, create input or pipeline error", key, aggrMethodPath, e);
return null;
}
} }
return null; return null;
} }

View File

@@ -242,6 +242,7 @@ public class Pipeline {
inputRequest.put("method", clientInput.get("method")); inputRequest.put("method", clientInput.get("method"));
inputRequest.put("headers", clientInput.get("headers")); inputRequest.put("headers", clientInput.get("headers"));
inputRequest.put("params", clientInput.get("params")); inputRequest.put("params", clientInput.get("params"));
inputRequest.put("pathParams", clientInput.get("pathParams"));
stepContext.addFilePartMap((Map<String, FilePart>) clientInput.get("filePartMap")); stepContext.addFilePartMap((Map<String, FilePart>) clientInput.get("filePartMap"));
if (CONTENT_TYPE_XML.equals(config.getContentType()) || (StringUtils.isEmpty(config.getContentType()) if (CONTENT_TYPE_XML.equals(config.getContentType()) || (StringUtils.isEmpty(config.getContentType())

View File

@@ -18,7 +18,10 @@
package com.fizzgate.fizz.function; package com.fizzgate.fizz.function;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@@ -65,6 +68,8 @@ public class CommonFunc implements IFunc {
FuncExecutor.register(NAME_SPACE_PREFIX + "common.and", this); FuncExecutor.register(NAME_SPACE_PREFIX + "common.and", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "common.or", this); FuncExecutor.register(NAME_SPACE_PREFIX + "common.or", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "common.not", this); FuncExecutor.register(NAME_SPACE_PREFIX + "common.not", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "common.emptyMap", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "common.emptyList", this);
} }
/** /**
@@ -174,4 +179,22 @@ public class CommonFunc implements IFunc {
return !(obj == null ? false : obj); return !(obj == null ? false : obj);
} }
/**
* Return an empty map
*
* @return
*/
public Map<String, Object> emptyMap() {
return new HashMap<>();
}
/**
* Return an empty list
*
* @return
*/
public List<Map<String, Object>> emptyList() {
return new ArrayList<>();
}
} }

View File

@@ -312,6 +312,26 @@ public class FuncExecutor {
argsStrContainer = this.trimArgStr(argsStrContainer, 5, isVarArgs, paramTypes.length, funcExpression); argsStrContainer = this.trimArgStr(argsStrContainer, 5, isVarArgs, paramTypes.length, funcExpression);
argsStr = argsStrContainer.getArgsStr(); argsStr = argsStrContainer.getArgsStr();
i = argsStrContainer.getIndex(); i = argsStrContainer.getIndex();
} else if (argsStr.matches("^\\[\\]\\s*,.*") || argsStr.matches("^\\[\\]\\s*\\).*")) { // []
if (isVarArgs && i == paramTypes.length - 1) {
varArgs.add(new ArrayList());
args[i] = varArgs.toArray(new ArrayList[varArgs.size()]);
} else {
args[i] = new ArrayList();
}
argsStrContainer = this.trimArgStr(argsStrContainer, 2, isVarArgs, paramTypes.length, funcExpression);
argsStr = argsStrContainer.getArgsStr();
i = argsStrContainer.getIndex();
} else if (argsStr.matches("^\\{\\}\\s*,.*") || argsStr.matches("^\\{\\}\\s*\\).*")) { // {}
if (isVarArgs && i == paramTypes.length - 1) {
varArgs.add(new HashMap());
args[i] = varArgs.toArray(new HashMap[varArgs.size()]);
} else {
args[i] = new HashMap();
}
argsStrContainer = this.trimArgStr(argsStrContainer, 2, isVarArgs, paramTypes.length, funcExpression);
argsStr = argsStrContainer.getArgsStr();
i = argsStrContainer.getIndex();
} else if (argsStr.startsWith("{")) { // reference value } else if (argsStr.startsWith("{")) { // reference value
int pos = argsStr.indexOf("}", 1); int pos = argsStr.indexOf("}", 1);
if (pos != -1) { if (pos != -1) {

View File

@@ -62,6 +62,7 @@ public class ListFunc implements IFunc {
FuncExecutor.register(NAME_SPACE_PREFIX + "list.join", this); FuncExecutor.register(NAME_SPACE_PREFIX + "list.join", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.rename", this); FuncExecutor.register(NAME_SPACE_PREFIX + "list.rename", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.removeFields", this); FuncExecutor.register(NAME_SPACE_PREFIX + "list.removeFields", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.emptyList", this);
} }
/** /**
@@ -228,4 +229,13 @@ public class ListFunc implements IFunc {
return data; return data;
} }
/**
* Return an empty list
*
* @return
*/
public List<Map<String, Object>> emptyList() {
return new ArrayList<>();
}
} }

View File

@@ -19,7 +19,6 @@ package com.fizzgate.proxy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -81,7 +80,7 @@ public class CallbackService {
aggrConfigPrefix = systemConfig.getGatewayPrefix() + '/'; aggrConfigPrefix = systemConfig.getGatewayPrefix() + '/';
} }
public Mono<Void> requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, Map<String, ServiceInstance> service2instMap) { public Mono<Void> requestBackends(ServerWebExchange exchange, HttpHeaders headers, String body, CallbackConfig cc, Map<String, ServiceInstance> service2instMap) {
ServerHttpRequest req = exchange.getRequest(); ServerHttpRequest req = exchange.getRequest();
String traceId = WebUtils.getTraceId(exchange); String traceId = WebUtils.getTraceId(exchange);
HttpMethod method = req.getMethod(); HttpMethod method = req.getMethod();
@@ -140,21 +139,21 @@ public class CallbackService {
; ;
} }
private Function<Throwable, Mono<? extends ClientResponse>> crError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body) { private Function<Throwable, Mono<? extends ClientResponse>> crError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body) {
return t -> { return t -> {
log(exchange, r, method, headers, body, t); log(exchange, r, method, headers, body, t);
return Mono.just(new FizzFailClientResponse(t)); return Mono.just(new FizzFailClientResponse(t));
}; };
} }
private Function<Throwable, Mono<AggregateResult>> arError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body) { private Function<Throwable, Mono<AggregateResult>> arError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body) {
return t -> { return t -> {
log(exchange, r, method, headers, body, t); log(exchange, r, method, headers, body, t);
return Mono.just(new FailAggregateResult(t)); return Mono.just(new FailAggregateResult(t));
}; };
} }
private void log(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body, Throwable t) { private void log(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, String body, Throwable t) {
StringBuilder b = ThreadContext.getStringBuilder(); StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b); WebUtils.request2stringBuilder(exchange, b);
b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR); b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR);

View File

@@ -161,10 +161,10 @@ public class FlowStat {
for (ResourceConfig resourceConfig : resourceConfigs) { for (ResourceConfig resourceConfig : resourceConfigs) {
long maxCon = resourceConfig.getMaxCon(); long maxCon = resourceConfig.getMaxCon();
long maxQPS = resourceConfig.getMaxQPS(); long maxQPS = resourceConfig.getMaxQPS();
if (maxCon > 0 || maxQPS > 0) { if (maxCon >= 0 || maxQPS >= 0) {
ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId()); ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
// check concurrent request // check concurrent request
if (maxCon > 0) { if (maxCon >= 0) {
long n = resourceStat.getConcurrentRequests().get(); long n = resourceStat.getConcurrentRequests().get();
if (n >= maxCon) { if (n >= maxCon) {
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId); resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
@@ -184,7 +184,7 @@ public class FlowStat {
} }
// check QPS // check QPS
if (maxQPS > 0) { if (maxQPS >= 0) {
long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter(); long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter();
if (total >= maxQPS) { if (total >= maxQPS) {
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId); resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);

View File

@@ -46,14 +46,14 @@ public class ResourceConfig {
// Flow control rule // Flow control rule
//--------------------------------------------------------------------- //---------------------------------------------------------------------
/** /**
* Maximum concurrent request, zero or negative for no limit * Maximum concurrent request, negative for no limit
*/ */
private long maxCon; private long maxCon = -1L;
/** /**
* Maximum QPS, zero or negative for no limit * Maximum QPS, negative for no limit
*/ */
private long maxQPS; private long maxQPS = -1L;
//--------------------------------------------------------------------- //---------------------------------------------------------------------

View File

@@ -25,7 +25,6 @@ import com.fizzgate.stats.TimeSlot;
import com.fizzgate.stats.TimeWindowStat; import com.fizzgate.stats.TimeWindowStat;
import com.fizzgate.util.JacksonUtils; import com.fizzgate.util.JacksonUtils;
import com.fizzgate.util.ResourceIdUtils; import com.fizzgate.util.ResourceIdUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -35,8 +34,7 @@ import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@@ -159,6 +157,8 @@ public class CircuitBreaker {
public final AtomicReference<State> stateRef = new AtomicReference<>(State.CLOSED); public final AtomicReference<State> stateRef = new AtomicReference<>(State.CLOSED);
public final AtomicBoolean noProbe = new AtomicBoolean(true);
public long stateStartTime; public long stateStartTime;
public CircuitBreaker() { public CircuitBreaker() {
@@ -262,7 +262,7 @@ public class CircuitBreaker {
} }
private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) { private boolean isResumeTraffic(long currentTimeWindow, FlowStat flowStat) {
long nThSecond = getStateDuration(currentTimeWindow); long nThSecond = getStateDuration(currentTimeWindow) / 1000;
GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond); GradualResumeTimeWindowContext ctx = gradualResumeTimeWindowContexts.get((int) nThSecond);
ResourceStat resourceStat = flowStat.getResourceStat(resource); ResourceStat resourceStat = flowStat.getResourceStat(resource);
return ctx.permit(resourceStat, currentTimeWindow); return ctx.permit(resourceStat, currentTimeWindow);
@@ -282,9 +282,20 @@ public class CircuitBreaker {
transit(s, State.CLOSED, currentTimeWindow, flowStat); transit(s, State.CLOSED, currentTimeWindow, flowStat);
} else if (s == State.OPEN && stateDuration > breakDuration) { } else if (s == State.OPEN && stateDuration > breakDuration) {
if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state", LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, correct to CLOSED state",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration); currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
transit(s, State.CLOSED, currentTimeWindow, flowStat); transit(s, State.CLOSED, currentTimeWindow, flowStat);
} else if (resumeStrategy == ResumeStrategy.DETECTIVE) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
noProbe.set(true);
transit(s, State.RESUME_DETECTIVE, currentTimeWindow, flowStat);
} else if (resumeStrategy == ResumeStrategy.GRADUAL) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
transit(s, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
}
} else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) { } else if (s == State.RESUME_GRADUALLY && stateDuration > resumeDuration) {
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state", LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, correct to CLOSED state",
@@ -293,36 +304,9 @@ public class CircuitBreaker {
} }
} }
/*public void correctCircuitBreakerStateAsError(long currentTimeWindow, FlowStat flowStat) {
if (stateRef.get() == State.CLOSED) {
long endTimeWindow = currentTimeWindow + 1000;
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
long reqCount = timeWindowStat.getCompReqs();
long errCount = timeWindowStat.getErrors();
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, correct to OPEN state as error",
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
} else if (breakStrategy == BreakStrategy.ERRORS_RATIO && reqCount >= minRequests) {
BigDecimal errors = new BigDecimal(errCount);
BigDecimal requests = new BigDecimal(reqCount);
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
if (p - errorRatioThreshold >= 0) {
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, correct to OPEN state as error",
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
}
}
}
}*/
public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) { public boolean transit(State current, State target, long currentTimeWindow, FlowStat flowStat) {
if (stateRef.compareAndSet(current, target)) { if (stateRef.compareAndSet(current, target)) {
ResourceStat resourceStat = flowStat.getResourceStat(resource); ResourceStat resourceStat = flowStat.getResourceStat(resource);
/*AtomicLong circuitBreakNum = resourceStat.getTimeSlot(currentTimeWindow).getCircuitBreakNum();
circuitBreakNum.set(0);*/
resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0); resourceStat.getTimeSlot(currentTimeWindow).setCircuitBreakNum(0);
resourceStat.updateCircuitBreakState(currentTimeWindow, current, target); resourceStat.updateCircuitBreakState(currentTimeWindow, current, target);
LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target); LOGGER.debug("transit {} current time window {} from {} which start at {} to {}", resource, currentTimeWindow, current, stateStartTime, target);
@@ -333,19 +317,25 @@ public class CircuitBreaker {
} }
public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) { public boolean permit(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
correctState(currentTimeWindow, flowStat);
if (stateRef.get() == State.CLOSED) { if (stateRef.get() == State.CLOSED) {
return permitCallInClosedState(currentTimeWindow, flowStat); return permitCallInClosedState(currentTimeWindow, flowStat);
} }
if (stateRef.get() == State.OPEN) { if (stateRef.get() == State.OPEN) {
return permitCallInOpenState(exchange, currentTimeWindow, flowStat); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
return false;
} }
if (stateRef.get() == State.RESUME_DETECTIVE) { if (stateRef.get() == State.RESUME_DETECTIVE) {
if (noProbe.compareAndSet(true, false)) {
exchange.getAttributes().put(DETECT_REQUEST, this);
return true;
} else {
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
return false; return false;
} }
}
if (stateRef.get() == State.RESUME_GRADUALLY) { if (stateRef.get() == State.RESUME_GRADUALLY) {
return permitCallInResumeGraduallyState(currentTimeWindow, flowStat); return isResumeTraffic(currentTimeWindow, flowStat);
} }
return true; return true;
} }
@@ -353,13 +343,12 @@ public class CircuitBreaker {
private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) { private boolean permitCallInClosedState(long currentTimeWindow, FlowStat flowStat) {
long endTimeWindow = currentTimeWindow + 1000; long endTimeWindow = currentTimeWindow + 1000;
// TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, endTimeWindow - monitorDuration, endTimeWindow);
TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow); TimeWindowStat timeWindowStat = flowStat.getTimeWindowStat(resource, stateStartTime, endTimeWindow);
long reqCount = timeWindowStat.getCompReqs(); long reqCount = timeWindowStat.getCompReqs();
long errCount = timeWindowStat.getErrors(); long errCount = timeWindowStat.getErrors();
if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) { if (breakStrategy == BreakStrategy.TOTAL_ERRORS && reqCount >= minRequests && errCount >= totalErrorThreshold) {
LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}", LOGGER.debug("{} current time window {} request count {} >= min requests {} error count {} >= total error threshold {}, reject request",
resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold); resource, currentTimeWindow, reqCount, minRequests, errCount, totalErrorThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat); transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
@@ -370,7 +359,7 @@ public class CircuitBreaker {
BigDecimal requests = new BigDecimal(reqCount); BigDecimal requests = new BigDecimal(reqCount);
float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue(); float p = errors.divide(requests, 2, RoundingMode.HALF_UP).floatValue();
if (p - errorRatioThreshold >= 0) { if (p - errorRatioThreshold >= 0) {
LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}", LOGGER.debug("{} current time window {} request count {} >= min requests {} error ratio {} >= error ratio threshold {}, reject request",
resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold); resource, currentTimeWindow, reqCount, minRequests, p, errorRatioThreshold);
transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat); transit(State.CLOSED, State.OPEN, currentTimeWindow, flowStat);
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow); flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
@@ -378,54 +367,11 @@ public class CircuitBreaker {
} }
} }
LOGGER.debug("{} current time window {} in {} which start at {}, permit current request", resource, currentTimeWindow, stateRef.get(), stateStartTime); LOGGER.debug("{} current time window {} in {} which start at {}, permit request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
return true; return true;
} }
private boolean permitCallInOpenState(ServerWebExchange exchange, long currentTimeWindow, FlowStat flowStat) {
long stateDuration = getStateDuration(currentTimeWindow);
if (stateDuration > breakDuration) {
if (resumeStrategy == ResumeStrategy.IMMEDIATE) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume immediately",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
transit(State.OPEN, State.CLOSED, currentTimeWindow, flowStat);
return true;
}
if (resumeStrategy == ResumeStrategy.DETECTIVE) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume detective",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
if (transit(State.OPEN, State.RESUME_DETECTIVE, currentTimeWindow, flowStat)) {
exchange.getAttributes().put(DETECT_REQUEST, this);
return true;
}
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
return false;
}
if (resumeStrategy == ResumeStrategy.GRADUAL) {
LOGGER.debug("current time window {}, {} last {} second in {} large than break duration {}, resume gradual",
currentTimeWindow, resource, stateDuration, stateRef.get(), breakDuration);
transit(State.OPEN, State.RESUME_GRADUALLY, currentTimeWindow, flowStat);
return isResumeTraffic(currentTimeWindow, flowStat);
}
}
flowStat.getResourceStat(resource).incrCircuitBreakNum(currentTimeWindow);
LOGGER.debug("{} current time window {} in {} which start at {}, reject current request", resource, currentTimeWindow, stateRef.get(), stateStartTime);
return false;
}
private boolean permitCallInResumeGraduallyState(long currentTimeWindow, FlowStat flowStat) {
long stateDuration = getStateDuration(currentTimeWindow);
if (stateDuration > resumeDuration) {
LOGGER.debug("current time window {}, {} last {} second in {} large than resume duration {}, resume immediately",
currentTimeWindow, resource, stateDuration, stateRef.get(), resumeDuration);
transit(State.RESUME_GRADUALLY, State.CLOSED, currentTimeWindow, flowStat);
return true;
}
return isResumeTraffic(currentTimeWindow, flowStat);
}
@Override @Override
public String toString() { public String toString() {
return JacksonUtils.writeValueAsString(this); return JacksonUtils.writeValueAsString(this);

View File

@@ -63,9 +63,9 @@ public class ResourceRateLimitConfig {
public byte type; public byte type;
public long qps; public long qps = -1L;
public long concurrents; public long concurrents = -1L;
public String responseType; public String responseType;

View File

@@ -16,6 +16,7 @@
*/ */
package com.fizzgate.fizz.function; package com.fizzgate.fizz.function;
import static org.junit.Assert.assertNull;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList; import java.util.ArrayList;
@@ -116,6 +117,46 @@ class CommonFuncTests {
assertEquals(true, result); assertEquals(true, result);
} }
@Test
void testIif6() {
// test []
String funcExpression = "fn.common.iif(false, \"abc\", [])";
List result = (List)FuncExecutor.getInstance().exec(null, funcExpression);
assertEquals(true, result.size() == 0);
}
@Test
void testIif7() {
// test {}
String funcExpression = "fn.common.iif(false, \"abc\", {})";
Map result = (Map)FuncExecutor.getInstance().exec(null, funcExpression);
assertEquals(true, result.size() == 0);
}
@Test
void testIif8() {
// test []
String funcExpression = "fn.common.iif(false, \"abc\", fn.common.emptyList())";
List result = (List)FuncExecutor.getInstance().exec(null, funcExpression);
assertEquals(true, result.size() == 0);
}
@Test
void testIif9() {
// test {}
String funcExpression = "fn.common.iif(false, \"abc\", fn.common.emptyMap())";
Map result = (Map)FuncExecutor.getInstance().exec(null, funcExpression);
assertEquals(true, result.size() == 0);
}
@Test
void testIif10() {
// test null
String funcExpression = "fn.common.iif(false, \"abc\", null)";
Map result = (Map)FuncExecutor.getInstance().exec(null, funcExpression);
assertNull(result);
}
@Test @Test
void testEquals() { void testEquals() {
String funcExpression = "fn.common.equals(\"abc\", true)"; String funcExpression = "fn.common.equals(\"abc\", true)";

View File

@@ -107,6 +107,29 @@ class ListFuncTests {
assertEquals("a4", ((Map<String, Object>) result.get(3)).get("a").toString()); assertEquals("a4", ((Map<String, Object>) result.get(3)).get("a").toString());
} }
@Test
void testMerge2() {
List<Object> subList1 = new ArrayList<>();
subList1.add(createRecord("a", "a1"));
subList1.add(createRecord("a", "a2"));
subList1.add(createRecord("a", "a3"));
List<Object> subList2 = new ArrayList<>();
subList2.add(createRecord("a", "a4"));
subList2.add(createRecord("a", "a5"));
subList2.add(createRecord("a", "a6"));
ONode ctxNode = ONode.load(new HashMap());
PathMapping.setByPath(ctxNode, "test.data1", subList1, true);
PathMapping.setByPath(ctxNode, "test.data2", subList2, true);
String funcExpression = "fn.list.merge({test.data1}, [] , {test.data2})";
List<Object> result = (List<Object>) FuncExecutor.getInstance().exec(ctxNode, funcExpression);
assertEquals(6, result.size());
assertEquals("a2", ((Map<String, Object>) result.get(1)).get("a").toString());
assertEquals("a4", ((Map<String, Object>) result.get(3)).get("a").toString());
}
@Test @Test
void testExtract() { void testExtract() {
List<Object> subList1 = new ArrayList<>(); List<Object> subList1 = new ArrayList<>();

View File

@@ -23,6 +23,7 @@ import com.fizzgate.util.ReflectionUtils;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@@ -62,7 +63,7 @@ public class CallbackServiceTests {
HttpHeaders headers = new HttpHeaders(); HttpHeaders headers = new HttpHeaders();
headers.add("h1", "v1"); headers.add("h1", "v1");
DataBuffer body = null; String body = null;
CallbackConfig callbackConfig = new CallbackConfig(); CallbackConfig callbackConfig = new CallbackConfig();
callbackConfig.receivers = new ArrayList<>(); callbackConfig.receivers = new ArrayList<>();
@@ -100,7 +101,8 @@ public class CallbackServiceTests {
) )
); );
Mono<Void> vm = callbackService.requestBackends(exchange, headers, body, callbackConfig, Collections.EMPTY_MAP); // Mono<Void> vm = callbackService.requestBackends(exchange, headers, body, callbackConfig, Collections.EMPTY_MAP);
Mono<Void> vm = callbackService.requestBackends(exchange, headers, null, callbackConfig, Collections.EMPTY_MAP);
vm.subscribe(); vm.subscribe();
Thread.sleep(2000); Thread.sleep(2000);

View File

@@ -115,8 +115,8 @@ public class FlowStatTests {
// Note: use different resource ID to avoid being affected by previous test data // Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c5 = new FlowRuleCase(); FlowRuleCase c5 = new FlowRuleCase();
c5.resourceConfigs.add(new ResourceConfig("_global5", 0, 0)); c5.resourceConfigs.add(new ResourceConfig("_global5", -1L, -1L));
c5.resourceConfigs.add(new ResourceConfig("service5", 0, 0)); c5.resourceConfigs.add(new ResourceConfig("service5", -1L, -1L));
c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0)); c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0)); c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
c5.expectResult = IncrRequestResult.success(); c5.expectResult = IncrRequestResult.success();
@@ -124,8 +124,8 @@ public class FlowStatTests {
// Note: use different resource ID to avoid being affected by previous test data // Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c6 = new FlowRuleCase(); FlowRuleCase c6 = new FlowRuleCase();
c6.resourceConfigs.add(new ResourceConfig("_global6", 20, 0)); c6.resourceConfigs.add(new ResourceConfig("_global6", 20, -1L));
c6.resourceConfigs.add(new ResourceConfig("service6", 20, 0)); c6.resourceConfigs.add(new ResourceConfig("service6", 20, -1L));
c6.resourceExpects.add(new ResourceExpect(20, 20, 20, c6.totalReqs - 20)); c6.resourceExpects.add(new ResourceExpect(20, 20, 20, c6.totalReqs - 20));
c6.resourceExpects.add(new ResourceExpect(20, 20, 20, 0)); c6.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
c6.expectResult = IncrRequestResult.block("_global6", BlockType.CONCURRENT_REQUEST); c6.expectResult = IncrRequestResult.block("_global6", BlockType.CONCURRENT_REQUEST);
@@ -133,8 +133,8 @@ public class FlowStatTests {
// Note: use different resource ID to avoid being affected by previous test data // Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c7 = new FlowRuleCase(); FlowRuleCase c7 = new FlowRuleCase();
c7.resourceConfigs.add(new ResourceConfig("_global7", 0, 0)); c7.resourceConfigs.add(new ResourceConfig("_global7", -1L, -1L));
c7.resourceConfigs.add(new ResourceConfig("service7", 0, 20)); c7.resourceConfigs.add(new ResourceConfig("service7", -1L, 20));
c7.resourceExpects.add(new ResourceExpect(20, 20, 20, 0)); c7.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
c7.resourceExpects.add(new ResourceExpect(20, 20, 20, c7.totalReqs - 20)); c7.resourceExpects.add(new ResourceExpect(20, 20, 20, c7.totalReqs - 20));
c7.expectResult = IncrRequestResult.block("service7", BlockType.QPS); c7.expectResult = IncrRequestResult.block("service7", BlockType.QPS);

View File

@@ -27,6 +27,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
@TestPropertySource("/application.properties") @TestPropertySource("/application.properties")
@@ -108,4 +109,119 @@ public class CircuitBreakManagerTests {
Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get()); Assertions.assertEquals(CircuitBreaker.State.OPEN, timeSlot.getCircuitBreakState().get());
Assertions.assertEquals(2, timeSlot.getCircuitBreakNum()); Assertions.assertEquals(2, timeSlot.getCircuitBreakNum());
} }
@Test
void detectiveResumeTest() throws InterruptedException {
FlowStat flowStat = new FlowStat(circuitBreakManager);
flowStat.cleanResource = false;
flowStat.createTimeSlotOnlyTraffic = false;
long currentTimeWindow = flowStat.currentTimeSlotId();
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
String service = "xservice";
String path = "ypath";
CircuitBreaker cb = new CircuitBreaker();
cb.service = service;
cb.path = path;
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
cb.monitorDuration = 5 * 1000;
cb.minRequests = 100;
cb.totalErrorThreshold = 10;
cb.breakDuration = 2 * 1000;
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.DETECTIVE;
cb.stateStartTime = currentTimeWindow;
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
circuitBreakerMap.put(cb.resource, cb);
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
timeSlot.setCompReqs(200);
timeSlot.setErrors(11);
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertFalse(permit);
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
Thread.sleep(3000);
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
currentTimeWindow = flowStat.currentTimeSlotId();
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertTrue(permit);
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
Assertions.assertFalse(cb.noProbe.get());
currentTimeWindow = flowStat.currentTimeSlotId();
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertEquals(CircuitBreaker.State.RESUME_DETECTIVE, cb.stateRef.get());
Assertions.assertFalse(permit);
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.CLOSED, currentTimeWindow, flowStat); // mock probe request success
currentTimeWindow = flowStat.currentTimeSlotId();
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertEquals(CircuitBreaker.State.CLOSED, cb.stateRef.get());
Assertions.assertTrue(permit);
}
@Test
void gradualResumeTest() throws InterruptedException {
FlowStat flowStat = new FlowStat(circuitBreakManager);
flowStat.cleanResource = false;
flowStat.createTimeSlotOnlyTraffic = false;
long currentTimeWindow = flowStat.currentTimeSlotId();
MockServerHttpRequest mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
MockServerWebExchange mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
String service = "xservice";
String path = "ypath";
CircuitBreaker cb = new CircuitBreaker();
cb.service = service;
cb.path = path;
cb.resource = ResourceIdUtils.buildResourceId(null, null, null, service, path);
cb.breakStrategy = CircuitBreaker.BreakStrategy.TOTAL_ERRORS;
cb.monitorDuration = 5 * 1000;
cb.minRequests = 100;
cb.totalErrorThreshold = 10;
cb.breakDuration = 2 * 1000;
cb.resumeStrategy = CircuitBreaker.ResumeStrategy.GRADUAL;
cb.resumeDuration = 3 * 1000;
cb.stateStartTime = currentTimeWindow;
cb.initGradualResumeTimeWindowContext();
Map<String, CircuitBreaker> circuitBreakerMap = circuitBreakManager.getResource2circuitBreakerMap();
circuitBreakerMap.put(cb.resource, cb);
ResourceStat resourceStat = flowStat.getResourceStat(cb.resource);
TimeSlot timeSlot = resourceStat.getTimeSlot(currentTimeWindow);
timeSlot.setCompReqs(200);
timeSlot.setErrors(11);
boolean permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertFalse(permit);
Assertions.assertEquals(CircuitBreaker.State.OPEN, cb.stateRef.get());
Thread.sleep(3000);
Assertions.assertEquals(CircuitBreaker.State.RESUME_GRADUALLY, cb.stateRef.get());
currentTimeWindow = flowStat.currentTimeSlotId();
mockServerHttpRequest = MockServerHttpRequest.get("/xxx").build();
mockServerWebExchange = MockServerWebExchange.from(mockServerHttpRequest);
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
Assertions.assertTrue(permit);
for (int i = 0; i < 68; i++) {
permit = circuitBreakManager.permit(mockServerWebExchange, currentTimeWindow, flowStat, service, path);
}
Assertions.assertFalse(permit);
}
} }

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.2</version> <version>2.7.3</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId> <groupId>com.fizzgate</groupId>
<version>2.7.2</version> <version>2.7.3</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -6,11 +6,11 @@
<properties> <properties>
<!--<java.version>1.8</java.version>--> <!--<java.version>1.8</java.version>-->
<spring-boot.version>2.2.13.RELEASE</spring-boot.version> <spring-boot.version>2.2.13.RELEASE</spring-boot.version>
<spring-framework.version>5.2.23.RELEASE</spring-framework.version> <spring-framework.version>5.2.24.RELEASE</spring-framework.version>
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version> <reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version> <lettuce.version>5.3.7.RELEASE</lettuce.version>
<nacos.cloud.version>2.2.7.RELEASE</nacos.cloud.version> <nacos.cloud.version>2.2.7.RELEASE</nacos.cloud.version>
<netty.version>4.1.91.Final</netty.version> <netty.version>4.1.93.Final</netty.version>
<httpcore.version>4.4.16</httpcore.version> <httpcore.version>4.4.16</httpcore.version>
<log4j2.version>2.17.2</log4j2.version> <log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version> <slf4j.version>1.7.36</slf4j.version>
@@ -22,7 +22,7 @@
<r2dbc-mysql.version>0.8.2</r2dbc-mysql.version> <r2dbc-mysql.version>0.8.2</r2dbc-mysql.version>
<reflections.version>0.9.11</reflections.version> <reflections.version>0.9.11</reflections.version>
<commons-pool2.version>2.11.1</commons-pool2.version> <commons-pool2.version>2.11.1</commons-pool2.version>
<netty-tcnative.version>2.0.59.Final</netty-tcnative.version> <netty-tcnative.version>2.0.61.Final</netty-tcnative.version>
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version> <spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
<snakeyaml.version>1.33</snakeyaml.version> <snakeyaml.version>1.33</snakeyaml.version>
<spring-data-releasetrain.version>Moore-SR13</spring-data-releasetrain.version> <spring-data-releasetrain.version>Moore-SR13</spring-data-releasetrain.version>
@@ -38,7 +38,7 @@
<artifactId>fizz-gateway-community</artifactId> <artifactId>fizz-gateway-community</artifactId>
<name>${project.artifactId}</name> <name>${project.artifactId}</name>
<description>fizz gateway community</description> <description>fizz gateway community</description>
<version>2.7.2</version> <version>2.7.3</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<modules> <modules>
<module>fizz-common</module> <module>fizz-common</module>