Routers

Use routers to offload command and event dispatching toward an eventual outcome.

Message-driven command routing to handlers offloads the responsibility of dispatching potential effects to the domain model. When using CQRS the model is known as the Command Model or Write Model.

The following shows a fully asynchronous/concurrent command routing component available with XOOM Lattice.

See the package io.vlingo.xoom.lattice.router:

Type

Description

CommandRouter

The interface protocol for command routers. Provides a static factory method to create any of three types of routers:

  • LoadBalancingCommandRouter

  • PartitioningCommandRouter

  • RoundRobinCommandRouter

All concrete CommandRouter types must implement this protocol:

void route(final RoutableCommand command)

CommandDispatcher<P, C extends Command, A>

All commands routed through a CommandRouter must implement the CommandDispatcher interface. The P is the type name of the protocol to which the command will be sent. The C is the type name of this command type, which must be a subclass of io.vlingo.xoom.lattice.model.Command. The A is the answer type to be provided by the command, and is normally a Completes<T> where T is the type to be answered asynchronously.

RoutableCommand

A command to be routed through a defined CommandRouter. This is a standard container for a concrete command type. In other words, create a command type, such as Rename, that implements the protocol CommandDispatcher and instantiate a new RoutableCommand with the Rename command inside. The RoutableCommand is sent through the CommandRouter and the Rename command internally knows how to send a message to its P generic protocol type parameter.

The next section explains how to use these types and components.

Commands and Dispatching

You can see examples of this use in the XOOM Schemata implementation source code. The following demonstrates how a single command set is defined and used by Schemata.

Instantiating the Command

This examines the XOOM Schemata command set that support schema definitions. Since the command types are somewhat redundant other than the parameter details, the entire source is not shown.

As a design choice, all commands for a given feature are defined inside an outer parent class. In the case of schema definition commands, the parent class is SchemaCommands. Inside SchemaCommands where are a number of concrete command types. The SchemaCommands is a factory that instantiates all the concrete command types as needed. See the main source for full details, such as all necessary imports, etc.

package io.vlingo.xoom.schemata.resource;

import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.lattice.model.Command;
import io.vlingo.xoom.lattice.router.CommandDispatcher;
import io.vlingo.xoom.lattice.router.CommandRouter;
import io.vlingo.xoom.lattice.router.CommandRouter.Type;
import io.vlingo.xoom.lattice.router.RoutableCommand;
...

class SchemaCommands {
  private final CommandRouter router;
  private final Stage stage;

  SchemaCommands(final Stage stage, final int routees) {
    this.stage = stage;
    this.router = CommandRouter.of(stage, Type.LoadBalancing, routees);
  }
  ...
}

The SchemaCommands parent is instantiated with the Stage and the number of routees to be create for the router. The constructor creates a new CommandRouter that is the type used for LoadBalancing.

Once the SchemaCommands instance exists, it can be used to instantiate individual command instances. The first factory used is categorizeAs() that instantiates a CategorizeAs that is wrapped in a RoutableCommand.

class SchemaCommands {
  ...
  RoutableCommand<Schema,CategorizeAs,SchemaState> categorizeAs(
          final SchemaId schemaId,
          final Category category) {

    final CategorizeAs categorizedAs = new CategorizeAs(category);

    RoutableCommand<Schema,CategorizeAs,SchemaState> command =
            RoutableCommand
              .speaks(Schema.class)
              .to(SchemaEntity.class)
              .at(schemaId.value)
              .createsWith(Definition.parameters(schemaId))
              .named(Schema.nameFrom(schemaId))
              .delivers(categorizedAs)
              .answers(Completes.using(stage.scheduler()))
              .handledBy(categorizedAs);

    router.route(command);

    return command;
  }
  ...
}

The RoutableCommand generic type parameters are as follows:

  1. The P protocol is the Schema domain model interface.

  2. The C command type is Command subclass CategorizeAs.

  3. The A answer type is SchemaState, which is the type used to hold and transfer the Schema type's immutable state data.

Note that the SchemaCommands does not provide a factory method for the initial Schema type instance. That's because we use a factory method on the protocol type interface itself that's used to instantiate the initial Schema domain model object. The protocol commands other than the factory methods are provided.

The parameters to the command are SchemaId and Category. First the CategorizeAs concrete command type itself is instantiated with the two parameters. Next the instance of CategorizeAs is wrapped by a new RoutableCommand , and it is then routed. Finally the RoutableCommand is answered from the factory.

The RoutableCommand answer is used to receive the SchemaState instance when it arrives by way of the Completes<SchemaState>. The REST resource handler type SchemaResource initially received an HTTP PATCH request and used the SchemaCommands factory method categorizeAs() in order to create the RoutableCommand. Once the RoutableCommand is returned to the SchemaResource request handler, the request handler registers a function on the Completes<SchemaState> that will deliver the eventual outcome when it arrives.

public class SchemaResource extends DynamicResourceHandler {
  private final Grid grid;
  private final SchemaCommands commands;
  private final SchemaQueries queries;

  public SchemaResource(final Grid grid) {
    super(grid.world().stage());
    this.grid = grid;
    this.commands = new SchemaCommands(grid, 10);
    this.queries = StorageProvider.instance().schemaQueries;
  }
  ...
  public Completes<Response> categorizeAs(final String organizationId, final String unitId, final String contextId, final String schemaId, final String category) {
    return commands
            .categorizeAs(SchemaId.existing(organizationId, unitId, contextId, schemaId), Category.valueOf(category)).answer()
            .andThenTo(state -> Completes.withSuccess(Response.of(Ok, entityResponseOf(serialized(SchemaData.from(state))))));
  }
  ...
}

When the function is executed, the final outcomes is produced and in turn is put into a completes to be used by the XOOM HTTP server as a response to the client.

Dispatching the Command

The CategorizeAs command type itself plays and important role beyond carrying a parameter payload. When the router is ready to delivery the command, it uses the command itself to perform the dispatching.

public class SchemaResource extends DynamicResourceHandler {
    ...
    private static class CategorizeAs extends Command implements CommandDispatcher<Schema,CategorizeAs,Completes<SchemaState>> {
    private final Category category;

    CategorizeAs(final Category category) {
      this.category = category;
    }

    @Override
    public void accept(final Schema protocol, final CategorizeAs command, final Completes<SchemaState> answer) {
      protocol.categorizeAs(command.category).andThen(state -> answer.with(state));
    }
  }
  ...
}

To actually deliver the CategorizeAs command to its destination, the router uses the accept() on the CategorizeAs concrete CommandDispatcher, a Java functional interface, to do so. The accept() method sends the categorizeAs() message and parameters to the SchemaEntity actor through its registered protocol.

Other Examples

See the full XOOM Schemata source code for more examples.

Last updated