Vert.x gRPC

可以在维基百科看到对gRPC全面的介绍

gRPC是一个由谷歌开发的开源远程方法调用(RPC)系统。
它用HTTP/2作为传输协议,以协议Buffer作为接口描述语言,
它提供一些特性,例如认证、双向streaming和flow的控制,
阻塞或非阻塞绑定,和撤销、超时操作。
它为很多语言生成跨平台客户端和服务端。

Vert.x gRPC是一个将谷歌风格gRPC编程风格对应到Vert.x风格的模块。作为这个模块的用户, 您会更加习惯于用Vert.x Streams和Futures的编码风格从而体验到 gRPC的好处。

更多关于gRPC的信息请查看官方文档 http://www.grpc.io/

Vert.x gRPC 额外支持:

  • 用Verticle横向扩展gRPC服务

  • 非阻塞的本地传输

gRPC类型

用gRPC,您将受益于HTTP/2协议,这意味着您会获得异步流的支持, 同时您的远程方法调用会有如下特性:

  • 客户端发出stream请求,服务端响应单个对象

  • 客户端发出stream请求,服务端响应stream

  • 客户端发送单个对象请求,服务端响应单个对象

  • 客户端发送单个对象请求,服务端响应stream

尽管对于非专业认识来讲,这也许看起来和其他基于HTTP协议实现的RPC没有区别,但您需要注意一点, 那就是在HTTP/2协议下,在响应到达之前您的请求不必完成。这意味着 您的通信通道是全双工的。全双工允可以减小响应延迟并且增强应用的响应能力。

一个简单的 Hello World

为了开始您的第一个Hello World示例,您需要定义协议。gRPC要求 用 protobuffer 格式来定义协议。

syntax = "proto3";

option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;

