Vert.x RxJava

用于 RxJava 的 Vert.x API

RxJava 是 JVM 上一个流行的库,用于组合异步的、使用可观察序列的、基于事件的程序。 Vert.x 与 RxJava 集成起来很自然: 它使得无论什么时候,只要我们能使用流和异步结果,就能使用 Observable。

在 RxJava1 中使用 Vert.x API

要在 RxJava1 中使用 Vert.x API,请将以下依赖项添加到构建描述符的 dependencies 部分中:

  • Maven(在您的 pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rx-java</artifactId>
 <version>4.0.3</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中):

compile 'io.vertx:vertx-rx-java:4.0.3'

要使用 Vert.x 的 RxJava API,有两种方式:

  • 通过原始的 Vert.x API 辅以 RxHelper 类, 该辅助类提供了用于 Vert.x Core API 和 RxJava API 之间互相转化的静态方法。

  • 通过基于 Vert.x Core API 增强的 Rx化的 Vert.x API。

可读流支持

RxJava 中 Observable 的概念和 Vert.x 中 ReadStream 类是一对完美的匹配:都提供了一个对象流。

静态方法 RxHelper.toObservable 用于将 Vert.x 可读流转换为 rx.Observable

FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions(), result -> {
  AsyncFile file = result.result();
  Observable<Buffer> observable = RxHelper.toObservable(file);
  observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});

Rx化的 Vert.x API 在 ReadStream 类上提供了 toObservable 方法:

FileSystem fs = vertx.fileSystem();
fs.open("/data.txt", new OpenOptions(), result -> {
  AsyncFile file = result.result();
  Observable<Buffer> observable = file.toObservable();
  observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});

这样的 Observable 是所谓 hot Observable,即不管是否有订阅,它们都会产生通知。 ReadStream 是否能自发地发射数据,这取决于它的具体实现:

当订阅动作发生时,适配器会调用 handler 来设置它的 handler 。

某些 ReadStream 实现会在这个调用之后开始发射事件,而其他的则与 handler 是否设置无关:

  • AsyncFile 在 handler 设置后开始产生 buffer 事件

  • HttpServerRequest 则不依赖于此(即 如果 handler 未设置,buffer 可能会丢失)

在上述所有情形中,订阅 Observable 都是安全的。原因在于不管 event loop 还是 worker verticle 都不会被并发执行,所以订阅一定是在 handler 开始发射数据之前发生。

当你想延迟订阅时,需要先 暂停(pause) ReadStream ,并在之后 恢复(resume) 它, 这与使用 ReadStream 一样。

server.requestHandler(request -> {
  if (request.method() == HttpMethod.POST) {

    // 暂停接收 buffer
    request.pause();

    checkAuth(res -> {

      // 现在可以重新接收 buffer
      request.resume();

      if (res.succeeded()) {
        Observable<Buffer> observable = request.toObservable();
        observable.subscribe(buff -> {
          // 获得 buffer
        });
      }
    });
  }
});

同样的,将一个 Observable 转变为 Vert.x ReadStream 也是可以的。

静态方法 RxHelper.toReadStream 用于将 rx.Observable 转换为 Vert.x 可读流:

Observable<Buffer> observable = getObservable();
ReadStream<Buffer> readStream = RxHelper.toReadStream(observable);
Pump pump = Pump.pump(readStream, response);
pump.start();

可写流支持

WriteStream 类似于 rx.Subscriber ,它会消费数据,并且在消费速度无法跟上生产速度时与生产者协作,以避免积压的情况不断增加。

Vert.x 提供了 WriteStreamSubscriber 适配器,它可以发送 Observable 对象到任意 WriteStream

将 buffer 发送到 HTTP 服务响应
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.rx.java.RxHelper.toSubscriber(response);
observable.subscribe(subscriber);

如果您使用 Rx化的 Vert.x API 进行编程,WriteStream 的实现提供了一个 toSubscriber 方法。 这样一来,上面的例子可以变得更直接明了:

response.setChunked(true);
observable.subscribe(response.toSubscriber());
注意
Observable 成功结束时,该适配器会调用 end 方法。
小心
该适配器会设置 WriteStreamdrainexception handler,所以订阅后请不要使用它们。

WriteStreamSubscriber 适配器在下述情况下会调用回调方法:

  • Observable 错误地结束,或

  • WriteStream 失败(如 HTTP 连接被关闭,或文件系统已满),或

  • WriteStream 结束(即,所有写入已完成,且文件已关闭),或

  • WriteStream 错误地结束(即,所有写入已结束,当关闭文件时发生了错误)

