diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml
index c38791d..2830651 100644
--- a/fizz-bootstrap/pom.xml
+++ b/fizz-bootstrap/pom.xml
@@ -12,13 +12,13 @@
com.fizzgate
fizz-bootstrap
- 2.3.0
+ 2.3.2-beta1
1.8
5.2.13.RELEASE
Dragonfruit-SR3
- Dysprosium-SR22
+ Dysprosium-SR23
5.3.7.RELEASE
4.1.68.Final
4.4.14
diff --git a/fizz-bootstrap/src/main/resources/application.yml b/fizz-bootstrap/src/main/resources/application.yml
index de9adfd..ad79bf3 100644
--- a/fizz-bootstrap/src/main/resources/application.yml
+++ b/fizz-bootstrap/src/main/resources/application.yml
@@ -105,4 +105,9 @@ refresh-local-cache:
fizz:
aggregate:
- writeMapNullValue: false
\ No newline at end of file
+ writeMapNullValue: false
+
+fizz-trace-id:
+ header: X-Trace-Id # default
+ value-strategy: requestId # default, or can be uuid
+ value-prefix: fizz # default
diff --git a/fizz-bootstrap/src/main/resources/log4j2-spring.xml b/fizz-bootstrap/src/main/resources/log4j2-spring.xml
index 8429c18..c67a4d2 100644
--- a/fizz-bootstrap/src/main/resources/log4j2-spring.xml
+++ b/fizz-bootstrap/src/main/resources/log4j2-spring.xml
@@ -20,6 +20,5 @@
-
diff --git a/fizz-common/pom.xml b/fizz-common/pom.xml
index 6d32363..b40e131 100644
--- a/fizz-common/pom.xml
+++ b/fizz-common/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.3.0
+ 2.3.2-beta1
../pom.xml
4.0.0
diff --git a/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java
index 3c46cdc..3ccba00 100644
--- a/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java
+++ b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpRequestDecorator.java
@@ -48,6 +48,10 @@ public class FizzServerHttpRequestDecorator extends ServerHttpRequestDecorator {
return headers;
}
+ public void setEmptyBody() {
+ this.body = Flux.empty();
+ }
+
public void setBody(DataBuffer body) {
if (body instanceof PooledDataBuffer) {
byte[] bytes = new byte[body.readableByteCount()];
diff --git a/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpResponseDecorator.java b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpResponseDecorator.java
new file mode 100644
index 0000000..7f85718
--- /dev/null
+++ b/fizz-common/src/main/java/we/spring/http/server/reactive/ext/FizzServerHttpResponseDecorator.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.spring.http.server.reactive.ext;
+
+import org.reactivestreams.Publisher;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.PooledDataBuffer;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
+import reactor.core.publisher.Mono;
+import we.util.NettyDataBufferUtils;
+
+/**
+ * @author hongqiaowei
+ */
+
+public abstract class FizzServerHttpResponseDecorator extends ServerHttpResponseDecorator {
+
+ public FizzServerHttpResponseDecorator(ServerHttpResponse delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public Mono writeWith(Publisher extends DataBuffer> bodyPublisher) {
+
+ return
+ NettyDataBufferUtils.join(bodyPublisher).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
+ .flatMap(
+ body -> {
+ DataBuffer b = null;
+ if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
+ if (body instanceof PooledDataBuffer) {
+ try {
+ b = NettyDataBufferUtils.from(body.asByteBuffer());
+ } finally {
+ NettyDataBufferUtils.release(body);
+ }
+ } else {
+ b = body;
+ }
+ }
+ Publisher extends DataBuffer> r = writeWith(b);
+ return super.writeWith(r);
+ }
+ );
+ }
+
+ /**
+ * You can getDelegate().getHeaders().set("h", "v") in the method and others for response.
+ * @param remoteResponseBody
+ * @return the real http response body to client, or Mono.empty() if response without body
+ */
+ public abstract Publisher extends DataBuffer> writeWith(DataBuffer remoteResponseBody);
+}
diff --git a/fizz-common/src/main/java/we/spring/web/server/ext/FizzServerWebExchangeDecorator.java b/fizz-common/src/main/java/we/spring/web/server/ext/FizzServerWebExchangeDecorator.java
new file mode 100644
index 0000000..ce00db5
--- /dev/null
+++ b/fizz-common/src/main/java/we/spring/web/server/ext/FizzServerWebExchangeDecorator.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2020 the original author or authors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package we.spring.web.server.ext;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.lang.Nullable;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.util.StringUtils;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.ServerWebExchangeDecorator;
+import reactor.core.publisher.Mono;
+import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
+import we.util.Constants;
+import we.util.NettyDataBufferUtils;
+import we.util.ThreadContext;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author hongqiaowei
+ */
+
+public class FizzServerWebExchangeDecorator extends ServerWebExchangeDecorator {
+
+ private static final MultiValueMap EMPTY_FORM_DATA = CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap(0));
+
+ private static final Mono> EMPTY_FORM_DATA_MONO = Mono.just(EMPTY_FORM_DATA).cache();
+
+ public FizzServerWebExchangeDecorator(ServerWebExchange delegate) {
+ super(delegate);
+ }
+
+ private Charset getMediaTypeCharset(@Nullable MediaType mediaType) {
+ if (mediaType != null && mediaType.getCharset() != null) {
+ return mediaType.getCharset();
+ } else {
+ return StandardCharsets.UTF_8;
+ }
+ }
+
+ private MultiValueMap parseFormData(Charset charset, String source) {
+ String[] pairs = StringUtils.tokenizeToStringArray(source, "&");
+ MultiValueMap result = new LinkedMultiValueMap<>(pairs.length);
+ try {
+ for (String pair : pairs) {
+ int idx = pair.indexOf('=');
+ if (idx == -1) {
+ result.add(URLDecoder.decode(pair, charset.name()), null);
+ } else {
+ String name = URLDecoder.decode(pair.substring(0, idx), charset.name());
+ String value = URLDecoder.decode(pair.substring(idx + 1), charset.name());
+ result.add(name, value);
+ }
+ }
+ } catch (UnsupportedEncodingException ex) {
+ throw new IllegalStateException(ex);
+ }
+ return result;
+ }
+
+ @Override
+ public Mono> getFormData() {
+ ServerHttpRequest req = getDelegate().getRequest();
+ MediaType ct = req.getHeaders().getContentType();
+ if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(ct)) {
+ Charset charset = getMediaTypeCharset(ct);
+ return
+ req.getBody().defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
+ .single()
+ .map(
+ body -> {
+ if (body == NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
+ return EMPTY_FORM_DATA;
+ } else {
+ CharBuffer charBuffer = charset.decode(body.asByteBuffer());
+ return parseFormData(charset, charBuffer.toString());
+ }
+ }
+ );
+ } else {
+ return EMPTY_FORM_DATA_MONO;
+ }
+ }
+
+ /**
+ * @param dataMap can be {@link org.springframework.util.LinkedMultiValueMap}
+ */
+ public void setFormData(MultiValueMap dataMap) {
+ FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) getDelegate().getRequest();
+ req.getHeaders().setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+ if (CollectionUtils.isEmpty(dataMap)) {
+ req.setEmptyBody();
+ return;
+ }
+ StringBuilder b = ThreadContext.getStringBuilder();
+ Set>> fieldValuesEntries = dataMap.entrySet();
+ int fs = fieldValuesEntries.size(), cnt = 0;
+ try {
+ for (Map.Entry> fieldValuesEntry : fieldValuesEntries) {
+ String field = fieldValuesEntry.getKey();
+ List values = fieldValuesEntry.getValue();
+ if (CollectionUtils.isEmpty(values)) {
+ b.append(URLEncoder.encode(field, Constants.Charset.UTF8));
+ } else {
+ int vs = values.size();
+ for (int i = 0; i < vs; ) {
+ b.append(URLEncoder.encode(field, Constants.Charset.UTF8))
+ .append('=')
+ .append(URLEncoder.encode(values.get(i), Constants.Charset.UTF8));
+ if ((++i) != vs) {
+ b.append('&');
+ }
+ }
+ }
+ if ((++cnt) != fs) {
+ b.append('&');
+ }
+ }
+ } catch (UnsupportedEncodingException ex) {
+ throw new IllegalStateException(ex);
+ }
+ req.setBody(b.toString());
+ }
+}
diff --git a/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java b/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java
index 440a377..352f840 100644
--- a/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java
+++ b/fizz-common/src/main/java/we/util/NettyDataBufferUtils.java
@@ -59,7 +59,7 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b
return dataBufferFactory.wrap(byteBuf);
}
- public static boolean release(@Nullable String reqId, @Nullable DataBuffer dataBuffer) {
+ public static boolean release(@Nullable String traceId, @Nullable DataBuffer dataBuffer) {
if (dataBuffer instanceof PooledDataBuffer) {
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
if (pooledDataBuffer.isAllocated()) {
@@ -69,7 +69,7 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b
int refCnt = nativeBuffer.refCnt();
if (refCnt < 1) {
if (log.isDebugEnabled()) {
- log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, reqId);
+ log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, traceId);
}
return false;
}
diff --git a/fizz-common/src/main/java/we/util/NetworkUtils.java b/fizz-common/src/main/java/we/util/NetworkUtils.java
index 01a151b..eab90a5 100644
--- a/fizz-common/src/main/java/we/util/NetworkUtils.java
+++ b/fizz-common/src/main/java/we/util/NetworkUtils.java
@@ -47,8 +47,12 @@ public class NetworkUtils {
public static String getServerIp() {
try {
if (serverIp == null) {
- serverIp = System.getenv(SERVER_IP);
- log.info("SERVER_IP is " + serverIp);
+ serverIp = System.getProperty(SERVER_IP);
+ log.info("JVM env SERVER_IP is " + serverIp);
+ if (StringUtils.isBlank(serverIp)) {
+ serverIp = System.getenv(SERVER_IP);
+ log.info("System env SERVER_IP is " + serverIp);
+ }
if (StringUtils.isBlank(serverIp)) {
boolean found = false;
Enumeration nis = null;
diff --git a/fizz-core/pom.xml b/fizz-core/pom.xml
index f766b75..bf311d9 100644
--- a/fizz-core/pom.xml
+++ b/fizz-core/pom.xml
@@ -5,7 +5,7 @@
fizz-gateway-community
com.fizzgate
- 2.3.0
+ 2.3.2-beta1
../pom.xml
4.0.0
@@ -57,6 +57,11 @@
org.springframework.boot
spring-boot-starter-log4j2
+
+ ch.qos.logback
+ logback-classic
+ true
+
org.springframework.boot
@@ -145,6 +150,12 @@
spring-cloud-starter-alibaba-nacos-discovery
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ true
+
+
org.apache.commons
commons-pool2
diff --git a/fizz-core/src/main/java/we/config/AggregateRedisConfig.java b/fizz-core/src/main/java/we/config/AggregateRedisConfig.java
index 04e39ed..8030fed 100644
--- a/fizz-core/src/main/java/we/config/AggregateRedisConfig.java
+++ b/fizz-core/src/main/java/we/config/AggregateRedisConfig.java
@@ -23,13 +23,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.dao.DataAccessException;
-import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
-import org.springframework.data.redis.connection.ReactiveRedisConnection;
-import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
-import org.springframework.data.redis.connection.RedisClusterConnection;
-import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.connection.RedisConnectionFactory;
-import org.springframework.data.redis.connection.RedisSentinelConnection;
+import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
diff --git a/fizz-core/src/main/java/we/config/LogConfig.java b/fizz-core/src/main/java/we/config/LogConfig.java
new file mode 100644
index 0000000..eeafcbb
--- /dev/null
+++ b/fizz-core/src/main/java/we/config/LogConfig.java
@@ -0,0 +1,104 @@
+package we.config;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.PatternLayout;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.boot.context.event.ApplicationStartingEvent;
+import org.springframework.boot.context.event.SpringApplicationEvent;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.event.EventListener;
+import we.log.LogProperties;
+import we.log.LogSendAppenderWithLogback;
+
+@Configuration
+public class LogConfig {
+
+ @Bean
+ @ConfigurationProperties("fizz.logging")
+ public LogProperties logProperties() {
+ return new LogProperties();
+ }
+
+ @Configuration
+ @ConditionalOnClass(AbstractAppender.class)
+ @AutoConfigureAfter(AggregateRedisConfig.class)
+ public static class CustomLog4j2Config {
+ }
+
+ @Configuration
+ @ConditionalOnClass(LoggerContext.class)
+ @AutoConfigureAfter(AggregateRedisConfig.class)
+ public static class CustomLogbackConfig {
+ @Bean
+ public Object initLogSendAppenderWithLogback(LogProperties logProperties) {
+ return new LoggingConfigurationApplicationListener(logProperties);
+ }
+ }
+
+ public static class LoggingConfigurationApplicationListener {
+ private static final Logger logger = LoggerFactory.getLogger(LoggingConfigurationApplicationListener.class);
+ private static final String APPENDER_NAME = "fizzLogSendAppender";
+ private static final String LAYOUT = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n";
+ private LogProperties logProperties;
+
+ public LoggingConfigurationApplicationListener() {
+ }
+
+ public LoggingConfigurationApplicationListener(LogProperties logProperties) {
+ this.logProperties = logProperties;
+ }
+
+ @EventListener
+ public void contextRefreshed(SpringApplicationEvent event) {
+ onApplicationEvent(event);
+ }
+
+ @EventListener
+ public void applicationStarting(ApplicationStartingEvent event) {
+ onApplicationEvent(event);
+ }
+
+ @EventListener
+ public void applicationReady(ApplicationReadyEvent event) {
+ onApplicationEvent(event);
+ }
+
+ public void onApplicationEvent(ApplicationEvent event) {
+ LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
+ final ch.qos.logback.classic.Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
+ String layoutConfig = StringUtils.isBlank(logProperties.getLayout()) ? LAYOUT : logProperties.getLayout();
+
+ final LogSendAppenderWithLogback newAppender = new LogSendAppenderWithLogback();
+ newAppender.setName(APPENDER_NAME);
+ newAppender.setContext(context);
+ PatternLayout layout = new PatternLayout();
+ layout.setPattern(layoutConfig);
+ newAppender.setLayout(layout);
+
+ Appender appender = root.getAppender(APPENDER_NAME);
+ if (appender == null) {
+ newAppender.start();
+ root.addAppender(newAppender);
+ logger.info("Added fizz log send appender:{}", APPENDER_NAME);
+ } else {
+ newAppender.start();
+ root.detachAppender(APPENDER_NAME);
+ root.addAppender(newAppender);
+ logger.info("Refresh fizz log send appender:{}", APPENDER_NAME);
+ }
+ }
+ }
+
+
+}
diff --git a/fizz-core/src/main/java/we/config/ProxyWebClientConfig.java b/fizz-core/src/main/java/we/config/ProxyWebClientConfig.java
index 2e244cd..2ac1819 100644
--- a/fizz-core/src/main/java/we/config/ProxyWebClientConfig.java
+++ b/fizz-core/src/main/java/we/config/ProxyWebClientConfig.java
@@ -21,8 +21,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
-import reactor.netty.resources.LoopResources;
-import we.util.JacksonUtils;
/**
* @author hongqiaowei
diff --git a/fizz-core/src/main/java/we/config/SystemConfig.java b/fizz-core/src/main/java/we/config/SystemConfig.java
index e3fbe53..4e96476 100644
--- a/fizz-core/src/main/java/we/config/SystemConfig.java
+++ b/fizz-core/src/main/java/we/config/SystemConfig.java
@@ -69,6 +69,27 @@ public class SystemConfig {
@Value("${route-timeout:0}")
private long routeTimeout = 0;
+ @Value("${fizz-trace-id.header:X-Trace-Id}")
+ private String fizzTraceIdHeader;
+
+ @Value("${fizz-trace-id.value-strategy:requestId}")
+ private String fizzTraceIdValueStrategy;
+
+ @Value("${fizz-trace-id.value-prefix:fizz}")
+ private String fizzTraceIdValuePrefix;
+
+ public String fizzTraceIdHeader() {
+ return fizzTraceIdHeader;
+ }
+
+ public String fizzTraceIdValueStrategy() {
+ return fizzTraceIdValueStrategy;
+ }
+
+ public String fizzTraceIdValuePrefix() {
+ return fizzTraceIdValuePrefix;
+ }
+
public long getRouteTimeout() {
return routeTimeout;
}
@@ -179,8 +200,8 @@ public class SystemConfig {
@PostConstruct
public void afterPropertiesSet() {
- afterLogResponseBodySet();
- afterLogHeadersSet();
+ // afterLogResponseBodySet();
+ // afterLogHeadersSet();
}
private void afterLogResponseBodySet() {
@@ -193,6 +214,9 @@ public class SystemConfig {
Arrays.stream(StringUtils.split(logHeaders, Constants.Symbol.COMMA)).forEach(h -> {
logHeaderSet.add(h);
});
+ if (!fizzTraceIdHeader.equals("X-Trace-Id")) {
+ logHeaderSet.add(fizzTraceIdHeader);
+ }
WebUtils.LOG_HEADER_SET = logHeaderSet;
log.info("log header list: " + logHeaderSet.toString());
}
@@ -216,7 +240,7 @@ public class SystemConfig {
this.updateLogResponseBody(logResponseBody);
}
- @Value("${log.headers:x}")
+ @Value("${log.headers:}")
public void setLogHeaders(String logHeaders) {
if (ObjectUtils.nullSafeEquals(this.logHeaders, logHeaders)) {
return;
diff --git a/fizz-core/src/main/java/we/filter/AggregateFilter.java b/fizz-core/src/main/java/we/filter/AggregateFilter.java
index 1aa5f18..047975f 100644
--- a/fizz-core/src/main/java/we/filter/AggregateFilter.java
+++ b/fizz-core/src/main/java/we/filter/AggregateFilter.java
@@ -138,11 +138,13 @@ public class AggregateFilter implements WebFilter {
}
// traceId
+ /*
String tmpTraceId = CommonConstants.TRACE_ID_PREFIX + exchange.getRequest().getId();
if (StringUtils.isNotBlank(request.getHeaders().getFirst(CommonConstants.HEADER_TRACE_ID))) {
tmpTraceId = request.getHeaders().getFirst(CommonConstants.HEADER_TRACE_ID);
}
- final String traceId = tmpTraceId;
+ */
+ final String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
LOGGER.debug("matched aggregation api: {}", path);
@@ -207,9 +209,9 @@ public class AggregateFilter implements WebFilter {
// default content-type
serverHttpResponse.getHeaders().add(CommonConstants.HEADER_CONTENT_TYPE, CommonConstants.CONTENT_TYPE_JSON);
}
- List headerTraceIds = serverHttpResponse.getHeaders().get(CommonConstants.HEADER_TRACE_ID);
+ List headerTraceIds = serverHttpResponse.getHeaders().get(systemConfig.fizzTraceIdHeader());
if (headerTraceIds == null || !headerTraceIds.contains(traceId)) {
- serverHttpResponse.getHeaders().add(CommonConstants.HEADER_TRACE_ID, traceId);
+ serverHttpResponse.getHeaders().add(systemConfig.fizzTraceIdHeader(), traceId);
}
long end = System.currentTimeMillis();
diff --git a/fizz-core/src/main/java/we/filter/CallbackFilter.java b/fizz-core/src/main/java/we/filter/CallbackFilter.java
index d69339c..f1e5aef 100644
--- a/fizz-core/src/main/java/we/filter/CallbackFilter.java
+++ b/fizz-core/src/main/java/we/filter/CallbackFilter.java
@@ -172,7 +172,7 @@ public class CallbackFilter extends FizzWebFilter {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(Constants.Symbol.LEFT_BRACE);
- b.append(_id); toJsonStringValue(b, req.getId()); b.append(Constants.Symbol.COMMA);
+ b.append(_id); toJsonStringValue(b, WebUtils.getTraceId(exchange)); b.append(Constants.Symbol.COMMA);
b.append(_datetime); b.append(System.currentTimeMillis()); b.append(Constants.Symbol.COMMA);
b.append(_origin); toJsonStringValue(b, WebUtils.getOriginIp(exchange)); b.append(Constants.Symbol.COMMA);
diff --git a/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java b/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java
index 1e37bda..c0b9437 100644
--- a/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java
+++ b/fizz-core/src/main/java/we/filter/FilterExceptionHandlerConfig.java
@@ -68,30 +68,29 @@ public class FilterExceptionHandlerConfig {
return Mono.empty();
}
}
+ String rid = WebUtils.getTraceId(exchange);
if (t instanceof ExecuteScriptException) {
ExecuteScriptException ex = (ExecuteScriptException) t;
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
- String reqId = exchange.getRequest().getId();
if (ex.getStepContext() != null && ex.getStepContext().returnContext()) {
- rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), reqId, ex.getStepContext());
+ rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid, ex.getStepContext());
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(JacksonUtils.writeValueAsString(rs).getBytes())));
} else {
- rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), reqId);
+ rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rs.toString().getBytes())));
}
}
if (t instanceof FizzRuntimeException) {
FizzRuntimeException ex = (FizzRuntimeException) t;
- log.error(ex.getMessage(), LogService.BIZ_ID, exchange.getRequest().getId(), ex);
+ log.error(ex.getMessage(), LogService.BIZ_ID, rid, ex);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
- String reqId = exchange.getRequest().getId();
if (ex.getStepContext() != null && ex.getStepContext().returnContext()) {
- rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), reqId, ex.getStepContext());
+ rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid, ex.getStepContext());
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(JacksonUtils.writeValueAsString(rs).getBytes())));
} else {
- rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), reqId);
+ rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rs.toString().getBytes())));
}
}
@@ -100,8 +99,8 @@ public class FilterExceptionHandlerConfig {
if (fc == null) { // t came from flow control filter
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
- log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
- String s = RespEntity.toJson(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), exchange.getRequest().getId());
+ log.error(b.toString(), LogService.BIZ_ID, rid, t);
+ String s = RespEntity.toJson(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes())));
} else {
diff --git a/fizz-core/src/main/java/we/filter/FizzLogFilter.java b/fizz-core/src/main/java/we/filter/FizzLogFilter.java
index 7756447..3c346eb 100644
--- a/fizz-core/src/main/java/we/filter/FizzLogFilter.java
+++ b/fizz-core/src/main/java/we/filter/FizzLogFilter.java
@@ -55,7 +55,7 @@ public class FizzLogFilter implements WebFilter {
WebUtils.request2stringBuilder(exchange, b);
b.append(resp).append(exchange.getResponse().getStatusCode())
.append(in) .append(System.currentTimeMillis() - startTime);
- LOGGER.info(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
+ LOGGER.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange));
}
}
);
diff --git a/fizz-core/src/main/java/we/filter/FlowControlFilter.java b/fizz-core/src/main/java/we/filter/FlowControlFilter.java
index 7df67c3..108407a 100644
--- a/fizz-core/src/main/java/we/filter/FlowControlFilter.java
+++ b/fizz-core/src/main/java/we/filter/FlowControlFilter.java
@@ -17,11 +17,6 @@
package we.filter;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.annotation.Resource;
-
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +28,6 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
-
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.config.SystemConfig;
@@ -47,10 +41,11 @@ import we.stats.IncrRequestResult;
import we.stats.ResourceConfig;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
-import we.util.Constants;
-import we.util.JacksonUtils;
-import we.util.ThreadContext;
-import we.util.WebUtils;
+import we.util.*;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
/**
* @author hongqiaowei
@@ -64,11 +59,15 @@ public class FlowControlFilter extends FizzWebFilter {
private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class);
- private static final String admin = "admin";
+ private static final String admin = "admin";
- private static final String actuator = "actuator";
+ private static final String actuator = "actuator";
- public static final String ADMIN_REQUEST = "$a";
+ private static final String uuid = "uuid";
+
+ private static final String defaultFizzTraceIdValueStrategy = "requestId";
+
+ public static final String ADMIN_REQUEST = "$a";
@Resource
private FlowControlFilterProperties flowControlFilterProperties;
@@ -85,6 +84,9 @@ public class FlowControlFilter extends FizzWebFilter {
@Resource
private AppService appService;
+ @Resource
+ private SystemConfig systemConfig;
+
@Override
public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -111,10 +113,6 @@ public class FlowControlFilter extends FizzWebFilter {
return WebUtils.buildJsonDirectResponse(exchange, HttpStatus.FORBIDDEN, null, json);
}
String app = WebUtils.getAppId(exchange);
- // if (app != null && !appService.getAppMap().containsKey(app)) {
- // String json = RespEntity.toJson(HttpStatus.FORBIDDEN.value(), "no app " + app, exchange.getRequest().getId());
- // return WebUtils.buildJsonDirectResponse(exchange, HttpStatus.FORBIDDEN, null, json);
- // }
path = WebUtils.getClientReqPath(exchange);
String ip = WebUtils.getOriginIp(exchange);
@@ -151,6 +149,7 @@ public class FlowControlFilter extends FizzWebFilter {
} else {
long start = System.currentTimeMillis();
+ setTraceId(exchange);
return chain.filter(exchange).doFinally(s -> {
long rt = System.currentTimeMillis() - start;
if (s == SignalType.ON_ERROR || exchange.getResponse().getStatusCode().is5xxServerError()) {
@@ -162,9 +161,29 @@ public class FlowControlFilter extends FizzWebFilter {
}
}
+ setTraceId(exchange);
return chain.filter(exchange);
}
+ private void setTraceId(ServerWebExchange exchange) {
+ String traceId = exchange.getRequest().getHeaders().getFirst(systemConfig.fizzTraceIdHeader());
+ if (StringUtils.isBlank(traceId)) {
+ String fizzTraceIdValueStrategy = systemConfig.fizzTraceIdValueStrategy();
+ if (fizzTraceIdValueStrategy.equals(defaultFizzTraceIdValueStrategy)) {
+ traceId = exchange.getRequest().getId();
+ } else if (fizzTraceIdValueStrategy.equals(uuid)) {
+ traceId = UUIDUtil.getUUID();
+ } else {
+ throw Utils.runtimeExceptionWithoutStack("unsupported " + fizzTraceIdValueStrategy + " trace id value strategy!");
+ }
+ }
+ String fizzTraceIdValuePrefix = systemConfig.fizzTraceIdValuePrefix();
+ if (StringUtils.isNotBlank(fizzTraceIdValuePrefix)) {
+ traceId = fizzTraceIdValuePrefix + Constants.Symbol.DASH + traceId;
+ }
+ exchange.getAttributes().put(WebUtils.TRACE_ID, traceId);
+ }
+
private List getResourceConfigItselfAndParents(ResourceConfig rc, List rcs) {
boolean check = false;
String rcId = rc.getResourceId();
diff --git a/fizz-core/src/main/java/we/filter/PreprocessFilter.java b/fizz-core/src/main/java/we/filter/PreprocessFilter.java
index 771373e..9c95c7b 100644
--- a/fizz-core/src/main/java/we/filter/PreprocessFilter.java
+++ b/fizz-core/src/main/java/we/filter/PreprocessFilter.java
@@ -32,6 +32,7 @@ import we.plugin.auth.ApiConfig;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AuthPluginFilter;
import we.plugin.stat.StatPluginFilter;
+import we.proxy.Route;
import we.util.ReactorUtils;
import we.util.WebUtils;
@@ -67,7 +68,7 @@ public class PreprocessFilter extends FizzWebFilter {
Map fc = new HashMap<>(); fc.put(WebUtils.PREV_FILTER_RESULT, succFr);
Map appendHdrs = new HashMap<>(8);
Map eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
- eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
+ eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
Mono vm = statPluginFilter.filter(exchange, null, null);
return process(exchange, chain, eas, vm);
@@ -82,21 +83,25 @@ public class PreprocessFilter extends FizzWebFilter {
Mono m = ReactorUtils.getInitiateMono();
if (authRes instanceof ApiConfig) {
ApiConfig ac = (ApiConfig) authRes;
- afterAuth(exchange, ac);
+
+ Route route = ac.getRoute(exchange);
+ exchange.getAttributes().put(WebUtils.ROUTE, route);
+
+ afterAuth(exchange, ac, route);
m = executeFixedPluginFilters(exchange);
m = m.defaultIfEmpty(ReactorUtils.NULL);
- if (ac.pluginConfigs == null || ac.pluginConfigs.isEmpty()) {
+ if (route.pluginConfigs == null || route.pluginConfigs.isEmpty()) {
return m.flatMap(func(exchange, chain));
} else {
return m.flatMap(
- nil -> {
- eas.put(FizzPluginFilterChain.WEB_FILTER_CHAIN, chain);
- return FizzPluginFilterChain.next(exchange);
- }
+ nil -> {
+ eas.put(FizzPluginFilterChain.WEB_FILTER_CHAIN, chain);
+ return FizzPluginFilterChain.next(exchange);
+ }
);
}
} else if (authRes == ApiConfigService.Access.YES) {
- afterAuth(exchange, null);
+ afterAuth(exchange, null, null);
m = executeFixedPluginFilters(exchange);
return m.defaultIfEmpty(ReactorUtils.NULL).flatMap(func(exchange, chain));
} else {
@@ -113,7 +118,7 @@ public class PreprocessFilter extends FizzWebFilter {
);
}
- private void afterAuth(ServerWebExchange exchange, ApiConfig ac) {
+ private void afterAuth(ServerWebExchange exchange, ApiConfig ac, Route route) {
String bs = null, bp = null;
if (ac == null) {
bs = WebUtils.getClientService(exchange);
@@ -125,6 +130,7 @@ public class PreprocessFilter extends FizzWebFilter {
}
if (ac.type != ApiConfig.Type.DUBBO) {
bp = ac.transform(WebUtils.getClientReqPath(exchange));
+ route.backendPath = bp;
}
}
}
diff --git a/fizz-core/src/main/java/we/filter/RouteFilter.java b/fizz-core/src/main/java/we/filter/RouteFilter.java
index 36be31b..e2a2a3c 100644
--- a/fizz-core/src/main/java/we/filter/RouteFilter.java
+++ b/fizz-core/src/main/java/we/filter/RouteFilter.java
@@ -23,6 +23,7 @@ import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@@ -32,11 +33,13 @@ import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
+import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.plugin.auth.ApiConfig;
import we.proxy.FizzWebClient;
+import we.proxy.Route;
import we.proxy.dubbo.ApacheDubboGenericService;
import we.proxy.dubbo.DubboInterfaceDeclaration;
import we.util.*;
@@ -65,6 +68,9 @@ public class RouteFilter extends FizzWebFilter {
@Resource
private ApacheDubboGenericService dubboGenericService;
+ @Resource
+ private SystemConfig systemConfig;
+
@Override
public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -75,7 +81,7 @@ public class RouteFilter extends FizzWebFilter {
Mono resp = WebUtils.getDirectResponse(exchange);
if (resp == null) { // should not reach here
ServerHttpRequest clientReq = exchange.getRequest();
- String rid = clientReq.getId();
+ String rid = WebUtils.getTraceId(exchange);
String msg = pfr.id + " fail";
if (pfr.cause == null) {
log.error(msg, LogService.BIZ_ID, rid);
@@ -92,32 +98,36 @@ public class RouteFilter extends FizzWebFilter {
private Mono doFilter0(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest req = exchange.getRequest();
- String rid = req.getId();
- ApiConfig ac = WebUtils.getApiConfig(exchange);
+ String rid = WebUtils.getTraceId(exchange);
+
+ // ApiConfig ac = WebUtils.getApiConfig(exchange);
+ Route route = WebUtils.getRoute(exchange);
+
HttpHeaders hdrs = null;
- if (ac.type != ApiConfig.Type.DUBBO) {
+
+ if (route.type != ApiConfig.Type.DUBBO) {
hdrs = WebUtils.mergeAppendHeaders(exchange);
}
- if (ac == null) {
+ if (route == null) {
String pathQuery = WebUtils.getClientReqPathQuery(exchange);
- return send(exchange, WebUtils.getClientService(exchange), pathQuery, hdrs);
+ return send(exchange, req.getMethod(), WebUtils.getClientService(exchange), pathQuery, hdrs);
- } else if (ac.type == ApiConfig.Type.SERVICE_DISCOVERY) {
- String pathQuery = WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange);
- return send(exchange, WebUtils.getBackendService(exchange), pathQuery, hdrs);
+ } else if (route.type == ApiConfig.Type.SERVICE_DISCOVERY) {
+ String pathQuery = route.getBackendPathQuery();
+ return send(exchange, route.method, route.backendService, pathQuery, hdrs);
- } else if (ac.type == ApiConfig.Type.REVERSE_PROXY) {
- String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
- .append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
+ } else if (route.type == ApiConfig.Type.REVERSE_PROXY) {
+ String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort)
+ .append(route.getBackendPathQuery())
.toString();
- return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
+ return fizzWebClient.send(rid, route.method, uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
- } else if (ac.type == ApiConfig.Type.DUBBO) {
- return dubboRpc(exchange, ac);
+ } else if (route.type == ApiConfig.Type.DUBBO) {
+ return dubboRpc(exchange, route);
} else {
- String err = "cant handle api config type " + ac.type;
+ String err = "cant handle api config type " + route.type;
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
log.error(b.append(Constants.Symbol.LF).append(err).toString(), LogService.BIZ_ID, rid);
@@ -125,9 +135,9 @@ public class RouteFilter extends FizzWebFilter {
}
}
- private Mono send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) {
+ private Mono send(ServerWebExchange exchange, HttpMethod method, String service, String relativeUri, HttpHeaders hdrs) {
ServerHttpRequest clientReq = exchange.getRequest();
- return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
+ return fizzWebClient.send2service(WebUtils.getTraceId(exchange), method, service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
}
private Function> genServerResponse(ServerWebExchange exchange) {
@@ -141,9 +151,9 @@ public class RouteFilter extends FizzWebFilter {
String k = h.getKey();
if (clientRespHeaders.containsKey(k)) {
if (k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN) || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)
- || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS)
- || k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE)
- || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS)) {
+ || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS)
+ || k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE)
+ || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS)) {
} else {
clientRespHeaders.put(k, h.getValue());
}
@@ -154,7 +164,7 @@ public class RouteFilter extends FizzWebFilter {
);
if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder();
- String rid = exchange.getRequest().getId();
+ String rid = WebUtils.getTraceId(exchange);
WebUtils.response2stringBuilder(rid, remoteResp, b);
log.debug(b.toString(), LogService.BIZ_ID, rid);
}
@@ -164,12 +174,12 @@ public class RouteFilter extends FizzWebFilter {
}
private void cleanup(ClientResponse clientResponse) {
- if (clientResponse != null) {
- clientResponse.bodyToMono(Void.class).subscribe();
- }
- }
+ if (clientResponse != null) {
+ clientResponse.bodyToMono(Void.class).subscribe();
+ }
+ }
- private Mono dubboRpc(ServerWebExchange exchange, ApiConfig ac) {
+ private Mono dubboRpc(ServerWebExchange exchange, Route route) {
final String[] ls = {null};
return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
.flatMap(
@@ -191,18 +201,18 @@ public class RouteFilter extends FizzWebFilter {
}
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
- declaration.setServiceName(ac.backendService);
- declaration.setVersion(ac.rpcVersion);
- declaration.setGroup(ac.rpcGroup);
- declaration.setMethod(ac.rpcMethod);
- declaration.setParameterTypes(ac.rpcParamTypes);
+ declaration.setServiceName(route.backendService);
+ declaration.setVersion(route.rpcVersion);
+ declaration.setGroup(route.rpcGroup);
+ declaration.setMethod(route.rpcMethod);
+ declaration.setParameterTypes(route.rpcParamTypes);
int t = 20_000;
- if (ac.timeout != 0) {
- t = (int) ac.timeout;
+ if (route.timeout != 0) {
+ t = (int) route.timeout;
}
declaration.setTimeout(t);
- Map attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange));
+ Map attachments = Collections.singletonMap(systemConfig.fizzTraceIdHeader(), WebUtils.getTraceId(exchange));
return dubboGenericService.send(parameters, declaration, attachments);
}
)
@@ -219,7 +229,7 @@ public class RouteFilter extends FizzWebFilter {
if (ls[0] != null) {
b.append('\n').append(ls[0]);
}
- log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
+ log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t);
}
)
;
diff --git a/fizz-core/src/main/java/we/fizz/AggregateService.java b/fizz-core/src/main/java/we/fizz/AggregateService.java
index c664c14..4d916d2 100644
--- a/fizz-core/src/main/java/we/fizz/AggregateService.java
+++ b/fizz-core/src/main/java/we/fizz/AggregateService.java
@@ -29,6 +29,7 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
+import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.fizz.input.Input;
import we.flume.clients.log4j2appender.LogService;
@@ -54,6 +55,9 @@ public class AggregateService {
@Resource
private ConfigLoader aggregateResourceLoader;
+ @Resource
+ private SystemConfig systemConfig;
+
public Mono request(String traceId, String clientReqPathPrefix, String method, String service, String path, MultiValueMap queryParams,
HttpHeaders headers, String body) {
@@ -114,9 +118,9 @@ public class AggregateService {
// defalut content-type
clientResp.getHeaders().add("Content-Type", "application/json; charset=UTF-8");
}
- List headerTraceIds = clientResp.getHeaders().get(CommonConstants.HEADER_TRACE_ID);
+ List headerTraceIds = clientResp.getHeaders().get(systemConfig.fizzTraceIdHeader());
if (headerTraceIds == null || !headerTraceIds.contains(traceId)) {
- clientResp.getHeaders().add(CommonConstants.HEADER_TRACE_ID, traceId);
+ clientResp.getHeaders().add(systemConfig.fizzTraceIdHeader(), traceId);
}
// long end = System.currentTimeMillis();
// pipeline.getStepContext().addElapsedTime("总耗时", end - start);
diff --git a/fizz-core/src/main/java/we/fizz/function/ListFunc.java b/fizz-core/src/main/java/we/fizz/function/ListFunc.java
index 13d2163..cd7aa07 100644
--- a/fizz-core/src/main/java/we/fizz/function/ListFunc.java
+++ b/fizz-core/src/main/java/we/fizz/function/ListFunc.java
@@ -57,6 +57,7 @@ public class ListFunc implements IFunc {
FuncExecutor.register(NAME_SPACE_PREFIX + "list.expand", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.merge", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.extract", this);
+ FuncExecutor.register(NAME_SPACE_PREFIX + "list.join", this);
}
/**
@@ -108,7 +109,7 @@ public class ListFunc implements IFunc {
if (data == null || data.size() == 0) {
return result;
}
- if (fields.length == 0) {
+ if (fields == null || fields.length == 0) {
return data;
}
for (Map m : data) {
@@ -121,4 +122,44 @@ public class ListFunc implements IFunc {
return result;
}
+ /**
+ * Merge fields of source list to destination list join by the join field
+ *
+ * @param dest destination list
+ * @param src source list
+ * @param joinField join field
+ * @param fields fields which will be merge to destination list, all fields
+ * will be merged if it is null
+ * @return
+ */
+ public List