集成Vert.x Reactive Streams
Reactive Streams 提供了一组标准接口, 适用于JVM上的非阻塞的带有背压的异步流处理。
这个库提供了基于Vert.x的reactive streams实现。
Vert.x提供了自己的机制来处理数据流,实现了一个流可以使用背压从另一个流中抽取数据。
相关的接口包括 io.vertx.core.streams.ReadStream
, io.vertx.core.streams.WriteStream
和 io.vertx.core.streams.Pump
。
请参阅Vert.x core手册,以获取有关Vert.x stream的更多信息。
这个库提供了 read stream 和 write stream 的实现, 同时也是作为 reactive streams publishers 和 subscribers 存在。 这允许我们将任何 reactive streams publisher 或 subscriber 实例像 Vert.x read 或 write stream 一样处理。
使用Vert.x Reactive Streams
想使用Vert.x Reactive Streams,需要在你的构建脚本中加入以下依赖。
-
Maven (在
pom.xml
中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-reactive-streams</artifactId>
<version>4.3.8</version>
</dependency>
-
Gradle (在
build.gradle
中):
compile io.vertx:vertx-reactive-streams:4.3.8
Reactive Read Stream
我们提供了Vert.x ReadStream
接口的实现,在 ReactiveReadStream
同时也实现了 reactive streams Subscriber
。
你可以将ReactiveReadStream实例传递给任何一个实现了reactive streams Publisher
接口的实例(比如Akka中的Publisher),
接着你就可以从上游读取数据,就像Vert.x中其他的 ReadStream
。
(比如,使用 Pump
接口将其中的数据抽取到一个 WriteStream
)。
这里有一个例子,给定一个实现了reactive streams的publisher(比如akka), 将它的数据抽取出来,作为http服务端响应的消息正文。 它将自己处理背压。
ReactiveReadStream<Buffer> rrs = ReactiveReadStream.readStream();
// Subscribe the read stream to the publisher
otherPublisher.subscribe(rrs);
// Pump from the read stream to the http response
Pump pump = Pump.pump(rrs, response);
pump.start();
Reactive Write Stream
我们同样也提供了Vert.x WriteStream
接口的实现,
ReactiveWriteStream
。
它同样也实现了reactive streams Publisher
接口。
给定一个reactive streams Subscriber
接口的实例,你可以将数据写入其中,就像其他Vert.x WriteStream
的实现一样。
(比如使用 Pump
从 ReadStream
中抽取数据)。
可以像任何Vert.x read stream一样,
使用 pause
, resume
, and writeQueueFull
来处理背压。
这会在内部自动转换为传播背压的reactive streams方法。
(需要更多的元素)。
这里有一个例子,从其他reactive streams实现库订阅流,并抽取http响应的消息正文数据至subscriber。 这里会自动处理背压。
ReactiveWriteStream<Buffer> rws = ReactiveWriteStream.writeStream(vertx);
// Subscribe the other subscriber to the write stream
rws.subscribe(otherSubscriber);
// Pump the http request to the write stream
Pump pump = Pump.pump(request, rws);
pump.start();