From d3c9054638a4fe9baf0cf24ee0a9397e68e9f156 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Sat, 9 Jul 2022 15:33:16 +0800 Subject: [PATCH] Support redis cluster --- .../src/main/resources/log4j2-spring.xml | 1 - .../we/plugin/stat/AccessStatSchedConfig.java | 168 +++++++++--------- .../java/we/plugin/stat/StatPluginFilter.java | 125 +++++++------ .../stat/StatPluginFilterProperties.java | 2 +- 4 files changed, 147 insertions(+), 149 deletions(-) diff --git a/fizz-bootstrap/src/main/resources/log4j2-spring.xml b/fizz-bootstrap/src/main/resources/log4j2-spring.xml index eb8d908..2e9a9d8 100644 --- a/fizz-bootstrap/src/main/resources/log4j2-spring.xml +++ b/fizz-bootstrap/src/main/resources/log4j2-spring.xml @@ -8,7 +8,6 @@ - diff --git a/fizz-core/src/main/java/we/plugin/stat/AccessStatSchedConfig.java b/fizz-core/src/main/java/we/plugin/stat/AccessStatSchedConfig.java index 9580357..6884a40 100644 --- a/fizz-core/src/main/java/we/plugin/stat/AccessStatSchedConfig.java +++ b/fizz-core/src/main/java/we/plugin/stat/AccessStatSchedConfig.java @@ -1,84 +1,84 @@ -/* - * Copyright (C) 2021 the original author or authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package we.plugin.stat; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.core.ReactiveStringRedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import we.config.AggregateRedisConfig; -import we.config.SchedConfig; -import we.util.Consts; -import we.util.DateTimeUtils; -import we.util.StringUtils; - -import javax.annotation.Resource; -import java.util.Map; - -/** - * @author hongqiaowei - */ - -@Configuration -public class AccessStatSchedConfig extends SchedConfig { - - private static final Logger LOGGER = LoggerFactory.getLogger(AccessStatSchedConfig.class); - - private static final Logger STAT_LOGGER = LoggerFactory.getLogger("stat"); - - @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) - private ReactiveStringRedisTemplate rt; - - @Resource - private StatPluginFilterProperties statPluginFilterProperties; - - @Resource - private StatPluginFilter statPluginFilter; - - @Scheduled(cron = "${fizz-access-stat-sched.cron:2/10 * * * * ?}") - public void sched() { - long prevTimeWinStart = DateTimeUtils.get10sTimeWinStart(2); - Map accessStatMap = statPluginFilter.getAccessStat(prevTimeWinStart); - - if (accessStatMap.isEmpty()) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("no access stat in {} window", DateTimeUtils.convert(prevTimeWinStart, Consts.DP.DP19)); - } - } else { - accessStatMap.forEach( - (smp, accessStat) -> { - String msg = accessStat.toString(); - String topic = statPluginFilterProperties.getFizzAccessStatTopic(); - if (StringUtils.isBlank(topic)) { - String channel = statPluginFilterProperties.getFizzAccessStatChannel(); - rt.convertAndSend(channel, msg).subscribe(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("send access stat {} which belong to {} window to channel {}", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19), channel); - } - } else { - STAT_LOGGER.info(msg); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("send access stat {} which belong to {} window to topic", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19)); - } - } - } - ); - } - } -} +///* +// * Copyright (C) 2021 the original author or authors. +// * +// * This program is free software: you can redistribute it and/or modify +// * it under the terms of the GNU General Public License as published by +// * the Free Software Foundation, either version 3 of the License, or +// * any later version. +// * +// * This program is distributed in the hope that it will be useful, +// * but WITHOUT ANY WARRANTY; without even the implied warranty of +// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// * GNU General Public License for more details. +// * +// * You should have received a copy of the GNU General Public License +// * along with this program. If not, see . +// */ +// +//package we.plugin.stat; +// +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.data.redis.core.ReactiveStringRedisTemplate; +//import org.springframework.scheduling.annotation.Scheduled; +//import we.config.AggregateRedisConfig; +//import we.config.SchedConfig; +//import we.util.Consts; +//import we.util.DateTimeUtils; +//import we.util.StringUtils; +// +//import javax.annotation.Resource; +//import java.util.Map; +// +///** +// * @author hongqiaowei +// */ +// +//@Configuration +//public class AccessStatSchedConfig extends SchedConfig { +// +// private static final Logger LOGGER = LoggerFactory.getLogger(AccessStatSchedConfig.class); +// +// private static final Logger STAT_LOGGER = LoggerFactory.getLogger("stat"); +// +// @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) +// private ReactiveStringRedisTemplate rt; +// +// @Resource +// private StatPluginFilterProperties statPluginFilterProperties; +// +// @Resource +// private StatPluginFilter statPluginFilter; +// +// @Scheduled(cron = "${fizz-access-stat-sched.cron:2/10 * * * * ?}") +// public void sched() { +// long prevTimeWinStart = DateTimeUtils.get10sTimeWinStart(2); +// Map accessStatMap = statPluginFilter.getAccessStat(prevTimeWinStart); +// +// if (accessStatMap.isEmpty()) { +// if (LOGGER.isDebugEnabled()) { +// LOGGER.debug("no access stat in {} window", DateTimeUtils.convert(prevTimeWinStart, Consts.DP.DP19)); +// } +// } else { +// accessStatMap.forEach( +// (smp, accessStat) -> { +// String msg = accessStat.toString(); +// String topic = statPluginFilterProperties.getFizzAccessStatTopic(); +// if (StringUtils.isBlank(topic)) { +// String channel = statPluginFilterProperties.getFizzAccessStatChannel(); +// rt.convertAndSend(channel, msg).subscribe(); +// if (LOGGER.isDebugEnabled()) { +// LOGGER.debug("send access stat {} which belong to {} window to channel {}", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19), channel); +// } +// } else { +// STAT_LOGGER.info(msg); +// if (LOGGER.isDebugEnabled()) { +// LOGGER.debug("send access stat {} which belong to {} window to topic", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19)); +// } +// } +// } +// ); +// } +// } +//} diff --git a/fizz-core/src/main/java/we/plugin/stat/StatPluginFilter.java b/fizz-core/src/main/java/we/plugin/stat/StatPluginFilter.java index f6cbd4e..b021700 100644 --- a/fizz-core/src/main/java/we/plugin/stat/StatPluginFilter.java +++ b/fizz-core/src/main/java/we/plugin/stat/StatPluginFilter.java @@ -17,20 +17,21 @@ package we.plugin.stat; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; +import we.config.AggregateRedisConfig; import we.plugin.PluginFilter; +import we.plugin.auth.GatewayGroupService; import we.util.Consts; -import we.util.DateTimeUtils; import we.util.ThreadContext; import we.util.WebUtils; import javax.annotation.Resource; -import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; /** @@ -41,84 +42,82 @@ import java.util.Map; @Component(StatPluginFilter.STAT_PLUGIN_FILTER) public class StatPluginFilter extends PluginFilter { - private static final Logger LOGGER = LoggerFactory.getLogger(StatPluginFilter.class); + private static final Logger log = LoggerFactory.getLogger("stat"); public static final String STAT_PLUGIN_FILTER = "statPlugin"; + private static final String ip = "\"ip\":"; + + private static final String gatewayGroup = "\"gatewayGroup\":"; + + private static final String service = "\"service\":"; + + private static final String appid = "\"appid\":"; + + private static final String apiMethod = "\"apiMethod\":"; + + private static final String apiPath = "\"apiPath\":"; + + private static final String reqTime = "\"reqTime\":"; + @Resource private StatPluginFilterProperties statPluginFilterProperties; - private Map - > - > - threadTimeWinAccessStatMap = new HashMap<>(); + @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) + private ReactiveStringRedisTemplate rt; + + @Resource + private GatewayGroupService gatewayGroupService; @Override public Mono doFilter(ServerWebExchange exchange, Map config, String fixedConfig) { if (statPluginFilterProperties.isStatOpen()) { - long tid = Thread.currentThread().getId(); - Map> timeWinAccessStatMap = threadTimeWinAccessStatMap.get(tid); - if (timeWinAccessStatMap == null) { - timeWinAccessStatMap = new LinkedHashMap>(4, 1) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > 2; - } - }; - threadTimeWinAccessStatMap.put(tid, timeWinAccessStatMap); - } + StringBuilder b = ThreadContext.getStringBuilder(); + b.append(Consts.S.LEFT_BRACE); + b.append(ip); toJsonStringValue(b, WebUtils.getOriginIp(exchange)); b.append(Consts.S.COMMA); + b.append(gatewayGroup); toJsonStringValue(b, currentGatewayGroups()); b.append(Consts.S.COMMA); + b.append(service); toJsonStringValue(b, WebUtils.getClientService(exchange)); b.append(Consts.S.COMMA); - long currentTimeWinStart = DateTimeUtils.get10sTimeWinStart(1); - Map accessStatMap = timeWinAccessStatMap.computeIfAbsent(currentTimeWinStart, k -> new HashMap<>(128)); + String appId = WebUtils.getAppId(exchange); + if (appId != null) { + b.append(appid); toJsonStringValue(b, appId); b.append(Consts.S.COMMA); + } - String service = WebUtils.getClientService(exchange); - String method = exchange.getRequest().getMethodValue(); - String path = WebUtils.getClientReqPath(exchange); - String key = ThreadContext.getStringBuilder().append(service).append(method).append(path).toString(); - AccessStat accessStat = accessStatMap.get(key); - if (accessStat == null) { - accessStat = new AccessStat(); - accessStat.service = service; - accessStat.apiMethod = method; - accessStat.apiPath = path; - accessStatMap.put(key, accessStat); - } - accessStat.reqTime = System.currentTimeMillis(); - accessStat.reqs++; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("update access stat: {}, which request at {}", accessStat, DateTimeUtils.convert(accessStat.reqTime, Consts.DP.DP19)); + b.append(apiMethod); toJsonStringValue(b, exchange.getRequest().getMethodValue()); b.append(Consts.S.COMMA); + b.append(apiPath); toJsonStringValue(b, WebUtils.getClientReqPath(exchange)); b.append(Consts.S.COMMA); + b.append(reqTime) .append(System.currentTimeMillis()); + b.append(Consts.S.RIGHT_BRACE); + + if (StringUtils.isBlank(statPluginFilterProperties.getFizzAccessStatTopic())) { + rt.convertAndSend(statPluginFilterProperties.getFizzAccessStatChannel(), b.toString()).subscribe(); + } else { + // log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use + log.info(b.toString()); } } return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, STAT_PLUGIN_FILTER, null); } - public Map getAccessStat(long timeWinStart) { - Map result = ThreadContext.getHashMap(); - threadTimeWinAccessStatMap.forEach( - (t, timeWinAccessStatMap) -> { - Map accessStatMap = timeWinAccessStatMap.get(timeWinStart); - if (accessStatMap != null) { - accessStatMap.forEach( - (smp, accessStat) -> { - AccessStat as = result.get(smp); - if (as == null) { - accessStat.start = timeWinStart; - result.put(smp, accessStat); - } else { - as.reqs = as.reqs + accessStat.reqs; - if (accessStat.reqTime > as.reqTime) { - as.reqTime = accessStat.reqTime; - } - } - } - ); - } - } - ); - return result; + private String currentGatewayGroups() { + int sz = gatewayGroupService.currentGatewayGroupSet.size(); + if (sz == 1) { + return gatewayGroupService.currentGatewayGroupSet.iterator().next(); + } + StringBuilder b = ThreadContext.getStringBuilder(ThreadContext.sb0); + byte i = 0; + for (String g : gatewayGroupService.currentGatewayGroupSet) { + b.append(g); + i++; + if (i < sz) { + b.append(Consts.S.COMMA); + } + } + return b.toString(); + } + + private static void toJsonStringValue(StringBuilder b, String value) { + b.append(Consts.S.DOUBLE_QUOTE).append(value).append(Consts.S.DOUBLE_QUOTE); } } diff --git a/fizz-core/src/main/java/we/plugin/stat/StatPluginFilterProperties.java b/fizz-core/src/main/java/we/plugin/stat/StatPluginFilterProperties.java index 3697196..cd8b471 100644 --- a/fizz-core/src/main/java/we/plugin/stat/StatPluginFilterProperties.java +++ b/fizz-core/src/main/java/we/plugin/stat/StatPluginFilterProperties.java @@ -37,7 +37,7 @@ public class StatPluginFilterProperties { @Value("${stat.open:false}") private boolean statOpen = false; - @Value("${stat.channel:fizz_access_stat_new}") + @Value("${stat.channel:fizz_access_stat}") private String fizzAccessStatChannel; @Value("${stat.topic:}")