implement grpc.

This commit is contained in:
linwaiwai
2021-01-14 16:58:30 +08:00
parent 61683abfa7
commit 9fe7f38daa
23 changed files with 1664 additions and 2 deletions

21
pom.xml
View File

@@ -42,6 +42,8 @@
<httpcore.version>4.4.14</httpcore.version>
<log4j2.version>2.13.3</log4j2.version>
<apache.dubbo.version>2.7.5</apache.dubbo.version>
<grpc.version>1.16.1</grpc.version>
<mockito.version>3.4.0</mockito.version>
</properties>
<dependencyManagement>
@@ -249,6 +251,25 @@
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>-->
<!-- grpc -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<!-- grpc -->
</dependencies>
<build>

View File

@@ -36,6 +36,7 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import we.fizz.input.extension.grpc.GrpcInput;
import we.fizz.input.extension.mysql.MySQLInput;
import we.fizz.input.extension.request.RequestInput;
@@ -121,6 +122,7 @@ public class ConfigLoader {
ONode cfgNode = ONode.loadStr(configStr);
InputFactory.registerInput(RequestInput.TYPE, RequestInput.class);
InputFactory.registerInput(MySQLInput.TYPE, MySQLInput.class);
InputFactory.registerInput(GrpcInput.TYPE, GrpcInput.class);
Pipeline pipeline = new Pipeline();
pipeline.setApplicationContext(appContext);

View File

@@ -0,0 +1,55 @@
package we.fizz.input.extension.grpc;
import com.alibaba.fastjson.JSON;
import org.springframework.context.ConfigurableApplicationContext;
import reactor.core.publisher.Mono;
import we.constants.CommonConstants;
import we.fizz.input.*;
import we.fizz.input.extension.dubbo.DubboRPCResponse;
import we.proxy.grpc.GrpcGenericService;
import we.proxy.grpc.GrpcInterfaceDeclaration;
import java.util.HashMap;
import java.util.Map;
public class GrpcInput extends RPCInput {
static public InputType TYPE = new InputType("GRPC");
@Override
protected Mono<RPCResponse> getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) {
GrpcInputConfig config = (GrpcInputConfig) aConfig;
int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout();
Map<String, String> attachments = (Map<String, String>) request.get("attachments");
ConfigurableApplicationContext applicationContext = this.getCurrentApplicationContext();
String body = (String)request.get("body");
String url = (String)request.get("url");
GrpcGenericService proxy = applicationContext.getBean(GrpcGenericService.class);
GrpcInterfaceDeclaration declaration = new GrpcInterfaceDeclaration();
declaration.setEndpoint(url);
declaration.setServiceName(config.getServiceName());
declaration.setMethod(config.getMethod());
declaration.setTimeout(timeout);
HashMap<String, Object> contextAttachment = null;
if (attachments == null){
contextAttachment = new HashMap<String, Object>();
} else {
contextAttachment = new HashMap<String, Object>(attachments);
}
if (inputContext.getStepContext() != null && inputContext.getStepContext().getTraceId() != null){
contextAttachment.put(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
}
Mono<Object> proxyResponse = proxy.send(body, declaration, contextAttachment);
return proxyResponse.flatMap(cr->{
DubboRPCResponse response = new DubboRPCResponse();
String responseStr = JSON.toJSONString(cr);
response.setBodyMono(Mono.just(responseStr));
return Mono.just(response);
});
}
public static Class inputConfigClass (){
return GrpcInputConfig.class;
}
}

View File

@@ -0,0 +1,38 @@
package we.fizz.input.extension.grpc;
import we.fizz.input.InputConfig;
import java.util.Map;
public class GrpcInputConfig extends InputConfig {
private int timeout;
private String serviceName;
private String method;
public GrpcInputConfig(Map configMap) {
super(configMap);
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
}

View File

@@ -20,8 +20,6 @@ import we.fizz.input.IInput;
import we.fizz.input.Input;
import we.fizz.input.InputType;
import we.fizz.input.extension.mysql.MySQLInputConfig;
/**
*
* @author linwaiwai

View File

@@ -26,6 +26,7 @@ import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.util.UriComponentsBuilder;
import we.fizz.input.InputConfig;
import we.fizz.input.InputType;
/**
*
@@ -34,6 +35,7 @@ import we.fizz.input.InputConfig;
*
*/
public class RequestInputConfig extends InputConfig {
private URL url ;
private String method ;
private int timeout = 3;

View File

@@ -0,0 +1,61 @@
package we.proxy.grpc;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import org.apache.dubbo.rpc.service.GenericException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import we.fizz.exception.FizzException;
import static io.grpc.CallOptions.DEFAULT;
import static java.util.Collections.singletonList;
import we.proxy.grpc.client.GrpcProxyClient;
import we.proxy.grpc.client.core.GrpcMethodDefinition;
import we.proxy.grpc.client.utils.ChannelFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static we.proxy.grpc.client.utils.GrpcReflectionUtils.parseToMethodDefinition;
@Service
public class GrpcGenericService {
@Autowired
private GrpcProxyClient grpcProxyClient;
/**
* Generic invoke.
*
* @param payload the json string body
* @param grpcInterfaceDeclaration the interface declaration
* @return the mono object
* @throws FizzException the fizz runtime exception
*/
public Mono<Object> send(final String payload, final GrpcInterfaceDeclaration grpcInterfaceDeclaration, HashMap<String, Object> attachments ) {
GrpcMethodDefinition methodDefinition = parseToMethodDefinition(grpcInterfaceDeclaration.getServiceName());
HostAndPort endPoint = HostAndPort.fromString(grpcInterfaceDeclaration.getEndpoint());
if (endPoint == null) {
throw new RuntimeException("can't find target endpoint");
}
Map<String, Object> metaHeaderMap = attachments;
ManagedChannel channel = null;
try {
channel = ChannelFactory.create(endPoint, metaHeaderMap);
CallOptions calloptions = DEFAULT;
calloptions.withDeadlineAfter(grpcInterfaceDeclaration.getTimeout(), TimeUnit.MILLISECONDS);
ListenableFuture<Void> future = grpcProxyClient.invokeMethodAsync(methodDefinition, channel, DEFAULT, singletonList(payload));
return Mono.fromFuture(new ListenableFutureAdapter(future).getCompletableFuture().thenApply(ret -> {
return ret;
})).onErrorMap(
exception -> exception instanceof GenericException ? new FizzException(((GenericException) exception).getExceptionMessage()) : new FizzException((Throwable) exception));
} finally {
if (channel != null) {
channel.shutdown();
}
}
}
}

View File

@@ -0,0 +1,44 @@
package we.proxy.grpc;
public class GrpcInterfaceDeclaration {
private String method;
public String getEndpoint() {
return endpoint;
}
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}
private String endpoint;
private String serviceName;
private int timeout;
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public void setMethod(String method) {
this.method = method;
}
public String getMethod() {
return method;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getTimeout() {
return timeout;
}
}

View File

@@ -0,0 +1,47 @@
package we.proxy.grpc;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.CompletableFuture;
public class ListenableFutureAdapter<T> {
private final ListenableFuture<T> listenableFuture;
private final CompletableFuture<T> completableFuture;
public ListenableFutureAdapter(ListenableFuture<T> listenableFuture) {
this.listenableFuture = listenableFuture;
this.completableFuture = new CompletableFuture<T>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = listenableFuture.cancel(mayInterruptIfRunning);
super.cancel(cancelled);
return cancelled;
}
};
Futures.addCallback(this.listenableFuture, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
completableFuture.complete(result);
}
@Override
public void onFailure(Throwable ex) {
completableFuture.completeExceptionally(ex);
}
});
}
public CompletableFuture<T> getCompletableFuture() {
return completableFuture;
}
public static final <T> CompletableFuture<T> toCompletable(ListenableFuture<T> listenableFuture) {
ListenableFutureAdapter<T> listenableFutureAdapter = new ListenableFutureAdapter<>(listenableFuture);
return listenableFutureAdapter.getCompletableFuture();
}
}

