State Storage

Using the XOOM Symbio Key-Value and NoSQL storage.

The state storage type persists key-value pairs, offering the features of a NoSQL database. The key is a business id and the value is the serialized state of an entity. You may also transactionally append DomainEvent instances, and other Source types, such as Command, along with states.

Configuring and Starting the StateStore

Every StateStore is implemented as an Actor. Thus, you must use the World or one of its Stage instances to create the StateStore. Before doing this there are a few dependencies you must create, including Configuration. These get sent in as parameters to the StateStore constructor.

Every entity type must be registered. This is how the StateStore knows the table (or other persistence collection type) name to use for a given entity type.

// using the class simple name
final String productStoreName = Product.class.getSimpleName();

StateTypeStateStoreMap.stateTypeToStoreName(Product.class, productStoreName);

// alternatively, using the table naming scheme
final String productStoreName =
    "tbl_xoom_symbio_state_" +
    Product.class.getSimpleName();

StateTypeStateStoreMap.stateTypeToStoreName(Product.class, productStoreName);
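To make the role of registration concrete, the following is a minimal, self-contained sketch of a type-to-store-name lookup. StoreNameRegistry, its methods, and the empty Product class are hypothetical stand-ins written for illustration. They are not the XOOM StateTypeStateStoreMap implementation, and the fail-fast behavior for an unregistered type is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a type-to-store-name registry (not the XOOM class).
final class StoreNameRegistry {
  private static final Map<Class<?>, String> names = new ConcurrentHashMap<>();

  // Associates an entity state type with the table or collection that holds it.
  static void register(final Class<?> stateType, final String storeName) {
    names.put(stateType, storeName);
  }

  // Returns the registered name; failing fast for an unregistered type
  // is a choice made by this sketch only.
  static String storeNameFrom(final Class<?> stateType) {
    final String name = names.get(stateType);
    if (name == null) {
      throw new IllegalStateException("No store name registered for: " + stateType.getName());
    }
    return name;
  }
}

// Example entity type used only by this sketch.
final class Product { }
```

A store that receives a Product state can then resolve its table name with StoreNameRegistry.storeNameFrom(Product.class) before issuing any query.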

Further, each persistent entity state and each Source type, such as a DomainEvent or Command, should have a serialization and deserialization adapter.

import io.vlingo.xoom.symbio.EntryAdapterProvider;
import io.vlingo.xoom.symbio.StateAdapterProvider;

EntryAdapterProvider entryAdapterProvider = EntryAdapterProvider.instance(world);

// ProductDefinedAdapter is a io.vlingo.xoom.symbio.EntryAdapter
entryAdapterProvider.registerAdapter(ProductDefined.class, new ProductDefinedAdapter());

StateAdapterProvider stateAdapterProvider = StateAdapterProvider.instance(world);

// ProductStateAdapter is a io.vlingo.xoom.symbio.StateAdapter
stateAdapterProvider.registerAdapter(ProductState.class, new ProductStateAdapter());
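As a rough illustration of what such an adapter does, the sketch below converts a state object to a storable text form and back. TextStateAdapter, the ProductState fields, and the tab-separated format are all assumptions made for this example. The real XOOM StateAdapter and EntryAdapter interfaces have different signatures and work with raw State and Entry types.

```java
// Hypothetical, simplified adapter contract; the real XOOM StateAdapter differs.
interface TextStateAdapter<S> {
  int typeVersion();
  String toText(S state);
  S fromText(String text);
}

// Example state used only by this sketch; the fields are assumptions.
final class ProductState {
  final String id;
  final String name;
  final long priceInCents;

  ProductState(final String id, final String name, final long priceInCents) {
    this.id = id;
    this.name = name;
    this.priceInCents = priceInCents;
  }
}

// Serializes ProductState as three tab-separated fields.
// Assumes that neither the id nor the name contains a tab character.
final class TabSeparatedProductStateAdapter implements TextStateAdapter<ProductState> {
  @Override
  public int typeVersion() {
    return 1;
  }

  @Override
  public String toText(final ProductState state) {
    return state.id + "\t" + state.name + "\t" + state.priceInCents;
  }

  @Override
  public ProductState fromText(final String text) {
    final String[] parts = text.split("\t", -1);
    if (parts.length != 3) {
      throw new IllegalArgumentException("Expected 3 fields but found " + parts.length);
    }
    return new ProductState(parts[0], parts[1], Long.parseLong(parts[2]));
  }
}
```

The type version gives the deserializing side a way to recognize older serialized forms, which is why the table scripts later in this section carry a type version column.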

The following is the configuration used to create a new StateStore for a given database, which in this case is Postgres.

import java.util.Arrays;
import java.util.List;

import io.vlingo.xoom.actors.World;
import io.vlingo.xoom.lattice.model.projection.ProjectionDispatcher;
import io.vlingo.xoom.lattice.model.projection.ProjectionDispatcher.ProjectToDescription;
import io.vlingo.xoom.lattice.model.projection.TextProjectionDispatcherActor;
import io.vlingo.xoom.symbio.store.common.jdbc.Configuration;
import io.vlingo.xoom.symbio.store.common.jdbc.postgres.PostgresConfigurationProvider;
import io.vlingo.xoom.symbio.store.dispatch.Dispatcher;
import io.vlingo.xoom.symbio.store.state.StateStore;
import io.vlingo.xoom.symbio.store.state.jdbc.JDBCStateStoreActor;
import io.vlingo.xoom.symbio.store.state.jdbc.StorageDelegate;
import io.vlingo.xoom.symbio.store.state.jdbc.postgres.PostgresStorageDelegate;

World world = World.startWithDefaults("product-service");
...
List<ProjectToDescription> descriptions =
    Arrays.asList(new ProjectToDescription(UserProjectionActor.class, Events.class.getPackage()),
    ...);

Dispatcher dispatcher =
    world.actorFor(Dispatcher.class, TextProjectionDispatcherActor.class, descriptions);

Configuration configuration =
    PostgresConfigurationProvider.configuration(...);

StorageDelegate delegate =
    new PostgresStorageDelegate(configuration, world.defaultLogger());

StateStore stateStore =
    world.actorFor(StateStore.class, JDBCStateStoreActor.class,
       dispatcher, delegate);

This creates a new Actor-based StateStore that is ready to receive and process persistence messages.

An important point to consider: if you use the XOOM Lattice entity type StatefulEntity, there is no need to learn the operations of the StateStore. You get all storage persistence for free when you use the StatefulEntity abstract base types. You must still apply the above configuration steps.

Writing State and Source Instances

The following is the code that persists a given state and any sources (e.g. DomainEvent) to the StateStore.

stateStore.write(id, state, stateVersion, sources, metadata, writeInterest, object);

The write() method is used as follows.

import io.vlingo.xoom.symbio.store.state.StateStore;
...
stateStore.write(productState.id, productState, 1, productState.events, productState.metadata, writeInterest, writeTracker);

Note that a ProductEntity actor should be designed with a ProductState that is the actual persistence state.

The WriteResultInterest receives the write outcome, either success or failure.

import io.vlingo.xoom.symbio.store.state.StateStore.WriteResultInterest;
...
public class ProductWriteInterest extends Actor implements WriteResultInterest {
  public <S,C> void writeResultedIn(final Outcome<StorageException,Result> outcome, final String id, final S state, final int stateVersion, final List<Source<C>> sources, final Object object) {
    final WriteTracker writeTracker = (WriteTracker) object;
        
    outcome
      .andThen(result -> {
        // success actions...

        return result;
      })
      .otherwise(cause -> {
        // failure actions...
        
        final String message = writeTracker.failureMessageFor(cause);

        logger().error(message, cause);

        throw new IllegalStateException(message, cause);
      });
  }
}
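The round trip of the reference object can be easier to follow without actors involved. The following is a minimal in-memory sketch, not the XOOM StateStore: SketchStateStore, WriteCallback, and the counters on WriteTracker are hypothetical names. It also assumes, for illustration only, that a write is rejected when its stateVersion is not greater than the version already stored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback, loosely modeled on the idea of a write result interest.
interface WriteCallback {
  void writeResultedIn(boolean success, String id, int stateVersion, Object object);
}

// In-memory stand-in for a key-value state store (not the XOOM StateStore).
final class SketchStateStore {
  private final Map<String, String> states = new HashMap<>();
  private final Map<String, Integer> versions = new HashMap<>();

  // Assumption: a write succeeds only when stateVersion exceeds the stored version.
  void write(final String id, final String state, final int stateVersion,
             final WriteCallback callback, final Object object) {
    final int current = versions.getOrDefault(id, 0);
    final boolean accepted = stateVersion > current;
    if (accepted) {
      states.put(id, state);
      versions.put(id, stateVersion);
    }
    // The caller's object is handed back untouched along with the outcome.
    callback.writeResultedIn(accepted, id, stateVersion, object);
  }

  // Returns the stored state for the id, or null when none exists.
  String stateOf(final String id) {
    return states.get(id);
  }
}

// Hypothetical tracker passed as the reference object; it counts outcomes.
final class WriteTracker {
  int succeeded;
  int failed;
}

// Updates the tracker that was supplied with the original write request.
final class TrackingWriteCallback implements WriteCallback {
  @Override
  public void writeResultedIn(final boolean success, final String id,
                              final int stateVersion, final Object object) {
    final WriteTracker tracker = (WriteTracker) object;
    if (success) {
      tracker.succeeded++;
    } else {
      tracker.failed++;
    }
  }
}
```

Because the tracker travels with the request, the callback needs no fields of its own to correlate an outcome with the write that caused it.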

Reading State Instances

You may read a state from a StateStore as follows.

stateStore.read(id, entityType, readInterest, referenceObject);

The read() method is used as follows.

import io.vlingo.xoom.symbio.store.state.StateStore;
...
stateStore.read(productState.id, ProductState.class, readInterest, ignoreNotFound);

The readInterest is the ReadResultInterest that receives the outcome of the read operation, with the referenceObject passed back to it. The ReadResultInterest works as follows.

import io.vlingo.xoom.symbio.store.state.StateStore.ReadResultInterest;

public class ProductReadInterest extends Actor implements ReadResultInterest {
  public final <ST> void readResultedIn(final Outcome<StorageException, Result> outcome, final String id, final ST state, final int stateVersion, final Metadata metadata, final Object object) {
    outcome
      .andThen(result -> {
        // success actions...
        
        return result;
      })
      .otherwise(cause -> {
        // failure actions...

        final boolean ignoreNotFound = (boolean) object;
        if (!ignoreNotFound) {
          final String message = "State not restored for: " + getClass() + "(" + id + ") because: " + cause.result + " with: " + cause.getMessage();
          logger().error(message, cause);
          throw new IllegalStateException(message, cause);
        }
        return cause.result;
      });
  }
}
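The way the reference object steers the handling of a missing state can be sketched without the actor and Outcome machinery. Everything below is a hypothetical stand-in: SketchReadStore, ReadCallback, and RestoringReadCallback are illustration-only names, and the real ReadResultInterest has a richer signature.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical callback; the real ReadResultInterest has a richer signature.
interface ReadCallback {
  void readResultedIn(Optional<String> state, String id, Object object);
}

// In-memory stand-in that reports a missing id as an empty result.
final class SketchReadStore {
  private final Map<String, String> states = new HashMap<>();

  void put(final String id, final String state) {
    states.put(id, state);
  }

  // Looks up the state and passes the caller's object back with the result.
  void read(final String id, final ReadCallback callback, final Object object) {
    callback.readResultedIn(Optional.ofNullable(states.get(id)), id, object);
  }
}

// Treats "not found" as an error unless the reference object says to ignore it.
final class RestoringReadCallback implements ReadCallback {
  String restored; // last state delivered, or null when nothing was found

  @Override
  public void readResultedIn(final Optional<String> state, final String id, final Object object) {
    final boolean ignoreNotFound = (Boolean) object;
    if (!state.isPresent() && !ignoreNotFound) {
      throw new IllegalStateException("State not restored for id: " + id);
    }
    restored = state.orElse(null);
  }
}
```

Passing true is useful when an entity may legitimately not exist yet, for example on first creation, while false surfaces a missing state as a failure.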

Streaming Over Entry Instances of Source Types

Streaming is an extremely important part of reactive systems. For queries, it avoids handling all results at once by enabling a message-driven model in which each element is pushed to your consumer one at a time. You may stream over all instances of ProductState as follows.

Stream<ProductState> stream = stateStore.streamAllOf(ProductState.class);

You may instead stream over a constrained subset of ProductState instances.

Stream<ProductState> stream = stateStore.streamSomeUsing(productStateQuery);

A QueryExpression may be defined as follows.

import io.vlingo.xoom.symbio.store.*;

Stream<ProductState> stream =
    stateStore.streamSomeUsing(
        QueryExpression.using(ProductState.class, "select ..."));

// or

Stream<ProductState> stream =
    stateStore.streamSomeUsing(
        QueryExpression.using(ProductState.class, "select ...", QueryMode.ReadOnly));
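The one-element-at-a-time model can be sketched with the JDK's java.util.concurrent.Flow interfaces. This is a stand-in for illustration, not the XOOM Stream API: ListPublisher and OneAtATimeSubscriber are hypothetical names, delivery is synchronous, and non-positive requests are simply ignored to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Synchronous publisher over a fixed list; emits only what has been requested.
final class ListPublisher<T> implements Flow.Publisher<T> {
  private final List<T> items;

  ListPublisher(final List<T> items) {
    this.items = new ArrayList<>(items);
  }

  @Override
  public void subscribe(final Flow.Subscriber<? super T> subscriber) {
    final Iterator<T> iterator = items.iterator();
    subscriber.onSubscribe(new Flow.Subscription() {
      private long demand;      // elements requested but not yet delivered
      private boolean emitting; // guards against re-entrant delivery loops
      private boolean done;

      @Override
      public void request(final long n) {
        if (done || n <= 0) return;
        demand += n;
        if (emitting) return;   // an outer request() call is already draining
        emitting = true;
        while (demand > 0 && iterator.hasNext() && !done) {
          demand--;
          subscriber.onNext(iterator.next());
        }
        emitting = false;
        if (!done && !iterator.hasNext()) {
          done = true;
          subscriber.onComplete();
        }
      }

      @Override
      public void cancel() {
        done = true;
      }
    });
  }
}

// Consumer that asks for exactly one element at a time.
final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
  final List<T> received = new ArrayList<>();
  boolean completed;
  private Flow.Subscription subscription;

  @Override
  public void onSubscribe(final Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(final T item) {
    received.add(item);
    subscription.request(1); // signal readiness for the next element
  }

  @Override
  public void onError(final Throwable cause) {
    throw new IllegalStateException(cause);
  }

  @Override
  public void onComplete() {
    completed = true;
  }
}
```

The subscriber controls the pace: the publisher never delivers more than the outstanding demand, so a slow consumer is not flooded with the full result set.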

Creating StateStore Database Tables

There are several implementations of the StateStore. One primary type is for JDBC over relational databases. There are also implementations for Amazon DynamoDB and Apache Geode. The following provides setup and configuration details for each of these.

Using Relational Databases with JDBC

Although the StateStore can auto-create all necessary tables, you may want to pre-create the necessary tables. There will be a unique table for every entity type stored. Note the {0} parameter in the following table naming scheme. You will replace this parameter with your entity type name, which may be the class simple name. This must include the prefix "tbl_". For example, a class named Product would have the table named tbl_product.

More formally, you may want to use the pattern tbl_xoom_symbio_state_{0}, with the class simple name replacing the parameter. For the class named Product, your table would be named tbl_xoom_symbio_state_product. Using this scheme visually documents that your table is specifically used by the Symbio StateStore.

This is the table creation script for Postgres.

CREATE TABLE {0} (
  s_id                      VARCHAR(128) PRIMARY KEY,
  s_type                    VARCHAR(256) NOT NULL,
  s_type_version            INT NOT NULL,
  s_data                    {1} NOT NULL,
  s_data_version            INT NOT NULL,
  s_metadata_value          TEXT NOT NULL,
  s_metadata_op             VARCHAR(128) NOT NULL
);

Note that the s_data column is created according to your preferred storage type, text or binary. The following table lists the default column type per database.

Database    Column Type
HSQLDB      The Text type is defaulted to LONGVARCHAR(65535).
HSQLDB      The Binary type is defaulted to VARBINARY(65535).
MariaDB     See MySQL.
MySQL       The Text type is defaulted to TEXT.
MySQL       The Binary type is defaulted to VARBINARY(4096).
Postgres    The Text type is defaulted to JSONB, making your entities' attributes searchable.
Postgres    The Binary type is defaulted to BYTEA.
YugaByte    See Postgres.

You indicate your preferences in the configuration.

// Configuration
package io.vlingo.xoom.symbio.store.common.jdbc;

public class Configuration {
  ...
  public final DataFormat format;
  ...
}

// DataFormat
package io.vlingo.xoom.symbio.store;

public enum DataFormat {
  Binary,
  Text
}

The format type and column type pair up: DataFormat.Binary is used with the s_data column type BYTEA, and DataFormat.Text is used with the s_data column type TEXT. You must consistently use the same types across all tables that use the same StateStore.
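If you pre-create tables from code, the {0} and {1} placeholders in the state table script above can be filled with java.text.MessageFormat. The sketch below is only one possible way to do this. StateTableDdl and the empty Product class are hypothetical, and the lowercase table name follows the tbl_xoom_symbio_state_product example given earlier.

```java
import java.text.MessageFormat;
import java.util.Locale;

// Hypothetical helper that fills the {0} and {1} placeholders of the state table script.
final class StateTableDdl {
  private static final String TEMPLATE =
      "CREATE TABLE {0} (\n" +
      "  s_id VARCHAR(128) PRIMARY KEY,\n" +
      "  s_type VARCHAR(256) NOT NULL,\n" +
      "  s_type_version INT NOT NULL,\n" +
      "  s_data {1} NOT NULL,\n" +
      "  s_data_version INT NOT NULL,\n" +
      "  s_metadata_value TEXT NOT NULL,\n" +
      "  s_metadata_op VARCHAR(128) NOT NULL\n" +
      ");";

  // Builds the table name from the naming pattern and the class simple name.
  static String tableNameFor(final Class<?> stateType) {
    return "tbl_xoom_symbio_state_" + stateType.getSimpleName().toLowerCase(Locale.ROOT);
  }

  // dataColumnType is the column type chosen for s_data, for example BYTEA.
  static String createTable(final Class<?> stateType, final String dataColumnType) {
    return MessageFormat.format(TEMPLATE, tableNameFor(stateType), dataColumnType);
  }
}

// Example entity type used only by this sketch.
final class Product { }
```

Calling StateTableDdl.createTable(Product.class, "BYTEA") yields a script for tbl_xoom_symbio_state_product with a binary s_data column. The column type is passed in rather than derived, so the caller stays responsible for matching it to the configured DataFormat.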

The following table must be created to support dispatching write events from the StateStore. This is how we ensure that Dispatchables are delivered via the Dispatcher.

CREATE TABLE tbl_xoom_symbio_dispatchables (
  d_id                         BIGSERIAL PRIMARY KEY,
  d_created_at                 TIMESTAMP NOT NULL,
  d_originator_id              VARCHAR(32) NOT NULL,
  d_dispatch_id                VARCHAR(128) NOT NULL,
  d_state_id                   VARCHAR(128) NOT NULL,
  d_state_type                 VARCHAR(256) NOT NULL,
  d_state_type_version         INT NOT NULL,
  d_state_data                 {1} NOT NULL,
  d_state_data_version         INT NOT NULL,
  d_state_metadata_value       TEXT NOT NULL,
  d_state_metadata_op          VARCHAR(128) NOT NULL,
  d_state_metadata_object      TEXT,
  d_state_metadata_object_type VARCHAR(256),
  d_entries                    TEXT
);

CREATE INDEX idx_dispatchables_dispatch_id
     ON tbl_xoom_symbio_dispatchables (d_dispatch_id);
 
CREATE INDEX idx_dispatchables_originator_id
     ON tbl_xoom_symbio_dispatchables (d_originator_id);

The same rules for the DataFormat and d_state_data column type apply for the tbl_xoom_symbio_dispatchables table. You must consistently use the same types across all tables that use the same StateStore.

The following is the table used to store DomainEvent instances and other Source types such as Command.

CREATE TABLE tbl_xoom_symbio_state_entry (
  e_id                         BIGSERIAL PRIMARY KEY,
  e_type                       VARCHAR(256) NOT NULL,
  e_type_version               INT NOT NULL,
  e_data                       {1} NOT NULL,
  e_metadata_value             VARCHAR(4000) NOT NULL,
  e_metadata_op                VARCHAR(128) NOT NULL
);

The same rules for the DataFormat and column type apply to the tbl_xoom_symbio_state_entry table and its e_data column. You must consistently use the same types across all tables that use the same StateStore.

The following table supports totally ordered streaming reads of the Entry instances stored in the tbl_xoom_symbio_state_entry table. The readers implement the interface StateStoreEntryReader<T>, which extends EntryReader<T>.

CREATE TABLE tbl_xoom_symbio_state_entry_offsets (
  reader_name                   VARCHAR(128) PRIMARY KEY,
  reader_offset                 BIGINT NOT NULL
);

Each EntryReader<T> type has a name and a current offset, which are used to track the current position in the stream. The stream position/offset corresponds to a value of e_id in the tbl_xoom_symbio_state_entry table.
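The relationship between a named reader and its offset can be sketched in memory. SketchEntryJournal is a hypothetical stand-in for the two tables above, not the XOOM EntryReader. It assumes, for illustration, that the stored offset is the e_id of the last entry the reader has consumed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for the entry table and the offsets table (hypothetical).
final class SketchEntryJournal {
  private final TreeMap<Long, String> entries = new TreeMap<>(); // e_id -> e_data
  private final Map<String, Long> offsets = new HashMap<>();     // reader_name -> reader_offset
  private long nextId = 1;

  // Appends an entry under the next sequential id, like a BIGSERIAL key.
  void append(final String data) {
    entries.put(nextId++, data);
  }

  // Returns up to max entries after the reader's offset, in id order,
  // then advances that reader's offset to the last id returned.
  List<String> readNext(final String readerName, final int max) {
    final long offset = offsets.getOrDefault(readerName, 0L);
    final List<String> batch = new ArrayList<>();
    long last = offset;
    for (final Map.Entry<Long, String> entry : entries.tailMap(offset, false).entrySet()) {
      if (batch.size() == max) break;
      batch.add(entry.getValue());
      last = entry.getKey();
    }
    offsets.put(readerName, last);
    return batch;
  }
}
```

Because each reader name has its own offset, independent consumers such as two different projections can read the same entries at different speeds without interfering with each other.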

NoSQL Storage Engines

The Symbio StateStore support extends beyond relational database storage engines. Two prominent NoSQL persistence mechanisms are Amazon DynamoDB and Apache Geode. Their setup is handled by the StateStore implementation and by registering types, such as with StateTypeStateStoreMap and EntryAdapterProvider. When using Lattice, these are managed by the StatefulTypeRegistry.
