vlingo/actors

How to use the vlingo/actors toolkit.

Actor Model Foundation

In 1973, Dr. Carl Hewitt and his colleagues formulated the Actor Model. In recent years, the inventor of object orientation, Alan Kay, has stated that the Actor Model retained more of what he thought were the important object ideas. So, when you think of the Actor Model, think of objects done right.

The vlingo/actors toolkit is an implementation of the Actor Model, and all primary platform components are built on our Actor Model implementation. Read on for detailed information on the use of vlingo/actors.

What Are Actors?

The vlingo/actors toolkit is an implementation of the Actor Model. The ideas behind the Actor Model are pretty simple, and these points show how vlingo/actors implement it.

  1. The basic unit of computation is expressed through actors. Actors are basically objects, but their behaviors are requested by sending messages asynchronously rather than through directly invoking their methods. This enables all communication between actors to be performed through asynchronous messaging passing. Determined by a scheduler, each actor that has been sent a message will be given the opportunity to receive and process it, which also happens asynchronously.

  2. Actors can create other actors. As Carl Hewitt is known to say, “One actor is no actors. Actors come in systems.” Thus, your applications should use not just some actors, but many actors. Understand that once you start down the road of asynchronous behaviors, you are all in. Just as you don’t kind of go swimming, because you are either completely wet, or you are not, you don't kind of use actors. If you try to fight the asynchrony, you will experience pain and software with very strange bugs.

  3. Each actor can designate the behavior it will exhibit for the next message it receives. This is roughly the State pattern. Any actor using the vlingo/actors toolkit can dynamically become another kind of actor in preparation for handling its current state and any subsequent messages.

  4. What is most unique about the vlingo/actors implementation is type safety by design, and the simplicity with which is it implemented and consumed. All that a programmer needs to understand is interfaces and implementation classes, and they get the asynchrony for free.

  5. Most other actor implementations use a receive method or code block that takes Object (or Any) as a parameter. Thus, the receive needs to determine which messages to accept and which ones are not permitted at any given time; this may be especially necessary when the actor designates its next behavior (it becomes another type of actor). Also what is unique about vlingo/actors is that the current designated behavior can be based on a different interface that is implemented by the actor. In other words, any one actor can implement multiple interfaces and receive messages for any given interface when it chooses to.

  6. Most modern actor model implementations use mailboxes to deliver messages. Each actor has a mailbox where messages are received into a FIFO queue, and each message is processed one at a time on an available thread. This is true for vlingo/actors, yet there are special kinds of mailboxes that have certain advantages (and possibly disadvantages).

  7. You can tune your actor’s world and stage to support any number of threads, but it’s best to limit this number according based on the available number of processor hyper-threaded cores, or a bit more. Fundamentally, you can’t run more threads simultaneously than there are available cores, e.g. Runtime.getRuntime().availableProcessors().

Using objects in a typical fashion, such as with Java or C#, we have become accustomed, even addicted, to a blocking paradigm.

Here a Client object invokes a method on a Server object. Understand that this is an in-process (in-VM) invocation, not a remote client and a remote server. The point is, when a method invocation occurs, the Client is blocked until the Server returns from the method invocation. In contrast, the Actor Model works differently.

When the Sender actor wants another actor to provide a service, it sends that actor a message. The message is sent to the Receiver actor and handled asynchronously, but not until a thread is available. The Sender continues moving forward with its current activities, and when completed returns from its own message handling.

As previously stated, with the various Actor Model implementations (e.g. Erlang and Elixir), neither messages or the message receiver are strongly typed. Yet, with vlingo/actors type-safe messages are the fundamental building block, not an experimental afterthought. A strongly-typed Actor Model implementation is important at this time when type safety is in high demand and can provide much more reliable systems.

With its careful but simple design, vlingo/actors are a great foundation on which to build the other tools in the vlingo/platform.

How Do Actors Work?

Actors collaborate by sending messages, one actor to another. When there are hundreds, thousands, or millions of actors, there are many actors sending messages simultaneously. Still, any one actor can send only one message to one other actor at a time. Following that, the same actor can send a message to the same actor or a different actor. This fulfills the first point of the following description of the actor message-receiving contract.

An actor is a computational entity that, in response to a message it receives, can concurrently:

  • send a finite number of messages to other actors;

  • create a finite number of new actors;

  • designate the behavior to be used for the next message it receives.

There is no assumed sequence to the above actions and they could be carried out in parallel.

Actors also must be able to fulfill the second and third points: actors can create child actors, and actors can prepare themselves for subsequent message receipt.

Actors in Action

Now consider a brief tutorial on vlingo/actors. This tutorial takes you through preparing your build environment and also how to implement two actors that collaborate to accomplish a goal.

To get started, create your own playground project to work with. You can name this project playground. If you use Maven, place a dependency into your playground’s pom.xml file.

...
<dependencies>
<dependency>
<groupId>io.vlingo</groupId>
<artifactId>vlingo-actors</artifactId>
<version>x.y.z</version>
</dependency>
</dependencies>
...

If you prefer Gradle, insert the following into your build.gradle.

dependencies {
compile 'io.vlingo:vlingo-actors:x.y.z'
}
repositories {
jcenter()
}

The x.y.z is a semantic version number and reflects the version of vlingo-actors JAR file that you depend on. This number may be something such as 1.2.0. You will find the available versions, including the most recent version, available on one of the supported public repositories. One such is Bintray.

Additionally, add a JUnit dependency into your build script since the tutorial uses JUnit to run the actor collaboration.

...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
...

Or if using Gradle:

dependencies {
compile 'junit:junit:4.11'
}

Although there are a few different ways to configure the vlingo/actors runtime environment, we will skip that step here. It's easier to start with reasonable defaults instead. The configuration approaches are presented later.

Without delay, consider the really fun part—the programming. You need to create a few Java interfaces and classes. There’s a Java interface that acts as the type safe messaging protocol of the first actor that you will create. For now, create a Pinger interface with a single method definition, named ping(), which takes a Ponger as a parameter.

package playground;
import io.vlingo.actors.Stoppable;
public interface Pinger extends Stoppable {
void ping(final Ponger ponger);
}

Next create a Ponger interface the same way, but with a pong() method that takes a Pinger as a parameter.

package playground;
import io.vlingo.actors.Stoppable;
public interface Ponger extends Stoppable {
void pong(final Pinger pinger);
}

Now you have two protocols or two different actors. These define the type-safe behaviors that one or more actors will implement, and the means by which clients will interact with the actors. In case it’s not obvious, Pinger is a client of Ponger, and Ponger is a client of Pinger.

It’s time to create two simple actors. First create one to implement the Pinger protocol.

package playground;
import io.vlingo.actors.Actor;
public class PingerActor extends Actor implements Pinger {
private final Pinger self;
public PingerActor() {
self = selfAs(Pinger.class);
}
public void ping(final Ponger ponger) {
ponger.pong(self);
}
}

After that, create another actor to implement the Ponger protocol.

package playground;
import io.vlingo.actors.Actor;
public class PongerActor extends Actor implements Ponger {
private final Ponger self;
public PongerActor() {
self = selfAs(Ponger.class);
}
public void pong(final Pinger pinger) {
pinger.ping(self);
}
}

