Scooter

A VLINGO XOOM toolset for implementing Actors and DDD entities/aggregates with persistence using a familiar blocking paradigm, supporting stepwise adoption to our full Reactive toolset.

A Gentle Introduction to Async

Admittedly most who refrain from the use of Reactive architectures, programming, and related tools, are generally concerned about the learning curve being too steep. Although a learning curve is indeed required, it's not as steep as most think. Whatever your expectations, we have provided a gentle path toward adopting Reactive. We think these will help you ease into Reactive.

Scooter offers a typical blocking paradigm for the purpose of gently introducing the tools and patterns generally used with DOMA and DDD.

Modeling With Entities

In a typical object-based application or service that uses POJOs, the lifecycle of entities is different from that of actor-based entities. Here we can assume that Entity and Aggregate as defined by DDD are interchangeable; that is, what follows deals with both concepts.

POJO Lifecycles

The following describes the typical lifecycle of a POJO Entity, which is what you will find when using Scooter:

  • Non-existing Entity states are newly constructed and then persisted to storage

  • Preexisting Entity states are reconstituted from storage, modified, and then persisted back to the same storage (updated).

  • Across a single JVM or a multi-JVM cluster, there may be any number of duplicated Entity instances. This implies that any of the duplicate instances may simultaneously have different operations performed by different users. In such cases the database must provide optimistic concurrency and detect concurrency violations via state versions.

  • The object reference is released after Entity persistence and the instance garbage collected.

To contrast this with the advantages of using Reactive Entities by means of XOOM Lattice, please see the related documentation.

Implementing an ObjectEntity

Implementing a concrete entity that uses object-relational mapping requires that you extend the ObjectEntity type. First create a protocol in the same manner that would be used for a corresponding Reactive entity.

import io.vlingo.xoom.lattice.model.DomainEvent;

public interface Employee {
  static Employee define() {
    return new EmployeeEntity();
  }

  void assign(final String number);
  void adjust(final int salary);
  void hire(final String number, final int salary);

  public static final class EmployeeHired extends DomainEvent { ... }
  public static final class EmployeeSalaryAdjusted extends DomainEvent { ... }
  public static final class EmployeeNumberAssigned extends DomainEvent { ... }
}

Next implement the concrete entity that implements the Employee protocol.

import io.vlingo.xoom.lattice.model.DomainEvent;
import io.vlingo.xoom.turbo.scooter.model.object.ObjectEntity;

public class EmployeeEntity extends ObjectEntity<EmployeeState,DomainEvent> implements Employee {
  private EmployeeState employee;

  public EmployeeEntity() {
    this.employee = new EmployeeState(); // new; identified in state ctor
  }

  public EmployeeEntity(final long id) {
    this.employee = new EmployeeState(id); // recovery; see stateObject()
  }

  @Override
  public void assign(final String number) {
    apply(employee.with(number), new EmployeeNumberAssigned(...));
  }

  @Override
  public void adjust(final int salary) {
    apply(employee.with(salary), new EmployeeSalaryAdjusted(...));
  }

  @Override
  public void hire(final String number, final int salary) {
    apply(employee.with(number).with(salary), new EmployeeHired(...));
  }

  @Override
  public String id() {
    return String.valueOf(employee.persistenceId());
  }

  @Override
  protected void stateObject(final EmployeeState stateObject) {
    this.employee = stateObject;
  }
}

When reconstituting the EmployeeEntity, the EmployeeState will be read by the OR-Mapping EmployeeRepository and set in the EmployeeEntity.

The EmployeeEntity uses the EmployeeState type as the actual persistent object, and all mutations will be reflected in transitioning that type.

import io.vlingo.xoom.common.identity.IdentityGenerator;

import java.util.concurrent.atomic.AtomicLong;

public class EmployeeState extends StateObject implements Comparable<EmployeeState> {
  private static final long serialVersionUID = 1L;

  private static final IdentityGenerator identityGenerator = new IdentityGenerator();

  public final int salary;
  public final String number;

  public EmployeeState(final String number, final int salary) {
    this(identityGenerator.nextLong(), number, salary);
  }

  public EmployeeState(final String id, final String number, final int salary) {
    super(id);
    this.number = number;
    this.salary = salary;
  }

  public EmployeeState(final String id) {
    this(id, null, 0);
  }

  EmployeeState() {
    super(identityGenerator.nextLong());
    this.number = "";
    this.salary = 0;
  }

  public EmployeeState with(final String number) {
    return new EmployeeState(this.persistenceId(), number, salary);
  }

