This commit is contained in:
hongqiaowei
2022-07-11 11:20:43 +08:00
parent 4dd9be1590
commit e5ebc2f624
7 changed files with 269 additions and 187 deletions

View File

@@ -21,25 +21,25 @@
</Policies> </Policies>
<DefaultRolloverStrategy max="50"/> <DefaultRolloverStrategy max="50"/>
</RollingRandomAccessFile>--> </RollingRandomAccessFile>-->
<!--<Kafka name="KafkaAppender4biz" topic="log-zt-fizz-bootstrap" syncSend="false"> <!--<Kafka name="KafkaAppender4biz" topic="fizz-bootstrap" syncSend="false">
<JsonTemplateLayout eventTemplateUri="classpath:log4j2-kafka.json"> <JsonTemplateLayout eventTemplateUri="classpath:log4j2-kafka.json">
<EventTemplateAdditionalField key="traceId" value="$${ctx:traceId}"/> <EventTemplateAdditionalField key="traceId" value="$${ctx:traceId}"/>
</JsonTemplateLayout> </JsonTemplateLayout>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property> <Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka> </Kafka>
<Kafka name="KafkaAppender4monitor" topic="log-zt-fizz-bootstrap-monitor" syncSend="false"> <Kafka name="KafkaAppender4monitor" topic="fizz-bootstrap-monitor" syncSend="false">
<PatternLayout pattern="%m"/> <PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property> <Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka> </Kafka>
<Kafka name="KafkaAppender4stat" topic="log-zt-fizz-bootstrap-stat" syncSend="false"> <Kafka name="KafkaAppender4stat" topic="fizz-bootstrap-stat" syncSend="false">
<PatternLayout pattern="%m"/> <PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property> <Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka> </Kafka>
<Kafka name="KafkaAppender4flow" topic="log-zt-fizz-bootstrap-flow" syncSend="false"> <Kafka name="KafkaAppender4flow" topic="fizz-bootstrap-flow" syncSend="false">
<PatternLayout pattern="%m"/> <PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property> <Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka> </Kafka>
<Kafka name="KafkaAppender4callback" topic="log-zt-fizz-bootstrap-callback" syncSend="false"> <Kafka name="KafkaAppender4callback" topic="fizz-bootstrap-callback" syncSend="false">
<PatternLayout pattern="%m"/> <PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property> <Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka>--> </Kafka>-->

View File

@@ -168,7 +168,7 @@ public class FlowControlFilter extends FizzWebFilter {
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
String blockedResourceId = result.getBlockedResourceId(); String blockedResourceId = result.getBlockedResourceId();
if (BlockType.CIRCUIT_BREAK == result.getBlockType()) { if (BlockType.CIRCUIT_BREAK == result.getBlockType()) {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.CIRCUIT_BREAK_ALARM, null, currentTimeMillis); fizzMonitorService.alarm(service, path, FizzMonitorService.CIRCUIT_BREAK_ALARM, null);
// log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId); // log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId); log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId);
@@ -200,11 +200,11 @@ public class FlowControlFilter extends FizzWebFilter {
} else { } else {
if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) { if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, concurrents, currentTimeMillis); fizzMonitorService.alarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, concurrents);
// log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId); // log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId); log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId);
} else { } else {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, qps, currentTimeMillis); fizzMonitorService.alarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, qps);
// log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId); // log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId); log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId);
} }
@@ -246,11 +246,11 @@ public class FlowControlFilter extends FizzWebFilter {
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat); cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat);
} }
if (statusCode == HttpStatus.GATEWAY_TIMEOUT) { if (statusCode == HttpStatus.GATEWAY_TIMEOUT) {
fizzMonitorService.sendAlarm(finalService, finalPath, FizzMonitorService.TIMEOUT_ALARM, t.getMessage(), start); fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.TIMEOUT_ALARM, t.getMessage());
} else if (statusCode.is5xxServerError()) { } else if (statusCode.is5xxServerError()) {
fizzMonitorService.sendAlarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, String.valueOf(statusCode.value()), start); fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, String.valueOf(statusCode.value()));
} else if (s == SignalType.ON_ERROR && t != null) { } else if (s == SignalType.ON_ERROR && t != null) {
fizzMonitorService.sendAlarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, t.getMessage(), start); fizzMonitorService.alarm(finalService, finalPath, FizzMonitorService.ERROR_ALARM, t.getMessage());
} }
} else { } else {
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true, statusCode); flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, true, statusCode);

View File

