The VLINGO/CLUSTER is a key component that sits on top of VLINGO/ACTORS to support the development of scalable and fault-tolerant tools and applications. Additional tools that build out the VLINGO/PLATFORM will almost always be constructed on top of VLINGO/CLUSTER. You should also implement and deploy your services/applications in clusters.
Consider the following documentation the how-to guides for VLINGO/CLUSTER; that is, how to configure it, how to run it, how it's designed, and how it works. For practical use, including the actor-based compute grid, service/application partitioning and scale, and cluster-wide data caching, see VLINGO/LATTICE.
Besides scalable fault tolerance, the VLINGO/CLUSTER also provides cluster-wide, synchronizing attributes of name-value pairs. This enables the cluster to share live, mutating operational state among all nodes. Application-level services can likewise make use of cluster-wide, synchronizing attributes to maintain shared application/service values.
Referring to the cluster diagram, if one of the three nodes is lost, the cluster will still maintain a quorum and remain healthy. However, if two nodes are lost and only one node remains in a running state, the quorum is lost and the cluster is considered unhealthy. In that case the one remaining node will enter an idle state and await one or more of the other nodes to return to active operation. When that occurs, the cluster will again constitute a quorum and reach a healthy state. Although many service/application clusters will require only three nodes for optimal use, clusters can support far more than three nodes. Yet, because of the performance and efficiencies of our platform, you may rarely need many nodes. Even a 9-node, 21-node, or 49-node cluster may be considered quite large due to our efficiency standards.
You must first configure the cluster for your intended use. Currently this involves creating a Java properties file named `vlingo-cluster.properties`, which looks like the following example.
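Here the application class, stage name, timing values, and node names are representative placeholders, not required values:

```properties
cluster.op.buffer.size = 4096
cluster.op.incoming.probe.interval = 100
cluster.app.buffer.size = 10240
cluster.app.incoming.probe.interval = 10
cluster.op.outgoing.pooled.buffers = 20
cluster.app.outgoing.pooled.buffers = 50
cluster.msg.charset = UTF-8
cluster.app.class = com.example.cluster.MyClusterApplication
cluster.app.stage = my.cluster.stage
cluster.attributes.redistribution.interval = 1000
cluster.attributes.redistribution.retries = 20
cluster.health.check.interval = 2000
cluster.live.node.timeout = 20000
cluster.quorum.timeout = 60000
cluster.seedNodes = node1,node2,node3
```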
The different properties are explained next.
| Property | Description | Default |
| --- | --- | --- |
| `cluster.op.buffer.size` | The maximum size of a single operational message. Operational messages are actually tiny, other than DIR, which carries the full node directory. Assuming short host names, 4096 supports approximately 90-99 nodes with DIR. | 4096 |
| `cluster.op.incoming.probe.interval` | The interval (in milliseconds) within which the operational inbound stream is probed for available messages. | 100 |
| `cluster.app.buffer.size` | The maximum size of a single cluster client (tool, application, service) message. You may be able to tune this to be much smaller depending on the size of your application/service messages. | 10240 |
| `cluster.app.incoming.probe.interval` | The interval (in milliseconds) within which the application inbound stream is probed for available messages. | 10 |
| `cluster.op.outgoing.pooled.buffers` | The number of pooled buffers for outgoing asynchronous operational messages. | 20 |
| `cluster.app.outgoing.pooled.buffers` | The number of pooled buffers for outgoing asynchronous application messages. | 50 |
| `cluster.msg.charset` | The default Java character set type. | UTF-8 |
| `cluster.app.class` | The fully-qualified name of the `Actor` class that implements the cluster application, typically by extending `ClusterApplicationAdapter` (see below). | |
| `cluster.app.stage` | The name of the `Stage` within which the cluster application `Actor` is started. | |
| `cluster.attributes.redistribution.interval` | The interval (in milliseconds) within which new and changed application/service-level attributes maintained by the cluster are distributed and redistributed. | 1000 |
| `cluster.attributes.redistribution.retries` | The number of retries for redistributing unconfirmed attribute changes. | 20 |
| `cluster.health.check.interval` | The interval (in milliseconds) within which each cluster node reports its health. | 2000 |
| `cluster.live.node.timeout` | The limit (in milliseconds) at which point a node not reporting its health-check pulse is considered dead (inactive). | |
| `cluster.quorum.timeout` | The limit (in milliseconds) at which point a cluster without a quorum of nodes is considered dead, and the running nodes are put into an idle state. | |
| `cluster.seedNodes` | The names of the configured nodes that make up the entire cluster. For example: `node1,node2,node3` | |
| `cluster.ssl` | Currently not supported. | |
Per the above described `cluster.seedNodes` property, each of the nodes must have a section defining its individual configuration by node name, which looks like this.
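The following sketch configures the three seed nodes named above; the hosts and ports are illustrative placeholders:

```properties
node.node1.id = 1
node.node1.name = node1
node.node1.host = localhost
node.node1.op.port = 37371
node.node1.app.port = 37372

node.node2.id = 2
node.node2.name = node2
node.node2.host = localhost
node.node2.op.port = 37373
node.node2.app.port = 37374

node.node3.id = 3
node.node3.name = node3
node.node3.host = localhost
node.node3.op.port = 37375
node.node3.app.port = 37376
```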
These sections have the name pattern `node.nodename.property` and their associated values, where `nodename` is the named node and `property` is the specific property within that node. The specific properties and names are as follows.
| Property | Description |
| --- | --- |
| `node.nodename.id` | The unique identity of the given node with the name `nodename`. |
| `node.nodename.name` | The unique name of the node, which must match one of the names in the `cluster.seedNodes` property. |
| `node.nodename.host` | The DNS name of the network host of the node with name `nodename`. |
| `node.nodename.op.port` | The port number that is assigned to the cluster node's incoming operational socket channel. |
| `node.nodename.app.port` | The port number that is assigned to the cluster node's incoming application/service socket channel. |
All of the above configurations should be tuned and assigned specifically for your cluster's operations. You should have enough nodes to form a cluster of an odd number of nodes greater than one, which enables the cluster to establish a quorum of nodes. In the above example, the three total nodes constitute a quorum; if one of the three nodes goes down (fails or is otherwise stopped), the two remaining nodes still form a quorum. However, if only one node of the three remains, this does not constitute a quorum. Your clusters should have 3, 5, 7, 9, or some such odd number of nodes.
Each cluster node must have a configured cluster application `Stage` and `Class`. These are defined above as `cluster.app.stage` and `cluster.app.class`. See the above respective integration configurations for details. The class must be an `Actor`, most easily implemented by extending the `ClusterApplicationAdapter` abstract base class, which enables you to override only the integration methods of interest. The following example, `MyClusterApplication`, represents the supported integration interface and demonstrates each available integration method.
```java
import java.util.Collection;

// Note: import package names follow the vlingo-cluster and vlingo-wire modules.
import io.vlingo.cluster.model.application.ClusterApplicationAdapter;
import io.vlingo.cluster.model.attribute.Attribute;
import io.vlingo.cluster.model.attribute.AttributesProtocol;
import io.vlingo.cluster.model.outbound.ApplicationOutboundStream;
import io.vlingo.wire.message.RawMessage;
import io.vlingo.wire.node.Id;
import io.vlingo.wire.node.Node;

public class MyClusterApplication extends ClusterApplicationAdapter {
  private AttributesProtocol client;
  private final Node localNode;

  public MyClusterApplication(final Node localNode) {
    this.localNode = localNode;
  }

  @Override
  public void start() {
    logger().debug("APP: ClusterApplication started on node: " + localNode);
  }

  @Override
  public void handleApplicationMessage(final RawMessage message, final ApplicationOutboundStream responder) {
    logger().debug("APP: Received application message: " + message.asTextMessage());
  }

  @Override
  public void informAllLiveNodes(final Collection<Node> liveNodes, final boolean isHealthyCluster) {
    for (final Node id : liveNodes) {
      logger().debug("APP: Live node confirmed: " + id);
    }
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informLeaderElected(final Id leaderId, final boolean isHealthyCluster, final boolean isLocalNodeLeading) {
    logger().debug("APP: Leader elected: " + leaderId);
    printHealthy(isHealthyCluster);
    if (isLocalNodeLeading) {
      logger().debug("APP: Local node is leading.");
    }
  }

  @Override
  public void informLeaderLost(final Id lostLeaderId, final boolean isHealthyCluster) {
    logger().debug("APP: Leader lost: " + lostLeaderId);
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informLocalNodeShutDown(final Id nodeId) {
    logger().debug("APP: Local node shut down: " + nodeId);
  }

  @Override
  public void informLocalNodeStarted(final Id nodeId) {
    logger().debug("APP: Local node started: " + nodeId);
  }

  @Override
  public void informNodeIsHealthy(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: Node reported healthy: " + nodeId);
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informNodeJoinedCluster(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: " + nodeId + " joined cluster");
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informNodeLeftCluster(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: " + nodeId + " left cluster");
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informQuorumAchieved() {
    logger().debug("APP: Quorum achieved");
    printHealthy(true);
  }

  @Override
  public void informQuorumLost() {
    logger().debug("APP: Quorum lost");
    printHealthy(false);
  }

  @Override
  public void informAttributesClient(final AttributesProtocol client) {
    logger().debug("APP: Attributes Client received.");
    this.client = client;
    if (localNode.id().value() == 1) {
      client.add("fake.set", "fake.attribute.name1", "value1");
      client.add("fake.set", "fake.attribute.name2", "value2");
    }
  }

  @Override
  public void informAttributeSetCreated(final String attributeSetName) {
    logger().debug("APP: Attributes Set Created: " + attributeSetName);
  }

  @Override
  public void informAttributeAdded(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Added: " + attributeName + " Value: " + attr.value);
    if (localNode.id().value() == 1) {
      client.replace("fake.set", "fake.attribute.name1", "value-replaced-2");
      client.replace("fake.set", "fake.attribute.name2", "value-replaced-20");
    }
  }

  @Override
  public void informAttributeRemoved(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Removed: " + attributeName + " Attribute: " + attr);
  }

  @Override
  public void informAttributeSetRemoved(final String attributeSetName) {
    logger().debug("APP: Attributes Set Removed: " + attributeSetName);
  }

  @Override
  public void informAttributeReplaced(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Replaced: " + attributeName + " Value: " + attr.value);
    if (localNode.id().value() == 1) {
      client.remove("fake.set", "fake.attribute.name1");
    }
  }

  private void printHealthy(final boolean isHealthyCluster) {
    if (isHealthyCluster) {
      logger().debug("APP: Cluster is healthy");
    } else {
      logger().debug("APP: Cluster is NOT healthy");
    }
  }
}
```
You would implement your own cluster application, overriding the methods that interest you.
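For example, a minimal sketch that reacts only to quorum changes (the class name here is illustrative; imports are as in the example above):

```java
public class QuorumAwareApplication extends ClusterApplicationAdapter {
  @Override
  public void informQuorumAchieved() {
    logger().debug("APP: quorum achieved; resuming work");
  }

  @Override
  public void informQuorumLost() {
    logger().debug("APP: quorum lost; pausing work");
  }
}
```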
Note that the above `MyClusterApplication` has a property named `client`, which is an `AttributesProtocol`. The `AttributesProtocol` is the gateway to establishing the use of cluster-wide attributes. This `client` has the means to add new attributes, modify existing ones, and remove them.
```java
public class MyClusterApplication extends ClusterApplicationAdapter {
  private AttributesProtocol client;
  ...
  client.add("Set1", "Attribute1", "value1");
  ...
  client.replace("Set1", "Attribute1", "value2");
  ...
  client.remove("Set1", "Attribute1");
}
```
Here "Set1"
is the name of a set of attributes with a specific name. All attributes added must be within a named set. The "Attribute1"
is the name of an attribute within "Set1"
. The "value1"
is the initial value of "Attribute1"
and "value2"
is a different value that is later assigned to "Attribute1"
. As these attribute setting operations take place, the sets of attributes are replicated across nodes within the cluster. Revisiting the MyClusterApplication
lifecycle messages in the above code shows how each node in the cluster is informed of newly added, replaced, and removed attributes.
An attribute may be given using various types. As expected, these include `Byte`, `Short`, `Integer`, `Long`, `Character`, `Float`, `Double`, `Boolean`, and `String`. Currently there is no support for more complex objects.
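For instance, a brief sketch of adding attributes of several supported types (the set and attribute names here are made up, and `client` is the `AttributesProtocol` shown above):

```java
// Illustrative names only; any of the supported value types may be used.
client.add("server.settings", "maxConnections", 100);     // Integer
client.add("server.settings", "loadFactor", 0.75F);       // Float
client.add("server.settings", "acceptingTraffic", true);  // Boolean
client.add("server.settings", "region", "us-east-1");     // String
```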
Any object with a reference to the `AttributesProtocol` or `AttributesClient` (a non-`Actor` implementation of `AttributesProtocol`) may add new attributes, modify existing ones, and remove them. The same protocol provides the means to query for current attribute sets and for the attributes within a specific set.
```java
final Collection<AttributeSet> allSets = client.all();
final Collection<Attribute> allSetAttributes = client.allOf("Set1");
```
Since the client is purposely not backed by an `Actor`, the query results are immediate. The contents of the `Collection` results will be whatever the node currently has, and will not reflect changes pending delivery from other nodes.
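A single attribute may also be read back through the same protocol, as the `MyClusterApplication` example above does; the result reflects only this node's current, local view:

```java
final Attribute<String> attribute = client.attribute("Set1", "Attribute1");
final String currentValue = attribute.value; // this node's current local value
```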
You must start each node in the cluster. The cluster will not become fully operational until a quorum of nodes is started and capable of communicating across their operational socket channels.
The `main()` startup class within the vlingo-cluster JAR file is:

```
io.vlingo.cluster.NodeBootstrap
```
To start each node in the cluster, use the following commands. The first command may be used with Maven.

```
maven> mvn exec:java -Dexec.args=node1
```
This second command would be used directly with the Java VM (setting the `classpath` as appropriate for your environment).

```
java> java -classpath path1:path2 io.vlingo.cluster.NodeBootstrap node1
```
The separator you should use between `classpath` entries is determined by your runtime environment. For Un*x flavored environments this is `:`, as shown above. For Windows it is `;`, such as `path1;path2`.
These two example commands start only the node named `node1`. You would perform a startup for each of the nodes named and configured in `vlingo-cluster.properties`.
Start each node by providing the node's name, as explained above, as the parameter to the `main()` class `NodeBootstrap`.
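For example, the remaining two nodes of the three-node configuration above would be started in the same way (the `classpath` entries are placeholders):

```
java> java -classpath path1:path2 io.vlingo.cluster.NodeBootstrap node2
java> java -classpath path1:path2 io.vlingo.cluster.NodeBootstrap node3
```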
Generally a cluster will be composed of multiple nodes of an odd number (not just one, but, for example, 3, 5, 21, 49, or 127). The reason for choosing an odd number of nodes is to make it possible to determine whether there is a quorum of nodes (`totalNodes / 2 + 1`) that can form a healthy cluster. Choosing an even number of nodes works, but losing one node neither improves the quorum determination nor strengthens the cluster when all nodes are available. Configuring an even number of nodes can also lead to a big problem, known as split-brain, with potentially disastrous consequences.
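The quorum arithmetic is simple integer math, sketched here (the class and method names are illustrative, not part of the VLINGO/CLUSTER API):

```java
// Quorum: more than half of the configured nodes (integer division).
final class QuorumMath {
  static int quorum(final int totalNodes) {
    return totalNodes / 2 + 1;
  }

  public static void main(final String[] args) {
    System.out.println(quorum(3)); // 2: tolerates the loss of 1 node
    System.out.println(quorum(4)); // 3: still tolerates the loss of only 1 node
    System.out.println(quorum(5)); // 3: tolerates the loss of 2 nodes
  }
}
```

Note that moving from 3 to 4 nodes raises the quorum without raising the number of tolerated failures, which is one reason an even node count adds cost without adding resilience.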
When a quorum of nodes is available and the individual nodes are communicating with one another, a leader is elected. The leader is responsible for making certain decisions on behalf of the cluster, and also announces newly joined nodes and those that have left the cluster.
The VLINGO/CLUSTER works in full-duplex mode, with two kinds of cluster node communication channels. There are the operational channels, which the cluster nodes use to maintain the health of the cluster. There are also application channels, which the services/applications use to pass messages between nodes. Using two different channel types allows for tuning each type in different ways. For example, application channels may be assigned to a faster network than the operational channels. It also opens the possibility for either channel type to use TCP or UDP independently of the other channel type.
The cluster consensus protocol is one known by the name Bully Algorithm. Although an unpleasant name, the protocol is simple but powerful. This consensus protocol chooses a leader by determining the node with the "greatest id value." The identity may be numeric or alphanumeric. If numeric the "greatest node id" is determined with numeric comparisons. If alphanumeric, the "greatest node id" is determined lexicographically.
Consider these node ids: `Node1`, `Node2`, `Node3`. Of these three node ids, `Node3` is lexicographically greatest, `Node2` is next greatest, and `Node1` is least.
Any node that senses that the previous leader has been lost (left the cluster for any reason), messages the other known "nodes with greater ids" asking for an election to take place. Any "nodes with greater ids" tell the "lesser id node" that it will not be the leader. Finally when there are no nodes of "greater id" than a given node--known because it receives no "you aren't the leader" messages--that "greatest id node" declares itself leader. In essence the "node of the greatest id" bullies its way into the leadership position, asserting that it is currently greatest.
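As a rough illustration of that rule (this is a sketch, not the actual VLINGO/CLUSTER implementation), a node may declare itself leader only when no live node has a greater id:

```java
import java.util.Collection;
import java.util.List;

final class BullyElectionSketch {
  // A node may declare itself leader only when no live node has a greater id.
  static boolean mayDeclareLeader(final String selfId, final Collection<String> liveNodeIds) {
    return liveNodeIds.stream().noneMatch(id -> id.compareTo(selfId) > 0);
  }

  public static void main(final String[] args) {
    final List<String> live = List.of("Node1", "Node2", "Node3");
    System.out.println(mayDeclareLeader("Node3", live)); // true: no greater id exists
    System.out.println(mayDeclareLeader("Node2", live)); // false: Node3 is greater
  }
}
```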
Any latent message received from a "node of even greater id" than the currently declared leader will not take leadership away from it, because it is a healthy leader node. In other words, having the greatest node id value of all does not guarantee that a node will become cluster leader. Sometimes the node that responds most rapidly, and whose id is greater than that of all others seen once a quorum of nodes is reached, is voted leader.
It's possible that our consensus protocol will be augmented in the future with the Gossip Protocol to support very large clusters. We also have the option to swap out the Bully Algorithm for another, or to provide multiple consensus strategies.
A split-brain cluster occurs when a single cluster has two or more leaders. The VLINGO/CLUSTER prevents this from occurring by means of its cluster quorum, voting policies, and algorithm. Given that a cluster must have a full complement of nodes, or at least a quorum of nodes, how is a split-brain prevented?
Consider a three-node cluster, composed of `Node1`, `Node2`, and `Node3`. Per the Bully Algorithm, the greatest value node, `Node3`, takes leadership. Rules: three nodes is a full, healthy cluster; two nodes is a quorum and still healthy; one node alone is neither a quorum nor healthy. Now consider the following scenario.
1. `Node3` is partitioned by the network, and cannot see `Node1` or `Node2`
2. `Node1` and `Node2` remain and see each other; `Node2`, as the greatest value node, is made leader
3. When the partition that hid `Node3` is healed between it and `Node1`, but not with `Node2`, `Node3` will see itself as the greatest value node and vote itself into leadership, which is broadcast to `Node1` (if `Node1` accepts `Node3` as leader, a split-brain occurs because `Node2` will be the leader of the `Node2`+`Node1` cluster, and `Node3` will be the leader of the `Node3`+`Node1` cluster)
4. The vote for `Node3` leadership is received by `Node1`, but `Node1` is already part of a healthy cluster with `Node2` as leader
5. `Node1` sends a message to `Node3` indicating that it rejects `Node3` as leader and that `Node2` is already leader
6. Although `Node3` cannot yet see `Node2`, it accepts that it is not currently leader; because it cannot yet see `Node2` it will continually retry leadership voting, but as long as `Node1` sees `Node2` as leader, `Node1` repeats messaging `Node3` about rejection and current leadership
7. Finally `Node3` will see `Node2`, and `Node2` will inform it that it is the leader; `Node3` will accept `Node2` as leader and follow
In the above scenario consider one changed condition.
1. `Node2` is downed before `Node3` sees `Node2` as leader
2. `Node1` remains alive, but its cluster with `Node2` is lost
3. When next `Node3` sends a leadership vote that is received by `Node1`, `Node1` accepts `Node3` as leader
4. When `Node2` is available again it will be informed that `Node3` is the leader; should `Node2` see only `Node1` and vote for itself to lead, `Node1` will reject `Node2` as leader and inform it that `Node3` is leading

The scenario just stated has the same eventual result as the one previous to it, but with `Node2` in the role that `Node3` played there.