  public EmployeeState with(final int salary) {
    return new EmployeeState(this.persistenceId(), number, salary);
  }

  @Override
  public int hashCode() {
    return 31 * number.hashCode() * salary;
  }

  @Override
  public boolean equals(final Object other) {
    if (other == null || other.getClass() != getClass()) {
      return false;
    } else if (this == other) {
      return true;
    }

    final EmployeeState otherPerson = (EmployeeState) other;

    return this.persistenceId() == otherPerson.persistenceId();
  }

  @Override
  public String toString() {
    return "EmployeeState[persistenceId=" + persistenceId() + " number=" + number + " salary=" + salary + "]";
  }

  @Override
  public int compareTo(final EmployeeState otherPerson) {
    return Long.compare(this.persistenceId(), otherPerson.persistenceId());
  }
}

To persist an instance of the EmployeeEntity's EmployeeState to the database, use it's repository. Although we provide abstract bases of both JournalRepository and StatefulRepository, we can't do so for ObjectRepository due to the numerous different ways that Object-Relational Mapping can be implemented.

Implementing Sourced Entities

Implementing a concrete entity that uses Event Sourcing requires that you extend one of the SourcedEntity base types, such as EventSourcedEntity or CommandSourcedEntity. First create a protocol in the same manner that would be used for a corresponding Reactive entity.

public interface Product {
  static Product define(final String type, final String category, final String name, final String description, final long price) {
    return new ProductEntity(type, category, name, description, price);
  }

  void adjustPrice(final long price);
  void changeDescription(final String description);
  void rename(final String name);
}

Next implement the concrete entity that implements the Product protocol.

import io.vlingo.xoom.turbo.scooter.model.sourced.EventSourcedEntity;

public class ProductEntity extends EventSourcedEntity implements Product {
  private String category;
  private String type;
  public String name;
  public String description;
  public long price;

  public ProductEntity(final String type, final String category, final String name, final String description, final long price) {
    apply(new ProductDefined(id(), type, category, name, description, price));
  }

  @Override
  public String id() {
    return streamName();
  }

  @Override
  public void adjustPrice(final long price) {
    apply(new ProductPriceAdjusted(id(), price));
  }

  @Override
  public void changeDescription(final String description) {
    apply(new ProductDescriptionChanged(id(), description));
  }

  @Override
  public void rename(final String name) {
    apply(new ProductRenamed(id(), name));
  }

  // INTERNAL USE ONLY: used by repository
  public ProductEntity(final List<Source<DomainEvent>> eventStream, final int streamVersion) {
    super(eventStream, streamVersion);
  }

  protected void whenProductDefined(final ProductDefined event) {
    this.name = event.name;
    this.description = event.description;
    this.price = event.price;
  }

  protected void whenProductDescriptionChanged(final ProductDescriptionChanged event) {
    this.description = event.description;
  }

  protected void whenProductPriceAdjusted(final ProductPriceAdjusted event) {
    this.price = event.price;
  }

  protected void whenProductRenamed(final ProductRenamed event) {
    this.name = event.name;
  }

  static {
    registerConsumer(Product.class, ProductDefined.class, Product::whenProductDefined);
    registerConsumer(Product.class, ProductDescriptionChanged.class, Product::whenProductDescriptionChanged);
    registerConsumer(Product.class, ProductPriceAdjusted.class, Product::whenProductPriceAdjusted);
    registerConsumer(Product.class, ProductRenamed.class, Product::whenProductRename);
  }
}

Note that this ProductEntity does not use an internal state type, although it could. There is an internal constructor that is to be used only by the repository, although it must be public since the implementations of ProductEntity and the repository are in two different packages. The "when" methods, which mutate the internal state of the entity as each event is applied, are registered in the static initializer and dispatched to by the base class just as they are with the corresponding Reactive entity types.

The following JournalProductRepository extends the Scooter JournalRepository and implements the ProductRepository.

import io.vlingo.xoom.symbio.store.journal.Journal;
import io.vlingo.xoom.turbo.scooter.persistence.JournalRepository;

public class JournalProductRepository extends JournalRepository implements ProductRepository {
  private final EntryAdapterProvider entryAdapterProvider;
  private final Journal<String> journal;
  private final StreamReader<String> streamReader;

