performance tuning again

This commit is contained in:
hongqiaowei
2021-05-28 09:42:34 +08:00
parent 22628040d5
commit 36e8866bbc
18 changed files with 396 additions and 262 deletions

View File

@@ -1,44 +1,44 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.resources.LoopResources;
import we.util.JacksonUtils;
/**
* @author hongqiaowei
*/
@Configuration
@ConfigurationProperties(prefix = AggrWebClientConfig.prefix)
public class AggrWebClientConfig extends WebClientConfig {
protected static final String prefix = "aggr-webclient";
public static final String aggrWebClient = "aggrWebClient";
@Bean(aggrWebClient)
public WebClient webClient() {
log.info(aggrWebClient + ": " + this);
return super.webClient();
}
}
// /*
// * Copyright (C) 2020 the original author or authors.
// *
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or
// * any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details.
// *
// * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <https://www.gnu.org/licenses/>.
// */
//
// package we.config;
//
// import org.springframework.boot.context.properties.ConfigurationProperties;
// import org.springframework.context.annotation.Bean;
// import org.springframework.context.annotation.Configuration;
// import org.springframework.web.reactive.function.client.WebClient;
// import reactor.netty.resources.LoopResources;
// import we.util.JacksonUtils;
//
// /**
// * @author hongqiaowei
// */
//
// @Configuration
// @ConfigurationProperties(prefix = AggrWebClientConfig.prefix)
// public class AggrWebClientConfig extends WebClientConfig {
//
// protected static final String prefix = "aggr-webclient";
//
// public static final String aggrWebClient = "aggrWebClient";
//
// @Bean(aggrWebClient)
// public WebClient webClient() {
// log.info(aggrWebClient + ": " + this);
// return super.webClient();
// }
// }

View File

