Journal Storage

Using vlingo/symbio to store entries in journals that collectively define state.

A Journal keeps a record of all occurrences of important happenings. A Journal may be thought of in terms of Event Sourcing, because event instances may be journaled over the lifetime of a given Entity/Aggregate to capture the results of actions that were carried out on it. The captured set of ordered events of a given Entity/Aggregate together form its event stream. A single Journal may be used to hold the event streams of all Entity/Aggregate instances of a Bounded Context.

However, a Journal need not be limited to persisting only events. vlingo/symbio defines an abstract base class known as Source. A Source is parameterized by a specific type. There are no concrete Source types defined in vlingo/symbio. In vlingo/lattice there are a few concrete Source types defined: Command, DomainEvent, and ProcessMessage.

A Command is the expression of the intention to execute a command action. Although there is an expressed intention to carry out the command, the choice is up to the command receiver to accept and execute it, or to reject it. The command receiver is generally an Entity/Aggregate, but doesn't have to be; it could be a Domain Service or another kind of service. The Command type is a Source because commands may be persisted in a Journal. Persisting a Command is generally done to guarantee that it will be offered as an intention to be carried out at some future time. Generally Process Managers (or Sagas) will be CommandSourced because they issue commands to be carried out in response to previous outcomes. These previous outcomes are generally received by the Process Managers as DomainEvent instances.

A DomainEvent is a record of a significant business occurrence within a domain model. A DomainEvent may not be rejected, in the sense that it is a captured fact. Generally if there will be an action carried out in response to the fact of a DomainEvent, it will be by means of translating the DomainEvent to a Command, and thus the corresponding Command may be rejected.

A component that is EventSourced and a component that is CommandSourced are really quite similar, but the semantics are inverted. An EventSourced component receives commands and emits events in response. A CommandSourced component receives notification of event occurrences and emits commands in response. The receipt of a command by an EventSourced component need not be in the form of an object; it may be the receipt of a protocol message with parameters received by an actor asynchronously. The same applies for a CommandSourced component, which may be informed of a previous event occurrence by sending it a protocol message with parameters that is received asynchronously by the implementing actor.

Since it is possible for a Process Manager (or Saga) to emit both Command instances and DomainEvent instances, the instances will be wrapped in a ProcessMessage. This enables a Process Manager to stream all of its Source instances in a generic way.

The point of these Source types is that they may be used to represent the state of a given component, either one that is EventSourced or one that is CommandSourced, or a Process that may be sourced by both commands and events. The state of any such component, such as an Entity/Aggregate is a stream of such Source types, ordered by the sequence in which they originally occurred.
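The stream-as-state idea can be sketched in plain Java. These are not vlingo types; Deposited and Balance are illustrative names standing in for a DomainEvent and the component state it sources.

```java
import java.util.List;

// Plain-Java sketch (not vlingo types): an Entity/Aggregate's state is the
// fold of its ordered event stream, applied in the order the events occurred.
record Deposited(int amount) {}

class Balance {
  // Replays the stream, in original order, to reconstitute the current state.
  static int from(List<Deposited> stream) {
    int balance = 0;
    for (Deposited event : stream) {
      balance += event.amount();
    }
    return balance;
  }
}
```

Because the stream is ordered, replaying it always reconstitutes the same state, which is what makes the Source stream a faithful representation of the component.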

You may think of a Journal as having the following logical columns, which are actual persistence columns when using a relational database, for example.

ID: A unique identity assigned to this source entry.

StreamName: The unique identity of the component that owns this stream.

StreamVersion: The index order of the occurrence of this entry in the stream.

EntryData: The serialized source data.

EntryType: The fully-qualified class name of this source.

Metadata: Any data about the source data.

A Journal need not be implemented in a relational database. It may use a key-value store or another kind of storage. Even so, logically the above elements must be supported by the storage mechanism.

A Journal may maintain snapshots of any given sourced type instances as a performance optimization when they have accumulated large streams. If snapshots are used the Journal must maintain them and provide the means to merge the stream from a given version into the snapshot state.

To obtain a Journal, create an actor with the protocol and the implementation you will use to store the source streams of your Bounded Context.

Journal<String> journal =
  stage.actorFor(
    Journal.class,
    PostgresJournalActor.class,
    dispatcher,
    configuration);

As a convenience the Journal provides a factory method for creating the instance.

Journal<T> using(
  final Stage stage,
  final Class<A> implementor,
  final Dispatcher<Dispatchable<Entry<T>,RS>> dispatcher,
  final Object... additional);

The PostgresJournalActor is the implementation of the Journal protocol for the Postgres database. The Dispatcher is used to accept newly appended Source instances, such as for various DomainEvent types, and relay them to consumers. The consumers may be projection processors that build and maintain CQRS query models, or processes that feed messaging topics and exchanges to publish the occurrences.
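For example, the convenience factory may be used in place of the actorFor(...) call. This is a sketch only; it assumes the same stage, PostgresJournalActor, dispatcher, and configuration instances shown earlier in this section.

```java
// Sketch only: equivalent to the stage.actorFor(...) example, but using the
// Journal convenience factory with the same collaborators.
final Journal<String> journal =
    Journal.using(stage, PostgresJournalActor.class, dispatcher, configuration);
```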

The following are the means to append Source instances to the Journal.

void append(final String streamName, final int streamVersion, final Source<S> source, final AppendResultInterest interest, final Object object);
void append(final String streamName, final int streamVersion, final Source<S> source, final Metadata metadata, final AppendResultInterest interest, final Object object);
void appendWith(final String streamName, final int streamVersion, final Source<S> source, final ST snapshot, final AppendResultInterest interest, final Object object);
void appendWith(final String streamName, final int streamVersion, final Source<S> source, final Metadata metadata, final ST snapshot, final AppendResultInterest interest, final Object object);
void appendAll(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final AppendResultInterest interest, final Object object);
void appendAll(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final Metadata metadata, final AppendResultInterest interest, final Object object);
void appendAllWith(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final ST snapshot, final AppendResultInterest interest, final Object object);
void appendAllWith(final String streamName, final int fromStreamVersion, final List<Source<S>> sources, final Metadata metadata, final ST snapshot, final AppendResultInterest interest, final Object object);

The differences between these message types are the number of Source instances that will be appended (one or more than one), whether a snapshot will be persisted, and whether Metadata is provided.
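As a sketch, appending a batch of events together with a snapshot might look like the following. The stream name, version, event list, snapshot, and interest are illustrative; they are assumed to exist in the surrounding code.

```java
// Sketch only: append several sources and persist a snapshot in one message.
journal.appendAllWith(
    "account-1234",   // streamName: identity of the owning Entity/Aggregate
    4,                // fromStreamVersion: version of the first appended source
    recentEvents,     // List<Source<S>>: the new sources, in occurrence order
    accountSnapshot,  // ST: the snapshot state to persist alongside them
    interest,         // AppendResultInterest: notified of success or failure
    null);            // object: optionally relayed back to the interest
```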

The AppendResultInterest is used to asynchronously communicate the result of the append to the sender. The result may be a success or failure, and will contain the data provided for the append operation, along with any object instance that is optionally sent.

Although the Dispatcher registered with the Journal is used to guarantee delivery of the original Source and corresponding persisted serialized Entry, clients may desire to read the Journal contents at any future time. To do so the client obtains a JournalReader.

Completes<JournalReader<ET>> journalReader(final String name);

The JournalReader is returned asynchronously by means of a Completes and is given the name provided as a parameter. If the same name is requested in the future of this actor's in-memory lifetime, the same JournalReader is returned.

The JournalReader provides the following protocol.

void close();
Completes<String> name();
Completes<T> readNext();
Completes<List<T>> readNext(final int maximumEntries);
void rewind();
Completes<String> seekTo(final String id);

All query messages that answer values do so asynchronously using Completes. The parameterless readNext() answers the single next available Entry in the Journal. The readNext(maximumEntries) answers the next available Entry instances up to the maximumEntries. The rewind() moves the read start location back to the beginning of the Journal. The seekTo() is used to seek to a given Entry position in the Journal, or to simply provide the current position.

The id given to seekTo() may be one of the following.

id: Seek to the Entry that possesses the id corresponding to the given id (e.g. position or sequence).

Beginning: Seek to the beginning of the Journal, which is the same as using rewind(). Answers the current position following the operation.

End: Seek to the end of the Journal, which is the position past the current last Entry. Answers the current position following the operation.

Query: Do not seek in either direction. Only answers the current position.
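Putting the reader protocol together, a sketch of catching up on journal entries might look like this. The reader name is illustrative, TextEntry is assumed here as the text-based Entry type of a text journal, and await() is used to block for each asynchronous answer.

```java
// Sketch only: obtain a named JournalReader and read entries in batches.
final JournalReader<TextEntry> reader =
    journal.journalReader("my-projection-reader").await();

final List<TextEntry> batch = reader.readNext(100).await(); // up to 100 entries
reader.rewind(); // move the read position back to the beginning of the Journal
```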

To read individual Entity/Aggregate Source streams, use the StreamReader. You may obtain this also from the Journal.

Completes<StreamReader<T>> streamReader(final String name);

As with the JournalReader, the StreamReader is returned asynchronously by means of a Completes and is given the name provided as a parameter. If the same name is requested in the future of this actor's in-memory lifetime, the same StreamReader is returned.

The StreamReader works as follows.

Completes<Stream<T>> streamFor(final String streamName);
Completes<Stream<T>> streamFor(final String streamName, final int fromStreamVersion);

Both of the message implementations are obligated to optimize the Stream that is answered. Optimizing the stream means that if a snapshot is available it is read first, and only the Entry instances that follow the snapshot's version are read. This optimization applies to both streamFor() query messages.

The streamFor(streamName) will answer the full Stream of the Entity/Aggregate uniquely identified by streamName if a snapshot is unavailable. The streamFor(streamName) uses streamFor(streamName, 1). The streamFor(streamName, fromStreamVersion) answers the sub-portion of the Stream uniquely identified by streamName, starting at the fromStreamVersion and continuing to the stream's end. The Stream is defined as follows.

public class Stream<T> {
public final State<T> snapshot;
public final List<BaseEntry<T>> entries;
public final String streamName;
public final int streamVersion;
public boolean hasSnapshot();
public int size();
}

The snapshot, if any, and the entries are all in serialized form; that is, the State<T> and BaseEntry<T> respectively. The T parameter indicates whether it is a text String based serialization or a binary serialization of byte[]. This depends on the concrete Journal implementation in use. To render the Entry instances and possible State to their native form, use the EntryAdapter and StateAdapter respectively.
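A sketch of reading a single entity's stream follows, under the same assumptions as above (a text-based journal, with illustrative reader and stream names).

```java
// Sketch only: read the full stream of one entity, preferring its snapshot.
final StreamReader<String> streamReader =
    journal.streamReader("my-stream-reader").await();

final Stream<String> stream = streamReader.streamFor("account-1234").await();
if (stream.hasSnapshot()) {
  // adapt stream.snapshot with a StateAdapter, then apply stream.entries
  // (adapted with an EntryAdapter) from the snapshot's version forward
}
```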

Concrete Implementations

Using vlingo/symbio-jdbc

vlingo/symbio-jdbc comes with a high-performance implementation of an event journal based on a PostgreSQL backend. To start using the event journal you will need to add the vlingo/symbio and vlingo/symbio-jdbc dependencies to your project, using either Maven or Gradle. Remember to get the latest version from Maven Central so you are up to date and enjoy the latest features and improvements.

Maven
Gradle
<dependency>
<groupId>io.vlingo</groupId>
<artifactId>vlingo-symbio</artifactId>
<version>0.8.7</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.vlingo</groupId>
<artifactId>vlingo-symbio-jdbc</artifactId>
<version>0.8.7</version>
<scope>compile</scope>
</dependency>
dependencies {
compile 'io.vlingo:vlingo-symbio:0.8.7'
compile 'io.vlingo:vlingo-symbio-jdbc:0.8.7'
}

After setting up the project to use vlingo/symbio-jdbc the next step is to set up your PostgreSQL database.

Setting Up PostgreSQL

vlingo will need access to a PostgreSQL schema and a set of tables and indexes to support the behavior of the application. However, the schema doesn't need to be dedicated only to vlingo; if you already have your application deployed and using your own schema, you can reuse it for vlingo.

If you don't have PostgreSQL on your development machine, we suggest using Docker and docker-compose so you can easily recreate a local development cluster. You can use this docker-compose.yaml as an example.

The event journal will not create or update the set of tables and indexes it requires. However, the script that creates those tables is as follows:

CREATE TABLE vlingo_symbio_journal (
id UUID PRIMARY KEY,
entry_timestamp BIGINT NOT NULL,
entry_data JSONB NOT NULL,
entry_metadata JSONB NOT NULL,
entry_type VARCHAR(256) NOT NULL,
entry_type_version INTEGER NOT NULL,
stream_name VARCHAR(128) NOT NULL,
stream_version INTEGER NOT NULL
);
CREATE INDEX ON vlingo_symbio_journal (stream_name, stream_version);
CREATE INDEX ON vlingo_symbio_journal (entry_timestamp);
CREATE TABLE vlingo_symbio_journal_snapshots (
stream_name VARCHAR(128) PRIMARY KEY,
snapshot_type VARCHAR(256) NOT NULL,
snapshot_type_version INTEGER NOT NULL,
snapshot_data JSONB NOT NULL,
snapshot_data_version INTEGER NOT NULL,
snapshot_metadata JSONB NOT NULL
);
CREATE TABLE vlingo_symbio_journal_offsets (
reader_name VARCHAR(128) PRIMARY KEY,
reader_offset BIGINT NOT NULL
);

We recommend running this script with a SQL migration tool such as Flyway instead of doing it manually, to ensure that the system is easy to reproduce. You can see an example of an event-sourced application that uses Flyway here.

Although this configuration is known to work with good performance, you might need to change how the indexes and tables are created and stored in your cluster depending on your current setup.

Opening the Event Journal

Before you start emitting events, you need to open a connection to your database.

final Configuration configuration = new Configuration(
new NoopConfigurationInterest(), // You will need to create your own ConfigurationInterest
"org.postgresql.Driver",
StateStore.DataFormat.Text,
DB_URL, // Valid JDBC URL
DB_NAME, // Database name
DB_USER, // Database username
DB_PWD, // Database password
false, // Use a SSL Connection (true in production)
"", // Originator id (ignored for the event journal)
false // Create tables (ignored for the event journal)
);

With an open connection to your Postgres cluster, creating the EventJournal is as easy as instantiating a new PostgresEventJournalActor. There is an EventJournalListener with useful hooks into the lifecycle of the Event Journal, so you can implement your own or keep it empty (but it can't be null).

You can see examples of an empty ConfigurationListener and EventJournalListener to use in your own projects in case you don't need this information.

final World world = World.startWithDefaults("event-journal");
final EventJournal journal = world.actorFor(
EventJournal.class,
Definition.has(PostgresEventJournalActor.class, Definition.parameters(configuration, new NoopEventJournalListener()))
);

Appending Events to the Event Journal

The Postgres Event Journal currently supports only text events. Appending events is transactional and requires only a single method call:

journal.append(
  counterName,     // (1)
  version++,       // (2)
  textEvent,       // (3)
  resultInterest,  // (4)
  context);        // (5)
  1. The name of the Event Stream where the event will be published. Usually it's the ID of the aggregate root that will publish this event.

  2. The version of the Event Stream; it should be incremented with each append, starting from 1.

  3. A TextEvent that will contain the event information. You can see an example here.

  4. An interest that will be executed with the result of the operation.

  5. Context to be given to the interest. It can be any object.

Reading from an Event Journal