这样不但可以设计更健壮的程序,而且可以在处理完流之后安排其他任务:

response.setChunked(true);

WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();

subscriber.onError(throwable -> {
  if (!response.headWritten() && response.closed()) {
    response.setStatusCode(500).end("oops");
  } else {
    // 错误日志
  }
});

subscriber.onWriteStreamError(throwable -> {
  // 错误日志
});

subscriber.onWriteStreamEnd(() -> {
  // 将事务结束记录到审计系统
});

observable.subscribe(subscriber);
注意
如果 WriteStream 失败,则该是配置取消订阅 Observable

Handler 支持

RxHelper 类可以创建 ObservableHandler 对象,它是一个 Observable 对象, 它的 toHandler 方法会返回 Handler<T> 接口的实现:

ObservableHandler<Long> observable = RxHelper.observableHandler();
observable.subscribe(id -> {
  // Fired
});
vertx.setTimer(1000, observable.toHandler());

Rx化的 Vert.x API 未提供针对 Handler 的 API。

异步结果支持

以一个现有的 Vert.x Handler<AsyncResult<T>> 对象为基础,你可以创建一个 RxJava Subscriber, 然后将其注册在 ObservableSingle 上:

observable.subscribe(RxHelper.toSubscriber(handler1));

// 订阅 Single
single.subscribe(RxHelper.toSubscriber(handler2));

在构造(construct)发生时,作为异步方法的最后一个参数的 Vert.x Handler<AsyncResult<T>> 可以被映射为单个元素的 Observable:

  • 当回调成功时,观察者的 onNext 方法将被调用,参数就是这个对象; 且其后 onComplete 方法会立即被调用。

  • 当回调失败时,观察者的 onError 方法将被调用。

RxHelper.observableFuture 方法可以创建一个 ObservableFuture 对象。 这是一个 Observable 对象,它的 toHandler 方法会返回 Handler<AsyncResult<T>> 接口的实现:

ObservableFuture<HttpServer> observable = RxHelper.observableFuture();
observable.subscribe(
    server -> {
      // 服务器在监听
    },
    failure -> {
      // 服务器无法启动
    }
);
vertx.createHttpServer(new HttpServerOptions().
    setPort(1234).
    setHost("localhost")
).listen(observable.toHandler());

我们可以从 ObservableFuture<Server> 中获取单个 HttpServer 对象。如果端口 监听(listen) 失败, 订阅者将会接收到通知。

RxHelper.toHandler 方法为观察者(Observer)和事件处理器(Handler)做了适配:

Observer<HttpServer> observer = new Observer<HttpServer>() {
  @Override
  public void onNext(HttpServer o) {
  }
  @Override
  public void onError(Throwable e) {
  }
  @Override
  public void onCompleted() {
  }
};
Handler<AsyncResult<HttpServer>> handler = RxHelper.toFuture(observer);

下面的代码也是可以的(译者注:直接基于 Action ):

Action1<HttpServer> onNext = httpServer -> {};
Action1<Throwable> onError = httpServer -> {};
Action0 onComplete = () -> {};

Handler<AsyncResult<HttpServer>> handler1 = RxHelper.toFuture(onNext);
Handler<AsyncResult<HttpServer>> handler2 = RxHelper.toFuture(onNext, onError);
Handler<AsyncResult<HttpServer>> handler3 = RxHelper.toFuture(onNext, onError, onComplete);

Rx化的 Vert.x API 复制了类似的每一个方法,并冠以 rx 的前缀,它们都返回 RxJava 的 Single 对象:

Single<HttpServer> single = vertx
  .createHttpServer()
  .rxListen(1234, "localhost");

// 订阅绑定端口的事件
single.
    subscribe(
        server -> {
          // 服务器正在监听
        },
        failure -> {
          // 服务器无法启动
        }
    );

这样的 Single 是 “冷的”(cold) ,对应的 API 方法将在注册时被调用。

注意
类似 rx* 的方法替换了以前版本中 *Observable 的方法, 这样一个语义上的改变是为了与 RxJava 保持一致。

调度器支持

有时候 Reactive 扩展库需要执行一些可调度的操作,例如 Observable#timer 方法将创建一个能周期性发射事件的定时器并返回之。缺省情况下,这些可调度的操作由 RxJava 管理, 这意味着定时器线程并非 Vert.x 线程,因此(这些操作)并不是在 Vert.x Event Loop 线程上执行的。

