feat: Support configuring dubbo API in route #210

This commit is contained in:
hongqiaowei
2021-06-04 16:56:23 +08:00
committed by GitHub
parent 5289a84cb4
commit 43f0160230
11 changed files with 160 additions and 141 deletions

View File

@@ -61,6 +61,10 @@ public class SystemConfig {
public List<String> proxySetHeaders = new ArrayList<>();
public boolean aggregateTestAuth = false;
@NacosValue(value = "${route-timeout:0}", autoRefreshed = true)
@Value ( "${route-timeout:0}")
public long routeTimeout = 0;
@NacosValue(value = "${gateway.aggr.proxy_set_headers:}", autoRefreshed = true)

View File

@@ -117,11 +117,15 @@ public class PreprocessFilter extends FizzWebFilter {
if (ac == null) {
bs = WebUtils.getClientService(exchange);
bp = WebUtils.getClientReqPath(exchange);
} else if (ac.type != ApiConfig.Type.CALLBACK) {
if (ac.type != ApiConfig.Type.REVERSE_PROXY) {
bs = ac.backendService;
} else {
if (ac.type != ApiConfig.Type.CALLBACK) {
if (ac.type != ApiConfig.Type.REVERSE_PROXY) {
bs = ac.backendService;
}
if (ac.type != ApiConfig.Type.DUBBO) {
bp = ac.transform(WebUtils.getClientReqPath(exchange));
}
}
bp = ac.transform(WebUtils.getClientReqPath(exchange));
}
if (bs != null) {
WebUtils.setBackendService(exchange, bs);

View File

@@ -21,6 +21,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -28,21 +30,24 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.constants.CommonConstants;
import we.flume.clients.log4j2appender.LogService;
import we.legacy.RespEntity;
import we.plugin.auth.ApiConfig;
import we.proxy.FizzWebClient;
import we.proxy.dubbo.ApacheDubboGenericService;
import we.proxy.dubbo.DubboInterfaceDeclaration;
import we.util.Constants;
import we.util.JacksonUtils;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;
/**
@@ -58,6 +63,9 @@ public class RouteFilter extends FizzWebFilter {
@Resource
private FizzWebClient fizzWebClient;
@Resource
private ApacheDubboGenericService dubboGenericService;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -84,11 +92,14 @@ public class RouteFilter extends FizzWebFilter {
private Mono<Void> doFilter0(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest clientReq = exchange.getRequest();
HttpHeaders hdrs = WebUtils.mergeAppendHeaders(exchange);
String rid = clientReq.getId();
ServerHttpRequest req = exchange.getRequest();
String rid = req.getId();
ApiConfig ac = WebUtils.getApiConfig(exchange);
HttpHeaders hdrs = null;
if (ac.type != ApiConfig.Type.DUBBO) {
hdrs = WebUtils.mergeAppendHeaders(exchange);
}
if (ac == null) {
String pathQuery = WebUtils.getClientReqPathQuery(exchange);
return send(exchange, WebUtils.getClientService(exchange), pathQuery, hdrs);
@@ -101,7 +112,10 @@ public class RouteFilter extends FizzWebFilter {
String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort())
.append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange))
.toString();
return fizzWebClient.send(rid, clientReq.getMethod(), uri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
} else if (ac.type == ApiConfig.Type.DUBBO) {
return dubboRpc(exchange, ac);
} else {
String err = "cant handle api config type " + ac.type;
@@ -114,7 +128,7 @@ public class RouteFilter extends FizzWebFilter {
private Mono<Void> send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) {
ServerHttpRequest clientReq = exchange.getRequest();
return fizzWebClient.proxySend2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
}
private Function<ClientResponse, Mono<? extends Void>> genServerResponse(ServerWebExchange exchange) {
@@ -155,4 +169,66 @@ public class RouteFilter extends FizzWebFilter {
clientResponse.bodyToMono(Void.class).subscribe();
}
}
private Mono<Void> dubboRpc(ServerWebExchange exchange, ApiConfig ac) {
DataBuffer[] body = {null};
return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY)
.flatMap(
b -> {
HashMap<String, Object> parameters = null;
if (b != WebUtils.EMPTY_BODY) {
body[0] = b;
String json = body[0].toString(StandardCharsets.UTF_8).trim();
if (json.charAt(0) == '[') {
ArrayList<Object> lst = JacksonUtils.readValue(json, ArrayList.class);
parameters = new HashMap<>();
for (int i = 0; i < lst.size(); i++) {
parameters.put("p" + (i + 1), lst.get(i));
}
} else {
parameters = JacksonUtils.readValue(json, HashMap.class);
}
}
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
declaration.setServiceName(ac.backendService);
declaration.setVersion(ac.rpcVersion);
declaration.setGroup(ac.rpcGroup);
declaration.setMethod(ac.rpcMethod);
declaration.setParameterTypes(ac.rpcParamTypes);
int t = 20_000;
if (ac.timeout != 0) {
t = (int) ac.timeout;
}
declaration.setTimeout(t);
Map<String, String> attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange));
return dubboGenericService.send(parameters, declaration, attachments);
}
)
.flatMap(
dubboRpcResponseBody -> {
Mono<Void> m = WebUtils.buildJsonDirectResponse(exchange, HttpStatus.OK, null, JacksonUtils.writeValueAsString(dubboRpcResponseBody));
return m;
}
)
.doOnError(
t -> {
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(exchange, b);
if (body[0] != null) {
b.append('\n').append(body[0].toString(StandardCharsets.UTF_8));
}
log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t);
}
)
.doFinally(
s -> {
if (body[0] != null) {
DataBufferUtils.release(body[0]);
}
}
)
;
}
}

View File

@@ -387,8 +387,10 @@ public class RequestInput extends RPCInput implements IInput{
String aggrService = aggrPath.split("\\/")[2];
FizzWebClient client = this.getCurrentApplicationContext().getBean(FizzWebClient.class);
Mono<ClientResponse> clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url,
headers, body, (long)timeout);
// Mono<ClientResponse> clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url,
// headers, body, (long)timeout);
Mono<ClientResponse> clientResponse = client.send(inputContext.getStepContext().getTraceId(), method, url, headers, body, (long)timeout);
return clientResponse.flatMap(cr->{
RequestRPCResponse response = new RequestRPCResponse();
response.setHeaders(cr.headers().asHttpHeaders());

View File

@@ -42,7 +42,7 @@ import we.fizz.input.PathMapping;
public class RequestInputConfig extends InputConfig {
private URL url ;
private String method ;
private int timeout = 3;
private int timeout;
private String protocol;
/**
* Service Type, 1 service discovery, 2 HTTP service

View File

@@ -25,9 +25,7 @@ import we.plugin.PluginConfig;
import we.util.JacksonUtils;
import we.util.UrlTransformUtils;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -44,6 +42,7 @@ public class ApiConfig {
static final byte SERVICE_DISCOVERY = 2;
static final byte REVERSE_PROXY = 3;
static final byte CALLBACK = 4;
static final byte DUBBO = 5;
}
public static final int DELETED = 1;
@@ -96,6 +95,16 @@ public class ApiConfig {
public CallbackConfig callbackConfig;
public String rpcMethod;
public String rpcParamTypes;
public String rpcVersion;
public String rpcGroup;
public long timeout = 0;
public static boolean isAntPathPattern(String path) {
boolean uriVar = false;
for (int i = 0; i < path.length(); i++) {

View File

@@ -93,7 +93,7 @@ public class CallbackService {
if (r.type == ApiConfig.Type.SERVICE_DISCOVERY) {
ServiceInstance si = service2instMap.get(r.service);
if (si == null) {
send = fizzWebClient.proxySend2service(reqId, method, r.service, r.path, headers, body)
send = fizzWebClient.send2service(reqId, method, r.service, r.path, headers, body)
.onErrorResume( crError(exchange, r, method, headers, body) );
} else {
String uri = buildUri(req, si, r.path);
@@ -238,7 +238,7 @@ public class CallbackService {
for (ServiceTypePath stp : req.assignServices) {
if (stp.type == ApiConfig.Type.SERVICE_DISCOVERY) {
send = fizzWebClient.proxySend2service(req.id, req.method, stp.service, stp.path, req.headers, req.body)
send = fizzWebClient.send2service(req.id, req.method, stp.service, stp.path, req.headers, req.body)
.onErrorResume( crError(req, stp.service, stp.path) );
} else {
String traceId = CommonConstants.TRACE_ID_PREFIX + req.id;

View File

@@ -17,11 +17,9 @@
package we.proxy;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@@ -34,13 +32,14 @@ import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.ProxyWebClientConfig;
import we.config.SystemConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -53,122 +52,58 @@ public class FizzWebClient {
private static final Logger log = LoggerFactory.getLogger(FizzWebClient.class);
private static final String aggrSend = "$aggrSend";
private static final String localhost = "localhost";
private static final String host = "Host";
@Resource
private SystemConfig systemConfig;
@Resource
private DiscoveryClientUriSelector discoveryClientUriSelector;
@Resource(name = ProxyWebClientConfig.proxyWebClient)
private WebClient proxyWebClient;
// @Resource(name = AggrWebClientConfig.aggrWebClient)
// private WebClient aggrWebClient;
@NacosValue(value = "${fizz-web-client.timeout:-1}")
@Value("${fizz-web-client.timeout:-1}")
private long timeout = -1;
@PostConstruct
public void afterPropertiesSet() {
if (timeout != -1) {
CallBackendConfig.DEFAULT.timeout = timeout;
}
log.info("fizz web client timeout is " + CallBackendConfig.DEFAULT.timeout);
}
public Mono<ClientResponse> aggrSend(String aggrService, HttpMethod aggrMethod, String aggrPath, @Nullable String originReqIdOrBizId,
HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body, @Nullable Long timeout) {
// ThreadContext.set(aggrSend, Constants.Symbol.EMPTY); // TODO will be remove in future
CallBackendConfig cbc = null;
// if (timeout != null) {
// cbc = new CallBackendConfig(timeout);
// }
return aggrResolveAddressSend(aggrService, aggrMethod, aggrPath, originReqIdOrBizId, method, uriOrSvc, headers, body, cbc);
}
public Mono<ClientResponse> aggrSend(String aggrService, HttpMethod aggrMethod, String aggrPath, @Nullable String originReqIdOrBizId,
HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) {
// ThreadContext.set(aggrSend, Constants.Symbol.EMPTY); // TODO will be remove in future
return aggrResolveAddressSend(aggrService, aggrMethod, aggrPath, originReqIdOrBizId, method, uriOrSvc, headers, body, null);
}
public Mono<ClientResponse> send(String reqId, HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) {
return send(reqId, method, uriOrSvc, headers, body, null);
return send(reqId, method, uriOrSvc, headers, body, 0);
}
public Mono<ClientResponse> send(String reqId, HttpMethod method, String uriOrSvc, HttpHeaders headers, Object body, CallBackendConfig cbc) {
public Mono<ClientResponse> send(String reqId, HttpMethod method, String uriOrSvc, HttpHeaders headers, Object body, long timeout) {
String s = extractServiceOrAddress(uriOrSvc);
if (isService(s)) {
String path = uriOrSvc.substring(uriOrSvc.indexOf(Constants.Symbol.FORWARD_SLASH, 10));
return send2service(reqId, method, s, path, headers, body, cbc);
return send2service(reqId, method, s, path, headers, body, timeout);
} else {
return send2uri(reqId, method, uriOrSvc, headers, body, cbc);
return send2uri(reqId, method, uriOrSvc, headers, body, timeout);
}
}
private Mono<ClientResponse> aggrResolveAddressSend(String aggrService, HttpMethod aggrMethod, String aggrPath, @Nullable String originReqIdOrBizId,
HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) {
public Mono<ClientResponse> send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri,
@Nullable HttpHeaders headers, @Nullable Object body) {
return send(originReqIdOrBizId, method, uriOrSvc, headers, body, cbc);
return send2service(clientReqId, method, service, relativeUri, headers, body, 0);
}
public Mono<ClientResponse> proxySend2service(@Nullable String originReqIdOrBizId, HttpMethod method, String service, String relativeUri,
@Nullable HttpHeaders headers, @Nullable Object body) {
public Mono<ClientResponse> send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri,
@Nullable HttpHeaders headers, @Nullable Object body, long timeout) {
return send2service(originReqIdOrBizId, method, service, relativeUri, headers, body, null);
}
public Mono<ClientResponse> send2service(@Nullable String originReqIdOrBizId, HttpMethod method, String service, String relativeUri,
@Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) {
// TODO this the future
// if (cbc == null) {
// InstanceInfo inst = roundRobinChoose1instFrom(service);
// String uri = buildUri(inst, relativeUri);
// return send2uri(originReqIdOrBizId, method, uri, headers, body, null);
// } else {
// List<InstanceInfo> insts = eurekaClient.getInstancesByVipAddress(service, false);
// // TODO 据callBackendConfig, 结合insts的实际metric, 从insts中选择合适的一个转发请求过去
// }
// what about multiple nginx instance
// current
String uri = discoveryClientUriSelector.getNextUri(service, relativeUri);
return send2uri(originReqIdOrBizId, method, uri, headers, body, cbc);
return send2uri(clientReqId, method, uri, headers, body, timeout);
}
public Mono<ClientResponse> send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri,
@Nullable HttpHeaders headers, @Nullable Object body, @Nullable Long timeout) {
CallBackendConfig cbc = null;
// if (timeout != null) {
// cbc = new CallBackendConfig(timeout);
// }
return send2uri(originReqIdOrBizId, method, uri, headers, body, cbc);
public Mono<ClientResponse> send2uri(@Nullable String clientReqId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body) {
return send2uri(clientReqId, method, uri, headers, body, 0);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private Mono<ClientResponse> send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri,
@Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) {
public Mono<ClientResponse> send2uri(@Nullable String clientReqId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body, long timeout) {
if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder();
WebUtils.request2stringBuilder(originReqIdOrBizId, method, uri, headers, null, b);
log.debug(b.toString(), LogService.BIZ_ID, originReqIdOrBizId);
WebUtils.request2stringBuilder(clientReqId, method, uri, headers, null, b);
log.debug(b.toString(), LogService.BIZ_ID, clientReqId);
}
// if (cbc == null) {
// cbc = CallBackendConfig.DEFAULT;
// }
// TODO remove this, and all event loop share one web client or one event loop one web client in future
// WebClient.RequestBodySpec req = (ThreadContext.remove(aggrSend) == null ? proxyWebClient : aggrWebClient).method(method).uri(uri).headers(
WebClient.RequestBodySpec req = proxyWebClient.method(method).uri(uri).headers(
hdrs -> {
if (headers != null) {
@@ -196,27 +131,16 @@ public class FizzWebClient {
}
}
return req.exchange()
/*
.name(reqId)
.doOnRequest(i -> {})
.doOnSuccess(r -> {})
.doOnError(
t -> {
Schedulers.parallel().schedule(() -> {
log.error("", LogService.BIZ_ID, reqId, t);
});
}
)
.timeout(Duration.ofMillis(cbc.timeout))
*/
;
// if (log.isDebugEnabled()) {
// rm = rm.log();
// }
// TODO 请求完成后做metric, 以反哺后续的请求转发
Mono<ClientResponse> cr = req.exchange();
if (timeout == 0) {
if (systemConfig.routeTimeout != 0) {
timeout = systemConfig.routeTimeout;
}
}
if (timeout != 0) {
cr = cr.timeout(Duration.ofMillis(timeout));
}
return cr;
}
private void setHostHeader(String uri, HttpHeaders headers) {

View File

@@ -87,7 +87,7 @@ public class ApacheDubboGenericService {
*/
@SuppressWarnings("unchecked")
public Mono<Object> send(final Map<String, Object> body, final DubboInterfaceDeclaration interfaceDeclaration,
HashMap<String, String> attachments) {
Map<String, String> attachments) {
RpcContext.getContext().setAttachments(attachments);
ReferenceConfig<GenericService> reference = createReferenceConfig(interfaceDeclaration.getServiceName(),

View File

@@ -76,7 +76,7 @@ public class CallbackServiceTests {
ServerHttpRequest req = exchange.getRequest();
String reqId = req.getId();
when(mockFizzWebClient.proxySend2service(reqId, HttpMethod.GET, "s1", "p1", headers, body))
when(mockFizzWebClient.send2service(reqId, HttpMethod.GET, "s1", "p1", headers, body))
.thenReturn(
Mono.just(
ClientResponse.create(HttpStatus.GONE, ExchangeStrategies.withDefaults())
@@ -86,7 +86,7 @@ public class CallbackServiceTests {
)
);
when(mockFizzWebClient.proxySend2service(reqId, HttpMethod.GET, "s2", "p2", headers, body))
when(mockFizzWebClient.send2service(reqId, HttpMethod.GET, "s2", "p2", headers, body))
.thenReturn(
Mono.just(
ClientResponse.create(HttpStatus.FOUND, ExchangeStrategies.withDefaults())