Vert.x gRPC
可以在维基百科看到对gRPC全面的介绍
gRPC是一个由谷歌开发的开源远程方法调用(RPC)系统。 它用HTTP/2作为传输协议,以协议Buffer作为接口描述语言, 它提供一些特性,例如认证、双向streaming和flow的控制, 阻塞或非阻塞绑定,和撤销、超时操作。 它为很多语言生成跨平台客户端和服务端。
Vert.x gRPC是一个将谷歌风格gRPC编程风格对应到Vert.x风格的模块。作为这个模块的用户, 您会更加习惯于用Vert.x Streams和Futures的编码风格从而体验到 gRPC的好处。
更多关于gRPC的信息请查看官方文档 http://www.grpc.io/ 。
Vert.x gRPC 额外支持:
-
用Verticle横向扩展gRPC服务
-
非阻塞的本地传输
gRPC类型
用gRPC,您将受益于HTTP/2协议,这意味着您会获得异步流的支持, 同时您的远程方法调用会有如下特性:
-
客户端发出stream请求,服务端响应单个对象
-
客户端发出stream请求,服务端响应stream
-
客户端发送单个对象请求,服务端响应单个对象
-
客户端发送单个对象请求,服务端响应stream
尽管对于非专业认识来讲,这也许看起来和其他基于HTTP协议实现的RPC没有区别,但您需要注意一点, 那就是在HTTP/2协议下,在响应到达之前您的请求不必完成。这意味着 您的通信通道是全双工的。全双工允可以减小响应延迟并且增强应用的响应能力。
一个简单的 Hello World
为了开始您的第一个Hello World示例,您需要定义协议。gRPC要求
用 protobuffer
格式来定义协议。
syntax = "proto3";
option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// 服务定义。
service Greeter {
// 发送请求
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 包含用户名字的请求消息
message HelloRequest {
string name = 1;
}
// 包含请求信息的响应消息
message HelloReply {
string message = 1;
}
这是一个非常普通的示例,用来展示单次请求单次响应。
编译RPC的定义
要用前述的定义,那么我们需要将其编译。
如果您 喜欢 ,您可以用 protoc
编译器来编译原始文件,您也可以添加如下依赖:
如果您用Apache Maven,您需要添加如下插件:
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${vertx.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>
<id>compile</id>
<configuration>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
${os.detected.classifier}
属性在构建时用来区分操作系统,在苹果系统(OSX)中,
需要替换为 osx-x86_64 ,其他同理。要使用它,您需要在 pom.xml
文件中添加 os-maven-plugin
插件:
<build>
...
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
...
</build>
这个插件会编译 src/main/proto
目录下的原始文件,并且使其对您的项目可用。
如果您正在用Gradle,那么您需要加入如下依赖:
...
apply plugin: 'com.google.protobuf'
...
buildscript {
...
dependencies {
// ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
}
}
...
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.2.0'
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
}
vertx {
artifact = "io.vertx:vertx-grpc-protoc-plugin:${vertx.grpc.version}"
}
}
generateProtoTasks {
all()*.plugins {
grpc
vertx
}
}
}
这个插件会编译 build/generated/source/proto/main
目录下的原始文件,然后使其对项目可用。
gRPC 服务
您已经拥有您的RPC基础代码,现在该实现您自己的服务器了。由前述可知,
我们的服务需要实现 sayHello
方法,该方法接收 HelloRequest
对象,然后返回
一个 HelloReply
对象,所以 您可以像如下实现:
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
如果您愿意,您可以让它在服务上可用。Vert.x使服务的创建变得很简单, 您只需要添加如下代码:
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(service)
.build();
// 异步启动
rpcServer.start();
使用Vert.x future和streams
前述示例通过gRPC异步架构( 例如 io.grpc.stub.StreamObserver
)进行异步处理的方式来使用gRPC服务。
这些代码由 protoc 编译器生成。
上述插件的配置作用于以下插件。
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
它用Vert.x的异步架构( 例如 Future
或 ReadStream
或 WriteStream
)生成服务版本,这样在Vert.x生态中更加方便。
VertxGreeterGrpc.GreeterVertxImplBase service =
new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
}
};
服务 gzip 压缩
您可以启用gzip压缩来告诉服务端来返回压缩的响应 (服务器会自动处理压缩过的请求)
VertxGreeterGrpc.GreeterVertxImplBase service =
new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
}
}
.withCompression("gzip");
withCompression
配置是由Vert.x gRPC protoc 插件生成。
您也可以在默认服务上启用压缩功能
(将 ResponseObserver
转换成 ServerCallStreamObserver
并在响应之前调用 setCompression
)
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
((ServerCallStreamObserver) responseObserver)
.setCompression("gzip");
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
注意
|
只要服务端支持,您可以使用任何其他的压缩器,在构建 ManagedChannel 时注册它们。
|
SSL 配置
前一个例子很简单,但您的RPC并不安全。为了让RPC更安全,我们应该启用SSL/TLS:
VertxServerBuilder builder = VertxServerBuilder.forPort(vertx, 8080)
.useSsl(options -> options
.setSsl(true)
.setUseAlpn(true)
.setKeyStoreOptions(new JksOptions()
.setPath("server-keystore.jks")
.setPassword("secret")));
恭喜您拥有了第一个gRPC服务。
重要
|
因为gRPC用HTTP/2作为传输协议,SSL/TLS服务的启用必须拥有 Application-Layer Protocol Negotiation |
服务横向扩展
如果你将一个Verticle部署了多个实例,gRPC服务将 在verticle event-loops上横向扩展。
vertx.deployVerticle(
// Verticle supplier - 被调用了4次
() -> new AbstractVerticle() {
BindableService service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
@Override
public void start() throws Exception {
VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(service)
.build()
.start();
}
},
// 部署4个实例,即服务以4个eventloop的形式做了横向扩展。
new DeploymentOptions()
.setInstances(4));
BlockingServerInterceptor
gRPC ServerInterceptor 是一个机制, 该机制在向服务端发起请求之前拦截该方法调用。 它有着同步的行为并且在Vert.x event loop上执行。
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, myInterceptor))
.build();
假设我们有一个拦截器,它阻塞了eventloop:
class MyInterceptor implements ServerInterceptor {
@Override
public <Q, A> ServerCall.Listener<Q> interceptCall(
ServerCall<Q, A> call, Metadata headers, ServerCallHandler<Q, A> next) {
// 例如做一些复杂操作并更新元数据。
return next.startCall(call, headers);
}
}
MyInterceptor myInterceptor = new MyInterceptor();
为了避免阻塞,您应该包装这个拦截器,让它在Vert.x的worker线程上执行。
ServerInterceptor wrapped =
BlockingServerInterceptor.wrap(vertx, myInterceptor);
// 创建服务
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, wrapped))
.build();
// 开启
rpcServer.start();
Context Server Interceptor
一个 abstract context server interceptor 允许拦截向服务发起的请求并提取元数据 到Vert.x context。这个Context不依赖于thread locals,所以使用Vert.x API是安全的。 这个拦截器应该在首位(或者首先添加到拦截器列表的其中之一)
session id 则是一个典型的用法。一个客户端可以创建客户端拦截器并向所有连接设置一个session id:
Metadata extraHeaders = new Metadata();
extraHeaders.put(
Metadata.Key.of("sessionId", Metadata.ASCII_STRING_MARSHALLER), theSessionId);
ClientInterceptor clientInterceptor = MetadataUtils
.newAttachHeadersInterceptor(extraHeaders);
channel = VertxChannelBuilder.forAddress(vertx, "localhost", port)
.intercept(clientInterceptor)
.build();
在服务端一侧,可以像如下添加拦截器:
BindableService service = new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
ServerInterceptor contextInterceptor = new ContextServerInterceptor() {
@Override
public void bind(Metadata metadata) {
put("sessionId", metadata.get(SESSION_ID_METADATA_KEY));
}
};
// 创建服务
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, contextInterceptor))
.build();
gRPC 客户端
没有客户端的服务端是没用的,所以我们创建一个客户端。创建客户端和创建服务端的步骤有重叠。 首先我们需要预先有一个RPC的定义,否则就不会有服务端,然后这个相同的定义会被编译。
请注意:编译器即生成基本服务也会生成客户端存根,所以如果您已经编译了一次,那么您就无需再次编译。
每一个客户端存根都必须有一个服务端通信channel相对应, 所以首先我们需要创建一个gRPC channel:
ManagedChannel channel = VertxChannelBuilder
.forAddress(vertx, "localhost", 8080)
.usePlaintext()
.build();
// 获取一个存根来与远程服务交互
GreeterGrpc.GreeterStub stub = GreeterGrpc.newStub(channel);
一旦存根生成,我们可以和服务端进行交互,此时这会更加简单,因为存根已经提供了 正确的方法定义和正确的参数类型定义:
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
// 调用远程服务
stub.sayHello(request, new StreamObserver<HelloReply>() {
private HelloReply helloReply;
@Override
public void onNext(HelloReply helloReply) {
this.helloReply = helloReply;
}
@Override
public void onError(Throwable throwable) {
System.out.println("Coult not reach server " + throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Got the server response: " + helloReply.getMessage());
}
});
使用 Vert.x 的 future 和 streams
前述示例用一个gRPC客户端基于gRPC异步架构(例如 io.grpc.stub.StreamObserver
)来执行异步操作。
代码是由 protoc 编译器生成的。
上述配置作用于下列插件:
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
它用Vert.x的异步架构( 例如 Future
或 ReadStream
或 WriteStream
)生成客户端版本,这样在Vert.x生态中更加方便。
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
// Call the remote service
Future<HelloReply> future = stub.sayHello(request);
// Listen to completion events
future
.onSuccess(helloReply -> System.out.println("Got the server response: " + helloReply.getMessage())).onFailure(err -> System.out.println("Coult not reach server " + err));
客户端gzip压缩
您可以启用gzip压缩来让客户端发送压缩消息。
GreeterGrpc.GreeterStub stub = GreeterGrpc
.newStub(channel)
.withCompression("gzip");
注意
|
只要客户端支持,您可以使用任何其他的压缩器,他们在构建 ManagedChannel 时被注册。
|
SSL 配置
如果您先前启用了SSL,那么您的客户端也必须用SSL,我们需要像如下配置channel:
ManagedChannel channel = VertxChannelBuilder.
forAddress(vertx, "localhost", 8080)
.useSsl(options -> options
.setSsl(true)
.setUseAlpn(true)
.setTrustStoreOptions(new JksOptions()
.setPath("client-truststore.jks")
.setPassword("secret")))
.build();
重要
|
因为gRPC用了 HTTP/2 作为传输协议,客户端SSL/TLS的启用必须有 Application-Layer Protocol Negotiation |
高级配置
直到现在,所有的gRPC示例都使用标准的默认配置,但是还有更多的配置项。如果您需要完整控制服务端的配置,
那么您应该查阅文档: VertxServerBuilder
,
如果您需要控制客户端channel,则查阅 VertxChannelBuilder
。 Vert.x gRPC继承了 grpc-java 项目(Netty传输),因此建议阅读其 文档