This XOOM LATTICE component provides an API for distributed computations and data processing across multiple nodes in a cluster. It supports distributed parallel processing by sending computational execution requests to actors on any node in a XOOM CLUSTER with the potential to receive results in return, if the protocol specifies a completable future outcome. This as well as elastic resiliency are embodied in the XOOM LATTICE Grid.
The XOOM LATTICE Grid is a distributed compute construct that is implemented as a XOOM ACTORS
Grid API is the same as you would expect from any
Stage with one exception: the
Actor implementations that should be started on a
Grid must have a grid-compatible
Address. There are no other requirements, such as implementing a special interface or extending an abstract base class.
To start the
Grid use one of the static
Grid.start(...) methods. You may then start any actor that has a grid-compatible
Address as you would normally do on a
final Grid grid = Grid.start("product-service", "product-grid");final Product product = grid.actorFor(Product.class, ProductEntity.class);
In the above example the
ProductEntity actor instance is assigned a grid-compatible
Address, and is therefore reachable by any message sender on the
All XOOM LATTICE base model types are compatible with the grid without any changes. The following are the types.
// model entitiesimport io.vlingo.lattice.model.sourcing.EventSourced;import io.vlingo.lattice.model.sourcing.CommandSourced;import io.vlingo.lattice.model.object.ObjectEntity;import io.vlingo.lattice.model.stateful.StatefulEntity;// model processes (a.k.a. sagas)import io.vlingo.lattice.model.process.ObjectProcess;import io.vlingo.lattice.model.process.SourcedProcess;import io.vlingo.lattice.model.process.StatefulProcess;
Use all of these base types as you would if not using the grid.
Each node in the grid has a representative abstraction name
GridNode in the cluster has a copy of a hash ring data structure. Without going into details on how a hash ring works, it is used to determine where in the cluster a given actor is to be distributed and subsequently found within the grid. An actor's node location is determined by where the hash of its
Address distributes on the hash ring data structure. For example, if there are three nodes in the cluster, each actor
Address must hash into one of the three nodes. This means that a given actor purposely has a single instance in a single grid node in the cluster; that is, that one actor instance is pinned to a given node.
When a message is sent to a grid-compatible actor, the grid node of the message sender looks up in the hash ring for the node on which that actor is located. The message is then serialized and sent over the network to that node. The receiving grid node searches up the actor locally and delivers the message through the actor's mailbox.
There is, of course, an optimization if the message is sent to an actor that is on the same node as the sender. Such a condition delivers the message directly through the target actor's mailbox without any network overhead.
There are several grid operational scenarios to be aware of. Although service and application developers will not need to do anything special when creating actors in the grid and sending messages from many client actors to many target actors, it is useful to understand the common scenarios.
An actor on a given grid node sends a message to an actor on a grid node somewhere in the cluster. The sender has a protocol reference to the target actor and sends a message via a protocol method invocation. The target actor's grid-compatible
Address is used to look up the grid node on which it is pinned. The message is serialized, sent across a network socket channel to the node of the target actor, deserialized, and delivered to the actor via its mailbox.
It is possible/likely that some sender actors will be on the same node as the target actor. In that case, the message send is optimized and delivered directly to the actor's mailbox.
A node that experiences a network partition will fail to receive continuous health updates from other nodes that can no longer see it. The partitioned node will be considered lost from the other nodes that can't see it, and that node will in short time understand that it has lost the cluster quorum with the other cluster nodes (as per the XOOM CLUSTER specification). This partitioned node then enters into an idle state. When the node understands that it is in an idle state it will provide this information to the clients running in the local node.
In some network partition cases any N nodes out of M total may see each other, and yet there must be P nodes excluded from the cluster. In the case of a three-node cluster, N=2, M=3, and P=1.
One of the N nodes will be elected leader of the quorum. Any node in the set of P that is not seen by the leader will be prevented from entering the cluster. Should any partitioned node within the set of P claim to be leader among the N and/or P nodes that it can see (it has the highest value node id among any other nodes it can see), the nodes that receive its declaration of leadership will reject that claim. In turn any of the rejecting nodes will inform the failed leadership-declaring node (within P) of the real leader (within N) that it recognizes. This is strictly handled by the cluster and involves the grid only to provide information that it is not in the cluster quorum.
A node that is downed within the cluster has left it, which will be communicated to all remaining nodes.
The actors that were distributed to the node that left must continue to operate within the grid on the nodes still in the cluster quorum. Thus, the actors that would otherwise be lost can be recovered onto a different node. Consider the following types of recovery.
Actors with ephemeral or transient state, and model actors that have persistent state, will be recovered on a different node according to the adjusted hash ring. This will not actually occur until a message is sent to any such actor.
Actors that have non-transient state but that are not persistent model actor types cannot be fully recovered on other nodes with the state they previously held on the downed node. One way to ensure that the actor state can be maintained is to place in a XOOM LATTICE Space. A Space is a distributed, in-memory, key-value store. As long as the cluster nodes remain running on which a given Space key-value is persistent in memory, any actor using that Space key-value can restore from that state as needed.
A node that newly joins a cluster will cause the hash ring to adjust to that node, meaning that the actors that were on any preexisting nodes may be repartitioned on to one or more other nodes. In other words, a node joining the cluster changes the hashing results because there are now more nodes than previously available.
This repartitioning could be quite expensive if the actors were to be immediately relocated. Instead what happens is the same that occurs when a cluster node leaves. The recovery onto a different node will occur only when a message is sent to any such relocated actor.
The implementation of the typical scenario is: when the cluster node updates its hash ring to include the newly joined node, it scans its internal directory for actors whose Address indicates that they no longer belong, and evicts those actors. The actors evicted from that node will not be repartitioned until receiving their next message. Any messages for the evicted actors that are received latently are relayed to the node that now contains the given actors.
There are a few challenges to this typical scenario.
An evicted actor with messages still in its mailbox must be given the opportunity to process those messages. Thus, before evicting such an actor, its remaining mailbox messages must be sent to the grid node that is now responsible for it.
It is possible that when multiple nodes are joining within a short time frame, or one or more nodes leave the cluster near the same time as one or more joining, there may be some shuffling of actors until the repartitioning settles. The best that a given evicting node can do is relay received messages to actors that it is no longer responsible for to the node it currently understands to be the one hosting such actors. This will cause some additional latency.
There is no way to avoid these challenges, because there is no way for a given node to predict that something about the cluster will change just following its update of its own hash ring.
Yet, these challenges do emphasize that the eviction process must be asynchronous to the ongoing receipt of cluster health and actor targeted messages. That enables the node to immediately update its hash ring to its current understanding of possible ongoing cluster changes. As a result, the asynchronous eviction process has the opportunity to check for a possible refreshed hash ring as it decides whether or not to evict a given actor, and if evicted, which node should receive it's messages—at least as of that very instant in time.