@@ -21,32 +21,50 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import we.config.AggregateRedisConfig; import we.config.AggregateRedisConfig;
import we.config.SchedConfig;
import we.util.Consts; import we.util.Consts;
import we.util.DateTimeUtils;
import we.util.JacksonUtils;
import we.util.ThreadContext; import we.util.ThreadContext;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
/** /**
* @author hongqiaowei * @author hongqiaowei
*/ */
@Service @Service
public class FizzMonitorService { public class FizzMonitorService extends SchedConfig {
private static final Logger LOGGER = LoggerFactory.getLogger("monitor"); private static final Logger MONITOR_LOGGER = LoggerFactory.getLogger("monitor");
private static final Logger LOGGER = LoggerFactory.getLogger(FizzMonitorService.class);
public static final byte ERROR_ALARM = 1; public static final byte ERROR_ALARM = 1;
public static final byte TIMEOUT_ALARM = 2; public static final byte TIMEOUT_ALARM = 2;
public static final byte RATE_LIMIT_ALARM = 3; public static final byte RATE_LIMIT_ALARM = 3;
public static final byte CIRCUIT_BREAK_ALARM = 4; public static final byte CIRCUIT_BREAK_ALARM = 4;
private static final String _service = "\"service\":"; private static class Alarm {
private static final String _path = "\"path\":";
private static final String _type = "\"type\":"; public String service;
private static final String _desc = "\"desc\":"; public String path;
private static final String _timestamp = "\"timestamp\":"; public int type;
public String desc;
public long timestamp;
public int reqs = 0;
public long start;
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
}
}
@Value("${fizz.monitor.alarm.enable:true}") @Value("${fizz.monitor.alarm.enable:true}")
private boolean alarmEnable; private boolean alarmEnable;
@@ -54,38 +72,102 @@ public class FizzMonitorService {
@Value("${fizz.monitor.alarm.dest:redis}") @Value("${fizz.monitor.alarm.dest:redis}")
private String dest; private String dest;
@Value("${fizz.monitor.alarm.queue:fizz_alarm_channel}") @Value("${fizz.monitor.alarm.queue:fizz_alarm_channel_new}")
private String queue; private String queue;
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt; private ReactiveStringRedisTemplate rt;
public void sendAlarm(String service, String path, byte type, String desc, long timestamp) { private Map<Long/*thread id*/,
if (alarmEnable) { Map/*LinkedHashMap*/<Long/*time win start*/,
StringBuilder b = ThreadContext.getStringBuilder(); Map<String/*service+path+type*/, Alarm>
b.append(Consts.S.LEFT_BRACE); >
b.append(_service); toJsonStrVal(b, service); b.append(Consts.S.COMMA); >
b.append(_path); toJsonStrVal(b, path); b.append(Consts.S.COMMA); threadTimeWinAlarmMap = new HashMap<>();
b.append(_type); b.append(type); b.append(Consts.S.COMMA);
if (desc != null) { public void alarm(String service, String path, byte type, String desc) {
b.append(_desc); toJsonStrVal(b, desc); b.append(Consts.S.COMMA); if (alarmEnable) {
long tid = Thread.currentThread().getId();
Map<Long, Map<String, Alarm>> timeWinAlarmMap = threadTimeWinAlarmMap.get(tid);
if (timeWinAlarmMap == null) {
timeWinAlarmMap = new LinkedHashMap<Long, Map<String, Alarm>>(4, 1) {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > 2;
}
};
threadTimeWinAlarmMap.put(tid, timeWinAlarmMap);
} }
b.append(_timestamp) .append(timestamp); long currentTimeWinStart = DateTimeUtils.get10sTimeWinStart(1);
b.append(Consts.S.RIGHT_BRACE); Map<String, Alarm> alarmMap = timeWinAlarmMap.computeIfAbsent(currentTimeWinStart, k -> new HashMap<>(128));
String msg = b.toString();
String key = ThreadContext.getStringBuilder().append(service).append(path).append(type).toString();
Alarm alarm = alarmMap.get(key);
if (alarm == null) {
alarm = new Alarm();
alarm.service = service;
alarm.path = path;
alarm.type = type;
alarmMap.put(key, alarm);
}
alarm.desc = desc;
alarm.timestamp = System.currentTimeMillis();
alarm.reqs++;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("update alarm: {} at {}", alarm, DateTimeUtils.convert(alarm.timestamp, Consts.DP.DP19));
}
}
}
@Scheduled(cron = "${fizz.monitor.alarm.sched.cron:2/10 * * * * ?}")
public void sched() {
long prevTimeWinStart = DateTimeUtils.get10sTimeWinStart(2);
Map<String, Alarm> alarmMap = ThreadContext.getHashMap();
threadTimeWinAlarmMap.forEach(
(t, timeWinAlarmMap) -> {
Map<String, Alarm> alarmMap0 = timeWinAlarmMap.get(prevTimeWinStart);
if (alarmMap0 != null) {
alarmMap0.forEach(
(spt, alarm) -> {
Alarm a = alarmMap.get(spt);
if (a == null) {
alarm.start = prevTimeWinStart;
alarmMap.put(spt, alarm);
} else {
a.reqs = a.reqs + alarm.reqs;
if (alarm.timestamp > a.timestamp) {
a.timestamp = alarm.timestamp;
a.desc = alarm.desc;
}
}
}
);
}
}
);
if (alarmMap.isEmpty()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("no alarm in {} window", DateTimeUtils.convert(prevTimeWinStart, Consts.DP.DP19));
}
} else {
alarmMap.forEach(
(spt, alarm) -> {
String msg = alarm.toString();
if (Consts.KAFKA.equals(dest)) { if (Consts.KAFKA.equals(dest)) {
// LOGGER.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue)); MONITOR_LOGGER.info(msg);
LOGGER.info(msg); if (LOGGER.isDebugEnabled()) {
LOGGER.debug("send alarm {} which belong to {} window to topic", msg, DateTimeUtils.convert(alarm.start, Consts.DP.DP19));
}
} else { } else {
rt.convertAndSend(queue, msg).subscribe(); rt.convertAndSend(queue, msg).subscribe();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("send alarm {} which belong to {} window to channel {}", msg, DateTimeUtils.convert(alarm.start, Consts.DP.DP19), queue);
} }
} }
} }
);
}
}
private static void toJsonStrVal(StringBuilder b, String value) {
b.append(Consts.S.DOUBLE_QUOTE).append(value).append(Consts.S.DOUBLE_QUOTE);
}
} }

