v1.4.0 flow statistic and rate limit (#28)

support flow statistic and rate limit
This commit is contained in:
dxfeng10
2021-01-18 18:09:01 +08:00
committed by GitHub
parent 0622459ec0
commit d3f1925ba7
38 changed files with 3501 additions and 59 deletions

View File

@@ -90,6 +90,7 @@ Starting from v1.3.0, the frontend and backend of the management backend are mer
| Fizz-gateway-community | Fizz-manager-professional |
| ---------------------- | ------------------------- |
| v1.3.0 | v1.3.0 |
| v1.4.0 | v1.4.0 |
Please download the corresponding management backend version according to the version of the community version
@@ -189,6 +190,8 @@ Fizz官方技术交流③群512164278
## System screenshot
![](https://user-images.githubusercontent.com/6129661/104895987-84618880-59b1-11eb-9a73-a8569a7e6a69.png)
![](https://user-images.githubusercontent.com/184315/97131368-f5ace900-177e-11eb-9e00-24e73d4e24f5.png)
![](https://user-images.githubusercontent.com/184315/97131376-f9407000-177e-11eb-8c17-4922b3df5d48.png)
@@ -197,4 +200,4 @@ Fizz官方技术交流③群512164278
![](https://user-images.githubusercontent.com/184315/97131381-fba2ca00-177e-11eb-9e59-688dafa76aea.png)
![](https://user-images.githubusercontent.com/184315/97131382-fc3b6080-177e-11eb-908a-a5ffc8b08459.png)
![](https://user-images.githubusercontent.com/6129661/104897563-7ca2e380-59b3-11eb-8288-39a2b181183d.png)

View File

@@ -89,6 +89,7 @@ API地址http://demo.fizzgate.com/proxy/[服务名]/[API Path]
| Fizz-gateway-community | Fizz-manager-professional |
| ---------------------- | ------------------------- |
| v1.3.0 | v1.3.0 |
| v1.4.0 | v1.4.0 |
请根据社区版的版本下载对应的管理后台版本
@@ -188,6 +189,8 @@ Fizz官方技术交流③群512164278
## 系统截图
![](https://user-images.githubusercontent.com/6129661/104895987-84618880-59b1-11eb-9a73-a8569a7e6a69.png)
![](https://user-images.githubusercontent.com/184315/97131368-f5ace900-177e-11eb-9e00-24e73d4e24f5.png)
![](https://user-images.githubusercontent.com/184315/97131376-f9407000-177e-11eb-8c17-4922b3df5d48.png)
@@ -196,4 +199,4 @@ Fizz官方技术交流③群512164278
![](https://user-images.githubusercontent.com/184315/97131381-fba2ca00-177e-11eb-9e59-688dafa76aea.png)
![](https://user-images.githubusercontent.com/184315/97131382-fc3b6080-177e-11eb-908a-a5ffc8b08459.png)
![](https://user-images.githubusercontent.com/6129661/104897563-7ca2e380-59b3-11eb-8288-39a2b181183d.png)

20
pom.xml
View File

@@ -10,7 +10,7 @@
</parent>
<groupId>we</groupId>
<artifactId>fizz-gateway-community</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
<name>fizz-gateway-community</name>
<repositories>
@@ -35,10 +35,10 @@
<properties>
<java.version>1.8</java.version>
<spring-framework.version>5.2.12.RELEASE</spring-framework.version>
<reactor-bom.version>Dysprosium-SR15</reactor-bom.version>
<lettuce.version>5.3.5.RELEASE</lettuce.version>
<reactor-bom.version>Dysprosium-SR16</reactor-bom.version>
<lettuce.version>5.3.6.RELEASE</lettuce.version>
<nacos.version>0.2.7</nacos.version>
<netty.version>4.1.56.Final</netty.version>
<netty.version>4.1.58.Final</netty.version>
<httpcore.version>4.4.14</httpcore.version>
<log4j2.version>2.13.3</log4j2.version>
</properties>
@@ -177,6 +177,18 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>it.ozimov</groupId>
<artifactId>embedded-redis</artifactId>
<version>0.7.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.noear</groupId>

View File

@@ -1,3 +1,19 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we;
import org.springframework.context.ConfigurableApplicationContext;

View File

@@ -0,0 +1,37 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import we.stats.FlowStat;
/**
* @author hongqiaowei
*/
@ConditionalOnProperty(name = "flowControl", havingValue = "true")
@Configuration
public class FlowControlConfig {
@Bean
public FlowStat flowStat() {
return new FlowStat();
}
}

View File

@@ -0,0 +1,225 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.config;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
import we.stats.TimeWindowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.DateTimeUtils;
import we.util.NetworkUtils;
import we.util.ThreadContext;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author hongqiaowei
*/
@Configuration
@EnableScheduling
public class FlowStatSchedConfig extends SchedConfig {
private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
private static final String _ip = "\"ip\":";
private static final String _id = "\"id\":";
private static final String _resource = "\"resource\":";
private static final String _type = "\"type\":";
private static final String _start = "\"start\":";
private static final String _reqs = "\"reqs\":";
private static final String _completeReqs = "\"completeReqs\":";
private static final String _peakConcurrents = "\"peakConcurrents\":";
private static final String _reqPerSec = "\"reqPerSec\":";
private static final String _blockReqs = "\"blockReqs\":";
private static final String _totalBlockReqs = "\"totalBlockReqs\":";
private static final String _errors = "\"errors\":";
private static final String _avgRespTime = "\"avgRespTime\":";
private static final String _minRespTime = "\"minRespTime\":";
private static final String _maxRespTime = "\"maxRespTime\":";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
@Resource
private FlowStat flowStat;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
@NacosValue(value = "${flow-stat-sched.dest:redis}", autoRefreshed = true)
@Value("${flow-stat-sched.dest:redis}")
private String dest;
@NacosValue(value = "${flow-stat-sched.queue:fizz_resource_access_stat}", autoRefreshed = true)
@Value("${flow-stat-sched.queue:fizz_resource_access_stat}")
private String queue;
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt;
private final String ip = NetworkUtils.getServerIp();
private long startTimeSlot = 0;
private Map<String, AtomicLong> key2totalBlockMap = new HashMap<>();
@Scheduled(cron = "${flow-stat-sched.cron}")
public void sched() {
if (!flowControl) {
return;
}
if (startTimeSlot == 0) {
startTimeSlot = getRecentEndTimeSlot(flowStat);
return;
}
long st = System.currentTimeMillis();
long recentEndTimeSlot = getRecentEndTimeSlot(flowStat);
List<ResourceTimeWindowStat> resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, recentEndTimeSlot, 10);
if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {
log.info(toDP19(startTimeSlot) + " - " + toDP19(recentEndTimeSlot) + " no flow stat data");
return;
}
key2totalBlockMap.clear();
resourceTimeWindowStats.forEach(rtws -> {
List<TimeWindowStat> wins = rtws.getWindows();
wins.forEach(w -> {
AtomicLong totalBlock = key2totalBlockMap.computeIfAbsent(String.format("%s%s",
ResourceRateLimitConfig.GLOBAL, w.getStartTime()), key -> new AtomicLong(0));
totalBlock.addAndGet(w.getBlockRequests());
});
});
resourceTimeWindowStats.forEach(
rtws -> {
String resource = rtws.getResourceId();
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
int id = (config == null ? 0 : config.id);
int type;
if (ResourceRateLimitConfig.GLOBAL.equals(resource)) {
type = ResourceRateLimitConfig.Type.GLOBAL;
} else if (resource.charAt(0) == '/') {
type = ResourceRateLimitConfig.Type.API;
} else {
type = ResourceRateLimitConfig.Type.SERVICE;
}
List<TimeWindowStat> wins = rtws.getWindows();
wins.forEach(
w -> {
StringBuilder b = ThreadContext.getStringBuilder();
Long winStart = w.getStartTime();
BigDecimal rps = w.getRps();
double qps;
if (rps == null) {
qps = 0.00;
} else {
qps = rps.doubleValue();
}
AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart));
Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests();
b.append(Constants.Symbol.LEFT_BRACE);
b.append(_ip); toJsonStringValue(b, ip); b.append(Constants.Symbol.COMMA);
b.append(_id); b.append(id); b.append(Constants.Symbol.COMMA);
b.append(_resource); toJsonStringValue(b, resource); b.append(Constants.Symbol.COMMA);
b.append(_type); b.append(type); b.append(Constants.Symbol.COMMA);
b.append(_start); b.append(winStart); b.append(Constants.Symbol.COMMA);
b.append(_reqs); b.append(w.getTotal()); b.append(Constants.Symbol.COMMA);
b.append(_completeReqs); b.append(w.getCompReqs()); b.append(Constants.Symbol.COMMA);
b.append(_peakConcurrents); b.append(w.getPeakConcurrentReqeusts()); b.append(Constants.Symbol.COMMA);
b.append(_reqPerSec); b.append(qps); b.append(Constants.Symbol.COMMA);
b.append(_blockReqs); b.append(w.getBlockRequests()); b.append(Constants.Symbol.COMMA);
b.append(_totalBlockReqs); b.append(totalBlockReqs); b.append(Constants.Symbol.COMMA);
b.append(_errors); b.append(w.getErrors()); b.append(Constants.Symbol.COMMA);
b.append(_avgRespTime); b.append(w.getAvgRt()); b.append(Constants.Symbol.COMMA);
b.append(_maxRespTime); b.append(w.getMax()); b.append(Constants.Symbol.COMMA);
b.append(_minRespTime); b.append(w.getMin());
b.append(Constants.Symbol.RIGHT_BRACE);
String msg = b.toString();
if ("kafka".equals(dest)) { // for internal use
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
} else {
rt.convertAndSend(queue, msg).subscribe();
}
if (log.isDebugEnabled()) {
log.debug("report " + toDP19(winStart) + " win10: " + msg);
}
}
);
}
);
startTimeSlot = recentEndTimeSlot;
log.info(toDP23(st) + " fss " + toDP23(System.currentTimeMillis()));
}
private long getRecentEndTimeSlot(FlowStat flowStat) {
long currentTimeSlot = flowStat.currentTimeSlotId();
int second = DateTimeUtils.from(currentTimeSlot).getSecond();
long interval;
if (second > 49) {
interval = second - 50;
} else if (second > 39) {
interval = second - 40;
} else if (second > 29) {
interval = second - 30;
} else if (second > 19) {
interval = second - 20;
} else if (second > 9) {
interval = second - 10;
} else if (second > 0) {
interval = second - 0;
} else {
interval = 0;
}
long recentEndTimeSlot = currentTimeSlot - interval * 1000;
return recentEndTimeSlot;
}
private String toDP19(long startTimeSlot) {
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
}
private String toDP23(long startTimeSlot) {
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP23);
}
private static void toJsonStringValue(StringBuilder b, String value) {
b.append(Constants.Symbol.DOUBLE_QUOTE).append(value).append(Constants.Symbol.DOUBLE_QUOTE);
}
}

View File

@@ -0,0 +1,62 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.config;
import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
/**
* @author hongqiaowei
*/
@ConfigurationProperties(prefix = "sched")
public abstract class SchedConfig implements SchedulingConfigurer {
private int executors = 1;
public void setExecutors(int es) {
executors = es;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskScheduler());
taskRegistrar.addTriggerTask(new Runnable() {
public void run() {
}
}, new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
return null;
}
});
}
@Bean(destroyMethod = "shutdown")
public Executor taskScheduler() {
return Executors.newScheduledThreadPool(executors);
}
}

View File

@@ -32,6 +32,7 @@ import reactor.core.publisher.Mono;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AppService;
import we.plugin.auth.GatewayGroupService;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.JacksonUtils;
import javax.annotation.Resource;
@@ -41,7 +42,8 @@ import javax.annotation.Resource;
*/
@RestController
public class HealthController {
@RequestMapping("/admin/cache")
public class CacheCheckController {
@Resource
private GatewayGroupService gatewayGroupService;
@@ -52,12 +54,8 @@ public class HealthController {
@Resource
private ApiConfigService apiConfigService;
// add by hongqiaowei
@GetMapping("/sysgc")
public Mono<String> sysgc(ServerWebExchange exchange) throws Exception {
System.gc();
return Mono.just("sysgc done");
}
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
@GetMapping("/gatewayGroups")
public Mono<String> gatewayGroups(ServerWebExchange exchange) throws Exception {
@@ -78,4 +76,9 @@ public class HealthController {
public Mono<String> apiConfigs(ServerWebExchange exchange) throws Exception {
return Mono.just(JacksonUtils.writeValueAsString(apiConfigService.serviceConfigMap));
}
@GetMapping("/resourceRateLimitConfigs")
public Mono<String> resourceRateLimitConfigs(ServerWebExchange exchange) throws Exception {
return Mono.just(JacksonUtils.writeValueAsString(resourceRateLimitConfigService.getResourceRateLimitConfigMap()));
}
}

View File

@@ -0,0 +1,109 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.controller;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
import we.stats.TimeWindowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.util.Constants;
import we.util.DateTimeUtils;
import we.util.JacksonUtils;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author hongqiaowei
*/
@RestController
@RequestMapping("/admin/flowStat")
public class FlowControlController {
private static final Logger log = LoggerFactory.getLogger(FlowControlController.class);
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
@Resource
private FlowStat flowStat;
@GetMapping("/globalConcurrentsRps")
public Mono<String> globalConcurrentsRps(ServerWebExchange exchange, @RequestParam(value = "recent", required = false, defaultValue = "3") int recent) {
long concurrents = 0;
double rps = 0;
Map<String, Object> result = new HashMap<>();
result.put("concurrents", concurrents);
result.put("rps", rps);
if (flowControl) {
try {
long currentTimeSlot = flowStat.currentTimeSlotId();
long startTimeSlot = currentTimeSlot - recent * 1000;
TimeWindowStat timeWindowStat = null;
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent);
if (wins == null || wins.isEmpty()) {
result.put("rps", 0);
} else {
concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL);
result.put("concurrents", concurrents);
timeWindowStat = wins.get(0).getWindows().get(0);
BigDecimal winrps = timeWindowStat.getRps();
if (winrps == null) {
rps = 0;
} else {
rps = winrps.doubleValue();
}
result.put("rps", rps);
}
if (log.isDebugEnabled()) {
long compReqs = -1;
if (timeWindowStat != null) {
compReqs = timeWindowStat.getCompReqs();
}
log.debug(toDP19(startTimeSlot) + " - " + toDP19(currentTimeSlot) + " result: " + JacksonUtils.writeValueAsString(result) + ", complete reqs: " + compReqs);
}
} catch (Throwable t) {
log.error("get current global concurrents and rps error", t);
}
}
return Mono.just(JacksonUtils.writeValueAsString(result));
}
private String toDP19(long startTimeSlot) {
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
}
}

View File

@@ -0,0 +1,81 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
/**
* @author hongqiaowei
*/
public abstract class AbsFlowControlFilter extends ProxyAggrFilter {
protected static final Logger log = LoggerFactory.getLogger(AbsFlowControlFilter.class);
protected static final String exceed = " exceed ";
protected static final String concurrents = " concurrents ";
protected static final String orQps = " or qps ";
protected static final String currentTimeSlot = "currentTimeSlot";
protected static final String start = "start";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
protected boolean flowControl;
@Resource
protected ResourceRateLimitConfigService resourceRateLimitConfigService;
@Resource
protected FlowStat flowStat;
protected Mono<Void> generateExceedResponse(ServerWebExchange exchange, ResourceRateLimitConfig config) {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
b.append(exceed) .append(config.resource) .append(concurrents) .append(config.concurrents).append(orQps).append(config.qps);
log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
ResourceRateLimitConfig globalConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
}
protected void inTheEnd(ServerWebExchange exchange, String resource, long start, long currentTimeSlot, boolean success) {
long spend = System.currentTimeMillis() - start;
flowStat.decrConcurrentRequest(resource, currentTimeSlot);
flowStat.addRequestRT(resource, currentTimeSlot, spend, success);
}
}

View File

@@ -17,8 +17,8 @@
package we.filter;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
@@ -32,10 +32,14 @@ import reactor.core.publisher.Mono;
import we.exception.ExecuteScriptException;
import we.exception.RedirectException;
import we.exception.StopAndResponseException;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.util.JacksonUtils;
import we.util.ThreadContext;
import we.util.WebUtils;
import java.net.URI;
/**
* @author hongqiaowei
*/
@@ -44,13 +48,14 @@ import we.util.WebUtils;
public class FilterExceptionHandlerConfig {
public static class FilterExceptionHandler implements WebExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(FilterExceptionHandler.class);
private static final String filterExceptionHandler = "filterExceptionHandler";
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable t) {
ServerHttpResponse resp = exchange.getResponse();
if (t instanceof StopAndResponseException) {
StopAndResponseException ex = (StopAndResponseException) t;
if (ex.getData() != null) {
ServerHttpResponse resp = exchange.getResponse();
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(ex.getData().toString().getBytes())));
}
@@ -58,7 +63,6 @@ public class FilterExceptionHandlerConfig {
if (t instanceof RedirectException) {
RedirectException ex = (RedirectException) t;
if (ex.getRedirectUrl() != null) {
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.MOVED_PERMANENTLY);
resp.getHeaders().setLocation(URI.create(ex.getRedirectUrl()));
return Mono.empty();
@@ -66,7 +70,6 @@ public class FilterExceptionHandlerConfig {
}
if (t instanceof ExecuteScriptException) {
ExecuteScriptException ex = (ExecuteScriptException) t;
ServerHttpResponse resp = exchange.getResponse();
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
String reqId = exchange.getRequest().getId();
@@ -78,13 +81,23 @@ public class FilterExceptionHandlerConfig {
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rs.toString().getBytes())));
}
}
Mono<Void> vm = WebUtils.responseError(exchange, filterExceptionHandler, HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), t);
Mono<Void> vm;
Object fc = exchange.getAttributes().get(WebUtils.FILTER_CONTEXT);
if (fc == null) { // t came from flow control filter
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
String s = RespEntity.toJson(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), exchange.getRequest().getId());
vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes())));
} else {
vm = WebUtils.responseError(exchange, filterExceptionHandler, HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), t);
}
return vm;
}
}
@Bean
@Order(-2)
@Order(-10)
public FilterExceptionHandler filterExceptionHandler() {
return new FilterExceptionHandler();
}

