Merge pull request #322 from wehotel/develop

This commit is contained in:
hongqiaowei
2021-09-27 16:38:16 +08:00
committed by GitHub
46 changed files with 1150 additions and 345 deletions

View File

@@ -12,13 +12,13 @@
<groupId>com.fizzgate</groupId>
<artifactId>fizz-bootstrap</artifactId>
<version>2.3.0</version>
<version>2.3.2-beta1</version>
<properties>
<java.version>1.8</java.version>
<spring-framework.version>5.2.13.RELEASE</spring-framework.version>
<spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version>
<reactor-bom.version>Dysprosium-SR22</reactor-bom.version>
<reactor-bom.version>Dysprosium-SR23</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version>
<netty.version>4.1.68.Final</netty.version>
<httpcore.version>4.4.14</httpcore.version>

View File

@@ -105,4 +105,9 @@ refresh-local-cache:
fizz:
aggregate:
writeMapNullValue: false
writeMapNullValue: false
fizz-trace-id:
header: X-Trace-Id # default
value-strategy: requestId # default, or can be uuid
value-prefix: fizz # default

View File

@@ -20,6 +20,5 @@
<!-- suppress the warn 'No URLs will be polled as dynamic configuration sources.' -->
<logger name="com.netflix.config.sources.URLConfigurationSource" level="ERROR"/>
<Logger name="we" level="info"/>
</Loggers>
</Configuration>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.3.0</version>
<version>2.3.2-beta1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -48,6 +48,10 @@ public class FizzServerHttpRequestDecorator extends ServerHttpRequestDecorator {
return headers;
}
public void setEmptyBody() {
this.body = Flux.empty();
}
public void setBody(DataBuffer body) {
if (body instanceof PooledDataBuffer) {
byte[] bytes = new byte[body.readableByteCount()];

View File

@@ -0,0 +1,69 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.spring.http.server.reactive.ext;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import reactor.core.publisher.Mono;
import we.util.NettyDataBufferUtils;
/**
* @author hongqiaowei
*/
public abstract class FizzServerHttpResponseDecorator extends ServerHttpResponseDecorator {
public FizzServerHttpResponseDecorator(ServerHttpResponse delegate) {
super(delegate);
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> bodyPublisher) {
return
NettyDataBufferUtils.join(bodyPublisher).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
.flatMap(
body -> {
DataBuffer b = null;
if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
if (body instanceof PooledDataBuffer) {
try {
b = NettyDataBufferUtils.from(body.asByteBuffer());
} finally {
NettyDataBufferUtils.release(body);
}
} else {
b = body;
}
}
Publisher<? extends DataBuffer> r = writeWith(b);
return super.writeWith(r);
}
);
}
/**
* You can getDelegate().getHeaders().set("h", "v") in the method and others for response.
* @param remoteResponseBody
* @return the real http response body to client, or Mono.empty() if response without body
*/
public abstract Publisher<? extends DataBuffer> writeWith(DataBuffer remoteResponseBody);
}

View File

@@ -0,0 +1,150 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.spring.web.server.ext;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;
import reactor.core.publisher.Mono;
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
import we.util.Constants;
import we.util.NettyDataBufferUtils;
import we.util.ThreadContext;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author hongqiaowei
*/
public class FizzServerWebExchangeDecorator extends ServerWebExchangeDecorator {
private static final MultiValueMap<String, String> EMPTY_FORM_DATA = CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<String, String>(0));
private static final Mono<MultiValueMap<String, String>> EMPTY_FORM_DATA_MONO = Mono.just(EMPTY_FORM_DATA).cache();
public FizzServerWebExchangeDecorator(ServerWebExchange delegate) {
super(delegate);
}
private Charset getMediaTypeCharset(@Nullable MediaType mediaType) {
if (mediaType != null && mediaType.getCharset() != null) {
return mediaType.getCharset();
} else {
return StandardCharsets.UTF_8;
}
}
private MultiValueMap<String, String> parseFormData(Charset charset, String source) {
String[] pairs = StringUtils.tokenizeToStringArray(source, "&");
MultiValueMap<String, String> result = new LinkedMultiValueMap<>(pairs.length);
try {
for (String pair : pairs) {
int idx = pair.indexOf('=');
if (idx == -1) {
result.add(URLDecoder.decode(pair, charset.name()), null);
} else {
String name = URLDecoder.decode(pair.substring(0, idx), charset.name());
String value = URLDecoder.decode(pair.substring(idx + 1), charset.name());
result.add(name, value);
}
}
} catch (UnsupportedEncodingException ex) {
throw new IllegalStateException(ex);
}
return result;
}
@Override
public Mono<MultiValueMap<String, String>> getFormData() {
ServerHttpRequest req = getDelegate().getRequest();
MediaType ct = req.getHeaders().getContentType();
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(ct)) {
Charset charset = getMediaTypeCharset(ct);
return
req.getBody().defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
.single()
.map(
body -> {
if (body == NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
return EMPTY_FORM_DATA;
} else {
CharBuffer charBuffer = charset.decode(body.asByteBuffer());
return parseFormData(charset, charBuffer.toString());
}
}
);
} else {
return EMPTY_FORM_DATA_MONO;
}
}
/**
* @param dataMap can be {@link org.springframework.util.LinkedMultiValueMap}
*/
public void setFormData(MultiValueMap<String, String> dataMap) {
FizzServerHttpRequestDecorator req = (FizzServerHttpRequestDecorator) getDelegate().getRequest();
req.getHeaders().setContentType(MediaType.APPLICATION_FORM_URLENCODED);
if (CollectionUtils.isEmpty(dataMap)) {
req.setEmptyBody();
return;
}
StringBuilder b = ThreadContext.getStringBuilder();
Set<Map.Entry<String, List<String>>> fieldValuesEntries = dataMap.entrySet();
int fs = fieldValuesEntries.size(), cnt = 0;
try {
for (Map.Entry<String, List<String>> fieldValuesEntry : fieldValuesEntries) {
String field = fieldValuesEntry.getKey();
List<String> values = fieldValuesEntry.getValue();
if (CollectionUtils.isEmpty(values)) {
b.append(URLEncoder.encode(field, Constants.Charset.UTF8));
} else {
int vs = values.size();
for (int i = 0; i < vs; ) {
b.append(URLEncoder.encode(field, Constants.Charset.UTF8))
.append('=')
.append(URLEncoder.encode(values.get(i), Constants.Charset.UTF8));
if ((++i) != vs) {
b.append('&');
}
}
}
if ((++cnt) != fs) {
b.append('&');
}
}
} catch (UnsupportedEncodingException ex) {
throw new IllegalStateException(ex);
}
req.setBody(b.toString());
}
}

View File

