Vert.x RxJava

用于RxJava3的Vert.x API

RxJava 是一个流行的库, 旨在使用 Java VM 的可观察序列来编写异步和基于事件的程序。

Vert.x 与 RxJava 集成起来很自然:它使得无论什么时候,只要我们能使用流和异步结果,就能使用 RxJava。

在 RxJava3 中使用 Vert.x API

要在 RxJava3 中使用 Vert.x API,请将以下依赖项添加到构建描述符的 dependencies 部分中:

  • Maven (在您的 pom.xml 中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rx-java3</artifactId>
 <version>4.4.0</version>
</dependency>
  • Gradle (在您的 build.gradle 文件中):

compile 'io.vertx:vertx-rx-java3:4.4.0'

有两种方式在 Vert.x 使用RxJava 3 的 API:

可读流支持

RxJava 中 Flowable 的概念和 Vert.x 中 ReadStream 类是一对完美的匹配:都提供了一个对象流。

FlowableHelper.toFlowable 这个静态方法可以把 Vert.x 可读流转换为 Flowable

FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions(), result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
  observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});

Rx化的 Vert.x API 在 ReadStream 类上提供了 toFlowable 方法:

FileSystem fs = vertx.fileSystem();
fs.rxOpen("/data.txt", new OpenOptions())
  .flatMapPublisher(file -> file.toFlowable())
  .subscribe(data -> System.out.println("Read data: " + data.toString("UTF-8")));

这样的 Flowable 是所谓 hot Flowable,即不管是否有订阅,它们都会产生通知。 ReadStream 是否能自发地发射数据,这取决于它的具体实现:

当订阅动作发生时,适配器会调用`handler` 来设置它的处理器(handler)。

某些 ReadStream 实现会在这个调用之后开始发射事件,而其他的则与 是否设置了处理器无关:

  • AsyncFile 在处理器设置后开始产生 buffer 事件

  • HttpServerRequest 则不依赖于此(即 如果处理器未设置,buffer 可能会丢失)

在上述两种情形中,订阅 Flowable 都是安全的。原因在于不管 event loop 还是 worker verticle 都不会被并发执行, 所以订阅一定是在处理器开始发射数据之前产生。

当您想延迟订阅时,需要先 暂停(pause) ReadStream ,并在之后 恢复(resume) 它, 这与使用 ReadStream 一样。

server.requestHandler(request -> {
  if (request.method() == HttpMethod.POST) {

    // 暂停接收 buffer
    request.pause();

    checkAuth(res -> {

      // 现在可以重新接收 buffer
      request.resume();

      if (res.succeeded()) {
        Flowable<Buffer> flowable = request.toFlowable();
        flowable.subscribe(buff -> {
          // Get buffers
        });
      }
    });
  }
});

同样的,将一个 Flowable 转变为 Vert.x ReadStream 也是可以的。

静态方法 FlowableHelper.toReadStream 用于将 Flowable 转换为 Vert.x 可读流:

Flowable<Buffer> observable = getFlowable();
ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);
Pump pump = Pump.pump(readStream, response);
pump.start();

可写流支持

WriteStream 类似于 org.reactivestreams.Subscriber ,它会消费数据,并且在消费速度无法跟上生产速度时与生产者协作,以避免积压的情况不断增加。

Vert.x 提供了 WriteStreamSubscriber 适配器,它可以发送 Flowable 对象到任意 WriteStream:

将 buffer 发送到 HTTP 服务响应
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.rxjava3.RxHelper.toSubscriber(response);
flowable.subscribe(subscriber);
提示
另外也存在用于非背压的 io.reactivex.Observable 使用的 io.vertx.rxjava3.WriteStreamObserver 适配器。 不同之处在于,这个适配器将向 WriteStream 发送对象,即使它不能跟上生产者速率

如果您正在使用 RX化的 Vert.x API进行编程, WriteStream 的实现提供了一个 toSubscriber 方法。 这样一来,上面的例子可以变得更直接明了:

response.setChunked(true);
flowable.subscribe(response.toSubscriber());
注意
Flowable 成功结束时,该适配器会调用 end 方法。
小心
该适配器会设置 WriteStreamdrainexception handler,所以订阅后请不要使用它们。