View File

@@ -0,0 +1,48 @@
/*
MIT License
Copyright (c) 2018 liuzhengyang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package we.proxy.grpc.client;
import java.util.List;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.DynamicMessage;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import lombok.Builder;
import lombok.Getter;
/**
* @author zhangjikai
*/
@Builder
@Getter
public class CallParams {
private MethodDescriptor methodDescriptor;
private Channel channel;
private CallOptions callOptions;
private List<DynamicMessage> requests;
private StreamObserver<DynamicMessage> responseObserver;
}

View File

@@ -0,0 +1,57 @@
/*
MIT License
Copyright (c) 2018 liuzhengyang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package we.proxy.grpc.client;
import static java.util.stream.Collectors.toList;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSON;
/**
* @author zhangjikai
*/
public class CallResults {
private List<String> results;
public CallResults() {
this.results = new ArrayList<>();
}
public void add(String jsonText) {
results.add(jsonText);
}
public List<String> asList() {
return results;
}
public Object asJSON() {
if (results.size() == 1) {
return JSON.parseObject(results.get(0));
}
return results.stream().map(JSON::parseObject).collect(toList());
}
}

View File

@@ -0,0 +1,117 @@
/*
MIT License
Copyright (c) 2018 liuzhengyang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package we.proxy.grpc.client;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
//import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
import static org.apache.commons.lang3.ObjectUtils.isNotEmpty;
import static we.proxy.grpc.client.utils.GrpcReflectionUtils.fetchFullMethodName;
import static we.proxy.grpc.client.utils.GrpcReflectionUtils.fetchMethodType;
import java.util.List;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.DynamicMessage;
import io.grpc.ClientCall;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.stub.StreamObserver;
import we.proxy.grpc.client.core.CompositeStreamObserver;
import we.proxy.grpc.client.core.DoneObserver;
import we.proxy.grpc.client.core.DynamicMessageMarshaller;
/**
* @author zhangjikai
*/
public class GrpcClient {
private static final Logger logger = LoggerFactory.getLogger(GrpcClient.class);
@Nullable
public ListenableFuture<Void> call(CallParams callParams) {
checkParams(callParams);
MethodType methodType = fetchMethodType(callParams.getMethodDescriptor());
List<DynamicMessage> requests = callParams.getRequests();
StreamObserver<DynamicMessage> responseObserver = callParams.getResponseObserver();
DoneObserver<DynamicMessage> doneObserver = new DoneObserver<>();
StreamObserver<DynamicMessage> compositeObserver = CompositeStreamObserver.of(responseObserver, doneObserver);
StreamObserver<DynamicMessage> requestObserver;
switch (methodType) {
case UNARY:
asyncUnaryCall(createCall(callParams), requests.get(0), compositeObserver);
return doneObserver.getCompletionFuture();
case SERVER_STREAMING:
asyncServerStreamingCall(createCall(callParams), requests.get(0), compositeObserver);
return doneObserver.getCompletionFuture();
case CLIENT_STREAMING:
requestObserver = asyncClientStreamingCall(createCall(callParams), compositeObserver);
requests.forEach(responseObserver::onNext);
requestObserver.onCompleted();
return doneObserver.getCompletionFuture();
case BIDI_STREAMING:
requestObserver = asyncBidiStreamingCall(createCall(callParams), compositeObserver);
requests.forEach(responseObserver::onNext);
requestObserver.onCompleted();
return doneObserver.getCompletionFuture();
default:
logger.info("Unknown methodType:{}", methodType);
return null;
}
}
private void checkParams(CallParams callParams) {
checkNotNull(callParams);
checkNotNull(callParams.getMethodDescriptor());
checkNotNull(callParams.getChannel());
checkNotNull(callParams.getCallOptions());
checkArgument(isNotEmpty(callParams.getRequests()));
checkNotNull(callParams.getResponseObserver());
}
private ClientCall<DynamicMessage, DynamicMessage> createCall(CallParams callParams) {
return callParams.getChannel().newCall(createGrpcMethodDescriptor(callParams.getMethodDescriptor()),
callParams.getCallOptions());
}
private io.grpc.MethodDescriptor<DynamicMessage, DynamicMessage> createGrpcMethodDescriptor(MethodDescriptor descriptor) {
return io.grpc.MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
.setType(fetchMethodType(descriptor))
.setFullMethodName(fetchFullMethodName(descriptor))
.setRequestMarshaller(new DynamicMessageMarshaller(descriptor.getInputType()))
.setResponseMarshaller(new DynamicMessageMarshaller(descriptor.getOutputType()))
.build();
}
}

