Streams
The VLINGO XOOM platform Reactive Streams built on XOOM Actors.
The XOOM Streams component implements the Reactive Streams specification for the VLINGO XOOM platform. It is fully based on XOOM Actors to provide reactive concurrency for stream processing.
There are four abstractions used by the Reactive Streams specification, which are entirely implemented in XOOM Streams.
It is possible that you will never implement a Publisher, Subscriber, Subscription, or Processor yourself. Default implementations of these are provided by XOOM Streams. You will instead implement Source and Sink types.
The Subscriber requests a Subscription from a Publisher and specifies the number of elements it can accept. By specifying the number of elements it can accept, backpressure is enforced on the Publisher so that it does not overwhelm the Subscriber.
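The request-driven flow described above can be sketched with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams Publisher, Subscriber, and Subscription. This is not XOOM Streams code: RangePublisher and RecordingSubscriber are hypothetical names, and the publisher is deliberately synchronous and ignores the specification's rules on concurrency and reentrancy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {
  /** Hypothetical synchronous publisher of 1..max that emits only what was requested. */
  static class RangePublisher implements Flow.Publisher<Long> {
    private final long max;

    RangePublisher(long max) { this.max = max; }

    @Override
    public void subscribe(final Flow.Subscriber<? super Long> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private long next = 1;
        private boolean done = false;

        @Override
        public void request(long n) {
          // Emit at most n elements; anything beyond waits for a further request.
          for (long i = 0; i < n && !done; i++) {
            if (next > max) {
              done = true;
              subscriber.onComplete();
            } else {
              subscriber.onNext(next++);
            }
          }
        }

        @Override
        public void cancel() { done = true; }
      });
    }
  }

  /** Hypothetical subscriber that records elements and lets the caller control demand. */
  static class RecordingSubscriber implements Flow.Subscriber<Long> {
    final List<Long> received = new ArrayList<>();
    boolean completed = false;
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Long item) { received.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }

    void demand(long n) { subscription.request(n); }
  }

  public static void main(String[] args) {
    RecordingSubscriber subscriber = new RecordingSubscriber();
    new RangePublisher(5).subscribe(subscriber);

    subscriber.demand(2);
    System.out.println(subscriber.received); // [1, 2]
    subscriber.demand(10);
    System.out.println(subscriber.received + " completed=" + subscriber.completed);
  }
}
```

The publisher never emits more than the outstanding demand, which is the whole of the backpressure contract in this simplified form.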
The following is an example of how to create a Publisher that produces a sequenced series of Long values and a Subscriber with a Sink that prints those as String values.
With no further code the above produces the following output.
Similarly, a Processor is created, which is both a Subscriber to an upstream Publisher and a Publisher to a downstream Subscriber.
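Independently of XOOM Streams, the dual role of a Processor can be sketched with the JDK's Flow.Processor and SubmissionPublisher. MappingProcessor and run() are hypothetical names, and the conversion of Long to Double via doubleValue() is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ProcessorSketch {
  /** Subscriber to the upstream; Publisher (via SubmissionPublisher) to the downstream. */
  static class MappingProcessor<T, R> extends SubmissionPublisher<R>
      implements Flow.Processor<T, R> {
    private final Function<T, R> mapper;
    private Flow.Subscription upstream;

    MappingProcessor(Function<T, R> mapper) { this.mapper = mapper; }

    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
    @Override public void onNext(T item) { submit(mapper.apply(item)); upstream.request(1); }
    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
  }

  /** Runs Long values 1..5 through the processor and returns what the downstream received. */
  static List<Double> run() {
    final List<Double> results = Collections.synchronizedList(new ArrayList<>());
    final CountDownLatch done = new CountDownLatch(1);

    Flow.Subscriber<Double> downstream = new Flow.Subscriber<Double>() {
      @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
      @Override public void onNext(Double value) { results.add(value); }
      @Override public void onError(Throwable t) { done.countDown(); }
      @Override public void onComplete() { done.countDown(); }
    };

    MappingProcessor<Long, Double> processor = new MappingProcessor<>(Long::doubleValue);
    try (SubmissionPublisher<Long> publisher = new SubmissionPublisher<>()) {
      processor.subscribe(downstream); // wire the downstream first so nothing is missed
      publisher.subscribe(processor);
      for (long value = 1; value <= 5; value++) publisher.submit(value);
    } // closing the publisher propagates onComplete through the chain

    try {
      if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new ArrayList<>(results);
  }

  public static void main(String[] args) {
    System.out.println(run()); // [1.0, 2.0, 3.0, 4.0, 5.0]
  }
}
```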
The following example reuses the Publisher and Subscriber from the previous example, but injects the Processor in between the two, where the Processor transforms the Long values to Double values.
The above produces this output.
A Source is the source of a stream of elements relayed by a Publisher. A Sink is the destination of the elements provided to a Subscriber by its Publisher. The following sections describe how these may be used.
Source
You may reuse predefined Source types, but you will also develop your own implementations. This is the Source protocol as provided by XOOM Streams.
The job of a Source is to take requests for the next elements in a stream, which are returned as a Completes<Elements<T>>. This means that the Elements<T> are completed at some future time. See Completes<T> for details on use.
The isSlow() method answers a Completes<Boolean> indicating whether the Source will tend to be slow in providing next elements. The following demonstrates how you may answer.
Of course, answering true or false accurately is vitally important. If your Source is slow, stating so enables the Publisher to decide on the kind of Scheduler to use between probes for next elements. A slow Source will be managed by repeated schedule-once timer intervals, while a fast Source will be managed by a single schedule-many timer that repeats at a consistent interval.
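The difference between the two scheduling styles can be illustrated with the JDK's ScheduledExecutorService, used here only as a stand-in for the XOOM Scheduler. The method names, the 5 millisecond interval, and the three-probe limit are all assumptions of this sketch, not a description of how the Publisher is actually implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ProbeSchedulingSketch {
  /** Slow style: schedule one probe, and re-arm the timer only after that probe has run. */
  static void scheduleOnce(ScheduledExecutorService scheduler, Runnable probe, long delayMillis) {
    try {
      scheduler.schedule(() -> {
        probe.run();
        scheduleOnce(scheduler, probe, delayMillis);
      }, delayMillis, TimeUnit.MILLISECONDS);
    } catch (RejectedExecutionException stopped) {
      // The scheduler was shut down; stop probing.
    }
  }

  /** Answers true once three probes have run, using the style selected by the slow flag. */
  static boolean probeThreeTimes(boolean slow) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch probes = new CountDownLatch(3);
    Runnable probe = probes::countDown;
    try {
      if (slow) {
        scheduleOnce(scheduler, probe, 5);
      } else {
        // Fast style: one schedule-many timer firing at a fixed interval.
        scheduler.scheduleAtFixedRate(probe, 5, 5, TimeUnit.MILLISECONDS);
      }
      return probes.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("slow probes ran: " + probeThreeTimes(true));
    System.out.println("fast probes ran: " + probeThreeTimes(false));
  }
}
```

With the schedule-once style a long-running probe simply delays the next one, whereas a fixed-rate timer keeps firing on its own cadence; that is one plausible reason to treat slow sources differently.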
The following is an example of a fast Source that provides a range of Long values lazily, rather than preallocating them all in a List.
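As a rough, library-free sketch of the idea, the class below computes each batch only when it is asked for. LazyLongRange is a hypothetical stand-in, CompletableFuture stands in for Completes, a plain List stands in for Elements, and the convention that an empty batch means the range is exhausted is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class LazyRangeSketch {
  /** Hypothetical stand-in for a Source of Long values; not the XOOM Streams interface. */
  static class LazyLongRange {
    private final long endExclusive;
    private long current;

    LazyLongRange(long startInclusive, long endExclusive) {
      this.current = startInclusive;
      this.endExclusive = endExclusive;
    }

    /** Answers up to maximumElements values, computed on demand; empty once exhausted. */
    CompletableFuture<List<Long>> next(int maximumElements) {
      List<Long> batch = new ArrayList<>();
      while (batch.size() < maximumElements && current < endExclusive) {
        batch.add(current++);
      }
      return CompletableFuture.completedFuture(batch);
    }

    /** A computed range never waits on I/O, so it reports itself as fast. */
    CompletableFuture<Boolean> isSlow() {
      return CompletableFuture.completedFuture(false);
    }
  }

  public static void main(String[] args) {
    LazyLongRange source = new LazyLongRange(1, 6);
    System.out.println(source.next(3).join()); // [1, 2, 3]
    System.out.println(source.next(3).join()); // [4, 5]
    System.out.println(source.next(3).join()); // []
  }
}
```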
See the Javadocs on Source for full API explanations.
There are factory methods available to create Source instances, along with the default implementation types that these methods instantiate.
The number of these and the backing Source types will grow over future releases.
Sink
You may reuse predefined Sink types, but you will also develop your own implementations. This is the Sink protocol as provided by XOOM Streams.
The ready() method indicates that the Sink should become prepared to handle incoming values. The terminate() method indicates that the Sink is being terminated and will no longer receive values. The whenValue(T value) method is used to provide the next available value from the Subscriber.
The following is an example of a Sink that prints received values.
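A minimal, library-free sketch of such a sink is shown here. The nested Sink interface is a stand-in that only mirrors the three operations named in this section, and PrintingSink, along with its choice to ignore values after termination, is an assumption of the sketch.

```java
import java.io.PrintStream;

class SinkSketch {
  /** Stand-in with the three operations named above; the real interface ships with XOOM Streams. */
  interface Sink<T> {
    void ready();
    void terminate();
    void whenValue(T value);
  }

  /** Hypothetical sink that prints each received value on its own line. */
  static class PrintingSink<T> implements Sink<T> {
    private final PrintStream out;
    private boolean terminated = false;

    PrintingSink(PrintStream out) { this.out = out; }

    @Override public void ready() { /* nothing to prepare for a PrintStream */ }

    @Override public void terminate() {
      terminated = true;
      out.flush();
    }

    @Override public void whenValue(T value) {
      if (!terminated) out.println(String.valueOf(value));
    }
  }

  public static void main(String[] args) {
    Sink<Long> sink = new PrintingSink<>(System.out);
    sink.ready();
    sink.whenValue(1L);
    sink.whenValue(2L);
    sink.terminate();
    sink.whenValue(3L); // ignored: the sink has been terminated
  }
}
```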
See the Javadocs on Sink for full API explanations.
There are factory methods available to create Sink instances, along with the default implementation types that these methods instantiate.
The number of these and the backing Sink types will grow over future releases.
Stream
Some functions in the VLINGO XOOM platform, such as queries provided by XOOM Symbio, answer a Completes<Stream>. The Stream that is eventually available inside the Completes is used to consume the elements of streaming data.
The flowInto() overloads are used to start the stream flowing into a given Sink<S>. Besides causing the flow to begin, additional options are available. To control the initial rate at which elements flow, that is, the number of elements that will arrive in a single burst, pass a value for flowElementsRate. If no parameter is provided, the default is 100.
To indicate the number of milliseconds between probes for Source<T> elements, pass probeInterval. The default, if not explicitly provided by the client, is 5 milliseconds. There are other constants that you may use. Ultimately, stream throughput depends on the throughput available to the Source<T> and the speed at which the Source<T> is probed by the StreamPublisher to retrieve that data.
The request() operation may be used to change the flow rate after the initial rate has been established. The stop() method may be used to completely terminate the flow of elements from the Source<T> and Publisher<T>. Note, however, that since the flow is asynchronous it is possible that some elements are already incoming to the Sink<S> and thus will not be prevented from arriving.
For examples of how Stream is used, see Streaming Persistent Data.
In XOOM Streams an operator is used to perform a specific kind of data transformation. When using a StreamProcessor you will probably want to transform the StreamSubscriber side of the processor's incoming data to another type that is outgoing through its StreamPublisher side. You can use an operator to do that.
The Operator<T, R> takes type T as an input value and provides type R as output. Output is delivered to a Consumer<R>, which means the R value is produced before the Consumer<R> receives it. This is where the implementation of the Operator<T, R> comes into play. Note that there are currently three factory methods provided on the Operator interface:
Thus, there are three basic kinds of operators: a filter, a flat-mapper, and a mapper.
Here is an example filter:
In this example (broken down into multiple steps for clarity) a filter is created that accepts all String instances that contain the substring "1" and rejects all others. Then the List<String> of six elements is iterated over and each element is passed to the filter's performInto() operation. If the filter Predicate<T> of s.contains("1") is satisfied, the match is added to the List<String> named results, which then contains the elements "321", "123", and "214".
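The filtering steps just described can be sketched without the library as follows. The nested Operator interface is a stand-in that mirrors only the performInto() shape named above; the factory name filterWith and the three non-matching input strings are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class FilterSketch {
  /** Stand-in for the operator shape described above: a T goes in, an R reaches the consumer. */
  interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);

    /** Hypothetical factory: passes a value on only when the predicate holds. */
    static <T> Operator<T, T> filterWith(Predicate<T> filter) {
      return (value, consumer) -> {
        if (filter.test(value)) consumer.accept(value);
      };
    }
  }

  static List<String> run() {
    List<String> results = new ArrayList<>();
    // Assumed input: six strings, three of which contain "1".
    List<String> list = Arrays.asList("ABC", "321", "123", "456", "DEF", "214");
    Operator<String, String> filter = Operator.filterWith(s -> s.contains("1"));
    list.forEach(possible -> filter.performInto(possible, results::add));
    return results;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [321, 123, 214]
  }
}
```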
Next is an example of a mapper:
In this example (broken down into multiple steps for clarity) a mapper is created that maps all String instances of digit characters to Integer numbers. The List<String> of three elements is iterated over and each element is passed to the mapper's performInto() operation. The new value is added to the List<Integer> named results, which then contains the elements 123, 456, and 789.
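A comparable library-free sketch of the mapping steps follows. Again the nested Operator interface is only a stand-in, and the factory name mapWith is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapperSketch {
  /** Stand-in for the operator shape described above; not the XOOM Streams interface. */
  interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);

    /** Hypothetical factory: converts each T to an R and hands it to the consumer. */
    static <T, R> Operator<T, R> mapWith(Function<T, R> mapper) {
      return (value, consumer) -> consumer.accept(mapper.apply(value));
    }
  }

  static List<Integer> run() {
    List<Integer> results = new ArrayList<>();
    List<String> list = Arrays.asList("123", "456", "789");
    Operator<String, Integer> mapper = Operator.mapWith(digits -> Integer.parseInt(digits));
    list.forEach(digits -> mapper.performInto(digits, results::add));
    return results;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [123, 456, 789]
  }
}
```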
The following demonstrates how a mapper can use flat-map (although there is an easier way, described below):
Note that List<List<String>> lists is a list of lists. The mapper that is created internally uses the Java Stream::flatMap. The mapper first streams over the List<List<String>> lists. It then uses flatMap to handle each single List<String> list. It then streams over each of those lists, and each individual String is transformed to an Integer.
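The stream pipeline described above uses only the JDK, so it can be sketched directly. The mapping function is shown on its own, without wrapping it in an Operator, and the three inner lists of digit strings are assumed input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class FlatMapFunctionSketch {
  /** Streams the outer list, flat-maps each inner list, and parses every String to an Integer. */
  static final Function<List<List<String>>, List<Integer>> FLATTEN =
      lists -> lists.stream()
          .flatMap(list -> list.stream().map(Integer::parseInt))
          .collect(Collectors.toList());

  static List<Integer> run() {
    // Assumed input: a list of three lists of digit strings.
    List<List<String>> lists = Arrays.asList(
        Arrays.asList("1", "2", "3"),
        Arrays.asList("4", "5", "6"),
        Arrays.asList("7", "8", "9"));
    return FLATTEN.apply(lists);
  }

  public static void main(String[] args) {
    System.out.println(run()); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```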
You can, however, perform this more easily using the FlatMapper operator. Here is a FlatMapper solution to a different problem:
In this example, the flat-mapper is given a function that instantiates a LongRangeSource, which is a stream source that contains a startInclusive and an endExclusive value. In the first performInto() the startInclusive is 1 and the endExclusive is two greater, which is 3; the first numeric range is thus 1 to 2. The second range is 3 to 4. The flat-mapper reads every value in each LongRangeSource and adds it to the collection referenced by providedLongs. When completed, the two containers are flattened so that only the values of the inner ranges are preserved in providedLongs.
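The range arithmetic can be checked with a small library-free sketch. Here java.util.stream.LongStream.range stands in for LongRangeSource, and the nested Operator interface and the factory name flatMapWith are assumptions of this sketch rather than the XOOM Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.LongStream;

class FlatMapperSketch {
  /** Stand-in for the operator shape described above; not the XOOM Streams interface. */
  interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);

    /** Hypothetical factory: each input yields a range whose values are passed on one by one. */
    static <T> Operator<T, Long> flatMapWith(Function<T, LongStream> mapper) {
      return (value, consumer) -> mapper.apply(value).boxed().forEach(consumer);
    }
  }

  static List<Long> run() {
    List<Long> providedLongs = new ArrayList<>();
    // Each start value produces the range [start, start + 2), end exclusive.
    Operator<Long, Long> flatMapper =
        Operator.flatMapWith(start -> LongStream.range(start, start + 2));
    flatMapper.performInto(1L, providedLongs::add); // contributes 1 and 2
    flatMapper.performInto(3L, providedLongs::add); // contributes 3 and 4
    return providedLongs;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [1, 2, 3, 4]
  }
}
```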
You can give a StreamProcessor an Operator<T, R> such as these, or use them in your Source and Sink implementations.
We will be adding more functional interfaces to our various types over future releases.
There are several ways to stream your data. You have already seen specific examples in the above content. In this section you are introduced to other ways.
Data that is persisted inside a XOOM Symbio store can be streamed out to one or more subscribers. One of the most basic examples is provided by the StateStore.
A StateStoreReader is provided as part of the StateStore protocol. Thus, you may ask a StateStore to stream data from its internal storage. The two interfaces provided support streaming all states of a given type; that is, a stateType is a Java Class<?>, and all states of that type are stored in a single container. In some storage mechanisms that container may be a database table or an in-memory grid region.
By requesting a stream of all of a given stateType, the StateStoreReader queries that data from the container and provides an instance of io.vlingo.xoom.reactivestreams.Stream.
You may also constrain your query to a subset of the whole container. The query is storage-type dependent.
You may also stream persisted Source<?> types, such as DomainEvent and Command instances. For example, the Journal<T> provides this streaming interface through the JournalReader<T>, which in turn is an EntryReader<T>.
These are used in the following example, first to request the JournalReader, then the Stream of all EntryBundle instances, and then to flow the stream into the Sink<EntryBundle> for reporting on each event.
All stores (ObjectStore, StateStore, and Journal) support persisting a totally ordered stream of Source<?> entries, such as DomainEvent and Command. You may obtain any of the total streams using the same basic techniques.