Merge pull request #444 from wehotel/develop

Committed by hongqiaowei on 2022-07-14 18:20:50 +08:00 (via GitHub)
66 changed files with 1388 additions and 700 deletions

View File

@@ -4,47 +4,46 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.13.RELEASE</version>
<relativePath/>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.6.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.fizzgate</groupId>
<artifactId>fizz-bootstrap</artifactId>
<version>2.6.6-beta1</version>
<properties>
<java.version>1.8</java.version>
<!--<java.version>1.8</java.version>
<spring-framework.version>5.2.22.RELEASE</spring-framework.version>
<spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version>
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version>
<netty.version>4.1.77.Final</netty.version>
<netty.version>4.1.78.Final</netty.version>
<httpcore.version>4.4.15</httpcore.version>
<log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<lombok.version>1.18.22</lombok.version>
<lombok.version>1.18.24</lombok.version>
<apache.dubbo.version>2.7.7</apache.dubbo.version>
<grpc.version>1.16.1</grpc.version>
<mockito.version>3.4.6</mockito.version>
<curator.version>4.0.1</curator.version>
<zookeeper.version>3.5.9</zookeeper.version>
<zookeeper.version>3.5.10</zookeeper.version>
<commons-codec.version>1.15</commons-codec.version>
<commons-pool2.version>2.11.1</commons-pool2.version>
<gson.version>2.8.9</gson.version>
<netty-tcnative.version>2.0.52.Final</netty-tcnative.version>
<netty-tcnative.version>2.0.53.Final</netty-tcnative.version>
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
<snakeyaml.version>1.30</snakeyaml.version>
<spring-data-releasetrain.version>Moore-SR13</spring-data-releasetrain.version>-->
</properties>
<dependencies>
<dependency>
<!--<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
</dependency>-->
<dependency>
<groupId>com.fizzgate</groupId>
@@ -57,6 +56,13 @@
<artifactId>fizz-spring-boot-starter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator-i18n-support</artifactId>
<version>1.0.39_5</version>
</dependency>
<!-- import fizz-input-mysql -->
<!-- <dependency>
<groupId>com.fizzgate</groupId>
@@ -64,7 +70,7 @@
<version>${project.version}</version>
</dependency> -->
<dependency>
<!--<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>${spring-cloud.version}</version>
@@ -79,7 +85,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-classes</artifactId>
<version>${netty-tcnative.version}</version>
</dependency>
</dependency>-->
</dependencies>
<profiles>
@@ -100,6 +106,13 @@
</profile>-->
</profiles>
<repositories>
<repository>
<id>repo</id>
<url>file://${project.basedir}/../repo</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
@@ -107,6 +120,12 @@
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<includeSystemScope>true</includeSystemScope>
<excludes>
<exclude>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator-i18n-support</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
@@ -132,6 +151,8 @@
<include>application.yml</include>
<include>bootstrap.yml</include>
<include>log4j2-spring.xml</include>
<include>log4j2-kafka.json</include>
<include>log4j2.component.properties</include>
<filtering>true</filtering>
</resource>
</resources>

View File

@@ -85,6 +85,7 @@ import org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWeb
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import we.config.AggregateRedisConfig;
import we.log.LogSendAppender;
import we.util.FileUtils;
/**
* fizz gateway application boot entrance
@@ -186,8 +187,13 @@ public class FizzBootstrapApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(FizzBootstrapApplication.class);
public static void main(String[] args) {
System.setProperty("log4j2.contextSelector", "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
System.setProperty("log4j2.formatMsgNoLookups", "true");
System.setProperty("log4j2.contextSelector", "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
System.setProperty("log4j2.formatMsgNoLookups", "true");
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
String appRootDir = FileUtils.getAppRootDir();
System.setProperty("APP_ROOT_DIR", appRootDir);
LOGGER.info("app root dir: {}", appRootDir);
SpringApplication springApplication = new SpringApplication(FizzBootstrapApplication.class);
springApplication.setApplicationContextClass(CustomReactiveWebServerApplicationContext.class);

View File

@@ -51,6 +51,7 @@ spring:
# Must use the same Redis as fizz-manager
aggregate:
redis:
# standalone redis config
# need replace
host: 1.1.1.1 #please input the redis host (default:localhost)
# need replace
@@ -59,6 +60,12 @@ aggregate:
password: 123456 #please input the redis password (default:123456)
# need replace
database: 10 #please input the redis database (default:9)
# redis cluster config
# type: cluster # type can be standalone or cluster, standalone is default
# password: 123456
# clusterNodes: 172.1.1.181:7001,172.1.1.181:7002,172.1.1.181:7003,172.1.1.182:7001,172.1.1.182:7002,172.1.1.182:7003
proxy-webclient:
name: proxy
trust-insecure-SSL: false

View File

@@ -2,24 +2,48 @@
<Configuration status="warn">
<properties>
<property name="APP_NAME">${sys:APP_NAME}</property>
<property name="APP_NAME">fizz-bootstrap</property>
<property name="LOG_DIR">${sys:APP_ROOT_DIR}/log</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<!--<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level [%-29t] %30c{1}.%41M:%4L %m %ex%n"/>-->
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %30c{1}.%41M:%4L %m{nolookups} %ex%n"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %t %level %logger{2} - %X{traceId} %msg{nolookups}%n"/>
</Console>
<LogSend name="LogSend">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg{nolookups}%n"/>
</LogSend>
<!--<RollingRandomAccessFile name="RollingFile" fileName="${LOG_DIR}/${APP_NAME}.log" filePattern="${LOG_DIR}/$${date:yyyy-MM-dd}/${APP_NAME}-%d{HH}-%i.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %t %-5level [%c{1}.%M:%L] %msg{nolookups}%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="10MB"/>
</Policies>
<DefaultRolloverStrategy max="50"/>
</RollingRandomAccessFile>-->
</Appenders>
<Loggers>
<Root level="warn" includeLocation="true">
<Root level="warn" includeLocation="false">
<AppenderRef ref="Console"/>
<AppenderRef ref="LogSend"/>
<!--<AppenderRef ref="RollingFile"/>-->
</Root>
<!-- suppress the warn 'No URLs will be polled as dynamic configuration sources.' -->
<logger name="com.netflix.config.sources.URLConfigurationSource" level="ERROR" includeLocation="true"/>
<Logger name="we" level="info" includeLocation="true"/>
<logger name="com.netflix.config.sources.URLConfigurationSource" level="ERROR" includeLocation="false"/>
<Logger name="we" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
<AppenderRef ref="LogSend" level="warn"/>
</Logger>
<Logger name="monitor" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="stat" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="flow" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="callback" level="info" includeLocation="false" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
</Configuration>

View File

@@ -0,0 +1,2 @@
log4j2.asyncQueueFullPolicy=Discard
log4j2.discardThreshold=ERROR
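
These two settings configure Log4j2's asynchronous logging to discard events at or below the given threshold when the async queue is full, instead of blocking the calling thread. As a hedged illustration (an assumption, not part of this commit), the same keys could also be supplied as JVM system properties before Log4j2 initializes, in the same style the new main() method already uses for other Log4j2 settings:

// Hypothetical alternative to log4j2.component.properties (not from the commit):
// Log4j2 also resolves these keys from system properties. With a Discard policy,
// events at the discardThreshold level and below are dropped when the async queue is full.
System.setProperty("log4j2.asyncQueueFullPolicy", "Discard");
System.setProperty("log4j2.discardThreshold", "ERROR");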

View File

@@ -1,17 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<properties>
<property name="APP_NAME">fizz-gateway</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %X{traceId} %msg%n" />
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="warn">
<AppenderRef ref="Console" />
<AppenderRef ref="Console"/>
</Root>
<Logger name="we" level="DEBUG"/>
</Loggers>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.6.6-beta1</version>
<version>2.6.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -18,31 +18,46 @@
package we.config;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import java.time.Duration;
/**
* @author hongqiaowei
*/
public abstract class RedisReactiveConfig {
protected static final Logger log = LoggerFactory.getLogger(RedisReactiveConfig.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(RedisReactiveConfig.class);
// this should not be changed unless there is a truely good reason to do so
private static final int ps = Runtime.getRuntime().availableProcessors();
public static final ClientResources clientResources = DefaultClientResources.builder()
.ioThreadPoolSize(ps)
.computationThreadPoolSize(ps)
.build();
// this should not be changed unless there is a good reason to do so
private static final int ps = Runtime.getRuntime().availableProcessors();
/**
* @deprecated and renamed to CLIENT_RESOURCES
*/
@Deprecated
public static final ClientResources clientResources = DefaultClientResources.builder()
.ioThreadPoolSize(ps)
.computationThreadPoolSize(ps)
.build();
public static final ClientResources CLIENT_RESOURCES = clientResources;
private RedisReactiveProperties redisReactiveProperties;
@@ -56,23 +71,91 @@ public abstract class RedisReactiveConfig {
public ReactiveRedisConnectionFactory lettuceConnectionFactory() {
log.info("connect to {}", redisReactiveProperties);
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory;
RedisStandaloneConfiguration rcs = new RedisStandaloneConfiguration(redisReactiveProperties.getHost(), redisReactiveProperties.getPort());
String password = redisReactiveProperties.getPassword();
if (password != null) {
rcs.setPassword(password);
if (redisReactiveProperties.getType() == RedisReactiveProperties.STANDALONE) {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(redisReactiveProperties.getHost(), redisReactiveProperties.getPort());
String password = redisReactiveProperties.getPassword();
if (password != null) {
redisStandaloneConfiguration.setPassword(password);
}
redisStandaloneConfiguration.setDatabase(redisReactiveProperties.getDatabase());
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2);
LettucePoolingClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder()
.clientResources(clientResources)
.clientOptions(ClientOptions.builder().publishOnScheduler(true).build())
.poolConfig(poolConfig)
.build();
reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfiguration);
} else {
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
String password = redisReactiveProperties.getPassword();
if (password != null) {
redisClusterConfiguration.setPassword(password);
}
redisClusterConfiguration.setClusterNodes(redisReactiveProperties.getClusterNodes());
int maxRedirects = redisReactiveProperties.getMaxRedirects();
if (maxRedirects > 0) {
redisClusterConfiguration.setMaxRedirects(maxRedirects);
}
ClusterTopologyRefreshOptions.Builder builder = ClusterTopologyRefreshOptions.builder();
int clusterRefreshPeriod = redisReactiveProperties.getClusterRefreshPeriod();
builder = builder.enablePeriodicRefresh(Duration.ofSeconds(clusterRefreshPeriod));
boolean enableAllAdaptiveRefreshTriggers = redisReactiveProperties.isEnableAllAdaptiveRefreshTriggers();
if (enableAllAdaptiveRefreshTriggers) {
builder = builder.enableAllAdaptiveRefreshTriggers();
}
ClusterTopologyRefreshOptions topologyRefreshOptions = builder.build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(clusterRefreshPeriod)))
.topologyRefreshOptions(topologyRefreshOptions)
.publishOnScheduler(true)
.build();
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
int minIdle = redisReactiveProperties.getMinIdle();
if (minIdle > 0) {
poolConfig.setMinIdle(minIdle);
}
int maxIdle = redisReactiveProperties.getMaxIdle();
if (maxIdle > 0) {
poolConfig.setMaxIdle(maxIdle);
}
int maxTotal = redisReactiveProperties.getMaxTotal();
if (maxTotal > 0) {
poolConfig.setMaxTotal(maxTotal);
} else {
poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2);
}
Duration maxWait = redisReactiveProperties.getMaxWait();
if (maxWait != null) {
poolConfig.setMaxWait(maxWait);
}
Duration timeBetweenEvictionRuns = redisReactiveProperties.getTimeBetweenEvictionRuns();
if (timeBetweenEvictionRuns != null) {
poolConfig.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns);
}
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.clientResources(clientResources)
// .commandTimeout(Duration.ofSeconds(60))
.poolConfig(poolConfig)
.readFrom(redisReactiveProperties.getReadFrom())
.clientOptions(clusterClientOptions)
.build();
reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisClusterConfiguration, clientConfig);
}
rcs.setDatabase(redisReactiveProperties.getDatabase());
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2);
LettucePoolingClientConfiguration ccs = LettucePoolingClientConfiguration.builder()
.clientResources(clientResources)
.clientOptions(ClientOptions.builder().publishOnScheduler(true).build())
.poolConfig(poolConfig)
.build();
return new LettuceConnectionFactory(rcs, ccs);
LOGGER.info("build reactive redis connection factory for {}", redisReactiveProperties);
return reactiveRedisConnectionFactory;
}
}

View File

@@ -17,20 +17,68 @@
package we.config;
import we.util.Constants;
import io.lettuce.core.ReadFrom;
import org.springframework.data.redis.connection.RedisNode;
import we.util.Consts;
import we.util.StringUtils;
import we.util.Utils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
/**
* @author hongqiaowei
*/
public abstract class RedisReactiveProperties {
private String host = "127.0.0.1";
private int port = 6379;
private String password;
private int database = 0;
public static final String STANDALONE = "standalone";
public static final String CLUSTER = "cluster";
private String type = STANDALONE;
private String host = "127.0.0.1";
private int port = 6379;
private String password;
private int database = 0;
private List<RedisNode> clusterNodes;
private int maxRedirects = 0;
private int clusterRefreshPeriod = 60;
private boolean clusterRefreshAdaptive = true;
private boolean enableAllAdaptiveRefreshTriggers = true;
private ReadFrom readFrom = ReadFrom.REPLICA_PREFERRED;
private int minIdle = 0;
private int maxIdle = 0;
private int maxTotal = 0;
private Duration maxWait;
private Duration timeBetweenEvictionRuns;
public String getType() {
return type;
}
public void setType(String type) {
if (type.equals(STANDALONE)) {
this.type = STANDALONE;
} else {
this.type = CLUSTER;
}
}
public String getHost() {
return host;
@@ -64,19 +112,127 @@ public abstract class RedisReactiveProperties {
this.database = database;
}
public List<RedisNode> getClusterNodes() {
return clusterNodes;
}
public void setClusterNodes(String clusterNodes) {
String[] nodeArr = StringUtils.split(clusterNodes, ',');
this.clusterNodes = new ArrayList<>(nodeArr.length);
for (String n : nodeArr) {
String[] ipAndPort = StringUtils.split(n.trim(), ':');
RedisNode redisNode = new RedisNode(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
this.clusterNodes.add(redisNode);
}
}
public int getMaxRedirects() {
return maxRedirects;
}
public void setMaxRedirects(int maxRedirects) {
this.maxRedirects = maxRedirects;
}
public int getClusterRefreshPeriod() {
return clusterRefreshPeriod;
}
public void setClusterRefreshPeriod(int clusterRefreshPeriod) {
this.clusterRefreshPeriod = clusterRefreshPeriod;
}
public boolean isClusterRefreshAdaptive() {
return clusterRefreshAdaptive;
}
public void setClusterRefreshAdaptive(boolean clusterRefreshAdaptive) {
this.clusterRefreshAdaptive = clusterRefreshAdaptive;
}
public boolean isEnableAllAdaptiveRefreshTriggers() {
return enableAllAdaptiveRefreshTriggers;
}
public void setEnableAllAdaptiveRefreshTriggers(boolean enableAllAdaptiveRefreshTriggers) {
this.enableAllAdaptiveRefreshTriggers = enableAllAdaptiveRefreshTriggers;
}
public ReadFrom getReadFrom() {
return readFrom;
}
public void setReadFrom(String readFrom) {
this.readFrom = ReadFrom.valueOf(readFrom);
}
public int getMinIdle() {
return minIdle;
}
public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
}
public int getMaxIdle() {
return maxIdle;
}
public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}
public int getMaxTotal() {
return maxTotal;
}
public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}
public Duration getMaxWait() {
return maxWait;
}
public void setMaxWait(long maxWait) {
this.maxWait = Duration.ofMillis(maxWait);
}
public Duration getTimeBetweenEvictionRuns() {
return timeBetweenEvictionRuns;
}
public void setTimeBetweenEvictionRuns(long timeBetweenEvictionRuns) {
this.timeBetweenEvictionRuns = Duration.ofMillis(timeBetweenEvictionRuns);
}
@Override
public String toString() {
StringBuilder b = new StringBuilder(48);
StringBuilder b = new StringBuilder(256);
appendTo(b);
return b.toString();
}
public void appendTo(StringBuilder b) {
b.append(Consts.S.LEFT_BRACE);
Utils.addTo(b, "host", Consts.S.EQUAL, host, Consts.S.SPACE_STR);
Utils.addTo(b, "port", Consts.S.EQUAL, port, Consts.S.SPACE_STR);
// Utils.addTo(b, "password", Consts.S.EQUAL, password, Consts.S.SPACE_STR);
Utils.addTo(b, "database", Consts.S.EQUAL, database, Consts.S.EMPTY);
if (type == STANDALONE) {
Utils.addTo(b, "host", Consts.S.EQUAL, host, Consts.S.SPACE_STR);
Utils.addTo(b, "port", Consts.S.EQUAL, port, Consts.S.SPACE_STR);
Utils.addTo(b, "database", Consts.S.EQUAL, database, Consts.S.SPACE_STR);
} else {
Utils.addTo(b, "clusterNodes", Consts.S.EQUAL, clusterNodes, Consts.S.SPACE_STR);
Utils.addTo(b, "maxRedirects", Consts.S.EQUAL, maxRedirects, Consts.S.SPACE_STR);
Utils.addTo(b, "clusterRefreshPeriod", Consts.S.EQUAL, clusterRefreshPeriod, Consts.S.SPACE_STR);
Utils.addTo(b, "clusterRefreshAdaptive", Consts.S.EQUAL, clusterRefreshAdaptive, Consts.S.SPACE_STR);
Utils.addTo(b, "enableAllAdaptiveRefreshTriggers", Consts.S.EQUAL, enableAllAdaptiveRefreshTriggers, Consts.S.SPACE_STR);
Utils.addTo(b, "readFrom", Consts.S.EQUAL, readFrom, Consts.S.SPACE_STR);
}
Utils.addTo(b, "minIdle", Consts.S.EQUAL, minIdle, Consts.S.SPACE_STR);
Utils.addTo(b, "maxIdle", Consts.S.EQUAL, maxIdle, Consts.S.SPACE_STR);
Utils.addTo(b, "maxWait", Consts.S.EQUAL, maxWait, Consts.S.SPACE_STR);
Utils.addTo(b, "maxTotal", Consts.S.EQUAL, maxTotal, Consts.S.SPACE_STR);
Utils.addTo(b, "timeBetweenEvictionRuns", Consts.S.EQUAL, timeBetweenEvictionRuns, Consts.S.EMPTY);
b.append(Consts.S.RIGHT_BRACE);
}
}
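
A hedged usage sketch of the new cluster-related setters (all values below are placeholders, not taken from the commit): they accept the flat strings and numbers that an application.yml section such as aggregate.redis would supply and normalize them into typed fields.

// Hedged sketch: exercising the setters added above; values are hypothetical.
RedisReactiveProperties props = new RedisReactiveProperties() { };
props.setClusterNodes("10.0.0.1:7001,10.0.0.2:7001"); // parsed into a List<RedisNode>
props.setReadFrom("replicaPreferred");                // resolved through ReadFrom.valueOf(...)
props.setMaxWait(2000);                               // milliseconds, converted to a Duration
props.setMaxTotal(16);                                // pool sizing hint consumed by the config classes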

View File

@@ -17,24 +17,29 @@
package we.flume.clients.log4j2appender;
import org.apache.logging.log4j.ThreadContext;
import we.constants.CommonConstants;
import we.util.Consts;
public enum LogService {
BIZ_ID, HANDLE_STGY, APP;
public static void cleanBizId() {
setBizId(null);
// setBizId(null);
ThreadContext.remove(Consts.TRACE_ID);
}
public static Object getBizId() {
return ThreadContext.get(Constants.BIZ_ID);
// return ThreadContext.get(Constants.BIZ_ID);
return ThreadContext.get(Consts.TRACE_ID);
}
public static void setBizId(Object bizId) {
ThreadContext.set(Constants.BIZ_ID, bizId);
// ThreadContext.set(Constants.BIZ_ID, bizId);
if (bizId != null) {
org.apache.logging.log4j.ThreadContext.put(CommonConstants.TRACE_ID, String.valueOf(bizId));
// org.apache.logging.log4j.ThreadContext.put(CommonConstants.TRACE_ID, String.valueOf(bizId));
ThreadContext.put(Consts.TRACE_ID, String.valueOf(bizId));
}
}
@@ -45,7 +50,7 @@ public enum LogService {
public static String toESaKF(String topic) {
return Constants.AND + topic;
}
public static class Constants {
static final String BIZ_ID = "bizId";
static final char AND = '&';
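
The intent here, summarized with some hedging: the old LogService bizId plumbing is redirected into Log4j2's own ThreadContext under the traceId key (Consts.TRACE_ID is now "traceId"), so the %X{traceId} conversion in the reworked log4j2-spring.xml can render it. A minimal usage sketch, assuming a logger named log and a traceId string already in scope:

// Hedged sketch of the new trace-id flow: put the id into the Log4j2
// ThreadContext, log as usual, and clean up afterwards; %X{traceId} in the
// console pattern picks the value up.
org.apache.logging.log4j.ThreadContext.put("traceId", traceId);
try {
    log.info("handling request");
} finally {
    org.apache.logging.log4j.ThreadContext.remove("traceId");
}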

View File

@@ -1,90 +1,90 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.flume.clients.log4j2appender;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
/** for internal use */
public abstract class ThreadContext {
private static ThreadLocal<Map<String, Object>> tl = new ThreadLocal<>();
private static final int mapCap = 32;
private static final String sb = "sb";
private static final int sbCap = 256;
public static StringBuilder getStringBuilder() {
return getStringBuilder(true);
}
public static StringBuilder getStringBuilder(boolean clean) {
Map<String, Object> m = getMap();
StringBuilder b = (StringBuilder) m.get(sb);
if (b == null) {
b = new StringBuilder(sbCap);
m.put(sb, b);
} else {
if (clean) {
b.delete(0, b.length());
}
}
return b;
}
public static SimpleDateFormat getSimpleDateFormat(String pattern) {
Map<String, Object> m = getMap();
SimpleDateFormat sdf = (SimpleDateFormat) m.get(pattern);
if (sdf == null) {
sdf = new SimpleDateFormat(pattern);
m.put(pattern, sdf);
}
return sdf;
}
public static Object get(String key, Class<?> clz) {
Object obj = get(key);
if (obj == null) {
try {
obj = clz.newInstance();
set(key, obj);
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return obj;
}
private static Map<String, Object> getMap() {
Map<String, Object> m = tl.get();
if (m == null) {
m = new HashMap<>(mapCap);
tl.set(m);
}
return m;
}
public static Object get(String key) {
return getMap().get(key);
}
public static void set(String key, Object obj) {
getMap().put(key, obj);
}
}
///*
// * Copyright (C) 2020 the original author or authors.
// *
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or
// * any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details.
// *
// * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <https://www.gnu.org/licenses/>.
// */
//
//package we.flume.clients.log4j2appender;
//
//import java.text.SimpleDateFormat;
//import java.util.HashMap;
//import java.util.Map;
//
///** for internal use */
//public abstract class ThreadContext {
//
// private static ThreadLocal<Map<String, Object>> tl = new ThreadLocal<>();
// private static final int mapCap = 32;
//
// private static final String sb = "sb";
// private static final int sbCap = 256;
//
// public static StringBuilder getStringBuilder() {
// return getStringBuilder(true);
// }
//
// public static StringBuilder getStringBuilder(boolean clean) {
// Map<String, Object> m = getMap();
// StringBuilder b = (StringBuilder) m.get(sb);
// if (b == null) {
// b = new StringBuilder(sbCap);
// m.put(sb, b);
// } else {
// if (clean) {
// b.delete(0, b.length());
// }
// }
// return b;
// }
//
// public static SimpleDateFormat getSimpleDateFormat(String pattern) {
// Map<String, Object> m = getMap();
// SimpleDateFormat sdf = (SimpleDateFormat) m.get(pattern);
// if (sdf == null) {
// sdf = new SimpleDateFormat(pattern);
// m.put(pattern, sdf);
// }
// return sdf;
// }
//
// public static Object get(String key, Class<?> clz) {
// Object obj = get(key);
// if (obj == null) {
// try {
// obj = clz.newInstance();
// set(key, obj);
// } catch (InstantiationException | IllegalAccessException e) {
// throw new RuntimeException(e);
// }
// }
// return obj;
// }
//
// private static Map<String, Object> getMap() {
// Map<String, Object> m = tl.get();
// if (m == null) {
// m = new HashMap<>(mapCap);
// tl.set(m);
// }
// return m;
// }
//
// public static Object get(String key) {
// return getMap().get(key);
// }
//
// public static void set(String key, Object obj) {
// getMap().put(key, obj);
// }
//}

View File

@@ -107,8 +107,8 @@ public final class Consts {
public static final int GB = 1024 * MB;
}
public static final String HTTP_SERVER = "http_server";
public static final String HTTP_CLIENT = "http_client";
public static final String HTTP_SERVER = "httpServer";
public static final String HTTP_CLIENT = "httpClient";
public static final String MYSQL = "mysql";
public static final String REDIS = "redis";
public static final String CODIS = "codis";
@@ -118,5 +118,5 @@ public final class Consts {
public static final String SCHED = "sched";
public static final String R2DBC = "r2dbc";
public static final String TRACE_ID = "id^";
public static final String TRACE_ID = "traceId";
}

View File

@@ -17,7 +17,12 @@
package we.util;
import java.time.*;
import we.util.Consts.DP;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
@@ -26,8 +31,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import we.util.Consts.DP;
/**
* @author hongqiaowei
*/
@@ -165,6 +168,27 @@ public abstract class DateTimeUtils {
return localDate1.isEqual(localDate2);
}
public static long get10sTimeWinStart(int n) {
LocalDateTime now = LocalDateTime.now().with(ChronoField.MILLI_OF_SECOND, 0);
int sec = now.getSecond();
long interval;
if (sec > 49) {
interval = sec - 50;
} else if (sec > 39) {
interval = sec - 40;
} else if (sec > 29) {
interval = sec - 30;
} else if (sec > 19) {
interval = sec - 20;
} else if (sec > 9) {
interval = sec - 10;
} else {
interval = sec;
}
long millis = toMillis(now);
return millis - interval * 1000 - (n - 1) * 10L * 1000;
}
/*
void iterateBetweenDatesJava8(LocalDate start, LocalDate end) {
for (LocalDate date = start; date.isBefore(end); date = date.plusDays(1)) {
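
A hedged reading of the new get10sTimeWinStart(n): it returns the epoch-millisecond start of the 10-second window lying n-1 windows before the current one. Under that assumption, the arithmetic is equivalent to the shorter form below (n is assumed to be a positive window count):

// Hedged equivalent, not part of the commit: align "now" down to a 10-second
// boundary, then step back (n - 1) windows of 10 seconds each.
long nowMillis = System.currentTimeMillis();
long currentWindowStart = nowMillis - (nowMillis % 10_000L);
long windowStart = currentWindowStart - (n - 1) * 10_000L;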

View File

@@ -17,18 +17,11 @@
package we.util;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.lang.Nullable;
import we.flume.clients.log4j2appender.LogService;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
@@ -37,8 +30,6 @@ import java.nio.charset.StandardCharsets;
public abstract class NettyDataBufferUtils extends org.springframework.core.io.buffer.DataBufferUtils {
private static final Logger log = LoggerFactory.getLogger(NettyDataBufferUtils.class);
private static NettyDataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
public static final DataBuffer EMPTY_DATA_BUFFER = from(new byte[0]);
@@ -54,14 +45,6 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b
return (NettyDataBuffer) dataBufferFactory.wrap(bytes);
}
/*public static NettyDataBuffer from(ByteBuffer byteBuffer) {
return dataBufferFactory.wrap(byteBuffer);
}
public static NettyDataBuffer from(ByteBuf byteBuf) {
return dataBufferFactory.wrap(byteBuf);
}*/
public static byte[] copyBytes(DataBuffer dataBuffer) {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
@@ -71,25 +54,4 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b
public static DataBuffer copy2heap(DataBuffer dataBuffer) {
return from(copyBytes(dataBuffer));
}
/*public static boolean release(@Nullable String traceId, @Nullable DataBuffer dataBuffer) {
if (dataBuffer instanceof PooledDataBuffer) {
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
if (pooledDataBuffer.isAllocated()) {
if (pooledDataBuffer instanceof NettyDataBuffer) {
NettyDataBuffer ndb = (NettyDataBuffer) pooledDataBuffer;
ByteBuf nativeBuffer = ndb.getNativeBuffer();
int refCnt = nativeBuffer.refCnt();
if (refCnt < 1) {
if (log.isDebugEnabled()) {
log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, traceId);
}
return false;
}
}
return pooledDataBuffer.release();
}
}
return false;
}*/
}

View File

@@ -29,7 +29,6 @@ import java.security.SecureRandom;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
/**
@@ -38,7 +37,7 @@ import java.util.Set;
public class NetworkUtils {
private static final Logger log = LoggerFactory.getLogger(NetworkUtils.class);
private static final Logger LOGGER = LoggerFactory.getLogger(NetworkUtils.class);
private static final int maxServerId = 1023;
@@ -50,13 +49,13 @@ public class NetworkUtils {
private static final String SERVER_IP = "SERVER_IP";
private static final String LOCAL_IP = "127.0.0.1";
private static final String LOCAL_IP = "127.0.0.1";
private NetworkUtils() {
}
/**
* @return user settings, or the first non local IP of IP address list.
* @return user settings, or the first non-local IP of IP address list.
*/
public static String getServerIp() {
if (serverIp == null) {
@@ -98,7 +97,7 @@ public class NetworkUtils {
} else {
serverIps.add(ip);
}
log.info("server ip: {}", serverIps);
LOGGER.info("server ip: {}", serverIps);
} catch (SocketException | UnknownHostException e) {
throw new RuntimeException(e);
}
@@ -123,10 +122,10 @@ public class NetworkUtils {
serverId = b.toString().hashCode();
} catch (Exception e) {
serverId = (new SecureRandom().nextInt());
log.error(null, e);
LOGGER.error(null, e);
}
serverId = serverId & maxServerId;
log.info("server id: {}", serverId);
LOGGER.info("server id: {}", serverId);
}
return serverId;
}

View File

@@ -18,13 +18,21 @@
package we.util;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import we.config.RedisReactiveConfig;
import we.config.RedisReactiveProperties;
import java.time.Duration;
/**
* @apiNote just helper, RedisReactiveConfig is best practice
@@ -38,26 +46,114 @@ public abstract class ReactiveRedisHelper {
private ReactiveRedisHelper() {
}
public static ReactiveRedisConnectionFactory getConnectionFactory(String host, int port, String password, int database) {
RedisStandaloneConfiguration rcs = new RedisStandaloneConfiguration(host, port);
if (password != null) {
rcs.setPassword(password);
public static ReactiveRedisConnectionFactory getConnectionFactory(RedisReactiveProperties redisReactiveProperties) {
if (redisReactiveProperties.getType() == RedisReactiveProperties.STANDALONE) {
return getConnectionFactory(redisReactiveProperties.getHost(), redisReactiveProperties.getPort(), redisReactiveProperties.getPassword(), redisReactiveProperties.getDatabase());
} else {
return getClusterConnectionFactory(redisReactiveProperties);
}
rcs.setDatabase(database);
}
public static ReactiveStringRedisTemplate getStringRedisTemplate(RedisReactiveProperties redisReactiveProperties) {
ReactiveRedisConnectionFactory connectionFactory = getConnectionFactory(redisReactiveProperties);
return new ReactiveStringRedisTemplate(connectionFactory);
}
/**
* For standalone redis.
*/
public static ReactiveRedisConnectionFactory getConnectionFactory(String host, int port, String password, int database) {
RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration(host, port);
if (password != null) {
rsc.setPassword(password);
}
rsc.setDatabase(database);
LettucePoolingClientConfiguration ccs = LettucePoolingClientConfiguration.builder()
.clientResources(RedisReactiveConfig.clientResources)
.clientOptions(ClientOptions.builder().publishOnScheduler(true).build())
.poolConfig(new GenericObjectPoolConfig<>())
.build();
.clientResources(RedisReactiveConfig.CLIENT_RESOURCES)
.clientOptions(ClientOptions.builder().publishOnScheduler(true).build())
.poolConfig(new GenericObjectPoolConfig<>())
.build();
LettuceConnectionFactory factory = new LettuceConnectionFactory(rcs, ccs);
LettuceConnectionFactory factory = new LettuceConnectionFactory(rsc, ccs);
factory.afterPropertiesSet();
return factory;
}
/**
* For standalone redis.
*/
public static ReactiveStringRedisTemplate getStringRedisTemplate(String host, int port, String password, int database) {
ReactiveRedisConnectionFactory connectionFactory = getConnectionFactory(host, port, password, database);
return new ReactiveStringRedisTemplate(connectionFactory);
}
public static ReactiveRedisConnectionFactory getClusterConnectionFactory(RedisReactiveProperties redisReactiveProperties) {
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
String password = redisReactiveProperties.getPassword();
if (password != null) {
redisClusterConfiguration.setPassword(password);
}
redisClusterConfiguration.setClusterNodes(redisReactiveProperties.getClusterNodes());
int maxRedirects = redisReactiveProperties.getMaxRedirects();
if (maxRedirects > 0) {
redisClusterConfiguration.setMaxRedirects(maxRedirects);
}
ClusterTopologyRefreshOptions.Builder builder = ClusterTopologyRefreshOptions.builder();
int clusterRefreshPeriod = redisReactiveProperties.getClusterRefreshPeriod();
builder = builder.enablePeriodicRefresh(Duration.ofSeconds(clusterRefreshPeriod));
boolean enableAllAdaptiveRefreshTriggers = redisReactiveProperties.isEnableAllAdaptiveRefreshTriggers();
if (enableAllAdaptiveRefreshTriggers) {
builder = builder.enableAllAdaptiveRefreshTriggers();
}
ClusterTopologyRefreshOptions topologyRefreshOptions = builder.build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(clusterRefreshPeriod)))
.topologyRefreshOptions(topologyRefreshOptions)
.publishOnScheduler(true)
.build();
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
int minIdle = redisReactiveProperties.getMinIdle();
if (minIdle > 0) {
poolConfig.setMinIdle(minIdle);
}
int maxIdle = redisReactiveProperties.getMaxIdle();
if (maxIdle > 0) {
poolConfig.setMaxIdle(maxIdle);
}
int maxTotal = redisReactiveProperties.getMaxTotal();
if (maxTotal > 0) {
poolConfig.setMaxTotal(maxTotal);
} else {
poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2);
}
Duration maxWait = redisReactiveProperties.getMaxWait();
if (maxWait != null) {
poolConfig.setMaxWait(maxWait);
}
Duration timeBetweenEvictionRuns = redisReactiveProperties.getTimeBetweenEvictionRuns();
if (timeBetweenEvictionRuns != null) {
poolConfig.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns);
}
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.clientResources(RedisReactiveConfig.CLIENT_RESOURCES)
// .commandTimeout(Duration.ofSeconds(60))
.poolConfig(poolConfig)
.readFrom(redisReactiveProperties.getReadFrom())
.clientOptions(clusterClientOptions)
.build();
LettuceConnectionFactory reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisClusterConfiguration, clientConfig);
reactiveRedisConnectionFactory.afterPropertiesSet();
return reactiveRedisConnectionFactory;
}
public static ReactiveStringRedisTemplate getClusterStringRedisTemplate(RedisReactiveProperties redisReactiveProperties) {
ReactiveRedisConnectionFactory connectionFactory = getClusterConnectionFactory(redisReactiveProperties);
return new ReactiveStringRedisTemplate(connectionFactory);
}
}
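
As an end-to-end hedged sketch (node addresses and password are placeholders), the new overloads let a caller build a cluster-backed reactive template directly from a properties object, mirroring what FizzBeanFactoryPostProcessor does elsewhere in this commit:

// Hedged sketch: building a cluster ReactiveStringRedisTemplate via the new
// helper methods; all connection details are placeholders.
RedisReactiveProperties props = new RedisReactiveProperties() { };
props.setType(RedisReactiveProperties.CLUSTER);
props.setClusterNodes("172.1.1.181:7001,172.1.1.181:7002,172.1.1.182:7001");
props.setPassword("123456");
ReactiveStringRedisTemplate template = ReactiveRedisHelper.getStringRedisTemplate(props);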

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.6.6-beta1</version>
<version>2.6.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -18,7 +18,6 @@
package we.beans.factory.config;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
@@ -39,9 +38,9 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.FizzConfigConfiguration;
import we.config.RedisReactiveProperties;
import we.context.config.annotation.FizzRefreshScope;
import we.context.event.FizzRefreshEvent;
import we.global_resource.GlobalResource;
import we.util.*;
import java.util.*;
@@ -58,7 +57,7 @@ public class FizzBeanFactoryPostProcessor implements BeanFactoryPostProcessor, E
private ConfigurableEnvironment environment;
private final Map<String, String> property2beanMap = new HashMap<>(32);
private final Map<String, String> property2beanMap = new HashMap<>();
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
@@ -73,11 +72,25 @@ public class FizzBeanFactoryPostProcessor implements BeanFactoryPostProcessor, E
}
private void initReactiveStringRedisTemplate() {
String host = environment.getProperty("aggregate.redis.host");
String port = environment.getProperty("aggregate.redis.port");
RedisReactiveProperties redisReactiveProperties = new RedisReactiveProperties() {
};
String host = environment.getProperty("aggregate.redis.host");
if (StringUtils.isBlank(host)) {
redisReactiveProperties.setType(RedisReactiveProperties.CLUSTER);
redisReactiveProperties.setClusterNodes(environment.getProperty("aggregate.redis.clusterNodes"));
} else {
redisReactiveProperties.setHost(host);
redisReactiveProperties.setPort(Integer.parseInt(environment.getProperty("aggregate.redis.port")));
redisReactiveProperties.setDatabase(Integer.parseInt(environment.getProperty("aggregate.redis.database")));
}
String password = environment.getProperty("aggregate.redis.password");
String database = environment.getProperty("aggregate.redis.database");
reactiveStringRedisTemplate = ReactiveRedisHelper.getStringRedisTemplate(host, Integer.parseInt(port), password, Integer.parseInt(database));
if (StringUtils.isNotBlank(password)) {
redisReactiveProperties.setPassword(password);
}
reactiveStringRedisTemplate = ReactiveRedisHelper.getStringRedisTemplate(redisReactiveProperties);
}
private void initFizzPropertySource() {

View File

@@ -51,13 +51,14 @@ public class AggregateRedisConfig extends RedisReactiveConfig {
public static final String AGGREGATE_REACTIVE_REDIS_TEMPLATE = "aggregateReactiveRedisTemplate";
public static final String AGGREGATE_REACTIVE_REDIS_MESSAGE_LISTENER_CONTAINER = "aggregateReactiveRedisMessageListenerContainer";
private static final String SEND_LOG_TYPE_REDIS = "redis";
public static final String AGGREGATE_REDIS_PREFIX = "aggregate.redis";
public static ProxyLettuceConnectionFactory proxyLettuceConnectionFactory;
@Resource
private AggregateRedisConfigProperties aggregateRedisConfigProperties;
@ConfigurationProperties(prefix = "aggregate.redis")
@ConfigurationProperties(prefix = AGGREGATE_REDIS_PREFIX)
@Configuration(AGGREGATE_REACTIVE_REDIS_PROPERTIES)
public static class AggregateRedisReactiveProperties extends RedisReactiveProperties {
}

View File

@@ -22,19 +22,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
import we.stats.TimeWindowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Consts;
import we.util.DateTimeUtils;
import we.util.NetworkUtils;
import we.util.ResourceIdUtils;
import we.util.ThreadContext;
import we.util.*;
import javax.annotation.Resource;
import java.math.BigDecimal;
@@ -48,7 +42,8 @@ import java.util.List;
//@EnableScheduling
public class FlowStatSchedConfig extends SchedConfig {
private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
private static final Logger log = LoggerFactory.getLogger(FlowStatSchedConfig.class);
private static final Logger FLOW_LOGGER = LoggerFactory.getLogger("flow");
private static final String _ip = "\"ip\":";
private static final String _id = "\"id\":";
@@ -239,13 +234,16 @@ public class FlowStatSchedConfig extends SchedConfig {
b.append(Consts.S.RIGHT_BRACE);
String msg = b.toString();
if ("kafka".equals(flowStatSchedConfigProperties.getDest())) { // for internal use
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue()));
// log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(flowStatSchedConfigProperties.getQueue()));
FLOW_LOGGER.info(msg);
} else {
rt.convertAndSend(flowStatSchedConfigProperties.getQueue(), msg).subscribe();
}
if (log.isDebugEnabled()) {
String wt = 'w' + toDP19(timeWin);
log.debug("report " + wt + ": " + msg, LogService.BIZ_ID, wt);
// log.debug("report " + wt + ": " + msg, LogService.BIZ_ID, wt);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, wt);
log.debug("report " + wt + ": " + msg);
}
}
}

View File

@@ -1,104 +1,104 @@
package we.config;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.context.event.ApplicationStartingEvent;
import org.springframework.boot.context.event.SpringApplicationEvent;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import we.log.LogProperties;
import we.log.LogSendAppenderWithLogback;
@Configuration
public class LogConfig {
@Bean
@ConfigurationProperties("fizz.logging")
public LogProperties logProperties() {
return new LogProperties();
}
@Configuration
@ConditionalOnClass(AbstractAppender.class)
@AutoConfigureAfter(AggregateRedisConfig.class)
public static class CustomLog4j2Config {
}
@Configuration
@ConditionalOnClass(LoggerContext.class)
@AutoConfigureAfter(AggregateRedisConfig.class)
public static class CustomLogbackConfig {
@Bean
public Object initLogSendAppenderWithLogback(LogProperties logProperties) {
return new LoggingConfigurationApplicationListener(logProperties);
}
}
public static class LoggingConfigurationApplicationListener {
private static final Logger logger = LoggerFactory.getLogger(LoggingConfigurationApplicationListener.class);
private static final String APPENDER_NAME = "fizzLogSendAppender";
private static final String LAYOUT = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n";
private LogProperties logProperties;
public LoggingConfigurationApplicationListener() {
}
public LoggingConfigurationApplicationListener(LogProperties logProperties) {
this.logProperties = logProperties;
}
@EventListener
public void contextRefreshed(SpringApplicationEvent event) {
onApplicationEvent(event);
}
@EventListener
public void applicationStarting(ApplicationStartingEvent event) {
onApplicationEvent(event);
}
@EventListener
public void applicationReady(ApplicationReadyEvent event) {
onApplicationEvent(event);
}
public void onApplicationEvent(ApplicationEvent event) {
LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
final ch.qos.logback.classic.Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
String layoutConfig = StringUtils.isBlank(logProperties.getLayout()) ? LAYOUT : logProperties.getLayout();
final LogSendAppenderWithLogback newAppender = new LogSendAppenderWithLogback();
newAppender.setName(APPENDER_NAME);
newAppender.setContext(context);
PatternLayout layout = new PatternLayout();
layout.setPattern(layoutConfig);
newAppender.setLayout(layout);
Appender<ILoggingEvent> appender = root.getAppender(APPENDER_NAME);
if (appender == null) {
newAppender.start();
root.addAppender(newAppender);
logger.info("Added fizz log send appender:{}", APPENDER_NAME);
} else {
newAppender.start();
root.detachAppender(APPENDER_NAME);
root.addAppender(newAppender);
logger.info("Refresh fizz log send appender:{}", APPENDER_NAME);
}
}
}
}
//package we.config;
//
//import ch.qos.logback.classic.LoggerContext;
//import ch.qos.logback.classic.PatternLayout;
//import ch.qos.logback.classic.spi.ILoggingEvent;
//import ch.qos.logback.core.Appender;
//import org.apache.commons.lang3.StringUtils;
//import org.apache.logging.log4j.core.appender.AbstractAppender;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.boot.autoconfigure.AutoConfigureAfter;
//import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
//import org.springframework.boot.context.event.ApplicationReadyEvent;
//import org.springframework.boot.context.event.ApplicationStartingEvent;
//import org.springframework.boot.context.event.SpringApplicationEvent;
//import org.springframework.boot.context.properties.ConfigurationProperties;
//import org.springframework.context.ApplicationEvent;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.context.event.EventListener;
//import we.log.LogProperties;
//import we.log.LogSendAppenderWithLogback;
//
//@Configuration
//public class LogConfig {
//
// @Bean
// @ConfigurationProperties("fizz.logging")
// public LogProperties logProperties() {
// return new LogProperties();
// }
//
// @Configuration
// @ConditionalOnClass(AbstractAppender.class)
// @AutoConfigureAfter(AggregateRedisConfig.class)
// public static class CustomLog4j2Config {
// }
//
// @Configuration
// @ConditionalOnClass(LoggerContext.class)
// @AutoConfigureAfter(AggregateRedisConfig.class)
// public static class CustomLogbackConfig {
// @Bean
// public Object initLogSendAppenderWithLogback(LogProperties logProperties) {
// return new LoggingConfigurationApplicationListener(logProperties);
// }
// }
//
// public static class LoggingConfigurationApplicationListener {
// private static final Logger logger = LoggerFactory.getLogger(LoggingConfigurationApplicationListener.class);
// private static final String APPENDER_NAME = "fizzLogSendAppender";
// private static final String LAYOUT = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n";
// private LogProperties logProperties;
//
// public LoggingConfigurationApplicationListener() {
// }
//
// public LoggingConfigurationApplicationListener(LogProperties logProperties) {
// this.logProperties = logProperties;
// }
//
// @EventListener
// public void contextRefreshed(SpringApplicationEvent event) {
// onApplicationEvent(event);
// }
//
// @EventListener
// public void applicationStarting(ApplicationStartingEvent event) {
// onApplicationEvent(event);
// }
//
// @EventListener
// public void applicationReady(ApplicationReadyEvent event) {
// onApplicationEvent(event);
// }
//
// public void onApplicationEvent(ApplicationEvent event) {
// LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
// final ch.qos.logback.classic.Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
// String layoutConfig = StringUtils.isBlank(logProperties.getLayout()) ? LAYOUT : logProperties.getLayout();
//
// final LogSendAppenderWithLogback newAppender = new LogSendAppenderWithLogback();
// newAppender.setName(APPENDER_NAME);
// newAppender.setContext(context);
// PatternLayout layout = new PatternLayout();
// layout.setPattern(layoutConfig);
// newAppender.setLayout(layout);
//
// Appender<ILoggingEvent> appender = root.getAppender(APPENDER_NAME);
// if (appender == null) {
// newAppender.start();
// root.addAppender(newAppender);
// logger.info("Added fizz log send appender:{}", APPENDER_NAME);
// } else {
// newAppender.start();
// root.detachAppender(APPENDER_NAME);
// root.addAppender(newAppender);
// logger.info("Refresh fizz log send appender:{}", APPENDER_NAME);
// }
// }
// }
//
//
//}

View File

@@ -76,17 +76,24 @@ public class SystemConfig {
private boolean aggregateTestAuth = false;
@Value("${fizz.md5sign-timestamp-timeliness:300}")
private int fizzMD5signTimestampTimeliness = 300; // unit: sec
public int fizzMD5signTimestampTimeliness() {
return fizzMD5signTimestampTimeliness;
}
@Value("${route-timeout:0}")
private long routeTimeout = 0;
@Value("${fizz-trace-id.header:X-Trace-Id}")
private String fizzTraceIdHeader;
private String fizzTraceIdHeader;
@Value("${fizz-trace-id.value-strategy:requestId}")
private String fizzTraceIdValueStrategy;
private String fizzTraceIdValueStrategy;
@Value("${fizz-trace-id.value-prefix:fizz}")
private String fizzTraceIdValuePrefix;
private String fizzTraceIdValuePrefix;
@Value("${fizz.error.response.http-status.enable:true}")
public void setFizzErrRespHttpStatusEnable(boolean fizzErrRespHttpStatusEnable) {

View File

@@ -27,7 +27,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.CallbackReplayReq;
import we.proxy.CallbackService;
import we.util.Consts;
@@ -54,7 +53,9 @@ public class CallbackController {
public Mono<String> callback(ServerWebExchange exchange, @RequestBody CallbackReplayReq req) {
if (log.isDebugEnabled()) {
log.debug(JacksonUtils.writeValueAsString(req), LogService.BIZ_ID, req.id);
// log.debug(JacksonUtils.writeValueAsString(req), LogService.BIZ_ID, req.id);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id);
log.debug(JacksonUtils.writeValueAsString(req));
}
return
@@ -69,13 +70,16 @@ public class CallbackController {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(req.id).append(' ').append(req.service).append(' ').append(req.path).append(' ');
ServerHttpResponse resp = exchange.getResponse();
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id);
if (r.code == Result.SUCC) {
log.info(b.append("replay success").toString(), LogService.BIZ_ID, req.id);
// log.info(b.append("replay success").toString(), LogService.BIZ_ID, req.id);
log.info(b.append("replay success").toString());
resp.setStatusCode(HttpStatus.OK);
return Consts.S.EMPTY;
} else {
b.append("replay error:\n").append(r);
log.error(b.toString(), LogService.BIZ_ID, req.id);
// log.error(b.toString(), LogService.BIZ_ID, req.id);
log.error(b.toString());
resp.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
if (r.msg != null) {
return r.msg;

View File

@@ -33,12 +33,8 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.FizzMangerConfig;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.FizzWebClient;
import we.util.DateTimeUtils;
import we.util.Result;
import we.util.ThreadContext;
import we.util.WebUtils;
import we.util.*;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@@ -100,7 +96,9 @@ public class DedicatedLineController {
boolean equals = DedicatedLineUtils.checkSign(dedicatedLineId, timestamp, pairCodeSecretKey, sign);
if (!equals) {
String traceId = WebUtils.getTraceId(exchange);
log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign, LogService.BIZ_ID, traceId);
// log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign, LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.warn("{} request authority: dedicated line id {}, timestamp {}, sign {} invalid", traceId, dedicatedLineId, timestamp, sign);
return Result.fail("request sign invalid");
}
return Result.succ();
@@ -128,7 +126,9 @@ public class DedicatedLineController {
if (log.isDebugEnabled()) {
StringBuilder sb = ThreadContext.getStringBuilder();
WebUtils.response2stringBuilder(traceId, remoteResp, sb);
log.debug(sb.toString(), LogService.BIZ_ID, traceId);
// log.debug(sb.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(sb.toString());
}
return response.writeWith ( remoteResp.body(BodyExtractors.toDataBuffers()) )
.doOnError ( throwable -> cleanup(remoteResp) )
@@ -163,7 +163,9 @@ public class DedicatedLineController {
if (log.isDebugEnabled()) {
StringBuilder sb = ThreadContext.getStringBuilder();
WebUtils.response2stringBuilder(traceId, remoteResp, sb);
log.debug(sb.toString(), LogService.BIZ_ID, traceId);
// log.debug(sb.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(sb.toString());
}
return response.writeWith ( remoteResp.body(BodyExtractors.toDataBuffers()) )
.doOnError ( throwable -> cleanup(remoteResp) )

View File

@@ -17,16 +17,10 @@
package we.filter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Resource;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
@@ -41,10 +35,6 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -55,12 +45,20 @@ import we.fizz.AggregateResult;
import we.fizz.ConfigLoader;
import we.fizz.Pipeline;
import we.fizz.input.Input;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.util.Consts;
import we.util.MapUtil;
import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* @author Francis Dong
*/
@@ -88,7 +86,7 @@ public class AggregateFilter implements WebFilter {
} else {
byte act = WebUtils.getApiConfigType(exchange);
if (act == ApiConfig.Type.UNDEFINED) {
String p = exchange.getRequest().getPath().value();
String p = exchange.getRequest().getURI().getPath();
if (StringUtils.startsWith(p, SystemConfig.DEFAULT_GATEWAY_TEST_PREFIX0)) {
if (systemConfig.isAggregateTestAuth()) {
return chain.filter(exchange);
@@ -140,7 +138,8 @@ public class AggregateFilter implements WebFilter {
// traceId
final String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
LOGGER.debug("{} matched api in aggregation: {}", traceId, path);
@@ -184,7 +183,8 @@ public class AggregateFilter implements WebFilter {
}
}
return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> {
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
if (aggResult.getHttpStatus() != null) {
serverHttpResponse.setRawStatusCode(aggResult.getHttpStatus());
}

View File

@@ -35,7 +35,6 @@ import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.CallbackConfig;
import we.plugin.auth.GatewayGroupService;
@@ -63,6 +62,7 @@ import java.util.List;
public class CallbackFilter extends FizzWebFilter {
private static final Logger log = LoggerFactory.getLogger(CallbackFilter.class);
private static final Logger CALLBACK_LOGGER = LoggerFactory.getLogger("callback");
public static final String CALLBACK_FILTER = "callbackFilter";
@@ -226,7 +226,8 @@ public class CallbackFilter extends FizzWebFilter {
b.append(Consts.S.RIGHT_BRACE);
String msg = b.toString();
if ("kafka".equals(callbackFilterProperties.getDest())) { // for internal use
log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(callbackFilterProperties.getQueue()));
// log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(callbackFilterProperties.getQueue()));
CALLBACK_LOGGER.info(msg);
} else {
rt.convertAndSend(callbackFilterProperties.getQueue(), msg).subscribe();
}

View File

@@ -37,7 +37,6 @@ import we.exception.ExecuteScriptException;
import we.exception.RedirectException;
import we.exception.StopAndResponseException;
import we.fizz.exception.FizzRuntimeException;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.util.Consts;
import we.util.JacksonUtils;
@@ -45,7 +44,6 @@ import we.util.ThreadContext;
import we.util.WebUtils;
import java.net.URI;
import java.util.concurrent.TimeoutException;
/**
* @author hongqiaowei
@@ -113,7 +111,9 @@ public class FilterExceptionHandlerConfig {
if (t instanceof FizzRuntimeException) {
FizzRuntimeException ex = (FizzRuntimeException) t;
log.error(traceId + ' ' + tMsg, LogService.BIZ_ID, traceId, ex);
// log.error(traceId + ' ' + tMsg, LogService.BIZ_ID, traceId, ex);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(traceId + ' ' + tMsg, ex);
respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
if (ex.getStepContext() != null && ex.getStepContext().returnContext()) {
@@ -130,7 +130,9 @@ public class FilterExceptionHandlerConfig {
if (fc == null) { // t came from flow control filter
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
log.error(b.toString(), LogService.BIZ_ID, traceId, t);
// log.error(b.toString(), LogService.BIZ_ID, traceId, t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString(), t);
String s = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(), tMsg, traceId);
respHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes())));

View File

@@ -24,9 +24,8 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.ThreadContext;
import we.util.WebUtils;
@@ -55,7 +54,9 @@ public class FizzLogFilter implements WebFilter {
WebUtils.request2stringBuilder(exchange, b);
b.append(resp).append(exchange.getResponse().getStatusCode())
.append(in) .append(System.currentTimeMillis() - start);
log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange));
// log.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange));
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
log.info(b.toString());
}
}
);

View File

@@ -32,7 +32,6 @@ import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.monitor.FizzMonitorService;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AppService;
@@ -115,7 +114,7 @@ public class FlowControlFilter extends FizzWebFilter {
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String path = request.getPath().value();
String path = request.getURI().getPath();
boolean adminReq = false, proxyTestReq = false, fizzApiReq = false, favReq = false;
if (path.equals(favPath)) {
exchange.getAttributes().put(WebUtils.FAV_REQUEST, Consts.S.EMPTY);
@@ -148,7 +147,8 @@ public class FlowControlFilter extends FizzWebFilter {
if (!favReq && flowControlFilterProperties.isFlowControl() && !adminReq && !proxyTestReq && !fizzApiReq) {
String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
if (!apiConfigService.serviceConfigMap.containsKey(service)) {
String json = WebUtils.jsonRespBody(HttpStatus.FORBIDDEN.value(), "no service " + service + " in flow config", traceId);
return WebUtils.responseJson(exchange, HttpStatus.FORBIDDEN, null, json);
@@ -169,7 +169,8 @@ public class FlowControlFilter extends FizzWebFilter {
String blockedResourceId = result.getBlockedResourceId();
if (BlockType.CIRCUIT_BREAK == result.getBlockType()) {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.CIRCUIT_BREAK_ALARM, null, currentTimeMillis);
log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
// log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} trigger {} circuit breaker limit", traceId, blockedResourceId);
String responseContentType = flowControlFilterProperties.getDegradeDefaultResponseContentType();
String responseContent = flowControlFilterProperties.getDegradeDefaultResponseContent();
@@ -200,10 +201,12 @@ public class FlowControlFilter extends FizzWebFilter {
} else {
if (BlockType.CONCURRENT_REQUEST == result.getBlockType()) {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, concurrents, currentTimeMillis);
log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
// log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} exceed {} flow limit, blocked by maximum concurrent requests", traceId, blockedResourceId);
} else {
fizzMonitorService.sendAlarm(service, path, FizzMonitorService.RATE_LIMIT_ALARM, qps, currentTimeMillis);
log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
// log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId, LogService.BIZ_ID, traceId);
log.info("{} exceed {} flow limit, blocked by maximum QPS", traceId, blockedResourceId);
}
ResourceRateLimitConfig c = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.NODE_RESOURCE);
@@ -236,7 +239,8 @@ public class FlowControlFilter extends FizzWebFilter {
if (t instanceof TimeoutException) {
statusCode = HttpStatus.GATEWAY_TIMEOUT;
}
if (s == SignalType.ON_ERROR || statusCode.is4xxClientError() || statusCode.is5xxServerError()) {
// if (s == SignalType.ON_ERROR || statusCode.is4xxClientError() || statusCode.is5xxServerError()) {
if (s == SignalType.ON_ERROR || statusCode.is5xxServerError()) {
flowStat.addRequestRT(resourceConfigs, currentTimeSlot, rt, false, statusCode);
if (cb != null) {
cb.transit(CircuitBreaker.State.RESUME_DETECTIVE, CircuitBreaker.State.OPEN, currentTimeSlot, flowStat);
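
A behavioural detail in the hunk above: 4xx responses no longer count as failures when feeding the flow statistics, only reactive ON_ERROR signals and 5xx responses do. A hypothetical predicate capturing the new classification (class and method names are illustrative, not part of the codebase):

import org.springframework.http.HttpStatus;
import reactor.core.publisher.SignalType;

public class FailureClassificationSketch {

    // mirrors the updated condition in FlowControlFilter
    static boolean countsAsFailure(SignalType signal, HttpStatus status) {
        return signal == SignalType.ON_ERROR || status.is5xxServerError();
    }
}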

View File

@@ -34,7 +34,6 @@ import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.proxy.FizzWebClient;
import we.proxy.Route;
@@ -78,11 +77,14 @@ public class RouteFilter extends FizzWebFilter {
if (resp == null) { // should not reach here
ServerHttpRequest clientReq = exchange.getRequest();
String traceId = WebUtils.getTraceId(exchange);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
String msg = traceId + ' ' + pfr.id + " fail";
if (pfr.cause == null) {
log.error(msg, LogService.BIZ_ID, traceId);
// log.error(msg, LogService.BIZ_ID, traceId);
log.error(msg);
} else {
log.error(msg, LogService.BIZ_ID, traceId, pfr.cause);
// log.error(msg, LogService.BIZ_ID, traceId, pfr.cause);
log.error(msg, pfr.cause);
}
HttpStatus s = HttpStatus.INTERNAL_SERVER_ERROR;
if (!SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) {
@@ -202,7 +204,9 @@ public class RouteFilter extends FizzWebFilter {
StringBuilder b = ThreadContext.getStringBuilder();
String traceId = WebUtils.getTraceId(exchange);
WebUtils.response2stringBuilder(traceId, remoteResp, b);
log.debug(b.toString(), LogService.BIZ_ID, traceId);
// log.debug(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(b.toString());
}
return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers()))
.doOnError(throwable -> cleanup(remoteResp)).doOnCancel(() -> cleanup(remoteResp));
@@ -265,7 +269,9 @@ public class RouteFilter extends FizzWebFilter {
if (ls[0] != null) {
b.append('\n').append(ls[0]);
}
log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t);
// log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
log.error(b.toString(), t);
}
)
;

View File

@@ -18,6 +18,7 @@
package we.fizz;
import com.alibaba.fastjson.JSON;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
@@ -30,9 +31,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.fizz.input.Input;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.MapUtil;
import we.util.Utils;
import we.util.WebUtils;
@@ -72,8 +72,8 @@ public class AggregateService {
Pipeline pipeline = aggregateResource.getPipeline();
Input input = aggregateResource.getInput();
Map<String, Object> hs = MapUtil.toHashMap(headers);
// String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug("matched aggregation api: {}", pash);
Map<String, Object> clientInput = new HashMap<>();
clientInput.put("path", pash);
@@ -102,7 +102,8 @@ public class AggregateService {
public Mono<? extends Void> genAggregateResponse(ServerWebExchange exchange, AggregateResult ar) {
ServerHttpResponse clientResp = exchange.getResponse();
String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
String js = null;
if(ar.getBody() instanceof String) {
js = (String) ar.getBody();

View File

@@ -19,33 +19,29 @@ package we.fizz;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AppConfigProperties;
import we.fizz.input.*;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import we.flume.clients.log4j2appender.LogService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AppConfigProperties;
import we.fizz.input.ClientInputConfig;
import we.fizz.input.Input;
import we.fizz.input.InputFactory;
import we.fizz.input.InputType;
import we.util.Consts;
import we.util.ReactorUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import static we.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE;
import static we.util.Consts.S.FORWARD_SLASH;
import static we.util.Consts.S.FORWARD_SLASH_STR;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -54,6 +50,10 @@ import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static we.config.AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE;
import static we.util.Consts.S.FORWARD_SLASH;
import static we.util.Consts.S.FORWARD_SLASH_STR;
/**
*
* @author Francis Dong
@@ -245,7 +245,10 @@ public class ConfigLoader {
return Flux.just(entry);
}
String configStr = (String) entry.getValue();
LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr, LogService.BIZ_ID, k.toString());
// LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr, LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
LOGGER.info("aggregate config: " + k.toString() + Consts.S.COLON + configStr);
try {
this.addConfig(configStr, aggregateResourcesTmp, resourceKey2ConfigInfoMapTmp, aggregateId2ResourceKeyMapTmp);

View File

@@ -17,28 +17,17 @@
package we.fizz;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javax.script.ScriptException;
import com.alibaba.fastjson.JSON;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.util.Pair;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.multipart.FilePart;
import we.schema.util.I18nUtils;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.constants.CommonConstants;
@@ -52,13 +41,10 @@ import we.fizz.component.StepContextPosition;
import we.fizz.exception.FizzRuntimeException;
import we.fizz.field.FieldConfig;
import we.fizz.field.ValueTypeEnum;
import we.fizz.input.ClientInputConfig;
import we.fizz.input.Input;
import we.fizz.input.InputConfig;
import we.fizz.input.PathMapping;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.fizz.input.*;
import we.schema.util.I18nUtils;
import we.schema.util.PropertiesSupportUtils;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.JsonSchemaUtils;
import we.util.MapUtil;
@@ -66,6 +52,9 @@ import we.xml.JsonToXml;
import we.xml.XmlToJson;
import we.xml.XmlToJson.Builder;
import javax.script.ScriptException;
import java.util.*;
/**
*
* @author linwaiwai
@@ -197,7 +186,8 @@ public class Pipeline {
AggregateResult aggResult = this.doInputDataMapping(input, null);
this.stepContext.addElapsedTime(input.getName()+"聚合接口响应结果数据转换", System.currentTimeMillis() - t3);
if(this.stepContext.isDebug() || LOGGER.isDebugEnabled()) {
LogService.setBizId(this.stepContext.getTraceId());
// LogService.setBizId(this.stepContext.getTraceId());
ThreadContext.put(Consts.TRACE_ID, this.stepContext.getTraceId());
String jsonString = JSON.toJSONString(aggResult);
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("aggResult {}", jsonString);

View File

@@ -16,23 +16,23 @@
*/
package we.fizz.input;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.script.ScriptException;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @author linwaiwai
@@ -80,7 +80,8 @@ public class RPCInput extends Input {
Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class);
return needRun != null ? needRun : Boolean.TRUE;
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e);
throw new ExecuteScriptException(e, stepContext, condition);
}

View File

@@ -17,18 +17,12 @@
package we.fizz.input.extension.dubbo;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import javax.script.ScriptException;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import we.FizzAppContext;
@@ -37,18 +31,17 @@ import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.exception.FizzRuntimeException;
import we.fizz.input.InputConfig;
import we.fizz.input.InputContext;
import we.fizz.input.InputType;
import we.fizz.input.PathMapping;
import we.fizz.input.RPCInput;
import we.fizz.input.RPCResponse;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.fizz.input.*;
import we.proxy.dubbo.ApacheDubboGenericService;
import we.proxy.dubbo.DubboInterfaceDeclaration;
import we.util.Consts;
import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
*
* @author linwaiwai
@@ -159,7 +152,8 @@ public class DubboInput extends RPCInput {
body.putAll((Map<String, Object>) reqBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -180,7 +174,8 @@ public class DubboInput extends RPCInput {
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", this.getApiName(), ex);
inputContext.getStepContext().addElapsedTime(this.getApiName() + " failed ", elapsedMillis);
}
@@ -223,7 +218,8 @@ public class DubboInput extends RPCInput {
body.putAll((Map<String, Object>) respBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}

View File

@@ -18,11 +18,10 @@
package we.fizz.input.extension.grpc;
import com.alibaba.fastjson.JSON;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import we.FizzAppContext;
@@ -32,18 +31,17 @@ import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.exception.FizzRuntimeException;
import we.fizz.input.*;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.grpc.GrpcGenericService;
import we.proxy.grpc.GrpcInstanceService;
import we.proxy.grpc.GrpcInterfaceDeclaration;
import we.util.Consts;
import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import javax.script.ScriptException;
/**
*
* @author linwaiwai
@@ -153,7 +151,8 @@ public class GrpcInput extends RPCInput implements IInput {
body.putAll((Map<String, Object>) reqBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -174,7 +173,8 @@ public class GrpcInput extends RPCInput implements IInput {
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", this.getApiName(), ex);
inputContext.getStepContext().addElapsedTime(this.getApiName() + " failed ", elapsedMillis);
}
@@ -218,7 +218,8 @@ public class GrpcInput extends RPCInput implements IInput {
body.putAll((Map<String, Object>) respBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg),
e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);

View File

@@ -17,16 +17,9 @@
package we.fizz.input.extension.request;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.script.ScriptException;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,24 +31,13 @@ import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
import com.alibaba.fastjson.JSON;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.StepResponse;
import we.fizz.input.IInput;
import we.fizz.input.InputConfig;
import we.fizz.input.InputContext;
import we.fizz.input.InputType;
import we.fizz.input.PathMapping;
import we.fizz.input.RPCInput;
import we.fizz.input.RPCResponse;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.fizz.input.*;
import we.proxy.FizzWebClient;
import we.proxy.http.HttpInstanceService;
import we.service_registry.RegistryCenterService;
@@ -67,6 +49,14 @@ import we.xml.JsonToXml;
import we.xml.XmlToJson;
import we.xml.XmlToJson.Builder;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
* @author linwaiwai
@@ -192,7 +182,8 @@ public class RequestInput extends RPCInput implements IInput{
body.putAll((Map<String, Object>) reqBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -331,7 +322,8 @@ public class RequestInput extends RPCInput implements IInput{
body.putAll((Map<String, Object>) respBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -492,7 +484,8 @@ public class RequestInput extends RPCInput implements IInput{
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", request.get("url"), ex);
synchronized (inputContext.getStepContext()) {
inputContext.getStepContext().addElapsedTime(
@@ -554,7 +547,8 @@ public class RequestInput extends RPCInput implements IInput{
protected void doOnBodySuccess(Object resp, long elapsedMillis) {
if(inputContext.getStepContext().isDebug()) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"),
JSON.toJSONString(this.request.get("headers")),
JSON.toJSONString(this.request.get("body")), resp);

View File

@@ -16,6 +16,7 @@
*/
package we.log;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.*;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
@@ -25,7 +26,7 @@ import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import we.FizzAppContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.NetworkUtils;
import java.io.Serializable;
@@ -90,7 +91,7 @@ public class LogSendAppenderWithLog4j2 extends AbstractAppender {
}
private String getBizId(Object[] parameters) {
Object bizId = LogService.getBizId();
/*Object bizId = LogService.getBizId();
if (parameters != null) {
for (int i = parameters.length - 1; i > -1; --i) {
Object p = parameters[i];
@@ -105,7 +106,13 @@ public class LogSendAppenderWithLog4j2 extends AbstractAppender {
if (bizId == null) {
return "";
}
return bizId.toString();
return bizId.toString();*/
String traceId = ThreadContext.get(Consts.TRACE_ID);
if (traceId == null) {
return Consts.S.EMPTY;
}
return traceId;
}
@PluginFactory
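
Since the appender no longer scans log arguments for LogService.BIZ_ID, the biz id is simply whatever sits in the log4j2 context map at append time. A small standalone check of that lookup path, assuming the plain key "traceId" in place of Consts.TRACE_ID:

import org.apache.logging.log4j.ThreadContext;

public class BizIdLookupSketch {

    static String currentBizId() {
        String traceId = ThreadContext.get("traceId");
        return traceId == null ? "" : traceId;
    }

    public static void main(String[] args) {
        System.out.println("[" + currentBizId() + "]"); // "[]" before anything is put
        ThreadContext.put("traceId", "req-123");
        System.out.println("[" + currentBizId() + "]"); // "[req-123]"
        ThreadContext.clearMap();
    }
}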

View File

@@ -1,94 +1,94 @@
package we.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.LogbackException;
import lombok.Getter;
import lombok.Setter;
import we.FizzAppContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.NetworkUtils;
import static we.log.LogSendAppender.*;
/**
* log send appender with logback
*
* @author huahuang
*/
public class LogSendAppenderWithLogback extends AppenderBase<ILoggingEvent> {
// responsible for converting the logging event to a string; getter and setter methods are required
@Getter
@Setter
private Layout<ILoggingEvent> layout;
@Override
protected void append(ILoggingEvent event) {
try {
if (logEnabled != null && !logEnabled) {
return;
}
if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) {
// local cache
logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend(
this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), event.getLevel().levelInt,
event.getTimeStamp(), this.getLayout().doLayout(event));
return;
}
if (logEnabled == null && logSendService == null) {
// no legal logSendService, discard the local cache
logEnabled = Boolean.FALSE;
logSends = null;
return;
}
if (logEnabled == null) {
logEnabled = Boolean.TRUE;
LogSend[] logSends;
synchronized (LogSendAppender.class) {
logSends = LogSendAppender.logSends;
LogSendAppender.logSends = null;
}
// logSendService is ready, send the local cache
if (logSends != null) {
int size = Math.min(logSendIndex.get(), logSends.length);
for (int i = 0; i < size; i++) {
logSendService.send(logSends[i]);
}
}
}
LogSend logSend = new LogSend(this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(),
event.getLevel().levelInt, event.getTimeStamp(), this.getLayout().doLayout(event));
logSendService.send(logSend);
} catch (Exception ex) {
throw new LogbackException(event.getFormattedMessage(), ex);
}
}
private String getBizId(Object[] parameters) {
Object bizId = LogService.getBizId();
if (parameters != null) {
for (int i = parameters.length - 1; i > -1; --i) {
Object p = parameters[i];
if (p == LogService.BIZ_ID) {
if (i != parameters.length - 1) {
bizId = parameters[i + 1];
}
break;
}
}
}
if (bizId == null) {
return "";
}
return bizId.toString();
}
}
//package we.log;
//
//import ch.qos.logback.classic.spi.ILoggingEvent;
//import ch.qos.logback.core.AppenderBase;
//import ch.qos.logback.core.Layout;
//import ch.qos.logback.core.LogbackException;
//import lombok.Getter;
//import lombok.Setter;
//import we.FizzAppContext;
//import we.flume.clients.log4j2appender.LogService;
//import we.util.NetworkUtils;
//
//import static we.log.LogSendAppender.*;
//
///**
// * log send appender with logback
// *
// * @author huahuang
// */
//public class LogSendAppenderWithLogback extends AppenderBase<ILoggingEvent> {
//
// // responsible for converting the logging event to a string; getter and setter methods are required
// @Getter
// @Setter
// private Layout<ILoggingEvent> layout;
//
// @Override
// protected void append(ILoggingEvent event) {
// try {
// if (logEnabled != null && !logEnabled) {
// return;
// }
//
// if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) {
// // local cache
// logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend(
// this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), event.getLevel().levelInt,
// event.getTimeStamp(), this.getLayout().doLayout(event));
// return;
// }
//
// if (logEnabled == null && logSendService == null) {
// // no legal logSendService, discard the local cache
// logEnabled = Boolean.FALSE;
// logSends = null;
// return;
// }
//
// if (logEnabled == null) {
// logEnabled = Boolean.TRUE;
//
// LogSend[] logSends;
// synchronized (LogSendAppender.class) {
// logSends = LogSendAppender.logSends;
// LogSendAppender.logSends = null;
// }
//
// // logSendService is ready, send the local cache
// if (logSends != null) {
// int size = Math.min(logSendIndex.get(), logSends.length);
// for (int i = 0; i < size; i++) {
// logSendService.send(logSends[i]);
// }
// }
// }
//
// LogSend logSend = new LogSend(this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(),
// event.getLevel().levelInt, event.getTimeStamp(), this.getLayout().doLayout(event));
// logSendService.send(logSend);
// } catch (Exception ex) {
// throw new LogbackException(event.getFormattedMessage(), ex);
// }
// }
//
// private String getBizId(Object[] parameters) {
// Object bizId = LogService.getBizId();
// if (parameters != null) {
// for (int i = parameters.length - 1; i > -1; --i) {
// Object p = parameters[i];
// if (p == LogService.BIZ_ID) {
// if (i != parameters.length - 1) {
// bizId = parameters[i + 1];
// }
// break;
// }
// }
// }
// if (bizId == null) {
// return "";
// }
// return bizId.toString();
// }
//
//}

View File

@@ -23,7 +23,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.ThreadContext;
@@ -36,7 +35,7 @@ import javax.annotation.Resource;
@Service
public class FizzMonitorService {
private static final Logger LOGGER = LoggerFactory.getLogger(FizzMonitorService.class);
private static final Logger LOGGER = LoggerFactory.getLogger("monitor");
public static final byte ERROR_ALARM = 1;
public static final byte TIMEOUT_ALARM = 2;
@@ -76,8 +75,9 @@ public class FizzMonitorService {
b.append(_timestamp) .append(timestamp);
b.append(Consts.S.RIGHT_BRACE);
String msg = b.toString();
if (Consts.KAFKA.equals(dest)) { // for internal use
LOGGER.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
if (Consts.KAFKA.equals(dest)) {
// LOGGER.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
LOGGER.info(msg);
} else {
rt.convertAndSend(queue, msg).subscribe();
}

View File

@@ -17,16 +17,15 @@
package we.plugin;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.filter.FilterResult;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.util.Consts;
import we.util.WebUtils;
import java.util.Map;
@@ -53,8 +52,10 @@ public abstract class PluginFilter implements FizzPluginFilter {
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config, String fixedConfig) {
FilterResult pfr = WebUtils.getPrevFilterResult(exchange);
String traceId = WebUtils.getTraceId(exchange);
ThreadContext.put(Consts.TRACE_ID, traceId);
if (log.isDebugEnabled()) {
log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, traceId);
// log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, traceId);
log.debug(traceId + ' ' + this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"));
}
if (pfr.success) {
return doFilter(exchange, config, fixedConfig);
@@ -62,9 +63,9 @@ public abstract class PluginFilter implements FizzPluginFilter {
if (WebUtils.getDirectResponse(exchange) == null) { // should not reach here
String msg = traceId + ' ' + pfr.id + " fail";
if (pfr.cause == null) {
log.error(msg, LogService.BIZ_ID, traceId);
log.error(msg);
} else {
log.error(msg, LogService.BIZ_ID, traceId, pfr.cause);
log.error(msg, pfr.cause);
}
HttpStatus s = HttpStatus.OK;
if (SystemConfig.FIZZ_ERR_RESP_HTTP_STATUS_ENABLE) {

View File

@@ -23,7 +23,6 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
@@ -149,7 +148,9 @@ public class ApiConfig2appsService {
.doOnNext(
msg -> {
String json = msg.getMessage();
log.info("apiConfig2apps: " + json, LogService.BIZ_ID, "ac2as" + System.currentTimeMillis());
// log.info("apiConfig2apps: " + json, LogService.BIZ_ID, "ac2as" + System.currentTimeMillis());
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, "ac2as" + System.currentTimeMillis());
log.info("apiConfig2apps: " + json);
try {
ApiConfig2apps data = JacksonUtils.readValue(json, ApiConfig2apps.class);
updateApiConfig2appsMap(data);

View File

@@ -35,12 +35,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.util.*;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -110,7 +110,11 @@ public class ApiConfigService implements ApplicationListener<ContextRefreshedEve
return Flux.just(e);
}
String json = (String) e.getValue();
log.info("init api config: {}", json, LogService.BIZ_ID, k.toString());
// log.info("init api config: {}", json, LogService.BIZ_ID, k.toString());
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, k.toString());
log.info("init api config: {}", json);
try {
ApiConfig ac = JacksonUtils.readValue(json, ApiConfig.class);
apiConfigMapTmp.put(ac.id, ac);
@@ -156,7 +160,9 @@ public class ApiConfigService implements ApplicationListener<ContextRefreshedEve
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info("api config change: {}", json, LogService.BIZ_ID, "acc" + System.currentTimeMillis());
// log.info("api config change: {}", json, LogService.BIZ_ID, "acc" + System.currentTimeMillis());
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, "acc" + System.currentTimeMillis());
log.info("api config change: {}", json);
try {
ApiConfig ac = JacksonUtils.readValue(json, ApiConfig.class);
ApiConfig r = apiConfigMap.remove(ac.id);
@@ -430,7 +436,8 @@ public class ApiConfigService implements ApplicationListener<ContextRefreshedEve
public Mono<Result<ApiConfig>> auth(ServerWebExchange exchange) {
ServerHttpRequest req = exchange.getRequest();
LogService.setBizId(WebUtils.getTraceId(exchange));
// LogService.setBizId(WebUtils.getTraceId(exchange));
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
boolean dedicatedLineRequest = WebUtils.isDedicatedLineRequest(exchange);
return auth(exchange, dedicatedLineRequest,
WebUtils.getAppId(exchange), WebUtils.getOriginIp(exchange), WebUtils.getTimestamp(exchange), WebUtils.getSign(exchange),
@@ -509,22 +516,29 @@ public class ApiConfigService implements ApplicationListener<ContextRefreshedEve
if (StringUtils.isAnyBlank(timestamp, sign)) {
r.code = Result.FAIL;
r.msg = a.app + " not present timestamp " + timestamp + " or sign " + sign;
} else if (validate(a.app, timestamp, a.secretkey, sign)) {
} else {
r.code = Result.FAIL;
r.msg = a.app + " sign " + sign + " invalid";
long ts = Long.parseLong(timestamp);
LocalDateTime now = LocalDateTime.now();
long timeliness = systemConfig.fizzMD5signTimestampTimeliness();
long start = DateTimeUtils.toMillis(now.minusSeconds(timeliness));
long end = DateTimeUtils.toMillis(now.plusSeconds(timeliness));
if (start <= ts && ts <= end) {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(a.app) .append(Consts.S.UNDER_LINE)
.append(timestamp).append(Consts.S.UNDER_LINE)
.append(a.secretkey);
if (!sign.equalsIgnoreCase(DigestUtils.md532(b.toString()))) {
r.code = Result.FAIL;
r.msg = a.app + " sign " + sign + " invalid";
}
} else {
r.code = Result.FAIL;
r.msg = a.app + " timestamp " + timestamp + " invalid";
}
}
return Mono.just(r);
}
private boolean validate(String app, String timestamp, String secretKey, String sign) {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(app) .append(Consts.S.UNDER_LINE)
.append(timestamp).append(Consts.S.UNDER_LINE)
.append(secretKey);
return sign.equalsIgnoreCase(DigestUtils.md532(b.toString()));
}
private Mono<Result<ApiConfig>> authSecretKey(App a, String sign, Result<ApiConfig> r) {
if (StringUtils.isBlank(sign)) {
r.code = Result.FAIL;

View File

@@ -17,6 +17,7 @@
package we.plugin.auth;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@@ -24,7 +25,7 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
@@ -80,7 +81,9 @@ public class AppService {
return Flux.just(e);
}
String json = (String) e.getValue();
log.info("init app: {}", json, LogService.BIZ_ID, k.toString());
// log.info("init app: {}", json, LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
log.info("init app: {}", json);
try {
App app = JacksonUtils.readValue(json, App.class);
oldAppMapTmp.put(app.id, app);
@@ -125,7 +128,9 @@ public class AppService {
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info("app change: " + json, LogService.BIZ_ID, "ac" + System.currentTimeMillis());
// log.info("app change: " + json, LogService.BIZ_ID, "ac" + System.currentTimeMillis());
ThreadContext.put(Consts.TRACE_ID, "ac" + System.currentTimeMillis());
log.info("app change: " + json);
try {
App app = JacksonUtils.readValue(json, App.class);
App r = oldAppMap.remove(app.id);

View File

@@ -17,13 +17,14 @@
package we.plugin.auth;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.PluginFilter;
import we.util.Consts;
import we.util.WebUtils;
import javax.annotation.Resource;
@@ -53,7 +54,9 @@ public class AuthPluginFilter extends PluginFilter {
r -> {
if (log.isDebugEnabled()) {
String traceId = WebUtils.getTraceId(exchange);
log.debug("{} req auth: {}", traceId, r, LogService.BIZ_ID, traceId);
// log.debug("{} req auth: {}", traceId, r, LogService.BIZ_ID, traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug("{} req auth: {}", traceId, r);
}
Map<String, Object> data = Collections.singletonMap(RESULT, r);
return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, AUTH_PLUGIN_FILTER, data);

View File

@@ -17,6 +17,7 @@
package we.plugin.auth;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
@@ -25,7 +26,7 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.NetworkUtils;
import we.util.ReactorUtils;
@@ -87,7 +88,9 @@ public class GatewayGroupService {
return Flux.just(e);
}
String json = (String) e.getValue();
log.info(json, LogService.BIZ_ID, k.toString());
// log.info(json, LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
log.info(json);
try {
GatewayGroup gg = JacksonUtils.readValue(json, GatewayGroup.class);
oldGatewayGroupMapTmp.put(gg.id, gg);
@@ -133,7 +136,9 @@ public class GatewayGroupService {
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info(json, LogService.BIZ_ID, "gg" + System.currentTimeMillis());
// log.info(json, LogService.BIZ_ID, "gg" + System.currentTimeMillis());
ThreadContext.put(Consts.TRACE_ID, "gg" + System.currentTimeMillis());
log.info(json);
try {
GatewayGroup gg = JacksonUtils.readValue(json, GatewayGroup.class);
GatewayGroup r = oldGatewayGroupMap.remove(gg.id);

View File

@@ -17,19 +17,19 @@
package we.plugin.requestbody;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
import we.spring.web.server.ext.FizzServerWebExchangeDecorator;
import we.util.Consts;
import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
@@ -76,7 +76,9 @@ public class RequestBodyPlugin implements FizzPluginFilter {
}
if (log.isDebugEnabled()) {
String traceId = WebUtils.getTraceId(exchange);
log.debug("{} request is decorated", traceId, LogService.BIZ_ID, traceId);
// log.debug("{} request is decorated", traceId, LogService.BIZ_ID, traceId);
ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug("{} request is decorated", traceId);
}
return doFilter(newExchange, config);
}

View File

@@ -0,0 +1,39 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.plugin.stat;
import we.util.JacksonUtils;
/**
* @author hongqiaowei
*/
public class AccessStat {
public String service;
public String apiMethod;
public String apiPath;
public long start;
public int reqs = 0;
public long reqTime;
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
}
}
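
For reference, a hypothetical record built from the new AccessStat class; the values are invented and Jackson's ObjectMapper stands in for we.util.JacksonUtils:

import com.fasterxml.jackson.databind.ObjectMapper;
import we.plugin.stat.AccessStat;

public class AccessStatExample {
    public static void main(String[] args) throws Exception {
        AccessStat stat = new AccessStat();
        stat.service = "order-service";
        stat.apiMethod = "GET";
        stat.apiPath = "/orders/{id}";
        stat.start = System.currentTimeMillis();
        stat.reqs = 42;
        stat.reqTime = 1870L;
        // e.g. {"service":"order-service","apiMethod":"GET","apiPath":"/orders/{id}",...}
        System.out.println(new ObjectMapper().writeValueAsString(stat));
    }
}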

View File

@@ -0,0 +1,84 @@
///*
// * Copyright (C) 2021 the original author or authors.
// *
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or
// * any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details.
// *
// * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <https://www.gnu.org/licenses/>.
// */
//
//package we.plugin.stat;
//
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
//import org.springframework.scheduling.annotation.Scheduled;
//import we.config.AggregateRedisConfig;
//import we.config.SchedConfig;
//import we.util.Consts;
//import we.util.DateTimeUtils;
//import we.util.StringUtils;
//
//import javax.annotation.Resource;
//import java.util.Map;
//
///**
// * @author hongqiaowei
// */
//
//@Configuration
//public class AccessStatSchedConfig extends SchedConfig {
//
// private static final Logger LOGGER = LoggerFactory.getLogger(AccessStatSchedConfig.class);
//
// private static final Logger STAT_LOGGER = LoggerFactory.getLogger("stat");
//
// @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
// private ReactiveStringRedisTemplate rt;
//
// @Resource
// private StatPluginFilterProperties statPluginFilterProperties;
//
// @Resource
// private StatPluginFilter statPluginFilter;
//
// @Scheduled(cron = "${fizz-access-stat-sched.cron:2/10 * * * * ?}")
// public void sched() {
// long prevTimeWinStart = DateTimeUtils.get10sTimeWinStart(2);
// Map<String, AccessStat> accessStatMap = statPluginFilter.getAccessStat(prevTimeWinStart);
//
// if (accessStatMap.isEmpty()) {
// if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("no access stat in {} window", DateTimeUtils.convert(prevTimeWinStart, Consts.DP.DP19));
// }
// } else {
// accessStatMap.forEach(
// (smp, accessStat) -> {
// String msg = accessStat.toString();
// String topic = statPluginFilterProperties.getFizzAccessStatTopic();
// if (StringUtils.isBlank(topic)) {
// String channel = statPluginFilterProperties.getFizzAccessStatChannel();
// rt.convertAndSend(channel, msg).subscribe();
// if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("send access stat {} which belong to {} window to channel {}", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19), channel);
// }
// } else {
// STAT_LOGGER.info(msg);
// if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("send access stat {} which belong to {} window to topic", msg, DateTimeUtils.convert(accessStat.start, Consts.DP.DP19));
// }
// }
// }
// );
// }
// }
//}

View File

@@ -25,7 +25,6 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.PluginFilter;
import we.plugin.auth.GatewayGroupService;
import we.util.Consts;
@@ -43,7 +42,7 @@ import java.util.Map;
@Component(StatPluginFilter.STAT_PLUGIN_FILTER)
public class StatPluginFilter extends PluginFilter {
private static final Logger log = LoggerFactory.getLogger(StatPluginFilter.class);
private static final Logger log = LoggerFactory.getLogger("stat");
public static final String STAT_PLUGIN_FILTER = "statPlugin";
@@ -93,7 +92,8 @@ public class StatPluginFilter extends PluginFilter {
if (StringUtils.isBlank(statPluginFilterProperties.getFizzAccessStatTopic())) {
rt.convertAndSend(statPluginFilterProperties.getFizzAccessStatChannel(), b.toString()).subscribe();
} else {
log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use
// log.warn(b.toString(), LogService.HANDLE_STGY, LogService.toKF(statPluginFilterProperties.getFizzAccessStatTopic())); // for internal use
log.info(b.toString());
}
}

View File

@@ -34,7 +34,6 @@ import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.fizz.AggregateResult;
import we.fizz.AggregateService;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.CallbackConfig;
@@ -85,7 +84,9 @@ public class CallbackService {
String traceId = WebUtils.getTraceId(exchange);
HttpMethod method = req.getMethod();
if (log.isDebugEnabled()) {
log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap), LogService.BIZ_ID, traceId);
// log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(traceId + " service2instMap: " + JacksonUtils.writeValueAsString(service2instMap));
}
int rs = cc.receivers.size();
Mono<Object>[] sends = new Mono[rs];
@@ -157,7 +158,9 @@ public class CallbackService {
b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR);
String traceId = WebUtils.getTraceId(exchange);
WebUtils.request2stringBuilder(traceId, method, r.service + Consts.S.FORWARD_SLASH + r.path, headers, body, b);
log.error(b.toString(), LogService.BIZ_ID, traceId, t);
// log.error(b.toString(), LogService.BIZ_ID, traceId, t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString(), t);
}
private String buildUri(ServerHttpRequest req, ServiceInstance si, String path) {
@@ -199,7 +202,9 @@ public class CallbackService {
StringBuilder b = ThreadContext.getStringBuilder();
String traceId = WebUtils.getTraceId(exchange);
WebUtils.response2stringBuilder(traceId, remoteResp, b);
log.debug(b.toString(), LogService.BIZ_ID, traceId);
// log.debug(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(b.toString());
}
return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers()))
.doOnError(throwable -> clean(remoteResp)).doOnCancel(() -> clean(remoteResp));
@@ -301,7 +306,9 @@ public class CallbackService {
b.append(req.service).append(Consts.S.FORWARD_SLASH).append(req.path);
b.append(Consts.S.LINE_SEPARATOR).append(callback).append(Consts.S.LINE_SEPARATOR);
WebUtils.request2stringBuilder(req.id, req.method, service + Consts.S.FORWARD_SLASH + path, req.headers, req.body, b);
log.error(b.toString(), LogService.BIZ_ID, req.id, t);
// log.error(b.toString(), LogService.BIZ_ID, req.id, t);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, req.id);
log.error(b.toString(), t);
}
private void clean(ClientResponse cr) {

View File

@@ -37,7 +37,6 @@ import we.config.ProxyWebClientConfig;
import we.config.SystemConfig;
import we.exception.ExternalService4xxException;
import we.fizz.exception.FizzRuntimeException;
import we.flume.clients.log4j2appender.LogService;
import we.service_registry.RegistryCenterService;
import we.util.Consts;
import we.util.ThreadContext;
@@ -215,7 +214,9 @@ public class FizzWebClient {
if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(traceId, method, uri, headers, null, b);
log.debug(b.toString(), LogService.BIZ_ID, traceId);
// log.debug(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.debug(b.toString());
}
WebClient.RequestBodyUriSpec requestBodyUriSpec = webClient.method(method);

View File

@@ -16,6 +16,7 @@
*/
package we.proxy;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@@ -24,19 +25,13 @@ import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -102,7 +97,9 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
return Flux.just(e);
}
Object v = e.getValue();
LOGGER.info(k.toString() + Consts.S.COLON + v.toString(), LogService.BIZ_ID, k.toString());
// LOGGER.info(k.toString() + Consts.S.COLON + v.toString(), LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
LOGGER.info(k.toString() + Consts.S.COLON + v.toString());
String json = (String) v;
try {
RpcService rpcService = JacksonUtils.readValue(json, RpcService.class);
@@ -194,7 +191,9 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
}
).doOnNext(msg -> {
String json = msg.getMessage();
LOGGER.info(json, LogService.BIZ_ID, "rpc" + System.currentTimeMillis());
// LOGGER.info(json, LogService.BIZ_ID, "rpc" + System.currentTimeMillis());
ThreadContext.put(Consts.TRACE_ID, "rpc" + System.currentTimeMillis());
LOGGER.info(json);
try {
RpcService rpcService = JacksonUtils.readValue(json, RpcService.class);
this.updateLocalCache(rpcService, serviceToInstancesMap, serviceToLoadBalanceTypeMap, idToRpcServiceMap,

View File

@@ -17,6 +17,7 @@
package we.stats.circuitbreaker;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@@ -25,12 +26,8 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.stats.FlowStat;
import we.util.JacksonUtils;
import we.util.ResourceIdUtils;
import we.util.Result;
import we.util.WebUtils;
import we.util.*;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@@ -210,12 +207,16 @@ public class CircuitBreakManager {
}
if (cb == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("no circuit breaker for {} {}", service, path, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
// LOGGER.debug("no circuit breaker for {} {}", service, path, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
LOGGER.debug("no circuit breaker for {} {}", service, path);
}
return true;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
// LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
ThreadContext.put(Consts.TRACE_ID, WebUtils.getTraceId(exchange));
LOGGER.debug("circuit breaker for {} {} is {}", service, path, cb);
}
return cb.permit(exchange, currentTimeWindow, flowStat);
}

View File

@@ -17,6 +17,7 @@
package we.stats.ratelimit;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
@@ -24,10 +25,9 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
import we.util.ThreadContext;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@@ -78,7 +78,9 @@ public class ResourceRateLimitConfigService {
return Flux.just(e);
}
String json = (String) e.getValue();
log.info("rateLimitConfig: " + json, LogService.BIZ_ID, k.toString());
// log.info("rateLimitConfig: " + json, LogService.BIZ_ID, k.toString());
ThreadContext.put(Consts.TRACE_ID, k.toString());
log.info("rateLimitConfig: " + json);
try {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
oldResourceRateLimitConfigMapTmp.put(rrlc.id, rrlc);
@@ -122,7 +124,9 @@ public class ResourceRateLimitConfigService {
}
).doOnNext(msg -> {
String json = msg.getMessage();
log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis());
// log.info("channel recv rate limit config: " + json, LogService.BIZ_ID, "rrlc" + System.currentTimeMillis());
ThreadContext.put(Consts.TRACE_ID, "rrlc" + System.currentTimeMillis());
log.info("channel recv rate limit config: " + json);
try {
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
ResourceRateLimitConfig r = oldResourceRateLimitConfigMap.remove(rrlc.id);

View File

@@ -34,7 +34,6 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.filter.FilterResult;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.AuthPluginFilter;
import we.proxy.Route;
@@ -332,7 +331,8 @@ public abstract class WebUtils {
public static String getClientReqPath(ServerWebExchange exchange) {
String p = exchange.getAttribute(clientRequestPath);
if (p == null) {
p = exchange.getRequest().getPath().value();
// p = exchange.getRequest().getPath().value();
p = exchange.getRequest().getURI().getPath();
int secFS = p.indexOf(Consts.S.FORWARD_SLASH, 1);
if (StringUtils.isBlank(gatewayPrefix) || Consts.S.FORWARD_SLASH_STR.equals(gatewayPrefix)) {
p = p.substring(secFS);
@@ -533,10 +533,13 @@ public abstract class WebUtils {
// }
b.append(Consts.S.LINE_SEPARATOR);
b.append(filter).append(Consts.S.SPACE).append(code).append(Consts.S.SPACE).append(msg);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
if (t == null) {
log.error(b.toString(), LogService.BIZ_ID, traceId);
// log.error(b.toString(), LogService.BIZ_ID, traceId);
log.error(b.toString());
} else {
log.error(b.toString(), LogService.BIZ_ID, traceId, t);
// log.error(b.toString(), LogService.BIZ_ID, traceId, t);
log.error(b.toString(), t);
Throwable[] suppressed = t.getSuppressed();
if (suppressed != null && suppressed.length != 0) {
log.error(StringUtils.EMPTY, suppressed[0]);
@@ -852,7 +855,9 @@ public abstract class WebUtils {
request2stringBuilder(exchange, b);
b.append(Consts.S.LINE_SEPARATOR);
b.append(filter).append(Consts.S.SPACE).append(httpStatus);
log.error(b.toString(), LogService.BIZ_ID, traceId);
// log.error(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString());
transmitFailFilterResult(exchange, filter);
return buildDirectResponseAndBindContext(exchange, httpStatus, new HttpHeaders(), Consts.S.EMPTY);
}
@@ -866,7 +871,9 @@ public abstract class WebUtils {
request2stringBuilder(exchange, b);
b.append(Consts.S.LINE_SEPARATOR);
b.append(filter).append(Consts.S.SPACE).append(httpStatus);
log.error(b.toString(), LogService.BIZ_ID, traceId);
// log.error(b.toString(), LogService.BIZ_ID, traceId);
org.apache.logging.log4j.ThreadContext.put(Consts.TRACE_ID, traceId);
log.error(b.toString());
transmitFailFilterResult(exchange, filter);
headers = headers == null ? new HttpHeaders() : headers;
content = StringUtils.isBlank(content) ? Consts.S.EMPTY : content;
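
Note: aside from the logging migration, the getClientReqPath hunk above now reads the path from exchange.getRequest().getURI().getPath() instead of getPath().value(). Whatever the motivation in this commit, one standard difference between the two accessors is percent-decoding: java.net.URI.getPath() returns the decoded path, while the raw request path keeps its encoding. A small standalone illustration (the host name is made up):

import java.net.URI;

public class RawVsDecodedPath {

    public static void main(String[] args) {
        URI uri = URI.create("http://gateway.example.com/proxy/a%20b/c");

        // URI.getPath() returns the decoded path component.
        System.out.println(uri.getPath());    // /proxy/a b/c

        // URI.getRawPath() keeps the percent-encoding, which is closer to the
        // raw value exposed by ServerHttpRequest.getPath().value().
        System.out.println(uri.getRawPath()); // /proxy/a%20b/c
    }
}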

View File

@@ -0,0 +1,37 @@
package we;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.config.ConfigurationSource;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
public class LogKafkaTests {
// @Test
void test() throws InterruptedException, IOException {
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
/*ConfigurationSource source = new ConfigurationSource(new FileInputStream("D:\\idea\\projects\\fizz-gateway-community\\fizz-core\\src\\test\\resources\\log4j2-test.xml"));
Configurator.initialize(null, source);
Logger logger4es = LogManager.getLogger(LogKafkaTests.class);*/
Logger logger4es = LoggerFactory.getLogger(LogKafkaTests.class);
Logger logger4kf = LoggerFactory.getLogger("monitor");
ThreadContext.put("traceId", "ti1");
// MDC.put("traceId", "ti0");
logger4es.warn("520");
logger4kf.warn("521");
Thread.currentThread().join();
}
}
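
Note: LogKafkaTests sets log4j2.isThreadContextMapInheritable=true before obtaining its loggers. With that system property, Log4j2 backs the ThreadContext map with an inheritable thread-local, so entries put by a parent thread are visible in threads spawned afterwards. A minimal sketch of that behaviour (class name and message are made up):

import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InheritableTraceIdSketch {

    public static void main(String[] args) throws InterruptedException {
        // Set before Log4j2 creates its ThreadContext map, as the test above does.
        System.setProperty("log4j2.isThreadContextMapInheritable", "true");
        Logger log = LoggerFactory.getLogger(InheritableTraceIdSketch.class);

        ThreadContext.put("traceId", "ti1");
        Thread child = new Thread(() ->
                // The child thread inherits the parent's ThreadContext entries, so a
                // %X{traceId} pattern converter would still print "ti1" here.
                log.warn("from child thread, traceId={}", ThreadContext.get("traceId")));
        child.start();
        child.join();
    }
}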

View File

@@ -0,0 +1,52 @@
package we;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import we.config.RedisReactiveConfig;
import we.config.RedisReactiveProperties;
public class RedisClusterTests {
// @Test
void test() throws InterruptedException {
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
Logger LOGGER = LogManager.getLogger(RedisClusterTests.class);
RedisReactiveProperties redisReactiveProperties = new RedisReactiveProperties() {
};
redisReactiveProperties.setType(RedisReactiveProperties.CLUSTER);
redisReactiveProperties.setPassword("123456");
redisReactiveProperties.setClusterNodes("ip:port");
RedisReactiveConfig redisReactiveConfig = new RedisReactiveConfig(redisReactiveProperties) {
};
LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) redisReactiveConfig.lettuceConnectionFactory();
lettuceConnectionFactory.afterPropertiesSet();
ReactiveStringRedisTemplate reactiveStringRedisTemplate = redisReactiveConfig.reactiveStringRedisTemplate(lettuceConnectionFactory);
reactiveStringRedisTemplate.opsForValue().set("hqw", "lancer").block();
String channel = "ch1";
reactiveStringRedisTemplate.listenToChannel(channel)
.doOnError(
t -> {
LOGGER.error("lsn channel {} error", channel, t);
}
)
.doOnSubscribe(
s -> {
LOGGER.info("success to lsn on {}", channel);
}
)
.doOnNext(
msg -> {
String message = msg.getMessage();
LOGGER.info("receive message: {}", message);
}
)
.subscribe();
Thread.currentThread().join();
}
}
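
Note: RedisClusterTests only listens on channel ch1; a publishing counterpart using the same ReactiveStringRedisTemplate API is sketched below, assuming a template built exactly as in the test above.

import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.publisher.Mono;

public class RedisChannelPublisherSketch {

    private final ReactiveStringRedisTemplate reactiveStringRedisTemplate;

    public RedisChannelPublisherSketch(ReactiveStringRedisTemplate reactiveStringRedisTemplate) {
        this.reactiveStringRedisTemplate = reactiveStringRedisTemplate;
    }

    Mono<Long> publish(String message) {
        // convertAndSend completes with the number of subscribers that received the message.
        return reactiveStringRedisTemplate.convertAndSend("ch1", message);
    }
}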

View File

@@ -1,18 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<properties>
<property name="APP_NAME">fizz-core</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %X{traceId} %msg%n" />
</Console>
</Appenders>
<Loggers>
<Root level="warn">
<AppenderRef ref="Console" />
</Root>
<Logger name="we" level="DEBUG"/>
</Loggers>
<properties>
<property name="APP_NAME">fizz-core</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="warn">
<AppenderRef ref="Console"/>
</Root>
<Logger name="monitor" level="warn" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="we" level="debug"/>
</Loggers>
</Configuration>

View File

@@ -0,0 +1,2 @@
log4j2.asyncQueueFullPolicy=Discard
log4j2.discardThreshold=ERROR
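
Note: the two properties above tune Log4j2's behaviour under logging back-pressure. asyncQueueFullPolicy=Discard makes a full async queue drop events instead of blocking the calling thread, and discardThreshold=ERROR raises the cut-off from the default INFO, so under pressure events up to that threshold can be dropped rather than stalling gateway threads. A small sketch of what that means for callers (logger name borrowed from the test config above):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueFullPolicySketch {

    // "monitor" matches the logger name used for the Kafka/monitor output in the tests above.
    private static final Logger log = LoggerFactory.getLogger("monitor");

    void onAccessStat(String accessStatJson) {
        // With asyncQueueFullPolicy=Discard these calls never block on a full async
        // queue; events at or below the configured discardThreshold are dropped
        // instead, so bursts of access-stat logging cannot stall request handling.
        log.info(accessStatJson);
        log.warn(accessStatJson);
    }
}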

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.6.6-beta1</version>
<version>2.6.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -17,6 +17,7 @@
package we.plugin.dedicatedline.auth;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -27,9 +28,9 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.dedicated_line.DedicatedLineService;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.util.Consts;
import we.util.ReactorUtils;
import we.util.WebUtils;
@@ -77,7 +78,9 @@ public class DedicatedLineApiAuthPluginFilter implements FizzPluginFilter {
return WebUtils.response(exchange, HttpStatus.UNAUTHORIZED, null, respJson);
}
} catch (Exception e) {
log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
// log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
ThreadContext.put(Consts.TRACE_ID, traceId);
log.error("{} {} exception", traceId, DEDICATED_LINE_API_AUTH_PLUGIN_FILTER, e);
String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(),
HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId);
return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson);

View File

@@ -21,6 +21,7 @@ import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.symmetric.SymmetricAlgorithm;
import cn.hutool.crypto.symmetric.SymmetricCrypto;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,14 +30,12 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.dedicated_line.DedicatedLineService;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilterChain;
import we.plugin.requestbody.RequestBodyPlugin;
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
@@ -46,7 +45,6 @@ import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
@@ -70,8 +68,9 @@ public class DedicatedLineCodecPluginFilter extends RequestBodyPlugin {
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config) {
String traceId = WebUtils.getTraceId(exchange);
ThreadContext.put(Consts.TRACE_ID, traceId);
try {
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
String dedicatedLineId = WebUtils.getDedicatedLineId(exchange);
String cryptoKey = dedicatedLineService.getRequestCryptoKey(dedicatedLineId);
@@ -106,7 +105,8 @@ public class DedicatedLineCodecPluginFilter extends RequestBodyPlugin {
});
} catch (Exception e) {
log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
// log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
log.error("{} {} Exception", traceId, DEDICATED_LINE_CODEC_PLUGIN_FILTER, e);
String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(),
HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId);
return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson);

View File

@@ -18,6 +18,7 @@
package we.plugin.dedicatedline.pairing;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -28,9 +29,9 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.dedicated_line.DedicatedLineService;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.util.Consts;
import we.util.DigestUtils;
import we.util.ReactorUtils;
import we.util.WebUtils;
@@ -59,8 +60,9 @@ public class DedicatedLinePairingPluginFilter implements FizzPluginFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config) {
String traceId = WebUtils.getTraceId(exchange);
ThreadContext.put(Consts.TRACE_ID, traceId);
try {
LogService.setBizId(traceId);
// LogService.setBizId(traceId);
String dedicatedLineId = WebUtils.getDedicatedLineId(exchange);
String secretKey = dedicatedLineService.getSignSecretKey(dedicatedLineId);
String ts = WebUtils.getDedicatedLineTimestamp(exchange);
@@ -83,7 +85,8 @@ public class DedicatedLinePairingPluginFilter implements FizzPluginFilter {
return WebUtils.response(exchange, HttpStatus.UNAUTHORIZED, null, respJson);
}
} catch (Exception e) {
log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
// log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, LogService.BIZ_ID, traceId, e);
log.error("{} {} Exception", traceId, DEDICATED_LINE_PAIRING_PLUGIN_FILTER, e);
String respJson = WebUtils.jsonRespBody(HttpStatus.INTERNAL_SERVER_ERROR.value(),
HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase(), traceId);
return WebUtils.response(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, respJson);

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.6.6-beta1</version>
<version>2.6.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

pom.xml
View File

@@ -10,7 +10,7 @@
<reactor-bom.version>Dysprosium-SR25</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version>
<nacos.cloud.version>2.2.7.RELEASE</nacos.cloud.version>
<netty.version>4.1.77.Final</netty.version>
<netty.version>4.1.78.Final</netty.version>
<httpcore.version>4.4.15</httpcore.version>
<log4j2.version>2.17.2</log4j2.version>
<slf4j.version>1.7.36</slf4j.version>
@@ -18,13 +18,14 @@
<grpc.version>1.16.1</grpc.version>
<mockito.version>3.4.6</mockito.version>
<curator.version>4.0.1</curator.version>
<zookeeper.version>3.5.9</zookeeper.version>
<zookeeper.version>3.5.10</zookeeper.version>
<r2dbc-mysql.version>0.8.2</r2dbc-mysql.version>
<reflections.version>0.9.11</reflections.version>
<commons-pool2.version>2.11.1</commons-pool2.version>
<netty-tcnative.version>2.0.52.Final</netty-tcnative.version>
<netty-tcnative.version>2.0.53.Final</netty-tcnative.version>
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
<snakeyaml.version>1.30</snakeyaml.version>
<spring-data-releasetrain.version>Moore-SR13</spring-data-releasetrain.version>
</properties>
<parent>
@@ -37,13 +38,13 @@
<artifactId>fizz-gateway-community</artifactId>
<name>${project.artifactId}</name>
<description>fizz gateway community</description>
<version>2.6.6-beta1</version>
<version>2.6.6</version>
<packaging>pom</packaging>
<modules>
<module>fizz-common</module>
<module>fizz-core</module>
<module>fizz-plugin</module>
<!--<module>fizz-bootstrap</module>-->
<module>fizz-bootstrap</module>
<module>fizz-spring-boot-starter</module>
<!--<module>fizz-input-mysql</module>-->
</modules>
@@ -156,7 +157,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
<version>1.2.83</version>
</dependency>
<dependency>
@@ -282,7 +283,7 @@
<dependency>
<groupId>org.noear</groupId>
<artifactId>snack3</artifactId>
<version>3.2.24</version>
<version>3.2.29</version>
</dependency>
<dependency>
@@ -331,7 +332,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<version>1.18.24</version>
</dependency>
<!-- grpc -->