Vert.x gRPC

可以在维基百科看到对gRPC全面的介绍

gRPC是谷歌开源的远程方法调用(RPC)系统。 它基于 HTTP/2 传输协议和 ProtoBuffer 接口描述语言, 提供认证,双向流,流量控制,阻塞和非阻塞的调用桩绑定,以及接口调用的撤销、超时等功能和特性。 它为很多语言生成跨平台客户端和服务端。

— wikipedia
wikipedia

Vert.x gRPC 是将谷歌 gRPC 编程风格与 Vert.x 编程风格统一的模块。 使用本模块,您可以在保留 Vert.x Streams 和 Futures 编码风格的同时享受 gRPC 原有的各种特性和优势。

更多关于 gRPC 的信息请查看官方文档 http://www.grpc.io/

警告
本模块是 Vert.x 技术栈自 4.3 版本后对 gRPC 支持的新实现, 基于 gRPC Netty 实现的老模块依然保留,更名为Vert.x gRPC Netty ,相关链接:https://vertx.io/docs/vertx-grpc-netty/java/ 。 本模块处于技术前瞻状态,接口定义可能在后续版本中进行改动

Vert.x gRPC 分为两个部分

  • Vert.x gRPC Server

  • Vert.x gRPC Client

  • Vert.x gRPC Context Storage

Vert.x gRPC 服务器

Vert.x gRPC Server 是一个新的 gRPC 服务端, 内核部分使用 Vert.x HTTP server 替换了原有的 grpc Netty Server

此服务端不但提供面向 请求/响应 式的服务端接口,也提供服务桥接的接口调用桩

集成 Vert.x gRPC 服务器

在项目中添加依赖

  • Maven(在您的 pom.xml 文件中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-server</artifactId>
 <version>4.4.0</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中):

dependencies {
 compile 'io.vertx:vertx-grpc-server:4.4.0'
}

gRPC 请求/响应 服务端接口

gRPC 请求/响应 服务端接口可以使用客户端直接进行调用,而不依赖于调用桩

GrpcServer 就是一个 Handler<HttpServerRequest> 并且像 HTTP 服务请求处理器一样使用。

GrpcServer grpcServer = GrpcServer.server(vertx);

HttpServer server = vertx.createHttpServer(options);

server
  .requestHandler(grpcServer)
  .listen();
提示
GrpcServer 可以挂载到 Vert.x Web 路由上

请求/响应

每个服务中的接口都由一个处理器处理

server.callHandler(GreeterGrpc.getSayHelloMethod(), request -> {

  request.handler(hello -> {

    GrpcServerResponse<HelloRequest, HelloReply> response = request.response();

    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();

    response.end(reply);
  });
});

流式请求

您可以设置处理器处理请求事件

server.callHandler(StreamingGrpc.getSinkMethod(), request -> {
  request.handler(item -> {
    // Process item
  });
  request.endHandler(v ->{
    // No more items
    // Send the response
    request.response().end(Empty.getDefaultInstance());
  });
  request.exceptionHandler(err -> {
    // Something wrong happened
  });
});

流式响应

流式响应使用 write 发送每条响应消息元素 使用 end 结束流

server.callHandler(StreamingGrpc.getSourceMethod(), request -> {
  GrpcServerResponse<Empty, Item> response = request.response();
  request.handler(empty -> {
    for (int i = 0;i < 10;i++) {
      response.write(Item.newBuilder().setValue("1").build());
    }
    response.end();
  });
});

双向流

一个双向流可以用一个流式请求和一个流式响应简单的组合起来

server.callHandler(StreamingGrpc.getPipeMethod(), request -> {

  request.handler(item -> request.response().write(item));
  request.endHandler(v -> request.response().end());
});

流控

使用 Vert.x streams 对请求和响应进行背压控制

您可以 暂停/恢复/同步 一个请求

request.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  request.resume();
});

您可以检查响应的写能力,然后设置恢复处理器

if (response.writeQueueFull()) {
  response.drainHandler(v -> {
    // Writable again
  });
} else {
  response.write(item);
}

压缩

您可在发送消息 之前 设置压缩算法,用于压缩消息

response.encoding("gzip");

// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());

解压缩

解压缩在服务端自动进行 (译者注:gRPC只内置了gzip,如果要使用其他压缩算法需要在客户端和服务端同时进行扩展)

调用桩接口

Vert.x gRPC Server 提供了传统的使用 gRPC 通道的调用桩接口

GrpcServer grpcServer = GrpcServer.server(vertx);

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
    responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
    responseObserver.onCompleted();
  }
};

// Bind the service bridge in the gRPC server
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(service);
serverStub.bind(grpcServer);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

消息级接口

服务端提供了消息级别的接口用于直接处理 protobuf 编码的 gRPC 消息

提示
服务端消息级接口可以和客户端消息级接口一起使用构建一个 gRPC 反向代理

