Streams

The VLINGO XOOM platform's Reactive Streams implementation, built on XOOM Actors.

The XOOM Streams component implements the Reactive Streams specification for the VLINGO XOOM platform. It is fully based on XOOM Actors to provide reactive concurrency for stream processing.

There are four abstractions used by the Reactive Streams specification, which are entirely implemented in XOOM Streams.

Protocol | Description
Processor | Both a Subscriber and a Publisher: it first receives a stream of elements and then publishes them, typically after a transformation. This is well suited to defining streaming pipelines.
Publisher | A provider of a potentially unbounded number of sequenced elements, pushing them according to the demand received from its one or more Subscriber interests.
Subscriber | Receives signals from the Publisher to which it is subscribed. The signals indicate that a Subscription has been granted, that a next element is available, that an error has occurred, and, if the end of stream is reached, that the stream is completed.
Subscription | A reference to the contract between Publisher and Subscriber, which is used by the Subscriber to request 1 to N sequenced elements from the Publisher, and to cancel its subscription. The value of N may be effectively unbounded, which is indicated by requesting Long.MAX_VALUE.

It is possible that you will never implement a Publisher, Subscriber, Subscription, or Processor yourself. Default implementations of these are provided by XOOM Streams. You will instead implement Source and Sink types.

The Subscriber requests a Subscription from a Publisher and specifies the number of elements it can accept. By specifying the number of elements it can accept, backpressure is enforced on the Publisher so that it does not overwhelm the Subscriber.
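The demand-driven exchange described above can be sketched with the JDK's own java.util.concurrent.Flow interfaces (Java 9 and later), which mirror the four Reactive Streams protocols. This is a minimal, synchronous illustration under stated assumptions and not how XOOM Streams is implemented. RangePublisher and CollectingSubscriber are hypothetical names introduced only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class BackpressureSketch {

  // Hypothetical publisher of the range [start, endExclusive) that emits
  // elements only in response to demand signaled through request(n).
  static final class RangePublisher implements Flow.Publisher<Long> {
    private final long start;
    private final long endExclusive;

    RangePublisher(final long start, final long endExclusive) {
      this.start = start;
      this.endExclusive = endExclusive;
    }

    @Override
    public void subscribe(final Flow.Subscriber<? super Long> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private long current = start;
        private boolean done = false;

        @Override
        public void request(final long n) {
          long remainingDemand = n;
          // Never emit more than the subscriber asked for.
          while (remainingDemand-- > 0 && !done && current < endExclusive) {
            subscriber.onNext(Long.valueOf(current++));
          }
          // Signal completion once the range is exhausted.
          if (!done && current >= endExclusive) {
            done = true;
            subscriber.onComplete();
          }
        }

        @Override
        public void cancel() {
          done = true;
        }
      });
    }
  }

  // Hypothetical subscriber that collects elements and exposes its
  // Subscription so that the caller decides how much demand to signal.
  static final class CollectingSubscriber implements Flow.Subscriber<Long> {
    final List<Long> received = new ArrayList<>();
    Flow.Subscription subscription;
    boolean completed = false;

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
      this.subscription = subscription;
    }

    @Override
    public void onNext(final Long item) {
      received.add(item);
    }

    @Override
    public void onError(final Throwable throwable) {
      throw new IllegalStateException(throwable);
    }

    @Override
    public void onComplete() {
      completed = true;
    }
  }

  public static void main(final String[] args) {
    final CollectingSubscriber subscriber = new CollectingSubscriber();
    new RangePublisher(1, 11).subscribe(subscriber);

    subscriber.subscription.request(3);
    System.out.println(subscriber.received); // prints [1, 2, 3]

    subscriber.subscription.request(Long.MAX_VALUE);
    System.out.println(subscriber.received.size() + " " + subscriber.completed); // prints 10 true
  }
}
```

After the first request only three elements arrive, however many the publisher could produce. The second request uses Long.MAX_VALUE to signal effectively unbounded demand, and the remaining elements and the completion signal follow.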

The following is an example of how to create a Publisher that produces a sequenced series of Long values and a Subscriber with a Sink that prints those as String values.

final long max = 10;

final Publisher publisher =
  world.actorFor(
    Publisher.class,
    StreamPublisher.class,
    Source.rangeOf(1, max + 1),
    PublisherConfiguration.defaultDropHead());

final Subscriber subscriber =
  world.actorFor(
    Subscriber.class,
    StreamSubscriber.class,
    Sink.printToStdout("> "),
    max);

publisher.subscribe(subscriber);

With no further code the above produces the following output.

// RESULTS
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9
> 10

Similarly, a Processor is created, which is both a Subscriber to an upstream Publisher and a Publisher to a downstream Subscriber.

The following example reuses the Publisher and Subscriber from the previous example, but injects the Processor in between the two, where the Processor transforms the Long values to Double values.

final LongToDoubleMapper transformer = new LongToDoubleMapper();

final Processor<Long,Double> processor =
  world.actorFor(
    Processor.class,
    StreamProcessor.class,
    transformer,
    10,
    PublisherConfiguration.defaultDropHead());

processor.subscribe(subscriber);

publisher.subscribe(processor);

The above produces this output.

// RESULTS
> 1.0
> 2.0
> 3.0
> 4.0
> 5.0
> 6.0
> 7.0
> 8.0
> 9.0
> 10.0

Sources and Sinks

A Source is the source of a stream of elements relayed by a Publisher. A Sink is the destination of the elements provided to a Subscriber by its Publisher. Next are ways that these may be used.

Source

You may reuse predefined Source types, but you will also develop your own implementations. This is the Source protocol as provided by XOOM Streams.

package io.vlingo.xoom.reactivestreams;

public interface Source<T> {
  Completes<Elements<T>> next();
  Completes<Elements<T>> next(final int maximumElements);
  Completes<Elements<T>> next(final long index);
  Completes<Elements<T>> next(final long index, final int maximumElements);
  Completes<Boolean> isSlow();
}

The job of a Source is to take requests for the next elements in a stream, which are returned as a Completes<Elements<T>>. This means that the Elements<T> are completed at some future time. See Completes<T> for details on use.

The isSlow() protocol answers a Completes<Boolean> indicating whether the Source will tend to be slow in providing next elements. The following demonstrates how you may answer.

@Override
public Completes<Boolean> isSlow() {
  return Completes.withSuccess(false);
}

Of course, answering true or false accurately is vitally important. If your Source is slow, stating so enables the Publisher to decide on the kind of Scheduler to use between probes for next elements. A slow Source will be managed by repeated schedule-once timer intervals, while a fast Source will be managed by a single schedule-many timer that repeats at a consistent interval.

The following is an example of a fast Source that lazily (not all preallocated in a List) provides a range of Long values.

package io.vlingo.xoom.reactivestreams.source;

import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.reactivestreams.Elements;
import io.vlingo.xoom.reactivestreams.Source;

public class LongRangeSource implements Source<Long> {
  private long current;
  public final long endExclusive;
  public final long startInclusive;

  public LongRangeSource(final long startInclusive, final long endExclusive) {
    assert(startInclusive <= endExclusive);
    assert(startInclusive >= 0 && startInclusive <= Long.MAX_VALUE);
    this.startInclusive = startInclusive;
    assert(endExclusive >= 0 && endExclusive <= Long.MAX_VALUE);
    this.endExclusive = endExclusive;

    this.current = startInclusive;
  }

  @Override
  public Completes<Elements<Long>> next() {
    if (current < endExclusive) {
      final Long[] element = new Long[1];
      element[0] = current++;
      return Completes.withSuccess(new Elements<>(element, false));
    }
    return Completes.withSuccess(new Elements<>(new Long[0], true));
  }

  @Override
  public Completes<Elements<Long>> next(final int maximumElements) {
    return next();
  }

  @Override
  public Completes<Elements<Long>> next(long index) {
    return next();
  }

  @Override
  public Completes<Elements<Long>> next(final long index, final int maximumElements) {
    return next();
  }

  @Override
  public Completes<Boolean> isSlow() {
    return Completes.withSuccess(false);
  }

  @Override
  public String toString() {
    return "LongRangeSource [startInclusive=" + startInclusive +
              " endExclusive=" + endExclusive + " current=" + current + "]";
  }
}

See the Javadocs on Source for full API explanations.

Factory methods are available to create Source instances, along with the default implementation types that they instantiate.

public interface Source<T> {
  static <T> Source<T> empty() ...
  static <T> Source<T> only(final T... elements) ...
  static Source<Long> rangeOf(final long startInclusive, final long endExclusive) ...
  static <T> Source<T> with(final Iterable<T> iterable) ...
  static <T> Source<T> with(final Iterable<T> iterable, final boolean slowIterable) ...
  static <T> Source<T> with(final Supplier<T> supplier) ...
  static <T> Source<T> with(final Supplier<T> supplier, final boolean slowSupplier) ...
}

The number of these and the backing Source types will grow over future releases.

Sink

You may reuse predefined Sink types, but you will also develop your own implementations. This is the Sink protocol as provided by XOOM Streams.

public interface Sink<T> {
  void ready();
  void terminate();
  void whenValue(final T value);
}

The ready() operation indicates that the Sink should prepare to handle incoming values. The terminate() operation indicates that the Sink is being terminated and will no longer receive values. The whenValue(T value) operation provides the next available value from the Subscriber.

The following is an example of a Sink that prints received values.

package io.vlingo.xoom.reactivestreams.sink;

import java.io.PrintStream;
import io.vlingo.xoom.reactivestreams.Sink;

public class PrintSink<T> implements Sink<T> {
  private final PrintStream printStream;
  private final String prefix;
  private boolean terminated;

  public PrintSink(final PrintStream printStream, final String prefix) {
    this.printStream = printStream;
    this.prefix = prefix;
    this.terminated = false;
  }

  @Override
  public void ready() {
    // ignored
  }

  @Override
  public void terminate() {
    terminated = true;
  }

  @Override
  public void whenValue(final T value) {
    if (!terminated) {
      printStream.println(prefix + value.toString());
    }
  }

  @Override
  public String toString() {
    return "PrintSink[terminated=" + terminated + "]";
  }
}

See the Javadocs on Sink for full API explanations.
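As a further illustration of the three-operation lifecycle, the following sketch collects values into a list rather than printing them. It declares a local stand-in for Sink<T> that copies the protocol shown above, so that it compiles without XOOM Streams on the classpath. CollectingSink is a hypothetical name and is not one of the predefined Sink types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectingSinkSketch {

  // Local stand-in mirroring the Sink<T> protocol shown above.
  interface Sink<T> {
    void ready();
    void terminate();
    void whenValue(final T value);
  }

  // Hypothetical Sink that keeps every value received before termination.
  static final class CollectingSink<T> implements Sink<T> {
    private final List<T> values = new ArrayList<>();
    private boolean terminated = false;

    @Override
    public void ready() {
      // nothing to prepare for an in-memory list
    }

    @Override
    public void terminate() {
      terminated = true;
    }

    @Override
    public void whenValue(final T value) {
      // Values that arrive after termination are dropped.
      if (!terminated) {
        values.add(value);
      }
    }

    List<T> values() {
      return Collections.unmodifiableList(values);
    }
  }

  public static void main(final String[] args) {
    final CollectingSink<Long> sink = new CollectingSink<>();
    sink.ready();
    sink.whenValue(1L);
    sink.whenValue(2L);
    sink.terminate();
    sink.whenValue(3L); // dropped because the sink is terminated
    System.out.println(sink.values()); // prints [1, 2]
  }
}
```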

Factory methods are available to create Sink instances, along with the default implementation types that they instantiate.

public interface Sink<T> {
  static <T> Sink<T> consumeWith(final Consumer<T> consumer) ...
  static <T> Sink<T> printToStdout(final String prefix) ...
  static <T> Sink<T> printToStderr(final String prefix) ...
  static <T> Sink<T> printTo(final PrintStream printStream, final String prefix) ...
}

The number of these and the backing Sink types will grow over future releases.

Stream

Some functions in the VLINGO XOOM platform, such as queries provided by XOOM Symbio, answer a Completes<Stream>. The Stream that is eventually available inside the Completes is used to consume the elements of streaming data.

package io.vlingo.xoom.reactivestreams;

public interface Stream {
  <S> void flowInto(final Sink<S> sink);
  <S> void flowInto(final Sink<S> sink, final long flowElementsRate);
  <S> void flowInto(final Sink<S> sink, final long flowElementsRate, final int probeInterval);
  void request(final long flowElementsRate);
  void stop();
}

The flowInto() overloads are used to start the stream flowing into a given Sink<S>. Besides causing the flow to begin, additional options are available. To control the initial rate at which elements flow, as in the number of elements that will arrive in a single burst, pass a value for flowElementsRate. If no parameter is provided, the default is 100.

public static final long DefaultFlowRate = 100;

To indicate the number of milliseconds between probes for Source<T> elements, pass probeInterval. The default, if not explicitly provided by the client, is 5 milliseconds. There are other constants that you may use. Ultimately, stream throughput depends on the throughput available to the Source<T> and the speed at which the Source<T> is probed by the StreamPublisher to retrieve that data.

// 5 milliseconds
public static final int DefaultProbeInterval = PublisherConfiguration.DefaultProbeInterval;
// 2 milliseconds
public static int FastProbeInterval = PublisherConfiguration.FastProbeInterval;
// 1 millisecond
public static int FastestProbeInterval = PublisherConfiguration.FastestProbeInterval;

The request() operation may be used to change the flow rate after the initial rate has been established. The stop() method may be used to completely terminate the flow of elements from the Source<T> and Publisher<T>. Note, however, that since the flow is asynchronous it is possible that some elements are already incoming to the Sink<S> and thus will not be prevented from arriving.

For examples of how Stream is used, see Streaming Persistent Data.

Operators

In XOOM Streams an operator is used to perform a specific kind of data transformation. When using a StreamProcessor you will probably want to transform the StreamSubscriber side of the processor's incoming data to another type that is outgoing through its StreamPublisher side. You can use an operator to do that.

package io.vlingo.xoom.reactivestreams;

public interface Operator<T,R> {
  void performInto(final T value, final Consumer<R> consumer);
}

The Operator<T, R> takes type T as an input value and provides type R as output. Output is delivered to a Consumer<R> of type R, which means the R value is produced before the Consumer<R> receives it. This is where the implementation of the Operator<T, R> comes into play. Note that there are currently three factory methods provided on the Operator interface:

package io.vlingo.xoom.reactivestreams;

public interface Operator<T,R> {
  static <T> Operator<T,T> filterWith(final Predicate<T> filter) {
    return new Filter<>(filter);
  }

  static <T,R> Operator<T,R> flatMapWith(final Function<T, Source<R>> mapper) {
    return new FlatMapper<T,R>(mapper);
  }

  static <T,R> Operator<T,R> mapWith(final Function<T,R> mapper) {
    return new Mapper<>(mapper);
  }

  ...
}

Thus, there are three basic kinds of operators, a filter, a flat-mapper, and a mapper.

Factory | Operator Description
Filter | Produces the same type that it takes as input, but may produce less output than it receives as input.
Mapper | Produces a different type than it takes as input, as it is responsible to map/transform the input to another type. Actually the mapper may produce the same type of output as the input, but perhaps the data inside the type has been enriched or restricted in some way.
FlatMapper | Works like Mapper, but also flattens the stream by reducing the inner structure to the single one produced by the mapping Function.
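To make the performInto() contract concrete, here is a minimal sketch of how a filter and a mapper could satisfy it. It uses a local stand-in for Operator<T,R> that copies the protocol shown above. The names filterSketch, mapSketch, and parseDigits are hypothetical, and the bodies are not the library's actual Filter and Mapper implementations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class OperatorSketch {

  // Local stand-in mirroring the Operator<T,R> protocol shown above.
  interface Operator<T, R> {
    void performInto(final T value, final Consumer<R> consumer);
  }

  // Filter: forwards the value unchanged, but only when the predicate accepts it.
  static <T> Operator<T, T> filterSketch(final Predicate<T> predicate) {
    return (value, consumer) -> {
      if (predicate.test(value)) {
        consumer.accept(value);
      }
    };
  }

  // Mapper: always forwards exactly one transformed value.
  static <T, R> Operator<T, R> mapSketch(final Function<T, R> mapper) {
    return (value, consumer) -> consumer.accept(mapper.apply(value));
  }

  // Chains the two operators: only all-digit strings reach the mapper.
  static List<Integer> parseDigits(final List<String> inputs) {
    final Operator<String, String> digitsOnly =
      filterSketch(s -> !s.isEmpty() && s.chars().allMatch(Character::isDigit));
    final Operator<String, Integer> toInteger =
      mapSketch(s -> Integer.parseInt(s));

    final List<Integer> results = new ArrayList<>();
    for (final String input : inputs) {
      digitsOnly.performInto(input, digits -> toInteger.performInto(digits, results::add));
    }
    return results;
  }

  public static void main(final String[] args) {
    System.out.println(parseDigits(Arrays.asList("12", "ab", "34"))); // prints [12, 34]
  }
}
```

Because output is delivered through a Consumer<R> rather than returned, an operator is free to deliver nothing for a given input, which is how the filter produces less output than it receives.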

Here is an example filter:

final List<String> list =
  Arrays.asList("ABC", "321", "123", "456", "DEF", "214");

final List<String> results = new ArrayList<>();

final Operator<String,String> filter =
  Operator.filterWith((s) -> s.contains("1"));

list.forEach(possible ->
  filter.performInto(possible, (match) -> results.add(match)));

Assert.assertEquals(3, results.size());
Assert.assertEquals("321", results.get(0));
Assert.assertEquals("123", results.get(1));
Assert.assertEquals("214", results.get(2));

In this example (broken down into multiple steps for clarity) a filter is created that filters in all String instances that contain the substring "1", and filters out all others. Then the List<String> of six elements is iterated over and the filter's performInto() operation is used. If the filter Predicate<T> of s.contains("1") is satisfied, the match is added to the List<String> of results, which then contains elements "321", "123", and "214".

Next is an example of a mapper:

final List<String> list = Arrays.asList("123", "456", "789");

final List<Integer> results = new ArrayList<>();

final Operator<String,Integer> mapper =
  Operator.mapWith((s) -> Integer.parseInt(s));

list.forEach(digits ->
  mapper.performInto(digits, (number) -> results.add(number)));

Assert.assertEquals(3, results.size());
Assert.assertEquals(123, (int) results.get(0));
Assert.assertEquals(456, (int) results.get(1));
Assert.assertEquals(789, (int) results.get(2));

In this example (broken down into multiple steps for clarity) a mapper is created that maps all String instances of digit characters to Integer numbers. The List<String> of three elements is iterated over and the mapper's performInto() operation is used. The new value is added to the List<Integer> of results, which then contains elements 123, 456, and 789.

The following demonstrates how a mapper can perform a flat-map, although there is an easier way, shown after this example:

final List<String> list1 = Arrays.asList("1", "2", "3");
final List<String> list2 = Arrays.asList("4", "5", "6");
final List<String> list3 = Arrays.asList("7", "8", "9");

final List<List<String>> lists = Arrays.asList(list1, list2, list3);

final List<Integer> results = new ArrayList<>();

final Function<List<List<String>>, List<Integer>> mapper =
        (los) -> los.stream()
          .flatMap(list -> list.stream().map(s -> Integer.parseInt(s)))
          .collect(Collectors.toList());

final Operator<List<List<String>>,List<Integer>> flatMapper =
  Operator.mapWith(mapper);

flatMapper.performInto(lists, (numbers) -> results.addAll(numbers));

Assert.assertEquals(9, results.size());
Assert.assertEquals(1, (int) results.get(0));
Assert.assertEquals(2, (int) results.get(1));
Assert.assertEquals(3, (int) results.get(2));

Note that lists is a List<List<String>>, that is, a list of lists. The mapper internally uses Java's Stream::flatMap. It first streams over lists, then uses flatMap to stream over each inner List<String>, and transforms each individual String to an Integer. The result is a single flat List<Integer>.

Yet, you can perform this more easily using the FlatMapper operator. Here's a FlatMapper solution to a different problem:

final Operator<Long, Long> flatMap = Operator.flatMapWith((record) -> new LongRangeSource(record, record + 2));
final List<Long> providedLongs = new CopyOnWriteArrayList<>();

flatMap.performInto(1L, providedLongs::add);
flatMap.performInto(3L, providedLongs::add);

Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 4L), providedLongs);

In this example, the flat-mapper is given a function that instantiates a LongRangeSource, which is a stream source that holds a startInclusive and an endExclusive value. In the first performInto() the startInclusive is 1 and the endExclusive is two greater, which is 3; the first numeric range is thus 1 to 2. The second range is 3 to 4. The flat-mapper reads every value in each LongRangeSource and adds it to the collection referenced by providedLongs. When completed, the two sources are flattened such that only their inner values are preserved in providedLongs.
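The flattening step itself can be sketched without the library. The following is a simplified illustration that assumes the inner values are available synchronously as an Iterable<R>, whereas the real FlatMapper is given a Source<R>. The names flatMapSketch and range are hypothetical, and Operator is a local stand-in for the protocol shown earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class FlatMapSketch {

  // Local stand-in mirroring the Operator<T,R> protocol.
  interface Operator<T, R> {
    void performInto(final T value, final Consumer<R> consumer);
  }

  // Maps each input to a group of values, then delivers every inner value
  // individually, so the consumer never sees the intermediate grouping.
  static <T, R> Operator<T, R> flatMapSketch(final Function<T, Iterable<R>> mapper) {
    return (value, consumer) -> {
      for (final R inner : mapper.apply(value)) {
        consumer.accept(inner);
      }
    };
  }

  // Hypothetical helper standing in for LongRangeSource.
  static List<Long> range(final long startInclusive, final long endExclusive) {
    return LongStream.range(startInclusive, endExclusive).boxed().collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final Operator<Long, Long> flatMap = flatMapSketch(record -> range(record, record + 2));
    final List<Long> providedLongs = new ArrayList<>();

    flatMap.performInto(1L, providedLongs::add);
    flatMap.performInto(3L, providedLongs::add);

    System.out.println(providedLongs); // prints [1, 2, 3, 4]
  }
}
```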

You can give a StreamProcessor an Operator<T, R> such as these, or use them in your Source and Sink implementations.
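One way an operator might be used inside a Sink implementation is as an adapter that transforms each incoming value before handing it to another Sink. The sketch below is an assumption about how you could structure this, not a type provided by XOOM Streams. OperatorSink and ListSink are hypothetical names, and Operator and Sink are local stand-ins that copy the protocols shown earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class OperatorSinkSketch {

  // Local stand-ins mirroring the Operator<T,R> and Sink<T> protocols.
  interface Operator<T, R> {
    void performInto(final T value, final Consumer<R> consumer);
  }

  interface Sink<T> {
    void ready();
    void terminate();
    void whenValue(final T value);
  }

  // Hypothetical adapter: applies the operator to each incoming value and
  // forwards whatever the operator produces to the downstream sink.
  static final class OperatorSink<T, R> implements Sink<T> {
    private final Operator<T, R> operator;
    private final Sink<R> downstream;

    OperatorSink(final Operator<T, R> operator, final Sink<R> downstream) {
      this.operator = operator;
      this.downstream = downstream;
    }

    @Override
    public void ready() {
      downstream.ready();
    }

    @Override
    public void terminate() {
      downstream.terminate();
    }

    @Override
    public void whenValue(final T value) {
      operator.performInto(value, downstream::whenValue);
    }
  }

  // Hypothetical terminal sink that records what it receives.
  static final class ListSink<T> implements Sink<T> {
    final List<T> values = new ArrayList<>();

    @Override public void ready() { }
    @Override public void terminate() { }
    @Override public void whenValue(final T value) { values.add(value); }
  }

  public static void main(final String[] args) {
    final Operator<String, Integer> length =
      (text, consumer) -> consumer.accept(text.length());
    final ListSink<Integer> lengths = new ListSink<>();
    final Sink<String> sink = new OperatorSink<>(length, lengths);

    sink.ready();
    sink.whenValue("ABC");
    sink.whenValue("vlingo");
    sink.terminate();

    System.out.println(lengths.values); // prints [3, 6]
  }
}
```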

We will be adding more functional interfaces to our various types over future releases.

Streaming Your Data

There are several ways to stream your data. You have already seen specific examples in the above content. In this section you are introduced to other ways.

Streaming Persistent Data

Data that is persisted inside a XOOM Symbio store can be streamed out to one or more subscribers. One of the most basic examples is provided by the StateStore.

package io.vlingo.symbio.store.state;

public interface StateStoreReader {
  ...
  Completes<Stream> streamAllOf(final Class<?> stateType);
  Completes<Stream> streamSomeUsing(final QueryExpression query);
}

A StateStoreReader is provided as part of the StateStore protocol. Thus, you may ask a StateStore to stream data from its internal storage. The two operations provided support streaming either all states of a given type or a queried subset. A stateType is a Java Class<?>, and all states of that type are stored in a single container. In some storage mechanisms that container may be a database table or an in-memory grid region.

By requesting a stream of all of a given stateType, the StateStoreReader queries that data from the container and provides an instance of io.vlingo.xoom.reactivestreams.Stream.

// somewhere many EquityState instances are written
final Equity equity = new Equity(...);
...
store.write(equity.id, equity.state, equity.version, interest);

...

final Completes<Stream> stream =
  store.streamAllOf(EquityState.class);

stream.andThenConsume(all ->
  all.flowInto(new ConsumerSink<>((equityState) -> reportOn(equityState))));

You may also constrain your query to a subset of the whole container. The query is storage-type dependent.

final Completes<Stream> stream =
  store.streamSomeUsing(
    QueryExpression.using(
      EquityState.class,
      "select ... from tbl_equity where ..."));

stream.andThenConsume(some ->
  some.flowInto(new ConsumerSink<>((equityState) -> reportOn(equityState))));

You may also stream persisted Source<?> types, such as DomainEvent and Command instances. For example, the Journal<T> provides this streaming interface through the JournalReader<T>, which in turn is an EntryReader<T>.

// Journal
package io.vlingo.symbio.store.journal;

public interface Journal<T> {
  ...
  <ET extends Entry<?>> Completes<JournalReader<ET>>
     journalReader(final String name);
}

// JournalReader
package io.vlingo.symbio.store.journal;

public interface JournalReader<T extends Entry<?>> extends EntryReader<T> { }

// EntryReader
package io.vlingo.symbio.store.journal;

public interface EntryReader<T extends Entry<?>> {
  ...
  Completes<Stream> streamAll();
}

These are used in the following example, first to request the JournalReader, then the Stream of all EntryBundle instances, and then to flow the stream into the Sink<EntryBundle> for reporting on each event.

final Sink<EntryBundle> sink =
  Sink.consumeWith((bundle) -> reportOn(bundle.source));

journal
    .journalReader("events-reporter")
    .andThenTo(reader -> reader.streamAll())
    .andThenConsume(stream -> stream.flowInto(sink, 50));

All stores—ObjectStore, StateStore, and Journal—support persisting a totally ordered stream of Source<?> entries, such as DomainEvent and Command. You may obtain any of the total streams using the same basic techniques.
