Support redis cluster

This commit is contained in:
hongqiaowei
2022-07-09 15:33:16 +08:00
parent b108282ea1
commit d3c9054638
4 changed files with 147 additions and 149 deletions

View File

@@ -8,7 +8,6 @@
</properties> </properties>
<Appenders> <Appenders>
<Console name="Console" target="SYSTEM_OUT"> <Console name="Console" target="SYSTEM_OUT">
<!--<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %t %level %logger{2} - %msg{nolookups}%n"/>-->
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %t %level %logger{2} - %X{traceId} %msg{nolookups}%n"/> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %t %level %logger{2} - %X{traceId} %msg{nolookups}%n"/>
</Console> </Console>
<LogSend name="LogSend"> <LogSend name="LogSend">

View File

@@ -1,84 +1,84 @@
/* ///*
* Copyright (C) 2021 the original author or authors. // * Copyright (C) 2021 the original author or authors.
* // *
* This program is free software: you can redistribute it and/or modify // * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by // * it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or // * the Free Software Foundation, either version 3 of the License, or
* any later version. // * any later version.
* // *
* This program is distributed in the hope that it will be useful, // * This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of // * but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details. // * GNU General Public License for more details.
* // *
* You should have received a copy of the GNU General Public License // * You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>. // * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ // */
//
package we.plugin.stat; //package we.plugin.stat;
//
import org.slf4j.Logger; //import org.slf4j.Logger;
import org.slf4j.LoggerFactory; //import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration; //import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate; //import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled; //import org.springframework.scheduling.annotation.Scheduled;
import we.config.AggregateRedisConfig; //import we.config.AggregateRedisConfig;
import we.config.SchedConfig; //import we.config.SchedConfig;
import we.util.Consts; //import we.util.Consts;
import we.util.DateTimeUtils; //import we.util.DateTimeUtils;
import we.util.StringUtils; //import we.util.StringUtils;
//
import javax.annotation.Resource; //import javax.annotation.Resource;
import java.util.Map; //import java.util.Map;
//
/** ///**
* @author hongqiaowei // * @author hongqiaowei
*/ // */
//
@Configuration //@Configuration
public class AccessStatSchedConfig extends SchedConfig { //public class AccessStatSchedConfig extends SchedConfig {
//
private static final Logger LOGGER = LoggerFactory.getLogger(AccessStatSchedConfig.class); // private static final Logger LOGGER = LoggerFactory.getLogger(AccessStatSchedConfig.class);
//
private static final Logger STAT_LOGGER = LoggerFactory.getLogger("stat"); // private static final Logger STAT_LOGGER = LoggerFactory.getLogger("stat");
//
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) // @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt; // private ReactiveStringRedisTemplate rt;
//
@Resource // @Resource
private StatPluginFilterProperties statPluginFilterProperties; // private StatPluginFilterProperties statPluginFilterProperties;
//
@Resource // @Resource
private StatPluginFilter statPluginFilter; // private StatPluginFilter statPluginFilter;
//
@Scheduled(cron = "${fizz-access-stat-sched.cron:2/10 * * * * ?}") // @Scheduled(cron = "${fizz-access-stat-sched.cron:2/10 * * * * ?}")
public void sched() { // public void sched() {
long prevTimeWinStart = DateTimeUtils.get10sTimeWinStart(2); // long prevTimeWinStart = DateTimeUtils.get10sTimeWinStart(2);
Map<String, AccessStat> accessStatMap = statPluginFilter.getAccessStat(prevTimeWinStart); // Map<String, AccessStat> accessStatMap = statPluginFilter.getAccessStat(prevTimeWinStart);
//
if (accessStatMap.isEmpty()) { // if (accessStatMap.isEmpty()) {
if (LOGGER.isDebugEnabled()) { // if (LOGGER.isDebugEnabled()) {
LOGGER.debug("no access stat in {} window", DateTimeUtils.convert(prevTimeWinStart, Consts.DP.DP19)); // LOGGER.debug("no access stat in {} window", DateTimeUtils.convert(prevTimeWinStart, Consts.DP.DP19));
} // }
} else { // } else {
accessStatMap.forEach( // accessStatMap.forEach(
(smp, accessStat) -> { // (smp, accessStat) -> {
String msg = accessStat.toString(); // String msg = accessStat.toString();
String topic = statPluginFilterProperties.getFizzAccessStatTopic(); // String topic = statPluginFilterProperties.getFizzAccessStatTopic();
if (StringUtils.isBlank(topic)) { // if (StringUtils.isBlank(topic)) {
String channel = statPluginFilterProperties.getFizzAccessStatChannel(); // String channel = statPluginFilterProperties.getFizzAccessStatChannel();
rt.convertAndSend(channel, msg).subscribe(); // rt.convertAndSend(channel, msg).subscribe();
if (LOGGER.isDebugEnabled()) { // if (LOGGER.isDebugEnabled()) {
LOGGER.debug("send access stat {} which belong to {} window to channel {}", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19), channel); // LOGGER.debug("send access stat {} which belong to {} window to channel {}", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19), channel);
} // }
} else { // } else {
STAT_LOGGER.info(msg); // STAT_LOGGER.info(msg);
if (LOGGER.isDebugEnabled()) { // if (LOGGER.isDebugEnabled()) {
LOGGER.debug("send access stat {} which belong to {} window to topic", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19)); // LOGGER.debug("send access stat {} which belong to {} window to topic", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19));
} // }
} // }
} // }
); // );
} // }
} // }
} //}