View File

@@ -0,0 +1,89 @@
/*
MIT License
Copyright (c) 2018 liuzhengyang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package we.proxy.grpc.client;
import java.util.List;
import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat.TypeRegistry;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import org.springframework.stereotype.Service;
import we.proxy.grpc.client.core.GrpcMethodDefinition;
import we.proxy.grpc.client.core.ServiceResolver;
import we.proxy.grpc.client.utils.GrpcReflectionUtils;
import we.proxy.grpc.client.utils.MessageWriter;
@Service
/**
* @author zhangjikai
* Created on 2018-12-01
*/
public class GrpcProxyClient {
private GrpcClient grpcClient = new GrpcClient();
public CallResults invokeMethod(GrpcMethodDefinition definition, Channel channel, CallOptions callOptions,
List<String> requestJsonTexts) {
CallResults results = new CallResults();
try {
this.invokeMethodAsync( definition, channel, callOptions, requestJsonTexts).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Caught exception while waiting for rpc", e);
}
return results;
}
public ListenableFuture<Void> invokeMethodAsync(GrpcMethodDefinition definition, Channel channel, CallOptions callOptions,
List<String> requestJsonTexts) {
FileDescriptorSet fileDescriptorSet = GrpcReflectionUtils.resolveService(channel, definition.getFullServiceName());
if (fileDescriptorSet == null) {
return null;
}
ServiceResolver serviceResolver = ServiceResolver.fromFileDescriptorSet(fileDescriptorSet);
MethodDescriptor methodDescriptor = serviceResolver.resolveServiceMethod(definition);
TypeRegistry registry = TypeRegistry.newBuilder().add(serviceResolver.listMessageTypes()).build();
List<DynamicMessage> requestMessages = GrpcReflectionUtils.parseToMessages(registry, methodDescriptor.getInputType(),
requestJsonTexts);
CallResults results = new CallResults();
StreamObserver<DynamicMessage> streamObserver = MessageWriter.newInstance(registry, results);
CallParams callParams = CallParams.builder()
.methodDescriptor(methodDescriptor)
.channel(channel)
.callOptions(callOptions)
.requests(requestMessages)
.responseObserver(streamObserver)
.build();
return grpcClient.call(callParams);
}
}

View File

@@ -0,0 +1,87 @@
/*
Copyright (c) 2016, gRPC Ecosystem
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of polyglot nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package we.proxy.grpc.client.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
import io.grpc.stub.StreamObserver;
/**
* A {@link StreamObserver} which groups multiple observers and executes them all.
*/
public class CompositeStreamObserver<T> implements StreamObserver<T> {
private static final Logger logger = LoggerFactory.getLogger(CompositeStreamObserver.class);
private final ImmutableList<StreamObserver<T>> observers;
@SafeVarargs
public static <T> CompositeStreamObserver<T> of(StreamObserver<T>... observers) {
return new CompositeStreamObserver<>(ImmutableList.copyOf(observers));
}
private CompositeStreamObserver(ImmutableList<StreamObserver<T>> observers) {
this.observers = observers;
}
@Override
public void onCompleted() {
for (StreamObserver<T> observer : observers) {
try {
observer.onCompleted();
} catch (Throwable t) {
logger.error("Exception in composite onComplete, moving on", t);
}
}
}
@Override
public void onError(Throwable t) {
for (StreamObserver<T> observer : observers) {
try {
observer.onError(t);
} catch (Throwable s) {
logger.error("Exception in composite onError, moving on", s);
}
}
}
@Override
public void onNext(T value) {
for (StreamObserver<T> observer : observers) {
try {
observer.onNext(value);
} catch (Throwable t) {
logger.error("Exception in composite onNext, moving on", t);
}
}
}
}