You now have two actors that collaborate to play ping pong. The problem is that these actors will play ping pong nonstop, forever, unless we do something to prevent that. Doing so demonstrates how actors can maintain their own state, just like typical objects.

package playground;
import io.vlingo.actors.Actor;
public class PingerActor extends Actor implements Pinger {
private int count;
private final Pinger self;
public PingerActor() {
count = 0;
self = selfAs(Pinger.class);
}
public void ping(final Ponger ponger) {
if (++count >= 10) {
self.stop();
ponger.stop();
} else {
ponger.pong(self);
}
}
}

Looking back at the Pinger and Ponger interface definitions, you will notice that both of these protocols extend the Stoppable protocol. Thus, they can both be stopped by other actors that have a Stoppable reference to them. We use that capability from within PingerActor to cause both actors to stop when the count reaches 10.

Note that in this case the actors are not required to implement their own stop() methods. That’s because the abstract base class, Actor, implements stop() for them. You could override stop() to find out when your actor is being stopped, but that’s not necessarily a good idea. What if you forgot to invoke the super’s stop()? That would make you think that your actor was going to stop, but the actor would never shut down because the Actor base class behavior would never be run. If you want to know when you are being stopped, you can override one of the four life cycle methods instead of stop().

package playground;
import io.vlingo.actors.Actor;
public class PingerActor extends Actor implements Pinger {
private int count;
private final Pinger self;
public PingerActor() {
count = 0;
self = selfAs(Pinger.class);
}
public void ping(final Ponger ponger) {
if (++count >= 10) {
self.stop();
ponger.stop();
} else {
ponger.pong(self);
}
}
@Override
protected void afterStop() {
logger().log("Pinger " + address() + " just stopped!");
super.afterStop();
}
}

All five life cycle methods are:

  • beforeStart()

  • afterStop()

  • beforeRestart(final Throwable reason)

  • afterRestart(final Throwable reason)

  • beforeResume(final Throwable reason)

These enable you to see when significant life cycle events occur with your actor. The restart life cycle methods are related to actor supervision. When your actor’s supervisor sees your actor failed with an Exception, it can take a number of actions. Your supervisor can tell your actor to resume, to stop, or to restart. If it tells your actor to resume, the beforeResume() is invoked. When it tells your actor to restart, the beforeRestart() is invoked first, and then the afterRestart() is invoked. Since your actor has failed, it may have been left in an invalid state. In such cases, these three life cycle methods give your actor the opportunity to clean up after the problem that caused the Exception and also reinitialize itself before reacting to its next available protocol message.

The Exception recovery methods DO NOT cause the Actor instance to be completely discarded and recreated. Therefore, it is the responsibility of the Actor to set its state to a safe point before message processing resumes.

The above afterStop() method shows two additional perks of vlingo/actors. All actors have a personal address, which is available through your inherited address() method. Also, all actors have a Logger available via its logger() method. Any information that you log will be output asynchronously through a registered Logger actor, so your actor won't block while file output is performed.

Alright, we have two actors, but how do we bring the actors to life in the first place, and how do we get them to start collaborating in game play? Here’s how you start up the World for your actors to play in.

package playground;
import org.junit.Test;
import io.vlingo.actors.Definition;
import io.vlingo.actors.World;
public class PlaygroundTest {
@Test
public void testPlayPingPong() {
final World world = World.startWithDefaults("playground");
final Pinger pinger = world.actorFor(Pinger.class, PingerActor.class);
final Ponger ponger = world.actorFor(Ponger.class, PongerActor.class);
pinger.ping(ponger);
pauseThisThread();
world.terminate();
}
}

When this test is run, a World is created. The World is a major component of vlingo/actors. In a nutshell, a World is the primary container within which actors live and play. Generally you would create only one World per service instance. In DDD terms, a World is the root of a Bounded Context.

After the World is started, two actors are created, and a reference to their respective protocol is returned. Each actor is created by passing its protocol and the concrete actor type. You may also create actors by means of a Definition. The Definition indicates the class of the actor that implements the protocol, such as PingerActor.class, which implements the Pinger.class protocol. In this example neither actor takes constructor parameters. If they did, you would replace the Definition.NoParameters with a list of parameters in the order that the constructor takes them. Consider this following (non-working) example of the PingerActor taking two parameters, a String and an int.

final Pinger pinger = world.actorFor(
Pinger.class,
Definition.has(
PingerActor.class,
Definition.parameters("Hey, yo!", 42),
"name-that-actor"));

This example demonstrates that a Definition can be used to define parameters and also other actor characteristics, such as it’s text String name. Even so, the simplest way to create an actor with constructor parameters is by means of this shorthand method.

final Pinger pinger = world.actorFor(Pinger.class, PingerActor.class, "Hey, yo!", 42);

Look over the vlingo-actors source code repository for the several different ways that an actor can be created, including with a specific parent, a non-default logger, and a specialized supervisor.

One additional point about the unit test is appropriate. As you probably noticed, a method named pauseThisThread() is used.

...
private void pauseThisThread() {
try { Thread.sleep(100); } catch (Exception e) { }
}
...

Some sort of coordination is necessary because the actors send and receive all protocol messages asynchronously. Recall that there will be a total of 10 pings. Since the messages are all delivered and reacted to asynchronously, there is no “automatic” way to know when all the messages, including the stop() for both actors, have been delivered.

Don't use Thread.sleep()in your tests or your production services.

Even so, this particular sleep approach is not preferable, because on different machines a given sleep time may be insufficient for all messages to process. It's actually guess work to try to get this right. Additionally, if you configure a long-enough sleep time that will work for every possible machine and process load, it's going to make your tests slow on very fast machines and environments. So, one reason for showing you this is to emphasize that you should not do that.

The following shows how you can more conveniently test actors without using the thread sleep artifice. It uses the io.vlingo.actors.testkit.TestUntil component. This example also demonstrates how actors take constructor parameters. In the test method, create an instance of the TestUntil to pass to the PingerActor constructor.

public class PlaygroundTest {
@Test
public void testPlayPingPong() {
final World world = World.start("playground");
final TestUntil until = TestUntil.happenings(1);
final Pinger pinger = world.actorFor(Pinger.class, Definition.has(PingerActor.class, Definition.parameters(until)));
final Ponger ponger = world.actorFor(Ponger.class, Definition.has(PongerActor.class, Definition.NoParameters));
pinger.ping(ponger);
until.completes();
world.terminate();
}
}

Then refactor PingerActor to take a TestUntil instance as a constructor parameter. Also cause a happened() in its afterStop() method to signal to the test that the Pinger has stopped.

public class PingerActor extends Actor implements Pinger {
private int count;
private final Pinger self;
private final TestUntil until;
public PingerActor(final TestUntil until) {
this.until = until;
this.count = 0;
this.self = selfAs(Pinger.class);
}
...
@Override
protected void afterStop() {
logger().log("Pinger " + address() + " just stopped!");
until.happened();
super.afterStop();
}
}

Before the afterStop() method causes the until.happened() the test method will block. As soon as the until.happened() causes its state to transition from 1 to 0, the test will unblock and the World will terminate. This enables the test to complete.

Although passing a test construct into a production-quality actor is poor design choice, this example is only to show you that there are very reliable ways to test actors in an asynchronous messaging environment. Later you will see much better uses of TestUntil and other io.vlingo.actors.testkit tools.

