# Streams

The XOOM Streams component implements the [Reactive Streams specification](https://www.reactive-streams.org/) for the VLINGO XOOM platform. It is fully based on XOOM Actors to provide reactive concurrency for stream processing.

The Reactive Streams specification defines four abstractions, all of which are fully implemented in XOOM Streams.

| Protocol       | Description                                                                                                                                                                                                                                                                                    |
| -------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `Processor`    | Both a `Subscriber` and a `Publisher`: it first receives a stream of elements and then publishes them, typically after some transformation. This makes it well suited to defining streaming pipelines. |
| `Publisher`    | A provider of a potentially unbounded number of sequenced elements, pushing them according to the demand received from one or more interested `Subscriber` instances. |
| `Subscriber`   | Receives signals from the `Publisher` to which it is subscribed. The signals indicate that a `Subscription` has been granted, that a next element is available, that an error has occurred, and if the end of stream is reached, that the stream is completed.                                 |
| `Subscription` | A reference to the contract between `Publisher` and `Subscriber`, which is used by the `Subscriber` to request 1 to N sequenced elements from the `Publisher`, and to cancel its subscription. The value of N may be effectively unbounded, which is indicated by requesting `Long.MAX_VALUE`. |

![The four abstractions of Reactive Streams.](https://3691394259-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LLB-V2sJmANuWISDmBf%2F-M99HlTd6nvy6k3PYPMc%2F-M9Brft0IxmpPoy1NxPd%2FAbstractions.png?alt=media\&token=b4ed13a4-e3bc-448e-a33f-561af2b6e96e)

You may never need to implement a `Publisher`, `Subscriber`, `Subscription`, or `Processor` yourself, because XOOM Streams provides default implementations of each. Instead, you will implement `Source` and `Sink` types.

A `Subscriber` subscribes to a `Publisher`, which grants it a `Subscription`. Through that `Subscription` the `Subscriber` requests the number of elements it can accept. Specifying that number enforces *backpressure* on the `Publisher`, so that it does not overwhelm the `Subscriber`.

![A Subscriber requests N elements from the Publisher, creating a backpressure contract.](https://3691394259-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LLB-V2sJmANuWISDmBf%2F-M99HlTd6nvy6k3PYPMc%2F-M9Bsdd-zWi9VUBzJkuD%2FReactiveStreams01.png?alt=media\&token=60fb0b34-b964-415c-a984-9edba01e4344)
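
To make the demand contract concrete, here is a minimal sketch that uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the four Reactive Streams abstractions, rather than XOOM Streams itself. The names `BackpressureSketch`, `RangePublisher`, and `LimitedSubscriber` are hypothetical. The publisher never emits more elements than have been requested; it delivers synchronously inside `request()` only to keep the sketch short, whereas an actor-based publisher delivers asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch only: uses the JDK's java.util.concurrent.Flow interfaces, not XOOM Streams.
public class BackpressureSketch {

  // Hypothetical publisher of 1..max that never emits more than the outstanding demand.
  public static final class RangePublisher implements Flow.Publisher<Long> {
    private final long max;

    public RangePublisher(long max) {
      this.max = max;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Long> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private long next = 1;
        private boolean done = false;

        @Override
        public void request(long n) {
          // Deliver at most n elements: this is the backpressure contract.
          // (Delivered synchronously here only to keep the sketch short.)
          for (long i = 0; i < n && next <= max && !done; i++) {
            subscriber.onNext(next++);
          }
          // Signal completion once the whole range has been delivered.
          if (next > max && !done) {
            done = true;
            subscriber.onComplete();
          }
        }

        @Override
        public void cancel() {
          done = true;
        }
      });
    }
  }

  // Hypothetical subscriber that signals demand for exactly `limit` elements.
  public static final class LimitedSubscriber implements Flow.Subscriber<Long> {
    public final List<Long> received = new ArrayList<>();
    public boolean completed = false;
    private final long limit;

    public LimitedSubscriber(long limit) {
      this.limit = limit;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(limit); }
    @Override public void onNext(Long item) { received.add(item); }
    @Override public void onError(Throwable error) { error.printStackTrace(); }
    @Override public void onComplete() { completed = true; }
  }
}
```

With a `LimitedSubscriber(3)` subscribed to a `RangePublisher(10)`, only `1`, `2`, and `3` arrive and the stream does not complete, because no further demand is ever signalled.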

The following is an example of how to create a `Publisher` that produces a sequenced series of `Long` values and a `Subscriber` with a `Sink` that prints those as `String` values.

```java
final long max = 10;

final Publisher publisher =
  world.actorFor(
    Publisher.class,
    StreamPublisher.class,
    Source.rangeOf(1, max + 1),
    PublisherConfiguration.defaultDropHead());

final Subscriber subscriber =
  world.actorFor(
    Subscriber.class,
    StreamSubscriber.class,
    Sink.printToStdout("> "),
    max);

publisher.subscribe(subscriber);
```

With no further code the above produces the following output.

```java
// RESULTS
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9
> 10
```

Similarly, a `Processor` can be created, which is both a `Subscriber` to an upstream `Publisher` and a `Publisher` to a downstream `Subscriber`.

![A Processor is a Subscriber on the left side and a Publisher on the right side.](https://3691394259-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LLB-V2sJmANuWISDmBf%2F-M99HlTd6nvy6k3PYPMc%2F-M9BwWQtsdhRVuMDzaBv%2FReactiveStreams02.png?alt=media\&token=5a78dc04-81c0-4172-a85f-b88ce167f668)

The following example reuses the `Publisher` and `Subscriber` from the previous example, but injects the `Processor` in between the two, where the `Processor` transforms the `Long` values to `Double` values.

```java
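// maps each incoming Long to an outgoing Double
final LongToDoubleMapper transformer = new LongToDoubleMapper();

// consumes Long values upstream and publishes Double values downstream
final Processor<Long,Double> processor =
  world.actorFor(
    Processor.class,
    StreamProcessor.class,
    transformer,
    10,   // request threshold
    PublisherConfiguration.defaultDropHead());

// the processor publishes to the original subscriber...
processor.subscribe(subscriber);

// ...and subscribes to the original publisher
publisher.subscribe(processor);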
```

The above produces this output.

```java
// RESULTS
> 1.0
> 2.0
> 3.0
> 4.0
> 5.0
> 6.0
> 7.0
> 8.0
> 9.0
> 10.0
```
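
For comparison, the same Long-to-Double pipeline can be sketched with the JDK's `SubmissionPublisher` and `Flow.Processor` (Java 9+). This is an analogy, not XOOM Streams code: `ProcessorSketch`, `MappingProcessor`, and `run()` are hypothetical names. The processor requests one element at a time from upstream and publishes each mapped value downstream.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Sketch only: a JDK Flow analog of the Long-to-Double Processor, not XOOM Streams.
public class ProcessorSketch {

  // Subscriber on its upstream side, Publisher on its downstream side.
  public static final class MappingProcessor<T, R> extends SubmissionPublisher<R>
      implements Flow.Processor<T, R> {
    private final Function<T, R> mapper;
    private Flow.Subscription upstream;

    public MappingProcessor(Function<T, R> mapper) {
      this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      upstream = subscription;
      upstream.request(1); // pull one element at a time
    }

    @Override
    public void onNext(T item) {
      submit(mapper.apply(item)); // transform, then publish downstream
      upstream.request(1);
    }

    @Override
    public void onError(Throwable error) {
      closeExceptionally(error);
    }

    @Override
    public void onComplete() {
      close(); // propagate completion downstream
    }
  }

  // Publishes 1..max through the processor and collects the Double results.
  public static List<Double> run(long max) {
    List<Double> results = new CopyOnWriteArrayList<>();
    try (SubmissionPublisher<Long> publisher = new SubmissionPublisher<>()) {
      MappingProcessor<Long, Double> processor = new MappingProcessor<>(Long::doubleValue);
      publisher.subscribe(processor);
      CompletableFuture<Void> done = processor.consume(results::add);
      for (long i = 1; i <= max; i++) {
        publisher.submit(i);
      }
      publisher.close(); // signals onComplete to the processor
      done.get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
    return results;
  }
}
```

Calling `ProcessorSketch.run(10)` collects `1.0` through `10.0` in order, matching the output above.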

## Sources and Sinks

A `Source` is the source of a stream of elements relayed by a `Publisher`. A `Sink` is the destination of the elements provided to a `Subscriber` by its `Publisher`. The following sections describe how each may be used.

### `Source`

You may reuse predefined `Source` types, but you will also develop your own implementations. This is the `Source` protocol as provided by XOOM Streams.

```java
package io.vlingo.xoom.reactivestreams;

import io.vlingo.xoom.common.Completes;

public interface Source<T> {
  Completes<Elements<T>> next();
  Completes<Elements<T>> next(final int maximumElements);
  Completes<Elements<T>> next(final long index);
  Completes<Elements<T>> next(final long index, final int maximumElements);
  Completes<Boolean> isSlow();
}
```

The job of a `Source` is to take requests for the next elements in a stream, which are returned as a `Completes<Elements<T>>`. This means that the `Elements<T>` are completed at some future time. See [`Completes<T>`](https://docs.vlingo.io/xoom-common#completes) for details on use.

The `isSlow()` protocol answers a `Completes<Boolean>` indicating whether the `Source` will tend to be slow in providing next elements. The following demonstrates how you may answer.

```java
@Override
public Completes<Boolean> isSlow() {
  return Completes.withSuccess(false);
}
```

Of course, answering `true` or `false` accurately is vitally important. If your `Source` is slow, stating so enables the `Publisher` to decide on the kind of `Scheduler` to use between probes for next elements. A slow `Source` is managed with repeated schedule-once timer intervals, while a fast `Source` is managed with a single schedule-many timer that repeats at a consistent interval.
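
The difference between the two scheduling styles can be sketched with a JDK `ScheduledExecutorService`. This is only an analogy for the XOOM `Scheduler` behavior described above, and `ProbeSchedulingSketch` and `probe()` are hypothetical names. For a slow source each probe schedules the next one only after it finishes; for a fast source a single repeating schedule fires at a fixed rate.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: contrasts schedule-once and repeating probes with the JDK scheduler.
public class ProbeSchedulingSketch {

  // Runs `probes` probes, `intervalMillis` apart, and returns how many actually ran.
  public static int probe(boolean slowSource, int probes, long intervalMillis) {
    if (probes <= 0) {
      return 0;
    }
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger count = new AtomicInteger();
    CountDownLatch finished = new CountDownLatch(probes);
    try {
      if (slowSource) {
        // Slow source: each probe schedules the next one only once it has finished.
        Runnable[] task = new Runnable[1];
        task[0] = () -> {
          count.incrementAndGet();
          finished.countDown();
          if (finished.getCount() > 0) {
            scheduler.schedule(task[0], intervalMillis, TimeUnit.MILLISECONDS);
          }
        };
        scheduler.schedule(task[0], intervalMillis, TimeUnit.MILLISECONDS);
        finished.await(5, TimeUnit.SECONDS);
      } else {
        // Fast source: a single repeating schedule fires every interval.
        ScheduledFuture<?> repeating = scheduler.scheduleAtFixedRate(() -> {
          if (finished.getCount() > 0) {
            count.incrementAndGet();
            finished.countDown();
          }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        finished.await(5, TimeUnit.SECONDS);
        repeating.cancel(false);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scheduler.shutdownNow();
    }
    return count.get();
  }
}
```

One plausible rationale for the split: rescheduling after each probe avoids piling up probes against a source that cannot keep pace, while a fixed-rate schedule avoids re-registering a timer on every probe of a fast source.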

The following is an example of a fast `Source` that lazily (not all preallocated in a `List`) provides a range of `Long` values.

```java
package io.vlingo.xoom.reactivestreams.source;

import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.reactivestreams.Elements;
import io.vlingo.xoom.reactivestreams.Source;

public class LongRangeSource implements Source<Long> {
  private long current;
  public final long endExclusive;
  public final long startInclusive;

  public LongRangeSource(final long startInclusive, final long endExclusive) {
    assert(startInclusive <= endExclusive);
    assert(startInclusive >= 0 && startInclusive <= Long.MAX_VALUE);
    this.startInclusive = startInclusive;
    assert(endExclusive >= 0 && endExclusive <= Long.MAX_VALUE);
    this.endExclusive = endExclusive;

    this.current = startInclusive;
  }

  @Override
  public Completes<Elements<Long>> next() {
    if (current < endExclusive) {
      final Long[] element = new Long[1];
      element[0] = current++;
      return Completes.withSuccess(new Elements<>(element, false));
    }
    return Completes.withSuccess(new Elements<>(new Long[0], true));
  }

  @Override
  public Completes<Elements<Long>> next(final int maximumElements) {
    return next();
  }

  @Override
  public Completes<Elements<Long>> next(long index) {
    return next();
  }

  @Override
  public Completes<Elements<Long>> next(final long index, final int maximumElements) {
    return next();
  }

  @Override
  public Completes<Boolean> isSlow() {
    return Completes.withSuccess(false);
  }

  @Override
  public String toString() {
    return "LongRangeSource [startInclusive=" + startInclusive +
              " endExclusive=" + endExclusive + " current=" + current + "]";
  }
}
```

See the Javadocs on [`Source`](https://javadoc.io/doc/io.vlingo/vlingo-streams/latest/index.html) for full API explanations.
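
The `LongRangeSource` above answers one element per call even when `maximumElements` is given. A minimal sketch of a range source that honors `maximumElements` might look like the following. To stay self-contained it does not depend on XOOM: the hypothetical `Batch` class stands in for `Elements<T>`, and values are returned directly rather than through `Completes<Elements<T>>`. As in `LongRangeSource`, the end of the stream is signalled by an empty, terminated result.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: Batch stands in for Elements<T>, and results are returned
// directly instead of through Completes<Elements<T>>.
public class BatchingRangeSource {

  // Hypothetical stand-in for Elements<T>: the values plus an end-of-stream flag.
  public static final class Batch {
    public final List<Long> values;
    public final boolean terminated;

    Batch(List<Long> values, boolean terminated) {
      this.values = values;
      this.terminated = terminated;
    }
  }

  private final long endExclusive;
  private long current;

  public BatchingRangeSource(long startInclusive, long endExclusive) {
    if (startInclusive < 0 || startInclusive > endExclusive) {
      throw new IllegalArgumentException("invalid range");
    }
    this.current = startInclusive;
    this.endExclusive = endExclusive;
  }

  // Answers up to maximumElements values; terminated only when the range is exhausted.
  public Batch next(int maximumElements) {
    List<Long> values = new ArrayList<>();
    while (values.size() < maximumElements && current < endExclusive) {
      values.add(current++);
    }
    return new Batch(values, values.isEmpty() && current >= endExclusive);
  }
}
```

For a range from `1` (inclusive) to `6` (exclusive), `next(2)` answers `1, 2`, `next(10)` answers `3, 4, 5`, and a further call answers an empty, terminated batch.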

Factory methods are available to create `Source` instances, each backed by a default implementation type.

```java
public interface Source<T> {
  static <T> Source<T> empty() ...
  static <T> Source<T> only(final T... elements) ...
  static Source<Long> rangeOf(final long startInclusive, final long endExclusive) ...
  static <T> Source<T> with(final Iterable<T> iterable) ...
  static <T> Source<T> with(final Iterable<T> iterable, final boolean slowIterable) ...
  static <T> Source<T> with(final Supplier<T> supplier) ...
  static <T> Source<T> with(final Supplier<T> supplier, final boolean slowSupplier) ...
}
```

The number of these and the backing `Source` types will grow over future releases.

### `Sink`

You may reuse predefined `Sink` types, but you will also develop your own implementations. This is the `Sink` protocol as provided by XOOM Streams.

```java
public interface Sink<T> {
  void ready();
  void terminate();
  void whenValue(final T value);
}
```

The `ready()` method indicates that the `Sink` should prepare to handle incoming values. The `terminate()` method indicates that the `Sink` is being terminated and will no longer receive values. The `whenValue(T value)` method provides the next available value from the `Subscriber`.

The following is an example of a `Sink` that prints received values.

```java
package io.vlingo.xoom.reactivestreams.sink;

import java.io.PrintStream;
import io.vlingo.xoom.reactivestreams.Sink;

public class PrintSink<T> implements Sink<T> {
  private final PrintStream printStream;
  private final String prefix;
  private boolean terminated;

  public PrintSink(final PrintStream printStream, final String prefix) {
    this.printStream = printStream;
    this.prefix = prefix;
    this.terminated = false;
  }

  @Override
  public void ready() {
    // ignored
  }

  @Override
  public void terminate() {
    terminated = true;
  }

  @Override
  public void whenValue(final T value) {
    if (!terminated) {
      printStream.println(prefix + value.toString());
    }
  }

  @Override
  public String toString() {
    return "PrintSink[terminated=" + terminated + "]";
  }
}
```

See the Javadocs on [`Sink`](https://javadoc.io/doc/io.vlingo/vlingo-streams/latest/index.html) for full API explanations.
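
`PrintSink` ignores `ready()`. If your `Sink` should accept values only between `ready()` and `terminate()`, a minimal sketch could look like this. It declares a local copy of the three-method `Sink` protocol so that it compiles without XOOM Streams; `CollectingSinkSketch` and `CollectingSink` are hypothetical names, and gating on `ready()` is a design choice of this sketch rather than a requirement of the protocol.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a local copy of the three-method Sink protocol so this compiles without XOOM.
public class CollectingSinkSketch {

  public interface Sink<T> {
    void ready();
    void terminate();
    void whenValue(T value);
  }

  // Hypothetical sink that collects values only between ready() and terminate().
  public static final class CollectingSink<T> implements Sink<T> {
    public final List<T> values = new ArrayList<>();
    private boolean accepting = false;

    @Override
    public void ready() {
      accepting = true;
    }

    @Override
    public void terminate() {
      accepting = false;
    }

    @Override
    public void whenValue(T value) {
      // Values arriving before ready() or after terminate() are dropped.
      if (accepting) {
        values.add(value);
      }
    }
  }
}
```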

Factory methods are available to create `Sink` instances, each backed by a default implementation type.

```java
public interface Sink<T> {
  static <T> Sink<T> consumeWith(final Consumer<T> consumer) ...
  static <T> Sink<T> printToStdout(final String prefix) ...
  static <T> Sink<T> printToStderr(final String prefix) ...
  static <T> Sink<T> printTo(final PrintStream printStream, final String prefix) ...
}
```

The number of these and the backing `Sink` types will grow over future releases.

## `Stream`

Some functions in the VLINGO XOOM platform, such as queries provided by XOOM Symbio, answer a `Completes<Stream>`. The `Stream` that is eventually available inside the `Completes` is used to consume the elements of streaming data.

```java
package io.vlingo.xoom.reactivestreams;

public interface Stream {
  <S> void flowInto(final Sink<S> sink);
  <S> void flowInto(final Sink<S> sink, final long flowElementsRate);
  <S> void flowInto(final Sink<S> sink, final long flowElementsRate, final int probeInterval);
  void request(final long flowElementsRate);
  void stop();
}
```

The `flowInto()` overloads are used to start the stream flowing into a given `Sink<S>`. Besides starting the flow, they offer additional options. To control the initial rate at which elements flow, that is, the number of elements that will arrive in a single burst, pass a value for `flowElementsRate`. If it is not provided, the default is `100`.

```java
public static final long DefaultFlowRate = 100;
```

To set the number of milliseconds between probes for `Source<T>` elements, pass `probeInterval`. If it is not provided, the default is `5` milliseconds. You may also use other constants. Ultimately, stream throughput depends on the throughput available to the `Source<T>` and on how frequently the `StreamPublisher` probes the `Source<T>` to retrieve that data.

```java
// 5 milliseconds
public static final int DefaultProbeInterval = PublisherConfiguration.DefaultProbeInterval;
// 2 milliseconds
public static int FastProbeInterval = PublisherConfiguration.FastProbeInterval;
// 1 millisecond
public static int FastestProbeInterval = PublisherConfiguration.FastestProbeInterval;
```

The `request()` operation may be used to change the flow rate after the initial rate has been established. The `stop()` method may be used to completely terminate the flow of elements from the `Source<T>` and `Publisher<T>`. Note, however, that because the flow is asynchronous, some elements may already be in flight to the `Sink<S>`, and their arrival will not be prevented.
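
The effect of `request()` and `stop()` can be illustrated with a small synchronous simulation. This is not how XOOM Streams is implemented; `FlowControlSketch`, `probe()`, and `deliver()` are hypothetical names. Each probe pulls up to the current rate from the source into an in-flight queue, `request()` changes that rate, and `stop()` prevents further probes, while elements already in flight are still delivered to the sink.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;

// Hypothetical simulation: a flow whose rate can change and which can be
// stopped while some elements are already in flight to the sink.
public class FlowControlSketch {
  private final Iterator<Long> source;
  private final Deque<Long> inFlight = new ArrayDeque<>();
  public final List<Long> sink = new ArrayList<>();
  private long rate;
  private boolean stopped = false;

  public FlowControlSketch(long count, long initialRate) {
    this.source = LongStream.rangeClosed(1, count).boxed().iterator();
    this.rate = initialRate;
  }

  // Analogous to request(): change how many elements each probe pulls.
  public void request(long newRate) {
    rate = newRate;
  }

  // Analogous to stop(): no further probes of the source.
  public void stop() {
    stopped = true;
  }

  // One probe: move up to `rate` elements from the source into flight.
  public void probe() {
    for (long i = 0; i < rate && !stopped && source.hasNext(); i++) {
      inFlight.add(source.next());
    }
  }

  // Delivery: whatever is already in flight still reaches the sink.
  public void deliver() {
    while (!inFlight.isEmpty()) {
      sink.add(inFlight.poll());
    }
  }
}
```

After three elements at the initial rate and five more at the increased rate, calling `stop()` prevents any further pulls, yet the five in-flight elements still reach the sink, for a total of eight.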

For examples of how `Stream` is used, see [Streaming Persistent Data](https://docs.vlingo.io/vlingo-streams#streaming-persistent-data).

## Operators

In XOOM Streams an operator performs a specific kind of data transformation. When using a `StreamProcessor`, you will probably want to transform the data arriving on the processor's `StreamSubscriber` side into another type that leaves through its `StreamPublisher` side. An operator does exactly that.

```java
package io.vlingo.xoom.reactivestreams;

import java.util.function.Consumer;

public interface Operator<T,R> {
  void performInto(final T value, final Consumer<R> consumer);
}
```

An `Operator<T, R>` takes an input `value` of type `T` and delivers output of type `R` to a `Consumer<R>`. Producing that `R` value from the `T` input, before the `Consumer<R>` receives it, is the job of the `Operator<T, R>` implementation. There are currently three factory methods provided on the `Operator` interface:

```java
package io.vlingo.xoom.reactivestreams;

public interface Operator<T,R> {
  static <T> Operator<T,T> filterWith(final Predicate<T> filter) {
    return new Filter<>(filter);
  }

  static <T,R> Operator<T,R> flatMapWith(final Function<T, Source<R>> mapper) {
    return new FlatMapper<T,R>(mapper);
  }

  static <T,R> Operator<T,R> mapWith(final Function<T,R> mapper) {
    return new Mapper<>(mapper);
  }

  ...
}
```

Thus, there are three basic kinds of operators: a filter, a flat-mapper, and a mapper.

| Operator (factory)              | Description                                                                                                                                                                                                                 |
| ------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `Filter` (`filterWith()`)       | Produces the same type that it takes as input, but may produce fewer elements than it receives.                                                                                                                             |
| `Mapper` (`mapWith()`)          | Typically produces a different type than it takes as input, because it maps/transforms each input to another type. It may also produce the same type, perhaps with the data inside enriched or restricted in some way.     |
| `FlatMapper` (`flatMapWith()`)  | Works like `Mapper`, except that the mapping `Function` produces a `Source<R>` for each input, and every element of that `Source<R>` is delivered individually, flattening the nested structure into a single stream.      |
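
The three kinds can be sketched with a local `Operator` interface that mirrors `performInto()`. These are illustrative stand-ins, not the XOOM `Filter`, `Mapper`, and `FlatMapper` classes; `OperatorKindsSketch` and its methods are hypothetical, and the `flatMap()` here takes a function answering an `Iterable<R>` where the XOOM flat-mapper takes a `Source<R>`.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Sketch only: a local Operator interface mirroring performInto(), with the
// flat-mapper taking an Iterable instead of a XOOM Source<R>.
public class OperatorKindsSketch {

  public interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);
  }

  // Filter: same type out, zero or one output per input.
  public static <T> Operator<T, T> filter(Predicate<T> predicate) {
    return (value, consumer) -> {
      if (predicate.test(value)) {
        consumer.accept(value);
      }
    };
  }

  // Mapper: exactly one (possibly different-typed) output per input.
  public static <T, R> Operator<T, R> map(Function<T, R> mapper) {
    return (value, consumer) -> consumer.accept(mapper.apply(value));
  }

  // Flat-mapper: each input yields an Iterable whose elements are delivered one by one.
  public static <T, R> Operator<T, R> flatMap(Function<T, ? extends Iterable<R>> mapper) {
    return (value, consumer) -> mapper.apply(value).forEach(consumer);
  }
}
```

A filter calls the consumer zero times or once per input, a mapper exactly once, and a flat-mapper once per element of the `Iterable` it produces.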

Here is an example filter:

```java
final List<String> list =
  Arrays.asList("ABC", "321", "123", "456", "DEF", "214");

final List<String> results = new ArrayList<>();

final Operator<String,String> filter =
  Operator.filterWith((s) -> s.contains("1"));

list.forEach(possible ->
  filter.performInto(possible, (match) -> results.add(match)));

Assert.assertEquals(3, results.size());
Assert.assertEquals("321", results.get(0));
Assert.assertEquals("123", results.get(1));
Assert.assertEquals("214", results.get(2));
```

In this example (broken down into multiple steps for clarity) a `filter` is created that filters in all `String` instances that contain the substring `"1"`, and filters out all others. Then the `List<String>` of six elements is iterated over and the filter's `performInto()` operation is used. If the filter `Predicate<T>` of `s.contains("1")` is satisfied, the match is added to the `List<String>` of `results`, which then contains elements `"321"`, `"123"`, and `"214"`.

Next is an example of a mapper:

```java
final List<String> list = Arrays.asList("123", "456", "789");

final List<Integer> results = new ArrayList<>();

final Operator<String,Integer> mapper =
  Operator.mapWith((s) -> Integer.parseInt(s));

list.forEach(digits ->
  mapper.performInto(digits, (number) -> results.add(number)));

Assert.assertEquals(3, results.size());
Assert.assertEquals(123, (int) results.get(0));
Assert.assertEquals(456, (int) results.get(1));
Assert.assertEquals(789, (int) results.get(2));
```

In this example (broken down into multiple steps for clarity) a `mapper` is created that maps all `String` instances of digit characters to `Integer` numbers. The `List<String>` of three elements is iterated over and the mapper's `performInto()` operation is used. The new value is added to the `List<Integer>` of `results`, which then contains elements `123`, `456`, and `789`.

The following demonstrates how a mapper can perform a flat-map by using Java's `Stream::flatMap` internally (there is an easier way, explained below):

```java
final List<String> list1 = Arrays.asList("1", "2", "3");
final List<String> list2 = Arrays.asList("4", "5", "6");
final List<String> list3 = Arrays.asList("7", "8", "9");

final List<List<String>> lists = Arrays.asList(list1, list2, list3);

final List<Integer> results = new ArrayList<>();

final Function<List<List<String>>, List<Integer>> mapper =
        (los) -> los.stream()
          .flatMap(list -> list.stream().map(s -> Integer.parseInt(s)))
          .collect(Collectors.toList());

final Operator<List<List<String>>,List<Integer>> flatMapper =
  Operator.mapWith(mapper);

flatMapper.performInto(lists, (numbers) -> results.addAll(numbers));

Assert.assertEquals(9, results.size());
Assert.assertEquals(1, (int) results.get(0));
Assert.assertEquals(2, (int) results.get(1));
Assert.assertEquals(3, (int) results.get(2));
```

Note that `List<List<String>> lists` is a list of lists. The `mapper` internally uses the Java `Stream::flatMap`: it first streams over `lists`, then uses `flatMap` to stream over each inner `List<String>`, transforming each individual `String` to an `Integer`. Because `flatMap` concatenates those per-list streams, `collect()` gathers all the numbers into a single `List<Integer>`.

Yet, you can perform this more easily using the `FlatMapper` operator. Here's a `FlatMapper` solution to a different problem:

```java
final Operator<Long, Long> flatMap = Operator.flatMapWith((record) -> new LongRangeSource(record, record + 2));
final List<Long> providedLongs = new CopyOnWriteArrayList<>();

flatMap.performInto(1L, providedLongs::add);
flatMap.performInto(3L, providedLongs::add);

Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 4L), providedLongs);
```

In this example, the flat-mapper is given a function that instantiates a `LongRangeSource`, a stream source with a `startInclusive` and an `endExclusive` value. In the first `performInto()`, `startInclusive` is `1` and `endExclusive` is two greater, `3`, so the first range is `1` to `2`. In the second, the range is `3` to `4`. The flat-mapper reads every value from each `LongRangeSource` and passes it to the consumer, which adds it to `providedLongs`. Because each element is delivered individually, the two ranges are flattened into the single list `1, 2, 3, 4`.

You can give a `StreamProcessor` an `Operator<T, R>` such as these, or use them in your `Source` and `Sink` implementations.
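
Because a `StreamProcessor` is given a single `Operator<T, R>`, you may want to combine, for example, a filter and a mapper into one operator. The `chain()` helper below is hypothetical and is not part of the documented `Operator` API; it uses a local `Operator` interface that mirrors `performInto()` and feeds every output of the first operator into the second. `OperatorCompositionSketch` and `example()` are likewise hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: composes two operators into one.
public class OperatorCompositionSketch {

  // Local stand-in mirroring the Operator<T,R> protocol shown earlier.
  public interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);
  }

  // Every output of `first` becomes an input to `second`, whose outputs go to the
  // final consumer. If `first` filters a value out, `second` never sees it.
  public static <T, M, R> Operator<T, R> chain(Operator<T, M> first, Operator<M, R> second) {
    return (value, consumer) ->
        first.performInto(value, intermediate -> second.performInto(intermediate, consumer));
  }

  // Usage: keep only digit strings, then parse them to Integer.
  public static List<Integer> example(List<String> inputs) {
    Operator<String, String> digitsOnly = (s, out) -> {
      if (!s.isEmpty() && s.chars().allMatch(Character::isDigit)) {
        out.accept(s);
      }
    };
    Operator<String, Integer> parse = (s, out) -> out.accept(Integer.parseInt(s));
    Operator<String, Integer> both = chain(digitsOnly, parse);

    List<Integer> results = new ArrayList<>();
    inputs.forEach(s -> both.performInto(s, results::add));
    return results;
  }
}
```

With inputs `"123"`, `"ABC"`, and `"456"`, the chained operator drops `"ABC"` and delivers `123` and `456`.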

We will be adding more functional interfaces to our various types over future releases.

## Streaming Your Data

There are several ways to stream your data. You have already seen specific examples in the above content. In this section you are introduced to other ways.

### Streaming Persistent Data

Data persisted in a XOOM Symbio store can be streamed out to one or more subscribers. One of the most basic examples is provided by the `StateStore`.

```java
package io.vlingo.symbio.store.state;

public interface StateStoreReader {
  ...
  Completes<Stream> streamAllOf(final Class<?> stateType);
  Completes<Stream> streamSomeUsing(final QueryExpression query);
}
```

A `StateStoreReader` is provided as part of the `StateStore` protocol, so you may ask a `StateStore` to stream data from its internal storage. The two methods support streaming either all states of a given type or only those matching a query. A `stateType` is a Java `Class<?>`, and all states of that type are stored in a single container. In some storage mechanisms that container may be a database table or an in-memory grid region.

By requesting a stream of all of a given `stateType`, the `StateStoreReader` queries that data from the container and provides an instance of `io.vlingo.xoom.reactivestreams.Stream`.

```java
// somewhere many EquityState instances are written
final Equity equity = new Equity(...);
...
store.write(equity.id, equity.state, equity.version, interest);

...

final Completes<Stream> stream =
  store.streamAllOf(EquityState.class);

stream.andThenConsume(all ->
  all.flowInto(new ConsumerSink<>((equityState) -> reportOn(equityState))));
```

You may also constrain your query to a subset of the whole container. The query syntax depends on the storage type.

```java
final Completes<Stream> stream =
  store.streamSomeUsing(
    QueryExpression.using(
      EquityState.class,
      "select ... from tbl_equity where ..."));

stream.andThenConsume(some ->
  some.flowInto(new ConsumerSink<>((equityState) -> reportOn(equityState))));
```

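You may also stream persisted `Source<?>` types, such as `DomainEvent` and `Command` instances. (Here `Source<?>` is the XOOM Symbio base type of persisted events and commands, not the XOOM Streams `Source<T>` described earlier.) For example, the `Journal<T>` provides this streaming interface through the `JournalReader<T>`, which in turn is an `EntryReader<T>`.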

```java
// Journal
package io.vlingo.symbio.store.journal;

public interface Journal<T> {
  ...
  <ET extends Entry<?>> Completes<JournalReader<ET>>
     journalReader(final String name);
}

// JournalReader
package io.vlingo.symbio.store.journal;

public interface JournalReader<T extends Entry<?>> extends EntryReader<T> { }

// EntryReader
package io.vlingo.symbio.store.journal;

public interface EntryReader<T extends Entry<?>> {
  ...
  Completes<Stream> streamAll();
}
```

These are used in the following example, first to request the `JournalReader`, then the `Stream` of all `EntryBundle` instances, and then to flow the stream into the `Sink<EntryBundle>` for reporting on each event.

```java
final Sink<EntryBundle> sink =
  Sink.consumeWith((bundle) -> reportOn(bundle.source));

journal
    .journalReader("events-reporter")
    .andThenTo(reader -> reader.streamAll())
    .andThenConsume(stream -> stream.flowInto(sink, 50));
```

All stores (`ObjectStore`, `StateStore`, and `Journal`) support persisting a totally ordered stream of `Source<?>` entries, such as `DomainEvent` and `Command`. You may obtain any of these total streams using the same basic techniques.
