From 97f7b0b397528e720f4d3bffca560f63c7f4ff51 Mon Sep 17 00:00:00 2001 From: Francis Dong Date: Sat, 9 Oct 2021 16:47:28 +0800 Subject: [PATCH] Support retry in aggregation #334 --- .../ExternalService4xxException.java | 44 ++++++++++ .../input/extension/dubbo/DubboInput.java | 21 +++-- .../extension/dubbo/DubboInputConfig.java | 35 ++++++++ .../fizz/input/extension/grpc/GrpcInput.java | 21 +++-- .../input/extension/grpc/GrpcInputConfig.java | 35 ++++++++ .../input/extension/request/RequestInput.java | 5 +- .../extension/request/RequestInputConfig.java | 40 +++++++-- .../src/main/java/we/proxy/FizzWebClient.java | 86 +++++++++++++------ .../we/proxy/grpc/GrpcGenericService.java | 5 +- 9 files changed, 248 insertions(+), 44 deletions(-) create mode 100644 fizz-core/src/main/java/we/exception/ExternalService4xxException.java diff --git a/fizz-core/src/main/java/we/exception/ExternalService4xxException.java b/fizz-core/src/main/java/we/exception/ExternalService4xxException.java new file mode 100644 index 0000000..fa6400e --- /dev/null +++ b/fizz-core/src/main/java/we/exception/ExternalService4xxException.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2021 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package we.exception; + +/** + * + * @author Francis Dong + * + */ +public class ExternalService4xxException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1108866900458042582L; + + public ExternalService4xxException() { + super(); + } + + public ExternalService4xxException(String message) { + super(message); + } + + public ExternalService4xxException(String message, Throwable cause) { + super(message, cause); + this.setStackTrace(cause.getStackTrace()); + } + +} diff --git a/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInput.java b/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInput.java index 5a23bb1..93e2679 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInput.java @@ -17,6 +17,7 @@ package we.fizz.input.extension.dubbo; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -29,11 +30,13 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.util.CollectionUtils; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import we.FizzAppContext; import we.config.SystemConfig; import we.constants.CommonConstants; import we.exception.ExecuteScriptException; import we.fizz.StepContext; +import we.fizz.exception.FizzRuntimeException; import we.fizz.input.InputConfig; import we.fizz.input.InputContext; import we.fizz.input.InputType; @@ -62,6 +65,8 @@ public class DubboInput extends RPCInput { DubboInputConfig config = (DubboInputConfig) aConfig; int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout(); + long numRetries = config.getNumRetries() > 0 ? config.getNumRetries() : 0; + long retryInterval = config.getRetryInterval() > 0 ? config.getRetryInterval() : 0; Map attachments = (Map) request.get("attachments"); ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext(); Map body = (Map) request.get("body"); @@ -89,12 +94,18 @@ public class DubboInput extends RPCInput { } } - Mono proxyResponse = proxy.send(body, declaration, contextAttachment); - return proxyResponse.flatMap(cr -> { - DubboRPCResponse response = new DubboRPCResponse(); - response.setBodyMono(Mono.just(cr)); - return Mono.just(response); + HashMap contextAttachment2 = contextAttachment; + Mono proxyResponse = Mono.just("").flatMap(s -> { + return proxy.send(body, declaration, contextAttachment2); }); + return proxyResponse.retryWhen(Retry.fixedDelay(numRetries, Duration.ofMillis(retryInterval)) + .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> { + throw new FizzRuntimeException("External Dubbo Service failed to process after max retries"); + })).flatMap(cr -> { + DubboRPCResponse response = new DubboRPCResponse(); + response.setBodyMono(Mono.just(cr)); + return Mono.just(response); + }); } protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) { diff --git a/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInputConfig.java b/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInputConfig.java index 99fc9de..0f3e068 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInputConfig.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/dubbo/DubboInputConfig.java @@ -36,6 +36,11 @@ public class DubboInputConfig extends InputConfig { private String method; private String paramTypes; private int timeout; + private long numRetries; + /** + * retry interval in millisecond + */ + private long retryInterval; public DubboInputConfig(Map configMap) { super(configMap); @@ -75,6 +80,20 @@ public class DubboInputConfig extends InputConfig { throw new RuntimeException("invalid timeout: " + configMap.get("timeout").toString() + " " + e.getMessage(), e); } } + if (configMap.get("numRetries") != null && StringUtils.isNotBlank(configMap.get("numRetries").toString())) { + try { + numRetries = Long.valueOf(configMap.get("numRetries").toString()); + } catch (Exception e) { + throw new RuntimeException("invalid numRetries: " + configMap.get("numRetries").toString() + " " + e.getMessage(), e); + } + } + if (configMap.get("retryInterval") != null && StringUtils.isNotBlank(configMap.get("retryInterval").toString())) { + try { + retryInterval = Long.valueOf(configMap.get("retryInterval").toString()); + } catch (Exception e) { + throw new RuntimeException("invalid retryInterval: " + configMap.get("retryInterval").toString() + " " + e.getMessage(), e); + } + } } public String getVersion() { @@ -125,4 +144,20 @@ public class DubboInputConfig extends InputConfig { this.timeout = timeout; } + public long getNumRetries() { + return numRetries; + } + + public void setNumRetries(long numRetries) { + this.numRetries = numRetries; + } + + public long getRetryInterval() { + return retryInterval; + } + + public void setRetryInterval(long retryInterval) { + this.retryInterval = retryInterval; + } + } diff --git a/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInput.java b/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInput.java index edc78e7..d56b617 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInput.java @@ -24,11 +24,13 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.util.CollectionUtils; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import we.FizzAppContext; import we.config.SystemConfig; import we.constants.CommonConstants; import we.exception.ExecuteScriptException; import we.fizz.StepContext; +import we.fizz.exception.FizzRuntimeException; import we.fizz.input.*; import we.flume.clients.log4j2appender.LogService; import we.proxy.grpc.GrpcGenericService; @@ -36,6 +38,7 @@ import we.proxy.grpc.GrpcInstanceService; import we.proxy.grpc.GrpcInterfaceDeclaration; import we.util.JacksonUtils; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -56,6 +59,8 @@ public class GrpcInput extends RPCInput implements IInput { GrpcInputConfig config = (GrpcInputConfig) aConfig; int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout(); + long numRetries = config.getNumRetries() > 0 ? config.getNumRetries() : 0; + long retryInterval = config.getRetryInterval() > 0 ? config.getRetryInterval() : 0; Map attachments = (Map) request.get("attachments"); ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext(); Map body = (Map) request.get("body"); @@ -82,12 +87,18 @@ public class GrpcInput extends RPCInput implements IInput { } } - Mono proxyResponse = proxy.send(JSON.toJSONString(body), declaration, contextAttachment); - return proxyResponse.flatMap(cr -> { - GRPCResponse response = new GRPCResponse(); - response.setBodyMono(Mono.just(cr)); - return Mono.just(response); + HashMap contextAttachment2 = contextAttachment; + Mono proxyResponse = Mono.just("").flatMap(s -> { + return proxy.send(JSON.toJSONString(body), declaration, contextAttachment2); }); + return proxyResponse.retryWhen(Retry.fixedDelay(numRetries, Duration.ofMillis(retryInterval)) + .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> { + throw new FizzRuntimeException("External gRPC Service failed to process after max retries"); + })).flatMap(cr -> { + GRPCResponse response = new GRPCResponse(); + response.setBodyMono(Mono.just(cr)); + return Mono.just(response); + }); } @SuppressWarnings("unchecked") diff --git a/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInputConfig.java b/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInputConfig.java index 818ff9d..d4e9e5b 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInputConfig.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/grpc/GrpcInputConfig.java @@ -35,6 +35,11 @@ public class GrpcInputConfig extends InputConfig { private int timeout; private String serviceName; private String method; + private long numRetries; + /** + * retry interval in millisecond + */ + private long retryInterval; public GrpcInputConfig(Map configMap) { super(configMap); @@ -60,6 +65,20 @@ public class GrpcInputConfig extends InputConfig { throw new RuntimeException("invalid timeout: " + configMap.get("timeout").toString() + " " + e.getMessage(), e); } } + if (configMap.get("numRetries") != null && StringUtils.isNotBlank(configMap.get("numRetries").toString())) { + try { + numRetries = Long.valueOf(configMap.get("numRetries").toString()); + } catch (Exception e) { + throw new RuntimeException("invalid numRetries: " + configMap.get("numRetries").toString() + " " + e.getMessage(), e); + } + } + if (configMap.get("retryInterval") != null && StringUtils.isNotBlank(configMap.get("retryInterval").toString())) { + try { + retryInterval = Long.valueOf(configMap.get("retryInterval").toString()); + } catch (Exception e) { + throw new RuntimeException("invalid retryInterval: " + configMap.get("retryInterval").toString() + " " + e.getMessage(), e); + } + } } public int getTimeout() { @@ -85,4 +104,20 @@ public class GrpcInputConfig extends InputConfig { public void setMethod(String method) { this.method = method; } + + public long getNumRetries() { + return numRetries; + } + + public void setNumRetries(long numRetries) { + this.numRetries = numRetries; + } + + public long getRetryInterval() { + return retryInterval; + } + + public void setRetryInterval(long retryInterval) { + this.retryInterval = retryInterval; + } } diff --git a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java index 3138045..22c91c7 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java @@ -328,6 +328,8 @@ public class RequestInput extends RPCInput implements IInput{ RequestInputConfig config = (RequestInputConfig) aConfig; int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout(); + long numRetries = config.getNumRetries() > 0 ? config.getNumRetries() : 0; + long retryInterval = config.getRetryInterval() > 0 ? config.getRetryInterval() : 0; HttpMethod method = HttpMethod.valueOf(config.getMethod()); String url = (String) request.get("url"); @@ -407,7 +409,8 @@ public class RequestInput extends RPCInput implements IInput{ // Mono clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url, // headers, body, (long)timeout); - Mono clientResponse = client.send(inputContext.getStepContext().getTraceId(), method, url, headers, body, (long)timeout); + Mono clientResponse = client.send(inputContext.getStepContext().getTraceId(), method, url, + headers, body, (long)timeout, numRetries, retryInterval); return clientResponse.flatMap(cr->{ RequestRPCResponse response = new RequestRPCResponse(); response.setHeaders(cr.headers().asHttpHeaders()); diff --git a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java index 0467365..9445b98 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java @@ -19,17 +19,13 @@ package we.fizz.input.extension.request; import java.net.MalformedURLException; import java.net.URL; -import java.util.HashMap; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; -import org.noear.snack.ONode; import org.springframework.util.MultiValueMap; import org.springframework.web.util.UriComponentsBuilder; + import we.fizz.input.InputConfig; -import we.fizz.input.PathMapping; @@ -50,6 +46,11 @@ public class RequestInputConfig extends InputConfig { private Integer serviceType; private String serviceName; private String path; + private long numRetries; + /** + * retry interval in millisecond + */ + private long retryInterval; public RequestInputConfig(Map configBody) { super(configBody); @@ -81,6 +82,20 @@ public class RequestInputConfig extends InputConfig { throw new RuntimeException("invalid timeout: " + configBody.get("timeout").toString() + " " + e.getMessage(), e); } } + if (configBody.get("numRetries") != null && StringUtils.isNotBlank(configBody.get("numRetries").toString())) { + try { + numRetries = Long.valueOf(configBody.get("numRetries").toString()); + } catch (Exception e) { + throw new RuntimeException("invalid numRetries: " + configBody.get("numRetries").toString() + " " + e.getMessage(), e); + } + } + if (configBody.get("retryInterval") != null && StringUtils.isNotBlank(configBody.get("retryInterval").toString())) { + try { + retryInterval = Long.valueOf(configBody.get("retryInterval").toString()); + } catch (Exception e) { + throw new RuntimeException("invalid retryInterval: " + configBody.get("retryInterval").toString() + " " + e.getMessage(), e); + } + } if (configBody.get("fallback") != null) { Map fallback = (Map)configBody.get("fallback"); setFallback(fallback); @@ -181,4 +196,19 @@ public class RequestInputConfig extends InputConfig { this.path = path; } + public long getNumRetries() { + return numRetries; + } + + public void setNumRetries(long numRetries) { + this.numRetries = numRetries; + } + + public long getRetryInterval() { + return retryInterval; + } + + public void setRetryInterval(long retryInterval) { + this.retryInterval = retryInterval; + } } diff --git a/fizz-core/src/main/java/we/proxy/FizzWebClient.java b/fizz-core/src/main/java/we/proxy/FizzWebClient.java index 684ea59..bf1d922 100644 --- a/fizz-core/src/main/java/we/proxy/FizzWebClient.java +++ b/fizz-core/src/main/java/we/proxy/FizzWebClient.java @@ -17,6 +17,12 @@ package we.proxy; +import java.time.Duration; +import java.util.Collections; +import java.util.List; + +import javax.annotation.Resource; + import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,20 +35,19 @@ import org.springframework.web.reactive.function.BodyInserter; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import we.config.ProxyWebClientConfig; import we.config.SystemConfig; +import we.exception.ExternalService4xxException; +import we.fizz.exception.FizzRuntimeException; import we.flume.clients.log4j2appender.LogService; import we.util.Consts; import we.util.ThreadContext; import we.util.WebUtils; -import javax.annotation.Resource; -import java.time.Duration; -import java.util.Collections; -import java.util.List; - /** * @author hongqiaowei */ @@ -66,37 +71,68 @@ public class FizzWebClient { private WebClient proxyWebClient; public Mono send(String reqId, HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) { - return send(reqId, method, uriOrSvc, headers, body, 0); + return send(reqId, method, uriOrSvc, headers, body, 0, 0, 0); } - public Mono send(String reqId, HttpMethod method, String uriOrSvc, HttpHeaders headers, Object body, long timeout) { + public Mono send(String reqId, HttpMethod method, String uriOrSvc, HttpHeaders headers, Object body, + long timeout, long numRetries, long retryInterval) { String s = extractServiceOrAddress(uriOrSvc); - if (isService(s)) { - String path = uriOrSvc.substring(uriOrSvc.indexOf(Consts.S.FORWARD_SLASH, 10)); - return send2service(reqId, method, s, path, headers, body, timeout); - } else { - return send2uri(reqId, method, uriOrSvc, headers, body, timeout); + + Mono cr = Mono.just("").flatMap(dummy -> { + if (isService(s)) { + String path = uriOrSvc.substring(uriOrSvc.indexOf(Consts.S.FORWARD_SLASH, 10)); + String uri = discoveryClientUriSelector.getNextUri(s, path); + return send2uri(reqId, method, uri, headers, body, timeout); + } else { + return send2uri(reqId, method, uriOrSvc, headers, body, timeout); + } + }); + + if (numRetries > 0) { + cr = cr.flatMap(resp->{ + // Do not retry on 4xx client error + if (resp.statusCode().is4xxClientError()) { + return Mono.error(new ExternalService4xxException()); + } + return Mono.just(resp); + }).retryWhen(Retry.fixedDelay(numRetries, Duration.ofMillis(retryInterval > 0 ? retryInterval : 0)) + .filter(throwable -> !(throwable instanceof ExternalService4xxException)) + .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> { + throw new FizzRuntimeException("External Service failed to process after max retries"); + })); } + return cr; } public Mono send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri, @Nullable HttpHeaders headers, @Nullable Object body) { - - return send2service(clientReqId, method, service, relativeUri, headers, body, 0); + return send2service(clientReqId, method, service, relativeUri, headers, body, 0, 0, 0); } - public Mono send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri, - @Nullable HttpHeaders headers, @Nullable Object body, long timeout) { - - String uri = discoveryClientUriSelector.getNextUri(service, relativeUri); - return send2uri(clientReqId, method, uri, headers, body, timeout); + private Mono send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri, + @Nullable HttpHeaders headers, @Nullable Object body, long timeout, long numRetries, long retryInterval) { + Mono cr = Mono.just("").flatMap(dummy -> { + String uri = discoveryClientUriSelector.getNextUri(service, relativeUri); + return send2uri(clientReqId, method, uri, headers, body, timeout); + }); + if (numRetries > 0) { + cr = cr.flatMap(resp->{ + // Do not retry on 4xx client error + if (resp.statusCode().is4xxClientError()) { + return Mono.error(new ExternalService4xxException()); + } + return Mono.just(resp); + }).retryWhen(Retry.fixedDelay(numRetries, Duration.ofMillis(retryInterval > 0 ? retryInterval : 0)) + .filter(throwable -> !(throwable instanceof ExternalService4xxException)) + .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> { + throw new FizzRuntimeException("External Service failed to process after max retries"); + })); + } + return cr; } - public Mono send2uri(@Nullable String clientReqId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body) { - return send2uri(clientReqId, method, uri, headers, body, 0); - } - - public Mono send2uri(@Nullable String clientReqId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body, long timeout) { + private Mono send2uri(@Nullable String clientReqId, HttpMethod method, String uri, + @Nullable HttpHeaders headers, @Nullable Object body, long timeout) { if (log.isDebugEnabled()) { StringBuilder b = ThreadContext.getStringBuilder(); @@ -134,7 +170,7 @@ public class FizzWebClient { timeout = systemConfig.getRouteTimeout(); } } - if (timeout != 0) { + if (timeout > 0) { cr = cr.timeout(Duration.ofMillis(timeout)); } diff --git a/fizz-core/src/main/java/we/proxy/grpc/GrpcGenericService.java b/fizz-core/src/main/java/we/proxy/grpc/GrpcGenericService.java index e584e29..09dde95 100644 --- a/fizz-core/src/main/java/we/proxy/grpc/GrpcGenericService.java +++ b/fizz-core/src/main/java/we/proxy/grpc/GrpcGenericService.java @@ -67,11 +67,10 @@ public class GrpcGenericService { ManagedChannel channel = null; try { channel = ChannelFactory.create(endPoint, metaHeaderMap); - CallOptions calloptions = DEFAULT; - calloptions.withDeadlineAfter(grpcInterfaceDeclaration.getTimeout(), TimeUnit.MILLISECONDS); + CallOptions calloptions = DEFAULT.withDeadlineAfter(grpcInterfaceDeclaration.getTimeout(), TimeUnit.MILLISECONDS); CallResults callResults = new CallResults(); - ListenableFuture future = grpcProxyClient.invokeMethodAsync(methodDefinition, channel, DEFAULT, + ListenableFuture future = grpcProxyClient.invokeMethodAsync(methodDefinition, channel, calloptions, singletonList(payload), callResults); return Mono.fromFuture(new ListenableFutureAdapter(future).getCompletableFuture().thenApply(ret -> { return callResults.asJSON();