HTTP (REST and RPC)
The embeddable HTTP server for Reactive REST backed by XOOM Actors and XOOM Wire.
The XOOM HTTP component supports reactive, scalable, and resilient HTTP servers and REST services running as a small, fast, embedded server. Thus, this component does not run standalone, but is meant to provide very lightweight and high-performing HTTP support within an application- or microservice-based runtime, such as a Bounded Context. The server supports REST and RPC dispatching to implementations of fluent APIs.
It is common for developers to think of REST over HTTP in terms of CRUD because the primary methods provided by the protocol are: POST, GET, PUT, and DELETE. If taken at face value, these verbs are CRUD through and through. Yet, almost no one uses the methods to map directly from browser to a database. Most contemporary applications instead map the HTTP methods to procedures (i.e. a method on a Java or C# class) within the application. Those procedures are responsible for adapting path parameters, headers, and the request body to a form that can be consumed by the inner application components.
The procedure to which the HTTP request is mapped need not represent a CRUD operation at all. When you consider that POST, GET, PUT, DELETE, and others such as PATCH, are message categories that map to procedures, not only ways to perform CRUD operations on data, it makes for a lot more flexibility in how HTTP can be used.
For example, a POST method would always be used with the intention to create some entity resource, such as a product; that is, POST is in the procedure category of creation. Yet, there is no reason for the procedure to which that HTTP request is mapped to be named postProduct() or even the "obvious" createProduct(). Instead, the procedure could be named something more in line with the language spoken by the business, such as catalogProduct(). Here catalog is used as a verb; that is, our company is not in the business of creating new products. The products come to the company already created by a manufacturer. Our company sells products through catalogs. For us, products need to be cataloged (verb) for them to be accessible for purchase through a catalog (noun). With that in mind, establishing a new kind of catalog for a kind of product might be thought of as defining a catalog. Thus, the procedure would be named defineCatalog().
Given this line of reasoning, the ways that HTTP methods are mapped are virtually unlimited, all while still honoring the REST way of doing things. The older SOAP protocol, used to implement RPC-based APIs, was known to those who could cut through the jargon as "XML over HTTP." Thus, if XML over HTTP can be used to implement RPC, it stands to reason that JSON over HTTP could be used in the same way.
The following sections demonstrate the flexibility of using HTTP as a means to support far more than CRUD. Avoid pedantic opinions that limit creativity. Such opinions are generally held by those who think only in terms of technology and expect the business to speak their language, which is composed mostly of CRUD and collection-oriented (add, insert, remove) terminology.
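As a minimal sketch of the idea above, the following plain Java keeps a table from an HTTP method and URI to a procedure named in the language of the business. It deliberately does not use the XOOM HTTP API; the class CatalogRoutes, its two routes, and the returned strings are hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only; this is not the XOOM HTTP API.
final class CatalogRoutes {
  // Key: "<METHOD> <URI>"; value: the business-named procedure handling it.
  private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

  public CatalogRoutes() {
    // Both routes use POST (the creation category), yet neither
    // procedure is named after the HTTP method or after CRUD.
    routes.put("POST /catalogs", this::defineCatalog);
    routes.put("POST /products", this::catalogProduct);
  }

  public String defineCatalog(final String body) {
    return "catalog defined: " + body;
  }

  public String catalogProduct(final String body) {
    return "product cataloged: " + body;
  }

  // Looks up the procedure for the request and invokes it with the body.
  public String dispatch(final String method, final String uri, final String body) {
    final Function<String, String> procedure = routes.get(method + " " + uri);
    return procedure == null ? "404 Not Found" : procedure.apply(body);
  }

  public static void main(final String[] args) {
    final CatalogRoutes routes = new CatalogRoutes();
    System.out.println(routes.dispatch("POST", "/products", "Road Bike"));
    System.out.println(routes.dispatch("POST", "/catalogs", "Bicycles"));
  }
}
```

The point of the sketch is only that the HTTP method selects a category of message, while the procedure name remains free to follow the business language.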
Consider the overall architecture of our HTTP server. It is highly Reactive by employing actors at every major operational junction.

A request is received and processed asynchronously from beginning through to response.
REST over HTTP is quite commonly employed to support user interfaces, service integrations, and even distribution of event streams. Of course, we don't suggest that your services should be primarily REST-based; we provide our HTTP server for when REST is useful. Even RPC can be supported. See the above information box. Since REST over HTTP is the most common contemporary use case, the documentation mostly refers to REST rather than RPC, but everything herein is equally applicable to RPC using the style discussed in the above information box.
With any use of the XOOM HTTP server, whether with a REST flavor or more so as RPC, the use of a few Domain-Driven Design patterns is common. Among the Context Mapping patterns, the Open-Host Service and Published Language patterns naturally support these API styles. The Open-Host Service is the well-defined API that is open for public use, with the possible requirement of client credentials. The Published Language is the set of REST resources or RPC response values that are provided.
Our HTTP component can help you create REST-based services rapidly and with great simplicity. One glance at the REST request mappings to Java objects is all it takes to understand this. The following demonstrates both file-based and code-based fluent request handler mappings.
action.user.register.method = POST
action.user.register.uri = /users
action.user.register.to = register(body:sample.user.UserData userData)
action.user.contact.method = PATCH
action.user.contact.uri = /users/{userId}/contact
action.user.contact.to = changeContact(String userId, body:sample.user.ContactData contactData)
The above declarations are made in the vlingo-http.properties file. The following is the resource handler implementation to match it.
public class UserResource extends ResourceHandler {
public void register(final UserData userData) {
final User user =
User.from(
Name.from(userData.nameData.given, userData.nameData.family),
Contact.from(userData.contactData.emailAddress, userData.contactData.telephoneNumber));
repository.save(user);
completes().with(Response.of(Created, headers(of(Location, userLocation(user.id))), serialized(UserData.from(user))));
}
public void changeContact(final String userId, final ContactData contactData) {
final User user = repository.userOf(userId);
if (user.doesNotExist()) {
completes().with(Response.of(NotFound, userLocation(userId)));
return;
}
final User changedUser = user.withContact(new Contact(contactData.emailAddress, contactData.telephoneNumber));
repository.save(changedUser);
completes().with(Response.of(Ok, serialized(UserData.from(changedUser))));
}
}
By means of the ResourceHandler base class, there are several Request parts available: URI, headers, body, and any query parameters. You may also access the default ContentType, which may be overridden. See the following queryUsers() handler method.
public Completes<Response> queryUsers() {
  final String page = context().request().queryParameters().valuesOf("page");
  final String contentType = context().contentType();
  ...
}
The above is from file-based request/response resource mappings defined in the properties file vlingo-http.properties. Yet, our versatile API also supports fluent mappings in source code.
public class UserResource {
...
public Resource<?> routes() {
return resource("User Resource",
post("/users")
.body(UserData.class)
.handle(this::register),
patch("/users/{userId}/contact")
.param(String.class)
.body(ContactData.class)
.handle(this::changeContact));
}
}
In the above example only the routes() method is shown, which would replace the file-based route mappings previously shown. Also note that when using the fluent routes API your resource class should not extend ResourceHandler. Although you may do so, at time of execution its state will be hollow because the runtime does not support ResourceHandler in this mode. Instead, to get similar base class state and behavior with dynamic resources created with the fluent API, use the optional DynamicResourceHandler.
public class UserResource extends DynamicResourceHandler {
private static final AtomicInteger nextId = new AtomicInteger(0);
private final int id;
public UserResource(final Stage stage) {
super(stage);
this.id = nextId.incrementAndGet();
logger().info("UserResource: Pooled instance created: " + this.id);
}
public Completes<Response> register(final UserData data) {
final String userType = context().request().queryParameters().valuesOf("userType");
final String contentType = context().contentType();
...
}
...
@Override
public Resource<?> routes() {
logger().info("UserResource: wiring resources for: " + this.id);
return resource("User Resource", this,
post("/users")
.body(UserData.class)
.handle(this::register),
patch("/users/{userId}/contact")
.param(String.class)
.body(ContactData.class)
.handle(this::changeContact));
}
}
By extending DynamicResourceHandler your resource may use the Stage managing this resource, the Logger and Scheduler of that Stage, and the Context with the current Request. The Request provides various request parts such as the URI, headers, body, and any query parameters. You may also access the default ContentType, which may be overridden. See the register(UserData) handler method for an example accessing the query parameters and ContentType.
The concrete resource handler must provide a Stage to the DynamicResourceHandler constructor. It is assumed that the concrete resource handler's constructor will take at least the Stage as a parameter. The DynamicResourceHandler declares the routes() method abstract, so it must be overridden in the concrete extender. As you can see above, routes() returns resource() with an optional parameter of the DynamicResourceHandler as the this object, which is to be given the current request Context before each handler invocation.
In the following sections you will learn how to quickly set up and start your server with resource handlers.
If you will use the file-based configuration you must create vlingo-http.properties. Otherwise you will write source code to define the server configuration. First the file-based configuration is explained, and then the source code and fluent API approach.
Create the file vlingo-http.properties and place it into the project directory hierarchy used for resources. For Maven that would be src/main/resources for production and, if you are defining the properties for test, src/test/resources. In this file create the following properties. Your values may differ.
#=====================================
# server
#=====================================
server.http.port = 8080
server.dispatcher.pool = 10
server.buffer.pool.size = 100
server.message.buffer.size = 65535
server.probe.interval = 10
server.probe.timeout = 2
server.processor.pool.size = 10
server.request.missing.content.timeout = 100
#=====================================
# generated resource dispatchers
#=====================================
resource.dispatcher.generated.sources.main = target/generated-sources/
resource.dispatcher.generated.sources.test = target/generated-test-sources/
resource.dispatcher.generated.classes.main = target/classes/
resource.dispatcher.generated.classes.test = target/test-classes/
This is the basic minimum configuration necessary to start the server. There are other properties that you will learn about later. The following summarizes these properties.
1. server.http.port: the socket port to be used by the server, which here is 8080.
2. server.dispatcher.pool: the server uses a pool of actors to dispatch incoming requests asynchronously. Here the dispatcher pool size is 10.
3. server.buffer.pool.size: used by the server to create a pool of reusable ByteBuffer instances to use for incoming requests and outgoing resources. There will be at least 100 buffers in this pool. Yet, the elastic pool design enables it to grow dynamically under heavy load and contract back down to 100 buffers as load diminishes.
4. server.message.buffer.size: used by the server when creating the buffer pool to allocate each buffer with this many bytes. Here the buffers will each be 65,535 bytes. (a) This does not limit the overall size of a given incoming message because these may be read in chunks and span multiple buffers. (b) This also does not limit the size of outgoing responses because a non-pooled buffer will be temporarily allocated to send payloads larger than this maximum. Serving larger responses should be the exception rather than the rule, otherwise performance will suffer. If larger responses are a common occurrence, consider setting this buffer limit to the largest common payload size.
5. server.probe.interval: determines the number of milliseconds between each socket channel probe to receive new connections and requests, and to send new responses. Be careful with this value as various O/S and JDKs deal differently with intervals, possibly being too fast or too slow.
6. server.probe.timeout: the amount of time in milliseconds that the socket channel probe will wait for new connections and requests, and to check for writable status used for sending responses. Be careful with this value because when there are no new requests, it causes the actor's thread to block inside the socket channel probe until this timeout is reached.
7. server.processor.pool.size: used by the server to size the pool of socket channel processors. Here the processor pool size is 10. There will be 10 total actors created and used in round-robin order as new connections are accepted, which will each read from and write to every newly accepted client socket channel connection.
8. server.request.missing.content.timeout: as previously indicated, very large incoming request messages of byte length greater than server.message.buffer.size will require spanning two or more total buffers. This value indicates how long the incomplete message will be retained in the server before it is considered a bad request (missing bytes). This example indicates that such an incomplete request may be retained for a maximum of 100 milliseconds in anticipation of remaining bytes being received.
9. resource.dispatcher.generated.sources.???: These four properties define where generated source code is to be saved as Java source files and where class files are saved after dynamic compilation. This example uses the Maven target layout and defines both main and test areas.
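To make the buffer-related properties above more concrete, here is a small sketch that reads a few of them with the JDK's java.util.Properties and estimates how many pooled buffers a large incoming message would span. It does not use or reproduce the XOOM HTTP configuration loader; the class ServerPropertiesSketch and its methods are hypothetical, and the estimate assumes each buffer is filled completely before the next one is used.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Illustrative only: plain JDK code, not the XOOM HTTP configuration loader.
final class ServerPropertiesSketch {
  // Parses properties text in the same key = value form shown above.
  public static Properties parse(final String text) {
    final Properties properties = new Properties();
    try {
      properties.load(new StringReader(text));
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    return properties;
  }

  // Reads a required integer property.
  public static int intOf(final Properties properties, final String key) {
    return Integer.parseInt(properties.getProperty(key).trim());
  }

  // Number of buffers needed for a message, assuming (as a simplification)
  // that every buffer is completely filled before the next one is used.
  public static int buffersSpanned(final int messageLength, final int bufferSize) {
    return (messageLength + bufferSize - 1) / bufferSize; // ceiling division
  }

  public static void main(final String[] args) {
    final Properties properties = parse(
        "server.http.port = 8080\n" +
        "server.message.buffer.size = 65535\n" +
        "server.request.missing.content.timeout = 100\n");

    final int bufferSize = intOf(properties, "server.message.buffer.size");
    System.out.println("port: " + intOf(properties, "server.http.port"));
    System.out.println("buffers for 200,000 bytes: " + buffersSpanned(200_000, bufferSize));
  }
}
```

Under these assumptions a 200,000-byte request spans four 65,535-byte buffers, and all of its bytes would need to arrive within the configured missing-content timeout.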
In addition to this, it only makes sense to include the description of at least one resource handler (a.k.a. endpoint or controller).
#=====================================
# user resources
#=====================================
resource.name.user = [register, contact, name, queryUser, queryUsers, queryUserError]
resource.user.handler = io.vlingo.xoom.http.sample.user.UserResource
resource.user.pool = 10
resource.user.disallowPathParametersWithSlash = true
action.user.register.method = POST
action.user.register.uri = /users
action.user.register.to = register(body:io.vlingo.xoom.http.sample.user.UserData userData)
action.user.contact.method = PATCH
action.user.contact.uri = /users/{userId}/contact
action.user.contact.to = changeContact(String userId, body:io.vlingo.xoom.http.sample.user.ContactData contactData)
action.user.name.method = PATCH
action.user.name.uri = /users/{userId}/name
action.user.name.to = changeName(String userId, body:io.vlingo.xoom.http.sample.user.NameData nameData)
action.user.queryUser.method = GET
action.user.queryUser.uri = /users/{userId}
action.user.queryUser.to = queryUser(String userId)
action.user.queryUsers.method = GET
action.user.queryUsers.uri = /users
action.user.queryUsers.to = queryUsers()
action.user.queryUserError.method = GET
action.user.queryUserError.uri = /user/{userId}/error
action.user.queryUserError.to = queryUserError(String userId)
This is a more complete resource routing definition. The properties are summarized next.
1. resource.name.{name}: The first property of any resource handler is the name, which above is resource.name.user. This says, here is a resource named user. The property's value is an array of method names that are used as individual route request handlers. Each of these names must have a corresponding set of action.{name}... properties, which are documented in the following numbered descriptions. If you do not list a name in this array for every route request handler, the corresponding handler action definition will not be found for the missing name.
2. All properties associated with the named resource are in the form resource.{name}.{property}, such as resource.user.handler.
3. resource.{name}.handler: the fully-qualified class name of the concrete ResourceHandler extender. The server will dispatch requests matching the URI patterns to the methods in this ResourceHandler. The above example references the UserResource, along with its package name.
4. resource.{name}.pool: used to create a pool of actors for ResourceHandler instances of the type defined by the resource.{name}.handler property. Individual requests are handled in a round-robin fashion. In this example the resource.user.pool defines a pool size of 10. Note that this pool is created for each server.dispatcher.pool, meaning that there will be a total of server.dispatcher.pool * resource.{name}.pool actors to handle requests to this named resource. With the example values that is 10 * 10 = 100 actors.
5. resource.{name}.disallowPathParametersWithSlash: deprecated and must always be true. Above is for the resource named user, as in resource.user.disallow...
6. action.{name}.{endpoint}.method: used to define the HTTP method to be used to make a matching request to the given URI. Here {name} and {endpoint} are placeholders for the actual names. In the first example above {name} is user and the {endpoint} method is register. Note that this property's value may be one of: POST, GET, PUT, PATCH, DELETE, HEAD, TRACE, OPTIONS, or CONNECT.
7. action.{name}.{endpoint}.uri: used to define the URI that will map to the given endpoint. Here {name} and {endpoint} are placeholders for the actual names. In the first example above {name} is user and the {endpoint} method is register. Path parameters, if any, are surrounded by curly braces, such as in /users/{userId}/contact, where the parameter name is userId. In the actual URI, userId must be replaced with some sort of identity, such as may be mapped to a String value.
8. action.{name}.{endpoint}.to: used to define the method name and signature to which this route mapping will dispatch on the given resource.{name}.handler class instance. Here {name} and {endpoint} are placeholders for the actual names. In the first example above {name} is user and the {endpoint} method is register. When the HTTP method is POST and the URI is /users then the match will route to the given Java method, such as register(UserData userData). In the above example the body: keyword indicates that the UserData will be found in the request body. Depending on the content type, which is by default JSON, the body will be automatically deserialized into an instance of the given type, such as UserData. Note that when the URI contains one or more path parameters, the {paramName} will be mapped to the matching Java method parameter with the same name, and automatically deserialized into the given Java parameter type. In the above example the /users/{userId}/contact maps to the Java method parameter String userId.
When using the file-based configuration there is no limit to the number of path parameters. However, the greater the number of parameters the more complex the Java method will be to create and maintain.
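The way a URI pattern with path parameters selects a request and yields parameter values can be sketched in a few lines of plain Java. This is not how XOOM HTTP implements matching; the class UriTemplateSketch is hypothetical and uses a regular expression simply because that is the shortest way to show the idea. It assumes, in the spirit of disallowPathParametersWithSlash, that a parameter value never contains a slash.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of URI pattern matching; not the XOOM HTTP matcher.
final class UriTemplateSketch {
  private static final Pattern PARAMETER = Pattern.compile("\\{([^/}]+)\\}");

  private final List<String> names = new ArrayList<>();
  private final Pattern pattern;

  public UriTemplateSketch(final String template) {
    final Matcher matcher = PARAMETER.matcher(template);
    final StringBuilder regex = new StringBuilder();
    int last = 0;
    while (matcher.find()) {
      // Literal text between parameters must match exactly.
      regex.append(Pattern.quote(template.substring(last, matcher.start())));
      // Assumption: a parameter value may not contain a slash.
      regex.append("([^/]+)");
      names.add(matcher.group(1));
      last = matcher.end();
    }
    regex.append(Pattern.quote(template.substring(last)));
    this.pattern = Pattern.compile(regex.toString());
  }

  // Answers parameter name to value, or null when the URI does not match.
  public Map<String, String> match(final String uri) {
    final Matcher matcher = pattern.matcher(uri);
    if (!matcher.matches()) {
      return null;
    }
    final Map<String, String> values = new LinkedHashMap<>();
    for (int index = 0; index < names.size(); index++) {
      values.put(names.get(index), matcher.group(index + 1));
    }
    return values;
  }

  public static void main(final String[] args) {
    final UriTemplateSketch contact = new UriTemplateSketch("/users/{userId}/contact");
    System.out.println(contact.match("/users/42/contact")); // userId is bound to 42
    System.out.println(contact.match("/users/42/name"));    // no match
  }
}
```

In the server, the extracted value would then be converted to the declared Java parameter type, such as String userId, before the handler method is invoked.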
It is quite simple to start a server from file-based configuration.
final World world = World.startWithDefaults("server");
final Server server = Server.startWith(world.stage());
Using the above examples, this Server is started on port 8080 and has a single resource handler, UserResource. The remaining properties are applied to the server as previously explained.
The ResourceHandler and DynamicResourceHandler base classes provide access to several request parts, such as headers, query parameters, and other environmental objects.
Type Accessor | Description |
Context context() | The Context type provides numerous details. This contains the Request, which in turn contains Method, URL, Version, Headers, and Body. This gives you access to all parts of the request. |
ContentType contentType() | The details about the content type of the request (only on ResourceHandler). |
Logger logger() | The means to log debug, error, and informational messages. |
Scheduler scheduler() | Use to manage the scheduling of future tasks. |
Stage stage() | The Stage of this request. |
The following provides a very simple example of source code configuration along with the XOOM HTTP fluent route mapping API. This provides the minimum code to create an endpoint that responds with "Hello, World!". Both Java and Kotlin examples are available. The tutorial descriptions refer to the Java code.
Create a new project with your favorite editor/IDE and create a Bootstrap class with the following content.
Bootstrap.java
import io.vlingo.xoom.actors.World;
import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.http.Response;
import io.vlingo.xoom.http.resource.Configuration.Sizing;
import io.vlingo.xoom.http.resource.Configuration.Timing;
import io.vlingo.xoom.http.resource.Resource;
import io.vlingo.xoom.http.resource.Resources;
import io.vlingo.xoom.http.resource.Server;
// static imports used by helloWorldResource()
import static io.vlingo.xoom.common.Completes.withSuccess;
import static io.vlingo.xoom.http.Response.Status.Ok;
import static io.vlingo.xoom.http.Response.of;
import static io.vlingo.xoom.http.resource.ResourceBuilder.get;
import static io.vlingo.xoom.http.resource.ResourceBuilder.resource;
public class Bootstrap {
private final static int PORT = 8080;
private static Bootstrap instance;
public final Server server;
public final World world;
private Bootstrap() {
this.world = World.startWithDefaults("hello world example java");
final Resources resources = Resources.are(helloWorldResource());
this.server =
Server.startWith(
world.stage(),
resources,
PORT,
Sizing.define(),
Timing.define());
}
private Resource<?> helloWorldResource() {
return resource("Hello World",
get("/helloworld")
.handle(() -> withSuccess(of(Ok, "Hello, World!")))
);
}
public static final Bootstrap instance() {
if (instance == null) {
instance = new Bootstrap();
}
return instance;
}
public static void main(final String[] args) throws Exception {
System.out.println("=========================================");
System.out.println("service: started at http://localhost:" + Bootstrap.PORT);
System.out.println("try: curl http://localhost:" + Bootstrap.PORT + "/helloworld");
System.out.println("=========================================");
Bootstrap.instance();
}
}
Bootstrap.kt
import io.vlingo.xoom.actors.World
import io.vlingo.xoom.common.Completes.withSuccess
import io.vlingo.xoom.http.Response.of
import io.vlingo.xoom.http.Response.Status.Ok
import io.vlingo.xoom.http.resource.Configuration.Sizing
import io.vlingo.xoom.http.resource.Configuration.Timing
import io.vlingo.xoom.http.resource.Resource
import io.vlingo.xoom.http.resource.ResourceBuilder.get
import io.vlingo.xoom.http.resource.ResourceBuilder.resource
import io.vlingo.xoom.http.resource.Resources
import io.vlingo.xoom.http.resource.Server
class Bootstrap {
private val world = World.startWithDefaults("hello world example kotlin")
private val server: Server
init {
val resources = Resources.are(helloWorldResource())
server = Server.startWith(world.stage(),
resources,
PORT,
Sizing.define(),
Timing.define())
}
private fun helloWorldResource(): Resource<*> {
return resource("hello world resource",
get("/helloworld")
.handle { withSuccess(of(Ok, "Hello World")) })
}
companion object {
const val PORT = 8080
}
}
fun main(args: Array<String>) {
println("=========================================")
println("service: started at http://localhost:" + Bootstrap.PORT)
println("check out http://localhost:" + Bootstrap.PORT + "/helloworld")
println("=========================================")
Bootstrap()
}
Compile and run the server by means of the Java main() method. Check the results by running the following command line.
> curl http://localhost:8080/helloworld
The curl command should print the following text into the terminal window:
Hello, World!
The Server logs informational output by means of the World standard Logger configuration.
The Resources is the set of HTTP endpoints, which in this case is only one. The Server instance is where the Resources are held and used to match and dispatch on requests. To start a Server you simply provide a Stage and the Resources. When created, the Server starts listening on port 8080 for HTTP requests.
Sizing is the configuration parameter with the processor pool size, dispatcher pool size, max buffer pool size, and max message size. Timing is the configuration with the probe interval and timeout parameters. For now, we use the default configuration.
The next sections use the previous code as a starting point.
There are some additional features available through configuration: static file resources, server-sent events, and feed resources.
Static file resources are ordinary content in disk files, such as HTML, images, and video. To serve static file resources use the vlingo-http.properties file-based configuration.
#=====================================
# static file resources
#=====================================
static.files.resource.pool = 5
static.files.resource.root = /siteroot/content
static.files.resource.subpaths = [/, /css, /js, /views]
This configuration auto-creates io.vlingo.xoom.http.resource.StaticFilesResource that serves files from the resource root /siteroot/content directory structure. Any request URI that begins with one of the paths in the static.files.resource.subpaths list will be served.
Virtual URI | Physical URI |
/{file} | /siteroot/content/{file} |
/css/{file} | /siteroot/content/css/{file} |
/js/{file} | /siteroot/content/js/{file} |
/views/{file} | /siteroot/content/views/{file} |
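The mapping in the table above can be expressed as a short function. The following is a sketch only and is not the StaticFilesResource implementation; the class StaticPathSketch is hypothetical. It assumes a file is served only when its directory is exactly one of the configured subpaths, which matches the table but may be stricter than the real resource.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the virtual-to-physical mapping; not StaticFilesResource.
final class StaticPathSketch {
  private final String root;
  private final List<String> subpaths;

  public StaticPathSketch(final String root, final List<String> subpaths) {
    this.root = root;
    this.subpaths = subpaths;
  }

  // Answers the physical path for the request URI, or null when the
  // directory of the URI is not one of the configured subpaths.
  public String physicalPathOf(final String uri) {
    final int lastSlash = uri.lastIndexOf('/');
    if (lastSlash < 0) {
      return null;
    }
    final String directory = lastSlash == 0 ? "/" : uri.substring(0, lastSlash);
    return subpaths.contains(directory) ? root + uri : null;
  }

  public static void main(final String[] args) {
    final StaticPathSketch files =
        new StaticPathSketch("/siteroot/content", Arrays.asList("/", "/css", "/js", "/views"));
    System.out.println(files.physicalPathOf("/index.html"));
    System.out.println(files.physicalPathOf("/css/site.css"));
    System.out.println(files.physicalPathOf("/private/secret.txt")); // not served
  }
}
```

The file names index.html, site.css, and secret.txt are made up for the example; only the root and subpaths come from the configuration shown above.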
You may create a pre-packaged SSE resource using the following configuration.
#=====================================
# server-sent events
#=====================================
sse.stream.name.all = /eventstreams/all
sse.stream.all.feed.class = io.vlingo.xoom.http.sample.user.AllSseFeedActor
sse.stream.all.feed.payload = 50
sse.stream.all.feed.interval = 1000
sse.stream.all.feed.default.id = -1
sse.stream.all.pool = 10
This enables clients to register for a long-lasting stream of events from the server. The URI used by clients is the value of sse.stream.name.{name}. In the above example the name is all and the URL is /eventstreams/all.
The sse.stream.all.feed.class is provided by the custom service/application, and in this case is io.vlingo.xoom.http.sample.user.AllSseFeedActor. The fully-qualified class name must be given. There may be up to 50 events in a single feed (one send to a client) with an interval of 1,000 milliseconds between feeds. If the client does not provide an id for the starting event, the default.id is used. In this case it is -1. There will be 10 total feed instances created in the pool.
A client makes a request to subscribe, such as the following.
Client client = Client.using(Configuration.defaultedKeepAliveExceptFor(...), ...);
Request subscribe =
Request
.method(Method.GET)
.uri("/eventstreams/all")
.and(RequestHeader.host("StreamsRUs.co"))
.and(RequestHeader.accept("text/event-stream;charset=utf-8"));
client
.requestWith(subscribe)
.andThenConsume(response -> {
switch (response.status) {
case Ok:
processEvents(response);
break;
default:
logger().error("Unexpected: " + response.status);
break;
}
})
.repeat();
In the above example the asynchronous io.vlingo.xoom.http.resource.Client will continue to receive responses as feeds occur because it is configured for keep-alive mode and tells the Completes<Response> to repeat() after every feed is received.
The following is a skeleton of class AllSseFeedActor.
public class AllSseFeedActor extends Actor implements SseFeed {
public AllSseFeedActor(final String streamName, final int feedPayload, final String feedDefaultId) {
...
}
@Override
public void to(final Collection<SseSubscriber> subscribers) {
...
}
}
In the above example the custom feed actor is class AllSseFeedActor, which must extend Actor and implement io.vlingo.xoom.http.resource.sse.SseFeed. See the io.vlingo.xoom.http.resource.sse.SseStreamResource that manages the feed generation process, using the custom SseFeed actor when needed.
The feed must follow the SSE standard definition. The following is one example, but not the only possibility.
event: SecurityTokenIssued
data: {"username": "jclifford", "token": "je;se9727anndnds!@"}

event: SecurityTokenIssued
data: {"username": "l.mary", "token": "l9928**)^^322nand$"}

event: SecurityTokenIssued
data: {"username": "camerontyrone", "token": "pdpjehrhfks'//&dh+a"}

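For readers producing such a feed by hand, the wire format can be sketched as follows. The class SseEventSketch is hypothetical and is not part of XOOM HTTP; it only applies the rules of the SSE standard that each line of the payload gets its own data field and that a blank line terminates the event.

```java
// Hypothetical helper showing the SSE wire format; not part of XOOM HTTP.
final class SseEventSketch {
  // Formats one event: an event field, one data field per payload line,
  // and a terminating blank line as required by the SSE standard.
  public static String format(final String event, final String data) {
    final StringBuilder builder = new StringBuilder();
    builder.append("event: ").append(event).append('\n');
    for (final String line : data.split("\n", -1)) {
      builder.append("data: ").append(line).append('\n');
    }
    return builder.append('\n').toString();
  }

  public static void main(final String[] args) {
    System.out.print(format("SecurityTokenIssued", "{\"username\": \"jclifford\"}"));
    System.out.print(format("Note", "first line\nsecond line"));
  }
}
```

A custom SseFeed actor would typically build text of this shape for each event before sending it to its subscribers; how it does so is up to the implementation.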
There is a complete example in the xoom-http-frontservice and xoom-http-backservice in the GitHub repository vlingo/xoom-examples.
There is another kind of feed resource, one that is not an SSE stream, but has similar characteristics. It is a stream of any kind and defines its own response body payload. You might think of these as Atom feeds, or similar.
#=====================================
# feed resources
#=====================================
feed.resource.name.events = /feeds/events
feed.resource.events.producer.class = io.vlingo.xoom.http.resource.feed.EventsFeedProducerActor
feed.resource.events.elements = 20
feed.resource.events.pool = 10
The feed has a name, in this case feed.resource.name.events, which is the URI /feeds/events. Clients may request a feed region by providing an id. The ...elements property indicates that a maximum of 20 elements may be in a given feed. There will be 10 total feed instances created in the pool.
public class EventsFeedProducerActor extends Actor implements FeedProducer {
...
@Override
public void produceFeedFor(final FeedProductRequest request) {
...
}
}
In the above example the custom feed actor is class EventsFeedProducerActor, which must extend Actor and implement io.vlingo.xoom.http.resource.feed.FeedProducer. The format of the feed itself is not standardized, but may be JSON, XML, or otherwise follow the Atom standard. The EventsFeedProducerActor itself may be backed by the XOOM Lattice exchange feed type, which may stream from a XOOM Symbio Journal, or any kind of EntryReader for the storage types ObjectStore and StateStore.
package io.vlingo.xoom.lattice.exchange.feed;
import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.actors.Stage;
import io.vlingo.xoom.symbio.Entry;
import io.vlingo.xoom.symbio.Source;
import io.vlingo.xoom.symbio.store.EntryReader;
/**
* Provides support utilities for {@code Feed} and related types.
* Every {@code Feed} has an {@code exchangeName}.
*/
public interface Feed {
/** The default number of messages per feed. */
static final int DefaultMessagesPerFeedItem = 20;
/**
* Answer a new {@code Feed} with the given properties.
* @param stage the Stage used to create my Feeder
* @param exchangeName the String name of my exchange
* @param feederType the Actor type of my Feeder
* @param entryReaderType the EntryReader that my Feeder uses
* @return Feed
*/
static Feed defaultFeedWith(final Stage stage, final String exchangeName, final Class<? extends Actor> feederType, final EntryReader<?> entryReaderType) {
return new DefaultFeed(stage, exchangeName, feederType, entryReaderType);
}
...
}
The above DefaultFeed is a factory for actor-based Feeder types. Invoking the Feed::feeder() will return a new actor-based Feeder of feederType with the given entryReaderType.
There is a default Feeder, the TextEntryReaderFeeder, that can consume text entries from any XOOM Symbio EntryReader implementation and produce simple feeds.
package io.vlingo.xoom.lattice.exchange.feed;
import java.util.ArrayList;
import java.util.List;
import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.symbio.BaseEntry.TextEntry;
import io.vlingo.xoom.symbio.store.EntryReader;
/**
 * The {@code Feeder} serving {@code TextEntry} instances.
 */
public class TextEntryReaderFeeder extends Actor implements Feeder {
  private final EntryReader<TextEntry> entryReader;
  private final Feed feed;

  /**
   * Construct my default state.
   * @param feed the Feed that I serve
   * @param entryReader the {@code EntryReader<TextEntry>} from which content is read
   */
  public TextEntryReaderFeeder(final Feed feed, final EntryReader<TextEntry> entryReader) {
    this.feed = feed;
    this.entryReader = entryReader;
  }

  /**
   * @see io.vlingo.xoom.lattice.exchange.feed.Feeder#feedItemTo(io.vlingo.xoom.lattice.exchange.feed.FeedItemId, io.vlingo.xoom.lattice.exchange.feed.FeedConsumer)
   */
  @Override
  public void feedItemTo(final FeedItemId feedItemId, final FeedConsumer feedInterest) {
    final long feedId = feedItemId.toLong();
    // Id of the first entry in this feed item; feed item 1 starts at entry 1.
    final long id = (feedId - 1L) * feed.messagesPerFeedItem() + 1;
    entryReader
      .readNext(String.valueOf(id), feed.messagesPerFeedItem())
      .andThen(entries -> {
        feedInterest.consumeFeedItem(toFeedItem(feedItemId, entries));
        return entries;
      });
  }

  /**
   * Answer a new {@code FeedItem} from converted {@code entries}.
   * @param feedItemId the FeedItemId of the current item
   * @param entries the {@code List<TextEntry>} to convert
   * @return FeedItem
   */
  private FeedItem toFeedItem(final FeedItemId feedItemId, final List<TextEntry> entries) {
    final List<FeedMessage> messages = new ArrayList<>(entries.size());
    for (final TextEntry entry : entries) {
      final FeedMessageBody body = FeedMessageBody.with(entry.entryData());
      final FeedMessage message = FeedMessage.with(entry.id(), body, entry.typeName(), entry.typeVersion());
      messages.add(message);
    }
    // A full item is answered as archived, with links to both neighbors;
    // a partially filled item is answered as the current feed item.
    if (feed.messagesPerFeedItem() == entries.size()) {
      return FeedItem.archivedFeedItemWith(feedItemId, feedItemId.next(), feedItemId.previous(), messages);
    } else {
      return FeedItem.currentFeedWith(feedItemId, feedItemId.previous(), messages);
    }
  }
}
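The paging arithmetic used by the feeder may be easier to follow in isolation. The following is a minimal sketch, assuming entry ids are sequential and start at 1, as the computation in feedItemTo() implies. The class FeedItemPaging and its methods are hypothetical names used only for illustration; they are not part of XOOM Lattice.

```java
// Hypothetical helper, not part of XOOM Lattice: it only mirrors the arithmetic
// that TextEntryReaderFeeder applies to feed item ids.
final class FeedItemPaging {
  private FeedItemPaging() { }

  /** Answer the id of the first entry in the feed item, assuming 1-based ids. */
  static long firstEntryId(final long feedItemId, final int messagesPerFeedItem) {
    return (feedItemId - 1L) * messagesPerFeedItem + 1L;
  }

  /** A completely filled item is treated as archived; a partial one as current. */
  static boolean isArchived(final int entriesRead, final int messagesPerFeedItem) {
    return entriesRead == messagesPerFeedItem;
  }

  public static void main(final String[] args) {
    final int perItem = 20; // same value as Feed.DefaultMessagesPerFeedItem
    for (long itemId = 1; itemId <= 3; ++itemId) {
      final long first = firstEntryId(itemId, perItem);
      System.out.println("item " + itemId + " covers entries " + first + ".." + (first + perItem - 1));
    }
    System.out.println("20 entries read, archived: " + isArchived(20, perItem));
    System.out.println("7 entries read, archived: " + isArchived(7, perItem));
  }
}
```

With 20 messages per feed item, item 1 covers entries 1 through 20, item 2 covers 21 through 40, and so on. An item that comes back with fewer than 20 entries is the one still being filled.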
It should be clear how you can stitch together a number of feeds to provide HTTP feeds of event streams:
Response <- http-Feed <- lattice-Feed <- symbio-EntryReader
You can register filters with the Server for both requests and responses. The following is an example of how to start the Server with any number and type of filters:
Filters filters =
  Filters.are(
    Arrays.asList(new DateRequestFilter()),
    Arrays.asList(new DateResponseFilter()));
Server server =
  Server.startWith(
    world.stage(),
    resources,
    filters,
    PORT,
    Sizing.define(),
    Timing.define());
The following is an example of a RequestFilter that is used to ensure that every request has a Date header:
public class DateRequestFilter extends RequestFilter {
  @Override
  public Tuple2<Request, Boolean> filter(final Request request) {
    if (request.headerValueOr(RequestHeader.Date, null) == null) {
      request.header(RequestHeader.Date, Instant.now().toString());
    }
    return Tuple2.from(request, true);
  }
}
The return type of Tuple2<Request, Boolean> provides both the Request result of the filter and a Boolean to indicate whether the filter chain should continue (true) or short circuit (false).
The following ResponseFilter does the same as the previous, but for responses:
public class DateResponseFilter extends ResponseFilter {
  @Override
  public Tuple2<Response, Boolean> filter(final Response response) {
    if (response.headerValueOr(ResponseHeader.Date, null) == null) {
      response.header(ResponseHeader.Date, Instant.now().toString());
    }
    return Tuple2.from(response, true);
  }
}
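The continue-or-short-circuit contract carried by the Boolean can be illustrated without the server. The following is a minimal sketch that assumes filters are applied in registration order and that the result of a short-circuiting filter is the one kept. All types here (FilterChainSketch, Outcome, SketchFilter) are hypothetical stand-ins; a plain String plays the role of the Request, and none of this is the XOOM HTTP implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; these are not XOOM HTTP types.
final class FilterChainSketch {
  /** Stand-in for Tuple2<Request, Boolean>: the filtered value and a continue flag. */
  static final class Outcome {
    final String request;
    final boolean proceed;

    Outcome(final String request, final boolean proceed) {
      this.request = request;
      this.proceed = proceed;
    }
  }

  /** Stand-in for RequestFilter. */
  interface SketchFilter {
    Outcome filter(String request);
  }

  /** Apply filters in order, stopping after the first one that answers false. */
  static String process(final List<SketchFilter> filters, final String request) {
    String current = request;
    for (final SketchFilter filter : filters) {
      final Outcome outcome = filter.filter(current);
      current = outcome.request;
      if (!outcome.proceed) {
        break; // short circuit: later filters never see the request
      }
    }
    return current;
  }

  public static void main(final String[] args) {
    final SketchFilter addDate = r -> new Outcome(r + " +Date", true);
    final SketchFilter stop = r -> new Outcome(r + " +Stopped", false);
    final SketchFilter unreached = r -> new Outcome(r + " +Unreached", true);
    // prints "GET /catalogs +Date +Stopped"
    System.out.println(process(Arrays.asList(addDate, stop, unreached), "GET /catalogs"));
  }
}
```

The third filter never runs because the second one answers false, which is the behavior the Boolean is meant to signal.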
The return value has the same shape as for the RequestFilter, and its Boolean has the same meaning.
There is a special ResponseFilter that supports CORS request-response from cross-origin clients:
final CORSResponseFilter filter = new CORSResponseFilter();
final List<ResponseHeader> headers =
  Arrays.asList(
    ResponseHeader.of(ResponseHeader.AccessControlAllowOrigin, "*"),
    ResponseHeader.of(ResponseHeader.AccessControlAllowHeaders, "Content-Type, Content-Length"),
    ResponseHeader.of(ResponseHeader.AccessControlAllowMethods, "POST,GET"));
filter.originHeadersFor(headerAcceptOriginAny, headers);
final Filters filters = Filters.are(Filters.noRequestFilters(), Arrays.asList(filter));
Server server =
  Server.startWith(
    world.stage(),
    resources,
    filters,
    PORT,
    Sizing.define(),
    Timing.define());
Register the kinds of access control headers that are supported by the receiving application. There must always be an Access-Control-Allow-Origin header, which is matched against the origin sent by the client agent. The matching string may be "*" for any origin or some well-known URI of an acceptable application. The supported headers are as follows, and each has a corresponding constant in class ResponseHeader:
Header | Constant in Class ResponseHeader |
Access-Control-Allow-Origin | AccessControlAllowOrigin |
Access-Control-Allow-Credentials | AccessControlAllowCredentials |
Access-Control-Expose-Headers | AccessControlExposeHeaders |
Access-Control-Max-Age | AccessControlMaxAge |
Access-Control-Allow-Methods | AccessControlAllowMethods |
Access-Control-Allow-Headers | AccessControlAllowHeaders |
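How registered origins might be matched against the origin sent by the client can be sketched as follows. This document does not show the internals of CORSResponseFilter, so the matching rule here (exact match or "*", first registration wins) is an assumption. The class CorsOriginSketch is hypothetical, header values are plain Strings, and the origins are made-up examples.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not the XOOM CORSResponseFilter: the matching rule
// below is an assumption made for illustration.
final class CorsOriginSketch {
  private final Map<String, List<String>> headersByOrigin = new LinkedHashMap<>();

  /** Register the headers to answer for a given origin, or "*" for any origin. */
  void originHeadersFor(final String origin, final List<String> headers) {
    headersByOrigin.put(origin, headers);
  }

  /** Answer the headers of the first registration that matches the request origin. */
  List<String> headersFor(final String requestOrigin) {
    if (requestOrigin == null) {
      return Collections.emptyList(); // no Origin value, so nothing to match against
    }
    for (final Map.Entry<String, List<String>> entry : headersByOrigin.entrySet()) {
      if (entry.getKey().equals("*") || entry.getKey().equals(requestOrigin)) {
        return entry.getValue();
      }
    }
    return Collections.emptyList();
  }

  public static void main(final String[] args) {
    final CorsOriginSketch sketch = new CorsOriginSketch();
    sketch.originHeadersFor("https://shop.example", Arrays.asList(
      "Access-Control-Allow-Origin: https://shop.example",
      "Access-Control-Allow-Methods: POST,GET"));
    System.out.println(sketch.headersFor("https://shop.example"));
    System.out.println(sketch.headersFor("https://other.example"));
  }
}
```

Under this assumed rule, a request from a registered origin receives the registered headers, while a request from any other origin receives none unless "*" was registered.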
As noted earlier, XOOM HTTP provides a special Client used for making HTTP requests and reacting to responses as they happen. Being reactive, this Client is a non-blocking tool. The above example showed how to use a reactive pipeline to both request and react to the eventual response. Here is a similar example that could be used for GET requests and responses:
import io.vlingo.xoom.http.resource.Client;
...
Catalog catalog = Catalog.of(CatalogType.SummerOffers);
Client client = Client.using(configuration, clientConsumerType, poolSize);
client
  .requestWith(
    Request
      .method(Method.GET)