View File

@@ -0,0 +1,69 @@
/*
Copyright (c) 2016, gRPC Ecosystem
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of polyglot nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package we.proxy.grpc.client.core;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.stub.StreamObserver;
/**
* A {@link StreamObserver} holding a future which completes when the rpc terminates.
*/
public class DoneObserver<T> implements StreamObserver<T> {
private final SettableFuture<Void> doneFuture;
public DoneObserver() {
this.doneFuture = SettableFuture.create();
}
@Override
public synchronized void onCompleted() {
doneFuture.set(null);
}
@Override
public synchronized void onError(Throwable t) {
doneFuture.setException(t);
}
@Override
public void onNext(T next) {
// Do nothing.
}
/**
* Returns a future which completes when the rpc finishes. The returned future fails if the rpc
* fails.
*/
public ListenableFuture<Void> getCompletionFuture() {
return doneFuture;
}
}

View File

@@ -0,0 +1,66 @@
/*
Copyright (c) 2016, gRPC Ecosystem
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of polyglot nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package we.proxy.grpc.client.core;
import java.io.IOException;
import java.io.InputStream;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.ExtensionRegistryLite;
import io.grpc.MethodDescriptor.Marshaller;
/**
* A {@link Marshaller} for dynamic messages.
*/
public class DynamicMessageMarshaller implements Marshaller<DynamicMessage> {
private final Descriptor messageDescriptor;
public DynamicMessageMarshaller(Descriptor messageDescriptor) {
this.messageDescriptor = messageDescriptor;
}
@Override
public DynamicMessage parse(InputStream inputStream) {
try {
return DynamicMessage.newBuilder(messageDescriptor)
.mergeFrom(inputStream, ExtensionRegistryLite.getEmptyRegistry())
.build();
} catch (IOException e) {
throw new RuntimeException("Unable to merge from the supplied input stream", e);
}
}
@Override
public InputStream stream(DynamicMessage abstractMessage) {
return abstractMessage.toByteString().newInput();
}
}

View File

