The VLINGO/STREAMS component implements the Reactive Streams specification for the VLINGO/PLATFORM. It is fully based on VLINGO/ACTORS to provide reactive concurrency for stream processing.
There are four abstractions used by the Reactive Streams specification, which are entirely implemented in VLINGO/STREAMS.
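| Protocol | Description |
| Processor | Both a Subscriber and a Publisher |
| Publisher | A provider of a potentially unbounded number of sequenced elements, pushing them according to the demand received from its one or more Subscriber instances |
| Subscriber | Receives signals from the Publisher |
| Subscription | A reference to the contract between a Publisher and a Subscriber |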
It is possible that you will never implement a Publisher, Subscriber, Subscription, or Processor yourself. Default implementations of these are provided by VLINGO/STREAMS. You will instead implement Source and Sink types.
The Subscriber requests a Subscription from a Publisher and specifies the number of elements it can accept. Specifying that number enforces backpressure on the Publisher so that it does not overwhelm the Subscriber.
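The demand mechanics described above can be sketched without VLINGO/STREAMS at all. The following minimal example uses the JDK's java.util.concurrent.Flow interfaces (Java 9 and later), which mirror the Reactive Streams protocols. The names BackpressureSketch, RangePublisher, and CollectingSubscriber are hypothetical and exist only for this illustration, and the synchronous delivery is a simplification rather than a description of how the actor-based StreamPublisher works.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration built on the JDK Flow interfaces, not on VLINGO/STREAMS types.
final class BackpressureSketch {

  // Publishes the values 1..max, but only as many as the subscriber has requested.
  static final class RangePublisher implements Flow.Publisher<Long> {
    private final long max;

    RangePublisher(final long max) {
      this.max = max;
    }

    @Override
    public void subscribe(final Flow.Subscriber<? super Long> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private long next = 1;
        private boolean done = false;

        @Override
        public void request(final long n) {
          // Demand caps the number of elements emitted by this call.
          for (long i = 0; i < n && !done && next <= max; i++) {
            subscriber.onNext(next++);
          }
          if (!done && next > max) {
            done = true;
            subscriber.onComplete();
          }
        }

        @Override
        public void cancel() {
          done = true;
        }
      });
    }
  }

  // Collects elements and signals demand only when requestMore() is called.
  static final class CollectingSubscriber implements Flow.Subscriber<Long> {
    final List<Long> received = new ArrayList<>();
    boolean completed = false;
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
      this.subscription = subscription;
    }

    @Override
    public void onNext(final Long item) {
      received.add(item);
    }

    @Override
    public void onError(final Throwable cause) {
      throw new IllegalStateException(cause);
    }

    @Override
    public void onComplete() {
      completed = true;
    }

    void requestMore(final long n) {
      subscription.request(n);
    }
  }

  public static void main(final String[] args) {
    final CollectingSubscriber subscriber = new CollectingSubscriber();
    new RangePublisher(10).subscribe(subscriber);

    subscriber.requestMore(3);
    System.out.println(subscriber.received); // [1, 2, 3]

    subscriber.requestMore(7);
    System.out.println(subscriber.received.size() + " " + subscriber.completed); // 10 true
  }
}
```

Nothing is delivered until the subscriber signals demand, and each request(n) delivers at most n elements, which is the essence of the backpressure contract.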
The following is an example of how to create a Publisher that produces a sequenced series of Long values and a Subscriber with a Sink that prints those as String values.
final long max = 10;

final Publisher publisher =
  world.actorFor(
    Publisher.class,
    StreamPublisher.class,
    Source.rangeOf(1, max + 1),
    PublisherConfiguration.defaultDropHead());

final Subscriber subscriber =
  world.actorFor(
    Subscriber.class,
    StreamSubscriber.class,
    Sink.printToStdout("> "),
    max);

publisher.subscribe(subscriber);
With no further code the above produces the following output.
// RESULTS
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9
> 10
Similarly, a Processor is created, which is both a Subscriber to an upstream Publisher and a Publisher to a downstream Subscriber.
The following example reuses the Publisher and Subscriber from the previous example, but injects the Processor in between the two, where the Processor transforms the Long values to Double values.
final LongToDoubleMapper transformer = new LongToDoubleMapper();

// The processor consumes Long values and publishes Double values
final Processor<Long,Double> processor =
  world.actorFor(
    Processor.class,
    StreamProcessor.class,
    transformer,
    10,
    PublisherConfiguration.defaultDropHead());

processor.subscribe(subscriber);

publisher.subscribe(processor);
The above produces this output.
// RESULTS
> 1.0
> 2.0
> 3.0
> 4.0
> 5.0
> 6.0
> 7.0
> 8.0
> 9.0
> 10.0
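To make the dual role of a processor concrete, here is a minimal sketch written against the JDK's java.util.concurrent.Flow interfaces rather than VLINGO/STREAMS. ProcessorSketch, MappingProcessor, and longsToDoubles are hypothetical names, the delivery is synchronous, and completion signalling from the upstream publisher is omitted for brevity; it illustrates the wiring order used above, not the StreamProcessor implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical illustration built on the JDK Flow interfaces, not on VLINGO/STREAMS types.
final class ProcessorSketch {

  // Subscriber to an upstream Publisher<T> and Publisher to one downstream Subscriber<R>.
  static final class MappingProcessor<T, R> implements Flow.Processor<T, R> {
    private final Function<T, R> mapper;
    private Flow.Subscription upstream;
    private Flow.Subscriber<? super R> downstream;

    MappingProcessor(final Function<T, R> mapper) {
      this.mapper = mapper;
    }

    // Publisher side: accept the downstream subscriber.
    @Override
    public void subscribe(final Flow.Subscriber<? super R> subscriber) {
      downstream = subscriber;
      // Demand from downstream is forwarded unchanged to the upstream subscription.
      subscriber.onSubscribe(new Flow.Subscription() {
        @Override public void request(final long n) { upstream.request(n); }
        @Override public void cancel() { upstream.cancel(); }
      });
    }

    // Subscriber side: receive upstream signals and pass them on, mapping each element.
    @Override public void onSubscribe(final Flow.Subscription subscription) { upstream = subscription; }
    @Override public void onNext(final T item) { downstream.onNext(mapper.apply(item)); }
    @Override public void onError(final Throwable cause) { downstream.onError(cause); }
    @Override public void onComplete() { downstream.onComplete(); }
  }

  // Maps the Long values 1..max to Double values through the processor.
  static List<Double> longsToDoubles(final long max) {
    final List<Double> results = new ArrayList<>();
    final MappingProcessor<Long, Double> processor = new MappingProcessor<>(Long::doubleValue);

    // Downstream side first; demand is signalled only after the wiring is complete.
    final Flow.Subscription[] demand = new Flow.Subscription[1];
    processor.subscribe(new Flow.Subscriber<Double>() {
      @Override public void onSubscribe(final Flow.Subscription subscription) { demand[0] = subscription; }
      @Override public void onNext(final Double item) { results.add(item); }
      @Override public void onError(final Throwable cause) { throw new IllegalStateException(cause); }
      @Override public void onComplete() { }
    });

    // Upstream side: a minimal synchronous publisher of 1..max.
    final Flow.Publisher<Long> publisher = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private long next = 1;

      @Override
      public void request(final long n) {
        for (long i = 0; i < n && next <= max; i++) {
          subscriber.onNext(next++);
        }
      }

      @Override
      public void cancel() {
        next = max + 1;
      }
    });
    publisher.subscribe(processor);

    demand[0].request(max);
    return results;
  }

  public static void main(final String[] args) {
    System.out.println(longsToDoubles(3)); // [1.0, 2.0, 3.0]
  }
}
```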
A Source is the source of a stream of elements relayed by a Publisher. A Sink is the destination of the elements provided to a Subscriber by its Publisher. Next are ways that these may be used.
You may reuse predefined Source types, but you will also develop your own implementations. This is the Source protocol as provided by VLINGO/STREAMS.
package io.vlingo.reactivestreams;

import io.vlingo.common.Completes;

public interface Source<T> {
  Completes<Elements<T>> next();
  Completes<Elements<T>> next(final int maximumElements);
  Completes<Elements<T>> next(final long index);
  Completes<Elements<T>> next(final long index, final int maximumElements);
  Completes<Boolean> isSlow();
}
The job of a Source is to take requests for the next elements in a stream, which are returned as a Completes<Elements<T>>. This means that the Elements<T> are completed at some future time. See Completes<T> for details on use.
The isSlow() method answers a Completes<Boolean> indicating whether the Source will tend to be slow in providing next elements. The following demonstrates how you may answer.
@Override
public Completes<Boolean> isSlow() {
  return Completes.withSuccess(false);
}
Of course, answering true or false accurately is vitally important. If your Source is slow, stating so enables the Publisher to decide on the kind of Scheduler to use between probes for next elements. A slow Source will be managed by repeated schedule-once timer intervals, while a fast Source will be managed by a consistent schedule-many repeating time interval.
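The difference between the two scheduling styles can be sketched with the JDK's ScheduledExecutorService. This is only an illustration of the general technique, not the Scheduler used by VLINGO/ACTORS or the logic inside StreamPublisher; ProbeSchedulingSketch and its probe() method are hypothetical, and probes is assumed to be at least 1.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of schedule-once versus repeating probes.
final class ProbeSchedulingSketch {

  // Performs the given number of probes (at least 1) and returns how many actually ran.
  static int probe(final boolean slow, final int probes, final long intervalMillis) {
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger count = new AtomicInteger();
    final CountDownLatch finished = new CountDownLatch(probes);
    try {
      if (slow) {
        // Schedule-once: the next probe is scheduled only after the current one has finished,
        // so a slow probe never has another probe queued up behind it.
        final Runnable[] probeOnce = new Runnable[1];
        probeOnce[0] = () -> {
          final int soFar = count.incrementAndGet();
          finished.countDown();
          if (soFar < probes) {
            scheduler.schedule(probeOnce[0], intervalMillis, TimeUnit.MILLISECONDS);
          }
        };
        scheduler.schedule(probeOnce[0], intervalMillis, TimeUnit.MILLISECONDS);
      } else {
        // Schedule-many: a single registration fires repeatedly at a fixed interval.
        scheduler.scheduleAtFixedRate(() -> {
          if (count.get() < probes) {
            count.incrementAndGet();
            finished.countDown();
          }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
      }
      if (!finished.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("probes did not finish in time");
      }
      return count.get();
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(final String[] args) {
    System.out.println("slow source probes: " + probe(true, 3, 5));
    System.out.println("fast source probes: " + probe(false, 3, 5));
  }
}
```

The schedule-once style trades a little scheduling overhead for the guarantee that probes never pile up behind a source that takes longer than the interval to answer.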
The following is an example of a fast Source that lazily (not all preallocated in a List) provides a range of Long values.
package io.vlingo.reactivestreams.source;

import io.vlingo.common.Completes;
import io.vlingo.reactivestreams.Elements;
import io.vlingo.reactivestreams.Source;

public class LongRangeSource implements Source<Long> {
  private long current;
  public final long endExclusive;
  public final long startInclusive;

  public LongRangeSource(final long startInclusive, final long endExclusive) {
    assert(startInclusive <= endExclusive);
    assert(startInclusive >= 0 && startInclusive <= Long.MAX_VALUE);
    this.startInclusive = startInclusive;
    assert(endExclusive >= 0 && endExclusive <= Long.MAX_VALUE);
    this.endExclusive = endExclusive;

    this.current = startInclusive;
  }

  @Override
  public Completes<Elements<Long>> next() {
    if (current < endExclusive) {
      final Long[] element = new Long[1];
      element[0] = current++;
      return Completes.withSuccess(new Elements<>(element, false));
    }
    return Completes.withSuccess(new Elements<>(new Long[0], true));
  }

  @Override
  public Completes<Elements<Long>> next(final int maximumElements) {
    return next();
  }

  @Override
  public Completes<Elements<Long>> next(long index) {
    return next();
  }

  @Override
  public Completes<Elements<Long>> next(final long index, final int maximumElements) {
    return next();
  }

  @Override
  public Completes<Boolean> isSlow() {
    return Completes.withSuccess(false);
  }

  @Override
  public String toString() {
    return "LongRangeSource [startInclusive=" + startInclusive +
      " endExclusive=" + endExclusive + " current=" + current + "]";
  }
}
See the Javadocs on Source for full API explanations.
There are factory methods available to create Source instances, along with the default implementation types that back the instances they produce.
public interface Source<T> {
  static <T> Source<T> empty() ...
  static <T> Source<T> only(final T... elements) ...
  static Source<Long> rangeOf(final long startInclusive, final long endExclusive) ...
  static <T> Source<T> with(final Iterable<T> iterable) ...
  static <T> Source<T> with(final Iterable<T> iterable, final boolean slowIterable) ...
  static <T> Source<T> with(final Supplier<T> supplier) ...
  static <T> Source<T> with(final Supplier<T> supplier, final boolean slowSupplier) ...
}
The number of these and the backing Source types will grow over future releases.
You may reuse predefined Sink types, but you will also develop your own implementations. This is the Sink protocol as provided by VLINGO/STREAMS.
public interface Sink<T> {
  void ready();
  void terminate();
  void whenValue(final T value);
}
The ready() method indicates that the Sink should become prepared to handle incoming values. The terminate() method indicates that the Sink is being terminated and will no longer receive values. The whenValue(T value) method is used to provide the next available value from the Subscriber.
The following is an example of a Sink that prints received values.
package io.vlingo.reactivestreams.sink;

import java.io.PrintStream;
import io.vlingo.reactivestreams.Sink;

public class PrintSink<T> implements Sink<T> {
  private final PrintStream printStream;
  private final String prefix;
  private boolean terminated;

  public PrintSink(final PrintStream printStream, final String prefix) {
    this.printStream = printStream;
    this.prefix = prefix;
    this.terminated = false;
  }

  @Override
  public void ready() {
    // ignored
  }

  @Override
  public void terminate() {
    terminated = true;
  }

  @Override
  public void whenValue(final T value) {
    if (!terminated) {
      printStream.println(prefix + value.toString());
    }
  }

  @Override
  public String toString() {
    return "PrintSink[terminated=" + terminated + "]";
  }
}
See the Javadocs on Sink for full API explanations.
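As a further illustration of writing your own Sink, the following sketch gathers values into a list instead of printing them. So that it compiles on its own, it declares a local copy of the three-method Sink protocol shown above instead of importing io.vlingo.reactivestreams.Sink; CollectingSinkSketch and CollectingSink are hypothetical names that are not part of VLINGO/STREAMS.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration; in a real project you would implement io.vlingo.reactivestreams.Sink.
final class CollectingSinkSketch {

  // Local copy of the Sink protocol so that this sketch is self-contained.
  interface Sink<T> {
    void ready();
    void terminate();
    void whenValue(final T value);
  }

  // Gathers every value received before terminate() is signalled.
  static final class CollectingSink<T> implements Sink<T> {
    private final List<T> values = new ArrayList<>();
    private boolean terminated = false;

    @Override
    public void ready() {
      // nothing to prepare for an in-memory list
    }

    @Override
    public void terminate() {
      terminated = true;
    }

    @Override
    public void whenValue(final T value) {
      if (!terminated) {
        values.add(value);
      }
    }

    // Read-only view of the values collected so far.
    List<T> values() {
      return Collections.unmodifiableList(values);
    }
  }

  public static void main(final String[] args) {
    final CollectingSink<String> sink = new CollectingSink<>();
    sink.ready();
    sink.whenValue("a");
    sink.whenValue("b");
    sink.terminate();
    sink.whenValue("c"); // arrives after termination and is dropped
    System.out.println(sink.values()); // [a, b]
  }
}
```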
There are factory methods available to create Sink instances, along with the default implementation types that back the instances they produce.
public interface Sink<T> {
  static <T> Sink<T> consumeWith(final Consumer<T> consumer) ...
  static <T> Sink<T> printToStdout(final String prefix) ...
  static <T> Sink<T> printToStderr(final String prefix) ...
  static <T> Sink<T> printTo(final PrintStream printStream, final String prefix) ...
}
The number of these and the backing Sink types will grow over future releases.
Some functions in the VLINGO/PLATFORM, such as queries provided by VLINGO/SYMBIO, answer a Completes<Stream>. The Stream that is eventually available inside the Completes is used to consume the elements of streaming data.
package io.vlingo.reactivestreams;

public interface Stream {
  <S> void flowInto(final Sink<S> sink);
  <S> void flowInto(final Sink<S> sink, final long flowElementsRate);
  <S> void flowInto(final Sink<S> sink, final long flowElementsRate, final int probeInterval);
  void request(final long flowElementsRate);
  void stop();
}
The flowInto() overloads are used to start the stream flowing into a given Sink<S>. Besides causing the flow to begin, additional options are available. To control the initial rate at which elements flow, as in the number of elements that will arrive in a single burst, pass a value for flowElementsRate. If no parameter is provided, the default is 100.
public static final long DefaultFlowRate = 100;
To indicate the number of milliseconds between probes for Source<T> elements, pass probeInterval. The default, if not explicitly provided by the client, is 5 milliseconds. There are other constants that you may use. Ultimately, stream throughput depends on the throughput available to the Source<T> and the speed at which the Source<T> is probed by the StreamPublisher to retrieve that data.
// 5 milliseconds
public static final int DefaultProbeInterval = PublisherConfiguration.DefaultProbeInterval;

// 2 milliseconds
public static int FastProbeInterval = PublisherConfiguration.FastProbeInterval;

// 1 millisecond
public static int FastestProbeInterval = PublisherConfiguration.FastestProbeInterval;
The request() operation may be used to change the flow rate after the initial rate has been established. The stop() method may be used to completely terminate the flow of elements from the Source<T> and Publisher<T>. Note, however, that since the flow is asynchronous, it is possible that some elements are already incoming to the Sink<S> and thus will not be prevented from arriving.
For examples of how Stream is used, see Streaming Persistent Data.
In VLINGO/STREAMS an operator is used to perform a specific kind of data transformation. When using a StreamProcessor you will probably want to transform the StreamSubscriber side of the processor's incoming data to another type that is outgoing through its StreamPublisher side. You can use an operator to do that.
package io.vlingo.reactivestreams;

import java.util.function.Consumer;

public interface Operator<T,R> {
  void performInto(final T value, final Consumer<R> consumer);
}
The Operator<T, R> takes type T as an input value and provides type R as output. Output is delivered to a Consumer<R>, which means the R value is produced before the Consumer<R> receives it. This is where the implementation of the Operator<T, R> comes into play. Note that there are currently two factory methods provided on the Operator interface.
package io.vlingo.reactivestreams;

public interface Operator<T,R> {
  static <T> Operator<T,T> filterWith(final Predicate<T> filter) {
    return new Filter<>(filter);
  }

  static <T,R> Operator<T,R> mapWith(final Function<T,R> mapper) {
    return new Mapper<>(mapper);
  }

  ...
}
Thus, there are two basic kinds of operators: a filter and a mapper. A filter outputs the same type that it takes as input, but may produce fewer outputs than the inputs it receives. A mapper typically outputs a different type than it takes as input, as it is responsible for mapping the input to another type. A mapper may also produce the same type of output as the input, with the data inside the type enriched or restricted in some way.
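One plausible way such operators could be implemented, and chained through their consumers, is sketched below. This is not the library's actual Filter or Mapper source: the sketch declares a local copy of the Operator protocol so that it compiles on its own, and OperatorSketch and filterThenMap are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration; the real types live in io.vlingo.reactivestreams.
final class OperatorSketch {

  // Local copy of the Operator protocol so that this sketch is self-contained.
  interface Operator<T, R> {
    void performInto(final T value, final Consumer<R> consumer);
  }

  // Passes a value to the consumer only when the predicate is satisfied.
  static final class Filter<T> implements Operator<T, T> {
    private final Predicate<T> filter;

    Filter(final Predicate<T> filter) {
      this.filter = filter;
    }

    @Override
    public void performInto(final T value, final Consumer<T> consumer) {
      if (filter.test(value)) {
        consumer.accept(value);
      }
    }
  }

  // Transforms each value and always passes the result to the consumer.
  static final class Mapper<T, R> implements Operator<T, R> {
    private final Function<T, R> mapper;

    Mapper(final Function<T, R> mapper) {
      this.mapper = mapper;
    }

    @Override
    public void performInto(final T value, final Consumer<R> consumer) {
      consumer.accept(mapper.apply(value));
    }
  }

  // Chains a filter into a mapper: only strings containing "1" are parsed to integers.
  static List<Integer> filterThenMap(final List<String> input) {
    final Operator<String, String> filter = new Filter<String>(s -> s.contains("1"));
    final Operator<String, Integer> mapper = new Mapper<String, Integer>(Integer::parseInt);
    final List<Integer> results = new ArrayList<>();
    input.forEach(s ->
      filter.performInto(s, match -> mapper.performInto(match, results::add)));
    return results;
  }

  public static void main(final String[] args) {
    final List<String> list = Arrays.asList("ABC", "321", "123", "456", "DEF", "214");
    System.out.println(filterThenMap(list)); // [321, 123, 214]
  }
}
```

Because output is delivered to a consumer rather than returned, a filter can simply skip the call for values it rejects, and operators compose by nesting one operator's performInto() inside the previous operator's consumer.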
Here is an example filter.
final List<String> list =
  Arrays.asList("ABC", "321", "123", "456", "DEF", "214");

final List<String> results = new ArrayList<>();

final Operator<String,String> filter =
  Operator.filterWith((s) -> s.contains("1"));

list.forEach(possible ->
  filter.performInto(possible, (match) -> results.add(match)));

Assert.assertEquals(3, results.size());
Assert.assertEquals("321", results.get(0));
Assert.assertEquals("123", results.get(1));
Assert.assertEquals("214", results.get(2));
In this example (broken down into multiple steps for clarity) a filter is created that filters in all String instances that contain the substring "1", and filters out all others. Then the List<String> of six elements is iterated over and the filter's performInto() operation is used. If the filter Predicate<T> of s.contains("1") is satisfied, the match is added to the List<String> of results, which then contains elements "321", "123", and "214".
Next is an example of a mapper.
final List<String> list = Arrays.asList("123", "456", "789");

final List<Integer> results = new ArrayList<>();

final Operator<String,Integer> mapper =
  Operator.mapWith((s) -> Integer.parseInt(s));

list.forEach(digits ->
  mapper.performInto(digits, (number) -> results.add(number)));

Assert.assertEquals(3, results.size());
Assert.assertEquals(123, (int) results.get(0));
Assert.assertEquals(456, (int) results.get(1));
Assert.assertEquals(789, (int) results.get(2));
In this example (broken down into multiple steps for clarity) a mapper is created that maps all String instances of digit characters to Integer numbers. The List<String> of three elements is iterated over and the mapper's performInto() operation is used. The new value is added to the List<Integer> of results, which then contains elements 123, 456, and 789.
The following demonstrates how a mapper can use flat-map.
final List<String> list1 = Arrays.asList("1", "2", "3");
final List<String> list2 = Arrays.asList("4", "5", "6");
final List<String> list3 = Arrays.asList("7", "8", "9");

final List<List<String>> lists = Arrays.asList(list1, list2, list3);

final List<Integer> results = new ArrayList<>();

final Function<List<List<String>>, List<Integer>> mapper =
  (los) -> los.stream()
    .flatMap(list -> list.stream().map(s -> Integer.parseInt(s)))
    .collect(Collectors.toList());

final Operator<List<List<String>>,List<Integer>> flatMapper =
  Operator.mapWith(mapper);

flatMapper.performInto(lists, (numbers) -> results.addAll(numbers));

Assert.assertEquals(9, results.size());
Assert.assertEquals(1, (int) results.get(0));
Assert.assertEquals(2, (int) results.get(1));
Assert.assertEquals(3, (int) results.get(2));
Note that lists is a List<List<String>>, that is, a list of lists. The mapper internally uses the Java Stream::flatMap. It first streams over the List<List<String>> lists. It then uses flatMap to visit each individual List<String> list, streams over each of those lists, and transforms each individual String to an Integer.
You can give a StreamProcessor an Operator<T, R> such as these, or use them in your Source and Sink implementations.
We will be adding more functional interfaces to our various types over future releases.
There are several ways to stream your data. You have already seen specific examples in the above content. In this section you are introduced to other ways.
Data that is persisted inside a VLINGO/SYMBIO store can be streamed out to one or more subscribers. One of the most basic examples is provided by the StateStore.
package io.vlingo.symbio.store.state;

public interface StateStoreReader {
  ...
  Completes<Stream> streamAllOf(final Class<?> stateType);
  Completes<Stream> streamSomeUsing(final QueryExpression query);
}
A StateStoreReader is provided as part of the StateStore protocol. Thus, you may ask a StateStore to stream data from its internal storage. The two methods provided support streaming either all states of a given type or a subset selected by a query. A stateType is a Java Class<?>, and all states of that type are stored in a single container. In some storage mechanisms that container may be a database table or an in-memory grid region.
By requesting a stream of all of a given stateType, the StateStoreReader queries that data from the container and provides an instance of io.vlingo.reactivestreams.Stream.
// somewhere many EquityState instances are written
final Equity equity = new Equity(...);
...
store.write(equity.id, equity.state, equity.version, interest);

...

final Completes<Stream> stream =
  store.streamAllOf(EquityState.class);

stream.andThen(all ->
  all.flowInto(new ConsumerSink<>((equityState) -> reportOn(equityState))));
You may also constrain your query to a subset of the whole container. The query is storage-type dependent.
final Completes<Stream> stream =
  store.streamSomeUsing(
    QueryExpression.using(
      EquityState.class,
      "select ... from tbl_equity where ..."));

stream.andThen(some ->
  some.flowInto(new ConsumerSink<>((equityState) -> reportOn(equityState))));
You may also stream persisted Source<?> types, such as DomainEvent and Command instances. For example, the Journal<T> provides this streaming interface through the JournalReader<T>, which in turn is an EntryReader<T>.
// Journal
package io.vlingo.symbio.store.journal;

public interface Journal<T> {
  ...
  <ET extends Entry<?>> Completes<JournalReader<ET>>
    journalReader(final String name);
}

// JournalReader
package io.vlingo.symbio.store.journal;

public interface JournalReader<T extends Entry<?>> extends EntryReader<T> { }

// EntryReader
package io.vlingo.symbio.store.journal;

public interface EntryReader<T extends Entry<?>> {
  ...
  Completes<Stream> streamAll();
}
These are used in the following example, first to request the JournalReader, then the Stream of all EntryBundle instances, and then to flow the stream into the Sink<EntryBundle> for reporting on each event.
final Sink<EntryBundle> sink =
  Sink.consumeWith((bundle) -> reportOn(bundle.source));

journal.journalReader("events-reporter")
  .andThenTo(reader -> reader.streamAll())
  .andThen(stream -> stream.flowInto(sink, 50));
All stores (ObjectStore, StateStore, and Journal) support persisting a totally ordered stream of Source<?> entries, such as DomainEvent and Command. You may obtain any of the total streams using the same basic techniques.