In order to make the ping pong playground produce some output, create some log output in the ping() and pong() methods.

// in Pinger
...
public void ping(final Ponger ponger) {
++count;
logger().log("ping " + count);
if (count >= 10) {
self.stop();
ponger.stop();
} else {
ponger.pong(self);
}
}
...
// in Ponger
public void pong(final Pinger pinger) {
logger().log("pong");
pinger.ping(self);
}
...

When the test is run, you will see the following output.

vlingo/actors(test): ping 1
vlingo/actors(test): pong
vlingo/actors(test): ping 2
vlingo/actors(test): pong
vlingo/actors(test): ping 3
vlingo/actors(test): pong
vlingo/actors(test): ping 4
vlingo/actors(test): pong
vlingo/actors(test): ping 5
vlingo/actors(test): pong
vlingo/actors(test): ping 6
vlingo/actors(test): pong
vlingo/actors(test): ping 7
vlingo/actors(test): pong
vlingo/actors(test): ping 8
vlingo/actors(test): pong
vlingo/actors(test): ping 9
vlingo/actors(test): pong
vlingo/actors(test): ping 10

You can find an implementation of this tutorial code in the vlingo-examples repository.

Now that this tutorial has given you the some of the most import knowledge about vlingo/actors, you are ready to take a deeper dive into more details about the other facilities provided by vlingo/actors.

API

This section provides a how-to for the vlingo/actors toolkit API. The details are covered in sections. Some of these details are already demonstrated in the previous sections, including the tutorial.

Starting and Terminating the Actor Runtime

To start up the vlingo/actors runtime you the World object as follows.

final World world = World.startWithDefaults("my-world");

This starts a World with normal runtime defaults in which Actor instances are created and run. For many uses of vlingo/actors the defaults are the easiest and safest way to use the Actorruntime.

There are a few different ways to start a World. The following is a summary.

Use this API when you want to start a World by loading configurations from the file named vlingo-actors.properties. The name is used to name the World instance.

public static World start(final String name)

The following API is used when you want to start a World with your own name-value pairs using java.util.Properties defined in code. The details of the vlingo-actors.properties file are discussed below.

public static World start(final String name, final java.util.Properties properties)

A World can be started using fluent configuration.

public static World start(final String name, final Configuration configuration)

The details of programatic Configuration are discussed below.

When you are preparing to shut down your application or service that is using the World, you should use the following to terminate the Actor runtime.

world.terminate();

The World::terminate() method is currently a synchronous operation, but in the future will become asynchronous. When the World terminates asynchronously there will be a Completes<T> or callback construct to inform the client when the termination has completed.

Using the vlingo-actors.properties File

The Actor runtime may be configured by means of a file that adheres to the java.util.Properties conventions. Each property is defined by a name followed by = and then a value. For the vlingo/actors toolkit the file must be named vlingo-actors.properties and be located in the runtime classpath. The following shows how the standard ConcurrentQueueMailbox can be defined in this properties file.

plugin.name.queueMailbox = true plugin.queueMailbox.classname =\ io.vlingo.actors.plugin.mailbox.concurrentqueue.ConcurrentQueueMailboxPlugin plugin.queueMailbox.defaultMailbox = true plugin.queueMailbox.numberOfDispatchersFactor = 1.5 plugin.queueMailbox.dispatcherThrottlingCount = 1

When defining properties in a properties file, long lines may be continued by placing an escape character of \ at the end of the line to be continued on the next line. To see example properties that can be used, you should review the following file.

vlingo-actors/src/test/resources/vlingo-actors.properties

Also the next subsection shows several configuration type objects and options.

Using Programmatic Configurations

As an alternative to using the file-based configuration, you can instead employ a configuration approach that provides a fluent API, an example of which follows.

final Configuration configuration = Configuration .define() .with(PooledCompletesPluginConfiguration .define() .mailbox("queueMailbox") .poolSize(10)) .with(SharedRingBufferMailboxPluginConfiguration .define() .ringSize(65535) .fixedBackoff(2) .dispatcherThrottlingCount(10)) .with(ManyToOneConcurrentArrayQueuePluginConfiguration .define() .ringSize(65535) .fixedBackoff(2) .dispatcherThrottlingCount(10) .sendRetires(10)) .with(ConcurrentQueueMailboxPluginConfiguration .define() .defaultMailbox() .numberOfDispatchersFactor(1.5f) .dispatcherThrottlingCount(10)) .with(JDKLoggerPluginConfiguration .define() .defaultLogger() .name("vlingo/actors(test)") .handlerClass(DefaultHandler.class) .handlerName("vlingo") .handlerLevel("ALL")) .with(CommonSupervisorsPluginConfiguration .define() .supervisor("default", "pingSupervisor", Ping.class, PingSupervisorActor.class) .supervisor("default", "pongSupervisor", Pong.class, PongSupervisorActor.class)) .with(DefaultSupervisorOverridePluginConfiguration .define() .supervisor("default", "overrideSupervisor", DefaultSupervisorOverride.class)) .usingMainProxyGeneratedClassesPath("target/classes/") .usingMainProxyGeneratedSourcesPath("target/generated-sources/") .usingTestProxyGeneratedClassesPath("target/test-classes/") .usingTestProxyGeneratedSourcesPath("target/generated-test-sources/");

Follow the configuration definition it can be used to start the World instance.

final World world = World.start("my-world", configuration);

World Miscellaneous Resources

In addition to the above facilities, a World provides the following. All concrete Actor instance may obtain both their Stage and their World instances as follows, which enables the Actor to reach specific facilities offered by each.

// inside an actor final Configuration configuration = stage().world().configuration();

You may require the use of the means to create unique Actor addresses or a way to produce an address from a primitive or String value. To do so, request it by means of the method addressFactory().

public AddressFactory addressFactory()

You may obtain the immutable Configuration of the World runtime. Even if you load your runtime properties from the vlingo-actors.properties or your own java.util.Properties definition, all of your runtime configurations are placed in the Configuration.

public Configuration configuration()

All messages sent to Actor instances that cannot be delivered for any reasons, such as the Actor instance has previously been stopped, are delivered to the special Actor know as DeadLetters. You may subscribe to receives DeadLetters messages.

public DeadLetters deadLetters()

Use the following DeadLetters protocol method to subscribe to its received messages. Your listener Actor must implement the DeadLettersListener protocol. void registerListener(final DeadLettersListener listener)

If you want to obtain the default Logger that is provided to all Actor instances, use the following method.

public Logger defaultLogger()

Every top-level application- or service-created Actor is a child of the default parent Actor.

public Actor defaultParent()

Every Actor must be assigned to a overarching supervisor. The following provides a reference to the default supervisor of all newly created Actor instance.

public Supervisor defaultSupervisor()

Although the default Logger is available through the World interface, there may also be a number of named Logger instances. If you use non-default Logger instances, they may be obtained via the following World facility.

public Logger logger(final String name)

The Logger protocol provides two primary facilities for logging application/service output and exceptions.

void log(final String message)

void log(final String message, final Throwable throwable)

The World interface provides some additional facilities, but ones that are useful only for plugins. Those are documented below.

Creating Actors

