diff --git a/README.en-us.md b/README.en-us.md
index 4c551cd..8246652 100644
--- a/README.en-us.md
+++ b/README.en-us.md
@@ -90,6 +90,7 @@ Starting from v1.3.0, the frontend and backend of the management backend are mer
| Fizz-gateway-community | Fizz-manager-professional |
| ---------------------- | ------------------------- |
| v1.3.0 | v1.3.0 |
+| v1.4.0 | v1.4.0 |
Please download the corresponding management backend version according to the version of the community version
@@ -189,6 +190,8 @@ Fizz官方技术交流③群:512164278
## System screenshot
+
+


@@ -197,4 +200,4 @@ Fizz官方技术交流③群:512164278

-
+
diff --git a/README.md b/README.md
index eaea8a6..8b3b148 100644
--- a/README.md
+++ b/README.md
@@ -89,6 +89,7 @@ API地址:http://demo.fizzgate.com/proxy/[服务名]/[API Path]
| Fizz-gateway-community | Fizz-manager-professional |
| ---------------------- | ------------------------- |
| v1.3.0 | v1.3.0 |
+| v1.4.0 | v1.4.0 |
请根据社区版的版本下载对应的管理后台版本
@@ -188,6 +189,8 @@ Fizz官方技术交流③群:512164278
## 系统截图
+
+


@@ -196,4 +199,4 @@ Fizz官方技术交流③群:512164278