View File

@@ -1,84 +1,84 @@
///* /*
// * Copyright (C) 2021 the original author or authors. * Copyright (C) 2021 the original author or authors.
// * *
// * This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or * the Free Software Foundation, either version 3 of the License, or
// * any later version. * any later version.
// * *
// * This program is distributed in the hope that it will be useful, * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details. * GNU General Public License for more details.
// * *
// * You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
// */ */
//
//package we.plugin.stat; package we.plugin.stat;
//
//import org.slf4j.Logger; import org.slf4j.Logger;
//import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
//import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
//import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
//import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
//import we.config.AggregateRedisConfig; import we.config.AggregateRedisConfig;
//import we.config.SchedConfig; import we.config.SchedConfig;
//import we.util.Consts; import we.util.Consts;
//import we.util.DateTimeUtils; import we.util.DateTimeUtils;
//import we.util.StringUtils; import we.util.StringUtils;
//
//import javax.annotation.Resource; import javax.annotation.Resource;
//import java.util.Map; import java.util.Map;
//
///** /**
// * @author hongqiaowei * @author hongqiaowei
// */ */
//
//@Configuration @Configuration
//public class AccessStatSchedConfig extends SchedConfig { public class AccessStatSchedConfig extends SchedConfig {
//
// private static final Logger LOGGER = LoggerFactory.getLogger(AccessStatSchedConfig.class); private static final Logger LOGGER = LoggerFactory.getLogger(AccessStatSchedConfig.class);
//
// private static final Logger STAT_LOGGER = LoggerFactory.getLogger("stat"); private static final Logger STAT_LOGGER = LoggerFactory.getLogger("stat");
//
// @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
// private ReactiveStringRedisTemplate rt; private ReactiveStringRedisTemplate rt;
//
// @Resource @Resource
// private StatPluginFilterProperties statPluginFilterProperties; private StatPluginFilterProperties statPluginFilterProperties;
//
// @Resource @Resource
// private StatPluginFilter statPluginFilter; private StatPluginFilter statPluginFilter;
//
// @Scheduled(cron = "${fizz-access-stat-sched.cron:2/10 * * * * ?}") @Scheduled(cron = "${fizz-access-stat-sched.cron:2/10 * * * * ?}")
// public void sched() { public void sched() {
// long prevTimeWinStart = DateTimeUtils.get10sTimeWinStart(2); long prevTimeWinStart = DateTimeUtils.get10sTimeWinStart(2);
// Map<String, AccessStat> accessStatMap = statPluginFilter.getAccessStat(prevTimeWinStart); Map<String, AccessStat> accessStatMap = statPluginFilter.getAccessStat(prevTimeWinStart);
//
// if (accessStatMap.isEmpty()) { if (accessStatMap.isEmpty()) {
// if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("no access stat in {} window", DateTimeUtils.convert(prevTimeWinStart, Consts.DP.DP19)); LOGGER.debug("no access stat in {} window", DateTimeUtils.convert(prevTimeWinStart, Consts.DP.DP19));
// } }
// } else { } else {
// accessStatMap.forEach( accessStatMap.forEach(
// (smp, accessStat) -> { (smp, accessStat) -> {
// String msg = accessStat.toString(); String msg = accessStat.toString();
// String topic = statPluginFilterProperties.getFizzAccessStatTopic(); String topic = statPluginFilterProperties.getFizzAccessStatTopic();
// if (StringUtils.isBlank(topic)) { if (StringUtils.isBlank(topic)) {
// String channel = statPluginFilterProperties.getFizzAccessStatChannel(); String channel = statPluginFilterProperties.getFizzAccessStatChannel();
// rt.convertAndSend(channel, msg).subscribe(); rt.convertAndSend(channel, msg).subscribe();
// if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("send access stat {} which belong to {} window to channel {}", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19), channel); LOGGER.debug("send access stat {} which belong to {} window to channel {}", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19), channel);
// } }
// } else { } else {
// STAT_LOGGER.info(msg); STAT_LOGGER.info(msg);
// if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("send access stat {} which belong to {} window to topic", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19)); LOGGER.debug("send access stat {} which belong to {} window to topic", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19));
// } }
// } }
// } }
// ); );
// } }
// } }
//} }