WriteStreamSubscriber 适配器在下述情况下会调用回调方法:

  • Flowable 错误地结束,或

  • WriteStream 失败(如 HTTP 连接被关闭,或文件系统已满),或

  • the WriteStream 结束(即,所有写入已完成,且文件已关闭),或

  • the `WriteStream`错误地结束(即,所有写入已结束,当关闭文件时发生了错误)

这样不但可以设计更健壮的程序,而且可以在处理完流之后安排其他任务:

response.setChunked(true);

WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();

subscriber.onError(throwable -> {
  if (!response.headWritten() && response.closed()) {
    response.setStatusCode(500).end("oops");
  } else {
    // 错误日志
  }
});

subscriber.onWriteStreamError(throwable -> {
  // 错误日志
});

subscriber.onWriteStreamEnd(() -> {
  // 将事务结束记录到审计系统...
});

flowable.subscribe(subscriber);
注意
如果 WriteStream 失败, 适配器会取消订阅 org.reactivestreams.Subscription.

异步结果支持

以一个现有的 Vert.x Handler<AsyncResult<T>> 对象为基础,您可以创建一个 RxJava Observer, 并订阅它:

Handler<AsyncResult<String>> handler = getHandler();

// 订阅 Single
Single.just("hello").subscribe(SingleHelper.toObserver(handler));
Handler<AsyncResult<String>> handler = getHandler();

// 订阅 Single
Maybe.just("hello").subscribe(MaybeHelper.toObserver(handler));
Handler<AsyncResult<Void>> handler = getHandler();

// 订阅 Single
Completable.complete().subscribe(CompletableHelper.toObserver(handler));

Rx化的 Vert.x API 复制了类似的每一个方法,并冠以 rx 的前缀,它们都返回 RxJava 的 SingleMaybeCompletable 对象:

Single<HttpServer> single = vertx
  .createHttpServer()
  .rxListen(1234, "localhost");

// 订阅绑定端口的事件
single.
    subscribe(
        server -> {
          // 服务器在监听
        },
        failure -> {
          // 服务器无法启动
        }
    );

这样的 Single 是 “冷的”(cold) ,对应的 API 方法将在注册时被调用。

Maybe 对象可能有结果、也可能没有结果:

DnsClient client = vertx.createDnsClient(dnsPort, dnsHost);

// 此处会返回一个 Maybe 对象,用于订阅实际执行反向DNS查询的结果
Maybe<String> maybe = client.rxReverseLookup(ipAddress);

// Subscribe to perform the lookup
maybe.
  subscribe(
    name -> {
      // DNS反向查询返回结果
    },
    failure -> {
      // Lookup failed
    },
    () -> {
      // DNS反向查询没返回结果
    }
  );

Completable 一般对应 Handler<AsyncResult<Void>>

Completable single = server.rxClose();

// 订阅服务器关闭事件
single.
  subscribe(
    () -> {
      // 服务器关闭
    },
    failure -> {
      // 服务器关闭,但是遇到问题
    }
  );
提示

如果您不能使用 Rx化的 Vert.x API,或您有自己的基于回调的异步方法,Vert.x 提供了下述的适配器:

适配 Vert.x core 的 executeBlocking 方法
Maybe<String> maybe = MaybeHelper.toMaybe(handler -> {
  vertx.executeBlocking(fut -> fut.complete(invokeBlocking()), handler);
});

调度器支持

有时候 Reactive 扩展库需要执行一些可调度的操作,例如 Flowable#timer 方法将创建一个能周期性发射事件的定时器并返回之。默认情况下,这些可调度的操作由 RxJava 管理, 这意味着定时器线程并非 Vert.x 线程,因此(这些操作)并不在 Vert.x Event Loop 线程上执行。

当 Rxjava方法处理一个调度器的时候,它接受一个额外 io.reactivex.Scheduler 参数的重载方法, RxHelper.scheduler 方法返回的调度器可以供RxJava的 这些方法使用

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

对于阻塞型的可调度操作,我们可以通过 RxHelper.blockingScheduler 方法获得适用的调度器:

Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