Once you have started a World you are able to create Actor instance to run in it. As a reminder, there is nothing mysterious about actors. Actors are basically objects, but their behaviors are requested by sending messages asynchronously rather than through directly invoking their methods. The following shows you how to create an actor.

final Simple simple = world.actorFor(Simple.class, SimpleActor.class);

In this example the World is used to create a new instance of the SimpleActor type. The SimpleActor type implements the protocol defined by the interface named Simple. Thus, actors provide protocol implementations, and they can implement multiple protocols.

In this example the SimpleActor takes no constructor parameters. If it did accept parameters then the parameters could be listed as follows.

final Simple simple = world.actorFor(Simple.class, SimpleActor.class, p1, p2, p3);

The parameters must be listed in the order in which to constructor accepts them. In the above example the parameters are listed as p1, p2, and p3. These parameters could be of any type, and each parameter is required to follow the convention that the constructor contract requires.

When the World method actorFor() returns, the requesting client is given a reference to the protocol that provides asynchronous access to the Actor. This reference is used to send messages via methods on the protocol. The Simple protocol is defined as follows.

public interface Simple { void simpleSay(); }

There is a single method named simpleSay(). Messages are sent asynchronously to the concrete Actor instance, which in the case of the above example is an instance of SimpleActor. This behavior is demonstrated by the following expression.

simple.simpleSay();

The fundamental behavior of a method invocation on the protocol reference is to reify the method invocation to a message that is sent asynchronously to the Actor. The reification is accomplished by creating a Function that represents the method invocation and enqueuing the Function instance on to the Actor's Mailbox. The Function will later be applied to the Actorinstance when all previously enqueued messages have been processed and a thread becomes available to process the message at the front of the Mailbox. Thus, the method invocation is temporally decoupled from the sending side and the receiving side.

A Stage for Actors

The World does not directly create an Actor. Instead, the World dispatches actorFor() requests to the default Stage. It is the Stage that provides the concrete implementation of actorFor().

A Stage is the abstraction within which Actor instances are maintained. A World has at least one Stage, which is known as the default Stage, and its name is "__defaultStage". If you need to query the instance of the default stage, you use the following World method.

public Stage stage()

Every Stage supports two important behaviors, actorFor() and actorOf(). The actorFor() is a creational method, and there are several overloads supporting various parameter options. Using the method various implementations of the method actorFor() will create a new Actor instance and answer one or more protocols. Once you have a reference to a protocol you are able to send messages to the given Actor that implements the protocol(s).

The actorOf() method has a different purpose. Given an existing Actor that is contained within a given Stage, you may use actorOf() to find the existing Actor by passing the Address of the Actor. The actorOf() answers a Completes<T> because it is an asynchronous operation, answering the requested protocol eventually.

An Actor may obtain its containing Stage and its World as follows.

// inside an actor final Stage myStage = stage(); final World myWorld = myStage.world();

You may create additional Stage instances within the World, but the default Stage is automatically provided when a World starts. The World default Stage is responsible for holding the private root actor and the public root actor. These two instances act as default supervisors, which are responsible for protecting a World from catastrophic failures. Supervision is explained in detail below.

Since the World must create some default operational Actor instances when it is started, it may be best to segregate your application/service Actor instances into another Stage. It's simple to accomplish, as seen here.

public Stage stageNamed(final String name)

If the Stage instance with the given name does not yet exist, a new Stage with that name is created and returned.

We don't suggest creating several or many Stage instances. It's likely that the default Stage and one application/service Stage instance will be sufficient. Yet, we don't set a limit on the number of Stage instances in case more than two would be useful.

Scheduler and the Scheduled Protocol

Every Stage has its own Schedular, which may be used to produce time-lapsed events that are sent to an Actor.

// inside an actor final DataPacket packet = new DataPacket(0); stage().scheduler().schedule(selfAs(Scheduled.class), packet, 100, 1_000);

An Actor can schedule itself or another Actor, such as one or more of its children, for some timed event notification. In other words, it need not pass itself as the Scheduled instance.

The above example registers a repeating schedule that will begin within 100 milliseconds of the registration and will repeat every 1 second (1_000 milliseconds). The Actor scheduling itself for notifications must implement the Scheduled protocol, and it passes an Actor enabled reference to that effect using the runtime method selfAs(). The Actor can associate some specific data with which it will be notified on each event, which in this example is the DataPacket instance packet. This DataPacket type is only used for the example, and would be replaced with your own type, or you can pass null if the data is unused.

Similarly you can schedule a single notification. The interval will not not be repeated as in the above example.

// inside an actor final DataPacket packet = new DataPacket(0); stage().scheduler().scheduleOnce(selfAs(Scheduled.class), packet, 100, 1_000);

When registering a Scheduled object you are provided a Cancellable instance. You may use this instance to cancel one-time or repeating occurrences.

this.cancellable = stage().scheduler().scheduleOnce(...); ... cancellable.cancel();

The Actor receiving the timed event will be notified using the intervalSignal() method of the Scheduled protocol.

public interface Scheduled { void intervalSignal(final Scheduled scheduled, final Object data); }

It may be implemented something like the following.

@Override public void intervalSignal(final Scheduled scheduled, final Object data) { ((DataPacket) data).ifAccumulatedEnd(cancellable -> cancellable.cancel()); }

Actor Supervision

The Actor Model supports scoped supervision of Actor instances. When the World is created with default configuration, the following hierarchy of supervision is established.

PrivateRootActor (literal) PublicRootActor (literal) ApplicationActor1 (example) ApplicationActor1_1 (example)

A concrete Actor that serves as a supervisor must implement the protocol defined by the io.vlingo.actors.Supervisor. Two of the standard supervisors know as the PrivateRootActor and the PublicRootActor implement the io.vlingo.actors.Supervisor protocol. Furthermore, if ApplicationActor1 and ApplicationActor1_1 are to serve as supervisors, they too must implement the io.vlingo.actors.Supervisor protocol.

The above example demonstrates that there is a base supervisor known as the PrivateRootActor. It is the ultimate supervisor that protects the World from catastrophic failure by serving as an impenetrable shield against lower-level failures. Just below the PrivateRootActor is the PublicRootActor. All top-level application/service Actor instances, such as ApplicationActor1 in the above example, are supervised by the PublicRootActor. Any children of the ApplicationActor1, such as ApplicationActor1_1 in the above example, are supervised by ApplicationActor1.

Thus, any newly created concrete Actor whose parent is known and implements the standard Supervisor protocol is the child's supervisor. If the parent is not known or the new Actor is top-level, then both its parent and its supervisor are assigned as the PublicRootActor.

You may overrides the default supervisor arrangement using the vlingo-actors.properties configuration.

plugin.name.override_supervisor = true plugin.override_supervisor.classname = io.vlingo.actors.plugin.supervision.DefaultSupervisorOverridePlugin plugin.override_supervisor.types =\ [stage=default name=overrideSupervisor supervisor=io.vlingo.actors.plugin.supervision.DefaultSupervisorOverride]

This will assign the default supervisor as the plugin DefaultSupervisorOverride, which is explained under the plugins section. This override may also be defined using fluent configuration.

final Configuration configuration = Configuration .define() ... .with(DefaultSupervisorOverridePluginConfiguration .define() .supervisor("default", "overrideSupervisor", DefaultSupervisorOverride.class)) ...

