public class ReadStreamSubscriber<R,J> extends Subscriber<R> implements ReadStream<J>
A Subscriber that turns an Observable into a ReadStream.
The stream implements the pause() and resume() operations by maintaining a buffer of BUFFER_SIZE elements between the Observable and the ReadStream.
When the subscriber is created, it requests 0 elements to activate the subscriber's back-pressure. Setting the handler on the ReadStream for the first time triggers a request of BUFFER_SIZE elements. When the item buffer is half empty, new elements are requested to fill the buffer back to BUFFER_SIZE elements.
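The request pattern described above can be illustrated with a small, library-free model. This is only a sketch of the stated policy, not the actual implementation: the class name `RefillSketch`, the `outstanding` counter, and the buffer size of 16 are all assumptions made for illustration (the value of BUFFER_SIZE is not given here).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the refill policy; hypothetical, not the library's code. */
public class RefillSketch {
    private final int bufferSize;   // stands in for BUFFER_SIZE
    private int outstanding;        // requested upstream, not yet consumed by the handler
    private boolean handlerSet;
    private final List<Integer> requests = new ArrayList<>(); // upstream requests, in order

    public RefillSketch(int bufferSize) {
        this.bufferSize = bufferSize;
        request(0);                 // creation: request 0 to activate back-pressure
    }

    private void request(int n) {
        requests.add(n);
        outstanding += n;
    }

    /** Setting the handler for the first time triggers the initial request. */
    public void setHandler() {
        if (!handlerSet) {
            handlerSet = true;
            request(bufferSize);
        }
    }

    /** Models one element leaving the buffer and reaching the data handler. */
    public void consumeOne() {
        if (outstanding == 0) {
            throw new IllegalStateException("nothing was requested");
        }
        outstanding--;
        if (outstanding <= bufferSize / 2) {
            request(bufferSize - outstanding);  // top up to bufferSize
        }
    }

    public List<Integer> requests() {
        return requests;
    }

    public static void main(String[] args) {
        RefillSketch s = new RefillSketch(16);  // 16 is an arbitrary illustrative size
        s.setHandler();
        for (int i = 0; i < 8; i++) {
            s.consumeOne();
        }
        System.out.println(s.requests());       // prints [0, 16, 8]
    }
}
```

In this model the upstream source never sees a request larger than the buffer size, which is what keeps the number of buffered elements bounded.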
The endHandler(Handler) is called when the Observable has completed or failed and no pending elements, emitted before the completion or failure, are still in the buffer, i.e. the handler is not called while the stream is paused.
Modifier and Type | Field and Description |
---|---|
static int | BUFFER_SIZE |
Constructor and Description |
---|
ReadStreamSubscriber(java.util.function.Function<R,J> adapter, java.util.function.Consumer<Subscriber<R>> doSubscribe) |
Modifier and Type | Method and Description |
---|---|
static <R,J> ReadStream<J> | asReadStream(Observable<R> observable, java.util.function.Function<R,J> adapter) |
ReadStream<J> | endHandler(Handler<Void> handler) Set an end handler. |
ReadStream<J> | exceptionHandler(Handler<Throwable> handler) Set an exception handler on the read stream. |
ReadStream<J> | fetch(long amount) Fetch the specified amount of elements. |
ReadStream<J> | handler(Handler<J> handler) Set a data handler. |
void | onCompleted() |
void | onError(Throwable e) |
void | onNext(R item) |
ReadStream<J> | pause() Pause the ReadStream; it sets the buffer in fetch mode and clears the actual demand. |
ReadStream<J> | resume() Resume reading, and set the buffer in flowing mode. |
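Methods inherited from class Subscriber: add, isUnsubscribed, onStart, request, setProducer, unsubscribe
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface ReadStream: pipe, pipeTo, pipeTo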
public static final int BUFFER_SIZE
public ReadStreamSubscriber(java.util.function.Function<R,J> adapter, java.util.function.Consumer<Subscriber<R>> doSubscribe)
public static <R,J> ReadStream<J> asReadStream(Observable<R> observable, java.util.function.Function<R,J> adapter)
public ReadStream<J> handler(Handler<J> handler)
Description copied from interface: ReadStream
Set a data handler.
Specified by: handler in interface ReadStream<J>
public ReadStream<J> pause()
Description copied from interface: ReadStream
Pause the ReadStream; it sets the buffer in fetch mode and clears the actual demand. While it is paused, no data will be sent to the data handler.
Specified by: pause in interface ReadStream<J>
public ReadStream<J> fetch(long amount)
Description copied from interface: ReadStream
Fetch the specified amount of elements. If the ReadStream has been paused, reading will recommence with the specified amount of items; otherwise the specified amount will be added to the current stream demand.
Specified by: fetch in interface ReadStream<J>
public ReadStream<J> resume()
Description copied from interface: ReadStream
Resume reading, and set the buffer in flowing mode. If the ReadStream has been paused, reading will recommence on it.
Specified by: resume in interface ReadStream<J>
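The interplay of pause(), fetch(long) and resume() described above can be sketched with a small demand counter. This is a simplified, library-free model, not the actual implementation: the class `DemandSketch`, its fields, and the use of Long.MAX_VALUE to represent flowing mode are assumptions made for illustration, as is the choice to start in flowing mode. It only models the "no pending elements" condition for the end handler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of fetch mode and flowing mode; hypothetical, not the library's code. */
public class DemandSketch<T> {
    private final Deque<T> pending = new ArrayDeque<>();
    private long demand = Long.MAX_VALUE;   // Long.MAX_VALUE models flowing mode (assumption)
    private boolean completed;
    private boolean ended;
    private final Consumer<T> handler;
    private final Runnable endHandler;

    public DemandSketch(Consumer<T> handler, Runnable endHandler) {
        this.handler = handler;
        this.endHandler = endHandler;
    }

    /** Fetch mode: clear the demand so nothing reaches the handler. */
    public void pause() {
        demand = 0L;
    }

    /** Add to the current demand; when paused this restarts reading with that amount. */
    public void fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("amount must be >= 0");
        }
        demand += amount;
        if (demand < 0L) {
            demand = Long.MAX_VALUE;        // overflow saturates to flowing mode
        }
        drain();
    }

    /** Flowing mode: unbounded demand. */
    public void resume() {
        demand = Long.MAX_VALUE;
        drain();
    }

    public void onNext(T item) {
        pending.add(item);
        drain();
    }

    public void onCompleted() {
        completed = true;
        drain();
    }

    private void drain() {
        while (demand > 0L && !pending.isEmpty()) {
            if (demand != Long.MAX_VALUE) {
                demand--;
            }
            handler.accept(pending.poll());
        }
        // The end handler is deferred until the buffer has been emptied.
        if (completed && pending.isEmpty() && !ended) {
            ended = true;
            endHandler.run();
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        boolean[] ended = {false};
        DemandSketch<String> s = new DemandSketch<>(seen::add, () -> ended[0] = true);
        s.pause();
        s.onNext("a");
        s.onNext("b");
        s.onCompleted();
        System.out.println(seen + " " + ended[0]);  // prints [] false
        s.fetch(1);
        System.out.println(seen + " " + ended[0]);  // prints [a] false
        s.resume();
        System.out.println(seen + " " + ended[0]);  // prints [a, b] true
    }
}
```

The run in main shows the end handler being held back while elements emitted before completion are still buffered.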
public ReadStream<J> endHandler(Handler<Void> handler)
Description copied from interface: ReadStream
Set an end handler.
Specified by: endHandler in interface ReadStream<J>
public ReadStream<J> exceptionHandler(Handler<Throwable> handler)
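Description copied from interface: ReadStream
Set an exception handler on the read stream.
Specified by: exceptionHandler in interface ReadStream<J>
Specified by: exceptionHandler in interface StreamBase
Parameters: handler - the exception handler
public void onCompleted()
Specified by: onCompleted in interface Observer<R>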
Copyright © 2023 Eclipse. All rights reserved.