Merge branch 'feature/flow'

This commit is contained in:
Francis Dong
2021-01-05 17:22:47 +08:00
6 changed files with 455 additions and 0 deletions

View File

@@ -0,0 +1,47 @@
package we.config;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import we.filter.FlowControlFilter;
import we.stats.FlowStat;
import javax.annotation.Resource;
/**
* @author hongqiaowei
*/
// @DependsOn(FlowControlFilter.FLOW_CONTROL_FILTER)
// @Configuration
// @EnableScheduling
// @ConfigurationProperties(prefix = "flow-stat-sched")
public class FlowStatSchedConfig extends SchedConfig {
protected static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
@Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER)
private FlowControlFilter flowControlFilter;
@NacosValue(value = "${flow-stat-sched.dest:redis}", autoRefreshed = true)
@Value("${flow-stat-sched.dest:redis}")
private String dest;
@NacosValue(value = "${flow-stat-sched.queue:fizz_resource_access_stat}", autoRefreshed = true)
@Value("${flow-stat-sched.queue:fizz_resource_access_stat}")
private String queue;
@Scheduled(cron = "${flow-stat-sched.cron}")
public void sched() {
// System.err.println("now: " + LocalDateTime.now());
FlowStat flowStat = flowControlFilter.getFlowStat();
// TODO: rpt data
}
}

View File

@@ -0,0 +1,43 @@
package we.config;
import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
/**
* @author hongqiaowei
*/
public abstract class SchedConfig implements SchedulingConfigurer {
private int executors;
public void setExecutors(int es) {
executors = es;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskScheduler());
taskRegistrar.addTriggerTask(new Runnable() {
public void run() {
}
}, new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
return null;
}
});
}
@Bean(destroyMethod = "shutdown")
public Executor taskScheduler() {
return Executors.newScheduledThreadPool(executors);
}
}

View File

@@ -32,6 +32,7 @@ import reactor.core.publisher.Mono;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AppService;
import we.plugin.auth.GatewayGroupService;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.JacksonUtils;
import javax.annotation.Resource;
@@ -52,6 +53,9 @@ public class HealthController {
@Resource
private ApiConfigService apiConfigService;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
// add by hongqiaowei
@GetMapping("/sysgc")
public Mono<String> sysgc(ServerWebExchange exchange) throws Exception {
@@ -78,4 +82,9 @@ public class HealthController {
public Mono<String> apiConfigs(ServerWebExchange exchange) throws Exception {
return Mono.just(JacksonUtils.writeValueAsString(apiConfigService.serviceConfigMap));
}
@GetMapping("/resourceRateLimitConfigs")
public Mono<String> resourceRateLimitConfigs(ServerWebExchange exchange) throws Exception {
return Mono.just(JacksonUtils.writeValueAsString(resourceRateLimitConfigService.getResourceRateLimitConfigMap()));
}
}

View File

@@ -0,0 +1,147 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.filter;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
/**
* @author hongqiaowei
*/
@Component(FlowControlFilter.FLOW_CONTROL_FILTER)
@Order(-1)
public class FlowControlFilter extends ProxyAggrFilter {
public static final String FLOW_CONTROL_FILTER = "flowControlFilter";
private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class);
private static final String exceed = " exceed ";
private static final String concurrents = " concurrents ";
private static final String orQps = " or qps ";
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
@Value("${flowControl:false}")
private boolean flowControl;
@Resource
private ResourceRateLimitConfigService resourceRateLimitConfigService;
private FlowStat flowStat = new FlowStat();
public FlowStat getFlowStat() {
return flowStat;
}
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
if (flowControl) {
long currentTimeSlot = flowStat.currentTimeSlotId();
ResourceRateLimitConfig rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.GLOBAL);
ResourceRateLimitConfig globalConfig = rlc;
boolean concurrentOrRpsExceed = false;
if (rlc.isEnable()) {
concurrentOrRpsExceed = !flowStat.incrRequest(rlc.resource, currentTimeSlot, rlc.concurrents, rlc.qps);
}
if (!concurrentOrRpsExceed) {
String reqPath = WebUtils.getClientReqPath(exchange);
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(reqPath);
if (rlc == null) {
String service = WebUtils.getClientService(exchange);
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(service);
if (rlc == null) {
rlc = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceRateLimitConfig.SERVICE_DEFAULT);
if (rlc == null || !rlc.isEnable()) {
} else {
concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps);
}
} else {
concurrentOrRpsExceed = !flowStat.incrRequest(service, currentTimeSlot, rlc.concurrents, rlc.qps);
}
} else { // should not reach here for now
concurrentOrRpsExceed = !flowStat.incrRequest(reqPath, currentTimeSlot, rlc.concurrents, rlc.qps);
}
}
if (concurrentOrRpsExceed) {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(WebUtils.getClientService(exchange)).append(Constants.Symbol.SPACE).append(WebUtils.getClientReqPath(exchange));
b.append(exceed).append(rlc.resource).append(concurrents).append(rlc.concurrents).append(orQps).append(rlc.qps);
log.warn(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
ServerHttpResponse resp = exchange.getResponse();
resp.setStatusCode(HttpStatus.OK);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, globalConfig.responseType);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(globalConfig.responseContent.getBytes())));
} else {
long start = System.currentTimeMillis();
ResourceRateLimitConfig rlcCopy = rlc;
return chain.filter(exchange)
.doAfterTerminate(
() -> {
inTheEnd(start, globalConfig, rlcCopy, currentTimeSlot, true);
}
)
.doOnError(
throwable -> {
inTheEnd(start, globalConfig, rlcCopy, currentTimeSlot, false);
}
);
}
}
return chain.filter(exchange);
}
private void inTheEnd(long start, ResourceRateLimitConfig globalConfig, ResourceRateLimitConfig apiOrServiceConfig, long currentTimeSlot, boolean success) {
long spend = System.currentTimeMillis() - start;
if (globalConfig.isEnable()) {
flowStat.decrConcurrentRequest(globalConfig.resource, currentTimeSlot);
flowStat.addRequestRT(globalConfig.resource, currentTimeSlot, spend, success);
}
if (globalConfig != apiOrServiceConfig) {
flowStat.decrConcurrentRequest(apiOrServiceConfig.resource, currentTimeSlot);
flowStat.addRequestRT(apiOrServiceConfig.resource, currentTimeSlot, spend, success);
}
}
}

