Developing Event-Sourced Models

Last updated 4 months ago

Reactive Event Sourcing with vlingo/symbio (in progress)

What is Event Sourcing?

Using vlingo/symbio-jdbc

vlingo/symbio-jdbc comes with a high-performance implementation of an event journal backed by PostgreSQL. To start using the event journal, add the vlingo/symbio and vlingo/symbio-jdbc dependencies to your project using either Maven or Gradle. Use the latest version available on Maven Central to get the most recent features and improvements.

Maven

<dependency>
  <groupId>io.vlingo</groupId>
  <artifactId>vlingo-symbio</artifactId>
  <version>0.7.3</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>io.vlingo</groupId>
  <artifactId>vlingo-symbio-jdbc</artifactId>
  <version>0.7.3</version>
  <scope>compile</scope>
</dependency>

Gradle

dependencies {
  compile 'io.vlingo:vlingo-symbio:0.7.3'
  compile 'io.vlingo:vlingo-symbio-jdbc:0.7.3'
}

After setting up the project to use vlingo/symbio-jdbc the next step is to set up your PostgreSQL database.

Setting Up PostgreSQL

vlingo needs access to a PostgreSQL schema with a set of tables and indexes that the event journal depends on. The schema doesn't need to be dedicated to vlingo: if your application is already deployed with its own schema, you can reuse it.

If you don't have PostgreSQL on your development machine, we suggest using Docker and docker-compose so you can easily recreate a local development cluster. You can use this docker-compose.yaml as an example.

The event journal does not create or update the tables and indexes it needs, so you must create them yourself. The following script creates them:

CREATE TABLE vlingo_event_journal (
  id UUID PRIMARY KEY,
  event_timestamp BIGINT NOT NULL,
  event_data JSONB NOT NULL,
  event_metadata JSONB NOT NULL,
  event_type VARCHAR(256) NOT NULL,
  event_type_version INTEGER NOT NULL,
  stream_name VARCHAR(128) NOT NULL,
  stream_version INTEGER NOT NULL
);

CREATE INDEX ON vlingo_event_journal (stream_name, stream_version);
CREATE INDEX ON vlingo_event_journal (event_timestamp);

CREATE TABLE vlingo_event_journal_snapshots (
  stream_name VARCHAR(128) PRIMARY KEY,
  snapshot_type VARCHAR(256) NOT NULL,
  snapshot_type_version INTEGER NOT NULL,
  snapshot_data JSONB NOT NULL,
  snapshot_data_version INTEGER NOT NULL,
  snapshot_metadata JSONB NOT NULL
);

CREATE TABLE vlingo_event_journal_offsets (
  reader_name VARCHAR(128) PRIMARY KEY,
  reader_offset BIGINT NOT NULL
);

We recommend running this script with a SQL migration tool such as Flyway rather than by hand, so that the setup is easy to reproduce. You can see an example of an event-sourced application that uses Flyway here.

Although this configuration performs well, you might need to change how the tables and indexes are created and stored in your cluster, depending on your current setup.
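One way to judge whether the (stream_name, stream_version) index suits your workload is to look at the kind of query it is meant to serve. The following is a minimal sketch using plain JDBC against the table defined above. The class StreamIndexProbe and its method are hypothetical names introduced for illustration; they are not part of vlingo, and the sketch reads the table directly rather than through the event journal.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper; it is not part of vlingo. */
public final class StreamIndexProbe {
    /** Filters and orders on the two columns covered by the composite index. */
    public static final String READ_STREAM_SQL =
        "SELECT event_data::text FROM vlingo_event_journal "
        + "WHERE stream_name = ? AND stream_version >= ? "
        + "ORDER BY stream_version";

    private StreamIndexProbe() { }

    /** Returns the JSON payloads of one stream, ordered by stream version. */
    public static List<String> readStream(Connection connection, String streamName, int fromVersion)
            throws SQLException {
        List<String> events = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement(READ_STREAM_SQL)) {
            statement.setString(1, streamName);
            statement.setInt(2, fromVersion);
            try (ResultSet rows = statement.executeQuery()) {
                while (rows.next()) {
                    events.add(rows.getString(1));
                }
            }
        }
        return events;
    }
}
```

Running the same statement in psql prefixed with EXPLAIN ANALYZE shows whether PostgreSQL actually uses the index for your data volume, which is a reasonable starting point before changing how the indexes are defined.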

Opening the Event Journal

Before emitting events, you need to open a connection to your database.

final Configuration configuration = new Configuration(
  new NoopConfigurationInterest(), // You will need to create your own ConfigurationInterest
  "org.postgresql.Driver",
  StateStore.DataFormat.Text,
  DB_URL, // Valid JDBC URL
  DB_NAME, // Database name
  DB_USER, // Database username
  DB_PWD, // Database password
  false, // Use an SSL connection (true in production)
  "", // Originator id (ignored for the event journal)
  false // Create tables (ignored for the event journal)
);
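The connection values should normally come from the environment rather than from constants in the source. Below is a minimal sketch of one way to collect them. The class JournalDbSettings and the JOURNAL_DB_* variable names are assumptions made for this example; they are not part of vlingo.

```java
import java.util.Map;

/** Hypothetical holder for the connection values; it is not part of vlingo. */
public final class JournalDbSettings {
    public final String url;
    public final String databaseName;
    public final String user;
    public final String password;
    public final boolean useSsl;

    private JournalDbSettings(String url, String databaseName, String user,
                              String password, boolean useSsl) {
        this.url = url;
        this.databaseName = databaseName;
        this.user = user;
        this.password = password;
        this.useSsl = useSsl;
    }

    /** Reads the settings from an environment-like map; the key names are assumptions. */
    public static JournalDbSettings from(Map<String, String> env) {
        return new JournalDbSettings(
            required(env, "JOURNAL_DB_URL"),
            required(env, "JOURNAL_DB_NAME"),
            required(env, "JOURNAL_DB_USER"),
            required(env, "JOURNAL_DB_PWD"),
            // SSL stays on unless it is explicitly switched off, e.g. for local development.
            !"false".equalsIgnoreCase(env.get("JOURNAL_DB_SSL")));
    }

    /** Fails fast when a mandatory value is missing or empty. */
    private static String required(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing setting: " + key);
        }
        return value;
    }
}
```

With this in place, JournalDbSettings.from(System.getenv()) yields the values that could be passed where DB_URL, DB_NAME, DB_USER, DB_PWD and the SSL flag appear in the Configuration constructor. Defaulting SSL to on means a missing variable cannot silently downgrade a production connection.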

With an open connection to your Postgres cluster, creating the EventJournal is as easy as instantiating a new PostgresEventJournalActor. The actor takes an EventJournalListener, which provides useful hooks into the lifecycle of the event journal. You can implement your own or keep it empty, but it can't be null.

If you don't need this information, you can see examples of an empty ConfigurationInterest and an empty EventJournalListener to use in your own projects.

final World world = World.startWithDefaults("event-journal");
final EventJournal journal = world.actorFor(
  Definition.has(PostgresEventJournalActor.class, Definition.parameters(configuration, new NoopEventJournalListener())),
  EventJournal.class
);

Appending Events to the Event Journal

The Postgres event journal currently supports only text events. Appending events is transactional and requires a single method call:

journal.append((1) counterName, (2) version++, (3) textEvent, (4) resultInterest, (5) context);

  1. The name of the event stream to which the event will be published. Usually this is the ID of the aggregate root that publishes the event.

  2. The version of the event stream. It should be incremental, starting from 1.

  3. A TextEvent that contains the event information. You can see an example here.

  4. An interest that will be executed with the result of the operation.

  5. Context to be passed to the interest. It can be any object.
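The versioning rule for a stream can be made concrete with a small in-memory model. This is only an illustration of the rule described in item 2, read here as "each append uses the next consecutive version, starting from 1". The class InMemoryStreams is hypothetical; it is not the vlingo journal, and whether the Postgres journal itself rejects an out-of-sequence version is not stated in this guide.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative in-memory model only; it is NOT the vlingo event journal. */
public final class InMemoryStreams {
    private final Map<String, List<String>> streams = new HashMap<>();

    /**
     * Appends the event only if streamVersion is the next version of the stream.
     * The first event of a stream must therefore carry version 1.
     */
    public boolean append(String streamName, int streamVersion, String eventJson) {
        List<String> events = streams.computeIfAbsent(streamName, name -> new ArrayList<>());
        int expectedVersion = events.size() + 1;
        if (streamVersion != expectedVersion) {
            return false; // a gap or a duplicate version
        }
        events.add(eventJson);
        return true;
    }

    /** Returns the number of events appended so far, or 0 for an unknown stream. */
    public int currentVersion(String streamName) {
        List<String> events = streams.get(streamName);
        return events == null ? 0 : events.size();
    }
}
```

Keeping a per-aggregate counter like this on the writing side is one way to make sure the version passed to the journal always matches the number of events the aggregate has emitted.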

Reading from an Event Journal