Streams
The VLINGO XOOM platform implementation of Reactive Streams, built on XOOM Actors.
The XOOM Streams component implements the Reactive Streams specification for the VLINGO XOOM platform. It is fully based on XOOM Actors to provide reactive concurrency for stream processing.
The Reactive Streams specification defines four abstractions, all of which are implemented in XOOM Streams.
Protocol | Description |
Processor | Both a Subscriber and a Publisher, first receiving a stream of elements and then publishing those following a probable transformation. This is perfect for defining streaming pipelines. |
Publisher | A provider of a potentially unbounded number of sequenced elements, pushing them according to the demand received from its one or more Subscriber interests. |
Subscriber | Receives signals from the Publisher to which it is subscribed. The signals indicate that a Subscription has been granted, that a next element is available, that an error has occurred, and, if the end of the stream is reached, that the stream is completed. |
Subscription | A reference to the contract between Publisher and Subscriber, which is used by the Subscriber to request 1 to N sequenced elements from the Publisher, and to cancel its subscription. The value of N may be effectively unbounded, which is indicated by requesting Long.MAX_VALUE. |

The four abstractions of Reactive Streams.
It is possible that you will never implement a Publisher, Subscriber, Subscription, or Processor yourself. Default implementations of these are provided by XOOM Streams. You will instead implement Source and Sink types.

The Subscriber requests a Subscription from a Publisher and specifies the number of elements it can accept. By specifying the number of elements it can accept, the Subscriber enforces backpressure on the Publisher so that the Publisher does not overwhelm it.
A Subscriber requests N elements from the Publisher, creating a backpressure contract.
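The demand contract can be tried out without XOOM Streams, because the JDK ships the same four interfaces as java.util.concurrent.Flow (Java 9 and later). The following is a minimal, single-threaded sketch and not the XOOM Streams implementation. The names BackpressureSketch and RangePublisher are hypothetical, and as a simplification the publisher ignores non-positive requests rather than signaling an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {
  /** Hypothetical publisher of the longs in [start, end); it never emits more than was requested. */
  static final class RangePublisher implements Flow.Publisher<Long> {
    private final long start;
    private final long end;

    RangePublisher(final long start, final long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public void subscribe(final Flow.Subscriber<? super Long> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private long next = start;
        private long demand;
        private boolean emitting;
        private boolean done;

        @Override
        public void request(final long n) {
          if (done || n <= 0) return; // simplification: bad requests are ignored
          demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
          if (emitting) return; // re-entrant call: the loop below picks up the new demand
          emitting = true;
          while (!done && demand > 0 && next < end) {
            demand--;
            subscriber.onNext(Long.valueOf(next++));
          }
          emitting = false;
          if (!done && next >= end) {
            done = true;
            subscriber.onComplete();
          }
        }

        @Override
        public void cancel() {
          done = true;
        }
      });
    }
  }

  /** Subscribes, requests firstRequest elements once, and returns whatever arrived. */
  static List<Long> collect(final long start, final long end, final long firstRequest) {
    final List<Long> received = new ArrayList<>();
    new RangePublisher(start, end).subscribe(new Flow.Subscriber<Long>() {
      @Override public void onSubscribe(final Flow.Subscription subscription) {
        subscription.request(firstRequest);
      }
      @Override public void onNext(final Long item) { received.add(item); }
      @Override public void onError(final Throwable cause) { throw new IllegalStateException(cause); }
      @Override public void onComplete() { }
    });
    return received;
  }

  public static void main(final String[] args) {
    System.out.println(collect(1, 11, 3));              // only three elements were requested
    System.out.println(collect(1, 11, Long.MAX_VALUE)); // effectively unbounded demand
  }
}
```

Because the subscriber in collect() never asks for more than its first request, the publisher stops after that many elements even though more are available.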
The following is an example of how to create a Publisher that produces a sequenced series of Long values and a Subscriber with a Sink that prints those as String values.

final long max = 10;

final Publisher publisher =
  world.actorFor(
    Publisher.class,
    StreamPublisher.class,
    Source.rangeOf(1, max + 1),
    PublisherConfiguration.defaultDropHead());

final Subscriber subscriber =
  world.actorFor(
    Subscriber.class,
    StreamSubscriber.class,
    Sink.printToStdout("> "),
    max);

publisher.subscribe(subscriber);

With no further code the above produces the following output.
// RESULTS
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9
> 10
Similarly, a Processor is created, which is both a Subscriber to an upstream Publisher and a Publisher to a downstream Subscriber.
A Processor is a Subscriber on the left side and a Publisher on the right side.
The following example reuses the Publisher and Subscriber from the previous example, but injects the Processor in between the two, where the Processor transforms the Long values to Double values.

final LongToDoubleMapper transformer = new LongToDoubleMapper();

final Processor<Long,Double> processor =
  world.actorFor(
    Processor.class,
    StreamProcessor.class,
    transformer,
    10,
    PublisherConfiguration.defaultDropHead());

processor.subscribe(subscriber);
publisher.subscribe(processor);

The above produces this output.
// RESULTS
> 1.0
> 2.0
> 3.0
> 4.0
> 5.0
> 6.0
> 7.0
> 8.0
> 9.0
> 10.0
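The two-sided nature of a Processor can also be sketched with the JDK's java.util.concurrent.Flow interfaces. This is an illustration under stated assumptions, not the StreamProcessor implementation. The names ProcessorSketch, MapProcessor, and publisherOf are hypothetical, everything runs on the calling thread, and the downstream must subscribe to the processor before the processor subscribes upstream, which is the same order used in the example above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

final class ProcessorSketch {
  /** Subscriber on the upstream side, Publisher on the downstream side; maps each element. */
  static final class MapProcessor<T, R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscriber<? super R> downstream;

    MapProcessor(final Function<? super T, ? extends R> mapper) {
      this.mapper = mapper;
    }

    // Publisher side: remember the single downstream subscriber.
    @Override public void subscribe(final Flow.Subscriber<? super R> subscriber) {
      this.downstream = subscriber;
    }

    // Subscriber side: hand the upstream Subscription straight through, so that
    // downstream demand and cancellation reach the upstream Publisher unchanged.
    @Override public void onSubscribe(final Flow.Subscription upstream) { downstream.onSubscribe(upstream); }
    @Override public void onNext(final T item) { downstream.onNext(mapper.apply(item)); }
    @Override public void onError(final Throwable cause) { downstream.onError(cause); }
    @Override public void onComplete() { downstream.onComplete(); }
  }

  /** Minimal synchronous upstream over a list; not re-entrant, emits only what was requested. */
  static <T> Flow.Publisher<T> publisherOf(final List<T> items) {
    return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private final Iterator<T> iterator = items.iterator();
      private boolean done;

      @Override public void request(final long n) {
        for (long i = 0; i < n && !done && iterator.hasNext(); i++) {
          subscriber.onNext(iterator.next());
        }
        if (!done && !iterator.hasNext()) {
          done = true;
          subscriber.onComplete();
        }
      }

      @Override public void cancel() { done = true; }
    });
  }

  /** Wires upstream -> processor -> collecting subscriber and returns the mapped values. */
  static List<Double> run(final List<Long> input) {
    final List<Double> received = new ArrayList<>();
    final MapProcessor<Long, Double> processor = new MapProcessor<>(Long::doubleValue);

    processor.subscribe(new Flow.Subscriber<Double>() {
      @Override public void onSubscribe(final Flow.Subscription s) { s.request(Long.MAX_VALUE); }
      @Override public void onNext(final Double item) { received.add(item); }
      @Override public void onError(final Throwable cause) { throw new IllegalStateException(cause); }
      @Override public void onComplete() { }
    });
    publisherOf(input).subscribe(processor);

    return received;
  }

  public static void main(final String[] args) {
    System.out.println(run(List.of(1L, 2L, 3L)));
  }
}
```

Passing the upstream Subscription through unchanged only works because this processor emits exactly one output per input; a filtering or flat-mapping processor would need to track demand itself.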
A Source is the source of a stream of elements relayed by a Publisher. A Sink is the destination of the elements provided to a Subscriber by its Publisher. Next are ways that these may be used.

You may reuse predefined Source types, but you will also develop your own implementations. This is the Source protocol as provided by XOOM Streams.

package io.vlingo.xoom.reactivestreams;

public interface Source<T> {
  Completes<Elements<T>> next();
  Completes<Elements<T>> next(final int maximumElements);
  Completes<Elements<T>> next(final long index);
  Completes<Elements<T>> next(final long index, final int maximumElements);
  Completes<Boolean> isSlow();
}

The job of a Source is to take requests for the next elements in a stream, which are returned as a Completes<Elements<T>>. This means that the Elements<T> are completed at some future time. See Completes<T> for details on use.

The isSlow() protocol answers a Completes<Boolean> indicating whether the Source will tend to be slow in providing next elements. The following demonstrates how you may answer.

@Override
public Completes<Boolean> isSlow() {
  return Completes.withSuccess(false);
}

Of course, answering true or false accurately is vitally important. If your Source is slow, stating so enables the Publisher to decide on the kind of Scheduler to use between probes for next elements. A slow Source will be managed by repeated schedule-once timer intervals, while a fast Source will be managed by a single schedule-many timer that repeats at a consistent interval.

The following is an example of a fast Source that lazily provides a range of Long values; that is, the values are not all preallocated in a List.

package io.vlingo.xoom.reactivestreams.source;

import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.reactivestreams.Elements;
import io.vlingo.xoom.reactivestreams.Source;

public class LongRangeSource implements Source<Long> {
  private long current;
  public final long endExclusive;
  public final long startInclusive;

  public LongRangeSource(final long startInclusive, final long endExclusive) {
    assert(startInclusive <= endExclusive);
    assert(startInclusive >= 0 && startInclusive <= Long.MAX_VALUE);
    this.startInclusive = startInclusive;
    assert(endExclusive >= 0 && endExclusive <= Long.MAX_VALUE);
    this.endExclusive = endExclusive;
    this.current = startInclusive;
  }

  @Override
  public Completes<Elements<Long>> next() {
    if (current < endExclusive) {
      final Long[] element = new Long[1];
      element[0] = current++;
      return Completes.withSuccess(new Elements<>(element, false));
    }
    return Completes.withSuccess(new Elements<>(new Long[0], true));
  }

  @Override
  public Completes<Elements<Long>> next(final int maximumElements) {
    return next();
  }

  @Override
  public Completes<Elements<Long>> next(long index) {
    return next();
  }

  @Override
  public Completes<Elements<Long>> next(final long index, final int maximumElements) {
    return next();
  }

  @Override
  public Completes<Boolean> isSlow() {
    return Completes.withSuccess(false);
  }

  @Override
  public String toString() {
    return "LongRangeSource [startInclusive=" + startInclusive +
      " endExclusive=" + endExclusive + " current=" + current + "]";
  }
}

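The LongRangeSource above answers a single element per call and ignores maximumElements. As a rough illustration of how a source might instead honor that parameter, the following sketch returns up to maximumElements values per call. It does not use the library types: Batch is a hypothetical stand-in for Elements<Long>, the result is returned directly rather than inside a Completes, and BatchingRangeSketch is an invented name.

```java
import java.util.Arrays;

final class BatchingRangeSketch {
  /** Hypothetical stand-in for Elements<Long>: a batch of values plus an end-of-stream flag. */
  static final class Batch {
    final long[] values;
    final boolean terminated;

    Batch(final long[] values, final boolean terminated) {
      this.values = values;
      this.terminated = terminated;
    }
  }

  private final long endExclusive;
  private long current;

  BatchingRangeSketch(final long startInclusive, final long endExclusive) {
    if (startInclusive < 0 || startInclusive > endExclusive) {
      throw new IllegalArgumentException("require 0 <= startInclusive <= endExclusive");
    }
    this.current = startInclusive;
    this.endExclusive = endExclusive;
  }

  /** Answers up to maximumElements values; an empty, terminated batch marks the end of the range. */
  Batch next(final int maximumElements) {
    if (maximumElements < 1) {
      throw new IllegalArgumentException("maximumElements must be at least 1");
    }
    final long remaining = endExclusive - current;
    if (remaining <= 0) {
      return new Batch(new long[0], true);
    }
    final int count = (int) Math.min(remaining, maximumElements);
    final long[] values = new long[count];
    for (int i = 0; i < count; i++) {
      values[i] = current++; // values are produced lazily, one batch at a time
    }
    return new Batch(values, false);
  }

  public static void main(final String[] args) {
    final BatchingRangeSketch source = new BatchingRangeSketch(1, 6);
    Batch batch;
    do {
      batch = source.next(2);
      System.out.println(Arrays.toString(batch.values) + " terminated=" + batch.terminated);
    } while (!batch.terminated);
  }
}
```

As in LongRangeSource, the final values arrive in a batch that is not yet marked terminated, and the following call answers an empty, terminated batch.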
There are factory methods available to create Source instances, along with the default implementation types that these methods instantiate.

public interface Source<T> {
  static <T> Source<T> empty() ...
  static <T> Source<T> only(final T... elements) ...
  static Source<Long> rangeOf(final long startInclusive, final long endExclusive) ...
  static <T> Source<T> with(final Iterable<T> iterable) ...
  static <T> Source<T> with(final Iterable<T> iterable, final boolean slowIterable) ...
  static <T> Source<T> with(final Supplier<T> supplier) ...
  static <T> Source<T> with(final Supplier<T> supplier, final boolean slowSupplier) ...
}

The number of these and the backing Source types will grow over future releases.

You may reuse predefined Sink types, but you will also develop your own implementations. This is the Sink protocol as provided by XOOM Streams.

public interface Sink<T> {
  void ready();
  void terminate();
  void whenValue(final T value);
}

The ready() indicates that the Sink should become prepared to handle incoming values. The terminate() indicates that the Sink is being terminated and will no longer receive values. The whenValue(T value) is used to provide the next available value from the Subscriber.

The following is an example of a Sink that prints received values.

package io.vlingo.xoom.reactivestreams.sink;

import java.io.PrintStream;

import io.vlingo.xoom.reactivestreams.Sink;

public class PrintSink<T> implements Sink<T> {
  private final PrintStream printStream;
  private final String prefix;
  private boolean terminated;

  public PrintSink(final PrintStream printStream, final String prefix) {
    this.printStream = printStream;
    this.prefix = prefix;
    this.terminated = false;
  }

  @Override
  public void ready() {
    // ignored
  }

  @Override
  public void terminate() {
    terminated = true;
  }

  @Override
  public void whenValue(final T value) {
    if (!terminated) {
      printStream.println(prefix + value.toString());
    }
  }

  @Override
  public String toString() {
    return "PrintSink[terminated=" + terminated + "]";
  }
}

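Writing your own Sink follows the same three-method shape. The following sketch gathers values into a list instead of printing them, which can be handy in tests. To compile without the library it declares a local copy of the Sink protocol shown above; CollectingSinkSketch, CollectingSink, and values() are hypothetical names and are not part of XOOM Streams.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class CollectingSinkSketch {
  /** Local copy of the three-method Sink protocol, so the sketch compiles on its own. */
  interface Sink<T> {
    void ready();
    void terminate();
    void whenValue(T value);
  }

  /** Hypothetical sink that remembers every value received before termination. */
  static final class CollectingSink<T> implements Sink<T> {
    private final List<T> values = new ArrayList<>();
    private boolean terminated;

    @Override
    public void ready() {
      // nothing to prepare for an in-memory list
    }

    @Override
    public void terminate() {
      terminated = true;
    }

    @Override
    public void whenValue(final T value) {
      if (!terminated) {
        values.add(value);
      }
    }

    /** Read-only view of the values collected so far. */
    List<T> values() {
      return Collections.unmodifiableList(values);
    }
  }

  public static void main(final String[] args) {
    final CollectingSink<String> sink = new CollectingSink<>();
    sink.ready();
    sink.whenValue("a");
    sink.whenValue("b");
    sink.terminate();
    sink.whenValue("c"); // arrives after termination and is dropped
    System.out.println(sink.values());
  }
}
```

The sketch is not thread-safe; it assumes that a single caller delivers the values, and a shared sink would need its own synchronization.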
There are factory methods available to create Sink instances, along with the default implementation types that these methods instantiate.

public interface Sink<T> {
  static <T> Sink<T> consumeWith(final Consumer<T> consumer) ...
  static <T> Sink<T> printToStdout(final String prefix) ...
  static <T> Sink<T> printToStderr(final String prefix) ...
  static <T> Sink<T> printTo(final PrintStream printStream, final String prefix) ...
}

The number of these and the backing Sink types will grow over future releases.

Some functions in the VLINGO XOOM platform, such as queries provided by XOOM Symbio, answer a Completes<Stream>. The Stream that is eventually available inside the Completes is used to consume the elements of streaming data.

package io.vlingo.xoom.reactivestreams;

public interface Stream {
  <S> void flowInto(final Sink<S> sink);
  <S> void flowInto(final Sink<S> sink, final long flowElementsRate);
  <S> void flowInto(final Sink<S> sink, final long flowElementsRate, final int probeInterval);
  void request(final long flowElementsRate);
  void stop();
}

The flowInto() overloads are used to start the stream flowing into a given Sink<S>. Besides causing the flow to begin, additional options are available. To control the initial rate at which elements flow, as in the number of elements that will arrive in a single burst, pass a value for flowElementsRate. If no rate is provided, the default is 100.

public static final long DefaultFlowRate = 100;

To indicate the number of milliseconds between probes for Source<T> elements, pass probeInterval. The default, if not explicitly provided by the client, is 5 milliseconds. There are other constants that you may use. Ultimately, stream throughput depends on the throughput available to the Source<T> and the speed at which the Source<T> is probed by the StreamPublisher to retrieve that data.

// 5 milliseconds
public static final int DefaultProbeInterval = PublisherConfiguration.DefaultProbeInterval;

// 2 milliseconds
public static int FastProbeInterval = PublisherConfiguration.FastProbeInterval;

// 1 millisecond
public static int FastestProbeInterval = PublisherConfiguration.FastestProbeInterval;

The request() operation may be used to change the flow rate after the initial rate has been established. The stop() method may be used to completely terminate the flow of elements from the Source<T> and Publisher<T>. Note, however, that since the flow is asynchronous it is possible that some elements are already incoming to the Sink<S> and thus will not be prevented from arriving.

In XOOM Streams an operator is used to perform a specific kind of data transformation. When using a StreamProcessor you will probably want to transform the StreamSubscriber side of the processor's incoming data to another type that is outgoing through its StreamPublisher side. You can use an operator to do that.

package io.vlingo.xoom.reactivestreams;

public interface Operator<T,R> {
  void performInto(final T value, final Consumer<R> consumer);
}

The Operator<T, R> takes type T as an input value and provides type R as output. Output is delivered to a Consumer<R>, which means the R value is produced before the Consumer<R> receives it. This is where the implementation of the Operator<T, R> comes into play. Note that there are currently three factory methods provided on the Operator interface:

package io.vlingo.xoom.reactivestreams;

public interface Operator<T,R> {
  static <T> Operator<T,T> filterWith(final Predicate<T> filter) {
    return new Filter<>(filter);
  }

  static <T,R> Operator<T,R> flatMapper(final Function<T, Source<R>> mapper) {
    return new FlatMapper<T,R>(mapper);
  }

  static <T,R> Operator<T,R> mapWith(final Function<T,R> mapper) {
    return new Mapper<>(mapper);
  }
  ...
}

Thus, there are three basic kinds of operators: a filter, a flat-mapper, and a mapper.
Factory | Operator Description |
Filter | Produces the same type that it takes as input, but may produce fewer outputs than the inputs it receives. |
Mapper | Produces a different type than it takes as input, as it is responsible for mapping/transforming the input to another type. The mapper may actually produce the same type of output as the input, but perhaps with the data inside the type enriched or restricted in some way. |
FlatMapper | Works like Mapper, but also flattens the stream by reducing the inner structure to the single one produced by the mapping Function. |
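To make the difference between a filter and a mapper concrete, the following sketch shows one way such operators could be written. It is not the library's Filter or Mapper code. The Operator interface is re-declared locally with the single performInto() method, the factories are written as lambdas, and OperatorSketch and digitsOnly() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

final class OperatorSketch {
  /** Local copy of the single abstract method of the protocol, so the sketch compiles on its own. */
  interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);
  }

  /** A filter passes the value through unchanged, or not at all. */
  static <T> Operator<T, T> filterWith(final Predicate<T> filter) {
    return (value, consumer) -> {
      if (filter.test(value)) {
        consumer.accept(value);
      }
    };
  }

  /** A mapper produces exactly one output for every input. */
  static <T, R> Operator<T, R> mapWith(final Function<T, R> mapper) {
    return (value, consumer) -> consumer.accept(mapper.apply(value));
  }

  /** Chains the two: keeps only all-digit strings, then parses each survivor. */
  static List<Integer> digitsOnly(final List<String> input) {
    final Operator<String, String> filter =
      filterWith(s -> !s.isEmpty() && s.chars().allMatch(Character::isDigit));
    final Operator<String, Integer> mapper =
      mapWith(s -> Integer.parseInt(s));

    final List<Integer> results = new ArrayList<>();
    input.forEach(s ->
      filter.performInto(s, match -> mapper.performInto(match, results::add)));
    return results;
  }

  public static void main(final String[] args) {
    System.out.println(digitsOnly(List.of("ABC", "321", "x1", "42")));
  }
}
```

Because each operator only ever talks to a Consumer<R>, chaining amounts to passing the next operator's performInto() as the consumer of the previous one.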
Here is an example filter:
final List<String> list =
  Arrays.asList("ABC", "321", "123", "456", "DEF", "214");

final List<String> results = new ArrayList<>();

final Operator<String,String> filter =
  Operator.filterWith((s) -> s.contains("1"));

list.forEach(possible ->
  filter.performInto(possible, (match) -> results.add(match)));

Assert.assertEquals(3, results.size());
Assert.assertEquals("321", results.get(0));
Assert.assertEquals("123", results.get(1));
Assert.assertEquals("214", results.get(2));

In this example (broken down into multiple steps for clarity) a filter is created that filters in all String instances that contain the substring "1", and filters out all others. Then the List<String> of six elements is iterated over and the filter's performInto() operation is used. If the filter Predicate<T> of s.contains("1") is satisfied, the match is added to the List<String> of results, which then contains elements "321", "123", and "214".

Next is an example of a mapper:
final List<String> list = Arrays.asList("123", "456", "789");

final List<Integer> results = new ArrayList<>();

final Operator<String,Integer> mapper =
  Operator.mapWith((s) -> Integer.parseInt(s));

list.forEach(digits ->
  mapper.performInto(digits, (number) -> results.add(number)));

Assert.assertEquals(3, results.size());
Assert.assertEquals(123, (int) results.get(0));
Assert.assertEquals(456, (int) results.get(1));
Assert.assertEquals(789, (int) results.get(2));

In this example (broken down into multiple steps for clarity) a mapper is created that maps all String instances of digit characters to Integer numbers. The List<String> of three elements is iterated over and the mapper's performInto() operation is used. The new value is added to the List<Integer> of results, which then contains elements 123, 456, and 789.

The following demonstrates how a mapper can perform a flat-map, although there is an easier way, which is shown afterward:
final List<String> list1 = Arrays.asList("1", "2", "3");
final List<String> list2 = Arrays.asList("4", "5", "6");
final List<String> list3 = Arrays.asList("7", "8", "9");

final List<List<String>> lists = Arrays.asList(list1, list2, list3);

final List<Integer> results = new ArrayList<>();

final Function<List<List<String>>, List<Integer>> mapper =
  (los) -> los.stream()
    .flatMap(list -> list.stream().map(s -> Integer.parseInt(s)))
    .collect(Collectors.toList());

final Operator<List<List<String>>,List<Integer>> flatMapper =
  Operator.mapWith(mapper);

flatMapper.performInto(lists, (numbers) -> results.addAll(numbers));

Assert.assertEquals(9, results.size());
Assert.assertEquals(1, (int) results.get(0));
Assert.assertEquals(2, (int) results.get(1));
Assert.assertEquals(3, (int) results.get(2));

Note that there is a List<List<String>> lists that is a list of lists. The mapper is created so that it internally uses the Java Stream::flatMap. The mapper first streams over the List<List<String>> lists. It then uses flatMap to handle each single List<String> list: it streams over each of those lists, and each individual String is transformed to an Integer.

Yet, you can perform this more easily using the FlatMapper operator. Here's a FlatMapper solution to a different problem:

final Operator<Long, Long> flatMap =
  Operator.flatMapWith((record) -> new LongRangeSource(record, record + 2));

final List<Long> providedLongs = new CopyOnWriteArrayList<>();

flatMap.performInto(1L, providedLongs::add);
flatMap.performInto(3L, providedLongs::add);

Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 4L), providedLongs);

In this example, the flat-mapper is given a function that instantiates a LongRangeSource, which is a stream source that contains a startInclusive and an endExclusive value. In the first performInto() the startInclusive is 1 and the endExclusive is two greater, which is 3; the first numeric range is thus 1 to 2. The second range is 3 to 4. The flat-mapper reads every value in each LongRangeSource and adds it to the collection referenced by providedLongs. When completed, the two ranges are flattened such that only their inner values are preserved in providedLongs.

You can give a StreamProcessor an Operator<T, R> such as these, or use them in your Source and Sink implementations.

We will be adding more functional interfaces to our various types over future releases.
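The flattening step itself can be sketched without the library. In the following illustration an Iterable<R> stands in for the library's Source<R>, and the Operator interface is re-declared locally. FlatMapSketch, range(), and demo() are hypothetical names, and this is not how the FlatMapper type is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class FlatMapSketch {
  /** Local copy of the single abstract method of the protocol, so the sketch compiles on its own. */
  interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);
  }

  /** Assumption: Iterable<R> plays the role of Source<R>; every inner value goes to the consumer. */
  static <T, R> Operator<T, R> flatMapWith(final Function<T, Iterable<R>> mapper) {
    return (value, consumer) -> mapper.apply(value).forEach(consumer);
  }

  /** The longs in [startInclusive, endExclusive), standing in for a LongRangeSource. */
  static List<Long> range(final long startInclusive, final long endExclusive) {
    final List<Long> values = new ArrayList<>();
    for (long value = startInclusive; value < endExclusive; value++) {
      values.add(value);
    }
    return values;
  }

  /** Mirrors the example above: two inputs, each expanded to a two-element range. */
  static List<Long> demo() {
    final Operator<Long, Long> flatMap = flatMapWith(start -> range(start, start + 2));
    final List<Long> provided = new ArrayList<>();
    flatMap.performInto(1L, provided::add);
    flatMap.performInto(3L, provided::add);
    return provided;
  }

  public static void main(final String[] args) {
    System.out.println(demo());
  }
}
```

The consumer never sees the intermediate ranges, only their elements, which is what distinguishes a flat-mapper from a mapper that answers a collection.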
There are several ways to stream your data. You have already seen specific examples in the above content. In this section you are introduced to other ways.
Data that is persisted inside a XOOM Symbio store can be streamed out to one or more subscribers. One of the most basic examples is provided by the StateStore.

package io.vlingo.symbio.store.state;

public interface StateStoreReader {
  ...
  Completes<Stream> streamAllOf(final Class<?> stateType);
  Completes<Stream> streamSomeUsing(final QueryExpression query);
}

A StateStoreReader is provided as part of the StateStore protocol. Thus, you may ask a StateStore to stream data from its internal storage. The two methods provided support streaming all states of a given type, or a queried subset of them. A stateType is a Java Class<?>, and all states of that type are stored in a single container. In some storage mechanisms that container may be a database table or an in-memory grid region.

By requesting a stream of all of a given stateType, the StateStoreReader queries that data from the container and provides an instance of io.vlingo.xoom.reactivestreams.Stream.

// somewhere many EquityState instances are written
final Equity equity = new Equity(...);
...
store.write(equity.id, equity.state, equity.version, interest);
...
final Completes<Stream> stream =
  store.streamAllOf(EquityState.class);

stream.andThen(all ->
  all.flowInto(new ConsumerSink<>((equityState) -> reportOn(equityState))));

You may also constrain your query to a subset of the whole container. The query is storage-type dependent.
final Completes<Stream> stream =
  store.streamSomeUsing(
    QueryExpression.using(
      EquityState.class,
      "select ... from tbl_equity where ..."));

stream.andThen(some ->
  some.flowInto(new ConsumerSink<>((equityState) -> reportOn(equityState))));

You may also stream persisted Source<?> types, such as DomainEvent and Command instances. For example, the Journal<T> provides this streaming interface through the JournalReader<T>, which in turn is an EntryReader<T>.

// Journal
package io.vlingo.symbio.store.journal;

public interface Journal<T> {
  ...
  <ET extends Entry<?>> Completes<JournalReader<ET>>
    journalReader(final String name);
}

// JournalReader
package io.vlingo.symbio.store.journal;

public interface JournalReader<T extends Entry<?>> extends EntryReader<T> { }

// EntryReader
package io.vlingo.symbio.store.journal;

public interface EntryReader<T extends Entry<?>> {
  ...
  Completes<Stream> streamAll();
}

These are used in the following example, first to request the JournalReader, then the Stream of all EntryBundle instances, and then to flow the stream into the Sink<EntryBundle> for reporting on each event.

final Sink<EntryBundle> sink =
  Sink.consumeWith((bundle) -> reportOn(bundle.source));

journal
  .journalReader("events-reporter")
  .andThenTo(reader -> reader.streamAll())
  .andThen(stream -> stream.flowInto(sink, 50));

All stores (ObjectStore, StateStore, and Journal) support persisting a totally ordered stream of Source<?> entries, such as DomainEvent and Command. You may obtain any of the total streams using the same basic techniques.