Routers
Use routers to offload command and event dispatching toward an eventual outcome.
Message-driven routing of commands to handlers offloads the responsibility of dispatching potential effects to the domain model. When using CQRS, this model is known as the Command Model or Write Model.
The following shows a fully asynchronous/concurrent command routing component available with XOOM Lattice.
See the package io.vlingo.xoom.lattice.router:

Type | Description |
CommandRouter | The interface protocol for command routers. Provides a static factory method to create any of three types of routers. All concrete CommandRouter types must implement this protocol: void route(final RoutableCommand command) |
CommandDispatcher<P, C extends Command, A> | All commands routed through a CommandRouter must implement the CommandDispatcher interface. The P is the type name of the protocol to which the command will be sent. The C is the type name of this command type, which must be a subclass of io.vlingo.xoom.lattice.model.Command. The A is the answer type to be provided by the command, and is normally a Completes<T> where T is the type to be answered asynchronously. |
RoutableCommand | A command to be routed through a defined CommandRouter. This is a standard container for a concrete command type. In other words, create a command type, such as Rename, that implements the protocol CommandDispatcher and instantiate a new RoutableCommand with the Rename command inside. The RoutableCommand is sent through the CommandRouter and the Rename command internally knows how to send a message to its P generic protocol type parameter. |
The next section explains how to use these types and components.
You can see examples of this use in the XOOM Schemata implementation source code. The following demonstrates how a single command set is defined and used by Schemata.
This examines the XOOM Schemata command set that supports schema definitions. Since the command types are somewhat redundant other than the parameter details, the entire source is not shown.
As a design choice, all commands for a given feature are defined inside an outer parent class. In the case of schema definition commands, the parent class is SchemaCommands. Inside SchemaCommands there are a number of concrete command types. SchemaCommands is a factory that instantiates all the concrete command types as needed. See the main source for full details, such as all necessary imports.

package io.vlingo.xoom.schemata.resource;
import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.lattice.model.Command;
import io.vlingo.xoom.lattice.router.CommandDispatcher;
import io.vlingo.xoom.lattice.router.CommandRouter;
import io.vlingo.xoom.lattice.router.CommandRouter.Type;
import io.vlingo.xoom.lattice.router.RoutableCommand;
...
class SchemaCommands {
  private final CommandRouter router;
  private final Stage stage;

  SchemaCommands(final Stage stage, final int routees) {
    this.stage = stage;
    this.router = CommandRouter.of(stage, Type.LoadBalancing, routees);
  }
  ...
}
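
The constructor above asks for a load-balancing router over a fixed number of routees. To make that idea concrete, here is a minimal plain-Java sketch. It does not use the XOOM Lattice API: `Routee`, `LeastBusyRouter`, and the "fewest pending commands" selection rule are assumptions made for illustration, and the actual `Type.LoadBalancing` router may select routees differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a routee; it only tracks how many commands are pending.
final class Routee {
  final int id;
  final AtomicInteger pending = new AtomicInteger();

  Routee(final int id) {
    this.id = id;
  }
}

// Hypothetical router that gives each command to the routee with the fewest pending commands.
final class LeastBusyRouter {
  private final List<Routee> routees = new ArrayList<>();

  LeastBusyRouter(final int totalRoutees) {
    if (totalRoutees < 1) {
      throw new IllegalArgumentException("At least one routee is required.");
    }
    for (int id = 0; id < totalRoutees; ++id) {
      routees.add(new Routee(id));
    }
  }

  // Selects the least busy routee (ties go to the lowest id), counts the
  // command as pending on it, and answers the selected routee.
  // The command itself is not inspected in this sketch.
  Routee route(final String command) {
    Routee selected = routees.get(0);
    for (final Routee candidate : routees) {
      if (candidate.pending.get() < selected.pending.get()) {
        selected = candidate;
      }
    }
    selected.pending.incrementAndGet();
    return selected;
  }
}
```

In this sketch a routee would decrement its `pending` counter once it finishes handling a command, which is what lets later commands flow toward idle routees.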
The SchemaCommands parent is instantiated with the Stage and the number of routees to be created for the router. The constructor creates a new CommandRouter of the type LoadBalancing.

Once the SchemaCommands instance exists, it can be used to instantiate individual command instances. The first factory method used is categorizeAs(), which instantiates a CategorizeAs that is wrapped in a RoutableCommand.

class SchemaCommands {
  ...
  RoutableCommand<Schema,CategorizeAs,SchemaState> categorizeAs(
      final SchemaId schemaId,
      final Category category) {

    final CategorizeAs categorizedAs = new CategorizeAs(category);

    RoutableCommand<Schema,CategorizeAs,SchemaState> command =
        RoutableCommand
          .speaks(Schema.class)
          .to(SchemaEntity.class)
          .at(schemaId.value)
          .createsWith(Definition.parameters(schemaId))
          .named(Schema.nameFrom(schemaId))
          .delivers(categorizedAs)
          .answers(Completes.using(stage.scheduler()))
          .handledBy(categorizedAs);

    router.route(command);

    return command;
  }
  ...
}
The RoutableCommand generic type parameters are as follows:
1. The P protocol is the Schema domain model interface.
2. The C command type is the Command subclass CategorizeAs.
3. The A answer type is SchemaState, which is the type used to hold and transfer the Schema type's immutable state data.
Note that SchemaCommands does not provide a factory method for the initial Schema type instance. That's because a factory method on the protocol type interface itself is used to instantiate the initial Schema domain model object. Factories are provided for the protocol commands other than that factory method.

The parameters to the categorizeAs() factory method are SchemaId and Category. First the CategorizeAs concrete command type itself is instantiated with the Category. Next the instance of CategorizeAs is wrapped by a new RoutableCommand, which is addressed using the SchemaId, and it is then routed. Finally the RoutableCommand is answered from the factory.

The RoutableCommand answer is used to receive the SchemaState instance when it arrives by way of the Completes<SchemaState>. The REST resource handler type SchemaResource initially received an HTTP PATCH request and used the SchemaCommands factory method categorizeAs() in order to create the RoutableCommand. Once the RoutableCommand is returned to the SchemaResource request handler, the request handler registers a function on the Completes<SchemaState> that will deliver the eventual outcome when it arrives.

public class SchemaResource extends DynamicResourceHandler {
  private final Grid grid;
  private final SchemaCommands commands;
  private final SchemaQueries queries;

  public SchemaResource(final Grid grid) {
    super(grid.world().stage());
    this.grid = grid;
    this.commands = new SchemaCommands(grid, 10);
    this.queries = StorageProvider.instance().schemaQueries;
  }
  ...
  public Completes<Response> categorizeAs(
      final String organizationId,
      final String unitId,
      final String contextId,
      final String schemaId,
      final String category) {
    return commands
      .categorizeAs(SchemaId.existing(organizationId, unitId, contextId, schemaId), Category.valueOf(category))
      .answer()
      .andThenTo(state -> Completes.withSuccess(Response.of(Ok, entityResponseOf(serialized(SchemaData.from(state))))));
  }
  ...
}
When the function is executed, the final outcome is produced and in turn is put into a Completes to be used by the XOOM HTTP server as the response to the client.
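
The pattern of registering a function on an eventual outcome can be sketched with the JDK's `CompletableFuture` as a stand-in for `Completes<T>`. This is an analogy only, not the XOOM API: `thenCompose()` plays roughly the role that `andThenTo()` plays in the handler above, and `SchemaStateSketch`, `ResponseSketch`, and `AnswerSketch` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the immutable state answered by the command.
final class SchemaStateSketch {
  final String category;

  SchemaStateSketch(final String category) {
    this.category = category;
  }
}

// Hypothetical stand-in for an HTTP response.
final class ResponseSketch {
  final int status;
  final String body;

  ResponseSketch(final int status, final String body) {
    this.status = status;
    this.body = body;
  }
}

final class AnswerSketch {
  // Registers a function on the pending answer. The function runs only when
  // the state arrives, and its result is wrapped in a new eventual outcome.
  static CompletableFuture<ResponseSketch> respondTo(
      final CompletableFuture<SchemaStateSketch> answer) {
    return answer.thenCompose(state ->
        CompletableFuture.completedFuture(
            new ResponseSketch(200, "category=" + state.category)));
  }
}
```

Until the pending answer is completed, the returned future stays incomplete, which mirrors how the request handler returns immediately and the response is produced later.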
The CategorizeAs command type itself plays an important role beyond carrying a parameter payload. When the router is ready to deliver the command, it uses the command itself to perform the dispatching.

class SchemaCommands {
  ...
  private static class CategorizeAs extends Command implements CommandDispatcher<Schema,CategorizeAs,Completes<SchemaState>> {
    private final Category category;

    CategorizeAs(final Category category) {
      this.category = category;
    }

    @Override
    public void accept(final Schema protocol, final CategorizeAs command, final Completes<SchemaState> answer) {
      protocol.categorizeAs(command.category).andThen(state -> answer.with(state));
    }
  }
  ...
}
To actually deliver the CategorizeAs command to its destination, the router uses the accept() method on the CategorizeAs concrete CommandDispatcher, which is a Java functional interface. The accept() method sends the categorizeAs() message and parameters to the SchemaEntity actor through its registered protocol.
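
The self-dispatching idea can be reduced to a few lines of plain Java. The sketch below is not the XOOM Lattice implementation: `Dispatcher`, `Greeter`, `Greet`, and `SketchRouter` are hypothetical names, and a `CompletableFuture` stands in for the answer. What it is meant to show is that the router never needs to know which protocol method to call, because the command carries that knowledge in its own `accept()`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical counterpart of CommandDispatcher<P, C, A>.
@FunctionalInterface
interface Dispatcher<P, C, A> {
  void accept(P protocol, C command, A answer);
}

// Hypothetical protocol (the P type).
interface Greeter {
  String greet(String name);
}

// Hypothetical command (the C type) that knows how to dispatch itself.
final class Greet implements Dispatcher<Greeter, Greet, CompletableFuture<String>> {
  final String name;

  Greet(final String name) {
    this.name = name;
  }

  @Override
  public void accept(
      final Greeter protocol,
      final Greet command,
      final CompletableFuture<String> answer) {
    // The command decides which protocol method to call and completes the answer.
    answer.complete(protocol.greet(command.name));
  }
}

// Hypothetical router: it only hands the protocol, command, and answer to the dispatcher.
final class SketchRouter {
  static <P, C, A> A route(
      final P protocol,
      final C command,
      final A answer,
      final Dispatcher<P, C, A> dispatcher) {
    dispatcher.accept(protocol, command, answer);
    return answer;
  }
}
```

Passing the same `Greet` instance as both the command and the dispatcher corresponds to the `.delivers(categorizedAs)` and `.handledBy(categorizedAs)` pairing shown earlier.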