// 服务定义。
service Greeter {
 // 发送请求
 rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// 包含用户名字的请求消息
message HelloRequest {
 string name = 1;
}

// 包含请求信息的响应消息
message HelloReply {
 string message = 1;
}

这是一个非常普通的示例,用来展示单次请求单次响应。

编译RPC的定义

要用前述的定义,那么我们需要将其编译。

如果您 喜欢 ,您可以用 protoc 编译器来编译原始文件,您也可以添加如下依赖:

如果您用Apache Maven,您需要添加如下插件:

<plugin>
 <groupId>org.xolstice.maven.plugins</groupId>
 <artifactId>protobuf-maven-plugin</artifactId>
 <version>0.6.1</version>
 <configuration>
   <protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
   <pluginId>grpc-java</pluginId>
   <pluginArtifact>io.grpc:protoc-gen-grpc-java:${vertx.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
   <protocPlugins>
     <protocPlugin>
       <id>vertx-grpc-protoc-plugin</id>
       <groupId>io.vertx</groupId>
       <artifactId>vertx-grpc-protoc-plugin</artifactId>
       <version>${stack.version}</version>
       <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
     </protocPlugin>
   </protocPlugins>
 </configuration>
 <executions>
   <execution>
     <id>compile</id>
     <configuration>
       <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
       <clearOutputDirectory>false</clearOutputDirectory>
     </configuration>
     <goals>
       <goal>compile</goal>
       <goal>compile-custom</goal>
     </goals>
   </execution>
   <execution>
     <id>test-compile</id>
     <goals>
       <goal>test-compile</goal>
       <goal>test-compile-custom</goal>
     </goals>
   </execution>
 </executions>
</plugin>

${os.detected.classifier} 属性在构建时用来区分操作系统,在苹果系统(OSX)中, 需要替换为 osx-x86_64 ,其他同理。要使用它,您需要在 pom.xml 文件中添加 os-maven-plugin 插件:

<build>
 ...
 <extensions>
   <extension>
     <groupId>kr.motd.maven</groupId>
     <artifactId>os-maven-plugin</artifactId>
     <version>1.4.1.Final</version>
   </extension>
 </extensions>
 ...
</build>

这个插件会编译 src/main/proto 目录下的原始文件,并且使其对您的项目可用。

如果您正在用Gradle,那么您需要加入如下依赖:

...
apply plugin: 'com.google.protobuf'
...
buildscript {
 ...
 dependencies {
   // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
   classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
 }
}
...
protobuf {
 protoc {
   artifact = 'com.google.protobuf:protoc:3.2.0'
 }
 plugins {
   grpc {
     artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
   }
   vertx {
     artifact = "io.vertx:vertx-grpc-protoc-plugin:${vertx.grpc.version}"
   }
 }
 generateProtoTasks {
   all()*.plugins {
     grpc
     vertx
   }
 }
}

这个插件会编译 build/generated/source/proto/main 目录下的原始文件,然后使其对项目可用。

gRPC 服务

您已经拥有您的RPC基础代码,现在该实现您自己的服务器了。由前述可知, 我们的服务需要实现 sayHello 方法,该方法接收 HelloRequest 对象,然后返回 一个 HelloReply 对象,所以 您可以像如下实现:

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(
    HelloRequest request,
    StreamObserver<HelloReply> responseObserver) {

    responseObserver.onNext(
      HelloReply.newBuilder()
        .setMessage(request.getName())
        .build());
    responseObserver.onCompleted();
  }
};

如果您愿意,您可以让它在服务上可用。Vert.x使服务的创建变得很简单, 您只需要添加如下代码:

VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(service)
  .build();

// 异步启动
rpcServer.start();

使用Vert.x future和streams

前述示例通过gRPC异步架构( 例如 io.grpc.stub.StreamObserver )进行异步处理的方式来使用gRPC服务。 这些代码由 protoc 编译器生成。

上述插件的配置作用于以下插件。

<protocPlugin>
 <id>vertx-grpc-protoc-plugin</id>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-protoc-plugin</artifactId>
 <version>${stack.version}</version>
 <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>

它用Vert.x的异步架构( 例如 FutureReadStreamWriteStream )生成服务版本,这样在Vert.x生态中更加方便。

VertxGreeterGrpc.GreeterVertxImplBase service =
  new VertxGreeterGrpc.GreeterVertxImplBase() {
    @Override
    public Future<HelloReply> sayHello(HelloRequest request) {
      return Future.succeededFuture(
        HelloReply.newBuilder()
          .setMessage(request.getName())
          .build());
    }
  };

服务 gzip 压缩

您可以启用gzip压缩来告诉服务端来返回压缩的响应 (服务器会自动处理压缩过的请求)

VertxGreeterGrpc.GreeterVertxImplBase service =
  new VertxGreeterGrpc.GreeterVertxImplBase() {
    @Override
    public Future<HelloReply> sayHello(HelloRequest request) {
      return Future.succeededFuture(
        HelloReply.newBuilder()
          .setMessage(request.getName())
          .build());
    }
  }
    .withCompression("gzip");

withCompression 配置是由Vert.x gRPC protoc 插件生成。 您也可以在默认服务上启用压缩功能 (将 ResponseObserver 转换成 ServerCallStreamObserver 并在响应之前调用 setCompression

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(
    HelloRequest request,
    StreamObserver<HelloReply> responseObserver) {

    ((ServerCallStreamObserver) responseObserver)
      .setCompression("gzip");

    responseObserver.onNext(
      HelloReply.newBuilder()
        .setMessage(request.getName())
        .build());

    responseObserver.onCompleted();
  }
};
注意
只要服务端支持,您可以使用任何其他的压缩器,在构建 ManagedChannel 时注册它们。

SSL 配置

前一个例子很简单,但您的RPC并不安全。为了让RPC更安全,我们应该启用SSL/TLS:

VertxServerBuilder builder = VertxServerBuilder.forPort(vertx, 8080)
  .useSsl(options -> options
    .setSsl(true)
    .setUseAlpn(true)
    .setKeyStoreOptions(new JksOptions()
      .setPath("server-keystore.jks")
      .setPassword("secret")));

恭喜您拥有了第一个gRPC服务。

重要
因为gRPC用HTTP/2作为传输协议,SSL/TLS服务的启用必须拥有 Application-Layer Protocol Negotiation

服务横向扩展

如果你将一个Verticle部署了多个实例,gRPC服务将 在verticle event-loops上横向扩展。

vertx.deployVerticle(

  // Verticle supplier - 被调用了4次
  () -> new AbstractVerticle() {

    BindableService service = new GreeterGrpc.GreeterImplBase() {
      @Override
      public void sayHello(
        HelloRequest request,
        StreamObserver<HelloReply> responseObserver) {

        responseObserver.onNext(
          HelloReply.newBuilder()
            .setMessage(request.getName())
            .build());

        responseObserver.onCompleted();
      }
    };

    @Override
    public void start() throws Exception {
      VertxServerBuilder
        .forAddress(vertx, "my.host", 8080)
        .addService(service)
        .build()
        .start();
    }
  },

  // 部署4个实例,即服务以4个eventloop的形式做了横向扩展。
  new DeploymentOptions()
    .setInstances(4));

BlockingServerInterceptor

gRPC ServerInterceptor 是一个机制, 该机制在向服务端发起请求之前拦截该方法调用。 它有着同步的行为并且在Vert.x event loop上执行。

VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, myInterceptor))
  .build();

假设我们有一个拦截器,它阻塞了eventloop:

class MyInterceptor implements ServerInterceptor {
  @Override
  public <Q, A> ServerCall.Listener<Q> interceptCall(
    ServerCall<Q, A> call, Metadata headers, ServerCallHandler<Q, A> next) {
    // 例如做一些复杂操作并更新元数据。
    return next.startCall(call, headers);
  }
}
MyInterceptor myInterceptor = new MyInterceptor();

为了避免阻塞,您应该包装这个拦截器,让它在Vert.x的worker线程上执行。

ServerInterceptor wrapped =
  BlockingServerInterceptor.wrap(vertx, myInterceptor);