@@ -0,0 +1,60 @@
/*
MIT License
Copyright (c) 2018 liuzhengyang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package we.proxy.grpc.client.core;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* @author zhangjikai
* Created on 2018-12-16
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class GrpcMethodDefinition {
private String packageName;
private String serviceName;
private String methodName;
public String getFullServiceName() {
if (isNotBlank(packageName)) {
return packageName + "." + serviceName;
}
return serviceName;
}
public String getFullMethodName() {
if (isNotBlank(packageName)) {
return packageName + "." + serviceName + "/" + methodName;
}
return serviceName + "/" + methodName;
}
}

View File

@@ -0,0 +1,256 @@
/*
Copyright (c) 2016, gRPC Ecosystem
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of polyglot nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package we.proxy.grpc.client.core;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Channel;
import io.grpc.reflection.v1alpha.ListServiceResponse;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.reflection.v1alpha.ServerReflectionResponse.MessageResponseCase;
import io.grpc.stub.StreamObserver;
public class ServerReflectionClient {
private static final Logger logger = LoggerFactory.getLogger(ServerReflectionClient.class);
private static final long LIST_RPC_DEADLINE_MS = 10_000;
private static final long LOOKUP_RPC_DEADLINE_MS = 10_000;
private static final ServerReflectionRequest LIST_SERVICES_REQUEST =
ServerReflectionRequest.newBuilder()
.setListServices("") // Not sure what this is for, appears to be ignored.
.build();
private final Channel channel;
/**
* Returns a new reflection client using the supplied channel.
*/
public static ServerReflectionClient create(Channel channel) {
return new ServerReflectionClient(channel);
}
private ServerReflectionClient(Channel channel) {
this.channel = channel;
}
/**
* Asks the remote server to list its services and completes when the server responds.
*/
public ListenableFuture<ImmutableList<String>> listServices() {
ListServicesHandler rpcHandler = new ListServicesHandler();
StreamObserver<ServerReflectionRequest> requestStream = ServerReflectionGrpc.newStub(channel)
.withDeadlineAfter(LIST_RPC_DEADLINE_MS, TimeUnit.MILLISECONDS)
.serverReflectionInfo(rpcHandler);
return rpcHandler.start(requestStream);
}
/**
* Returns a {@link FileDescriptorSet} containing all the transitive dependencies of the supplied
* service, as provided by the remote server.
*/
public ListenableFuture<FileDescriptorSet> lookupService(String serviceName) {
LookupServiceHandler rpcHandler = new LookupServiceHandler(serviceName);
StreamObserver<ServerReflectionRequest> requestStream = ServerReflectionGrpc.newStub(channel)
.withDeadlineAfter(LOOKUP_RPC_DEADLINE_MS, TimeUnit.MILLISECONDS)
.serverReflectionInfo(rpcHandler);
return rpcHandler.start(requestStream);
}
/**
* Handles the rpc life cycle of a single list operation.
*/
private static class ListServicesHandler implements StreamObserver<ServerReflectionResponse> {
private final SettableFuture<ImmutableList<String>> resultFuture;
private StreamObserver<ServerReflectionRequest> requestStream;
private ListServicesHandler() {
resultFuture = SettableFuture.create();
}
ListenableFuture<ImmutableList<String>> start(
StreamObserver<ServerReflectionRequest> requestStream) {
this.requestStream = requestStream;
requestStream.onNext(LIST_SERVICES_REQUEST);
return resultFuture;
}
@Override
public void onNext(ServerReflectionResponse serverReflectionResponse) {
MessageResponseCase responseCase = serverReflectionResponse.getMessageResponseCase();
switch (responseCase) {
case LIST_SERVICES_RESPONSE:
handleListServiceResponse(serverReflectionResponse.getListServicesResponse());
break;
default:
logger.warn("Got unknown reflection response type: " + responseCase);
break;
}
}
@Override
public void onError(Throwable t) {
resultFuture.setException(new RuntimeException("Error in server reflection rpc while listing services", t));
}
@Override
public void onCompleted() {
if (!resultFuture.isDone()) {
logger.error("Unexpected completion of server reflection rpc while listing services");
resultFuture.setException(new RuntimeException("Unexpected end of rpc"));
}
}
private void handleListServiceResponse(ListServiceResponse response) {
ImmutableList.Builder<String> servicesBuilder = ImmutableList.builder();
response.getServiceList().forEach(service -> servicesBuilder.add(service.getName()));
resultFuture.set(servicesBuilder.build());
requestStream.onCompleted();
}
}
/**
* Handles the rpc life cycle of a single lookup operation.
*/
private static class LookupServiceHandler implements StreamObserver<ServerReflectionResponse> {
private final SettableFuture<FileDescriptorSet> resultFuture;
private final String serviceName;
private final HashSet<String> requestedDescriptors;
private final HashMap<String, FileDescriptorProto> resolvedDescriptors;
private StreamObserver<ServerReflectionRequest> requestStream;
// Used to notice when we've received all the files we've asked for and we can end the rpc.
private int outstandingRequests;
private LookupServiceHandler(String serviceName) {
this.serviceName = serviceName;
this.resultFuture = SettableFuture.create();
this.resolvedDescriptors = new HashMap<>();
this.requestedDescriptors = new HashSet<>();
this.outstandingRequests = 0;
}
ListenableFuture<FileDescriptorSet> start(
StreamObserver<ServerReflectionRequest> requestStream) {
this.requestStream = requestStream;
requestStream.onNext(requestForSymbol(serviceName));
++outstandingRequests;
return resultFuture;
}
@Override
public void onNext(ServerReflectionResponse response) {
MessageResponseCase responseCase = response.getMessageResponseCase();
switch (responseCase) {
case FILE_DESCRIPTOR_RESPONSE:
ImmutableSet<FileDescriptorProto> descriptors =
parseDescriptors(response.getFileDescriptorResponse().getFileDescriptorProtoList());
descriptors.forEach(d -> resolvedDescriptors.put(d.getName(), d));
descriptors.forEach(this::processDependencies);
break;
default:
logger.warn("Got unknown reflection response type: " + responseCase);
break;
}
}
@Override
public void onError(Throwable t) {
resultFuture.setException(new RuntimeException("Reflection lookup rpc failed for: " + serviceName, t));
}
@Override
public void onCompleted() {
if (!resultFuture.isDone()) {
logger.error("Unexpected completion of the server reflection rpc");
resultFuture.setException(new RuntimeException("Unexpected end of rpc"));
}
}
private ImmutableSet<FileDescriptorProto> parseDescriptors(List<ByteString> descriptorBytes) {
ImmutableSet.Builder<FileDescriptorProto> resultBuilder = ImmutableSet.builder();
for (ByteString fileDescriptorBytes : descriptorBytes) {
try {
resultBuilder.add(FileDescriptorProto.parseFrom(fileDescriptorBytes));
} catch (InvalidProtocolBufferException e) {
logger.warn("Failed to parse bytes as file descriptor proto");
}
}
return resultBuilder.build();
}
private void processDependencies(FileDescriptorProto fileDescriptor) {
logger.debug("Processing deps of descriptor: " + fileDescriptor.getName());
fileDescriptor.getDependencyList().forEach(dep -> {
if (!resolvedDescriptors.containsKey(dep) && !requestedDescriptors.contains(dep)) {
requestedDescriptors.add(dep);
++outstandingRequests;
requestStream.onNext(requestForDescriptor(dep));
}
});
--outstandingRequests;
if (outstandingRequests == 0) {
logger.debug("Retrieved service definition for [{}] by reflection", serviceName);
resultFuture.set(FileDescriptorSet.newBuilder()
.addAllFile(resolvedDescriptors.values())
.build());
requestStream.onCompleted();
}
}
private static ServerReflectionRequest requestForDescriptor(String name) {
return ServerReflectionRequest.newBuilder()
.setFileByFilename(name)
.build();
}
private static ServerReflectionRequest requestForSymbol(String symbol) {
return ServerReflectionRequest.newBuilder()
.setFileContainingSymbol(symbol)
.build();
}
}
}

