vlingo/cluster

Scaling your services with resilience using vlingo/cluster.

Resiliency and Scale

The vlingo/cluster is a key component that sits on top of vlingo/actors to support the development of scalable and fault-tolerant tools and applications. Additional tools that build out the vlingo/platform will almost always be constructed on top of vlingo/cluster, and you will likewise implement and deploy your own services/applications in clusters.

In addition to scalable fault-tolerance, the vlingo/cluster also provides cluster-wide, synchronizing attributes of name-value pairs. This enables the cluster to share live and mutating operational state among all nodes. Likewise, application-level services can also make use of cluster-wide, synchronizing attributes in order to enhance shared application/service values.

Referring to the cluster diagram, if one of the three nodes is lost, the cluster will still maintain a quorum and remain healthy. However, if two nodes are lost and only one node remains in a running state, the quorum is lost and the cluster is considered unhealthy. In that case the one remaining node will enter an idle state and await the return of one or more of the other nodes to active operation. When that occurs, the cluster will again constitute a quorum and reach a healthy state. Although many service/application clusters will require only three nodes for optimal use, clusters can support far more than three nodes. Yet, because of the performance and efficiencies of our platform, you may rarely need many nodes. Even a 9-node, 21-node, or 49-node cluster may be considered quite large due to our efficiency standards.

Using the vlingo/cluster

You must first configure the cluster for your intended use. Currently this involves creating a Java properties file named vlingo-cluster.properties, similar to the following example, which is available under src/test/resources of the vlingo-cluster project.

################################
# cluster-wide configurations
################################
# maximum size of single operations message (which are actually tiny, other than DIR)
# assuming short host names 4096 would support approximately 90-99 nodes with DIR
cluster.op.buffer.size = 4096
# the interval (in ms) within which the operations inbound stream will be probed
# for available messages
cluster.op.incoming.probe.interval = 100
# maximum size of a single cluster client (tool or application) message
# you may be able to tune this to be much smaller depending on app messages
cluster.app.buffer.size = 10240
# the interval (in ms) within which the application inbound stream will be probed
# for available messages
cluster.app.incoming.probe.interval = 10
# number of pooled buffers for outgoing asynchronous operations messages
cluster.op.outgoing.pooled.buffers = 20
# number of pooled buffers for outgoing asynchronous application messages
cluster.app.outgoing.pooled.buffers = 50
# default charset
cluster.msg.charset = UTF-8
# classname of client/application and its stage name
cluster.app.class = io.vlingo.cluster.model.application.FakeClusterApplicationActor
cluster.app.stage = fake.app.stage
# interval at which unconfirmed attribute changes are redistributed
cluster.attributes.redistribution.interval = 1000
# the number of retries for redistributing unconfirmed attribute changes
cluster.attributes.redistribution.retries = 20
# interval at which each health check is scheduled
cluster.health.check.interval = 2000
# after this limit with no pulse from given node, it's considered dead
cluster.live.node.timeout = 20000
# after this limit with too few nodes to constitute a quorum, terminate node
cluster.quorum.timeout = 60000
# currently unsupported
cluster.ssl = false
# currently all active nodes must be listed as seed nodes
cluster.seedNodes = node1,node2,node3
################################
# individual node configurations
################################
node.node1.id = 1
node.node1.name = node1
node.node1.host = localhost
node.node1.op.port = 37371
node.node1.app.port = 37372
node.node2.id = 2
node.node2.name = node2
node.node2.host = localhost
node.node2.op.port = 37373
node.node2.app.port = 37374
# highest id, default leader
node.node3.id = 3
node.node3.name = node3
node.node3.host = localhost
node.node3.op.port = 37375
node.node3.app.port = 37376

The sections are outlined next.

Cluster Limits, Timing, and Integration Configurations

Property

Value

cluster.op.buffer.size

The maximum size of a single operational message; these messages are actually tiny, other than DIR. Assuming short host names, the default of 4096 would support approximately 90-99 nodes with DIR.

cluster.op.incoming.probe.interval

The interval (in milliseconds) within which the operations inbound stream will be probed for available messages. The default is 100 milliseconds.

cluster.app.buffer.size

The maximum size of a single cluster client (tool, application, service) message. The default is 10240. You may be able to tune this to be much smaller depending on the size of application/service messages.

cluster.app.incoming.probe.interval

The interval in milliseconds within which the application inbound stream will be probed for available messages. The default is 10 milliseconds.

cluster.op.outgoing.pooled.buffers

The number of pooled buffers for outgoing asynchronous operations messages. The default is 20.

cluster.app.outgoing.pooled.buffers

The number of pooled buffers for outgoing asynchronous application messages. The default is 50.

cluster.msg.charset

The Java character set used for cluster messages. The default is UTF-8.

cluster.app.class

The fully-qualified name of the class that serves as the client application/service instance used to integrate with the cluster. This class must implement the interface ClusterApplication, or extend the class ClusterApplicationAdapter. For example, com.myapplication.cluster.MyClusterApp could be the name of the class.

cluster.app.stage

The name of the Stage within which the client application/service instance is created. For example, myapplication.stage could be the name of the Stage within which the com.myapplication.cluster.MyClusterApp instance is created.

cluster.attributes.redistribution.interval

The interval in milliseconds within which the new and changed application/service level attributes maintained by the cluster are distributed and redistributed. The default is 1000 milliseconds (1 second).

cluster.attributes.redistribution.retries

The number of retries for redistributing unconfirmed attribute changes. The default is 20.

cluster.health.check.interval

The interval in milliseconds within which each cluster node reports its health. The default is 2000 (2 seconds).

cluster.live.node.timeout

The limit in milliseconds after which a node that has not reported its health check pulse is considered dead (inactive). The default is 20000 (20 seconds).

cluster.quorum.timeout

The limit in milliseconds after which a cluster that cannot establish a quorum of nodes puts its remaining running nodes into an idle (inactive) state. The default is 60000 (60 seconds).

cluster.seedNodes

The names of the configured nodes that make up the entire cluster. For example:

cluster.seedNodes = node1,node2,node3

cluster.ssl

Currently not supported.

Cluster Node Configurations

Per the cluster.seedNodes property described above, each node must have a section defining its individual configuration by node name. This section is repeated here for reference by the following table, which defines each property.

################################
# individual node configurations
################################
node.node1.id = 1
node.node1.name = node1
node.node1.host = localhost
node.node1.op.port = 37371
node.node1.app.port = 37372
node.node2.id = 2
node.node2.name = node2
node.node2.host = localhost
node.node2.op.port = 37373
node.node2.app.port = 37374
# highest id, default leader
node.node3.id = 3
node.node3.name = node3
node.node3.host = localhost
node.node3.op.port = 37375
node.node3.app.port = 37376

Each node named in cluster.seedNodes must have a section with the name pattern node.nodename.property and associated values, where nodename is the named node and property is the specific property within that node. The specific properties and names are as follows.

Node Description Property

Value

node.nodename.id

The unique identity of the given node with the name nodename.

node.nodename.name

The unique name of the node, which must match one of the names in the cluster.seedNodes property.

node.nodename.host

The DNS name of the network host of the node with the name nodename.

node.nodename.op.port

The port number assigned to the cluster node's incoming operational socket channel.

node.nodename.app.port

The port number assigned to the cluster node's incoming application/service socket channel.

All of the above configurations should be tuned and assigned specifically for your cluster's operations. You should have enough nodes to form a cluster with an odd number of nodes greater than one. This enables the cluster to establish a quorum of nodes. In the above example, the three total nodes constitute a quorum, and if one of the three nodes goes down (fails or is otherwise stopped), the two remaining nodes still form a quorum. However, if only one node of the three remains, this does not constitute a quorum. Your clusters should have 3, 5, 7, 9, or another such odd number of nodes.

Cluster Application

Each cluster node must have a configured cluster application Stage and Class. These are defined above as cluster.app.stage and cluster.app.class. See the respective integration configurations above for details. The configured class must be an Actor, and typically extends the ClusterApplicationAdapter abstract base class. This adapter enables you to override only the integration methods of interest. The following example, MyClusterApplication, demonstrates each available integration method of the supported integration interface.

public class MyClusterApplication extends ClusterApplicationAdapter {
  private AttributesProtocol client;
  private final Node localNode;

  public MyClusterApplication(final Node localNode) {
    this.localNode = localNode;
  }

  @Override
  public void start() {
    logger().debug("APP: ClusterApplication started on node: " + localNode);
  }

  @Override
  public void handleApplicationMessage(final RawMessage message, final ApplicationOutboundStream responder) {
    logger().debug("APP: Received application message: " + message.asTextMessage());
  }

  @Override
  public void informAllLiveNodes(final Collection<Node> liveNodes, final boolean isHealthyCluster) {
    for (final Node id : liveNodes) {
      logger().debug("APP: Live node confirmed: " + id);
    }
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informLeaderElected(final Id leaderId, final boolean isHealthyCluster, final boolean isLocalNodeLeading) {
    logger().debug("APP: Leader elected: " + leaderId);
    printHealthy(isHealthyCluster);
    if (isLocalNodeLeading) {
      logger().debug("APP: Local node is leading.");
    }
  }

  @Override
  public void informLeaderLost(final Id lostLeaderId, final boolean isHealthyCluster) {
    logger().debug("APP: Leader lost: " + lostLeaderId);
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informLocalNodeShutDown(final Id nodeId) {
    logger().debug("APP: Local node shut down: " + nodeId);
  }

  @Override
  public void informLocalNodeStarted(final Id nodeId) {
    logger().debug("APP: Local node started: " + nodeId);
  }

  @Override
  public void informNodeIsHealthy(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: Node reported healthy: " + nodeId);
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informNodeJoinedCluster(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: " + nodeId + " joined cluster");
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informNodeLeftCluster(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: " + nodeId + " left cluster");
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informQuorumAchieved() {
    logger().debug("APP: Quorum achieved");
    printHealthy(true);
  }

  @Override
  public void informQuorumLost() {
    logger().debug("APP: Quorum lost");
    printHealthy(false);
  }

  @Override
  public void informAttributesClient(final AttributesProtocol client) {
    logger().debug("APP: Attributes Client received.");
    this.client = client;
    if (localNode.id().value() == 1) {
      client.add("fake.set", "fake.attribute.name1", "value1");
      client.add("fake.set", "fake.attribute.name2", "value2");
    }
  }

  @Override
  public void informAttributeSetCreated(final String attributeSetName) {
    logger().debug("APP: Attributes Set Created: " + attributeSetName);
  }

  @Override
  public void informAttributeAdded(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Added: " + attributeName + " Value: " + attr.value);
    if (localNode.id().value() == 1) {
      client.replace("fake.set", "fake.attribute.name1", "value-replaced-2");
      client.replace("fake.set", "fake.attribute.name2", "value-replaced-20");
    }
  }

  @Override
  public void informAttributeRemoved(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Removed: " + attributeName + " Attribute: " + attr);
  }

  @Override
  public void informAttributeSetRemoved(final String attributeSetName) {
    logger().debug("APP: Attributes Set Removed: " + attributeSetName);
  }

  @Override
  public void informAttributeReplaced(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Replaced: " + attributeName + " Value: " + attr.value);
    if (localNode.id().value() == 1) {
      client.remove("fake.set", "fake.attribute.name1");
    }
  }

  private void printHealthy(final boolean isHealthyCluster) {
    if (isHealthyCluster) {
      logger().debug("APP: Cluster is healthy");
    } else {
      logger().debug("APP: Cluster is NOT healthy");
    }
  }
}

You would implement your own cluster application, overriding the methods that interest you.
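
For instance, a minimal sketch of such an application, one that reacts only to leadership and quorum changes, might look like the following. The class name MyLeadershipMonitor is hypothetical; the constructor shape and the overridden methods are the same as those shown in the full example above. To use it, you would name its fully-qualified class in cluster.app.class.

// A minimal sketch, assuming the integration methods shown in the example above.
// MyLeadershipMonitor is a hypothetical class name used only for illustration.
public class MyLeadershipMonitor extends ClusterApplicationAdapter {
  private final Node localNode;

  // Same constructor shape as the example above: the local node is provided.
  public MyLeadershipMonitor(final Node localNode) {
    this.localNode = localNode;
  }

  @Override
  public void informLeaderElected(final Id leaderId, final boolean isHealthyCluster, final boolean isLocalNodeLeading) {
    if (isLocalNodeLeading) {
      logger().debug("MONITOR: " + localNode + " is now leading the cluster.");
    } else {
      logger().debug("MONITOR: Leader elected: " + leaderId);
    }
  }

  @Override
  public void informQuorumAchieved() {
    logger().debug("MONITOR: Quorum achieved; cluster is healthy.");
  }

  @Override
  public void informQuorumLost() {
    // A real application might pause work that requires a healthy cluster here.
    logger().debug("MONITOR: Quorum lost; cluster is NOT healthy.");
  }
}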

Cluster-Wide Attributes

Note that the above MyClusterApplication has a property named client, which is an AttributesProtocol. The AttributesProtocol is the gateway to establishing the use of cluster-wide attributes. This client has the means to add new attributes, modify existing ones, and remove them.

public class MyClusterApplication extends ClusterApplicationAdapter {
  private AttributesProtocol client;
  ...
  client.add("Set1", "Attribute1", "value1");
  ...
  client.replace("Set1", "Attribute1", "value2");
  ...
  client.remove("Set1", "Attribute1");
}

Here "Set1" is the name of a set of attributes with a specific name. All attributes added must be within a named set. The "Attribute1" is the name of an attribute within "Set1". The "value1" is the initial value of "Attribute1" and "value2" is a different value that is later assigned to "Attribute1". As these attribute setting operations take place, the sets of attributes is replicated across nodes within the cluster. Revisiting the MyClusterApplication lifecycle messages in the above code shows how each node in the cluster is informed of newly added, replaced, and removed attributes.

An attribute may be given any of various types. As expected, these include: Byte, Short, Integer, Long, Character, Float, Double, Boolean, and String. Currently there is no support for more complex objects.
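
For example, using the same client shown above, attributes of several supported types can be added to one named set. The set and attribute names here are illustrative only.

// Illustrative only: the set name "runtime.stats" and these attribute names are not
// part of vlingo/cluster; any names may be used.
client.add("runtime.stats", "requestCount", 0L);        // Long
client.add("runtime.stats", "averageLatency", 12.5d);   // Double
client.add("runtime.stats", "acceptingTraffic", true);  // Boolean
client.add("runtime.stats", "region", "us-east-1");     // String

// Replacing a value later informs every node via informAttributeReplaced().
client.replace("runtime.stats", "requestCount", 1024L);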

Any object with a reference to the AttributesProtocol or AttributesClient (a non-Actor implementation of AttributesProtocol) may add new attributes, modify existing ones, and remove them. The same protocol provides the means to query for current attribute sets and attributes within specific sets.

final Collection<AttributeSet> allSets = client.all();
final Collection<Attribute> allSetAttributes = client.allOf("Set1");

Since the client is purposely not backed by an Actor, the query results are immediate. The contents of the Collection results reflect whatever the node currently has and will not include changes pending delivery from other nodes.
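
As a usage sketch, the queried collections can be iterated to inspect the local node's current view, for example from within one of the MyClusterApplication methods above. This assumes that AttributeSet and Attribute expose their name and value, as the example above does with attr.value.

// A sketch of inspecting the local node's current view of cluster-wide attributes.
// Assumes AttributeSet and Attribute expose name and value, as used above.
for (final AttributeSet set : client.all()) {
  for (final Attribute<?> attribute : client.allOf(set.name)) {
    logger().debug("APP: " + set.name + "." + attribute.name + " = " + attribute.value);
  }
}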

Starting the Cluster

You must start each node in the cluster. The cluster will not become fully operational until a quorum of nodes are started and capable of communicating across their operational socket channels.

The main() startup class within the vlingo-cluster JAR file is:

io.vlingo.cluster.NodeBootstrap

To start each node in the cluster, use the following commands. The first command may be used with Maven.

maven> mvn exec:java -Dexec.args=node1

The second command would be used directly with the Java VM (set the classpath as appropriate).

java> java -classpath path1:path2 io.vlingo.cluster.NodeBootstrap node1

The separator you should use between classpath entries is determined by your runtime environment. For Un*x flavored environments this is : as shown above. For Windows this is the ; separator, such as path1;path2.

These two example commands start only the node named node1. You would perform a startup for each of the nodes named and configured in the vlingo-cluster.properties file.

Start each node by providing the node's name, as explained above, as the parameter to the main() class NodeBootstrap.
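
If you prefer to launch a node from your own Java entry point instead of the command line, a minimal sketch is to delegate to the same NodeBootstrap main() described above, passing the node name as the argument. The class name Node1Launcher is hypothetical.

import io.vlingo.cluster.NodeBootstrap;

// A minimal sketch: starts the node named node1 by delegating to the same
// main() entry point used on the command line.
public class Node1Launcher {
  public static void main(final String[] args) throws Exception {
    NodeBootstrap.main(new String[] { "node1" });
  }
}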

Cluster Health, Quorum, and Leadership

Generally a cluster will be composed of multiple nodes in an odd number (not just one, but, for example, 3, 5, 21, 49, or 127). The reason for choosing an odd number of nodes is to make it possible to determine whether there is a quorum of nodes (totalNodes / 2 + 1) that can form a healthy cluster. Configuring an even number of nodes works, but the extra node does not improve the quorum determination when one node is lost, nor does it strengthen the cluster when all nodes are available. Configuring an even number of nodes can also lead to a serious problem, known as split-brain, with potentially disastrous consequences.
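
A one-method sketch makes the quorum arithmetic concrete (integer division, per the formula above):

// A sketch of the quorum arithmetic described above (integer division).
static boolean hasQuorum(final int totalNodes, final int liveNodes) {
  return liveNodes >= (totalNodes / 2 + 1);
}

// hasQuorum(3, 2) == true  : a 3-node cluster tolerates the loss of one node
// hasQuorum(4, 3) == true  : a 4-node cluster also needs 3 live nodes, so the
//                            extra node adds no additional failure tolerance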

When a quorum of nodes is available and the individual nodes are communicating with one another, a leader is elected. The leader is responsible for making certain decisions on behalf of the cluster, and also announces newly joined nodes and those that have left the cluster.

The vlingo/cluster works in full-duplex mode, with two kinds of cluster node communication channels. There are the operational channels, which the cluster nodes use to maintain the health of the cluster. There are also application channels, which the services/applications use to pass messages between nodes. Using two different channel types allows for tuning each type in different ways. For example, application channels may be assigned to a faster network than the operational channels. It also opens the possibility for either channel type to use TCP or UDP independently of the other channel type.

Consensus Protocol and Algorithm

The cluster consensus protocol is the one known as the Bully Algorithm. Although the name is unpleasant, the protocol is simple but powerful. This consensus protocol chooses a leader by determining the node with the "greatest id value." The identity may be numeric or alphanumeric. If numeric, the "greatest node id" is determined by numeric comparison. If alphanumeric, the "greatest node id" is determined lexicographically.

Consider these node ids: Node1, Node2, Node3. Of these three node ids, Node3 is lexicographically the greatest, Node2 the next greatest, and Node1 the least.

Any node that senses that the previous leader has been lost (left the cluster for any reason) messages the other known "nodes with greater ids," asking for an election to take place. Any "nodes with greater ids" tell the "lesser id node" that it will not be the leader. Finally, when there are no nodes of "greater id" than a given node--known because it receives no "you aren't the leader" messages--that "greatest id node" declares itself leader. In essence, the "node of the greatest id" bullies its way into the leadership position, asserting that it is currently the greatest.
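
The following is an illustrative sketch of that decision, not the vlingo/cluster implementation; node ids are represented here as plain integers:

import java.util.Collection;

// An illustrative sketch of the Bully Algorithm decision described above.
final class BullyElectionSketch {
  // Called when this node senses that the leader has been lost.
  static boolean shouldDeclareSelfLeader(final int localId, final Collection<Integer> reachableIds) {
    for (final int otherId : reachableIds) {
      if (otherId > localId) {
        // A node with a greater id will answer "you aren't the leader", so defer.
        return false;
      }
    }
    // No greater id answered: this node bullies its way into leadership.
    return true;
  }
}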

Any latent message received from a "node of even greater id" than the currently declared leader will not take leadership away from it, because it is a healthy leader node. In other words, having the greatest node id value of all does not guarantee that the node will become cluster leader. Sometimes the node that is voted leader is simply the one that responded most rapidly with a node id greater than all others it could see once a quorum of nodes was formed.

It's possible that our consensus protocol will be augmented in the future with the Gossip Protocol to support very large clusters. We also have the option to swap out the Bully Algorithm for another, or to provide multiple consensus strategies.

Preventing Split-Brain Conditions

A split-brain cluster occurs when a single cluster has two or more leaders. The vlingo/cluster prevents this from occurring by means of its cluster quorum, voting policies, and algorithm. Given that a cluster must have a full complement of nodes, or at least a quorum of nodes, how is a split-brain prevented?

Consider a three-node cluster, composed of Node1, Node2, Node3. Per the Bully Algorithm, the greatest value node, Node3, takes leadership. Rules: Three nodes is a full healthy cluster; two nodes is a quorum and still healthy; one node alone is not a quorum nor healthy. Now consider the following scenario.

  1. Node3 is partitioned by the network, and cannot see Node1 or Node2

  2. Node1 and Node2 remain and can see each other; Node2, as the greatest value node, is made leader

  3. When the partition that hid Node3 is healed between it and Node1, but not with Node2, Node3 will see itself as the greatest value node and vote itself into leadership, which is broadcast to Node1 (if Node1 were to accept Node3 as leader, a split-brain would occur, because Node2 would be the leader of the Node2+Node1 cluster and Node3 the leader of the Node3+Node1 cluster)

  4. The vote for Node3 leadership is received by Node1, but Node1 is already part of a healthy cluster with Node2 as leader

  5. Node1 sends a message to Node3 indicating that it rejects Node3 as leader and that Node2 is already leader

  6. Although Node3 cannot yet see Node2, it accepts that it is not currently leader; because it cannot yet see Node2, it will continually retry leadership voting, but as long as Node1 sees Node2 as leader, Node1 repeatedly responds to Node3 with the rejection and the current leadership

  7. Finally, Node3 will see Node2, and Node2 will inform Node3 that it is the leader; Node3 will accept Node2 as leader and follow

In the above scenario consider one changed condition.

  1. Node2 is downed before Node3 sees Node2 as leader

  2. Node1 remains alive, but its cluster with Node2 is lost

  3. When next Node3 sends a leadership vote that is received by Node1, Node1 accepts Node3 as leader

  4. When Node2 is available again, it will be informed that Node3 is the leader; should Node2 see only Node1 and vote for itself to lead, Node1 will reject Node2 as leader and inform it that Node3 is leading

The scenario just stated has the same eventual result as the one previous to it, but with Node2 in the role that Node3 played there.