View File

@@ -17,107 +17,107 @@
package we.plugin.stat; package we.plugin.stat;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.plugin.PluginFilter; import we.plugin.PluginFilter;
import we.plugin.auth.GatewayGroupService;
import we.util.Consts; import we.util.Consts;
import we.util.DateTimeUtils;
import we.util.ThreadContext; import we.util.ThreadContext;
import we.util.WebUtils; import we.util.WebUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
/** /**
* @author hongqiaowei * @author hongqiaowei
* @apiNote unstable.
*/ */
@Component(StatPluginFilter.STAT_PLUGIN_FILTER) @Component(StatPluginFilter.STAT_PLUGIN_FILTER)
public class StatPluginFilter extends PluginFilter { public class StatPluginFilter extends PluginFilter {
private static final Logger log = LoggerFactory.getLogger("stat"); private static final Logger LOGGER = LoggerFactory.getLogger(StatPluginFilter.class);
public static final String STAT_PLUGIN_FILTER = "statPlugin"; public static final String STAT_PLUGIN_FILTER = "statPlugin";
private static final String ip = "\"ip\":";
private static final String gatewayGroup = "\"gatewayGroup\":";
private static final String service = "\"service\":";
private static final String appid = "\"appid\":";
private static final String apiMethod = "\"apiMethod\":";
private static final String apiPath = "\"apiPath\":";
private static final String reqTime = "\"reqTime\":";
@Resource @Resource
private StatPluginFilterProperties statPluginFilterProperties; private StatPluginFilterProperties statPluginFilterProperties;
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) private Map<Long/*thread id*/,
private ReactiveStringRedisTemplate rt; Map/*LinkedHashMap*/<Long/*time win start*/,
Map<String/*service+apiMethod+apiPath*/, AccessStat>
@Resource >
private GatewayGroupService gatewayGroupService; >
threadTimeWinAccessStatMap = new HashMap<>();
@Override @Override
public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config, String fixedConfig) { public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config, String fixedConfig) {
if (statPluginFilterProperties.isStatOpen()) { if (statPluginFilterProperties.isStatOpen()) {
StringBuilder b = ThreadContext.getStringBuilder(); long tid = Thread.currentThread().getId();
b.append(Consts.S.LEFT_BRACE); Map<Long, Map<String, AccessStat>> timeWinAccessStatMap = threadTimeWinAccessStatMap.get(tid);
b.append(ip); toJsonStringValue(b, WebUtils.getOriginIp(exchange)); b.append(Consts.S.COMMA); if (timeWinAccessStatMap == null) {
b.append(gatewayGroup); toJsonStringValue(b, currentGatewayGroups()); b.append(Consts.S.COMMA); timeWinAccessStatMap = new LinkedHashMap<Long, Map<String, AccessStat>>(4, 1) {
b.append(service); toJsonStringValue(b, WebUtils.getClientService(exchange)); b.append(Consts.S.COMMA); @Override
protected boolean removeEldestEntry(Map.Entry eldest) {
String appId = WebUtils.getAppId(exchange); return size() > 2;
if (appId != null) { }
b.append(appid); toJsonStringValue(b, appId); b.append(Consts.S.COMMA); };
threadTimeWinAccessStatMap.put(tid, timeWinAccessStatMap);
} }
b.append(apiMethod); toJsonStringValue(b, exchange.getRequest().getMethodValue()); b.append(Consts.S.COMMA); long currentTimeWinStart = DateTimeUtils.get10sTimeWinStart(1);
b.append(apiPath); toJsonStringValue(b, WebUtils.getClientReqPath(exchange)); b.append(Consts.S.COMMA); Map<String, AccessStat> accessStatMap = timeWinAccessStatMap.computeIfAbsent(currentTimeWinStart, k -> new HashMap<>(128));
b.append(reqTime) .append(System.currentTimeMillis());
b.append(Consts.S.RIGHT_BRACE);
if (StringUtils.isBlank(statPluginFilterProperties.getFizzAccessStatTopic())) { String service = WebUtils.getClientService(exchange);
rt.convertAndSend(statPluginFilterProperties.getFizzAccessStatChannel(), b.toString()).subscribe(); String method = exchange.getRequest().getMethodValue();
} else { String path = WebUtils.getClientReqPath(exchange);
// log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use String key = ThreadContext.getStringBuilder().append(service).append(method).append(path).toString();
log.info(b.toString()); AccessStat accessStat = accessStatMap.get(key);
if (accessStat == null) {
accessStat = new AccessStat();
accessStat.service = service;
accessStat.apiMethod = method;
accessStat.apiPath = path;
accessStatMap.put(key, accessStat);
}
accessStat.reqTime = System.currentTimeMillis();
accessStat.reqs++;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("update access stat: {}, which request at {}", accessStat, DateTimeUtils.convert(accessStat.reqTime, Consts.DP.DP19));
} }
} }
return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, STAT_PLUGIN_FILTER, null); return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, STAT_PLUGIN_FILTER, null);
} }
private String currentGatewayGroups() { public Map<String, AccessStat> getAccessStat(long timeWinStart) {
int sz = gatewayGroupService.currentGatewayGroupSet.size(); Map<String, AccessStat> result = ThreadContext.getHashMap();
if (sz == 1) { threadTimeWinAccessStatMap.forEach(
return gatewayGroupService.currentGatewayGroupSet.iterator().next(); (t, timeWinAccessStatMap) -> {
} Map<String, AccessStat> accessStatMap = timeWinAccessStatMap.get(timeWinStart);
StringBuilder b = ThreadContext.getStringBuilder(ThreadContext.sb0); if (accessStatMap != null) {
byte i = 0; accessStatMap.forEach(
for (String g : gatewayGroupService.currentGatewayGroupSet) { (smp, accessStat) -> {
b.append(g); AccessStat as = result.get(smp);
i++; if (as == null) {
if (i < sz) { accessStat.start = timeWinStart;
b.append(Consts.S.COMMA); result.put(smp, accessStat);
} else {
as.reqs = as.reqs + accessStat.reqs;
if (accessStat.reqTime > as.reqTime) {
as.reqTime = accessStat.reqTime;
} }
} }
return b.toString();
} }
);
private static void toJsonStringValue(StringBuilder b, String value) { }
b.append(Consts.S.DOUBLE_QUOTE).append(value).append(Consts.S.DOUBLE_QUOTE); }
);
return result;
} }
} }

View File

@@ -37,7 +37,7 @@ public class StatPluginFilterProperties {
@Value("${stat.open:false}") @Value("${stat.open:false}")
private boolean statOpen = false; private boolean statOpen = false;
@Value("${stat.channel:fizz_access_stat}") @Value("${stat.channel:fizz_access_stat_new}")
private String fizzAccessStatChannel; private String fizzAccessStatChannel;
@Value("${stat.topic:}") @Value("${stat.topic:}")

View File

@@ -10,7 +10,7 @@
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version> <reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version> <lettuce.version>5.3.7.RELEASE</lettuce.version>
<nacos.cloud.version>2.2.7.RELEASE</nacos.cloud.version> <nacos.cloud.version>2.2.7.RELEASE</nacos.cloud.version>
<netty.version>4.1.78.Final</netty.version> <netty.version>4.1.79.Final</netty.version>
<httpcore.version>4.4.15</httpcore.version> <httpcore.version>4.4.15</httpcore.version>
<log4j2.version>2.17.2</log4j2.version> <log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version> <slf4j.version>1.7.36</slf4j.version>