View File

@@ -0,0 +1,179 @@
/*
Copyright (c) 2016, gRPC Ecosystem
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of polyglot nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package we.proxy.grpc.client.core;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Descriptors.ServiceDescriptor;
/**
* A locator used to read proto file descriptors and extract method definitions.
*/
public class ServiceResolver {
private static final Logger logger = LoggerFactory.getLogger(ServiceResolver.class);
private final ImmutableList<FileDescriptor> fileDescriptors;
/**
* Creates a resolver which searches the supplied {@link FileDescriptorSet}.
*/
public static ServiceResolver fromFileDescriptorSet(FileDescriptorSet descriptorSet) {
ImmutableMap<String, FileDescriptorProto> descriptorProtoIndex =
computeDescriptorProtoIndex(descriptorSet);
Map<String, FileDescriptor> descriptorCache = new HashMap<>();
ImmutableList.Builder<FileDescriptor> result = ImmutableList.builder();
for (FileDescriptorProto descriptorProto : descriptorSet.getFileList()) {
try {
result.add(descriptorFromProto(descriptorProto, descriptorProtoIndex, descriptorCache));
} catch (DescriptorValidationException e) {
logger.warn("Skipped descriptor " + descriptorProto.getName() + " due to error", e);
}
}
return new ServiceResolver(result.build());
}
private ServiceResolver(Iterable<FileDescriptor> fileDescriptors) {
this.fileDescriptors = ImmutableList.copyOf(fileDescriptors);
}
/**
* Lists all of the services found in the file descriptors
*/
public Iterable<ServiceDescriptor> listServices() {
ArrayList<ServiceDescriptor> serviceDescriptors = new ArrayList<ServiceDescriptor>();
for (FileDescriptor fileDescriptor : fileDescriptors) {
serviceDescriptors.addAll(fileDescriptor.getServices());
}
return serviceDescriptors;
}
/**
* Lists all the known message types.
*/
public ImmutableSet<Descriptor> listMessageTypes() {
ImmutableSet.Builder<Descriptor> resultBuilder = ImmutableSet.builder();
fileDescriptors.forEach(d -> resultBuilder.addAll(d.getMessageTypes()));
return resultBuilder.build();
}
/**
* Returns the descriptor of a protobuf method with the supplied grpc method name. If the method
* cannot be found, this throws {@link IllegalArgumentException}.
*/
public MethodDescriptor resolveServiceMethod(GrpcMethodDefinition definition) {
ServiceDescriptor service = findService(definition.getPackageName(), definition.getServiceName());
MethodDescriptor method = service.findMethodByName(definition.getMethodName());
if (method == null) {
throw new IllegalArgumentException(
"Unable to find method " + definition.getMethodName()
+ " in service " + definition.getServiceName());
}
return method;
}
private ServiceDescriptor findService(String packageName, String serviceName) {
// TODO(dino): Consider creating an index.
for (FileDescriptor fileDescriptor : fileDescriptors) {
if (!fileDescriptor.getPackage().equals(packageName)) {
// Package does not match this file, ignore.
continue;
}
ServiceDescriptor serviceDescriptor = fileDescriptor.findServiceByName(serviceName);
if (serviceDescriptor != null) {
return serviceDescriptor;
}
}
throw new IllegalArgumentException("Unable to find service with name: " + serviceName);
}
/**
* Returns a map from descriptor proto name as found inside the descriptors to protos.
*/
private static ImmutableMap<String, FileDescriptorProto> computeDescriptorProtoIndex(
FileDescriptorSet fileDescriptorSet) {
ImmutableMap.Builder<String, FileDescriptorProto> resultBuilder = ImmutableMap.builder();
for (FileDescriptorProto descriptorProto : fileDescriptorSet.getFileList()) {
resultBuilder.put(descriptorProto.getName(), descriptorProto);
}
return resultBuilder.build();
}
/**
* Recursively constructs file descriptors for all dependencies of the supplied proto and returns
* a {@link FileDescriptor} for the supplied proto itself. For maximal efficiency, reuse the
* descriptorCache argument across calls.
*/
private static FileDescriptor descriptorFromProto(
FileDescriptorProto descriptorProto,
ImmutableMap<String, FileDescriptorProto> descriptorProtoIndex,
Map<String, FileDescriptor> descriptorCache) throws DescriptorValidationException {
// First, check the cache.
String descriptorName = descriptorProto.getName();
if (descriptorCache.containsKey(descriptorName)) {
return descriptorCache.get(descriptorName);
}
// Then, fetch all the required dependencies recursively.
ImmutableList.Builder<FileDescriptor> dependencies = ImmutableList.builder();
for (String dependencyName : descriptorProto.getDependencyList()) {
if (!descriptorProtoIndex.containsKey(dependencyName)) {
throw new IllegalArgumentException("Could not find dependency: " + dependencyName);
}
FileDescriptorProto dependencyProto = descriptorProtoIndex.get(dependencyName);
dependencies.add(descriptorFromProto(dependencyProto, descriptorProtoIndex, descriptorCache));
}
// Finally, construct the actual descriptor.
FileDescriptor[] empty = new FileDescriptor[0];
return FileDescriptor.buildFrom(descriptorProto, dependencies.build().toArray(empty));
}
public List<FileDescriptor> getFileDescriptors() {
return fileDescriptors;
}
}