View File

@@ -0,0 +1,137 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.flume.clients.log4j2appender.LogService;
import we.stats.BlockType;
import we.stats.FlowStat;
import we.stats.IncrRequestResult;
import we.stats.ResourceConfig;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.WebUtils;
/**
* @author hongqiaowei
*/
@Component(FlowControlFilter.FLOW_CONTROL_FILTER)
@Order(-1)
public class FlowControlFilter extends ProxyAggrFilter {
public static final String FLOW_CONTROL_FILTER = "flowControlFilter";
private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class);
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
@Resource
private FlowStat flowStat;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
String service = WebUtils.getClientService(exchange);
// String reqPath = WebUtils.getClientReqPath(exchange);
long currentTimeSlot = flowStat.currentTimeSlotId();
ResourceRateLimitConfig globalConfig = resourceRateLimitConfigService
.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
ResourceRateLimitConfig serviceConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
if (serviceConfig == null) {
serviceConfig = resourceRateLimitConfigService
.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
}
// global
List<ResourceConfig> resourceConfigs = new ArrayList<>();
ResourceConfig globalResCfg = new ResourceConfig(ResourceRateLimitConfig.GLOBAL, 0, 0);
if (globalConfig != null && globalConfig.isEnable()) {
globalResCfg.setMaxCon(globalConfig.concurrents);
globalResCfg.setMaxQPS(globalConfig.qps);
}
resourceConfigs.add(globalResCfg);
// service
ResourceConfig serviceResCfg = new ResourceConfig(service, 0, 0);
if (serviceConfig != null && serviceConfig.isEnable()) {
serviceResCfg.setMaxCon(serviceConfig.concurrents);
serviceResCfg.setMaxQPS(serviceConfig.qps);
}
resourceConfigs.add(serviceResCfg);
IncrRequestResult result = flowStat.incrRequest(resourceConfigs, currentTimeSlot);
if (result != null && !result.isSuccess()) {
if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
log.info("exceed {} flow limit, blocked by maximum concurrent requests",
result.getBlockedResourceId(), LogService.BIZ_ID, exchange.getRequest().getId());
} else {
log.info("exceed {} flow limit, blocked by maximum QPS", result.getBlockedResourceId(),
LogService.BIZ_ID, exchange.getRequest().getId());
}
// ResourceRateLimitConfig config = result.getBlockedResourceId().equals(globalConfig.resource)
// ? globalConfig
// : serviceConfig;
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
} else {
long start = System.currentTimeMillis();
return chain.filter(exchange).doFinally(s -> {
long rt = System.currentTimeMillis() - start;
if (s == SignalType.ON_COMPLETE) {
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true);
} else {
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false);
}
});
}
}
return chain.filter(exchange);
}
}

View File

@@ -0,0 +1,113 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.flume.clients.log4j2appender.LogService;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.util.JacksonUtils;
import we.util.WebUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @author hongqiaowei
*/
//@Component(GlobalFlowControlFilter.GLOBAL_FLOW_CONTROL_FILTER)
//@Order(-4)
public class GlobalFlowControlFilter extends AbsFlowControlFilter {
public static final String GLOBAL_FLOW_CONTROL_FILTER = "globalFlowControlFilter";
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
// Map<String, Object> traceMap = new HashMap<>();
LogService.setBizId(exchange.getRequest().getId());
long currentTimeSlot = flowStat.currentTimeSlotId();
// traceMap.put("currentTimeSlot", currentTimeSlot);
exchange.getAttributes().put(AbsFlowControlFilter.currentTimeSlot, currentTimeSlot);
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
if (config.isEnable()) {
// traceMap.put("globalConfig", "enable conns " + config.concurrents + " and incr now");
boolean concurrentOrRpsExceed = !flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, config.concurrents, config.qps);
if (concurrentOrRpsExceed) {
// traceMap.put("globalConfigExceed", "true");
return generateExceedResponse(exchange, config);
}
} else {
// traceMap.put("noGlobalConfig", "incr now");
flowStat.incrRequest(ResourceRateLimitConfig.GLOBAL, currentTimeSlot, null, null);
}
// if (log.isDebugEnabled()) {
// log.debug(JacksonUtils.writeValueAsString(traceMap), LogService.BIZ_ID, exchange.getRequest().getId());
// }
// StringBuilder b = new StringBuilder();
// WebUtils.request2stringBuilder(exchange, b);
// b.append('\n');
long start = System.currentTimeMillis();
exchange.getAttributes().put(AbsFlowControlFilter.start, start);
return chain.filter(exchange)
// .doOnSuccess(
// r -> {
// // b.append(" succ ");
// // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true);
// }
// )
// .doOnError(
// t -> {
// // b.append(" errs ");
// // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
// }
// )
// .doOnCancel(
// () -> {
// // b.append(" cans ");
// // inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
// }
// )
.doFinally(
s -> {
if (s == SignalType.ON_COMPLETE) {
// b.append(" comps ");
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, true);
} else {
// b.append(" " + s);
inTheEnd(exchange, ResourceRateLimitConfig.GLOBAL, start, currentTimeSlot, false);
}
// if (log.isDebugEnabled()) {
// log.debug(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
// }
}
);
}
return chain.filter(exchange);
}
}