  @SuppressWarnings({ "rawtypes", "unchecked" })
  public JournalProductRepository(final Journal<String> journal) {
    this.journal = journal;
    this.streamReader = journal.streamReader("ProductStreams").await();
    this.entryAdapterProvider = PersistenceInitializer.instance().entryAdapterProvider;

    final EntryAdapter entryAdapter = PersistenceInitializer.instance().defaultEntryAdapter;

    entryAdapterProvider.registerAdapter(ProductDefined.class, entryAdapter);
    entryAdapterProvider.registerAdapter(ProductDescriptionChanged.class, entryAdapter);
    entryAdapterProvider.registerAdapter(ProductPriceAdjusted.class, entryAdapter);
    entryAdapterProvider.registerAdapter(ProductRenamed.class, entryAdapter);
  }

  @Override
  public Product productOf(final String productId) {
    final EntityStream<String> stream = streamReader.streamFor(productId).await();
    final List<Source<DomainEvent>> sources = entryAdapterProvider.asSources(stream.entries);
    return new ProductEntity(sources, stream.streamVersion);
  }

  @Override
  public void save(final Product product) {
    final AppendInterest interest = appendInterest();
    final ProductEntity entity = (ProductEntity) product;
    journal.appendAll(entity.id(), entity.nextVersion(), entity.applied().sources(), interest, null);
    await(interest);
  }
}

Note specifically the various uses of await in the repository implementation. This is to address the otherwise asynchronous nature of XOOM Lattice and XOOM Symbio. See Completes::await() and JournalRepository::await(AppendInterest).

Implementing a StatefulEntity

Implementing a concrete entity that uses key-value/state storage requires that you extend the StatefulEntity type. First create a protocol in the same manner that would be used for a corresponding Reactive entity.

public static interface Person {
  static Person define(final Name name, final int age) {
    return new PersonEntity(new PersonState(name, age));
  }

  void changeName(final Name name);
  void increaseAge();
}

Next implement the concrete entity that implements the Person protocol.

import io.vlingo.xoom.turbo.scooter.model.stateful.StatefulEntity;

public static class PersonEntity extends StatefulEntity<PersonState,DomainEvent> implements Person {
  private PersonState state;

  public PersonEntity(final PersonState state) {
    this.state = state;
  }

  @Override
  public void changeName(final Name name) {
    apply(state.withName(name));
  }

  @Override
  public void increaseAge() {
    apply(state.withAge(state.age + 1));
  }

  // StatefulEntity

  @Override
  public String id() {
    return state.id;
  }

  @Override
  public PersonState state() {
    return state;
  }

  @Override
  protected void state(final PersonState state) {
    this.state = state;
  }
}

The PersonEntity uses the PersonState type as the actual persistent object, and all mutations will be reflected in transitioning that type. Note that the PersonEntity does not emit domain events when it applies new state, although it is fully supported if the implementation uses events.

public static class PersonState {
  public final String id;
  public final String name;
  public final int age;

  public PersonState(final String id, final String name, final int age) {
    this.id = id;
    this.name = name;
    this.age = age;
  }

  public PersonState(final String id) {
    this(id, null, 0);
  }

  public PersonState copy() {
    return new PersonState(id, name, age);
  }

  public boolean hasState() {
    return id != null && name != null && age > 0;
  }

  public PersonState withName(final String name) {
    return new PersonState(this.id, name, this.age);
  }

  public PersonState withAge(final int age) {
    return new PersonState(this.id, this.name, age);
  }

  @Override
  public boolean equals(final Object other) {
    if (other == null || other.getClass() != this.getClass()) {
      return false;
    }
    final PersonState otherState = (PersonState) other;
    return this.id.equals(otherState.id);
  }

  @Override
  public String toString() {
    return "PersonState[id=" + id + " name=" + name + " age=" + age + "]";
  }
}

The following PersonRepository extends the Scooter StatefulRepository.

import io.vlingo.xoom.symbio.store.state.StateStore;
import io.vlingo.xoom.turbo.scooter.persistence.StatefulRepository;

public class PersonRepository extends StatefulRepository {
  private final StateStore store;

  public PersonRepository(final StateStore store) {
    this.store = store;
  }

  public Person personOf(final String id) {
    final ReadInterest interest = readInterest();
    store.read(id, PersonState.class, interest);
    return await(interest);
  }

  public void save(final Person person) {
    final WriteInterest interest = writeInterest();
    final PersonEntity entity = (PersonEntity) person;
    final PersonState state = entity.state();
    store.write(state.id, state, entity.currentVersion(), interest);
    await(interest);
  }
}