// 创建服务
VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, wrapped))
  .build();

// 开启
rpcServer.start();

Context Server Interceptor

一个 abstract context server interceptor 允许拦截向服务发起的请求并提取元数据 到Vert.x context。这个Context不依赖于thread locals,所以使用Vert.x API是安全的。 这个拦截器应该在首位(或者首先添加到拦截器列表的其中之一)

session id 则是一个典型的用法。一个客户端可以创建客户端拦截器并向所有连接设置一个session id:

Metadata extraHeaders = new Metadata();
extraHeaders.put(
  Metadata.Key.of("sessionId", Metadata.ASCII_STRING_MARSHALLER), theSessionId);

ClientInterceptor clientInterceptor = MetadataUtils
  .newAttachHeadersInterceptor(extraHeaders);

channel = VertxChannelBuilder.forAddress(vertx, "localhost", port)
  .intercept(clientInterceptor)
  .build();

在服务端一侧,可以像如下添加拦截器:

BindableService service = new VertxGreeterGrpc.GreeterVertxImplBase() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(
      HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

ServerInterceptor contextInterceptor = new ContextServerInterceptor() {
  @Override
  public void bind(Metadata metadata) {
    put("sessionId", metadata.get(SESSION_ID_METADATA_KEY));
  }
};

// 创建服务
VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, contextInterceptor))
  .build();

gRPC 客户端

没有客户端的服务端是没用的,所以我们创建一个客户端。创建客户端和创建服务端的步骤有重叠。 首先我们需要预先有一个RPC的定义,否则就不会有服务端,然后这个相同的定义会被编译。

请注意:编译器即生成基本服务也会生成客户端存根,所以如果您已经编译了一次,那么您就无需再次编译。

每一个客户端存根都必须有一个服务端通信channel相对应, 所以首先我们需要创建一个gRPC channel:

ManagedChannel channel = VertxChannelBuilder
  .forAddress(vertx, "localhost", 8080)
  .usePlaintext()
  .build();

// 获取一个存根来与远程服务交互
GreeterGrpc.GreeterStub stub = GreeterGrpc.newStub(channel);

一旦存根生成,我们可以和服务端进行交互,此时这会更加简单,因为存根已经提供了 正确的方法定义和正确的参数类型定义:

HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();

// 调用远程服务
stub.sayHello(request, new StreamObserver<HelloReply>() {
  private HelloReply helloReply;

  @Override
  public void onNext(HelloReply helloReply) {
    this.helloReply = helloReply;
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Coult not reach server " + throwable.getMessage());
  }

  @Override
  public void onCompleted() {
    System.out.println("Got the server response: " + helloReply.getMessage());
  }
});

使用 Vert.x 的 future 和 streams

前述示例用一个gRPC客户端基于gRPC异步架构(例如 io.grpc.stub.StreamObserver )来执行异步操作。 代码是由 protoc 编译器生成的。

上述配置作用于下列插件:

<protocPlugin>
 <id>vertx-grpc-protoc-plugin</id>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-protoc-plugin</artifactId>
 <version>${stack.version}</version>
 <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>

它用Vert.x的异步架构( 例如 FutureReadStreamWriteStream )生成客户端版本,这样在Vert.x生态中更加方便。

HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();

// Call the remote service
Future<HelloReply> future = stub.sayHello(request);

// Listen to completion events
future
  .onSuccess(helloReply -> System.out.println("Got the server response: " + helloReply.getMessage())).onFailure(err -> System.out.println("Coult not reach server " + err));

客户端gzip压缩

您可以启用gzip压缩来让客户端发送压缩消息。

GreeterGrpc.GreeterStub stub = GreeterGrpc
  .newStub(channel)
  .withCompression("gzip");
注意
只要客户端支持,您可以使用任何其他的压缩器,他们在构建 ManagedChannel 时被注册。

SSL 配置

如果您先前启用了SSL,那么您的客户端也必须用SSL,我们需要像如下配置channel:

ManagedChannel channel = VertxChannelBuilder.
  forAddress(vertx, "localhost", 8080)
  .useSsl(options -> options
    .setSsl(true)
    .setUseAlpn(true)
    .setTrustStoreOptions(new JksOptions()
      .setPath("client-truststore.jks")
      .setPassword("secret")))
  .build();
重要
因为gRPC用了 HTTP/2 作为传输协议,客户端SSL/TLS的启用必须有 Application-Layer Protocol Negotiation

高级配置

直到现在,所有的gRPC示例都使用标准的默认配置,但是还有更多的配置项。如果您需要完整控制服务端的配置, 那么您应该查阅文档: VertxServerBuilder , 如果您需要控制客户端channel,则查阅 VertxChannelBuilder 。 Vert.x gRPC继承了 grpc-java 项目(Netty传输),因此建议阅读其 文档

本地传输

客户端和服务端可以用Netty的本地传输来部署,这是在创建Vert.x实例时实现的。

Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));

了解更多本地传输的信息,请查阅 Vert.x Core 文档。