add application context and weak ref to input and step, implement dubbo aggregation without unit test.
This commit is contained in:
15
pom.xml
15
pom.xml
@@ -41,6 +41,7 @@
|
||||
<netty.version>4.1.58.Final</netty.version>
|
||||
<httpcore.version>4.4.14</httpcore.version>
|
||||
<log4j2.version>2.13.3</log4j2.version>
|
||||
<apache.dubbo.version>2.7.5</apache.dubbo.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
@@ -218,6 +219,20 @@
|
||||
<artifactId>java-jwt</artifactId>
|
||||
<version>3.12.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-inline</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.dubbo</groupId>
|
||||
<artifactId>dubbo</artifactId>
|
||||
<version>${apache.dubbo.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
|
||||
@@ -7,6 +7,4 @@ public class FizzException extends Exception {
|
||||
public FizzException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
70
src/main/java/we/fizz/input/DubboInput.java
Normal file
70
src/main/java/we/fizz/input/DubboInput.java
Normal file
@@ -0,0 +1,70 @@
|
||||
package we.fizz.input;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.constants.CommonConstants;
|
||||
import we.proxy.dubbo.ApacheDubboGenericProxy;
|
||||
import we.proxy.dubbo.DubboInterfaceDeclaration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
/**
|
||||
*
|
||||
* @author linwaiwai
|
||||
*
|
||||
*/
|
||||
public class DubboInput extends RPCInput {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DubboInput.class);
|
||||
|
||||
@Override
|
||||
protected Mono<RPCResponse> getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) {
|
||||
DubboInputConfig config = (DubboInputConfig) aConfig;
|
||||
|
||||
int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout();
|
||||
Map<String, String> attachments = (Map<String, String>) request.get("attachments");
|
||||
ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext();
|
||||
String body = (String)request.get("body");
|
||||
|
||||
ApacheDubboGenericProxy proxy = applicationContext.getBean(ApacheDubboGenericProxy.class);
|
||||
DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration();
|
||||
declaration.setServiceName(config.getServiceName());
|
||||
declaration.setMethod(config.getMethod());
|
||||
declaration.setParameterTypes(config.getParameterTypes());
|
||||
declaration.setTimeout(timeout);
|
||||
|
||||
HashMap<String, String> contextAttachment = new HashMap<String, String>(attachments);
|
||||
contextAttachment.put(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
|
||||
return proxy.send(body, declaration, contextAttachment).flatMap(cr->{
|
||||
DubboRPCResponse response = new DubboRPCResponse();
|
||||
String responseStr = JSON.toJSONString(cr);
|
||||
response.setBodyMono(Mono.just(responseStr));
|
||||
return Mono.just(response);
|
||||
});
|
||||
}
|
||||
|
||||
protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) {
|
||||
|
||||
}
|
||||
|
||||
protected void doOnResponseSuccess(RPCResponse cr, long elapsedMillis) {
|
||||
|
||||
}
|
||||
protected Mono<String> bodyToMono(RPCResponse cr){
|
||||
return cr.getBodyMono();
|
||||
}
|
||||
|
||||
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
|
||||
|
||||
}
|
||||
|
||||
protected void doOnBodySuccess(String resp, long elapsedMillis) {
|
||||
|
||||
}
|
||||
|
||||
protected void doResponseMapping(InputConfig aConfig, InputContext inputContext, String responseBody) {
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
69
src/main/java/we/fizz/input/DubboInputConfig.java
Normal file
69
src/main/java/we/fizz/input/DubboInputConfig.java
Normal file
@@ -0,0 +1,69 @@
|
||||
package we.fizz.input;
|
||||
|
||||
import org.springframework.util.StringUtils;
|
||||
import we.fizz.exception.FizzException;
|
||||
import we.fizz.exception.FizzRuntimeException;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class DubboInputConfig extends InputConfig {
|
||||
private String serviceName;
|
||||
private String method;
|
||||
|
||||
public String getParameterTypes() {
|
||||
return parameterTypes;
|
||||
}
|
||||
|
||||
public void setParameterTypes(String parameterTypes) {
|
||||
this.parameterTypes = parameterTypes;
|
||||
}
|
||||
|
||||
private String parameterTypes;
|
||||
|
||||
public void setServiceName(String serviceName) {
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
public void setMethod(String method) {
|
||||
this.method = method;
|
||||
}
|
||||
|
||||
public String getServiceName() {
|
||||
return serviceName;
|
||||
}
|
||||
|
||||
public String getMethod() {
|
||||
return method;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
public DubboInputConfig(Map configMap) {
|
||||
super(configMap);
|
||||
}
|
||||
public void parse(){
|
||||
String serviceName = (String) configMap.get("serviceName");
|
||||
if(StringUtils.isEmpty(serviceName)) {
|
||||
throw new FizzRuntimeException("service name can not be blank");
|
||||
}
|
||||
setServiceName(serviceName);
|
||||
String method = (String) configMap.get("method");
|
||||
if (StringUtils.isEmpty(method)) {
|
||||
throw new FizzRuntimeException("method can not be blank");
|
||||
}
|
||||
setMethod(method);
|
||||
}
|
||||
|
||||
private int timeout;
|
||||
public int getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
4
src/main/java/we/fizz/input/DubboRPCResponse.java
Normal file
4
src/main/java/we/fizz/input/DubboRPCResponse.java
Normal file
@@ -0,0 +1,4 @@
|
||||
package we.fizz.input;
|
||||
|
||||
public class DubboRPCResponse extends RPCResponse{
|
||||
}
|
||||
@@ -25,6 +25,7 @@ import java.util.Map;
|
||||
* @author linwaiwai
|
||||
*
|
||||
*/
|
||||
|
||||
public class InputType {
|
||||
|
||||
private final String type;
|
||||
|
||||
14
src/main/java/we/fizz/input/RequestRPCResponse.java
Normal file
14
src/main/java/we/fizz/input/RequestRPCResponse.java
Normal file
@@ -0,0 +1,14 @@
|
||||
package we.fizz.input;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
public class RequestRPCResponse extends RPCResponse {
|
||||
private HttpStatus statusCode;
|
||||
public void setStatus(HttpStatus statusCode) {
|
||||
this.statusCode = statusCode;
|
||||
}
|
||||
|
||||
public HttpStatus getStatusCode() {
|
||||
return statusCode;
|
||||
}
|
||||
}
|
||||
@@ -58,11 +58,7 @@ public class RequestInput extends RPCInput implements IInput{
|
||||
static public InputType TYPE = new InputType("REQUEST");
|
||||
private InputType type;
|
||||
protected Map<String, Object> dataMapping;
|
||||
protected Map<String, Object> request = new HashMap<>();
|
||||
protected Map<String, Object> response = new HashMap<>();
|
||||
|
||||
private static final String FALLBACK_MODE_STOP = "stop";
|
||||
private static final String FALLBACK_MODE_CONTINUE = "continue";
|
||||
|
||||
private static final String CONTENT_TYPE_JSON = "application/json";
|
||||
private static final String CONTENT_TYPE_XML = "application/xml";
|
||||
|
||||
70
src/main/java/we/proxy/dubbo/ApacheDubboGenericProxy.java
Normal file
70
src/main/java/we/proxy/dubbo/ApacheDubboGenericProxy.java
Normal file
@@ -0,0 +1,70 @@
|
||||
package we.proxy.dubbo;
|
||||
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.dubbo.config.ApplicationConfig;
|
||||
import org.apache.dubbo.config.ReferenceConfig;
|
||||
import org.apache.dubbo.config.RegistryConfig;
|
||||
import org.apache.dubbo.rpc.RpcContext;
|
||||
import org.apache.dubbo.rpc.service.GenericException;
|
||||
import org.apache.dubbo.rpc.service.GenericService;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.fizz.exception.FizzException;
|
||||
import we.fizz.input.DubboInputConfig;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
@Service
|
||||
public class ApacheDubboGenericProxy {
|
||||
|
||||
@PostConstruct
|
||||
public void afterPropertiesSet() {
|
||||
|
||||
}
|
||||
|
||||
public ReferenceConfig<GenericService> createReferenceConfig(String serviceName){
|
||||
ApplicationConfig applicationConfig = new ApplicationConfig();
|
||||
applicationConfig.setName("fizz_proxy");
|
||||
RegistryConfig registryConfig = new RegistryConfig();
|
||||
registryConfig.setAddress("zookeeper://127.0.0.1:2181");
|
||||
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
|
||||
referenceConfig.setInterface(serviceName);
|
||||
applicationConfig.setRegistry(registryConfig);
|
||||
referenceConfig.setApplication(applicationConfig);
|
||||
referenceConfig.setGeneric(true);
|
||||
referenceConfig.setAsync(true);
|
||||
referenceConfig.setTimeout(7000);
|
||||
return referenceConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic invoke.
|
||||
*
|
||||
* @param body the json string body
|
||||
* @param interfaceDeclaration the interface declaration
|
||||
* @return the object
|
||||
* @throws FizzException the fizz exception
|
||||
*/
|
||||
public Mono<Object> send(final String body, final DubboInterfaceDeclaration interfaceDeclaration, HashMap<String, String> attachments ) {
|
||||
|
||||
RpcContext.getContext().setAttachments(attachments);
|
||||
ReferenceConfig<GenericService> reference = createReferenceConfig(interfaceDeclaration.getServiceName());
|
||||
reference.setTimeout(interfaceDeclaration.getTimeout());
|
||||
GenericService genericService = reference.get();
|
||||
Pair<String[], Object[]> pair;
|
||||
if (DubboUtils.isEmpty(body)) {
|
||||
pair = new ImmutablePair<String[], Object[]>(new String[]{}, new Object[]{});
|
||||
} else {
|
||||
pair = DubboUtils.parseDubboParam(body, interfaceDeclaration.getParameterTypes());
|
||||
|
||||
}
|
||||
CompletableFuture<Object> future = genericService.$invokeAsync(interfaceDeclaration.getMethod(), pair.getLeft(), pair.getRight());
|
||||
return Mono.fromFuture(future.thenApply(ret -> {
|
||||
return ret;
|
||||
})).onErrorMap(exception -> exception instanceof GenericException ? new FizzException(((GenericException) exception).getExceptionMessage()) : new FizzException(exception));
|
||||
}
|
||||
|
||||
}
|
||||
44
src/main/java/we/proxy/dubbo/DubboInterfaceDeclaration.java
Normal file
44
src/main/java/we/proxy/dubbo/DubboInterfaceDeclaration.java
Normal file
@@ -0,0 +1,44 @@
|
||||
package we.proxy.dubbo;
|
||||
|
||||
public class DubboInterfaceDeclaration {
|
||||
private String parameterTypes;
|
||||
private String method;
|
||||
private String serviceName;
|
||||
private int timeout;
|
||||
|
||||
public DubboInterfaceDeclaration() {
|
||||
}
|
||||
|
||||
|
||||
public String getParameterTypes() {
|
||||
return parameterTypes;
|
||||
}
|
||||
// call method name
|
||||
public String getMethod() {
|
||||
return method;
|
||||
}
|
||||
// service name
|
||||
public String getServiceName() {
|
||||
return serviceName;
|
||||
}
|
||||
|
||||
public void setServiceName(String serviceName) {
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
public void setMethod(String method) {
|
||||
this.method = method;
|
||||
}
|
||||
|
||||
public void setParameterTypes(String parameterTypes) {
|
||||
this.parameterTypes = parameterTypes;
|
||||
}
|
||||
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public Integer getTimeout() {
|
||||
return this.timeout;
|
||||
}
|
||||
}
|
||||
40
src/main/java/we/proxy/dubbo/DubboUtils.java
Normal file
40
src/main/java/we/proxy/dubbo/DubboUtils.java
Normal file
@@ -0,0 +1,40 @@
|
||||
package we.proxy.dubbo;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DubboUtils {
|
||||
public static boolean isEmpty(final String body) {
|
||||
return null == body || "".equals(body) || "{}".equals(body) || "null".equals(body);
|
||||
}
|
||||
/*
|
||||
* body json string
|
||||
*/
|
||||
public static Pair<String[], Object[]> parseDubboParam(String body, final String parameterTypes) {
|
||||
|
||||
Map<String, Object> paramMap = (Map<String,Object>) JSON.parse(body);
|
||||
String[] parameter = StringUtils.split(parameterTypes, ',');
|
||||
if (parameter.length == 1 && !isBaseType(parameter[0])) {
|
||||
return new ImmutablePair<>(parameter, new Object[]{paramMap});
|
||||
}
|
||||
List<Object> list = new LinkedList<>();
|
||||
for (String key : paramMap.keySet()) {
|
||||
Object obj = paramMap.get(key);
|
||||
if (obj != null) {
|
||||
list.add(obj);
|
||||
}
|
||||
}
|
||||
Object[] objects = list.toArray();
|
||||
return new ImmutablePair<>(parameter, objects);
|
||||
}
|
||||
|
||||
private static boolean isBaseType(String type) {
|
||||
return type.startsWith("java") || type.startsWith("[Ljava");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user