Comment on page
Exchange
Using the VLINGO XOOM message and event exchange across Bounded Contexts.
This XOOM LATTICE component provides an
Exchange
abstraction over Publish-Subscribe messaging for exchanges or topics. This is a means to publish messages to exchanges/topics inside or outside your current application/service, and to subscribe to such messages. The Exchange
provides translators from internal to external message types, and from external to internal message types.There are a few key abstractions. The following subsections describe each.
The
Exchange
itself is the primary protocol. It defines a message exchange, such as a queue or topic, through which any number of related ExchangeSender
, ExchangeReceiver
, and ExchangeAdapter
components are registered, and messages are sent. The protocol is defined as follows.public interface Exchange {
void close();
<T> T channel();
<T> T connection();
String name();
<L,E,EX> Exchange register(final Covey<L,E,EX> covey);
<L> void send(final L local);
}
The defined behaviors are described next.
void close();
Close the
Exchange
and any underlying resources. Given that the Exchange
is backed by some middleware messaging mechanism, and connections and other allocated resources to the mechanism are closed.<T> T channel();
Answers the underlying channel, which is implementation dependent. The
T
type is a parameter that determines the dynamic cast to the runtime type.<T> T connection();
Answers the underlying connection, which is implementation dependent. The
T
type is a parameter that determines the dynamic cast to the runtime type.String name();
Answers the name of this
Exchange
as a String
.<L,E,EX> Exchange register(final Covey<L,E,EX> covey);
Registers a
Covey
with the Exchange
. A Covey
is a set of Exchange
components used to send, receive, and translate messages. The Covey
is explained in a below subsection.<L> void send(final L local);
Sends the
local
message to the Exchange
after first adapting it to an Exchange
-compatible message. Any given message has three forms:- 1.Local—The type used by the local system to represent the message sent or received
- 2.External—The type the message will have when it is received on the external service
- 3.Exchange—The type used to transport the message on the Exchange from sender to receiver(s). The Exchange type holds an instance of the External type.
The
ConnectionSettings
is a configuration for making a connection to the underlying messaging mechanism, including information for the host, port, virtual host, and user.public class ConnectionSettings {
public final String hostName;
public final String password;
public final int port;
public final String username;
public final String virtualHost;
...
}
There are also a number of factory methods for creating the value state.
A covey by definition is a group or set. We use a
Covey
as a set of Exchange
components used for translating, sending, and receiving messages. A given Exchange
will have one Covey
for each local message type, either as a sender or a receiver of that message type.public class Covey<L,E,EX> {
public ExchangeAdapter<L,E,EX> adapter;
public ExchangeReceiver<L> receiver;
public ExchangeSender<EX> sender;
public final Class<L> localClass;
public final Class<E> externalClass;
public final Class<EX> exchangeClass;
...
}
The
adapter
is an ExchangeAdapter
that is capable of adapting messages to their three forms, Local, External, and Exchange.The
receiver
is an ExchangeReceiver
of messages of local type, meaning that the ExchangeAdapter
has already adapted the incoming message to the local type.The
sender
is an ExchangeSender
of messages of exchange type through the Exchange
, meaning that the ExchangeAdapter
has already adapted the local message to an outgoing type.Each of the
Class
instances is the type of a message form: localClass
, externalClass
, exchangeClass
.A
Queue
is just a specialized Exchange
. It is expected that a Queue
is not a fanout type but one that is point-to-point. Being point-to-point does not prevent the receiver end from enlisting multipleQueue
competing consumers. The Queue
protocol is identical to the Exchange
.A sender of messages to an
Exchange
or Queue
. public interface ExchangeSender<E> {
void send(final E message);
}
The single protocol operation is as follows.
void send(final E message);
Sends the
E
exchange typed message through the underlying exchange or queue, meaning that the ExchangeAdapter
has already adapted the local message to an outgoing type.From this it may be apparent that the
Exchange#send()
does not actually send the message. Instead it dispatches the local message through the Forwarder
for translation to the exchange type, and then the Forwarder
dispatches the message to the ExchangeSender
.The forwarder of all local and exchange messages, forwarding local messages through an
ExchangeSender
and exchange messages through the ExchangeReceiver
. It is the Forwarder
that holds the Covey
instances in behalf of the Exchange
or Queue
, and thus manages translation from local to exchange types and from exchange to local types.A builder of message parameters and set of metadata attributes, any number of which are common to associate with messages as parameters. These are to be used as appropriate or ignored if unneeded. The chosen parameters are built using the fluent interface.
public class MessageParameters {
public static enum DeliveryMode { Durable, Transient }
public static enum Priority { High, Normal, Medium, Low, P0, P1, P2, P3, P4, P5, P6, P7, P8, P9 }
private String applicationId;
private String contentEncoding;
private String contentType;
private String correlationId;
private String deliveryId;
private DeliveryMode deliveryMode;
private String exchangeName;
private Map<String,Object> headers;
private String messageId;
private String other1;
private String other2;
private String other3;
private Priority priority;
private String queueName;
private boolean redeliver;
private String replyTo;
private String returnAddress;
private List<String> routing;
private long timestamp;
private long timeToLive;
private String tag;
private String typeCode;
private String typeName;
private String userId;
...
}
There are factory methods and builder methods, as well as public accessors for each parameter. Many of the metadata parameters are common to AMQP, but may also be useful for any type of messaging protocol.
A receiver of messages from an
Exchange
, which may be implemented for each unique message type. The message type received has already been mapped and adapted from the exchange-typed message, and this is in the form of a local message.public interface ExchangeReceiver<L> {
void receive(final L message);
}
The single protocol operation is as follows.
void receive(final L message);
The implementor receives the delivered
L
local typed message from the Exchange
. The message is of local type L
having already been mapped and adapted from the exchange-typed message.Adapts local messages of type
L
to exchange messages of type EX
that hold external type E
. This may involve mapping, in which case the underlying implementation must arrange for an ExchangeMapper
to be registered and used. Note that the L and E types may be different between ExchangeAdapter
and ExchangeMapper
.public interface ExchangeAdapter<L,E,EX> {
L fromExchange(final EX exchangeMessage);
EX toExchange(final L localMessage);
boolean supports(final Object exchangeMessage);
}
The parameters of
L
, E
, and EX
are the local form, the external form, and the exchange form types, respectively. The operations of this protocol work as follows.L fromExchange(final EX exchangeMessage);
Answers the
L
typed local message from the exchangeMessage
of type EX
.EX toExchange(final L localMessage);
Answers the
EX
typed exchange message from the localMessage
of type L
.boolean supports(final Object exchangeMessage);
Answers whether or not the adapter supports the
exchangeMessage
. This is used by the Forwarder
to find the Covey
used for the type of exchangeMessage
.Supports mapping a local type message to external type, and a external type message to local type.
public interface ExchangeMapper<L,E> {
E localToExternal(final L local);
L externalToLocal(final E external);
}
The parameters of
L
and E
are the local form type and the external form type, respectively. The mapper operations are used as follows.E localToExternal(final L local);
Answers the external typed message given the local typed message.
L externalToLocal(final E external);
Answers the local typed message given the external typed message.
This section demonstrates how to use the
Exchange
. It provides a very simple usage example based on the RabbitMQ implementation.To use the RabbitMQ implementation you first get connection to an
Exchange
.final Exchange exchange =
ExchangeFactory.fanOutInstance(
settings(),
"test-fanout",
true);
This factory method is specific to the RabbitMQ implementation. The
settings()
is defined as:private ConnectionSettings settings() {
return ConnectionSettings.instance(
"localhost",
ConnectionSettings.UndefinedPort,
"/",
"guest",
"guest");
}
The connection is to RabbitMQ on
localhost
and uses the default port by passing the value of UndefinedPort
, which is -1
. The virtualHost
is "/"
and the username
and password
are both "guest"
. Now that the
Exchange
is open we can register a Covey
to handle a given set of message types.exchange
.register(Covey.of(
new MessageSender(exchange.connection()),
new TextMessageReceiver(),
new TextExchangeAdapter(),
String.class,
String.class,
Message.class));
To keep the example simple, both the local and external message types are
String
. The exchange message type is Message
, which is a specialized RabbitMQ type.package io.vlingo.lattice.exchange.rabbitmq;
import io.vlingo.lattice.exchange.MessageParameters;
public class Message {
public final MessageParameters messageParameters;
public final byte[] payload;
public Message(final byte[] payload, final MessageParameters messageParameters) {
this.payload = payload;
this.messageParameters = messageParameters;
}
public Message(final String payload, final MessageParameters messageParameters) {
this.payload = payload.getBytes();
this.messageParameters = messageParameters;
}
public String payloadAsText() {
return new String(payload);
}
}
The
Message
holds a byte[] payload
and an instance of MessageParameters
as metadata. The Message
may be constructed using a byte[]
or a String
, which is converted to a byte[]
, along with MessageParameters
.Now here is more specific detail on the
Covey
registration on the Exchange
. The MessageSender
is a predefined implementation of ExchangeSender
specifically for RabbitMQ. You instantiate the MessageSender
with the Exchange
's connection.The
TextMessageReceiver
and TextExchangeAdapter
are specifically for String
messages. The TextMessageReceiver
is implemented as follows.public class TextMessageReceiver implements ExchangeReceiver<String> {
public TextMessageReceiver() { }
@Override
public void receive(final String message) {
...
}
}
Your
receive()
method will apply the message as appropriate for your service/application. The following is the TextExchangeAdapter
.public class TextExchangeAdapter implements ExchangeAdapter<String,String,Message> {
private ExchangeMapper<String,String> mapper = new TextExchangeMapper();
@Override
public String fromExchange(final Message exchangeMessage) {
final String local = mapper.externalToLocal(exchangeMessage.payloadAsText());
return local;
}
@Override
public Message toExchange(final String localMessage) {
return new Message(localMessage, MessageParameters.bare().deliveryMode(DeliveryMode.Durable));
}
@Override
public boolean supports(final Object exchangeMessage) {
return Message.class == exchangeMessage.getClass();
}
}
This specific adapter supports any
Message
received from the Exchange
. To adapt from an Exchange
-received Message
to a String
, the adapter calls on the TextExchangeMapper
. To get a String
from the byte[] payload
, the adapter uses Message#payloadAsText()
. Going from a
localMessage
to a Message
for the Exchange
requires constructing with the localMessage
that is converted into a byte[]
, and providing a MessageParameters
instance indicating that the Message
must be sent as durable.This is the mapper implementation.
public class TextExchangeMapper implements ExchangeMapper<String,String> {
@Override
public String localToExternal(final String local) {
return new String(local);
}
@Override
public String externalToLocal(final String external) {
return new String(external);
}
}
Unsurprisingly this mapper, in mapping from
String
to String
, has almost no heavy-lifting functionality. Note that a new String
is created from both local and external String
messages to ensure the instances are different.That's all that is required for a simple text-based message exchange between a sending service and a receiving service.
A more sophisticated set of message types, adapters, and mappers are shown next. Note that the following message types all have
equals()
, hashCode()
, and toString()
, but are excluded here for clarity.public class LocalType1 {
public final String attribute1;
public final int attribute2;
public LocalType1(final String value1, final int value2) {
this.attribute1 = value1;
this.attribute2 = value2;
}
}
public class LocalType2 {
public final String attribute1;
public final float attribute2;
public LocalType2(final String value1, final float value2) {
this.attribute1 = value1;
this.attribute2 = value2;
}
}
The external message types are corresponding to the similarly named local types are as follows.
public class ExternalType1 implements Message {
public final String field1;
public final String field2;
public ExternalType1(final String value1, final int value2) {
this.field1 = value1;
this.field2 = Integer.toString(value2);
}
}
public class ExternalType2 implements Message {
public final String field1;
public final String field2;
public ExternalType2(final String value1, final float value2) {
this.field1 = value1;
this.field2 = Float.toString(value2);
}
}
The local types hold
String
, int
, and float
types, while the external types hold only String
attribute types. Also the attribute names are different. This justifies the need for an adapter and mapper for each local to external type. First are the adapter and mapper for LocalType1
and ExternalType1
.public class ExchangeAdapter1 implements ExchangeAdapter<LocalType1,ExternalType1,ExchangeMessage> {
private ExchangeMapper<LocalType1,ExternalType1> mapper = new TestExchangeMapper1();
@Override
public LocalType1 fromExchange(final ExchangeMessage externalMessage) {
final ExternalType1 external = JsonSerialization.deserialized(externalMessage.payload, ExternalType1.class);
final LocalType1 local = mapper.externalToLocal(external);
return local;
}
@Override
public ExchangeMessage toExchange(final LocalType1 localMessage) {
final ExternalType1 external = mapper.localToExternal(localMessage);
final String payload = JsonSerialization.serialized(external);
return new ExchangeMessage(ExternalType1.class.getName(), payload);
}
@Override
public boolean supports(final Object exchangeMessage) {
if (ExchangeMessage.class != exchangeMessage.getClass()) {
return false;
}
return ExternalType1.class.getName().equals(((ExchangeMessage) exchangeMessage).type);
}
}
public class ExchangeMapper1 implements ExchangeMapper<LocalType1,ExternalType1> {
@Override
public ExternalType1 localToExternal(final LocalType1 local) {
return new ExternalType1(local.attribute1, local.attribute2);
}
@Override
public LocalType1 externalToLocal(final ExternalType1 external) {
return new LocalType1(external.field1, Integer.parseInt(external.field2));
}
}
Next are the adapter and mapper for
LocalType2
and ExternalType2
.public class ExchangeAdapter2 implements ExchangeAdapter<LocalType2,ExternalType2,ExchangeMessage> {
private ExchangeMapper<LocalType2,ExternalType2> mapper = new TestExchangeMapper2();
@Override
public LocalType2 fromExchange(final ExchangeMessage externalMessage) {
final ExternalType2 external = JsonSerialization.deserialized(externalMessage.payload, ExternalType2.class);
final LocalType2 local = mapper.externalToLocal(external);
return local;
}
@Override
public ExchangeMessage toExchange(final LocalType2 localMessage) {
final ExternalType2 external = mapper.localToExternal(localMessage);
final String payload = JsonSerialization.serialized(external);
return new ExchangeMessage(ExternalType2.class.getName(), payload);
}
@Override
public boolean supports(final Object exchangeMessage) {
if (ExchangeMessage.class != exchangeMessage.getClass()) {
return false;
}
return ExternalType2.class.getName().equals(((ExchangeMessage) exchangeMessage).type);
}
}
public class ExchangeMapper2 implements ExchangeMapper<LocalType2,ExternalType2> {
@Override
public ExternalType2 localToExternal(final LocalType2 local) {
return new ExternalType2(local.attribute1, local.attribute2);
}
@Override
public LocalType2 externalToLocal(final ExternalType2 external) {
return new LocalType2(external.field1, Float.parseFloat(external.field2));
}
}
You may define any number of adapters and mappers to deal with the translations between local and external message types. Each of the sets will be held by a separate
Covey
and registered with the Exchange
or Queue
.Last modified 1yr ago