When and Actor throws an exception, the exception is caught by the runtime. When the exception is caught, the Actor is suspended, meaning that it is not permitted to process any messages until the exception has been handled. The exception will be handled by its supervisor when the exception is reified as a message and sent asynchronously to its supervisor. When received, the exception is interpreted and the supervisor's recovery strategy is employed. The supervisor strategy can specify the following actions.

  • Resume operation of the Actor starting with its next message

  • Restart the Actor

  • Stop the Actor, which implies also stopping any of its children

  • Escalate the recovery up another ancestor level

When the supervisor strategy is to restart, a restart is done only for a maximum number of restarts within a stipulated timeframe. In other words, the Actor may not be permitted to crash repeatedly over an extended period of time. To control restarts or leave them available continuously, the following stipulations may be used.

  • If the current restart intensity count is within the specified intensity over the given period of time, restart the Actor.

  • An example of intensity is 10 times within a period of 5 seconds, or 50 times over a period of 1 minute.

  • You may specify a maximum intensity of an unlimited number of times over an infinite time period.

For examples see the following.

io.vlingo.actors.PublicRootActor io.vlingo.actors.SupervisionStrategy io.vlingo.actors.DefaultSupervisor io.vlingo.actors.DefaultSupervisorStrategy

Plugins

The vlingo/actors toolkit provides the following plugins.

Completes

Some Actor protocols answer outcomes to one or more messages that they handle. Because messages are sent and received asynchronously, the sender will not block until the answered outcome is available. Thus, the contract must be asynchronous for a returned value from theActor to the message-sending client. This contract is satisfied by means of the Completes<T> mechanism. The client sender of an outcome-answering Actor protocol message immediately receives a Completes<T> object. The client then registers a function with the Completes<T> object, and the registered function will be called when the answer becomes available. You may read more about the Completes<T> capabilities in the chapter vlingo/common.

Sometimes an asynchronous message outcome must be helped by some additional Actor-based plumbing. This is where the CompletesEventually plugin is used. This plugin is known as "pooledCompletes", and as its name indicates, a pool of Actor instances is used to process possible high volumes of eventual outcomes.

You may configure this pool using the standard properties file as follows, or by using the fluent configuration API.

plugin.name.pooledCompletes = true plugin.pooledCompletes.classname =\ io.vlingo.actors.plugin.completes.PooledCompletesPlugin plugin.pooledCompletes.pool = 10 plugin.pooledCompletes.mailbox = queueMailbox

This configuration establishes 10 Actor instances that may be used to handle all eventual outcomes. Those Actor instances will used the queueMailbox as their mailbox type (see Mailbox plugin below).

Logging

The standard logging capabilities of the io.vlingo.actors.Logger protocol are provided by the logging plugin found in package io.vlingo.actors.plugin.logger. The JDKLoggerPlugin is the current default logger found under io.vlingo.actors.plugin.logger.jdk. There is also a "no op" logger that may be used for quicker testing, packaged under io.vlingo.actors.plugin.logger.noop.

The configurations for the JDK logger follow those available with java.util.logging. See the following class for configuration details.

io.vlingo.actors.plugin.logging.jdk.JDKLoggerPlugin.JDKLoggerPluginConfiguration

Mailbox

There are three mailbox implementations provided as default plugins.

  1. Name "arrayQueueMailbox": A fast mailbox based on the Agrona project's non-blocking many-to-one concurrent array queue. io.vlingo.actors.plugin.mailbox.agronampscarrayqueue.ManyToOneConcurrentArrayQueueMailbox

  2. Name "queueMailbox": An non-blocking unbounded mailbox based on the Java ConcurrentLinkedQueue implementation. Being an unbounded mailbox means that this one can result in OutOfMemoryException if any given Actor receives many messages but cannot process them as quickly as received. io.vlingo.actors.plugin.mailbox.concurrentqueue.ConcurrentQueueMailbox

  3. Name "ringBufferMailbox": A fast mailbox based on a non-blocking ring buffer algorithm. io.vlingo.actors.plugin.mailbox.sharedringbuffer.SharedRingBufferMailbox

Mailboxes 1 and 3 are primarily used by a single Actor that requires very high throughput, or multiple Actor instances that can share a mailbox. For single Actor instances the mailbox can be allocated with fewer available element slots. When shared among a few or several Actor instances the mailbox should be created with enough elements to prevent stalling the sending of new messages into the array.

Mailbox 2 is meant for single Actor instances, and is the standard default unless another mailbox is named when the Actor is created.

Configuration examples can be found here: vlingo-actors/src/test/resources/vlingo-actors.properties

Supervision

Supervision alternatives may be provided as plugins. The two possible types of extended supervision are as follows.

  1. Name "override_supervisor": Registering this plugin enables an override for the standard default supervisor provided by PublicRootActor.

  2. Name "common_supervisors": Registering this plugin enables any number of different supervisors used to protect against crashed actors of specific protocol types. In other words, you may register a supervisor that will be used to handle exceptions of all actors that implement a specific protocol, and are handling a message for that protocol when the exception is thrown.

The following shows configuration examples for these two kinds of override supervisors found in the file: vlingo-actors/src/test/resources/vlingo-actors.properties

plugin.name.override_supervisor = true plugin.override_supervisor.classname =\ io.vlingo.actors.plugin.supervision.DefaultSupervisorOverridePlugin plugin.override_supervisor.types =\ [stage=default name=overrideSupervisor \ supervisor=io.vlingo.actors.plugin.supervision.DefaultSupervisorOverride] plugin.name.common_supervisors = true plugin.common_supervisors.classname =\ io.vlingo.actors.plugin.supervision.CommonSupervisorsPlugin plugin.common_supervisors.types =\ [stage=default name=pingSupervisor protocol=io.vlingo.actors.supervision.Ping \ supervisor=io.vlingo.actors.supervision.PingSupervisorActor] \ [stage=default name=pongSupervisor protocol=io.vlingo.actors.supervision.Pong \ supervisor=io.vlingo.actors.supervision.PongSupervisorActor]

Actor Runtime and Life Cycle

Every Actor has a life cycle. The Actor is first created and begins life. The Actor may receive and process a number of messages. After some time, the Actor may have reached the extent of its usefulness. At that point some component, possibly another Actor, will request that it be stopped. At that time the Stage containing the Actor instance will remove it from its internal directory, and stop it.

Common Protocols

The following protocols are available to Actor instances to help manage its life cycle.

  1. Protocol Startable: All concrete Actor types support this protocol. When the Actor instance is first being started, it receives a start() message. If the concrete Actor wants to react to this message it must override the default behavior, which does nothing.

  2. Protocol Stoppable: All concrete Actor types support this protocol. When the Actor instance is first being stopped, it receives a stop() message. Unlike with the Startable protocol, the base Actor does provide specific critical behavior for its stop() message. Therefore, if the concrete Actor wishes to react to this message by overriding to default, it must ensure that the base implementation is invoked by using super.stop(). If your Actor override attempts to use certain facilities, such as children, the outcome is unpredictable. Due to this somewhat finicky behavior, it may be best for your Actor to instead override the afterStop() life cycle message handler.

  3. Protocol Scheduled: Concrete Actor types by default do not support this protocol, and it must implemented when using the Scheduler to receive time-based events.

