Support dynamic route

This commit is contained in:
hongqiaowei
2021-09-15 19:37:34 +08:00
parent 76ecd1d36e
commit 7ace84ba36
6 changed files with 238 additions and 63 deletions

View File

@@ -32,6 +32,7 @@ import we.plugin.auth.ApiConfig;
import we.plugin.auth.ApiConfigService;
import we.plugin.auth.AuthPluginFilter;
import we.plugin.stat.StatPluginFilter;
import we.proxy.Route;
import we.util.ReactorUtils;
import we.util.WebUtils;
@@ -67,7 +68,7 @@ public class PreprocessFilter extends FizzWebFilter {
Map<String, FilterResult> fc = new HashMap<>(); fc.put(WebUtils.PREV_FILTER_RESULT, succFr);
Map<String, String> appendHdrs = new HashMap<>(8);
Map<String, Object> eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
eas.put(WebUtils.APPEND_HEADERS, appendHdrs);
Mono vm = statPluginFilter.filter(exchange, null, null);
return process(exchange, chain, eas, vm);
@@ -82,21 +83,25 @@ public class PreprocessFilter extends FizzWebFilter {
Mono m = ReactorUtils.getInitiateMono();
if (authRes instanceof ApiConfig) {
ApiConfig ac = (ApiConfig) authRes;
afterAuth(exchange, ac);
Route route = ac.getRoute(exchange);
exchange.getAttributes().put(WebUtils.ROUTE, route);
afterAuth(exchange, ac, route);
m = executeFixedPluginFilters(exchange);
m = m.defaultIfEmpty(ReactorUtils.NULL);
if (ac.pluginConfigs == null || ac.pluginConfigs.isEmpty()) {
if (route.pluginConfigs == null || route.pluginConfigs.isEmpty()) {
return m.flatMap(func(exchange, chain));
} else {
return m.flatMap(
nil -> {
eas.put(FizzPluginFilterChain.WEB_FILTER_CHAIN, chain);
return FizzPluginFilterChain.next(exchange);
}
nil -> {
eas.put(FizzPluginFilterChain.WEB_FILTER_CHAIN, chain);
return FizzPluginFilterChain.next(exchange);
}
);
}
} else if (authRes == ApiConfigService.Access.YES) {
afterAuth(exchange, null);
afterAuth(exchange, null, null);
m = executeFixedPluginFilters(exchange);
return m.defaultIfEmpty(ReactorUtils.NULL).flatMap(func(exchange, chain));
} else {
@@ -113,7 +118,7 @@ public class PreprocessFilter extends FizzWebFilter {
);
}
private void afterAuth(ServerWebExchange exchange, ApiConfig ac) {
private void afterAuth(ServerWebExchange exchange, ApiConfig ac, Route route) {
String bs = null, bp = null;
if (ac == null) {
bs = WebUtils.getClientService(exchange);
@@ -125,6 +130,7 @@ public class PreprocessFilter extends FizzWebFilter {
}
if (ac.type != ApiConfig.Type.DUBBO) {
bp = ac.transform(WebUtils.getClientReqPath(exchange));
route.backendPath = bp;
}
}
}

View File

@@ -23,6 +23,7 @@ import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@@ -37,6 +38,7 @@ import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.plugin.auth.ApiConfig;
import we.proxy.FizzWebClient;
import we.proxy.Route;
import we.proxy.dubbo.ApacheDubboGenericService;
import we.proxy.dubbo.DubboInterfaceDeclaration;
import we.util.*;
@@ -93,31 +95,35 @@ public class RouteFilter extends FizzWebFilter {
ServerHttpRequest req = exchange.getRequest();
String rid = req.getId();
ApiConfig ac = WebUtils.getApiConfig(exchange);
// ApiConfig ac = WebUtils.getApiConfig(exchange);
Route route = WebUtils.getRoute(exchange);
HttpHeaders hdrs = null;
if (ac.type != ApiConfig.Type.DUBBO) {
if (route.type != ApiConfig.Type.DUBBO) {
hdrs = WebUtils.mergeAppendHeaders(exchange);
}
if (ac == null) {
if (route == null) {
String pathQuery = WebUtils.getClientReqPathQuery(exchange);
return send(exchange, WebUtils.getClientService(exchange), pathQuery, hdrs);
return send(exchange, req.getMethod(), WebUtils.getClientService(exchange), pathQuery, hdrs);
} else if (ac.type == ApiConfig.Type.SERVICE_DISCOVERY) {
String pathQuery = WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange);
return send(exchange, WebUtils.getBackendService(exchange), pathQuery, hdrs);
} else if (route.type == ApiConfig.Type.SERVICE_DISCOVERY) {
String pathQuery = route.getBackendPathQuery();
return send(exchange, route.method, route.backendService, pathQuery, hdrs);
} else if (ac.type == ApiConfig.Type.REVERSE_PROXY) {
String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
.append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
} else if (route.type == ApiConfig.Type.REVERSE_PROXY) {
String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort)
.append(route.getBackendPathQuery())
.toString();
return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
return fizzWebClient.send(rid, route.method, uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
} else if (ac.type == ApiConfig.Type.DUBBO) {
return dubboRpc(exchange, ac);
} else if (route.type == ApiConfig.Type.DUBBO) {
return dubboRpc(exchange, route);
} else {
String err = "cant handle api config type " + ac.type;
String err = "cant handle api config type " + route.type;
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
log.error(b.append(Constants.Symbol.LF).append(err).toString(), LogService.BIZ_ID, rid);
@@ -125,9 +131,9 @@ public class RouteFilter extends FizzWebFilter {
}
}
private Mono<Void> send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) {
private Mono<Void> send(ServerWebExchange exchange, HttpMethod method, String service, String relativeUri, HttpHeaders hdrs) {
ServerHttpRequest clientReq = exchange.getRequest();
return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
return fizzWebClient.send2service(clientReq.getId(), method, service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
}
private Function<ClientResponse, Mono<? extends Void>> genServerResponse(ServerWebExchange exchange) {
@@ -141,9 +147,9 @@ public class RouteFilter extends FizzWebFilter {
String k = h.getKey();
if (clientRespHeaders.containsKey(k)) {
if (k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN) || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS)
|| k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE)
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS)) {
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS)
|| k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE)
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS)) {
} else {
clientRespHeaders.put(k, h.getValue());
}
@@ -164,12 +170,12 @@ public class RouteFilter extends FizzWebFilter {
}
private void cleanup(ClientResponse clientResponse) {
if (clientResponse != null) {
clientResponse.bodyToMono(Void.class).subscribe();
}
}
if (clientResponse != null) {
clientResponse.bodyToMono(Void.class).subscribe();
}
}
private Mono<Void> dubboRpc(ServerWebExchange exchange, ApiConfig ac) {
private Mono<Void> dubboRpc(ServerWebExchange exchange, Route route) {
final String[] ls = {null};
return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER)
.flatMap(
@@ -191,14 +197,14 @@ public class RouteFilter extends FizzWebFilter {
}
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
declaration.setServiceName(ac.backendService);
declaration.setVersion(ac.rpcVersion);
declaration.setGroup(ac.rpcGroup);
declaration.setMethod(ac.rpcMethod);
declaration.setParameterTypes(ac.rpcParamTypes);
declaration.setServiceName(route.backendService);
declaration.setVersion(route.rpcVersion);
declaration.setGroup(route.rpcGroup);
declaration.setMethod(route.rpcMethod);
declaration.setParameterTypes(route.rpcParamTypes);
int t = 20_000;
if (ac.timeout != 0) {
t = (int) ac.timeout;
if (route.timeout != 0) {
t = (int) route.timeout;
}
declaration.setTimeout(t);

View File

@@ -21,12 +21,17 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import we.plugin.PluginConfig;
import we.proxy.Route;
import we.util.JacksonUtils;
import we.util.UrlTransformUtils;
import we.util.WebUtils;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -65,7 +70,7 @@ public class ApiConfig {
public Set<String> gatewayGroups = Stream.of(GatewayGroup.DEFAULT).collect(Collectors.toSet());
public String service;
public String service; // a
public String backendService;
@@ -175,8 +180,8 @@ public class ApiConfig {
i = Math.abs(i);
}
return httpHostPorts.get(
i % httpHostPorts.size()
);
i % httpHostPorts.size()
);
}
public String transform(String reqPath) {
@@ -200,6 +205,27 @@ public class ApiConfig {
return false;
}
public Route getRoute(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
Route r = new Route().type(this.type)
.method(request.getMethod())
.backendService(this.backendService)
.backendPath(this.backendPath)
.query(WebUtils.getClientReqQuery(exchange))
.pluginConfigs(this.pluginConfigs)
.rpcMethod(this.rpcMethod)
.rpcParamTypes(this.rpcParamTypes)
.rpcGroup(this.rpcGroup)
.rpcVersion(this.rpcVersion)
.timeout(this.timeout);
if (this.type == Type.REVERSE_PROXY) {
r = r.nextHttpHostPort(getNextHttpHostPort());
}
return r;
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);

View File

@@ -19,6 +19,7 @@ package we.plugin.requestbody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
@@ -58,6 +59,7 @@ public class RequestBodyPlugin implements FizzPluginFilter {
} finally {
NettyDataBufferUtils.release(body);
}
requestDecorator.getHeaders().remove(HttpHeaders.CONTENT_LENGTH);
newExchange = exchange.mutate().request(requestDecorator).build();
if (log.isDebugEnabled()) {
log.debug("retain body", LogService.BIZ_ID, req.getId());

View File

@@ -0,0 +1,128 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.proxy;
import org.springframework.http.HttpMethod;
import we.plugin.PluginConfig;
import we.util.Constants;
import we.util.JacksonUtils;
import java.util.List;
/**
* @author hongqiaowei
*/
public class Route {
public byte type;
public HttpMethod method;
public String backendService;
public String backendPath;
public String query;
public String nextHttpHostPort;
public List<PluginConfig> pluginConfigs;
public String rpcMethod;
public String rpcParamTypes;
public String rpcVersion;
public String rpcGroup;
public long timeout = 0;
public Route type(byte t) {
type = t;
return this;
}
public Route method(HttpMethod m) {
method = m;
return this;
}
public Route backendService(String bs) {
backendService = bs;
return this;
}
public Route backendPath(String bp) {
backendPath = bp;
return this;
}
public Route query(String qry) {
query = qry;
return this;
}
public Route pluginConfigs(List<PluginConfig> pcs) {
pluginConfigs = pcs;
return this;
}
public Route nextHttpHostPort(String nhhp) {
nextHttpHostPort = nhhp;
return this;
}
public Route rpcMethod(String m) {
rpcMethod = m;
return this;
}
public Route rpcParamTypes(String t) {
rpcParamTypes = t;
return this;
}
public Route rpcVersion(String v) {
rpcVersion = v;
return this;
}
public Route rpcGroup(String g) {
rpcGroup = g;
return this;
}
public Route timeout(long t) {
timeout = t;
return this;
}
public String getBackendPathQuery() {
if (query != null) {
return backendPath + Constants.Symbol.QUESTION + query;
}
return backendPath;
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
}
}

View File

@@ -36,6 +36,7 @@ import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.AuthPluginFilter;
import we.proxy.Route;
import java.net.URI;
import java.util.Collections;
@@ -93,6 +94,8 @@ public abstract class WebUtils {
public static final String BACKEND_PATH = "@bp";
public static final String ROUTE = "@rout";
public static boolean LOG_RESPONSE_BODY = false;
public static Set<String> LOG_HEADER_SET = Collections.EMPTY_SET;
@@ -105,7 +108,7 @@ public abstract class WebUtils {
public static void setAppHeaders(List<String> hdrs) {
appHeaders = hdrs;
}
public static String getHeaderValue(ServerWebExchange exchange, String header) {
return exchange.getRequest().getHeaders().getFirst(header);
}
@@ -175,7 +178,11 @@ public abstract class WebUtils {
return null;
}
}
public static Route getRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ROUTE);
}
public static Mono<Void> getDirectResponse(ServerWebExchange exchange) {
return exchange.getAttribute(WebUtils.directResponse);
}
@@ -454,22 +461,22 @@ public abstract class WebUtils {
// }
String rid = exchange.getRequest().getId();
// Schedulers.parallel().schedule(() -> {
StringBuilder b = ThreadContext.getStringBuilder();
request2stringBuilder(exchange, b);
// if (reqBody[0] != null) {
// DataBufferUtils.release(reqBody[0]);
// }
b.append(Constants.Symbol.LINE_SEPARATOR);
b.append(filter).append(Constants.Symbol.SPACE).append(code).append(Constants.Symbol.SPACE).append(msg);
if (t == null) {
log.error(b.toString(), LogService.BIZ_ID, rid);
} else {
log.error(b.toString(), LogService.BIZ_ID, rid, t);
Throwable[] suppressed = t.getSuppressed();
if (suppressed != null && suppressed.length != 0) {
log.error(StringUtils.EMPTY, suppressed[0]);
}
StringBuilder b = ThreadContext.getStringBuilder();
request2stringBuilder(exchange, b);
// if (reqBody[0] != null) {
// DataBufferUtils.release(reqBody[0]);
// }
b.append(Constants.Symbol.LINE_SEPARATOR);
b.append(filter).append(Constants.Symbol.SPACE).append(code).append(Constants.Symbol.SPACE).append(msg);
if (t == null) {
log.error(b.toString(), LogService.BIZ_ID, rid);
} else {
log.error(b.toString(), LogService.BIZ_ID, rid, t);
Throwable[] suppressed = t.getSuppressed();
if (suppressed != null && suppressed.length != 0) {
log.error(StringUtils.EMPTY, suppressed[0]);
}
}
// });
if (filter != null) {
if (t == null) {
@@ -514,7 +521,7 @@ public abstract class WebUtils {
}
public static Mono<Void> responseErrorAndBindContext(ServerWebExchange exchange, String filter, HttpStatus httpStatus,
HttpHeaders headers, String content) {
HttpHeaders headers, String content) {
ServerHttpResponse response = exchange.getResponse();
String rid = exchange.getRequest().getId();
StringBuilder b = ThreadContext.getStringBuilder();