Vert.x RxJava
用于 RxJava2 的 Vert.x API
RxJava 是 JVM 上一个流行的库,用于组合异步的、 使用可观察序列的、基于事件的程序。
Vert.x 与 RxJava 集成起来很自然: 它使得无论什么时候,只要我们能使用流和异步结果,就能使用 Observable。
在 RxJava2 中使用 Vert.x API
要在 RxJava2 中使用 Vert.x API,请将以下依赖项添加到构建描述符的 dependencies 部分中:
-
Maven(在您的
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>4.0.3</version>
</dependency>
-
Gradle(在您的
build.gradle
文件中):
compile 'io.vertx:vertx-rx-java2:4.0.3'
要使用 Vert.x 的 RxJava2 API,有两种方式:
-
通过原始的 Vert.x API 辅以辅助类, 这些辅助类提供了提供了用于 Vert.x Core API 和 RxJava2 API 之间互相转化的静态方法。
-
通过基于 Vert.x Core API 增强的 Rx化的 Vert.x API。
可读流支持
RxJava 中 Flowable
的概念和 Vert.x 中 ReadStream
类是一对完美的匹配:都提供了一个对象流。
静态方法 FlowableHelper.toFlowable
用于将
Vert.x 可读流转换为 Flowable
:
FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
而 Rx化的 Vert.x API 在 ReadStream
类上提供了
toFlowable
方法:
FileSystem fs = vertx.fileSystem();
fs.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = file.toFlowable();
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
这样的 Flowable 是所谓 hot Flowable,即不管是否有订阅,它们都会产生通知。
ReadStream
是否能自发地发射数据,这取决于它的具体实现:
当订阅动作发生时,适配器会调用 handler
来设置它的 handler 。
某些 ReadStream
实现会在这个调用之后开始发射事件,而其他的则与
handler 是否设置无关:
-
AsyncFile
在 handler 设置后开始产生 buffer 事件 -
HttpServerRequest
则不依赖于此(即 如果 handler 未设置,buffer 可能会丢失)
在上述两种情形中,订阅 Flowable
都是安全的。原因在于不管 event loop 还是 worker
verticle 都不会被并发执行,所以订阅一定是在 handler
开始发射数据之前发生。
当你想延迟订阅时,需要先 暂停(pause)
ReadStream
,并在之后 恢复(resume)
它,
这与使用 ReadStream
一样。
server.requestHandler(request -> {
if (request.method() == HttpMethod.POST) {
// 暂停接收 buffer
request.pause();
checkAuth(res -> {
// 现在可以重新接收 buffer
request.resume();
if (res.succeeded()) {
Flowable<Buffer> flowable = request.toFlowable();
flowable.subscribe(buff -> {
// 获得 buffer
});
}
});
}
});
同样的,将一个 Flowable
转变为 Vert.x ReadStream
也是可以的。
静态方法 FlowableHelper.toReadStream
用于将
Flowable
转换为 Vert.x 可读流:
Flowable<Buffer> observable = getFlowable();
ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);
Pump pump = Pump.pump(readStream, response);
pump.start();
可写流支持
WriteStream
类似于 org.reactivestreams.Subscriber
,它会消费数据,并且在消费速度无法跟上生产速度时与生产者协作,以避免积压的情况不断增加。
Vert.x 提供了 WriteStreamSubscriber
适配器,它可以发送 Flowable
对象到任意 WriteStream
:
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.reactivex.RxHelper.toSubscriber(response);
flowable.subscribe(subscriber);
提示
|
另外也存在用于非背压的 io.reactivex.Observable 使用的 io.vertx.reactivex.WriteStreamObserver 适配器。
The difference is that this adapter will send items to the WriteStream even when it can’t keep-up with the producer rate.
|
如果您使用 Rx化的 Vert.x API 进行编程,WriteStream
的实现提供了一个 toSubscriber
方法。
这样一来,上面的例子可以变得更直接明了:
response.setChunked(true);
flowable.subscribe(response.toSubscriber());
注意
|
当 Flowable 成功结束时,该适配器会调用 end 方法。
|
小心
|
该适配器会设置 WriteStream 的 drain 和 exception handler,所以订阅后请不要使用它们。
|
WriteStreamSubscriber
适配器在下述情况下会调用回调方法:
-
Flowable
错误地结束,或 -
WriteStream
失败(如 HTTP 连接被关闭,或文件系统已满),或 -
WriteStream
结束(即,所有写入已完成,且文件已关闭),或 -
WriteStream
错误地结束(即,所有写入已结束,当关闭文件时发生了错误)
这样不但可以设计更健壮的程序,而且可以在处理完流之后安排其他任务:
response.setChunked(true);
WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();
subscriber.onError(throwable -> {
if (!response.headWritten() && response.closed()) {
response.setStatusCode(500).end("oops");
} else {
// 错误日志
}
});
subscriber.onWriteStreamError(throwable -> {
// 错误日志
});
subscriber.onWriteStreamEnd(() -> {
// 将事务结束记录到审计系统...
});
flowable.subscribe(subscriber);
注意
|
如果 WriteStream 失败,则该是配置取消订阅 org.reactivestreams.Subscription 。
|
异步结果支持
以一个现有的 Vert.x Handler<AsyncResult<T>>
对象为基础,你可以创建一个 RxJava Observer
,
并订阅它:
Handler<AsyncResult<String>> handler = getHandler();
// 订阅 Single
Single.just("hello").subscribe(SingleHelper.toObserver(handler));
Handler<AsyncResult<String>> handler = getHandler();
// 订阅 Single
Maybe.just("hello").subscribe(MaybeHelper.toObserver(handler));
Handler<AsyncResult<Void>> handler = getHandler();
// 订阅 Single
Completable.complete().subscribe(CompletableHelper.toObserver(handler));
Rx化的 Vert.x API 复制了类似的每一个方法,并冠以 rx
的前缀,它们都返回 RxJava 的 Single
、
Maybe
或 Completable
对象:
Single<HttpServer> single = vertx
.createHttpServer()
.rxListen(1234, "localhost");
// 订阅绑定端口的事件
single.
subscribe(
server -> {
// 服务器在监听
},
failure -> {
// 服务器无法启动
}
);
这样的 Single 是 “冷的”(cold) ,对应的 API 方法将在注册时被调用。
Maybe
对象可能有结果、也可能没有结果:
DnsClient client = vertx.createDnsClient(dnsPort, dnsHost);
// 此处会返回一个 Maybe 对象,用于订阅实际执行反向DNS查询的结果
Maybe<String> maybe = client.rxReverseLookup(ipAddress);
// 订阅,以执行DNS查询
maybe.
subscribe(
name -> {
// DNS反向查询返回结果
},
failure -> {
// DNS反向查询失败
},
() -> {
// DNS反向查询没返回结果
}
);
Completable
一般对应 Handler<AsyncResult<Void>>
Completable single = server.rxClose();
// 订阅,以绑定服务端
single.
subscribe(
() -> {
// 服务端关闭
},
failure -> {
// 服务端关闭,但遇到故障
}
);
提示
|
如果您不能使用 Rx化的 Vert.x API,或您有自己的基于回调的异步方法,Vert.x 提供了下述的适配器: |
Maybe<String> maybe = MaybeHelper.toMaybe(handler -> {
vertx.executeBlocking(fut -> fut.complete(invokeBlocking()), handler);
});
调度器支持
有时候 Reactive 扩展库需要执行一些可调度的操作,例如 Flowable#timer
方法将创建一个能周期性发射事件的定时器并返回之。缺省情况下,这些可调度的操作由 RxJava 管理,
这意味着定时器线程并非 Vert.x 线程,因此(这些操作)并不是在 Vert.x Event Loop 线程上执行的。
在 RxJava 中,有些操作通常会有接受一个 io.reactivex.Scheduler
参数的重载方法用于设定 Scheduler。
RxHelper
类提供了一个 RxHelper.scheduler
方法,其返回的调度器可供 RxJava
的这些方法使用。
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
对于阻塞型的可调度操作(blocking scheduled actions),我们可以通过 RxHelper.blockingScheduler
方法获得适用的调度器:
Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJava 也能被配置成使用 Vert.x 的调度器:
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
小心
|
RxJava使用 computation 表示非阻塞任务,使用 io 表示阻塞任务, 这与 Vert.x 术语相反 |
Rx化的 Vert.x API 在 RxHelper
类中也提供了相似的方法:
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
基于一个命名的工作线程池(named worker pool)创建调度器也是可以的, 如果你想为了调度阻塞操作复用特定的线程池,这将会很有帮助:
Scheduler scheduler = RxHelper.blockingScheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
JSON解码
FlowableHelper.unmarshaller
方法创建了一个 io.reactivex.rxjava2.FlowableOperator
对象,
它可以将json格式的 Flowable<Buffer>
转换为对象的 flowable
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
observable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(
mypojo -> {
// 处理对象
}
);
});
Rx化 的辅助类也能做同样的事情:
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = file.toObservable();
observable.compose(ObservableHelper.unmarshaller((MyPojo.class))).subscribe(
mypojo -> {
// 处理对象
}
);
});
部署Verticle
部署一个已经存在的 Verticle 实例可以使用 RxHelper.deployVerticle
方法,
它会部署一个 Verticle
并返回包含部署ID 的 Single<String>
。
Single<String> deployment = RxHelper.deployVerticle(vertx, verticle);
deployment.subscribe(id -> {
// 部署成功
}, err -> {
// 部署失败
});
Rx化的 API
Rx化的 API 是 Vert.x API 的一个代码自动生成版本,就像 Vert.x 的 JavaScript 或 Groovy 版本一样。
这些 API 以 io.vertx.rxjava
为包名前缀,例如 io.vertx.core.Vertx
类对应为
Vertx
类。
作为 Verticle
通过继承 AbstractVerticle
类,它会做一些包装(您将获得一个 RxJava Verticle):
class MyVerticle extends io.vertx.reactivex.core.AbstractVerticle {
public void start() {
// 在此可使用Rx化的Vert.x了
}
}
部署一个 RxJava Verticle 不需要特别的部署器,使用 Java 部署器即可。
支持异步启动的 Verticle 可以重写 rxStart
方法并返回一个 Completable
实例:
class MyVerticle extends io.vertx.reactivex.core.AbstractVerticle {
public Completable rxStart() {
return vertx.createHttpServer()
.requestHandler(req -> req.response().end("Hello World"))
.rxListen()
.toCompletable();
}
}
API 例子
让我们通过研究一些例子来了解相关 API 吧。
EventBus 消息流
很自然地, MessageConsumer
类提供了相关的 Observable<Message<T>>
:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<Message<String>> observable = consumer.toObservable();
Disposable sub = observable.subscribe(msg -> {
// 获得消息
});
// 10秒后注销
vertx.setTimer(10000, id -> {
sub.dispose();
});
MessageConsumer
类提供了 Message
的流。
如果需要,还可以通过 body
方法获得消息体组成的新流:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<String> observable = consumer.bodyStream().toObservable();
RxJava 的 map/reduce 组合风格在这里是相当有用的:
Observable<Double> observable = vertx.eventBus().
<Double>consumer("heat-sensor").
bodyStream().
toObservable();
observable.
buffer(1, TimeUnit.SECONDS).
map(samples -> samples.
stream().
collect(Collectors.averagingDouble(d -> d))).
subscribe(heat -> {
vertx.eventBus().send("news-feed", "Current heat is " + heat);
});
定时器Timers
定时器任务可以通过 timerStream
方法来创建:
vertx.timerStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback after 1 second");
}
);
周期性的任务可以通过 periodicStream
方法来创建:
vertx.periodicStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback every second");
}
);
通过注销操作可以取消对 Observable 的订阅:
vertx.periodicStream(1000).
toObservable().
subscribe(new Observer<Long>() {
private Disposable sub;
public void onSubscribe(@NonNull Disposable d) {
sub = d;
}
public void onNext(Long aLong) {
// 回调
sub.dispose();
}
public void onError(Throwable e) {}
public void onComplete() {}
});
HTTP客户端请求
建议结合 RxJava 使用 Vert.x Web Client 。
HTTP服务端请求
requestStream
方法对到达的每个请求都提供了回调:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
// 处理请求
});
HttpServerRequest
可以被适配为 Observable<Buffer>
:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
Observable<Buffer> observable = request.toObservable();
});
ObservableHelper.unmarshaller
方法可以用来解析 JSON 格式的请求,
并将其映射为对象:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
Observable<MyPojo> observable = request.
toObservable().
compose(io.vertx.reactivex.core.ObservableHelper.unmarshaller(MyPojo.class));
});
WebSocket客户端
当 WebSocket 连接成功或失败时, rxWebSocket
方法对此提供了一次性的回调:
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
client.rxWebSocket(8080, "localhost", "/the_uri").subscribe(
ws -> {
// 使用 websocket
},
error -> {
// 连接失败
}
);
WebSocket
对象可以轻松地转换为 Observable<Buffer>
:
socketObservable.subscribe(
socket -> {
Flowable<Buffer> dataObs = socket.toFlowable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);
WebSocket服务端
每当有新连接到达时, webSocketStream
方法都会提供一次回调:
Observable<ServerWebSocket> socketObservable = server.webSocketStream().toObservable();
socketObservable.subscribe(
socket -> System.out.println("Web socket connect"),
failure -> System.out.println("Should never be called"),
() -> {
System.out.println("Subscription ended or server closed");
}
);
ServerWebSocket
对象可以轻松地转换为 Observable<Buffer>
:
socketObservable.subscribe(
socket -> {
Observable<Buffer> dataObs = socket.toObservable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);