2021-01-05 17:26:10 +08:00
|
|
|
/*
|
|
|
|
|
* Copyright (C) 2021 the original author or authors.
|
|
|
|
|
*
|
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
|
* it under the terms of the GNU General Public License as published by
|
|
|
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
|
|
|
* any later version.
|
|
|
|
|
*
|
|
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
|
* GNU General Public License for more details.
|
|
|
|
|
*
|
|
|
|
|
* You should have received a copy of the GNU General Public License
|
|
|
|
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
*/
|
2021-01-07 15:04:50 +08:00
|
|
|
|
2020-12-31 17:34:14 +08:00
|
|
|
package we.config;
|
|
|
|
|
|
|
|
|
|
import com.alibaba.nacos.api.config.annotation.NacosValue;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
|
|
import org.springframework.context.annotation.DependsOn;
|
2021-01-06 18:29:18 +08:00
|
|
|
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
2020-12-31 17:34:14 +08:00
|
|
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
|
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
|
|
import we.filter.FlowControlFilter;
|
2021-01-06 18:29:18 +08:00
|
|
|
import we.flume.clients.log4j2appender.LogService;
|
2020-12-31 17:34:14 +08:00
|
|
|
import we.stats.FlowStat;
|
2021-01-06 18:29:18 +08:00
|
|
|
import we.stats.ResourceTimeWindowStat;
|
|
|
|
|
import we.stats.TimeWindowStat;
|
|
|
|
|
import we.stats.ratelimit.ResourceRateLimitConfig;
|
|
|
|
|
import we.stats.ratelimit.ResourceRateLimitConfigService;
|
2021-01-09 11:12:42 +08:00
|
|
|
import we.util.Constants;
|
|
|
|
|
import we.util.DateTimeUtils;
|
|
|
|
|
import we.util.NetworkUtils;
|
|
|
|
|
import we.util.ThreadContext;
|
2020-12-31 17:34:14 +08:00
|
|
|
|
|
|
|
|
import javax.annotation.Resource;
|
2021-01-06 18:29:18 +08:00
|
|
|
import java.math.BigDecimal;
|
2021-01-11 19:03:08 +08:00
|
|
|
import java.util.HashMap;
|
2021-01-06 18:29:18 +08:00
|
|
|
import java.util.List;
|
2021-01-11 19:03:08 +08:00
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
2020-12-31 17:34:14 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @author hongqiaowei
|
|
|
|
|
*/
|
|
|
|
|
|
2021-01-06 18:29:18 +08:00
|
|
|
@Configuration
|
|
|
|
|
// @ConditionalOnProperty(name="flowControl",havingValue = "true")
|
|
|
|
|
@DependsOn(FlowControlFilter.FLOW_CONTROL_FILTER)
|
|
|
|
|
@EnableScheduling
|
2021-01-11 19:39:57 +08:00
|
|
|
// @ConfigurationProperties(prefix = "flow-stat-sched")
|
2020-12-31 17:34:14 +08:00
|
|
|
public class FlowStatSchedConfig extends SchedConfig {
|
|
|
|
|
|
2021-01-06 18:29:18 +08:00
|
|
|
private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
|
|
|
|
|
|
|
|
|
|
private static final String _ip = "\"ip\":";
|
|
|
|
|
private static final String _id = "\"id\":";
|
|
|
|
|
private static final String _resource = "\"resource\":";
|
|
|
|
|
private static final String _type = "\"type\":";
|
|
|
|
|
private static final String _start = "\"start\":";
|
|
|
|
|
private static final String _reqs = "\"reqs\":";
|
|
|
|
|
private static final String _completeReqs = "\"completeReqs\":";
|
|
|
|
|
private static final String _peakConcurrents = "\"peakConcurrents\":";
|
|
|
|
|
private static final String _reqPerSec = "\"reqPerSec\":";
|
|
|
|
|
private static final String _blockReqs = "\"blockReqs\":";
|
2021-01-11 19:03:08 +08:00
|
|
|
private static final String _totalBlockReqs = "\"totalBlockReqs\":";
|
2021-01-06 18:29:18 +08:00
|
|
|
private static final String _errors = "\"errors\":";
|
|
|
|
|
private static final String _avgRespTime = "\"avgRespTime\":";
|
|
|
|
|
private static final String _minRespTime = "\"minRespTime\":";
|
|
|
|
|
private static final String _maxRespTime = "\"maxRespTime\":";
|
|
|
|
|
|
|
|
|
|
@NacosValue(value = "${flowControl:false}", autoRefreshed = true)
|
|
|
|
|
@Value("${flowControl:false}")
|
|
|
|
|
private boolean flowControl;
|
2020-12-31 17:34:14 +08:00
|
|
|
|
|
|
|
|
@Resource(name = FlowControlFilter.FLOW_CONTROL_FILTER)
|
|
|
|
|
private FlowControlFilter flowControlFilter;
|
|
|
|
|
|
2021-01-06 18:29:18 +08:00
|
|
|
@Resource
|
|
|
|
|
private ResourceRateLimitConfigService resourceRateLimitConfigService;
|
|
|
|
|
|
2020-12-31 17:34:14 +08:00
|
|
|
@NacosValue(value = "${flow-stat-sched.dest:redis}", autoRefreshed = true)
|
|
|
|
|
@Value("${flow-stat-sched.dest:redis}")
|
|
|
|
|
private String dest;
|
|
|
|
|
|
|
|
|
|
@NacosValue(value = "${flow-stat-sched.queue:fizz_resource_access_stat}", autoRefreshed = true)
|
|
|
|
|
@Value("${flow-stat-sched.queue:fizz_resource_access_stat}")
|
|
|
|
|
private String queue;
|
|
|
|
|
|
2021-01-06 18:29:18 +08:00
|
|
|
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
|
|
|
|
|
private ReactiveStringRedisTemplate rt;
|
|
|
|
|
|
2021-01-09 11:12:42 +08:00
|
|
|
private final String ip = NetworkUtils.getServerIp();
|
2021-01-06 18:29:18 +08:00
|
|
|
|
2021-01-09 19:00:48 +08:00
|
|
|
private long startTimeSlot = 0;
|
|
|
|
|
|
2021-01-12 15:16:35 +08:00
|
|
|
private Map<String, AtomicLong> key2totalBlockMap = new HashMap<>();
|
|
|
|
|
|
2020-12-31 17:34:14 +08:00
|
|
|
@Scheduled(cron = "${flow-stat-sched.cron}")
|
|
|
|
|
public void sched() {
|
|
|
|
|
|
2021-01-06 18:29:18 +08:00
|
|
|
if (!flowControl) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
2020-12-31 17:34:14 +08:00
|
|
|
FlowStat flowStat = flowControlFilter.getFlowStat();
|
2021-01-09 19:00:48 +08:00
|
|
|
if (startTimeSlot == 0) {
|
|
|
|
|
startTimeSlot = getRecentEndTimeSlot(flowStat);
|
|
|
|
|
return;
|
2021-01-06 18:29:18 +08:00
|
|
|
}
|
2021-01-09 19:00:48 +08:00
|
|
|
long st = System.currentTimeMillis();
|
|
|
|
|
long recentEndTimeSlot = getRecentEndTimeSlot(flowStat);
|
2021-01-09 11:12:42 +08:00
|
|
|
List<ResourceTimeWindowStat> resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, recentEndTimeSlot, 10);
|
2021-01-07 15:04:50 +08:00
|
|
|
if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {
|
2021-01-09 11:12:42 +08:00
|
|
|
log.info(toDP19(startTimeSlot) + " - " + toDP19(recentEndTimeSlot) + " no flow stat data");
|
2021-01-07 15:04:50 +08:00
|
|
|
return;
|
|
|
|
|
}
|
2021-01-09 11:12:42 +08:00
|
|
|
|
2021-01-12 15:16:35 +08:00
|
|
|
key2totalBlockMap.clear();
|
2021-01-11 19:03:08 +08:00
|
|
|
resourceTimeWindowStats.forEach(rtws -> {
|
|
|
|
|
List<TimeWindowStat> wins = rtws.getWindows();
|
|
|
|
|
wins.forEach(w -> {
|
2021-01-12 15:16:35 +08:00
|
|
|
AtomicLong totalBlock = key2totalBlockMap.computeIfAbsent(String.format("%s%s",
|
2021-01-11 19:03:08 +08:00
|
|
|
ResourceRateLimitConfig.GLOBAL, w.getStartTime()), key -> new AtomicLong(0));
|
|
|
|
|
totalBlock.addAndGet(w.getBlockRequests());
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|
2021-01-07 15:04:50 +08:00
|
|
|
resourceTimeWindowStats.forEach(
|
2021-01-06 18:29:18 +08:00
|
|
|
rtws -> {
|
|
|
|
|
String resource = rtws.getResourceId();
|
2021-01-09 14:32:20 +08:00
|
|
|
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
|
|
|
|
|
int id = (config == null ? 0 : config.id);
|
2021-01-09 11:12:42 +08:00
|
|
|
int type;
|
|
|
|
|
if (ResourceRateLimitConfig.GLOBAL.equals(resource)) {
|
|
|
|
|
type = ResourceRateLimitConfig.Type.GLOBAL;
|
|
|
|
|
} else if (resource.charAt(0) == '/') {
|
|
|
|
|
type = ResourceRateLimitConfig.Type.API;
|
2021-01-06 18:29:18 +08:00
|
|
|
} else {
|
2021-01-09 11:12:42 +08:00
|
|
|
type = ResourceRateLimitConfig.Type.SERVICE;
|
2021-01-06 18:29:18 +08:00
|
|
|
}
|
2021-01-09 11:12:42 +08:00
|
|
|
List<TimeWindowStat> wins = rtws.getWindows();
|
|
|
|
|
wins.forEach(
|
|
|
|
|
w -> {
|
|
|
|
|
StringBuilder b = ThreadContext.getStringBuilder();
|
|
|
|
|
Long winStart = w.getStartTime();
|
|
|
|
|
BigDecimal rps = w.getRps();
|
|
|
|
|
double qps;
|
|
|
|
|
if (rps == null) {
|
|
|
|
|
qps = 0.00;
|
2021-01-06 18:29:18 +08:00
|
|
|
} else {
|
2021-01-09 11:12:42 +08:00
|
|
|
qps = rps.doubleValue();
|
2021-01-06 18:29:18 +08:00
|
|
|
}
|
2021-01-11 19:03:08 +08:00
|
|
|
|
2021-01-12 15:16:35 +08:00
|
|
|
AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart));
|
2021-01-11 19:03:08 +08:00
|
|
|
Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests();
|
|
|
|
|
|
2021-01-09 11:12:42 +08:00
|
|
|
b.append(Constants.Symbol.LEFT_BRACE);
|
|
|
|
|
b.append(_ip); toJsonStringValue(b, ip); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_id); b.append(id); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_resource); toJsonStringValue(b, resource); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_type); b.append(type); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_start); b.append(winStart); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_reqs); b.append(w.getTotal()); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_completeReqs); b.append(w.getCompReqs()); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_peakConcurrents); b.append(w.getPeakConcurrentReqeusts()); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_reqPerSec); b.append(qps); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_blockReqs); b.append(w.getBlockRequests()); b.append(Constants.Symbol.COMMA);
|
2021-01-11 19:03:08 +08:00
|
|
|
b.append(_totalBlockReqs); b.append(totalBlockReqs); b.append(Constants.Symbol.COMMA);
|
2021-01-09 11:12:42 +08:00
|
|
|
b.append(_errors); b.append(w.getErrors()); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_avgRespTime); b.append(w.getAvgRt()); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_maxRespTime); b.append(w.getMax()); b.append(Constants.Symbol.COMMA);
|
|
|
|
|
b.append(_minRespTime); b.append(w.getMin());
|
|
|
|
|
b.append(Constants.Symbol.RIGHT_BRACE);
|
|
|
|
|
String msg = b.toString();
|
2021-01-13 14:31:24 +08:00
|
|
|
if ("kafka".equals(dest)) { // for internal use
|
2021-01-12 15:16:35 +08:00
|
|
|
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
|
2021-01-09 11:12:42 +08:00
|
|
|
} else {
|
|
|
|
|
rt.convertAndSend(queue, msg).subscribe();
|
|
|
|
|
}
|
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
|
log.debug("report " + toDP19(winStart) + " win10: " + msg);
|
2021-01-06 18:29:18 +08:00
|
|
|
}
|
|
|
|
|
}
|
2021-01-09 11:12:42 +08:00
|
|
|
);
|
2021-01-06 18:29:18 +08:00
|
|
|
}
|
|
|
|
|
);
|
2021-01-09 19:00:48 +08:00
|
|
|
|
|
|
|
|
startTimeSlot = recentEndTimeSlot;
|
|
|
|
|
log.info(toDP23(st) + " fss " + toDP23(System.currentTimeMillis()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private long getRecentEndTimeSlot(FlowStat flowStat) {
|
|
|
|
|
long currentTimeSlot = flowStat.currentTimeSlotId();
|
|
|
|
|
int second = DateTimeUtils.from(currentTimeSlot).getSecond();
|
|
|
|
|
long interval;
|
|
|
|
|
if (second > 49) {
|
|
|
|
|
interval = second - 50;
|
|
|
|
|
} else if (second > 39) {
|
|
|
|
|
interval = second - 40;
|
|
|
|
|
} else if (second > 29) {
|
|
|
|
|
interval = second - 30;
|
|
|
|
|
} else if (second > 19) {
|
|
|
|
|
interval = second - 20;
|
|
|
|
|
} else if (second > 9) {
|
|
|
|
|
interval = second - 10;
|
|
|
|
|
} else if (second > 0) {
|
|
|
|
|
interval = second - 0;
|
|
|
|
|
} else {
|
|
|
|
|
interval = 0;
|
|
|
|
|
}
|
|
|
|
|
long recentEndTimeSlot = currentTimeSlot - interval * 1000;
|
|
|
|
|
return recentEndTimeSlot;
|
2021-01-06 18:29:18 +08:00
|
|
|
}
|
|
|
|
|
|
2021-01-08 17:51:07 +08:00
|
|
|
private String toDP19(long startTimeSlot) {
|
|
|
|
|
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
|
|
|
|
|
}
|
|
|
|
|
|
2021-01-09 19:00:48 +08:00
|
|
|
private String toDP23(long startTimeSlot) {
|
|
|
|
|
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP23);
|
|
|
|
|
}
|
|
|
|
|
|
2021-01-06 18:29:18 +08:00
|
|
|
private static void toJsonStringValue(StringBuilder b, String value) {
|
|
|
|
|
b.append(Constants.Symbol.DOUBLE_QUOTE).append(value).append(Constants.Symbol.DOUBLE_QUOTE);
|
2020-12-31 17:34:14 +08:00
|
|
|
}
|
|
|
|
|
}
|