Support retry in aggregation #334

This commit is contained in:
Francis Dong
2021-10-09 16:47:28 +08:00
committed by dxfeng10
parent 1d9e508410
commit 97f7b0b397
9 changed files with 248 additions and 44 deletions

View File

@@ -0,0 +1,44 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.exception;
/**
*
* @author Francis Dong
*
*/
public class ExternalService4xxException extends RuntimeException {
/**
*
*/
private static final long serialVersionUID = 1108866900458042582L;
public ExternalService4xxException() {
super();
}
public ExternalService4xxException(String message) {
super(message);
}
public ExternalService4xxException(String message, Throwable cause) {
super(message, cause);
this.setStackTrace(cause.getStackTrace());
}
}

View File

@@ -17,6 +17,7 @@
package we.fizz.input.extension.dubbo; package we.fizz.input.extension.dubbo;
import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -29,11 +30,13 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import we.FizzAppContext; import we.FizzAppContext;
import we.config.SystemConfig; import we.config.SystemConfig;
import we.constants.CommonConstants; import we.constants.CommonConstants;
import we.exception.ExecuteScriptException; import we.exception.ExecuteScriptException;
import we.fizz.StepContext; import we.fizz.StepContext;
import we.fizz.exception.FizzRuntimeException;
import we.fizz.input.InputConfig; import we.fizz.input.InputConfig;
import we.fizz.input.InputContext; import we.fizz.input.InputContext;
import we.fizz.input.InputType; import we.fizz.input.InputType;
@@ -62,6 +65,8 @@ public class DubboInput extends RPCInput {
DubboInputConfig config = (DubboInputConfig) aConfig; DubboInputConfig config = (DubboInputConfig) aConfig;
int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout(); int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout();
long numRetries = config.getNumRetries() > 0 ? config.getNumRetries() : 0;
long retryInterval = config.getRetryInterval() > 0 ? config.getRetryInterval() : 0;
Map<String, String> attachments = (Map<String, String>) request.get("attachments"); Map<String, String> attachments = (Map<String, String>) request.get("attachments");
ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext(); ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext();
Map<String, Object> body = (Map<String, Object>) request.get("body"); Map<String, Object> body = (Map<String, Object>) request.get("body");
@@ -89,8 +94,14 @@ public class DubboInput extends RPCInput {
} }
} }
Mono<Object> proxyResponse = proxy.send(body, declaration, contextAttachment); HashMap<String, String> contextAttachment2 = contextAttachment;
return proxyResponse.flatMap(cr -> { Mono<Object> proxyResponse = Mono.just("").flatMap(s -> {
return proxy.send(body, declaration, contextAttachment2);
});
return proxyResponse.retryWhen(Retry.fixedDelay(numRetries, Duration.ofMillis(retryInterval))
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
throw new FizzRuntimeException("External Dubbo Service failed to process after max retries");
})).flatMap(cr -> {
DubboRPCResponse response = new DubboRPCResponse(); DubboRPCResponse response = new DubboRPCResponse();
response.setBodyMono(Mono.just(cr)); response.setBodyMono(Mono.just(cr));
return Mono.just(response); return Mono.just(response);

View File

@@ -36,6 +36,11 @@ public class DubboInputConfig extends InputConfig {
private String method; private String method;
private String paramTypes; private String paramTypes;
private int timeout; private int timeout;
private long numRetries;
/**
* retry interval in millisecond
*/
private long retryInterval;
public DubboInputConfig(Map configMap) { public DubboInputConfig(Map configMap) {
super(configMap); super(configMap);
@@ -75,6 +80,20 @@ public class DubboInputConfig extends InputConfig {
throw new RuntimeException("invalid timeout: " + configMap.get("timeout").toString() + " " + e.getMessage(), e); throw new RuntimeException("invalid timeout: " + configMap.get("timeout").toString() + " " + e.getMessage(), e);
} }
} }
if (configMap.get("numRetries") != null && StringUtils.isNotBlank(configMap.get("numRetries").toString())) {
try {
numRetries = Long.valueOf(configMap.get("numRetries").toString());
} catch (Exception e) {
throw new RuntimeException("invalid numRetries: " + configMap.get("numRetries").toString() + " " + e.getMessage(), e);
}
}
if (configMap.get("retryInterval") != null && StringUtils.isNotBlank(configMap.get("retryInterval").toString())) {
try {
retryInterval = Long.valueOf(configMap.get("retryInterval").toString());
} catch (Exception e) {
throw new RuntimeException("invalid retryInterval: " + configMap.get("retryInterval").toString() + " " + e.getMessage(), e);
}
}
} }
public String getVersion() { public String getVersion() {
@@ -125,4 +144,20 @@ public class DubboInputConfig extends InputConfig {
this.timeout = timeout; this.timeout = timeout;
} }
public long getNumRetries() {
return numRetries;
}
public void setNumRetries(long numRetries) {
this.numRetries = numRetries;
}
public long getRetryInterval() {
return retryInterval;
}
public void setRetryInterval(long retryInterval) {
this.retryInterval = retryInterval;
}
} }