View File

@@ -0,0 +1,57 @@
package we.proxy.grpc.client.utils;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static java.util.Collections.emptyMap;
import java.util.Map;
import com.google.common.net.HostAndPort;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
/**
* Knows how to construct grpc channels.
*/
public class ChannelFactory {
public static ManagedChannel create(HostAndPort endpoint) {
return create(endpoint, emptyMap());
}
public static ManagedChannel create(HostAndPort endpoint, Map<String, Object> metaDataMap) {
return NettyChannelBuilder.forAddress(endpoint.getHostText(), endpoint.getPort())
.negotiationType(NegotiationType.PLAINTEXT)
.intercept(metadataInterceptor(metaDataMap))
.build();
}
private static ClientInterceptor metadataInterceptor(Map<String, Object> metaDataMap) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, final Channel next) {
return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
protected void checkedStart(Listener<RespT> responseListener, Metadata headers) {
metaDataMap.forEach((k, v) -> {
Key<String> mKey = Key.of(k, ASCII_STRING_MARSHALLER);
headers.put(mKey, String.valueOf(v));
});
delegate().start(responseListener, headers);
}
};
}
};
}
}

View File

@@ -0,0 +1,133 @@
package we.proxy.grpc.client.utils;
import static com.google.common.base.Preconditions.checkArgument;
import static io.grpc.MethodDescriptor.generateFullMethodName;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
//import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.ObjectUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Parser;
import com.google.protobuf.util.JsonFormat.TypeRegistry;
import io.grpc.Channel;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import we.proxy.grpc.client.core.GrpcMethodDefinition;
import we.proxy.grpc.client.core.ServerReflectionClient;
/**
* @author zhangjikai
*/
public class GrpcReflectionUtils {
private static final Logger logger = LoggerFactory.getLogger(GrpcReflectionUtils.class);
public static List<FileDescriptorSet> resolveServices(Channel channel) {
ServerReflectionClient serverReflectionClient = ServerReflectionClient.create(channel);
try {
List<String> services = serverReflectionClient.listServices().get();
if (isEmpty(services)) {
logger.info("Can't find services by channel {}", channel);
return emptyList();
}
return services.stream().map(serviceName -> {
ListenableFuture<FileDescriptorSet> future = serverReflectionClient.lookupService(serviceName);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Get {} fileDescriptor occurs error", serviceName, e);
return null;
}
}).filter(Objects::nonNull).collect(toList());
} catch (Throwable t) {
logger.error("Exception resolve service", t);
throw new RuntimeException(t);
}
}
public static FileDescriptorSet resolveService(Channel channel, String serviceName) {
ServerReflectionClient reflectionClient = ServerReflectionClient.create(channel);
try {
List<String> serviceNames = reflectionClient.listServices().get();
if (!serviceNames.contains(serviceName)) {
throw Status.NOT_FOUND.withDescription(
String.format("Remote server does not have service %s. Services: %s", serviceName, serviceNames))
.asRuntimeException();
}
return reflectionClient.lookupService(serviceName).get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Resolve services get error", e);
throw new RuntimeException(e);
}
}
public static String fetchFullMethodName(MethodDescriptor methodDescriptor) {
String serviceName = methodDescriptor.getService().getFullName();
String methodName = methodDescriptor.getName();
return generateFullMethodName(serviceName, methodName);
}
public static MethodType fetchMethodType(MethodDescriptor methodDescriptor) {
boolean clientStreaming = methodDescriptor.toProto().getClientStreaming();
boolean serverStreaming = methodDescriptor.toProto().getServerStreaming();
if (clientStreaming && serverStreaming) {
return MethodType.BIDI_STREAMING;
} else if (!clientStreaming && !serverStreaming) {
return MethodType.UNARY;
} else if (!clientStreaming) {
return MethodType.SERVER_STREAMING;
} else {
return MethodType.SERVER_STREAMING;
}
}
public static List<DynamicMessage> parseToMessages(TypeRegistry registry, Descriptor descriptor,
List<String> jsonTexts) {
Parser parser = JsonFormat.parser().usingTypeRegistry(registry);
List<DynamicMessage> messages = new ArrayList<>();
try {
for (String jsonText : jsonTexts) {
DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(descriptor);
parser.merge(jsonText, messageBuilder);
messages.add(messageBuilder.build());
}
return messages;
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException("Unable to parse json text", e);
}
}
public static GrpcMethodDefinition parseToMethodDefinition(String rawMethodName) {
checkArgument(isNotBlank(rawMethodName), "Raw method name can't be empty.");
int methodSplitPosition = rawMethodName.lastIndexOf(".");
checkArgument(methodSplitPosition != -1, "No package name and service name found.");
String methodName = rawMethodName.substring(methodSplitPosition + 1);
checkArgument(isNotBlank(methodName), "Method name can't be empty.");
String fullServiceName = rawMethodName.substring(0, methodSplitPosition);
int serviceSplitPosition = fullServiceName.lastIndexOf(".");
String serviceName = fullServiceName.substring(serviceSplitPosition + 1);
String packageName = "";
if (serviceSplitPosition != -1) {
packageName = fullServiceName.substring(0, serviceSplitPosition);
}
checkArgument(isNotBlank(serviceName), "Service name can't be empty.");
return new GrpcMethodDefinition(packageName, serviceName, methodName);
}
}