Note specifically the uses of await in the repository implementation. This is to address the otherwise asynchronous nature of XOOM Lattice and XOOM Symbio. See the source ofStatefulRepository for uses of await(ReadInterest) and await(WriteInterest). The await(ReadInterest) returns the result of the StateStore::read() once it has completed.

Persistence

Use the JournalRepository abstract base class to implement repositories for SourcedEntity types that use Event Sourcing, Command Sourcing, or another type. You can see an implementation in the previous section.

import io.vlingo.xoom.symbio.store.journal.Journal;
import io.vlingo.xoom.turbo.scooter.persistence.JournalRepository;

public class JournalProductRepository extends JournalRepository implements ProductRepository {
  private final EntryAdapterProvider entryAdapterProvider;
  private final Journal<String> journal;
  private final StreamReader<String> streamReader;

  @SuppressWarnings({ "rawtypes", "unchecked" })
  public JournalProductRepository(final Journal<String> journal) {
    this.journal = journal;
    this.streamReader = journal.streamReader("ProductStreams").await();
    this.entryAdapterProvider = PersistenceInitializer.instance().entryAdapterProvider;
    ...
  }
  ...
}

Use the StatefulRepository abstract base class to implement repositories for StatefulEntity state types. You can see an implementation in the previous section.

import io.vlingo.xoom.symbio.store.state.StateStore;
import io.vlingo.xoom.turbo.scooter.persistence.StatefulRepository;

public class PersonRepository extends StatefulRepository {
  private final StateStore store;

  public PersonRepository(final StateStore store) {
    this.store = store;
  }
  ...
}

Although we provide abstract bases of both JournalRepository and StatefulRepository, we can't do so for an ObjectRepository due to the numerous different ways that Object-Relational Mapping can be implemented. You may follow one of the styles exemplified in Implementing Domain-Driven Design.

Blocking Mailbox

If you consider asynchronous actor messaging to be daunting, why not trying synchronous actor messaging?

The tool provided is a specialized actor mailbox. You can read more about actors and the role of their mailboxes here and here. The XOOM Actors tooling supports any number of mailbox plugins. The particular one provided by Scooter is a blocking mailbox that requires the actor to handle the message before the sender receives control again.

import io.vlingo.xoom.turbo.scooter.plugin.mailbox.blocking.BlockingMailbox;

This mailbox works by delivering a message to the target actor immediately rather than leaving the message for another thread to deliver. This allows you to get the feel of actor-based programming but without the unfamiliar nuances of asynchrony.

There is a potential problem with this. When you consider a request-response example, where the actor must send a message back to its sender. What would happen if the request handling actor and the response handling actor continued indefinitely sending each other messages rather than stopping after one? Correct, an ugly stack overflow would soon happen, or even re-entering an actor on the same thread and potentially modifying its state unexpectedly before the first message delivery has returned.

To prevent this the blocking mailbox does in fact implement a queue. It protects access to the queue using a compare-and-set operation. This limits the queue polling to only the first arriving enqueuing access. Think of the same thread that delivers to the request actor, next delivering a message to the response actor on the same thread. As described previously, this could cause a number of problems. So, what should be done?

public class BlockingMailbox implements Mailbox {
  ...
  public void send(final Message message) {
    if (isClosed()) return;

    queue.add(message);

    if (isSuspended()) {
      return;
    }

    try {
      boolean deliver = true;

      while (deliver) {
        if (delivering.compareAndSet(false, true)) {
          while (deliverAll())
            ;
          delivering.set(false);
        }
        deliver = false;
      }
    } catch (Throwable t) {
      if (delivering.get()) {
        delivering.set(false);
      }
      throw new RuntimeException(t.getMessage(), t);
    }
  }
  ...
}

Consider the above send(Message message) method of the BlockingMailbox. The mailbox invocation that occurs first locks the mailbox queue, but without blocking another attempt to lock. The compare-and-set prevents blocking on any secondary deliveries. So a second, third, forth, etc., delivery on the same thread enqueues the message, and when it sees that access is already reserved, it simply returns. When the message deliveries (method invocations) unwind, the original access will see all additional messages enqueued and deliver them. This could go on for a long time without causing issues, as long as the stack is given the opportunity to unwind before the queue causes an out-of-memory condition.

We suggest that you not plan to use the BlockingMailbox for all future development. It is provided more as a learning tool, rather than a production worthy tool. Using it in production for most or all actors gives you no advantages over a normal blocking paradigm with direct method invocations. Instead, we suggest that you use the above blocking entity types since they take advantage of strengths of direct method invocations.

Last updated