Vert.x gRPC

关于gRPC的最佳描述可以在维基百科上看到.

gRPC是谷歌开源的远程方法调用(RPC)系统。 它基于 HTTP/2 传输协议和 ProtoBuffer 接口描述语言, 提供认证,双向流,流量控制,阻塞和非阻塞的调用桩绑定,以及接口调用的撤销、超时等功能和特性。 它为很多语言生成跨平台客户端和服务端。

— 维基百科
维基百科

Vert.x gRPC 是一个使 Google gRPC 编程风格与 Vert.x 风格一致的模块。 作为这个模块的用户,在受益于 gRPC 的同时,您将更熟悉使用 Vert.x Streams 和 Futures 的代码风格。

有关gRPC的更多信息,请参考官方文档网站http://www.grpc.io/。

此外Vert.x gRPC支持

  • 使用 Verticles 扩展 gRPC 服务

  • 非阻塞原生传输

gRPC 类型

基于 HTTP/2 协议的 gRPC,给您带来异步流传输支持, 这意味着您的远程过程调用可以具有以下特征:

  • 客户端流式请求,服务端单次响应

  • 客户端流式请求,服务端流式响应

  • 客户端单次请求,服务端单次响应

  • 客户端单次请求,服务端流式响应

虽然在初次接触的人看来, 这可能与其他基于 HTTP 的 RPC 方法没有什么不同, 但您应该了解 HTTP/2 协议中, 请求不需要在响应到达之前结束。 这意味着您的通信通道是全双工的。 全双工可以让您减少响应时延并提高吞吐量。

一个简单的Hello World

为了开始您的第一个 hello world 示例,需要定义协议。gRPC 要求您使用 protobuffer 格式定义协议。

syntax = "proto3";

option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;