@@ -22,6 +22,7 @@ import io.netty.channel.ChannelOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -34,6 +35,8 @@ import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.ResourceHandlerRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.resource.HttpResource;
import reactor.netty.http.HttpResources;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
@@ -50,39 +53,48 @@ public class WebFluxConfig {
private static final Logger log = LoggerFactory.getLogger(WebFluxConfig.class);
@NacosValue(value = "${server.connection-pool.max-connections:500}", autoRefreshed = true)
@Value( "${server.connection-pool.max-connections:500}" )
private int maxConnections;
// @NacosValue(value = "${server.connection-pool.max-connections:500}", autoRefreshed = true)
// @Value( "${server.connection-pool.max-connections:500}" )
// private int maxConnections;
//
// @NacosValue(value = "${server.connection-pool.max-idle-time:30000}", autoRefreshed = true)
// @Value( "${server.connection-pool.max-idle-time:30000}" )
// private long maxIdleTime;
@NacosValue(value = "${server.connection-pool.max-idle-time:30000}", autoRefreshed = true)
@Value( "${server.connection-pool.max-idle-time:30000}" )
private long maxIdleTime;
// private ConnectionProvider connectionProvider() {
// ConnectionProvider connectionProvider = ConnectionProvider.builder("fizz-cp")
// .maxConnections(maxConnections)
// .maxIdleTime(Duration.ofMillis(maxIdleTime))
// .pendingAcquireMaxCount(-1)
// .build();
// log.info("server connection provider: maxConnections={}, maxIdleTime={}", maxConnections, maxIdleTime);
// return connectionProvider;
// }
private ConnectionProvider connectionProvider() {
ConnectionProvider connectionProvider = ConnectionProvider.builder("fizz-cp")
.maxConnections(maxConnections)
.maxIdleTime(Duration.ofMillis(maxIdleTime))
.pendingAcquireMaxCount(-1)
.build();
log.info("server connection provider: maxConnections={}, maxIdleTime={}", maxConnections, maxIdleTime);
return connectionProvider;
}
// @ConditionalOnBean(ReactorResourceFactory.class)
// @Bean(name = "$dummyObject")
// public Object dummyObject() {
// ConnectionProvider provider = connectionProvider();
// HttpResources.set(provider);
// log.info("replace default connection provider with: " + provider);
// return new Object();
// }
private LoopResources loopResources() {
LoopResources loopResources = LoopResources.create("fizz-lrs");
log.info("server loop resources: " + loopResources);
return loopResources;
}
// private LoopResources loopResources() {
// LoopResources loopResources = LoopResources.create("fizz-lrs");
// log.info("server loop resources: " + loopResources);
// return loopResources;
// }
@Bean
public ReactorResourceFactory reactorServerResourceFactory() {
ReactorResourceFactory rrf = new ReactorResourceFactory();
rrf.setUseGlobalResources(false);
rrf.setLoopResources(loopResources());
rrf.setConnectionProvider(connectionProvider());
log.info("server reactor resource factory: " + rrf);
return rrf;
}
// @Bean
// public ReactorResourceFactory reactorServerResourceFactory() {
// ReactorResourceFactory rrf = new ReactorResourceFactory();
// rrf.setUseGlobalResources(false);
// rrf.setLoopResources(loopResources());
// rrf.setConnectionProvider(connectionProvider());
// log.info("server reactor resource factory: " + rrf);
// return rrf;
// }
// @Bean
// public NettyReactiveWebServerFactory nettyReactiveWebServerFactory(ServerProperties serverProperties/*, ReactorResourceFactory reactorResourceFactory*/) {

View File

@@ -20,12 +20,14 @@ package we.filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.flume.clients.log4j2appender.LogService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
@@ -37,15 +39,33 @@ import we.util.WebUtils;
@Order(0)
public class FizzLogFilter implements WebFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(FizzLogFilter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(FizzLogFilter.class);
private static final String resp = "\nresponse ";
private static final String resp = "\nresponse ";
private static final String in = " in ";
private static final String in = " in ";
private static final String admin = "admin";
private static final String actuator = "actuator";
public static final String ADMIN_REQUEST = "$a";
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
long startTime = System.currentTimeMillis();
String path = exchange.getRequest().getPath().value();
int secFS = path.indexOf(Constants.Symbol.FORWARD_SLASH, 1);
if (secFS == -1) {
return WebUtils.responseError(exchange, HttpStatus.INTERNAL_SERVER_ERROR.value(), "request path should like /optional-prefix/service-name/real-biz-path");
}
String s = path.substring(1, secFS);
if (s.equals(admin) || s.equals(actuator)) {
exchange.getAttributes().put(ADMIN_REQUEST, Constants.Symbol.EMPTY);
}
return chain.filter(exchange).doAfterTerminate(
() -> {
if (LOGGER.isInfoEnabled()) {

View File

@@ -37,17 +37,10 @@ public abstract class FizzWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String path = exchange.getRequest().getPath().value();
int secFS = path.indexOf(Constants.Symbol.FORWARD_SLASH, 1);
if (secFS == -1) {
return WebUtils.responseError(exchange, HttpStatus.INTERNAL_SERVER_ERROR.value(), "request path should like /optional-prefix/service-name/real-biz-path");
}
String s = path.substring(1, secFS);
if (s.equals(admin) || s.equals(actuator)) {
return chain.filter(exchange);
} else {
if (exchange.getAttribute(FizzLogFilter.ADMIN_REQUEST) == null) {
return doFilter(exchange, chain);
} else {
return chain.filter(exchange);
}
}

View File

@@ -81,12 +81,12 @@ public class PreprocessFilter extends FizzWebFilter {
.flatMap(
v -> {
Object authRes = WebUtils.getFilterResultDataItem(exchange, AuthPluginFilter.AUTH_PLUGIN_FILTER, AuthPluginFilter.RESULT);
Mono m;
Mono m = ReactorUtils.getInitiateMono();
if (authRes instanceof ApiConfig) {
ApiConfig ac = (ApiConfig) authRes;
afterAuth(exchange, ac);
m = executeFixedPluginFilters(exchange);
m = m.defaultIfEmpty(ReactorUtils.NULL);
// m = executeFixedPluginFilters(exchange);
// m = m.defaultIfEmpty(ReactorUtils.NULL);
if (ac.pluginConfigs == null || ac.pluginConfigs.isEmpty()) {
return m.flatMap(func(exchange, chain));
} else {
@@ -95,8 +95,9 @@ public class PreprocessFilter extends FizzWebFilter {
}
} else if (authRes == ApiConfigService.Access.YES) {
afterAuth(exchange, null);
m = executeFixedPluginFilters(exchange);
return m.defaultIfEmpty(ReactorUtils.NULL).flatMap(func(exchange, chain));
// m = executeFixedPluginFilters(exchange);
// return m.defaultIfEmpty(ReactorUtils.NULL).flatMap(func(exchange, chain));
return m.flatMap(func(exchange, chain));
} else {
ApiConfigService.Access access = (ApiConfigService.Access) authRes;
return WebUtils.responseError(exchange, HttpStatus.FORBIDDEN.value(), access.getReason());
@@ -142,19 +143,19 @@ public class PreprocessFilter extends FizzWebFilter {
};
}
private Mono<Void> executeFixedPluginFilters(ServerWebExchange exchange) {
Mono vm = Mono.empty();
List<FixedPluginFilter> fixedPluginFilters = FixedPluginFilter.getPluginFilters();
for (byte i = 0; i < fixedPluginFilters.size(); i++) {
FixedPluginFilter fpf = fixedPluginFilters.get(i);
vm = vm.defaultIfEmpty(ReactorUtils.NULL).flatMap(
v -> {
return fpf.filter(exchange, null, null);
}
);
}
return vm;
}
// private Mono<Void> executeFixedPluginFilters(ServerWebExchange exchange) {
// Mono vm = Mono.empty();
// List<FixedPluginFilter> fixedPluginFilters = FixedPluginFilter.getPluginFilters();
// for (byte i = 0; i < fixedPluginFilters.size(); i++) {
// FixedPluginFilter fpf = fixedPluginFilters.get(i);
// vm = vm.defaultIfEmpty(ReactorUtils.NULL).flatMap(
// v -> {
// return fpf.filter(exchange, null, null);
// }
// );
// }
// return vm;
// }
private Mono<Void> executeManagedPluginFilters(ServerWebExchange exchange, List<PluginConfig> pluginConfigs) {
Mono vm = Mono.empty();

View File

@@ -28,6 +28,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
@@ -59,6 +60,7 @@ public class RouteFilter extends FizzWebFilter {
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
FilterResult pfr = WebUtils.getPrevFilterResult(exchange);
if (pfr.success) {
return doFilter0(exchange, chain);
@@ -96,7 +98,9 @@ public class RouteFilter extends FizzWebFilter {
return send(exchange, WebUtils.getBackendService(exchange), pathQuery, hdrs);
} else if (ac.type == ApiConfig.Type.REVERSE_PROXY) {
String uri = ac.getNextHttpHostPort() + WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange);
String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
.append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
.toString();
return fizzWebClient.send(rid, clientReq.getMethod(), uri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
} else {

View File

@@ -84,7 +84,7 @@ public class ApiConfig {
@JsonProperty("proxyMode")
public byte type = Type.SERVICE_DISCOVERY;
private AtomicInteger counter = new AtomicInteger(-1);
private int counter = 0;
public List<String> httpHostPorts;
@@ -161,12 +161,13 @@ public class ApiConfig {
@JsonIgnore
public String getNextHttpHostPort() {
int idx = counter.incrementAndGet();
if (idx < 0) {
counter.set(0);
idx = 0;
int i = counter++;
if (i < 0) {
i = Math.abs(i);
}
return httpHostPorts.get(idx % httpHostPorts.size());
return httpHostPorts.get(
i % httpHostPorts.size()
);
}
public String transform(String reqPath) {

View File

@@ -263,7 +263,8 @@ public class ApiConfigService {
List<ApiConfig> apiConfigs = sc.getApiConfigs(method, path, gatewayGroup);
if (!apiConfigs.isEmpty()) {
List<String> matchPathPatterns = ThreadContext.getArrayList(mpps, String.class);
for (ApiConfig ac : apiConfigs) {
for (int i = 0; i < apiConfigs.size(); i++) {
ApiConfig ac = apiConfigs.get(i);
if (ac.checkApp) {
if (apiConifg2appsService.contains(ac.id, app)) {
matchPathPatterns.add(ac.path);
@@ -279,7 +280,8 @@ public class ApiConfigService {
Collections.sort(matchPathPatterns, UrlTransformUtils.ANT_PATH_MATCHER.getPatternComparator(path));
}
String bestPathPattern = matchPathPatterns.get(0);
for (ApiConfig ac : apiConfigs) {
for (int i = 0; i < apiConfigs.size(); i++) {
ApiConfig ac = apiConfigs.get(i);
if (StringUtils.equals(ac.path, bestPathPattern)) {
return ac;
}

View File

@@ -142,7 +142,8 @@ public class ServiceConfig {
return Collections.emptyList();
} else {
List<ApiConfig> lst = ThreadContext.getArrayList(acs, ApiConfig.class);
for (GatewayGroup2apiConfig gatewayGroup2apiConfig : matchGatewayGroup2apiConfigs) {
for (int i = 0; i < matchGatewayGroup2apiConfigs.size(); i++) {
GatewayGroup2apiConfig gatewayGroup2apiConfig = matchGatewayGroup2apiConfigs.get(i);
Set<ApiConfig> apiConfigs = gatewayGroup2apiConfig.get(gatewayGroup);
if (apiConfigs != null) {
lst.addAll(apiConfigs);

View File

@@ -18,7 +18,6 @@
package we.proxy;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,8 +32,6 @@ import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import we.config.AggrWebClientConfig;
import we.config.ProxyWebClientConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Constants;
@@ -43,11 +40,8 @@ import we.util.WebUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author hongqiaowei
@@ -62,7 +56,7 @@ public class FizzWebClient {
private static final String localhost = "localhost";
private static final String host = "HOST";
private static final String host = "Host";
@Resource
private DiscoveryClientUriSelector discoveryClientUriSelector;
@@ -70,8 +64,8 @@ public class FizzWebClient {
@Resource(name = ProxyWebClientConfig.proxyWebClient)
private WebClient proxyWebClient;
@Resource(name = AggrWebClientConfig.aggrWebClient)
private WebClient aggrWebClient;
// @Resource(name = AggrWebClientConfig.aggrWebClient)
// private WebClient aggrWebClient;
@NacosValue(value = "${fizz-web-client.timeout:-1}")
@Value("${fizz-web-client.timeout:-1}")
@@ -88,18 +82,18 @@ public class FizzWebClient {
public Mono<ClientResponse> aggrSend(String aggrService, HttpMethod aggrMethod, String aggrPath, @Nullable String originReqIdOrBizId,
HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body, @Nullable Long timeout) {
ThreadContext.set(aggrSend, Constants.Symbol.EMPTY); // TODO will be remove in future
// ThreadContext.set(aggrSend, Constants.Symbol.EMPTY); // TODO will be remove in future
CallBackendConfig cbc = null;
if (timeout != null) {
cbc = new CallBackendConfig(timeout);
}
// if (timeout != null) {
// cbc = new CallBackendConfig(timeout);
// }
return aggrResolveAddressSend(aggrService, aggrMethod, aggrPath, originReqIdOrBizId, method, uriOrSvc, headers, body, cbc);
}
public Mono<ClientResponse> aggrSend(String aggrService, HttpMethod aggrMethod, String aggrPath, @Nullable String originReqIdOrBizId,
HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) {
ThreadContext.set(aggrSend, Constants.Symbol.EMPTY); // TODO will be remove in future
// ThreadContext.set(aggrSend, Constants.Symbol.EMPTY); // TODO will be remove in future
return aggrResolveAddressSend(aggrService, aggrMethod, aggrPath, originReqIdOrBizId, method, uriOrSvc, headers, body, null);
}
@@ -152,38 +146,28 @@ public class FizzWebClient {
@Nullable HttpHeaders headers, @Nullable Object body, @Nullable Long timeout) {
CallBackendConfig cbc = null;
if (timeout != null) {
cbc = new CallBackendConfig(timeout);
}
// if (timeout != null) {
// cbc = new CallBackendConfig(timeout);
// }
return send2uri(originReqIdOrBizId, method, uri, headers, body, cbc);
}
private static final String r = "R";
private Mono<ClientResponse> send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri,
@Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) {
if (originReqIdOrBizId == null) { // should not execute this
if (headers == null) {
originReqIdOrBizId = r + ThreadLocalRandom.current().nextInt(1_000, 10_000);
} else {
originReqIdOrBizId = r + headers.hashCode();
}
}
final String reqId = originReqIdOrBizId;
if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(reqId, method, uri, headers, null, b);
log.debug(b.toString(), LogService.BIZ_ID, reqId);
WebUtils.request2stringBuilder(originReqIdOrBizId, method, uri, headers, null, b);
log.debug(b.toString(), LogService.BIZ_ID, originReqIdOrBizId);
}
if (cbc == null) {
cbc = CallBackendConfig.DEFAULT;
}
// if (cbc == null) {
// cbc = CallBackendConfig.DEFAULT;
// }
// TODO remove this, and all event loop share one web client or one event loop one web client in future
WebClient.RequestBodySpec req = (ThreadContext.remove(aggrSend) == null ? proxyWebClient : aggrWebClient).method(method).uri(uri).headers(
// WebClient.RequestBodySpec req = (ThreadContext.remove(aggrSend) == null ? proxyWebClient : aggrWebClient).method(method).uri(uri).headers(
WebClient.RequestBodySpec req = proxyWebClient.method(method).uri(uri).headers(
hdrs -> {
if (headers != null) {
headers.forEach(
@@ -208,7 +192,7 @@ public class FizzWebClient {
}
}
Mono<ClientResponse> rm = req.exchange()
return req.exchange()
/*
.name(reqId)
.doOnRequest(i -> {})
@@ -224,10 +208,9 @@ public class FizzWebClient {
*/
;
if (log.isDebugEnabled()) {
rm = rm.log();
}
return rm;
// if (log.isDebugEnabled()) {
// rm = rm.log();
// }
// TODO 请求完成后做metric, 以反哺后续的请求转发
}
@@ -265,10 +248,11 @@ public class FizzWebClient {
private boolean isService(String s) {
if (StringUtils.indexOfAny(s, Constants.Symbol.DOT, Constants.Symbol.COLON) > 0
|| StringUtils.indexOfIgnoreCase(s, localhost) > 0) {
|| StringUtils.startsWith(s, localhost)) {
return false;
} else {
return true;
}
}
}

View File

@@ -187,7 +187,7 @@ public abstract class WebUtils {
}
public static Map<String, FilterResult> getFilterContext(ServerWebExchange exchange) {
return (Map<String, FilterResult>) exchange.getAttributes().get(FILTER_CONTEXT);
return (Map<String, FilterResult>) exchange.getAttribute(FILTER_CONTEXT);
}
public static FilterResult getFilterResult(ServerWebExchange exchange, String filter) {
@@ -367,16 +367,16 @@ public abstract class WebUtils {
return path;
}
public static Map<String, String> getAppendHeaders(ServerWebExchange exchange) {
return (Map<String, String>) exchange.getAttributes().get(APPEND_HEADERS);
}
public static Map<String, String> appendHeader(ServerWebExchange exchange, String name, String value) {
Map<String, String> hdrs = getAppendHeaders(exchange);
hdrs.put(name, value);
return hdrs;
}
public static Map<String, String> getAppendHeaders(ServerWebExchange exchange) {
return (Map<String, String>) exchange.getAttribute(APPEND_HEADERS);
}
public static HttpHeaders mergeAppendHeaders(ServerWebExchange exchange) {
ServerHttpRequest req = exchange.getRequest();
Map<String, String> appendHeaders = getAppendHeaders(exchange);