22
pom.xml
22
pom.xml
@@ -42,6 +42,8 @@
|
||||
<httpcore.version>4.4.14</httpcore.version>
|
||||
<log4j2.version>2.13.3</log4j2.version>
|
||||
<apache.dubbo.version>2.7.5</apache.dubbo.version>
|
||||
<grpc.version>1.16.1</grpc.version>
|
||||
<mockito.version>3.4.0</mockito.version>
|
||||
<curator.version>4.0.1</curator.version>
|
||||
<zookeeper.version>3.5.6</zookeeper.version>
|
||||
</properties>
|
||||
@@ -239,6 +241,26 @@
|
||||
<artifactId>dubbo</artifactId>
|
||||
<version>${apache.dubbo.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- grpc -->
|
||||
<dependency>
|
||||
<groupId>io.grpc</groupId>
|
||||
<artifactId>grpc-all</artifactId>
|
||||
<version>${grpc.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.grpc</groupId>
|
||||
<artifactId>grpc-services</artifactId>
|
||||
<version>${grpc.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.16</version>
|
||||
</dependency>
|
||||
<!-- grpc -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-client</artifactId>
|
||||
|
||||
@@ -36,7 +36,7 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import we.fizz.input.extension.grpc.GrpcInput;
|
||||
import we.fizz.input.extension.dubbo.DubboInput;
|
||||
import we.fizz.input.extension.mysql.MySQLInput;
|
||||
import we.fizz.input.extension.request.RequestInput;
|
||||
@@ -125,6 +125,7 @@ public class ConfigLoader {
|
||||
|
||||
InputFactory.registerInput(RequestInput.TYPE, RequestInput.class);
|
||||
InputFactory.registerInput(MySQLInput.TYPE, MySQLInput.class);
|
||||
InputFactory.registerInput(GrpcInput.TYPE, GrpcInput.class);
|
||||
InputFactory.registerInput(DubboInput.TYPE, DubboInput.class);
|
||||
Pipeline pipeline = new Pipeline();
|
||||
pipeline.setApplicationContext(appContext);
|
||||
|
||||
@@ -75,4 +75,8 @@ public class InputConfig {
|
||||
public void parse(){
|
||||
|
||||
}
|
||||
|
||||
public Map<String, Object> getCondition() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
15
src/main/java/we/fizz/input/extension/grpc/GRPCResponse.java
Normal file
15
src/main/java/we/fizz/input/extension/grpc/GRPCResponse.java
Normal file
@@ -0,0 +1,15 @@
|
||||
package we.fizz.input.extension.grpc;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import we.fizz.input.RPCResponse;
|
||||
|
||||
public class GRPCResponse extends RPCResponse {
|
||||
private HttpStatus statusCode;
|
||||
public void setStatus(HttpStatus statusCode) {
|
||||
this.statusCode = statusCode;
|
||||
}
|
||||
|
||||
public HttpStatus getStatusCode() {
|
||||
return statusCode;
|
||||
}
|
||||
}
|
||||
54
src/main/java/we/fizz/input/extension/grpc/GrpcInput.java
Normal file
54
src/main/java/we/fizz/input/extension/grpc/GrpcInput.java
Normal file
@@ -0,0 +1,54 @@
|
||||
package we.fizz.input.extension.grpc;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.constants.CommonConstants;
|
||||
import we.fizz.input.*;
|
||||
import we.proxy.grpc.GrpcGenericService;
|
||||
import we.proxy.grpc.GrpcInterfaceDeclaration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class GrpcInput extends RPCInput implements IInput {
|
||||
static public InputType TYPE = new InputType("GRPC");
|
||||
@Override
|
||||
protected Mono<RPCResponse> getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) {
|
||||
GrpcInputConfig config = (GrpcInputConfig) aConfig;
|
||||
|
||||
int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout();
|
||||
Map<String, String> attachments = (Map<String, String>) request.get("attachments");
|
||||
ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext();
|
||||
String body = (String)request.get("body");
|
||||
String url = (String)request.get("url");
|
||||
|
||||
GrpcGenericService proxy = applicationContext.getBean(GrpcGenericService.class);
|
||||
GrpcInterfaceDeclaration declaration = new GrpcInterfaceDeclaration();
|
||||
declaration.setEndpoint(url);
|
||||
declaration.setServiceName(config.getServiceName());
|
||||
declaration.setMethod(config.getMethod());
|
||||
declaration.setTimeout(timeout);
|
||||
HashMap<String, Object> contextAttachment = null;
|
||||
if (attachments == null){
|
||||
contextAttachment = new HashMap<String, Object>();
|
||||
} else {
|
||||
contextAttachment = new HashMap<String, Object>(attachments);
|
||||
}
|
||||
if (inputContext.getStepContext() != null && inputContext.getStepContext().getTraceId() != null){
|
||||
contextAttachment.put(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
|
||||
}
|
||||
|
||||
Mono<Object> proxyResponse = proxy.send(body, declaration, contextAttachment);
|
||||
return proxyResponse.flatMap(cr->{
|
||||
GRPCResponse response = new GRPCResponse();
|
||||
String responseStr = JSON.toJSONString(cr);
|
||||
response.setBodyMono(Mono.just(responseStr));
|
||||
return Mono.just(response);
|
||||
});
|
||||
}
|
||||
|
||||
public static Class inputConfigClass (){
|
||||
return GrpcInputConfig.class;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package we.fizz.input.extension.grpc;
|
||||
|
||||
import we.fizz.input.InputConfig;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class GrpcInputConfig extends InputConfig {
|
||||
private int timeout;
|
||||
private String serviceName;
|
||||
private String method;
|
||||
public GrpcInputConfig(Map configMap) {
|
||||
super(configMap);
|
||||
}
|
||||
|
||||
public int getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public String getServiceName() {
|
||||
return serviceName;
|
||||
}
|
||||
|
||||
public void setServiceName(String serviceName) {
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
public String getMethod() {
|
||||
return method;
|
||||
}
|
||||
|
||||
public void setMethod(String method) {
|
||||
this.method = method;
|
||||
}
|
||||
}
|
||||
61
src/main/java/we/proxy/grpc/GrpcGenericService.java
Normal file
61
src/main/java/we/proxy/grpc/GrpcGenericService.java
Normal file
@@ -0,0 +1,61 @@
|
||||
package we.proxy.grpc;
|
||||
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.ManagedChannel;
|
||||
import org.apache.dubbo.rpc.service.GenericException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.fizz.exception.FizzException;
|
||||
import static io.grpc.CallOptions.DEFAULT;
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
import we.proxy.grpc.client.GrpcProxyClient;
|
||||
import we.proxy.grpc.client.core.GrpcMethodDefinition;
|
||||
import we.proxy.grpc.client.utils.ChannelFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static we.proxy.grpc.client.utils.GrpcReflectionUtils.parseToMethodDefinition;
|
||||
|
||||
@Service
|
||||
public class GrpcGenericService {
|
||||
|
||||
@Autowired
|
||||
private GrpcProxyClient grpcProxyClient;
|
||||
/**
|
||||
* Generic invoke.
|
||||
*
|
||||
* @param payload the json string body
|
||||
* @param grpcInterfaceDeclaration the interface declaration
|
||||
* @return the mono object
|
||||
* @throws FizzException the fizz runtime exception
|
||||
*/
|
||||
public Mono<Object> send(final String payload, final GrpcInterfaceDeclaration grpcInterfaceDeclaration, HashMap<String, Object> attachments ) {
|
||||
GrpcMethodDefinition methodDefinition = parseToMethodDefinition(grpcInterfaceDeclaration.getServiceName());
|
||||
HostAndPort endPoint = HostAndPort.fromString(grpcInterfaceDeclaration.getEndpoint());
|
||||
if (endPoint == null) {
|
||||
throw new RuntimeException("can't find target endpoint");
|
||||
}
|
||||
Map<String, Object> metaHeaderMap = attachments;
|
||||
ManagedChannel channel = null;
|
||||
try {
|
||||
channel = ChannelFactory.create(endPoint, metaHeaderMap);
|
||||
CallOptions calloptions = DEFAULT;
|
||||
calloptions.withDeadlineAfter(grpcInterfaceDeclaration.getTimeout(), TimeUnit.MILLISECONDS);
|
||||
ListenableFuture<Void> future = grpcProxyClient.invokeMethodAsync(methodDefinition, channel, DEFAULT, singletonList(payload));
|
||||
return Mono.fromFuture(new ListenableFutureAdapter(future).getCompletableFuture().thenApply(ret -> {
|
||||
return ret;
|
||||
})).onErrorMap(
|
||||
exception -> exception instanceof GenericException ? new FizzException(((GenericException) exception).getExceptionMessage()) : new FizzException((Throwable) exception));
|
||||
} finally {
|
||||
if (channel != null) {
|
||||
channel.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
44
src/main/java/we/proxy/grpc/GrpcInterfaceDeclaration.java
Normal file
44
src/main/java/we/proxy/grpc/GrpcInterfaceDeclaration.java
Normal file
@@ -0,0 +1,44 @@
|
||||
package we.proxy.grpc;
|
||||
|
||||
public class GrpcInterfaceDeclaration {
|
||||
private String method;
|
||||
|
||||
public String getEndpoint() {
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
public void setEndpoint(String endpoint) {
|
||||
this.endpoint = endpoint;
|
||||
}
|
||||
|
||||
private String endpoint;
|
||||
private String serviceName;
|
||||
private int timeout;
|
||||
|
||||
|
||||
public String getServiceName() {
|
||||
return serviceName;
|
||||
}
|
||||
|
||||
public void setServiceName(String serviceName) {
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
public void setMethod(String method) {
|
||||
this.method = method;
|
||||
}
|
||||
|
||||
public String getMethod() {
|
||||
return method;
|
||||
}
|
||||
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public int getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
47
src/main/java/we/proxy/grpc/ListenableFutureAdapter.java
Normal file
47
src/main/java/we/proxy/grpc/ListenableFutureAdapter.java
Normal file
@@ -0,0 +1,47 @@
|
||||
package we.proxy.grpc;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class ListenableFutureAdapter<T> {
|
||||
|
||||
private final ListenableFuture<T> listenableFuture;
|
||||
private final CompletableFuture<T> completableFuture;
|
||||
|
||||
public ListenableFutureAdapter(ListenableFuture<T> listenableFuture) {
|
||||
this.listenableFuture = listenableFuture;
|
||||
this.completableFuture = new CompletableFuture<T>() {
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
boolean cancelled = listenableFuture.cancel(mayInterruptIfRunning);
|
||||
super.cancel(cancelled);
|
||||
return cancelled;
|
||||
}
|
||||
};
|
||||
|
||||
Futures.addCallback(this.listenableFuture, new FutureCallback<T>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
completableFuture.complete(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable ex) {
|
||||
completableFuture.completeExceptionally(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public CompletableFuture<T> getCompletableFuture() {
|
||||
return completableFuture;
|
||||
}
|
||||
|
||||
public static final <T> CompletableFuture<T> toCompletable(ListenableFuture<T> listenableFuture) {
|
||||
ListenableFutureAdapter<T> listenableFutureAdapter = new ListenableFutureAdapter<>(listenableFuture);
|
||||
return listenableFutureAdapter.getCompletableFuture();
|
||||
}
|
||||
|
||||
}
|
||||
48
src/main/java/we/proxy/grpc/client/CallParams.java
Normal file
48
src/main/java/we/proxy/grpc/client/CallParams.java
Normal file
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2018 liuzhengyang
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
package we.proxy.grpc.client;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.DynamicMessage;
|
||||
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @author zhangjikai
|
||||
*/
|
||||
@Builder
|
||||
@Getter
|
||||
public class CallParams {
|
||||
private MethodDescriptor methodDescriptor;
|
||||
private Channel channel;
|
||||
private CallOptions callOptions;
|
||||
private List<DynamicMessage> requests;
|
||||
private StreamObserver<DynamicMessage> responseObserver;
|
||||
}
|
||||
57
src/main/java/we/proxy/grpc/client/CallResults.java
Normal file
57
src/main/java/we/proxy/grpc/client/CallResults.java
Normal file
@@ -0,0 +1,57 @@
|
||||
/*
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2018 liuzhengyang
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
package we.proxy.grpc.client;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
|
||||
/**
|
||||
* @author zhangjikai
|
||||
*/
|
||||
public class CallResults {
|
||||
private List<String> results;
|
||||
|
||||
public CallResults() {
|
||||
this.results = new ArrayList<>();
|
||||
}
|
||||
|
||||
public void add(String jsonText) {
|
||||
results.add(jsonText);
|
||||
}
|
||||
|
||||
public List<String> asList() {
|
||||
return results;
|
||||
}
|
||||
|
||||
public Object asJSON() {
|
||||
if (results.size() == 1) {
|
||||
return JSON.parseObject(results.get(0));
|
||||
}
|
||||
return results.stream().map(JSON::parseObject).collect(toList());
|
||||
}
|
||||
}
|
||||
117
src/main/java/we/proxy/grpc/client/GrpcClient.java
Normal file
117
src/main/java/we/proxy/grpc/client/GrpcClient.java
Normal file
@@ -0,0 +1,117 @@
|
||||
/*
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2018 liuzhengyang
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
package we.proxy.grpc.client;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
|
||||
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
|
||||
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
|
||||
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
|
||||
//import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
|
||||
import static org.apache.commons.lang3.ObjectUtils.isNotEmpty;
|
||||
import static we.proxy.grpc.client.utils.GrpcReflectionUtils.fetchFullMethodName;
|
||||
import static we.proxy.grpc.client.utils.GrpcReflectionUtils.fetchMethodType;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.DynamicMessage;
|
||||
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import we.proxy.grpc.client.core.CompositeStreamObserver;
|
||||
import we.proxy.grpc.client.core.DoneObserver;
|
||||
import we.proxy.grpc.client.core.DynamicMessageMarshaller;
|
||||
|
||||
/**
|
||||
* @author zhangjikai
|
||||
*/
|
||||
public class GrpcClient {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(GrpcClient.class);
|
||||
|
||||
@Nullable
|
||||
public ListenableFuture<Void> call(CallParams callParams) {
|
||||
checkParams(callParams);
|
||||
MethodType methodType = fetchMethodType(callParams.getMethodDescriptor());
|
||||
List<DynamicMessage> requests = callParams.getRequests();
|
||||
StreamObserver<DynamicMessage> responseObserver = callParams.getResponseObserver();
|
||||
DoneObserver<DynamicMessage> doneObserver = new DoneObserver<>();
|
||||
StreamObserver<DynamicMessage> compositeObserver = CompositeStreamObserver.of(responseObserver, doneObserver);
|
||||
StreamObserver<DynamicMessage> requestObserver;
|
||||
switch (methodType) {
|
||||
case UNARY:
|
||||
asyncUnaryCall(createCall(callParams), requests.get(0), compositeObserver);
|
||||
return doneObserver.getCompletionFuture();
|
||||
case SERVER_STREAMING:
|
||||
asyncServerStreamingCall(createCall(callParams), requests.get(0), compositeObserver);
|
||||
return doneObserver.getCompletionFuture();
|
||||
case CLIENT_STREAMING:
|
||||
requestObserver = asyncClientStreamingCall(createCall(callParams), compositeObserver);
|
||||
requests.forEach(responseObserver::onNext);
|
||||
requestObserver.onCompleted();
|
||||
return doneObserver.getCompletionFuture();
|
||||
case BIDI_STREAMING:
|
||||
requestObserver = asyncBidiStreamingCall(createCall(callParams), compositeObserver);
|
||||
requests.forEach(responseObserver::onNext);
|
||||
requestObserver.onCompleted();
|
||||
return doneObserver.getCompletionFuture();
|
||||
default:
|
||||
logger.info("Unknown methodType:{}", methodType);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void checkParams(CallParams callParams) {
|
||||
checkNotNull(callParams);
|
||||
checkNotNull(callParams.getMethodDescriptor());
|
||||
checkNotNull(callParams.getChannel());
|
||||
checkNotNull(callParams.getCallOptions());
|
||||
checkArgument(isNotEmpty(callParams.getRequests()));
|
||||
checkNotNull(callParams.getResponseObserver());
|
||||
}
|
||||
|
||||
private ClientCall<DynamicMessage, DynamicMessage> createCall(CallParams callParams) {
|
||||
return callParams.getChannel().newCall(createGrpcMethodDescriptor(callParams.getMethodDescriptor()),
|
||||
callParams.getCallOptions());
|
||||
}
|
||||
|
||||
private io.grpc.MethodDescriptor<DynamicMessage, DynamicMessage> createGrpcMethodDescriptor(MethodDescriptor descriptor) {
|
||||
return io.grpc.MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
|
||||
.setType(fetchMethodType(descriptor))
|
||||
.setFullMethodName(fetchFullMethodName(descriptor))
|
||||
.setRequestMarshaller(new DynamicMessageMarshaller(descriptor.getInputType()))
|
||||
.setResponseMarshaller(new DynamicMessageMarshaller(descriptor.getOutputType()))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
89
src/main/java/we/proxy/grpc/client/GrpcProxyClient.java
Normal file
89
src/main/java/we/proxy/grpc/client/GrpcProxyClient.java
Normal file
@@ -0,0 +1,89 @@
|
||||
/*
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2018 liuzhengyang
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
package we.proxy.grpc.client;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.DynamicMessage;
|
||||
import com.google.protobuf.util.JsonFormat.TypeRegistry;
|
||||
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.springframework.stereotype.Service;
|
||||
import we.proxy.grpc.client.core.GrpcMethodDefinition;
|
||||
import we.proxy.grpc.client.core.ServiceResolver;
|
||||
import we.proxy.grpc.client.utils.GrpcReflectionUtils;
|
||||
import we.proxy.grpc.client.utils.MessageWriter;
|
||||
@Service
|
||||
/**
|
||||
* @author zhangjikai
|
||||
* Created on 2018-12-01
|
||||
*/
|
||||
public class GrpcProxyClient {
|
||||
private GrpcClient grpcClient = new GrpcClient();
|
||||
public CallResults invokeMethod(GrpcMethodDefinition definition, Channel channel, CallOptions callOptions,
|
||||
List<String> requestJsonTexts) {
|
||||
|
||||
CallResults results = new CallResults();
|
||||
|
||||
try {
|
||||
this.invokeMethodAsync( definition, channel, callOptions, requestJsonTexts).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException("Caught exception while waiting for rpc", e);
|
||||
}
|
||||
return results;
|
||||
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> invokeMethodAsync(GrpcMethodDefinition definition, Channel channel, CallOptions callOptions,
|
||||
List<String> requestJsonTexts) {
|
||||
FileDescriptorSet fileDescriptorSet = GrpcReflectionUtils.resolveService(channel, definition.getFullServiceName());
|
||||
if (fileDescriptorSet == null) {
|
||||
return null;
|
||||
}
|
||||
ServiceResolver serviceResolver = ServiceResolver.fromFileDescriptorSet(fileDescriptorSet);
|
||||
MethodDescriptor methodDescriptor = serviceResolver.resolveServiceMethod(definition);
|
||||
TypeRegistry registry = TypeRegistry.newBuilder().add(serviceResolver.listMessageTypes()).build();
|
||||
List<DynamicMessage> requestMessages = GrpcReflectionUtils.parseToMessages(registry, methodDescriptor.getInputType(),
|
||||
requestJsonTexts);
|
||||
CallResults results = new CallResults();
|
||||
StreamObserver<DynamicMessage> streamObserver = MessageWriter.newInstance(registry, results);
|
||||
CallParams callParams = CallParams.builder()
|
||||
.methodDescriptor(methodDescriptor)
|
||||
.channel(channel)
|
||||
.callOptions(callOptions)
|
||||
.requests(requestMessages)
|
||||
.responseObserver(streamObserver)
|
||||
.build();
|
||||
|
||||
return grpcClient.call(callParams);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
/*
|
||||
Copyright (c) 2016, gRPC Ecosystem
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
* Neither the name of polyglot nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
package we.proxy.grpc.client.core;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import io.grpc.stub.StreamObserver;
|
||||
|
||||
/**
|
||||
* A {@link StreamObserver} which groups multiple observers and executes them all.
|
||||
*/
|
||||
public class CompositeStreamObserver<T> implements StreamObserver<T> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(CompositeStreamObserver.class);
|
||||
private final ImmutableList<StreamObserver<T>> observers;
|
||||
|
||||
@SafeVarargs
|
||||
public static <T> CompositeStreamObserver<T> of(StreamObserver<T>... observers) {
|
||||
return new CompositeStreamObserver<>(ImmutableList.copyOf(observers));
|
||||
}
|
||||
|
||||
private CompositeStreamObserver(ImmutableList<StreamObserver<T>> observers) {
|
||||
this.observers = observers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
for (StreamObserver<T> observer : observers) {
|
||||
try {
|
||||
observer.onCompleted();
|
||||
} catch (Throwable t) {
|
||||
logger.error("Exception in composite onComplete, moving on", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
for (StreamObserver<T> observer : observers) {
|
||||
try {
|
||||
observer.onError(t);
|
||||
} catch (Throwable s) {
|
||||
logger.error("Exception in composite onError, moving on", s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T value) {
|
||||
for (StreamObserver<T> observer : observers) {
|
||||
try {
|
||||
observer.onNext(value);
|
||||
} catch (Throwable t) {
|
||||
logger.error("Exception in composite onNext, moving on", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
69
src/main/java/we/proxy/grpc/client/core/DoneObserver.java
Normal file
69
src/main/java/we/proxy/grpc/client/core/DoneObserver.java
Normal file
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
Copyright (c) 2016, gRPC Ecosystem
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
* Neither the name of polyglot nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
package we.proxy.grpc.client.core;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
import io.grpc.stub.StreamObserver;
|
||||
|
||||
/**
|
||||
* A {@link StreamObserver} holding a future which completes when the rpc terminates.
|
||||
*/
|
||||
public class DoneObserver<T> implements StreamObserver<T> {
|
||||
private final SettableFuture<Void> doneFuture;
|
||||
|
||||
public DoneObserver() {
|
||||
this.doneFuture = SettableFuture.create();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onCompleted() {
|
||||
doneFuture.set(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onError(Throwable t) {
|
||||
doneFuture.setException(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T next) {
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a future which completes when the rpc finishes. The returned future fails if the rpc
|
||||
* fails.
|
||||
*/
|
||||
public ListenableFuture<Void> getCompletionFuture() {
|
||||
return doneFuture;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
Copyright (c) 2016, gRPC Ecosystem
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
* Neither the name of polyglot nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
package we.proxy.grpc.client.core;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import com.google.protobuf.Descriptors.Descriptor;
|
||||
import com.google.protobuf.DynamicMessage;
|
||||
import com.google.protobuf.ExtensionRegistryLite;
|
||||
|
||||
import io.grpc.MethodDescriptor.Marshaller;
|
||||
|
||||
/**
|
||||
* A {@link Marshaller} for dynamic messages.
|
||||
*/
|
||||
public class DynamicMessageMarshaller implements Marshaller<DynamicMessage> {
|
||||
private final Descriptor messageDescriptor;
|
||||
|
||||
public DynamicMessageMarshaller(Descriptor messageDescriptor) {
|
||||
this.messageDescriptor = messageDescriptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DynamicMessage parse(InputStream inputStream) {
|
||||
try {
|
||||
return DynamicMessage.newBuilder(messageDescriptor)
|
||||
.mergeFrom(inputStream, ExtensionRegistryLite.getEmptyRegistry())
|
||||
.build();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Unable to merge from the supplied input stream", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream stream(DynamicMessage abstractMessage) {
|
||||
return abstractMessage.toByteString().newInput();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
/*
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2018 liuzhengyang
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
package we.proxy.grpc.client.core;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @author zhangjikai
|
||||
* Created on 2018-12-16
|
||||
*/
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class GrpcMethodDefinition {
|
||||
private String packageName;
|
||||
private String serviceName;
|
||||
private String methodName;
|
||||
|
||||
public String getFullServiceName() {
|
||||
if (isNotBlank(packageName)) {
|
||||
return packageName + "." + serviceName;
|
||||
}
|
||||
return serviceName;
|
||||
}
|
||||
|
||||
public String getFullMethodName() {
|
||||
if (isNotBlank(packageName)) {
|
||||
return packageName + "." + serviceName + "/" + methodName;
|
||||
}
|
||||
return serviceName + "/" + methodName;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,256 @@
|
||||
/*
|
||||
Copyright (c) 2016, gRPC Ecosystem
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
* Neither the name of polyglot nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
package we.proxy.grpc.client.core;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
|
||||
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.reflection.v1alpha.ListServiceResponse;
|
||||
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
|
||||
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
|
||||
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
|
||||
import io.grpc.reflection.v1alpha.ServerReflectionResponse.MessageResponseCase;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
|
||||
public class ServerReflectionClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ServerReflectionClient.class);
|
||||
private static final long LIST_RPC_DEADLINE_MS = 10_000;
|
||||
private static final long LOOKUP_RPC_DEADLINE_MS = 10_000;
|
||||
private static final ServerReflectionRequest LIST_SERVICES_REQUEST =
|
||||
ServerReflectionRequest.newBuilder()
|
||||
.setListServices("") // Not sure what this is for, appears to be ignored.
|
||||
.build();
|
||||
|
||||
private final Channel channel;
|
||||
|
||||
/**
|
||||
* Returns a new reflection client using the supplied channel.
|
||||
*/
|
||||
public static ServerReflectionClient create(Channel channel) {
|
||||
return new ServerReflectionClient(channel);
|
||||
}
|
||||
|
||||
private ServerReflectionClient(Channel channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Asks the remote server to list its services and completes when the server responds.
|
||||
*/
|
||||
public ListenableFuture<ImmutableList<String>> listServices() {
|
||||
ListServicesHandler rpcHandler = new ListServicesHandler();
|
||||
StreamObserver<ServerReflectionRequest> requestStream = ServerReflectionGrpc.newStub(channel)
|
||||
.withDeadlineAfter(LIST_RPC_DEADLINE_MS, TimeUnit.MILLISECONDS)
|
||||
.serverReflectionInfo(rpcHandler);
|
||||
return rpcHandler.start(requestStream);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link FileDescriptorSet} containing all the transitive dependencies of the supplied
|
||||
* service, as provided by the remote server.
|
||||
*/
|
||||
public ListenableFuture<FileDescriptorSet> lookupService(String serviceName) {
|
||||
LookupServiceHandler rpcHandler = new LookupServiceHandler(serviceName);
|
||||
StreamObserver<ServerReflectionRequest> requestStream = ServerReflectionGrpc.newStub(channel)
|
||||
.withDeadlineAfter(LOOKUP_RPC_DEADLINE_MS, TimeUnit.MILLISECONDS)
|
||||
.serverReflectionInfo(rpcHandler);
|
||||
return rpcHandler.start(requestStream);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the rpc life cycle of a single list operation.
|
||||
*/
|
||||
private static class ListServicesHandler implements StreamObserver<ServerReflectionResponse> {
|
||||
private final SettableFuture<ImmutableList<String>> resultFuture;
|
||||
private StreamObserver<ServerReflectionRequest> requestStream;
|
||||
|
||||
private ListServicesHandler() {
|
||||
resultFuture = SettableFuture.create();
|
||||
}
|
||||
|
||||
ListenableFuture<ImmutableList<String>> start(
|
||||
StreamObserver<ServerReflectionRequest> requestStream) {
|
||||
this.requestStream = requestStream;
|
||||
requestStream.onNext(LIST_SERVICES_REQUEST);
|
||||
return resultFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(ServerReflectionResponse serverReflectionResponse) {
|
||||
MessageResponseCase responseCase = serverReflectionResponse.getMessageResponseCase();
|
||||
switch (responseCase) {
|
||||
case LIST_SERVICES_RESPONSE:
|
||||
handleListServiceResponse(serverReflectionResponse.getListServicesResponse());
|
||||
break;
|
||||
default:
|
||||
logger.warn("Got unknown reflection response type: " + responseCase);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
resultFuture.setException(new RuntimeException("Error in server reflection rpc while listing services", t));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
if (!resultFuture.isDone()) {
|
||||
logger.error("Unexpected completion of server reflection rpc while listing services");
|
||||
resultFuture.setException(new RuntimeException("Unexpected end of rpc"));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleListServiceResponse(ListServiceResponse response) {
|
||||
ImmutableList.Builder<String> servicesBuilder = ImmutableList.builder();
|
||||
response.getServiceList().forEach(service -> servicesBuilder.add(service.getName()));
|
||||
resultFuture.set(servicesBuilder.build());
|
||||
requestStream.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the rpc life cycle of a single lookup operation.
|
||||
*/
|
||||
private static class LookupServiceHandler implements StreamObserver<ServerReflectionResponse> {
|
||||
private final SettableFuture<FileDescriptorSet> resultFuture;
|
||||
private final String serviceName;
|
||||
private final HashSet<String> requestedDescriptors;
|
||||
private final HashMap<String, FileDescriptorProto> resolvedDescriptors;
|
||||
private StreamObserver<ServerReflectionRequest> requestStream;
|
||||
|
||||
// Used to notice when we've received all the files we've asked for and we can end the rpc.
|
||||
private int outstandingRequests;
|
||||
|
||||
private LookupServiceHandler(String serviceName) {
|
||||
this.serviceName = serviceName;
|
||||
this.resultFuture = SettableFuture.create();
|
||||
this.resolvedDescriptors = new HashMap<>();
|
||||
this.requestedDescriptors = new HashSet<>();
|
||||
this.outstandingRequests = 0;
|
||||
}
|
||||
|
||||
ListenableFuture<FileDescriptorSet> start(
|
||||
StreamObserver<ServerReflectionRequest> requestStream) {
|
||||
this.requestStream = requestStream;
|
||||
requestStream.onNext(requestForSymbol(serviceName));
|
||||
++outstandingRequests;
|
||||
return resultFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(ServerReflectionResponse response) {
|
||||
MessageResponseCase responseCase = response.getMessageResponseCase();
|
||||
switch (responseCase) {
|
||||
case FILE_DESCRIPTOR_RESPONSE:
|
||||
ImmutableSet<FileDescriptorProto> descriptors =
|
||||
parseDescriptors(response.getFileDescriptorResponse().getFileDescriptorProtoList());
|
||||
descriptors.forEach(d -> resolvedDescriptors.put(d.getName(), d));
|
||||
descriptors.forEach(this::processDependencies);
|
||||
break;
|
||||
default:
|
||||
logger.warn("Got unknown reflection response type: " + responseCase);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
resultFuture.setException(new RuntimeException("Reflection lookup rpc failed for: " + serviceName, t));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
if (!resultFuture.isDone()) {
|
||||
logger.error("Unexpected completion of the server reflection rpc");
|
||||
resultFuture.setException(new RuntimeException("Unexpected end of rpc"));
|
||||
}
|
||||
}
|
||||
|
||||
private ImmutableSet<FileDescriptorProto> parseDescriptors(List<ByteString> descriptorBytes) {
|
||||
ImmutableSet.Builder<FileDescriptorProto> resultBuilder = ImmutableSet.builder();
|
||||
for (ByteString fileDescriptorBytes : descriptorBytes) {
|
||||
try {
|
||||
resultBuilder.add(FileDescriptorProto.parseFrom(fileDescriptorBytes));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
logger.warn("Failed to parse bytes as file descriptor proto");
|
||||
}
|
||||
}
|
||||
return resultBuilder.build();
|
||||
}
|
||||
|
||||
private void processDependencies(FileDescriptorProto fileDescriptor) {
|
||||
logger.debug("Processing deps of descriptor: " + fileDescriptor.getName());
|
||||
fileDescriptor.getDependencyList().forEach(dep -> {
|
||||
if (!resolvedDescriptors.containsKey(dep) && !requestedDescriptors.contains(dep)) {
|
||||
requestedDescriptors.add(dep);
|
||||
++outstandingRequests;
|
||||
requestStream.onNext(requestForDescriptor(dep));
|
||||
}
|
||||
});
|
||||
|
||||
--outstandingRequests;
|
||||
if (outstandingRequests == 0) {
|
||||
logger.debug("Retrieved service definition for [{}] by reflection", serviceName);
|
||||
resultFuture.set(FileDescriptorSet.newBuilder()
|
||||
.addAllFile(resolvedDescriptors.values())
|
||||
.build());
|
||||
requestStream.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
private static ServerReflectionRequest requestForDescriptor(String name) {
|
||||
return ServerReflectionRequest.newBuilder()
|
||||
.setFileByFilename(name)
|
||||
.build();
|
||||
}
|
||||
|
||||
private static ServerReflectionRequest requestForSymbol(String symbol) {
|
||||
return ServerReflectionRequest.newBuilder()
|
||||
.setFileContainingSymbol(symbol)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
179
src/main/java/we/proxy/grpc/client/core/ServiceResolver.java
Normal file
179
src/main/java/we/proxy/grpc/client/core/ServiceResolver.java
Normal file
@@ -0,0 +1,179 @@
|
||||
/*
|
||||
Copyright (c) 2016, gRPC Ecosystem
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
* Neither the name of polyglot nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
package we.proxy.grpc.client.core;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
|
||||
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
|
||||
import com.google.protobuf.Descriptors.Descriptor;
|
||||
import com.google.protobuf.Descriptors.DescriptorValidationException;
|
||||
import com.google.protobuf.Descriptors.FileDescriptor;
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.Descriptors.ServiceDescriptor;
|
||||
|
||||
|
||||
/**
|
||||
* A locator used to read proto file descriptors and extract method definitions.
|
||||
*/
|
||||
public class ServiceResolver {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ServiceResolver.class);
|
||||
private final ImmutableList<FileDescriptor> fileDescriptors;
|
||||
|
||||
/**
|
||||
* Creates a resolver which searches the supplied {@link FileDescriptorSet}.
|
||||
*/
|
||||
public static ServiceResolver fromFileDescriptorSet(FileDescriptorSet descriptorSet) {
|
||||
ImmutableMap<String, FileDescriptorProto> descriptorProtoIndex =
|
||||
computeDescriptorProtoIndex(descriptorSet);
|
||||
Map<String, FileDescriptor> descriptorCache = new HashMap<>();
|
||||
|
||||
ImmutableList.Builder<FileDescriptor> result = ImmutableList.builder();
|
||||
for (FileDescriptorProto descriptorProto : descriptorSet.getFileList()) {
|
||||
try {
|
||||
result.add(descriptorFromProto(descriptorProto, descriptorProtoIndex, descriptorCache));
|
||||
} catch (DescriptorValidationException e) {
|
||||
logger.warn("Skipped descriptor " + descriptorProto.getName() + " due to error", e);
|
||||
}
|
||||
}
|
||||
return new ServiceResolver(result.build());
|
||||
}
|
||||
|
||||
private ServiceResolver(Iterable<FileDescriptor> fileDescriptors) {
|
||||
this.fileDescriptors = ImmutableList.copyOf(fileDescriptors);
|
||||
}
|
||||
|
||||
/**
|
||||
* Lists all of the services found in the file descriptors
|
||||
*/
|
||||
public Iterable<ServiceDescriptor> listServices() {
|
||||
ArrayList<ServiceDescriptor> serviceDescriptors = new ArrayList<ServiceDescriptor>();
|
||||
for (FileDescriptor fileDescriptor : fileDescriptors) {
|
||||
serviceDescriptors.addAll(fileDescriptor.getServices());
|
||||
}
|
||||
return serviceDescriptors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Lists all the known message types.
|
||||
*/
|
||||
public ImmutableSet<Descriptor> listMessageTypes() {
|
||||
ImmutableSet.Builder<Descriptor> resultBuilder = ImmutableSet.builder();
|
||||
fileDescriptors.forEach(d -> resultBuilder.addAll(d.getMessageTypes()));
|
||||
return resultBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the descriptor of a protobuf method with the supplied grpc method name. If the method
|
||||
* cannot be found, this throws {@link IllegalArgumentException}.
|
||||
*/
|
||||
public MethodDescriptor resolveServiceMethod(GrpcMethodDefinition definition) {
|
||||
|
||||
ServiceDescriptor service = findService(definition.getPackageName(), definition.getServiceName());
|
||||
MethodDescriptor method = service.findMethodByName(definition.getMethodName());
|
||||
if (method == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"Unable to find method " + definition.getMethodName()
|
||||
+ " in service " + definition.getServiceName());
|
||||
}
|
||||
return method;
|
||||
}
|
||||
|
||||
private ServiceDescriptor findService(String packageName, String serviceName) {
|
||||
// TODO(dino): Consider creating an index.
|
||||
for (FileDescriptor fileDescriptor : fileDescriptors) {
|
||||
if (!fileDescriptor.getPackage().equals(packageName)) {
|
||||
// Package does not match this file, ignore.
|
||||
continue;
|
||||
}
|
||||
|
||||
ServiceDescriptor serviceDescriptor = fileDescriptor.findServiceByName(serviceName);
|
||||
if (serviceDescriptor != null) {
|
||||
return serviceDescriptor;
|
||||
}
|
||||
}
|
||||
throw new IllegalArgumentException("Unable to find service with name: " + serviceName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map from descriptor proto name as found inside the descriptors to protos.
|
||||
*/
|
||||
private static ImmutableMap<String, FileDescriptorProto> computeDescriptorProtoIndex(
|
||||
FileDescriptorSet fileDescriptorSet) {
|
||||
ImmutableMap.Builder<String, FileDescriptorProto> resultBuilder = ImmutableMap.builder();
|
||||
for (FileDescriptorProto descriptorProto : fileDescriptorSet.getFileList()) {
|
||||
resultBuilder.put(descriptorProto.getName(), descriptorProto);
|
||||
}
|
||||
return resultBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively constructs file descriptors for all dependencies of the supplied proto and returns
|
||||
* a {@link FileDescriptor} for the supplied proto itself. For maximal efficiency, reuse the
|
||||
* descriptorCache argument across calls.
|
||||
*/
|
||||
private static FileDescriptor descriptorFromProto(
|
||||
FileDescriptorProto descriptorProto,
|
||||
ImmutableMap<String, FileDescriptorProto> descriptorProtoIndex,
|
||||
Map<String, FileDescriptor> descriptorCache) throws DescriptorValidationException {
|
||||
// First, check the cache.
|
||||
String descriptorName = descriptorProto.getName();
|
||||
if (descriptorCache.containsKey(descriptorName)) {
|
||||
return descriptorCache.get(descriptorName);
|
||||
}
|
||||
|
||||
// Then, fetch all the required dependencies recursively.
|
||||
ImmutableList.Builder<FileDescriptor> dependencies = ImmutableList.builder();
|
||||
for (String dependencyName : descriptorProto.getDependencyList()) {
|
||||
if (!descriptorProtoIndex.containsKey(dependencyName)) {
|
||||
throw new IllegalArgumentException("Could not find dependency: " + dependencyName);
|
||||
}
|
||||
FileDescriptorProto dependencyProto = descriptorProtoIndex.get(dependencyName);
|
||||
dependencies.add(descriptorFromProto(dependencyProto, descriptorProtoIndex, descriptorCache));
|
||||
}
|
||||
|
||||
// Finally, construct the actual descriptor.
|
||||
FileDescriptor[] empty = new FileDescriptor[0];
|
||||
return FileDescriptor.buildFrom(descriptorProto, dependencies.build().toArray(empty));
|
||||
}
|
||||
|
||||
public List<FileDescriptor> getFileDescriptors() {
|
||||
return fileDescriptors;
|
||||
}
|
||||
}
|
||||
57
src/main/java/we/proxy/grpc/client/utils/ChannelFactory.java
Normal file
57
src/main/java/we/proxy/grpc/client/utils/ChannelFactory.java
Normal file
@@ -0,0 +1,57 @@
|
||||
package we.proxy.grpc.client.utils;
|
||||
|
||||
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.net.HostAndPort;
|
||||
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Metadata.Key;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.netty.NegotiationType;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
|
||||
/**
|
||||
* Knows how to construct grpc channels.
|
||||
*/
|
||||
public class ChannelFactory {
|
||||
|
||||
public static ManagedChannel create(HostAndPort endpoint) {
|
||||
return create(endpoint, emptyMap());
|
||||
}
|
||||
|
||||
public static ManagedChannel create(HostAndPort endpoint, Map<String, Object> metaDataMap) {
|
||||
return NettyChannelBuilder.forAddress(endpoint.getHostText(), endpoint.getPort())
|
||||
.negotiationType(NegotiationType.PLAINTEXT)
|
||||
.intercept(metadataInterceptor(metaDataMap))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static ClientInterceptor metadataInterceptor(Map<String, Object> metaDataMap) {
|
||||
return new ClientInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, final Channel next) {
|
||||
|
||||
return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
|
||||
@Override
|
||||
protected void checkedStart(Listener<RespT> responseListener, Metadata headers) {
|
||||
metaDataMap.forEach((k, v) -> {
|
||||
Key<String> mKey = Key.of(k, ASCII_STRING_MARSHALLER);
|
||||
headers.put(mKey, String.valueOf(v));
|
||||
});
|
||||
delegate().start(responseListener, headers);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,133 @@
|
||||
package we.proxy.grpc.client.utils;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static io.grpc.MethodDescriptor.generateFullMethodName;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
//import static org.apache.commons.collections4.CollectionUtils.isEmpty;
|
||||
import static org.apache.commons.lang3.ObjectUtils.isEmpty;
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
|
||||
import com.google.protobuf.Descriptors.Descriptor;
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.DynamicMessage;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import com.google.protobuf.util.JsonFormat.Parser;
|
||||
import com.google.protobuf.util.JsonFormat.TypeRegistry;
|
||||
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.Status;
|
||||
import we.proxy.grpc.client.core.GrpcMethodDefinition;
|
||||
import we.proxy.grpc.client.core.ServerReflectionClient;
|
||||
|
||||
/**
|
||||
* @author zhangjikai
|
||||
*/
|
||||
public class GrpcReflectionUtils {
|
||||
private static final Logger logger = LoggerFactory.getLogger(GrpcReflectionUtils.class);
|
||||
|
||||
public static List<FileDescriptorSet> resolveServices(Channel channel) {
|
||||
ServerReflectionClient serverReflectionClient = ServerReflectionClient.create(channel);
|
||||
try {
|
||||
List<String> services = serverReflectionClient.listServices().get();
|
||||
if (isEmpty(services)) {
|
||||
logger.info("Can't find services by channel {}", channel);
|
||||
return emptyList();
|
||||
}
|
||||
return services.stream().map(serviceName -> {
|
||||
ListenableFuture<FileDescriptorSet> future = serverReflectionClient.lookupService(serviceName);
|
||||
try {
|
||||
return future.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
logger.error("Get {} fileDescriptor occurs error", serviceName, e);
|
||||
return null;
|
||||
}
|
||||
}).filter(Objects::nonNull).collect(toList());
|
||||
} catch (Throwable t) {
|
||||
logger.error("Exception resolve service", t);
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
}
|
||||
|
||||
public static FileDescriptorSet resolveService(Channel channel, String serviceName) {
|
||||
ServerReflectionClient reflectionClient = ServerReflectionClient.create(channel);
|
||||
try {
|
||||
List<String> serviceNames = reflectionClient.listServices().get();
|
||||
if (!serviceNames.contains(serviceName)) {
|
||||
throw Status.NOT_FOUND.withDescription(
|
||||
String.format("Remote server does not have service %s. Services: %s", serviceName, serviceNames))
|
||||
.asRuntimeException();
|
||||
}
|
||||
|
||||
return reflectionClient.lookupService(serviceName).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
logger.error("Resolve services get error", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static String fetchFullMethodName(MethodDescriptor methodDescriptor) {
|
||||
String serviceName = methodDescriptor.getService().getFullName();
|
||||
String methodName = methodDescriptor.getName();
|
||||
return generateFullMethodName(serviceName, methodName);
|
||||
}
|
||||
|
||||
public static MethodType fetchMethodType(MethodDescriptor methodDescriptor) {
|
||||
boolean clientStreaming = methodDescriptor.toProto().getClientStreaming();
|
||||
boolean serverStreaming = methodDescriptor.toProto().getServerStreaming();
|
||||
if (clientStreaming && serverStreaming) {
|
||||
return MethodType.BIDI_STREAMING;
|
||||
} else if (!clientStreaming && !serverStreaming) {
|
||||
return MethodType.UNARY;
|
||||
} else if (!clientStreaming) {
|
||||
return MethodType.SERVER_STREAMING;
|
||||
} else {
|
||||
return MethodType.SERVER_STREAMING;
|
||||
}
|
||||
}
|
||||
|
||||
public static List<DynamicMessage> parseToMessages(TypeRegistry registry, Descriptor descriptor,
|
||||
List<String> jsonTexts) {
|
||||
Parser parser = JsonFormat.parser().usingTypeRegistry(registry);
|
||||
List<DynamicMessage> messages = new ArrayList<>();
|
||||
try {
|
||||
for (String jsonText : jsonTexts) {
|
||||
DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(descriptor);
|
||||
parser.merge(jsonText, messageBuilder);
|
||||
messages.add(messageBuilder.build());
|
||||
}
|
||||
return messages;
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalArgumentException("Unable to parse json text", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static GrpcMethodDefinition parseToMethodDefinition(String rawMethodName) {
|
||||
checkArgument(isNotBlank(rawMethodName), "Raw method name can't be empty.");
|
||||
int methodSplitPosition = rawMethodName.lastIndexOf(".");
|
||||
checkArgument(methodSplitPosition != -1, "No package name and service name found.");
|
||||
String methodName = rawMethodName.substring(methodSplitPosition + 1);
|
||||
checkArgument(isNotBlank(methodName), "Method name can't be empty.");
|
||||
String fullServiceName = rawMethodName.substring(0, methodSplitPosition);
|
||||
int serviceSplitPosition = fullServiceName.lastIndexOf(".");
|
||||
String serviceName = fullServiceName.substring(serviceSplitPosition + 1);
|
||||
String packageName = "";
|
||||
if (serviceSplitPosition != -1) {
|
||||
packageName = fullServiceName.substring(0, serviceSplitPosition);
|
||||
}
|
||||
checkArgument(isNotBlank(serviceName), "Service name can't be empty.");
|
||||
return new GrpcMethodDefinition(packageName, serviceName, methodName);
|
||||
}
|
||||
}
|
||||
54
src/main/java/we/proxy/grpc/client/utils/MessageWriter.java
Normal file
54
src/main/java/we/proxy/grpc/client/utils/MessageWriter.java
Normal file
@@ -0,0 +1,54 @@
|
||||
package we.proxy.grpc.client.utils;
|
||||
|
||||
import static com.google.protobuf.util.JsonFormat.TypeRegistry;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import com.google.protobuf.util.JsonFormat.Printer;
|
||||
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import we.proxy.grpc.client.CallResults;
|
||||
|
||||
/**
|
||||
* @author zhangjikai
|
||||
*/
|
||||
public class MessageWriter<T extends Message> implements StreamObserver<T> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(MessageWriter.class);
|
||||
|
||||
private final Printer printer;
|
||||
private final CallResults results;
|
||||
|
||||
private MessageWriter(Printer printer, CallResults results) {
|
||||
this.printer = printer;
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
public static <T extends Message> MessageWriter<T> newInstance(TypeRegistry registry, CallResults results){
|
||||
return new MessageWriter<>(
|
||||
JsonFormat.printer().usingTypeRegistry(registry).includingDefaultValueFields(),
|
||||
results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T value) {
|
||||
try {
|
||||
results.add(printer.print(value));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
logger.error("Skipping invalid response message", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
logger.error("Messages write occur errors", t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
logger.info("Messages write complete");
|
||||
}
|
||||
}
|
||||
120
src/test/java/we/fizz/input/GrpcInputMockTests.java
Normal file
120
src/test/java/we/fizz/input/GrpcInputMockTests.java
Normal file
@@ -0,0 +1,120 @@
|
||||
package we.fizz.input;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
import we.fizz.group.FastTestGroup;
|
||||
import we.fizz.Step;
|
||||
import we.fizz.StepContext;
|
||||
import we.fizz.StepResponse;
|
||||
import we.fizz.input.extension.grpc.GrpcInput;
|
||||
import we.fizz.input.extension.grpc.GrpcInputConfig;
|
||||
import we.proxy.grpc.GrpcGenericService;
|
||||
import we.proxy.grpc.GrpcInterfaceDeclaration;
|
||||
import we.proxy.grpc.client.GrpcProxyClient;
|
||||
import we.proxy.grpc.client.utils.ChannelFactory;
|
||||
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
@Category(FastTestGroup.class)
|
||||
public class GrpcInputMockTests {
|
||||
private static final String URL ="localhost:8090";
|
||||
private static final String SERVICE_NAME = "com.fizzgate.test";
|
||||
private static final String METHOD_NAME = "method";
|
||||
private static final String[] LEFT = new String[]{};
|
||||
|
||||
private static final Object[] RIGHT = new Object[]{};
|
||||
|
||||
private GrpcGenericService proxy;
|
||||
// @Before
|
||||
// public void setup(){
|
||||
// ApacheDubboGenericProxyTests test = new ApacheDubboGenericProxyTests();
|
||||
// proxy = test.getMockApachDubbo();
|
||||
// }
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
mockStatic(ChannelFactory.class);
|
||||
GrpcInterfaceDeclaration declaration = new GrpcInterfaceDeclaration();
|
||||
declaration.setEndpoint(URL);
|
||||
declaration.setServiceName(SERVICE_NAME);
|
||||
declaration.setMethod(METHOD_NAME);
|
||||
declaration.setTimeout(3000);
|
||||
|
||||
GrpcProxyClient grpcProxyClient = mock(GrpcProxyClient.class);
|
||||
|
||||
ListenableFuture<?> future = new ListenableFuture<String>() {
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get() throws InterruptedException, ExecutionException {
|
||||
return "result";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(Runnable runnable, Executor executor) {
|
||||
|
||||
}
|
||||
};
|
||||
when(grpcProxyClient.invokeMethodAsync(any(), any(), any(), any())).thenReturn((ListenableFuture<Void>) future);
|
||||
|
||||
GrpcGenericService grpcGenericService = new GrpcGenericService();
|
||||
ReflectionTestUtils.setField(grpcGenericService, "grpcProxyClient", grpcProxyClient);
|
||||
|
||||
ConfigurableApplicationContext applicationContext = mock(ConfigurableApplicationContext.class);
|
||||
when(applicationContext.getBean(GrpcGenericService.class)).thenReturn(grpcGenericService);
|
||||
|
||||
Step step = mock(Step.class);
|
||||
when(step.getCurrentApplicationContext()).thenReturn(applicationContext);
|
||||
|
||||
StepResponse stepResponse = new StepResponse(step, null, new HashMap<String, Map<String, Object>>());
|
||||
GrpcInputConfig config = mock(GrpcInputConfig.class);
|
||||
when(config.getServiceName()).thenReturn(SERVICE_NAME);
|
||||
InputFactory.registerInput(GrpcInput.TYPE, GrpcInput.class);
|
||||
GrpcInput grpcInput = (GrpcInput)InputFactory.createInput(GrpcInput.TYPE.toString());
|
||||
HashMap <String, Object>request = new HashMap <String, Object>();
|
||||
request.put("url",URL);
|
||||
ReflectionTestUtils.setField(grpcInput, "request", request);
|
||||
grpcInput.setName("input1");
|
||||
grpcInput.setWeakStep(new SoftReference<>(step));
|
||||
grpcInput.setStepResponse(stepResponse);
|
||||
grpcInput.setConfig(config);
|
||||
StepContext stepContext = mock(StepContext.class);
|
||||
stepContext.put("step1", stepResponse);
|
||||
InputContext context = new InputContext(stepContext, null);
|
||||
grpcInput.beforeRun(context);
|
||||
|
||||
grpcInput.run();
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user