在 RxJava 中,有些操作通常会有接受一个 rx.Scheduler 参数的重载方法用于设定 SchedulerRxHelper 类提供了一个 RxHelper.scheduler 方法,其返回的调度器可供 RxJava 的这些方法使用。比如:

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler);

对于阻塞型的可调度操作(blocking scheduled actions),我们可以通过 RxHelper.blockingScheduler 方法获得适用的调度器:

Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Integer> obs = blockingObservable.observeOn(scheduler);

RxJava 也能被配置成使用 Vert.x 的调度器,这得益于 RxHelper.schedulerHook 方法创建的调度器钩子对象。 对于 IO 操作这里使用了阻塞型的调度器:

RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx);
RxJavaHooks.setOnIOScheduler(f -> hook.getIOScheduler());
RxJavaHooks.setOnNewThreadScheduler(f -> hook.getNewThreadScheduler());
RxJavaHooks.setOnComputationScheduler(f -> hook.getComputationScheduler());

Rx化的 Vert.x API 在 RxHelper 类中也提供了相似的方法:

Scheduler scheduler = io.vertx.rxjava.core.RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaSchedulersHook hook = io.vertx.rxjava.core.RxHelper.schedulerHook(vertx);
  RxJavaHooks.setOnIOScheduler(f -> hook.getIOScheduler());
  RxJavaHooks.setOnNewThreadScheduler(f -> hook.getNewThreadScheduler());
  RxJavaHooks.setOnComputationScheduler(f -> hook.getComputationScheduler());

基于一个命名的工作线程池(named worker pool)创建调度器也是可以的, 如果你想为了调度阻塞操作复用特定的线程池,这将会很有帮助:

Scheduler scheduler = io.vertx.rxjava.core.RxHelper.scheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

JSON解码

RxHelper.unmarshaller 方法创建了一个 rx.Observable.Operator 对象, 它可以将 Observable<Buffer> 变换为对象的 Observable:

fileSystem.open("/data.txt", new OpenOptions(), result -> {
  AsyncFile file = result.result();
  Observable<Buffer> observable = RxHelper.toObservable(file);
  observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe(
      mypojo -> {
        // 处理对象
      }
  );
});

Rx化 的辅助类也能做同样的事情:

fileSystem.open("/data.txt", new OpenOptions(), result -> {
  AsyncFile file = result.result();
  Observable<Buffer> observable = file.toObservable();
  observable.lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)).subscribe(
      mypojo -> {
        // 处理对象
      }
  );
});

部署Verticle

Rx化的 API 不能部署一个已经存在的 Verticle 实例。RxHelper.observableFuture 方法为此提供了一个解决方案。

所有工作都在 RxHelper.deployVerticle 方法里自动完成,它会部署一个 Verticle 并返回包含部署 ID 的 Observable<String>

Observable<String> deployment = RxHelper.deployVerticle(vertx, verticle);

deployment.subscribe(id -> {
  // 部署成功
}, err -> {
  // 部署失败
});

Rx化的 API

Rx化的 API 是 Vert.x API 的一个代码自动生成版本,就像 Vert.x 的 JavaScriptGroovy 版本一样。 这些 API 以 io.vertx.rxjava 为包名前缀,例如 io.vertx.core.Vertx 类对应为 Vertx 类。

嵌入Rx化的 Vert.x

只需使用 Vertx.vertx 方法:

Vertx vertx = io.vertx.rxjava.core.Vertx.vertx();

作为 Verticle

通过继承 AbstractVerticle 类,它会做一些包装(您将获得一个 RxJava Verticle):

class MyVerticle extends io.vertx.rxjava.core.AbstractVerticle {
  public void start() {
    // 在此可使用Rx化的Vert.x了
  }
}

部署一个 RxJava Verticle 不需要特别的部署器,使用 Java 部署器即可。

支持异步启动的 Verticle 可以重写 rxStart 方法并返回一个 Completable 实例:

class MyVerticle extends io.vertx.rxjava.core.AbstractVerticle {
  public Completable rxStart() {
    return vertx.createHttpServer()
      .requestHandler(req -> req.response().end("Hello World"))
      .rxListen()
      .toCompletable();
  }
}

API 例子

让我们通过研究一些例子来了解相关 API 吧。

EventBus 消息流

很自然地, MessageConsumer 类提供了相关的 Observable<Message<T>>

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<Message<String>> observable = consumer.toObservable();
Subscription sub = observable.subscribe(msg -> {
  // 获得消息
});

// 10秒后注销
vertx.setTimer(10000, id -> {
  sub.unsubscribe();
});

