Vert.x 服务代理

当您编写 Vert.x 程序的时候,您也许想将某处独立服务功能提供给其他程序使用。 此时便使用服务代理。它可以让您在事件总线上发布您的 服务 , 任何 Vert.x 程序只要知道其 地址 即可使用该服务。

一个 服务 通过 Java 接口描述并遵循 异步 模式。 从本质上说,消息通过事件总线发送调用服务并获取响应。 但为了方便使用,它会生成一个 代理 ,您可直接使用该代理(用代理提供的服务接口 API)。

使用Vert.x服务代理

使用 Vert.x Service Proxies 之前, 您必须在您得项目当中添加 依赖

  • Maven(在您的 pom.xml 文件中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-service-proxy</artifactId>
 <version>4.4.0</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中):

compile 'io.vertx:vertx-service-proxy:4.4.0'

为了 实现 服务代理, 您还需要添加:

  • Maven(在您的 pom.xml 文件中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-codegen</artifactId>
 <version>4.4.0</version>
 <classifier>processor</classifier>
 <scope>provided</scope>
</dependency>
  • Gradle < 5 (在您的 build.gradle 文件中):

compileOnly 'io.vertx:vertx-codegen:4.4.0'
  • Gradle >= 5(在您的 build.gradle 文件中):

annotationProcessor 'io.vertx:vertx-codegen:4.4.0:processor'
annotationProcessor 'io.vertx:vertx-service-proxy:4.4.0'

注意:因为服务代理类是由代码自动生成的,因此每当您修改了 服务接口 您必须重新编译源码, 以重新生成代理类。

如果您要生成不同语言的代理类,您需要添加相应的依赖。例如, 生成 Groovy 语言的代理接口需要添加 vertx-lang-groovy 依赖

服务代理简介

让我看看服务代理它怎么用。 如果您在事件总线上公开 数据库服务 您可以执行下面的操作

JsonObject message = new JsonObject();

message
  .put("collection", "mycollection")
  .put("document", new JsonObject().put("name", "tim"));

DeliveryOptions options = new DeliveryOptions().addHeader("action", "save");

vertx.eventBus()
  .request("database-service-address", message, options)
  .onSuccess(msg -> {
    // 完成
  }).onFailure(err -> {
    // 失败
  });

当创建服务的时候,会有一定数量的样本代码在事件总线接收信息, 路由会找到合适的方法并在总线上返回结果

使用 Vert.x 服务代理时, 您可以使用代码生成避免编写重复的代码,从而集中精力编写服务

在您编写的Java接口上面打上 @ProxyGen 注解, 例如:

@ProxyGen
public interface SomeDatabaseService {

 // 几个工厂方法用于创建实例和代理
 static SomeDatabaseService create(Vertx vertx) {
   return new SomeDatabaseServiceImpl(vertx);
 }

 static SomeDatabaseService createProxy(Vertx vertx,
   String address) {
   return new SomeDatabaseServiceVertxEBProxy(vertx, address);
 }

// 此处是实际的服务操作……
void save(String collection, JsonObject document,
  Handler<AsyncResult<Void>> resultHandler);
}

您还需要编写 package-info.java 文件,位置处于接口定义包中。 这个包还需要 @ModuleGen 注解, 以便 Vert.x 代码生成器生成事件总线代理代码。

package-info.java
@io.vertx.codegen.annotations.ModuleGen(groupPackage = "io.vertx.example", name = "services", useFutures = true)
package io.vertx.example;

有了这个接口,Vert.x 会生成所有需要的用于在事件总线上访问您服务的模板代码, 同时也会生成对应的 调用端代理类(client side proxy) , 这样您的服务调用端就可以使用一个相当符合习惯的 API(译者注:即相同的服务接口)进行服务调用,而不是去手动地向事件总线发送消息。 不管您的服务实际依赖于哪个事件总线上(可能是在不同的机器上),调用端代理类都能正常工作。

也就是说,您可以通过以下方式进行服务调用:

SomeDatabaseService service = SomeDatabaseService
  .createProxy(vertx, "database-service-address");

// 保存数据到数据库,这里使用了代理
service.save(
  "mycollection",
  new JsonObject().put("name", "tim"),
  res2 -> {
    if (res2.succeeded()) {
      // 调用完毕
    }
  });

您也可以将多语言 API 生成功能(@VertxGen 注解)与 @ProxyGen 注解相结合, 用于生成其它 Vert.x 支持的 JVM 语言对应的服务代理 —— 这意味着您可以只用 Java 编写您的服务一次, 就可以在其他语言中以一种习惯的 API 风格进行服务调用,而完全不必管服务是在本地还是在哪个事件总线上。 想要利用多语言代码生成功能,不要忘记添加对应支持语言的依赖。

@ProxyGen // 生成服务代理
@VertxGen // 生成客户端
public interface SomeDatabaseService {
 // ...
}

异步接口

想要正确地生成服务代理类,服务接口 的设计必须遵循一些规则。 首先是需要遵循异步模式。 如果需要返回结果,对应的方法需要包含一个 Future<ResultType> 类型的返回值。 其中 ResultType 可以是另一种代理类型(所以一个代理类可以作为另一个代理类的工厂)。

例如:

@ProxyGen
public interface SomeDatabaseService {

// 一些用于创建服务实例和服务代理实例的工厂方法

static SomeDatabaseService create(Vertx vertx) {
  return new SomeDatabaseServiceImpl(vertx);
}

static SomeDatabaseService createProxy(Vertx vertx, String address) {
  return new SomeDatabaseServiceVertxEBProxy(vertx, address);
}

// 异步方法,仅通知调用是否完成,不返回结果
Future<Void> save(String collection, JsonObject document);

// 异步方法,包含JsonObject类型的返回结果
Future<JsonObject> findOne(String collection, JsonObject query);

// 创建连接
Future<MyDatabaseConnection> createConnection(String shoeSize);

}

以及:

@ProxyGen
@VertxGen
public interface MyDatabaseConnection {

void insert(JsonObject someData);

Future<Void> commit();

@ProxyClose
void close();
}

您可以通过声明一个特殊方法,并给其加上 @ProxyClose 注解来注销代理。 当此方法被调用时,代理实例被清除。

更多 服务接口 的限制会在下面详解。

带回调的异步接口

在 Vert.x 4.1 之前,服务异步接口由回调定义。

您仍然可以使用回调创建服务异步接口,使用以下模块声明:

package-info.java
@io.vertx.codegen.annotations.ModuleGen(groupPackage = "io.vertx.example", name = "services", useFutures = false)
package io.vertx.example;
注意
为向后兼容,useFutures 的默认值为 false,因此您也可以省略该声明

带有回调的服务异步接口如下所示:

@ProxyGen
public interface SomeDatabaseService {

 // 一个通知完成而没有结果的方法(void)
 void save(String collection, JsonObject document,
  Handler<AsyncResult<Void>> result);

 // 一个提供了结果的方法(一个 json 对象)
 void findOne(String collection, JsonObject query,
  Handler<AsyncResult<JsonObject>> result);

 // 创建连接
 void createConnection(String shoeSize,
  Handler<AsyncResult<MyDatabaseConnection>> resultHandler);

}

返回类型必须是以下之一:

  • void

  • @Fluent 并返回对该服务的引用(this):

@Fluent
SomeDatabaseService doSomething();

这是因为方法不允许阻塞,如果该服务是远程调用(remote)(译者注:远程意思是服务代理通过事件总线调用其他verticle中的服务), 那么无法在不阻塞的前提下立即返回结果。(译者注:因为远程调用需要消耗一定时间)

安全

服务代理可以使用简单的拦截器保障基本安全。 提供一个身份验证器,可以选择添加 Authorization 在这种情况下,AuthorizationProvider 是必须提提供的。 注意,身份认证的令牌从 auth-token 信息头获取。

SomeDatabaseService service = new SomeDatabaseServiceImpl();
// 注册处理器
new ServiceBinder(vertx)
  .setAddress("database-service-address")
  // 保护传输中的信息
  .addInterceptor(
    "action",
    // 使用JWT认证进行校验令牌
    AuthenticationInterceptor.create(
      JWTAuth.create(vertx, new JWTAuthOptions())))
  .addInterceptor(
    AuthorizationInterceptor.create(JWTAuthorization.create("permissions"))
      // 我们可以选择部分权限进行保护:
      // 比如admin组
      .addAuthorization(RoleBasedAuthorization.create("admin"))
      // 比如打印权限
      .addAuthorization(PermissionBasedAuthorization.create("print")))
  .register(SomeDatabaseService.class, service);

代码生成

被 @ProxyGen 注解的服务接口会触发生成对应的服务辅助类:

  • 服务代理类(service proxy):一个编译时产生的代理类,用 EventBus 通过消息与服务交互。

  • 服务处理器类(service handler): 一个编译时产生的 EventBus 处理器类,用于响应由服务代理发送的事件。

产生的服务代理和处理器的命名是在类名的后面加相关的字段,例如,如果一个服务接口名为 MyService, 则对应的处理器类命名为 MyServiceProxyHandler,对应的服务代理类命名为 MyServiceVertxEBProxy。

此外 Vert.x Core 提供了一个生成器用于数据转化器,以简化服务代理中数据对象的使用。 数据转化器要求数据对象提供一个以 JsonObject 为基础的构造器和 toJson() 方法

codegen 注释处理器在编译时生成这些类 它是Java编译器的功能 所以无需 额外步骤, 只需正确配置您的构建参数即可:

只需要在构建配置中加上 io.vertx:vertx-codegen:processorio.vertx:vertx-service-proxy 依赖。

这是一个针对 Maven 的配置示例:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-codegen</artifactId>
 <version>4.4.0</version>
 <classifier>processor</classifier>
</dependency>
<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-service-proxy</artifactId>
 <version>4.4.0</version>
</dependency>

此功能也可以在 Gradle 中使用:

compile "io.vertx:vertx-codegen:4.4.0:processor"
compile "io.vertx:vertx-service-proxy:4.4.0"

IDE 通常为注释处理器提供支持。

代码生成 处理器 分类器会把服务代理注释处理器的配置自动添加到 jar 包目录下的 META-INF/services 当中。

如果您想和其与常规 jar一起使用,但是需要显式声明注释处理器, 例如在 Maven 中:

<plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
   <annotationProcessors>
     <annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
   </annotationProcessors>
 </configuration>
</plugin>

公开服务

当您写好服务接口以后,执行构建操作以生成代码。 然后您需要将您的服务 注册 到事件总线上:

SomeDatabaseService service = new SomeDatabaseServiceImpl();
// 注册处理器
new ServiceBinder(vertx)
  .setAddress("database-service-address")
  .register(SomeDatabaseService.class, service);

这个过程既可以在 Verticle 中完成,也可以在您的代码的任何其它位置完成。

一旦注册了,这个服务就可用了。如果您的应用运行在集群上, 则集群中节点都可访问。

如果想注销这个服务, 使用 unregister 方法注销:

ServiceBinder binder = new ServiceBinder(vertx);

// 创建服务实现实例
SomeDatabaseService service = new SomeDatabaseServiceImpl();
// 注册处理器
MessageConsumer<JsonObject> consumer = binder
  .setAddress("database-service-address")
  .register(SomeDatabaseService.class, service);

// ....

// 销毁服务。
binder.unregister(consumer);

代理创建

现在服务已经公开, 现在可以消费使用它。 为此,您必须创建一个代理。 创建代理使用 ServiceProxyBuilder 类:

ServiceProxyBuilder builder = new ServiceProxyBuilder(vertx)
  .setAddress("database-service-address");

SomeDatabaseService service = builder.build(SomeDatabaseService.class);
// 设置其他属性:
SomeDatabaseService service2 = builder.setOptions(options)
  .build(SomeDatabaseService.class);

第二种构造通过 DeliveryOptions 构造实例, 您可以在其中配置属性(例如:超时)

或者,您也可以使用代理类。 这个代理名称为 服务接口 类目后追加 VertxEBProxy。 例如, 如果您的 服务接口 名为 SomeDatabaseService,那么代理类名为 SomeDatabaseServiceVertxEBProxy

一般来说, 服务接口 包含 createProxy 静态方法用于创建代理。 但这不是必须的:

@ProxyGen
public interface SomeDatabaseService {

// 静态方法创建代理。
static SomeDatabaseService createProxy(Vertx vertx, String address) {
  return new SomeDatabaseServiceVertxEBProxy(vertx, address);
}

// ...
}

错误处理

服务方法可能会通过向方法的处理器(Handler)传递一个失败状态的 Future (包含一个 ServiceException 实例。 一个 ServiceException 包含 int 类型的错误码、消息,以及一个可选的 JsonObject 对象用于传递额外信息。为了方便, ServiceException.fail 工厂方法来创建一个已经是失败状态并且包装着 ServiceException 实例的失败 Future 。例如:

public class SomeDatabaseServiceImpl implements SomeDatabaseService {

 private static final BAD_SHOE_SIZE = 42;
 private static final CONNECTION_FAILED = 43;

 // 创建连接
 public Future<MyDatabaseConnection> createConnection(String shoeSize) {
   if (!shoeSize.equals("9")) {
     return Future.failedFuture(ServiceException.fail(BAD_SHOE_SIZE, "The shoe size must be 9!",
       new JsonObject().put("shoeSize", shoeSize)));
    } else {
       return doDbConnection().recover(err -> Future.failedFuture(ServiceException.fail(CONNECTION_FAILED, result.cause().getMessage())));
    }
 }
}

服务调用端(客户端)可以检查它接收到的失败状态的 Future 包含的 Throwable 对象是否为 ServiceException 实例。 如果是的话,继续检查内部的特定的错误状态码。 调用端可以通过这些信息来将业务逻辑错误与系统错误(如服务没有被注册到事件总线上)区分开, 以便确定到底发生了哪一种业务逻辑错误。下面是一个例子:

public Future<JsonObject> foo(String shoeSize) {
 SomeDatabaseService service = SomeDatabaseService.createProxy(vertx, SERVICE_ADDRESS);
 server.createConnection("8")
   .compose(connection -> {
     // 正常调用。
     return doSuccessStuff(connection);
   })
   .recover(err -> {
     if (err instanceof ServiceException) {
       ServiceException exc = (ServiceException) err;
       if (exc.failureCode() == SomeDatabaseServiceImpl.BAD_SHOE_SIZE) {
         return Future.failedFuture(
           new InvalidInputError("You provided a bad shoe size: " +
             exc.getDebugInfo().getString("shoeSize")));
       } else if (exc.failureCode() == SomeDatabaseServiceImpl.CONNECTION) {
         return Future.failedFuture(new ConnectionError("Failed to connect to the DB"));
       }
     } else {
       // 必须是一个系统错误,如:服务代理没有对应的已注册的服务
       return Future.failedFuture(new SystemError("An unexpected error occurred: + " result.cause().getMessage()));
     }
   });
}

如果需要的话, 服务实现的时候也可以返回 ServiceException 子类, 只要向事件总线注册了对应的默认 MessageCodec 就可以了。 例如, 比如给定下面的 ServiceException 子类:

class ShoeSizeException extends ServiceException {
 public static final BAD_SHOE_SIZE_ERROR = 42;

 private final String shoeSize;

 public ShoeSizeException(String shoeSize) {
   super(BAD_SHOE_SIZE_ERROR, "In invalid shoe size was received: " + shoeSize);
   this.shoeSize = shoeSize;
 }

 public String getShoeSize() {
   return extra;
 }

 public static <T> Future<T> fail(int failureCode, String message, String shoeSize) {
   return Future.failedFuture(new MyServiceException(failureCode, message, shoeSize));
 }
}

只要向事件总线注册了对应的 MessageCodec , 服务就可以直接向调用者返回自定义的异常类型:

public class SomeDatabaseServiceImpl implements SomeDatabaseService {
 public SomeDataBaseServiceImpl(Vertx vertx) {
   // 注册服务,如果你是用事件总线使用本地模式,这就是全部
   // 因为代理端和服务端共享一个vert.x实例
 SomeDatabaseService service = SomeDatabaseService.createProxy(vertx, SERVICE_ADDRESS);
   vertx.eventBus().registerDefaultCodec(ShoeSizeException.class,
     new ShoeSizeExceptionMessageCodec());
 }

 // 创建连接
 Future<MyDatabaseConnection> createConnection(String shoeSize) {
   if (!shoeSize.equals("9")) {
     return ShoeSizeException.fail(shoeSize);
   } else {
     // 此处创建连接
     return Future.succeededFuture(myDbConnection);
   }
 }
}

最后调用端可以检查自定义的异常类型了:

public Future<JsonObject> foo(String shoeSize) {
 // 如果运行在集群模式当中,代码在不同的节点运行,
 // ShoeSizeExceptionMessageCodec 必须注册到
 // 该节点的Vertx当中
 SomeDatabaseService service = SomeDatabaseService.createProxy(vertx, SERVICE_ADDRESS);
 service.createConnection("8")
   .compose(connection -> {
     // 成功调用。
     return doSuccessStuff(connection);
   })
   .recover(err -> {
     if (result.cause() instanceof ShoeSizeException) {
       ShoeSizeException exc = (ShoeSizeException) result.cause();
       return Future.failedFuture(
         new InvalidInputError("You provided a bad shoe size: " + exc.getShoeSize()));
     } else {
       // 必须是个系统错误 (例如:没有为服务代理进行注册)
       return Future.failedFuture(
         new SystemError("An unexpected error occurred: + " result.cause().getMessage())
       );
     }
   });
}

注意在 Vertx 集群模式下,您需要向集群中每个节点的事件总线注册对应的自定义异常类型 的 MessageCodec 实例

接口类型限制

在服务中参数和返回值在类型上有一定的限制,因此可以方便在事件总线中进行转化。 他们是:

数据类型

JSON 表示 JsonObject 或 JsonArray PRIMITIVE 表示任何原始类型或被自动拆装箱的原始类型

参数可以是以下任意一种:

  • JSON

  • PRIMITIVE

  • List<JSON>

  • List<PRIMITIVE>

  • Set<JSON>

  • Set<PRIMITIVE>

  • Map<String, JSON>

  • Map<String, PRIMITIVE>

  • 任何 枚举 类型

  • 任何被打上 @DataObject 注解的实体类

异步结果可建模为

  • Future<R>

  • 用于回调样式的 Handler<AsyncResult<R>>

R 的类型可以是:

  • JSON

  • PRIMITIVE

  • List<JSON>

  • List<PRIMITIVE>

  • Set<JSON>

  • Set<PRIMITIVE>

  • 任何 枚举 类型

  • 任何打上 @DataObject 注解的类(需符合上文的代码篇章要求)

  • 另一个代理类

重载方法

服务接口不支持任何的重载(即方法名称相同,但参数列表不同)服务方法。。

通过事件总线调用服务的规则 (不使用服务代理)

服务代理假定事件总线中的消息遵循一定的格式,因此能被用于服务的调用

当然,如果不愿意的话,您也可以 不用 服务代理类来访问远程服务。 被广泛接受的与服务交互的方式就是直接在事件总线发送消息。

为了使服务访问的方式一致, 所有的服务都必须遵循以下的消息格式。

格式非常简单:

  • 需要有一个名为 action 的 消息头(header),作为要执行操作的名称。

  • 消息体(message body)应该是一个 JsonObject 对象,里面需要包含操作需要的所有参数。

举个例子,假如我们要去执行一个名为 save 的操作,此操作接受一个字符串类型的 collection 和 JsonObject 类型 document:

Headers:
   "action": "save"
Body:
   {
       "collection", "mycollection",
       "document", {
           "name": "tim"
       }
   }

无论有没有用到服务代理来创建服务,都应该用上面这种方式编写服务, 因为这样允许服务交互时保持一致性。

在上面的例子中,"action" 对应的值应该与服务接口的某个方法名称相对应, 而消息体中每个 [key, value] 都要与服务方法中的某个 [arg_name, arg_value] 相对应

对于返回值,服务需使用 message.reply(…​) 方法去向调用端发送回一个返回值 - 这个值可以是事件总线支持的任何类型。 如果需要表示调用失败,可以调用 message.fail(…​) 方法。

如果您使用 Vert.x 服务代理组件的话,生成的代码会自动帮您处理这些问题。