View File

@@ -24,11 +24,13 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import we.FizzAppContext; import we.FizzAppContext;
import we.config.SystemConfig; import we.config.SystemConfig;
import we.constants.CommonConstants; import we.constants.CommonConstants;
import we.exception.ExecuteScriptException; import we.exception.ExecuteScriptException;
import we.fizz.StepContext; import we.fizz.StepContext;
import we.fizz.exception.FizzRuntimeException;
import we.fizz.input.*; import we.fizz.input.*;
import we.flume.clients.log4j2appender.LogService; import we.flume.clients.log4j2appender.LogService;
import we.proxy.grpc.GrpcGenericService; import we.proxy.grpc.GrpcGenericService;
@@ -36,6 +38,7 @@ import we.proxy.grpc.GrpcInstanceService;
import we.proxy.grpc.GrpcInterfaceDeclaration; import we.proxy.grpc.GrpcInterfaceDeclaration;
import we.util.JacksonUtils; import we.util.JacksonUtils;
import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -56,6 +59,8 @@ public class GrpcInput extends RPCInput implements IInput {
GrpcInputConfig config = (GrpcInputConfig) aConfig; GrpcInputConfig config = (GrpcInputConfig) aConfig;
int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout(); int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout();
long numRetries = config.getNumRetries() > 0 ? config.getNumRetries() : 0;
long retryInterval = config.getRetryInterval() > 0 ? config.getRetryInterval() : 0;
Map<String, String> attachments = (Map<String, String>) request.get("attachments"); Map<String, String> attachments = (Map<String, String>) request.get("attachments");
ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext(); ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext();
Map<String, Object> body = (Map<String, Object>) request.get("body"); Map<String, Object> body = (Map<String, Object>) request.get("body");
@@ -82,8 +87,14 @@ public class GrpcInput extends RPCInput implements IInput {
} }
} }
Mono<Object> proxyResponse = proxy.send(JSON.toJSONString(body), declaration, contextAttachment); HashMap<String, Object> contextAttachment2 = contextAttachment;
return proxyResponse.flatMap(cr -> { Mono<Object> proxyResponse = Mono.just("").flatMap(s -> {
return proxy.send(JSON.toJSONString(body), declaration, contextAttachment2);
});
return proxyResponse.retryWhen(Retry.fixedDelay(numRetries, Duration.ofMillis(retryInterval))
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
throw new FizzRuntimeException("External gRPC Service failed to process after max retries");
})).flatMap(cr -> {
GRPCResponse response = new GRPCResponse(); GRPCResponse response = new GRPCResponse();
response.setBodyMono(Mono.just(cr)); response.setBodyMono(Mono.just(cr));
return Mono.just(response); return Mono.just(response);

View File

@@ -35,6 +35,11 @@ public class GrpcInputConfig extends InputConfig {
private int timeout; private int timeout;
private String serviceName; private String serviceName;
private String method; private String method;
private long numRetries;
/**
* retry interval in millisecond
*/
private long retryInterval;
public GrpcInputConfig(Map configMap) { public GrpcInputConfig(Map configMap) {
super(configMap); super(configMap);
@@ -60,6 +65,20 @@ public class GrpcInputConfig extends InputConfig {
throw new RuntimeException("invalid timeout: " + configMap.get("timeout").toString() + " " + e.getMessage(), e); throw new RuntimeException("invalid timeout: " + configMap.get("timeout").toString() + " " + e.getMessage(), e);
} }
} }
if (configMap.get("numRetries") != null && StringUtils.isNotBlank(configMap.get("numRetries").toString())) {
try {
numRetries = Long.valueOf(configMap.get("numRetries").toString());
} catch (Exception e) {
throw new RuntimeException("invalid numRetries: " + configMap.get("numRetries").toString() + " " + e.getMessage(), e);
}
}
if (configMap.get("retryInterval") != null && StringUtils.isNotBlank(configMap.get("retryInterval").toString())) {
try {
retryInterval = Long.valueOf(configMap.get("retryInterval").toString());
} catch (Exception e) {
throw new RuntimeException("invalid retryInterval: " + configMap.get("retryInterval").toString() + " " + e.getMessage(), e);
}
}
} }
public int getTimeout() { public int getTimeout() {
@@ -85,4 +104,20 @@ public class GrpcInputConfig extends InputConfig {
public void setMethod(String method) { public void setMethod(String method) {
this.method = method; this.method = method;
} }
public long getNumRetries() {
return numRetries;
}
public void setNumRetries(long numRetries) {
this.numRetries = numRetries;
}
public long getRetryInterval() {
return retryInterval;
}
public void setRetryInterval(long retryInterval) {
this.retryInterval = retryInterval;
}
} }

View File

@@ -328,6 +328,8 @@ public class RequestInput extends RPCInput implements IInput{
RequestInputConfig config = (RequestInputConfig) aConfig; RequestInputConfig config = (RequestInputConfig) aConfig;
int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout(); int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout();
long numRetries = config.getNumRetries() > 0 ? config.getNumRetries() : 0;
long retryInterval = config.getRetryInterval() > 0 ? config.getRetryInterval() : 0;
HttpMethod method = HttpMethod.valueOf(config.getMethod()); HttpMethod method = HttpMethod.valueOf(config.getMethod());
String url = (String) request.get("url"); String url = (String) request.get("url");
@@ -407,7 +409,8 @@ public class RequestInput extends RPCInput implements IInput{
// Mono<ClientResponse> clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url, // Mono<ClientResponse> clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url,
// headers, body, (long)timeout); // headers, body, (long)timeout);
Mono<ClientResponse> clientResponse = client.send(inputContext.getStepContext().getTraceId(), method, url, headers, body, (long)timeout); Mono<ClientResponse> clientResponse = client.send(inputContext.getStepContext().getTraceId(), method, url,
headers, body, (long)timeout, numRetries, retryInterval);
return clientResponse.flatMap(cr->{ return clientResponse.flatMap(cr->{
RequestRPCResponse response = new RequestRPCResponse(); RequestRPCResponse response = new RequestRPCResponse();
response.setHeaders(cr.headers().asHttpHeaders()); response.setHeaders(cr.headers().asHttpHeaders());

View File

@@ -19,17 +19,13 @@ package we.fizz.input.extension.request;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.noear.snack.ONode;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import org.springframework.web.util.UriComponentsBuilder; import org.springframework.web.util.UriComponentsBuilder;
import we.fizz.input.InputConfig; import we.fizz.input.InputConfig;
import we.fizz.input.PathMapping;
@@ -50,6 +46,11 @@ public class RequestInputConfig extends InputConfig {
private Integer serviceType; private Integer serviceType;
private String serviceName; private String serviceName;
private String path; private String path;
private long numRetries;
/**
* retry interval in millisecond
*/
private long retryInterval;
public RequestInputConfig(Map configBody) { public RequestInputConfig(Map configBody) {
super(configBody); super(configBody);
@@ -81,6 +82,20 @@ public class RequestInputConfig extends InputConfig {
throw new RuntimeException("invalid timeout: " + configBody.get("timeout").toString() + " " + e.getMessage(), e); throw new RuntimeException("invalid timeout: " + configBody.get("timeout").toString() + " " + e.getMessage(), e);
} }
} }
if (configBody.get("numRetries") != null && StringUtils.isNotBlank(configBody.get("numRetries").toString())) {
try {
numRetries = Long.valueOf(configBody.get("numRetries").toString());
} catch (Exception e) {
throw new RuntimeException("invalid numRetries: " + configBody.get("numRetries").toString() + " " + e.getMessage(), e);
}
}
if (configBody.get("retryInterval") != null && StringUtils.isNotBlank(configBody.get("retryInterval").toString())) {
try {
retryInterval = Long.valueOf(configBody.get("retryInterval").toString());
} catch (Exception e) {
throw new RuntimeException("invalid retryInterval: " + configBody.get("retryInterval").toString() + " " + e.getMessage(), e);
}
}
if (configBody.get("fallback") != null) { if (configBody.get("fallback") != null) {
Map<String,String> fallback = (Map<String,String>)configBody.get("fallback"); Map<String,String> fallback = (Map<String,String>)configBody.get("fallback");
setFallback(fallback); setFallback(fallback);
@@ -181,4 +196,19 @@ public class RequestInputConfig extends InputConfig {
this.path = path; this.path = path;
} }
public long getNumRetries() {
return numRetries;
}
public void setNumRetries(long numRetries) {
this.numRetries = numRetries;
}
public long getRetryInterval() {
return retryInterval;
}
public void setRetryInterval(long retryInterval) {
this.retryInterval = retryInterval;
}
} }

View File

@@ -17,6 +17,12 @@
package we.proxy; package we.proxy;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -29,20 +35,19 @@ import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import we.config.ProxyWebClientConfig; import we.config.ProxyWebClientConfig;
import we.config.SystemConfig; import we.config.SystemConfig;
import we.exception.ExternalService4xxException;
import we.fizz.exception.FizzRuntimeException;
import we.flume.clients.log4j2appender.LogService; import we.flume.clients.log4j2appender.LogService;
import we.util.Consts; import we.util.Consts;
import we.util.ThreadContext; import we.util.ThreadContext;
import we.util.WebUtils; import we.util.WebUtils;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
/** /**
* @author hongqiaowei * @author hongqiaowei
*/ */
@@ -66,37 +71,68 @@ public class FizzWebClient {
private WebClient proxyWebClient; private WebClient proxyWebClient;
public Mono<ClientResponse> send(String reqId, HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) { public Mono<ClientResponse> send(String reqId, HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) {
return send(reqId, method, uriOrSvc, headers, body, 0); return send(reqId, method, uriOrSvc, headers, body, 0, 0, 0);
} }
public Mono<ClientResponse> send(String reqId, HttpMethod method, String uriOrSvc, HttpHeaders headers, Object body, long timeout) { public Mono<ClientResponse> send(String reqId, HttpMethod method, String uriOrSvc, HttpHeaders headers, Object body,
long timeout, long numRetries, long retryInterval) {
String s = extractServiceOrAddress(uriOrSvc); String s = extractServiceOrAddress(uriOrSvc);
Mono<ClientResponse> cr = Mono.just("").flatMap(dummy -> {
if (isService(s)) { if (isService(s)) {
String path = uriOrSvc.substring(uriOrSvc.indexOf(Consts.S.FORWARD_SLASH, 10)); String path = uriOrSvc.substring(uriOrSvc.indexOf(Consts.S.FORWARD_SLASH, 10));
return send2service(reqId, method, s, path, headers, body, timeout); String uri = discoveryClientUriSelector.getNextUri(s, path);
return send2uri(reqId, method, uri, headers, body, timeout);
} else { } else {
return send2uri(reqId, method, uriOrSvc, headers, body, timeout); return send2uri(reqId, method, uriOrSvc, headers, body, timeout);
} }
});
if (numRetries > 0) {
cr = cr.flatMap(resp->{
// Do not retry on 4xx client error
if (resp.statusCode().is4xxClientError()) {
return Mono.error(new ExternalService4xxException());
}
return Mono.just(resp);
}).retryWhen(Retry.fixedDelay(numRetries, Duration.ofMillis(retryInterval > 0 ? retryInterval : 0))
.filter(throwable -> !(throwable instanceof ExternalService4xxException))
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
throw new FizzRuntimeException("External Service failed to process after max retries");
}));
}
return cr;
} }
public Mono<ClientResponse> send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri, public Mono<ClientResponse> send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri,
@Nullable HttpHeaders headers, @Nullable Object body) { @Nullable HttpHeaders headers, @Nullable Object body) {
return send2service(clientReqId, method, service, relativeUri, headers, body, 0, 0, 0);
return send2service(clientReqId, method, service, relativeUri, headers, body, 0);
} }
public Mono<ClientResponse> send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri, private Mono<ClientResponse> send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri,
@Nullable HttpHeaders headers, @Nullable Object body, long timeout) { @Nullable HttpHeaders headers, @Nullable Object body, long timeout, long numRetries, long retryInterval) {
Mono<ClientResponse> cr = Mono.just("").flatMap(dummy -> {
String uri = discoveryClientUriSelector.getNextUri(service, relativeUri); String uri = discoveryClientUriSelector.getNextUri(service, relativeUri);
return send2uri(clientReqId, method, uri, headers, body, timeout); return send2uri(clientReqId, method, uri, headers, body, timeout);
});
if (numRetries > 0) {
cr = cr.flatMap(resp->{
// Do not retry on 4xx client error
if (resp.statusCode().is4xxClientError()) {
return Mono.error(new ExternalService4xxException());
}
return Mono.just(resp);
}).retryWhen(Retry.fixedDelay(numRetries, Duration.ofMillis(retryInterval > 0 ? retryInterval : 0))
.filter(throwable -> !(throwable instanceof ExternalService4xxException))
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
throw new FizzRuntimeException("External Service failed to process after max retries");
}));
}
return cr;
} }
public Mono<ClientResponse> send2uri(@Nullable String clientReqId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body) { private Mono<ClientResponse> send2uri(@Nullable String clientReqId, HttpMethod method, String uri,
return send2uri(clientReqId, method, uri, headers, body, 0); @Nullable HttpHeaders headers, @Nullable Object body, long timeout) {
}
public Mono<ClientResponse> send2uri(@Nullable String clientReqId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body, long timeout) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder(); StringBuilder b = ThreadContext.getStringBuilder();
@@ -134,7 +170,7 @@ public class FizzWebClient {
timeout = systemConfig.getRouteTimeout(); timeout = systemConfig.getRouteTimeout();
} }
} }
if (timeout != 0) { if (timeout > 0) {
cr = cr.timeout(Duration.ofMillis(timeout)); cr = cr.timeout(Duration.ofMillis(timeout));
} }