View File

@@ -0,0 +1,62 @@
package we.stats.ratelimit;
import we.util.JacksonUtils;
/**
* @author hongqiaowei
*/
public class ResourceRateLimitConfig {
public static interface Type {
static final byte GLOBAL = 1;
static final byte SERVICE_DEFAULT = 2;
static final byte SERVICE = 3;
static final byte API = 4;
}
public static final int DELETED = 1;
public static final String GLOBAL = "_global";
public static final String SERVICE_DEFAULT = "service_default";
private static final int ENABLE = 1;
private static final int UNABLE = 0;
public int isDeleted = 0;
public int id;
private boolean enable = true;
public String resource;
public byte type;
public long qps;
public long concurrents;
public String responseType;
public String responseContent;
public boolean isEnable() {
return enable;
}
public void setEnable(int v) {
if (v == ENABLE) {
enable = true;
} else {
enable = false;
}
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
}
}

View File

@@ -0,0 +1,147 @@
package we.stats.ratelimit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.App;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @author hongqiaowei
*/
@Service
public class ResourceRateLimitConfigService {
private static final Logger log = LoggerFactory.getLogger(ResourceRateLimitConfigService.class);
private static final String fizzRateLimit = "fizz_rate_limit";
private static final String fizzRateLimitChannel = "fizz_rate_limit_channel";
private Map<String, ResourceRateLimitConfig> resourceRateLimitConfigMap = new HashMap<>(32);
private Map<Integer, ResourceRateLimitConfig> oldResourceRateLimitConfigMap = new HashMap<>(32);
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt;
@PostConstruct
public void init() throws Throwable {
final Throwable[] throwable = new Throwable[1];
Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzRateLimit)
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> {
log.info(null, t);
})
.concatMap(e -> {
Object k = e.getKey();
if (k == ReactorUtils.OBJ) {
return Flux.just(e);
}
Object v = e.getValue();
log.info("rateLimitConfig: " + v.toString(), LogService.BIZ_ID, k.toString());
String json = (String) v;
try {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
oldResourceRateLimitConfigMap.put(rrlc.id, rrlc);
updateResourceRateLimitConfigMap(rrlc);
return Flux.just(e);
} catch (Throwable t) {
throwable[0] = t;
log.info(json, t);
return Flux.error(t);
}
}).blockLast())).flatMap(
e -> {
if (throwable[0] != null) {
return Mono.error(throwable[0]);
}
return lsnResourceRateLimitConfigChange();
}
).block();
if (error != ReactorUtils.EMPTY_THROWABLE) {
throw error;
}
}
private Mono<Throwable> lsnResourceRateLimitConfigChange() {
final Throwable[] throwable = new Throwable[1];
final boolean[] b = {false};
rt.listenToChannel(fizzRateLimitChannel).doOnError(t -> {
throwable[0] = t;
b[0] = false;
log.error("lsn " + fizzRateLimitChannel, t);
}).doOnSubscribe(
s -> {
b[0] = true;
log.info("success to lsn on " + fizzRateLimitChannel);
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis());
try {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
ResourceRateLimitConfig r = oldResourceRateLimitConfigMap.remove(rrlc.id);
if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED && r != null) {
resourceRateLimitConfigMap.remove(r.resource);
}
updateResourceRateLimitConfigMap(rrlc);
if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED) {
oldResourceRateLimitConfigMap.put(rrlc.id, rrlc);
}
} catch (Throwable t) {
log.info(json, t);
}
}).subscribe();
Throwable t = throwable[0];
while (!b[0]) {
if (t != null) {
return Mono.error(t);
} else {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
return Mono.error(e);
}
}
}
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
}
private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc) {
if (rrlc.isDeleted == ResourceRateLimitConfig.DELETED) {
ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.resource);
log.info("remove " + removedRrlc);
} else {
ResourceRateLimitConfig existRrlc = resourceRateLimitConfigMap.get(rrlc.resource);
resourceRateLimitConfigMap.put(rrlc.resource, rrlc);
if (existRrlc == null) {
log.info("add " + rrlc);
} else {
log.info("update " + existRrlc + " with " + rrlc);
}
}
}
public ResourceRateLimitConfig getResourceRateLimitConfig(String resource) {
return resourceRateLimitConfigMap.get(resource);
}
public Map<String, ResourceRateLimitConfig> getResourceRateLimitConfigMap() {
return resourceRateLimitConfigMap;
}
}