MessageConsumer 类提供了 Message 的流。 如果需要,还可以通过 body 方法获得消息体组成的新流:

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<String> observable = consumer.bodyStream().toObservable();

RxJava 的 map/reduce 组合风格在这里是相当有用的:

Observable<Double> observable = vertx.eventBus().
    <Double>consumer("heat-sensor").
    bodyStream().
    toObservable();

observable.
    buffer(1, TimeUnit.SECONDS).
    map(samples -> samples.
        stream().
        collect(Collectors.averagingDouble(d -> d))).
    subscribe(heat -> {
      vertx.eventBus().send("news-feed", "Current heat is " + heat);
    });

定时器Timers

定时器任务可以通过 timerStream 方法来创建:

vertx.timerStream(1000).
    toObservable().
    subscribe(
        id -> {
          System.out.println("Callback after 1 second");
        }
    );

周期性的任务可以通过 periodicStream 方法来创建:

vertx.periodicStream(1000).
    toObservable().
    subscribe(
        id -> {
          System.out.println("Callback every second");
        }
    );

通过注销操作可以取消对 Observable 的订阅:

vertx.periodicStream(1000).
    toObservable().
    subscribe(new Subscriber<Long>() {
      public void onNext(Long aLong) {
        // 回调
        unsubscribe();
      }
      public void onError(Throwable e) {}
      public void onCompleted() {}
    });

HTTP客户端请求

rxRequest 方法返回一个 HttpClientRequest 的 Single 对象。 这个 Single 对象可以将请求失败上报。

调用 rxSend 方法可以将请求发送出去,并返回响应。

HttpClient client = vertx.createHttpClient(new HttpClientOptions());
Single<HttpClientResponse> request = client
  .rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
  .flatMap(HttpClientRequest::rxSend);
request.subscribe(
    response -> {
      // 处理响应
    },
    error -> {
      // 无法连接
    }
);

通过 toObservable 方法可以将响应当成 Observable<Buffer> 来处理:

Single<HttpClientResponse> request = client
  .rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
  .flatMap(HttpClientRequest::rxSend);
request.toObservable().
    subscribe(
        response -> {
          Observable<Buffer> observable = response.toObservable();
          observable.forEach(
              buffer -> {
                // 处理 buffer
              }
          );
        }
    );

flatMap 操作也能获得同样的流:

Single<HttpClientResponse> request = client
  .rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
  .flatMap(HttpClientRequest::rxSend);
request.toObservable().
    flatMap(HttpClientResponse::toObservable).
    forEach(
        buffer -> {
          // 处理 buffer
        }
    );

通过静态方法 RxHelper.unmarshaller ,我们也能将 Observable<Buffer> 重组为对象。 这个方法创建了一个 Rx.Observable.Operator(Rx 操作符)供重组操作使用:

Single<HttpClientResponse> request = client
  .rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
  .flatMap(HttpClientRequest::rxSend);
request.toObservable().
    flatMap(HttpClientResponse::toObservable).
    lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)).
    forEach(
        pojo -> {
          // 处理 pojo
        }
    );

HTTP服务端请求

requestStream 方法对到达的每个请求都提供了回调:

Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
  // 处理请求
});

HttpServerRequest 可以被适配为 Observable<Buffer>

Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
  Observable<Buffer> observable = request.toObservable();
});

RxHelper.unmarshaller 方法可以用来解析 JSON 格式的请求, 并将其映射为对象:

Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
  Observable<MyPojo> observable = request.
      toObservable().
      lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class));
});

WebSocket客户端

当 WebSocket 连接成功或失败时, rxWebSocket 方法对此提供了一次性的回调:

HttpClient client = vertx.createHttpClient(new HttpClientOptions());
client.rxWebSocket(8080, "localhost", "/the_uri").subscribe(
    ws -> {
      // 使用 websocket
    },
    error -> {
      // 连接失败
    }
);

WebSocket 对象可以轻松地转换为 Observable<Buffer>

socketObservable.subscribe(
    socket -> {
      Observable<Buffer> dataObs = socket.toObservable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);

WebSocket服务端

每当有新连接到达时, webSocketStream 方法都会提供一次回调:

Observable<ServerWebSocket> socketObservable = server.webSocketStream().toObservable();
socketObservable.subscribe(
    socket -> System.out.println("Web socket connect"),
    failure -> System.out.println("Should never be called"),
    () -> {
      System.out.println("Subscription ended or server closed");
    }
);

ServerWebSocket 对象可以轻松地转换为 Observable<Buffer>

socketObservable.subscribe(
    socket -> {
      Observable<Buffer> dataObs = socket.toObservable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);