Actor Life Cycle Messages

The Actor abstract base class provides five life cycle message handlers:

  • beforeStart()

  • afterStop()

  • beforeResume(final Throwable reason)

  • beforeRestart(final Throwable reason)

  • afterRestart(final Throwable reason)

These enable you to see when significant life cycle events occur with your actor.

The beforeStart() message is sent and handled before each Actor is fully started. The default behavior does nothing. If you wish to handle this message in your concrete Actor you must override it.

The afterStop() message is sent an handled after each Actor is fully stopped. The default behavior does nothing. If you wish to handle this message in your concrete Actor you must override it.

The restart life cycle methods are related to actor supervision. When your actor’s supervisor sees your actor failed with an Exception, it can take a number of actions. Your supervisor can tell your actor to resume, to stop, or to restart. If it tells your actor to resume, the beforeResume() is invoked. When it tells your actor to restart, the beforeRestart() is invoked first, and then the afterRestart() is invoked.

Since your actor has failed, it may have been left in an invalid state. In such cases, these life cycle methods give your actor the opportunity to clean up after the problem that caused the Exception and also reinitialize itself before reacting to its next available protocol message.

The Exception recovery methods DO NOT cause the Actor instance to be completely discarded and recreated. Therefore, it is the responsibility of the Actor to set its state to a safe point before message processing resumes.

Since all actors have a Logger available via its logger() method, any information that you log due to a life cycle message will be output asynchronously through a registered Logger. Your Actor won't block while output is performed.

Becoming Other Characters

One of the primary capabilities of an Actor is to prepare itself to receive and handle its next message. Since the vlingo/actors toolkit fully supports the Actor Model of computation, we enable you to morph any given Actor to the protocol that suites its current state. Thus, and Actor is a state machine and when it reaches any given state its behavior may change to support the specific state.

Changing characters dynamically is managed by the io.vlingo.actors.Characters mechanism. This test demonstrates how: io.vlingo.actors.CharactersTest

Testing Actors

Testing asynchronous components can be quite challenging. Yet, the vlingo/actors toolkit makes it quite simple to manage. There are a few components that help.

  1. Component TestWorld: This component wraps the standard World object with test capabilities. It supports all of the standard World facilities, but in a specialize way that assist in testing. A TestWorld may be started in exactly the same ways was a World. Creating an Actor through the TestWorld actually creates a TestActor<T>.

  2. Component TestActor<T>: This component is a thin wrapper around the T typed concrete Actor instance. There are three ways to access the Actor to send it a message: (a) Using its T typed protocol, but via a synchronous mailbox; messages are sent and received synchronously. (b) By using the Actor instance directly per the default protocol, enabling direct method invocations. (c) By dynamically casting the Actor instance to another one of its supported protocols, and then directly invoking a method (like b, but using a non-default protocol).

  3. Component TestState: Every Actor implements the TestStateView protocol, enabling tests to request data of the internal state of the Actor. Using method viewTestState() a test can acquire the TestStateof the Actor. A TestState is a set of key-value pairs, providing internal state data that a given concrete Actor is willing to share with a test in order for expectations to be asserted.

  4. Component TestUntil: This component is backed by a Java CountDownLatch, enabling a test to wait on an expected number of happenings in a tested unit. A TestUntil is instantiated by a test to a predetermined number of happenings, such as three. The test invokes until.completes(), which blocks until the number of happenings occurs. The TestUntil instance is shared with a unit under test, and each time that an expected happening takes place, the TestUntil is notified using its happened() protocol. Each happened() notification causes the internal atomic counter to decrement. When the expected number of occurrences has happened, the CountDownLatch reaches zero, and the client unit test side unblocks, allowing the test to complete.

You can see these facilities used throughout the vlingo/platform tests. One place to see the test facilities is to view its own tests: io.vlingo.actors.testkit.TestkitTest

All actors created directly through the TestWorld are assigned a mailbox type of TestMailbox. The TestMailbox does not queue messages, but performs immediate, synchronous delivery. This is done with the purpose of instantly testing the impact of each received message on the receiving actor. Such a test does not have to wait for asynchronous message delivery, making it much simpler to see the resulting message-drive state transition on the receiving actor, for example. When testing actors created through the conventional World, the test must employ a mechanism such as TestUntil to eventually see and assert against the expected outcomes. It is not always convenient to mock a protocol interface that is backed by a TestUntil instance, making an actor created through the TestWorld and possessing a TestMailbox an essential tool.

Yet, you must be careful when using an actor created by the TestWorld because such actors don't have asynchronous semantics. If you use several such actors together, it is almost certain that you will experience race conditions. As with all software tools, TestWorld and actors created through it, come with tradeoffs. Use the tools with this knowledge. Note that inside every TestWorld is a normal World that you can query.

World world = testWorld.world();

This enables the creation of both TestActor<T> and conventional actor instances.

Thread-safe Data Access

There is a specific io.vlingo.actors.testkit tool for maintaining thread-safety of data shared between tests and the vlingo/platform components. The tools is named io.vlingo.actors.testkit.AccessSafely and provides multi-threaded access for both the test thread and the actors being run by the test.

It should be understood that multi-threading is hard to get right, and working with Java on modern CPUs can make it even more complex. Modern CPUs are not only multi-core, but also have performance optimizations that perform what may seem to be counter intuitive operations. One such operations it executing code out of sequence from the actual implementation seen in source code. Thus, even when using TestUntil to protect access to data until it has been written to by an actor running on a separate thread, if the visible/logical program code is not run in the expected order, other threads can see unexpected values.

atomicValue.set(1); until.completed();

In this very simple example, what would happen if the CPU decided that it would be a more performant to execute the second statement first, and the first statement second? Impossible? Consider these statements in the Wikipedia article on the Java Memory Model.

The major caveat of this is that as-if-serial semantics do not prevent different threads from having different views of the data.... The basic rules imply that individual actions can be reordered, as long as the as-if-serial semantics of the thread are not violated.

What would happen is the test thread that is blocking on the TestUntil completion would unblock and attempt to read the atomicValue. If the other thread has not yet written 1 to the atomicValue, the test thread will see the wrong value and its assertion will fail. When you review the code you will insist that it is impossible for the atomicValue to remain unchanged if the TestUntil completed, and yet due to the CPU architecture you would be wrong.

Next, consider a related and critical point from the same Java Memory Model article.

Actions that imply communication between threads, such as the acquisition or release of a lock, ensure that actions that happen prior to them are seen by other threads that see their effects. For example, everything that happens before the release of a lock will be seen to be ordered before and visible to everything that happens after a subsequent acquisition of that same lock.

The AccessSafely testkit component helps you avoid such problems and solve them using the aforementioned techniques, relieving you from the burden of writing complex test code.

To use this tool, first create a component to be used under the control of the test, such as a mock object, such as one passed to an actor. The mock would resemble the expected asynchronous argument, but is not an asynchronous implementation. One such example is creating a mock implementor of PersistResultInterest, such as is done in vlingo/symbio with MockPersistResultInterest. Please see the tests that use the mocks. The following is a snippet from one part of the mock.

Inside the MockPersistResultInterest you set up some writers for the inside to write with, and readers for the test to read with.