View File

@@ -114,20 +114,20 @@ public class PreFilter extends ProxyAggrFilter {
}
private void afterAuth(ServerWebExchange exchange, ApiConfig ac) {
String bs = null, bp = null;
String bs = null, bp;
if (ac == null) {
bs = WebUtils.getClientService(exchange);
bp = WebUtils.getClientReqPath(exchange);
} else {
if (ac.type != ApiConfig.Type.REVERSE_PROXY) {
bs = ac.backendService;
bp = ac.transform(WebUtils.getClientReqPath(exchange));
}
bp = ac.transform(WebUtils.getClientReqPath(exchange));
}
if (bs != null) {
WebUtils.setBackendService(exchange, bs);
WebUtils.setBackendPath(exchange, bp);
}
WebUtils.setBackendPath(exchange, bp);
}
private Mono chain(ServerWebExchange exchange, Mono m, PluginFilter pf) {

View File

@@ -0,0 +1,88 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.util.WebUtils;
/**
* @author hongqiaowei
*/
//@Component(ServiceFlowControlFilter.SERVICE_FLOW_CONTROL_FILTER)
//@Order(-3)
public class ServiceFlowControlFilter extends AbsFlowControlFilter {
public static final String SERVICE_FLOW_CONTROL_FILTER = "serviceFlowControlFilter";
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
long currentTimeSlot = exchange.getAttribute(AbsFlowControlFilter.currentTimeSlot);
String service = WebUtils.getClientService(exchange);
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
if (config == null) {
config = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
}
if (config == null || !config.isEnable()) {
flowStat.incrRequest(service, currentTimeSlot, null, null);
} else {
boolean concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, config.concurrents, config.qps);
if (concurrentOrRpsExceed) {
return generateExceedResponse(exchange, config);
}
}
long start = exchange.getAttribute(AbsFlowControlFilter.start);
return chain.filter(exchange)
// .doOnSuccess(
// r -> {
// inTheEnd(exchange, service, start, currentTimeSlot, true);
// }
// )
// .doOnError(
// t -> {
// inTheEnd(exchange, service, start, currentTimeSlot, false);
// }
// )
// .doOnCancel(
// () -> {
// inTheEnd(exchange, service, start, currentTimeSlot, false);
// }
// )
.doFinally(
s -> {
if (s == SignalType.ON_COMPLETE) {
inTheEnd(exchange, service, start, currentTimeSlot, true);
} else {
inTheEnd(exchange, service, start, currentTimeSlot, false);
}
}
);
}
return chain.filter(exchange);
}
}

View File

