集成Vert.x Reactive Streams

Reactive Streams 提供了一组标准接口, 适用于JVM上的非阻塞的带有背压的异步流处理。

这个库提供了基于Vert.x的reactive streams实现。

Vert.x提供了自己的机制来处理数据流,实现了一个流可以使用背压从另一个流中抽取数据。 相关的接口包括 io.vertx.core.streams.ReadStream, io.vertx.core.streams.WriteStreamio.vertx.core.streams.Pump 。 请参阅Vert.x core手册,以获取有关Vert.x stream的更多信息。

这个库提供了 read stream 和 write stream 的实现, 同时也是作为 reactive streams publishers 和 subscribers 存在。 这允许我们将任何 reactive streams publisher 或 subscriber 实例像 Vert.x read 或 write stream 一样处理。

使用Vert.x Reactive Streams

想使用Vert.x Reactive Streams,需要在你的构建脚本中加入以下依赖。

  • Maven (在 pom.xml 中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-reactive-streams</artifactId>
 <version>4.0.3</version>
</dependency>
  • Gradle (在 build.gradle 中):

compile io.vertx:vertx-reactive-streams:4.0.3

Reactive Read Stream

我们提供了Vert.x ReadStream 接口的实现,在 ReactiveReadStream 同时也实现了 reactive streams Subscriber

你可以将ReactiveReadStream实例传递给任何一个实现了reactive streams Publisher 接口的实例(比如Akka中的Publisher), 接着你就可以从上游读取数据,就像Vert.x中其他的 ReadStream。 (比如,使用 Pump 接口将其中的数据抽取到一个 WriteStream )。

这里有一个例子,给定一个实现了reactive streams的publisher(比如akka), 将它的数据抽取出来,作为http服务端响应的消息正文。 它将自己处理背压。

ReactiveReadStream<Buffer> rrs = ReactiveReadStream.readStream();

// Subscribe the read stream to the publisher
otherPublisher.subscribe(rrs);

// Pump from the read stream to the http response
Pump pump = Pump.pump(rrs, response);

pump.start();

Reactive Write Stream

我们同样也提供了Vert.x WriteStream 接口的实现, ReactiveWriteStream 。 它同样也实现了reactive streams Publisher 接口。 给定一个reactive streams Subscriber 接口的实例,你可以将数据写入其中,就像其他Vert.x WriteStream 的实现一样。 (比如使用 PumpReadStream 中抽取数据)。

可以像任何Vert.x read stream一样, 使用 pause, resume, and writeQueueFull 来处理背压。 这会在内部自动转换为传播背压的reactive streams方法。 (需要更多的元素)。

这里有一个例子,从其他reactive streams实现库订阅流,并抽取http响应的消息正文数据至subscriber。 这里会自动处理背压。

ReactiveWriteStream<Buffer> rws = ReactiveWriteStream.writeStream(vertx);

// Subscribe the other subscriber to the write stream
rws.subscribe(otherSubscriber);

// Pump the http request to the write stream
Pump pump = Pump.pump(request, rws);

pump.start();