diff --git a/fizz-core/src/main/java/we/filter/PreprocessFilter.java b/fizz-core/src/main/java/we/filter/PreprocessFilter.java index 771373e..9c95c7b 100644 --- a/fizz-core/src/main/java/we/filter/PreprocessFilter.java +++ b/fizz-core/src/main/java/we/filter/PreprocessFilter.java @@ -32,6 +32,7 @@ import we.plugin.auth.ApiConfig; import we.plugin.auth.ApiConfigService; import we.plugin.auth.AuthPluginFilter; import we.plugin.stat.StatPluginFilter; +import we.proxy.Route; import we.util.ReactorUtils; import we.util.WebUtils; @@ -67,7 +68,7 @@ public class PreprocessFilter extends FizzWebFilter { Map fc = new HashMap<>(); fc.put(WebUtils.PREV_FILTER_RESULT, succFr); Map appendHdrs = new HashMap<>(8); Map eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc); - eas.put(WebUtils.APPEND_HEADERS, appendHdrs); + eas.put(WebUtils.APPEND_HEADERS, appendHdrs); Mono vm = statPluginFilter.filter(exchange, null, null); return process(exchange, chain, eas, vm); @@ -82,21 +83,25 @@ public class PreprocessFilter extends FizzWebFilter { Mono m = ReactorUtils.getInitiateMono(); if (authRes instanceof ApiConfig) { ApiConfig ac = (ApiConfig) authRes; - afterAuth(exchange, ac); + + Route route = ac.getRoute(exchange); + exchange.getAttributes().put(WebUtils.ROUTE, route); + + afterAuth(exchange, ac, route); m = executeFixedPluginFilters(exchange); m = m.defaultIfEmpty(ReactorUtils.NULL); - if (ac.pluginConfigs == null || ac.pluginConfigs.isEmpty()) { + if (route.pluginConfigs == null || route.pluginConfigs.isEmpty()) { return m.flatMap(func(exchange, chain)); } else { return m.flatMap( - nil -> { - eas.put(FizzPluginFilterChain.WEB_FILTER_CHAIN, chain); - return FizzPluginFilterChain.next(exchange); - } + nil -> { + eas.put(FizzPluginFilterChain.WEB_FILTER_CHAIN, chain); + return FizzPluginFilterChain.next(exchange); + } ); } } else if (authRes == ApiConfigService.Access.YES) { - afterAuth(exchange, null); + afterAuth(exchange, null, null); m = executeFixedPluginFilters(exchange); return m.defaultIfEmpty(ReactorUtils.NULL).flatMap(func(exchange, chain)); } else { @@ -113,7 +118,7 @@ public class PreprocessFilter extends FizzWebFilter { ); } - private void afterAuth(ServerWebExchange exchange, ApiConfig ac) { + private void afterAuth(ServerWebExchange exchange, ApiConfig ac, Route route) { String bs = null, bp = null; if (ac == null) { bs = WebUtils.getClientService(exchange); @@ -125,6 +130,7 @@ public class PreprocessFilter extends FizzWebFilter { } if (ac.type != ApiConfig.Type.DUBBO) { bp = ac.transform(WebUtils.getClientReqPath(exchange)); + route.backendPath = bp; } } } diff --git a/fizz-core/src/main/java/we/filter/RouteFilter.java b/fizz-core/src/main/java/we/filter/RouteFilter.java index 36be31b..8dd1d3c 100644 --- a/fizz-core/src/main/java/we/filter/RouteFilter.java +++ b/fizz-core/src/main/java/we/filter/RouteFilter.java @@ -23,6 +23,7 @@ import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; @@ -37,6 +38,7 @@ import we.flume.clients.log4j2appender.LogService; import we.legacy.RespEntity; import we.plugin.auth.ApiConfig; import we.proxy.FizzWebClient; +import we.proxy.Route; import we.proxy.dubbo.ApacheDubboGenericService; import we.proxy.dubbo.DubboInterfaceDeclaration; import we.util.*; @@ -93,31 +95,35 @@ public class RouteFilter extends FizzWebFilter { ServerHttpRequest req = exchange.getRequest(); String rid = req.getId(); - ApiConfig ac = WebUtils.getApiConfig(exchange); + + // ApiConfig ac = WebUtils.getApiConfig(exchange); + Route route = WebUtils.getRoute(exchange); + HttpHeaders hdrs = null; - if (ac.type != ApiConfig.Type.DUBBO) { + + if (route.type != ApiConfig.Type.DUBBO) { hdrs = WebUtils.mergeAppendHeaders(exchange); } - if (ac == null) { + if (route == null) { String pathQuery = WebUtils.getClientReqPathQuery(exchange); - return send(exchange, WebUtils.getClientService(exchange), pathQuery, hdrs); + return send(exchange, req.getMethod(), WebUtils.getClientService(exchange), pathQuery, hdrs); - } else if (ac.type == ApiConfig.Type.SERVICE_DISCOVERY) { - String pathQuery = WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange); - return send(exchange, WebUtils.getBackendService(exchange), pathQuery, hdrs); + } else if (route.type == ApiConfig.Type.SERVICE_DISCOVERY) { + String pathQuery = route.getBackendPathQuery(); + return send(exchange, route.method, route.backendService, pathQuery, hdrs); - } else if (ac.type == ApiConfig.Type.REVERSE_PROXY) { - String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort()) - .append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange)) + } else if (route.type == ApiConfig.Type.REVERSE_PROXY) { + String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort) + .append(route.getBackendPathQuery()) .toString(); - return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange)); + return fizzWebClient.send(rid, route.method, uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange)); - } else if (ac.type == ApiConfig.Type.DUBBO) { - return dubboRpc(exchange, ac); + } else if (route.type == ApiConfig.Type.DUBBO) { + return dubboRpc(exchange, route); } else { - String err = "cant handle api config type " + ac.type; + String err = "cant handle api config type " + route.type; StringBuilder b = ThreadContext.getStringBuilder(); WebUtils.request2stringBuilder(exchange, b); log.error(b.append(Constants.Symbol.LF).append(err).toString(), LogService.BIZ_ID, rid); @@ -125,9 +131,9 @@ public class RouteFilter extends FizzWebFilter { } } - private Mono send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) { + private Mono send(ServerWebExchange exchange, HttpMethod method, String service, String relativeUri, HttpHeaders hdrs) { ServerHttpRequest clientReq = exchange.getRequest(); - return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange)); + return fizzWebClient.send2service(clientReq.getId(), method, service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange)); } private Function> genServerResponse(ServerWebExchange exchange) { @@ -141,9 +147,9 @@ public class RouteFilter extends FizzWebFilter { String k = h.getKey(); if (clientRespHeaders.containsKey(k)) { if (k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN) || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS) - || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS) - || k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE) - || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS)) { + || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS) + || k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE) + || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS)) { } else { clientRespHeaders.put(k, h.getValue()); } @@ -164,12 +170,12 @@ public class RouteFilter extends FizzWebFilter { } private void cleanup(ClientResponse clientResponse) { - if (clientResponse != null) { - clientResponse.bodyToMono(Void.class).subscribe(); - } - } + if (clientResponse != null) { + clientResponse.bodyToMono(Void.class).subscribe(); + } + } - private Mono dubboRpc(ServerWebExchange exchange, ApiConfig ac) { + private Mono dubboRpc(ServerWebExchange exchange, Route route) { final String[] ls = {null}; return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(NettyDataBufferUtils.EMPTY_DATA_BUFFER) .flatMap( @@ -191,14 +197,14 @@ public class RouteFilter extends FizzWebFilter { } DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration(); - declaration.setServiceName(ac.backendService); - declaration.setVersion(ac.rpcVersion); - declaration.setGroup(ac.rpcGroup); - declaration.setMethod(ac.rpcMethod); - declaration.setParameterTypes(ac.rpcParamTypes); + declaration.setServiceName(route.backendService); + declaration.setVersion(route.rpcVersion); + declaration.setGroup(route.rpcGroup); + declaration.setMethod(route.rpcMethod); + declaration.setParameterTypes(route.rpcParamTypes); int t = 20_000; - if (ac.timeout != 0) { - t = (int) ac.timeout; + if (route.timeout != 0) { + t = (int) route.timeout; } declaration.setTimeout(t); diff --git a/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java b/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java index 4d58891..53cc0d6 100644 --- a/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java +++ b/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java @@ -21,12 +21,17 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.commons.lang3.StringUtils; import org.springframework.http.HttpMethod; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.web.server.ServerWebExchange; import we.plugin.PluginConfig; +import we.proxy.Route; import we.util.JacksonUtils; import we.util.UrlTransformUtils; +import we.util.WebUtils; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Arrays; +import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -65,7 +70,7 @@ public class ApiConfig { public Set gatewayGroups = Stream.of(GatewayGroup.DEFAULT).collect(Collectors.toSet()); - public String service; + public String service; // a public String backendService; @@ -175,8 +180,8 @@ public class ApiConfig { i = Math.abs(i); } return httpHostPorts.get( - i % httpHostPorts.size() - ); + i % httpHostPorts.size() + ); } public String transform(String reqPath) { @@ -200,6 +205,27 @@ public class ApiConfig { return false; } + public Route getRoute(ServerWebExchange exchange) { + ServerHttpRequest request = exchange.getRequest(); + Route r = new Route().type(this.type) + .method(request.getMethod()) + .backendService(this.backendService) + .backendPath(this.backendPath) + .query(WebUtils.getClientReqQuery(exchange)) + .pluginConfigs(this.pluginConfigs) + .rpcMethod(this.rpcMethod) + .rpcParamTypes(this.rpcParamTypes) + .rpcGroup(this.rpcGroup) + .rpcVersion(this.rpcVersion) + .timeout(this.timeout); + + if (this.type == Type.REVERSE_PROXY) { + r = r.nextHttpHostPort(getNextHttpHostPort()); + } + + return r; + } + @Override public String toString() { return JacksonUtils.writeValueAsString(this); diff --git a/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java b/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java index ed0d23e..6f81e25 100644 --- a/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java +++ b/fizz-core/src/main/java/we/plugin/requestbody/RequestBodyPlugin.java @@ -19,6 +19,7 @@ package we.plugin.requestbody; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; @@ -58,6 +59,7 @@ public class RequestBodyPlugin implements FizzPluginFilter { } finally { NettyDataBufferUtils.release(body); } + requestDecorator.getHeaders().remove(HttpHeaders.CONTENT_LENGTH); newExchange = exchange.mutate().request(requestDecorator).build(); if (log.isDebugEnabled()) { log.debug("retain body", LogService.BIZ_ID, req.getId()); diff --git a/fizz-core/src/main/java/we/proxy/Route.java b/fizz-core/src/main/java/we/proxy/Route.java new file mode 100644 index 0000000..2b0df7e --- /dev/null +++ b/fizz-core/src/main/java/we/proxy/Route.java @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.proxy; + +import org.springframework.http.HttpMethod; +import we.plugin.PluginConfig; +import we.util.Constants; +import we.util.JacksonUtils; + +import java.util.List; + +/** + * @author hongqiaowei + */ + +public class Route { + + public byte type; + + public HttpMethod method; + + public String backendService; + + public String backendPath; + + public String query; + + public String nextHttpHostPort; + + public List pluginConfigs; + + public String rpcMethod; + + public String rpcParamTypes; + + public String rpcVersion; + + public String rpcGroup; + + public long timeout = 0; + + public Route type(byte t) { + type = t; + return this; + } + + public Route method(HttpMethod m) { + method = m; + return this; + } + + public Route backendService(String bs) { + backendService = bs; + return this; + } + + public Route backendPath(String bp) { + backendPath = bp; + return this; + } + + public Route query(String qry) { + query = qry; + return this; + } + + public Route pluginConfigs(List pcs) { + pluginConfigs = pcs; + return this; + } + + public Route nextHttpHostPort(String nhhp) { + nextHttpHostPort = nhhp; + return this; + } + + public Route rpcMethod(String m) { + rpcMethod = m; + return this; + } + + public Route rpcParamTypes(String t) { + rpcParamTypes = t; + return this; + } + + public Route rpcVersion(String v) { + rpcVersion = v; + return this; + } + + public Route rpcGroup(String g) { + rpcGroup = g; + return this; + } + + public Route timeout(long t) { + timeout = t; + return this; + } + + public String getBackendPathQuery() { + if (query != null) { + return backendPath + Constants.Symbol.QUESTION + query; + } + return backendPath; + } + + @Override + public String toString() { + return JacksonUtils.writeValueAsString(this); + } +} diff --git a/fizz-core/src/main/java/we/util/WebUtils.java b/fizz-core/src/main/java/we/util/WebUtils.java index 0b4add2..bab4718 100644 --- a/fizz-core/src/main/java/we/util/WebUtils.java +++ b/fizz-core/src/main/java/we/util/WebUtils.java @@ -36,6 +36,7 @@ import we.flume.clients.log4j2appender.LogService; import we.legacy.RespEntity; import we.plugin.auth.ApiConfig; import we.plugin.auth.AuthPluginFilter; +import we.proxy.Route; import java.net.URI; import java.util.Collections; @@ -93,6 +94,8 @@ public abstract class WebUtils { public static final String BACKEND_PATH = "@bp"; + public static final String ROUTE = "@rout"; + public static boolean LOG_RESPONSE_BODY = false; public static Set LOG_HEADER_SET = Collections.EMPTY_SET; @@ -105,7 +108,7 @@ public abstract class WebUtils { public static void setAppHeaders(List hdrs) { appHeaders = hdrs; } - + public static String getHeaderValue(ServerWebExchange exchange, String header) { return exchange.getRequest().getHeaders().getFirst(header); } @@ -175,7 +178,11 @@ public abstract class WebUtils { return null; } } - + + public static Route getRoute(ServerWebExchange exchange) { + return exchange.getAttribute(ROUTE); + } + public static Mono getDirectResponse(ServerWebExchange exchange) { return exchange.getAttribute(WebUtils.directResponse); } @@ -454,22 +461,22 @@ public abstract class WebUtils { // } String rid = exchange.getRequest().getId(); // Schedulers.parallel().schedule(() -> { - StringBuilder b = ThreadContext.getStringBuilder(); - request2stringBuilder(exchange, b); - // if (reqBody[0] != null) { - // DataBufferUtils.release(reqBody[0]); - // } - b.append(Constants.Symbol.LINE_SEPARATOR); - b.append(filter).append(Constants.Symbol.SPACE).append(code).append(Constants.Symbol.SPACE).append(msg); - if (t == null) { - log.error(b.toString(), LogService.BIZ_ID, rid); - } else { - log.error(b.toString(), LogService.BIZ_ID, rid, t); - Throwable[] suppressed = t.getSuppressed(); - if (suppressed != null && suppressed.length != 0) { - log.error(StringUtils.EMPTY, suppressed[0]); - } + StringBuilder b = ThreadContext.getStringBuilder(); + request2stringBuilder(exchange, b); + // if (reqBody[0] != null) { + // DataBufferUtils.release(reqBody[0]); + // } + b.append(Constants.Symbol.LINE_SEPARATOR); + b.append(filter).append(Constants.Symbol.SPACE).append(code).append(Constants.Symbol.SPACE).append(msg); + if (t == null) { + log.error(b.toString(), LogService.BIZ_ID, rid); + } else { + log.error(b.toString(), LogService.BIZ_ID, rid, t); + Throwable[] suppressed = t.getSuppressed(); + if (suppressed != null && suppressed.length != 0) { + log.error(StringUtils.EMPTY, suppressed[0]); } + } // }); if (filter != null) { if (t == null) { @@ -514,7 +521,7 @@ public abstract class WebUtils { } public static Mono responseErrorAndBindContext(ServerWebExchange exchange, String filter, HttpStatus httpStatus, - HttpHeaders headers, String content) { + HttpHeaders headers, String content) { ServerHttpResponse response = exchange.getResponse(); String rid = exchange.getRequest().getId(); StringBuilder b = ThreadContext.getStringBuilder();