Projections
The XOOM LATTICE way to project state, Domain Events, and other Source types into a CQRS Query Model.
Projections are an important part of the component set required to support CQRS, whether or not you are using Event Sourcing. Recall that in CQRS there are two models, one called the Command Model and another called the Query Model.
The Command Model is often referred to as the "Write Model" and the Query Model as the "Read Model." These AKAs are quite inaccurate because you actually read from and write to both models. We prefer the names Command Model and Query Model because these adhere to the pattern name, CQRS, and express the purpose of each of the two models.
A projection is a component that takes one set of states and transforms those into another state. This is typically how it works:
- 1. The Command Model causes a state mutation and emits one or more Domain Events
- 2. The new Domain Events are persisted into a Journal
- 3. The emitted Domain Events are dispatched/routed to a set of Projection components based on their interest in a given Domain Event type
- 4. The Projection components interpret how each Domain Event should be reflected into the portion of the Query Model for which they are responsible, and project that interpretation into the persistent Query Model
In all of the above steps, you can replace "Domain Event" with "entity state," because you may decide not to use Domain Events in your Command Model. In other words, the full state of a Command Model entity may be interpreted for projection into the Query Model. Of course if you use Event Sourcing you will always project Domain Events into the Query Model.

The upper area of the diagram shows Events being projected into the Query Model.
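The four steps above can be pictured with a small, self-contained sketch. It is not XOOM LATTICE code: InMemoryJournal, Router, and the two-field ForumStarted are hypothetical stand-ins, and everything runs synchronously in one thread, whereas the real components are actors.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the flow; none of these types are XOOM LATTICE APIs.
class ProjectionFlowSketch {
  // Stand-in for a Domain Event emitted by the Command Model.
  static final class ForumStarted {
    final String forumId;
    final String topic;
    ForumStarted(final String forumId, final String topic) {
      this.forumId = forumId;
      this.topic = topic;
    }
  }

  // Step 2: an append-only journal that hands each new event to a dispatcher.
  static final class InMemoryJournal {
    private final List<Object> entries = new ArrayList<>();
    private final Consumer<Object> dispatcher;
    InMemoryJournal(final Consumer<Object> dispatcher) {
      this.dispatcher = dispatcher;
    }
    void append(final Object event) {
      entries.add(event);        // persist first
      dispatcher.accept(event);  // then dispatch
    }
  }

  // Step 3: routes each event to the projections interested in its type.
  static final class Router implements Consumer<Object> {
    private final Map<Class<?>, List<Consumer<Object>>> interests = new HashMap<>();
    void register(final Class<?> eventType, final Consumer<Object> projection) {
      interests.computeIfAbsent(eventType, key -> new ArrayList<>()).add(projection);
    }
    @Override
    public void accept(final Object event) {
      for (final Consumer<Object> projection
          : interests.getOrDefault(event.getClass(), new ArrayList<>())) {
        projection.accept(event);
      }
    }
  }

  // Runs the whole flow and answers the resulting Query Model (forumId -> topic).
  static Map<String, String> run() {
    final Map<String, String> queryModel = new HashMap<>();
    final Router router = new Router();
    // Step 4: the projection interprets the event and updates the Query Model.
    router.register(ForumStarted.class, event -> {
      final ForumStarted started = (ForumStarted) event;
      queryModel.put(started.forumId, started.topic);
    });
    final InMemoryJournal journal = new InMemoryJournal(router);
    // Step 1: the Command Model emits an event.
    journal.append(new ForumStarted("f-1", "Projections"));
    return queryModel;
  }
}
```

Note that the journal only knows the dispatcher, never the projections themselves, which mirrors the separation described later in this section.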
With the XOOM LATTICE projection components, we provide the means to project an entity's state and various concrete Source<?> types (DomainEvent and Command, for example) into a CQRS query model.
There are two steps required to use projections. You must implement one or more projections and you must configure those projections to be used within your service/application. These two topics are covered below.
The components used to manage the data filtering, routing, and the actual projections must be created. The following are the component types.
- 1. The ProjectToDescription is a description of how data and events are filtered and selected. You may configure a ProjectToDescription for projecting State<T> and also for projecting Source<T> instances, such as DomainEvent types. Both are described below.
- 2. The Dispatcher and ProjectionDispatcher to be registered with the storage mechanism are created by means of an actor that implements both protocols. Two such actors are the BinaryProjectionDispatcherActor and TextProjectionDispatcherActor, used when a binary store or text store is used, respectively.
- 3. The store is created and the Dispatcher is registered with it.
The following describes each of the above steps.
A ProjectToDescription may be defined for state data and for events. The following shows a description defined for a state object.
import io.vlingo.lattice.model.projection.ProjectionDispatcher.ProjectToDescription;
ProjectToDescription userProjectTo =
  ProjectToDescription.with(
    UserProjectionActor.class,
    "User:new", "User:contact", "User:name");
State data may have associated Metadata, and the Metadata instance may have a named operation that caused the state to transition to the current value. The above description indicates that if the state data contains an operation that matches "User:new", "User:contact", or "User:name", the dispatcher will route the data to the UserProjectionActor.
The following demonstrates how the operation name is associated with the new state. In this case the command is to replace the user's name with a different name. The transitioned state and the descriptive operation are applied. After the state is persisted, the Dispatcher is given the opportunity to route the new state to a projection. As seen in the description of the previous code snippet, matching the "User:name" operation will cause the new state to be routed to the UserProjectionActor.
  @Override
  public void withName(final Name name) {
    apply(state.withName(name), "User:name");
  }
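The routing decision described above can be sketched in plain Java. This is only an illustration of matching an operation name against a description. The Description class and routeFor() are hypothetical names, not the XOOM LATTICE implementation, and "User:deleted" in the usage note is an invented operation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch; not the XOOM LATTICE ProjectToDescription.
class OperationRoutingSketch {
  // Pairs a projection (by name here) with the operations it is interested in.
  static final class Description {
    final String projectionName;
    private final Set<String> operations;
    Description(final String projectionName, final String... operations) {
      this.projectionName = projectionName;
      this.operations = new HashSet<>(Arrays.asList(operations));
    }
    boolean matches(final String operation) {
      return operations.contains(operation);
    }
  }

  // Answers the first projection whose description matches the operation, if any.
  static Optional<String> routeFor(final String operation, final Description... descriptions) {
    for (final Description description : descriptions) {
      if (description.matches(operation)) {
        return Optional.of(description.projectionName);
      }
    }
    return Optional.empty();
  }
}
```

With a description built from "User:new", "User:contact", and "User:name", routeFor("User:name", ...) answers the projection name, while an unlisted operation such as "User:deleted" answers an empty Optional and the state would not be routed.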
We provide a comprehensive example of using this full-state projection technique, known as the vlingo-http-frontservice.
A ProjectToDescription may be defined for state data and for events. The following shows a description defined for events.
import io.vlingo.lattice.model.projection.ProjectionDispatcher.ProjectToDescription;
ProjectToDescription forumProjectTo =
  ProjectToDescription.with(
    ForumProjectionActor.class,
    ForumStarted.class, ForumClosed.class, ForumModeratorAssigned.class);
The above description indicates that if a concrete DomainEvent is one of the types ForumStarted, ForumClosed, or ForumModeratorAssigned, the dispatcher will route the event to the ForumProjectionActor.
The following demonstrates how the ForumStarted event occurs. After the event is persisted, the Dispatcher is given the opportunity to route it to a projection. As seen in the description of the previous code snippet, matching the ForumStarted event will cause it to be routed to the ForumProjectionActor.
  public void startForum(Moderator moderator, String topic, String description) {
    apply(new ForumStarted(moderator, topic, description));
  }
Note, however, that keeping the ProjectToDescription up to date with new and changing event types is error prone because the update is easily overlooked. Instead of referencing each event type, you may reference the Java package that contains all of the events.
ProjectToDescription forumProjectTo =
  ProjectToDescription.with(
    ForumProjectionActor.class,
    ForumStarted.class.getPackage());
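To see why a package-based description is less fragile than a list of types, consider the following sketch. It is an assumption about how such matching could work, not a description of the XOOM LATTICE internals, and it uses JDK classes (StringBuilder, StringBuffer) purely as stand-ins for event types that live in the same package.

```java
// Hypothetical sketch; the real matching logic in XOOM LATTICE may differ.
class EventMatchingSketch {
  // Derives the package name from the fully qualified class name.
  static String packageNameOf(final Class<?> type) {
    final String name = type.getName();
    final int lastDot = name.lastIndexOf('.');
    return lastDot < 0 ? "" : name.substring(0, lastDot);
  }

  // Type-list matching: only the explicitly listed types are accepted.
  static boolean matchesTypes(final Object event, final Class<?>... types) {
    for (final Class<?> type : types) {
      if (type == event.getClass()) {
        return true;
      }
    }
    return false;
  }

  // Package matching: any type declared in the package is accepted.
  static boolean matchesPackage(final Object event, final String packageName) {
    return packageNameOf(event.getClass()).equals(packageName);
  }
}
```

If StringBuffer plays the role of a newly added event type, matchesTypes(new StringBuffer(), StringBuilder.class) is false until the list is updated, whereas matchesPackage(new StringBuffer(), "java.lang") is already true.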
The following demonstrates how the ProjectToDescription, Dispatcher, ProjectionDispatcher, TextProjectionDispatcherActor, and Journal are wired for use.
import java.util.Arrays;
import java.util.List;
import io.vlingo.actors.Definition;
import io.vlingo.actors.Protocols;
import io.vlingo.lattice.model.projection.ProjectionDispatcher;
import io.vlingo.lattice.model.projection.ProjectionDispatcher.ProjectToDescription;
import io.vlingo.lattice.model.projection.TextProjectionDispatcherActor;
import io.vlingo.symbio.store.dispatch.Dispatcher;
import io.vlingo.symbio.store.journal.Journal;
import io.vlingo.symbio.store.journal.jdbc.JDBCJournalActor;
// The descriptions are the single constructor parameter of the dispatcher actor
List<ProjectToDescription> descriptions = Arrays.asList(forumProjectTo);
Protocols dispatcherProtocols =
  world.stage().actorFor(
    new Class<?>[] { Dispatcher.class, ProjectionDispatcher.class },
    Definition.has(TextProjectionDispatcherActor.class, Definition.parameters(descriptions)));
Protocols.Two<Dispatcher, ProjectionDispatcher> dispatchers =
  Protocols.two(dispatcherProtocols);
// The Journal receives only the Dispatcher protocol reference
Journal journal =
  Journal.using(world.stage(), JDBCJournalActor.class,
    dispatchers._1, configuration);
You may want to have an isolated component to configure your projections, such as this ProjectionDispatcherProvider example.
package com.dashdawn.infra.projections;
import java.util.Arrays;
import java.util.List;
import io.vlingo.actors.Definition;
import io.vlingo.actors.Protocols;
import io.vlingo.actors.Stage;
import io.vlingo.lattice.model.projection.ProjectionDispatcher;
import io.vlingo.lattice.model.projection.ProjectionDispatcher.ProjectToDescription;
import io.vlingo.lattice.model.projection.TextProjectionDispatcherActor;
import io.vlingo.symbio.store.dispatch.Dispatcher;
@SuppressWarnings("rawtypes")
public class ProjectionDispatcherProvider {
private static ProjectionDispatcherProvider instance;
public final ProjectionDispatcher projectionDispatcher;
public final Dispatcher storeDispatcher;
public static ProjectionDispatcherProvider instance() {
return instance;
}
public static ProjectionDispatcherProvider using(final Stage stage) {
if (instance != null) return instance;
final List<ProjectToDescription> descriptions =
Arrays.asList(
new ProjectToDescription(DashboardProjectionActor.class, com.dashdawn.model.dashboard.Events.class.getPackage()),
new ProjectToDescription(WaveBoardDetailsProjectionActor.class, com.dashdawn.model.waveboard.Events.class.getPackage()),
new ProjectToDescription(ProfileProjectionActor.class, "Profile:new", "Profile:twitter", "Profile:linkedIn", "Profile:website"));
final Protocols dispatcherProtocols =
stage.actorFor(
new Class<?>[] { Dispatcher.class, ProjectionDispatcher.class },
Definition.has(TextProjectionDispatcherActor.class, Definition.parameters(descriptions)));
final Protocols.Two<Dispatcher, ProjectionDispatcher> dispatchers = Protocols.two(dispatcherProtocols);
instance = new ProjectionDispatcherProvider(dispatchers._1, dispatchers._2);
return instance;
}
private ProjectionDispatcherProvider(final Dispatcher storeDispatcher, final ProjectionDispatcher projectionDispatcher) {
this.storeDispatcher = storeDispatcher;
this.projectionDispatcher = projectionDispatcher;
}
}
There are three primary objects created.
- 1. Notable are the three ProjectToDescription instances created, which are formed into a single List<ProjectToDescription>. Each of the descriptions indicates how various command model changes will be dispatched to the actor instance whose type is the first parameter. For example, the DashboardProjectionActor instance will be created inside the ProjectionDispatcher, and all DomainEvent types defined as inner classes of the com.dashdawn.model.dashboard.Events class will be dispatched to it. The same goes for the WaveBoardDetailsProjectionActor and the inner Events types defined in its package.
- 2. The descriptions are then used to create the ProjectionDispatcher, which is implemented by the TextProjectionDispatcherActor provided by XOOM LATTICE. This actor actually implements two protocols, both the Dispatcher and the ProjectionDispatcher. The Dispatcher is used by the Journal to dispatch newly appended events to the ProjectionDispatcher. (This is implemented by the abstract base ProjectionDispatcherActor, which is extended by the concrete actors TextProjectionDispatcherActor and BinaryProjectionDispatcherActor.) The two protocol references answered by actorFor() are captured in the Protocols type.
- 3. Lastly, the ProjectionDispatcherProvider instance is created, where both protocol references are held and available to dependents.
The following is an example of how the Dispatcher is used, where the command model storage will use it to dispatch events into the projections. Note that the command model store knows nothing about projections, only the Dispatcher.
ProjectionDispatcherProvider projectionDispatcherProvider =
  ProjectionDispatcherProvider.using(world.stage());
CommandModelStoreProvider commandModelStoreProvider =
  CommandModelStoreProvider
    .using(
      world.stage(),
      registry,
      projectionDispatcherProvider.storeDispatcher);
The above code may be placed in your Bootstrap or Startup class containing a main().
The following is an example of a Projection designed to handle state mutation operations. It corresponds to the above section ProjectToDescription for State<T>.
public class UserProjectionActor extends Actor implements Projection {
  ...
  @Override
  public void projectWith(Projectable projectable, ProjectionControl control) {
    User.UserState state = projectable.object();
    UserData current = UserData.from(state);
    Confirmer confirmer = control.confirmerFor(projectable);
    switch (projectable.becauseOf()[0]) {
    case "User:new":
      ...
      break;
    case "User:contact":
      ...
      break;
    case "User:name":
      ...
      break;
    }
    confirmer.confirm();
  }
}
The Projectable contains the data to be projected. The ProjectionControl is used to confirm that the Projectable has been projected so that the Dispatcher will not route it again. The code inside each of the cases would create or update the views affected by the state mutations.
The following is an example of a Projection designed to handle events. This example corresponds to the above section ProjectToDescription for Source<T>.
public static class ForumProjectionActor extends Actor implements Projection {
  ...
  @Override
  public void projectWith(final Projectable projectable, final ProjectionControl control) {
    projectable.entries().forEach(entry -> {
      switch (entry.typed().getSimpleName()) {
      case "ForumStarted":
        ...
        break;
      case "ForumClosed":
        ...
        break;
      case "ForumModeratorAssigned":
        ...
        break;
      }
    });
    control.confirmProjected(projectable.projectionId());
  }
}
Since there may be one or more events that were applied, persisted, dispatched, and routed to the projection, the projection must iterate over all entries so as not to miss any. The code inside each of the cases would create or update the views affected by the events. A much cleaner approach follows.
There is an Actor abstract base type for this: StateStoreProjectionActor. This is a much more powerful abstraction than directly using the Projection interface. As the name indicates, this projection base is used to project into a StateStore.
Since object-relational mapping and other object persistence approaches are less efficient than key-value stores, we have decided not to support ORM-based projections at this time.
Here is a condensed code listing of the StateStoreProjectionActor.
package io.vlingo.lattice.model.projection;
public abstract class StateStoreProjectionActor<T> extends Actor
implements Projection ... {
private final List<Source<?>> adaptedSources;
private final EntryAdapter<Source<?>, Entry<?>> entryAdapter;
private final StateAdapter<Object, State<?>> stateAdapter;
private final ReadResultInterest readInterest;
private final WriteResultInterest writeInterest;
private final StateStore stateStore;
public StateStoreProjectionActor(final StateStore stateStore) {
this(stateStore, defaultTextStateAdapter(), defaultTextEntryAdapter());
}
public StateStoreProjectionActor(
final StateStore stateStore,
final StateAdapter<Object, State<?>> stateAdapter,
final EntryAdapter<Source<?>, Entry<?>> entryAdapter) {
this.stateStore = stateStore;
this.stateAdapter = stateAdapter;
this.entryAdapter = entryAdapter;
this.readInterest = selfAs(ReadResultInterest.class);
this.writeInterest = selfAs(WriteResultInterest.class);
this.adaptedSources = new ArrayList<>(2);
}
public void projectWith(final Projectable projectable, final ProjectionControl control) {
upsertFor(projectable, control);
}
protected boolean alwaysWrite() {
return true;
}
protected T currentDataFor(final Projectable projectable) {
return projectable.object();
}
protected int currentDataVersionFor(final Projectable projectable, final T previousData, final int previousVersion) {
return alwaysWrite() ? projectable.dataVersion() : (previousVersion == -1 ? 1 : (previousVersion + 1));
}
protected String dataIdFor(final Projectable projectable) {
String dataId = projectable.dataId();
if (dataId.isEmpty()) {
try {
dataId = typedToIdentifiedDomainEvent(sources().get(0)).identity();
} catch (Exception e) {
// ignore; fall through
}
}
return dataId;
}
protected <S extends Source<?>, E extends Entry<?>> EntryAdapter<S,E> entryAdapter() {
return (EntryAdapter<S,E>) this.entryAdapter;
}
// Override if using full-state projections
protected T merge(final T previousData, final int previousVersion, final T currentData, final int currentVersion) {
return currentData;
}
// Override if using Event Sourcing projections
protected T merge(final T previousData, final int previousVersion, final T currentData, final int currentVersion, final List<Source<?>> sources) {
return merge(previousData, previousVersion, currentData, currentVersion);
}
protected void prepareForMergeWith(final Projectable projectable) {
adaptedSources.clear();
for (Entry <?> entry : projectable.entries()) {
adaptedSources.add(entryAdapter.anyTypeFromEntry(entry));
}
}
protected List<Source<?>> sources() {
return adaptedSources;
}
protected <ST extends State<?>> StateAdapter<?,ST> stateAdapter() {
return (StateAdapter<?,ST>) this.stateAdapter;
}
protected void upsertFor(final Projectable projectable, final ProjectionControl control) {
...
}
protected <S> S typed(final Object state) {
return (S) state;
}
protected <E> E typed(final DomainEvent event) {
return (E) event;
}
protected <E> E typed(final Source<?> source) {
return (E) source;
}
protected IdentifiedDomainEvent typedToIdentifiedDomainEvent(final Source<?> source) {
return (IdentifiedDomainEvent) source;
}
}
In the following documentation we explain a number of available overridable base class methods. Note that these are all optional, except that one of the two merge(...) methods should be overridden.
Override the following method if you are performing full-state projections:
  protected T merge(
    final T previousData,
    final int previousVersion,
    final T currentData,
    final int currentVersion)
Override the following method if you are performing projections for Event Sourcing:
  protected T merge(
    final T previousData,
    final int previousVersion,
    final T currentData,
    final int currentVersion,
    final List<Source<?>> sources)
The default implementation behaviors of all non-merge methods will very likely serve your needs fully.
To create a new type of projection, extend StateStoreProjectionActor, as demonstrated next. This example overrides the merge(...) that is best used with Event Sourcing.
package com.dashdawn.infra.projections.dashboard;
import static com.dashdawn.model.dashboard.Events.*;
import java.util.List;
import io.vlingo.lattice.model.DomainEvent;
import io.vlingo.lattice.model.IdentifiedDomainEvent;
import io.vlingo.lattice.model.projection.Projectable;
import io.vlingo.lattice.model.projection.StateStoreProjectionActor;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.Source;
public class DashboardProjectionActor extends StateStoreProjectionActor<DashboardView> {
private static final DashboardView Empty = new DashboardView();
public DashboardProjectionActor() {
super(QueryModelStoreProvider.instance().store);
}
@Override
protected DashboardView merge(
final DashboardView previousData,
final int previousVersion,
final DashboardView currentData,
final int currentVersion,
final List<Source<?>> sources) {
final DashboardView view =
previousData == null
? currentData
: previousData;
for (final Source<?> event : sources) {
switch (match(event)) {
case DashboardDefined:
final DashboardDefined defined = typed(event);
view.id = defined.id;
view.title = defined.title;
view.summary = defined.summary;
view.status = Status.Inactive.name();
break;
case DashboardStreamed:
final DashboardStreamed streamed = typed(event);
view.streamValue = streamed.value;
view.status = Status.Streaming.name();
break;
case DashboardStreamHalted:
final DashboardStreamHalted halted = typed(event);
view.previousStreamValue = view.streamValue;
view.streamValue = "";
view.status = Status.Halted.name();
break;
case DashboardStreamRestarted:
final DashboardStreamRestarted restarted = typed(event);
view.streamValue = view.previousStreamValue;
view.status = Status.Restarted.name();
break;
case WaveBoardOpened:
final WaveBoardOpened opened = typed(event);
view.waveBoardName = opened.name;
view.waveBoardDevicePort = opened.port;
view.waveBoardCalibration = opened.calibration;
view.status = Status.WaveBoardOpened.name();
break;
case ...
case ...
case ...
case Unmatched:
logger().warn("Event of type " + event.typeName() + " was not matched.");
break;
}
}
return view;
}
private DashboardViewProjectableType match(final Source<?> event) {
try {
return DashboardViewProjectableType.valueOf(event.typeName());
} catch (Exception e) {
return DashboardViewProjectableType.Unmatched;
}
}
}
This projection is next examined in detail, starting with the constructor.
public DashboardProjectionActor() {
super(QueryModelStoreProvider.instance().store);
}
The DashboardProjectionActor constructor uses the super constructor that takes only the StateStore used to write and read DashboardView instances. This single-parameter constructor provides a default StateAdapter and EntryAdapter for the concrete projection to use. These are part of XOOM SYMBIO and are known as DefaultTextStateAdapter and DefaultTextEntryAdapter.
  public StateStoreProjectionActor(final StateStore stateStore) {
    this(stateStore, defaultTextStateAdapter(), defaultTextEntryAdapter());
  }
There is a second constructor for StateStoreProjectionActor that takes an additional two parameters, one for the StateAdapter and one for the EntryAdapter.
  public StateStoreProjectionActor(
      final StateStore stateStore,
      final StateAdapter<Object, State<?>> stateAdapter,
      final EntryAdapter<Source<?>, Entry<?>> entryAdapter) {
    ...
  }
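Because this constructor accepts a single EntryAdapter, a custom adapter has to serve every event type the projection handles. One way to arrange that is an outer adapter that keeps a Map of per-type adapters and delegates to the right one. The following is a minimal sketch of that idea only: StoredEntry, CompositeAdapter, and the simplified event classes are hypothetical stand-ins, and the sketch does not implement the XOOM SYMBIO EntryAdapter interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a delegating adapter; not the XOOM SYMBIO EntryAdapter.
class CompositeEntryAdapterSketch {
  // Stand-in for a persisted entry: the event type plus its serialized data.
  static final class StoredEntry {
    final Class<?> type;
    final String data;
    StoredEntry(final Class<?> type, final String data) {
      this.type = type;
      this.data = data;
    }
  }

  // Simplified event types, each carrying a single attribute.
  static final class ForumStarted {
    final String topic;
    ForumStarted(final String topic) { this.topic = topic; }
  }

  static final class ForumClosed {
    final String forumId;
    ForumClosed(final String forumId) { this.forumId = forumId; }
  }

  // Outer adapter: looks up the inner adapter by event type and dispatches to it.
  static final class CompositeAdapter {
    private final Map<Class<?>, Function<StoredEntry, Object>> adapters = new HashMap<>();

    CompositeAdapter register(final Class<?> eventType, final Function<StoredEntry, Object> adapter) {
      adapters.put(eventType, adapter);
      return this;
    }

    Object fromEntry(final StoredEntry entry) {
      final Function<StoredEntry, Object> adapter = adapters.get(entry.type);
      if (adapter == null) {
        throw new IllegalArgumentException("No adapter registered for " + entry.type.getName());
      }
      return adapter.apply(entry); // answer the inner adapter's return value
    }
  }

  // Registers two inner adapters and adapts one stored entry back to its event.
  static String demo() {
    final CompositeAdapter adapter = new CompositeAdapter()
      .register(ForumStarted.class, entry -> new ForumStarted(entry.data))
      .register(ForumClosed.class, entry -> new ForumClosed(entry.data));
    final Object event = adapter.fromEntry(new StoredEntry(ForumStarted.class, "CQRS"));
    return ((ForumStarted) event).topic;
  }
}
```

Failing fast on an unregistered type is a design choice of this sketch; a real adapter might instead log and skip the entry.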
These adapters are used to adapt from persistent State and Entry types to the model types. When the single-argument constructor is used you get the default StateAdapter and EntryAdapter.
Since there is only one EntryAdapter supported, but no doubt several or many DomainEvent types needed, your own custom EntryAdapter must internally hold the full number of EntryAdapter instances needed for all DomainEvent types. This might use a Map with keys of Class<? extends DomainEvent> and values of concrete EntryAdapter instances. The outer EntryAdapter registered with the StateStoreProjectionActor would operate by looking up the specific concrete EntryAdapter by DomainEvent type, dispatching to it, and answering its return value.
The DashboardProjectionActor implements the Projection protocol, but there is a default implementation of method projectWith() provided by the abstract base class StateStoreProjectionActor. The implementation of this method is quite simple for most uses. If you need a more involved implementation you may override the default provided by the abstract base class.
  public void projectWith(final Projectable projectable, final ProjectionControl control) {
    upsertFor(projectable, control);
  }
The simple step is to "upsert" (i.e. insert or update, depending on existence) the current Projectable, which may hold a state, or entries, or both.
  protected void upsertFor(final Projectable projectable, final ProjectionControl control) {
    final T currentData = currentDataFor(projectable);
    prepareForMergeWith(projectable);
    final String dataId = dataIdFor(projectable);
    final BiConsumer<T,Integer> upserter = (previousData, previousVersion) -> {
      final int currentDataVersion = currentDataVersionFor(projectable, previousData, previousVersion);
      final T data = merge(previousData, previousVersion, currentData, currentDataVersion, sources());
      final Confirmer confirmer = ProjectionControl.confirmerFor(projectable, control);
      if (alwaysWrite() || !data.equals(previousData)) {
        stateStore.write(dataId, data, currentDataVersion, writeInterest, confirmer);
      } else {
        confirmProjection(confirmer);
      }
    };
    stowMessages(ReadResultInterest.class, WriteResultInterest.class);
    stateStore.read(dataId, currentData.getClass(), readInterest, upserter);
  }
The upsertFor() does the following.
- 1. Asks currentDataFor(projectable) for its notion of the current state, as in the state that is now being projected. You may override currentDataFor(projectable). This may or may not be useful, but if it is, you must return the value that is considered current from the Projectable. This method is generally used only for full-state projections rather than those based on DomainEvent instances.
- 2. Requests that the projection prepareForMergeWith(). The default behavior adapts any entries() in the Projectable to Source<?> instances, holding them in an internal List<Source<?>>. The concrete extender may get the adapted Source<?> instances using the sources() method. These are provided by default to the second merge(...) method, which is used for Event Sourcing projections. If you have some alternative or additional preparation to do before the merge, this method must be overridden.
- 3. Reads the current T value from the StateStore (see the last statement in the method) and passes a BiConsumer<T, Integer> upserter. When the read completes, the upserter is used to merge the previousData with the currentData and/or apply any Source<?> instances.
- 4. Writes the merged state back to the StateStore if alwaysWrite() is true or if the newly merged data is not equal to the previous data. This happens inside the upserter.
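The read, merge, and conditional write steps listed above can be condensed into a synchronous sketch. It assumes a hypothetical InMemoryStore in place of the StateStore and a BinaryOperator in place of merge(...); the real upsertFor() is asynchronous and confirms the Projectable, which this sketch leaves out. The version calculation follows the currentDataVersionFor() shown in the condensed listing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical synchronous sketch; the real upsertFor() is asynchronous.
class UpsertSketch {
  // A stored value together with its version.
  static final class Versioned<T> {
    final T data;
    final int version;
    Versioned(final T data, final int version) {
      this.data = data;
      this.version = version;
    }
  }

  // Stand-in for the StateStore; counts writes so skipped writes are visible.
  static final class InMemoryStore<T> {
    private final Map<String, Versioned<T>> rows = new HashMap<>();
    int writes = 0;
    Versioned<T> read(final String id) { return rows.get(id); }
    void write(final String id, final T data, final int version) {
      rows.put(id, new Versioned<>(data, version));
      writes++;
    }
  }

  static <T> void upsert(
      final InMemoryStore<T> store,
      final String id,
      final T currentData,
      final int incomingVersion,
      final boolean alwaysWrite,
      final BinaryOperator<T> merge) {
    // Read the previous value; -1 marks "no previous version".
    final Versioned<T> previous = store.read(id);
    final T previousData = previous == null ? null : previous.data;
    final int previousVersion = previous == null ? -1 : previous.version;
    // Same rule as currentDataVersionFor() in the listing.
    final int version = alwaysWrite
      ? incomingVersion
      : (previousVersion == -1 ? 1 : previousVersion + 1);
    final T merged = merge.apply(previousData, currentData);
    // Write only when forced to or when the merge actually changed something.
    if (alwaysWrite || !merged.equals(previousData)) {
      store.write(id, merged, version);
    }
  }

  // Three upserts with alwaysWrite == false; the second one changes nothing.
  static String demo() {
    final InMemoryStore<String> store = new InMemoryStore<>();
    final BinaryOperator<String> keepCurrent = (previous, current) -> current;
    upsert(store, "d-1", "a", 1, false, keepCurrent);
    upsert(store, "d-1", "a", 2, false, keepCurrent);
    upsert(store, "d-1", "b", 3, false, keepCurrent);
    final Versioned<String> row = store.read("d-1");
    return row.data + ":" + row.version + ":" + store.writes;
  }
}
```

In demo() the second upsert merges to a value equal to the stored one, so it is not written, and the stored version advances only on the two real writes.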
The DashboardProjectionActor implements currentDataFor() as follows.
public class DashboardProjectionActor extends StateStoreProjectionActor<DashboardView> {
  private static final DashboardView Empty = new DashboardView();
  ...
  protected DashboardView currentDataFor(final Projectable projectable) {
    return Empty;
  }
  ...
}
As previously indicated, the currentDataFor() may or may not be useful in terms of providing a new state. This implementation answers an Empty instance of the DashboardView, which will be used by the merge() as an accumulator of modifications. This particular override example is unnecessary but demonstrates an alternative to the default implementation. Assume that we decide not to implement the above override and remove the Empty value declaration as well as the currentDataFor() method override.
If you are not projecting from events, but from the full state of the domain object, an implementation of currentDataFor() and merge() similar to the previous section is necessary.
Next the DashboardProjectionActor is given the opportunity to prepare for the merge.
public class DashboardProjectionActor extends StateStoreProjectionActor<DashboardView> {
  private String dataId;
  private final List<IdentifiedDomainEvent> events;
  ...
  @Override
  protected void prepareForMergeWith(final Projectable projectable) {
    events.clear();
    for (Entry<?> entry : projectable.entries()) {
      events.add(entryAdapter().anyTypeFromEntry(entry));
    }
  }
  ...
}
In this implementation the prepareForMergeWith() adapts all Entry instances into instances of the DomainEvent specialization IdentifiedDomainEvent. This gives the concrete event types the means to provide the identity of the event instance. This specific implementation of prepareForMergeWith() is redundant because it does the same thing that the default behavior already does. You would choose to override prepareForMergeWith() only if you required behavior beyond the above implementation.
Immediately following prepareForMergeWith() the upsertFor() calls dataIdFor().
  @Override
  protected String dataIdFor(final Projectable projectable) {
    dataId = events.get(0).identity();
    return dataId;
  }
As seen above, the dataId is provided by the first (and possibly only) instance of the IdentifiedDomainEvent in the events list. Recall that this events list was just previously populated by prepareForMergeWith(). This example dataIdFor() method override is likely redundant because the default behavior provides a more thorough implementation, first checking for the Projectable dataId(), and if that is not available, looking for the identity of the first IdentifiedDomainEvent. If neither of these is available then the dataId will be a blank String, and an override of this method would be necessary to provide a custom value.
If you are using the default implementation of prepareForMergeWith(), the source/event instances are retrieved as follows. Here again, your DomainEvent instances must extend IdentifiedDomainEvent, which can provide the identity() of the entity that emitted them.
  protected String dataIdFor(final Projectable projectable) {
    String dataId = projectable.dataId();
    if (dataId.isEmpty()) {
      try {
        dataId = typedToIdentifiedDomainEvent(sources().get(0)).identity();
      } catch (Exception e) {
        // ignore; fall through
      }
    }
    return dataId;
  }
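The fallback order of the default dataIdFor() can be tried out in isolation with the sketch below. It is a simplified rendering under stated assumptions: the Identified interface stands in for IdentifiedDomainEvent, the Projectable is reduced to its dataId String, and explicit checks replace the try/catch of the original.

```java
import java.util.List;

// Hypothetical sketch of the fallback order; not the library method itself.
class DataIdFallbackSketch {
  // Stand-in for IdentifiedDomainEvent.
  interface Identified {
    String identity();
  }

  static String dataIdFor(final String projectableDataId, final List<Object> sources) {
    String dataId = projectableDataId;
    // 1. Prefer the id carried by the Projectable itself.
    if (dataId.isEmpty()) {
      // 2. Otherwise use the identity of the first source, if it can provide one.
      if (!sources.isEmpty() && sources.get(0) instanceof Identified) {
        dataId = ((Identified) sources.get(0)).identity();
      }
    }
    // 3. If neither is available the result stays blank.
    return dataId;
  }
}
```

A blank result is the signal that an override is needed to supply a custom value.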
The following is the DashboardProjectionActor implementation of merge() and its helper method mergeInto(), assuming that prepareForMergeWith() was also overridden and a local List<IdentifiedDomainEvent> of events was collected by it. This example shows an alternative to using the more appropriate merge(...) override that provides the List<Source<?>> parameter.
public class DashboardProjectionActor extends StateStoreProjectionActor<DashboardView> {
private static final DashboardView Empty = new DashboardView();
private final List<IdentifiedDomainEvent> events;
private Projectable projectable;
...
@Override
protected DashboardView merge(
final DashboardView previousData,
final int previousVersion,
final DashboardView currentData,
final int currentVersion) {
return mergeInto(currentData);
}
...
@Override
protected void prepareForMergeWith(final Projectable projectable) {
events.clear();
for (Entry <?> entry : projectable.entries()) {
events.add(entryAdapter().anyTypeFromEntry(entry));
}
}
private DashboardView mergeInto(final DashboardView view) {
for (final DomainEvent event : events) {
switch (match(event)) {
case DashboardDefined:
final DashboardDefined defined = typed(event);
view.id = defined.id;
view.title = defined.title;
view.summary = defined.summary;
view.status = Status.Inactive.name();
break;
case DashboardStreamed:
final DashboardStreamed streamed = typed(event);
view.streamValue = streamed.value;
view.status = Status.Streaming.name();
break;
case DashboardStreamHalted:
final DashboardStreamHalted halted = typed(event);
view.previousStreamValue = view.streamValue;
view.streamValue = "";
view.status = Status.Halted.name();
break;
case DashboardStreamRestarted:
final DashboardStreamRestarted restarted = typed(event);
view.streamValue = view.previousStreamValue;
view.status = Status.Restarted.name();
break;
case WaveBoardOpened:
final WaveBoardOpened opened = typed(event);
view.waveBoardName = opened.name;
view.waveBoardDevicePort = opened.port;
view.waveBoardCalibration = opened.calibration;
view.status = Status.WaveBoardOpened.name();
break;
case ...
case ...
case ...
case Unmatched:
logger().warn("Event of type " + event.typeName() + " was not matched.");
break;
}
}
return view;
}
...
}
The merge() passes the currentData to be used by mergeInto() as an accumulator of the new state. Any preexisting state on the currentData that coincides with the interpretation of the DomainEvent will be overwritten/replaced.
Because the Projectable may have multiple entries (serialized DomainEvent instances), the mergeInto() iterates over the entire collection of events deserialized by the method prepareForMergeWith().
Assuming you are using the default prepareForMergeWith() implementation, you would iterate as follows, using the sources() method rather than a collection that you gathered on your own.
...
private DashboardView mergeInto(final DashboardView view) {
for (final Source<?> event : sources()) {
switch (match(event)) {
case DashboardDefined:
final DashboardDefined defined = typed(event);
...
case DashboardStreamed:
final DashboardStreamed streamed = typed(event);
...
}
}
return view;
}
...
The switch uses an enum of DashboardViewProjectableType to match on each of the DomainEvent instances.
public enum DashboardViewProjectableType {
  DashboardDefined,
  DashboardStreamed,
  DashboardStreamHalted,
  DashboardStreamRestarted,
  WaveBoardOpened,
  Unmatched
}
The DomainEvent type is matched to the enum by the match() method. A non-match produces the Unmatched type, which is ignored by mergeInto(), but logged as a warning.
  private DashboardViewProjectableType match(final Source<?> event) {
    try {
      return DashboardViewProjectableType.valueOf(event.typeName());
    } catch (Exception e) {
      return DashboardViewProjectableType.Unmatched;
    }
  }
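The matching technique relies on Enum.valueOf() throwing an IllegalArgumentException for an unknown name. The following standalone sketch shows that behavior with a shortened, hypothetical ViewEventType enum and a match() that takes the type name directly as a String.

```java
// Hypothetical standalone sketch of enum-based matching with a fallback constant.
class EnumMatchSketch {
  enum ViewEventType { DashboardDefined, DashboardStreamed, Unmatched }

  static ViewEventType match(final String typeName) {
    try {
      // valueOf() requires an exact, case-sensitive constant name.
      return ViewEventType.valueOf(typeName);
    } catch (IllegalArgumentException e) {
      // Any unknown name maps to the fallback constant.
      return ViewEventType.Unmatched;
    }
  }
}
```

Note that valueOf() throws a NullPointerException for a null name, which this sketch does not catch; the listing above catches Exception and therefore covers that case as well.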
Once a specific type is matched, the mergeInto() maps event attributes to DashboardView attributes. Any preexisting values that are mapped are overwritten on the view accumulator. The iteration and matching are done for any number of events, which may be included with the current Projectable or others received in the future.
case DashboardDefined:
final DashboardDefined defined =