如果您对消息的内容不感兴趣,而是想将消息转发到其他服务, 比方说您在写一个代理,这些接口就十分有用。

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.handler(protoHello -> {
      // Handle protobuf encoded hello
      performAsyncOperation(protoHello)
        .onSuccess(protoReply -> {
          // Reply with protobuf encoded reply
          request.response().end(protoReply);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

您也可以使用 messageHandler 处理 GrpcMessage ,这些消息会保留客户端的编码, 如果您想直接转发压缩后的消息就非常有用, 可以避免二次解压缩和压缩。

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.messageHandler(helloMessage -> {

      // Can be identity or gzip
      String helloEncoding = helloMessage.encoding();

      // Handle hello message
      handleGrpcMessage(helloMessage)
        .onSuccess(replyMessage -> {
          // Reply with reply message

          // Can be identity or gzip
          String replyEncoding = replyMessage.encoding();

          // Send the reply
          request.response().endMessage(replyMessage);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

writeMessageendMessage 将处理这些消息编码:

  • 如果消息使用服务端编码,将原样发出

  • 如果消息使用一个不同的编码,它将会重新编码,例如:压缩和解压缩

Vert.x gRPC 客户端

Vert.x gRPC Client 是一个新的 gRPC 客户端,其内核部分使用 Vert.x HTTP Client 替换了原有的 gRPC Netty client

此客户端不但提供面向 请求/响应 式的客户端接口,也提供 gRPC Channel 方式的接口调用桩

集成 Vert.x gRPC 客户端

在项目中添加依赖

  • Maven(在您的 pom.xml 文件中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-client</artifactId>
 <version>4.4.0</version>
</dependency>
  • Gradle(在您的`build.gradle` 文件中):

dependencies {
 compile 'io.vertx:vertx-grpc-client:4.4.0'
}

gRPC 请求/响应 客户端接口

gRPC 请求/响应 客户端接口可以对服务直接进行调用,而不依赖于调用桩

您可以快速的创建客户端实例

GrpcClient client = GrpcClient.client(vertx);

请求/响应

创建和发送请求

SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
MethodDescriptor<HelloRequest, HelloReply> sayHelloMethod = GreeterGrpc.getSayHelloMethod();
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {
  // The end method calls the service
  request.end(HelloRequest.newBuilder().setName("Bob").build());
});

response 持有响应 last 持有结果

request.response().onSuccess(response -> {
  Future<HelloReply> fut = response.last();
  fut.onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
});

Future 的组合接口可以将所有步骤组合在一起形成流式编程风格

client
  .request(server, GreeterGrpc.getSayHelloMethod()).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

流式请求

流式请求使用 write 发送每条请求消息元素 使用 end 结束流

client
  .request(server, StreamingGrpc.getSinkMethod())
  .onSuccess(request -> {
  for (int i = 0;i < 10;i++) {
    request.write(Item.newBuilder().setValue("1").build());
  }
  request.end();
});

流式响应

您可以设置处理器去处理每个响应事件

client
  .request(server, StreamingGrpc.getSourceMethod())
  .compose(request -> {
    request.end(Empty.getDefaultInstance());
    return request.response();
  })
  .onSuccess(response -> {
    response.handler(item -> {
      // Process item
    });
    response.endHandler(v -> {
      // Done
    });
    response.exceptionHandler(err -> {
      // Something went bad
    });
  });

双向流

一个双向流可以用一个流式请求和一个流式响应简单的组合起来

流控

使用 Vert.x streams 进行背压控制

您可以检查请求的写能力,然后设置恢复处理器

if (request.writeQueueFull()) {
  request.drainHandler(v -> {
    // Writable again
  });
} else {
  request.write(item);
}

您可以 暂停/恢复/同步 响应

response.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  response.resume();
});

取消指令

您可以使用 cancel 取消一个请求

request.cancel();
注意
取消指令会发送一个 HTTP/2 reset 帧到服务端

压缩

在发送消息 之前 设置压缩算法,用于消息压缩

request.encoding("gzip");

// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());

解压缩

解压缩在服务端自动进行(译者注:gRPC只内置了gzip,如果要使用其他压缩算法需要在客户端和服务端同时进行扩展)。

调用桩接口

Vert.x gRPC Client 提供了传统的使用 gRPC 通道的调用桩 API

GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));

GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel);

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), new StreamObserver<HelloReply>() {
  @Override
  public void onNext(HelloReply value) {
    // Process response
  }
  @Override
  public void onCompleted() {
    // Done
  }
  @Override
  public void onError(Throwable t) {
    // Something went bad
  }
});

消息级接口

客户端提供了消息级别的接口用于直接处理 protobuf 编码的 gRPC 消息

提示
客户端消息级接口可以和服务端消息级接口一起使用构建一个 gRPC 反向代理

如果您对消息的内容不感兴趣,而是想将消息转发到其他服务, 比方说您在写一个代理,这些接口就十分有用。

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.end(protoHello);

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.handler(protoReply -> {
      // Handle the protobuf reply
    });
  });
});

您也可以使用 messageHandler 去处理 GrpcMessage ,这些消息会保留服务端的编码。

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.endMessage(GrpcMessage.message("identity", protoHello));

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.messageHandler(replyMessage -> {
      System.out.println("Got reply message encoded as " + replyMessage.encoding());
    });
  });
});

writeMessageendMessage 将处理这些消息编码:

  • 如果消息使用服务端编码,将原样发出

  • 如果消息使用一个不同的编码, 它将会重新编码,例如压缩和解压缩

Unresolved directive in index.adoc - include::storage.adoc[]