Projections

How to project state, Domain Events, and other Source types into a CQRS Query Model.

Projections are an important part of the component set required to support CQRS, whether or not you are using Event Sourcing. Recall that in CQRS there are two models, one called the Command Model and another called the Query Model.

The Command Model is often referred to as the "Write Model" and the Query Model as the "Read Model." These alternative names are inaccurate because you actually read from and write to both models. We prefer the names Command Model and Query Model because they follow the pattern name, CQRS, and express the purpose of each of the two models.

A projection is a component that takes one set of states and transforms those into another state. This is typically how it works:

  1. The Command Model causes a state mutation and emits one or more Domain Events

  2. The new Domain Events are persisted into a Journal

  3. The emitted Domain Events are dispatched/routed to a set of Projection components based on their interest in a given Domain Event type

  4. The Projection components interpret how each Domain Event should be reflected into the portion of the Query Model for which they are responsible, and project that interpretation into the persistent Query Model
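The four steps above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the vlingo/lattice API: SketchEvent, SketchProjection, SketchJournal, and TitleQueryModel are hypothetical names, and dispatching here is synchronous, whereas the real components are actors.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified event type; not a vlingo/lattice DomainEvent.
class SketchEvent {
  final String type;
  final String entityId;
  final String payload;

  SketchEvent(final String type, final String entityId, final String payload) {
    this.type = type;
    this.entityId = entityId;
    this.payload = payload;
  }
}

// Hypothetical projection contract for this sketch only.
interface SketchProjection {
  void project(SketchEvent event);
}

// Hypothetical journal that persists events and routes them by type.
class SketchJournal {
  private final List<SketchEvent> entries = new ArrayList<>();
  private final Map<String, List<SketchProjection>> interests = new HashMap<>();

  // Records a projection's interest in one event type.
  void register(final String eventType, final SketchProjection projection) {
    interests.computeIfAbsent(eventType, key -> new ArrayList<>()).add(projection);
  }

  // Step 2: persist the event. Step 3: dispatch it to interested projections.
  void append(final SketchEvent event) {
    entries.add(event);
    final List<SketchProjection> interested =
        interests.getOrDefault(event.type, Collections.<SketchProjection>emptyList());
    for (final SketchProjection projection : interested) {
      projection.project(event);
    }
  }

  int size() {
    return entries.size();
  }
}

// Step 4: interpret the event and reflect it into a (here in-memory) Query Model.
class TitleQueryModel implements SketchProjection {
  final Map<String, String> titlesById = new HashMap<>();

  @Override
  public void project(final SketchEvent event) {
    titlesById.put(event.entityId, event.payload);
  }
}

class ProjectionFlowSketch {
  static TitleQueryModel run() {
    final SketchJournal journal = new SketchJournal();
    final TitleQueryModel titles = new TitleQueryModel();
    journal.register("DashboardDefined", titles);
    // Step 1: the Command Model emits events as a result of state mutations.
    journal.append(new SketchEvent("DashboardDefined", "d1", "Sales"));
    // No projection registered interest in this type, so it is only persisted.
    journal.append(new SketchEvent("DashboardStreamed", "d1", "42"));
    return titles;
  }

  public static void main(final String[] args) {
    System.out.println(run().titlesById);
  }
}
```

Only the first event reaches the projection in this sketch, because interest is registered per event type.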

In all of the above steps, you can replace "Domain Event" with "entity state," because you may decide not to use Domain Events in your Command Model. In other words, the full state of a Command Model entity may be interpreted for projection into the Query Model. Of course, if you use Event Sourcing, you will always project Domain Events into the Query Model.

With the vlingo/lattice projection components, we provide the means to project an entity's state and various concrete Source<?> types—DomainEvent and Command, for example—into a CQRS query model.

There are two steps required to use projections. You must implement one or more projections and you must configure those projections to be used within your service/application. These two topics are covered below.

Implementing a Projection

There is an Actor abstract base type for this: StateStoreProjectionActor. As the name indicates, this projection base is used to project into a StateStore.

Since object-relational mapping and other object persistence approaches are less efficient than key-value stores, we have decided not to support ORM-based projections at this time.

Here is a condensed code listing of the StateStoreProjectionActor.

package io.vlingo.lattice.model.projection;
public abstract class StateStoreProjectionActor<T> extends Actor
    implements Projection ... {
  public StateStoreProjectionActor(final StateStore stateStore) {
    this(stateStore, defaultTextStateAdapter(), defaultTextEntryAdapter());
  }
  public StateStoreProjectionActor(
      final StateStore stateStore,
      final StateAdapter<Object, State<?>> stateAdapter,
      final EntryAdapter<Source<?>, Entry<?>> entryAdapter) {
    ...
  }
  public void projectWith(final Projectable projectable, final ProjectionControl control) {
    upsertFor(projectable, control);
  }
  protected abstract T currentDataFor(final Projectable projectable);
  protected String dataIdFor(final Projectable projectable) {
    return projectable.dataId();
  }
  protected <S extends Source<?>, E extends Entry<?>> EntryAdapter<S,E> entryAdapter() {
    return (EntryAdapter<S,E>) this.entryAdapter;
  }
  protected abstract T merge(final T previousData, final int previousVersion, final T currentData, final int currentVersion);
  protected void prepareForMergeWith(final Projectable projectable) {
    // no default behavior
  }
  protected <ST extends State<?>> StateAdapter<?,ST> stateAdapter() {
    return (StateAdapter<?,ST>) this.stateAdapter;
  }
  protected void upsertFor(final Projectable projectable, final ProjectionControl control) {
    ...
  }
  protected <S> S typed(final Object state) {
    return (S) state;
  }
  protected <E> E typed(final DomainEvent event) {
    return (E) event;
  }
}

To create a new type of projection, extend StateStoreProjectionActor, as demonstrated next.

package com.dashdawn.infra.projections.dashboard;
import static com.dashdawn.model.dashboard.Events.*;
import java.util.ArrayList;
import java.util.List;
import io.vlingo.lattice.model.DomainEvent;
import io.vlingo.lattice.model.IdentifiedDomainEvent;
import io.vlingo.lattice.model.projection.Projectable;
import io.vlingo.lattice.model.projection.StateStoreProjectionActor;
import io.vlingo.symbio.Entry;
public class DashboardProjectionActor extends StateStoreProjectionActor<DashboardView> {
  private static final DashboardView Empty = new DashboardView();
  private String dataId;
  // Holds the events adapted from the current Projectable's entries
  private final List<IdentifiedDomainEvent> events = new ArrayList<>();
  public DashboardProjectionActor() {
    super(QueryModelStoreProvider.instance().store);
  }
  @Override
  protected DashboardView currentDataFor(final Projectable projectable) {
    return Empty;
  }
  @Override
  protected String dataIdFor(final Projectable projectable) {
    dataId = events.get(0).identity();
    return dataId;
  }
  @Override
  protected DashboardView merge(
      final DashboardView previousData,
      final int previousVersion,
      final DashboardView currentData,
      final int currentVersion) {
    return mergeInto(currentData);
  }
  @Override
  protected void prepareForMergeWith(final Projectable projectable) {
    events.clear();
    for (Entry<?> entry : projectable.entries()) {
      events.add(entryAdapter().anyTypeFromEntry(entry));
    }
  }
  private DashboardView mergeInto(final DashboardView view) {
    for (final DomainEvent event : events) {
      switch (match(event)) {
      case DashboardDefined:
        final DashboardDefined defined = typed(event);
        view.id = defined.id;
        view.title = defined.title;
        view.summary = defined.summary;
        view.status = Status.Inactive.name();
        break;
      case DashboardStreamed:
        final DashboardStreamed streamed = typed(event);
        view.streamValue = streamed.value;
        view.status = Status.Streaming.name();
        break;
      case DashboardStreamHalted:
        final DashboardStreamHalted halted = typed(event);
        view.streamValue = "";
        view.status = Status.Halted.name();
        break;
      case DashboardStreamRestarted:
        final DashboardStreamRestarted restarted = typed(event);
        // Assumes DashboardStreamRestarted carries the value, as DashboardStreamed does
        view.streamValue = restarted.value;
        view.status = Status.Restarted.name();
        break;
      case WaveBoardOpened:
        final WaveBoardOpened opened = typed(event);
        view.waveBoardName = opened.name;
        view.waveBoardDevicePort = opened.port;
        view.waveBoardCalibration = opened.calibration;
        view.status = Status.WaveBoardOpened.name();
        break;
      case ...
      case ...
      case ...
      case Unmatched:
        logger().warn("Event of type " + event.typeName() + " was not matched.");
        break;
      }
    }
    return view;
  }
  private DashboardViewProjectableType match(final DomainEvent event) {
    try {
      return DashboardViewProjectableType.valueOf(event.typeName());
    } catch (Exception e) {
      return DashboardViewProjectableType.Unmatched;
    }
  }
}

This projection is next examined in detail, starting with the constructor.

public DashboardProjectionActor() {
  super(QueryModelStoreProvider.instance().store);
}

The DashboardProjectionActor constructor uses the super constructor that takes only the StateStore used to write and read DashboardView instances. This single-parameter constructor provides a default StateAdapter and EntryAdapter for the concrete projection to use. These are part of vlingo/symbio and are known as DefaultTextStateAdapter and DefaultTextEntryAdapter.

public StateStoreProjectionActor(final StateStore stateStore) {
  this(stateStore, defaultTextStateAdapter(), defaultTextEntryAdapter());
}

There is a second constructor for StateStoreProjectionActor that takes an additional two parameters, one for the StateAdapter and one for the EntryAdapter.

public StateStoreProjectionActor(
    final StateStore stateStore,
    final StateAdapter<Object, State<?>> stateAdapter,
    final EntryAdapter<Source<?>, Entry<?>> entryAdapter) {
  ...
}

These adapters are used to adapt from persistent State and Entry types to the model types. When the single-argument constructor is used, you get the default StateAdapter and EntryAdapter.

Since there is only one EntryAdapter supported, but no doubt several or many DomainEvent types needed, your own custom EntryAdapter must internally hold the full number of EntryAdapter instances needed for all DomainEvent types. This might use a Map with keys of Class<? extends DomainEvent> and values of concrete EntryAdapter instances. The outer EntryAdapter registered with the StateStoreProjectionActor would operate by looking up the specific concrete EntryAdapter by DomainEvent type, dispatching to it, and answering its return value.
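The lookup-and-delegate idea described above might look like the following sketch. It deliberately avoids the real vlingo/symbio EntryAdapter signatures: RawEntry, SimpleEntryAdapter, NameChanged, CountChanged, and CompositeEntryAdapter are hypothetical stand-ins used only to show the Map-based dispatch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical persistent entry: the event type plus its serialized data.
class RawEntry {
  final Class<?> type;
  final String data;

  RawEntry(final Class<?> type, final String data) {
    this.type = type;
    this.data = data;
  }
}

// Hypothetical, simplified adapter contract; not the vlingo/symbio EntryAdapter.
interface SimpleEntryAdapter<E> {
  E fromEntry(RawEntry entry);
}

// Two hypothetical event types.
class NameChanged {
  final String name;
  NameChanged(final String name) { this.name = name; }
}

class CountChanged {
  final int count;
  CountChanged(final int count) { this.count = count; }
}

// The outer adapter: holds one inner adapter per event type and delegates to it.
class CompositeEntryAdapter implements SimpleEntryAdapter<Object> {
  private final Map<Class<?>, SimpleEntryAdapter<?>> adapters = new HashMap<>();

  <E> void register(final Class<E> type, final SimpleEntryAdapter<E> adapter) {
    adapters.put(type, adapter);
  }

  @Override
  public Object fromEntry(final RawEntry entry) {
    final SimpleEntryAdapter<?> adapter = adapters.get(entry.type);
    if (adapter == null) {
      throw new IllegalArgumentException("No adapter registered for " + entry.type.getName());
    }
    // Dispatch to the concrete adapter and answer its return value.
    return adapter.fromEntry(entry);
  }
}

class CompositeAdapterSketch {
  static CompositeEntryAdapter build() {
    final CompositeEntryAdapter composite = new CompositeEntryAdapter();
    composite.register(NameChanged.class, entry -> new NameChanged(entry.data));
    composite.register(CountChanged.class, entry -> new CountChanged(Integer.parseInt(entry.data)));
    return composite;
  }

  public static void main(final String[] args) {
    final Object event = build().fromEntry(new RawEntry(NameChanged.class, "Ada"));
    System.out.println(event.getClass().getSimpleName());
  }
}
```

Failing fast on an unregistered type is one possible design choice; answering a placeholder value would be another.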

The DashboardProjectionActor implements the Projection protocol, but there is a default implementation of method projectWith() provided by the abstract base class StateStoreProjectionActor. The implementation of this method is quite simple for most uses. If you need a more involved implementation, you may override the default provided by the abstract base class.

public void projectWith(final Projectable projectable, final ProjectionControl control) {
  upsertFor(projectable, control);
}

The simple step is to "upsert" (i.e., insert or update, depending on existence) the current Projectable, which may hold a state, or entries, or both.

protected void upsertFor(final Projectable projectable, final ProjectionControl control) {
  final T currentData = currentDataFor(projectable);
  final int currentDataVersion = projectable.dataVersion();
  prepareForMergeWith(projectable);
  final String dataId = dataIdFor(projectable);
  final BiConsumer<T,Integer> upserter = (previousData, previousVersion) -> {
    final T data = merge(previousData, previousVersion, currentData, currentDataVersion);
    stateStore.write(dataId, data, currentDataVersion, writeInterest, control.confirmerFor(projectable));
  };
  stateStore.read(dataId, currentData.getClass(), readInterest, upserter);
}

The upsertFor() does the following.

  1. Asks your concrete extender for its notion of the current state, as in the state that is now being projected. You must implement currentDataFor(projectable). This may or may not be useful, and the extender may decide to return a template value.

  2. Requests that the projection prepareForMergeWith(). The default behavior does nothing. If you have some preparation to do before the merge this method must be overridden.

  3. Reads the currently stored T value from the StateStore (see the last statement in the method) and passes a BiConsumer<T, Integer> upserter. When the read completes, the upserter is used to merge the previousData with the currentData.

  4. Writes the merged state back to the StateStore. This happens inside the upserter.
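The read, merge, and write sequence above can be illustrated with a small synchronous sketch. InMemoryViewStore and UpsertSketch are hypothetical names; the real StateStore is asynchronous and reports results through interests, and this sketch additionally assumes that a first write stores the current data without merging.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical in-memory stand-in for a key-value store; not the vlingo StateStore.
class InMemoryViewStore<T> {
  private final Map<String, T> data = new HashMap<>();

  T read(final String id) {
    return data.get(id); // null when nothing has been stored under this id
  }

  void write(final String id, final T value) {
    data.put(id, value);
  }
}

class UpsertSketch {
  // Reads the previous value, merges it with the current one, and writes the result.
  static <T> T upsert(
      final InMemoryViewStore<T> store,
      final String id,
      final T current,
      final BinaryOperator<T> merge) {
    final T previous = store.read(id);
    // Assumption of this sketch: with no previous value, the current value is stored as is.
    final T merged = previous == null ? current : merge.apply(previous, current);
    store.write(id, merged);
    return merged;
  }

  public static void main(final String[] args) {
    final InMemoryViewStore<Integer> store = new InMemoryViewStore<>();
    upsert(store, "a", 2, Integer::sum); // insert
    upsert(store, "a", 3, Integer::sum); // update by merging
    System.out.println(store.read("a"));
  }
}
```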

The DashboardProjectionActor implements currentDataFor() as follows.

public class DashboardProjectionActor extends StateStoreProjectionActor<DashboardView> {
  private static final DashboardView Empty = new DashboardView();
  ...
  protected DashboardView currentDataFor(final Projectable projectable) {
    return Empty;
  }
  ...
}

As previously indicated, the currentDataFor() may not be useful in terms of providing a new state. This implementation answers an Empty instance of the DashboardView, which will be used by the merge() as an accumulator of modifications.

Next the DashboardProjectionActor is given the opportunity to prepare for the merge.

public class DashboardProjectionActor extends StateStoreProjectionActor<DashboardView> {
  private String dataId;
  private final List<IdentifiedDomainEvent> events = new ArrayList<>();
  ...
  @Override
  protected void prepareForMergeWith(final Projectable projectable) {
    events.clear();
    for (Entry<?> entry : projectable.entries()) {
      events.add(entryAdapter().anyTypeFromEntry(entry));
    }
  }
  ...
}

In this implementation the prepareForMergeWith() adapts all Entry instances into instances of the DomainEvent specialization IdentifiedDomainEvent. This gives the concrete event types the means to provide the identity of the event instance.

Immediately following prepareForMergeWith(), upsertFor() calls dataIdFor().

@Override
protected String dataIdFor(final Projectable projectable) {
  dataId = events.get(0).identity();
  return dataId;
}

As seen above, the dataId is provided by the first (and possibly only) instance of the IdentifiedDomainEvent in the events list. Recall that this events list was just populated by the prepareForMergeWith().
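Taking the identity from the first event assumes that the events list is never empty. A minimal sketch of a more defensive variant follows; it falls back to an identity supplied by the caller, in the spirit of the base class default that answers projectable.dataId(). Identified, SimpleIdentified, and DataIdSketch are hypothetical names, not vlingo types.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an event that can answer its identity.
interface Identified {
  String identity();
}

// Hypothetical concrete event holding only its identity.
class SimpleIdentified implements Identified {
  private final String id;

  SimpleIdentified(final String id) {
    this.id = id;
  }

  @Override
  public String identity() {
    return id;
  }
}

class DataIdSketch {
  // Answers the identity of the first event, or the fallback when there are no events.
  static String dataIdFor(final List<? extends Identified> events, final String fallbackId) {
    return events.isEmpty() ? fallbackId : events.get(0).identity();
  }

  static List<Identified> sampleEvents() {
    return Arrays.<Identified>asList(new SimpleIdentified("d1"), new SimpleIdentified("d2"));
  }

  public static void main(final String[] args) {
    System.out.println(dataIdFor(sampleEvents(), "fallback"));
  }
}
```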

The following is the DashboardProjectionActor implementation of merge() and its helper method mergeInto().

public class DashboardProjectionActor extends StateStoreProjectionActor<DashboardView> {
  private static final DashboardView Empty = new DashboardView();
  private final List<IdentifiedDomainEvent> events = new ArrayList<>();
  ...
  @Override
  protected DashboardView merge(
      final DashboardView previousData,
      final int previousVersion,
      final DashboardView currentData,
      final int currentVersion) {
    return mergeInto(currentData);
  }
  ...
  private DashboardView mergeInto(final DashboardView view) {
    for (final DomainEvent event : events) {
      switch (match(event)) {
      case DashboardDefined:
        final DashboardDefined defined = typed(event);
        view.id = defined.id;
        view.title = defined.title;
        view.summary = defined.summary;
        view.status = Status.Inactive.name();
        break;
      case DashboardStreamed:
        final DashboardStreamed streamed = typed(event);
        view.streamValue = streamed.value;
        view.status = Status.Streaming.name();
        break;
      case DashboardStreamHalted:
        final DashboardStreamHalted halted = typed(event);
        view.streamValue = "";
        view.status = Status.Halted.name();
        break;
      case DashboardStreamRestarted:
        final DashboardStreamRestarted restarted = typed(event);
        // Assumes DashboardStreamRestarted carries the value, as DashboardStreamed does
        view.streamValue = restarted.value;
        view.status = Status.Restarted.name();
        break;
      case WaveBoardOpened:
        final WaveBoardOpened opened = typed(event);
        view.waveBoardName = opened.name;
        view.waveBoardDevicePort = opened.port;
        view.waveBoardCalibration = opened.calibration;
        view.status = Status.WaveBoardOpened.name();
        break;
      case ...
      case ...
      case ...
      case Unmatched:
        logger().warn("Event of type " + event.typeName() + " was not matched.");
        break;
      }
    }
    return view;
  }
  ...
}

The merge() passes the currentData to be used by mergeInto() as an accumulator of the new state. Any preexisting state on the currentData that coincides with the interpretation of the DomainEvent will be overwritten/replaced.

Because the Projectable may have multiple entries (serialized DomainEvent instances), the mergeInto() iterates over the entire collection of events deserialized by the method prepareForMergeWith().

The switch uses an enum of DashboardViewProjectableType to match on each of the DomainEvent instances.

public enum DashboardViewProjectableType {
  DashboardDefined,
  DashboardStreamed,
  DashboardStreamHalted,
  DashboardStreamRestarted,
  WaveBoardOpened,
  Unmatched
}

The DomainEvent type is matched to the enum by the match() method. A non-match produces the Unmatched type, which is ignored by mergeInto(), but logged as a warning.

private DashboardViewProjectableType match(final DomainEvent event) {
  try {
    return DashboardViewProjectableType.valueOf(event.typeName());
  } catch (Exception e) {
    return DashboardViewProjectableType.Unmatched;
  }
}
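The matching technique relies on standard Java behavior: Enum.valueOf() throws IllegalArgumentException when no constant has the given name, and NullPointerException when the name is null. The following self-contained sketch shows the same fallback with a narrower catch. ViewEventType and MatchSketch are hypothetical names used only here.

```java
// Hypothetical enum mirroring the idea of DashboardViewProjectableType.
enum ViewEventType {
  DashboardDefined,
  DashboardStreamed,
  Unmatched
}

class MatchSketch {
  // Maps a type name to an enum constant, answering Unmatched for unknown or null names.
  static ViewEventType match(final String typeName) {
    try {
      return ViewEventType.valueOf(typeName);
    } catch (IllegalArgumentException | NullPointerException e) {
      return ViewEventType.Unmatched;
    }
  }

  public static void main(final String[] args) {
    System.out.println(match("DashboardDefined"));
    System.out.println(match("SomethingElse"));
  }
}
```

Catching only the two documented exception types keeps unrelated failures visible, which is a design choice rather than a requirement.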

Once a specific type is matched, the mergeInto() maps event attributes to DashboardView attributes. Any preexisting values that are mapped are overwritten on the view accumulator. The iteration and matching are done for any number of events, whether they are included with the current Projectable or with others received in the future.

case DashboardDefined:
  final DashboardDefined defined = typed(event);
  view.id = defined.id;
  view.title = defined.title;
  view.summary = defined.summary;
  view.status = Status.Inactive.name();
  break;

Note that the design of the DashboardView to directly use public mutable attributes was done to keep the implementation simple, albeit somewhat "dangerous" in the opinion of some. It doesn't seem like a bad thing in this specific case because the view object is used in very specific, limited ways. Feel free to include accessor methods for reading and writing in your designs.
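If you prefer accessors, a view might be shaped along the following lines. This is only a sketch: EncapsulatedDashboardView and its method names are hypothetical, and it covers just a few of the attributes shown earlier.

```java
// Hypothetical encapsulated variant of a view; attribute names follow the listing above.
class EncapsulatedDashboardView {
  private String id;
  private String title;
  private String status;

  String id() { return id; }
  String title() { return title; }
  String status() { return status; }

  // A writer that groups the attributes set when a dashboard is defined.
  void definedAs(final String id, final String title) {
    this.id = id;
    this.title = title;
    this.status = "Inactive";
  }

  // A writer for status-only changes.
  void statusChangedTo(final String status) {
    this.status = status;
  }
}

class EncapsulatedViewSketch {
  static EncapsulatedDashboardView sample() {
    final EncapsulatedDashboardView view = new EncapsulatedDashboardView();
    view.definedAs("d1", "Sales");
    view.statusChangedTo("Streaming");
    return view;
  }

  public static void main(final String[] args) {
    final EncapsulatedDashboardView view = sample();
    System.out.println(view.id() + " " + view.title() + " " + view.status());
  }
}
```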

When the merge() completes the modified DashboardView is written to the StateStore.

Configuring Projections

Once you have one or more projections, you must configure them so that Command Model changes are dispatched to them.

package com.dashdawn.infra.projections;
import java.util.Arrays;
import java.util.List;
import io.vlingo.actors.Definition;
import io.vlingo.actors.Protocols;
import io.vlingo.actors.Stage;
import io.vlingo.lattice.model.projection.ProjectionDispatcher;
import io.vlingo.lattice.model.projection.ProjectionDispatcher.ProjectToDescription;
import io.vlingo.lattice.model.projection.TextProjectionDispatcherActor;
import io.vlingo.symbio.store.dispatch.Dispatcher;
@SuppressWarnings("rawtypes")
public class ProjectionDispatcherProvider {
  private static ProjectionDispatcherProvider instance;
  public final ProjectionDispatcher projectionDispatcher;
  public final Dispatcher storeDispatcher;
  public static ProjectionDispatcherProvider instance() {
    return instance;
  }
  public static ProjectionDispatcherProvider using(final Stage stage) {
    if (instance != null) return instance;
    final List<ProjectToDescription> descriptions =
        Arrays.asList(
            new ProjectToDescription(DashboardProjectionActor.class, com.dashdawn.model.dashboard.Events.class.getPackage()),
            new ProjectToDescription(WaveBoardDetailsProjectionActor.class, com.dashdawn.model.waveboard.Events.class.getPackage()),
            new ProjectToDescription(ProfileProjectionActor.class, "Profile:new", "Profile:twitter", "Profile:linkedIn", "Profile:website"));
    final Protocols dispatcherProtocols =
        stage.actorFor(
            new Class<?>[] { Dispatcher.class, ProjectionDispatcher.class },
            Definition.has(TextProjectionDispatcherActor.class, Definition.parameters(descriptions)));
    final Protocols.Two<Dispatcher, ProjectionDispatcher> dispatchers = Protocols.two(dispatcherProtocols);
    instance = new ProjectionDispatcherProvider(dispatchers._1, dispatchers._2);
    return instance;
  }
  private ProjectionDispatcherProvider(final Dispatcher storeDispatcher, final ProjectionDispatcher projectionDispatcher) {
    this.storeDispatcher = storeDispatcher;
    this.projectionDispatcher = projectionDispatcher;
  }
}

There are three primary objects created.

  1. Three ProjectToDescription instances are created and collected into a single List<ProjectToDescription>. Each description indicates how various Command Model changes will be dispatched into the actor instance whose type is the first parameter. For example, a DashboardProjectionActor instance will be created inside the ProjectionDispatcher, and all DomainEvent types defined as inner classes of the com.dashdawn.model.dashboard.Events class will be dispatched to it. The same goes for the WaveBoardDetailsProjectionActor and the DomainEvent types defined as inner classes of com.dashdawn.model.waveboard.Events.

  2. The descriptions are then used to create the ProjectionDispatcher, which is implemented by the TextProjectionDispatcherActor provided by vlingo/lattice. This actor actually implements two protocols, the Dispatcher and the ProjectionDispatcher. The Dispatcher is used by the Journal to dispatch newly appended events to the ProjectionDispatcher. (This is implemented by the abstract base ProjectionDispatcherActor, which is extended by the concrete actors TextProjectionDispatcherActor and BinaryProjectionDispatcherActor.) The two protocol references answered by actorFor() are captured in the Protocols type.

  3. Lastly the ProjectionDispatcherProvider instance is created, where both protocol references are held and available to dependents.
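To make the role of the descriptions more concrete, here is a simplified routing sketch in plain Java. It is not how vlingo/lattice matches descriptions internally. DescriptionSketch and RoutingSketch are hypothetical, and the matching rule (an exact match on a name such as "Profile:new", or a package-name prefix of a fully qualified event class name) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical description: a projection name plus the causes it is interested in.
class DescriptionSketch {
  final String projectionName;
  final List<String> causes;

  DescriptionSketch(final String projectionName, final String... causes) {
    this.projectionName = projectionName;
    this.causes = Arrays.asList(causes);
  }

  // Assumed rule: exact name match, or the cause is a package containing the reason's class.
  boolean matches(final String reason) {
    for (final String cause : causes) {
      if (reason.equals(cause) || reason.startsWith(cause + ".")) {
        return true;
      }
    }
    return false;
  }
}

class RoutingSketch {
  // Answers the names of all projections whose description matches the reason.
  static List<String> projectionsFor(final List<DescriptionSketch> descriptions, final String reason) {
    final List<String> names = new ArrayList<>();
    for (final DescriptionSketch description : descriptions) {
      if (description.matches(reason)) {
        names.add(description.projectionName);
      }
    }
    return names;
  }

  static List<DescriptionSketch> sampleDescriptions() {
    return Arrays.asList(
        new DescriptionSketch("DashboardProjectionActor", "com.dashdawn.model.dashboard"),
        new DescriptionSketch("WaveBoardDetailsProjectionActor", "com.dashdawn.model.waveboard"),
        new DescriptionSketch("ProfileProjectionActor",
            "Profile:new", "Profile:twitter", "Profile:linkedIn", "Profile:website"));
  }

  public static void main(final String[] args) {
    final List<DescriptionSketch> descriptions = sampleDescriptions();
    System.out.println(projectionsFor(descriptions, "com.dashdawn.model.dashboard.Events$DashboardDefined"));
    System.out.println(projectionsFor(descriptions, "Profile:twitter"));
  }
}
```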

Projecting Entity State

The third ProjectToDescription is different from the first two, which project events emitted by two EventSourced entities. The third one projects directly from state changes that are persisted by a StatefulEntity.

ProjectToDescription profileProjectionDescription =
    new ProjectToDescription(
        ProfileProjectionActor.class,
        "Profile:new", "Profile:twitter", "Profile:linkedIn", "Profile:website");

We provide a comprehensive example of this full-state projection technique in vlingo-http-frontservice.

Configuring the Dispatcher

The following is an example of how the Dispatcher is used, where the command model storage will use it to dispatch events into the projections. Note that the command model store knows nothing about projections, only the Dispatcher.

ProjectionDispatcherProvider projectionDispatcherProvider =
    ProjectionDispatcherProvider.using(world.stage());
CommandModelStoreProvider commandModelStoreProvider =
    CommandModelStoreProvider
        .using(
            world.stage(),
            registry,
            projectionDispatcherProvider.storeDispatcher);

The above code may be placed in your Bootstrap or Startup class containing a main().