RxJava 也能被配置成使用 Vert.x 的调度器:

RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
小心
RxJava使用 computation 表示非阻塞任务,使用 io 表示阻塞任务, 这与 Vert.x 术语相反

Rx化的 Vert.x API 在 RxHelper 类中也提供了相似的方法:

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));

也可以基于一个命名的工作线程池(named worker pool)创建调度器, 如果您想为了调度阻塞操作复用特定的线程池,这将会很有帮助:

Scheduler scheduler = RxHelper.blockingScheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

JSON解码

FlowableHelper.unmarshaller 方法创建了一个 io.reactivex.rxjava3.FlowableOperator 对象, 它可以将json格式的 Flowable<Buffer> 转换为对象的 flowable

fileSystem.open("/data.txt", new OpenOptions(), result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
  observable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(
      mypojo -> {
        // 处理对象
      }
  );
});

Rx化 的辅助类也能做同样的事情:

fileSystem
  .rxOpen("/data.txt", new OpenOptions())
  .flatMapObservable(file -> file.toObservable())
  .compose(ObservableHelper.unmarshaller((MyPojo.class)))
  .subscribe(mypojo -> {
    // 处理对象
  });

部署Verticle

部署一个已经存在的 Verticle 实例可以使用 RxHelper.deployVerticle 方法, 它会部署一个 Verticle 并返回包含部署ID 的 Single<String>

Single<String> deployment = RxHelper.deployVerticle(vertx, verticle);

deployment.subscribe(id -> {
  // 部署
}, err -> {
  // 部署失败
});

Rx化的 API

Rx化的 API 是 Vert.x API 的一个代码自动生成版本,就像 Vert.x 的 JavaScriptGroovy 版本一样。 这些 API 以 io.vertx.rxjava 为包名前缀,例如 io.vertx.core.Vertx 类对应为 Vertx 类。

Rx化的API以两种方式提供Vert.x的异步方法

  • 把原始的方法转化为一个等效的RxJava风格的方法,其会返回一个带有缓存的立即订阅源(eager and cached subscription)

  • 一个 rx 前缀的派生方法,它在订阅的时候会调用原始的方法

// 直接写入
// 无需订阅
// 完成后提供异步结果
response.write(buffer);

// 写操作并没有发生
completion = response.rxWrite(buffer);

// 执行实际写操作
completion.subscribe(() -> ..., err -> ...);

您可以根据您的需要使用原始的方法或Rx化方法, 例如当您不想订阅或不关心结果,可以调用原始方法。

嵌入Rx化的 Vert.x

只需使用 Vertx.vertx 方法:

Vertx vertx = io.vertx.rxjava3.core.Vertx.vertx();

作为 Verticle

继承 AbstractVerticle 类, 它会做一些包装(您将获得一个 RxJava Verticle):

class MyVerticle extends AbstractVerticle {
  public void start() {
    // 在此可使用Rx化的Vert.x了
  }
}

部署一个 RxJava Verticle 不需要特别的部署器,使用 Java 部署器即可。

支持异步启动的 Verticle 可以重写 rxStart 方法并返回一个 Completable 实例:

class MyVerticle extends AbstractVerticle {
  public Completable rxStart() {
    return vertx.createHttpServer()
      .requestHandler(req -> req.response().end("Hello World"))
      .rxListen()
      .ignoreElement();
  }
}

API 例子

让我们通过研究一些例子来了解相关 API 吧。

EventBus 消息流

EventBus的 MessageConsumer 很自然地提供了 Observable<Message<T>>:

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Flowable<Message<String>> flowable = consumer.toFlowable();
Disposable sub = flowable.subscribe(msg -> {
  // 获得消息
});

// 10秒后注销
vertx.setTimer(10000, id -> {
  sub.dispose();
});

MessageConsumer 提供了一个 Message 的流. 如果有需要,可以通过 body 方法获取信息体组成的流

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Flowable<String> flowable = consumer.bodyStream().toFlowable();

RxJava 的 map/reduce 组合风格在这里是相当有用的:

Flowable<Double> flowable = vertx.eventBus().
    <Double>consumer("heat-sensor").
    bodyStream().
    toFlowable();

