Vert.x gRPC
可以在维基百科看到对gRPC全面的介绍
gRPC是谷歌开源的远程方法调用(RPC)系统。 它基于 HTTP/2 传输协议和 ProtoBuffer 接口描述语言, 提供认证,双向流,流量控制,阻塞和非阻塞的调用桩绑定,以及接口调用的撤销、超时等功能和特性。 它为很多语言生成跨平台客户端和服务端。
wikipedia
Vert.x gRPC 是将谷歌 gRPC 编程风格与 Vert.x 编程风格统一的模块。 使用本模块,您可以在保留 Vert.x Streams 和 Futures 编码风格的同时享受 gRPC 原有的各种特性和优势。
更多关于 gRPC 的信息请查看官方文档 http://www.grpc.io/ 。
警告
|
本模块是 Vert.x 技术栈自 4.3 版本后对 gRPC 支持的新实现, 基于 gRPC Netty 实现的老模块依然保留,更名为Vert.x gRPC Netty ,相关链接:https://vertx.io/docs/vertx-grpc-netty/java/ 。 本模块处于技术前瞻状态,接口定义可能在后续版本中进行改动 |
Vert.x gRPC 分为两个部分
-
Vert.x gRPC Server
-
Vert.x gRPC Client
-
Vert.x gRPC Context Storage
Vert.x gRPC 服务器
Vert.x gRPC Server 是一个新的 gRPC 服务端, 内核部分使用 Vert.x HTTP server 替换了原有的 grpc Netty Server
此服务端不但提供面向 请求/响应 式的服务端接口,也提供服务桥接的接口调用桩
集成 Vert.x gRPC 服务器
在项目中添加依赖
-
Maven(在您的
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-server</artifactId>
<version>4.4.0</version>
</dependency>
-
Gradle(在您的
build.gradle
文件中):
dependencies {
compile 'io.vertx:vertx-grpc-server:4.4.0'
}
gRPC 请求/响应 服务端接口
gRPC 请求/响应 服务端接口可以使用客户端直接进行调用,而不依赖于调用桩
GrpcServer
就是一个 Handler<HttpServerRequest>
并且像 HTTP 服务请求处理器一样使用。
GrpcServer grpcServer = GrpcServer.server(vertx);
HttpServer server = vertx.createHttpServer(options);
server
.requestHandler(grpcServer)
.listen();
提示
|
GrpcServer 可以挂载到 Vert.x Web 路由上
|
请求/响应
每个服务中的接口都由一个处理器处理
server.callHandler(GreeterGrpc.getSayHelloMethod(), request -> {
request.handler(hello -> {
GrpcServerResponse<HelloRequest, HelloReply> response = request.response();
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();
response.end(reply);
});
});
流式请求
您可以设置处理器处理请求事件
server.callHandler(StreamingGrpc.getSinkMethod(), request -> {
request.handler(item -> {
// Process item
});
request.endHandler(v ->{
// No more items
// Send the response
request.response().end(Empty.getDefaultInstance());
});
request.exceptionHandler(err -> {
// Something wrong happened
});
});
流控
使用 Vert.x streams 对请求和响应进行背压控制
您可以 暂停/恢复/同步 一个请求
request.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
request.resume();
});
您可以检查响应的写能力,然后设置恢复处理器
if (response.writeQueueFull()) {
response.drainHandler(v -> {
// Writable again
});
} else {
response.write(item);
}
压缩
您可在发送消息 之前 设置压缩算法,用于压缩消息
response.encoding("gzip");
// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());
调用桩接口
Vert.x gRPC Server 提供了传统的使用 gRPC 通道的调用桩接口
GrpcServer grpcServer = GrpcServer.server(vertx);
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
responseObserver.onCompleted();
}
};
// Bind the service bridge in the gRPC server
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(service);
serverStub.bind(grpcServer);
// Start the HTTP/2 server
vertx.createHttpServer(options)
.requestHandler(grpcServer)
.listen();
消息级接口
服务端提供了消息级别的接口用于直接处理 protobuf 编码的 gRPC 消息
提示
|
服务端消息级接口可以和客户端消息级接口一起使用构建一个 gRPC 反向代理 |
如果您对消息的内容不感兴趣,而是想将消息转发到其他服务, 比方说您在写一个代理,这些接口就十分有用。
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.handler(protoHello -> {
// Handle protobuf encoded hello
performAsyncOperation(protoHello)
.onSuccess(protoReply -> {
// Reply with protobuf encoded reply
request.response().end(protoReply);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
您也可以使用 messageHandler
处理 GrpcMessage
,这些消息会保留客户端的编码,
如果您想直接转发压缩后的消息就非常有用,
可以避免二次解压缩和压缩。
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.messageHandler(helloMessage -> {
// Can be identity or gzip
String helloEncoding = helloMessage.encoding();
// Handle hello message
handleGrpcMessage(helloMessage)
.onSuccess(replyMessage -> {
// Reply with reply message
// Can be identity or gzip
String replyEncoding = replyMessage.encoding();
// Send the reply
request.response().endMessage(replyMessage);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
writeMessage
和 endMessage
将处理这些消息编码:
-
如果消息使用服务端编码,将原样发出
-
如果消息使用一个不同的编码,它将会重新编码,例如:压缩和解压缩
Vert.x gRPC 客户端
Vert.x gRPC Client 是一个新的 gRPC 客户端,其内核部分使用 Vert.x HTTP Client 替换了原有的 gRPC Netty client
此客户端不但提供面向 请求/响应 式的客户端接口,也提供 gRPC Channel 方式的接口调用桩
集成 Vert.x gRPC 客户端
在项目中添加依赖
-
Maven(在您的
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-client</artifactId>
<version>4.4.0</version>
</dependency>
-
Gradle(在您的`build.gradle` 文件中):
dependencies {
compile 'io.vertx:vertx-grpc-client:4.4.0'
}
gRPC 请求/响应 客户端接口
gRPC 请求/响应 客户端接口可以对服务直接进行调用,而不依赖于调用桩
您可以快速的创建客户端实例
GrpcClient client = GrpcClient.client(vertx);
请求/响应
创建和发送请求
SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
MethodDescriptor<HelloRequest, HelloReply> sayHelloMethod = GreeterGrpc.getSayHelloMethod();
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {
// The end method calls the service
request.end(HelloRequest.newBuilder().setName("Bob").build());
});
request.response().onSuccess(response -> {
Future<HelloReply> fut = response.last();
fut.onSuccess(reply -> {
System.out.println("Received " + reply.getMessage());
});
});
Future 的组合接口可以将所有步骤组合在一起形成流式编程风格
client
.request(server, GreeterGrpc.getSayHelloMethod()).compose(request -> {
request.end(HelloRequest
.newBuilder()
.setName("Bob")
.build());
return request.response().compose(response -> response.last());
}).onSuccess(reply -> {
System.out.println("Received " + reply.getMessage());
});
流式请求
client
.request(server, StreamingGrpc.getSinkMethod())
.onSuccess(request -> {
for (int i = 0;i < 10;i++) {
request.write(Item.newBuilder().setValue("1").build());
}
request.end();
});
流式响应
您可以设置处理器去处理每个响应事件
client
.request(server, StreamingGrpc.getSourceMethod())
.compose(request -> {
request.end(Empty.getDefaultInstance());
return request.response();
})
.onSuccess(response -> {
response.handler(item -> {
// Process item
});
response.endHandler(v -> {
// Done
});
response.exceptionHandler(err -> {
// Something went bad
});
});
流控
使用 Vert.x streams 进行背压控制
您可以检查请求的写能力,然后设置恢复处理器
if (request.writeQueueFull()) {
request.drainHandler(v -> {
// Writable again
});
} else {
request.write(item);
}
您可以 暂停/恢复/同步 响应
response.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
response.resume();
});
压缩
在发送消息 之前 设置压缩算法,用于消息压缩
request.encoding("gzip");
// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());
调用桩接口
Vert.x gRPC Client 提供了传统的使用 gRPC 通道的调用桩 API
GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));
GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel);
greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply value) {
// Process response
}
@Override
public void onCompleted() {
// Done
}
@Override
public void onError(Throwable t) {
// Something went bad
}
});
消息级接口
客户端提供了消息级别的接口用于直接处理 protobuf 编码的 gRPC 消息
提示
|
客户端消息级接口可以和服务端消息级接口一起使用构建一个 gRPC 反向代理 |
如果您对消息的内容不感兴趣,而是想将消息转发到其他服务, 比方说您在写一个代理,这些接口就十分有用。
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
// Set the service name and the method to call
request.serviceName(ServiceName.create("helloworld", "Greeter"));
request.methodName("SayHello");
// Send the protobuf request
request.end(protoHello);
// Handle the response
Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
responseFut.onSuccess(response -> {
response.handler(protoReply -> {
// Handle the protobuf reply
});
});
});
您也可以使用 messageHandler
去处理 GrpcMessage
,这些消息会保留服务端的编码。
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
// Set the service name and the method to call
request.serviceName(ServiceName.create("helloworld", "Greeter"));
request.methodName("SayHello");
// Send the protobuf request
request.endMessage(GrpcMessage.message("identity", protoHello));
// Handle the response
Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
responseFut.onSuccess(response -> {
response.messageHandler(replyMessage -> {
System.out.println("Got reply message encoded as " + replyMessage.encoding());
});
});
});
writeMessage
和 endMessage
将处理这些消息编码:
-
如果消息使用服务端编码,将原样发出
-
如果消息使用一个不同的编码, 它将会重新编码,例如压缩和解压缩
Unresolved directive in index.adoc - include::storage.adoc[]