From bf91f6429f72132704a0d7a1cbeb5ba838477dea Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Mon, 6 Jun 2022 17:13:58 +0800 Subject: [PATCH] Integrate log4j2 kafka appender --- fizz-bootstrap/pom.xml | 16 +- .../java/we/FizzBootstrapApplication.java | 10 +- .../src/main/resources/log4j2-kafka.json | 38 ++++ .../src/main/resources/log4j2-spring.xml | 60 ++++- .../resources/log4j2.component.properties | 2 + .../src/test/resource/log4j2-test.xml | 7 +- fizz-common/pom.xml | 10 + .../clients/log4j2appender/LogService.java | 15 +- .../clients/log4j2appender/ThreadContext.java | 180 +++++++-------- fizz-common/src/main/java/we/util/Consts.java | 6 +- .../java/we/util/NettyDataBufferUtils.java | 38 ---- .../java/we/config/FlowStatSchedConfig.java | 18 +- .../src/main/java/we/config/LogConfig.java | 208 +++++++++--------- .../we/controller/CallbackController.java | 12 +- .../DedicatedLineController.java | 18 +- .../main/java/we/filter/AggregateFilter.java | 32 +-- .../main/java/we/filter/CallbackFilter.java | 5 +- .../filter/FilterExceptionHandlerConfig.java | 10 +- .../main/java/we/filter/FizzLogFilter.java | 7 +- .../java/we/filter/FlowControlFilter.java | 13 +- .../src/main/java/we/filter/RouteFilter.java | 16 +- .../main/java/we/fizz/AggregateService.java | 11 +- .../src/main/java/we/fizz/ConfigLoader.java | 33 +-- fizz-core/src/main/java/we/fizz/Pipeline.java | 36 ++- .../src/main/java/we/fizz/input/RPCInput.java | 15 +- .../input/extension/dubbo/DubboInput.java | 32 ++- .../fizz/input/extension/grpc/GrpcInput.java | 17 +- .../input/extension/request/RequestInput.java | 44 ++-- .../we/log/LogSendAppenderWithLog4j2.java | 13 +- .../we/log/LogSendAppenderWithLogback.java | 188 ++++++++-------- .../java/we/monitor/FizzMonitorService.java | 8 +- .../src/main/java/we/plugin/PluginFilter.java | 13 +- .../we/plugin/auth/ApiConfig2appsService.java | 5 +- .../java/we/plugin/auth/ApiConfigService.java | 14 +- .../main/java/we/plugin/auth/AppService.java | 11 +- .../java/we/plugin/auth/AuthPluginFilter.java | 7 +- .../we/plugin/auth/GatewayGroupService.java | 11 +- .../plugin/requestbody/RequestBodyPlugin.java | 8 +- .../java/we/plugin/stat/StatPluginFilter.java | 6 +- .../main/java/we/proxy/CallbackService.java | 17 +- .../src/main/java/we/proxy/FizzWebClient.java | 5 +- .../java/we/proxy/RpcInstanceServiceImpl.java | 17 +- .../circuitbreaker/CircuitBreakManager.java | 15 +- .../ResourceRateLimitConfigService.java | 12 +- fizz-core/src/main/java/we/util/WebUtils.java | 16 +- fizz-core/src/test/java/we/LogKafkaTests.java | 37 ++++ .../src/test/resources/log4j2-kafka.json | 38 ++++ fizz-core/src/test/resources/log4j2-test.xml | 43 ++-- .../resources/log4j2.component.properties | 2 + .../DedicatedLineApiAuthPluginFilter.java | 7 +- .../codec/DedicatedLineCodecPluginFilter.java | 10 +- .../DedicatedLinePairingPluginFilter.java | 9 +- pom.xml | 18 +- 53 files changed, 842 insertions(+), 597 deletions(-) create mode 100644 fizz-bootstrap/src/main/resources/log4j2-kafka.json create mode 100644 fizz-bootstrap/src/main/resources/log4j2.component.properties create mode 100644 fizz-core/src/test/java/we/LogKafkaTests.java create mode 100644 fizz-core/src/test/resources/log4j2-kafka.json create mode 100644 fizz-core/src/test/resources/log4j2.component.properties diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml index 21ea75c..46f1cae 100644 --- a/fizz-bootstrap/pom.xml +++ b/fizz-bootstrap/pom.xml @@ -20,7 +20,7 @@ Dragonfruit-SR3 Dysprosium-SR25 5.3.7.RELEASE - 4.1.77.Final + 4.1.78.Final 4.4.15 2.17.2 1.7.36 @@ -34,7 +34,7 @@ 1.15 2.11.1 2.8.9 - 2.0.52.Final + 2.0.53.Final 2.2.9.RELEASE 1.30 @@ -80,6 +80,18 @@ netty-tcnative-classes ${netty-tcnative.version} + + + org.apache.logging.log4j + log4j-layout-template-json + ${log4j2.version} + + + + org.apache.kafka + kafka-clients + 2.0.1 + diff --git a/fizz-bootstrap/src/main/java/we/FizzBootstrapApplication.java b/fizz-bootstrap/src/main/java/we/FizzBootstrapApplication.java index e54ab0c..74ed6e7 100644 --- a/fizz-bootstrap/src/main/java/we/FizzBootstrapApplication.java +++ b/fizz-bootstrap/src/main/java/we/FizzBootstrapApplication.java @@ -85,6 +85,7 @@ import org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWeb import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import we.config.AggregateRedisConfig; import we.log.LogSendAppender; +import we.util.FileUtils; /** * fizz gateway application boot entrance @@ -186,8 +187,13 @@ public class FizzBootstrapApplication { private static final Logger LOGGER = LoggerFactory.getLogger(FizzBootstrapApplication.class); public static void main(String[] args) { - System.setProperty("log4j2.contextSelector", "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"); - System.setProperty("log4j2.formatMsgNoLookups", "true"); + System.setProperty("log4j2.contextSelector", "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"); + System.setProperty("log4j2.formatMsgNoLookups", "true"); + System.setProperty("log4j2.isThreadContextMapInheritable", "true"); + + String appRootDir = FileUtils.getAppRootDir(); + System.setProperty("APP_ROOT_DIR", appRootDir); + LOGGER.info("app root dir: {}", appRootDir); SpringApplication springApplication = new SpringApplication(FizzBootstrapApplication.class); springApplication.setApplicationContextClass(CustomReactiveWebServerApplicationContext.class); diff --git a/fizz-bootstrap/src/main/resources/log4j2-kafka.json b/fizz-bootstrap/src/main/resources/log4j2-kafka.json new file mode 100644 index 0000000..c540015 --- /dev/null +++ b/fizz-bootstrap/src/main/resources/log4j2-kafka.json @@ -0,0 +1,38 @@ +{ + "logTime": { + "$resolver": "timestamp", + "epoch": { + "unit": "millis", + "rounded": true + } + }, + "logLevel": { + "$resolver": "level", + "field": "name" + }, + "logMsg": { + "$resolver": "message", + "stringified": true + }, + "thread": { + "$resolver": "thread", + "field": "name" + }, + "loggerName": { + "$resolver": "logger", + "field": "name" + }, + "thrown": { + "message": { + "$resolver": "exception", + "field": "message" + }, + "extendedStackTrace": { + "$resolver": "exception", + "field": "stackTrace", + "stackTrace": { + "stringified": true + } + } + } +} \ No newline at end of file diff --git a/fizz-bootstrap/src/main/resources/log4j2-spring.xml b/fizz-bootstrap/src/main/resources/log4j2-spring.xml index b948186..e904fe5 100644 --- a/fizz-bootstrap/src/main/resources/log4j2-spring.xml +++ b/fizz-bootstrap/src/main/resources/log4j2-spring.xml @@ -2,24 +2,76 @@ - ${sys:APP_NAME} + fizz-bootstrap + ${sys:APP_ROOT_DIR}/log + + + + - + + + + - - + + + + + + + + + + + + + + + + + + diff --git a/fizz-bootstrap/src/main/resources/log4j2.component.properties b/fizz-bootstrap/src/main/resources/log4j2.component.properties new file mode 100644 index 0000000..e0c714f --- /dev/null +++ b/fizz-bootstrap/src/main/resources/log4j2.component.properties @@ -0,0 +1,2 @@ +log4j2.asyncQueueFullPolicy=Discard +log4j2.discardThreshold=ERROR \ No newline at end of file diff --git a/fizz-bootstrap/src/test/resource/log4j2-test.xml b/fizz-bootstrap/src/test/resource/log4j2-test.xml index e43c021..14f26f5 100644 --- a/fizz-bootstrap/src/test/resource/log4j2-test.xml +++ b/fizz-bootstrap/src/test/resource/log4j2-test.xml @@ -1,17 +1,14 @@ - - fizz-gateway - - + - + diff --git a/fizz-common/pom.xml b/fizz-common/pom.xml index 91ad0e9..6ef21ec 100644 --- a/fizz-common/pom.xml +++ b/fizz-common/pom.xml @@ -17,6 +17,16 @@ + + org.apache.logging.log4j + log4j-layout-template-json + + + + org.apache.kafka + kafka-clients + + org.openjdk.jol jol-core diff --git a/fizz-common/src/main/java/we/flume/clients/log4j2appender/LogService.java b/fizz-common/src/main/java/we/flume/clients/log4j2appender/LogService.java index 8294458..11179b6 100644 --- a/fizz-common/src/main/java/we/flume/clients/log4j2appender/LogService.java +++ b/fizz-common/src/main/java/we/flume/clients/log4j2appender/LogService.java @@ -17,24 +17,29 @@ package we.flume.clients.log4j2appender; +import org.apache.logging.log4j.ThreadContext; import we.constants.CommonConstants; +import we.util.Consts; public enum LogService { BIZ_ID, HANDLE_STGY, APP; public static void cleanBizId() { - setBizId(null); + // setBizId(null); + ThreadContext.remove(Consts.TRACE_ID); } public static Object getBizId() { - return ThreadContext.get(Constants.BIZ_ID); + // return ThreadContext.get(Constants.BIZ_ID); + return ThreadContext.get(Consts.TRACE_ID); } public static void setBizId(Object bizId) { - ThreadContext.set(Constants.BIZ_ID, bizId); + // ThreadContext.set(Constants.BIZ_ID, bizId); if (bizId != null) { - org.apache.logging.log4j.ThreadContext.put(CommonConstants.TRACE_ID, String.valueOf(bizId)); + // org.apache.logging.log4j.ThreadContext.put(CommonConstants.TRACE_ID, String.valueOf(bizId)); + ThreadContext.put(Consts.TRACE_ID, String.valueOf(bizId)); } } @@ -45,7 +50,7 @@ public enum LogService { public static String toESaKF(String topic) { return Constants.AND + topic; } - + public static class Constants { static final String BIZ_ID = "bizId"; static final char AND = '&'; diff --git a/fizz-common/src/main/java/we/flume/clients/log4j2appender/ThreadContext.java b/fizz-common/src/main/java/we/flume/clients/log4j2appender/ThreadContext.java index f3f1bba..fbceff1 100644 --- a/fizz-common/src/main/java/we/flume/clients/log4j2appender/ThreadContext.java +++ b/fizz-common/src/main/java/we/flume/clients/log4j2appender/ThreadContext.java @@ -1,90 +1,90 @@ -/* - * Copyright (C) 2020 the original author or authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package we.flume.clients.log4j2appender; - -import java.text.SimpleDateFormat; -import java.util.HashMap; -import java.util.Map; - -/** for internal use */ -public abstract class ThreadContext { - - private static ThreadLocal> tl = new ThreadLocal<>(); - private static final int mapCap = 32; - - private static final String sb = "sb"; - private static final int sbCap = 256; - - public static StringBuilder getStringBuilder() { - return getStringBuilder(true); - } - - public static StringBuilder getStringBuilder(boolean clean) { - Map m = getMap(); - StringBuilder b = (StringBuilder) m.get(sb); - if (b == null) { - b = new StringBuilder(sbCap); - m.put(sb, b); - } else { - if (clean) { - b.delete(0, b.length()); - } - } - return b; - } - - public static SimpleDateFormat getSimpleDateFormat(String pattern) { - Map m = getMap(); - SimpleDateFormat sdf = (SimpleDateFormat) m.get(pattern); - if (sdf == null) { - sdf = new SimpleDateFormat(pattern); - m.put(pattern, sdf); - } - return sdf; - } - - public static Object get(String key, Class clz) { - Object obj = get(key); - if (obj == null) { - try { - obj = clz.newInstance(); - set(key, obj); - } catch (InstantiationException | IllegalAccessException e) { - throw new RuntimeException(e); - } - } - return obj; - } - - private static Map getMap() { - Map m = tl.get(); - if (m == null) { - m = new HashMap<>(mapCap); - tl.set(m); - } - return m; - } - - public static Object get(String key) { - return getMap().get(key); - } - - public static void set(String key, Object obj) { - getMap().put(key, obj); - } -} +///* +// * Copyright (C) 2020 the original author or authors. +// * +// * This program is free software: you can redistribute it and/or modify +// * it under the terms of the GNU General Public License as published by +// * the Free Software Foundation, either version 3 of the License, or +// * any later version. +// * +// * This program is distributed in the hope that it will be useful, +// * but WITHOUT ANY WARRANTY; without even the implied warranty of +// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// * GNU General Public License for more details. +// * +// * You should have received a copy of the GNU General Public License +// * along with this program. If not, see . +// */ +// +//package we.flume.clients.log4j2appender; +// +//import java.text.SimpleDateFormat; +//import java.util.HashMap; +//import java.util.Map; +// +///** for internal use */ +//public abstract class ThreadContext { +// +// private static ThreadLocal> tl = new ThreadLocal<>(); +// private static final int mapCap = 32; +// +// private static final String sb = "sb"; +// private static final int sbCap = 256; +// +// public static StringBuilder getStringBuilder() { +// return getStringBuilder(true); +// } +// +// public static StringBuilder getStringBuilder(boolean clean) { +// Map m = getMap(); +// StringBuilder b = (StringBuilder) m.get(sb); +// if (b == null) { +// b = new StringBuilder(sbCap); +// m.put(sb, b); +// } else { +// if (clean) { +// b.delete(0, b.length()); +// } +// } +// return b; +// } +// +// public static SimpleDateFormat getSimpleDateFormat(String pattern) { +// Map m = getMap(); +// SimpleDateFormat sdf = (SimpleDateFormat) m.get(pattern); +// if (sdf == null) { +// sdf = new SimpleDateFormat(pattern); +// m.put(pattern, sdf); +// } +// return sdf; +// } +// +// public static Object get(String key, Class clz) { +// Object obj = get(key); +// if (obj == null) { +// try { +// obj = clz.newInstance(); +// set(key, obj); +// } catch (InstantiationException | IllegalAccessException e) { +// throw new RuntimeException(e); +// } +// } +// return obj; +// } +// +// private static Map getMap() { +// Map m = tl.get(); +// if (m == null) { +// m = new HashMap<>(mapCap); +// tl.set(m); +// } +// return m; +// } +// +// public static Object get(String key) { +// return getMap().get(key); +// } +// +// public static void set(String key, Object obj) { +// getMap().put(key, obj); +// } +//} diff --git a/fizz-common/src/main/java/we/util/Consts.java b/fizz-common/src/main/java/we/util/Consts.java index 014a1b2..7a3dcec 100644 --- a/fizz-common/src/main/java/we/util/Consts.java +++ b/fizz-common/src/main/java/we/util/Consts.java @@ -107,8 +107,8 @@ public final class Consts { public static final int GB = 1024 * MB; } - public static final String HTTP_SERVER = "http_server"; - public static final String HTTP_CLIENT = "http_client"; + public static final String HTTP_SERVER = "httpServer"; + public static final String HTTP_CLIENT = "httpClient"; public static final String MYSQL = "mysql"; public static final String REDIS = "redis"; public static final String CODIS = "codis"; @@ -118,5 +118,5 @@ public final class Consts { public static final String SCHED = "sched"; public static final String R2DBC = "r2dbc"; - public static final String TRACE_ID = "id^"; + public static final String TRACE_ID = "traceId"; } diff --git a/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java b/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java index 9670c00..9b5a4b6 100644 --- a/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java +++ b/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java @@ -17,18 +17,11 @@ package we.util; -import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; -import org.springframework.core.io.buffer.PooledDataBuffer; -import org.springframework.lang.Nullable; -import we.flume.clients.log4j2appender.LogService; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; /** @@ -37,8 +30,6 @@ import java.nio.charset.StandardCharsets; public abstract class NettyDataBufferUtils extends org.springframework.core.io.buffer.DataBufferUtils { - private static final Logger log = LoggerFactory.getLogger(NettyDataBufferUtils.class); - private static NettyDataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); public static final DataBuffer EMPTY_DATA_BUFFER = from(new byte[0]); @@ -54,14 +45,6 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b return (NettyDataBuffer) dataBufferFactory.wrap(bytes); } - /*public static NettyDataBuffer from(ByteBuffer byteBuffer) { - return dataBufferFactory.wrap(byteBuffer); - } - - public static NettyDataBuffer from(ByteBuf byteBuf) { - return dataBufferFactory.wrap(byteBuf); - }*/ - public static byte[] copyBytes(DataBuffer dataBuffer) { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); @@ -71,25 +54,4 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b public static DataBuffer copy2heap(DataBuffer dataBuffer) { return from(copyBytes(dataBuffer)); } - - /*public static boolean release(@Nullable String traceId, @Nullable DataBuffer dataBuffer) { - if (dataBuffer instanceof PooledDataBuffer) { - PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer; - if (pooledDataBuffer.isAllocated()) { - if (pooledDataBuffer instanceof NettyDataBuffer) { - NettyDataBuffer ndb = (NettyDataBuffer) pooledDataBuffer; - ByteBuf nativeBuffer = ndb.getNativeBuffer(); - int refCnt = nativeBuffer.refCnt(); - if (refCnt < 1) { - if (log.isDebugEnabled()) { - log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, traceId); - } - return false; - } - } - return pooledDataBuffer.release(); - } - } - return false; - }*/ } diff --git a/fizz-core/src/main/java/we/config/FlowStatSchedConfig.java b/fizz-core/src/main/java/we/config/FlowStatSchedConfig.java index 80c9df4..ebe3bca 100644 --- a/fizz-core/src/main/java/we/config/FlowStatSchedConfig.java +++ b/fizz-core/src/main/java/we/config/FlowStatSchedConfig.java @@ -22,19 +22,13 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; -import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; -import we.flume.clients.log4j2appender.LogService; import we.stats.FlowStat; import we.stats.ResourceTimeWindowStat; import we.stats.TimeWindowStat; import we.stats.ratelimit.ResourceRateLimitConfig; import we.stats.ratelimit.ResourceRateLimitConfigService; -import we.util.Consts; -import we.util.DateTimeUtils; -import we.util.NetworkUtils; -import we.util.ResourceIdUtils; -import we.util.ThreadContext; +import we.util.*; import javax.annotation.Resource; import java.math.BigDecimal; @@ -48,7 +42,8 @@ import java.util.List; //@EnableScheduling public class FlowStatSchedConfig extends SchedConfig { - private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class); + private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class); + private static final Logger FLOW_LOGGER = LoggerFactory.getLogger("flow"); private static final String _ip = "\"ip\":"; private static final String _id = "\"id\":"; @@ -239,13 +234,16 @@ public class FlowStatSchedConfig extends SchedConfig { b.append(Consts.S.RIGHT_BRACE); String msg = b.toString(); if ("kafka".equals(flowStatSchedConfigProperties.getDest())) { // for internal use - log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue())); + // log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue())); + FLOW_LOGGER.info(msg); } else { rt.convertAndSend(flowStatSchedConfigProperties.getQueue(), msg).subscribe(); } if (log.isDebugEnabled()) { String wt = 'w' + toDP19(timeWin); - log.debug("report " + wt + ": " + msg, LogService.BIZ_ID, wt); + // log.debug("report " + wt + ": " + msg, LogService.BIZ_ID, wt); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, wt); + log.debug("report " + wt + ": " + msg); } } } diff --git a/fizz-core/src/main/java/we/config/LogConfig.java b/fizz-core/src/main/java/we/config/LogConfig.java index eeafcbb..ed11943 100644 --- a/fizz-core/src/main/java/we/config/LogConfig.java +++ b/fizz-core/src/main/java/we/config/LogConfig.java @@ -1,104 +1,104 @@ -package we.config; - -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.PatternLayout; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.Appender; -import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.core.appender.AbstractAppender; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.boot.context.event.ApplicationStartingEvent; -import org.springframework.boot.context.event.SpringApplicationEvent; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.event.EventListener; -import we.log.LogProperties; -import we.log.LogSendAppenderWithLogback; - -@Configuration -public class LogConfig { - - @Bean - @ConfigurationProperties("fizz.logging") - public LogProperties logProperties() { - return new LogProperties(); - } - - @Configuration - @ConditionalOnClass(AbstractAppender.class) - @AutoConfigureAfter(AggregateRedisConfig.class) - public static class CustomLog4j2Config { - } - - @Configuration - @ConditionalOnClass(LoggerContext.class) - @AutoConfigureAfter(AggregateRedisConfig.class) - public static class CustomLogbackConfig { - @Bean - public Object initLogSendAppenderWithLogback(LogProperties logProperties) { - return new LoggingConfigurationApplicationListener(logProperties); - } - } - - public static class LoggingConfigurationApplicationListener { - private static final Logger logger = LoggerFactory.getLogger(LoggingConfigurationApplicationListener.class); - private static final String APPENDER_NAME = "fizzLogSendAppender"; - private static final String LAYOUT = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n"; - private LogProperties logProperties; - - public LoggingConfigurationApplicationListener() { - } - - public LoggingConfigurationApplicationListener(LogProperties logProperties) { - this.logProperties = logProperties; - } - - @EventListener - public void contextRefreshed(SpringApplicationEvent event) { - onApplicationEvent(event); - } - - @EventListener - public void applicationStarting(ApplicationStartingEvent event) { - onApplicationEvent(event); - } - - @EventListener - public void applicationReady(ApplicationReadyEvent event) { - onApplicationEvent(event); - } - - public void onApplicationEvent(ApplicationEvent event) { - LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); - final ch.qos.logback.classic.Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME); - String layoutConfig = StringUtils.isBlank(logProperties.getLayout()) ? LAYOUT : logProperties.getLayout(); - - final LogSendAppenderWithLogback newAppender = new LogSendAppenderWithLogback(); - newAppender.setName(APPENDER_NAME); - newAppender.setContext(context); - PatternLayout layout = new PatternLayout(); - layout.setPattern(layoutConfig); - newAppender.setLayout(layout); - - Appender appender = root.getAppender(APPENDER_NAME); - if (appender == null) { - newAppender.start(); - root.addAppender(newAppender); - logger.info("Added fizz log send appender:{}", APPENDER_NAME); - } else { - newAppender.start(); - root.detachAppender(APPENDER_NAME); - root.addAppender(newAppender); - logger.info("Refresh fizz log send appender:{}", APPENDER_NAME); - } - } - } - - -} +//package we.config; +// +//import ch.qos.logback.classic.LoggerContext; +//import ch.qos.logback.classic.PatternLayout; +//import ch.qos.logback.classic.spi.ILoggingEvent; +//import ch.qos.logback.core.Appender; +//import org.apache.commons.lang3.StringUtils; +//import org.apache.logging.log4j.core.appender.AbstractAppender; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import org.springframework.boot.autoconfigure.AutoConfigureAfter; +//import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +//import org.springframework.boot.context.event.ApplicationReadyEvent; +//import org.springframework.boot.context.event.ApplicationStartingEvent; +//import org.springframework.boot.context.event.SpringApplicationEvent; +//import org.springframework.boot.context.properties.ConfigurationProperties; +//import org.springframework.context.ApplicationEvent; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.context.event.EventListener; +//import we.log.LogProperties; +//import we.log.LogSendAppenderWithLogback; +// +//@Configuration +//public class LogConfig { +// +// @Bean +// @ConfigurationProperties("fizz.logging") +// public LogProperties logProperties() { +// return new LogProperties(); +// } +// +// @Configuration +// @ConditionalOnClass(AbstractAppender.class) +// @AutoConfigureAfter(AggregateRedisConfig.class) +// public static class CustomLog4j2Config { +// } +// +// @Configuration +// @ConditionalOnClass(LoggerContext.class) +// @AutoConfigureAfter(AggregateRedisConfig.class) +// public static class CustomLogbackConfig { +// @Bean +// public Object initLogSendAppenderWithLogback(LogProperties logProperties) { +// return new LoggingConfigurationApplicationListener(logProperties); +// } +// } +// +// public static class LoggingConfigurationApplicationListener { +// private static final Logger logger = LoggerFactory.getLogger(LoggingConfigurationApplicationListener.class); +// private static final String APPENDER_NAME = "fizzLogSendAppender"; +// private static final String LAYOUT = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n"; +// private LogProperties logProperties; +// +// public LoggingConfigurationApplicationListener() { +// } +// +// public LoggingConfigurationApplicationListener(LogProperties logProperties) { +// this.logProperties = logProperties; +// } +// +// @EventListener +// public void contextRefreshed(SpringApplicationEvent event) { +// onApplicationEvent(event); +// } +// +// @EventListener +// public void applicationStarting(ApplicationStartingEvent event) { +// onApplicationEvent(event); +// } +// +// @EventListener +// public void applicationReady(ApplicationReadyEvent event) { +// onApplicationEvent(event); +// } +// +// public void onApplicationEvent(ApplicationEvent event) { +// LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); +// final ch.qos.logback.classic.Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME); +// String layoutConfig = StringUtils.isBlank(logProperties.getLayout()) ? LAYOUT : logProperties.getLayout(); +// +// final LogSendAppenderWithLogback newAppender = new LogSendAppenderWithLogback(); +// newAppender.setName(APPENDER_NAME); +// newAppender.setContext(context); +// PatternLayout layout = new PatternLayout(); +// layout.setPattern(layoutConfig); +// newAppender.setLayout(layout); +// +// Appender appender = root.getAppender(APPENDER_NAME); +// if (appender == null) { +// newAppender.start(); +// root.addAppender(newAppender); +// logger.info("Added fizz log send appender:{}", APPENDER_NAME); +// } else { +// newAppender.start(); +// root.detachAppender(APPENDER_NAME); +// root.addAppender(newAppender); +// logger.info("Refresh fizz log send appender:{}", APPENDER_NAME); +// } +// } +// } +// +// +//} diff --git a/fizz-core/src/main/java/we/controller/CallbackController.java b/fizz-core/src/main/java/we/controller/CallbackController.java index 5c57a87..12c1f05 100644 --- a/fizz-core/src/main/java/we/controller/CallbackController.java +++ b/fizz-core/src/main/java/we/controller/CallbackController.java @@ -27,7 +27,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; -import we.flume.clients.log4j2appender.LogService; import we.proxy.CallbackReplayReq; import we.proxy.CallbackService; import we.util.Consts; @@ -54,7 +53,9 @@ public class CallbackController { public Mono callback(ServerWebExchange exchange, @RequestBody CallbackReplayReq req) { if (log.isDebugEnabled()) { - log.debug(JacksonUtils.writeValueAsString(req), LogService.BIZ_ID, req.id); + // log.debug(JacksonUtils.writeValueAsString(req), LogService.BIZ_ID, req.id); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id); + log.debug(JacksonUtils.writeValueAsString(req)); } return @@ -69,13 +70,16 @@ public class CallbackController { StringBuilder b = ThreadContext.getStringBuilder(); b.append(req.id).append(' ').append(req.service).append(' ').append(req.path).append(' '); ServerHttpResponse resp = exchange.getResponse(); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id); if (r.code == Result.SUCC) { - log.info(b.append("replay success").toString(), LogService.BIZ_ID, req.id); + // log.info(b.append("replay success").toString(), LogService.BIZ_ID, req.id); + log.info(b.append("replay success").toString()); resp.setStatusCode(HttpStatus.OK); return Consts.S.EMPTY; } else { b.append("replay error:\n").append(r); - log.error(b.toString(), LogService.BIZ_ID, req.id); + // log.error(b.toString(), LogService.BIZ_ID, req.id); + log.error(b.toString()); resp.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); if (r.msg != null) { return r.msg; diff --git a/fizz-core/src/main/java/we/dedicated_line/DedicatedLineController.java b/fizz-core/src/main/java/we/dedicated_line/DedicatedLineController.java index 8470154..77fff23 100644 --- a/fizz-core/src/main/java/we/dedicated_line/DedicatedLineController.java +++ b/fizz-core/src/main/java/we/dedicated_line/DedicatedLineController.java @@ -33,12 +33,8 @@ import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import we.config.FizzMangerConfig; import we.config.SystemConfig; -import we.flume.clients.log4j2appender.LogService; import we.proxy.FizzWebClient; -import we.util.DateTimeUtils; -import we.util.Result; -import we.util.ThreadContext; -import we.util.WebUtils; +import we.util.*; import javax.annotation.Resource; import java.time.LocalDateTime; @@ -100,7 +96,9 @@ public class DedicatedLineController { boolean equals = DedicatedLineUtils.checkSign(dedicatedLineId, timestamp, pairCodeSecretKey, sign); if (!equals) { String traceId = WebUtils.getTraceId(exchange); - log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign, LogService.BIZ_ID, traceId); + // log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign, LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign); return Result.fail("request sign invalid"); } return Result.succ(); @@ -128,7 +126,9 @@ public class DedicatedLineController { if (log.isDebugEnabled()) { StringBuilder sb = ThreadContext.getStringBuilder(); WebUtils.response2stringBuilder(traceId, remoteResp, sb); - log.debug(sb.toString(), LogService.BIZ_ID, traceId); + // log.debug(sb.toString(), LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.debug(sb.toString()); } return response.writeWith ( remoteResp.body(BodyExtractors.toDataBuffers()) ) .doOnError ( throwable -> cleanup(remoteResp) ) @@ -163,7 +163,9 @@ public class DedicatedLineController { if (log.isDebugEnabled()) { StringBuilder sb = ThreadContext.getStringBuilder(); WebUtils.response2stringBuilder(traceId, remoteResp, sb); - log.debug(sb.toString(), LogService.BIZ_ID, traceId); + // log.debug(sb.toString(), LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.debug(sb.toString()); } return response.writeWith ( remoteResp.body(BodyExtractors.toDataBuffers()) ) .doOnError ( throwable -> cleanup(remoteResp) ) diff --git a/fizz-core/src/main/java/we/filter/AggregateFilter.java b/fizz-core/src/main/java/we/filter/AggregateFilter.java index 5e3050c..8bd8b4f 100644 --- a/fizz-core/src/main/java/we/filter/AggregateFilter.java +++ b/fizz-core/src/main/java/we/filter/AggregateFilter.java @@ -17,16 +17,10 @@ package we.filter; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import javax.annotation.Resource; - +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; @@ -41,10 +35,6 @@ import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -55,12 +45,20 @@ import we.fizz.AggregateResult; import we.fizz.ConfigLoader; import we.fizz.Pipeline; import we.fizz.input.Input; -import we.flume.clients.log4j2appender.LogService; import we.plugin.auth.ApiConfig; +import we.util.Consts; import we.util.MapUtil; import we.util.NettyDataBufferUtils; import we.util.WebUtils; +import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + /** * @author Francis Dong */ @@ -140,7 +138,8 @@ public class AggregateFilter implements WebFilter { // traceId final String traceId = WebUtils.getTraceId(exchange); - LogService.setBizId(traceId); + // LogService.setBizId(traceId); + ThreadContext.put(Consts.TRACE_ID, traceId); LOGGER.debug("{} matched api in aggregation: {}", traceId, path); @@ -184,7 +183,8 @@ public class AggregateFilter implements WebFilter { } } return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> { - LogService.setBizId(traceId); + // LogService.setBizId(traceId); + ThreadContext.put(Consts.TRACE_ID, traceId); if (aggResult.getHttpStatus() != null) { serverHttpResponse.setRawStatusCode(aggResult.getHttpStatus()); } diff --git a/fizz-core/src/main/java/we/filter/CallbackFilter.java b/fizz-core/src/main/java/we/filter/CallbackFilter.java index 8ee27da..d6adebb 100644 --- a/fizz-core/src/main/java/we/filter/CallbackFilter.java +++ b/fizz-core/src/main/java/we/filter/CallbackFilter.java @@ -35,7 +35,6 @@ import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; import we.plugin.auth.ApiConfig; import we.plugin.auth.CallbackConfig; import we.plugin.auth.GatewayGroupService; @@ -63,6 +62,7 @@ import java.util.List; public class CallbackFilter extends FizzWebFilter { private static final Logger log = LoggerFactory.getLogger(CallbackFilter.class); + private static final Logger CALLBACK_LOGGER = LoggerFactory.getLogger("callback"); public static final String CALLBACK_FILTER = "callbackFilter"; @@ -226,7 +226,8 @@ public class CallbackFilter extends FizzWebFilter { b.append(Consts.S.RIGHT_BRACE); String msg = b.toString(); if ("kafka".equals(callbackFilterProperties.getDest())) { // for internal use - log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(callbackFilterProperties.getQueue())); + // log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(callbackFilterProperties.getQueue())); + CALLBACK_LOGGER.info(msg); } else { rt.convertAndSend(callbackFilterProperties.getQueue(), msg).subscribe(); } diff --git a/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java b/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java index 4cbc672..98e25c1 100644 --- a/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java +++ b/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java @@ -37,7 +37,6 @@ import we.exception.ExecuteScriptException; import we.exception.RedirectException; import we.exception.StopAndResponseException; import we.fizz.exception.FizzRuntimeException; -import we.flume.clients.log4j2appender.LogService; import we.legacy.RespEntity; import we.util.Consts; import we.util.JacksonUtils; @@ -45,7 +44,6 @@ import we.util.ThreadContext; import we.util.WebUtils; import java.net.URI; -import java.util.concurrent.TimeoutException; /** * @author hongqiaowei @@ -113,7 +111,9 @@ public class FilterExceptionHandlerConfig { if (t instanceof FizzRuntimeException) { FizzRuntimeException ex = (FizzRuntimeException) t; - log.error(traceId + ' ' + tMsg, LogService.BIZ_ID, traceId, ex); + // log.error(traceId + ' ' + tMsg, LogService.BIZ_ID, traceId, ex); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.error(traceId + ' ' + tMsg, ex); respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); RespEntity rs = null; if (ex.getStepContext() != null && ex.getStepContext().returnContext()) { @@ -130,7 +130,9 @@ public class FilterExceptionHandlerConfig { if (fc == null) { // t came from flow control filter StringBuilder b = ThreadContext.getStringBuilder(); WebUtils.request2stringBuilder(exchange, b); - log.error(b.toString(), LogService.BIZ_ID, traceId, t); + // log.error(b.toString(), LogService.BIZ_ID, traceId, t); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.error(b.toString(), t); String s = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), tMsg, traceId); respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes()))); diff --git a/fizz-core/src/main/java/we/filter/FizzLogFilter.java b/fizz-core/src/main/java/we/filter/FizzLogFilter.java index c020cb1..c128da9 100644 --- a/fizz-core/src/main/java/we/filter/FizzLogFilter.java +++ b/fizz-core/src/main/java/we/filter/FizzLogFilter.java @@ -24,9 +24,8 @@ import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; - import reactor.core.publisher.Mono; -import we.flume.clients.log4j2appender.LogService; +import we.util.Consts; import we.util.ThreadContext; import we.util.WebUtils; @@ -55,7 +54,9 @@ public class FizzLogFilter implements WebFilter { WebUtils.request2stringBuilder(exchange, b); b.append(resp).append(exchange.getResponse().getStatusCode()) .append(in) .append(System.currentTimeMillis() - start); - log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange)); + // log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange)); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); + log.info(b.toString()); } } ); diff --git a/fizz-core/src/main/java/we/filter/FlowControlFilter.java b/fizz-core/src/main/java/we/filter/FlowControlFilter.java index 637abdf..fc741d0 100644 --- a/fizz-core/src/main/java/we/filter/FlowControlFilter.java +++ b/fizz-core/src/main/java/we/filter/FlowControlFilter.java @@ -32,7 +32,6 @@ import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import we.config.SystemConfig; -import we.flume.clients.log4j2appender.LogService; import we.monitor.FizzMonitorService; import we.plugin.auth.ApiConfigService; import we.plugin.auth.AppService; @@ -134,7 +133,8 @@ public class FlowControlFilter extends FizzWebFilter { if (flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq && !fizzApiReq) { String traceId = WebUtils.getTraceId(exchange); - LogService.setBizId(traceId); + // LogService.setBizId(traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); if (!apiConfigService.serviceConfigMap.containsKey(service)) { String json = WebUtils.jsonRespBody(HttpStatus.FORBIDDEN.value(), "no service " + service + " in flow config", traceId); return WebUtils.responseJson(exchange, HttpStatus.FORBIDDEN, null, json); @@ -155,7 +155,8 @@ public class FlowControlFilter extends FizzWebFilter { String blockedResourceId = result.getBlockedResourceId(); if (BlockType.CIRCUIT_BREAK == result.getBlockType()) { fizzMonitorService.sendAlarm(service, path, FizzMonitorService.CIRCUIT_BREAK_ALARM, null, currentTimeMillis); - log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId); + // log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId); + log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId); String responseContentType = flowControlFilterProperties.getDegradeDefaultResponseContentType(); String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent(); @@ -186,10 +187,12 @@ public class FlowControlFilter extends FizzWebFilter { } else { if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) { fizzMonitorService.sendAlarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, concurrents, currentTimeMillis); - log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId); + // log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId); + log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId); } else { fizzMonitorService.sendAlarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, qps, currentTimeMillis); - log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId); + // log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId); + log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId); } ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.NODE_RESOURCE); diff --git a/fizz-core/src/main/java/we/filter/RouteFilter.java b/fizz-core/src/main/java/we/filter/RouteFilter.java index 4e564bf..747e15b 100644 --- a/fizz-core/src/main/java/we/filter/RouteFilter.java +++ b/fizz-core/src/main/java/we/filter/RouteFilter.java @@ -34,7 +34,6 @@ import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; import we.config.SystemConfig; -import we.flume.clients.log4j2appender.LogService; import we.plugin.auth.ApiConfig; import we.proxy.FizzWebClient; import we.proxy.Route; @@ -78,11 +77,14 @@ public class RouteFilter extends FizzWebFilter { if (resp == null) { // should not reach here ServerHttpRequest clientReq = exchange.getRequest(); String traceId = WebUtils.getTraceId(exchange); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); String msg = traceId + ' ' + pfr.id + " fail"; if (pfr.cause == null) { - log.error(msg, LogService.BIZ_ID, traceId); + // log.error(msg, LogService.BIZ_ID, traceId); + log.error(msg); } else { - log.error(msg, LogService.BIZ_ID, traceId, pfr.cause); + // log.error(msg, LogService.BIZ_ID, traceId, pfr.cause); + log.error(msg, pfr.cause); } HttpStatus s = HttpStatus.INTERNAL_SERVER_ERROR; if (!SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) { @@ -202,7 +204,9 @@ public class RouteFilter extends FizzWebFilter { StringBuilder b = ThreadContext.getStringBuilder(); String traceId = WebUtils.getTraceId(exchange); WebUtils.response2stringBuilder(traceId, remoteResp, b); - log.debug(b.toString(), LogService.BIZ_ID, traceId); + // log.debug(b.toString(), LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.debug(b.toString()); } return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers())) .doOnError(throwable -> cleanup(remoteResp)).doOnCancel(() -> cleanup(remoteResp)); @@ -265,7 +269,9 @@ public class RouteFilter extends FizzWebFilter { if (ls[0] != null) { b.append('\n').append(ls[0]); } - log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t); + // log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); + log.error(b.toString(), t); } ) ; diff --git a/fizz-core/src/main/java/we/fizz/AggregateService.java b/fizz-core/src/main/java/we/fizz/AggregateService.java index 4d916d2..626cf55 100644 --- a/fizz-core/src/main/java/we/fizz/AggregateService.java +++ b/fizz-core/src/main/java/we/fizz/AggregateService.java @@ -18,6 +18,7 @@ package we.fizz; import com.alibaba.fastjson.JSON; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.buffer.DataBuffer; @@ -30,9 +31,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import we.config.SystemConfig; -import we.constants.CommonConstants; import we.fizz.input.Input; -import we.flume.clients.log4j2appender.LogService; +import we.util.Consts; import we.util.MapUtil; import we.util.Utils; import we.util.WebUtils; @@ -72,8 +72,8 @@ public class AggregateService { Pipeline pipeline = aggregateResource.getPipeline(); Input input = aggregateResource.getInput(); Map hs = MapUtil.toHashMap(headers); - // String traceId = WebUtils.getTraceId(exchange); - LogService.setBizId(traceId); + // LogService.setBizId(traceId); + ThreadContext.put(Consts.TRACE_ID, traceId); log.debug("matched aggregation api: {}", pash); Map clientInput = new HashMap<>(); clientInput.put("path", pash); @@ -102,7 +102,8 @@ public class AggregateService { public Mono genAggregateResponse(ServerWebExchange exchange, AggregateResult ar) { ServerHttpResponse clientResp = exchange.getResponse(); String traceId = WebUtils.getTraceId(exchange); - LogService.setBizId(traceId); + // LogService.setBizId(traceId); + ThreadContext.put(Consts.TRACE_ID, traceId); String js = null; if(ar.getBody() instanceof String) { js = (String) ar.getBody(); diff --git a/fizz-core/src/main/java/we/fizz/ConfigLoader.java b/fizz-core/src/main/java/we/fizz/ConfigLoader.java index e6a5d1f..acb572a 100644 --- a/fizz-core/src/main/java/we/fizz/ConfigLoader.java +++ b/fizz-core/src/main/java/we/fizz/ConfigLoader.java @@ -19,33 +19,29 @@ package we.fizz; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ConfigurableApplicationContext; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import we.config.AppConfigProperties; -import we.fizz.input.*; - import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.ThreadContext; import org.noear.snack.ONode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; -import we.flume.clients.log4j2appender.LogService; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import we.config.AppConfigProperties; +import we.fizz.input.ClientInputConfig; +import we.fizz.input.Input; +import we.fizz.input.InputFactory; +import we.fizz.input.InputType; import we.util.Consts; import we.util.ReactorUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; - -import static we.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE; -import static we.util.Consts.S.FORWARD_SLASH; -import static we.util.Consts.S.FORWARD_SLASH_STR; - import java.io.File; import java.io.IOException; import java.io.Serializable; @@ -54,6 +50,10 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import static we.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE; +import static we.util.Consts.S.FORWARD_SLASH; +import static we.util.Consts.S.FORWARD_SLASH_STR; + /** * * @author Francis Dong @@ -245,7 +245,10 @@ public class ConfigLoader { return Flux.just(entry); } String configStr = (String) entry.getValue(); - LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr, LogService.BIZ_ID, k.toString()); + // LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr, LogService.BIZ_ID, k.toString()); + + ThreadContext.put(Consts.TRACE_ID, k.toString()); + LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr); try { this.addConfig(configStr, aggregateResourcesTmp, resourceKey2ConfigInfoMapTmp, aggregateId2ResourceKeyMapTmp); diff --git a/fizz-core/src/main/java/we/fizz/Pipeline.java b/fizz-core/src/main/java/we/fizz/Pipeline.java index 2e40e32..9a504ee 100644 --- a/fizz-core/src/main/java/we/fizz/Pipeline.java +++ b/fizz-core/src/main/java/we/fizz/Pipeline.java @@ -17,28 +17,17 @@ package we.fizz; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; - -import javax.script.ScriptException; - +import com.alibaba.fastjson.JSON; +import org.apache.logging.log4j.ThreadContext; +import org.noear.snack.ONode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.data.util.Pair; import org.springframework.http.HttpHeaders; import org.springframework.http.codec.multipart.FilePart; - -import we.schema.util.I18nUtils; -import org.noear.snack.ONode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; - -import com.alibaba.fastjson.JSON; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.constants.CommonConstants; @@ -52,13 +41,10 @@ import we.fizz.component.StepContextPosition; import we.fizz.exception.FizzRuntimeException; import we.fizz.field.FieldConfig; import we.fizz.field.ValueTypeEnum; -import we.fizz.input.ClientInputConfig; -import we.fizz.input.Input; -import we.fizz.input.InputConfig; -import we.fizz.input.PathMapping; -import we.fizz.input.ScriptHelper; -import we.flume.clients.log4j2appender.LogService; +import we.fizz.input.*; +import we.schema.util.I18nUtils; import we.schema.util.PropertiesSupportUtils; +import we.util.Consts; import we.util.JacksonUtils; import we.util.JsonSchemaUtils; import we.util.MapUtil; @@ -66,6 +52,9 @@ import we.xml.JsonToXml; import we.xml.XmlToJson; import we.xml.XmlToJson.Builder; +import javax.script.ScriptException; +import java.util.*; + /** * * @author linwaiwai @@ -197,7 +186,8 @@ public class Pipeline { AggregateResult aggResult = this.doInputDataMapping(input, null); this.stepContext.addElapsedTime(input.getName()+"聚合接口响应结果数据转换", System.currentTimeMillis() - t3); if(this.stepContext.isDebug() || LOGGER.isDebugEnabled()) { - LogService.setBizId(this.stepContext.getTraceId()); + // LogService.setBizId(this.stepContext.getTraceId()); + ThreadContext.put(Consts.TRACE_ID, this.stepContext.getTraceId()); String jsonString = JSON.toJSONString(aggResult); if(LOGGER.isDebugEnabled()) { LOGGER.debug("aggResult {}", jsonString); diff --git a/fizz-core/src/main/java/we/fizz/input/RPCInput.java b/fizz-core/src/main/java/we/fizz/input/RPCInput.java index 64708c2..b1384dc 100644 --- a/fizz-core/src/main/java/we/fizz/input/RPCInput.java +++ b/fizz-core/src/main/java/we/fizz/input/RPCInput.java @@ -16,22 +16,22 @@ */ package we.fizz.input; -import java.util.HashMap; -import java.util.Map; - -import javax.script.ScriptException; +import org.apache.logging.log4j.ThreadContext; import org.noear.snack.ONode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; - import reactor.core.publisher.Mono; import we.exception.ExecuteScriptException; import we.fizz.StepContext; -import we.flume.clients.log4j2appender.LogService; +import we.util.Consts; import we.util.JacksonUtils; +import javax.script.ScriptException; +import java.util.HashMap; +import java.util.Map; + /** * * @author linwaiwai @@ -79,7 +79,8 @@ public class RPCInput extends Input { Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class); return needRun != null ? needRun : Boolean.TRUE; } catch (ScriptException e) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e); throw new ExecuteScriptException(e, stepContext, condition); } diff --git a/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInput.java b/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInput.java index 181e295..d245a68 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInput.java @@ -17,18 +17,12 @@ package we.fizz.input.extension.dubbo; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; - -import javax.script.ScriptException; - +import org.apache.logging.log4j.ThreadContext; import org.noear.snack.ONode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.util.CollectionUtils; - import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import we.FizzAppContext; @@ -37,18 +31,17 @@ import we.constants.CommonConstants; import we.exception.ExecuteScriptException; import we.fizz.StepContext; import we.fizz.exception.FizzRuntimeException; -import we.fizz.input.InputConfig; -import we.fizz.input.InputContext; -import we.fizz.input.InputType; -import we.fizz.input.PathMapping; -import we.fizz.input.RPCInput; -import we.fizz.input.RPCResponse; -import we.fizz.input.ScriptHelper; -import we.flume.clients.log4j2appender.LogService; +import we.fizz.input.*; import we.proxy.dubbo.ApacheDubboGenericService; import we.proxy.dubbo.DubboInterfaceDeclaration; +import we.util.Consts; import we.util.JacksonUtils; +import javax.script.ScriptException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + /** * * @author linwaiwai @@ -159,7 +152,8 @@ public class DubboInput extends RPCInput { body.putAll((Map) reqBody); } } catch (ScriptException e) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); throw new ExecuteScriptException(e, stepContext, scriptCfg); } @@ -180,7 +174,8 @@ public class DubboInput extends RPCInput { } protected void doOnBodyError(Throwable ex, long elapsedMillis) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("failed to call {}", this.getApiName(), ex); inputContext.getStepContext().addElapsedTime(this.getApiName() + " failed ", elapsedMillis); } @@ -223,7 +218,8 @@ public class DubboInput extends RPCInput { body.putAll((Map) respBody); } } catch (ScriptException e) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); throw new ExecuteScriptException(e, stepContext, scriptCfg); } diff --git a/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInput.java b/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInput.java index 6569268..be45866 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInput.java @@ -18,11 +18,10 @@ package we.fizz.input.extension.grpc; import com.alibaba.fastjson.JSON; - +import org.apache.logging.log4j.ThreadContext; import org.noear.snack.ONode; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.util.CollectionUtils; - import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import we.FizzAppContext; @@ -32,18 +31,17 @@ import we.exception.ExecuteScriptException; import we.fizz.StepContext; import we.fizz.exception.FizzRuntimeException; import we.fizz.input.*; -import we.flume.clients.log4j2appender.LogService; import we.proxy.grpc.GrpcGenericService; import we.proxy.grpc.GrpcInstanceService; import we.proxy.grpc.GrpcInterfaceDeclaration; +import we.util.Consts; import we.util.JacksonUtils; +import javax.script.ScriptException; import java.time.Duration; import java.util.HashMap; import java.util.Map; -import javax.script.ScriptException; - /** * * @author linwaiwai @@ -153,7 +151,8 @@ public class GrpcInput extends RPCInput implements IInput { body.putAll((Map) reqBody); } } catch (ScriptException e) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); throw new ExecuteScriptException(e, stepContext, scriptCfg); } @@ -174,7 +173,8 @@ public class GrpcInput extends RPCInput implements IInput { } protected void doOnBodyError(Throwable ex, long elapsedMillis) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("failed to call {}", this.getApiName(), ex); inputContext.getStepContext().addElapsedTime(this.getApiName() + " failed ", elapsedMillis); } @@ -218,7 +218,8 @@ public class GrpcInput extends RPCInput implements IInput { body.putAll((Map) respBody); } } catch (ScriptException e) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); throw new ExecuteScriptException(e, stepContext, scriptCfg); diff --git a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java index efc50dc..099fb7f 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java @@ -17,16 +17,9 @@ package we.fizz.input.extension.request; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.script.ScriptException; - +import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.ThreadContext; import org.noear.snack.ONode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,24 +31,13 @@ import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; - -import com.alibaba.fastjson.JSON; - import reactor.core.publisher.Mono; import we.config.SystemConfig; import we.constants.CommonConstants; import we.exception.ExecuteScriptException; import we.fizz.StepContext; import we.fizz.StepResponse; -import we.fizz.input.IInput; -import we.fizz.input.InputConfig; -import we.fizz.input.InputContext; -import we.fizz.input.InputType; -import we.fizz.input.PathMapping; -import we.fizz.input.RPCInput; -import we.fizz.input.RPCResponse; -import we.fizz.input.ScriptHelper; -import we.flume.clients.log4j2appender.LogService; +import we.fizz.input.*; import we.proxy.FizzWebClient; import we.proxy.http.HttpInstanceService; import we.service_registry.RegistryCenterService; @@ -67,6 +49,14 @@ import we.xml.JsonToXml; import we.xml.XmlToJson; import we.xml.XmlToJson.Builder; +import javax.script.ScriptException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * * @author linwaiwai @@ -189,7 +179,8 @@ public class RequestInput extends RPCInput implements IInput{ body.putAll((Map) reqBody); } } catch (ScriptException e) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); throw new ExecuteScriptException(e, stepContext, scriptCfg); } @@ -321,7 +312,8 @@ public class RequestInput extends RPCInput implements IInput{ body.putAll((Map) respBody); } } catch (ScriptException e) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); throw new ExecuteScriptException(e, stepContext, scriptCfg); } @@ -473,7 +465,8 @@ public class RequestInput extends RPCInput implements IInput{ } protected void doOnBodyError(Throwable ex, long elapsedMillis) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.warn("failed to call {}", request.get("url"), ex); inputContext.getStepContext().addElapsedTime( stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis); @@ -533,7 +526,8 @@ public class RequestInput extends RPCInput implements IInput{ protected void doOnBodySuccess(Object resp, long elapsedMillis) { if(inputContext.getStepContext().isDebug()) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); + // LogService.setBizId(inputContext.getStepContext().getTraceId()); + ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"), JSON.toJSONString(this.request.get("headers")), JSON.toJSONString(this.request.get("body")), resp); diff --git a/fizz-core/src/main/java/we/log/LogSendAppenderWithLog4j2.java b/fizz-core/src/main/java/we/log/LogSendAppenderWithLog4j2.java index 20533e2..5a1f4fe 100644 --- a/fizz-core/src/main/java/we/log/LogSendAppenderWithLog4j2.java +++ b/fizz-core/src/main/java/we/log/LogSendAppenderWithLog4j2.java @@ -16,6 +16,7 @@ */ package we.log; +import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.*; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.config.Property; @@ -25,7 +26,7 @@ import org.apache.logging.log4j.core.config.plugins.PluginElement; import org.apache.logging.log4j.core.config.plugins.PluginFactory; import org.apache.logging.log4j.core.layout.PatternLayout; import we.FizzAppContext; -import we.flume.clients.log4j2appender.LogService; +import we.util.Consts; import we.util.NetworkUtils; import java.io.Serializable; @@ -90,7 +91,7 @@ public class LogSendAppenderWithLog4j2 extends AbstractAppender { } private String getBizId(Object[] parameters) { - Object bizId = LogService.getBizId(); + /*Object bizId = LogService.getBizId(); if (parameters != null) { for (int i = parameters.length - 1; i > -1; --i) { Object p = parameters[i]; @@ -105,7 +106,13 @@ public class LogSendAppenderWithLog4j2 extends AbstractAppender { if (bizId == null) { return ""; } - return bizId.toString(); + return bizId.toString();*/ + + String traceId = ThreadContext.get(Consts.TRACE_ID); + if (traceId == null) { + return Consts.S.EMPTY; + } + return traceId; } @PluginFactory diff --git a/fizz-core/src/main/java/we/log/LogSendAppenderWithLogback.java b/fizz-core/src/main/java/we/log/LogSendAppenderWithLogback.java index 1520c7d..9dde1b2 100644 --- a/fizz-core/src/main/java/we/log/LogSendAppenderWithLogback.java +++ b/fizz-core/src/main/java/we/log/LogSendAppenderWithLogback.java @@ -1,94 +1,94 @@ -package we.log; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.AppenderBase; -import ch.qos.logback.core.Layout; -import ch.qos.logback.core.LogbackException; -import lombok.Getter; -import lombok.Setter; -import we.FizzAppContext; -import we.flume.clients.log4j2appender.LogService; -import we.util.NetworkUtils; - -import static we.log.LogSendAppender.*; - -/** - * log send appender with logback - * - * @author huahuang - */ -public class LogSendAppenderWithLogback extends AppenderBase { - - //负责将日志事件转换为字符串,需Getter和Setter方法 - @Getter - @Setter - private Layout layout; - - @Override - protected void append(ILoggingEvent event) { - try { - if (logEnabled != null && !logEnabled) { - return; - } - - if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) { - // local cache - logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend( - this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), event.getLevel().levelInt, - event.getTimeStamp(), this.getLayout().doLayout(event)); - return; - } - - if (logEnabled == null && logSendService == null) { - // no legal logSendService, discard the local cache - logEnabled = Boolean.FALSE; - logSends = null; - return; - } - - if (logEnabled == null) { - logEnabled = Boolean.TRUE; - - LogSend[] logSends; - synchronized (LogSendAppender.class) { - logSends = LogSendAppender.logSends; - LogSendAppender.logSends = null; - } - - // logSendService is ready, send the local cache - if (logSends != null) { - int size = Math.min(logSendIndex.get(), logSends.length); - for (int i = 0; i < size; i++) { - logSendService.send(logSends[i]); - } - } - } - - LogSend logSend = new LogSend(this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), - event.getLevel().levelInt, event.getTimeStamp(), this.getLayout().doLayout(event)); - logSendService.send(logSend); - } catch (Exception ex) { - throw new LogbackException(event.getFormattedMessage(), ex); - } - } - - private String getBizId(Object[] parameters) { - Object bizId = LogService.getBizId(); - if (parameters != null) { - for (int i = parameters.length - 1; i > -1; --i) { - Object p = parameters[i]; - if (p == LogService.BIZ_ID) { - if (i != parameters.length - 1) { - bizId = parameters[i + 1]; - } - break; - } - } - } - if (bizId == null) { - return ""; - } - return bizId.toString(); - } - -} +//package we.log; +// +//import ch.qos.logback.classic.spi.ILoggingEvent; +//import ch.qos.logback.core.AppenderBase; +//import ch.qos.logback.core.Layout; +//import ch.qos.logback.core.LogbackException; +//import lombok.Getter; +//import lombok.Setter; +//import we.FizzAppContext; +//import we.flume.clients.log4j2appender.LogService; +//import we.util.NetworkUtils; +// +//import static we.log.LogSendAppender.*; +// +///** +// * log send appender with logback +// * +// * @author huahuang +// */ +//public class LogSendAppenderWithLogback extends AppenderBase { +// +// //负责将日志事件转换为字符串,需Getter和Setter方法 +// @Getter +// @Setter +// private Layout layout; +// +// @Override +// protected void append(ILoggingEvent event) { +// try { +// if (logEnabled != null && !logEnabled) { +// return; +// } +// +// if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) { +// // local cache +// logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend( +// this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), event.getLevel().levelInt, +// event.getTimeStamp(), this.getLayout().doLayout(event)); +// return; +// } +// +// if (logEnabled == null && logSendService == null) { +// // no legal logSendService, discard the local cache +// logEnabled = Boolean.FALSE; +// logSends = null; +// return; +// } +// +// if (logEnabled == null) { +// logEnabled = Boolean.TRUE; +// +// LogSend[] logSends; +// synchronized (LogSendAppender.class) { +// logSends = LogSendAppender.logSends; +// LogSendAppender.logSends = null; +// } +// +// // logSendService is ready, send the local cache +// if (logSends != null) { +// int size = Math.min(logSendIndex.get(), logSends.length); +// for (int i = 0; i < size; i++) { +// logSendService.send(logSends[i]); +// } +// } +// } +// +// LogSend logSend = new LogSend(this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), +// event.getLevel().levelInt, event.getTimeStamp(), this.getLayout().doLayout(event)); +// logSendService.send(logSend); +// } catch (Exception ex) { +// throw new LogbackException(event.getFormattedMessage(), ex); +// } +// } +// +// private String getBizId(Object[] parameters) { +// Object bizId = LogService.getBizId(); +// if (parameters != null) { +// for (int i = parameters.length - 1; i > -1; --i) { +// Object p = parameters[i]; +// if (p == LogService.BIZ_ID) { +// if (i != parameters.length - 1) { +// bizId = parameters[i + 1]; +// } +// break; +// } +// } +// } +// if (bizId == null) { +// return ""; +// } +// return bizId.toString(); +// } +// +//} diff --git a/fizz-core/src/main/java/we/monitor/FizzMonitorService.java b/fizz-core/src/main/java/we/monitor/FizzMonitorService.java index da4693b..6d50ae7 100644 --- a/fizz-core/src/main/java/we/monitor/FizzMonitorService.java +++ b/fizz-core/src/main/java/we/monitor/FizzMonitorService.java @@ -23,7 +23,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.stereotype.Service; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; import we.util.Consts; import we.util.ThreadContext; @@ -36,7 +35,7 @@ import javax.annotation.Resource; @Service public class FizzMonitorService { - private static final Logger LOGGER = LoggerFactory.getLogger(FizzMonitorService.class); + private static final Logger LOGGER = LoggerFactory.getLogger("monitor"); public static final byte ERROR_ALARM = 1; public static final byte TIMEOUT_ALARM = 2; @@ -76,8 +75,9 @@ public class FizzMonitorService { b.append(_timestamp) .append(timestamp); b.append(Consts.S.RIGHT_BRACE); String msg = b.toString(); - if (Consts.KAFKA.equals(dest)) { // for internal use - LOGGER.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue)); + if (Consts.KAFKA.equals(dest)) { + // LOGGER.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue)); + LOGGER.info(msg); } else { rt.convertAndSend(queue, msg).subscribe(); } diff --git a/fizz-core/src/main/java/we/plugin/PluginFilter.java b/fizz-core/src/main/java/we/plugin/PluginFilter.java index 932cd4c..039aa35 100644 --- a/fizz-core/src/main/java/we/plugin/PluginFilter.java +++ b/fizz-core/src/main/java/we/plugin/PluginFilter.java @@ -17,16 +17,15 @@ package we.plugin; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; -import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import we.config.SystemConfig; import we.filter.FilterResult; -import we.flume.clients.log4j2appender.LogService; -import we.legacy.RespEntity; +import we.util.Consts; import we.util.WebUtils; import java.util.Map; @@ -53,8 +52,10 @@ public abstract class PluginFilter implements FizzPluginFilter { public Mono filter(ServerWebExchange exchange, Map config, String fixedConfig) { FilterResult pfr = WebUtils.getPrevFilterResult(exchange); String traceId = WebUtils.getTraceId(exchange); + ThreadContext.put(Consts.TRACE_ID, traceId); if (log.isDebugEnabled()) { - log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, traceId); + // log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, traceId); + log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail")); } if (pfr.success) { return doFilter(exchange, config, fixedConfig); @@ -62,9 +63,9 @@ public abstract class PluginFilter implements FizzPluginFilter { if (WebUtils.getDirectResponse(exchange) == null) { // should not reach here String msg = traceId + ' ' + pfr.id + " fail"; if (pfr.cause == null) { - log.error(msg, LogService.BIZ_ID, traceId); + log.error(msg); } else { - log.error(msg, LogService.BIZ_ID, traceId, pfr.cause); + log.error(msg, pfr.cause); } HttpStatus s = HttpStatus.OK; if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) { diff --git a/fizz-core/src/main/java/we/plugin/auth/ApiConfig2appsService.java b/fizz-core/src/main/java/we/plugin/auth/ApiConfig2appsService.java index 16d4b14..b0eccfc 100644 --- a/fizz-core/src/main/java/we/plugin/auth/ApiConfig2appsService.java +++ b/fizz-core/src/main/java/we/plugin/auth/ApiConfig2appsService.java @@ -23,7 +23,6 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; import we.util.Consts; import we.util.JacksonUtils; import we.util.ReactorUtils; @@ -149,7 +148,9 @@ public class ApiConfig2appsService { .doOnNext( msg -> { String json = msg.getMessage(); - log.info("apiConfig2apps: " + json, LogService.BIZ_ID, "ac2as" + System.currentTimeMillis()); + // log.info("apiConfig2apps: " + json, LogService.BIZ_ID, "ac2as" + System.currentTimeMillis()); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, "ac2as" + System.currentTimeMillis()); + log.info("apiConfig2apps: " + json); try { ApiConfig2apps data = JacksonUtils.readValue(json, ApiConfig2apps.class); updateApiConfig2appsMap(data); diff --git a/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java b/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java index 36b524c..8846ea9 100644 --- a/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java +++ b/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java @@ -35,7 +35,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; import we.config.SystemConfig; -import we.flume.clients.log4j2appender.LogService; import we.plugin.FizzPluginFilter; import we.util.*; @@ -110,7 +109,11 @@ public class ApiConfigService implements ApplicationListener { String json = msg.getMessage(); - log.info("api config change: {}", json, LogService.BIZ_ID, "acc" + System.currentTimeMillis()); + // log.info("api config change: {}", json, LogService.BIZ_ID, "acc" + System.currentTimeMillis()); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, "acc" + System.currentTimeMillis()); + log.info("api config change: {}", json); try { ApiConfig ac = JacksonUtils.readValue(json, ApiConfig.class); ApiConfig r = apiConfigMap.remove(ac.id); @@ -430,7 +435,8 @@ public class ApiConfigService implements ApplicationListener> auth(ServerWebExchange exchange) { ServerHttpRequest req = exchange.getRequest(); - LogService.setBizId(WebUtils.getTraceId(exchange)); + // LogService.setBizId(WebUtils.getTraceId(exchange)); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); boolean dedicatedLineRequest = WebUtils.isDedicatedLineRequest(exchange); return auth(exchange, dedicatedLineRequest, WebUtils.getAppId(exchange), WebUtils.getOriginIp(exchange), WebUtils.getTimestamp(exchange), WebUtils.getSign(exchange), diff --git a/fizz-core/src/main/java/we/plugin/auth/AppService.java b/fizz-core/src/main/java/we/plugin/auth/AppService.java index 7a5d332..00026de 100644 --- a/fizz-core/src/main/java/we/plugin/auth/AppService.java +++ b/fizz-core/src/main/java/we/plugin/auth/AppService.java @@ -17,6 +17,7 @@ package we.plugin.auth; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; @@ -24,7 +25,7 @@ import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; +import we.util.Consts; import we.util.JacksonUtils; import we.util.ReactorUtils; @@ -80,7 +81,9 @@ public class AppService { return Flux.just(e); } String json = (String) e.getValue(); - log.info("init app: {}", json, LogService.BIZ_ID, k.toString()); + // log.info("init app: {}", json, LogService.BIZ_ID, k.toString()); + ThreadContext.put(Consts.TRACE_ID, k.toString()); + log.info("init app: {}", json); try { App app = JacksonUtils.readValue(json, App.class); oldAppMapTmp.put(app.id, app); @@ -125,7 +128,9 @@ public class AppService { } ).doOnNext(msg -> { String json = msg.getMessage(); - log.info("app change: " + json, LogService.BIZ_ID, "ac" + System.currentTimeMillis()); + // log.info("app change: " + json, LogService.BIZ_ID, "ac" + System.currentTimeMillis()); + ThreadContext.put(Consts.TRACE_ID, "ac" + System.currentTimeMillis()); + log.info("app change: " + json); try { App app = JacksonUtils.readValue(json, App.class); App r = oldAppMap.remove(app.id); diff --git a/fizz-core/src/main/java/we/plugin/auth/AuthPluginFilter.java b/fizz-core/src/main/java/we/plugin/auth/AuthPluginFilter.java index 676de2f..86e954e 100644 --- a/fizz-core/src/main/java/we/plugin/auth/AuthPluginFilter.java +++ b/fizz-core/src/main/java/we/plugin/auth/AuthPluginFilter.java @@ -17,13 +17,14 @@ package we.plugin.auth; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; -import we.flume.clients.log4j2appender.LogService; import we.plugin.PluginFilter; +import we.util.Consts; import we.util.WebUtils; import javax.annotation.Resource; @@ -53,7 +54,9 @@ public class AuthPluginFilter extends PluginFilter { r -> { if (log.isDebugEnabled()) { String traceId = WebUtils.getTraceId(exchange); - log.debug("{} req auth: {}", traceId, r, LogService.BIZ_ID, traceId); + // log.debug("{} req auth: {}", traceId, r, LogService.BIZ_ID, traceId); + ThreadContext.put(Consts.TRACE_ID, traceId); + log.debug("{} req auth: {}", traceId, r); } Map data = Collections.singletonMap(RESULT, r); return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, AUTH_PLUGIN_FILTER, data); diff --git a/fizz-core/src/main/java/we/plugin/auth/GatewayGroupService.java b/fizz-core/src/main/java/we/plugin/auth/GatewayGroupService.java index e57dadf..f4dca04 100644 --- a/fizz-core/src/main/java/we/plugin/auth/GatewayGroupService.java +++ b/fizz-core/src/main/java/we/plugin/auth/GatewayGroupService.java @@ -17,6 +17,7 @@ package we.plugin.auth; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.env.Environment; @@ -25,7 +26,7 @@ import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; +import we.util.Consts; import we.util.JacksonUtils; import we.util.NetworkUtils; import we.util.ReactorUtils; @@ -87,7 +88,9 @@ public class GatewayGroupService { return Flux.just(e); } String json = (String) e.getValue(); - log.info(json, LogService.BIZ_ID, k.toString()); + // log.info(json, LogService.BIZ_ID, k.toString()); + ThreadContext.put(Consts.TRACE_ID, k.toString()); + log.info(json); try { GatewayGroup gg = JacksonUtils.readValue(json, GatewayGroup.class); oldGatewayGroupMapTmp.put(gg.id, gg); @@ -133,7 +136,9 @@ public class GatewayGroupService { } ).doOnNext(msg -> { String json = msg.getMessage(); - log.info(json, LogService.BIZ_ID, "gg" + System.currentTimeMillis()); + // log.info(json, LogService.BIZ_ID, "gg" + System.currentTimeMillis()); + ThreadContext.put(Consts.TRACE_ID, "gg" + System.currentTimeMillis()); + log.info(json); try { GatewayGroup gg = JacksonUtils.readValue(json, GatewayGroup.class); GatewayGroup r = oldGatewayGroupMap.remove(gg.id); diff --git a/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java b/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java index beee485..3a01e87 100644 --- a/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java +++ b/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java @@ -17,19 +17,19 @@ package we.plugin.requestbody; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; -import we.flume.clients.log4j2appender.LogService; import we.plugin.FizzPluginFilter; import we.plugin.FizzPluginFilterChain; import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; import we.spring.web.server.ext.FizzServerWebExchangeDecorator; +import we.util.Consts; import we.util.NettyDataBufferUtils; import we.util.WebUtils; @@ -76,7 +76,9 @@ public class RequestBodyPlugin implements FizzPluginFilter { } if (log.isDebugEnabled()) { String traceId = WebUtils.getTraceId(exchange); - log.debug("{} request is decorated", traceId, LogService.BIZ_ID, traceId); + // log.debug("{} request is decorated", traceId, LogService.BIZ_ID, traceId); + ThreadContext.put(Consts.TRACE_ID, traceId); + log.debug("{} request is decorated", traceId); } return doFilter(newExchange, config); } diff --git a/fizz-core/src/main/java/we/plugin/stat/StatPluginFilter.java b/fizz-core/src/main/java/we/plugin/stat/StatPluginFilter.java index 5c6b409..b021700 100644 --- a/fizz-core/src/main/java/we/plugin/stat/StatPluginFilter.java +++ b/fizz-core/src/main/java/we/plugin/stat/StatPluginFilter.java @@ -25,7 +25,6 @@ import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; import we.plugin.PluginFilter; import we.plugin.auth.GatewayGroupService; import we.util.Consts; @@ -43,7 +42,7 @@ import java.util.Map; @Component(StatPluginFilter.STAT_PLUGIN_FILTER) public class StatPluginFilter extends PluginFilter { - private static final Logger log = LoggerFactory.getLogger(StatPluginFilter.class); + private static final Logger log = LoggerFactory.getLogger("stat"); public static final String STAT_PLUGIN_FILTER = "statPlugin"; @@ -93,7 +92,8 @@ public class StatPluginFilter extends PluginFilter { if (StringUtils.isBlank(statPluginFilterProperties.getFizzAccessStatTopic())) { rt.convertAndSend(statPluginFilterProperties.getFizzAccessStatChannel(), b.toString()).subscribe(); } else { - log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use + // log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use + log.info(b.toString()); } } diff --git a/fizz-core/src/main/java/we/proxy/CallbackService.java b/fizz-core/src/main/java/we/proxy/CallbackService.java index 4e19072..baf1756 100644 --- a/fizz-core/src/main/java/we/proxy/CallbackService.java +++ b/fizz-core/src/main/java/we/proxy/CallbackService.java @@ -34,7 +34,6 @@ import we.config.SystemConfig; import we.constants.CommonConstants; import we.fizz.AggregateResult; import we.fizz.AggregateService; -import we.flume.clients.log4j2appender.LogService; import we.plugin.auth.ApiConfig; import we.plugin.auth.ApiConfigService; import we.plugin.auth.CallbackConfig; @@ -85,7 +84,9 @@ public class CallbackService { String traceId = WebUtils.getTraceId(exchange); HttpMethod method = req.getMethod(); if (log.isDebugEnabled()) { - log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap), LogService.BIZ_ID, traceId); + // log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap), LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap)); } int rs = cc.receivers.size(); Mono[] sends = new Mono[rs]; @@ -157,7 +158,9 @@ public class CallbackService { b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR); String traceId = WebUtils.getTraceId(exchange); WebUtils.request2stringBuilder(traceId, method, r.service + Consts.S.FORWARD_SLASH + r.path, headers, body, b); - log.error(b.toString(), LogService.BIZ_ID, traceId, t); + // log.error(b.toString(), LogService.BIZ_ID, traceId, t); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.error(b.toString(), t); } private String buildUri(ServerHttpRequest req, ServiceInstance si, String path) { @@ -199,7 +202,9 @@ public class CallbackService { StringBuilder b = ThreadContext.getStringBuilder(); String traceId = WebUtils.getTraceId(exchange); WebUtils.response2stringBuilder(traceId, remoteResp, b); - log.debug(b.toString(), LogService.BIZ_ID, traceId); + // log.debug(b.toString(), LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.debug(b.toString()); } return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers())) .doOnError(throwable -> clean(remoteResp)).doOnCancel(() -> clean(remoteResp)); @@ -301,7 +306,9 @@ public class CallbackService { b.append(req.service).append(Consts.S.FORWARD_SLASH).append(req.path); b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR); WebUtils.request2stringBuilder(req.id, req.method, service + Consts.S.FORWARD_SLASH + path, req.headers, req.body, b); - log.error(b.toString(), LogService.BIZ_ID, req.id, t); + // log.error(b.toString(), LogService.BIZ_ID, req.id, t); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id); + log.error(b.toString(), t); } private void clean(ClientResponse cr) { diff --git a/fizz-core/src/main/java/we/proxy/FizzWebClient.java b/fizz-core/src/main/java/we/proxy/FizzWebClient.java index c9abc34..e9183ac 100644 --- a/fizz-core/src/main/java/we/proxy/FizzWebClient.java +++ b/fizz-core/src/main/java/we/proxy/FizzWebClient.java @@ -37,7 +37,6 @@ import we.config.ProxyWebClientConfig; import we.config.SystemConfig; import we.exception.ExternalService4xxException; import we.fizz.exception.FizzRuntimeException; -import we.flume.clients.log4j2appender.LogService; import we.service_registry.RegistryCenterService; import we.util.Consts; import we.util.ThreadContext; @@ -215,7 +214,9 @@ public class FizzWebClient { if (log.isDebugEnabled()) { StringBuilder b = ThreadContext.getStringBuilder(); WebUtils.request2stringBuilder(traceId, method, uri, headers, null, b); - log.debug(b.toString(), LogService.BIZ_ID, traceId); + // log.debug(b.toString(), LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.debug(b.toString()); } WebClient.RequestBodyUriSpec requestBodyUriSpec = webClient.method(method); diff --git a/fizz-core/src/main/java/we/proxy/RpcInstanceServiceImpl.java b/fizz-core/src/main/java/we/proxy/RpcInstanceServiceImpl.java index 2c921b2..c048789 100644 --- a/fizz-core/src/main/java/we/proxy/RpcInstanceServiceImpl.java +++ b/fizz-core/src/main/java/we/proxy/RpcInstanceServiceImpl.java @@ -16,6 +16,7 @@ */ package we.proxy; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; @@ -24,19 +25,13 @@ import org.springframework.util.CollectionUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; import we.util.Consts; import we.util.JacksonUtils; import we.util.ReactorUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; -import java.util.AbstractMap; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -102,7 +97,9 @@ public class RpcInstanceServiceImpl implements RpcInstanceService { return Flux.just(e); } Object v = e.getValue(); - LOGGER.info(k.toString() + Consts.S.COLON + v.toString(), LogService.BIZ_ID, k.toString()); + // LOGGER.info(k.toString() + Consts.S.COLON + v.toString(), LogService.BIZ_ID, k.toString()); + ThreadContext.put(Consts.TRACE_ID, k.toString()); + LOGGER.info(k.toString() + Consts.S.COLON + v.toString()); String json = (String) v; try { RpcService rpcService = JacksonUtils.readValue(json, RpcService.class); @@ -194,7 +191,9 @@ public class RpcInstanceServiceImpl implements RpcInstanceService { } ).doOnNext(msg -> { String json = msg.getMessage(); - LOGGER.info(json, LogService.BIZ_ID, "rpc" + System.currentTimeMillis()); + // LOGGER.info(json, LogService.BIZ_ID, "rpc" + System.currentTimeMillis()); + ThreadContext.put(Consts.TRACE_ID, "rpc" + System.currentTimeMillis()); + LOGGER.info(json); try { RpcService rpcService = JacksonUtils.readValue(json, RpcService.class); this.updateLocalCache(rpcService, serviceToInstancesMap, serviceToLoadBalanceTypeMap, idToRpcServiceMap, diff --git a/fizz-core/src/main/java/we/stats/circuitbreaker/CircuitBreakManager.java b/fizz-core/src/main/java/we/stats/circuitbreaker/CircuitBreakManager.java index 9a76112..0ed2501 100644 --- a/fizz-core/src/main/java/we/stats/circuitbreaker/CircuitBreakManager.java +++ b/fizz-core/src/main/java/we/stats/circuitbreaker/CircuitBreakManager.java @@ -17,6 +17,7 @@ package we.stats.circuitbreaker; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; @@ -25,12 +26,8 @@ import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; import we.stats.FlowStat; -import we.util.JacksonUtils; -import we.util.ResourceIdUtils; -import we.util.Result; -import we.util.WebUtils; +import we.util.*; import javax.annotation.PostConstruct; import javax.annotation.Resource; @@ -210,12 +207,16 @@ public class CircuitBreakManager { } if (cb == null) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("no circuit breaker for {} {}", service, path, LogService.BIZ_ID, WebUtils.getTraceId(exchange)); + // LOGGER.debug("no circuit breaker for {} {}", service, path, LogService.BIZ_ID, WebUtils.getTraceId(exchange)); + ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); + LOGGER.debug("no circuit breaker for {} {}", service, path); } return true; } if (LOGGER.isDebugEnabled()) { - LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb, LogService.BIZ_ID, WebUtils.getTraceId(exchange)); + // LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb, LogService.BIZ_ID, WebUtils.getTraceId(exchange)); + ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange)); + LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb); } return cb.permit(exchange, currentTimeWindow, flowStat); } diff --git a/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java b/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java index 8e30ebe..21b47ba 100644 --- a/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java +++ b/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java @@ -17,6 +17,7 @@ package we.stats.ratelimit; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; @@ -24,10 +25,9 @@ import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; -import we.flume.clients.log4j2appender.LogService; +import we.util.Consts; import we.util.JacksonUtils; import we.util.ReactorUtils; -import we.util.ThreadContext; import javax.annotation.PostConstruct; import javax.annotation.Resource; @@ -78,7 +78,9 @@ public class ResourceRateLimitConfigService { return Flux.just(e); } String json = (String) e.getValue(); - log.info("rateLimitConfig: " + json, LogService.BIZ_ID, k.toString()); + // log.info("rateLimitConfig: " + json, LogService.BIZ_ID, k.toString()); + ThreadContext.put(Consts.TRACE_ID, k.toString()); + log.info("rateLimitConfig: " + json); try { ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class); oldResourceRateLimitConfigMapTmp.put(rrlc.id, rrlc); @@ -122,7 +124,9 @@ public class ResourceRateLimitConfigService { } ).doOnNext(msg -> { String json = msg.getMessage(); - log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis()); + // log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis()); + ThreadContext.put(Consts.TRACE_ID, "rrlc" + System.currentTimeMillis()); + log.info("channel recv rate limit config: " + json); try { ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class); ResourceRateLimitConfig r = oldResourceRateLimitConfigMap.remove(rrlc.id); diff --git a/fizz-core/src/main/java/we/util/WebUtils.java b/fizz-core/src/main/java/we/util/WebUtils.java index d02f8f2..27deb31 100644 --- a/fizz-core/src/main/java/we/util/WebUtils.java +++ b/fizz-core/src/main/java/we/util/WebUtils.java @@ -34,7 +34,6 @@ import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import we.config.SystemConfig; import we.filter.FilterResult; -import we.flume.clients.log4j2appender.LogService; import we.plugin.auth.ApiConfig; import we.plugin.auth.AuthPluginFilter; import we.proxy.Route; @@ -527,10 +526,13 @@ public abstract class WebUtils { // } b.append(Consts.S.LINE_SEPARATOR); b.append(filter).append(Consts.S.SPACE).append(code).append(Consts.S.SPACE).append(msg); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); if (t == null) { - log.error(b.toString(), LogService.BIZ_ID, traceId); + // log.error(b.toString(), LogService.BIZ_ID, traceId); + log.error(b.toString()); } else { - log.error(b.toString(), LogService.BIZ_ID, traceId, t); + // log.error(b.toString(), LogService.BIZ_ID, traceId, t); + log.error(b.toString(), t); Throwable[] suppressed = t.getSuppressed(); if (suppressed != null && suppressed.length != 0) { log.error(StringUtils.EMPTY, suppressed[0]); @@ -846,7 +848,9 @@ public abstract class WebUtils { request2stringBuilder(exchange, b); b.append(Consts.S.LINE_SEPARATOR); b.append(filter).append(Consts.S.SPACE).append(httpStatus); - log.error(b.toString(), LogService.BIZ_ID, traceId); + // log.error(b.toString(), LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.error(b.toString()); transmitFailFilterResult(exchange, filter); return buildDirectResponseAndBindContext(exchange, httpStatus, new HttpHeaders(), Consts.S.EMPTY); } @@ -860,7 +864,9 @@ public abstract class WebUtils { request2stringBuilder(exchange, b); b.append(Consts.S.LINE_SEPARATOR); b.append(filter).append(Consts.S.SPACE).append(httpStatus); - log.error(b.toString(), LogService.BIZ_ID, traceId); + // log.error(b.toString(), LogService.BIZ_ID, traceId); + org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId); + log.error(b.toString()); transmitFailFilterResult(exchange, filter); headers = headers == null ? new HttpHeaders() : headers; content = StringUtils.isBlank(content) ? Consts.S.EMPTY : content; diff --git a/fizz-core/src/test/java/we/LogKafkaTests.java b/fizz-core/src/test/java/we/LogKafkaTests.java new file mode 100644 index 0000000..89b9f36 --- /dev/null +++ b/fizz-core/src/test/java/we/LogKafkaTests.java @@ -0,0 +1,37 @@ +package we; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.ThreadContext; +import org.apache.logging.log4j.core.config.ConfigurationSource; +import org.apache.logging.log4j.core.config.Configurator; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +public class LogKafkaTests { + + // @Test + void test() throws InterruptedException, IOException { + System.setProperty("log4j2.isThreadContextMapInheritable", "true"); + + /*ConfigurationSource source = new ConfigurationSource(new FileInputStream("D:\\idea\\projects\\fizz-gateway-community\\fizz-core\\src\\test\\resources\\log4j2-test.xml")); + Configurator.initialize(null, source); + Logger logger4es = LogManager.getLogger(LogKafkaTests.class);*/ + + Logger logger4es = LoggerFactory.getLogger(LogKafkaTests.class); + Logger logger4kf = LoggerFactory.getLogger("monitor"); + + ThreadContext.put("traceId", "ti1"); + + // MDC.put("traceId", "ti0"); + + logger4es.warn("520"); + logger4kf.warn("521"); + Thread.currentThread().join(); + } +} diff --git a/fizz-core/src/test/resources/log4j2-kafka.json b/fizz-core/src/test/resources/log4j2-kafka.json new file mode 100644 index 0000000..c540015 --- /dev/null +++ b/fizz-core/src/test/resources/log4j2-kafka.json @@ -0,0 +1,38 @@ +{ + "logTime": { + "$resolver": "timestamp", + "epoch": { + "unit": "millis", + "rounded": true + } + }, + "logLevel": { + "$resolver": "level", + "field": "name" + }, + "logMsg": { + "$resolver": "message", + "stringified": true + }, + "thread": { + "$resolver": "thread", + "field": "name" + }, + "loggerName": { + "$resolver": "logger", + "field": "name" + }, + "thrown": { + "message": { + "$resolver": "exception", + "field": "message" + }, + "extendedStackTrace": { + "$resolver": "exception", + "field": "stackTrace", + "stackTrace": { + "stringified": true + } + } + } +} \ No newline at end of file diff --git a/fizz-core/src/test/resources/log4j2-test.xml b/fizz-core/src/test/resources/log4j2-test.xml index bc9be98..a310415 100644 --- a/fizz-core/src/test/resources/log4j2-test.xml +++ b/fizz-core/src/test/resources/log4j2-test.xml @@ -1,18 +1,33 @@ - - fizz-core - - - - - - - - - - - - + + fizz-core + + + + + + + + + + + + + + + + + + diff --git a/fizz-core/src/test/resources/log4j2.component.properties b/fizz-core/src/test/resources/log4j2.component.properties new file mode 100644 index 0000000..e0c714f --- /dev/null +++ b/fizz-core/src/test/resources/log4j2.component.properties @@ -0,0 +1,2 @@ +log4j2.asyncQueueFullPolicy=Discard +log4j2.discardThreshold=ERROR \ No newline at end of file diff --git a/fizz-plugin/src/main/java/we/plugin/dedicatedline/auth/DedicatedLineApiAuthPluginFilter.java b/fizz-plugin/src/main/java/we/plugin/dedicatedline/auth/DedicatedLineApiAuthPluginFilter.java index 68fcc69..b0ee3cf 100644 --- a/fizz-plugin/src/main/java/we/plugin/dedicatedline/auth/DedicatedLineApiAuthPluginFilter.java +++ b/fizz-plugin/src/main/java/we/plugin/dedicatedline/auth/DedicatedLineApiAuthPluginFilter.java @@ -17,6 +17,7 @@ package we.plugin.dedicatedline.auth; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; @@ -27,9 +28,9 @@ import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import we.dedicated_line.DedicatedLineService; -import we.flume.clients.log4j2appender.LogService; import we.plugin.FizzPluginFilter; import we.plugin.FizzPluginFilterChain; +import we.util.Consts; import we.util.ReactorUtils; import we.util.WebUtils; @@ -77,7 +78,9 @@ public class DedicatedLineApiAuthPluginFilter implements FizzPluginFilter { return WebUtils.response(exchange, HttpStatus.UNAUTHORIZED, null, respJson); } } catch (Exception e) { - log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e); + // log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e); + ThreadContext.put(Consts.TRACE_ID, traceId); + log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, e); String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId); return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson); diff --git a/fizz-plugin/src/main/java/we/plugin/dedicatedline/codec/DedicatedLineCodecPluginFilter.java b/fizz-plugin/src/main/java/we/plugin/dedicatedline/codec/DedicatedLineCodecPluginFilter.java index 8b66943..79a6d5b 100644 --- a/fizz-plugin/src/main/java/we/plugin/dedicatedline/codec/DedicatedLineCodecPluginFilter.java +++ b/fizz-plugin/src/main/java/we/plugin/dedicatedline/codec/DedicatedLineCodecPluginFilter.java @@ -21,6 +21,7 @@ import cn.hutool.crypto.SecureUtil; import cn.hutool.crypto.symmetric.SymmetricAlgorithm; import cn.hutool.crypto.symmetric.SymmetricCrypto; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.ThreadContext; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,14 +30,12 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import we.config.SystemConfig; import we.dedicated_line.DedicatedLineService; -import we.flume.clients.log4j2appender.LogService; import we.plugin.FizzPluginFilterChain; import we.plugin.requestbody.RequestBodyPlugin; import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator; @@ -46,7 +45,6 @@ import we.util.NettyDataBufferUtils; import we.util.WebUtils; import javax.annotation.Resource; -import java.nio.charset.StandardCharsets; import java.util.Map; /** @@ -70,8 +68,9 @@ public class DedicatedLineCodecPluginFilter extends RequestBodyPlugin { @Override public Mono doFilter(ServerWebExchange exchange, Map config) { String traceId = WebUtils.getTraceId(exchange); + ThreadContext.put(Consts.TRACE_ID, traceId); try { - LogService.setBizId(traceId); + // LogService.setBizId(traceId); String dedicatedLineId = WebUtils.getDedicatedLineId(exchange); String cryptoKey = dedicatedLineService.getRequestCryptoKey(dedicatedLineId); @@ -106,7 +105,8 @@ public class DedicatedLineCodecPluginFilter extends RequestBodyPlugin { }); } catch (Exception e) { - log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e); + // log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e); + log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, e); String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId); return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson); diff --git a/fizz-plugin/src/main/java/we/plugin/dedicatedline/pairing/DedicatedLinePairingPluginFilter.java b/fizz-plugin/src/main/java/we/plugin/dedicatedline/pairing/DedicatedLinePairingPluginFilter.java index c08fd0d..b8307a4 100644 --- a/fizz-plugin/src/main/java/we/plugin/dedicatedline/pairing/DedicatedLinePairingPluginFilter.java +++ b/fizz-plugin/src/main/java/we/plugin/dedicatedline/pairing/DedicatedLinePairingPluginFilter.java @@ -18,6 +18,7 @@ package we.plugin.dedicatedline.pairing; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; @@ -28,9 +29,9 @@ import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import we.config.SystemConfig; import we.dedicated_line.DedicatedLineService; -import we.flume.clients.log4j2appender.LogService; import we.plugin.FizzPluginFilter; import we.plugin.FizzPluginFilterChain; +import we.util.Consts; import we.util.DigestUtils; import we.util.ReactorUtils; import we.util.WebUtils; @@ -59,8 +60,9 @@ public class DedicatedLinePairingPluginFilter implements FizzPluginFilter { @Override public Mono filter(ServerWebExchange exchange, Map config) { String traceId = WebUtils.getTraceId(exchange); + ThreadContext.put(Consts.TRACE_ID, traceId); try { - LogService.setBizId(traceId); + // LogService.setBizId(traceId); String dedicatedLineId = WebUtils.getDedicatedLineId(exchange); String secretKey = dedicatedLineService.getSignSecretKey(dedicatedLineId); String ts = WebUtils.getDedicatedLineTimestamp(exchange); @@ -83,7 +85,8 @@ public class DedicatedLinePairingPluginFilter implements FizzPluginFilter { return WebUtils.response(exchange, HttpStatus.UNAUTHORIZED, null, respJson); } } catch (Exception e) { - log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e); + // log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e); + log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, e); String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId); return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson); diff --git a/pom.xml b/pom.xml index 3d29380..0e9059b 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ Dysprosium-SR25 5.3.7.RELEASE 2.2.7.RELEASE - 4.1.77.Final + 4.1.78.Final 4.4.15 2.17.2 1.7.36 @@ -22,7 +22,7 @@ 0.8.2 0.9.11 2.11.1 - 2.0.52.Final + 2.0.53.Final 2.2.9.RELEASE 1.30 @@ -69,6 +69,18 @@ + + org.apache.logging.log4j + log4j-layout-template-json + ${log4j2.version} + + + + org.apache.kafka + kafka-clients + 2.0.1 + + org.openjdk.jol jol-core @@ -156,7 +168,7 @@ com.alibaba fastjson - 1.2.80 + 1.2.83