@@ -59,7 +59,7 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b
return dataBufferFactory.wrap(byteBuf);
}
public static boolean release(@Nullable String reqId, @Nullable DataBuffer dataBuffer) {
public static boolean release(@Nullable String traceId, @Nullable DataBuffer dataBuffer) {
if (dataBuffer instanceof PooledDataBuffer) {
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
if (pooledDataBuffer.isAllocated()) {
@@ -69,7 +69,7 @@ public abstract class NettyDataBufferUtils extends org.springframework.core.io.b
int refCnt = nativeBuffer.refCnt();
if (refCnt < 1) {
if (log.isDebugEnabled()) {
log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, reqId);
log.debug(nativeBuffer + " ref cnt is " + refCnt, LogService.BIZ_ID, traceId);
}
return false;
}

View File

@@ -47,8 +47,12 @@ public class NetworkUtils {
public static String getServerIp() {
try {
if (serverIp == null) {
serverIp = System.getenv(SERVER_IP);
log.info("SERVER_IP is " + serverIp);
serverIp = System.getProperty(SERVER_IP);
log.info("JVM env SERVER_IP is " + serverIp);
if (StringUtils.isBlank(serverIp)) {
serverIp = System.getenv(SERVER_IP);
log.info("System env SERVER_IP is " + serverIp);
}
if (StringUtils.isBlank(serverIp)) {
boolean found = false;
Enumeration<NetworkInterface> nis = null;

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.3.0</version>
<version>2.3.2-beta1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -57,6 +57,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -145,6 +150,12 @@
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>

View File

@@ -23,13 +23,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;

View File

@@ -0,0 +1,104 @@
package we.config;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.context.event.ApplicationStartingEvent;
import org.springframework.boot.context.event.SpringApplicationEvent;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import we.log.LogProperties;
import we.log.LogSendAppenderWithLogback;
@Configuration
public class LogConfig {
@Bean
@ConfigurationProperties("fizz.logging")
public LogProperties logProperties() {
return new LogProperties();
}
@Configuration
@ConditionalOnClass(AbstractAppender.class)
@AutoConfigureAfter(AggregateRedisConfig.class)
public static class CustomLog4j2Config {
}
@Configuration
@ConditionalOnClass(LoggerContext.class)
@AutoConfigureAfter(AggregateRedisConfig.class)
public static class CustomLogbackConfig {
@Bean
public Object initLogSendAppenderWithLogback(LogProperties logProperties) {
return new LoggingConfigurationApplicationListener(logProperties);
}
}
public static class LoggingConfigurationApplicationListener {
private static final Logger logger = LoggerFactory.getLogger(LoggingConfigurationApplicationListener.class);
private static final String APPENDER_NAME = "fizzLogSendAppender";
private static final String LAYOUT = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %level %logger{36} - %msg%n";
private LogProperties logProperties;
public LoggingConfigurationApplicationListener() {
}
public LoggingConfigurationApplicationListener(LogProperties logProperties) {
this.logProperties = logProperties;
}
@EventListener
public void contextRefreshed(SpringApplicationEvent event) {
onApplicationEvent(event);
}
@EventListener
public void applicationStarting(ApplicationStartingEvent event) {
onApplicationEvent(event);
}
@EventListener
public void applicationReady(ApplicationReadyEvent event) {
onApplicationEvent(event);
}
public void onApplicationEvent(ApplicationEvent event) {
LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
final ch.qos.logback.classic.Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
String layoutConfig = StringUtils.isBlank(logProperties.getLayout()) ? LAYOUT : logProperties.getLayout();
final LogSendAppenderWithLogback newAppender = new LogSendAppenderWithLogback();
newAppender.setName(APPENDER_NAME);
newAppender.setContext(context);
PatternLayout layout = new PatternLayout();
layout.setPattern(layoutConfig);
newAppender.setLayout(layout);
Appender<ILoggingEvent> appender = root.getAppender(APPENDER_NAME);
if (appender == null) {
newAppender.start();
root.addAppender(newAppender);
logger.info("Added fizz log send appender:{}", APPENDER_NAME);
} else {
newAppender.start();
root.detachAppender(APPENDER_NAME);
root.addAppender(newAppender);
logger.info("Refresh fizz log send appender:{}", APPENDER_NAME);
}
}
}
}

View File

@@ -21,8 +21,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.resources.LoopResources;
import we.util.JacksonUtils;
/**
* @author hongqiaowei

View File

@@ -69,6 +69,27 @@ public class SystemConfig {
@Value("${route-timeout:0}")
private long routeTimeout = 0;
@Value("${fizz-trace-id.header:X-Trace-Id}")
private String fizzTraceIdHeader;
@Value("${fizz-trace-id.value-strategy:requestId}")
private String fizzTraceIdValueStrategy;
@Value("${fizz-trace-id.value-prefix:fizz}")
private String fizzTraceIdValuePrefix;
public String fizzTraceIdHeader() {
return fizzTraceIdHeader;
}
public String fizzTraceIdValueStrategy() {
return fizzTraceIdValueStrategy;
}
public String fizzTraceIdValuePrefix() {
return fizzTraceIdValuePrefix;
}
public long getRouteTimeout() {
return routeTimeout;
}
@@ -179,8 +200,8 @@ public class SystemConfig {
@PostConstruct
public void afterPropertiesSet() {
afterLogResponseBodySet();
afterLogHeadersSet();
// afterLogResponseBodySet();
// afterLogHeadersSet();
}
private void afterLogResponseBodySet() {
@@ -193,6 +214,9 @@ public class SystemConfig {
Arrays.stream(StringUtils.split(logHeaders, Constants.Symbol.COMMA)).forEach(h -> {
logHeaderSet.add(h);
});
if (!fizzTraceIdHeader.equals("X-Trace-Id")) {
logHeaderSet.add(fizzTraceIdHeader);
}
WebUtils.LOG_HEADER_SET = logHeaderSet;
log.info("log header list: " + logHeaderSet.toString());
}
@@ -216,7 +240,7 @@ public class SystemConfig {
this.updateLogResponseBody(logResponseBody);
}
@Value("${log.headers:x}")
@Value("${log.headers:}")
public void setLogHeaders(String logHeaders) {
if (ObjectUtils.nullSafeEquals(this.logHeaders, logHeaders)) {
return;

View File

@@ -138,11 +138,13 @@ public class AggregateFilter implements WebFilter {
}
// traceId
/*
String tmpTraceId = CommonConstants.TRACE_ID_PREFIX + exchange.getRequest().getId();
if (StringUtils.isNotBlank(request.getHeaders().getFirst(CommonConstants.HEADER_TRACE_ID))) {
tmpTraceId = request.getHeaders().getFirst(CommonConstants.HEADER_TRACE_ID);
}
final String traceId = tmpTraceId;
*/
final String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
LOGGER.debug("matched aggregation api: {}", path);
@@ -207,9 +209,9 @@ public class AggregateFilter implements WebFilter {
// default content-type
serverHttpResponse.getHeaders().add(CommonConstants.HEADER_CONTENT_TYPE, CommonConstants.CONTENT_TYPE_JSON);
}
List<String> headerTraceIds = serverHttpResponse.getHeaders().get(CommonConstants.HEADER_TRACE_ID);
List<String> headerTraceIds = serverHttpResponse.getHeaders().get(systemConfig.fizzTraceIdHeader());
if (headerTraceIds == null || !headerTraceIds.contains(traceId)) {
serverHttpResponse.getHeaders().add(CommonConstants.HEADER_TRACE_ID, traceId);
serverHttpResponse.getHeaders().add(systemConfig.fizzTraceIdHeader(), traceId);
}
long end = System.currentTimeMillis();

View File

@@ -172,7 +172,7 @@ public class CallbackFilter extends FizzWebFilter {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(Constants.Symbol.LEFT_BRACE);
b.append(_id); toJsonStringValue(b, req.getId()); b.append(Constants.Symbol.COMMA);
b.append(_id); toJsonStringValue(b, WebUtils.getTraceId(exchange)); b.append(Constants.Symbol.COMMA);
b.append(_datetime); b.append(System.currentTimeMillis()); b.append(Constants.Symbol.COMMA);
b.append(_origin); toJsonStringValue(b, WebUtils.getOriginIp(exchange)); b.append(Constants.Symbol.COMMA);

View File

@@ -68,30 +68,29 @@ public class FilterExceptionHandlerConfig {
return Mono.empty();
}
}
String rid = WebUtils.getTraceId(exchange);
if (t instanceof ExecuteScriptException) {
ExecuteScriptException ex = (ExecuteScriptException) t;
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
String reqId = exchange.getRequest().getId();
if (ex.getStepContext() != null && ex.getStepContext().returnContext()) {
rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), reqId, ex.getStepContext());
rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid, ex.getStepContext());
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(JacksonUtils.writeValueAsString(rs).getBytes())));
} else {
rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), reqId);
rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rs.toString().getBytes())));
}
}
if (t instanceof FizzRuntimeException) {
FizzRuntimeException ex = (FizzRuntimeException) t;
log.error(ex.getMessage(), LogService.BIZ_ID, exchange.getRequest().getId(), ex);
log.error(ex.getMessage(), LogService.BIZ_ID, rid, ex);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
RespEntity rs = null;
String reqId = exchange.getRequest().getId();
if (ex.getStepContext() != null && ex.getStepContext().returnContext()) {
rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), reqId, ex.getStepContext());
rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid, ex.getStepContext());
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(JacksonUtils.writeValueAsString(rs).getBytes())));
} else {
rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), reqId);
rs = new RespEntity(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid);
return resp.writeWith(Mono.just(resp.bufferFactory().wrap(rs.toString().getBytes())));
}
}
@@ -100,8 +99,8 @@ public class FilterExceptionHandlerConfig {
if (fc == null) { // t came from flow control filter
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
String s = RespEntity.toJson(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), exchange.getRequest().getId());
log.error(b.toString(), LogService.BIZ_ID, rid, t);
String s = RespEntity.toJson(HttpStatus.INTERNAL_SERVER_ERROR.value(), t.getMessage(), rid);
resp.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
vm = resp.writeWith(Mono.just(resp.bufferFactory().wrap(s.getBytes())));
} else {

View File

@@ -55,7 +55,7 @@ public class FizzLogFilter implements WebFilter {
WebUtils.request2stringBuilder(exchange, b);
b.append(resp).append(exchange.getResponse().getStatusCode())
.append(in) .append(System.currentTimeMillis() - startTime);
LOGGER.info(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId());
LOGGER.info(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange));
}
}
);

View File

