Route support retry function
This commit is contained in:
@@ -102,7 +102,7 @@ public class RouteFilter extends FizzWebFilter {
|
||||
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
String traceId = WebUtils.getTraceId(exchange);
|
||||
Route route = WebUtils.getRoute(exchange);
|
||||
Route route = exchange.getAttribute(WebUtils.ROUTE);
|
||||
HttpHeaders hdrs = null;
|
||||
|
||||
if (route.type != ApiConfig.Type.DUBBO) {
|
||||
@@ -111,17 +111,17 @@ public class RouteFilter extends FizzWebFilter {
|
||||
|
||||
if (route == null) {
|
||||
String pathQuery = WebUtils.getClientReqPathQuery(exchange);
|
||||
return send(exchange, req.getMethod(), WebUtils.getClientService(exchange), pathQuery, hdrs);
|
||||
return send(exchange, req.getMethod(), WebUtils.getClientService(exchange), pathQuery, hdrs, route);
|
||||
|
||||
} else if (route.type == ApiConfig.Type.SERVICE_DISCOVERY) {
|
||||
String pathQuery = route.getBackendPathQuery();
|
||||
return send(exchange, route.method, route.backendService, pathQuery, hdrs);
|
||||
return send(exchange, route.method, route.backendService, pathQuery, hdrs, route);
|
||||
|
||||
} else if (route.type == ApiConfig.Type.REVERSE_PROXY) {
|
||||
String uri = ThreadContext.getStringBuilder().append(route.nextHttpHostPort)
|
||||
.append(route.getBackendPathQuery())
|
||||
.toString();
|
||||
return fizzWebClient.send(traceId, route.method, uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange));
|
||||
return fizzWebClient.send(traceId, route.method, uri, hdrs, req.getBody(), route.timeout, route.retryCount, route.retryInterval).flatMap(genServerResponse(exchange));
|
||||
|
||||
} else if (route.type == ApiConfig.Type.DUBBO) {
|
||||
return dubboRpc(exchange, route);
|
||||
@@ -139,9 +139,14 @@ public class RouteFilter extends FizzWebFilter {
|
||||
}
|
||||
}
|
||||
|
||||
private Mono<Void> send(ServerWebExchange exchange, HttpMethod method, String service, String relativeUri, HttpHeaders hdrs) {
|
||||
private Mono<Void> send(ServerWebExchange exchange, HttpMethod method, String service, String relativeUri, HttpHeaders hdrs, Route r) {
|
||||
ServerHttpRequest clientReq = exchange.getRequest();
|
||||
return fizzWebClient.send2service(WebUtils.getTraceId(exchange), method, service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange));
|
||||
if (r == null) {
|
||||
return fizzWebClient.send2service(WebUtils.getTraceId(exchange), method, service, relativeUri, hdrs, clientReq.getBody())
|
||||
.flatMap(genServerResponse(exchange));
|
||||
}
|
||||
return fizzWebClient.send2service(WebUtils.getTraceId(exchange), method, service, relativeUri, hdrs, clientReq.getBody(), r.timeout, r.retryCount, r.retryInterval)
|
||||
.flatMap(genServerResponse(exchange));
|
||||
}
|
||||
|
||||
private Function<ClientResponse, Mono<? extends Void>> genServerResponse(ServerWebExchange exchange) {
|
||||
|
||||
@@ -121,6 +121,10 @@ public class ApiConfig {
|
||||
|
||||
public long timeout = 0;
|
||||
|
||||
public int retryCount = 0;
|
||||
|
||||
public long retryInterval = 0;
|
||||
|
||||
public static boolean isAntPathPattern(String path) {
|
||||
boolean uriVar = false;
|
||||
for (int i = 0; i < path.length(); i++) {
|
||||
@@ -228,7 +232,9 @@ public class ApiConfig {
|
||||
.rpcParamTypes( this.rpcParamTypes)
|
||||
.rpcGroup( this.rpcGroup)
|
||||
.rpcVersion( this.rpcVersion)
|
||||
.timeout( this.timeout);
|
||||
.timeout( this.timeout)
|
||||
.retryCount( this.retryCount)
|
||||
.retryInterval( this.retryInterval);
|
||||
|
||||
if (this.type == Type.REVERSE_PROXY) {
|
||||
r = r.nextHttpHostPort(getNextHttpHostPort());
|
||||
|
||||
@@ -70,7 +70,6 @@ public class FizzWebClient {
|
||||
@Resource(name = ProxyWebClientConfig.proxyWebClient)
|
||||
private WebClient webClient;
|
||||
|
||||
// TODO
|
||||
public Mono<ClientResponse> send(String traceId,
|
||||
HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) {
|
||||
|
||||
@@ -113,14 +112,13 @@ public class FizzWebClient {
|
||||
return cr;
|
||||
}
|
||||
|
||||
// TODO
|
||||
public Mono<ClientResponse> send2service(@Nullable String traceId,
|
||||
HttpMethod method, String service, String relativeUri, @Nullable HttpHeaders headers, @Nullable Object body) {
|
||||
|
||||
return send2service(traceId, method, service, relativeUri, headers, body, 0, 0, 0);
|
||||
}
|
||||
|
||||
private Mono<ClientResponse> send2service(@Nullable String traceId,
|
||||
public Mono<ClientResponse> send2service(@Nullable String traceId,
|
||||
HttpMethod method, String service, String relativeUri, @Nullable HttpHeaders headers, @Nullable Object body,
|
||||
long timeout, long numRetries, long retryInterval) {
|
||||
|
||||
|
||||
@@ -54,6 +54,10 @@ public class Route {
|
||||
|
||||
public long timeout = 0;
|
||||
|
||||
public int retryCount = 0;
|
||||
|
||||
public long retryInterval = 0;
|
||||
|
||||
public Route type(byte t) {
|
||||
type = t;
|
||||
return this;
|
||||
@@ -114,6 +118,16 @@ public class Route {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Route retryCount(int rc) {
|
||||
retryCount = rc;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Route retryInterval(long ri) {
|
||||
retryInterval = ri;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getBackendPathQuery() {
|
||||
if (query != null) {
|
||||
return backendPath + Consts.S.QUESTION + query;
|
||||
|
||||
Reference in New Issue
Block a user