Processes
XOOM LATTICE long-running process orchestrations, aka Sagas, with guided steps to results.
Processes can be used to manage complex, coarse-grained, long-running business transactions. Each step in the process is orchestrated until a given outcome is reached, which may be a successful completion.
Some processes may never end in that they may repeat indefinitely, producing one or more smaller subsets of successful outcomes with each iteration.
Because such a process runs longer than a single transaction and may spread across any number of Bounded Contexts (i.e., microservices), and because the process may at some point be temporarily evicted from memory, the process itself must be persistent. The XOOM LATTICE tools assist in managing such processes, whether by means of object persistence, stateful command models, or sourcing with events and commands.
Note that a long-running process is often referred to as a Saga. Even so, we acknowledge here that using the name Saga for such a process does not typically match the original pattern definition, although a process may be designed to do so. Primarily this would be accomplished by emulating a rollback: compensating transactions are applied to the point where any previous transactions are essentially undone. This is known as backward recovery. There is also a second choice known as forward recovery, and the two may be combined.
As a general rule, a process is informed of an outcome and, in reaction to that outcome, causes another action to be taken. This repeats: the process is informed of the outcome of the action that it caused, and then causes yet another action to be taken.
Typically a DomainEvent will trigger the process to start. The process will then emit a Command, which is to cause the next step in the process. When the DomainEvent from that step's outcome is seen, the process will again emit a Command, which is to cause the following step. This repeats until the process has completed (if in fact it is meant to complete). The process tracks the current state of the state machine, which, along with the next DomainEvent it sees, determines the next step to be taken.
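The event-in, command-out loop described above can be sketched as a small state machine. This is only an illustration under stated assumptions: OrderSagaSketch and its State, Event, and Command enums are hypothetical names invented for the sketch, and nothing here uses the XOOM LATTICE types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; none of these types come from XOOM LATTICE.
final class OrderSagaSketch {
    // States of the process's state machine.
    enum State { STARTED, AWAITING_PAYMENT, AWAITING_PICKING, COMPLETED }

    // Outcomes that the process is informed of.
    enum Event { ORDER_PLACED, PAYMENT_AUTHORIZED, ITEMS_PICKED }

    // Actions that the process causes in reaction to an outcome.
    enum Command { AUTHORIZE_PAYMENT, PICK_ITEMS }

    private State state = State.STARTED;
    private final List<Command> emitted = new ArrayList<>();

    State state() { return state; }

    List<Command> emitted() { return emitted; }

    // The current state together with the next event determines the next step.
    void when(final Event event) {
        if (state == State.STARTED && event == Event.ORDER_PLACED) {
            state = State.AWAITING_PAYMENT;
            emitted.add(Command.AUTHORIZE_PAYMENT);
        } else if (state == State.AWAITING_PAYMENT && event == Event.PAYMENT_AUTHORIZED) {
            state = State.AWAITING_PICKING;
            emitted.add(Command.PICK_ITEMS);
        } else if (state == State.AWAITING_PICKING && event == Event.ITEMS_PICKED) {
            state = State.COMPLETED; // final step: nothing more to emit
        } else {
            throw new IllegalStateException("Unexpected " + event + " in state " + state);
        }
    }

    public static void main(final String[] args) {
        final OrderSagaSketch process = new OrderSagaSketch();
        process.when(Event.ORDER_PLACED);
        process.when(Event.PAYMENT_AUTHORIZED);
        process.when(Event.ITEMS_PICKED);
        // prints: COMPLETED [AUTHORIZE_PAYMENT, PICK_ITEMS]
        System.out.println(process.state() + " " + process.emitted());
    }
}
```

An event that does not fit the current state is rejected here with an exception. A real process might instead ignore it or record it, depending on the delivery guarantees of the exchange.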
A process that spans three different Bounded Contexts.
The above diagram shows how the process starts with a user submitting a command and then proceeds through five total steps. Note that a DomainEvent is seen by the process as a stimulus, and the process then emits a Command to cause the next action. This pattern repeats until completion is reached on the far right.
The Process and three abstract types, each corresponding to a model entity type.
There are three standard Process implementors, all of which are abstract base types. These base classes are meant to be extended by concrete implementors.

Type | Description |
SourcedProcess | Uses Command Sourcing and/or Event Sourcing to maintain the state of the process and to direct next steps. |
ObjectProcess | Uses a persistent object-relational mapped object to maintain the state of the state machine. The extender must still emit commands or events to direct next steps. |
StatefulProcess | Uses a persistent key-value store to maintain the state of the state machine. The extender must still emit commands or events to direct next steps. |
These are all documented in detail below.
When implementing a Process, the SourcedProcess is generally the best choice. The use of Command and/or DomainEvent types to direct the steps of the process is a natural fit. You may still, however, accomplish the same by using Command and/or DomainEvent types within an ObjectProcess or StatefulProcess. It may be more a matter of whether you prefer to always persist state snapshots, which you must always do with these two process types.

The protocol for processes is defined in XOOM LATTICE.
package io.vlingo.lattice.model.process;

import java.util.List;
import java.util.function.Supplier;

import io.vlingo.common.Completes;
import io.vlingo.lattice.model.Command;
import io.vlingo.lattice.model.DomainEvent;
import io.vlingo.symbio.Source;

public interface Process<T> {
  Chronicle<T> chronicle();
  String id();
  void process(final Command command);
  <R> Completes<R> process(final Command command, final Supplier<R> andThen);
  void process(final DomainEvent event);
  <R> Completes<R> process(final DomainEvent event, final Supplier<R> andThen);
  <C> void processAll(final List<Source<C>> sources);
  <C,R> Completes<R> processAll(final List<Source<C>> sources, final Supplier<R> andThen);
  void send(final Command command);
  void send(final DomainEvent event);
}
As expected with the VLINGO XOOM platform, the protocol is quite simple. The operations work as described in the following subsections.
The chronicle() operation returns the current state of the process as a Chronicle<S>, where S is the type of the state. Since a Process is a state machine, the current state held by the Chronicle<S> is the state of the state machine.

public class Chronicle<S> {
  public final S state;

  public Chronicle(final S state) {
    this.state = state;
  }

  public Chronicle<S> transitionTo(final S state) {
    return new Chronicle<>(state);
  }
}
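Note that transitionTo() in the class above returns a new Chronicle<S> and leaves the existing instance untouched. As a minimal sketch of how a subclass might build on that to guard its transitions, the following assumes a hypothetical OrderStep state type and a hypothetical OrderChronicle subclass. The Chronicle<S> class is copied from above (without the public modifier) only so that the sketch compiles on its own.

```java
// Copy of the Chronicle<S> shown above so that this sketch is self-contained.
class Chronicle<S> {
    public final S state;

    public Chronicle(final S state) {
        this.state = state;
    }

    public Chronicle<S> transitionTo(final S state) {
        return new Chronicle<>(state);
    }
}

// Hypothetical state type for illustration only.
enum OrderStep { PLACED, PAYMENT_AUTHORIZED, ITEMS_PICKED }

// Hypothetical extension that only permits moving forward one step at a time.
final class OrderChronicle extends Chronicle<OrderStep> {
    OrderChronicle(final OrderStep state) {
        super(state);
    }

    @Override
    public OrderChronicle transitionTo(final OrderStep next) {
        if (next.ordinal() != state.ordinal() + 1) {
            throw new IllegalStateException("Cannot go from " + state + " to " + next);
        }
        return new OrderChronicle(next); // the current instance is left unchanged
    }
}
```

Whether transition rules belong in the chronicle or in the state type itself is a design choice. The sketch places them in the chronicle only to show one possible reason for extending it.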
The API makes no assumptions that the full state itself will be persisted, but makes it possible. Since the Process may be CommandSourced or EventSourced, or a combination of both, it could well be that there is no need to use the Chronicle<S> for full-state persistence. In such cases the Process may require minimal state transitions, and thus any state-representing stream may not be large enough to justify the use of snapshots.

You use transitionTo() to create a new Chronicle<S> that wraps the new state of the state machine.

You are meant to extend the Chronicle<S> to manage your more fine-grained state transitions.

The id() operation returns the globally unique identity of the Process, which should likely be the id() or streamName() of the underlying base types: SourcedProcess, ObjectProcess, StatefulProcess.

The process(Command) operation wraps the Command inside a ProcessMessage and persists it to the underlying storage, and then sends it through the configured message exchange.

The process(Command, Supplier<R>) operation performs as does process(Command), and also returns the Completes<R> that provides the eventual outcome of the Supplier<R> parameter andThen. After the Command is safely persisted, andThen is executed and its value is given as the outcome of the Completes<R>.

The process(DomainEvent) operation wraps the DomainEvent inside a ProcessMessage and persists it to the underlying storage, and then sends it through the configured message exchange.

The process(DomainEvent, Supplier<R>) operation performs as does process(DomainEvent), and also returns the Completes<R> that provides the eventual outcome of the Supplier<R> parameter andThen. After the DomainEvent is safely persisted, andThen is executed and its value is given as the outcome of the Completes<R>.

The processAll(List<Source<C>>) operation wraps each of the Source<C> instances inside its own ProcessMessage and persists all of them to the underlying storage, and then sends each through the configured message exchange.

The processAll(List<Source<C>>, Supplier<R>) operation performs as does processAll(List<Source<C>>), and also returns the Completes<R> that provides the eventual outcome of the Supplier<R> parameter andThen. After all Source<C> instances are safely persisted, andThen is executed and its value is given as the outcome of the Completes<R>.

Note that the above six process...() operations support guaranteed at-least-once delivery as long as the persistence succeeds. Additionally, these operations provide traceability of where the messages were sent from and delivered to, and why, and the persistence makes the messages useful in the future. It is often impossible to predict all the uses of long-term persisted messages, but they have immediate purpose and generally future applicability.

The two send() operations, send(Command) and send(DomainEvent), will only succeed if the messaging mechanism successfully enqueues the message and the queue/topic/exchange is durable. However, even if the send succeeds, there is no traceability of where the message was sent from and delivered to, or why, nor is the message persisted long-term. In fact, the durable queue (et al.) will delete the message after its delivery is confirmed.

The abstract base Process types (SourcedProcess, ObjectProcess, and StatefulProcess) are described in the following subsections.

To use a Command Sourced and/or Event Sourced Process type, extend the SourcedProcess. Recall from the above diagram that generally a process is stimulated by a DomainEvent and then causes the next action by emitting a Command. Because Command instances are persisted to represent the process state, the process is Command Sourced, as in sourced by commands.

public class ManagedOrderProcess extends SourcedProcess<OrderProcessState> implements OrderProcess {
  private OrderProcessState state;

  public ManagedOrderProcess() {
    super(); // uses address id as streamName
  }

  @Override
  public void orderPlaced(Order order) {
    state = OrderProcessState.with(order);
    process(new AuthorizePayment(state.order.paymentInfo));
  }

  @Override
  public void paymentAuthorized(PaymentAuthorization paymentAuthorization) {
    state = state.with(paymentAuthorization);
    process(new PickItems(state.order.items));
  }

  ...
}
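The example above calls process(...) rather than send(...). The difference between the two, as described earlier, can be sketched with in-memory stand-ins. Everything here is an assumption made for illustration: ProcessVersusSendSketch, its journal and exchange lists, and the local ProcessMessage class are hypothetical and are not the platform's storage, exchange, or ProcessMessage type. The sketch also assumes that the wrapped message is what travels through the exchange.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the real storage and exchange are provided by the platform.
final class ProcessVersusSendSketch {
    // Stand-in for the ProcessMessage wrapper described earlier.
    static final class ProcessMessage {
        final Object source;

        ProcessMessage(final Object source) {
            this.source = source;
        }
    }

    final List<ProcessMessage> journal = new ArrayList<>(); // stand-in for the underlying storage
    final List<Object> exchange = new ArrayList<>();        // stand-in for the message exchange

    // process...(): wrap, persist, and only then send.
    void process(final Object commandOrEvent) {
        final ProcessMessage message = new ProcessMessage(commandOrEvent);
        journal.add(message);  // persisted first, so the message remains traceable
        exchange.add(message); // sent only after persistence has succeeded
    }

    // send(): hand the message to the exchange without persisting it.
    void send(final Object commandOrEvent) {
        exchange.add(commandOrEvent);
    }
}
```

After one process(...) call and one send(...) call, the journal holds a single entry while the exchange has seen two messages, which mirrors the traceability difference noted earlier.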
Note that the DomainEvent instances are not received directly by the process. Instead, an exchange listener receives the DomainEvent, looks up the process, and dispatches one of the process protocol messages, such as orderPlaced() and paymentAuthorized().

The StatefulProcess is very similar to the SourcedProcess, except that you must implement a few overrides necessary for the StatefulProcess and the underlying StatefulEntity abstract base type.

public class ManagedOrderProcess extends StatefulProcess<OrderProcessState> implements OrderProcess {
  private Chronicle<OrderProcessState> chronicle;

  public ManagedOrderProcess() {
    super(); // uses address id as streamName
  }

  @Override
  public Chronicle<OrderProcessState> chronicle() {
    return chronicle;
  }

  @Override
  public void orderPlaced(Order order) {
    // the state is held by the chronicle; there is no separate state field
    chronicle = new Chronicle<>(OrderProcessState.with(order));
    process(new AuthorizePayment(chronicle.state.order.paymentInfo));
  }

  @Override
  public void paymentAuthorized(PaymentAuthorization paymentAuthorization) {
    chronicle = chronicle.transitionTo(chronicle.state.with(paymentAuthorization));
    process(new PickItems(chronicle.state.order.items));
  }

  ...

  @Override
  protected void state(final OrderProcessState state) {
    this.chronicle = chronicle.transitionTo(state);
  }

  @Override
  protected Class<OrderProcessState> stateType() {
    return OrderProcessState.class;
  }
}
The chronicle() is required by the StatefulProcess, and the state() and stateType() are required by the StatefulEntity.

The ObjectProcess is very similar to the SourcedProcess, except that you must implement a few overrides necessary for the ObjectProcess and the underlying ObjectEntity abstract base type.

public class ManagedOrderProcess extends ObjectProcess<OrderProcessState> implements OrderProcess {
  private Chronicle<OrderProcessState> chronicle;

  public ManagedOrderProcess() {
    super(); // uses address id as streamName
  }

  @Override
  public Chronicle<OrderProcessState> chronicle() {
    return chronicle;
  }

  @Override
  public void orderPlaced(Order order) {
    chronicle = new Chronicle<>(OrderProcessState.with(order));
    process(new AuthorizePayment(chronicle.state.order.paymentInfo));
  }

  @Override
  public void paymentAuthorized(PaymentAuthorization paymentAuthorization) {
    // the persistent object is mutated in place
    chronicle.state.setPaymentAuthorization(paymentAuthorization);
    process(new PickItems(chronicle.state.order.items));
  }

  ...

  @Override
  protected OrderProcessState stateObject() {
    return chronicle.state;
  }

  @Override
  protected void state(final OrderProcessState state) {
    this.chronicle = chronicle.transitionTo(state);
  }

  @Override
  protected Class<OrderProcessState> stateType() {
    return OrderProcessState.class;
  }
}
The chronicle() is required by the ObjectProcess, and the stateObject(), state(), and stateType() are required by the ObjectEntity.
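As described earlier, DomainEvent instances do not arrive at a process directly. An exchange listener receives each one, looks up the process, and dispatches a protocol message such as orderPlaced() or paymentAuthorized(). The following is a minimal sketch of that dispatch using a plain in-memory map. All names are hypothetical (ListenerDispatchSketch, OrderEvent, the simplified OrderProcess protocol with String parameters, and the registry), and it does not show how the platform's exchange or process lookup actually works.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the platform's exchange or lookup mechanism.
final class ListenerDispatchSketch {
    // Simplified process protocol, loosely mirroring the OrderProcess used above.
    interface OrderProcess {
        void orderPlaced(String orderId);
        void paymentAuthorized(String authorizationId);
    }

    // Simplified event carrying the identity of the process it belongs to.
    static final class OrderEvent {
        final String type;
        final String processId;
        final String payload;

        OrderEvent(final String type, final String processId, final String payload) {
            this.type = type;
            this.processId = processId;
            this.payload = payload;
        }
    }

    // Process implementation that only records which protocol messages it received.
    static final class RecordingProcess implements OrderProcess {
        final List<String> received = new ArrayList<>();

        @Override
        public void orderPlaced(final String orderId) {
            received.add("orderPlaced:" + orderId);
        }

        @Override
        public void paymentAuthorized(final String authorizationId) {
            received.add("paymentAuthorized:" + authorizationId);
        }
    }

    private final Map<String, OrderProcess> processes = new HashMap<>();

    void register(final String processId, final OrderProcess process) {
        processes.put(processId, process);
    }

    // Receives an event, looks up the process, and dispatches the matching protocol message.
    void receive(final OrderEvent event) {
        final OrderProcess process = processes.get(event.processId);
        if (process == null) {
            throw new IllegalArgumentException("No process with id " + event.processId);
        }
        switch (event.type) {
            case "OrderPlaced":
                process.orderPlaced(event.payload);
                break;
            case "PaymentAuthorized":
                process.paymentAuthorized(event.payload);
                break;
            default:
                throw new IllegalArgumentException("Unknown event type " + event.type);
        }
    }
}
```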