@@ -28,9 +28,9 @@ import java.util.Map;
* @author hongqiaowei
*/
public class GatewayGroup2appsToApiConfig {
public class GatewayGroup2apiConfig {
private static final Logger log = LoggerFactory.getLogger(GatewayGroup2appsToApiConfig.class);
private static final Logger log = LoggerFactory.getLogger(GatewayGroup2apiConfig.class);
private Map<String/*gg*/, Map<String/*a*/, ApiConfig>> configMap = new HashMap<>(6);

View File

@@ -34,6 +34,8 @@ import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author hongqiaowei
@@ -52,7 +54,7 @@ public class GatewayGroupService {
private Map<Integer, GatewayGroup> oldGatewayGroupMap = new HashMap<>(6);
public Set<String> currentGatewayGroupSet = new HashSet<>(6);
public Set<String> currentGatewayGroupSet = Stream.of(GatewayGroup.DEFAULT).collect(Collectors.toSet());
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt;

View File

@@ -46,7 +46,7 @@ public class ServiceConfig {
@JsonIgnore
public Map<Integer, ApiConfig> apiConfigMap = new HashMap<>(32);
public Map<String, EnumMap<HttpMethod, GatewayGroup2appsToApiConfig>> path2methodToApiConfigMapMap = new HashMap<>(6);
public Map<String, EnumMap<HttpMethod, GatewayGroup2apiConfig>> path2methodToApiConfigMapMap = new HashMap<>(6);
public ServiceConfig(String id) {
this.id = id;
@@ -54,36 +54,36 @@ public class ServiceConfig {
public void add(ApiConfig ac) {
apiConfigMap.put(ac.id, ac);
EnumMap<HttpMethod, GatewayGroup2appsToApiConfig> method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
EnumMap<HttpMethod, GatewayGroup2apiConfig> method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
if (method2apiConfigMap == null) {
method2apiConfigMap = new EnumMap<>(HttpMethod.class);
GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = new GatewayGroup2appsToApiConfig();
gatewayGroup2appsToApiConfig.add(ac);
method2apiConfigMap.put(ac.method, gatewayGroup2appsToApiConfig);
GatewayGroup2apiConfig gatewayGroup2apiConfig = new GatewayGroup2apiConfig();
gatewayGroup2apiConfig.add(ac);
method2apiConfigMap.put(ac.method, gatewayGroup2apiConfig);
path2methodToApiConfigMapMap.put(ac.path, method2apiConfigMap);
} else {
GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = method2apiConfigMap.get(ac.method);
if (gatewayGroup2appsToApiConfig == null) {
gatewayGroup2appsToApiConfig = new GatewayGroup2appsToApiConfig();
method2apiConfigMap.put(ac.method, gatewayGroup2appsToApiConfig);
GatewayGroup2apiConfig gatewayGroup2apiConfig = method2apiConfigMap.get(ac.method);
if (gatewayGroup2apiConfig == null) {
gatewayGroup2apiConfig = new GatewayGroup2apiConfig();
method2apiConfigMap.put(ac.method, gatewayGroup2apiConfig);
}
gatewayGroup2appsToApiConfig.add(ac);
gatewayGroup2apiConfig.add(ac);
}
log.info("add " + ac);
}
public void remove(ApiConfig ac) {
ApiConfig remove = apiConfigMap.remove(ac.id);
Map<HttpMethod, GatewayGroup2appsToApiConfig> method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
Map<HttpMethod, GatewayGroup2apiConfig> method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
if (method2apiConfigMap == null) {
log.info("no config to delete for " + ac.service + ' ' + ac.path);
} else {
GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = method2apiConfigMap.get(ac.method);
if (gatewayGroup2appsToApiConfig == null) {
GatewayGroup2apiConfig gatewayGroup2apiConfig = method2apiConfigMap.get(ac.method);
if (gatewayGroup2apiConfig == null) {
log.info("no config to delete for " + ac.service + ' ' + ac.method + ' ' + ac.path);
} else {
log.info(id + " remove " + ac);
gatewayGroup2appsToApiConfig.remove(ac);
gatewayGroup2apiConfig.remove(ac);
}
}
}
@@ -91,22 +91,22 @@ public class ServiceConfig {
public void update(ApiConfig ac) {
ApiConfig prev = apiConfigMap.put(ac.id, ac);
log.info(prev + " is updated by " + ac + " in api config map");
EnumMap<HttpMethod, GatewayGroup2appsToApiConfig> method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
EnumMap<HttpMethod, GatewayGroup2apiConfig> method2apiConfigMap = path2methodToApiConfigMapMap.get(ac.path);
if (method2apiConfigMap == null) {
method2apiConfigMap = new EnumMap<>(HttpMethod.class);
GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = new GatewayGroup2appsToApiConfig();
gatewayGroup2appsToApiConfig.add(ac);
method2apiConfigMap.put(ac.method, gatewayGroup2appsToApiConfig);
GatewayGroup2apiConfig gatewayGroup2apiConfig = new GatewayGroup2apiConfig();
gatewayGroup2apiConfig.add(ac);
method2apiConfigMap.put(ac.method, gatewayGroup2apiConfig);
path2methodToApiConfigMapMap.put(ac.path, method2apiConfigMap);
} else {
GatewayGroup2appsToApiConfig gatewayGroup2appsToApiConfig = method2apiConfigMap.get(ac.method);
if (gatewayGroup2appsToApiConfig == null) {
gatewayGroup2appsToApiConfig = new GatewayGroup2appsToApiConfig();
method2apiConfigMap.put(ac.method, gatewayGroup2appsToApiConfig);
gatewayGroup2appsToApiConfig.add(ac);
GatewayGroup2apiConfig gatewayGroup2apiConfig = method2apiConfigMap.get(ac.method);
if (gatewayGroup2apiConfig == null) {
gatewayGroup2apiConfig = new GatewayGroup2apiConfig();
method2apiConfigMap.put(ac.method, gatewayGroup2apiConfig);
gatewayGroup2apiConfig.add(ac);
} else {
log.info(id + " update " + ac);
gatewayGroup2appsToApiConfig.update(ac);
gatewayGroup2apiConfig.update(ac);
}
}
}
@@ -114,7 +114,7 @@ public class ServiceConfig {
@JsonIgnore
public ApiConfig getApiConfig(HttpMethod method, String path, String gatewayGroup, String app) {
// GatewayGroup2appsToApiConfig r = getApiConfig0(method, path);
GatewayGroup2appsToApiConfig r = getApiConfig(method, path);
GatewayGroup2apiConfig r = getApiConfig(method, path);
if (r == null) {
return null;
}
@@ -124,19 +124,19 @@ public class ServiceConfig {
return r.get(gatewayGroup, app);
}
private GatewayGroup2appsToApiConfig getApiConfig(HttpMethod method, String reqPath) {
private GatewayGroup2apiConfig getApiConfig(HttpMethod method, String reqPath) {
List<String> matchPathPatterns = ThreadContext.getArrayList(mpps, String.class);
Set<Map.Entry<String, EnumMap<HttpMethod, GatewayGroup2appsToApiConfig>>> es = path2methodToApiConfigMapMap.entrySet();
for (Map.Entry<String, EnumMap<HttpMethod, GatewayGroup2appsToApiConfig>> e : es) {
Set<Map.Entry<String, EnumMap<HttpMethod, GatewayGroup2apiConfig>>> es = path2methodToApiConfigMapMap.entrySet();
for (Map.Entry<String, EnumMap<HttpMethod, GatewayGroup2apiConfig>> e : es) {
String pathPattern = e.getKey();
if (ApiConfig.isAntPathPattern(pathPattern)) {
if (antPathMatcher.match(pathPattern, reqPath)) {
matchPathPatterns.add(pathPattern);
}
} else if (reqPath.equals(pathPattern)) {
return getGatewayGroup2appsToApiConfig(method, e.getValue());
return getGatewayGroup2apiConfig(method, e.getValue());
}
}
if (matchPathPatterns.isEmpty()) {
@@ -149,7 +149,7 @@ public class ServiceConfig {
"\nmatch patterns: " + matchPathPatterns +
"\nbest one: " + bestPattern);
}
return getGatewayGroup2appsToApiConfig(method, path2methodToApiConfigMapMap.get(bestPattern));
return getGatewayGroup2apiConfig(method, path2methodToApiConfigMapMap.get(bestPattern));
}
}
@@ -174,8 +174,8 @@ public class ServiceConfig {
// }
// }
private GatewayGroup2appsToApiConfig getGatewayGroup2appsToApiConfig(HttpMethod method, EnumMap<HttpMethod, GatewayGroup2appsToApiConfig> method2apiConfigMap) {
GatewayGroup2appsToApiConfig r = method2apiConfigMap.get(method);
private GatewayGroup2apiConfig getGatewayGroup2apiConfig(HttpMethod method, EnumMap<HttpMethod, GatewayGroup2apiConfig> method2apiConfigMap) {
GatewayGroup2apiConfig r = method2apiConfigMap.get(method);
if (r == null) {
return method2apiConfigMap.get(HttpMethod.X);
} else {

View File

@@ -119,7 +119,7 @@ public class StatPluginFilter extends PluginFilter {
if (StringUtils.isBlank(fizzAccessStatTopic)) {
rt.convertAndSend(fizzAccessStatChannel, b.toString()).subscribe();
} else {
log.info(b.toString(), LogService.HANDLE_STGY, LogService.toKF(fizzAccessStatTopic));
log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(fizzAccessStatTopic)); // for internal use
}
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
/**
*
* @author Francis Dong
*
*/
public enum BlockType {
/**
* Blocked by concurrent request rule
*/
CONCURRENT_REQUEST,
/**
* Blocked by QPS
*/
QPS;
}

View File

@@ -0,0 +1,483 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import we.stats.BlockType;
import we.util.Utils;
/**
* Flow Statistic
*
* @author Francis Dong
*
*/
public class FlowStat {
private static final Logger log = LoggerFactory.getLogger(FlowStat.class);
/**
* Time slot interval in millisecond
*/
public static long INTERVAL = 1000;
/**
* A string Resource ID as key
*/
public ConcurrentMap<String, ResourceStat> resourceStats = new ConcurrentHashMap<>(100);
/**
* Retention time of statistic data
*/
public static long RETENTION_TIME_IN_MINUTES = 10;
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private Lock w = rwl.writeLock();
private ExecutorService pool = Executors.newFixedThreadPool(2);
public FlowStat() {
runScheduleJob();
}
private void runScheduleJob() {
pool.submit(new HousekeepJob(this));
pool.submit(new PeakConcurrentJob(this));
}
/**
* Update retention time
*
* @param retentionTimeInMinutes
*/
public void updateRetentionTime(int retentionTimeInMinutes) {
RETENTION_TIME_IN_MINUTES = retentionTimeInMinutes;
}
/**
* Returns the current time slot ID
*
* @return
*/
public long currentTimeSlotId() {
return (System.currentTimeMillis() / INTERVAL) * INTERVAL;
}
/**
* Returns the time slot ID of the specified time
*
* @param timeMilli
* @return
*/
public long getTimeSlotId(long timeMilli) {
return (System.currentTimeMillis() / INTERVAL) * INTERVAL;
}
/**
* Increase concurrent request counter for given resources chain
*
* @param resourceConfigs Resource configurations
* @param curTimeSlotId current time slot ID, it should be generated by
* Flowstat.currentTimeSlotId()
* @return IncrRequestResult
*/
public IncrRequestResult incrRequest(List<ResourceConfig> resourceConfigs, long curTimeSlotId) {
if (resourceConfigs == null || resourceConfigs.size() == 0) {
return null;
}
w.lock();
try {
// check if exceed limit
for (ResourceConfig resourceConfig : resourceConfigs) {
long maxCon = resourceConfig.getMaxCon();
long maxQPS = resourceConfig.getMaxQPS();
if (maxCon > 0 || maxQPS > 0) {
ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
// check concurrent request
if (maxCon > 0) {
long n = resourceStat.getConcurrentRequests().get();
if (n >= maxCon) {
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
return IncrRequestResult.block(resourceConfig.getResourceId(),
BlockType.CONCURRENT_REQUEST);
}
}
// check QPS
if (maxQPS > 0) {
long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter().get();
if (total >= maxQPS) {
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
return IncrRequestResult.block(resourceConfig.getResourceId(), BlockType.QPS);
}
}
}
}
// increase request and concurrent request
for (ResourceConfig resourceConfig : resourceConfigs) {
ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
long cons = resourceStat.getConcurrentRequests().incrementAndGet();
resourceStat.getTimeSlot(curTimeSlotId).updatePeakConcurrentReqeusts(cons);
resourceStat.getTimeSlot(curTimeSlotId).incr();
}
return IncrRequestResult.success();
} finally {
w.unlock();
}
}
/**
* Add request RT and Decrease concurrent request for given resources chain
*
* @param resourceConfigs
* @param timeSlotId
* @param rt
* @param isSuccess
*/
public void addRequestRT(List<ResourceConfig> resourceConfigs, long timeSlotId, long rt, boolean isSuccess) {
if (resourceConfigs == null || resourceConfigs.size() == 0) {
return;
}
for (int i = resourceConfigs.size() - 1; i >= 0; i--) {
ResourceStat resourceStat = getResourceStat(resourceConfigs.get(i).getResourceId());
resourceStat.decrConcurrentRequest(timeSlotId);
resourceStat.addRequestRT(timeSlotId, rt, isSuccess);
}
}
/**
* Increase concurrent request counter of the specified resource
*
* @param resourceId Resource ID
* @param curTimeSlotId current time slot ID, it should be generated by
* Flowstat.currentTimeSlotId()
* @param maxCon Maximum concurrent request of the specified resource,
* null/zero/negative for no limit
* @param maxRPS Maximum RPS of the specified resource,
* null/zero/negative for no limit
* @return true if the request is not blocked; false if exceed the maximum
* concurrent request/RPS of the specified resource
*/
public boolean incrRequest(String resourceId, long curTimeSlotId, Long maxCon, Long maxRPS) {
ResourceStat resourceStat = getResourceStat(resourceId);
boolean success = resourceStat.incrConcurrentRequest(curTimeSlotId, maxCon);
if (success) {
success = resourceStat.incrRequestToTimeSlot(curTimeSlotId, maxRPS);
}
if (log.isDebugEnabled()) {
log.debug(resourceId + " incr req for current time slot " + curTimeSlotId + " with max con " + maxCon
+ " and max rps " + maxRPS);
}
return success;
}
/**
* Decrease concurrent request of the specified resource of the specified time
* slot
*
* @param resourceId Resource ID
* @param timeSlotId TimeSlot ID
* @return
*/
public void decrConcurrentRequest(String resourceId, long timeSlotId) {
if (resourceId == null) {
return;
}
ResourceStat resourceStat = getResourceStat(resourceId);
long conns = resourceStat.getConcurrentRequests().get();
if (conns == 0) {
if (log.isDebugEnabled()) {
StringBuilder b = new StringBuilder(256);
b.append(timeSlotId + " " + resourceId + " conns 0 before decr it").append('\n');
Utils.threadCurrentStack2stringBuilder(b);
log.debug(b.toString());
}
}
resourceStat.decrConcurrentRequest(timeSlotId);
}
/**
* Add request RT to the specified time slot counter
*
* @param resourceId Resource ID
* @param timeSlotId TimeSlot ID
* @param rt Response time of request
* @param isSuccess Whether the request is success or not
* @return
*/
public void addRequestRT(String resourceId, long timeSlotId, long rt, boolean isSuccess) {
if (resourceId == null) {
return;
}
ResourceStat resourceStat = getResourceStat(resourceId);
resourceStat.addRequestRT(timeSlotId, rt, isSuccess);
}
public ResourceStat getResourceStat(String resourceId) {
ResourceStat resourceStat = null;
if (resourceStats.containsKey(resourceId)) {
resourceStat = resourceStats.get(resourceId);
} else {
resourceStat = new ResourceStat(resourceId);
if (log.isDebugEnabled()) {
log.debug("no resource stat for " + resourceId + ", create one " + resourceStat);
}
ResourceStat rs = resourceStats.putIfAbsent(resourceId, resourceStat);
if (rs != null) {
resourceStat = rs;
}
}
return resourceStat;
}
/**
* Returns the current concurrent requests of the specified resource<br/>
* <br/>
*
* @param resourceId Resource ID
*/
public long getConcurrentRequests(String resourceId) {
ResourceStat resourceStat = getResourceStat(resourceId);
return resourceStat.getConcurrentRequests().get();
}
/**
* Returns current TimeWindowStat of the specified resource
*
* @param resourceId
* @return
*/
public TimeWindowStat getCurrentTimeWindowStat(String resourceId) {
long startTimeMilli = currentTimeSlotId();
return getTimeWindowStat(resourceId, startTimeMilli, startTimeMilli + 1000);
}
/**
* Returns current TimeWindowStat of the specified resource
*
* @param resourceId
* @param curTimeSlotId
* @return
*/
@SuppressWarnings("unused")
private TimeWindowStat getCurrentTimeWindowStat(String resourceId, long curTimeSlotId) {
return getTimeWindowStat(resourceId, curTimeSlotId, curTimeSlotId + 1000);
}
/**
* Returns the TimeWindowStat of previous second of the specified time
*
* @param resourceId
* @param timeMilli
* @return
*/
public TimeWindowStat getPreviousSecondStat(String resourceId, long timeMilli) {
long endTimeMilli = (timeMilli / INTERVAL) * INTERVAL;
return getTimeWindowStat(resourceId, endTimeMilli - 1000, endTimeMilli);
}
/**
* Returns the timeWindowStat of the specific resource in the specified time
* window [startTimeMilli, endTimeMilli)
*
* @param startTimeMilli included
* @param endTimeMilli excluded
* @return
*/
public TimeWindowStat getTimeWindowStat(String resourceId, long startTimeMilli, long endTimeMilli) {
long startSlotId = (startTimeMilli / INTERVAL) * INTERVAL;
long endSlotId = (endTimeMilli / INTERVAL) * INTERVAL;
if (startSlotId == endSlotId) {
endSlotId = endSlotId + INTERVAL;
}
if (resourceStats.containsKey(resourceId)) {
ResourceStat resourceStat = resourceStats.get(resourceId);
return resourceStat.getTimeWindowStat(startSlotId, endSlotId);
}
return null;
}
/**
* Returns the ResourceTimeWindowStat list in the specified time window
* [startTimeMilli, endTimeMilli), The time slot unit is one second
*
* @param resourceId optional, returns ResourceSlot list of all resources
* while resourceId is null
* @param startTimeMilli
* @param endTimeMilli
* @return
*/
@SuppressWarnings("unused")
public List<ResourceTimeWindowStat> getResourceTimeWindowStats(String resourceId, long startTimeMilli,
long endTimeMilli) {
return this.getResourceTimeWindowStats(resourceId, startTimeMilli, endTimeMilli, 1);
}
/**
* Returns the ResourceTimeWindow list in the specified time window
* [startTimeMilli, endTimeMilli)
*
* @param resourceId optional, returns ResourceTimeWindowStat list of all
* resources while resourceId is null
* @param startTimeMilli
* @param endTimeMilli
* @param slotIntervalInSec interval of custom time slot in millisecond, such as
* 60 for 1 minutes
* @return
*/
@SuppressWarnings("unused")
public List<ResourceTimeWindowStat> getResourceTimeWindowStats(String resourceId, long startTimeMilli,
long endTimeMilli, long slotIntervalInSec) {
List<ResourceTimeWindowStat> list = new ArrayList<>();
long startSlotId = (startTimeMilli / INTERVAL) * INTERVAL;
long endSlotId = (endTimeMilli / INTERVAL) * INTERVAL;
if (startSlotId == endSlotId) {
endSlotId = endSlotId + INTERVAL;
}
if (slotIntervalInSec < 1 || (endSlotId - startSlotId) / 1000 < slotIntervalInSec) {
return list;
}
long slotInterval = slotIntervalInSec * 1000;
if (resourceId == null) {
Set<Map.Entry<String, ResourceStat>> entrys = resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) {
String rid = entry.getKey();
ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(rid);
long end = startSlotId + slotInterval;
for (long start = startSlotId; end <= endSlotId;) {
TimeWindowStat tws = getTimeWindowStat(rid, start, end);
if (tws != null) {
resourceWin.getWindows().add(tws);
}
start += slotInterval;
end += slotInterval;
}
if (resourceWin.getWindows().size() > 0) {
list.add(resourceWin);
}
}
} else {
ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(resourceId);
long end = startSlotId + slotInterval;
for (long start = startSlotId; end <= endSlotId;) {
TimeWindowStat tws = getTimeWindowStat(resourceId, start, end);
if (tws != null) {
resourceWin.getWindows().add(tws);
}
start += slotInterval;
end += slotInterval;
}
if (resourceWin.getWindows().size() > 0) {
list.add(resourceWin);
}
}
return list;
}
class HousekeepJob implements Runnable {
private FlowStat stat;
public HousekeepJob(FlowStat stat) {
this.stat = stat;
}
@Override
public void run() {
long n = FlowStat.RETENTION_TIME_IN_MINUTES * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL;
long lastSlotId = stat.currentTimeSlotId() - n;
while (true) {
// log.debug("housekeeping start");
long slotId = stat.currentTimeSlotId() - n;
for (long i = lastSlotId; i < slotId;) {
Set<Map.Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) {
String resourceId = entry.getKey();
ConcurrentMap<Long, TimeSlot> timeSlots = entry.getValue().getTimeSlots();
// log.debug("housekeeping remove slot: resourceId={} slotId=={}", resourceId,
// i);
timeSlots.remove(i);
}
i = i + FlowStat.INTERVAL;
}
lastSlotId = slotId;
// log.debug("housekeeping done");
try {
Thread.sleep(60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class PeakConcurrentJob implements Runnable {
private FlowStat stat;
public PeakConcurrentJob(FlowStat stat) {
this.stat = stat;
}
@Override
public void run() {
Long lastTimeSlotId = null;
while (true) {
long curTimeSlotId = stat.currentTimeSlotId();
if (lastTimeSlotId == null || lastTimeSlotId.longValue() != curTimeSlotId) {
// log.debug("PeakConcurrentJob start");
Set<Map.Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) {
String resourceId = entry.getKey();
// log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId,
// curTimeSlotId);
entry.getValue().getTimeSlot(curTimeSlotId);
}
lastTimeSlotId = curTimeSlotId;
// log.debug("PeakConcurrentJob done");
}
try {
Thread.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
/**
*
* @author Francis Dong
*
*/
public class IncrRequestResult {
/**
* true if success, otherwise false
*/
private boolean success;
/**
* Resource ID that causes block
*/
private String blockedResourceId;
/**
* block type
*/
private BlockType blockType;
public static IncrRequestResult success() {
return new IncrRequestResult(true, null, null);
}
public static IncrRequestResult block(String resourceId, BlockType blockType) {
return new IncrRequestResult(false, resourceId, blockType);
}
public IncrRequestResult(boolean success, String resourceId, BlockType blockType) {
this.success = success;
this.blockedResourceId = resourceId;
this.blockType = blockType;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getBlockedResourceId() {
return blockedResourceId;
}
public void setBlockedResourceId(String blockedResourceId) {
this.blockedResourceId = blockedResourceId;
}
public BlockType getBlockType() {
return blockType;
}
public void setBlockType(BlockType blockType) {
this.blockType = blockType;
}
}

View File

@@ -0,0 +1,74 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
/**
*
* @author Francis Dong
*
*/
public class ResourceConfig {
/**
* Resouce ID
*/
private String resourceId;
/**
* Maximum concurrent request, zero or negative for no limit
*/
private long maxCon;
/**
* Maximum QPS, zero or negative for no limit
*/
private long maxQPS;
public ResourceConfig(String resourceId, long maxCon, long maxQPS) {
this.resourceId = resourceId;
this.maxCon = maxCon;
this.maxQPS = maxQPS;
}
public ResourceConfig() {
}
public String getResourceId() {
return resourceId;
}
public void setResourceId(String resourceId) {
this.resourceId = resourceId;
}
public long getMaxCon() {
return maxCon;
}
public void setMaxCon(long maxCon) {
this.maxCon = maxCon;
}
public long getMaxQPS() {
return maxQPS;
}
public void setMaxQPS(long maxQPS) {
this.maxQPS = maxQPS;
}
}

View File

@@ -0,0 +1,269 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author Francis Dong
*
*/
public class ResourceStat {
private static final Logger log = LoggerFactory.getLogger(ResourceStat.class);
/**
* Resource ID
*/
private String resourceId;
/**
* Request count of time slot, the beginning timestamp(timeId) as key
*/
private ConcurrentMap<Long, TimeSlot> timeSlots = new ConcurrentHashMap<>(100);
/**
* Concurrent requests
*/
private AtomicLong concurrentRequests = new AtomicLong(0);
private ReentrantReadWriteLock rwl1 = new ReentrantReadWriteLock();
private ReentrantReadWriteLock rwl2 = new ReentrantReadWriteLock();
private Lock w1 = rwl1.writeLock();
private Lock w2 = rwl2.writeLock();
public ResourceStat(String resourceId) {
this.resourceId = resourceId;
}
/**
* Returns Time slot of the specified time slot ID
*
* @param timeSlotId
* @return
*/
public TimeSlot getTimeSlot(long timeSlotId) {
if (timeSlots.containsKey(timeSlotId)) {
return timeSlots.get(timeSlotId);
} else {
TimeSlot timeSlot = new TimeSlot(timeSlotId);
timeSlot.setPeakConcurrentReqeusts(this.concurrentRequests.get());
TimeSlot old = timeSlots.putIfAbsent(timeSlotId, timeSlot);
if (old != null) {
return old;
} else {
return timeSlot;
}
}
}
/**
* Increase concurrent request counter of the resource
*
* @param timeSlotId
* @param maxCon
* @return false if exceed the maximum concurrent request of the specified
* resource
*/
public boolean incrConcurrentRequest(long timeSlotId, Long maxCon) {
w1.lock();
try {
boolean isExceeded = false;
if (maxCon != null && maxCon.intValue() > 0) {
long n = this.concurrentRequests.get();
if (n >= maxCon.longValue()) {
isExceeded = true;
this.incrBlockRequestToTimeSlot(timeSlotId);
} else {
long conns = this.concurrentRequests.incrementAndGet();
this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
}
} else {
long conns = this.concurrentRequests.incrementAndGet();
this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
}
return !isExceeded;
} finally {
w1.unlock();
}
}
/**
* Decrease concurrent request counter of the resource
*
*/
public void decrConcurrentRequest(long timeSlotId) {
long conns = this.concurrentRequests.decrementAndGet();
this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
}
/**
* Increase block request to the specified time slot
*
*/
public void incrBlockRequestToTimeSlot(long timeSlotId) {
this.getTimeSlot(timeSlotId).getBlockRequests().incrementAndGet();
}
/**
* Add request to the specified time slot
*
* @param timeSlotId
* @return false if exceed the maximum RPS of the specified resource
*/
public boolean incrRequestToTimeSlot(long timeSlotId, Long maxRPS) {
w2.lock();
try {
boolean isExceeded = false;
if (maxRPS != null && maxRPS.intValue() > 0) {
// TimeWindowStat timeWindowStat = this.getCurrentTimeWindowStat(resourceId, curTimeSlotId);
// if (new BigDecimal(maxRPS).compareTo(timeWindowStat.getRps()) <= 0) {
// isExceeded = true;
// resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
// }
// time slot unit is one second
long total = this.getTimeSlot(timeSlotId).getCounter().get();
long max = Long.valueOf(maxRPS);
if (total >= max) {
isExceeded = true;
this.incrBlockRequestToTimeSlot(timeSlotId);
this.decrConcurrentRequest(timeSlotId);
} else {
this.getTimeSlot(timeSlotId).incr();
}
} else {
this.getTimeSlot(timeSlotId).incr();
}
return !isExceeded;
} finally {
w2.unlock();
}
}
/**
* Add request RT to the specified time slot
*
* @param timeSlotId
* @param rt response time of the request
* @param isSuccess Whether the request is success or not
* @return
*/
public void addRequestRT(long timeSlotId, long rt, boolean isSuccess) {
this.getTimeSlot(timeSlotId).addRequestRT(rt, isSuccess);
}
/**
* Returns statistic of the specified time window
*
* @param startSlotId
* @param endSlotId
* @return
*/
public TimeWindowStat getTimeWindowStat(long startSlotId, long endSlotId) {
TimeWindowStat tws = new TimeWindowStat();
tws.setStartTime(startSlotId);
tws.setEndTime(endSlotId);
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
long totalReqs = 0;
long totalRt = 0;
long peakConcurrences = 0;
long errors = 0;
long blockReqs = 0;
long compReqs = 0;
for (long i = startSlotId; i < endSlotId;) {
if (timeSlots.containsKey(i)) {
TimeSlot timeSlot = timeSlots.get(i);
min = timeSlot.getMin() < min ? timeSlot.getMin() : min;
max = timeSlot.getMax() > max ? timeSlot.getMax() : max;
peakConcurrences = timeSlot.getPeakConcurrentReqeusts() > peakConcurrences
? timeSlot.getPeakConcurrentReqeusts()
: peakConcurrences;
totalReqs = totalReqs + timeSlot.getCounter().get();
totalRt = totalRt + timeSlot.getTotalRt().get();
errors = errors + timeSlot.getErrors().get();
blockReqs = blockReqs + timeSlot.getBlockRequests().get();
compReqs = compReqs + timeSlot.getCompReqs().get();
}
i = i + FlowStat.INTERVAL;
}
tws.setMin(min == Long.MAX_VALUE ? null : min);
tws.setMax(max == Long.MIN_VALUE ? null : max);
tws.setPeakConcurrentReqeusts(peakConcurrences);
tws.setTotal(totalReqs);
tws.setErrors(errors);
tws.setBlockRequests(blockReqs);
tws.setCompReqs(compReqs);
if (compReqs > 0) {
tws.setAvgRt(totalRt / compReqs);
}
if (totalReqs > 0) {
BigDecimal nsec = new BigDecimal(endSlotId - startSlotId).divide(new BigDecimal(1000), 5,
BigDecimal.ROUND_HALF_UP);
BigDecimal rps = new BigDecimal(totalReqs).divide(nsec, 5, BigDecimal.ROUND_HALF_UP);
if (rps.compareTo(new BigDecimal(10)) >= 0) {
rps = rps.setScale(0, BigDecimal.ROUND_HALF_UP).stripTrailingZeros();
} else {
rps = rps.setScale(2, BigDecimal.ROUND_HALF_UP).stripTrailingZeros();
}
tws.setRps(rps);
}
return tws;
}
public String getResourceId() {
return resourceId;
}
public void setResourceId(String resourceId) {
this.resourceId = resourceId;
}
public ConcurrentMap<Long, TimeSlot> getTimeSlots() {
return timeSlots;
}
public void setTimeSlots(ConcurrentMap<Long, TimeSlot> timeSlots) {
this.timeSlots = timeSlots;
}
public AtomicLong getConcurrentRequests() {
return concurrentRequests;
}
public void setConcurrentRequests(AtomicLong concurrentRequests) {
this.concurrentRequests = concurrentRequests;
}
}

View File

@@ -0,0 +1,56 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
import java.util.ArrayList;
import java.util.List;
/**
*
* @author Francis Dong
*
*/
public class ResourceTimeWindowStat {
/**
* Resource ID
*/
private String resourceId;
private List<TimeWindowStat> windows = new ArrayList<>();
public ResourceTimeWindowStat(String resourceId) {
this.resourceId = resourceId;
}
public String getResourceId() {
return resourceId;
}
public void setResourceId(String resourceId) {
this.resourceId = resourceId;
}
public List<TimeWindowStat> getWindows() {
return windows;
}
public void setWindows(List<TimeWindowStat> windows) {
this.windows = windows;
}
}

View File

@@ -0,0 +1,184 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* @author Francis Dong
*
*/
public class TimeSlot {
/**
* Time slot start timestamp as ID
*/
private long id;
/**
* Request counter
*/
private AtomicLong counter = new AtomicLong();
/**
* Error request counter
*/
private AtomicLong errors = new AtomicLong();
/**
* Minimum response time
*/
private long min = Long.MAX_VALUE;
/**
* Maximum response time
*/
private long max = Long.MIN_VALUE;
/**
* Total response time
*/
private AtomicLong totalRt = new AtomicLong(0);
/**
* Completed Request counter
*/
private AtomicLong compReqs = new AtomicLong();
/**
* Peak concurrent requests
*/
private long peakConcurrentReqeusts;
/**
* Block requests <br/>
*/
private AtomicLong blockRequests = new AtomicLong(0);
public TimeSlot(long id) {
this.id = id;
}
public long getId() {
return id;
}
/**
* Add request to time slot
*
*/
public void incr() {
counter.incrementAndGet();
}
/**
* Add request RT information to time slot
*
* @param rt
* @param isSuccess Whether the request is success or not
*/
public synchronized void addRequestRT(long rt, boolean isSuccess) {
totalRt.addAndGet(rt);
compReqs.incrementAndGet();
if (!isSuccess) {
errors.incrementAndGet();
}
min = rt < min ? rt : min;
max = rt > max ? rt : max;
}
/**
* Update peak concurrent requests of this time slot
*
* @param concurrentRequests Current concurrent requests
*/
public synchronized void updatePeakConcurrentReqeusts(long concurrentRequests) {
peakConcurrentReqeusts = concurrentRequests > peakConcurrentReqeusts ? concurrentRequests
: peakConcurrentReqeusts;
}
public void setId(long id) {
this.id = id;
}
public AtomicLong getCounter() {
return counter;
}
public void setCounter(AtomicLong counter) {
this.counter = counter;
}
public long getMin() {
return min;
}
public void setMin(long min) {
this.min = min;
}
public long getMax() {
return max;
}
public void setMax(long max) {
this.max = max;
}
public AtomicLong getTotalRt() {
return totalRt;
}
public void setTotalRt(AtomicLong totalRt) {
this.totalRt = totalRt;
}
public long getPeakConcurrentReqeusts() {
return peakConcurrentReqeusts;
}
public void setPeakConcurrentReqeusts(long peakConcurrentReqeusts) {
this.peakConcurrentReqeusts = peakConcurrentReqeusts;
}
public AtomicLong getErrors() {
return errors;
}
public void setErrors(AtomicLong errors) {
this.errors = errors;
}
public AtomicLong getBlockRequests() {
return blockRequests;
}
public void setBlockRequests(AtomicLong blockRequests) {
this.blockRequests = blockRequests;
}
public AtomicLong getCompReqs() {
return compReqs;
}
public void setCompReqs(AtomicLong compReqs) {
this.compReqs = compReqs;
}
}

View File

@@ -0,0 +1,171 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
import java.math.BigDecimal;
/**
*
* @author Francis Dong
*
*/
public class TimeWindowStat {
/**
* Start time of time window[startTime,endTime)
*/
private Long startTime;
/**
* End time of time window, [startTime,endTime)
*/
private Long endTime;
/**
* Minimum response time
*/
private Long min;
/**
* Maximum response time
*/
private Long max;
/**
* Average response time
*/
private Long avgRt;
/**
* Total requests
*/
private Long total;
/**
* Completed requests
*/
private Long compReqs;
/**
* Total error requests
*/
private Long errors;
/**
* the average RPS(Requests Per Second) of time window
*/
private BigDecimal rps;
/**
* Peak concurrent requests of the time window
*/
private Long peakConcurrentReqeusts;
/**
* Block requests
*/
private Long blockRequests;
public Long getBlockRequests() {
return blockRequests;
}
public void setBlockRequests(Long blockRequests) {
this.blockRequests = blockRequests;
}
public Long getPeakConcurrentReqeusts() {
return peakConcurrentReqeusts;
}
public void setPeakConcurrentReqeusts(Long peakConcurrentReqeusts) {
this.peakConcurrentReqeusts = peakConcurrentReqeusts;
}
public Long getErrors() {
return errors;
}
public void setErrors(Long errors) {
this.errors = errors;
}
public Long getMin() {
return min;
}
public void setMin(Long min) {
this.min = min;
}
public Long getMax() {
return max;
}
public void setMax(Long max) {
this.max = max;
}
public BigDecimal getRps() {
return rps;
}
public void setRps(BigDecimal rps) {
this.rps = rps;
}
public Long getAvgRt() {
return avgRt;
}
public void setAvgRt(Long avgRt) {
this.avgRt = avgRt;
}
public Long getTotal() {
return total;
}
public void setTotal(Long total) {
this.total = total;
}
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
public Long getEndTime() {
return endTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
public Long getCompReqs() {
return compReqs;
}
public void setCompReqs(Long compReqs) {
this.compReqs = compReqs;
}
}

View File

@@ -0,0 +1,79 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats.ratelimit;
import we.util.JacksonUtils;
/**
* @author hongqiaowei
*/
public class ResourceRateLimitConfig {
public static interface Type {
static final byte GLOBAL = 1;
static final byte SERVICE_DEFAULT = 2;
static final byte SERVICE = 3;
static final byte API = 4;
}
public static final int DELETED = 1;
public static final String GLOBAL = "_global";
public static final String SERVICE_DEFAULT = "service_default";
private static final int ENABLE = 1;
private static final int UNABLE = 0;
public int isDeleted = 0;
public int id;
private boolean enable = true;
public String resource;
public byte type;
public long qps;
public long concurrents;
public String responseType;
public String responseContent;
public boolean isEnable() {
return enable;
}
public void setEnable(int v) {
if (v == ENABLE) {
enable = true;
} else {
enable = false;
}
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
}
}

View File

@@ -0,0 +1,167 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats.ratelimit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @author hongqiaowei
*/
@Service
public class ResourceRateLimitConfigService {
private static final Logger log = LoggerFactory.getLogger(ResourceRateLimitConfigService.class);
private static final String fizzRateLimit = "fizz_rate_limit";
private static final String fizzRateLimitChannel = "fizz_rate_limit_channel";
private Map<String, ResourceRateLimitConfig> resourceRateLimitConfigMap = new HashMap<>(32);
private Map<Integer, ResourceRateLimitConfig> oldResourceRateLimitConfigMap = new HashMap<>(32);
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt;
@PostConstruct
public void init() throws Throwable {
final Throwable[] throwable = new Throwable[1];
Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzRateLimit)
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> {
log.info(null, t);
})
.concatMap(e -> {
Object k = e.getKey();
if (k == ReactorUtils.OBJ) {
return Flux.just(e);
}
Object v = e.getValue();
log.info("rateLimitConfig: " + v.toString(), LogService.BIZ_ID, k.toString());
String json = (String) v;
try {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
oldResourceRateLimitConfigMap.put(rrlc.id, rrlc);
updateResourceRateLimitConfigMap(rrlc);
return Flux.just(e);
} catch (Throwable t) {
throwable[0] = t;
log.info(json, t);
return Flux.error(t);
}
}).blockLast())).flatMap(
e -> {
if (throwable[0] != null) {
return Mono.error(throwable[0]);
}
return lsnResourceRateLimitConfigChange();
}
).block();
if (error != ReactorUtils.EMPTY_THROWABLE) {
throw error;
}
}
private Mono<Throwable> lsnResourceRateLimitConfigChange() {
final Throwable[] throwable = new Throwable[1];
final boolean[] b = {false};
rt.listenToChannel(fizzRateLimitChannel).doOnError(t -> {
throwable[0] = t;
b[0] = false;
log.error("lsn " + fizzRateLimitChannel, t);
}).doOnSubscribe(
s -> {
b[0] = true;
log.info("success to lsn on " + fizzRateLimitChannel);
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis());
try {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
ResourceRateLimitConfig r = oldResourceRateLimitConfigMap.remove(rrlc.id);
if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED && r != null) {
resourceRateLimitConfigMap.remove(r.resource);
}
updateResourceRateLimitConfigMap(rrlc);
if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED) {
oldResourceRateLimitConfigMap.put(rrlc.id, rrlc);
}
} catch (Throwable t) {
log.info(json, t);
}
}).subscribe();
Throwable t = throwable[0];
while (!b[0]) {
if (t != null) {
return Mono.error(t);
} else {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
return Mono.error(e);
}
}
}
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
}
private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc) {
if (rrlc.isDeleted == ResourceRateLimitConfig.DELETED) {
ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.resource);
log.info("remove " + removedRrlc);
} else {
ResourceRateLimitConfig existRrlc = resourceRateLimitConfigMap.get(rrlc.resource);
resourceRateLimitConfigMap.put(rrlc.resource, rrlc);
if (existRrlc == null) {
log.info("add " + rrlc);
} else {
log.info("update " + existRrlc + " with " + rrlc);
}
}
}
public void setReactiveStringRedisTemplate(ReactiveStringRedisTemplate rt) {
this.rt = rt;
}
public ResourceRateLimitConfig getResourceRateLimitConfig(String resource) {
return resourceRateLimitConfigMap.get(resource);
}
public Map<String, ResourceRateLimitConfig> getResourceRateLimitConfigMap() {
return resourceRateLimitConfigMap;
}
}

View File

@@ -18,6 +18,7 @@
package we.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Consts;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -83,4 +84,13 @@ public abstract class Utils {
ca[0] += 32;
return String.valueOf(ca);
}
public static void threadCurrentStack2stringBuilder(StringBuilder b) {
StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace();
if (stackTraces != null) {
for (int i = 0; i < stackTraces.length; i++) {
b.append(stackTraces[i]).append(Constants.Symbol.LF);
}
}
}
}

View File

@@ -0,0 +1,54 @@
package we.redis;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
/**
* @author hongqiaowei
*/
@TestConfiguration
public class RedisProperties {
private String host;
private int port;
private int database;
public RedisProperties(
@Value("${embeded.redis.port}") int port,
@Value("${embeded.redis.host}") String host,
@Value("${embeded.redis.database}") int database) {
this.port = port;
this.host = host;
this.database = database;
}
public int getPort() {
return port;
}
public void setPort(int redisPort) {
this.port = redisPort;
}
public String getHost() {
return host;
}
public void setHost(String redisHost) {
this.host = redisHost;
}
public int getDatabase() {
return database;
}
public void setDatabase(int database) {
this.database = database;
}
@Override
public String toString() {
return "redis:[host:" + host + ",port:" + port + ",database:" + database + "]";
}
}

View File

@@ -0,0 +1,34 @@
package we.redis;
import org.springframework.boot.test.context.TestConfiguration;
import redis.embedded.RedisServer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* @author hongqiaowei
*/
@TestConfiguration
public class RedisServerConfiguration {
private RedisServer redisServer;
public RedisServerConfiguration(RedisProperties redisProperties) {
redisServer = RedisServer.builder()
.port(redisProperties.getPort())
.setting("maxmemory 32M")
.build();
}
@PostConstruct
public void postConstruct() {
redisServer.start();
}
@PreDestroy
public void preDestroy() {
redisServer.stop();
}
}

View File

@@ -0,0 +1,40 @@
package we.redis;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
/**
* @author hongqiaowei
*/
@TestConfiguration
// @EnableRedisRepositories
public class RedisTemplateConfiguration {
@Bean
public LettuceConnectionFactory redisConnectionFactory(
RedisProperties redisProperties) {
LettuceConnectionFactory cf = new LettuceConnectionFactory(
redisProperties.getHost(),
redisProperties.getPort());
cf.setDatabase(redisProperties.getDatabase());
return cf;
}
@Bean
public StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory connectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(connectionFactory);
return template;
}
@Bean
public ReactiveStringRedisTemplate reactiveRedisTemplate(LettuceConnectionFactory factory) {
return new ReactiveStringRedisTemplate(factory);
}
}

View File

@@ -0,0 +1,540 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import we.util.JacksonUtils;
/**
*
* @author Francis Dong
*
*/
public class FlowStatTests {
private FlowStat stat = new FlowStat();
class FlowRuleCase {
public int threads = 10;
public int requests = 10000;
public int totalReqs = threads * requests;
public List<ResourceConfig> resourceConfigs = new ArrayList<>();
public List<ResourceExpect> resourceExpects = new ArrayList<>();
public IncrRequestResult expectResult;
}
class ResourceExpect {
public long concurrents;
public long QPS;
public long total;
public long blockedReqs;
public ResourceExpect(long concurrents, long QPS, long total, long blockedReqs) {
this.concurrents = concurrents;
this.QPS = QPS;
this.total = total;
this.blockedReqs = blockedReqs;
}
public ResourceExpect() {
}
}
public List<FlowRuleCase> createFlowRuleCase() {
List<FlowRuleCase> cases = new ArrayList<>();
// blocked by service concurrent request
FlowRuleCase c1 = new FlowRuleCase();
c1.resourceConfigs.add(new ResourceConfig("_global1", 100, 200));
c1.resourceConfigs.add(new ResourceConfig("service1", 10, 200));
c1.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
c1.resourceExpects.add(new ResourceExpect(10, 10, 10, c1.totalReqs - 10));
c1.expectResult = IncrRequestResult.block("service1", BlockType.CONCURRENT_REQUEST);
cases.add(c1);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c2 = new FlowRuleCase();
c2.resourceConfigs.add(new ResourceConfig("_global2", 10, 200));
c2.resourceConfigs.add(new ResourceConfig("service2", 200, 200));
c2.resourceExpects.add(new ResourceExpect(10, 10, 10, c2.totalReqs - 10));
c2.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
c2.expectResult = IncrRequestResult.block("_global2", BlockType.CONCURRENT_REQUEST);
cases.add(c2);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c3 = new FlowRuleCase();
c3.resourceConfigs.add(new ResourceConfig("_global3", 200, 10));
c3.resourceConfigs.add(new ResourceConfig("service3", 200, 100));
c3.resourceExpects.add(new ResourceExpect(10, 10, 10, c3.totalReqs - 10));
c3.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
c3.expectResult = IncrRequestResult.block("_global3", BlockType.QPS);
cases.add(c3);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c4 = new FlowRuleCase();
c4.resourceConfigs.add(new ResourceConfig("_global4", 200, 100));
c4.resourceConfigs.add(new ResourceConfig("service4", 200, 10));
c4.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
c4.resourceExpects.add(new ResourceExpect(10, 10, 10, c4.totalReqs - 10));
c4.expectResult = IncrRequestResult.block("service4", BlockType.QPS);
cases.add(c4);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c5 = new FlowRuleCase();
c5.resourceConfigs.add(new ResourceConfig("_global5", 0, 0));
c5.resourceConfigs.add(new ResourceConfig("service5", 0, 0));
c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
c5.expectResult = IncrRequestResult.success();
cases.add(c5);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c6 = new FlowRuleCase();
c6.resourceConfigs.add(new ResourceConfig("_global6", 20, 0));
c6.resourceConfigs.add(new ResourceConfig("service6", 20, 0));
c6.resourceExpects.add(new ResourceExpect(20, 20, 20, c6.totalReqs - 20));
c6.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
c6.expectResult = IncrRequestResult.block("_global6", BlockType.CONCURRENT_REQUEST);
cases.add(c6);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c7 = new FlowRuleCase();
c7.resourceConfigs.add(new ResourceConfig("_global7", 0, 0));
c7.resourceConfigs.add(new ResourceConfig("service7", 0, 20));
c7.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
c7.resourceExpects.add(new ResourceExpect(20, 20, 20, c7.totalReqs - 20));
c7.expectResult = IncrRequestResult.block("service7", BlockType.QPS);
cases.add(c7);
return cases;
}
class ConcurrentJob1 implements Runnable {
public ConcurrentJob1(int requests, long curTimeSlotId, List<ResourceConfig> resourceConfigs,
IncrRequestResult expectResult) {
this.requests = requests;
this.resourceConfigs = resourceConfigs;
this.curTimeSlotId = curTimeSlotId;
this.expectResult = expectResult;
}
private int requests = 0;
private List<ResourceConfig> resourceConfigs;
private long curTimeSlotId = 0;
private IncrRequestResult expectResult;
@Override
public void run() {
for (int i = 0; i < requests; i++) {
IncrRequestResult result = stat.incrRequest(resourceConfigs, curTimeSlotId);
if (result != null && !result.isSuccess()) {
assertEquals(expectResult.getBlockedResourceId(), result.getBlockedResourceId());
assertEquals(expectResult.getBlockType(), result.getBlockType());
}
}
}
}
@Test
public void testIncrRequestResultByResourceChain() throws Throwable {
// concurrent
FlowRuleCase c1 = new FlowRuleCase();
c1.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_global1", 100, 200));
c1.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_service1", 10, 200));
long startTimeSlotId = stat.currentTimeSlotId();
long endTimeSlotId = startTimeSlotId + 1000;
for (int i = 0; i < 10; i++) {
stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
}
IncrRequestResult result = stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
assertTrue(!result.isSuccess());
assertEquals("testIncrRequestResultByResourceChain_service1", result.getBlockedResourceId());
assertEquals(BlockType.CONCURRENT_REQUEST, result.getBlockType());
stat.addRequestRT(c1.resourceConfigs, startTimeSlotId, 1, true);
result = stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
assertTrue(result.isSuccess());
// QPS
FlowRuleCase c2 = new FlowRuleCase();
c2.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_global2", 100, 200));
c2.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_service2", 100, 10));
for (int i = 0; i < 10; i++) {
stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
}
result = stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
assertTrue(!result.isSuccess());
assertEquals("testIncrRequestResultByResourceChain_service2", result.getBlockedResourceId());
assertEquals(BlockType.QPS, result.getBlockType());
stat.addRequestRT(c2.resourceConfigs, startTimeSlotId, 1, true);
result = stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
assertTrue(!result.isSuccess());
assertEquals("testIncrRequestResultByResourceChain_service2", result.getBlockedResourceId());
assertEquals(BlockType.QPS, result.getBlockType());
}
@Test
public void testIncrRequestByResourceChain() throws Throwable {
// create data
List<FlowRuleCase> cases = createFlowRuleCase();
long startTimeSlotId = stat.currentTimeSlotId();
long endTimeSlotId = startTimeSlotId + 1000;
for (FlowRuleCase c : cases) {
ExecutorService pool = Executors.newFixedThreadPool(c.threads);
long t1 = System.currentTimeMillis();
for (int i = 0; i < c.threads; i++) {
pool.submit(new ConcurrentJob1(c.requests, startTimeSlotId, c.resourceConfigs, c.expectResult));
}
pool.shutdown();
if (pool.awaitTermination(5, TimeUnit.SECONDS)) {
long t2 = System.currentTimeMillis();
System.out.println("testIncrRequestByResourceChain elapsed time: " + (t2 - t1) + "ms for " + c.totalReqs
+ " requests");
for (int i = 0; i < c.resourceConfigs.size(); i++) {
ResourceConfig cfg = c.resourceConfigs.get(i);
ResourceExpect expect = c.resourceExpects.get(i);
TimeWindowStat tws = stat.getTimeWindowStat(cfg.getResourceId(), startTimeSlotId, endTimeSlotId);
assertEquals(expect.concurrents, tws.getPeakConcurrentReqeusts());
assertEquals(expect.QPS, tws.getTotal());
assertEquals(expect.total, tws.getTotal());
assertEquals(expect.blockedReqs, tws.getBlockRequests());
}
} else {
System.out.println("testIncrRequestByResourceChain timeout");
}
startTimeSlotId = startTimeSlotId + 1000;
endTimeSlotId = endTimeSlotId + 1000;
}
}
@Test
public void testPeakConcurrentJob() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long nextSlotId = curTimeSlotId + 1000;
String resourceId = "PeakConcurrentJob";
stat.incrRequest(resourceId, curTimeSlotId, null, null);
Thread.sleep(1200);
TimeWindowStat tws = stat.getPreviousSecondStat(resourceId, nextSlotId + 1000);
assertEquals(1, tws.getPeakConcurrentReqeusts());
}
@Test
public void testIncr() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long slotId = curTimeSlotId + 1000;
String resourceId = "a";
stat.incrRequest(resourceId, curTimeSlotId, null, null);
TimeWindowStat tws = stat.getPreviousSecondStat(resourceId, slotId);
assertEquals(1, tws.getTotal());
stat.incrRequest(resourceId, curTimeSlotId, null, null);
stat.addRequestRT(resourceId, curTimeSlotId, 100, false);
stat.addRequestRT(resourceId, curTimeSlotId, 300, true);
tws = stat.getPreviousSecondStat(resourceId, slotId);
assertEquals(2, tws.getTotal());
assertEquals(200, tws.getAvgRt());
assertEquals(100, tws.getMin());
assertEquals(300, tws.getMax());
assertEquals(2, tws.getRps().intValue());
assertEquals(1, tws.getErrors());
stat.decrConcurrentRequest(resourceId, curTimeSlotId);
Long con = stat.getConcurrentRequests(resourceId);
assertEquals(1, con);
// System.out.println(JacksonUtils.writeValueAsString(stat.resourceStats));
}
@Test
public void testIncrRequest() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long nextSlotId = curTimeSlotId + 1000;
String resourceId = "b";
Long maxCon = 10l;
Long maxRPS = 20l;
stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
long peakCon = tws.getPeakConcurrentReqeusts();
assertEquals(1l, peakCon);
}
@Test
public void testBlockedByMaxCon() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long nextSlotId = curTimeSlotId + 1000;
Long maxCon = 10l;
Long maxRPS = 20l;
int threads = 100;
int requests = 10000;
int totalRequests = threads * requests;
String resourceId = "c";
ExecutorService pool = Executors.newFixedThreadPool(threads);
long t1 = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS));
}
pool.shutdown();
if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
long t2 = System.currentTimeMillis();
TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
assertEquals(maxCon, tws.getPeakConcurrentReqeusts());
assertEquals(totalRequests - maxCon, tws.getBlockRequests());
System.out.println("testBlockedByMaxCon total elapsed time for " + threads * requests + " requests"
+ (t2 - t1) + "ms");
} else {
System.out.println("testIncrConcurrentRequest timeout");
}
}
@Test
public void testBlockedByMaxRPS() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long nextSlotId = curTimeSlotId + 1000;
Long maxCon = Long.MAX_VALUE;
Long maxRPS = 20l;
int threads = 100;
int requests = 10000;
int totalRequests = threads * requests;
String resourceId = "c";
for (int i = 0; i < maxRPS; i++) {
stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
}
ExecutorService pool = Executors.newFixedThreadPool(threads);
long t1 = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS));
}
pool.shutdown();
if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
long t2 = System.currentTimeMillis();
TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
assertEquals(maxRPS, tws.getRps().intValue());
assertEquals(totalRequests, tws.getBlockRequests());
System.out.println("testIncrConcurrentRequest total elapsed time for " + threads * requests + " requests"
+ (t2 - t1) + "ms");
} else {
System.out.println("testIncrConcurrentRequest timeout");
}
}
@Test
public void testStat() throws Throwable {
// requests per slot per resource
int requests = 100;
int threads = 10;
int resources = 10;
int slots = 100;
long rt = 100;
long t1 = System.currentTimeMillis();
long start = (t1 / FlowStat.INTERVAL) * FlowStat.INTERVAL;
int totalRequests = requests * threads * resources * slots;
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threads; i++) {
pool.submit(new Job(requests, resources, slots, start, rt));
}
pool.shutdown();
if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
long t2 = System.currentTimeMillis();
long end = start + slots * FlowStat.INTERVAL;
long nsecs = (end - start) / 1000;
System.out.println("total requests" + totalRequests);
System.out.println("total elapsed time" + (t2 - t1) + "ms");
System.out.println("Testing Time Window" + (end - start) + "ms");
int resource1 = 1;
int resource2 = 2;
int rtBase1 = 1;
int rtBase3 = 3;
TimeWindowStat tws1 = stat.getTimeWindowStat("resource-" + resource1, start, end);
TimeWindowStat tws2 = stat.getTimeWindowStat("resource-" + resource2, start, end);
assertEquals(totalRequests / resources, tws1.getTotal());
assertEquals(rt * rtBase1, tws1.getAvgRt());
assertEquals(rt * rtBase1, tws1.getMin());
assertEquals(rt * rtBase1, tws1.getMax());
assertEquals(totalRequests / resources / nsecs, tws1.getRps().intValue());
assertEquals(totalRequests / resources / 10, tws1.getErrors().intValue());
System.out.println("RPS of resource1: " + tws1.getRps().intValue());
assertEquals(totalRequests / resources, tws2.getTotal());
assertEquals(rt * rtBase3, tws2.getAvgRt());
assertEquals(rt * rtBase3, tws2.getMin());
assertEquals(rt * rtBase3, tws2.getMax());
assertEquals(totalRequests / resources / nsecs, tws2.getRps().intValue());
assertEquals(totalRequests / resources / 10, tws2.getErrors().intValue());
System.out.println("RPS of resource2: " + tws2.getRps().intValue());
// performance of getTimeWindowStat
for (int n = 0; n < 10; n++) {
long t3 = System.currentTimeMillis();
int times = 100000;
for (int i = 0; i < times; i++) {
stat.getTimeWindowStat("resource-" + resource1, start, end);
}
long t4 = System.currentTimeMillis();
System.out.println("performance of getTimeWindowStat: " + (t4 - t3) + "ms " + times + " times");
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
// System.out.println(JacksonUtils.writeValueAsString(stat.resourceStats));
List<ResourceTimeWindowStat> list = stat.getResourceTimeWindowStats("resource-" + 1, start, end + 3 * 1000,
10);
assertEquals(nsecs / 10, list.get(0).getWindows().size());
System.out.println(JacksonUtils.writeValueAsString(list));
} else {
System.out.println("timeout");
}
}
class Job implements Runnable {
public Job(int requests, int resources, int slots, long startSlotId, long rt) {
this.requests = requests;
this.resources = resources;
this.slots = slots;
this.startSlotId = startSlotId;
this.rt = rt;
}
private int requests = 0;
private int resources = 0;
private int slots = 0;
private long startSlotId = 0;
private long rt = 0;
@Override
public void run() {
for (int m = 0; m < slots; m++) {
for (int i = 0; i < requests; i++) {
for (int j = 0; j < resources; j++) {
stat.incrRequest("resource-" + j, startSlotId + (m * FlowStat.INTERVAL), null, null);
// 10% error
boolean isSuccess = i % 10 == 1 ? false : true;
// rt will be triple while even
stat.addRequestRT("resource-" + j, startSlotId + (m * FlowStat.INTERVAL),
rt * (j % 2 == 0 ? 3 : 1), isSuccess);
}
try {
// Thread.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
class ConcurrentJob implements Runnable {
public ConcurrentJob(int requests, long curTimeSlotId, String resourceId, Long maxCon, Long maxRPS) {
this.requests = requests;
this.resourceId = resourceId;
this.maxRPS = maxRPS;
this.maxCon = maxCon;
this.curTimeSlotId = curTimeSlotId;
}
private int requests = 0;
private String resourceId;
private Long maxCon = 0l;
private Long maxRPS = 0l;
private long curTimeSlotId = 0;
@Override
public void run() {
for (int i = 0; i < requests; i++) {
stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
}
}
}
@Test
public void testGetResourceStat() throws Throwable {
int threads = 10;
int requests = 100000;
ExecutorService pool = Executors.newFixedThreadPool(threads);
long t1 = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(new GetResourceStatJob(requests));
}
pool.shutdown();
if (pool.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("testGetResourceStat done");
} else {
System.out.println("testGetResourceStat timeout");
}
}
class GetResourceStatJob implements Runnable {
public GetResourceStatJob(int requests) {
this.requests = requests;
}
private int requests = 0;
@Override
public void run() {
for (int i = 0; i < requests; i++) {
try {
ResourceStat rs = stat.getResourceStat("" + i);
assertNotNull(rs);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

View File

@@ -0,0 +1,165 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats.ratelimit;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.junit.jupiter.api.Test;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
import we.util.Constants;
import we.util.DateTimeUtils;
import we.util.JacksonUtils;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author hongqiaowei
*/
public class RateLimitTests {
private FlowStat flowStat = new FlowStat();
private ConnectionProvider getConnectionProvider() {
return ConnectionProvider
.builder("flow-control-cp")
.maxConnections(500)
.pendingAcquireTimeout(Duration.ofMillis(6_000))
.maxIdleTime(Duration.ofMillis(40_000))
.build();
}
private LoopResources getLoopResources() {
LoopResources lr = LoopResources.create("flow-control-el", Runtime.getRuntime().availableProcessors(), true);
lr.onServer(false);
return lr;
}
private ReactorResourceFactory reactorResourceFactory() {
ReactorResourceFactory fact = new ReactorResourceFactory();
fact.setUseGlobalResources(false);
fact.setConnectionProvider(getConnectionProvider());
fact.setLoopResources(getLoopResources());
fact.afterPropertiesSet();
return fact;
}
private WebClient getWebClient() {
ConnectionProvider cp = getConnectionProvider();
LoopResources lr = getLoopResources();
HttpClient httpClient = HttpClient.create(cp).compress(false).tcpConfiguration(
tcpClient -> {
return tcpClient.runOn(lr, false)
.doOnConnected(
connection -> {
connection.addHandlerLast(new ReadTimeoutHandler( 20_000, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler( 20_000, TimeUnit.MILLISECONDS));
}
)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 20_000)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
}
);
return WebClient.builder().exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)).build())
.clientConnector(new ReactorClientHttpConnector(httpClient)).build();
}
@Test
public void flowControlTests() throws InterruptedException {
WebClient webClient = getWebClient();
for (int i = 0; i < 0; i++) {
// String uri = "http://12.5.3.8:8600/proxy/fizz" + i + "/ftrol/mock";
String uri = "http://12.5.3.8:8600/proxy/fizz/ftrol/mock" + i;
System.err.println(i);
webClient
.method(HttpMethod.GET)
.uri(uri)
.headers(hdrs -> {})
.body(Mono.just(""), String.class)
.exchange().name("")
.doOnRequest(l -> {})
.doOnSuccess(r -> {})
.doOnError(t -> {
t.printStackTrace();
})
.timeout(Duration.ofMillis(16_000))
.flatMap(
remoteResp -> {
remoteResp.bodyToMono(String.class)
.doOnSuccess(
s -> {
// System.out.println(s);
}
);
return Mono.empty();
}
)
.subscribe()
;
}
// Thread.currentThread().join();
}
@Test
public void test() {
FlowStat flowStat = new FlowStat();
long incrTime = DateTimeUtils.toMillis("2021-01-08 21:28:42.000", Constants.DatetimePattern.DP23);
boolean success = flowStat.incrRequest("resourceX", incrTime, Long.MAX_VALUE, Long.MAX_VALUE);
System.err.println("incrTime: " + incrTime + ", success: " + success);
long startTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:41.000", Constants.DatetimePattern.DP23);
long endTimeSlot = DateTimeUtils.toMillis("2021-01-08 21:28:44.000", Constants.DatetimePattern.DP23);
List<ResourceTimeWindowStat> resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, endTimeSlot, 3);
if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {
System.err.println(toDP19(startTimeSlot) + " - " + toDP19(endTimeSlot) + " no flow stat data");
} else {
System.err.println(JacksonUtils.writeValueAsString(resourceTimeWindowStats));
}
}
private String toDP19(long mills) {
return DateTimeUtils.toDate(mills, Constants.DatetimePattern.DP19);
}
@Test
public void test0() {
FlowStat flowStat = new FlowStat();
boolean success = flowStat.incrRequest("resourceX", 1610181704000l, Long.MAX_VALUE, Long.MAX_VALUE);
List<ResourceTimeWindowStat> r = flowStat.getResourceTimeWindowStats("resourceX", 1610181681000l, 1610181711000l, 30);
System.err.println("r: " + JacksonUtils.writeValueAsString(r));
}
}

View File

@@ -0,0 +1,72 @@
package we.stats.ratelimit;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.event.annotation.BeforeTestMethod;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.ReflectionUtils;
import we.redis.RedisProperties;
import we.redis.RedisServerConfiguration;
import we.redis.RedisTemplateConfiguration;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* @author hongqiaowei
*/
@TestPropertySource("/application.properties")
@SpringJUnitConfig(classes = {RedisProperties.class, RedisTemplateConfiguration.class, RedisServerConfiguration.class})
// @ActiveProfiles("unittest")
public class ResourceRateLimitConfigServiceTests {
@Resource
RedisProperties redisProperties;
@Resource
StringRedisTemplate stringRedisTemplate;
@Resource
ReactiveStringRedisTemplate reactiveStringRedisTemplate;
ResourceRateLimitConfigService resourceRateLimitConfigService;
@BeforeEach
void beforeEach() throws NoSuchFieldException {
resourceRateLimitConfigService = new ResourceRateLimitConfigService();
// Field rt = ResourceRateLimitConfigService.class.getField("rt");
// ReflectionUtils.makeAccessible(rt);
// ReflectionUtils.setField(rt, resourceRateLimitConfigService, reactiveStringRedisTemplate);
resourceRateLimitConfigService.setReactiveStringRedisTemplate( reactiveStringRedisTemplate);
}
@Test
void initTest() throws Throwable {
// System.err.println(redisProperties);
// System.err.println(stringRedisTemplate);
// System.err.println(reactiveStringRedisTemplate);
// stringRedisTemplate.opsForValue().set("name", "F-22");
// Thread.sleep(2000);
// String name = stringRedisTemplate.opsForValue().get("name");
// assertEquals(name, "F-22");
// System.err.println(name);
// stringRedisTemplate.opsForHash().put("fizz_rate_limit", "2", "{\"concurrents\":100,\"enable\":1,\"id\":2,\"isDeleted\":0,\"resource\":\"service_default\",\"type\":2}");
// resourceRateLimitConfigService.init();
// ResourceRateLimitConfig resourceRateLimitConfig = resourceRateLimitConfigService.getResourceRateLimitConfig("service_default");
//
// System.err.println(resourceRateLimitConfig);
// System.err.println("init test end");
// Thread.currentThread().join();
}
}

View File

@@ -0,0 +1,20 @@
package we.stats.ratelimit;
import org.junit.jupiter.api.Test;
import we.util.JacksonUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* @author hongqiaowei
*/
public class ResourceRateLimitConfigTests {
@Test
void test() {
String resourceRateLimitConfigJson = "{\"concurrents\":1000,\"enable\":1,\"id\":1,\"isDeleted\":0,\"qps\":500,\"resource\":\"_global\",\"responseContent\":\"{\\\"msg\\\":\\\"rate limit, please try again\\\"}\",\"responseType\":\"application/json; charset=UTF-8\",\"type\":1}";
ResourceRateLimitConfig resourceRateLimitConfig = JacksonUtils.readValue(resourceRateLimitConfigJson, ResourceRateLimitConfig.class);
assertEquals("application/json; charset=UTF-8", resourceRateLimitConfig.responseType);
}
}

View File

@@ -0,0 +1,5 @@
# author: hongqiaowei
embeded.redis.host = localhost
embeded.redis.port = 6379
embeded.redis.database = 4