# Journal Storage

A `Journal`, like a ledger, keeps a record of all important happenings in a domain model of a Bounded Context. A `Journal` may be thought of in terms of Event Sourcing: over the lifetime of a given Entity/Aggregate, event instances that capture the results of actions carried out on it may be journaled. The captured, ordered events of a given Entity/Aggregate together form its event stream. A single `Journal` may be used to hold the event streams of all Entity/Aggregate instances of a Bounded Context.

However, a `Journal` need not be limited to persisting only events. XOOM Symbio defines an abstract base class known as `Source`, which is parameterized by a specific type. There are no concrete `Source` types defined in XOOM Symbio. XOOM Lattice defines a few concrete `Source` types: [`Command`](https://docs.vlingo.io/vlingo-lattice/entity-cqrs#sourced), [`DomainEvent`](https://docs.vlingo.io/vlingo-lattice/entity-cqrs#sourced), and [`Process`](https://docs.vlingo.io/vlingo-lattice/processes).

A `Command` is the expression of the intention to execute a command action. Although there is an expressed intention to carry out the command, the choice is up to the command receiver to accept and execute it, or to reject it. The command receiver is generally an Entity/Aggregate, but doesn't have to be; it could be a Domain Service or another kind of service. The `Command` type is a `Source`, and thus commands may be persisted in a `Journal`. Persisting a `Command` is generally done to guarantee that it will be offered as an intention to be carried out at some future time. Generally Process Managers (or Sagas) will be `CommandSourced` because they issue commands to be carried out in response to previous outcomes. These previous outcomes are generally received by the Process Managers as `DomainEvent` instances.

A `DomainEvent` is a record of a significant business occurrence within a domain model. A `DomainEvent` may not be rejected, in the sense that it is a captured fact. However, it may be ignored by all unconcerned parties. Generally if there will be an action carried out in response to the fact of a `DomainEvent`, it will be by means of translating the `DomainEvent` to a `Command`, and thus the corresponding `Command` may be rejected.
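
The translation described above can be sketched in plain Java. This is a toy illustration under assumed, hypothetical type names (`ProductDiscontinued`, `RemoveFromCatalog`); these are not XOOM Symbio's `DomainEvent` or `Command` classes.

```java
import java.util.Optional;

public class EventToCommandExample {
  // Hypothetical types for illustration; not XOOM Symbio's types.
  record ProductDiscontinued(String productId) { }  // a captured fact
  record RemoveFromCatalog(String productId) { }    // an expressed intention

  // A policy translates the fact into a command aimed at another component.
  static Optional<RemoveFromCatalog> when(final ProductDiscontinued event) {
    return Optional.of(new RemoveFromCatalog(event.productId()));
  }

  // The command receiver is free to accept or reject the intention.
  static boolean execute(final RemoveFromCatalog command) {
    return !command.productId().isEmpty(); // reject a malformed command
  }

  public static void main(final String[] args) {
    final boolean accepted =
        when(new ProductDiscontinued("product-1"))
            .map(EventToCommandExample::execute)
            .orElse(false);
    System.out.println(accepted); // prints "true"
  }
}
```

Note the asymmetry: the event is never rejected, only the command derived from it may be.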

A component that is `EventSourced` and a component that is `CommandSourced` are really quite similar, but the semantics are inverted for each. An `EventSourced` component receives commands and emits events in response. A `CommandSourced` component receives notification of event occurrences and emits commands in response. The receipt of a command by an `EventSourced` component need not be in the form of an object; it may be the receipt of a protocol message with parameters received by an actor asynchronously. The same applies for a `CommandSourced` component, which may be informed of a previous event occurrence by sending it a protocol message with parameters that is received asynchronously by the implementing actor.

Since it is possible for a Process Manager (or Saga) to emit both `Command` instances and `DomainEvent` instances, the instances will be wrapped in a `ProcessMessage`. This enables a Process Manager to stream all of its `Source` instances in a generic way.

The point of these `Source` types is that they may be used to represent the state of a given component, either one that is `EventSourced` or one that is `CommandSourced`, or a `ProcessMessage` that may be sourced by both commands and events. The state of any such component, such as an Entity/Aggregate is a stream of such `Source` types, ordered by the sequence in which they originally occurred.
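
Rebuilding state from an ordered stream can be pictured as a left-fold over the events. The following is a minimal sketch with hypothetical event types; the real sourced entity types in XOOM Lattice handle this for you.

```java
import java.util.List;

public class StreamReplayExample {
  // Hypothetical events for an account's stream; illustrative only.
  interface AccountEvent { }
  record Deposited(int amount) implements AccountEvent { }
  record Withdrawn(int amount) implements AccountEvent { }

  // The component's state is the left-fold of its ordered event stream:
  // apply each event, in occurrence order, to the prior state.
  static int balanceOf(final List<AccountEvent> stream) {
    int balance = 0;
    for (final AccountEvent event : stream) {
      if (event instanceof Deposited deposited) {
        balance += deposited.amount();
      } else if (event instanceof Withdrawn withdrawn) {
        balance -= withdrawn.amount();
      }
    }
    return balance;
  }

  public static void main(final String[] args) {
    final List<AccountEvent> stream =
        List.of(new Deposited(100), new Withdrawn(30), new Deposited(5));
    System.out.println(balanceOf(stream)); // prints "75"
  }
}
```

Because the fold depends on order, the `StreamVersion` ordering maintained by the `Journal` is essential to correct reconstitution.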

You may think of a `Journal` as having the following logical columns, which become actual persistence columns when using a relational database, for example.

| ID                                               | StreamName                                                  | StreamVersion                                                  | EntryData                   | EntryType                                      | Metadata                        |
| ------------------------------------------------ | ----------------------------------------------------------- | -------------------------------------------------------------- | --------------------------- | ---------------------------------------------- | ------------------------------- |
| A unique identity assigned to this source entry. | The unique identity of the component that owns this stream. | The index order of the occurrence of this entry in the stream. | The serialized source data. | The fully-qualified class name of this source. | Any data about the source data. |

A `Journal` need not be implemented in a relational database. It may use a key-value store or another kind of storage. Even so, logically the above elements must be supported by the storage mechanism.

A `Journal` may maintain snapshots of any given sourced type instances as a performance optimization when they have accumulated large streams. If snapshots are used the Journal must maintain them and provide the means to merge the stream from a given version into the snapshot state.
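
The merge step can be sketched as follows. This is a toy model with assumed shapes (`Entry`, `Snapshot` records holding strings); XOOM Symbio's actual `State` and `Entry` types differ.

```java
import java.util.List;
import java.util.stream.Collectors;

public class SnapshotMergeExample {
  // Illustrative shapes only; not XOOM Symbio's State and Entry types.
  record Entry(int streamVersion, String data) { }
  record Snapshot(int streamVersion, String state) { }

  // Begin with the snapshot's state and apply only the entries whose
  // version follows the snapshot's version.
  static String merge(final Snapshot snapshot, final List<Entry> entries) {
    return snapshot.state() + entries.stream()
        .filter(entry -> entry.streamVersion() > snapshot.streamVersion())
        .map(Entry::data)
        .collect(Collectors.joining());
  }

  public static void main(final String[] args) {
    final Snapshot snapshot = new Snapshot(2, "AB"); // state as of version 2
    final List<Entry> entries = List.of(
        new Entry(1, "A"), new Entry(2, "B"), new Entry(3, "C"), new Entry(4, "D"));
    System.out.println(merge(snapshot, entries)); // prints "ABCD"
  }
}
```

The payoff is that only the entries after the snapshot's version need be read and applied, rather than the full stream.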

{% hint style="info" %}
An important point to consider: if you use the XOOM Lattice entity types `EventSourced` or `CommandSourced`, or the `ProcessMessage` sourced type, there is no need to learn the operations of the `Journal`. You get all storage persistence for free when you use one of the sourced entity abstract base types.
{% endhint %}

To obtain a `Journal`, create an actor with the protocol and the implementation you will use to store the source streams of your Bounded Context.

```java
Journal<String> journal =
  stage.actorFor(
    Journal.class,
    PostgresJournalActor.class,
    dispatcher,
    configuration);
```

As a convenience, the `Journal` protocol provides a static factory method for creating an instance.

```java
static <A extends Actor, T, RS extends State<?>> Journal<T> using(
      final Stage stage,
      final Class<A> implementor,
      final Dispatcher<Dispatchable<Entry<T>,RS>> dispatcher,
      final Object...additional);
```

The `PostgresJournalActor` is the implementation of the `Journal` protocol for the PostgreSQL database. The `Dispatcher` is used to accept newly appended `Source` instances, such as various `DomainEvent` types, and relay them to consumers. The consumers may be projection processors that build and maintain CQRS query models, or publishers that feed messaging topics and exchanges with the occurrences.

The following are the means to append `Source` instances to the `Journal`.

```java
void append(final String streamName, final int streamVersion, final Source<S> source, final AppendResultInterest interest, final Object object);

void append(final String streamName, final int streamVersion, final Source<S> source, final Metadata metadata, final AppendResultInterest interest, final Object object);

void appendWith(final String streamName, final int streamVersion, final Source<S> source, final ST snapshot, final AppendResultInterest interest, final Object object);

void appendWith(final String streamName, final int streamVersion, final Source<S> source, final Metadata metadata, final ST snapshot, final AppendResultInterest interest, final Object object);

void appendAll(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final AppendResultInterest interest, final Object object);

void appendAll(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final Metadata metadata, final AppendResultInterest interest, final Object object);

void appendAllWith(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final ST snapshot, final AppendResultInterest interest, final Object object);

void appendAllWith(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final Metadata metadata, final ST snapshot, final AppendResultInterest interest, final Object object);
```

These variants differ in the number of `Source` instances appended (one or more than one), in whether a snapshot is also persisted, and in whether `Metadata` is provided.

The `AppendResultInterest` is used to asynchronously communicate the result of the append to the sender. The result may be a success or a failure, and will contain the data provided for the append operation, along with any `Object` instance that is optionally sent.

Although the `Dispatcher` registered with the `Journal` is used to guarantee delivery of the original `Source` and corresponding persisted serialized `Entry`, clients may desire to read the `Journal` contents at any future time. To do so the client obtains a `JournalReader`.

```java
Completes<JournalReader<ET>> journalReader(final String name);
```

The `JournalReader` is returned asynchronously by means of a `Completes` and is assigned the `name` provided as a parameter. If the same `name` is requested again during this actor's in-memory lifetime, the same `JournalReader` instance is returned.

The `JournalReader` provides the following protocol.

```java
void close();
Completes<String> name();
Completes<T> readNext();
Completes<List<T>> readNext(final int maximumEntries);
void rewind();
Completes<String> seekTo(final String id);
```

All query messages that answer values do so asynchronously using `Completes`. The parameterless `readNext()` answers the single next available `Entry` in the `Journal`. The `readNext(maximumEntries)` answers the next available `Entry` instances up to the `maximumEntries`. The `rewind()` moves the read start location back to the beginning of the `Journal`. The `seekTo()` is used to seek to a given `Entry` position in the `Journal`, or to simply provide the current position.

| `seekTo()` argument | Description                                                                                                                                  |
| ------------------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
| `id`                | Seek to the `Entry` that possesses the `id` corresponding to the given `id` (e.g. position or sequence).                                     |
| `Beginning`         | Seek to the beginning of the `Journal`, which is the same as using `rewind()`. Answers the current position following the operation.         |
| `End`               | Seek to the end of the `Journal`, which is the position past the current last `Entry`. Answers the current position following the operation. |
| `Query`             | Do not seek in either direction. Only answers the current position.                                                                          |
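
The semantics in the table can be modeled with a toy in-memory reader. This sketch is illustrative only; it is not the XOOM Symbio implementation, and the string tokens here simply mirror the names in the table.

```java
import java.util.List;

public class SeekToExample {
  // A toy in-memory reader modeling the seekTo() semantics; not the
  // XOOM Symbio implementation.
  static final class ToyReader {
    private final List<String> entries;
    private int position = 1; // 1-based id of the next entry to read

    ToyReader(final List<String> entries) { this.entries = entries; }

    String seekTo(final String id) {
      switch (id) {
        case "Beginning": position = 1; break;               // same as rewind()
        case "End": position = entries.size() + 1; break;    // past the last entry
        case "Query": break;                                 // answer position only
        default: position = Integer.parseInt(id); break;     // a specific entry id
      }
      return String.valueOf(position);
    }
  }

  public static void main(final String[] args) {
    final ToyReader reader = new ToyReader(List.of("e1", "e2", "e3"));
    System.out.println(reader.seekTo("End"));       // prints "4"
    System.out.println(reader.seekTo("Query"));     // prints "4"
    System.out.println(reader.seekTo("Beginning")); // prints "1"
  }
}
```

Note that `End` positions the reader past the last entry, so a subsequent `readNext()` would see only entries appended afterward.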

To read the `Source` stream of an individual Entity/Aggregate, use the `StreamReader`. You may also obtain this from the `Journal`.

```java
Completes<StreamReader<T>> streamReader(final String name);
```

As with the `JournalReader`, the `StreamReader` is returned asynchronously by means of a `Completes` and is assigned the `name` provided as a parameter. If the same `name` is requested again during this actor's in-memory lifetime, the same `StreamReader` instance is returned.

The `StreamReader` works as follows.

```java
Completes<Stream<T>> streamFor(final String streamName);
Completes<Stream<T>> streamFor(final String streamName, final int fromStreamVersion);
```

Both of the message implementations are obligated to optimize the `Stream` that is answered. Optimizing the stream means that if a snapshot is available it is read first, and only the `Entry` instances that follow the snapshot's version are read. This optimization applies to both `streamFor()` query messages.

The `streamFor(streamName)` will answer the full `Stream` of the Entity/Aggregate uniquely identified by `streamName` if a snapshot is unavailable; it delegates to `streamFor(streamName, 1)`. The `streamFor(streamName, fromStreamVersion)` answers the sub-portion of the `Stream` uniquely identified by `streamName`, starting at the `fromStreamVersion` and running to the stream's end. The `Stream` is defined as follows.

```java
public class Stream<T> {
  public final State<T> snapshot;
  public final List<BaseEntry<T>> entries;
  public final String streamName;
  public final int streamVersion;

  public boolean hasSnapshot();
  public int size();
}
```

The `snapshot`, if any, and the `entries`, are all in serialized form; that is, the `State<T>` and `BaseEntry<T>` respectively. The `T` parameter indicates whether it is a text `String` based serialization or a binary serialization of `byte[]`. This depends on the concrete `Journal` implementation used. To render the `Entry` instances and possible `State` to their native form, use the `EntryAdapter` and `StateAdapter` respectively.

## Concrete Implementations

The following provides information on using various `Journal` implementations. We provide specific guidance for PostgreSQL, but usage is nearly the same across other databases when using JDBC.

### Using XOOM Symbio JDBC

XOOM Symbio JDBC comes with a high-performance implementation of an event journal based on a PostgreSQL backend. To start using the event journal you will need to add XOOM Symbio dependencies to your project, using either Maven or Gradle. Remember to get the latest version from Maven Central to be up to date and enjoy the latest features and improvements.

{% tabs %}
{% tab title="Maven" %}

```markup
<dependency>
  <groupId>io.vlingo.xoom</groupId>
  <artifactId>xoom-symbio</artifactId>
  <version>1.7.0</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>io.vlingo.xoom</groupId>
  <artifactId>xoom-symbio-jdbc</artifactId>
  <version>1.7.0</version>
  <scope>compile</scope>
</dependency>

```

{% endtab %}

{% tab title="Gradle" %}

```groovy
dependencies {
    implementation 'io.vlingo.xoom:xoom-symbio:1.7.0'
    implementation 'io.vlingo.xoom:xoom-symbio-jdbc:1.7.0'
}
```

{% endtab %}
{% endtabs %}

After setting up the project to use XOOM Symbio the next step is to set up your PostgreSQL database.

### Setting Up PostgreSQL

XOOM Symbio needs access to a PostgreSQL schema and a set of tables and indexes in order to operate. However, the schema doesn't need to be dedicated only to XOOM Symbio. If your application is already deployed and using a pre-existing schema, you can reuse it for XOOM Symbio JDBC.

{% hint style="info" %}
If you don't have PostgreSQL on your development machine, we suggest using Docker and `docker-compose` so you can easily recreate a local development cluster. [You can use this `docker-compose.yaml` as an example.](https://github.com/vlingo/vlingo-symbio-jdbc/blob/master/docker-compose.yaml)
{% endhint %}

The event journal does not create or update the tables and indexes it requires. You may use the following script to create the necessary tables.

```sql
CREATE TABLE tbl_xoom_symbio_journal (
  e_id                      BIGSERIAL PRIMARY KEY,
  e_stream_name             VARCHAR(512) NOT NULL,
  e_stream_version          INTEGER      NOT NULL,
  e_entry_data              TEXT         NOT NULL,
  e_entry_type              VARCHAR(512) NOT NULL,
  e_entry_type_version      INTEGER      NOT NULL,
  e_entry_metadata          TEXT         NOT NULL
);

CREATE INDEX tbl_xoom_symbio_journal_idx ON tbl_xoom_symbio_journal (e_stream_name, e_stream_version);

CREATE TABLE tbl_xoom_symbio_journal_snapshots (
  s_stream_name             VARCHAR(512) PRIMARY KEY,
  s_stream_version          INTEGER      NOT NULL,
  s_snapshot_data           TEXT         NOT NULL,
  s_snapshot_data_version   INTEGER      NOT NULL,
  s_snapshot_type           VARCHAR(512) NOT NULL,
  s_snapshot_type_version   INTEGER      NOT NULL,
  s_snapshot_metadata       TEXT         NOT NULL
);

CREATE TABLE tbl_xoom_symbio_journal_dispatchables (
  d_dispatch_id             VARCHAR(512) PRIMARY KEY,
  d_originator_id           VARCHAR(512) NOT NULL,
  d_created_on              BIGINT       NOT NULL,
  d_state_id                VARCHAR(512) NULL,
  d_state_data              TEXT         NULL,
  d_state_data_version      INT          NULL,
  d_state_type              VARCHAR(512) NULL,
  d_state_type_version      INTEGER      NULL,
  d_state_metadata          TEXT         NULL,
  d_entries                 TEXT         NOT NULL
);

CREATE TABLE tbl_xoom_symbio_journal_offsets (
  o_reader_name             VARCHAR(128) PRIMARY KEY,
  o_reader_offset           BIGINT       NOT NULL
);
```

{% hint style="info" %}
We recommend that you run this script using a SQL migration system like [**Flyway**](https://flywaydb.org/) instead of doing it manually. Doing so will ensure that the system is easy to reproduce. You can see an example of an Event Sourced application that uses **Flyway** [here](https://github.com/vlingo/vlingo-examples/blob/fa8d6736cb9215f6db4e2443bf7ac63f978c3c14/vlingo-eventjournal/src/main/java/io/vlingo/eventjournal/Bootstrap.java#L29).
{% endhint %}

Although this configuration performs well, you might need to change how indexes and tables are created and stored in your cluster, depending on your current setup.

### Opening the Event Journal

Before starting to emit events, you need to open a connection to your database.

```java
final Configuration configuration = new Configuration(
      new NoopConfigurationInterest(), // You will need to create your own ConfigurationInterest
      "org.postgresql.Driver",
      StateStore.DataFormat.Text,
      DB_URL,   // Valid JDBC URL
      DB_NAME,  // Database name
      DB_USER,  // Database username
      DB_PWD,   // Database password
      false,    // Use a SSL Connection (true in production)
      "",       // Originator id (ignored for the event journal)
      false     // Create tables (ignored for the event journal)
  );
```

With an open connection to your Postgres cluster, creating the `Journal` is as easy as creating a new actor that implements the `Journal` protocol. For JDBC-compatible databases you use `JDBCJournalActor`. The `Dispatcher` provides useful hooks during the lifecycle of the event journal, so you can implement your own or use a no-op one (but it can't be null).

```java
final World world = World.startWithDefaults("event-sourced-app");
final Journal journal = world.actorFor(
        Journal.class,
        Definition.has(JDBCJournalActor.class, Definition.parameters(configuration, new NoOpDispatcher()))
);
```

### Appending Events to the Event Journal

The Postgres Journal currently supports only text events. Appending events is transactional and requires only a single method call:

```java
journal.append((1) counterName, (2) version++, (3) textEvent, (4) resultInterest, (5) context);
```

1. The name of the Event Stream to which the event will be appended. Usually this is the ID of the aggregate root that publishes the event.
2. The version of the Event Stream; it should be incremented with each append, starting from 1.
3. A `TextEvent` that contains the event information.
4. An interest that will be notified with the result of the operation.
5. Context to be given to the interest. It can be any object.
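
The versioning rule in item 2 can be modeled with a toy in-memory journal. This sketch is illustrative only (single stream, no persistence, hypothetical names); it is not the XOOM Symbio `Journal`.

```java
import java.util.ArrayList;
import java.util.List;

public class AppendVersionExample {
  // A toy in-memory journal for a single stream, modeling the rule that
  // stream versions are contiguous and start at 1; not the real Journal.
  static final class ToyJournal {
    private final List<String> entries = new ArrayList<>();

    boolean append(final String streamName, final int streamVersion, final String eventData) {
      if (streamVersion != entries.size() + 1) {
        return false; // reject gaps or repeats in the version sequence
      }
      entries.add(streamName + ":" + streamVersion + ":" + eventData);
      return true;
    }
  }

  public static void main(final String[] args) {
    final ToyJournal journal = new ToyJournal();
    int version = 1;
    System.out.println(journal.append("counter-1", version++, "Incremented")); // prints "true"
    System.out.println(journal.append("counter-1", version++, "Incremented")); // prints "true"
    System.out.println(journal.append("counter-1", 9, "Incremented"));         // prints "false"
  }
}
```

Tracking the version on the caller side, as in the `version++` of the example call above, keeps appends in the order the journal expects.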

You can see an example [here](https://github.com/vlingo/vlingo-examples/blob/master/vlingo-eventjournal/src/main/java/io/vlingo/eventjournal/counter/CounterActor.java).

### Choosing a Journal

There are several options for backing storage engines, and the options will continue to grow over time. Among the various RDBMS options, the choice may be more a matter of which database is supported in your enterprise. It may also be a performance concern; perhaps you have concluded that PostgreSQL is more performant than MySQL or another database.

We will be adding support for NoSQL databases. Stay in touch for announcements.
