State Storage
Using the XOOM Symbio Key-Value and NoSQL storage.
The state storage type persists key-value pairs, offering the features of a NoSQL database. The
key
is a business id
and the value
is the serialized state of an entity. You may also transactionally append DomainEvent
instances, and other Source
types, such as Command
, along with states.Every
StateStore
is implemented as an Actor
. Thus, you must use the World
or one of its Stage
instances to create the StateStore
. Before doing this there are a few dependencies you must create, including Configuration
. These get sent in as parameters to the StateStore
constructor.Every entity type must be registered. This is how the
StateStore
knows the table (or other persistence collection type) name to use for a given entity type.// using the class simple name
final String productStoreName = Product.class.getSimpleName();
StateTypeStateStoreMap.stateTypeToStoreName(Product.class, productStoreName);
// using the scheme name
final String productStoreName =
"tbl_xoom_symbio_state_" +
Product.class.getSimpleName();
StateTypeStateStoreMap.stateTypeToStoreName(Product.class, productStoreName);
Further, each persistent entity and
Source
—DomainEvent
or Command
—should have a serialization and deserialization adapter.import io.vlingo.xoom.symbio.EntryAdapterProvider;
EntryAdapterProvider provider = EntryAdapterProvider.instance(world);
// ProductDefinedAdapter is a io.vlingo.xoom.symbio.EntryAdapter
provider.registerAdapter(ProductDefined.class, new ProductDefinedAdapter());
// ProductStateAdapter is a io.vlingo.xoom.symbio.StateAdapter
provider.registerAdapter(ProductState.class, new ProductStateAdapter());
The following is the configuration used to create a new
StateStore
for a given database, which in this case is Postgres.import io.vlingo.xoom.lattice.model.projection.ProjectionDispatcher;
import io.vlingo.xoom.lattice.model.projection.ProjectionDispatcher.ProjectToDescription;
import io.vlingo.xoom.lattice.model.projection.TextProjectionDispatcherActor;
import io.vlingo.xoom.symbio.store.common.jdbc.Configuration;
import io.vlingo.xoom.symbio.store.common.jdbc.postgres.PostgresConfigurationProvider;
import io.vlingo.xoom.symbio.store.dispatch.Dispatcher;
import io.vlingo.xoom.symbio.store.state.jdbc.StorageDelegate;
import io.vlingo.xoom.symbio.store.state.jdbc.postgres.PostgresStorageDelegate;
World world = World.startWithDefaults("product-service");
...
List<ProjectToDescription> descriptions =
Arrays.asList(new ProjectToDescription(UserProjectionActor.class, Events.class.getPackage()),
...);
Dispatcher dispatcher =
world.actorFor(Dispatcher.class, TextProjectionDispatcherActor.class, descriptions);
Configuration configuration =
PostgresConfigurationProvider.configuration(...);
StorageDelegate delegate =
new PostgresStorageDelegate(configuration, world.defaultLogger());
StateStore stateStore =
world.actorFor(StateStore.class, JDBCStateStoreActor.class,
dispatcher, delegate);
This creates a new
Actor
-based StateStore
that is ready to receive and process persistence messages.An important point to consider is, if you use the XOOM Lattice entity type
StatefulEntity
, there is no need to learn the operations of the StateStore
. You get all storage persistence for free when you use the StatefulEntity
abstract base types. Yet, you still must apply the above configuration steps.The following is the code that persists a given state and any sources (e.g.
DomainEvent
) to the StateStore
.stateStore.write(id, state, stateVersion, events, metadata, writeInterest);
The
write()
is used as follows.import io.vlingo.xoom.symbio.store.state;
...
stateStore.write(productState.id, productState, 1, productState.events, productState.metadata, writeInterest, writeTracker);
Note that a
ProductEntity
actor should be designed with a ProductState
that is the actual persistence state.The
WriteInterest
will receive the write outcome of success or failure.import io.vlingo.xoom.symbio.store.state;
...
public class ProductWriteInterest extends Actor implements WriteResultInterest {
void writeResultedIn(final Outcome<StorageException,Result> outcome, final String id, final S state, final int stateVersion, final List<Source<C>> sources, final Object object) {
final WriteTracker writeTracker = (WriteTracker) object;
outcome
.andThen(result -> {
// success actions...
return result;
})
.otherwise(cause -> {
// failure actions...
final String message = writeTracker.failureMessageFor(cause);
logger().error(message, cause);
throw new IllegalStateException(message, cause);
});
}
}
You may read a state from a
StateStore
as follows.stateStore.read(id, entityType, readInterest, referenceObject);
The
read()
is used as follows.import io.vlingo.xoom.symbio.store.state;
...
stateSore.read(productState.id, ProductState.class, 1, readInterest, ignoreNotFound);
The
readInterest
is the ReadResultInterest
that receives the outcome of the read operation, with the referenceObject
passed back to it. The ReadResultInterest
works as follows.import io.vlingo.xoom.symbio.store.state.StateStore.ReadResultInterest;
public class ProductReadInterest extends Actor implements ReadResultInterest {
final public <ST> void readResultedIn(final Outcome<StorageException, Result> outcome, final String id, final ST state, final int stateVersion, final Metadata metadata, final Object object) {
outcome
.andThen(result -> {
// success actions...
return result;
})
.otherwise(cause -> {
// failure actions...
final boolean ignoreNotFound = (boolean) object;
if (!ignoreNotFound) {
final String message = "State not restored for: " + getClass() + "(" + id + ") because: " + cause.result + " with: " + cause.getMessage();
logger().error(message, cause);
throw new IllegalStateException(message, cause);
}
return cause.result;
});
}
}
Streaming is an extremely important part of reactive. For queries, it prevents dealing with all results at one time by enabling a message-driven model where you see one element at a time as it is pushed to your consumer. You may stream over all instances of
ProductState
as follows.Stream<ProductState> stream = stateStore.streamAllOf(ProductState.class);
You may instead stream over a constrained subset of
ProductState
instances.Stream<ProductState> stream = stateStore.streamSomeUsing(productStateQuery);
A
QueryExpression
may be defined as follows.import io.vlingo.xoom.symbio.store;
Stream<ProductState> stream =
stateStore.streamSomeUsing(
QueryExpression.using(ProductState.class, "select ..."));
// or
Stream<ProductState> stream =
stateStore.streamSomeUsing(
QueryExpression.using(ProductState.class, "select ...", QueryMode.ReadOnly);
There are several implementations of the
StateStore
. One primary type is for JDBC over relational databases. There are also implementations for Amazon DynamoDB and Apache Geode. The following provides set up and configuration for each of these.Although the
StateStore
can auto-create all necessary tables, you may want to pre-create the necessary tables. There will be a unique table for every entity type stored. Note the {0}
parameter in the following table naming scheme. You will replace this parameter with your entity type name, which may be the class simple name. This must include the prefix "tbl_"
. For example, a class named Product
would have the table named tbl_product
. More formally, you may want to use the pattern
tbl_xoom_symbio_state_{0}
, the class simple name replacing the parameter. For the class named Product, your table would be named tbl_xoom_symbio_state_product
. Using this scheme visually documents that your table is specifically used by the SYMBIO StateStore
.This is the table creation script for Postgres.CREATE TABLE {0} (
s_id VARCHAR(128) PRIMARY KEY,
s_type VARCHAR(256) NOT NULL,
s_type_version INT NOT NULL,
s_data {1} NOT NULL,
s_data_version INT NOT NULL,
s_metadata_value TEXT NOT NULL,
s_metadata_op VARCHAR(128) NOT NULL
);
Note that the
s_data
column is created according to your preferred storage type, text or binary. The following table explains per database.Database | Column Type |
HSQLDB | The Text type is defaulted to LONGVARCHAR(65535) . |
HSQLDB | The Binary type is defaulted to VARBINARY(65535) . |
MariaDB | See MySQL. |
MySQL | The Text type is defaulted to TEXT . |
MySQL | The Binary type is defaulted to VARBINARY(4096) . |
Postgres | The Text type is defaulted to JSONB , making your entities' attributes searchable. |
Postgres | The Binary type is defaulted to BYTEA . |
YugaByte | See Postgres. |
You indicate your preferences in the configuration.
// Configuration
package io.vlingo.xoom.symbio.store.common.jdbc.Configuration;
public class Configuration {
...
public final DataFormat format;
...
}
// DataFormat
package io.vlingo.xoom.symbio.store;
public enum DataFormat {
Binary,
Text
};
The following format type and column type pair up:
DataFormat.Binary
and s_data
column type BYTEA
. Likewise the following two are used together:DataFormat.Text
and s_data
column type TEXT
. You must consistently use the same types across all tables that use the same StateStore
.The following table must be created to support dispatching write events from the
StateStore
. This is how we ensure that Dispatchables
are delivered via the Dispatcher
.CREATE TABLE tbl_xoom_symbio_dispatchables (
d_id BIGSERIAL PRIMARY KEY,
d_created_at TIMESTAMP NOT NULL,
d_originator_id VARCHAR(32) NOT NULL,
d_dispatch_id VARCHAR(128) NOT NULL,
d_state_id VARCHAR(128) NOT NULL,
d_state_type VARCHAR(256) NOT NULL,
d_state_type_version INT NOT NULL,
d_state_data {1} NOT NULL,
d_state_data_version INT NOT NULL,
d_state_metadata_value TEXT NOT NULL,
d_state_metadata_op VARCHAR(128) NOT NULL,
d_state_metadata_object TEXT,
d_state_metadata_object_type VARCHAR(256),
d_entries TEXT
);
CREATE INDEX idx_dispatchables_dispatch_id
ON tbl_xoom_symbio_dispatchables (d_dispatch_id);
CREATE INDEX idx_dispatchables_originator_id
ON tbl_xoom_symbio_dispatchables (d_originator_id);
The same rules for the
DataFormat
and d_state_data
column type apply for the tbl_xoom_symbio_dispatchables
table. You must consistently use the same types across all tables that use the same StateStore
.The following is the table used to store
DomainEvent
instances and other Source
types such as Command
.CREATE TABLE tbl_xoom_symbio_state_entry (
e_id BIGSERIAL PRIMARY KEY,
e_type VARCHAR(256) NOT NULL,
e_type_version INT NOT NULL,
e_data {1} NOT NULL,
e_metadata_value VARCHAR(4000) NOT NULL,
e_metadata_op VARCHAR(128) NOT NULL
);
The same rules for the
DataFormat
and d_state_data
column type apply for the tbl_xoom_symbio_state_entry
table and its e_data
column. You must consistently use the same types across all tables that use the same StateStore
.The following table supports totally ordered streaming reads of the
Entry
instances stored in the tbl_xoom_symbio_state_entry
table. The readers implement the interfaceStateStoreEntryReader<T>
, which extends EntryReader<T>
.CREATE TABLE tbl_xoom_symbio_state_entry_offsets (
reader_name VARCHAR(128) PRIMARY KEY,
reader_offset BIGINT NOT NULL
);
Each
EntryReader<T>
type has a name
and a current offset
, which are used to track the current position in the stream. The stream position/offset corresponds to a value of e_id
in the tbl_xoom_symbio_state_entry
table.The Symbio
StateStore
support extends beyond relational databases storage engines. Two of the prominent NoSQL persistence mechanisms are DynamoDB and Apache Geode. The set up for these is handled by the StateStore
implementation and when registering types, such as by using StateTypeStateStoreMap
and EntryAdapterProvider
. When using Lattice, these are managed by the StatefulTypeRegistry
.Last modified 2yr ago