Object, Stateful, and Sourced Entities With CQRS

Understanding reactive Object, Stateful, Sourced, and CQRS actor-based persistence.

Command-Query Responsibility Segregation (CQRS) uses two models per Bounded Context (application or service). These are the Command Model and the Query Model, otherwise known as the Write Model and the Read Model, respectively.

The Command Model is often referred to as the "Write Model" and the Query Model as the "Read Model." These AKAs are quite inaccurate because you actually read from and write to both models. We prefer the names Command Model and Query Model because these names adhere to the pattern name, CQRS, and express the purpose of each of the two models.

The XOOM Lattice tools support three styles of persistence: object, flat state, and sourced. The object style of persistence is more commonly known as Object-Relational Mapping (ORM), although a very low-impact mapping approach is also provided. The flat state style uses key-value persistence, which is often suitable for both the CQRS Command Model and Query Model. The sourced style provides the persistence of a series of facts, known as Domain Events, that when applied or folded into a single object, combine to form a state. This is known as Event Sourcing, but the sourced style may also be used with Commands to support Command Sourcing, and even with other sources such as process messages.

Understanding Entity Actor Lifecycles

Let's start out by contrasting the lifecycles of Plain Old Java Objects (POJOs) with Actor-based entities.

In a typical object-based application or service that uses POJOs, the lifecycle of entities is different from that of actor-based entities. Here we can assume that Entity and Aggregate as defined within Domain-Driven Design are interchangeable; that is, what follows deals with both concepts.

POJO Lifecycles

The following describes the typical lifecycle of a POJO Entity:

  • Non-existing Entity states are newly constructed and then persisted to storage.

  • Preexisting Entity states are reconstituted from storage, modified, and then persisted back to the same storage (updated).

  • Across a single JVM or a multi-JVM cluster, there may be any number of duplicated Entity instances. This implies that any of the duplicate instances may simultaneously have different operations performed by different users. In such cases the database must provide optimistic concurrency and detect concurrency violations via state versions.

  • The object reference is released after Entity persistence and the instance garbage collected.
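The optimistic concurrency check that the POJO lifecycle depends on can be sketched as follows. This is a minimal in-memory illustration under stated assumptions, not a XOOM or database API: the names VersionedCart and VersionedStore are hypothetical, and a real database would perform the version comparison inside its own update statement.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical POJO state carrying a version used for optimistic concurrency.
final class VersionedCart {
  final String id;
  final int quantity;
  final int version;

  VersionedCart(String id, int quantity, int version) {
    this.id = id;
    this.quantity = quantity;
    this.version = version;
  }

  // Answers a modified copy; the version is advanced by the store on save.
  VersionedCart withQuantity(int newQuantity) {
    return new VersionedCart(id, newQuantity, version);
  }
}

// Hypothetical in-memory stand-in for a table with a version column.
final class VersionedStore {
  private final Map<String, VersionedCart> rows = new HashMap<>();

  synchronized VersionedCart read(String id) {
    return rows.get(id);
  }

  // Persists only if the caller's copy is based on the currently stored version.
  synchronized boolean save(VersionedCart modified) {
    VersionedCart stored = rows.get(modified.id);
    int storedVersion = stored == null ? 0 : stored.version;
    if (modified.version != storedVersion) {
      return false; // concurrency violation: another duplicate instance was saved first
    }
    rows.put(modified.id, new VersionedCart(modified.id, modified.quantity, storedVersion + 1));
    return true;
  }
}
```

If two duplicate instances are read at the same version, the first save succeeds and the second is rejected, which is the violation the database is expected to detect.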

Actor-Entity Lifecycles

The following describes the typical lifecycle of an actor-based Entity.

Lattice model entity types and respective storage interfaces.

Note that this applies across various types of Entities supported by XOOM Lattice, including Sourced<T>, StatefulEntity<T>, and ObjectEntity<T>:

  • Non-existing states are newly persisted, and the guaranteed persistent state is then set back onto the actor state.

  • Preexisting states are possibly already in memory; if not in memory, states are reconstituted from storage; proposed changes are then persisted back to the same storage; following persistence, the state is then set back onto the actor state.

  • Across a single JVM or a multi-JVM cluster, there is one uniquely identified Entity instance, and all requests for creation/modification will be focused on that single instance, wherever it may be.

  • The actor is retained in memory until memory constraints call for some actors to be evicted. A least-recently-used algorithm determines which specific actor instances must be evicted, and those actors are stopped. The evictions make room for other currently "hot" actors.
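The least-recently-used eviction described in the last point can be illustrated with a small sketch. It is not the XOOM implementation; LruActorDirectory is a hypothetical name, and the sketch relies only on the access-order mode of java.util.LinkedHashMap. In an actor runtime the evicted entry would also be stopped, which is omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of least-recently-used eviction.
final class LruActorDirectory<K, V> extends LinkedHashMap<K, V> {
  private final int capacity;

  LruActorDirectory(int capacity) {
    // accessOrder = true: entries are ordered from least to most recently accessed
    super(16, 0.75f, true);
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Evict the least-recently-used entry once the capacity is exceeded.
    return size() > capacity;
  }
}
```

With a capacity of two, touching the first entry and then adding a third evicts the second, because it is the one that has gone longest without use.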

Commands, Domain Events, and Identified Domain Events

Lattice defines abstract base types: Command, DomainEvent, IdentifiedDomainEvent. These provide for modeling specific types as follows:

| Type | Description |
|---|---|
| Command | An abstract type used to model an imperative directive that expresses the intent to carry out a business operation. Alternatively, a command can be a method declaration rather than an object/record. |
| DomainEvent | An abstract type used to model a record of fact about an important business occurrence in the domain model. |
| IdentifiedDomainEvent | An abstract type used to model a record of fact about an important business occurrence in the domain model and that has an identity associated with it, which can be used for correlation. |

These types, and especially DomainEvent and IdentifiedDomainEvent, will be extended in order to create concrete types used in each domain model. In the examples that follow two domain events are used, CartCreated and ProductPlacedInCart. These could be modeled as either DomainEvent, or IdentifiedDomainEvent if there is a need to correlate multiple domain events to a kind of long-running process, also known as a Saga.

Entity Construction Details

Regarding the creation of the various Entity types, including Sourced, EventSourced, and CommandSourced, there are design decisions that determine some minor constraints.

In the following discussions we refer to apply() methods. These are provided by the aforementioned Entity abstract types: StatefulEntity, ObjectEntity, Sourced, EventSourced, CommandSourced, etc. These methods are responsible for managing the atomic persistence of Entity state and Source<T> instances, such as concrete extensions of DomainEvent. They are also responsible for setting the guaranteed persistent state back onto the Entity. These apply() methods are explained in more detail below, but you must be aware of their existence here.

Generally speaking, it is very difficult to support the use of apply() methods from inside Entity constructors. Thus, we do not currently support using an apply() method from an Entity constructor. The following explains why this is the case. A Product protocol is used as the concrete Entity type and EventSourced as the abstract base Entity, although it could be any of the abstract base types.

  1. All Entity abstract base types provide constructors that take either zero parameters or one parameter, that being the Entity unique identity.

  2. If the zero-argument constructor is used, the actor's address is used as the unique identity. This works out especially well for Grid-based actors because the address is based on a universally unique identity (our recommended approach).

  3. The Product constructor may take one or more parameters used for identity, but only a single identity may be passed to the abstract base constructor. When the constructor is supplied with multiple identity parts, these can be turned into a single composite identity using the streamNameFrom(...) of Sourced<T> (e.g. EventSourced, CommandSourced, and Process) and the idFrom(...) of StatefulEntity and ObjectEntity.

  4. After the Entity constructor has completed, the actor will receive its first message, known as start(). It is when handling this start() message that the abstract base Entity attempts to restore its state from persistence. If persistent state is not available, the Entity has its initial default state. If persistent state is available, the Entity is initialized with it. For Sourced<T> entities the initial state will be applied using one or more sources, such as DomainEvent types, in its stream. This emphasizes that if the concrete Entity type overrides the start() message handler, it must ensure that the base start() is always handled with super.start(). Also the concrete Entity type must assume that the initial state is not available until after the base start() handler is run. In fact, the concrete Entity type will never see a recovered initial state inside its own start() because the base start() handles recovery using asynchronous messaging to and from the storage actor.

Consider also that any constructor is not a good expression of behavior for the Ubiquitous Language. Even when designing with POJOs it's generally desirable to hide the constructor behind an expressive Factory Method, such as:

Product product = Product.receiveInto(warehouse);
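For illustration, a POJO version of such a Factory Method might look like the following sketch. The Warehouse type and the fields shown are assumptions made only for this example; they are not part of XOOM or of the referenced sample.

```java
// Hypothetical collaborator; only its identity is needed for the illustration.
final class Warehouse {
  final String id;

  Warehouse(String id) {
    this.id = id;
  }
}

final class Product {
  final String warehouseId;

  // The constructor is hidden so that clients must use the expressive Factory Method.
  private Product(String warehouseId) {
    this.warehouseId = warehouseId;
  }

  // Expresses the business behavior in terms of the Ubiquitous Language.
  static Product receiveInto(Warehouse warehouse) {
    return new Product(warehouse.id);
  }
}
```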

Look at the vlingo-iddd-collaboration example to see how this may be accomplished in the Forum protocol Factory Method. Additionally, ForumEntity starts a new Discussion in its discussFor() message handler. Likewise, the DiscussionEntity creates a new Post in its postFor() message handler.

Entity Storage Mechanisms

The following explains the storage options available for each Entity type: ObjectEntity, StatefulEntity, as well as EventSourced and CommandSourced.

Object-Relational Mapping

An ObjectEntity concrete extension is stored using Object-Relational Mapping (ORM). There is already much literature available on ORM so we won't spend much time explaining it here. The basic idea is that a software object may be mapped into the row of a relational database table. Each of the object's attributes is mapped to a column in that row. There are, of course, more elaborate mapping relationships that can be employed, such as for one-to-many, many-to-one, and many-to-many object compositions. Yet, the mappings still come down to tables, rows, and columns holding object state.

The XOOM Symbio components provide various JDBC implementations for ORM using the ObjectStore, including the Java Persistence API (JPA). The JPA options include EclipseLink, OpenJPA, and Hibernate. Another JDBC implementation of XOOM Symbio supports Jdbi, which is a very lightweight toolkit for mapping objects. As long as your entity designs are small, using Jdbi may be all you need and will be a welcome alternative to JPA and other traditional ORM tools.

The following table shows the databases currently supporting ORM.

| Database | Use |
|---|---|
| Apache Geode | Production |
| HSQLDB (in-memory) | Testing |
| In-Memory | Testing |
| MariaDB | Production |
| MySQL | Production |
| PostgreSQL | Production |
| YugaByte | Production |

Below you will see how to use Object-Relational Mapping in your entities.

Stateful

A StatefulEntity concrete extension is stored using a key-value database. The XOOM Symbio components provide simple key-value persistence that works over several different database products. The storage type is known as the StateStore. In addition to all supported relational (JDBC) databases, there is also support for Amazon DynamoDB and Apache Geode.

| Database | Use |
|---|---|
| DynamoDB | Production |
| Apache Geode | Production |
| HSQLDB (in-memory) | Testing |
| In-Memory | Testing |
| MariaDB | Production |
| MySQL | Production |
| PostgreSQL | Production |
| YugaByte | Production |

Below you will see how to use stateful entities.

Sourced

The states of EventSourced and CommandSourced entities are not stored as whole objects. Instead, every such entity maintains a stream of records, each representing a partial state. The individual records of partial state are stored in a Journal, and collectively combine to create the entity's state. The word "source" conveys the idea of an official origin, which in this case is a record of partial state. The record may be a fact, such as is captured by a DomainEvent. The record may instead be an imperative directive to carry out an operation, known as a Command. Persisting one or more DomainEvent instances to represent an aggregate's state is known as Event Sourcing. A similar approach that saves the Command states, such as is generally the case with a Process (Manager), is known as Command Sourcing.

A sourced entity is one that owns an ordered collection of records that each indicate a discrete change that was made to the state of that entity. This ordered collection may be likened to an accounting ledger, with one row per change to an account. The changes in a ledger are either debits or credits to the account. Starting from a balance of zero, a credit must be added to the ledger to provide the first available funds. From there the next entry may be another credit, or a debit. Over time there are a number of entries which can be used to derive the account's balance. Starting from the first entry in the ledger, add credits and subtract debits in the order in which they occurred. After applying the addition or subtraction of the last entry in the ledger you know the account's balance.
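The ledger analogy can be written down as a small fold over the entries. This is only a sketch of the idea; LedgerEntry and Ledger are hypothetical names, and amounts are kept in cents to avoid floating-point rounding.

```java
import java.util.List;

// Hypothetical ledger entry: a positive amount is a credit, a negative amount is a debit.
final class LedgerEntry {
  final long amountInCents;

  private LedgerEntry(long amountInCents) {
    this.amountInCents = amountInCents;
  }

  static LedgerEntry credit(long cents) {
    return new LedgerEntry(cents);
  }

  static LedgerEntry debit(long cents) {
    return new LedgerEntry(-cents);
  }
}

final class Ledger {
  // Derives the balance by applying every entry in the order it occurred, starting from zero.
  static long balanceOf(List<LedgerEntry> entries) {
    long balance = 0;
    for (LedgerEntry entry : entries) {
      balance += entry.amountInCents;
    }
    return balance;
  }
}
```

A credit of 10000, a debit of 2500, and a credit of 500 fold to a balance of 8000.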

Now consider that you have a similar ledger, but made up of a sequence of DomainEvent instances. Starting with the first DomainEvent apply its attributes to the state of the entity that owns the DomainEvent. Do the same for every subsequent DomainEvent until there are no more. At the end you know the current state of the entity.

The sequence in which the discrete records were produced is strictly maintained because both the records and the order in which they occurred are required to reproduce the accurate and valid state of the entity.

Continuing with the above e-commerce example, the following may be the ordered collection of events for a shopping cart.

| Index | Event Type |
|---|---|
| 1 | com.ecommrus.model.cart.CartCreated |
| 2 | com.ecommrus.model.cart.ProductPlacedInCart |
| 3 | com.ecommrus.model.cart.ProductQuantityIncreased |
| 4 | com.ecommrus.model.cart.ProductPlacedInCart |
| 5 | com.ecommrus.model.cart.ProductRemovedFromCart |

The Index column indicates the order in which the event occurred on the given Cart entity. The Event Type column contains the name of the DomainEvent that occurred in the given order. Note that this table does not show the event data alongside the type; the actual Journal would include the data. Here's what the events indicate.

  1. The shopping cart was created on behalf of a user.

  2. The user placed a given product into the cart.

  3. The user increased the quantity that they require for the product from #2.

  4. The user placed an additional product into the cart.

  5. The user removed one of their two products from the cart. To know which of the two products was removed, the data of the event must be examined for the product id.
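Replaying the five events above into a current cart state could be sketched like this. The event shape is heavily simplified and hypothetical: each CartEvent carries only a type name and a product id, and the sketch assumes that ProductQuantityIncreased raises the quantity by one, since the event data is not shown in the table.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified event: a type name plus the product id it refers to (null for CartCreated).
final class CartEvent {
  final String type;
  final String productId;

  CartEvent(String type, String productId) {
    this.type = type;
    this.productId = productId;
  }
}

final class CartReplaySketch {
  // Replays the events in order, folding each one into a map of productId -> quantity.
  static Map<String, Integer> replay(List<CartEvent> stream) {
    Map<String, Integer> quantities = new LinkedHashMap<>();
    for (CartEvent event : stream) {
      switch (event.type) {
        case "CartCreated":
          quantities.clear(); // a new cart starts out empty
          break;
        case "ProductPlacedInCart":
          quantities.put(event.productId, 1);
          break;
        case "ProductQuantityIncreased":
          quantities.merge(event.productId, 1, Integer::sum); // assumed increment of one
          break;
        case "ProductRemovedFromCart":
          quantities.remove(event.productId);
          break;
        default:
          throw new IllegalArgumentException("Unknown event type: " + event.type);
      }
    }
    return quantities;
  }
}
```

If the fifth event removes the first product, the replay ends with a cart holding only the second product. Applying the same events in a different order could produce a different state, which is why the order is strictly maintained.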

As noted previously the sequence of records may be specializations of DomainEvent and Command, but are more commonly of type DomainEvent. Thus, an entity that is sourced by a sequence of DomainEvent instances is Event Sourced. Assuming that the sequence of entries in the collection are DomainEvent types, this collection is known as the entity's event stream.

Each entity's event stream is persisted into a Journal. This Journal is responsible for maintaining two kinds of streams. One kind of stream is the individual stream of each entity. If you have 1 million Event Sourced entity instances in your Bounded Context then the Journal will have 1 million individual streams. The second kind of stream is the totally ordered entries of all entities in the entire Bounded Context. That is, as each entity appends new entries into its own stream, those entries are also indexed in a totally ordered stream of all entries. Thus, if each entity has an average of five entries in its individual stream, the totally ordered stream of all entries is 5 million total.

The Journal is append-only. Once an entry has been appended it may not be (physically) deleted/removed. Further, every discrete entry is immutable.
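The two kinds of streams and the append-only rule can be pictured with a tiny in-memory sketch. InMemoryJournalSketch is a hypothetical name and is not the XOOM Symbio Journal API; entries are plain strings here only to keep the example short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory journal illustrating per-entity streams and the total order.
final class InMemoryJournalSketch {
  private final Map<String, List<String>> streams = new HashMap<>(); // one stream per entity
  private final List<String> allEntries = new ArrayList<>();          // totally ordered stream

  // Appends to the entity's own stream and indexes the same entry in the total order.
  void append(String streamName, String entry) {
    streams.computeIfAbsent(streamName, name -> new ArrayList<>()).add(entry);
    allEntries.add(entry);
  }

  // Read-only view of one entity's stream: entries can be read but not changed or removed.
  List<String> streamOf(String streamName) {
    List<String> stream = streams.getOrDefault(streamName, Collections.<String>emptyList());
    return Collections.unmodifiableList(stream);
  }

  // Read-only view of every entry of every entity, in the order appended.
  List<String> all() {
    return Collections.unmodifiableList(allEntries);
  }
}
```

The only mutating operation is append(); the views answered by streamOf() and all() reject modification, mirroring the rule that entries are never updated or deleted.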

The following table shows the databases currently supporting Journal.

| Database | Use |
|---|---|
| FoundationDB | Experimental |
| HSQLDB (in-memory) | Testing |
| In-Memory | Testing |
| MariaDB | Production |
| MySQL | Production |
| PostgreSQL | Production |
| YugaByte | Experimental |

Below you will see how to use event sourced and command sourced entities.

Entity Types

ObjectEntity, StatefulEntity, and EventSourced are abstract base classes used in developing DDD models. The ObjectEntity supports various forms of relational database mapping, including ORM as in JPA and Hibernate, and Jdbi (a very thin wrapper around JDBC that maps with less pain). You can read more about those above and here. The StatefulEntity doesn't provide mapping, only a key-value store with the aggregate state as a CLOB/BLOB value. This enables using many different kinds of NoSQL storage. The EventSourced supports Event Sourcing. Consider the following simple usage examples.

ObjectEntity Example

An actor that serves as a domain model entity may be designed extending the ObjectEntity type. The ObjectEntity should implement a protocol that defines the behavior available through message sending and delivery. Its state is stored in an ObjectStore.

CartEntity is an ObjectEntity extender, which means its state is OR mapped.

Here is a CartEntity that extends ObjectEntity and implements the Cart protocol.

import io.vlingo.xoom.lattice.model.object.ObjectEntity;

public class CartEntity extends ObjectEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public CartEntity(String cartId, String userId) {
    // combine the identity parts into a single composite identity
    super(idFrom(":", cartId, userId));
    this.cartState = CartState.initial(cartId, userId);
  }
}

An ObjectEntity should receive its unique identity by means of its constructor, making the identity available during its initialization. If the entity is preexisting, the identity is used during actor startup to reconstitute its state from the ObjectStore.

The following shows how a command message is handled and how the state and event are persisted using an ObjectStore via the ObjectEntity.

import io.vlingo.xoom.lattice.model.object.ObjectEntity;

public class CartEntity extends ObjectEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public void placeProductInCart(Product product) {
    final ProductPlacedInCart productPlacedInCart =
      ProductPlacedInCart.with(product.sku, product.description, product.price);
    // persist the new state together with the event
    apply(cartState.withSelected(product), productPlacedInCart);
  }
  ...
}

The ObjectEntity provides a means to apply() a new state and concrete DomainEvent instances. In the above example a new CartState instance and a ProductPlacedInCart are applied together. The apply() causes four processing steps.

  1. The CartEntity message processing is temporarily suspended until steps 2 and 3 are completed.

  2. The new CartState and the ProductPlacedInCart event are asynchronously and atomically persisted to the backing ObjectStore.

  3. When the persistence is confirmed, the overridden stateObject(CartState) method is called to replace the current cartState with the new value. You can see this overridden method in the below code snippet displaying the overridden protected methods.

  4. The CartEntity message processing is restored, allowing subsequent messages, such as commands, to be handled.

The ObjectEntity concrete type may not process its next messages until the current state transition and any events are fully applied. The same entity's state must not transition until the new state's persistence is confirmed.

The above steps 1-4 are processed asynchronously; that is, the entity does not block a thread while these steps are carried out.
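The suspend, persist, set state, and resume sequence can be pictured with a single-threaded simulation. This is not XOOM code: ApplySketchEntity is a hypothetical name, the store's confirmation is triggered by hand through confirmPersisted(), and an int stands in for the entity state.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-threaded simulation of the four apply() steps.
final class ApplySketchEntity {
  private final Queue<Runnable> stowed = new ArrayDeque<>();
  private boolean suspended = false;
  private Runnable pendingConfirmation;
  int state = 0;

  // Delivers a message; it is stowed while a state transition is being persisted.
  void deliver(Runnable message) {
    if (suspended) {
      stowed.add(message);
    } else {
      message.run();
    }
  }

  // Steps 1 and 2: suspend message processing and request persistence of the new state.
  void apply(int newState) {
    suspended = true;
    pendingConfirmation = () -> {
      state = newState;   // step 3: set the confirmed state back onto the entity
      suspended = false;  // step 4: resume message processing
      while (!suspended && !stowed.isEmpty()) {
        stowed.poll().run();
      }
    };
  }

  // Simulates the store confirming persistence at some later time.
  void confirmPersisted() {
    Runnable confirmation = pendingConfirmation;
    pendingConfirmation = null;
    if (confirmation != null) {
      confirmation.run();
    }
  }
}
```

A second command delivered before the first confirmation is held back and only runs against the confirmed state, so it never observes a state that has not been persisted.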

In the following case the command message handler is augmented to answer an eventually available, guaranteed stored state of the entity. The message handler method would need to change as follows.

import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.lattice.model.object.ObjectEntity;

public class CartEntity extends ObjectEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public Completes<CartState> placeProductInCart(Product product) {
    final ProductPlacedInCart productPlacedInCart =
      ProductPlacedInCart.with(product.sku, product.description, product.price);
    // the supplier is evaluated after persistence is confirmed
    return apply(
      cartState.withSelected(product),
      productPlacedInCart,
      () -> cartState);
  }
  ...
}

In the above example, notice that the protocol has been changed to support a Completes<T> return value, specifically Completes<CartState>. The Completes<T> protocol is explained here. The apply(...) method now provides a return value, and it also takes a parameter to supply the final cartState value as the contents of the Completes<CartState>. The apply() causes five processing steps.

  1. The CartEntity message processing is temporarily suspended until steps 2 and 3 are completed.

  2. The new CartState and the ProductPlacedInCart event are asynchronously and atomically persisted to the backing ObjectStore.

  3. When the persistence is confirmed, the overridden stateObject(CartState) method is called to replace the current cartState with the new value. You can see this overridden method in the below code snippet displaying the overridden protected methods.

  4. The CartEntity message processing is restored, allowing subsequent messages, such as commands, to be handled.

  5. The newly stored cartState instance that was replaced by the stateObject() method (next example) is used to complete the Completes<CartState> outcome value.
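The reason the final value is passed as a supplier, rather than as a value, is that it must be read only after the state has been replaced. The sketch below illustrates that ordering with java.util.concurrent.CompletableFuture standing in for Completes<T>. AnsweringEntitySketch and its methods are hypothetical, and the confirmation is again triggered by hand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch; CompletableFuture stands in for Completes<T>.
final class AnsweringEntitySketch {
  private String state = "initial";
  private Runnable onPersisted;

  // Requests persistence and answers an outcome that completes after the state is replaced.
  CompletableFuture<String> apply(String newState, Supplier<String> andThen) {
    CompletableFuture<String> outcome = new CompletableFuture<>();
    onPersisted = () -> {
      state = newState;                // the confirmed state replaces the current state
      outcome.complete(andThen.get()); // the supplier is evaluated only now
    };
    return outcome;
  }

  // Command handler: the supplier reads the field lazily, so it sees the replaced state.
  CompletableFuture<String> place(String product) {
    return apply(state + "+" + product, () -> state);
  }

  // Simulates the store confirming persistence.
  void confirmPersisted() {
    onPersisted.run();
  }
}
```

Had the handler passed the current state value directly instead of a supplier, the outcome would carry the state as it was before the transition.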

Every concrete ObjectEntity must override a few methods.

import java.util.Arrays;
import java.util.List;

import io.vlingo.xoom.common.Tuple2;
import io.vlingo.xoom.lattice.model.DomainEvent;
import io.vlingo.xoom.lattice.model.object.ObjectEntity;
import io.vlingo.xoom.symbio.Source;

public class CartEntity extends ObjectEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  @Override
  protected CartState stateObject() {
    return cartState;
  }
  @Override
  protected void stateObject(final CartState stateObject) {
    this.cartState = stateObject;
  }
  @Override
  protected Class<CartState> stateObjectType() {
    return CartState.class;
  }
  @Override
  protected <C> Tuple2<CartState,List<Source<DomainEvent>>> whenNewState() {
    // answer a Tuple2 if the context is an initial state; otherwise null
    if (cartState.isInitial()) {
      return Tuple2.of(cartState, Arrays.asList(new CartCreated(...)));
    }
    return null;
  }
}

These methods support the ObjectEntity abstract base in interacting with the ObjectStore on behalf of the concrete CartEntity.

  • The stateObject() answers the CartEntity current CartState value.

  • The stateObject(CartState) provides a new state to replace the previous state. This is called following the restoration of the CartState from persistence when the CartEntity is being (re)started from an existing state, and following the persistence of a new state and zero or more events.

  • The stateObjectType() answers the Class<T> of the entity's state type, which in this case is CartState.class.

  • The whenNewState() is an optional override if it is desired to provide an initial state to be persisted only when the Entity has been newly created. In this example, when the cartState is considered initial, the whenNewState() answers a Tuple2 of a CartState and a List<Source<DomainEvent>>. If the cartState has already transitioned past its initial state, answer null and state restoration from the ObjectStore is attempted instead.

Read the API documentation for more details.

ObjectTypeRegistry

In order to introduce your concrete entity types to the underlying ObjectStore persistence mechanism that is used by the ObjectEntity abstract base, you must register each one with the ObjectTypeRegistry.

import io.vlingo.xoom.lattice.model.object.ObjectTypeRegistry;
import io.vlingo.xoom.lattice.model.object.ObjectTypeRegistry.Info;
import io.vlingo.xoom.symbio.store.object.StateObjectMapper;

ObjectTypeRegistry registry = new ObjectTypeRegistry(world);
// maps CartState to and from its table using Jdbi
StateObjectMapper cartStateMapper =
  StateObjectMapper.with(
    CartState.class,
    JdbiPersistMapper.with(
      insertQuery,
      updateQuery,
      SqlStatement::bindFields),
    new CartStateMapper());
String cartQuery = "SELECT FROM TBL_CARTS ...";
Info<Cart> cartInfo =
  new Info<>(
    objectStore,
    CartState.class,
    "Cart-Database",
    MapQueryExpression.using(
      Cart.class,
      cartQuery,
      MapQueryExpression.map("cartId", "")),
    cartStateMapper);
registry.register(cartInfo);

The above code registers a mapper for the Cart and CartState using a Jdbi mapping.

  1. The ObjectTypeRegistry is created within the World that is used by the service.

  2. The StateObjectMapper is created for the CartState.

  3. An ObjectTypeRegistry.Info instance is created, which holds the components necessary to persist and query a CartState via the Cart implementor CartEntity. The Info holds: (a) the ObjectStore that persists CartState, (b) the type that is persisted, in this case CartState.class, (c) an identifying name given to the store, (d) a QueryExpression, in this case a MapQueryExpression, that uses the cartQuery that takes cartId as a parameter, and (e) the previously created StateObjectMapper for CartState instances.

  4. The ObjectTypeRegistry is then used to register the Info<Cart>, enabling these to be supportable by the XOOM Lattice ObjectEntity.

You must register such an ObjectTypeRegistry.Info for every model type and state type, as was done above for Cart and CartState, respectively.

StatefulEntity Example

An actor that serves as a domain model entity may be designed extending the StatefulEntity type. The StatefulEntity should implement a protocol that defines the behavior available through message sending and delivery.

CartEntity is a StatefulEntity extender, meaning its state is stored as a key-value/document.

Here is a CartEntity that extends StatefulEntity and implements the Cart protocol.

import io.vlingo.xoom.lattice.model.stateful.StatefulEntity;

public class CartEntity extends StatefulEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public CartEntity(String cartId, String userId) {
    // combine the identity parts into a single composite identity
    super(idFrom(":", cartId, userId));
    this.cartState = CartState.initial(cartId, userId);
  }
}

A StatefulEntity should receive its unique identity by means of its constructor, making the identity available during its initialization. If the entity is preexisting, the identity is used during actor startup to reconstitute its state from the StateStore.

The following shows how a command is handled by a StatefulEntity to persist the new state and any events using the underlying StateStore. Notice that the interface for applying the state and event is identical to the ObjectEntity interface.

import io.vlingo.xoom.lattice.model.stateful.StatefulEntity;

public class CartEntity extends StatefulEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public void placeProductInCart(Product product) {
    final ProductPlacedInCart productPlacedInCart =
      ProductPlacedInCart.with(product.sku, product.description, product.price);
    // persist the new state together with the event
    apply(cartState.withSelected(product), productPlacedInCart);
  }
  ...
}

The StatefulEntity provides a means to apply() a new state and concrete DomainEvent instances. In the above example a new CartState instance and a ProductPlacedInCart are applied together. The apply() causes four processing steps.

  1. The CartEntity message processing is temporarily suspended until steps 2 and 3 are completed.

  2. The new CartState and the ProductPlacedInCart event are asynchronously and atomically persisted to the backing StateStore.

  3. When the persistence is confirmed, the overridden state(CartState) method is called to replace the current cartState with the new value. You can see this overridden method in the following code snippet.

  4. The CartEntity message processing is restored, allowing subsequent messages, such as commands, to be handled.

The StatefulEntity concrete type may not process its next messages until the current state transition and any events are fully applied. The same entity's state must not transition until the new state's persistence is confirmed.

The above steps 1-4 are processed asynchronously; that is, the entity does not block a thread while these steps are carried out.

In the following case the command message handler is augmented to answer an eventually available, guaranteed stored state of the entity. The message handler method would need to change as follows.

import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.lattice.model.stateful.StatefulEntity;

public class CartEntity extends StatefulEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public Completes<CartState> placeProductInCart(Product product) {
    final ProductPlacedInCart productPlacedInCart =
      ProductPlacedInCart.with(product.sku, product.description, product.price);
    // the supplier is evaluated after persistence is confirmed
    return apply(
      cartState.withSelected(product),
      productPlacedInCart,
      () -> cartState);
  }
  ...
}

In the above example, notice that the protocol has been changed to support a Completes<T> return value, specifically Completes<CartState>. The Completes<T> protocol is explained here. The apply(...) method now provides a return value, and it also takes a parameter to supply the final cartState value as the contents of the Completes<CartState>. The apply() causes five processing steps.

  1. The CartEntity message processing is temporarily suspended until steps 2 and 3 are completed.

  2. The new CartState and the ProductPlacedInCart event are asynchronously and atomically persisted to the backing StateStore.

  3. When the persistence is confirmed, the overridden state(CartState) method is called to replace the current cartState with the new value. You can see this overridden method in the below code snippet displaying the overridden protected methods.

  4. The CartEntity message processing is restored, allowing subsequent messages, such as commands, to be handled.

  5. The newly stored cartState instance that was replaced by the state(CartState) method (next example) is used to complete the Completes<CartState> outcome value.

Every concrete StatefulEntity must override a few methods.

import java.util.Arrays;
import java.util.List;

import io.vlingo.xoom.common.Tuple3;
import io.vlingo.xoom.lattice.model.DomainEvent;
import io.vlingo.xoom.lattice.model.stateful.StatefulEntity;
import io.vlingo.xoom.symbio.Source;

public class CartEntity extends StatefulEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  @Override
  protected void state(final CartState state) {
    this.cartState = state;
  }
  @Override
  protected Class<CartState> stateType() {
    return CartState.class;
  }
  @Override
  protected <C> Tuple3<CartState,List<Source<DomainEvent>>,String> whenNewState() {
    // answer a Tuple3 if the context is an initial state; otherwise null
    if (cartState.isInitial()) {
      return Tuple3.of(cartState, Arrays.asList(new CartCreated(...)), "created");
    }
    return null;
  }
}

These methods support the StatefulEntity abstract base in interacting with the StateStore on behalf of the concrete CartEntity.

  • The id() answers the String unique identity of the CartEntity.

  • The state(CartState) provides a new state to replace the previous state. This is called following the restoration of the CartState from persistence when the CartEntity is being (re)started from an existing state, and following the persistence of a new state and zero or more events.

  • The stateType() answers the concrete state type, such as the Class<CartState>.

  • The whenNewState() is an optional override if it is desired to provide an initial state to be persisted, but only when the Entity has been newly created. The whenNewState() is called each time the CartEntity actor is started. On the initial start just following the first ever construction of the entity, this method offers the opportunity to provide the initial state to be applied. The offer may be ignored by not overriding this method, as the default answers null. If this method is called at any time other than the first ever construction of the entity, it must answer null. In this example, when the cartState is considered initial, the whenNewState() answers a Tuple3 of a CartState, a List<Source<DomainEvent>>, and the "created" operation which caused it. If the cartState has already transitioned past its initial state, answer null and state restoration from the StateStore is attempted instead.

Read the API documentation for more details.

StatefulTypeRegistry

In order to introduce your concrete entity types to the underlying StateStore persistence mechanism that is used by the StatefulEntity abstract base, you must register each one with the StatefulTypeRegistry.

import io.vlingo.xoom.lattice.model.stateful.StatefulTypeRegistry;
import io.vlingo.xoom.lattice.model.stateful.StatefulTypeRegistry.Info;
import io.vlingo.xoom.symbio.store.state.dynamodb.DynamoDBStateActor;
import io.vlingo.xoom.symbio.store.state.StateStore;

StateStore store = world.actorFor(StateStore.class, DynamoDBStateActor.class, dispatcher);
StatefulTypeRegistry registry = new StatefulTypeRegistry(world);
// register the state type under its simple class name as the store name
registry.register(new Info<>(store, CartState.class, CartState.class.getSimpleName()));

You must register such a StatefulTypeRegistry.Info for every state type, as was done above for CartState.

EventSourced Example

An actor that serves as a domain model entity may be designed as an extension to the EventSourced type. The EventSourced should implement a protocol that defines the behavior available through message sending and delivery.

CartEntity is an EventSourced extender, meaning its events are stored as a stream in a journal.

Here is a CartEntity that extends EventSourced and implements the Cart protocol.

import io.vlingo.xoom.lattice.model.sourcing.EventSourced;
public class CartEntity extends EventSourced implements Cart {
  private CartState cartState;
  ...
  public CartEntity(String cartId, String userId) {
    super(); // use the actor address as unique identity
    this.cartState = CartState.initial(cartId, userId);
  }
}

An EventSourced may receive its unique identity by means of its constructor, or it may provide no identity, in which case the actor's address will be used. Either way, this makes the identity available during the entity's initialization. If the entity is preexisting, the identity is used during actor startup to reconstitute its state from the Journal.
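
Reconstitution can be pictured as a fold over the stored events, oldest first. The following is a minimal sketch of that idea in plain Java and does not use the XOOM API. The CartFold class, its Event interface, and the simplified CartCreated and ProductPlaced types are hypothetical stand-ins, and the sketch assumes every stream begins with a CartCreated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins; none of these types come from XOOM.
final class CartFold {
  interface Event {}

  static final class CartCreated implements Event {
    final String cartId;
    CartCreated(String cartId) { this.cartId = cartId; }
  }

  static final class ProductPlaced implements Event {
    final String sku;
    ProductPlaced(String sku) { this.sku = sku; }
  }

  static final class State {
    final String cartId;
    final List<String> skus;
    State(String cartId, List<String> skus) {
      this.cartId = cartId;
      this.skus = Collections.unmodifiableList(skus);
    }
  }

  // Applies one event to the current state and answers the replacement state.
  static State apply(State current, Event event) {
    if (event instanceof CartCreated) {
      return new State(((CartCreated) event).cartId, new ArrayList<>());
    }
    if (event instanceof ProductPlaced) {
      List<String> skus = new ArrayList<>(current.skus);
      skus.add(((ProductPlaced) event).sku);
      return new State(current.cartId, skus);
    }
    return current; // unknown events leave the state unchanged
  }

  // Folds the whole stream, oldest event first, into a single state.
  static State fold(List<Event> stream) {
    State state = null;
    for (Event event : stream) {
      state = apply(state, event);
    }
    return state;
  }

  public static void main(String[] args) {
    State state = fold(Arrays.asList(
        new CartCreated("cart-1"), new ProductPlaced("sku-1"), new ProductPlaced("sku-2")));
    System.out.println(state.cartId + " " + state.skus);
  }
}
```

Each call to apply() answers a new State rather than mutating the previous one, which mirrors the state replacement style used by the entity examples in this section.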

The following shows how a command is handled by an EventSourced to persist events using the underlying Journal. Notice that the interface for applying the event(s) is different from the ObjectEntity and StatefulEntity.

import io.vlingo.xoom.lattice.model.sourcing.EventSourced;
public class CartEntity extends EventSourced implements Cart {
  private CartState cartState;
  ...
  public void placeProductInCart(Product product) {
    final ProductPlacedInCart productPlacedInCart =
      ProductPlacedInCart.with(product.sku, product.description, product.price);
    apply(productPlacedInCart);
  }
  ...
}

In the case of EventSourced the state is not automatically persisted because it is not generally needed. The events themselves represent the state of the cart, as explained above. You'll next see how the state is persisted if the conditions exist to justify that.

The unique identity of the entity is set in the base constructor in the streamName immutable variable.

protected final String streamName;

In the case of Sourced<T> entities, such as EventSourced, the implementor will be asked if they would like to include a snapshot of the state at any given point of apply(). The concrete EventSourced, in this case CartEntity, may do so using the following override.

@Override
protected CartState snapshot() {
  return cartState;
}

Notice, however, that this example unconditionally provides the CartState instance as a snapshot in all cases. This is generally not desirable because it is probably not necessary. In fact snapshots are generally only useful when a given entity may have many, many events associated with it. In such cases it is best to use a heuristic metric to determine when a snapshot is necessary, where the metric is based on some point where the performance of entity reconstitution starts to degrade with many events.

For example, say that you have an entity that could have thousands of items in it. This might be a large order being populated with many products from a warehouse. To reduce the overhead of loading all events from the Journal, you may decide to take a snapshot at every 250th event.

@Override
protected OrderState snapshot() {
  return currentVersion() % 250 == 0 ? state : null;
}

The currentVersion() provides a 1-based version or sequence number of the next event to be appended to the Journal; that is, the first of potentially multiple events currently being applied. In the above example the snapshot of the OrderState will only be provided every 250th event. Otherwise null is answered to prevent a snapshot from being persisted along with the event(s).

If any given command message handler produces multiple events, it is possible for the above calculation to be inexact.
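
One way to reason about the inexact case is to test whether any version in the batch being applied lands on the interval, rather than testing only the first version. The following is a minimal sketch in plain Java. The SnapshotPolicy class and its crossesInterval() method are hypothetical, and the sketch assumes the entity itself knows how many events it is applying together; it does not claim that XOOM exposes that count.

```java
// Hypothetical helper; not part of XOOM.
final class SnapshotPolicy {
  // Answers true when any version in [firstVersion, firstVersion + eventCount - 1]
  // is a multiple of interval. firstVersion is assumed to be 1-based.
  static boolean crossesInterval(int firstVersion, int eventCount, int interval) {
    if (eventCount <= 0 || interval <= 0) {
      return false;
    }
    int lastVersion = firstVersion + eventCount - 1;
    // The number of multiples of interval at or below a version v is v / interval,
    // so the batch contains one when that count grows across the batch.
    return lastVersion / interval > (firstVersion - 1) / interval;
  }

  public static void main(String[] args) {
    // Versions 249, 250, 251 applied together: a plain modulo on 249 would miss 250.
    System.out.println(crossesInterval(249, 3, 250));
    // A single event at version 251 does not land on the interval.
    System.out.println(crossesInterval(251, 1, 250));
  }
}
```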

If using snapshots, the following method must be overridden in order to apply the most recent snapshot to the entity's state when the entity is being reconstituted.

...
protected <OrderState> void restoreSnapshot(final OrderState snapshot, final int currentVersion) {
  this.state = snapshot;
}
...

Another useful tool is asList(), which places multiple events into a List<DomainEvent> that may be passed to the respective apply() methods. For example, it's possible that a shopping cart is created by the user selecting the currently viewed product into a new cart.

public void createCartWithFirstProduct(...) {
  apply(asList(new CartCreated(...), new ProductPlacedInCart(...)));
}

The EventSourced provides a means to apply() one or more concrete DomainEvent instances. In the above example a new CartCreated and ProductPlacedInCart are applied together. The apply() causes five processing steps.

  1. The CartEntity message processing is temporarily suspended until steps 2 through 4 are completed.

  2. The snapshot() is used to determine whether or not a snapshot instance should be persisted with the events. If not, the snapshot() will answer null. The default is to never provide snapshots.

  3. The new CartCreated and ProductPlacedInCart events are asynchronously and atomically appended to the backing Journal.

  4. When the persistence is confirmed, a registered event-specific method is called, once for each applied event, to transition the current cartState based on the new event value. You can see examples in the below code snippet.

  5. The CartEntity message processing is restored, allowing subsequent messages, such as commands, to be handled.

The EventSourced concrete type may not process its next messages until the events are fully applied. The same entity's state must not transition until the state's storage is confirmed.

The above steps 3 and 4 are processed asynchronously; that is, the entity does not block a thread while these steps are carried out.
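
The five steps can be pictured with a small single-threaded simulation. The following is only a sketch in plain Java, not the XOOM implementation. ApplyStepsSketch and all of its members are hypothetical, events are represented by their names as strings, the snapshot decision of step 2 is left out, and the Journal confirmation is triggered by hand through confirmPersistence().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Single-threaded simulation; every name here is hypothetical, not XOOM API.
final class ApplyStepsSketch {
  final List<String> journal = new ArrayList<>();  // stand-in for the Journal
  final List<String> applied = new ArrayList<>();  // stand-in for the entity state
  private final Deque<Runnable> stashed = new ArrayDeque<>();
  private List<String> pending;                    // events awaiting confirmation

  // Delivers a message now, or stashes it while an apply() is in flight.
  void deliver(Runnable message) {
    if (pending != null) {
      stashed.add(message);
    } else {
      message.run();
    }
  }

  void apply(List<String> events) {
    pending = events;          // step 1: suspend message processing
                               // step 2: snapshot decision omitted in this sketch
    journal.addAll(events);    // step 3: append all events as one unit
  }

  // Simulates the Journal confirming the append at some later time.
  void confirmPersistence() {
    if (pending == null) {
      return;
    }
    applied.addAll(pending);   // step 4: transition state, once per event
    pending = null;            // step 5: resume message processing
    while (pending == null && !stashed.isEmpty()) {
      stashed.poll().run();    // stashed messages run in arrival order
    }
  }

  public static void main(String[] args) {
    ApplyStepsSketch entity = new ApplyStepsSketch();
    entity.deliver(() -> entity.apply(Arrays.asList("CartCreated", "ProductPlacedInCart")));
    entity.deliver(() -> entity.apply(Arrays.asList("ProductPlacedInCart"))); // stashed
    System.out.println(entity.journal.size() + " appended, " + entity.applied.size() + " applied");
    entity.confirmPersistence();
    System.out.println(entity.journal.size() + " appended, " + entity.applied.size() + " applied");
  }
}
```

The second command is not handled until the first apply() has been confirmed, so no state transition is ever based on events that have not yet been stored.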

Every concrete EventSourced must register an event-specific method to transition from the current state to a new state for each newly applied event.

import io.vlingo.xoom.lattice.model.sourcing.EventSourced;
public class CartEntity extends EventSourced implements Cart {
  private CartState cartState;
  ...
  static {
    registerConsumer(CartEntity.class, CartCreated.class, CartEntity::applyCartCreated);
    registerConsumer(CartEntity.class, ProductPlacedInCart.class, CartEntity::applyProductPlacedInCart);
    ...
  }
  private void applyCartCreated(CartCreated event) {
    cartState = CartState.initial(event.cartId, event.userId);
  }
  private void applyProductPlacedInCart(ProductPlacedInCart event) {
    Product product = new Product(event.productId, event.quantity, event.price);
    cartState = cartState.withProduct(product);
  }
  ...
}

Following the confirmation of the CartCreated event being stored, the EventSourced base class calls applyCartCreated() with the CartCreated instance. This gives the CartEntity the opportunity to transition the CartState to the new value, which in this case is its initial value.

Following the confirmation of the ProductPlacedInCart event being stored, the EventSourced base class calls applyProductPlacedInCart() with the new ProductPlacedInCart instance. This gives the CartEntity the opportunity to transition the CartState to the new value to include the newly placed product.
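
To show how registration by event type can route each confirmed event to its method, here is a minimal sketch in plain Java. It is not how XOOM implements registerConsumer(); the ConsumerRegistrySketch class, its simplified registerConsumer() signature, the dispatch() method, and the pared-down event types are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of type-based dispatch; not the XOOM implementation.
final class ConsumerRegistrySketch {
  // event type -> method that transitions the entity for that event
  private static final Map<Class<?>, BiConsumer<Cart, Object>> CONSUMERS = new HashMap<>();

  static <E> void registerConsumer(Class<E> eventType, BiConsumer<Cart, E> consumer) {
    CONSUMERS.put(eventType, (cart, event) -> consumer.accept(cart, eventType.cast(event)));
  }

  static final class CartCreated {
    final String cartId;
    CartCreated(String cartId) { this.cartId = cartId; }
  }

  static final class ProductPlaced {
    final String sku;
    ProductPlaced(String sku) { this.sku = sku; }
  }

  static final class Cart {
    String cartId;
    int productCount;

    static {
      registerConsumer(CartCreated.class, Cart::applyCartCreated);
      registerConsumer(ProductPlaced.class, Cart::applyProductPlaced);
    }

    private void applyCartCreated(CartCreated event) {
      cartId = event.cartId;
    }

    private void applyProductPlaced(ProductPlaced event) {
      productCount++;
    }

    // Called once per event, after persistence would have been confirmed.
    void dispatch(Object event) {
      BiConsumer<Cart, Object> consumer = CONSUMERS.get(event.getClass());
      if (consumer == null) {
        throw new IllegalStateException("No consumer registered for " + event.getClass());
      }
      consumer.accept(this, event);
    }
  }

  public static void main(String[] args) {
    Cart cart = new Cart();
    cart.dispatch(new CartCreated("cart-1"));
    cart.dispatch(new ProductPlaced("sku-1"));
    System.out.println(cart.cartId + " holds " + cart.productCount + " product(s)");
  }
}
```

Failing fast on an unregistered event type is a design choice of this sketch; it surfaces a missing registration immediately instead of silently skipping a transition.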

Read the API documentation for more details on all possible overrides. Note that all of the above applies also to the CommandSourced, with the same API documentation.

SourcedTypeRegistry

In order to introduce your concrete entity types to the underlying Journal persistence mechanism that is used by the EventSourced abstract base, you must register each one with the SourcedTypeRegistry.

import io.vlingo.xoom.lattice.model.sourcing.SourcedTypeRegistry.Info;
import io.vlingo.xoom.symbio.store.common.jdbc.Configuration;
import io.vlingo.xoom.symbio.store.journal.Journal;
...
journal = world.actorFor(Journal.class, JDBCJournalActor.class, configuration);
registry = new SourcedTypeRegistry(world);
Info info = new Info(journal, CartEntity.class, CartEntity.class.getSimpleName());
registry.register(info);

You must register such a SourcedTypeRegistry.Info for every entity type, as was done above for CartEntity.

Summary

In all cases, the new state and/or events are persisted asynchronously and safely replaced on the entity instance when reconstituted. The apply() handles all that for you, but without involving the aggregate designer in persistence, and minus technical naming. There are no race conditions or conflicting operation results, or failed persistence due to a database consistency violation.

Modeling State

In all of the above entity examples there is a single state object held by each of the entity types, the CartState type. This approach has several advantages.

  • There is only one variable to manage for all state, including one or more identities and all state that may transition over time.

  • Fewer state objects to manage require less ongoing cognition.

  • The state is immutable, which makes the state simpler to maintain and reason about.

  • The state has a side-effect-free, intention-revealing interface, resulting in states transitioning by being fully replaced.

  • The state transitions one operation at a time, with each transition clearly focused on a single reason for replacement.

Such a state object may be designed as follows.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public final class CartState {
  public final String cartId;
  public final List<Product> products;
  public final String userId;
  public static CartState initial(String cartId, String userId) {
    return new CartState(cartId, userId);
  }
  public CartState withProduct(Product product) {
    return new CartState(cartId, userId, place(product));
  }
  private CartState(String cartId, String userId) {
    // start with an immutable empty list so that no state exposes a mutable list
    this(cartId, userId, Collections.emptyList());
  }
  private CartState(String cartId, String userId, List<Product> products) {
    this.cartId = cartId;
    this.userId = userId;
    this.products = products;
  }
  // answers an unmodifiable copy of the current products plus the new product
  private List<Product> place(Product product) {
    List<Product> placed = new ArrayList<>(this.products);
    placed.add(product);
    return Collections.unmodifiableList(placed);
  }
}

Note that the instance variables are all declared public final, making accessor methods unnecessary because clients may directly access the immutable variables. This functional style of programming relies on referential transparency; that is, the value outcome of a function may be replaced by the value itself. The benefit is a reduction in the cognitive overhead of the interface. Yet, this means that you must take care to design naturally mutable objects, such as List<Product>, as immutable. The above example employs Collections.unmodifiableList() to make the products immutable so that clients cannot directly change the list without the CartState's knowledge.
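
The effect of this design can be checked with a small standalone sketch. ImmutableStateSketch and its nested State type are hypothetical, and products are reduced to String values to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified state type used only to demonstrate immutability.
final class ImmutableStateSketch {
  static final class State {
    public final List<String> products;

    private State(List<String> products) {
      // wrap on construction so that no instance exposes a mutable list
      this.products = Collections.unmodifiableList(products);
    }

    static State initial() {
      return new State(new ArrayList<>());
    }

    // Answers a replacement state; the receiver is left untouched.
    State withProduct(String product) {
      List<String> copy = new ArrayList<>(products);
      copy.add(product);
      return new State(copy);
    }
  }

  public static void main(String[] args) {
    State empty = State.initial();
    State one = empty.withProduct("sku-1");
    System.out.println(empty.products.size() + " -> " + one.products.size());
    try {
      one.products.add("sku-2"); // clients cannot change the list directly
    } catch (UnsupportedOperationException e) {
      System.out.println("direct mutation rejected");
    }
  }
}
```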

You may easily add support for other concepts in the state type, such as collections of Coupon or DiscountCode instances, depending on your Ubiquitous Language. All such additions would focus on only the changes necessary to support the CartEntity commands.

Querying a StateStore

XOOM Lattice provides a specialized query component for querying a StateStore asynchronously. It is the abstract base class StateStoreQueryActor.

import io.vlingo.xoom.lattice.query.StateStoreQueryActor;
public class ProductQueriesActor extends StateStoreQueryActor implements ProductQueries {
  public ProductQueriesActor(final StateStore stateStore) {
    super(stateStore);
  }
  // implement ProductQueries
  public Completes<ProductCategoryView> productCategoryOf(final String category) {
    return queryStateFor(category, ProductCategoryView.class);
  }
  public Completes<ProductView> productOf(final String productId) {
    return queryStateFor(productId, ProductView.class);
  }
}

Use the StateStoreQueryActor to quickly implement queries of various kinds on the StateStore.

StateAdapter and StateAdapterProvider

All store types, ObjectStore, StateStore, and Journal, serialize state objects to State<T> instances. Some stores must serialize state objects in order to persist them, while others do not. In the case of StateStore and Journal, the states are indeed serialized for the sake of persistence. On the other hand, the ObjectStore will always map state to a row and columns, and thus does not use a serialized State<T> to persist. Even so, ObjectStore will still always serialize the state to a State<T> for the purpose of post persistence processing. Therefore, every state type must use a StateAdapter.

import io.vlingo.xoom.common.serialization.JsonSerialization;
import io.vlingo.xoom.symbio.State.TextState;
import io.vlingo.xoom.symbio.StateAdapter;
import io.vlingo.xoom.symbio.Metadata;
public class CartStateAdapter implements StateAdapter<CartState, TextState> {
  @Override
  public int typeVersion() {
    return 1;
  }
  @Override
  public CartState fromRawState(final TextState raw) {
    return JsonSerialization.deserialized(raw.data, CartState.class);
  }
  @Override
  public <ST> ST fromRawState(final TextState raw, final Class<ST> stateType) {
    return JsonSerialization.deserialized(raw.data, stateType);
  }
  @Override
  public TextState toRawState(CartState state, int stateVersion, Metadata metadata) {
    final String serialization = JsonSerialization.serialized(state);
    // CartState exposes its identity as cartId
    return new TextState(state.cartId, CartState.class, typeVersion(), serialization, stateVersion, metadata);
  }
  @Override
  public TextState toRawState(final String id, final CartState state, final int stateVersion, final Metadata metadata) {
    final String serialization = JsonSerialization.serialized(state);
    return new TextState(id, CartState.class, typeVersion(), serialization, stateVersion, metadata);
  }
}

Initially it is not necessary to create a StateAdapter<S,E> during early development if you are using the text edition of a given store, such as ObjectStore<String>, StateStore<String>, or Journal<String>. Instead you may benefit from the DefaultTextStateAdapter. This default adapter is used when no adapter is registered for a given state type. However, when state types change, you must definitely implement adapters for the changing types. In this case the adapters play an important role of upgrading, referred to as upcasting, the persisted State<T> to the new version of the state type.

See: io.vlingo.xoom.symbio.DefaultTextStateAdapter
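
Upcasting can be illustrated without any store at all. The following is a minimal sketch in plain Java under several assumptions: the RawState and CartStateV2 types are hypothetical, the raw data is modeled as a map of field names to values rather than JSON, and version 2 of the state is imagined to have added a currency field that version 1 lacked, with "USD" as an assumed default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of upcasting by type version; not the XOOM StateAdapter API.
final class UpcastSketch {
  // Hypothetical raw form: the type version it was written with plus untyped fields.
  static final class RawState {
    final int typeVersion;
    final Map<String, String> fields;
    RawState(int typeVersion, Map<String, String> fields) {
      this.typeVersion = typeVersion;
      this.fields = fields;
    }
  }

  // Hypothetical version 2 of the state type, which added a currency.
  static final class CartStateV2 {
    final String cartId;
    final String currency;
    CartStateV2(String cartId, String currency) {
      this.cartId = cartId;
      this.currency = currency;
    }
  }

  // Reads any persisted version and answers the current version of the state.
  static CartStateV2 fromRawState(RawState raw) {
    Map<String, String> fields = new HashMap<>(raw.fields);
    if (raw.typeVersion < 2) {
      // Version 1 had no currency; supply the assumed default while upcasting.
      fields.putIfAbsent("currency", "USD");
    }
    return new CartStateV2(fields.get("cartId"), fields.get("currency"));
  }

  public static void main(String[] args) {
    Map<String, String> v1Fields = new HashMap<>();
    v1Fields.put("cartId", "cart-1");
    CartStateV2 upcast = fromRawState(new RawState(1, v1Fields));
    System.out.println(upcast.cartId + " " + upcast.currency);
  }
}
```

The persisted data is never rewritten in this sketch; the older version is upgraded each time it is read, which keeps previously stored states valid as the type evolves.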

To make the adapters available to stores, use the StateAdapterProvider.

import io.vlingo.xoom.symbio.StateAdapterProvider;
StateAdapterProvider stateAdapterProvider =
  StateAdapterProvider.instance(world);
stateAdapterProvider.registerAdapter(
  CartState.class, new CartStateAdapter());

EntryAdapter and EntryAdapterProvider

When using domain events, Source<T> types, such as concrete DomainEvent types, may be persisted to a store database. Such DomainEvent instances, for example CartCreated and ProductPlacedInCart, must be serialized to a form that can be inserted into the underlying database as an instance of Entry<T>. Two types of Entry<T> are supported, TextEntry and BinaryEntry. The TextEntry is backed by a String, which is appropriate for serializing with JSON. The BinaryEntry is backed by a byte[], which may be used to hold the result of binary serialization (e.g. Protobuf and Avro).
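
The difference between a String-backed and a byte[]-backed entry can be seen in a small round-trip sketch. This is plain Java with hypothetical names (EntryEncodingSketch and a pared-down CartCreated). A hand-written delimited string stands in for JSON, and DataOutputStream stands in for a binary format such as Protobuf or Avro. The text form assumes that cartId contains no '|' character.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical encodings used only to contrast text and binary entry data.
final class EntryEncodingSketch {
  static final class CartCreated {
    final String cartId;
    final String userId;
    CartCreated(String cartId, String userId) {
      this.cartId = cartId;
      this.userId = userId;
    }
  }

  // Text form: what a String-backed entry would carry.
  static String toText(CartCreated event) {
    return event.cartId + "|" + event.userId;
  }

  static CartCreated fromText(String text) {
    String[] parts = text.split("\\|", 2);
    return new CartCreated(parts[0], parts[1]);
  }

  // Binary form: what a byte[]-backed entry would carry.
  static byte[] toBinary(CartCreated event) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(event.cartId);
      out.writeUTF(event.userId);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static CartCreated fromBinary(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      String cartId = in.readUTF();
      String userId = in.readUTF();
      return new CartCreated(cartId, userId);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    CartCreated event = new CartCreated("cart-1", "user-9");
    System.out.println(fromText(toText(event)).userId);
    System.out.println(fromBinary(toBinary(event)).userId);
  }
}
```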

It is strongly advised to never use Java default serialization. If you are unaware why, search for "problems with Java serialization".

Adapters are created by implementing EntryAdapter<S,E>. These adapt Source<T> instances to Entry<T> instances, and Entry<T> instances to Source<T> instances. For example, CartCreated instances may be adapted to TextEntry instances, and the same TextEntry instances adapted back to the corresponding CartCreated instances. Such an adapter would be named CartCreatedAdapter.

import io.vlingo.xoom.common.serialization.JsonSerialization;
import io.vlingo.xoom.symbio.BaseEntry.TextEntry;
import io.vlingo.xoom.symbio.EntryAdapter;
import io.vlingo.xoom.symbio.Metadata;
public final class CartCreatedAdapter implements EntryAdapter<CartCreated, TextEntry> {
  @Override
  public CartCreated fromEntry(TextEntry entry) {
    return JsonSerialization.deserialized(entry.entryData(), CartCreated.class);
  }
  @Override
  public TextEntry toEntry(CartCreated source, Metadata metadata) {
    String serialization = JsonSerialization.serialized(source);
    return new TextEntry(CartCreated.class, 1, serialization, metadata);
  }
  @Override
  public TextEntry toEntry(CartCreated source, String id, Metadata metadata) {
    String serialization = JsonSerialization.serialized(source);
    return new TextEntry(id, CartCreated.class, 1, serialization, metadata);
  }
  @Override
  public TextEntry toEntry(CartCreated source, int version, String id, Metadata metadata) {
    final String serialization = JsonSerialization.serialized(source);
    return new TextEntry(id, CartCreated.class, 1, serialization, version, metadata);
  }
}

The EntryAdapter<S,E> provides three overloaded methods for adapting from a Source<S> to an Entry<E>, and one for adapting from Entry<E> to Source<S>. In the above example the Source<S> type CartCreated is adapted to a TextEntry using JSON, and the persisted TextEntry back to a CartCreated.

Initially it is not necessary to create an EntryAdapter<S,E> during early development if you are using the text edition of a given store, such as ObjectStore<String>, StateStore<String>, or Journal<String>. Instead you may benefit from the DefaultTextEntryAdapter. This default adapter is used when no adapter is registered for a given Source<S> type. However, when Source<S> types change, you must definitely implement adapters for the changing types. In this case the adapters play an important role of upgrading, referred to as upcasting, the persisted Entry<E> to the new version of the Source<S> type.

See: io.vlingo.xoom.symbio.DefaultTextEntryAdapter

To make the adapters available to stores, use the EntryAdapterProvider.

import io.vlingo.xoom.symbio.EntryAdapterProvider;
EntryAdapterProvider entryAdapterProvider =
  EntryAdapterProvider.instance(world);
entryAdapterProvider.registerAdapter(
  CartCreated.class, new CartCreatedAdapter());
entryAdapterProvider.registerAdapter(
  ProductPlacedInCart.class, new ProductPlacedInCartAdapter());
...