@@ -17,11 +17,6 @@
package we.filter;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +28,6 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import we.config.SystemConfig;
@@ -47,10 +41,11 @@ import we.stats.IncrRequestResult;
import we.stats.ResourceConfig;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import we.util.Constants;
import we.util.JacksonUtils;
import we.util.ThreadContext;
import we.util.WebUtils;
import we.util.*;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* @author hongqiaowei
@@ -64,11 +59,15 @@ public class FlowControlFilter extends FizzWebFilter {
private static final Logger log = LoggerFactory.getLogger(FlowControlFilter.class);
private static final String admin = "admin";
private static final String admin = "admin";
private static final String actuator = "actuator";
private static final String actuator = "actuator";
public static final String ADMIN_REQUEST = "$a";
private static final String uuid = "uuid";
private static final String defaultFizzTraceIdValueStrategy = "requestId";
public static final String ADMIN_REQUEST = "$a";
@Resource
private FlowControlFilterProperties flowControlFilterProperties;
@@ -85,6 +84,9 @@ public class FlowControlFilter extends FizzWebFilter {
@Resource
private AppService appService;
@Resource
private SystemConfig systemConfig;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -111,10 +113,6 @@ public class FlowControlFilter extends FizzWebFilter {
return WebUtils.buildJsonDirectResponse(exchange, HttpStatus.FORBIDDEN, null, json);
}
String app = WebUtils.getAppId(exchange);
// if (app != null && !appService.getAppMap().containsKey(app)) {
// String json = RespEntity.toJson(HttpStatus.FORBIDDEN.value(), "no app " + app, exchange.getRequest().getId());
// return WebUtils.buildJsonDirectResponse(exchange, HttpStatus.FORBIDDEN, null, json);
// }
path = WebUtils.getClientReqPath(exchange);
String ip = WebUtils.getOriginIp(exchange);
@@ -151,6 +149,7 @@ public class FlowControlFilter extends FizzWebFilter {
} else {
long start = System.currentTimeMillis();
setTraceId(exchange);
return chain.filter(exchange).doFinally(s -> {
long rt = System.currentTimeMillis() - start;
if (s == SignalType.ON_ERROR || exchange.getResponse().getStatusCode().is5xxServerError()) {
@@ -162,9 +161,29 @@ public class FlowControlFilter extends FizzWebFilter {
}
}
setTraceId(exchange);
return chain.filter(exchange);
}
private void setTraceId(ServerWebExchange exchange) {
String traceId = exchange.getRequest().getHeaders().getFirst(systemConfig.fizzTraceIdHeader());
if (StringUtils.isBlank(traceId)) {
String fizzTraceIdValueStrategy = systemConfig.fizzTraceIdValueStrategy();
if (fizzTraceIdValueStrategy.equals(defaultFizzTraceIdValueStrategy)) {
traceId = exchange.getRequest().getId();
} else if (fizzTraceIdValueStrategy.equals(uuid)) {
traceId = UUIDUtil.getUUID();
} else {
throw Utils.runtimeExceptionWithoutStack("unsupported " + fizzTraceIdValueStrategy + " trace id value strategy!");
}
}
String fizzTraceIdValuePrefix = systemConfig.fizzTraceIdValuePrefix();
if (StringUtils.isNotBlank(fizzTraceIdValuePrefix)) {
traceId = fizzTraceIdValuePrefix + Constants.Symbol.DASH + traceId;
}
exchange.getAttributes().put(WebUtils.TRACE_ID, traceId);
}
private List<ResourceConfig> getResourceConfigItselfAndParents(ResourceConfig rc, List<ResourceConfig> rcs) {
boolean check = false;
String rcId = rc.getResourceId();

View File

@@ -32,6 +32,7 @@ import we.plugin.auth.ApiConfig;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AuthPluginFilter;
import we.plugin.stat.StatPluginFilter;
import we.proxy.Route;
import we.util.ReactorUtils;
import we.util.WebUtils;
@@ -67,7 +68,7 @@ public class PreprocessFilter extends FizzWebFilter {
Map<String, FilterResult> fc = new HashMap<>(); fc.put(WebUtils.PREV_FILTER_RESULT, succFr);
Map<String, String> appendHdrs = new HashMap<>(8);
Map<String, Object> eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
Mono vm = statPluginFilter.filter(exchange, null, null);
return process(exchange, chain, eas, vm);
@@ -82,21 +83,25 @@ public class PreprocessFilter extends FizzWebFilter {
Mono m = ReactorUtils.getInitiateMono();
if (authRes instanceof ApiConfig) {
ApiConfig ac = (ApiConfig) authRes;
afterAuth(exchange, ac);
Route route = ac.getRoute(exchange);
exchange.getAttributes().put(WebUtils.ROUTE, route);
afterAuth(exchange, ac, route);
m = executeFixedPluginFilters(exchange);
m = m.defaultIfEmpty(ReactorUtils.NULL);
if (ac.pluginConfigs == null || ac.pluginConfigs.isEmpty()) {
if (route.pluginConfigs == null || route.pluginConfigs.isEmpty()) {
return m.flatMap(func(exchange, chain));
} else {
return m.flatMap(
nil -> {
eas.put(FizzPluginFilterChain.WEB_FILTER_CHAIN, chain);
return FizzPluginFilterChain.next(exchange);
}
nil -> {
eas.put(FizzPluginFilterChain.WEB_FILTER_CHAIN, chain);
return FizzPluginFilterChain.next(exchange);
}
);
}
} else if (authRes == ApiConfigService.Access.YES) {
afterAuth(exchange, null);
afterAuth(exchange, null, null);
m = executeFixedPluginFilters(exchange);
return m.defaultIfEmpty(ReactorUtils.NULL).flatMap(func(exchange, chain));
} else {
@@ -113,7 +118,7 @@ public class PreprocessFilter extends FizzWebFilter {
);
}
private void afterAuth(ServerWebExchange exchange, ApiConfig ac) {
private void afterAuth(ServerWebExchange exchange, ApiConfig ac, Route route) {
String bs = null, bp = null;
if (ac == null) {
bs = WebUtils.getClientService(exchange);
@@ -125,6 +130,7 @@ public class PreprocessFilter extends FizzWebFilter {
}
if (ac.type != ApiConfig.Type.DUBBO) {
bp = ac.transform(WebUtils.getClientReqPath(exchange));
route.backendPath = bp;
}
}
}

View File

@@ -23,6 +23,7 @@ import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@@ -32,11 +33,13 @@ import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.plugin.auth.ApiConfig;
import we.proxy.FizzWebClient;
import we.proxy.Route;
import we.proxy.dubbo.ApacheDubboGenericService;
import we.proxy.dubbo.DubboInterfaceDeclaration;
import we.util.*;
@@ -65,6 +68,9 @@ public class RouteFilter extends FizzWebFilter {
@Resource
private ApacheDubboGenericService dubboGenericService;
@Resource
private SystemConfig systemConfig;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -75,7 +81,7 @@ public class RouteFilter extends FizzWebFilter {
Mono<Void> resp = WebUtils.getDirectResponse(exchange);
if (resp == null) { // should not reach here
ServerHttpRequest clientReq = exchange.getRequest();
String rid = clientReq.getId();
String rid = WebUtils.getTraceId(exchange);
String msg = pfr.id + " fail";
if (pfr.cause == null) {
log.error(msg, LogService.BIZ_ID, rid);
@@ -92,32 +98,36 @@ public class RouteFilter extends FizzWebFilter {
private Mono<Void> doFilter0(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest req = exchange.getRequest();
String rid = req.getId();
ApiConfig ac = WebUtils.getApiConfig(exchange);
String rid = WebUtils.getTraceId(exchange);
// ApiConfig ac = WebUtils.getApiConfig(exchange);
Route route = WebUtils.getRoute(exchange);
HttpHeaders hdrs = null;
if (ac.type != ApiConfig.Type.DUBBO) {
if (route.type != ApiConfig.Type.DUBBO) {
hdrs = WebUtils.mergeAppendHeaders(exchange);
}
if (ac == null) {
if (route == null) {
String pathQuery = WebUtils.getClientReqPathQuery(exchange);
return send(exchange, WebUtils.getClientService(exchange), pathQuery, hdrs);
return send(exchange, req.getMethod(), WebUtils.getClientService(exchange), pathQuery, hdrs);
} else if (ac.type == ApiConfig.Type.SERVICE_DISCOVERY) {
String pathQuery = WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange);
return send(exchange, WebUtils.getBackendService(exchange), pathQuery, hdrs);
} else if (route.type == ApiConfig.Type.SERVICE_DISCOVERY) {
String pathQuery = route.getBackendPathQuery();
return send(exchange, route.method, route.backendService, pathQuery, hdrs);
} else if (ac.type == ApiConfig.Type.REVERSE_PROXY) {
String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
.append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
} else if (route.type == ApiConfig.Type.REVERSE_PROXY) {
String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort)
.append(route.getBackendPathQuery())
.toString();
return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
return fizzWebClient.send(rid, route.method, uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
} else if (ac.type == ApiConfig.Type.DUBBO) {
return dubboRpc(exchange, ac);
} else if (route.type == ApiConfig.Type.DUBBO) {
return dubboRpc(exchange, route);
} else {
String err = "cant handle api config type " + ac.type;
String err = "cant handle api config type " + route.type;
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
log.error(b.append(Constants.Symbol.LF).append(err).toString(), LogService.BIZ_ID, rid);
@@ -125,9 +135,9 @@ public class RouteFilter extends FizzWebFilter {
}
}
private Mono<Void> send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) {
private Mono<Void> send(ServerWebExchange exchange, HttpMethod method, String service, String relativeUri, HttpHeaders hdrs) {
ServerHttpRequest clientReq = exchange.getRequest();
return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
return fizzWebClient.send2service(WebUtils.getTraceId(exchange), method, service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
}
private Function<ClientResponse, Mono<? extends Void>> genServerResponse(ServerWebExchange exchange) {
@@ -141,9 +151,9 @@ public class RouteFilter extends FizzWebFilter {
String k = h.getKey();
if (clientRespHeaders.containsKey(k)) {
if (k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN) || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS)
|| k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE)
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS)) {
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS)
|| k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE)
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS)) {
} else {
clientRespHeaders.put(k, h.getValue());
}
@@ -154,7 +164,7 @@ public class RouteFilter extends FizzWebFilter {
);
if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder();
String rid = exchange.getRequest().getId();
String rid = WebUtils.getTraceId(exchange);
WebUtils.response2stringBuilder(rid, remoteResp, b);
log.debug(b.toString(), LogService.BIZ_ID, rid);
}
@@ -164,12 +174,12 @@ public class RouteFilter extends FizzWebFilter {
}
private void cleanup(ClientResponse clientResponse) {
if (clientResponse != null) {
clientResponse.bodyToMono(Void.class).subscribe();
}
}
if (clientResponse != null) {
clientResponse.bodyToMono(Void.class).subscribe();
}
}
private Mono<Void> dubboRpc(ServerWebExchange exchange, ApiConfig ac) {
private Mono<Void> dubboRpc(ServerWebExchange exchange, Route route) {
final String[] ls = {null};
return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
.flatMap(
@@ -191,18 +201,18 @@ public class RouteFilter extends FizzWebFilter {
}
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
declaration.setServiceName(ac.backendService);
declaration.setVersion(ac.rpcVersion);
declaration.setGroup(ac.rpcGroup);
declaration.setMethod(ac.rpcMethod);
declaration.setParameterTypes(ac.rpcParamTypes);
declaration.setServiceName(route.backendService);
declaration.setVersion(route.rpcVersion);
declaration.setGroup(route.rpcGroup);
declaration.setMethod(route.rpcMethod);
declaration.setParameterTypes(route.rpcParamTypes);
int t = 20_000;
if (ac.timeout != 0) {
t = (int) ac.timeout;
if (route.timeout != 0) {
t = (int) route.timeout;
}
declaration.setTimeout(t);
Map<String, String> attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange));
Map<String, String> attachments = Collections.singletonMap(systemConfig.fizzTraceIdHeader(), WebUtils.getTraceId(exchange));
return dubboGenericService.send(parameters, declaration, attachments);
}
)
@@ -219,7 +229,7 @@ public class RouteFilter extends FizzWebFilter {
if (ls[0] != null) {
b.append('\n').append(ls[0]);
}
log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
log.error(b.toString(), LogService.BIZ_ID, WebUtils.getTraceId(exchange), t);
}
)
;

View File

@@ -29,6 +29,7 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.fizz.input.Input;
import we.flume.clients.log4j2appender.LogService;
@@ -54,6 +55,9 @@ public class AggregateService {
@Resource
private ConfigLoader aggregateResourceLoader;
@Resource
private SystemConfig systemConfig;
public Mono<AggregateResult> request(String traceId, String clientReqPathPrefix, String method, String service, String path, MultiValueMap<String, String> queryParams,
HttpHeaders headers, String body) {
@@ -114,9 +118,9 @@ public class AggregateService {
// defalut content-type
clientResp.getHeaders().add("Content-Type", "application/json; charset=UTF-8");
}
List<String> headerTraceIds = clientResp.getHeaders().get(CommonConstants.HEADER_TRACE_ID);
List<String> headerTraceIds = clientResp.getHeaders().get(systemConfig.fizzTraceIdHeader());
if (headerTraceIds == null || !headerTraceIds.contains(traceId)) {
clientResp.getHeaders().add(CommonConstants.HEADER_TRACE_ID, traceId);
clientResp.getHeaders().add(systemConfig.fizzTraceIdHeader(), traceId);
}
// long end = System.currentTimeMillis();
// pipeline.getStepContext().addElapsedTime("总耗时", end - start);

View File

@@ -57,6 +57,7 @@ public class ListFunc implements IFunc {
FuncExecutor.register(NAME_SPACE_PREFIX + "list.expand", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.merge", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.extract", this);
FuncExecutor.register(NAME_SPACE_PREFIX + "list.join", this);
}
/**
@@ -108,7 +109,7 @@ public class ListFunc implements IFunc {
if (data == null || data.size() == 0) {
return result;
}
if (fields.length == 0) {
if (fields == null || fields.length == 0) {
return data;
}
for (Map<String, Object> m : data) {
@@ -121,4 +122,44 @@ public class ListFunc implements IFunc {
return result;
}
/**
* Merge fields of source list to destination list join by the join field
*
* @param dest destination list
* @param src source list
* @param joinField join field
* @param fields fields which will be merge to destination list, all fields
* will be merged if it is null
* @return
*/
public List<Map<String, Object>> join(List<Map<String, Object>> dest, List<Map<String, Object>> src,
String joinField, String... fields) {
if (dest == null || dest.size() == 0 || src == null || src.size() == 0) {
return dest;
}
Map<String, Map<String, Object>> index = new HashMap<>();
for (Map<String, Object> record : dest) {
if (record.get(joinField) != null) {
index.put(record.get(joinField).toString(), record);
}
}
for (Map<String, Object> m : src) {
if (m.get(joinField) == null) {
continue;
}
Map<String, Object> record = index.get(m.get(joinField).toString());
if (fields == null || fields.length == 0) {
record.putAll(m);
} else {
for (String field : fields) {
record.put(field, m.get(field));
}
}
}
return dest;
}
}

View File

@@ -29,6 +29,8 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import we.FizzAppContext;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
@@ -79,7 +81,12 @@ public class DubboInput extends RPCInput {
contextAttachment = new HashMap<String, String>(attachments);
}
if (inputContext.getStepContext() != null && inputContext.getStepContext().getTraceId() != null) {
contextAttachment.put(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
if (FizzAppContext.appContext == null) {
contextAttachment.put(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
} else {
SystemConfig systemConfig = FizzAppContext.appContext.getBean(SystemConfig.class);
contextAttachment.put(systemConfig.fizzTraceIdHeader(), inputContext.getStepContext().getTraceId());
}
}
Mono<Object> proxyResponse = proxy.send(body, declaration, contextAttachment);

View File

@@ -24,6 +24,8 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import we.FizzAppContext;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
@@ -72,7 +74,12 @@ public class GrpcInput extends RPCInput implements IInput {
contextAttachment = new HashMap<String, Object>(attachments);
}
if (inputContext.getStepContext() != null && inputContext.getStepContext().getTraceId() != null) {
contextAttachment.put(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
if (FizzAppContext.appContext == null) {
contextAttachment.put(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
} else {
SystemConfig systemConfig = FizzAppContext.appContext.getBean(SystemConfig.class);
contextAttachment.put(systemConfig.fizzTraceIdHeader(), inputContext.getStepContext().getTraceId());
}
}
Mono<Object> proxyResponse = proxy.send(JSON.toJSONString(body), declaration, contextAttachment);

View File

@@ -360,7 +360,7 @@ public class RequestInput extends RPCInput implements IInput{
}
headers.remove(CommonConstants.HEADER_CONTENT_LENGTH);
headers.add(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
headers.add(systemConfig.fizzTraceIdHeader(), inputContext.getStepContext().getTraceId());
request.put("headers", MapUtil.headerToHashMap(headers));
Object body = null;

View File

@@ -0,0 +1,8 @@
package we.log;
import lombok.Data;
@Data
public class LogProperties {
private String layout;
}

View File

@@ -1,133 +1,10 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.log;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import we.FizzAppContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.NetworkUtils;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
/**
* log send appender
*
* @author zhongjie
*/
@Plugin(name = LogSendAppender.PLUGIN_NAME, category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
public class LogSendAppender extends AbstractAppender {
static final String PLUGIN_NAME = "LogSend";
public static LogSendService logSendService;
public static Boolean logEnabled;
private static LogSend[] logSends = new LogSend[1000];
private static AtomicInteger logSendIndex = new AtomicInteger(0);
private LogSendAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Property[] properties) {
super(name, filter, layout, ignoreExceptions, properties);
}
@Override
public void append(LogEvent event) {
if (logEnabled != null && !logEnabled) {
return;
}
if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) {
// local cache
logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend(this.getBizId(event.getMessage().getParameters()),
NetworkUtils.getServerIp(), event.getLevel().intLevel(), event.getTimeMillis(), new String(this.getLayout().toByteArray(event)));
return;
}
if (logEnabled == null && logSendService == null) {
// no legal logSendService, discard the local cache
logEnabled = Boolean.FALSE;
logSends = null;
return;
}
if (logEnabled == null) {
logEnabled = Boolean.TRUE;
LogSend[] logSends;
synchronized (LogSendAppender.class) {
logSends = LogSendAppender.logSends;
LogSendAppender.logSends = null;
}
// logSendService is ready, send the local cache
if (logSends != null) {
int size = Math.min(logSendIndex.get(), logSends.length);
for (int i = 0; i < size; i++) {
logSendService.send(logSends[i]);
}
}
}
LogSend logSend = new LogSend(this.getBizId(event.getMessage().getParameters()), NetworkUtils.getServerIp(),
event.getLevel().intLevel(), event.getTimeMillis(), new String(this.getLayout().toByteArray(event)));
logSendService.send(logSend);
}
private String getBizId(Object[] parameters) {
Object bizId = LogService.getBizId();
if (parameters != null) {
for (int i = parameters.length - 1; i > -1; --i) {
Object p = parameters[i];
if (p == LogService.BIZ_ID) {
if (i != parameters.length - 1) {
bizId = parameters[i + 1];
}
break;
}
}
}
if (bizId == null) {
return "";
}
return bizId.toString();
}
@PluginFactory
public static LogSendAppender createAppender(@PluginAttribute("name") String name,
@PluginElement("Filter") final Filter filter,
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginAttribute("ignoreExceptions") boolean ignoreExceptions) {
if (name == null) {
LOGGER.error("No name provided for LogSendAppender!");
return null;
}
if (layout == null) {
layout = PatternLayout.createDefaultLayout();
}
return new LogSendAppender(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY);
}
public class LogSendAppender {
public static volatile LogSendService logSendService;
public static volatile Boolean logEnabled;
public static volatile LogSend[] logSends = new LogSend[1000];
public static volatile AtomicInteger logSendIndex = new AtomicInteger(0);
}

View File

@@ -0,0 +1,126 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.log;
import org.apache.logging.log4j.core.*;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import we.FizzAppContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.NetworkUtils;
import java.io.Serializable;
import static we.log.LogSendAppender.*;
/**
* log send appender with log4j2
*
* @author zhongjie
*/
@Plugin(name = LogSendAppenderWithLog4j2.PLUGIN_NAME, category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
public class LogSendAppenderWithLog4j2 extends AbstractAppender {
static final String PLUGIN_NAME = "LogSend";
private LogSendAppenderWithLog4j2(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Property[] properties) {
super(name, filter, layout, ignoreExceptions, properties);
}
@Override
public void append(LogEvent event) {
if (logEnabled != null && !logEnabled) {
return;
}
if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) {
// local cache
logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend(this.getBizId(event.getMessage().getParameters()),
NetworkUtils.getServerIp(), event.getLevel().intLevel(), event.getTimeMillis(), new String(this.getLayout().toByteArray(event)));
return;
}
if (logEnabled == null && logSendService == null) {
// no legal logSendService, discard the local cache
logEnabled = Boolean.FALSE;
logSends = null;
return;
}
if (logEnabled == null) {
logEnabled = Boolean.TRUE;
LogSend[] logSends;
synchronized (LogSendAppender.class) {
logSends = LogSendAppender.logSends;
LogSendAppender.logSends = null;
}
// logSendService is ready, send the local cache
if (logSends != null) {
int size = Math.min(logSendIndex.get(), logSends.length);
for (int i = 0; i < size; i++) {
logSendService.send(logSends[i]);
}
}
}
LogSend logSend = new LogSend(this.getBizId(event.getMessage().getParameters()), NetworkUtils.getServerIp(),
event.getLevel().intLevel(), event.getTimeMillis(), new String(this.getLayout().toByteArray(event)));
logSendService.send(logSend);
}
private String getBizId(Object[] parameters) {
Object bizId = LogService.getBizId();
if (parameters != null) {
for (int i = parameters.length - 1; i > -1; --i) {
Object p = parameters[i];
if (p == LogService.BIZ_ID) {
if (i != parameters.length - 1) {
bizId = parameters[i + 1];
}
break;
}
}
}
if (bizId == null) {
return "";
}
return bizId.toString();
}
@PluginFactory
public static LogSendAppenderWithLog4j2 createAppender(@PluginAttribute("name") String name,
@PluginElement("Filter") final Filter filter,
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginAttribute("ignoreExceptions") boolean ignoreExceptions) {
if (name == null) {
LOGGER.error("No name provided for LogSendAppender!");
return null;
}
if (layout == null) {
layout = PatternLayout.createDefaultLayout();
}
return new LogSendAppenderWithLog4j2(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY);
}
}

View File

@@ -0,0 +1,94 @@
package we.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.LogbackException;
import lombok.Getter;
import lombok.Setter;
import we.FizzAppContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.NetworkUtils;
import static we.log.LogSendAppender.*;
/**
* log send appender with logback
*
* @author huahuang
*/
public class LogSendAppenderWithLogback extends AppenderBase<ILoggingEvent> {
//负责将日志事件转换为字符串需Getter和Setter方法
@Getter
@Setter
private Layout<ILoggingEvent> layout;
@Override
protected void append(ILoggingEvent event) {
try {
if (logEnabled != null && !logEnabled) {
return;
}
if (logEnabled == null && FizzAppContext.appContext == null && logSendService == null) {
// local cache
logSends[logSendIndex.getAndIncrement() % logSends.length] = new LogSend(
this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(), event.getLevel().levelInt,
event.getTimeStamp(), this.getLayout().doLayout(event));
return;
}
if (logEnabled == null && logSendService == null) {
// no legal logSendService, discard the local cache
logEnabled = Boolean.FALSE;
logSends = null;
return;
}
if (logEnabled == null) {
logEnabled = Boolean.TRUE;
LogSend[] logSends;
synchronized (LogSendAppender.class) {
logSends = LogSendAppender.logSends;
LogSendAppender.logSends = null;
}
// logSendService is ready, send the local cache
if (logSends != null) {
int size = Math.min(logSendIndex.get(), logSends.length);
for (int i = 0; i < size; i++) {
logSendService.send(logSends[i]);
}
}
}
LogSend logSend = new LogSend(this.getBizId(event.getArgumentArray()), NetworkUtils.getServerIp(),
event.getLevel().levelInt, event.getTimeStamp(), this.getLayout().doLayout(event));
logSendService.send(logSend);
} catch (Exception ex) {
throw new LogbackException(event.getFormattedMessage(), ex);
}
}
private String getBizId(Object[] parameters) {
Object bizId = LogService.getBizId();
if (parameters != null) {
for (int i = parameters.length - 1; i > -1; --i) {
Object p = parameters[i];
if (p == LogService.BIZ_ID) {
if (i != parameters.length - 1) {
bizId = parameters[i + 1];
}
break;
}
}
}
if (bizId == null) {
return "";
}
return bizId.toString();
}
}

View File

@@ -17,7 +17,7 @@
package we.log;
/**
* log send service interface, used by {@link LogSendAppender} to send log to fizz-manager
* log send service interface, used by {@link LogSendAppenderWithLog4j2} to send log to fizz-manager
*
* @author zhongjie
*/

View File

@@ -52,7 +52,7 @@ public abstract class PluginFilter implements FizzPluginFilter {
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config, String fixedConfig) {
FilterResult pfr = WebUtils.getPrevFilterResult(exchange);
if (log.isDebugEnabled()) {
log.debug(this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, exchange.getRequest().getId());
log.debug(this + ": " + pfr.id + " execute " + (pfr.success ? "success" : "fail"), LogService.BIZ_ID, WebUtils.getTraceId(exchange));
}
if (pfr.success) {
return doFilter(exchange, config, fixedConfig);

View File

@@ -21,12 +21,17 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import we.plugin.PluginConfig;
import we.proxy.Route;
import we.util.JacksonUtils;
import we.util.UrlTransformUtils;
import we.util.WebUtils;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -65,7 +70,7 @@ public class ApiConfig {
public Set<String> gatewayGroups = Stream.of(GatewayGroup.DEFAULT).collect(Collectors.toSet());
public String service;
public String service; // a
public String backendService;
@@ -175,8 +180,8 @@ public class ApiConfig {
i = Math.abs(i);
}
return httpHostPorts.get(
i % httpHostPorts.size()
);
i % httpHostPorts.size()
);
}
public String transform(String reqPath) {
@@ -200,6 +205,27 @@ public class ApiConfig {
return false;
}
public Route getRoute(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
Route r = new Route().type(this.type)
.method(request.getMethod())
.backendService(this.backendService)
.backendPath(this.backendPath)
.query(WebUtils.getClientReqQuery(exchange))
.pluginConfigs(this.pluginConfigs)
.rpcMethod(this.rpcMethod)
.rpcParamTypes(this.rpcParamTypes)
.rpcGroup(this.rpcGroup)
.rpcVersion(this.rpcVersion)
.timeout(this.timeout);
if (this.type == Type.REVERSE_PROXY) {
r = r.nextHttpHostPort(getNextHttpHostPort());
}
return r;
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);

View File

@@ -289,7 +289,7 @@ public class ApiConfigService {
public Mono<Object> canAccess(ServerWebExchange exchange) {
ServerHttpRequest req = exchange.getRequest();
HttpHeaders hdrs = req.getHeaders();
LogService.setBizId(req.getId());
LogService.setBizId(WebUtils.getTraceId(exchange));
return canAccess(exchange, WebUtils.getAppId(exchange), WebUtils.getOriginIp(exchange), getTimestamp(hdrs), getSign(hdrs),
WebUtils.getClientService(exchange), req.getMethod(), WebUtils.getClientReqPath(exchange));
}

View File

@@ -51,7 +51,7 @@ public class AuthPluginFilter extends PluginFilter {
return apiConfigService.canAccess(exchange).flatMap(
r -> {
if (log.isDebugEnabled()) {
log.debug("req auth: " + r, LogService.BIZ_ID, exchange.getRequest().getId());
log.debug("req auth: " + r, LogService.BIZ_ID, WebUtils.getTraceId(exchange));
}
Map<String, Object> data = Collections.singletonMap(RESULT, r);
return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, AUTH_PLUGIN_FILTER, data);

View File

@@ -19,12 +19,13 @@ package we.plugin.auth;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Constants;
import we.util.JacksonUtils;
import we.util.NetworkUtils;
@@ -60,6 +61,9 @@ public class GatewayGroupService {
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt;
@Resource
private Environment environment;
@PostConstruct
public void init() throws Throwable {
this.init(this::lsnGatewayGroupChange);
@@ -181,10 +185,11 @@ public class GatewayGroupService {
private void updateCurrentGatewayGroupSet(Set<String> currentGatewayGroupSet, Map<String,
GatewayGroup> gatewayGroupMap) {
String ip = NetworkUtils.getServerIp();
String applicationName = environment.getProperty("spring.application.name");
currentGatewayGroupSet.clear();
gatewayGroupMap.forEach(
(k, gg) -> {
if (gg.gateways.contains(ip)) {
if (gg.gateways.contains(ip) || gg.gateways.contains(applicationName)) {
currentGatewayGroupSet.add(gg.group);
}
}

View File

@@ -19,6 +19,8 @@ package we.plugin.requestbody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
@@ -27,7 +29,9 @@ import we.flume.clients.log4j2appender.LogService;
import we.plugin.FizzPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.spring.http.server.reactive.ext.FizzServerHttpRequestDecorator;
import we.spring.web.server.ext.FizzServerWebExchangeDecorator;
import we.util.NettyDataBufferUtils;
import we.util.WebUtils;
import java.util.Map;
@@ -50,18 +54,24 @@ public class RequestBodyPlugin implements FizzPluginFilter {
NettyDataBufferUtils.join(req.getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
.flatMap(
body -> {
ServerWebExchange newExchange = exchange;
FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req);
if (body != NettyDataBufferUtils.EMPTY_DATA_BUFFER) {
FizzServerHttpRequestDecorator requestDecorator = new FizzServerHttpRequestDecorator(req);
try {
requestDecorator.setBody(body);
} finally {
NettyDataBufferUtils.release(body);
}
newExchange = exchange.mutate().request(requestDecorator).build();
if (log.isDebugEnabled()) {
log.debug("retain body", LogService.BIZ_ID, req.getId());
}
// requestDecorator.getHeaders().remove(HttpHeaders.CONTENT_LENGTH);
}
ServerWebExchange mutatedExchange = exchange.mutate().request(requestDecorator).build();
ServerWebExchange newExchange = mutatedExchange;
MediaType contentType = req.getHeaders().getContentType();
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
newExchange = new FizzServerWebExchangeDecorator(mutatedExchange);
}
if (log.isDebugEnabled()) {
String traceId = WebUtils.getTraceId(exchange);
log.debug(traceId + " request is decorated", LogService.BIZ_ID, traceId);
}
return FizzPluginFilterChain.next(newExchange);
}

View File

@@ -80,7 +80,7 @@ public class CallbackService {
public Mono<Void> requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, Map<String, ServiceInstance> service2instMap) {
ServerHttpRequest req = exchange.getRequest();
String reqId = req.getId();
String reqId = WebUtils.getTraceId(exchange);
HttpMethod method = req.getMethod();
if (log.isDebugEnabled()) {
log.debug("service2instMap: " + JacksonUtils.writeValueAsString(service2instMap), LogService.BIZ_ID, reqId);
@@ -153,7 +153,7 @@ public class CallbackService {
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
b.append(Constants.Symbol.LINE_SEPARATOR).append(callback).append(Constants.Symbol.LINE_SEPARATOR);
String id = exchange.getRequest().getId();
String id = WebUtils.getTraceId(exchange);
WebUtils.request2stringBuilder(id, method, r.service + Constants.Symbol.FORWARD_SLASH + r.path, headers, body, b);
log.error(b.toString(), LogService.BIZ_ID, id, t);
}
@@ -195,7 +195,7 @@ public class CallbackService {
);
if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder();
String rid = exchange.getRequest().getId();
String rid = WebUtils.getTraceId(exchange);
WebUtils.response2stringBuilder(rid, remoteResp, b);
log.debug(b.toString(), LogService.BIZ_ID, rid);
}

View File

@@ -0,0 +1,128 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.proxy;
import org.springframework.http.HttpMethod;
import we.plugin.PluginConfig;
import we.util.Constants;
import we.util.JacksonUtils;
import java.util.List;
/**
* @author hongqiaowei
*/
public class Route {
public byte type;
public HttpMethod method;
public String backendService;
public String backendPath;
public String query;
public String nextHttpHostPort;
public List<PluginConfig> pluginConfigs;
public String rpcMethod;
public String rpcParamTypes;
public String rpcVersion;
public String rpcGroup;
public long timeout = 0;
public Route type(byte t) {
type = t;
return this;
}
public Route method(HttpMethod m) {
method = m;
return this;
}
public Route backendService(String bs) {
backendService = bs;
return this;
}
public Route backendPath(String bp) {
backendPath = bp;
return this;
}
public Route query(String qry) {
query = qry;
return this;
}
public Route pluginConfigs(List<PluginConfig> pcs) {
pluginConfigs = pcs;
return this;
}
public Route nextHttpHostPort(String nhhp) {
nextHttpHostPort = nhhp;
return this;
}
public Route rpcMethod(String m) {
rpcMethod = m;
return this;
}
public Route rpcParamTypes(String t) {
rpcParamTypes = t;
return this;
}
public Route rpcVersion(String v) {
rpcVersion = v;
return this;
}
public Route rpcGroup(String g) {
rpcGroup = g;
return this;
}
public Route timeout(long t) {
timeout = t;
return this;
}
public String getBackendPathQuery() {
if (query != null) {
return backendPath + Constants.Symbol.QUESTION + query;
}
return backendPath;
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
}
}

View File

@@ -36,6 +36,7 @@ import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.AuthPluginFilter;
import we.proxy.Route;
import java.net.URI;
import java.util.Collections;
@@ -75,14 +76,14 @@ public abstract class WebUtils {
private static final String clientRequestQuery = "@crq";
private static final String traceId = "traceId";
private static String gatewayPrefix = SystemConfig.DEFAULT_GATEWAY_PREFIX;
private static List<String> appHeaders = Stream.of("fizz-appid").collect(Collectors.toList());
private static final String app = "app";
public static final String TRACE_ID = "@traid";
public static final String BACKEND_SERVICE = "@bs";
public static final String FILTER_CONTEXT = "@fc";
@@ -93,6 +94,8 @@ public abstract class WebUtils {
public static final String BACKEND_PATH = "@bp";
public static final String ROUTE = "@rout";
public static boolean LOG_RESPONSE_BODY = false;
public static Set<String> LOG_HEADER_SET = Collections.EMPTY_SET;
@@ -105,7 +108,7 @@ public abstract class WebUtils {
public static void setAppHeaders(List<String> hdrs) {
appHeaders = hdrs;
}
public static String getHeaderValue(ServerWebExchange exchange, String header) {
return exchange.getRequest().getHeaders().getFirst(header);
}
@@ -175,23 +178,31 @@ public abstract class WebUtils {
return null;
}
}
public static Route getRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ROUTE);
}
public static Mono<Void> getDirectResponse(ServerWebExchange exchange) {
return exchange.getAttribute(WebUtils.directResponse);
}
@Deprecated
public static Map<String, FilterResult> getFilterContext(ServerWebExchange exchange) {
return exchange.getAttribute(FILTER_CONTEXT);
}
@Deprecated
public static FilterResult getFilterResult(ServerWebExchange exchange, String filter) {
return getFilterContext(exchange).get(filter);
}
@Deprecated
public static Map<String, Object> getFilterResultData(ServerWebExchange exchange, String filter) {
return getFilterResult(exchange, filter).data;
}
@Deprecated
public static Object getFilterResultDataItem(ServerWebExchange exchange, String filter, String key) {
return getFilterResultData(exchange, filter).get(key);
}
@@ -200,6 +211,7 @@ public abstract class WebUtils {
return buildDirectResponse(exchange.getResponse(), status, headers, bodyContent);
}
@Deprecated
public static Mono buildDirectResponseAndBindContext(ServerWebExchange exchange, HttpStatus status, HttpHeaders headers, String bodyContent) {
Mono<Void> mv = buildDirectResponse(exchange, status, headers, bodyContent);
exchange.getAttributes().put(WebUtils.directResponse, mv);
@@ -214,6 +226,7 @@ public abstract class WebUtils {
return buildDirectResponse(exchange, status, headers, json);
}
@Deprecated
public static Mono buildJsonDirectResponseAndBindContext(ServerWebExchange exchange, HttpStatus status, HttpHeaders headers, String json) {
if (headers == null) {
headers = new HttpHeaders();
@@ -244,21 +257,25 @@ public abstract class WebUtils {
.writeWith(Mono.just(clientResp.bufferFactory().wrap(bodyContent.getBytes())));
}
@Deprecated
public static void transmitSuccessFilterResult(ServerWebExchange exchange, String filter, Map<String, Object> data) {
FilterResult fr = FilterResult.SUCCESS_WITH(filter, data);
bind(exchange, filter, fr);
}
@Deprecated
public static Mono transmitSuccessFilterResultAndEmptyMono(ServerWebExchange exchange, String filter, Map<String, Object> data) {
transmitSuccessFilterResult(exchange, filter, data);
return Mono.empty();
}
@Deprecated
public static void transmitFailFilterResult(ServerWebExchange exchange, String filter) {
FilterResult fr = FilterResult.FAIL(filter);
bind(exchange, filter, fr);
}
@Deprecated
public static void transmitFailFilterResult(ServerWebExchange exchange, String filter, Throwable cause) {
FilterResult fr = FilterResult.FAIL_WITH(filter, cause);
bind(exchange, filter, fr);
@@ -270,6 +287,7 @@ public abstract class WebUtils {
fc.put(PREV_FILTER_RESULT, fr);
}
@Deprecated
public static FilterResult getPrevFilterResult(ServerWebExchange exchange) {
return getFilterContext(exchange).get(PREV_FILTER_RESULT);
}
@@ -399,7 +417,7 @@ public abstract class WebUtils {
public static void request2stringBuilder(ServerWebExchange exchange, StringBuilder b) {
ServerHttpRequest req = exchange.getRequest();
request2stringBuilder(req.getId(), req.getMethod(), req.getURI().toString(), req.getHeaders(), null, b);
request2stringBuilder(WebUtils.getTraceId(exchange), req.getMethod(), req.getURI().toString(), req.getHeaders(), null, b);
}
public static void request2stringBuilder(String reqId, HttpMethod method, String uri, HttpHeaders headers, Object body, StringBuilder b) {
@@ -452,24 +470,24 @@ public abstract class WebUtils {
// }
// );
// }
String rid = exchange.getRequest().getId();
String rid = getTraceId(exchange);
// Schedulers.parallel().schedule(() -> {
StringBuilder b = ThreadContext.getStringBuilder();
request2stringBuilder(exchange, b);
// if (reqBody[0] != null) {
// DataBufferUtils.release(reqBody[0]);
// }
b.append(Constants.Symbol.LINE_SEPARATOR);
b.append(filter).append(Constants.Symbol.SPACE).append(code).append(Constants.Symbol.SPACE).append(msg);
if (t == null) {
log.error(b.toString(), LogService.BIZ_ID, rid);
} else {
log.error(b.toString(), LogService.BIZ_ID, rid, t);
Throwable[] suppressed = t.getSuppressed();
if (suppressed != null && suppressed.length != 0) {
log.error(StringUtils.EMPTY, suppressed[0]);
}
StringBuilder b = ThreadContext.getStringBuilder();
request2stringBuilder(exchange, b);
// if (reqBody[0] != null) {
// DataBufferUtils.release(reqBody[0]);
// }
b.append(Constants.Symbol.LINE_SEPARATOR);
b.append(filter).append(Constants.Symbol.SPACE).append(code).append(Constants.Symbol.SPACE).append(msg);
if (t == null) {
log.error(b.toString(), LogService.BIZ_ID, rid);
} else {
log.error(b.toString(), LogService.BIZ_ID, rid, t);
Throwable[] suppressed = t.getSuppressed();
if (suppressed != null && suppressed.length != 0) {
log.error(StringUtils.EMPTY, suppressed[0]);
}
}
// });
if (filter != null) {
if (t == null) {
@@ -485,10 +503,12 @@ public abstract class WebUtils {
}
}
@Deprecated
public static Mono<Void> responseErrorAndBindContext(ServerWebExchange exchange, String filter, int code, String msg) {
return responseError(exchange, filter, code, msg, null, true);
}
@Deprecated
public static Mono<Void> responseErrorAndBindContext(ServerWebExchange exchange, String filter, int code, String msg, Throwable t) {
return responseError(exchange, filter, code, msg, t, true);
}
@@ -501,9 +521,10 @@ public abstract class WebUtils {
return responseError(exchange, reporter, code, msg, t, false);
}
@Deprecated
public static Mono<Void> responseErrorAndBindContext(ServerWebExchange exchange, String filter, HttpStatus httpStatus) {
ServerHttpResponse response = exchange.getResponse();
String rid = exchange.getRequest().getId();
String rid = getTraceId(exchange);
StringBuilder b = ThreadContext.getStringBuilder();
request2stringBuilder(exchange, b);
b.append(Constants.Symbol.LINE_SEPARATOR);
@@ -513,10 +534,11 @@ public abstract class WebUtils {
return buildDirectResponseAndBindContext(exchange, httpStatus, new HttpHeaders(), Constants.Symbol.EMPTY);
}
@Deprecated
public static Mono<Void> responseErrorAndBindContext(ServerWebExchange exchange, String filter, HttpStatus httpStatus,
HttpHeaders headers, String content) {
HttpHeaders headers, String content) {
ServerHttpResponse response = exchange.getResponse();
String rid = exchange.getRequest().getId();
String rid = getTraceId(exchange);
StringBuilder b = ThreadContext.getStringBuilder();
request2stringBuilder(exchange, b);
b.append(Constants.Symbol.LINE_SEPARATOR);
@@ -549,15 +571,9 @@ public abstract class WebUtils {
}
public static String getTraceId(ServerWebExchange exchange) {
String id = exchange.getAttribute(traceId);
String id = exchange.getAttribute(TRACE_ID);
if (id == null) {
ServerHttpRequest request = exchange.getRequest();
String v = request.getHeaders().getFirst(CommonConstants.HEADER_TRACE_ID);
if (StringUtils.isNotBlank(v)) {
id = v;
} else {
id = CommonConstants.TRACE_ID_PREFIX + request.getId();
}
id = exchange.getRequest().getId();
}
return id;
}

View File

@@ -26,6 +26,8 @@ import java.util.Map;
import org.junit.jupiter.api.Test;
import org.noear.snack.ONode;
import com.alibaba.fastjson.JSON;
import we.fizz.input.PathMapping;
/**
@@ -44,7 +46,7 @@ class ListFuncTests {
m.put(key, value);
return m;
}
private Map<String, Object> createRecord2(int index) {
Map<String, Object> m = new HashMap<>();
m.put("a", "a" + index);
@@ -81,8 +83,7 @@ class ListFuncTests {
assertEquals("a2", ((Map<String, Object>) result.get(1)).get("a").toString());
assertEquals("a4", ((Map<String, Object>) result.get(3)).get("a").toString());
}
@Test
void testMerge() {
List<Object> subList1 = new ArrayList<>();
@@ -95,7 +96,6 @@ class ListFuncTests {
subList2.add(createRecord("a", "a5"));
subList2.add(createRecord("a", "a6"));
ONode ctxNode = ONode.load(new HashMap());
PathMapping.setByPath(ctxNode, "test.data1", subList1, true);
PathMapping.setByPath(ctxNode, "test.data2", subList2, true);
@@ -106,7 +106,7 @@ class ListFuncTests {
assertEquals("a2", ((Map<String, Object>) result.get(1)).get("a").toString());
assertEquals("a4", ((Map<String, Object>) result.get(3)).get("a").toString());
}
@Test
void testExtract() {
List<Object> subList1 = new ArrayList<>();
@@ -116,7 +116,6 @@ class ListFuncTests {
subList1.add(createRecord2(4));
subList1.add(createRecord2(5));
ONode ctxNode = ONode.load(new HashMap());
PathMapping.setByPath(ctxNode, "test.data", subList1, true);
@@ -130,4 +129,46 @@ class ListFuncTests {
// System.out.println(result);
}
private Map<String, Object> createRecord3(int index, boolean isDest) {
Map<String, Object> m = new HashMap<>();
m.put("a", "a" + index);
String s = isDest ? "" + index : index + "-abc";
m.put("b", "b" + s);
m.put("c", "c" + s);
if (!isDest) {
m.put("d", "d" + s);
m.put("e", "e" + s);
}
return m;
}
@Test
void testJoin() {
List<Object> list1 = new ArrayList<>();
list1.add(createRecord3(1, true));
list1.add(createRecord3(2, true));
list1.add(createRecord3(3, true));
list1.add(createRecord3(4, true));
list1.add(createRecord3(5, true));
List<Object> list2 = new ArrayList<>();
list2.add(createRecord3(1, false));
list2.add(createRecord3(2, false));
list2.add(createRecord3(3, false));
list2.add(createRecord3(4, false));
ONode ctxNode = ONode.load(new HashMap());
PathMapping.setByPath(ctxNode, "test.list1", list1, true);
PathMapping.setByPath(ctxNode, "test.list2", list2, true);
String funcExpression = "fn.list.join({test.list1}, {test.list2},\"a\",\"c\",\"d\")";
List<Object> result = (List<Object>) FuncExecutor.getInstance().exec(ctxNode, funcExpression);
assertEquals(5, result.size());
assertEquals("a2", ((Map<String, Object>) result.get(1)).get("a").toString());
assertEquals("d4-abc", ((Map<String, Object>) result.get(3)).get("d").toString());
// System.out.println(JSON.toJSONString(result));
}
}

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.3.0</version>
<version>2.3.2-beta1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -26,11 +26,15 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.plugin.PluginFilter;
import we.plugin.FizzPluginFilter;
import we.plugin.FizzPluginFilterChain;
import we.plugin.PluginConfig;
import we.util.JacksonUtils;
import we.util.ReactorUtils;
import we.util.WebUtils;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
/**
@@ -39,7 +43,7 @@ import java.util.Map;
*
*/
@Component(BasicAuthPluginFilter.BASIC_AUTH_PLUGIN_FILTER)
public class BasicAuthPluginFilter extends PluginFilter {
public class BasicAuthPluginFilter implements FizzPluginFilter {
private static final Logger log = LoggerFactory.getLogger(BasicAuthPluginFilter.class);
@@ -56,54 +60,61 @@ public class BasicAuthPluginFilter extends PluginFilter {
*/
private GlobalConfig globalConfig = null;
private String fixedConfigCache = null;
private String customConfigCache = null;
@SuppressWarnings("unchecked")
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, Map<String, Object> config, String fixedConfig) {
public Mono<Void> filter(ServerWebExchange exchange, Map<String, Object> config) {
try {
if (globalConfig == null || fixedConfigCache == null
|| (fixedConfigCache != null && !fixedConfigCache.equals(fixedConfig))) {
if (StringUtils.isNotBlank(fixedConfig)) {
globalConfig = JacksonUtils.readValue(fixedConfig, GlobalConfig.class);
// global config
String customConfig = (String) config.get(PluginConfig.CUSTOM_CONFIG);
if (globalConfig == null || customConfigCache == null
|| (customConfigCache != null && !customConfigCache.equals(customConfig))) {
if (StringUtils.isNotBlank(customConfig)) {
globalConfig = JacksonUtils.readValue(customConfig, GlobalConfig.class);
} else {
globalConfig = null;
}
fixedConfigCache = fixedConfig;
customConfigCache = customConfig;
}
return exchange.getSession().flatMap(webSession -> {
// check session
String authInfo = webSession.getAttribute(BASIC_AUTH_KEY);
if (authInfo == null) {
// check header auth
HttpHeaders reqHeaders = exchange.getRequest().getHeaders();
String authorization = reqHeaders.getFirst(HttpHeaders.AUTHORIZATION);
if (checkAuth(authorization, globalConfig)) {
webSession.getAttributes().put(BASIC_AUTH_KEY, BASIC_AUTH_VALUE);
authInfo = BASIC_AUTH_VALUE;
}
}
if (authInfo == null) {
// Auth failed
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.UNAUTHORIZED);
response.getHeaders().setCacheControl("no-store");
response.getHeaders().setExpires(0);
response.getHeaders().add("WWW-authenticate", "Basic Realm=\"input username and password\"");
return WebUtils.responseErrorAndBindContext(exchange, BASIC_AUTH_PLUGIN_FILTER,
HttpStatus.UNAUTHORIZED);
} else {
return WebUtils.transmitSuccessFilterResultAndEmptyMono(exchange, BASIC_AUTH_PLUGIN_FILTER, null);
}
});
// route level config
Map<String, String> routeUsers = new HashMap<>();
String routeLevelConfig = (String) config.get("users");
if (StringUtils.isNotBlank(routeLevelConfig)) {
Map<String, String> tmp = (Map<String, String>) JacksonUtils.readValue(routeLevelConfig, Map.class);
routeUsers.putAll(tmp);
}
// check header auth
HttpHeaders reqHeaders = exchange.getRequest().getHeaders();
String authorization = reqHeaders.getFirst(HttpHeaders.AUTHORIZATION);
if (checkAuth(authorization, globalConfig, routeUsers)) {
// Go to next plugin
Mono next = FizzPluginFilterChain.next(exchange);
return next.defaultIfEmpty(ReactorUtils.NULL).flatMap(nil -> {
doAfter();
return Mono.empty();
});
} else {
// Auth failed
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.UNAUTHORIZED);
response.getHeaders().setCacheControl("no-store");
response.getHeaders().setExpires(0);
response.getHeaders().add("WWW-authenticate", "Basic Realm=\"input username and password\"");
return WebUtils.buildDirectResponse(exchange, HttpStatus.UNAUTHORIZED, null, null);
}
} catch (Exception e) {
log.error("Basic Auth plugin Exception", e);
return WebUtils.responseErrorAndBindContext(exchange, BASIC_AUTH_PLUGIN_FILTER,
HttpStatus.INTERNAL_SERVER_ERROR);
return WebUtils.buildDirectResponse(exchange, HttpStatus.INTERNAL_SERVER_ERROR, null, null);
}
}
public void doAfter() {
}
/**
* Validate basic authorization
*
@@ -111,7 +122,7 @@ public class BasicAuthPluginFilter extends PluginFilter {
* @param globalConfig
* @return
*/
public boolean checkAuth(String authorization, GlobalConfig globalConfig) {
public boolean checkAuth(String authorization, GlobalConfig globalConfig, Map<String, String> routeUsers) {
if ((authorization != null) && (authorization.length() > 6)) {
authorization = authorization.substring(6, authorization.length());
try {
@@ -120,9 +131,12 @@ public class BasicAuthPluginFilter extends PluginFilter {
int idx = decodedAuth.indexOf(":");
String username = decodedAuth.substring(0, idx);
String password = decodedAuth.substring(idx + 1, decodedAuth.length());
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password) && globalConfig != null
&& globalConfig.getUsers() != null) {
return password.equals(globalConfig.getUsers().get(username));
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
if (routeUsers != null && routeUsers.containsKey(username)) {
return password.equals(routeUsers.get(username));
} else if (globalConfig != null && globalConfig.getUsers() != null) {
return password.equals(globalConfig.getUsers().get(username));
}
}
}
} catch (Exception e) {

View File

@@ -42,47 +42,50 @@ public class BasicAuthPluginFilterTests {
globalConfig.setUsers(users);
users.put("abc", "123456");
Map<String, String> routeUsers = new HashMap<>();
routeUsers.put("abc", "123456");
BasicAuthPluginFilter plugin = new BasicAuthPluginFilter();
String authorization = "Basic " + Base64.getEncoder().encodeToString("a:b".getBytes());
boolean result = plugin.checkAuth(authorization, globalConfig);
boolean result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
authorization = "Basic " + Base64.getEncoder().encodeToString("a:".getBytes());
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
authorization = "Basic " + Base64.getEncoder().encodeToString(":b".getBytes());
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
authorization = "Basic " + Base64.getEncoder().encodeToString(":".getBytes());
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
authorization = "Basic " + Base64.getEncoder().encodeToString("".getBytes());
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
authorization = "";
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
authorization = null;
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
authorization = "Basic " + Base64.getEncoder().encodeToString("abc:123456".getBytes());
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertTrue(result);
authorization = "Basic" + Base64.getEncoder().encodeToString("abc:123456".getBytes());
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
authorization = Base64.getEncoder().encodeToString("abc:123456".getBytes());
result = plugin.checkAuth(authorization, globalConfig);
result = plugin.checkAuth(authorization, globalConfig, routeUsers);
assertFalse(result);
}
}

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>fizz-gateway-community</artifactId>
<groupId>com.fizzgate</groupId>
<version>2.3.0</version>
<version>2.3.2-beta1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -7,7 +7,7 @@
<!--<java.version>1.8</java.version>-->
<spring-boot.version>2.2.13.RELEASE</spring-boot.version>
<spring-framework.version>5.2.13.RELEASE</spring-framework.version>
<reactor-bom.version>Dysprosium-SR22</reactor-bom.version>
<reactor-bom.version>Dysprosium-SR23</reactor-bom.version>
<lettuce.version>5.3.7.RELEASE</lettuce.version>
<nacos.cloud.version>2.2.5.RELEASE</nacos.cloud.version>
<netty.version>4.1.68.Final</netty.version>
@@ -34,7 +34,7 @@
<artifactId>fizz-gateway-community</artifactId>
<name>${project.artifactId}</name>
<description>fizz gateway community</description>
<version>2.3.0</version>
<version>2.3.2-beta1</version>
<packaging>pom</packaging>
<modules>
<module>fizz-common</module>
@@ -134,7 +134,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.3.19.RELEASE</version>
<version>3.3.20.RELEASE</version>
<scope>test</scope>
</dependency>