public AccessSafely afterCompleting(final int times) { access = AccessSafely .afterCompleting(times) .writingWith("add", (value) -> persistentObjects.add((int) value)) .writingWith("addAll", (values) -> persistentObjects.addAll((List) values)) .readingWith("object", (index) -> persistentObjects.get((int) index)) .readingWith("size", () -> persistentObjects.size()); return access; }

This creates a new AccessSafely for some number of write occurrences followed by the registration of properties with write and read access. The property names are provided as String values. The actual writers and readers are lambdas.

The times of predetermined actions is backed by TestUntil inside (see above). When you first readFrom("some-property") you will block until those actions have been completed, at which time you will get the return value of the property that you asked for. In the above example there are four properties including "size" and "object". When the test client uses readFrom("size") it may block, but the second one readFrom("object", 0) doesn't, because both are protected by the same TestUntil created with afterCompleting(1).

To create properties, register both writers and readers. The writers are all java.util.function.Consumer<T> expressions. The readers may be simple java.util.function.Supplier<T> expressions or java.util.function.Function<T,R> expressions. The properties are now available on both sides of the test.

Again inside the MockPersistResultInterest create the code that handles writing to properties when it receives a notification of some persistence results. It uses a property writer to set the value atomically and inside a lock. You don't have to know the details of the implementation inside AccessSafely, only that thread-safe property access is handled for you.

public void persistResultedIn( final Outcome outcome, final Object persistentObject, final int possible, final int actual, final Object object) { if (actual == 1) { access.writeUsing("add", persistentObject); } else if (actual > 1) { access.writeUsing("addAll", persistentObject); } else { throw new IllegalArgumentException(...); } }

This ensures that a barrier/fence is placed around each data property so each property can be both written to and read from in a thread-safe manner. Here is a code snippet from the test that reads the properties to assert on its expectations.

public void testThatObjectPersistsQuerys() { final AccessSafely persistAllAccess = persistInterest.afterCompleting(1); final Person person = new Person("Tom Jones", 85); objectStore.persist(person, persistInterest); final int persistSize = persistAllAccess.readFrom("size"); assertEquals(1, persistSize); assertEquals(person, persistAllAccess.readFrom("object", 0)); ... }

This simple tool will prevent what may appear to be impossible data values as test outcomes.

Publish Subscribe

Describe pub-sub facilities.

Routing and Routers

Routers offer a way to deliver a message to one or more other actors, according to a specified routing strategy. They allow you to decouple the source of a message from its destination and they function as a branching mechanism in the message channel between actors. Routers can be used to control and improve message throughput for a particular message protocol, to distribute workload among a pool of actors in a desirable way, or even both.

Based on it's routing strategy, a router will compute a "routing" which consists of an ordered list of one or more actors, referred to as "routees", to which a message will be dispatched for processing. The router may consider any state, including the message itself, in computing the routing.

Router Responsibilities

Routers have four main responsibilities:

  1. Maintain the list of actors that are currently subscribed as routees

  2. Support the subscription and unsubscription of routees

  3. Computing a routing for a given message according to some routing logic

  4. Dispatch messages to the actors who were selected to be members of the routing

Type Safety

Routers and routees are actors and, like all actors in vlingo, are type safe. Each will implement one or more messaging protocols (i.e., Java interfaces). A router and its routees must implement the same messaging protocol, though each may also implement other protocols that are not shared.

io.vlingo.actors.Router and its abstract subclasses are generic types whose type parameter <P> is the Java interface representing the message protocol implemented by the router and its routees:

public abstract class Router<P> extends Actor

Provided Routers

vlingo/actors provides several kinds of routers out-of-the-box that are mainly distinguished by the routing strategy they employ:

  • io.vlingo.actors.BroadcastRouter - dispatches every message to every routee

  • io.vlingo.actors.ContentBasedRouter - considers the content of the message in choosing which routee to dispatch the message to

  • io.vlingo.actors.RandomRouter - dispatches each message to a random one of the routees

  • io.vlingo.actors.RoundRobinRouter - dispatches each message to the next routee, in turn

  • io.vlingo.actors.SmallestMailboxRouter - dispatches each message to the routee with the least number of messages in its mailbox at the time

Each of these routers is provided as an abstract class that extends io.vlingo.actors.Router (which extends io.vlingo.actors.Actor).

It is possible, of course, to implement your own new kind of router by directly extending io.vlingo.actors.Router.

Routee Subscription

Routees must be subscribed to a router before they can be dispatched any messages. There are two approaches for subscribing routees:

  • Automatically during router initialization

  • Explicitly after router initialization

Automatic routee subscription is accomplished by passing a io.vlingo.actors.RouterSpecification with a non-zero initial pool size to your router's constructor. For more information , see the Creating a Router: Construction and Routee Pool Initialization section below.

Explicit routee subscription requires the router to expose the subscription (and unsubscription) protocol as a public method(s). Since all the routees must comply with the messaging protocol, create an interface providing the type safe subscription protocol, for example:

public interface InvoiceSubmitterSubscription {
void subscribe(InvoiceSubmitter submitter);
void unsubscribe(InvoiceSubmitter submitter);
}

The router then needs to implement this interface, self-delegating to the protected subscribe(Routee) and unsubscribe(Routee) methods defined in the io.vlingo.actors.Router superclass:

public class InvoiceSubmissionRouter extends ..... impements InvoiceSubmitterSubscription, ...{
public void subscribe(InvoiceSubmitter submitter) {
subscribe(Routee.of(submitter));
}
public void unsubscribe(InvoiceSubmitter submitter) {
unsubscribe(Routee.of(submitter));
}
}

Route Computation

The io.vlingo.actors.Router class provides a set of route computation methods:

<T1> Routing<P> routingFor(final T1 routable1)
<T1, T2> Routing<P> routingFor(final T1 routable1, final T2 routable2)
<T1, T2, T3> Routing<P> routingFor(final T1 routable1, final T2 routable2, final T3 routable3)
<T1, T2, T3, T4> Routing<P> routingFor(final T1 routable1, final T2 routable2, final T3 routable3, final T4 routable4)

Most types of routers do not base routing decisions on the message being routed because their routing logic is more mechanical (e.g., round robin) or is based on the state of the routees (e.g., smallest mailbox).

For this reason, the default implementation of the above four route computation methods simply self-delegates to the method:

protected abstract Routing<P> computeRouting();

You will find that each subclass of io.vlingo.actors.Router will provide a concrete implementation of this method that encodes its particular routing strategy logic.

The exception is io.vlingo.actors.ContentBasedRouter for which, by definition, the routing logic depends on the specific message being routed. For this reason, ContentBasedRouter implements computeRouting() to throw java.lang.UnsupportedOperationException. Instead, you will need to override one of the routingFor() methods listed above and encode your routing logic there.

Router Message Dispatching

As noted previously, one of the responsibilities of routers is to dispatch messages to the actors that were selected as routees. The abstract class io.vlingo.actors.Router provides two sets of message dispatching protocol:

  • Command dispatching

  • Query dispatching

The command dispatching protocol is used to dispatch messages that may have arguments but do not have a return value, for example void submitInvoice(Invoice):