// 定义问候服务。
service Greeter {
 // Sends a greeting
 rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// 包含用户名的请求消息。
message HelloRequest {
 string name = 1;
}

// 包含问候语的响应消息
message HelloReply {
 string message = 1;
}

这是一个非常简单的示例,展示了单请求,单响应模式。

编译 RPC 定义

我们需要编译上面的定义。

如果您 愿意, 可以使用 protoc 编译器编译 proto 文件,或将其集成到构建中。

如您使用 Apache Maven,则需要添加插件:

<plugin>
 <groupId>org.xolstice.maven.plugins</groupId>
 <artifactId>protobuf-maven-plugin</artifactId>
 <version>0.6.1</version>
 <configuration>
   <protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
   <pluginId>grpc-java</pluginId>
   <pluginArtifact>io.grpc:protoc-gen-grpc-java:${vertx.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
   <protocPlugins>
     <protocPlugin>
       <id>vertx-grpc-protoc-plugin</id>
       <groupId>io.vertx</groupId>
       <artifactId>vertx-grpc-protoc-plugin</artifactId>
       <version>${stack.version}</version>
       <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
     </protocPlugin>
   </protocPlugins>
 </configuration>
 <executions>
   <execution>
     <id>compile</id>
     <configuration>
       <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
       <clearOutputDirectory>false</clearOutputDirectory>
     </configuration>
     <goals>
       <goal>compile</goal>
       <goal>compile-custom</goal>
     </goals>
   </execution>
   <execution>
     <id>test-compile</id>
     <goals>
       <goal>test-compile</goal>
       <goal>test-compile-custom</goal>
     </goals>
   </execution>
 </executions>
</plugin>

${os.detected.classifier} 属性使该构建独立于操作系统,在 OSX 上,它将会被 OSX-x86_64 所替代,其它操作系统以此类推。 要使用它,您需要在您的 pom.xmlbuild 部分中添加 os maven 插件 [https://github.com/trustin/os-maven-plugin] :

<build>
 ...
 <extensions>
   <extension>
     <groupId>kr.motd.maven</groupId>
     <artifactId>os-maven-plugin</artifactId>
     <version>1.4.1.Final</version>
   </extension>
 </extensions>
 ...
</build>

这个插件将编译您在 src/main/proto 下的proto文件,并将其提供给您的项目。

如果您使用Gradle,您需要添加插件:

...
apply plugin: 'com.google.protobuf'
...
buildscript {
 ...
 dependencies {
   // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
   classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
 }
}
...
protobuf {
 protoc {
   artifact = 'com.google.protobuf:protoc:3.2.0'
 }
 plugins {
   grpc {
     artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
   }
   vertx {
     artifact = "io.vertx:vertx-grpc-protoc-plugin:${vertx.grpc.version}"
   }
 }
 generateProtoTasks {
   all()*.plugins {
     grpc
     vertx
   }
 }
}

该插件将编译您在 build/generated/source/proto/main 下的 proto 文件,并将其提供给您的项目

gRPC 服务器

现在您应该已经编写了 RPC 基本代码,是实现您的服务端的时候了。 您应该记得,我们在上面描述过,我们的服务端应该实现一个 sayHello 方法,该方法接收 HelloRequest 对象并返回 HelloReply 对象. 因此您可以实现为:

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(
    HelloRequest request,
    StreamObserver<HelloReply> responseObserver) {

    responseObserver.onNext(
      HelloReply.newBuilder()
        .setMessage(request.getName())
        .build());
    responseObserver.onCompleted();
  }
};

只要您愿意,您就可以在服务端上提供您的服务。 Vert.x 创建服务非常简单, 您需要添加:

VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(service)
  .build();

// 异步启动
rpcServer.start();

使用Vert.x future 和 streams

上述例子是使用 gRPC 异步构造, 如 io.grpc.stub.StreamObserver 异步处理gRPC服务。此代码由 protoc 编译器生成。

插件配置如下:

<protocPlugin>
 <id>vertx-grpc-protoc-plugin</id>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-protoc-plugin</artifactId>
 <version>${stack.version}</version>
 <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>

这将生成一个使用 Vert.x 异步构造的服务,例如 FutureReadStreamWriteStream 的服务, 在Vert.x体系中使用更方便。

VertxGreeterGrpc.GreeterVertxImplBase service =
  new VertxGreeterGrpc.GreeterVertxImplBase() {
    @Override
    public Future<HelloReply> sayHello(HelloRequest request) {
      return Future.succeededFuture(
        HelloReply.newBuilder()
          .setMessage(request.getName())
          .build());
    }
  };

服务端 gzip 压缩

您可以启用 gzip 压缩来告诉服务端发送压缩响应 (压缩请求由服务端自动处理)。

VertxGreeterGrpc.GreeterVertxImplBase service =
  new VertxGreeterGrpc.GreeterVertxImplBase() {
    @Override
    public Future<HelloReply> sayHello(HelloRequest request) {
      return Future.succeededFuture(
        HelloReply.newBuilder()
          .setMessage(request.getName())
          .build());
    }
  }
    .withCompression("gzip");

withCompression 配置由 Vert.x gRPC protoc 插件生成。 您也可以通过将 ResponseObserver 转换为 ServerCallStreamObserver 并在发送响应之前调用 setCompression 来启用默认服务的压缩。

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(
    HelloRequest request,
    StreamObserver<HelloReply> responseObserver) {

    ((ServerCallStreamObserver) responseObserver)
      .setCompression("gzip");

    responseObserver.onNext(
      HelloReply.newBuilder()
        .setMessage(request.getName())
        .build());

    responseObserver.onCompleted();
  }
};
注意
您也可以使用其它压缩器,只要服务器支持,在构建 ManagedChannel 时在压缩器注册表里注册它们即可

SSL 配置

前面的示例很简单,但是您的RPC不安全。为了使其安全,我们应该启用SSL/TLS:

VertxServerBuilder builder = VertxServerBuilder.forPort(vertx, 8080)
  .useSsl(options -> options
    .setSsl(true)
    .setUseAlpn(true)
    .setKeyStoreOptions(new JksOptions()
      .setPath("server-keystore.jks")
      .setPassword("secret")));

恭喜您刚刚完成第一个gRPC服务端。

重要
由于 gRPC 使用 HTTP/2 传输,SSL/TLS 设置需要在您的服务端中开展 应用层协议协商

Server 扩展

当您部署多实例的 verticles 时, gRPC 服务端将按 verticle event-loops 缩放。

vertx.deployVerticle(

  // Verticle 供应器 - 应该被调用 4 次
  () -> new AbstractVerticle() {

    BindableService service = new GreeterGrpc.GreeterImplBase() {
      @Override
      public void sayHello(
        HelloRequest request,
        StreamObserver<HelloReply> responseObserver) {

        responseObserver.onNext(
          HelloReply.newBuilder()
            .setMessage(request.getName())
            .build());

        responseObserver.onCompleted();
      }
    };

    @Override
    public void start() throws Exception {
      VertxServerBuilder
        .forAddress(vertx, "my.host", 8080)
        .addService(service)
        .build()
        .start();
    }
  },

  // 部署 4 个实例,即服务在 4 个事件循环上扩展
  new DeploymentOptions()
    .setInstances(4));

阻塞型服务端拦截器

gRPC 服务端拦截器是一种机制, 用于传入的调用在发送到服务之前拦截它们。 它具有同步行为,将在 Vert.x event loop 上执行。

VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, myInterceptor))
  .build();

假设我们有一个拦截器来阻塞event loop:

class MyInterceptor implements ServerInterceptor {
  @Override
  public <Q, A> ServerCall.Listener<Q> interceptCall(
    ServerCall<Q, A> call, Metadata headers, ServerCallHandler<Q, A> next) {
    // 做一些困难的事情并更新元数据,例如
    return next.startCall(call, headers);
  }
}
MyInterceptor myInterceptor = new MyInterceptor();

为了避免阻塞,应该包装拦截器。然后它就会Vert.x工作线程中调用。

ServerInterceptor wrapped =
  BlockingServerInterceptor.wrap(vertx, myInterceptor);

