Processes

XOOM LATTICE long-running process orchestrations, aka Sagas, with guided steps to results.

Processes can be used to manage complex, coarse-grained, long-running business transactions. Each step in the process is orchestrated until a given outcome is reached, which may be a successful completion.

Some processes may never end in that they may repeat indefinitely, producing one or more smaller subsets of successful outcomes with each iteration.

Because such a process runs longer than a single transaction and may span any number of Bounded Contexts (i.e. microservices), and because the process may at some point be temporarily evicted from memory, the process itself must be persistent. The XOOM LATTICE tools assist in managing such processes, whether by means of object persistence, stateful command models, or sourcing with events and commands.

Note that a long-running process is often referred to as a Saga. Even so, we acknowledge here that using the name Saga for such a process typically does not follow the original pattern definition, although a process may be designed to follow it. Primarily this would be accomplished by emulating a rollback: compensating transactions are applied until any previous transactions are essentially undone. This is known as backward recovery. There is also a second choice known as forward recovery, and the two may be combined.
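Backward recovery can be sketched in plain Java, independent of XOOM LATTICE. The following is a minimal illustration only: the CompensatingSteps class, its Step interface, and the ShipOrder step name are hypothetical and are not part of the platform. Each completed step is remembered, and when a later step fails the completed steps are compensated in reverse order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; none of these types are part of XOOM LATTICE.
final class CompensatingSteps {
  /** One step of a long-running process plus the action that undoes it. */
  interface Step {
    String name();
    void apply(List<String> log) throws Exception;
    void compensate(List<String> log);
  }

  /** Builds a step that records what it does in the log and optionally fails. */
  static Step step(final String name, final boolean fails) {
    return new Step() {
      public String name() { return name; }
      public void apply(final List<String> log) throws Exception {
        if (fails) throw new Exception(name + " failed");
        log.add("applied " + name);
      }
      public void compensate(final List<String> log) {
        log.add("compensated " + name);
      }
    };
  }

  /**
   * Applies the steps in order. On the first failure, compensates the
   * completed steps in reverse order (backward recovery) and stops.
   */
  static List<String> run(final List<Step> steps) {
    final List<String> log = new ArrayList<>();
    final Deque<Step> completed = new ArrayDeque<>();
    for (final Step step : steps) {
      try {
        step.apply(log);
        completed.push(step); // most recently completed step is undone first
      } catch (Exception e) {
        log.add("failed " + step.name());
        while (!completed.isEmpty()) {
          completed.pop().compensate(log);
        }
        break;
      }
    }
    return log;
  }
}
```

Forward recovery would instead retry or substitute the failed step and continue, rather than unwinding the completed ones.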

How Processes Work

As a general rule, a process is informed of an outcome, and in reaction to that outcome causes another action to be taken. This repeats as the process is informed of the outcome of the previous action that it caused, and then causes yet another action to be taken.

Typically a DomainEvent will trigger the process to start. The process will then emit a Command, which causes the next step in the process. When the DomainEvent carrying the outcome of that step is seen, the process will again emit a Command, which causes the following step. This repeats until the process has completed (if in fact it is meant to complete). The process tracks the current state of the state machine, which, along with the next DomainEvent it sees, determines the next step to be taken.

The above diagram shows how the process starts by a user submitting a command and then proceeds through five total steps. Note that a DomainEvent is seen by the process as a stimulus, and the process then emits a Command to cause the next action. This pattern repeats until completion is reached on the far right.
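The event-in, command-out cycle can be illustrated with a small state machine. This is a sketch under stated assumptions, not XOOM LATTICE code: the OrderFlowSketch class, its enums, and the ITEMS_PICKED event are hypothetical names chosen for illustration. The current state together with the observed event determines the next state and the command to emit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these enums and this class are not XOOM LATTICE types.
final class OrderFlowSketch {
  enum State { NOT_STARTED, AWAITING_PAYMENT, AWAITING_PICKING, COMPLETED }
  enum Event { ORDER_PLACED, PAYMENT_AUTHORIZED, ITEMS_PICKED }
  enum Command { AUTHORIZE_PAYMENT, PICK_ITEMS }

  State state = State.NOT_STARTED;
  final List<Command> emitted = new ArrayList<>();

  /** Reacts to an observed event: transitions the state and emits the next command, if any. */
  void when(final Event event) {
    if (state == State.NOT_STARTED && event == Event.ORDER_PLACED) {
      state = State.AWAITING_PAYMENT;
      emitted.add(Command.AUTHORIZE_PAYMENT);
    } else if (state == State.AWAITING_PAYMENT && event == Event.PAYMENT_AUTHORIZED) {
      state = State.AWAITING_PICKING;
      emitted.add(Command.PICK_ITEMS);
    } else if (state == State.AWAITING_PICKING && event == Event.ITEMS_PICKED) {
      state = State.COMPLETED; // final step: nothing further to emit
    } else {
      throw new IllegalStateException("Unexpected " + event + " in state " + state);
    }
  }
}
```

An event that arrives in a state that does not expect it is rejected here; a real process might instead ignore it or record it for later handling.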

Process Types

A process is a finite state machine. XOOM LATTICE provides a protocol for processes, the Process.

There are three standard Process implementors, all of which are abstract base types. These base classes are meant to be extended by concrete implementors.

| Type | Description |
| --- | --- |
| SourcedProcess | Uses Command Sourcing and/or Event Sourcing to maintain the state of the process and to direct next steps. |
| ObjectProcess | Uses a persistent object-relational mapped object to maintain the state of the state machine. The extender must still emit commands or events to direct next steps. |
| StatefulProcess | Uses a persistent key-value entry to maintain the state of the state machine. The extender must still emit commands or events to direct next steps. |

These are all documented in detail below.

When implementing a Process, the SourcedProcess is generally the best choice. The use of Command and/or DomainEvent types to direct the steps of the process is a natural fit. You may still, however, accomplish the same by using Command and/or DomainEvent types within an ObjectProcess or a StatefulProcess. It may be more a matter of whether you prefer to always persist state snapshots, which you must always do with these two process types.

Process Protocol

The protocol for processes is defined in XOOM LATTICE.

package io.vlingo.lattice.model.process;

import java.util.List;
import java.util.function.Supplier;

import io.vlingo.common.Completes;
import io.vlingo.lattice.model.Command;
import io.vlingo.lattice.model.DomainEvent;
import io.vlingo.symbio.Source;

public interface Process<T> {

  Chronicle<T> chronicle();

  String id();

  void process(final Command command);

  <R> Completes<R> process(final Command command, final Supplier<R> andThen);

  void process(final DomainEvent event);

  <R> Completes<R> process(final DomainEvent event, final Supplier<R> andThen);

  <C> void processAll(final List<Source<C>> sources);

  <C,R> Completes<R> processAll(final List<Source<C>> sources, final Supplier<R> andThen);

  void send(final Command command);

  void send(final DomainEvent event);
}

As expected with the VLINGO XOOM platform, the protocol is quite simple. The operations work as described in the following subsections.

Chronicle<T> chronicle()

Returns the current state of the process as a Chronicle<S>, where S is the type of the state (the type parameter T of Process<T>). Since a Process is a state machine, the current state held by the Chronicle<S> is the state of the state machine.

public class Chronicle<S> {
  public final S state;

  public Chronicle(final S state) {
    this.state = state;
  }

  public Chronicle<S> transitionTo(final S state) {
    return new Chronicle<>(state);
  }
}

The API does not assume that the full state itself will be persisted, but makes that possible. Since the Process may be CommandSourced or EventSourced, or a combination of both, it could well be that there is no need to use the Chronicle<S> for full-state persistence. In such cases the Process may require minimal state transitions, and thus any state-representing stream may not be large enough to justify the use of snapshots.
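To illustrate why a short stream may not need snapshots, the following sketch rebuilds the current state by replaying the persisted stream from the beginning. It is an assumption-laden illustration: ReplaySketch and its Step enum are hypothetical, and the stream is reduced to a list of command names rather than real Source instances.

```java
import java.util.List;

// Hypothetical sketch; a list of command names stands in for a persisted stream.
final class ReplaySketch {
  enum Step { NONE, ORDER_PLACED, PAYMENT_AUTHORIZED }

  /** Rebuilds the current step by folding over the stream, oldest entry first. */
  static Step replay(final List<String> stream) {
    Step state = Step.NONE;
    for (final String commandName : stream) {
      switch (commandName) {
        case "AuthorizePayment": // emitted in reaction to the order being placed
          state = Step.ORDER_PLACED;
          break;
        case "PickItems":        // emitted in reaction to the payment being authorized
          state = Step.PAYMENT_AUTHORIZED;
          break;
        default:
          throw new IllegalArgumentException("Unknown stream entry: " + commandName);
      }
    }
    return state;
  }
}
```

With only a handful of entries per process, replaying the whole stream costs little, which is why a snapshot may add no practical benefit.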

You use transitionTo() to create a new Chronicle<S> that wraps the new state of the state machine.

You are meant to extend Chronicle<S> to manage your more fine-grained state transitions.
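One possible way to extend the Chronicle is to guard which transitions are allowed. The sketch below repeats the Chronicle listing above so that it compiles on its own. The OrderChronicle class and the OrderStep enum are hypothetical, and the rule that steps must advance strictly one at a time is an assumption made only for this example.

```java
// Repeated from the listing above (without the public modifier) so the sketch is self-contained.
class Chronicle<S> {
  public final S state;

  public Chronicle(final S state) {
    this.state = state;
  }

  public Chronicle<S> transitionTo(final S state) {
    return new Chronicle<>(state);
  }
}

// Hypothetical state type for illustration.
enum OrderStep { PLACED, PAYMENT_AUTHORIZED, ITEMS_PICKED }

// Hypothetical extension that only permits moving to the immediately following step.
final class OrderChronicle extends Chronicle<OrderStep> {
  OrderChronicle(final OrderStep state) {
    super(state);
  }

  @Override
  public OrderChronicle transitionTo(final OrderStep next) {
    if (next.ordinal() != state.ordinal() + 1) {
      throw new IllegalStateException("Cannot transition from " + state + " to " + next);
    }
    return new OrderChronicle(next);
  }
}
```

Because transitionTo() returns a new instance, each Chronicle stays immutable and an invalid transition leaves the previous one untouched.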

String id()

Returns the globally unique identity of the Process, which should likely be the id() or streamName() of the underlying base types: SourcedProcess, ObjectProcess, StatefulProcess.

void process(Command command)

Wraps the Command inside a ProcessMessage and persists it to the underlying storage, and then sends it through the configured message exchange.

<R> Completes<R> process(Command command, Supplier<R> andThen)

Performs as does process(Command), and also returns the Completes<R> that provides the eventual outcome of the Supplier<R> parameter andThen. After the Command is safely persisted, the andThen is executed and its value is given as outcome of the Completes<R>.

void process(DomainEvent event)

Wraps the DomainEvent inside a ProcessMessage and persists it to the underlying storage, and then sends it through the configured message exchange.

<R> Completes<R> process(DomainEvent event, Supplier<R> andThen)

Performs as does process(DomainEvent), and also returns the Completes<R> that provides the eventual outcome of the Supplier<R> parameter andThen. After the DomainEvent is safely persisted, the andThen is executed and its value is given as outcome of the Completes<R>.

<C> void processAll(List<Source<C>> sources)

Wraps each of the Source<C> instances inside its own ProcessMessage and persists all of them to the underlying storage, and then sends each through the configured message exchange.

<C,R> Completes<R> processAll(List<Source<C>> sources, Supplier<R> andThen)

Performs as does processAll(List<Source<C>>), and also returns the Completes<R> that provides the eventual outcome of the Supplier<R> parameter andThen. After all Source<C> instances are safely persisted, the andThen is executed and its value is given as outcome of the Completes<R>.

Note that the above six process...() operations support guaranteed at-least-once delivery as long as the persistence succeeds. Additionally, these operations provide traceability of where the messages were sent from, where they were delivered to, and why, and the persistence makes the messages useful in the future. It is often impossible to predict all the uses of long-term persisted messages, but they have immediate purpose and generally future applicability.
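The persist-then-andThen ordering can be sketched with standard Java types. This is not the XOOM implementation: java.util.concurrent.CompletableFuture stands in for Completes<R>, an in-memory list stands in for the underlying storage, and the PersistThenSketch class with its rule that an empty message fails to persist are assumptions made for illustration. The point shown is that the Supplier is evaluated only after persistence has succeeded.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

// Hypothetical stand-in: CompletableFuture replaces Completes<R>, a list replaces real storage.
final class PersistThenSketch {
  final List<String> journal = new CopyOnWriteArrayList<>();

  /** Persists the message, then completes with the value provided by andThen. */
  <R> CompletableFuture<R> process(final String message, final Supplier<R> andThen) {
    return CompletableFuture
        .runAsync(() -> persist(message))       // persistence happens first
        .thenApply(ignored -> andThen.get());   // skipped entirely if persistence failed
  }

  /** Simulated storage write; an empty message is treated as a persistence failure. */
  private void persist(final String message) {
    if (message.isEmpty()) {
      throw new IllegalArgumentException("nothing to persist");
    }
    journal.add(message);
  }
}
```

A caller would typically use the returned outcome to continue its own work, for example answering a request only once the command is known to be stored.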

The following two send() operations will only succeed if the messaging mechanism successfully enqueues the message and the queue/topic/exchange is durable. However, even if the send succeeds there is no traceability of where the message was sent from, where it was delivered to, or why, nor is the message persisted long-term. In fact, the durable queue (et al) will delete the message after its delivery is confirmed.

void send(Command command)

Sends the Command through the configured message exchange without first persisting it.

void send(DomainEvent event)

Sends the DomainEvent through the configured message exchange without first persisting it.
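The difference between the process...() and send() operations can be made concrete with an in-memory sketch. Everything here is hypothetical: DeliverySketch and its Envelope class only stand in for the real process, ProcessMessage, storage, and exchange, and whether the wrapper or the bare message travels through the exchange is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch; not the XOOM LATTICE implementation.
final class DeliverySketch {
  /** Stand-in for the ProcessMessage wrapper described above. */
  static final class Envelope {
    final String processId;
    final Object source;

    Envelope(final String processId, final Object source) {
      this.processId = processId;
      this.source = source;
    }
  }

  final String processId;
  final List<Envelope> journal = new ArrayList<>(); // long-term, traceable record
  final List<Object> exchange = new ArrayList<>();  // stand-in for the message exchange

  DeliverySketch(final String processId) {
    this.processId = processId;
  }

  /** Wraps, persists, and only then sends. */
  void process(final Object source) {
    final Envelope envelope = new Envelope(processId, source);
    journal.add(envelope);
    exchange.add(envelope);
  }

  /** Wraps each source on its own, persists all of them, then sends each. */
  void processAll(final List<?> sources) {
    final List<Envelope> envelopes = new ArrayList<>();
    for (final Object source : sources) {
      envelopes.add(new Envelope(processId, source));
    }
    journal.addAll(envelopes);
    exchange.addAll(envelopes);
  }

  /** Sends without persisting, so no trace remains once the exchange delivers it. */
  void send(final Object source) {
    exchange.add(source);
  }
}
```

After one process(), one send(), and one processAll() with two sources, the exchange has seen four messages while the journal holds only the three that were processed.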

Using the Process Types

The abstract base Process types—SourcedProcess, ObjectProcess, and StatefulProcess—are described in the following subsections.

SourcedProcess

To use a Command Sourced and/or Event Sourced Process type, extend the SourcedProcess. Recall from the above diagram that generally a process is stimulated by a DomainEvent and then causes the next action by emitting a Command. Because Command instances are persisted to represent the process state, the process is Command Sourced, as in sourced by commands.

public class ManagedOrderProcess extends SourcedProcess<OrderProcessState> implements OrderProcess {
  private OrderProcessState state;
  
  public ManagedOrderProcess() {
    super(); // uses address id as streamName
  }
  
  @Override
  public void orderPlaced(Order order) {
    state = OrderProcessState.with(order);
    process(new AuthorizePayment(state.order.paymentInfo));
  }
  
  @Override
  public void paymentAuthorized(PaymentAuthorization paymentAuthorization) {
    state = state.with(paymentAuthorization);
    process(new PickItems(state.order.items));
  }
  
  ...
}

Note that the DomainEvent instances are not received directly by the process. Instead an exchange listener receives the DomainEvent, looks up the process, and dispatches one of the process protocol messages, such as orderPlaced() and paymentAuthorized().
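The listener's role can be sketched as follows. This is an illustration under assumptions, not the XOOM LATTICE exchange API: ListenerSketch, its Event and Listener classes, and the in-memory registry are hypothetical, and the protocol parameters are simplified to String where the listing above uses Order and PaymentAuthorization.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the listener, registry, and event type are illustrative assumptions.
final class ListenerSketch {
  /** Process protocol, reduced to the two messages named in the text. */
  interface OrderProcess {
    void orderPlaced(String orderId);
    void paymentAuthorized(String authorizationId);
  }

  /** Minimal event carrying its type, the identity of the target process, and a payload. */
  static final class Event {
    final String type;
    final String processId;
    final String payload;

    Event(final String type, final String processId, final String payload) {
      this.type = type;
      this.processId = processId;
      this.payload = payload;
    }
  }

  /** Receives events from the exchange and dispatches them as protocol messages. */
  static final class Listener {
    private final Map<String, OrderProcess> processes = new HashMap<>();

    void register(final String processId, final OrderProcess process) {
      processes.put(processId, process);
    }

    void receive(final Event event) {
      final OrderProcess process = processes.get(event.processId);
      if (process == null) {
        throw new IllegalArgumentException("Unknown process: " + event.processId);
      }
      switch (event.type) {
        case "OrderPlaced":
          process.orderPlaced(event.payload);
          break;
        case "PaymentAuthorized":
          process.paymentAuthorized(event.payload);
          break;
        default:
          throw new IllegalArgumentException("Unhandled event: " + event.type);
      }
    }
  }
}
```

Keeping the translation in the listener means the process exposes only its own protocol and never depends on the exchange's message format.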

StatefulProcess

The StatefulProcess is very similar to the SourcedProcess, except that you must implement a few overrides necessary for the StatefulProcess and underlying StatefulEntity abstract base type.

public class ManagedOrderProcess extends StatefulProcess<OrderProcessState> implements OrderProcess {
  // Holds the current state of the state machine
  private Chronicle<OrderProcessState> chronicle;
  
  public ManagedOrderProcess() {
    super(); // uses address id as streamName
  }
  
  @Override
  public Chronicle<OrderProcessState> chronicle() {
    return chronicle;
  }
  
  @Override
  public void orderPlaced(Order order) {
    // Start the state machine with the newly placed order
    chronicle = new Chronicle<>(OrderProcessState.with(order));
    process(new AuthorizePayment(chronicle.state.order.paymentInfo));
  }
  
  @Override
  public void paymentAuthorized(PaymentAuthorization paymentAuthorization) {
    // Transition to a new state that includes the authorization
    chronicle = chronicle.transitionTo(chronicle.state.with(paymentAuthorization));
    process(new PickItems(chronicle.state.order.items));
  }

  ...

  @Override
  protected void state(final OrderProcessState state) {
    this.chronicle = chronicle.transitionTo(state);
  }

  @Override
  protected Class<OrderProcessState> stateType() {
    return OrderProcessState.class;
  }
}

The chronicle() is required by the StatefulProcess, and the state() and stateType() are required by the StatefulEntity.

ObjectProcess

The ObjectProcess is very similar to the SourcedProcess, except that you must implement a few overrides necessary for the ObjectProcess and underlying ObjectEntity abstract base type.

public class ManagedOrderProcess extends ObjectProcess<OrderProcessState> implements OrderProcess {
  // Holds the persistent state object of the state machine
  private Chronicle<OrderProcessState> chronicle;
  
  public ManagedOrderProcess() {
    super(); // uses address id as streamName
  }
  
  @Override
  public Chronicle<OrderProcessState> chronicle() {
    return chronicle;
  }
  
  @Override
  public void orderPlaced(Order order) {
    // Start the state machine with the newly placed order
    chronicle = new Chronicle<>(OrderProcessState.with(order));
    process(new AuthorizePayment(chronicle.state.order.paymentInfo));
  }
  
  @Override
  public void paymentAuthorized(PaymentAuthorization paymentAuthorization) {
    // The state object is mutable, so it is updated in place
    chronicle.state.setPaymentAuthorization(paymentAuthorization);
    process(new PickItems(chronicle.state.order.items));
  }

  ...

  @Override
  protected OrderProcessState stateObject() {
    return chronicle.state;
  }

  @Override
  protected void state(final OrderProcessState state) {
    this.chronicle = chronicle.transitionTo(state);
  }

  @Override
  protected Class<OrderProcessState> stateType() {
    return OrderProcessState.class;
  }
}

The chronicle() is required by the ObjectProcess, and the stateObject(), state(), and stateType() are required by the ObjectEntity.