<T1> void dispatchCommand(final BiConsumer<P, T1> action, final T1 routable1)
<T1, T2> void dispatchCommand(final TriConsumer<P, T1, T2> action, final T1 routable1, final T2 routable2)
<T1, T2, T3> void dispatchCommand(final QuadConsumer<P, T1, T2, T3> action, final T1 routable1, final T2 routable2, final T3 routable3)
<T1, T2, T3, T4> void dispatchCommand(final PentaConsumer<P, T1, T2, T3, T4> action, final T1 routable1, final T2 routable2, final T3 routable3, final T4 routable4)

The command dispatching methods take a Consumer (function) as the first argument, and then from 1 to 4 objects that will be the arguments to the Consumer function. These objects map to the arguments of the messaging protocol method.

The query dispatching protocol is used to dispatch messages that may have arguments and do return a value, for example Price computePrice(ProductID, CustomerID):

<T1, R extends Completes<?>> R dispatchQuery(final BiFunction<P, T1, R> query, final T1 routable1)
<T1, T2, R extends Completes<?>> R dispatchQuery(final TriFunction<P, T1, T2, R> query, final T1 routable1, final T2 routable2)
<T1, T2, T3, R extends Completes<?>> R dispatchQuery(final QuadFunction<P, T1, T2, T3, R> query, final T1 routable1, final T2 routable2, final T3 routable3)
<T1, T2, T3, T4, R extends Completes<?>> R dispatchQuery(final PentaFunction<P, T1, T2, T3, T4, R> query, final T1 routable1, final T2 routable2, final T3 routable3, final T4 routable4)

The query dispatching methods take a Function as the first argument, and then from 1 to 4 objects that will be the arguments to the Function. These objects map to the arguments of the messaging protocol method.

Implementing a Router in Your Application

Implementing a router in your application is straight forward. You'll need to:

  • Create the message protocol that your router and routees will support as a Java interface

  • Create your routee actor

  • Create your router actor by extending one of the abstract subclasses of io.vlingo.actors.Router

Create the Message Protocol

As an example, let's say you are implementing an invoice submission process and there are several ERP systems to which any given invoice might need to be submitted. Your router will route each Invoice to one of several InvoiceSubmitter actors, each of which represents a particular ERP system. The selected routing will likely depend on information in the Invoice itself (e.g., customer ID).

To support this scenario, you might design the following protocol:

public interface InvoiceSubmitter {
void submitInvoice(Invoice invoice);
}

Create the Routee Actor(s)

Now that you have the InvoiceSubmitter protocol defined, you can create the actor(s) to which your router will route messages. These actors will be the ones actually doing the work, in our case submitting an Invoice to a specific ERP system.

Let's assume ERP systems in your company are named after Greek mythological gods, such as Apollo and Atlas, and that for various reasons you decide to create a different actor per ERP system.

Here is the rough definition of the actor for the Apollo ERP invoice submission:

public class ApolloInvoiceSubmitter extends Actor implements InvoiceSubmitter {
public void submitInvoice(Invoice invoice) {
//do some work here
}
}

Your actor for Atlas and other ERPs would have a similar structure but different logic in their submitInvoice methods.

Create a Router: Extending a Built-in Router

As noted above, vlingo/actors provides a number of built-in implementations of io.vlingo.actors.Router that you can extend.

In our example, the routing will be content-based, because the router will use information from the message itself (e.g., the customer ID of the Invoice) to select which ERP-specific routee to dispatch the message to. As such, we will extend io.vlingo.actors.ContentBasedRouter:

public class InvoiceSubmissionRouter
extends ContentBasedRouter<InvoiceSubmitter>
implements InvoiceSubmitter {
...
}

Creating a Router: Construction and Routee Pool Initialization

The io.vlingo.actors.Router class requires that a io.vlingo.actors.RouterSpecification be provided as a constructor argument and offers subclasses an opportunity to get involved in the initialization of the routee pool:

public Router(final RouterSpecification<P> specification) {
this.routees = new ArrayList<Routee<P>>();
initRoutees(specification);
}

By default, the information in the RouterSpecificaiton will be used to create an initial (possibly empty) pool of routee actors as child actors of the router actor. Subclasses may customize that behavior by overriding initRoutees(RouterSpecification).

Often, your concrete router class can make these decisions for itself by implementing a constructor that supplies them. For example:

public OrderRouterActor(final int poolSize) {
super(
new RouterSpecification(
poolSize,
Definition.has(OrderRouterWorker.class, Definition.NoParameters()),
OrderRouter.class)
);
}

Creating a Router: Routing Logic

In the Route Computation section above, we learned that subclasses of io.vlingo.actors.ContentBasedRouter need to override one of the routingFor() methods inherited from io.vlingo.actors.Router and implement the routing logic there. The routingFor() method variants provide access to the message arguments so that the router can inspect that information as part of its routing logic.

In our example, the message protocol void submitInvoice(Invoice invoice) has a single argument, so we can override the single-argument routingFor() method:

@override
protected <T1> Routing<P> routingFor(final T1 routable1) {
Invoice invoice = (Invoice) routable1;
InvoiceSubmitter submitter = selectERPInvoiceSubmitter(invoice.customerID);
return Routing.with(submitter);
}
private InvoiceSubmitter selectERPInvoiceSubmitter(CustomerID customerID) {
//select one of the subscribed InvoiceSubmitter actors based on CustomerID
}

Note that both InvoiceSubmissionRouter and ERPInvoiceSubmitter implement the InvoiceSubmitter interface, thereby establishing the type safe messaging protocol that they share, which allows the router to forward messages to the routees in a type safe way.

Creating a Router: Dispatching

Implementing dispatching logic requires implementing the message protocol and calling one of the methods discussed in the Router Message Dispatching section above.

The InvoiceSubmitter protocol declares the one-argument command method void submitInvoice(Invoice), so we we need to provide an implementation that calls the dispatchCommand method inherited from io.vlingo.actors.Router that accommodates one argument:

protected <T1> void dispatchCommand(final BiConsumer<P, T1> action, final T1 routable1)

Notice that the first argument to each dispatchCommand (and dispatchQuery) method is a function. vlingo/actors supplies variants of these dispatching methods for 1 to 4 arity functions. Any lambda expression matching the signature of the dispatching method can be passed.

Here, then, is the dispatching logic of our InvoiceSubmissionRouter:

@Override
public void submitInvoice(final Invoice invoice) {
dispatchCommand(InvoiceSubmitter::submitInvoice, invoice);
}

In the implementation above, we take advantage of the fact that Java method references are shorthand for lambda expressions that call the same method - in this case the void submitInvoice(Invoice) method of InvoiceSubmitter.

Creating a Router: Subscribing Routees

As noted in the "Creating a Router: Construction and Routee Pool Initialization" section above, one easy way of subscribing routees is to pass a RouterSpecification with a non-zero initial pool size to your router's constructor. The actor Definition in the RouterSpecification will be used by your router to create and subscribe a number of child actors that implement your routing message protocol. This approach works well when the routees can all be instances of the same class.

Sometimes, such as in our invoice submission example, the routees need to be instances of different classes that all implement the same message protocol. In our example, the InvoiceSubmitter actors for the Apollo and Atlas ERPs were implemented as different classes. In such cases, actors need to be explicitly subscribed to the router. This requires the router to provide public un/subscription protocol, as described in the Routee Subscription section above.