-
+
diff --git a/pom.xml b/pom.xml
index 6f2a81c..ad3f7a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
we
fizz-gateway-community
- 1.3.0
+ 1.4.0
fizz-gateway-community
@@ -35,10 +35,10 @@
1.8
5.2.12.RELEASE
- Dysprosium-SR15
- 5.3.5.RELEASE
+ Dysprosium-SR16
+ 5.3.6.RELEASE
0.2.7
- 4.1.56.Final
+ 4.1.58.Final
4.4.14
2.13.3
@@ -177,6 +177,18 @@
org.springframework.boot
spring-boot-starter-data-redis-reactive
+
+ it.ozimov
+ embedded-redis
+ 0.7.3
+ test
+
+
+ org.slf4j
+ slf4j-simple
+
+
+
org.noear
diff --git a/src/main/java/we/FizzAppContext.java b/src/main/java/we/FizzAppContext.java
index f9647a7..e8f48a1 100644
--- a/src/main/java/we/FizzAppContext.java
+++ b/src/main/java/we/FizzAppContext.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
package we;
import org.springframework.context.ConfigurableApplicationContext;
diff --git a/src/main/java/we/config/FlowControlConfig.java b/src/main/java/we/config/FlowControlConfig.java
new file mode 100644
index 0000000..4a087d8
--- /dev/null
+++ b/src/main/java/we/config/FlowControlConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.config;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import we.stats.FlowStat;
+
+/**
+ * @author hongqiaowei
+ */
+
+@ConditionalOnProperty(name = "flowControl", havingValue = "true")
+@Configuration
+public class FlowControlConfig {
+
+ @Bean
+ public FlowStat flowStat() {
+ return new FlowStat();
+ }
+}
diff --git a/src/main/java/we/config/FlowStatSchedConfig.java b/src/main/java/we/config/FlowStatSchedConfig.java
new file mode 100644
index 0000000..a38cb97
--- /dev/null
+++ b/src/main/java/we/config/FlowStatSchedConfig.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright (C) 2021 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.config;
+
+import com.alibaba.nacos.api.config.annotation.NacosValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import we.flume.clients.log4j2appender.LogService;
+import we.stats.FlowStat;
+import we.stats.ResourceTimeWindowStat;
+import we.stats.TimeWindowStat;
+import we.stats.ratelimit.ResourceRateLimitConfig;
+import we.stats.ratelimit.ResourceRateLimitConfigService;
+import we.util.Constants;
+import we.util.DateTimeUtils;
+import we.util.NetworkUtils;
+import we.util.ThreadContext;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author hongqiaowei
+ */
+
+@Configuration
+@EnableScheduling
+public class FlowStatSchedConfig extends SchedConfig {
+
+ private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
+
+ private static final String _ip = "\"ip\":";
+ private static final String _id = "\"id\":";
+ private static final String _resource = "\"resource\":";
+ private static final String _type = "\"type\":";
+ private static final String _start = "\"start\":";
+ private static final String _reqs = "\"reqs\":";
+ private static final String _completeReqs = "\"completeReqs\":";
+ private static final String _peakConcurrents = "\"peakConcurrents\":";
+ private static final String _reqPerSec = "\"reqPerSec\":";
+ private static final String _blockReqs = "\"blockReqs\":";
+ private static final String _totalBlockReqs = "\"totalBlockReqs\":";
+ private static final String _errors = "\"errors\":";
+ private static final String _avgRespTime = "\"avgRespTime\":";
+ private static final String _minRespTime = "\"minRespTime\":";
+ private static final String _maxRespTime = "\"maxRespTime\":";
+
+ @NacosValue(value = "${flowControl:false}", autoRefreshed = true)
+ @Value("${flowControl:false}")
+ private boolean flowControl;
+
+ @Resource
+ private FlowStat flowStat;
+
+ @Resource
+ private ResourceRateLimitConfigService resourceRateLimitConfigService;
+
+ @NacosValue(value = "${flow-stat-sched.dest:redis}", autoRefreshed = true)
+ @Value("${flow-stat-sched.dest:redis}")
+ private String dest;
+
+ @NacosValue(value = "${flow-stat-sched.queue:fizz_resource_access_stat}", autoRefreshed = true)
+ @Value("${flow-stat-sched.queue:fizz_resource_access_stat}")
+ private String queue;
+
+ @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
+ private ReactiveStringRedisTemplate rt;
+
+ private final String ip = NetworkUtils.getServerIp();
+
+ private long startTimeSlot = 0;
+
+ private Map key2totalBlockMap = new HashMap<>();
+
+ @Scheduled(cron = "${flow-stat-sched.cron}")
+ public void sched() {
+
+ if (!flowControl) {
+ return;
+ }
+ if (startTimeSlot == 0) {
+ startTimeSlot = getRecentEndTimeSlot(flowStat);
+ return;
+ }
+ long st = System.currentTimeMillis();
+ long recentEndTimeSlot = getRecentEndTimeSlot(flowStat);
+ List resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, recentEndTimeSlot, 10);
+ if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {
+ log.info(toDP19(startTimeSlot) + " - " + toDP19(recentEndTimeSlot) + " no flow stat data");
+ return;
+ }
+
+ key2totalBlockMap.clear();
+ resourceTimeWindowStats.forEach(rtws -> {
+ List wins = rtws.getWindows();
+ wins.forEach(w -> {
+ AtomicLong totalBlock = key2totalBlockMap.computeIfAbsent(String.format("%s%s",
+ ResourceRateLimitConfig.GLOBAL, w.getStartTime()), key -> new AtomicLong(0));
+ totalBlock.addAndGet(w.getBlockRequests());
+ });
+ });
+
+ resourceTimeWindowStats.forEach(
+ rtws -> {
+ String resource = rtws.getResourceId();
+ ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
+ int id = (config == null ? 0 : config.id);
+ int type;
+ if (ResourceRateLimitConfig.GLOBAL.equals(resource)) {
+ type = ResourceRateLimitConfig.Type.GLOBAL;
+ } else if (resource.charAt(0) == '/') {
+ type = ResourceRateLimitConfig.Type.API;
+ } else {
+ type = ResourceRateLimitConfig.Type.SERVICE;
+ }
+ List wins = rtws.getWindows();
+ wins.forEach(
+ w -> {
+ StringBuilder b = ThreadContext.getStringBuilder();
+ Long winStart = w.getStartTime();
+ BigDecimal rps = w.getRps();
+ double qps;
+ if (rps == null) {
+ qps = 0.00;
+ } else {
+ qps = rps.doubleValue();
+ }
+
+ AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart));
+ Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests();
+
+ b.append(Constants.Symbol.LEFT_BRACE);
+ b.append(_ip); toJsonStringValue(b, ip); b.append(Constants.Symbol.COMMA);
+ b.append(_id); b.append(id); b.append(Constants.Symbol.COMMA);
+ b.append(_resource); toJsonStringValue(b, resource); b.append(Constants.Symbol.COMMA);
+ b.append(_type); b.append(type); b.append(Constants.Symbol.COMMA);
+ b.append(_start); b.append(winStart); b.append(Constants.Symbol.COMMA);
+ b.append(_reqs); b.append(w.getTotal()); b.append(Constants.Symbol.COMMA);
+ b.append(_completeReqs); b.append(w.getCompReqs()); b.append(Constants.Symbol.COMMA);
+ b.append(_peakConcurrents); b.append(w.getPeakConcurrentReqeusts()); b.append(Constants.Symbol.COMMA);
+ b.append(_reqPerSec); b.append(qps); b.append(Constants.Symbol.COMMA);
+ b.append(_blockReqs); b.append(w.getBlockRequests()); b.append(Constants.Symbol.COMMA);
+ b.append(_totalBlockReqs); b.append(totalBlockReqs); b.append(Constants.Symbol.COMMA);
+ b.append(_errors); b.append(w.getErrors()); b.append(Constants.Symbol.COMMA);
+ b.append(_avgRespTime); b.append(w.getAvgRt()); b.append(Constants.Symbol.COMMA);
+ b.append(_maxRespTime); b.append(w.getMax()); b.append(Constants.Symbol.COMMA);
+ b.append(_minRespTime); b.append(w.getMin());
+ b.append(Constants.Symbol.RIGHT_BRACE);
+ String msg = b.toString();
+ if ("kafka".equals(dest)) { // for internal use
+ log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
+ } else {
+ rt.convertAndSend(queue, msg).subscribe();
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("report " + toDP19(winStart) + " win10: " + msg);
+ }
+ }
+ );
+ }
+ );
+
+ startTimeSlot = recentEndTimeSlot;
+ log.info(toDP23(st) + " fss " + toDP23(System.currentTimeMillis()));
+ }
+
+ private long getRecentEndTimeSlot(FlowStat flowStat) {
+ long currentTimeSlot = flowStat.currentTimeSlotId();
+ int second = DateTimeUtils.from(currentTimeSlot).getSecond();
+ long interval;
+ if (second > 49) {
+ interval = second - 50;
+ } else if (second > 39) {
+ interval = second - 40;
+ } else if (second > 29) {
+ interval = second - 30;
+ } else if (second > 19) {
+ interval = second - 20;
+ } else if (second > 9) {
+ interval = second - 10;
+ } else if (second > 0) {
+ interval = second - 0;
+ } else {
+ interval = 0;
+ }
+ long recentEndTimeSlot = currentTimeSlot - interval * 1000;
+ return recentEndTimeSlot;
+ }
+
+ private String toDP19(long startTimeSlot) {
+ return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
+ }
+
+ private String toDP23(long startTimeSlot) {
+ return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP23);
+ }
+
+ private static void toJsonStringValue(StringBuilder b, String value) {
+ b.append(Constants.Symbol.DOUBLE_QUOTE).append(value).append(Constants.Symbol.DOUBLE_QUOTE);
+ }
+}
diff --git a/src/main/java/we/config/SchedConfig.java b/src/main/java/we/config/SchedConfig.java
new file mode 100644
index 0000000..80e0e8d
--- /dev/null
+++ b/src/main/java/we/config/SchedConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.config;
+
+import java.util.Date;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.Trigger;
+import org.springframework.scheduling.TriggerContext;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+/**
+ * @author hongqiaowei
+ */
+
+@ConfigurationProperties(prefix = "sched")
+public abstract class SchedConfig implements SchedulingConfigurer {
+
+ private int executors = 1;
+
+ public void setExecutors(int es) {
+ executors = es;
+ }
+
+ @Override
+ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+ taskRegistrar.setScheduler(taskScheduler());
+ taskRegistrar.addTriggerTask(new Runnable() {
+ public void run() {
+ }
+ }, new Trigger() {
+ @Override
+ public Date nextExecutionTime(TriggerContext triggerContext) {
+ return null;
+ }
+ });
+ }
+
+ @Bean(destroyMethod = "shutdown")
+ public Executor taskScheduler() {
+ return Executors.newScheduledThreadPool(executors);
+ }
+}
diff --git a/src/main/java/we/controller/HealthController.java b/src/main/java/we/controller/CacheCheckController.java
similarity index 83%
rename from src/main/java/we/controller/HealthController.java
rename to src/main/java/we/controller/CacheCheckController.java
index 8e1c3ef..075b94e 100644
--- a/src/main/java/we/controller/HealthController.java
+++ b/src/main/java/we/controller/CacheCheckController.java
@@ -32,6 +32,7 @@ import reactor.core.publisher.Mono;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AppService;
import we.plugin.auth.GatewayGroupService;
+import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.JacksonUtils;
import javax.annotation.Resource;
@@ -41,7 +42,8 @@ import javax.annotation.Resource;
*/
@RestController
-public class HealthController {
+@RequestMapping("/admin/cache")
+public class CacheCheckController {
@Resource
private GatewayGroupService gatewayGroupService;
@@ -52,12 +54,8 @@ public class HealthController {
@Resource
private ApiConfigService apiConfigService;
- // add by hongqiaowei
- @GetMapping("/sysgc")
- public Mono sysgc(ServerWebExchange exchange) throws Exception {
- System.gc();
- return Mono.just("sysgc done");
- }
+ @Resource
+ private ResourceRateLimitConfigService resourceRateLimitConfigService;
@GetMapping("/gatewayGroups")
public Mono gatewayGroups(ServerWebExchange exchange) throws Exception {
@@ -78,4 +76,9 @@ public class HealthController {
public Mono apiConfigs(ServerWebExchange exchange) throws Exception {
return Mono.just(JacksonUtils.writeValueAsString(apiConfigService.serviceConfigMap));
}
+
+ @GetMapping("/resourceRateLimitConfigs")
+ public Mono resourceRateLimitConfigs(ServerWebExchange exchange) throws Exception {
+ return Mono.just(JacksonUtils.writeValueAsString(resourceRateLimitConfigService.getResourceRateLimitConfigMap()));
+ }
}
diff --git a/src/main/java/we/controller/FlowControlController.java b/src/main/java/we/controller/FlowControlController.java
new file mode 100644
index 0000000..b25563d
--- /dev/null
+++ b/src/main/java/we/controller/FlowControlController.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.controller;
+
+import com.alibaba.nacos.api.config.annotation.NacosValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+import we.stats.FlowStat;
+import we.stats.ResourceTimeWindowStat;
+import we.stats.TimeWindowStat;
+import we.stats.ratelimit.ResourceRateLimitConfig;
+import we.util.Constants;
+import we.util.DateTimeUtils;
+import we.util.JacksonUtils;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author hongqiaowei
+ */
+
+@RestController
+@RequestMapping("/admin/flowStat")
+public class FlowControlController {
+
+ private static final Logger log = LoggerFactory.getLogger(FlowControlController.class);
+
+ @NacosValue(value = "${flowControl:false}", autoRefreshed = true)
+ @Value("${flowControl:false}")
+ private boolean flowControl;
+
+ @Resource
+ private FlowStat flowStat;
+
+ @GetMapping("/globalConcurrentsRps")
+ public Mono globalConcurrentsRps(ServerWebExchange exchange, @RequestParam(value = "recent", required = false, defaultValue = "3") int recent) {
+
+ long concurrents = 0;
+ double rps = 0;
+ Map result = new HashMap<>();
+ result.put("concurrents", concurrents);
+ result.put("rps", rps);
+
+ if (flowControl) {
+ try {
+ long currentTimeSlot = flowStat.currentTimeSlotId();
+ long startTimeSlot = currentTimeSlot - recent * 1000;
+ TimeWindowStat timeWindowStat = null;
+ List wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent);
+ if (wins == null || wins.isEmpty()) {
+ result.put("rps", 0);
+ } else {
+ concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL);
+ result.put("concurrents", concurrents);
+ timeWindowStat = wins.get(0).getWindows().get(0);
+ BigDecimal winrps = timeWindowStat.getRps();
+ if (winrps == null) {
+ rps = 0;
+ } else {
+ rps = winrps.doubleValue();
+ }
+ result.put("rps", rps);
+ }
+ if (log.isDebugEnabled()) {
+ long compReqs = -1;
+ if (timeWindowStat != null) {
+ compReqs = timeWindowStat.getCompReqs();
+ }
+ log.debug(toDP19(startTimeSlot) + " - " + toDP19(currentTimeSlot) + " result: " + JacksonUtils.writeValueAsString(result) + ", complete reqs: " + compReqs);
+ }
+
+ } catch (Throwable t) {
+ log.error("get current global concurrents and rps error", t);
+ }
+ }
+
+ return Mono.just(JacksonUtils.writeValueAsString(result));
+ }
+
+ private String toDP19(long startTimeSlot) {
+ return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
+ }
+}
diff --git a/src/main/java/we/filter/AbsFlowControlFilter.java b/src/main/java/we/filter/AbsFlowControlFilter.java
new file mode 100644
index 0000000..4a86dc3
--- /dev/null
+++ b/src/main/java/we/filter/AbsFlowControlFilter.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.filter;
+
+import com.alibaba.nacos.api.config.annotation.NacosValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+import we.flume.clients.log4j2appender.LogService;
+import we.stats.FlowStat;
+import we.stats.ratelimit.ResourceRateLimitConfig;
+import we.stats.ratelimit.ResourceRateLimitConfigService;
+import we.util.Constants;
+import we.util.ThreadContext;
+import we.util.WebUtils;
+
+import javax.annotation.Resource;
+
+/**
+ * @author hongqiaowei
+ */
+
+public abstract class AbsFlowControlFilter extends ProxyAggrFilter {
+
+ protected static final Logger log = LoggerFactory.getLogger(AbsFlowControlFilter.class);
+
+ protected static final String exceed = " exceed ";
+ protected static final String concurrents = " concurrents ";
+ protected static final String orQps = " or qps ";
+ protected static final String currentTimeSlot = "currentTimeSlot";
+ protected static final String start = "start";
+
+ @NacosValue(value = "${flowControl:false}", autoRefreshed = true)
+ @Value("${flowControl:false}")
+ protected boolean flowControl;
+
+ @Resource
+ protected ResourceRateLimitConfigService resourceRateLimitConfigService;
+
+ @Resource
+ protected FlowStat flowStat;
+
+ protected Mono generateExceedResponse(ServerWebExchange exchange, ResourceRateLimitConfig config) {
+ StringBuilder b = ThreadContext.getStringBuilder();
+ b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
+ b.append(exceed) .append(config.resource) .append(concurrents) .append(config.concurrents).append(orQps).append(config.qps);
+ log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
+
+ ResourceRateLimitConfig globalConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
+ ServerHttpResponse resp = exchange.getResponse();
+ resp.setStatusCode(HttpStatus.OK);
+ resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
+ return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
+ }
+
+ protected void inTheEnd(ServerWebExchange exchange, String resource, long start, long currentTimeSlot, boolean success) {
+ long spend = System.currentTimeMillis() - start;
+ flowStat.decrConcurrentRequest(resource, currentTimeSlot);
+ flowStat.addRequestRT(resource, currentTimeSlot, spend, success);
+ }
+}
diff --git a/src/main/java/we/filter/FilterExceptionHandlerConfig.java b/src/main/java/we/filter/FilterExceptionHandlerConfig.java
index a714090..6342a15 100644
--- a/src/main/java/we/filter/FilterExceptionHandlerConfig.java
+++ b/src/main/java/we/filter/FilterExceptionHandlerConfig.java
@@ -17,8 +17,8 @@
package we.filter;
-import java.net.URI;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
@@ -32,10 +32,14 @@ import reactor.core.publisher.Mono;
import we.exception.ExecuteScriptException;
import we.exception.RedirectException;
import we.exception.StopAndResponseException;
+import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.util.JacksonUtils;
+import we.util.ThreadContext;
import we.util.WebUtils;
+import java.net.URI;
+
/**
* @author hongqiaowei
*/
@@ -44,13 +48,14 @@ import we.util.WebUtils;
public class FilterExceptionHandlerConfig {
public static class FilterExceptionHandler implements WebExceptionHandler {
+ private static final Logger log = LoggerFactory.getLogger(FilterExceptionHandler.class);
private static final String filterExceptionHandler = "filterExceptionHandler";
@Override
public Mono handle(ServerWebExchange exchange, Throwable t) {
+ ServerHttpResponse resp = exchange.getResponse();
if (t instanceof StopAndResponseException) {
StopAndResponseException ex = (StopAndResponseException) t;
if (ex.getData() != null) {
- ServerHttpResponse resp = exchange.getResponse();
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(ex.getData().toString().getBytes())));
}
@@ -58,7 +63,6 @@ public class FilterExceptionHandlerConfig {
if (t instanceof RedirectException) {
RedirectException ex = (RedirectException) t;
if (ex.getRedirectUrl() != null) {
- ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.MOVED_PERMANENTLY);
resp.getHeaders().setLocation(URI.create(ex.getRedirectUrl()));
return Mono.empty();
@@ -66,7 +70,6 @@ public class FilterExceptionHandlerConfig {
}
if (t instanceof ExecuteScriptException) {
ExecuteScriptException ex = (ExecuteScriptException) t;
- ServerHttpResponse resp = exchange.getResponse();
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
String reqId = exchange.getRequest().getId();
@@ -78,13 +81,23 @@ public class FilterExceptionHandlerConfig {
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rs.toString().getBytes())));
}
}
- Mono vm = WebUtils.responseError(exchange, filterExceptionHandler, HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), t);
+ Mono vm;
+ Object fc = exchange.getAttributes().get(WebUtils.FILTER_CONTEXT);
+ if (fc == null) { // t came from flow control filter
+ StringBuilder b = ThreadContext.getStringBuilder();
+ WebUtils.request2stringBuilder(exchange, b);
+ log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
+ String s = RespEntity.toJson(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), exchange.getRequest().getId());
+ vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes())));
+ } else {
+ vm = WebUtils.responseError(exchange, filterExceptionHandler, HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), t);
+ }
return vm;
}
}
@Bean
- @Order(-2)
+ @Order(-10)
public FilterExceptionHandler filterExceptionHandler() {
return new FilterExceptionHandler();
}
diff --git a/src/main/java/we/filter/FlowControlFilter.java b/src/main/java/we/filter/FlowControlFilter.java
new file mode 100644
index 0000000..de2e6d7
--- /dev/null
+++ b/src/main/java/we/filter/FlowControlFilter.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.Resource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.annotation.Order;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.stereotype.Component;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilterChain;
+
+import com.alibaba.nacos.api.config.annotation.NacosValue;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
+import we.flume.clients.log4j2appender.LogService;
+import we.stats.BlockType;
+import we.stats.FlowStat;
+import we.stats.IncrRequestResult;
+import we.stats.ResourceConfig;
+import we.stats.ratelimit.ResourceRateLimitConfig;
+import we.stats.ratelimit.ResourceRateLimitConfigService;
+import we.util.WebUtils;
+
+/**
+ * @author hongqiaowei
+ */
+
+@Component(FlowControlFilter.FLOW_CONTROL_FILTER)
+@Order(-1)
+public class FlowControlFilter extends ProxyAggrFilter {
+
+ public static final String FLOW_CONTROL_FILTER = "flowControlFilter";
+
+ private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class);
+
+ @NacosValue(value = "${flowControl:false}", autoRefreshed = true)
+ @Value("${flowControl:false}")
+ private boolean flowControl;
+
+ @Resource
+ private ResourceRateLimitConfigService resourceRateLimitConfigService;
+
+ @Resource
+ private FlowStat flowStat;
+
+ @Override
+ public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) {
+
+ if (flowControl) {
+ String service = WebUtils.getClientService(exchange);
+// String reqPath = WebUtils.getClientReqPath(exchange);
+ long currentTimeSlot = flowStat.currentTimeSlotId();
+ ResourceRateLimitConfig globalConfig = resourceRateLimitConfigService
+ .getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
+ ResourceRateLimitConfig serviceConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
+ if (serviceConfig == null) {
+ serviceConfig = resourceRateLimitConfigService
+ .getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
+ }
+
+ // global
+ List resourceConfigs = new ArrayList<>();
+ ResourceConfig globalResCfg = new ResourceConfig(ResourceRateLimitConfig.GLOBAL, 0, 0);
+ if (globalConfig != null && globalConfig.isEnable()) {
+ globalResCfg.setMaxCon(globalConfig.concurrents);
+ globalResCfg.setMaxQPS(globalConfig.qps);
+ }
+ resourceConfigs.add(globalResCfg);
+
+ // service
+ ResourceConfig serviceResCfg = new ResourceConfig(service, 0, 0);
+ if (serviceConfig != null && serviceConfig.isEnable()) {
+ serviceResCfg.setMaxCon(serviceConfig.concurrents);
+ serviceResCfg.setMaxQPS(serviceConfig.qps);
+ }
+ resourceConfigs.add(serviceResCfg);
+
+ IncrRequestResult result = flowStat.incrRequest(resourceConfigs, currentTimeSlot);
+
+ if (result != null && !result.isSuccess()) {
+ if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
+ log.info("exceed {} flow limit, blocked by maximum concurrent requests",
+ result.getBlockedResourceId(), LogService.BIZ_ID, exchange.getRequest().getId());
+ } else {
+ log.info("exceed {} flow limit, blocked by maximum QPS", result.getBlockedResourceId(),
+ LogService.BIZ_ID, exchange.getRequest().getId());
+ }
+
+// ResourceRateLimitConfig config = result.getBlockedResourceId().equals(globalConfig.resource)
+// ? globalConfig
+// : serviceConfig;
+
+ ServerHttpResponse resp = exchange.getResponse();
+ resp.setStatusCode(HttpStatus.OK);
+ resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
+ return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
+ } else {
+ long start = System.currentTimeMillis();
+ return chain.filter(exchange).doFinally(s -> {
+ long rt = System.currentTimeMillis() - start;
+ if (s == SignalType.ON_COMPLETE) {
+ flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true);
+ } else {
+ flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false);
+ }
+ });
+ }
+ }
+
+ return chain.filter(exchange);
+ }
+}
diff --git a/src/main/java/we/filter/GlobalFlowControlFilter.java b/src/main/java/we/filter/GlobalFlowControlFilter.java
new file mode 100644
index 0000000..4710100
--- /dev/null
+++ b/src/main/java/we/filter/GlobalFlowControlFilter.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.filter;
+
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilterChain;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
+import we.flume.clients.log4j2appender.LogService;
+import we.stats.ratelimit.ResourceRateLimitConfig;
+import we.util.JacksonUtils;
+import we.util.WebUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author hongqiaowei
+ */
+
+//@Component(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
+//@Order(-4)
+public class GlobalFlowControlFilter extends AbsFlowControlFilter {
+
+ public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter";
+
+ @Override
+ public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) {
+
+ if (flowControl) {
+
+ // Map traceMap = new HashMap<>();
+ LogService.setBizId(exchange.getRequest().getId());
+ long currentTimeSlot = flowStat.currentTimeSlotId();
+ // traceMap.put("currentTimeSlot", currentTimeSlot);
+
+ exchange.getAttributes().put(AbsFlowControlFilter.currentTimeSlot, currentTimeSlot);
+ ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
+ if (config.isEnable()) {
+ // traceMap.put("globalConfig", "enable conns " + config.concurrents + " and incr now");
+ boolean concurrentOrRpsExceed = !flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, config.concurrents, config.qps);
+ if (concurrentOrRpsExceed) {
+ // traceMap.put("globalConfigExceed", "true");
+ return generateExceedResponse(exchange, config);
+ }
+ } else {
+ // traceMap.put("noGlobalConfig", "incr now");
+ flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null);
+ }
+
+ // if (log.isDebugEnabled()) {
+ // log.debug(JacksonUtils.writeValueAsString(traceMap), LogService.BIZ_ID, exchange.getRequest().getId());
+ // }
+ // StringBuilder b = new StringBuilder();
+ // WebUtils.request2stringBuilder(exchange, b);
+ // b.append('\n');
+
+ long start = System.currentTimeMillis();
+ exchange.getAttributes().put(AbsFlowControlFilter.start, start);
+ return chain.filter(exchange)
+ // .doOnSuccess(
+ // r -> {
+ // // b.append(" succ ");
+ // // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true);
+ // }
+ // )
+ // .doOnError(
+ // t -> {
+ // // b.append(" errs ");
+ // // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
+ // }
+ // )
+ // .doOnCancel(
+ // () -> {
+ // // b.append(" cans ");
+ // // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
+ // }
+ // )
+ .doFinally(
+ s -> {
+ if (s == SignalType.ON_COMPLETE) {
+ // b.append(" comps ");
+ inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true);
+ } else {
+ // b.append(" " + s);
+ inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
+ }
+ // if (log.isDebugEnabled()) {
+ // log.debug(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
+ // }
+ }
+ );
+ }
+ return chain.filter(exchange);
+ }
+}
diff --git a/src/main/java/we/filter/PreFilter.java b/src/main/java/we/filter/PreFilter.java
index d2568c2..423d138 100644
--- a/src/main/java/we/filter/PreFilter.java
+++ b/src/main/java/we/filter/PreFilter.java
@@ -114,20 +114,20 @@ public class PreFilter extends ProxyAggrFilter {
}
private void afterAuth(ServerWebExchange exchange, ApiConfig ac) {
- String bs = null, bp = null;
+ String bs = null, bp;
if (ac == null) {
bs = WebUtils.getClientService(exchange);
bp = WebUtils.getClientReqPath(exchange);
} else {
if (ac.type != ApiConfig.Type.REVERSE_PROXY) {
bs = ac.backendService;
- bp = ac.transform(WebUtils.getClientReqPath(exchange));
}
+ bp = ac.transform(WebUtils.getClientReqPath(exchange));
}
if (bs != null) {
WebUtils.setBackendService(exchange, bs);
- WebUtils.setBackendPath(exchange, bp);
}
+ WebUtils.setBackendPath(exchange, bp);
}
private Mono chain(ServerWebExchange exchange, Mono m, PluginFilter pf) {
diff --git a/src/main/java/we/filter/ServiceFlowControlFilter.java b/src/main/java/we/filter/ServiceFlowControlFilter.java
new file mode 100644
index 0000000..55f15c8
--- /dev/null
+++ b/src/main/java/we/filter/ServiceFlowControlFilter.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.filter;
+
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilterChain;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
+import we.stats.ratelimit.ResourceRateLimitConfig;
+import we.util.WebUtils;
+
+/**
+ * @author hongqiaowei
+ */
+
+//@Component(ServiceFlowControlFilter.SERVICE_FLOW_CONTROL_FILTER)
+//@Order(-3)
+public class ServiceFlowControlFilter extends AbsFlowControlFilter {
+
+ public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter";
+
+ @Override
+ public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) {
+
+ if (flowControl) {
+ long currentTimeSlot = exchange.getAttribute(AbsFlowControlFilter.currentTimeSlot);
+ String service = WebUtils.getClientService(exchange);
+ ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
+
+ if (config == null) {
+ config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
+ }
+ if (config == null || !config.isEnable()) {
+ flowStat.incrRequest(service, currentTimeSlot, null, null);
+ } else {
+ boolean concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, config.concurrents, config.qps);
+ if (concurrentOrRpsExceed) {
+ return generateExceedResponse(exchange, config);
+ }
+ }
+
+ long start = exchange.getAttribute(AbsFlowControlFilter.start);
+ return chain.filter(exchange)
+ // .doOnSuccess(
+ // r -> {
+ // inTheEnd(exchange, service, start, currentTimeSlot, true);
+ // }
+ // )
+ // .doOnError(
+ // t -> {
+ // inTheEnd(exchange, service, start, currentTimeSlot, false);
+ // }
+ // )
+ // .doOnCancel(
+ // () -> {
+ // inTheEnd(exchange, service, start, currentTimeSlot, false);
+ // }
+ // )
+ .doFinally(
+ s -> {
+ if (s == SignalType.ON_COMPLETE) {
+ inTheEnd(exchange, service, start, currentTimeSlot, true);
+ } else {
+ inTheEnd(exchange, service, start, currentTimeSlot, false);
+ }
+ }
+ );
+ }
+ return chain.filter(exchange);
+ }
+}
diff --git a/src/main/java/we/plugin/auth/GatewayGroup2appsToApiConfig.java b/src/main/java/we/plugin/auth/GatewayGroup2apiConfig.java
similarity index 97%
rename from src/main/java/we/plugin/auth/GatewayGroup2appsToApiConfig.java
rename to src/main/java/we/plugin/auth/GatewayGroup2apiConfig.java
index 504f503..5bbb085 100644
--- a/src/main/java/we/plugin/auth/GatewayGroup2appsToApiConfig.java
+++ b/src/main/java/we/plugin/auth/GatewayGroup2apiConfig.java
@@ -28,9 +28,9 @@ import java.util.Map;
* @author hongqiaowei
*/
-public class GatewayGroup2appsToApiConfig {
+public class GatewayGroup2apiConfig {
- private static final Logger log = LoggerFactory.getLogger(GatewayGroup2appsToApiConfig.class);
+ private static final Logger log = LoggerFactory.getLogger(GatewayGroup2apiConfig.class);
private Map> configMap = new HashMap<>(6);
diff --git a/src/main/java/we/plugin/auth/GatewayGroupService.java b/src/main/java/we/plugin/auth/GatewayGroupService.java
index 91d0002..2fb2034 100644
--- a/src/main/java/we/plugin/auth/GatewayGroupService.java
+++ b/src/main/java/we/plugin/auth/GatewayGroupService.java
@@ -34,6 +34,8 @@ import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* @author hongqiaowei
@@ -52,7 +54,7 @@ public class GatewayGroupService {
private Map oldGatewayGroupMap = new HashMap<>(6);
- public Set currentGatewayGroupSet = new HashSet<>(6);
+ public Set currentGatewayGroupSet = Stream.of(GatewayGroup.DEFAULT).collect(Collectors.toSet());
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt;
diff --git a/src/main/java/we/plugin/auth/ServiceConfig.java b/src/main/java/we/plugin/auth/ServiceConfig.java
index 27f22d0..f747a9e 100644
--- a/src/main/java/we/plugin/auth/ServiceConfig.java
+++ b/src/main/java/we/plugin/auth/ServiceConfig.java
@@ -46,7 +46,7 @@ public class ServiceConfig {
@JsonIgnore
public Map apiConfigMap = new HashMap<>(32);
- public Map> path2methodToApiConfigMapMap = new HashMap<>(6);
+ public Map> path2methodToApiConfigMapMap = new HashMap<>(6);
public ServiceConfig(String id) {
this.id = id;
@@ -54,36 +54,36 @@ public class ServiceConfig {
public void add(ApiConfig ac) {
apiConfigMap.put(ac.id, ac);
- EnumMap method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
+ EnumMap method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
if (method2apiConfigMap == null) {
method2apiConfigMap = new EnumMap<>(HttpMethod.class);
- GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = new GatewayGroup2appsToApiConfig();
- gatewayGroup2appsToApiConfig.add(ac);
- method2apiConfigMap.put(ac.method, gatewayGroup2appsToApiConfig);
+ GatewayGroup2apiConfig gatewayGroup2apiConfig = new GatewayGroup2apiConfig();
+ gatewayGroup2apiConfig.add(ac);
+ method2apiConfigMap.put(ac.method, gatewayGroup2apiConfig);
path2methodToApiConfigMapMap.put(ac.path, method2apiConfigMap);
} else {
- GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = method2apiConfigMap.get(ac.method);
- if (gatewayGroup2appsToApiConfig == null) {
- gatewayGroup2appsToApiConfig = new GatewayGroup2appsToApiConfig();
- method2apiConfigMap.put(ac.method, gatewayGroup2appsToApiConfig);
+ GatewayGroup2apiConfig gatewayGroup2apiConfig = method2apiConfigMap.get(ac.method);
+ if (gatewayGroup2apiConfig == null) {
+ gatewayGroup2apiConfig = new GatewayGroup2apiConfig();
+ method2apiConfigMap.put(ac.method, gatewayGroup2apiConfig);
}
- gatewayGroup2appsToApiConfig.add(ac);
+ gatewayGroup2apiConfig.add(ac);
}
log.info("add " + ac);
}
public void remove(ApiConfig ac) {
ApiConfig remove = apiConfigMap.remove(ac.id);
- Map method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
+ Map method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
if (method2apiConfigMap == null) {
log.info("no config to delete for " + ac.service + ' ' + ac.path);
} else {
- GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = method2apiConfigMap.get(ac.method);
- if (gatewayGroup2appsToApiConfig == null) {
+ GatewayGroup2apiConfig gatewayGroup2apiConfig = method2apiConfigMap.get(ac.method);
+ if (gatewayGroup2apiConfig == null) {
log.info("no config to delete for " + ac.service + ' ' + ac.method + ' ' + ac.path);
} else {
log.info(id + " remove " + ac);
- gatewayGroup2appsToApiConfig.remove(ac);
+ gatewayGroup2apiConfig.remove(ac);
}
}
}
@@ -91,22 +91,22 @@ public class ServiceConfig {
public void update(ApiConfig ac) {
ApiConfig prev = apiConfigMap.put(ac.id, ac);
log.info(prev + " is updated by " + ac + " in api config map");
- EnumMap method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
+ EnumMap method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
if (method2apiConfigMap == null) {
method2apiConfigMap = new EnumMap<>(HttpMethod.class);
- GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = new GatewayGroup2appsToApiConfig();
- gatewayGroup2appsToApiConfig.add(ac);
- method2apiConfigMap.put(ac.method, gatewayGroup2appsToApiConfig);
+ GatewayGroup2apiConfig gatewayGroup2apiConfig = new GatewayGroup2apiConfig();
+ gatewayGroup2apiConfig.add(ac);
+ method2apiConfigMap.put(ac.method, gatewayGroup2apiConfig);
path2methodToApiConfigMapMap.put(ac.path, method2apiConfigMap);
} else {
- GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = method2apiConfigMap.get(ac.method);
- if (gatewayGroup2appsToApiConfig == null) {
- gatewayGroup2appsToApiConfig = new GatewayGroup2appsToApiConfig();
- method2apiConfigMap.put(ac.method, gatewayGroup2appsToApiConfig);
- gatewayGroup2appsToApiConfig.add(ac);
+ GatewayGroup2apiConfig gatewayGroup2apiConfig = method2apiConfigMap.get(ac.method);
+ if (gatewayGroup2apiConfig == null) {
+ gatewayGroup2apiConfig = new GatewayGroup2apiConfig();
+ method2apiConfigMap.put(ac.method, gatewayGroup2apiConfig);
+ gatewayGroup2apiConfig.add(ac);
} else {
log.info(id + " update " + ac);
- gatewayGroup2appsToApiConfig.update(ac);
+ gatewayGroup2apiConfig.update(ac);
}
}
}
@@ -114,7 +114,7 @@ public class ServiceConfig {
@JsonIgnore
public ApiConfig getApiConfig(HttpMethod method, String path, String gatewayGroup, String app) {
// GatewayGroup2appsToApiConfig r = getApiConfig0(method, path);
- GatewayGroup2appsToApiConfig r = getApiConfig(method, path);
+ GatewayGroup2apiConfig r = getApiConfig(method, path);
if (r == null) {
return null;
}
@@ -124,19 +124,19 @@ public class ServiceConfig {
return r.get(gatewayGroup, app);
}
- private GatewayGroup2appsToApiConfig getApiConfig(HttpMethod method, String reqPath) {
+ private GatewayGroup2apiConfig getApiConfig(HttpMethod method, String reqPath) {
List matchPathPatterns = ThreadContext.getArrayList(mpps, String.class);
- Set>> es = path2methodToApiConfigMapMap.entrySet();
- for (Map.Entry> e : es) {
+ Set>> es = path2methodToApiConfigMapMap.entrySet();
+ for (Map.Entry> e : es) {
String pathPattern = e.getKey();
if (ApiConfig.isAntPathPattern(pathPattern)) {
if (antPathMatcher.match(pathPattern, reqPath)) {
matchPathPatterns.add(pathPattern);
}
} else if (reqPath.equals(pathPattern)) {
- return getGatewayGroup2appsToApiConfig(method, e.getValue());
+ return getGatewayGroup2apiConfig(method, e.getValue());
}
}
if (matchPathPatterns.isEmpty()) {
@@ -149,7 +149,7 @@ public class ServiceConfig {
"\nmatch patterns: " + matchPathPatterns +
"\nbest one: " + bestPattern);
}
- return getGatewayGroup2appsToApiConfig(method, path2methodToApiConfigMapMap.get(bestPattern));
+ return getGatewayGroup2apiConfig(method, path2methodToApiConfigMapMap.get(bestPattern));
}
}
@@ -174,8 +174,8 @@ public class ServiceConfig {
// }
// }
- private GatewayGroup2appsToApiConfig getGatewayGroup2appsToApiConfig(HttpMethod method, EnumMap method2apiConfigMap) {
- GatewayGroup2appsToApiConfig r = method2apiConfigMap.get(method);
+ private GatewayGroup2apiConfig getGatewayGroup2apiConfig(HttpMethod method, EnumMap method2apiConfigMap) {
+ GatewayGroup2apiConfig r = method2apiConfigMap.get(method);
if (r == null) {
return method2apiConfigMap.get(HttpMethod.X);
} else {
diff --git a/src/main/java/we/plugin/stat/StatPluginFilter.java b/src/main/java/we/plugin/stat/StatPluginFilter.java
index 9100a7c..ec1dc86 100644
--- a/src/main/java/we/plugin/stat/StatPluginFilter.java
+++ b/src/main/java/we/plugin/stat/StatPluginFilter.java
@@ -119,7 +119,7 @@ public class StatPluginFilter extends PluginFilter {
if (StringUtils.isBlank(fizzAccessStatTopic)) {
rt.convertAndSend(fizzAccessStatChannel, b.toString()).subscribe();
} else {
- log.info(b.toString(), LogService.HANDLE_STGY, LogService.toKF(fizzAccessStatTopic));
+ log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(fizzAccessStatTopic)); // for internal use
}
}
diff --git a/src/main/java/we/stats/BlockType.java b/src/main/java/we/stats/BlockType.java
new file mode 100644
index 0000000..70ce566
--- /dev/null
+++ b/src/main/java/we/stats/BlockType.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2021 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats;
+
+/**
+ *
+ * @author Francis Dong
+ *
+ */
+public enum BlockType {
+ /**
+ * Blocked by concurrent request rule
+ */
+ CONCURRENT_REQUEST,
+
+ /**
+ * Blocked by QPS
+ */
+ QPS;
+}
diff --git a/src/main/java/we/stats/FlowStat.java b/src/main/java/we/stats/FlowStat.java
new file mode 100644
index 0000000..3cc0180
--- /dev/null
+++ b/src/main/java/we/stats/FlowStat.java
@@ -0,0 +1,483 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import we.stats.BlockType;
+import we.util.Utils;
+
+/**
+ * Flow Statistic
+ *
+ * @author Francis Dong
+ *
+ */
+public class FlowStat {
+
+ private static final Logger log = LoggerFactory.getLogger(FlowStat.class);
+
+ /**
+ * Time slot interval in millisecond
+ */
+ public static long INTERVAL = 1000;
+
+ /**
+ * A string Resource ID as key
+ */
+ public ConcurrentMap resourceStats = new ConcurrentHashMap<>(100);
+
+ /**
+ * Retention time of statistic data
+ */
+ public static long RETENTION_TIME_IN_MINUTES = 10;
+
+ private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+ private Lock w = rwl.writeLock();
+
+ private ExecutorService pool = Executors.newFixedThreadPool(2);
+
+ public FlowStat() {
+ runScheduleJob();
+ }
+
+ private void runScheduleJob() {
+ pool.submit(new HousekeepJob(this));
+ pool.submit(new PeakConcurrentJob(this));
+ }
+
+ /**
+ * Update retention time
+ *
+ * @param retentionTimeInMinutes
+ */
+ public void updateRetentionTime(int retentionTimeInMinutes) {
+ RETENTION_TIME_IN_MINUTES = retentionTimeInMinutes;
+ }
+
+ /**
+ * Returns the current time slot ID
+ *
+ * @return
+ */
+ public long currentTimeSlotId() {
+ return (System.currentTimeMillis() / INTERVAL) * INTERVAL;
+ }
+
+ /**
+ * Returns the time slot ID of the specified time
+ *
+ * @param timeMilli
+ * @return
+ */
+ public long getTimeSlotId(long timeMilli) {
+ return (System.currentTimeMillis() / INTERVAL) * INTERVAL;
+ }
+
+ /**
+ * Increase concurrent request counter for given resources chain
+ *
+ * @param resourceConfigs Resource configurations
+ * @param curTimeSlotId current time slot ID, it should be generated by
+ * Flowstat.currentTimeSlotId()
+ * @return IncrRequestResult
+ */
+ public IncrRequestResult incrRequest(List resourceConfigs, long curTimeSlotId) {
+ if (resourceConfigs == null || resourceConfigs.size() == 0) {
+ return null;
+ }
+ w.lock();
+ try {
+ // check if exceed limit
+ for (ResourceConfig resourceConfig : resourceConfigs) {
+ long maxCon = resourceConfig.getMaxCon();
+ long maxQPS = resourceConfig.getMaxQPS();
+ if (maxCon > 0 || maxQPS > 0) {
+ ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
+ // check concurrent request
+ if (maxCon > 0) {
+ long n = resourceStat.getConcurrentRequests().get();
+ if (n >= maxCon) {
+ resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
+ return IncrRequestResult.block(resourceConfig.getResourceId(),
+ BlockType.CONCURRENT_REQUEST);
+ }
+ }
+
+ // check QPS
+ if (maxQPS > 0) {
+ long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter().get();
+ if (total >= maxQPS) {
+ resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
+ return IncrRequestResult.block(resourceConfig.getResourceId(), BlockType.QPS);
+ }
+ }
+ }
+ }
+
+ // increase request and concurrent request
+ for (ResourceConfig resourceConfig : resourceConfigs) {
+ ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
+ long cons = resourceStat.getConcurrentRequests().incrementAndGet();
+ resourceStat.getTimeSlot(curTimeSlotId).updatePeakConcurrentReqeusts(cons);
+ resourceStat.getTimeSlot(curTimeSlotId).incr();
+ }
+ return IncrRequestResult.success();
+ } finally {
+ w.unlock();
+ }
+ }
+
+ /**
+ * Add request RT and Decrease concurrent request for given resources chain
+ *
+ * @param resourceConfigs
+ * @param timeSlotId
+ * @param rt
+ * @param isSuccess
+ */
+ public void addRequestRT(List resourceConfigs, long timeSlotId, long rt, boolean isSuccess) {
+ if (resourceConfigs == null || resourceConfigs.size() == 0) {
+ return;
+ }
+ for (int i = resourceConfigs.size() - 1; i >= 0; i--) {
+ ResourceStat resourceStat = getResourceStat(resourceConfigs.get(i).getResourceId());
+ resourceStat.decrConcurrentRequest(timeSlotId);
+ resourceStat.addRequestRT(timeSlotId, rt, isSuccess);
+ }
+ }
+
+ /**
+ * Increase concurrent request counter of the specified resource
+ *
+ * @param resourceId Resource ID
+ * @param curTimeSlotId current time slot ID, it should be generated by
+ * Flowstat.currentTimeSlotId()
+ * @param maxCon Maximum concurrent request of the specified resource,
+ * null/zero/negative for no limit
+ * @param maxRPS Maximum RPS of the specified resource,
+ * null/zero/negative for no limit
+ * @return true if the request is not blocked; false if exceed the maximum
+ * concurrent request/RPS of the specified resource
+ */
+ public boolean incrRequest(String resourceId, long curTimeSlotId, Long maxCon, Long maxRPS) {
+ ResourceStat resourceStat = getResourceStat(resourceId);
+ boolean success = resourceStat.incrConcurrentRequest(curTimeSlotId, maxCon);
+ if (success) {
+ success = resourceStat.incrRequestToTimeSlot(curTimeSlotId, maxRPS);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(resourceId + " incr req for current time slot " + curTimeSlotId + " with max con " + maxCon
+ + " and max rps " + maxRPS);
+ }
+ return success;
+ }
+
+ /**
+ * Decrease concurrent request of the specified resource of the specified time
+ * slot
+ *
+ * @param resourceId Resource ID
+ * @param timeSlotId TimeSlot ID
+ * @return
+ */
+ public void decrConcurrentRequest(String resourceId, long timeSlotId) {
+ if (resourceId == null) {
+ return;
+ }
+ ResourceStat resourceStat = getResourceStat(resourceId);
+
+ long conns = resourceStat.getConcurrentRequests().get();
+ if (conns == 0) {
+ if (log.isDebugEnabled()) {
+ StringBuilder b = new StringBuilder(256);
+ b.append(timeSlotId + " " + resourceId + " conns 0 before decr it").append('\n');
+ Utils.threadCurrentStack2stringBuilder(b);
+ log.debug(b.toString());
+ }
+ }
+
+ resourceStat.decrConcurrentRequest(timeSlotId);
+ }
+
+ /**
+ * Add request RT to the specified time slot counter
+ *
+ * @param resourceId Resource ID
+ * @param timeSlotId TimeSlot ID
+ * @param rt Response time of request
+ * @param isSuccess Whether the request is success or not
+ * @return
+ */
+ public void addRequestRT(String resourceId, long timeSlotId, long rt, boolean isSuccess) {
+ if (resourceId == null) {
+ return;
+ }
+ ResourceStat resourceStat = getResourceStat(resourceId);
+ resourceStat.addRequestRT(timeSlotId, rt, isSuccess);
+ }
+
+ public ResourceStat getResourceStat(String resourceId) {
+ ResourceStat resourceStat = null;
+ if (resourceStats.containsKey(resourceId)) {
+ resourceStat = resourceStats.get(resourceId);
+ } else {
+ resourceStat = new ResourceStat(resourceId);
+ if (log.isDebugEnabled()) {
+ log.debug("no resource stat for " + resourceId + ", create one " + resourceStat);
+ }
+ ResourceStat rs = resourceStats.putIfAbsent(resourceId, resourceStat);
+ if (rs != null) {
+ resourceStat = rs;
+ }
+ }
+ return resourceStat;
+ }
+
+ /**
+ * Returns the current concurrent requests of the specified resource
+ *
+ *
+ * @param resourceId Resource ID
+ */
+ public long getConcurrentRequests(String resourceId) {
+ ResourceStat resourceStat = getResourceStat(resourceId);
+ return resourceStat.getConcurrentRequests().get();
+ }
+
+ /**
+ * Returns current TimeWindowStat of the specified resource
+ *
+ * @param resourceId
+ * @return
+ */
+ public TimeWindowStat getCurrentTimeWindowStat(String resourceId) {
+ long startTimeMilli = currentTimeSlotId();
+ return getTimeWindowStat(resourceId, startTimeMilli, startTimeMilli + 1000);
+ }
+
+ /**
+ * Returns current TimeWindowStat of the specified resource
+ *
+ * @param resourceId
+ * @param curTimeSlotId
+ * @return
+ */
+ @SuppressWarnings("unused")
+ private TimeWindowStat getCurrentTimeWindowStat(String resourceId, long curTimeSlotId) {
+ return getTimeWindowStat(resourceId, curTimeSlotId, curTimeSlotId + 1000);
+ }
+
+ /**
+ * Returns the TimeWindowStat of previous second of the specified time
+ *
+ * @param resourceId
+ * @param timeMilli
+ * @return
+ */
+ public TimeWindowStat getPreviousSecondStat(String resourceId, long timeMilli) {
+ long endTimeMilli = (timeMilli / INTERVAL) * INTERVAL;
+ return getTimeWindowStat(resourceId, endTimeMilli - 1000, endTimeMilli);
+ }
+
+ /**
+ * Returns the timeWindowStat of the specific resource in the specified time
+ * window [startTimeMilli, endTimeMilli)
+ *
+ * @param startTimeMilli included
+ * @param endTimeMilli excluded
+ * @return
+ */
+ public TimeWindowStat getTimeWindowStat(String resourceId, long startTimeMilli, long endTimeMilli) {
+ long startSlotId = (startTimeMilli / INTERVAL) * INTERVAL;
+ long endSlotId = (endTimeMilli / INTERVAL) * INTERVAL;
+
+ if (startSlotId == endSlotId) {
+ endSlotId = endSlotId + INTERVAL;
+ }
+ if (resourceStats.containsKey(resourceId)) {
+ ResourceStat resourceStat = resourceStats.get(resourceId);
+ return resourceStat.getTimeWindowStat(startSlotId, endSlotId);
+ }
+ return null;
+ }
+
+ /**
+ * Returns the ResourceTimeWindowStat list in the specified time window
+ * [startTimeMilli, endTimeMilli), The time slot unit is one second
+ *
+ * @param resourceId optional, returns ResourceSlot list of all resources
+ * while resourceId is null
+ * @param startTimeMilli
+ * @param endTimeMilli
+ * @return
+ */
+ @SuppressWarnings("unused")
+ public List getResourceTimeWindowStats(String resourceId, long startTimeMilli,
+ long endTimeMilli) {
+ return this.getResourceTimeWindowStats(resourceId, startTimeMilli, endTimeMilli, 1);
+ }
+
+ /**
+ * Returns the ResourceTimeWindow list in the specified time window
+ * [startTimeMilli, endTimeMilli)
+ *
+ * @param resourceId optional, returns ResourceTimeWindowStat list of all
+ * resources while resourceId is null
+ * @param startTimeMilli
+ * @param endTimeMilli
+ * @param slotIntervalInSec interval of custom time slot in millisecond, such as
+ * 60 for 1 minutes
+ * @return
+ */
+ @SuppressWarnings("unused")
+ public List getResourceTimeWindowStats(String resourceId, long startTimeMilli,
+ long endTimeMilli, long slotIntervalInSec) {
+ List list = new ArrayList<>();
+ long startSlotId = (startTimeMilli / INTERVAL) * INTERVAL;
+ long endSlotId = (endTimeMilli / INTERVAL) * INTERVAL;
+
+ if (startSlotId == endSlotId) {
+ endSlotId = endSlotId + INTERVAL;
+ }
+ if (slotIntervalInSec < 1 || (endSlotId - startSlotId) / 1000 < slotIntervalInSec) {
+ return list;
+ }
+ long slotInterval = slotIntervalInSec * 1000;
+
+ if (resourceId == null) {
+ Set> entrys = resourceStats.entrySet();
+ for (Entry entry : entrys) {
+ String rid = entry.getKey();
+ ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(rid);
+ long end = startSlotId + slotInterval;
+ for (long start = startSlotId; end <= endSlotId;) {
+ TimeWindowStat tws = getTimeWindowStat(rid, start, end);
+ if (tws != null) {
+ resourceWin.getWindows().add(tws);
+ }
+ start += slotInterval;
+ end += slotInterval;
+ }
+ if (resourceWin.getWindows().size() > 0) {
+ list.add(resourceWin);
+ }
+ }
+ } else {
+ ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(resourceId);
+ long end = startSlotId + slotInterval;
+ for (long start = startSlotId; end <= endSlotId;) {
+ TimeWindowStat tws = getTimeWindowStat(resourceId, start, end);
+ if (tws != null) {
+ resourceWin.getWindows().add(tws);
+ }
+ start += slotInterval;
+ end += slotInterval;
+ }
+ if (resourceWin.getWindows().size() > 0) {
+ list.add(resourceWin);
+ }
+ }
+ return list;
+ }
+
+ class HousekeepJob implements Runnable {
+
+ private FlowStat stat;
+
+ public HousekeepJob(FlowStat stat) {
+ this.stat = stat;
+ }
+
+ @Override
+ public void run() {
+ long n = FlowStat.RETENTION_TIME_IN_MINUTES * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL;
+ long lastSlotId = stat.currentTimeSlotId() - n;
+ while (true) {
+ // log.debug("housekeeping start");
+ long slotId = stat.currentTimeSlotId() - n;
+ for (long i = lastSlotId; i < slotId;) {
+ Set> entrys = stat.resourceStats.entrySet();
+ for (Entry entry : entrys) {
+ String resourceId = entry.getKey();
+ ConcurrentMap timeSlots = entry.getValue().getTimeSlots();
+ // log.debug("housekeeping remove slot: resourceId={} slotId=={}", resourceId,
+ // i);
+ timeSlots.remove(i);
+ }
+ i = i + FlowStat.INTERVAL;
+ }
+ lastSlotId = slotId;
+ // log.debug("housekeeping done");
+ try {
+ Thread.sleep(60 * 1000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ class PeakConcurrentJob implements Runnable {
+
+ private FlowStat stat;
+
+ public PeakConcurrentJob(FlowStat stat) {
+ this.stat = stat;
+ }
+
+ @Override
+ public void run() {
+ Long lastTimeSlotId = null;
+ while (true) {
+ long curTimeSlotId = stat.currentTimeSlotId();
+ if (lastTimeSlotId == null || lastTimeSlotId.longValue() != curTimeSlotId) {
+ // log.debug("PeakConcurrentJob start");
+ Set> entrys = stat.resourceStats.entrySet();
+ for (Entry entry : entrys) {
+ String resourceId = entry.getKey();
+ // log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId,
+ // curTimeSlotId);
+ entry.getValue().getTimeSlot(curTimeSlotId);
+ }
+ lastTimeSlotId = curTimeSlotId;
+ // log.debug("PeakConcurrentJob done");
+ }
+ try {
+ Thread.sleep(1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/we/stats/IncrRequestResult.java b/src/main/java/we/stats/IncrRequestResult.java
new file mode 100644
index 0000000..2025200
--- /dev/null
+++ b/src/main/java/we/stats/IncrRequestResult.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats;
+
+/**
+ *
+ * @author Francis Dong
+ *
+ */
+public class IncrRequestResult {
+
+ /**
+ * true if success, otherwise false
+ */
+ private boolean success;
+
+ /**
+ * Resource ID that causes block
+ */
+ private String blockedResourceId;
+
+ /**
+ * block type
+ */
+ private BlockType blockType;
+
+ public static IncrRequestResult success() {
+ return new IncrRequestResult(true, null, null);
+ }
+
+ public static IncrRequestResult block(String resourceId, BlockType blockType) {
+ return new IncrRequestResult(false, resourceId, blockType);
+ }
+
+ public IncrRequestResult(boolean success, String resourceId, BlockType blockType) {
+ this.success = success;
+ this.blockedResourceId = resourceId;
+ this.blockType = blockType;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public String getBlockedResourceId() {
+ return blockedResourceId;
+ }
+
+ public void setBlockedResourceId(String blockedResourceId) {
+ this.blockedResourceId = blockedResourceId;
+ }
+
+ public BlockType getBlockType() {
+ return blockType;
+ }
+
+ public void setBlockType(BlockType blockType) {
+ this.blockType = blockType;
+ }
+
+}
diff --git a/src/main/java/we/stats/ResourceConfig.java b/src/main/java/we/stats/ResourceConfig.java
new file mode 100644
index 0000000..8faab86
--- /dev/null
+++ b/src/main/java/we/stats/ResourceConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats;
+
+/**
+ *
+ * @author Francis Dong
+ *
+ */
+public class ResourceConfig {
+ /**
+ * Resouce ID
+ */
+ private String resourceId;
+
+ /**
+ * Maximum concurrent request, zero or negative for no limit
+ */
+ private long maxCon;
+
+ /**
+ * Maximum QPS, zero or negative for no limit
+ */
+ private long maxQPS;
+
+ public ResourceConfig(String resourceId, long maxCon, long maxQPS) {
+ this.resourceId = resourceId;
+ this.maxCon = maxCon;
+ this.maxQPS = maxQPS;
+ }
+
+ public ResourceConfig() {
+ }
+
+ public String getResourceId() {
+ return resourceId;
+ }
+
+ public void setResourceId(String resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public long getMaxCon() {
+ return maxCon;
+ }
+
+ public void setMaxCon(long maxCon) {
+ this.maxCon = maxCon;
+ }
+
+ public long getMaxQPS() {
+ return maxQPS;
+ }
+
+ public void setMaxQPS(long maxQPS) {
+ this.maxQPS = maxQPS;
+ }
+
+}
diff --git a/src/main/java/we/stats/ResourceStat.java b/src/main/java/we/stats/ResourceStat.java
new file mode 100644
index 0000000..c48fb1a
--- /dev/null
+++ b/src/main/java/we/stats/ResourceStat.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats;
+
+import java.math.BigDecimal;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author Francis Dong
+ *
+ */
+public class ResourceStat {
+
+ private static final Logger log = LoggerFactory.getLogger(ResourceStat.class);
+
+ /**
+ * Resource ID
+ */
+ private String resourceId;
+
+ /**
+ * Request count of time slot, the beginning timestamp(timeId) as key
+ */
+ private ConcurrentMap timeSlots = new ConcurrentHashMap<>(100);
+
+ /**
+ * Concurrent requests
+ */
+ private AtomicLong concurrentRequests = new AtomicLong(0);
+
+ private ReentrantReadWriteLock rwl1 = new ReentrantReadWriteLock();
+ private ReentrantReadWriteLock rwl2 = new ReentrantReadWriteLock();
+ private Lock w1 = rwl1.writeLock();
+ private Lock w2 = rwl2.writeLock();
+
+ public ResourceStat(String resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ /**
+ * Returns Time slot of the specified time slot ID
+ *
+ * @param timeSlotId
+ * @return
+ */
+ public TimeSlot getTimeSlot(long timeSlotId) {
+ if (timeSlots.containsKey(timeSlotId)) {
+ return timeSlots.get(timeSlotId);
+ } else {
+ TimeSlot timeSlot = new TimeSlot(timeSlotId);
+ timeSlot.setPeakConcurrentReqeusts(this.concurrentRequests.get());
+ TimeSlot old = timeSlots.putIfAbsent(timeSlotId, timeSlot);
+ if (old != null) {
+ return old;
+ } else {
+ return timeSlot;
+ }
+ }
+ }
+
+ /**
+ * Increase concurrent request counter of the resource
+ *
+ * @param timeSlotId
+ * @param maxCon
+ * @return false if exceed the maximum concurrent request of the specified
+ * resource
+ */
+ public boolean incrConcurrentRequest(long timeSlotId, Long maxCon) {
+ w1.lock();
+ try {
+ boolean isExceeded = false;
+ if (maxCon != null && maxCon.intValue() > 0) {
+ long n = this.concurrentRequests.get();
+ if (n >= maxCon.longValue()) {
+ isExceeded = true;
+ this.incrBlockRequestToTimeSlot(timeSlotId);
+ } else {
+ long conns = this.concurrentRequests.incrementAndGet();
+ this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
+ }
+ } else {
+ long conns = this.concurrentRequests.incrementAndGet();
+ this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
+ }
+ return !isExceeded;
+ } finally {
+ w1.unlock();
+ }
+ }
+
+ /**
+ * Decrease concurrent request counter of the resource
+ *
+ */
+ public void decrConcurrentRequest(long timeSlotId) {
+ long conns = this.concurrentRequests.decrementAndGet();
+ this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
+ }
+
+ /**
+ * Increase block request to the specified time slot
+ *
+ */
+ public void incrBlockRequestToTimeSlot(long timeSlotId) {
+ this.getTimeSlot(timeSlotId).getBlockRequests().incrementAndGet();
+ }
+
+ /**
+ * Add request to the specified time slot
+ *
+ * @param timeSlotId
+ * @return false if exceed the maximum RPS of the specified resource
+ */
+ public boolean incrRequestToTimeSlot(long timeSlotId, Long maxRPS) {
+ w2.lock();
+ try {
+ boolean isExceeded = false;
+ if (maxRPS != null && maxRPS.intValue() > 0) {
+// TimeWindowStat timeWindowStat = this.getCurrentTimeWindowStat(resourceId, curTimeSlotId);
+// if (new BigDecimal(maxRPS).compareTo(timeWindowStat.getRps()) <= 0) {
+// isExceeded = true;
+// resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
+// }
+
+ // time slot unit is one second
+ long total = this.getTimeSlot(timeSlotId).getCounter().get();
+ long max = Long.valueOf(maxRPS);
+ if (total >= max) {
+ isExceeded = true;
+ this.incrBlockRequestToTimeSlot(timeSlotId);
+ this.decrConcurrentRequest(timeSlotId);
+ } else {
+ this.getTimeSlot(timeSlotId).incr();
+ }
+ } else {
+ this.getTimeSlot(timeSlotId).incr();
+ }
+ return !isExceeded;
+ } finally {
+ w2.unlock();
+ }
+ }
+
+ /**
+ * Add request RT to the specified time slot
+ *
+ * @param timeSlotId
+ * @param rt response time of the request
+ * @param isSuccess Whether the request is success or not
+ * @return
+ */
+ public void addRequestRT(long timeSlotId, long rt, boolean isSuccess) {
+ this.getTimeSlot(timeSlotId).addRequestRT(rt, isSuccess);
+ }
+
+ /**
+ * Returns statistic of the specified time window
+ *
+ * @param startSlotId
+ * @param endSlotId
+ * @return
+ */
+ public TimeWindowStat getTimeWindowStat(long startSlotId, long endSlotId) {
+ TimeWindowStat tws = new TimeWindowStat();
+
+ tws.setStartTime(startSlotId);
+ tws.setEndTime(endSlotId);
+
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ long totalReqs = 0;
+ long totalRt = 0;
+ long peakConcurrences = 0;
+ long errors = 0;
+ long blockReqs = 0;
+ long compReqs = 0;
+ for (long i = startSlotId; i < endSlotId;) {
+ if (timeSlots.containsKey(i)) {
+ TimeSlot timeSlot = timeSlots.get(i);
+ min = timeSlot.getMin() < min ? timeSlot.getMin() : min;
+ max = timeSlot.getMax() > max ? timeSlot.getMax() : max;
+ peakConcurrences = timeSlot.getPeakConcurrentReqeusts() > peakConcurrences
+ ? timeSlot.getPeakConcurrentReqeusts()
+ : peakConcurrences;
+ totalReqs = totalReqs + timeSlot.getCounter().get();
+ totalRt = totalRt + timeSlot.getTotalRt().get();
+ errors = errors + timeSlot.getErrors().get();
+ blockReqs = blockReqs + timeSlot.getBlockRequests().get();
+ compReqs = compReqs + timeSlot.getCompReqs().get();
+ }
+ i = i + FlowStat.INTERVAL;
+ }
+ tws.setMin(min == Long.MAX_VALUE ? null : min);
+ tws.setMax(max == Long.MIN_VALUE ? null : max);
+ tws.setPeakConcurrentReqeusts(peakConcurrences);
+ tws.setTotal(totalReqs);
+ tws.setErrors(errors);
+ tws.setBlockRequests(blockReqs);
+ tws.setCompReqs(compReqs);
+
+ if (compReqs > 0) {
+ tws.setAvgRt(totalRt / compReqs);
+ }
+
+ if (totalReqs > 0) {
+ BigDecimal nsec = new BigDecimal(endSlotId - startSlotId).divide(new BigDecimal(1000), 5,
+ BigDecimal.ROUND_HALF_UP);
+ BigDecimal rps = new BigDecimal(totalReqs).divide(nsec, 5, BigDecimal.ROUND_HALF_UP);
+
+ if (rps.compareTo(new BigDecimal(10)) >= 0) {
+ rps = rps.setScale(0, BigDecimal.ROUND_HALF_UP).stripTrailingZeros();
+ } else {
+ rps = rps.setScale(2, BigDecimal.ROUND_HALF_UP).stripTrailingZeros();
+ }
+ tws.setRps(rps);
+ }
+
+ return tws;
+ }
+
+ public String getResourceId() {
+ return resourceId;
+ }
+
+ public void setResourceId(String resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public ConcurrentMap getTimeSlots() {
+ return timeSlots;
+ }
+
+ public void setTimeSlots(ConcurrentMap timeSlots) {
+ this.timeSlots = timeSlots;
+ }
+
+ public AtomicLong getConcurrentRequests() {
+ return concurrentRequests;
+ }
+
+ public void setConcurrentRequests(AtomicLong concurrentRequests) {
+ this.concurrentRequests = concurrentRequests;
+ }
+
+}
diff --git a/src/main/java/we/stats/ResourceTimeWindowStat.java b/src/main/java/we/stats/ResourceTimeWindowStat.java
new file mode 100644
index 0000000..8cdcbb4
--- /dev/null
+++ b/src/main/java/we/stats/ResourceTimeWindowStat.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package we.stats;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ * @author Francis Dong
+ *
+ */
+public class ResourceTimeWindowStat {
+
+ /**
+ * Resource ID
+ */
+ private String resourceId;
+
+ private List windows = new ArrayList<>();
+
+ public ResourceTimeWindowStat(String resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public String getResourceId() {
+ return resourceId;
+ }
+
+ public void setResourceId(String resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public List getWindows() {
+ return windows;
+ }
+
+ public void setWindows(List windows) {
+ this.windows = windows;
+ }
+
+}
diff --git a/src/main/java/we/stats/TimeSlot.java b/src/main/java/we/stats/TimeSlot.java
new file mode 100644
index 0000000..2561e0f
--- /dev/null
+++ b/src/main/java/we/stats/TimeSlot.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package we.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ * @author Francis Dong
+ *
+ */
+public class TimeSlot {
+
+ /**
+ * Time slot start timestamp as ID
+ */
+ private long id;
+
+ /**
+ * Request counter
+ */
+ private AtomicLong counter = new AtomicLong();
+
+ /**
+ * Error request counter
+ */
+ private AtomicLong errors = new AtomicLong();
+
+ /**
+ * Minimum response time
+ */
+ private long min = Long.MAX_VALUE;
+
+ /**
+ * Maximum response time
+ */
+ private long max = Long.MIN_VALUE;
+
+ /**
+ * Total response time
+ */
+ private AtomicLong totalRt = new AtomicLong(0);
+
+ /**
+ * Completed Request counter
+ */
+ private AtomicLong compReqs = new AtomicLong();
+
+
+ /**
+ * Peak concurrent requests
+ */
+ private long peakConcurrentReqeusts;
+
+ /**
+ * Block requests
+ */
+ private AtomicLong blockRequests = new AtomicLong(0);
+
+ public TimeSlot(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ /**
+ * Add request to time slot
+ *
+ */
+ public void incr() {
+ counter.incrementAndGet();
+ }
+
+ /**
+ * Add request RT information to time slot
+ *
+ * @param rt
+ * @param isSuccess Whether the request is success or not
+ */
+ public synchronized void addRequestRT(long rt, boolean isSuccess) {
+ totalRt.addAndGet(rt);
+ compReqs.incrementAndGet();
+ if (!isSuccess) {
+ errors.incrementAndGet();
+ }
+ min = rt < min ? rt : min;
+ max = rt > max ? rt : max;
+ }
+
+ /**
+ * Update peak concurrent requests of this time slot
+ *
+ * @param concurrentRequests Current concurrent requests
+ */
+ public synchronized void updatePeakConcurrentReqeusts(long concurrentRequests) {
+ peakConcurrentReqeusts = concurrentRequests > peakConcurrentReqeusts ? concurrentRequests
+ : peakConcurrentReqeusts;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public AtomicLong getCounter() {
+ return counter;
+ }
+
+ public void setCounter(AtomicLong counter) {
+ this.counter = counter;
+ }
+
+ public long getMin() {
+ return min;
+ }
+
+ public void setMin(long min) {
+ this.min = min;
+ }
+
+ public long getMax() {
+ return max;
+ }
+
+ public void setMax(long max) {
+ this.max = max;
+ }
+
+ public AtomicLong getTotalRt() {
+ return totalRt;
+ }
+
+ public void setTotalRt(AtomicLong totalRt) {
+ this.totalRt = totalRt;
+ }
+
+ public long getPeakConcurrentReqeusts() {
+ return peakConcurrentReqeusts;
+ }
+
+ public void setPeakConcurrentReqeusts(long peakConcurrentReqeusts) {
+ this.peakConcurrentReqeusts = peakConcurrentReqeusts;
+ }
+
+ public AtomicLong getErrors() {
+ return errors;
+ }
+
+ public void setErrors(AtomicLong errors) {
+ this.errors = errors;
+ }
+
+ public AtomicLong getBlockRequests() {
+ return blockRequests;
+ }
+
+ public void setBlockRequests(AtomicLong blockRequests) {
+ this.blockRequests = blockRequests;
+ }
+
+ public AtomicLong getCompReqs() {
+ return compReqs;
+ }
+
+ public void setCompReqs(AtomicLong compReqs) {
+ this.compReqs = compReqs;
+ }
+
+}
diff --git a/src/main/java/we/stats/TimeWindowStat.java b/src/main/java/we/stats/TimeWindowStat.java
new file mode 100644
index 0000000..796f4e2
--- /dev/null
+++ b/src/main/java/we/stats/TimeWindowStat.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package we.stats;
+
+import java.math.BigDecimal;
+
+/**
+ *
+ * @author Francis Dong
+ *
+ */
+public class TimeWindowStat {
+
+ /**
+ * Start time of time window,[startTime,endTime)
+ */
+ private Long startTime;
+
+ /**
+ * End time of time window, [startTime,endTime)
+ */
+ private Long endTime;
+
+ /**
+ * Minimum response time
+ */
+ private Long min;
+
+ /**
+ * Maximum response time
+ */
+ private Long max;
+
+ /**
+ * Average response time
+ */
+ private Long avgRt;
+
+ /**
+ * Total requests
+ */
+ private Long total;
+
+ /**
+ * Completed requests
+ */
+ private Long compReqs;
+
+ /**
+ * Total error requests
+ */
+ private Long errors;
+
+ /**
+ * the average RPS(Requests Per Second) of time window
+ */
+ private BigDecimal rps;
+
+ /**
+ * Peak concurrent requests of the time window
+ */
+ private Long peakConcurrentReqeusts;
+
+ /**
+ * Block requests
+ */
+ private Long blockRequests;
+
+ public Long getBlockRequests() {
+ return blockRequests;
+ }
+
+ public void setBlockRequests(Long blockRequests) {
+ this.blockRequests = blockRequests;
+ }
+
+ public Long getPeakConcurrentReqeusts() {
+ return peakConcurrentReqeusts;
+ }
+
+ public void setPeakConcurrentReqeusts(Long peakConcurrentReqeusts) {
+ this.peakConcurrentReqeusts = peakConcurrentReqeusts;
+ }
+
+ public Long getErrors() {
+ return errors;
+ }
+
+ public void setErrors(Long errors) {
+ this.errors = errors;
+ }
+
+ public Long getMin() {
+ return min;
+ }
+
+ public void setMin(Long min) {
+ this.min = min;
+ }
+
+ public Long getMax() {
+ return max;
+ }
+
+ public void setMax(Long max) {
+ this.max = max;
+ }
+
+ public BigDecimal getRps() {
+ return rps;
+ }
+
+ public void setRps(BigDecimal rps) {
+ this.rps = rps;
+ }
+
+ public Long getAvgRt() {
+ return avgRt;
+ }
+
+ public void setAvgRt(Long avgRt) {
+ this.avgRt = avgRt;
+ }
+
+ public Long getTotal() {
+ return total;
+ }
+
+ public void setTotal(Long total) {
+ this.total = total;
+ }
+
+ public Long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Long startTime) {
+ this.startTime = startTime;
+ }
+
+ public Long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(Long endTime) {
+ this.endTime = endTime;
+ }
+
+ public Long getCompReqs() {
+ return compReqs;
+ }
+
+ public void setCompReqs(Long compReqs) {
+ this.compReqs = compReqs;
+ }
+
+}
diff --git a/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java b/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java
new file mode 100644
index 0000000..596538a
--- /dev/null
+++ b/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2021 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats.ratelimit;
+
+import we.util.JacksonUtils;
+
+/**
+ * @author hongqiaowei
+ */
+
+public class ResourceRateLimitConfig {
+
+ public static interface Type {
+ static final byte GLOBAL = 1;
+ static final byte SERVICE_DEFAULT = 2;
+ static final byte SERVICE = 3;
+ static final byte API = 4;
+ }
+
+ public static final int DELETED = 1;
+
+ public static final String GLOBAL = "_global";
+
+ public static final String SERVICE_DEFAULT = "service_default";
+
+ private static final int ENABLE = 1;
+
+ private static final int UNABLE = 0;
+
+ public int isDeleted = 0;
+
+ public int id;
+
+ private boolean enable = true;
+
+ public String resource;
+
+ public byte type;
+
+ public long qps;
+
+ public long concurrents;
+
+ public String responseType;
+
+ public String responseContent;
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void setEnable(int v) {
+ if (v == ENABLE) {
+ enable = true;
+ } else {
+ enable = false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return JacksonUtils.writeValueAsString(this);
+ }
+}
diff --git a/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java b/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java
new file mode 100644
index 0000000..1289a9c
--- /dev/null
+++ b/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright (C) 2021 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats.ratelimit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import we.config.AggregateRedisConfig;
+import we.flume.clients.log4j2appender.LogService;
+import we.util.JacksonUtils;
+import we.util.ReactorUtils;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author hongqiaowei
+ */
+
+@Service
+public class ResourceRateLimitConfigService {
+
+ private static final Logger log = LoggerFactory.getLogger(ResourceRateLimitConfigService.class);
+
+ private static final String fizzRateLimit = "fizz_rate_limit";
+
+ private static final String fizzRateLimitChannel = "fizz_rate_limit_channel";
+
+ private Map resourceRateLimitConfigMap = new HashMap<>(32);
+
+ private Map oldResourceRateLimitConfigMap = new HashMap<>(32);
+
+ @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
+ private ReactiveStringRedisTemplate rt;
+
+ @PostConstruct
+ public void init() throws Throwable {
+ final Throwable[] throwable = new Throwable[1];
+ Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzRateLimit)
+ .defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> {
+ log.info(null, t);
+ })
+ .concatMap(e -> {
+ Object k = e.getKey();
+ if (k == ReactorUtils.OBJ) {
+ return Flux.just(e);
+ }
+ Object v = e.getValue();
+ log.info("rateLimitConfig: " + v.toString(), LogService.BIZ_ID, k.toString());
+ String json = (String) v;
+ try {
+ ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
+ oldResourceRateLimitConfigMap.put(rrlc.id, rrlc);
+ updateResourceRateLimitConfigMap(rrlc);
+ return Flux.just(e);
+ } catch (Throwable t) {
+ throwable[0] = t;
+ log.info(json, t);
+ return Flux.error(t);
+ }
+ }).blockLast())).flatMap(
+ e -> {
+ if (throwable[0] != null) {
+ return Mono.error(throwable[0]);
+ }
+ return lsnResourceRateLimitConfigChange();
+ }
+ ).block();
+ if (error != ReactorUtils.EMPTY_THROWABLE) {
+ throw error;
+ }
+ }
+
+ private Mono lsnResourceRateLimitConfigChange() {
+ final Throwable[] throwable = new Throwable[1];
+ final boolean[] b = {false};
+ rt.listenToChannel(fizzRateLimitChannel).doOnError(t -> {
+ throwable[0] = t;
+ b[0] = false;
+ log.error("lsn " + fizzRateLimitChannel, t);
+ }).doOnSubscribe(
+ s -> {
+ b[0] = true;
+ log.info("success to lsn on " + fizzRateLimitChannel);
+ }
+ ).doOnNext(msg -> {
+ String json = msg.getMessage();
+ log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis());
+ try {
+ ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
+ ResourceRateLimitConfig r = oldResourceRateLimitConfigMap.remove(rrlc.id);
+ if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED && r != null) {
+ resourceRateLimitConfigMap.remove(r.resource);
+ }
+ updateResourceRateLimitConfigMap(rrlc);
+ if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED) {
+ oldResourceRateLimitConfigMap.put(rrlc.id, rrlc);
+ }
+ } catch (Throwable t) {
+ log.info(json, t);
+ }
+ }).subscribe();
+ Throwable t = throwable[0];
+ while (!b[0]) {
+ if (t != null) {
+ return Mono.error(t);
+ } else {
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ } catch (InterruptedException e) {
+ return Mono.error(e);
+ }
+ }
+ }
+ return Mono.just(ReactorUtils.EMPTY_THROWABLE);
+ }
+
+ private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc) {
+ if (rrlc.isDeleted == ResourceRateLimitConfig.DELETED) {
+ ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.resource);
+ log.info("remove " + removedRrlc);
+ } else {
+ ResourceRateLimitConfig existRrlc = resourceRateLimitConfigMap.get(rrlc.resource);
+ resourceRateLimitConfigMap.put(rrlc.resource, rrlc);
+ if (existRrlc == null) {
+ log.info("add " + rrlc);
+ } else {
+ log.info("update " + existRrlc + " with " + rrlc);
+ }
+ }
+ }
+
+ public void setReactiveStringRedisTemplate(ReactiveStringRedisTemplate rt) {
+ this.rt = rt;
+ }
+
+ public ResourceRateLimitConfig getResourceRateLimitConfig(String resource) {
+ return resourceRateLimitConfigMap.get(resource);
+ }
+
+ public Map getResourceRateLimitConfigMap() {
+ return resourceRateLimitConfigMap;
+ }
+}
diff --git a/src/main/java/we/util/Utils.java b/src/main/java/we/util/Utils.java
index 709d643..17c5f1f 100644
--- a/src/main/java/we/util/Utils.java
+++ b/src/main/java/we/util/Utils.java
@@ -18,6 +18,7 @@
package we.util;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Consts;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -83,4 +84,13 @@ public abstract class Utils {
ca[0] += 32;
return String.valueOf(ca);
}
+
+ public static void threadCurrentStack2stringBuilder(StringBuilder b) {
+ StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace();
+ if (stackTraces != null) {
+ for (int i = 0; i < stackTraces.length; i++) {
+ b.append(stackTraces[i]).append(Constants.Symbol.LF);
+ }
+ }
+ }
}
diff --git a/src/test/java/we/redis/RedisProperties.java b/src/test/java/we/redis/RedisProperties.java
new file mode 100644
index 0000000..a6b449f
--- /dev/null
+++ b/src/test/java/we/redis/RedisProperties.java
@@ -0,0 +1,54 @@
+package we.redis;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.TestConfiguration;
+
+/**
+ * @author hongqiaowei
+ */
+
+@TestConfiguration
+public class RedisProperties {
+
+ private String host;
+ private int port;
+ private int database;
+
+ public RedisProperties(
+ @Value("${embeded.redis.port}") int port,
+ @Value("${embeded.redis.host}") String host,
+ @Value("${embeded.redis.database}") int database) {
+ this.port = port;
+ this.host = host;
+ this.database = database;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int redisPort) {
+ this.port = redisPort;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String redisHost) {
+ this.host = redisHost;
+ }
+
+ public int getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(int database) {
+ this.database = database;
+ }
+
+ @Override
+ public String toString() {
+ return "redis:[host:" + host + ",port:" + port + ",database:" + database + "]";
+ }
+}
diff --git a/src/test/java/we/redis/RedisServerConfiguration.java b/src/test/java/we/redis/RedisServerConfiguration.java
new file mode 100644
index 0000000..7736ed6
--- /dev/null
+++ b/src/test/java/we/redis/RedisServerConfiguration.java
@@ -0,0 +1,34 @@
+package we.redis;
+
+import org.springframework.boot.test.context.TestConfiguration;
+import redis.embedded.RedisServer;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+/**
+ * @author hongqiaowei
+ */
+
+@TestConfiguration
+public class RedisServerConfiguration {
+
+ private RedisServer redisServer;
+
+ public RedisServerConfiguration(RedisProperties redisProperties) {
+ redisServer = RedisServer.builder()
+ .port(redisProperties.getPort())
+ .setting("maxmemory 32M")
+ .build();
+ }
+
+ @PostConstruct
+ public void postConstruct() {
+ redisServer.start();
+ }
+
+ @PreDestroy
+ public void preDestroy() {
+ redisServer.stop();
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/we/redis/RedisTemplateConfiguration.java b/src/test/java/we/redis/RedisTemplateConfiguration.java
new file mode 100644
index 0000000..997f0e1
--- /dev/null
+++ b/src/test/java/we/redis/RedisTemplateConfiguration.java
@@ -0,0 +1,40 @@
+package we.redis;
+
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
+import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
+import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
+
+/**
+ * @author hongqiaowei
+ */
+
+@TestConfiguration
+// @EnableRedisRepositories
+public class RedisTemplateConfiguration {
+
+ @Bean
+ public LettuceConnectionFactory redisConnectionFactory(
+ RedisProperties redisProperties) {
+ LettuceConnectionFactory cf = new LettuceConnectionFactory(
+ redisProperties.getHost(),
+ redisProperties.getPort());
+ cf.setDatabase(redisProperties.getDatabase());
+ return cf;
+ }
+
+ @Bean
+ public StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory connectionFactory) {
+ StringRedisTemplate template = new StringRedisTemplate();
+ template.setConnectionFactory(connectionFactory);
+ return template;
+ }
+
+ @Bean
+ public ReactiveStringRedisTemplate reactiveRedisTemplate(LettuceConnectionFactory factory) {
+ return new ReactiveStringRedisTemplate(factory);
+ }
+}
diff --git a/src/test/java/we/stats/FlowStatTests.java b/src/test/java/we/stats/FlowStatTests.java
new file mode 100644
index 0000000..ba9efe4
--- /dev/null
+++ b/src/test/java/we/stats/FlowStatTests.java
@@ -0,0 +1,540 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+
+import we.util.JacksonUtils;
+
+/**
+ *
+ * @author Francis Dong
+ *
+ */
+public class FlowStatTests {
+
+ private FlowStat stat = new FlowStat();
+
+ class FlowRuleCase {
+ public int threads = 10;
+ public int requests = 10000;
+ public int totalReqs = threads * requests;
+ public List resourceConfigs = new ArrayList<>();
+ public List resourceExpects = new ArrayList<>();
+ public IncrRequestResult expectResult;
+ }
+
+ class ResourceExpect {
+ public long concurrents;
+ public long QPS;
+ public long total;
+ public long blockedReqs;
+
+ public ResourceExpect(long concurrents, long QPS, long total, long blockedReqs) {
+ this.concurrents = concurrents;
+ this.QPS = QPS;
+ this.total = total;
+ this.blockedReqs = blockedReqs;
+ }
+
+ public ResourceExpect() {
+ }
+ }
+
+ public List createFlowRuleCase() {
+ List cases = new ArrayList<>();
+ // blocked by service concurrent request
+ FlowRuleCase c1 = new FlowRuleCase();
+ c1.resourceConfigs.add(new ResourceConfig("_global1", 100, 200));
+ c1.resourceConfigs.add(new ResourceConfig("service1", 10, 200));
+ c1.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
+ c1.resourceExpects.add(new ResourceExpect(10, 10, 10, c1.totalReqs - 10));
+ c1.expectResult = IncrRequestResult.block("service1", BlockType.CONCURRENT_REQUEST);
+ cases.add(c1);
+
+ // Note: use different resource ID to avoid being affected by previous test data
+ FlowRuleCase c2 = new FlowRuleCase();
+ c2.resourceConfigs.add(new ResourceConfig("_global2", 10, 200));
+ c2.resourceConfigs.add(new ResourceConfig("service2", 200, 200));
+ c2.resourceExpects.add(new ResourceExpect(10, 10, 10, c2.totalReqs - 10));
+ c2.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
+ c2.expectResult = IncrRequestResult.block("_global2", BlockType.CONCURRENT_REQUEST);
+ cases.add(c2);
+
+ // Note: use different resource ID to avoid being affected by previous test data
+ FlowRuleCase c3 = new FlowRuleCase();
+ c3.resourceConfigs.add(new ResourceConfig("_global3", 200, 10));
+ c3.resourceConfigs.add(new ResourceConfig("service3", 200, 100));
+ c3.resourceExpects.add(new ResourceExpect(10, 10, 10, c3.totalReqs - 10));
+ c3.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
+ c3.expectResult = IncrRequestResult.block("_global3", BlockType.QPS);
+ cases.add(c3);
+
+ // Note: use different resource ID to avoid being affected by previous test data
+ FlowRuleCase c4 = new FlowRuleCase();
+ c4.resourceConfigs.add(new ResourceConfig("_global4", 200, 100));
+ c4.resourceConfigs.add(new ResourceConfig("service4", 200, 10));
+ c4.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
+ c4.resourceExpects.add(new ResourceExpect(10, 10, 10, c4.totalReqs - 10));
+ c4.expectResult = IncrRequestResult.block("service4", BlockType.QPS);
+ cases.add(c4);
+
+ // Note: use different resource ID to avoid being affected by previous test data
+ FlowRuleCase c5 = new FlowRuleCase();
+ c5.resourceConfigs.add(new ResourceConfig("_global5", 0, 0));
+ c5.resourceConfigs.add(new ResourceConfig("service5", 0, 0));
+ c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
+ c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
+ c5.expectResult = IncrRequestResult.success();
+ cases.add(c5);
+
+ // Note: use different resource ID to avoid being affected by previous test data
+ FlowRuleCase c6 = new FlowRuleCase();
+ c6.resourceConfigs.add(new ResourceConfig("_global6", 20, 0));
+ c6.resourceConfigs.add(new ResourceConfig("service6", 20, 0));
+ c6.resourceExpects.add(new ResourceExpect(20, 20, 20, c6.totalReqs - 20));
+ c6.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
+ c6.expectResult = IncrRequestResult.block("_global6", BlockType.CONCURRENT_REQUEST);
+ cases.add(c6);
+
+ // Note: use different resource ID to avoid being affected by previous test data
+ FlowRuleCase c7 = new FlowRuleCase();
+ c7.resourceConfigs.add(new ResourceConfig("_global7", 0, 0));
+ c7.resourceConfigs.add(new ResourceConfig("service7", 0, 20));
+ c7.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
+ c7.resourceExpects.add(new ResourceExpect(20, 20, 20, c7.totalReqs - 20));
+ c7.expectResult = IncrRequestResult.block("service7", BlockType.QPS);
+ cases.add(c7);
+
+ return cases;
+ }
+
+ class ConcurrentJob1 implements Runnable {
+ public ConcurrentJob1(int requests, long curTimeSlotId, List resourceConfigs,
+ IncrRequestResult expectResult) {
+ this.requests = requests;
+ this.resourceConfigs = resourceConfigs;
+ this.curTimeSlotId = curTimeSlotId;
+ this.expectResult = expectResult;
+ }
+
+ private int requests = 0;
+ private List resourceConfigs;
+ private long curTimeSlotId = 0;
+ private IncrRequestResult expectResult;
+
+ @Override
+ public void run() {
+ for (int i = 0; i < requests; i++) {
+ IncrRequestResult result = stat.incrRequest(resourceConfigs, curTimeSlotId);
+ if (result != null && !result.isSuccess()) {
+ assertEquals(expectResult.getBlockedResourceId(), result.getBlockedResourceId());
+ assertEquals(expectResult.getBlockType(), result.getBlockType());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testIncrRequestResultByResourceChain() throws Throwable {
+ // concurrent
+ FlowRuleCase c1 = new FlowRuleCase();
+ c1.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_global1", 100, 200));
+ c1.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_service1", 10, 200));
+
+ long startTimeSlotId = stat.currentTimeSlotId();
+ long endTimeSlotId = startTimeSlotId + 1000;
+ for (int i = 0; i < 10; i++) {
+ stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
+ }
+
+ IncrRequestResult result = stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
+ assertTrue(!result.isSuccess());
+ assertEquals("testIncrRequestResultByResourceChain_service1", result.getBlockedResourceId());
+ assertEquals(BlockType.CONCURRENT_REQUEST, result.getBlockType());
+
+ stat.addRequestRT(c1.resourceConfigs, startTimeSlotId, 1, true);
+
+ result = stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
+ assertTrue(result.isSuccess());
+
+ // QPS
+ FlowRuleCase c2 = new FlowRuleCase();
+ c2.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_global2", 100, 200));
+ c2.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_service2", 100, 10));
+
+ for (int i = 0; i < 10; i++) {
+ stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
+ }
+
+ result = stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
+ assertTrue(!result.isSuccess());
+ assertEquals("testIncrRequestResultByResourceChain_service2", result.getBlockedResourceId());
+ assertEquals(BlockType.QPS, result.getBlockType());
+
+ stat.addRequestRT(c2.resourceConfigs, startTimeSlotId, 1, true);
+
+ result = stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
+ assertTrue(!result.isSuccess());
+ assertEquals("testIncrRequestResultByResourceChain_service2", result.getBlockedResourceId());
+ assertEquals(BlockType.QPS, result.getBlockType());
+
+ }
+
+ @Test
+ public void testIncrRequestByResourceChain() throws Throwable {
+ // create data
+ List cases = createFlowRuleCase();
+ long startTimeSlotId = stat.currentTimeSlotId();
+ long endTimeSlotId = startTimeSlotId + 1000;
+ for (FlowRuleCase c : cases) {
+ ExecutorService pool = Executors.newFixedThreadPool(c.threads);
+ long t1 = System.currentTimeMillis();
+ for (int i = 0; i < c.threads; i++) {
+ pool.submit(new ConcurrentJob1(c.requests, startTimeSlotId, c.resourceConfigs, c.expectResult));
+ }
+ pool.shutdown();
+ if (pool.awaitTermination(5, TimeUnit.SECONDS)) {
+ long t2 = System.currentTimeMillis();
+ System.out.println("testIncrRequestByResourceChain elapsed time: " + (t2 - t1) + "ms for " + c.totalReqs
+ + " requests");
+ for (int i = 0; i < c.resourceConfigs.size(); i++) {
+ ResourceConfig cfg = c.resourceConfigs.get(i);
+ ResourceExpect expect = c.resourceExpects.get(i);
+
+ TimeWindowStat tws = stat.getTimeWindowStat(cfg.getResourceId(), startTimeSlotId, endTimeSlotId);
+ assertEquals(expect.concurrents, tws.getPeakConcurrentReqeusts());
+ assertEquals(expect.QPS, tws.getTotal());
+ assertEquals(expect.total, tws.getTotal());
+ assertEquals(expect.blockedReqs, tws.getBlockRequests());
+ }
+ } else {
+ System.out.println("testIncrRequestByResourceChain timeout");
+ }
+ startTimeSlotId = startTimeSlotId + 1000;
+ endTimeSlotId = endTimeSlotId + 1000;
+ }
+ }
+
+ @Test
+ public void testPeakConcurrentJob() throws Throwable {
+ long curTimeSlotId = stat.currentTimeSlotId();
+ long nextSlotId = curTimeSlotId + 1000;
+ String resourceId = "PeakConcurrentJob";
+ stat.incrRequest(resourceId, curTimeSlotId, null, null);
+ Thread.sleep(1200);
+ TimeWindowStat tws = stat.getPreviousSecondStat(resourceId, nextSlotId + 1000);
+ assertEquals(1, tws.getPeakConcurrentReqeusts());
+ }
+
+ @Test
+ public void testIncr() throws Throwable {
+ long curTimeSlotId = stat.currentTimeSlotId();
+ long slotId = curTimeSlotId + 1000;
+ String resourceId = "a";
+
+ stat.incrRequest(resourceId, curTimeSlotId, null, null);
+ TimeWindowStat tws = stat.getPreviousSecondStat(resourceId, slotId);
+ assertEquals(1, tws.getTotal());
+
+ stat.incrRequest(resourceId, curTimeSlotId, null, null);
+ stat.addRequestRT(resourceId, curTimeSlotId, 100, false);
+ stat.addRequestRT(resourceId, curTimeSlotId, 300, true);
+
+ tws = stat.getPreviousSecondStat(resourceId, slotId);
+ assertEquals(2, tws.getTotal());
+ assertEquals(200, tws.getAvgRt());
+ assertEquals(100, tws.getMin());
+ assertEquals(300, tws.getMax());
+ assertEquals(2, tws.getRps().intValue());
+ assertEquals(1, tws.getErrors());
+
+ stat.decrConcurrentRequest(resourceId, curTimeSlotId);
+ Long con = stat.getConcurrentRequests(resourceId);
+ assertEquals(1, con);
+
+ // System.out.println(JacksonUtils.writeValueAsString(stat.resourceStats));
+ }
+
+ @Test
+ public void testIncrRequest() throws Throwable {
+ long curTimeSlotId = stat.currentTimeSlotId();
+ long nextSlotId = curTimeSlotId + 1000;
+ String resourceId = "b";
+ Long maxCon = 10l;
+ Long maxRPS = 20l;
+
+ stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
+
+ TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
+ long peakCon = tws.getPeakConcurrentReqeusts();
+ assertEquals(1l, peakCon);
+ }
+
+ @Test
+ public void testBlockedByMaxCon() throws Throwable {
+ long curTimeSlotId = stat.currentTimeSlotId();
+ long nextSlotId = curTimeSlotId + 1000;
+ Long maxCon = 10l;
+ Long maxRPS = 20l;
+ int threads = 100;
+ int requests = 10000;
+ int totalRequests = threads * requests;
+ String resourceId = "c";
+
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ long t1 = System.currentTimeMillis();
+ for (int i = 0; i < threads; i++) {
+ pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS));
+ }
+ pool.shutdown();
+ if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
+ long t2 = System.currentTimeMillis();
+ TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
+ assertEquals(maxCon, tws.getPeakConcurrentReqeusts());
+ assertEquals(totalRequests - maxCon, tws.getBlockRequests());
+ System.out.println("testBlockedByMaxCon total elapsed time for " + threads * requests + " requests:"
+ + (t2 - t1) + "ms");
+ } else {
+ System.out.println("testIncrConcurrentRequest timeout");
+ }
+ }
+
+ @Test
+ public void testBlockedByMaxRPS() throws Throwable {
+ long curTimeSlotId = stat.currentTimeSlotId();
+ long nextSlotId = curTimeSlotId + 1000;
+ Long maxCon = Long.MAX_VALUE;
+ Long maxRPS = 20l;
+ int threads = 100;
+ int requests = 10000;
+ int totalRequests = threads * requests;
+ String resourceId = "c";
+
+ for (int i = 0; i < maxRPS; i++) {
+ stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
+ }
+
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ long t1 = System.currentTimeMillis();
+ for (int i = 0; i < threads; i++) {
+ pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS));
+ }
+ pool.shutdown();
+ if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
+ long t2 = System.currentTimeMillis();
+ TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
+ assertEquals(maxRPS, tws.getRps().intValue());
+ assertEquals(totalRequests, tws.getBlockRequests());
+ System.out.println("testIncrConcurrentRequest total elapsed time for " + threads * requests + " requests:"
+ + (t2 - t1) + "ms");
+ } else {
+ System.out.println("testIncrConcurrentRequest timeout");
+ }
+ }
+
+ @Test
+ public void testStat() throws Throwable {
+ // requests per slot per resource
+ int requests = 100;
+ int threads = 10;
+ int resources = 10;
+ int slots = 100;
+ long rt = 100;
+ long t1 = System.currentTimeMillis();
+ long start = (t1 / FlowStat.INTERVAL) * FlowStat.INTERVAL;
+
+ int totalRequests = requests * threads * resources * slots;
+
+ ExecutorService pool = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < threads; i++) {
+ pool.submit(new Job(requests, resources, slots, start, rt));
+ }
+ pool.shutdown();
+ if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
+ long t2 = System.currentTimeMillis();
+
+ long end = start + slots * FlowStat.INTERVAL;
+ long nsecs = (end - start) / 1000;
+
+ System.out.println("total requests:" + totalRequests);
+ System.out.println("total elapsed time:" + (t2 - t1) + "ms");
+ System.out.println("Testing Time Window:" + (end - start) + "ms");
+
+ int resource1 = 1;
+ int resource2 = 2;
+ int rtBase1 = 1;
+ int rtBase3 = 3;
+ TimeWindowStat tws1 = stat.getTimeWindowStat("resource-" + resource1, start, end);
+ TimeWindowStat tws2 = stat.getTimeWindowStat("resource-" + resource2, start, end);
+
+ assertEquals(totalRequests / resources, tws1.getTotal());
+ assertEquals(rt * rtBase1, tws1.getAvgRt());
+ assertEquals(rt * rtBase1, tws1.getMin());
+ assertEquals(rt * rtBase1, tws1.getMax());
+ assertEquals(totalRequests / resources / nsecs, tws1.getRps().intValue());
+ assertEquals(totalRequests / resources / 10, tws1.getErrors().intValue());
+ System.out.println("RPS of resource1: " + tws1.getRps().intValue());
+
+ assertEquals(totalRequests / resources, tws2.getTotal());
+ assertEquals(rt * rtBase3, tws2.getAvgRt());
+ assertEquals(rt * rtBase3, tws2.getMin());
+ assertEquals(rt * rtBase3, tws2.getMax());
+ assertEquals(totalRequests / resources / nsecs, tws2.getRps().intValue());
+ assertEquals(totalRequests / resources / 10, tws2.getErrors().intValue());
+ System.out.println("RPS of resource2: " + tws2.getRps().intValue());
+
+ // performance of getTimeWindowStat
+ for (int n = 0; n < 10; n++) {
+ long t3 = System.currentTimeMillis();
+ int times = 100000;
+ for (int i = 0; i < times; i++) {
+ stat.getTimeWindowStat("resource-" + resource1, start, end);
+ }
+ long t4 = System.currentTimeMillis();
+ System.out.println("performance of getTimeWindowStat: " + (t4 - t3) + "ms " + times + " times");
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ // System.out.println(JacksonUtils.writeValueAsString(stat.resourceStats));
+
+ List list = stat.getResourceTimeWindowStats("resource-" + 1, start, end + 3 * 1000,
+ 10);
+ assertEquals(nsecs / 10, list.get(0).getWindows().size());
+ System.out.println(JacksonUtils.writeValueAsString(list));
+ } else {
+ System.out.println("timeout");
+ }
+
+ }
+
+ class Job implements Runnable {
+
+ public Job(int requests, int resources, int slots, long startSlotId, long rt) {
+ this.requests = requests;
+ this.resources = resources;
+ this.slots = slots;
+ this.startSlotId = startSlotId;
+ this.rt = rt;
+ }
+
+ private int requests = 0;
+ private int resources = 0;
+ private int slots = 0;
+ private long startSlotId = 0;
+ private long rt = 0;
+
+ @Override
+ public void run() {
+ for (int m = 0; m < slots; m++) {
+ for (int i = 0; i < requests; i++) {
+ for (int j = 0; j < resources; j++) {
+ stat.incrRequest("resource-" + j, startSlotId + (m * FlowStat.INTERVAL), null, null);
+ // 10% error
+ boolean isSuccess = i % 10 == 1 ? false : true;
+ // rt will be triple while even
+ stat.addRequestRT("resource-" + j, startSlotId + (m * FlowStat.INTERVAL),
+ rt * (j % 2 == 0 ? 3 : 1), isSuccess);
+ }
+ try {
+ // Thread.sleep(1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ class ConcurrentJob implements Runnable {
+
+ public ConcurrentJob(int requests, long curTimeSlotId, String resourceId, Long maxCon, Long maxRPS) {
+ this.requests = requests;
+ this.resourceId = resourceId;
+ this.maxRPS = maxRPS;
+ this.maxCon = maxCon;
+ this.curTimeSlotId = curTimeSlotId;
+ }
+
+ private int requests = 0;
+ private String resourceId;
+ private Long maxCon = 0l;
+ private Long maxRPS = 0l;
+ private long curTimeSlotId = 0;
+
+ @Override
+ public void run() {
+ for (int i = 0; i < requests; i++) {
+ stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
+ }
+ }
+ }
+
+ @Test
+ public void testGetResourceStat() throws Throwable {
+ int threads = 10;
+ int requests = 100000;
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ long t1 = System.currentTimeMillis();
+ for (int i = 0; i < threads; i++) {
+ pool.submit(new GetResourceStatJob(requests));
+ }
+ pool.shutdown();
+ if (pool.awaitTermination(5, TimeUnit.SECONDS)) {
+ System.out.println("testGetResourceStat done");
+ } else {
+ System.out.println("testGetResourceStat timeout");
+ }
+ }
+
+ class GetResourceStatJob implements Runnable {
+
+ public GetResourceStatJob(int requests) {
+ this.requests = requests;
+ }
+
+ private int requests = 0;
+
+ @Override
+ public void run() {
+ for (int i = 0; i < requests; i++) {
+ try {
+ ResourceStat rs = stat.getResourceStat("" + i);
+ assertNotNull(rs);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+}
diff --git a/src/test/java/we/stats/ratelimit/RateLimitTests.java b/src/test/java/we/stats/ratelimit/RateLimitTests.java
new file mode 100644
index 0000000..f3be29a
--- /dev/null
+++ b/src/test/java/we/stats/ratelimit/RateLimitTests.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2021 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.stats.ratelimit;
+
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.handler.timeout.WriteTimeoutHandler;
+import org.junit.jupiter.api.Test;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.http.client.reactive.ReactorResourceFactory;
+import org.springframework.web.reactive.function.client.ExchangeStrategies;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.resources.ConnectionProvider;
+import reactor.netty.resources.LoopResources;
+import we.stats.FlowStat;
+import we.stats.ResourceTimeWindowStat;
+import we.util.Constants;
+import we.util.DateTimeUtils;
+import we.util.JacksonUtils;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author hongqiaowei
+ */
+
+public class RateLimitTests {
+
+ private FlowStat flowStat = new FlowStat();
+
+ private ConnectionProvider getConnectionProvider() {
+ return ConnectionProvider
+ .builder("flow-control-cp")
+ .maxConnections(500)
+ .pendingAcquireTimeout(Duration.ofMillis(6_000))
+ .maxIdleTime(Duration.ofMillis(40_000))
+ .build();
+ }
+
+ private LoopResources getLoopResources() {
+ LoopResources lr = LoopResources.create("flow-control-el", Runtime.getRuntime().availableProcessors(), true);
+ lr.onServer(false);
+ return lr;
+ }
+
+ private ReactorResourceFactory reactorResourceFactory() {
+ ReactorResourceFactory fact = new ReactorResourceFactory();
+ fact.setUseGlobalResources(false);
+ fact.setConnectionProvider(getConnectionProvider());
+ fact.setLoopResources(getLoopResources());
+ fact.afterPropertiesSet();
+ return fact;
+ }
+
+ private WebClient getWebClient() {
+ ConnectionProvider cp = getConnectionProvider();
+ LoopResources lr = getLoopResources();
+ HttpClient httpClient = HttpClient.create(cp).compress(false).tcpConfiguration(
+ tcpClient -> {
+ return tcpClient.runOn(lr, false)
+ .doOnConnected(
+ connection -> {
+ connection.addHandlerLast(new ReadTimeoutHandler( 20_000, TimeUnit.MILLISECONDS))
+ .addHandlerLast(new WriteTimeoutHandler( 20_000, TimeUnit.MILLISECONDS));
+ }
+ )
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 20_000)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
+ }
+ );
+ return WebClient.builder().exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)).build())
+ .clientConnector(new ReactorClientHttpConnector(httpClient)).build();
+ }
+
+ @Test
+ public void flowControlTests() throws InterruptedException {
+ WebClient webClient = getWebClient();
+ for (int i = 0; i < 0; i++) {
+ // String uri = "http://12.5.3.8:8600/proxy/fizz" + i + "/ftrol/mock";
+ String uri = "http://12.5.3.8:8600/proxy/fizz/ftrol/mock" + i;
+ System.err.println(i);
+ webClient
+ .method(HttpMethod.GET)
+ .uri(uri)
+ .headers(hdrs -> {})
+ .body(Mono.just(""), String.class)
+ .exchange().name("")
+ .doOnRequest(l -> {})
+ .doOnSuccess(r -> {})
+ .doOnError(t -> {
+ t.printStackTrace();
+ })
+ .timeout(Duration.ofMillis(16_000))
+ .flatMap(
+ remoteResp -> {
+ remoteResp.bodyToMono(String.class)
+ .doOnSuccess(
+ s -> {
+ // System.out.println(s);
+ }
+ );
+ return Mono.empty();
+ }
+ )
+ .subscribe()
+ ;
+ }
+ // Thread.currentThread().join();
+ }
+
+ @Test
+ public void test() {
+
+ FlowStat flowStat = new FlowStat();
+
+ long incrTime = DateTimeUtils.toMillis("2021-01-08 21:28:42.000", Constants.DatetimePattern.DP23);
+ boolean success = flowStat.incrRequest("resourceX", incrTime, Long.MAX_VALUE, Long.MAX_VALUE);
+ System.err.println("incrTime: " + incrTime + ", success: " + success);
+
+ long startTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:41.000", Constants.DatetimePattern.DP23);
+ long endTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:44.000", Constants.DatetimePattern.DP23);
+
+ List resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, endTimeSlot, 3);
+ if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {
+ System.err.println(toDP19(startTimeSlot) + " - " + toDP19(endTimeSlot) + " no flow stat data");
+ } else {
+ System.err.println(JacksonUtils.writeValueAsString(resourceTimeWindowStats));
+ }
+ }
+
+ private String toDP19(long mills) {
+ return DateTimeUtils.toDate(mills, Constants.DatetimePattern.DP19);
+ }
+
+ @Test
+ public void test0() {
+ FlowStat flowStat = new FlowStat();
+ boolean success = flowStat.incrRequest("resourceX", 1610181704000l, Long.MAX_VALUE, Long.MAX_VALUE);
+ List r = flowStat.getResourceTimeWindowStats("resourceX", 1610181681000l, 1610181711000l, 30);
+ System.err.println("r: " + JacksonUtils.writeValueAsString(r));
+ }
+}
diff --git a/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigServiceTests.java b/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigServiceTests.java
new file mode 100644
index 0000000..6a90aaa
--- /dev/null
+++ b/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigServiceTests.java
@@ -0,0 +1,72 @@
+package we.stats.ratelimit;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
+import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.event.annotation.BeforeTestMethod;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import org.springframework.util.ReflectionUtils;
+import we.redis.RedisProperties;
+import we.redis.RedisServerConfiguration;
+import we.redis.RedisTemplateConfiguration;
+
+import javax.annotation.Resource;
+
+import java.lang.reflect.Field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author hongqiaowei
+ */
+
+@TestPropertySource("/application.properties")
+@SpringJUnitConfig(classes = {RedisProperties.class, RedisTemplateConfiguration.class, RedisServerConfiguration.class})
+// @ActiveProfiles("unittest")
+public class ResourceRateLimitConfigServiceTests {
+
+ @Resource
+ RedisProperties redisProperties;
+
+ @Resource
+ StringRedisTemplate stringRedisTemplate;
+
+ @Resource
+ ReactiveStringRedisTemplate reactiveStringRedisTemplate;
+
+ ResourceRateLimitConfigService resourceRateLimitConfigService;
+
+ @BeforeEach
+ void beforeEach() throws NoSuchFieldException {
+ resourceRateLimitConfigService = new ResourceRateLimitConfigService();
+ // Field rt = ResourceRateLimitConfigService.class.getField("rt");
+ // ReflectionUtils.makeAccessible(rt);
+ // ReflectionUtils.setField(rt, resourceRateLimitConfigService, reactiveStringRedisTemplate);
+ resourceRateLimitConfigService.setReactiveStringRedisTemplate( reactiveStringRedisTemplate);
+ }
+
+ @Test
+ void initTest() throws Throwable {
+ // System.err.println(redisProperties);
+ // System.err.println(stringRedisTemplate);
+ // System.err.println(reactiveStringRedisTemplate);
+
+ // stringRedisTemplate.opsForValue().set("name", "F-22");
+ // Thread.sleep(2000);
+ // String name = stringRedisTemplate.opsForValue().get("name");
+ // assertEquals(name, "F-22");
+ // System.err.println(name);
+
+ // stringRedisTemplate.opsForHash().put("fizz_rate_limit", "2", "{\"concurrents\":100,\"enable\":1,\"id\":2,\"isDeleted\":0,\"resource\":\"service_default\",\"type\":2}");
+ // resourceRateLimitConfigService.init();
+ // ResourceRateLimitConfig resourceRateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig("service_default");
+ //
+ // System.err.println(resourceRateLimitConfig);
+ // System.err.println("init test end");
+ // Thread.currentThread().join();
+ }
+}
diff --git a/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigTests.java b/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigTests.java
new file mode 100644
index 0000000..d8fb086
--- /dev/null
+++ b/src/test/java/we/stats/ratelimit/ResourceRateLimitConfigTests.java
@@ -0,0 +1,20 @@
+package we.stats.ratelimit;
+
+import org.junit.jupiter.api.Test;
+import we.util.JacksonUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author hongqiaowei
+ */
+
+public class ResourceRateLimitConfigTests {
+
+ @Test
+ void test() {
+ String resourceRateLimitConfigJson = "{\"concurrents\":1000,\"enable\":1,\"id\":1,\"isDeleted\":0,\"qps\":500,\"resource\":\"_global\",\"responseContent\":\"{\\\"msg\\\":\\\"rate limit, please try again\\\"}\",\"responseType\":\"application/json; charset=UTF-8\",\"type\":1}";
+ ResourceRateLimitConfig resourceRateLimitConfig = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class);
+ assertEquals("application/json; charset=UTF-8", resourceRateLimitConfig.responseType);
+ }
+}
diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties
new file mode 100644
index 0000000..c0ba817
--- /dev/null
+++ b/src/test/resources/application.properties
@@ -0,0 +1,5 @@
+# author: hongqiaowei
+
+embeded.redis.host = localhost
+embeded.redis.port = 6379
+embeded.redis.database = 4