Exchange

Using the VLINGO XOOM message and event exchange across Bounded Contexts.

This XOOM LATTICE component provides an Exchange abstraction over Publish-Subscribe messaging for exchanges or topics. This is a means to publish messages to exchanges/topics inside or outside your current application/service, and to subscribe to such messages. The Exchange provides translators from internal to external message types, and from external to internal message types.

Understanding the Exchange

There are a few key abstractions. The following subsections describe each.

Exchange

The Exchange itself is the primary protocol. It defines a message exchange, such as a queue or topic, through which any number of related ExchangeSender, ExchangeReceiver, and ExchangeAdapter components are registered, and messages are sent. The protocol is defined as follows.

public interface Exchange {
  void close();
  <T> T channel();
  <T> T connection();
  String name();
  <L,E,EX> Exchange register(final Covey<L,E,EX> covey);
  <L> void send(final L local);
}

The defined behaviors are described next.

void close();

Closes the Exchange and any underlying resources. Given that the Exchange is backed by some middleware messaging mechanism, the connections and other resources allocated to that mechanism are closed as well.

<T> T channel();

Answers the underlying channel, which is implementation dependent. The T type is a parameter that determines the dynamic cast to the runtime type.

<T> T connection();

Answers the underlying connection, which is implementation dependent. The T type is a parameter that determines the dynamic cast to the runtime type.
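The effect of the T parameter can be illustrated with a stand-in type. The following is a minimal sketch, not the XOOM LATTICE implementation: ChannelCastSketch is a hypothetical class, and a StringBuilder plays the role of the middleware channel. The caller's assignment target selects T, so a mismatched target fails at runtime rather than at compile time.

```java
// Hypothetical stand-in for an Exchange implementation; not part of XOOM LATTICE.
class ChannelCastSketch {
  private final Object channel;

  ChannelCastSketch(final Object channel) {
    this.channel = channel;
  }

  // Same shape as Exchange#channel(): the caller's target type selects T.
  @SuppressWarnings("unchecked")
  <T> T channel() {
    return (T) channel; // unchecked here; the JVM checks at the caller's assignment
  }

  public static void main(final String[] args) {
    final ChannelCastSketch exchange = new ChannelCastSketch(new StringBuilder("channel-1"));

    // T is inferred as StringBuilder, which matches the runtime type.
    final StringBuilder channel = exchange.channel();
    System.out.println(channel);

    try {
      // T is inferred as Integer; the mismatch surfaces only at runtime.
      final Integer wrong = exchange.channel();
      System.out.println(wrong);
    } catch (ClassCastException e) {
      System.out.println("not an Integer");
    }
  }
}
```

In practice this means the caller must know which concrete channel or connection type the chosen Exchange implementation provides.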

String name();

Answers the name of this Exchange as a String.

<L,E,EX> Exchange register(final Covey<L,E,EX> covey);

Registers a Covey with the Exchange. A Covey is a set of Exchange components used to send, receive, and translate messages. The Covey is explained in a subsection below.

<L> void send(final L local);

Sends the local message to the Exchange after first adapting it to an Exchange-compatible message. Any given message has three forms:

  1. Local—The type used by the local system to represent the message sent or received

  2. External—The type the message will have when it is received on the external service

  3. Exchange—The type used to transport the message on the Exchange from sender to receiver(s). The Exchange type holds an instance of the External type.
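The three forms can be pictured with a small sketch. Every type name in it (PriceChanged, PriceChangedData, Envelope) is hypothetical and chosen only for illustration, and the "|" separated payload is an assumption standing in for whatever serialization a real adapter would use.

```java
import java.nio.charset.StandardCharsets;

// All type names here are hypothetical; they only illustrate the three forms.
class MessageFormsSketch {
  // Local form: what the local system works with.
  static final class PriceChanged {
    final String sku;
    final int cents;
    PriceChanged(final String sku, final int cents) {
      this.sku = sku;
      this.cents = cents;
    }
  }

  // External form: what the other service expects to receive.
  static final class PriceChangedData {
    final String productCode;
    final String amount;
    PriceChangedData(final String productCode, final String amount) {
      this.productCode = productCode;
      this.amount = amount;
    }
  }

  // Exchange form: the transport envelope that holds the external form.
  static final class Envelope {
    final String type;
    final byte[] payload;
    Envelope(final String type, final byte[] payload) {
      this.type = type;
      this.payload = payload;
    }
  }

  static PriceChangedData toExternal(final PriceChanged local) {
    return new PriceChangedData(local.sku, Integer.toString(local.cents));
  }

  static Envelope toExchange(final PriceChangedData external) {
    final String text = external.productCode + "|" + external.amount;
    return new Envelope("PriceChangedData", text.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(final String[] args) {
    final Envelope envelope = toExchange(toExternal(new PriceChanged("sku-1", 1999)));
    // prints "PriceChangedData sku-1|1999"
    System.out.println(envelope.type + " " + new String(envelope.payload, StandardCharsets.UTF_8));
  }
}
```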

ConnectionSettings

The ConnectionSettings is a configuration for making a connection to the underlying messaging mechanism, including the host name, port, virtual host, username, and password.

public class ConnectionSettings {
  public final String hostName;
  public final String password;
  public final int port;
  public final String username;
  public final String virtualHost;
  ...
}

There are also a number of factory methods for creating the value state.

Covey

A covey by definition is a group or set. We use a Covey as a set of Exchange components used for translating, sending, and receiving messages. A given Exchange will have one Covey for each local message type, either as a sender or a receiver of that message type.

public class Covey<L,E,EX> {
  public ExchangeAdapter<L,E,EX> adapter;
  public ExchangeReceiver<L> receiver;
  public ExchangeSender<EX> sender;
  public final Class<L> localClass;
  public final Class<E> externalClass;
  public final Class<EX> exchangeClass;
  ...
}

The adapter is an ExchangeAdapter that is capable of adapting messages to their three forms, Local, External, and Exchange.

The receiver is an ExchangeReceiver of messages of local type, meaning that the ExchangeAdapter has already adapted the incoming message to the local type.

The sender is an ExchangeSender of messages of exchange type through the Exchange, meaning that the ExchangeAdapter has already adapted the local message to an outgoing type.

Each of the Class instances is the type of a message form: localClass, externalClass, exchangeClass.

Queue

A Queue is just a specialized Exchange. It is expected that a Queue is not a fanout type but one that is point-to-point. Being point-to-point does not prevent the receiver end from enlisting multiple competing consumers. The Queue protocol is identical to that of the Exchange.
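The difference from fanout can be shown without any middleware. The following plain-Java sketch is only an analogy, assuming an in-memory queue in place of the broker: several consumers compete for messages on one queue, and each message is handled by exactly one of them. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// A plain-Java illustration of competing consumers; no messaging middleware is involved.
class CompetingConsumersSketch {
  // Drains the shared queue with the given number of consumer threads and
  // answers "consumerN:message" entries for every message handled.
  static List<String> drain(final ConcurrentLinkedQueue<String> queue, final int consumers) {
    final List<String> handled = new CopyOnWriteArrayList<>();
    final Thread[] threads = new Thread[consumers];
    for (int i = 0; i < consumers; i++) {
      final String name = "consumer" + i;
      threads[i] = new Thread(() -> {
        String message;
        // poll() removes the head atomically, so no message is taken twice.
        while ((message = queue.poll()) != null) {
          handled.add(name + ":" + message);
        }
      });
      threads[i].start();
    }
    for (final Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while draining", e);
      }
    }
    return handled;
  }

  public static void main(final String[] args) {
    final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < 6; i++) {
      queue.add("m" + i);
    }
    // prints "6 messages, each handled once"
    System.out.println(drain(queue, 2).size() + " messages, each handled once");
  }
}
```

Which consumer handles which message is not deterministic; only the total and the absence of duplicates are guaranteed.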

ExchangeSender

A sender of messages to an Exchange or Queue.

public interface ExchangeSender<E> {
  void send(final E message);
}

The single protocol operation is as follows.

void send(final E message);

Sends the E exchange typed message through the underlying exchange or queue, meaning that the ExchangeAdapter has already adapted the local message to an outgoing type.

From this it may be apparent that the Exchange#send() does not actually send the message. Instead it dispatches the local message through the Forwarder for translation to the exchange type, and then the Forwarder dispatches the message to the ExchangeSender.
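A simplified sketch of this outbound path follows. It is not the actual Forwarder: Route and MiniForwarder are hypothetical names, the ExchangeSender interface is redeclared locally so the example compiles on its own, and the lookup by the local message's class is an assumption about how a route could be selected.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class OutboundPathSketch {
  // Local copy of the ExchangeSender shape described above.
  interface ExchangeSender<EX> {
    void send(EX message);
  }

  // Pairs the local-to-exchange translation with the sender for one local type.
  static final class Route<L, EX> {
    final Function<L, EX> toExchange;
    final ExchangeSender<EX> sender;
    Route(final Function<L, EX> toExchange, final ExchangeSender<EX> sender) {
      this.toExchange = toExchange;
      this.sender = sender;
    }
    void forward(final L local) {
      sender.send(toExchange.apply(local)); // translate first, then send
    }
  }

  // Hypothetical, reduced forwarder: one route per local message class.
  static final class MiniForwarder {
    private final Map<Class<?>, Route<?, ?>> routes = new HashMap<>();

    <L, EX> void register(final Class<L> localClass, final Route<L, EX> route) {
      routes.put(localClass, route);
    }

    @SuppressWarnings("unchecked")
    <L> void forwardToSender(final L local) {
      final Route<L, ?> route = (Route<L, ?>) routes.get(local.getClass());
      if (route == null) {
        throw new IllegalArgumentException("No route for " + local.getClass().getName());
      }
      route.forward(local);
    }
  }

  public static void main(final String[] args) {
    final List<byte[]> wire = new ArrayList<>(); // stands in for the middleware
    final MiniForwarder forwarder = new MiniForwarder();
    forwarder.register(
        String.class,
        new Route<String, byte[]>(text -> text.getBytes(StandardCharsets.UTF_8), wire::add));

    forwarder.forwardToSender("hello"); // what Exchange#send(local) would delegate to
    // prints "1 message sent, 5 bytes"
    System.out.println(wire.size() + " message sent, " + wire.get(0).length + " bytes");
  }
}
```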

Forwarder

The forwarder of all local and exchange messages, forwarding local messages through an ExchangeSender and exchange messages through an ExchangeReceiver. It is the Forwarder that holds the Covey instances on behalf of the Exchange or Queue, and thus manages translation from local to exchange types and from exchange to local types.

MessageParameters

A builder of message parameters, which are metadata attributes commonly associated with messages. Use the parameters that apply and ignore the rest. The chosen parameters are set using the fluent interface.

public class MessageParameters {
  public static enum DeliveryMode { Durable, Transient }
  public static enum Priority { High, Normal, Medium, Low, P0, P1, P2, P3, P4, P5, P6, P7, P8, P9 }

  private String applicationId;
  private String contentEncoding;
  private String contentType;
  private String correlationId;
  private String deliveryId;
  private DeliveryMode deliveryMode;
  private String exchangeName;
  private Map<String,Object> headers;
  private String messageId;
  private String other1;
  private String other2;
  private String other3;
  private Priority priority;
  private String queueName;
  private boolean redeliver;
  private String replyTo;
  private String returnAddress;
  private List<String> routing;
  private long timestamp;
  private long timeToLive;
  private String tag;
  private String typeCode;
  private String typeName;
  private String userId;
  ...
}

There are factory methods and builder methods, as well as public accessors for each parameter. Many of the metadata parameters are common to AMQP, but may also be useful for any type of messaging protocol.
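The fluent style can be sketched with a reduced stand-in. The Parameters class below is hypothetical and carries only three of the attributes; the bare() and deliveryMode() names follow the usage shown elsewhere in this document, while the argument-free accessors are an assumption made for this sketch.

```java
// A reduced, hypothetical stand-in for MessageParameters showing the
// fluent style only; the real class carries many more attributes.
class FluentParametersSketch {
  enum DeliveryMode { Durable, Transient }

  static final class Parameters {
    private DeliveryMode deliveryMode;
    private String messageId;
    private String typeName;

    static Parameters bare() {
      return new Parameters();
    }

    // Each builder method sets one attribute and answers this instance,
    // which allows calls to be chained.
    Parameters deliveryMode(final DeliveryMode deliveryMode) {
      this.deliveryMode = deliveryMode;
      return this;
    }

    Parameters messageId(final String messageId) {
      this.messageId = messageId;
      return this;
    }

    Parameters typeName(final String typeName) {
      this.typeName = typeName;
      return this;
    }

    // In this sketch, accessors share the attribute name and take no argument.
    DeliveryMode deliveryMode() { return deliveryMode; }
    String messageId() { return messageId; }
    String typeName() { return typeName; }
  }

  public static void main(final String[] args) {
    final Parameters parameters = Parameters.bare()
        .deliveryMode(DeliveryMode.Durable)
        .messageId("msg-1")
        .typeName("PriceChanged");
    // prints "Durable msg-1 PriceChanged"
    System.out.println(parameters.deliveryMode() + " " + parameters.messageId() + " " + parameters.typeName());
  }
}
```

Attributes that are never set simply remain null in this sketch, which matches the idea that unneeded parameters are ignored.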

ExchangeReceiver

A receiver of messages from an Exchange, which may be implemented for each unique message type. The message type received has already been mapped and adapted from the exchange-typed message, and this is in the form of a local message.

public interface ExchangeReceiver<L> {
  void receive(final L message);
}

The single protocol operation is as follows.

void receive(final L message);

The implementor receives the delivered L local typed message from the Exchange. The message is of local type L having already been mapped and adapted from the exchange-typed message.

ExchangeAdapter

Adapts local messages of type L to exchange messages of type EX that hold external type E. This may involve mapping, in which case the underlying implementation must arrange for an ExchangeMapper to be registered and used. Note that the L and E types may be different between ExchangeAdapter and ExchangeMapper.

public interface ExchangeAdapter<L,E,EX> {
  L fromExchange(final EX exchangeMessage);
  EX toExchange(final L localMessage);
  boolean supports(final Object exchangeMessage);
}

The parameters of L, E, and EX are the local form, the external form, and the exchange form types, respectively. The operations of this protocol work as follows.

L fromExchange(final EX exchangeMessage);

Answers the L typed local message from the exchangeMessage of type EX.

EX toExchange(final L localMessage);

Answers the EX typed exchange message from the localMessage of type L.

boolean supports(final Object exchangeMessage);

Answers whether or not the adapter supports the exchangeMessage. This is used by the Forwarder to find the Covey used for the type of exchangeMessage.
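How supports() might drive the selection on the inbound side is sketched below. This is a simplified illustration rather than the Forwarder's actual code: the adapter interface is reduced to the two operations needed here (the external type parameter is dropped), and Envelope, TypedAdapter, Pair, and dispatch() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class InboundSelectionSketch {
  // Reduced to the two operations used on the inbound side.
  interface ExchangeAdapter<L, EX> {
    L fromExchange(EX exchangeMessage);
    boolean supports(Object exchangeMessage);
  }

  interface ExchangeReceiver<L> {
    void receive(L message);
  }

  // Hypothetical exchange-typed message carrying a type name and a text payload.
  static final class Envelope {
    final String type;
    final String payload;
    Envelope(final String type, final String payload) {
      this.type = type;
      this.payload = payload;
    }
  }

  // Supports envelopes of one type name and parses their payload to L.
  static final class TypedAdapter<L> implements ExchangeAdapter<L, Envelope> {
    private final String type;
    private final Function<String, L> parser;
    TypedAdapter(final String type, final Function<String, L> parser) {
      this.type = type;
      this.parser = parser;
    }
    @Override
    public L fromExchange(final Envelope exchangeMessage) {
      return parser.apply(exchangeMessage.payload);
    }
    @Override
    public boolean supports(final Object exchangeMessage) {
      return exchangeMessage instanceof Envelope && type.equals(((Envelope) exchangeMessage).type);
    }
  }

  // Holds the adapter and receiver for one local type, much as a Covey would.
  static final class Pair<L> {
    final ExchangeAdapter<L, Envelope> adapter;
    final ExchangeReceiver<L> receiver;
    Pair(final ExchangeAdapter<L, Envelope> adapter, final ExchangeReceiver<L> receiver) {
      this.adapter = adapter;
      this.receiver = receiver;
    }
    void deliver(final Envelope envelope) {
      receiver.receive(adapter.fromExchange(envelope)); // adapt first, then receive
    }
  }

  // Delivers through the first pair whose adapter supports the message.
  static boolean dispatch(final List<Pair<?>> pairs, final Envelope envelope) {
    for (final Pair<?> pair : pairs) {
      if (pair.adapter.supports(envelope)) {
        pair.deliver(envelope);
        return true;
      }
    }
    return false; // no registered adapter recognized the message
  }

  public static void main(final String[] args) {
    final List<Integer> counts = new ArrayList<>();
    final List<String> names = new ArrayList<>();
    final List<Pair<?>> pairs = new ArrayList<>();
    pairs.add(new Pair<Integer>(new TypedAdapter<Integer>("count", Integer::parseInt), counts::add));
    pairs.add(new Pair<String>(new TypedAdapter<String>("name", text -> text), names::add));

    dispatch(pairs, new Envelope("name", "vlingo"));
    dispatch(pairs, new Envelope("count", "3"));
    System.out.println(counts + " " + names); // prints "[3] [vlingo]"
  }
}
```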

ExchangeMapper

Supports mapping a local type message to an external type, and an external type message to a local type.

public interface ExchangeMapper<L,E> {
  E localToExternal(final L local);
  L externalToLocal(final E external);
}

The parameters of L and E are the local form type and the external form type, respectively. The mapper operations are used as follows.

E localToExternal(final L local);

Answers the external typed message given the local typed message.

L externalToLocal(final E external);

Answers the local typed message given the external typed message.

Usage

This section demonstrates how to use the Exchange. It provides a very simple usage example based on the RabbitMQ implementation.

To use the RabbitMQ implementation you first get a connection to an Exchange.

final Exchange exchange =
    ExchangeFactory.fanOutInstance(
        settings(),
        "test-fanout",
        true);

This factory method is specific to the RabbitMQ implementation. The settings() is defined as:

private ConnectionSettings settings() {
  return ConnectionSettings.instance(
      "localhost",
      ConnectionSettings.UndefinedPort,
      "/",
      "guest",
      "guest");
}

The connection is to RabbitMQ on localhost and uses the default port by passing the value of UndefinedPort, which is -1. The virtualHost is "/" and the username and password are both "guest".
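One way to picture the "undefined port means default port" convention is the following sketch. The helper is hypothetical and is not how the library or the RabbitMQ client is implemented; the constant name mirrors ConnectionSettings.UndefinedPort from the text, and 5672 is the standard AMQP port for connections without TLS.

```java
// Hypothetical helper illustrating the convention; not library code.
class PortResolutionSketch {
  static final int UndefinedPort = -1;     // mirrors ConnectionSettings.UndefinedPort
  static final int DefaultAmqpPort = 5672; // standard AMQP port without TLS

  // Answers the configured port, or the default when none was configured.
  static int resolve(final int configuredPort) {
    return configuredPort == UndefinedPort ? DefaultAmqpPort : configuredPort;
  }

  public static void main(final String[] args) {
    System.out.println(resolve(UndefinedPort)); // prints 5672
    System.out.println(resolve(5673));          // prints 5673
  }
}
```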

Now that the Exchange is open we can register a Covey to handle a given set of message types.

exchange
  .register(Covey.of(
      new MessageSender(exchange.connection()),
      new TextMessageReceiver(),
      new TextExchangeAdapter(),
      String.class,
      String.class,
      Message.class));

To keep the example simple, both the local and external message types are String. The exchange message type is Message, which is a specialized RabbitMQ type.

package io.vlingo.lattice.exchange.rabbitmq;

import io.vlingo.lattice.exchange.MessageParameters;

public class Message {
  public final MessageParameters messageParameters;
  public final byte[] payload;

  public Message(final byte[] payload, final MessageParameters messageParameters) {
    this.payload = payload;
    this.messageParameters = messageParameters;
  }

  public Message(final String payload, final MessageParameters messageParameters) {
    this.payload = payload.getBytes();
    this.messageParameters = messageParameters;
  }

  public String payloadAsText() {
    return new String(payload);
  }
}

The Message holds a byte[] payload and an instance of MessageParameters as metadata. The Message may be constructed using a byte[] or a String, which is converted to a byte[], along with MessageParameters.
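One detail worth keeping in mind: String#getBytes() and new String(byte[]) without a charset argument use the JVM's default charset, so a sender and a receiver running with different defaults could interpret the same payload differently. The following is a hedged sketch of conversions that pin UTF-8 instead. It is not part of the library's Message class, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helpers that fix the charset used for text payloads.
class TextPayloadSketch {
  static byte[] encode(final String text) {
    return text.getBytes(StandardCharsets.UTF_8);
  }

  static String decode(final byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(final String[] args) {
    final String text = "caf\u00e9"; // "cafe" with an accented final letter
    final byte[] payload = encode(text);
    System.out.println(payload.length);               // prints 5: the accented letter takes two bytes
    System.out.println(decode(payload).equals(text)); // prints true
  }
}
```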

Now here is more specific detail on the Covey registration on the Exchange. The MessageSender is a predefined implementation of ExchangeSender specifically for RabbitMQ. You instantiate the MessageSender with the Exchange's connection.

The TextMessageReceiver and TextExchangeAdapter are specifically for String messages. The TextMessageReceiver is implemented as follows.

public class TextMessageReceiver implements ExchangeReceiver<String> {
  public TextMessageReceiver() { }

  @Override
  public void receive(final String message) {
    ...
  }
}

Your receive() method will apply the message as appropriate for your service/application. The following is the TextExchangeAdapter.

public class TextExchangeAdapter implements ExchangeAdapter<String,String,Message> {
  private ExchangeMapper<String,String> mapper = new TextExchangeMapper();

  @Override
  public String fromExchange(final Message exchangeMessage) {
    final String local = mapper.externalToLocal(exchangeMessage.payloadAsText());
    return local;
  }

  @Override
  public Message toExchange(final String localMessage) {
    return new Message(localMessage, MessageParameters.bare().deliveryMode(DeliveryMode.Durable));
  }

  @Override
  public boolean supports(final Object exchangeMessage) {
    return Message.class == exchangeMessage.getClass();
  }
}

This specific adapter supports any Message received from the Exchange. To adapt from an Exchange-received Message to a String, the adapter calls on the TextExchangeMapper. To get a String from the byte[] payload, the adapter uses Message#payloadAsText().

Going from a localMessage to a Message for the Exchange requires constructing the Message with the localMessage, which is converted into a byte[], and with a MessageParameters instance indicating that the Message must be sent as durable.

This is the mapper implementation.

public class TextExchangeMapper implements ExchangeMapper<String,String> {

  @Override
  public String localToExternal(final String local) {
    return new String(local);
  }

  @Override
  public String externalToLocal(final String external) {
    return new String(external);
  }
}

Unsurprisingly this mapper, in mapping from String to String, has almost no heavy-lifting functionality. Note that a new String is created from both local and external String messages to ensure the instances are different.
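The effect of creating a new String can be checked directly. In this small sketch the copy() method is a hypothetical helper that does what the mapper methods do.

```java
class StringCopySketch {
  // Same operation as the mapper methods: answer a copy of the given String.
  static String copy(final String original) {
    return new String(original);
  }

  public static void main(final String[] args) {
    final String local = "order-placed";
    final String external = copy(local);
    System.out.println(external.equals(local)); // prints true: same characters
    System.out.println(external == local);      // prints false: different instances
  }
}
```

Because a Java String is immutable, sharing one instance would also be safe; the copy only makes the separation between local and external messages explicit.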

That's all that is required for a simple text-based message exchange between a sending service and a receiving service.

A more sophisticated set of message types, adapters, and mappers is shown next. Note that the following message types all have equals(), hashCode(), and toString() methods, which are omitted here for brevity.

public class LocalType1 {
  public final String attribute1;
  public final int attribute2;

  public LocalType1(final String value1, final int value2) {
    this.attribute1 = value1;
    this.attribute2 = value2;
  }
}

public class LocalType2 {
  public final String attribute1;
  public final float attribute2;

  public LocalType2(final String value1, final float value2) {
    this.attribute1 = value1;
    this.attribute2 = value2;
  }
}

The external message types corresponding to the similarly named local types are as follows.

public class ExternalType1 implements Message {
  public final String field1;
  public final String field2;

  public ExternalType1(final String value1, final int value2) {
    this.field1 = value1;
    this.field2 = Integer.toString(value2);
  }
}

public class ExternalType2 implements Message {
  public final String field1;
  public final String field2;

  public ExternalType2(final String value1, final float value2) {
    this.field1 = value1;
    this.field2 = Float.toString(value2);
  }
}

The local types hold String, int, and float types, while the external types hold only String attribute types. Also the attribute names are different. This justifies the need for an adapter and mapper for each local to external type. First are the adapter and mapper for LocalType1 and ExternalType1.

public class ExchangeAdapter1 implements ExchangeAdapter<LocalType1,ExternalType1,ExchangeMessage> {
  private ExchangeMapper<LocalType1,ExternalType1> mapper = new ExchangeMapper1();

  @Override
  public LocalType1 fromExchange(final ExchangeMessage externalMessage) {
    final ExternalType1 external = JsonSerialization.deserialized(externalMessage.payload, ExternalType1.class);
    final LocalType1 local = mapper.externalToLocal(external);
    return local;
  }

  @Override
  public ExchangeMessage toExchange(final LocalType1 localMessage) {
    final ExternalType1 external = mapper.localToExternal(localMessage);
    final String payload = JsonSerialization.serialized(external);
    return new ExchangeMessage(ExternalType1.class.getName(), payload);
  }

  @Override
  public boolean supports(final Object exchangeMessage) {
    if (ExchangeMessage.class != exchangeMessage.getClass()) {
      return false;
    }
    return ExternalType1.class.getName().equals(((ExchangeMessage) exchangeMessage).type);
  }
}

public class ExchangeMapper1 implements ExchangeMapper<LocalType1,ExternalType1> {

  @Override
  public ExternalType1 localToExternal(final LocalType1 local) {
    return new ExternalType1(local.attribute1, local.attribute2);
  }

  @Override
  public LocalType1 externalToLocal(final ExternalType1 external) {
    return new LocalType1(external.field1, Integer.parseInt(external.field2));
  }
}

Next are the adapter and mapper for LocalType2 and ExternalType2.

public class ExchangeAdapter2 implements ExchangeAdapter<LocalType2,ExternalType2,ExchangeMessage> {
  private ExchangeMapper<LocalType2,ExternalType2> mapper = new ExchangeMapper2();

  @Override
  public LocalType2 fromExchange(final ExchangeMessage externalMessage) {
    final ExternalType2 external = JsonSerialization.deserialized(externalMessage.payload, ExternalType2.class);
    final LocalType2 local = mapper.externalToLocal(external);
    return local;
  }

  @Override
  public ExchangeMessage toExchange(final LocalType2 localMessage) {
    final ExternalType2 external = mapper.localToExternal(localMessage);
    final String payload = JsonSerialization.serialized(external);
    return new ExchangeMessage(ExternalType2.class.getName(), payload);
  }

  @Override
  public boolean supports(final Object exchangeMessage) {
    if (ExchangeMessage.class != exchangeMessage.getClass()) {
      return false;
    }
    return ExternalType2.class.getName().equals(((ExchangeMessage) exchangeMessage).type);
  }
}

public class ExchangeMapper2 implements ExchangeMapper<LocalType2,ExternalType2> {

  @Override
  public ExternalType2 localToExternal(final LocalType2 local) {
    return new ExternalType2(local.attribute1, local.attribute2);
  }

  @Override
  public LocalType2 externalToLocal(final ExternalType2 external) {
    return new LocalType2(external.field1, Float.parseFloat(external.field2));
  }
}

You may define any number of adapters and mappers to deal with the translations between local and external message types. Each of the sets will be held by a separate Covey and registered with the Exchange or Queue.