View File

@@ -17,20 +17,21 @@
package we.plugin.stat; package we.plugin.stat;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.plugin.PluginFilter; import we.plugin.PluginFilter;
import we.plugin.auth.GatewayGroupService;
import we.util.Consts; import we.util.Consts;
import we.util.DateTimeUtils;
import we.util.ThreadContext; import we.util.ThreadContext;
import we.util.WebUtils; import we.util.WebUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
/** /**
@@ -41,84 +42,82 @@ import java.util.Map;
@Component(StatPluginFilter.STAT_PLUGIN_FILTER) @Component(StatPluginFilter.STAT_PLUGIN_FILTER)
public class StatPluginFilter extends PluginFilter { public class StatPluginFilter extends PluginFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(StatPluginFilter.class); private static final Logger log = LoggerFactory.getLogger("stat");
public static final String STAT_PLUGIN_FILTER = "statPlugin"; public static final String STAT_PLUGIN_FILTER = "statPlugin";
private static final String ip = "\"ip\":";
private static final String gatewayGroup = "\"gatewayGroup\":";
private static final String service = "\"service\":";
private static final String appid = "\"appid\":";
private static final String apiMethod = "\"apiMethod\":";
private static final String apiPath = "\"apiPath\":";
private static final String reqTime = "\"reqTime\":";
@Resource @Resource
private StatPluginFilterProperties statPluginFilterProperties; private StatPluginFilterProperties statPluginFilterProperties;
private Map<Long/*thread id*/, @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
Map/*LinkedHashMap*/<Long/*time win start*/, private ReactiveStringRedisTemplate rt;
Map<String/*service+apiMethod+apiPath*/, AccessStat>
> @Resource
> private GatewayGroupService gatewayGroupService;
threadTimeWinAccessStatMap = new HashMap<>();
@Override @Override
public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config, String fixedConfig) { public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config, String fixedConfig) {
if (statPluginFilterProperties.isStatOpen()) { if (statPluginFilterProperties.isStatOpen()) {
long tid = Thread.currentThread().getId(); StringBuilder b = ThreadContext.getStringBuilder();
Map<Long, Map<String, AccessStat>> timeWinAccessStatMap = threadTimeWinAccessStatMap.get(tid); b.append(Consts.S.LEFT_BRACE);
if (timeWinAccessStatMap == null) { b.append(ip); toJsonStringValue(b, WebUtils.getOriginIp(exchange)); b.append(Consts.S.COMMA);
timeWinAccessStatMap = new LinkedHashMap<Long, Map<String, AccessStat>>(4, 1) { b.append(gatewayGroup); toJsonStringValue(b, currentGatewayGroups()); b.append(Consts.S.COMMA);
@Override b.append(service); toJsonStringValue(b, WebUtils.getClientService(exchange)); b.append(Consts.S.COMMA);
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > 2; String appId = WebUtils.getAppId(exchange);
} if (appId != null) {
}; b.append(appid); toJsonStringValue(b, appId); b.append(Consts.S.COMMA);
threadTimeWinAccessStatMap.put(tid, timeWinAccessStatMap);
} }
long currentTimeWinStart = DateTimeUtils.get10sTimeWinStart(1); b.append(apiMethod); toJsonStringValue(b, exchange.getRequest().getMethodValue()); b.append(Consts.S.COMMA);
Map<String, AccessStat> accessStatMap = timeWinAccessStatMap.computeIfAbsent(currentTimeWinStart, k -> new HashMap<>(128)); b.append(apiPath); toJsonStringValue(b, WebUtils.getClientReqPath(exchange)); b.append(Consts.S.COMMA);
b.append(reqTime) .append(System.currentTimeMillis());
b.append(Consts.S.RIGHT_BRACE);
String service = WebUtils.getClientService(exchange); if (StringUtils.isBlank(statPluginFilterProperties.getFizzAccessStatTopic())) {
String method = exchange.getRequest().getMethodValue(); rt.convertAndSend(statPluginFilterProperties.getFizzAccessStatChannel(), b.toString()).subscribe();
String path = WebUtils.getClientReqPath(exchange); } else {
String key = ThreadContext.getStringBuilder().append(service).append(method).append(path).toString(); // log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use
AccessStat accessStat = accessStatMap.get(key); log.info(b.toString());
if (accessStat == null) {
accessStat = new AccessStat();
accessStat.service = service;
accessStat.apiMethod = method;
accessStat.apiPath = path;
accessStatMap.put(key, accessStat);
}
accessStat.reqTime = System.currentTimeMillis();
accessStat.reqs++;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("update access stat: {}, which request at {}", accessStat, DateTimeUtils.convert(accessStat.reqTime, Consts.DP.DP19));
} }
} }
return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, STAT_PLUGIN_FILTER, null); return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, STAT_PLUGIN_FILTER, null);
} }
public Map<String, AccessStat> getAccessStat(long timeWinStart) { private String currentGatewayGroups() {
Map<String, AccessStat> result = ThreadContext.getHashMap(); int sz = gatewayGroupService.currentGatewayGroupSet.size();
threadTimeWinAccessStatMap.forEach( if (sz == 1) {
(t, timeWinAccessStatMap) -> { return gatewayGroupService.currentGatewayGroupSet.iterator().next();
Map<String, AccessStat> accessStatMap = timeWinAccessStatMap.get(timeWinStart); }
if (accessStatMap != null) { StringBuilder b = ThreadContext.getStringBuilder(ThreadContext.sb0);
accessStatMap.forEach( byte i = 0;
(smp, accessStat) -> { for (String g : gatewayGroupService.currentGatewayGroupSet) {
AccessStat as = result.get(smp); b.append(g);
if (as == null) { i++;
accessStat.start = timeWinStart; if (i < sz) {
result.put(smp, accessStat); b.append(Consts.S.COMMA);
} else {
as.reqs = as.reqs + accessStat.reqs;
if (accessStat.reqTime > as.reqTime) {
as.reqTime = accessStat.reqTime;
} }
} }
return b.toString();
} }
);
} private static void toJsonStringValue(StringBuilder b, String value) {
} b.append(Consts.S.DOUBLE_QUOTE).append(value).append(Consts.S.DOUBLE_QUOTE);
);
return result;
} }
} }

View File

@@ -37,7 +37,7 @@ public class StatPluginFilterProperties {
@Value("${stat.open:false}") @Value("${stat.open:false}")
private boolean statOpen = false; private boolean statOpen = false;
@Value("${stat.channel:fizz_access_stat_new}") @Value("${stat.channel:fizz_access_stat}")
private String fizzAccessStatChannel; private String fizzAccessStatChannel;
@Value("${stat.topic:}") @Value("${stat.topic:}")