flowable.
    buffer(1, TimeUnit.SECONDS).
    map(samples -> samples.
        stream().
        collect(Collectors.averagingDouble(d -> d))).
    subscribe(heat -> {
      vertx.eventBus().send("news-feed", "Current heat is " + heat);
    });

定时器

可以通过 timerStream 创建定时器任务

vertx.timerStream(1000).
    toObservable().
    subscribe(
        id -> {
          System.out.println("Callback after 1 second");
        }
    );

可以通过 periodicStream 创建周期性任务

vertx.periodicStream(1000).
    toObservable().
    subscribe(
        id -> {
          System.out.println("Callback every second");
        }
    );

通过注销操作可以取消对 Observable 的订阅:

vertx.periodicStream(1000).
    toObservable().
    subscribe(new Observer<Long>() {
      private Disposable sub;
      public void onSubscribe(@NonNull Disposable d) {
        sub = d;
      }
      public void onNext(Long aLong) {
        // Callback
        sub.dispose();
      }
      public void onError(Throwable e) {}
      public void onComplete() {}
    });

HTTP客户端请求

您可以很轻松地使用http client发送请求和处理响应

HttpClient client = vertx.createHttpClient();
client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
  .flatMap(request -> request
    .rxSend()
    .flatMap(response -> {
      if (response.statusCode() == 200) {
        return response.body();
      } else {
        return Single.error(new NoStackTraceThrowable("Invalid response"));
      }
    }))
  .subscribe(body -> {
    // 处理响应体
  });

当您需要处理大规模的响应流时,您可以通过http response获取 Flowable<Buffer>

HttpClient client = vertx.createHttpClient();
client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
  .flatMapPublisher(request -> request
    .rxSend()
    .flatMapPublisher(response -> {
      if (response.statusCode() == 200) {
        return response.toFlowable();
      } else {
        return Flowable.error(new NoStackTraceThrowable("Invalid response"));
      }
    }))
  .subscribe(chunk -> {
    // Process the response chunks
  });

您也可以使用 Vert.x Web Client

HTTP服务端请求

requestStream 方法对到达的每个请求都提供了回调:

Flowable<HttpServerRequest> requestFlowable = server.requestStream().toFlowable();
requestFlowable.subscribe(request -> {
  // 处理请求
});

HttpServerRequest 可以适配为 Observable<Buffer>:

Flowable<HttpServerRequest> requestFlowable = server.requestStream().toFlowable();
requestFlowable.subscribe(request -> {
  Observable<Buffer> observable = request.toObservable();
});

ObservableHelper.unmarshaller 方法可以用来解析 JSON 格式的请求, 并将其映射为对象:

Flowable<HttpServerRequest> requestFlowable = server.requestStream().toFlowable();
requestFlowable.subscribe(request -> {
  Flowable<MyPojo> flowable = request.
    toFlowable().
    compose(FlowableHelper.unmarshaller(MyPojo.class));
});

WebSocket客户端

当 WebSocket 连接成功或失败时,webSocket 方法对此提供了一次性的回调:

HttpClient client = vertx.createHttpClient(new HttpClientOptions());
client.rxWebSocket(8080, "localhost", "/the_uri").subscribe(
    ws -> {
      // 使用websocket
    },
    error -> {
      // 连接失败
    }
);

WebSocket 对象可以轻松地转换为 Observable<Buffer>

socketObservable.subscribe(
    socket -> {
      Flowable<Buffer> dataObs = socket.toFlowable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);

WebSocket服务端

当有连接接入时, webSocketStream 为每个接入的连接提供了回调

Flowable<ServerWebSocket> socketFlowable = server.webSocketStream().toFlowable();
socketFlowable.subscribe(
    socket -> System.out.println("Web socket connect"),
    failure -> System.out.println("Should never be called"),
    () -> {
      System.out.println("Subscription ended or server closed");
    }
);

ServerWebSocket 对象可以轻松地转换为 Observable<Buffer>:

socketObservable.subscribe(
    socket -> {
      Flowable<Buffer> dataObs = socket.toFlowable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);