vlingo/streams

The vlingo/PLATFORM Reactive Streams built on vlingo/actors.

The vlingo/streams component implements the Reactive Streams specification for the vlingo/PLATFORM. It is fully based on vlingo/actors to provide reactive concurrency for stream processing.

There are four major abstractions used by the Reactive Streams specification, which are entirely implemented in vlingo/streams.

Protocol

Description

Processor

Both a Subscriber and a Publisher, first receiving a stream of elements and then publishing those following a probable transformation. This is perfect for defining streaming pipelines.

Publisher

A provider of a potentially unbounded number of sequenced elements, pushing them according to the demand received from its one or more Subscriber interests.

Subscriber

Receives signals from the Publisher to which it is subscribed. The signals indicate that a Subscription has been granted, that a next element is available, that an error has occurred, and if the end of stream is reached, that the stream is completed.

Subscription

A reference to the contract between Publisher and Subscriber, which is used by the Subscriber to request 1 to N sequenced elements from the Publisher, and to cancel its subscription. The value of N may be effectively unbounded, which is indicated by requesting Long.MAX_VALUE.

It is possible that you will never implement a Publisher, Subscriber, Subscription, or Processor yourself. Default implementations of these are provided by vlingo/streams. You will, instead implement Source and Sink types.

The following is an example of how to create a Publisher that produces a sequenced series of Long values and a Subscriber with a Sink that prints those as String values.

final long max = 10;
final Publisher publisher =
world.actorFor(
Publisher.class,
StreamPublisher.class,
Source.rangeOf(1, max + 1),
PublisherConfiguration.defaultDropHead());
final Subscriber subscriber =
world.actorFor(
Subscriber.class,
StreamSubscriber.class,
Sink.printToStdout("> "),
max);
publisher.subscribe(subscriber);

With no further code the above produces the following output.

// RESULTS
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9
> 10

We will be adding functional interfaces to our various types over future releases.

Sources and Sinks

A Source is the source of a stream of elements relayed by a Publisher. A Sink is the destination of the elements provided to a Subscriber by its Publisher. Next are ways that these may be used.

Source

You may reuse predefined Source types, but you will also develop your own implementations. This is the Source protocol as provided by vlingo/streams.

package io.vlingo.reactivestreams;
public interface Source<T> {
Completes<Elements<T>> next();
Completes<Elements<T>> next(final int maximumElements);
Completes<Elements<T>> next(final long index);
Completes<Elements<T>> next(final long index, final int maximumElements);
Completes<Boolean> isSlow();
}

The job of a Source is to take requests for the next elements in a stream, which are returned as a Completes<Elements<T>>. This means that the Elements<T> are completed at some future time. See Completes<T> for details on use.

The isSlow() protocol answers a Completes<Boolean> indicating whether the Source will tend to be slow in providing next elements. The following demonstrates how you may answer.

@Override
public Completes<Boolean> isSlow() {
return Completes.withSuccess(false);
}

Of course, answering true or false accurately is vitally important. If your Source is slow, stating so enables the Publisher to decide on the kind of Scheduler to use between probes for next elements. A slow Source will be managed by repeated schedule-once timer intervals, while a fast Source will be managed by a consistently schedule-many repeating time interval.

The following is an example of a fast Source that lazily (not all preallocated in a List) provides a range of Long values.

package io.vlingo.reactivestreams.source;
import io.vlingo.common.Completes;
import io.vlingo.reactivestreams.Elements;
import io.vlingo.reactivestreams.Source;
public class LongRangeSource implements Source<Long> {
private long current;
public final long endExclusive;
public final long startInclusive;
public LongRangeSource(final long startInclusive, final long endExclusive) {
assert(startInclusive <= endExclusive);
assert(startInclusive >= 0 && startInclusive <= Long.MAX_VALUE);
this.startInclusive = startInclusive;
assert(endExclusive >= 0 && endExclusive <= Long.MAX_VALUE);
this.endExclusive = endExclusive;
this.current = startInclusive;
}
@Override
public Completes<Elements<Long>> next() {
if (current < endExclusive) {
final Long[] element = new Long[1];
element[0] = current++;
return Completes.withSuccess(new Elements<>(element, false));
}
return Completes.withSuccess(new Elements<>(new Long[0], true));
}
@Override
public Completes<Elements<Long>> next(final int maximumElements) {
return next();
}
@Override
public Completes<Elements<Long>> next(long index) {
return next();
}
@Override
public Completes<Elements<Long>> next(final long index, final int maximumElements) {
return next();
}
@Override
public Completes<Boolean> isSlow() {
return Completes.withSuccess(false);
}
@Override
public String toString() {
return "LongRangeSource [startInclusive=" + startInclusive +
" endExclusive=" + endExclusive + " current=" + current + "]";
}
}

See the Javadocs for on Source for full API explanations.

There are factory methods available to create Source instances, and default implementation types of which these these produce instances.

public interface Source<T> {
static <T> Source<T> empty() ...
static <T> Source<T> only(final T... elements) ...
static Source<Long> rangeOf(final long startInclusive, final long endExclusive) ...
static <T> Source<T> with(final Iterable<T> iterable) ...
static <T> Source<T> with(final Iterable<T> iterable, final boolean slowIterable) ...
static <T> Source<T> with(final Supplier<T> supplier) ...
static <T> Source<T> with(final Supplier<T> supplier, final boolean slowSupplier) ...
}

The number of these and the backing Sink types will grow over future releases.

Sink

You may reuse predefined Sink types, but you will develop your own implementations. This is the Sink protocol as provided by vlingo/streams.

public interface Sink<T> {
void ready();
void terminate();
void whenValue(final T value);
}

The ready() indicates that the Sink should become prepared to handle incoming values. The terminate() indicates that the Sink is being terminated and will no longer receive values. The whenValue(T value) is used to provide the next available value from the Subscriber.

The following is an example of a Sink that prints received values.

package io.vlingo.reactivestreams.sink;
import java.io.PrintStream;
import io.vlingo.reactivestreams.Sink;
public class PrintSink<T> implements Sink<T> {
private final PrintStream printStream;
private final String prefix;
private boolean terminated;
public PrintSink(final PrintStream printStream, final String prefix) {
this.printStream = printStream;
this.prefix = prefix;
this.terminated = false;
}
@Override
public void ready() {
// ignored
}
@Override
public void terminate() {
terminated = true;
}
@Override
public void whenValue(final T value) {
if (!terminated) {
printStream.println(prefix + value.toString());
}
}
@Override
public String toString() {
return "PrintSink[terminated=" + terminated + "]";
}
}

See the Javadocs for on Source for full API explanations.

There are factory methods available to create Sink instances, and default implementation types of which these produce instances.

public interface Sink<T> {
static <T> Sink<T> consumeWith(final Consumer<T> consumer) ...
static <T> Sink<T> printToStdout(final String prefix) ...
static <T> Sink<T> printToStderr(final String prefix) ...
static <T> Sink<T> printTo(final PrintStream printStream, final String prefix) ...
}

The number of these and the backing Sink types will grow over future releases.