// 创建服务器
VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, wrapped))
  .build();

// 将其启动
rpcServer.start();

服务端上下文拦截器

服务端上下文拦截器的抽象可用于拦截服务端调用并将元数据提取到 vert.x 上下文中。 此上下文不依赖于本地线程变量,因此在 vert.x API 上是正常使用的。这个拦截器应该是第一个 (或要添加到拦截器列表中的第一个)。

一个典型的使用会话id的例子。 客户端可以创建一个客户端拦截器来设置所有的会话id连接:

Metadata extraHeaders = new Metadata();
extraHeaders.put(
  Metadata.Key.of("sessionId", Metadata.ASCII_STRING_MARSHALLER), theSessionId);

ClientInterceptor clientInterceptor = MetadataUtils
  .newAttachHeadersInterceptor(extraHeaders);

channel = VertxChannelBuilder.forAddress(vertx, "localhost", port)
  .intercept(clientInterceptor)
  .build();

然后在服务器端可以按如下方式添加拦截器:

BindableService service = new VertxGreeterGrpc.GreeterVertxImplBase() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(
      HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

ServerInterceptor contextInterceptor = new ContextServerInterceptor() {
  @Override
  public void bind(Metadata metadata) {
    put("sessionId", metadata.get(SESSION_ID_METADATA_KEY));
  }
};

// 创建服务器
VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, contextInterceptor))
  .build();

gRPC 客户端

服务器没有客户端可不行,所以我们来创建一个客户端。实现过程中一些步骤与服务器端类似。 首先,我们需要有 RPC 定义,这应该已经完成,否则将没有服务器端, 并且应该编译相同的定义。

请注意,编译器将始终生成基础服务端和客户端存根,因此如果您已经编译过一次, 则不需要再次编译。

每个客户端存根总是需要一个到服务端的通信通道,所以我们首先需要创建一个gRPC通道:

ManagedChannel channel = VertxChannelBuilder
  .forAddress(vertx, "localhost", 8080)
  .usePlaintext()
  .build();

// 获取调用远程服务的存根
GreeterGrpc.GreeterStub stub = GreeterGrpc.newStub(channel);

一旦创建了存根,我们就可以和我们的服务端通信了,这次就容易多了, 因为存根已经提供了正确的方法定义和参数类型:

HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();

// 调用远程服务
stub.sayHello(request, new StreamObserver<HelloReply>() {
  private HelloReply helloReply;

  @Override
  public void onNext(HelloReply helloReply) {
    this.helloReply = helloReply;
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Coult not reach server " + throwable.getMessage());
  }

  @Override
  public void onCompleted() {
    System.out.println("Got the server response: " + helloReply.getMessage());
  }
});

使用 Vert.x future 和 streams

上一个例子是使用 gRPC 异步构造, 如使用 io.grpc.stub.StreamObserver 异步处理 gRPC 客户端。 此代码由 protoc 编译器生成。

插件配置如下:

<protocPlugin>
 <id>vertx-grpc-protoc-plugin</id>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-protoc-plugin</artifactId>
 <version>${stack.version}</version>
 <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>

这将生成一个使用 Vert.x 异步构造的客户端,例如 FutureReadStreamWriteStream 客户端服务, 在Vert.x体系中使用更方便。

HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();

// 调用远程服务
Future<HelloReply> future = stub.sayHello(request);

// 监听完成事件
future
  .onSuccess(helloReply -> System.out.println("Got the server response: " + helloReply.getMessage())).onFailure(err -> System.out.println("Coult not reach server " + err));

客户端 gzip 压缩

您可以启用gzip压缩来告诉客户端发送压缩后的消息。

GreeterGrpc.GreeterStub stub = GreeterGrpc
  .newStub(channel)
  .withCompression("gzip");
注意
只要服务端支持其他压缩器,并且在构建 ManagedChannel 时在压缩器注册表注册它们,就可以使用

SSL 配置

如果您之前启用了 SSL,您的客户端也需要 SSL,为此我们需要配置通道:

ManagedChannel channel = VertxChannelBuilder.
  forAddress(vertx, "localhost", 8080)
  .useSsl(options -> options
    .setSsl(true)
    .setUseAlpn(true)
    .setTrustStoreOptions(new JksOptions()
      .setPath("client-truststore.jks")
      .setPassword("secret")))
  .build();
重要
由于gRPC使用 HTTP/2 传输,SSL/TLS 设置需要在您的客户端中实行 应用层协议协商

高级的配置

到目前为止,所有gRPC示例都使用了合理的默认值,但还有更多,如果您需要完全控制服务端配置, 您应该参考文档:VertxServerBuilder, 或者如果您需要控制您的客户端通道:VertxChannelBuilder。 Vert.x gRPC 扩展了 grpc-java 项目(Netty transport), 因此推荐阅读 文档

原生传输

客户端和服务端可以使用 Netty 的原生传输, 这是在创建 Vert.x 实例时设置的。

Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));

有关原生传输的更多信息,请参阅 Vert.x Core 文档。