View File

@@ -67,11 +67,10 @@ public class GrpcGenericService {
ManagedChannel channel = null; ManagedChannel channel = null;
try { try {
channel = ChannelFactory.create(endPoint, metaHeaderMap); channel = ChannelFactory.create(endPoint, metaHeaderMap);
CallOptions calloptions = DEFAULT; CallOptions calloptions = DEFAULT.withDeadlineAfter(grpcInterfaceDeclaration.getTimeout(), TimeUnit.MILLISECONDS);
calloptions.withDeadlineAfter(grpcInterfaceDeclaration.getTimeout(), TimeUnit.MILLISECONDS);
CallResults callResults = new CallResults(); CallResults callResults = new CallResults();
ListenableFuture<Void> future = grpcProxyClient.invokeMethodAsync(methodDefinition, channel, DEFAULT, ListenableFuture<Void> future = grpcProxyClient.invokeMethodAsync(methodDefinition, channel, calloptions,
singletonList(payload), callResults); singletonList(payload), callResults);
return Mono.fromFuture(new ListenableFutureAdapter(future).getCompletableFuture().thenApply(ret -> { return Mono.fromFuture(new ListenableFutureAdapter(future).getCompletableFuture().thenApply(ret -> {
return callResults.asJSON(); return callResults.asJSON();