Object, Stateful, and Sourced Entities With CQRS

Understanding reactive Object, Stateful, Sourced, and CQRS actor-based persistence.

Command-Query Responsibility Segregation (CQRS) uses two models per Bounded Context (application or service). These are the Command Model and the Query Model, otherwise known as the Write Model and the Read Model, respectively.

The Command Model is often referred to as the "Write Model" and the Query Model as the "Read Model." These AKAs are quite inaccurate because you actually read from and write to both models. We prefer the names Command Model and Query Model because these names adhere to the pattern name, CQRS, and express the purpose of each of the two models.

In addition, VLINGO/LATTICE supports three styles of persistence: object, flat state, and sourced. The object style of persistence is more commonly known as Object-Relational Mapping (ORM), although a very low-impact mapping approach is also provided. The flat state style uses key-value persistence, which is often suitable for both the CQRS Command Model and Query Model. The sourced style provides the persistence of a series of facts, known as Domain Events, that when applied or folded into a single object, combine to form a state. This is known as Event Sourcing, but may also be used with Commands to support Command Sourcing.

Understanding Entity Actor Lifecycles

Let's start out by contrasting the lifecycles of Plain Old Java Objects (POJOs) with Actor-based entities.

In a typical object-based application or service that uses POJOs, the lifecycle of entities is different from that of actor-based entities. Here we can assume that Entity and Aggregate as defined within Domain-Driven Design are interchangeable; that is, what follows deals with both concepts.

POJO Lifecycles

The following describes the typical lifecycle of a POJO Entity:

  • Non-existing Entity states are newly constructed and then persisted to storage.

  • Preexisting Entity states are reconstituted from storage, modified, and then persisted back to the same storage (updated).

  • Across a single JVM or a multi-JVM cluster, there may be any number of duplicated Entity instances. This implies that any of the duplicate instances may simultaneously have different operations performed by different users. In such cases the database must provide optimistic concurrency and detect concurrency violations via state versions.

  • The object reference is released after Entity persistence and the instance garbage collected.
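The optimistic concurrency check described in the list above can be sketched in plain Java. This is a minimal in-memory illustration and not a VLINGO API: the names OptimisticSketch, Versioned, VersionedStore, and ConcurrencyViolation are hypothetical, and a real database would perform the version comparison as part of its update statement.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: not part of any VLINGO component.
final class OptimisticSketch {
  // A persisted state together with the version at which it was written.
  static final class Versioned {
    final String state;
    final int version;

    Versioned(final String state, final int version) {
      this.state = state;
      this.version = version;
    }
  }

  // Signals that another writer updated the row first.
  static final class ConcurrencyViolation extends RuntimeException {
    ConcurrencyViolation(final String message) {
      super(message);
    }
  }

  // In-memory stand-in for a table that carries a version column.
  static final class VersionedStore {
    private final Map<String, Versioned> rows = new HashMap<>();

    synchronized Versioned read(final String id) {
      return rows.get(id);
    }

    // The write succeeds only if the caller's expected version is still current.
    synchronized Versioned write(final String id, final String newState, final int expectedVersion) {
      final Versioned current = rows.get(id);
      final int currentVersion = current == null ? 0 : current.version;
      if (currentVersion != expectedVersion) {
        throw new ConcurrencyViolation(
            "Expected version " + expectedVersion + " but found " + currentVersion);
      }
      final Versioned updated = new Versioned(newState, currentVersion + 1);
      rows.put(id, updated);
      return updated;
    }
  }
}
```

Two duplicate instances that both read version 1 can both attempt a write, but only the first succeeds; the second is rejected and must reload the Entity before retrying.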

Actor-Entity Lifecycles

The following describes the typical lifecycle of an actor-based Entity.

[Figure: Lattice model entity types and respective storage interfaces.]

Note that this applies across various types of Entities supported by VLINGO/LATTICE, including Sourced<T>, StatefulEntity<T>, and ObjectEntity<T>:

  • Non-existing states are newly persisted, and the guaranteed persistent state is then set back onto the actor state.

  • Preexisting states are possibly already in memory; if not in memory, states are reconstituted from storage; proposed changes are then persisted back to the same storage; following persistence, the state is then set back onto the actor state.

  • Across a single JVM or a multi-JVM cluster, there is one uniquely identified Entity instance, and all requests for creation/modification will be focused on that single instance, wherever it may be.

  • The actor is retained in memory until memory constraints call for some actors to be evicted. A least-recently-used algorithm determines which specific actor instances must be evicted, and those actors are stopped. The evictions make room for other currently "hot" actors.
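The least-recently-used eviction described in the last point can be illustrated with a small sketch. It is not how the platform implements eviction; it only demonstrates the LRU idea using the JDK's LinkedHashMap in access order. The class name LruActorCache and the stopped list are hypothetical, and String values stand in for actor instances.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a bounded cache that evicts the least recently used entry.
final class LruActorCache extends LinkedHashMap<String, String> {
  private final int capacity;

  // Identities of the entries that were evicted (an actor would be stopped here).
  final List<String> stopped = new ArrayList<>();

  LruActorCache(final int capacity) {
    super(16, 0.75f, true); // true selects access order rather than insertion order
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
    final boolean evict = size() > capacity;
    if (evict) {
      stopped.add(eldest.getKey());
    }
    return evict;
  }
}
```

With a capacity of two, touching cart-1 and then adding cart-3 evicts cart-2, because cart-2 is the entry that was used least recently.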

Entity Construction Details

Regarding the creation of the various Entity types, including Sourced, EventSourced, and CommandSourced, there are design decisions that impose some minor constraints.

In the following discussions we refer to apply() methods. These are provided by the various aforementioned Entity abstract types: Sourced, EventSourced, CommandSourced, etc. These methods are responsible for managing the atomic persistence of Entity state and Source<T> instances, such as DomainEvent. They are also responsible for setting the guaranteed persistent current state back onto the Entity. These apply() methods are explained in more detail below, but you must be aware of their existence here.

Generally speaking, it is very difficult to support the use of apply() methods from inside Entity constructors. Thus, we do not currently support using an apply() method from an Entity constructor. Here's why, using Product as the concrete Entity type and EventSourced as the abstract base Entity, although it could be any of the abstract base types.

  1. The Product constructor doesn't execute until after the EventSourced constructor. Therefore, restoring the state of any preexisting Product from within the EventSourced constructor is impossible unless Product and all other concrete Entity types are forced to pass their id to the EventSourced constructor. If you are using the vlingo/lattice Grid, we can derive the unique identity from the actor's address.

  2. Forcing the previous step (1) to work makes it extremely difficult to reconstitute the Product state dynamically when the Grid detects a sent message but the actor is not currently in memory.

  3. Even if entities were forced to pass their id to the EventSourced constructor, consider what would be required to enable the following apply() to work. The EventSourced constructor would have to suspend the actor message handling to make an asynchronous request of the Journal to recover state. The construction success would not be known before the constructor returned.

  4. To avoid issues 1-3 we could pass in the existing lifecycle event stream to a constructor, but this assumes that we could safely perform an asynchronous fetch of the stream and then reconstitute the Product before another thread raced to accomplish the same. This could result in duplicating Product instances in the cluster (or worse).

  5. Attempting 4 would require more asynchronous components to load the stream and reconstitute the Product. This could lead to requiring both Application Service components and Repository components, such as are found in the over-engineered N-tier architectures that the VLINGO/PLATFORM staunchly resists.

Consider also that any constructor is not a good expression of behavior for the Ubiquitous Language. Even when designing with POJOs it's generally desirable to hide the constructor behind an expressive Factory Method, such as:

Product product = Product.receiveInto(warehouse);

Look at the vlingo-iddd-collaboration example to see how this may be accomplished in the Forum protocol Factory Method. Additionally, ForumEntity starts a new Discussion in its discussFor() message handler. Likewise, the DiscussionEntity creates a new Post in its postFor() message handler.

Object-Relational Mapping

There is already much literature on Object-Relational Mapping (ORM) so we won't spend much time explaining it here. The basic idea is that a software object may be mapped into the row of a relational database table. Each of the object's attributes is mapped to a column in that row. There are, of course, more elaborate mapping relationships that can be employed, such as for one-to-many, many-to-one, and many-to-many object compositions. Yet, the mappings still come down to tables, rows, and columns holding object state.

The vlingo/symbio components provide various JDBC implementations for ORM using the ObjectStore, including the Java Persistence API (JPA). The JPA options include EclipseLink, OpenJPA, and Hibernate. Another JDBC implementation of VLINGO/SYMBIO supports Jdbi, which is a very lightweight toolkit for mapping objects. As long as your entity designs are small, using Jdbi may be all you need and will be a welcome alternative to JPA and other traditional ORM tools.

The following table shows the databases currently supporting ORM.

Database             Use
-------------------  ----------
Apache Geode         Production
HSQLDB (in-memory)   Testing
In-Memory            Testing
MariaDB              Production
MySQL                Production
PostgreSQL           Production
YugaByte             Production

Below you will see how to use Object-Relational Mapping in your entities.

Stateful

The VLINGO/SYMBIO components provide simple key-value persistence that works over several different databases. This is called the StateStore. In addition to all supported relational (JDBC) databases, there is also support for Amazon DynamoDB and Apache Geode.

Database             Use
-------------------  ----------
DynamoDB             Production
Apache Geode         Production
HSQLDB (in-memory)   Testing
In-Memory            Testing
MariaDB              Production
MySQL                Production
PostgreSQL           Production
YugaByte             Production

Below you will see how to use stateful entities.

Sourced

The word "source" conveys the idea of an official origin, in this case of a record. The record may be a fact, such as is captured by a DomainEvent. The record may instead be an imperative directive to carry out an operation, known as a Command.

A sourced entity is one that owns an ordered collection of records that each indicate a discrete change that was made to the state of that entity. This ordered collection may be likened to an accounting ledger, with one row per change to an account. The changes in a ledger are either debits or credits to the account. Starting from a balance of zero, a credit must be added to the ledger to provide the first available funds. From there the next entry may be another credit, or a debit. Over time there are a number of entries which can be used to derive the account's balance. Starting from the first entry in the ledger, add credits and subtract debits in the order in which they occurred. After applying the addition or subtraction of the last entry in the ledger you know the account's balance.
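The ledger arithmetic can be written as a simple fold over the ordered entries. The following is a minimal sketch in plain Java; the names LedgerSketch, Entry, and balanceOf are hypothetical and exist only to illustrate the idea.

```java
import java.util.List;

// Hypothetical sketch of deriving a balance from ordered ledger entries.
final class LedgerSketch {
  enum Kind { CREDIT, DEBIT }

  static final class Entry {
    final Kind kind;
    final long amountInCents;

    private Entry(final Kind kind, final long amountInCents) {
      this.kind = kind;
      this.amountInCents = amountInCents;
    }

    static Entry credit(final long amountInCents) {
      return new Entry(Kind.CREDIT, amountInCents);
    }

    static Entry debit(final long amountInCents) {
      return new Entry(Kind.DEBIT, amountInCents);
    }
  }

  // Starts from zero and applies every entry in the order it occurred.
  static long balanceOf(final List<Entry> entries) {
    long balance = 0;
    for (final Entry entry : entries) {
      balance += entry.kind == Kind.CREDIT ? entry.amountInCents : -entry.amountInCents;
    }
    return balance;
  }
}
```

The balance is never stored as such; it is always derivable from the entries themselves.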

Now consider that you have a similar ledger, but made up of a sequence of DomainEvent instances. Starting with the first DomainEvent apply its attributes to the state of the entity that owns the DomainEvent. Do the same for every subsequent DomainEvent until there are no more. At the end you know the current state of the entity.

The sequence in which the discrete records were produced is strictly maintained because both the records and the order in which they occurred are required to reproduce the accurate and valid state of the entity.

Continuing with the above e-commerce example, the following may be the ordered collection of events for a shopping cart.

Index  Event Type
-----  ------------------------------------------------
1      com.ecommrus.model.cart.CartCreated
2      com.ecommrus.model.cart.ProductPlacedInCart
3      com.ecommrus.model.cart.ProductQuantityIncreased
4      com.ecommrus.model.cart.ProductPlacedInCart
5      com.ecommrus.model.cart.ProductRemovedFromCart

The Index column indicates the order in which the event occurred on the given Cart entity. The Event Type column contains the name of the DomainEvent that occurred in the given order. Note that this table does not show the event data alongside the type; the actual Journal would include the data. Here's what the events indicate.

  1. The shopping cart was created on behalf of a user.

  2. The user placed a given product into the cart.

  3. The user increased the quantity that they require for the product from #2.

  4. The user placed an additional product into the cart.

  5. The user removed one of their two products from the cart. To know which of the two products was removed, the data of the event must be examined for the product id.
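Replaying the five events above to arrive at the cart's current contents might look like the following sketch. It uses plain Java rather than the VLINGO types, and the event fields (cartId, sku, by) are assumptions, since the table does not show the event data.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: folds an ordered event stream into sku -> quantity.
final class CartReplaySketch {
  interface CartEvent { }

  static final class CartCreated implements CartEvent {
    final String cartId;
    CartCreated(final String cartId) { this.cartId = cartId; }
  }

  static final class ProductPlacedInCart implements CartEvent {
    final String sku;
    ProductPlacedInCart(final String sku) { this.sku = sku; }
  }

  static final class ProductQuantityIncreased implements CartEvent {
    final String sku;
    final int by;
    ProductQuantityIncreased(final String sku, final int by) { this.sku = sku; this.by = by; }
  }

  static final class ProductRemovedFromCart implements CartEvent {
    final String sku;
    ProductRemovedFromCart(final String sku) { this.sku = sku; }
  }

  // Applies each event in stream order; the order matters for the result.
  static Map<String, Integer> replay(final List<CartEvent> stream) {
    final Map<String, Integer> quantities = new LinkedHashMap<>();
    for (final CartEvent event : stream) {
      if (event instanceof ProductPlacedInCart) {
        quantities.put(((ProductPlacedInCart) event).sku, 1);
      } else if (event instanceof ProductQuantityIncreased) {
        final ProductQuantityIncreased increased = (ProductQuantityIncreased) event;
        quantities.merge(increased.sku, increased.by, Integer::sum);
      } else if (event instanceof ProductRemovedFromCart) {
        quantities.remove(((ProductRemovedFromCart) event).sku);
      }
      // CartCreated adds no products, so the map stays empty for it.
    }
    return quantities;
  }
}
```

Replaying only the first three events answers the cart as it was at that point in time, which is one reason the ordered stream is kept intact.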

As noted previously the sequence of records may be specializations of DomainEvent and Command, but are more commonly of type DomainEvent. Thus, an entity that is sourced by a sequence of DomainEvent instances is Event Sourced. Assuming that the sequence of entries in the collection are DomainEvent types, this collection is known as the entity's event stream.

Each entity's event stream is persisted into a Journal. This Journal is responsible for maintaining two kinds of streams. One kind of stream is the individual stream of each entity. If you have 1 million Event Sourced entity instances in your Bounded Context then the Journal will have 1 million individual streams. The second kind of stream is the totally ordered entries of all entities in the entire Bounded Context. That is, as each entity appends new entries into its own stream, those entries are also indexed in a totally ordered stream of all entries. Thus, if each entity has an average of five entries in its individual stream, the totally ordered stream of all entries is 5 million total.

The Journal is append only. Once an entry has been appended it may not be (physically) deleted/removed. Further, every discrete entry is immutable.
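The two kinds of streams and the append-only rule can be pictured with a small in-memory sketch. This is not the VLINGO/SYMBIO Journal interface; JournalSketch, Entry, append, streamOf, and allEntries are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an append-only journal with two kinds of streams.
final class JournalSketch {
  // Immutable entry: all fields are final and set once at append time.
  static final class Entry {
    final long globalIndex;   // position in the totally ordered stream (1-based)
    final String streamName;  // the owning entity's stream
    final int streamVersion;  // position in the entity's own stream (1-based)
    final String type;

    Entry(final long globalIndex, final String streamName, final int streamVersion, final String type) {
      this.globalIndex = globalIndex;
      this.streamName = streamName;
      this.streamVersion = streamVersion;
      this.type = type;
    }
  }

  private final List<Entry> all = new ArrayList<>();
  private final Map<String, List<Entry>> streams = new HashMap<>();

  // Appending is the only mutation offered; there is no update or delete.
  synchronized Entry append(final String streamName, final String type) {
    final List<Entry> stream = streams.computeIfAbsent(streamName, name -> new ArrayList<>());
    final Entry entry = new Entry(all.size() + 1, streamName, stream.size() + 1, type);
    stream.add(entry);
    all.add(entry);
    return entry;
  }

  // Read-only copy of one entity's stream.
  synchronized List<Entry> streamOf(final String streamName) {
    final List<Entry> stream = streams.get(streamName);
    return stream == null
        ? Collections.<Entry>emptyList()
        : Collections.unmodifiableList(new ArrayList<>(stream));
  }

  // Read-only copy of the totally ordered stream of all entries.
  synchronized List<Entry> allEntries() {
    return Collections.unmodifiableList(new ArrayList<>(all));
  }
}
```

Each appended entry receives both a position in its entity's stream and a position in the total order, so readers may follow either stream.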

The following table shows the databases currently supporting Journal.

Database             Use
-------------------  ------------
FoundationDB         Experimental
HSQLDB (in-memory)   Testing
In-Memory            Testing
MariaDB              Production
MySQL                Production
PostgreSQL           Production
YugaByte             Experimental

Below you will see how to use event sourced and command sourced entities.

CQRS

The Command-Query Responsibility Segregation pattern is used to optimize the different kinds of operations common with domain models, that of writing and reading data. Because these two kinds of operations tend to have different drivers, the ways that data is structured for one doesn't lend itself to supporting the other. Thus, the optimizations are accomplished by separating or segregating the two kinds of operations.

The writes or modifications to system state are accomplished by executing commands against one model of the domain known as the Command Model (Write Model). This model is limited to handling modifications to the state of the system. If you want to effect changes to the data, you use the Command Model.

The reads of the system state are made through queries on another model, known as the Query Model (Read Model). This model is optimized specifically for the kinds of views of the data that the user needs displayed for information consumption and decision making.

If you think about a traditional model interface, such as that for a Product, there are two kinds of operations. One kind modifies the Product data, and the other kind queries the Product data. You may expect such a Product model to look like this.

package com.ecommrus.catalog.model;
public interface Product {
  void changeSku(final SKU sku);
  SKU querySku();
  void rename(final String name);
  String queryName();
  void summarize(final String summary);
  String querySummary();
  void describe(final String description);
  String queryDescription();
  ...
}

Although there is more detail to the Product interface, this is all we need to make the essential point about CQRS. This interface combines both commands and queries. The changeSku() and rename() are command methods, which are used to modify the sku and name, respectively. The querySku() and queryName() are used to read the current state of the Product sku and name, respectively.

This seems to be designed appropriately. The problem is that when the user needs to view the Product they also need to view the Pricing, Availability, SimilarProducts, ProductReviews, and possibly other information associated with a given Product. Considering that there is a minimum of five objects that must be retrieved for that user view, it will certainly be quite difficult and inefficient to query each piece of data separately.

With motivations to optimize for the above use case, we decide to design our Command Model and our Query Model separately. Thus, the Product in the Command Model is refactored as follows, with command operations only.

package com.ecommrus.catalog.command;
public interface Product {
  void changeSku(final SKU sku);
  void rename(final String name);
  void summarize(final String summary);
  void describe(final String description);
  ...
}

The Query Model is designed far differently, and is optimized for the catalog of viewable ProductData instances.

package com.ecommrus.catalog.query;
public class ProductData {
  public final String sku;
  public final String name;
  public final String summary;
  public final String description;
  ...
  public final AvailabilityData availability;
  public final PricingData pricing;
  public final ProductReviewsData productReviews;
  public final SimilarProductsData similarProducts;
}

This ProductData object composes much more than the basic Product data. It includes all the data that the user view requires on one convenient object.

Now, an essential point is that all of the data assembled in ProductData was queried in one batch while using no database joins. That's because the ProductData was projected into a single document with the unique sku as the key. This document was built up over time by projecting new and modified state from the Command Model into a single state.
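How such a document gets built up over time can be sketched as follows. This is a deliberately simplified, in-memory illustration and not the VLINGO/LATTICE projection API; ProductProjectionSketch, project, and queryBySku are hypothetical names, and each document is reduced to a map of attribute names to String values.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one document per sku, merged from Command Model changes.
final class ProductProjectionSketch {
  private final Map<String, Map<String, String>> documents = new HashMap<>();

  // Merges one new or modified attribute into the document keyed by sku.
  void project(final String sku, final String attribute, final String value) {
    documents.computeIfAbsent(sku, key -> new TreeMap<>()).put(attribute, value);
  }

  // A single key lookup answers everything the view needs; no joins are involved.
  Map<String, String> queryBySku(final String sku) {
    final Map<String, String> document = documents.get(sku);
    return document == null
        ? Collections.<String, String>emptyMap()
        : Collections.unmodifiableMap(document);
  }
}
```

The cost of assembling the view is paid once, at write time, each time something changes; reads then reduce to fetching one document by key.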

Generally speaking, you will likely want to use the StateStore for query/read model persistence since it provides simple key-value persistence over several database types. Doing so, your projections may be supported using StatefulEntity.

A later subsection discusses Projections. For more information about the CQRS approach, see Really Simple CQRS.

Entity Types

ObjectEntity, StatefulEntity, and EventSourced are abstract base classes used in developing DDD models. The ObjectEntity supports various forms of relational database mapping, including ORM as in JPA and Hibernate, and Jdbi (a very thin wrapper around JDBC that maps with less pain). You can read more about those above. The StatefulEntity doesn't provide mapping, only a key-value store with the aggregate state as a CLOB/BLOB value. This enables using many different kinds of NoSQL storage. The EventSourced supports Event Sourcing. Consider the following simple usage examples.

ObjectEntity Example

An actor that serves as a domain model entity may be designed extending the ObjectEntity type. The ObjectEntity should implement a protocol that defines the behavior available through message sending and delivery.

CartEntity is an ObjectEntity extender, which means its state is OR mapped.

Here is a CartEntity that extends ObjectEntity and implements the Cart protocol.

import io.vlingo.lattice.model.object.ObjectEntity;
public class CartEntity extends ObjectEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public CartEntity(String cartId, String userId) {
    this.cartState = CartState.initial(cartId, userId);
  }
}

An ObjectEntity should receive its unique identity by means of its constructor, making the identity available during its initialization. If the entity is preexisting, the identity is used during actor startup to reconstitute its state from the ObjectStore.

The following shows how a command message is handled and how the state and event are persisted using an ObjectStore via the ObjectEntity.

import io.vlingo.lattice.model.object.ObjectEntity;
public class CartEntity extends ObjectEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public void placeProductInCart(Product product) {
    // create the event that records the fact, then apply it with the new state
    final ProductPlacedInCart productPlacedInCart =
      ProductPlacedInCart.with(product.sku, product.description, product.price);
    apply(cartState.withSelected(product), productPlacedInCart);
  }
  ...
}

The ObjectEntity provides a means to apply() a new state and concrete DomainEvent instances. In the above example a new CartState instance and a ProductPlacedInCart are applied together. The apply() causes four processing steps.

  1. The CartEntity message processing is temporarily suspended until steps 2 and 3 are completed.

  2. The new CartState and the ProductPlacedInCart event are asynchronously and atomically persisted to the backing ObjectStore.

  3. When the persistence is confirmed, the overridden stateObject(CartState) method is called to replace the current cartState with the new value. You can see this overridden method in the following code snippet.

  4. The CartEntity message processing is restored, allowing subsequent messages, such as commands, to be handled.

The ObjectEntity concrete type may not process its next messages until the current state transition and any events are fully applied. The same entity's state must not transition until the new state's persistence is confirmed.

The above steps 1-4 are processed asynchronously; that is, the entity does not block a thread while these steps are carried out.
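The suspend, persist, confirm, and resume sequence can be modeled in a few lines of plain Java. The sketch below is not the ObjectEntity implementation; it is a single-threaded simulation in which a hypothetical DeferredStore confirms persistence only when confirmNext() is called, standing in for an asynchronous store. ApplySketch, Entity, deliver, and the String-typed state and event are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the four apply() processing steps.
final class ApplySketch {
  // Confirms each write later, the way an asynchronous store would.
  static final class DeferredStore {
    private final Queue<Runnable> pendingConfirmations = new ArrayDeque<>();
    final List<String> persisted = new ArrayList<>();

    void persist(final String state, final String event, final Runnable onConfirmed) {
      pendingConfirmations.add(() -> {
        persisted.add(state + "+" + event); // state and event are recorded together
        onConfirmed.run();
      });
    }

    void confirmNext() {
      final Runnable confirmation = pendingConfirmations.poll();
      if (confirmation != null) {
        confirmation.run();
      }
    }
  }

  static final class Entity {
    private final DeferredStore store;
    private final Queue<Runnable> stashed = new ArrayDeque<>();
    private boolean suspended;
    String state = "empty";

    Entity(final DeferredStore store) {
      this.store = store;
    }

    // Messages that arrive during a transition wait; nothing blocks.
    void deliver(final Runnable message) {
      if (suspended) {
        stashed.add(message);
      } else {
        message.run();
      }
    }

    void apply(final String newState, final String event) {
      suspended = true;                       // step 1: suspend message processing
      store.persist(newState, event, () -> {  // step 2: persist state and event
        state = newState;                     // step 3: set the confirmed state
        suspended = false;                    // step 4: restore message processing
        resume();
      });
    }

    private void resume() {
      // Stops again as soon as a stashed message starts another transition.
      while (!suspended && !stashed.isEmpty()) {
        stashed.poll().run();
      }
    }
  }
}
```

A second command delivered before the first confirmation is stashed rather than run, so it always observes the state produced by the first.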

Every concrete ObjectEntity must override a few methods.

import io.vlingo.lattice.model.object.ObjectEntity;
public class CartEntity extends ObjectEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  @Override
  protected String id() {
    return cartState.cartId;
  }
  @Override
  protected CartState stateObject() {
    return cartState;
  }
  @Override
  protected void stateObject(final CartState stateObject) {
    this.cartState = stateObject;
  }
  @Override
  protected Class<CartState> stateObjectType() {
    return CartState.class;
  }
}

These methods support the ObjectEntity abstract base in interacting with the ObjectStore on behalf of the concrete CartEntity.

  • The id() answers the String unique identity of the CartEntity.

  • The stateObject() answers the CartEntity current CartState value.

  • The stateObject(CartState) provides a new state to replace the previous state. This is called following the restoration of the CartState from persistence when the CartEntity is being (re)started from an existing state, and following the persistence of a new state and zero or more events.

  • The stateObjectType() answers the concrete state type, such as the Class<CartState>.

Read the API documentation for more details.

ObjectTypeRegistry

In order to introduce your concrete entity types to the underlying ObjectStore persistence mechanism that is used by the ObjectEntity abstract base, you must register each one with the ObjectTypeRegistry.

import io.vlingo.lattice.model.object.ObjectTypeRegistry;
import io.vlingo.symbio.store.object.StateObjectMapper;
// the registry is scoped to the World used by the service
ObjectTypeRegistry registry = new ObjectTypeRegistry(world);
// maps CartState for persistence (insert/update) and for queries
StateObjectMapper cartStateMapper =
  StateObjectMapper.with(
    CartState.class,
    JdbiPersistMapper.with(
      insertQuery,
      updateQuery,
      SqlStatement::bindFields),
    new CartStateMapper());
String cartQuery = "SELECT FROM TBL_CARTS ...";
Info<Cart> cartInfo =
  new Info<>(
    objectStore,
    CartState.class,
    "Cart-Database",
    MapQueryExpression.using(
      Cart.class,
      cartQuery,
      MapQueryExpression.map("cartId", "")),
    cartStateMapper);
registry.register(cartInfo);

The above code registers a mapper for the Cart and CartState using a Jdbi mapping.

  1. The ObjectTypeRegistry is created within the World that is used by the service.

  2. The StateObjectMapper is created for the CartState.

  3. An ObjectTypeRegistry.Info instance is created, which holds the components necessary to persist and query a CartState via the Cart implementor CartEntity. The Info holds: (a) the ObjectStore that persists CartState, (b) the type that is persisted, in this case CartState.class, (c) an identifying name given to the store, (d) a QueryExpression, in this case a MapQueryExpression, that uses the cartQuery that takes as a parameter cartId, and (e) the previously created StateObjectMapper for CartState instances.

  4. The ObjectTypeRegistry is then used to register the Info<Cart>, enabling these types to be supported by the vlingo/lattice ObjectEntity.

You must register such an ObjectTypeRegistry.Info for every model type and state type, as was done above for Cart and CartState, respectively.

StatefulEntity Example

An actor that serves as a domain model entity may be designed extending the StatefulEntity type. The StatefulEntity should implement a protocol that defines the behavior available through message sending and delivery.

CartEntity is a StatefulEntity extender, meaning its state is stored as a key-value/document.

Here is a CartEntity that extends StatefulEntity and implements the Cart protocol.

import io.vlingo.lattice.model.stateful.StatefulEntity;
public class CartEntity extends StatefulEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public CartEntity(String cartId, String userId) {
    this.cartState = CartState.initial(cartId, userId);
  }
}

A StatefulEntity should receive its unique identity by means of its constructor, making the identity available during its initialization. If the entity is preexisting, the identity is used during actor startup to reconstitute its state from the StateStore.

The following shows how a command is handled by a StatefulEntity to persist the new state and any events using the underlying StateStore. Notice that the interface for applying the state and event is identical to that of the ObjectEntity.

import io.vlingo.lattice.model.stateful.StatefulEntity;
public class CartEntity extends StatefulEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  public void placeProductInCart(Product product) {
    // create the event that records the fact, then apply it with the new state
    final ProductPlacedInCart productPlacedInCart =
      ProductPlacedInCart.with(product.sku, product.description, product.price);
    apply(cartState.withSelected(product), productPlacedInCart);
  }
  ...
}

The StatefulEntity provides a means to apply() a new state and concrete DomainEvent instances. In the above example a new CartState instance and a ProductPlacedInCart are applied together. The apply() causes four processing steps.

  1. The CartEntity message processing is temporarily suspended until steps 2 and 3 are completed.

  2. The new CartState and the ProductPlacedInCart event are asynchronously and atomically persisted to the backing StateStore.

  3. When the persistence is confirmed, the overridden state(CartState) method is called to replace the current cartState with the new value. You can see this overridden method in the following code snippet.

  4. The CartEntity message processing is restored, allowing subsequent messages, such as commands, to be handled.

The StatefulEntity concrete type may not process its next messages until the current state transition and any events are fully applied. The same entity's state must not transition until the new state's persistence is confirmed.

The above steps 1-4 are processed asynchronously; that is, the entity does not block a thread while these steps are carried out.

Every concrete StatefulEntity must override a few methods.

import io.vlingo.lattice.model.DomainEvent;
import io.vlingo.lattice.model.stateful.StatefulEntity;
public class CartEntity extends StatefulEntity<CartState> implements Cart {
  private CartState cartState;
  ...
  @Override
  protected String id() {
    return cartState.cartId;
  }
  @Override
  protected void state(final CartState state) {
    this.cartState = state;
  }
  @Override
  protected Class<CartState> stateType() {
    return CartState.class;
  }
  @Override
  protected <C> Tuple3<CartState,List<Source<DomainEvent>>,String> whenNewState() {
    // answer a Tuple3 if the context is an initial state; otherwise null
    if (cartState.isInitial()) {
      return Tuple3.of(cartState, new CartCreated(...), "created");
    }
    return null;
  }
  ...
}

These methods support the StatefulEntity abstract base in interacting with the StateStore on behalf of the concrete CartEntity.

  • The id() answers the String unique identity of the CartEntity.

  • The state(CartState) provides a new state to replace the previous state. This is called following the restoration of the CartState from persistence when the CartEntity is being (re)started from an existing state, and following the persistence of a new state and zero or more events.

  • The stateType() answers the concrete state type, such as the Class<CartState>.

  • The whenNewState() is called each time the CartEntity actor is started. On the initial start just following the first ever construction of the entity, this method offers the opportunity to provide the initial state to be applied. The offer may be ignored by not overriding this method, as the default answers null. If this method is called at any time other than the first ever construction of the entity, it must answer null.

Read the API documentation for more details.

StatefulTypeRegistry

In order to introduce your concrete entity types to the underlying StateStore persistence mechanism that is used by the StatefulEntity abstract base, you must register each one with the StatefulTypeRegistry.

import io.vlingo.lattice.model.stateful.StatefulTypeRegistry;
import io.vlingo.symbio.store.state.dynamodb.DynamoDBStateActor;
import io.vlingo.symbio.store.state.StateStore;
// create the StateStore actor, then register each state type against it
StateStore store = world.actorFor(StateStore.class, DynamoDBStateActor.class, dispatcher);
StatefulTypeRegistry registry = new StatefulTypeRegistry(world);
registry.register(new Info<>(store, CartState.class, CartState.class.getSimpleName()));

You must register such a StatefulTypeRegistry.Info for every state type, as was done above for CartState.

EventSourced Example

An actor that serves as a domain model entity may be designed extending the EventSourced type. The EventSourced should implement a protocol that defines the behavior available through message sending and delivery.

CartEntity is an EventSourced extender, meaning its events are stored as a stream in a journal.

Here is a CartEntity that extends EventSourced and implements the Cart protocol.

import io.vlingo.lattice.model.sourcing.EventSourced;
public class CartEntity extends EventSourced implements Cart {
  private CartState cartState;
  ...
  public CartEntity(String cartId, String userId) {
    this.cartState = CartState.initial(cartId, userId);
  }
}

An EventSourced should receive its unique identity by means of its constructor, making the identity available during its initialization. If the entity is preexisting, the identity is used during actor startup to reconstitute its state from the Journal.

The following shows how a command is handled by an EventSourced to persist events using the underlying Journal. Notice that the interface for applying the event(s) is different from the ObjectEntity and StatefulEntity.

import io.vlingo.lattice.model.sourcing.EventSourced;
public class CartEntity extends EventSourced implements Cart {
  private CartState cartState;
  ...
  public void placeProductInCart(Product product) {
    // only the event is applied; the state is derived from the events
    final ProductPlacedInCart productPlacedInCart =
      ProductPlacedInCart.with(product.sku, product.description, product.price);
    apply(productPlacedInCart);
  }
  ...
}

In the case of EventSourced the state is not automatically persisted because it is not generally needed. The events themselves represent the state of the cart, as explained above. You'll next see how the state is persisted if the conditions exist to justify that.

Every concrete EventSourced must override at least one of a few methods.

import io.vlingo.lattice.model.sourcing.EventSourced;
public class CartEntity extends EventSourced implements Cart {
  private CartState cartState;
  ...
  @Override
  protected String streamName() {
    return cartState.cartId;
  }
  ...
}

In the above example the CartEntity must provide its streamName, which is the unique identity of the CartState. The streamName must always be a String, but this may be a String representation of another type. For example, the underlying id type may be a long or a UUID, both of which can have String representations.

@Override
protected String streamName() {
  // from long to String
  return String.valueOf(cartState.cartId);
}
@Override
protected String streamName() {
  // from UUID to String
  return cartState.cartId.toString();
}

In the case of EventSourced the implementor will be asked if they would like to include a snapshot of the state at any given apply(). The concrete EventSourced, in this case CartEntity, may do so using the following override.

@Override
protected CartState snapshot() {
  return cartState;
}

Notice however that this example unconditionally provides a CartState as a snapshot in all cases. This is generally not desirable because it is probably not necessary. In fact snapshots are generally only useful when a given entity may have many, many events associated with it. In such cases it is best to use a heuristic metric to determine when a snapshot is necessary, where the metric is based on some point where performance starts to degrade with many events.

For example, say that you have an entity that could have thousands of items in it. This might be a large order being populated with many products from a warehouse. To solve the overhead of loading all events from the Journal, you may decide to take a snapshot at every 250th event.

@Override
protected OrderState snapshot() {
return currentVersion() % 250 == 0 ? state : null;
}

The currentVersion() provides a 1-based version or sequence number of the next event to be appended to the Journal; that is, the first event currently being applied. In the above example the snapshot of the OrderState will only be provided every 250th event. Otherwise null is answered to prevent a snapshot from being persisted along with the event(s).
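To make the every-250th-event heuristic concrete, the following self-contained sketch models only the snapshot decision. The SnapshotPolicy class and its method names are hypothetical and are not part of VLINGO/LATTICE; in a real entity the version would come from currentVersion().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of VLINGO/LATTICE: models only the snapshot decision.
final class SnapshotPolicy {
  private final int interval;

  SnapshotPolicy(int interval) {
    if (interval <= 0) throw new IllegalArgumentException("interval must be positive");
    this.interval = interval;
  }

  // True when the 1-based version lands on a snapshot boundary.
  boolean shouldSnapshot(int version) {
    return version > 0 && version % interval == 0;
  }

  // Versions in [1, lastVersion] at which a snapshot would be taken.
  List<Integer> snapshotVersions(int lastVersion) {
    List<Integer> versions = new ArrayList<>();
    for (int version = 1; version <= lastVersion; version++) {
      if (shouldSnapshot(version)) versions.add(version);
    }
    return versions;
  }
}
```

With an interval of 250, an entity that has reached version 600 would have snapshots at versions 250 and 500. Assuming the store reads from the latest snapshot forward, reconstitution would need the snapshot taken at version 500 plus the 100 events that follow it, rather than all 600 events.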

The EventSourced base class has several optional overrides and tools. For example, if your streamName is a composite, you can simply override the corresponding method. Here the streamName is composed of a tenantId and a productId, with the segments separated by ":".

@Override
protected String streamName() {
// composite stream name
return streamNameFrom(":", state.tenantId, state.productId);
}
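As an illustration of what composing a stream name from segments involves, here is a minimal stand-alone sketch. The StreamNames class and its compose() method are hypothetical; the actual streamNameFrom() may behave differently, for example it may not validate segments at all.

```java
// Hypothetical stand-in for a composite-name helper; the real streamNameFrom() may differ.
final class StreamNames {
  private StreamNames() { }

  // Joins the segments with the separator, rejecting empty segments and
  // segments that contain the separator (which would make the name ambiguous).
  static String compose(String separator, String... segments) {
    for (String segment : segments) {
      if (segment == null || segment.isEmpty()) {
        throw new IllegalArgumentException("segment must not be empty");
      }
      if (segment.contains(separator)) {
        throw new IllegalArgumentException("segment must not contain separator: " + segment);
      }
    }
    return String.join(separator, segments);
  }
}
```

The validation is a design choice of this sketch: if a segment could itself contain the separator, two different composites could map to the same stream name.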

Another useful tool is asList(), which places multiple events into a List<DomainEvent> that may be passed to the respective apply() methods. For example, it's possible that a shopping cart is created by the user placing the currently viewed product into a new cart.

public void createCartWithFirstProduct(...) {
apply(asList(new CartCreated(...), new ProductPlacedInCart(...)));
}

The EventSourced provides a means to apply() one or more concrete DomainEvent instances. In the above example a new CartCreated and ProductPlacedInCart are applied together. The apply() causes four processing steps.

  1. The CartEntity message processing is temporarily suspended until steps 2 and 3 are completed.

  2. The new CartCreated and ProductPlacedInCart events are asynchronously and atomically appended to the backing Journal.

  3. When the persistence is confirmed, a registered event-specific method is called, once for each applied event, to transition the current cartState with the new value. You can see examples in the following code snippet.

  4. The CartEntity message processing is restored, allowing subsequent messages, such as commands, to be handled.

The EventSourced concrete type may not process its next messages until the events are fully applied. The same entity's state must not transition until the new state's persistence is confirmed.

The above steps 1-4 are processed asynchronously; that is, the entity does not block a thread while these steps are carried out.
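The ordering of the four steps can be illustrated with a single-threaded simulation. This is only a sketch under stated assumptions: PendingJournal and SuspendingEntity are hypothetical classes, events are plain strings, and the real implementation relies on actor mailboxes rather than the simple queue used here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory journal: holds a batch until the caller confirms it.
final class PendingJournal {
  final List<String> appended = new ArrayList<>();
  private Runnable pendingConfirmation;

  void appendAll(List<String> events, Runnable onConfirmed) {
    pendingConfirmation = () -> {
      appended.addAll(events);   // the whole batch becomes visible at once
      onConfirmed.run();
    };
  }

  // Simulates the store confirming the append at some later time.
  void confirm() {
    Runnable confirmation = pendingConfirmation;
    pendingConfirmation = null;
    if (confirmation != null) confirmation.run();
  }
}

// Hypothetical entity showing the suspend, append, transition, resume order.
final class SuspendingEntity {
  final List<String> state = new ArrayList<>();   // events folded into state so far
  private final Queue<Runnable> stashed = new ArrayDeque<>();
  private final PendingJournal journal;
  private boolean suspended;

  SuspendingEntity(PendingJournal journal) {
    this.journal = journal;
  }

  // Delivers a message; while suspended it is stashed rather than processed.
  void deliver(Runnable message) {
    if (suspended) stashed.add(message); else message.run();
  }

  void apply(List<String> events) {
    suspended = true;                     // step 1: suspend message processing
    journal.appendAll(events, () -> {     // step 2: append the batch
      state.addAll(events);               // step 3: transition state after confirmation
      suspended = false;                  // step 4: resume and drain stashed messages
      while (!suspended && !stashed.isEmpty()) stashed.poll().run();
    });
  }
}
```

A message delivered between apply() and confirm() is not processed until the append is confirmed, so it always observes the fully transitioned state.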

Every concrete EventSourced must register an event-specific method that transitions from the current state to a new state for each newly applied event.

import io.vlingo.lattice.model.sourcing.EventSourced;
public class CartEntity extends EventSourced implements Cart {
private CartState cartState;
...
static {
registerConsumer(CartEntity.class, CartCreated.class, CartEntity::applyCartCreated);
registerConsumer(CartEntity.class, ProductPlacedInCart.class, CartEntity::applyProductPlacedInCart);
...
}
private void applyCartCreated(CartCreated event) {
cartState = CartState.initial(event.cartId, event.userId);
}
private void applyProductPlacedInCart(ProductPlacedInCart event) {
Product product = new Product(event.productId, event.quantity, event.price);
cartState = cartState.withProduct(product);
}
...
}

Following the confirmation of the CartCreated event being persisted, the EventSourced base class calls applyCartCreated() with the CartCreated instance. This gives the CartEntity the opportunity to transition the CartState to the new value, which in this case is its initial value.

Following the confirmation of the ProductPlacedInCart event being persisted, the EventSourced base class calls applyProductPlacedInCart() with the new ProductPlacedInCart instance. This gives the CartEntity the opportunity to transition the CartState to the new value to include the newly placed product.
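The idea of registering one consumer per event type, and of folding a stream of events into a state, can be sketched without any library. Everything in this block is hypothetical and simplified: FoldingCart, CartOpened, and ItemPlaced are illustrative names, and the register() method only resembles registerConsumer() in spirit.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical events; plain classes so the sketch has no library dependency.
final class CartOpened {
  final String cartId;
  CartOpened(String cartId) { this.cartId = cartId; }
}

final class ItemPlaced {
  final String productId;
  final int quantity;
  ItemPlaced(String productId, int quantity) {
    this.productId = productId;
    this.quantity = quantity;
  }
}

// Hypothetical entity with a consumer table keyed by event type.
final class FoldingCart {
  private static final Map<Class<?>, BiConsumer<FoldingCart, Object>> CONSUMERS = new HashMap<>();

  static <E> void register(Class<E> eventType, BiConsumer<FoldingCart, E> consumer) {
    CONSUMERS.put(eventType, (cart, event) -> consumer.accept(cart, eventType.cast(event)));
  }

  static {
    register(CartOpened.class, FoldingCart::whenOpened);
    register(ItemPlaced.class, FoldingCart::whenItemPlaced);
  }

  String cartId;
  int totalQuantity;

  // Folds the events, in order, into a new entity's state.
  static FoldingCart replay(List<Object> events) {
    FoldingCart cart = new FoldingCart();
    for (Object event : events) {
      BiConsumer<FoldingCart, Object> consumer = CONSUMERS.get(event.getClass());
      if (consumer == null) {
        throw new IllegalStateException("No consumer for " + event.getClass());
      }
      consumer.accept(cart, event);
    }
    return cart;
  }

  private void whenOpened(CartOpened event) { cartId = event.cartId; }

  private void whenItemPlaced(ItemPlaced event) { totalQuantity += event.quantity; }
}
```

The same table serves two purposes in this sketch: transitioning state after a newly applied event, and rebuilding state by replaying every stored event in order.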

Read the API documentation for more details on all possible overrides. Note that all of the above applies also to the CommandSourced, with the same API documentation.

SourcedTypeRegistry

In order to introduce your concrete entity types to the underlying Journal persistence mechanism that is used by the EventSourced abstract base, you must register each one with the SourcedTypeRegistry.

import io.vlingo.lattice.model.sourcing.SourcedTypeRegistry.Info;
import io.vlingo.symbio.store.common.jdbc.Configuration;
import io.vlingo.symbio.store.journal.Journal;
...
journal = world.actorFor(Journal.class, JDBCJournalActor.class, configuration);
registry = new SourcedTypeRegistry(world);
Info info = new Info(journal, CartEntity.class, CartEntity.class.getSimpleName());
registry.register(info);

You must register such a SourcedTypeRegistry.Info for every sourced entity type, as was done above for CartEntity.

Summary

In all cases, the new state and/or events are persisted asynchronously and safely replaced on the entity instance when reconstituted. The apply() handles all of that for you, without involving the aggregate designer in persistence details or technical naming. There are no race conditions, conflicting operation results, or persistence failures due to a database consistency violation.

Modeling State

In all of the above entity examples there is a single state object held by each of the entity types, the CartState type. This approach has several advantages.

  • There is only one variable to manage for all state, including one or more identities and all state that may transition over time.

  • Fewer state objects to manage require less ongoing cognition.

  • The state is immutable, which makes the state simpler to maintain and reason about.

  • The state has a side-effect-free, intention-revealing interface, resulting in states transitioning by being fully replaced.

  • The state transitions one operation at a time, with each transition clearly focused on a single reason for replacement.

Such a state object may be designed as follows.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public final class CartState {
public final String cartId;
public final List<Product> products;
public final String userId;
public static CartState initial(String cartId, String userId) {
return new CartState(cartId, userId);
}
public CartState withProduct(Product product) {
return new CartState(cartId, userId, place(product));
}
private CartState(String cartId, String userId) {
// start with an immutable empty list
this(cartId, userId, Collections.emptyList());
}
private CartState(String cartId, String userId, List<Product> products) {
this.cartId = cartId;
this.userId = userId;
this.products = products;
}
private List<Product> place(Product product) {
// copy the current products so that this instance is never modified
List<Product> placed = new ArrayList<>(this.products);
placed.add(product);
return Collections.unmodifiableList(placed);
}
}

Note that the instance variables are all declared public final, making accessor methods unnecessary because clients may directly access the immutable variables. A functional style of programming is employed, one that provides referential transparency. That is, a call to a function may be replaced by the value that the function produces. The benefit is the reduction of the cognitive overhead of the interface. Yet, this means that you must take care to design naturally mutable objects, such as List<Product>, as immutable. The above example employs Collections.unmodifiableList() to make the products immutable so that clients may not directly change the list without the CartState's knowledge.
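The effect of replacement-style transitions can be checked with a reduced, self-contained state type. MiniCartState is a hypothetical stand-in that holds product ids as strings; it is not the CartState of the examples above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, reduced state type used only to demonstrate replacement-style transitions.
final class MiniCartState {
  public final String cartId;
  public final List<String> productIds;

  static MiniCartState initial(String cartId) {
    return new MiniCartState(cartId, Collections.emptyList());
  }

  // Answers a new state; this instance is left untouched.
  MiniCartState withProduct(String productId) {
    List<String> copy = new ArrayList<>(productIds);
    copy.add(productId);
    return new MiniCartState(cartId, copy);
  }

  private MiniCartState(String cartId, List<String> productIds) {
    this.cartId = cartId;
    // Wrap every list so that no caller can mutate the state in place.
    this.productIds = Collections.unmodifiableList(productIds);
  }
}
```

Calling withProduct() on a state leaves the original with its previous contents, and any attempt by a client to call productIds.add() throws UnsupportedOperationException.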

You may easily add support for other concepts in the state type, such as collections of Coupon or DiscountCode instances, depending on your Ubiquitous Language. All such additions would focus on only the changes necessary to support the CartEntity commands.

StateAdapter and StateAdapterProvider

All store types, ObjectStore, StateStore, and Journal, serialize state objects to State<T> instances. Some stores must serialize state objects in order to persist them, while others do not. In the case of StateStore and Journal, the states are indeed serialized for the sake of persistence. On the other hand, the ObjectStore will always map state to a row and columns, and thus does not use a serialized State<T> to persist. Even so, ObjectStore will still always serialize the state to a State<T> for the purpose of post persistence processing. Therefore, every state type must use a StateAdapter.

import io.vlingo.common.serialization.JsonSerialization;
import io.vlingo.symbio.State.TextState;
import io.vlingo.symbio.StateAdapter;
import io.vlingo.symbio.Metadata;
public class CartStateAdapter implements StateAdapter<CartState, TextState> {
@Override
public int typeVersion() {
return 1;
}
@Override
public CartState fromRawState(final TextState raw) {
return JsonSerialization.deserialized(raw.data, CartState.class);
}
@Override
public <ST> ST fromRawState(final TextState raw, final Class<ST> stateType) {
return JsonSerialization.deserialized(raw.data, stateType);
}
@Override
public TextState toRawState(CartState state, int stateVersion, Metadata metadata) {
final String serialization = JsonSerialization.serialized(state);
return new TextState(state.cartId, CartState.class, typeVersion(), serialization, stateVersion, metadata);
}
@Override
public TextState toRawState(final String id, final CartState state, final int stateVersion, final Metadata metadata) {
final String serialization = JsonSerialization.serialized(state);
return new TextState(id, CartState.class, typeVersion(), serialization, stateVersion, metadata);
}
}

Initially it is not necessary to create a StateAdapter<S,E> during early development if you are using the text edition of a given store, such as ObjectStore<String>, StateStore<String>, or Journal<String>. Instead you may benefit from the DefaultTextStateAdapter. This default adapter is used when no adapter is registered for a given state type. However, when state types change, you must definitely implement adapters for the changing types. In this case the adapters play an important role of upgrading, referred to as upcasting, the persisted State<T> to the new version of the state type.

See: io.vlingo.symbio.DefaultTextStateAdapter
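Upcasting can be illustrated with a small library-free sketch. All names here are hypothetical: RawText is not the VLINGO State<T>, the flat key=value text stands in for JSON, and the currency field with its "USD" default is invented purely to show a version 1 record being read as version 2.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical raw form: a type version plus flat key=value text. Not the VLINGO State<T>.
final class RawText {
  final int typeVersion;
  final String data;
  RawText(int typeVersion, String data) {
    this.typeVersion = typeVersion;
    this.data = data;
  }
}

// Hypothetical version 2 of a state type; the currency field did not exist in version 1.
final class CartStateV2 {
  final String cartId;
  final String currency;
  CartStateV2(String cartId, String currency) {
    this.cartId = cartId;
    this.currency = currency;
  }
}

final class UpcastingAdapter {
  static final int CURRENT_TYPE_VERSION = 2;

  // Reads any persisted version and answers the current version of the state.
  static CartStateV2 fromRaw(RawText raw) {
    Map<String, String> fields = parse(raw.data);
    if (raw.typeVersion < 2) {
      // assumed default for states written before version 2
      fields.putIfAbsent("currency", "USD");
    }
    return new CartStateV2(fields.get("cartId"), fields.get("currency"));
  }

  // Always writes the current type version.
  static RawText toRaw(CartStateV2 state) {
    String data = "cartId=" + state.cartId + ";currency=" + state.currency;
    return new RawText(CURRENT_TYPE_VERSION, data);
  }

  private static Map<String, String> parse(String data) {
    Map<String, String> fields = new LinkedHashMap<>();
    for (String pair : data.split(";")) {
      int at = pair.indexOf('=');
      if (at > 0) fields.put(pair.substring(0, at), pair.substring(at + 1));
    }
    return fields;
  }
}
```

The adapter is the only place that knows about older versions. The rest of the model sees the current state type regardless of which version was persisted.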

To make the adapters available to stores, use the StateAdapterProvider.

import io.vlingo.symbio.StateAdapterProvider;
StateAdapterProvider stateAdapterProvider =
StateAdapterProvider.instance(world);
stateAdapterProvider.registerAdapter(
CartState.class, new CartStateAdapter());

EntryAdapter and EntryAdapterProvider

When using domain events, Source<T> types, such as concrete DomainEvent types, may be persisted to a store database. These DomainEvent instances, for example CartCreated and ProductAddedToCart, must be serialized to a form that can be inserted into the underlying database as instances of Entry<T>. Two types of Entry<T> are supported, TextEntry and BinaryEntry. The TextEntry is backed by a String, which is appropriate for serializing with JSON. The BinaryEntry is backed by a byte[], which may be used to hold the result of binary serialization (e.g. Protobuf and Avro).

It is strongly advised to never use Java default serialization. If you are unaware why, google for "problems with Java serialization".
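The difference between a String-backed and a byte[]-backed form can be seen in a small round-trip sketch. The QuantityChanged event and the EntryForms class are hypothetical, the text layout is a trivial delimited string rather than JSON, and the binary layout is written field by field with DataOutputStream, which is explicit encoding and not Java default serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical event used only for this sketch.
final class QuantityChanged {
  final String productId;
  final int quantity;
  QuantityChanged(String productId, int quantity) {
    this.productId = productId;
    this.quantity = quantity;
  }
}

final class EntryForms {
  // Text form: a String, here a trivial delimited layout instead of JSON.
  static String toText(QuantityChanged event) {
    return event.productId + "|" + event.quantity;
  }

  static QuantityChanged fromText(String text) {
    int at = text.lastIndexOf('|');
    return new QuantityChanged(text.substring(0, at), Integer.parseInt(text.substring(at + 1)));
  }

  // Binary form: a byte[] written field by field (not java.io.Serializable).
  static byte[] toBinary(QuantityChanged event) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(event.productId);
      out.writeInt(event.quantity);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static QuantityChanged fromBinary(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      return new QuantityChanged(in.readUTF(), in.readInt());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In both forms the reader must know the layout that the writer used, which is why a type version is stored alongside the data.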

Adapters are created by implementing EntryAdapter<S,E>. These adapt Source<T> instances to Entry<T> instances, and Entry<T> instances to Source<T> instances. For example, CartCreated instances may be adapted to TextEntry instances, and the same TextEntry instances adapted back to the corresponding CartCreated instances. Such an adapter would be named CartCreatedAdapter.

import io.vlingo.common.serialization.JsonSerialization;
import io.vlingo.symbio.BaseEntry.TextEntry;
import io.vlingo.symbio.EntryAdapter;
import io.vlingo.symbio.Metadata;
public final class CartCreatedAdapter implements EntryAdapter<CartCreated, TextEntry> {
@Override
public CartCreated fromEntry(TextEntry entry) {
return JsonSerialization.deserialized(entry.entryData(), CartCreated.class);
}
@Override
public TextEntry toEntry(CartCreated source, Metadata metadata) {
String serialization = JsonSerialization.serialized(source);
return new TextEntry(CartCreated.class, 1, serialization, metadata);
}
@Override
public TextEntry toEntry(CartCreated source, String id, Metadata metadata) {
String serialization = JsonSerialization.serialized(source);
return new TextEntry(id, CartCreated.class, 1, serialization, metadata);
}
@Override
public TextEntry toEntry(CartCreated source, int version, String id, Metadata metadata) {
final String serialization = JsonSerialization.serialized(source);
return new TextEntry(id, CartCreated.class, 1, serialization, version, metadata);
}
}

The EntryAdapter<S,E> provides three overloaded methods for adapting from a Source<S> to an Entry<E>, and one for adapting from Entry<E> to Source<S>. In the above example the Source<S> type CartCreated is adapted to a TextEntry using JSON, and the persisted TextEntry back to a CartCreated.

Initially it is not necessary to create an EntryAdapter<S,E> during early development if you are using the text edition of a given store, such as ObjectStore<String>, StateStore<String>, or Journal<String>. Instead you may benefit from the DefaultTextEntryAdapter. This default adapter is used when no adapter is registered for a given Source<S> type. However, when Source<S> types change, you must definitely implement adapters for the changing types. In this case the adapters play an important role of upgrading, referred to as upcasting, the persisted Entry<E> to the new version of the Source<S> type.

See: io.vlingo.symbio.DefaultTextEntryAdapter

To make the adapters available to stores, use the EntryAdapterProvider.

import io.vlingo.symbio.EntryAdapterProvider;
EntryAdapterProvider entryAdapterProvider =
EntryAdapterProvider.instance(world);
entryAdapterProvider.registerAdapter(
CartCreated.class, new CartCreatedAdapter());
entryAdapterProvider.registerAdapter(
ProductAddedToCart.class, new ProductAddedToCartAdapter());
...