Integrate Log4j2 Kafka appender

hongqiaowei
2022-06-06 17:13:58 +08:00
parent f2c9bd1243
commit bf91f6429f
53 changed files with 842 additions and 597 deletions

View File

@@ -20,7 +20,7 @@
<spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version>
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version>
<netty.version>4.1.77.Final</netty.version>
<netty.version>4.1.78.Final</netty.version>
<httpcore.version>4.4.15</httpcore.version>
<log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version>
@@ -34,7 +34,7 @@
<commons-codec.version>1.15</commons-codec.version>
<commons-pool2.version>2.11.1</commons-pool2.version>
<gson.version>2.8.9</gson.version>
<netty-tcnative.version>2.0.52.Final</netty-tcnative.version>
<netty-tcnative.version>2.0.53.Final</netty-tcnative.version>
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
<snakeyaml.version>1.30</snakeyaml.version>
</properties>
@@ -80,6 +80,18 @@
<artifactId>netty-tcnative-classes</artifactId>
<version>${netty-tcnative.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-layout-template-json</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
<profiles>

View File

@@ -85,6 +85,7 @@ import org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWeb
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import we.config.AggregateRedisConfig;
import we.log.LogSendAppender;
import we.util.FileUtils;
/**
* fizz gateway application boot entrance
@@ -186,8 +187,13 @@ public class FizzBootstrapApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(FizzBootstrapApplication.class);
public static void main(String[] args) {
System.setProperty("log4j2.contextSelector", "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
System.setProperty("log4j2.formatMsgNoLookups", "true");
System.setProperty("log4j2.contextSelector", "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
System.setProperty("log4j2.formatMsgNoLookups", "true");
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
String appRootDir = FileUtils.getAppRootDir();
System.setProperty("APP_ROOT_DIR", appRootDir);
LOGGER.info("app root dir: {}", appRootDir);
SpringApplication springApplication = new SpringApplication(FizzBootstrapApplication.class);
springApplication.setApplicationContextClass(CustomReactiveWebServerApplicationContext.class);
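The two added system properties matter for the new logging pipeline: log4j2.isThreadContextMapInheritable=true makes threads spawned later inherit the ThreadContext map (where the trace ID now lives), and APP_ROOT_DIR backs the ${sys:APP_ROOT_DIR} property that log4j2.xml uses for its log directory. A minimal sketch of the inheritance behaviour (logger name and trace value are illustrative only):

// Assumes log4j2.isThreadContextMapInheritable=true is set before any logging happens.
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceInheritanceSketch {

    private static final Logger LOGGER = LoggerFactory.getLogger(TraceInheritanceSketch.class);

    public static void main(String[] args) {
        ThreadContext.put("traceId", "req-1");
        LOGGER.info("on main thread");                          // event carries traceId=req-1
        // A thread created from here inherits a copy of the context map,
        // so events logged on it carry the same traceId.
        new Thread(() -> LOGGER.info("on child thread")).start();
    }
}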

View File

@@ -0,0 +1,38 @@
{
"logTime": {
"$resolver": "timestamp",
"epoch": {
"unit": "millis",
"rounded": true
}
},
"logLevel": {
"$resolver": "level",
"field": "name"
},
"logMsg": {
"$resolver": "message",
"stringified": true
},
"thread": {
"$resolver": "thread",
"field": "name"
},
"loggerName": {
"$resolver": "logger",
"field": "name"
},
"thrown": {
"message": {
"$resolver": "exception",
"field": "message"
},
"extendedStackTrace": {
"$resolver": "exception",
"field": "stackTrace",
"stackTrace": {
"stringified": true
}
}
}
}
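For reference, this template resolves each event into a flat JSON object; together with the traceId EventTemplateAdditionalField configured on the layout in log4j2.xml, a Kafka record would look roughly like the comment in this sketch (values are illustrative, not taken from a real run):

// Hypothetical rendering of one event through JsonTemplateLayout with log4j2-kafka.json.
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaJsonLayoutSketch {

    private static final Logger LOGGER = LoggerFactory.getLogger("we.demo");

    public static void main(String[] args) {
        ThreadContext.put("traceId", "abc123");
        LOGGER.info("order accepted");
        // Expected shape of the serialized event (logTime is epoch millis; the thrown
        // block is only filled when an exception is attached to the event):
        // {"logTime":1654506838000,"logLevel":"INFO","logMsg":"order accepted",
        //  "thread":"main","loggerName":"we.demo","traceId":"abc123"}
    }
}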

View File

@@ -2,24 +2,76 @@
<Configuration status="warn">
<properties>
<property name="APP_NAME">${sys:APP_NAME}</property>
<property name="APP_NAME">fizz-bootstrap</property>
<property name="LOG_DIR">${sys:APP_ROOT_DIR}/log</property>
<!--<property name="KAFKA_SERVER">1.1.1.1:9092</property>-->
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<!--<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level [%-29t] %30c{1}.%41M:%4L %m %ex%n"/>-->
<!--<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %30c{1}.%41M:%4L %X{traceId} %m{nolookups} %ex%n"/>-->
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %30c{1}.%41M:%4L %m{nolookups} %ex%n"/>
</Console>
<LogSend name="LogSend">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg{nolookups}%n"/>
</LogSend>
<!--<RollingRandomAccessFile name="RollingFile" fileName="${LOG_DIR}/${APP_NAME}.log" filePattern="${LOG_DIR}/$${date:yyyy-MM-dd}/${APP_NAME}-%d{HH}-%i.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %t %-5level [%c{1}.%M:%L] %msg{nolookups}%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="10MB"/>
</Policies>
<DefaultRolloverStrategy max="50"/>
</RollingRandomAccessFile>-->
<!--<Kafka name="KafkaAppender4biz" topic="log-zt-fizz-bootstrap" syncSend="false">
<JsonTemplateLayout eventTemplateUri="classpath:log4j2-kafka.json">
<EventTemplateAdditionalField key="traceId" value="$${ctx:traceId}"/>
</JsonTemplateLayout>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka>
<Kafka name="KafkaAppender4monitor" topic="log-zt-fizz-bootstrap-monitor" syncSend="false">
<PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka>
<Kafka name="KafkaAppender4stat" topic="log-zt-fizz-bootstrap-stat" syncSend="false">
<PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka>
<Kafka name="KafkaAppender4flow" topic="log-zt-fizz-bootstrap-flow" syncSend="false">
<PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka>
<Kafka name="KafkaAppender4callback" topic="log-zt-fizz-bootstrap-callback" syncSend="false">
<PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">${KAFKA_SERVER}</Property>
</Kafka>-->
</Appenders>
<Loggers>
<Root level="warn" includeLocation="true">
<Root level="warn" includeLocation="false">
<AppenderRef ref="Console"/>
<AppenderRef ref="LogSend"/>
<!--<AppenderRef ref="RollingFile"/>-->
<!--<AppenderRef ref="KafkaAppender4biz"/>-->
</Root>
<Logger name="org.apache.kafka" level="info" includeLocation="false"/>
<!-- suppress the warn 'No URLs will be polled as dynamic configuration sources.' -->
<logger name="com.netflix.config.sources.URLConfigurationSource" level="ERROR" includeLocation="true"/>
<Logger name="we" level="info" includeLocation="true"/>
<logger name="com.netflix.config.sources.URLConfigurationSource" level="ERROR" includeLocation="false"/>
<Logger name="we" level="info" includeLocation="false"/>
<Logger name="monitor" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
<!--<AppenderRef ref="KafkaAppender4monitor"/>-->
</Logger>
<Logger name="stat" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
<!--<AppenderRef ref="KafkaAppender4stat"/>-->
</Logger>
<Logger name="flow" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
<!--<AppenderRef ref="KafkaAppender4flow"/>-->
</Logger>
<Logger name="callback" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
<!--<AppenderRef ref="KafkaAppender4callback"/>-->
</Logger>
</Loggers>
</Configuration>
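The named loggers at the bottom (monitor, stat, flow, callback) are the routing points for the Kafka topics: code looks them up by name, and this configuration decides where the records go, i.e. Console for now and the matching KafkaAppender4* once KAFKA_SERVER and the appender refs are uncommented. FlowStatSchedConfig and CallbackFilter below use exactly this pattern; a minimal sketch:

// Sketch only: the logger names must match the <Logger name="..."> entries above.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NamedLoggerSketch {

    private static final Logger FLOW_LOGGER     = LoggerFactory.getLogger("flow");
    private static final Logger CALLBACK_LOGGER = LoggerFactory.getLogger("callback");

    public void report(String flowStatJson, String callbackJson) {
        FLOW_LOGGER.info(flowStatJson);       // -> KafkaAppender4flow when enabled
        CALLBACK_LOGGER.info(callbackJson);   // -> KafkaAppender4callback when enabled
    }
}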

View File

@@ -0,0 +1,2 @@
log4j2.asyncQueueFullPolicy=Discard
log4j2.discardThreshold=ERROR
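These settings apply to the async loggers enabled via AsyncLoggerContextSelector: when the async queue fills up, lower-priority events (up to the ERROR threshold) are discarded rather than blocking the calling thread, which together with syncSend="false" on the Kafka appenders keeps logging off the request path.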

View File

@@ -1,17 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<properties>
<property name="APP_NAME">fizz-gateway</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %X{traceId} %msg%n" />
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="warn">
<AppenderRef ref="Console" />
<AppenderRef ref="Console"/>
</Root>
<Logger name="we" level="DEBUG"/>
</Loggers>

View File

@@ -17,6 +17,16 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-layout-template-json</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>

View File

@@ -17,24 +17,29 @@
package we.flume.clients.log4j2appender;
import org.apache.logging.log4j.ThreadContext;
import we.constants.CommonConstants;
import we.util.Consts;
public enum LogService {
BIZ_ID, HANDLE_STGY, APP;
public static void cleanBizId() {
setBizId(null);
// setBizId(null);
ThreadContext.remove(Consts.TRACE_ID);
}
public static Object getBizId() {
return ThreadContext.get(Constants.BIZ_ID);
// return ThreadContext.get(Constants.BIZ_ID);
return ThreadContext.get(Consts.TRACE_ID);
}
public static void setBizId(Object bizId) {
ThreadContext.set(Constants.BIZ_ID, bizId);
// ThreadContext.set(Constants.BIZ_ID, bizId);
if (bizId != null) {
org.apache.logging.log4j.ThreadContext.put(CommonConstants.TRACE_ID, String.valueOf(bizId));
// org.apache.logging.log4j.ThreadContext.put(CommonConstants.TRACE_ID, String.valueOf(bizId));
ThreadContext.put(Consts.TRACE_ID, String.valueOf(bizId));
}
}
@@ -45,7 +50,7 @@ public enum LogService {
public static String toESaKF(String topic) {
return Constants.AND + topic;
}
public static class Constants {
static final String BIZ_ID = "bizId";
static final char AND = '&';
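With this change the trace ID travels through the standard log4j2 ThreadContext under the key Consts.TRACE_ID (now literally "traceId") instead of the old bizId map, which is what lets the ${ctx:traceId} lookup in the Kafka JSON layout resolve it. A short usage sketch; the id value is illustrative:

// Sketch of the new flow based on the diff above.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import we.flume.clients.log4j2appender.LogService;

public class TraceIdFlowSketch {

    private static final Logger log = LoggerFactory.getLogger(TraceIdFlowSketch.class);

    public void handle() {
        LogService.setBizId("req-42");   // puts traceId=req-42 into the log4j2 ThreadContext
        log.info("handling request");    // the event carries traceId=req-42 via the context map
        LogService.cleanBizId();         // removes traceId when the request completes
    }
}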

View File

@@ -1,90 +1,90 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.flume.clients.log4j2appender;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
/** for internal use */
public abstract class ThreadContext {
private static ThreadLocal<Map<String, Object>> tl = new ThreadLocal<>();
private static final int mapCap = 32;
private static final String sb = "sb";
private static final int sbCap = 256;
public static StringBuilder getStringBuilder() {
return getStringBuilder(true);
}
public static StringBuilder getStringBuilder(boolean clean) {
Map<String, Object> m = getMap();
StringBuilder b = (StringBuilder) m.get(sb);
if (b == null) {
b = new StringBuilder(sbCap);
m.put(sb, b);
} else {
if (clean) {
b.delete(0, b.length());
}
}
return b;
}
public static SimpleDateFormat getSimpleDateFormat(String pattern) {
Map<String, Object> m = getMap();
SimpleDateFormat sdf = (SimpleDateFormat) m.get(pattern);
if (sdf == null) {
sdf = new SimpleDateFormat(pattern);
m.put(pattern, sdf);
}
return sdf;
}
public static Object get(String key, Class<?> clz) {
Object obj = get(key);
if (obj == null) {
try {
obj = clz.newInstance();
set(key, obj);
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return obj;
}
private static Map<String, Object> getMap() {
Map<String, Object> m = tl.get();
if (m == null) {
m = new HashMap<>(mapCap);
tl.set(m);
}
return m;
}
public static Object get(String key) {
return getMap().get(key);
}
public static void set(String key, Object obj) {
getMap().put(key, obj);
}
}
///*
// * Copyright (C) 2020 the original author or authors.
// *
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or
// * any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details.
// *
// * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <https://www.gnu.org/licenses/>.
// */
//
//package we.flume.clients.log4j2appender;
//
//import java.text.SimpleDateFormat;
//import java.util.HashMap;
//import java.util.Map;
//
///** for internal use */
//public abstract class ThreadContext {
//
// private static ThreadLocal<Map<String, Object>> tl = new ThreadLocal<>();
// private static final int mapCap = 32;
//
// private static final String sb = "sb";
// private static final int sbCap = 256;
//
// public static StringBuilder getStringBuilder() {
// return getStringBuilder(true);
// }
//
// public static StringBuilder getStringBuilder(boolean clean) {
// Map<String, Object> m = getMap();
// StringBuilder b = (StringBuilder) m.get(sb);
// if (b == null) {
// b = new StringBuilder(sbCap);
// m.put(sb, b);
// } else {
// if (clean) {
// b.delete(0, b.length());
// }
// }
// return b;
// }
//
// public static SimpleDateFormat getSimpleDateFormat(String pattern) {
// Map<String, Object> m = getMap();
// SimpleDateFormat sdf = (SimpleDateFormat) m.get(pattern);
// if (sdf == null) {
// sdf = new SimpleDateFormat(pattern);
// m.put(pattern, sdf);
// }
// return sdf;
// }
//
// public static Object get(String key, Class<?> clz) {
// Object obj = get(key);
// if (obj == null) {
// try {
// obj = clz.newInstance();
// set(key, obj);
// } catch (InstantiationException | IllegalAccessException e) {
// throw new RuntimeException(e);
// }
// }
// return obj;
// }
//
// private static Map<String, Object> getMap() {
// Map<String, Object> m = tl.get();
// if (m == null) {
// m = new HashMap<>(mapCap);
// tl.set(m);
// }
// return m;
// }
//
// public static Object get(String key) {
// return getMap().get(key);
// }
//
// public static void set(String key, Object obj) {
// getMap().put(key, obj);
// }
//}

View File

@@ -107,8 +107,8 @@ public final class Consts {
public static final int GB = 1024 * MB;
}
public static final String HTTP_SERVER = "http_server";
public static final String HTTP_CLIENT = "http_client";
public static final String HTTP_SERVER = "httpServer";
public static final String HTTP_CLIENT = "httpClient";
public static final String MYSQL = "mysql";
public static final String REDIS = "redis";
public static final String CODIS = "codis";
@@ -118,5 +118,5 @@ public final class Consts {
public static final String SCHED = "sched";
public static final String R2DBC = "r2dbc";
public static final String TRACE_ID = "id^";
public static final String TRACE_ID = "traceId";
}

View File

@@ -17,18 +17,11 @@
package we.util;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.lang.Nullable;
import we.flume.clients.log4j2appender.LogService;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
@@ -37,8 +30,6 @@ import java.nio.charset.StandardCharsets;
public abstract class NettyDataBufferUtils extends org.springframework.core.io.buffer.DataBufferUtils {
private static final Logger log = LoggerFactory.getLogger(NettyDataBufferUtils.class);
private static NettyDataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
public static final DataBuffer EMPTY_DATA_BUFFER = from(new byte[0]);
@@ -54,14 +45,6 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b
return (NettyDataBuffer) dataBufferFactory.wrap(bytes);
}
/*public static NettyDataBuffer from(ByteBuffer byteBuffer) {
return dataBufferFactory.wrap(byteBuffer);
}
public static NettyDataBuffer from(ByteBuf byteBuf) {
return dataBufferFactory.wrap(byteBuf);
}*/
public static byte[] copyBytes(DataBuffer dataBuffer) {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
@@ -71,25 +54,4 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b
public static DataBuffer copy2heap(DataBuffer dataBuffer) {
return from(copyBytes(dataBuffer));
}
/*public static boolean release(@Nullable String traceId, @Nullable DataBuffer dataBuffer) {
if (dataBuffer instanceof PooledDataBuffer) {
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
if (pooledDataBuffer.isAllocated()) {
if (pooledDataBuffer instanceof NettyDataBuffer) {
NettyDataBuffer ndb = (NettyDataBuffer) pooledDataBuffer;
ByteBuf nativeBuffer = ndb.getNativeBuffer();
int refCnt = nativeBuffer.refCnt();
if (refCnt < 1) {
if (log.isDebugEnabled()) {
log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, traceId);
}
return false;
}
}
return pooledDataBuffer.release();
}
}
return false;
}*/
}

View File

@@ -22,19 +22,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
import we.stats.TimeWindowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Consts;
import we.util.DateTimeUtils;
import we.util.NetworkUtils;
import we.util.ResourceIdUtils;
import we.util.ThreadContext;
import we.util.*;
import javax.annotation.Resource;
import java.math.BigDecimal;
@@ -48,7 +42,8 @@ import java.util.List;
//@EnableScheduling
public class FlowStatSchedConfig extends SchedConfig {
private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
private static final Logger FLOW_LOGGER = LoggerFactory.getLogger("flow");
private static final String _ip = "\"ip\":";
private static final String _id = "\"id\":";
@@ -239,13 +234,16 @@ public class FlowStatSchedConfig extends SchedConfig {
b.append(Consts.S.RIGHT_BRACE);
String msg = b.toString();
if ("kafka".equals(flowStatSchedConfigProperties.getDest())) { // for internal use
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue()));
// log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue()));
FLOW_LOGGER.info(msg);
} else {
rt.convertAndSend(flowStatSchedConfigProperties.getQueue(), msg).subscribe();
}
if (log.isDebugEnabled()) {
String wt = 'w' + toDP19(timeWin);
log.debug("report " + wt + ": " + msg, LogService.BIZ_ID, wt);
// log.debug("report " + wt + ": " + msg, LogService.BIZ_ID, wt);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, wt);
log.debug("report " + wt + ": " + msg);
}
}
}

View File

@@ -1,104 +1,104 @@
package we.config;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.context.event.ApplicationStartingEvent;
import org.springframework.boot.context.event.SpringApplicationEvent;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import we.log.LogProperties;
import we.log.LogSendAppenderWithLogback;
@Configuration
public class LogConfig {
@Bean
@ConfigurationProperties("fizz.logging")
public LogProperties logProperties() {
return new LogProperties();
}
@Configuration
@ConditionalOnClass(AbstractAppender.class)
@AutoConfigureAfter(AggregateRedisConfig.class)
public static class CustomLog4j2Config {
}
@Configuration
@ConditionalOnClass(LoggerContext.class)
@AutoConfigureAfter(AggregateRedisConfig.class)
public static class CustomLogbackConfig {
@Bean
public Object initLogSendAppenderWithLogback(LogProperties logProperties) {
return new LoggingConfigurationApplicationListener(logProperties);
}
}
public static class LoggingConfigurationApplicationListener {
private static final Logger logger = LoggerFactory.getLogger(LoggingConfigurationApplicationListener.class);
private static final String APPENDER_NAME = "fizzLogSendAppender";
private static final String LAYOUT = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n";
private LogProperties logProperties;
public LoggingConfigurationApplicationListener() {
}
public LoggingConfigurationApplicationListener(LogProperties logProperties) {
this.logProperties = logProperties;
}
@EventListener
public void contextRefreshed(SpringApplicationEvent event) {
onApplicationEvent(event);
}
@EventListener
public void applicationStarting(ApplicationStartingEvent event) {
onApplicationEvent(event);
}
@EventListener
public void applicationReady(ApplicationReadyEvent event) {
onApplicationEvent(event);
}
public void onApplicationEvent(ApplicationEvent event) {
LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
final ch.qos.logback.classic.Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
String layoutConfig = StringUtils.isBlank(logProperties.getLayout()) ? LAYOUT : logProperties.getLayout();
final LogSendAppenderWithLogback newAppender = new LogSendAppenderWithLogback();
newAppender.setName(APPENDER_NAME);
newAppender.setContext(context);
PatternLayout layout = new PatternLayout();
layout.setPattern(layoutConfig);
newAppender.setLayout(layout);
Appender<ILoggingEvent> appender = root.getAppender(APPENDER_NAME);
if (appender == null) {
newAppender.start();
root.addAppender(newAppender);
logger.info("Added fizz log send appender:{}", APPENDER_NAME);
} else {
newAppender.start();
root.detachAppender(APPENDER_NAME);
root.addAppender(newAppender);
logger.info("Refresh fizz log send appender:{}", APPENDER_NAME);
}
}
}
}
//package we.config;
//
//import ch.qos.logback.classic.LoggerContext;
//import ch.qos.logback.classic.PatternLayout;
//import ch.qos.logback.classic.spi.ILoggingEvent;
//import ch.qos.logback.core.Appender;
//import org.apache.commons.lang3.StringUtils;
//import org.apache.logging.log4j.core.appender.AbstractAppender;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.boot.autoconfigure.AutoConfigureAfter;
//import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
//import org.springframework.boot.context.event.ApplicationReadyEvent;
//import org.springframework.boot.context.event.ApplicationStartingEvent;
//import org.springframework.boot.context.event.SpringApplicationEvent;
//import org.springframework.boot.context.properties.ConfigurationProperties;
//import org.springframework.context.ApplicationEvent;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.context.event.EventListener;
//import we.log.LogProperties;
//import we.log.LogSendAppenderWithLogback;
//
//@Configuration
//public class LogConfig {
//
// @Bean
// @ConfigurationProperties("fizz.logging")
// public LogProperties logProperties() {
// return new LogProperties();
// }
//
// @Configuration
// @ConditionalOnClass(AbstractAppender.class)
// @AutoConfigureAfter(AggregateRedisConfig.class)
// public static class CustomLog4j2Config {
// }
//
// @Configuration
// @ConditionalOnClass(LoggerContext.class)
// @AutoConfigureAfter(AggregateRedisConfig.class)
// public static class CustomLogbackConfig {
// @Bean
// public Object initLogSendAppenderWithLogback(LogProperties logProperties) {
// return new LoggingConfigurationApplicationListener(logProperties);
// }
// }
//
// public static class LoggingConfigurationApplicationListener {
// private static final Logger logger = LoggerFactory.getLogger(LoggingConfigurationApplicationListener.class);
// private static final String APPENDER_NAME = "fizzLogSendAppender";
// private static final String LAYOUT = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n";
// private LogProperties logProperties;
//
// public LoggingConfigurationApplicationListener() {
// }
//
// public LoggingConfigurationApplicationListener(LogProperties logProperties) {
// this.logProperties = logProperties;
// }
//
// @EventListener
// public void contextRefreshed(SpringApplicationEvent event) {
// onApplicationEvent(event);
// }
//
// @EventListener
// public void applicationStarting(ApplicationStartingEvent event) {
// onApplicationEvent(event);
// }
//
// @EventListener
// public void applicationReady(ApplicationReadyEvent event) {
// onApplicationEvent(event);
// }
//
// public void onApplicationEvent(ApplicationEvent event) {
// LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
// final ch.qos.logback.classic.Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
// String layoutConfig = StringUtils.isBlank(logProperties.getLayout()) ? LAYOUT : logProperties.getLayout();
//
// final LogSendAppenderWithLogback newAppender = new LogSendAppenderWithLogback();
// newAppender.setName(APPENDER_NAME);
// newAppender.setContext(context);
// PatternLayout layout = new PatternLayout();
// layout.setPattern(layoutConfig);
// newAppender.setLayout(layout);
//
// Appender<ILoggingEvent> appender = root.getAppender(APPENDER_NAME);
// if (appender == null) {
// newAppender.start();
// root.addAppender(newAppender);
// logger.info("Added fizz log send appender:{}", APPENDER_NAME);
// } else {
// newAppender.start();
// root.detachAppender(APPENDER_NAME);
// root.addAppender(newAppender);
// logger.info("Refresh fizz log send appender:{}", APPENDER_NAME);
// }
// }
// }
//
//
//}

View File

@@ -27,7 +27,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.CallbackReplayReq;
import we.proxy.CallbackService;
import we.util.Consts;
@@ -54,7 +53,9 @@ public class CallbackController {
public Mono<String> callback(ServerWebExchange exchange, @RequestBody CallbackReplayReq req) {
if (log.isDebugEnabled()) {
log.debug(JacksonUtils.writeValueAsString(req), LogService.BIZ_ID, req.id);
// log.debug(JacksonUtils.writeValueAsString(req), LogService.BIZ_ID, req.id);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id);
log.debug(JacksonUtils.writeValueAsString(req));
}
return
@@ -69,13 +70,16 @@ public class CallbackController {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(req.id).append(' ').append(req.service).append(' ').append(req.path).append(' ');
ServerHttpResponse resp = exchange.getResponse();
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id);
if (r.code == Result.SUCC) {
log.info(b.append("replay success").toString(), LogService.BIZ_ID, req.id);
// log.info(b.append("replay success").toString(), LogService.BIZ_ID, req.id);
log.info(b.append("replay success").toString());
resp.setStatusCode(HttpStatus.OK);
return Consts.S.EMPTY;
} else {
b.append("replay error:\n").append(r);
log.error(b.toString(), LogService.BIZ_ID, req.id);
// log.error(b.toString(), LogService.BIZ_ID, req.id);
log.error(b.toString());
resp.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
if (r.msg != null) {
return r.msg;

View File

@@ -33,12 +33,8 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.FizzMangerConfig;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.FizzWebClient;
import we.util.DateTimeUtils;
import we.util.Result;
import we.util.ThreadContext;
import we.util.WebUtils;
import we.util.*;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@@ -100,7 +96,9 @@ public class DedicatedLineController {
boolean equals = DedicatedLineUtils.checkSign(dedicatedLineId, timestamp, pairCodeSecretKey, sign);
if (!equals) {
String traceId = WebUtils.getTraceId(exchange);
log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign, LogService.BIZ_ID, traceId);
// log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign, LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign);
return Result.fail("request sign invalid");
}
return Result.succ();
@@ -128,7 +126,9 @@ public class DedicatedLineController {
if (log.isDebugEnabled()) {
StringBuilder sb = ThreadContext.getStringBuilder();
WebUtils.response2stringBuilder(traceId, remoteResp, sb);
log.debug(sb.toString(), LogService.BIZ_ID, traceId);
// log.debug(sb.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(sb.toString());
}
return response.writeWith ( remoteResp.body(BodyExtractors.toDataBuffers()) )
.doOnError ( throwable -> cleanup(remoteResp) )
@@ -163,7 +163,9 @@ public class DedicatedLineController {
if (log.isDebugEnabled()) {
StringBuilder sb = ThreadContext.getStringBuilder();
WebUtils.response2stringBuilder(traceId, remoteResp, sb);
log.debug(sb.toString(), LogService.BIZ_ID, traceId);
// log.debug(sb.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(sb.toString());
}
return response.writeWith ( remoteResp.body(BodyExtractors.toDataBuffers()) )
.doOnError ( throwable -> cleanup(remoteResp) )

View File

@@ -17,16 +17,10 @@
package we.filter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Resource;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
@@ -41,10 +35,6 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -55,12 +45,20 @@ import we.fizz.AggregateResult;
import we.fizz.ConfigLoader;
import we.fizz.Pipeline;
import we.fizz.input.Input;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.util.Consts;
import we.util.MapUtil;
import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* @author Francis Dong
*/
@@ -140,7 +138,8 @@ public class AggregateFilter implements WebFilter {
// traceId
final String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
LOGGER.debug("{} matched api in aggregation: {}", traceId, path);
@@ -184,7 +183,8 @@ public class AggregateFilter implements WebFilter {
}
}
return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> {
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
if (aggResult.getHttpStatus() != null) {
serverHttpResponse.setRawStatusCode(aggResult.getHttpStatus());
}

View File

@@ -35,7 +35,6 @@ import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.CallbackConfig;
import we.plugin.auth.GatewayGroupService;
@@ -63,6 +62,7 @@ import java.util.List;
public class CallbackFilter extends FizzWebFilter {
private static final Logger log = LoggerFactory.getLogger(CallbackFilter.class);
private static final Logger CALLBACK_LOGGER = LoggerFactory.getLogger("callback");
public static final String CALLBACK_FILTER = "callbackFilter";
@@ -226,7 +226,8 @@ public class CallbackFilter extends FizzWebFilter {
b.append(Consts.S.RIGHT_BRACE);
String msg = b.toString();
if ("kafka".equals(callbackFilterProperties.getDest())) { // for internal use
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(callbackFilterProperties.getQueue()));
// log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(callbackFilterProperties.getQueue()));
CALLBACK_LOGGER.info(msg);
} else {
rt.convertAndSend(callbackFilterProperties.getQueue(), msg).subscribe();
}

View File

@@ -37,7 +37,6 @@ import we.exception.ExecuteScriptException;
import we.exception.RedirectException;
import we.exception.StopAndResponseException;
import we.fizz.exception.FizzRuntimeException;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.util.Consts;
import we.util.JacksonUtils;
@@ -45,7 +44,6 @@ import we.util.ThreadContext;
import we.util.WebUtils;
import java.net.URI;
import java.util.concurrent.TimeoutException;
/**
* @author hongqiaowei
@@ -113,7 +111,9 @@ public class FilterExceptionHandlerConfig {
if (t instanceof FizzRuntimeException) {
FizzRuntimeException ex = (FizzRuntimeException) t;
log.error(traceId + ' ' + tMsg, LogService.BIZ_ID, traceId, ex);
// log.error(traceId + ' ' + tMsg, LogService.BIZ_ID, traceId, ex);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(traceId + ' ' + tMsg, ex);
respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
if (ex.getStepContext() != null && ex.getStepContext().returnContext()) {
@@ -130,7 +130,9 @@ public class FilterExceptionHandlerConfig {
if (fc == null) { // t came from flow control filter
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
log.error(b.toString(), LogService.BIZ_ID, traceId, t);
// log.error(b.toString(), LogService.BIZ_ID, traceId, t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString(), t);
String s = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), tMsg, traceId);
respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes())));

View File

@@ -24,9 +24,8 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.ThreadContext;
import we.util.WebUtils;
@@ -55,7 +54,9 @@ public class FizzLogFilter implements WebFilter {
WebUtils.request2stringBuilder(exchange, b);
b.append(resp).append(exchange.getResponse().getStatusCode())
.append(in) .append(System.currentTimeMillis() - start);
log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange));
// log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange));
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
log.info(b.toString());
}
}
);

View File

@@ -32,7 +32,6 @@ import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.monitor.FizzMonitorService;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AppService;
@@ -134,7 +133,8 @@ public class FlowControlFilter extends FizzWebFilter {
if (flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq && !fizzApiReq) {
String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
if (!apiConfigService.serviceConfigMap.containsKey(service)) {
String json = WebUtils.jsonRespBody(HttpStatus.FORBIDDEN.value(), "no service " + service + " in flow config", traceId);
return WebUtils.responseJson(exchange, HttpStatus.FORBIDDEN, null, json);
@@ -155,7 +155,8 @@ public class FlowControlFilter extends FizzWebFilter {
String blockedResourceId = result.getBlockedResourceId();
if (BlockType.CIRCUIT_BREAK == result.getBlockType()) {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.CIRCUIT_BREAK_ALARM, null, currentTimeMillis);
log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
// log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId);
String responseContentType = flowControlFilterProperties.getDegradeDefaultResponseContentType();
String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent();
@@ -186,10 +187,12 @@ public class FlowControlFilter extends FizzWebFilter {
} else {
if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, concurrents, currentTimeMillis);
log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
// log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId);
} else {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, qps, currentTimeMillis);
log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
// log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId);
}
ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.NODE_RESOURCE);

View File

@@ -34,7 +34,6 @@ import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.proxy.FizzWebClient;
import we.proxy.Route;
@@ -78,11 +77,14 @@ public class RouteFilter extends FizzWebFilter {
if (resp == null) { // should not reach here
ServerHttpRequest clientReq = exchange.getRequest();
String traceId = WebUtils.getTraceId(exchange);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
String msg = traceId + ' ' + pfr.id + " fail";
if (pfr.cause == null) {
log.error(msg, LogService.BIZ_ID, traceId);
// log.error(msg, LogService.BIZ_ID, traceId);
log.error(msg);
} else {
log.error(msg, LogService.BIZ_ID, traceId, pfr.cause);
// log.error(msg, LogService.BIZ_ID, traceId, pfr.cause);
log.error(msg, pfr.cause);
}
HttpStatus s = HttpStatus.INTERNAL_SERVER_ERROR;
if (!SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) {
@@ -202,7 +204,9 @@ public class RouteFilter extends FizzWebFilter {
StringBuilder b = ThreadContext.getStringBuilder();
String traceId = WebUtils.getTraceId(exchange);
WebUtils.response2stringBuilder(traceId, remoteResp, b);
log.debug(b.toString(), LogService.BIZ_ID, traceId);
// log.debug(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(b.toString());
}
return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers()))
.doOnError(throwable -> cleanup(remoteResp)).doOnCancel(() -> cleanup(remoteResp));
@@ -265,7 +269,9 @@ public class RouteFilter extends FizzWebFilter {
if (ls[0] != null) {
b.append('\n').append(ls[0]);
}
log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t);
// log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
log.error(b.toString(), t);
}
)
;

View File

@@ -18,6 +18,7 @@
package we.fizz;
import com.alibaba.fastjson.JSON;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
@@ -30,9 +31,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.fizz.input.Input;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.MapUtil;
import we.util.Utils;
import we.util.WebUtils;
@@ -72,8 +72,8 @@ public class AggregateService {
Pipeline pipeline = aggregateResource.getPipeline();
Input input = aggregateResource.getInput();
Map<String, Object> hs = MapUtil.toHashMap(headers);
// String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug("matched aggregation api: {}", pash);
Map<String, Object> clientInput = new HashMap<>();
clientInput.put("path", pash);
@@ -102,7 +102,8 @@ public class AggregateService {
public Mono<? extends Void> genAggregateResponse(ServerWebExchange exchange, AggregateResult ar) {
ServerHttpResponse clientResp = exchange.getResponse();
String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
String js = null;
if(ar.getBody() instanceof String) {
js = (String) ar.getBody();

View File

@@ -19,33 +19,29 @@ package we.fizz;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AppConfigProperties;
import we.fizz.input.*;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import we.flume.clients.log4j2appender.LogService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AppConfigProperties;
import we.fizz.input.ClientInputConfig;
import we.fizz.input.Input;
import we.fizz.input.InputFactory;
import we.fizz.input.InputType;
import we.util.Consts;
import we.util.ReactorUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import static we.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE;
import static we.util.Consts.S.FORWARD_SLASH;
import static we.util.Consts.S.FORWARD_SLASH_STR;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -54,6 +50,10 @@ import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static we.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE;
import static we.util.Consts.S.FORWARD_SLASH;
import static we.util.Consts.S.FORWARD_SLASH_STR;
/**
*
* @author Francis Dong
@@ -245,7 +245,10 @@ public class ConfigLoader {
return Flux.just(entry);
}
String configStr = (String) entry.getValue();
LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr, LogService.BIZ_ID, k.toString());
// LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr, LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr);
try {
this.addConfig(configStr, aggregateResourcesTmp, resourceKey2ConfigInfoMapTmp, aggregateId2ResourceKeyMapTmp);

View File

@@ -17,28 +17,17 @@
package we.fizz;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javax.script.ScriptException;
import com.alibaba.fastjson.JSON;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.util.Pair;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.multipart.FilePart;
import we.schema.util.I18nUtils;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.constants.CommonConstants;
@@ -52,13 +41,10 @@ import we.fizz.component.StepContextPosition;
import we.fizz.exception.FizzRuntimeException;
import we.fizz.field.FieldConfig;
import we.fizz.field.ValueTypeEnum;
import we.fizz.input.ClientInputConfig;
import we.fizz.input.Input;
import we.fizz.input.InputConfig;
import we.fizz.input.PathMapping;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.fizz.input.*;
import we.schema.util.I18nUtils;
import we.schema.util.PropertiesSupportUtils;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.JsonSchemaUtils;
import we.util.MapUtil;
@@ -66,6 +52,9 @@ import we.xml.JsonToXml;
import we.xml.XmlToJson;
import we.xml.XmlToJson.Builder;
import javax.script.ScriptException;
import java.util.*;
/**
*
* @author linwaiwai
@@ -197,7 +186,8 @@ public class Pipeline {
AggregateResult aggResult = this.doInputDataMapping(input, null);
this.stepContext.addElapsedTime(input.getName()+"聚合接口响应结果数据转换", System.currentTimeMillis() - t3);
if(this.stepContext.isDebug() || LOGGER.isDebugEnabled()) {
LogService.setBizId(this.stepContext.getTraceId());
// LogService.setBizId(this.stepContext.getTraceId());
ThreadContext.put(Consts.TRACE_ID, this.stepContext.getTraceId());
String jsonString = JSON.toJSONString(aggResult);
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("aggResult {}", jsonString);

View File

@@ -16,22 +16,22 @@
*/
package we.fizz.input;
import java.util.HashMap;
import java.util.Map;
import javax.script.ScriptException;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.util.HashMap;
import java.util.Map;
/**
*
* @author linwaiwai
@@ -79,7 +79,8 @@ public class RPCInput extends Input {
Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class);
return needRun != null ? needRun : Boolean.TRUE;
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e);
throw new ExecuteScriptException(e, stepContext, condition);
}

View File

@@ -17,18 +17,12 @@
package we.fizz.input.extension.dubbo;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import javax.script.ScriptException;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import we.FizzAppContext;
@@ -37,18 +31,17 @@ import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.exception.FizzRuntimeException;
import we.fizz.input.InputConfig;
import we.fizz.input.InputContext;
import we.fizz.input.InputType;
import we.fizz.input.PathMapping;
import we.fizz.input.RPCInput;
import we.fizz.input.RPCResponse;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.fizz.input.*;
import we.proxy.dubbo.ApacheDubboGenericService;
import we.proxy.dubbo.DubboInterfaceDeclaration;
import we.util.Consts;
import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
*
* @author linwaiwai
@@ -159,7 +152,8 @@ public class DubboInput extends RPCInput {
body.putAll((Map<String, Object>) reqBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -180,7 +174,8 @@ public class DubboInput extends RPCInput {
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", this.getApiName(), ex);
inputContext.getStepContext().addElapsedTime(this.getApiName() + " failed ", elapsedMillis);
}
@@ -223,7 +218,8 @@ public class DubboInput extends RPCInput {
body.putAll((Map<String, Object>) respBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}

View File

@@ -18,11 +18,10 @@
package we.fizz.input.extension.grpc;
import com.alibaba.fastjson.JSON;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import we.FizzAppContext;
@@ -32,18 +31,17 @@ import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.exception.FizzRuntimeException;
import we.fizz.input.*;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.grpc.GrpcGenericService;
import we.proxy.grpc.GrpcInstanceService;
import we.proxy.grpc.GrpcInterfaceDeclaration;
import we.util.Consts;
import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import javax.script.ScriptException;
/**
*
* @author linwaiwai
@@ -153,7 +151,8 @@ public class GrpcInput extends RPCInput implements IInput {
body.putAll((Map<String, Object>) reqBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -174,7 +173,8 @@ public class GrpcInput extends RPCInput implements IInput {
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", this.getApiName(), ex);
inputContext.getStepContext().addElapsedTime(this.getApiName() + " failed ", elapsedMillis);
}
@@ -218,7 +218,8 @@ public class GrpcInput extends RPCInput implements IInput {
body.putAll((Map<String, Object>) respBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg),
e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);

View File

@@ -17,16 +17,9 @@
package we.fizz.input.extension.request;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.script.ScriptException;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,24 +31,13 @@ import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
import com.alibaba.fastjson.JSON;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.StepResponse;
import we.fizz.input.IInput;
import we.fizz.input.InputConfig;
import we.fizz.input.InputContext;
import we.fizz.input.InputType;
import we.fizz.input.PathMapping;
import we.fizz.input.RPCInput;
import we.fizz.input.RPCResponse;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.fizz.input.*;
import we.proxy.FizzWebClient;
import we.proxy.http.HttpInstanceService;
import we.service_registry.RegistryCenterService;
@@ -67,6 +49,14 @@ import we.xml.JsonToXml;
import we.xml.XmlToJson;
import we.xml.XmlToJson.Builder;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
* @author linwaiwai
@@ -189,7 +179,8 @@ public class RequestInput extends RPCInput implements IInput{
body.putAll((Map<String, Object>) reqBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -321,7 +312,8 @@ public class RequestInput extends RPCInput implements IInput{
body.putAll((Map<String, Object>) respBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -473,7 +465,8 @@ public class RequestInput extends RPCInput implements IInput{
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", request.get("url"), ex);
inputContext.getStepContext().addElapsedTime(
stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis);
@@ -533,7 +526,8 @@ public class RequestInput extends RPCInput implements IInput{
protected void doOnBodySuccess(Object resp, long elapsedMillis) {
if(inputContext.getStepContext().isDebug()) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"),
JSON.toJSONString(this.request.get("headers")),
JSON.toJSONString(this.request.get("body")), resp);

View File

@@ -16,6 +16,7 @@
*/
package we.log;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.*;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
@@ -25,7 +26,7 @@ import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import we.FizzAppContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.NetworkUtils;
import java.io.Serializable;
@@ -90,7 +91,7 @@ public class LogSendAppenderWithLog4j2 extends AbstractAppender {
}
private String getBizId(Object[] parameters) {
Object bizId = LogService.getBizId();
/*Object bizId = LogService.getBizId();
if (parameters != null) {
for (int i = parameters.length - 1; i > -1; --i) {
Object p = parameters[i];
@@ -105,7 +106,13 @@ public class LogSendAppenderWithLog4j2 extends AbstractAppender {
if (bizId == null) {
return "";
}
return bizId.toString();
return bizId.toString();*/
String traceId = ThreadContext.get(Consts.TRACE_ID);
if (traceId == null) {
return Consts.S.EMPTY;
}
return traceId;
}
@PluginFactory

View File

@@ -1,94 +1,94 @@
package we.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.LogbackException;
import lombok.Getter;
import lombok.Setter;
import we.FizzAppContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.NetworkUtils;
import static we.log.LogSendAppender.*;
/**
* log send appender with logback
*
* @author huahuang
*/
public class LogSendAppenderWithLogback extends AppenderBase<ILoggingEvent> {
// Responsible for converting the log event to a string; getter and setter methods are required
@Getter
@Setter
private Layout<ILoggingEvent> layout;
@Override
protected void append(ILoggingEvent event) {
try {
if (logEnabled != null && !logEnabled) {
return;
}
if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) {
// local cache
logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend(
this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), event.getLevel().levelInt,
event.getTimeStamp(), this.getLayout().doLayout(event));
return;
}
if (logEnabled == null && logSendService == null) {
// no legal logSendService, discard the local cache
logEnabled = Boolean.FALSE;
logSends = null;
return;
}
if (logEnabled == null) {
logEnabled = Boolean.TRUE;
LogSend[] logSends;
synchronized (LogSendAppender.class) {
logSends = LogSendAppender.logSends;
LogSendAppender.logSends = null;
}
// logSendService is ready, send the local cache
if (logSends != null) {
int size = Math.min(logSendIndex.get(), logSends.length);
for (int i = 0; i < size; i++) {
logSendService.send(logSends[i]);
}
}
}
LogSend logSend = new LogSend(this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(),
event.getLevel().levelInt, event.getTimeStamp(), this.getLayout().doLayout(event));
logSendService.send(logSend);
} catch (Exception ex) {
throw new LogbackException(event.getFormattedMessage(), ex);
}
}
private String getBizId(Object[] parameters) {
Object bizId = LogService.getBizId();
if (parameters != null) {
for (int i = parameters.length - 1; i > -1; --i) {
Object p = parameters[i];
if (p == LogService.BIZ_ID) {
if (i != parameters.length - 1) {
bizId = parameters[i + 1];
}
break;
}
}
}
if (bizId == null) {
return "";
}
return bizId.toString();
}
}
//package we.log;
//
//import ch.qos.logback.classic.spi.ILoggingEvent;
//import ch.qos.logback.core.AppenderBase;
//import ch.qos.logback.core.Layout;
//import ch.qos.logback.core.LogbackException;
//import lombok.Getter;
//import lombok.Setter;
//import we.FizzAppContext;
//import we.flume.clients.log4j2appender.LogService;
//import we.util.NetworkUtils;
//
//import static we.log.LogSendAppender.*;
//
///**
// * log send appender with logback
// *
// * @author huahuang
// */
//public class LogSendAppenderWithLogback extends AppenderBase<ILoggingEvent> {
//
// // Responsible for converting the log event to a string; getter and setter methods are required
// @Getter
// @Setter
// private Layout<ILoggingEvent> layout;
//
// @Override
// protected void append(ILoggingEvent event) {
// try {
// if (logEnabled != null && !logEnabled) {
// return;
// }
//
// if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) {
// // local cache
// logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend(
// this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), event.getLevel().levelInt,
// event.getTimeStamp(), this.getLayout().doLayout(event));
// return;
// }
//
// if (logEnabled == null && logSendService == null) {
// // no legal logSendService, discard the local cache
// logEnabled = Boolean.FALSE;
// logSends = null;
// return;
// }
//
// if (logEnabled == null) {
// logEnabled = Boolean.TRUE;
//
// LogSend[] logSends;
// synchronized (LogSendAppender.class) {
// logSends = LogSendAppender.logSends;
// LogSendAppender.logSends = null;
// }
//
// // logSendService is ready, send the local cache
// if (logSends != null) {
// int size = Math.min(logSendIndex.get(), logSends.length);
// for (int i = 0; i < size; i++) {
// logSendService.send(logSends[i]);
// }
// }
// }
//
// LogSend logSend = new LogSend(this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(),
// event.getLevel().levelInt, event.getTimeStamp(), this.getLayout().doLayout(event));
// logSendService.send(logSend);
// } catch (Exception ex) {
// throw new LogbackException(event.getFormattedMessage(), ex);
// }
// }
//
// private String getBizId(Object[] parameters) {
// Object bizId = LogService.getBizId();
// if (parameters != null) {
// for (int i = parameters.length - 1; i > -1; --i) {
// Object p = parameters[i];
// if (p == LogService.BIZ_ID) {
// if (i != parameters.length - 1) {
// bizId = parameters[i + 1];
// }
// break;
// }
// }
// }
// if (bizId == null) {
// return "";
// }
// return bizId.toString();
// }
//
//}

View File

@@ -23,7 +23,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.ThreadContext;
@@ -36,7 +35,7 @@ import javax.annotation.Resource;
@Service
public class FizzMonitorService {
private static final Logger LOGGER = LoggerFactory.getLogger(FizzMonitorService.class);
private static final Logger LOGGER = LoggerFactory.getLogger("monitor");
public static final byte ERROR_ALARM = 1;
public static final byte TIMEOUT_ALARM = 2;
@@ -76,8 +75,9 @@ public class FizzMonitorService {
b.append(_timestamp) .append(timestamp);
b.append(Consts.S.RIGHT_BRACE);
String msg = b.toString();
if (Consts.KAFKA.equals(dest)) { // for internal use
LOGGER.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
if (Consts.KAFKA.equals(dest)) {
// LOGGER.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
LOGGER.info(msg);
} else {
rt.convertAndSend(queue, msg).subscribe();
}
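
FizzMonitorService now emits alarms through a logger literally named "monitor" instead of tagging records with LogService.HANDLE_STGY for the old flume appender; the log4j2 configuration can then route that named logger to its own Kafka appender, as the commented-out KafkaAppender4monitor in log4j2-test.xml further down shows. A small sketch of that convention; the logger name and the plain %m payload are taken from this diff, everything else is illustrative:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitorAlarmSketch {

    // Named logger: log4j2 can attach a dedicated appender to it via
    // <Logger name="monitor" additivity="false"> without touching the
    // class-based loggers used for ordinary application logs.
    private static final Logger MONITOR = LoggerFactory.getLogger("monitor");

    void alarm(String jsonPayload) {
        // The service builds the alarm into a single JSON line and logs it as-is,
        // so a plain PatternLayout of "%m" is enough on the Kafka side.
        MONITOR.info(jsonPayload);
    }
}
```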

View File

@@ -17,16 +17,15 @@
package we.plugin;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.filter.FilterResult;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.util.Consts;
import we.util.WebUtils;
import java.util.Map;
@@ -53,8 +52,10 @@ public abstract class PluginFilter implements FizzPluginFilter {
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config, String fixedConfig) {
FilterResult pfr = WebUtils.getPrevFilterResult(exchange);
String traceId = WebUtils.getTraceId(exchange);
ThreadContext.put(Consts.TRACE_ID, traceId);
if (log.isDebugEnabled()) {
log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, traceId);
// log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, traceId);
log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"));
}
if (pfr.success) {
return doFilter(exchange, config, fixedConfig);
@@ -62,9 +63,9 @@ public abstract class PluginFilter implements FizzPluginFilter {
if (WebUtils.getDirectResponse(exchange) == null) { // should not reach here
String msg = traceId + ' ' + pfr.id + " fail";
if (pfr.cause == null) {
log.error(msg, LogService.BIZ_ID, traceId);
log.error(msg);
} else {
log.error(msg, LogService.BIZ_ID, traceId, pfr.cause);
log.error(msg, pfr.cause);
}
HttpStatus s = HttpStatus.OK;
if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) {

View File

@@ -23,7 +23,6 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
@@ -149,7 +148,9 @@ public class ApiConfig2appsService {
.doOnNext(
msg -> {
String json = msg.getMessage();
log.info("apiConfig2apps: " + json, LogService.BIZ_ID, "ac2as" + System.currentTimeMillis());
// log.info("apiConfig2apps: " + json, LogService.BIZ_ID, "ac2as" + System.currentTimeMillis());
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, "ac2as" + System.currentTimeMillis());
log.info("apiConfig2apps: " + json);
try {
ApiConfig2apps data = JacksonUtils.readValue(json, ApiConfig2apps.class);
updateApiConfig2appsMap(data);

View File

@@ -35,7 +35,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.util.*;
@@ -110,7 +109,11 @@ public class ApiConfigService implements ApplicationListener<ContextRefreshedEve
return Flux.just(e);
}
String json = (String) e.getValue();
log.info("init api config: {}", json, LogService.BIZ_ID, k.toString());
// log.info("init api config: {}", json, LogService.BIZ_ID, k.toString());
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, k.toString());
log.info("init api config: {}", json);
try {
ApiConfig ac = JacksonUtils.readValue(json, ApiConfig.class);
apiConfigMapTmp.put(ac.id, ac);
@@ -156,7 +159,9 @@ public class ApiConfigService implements ApplicationListener<ContextRefreshedEve
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info("api config change: {}", json, LogService.BIZ_ID, "acc" + System.currentTimeMillis());
// log.info("api config change: {}", json, LogService.BIZ_ID, "acc" + System.currentTimeMillis());
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, "acc" + System.currentTimeMillis());
log.info("api config change: {}", json);
try {
ApiConfig ac = JacksonUtils.readValue(json, ApiConfig.class);
ApiConfig r = apiConfigMap.remove(ac.id);
@@ -430,7 +435,8 @@ public class ApiConfigService implements ApplicationListener<ContextRefreshedEve
public Mono<Result<ApiConfig>> auth(ServerWebExchange exchange) {
ServerHttpRequest req = exchange.getRequest();
LogService.setBizId(WebUtils.getTraceId(exchange));
// LogService.setBizId(WebUtils.getTraceId(exchange));
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
boolean dedicatedLineRequest = WebUtils.isDedicatedLineRequest(exchange);
return auth(exchange, dedicatedLineRequest,
WebUtils.getAppId(exchange), WebUtils.getOriginIp(exchange), WebUtils.getTimestamp(exchange), WebUtils.getSign(exchange),

View File

@@ -17,6 +17,7 @@
package we.plugin.auth;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@@ -24,7 +25,7 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
@@ -80,7 +81,9 @@ public class AppService {
return Flux.just(e);
}
String json = (String) e.getValue();
log.info("init app: {}", json, LogService.BIZ_ID, k.toString());
// log.info("init app: {}", json, LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
log.info("init app: {}", json);
try {
App app = JacksonUtils.readValue(json, App.class);
oldAppMapTmp.put(app.id, app);
@@ -125,7 +128,9 @@ public class AppService {
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info("app change: " + json, LogService.BIZ_ID, "ac" + System.currentTimeMillis());
// log.info("app change: " + json, LogService.BIZ_ID, "ac" + System.currentTimeMillis());
ThreadContext.put(Consts.TRACE_ID, "ac" + System.currentTimeMillis());
log.info("app change: " + json);
try {
App app = JacksonUtils.readValue(json, App.class);
App r = oldAppMap.remove(app.id);

View File

@@ -17,13 +17,14 @@
package we.plugin.auth;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.PluginFilter;
import we.util.Consts;
import we.util.WebUtils;
import javax.annotation.Resource;
@@ -53,7 +54,9 @@ public class AuthPluginFilter extends PluginFilter {
r -> {
if (log.isDebugEnabled()) {
String traceId = WebUtils.getTraceId(exchange);
log.debug("{} req auth: {}", traceId, r, LogService.BIZ_ID, traceId);
// log.debug("{} req auth: {}", traceId, r, LogService.BIZ_ID, traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug("{} req auth: {}", traceId, r);
}
Map<String, Object> data = Collections.singletonMap(RESULT, r);
return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, AUTH_PLUGIN_FILTER, data);

View File

@@ -17,6 +17,7 @@
package we.plugin.auth;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
@@ -25,7 +26,7 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.NetworkUtils;
import we.util.ReactorUtils;
@@ -87,7 +88,9 @@ public class GatewayGroupService {
return Flux.just(e);
}
String json = (String) e.getValue();
log.info(json, LogService.BIZ_ID, k.toString());
// log.info(json, LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
log.info(json);
try {
GatewayGroup gg = JacksonUtils.readValue(json, GatewayGroup.class);
oldGatewayGroupMapTmp.put(gg.id, gg);
@@ -133,7 +136,9 @@ public class GatewayGroupService {
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info(json, LogService.BIZ_ID, "gg" + System.currentTimeMillis());
// log.info(json, LogService.BIZ_ID, "gg" + System.currentTimeMillis());
ThreadContext.put(Consts.TRACE_ID, "gg" + System.currentTimeMillis());
log.info(json);
try {
GatewayGroup gg = JacksonUtils.readValue(json, GatewayGroup.class);
GatewayGroup r = oldGatewayGroupMap.remove(gg.id);

View File

@@ -17,19 +17,19 @@
package we.plugin.requestbody;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
import we.spring.web.server.ext.FizzServerWebExchangeDecorator;
import we.util.Consts;
import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
@@ -76,7 +76,9 @@ public class RequestBodyPlugin implements FizzPluginFilter {
}
if (log.isDebugEnabled()) {
String traceId = WebUtils.getTraceId(exchange);
log.debug("{} request is decorated", traceId, LogService.BIZ_ID, traceId);
// log.debug("{} request is decorated", traceId, LogService.BIZ_ID, traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug("{} request is decorated", traceId);
}
return doFilter(newExchange, config);
}

View File

@@ -25,7 +25,6 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.PluginFilter;
import we.plugin.auth.GatewayGroupService;
import we.util.Consts;
@@ -43,7 +42,7 @@ import java.util.Map;
@Component(StatPluginFilter.STAT_PLUGIN_FILTER)
public class StatPluginFilter extends PluginFilter {
private static final Logger log = LoggerFactory.getLogger(StatPluginFilter.class);
private static final Logger log = LoggerFactory.getLogger("stat");
public static final String STAT_PLUGIN_FILTER = "statPlugin";
@@ -93,7 +92,8 @@ public class StatPluginFilter extends PluginFilter {
if (StringUtils.isBlank(statPluginFilterProperties.getFizzAccessStatTopic())) {
rt.convertAndSend(statPluginFilterProperties.getFizzAccessStatChannel(), b.toString()).subscribe();
} else {
log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use
// log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use
log.info(b.toString());
}
}

View File

@@ -34,7 +34,6 @@ import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.fizz.AggregateResult;
import we.fizz.AggregateService;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.CallbackConfig;
@@ -85,7 +84,9 @@ public class CallbackService {
String traceId = WebUtils.getTraceId(exchange);
HttpMethod method = req.getMethod();
if (log.isDebugEnabled()) {
log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap), LogService.BIZ_ID, traceId);
// log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap));
}
int rs = cc.receivers.size();
Mono<Object>[] sends = new Mono[rs];
@@ -157,7 +158,9 @@ public class CallbackService {
b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR);
String traceId = WebUtils.getTraceId(exchange);
WebUtils.request2stringBuilder(traceId, method, r.service + Consts.S.FORWARD_SLASH + r.path, headers, body, b);
log.error(b.toString(), LogService.BIZ_ID, traceId, t);
// log.error(b.toString(), LogService.BIZ_ID, traceId, t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString(), t);
}
private String buildUri(ServerHttpRequest req, ServiceInstance si, String path) {
@@ -199,7 +202,9 @@ public class CallbackService {
StringBuilder b = ThreadContext.getStringBuilder();
String traceId = WebUtils.getTraceId(exchange);
WebUtils.response2stringBuilder(traceId, remoteResp, b);
log.debug(b.toString(), LogService.BIZ_ID, traceId);
// log.debug(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(b.toString());
}
return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers()))
.doOnError(throwable -> clean(remoteResp)).doOnCancel(() -> clean(remoteResp));
@@ -301,7 +306,9 @@ public class CallbackService {
b.append(req.service).append(Consts.S.FORWARD_SLASH).append(req.path);
b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR);
WebUtils.request2stringBuilder(req.id, req.method, service + Consts.S.FORWARD_SLASH + path, req.headers, req.body, b);
log.error(b.toString(), LogService.BIZ_ID, req.id, t);
// log.error(b.toString(), LogService.BIZ_ID, req.id, t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id);
log.error(b.toString(), t);
}
private void clean(ClientResponse cr) {

View File

@@ -37,7 +37,6 @@ import we.config.ProxyWebClientConfig;
import we.config.SystemConfig;
import we.exception.ExternalService4xxException;
import we.fizz.exception.FizzRuntimeException;
import we.flume.clients.log4j2appender.LogService;
import we.service_registry.RegistryCenterService;
import we.util.Consts;
import we.util.ThreadContext;
@@ -215,7 +214,9 @@ public class FizzWebClient {
if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(traceId, method, uri, headers, null, b);
log.debug(b.toString(), LogService.BIZ_ID, traceId);
// log.debug(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(b.toString());
}
WebClient.RequestBodyUriSpec requestBodyUriSpec = webClient.method(method);

View File

@@ -16,6 +16,7 @@
*/
package we.proxy;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@@ -24,19 +25,13 @@ import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -102,7 +97,9 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
return Flux.just(e);
}
Object v = e.getValue();
LOGGER.info(k.toString() + Consts.S.COLON + v.toString(), LogService.BIZ_ID, k.toString());
// LOGGER.info(k.toString() + Consts.S.COLON + v.toString(), LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
LOGGER.info(k.toString() + Consts.S.COLON + v.toString());
String json = (String) v;
try {
RpcService rpcService = JacksonUtils.readValue(json, RpcService.class);
@@ -194,7 +191,9 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
}
).doOnNext(msg -> {
String json = msg.getMessage();
LOGGER.info(json, LogService.BIZ_ID, "rpc" + System.currentTimeMillis());
// LOGGER.info(json, LogService.BIZ_ID, "rpc" + System.currentTimeMillis());
ThreadContext.put(Consts.TRACE_ID, "rpc" + System.currentTimeMillis());
LOGGER.info(json);
try {
RpcService rpcService = JacksonUtils.readValue(json, RpcService.class);
this.updateLocalCache(rpcService, serviceToInstancesMap, serviceToLoadBalanceTypeMap, idToRpcServiceMap,

View File

@@ -17,6 +17,7 @@
package we.stats.circuitbreaker;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@@ -25,12 +26,8 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.util.JacksonUtils;
import we.util.ResourceIdUtils;
import we.util.Result;
import we.util.WebUtils;
import we.util.*;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@@ -210,12 +207,16 @@ public class CircuitBreakManager {
}
if (cb == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("no circuit breaker for {} {}", service, path, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
// LOGGER.debug("no circuit breaker for {} {}", service, path, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
LOGGER.debug("no circuit breaker for {} {}", service, path);
}
return true;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
// LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb);
}
return cb.permit(exchange, currentTimeWindow, flowStat);
}

View File

@@ -17,6 +17,7 @@
package we.stats.ratelimit;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@@ -24,10 +25,9 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
import we.util.ThreadContext;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@@ -78,7 +78,9 @@ public class ResourceRateLimitConfigService {
return Flux.just(e);
}
String json = (String) e.getValue();
log.info("rateLimitConfig: " + json, LogService.BIZ_ID, k.toString());
// log.info("rateLimitConfig: " + json, LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
log.info("rateLimitConfig: " + json);
try {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
oldResourceRateLimitConfigMapTmp.put(rrlc.id, rrlc);
@@ -122,7 +124,9 @@ public class ResourceRateLimitConfigService {
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis());
// log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis());
ThreadContext.put(Consts.TRACE_ID, "rrlc" + System.currentTimeMillis());
log.info("channel recv rate limit config: " + json);
try {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
ResourceRateLimitConfig r = oldResourceRateLimitConfigMap.remove(rrlc.id);

View File

@@ -34,7 +34,6 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.filter.FilterResult;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.AuthPluginFilter;
import we.proxy.Route;
@@ -527,10 +526,13 @@ public abstract class WebUtils {
// }
b.append(Consts.S.LINE_SEPARATOR);
b.append(filter).append(Consts.S.SPACE).append(code).append(Consts.S.SPACE).append(msg);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
if (t == null) {
log.error(b.toString(), LogService.BIZ_ID, traceId);
// log.error(b.toString(), LogService.BIZ_ID, traceId);
log.error(b.toString());
} else {
log.error(b.toString(), LogService.BIZ_ID, traceId, t);
// log.error(b.toString(), LogService.BIZ_ID, traceId, t);
log.error(b.toString(), t);
Throwable[] suppressed = t.getSuppressed();
if (suppressed != null && suppressed.length != 0) {
log.error(StringUtils.EMPTY, suppressed[0]);
@@ -846,7 +848,9 @@ public abstract class WebUtils {
request2stringBuilder(exchange, b);
b.append(Consts.S.LINE_SEPARATOR);
b.append(filter).append(Consts.S.SPACE).append(httpStatus);
log.error(b.toString(), LogService.BIZ_ID, traceId);
// log.error(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString());
transmitFailFilterResult(exchange, filter);
return buildDirectResponseAndBindContext(exchange, httpStatus, new HttpHeaders(), Consts.S.EMPTY);
}
@@ -860,7 +864,9 @@ public abstract class WebUtils {
request2stringBuilder(exchange, b);
b.append(Consts.S.LINE_SEPARATOR);
b.append(filter).append(Consts.S.SPACE).append(httpStatus);
log.error(b.toString(), LogService.BIZ_ID, traceId);
// log.error(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString());
transmitFailFilterResult(exchange, filter);
headers = headers == null ? new HttpHeaders() : headers;
content = StringUtils.isBlank(content) ? Consts.S.EMPTY : content;

View File

@@ -0,0 +1,37 @@
package we;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.config.ConfigurationSource;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
public class LogKafkaTests {
// @Test
void test() throws InterruptedException, IOException {
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
/*ConfigurationSource source = new ConfigurationSource(new FileInputStream("D:\\idea\\projects\\fizz-gateway-community\\fizz-core\\src\\test\\resources\\log4j2-test.xml"));
Configurator.initialize(null, source);
Logger logger4es = LogManager.getLogger(LogKafkaTests.class);*/
Logger logger4es = LoggerFactory.getLogger(LogKafkaTests.class);
Logger logger4kf = LoggerFactory.getLogger("monitor");
ThreadContext.put("traceId", "ti1");
// MDC.put("traceId", "ti0");
logger4es.warn("520");
logger4kf.warn("521");
Thread.currentThread().join();
}
}
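
The test above is disabled (its @Test annotation is commented out) and parks the main thread forever so the asynchronous Kafka appender has time to ship the two records. A bounded variant, sketched here on the assumption that stopping the log4j2 context at the end of the run is acceptable, is to shut logging down explicitly, which stops the appenders after draining them:

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogKafkaFlushSketch {

    public static void main(String[] args) {
        // Must be set before log4j2 initializes for child threads to inherit the map.
        System.setProperty("log4j2.isThreadContextMapInheritable", "true");

        Logger logger4kf = LoggerFactory.getLogger("monitor");
        ThreadContext.put("traceId", "ti1");
        logger4kf.warn("521");

        // Stops the logger context and its appenders; for the Kafka appender this
        // closes the producer, which should flush buffered records before exit.
        LogManager.shutdown();
    }
}
```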

View File

@@ -0,0 +1,38 @@
{
"logTime": {
"$resolver": "timestamp",
"epoch": {
"unit": "millis",
"rounded": true
}
},
"logLevel": {
"$resolver": "level",
"field": "name"
},
"logMsg": {
"$resolver": "message",
"stringified": true
},
"thread": {
"$resolver": "thread",
"field": "name"
},
"loggerName": {
"$resolver": "logger",
"field": "name"
},
"thrown": {
"message": {
"$resolver": "exception",
"field": "message"
},
"extendedStackTrace": {
"$resolver": "exception",
"field": "stackTrace",
"stackTrace": {
"stringified": true
}
}
}
}

View File

@@ -1,18 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<properties>
<property name="APP_NAME">fizz-core</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %X{traceId} %msg%n" />
</Console>
</Appenders>
<Loggers>
<Root level="warn">
<AppenderRef ref="Console" />
</Root>
<Logger name="we" level="DEBUG"/>
</Loggers>
<properties>
<property name="APP_NAME">fizz-core</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n"/>
</Console>
<!--<Kafka name="KafkaAppender4biz" topic="log-zt-fizz-core" syncSend="false">
<JsonTemplateLayout eventTemplateUri="classpath:log4j2-kafka.json">
<EventTemplateAdditionalField key="traceId" value="$${ctx:traceId}"/>
</JsonTemplateLayout>
<Property name="bootstrap.servers">1.1.1.1:9092</Property>
</Kafka>
<Kafka name="KafkaAppender4monitor" topic="log-zt-fizz-core-monitor" syncSend="false">
<PatternLayout pattern="%m"/>
<Property name="bootstrap.servers">1.1.1.1:9092</Property>
</Kafka>-->
</Appenders>
<Loggers>
<Root level="warn">
<AppenderRef ref="Console"/>
<!--<AppenderRef ref="KafkaAppender4biz"/>-->
</Root>
<Logger name="org.apache.kafka" level="info"/>
<Logger name="monitor" level="warn" additivity="false">
<AppenderRef ref="Console"/>
<!--<AppenderRef ref="KafkaAppender4monitor"/>-->
</Logger>
</Loggers>
</Configuration>

View File

@@ -0,0 +1,2 @@
log4j2.asyncQueueFullPolicy=Discard
log4j2.discardThreshold=ERROR
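
These two properties select the Discard policy for full async-logger queues: instead of blocking the logging thread, events at or below the threshold level are dropped, and the threshold is raised from the default INFO to ERROR. A sketch of the equivalent system-property form, shown only to make the semantics concrete; the properties file above is what the commit actually adds:

```java
public class AsyncQueueFullPolicySketch {

    public static void main(String[] args) {
        // When an async logger's ring buffer is full, discard events
        // rather than blocking the caller...
        System.setProperty("log4j2.asyncQueueFullPolicy", "Discard");
        // ...discarding events at or below this level (default is INFO).
        System.setProperty("log4j2.discardThreshold", "ERROR");

        // initialize log4j2 / the application after this point
    }
}
```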

View File

@@ -17,6 +17,7 @@
package we.plugin.dedicatedline.auth;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -27,9 +28,9 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.dedicated_line.DedicatedLineService;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.util.Consts;
import we.util.ReactorUtils;
import we.util.WebUtils;
@@ -77,7 +78,9 @@ public class DedicatedLineApiAuthPluginFilter implements FizzPluginFilter {
return WebUtils.response(exchange, HttpStatus.UNAUTHORIZED, null, respJson);
}
} catch (Exception e) {
log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
// log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
ThreadContext.put(Consts.TRACE_ID, traceId);
log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, e);
String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(),
HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId);
return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson);

View File

@@ -21,6 +21,7 @@ import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.symmetric.SymmetricAlgorithm;
import cn.hutool.crypto.symmetric.SymmetricCrypto;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,14 +30,12 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.dedicated_line.DedicatedLineService;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilterChain;
import we.plugin.requestbody.RequestBodyPlugin;
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
@@ -46,7 +45,6 @@ import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
@@ -70,8 +68,9 @@ public class DedicatedLineCodecPluginFilter extends RequestBodyPlugin {
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config) {
String traceId = WebUtils.getTraceId(exchange);
ThreadContext.put(Consts.TRACE_ID, traceId);
try {
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
String dedicatedLineId = WebUtils.getDedicatedLineId(exchange);
String cryptoKey = dedicatedLineService.getRequestCryptoKey(dedicatedLineId);
@@ -106,7 +105,8 @@ public class DedicatedLineCodecPluginFilter extends RequestBodyPlugin {
});
} catch (Exception e) {
log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
// log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, e);
String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(),
HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId);
return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson);

View File

@@ -18,6 +18,7 @@
package we.plugin.dedicatedline.pairing;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -28,9 +29,9 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.dedicated_line.DedicatedLineService;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.util.Consts;
import we.util.DigestUtils;
import we.util.ReactorUtils;
import we.util.WebUtils;
@@ -59,8 +60,9 @@ public class DedicatedLinePairingPluginFilter implements FizzPluginFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config) {
String traceId = WebUtils.getTraceId(exchange);
ThreadContext.put(Consts.TRACE_ID, traceId);
try {
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
String dedicatedLineId = WebUtils.getDedicatedLineId(exchange);
String secretKey = dedicatedLineService.getSignSecretKey(dedicatedLineId);
String ts = WebUtils.getDedicatedLineTimestamp(exchange);
@@ -83,7 +85,8 @@ public class DedicatedLinePairingPluginFilter implements FizzPluginFilter {
return WebUtils.response(exchange, HttpStatus.UNAUTHORIZED, null, respJson);
}
} catch (Exception e) {
log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
// log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, e);
String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(),
HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId);
return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson);

18
pom.xml
View File

@@ -10,7 +10,7 @@
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version>
<nacos.cloud.version>2.2.7.RELEASE</nacos.cloud.version>
<netty.version>4.1.77.Final</netty.version>
<netty.version>4.1.78.Final</netty.version>
<httpcore.version>4.4.15</httpcore.version>
<log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version>
@@ -22,7 +22,7 @@
<r2dbc-mysql.version>0.8.2</r2dbc-mysql.version>
<reflections.version>0.9.11</reflections.version>
<commons-pool2.version>2.11.1</commons-pool2.version>
<netty-tcnative.version>2.0.52.Final</netty-tcnative.version>
<netty-tcnative.version>2.0.53.Final</netty-tcnative.version>
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
<snakeyaml.version>1.30</snakeyaml.version>
</properties>
@@ -69,6 +69,18 @@
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-layout-template-json</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
@@ -156,7 +168,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
<version>1.2.83</version>
</dependency>
<dependency>