View File

@@ -0,0 +1,54 @@
package we.proxy.grpc.client.utils;
import static com.google.protobuf.util.JsonFormat.TypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import io.grpc.stub.StreamObserver;
import we.proxy.grpc.client.CallResults;
/**
* @author zhangjikai
*/
public class MessageWriter<T extends Message> implements StreamObserver<T> {
private static final Logger logger = LoggerFactory.getLogger(MessageWriter.class);
private final Printer printer;
private final CallResults results;
private MessageWriter(Printer printer, CallResults results) {
this.printer = printer;
this.results = results;
}
public static <T extends Message> MessageWriter<T> newInstance(TypeRegistry registry, CallResults results){
return new MessageWriter<>(
JsonFormat.printer().usingTypeRegistry(registry).includingDefaultValueFields(),
results);
}
@Override
public void onNext(T value) {
try {
results.add(printer.print(value));
} catch (InvalidProtocolBufferException e) {
logger.error("Skipping invalid response message", e);
}
}
@Override
public void onError(Throwable t) {
logger.error("Messages write occur errors", t);
}
@Override
public void onCompleted() {
logger.info("Messages write complete");
}
}

View File

@@ -0,0 +1,122 @@
package we.fizz.input;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.rpc.service.GenericService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.util.ReflectionTestUtils;
import we.fizz.Step;
import we.fizz.StepContext;
import we.fizz.StepResponse;
import we.fizz.input.extension.grpc.GrpcInput;
import we.fizz.input.extension.grpc.GrpcInputConfig;
import we.proxy.dubbo.ApacheDubboGenericService;
import we.proxy.grpc.GrpcGenericService;
import we.proxy.grpc.GrpcInterfaceDeclaration;
import we.proxy.grpc.client.GrpcProxyClient;
import we.proxy.grpc.client.utils.ChannelFactory;
import java.lang.ref.SoftReference;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class GrpcInputTests {
private static final String URL ="localhost:8090";
private static final String SERVICE_NAME = "com.fizzgate.test";
private static final String METHOD_NAME = "method";
private static final String[] LEFT = new String[]{};
private static final Object[] RIGHT = new Object[]{};
private ApacheDubboGenericService proxy;
// @Before
// public void setup(){
// ApacheDubboGenericProxyTests test = new ApacheDubboGenericProxyTests();
// proxy = test.getMockApachDubbo();
// }
@Test
public void test() {
mockStatic(ChannelFactory.class);
GrpcInterfaceDeclaration declaration = new GrpcInterfaceDeclaration();
declaration.setEndpoint(URL);
declaration.setServiceName(SERVICE_NAME);
declaration.setMethod(METHOD_NAME);
declaration.setTimeout(3000);
GrpcProxyClient grpcProxyClient = mock(GrpcProxyClient.class);
ListenableFuture<?> future = new ListenableFuture<String>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public String get() throws InterruptedException, ExecutionException {
return "result";
}
@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public void addListener(Runnable runnable, Executor executor) {
}
};
when(grpcProxyClient.invokeMethodAsync(any(), any(), any(), any())).thenReturn((ListenableFuture<Void>) future);
GrpcGenericService grpcGenericService = new GrpcGenericService();
ReflectionTestUtils.setField(grpcGenericService, "grpcProxyClient", grpcProxyClient);
ConfigurableApplicationContext applicationContext = mock(ConfigurableApplicationContext.class);
when(applicationContext.getBean(GrpcGenericService.class)).thenReturn(grpcGenericService);
Step step = mock(Step.class);
when(step.getCurrentApplicationContext()).thenReturn(applicationContext);
StepResponse stepResponse = new StepResponse(step, null, new HashMap<String, Map<String, Object>>());
GrpcInputConfig config = mock(GrpcInputConfig.class);
when(config.getServiceName()).thenReturn(SERVICE_NAME);
InputFactory.registerInput(GrpcInput.TYPE, GrpcInput.class);
GrpcInput grpcInput = (GrpcInput)InputFactory.createInput(GrpcInput.TYPE.toString());
HashMap <String, Object>request = new HashMap <String, Object>();
request.put("url",URL);
ReflectionTestUtils.setField(grpcInput, "request", request);
grpcInput.setName("input1");
grpcInput.setWeakStep(new SoftReference<>(step));
grpcInput.setStepResponse(stepResponse);
grpcInput.setConfig(config);
StepContext stepContext = mock(StepContext.class);
stepContext.put("step1", stepResponse);
InputContext context = new InputContext(stepContext, null);
grpcInput.beforeRun(context);
grpcInput.run();
}
}