Cluster

Scaling your services with resilience using XOOM Cluster.

Resiliency and Scale

The XOOM Cluster is a key component that sits on top of XOOM Actors to support the development of scalable and fault-tolerant tools and applications. Additional tools that build out the VLINGO XOOM platform will almost always be constructed on top of XOOM Cluster. You should also implement and deploy your services/applications in clusters.

Consider the following documentation to be specifically the how-to guide for XOOM Cluster: how to configure it, how to run it, how it's designed, and how it works. For practical uses, including an actor-based compute grid, service/application partitioning and scale, and cluster-wide data caching, see XOOM Lattice.

Besides scalable fault tolerance, XOOM Cluster also provides cluster-wide, synchronizing attributes of name-value pairs. This enables the cluster to share live and mutating operational state among all nodes. Likewise, application-level services can use cluster-wide, synchronizing attributes to share application/service values across all nodes.

Referring to the cluster diagram, if one of the three nodes is lost, the cluster still maintains a quorum and remains healthy. However, if two nodes are lost and only one node remains running, the quorum is lost and the cluster is considered unhealthy. In that case the remaining node enters an idle state and waits for one or more of the other nodes to return to active operation. When that occurs, the cluster again constitutes a quorum and returns to a healthy state.

Although many service/application clusters require only three nodes for optimal use, clusters can support far more than three nodes. Yet, because of the performance and efficiency of our platform, you may rarely need many nodes. Even a 9-node, 21-node, or 49-node cluster may be considered quite large by our efficiency standards.
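The health rule described above reduces to a simple comparison between live nodes and a quorum. The sketch below assumes a majority quorum (more than half of the nodes), which matches the three-node example; the class and method names are hypothetical, and XOOM Cluster takes the actual threshold from the cluster.nodes.quorum property rather than computing it.

```java
import java.util.List;

/**
 * Sketch of majority-quorum arithmetic. Assumes the quorum is a strict
 * majority of the nodes; XOOM Cluster reads its threshold from
 * cluster.nodes.quorum instead of computing it.
 */
public class QuorumSketch {

  // Smallest number of live nodes that is a strict majority of totalNodes.
  static int majorityQuorum(final int totalNodes) {
    return totalNodes / 2 + 1;
  }

  // How many nodes can be lost while a majority quorum is still met.
  static int tolerableFailures(final int totalNodes) {
    return totalNodes - majorityQuorum(totalNodes);
  }

  // The cluster is healthy while at least `quorum` nodes are live.
  static boolean isHealthy(final int liveNodes, final int quorum) {
    return liveNodes >= quorum;
  }

  public static void main(final String[] args) {
    final int total = 3;
    final int quorum = majorityQuorum(total);
    for (final int lost : List.of(0, 1, 2)) {
      final int live = total - lost;
      System.out.println("lost=" + lost + " live=" + live + " healthy=" + isHealthy(live, quorum));
    }
    for (final int size : List.of(9, 21, 49)) {
      System.out.println(size + " nodes tolerate " + tolerableFailures(size) + " lost nodes");
    }
  }
}
```

For the three-node case it reports a healthy cluster after one node is lost and an unhealthy one after two are lost. It also suggests why odd sizes such as 9, 21, and 49 are natural choices: under a majority quorum they tolerate 4, 10, and 24 lost nodes respectively.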

Using the XOOM Cluster

You must first configure the cluster for your intended use. Currently this involves creating a Java properties file named xoom-cluster.properties.
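Because xoom-cluster.properties is a standard Java properties file, its contents can be inspected with java.util.Properties. The following sketch parses sample content from a string to stay self-contained; how and from where XOOM Cluster itself loads the file is not shown here. The keys are the property names documented below and the numeric values are the ones given in the tables, while the quorum and seed values (including the host:port form used for cluster.seeds) are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ClusterPropertiesSketch {

  // Sample content. Keys are the documented property names; the quorum and
  // seeds values are hypothetical examples.
  static final String SAMPLE = String.join("\n",
      "cluster.app.buffer.size = 10240",
      "cluster.app.incoming.probe.interval = 10",
      "cluster.op.outgoing.pooled.buffers = 20",
      "cluster.app.outgoing.pooled.buffers = 50",
      "cluster.msg.charset = UTF-8",
      "cluster.app.class = com.myapplication.cluster.MyClusterApp",
      "cluster.app.stage = myapplication.stage",
      "cluster.attributes.redistribution.interval = 1000",
      "cluster.attributes.redistribution.retries = 20",
      "cluster.startup.period = 5000",
      "cluster.nodes.quorum = 2",
      "cluster.seeds = localhost:17171");

  // Parses properties text using the standard java.util.Properties format.
  static Properties load(final String content) {
    final Properties properties = new Properties();
    try {
      properties.load(new StringReader(content));
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
    return properties;
  }

  // Reads an integer property, falling back to a caller-supplied default.
  static int intValue(final Properties properties, final String key, final int defaultValue) {
    final String value = properties.getProperty(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }

  public static void main(final String[] args) {
    final Properties properties = load(SAMPLE);
    System.out.println("quorum = " + intValue(properties, "cluster.nodes.quorum", 2));
    System.out.println("startup period ms = " + intValue(properties, "cluster.startup.period", 5000));
    System.out.println("charset = " + properties.getProperty("cluster.msg.charset", "UTF-8"));
  }
}
```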

The different properties are explained next.

Cluster Limits, Timing, and Integration Configurations

cluster.op.buffer.size

The maximum size of a single operational message. Operational messages are actually tiny, with the exception of DIR. Assuming short host names, a size of 4096 would support approximately 90 to 99 nodes with DIR.

cluster.app.buffer.size

The maximum size of a single cluster client (tool, application, service) message. You may be able to tune this to be much smaller depending on the size of application/service messages.

Value: 10240

cluster.app.incoming.probe.interval

The interval (in milliseconds) within which the application inbound stream will be probed for available messages.

Value: 10

cluster.op.outgoing.pooled.buffers

The number of pooled buffers for outgoing asynchronous operational messages.

Value: 20

cluster.app.outgoing.pooled.buffers

The number of pooled buffers for outgoing asynchronous application messages.

Value: 50

cluster.msg.charset

The Java character set used for cluster messages.

Value: UTF-8

cluster.app.class

The fully qualified name of the class that serves as the client application/service instance used to integrate with the cluster. This class must implement the interface ClusterApplication, or extend the class ClusterApplicationAdapter. For example, com.myapplication.cluster.MyClusterApp could be the name of the class.

cluster.app.stage

The name of the Stage within which the client application/service instance is created. For example, myapplication.stage may be the name of the Stage within which the com.myapplication.cluster.MyClusterApp instance is created.

cluster.attributes.redistribution.interval

The interval in milliseconds within which the new and changed application/service level attributes maintained by the cluster are distributed and redistributed.

Value: 1000

cluster.attributes.redistribution.retries

The number of retries for redistributing unconfirmed attribute changes.

Value: 20

cluster.startup.period

The interval in milliseconds within which each newly joined node takes the opportunity to discover the other nodes in the cluster. During this period, communication with the other nodes is suspended.

Value: 5000

cluster.nodes.quorum

Under elastic scale, this is the least number of live nodes expected for a healthy cluster quorum.

cluster.seeds

Under elastic scale, this is the list of all seed nodes within the respective cluster. A seed is a regular node with the seed functionality enabled. This list has the following format, where each port is the seed node's opPort: <seed1Host>:<seed1Port>,<seed2Host>:<seed2Port>.
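A seed list can be split into host and port pairs before use. This sketch assumes the host:opPort form that the isSeed node property describes; the Seed record and parseSeeds method are hypothetical helpers, not part of XOOM Cluster, and records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

public class SeedListSketch {

  // Hypothetical value type for one seed entry.
  record Seed(String host, int port) {}

  // Splits "host:port,host:port" into Seed values, rejecting malformed entries.
  static List<Seed> parseSeeds(final String value) {
    final List<Seed> seeds = new ArrayList<>();
    for (final String entry : value.split(",")) {
      final String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      final int colon = trimmed.lastIndexOf(':');
      if (colon <= 0 || colon == trimmed.length() - 1) {
        throw new IllegalArgumentException("Expected host:port but was: " + trimmed);
      }
      // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for a bad port.
      seeds.add(new Seed(trimmed.substring(0, colon), Integer.parseInt(trimmed.substring(colon + 1))));
    }
    return seeds;
  }

  public static void main(final String[] args) {
    for (final Seed seed : parseSeeds("localhost:17171,localhost:17181")) {
      System.out.println(seed.host() + " -> " + seed.port());
    }
  }
}
```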

Cluster Node Configurations

The properties (configuration) of each node are specified at startup as an argument in the format id:name:isSeed:host:opPort:appPort. The specific properties are as follows.

Node Property | Description

id

The unique numeric identity of the node, such as 1.

name

The unique name of the node that must match one of the names in the cluster.nodes property.

isSeed

Indicates whether this node is a seed node. A seed node is one that will, practically speaking, always be available to serve other nodes. Values are true or false. Each seed node must also be declared in the cluster.seeds list using its host:opPort.

host

The DNS name of the network host on which the node runs.

opPort

The port number that is assigned to the cluster node's operational socket channel.

appPort

The port number that is assigned to the cluster node's incoming application/service socket channel.
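The single startup argument maps directly onto these six properties. The following hypothetical parser splits an argument in the documented id:name:isSeed:host:opPort:appPort order and derives the host:opPort address under which a seed node would be listed in cluster.seeds. NodeArgs and parse are illustrative names, not XOOM Cluster APIs, and the record requires Java 16 or later.

```java
public class NodeArgumentSketch {

  // Hypothetical holder for the documented node properties, in argument order.
  record NodeArgs(int id, String name, boolean isSeed, String host, int opPort, int appPort) {
    // The host:opPort form under which a seed node is listed in cluster.seeds.
    String seedAddress() {
      return host + ":" + opPort;
    }
  }

  // Parses id:name:isSeed:host:opPort:appPort, rejecting malformed input.
  static NodeArgs parse(final String argument) {
    final String[] parts = argument.split(":");
    if (parts.length != 6) {
      throw new IllegalArgumentException("Expected id:name:isSeed:host:opPort:appPort but was: " + argument);
    }
    // Boolean.parseBoolean returns false for any text other than "true", so check explicitly.
    if (!parts[2].equals("true") && !parts[2].equals("false")) {
      throw new IllegalArgumentException("isSeed must be true or false but was: " + parts[2]);
    }
    return new NodeArgs(
        Integer.parseInt(parts[0]),
        parts[1],
        Boolean.parseBoolean(parts[2]),
        parts[3],
        Integer.parseInt(parts[4]),
        Integer.parseInt(parts[5]));
  }

  public static void main(final String[] args) {
    final NodeArgs node = parse("1:node1:true:localhost:17171:17172");
    System.out.println(node);
    if (node.isSeed()) {
      System.out.println("list in cluster.seeds as " + node.seedAddress());
    }
  }
}
```

The explicit check on isSeed is a deliberate choice: without it, an argument whose fields are out of order could be accepted as a non-seed node with the wrong host.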

All of the above configurations should be tuned and assigned specifically for your cluster's operations. You should run more nodes than cluster.nodes.quorum so that the cluster remains healthy when a node is lost. Seed functionality facilitates the discovery of the nodes within the cluster and is mainly used at startup. Following startup, the seed functionality is no longer needed and seed nodes simply act as regular nodes. XOOM Cluster is built on a peer-to-peer, highly scalable cluster membership protocol, which manages membership after startup.

Cluster Application

Each cluster node must have a configured cluster application Stage and Class. These are defined above as cluster.app.stage and cluster.app.class; see the respective integration configurations above for details. Any class serving as a cluster application must implement the interface ClusterApplication and be an Actor. Both of these requirements are met by extending the ClusterApplicationAdapter abstract base class. This adapter enables you to override only the integration methods of interest.

You likely will not need to provide your own ClusterApplication implementation, because our recommendation is to use the XOOM Lattice Grid, which is based on XOOM Cluster. See the note following the code example below.

The following MyClusterApplication example implements the supported integration interface and demonstrates each available integration method.

public class MyClusterApplication extends ClusterApplicationAdapter {
  private AttributesProtocol client;
  private final Node localNode;
  
  public MyClusterApplication(final Node localNode) {
    this.localNode = localNode;
  }

  @Override
  public void start() {
    logger().debug("APP: ClusterApplication started on node: " + localNode);
  }
  
  @Override
  public void stop() {
    logger().debug("APP: Local node is going to be stopped...");
  }

  @Override
  public void handleApplicationMessage(final RawMessage message, final ApplicationOutboundStream responder) {
    logger().debug("APP: Received application message: " + message.asTextMessage());
  }

  @Override
  public void informAllLiveNodes(final Collection<Node> liveNodes, final boolean isHealthyCluster) {
    for (final Node id : liveNodes) {
      logger().debug("APP: Live node confirmed: " + id);
    }
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informNodeIsHealthy(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: Node reported healthy: " + nodeId);
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informNodeJoinedCluster(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: " + nodeId + " joined cluster");
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informNodeLeftCluster(final Id nodeId, final boolean isHealthyCluster) {
    logger().debug("APP: " + nodeId + " left cluster");
    printHealthy(isHealthyCluster);
  }

  @Override
  public void informAttributesClient(final AttributesProtocol client) {
    logger().debug("APP: Attributes Client received.");
    this.client = client;
    if (localNode.id().value() == 1) {
      client.add("fake.set", "fake.attribute.name1", "value1");
      client.add("fake.set", "fake.attribute.name2", "value2");
    }
  }

  @Override
  public void informAttributeSetCreated(final String attributeSetName) {
    logger().debug("APP: Attributes Set Created: " + attributeSetName);
  }

  @Override
  public void informAttributeAdded(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Added: " + attributeName + " Value: " + attr.value);
    if (localNode.id().value() == 1) {
      client.replace("fake.set", "fake.attribute.name1", "value-replaced-2");
      client.replace("fake.set", "fake.attribute.name2", "value-replaced-20");
    }
  }

  @Override
  public void informAttributeRemoved(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Removed: " + attributeName + " Attribute: " + attr);
  }

  @Override
  public void informAttributeSetRemoved(final String attributeSetName) {
    logger().debug("APP: Attributes Set Removed: " + attributeSetName);
  }

  @Override
  public void informAttributeReplaced(final String attributeSetName, final String attributeName) {
    final Attribute<String> attr = client.attribute(attributeSetName, attributeName);
    logger().debug("APP: Attribute Set " + attributeSetName + " Attribute Replaced: " + attributeName + " Value: " + attr.value);
    if (localNode.id().value() == 1) {
      client.remove("fake.set", "fake.attribute.name1");
    }
  }

  private void printHealthy(final boolean isHealthyCluster) {
    if (isHealthyCluster) {
      logger().debug("APP: Cluster is healthy");
    } else {
      logger().debug("APP: Cluster is NOT healthy");
    }
  }
}

You would implement your own cluster application, overriding the methods that interest you. Note that when using the XOOM Lattice Grid, the ClusterApplication is provided for you by the Grid. Thus, only low-level cluster nodes are required to provide a ClusterApplication.

Cluster-Wide Attributes

Note that the above MyClusterApplication has a property named client, which is an AttributesProtocol. The AttributesProtocol is the gateway to establishing the use of cluster-wide attributes. This client has the means to add new attributes, modify existing ones, and remove them.

public class MyClusterApplication extends ClusterApplicationAdapter {
  private AttributesProtocol client;
  ...
    client.add("Set1", "Attribute1", "value1");
  ...
    client.replace("Set1", "Attribute1", "value2");
  ...
    client.remove("Set1", "Attribute1");
}

Here "Set1" is the name of an attribute set; every attribute must be added within a named set. "Attribute1" is the name of an attribute within "Set1". "value1" is the initial value of "Attribute1", and "value2" is a different value that is later assigned to it. As these operations take place, the attribute sets are replicated across the nodes of the cluster. Revisiting the MyClusterApplication lifecycle messages in the code above shows how each node in the cluster is informed of newly added, replaced, and removed attributes.
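To make the set, attribute, and value structure concrete, here is a hypothetical single-node model of the add, replace, and remove operations. It is not the AttributesProtocol: it performs no replication and sends no notifications, and its choice to ignore an add for an attribute that already exists is an assumption of this sketch rather than documented XOOM behavior.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical single-node model of named attribute sets. Not the XOOM
 * AttributesProtocol: no replication, no notifications.
 */
public class AttributeSetsSketch {
  private final Map<String, Map<String, Object>> sets = new LinkedHashMap<>();

  // Adds an attribute to the named set, creating the set on first use.
  // In this sketch, adding an existing attribute is a no-op that returns false.
  public boolean add(final String setName, final String name, final Object value) {
    Objects.requireNonNull(value, "value");
    return sets.computeIfAbsent(setName, key -> new LinkedHashMap<>()).putIfAbsent(name, value) == null;
  }

  // Replaces the value of an existing attribute; returns false if it is absent.
  public boolean replace(final String setName, final String name, final Object value) {
    Objects.requireNonNull(value, "value");
    final Map<String, Object> set = sets.get(setName);
    return set != null && set.replace(name, value) != null;
  }

  // Removes an attribute; returns false if it is absent.
  public boolean remove(final String setName, final String name) {
    final Map<String, Object> set = sets.get(setName);
    return set != null && set.remove(name) != null;
  }

  // Current value, or null if the set or attribute does not exist.
  public Object valueOf(final String setName, final String name) {
    final Map<String, Object> set = sets.get(setName);
    return set == null ? null : set.get(name);
  }

  // Copy of the attribute names in a set at the time of the call.
  public List<String> namesIn(final String setName) {
    final Map<String, Object> set = sets.get(setName);
    return set == null ? List.of() : List.copyOf(set.keySet());
  }

  public static void main(final String[] args) {
    final AttributeSetsSketch attributes = new AttributeSetsSketch();
    attributes.add("Set1", "Attribute1", "value1");
    System.out.println(attributes.valueOf("Set1", "Attribute1"));
    attributes.replace("Set1", "Attribute1", "value2");
    System.out.println(attributes.valueOf("Set1", "Attribute1"));
    attributes.remove("Set1", "Attribute1");
    System.out.println(attributes.namesIn("Set1"));
  }
}
```

Running it prints value1, then value2, and finally [] once the attribute has been removed.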

An attribute value may be of various types. As expected, these include Byte, Short, Integer, Long, Character, Float, Double, Boolean, and String. Currently there is no support for more complex objects.
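A caller could check a value against this list before adding it. The sketch below is a hypothetical pre-check using exactly the types listed above; because each of those wrapper classes and String is final, comparing the runtime class is sufficient. Whether XOOM Cluster performs a similar check itself is not covered here.

```java
import java.util.List;
import java.util.Set;

public class AttributeTypeSketch {

  // The value types listed as supported for cluster-wide attributes.
  private static final Set<Class<?>> SUPPORTED = Set.of(
      Byte.class, Short.class, Integer.class, Long.class, Character.class,
      Float.class, Double.class, Boolean.class, String.class);

  // True if the value is non-null and of one of the supported types.
  static boolean isSupported(final Object value) {
    return value != null && SUPPORTED.contains(value.getClass());
  }

  public static void main(final String[] args) {
    System.out.println(isSupported(42));          // autoboxed to Integer
    System.out.println(isSupported("value1"));    // String
    System.out.println(isSupported(List.of(1)));  // a complex object
  }
}
```

isSupported(42) returns true because the literal is autoboxed to Integer, while a List is rejected.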

Any object with a reference to the AttributesProtocol or AttributesClient (a non-Actor implementation of AttributesProtocol) may add new attributes, modify existing ones, and remove them. The same protocol provides the means to query for current attribute sets and attributes within specific sets.

final Collection<AttributeSet> allSets = client.all();
final Collection<Attribute> allSetAttributes = client.allOf("Set1");

Because the client is purposely not backed by an Actor, the query results are immediate. The contents of the Collection results reflect whatever the node currently has and do not include changes still pending delivery from other nodes.

Starting the Cluster

You must start each node in the cluster. The cluster will not become fully operational until a quorum of nodes are started and capable of communicating across their operational socket channels.

The main() startup class within the xoom-cluster JAR file is:

io.vlingo.xoom.cluster.NodeBootstrap

To start each node in the cluster, use the following commands. The first command may be used with Maven.

$ mvn exec:java -Dexec.args=1:node1:false:localhost:17171:17172

This second command would be used directly with the Java VM (setting the classpath as appropriate).

$ java -classpath path1:path2 io.vlingo.xoom.cluster.NodeBootstrap 1:node1:false:localhost:17171:17172

The separator you should use between classpath entries is determined by your runtime environment. For Un*x-flavored environments this is the : separator, as shown above. For Windows this is the ; separator, such as path1;path2.

These two example commands start only the node named node1. You would perform a startup for each of the nodes named and configured in the Java xoom-cluster.properties.

Start each node by providing the node's properties, in the format explained above, as the argument to the main() class NodeBootstrap.
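The classpath separator and the bootstrap command can also be assembled from Java, which avoids hard-coding : or ;. The sketch below uses File.pathSeparator, which holds the current platform's separator; path1 and path2 are placeholders as in the commands above, and BootstrapCommandSketch is a hypothetical helper.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class BootstrapCommandSketch {

  // Joins classpath entries with the platform separator (":" on Un*x, ";" on Windows).
  static String classpath(final List<String> entries) {
    return String.join(File.pathSeparator, entries);
  }

  // Builds the java command that starts one node with NodeBootstrap.
  static List<String> command(final List<String> classpathEntries, final String nodeArgument) {
    final List<String> command = new ArrayList<>();
    command.add("java");
    command.add("-classpath");
    command.add(classpath(classpathEntries));
    command.add("io.vlingo.xoom.cluster.NodeBootstrap");
    command.add(nodeArgument);
    return command;
  }

  public static void main(final String[] args) {
    // path1 and path2 are placeholders; pass the node argument as args[0].
    final String nodeArgument = args.length > 0 ? args[0] : "<id:name:isSeed:host:opPort:appPort>";
    System.out.println(String.join(" ", command(List.of("path1", "path2"), nodeArgument)));
  }
}
```

The resulting list could be handed to new ProcessBuilder(command).inheritIO().start() to launch a node as a child process, for example in a local test harness.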

Cluster Health and Membership

A cluster is composed of a dynamic number of nodes. Nodes can elastically join or leave the cluster. The minimum number of nodes for a healthy cluster is cluster.nodes.quorum. Cluster membership is implemented based on the SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol.
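For intuition about how SWIM detects failed members, the following is a simplified, hypothetical sketch of the textbook SWIM probing round, not XOOM Cluster's implementation. Each period, a member pings a random peer; if no acknowledgment arrives, it asks up to k other peers to ping the target on its behalf (ping-req), and it marks the target suspect only if none of them succeed. The network is simulated with a predicate, and the suspicion timeout, incarnation numbers, and gossip dissemination of full SWIM are omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiPredicate;

/** Simplified SWIM-style failure detection from one member's point of view. */
public class SwimProbeSketch {
  enum Status { ALIVE, SUSPECT }

  private final String self;
  private final Map<String, Status> peers = new LinkedHashMap<>();
  private final BiPredicate<String, String> acks; // simulated network: does `to` ack a ping from `from`?
  private final int indirectProbes;               // k helpers used for ping-req
  private final Random random;

  SwimProbeSketch(final String self, final List<String> members,
                  final BiPredicate<String, String> acks, final int indirectProbes, final long seed) {
    this.self = self;
    this.acks = acks;
    this.indirectProbes = indirectProbes;
    this.random = new Random(seed);
    for (final String member : members) {
      if (!member.equals(self)) {
        peers.put(member, Status.ALIVE);
      }
    }
  }

  // One protocol period: probe a randomly chosen peer and return its name.
  String probeRandomPeer() {
    final List<String> candidates = new ArrayList<>(peers.keySet());
    final String target = candidates.get(random.nextInt(candidates.size()));
    probe(target);
    return target;
  }

  // Direct ping first; on failure, ask up to k other peers to ping the target (ping-req).
  void probe(final String target) {
    if (acks.test(self, target)) {
      peers.put(target, Status.ALIVE);
      return;
    }
    final List<String> helpers = new ArrayList<>(peers.keySet());
    helpers.remove(target);
    Collections.shuffle(helpers, random);
    for (final String helper : helpers.subList(0, Math.min(indirectProbes, helpers.size()))) {
      if (acks.test(self, helper) && acks.test(helper, target)) {
        peers.put(target, Status.ALIVE);
        return;
      }
    }
    peers.put(target, Status.SUSPECT);
  }

  Status statusOf(final String member) {
    return peers.get(member);
  }

  public static void main(final String[] args) {
    // node4 is down (it acks no one); the node1 -> node2 link is lossy.
    final BiPredicate<String, String> network = (from, to) ->
        !to.equals("node4") && !(from.equals("node1") && to.equals("node2"));
    final SwimProbeSketch node1 = new SwimProbeSketch("node1",
        List.of("node1", "node2", "node3", "node4"), network, 2, 42L);
    node1.probe("node2");
    node1.probe("node4");
    System.out.println("node2: " + node1.statusOf("node2")); // ALIVE, confirmed through node3
    System.out.println("node4: " + node1.statusOf("node4")); // SUSPECT
  }
}
```

Indirect probing is what keeps a single lossy link, such as node1 to node2 here, from marking a healthy node as failed, while node4, which no member can reach, becomes suspect.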
