HTTP (REST and RPC)

The embeddable HTTP server for Reactive REST backed by XOOM Actors and XOOM Wire.

Reactive REST

The XOOM HTTP component supports reactive, scalable, and resilient HTTP servers and REST services running as a small, fast, embedded server. Thus, this component does not run standalone, but is meant to provide very lightweight and high-performing HTTP support within an application- or microservice-based runtime, such as a Bounded Context. The server supports REST and RPC dispatching to implementations of fluent APIs.

It is common for developers to think of REST over HTTP in terms of CRUD because the primary methods provided by the protocol are: POST, GET, PUT, and DELETE. If taken at face value, these verbs are CRUD through and through. Yet, almost no one uses the methods to map directly from browser to a database. Most contemporary applications instead map the HTTP methods to procedures (i.e. a method on a Java or C# class) within the application. Those procedures are responsible for adapting path parameters, headers, and the request body to a form that can be consumed by the inner application components.

The procedure to which the HTTP request is mapped need not represent a CRUD operation at all. When you consider that POST, GET, PUT, DELETE, and others such as PATCH, are message categories that map to procedures, not only ways to perform CRUD operations on data, it makes for a lot more flexibility in how HTTP can be used.

For example, a POST method would always be used with the intention to create some entity resource, such as a product; that is, POST is in the procedure category of creation. Yet, there is no reason for the procedure to which that HTTP request is mapped to be named postProduct() or even the "obvious" createProduct(). Instead, the procedure could be named something more in line with the language spoken by the business, such as catalogProduct(). Here catalog is used as a verb; that is, our company is not in the business of creating new products. The products come to the company already created by a manufacturer. Our company sells products through catalogs. For us, products need to be cataloged (verb) for them to be accessible for purchase through a catalog (noun). With that in mind, establishing a new kind of catalog for a kind of product might be thought of as defining a catalog. Thus, the procedure would be named defineCatalog().
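The idea of HTTP methods as message categories that map to business-named procedures can be sketched in a few lines. The following is a hypothetical, standalone illustration and does not use the XOOM HTTP API; the class name CatalogRoutesSketch, the URIs, and the response strings are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: HTTP methods act as message categories that are
// routed to procedures named in the language of the business.
final class CatalogRoutesSketch {
  private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

  CatalogRoutesSketch() {
    // POST belongs to the "creation" category, yet neither procedure is named create*().
    routes.put("POST /catalogs", this::defineCatalog);
    routes.put("POST /catalogs/products", this::catalogProduct);
  }

  // Looks up the procedure registered for the method and URI, if any.
  String dispatch(final String method, final String uri, final String body) {
    final Function<String, String> procedure = routes.get(method + " " + uri);
    return procedure == null ? "404 Not Found" : procedure.apply(body);
  }

  private String defineCatalog(final String catalogName) {
    return "201 Created: catalog defined: " + catalogName;
  }

  private String catalogProduct(final String productName) {
    return "201 Created: product cataloged: " + productName;
  }

  public static void main(final String[] args) {
    final CatalogRoutesSketch sketch = new CatalogRoutesSketch();
    System.out.println(sketch.dispatch("POST", "/catalogs", "Spring Outdoor"));
    System.out.println(sketch.dispatch("POST", "/catalogs/products", "Two-Person Tent"));
  }
}
```

Both routes use POST, but the procedure names carry the business meaning rather than the CRUD meaning.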

Given this line of reasoning, the ways that HTTP methods are mapped are virtually unlimited, all while still honoring the REST way of doing things. The older SOAP protocol, used to implement RPC-based APIs, was known to those who could cut through the jargon as "XML over HTTP." Thus, if XML over HTTP can be used to implement RPC, it stands to reason that JSON over HTTP can be used in the same way.

The following sections demonstrate the flexibility of using HTTP as a means to support far more than CRUD. Avoid pedantic opinions that limit creativity. Such opinions are generally held by those who think only in terms of technology and expect the business to speak their language, which is composed mostly of CRUD and collection-oriented (add, insert, remove) terminology.

Consider the overall architecture of our HTTP server. It is highly Reactive by employing actors at every major operational junction.

REST over HTTP is quite commonly employed to support user interfaces, service integrations, and even distribution of event streams. Of course, we don't suggest that your services should be primarily REST-based; we provide our HTTP server for when REST is useful. Even RPC can be supported. See the above information box. Since REST over HTTP is the most common contemporary use case, the documentation mostly refers to REST rather than RPC, but everything herein is equally applicable to RPC using the style discussed in the above information box.

Use With Domain-Driven Design Context Mapping Patterns

With any use of the XOOM HTTP server, whether with a REST flavor or more as RPC, the use of a few Domain-Driven Design patterns is common. Among the Context Mapping patterns, the Open-Host Service and Published Language patterns naturally support these API styles. The Open-Host Service is the well-defined API that is open for public use, with the possible requirement of client credentials. The Published Language is the REST resources or RPC response values that are provided.

Overview

Our HTTP component can help you create REST-based services rapidly and with great simplicity. One glance at the REST request mappings to Java objects is all it takes to understand this. The following demonstrates both file-based and code-based fluent request handler mappings.

action.user.register.method = POST
action.user.register.uri = /users
action.user.register.to = register(body:sample.user.UserData userData)

action.user.contact.method = PATCH
action.user.contact.uri = /users/{userId}/contact
action.user.contact.to = changeContact(String userId, body:sample.user.ContactData contactData)

The above declarations are made in the vlingo-http.properties file. The following is the resource handler implementation to match it.

public class UserResource extends ResourceHandler {
  public void register(final UserData userData) {
    final User user =
            User.from(
                    Name.from(userData.nameData.given, userData.nameData.family),
                    Contact.from(userData.contactData.emailAddress, userData.contactData.telephoneNumber));

    repository.save(user);
    
    completes().with(Response.of(Created, headers(of(Location, userLocation(user.id))), serialized(UserData.from(user))));
  }

  public void changeContact(final String userId, final ContactData contactData) {
    final User user = repository.userOf(userId);
    if (user.doesNotExist()) {
      completes().with(Response.of(NotFound, userLocation(userId)));
      return;
    }
   
    final User changedUser = user.withContact(new Contact(contactData.emailAddress, contactData.telephoneNumber));
    
    repository.save(changedUser);
   
    completes().with(Response.of(Ok, serialized(UserData.from(changedUser))));
  }
}
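To make the link between an action's "to" declaration and the handler method concrete, the following standalone sketch splits such a declaration into a method name and its parameters. It is a hypothetical parser written only for illustration, not the one used by XOOM HTTP; the class ActionSignatureSketch and its Parameter type are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser, not the XOOM HTTP implementation: splits an
// action "to" declaration into a method name and its parameters.
final class ActionSignatureSketch {
  static final class Parameter {
    final String type;
    final String name;
    final boolean fromBody;

    Parameter(final String type, final String name, final boolean fromBody) {
      this.type = type;
      this.name = name;
      this.fromBody = fromBody;
    }
  }

  static String methodNameOf(final String to) {
    return to.substring(0, to.indexOf('(')).trim();
  }

  static List<Parameter> parametersOf(final String to) {
    final String inside = to.substring(to.indexOf('(') + 1, to.lastIndexOf(')')).trim();
    final List<Parameter> parameters = new ArrayList<>();
    if (inside.isEmpty()) return parameters;

    for (final String declaration : inside.split(",")) {
      final String[] typeAndName = declaration.trim().split("\\s+");
      // The body: prefix marks the parameter that is deserialized from the request body.
      final boolean fromBody = typeAndName[0].startsWith("body:");
      final String type = fromBody ? typeAndName[0].substring("body:".length()) : typeAndName[0];
      parameters.add(new Parameter(type, typeAndName[1], fromBody));
    }
    return parameters;
  }

  public static void main(final String[] args) {
    final String to = "changeContact(String userId, body:sample.user.ContactData contactData)";
    System.out.println(methodNameOf(to));
    for (final Parameter parameter : parametersOf(to)) {
      System.out.println(parameter.type + " " + parameter.name + " fromBody=" + parameter.fromBody);
    }
  }
}
```

The parsed method name and parameter order correspond to changeContact(String, ContactData) in the handler class above.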

By means of the ResourceHandler base class, there are several Request parts available: URI, headers, body, and any query parameters. You may also access the default ContentType, which may be overridden. See the following queryUsers() handler method.

  public Completes<Response> queryUsers() {
    // valuesOf() answers all values given for the named query parameter
    final List<String> pages = context().request().queryParameters().valuesOf("page");
    final String contentType = context().contentType();
    ...
  }

The above is from file-based request/response resource mappings defined in the properties file vlingo-http.properties. Yet, our versatile API also supports fluent mappings in source code.

public class UserResource {
  ...
  public Resource<?> routes() {
    return resource("User Resource",
      post("/users")
        .body(UserData.class)
        .handle(this::register),
      patch("/users/{userId}/contact")
        .param(String.class)
        .body(ContactData.class)
        .handle(this::changeContact));
  }
}

In the above example only the routes() method is shown; it would replace the file-based route mappings previously shown. Also note that when using the fluent routes API your resource class should not extend ResourceHandler. Although you may do so, at time of execution its state will be hollow because the fluent API runtime does not support ResourceHandler. Instead, to get similar base class state and behavior with dynamic resources created with the fluent API, use the optional DynamicResourceHandler.

public class UserResource extends DynamicResourceHandler {
  private static final AtomicInteger nextId = new AtomicInteger(0);
  private final int id;
  
  public UserResource(final Stage stage) {
    super(stage);
    
    this.id = nextId.incrementAndGet();
    
    logger().info("UserResource: Pooled instance created: " + this.id);
  }
  
  public Completes<Response> register(final UserData data) {
    // valuesOf() answers all values given for the named query parameter
    final List<String> userTypes = context().request().queryParameters().valuesOf("userType");
    final String contentType = context().contentType();
    ...
  }
  
  ...
  @Override
  public Resource<?> routes() {
    logger().info("UserResource: wiring resources for: " + this.id);
    
    return resource("User Resource", this,
      post("/users")
        .body(UserData.class)
        .handle(this::register),
      patch("/users/{userId}/contact")
        .param(String.class)
        .body(ContactData.class)
        .handle(this::changeContact));
  }
}

By extending DynamicResourceHandler your resource may use the Stage managing this resource, the Logger and Scheduler of that Stage, and the Context with the current Request. The Request provides various request parts such as the URI, headers, body, and any query parameters. You may also access the default ContentType, which may be overridden. See the register(UserData) handler method for an example accessing the query parameters and ContentType.

Query parameters may also be mapped more naturally using the fluent API.

The concrete resource handler must provide a Stage to the DynamicResourceHandler constructor. It is assumed that the concrete resource handler's constructor will take at least the Stage as a parameter. The DynamicResourceHandler declares the routes() method abstract, so it must be overridden in the concrete extender. As you can see above, routes() returns resource() with an optional parameter of the DynamicResourceHandler as the this object, which is to be given the current request Context before each handler invocation.

In the following sections you will learn how to quickly set up and start your server with resource handlers.

Setting Up the Server

If you will use the file-based configuration you must create vlingo-http.properties. Otherwise you will write source code to define the server configuration. First the file-based configuration is explained, and then the source code and fluent API approach.

File-Based Configuration

Create the file vlingo-http.properties and place it into the project directory hierarchy used for resources. For Maven that would be src/main/resources for production and if you are defining the properties for test, it would be src/test/resources. In this file create the following properties. Your values may differ.

#=====================================
# server
#=====================================

server.http.port = 8080
server.dispatcher.pool = 10
server.buffer.pool.size = 100
server.message.buffer.size = 65535
server.probe.interval = 10
server.probe.timeout = 2
server.processor.pool.size = 10
server.request.missing.content.timeout = 100

#=====================================
# generated resource dispatchers
#=====================================

resource.dispatcher.generated.sources.main = target/generated-sources/
resource.dispatcher.generated.sources.test = target/generated-test-sources/
resource.dispatcher.generated.classes.main = target/classes/
resource.dispatcher.generated.classes.test = target/test-classes/

This is the basic minimum configuration necessary to start the server. There are other properties that you will learn about later. The following summarizes these properties.

  1. server.http.port: the socket port to be used by the server, which here is 8080.

  2. server.dispatcher.pool: the server uses a pool of actors to dispatch incoming requests asynchronously. Here the dispatcher pool size is 10.

  3. server.buffer.pool.size: used by the server to create a pool of reusable ByteBuffer instances to use for incoming requests and outgoing resources. There will be at least 100 buffers in this pool. Yet, the elastic pool design enables it to grow dynamically under heavy load and contract back down to 100 buffers as load diminishes.

  4. server.message.buffer.size: used by the server when creating the buffer pool to allocate each buffer with this many bytes. Here the buffers will each be 65,535 bytes. (a) This does not limit the overall size of a given incoming message because these may be read in chunks and span multiple buffers. (b) This also does not limit the size of outgoing responses because a non-pooled buffer will be temporarily allocated to send payloads larger than this maximum. Serving larger responses should be the exception rather than the rule; otherwise performance will suffer. If large responses are common, consider setting this buffer size to the largest common payload size.

  5. server.probe.interval: determines the number of milliseconds between each socket channel probe to receive new connections and requests, and to send new responses. Be careful with this value as various O/S and JDKs deal differently with intervals, possibly being too fast or too slow.

  6. server.probe.timeout: the amount of time in milliseconds that the socket channel probe will wait for new connections and requests, and to check for writable status used for sending responses. Be careful with this value as when there are no new requests, it causes the actor's thread to block inside the socket channel probe until this timeout is reached.

  7. server.processor.pool.size: used by the server to size the pool of socket channel processors. Here the processor pool size is 10. There will be 10 total actors created and used in round-robin order as new connections are accepted, which will each read from and write to every newly accepted client socket channel connection.

  8. server.request.missing.content.timeout: as previously indicated, very large incoming request messages of byte length greater than server.message.buffer.size will require spanning two or more total buffers. This value indicates how long the incomplete message will be retained in the server before it is considered a bad request (missing bytes). This example indicates that such an incomplete request may be retained for a maximum of 100 milliseconds in anticipation of remaining bytes being received.

  9. resource.dispatcher.generated.sources.???: These four properties define where generated source code is to be saved as Java source files and where class files are saved after dynamic compilation. This example uses the Maven target layout and defines both main and test areas.
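The relationship between server.message.buffer.size and a request that spans multiple buffers comes down to ceiling division. The following sketch shows only that arithmetic, assuming incoming bytes fill fixed-size buffers back to back; the server's actual chunking may differ, and the class BufferSpanSketch is hypothetical.

```java
// Hypothetical arithmetic only; the server's actual chunking may differ.
final class BufferSpanSketch {
  static int buffersFor(final int messageBytes, final int bufferSize) {
    // Ceiling division: a partially filled last buffer still counts as one buffer.
    return (messageBytes + bufferSize - 1) / bufferSize;
  }

  public static void main(final String[] args) {
    final int bufferSize = 65_535; // server.message.buffer.size
    System.out.println(buffersFor(65_535, bufferSize));
    System.out.println(buffersFor(65_536, bufferSize));
    System.out.println(buffersFor(200_000, bufferSize));
  }
}
```

Under this assumption a 200,000-byte request spans four 65,535-byte buffers, so any bytes still missing when server.request.missing.content.timeout elapses would cause the request to be treated as bad.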

In addition to this, it only makes sense to include the description of at least one resource handler (a.k.a. endpoint or controller).

#=====================================
# user resources
#=====================================

resource.name.user = [register, contact, name, queryUser, queryUsers, queryUserError]

resource.user.handler = io.vlingo.xoom.http.sample.user.UserResource
resource.user.pool = 10
resource.user.disallowPathParametersWithSlash = true

action.user.register.method = POST
action.user.register.uri = /users
action.user.register.to = register(body:io.vlingo.xoom.http.sample.user.UserData userData)

action.user.contact.method = PATCH
action.user.contact.uri = /users/{userId}/contact
action.user.contact.to = changeContact(String userId, body:io.vlingo.xoom.http.sample.user.ContactData contactData)

action.user.name.method = PATCH
action.user.name.uri = /users/{userId}/name
action.user.name.to = changeName(String userId, body:io.vlingo.xoom.http.sample.user.NameData nameData)

action.user.queryUser.method = GET
action.user.queryUser.uri = /users/{userId}
action.user.queryUser.to = queryUser(String userId)

action.user.queryUsers.method = GET
action.user.queryUsers.uri = /users
action.user.queryUsers.to = queryUsers()

action.user.queryUserError.method = GET
action.user.queryUserError.uri = /user/{userId}/error
action.user.queryUserError.to = queryUserError(String userId)

This is a more complete resource routing definition. The properties are summarized next.

  1. resource.name.{name}: The first property of any resource handler is the name, which above is resource.name.user. This says, here is a resource named user. The property's value is an array of method names that are used as individual route request handlers. Each of these names must have a corresponding set of action.{name}... properties, which are documented in the following numbered descriptions. If you do not list a name in this array for every route request handler, the corresponding handler action definition will not be found for the missing name.

  2. All properties associated with the named resource are in the form resource.{name}.{property}, such as resource.user.handler.

  3. resource.{name}.handler: the fully-qualified class name of the concrete ResourceHandler extender. The server will dispatch requests matching the actions' URI patterns to the methods in this ResourceHandler. The above example references the UserResource, along with its package name.

  4. resource.{name}.pool: used to create a pool of actors for ResourceHandler instances of the type defined by the resource.{name}.handler property. Individual requests are handled in a round-robin fashion. In this example the resource.user.pool defines a pool size of 10. Note that this pool is created for each dispatcher in server.dispatcher.pool, meaning that there will be a total of server.dispatcher.pool * resource.{name}.pool actors to handle requests to this named resource (10 * 10 = 100 in this example).

  5. resource.{name}.disallowPathParametersWithSlash: deprecated and must always be true. Above is for the resource named user, as in resource.user.disallow...

  6. action.{name}.{endpoint}.method: used to define the HTTP method to be used to make a matching request to the given URI. Here {name} and {endpoint} are placeholders for the actual names. In the first example above {name} is user and the {endpoint} method is register. Note that this property's value may be one of: POST, GET, PUT, PATCH, DELETE, HEAD, TRACE, OPTIONS, or CONNECT.

  7. action.{name}.{endpoint}.uri: used to define the URI that will map to the given endpoint. Here {name} and {endpoint} are placeholders for the actual names. In the first example above {name} is user and the {endpoint} method is register. Path parameters, if any, are surrounded by curly braces, such as in /users/{userId}/contact, where the parameter name is userId. In the actual URI, userId must be replaced with some sort of identity, such as may be mapped to a String value.

  8. action.{name}.{endpoint}.to: used to define the method name and signature to which this route mapping will dispatch on the given resource.{name}.handler class instance. Here {name} and {endpoint} are placeholders for the actual names. In the first example above {name} is user and the {endpoint} method is register. When the HTTP method is POST and the URI is /users then the match will route to the given Java method, such as register(UserData userData). In the above example the body: keyword indicates that the UserData will be found in the request body. Depending on the content type, which is by default JSON, the body will be automatically deserialized into an instance of the given type, such as UserData. Note that when the URI contains one or more path parameters, the {paramName} will be mapped to the matching Java method parameter with the same name, and automatically deserialized into the given Java parameter type. In the above example the /users/{userId}/contact maps to the Java method parameter String userId.
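The properties above can be explored with nothing more than java.util.Properties. The following sketch reads a trimmed-down copy of the configuration, extracts the action names from the resource.name.user array, and computes the total number of handler actors described in item 4. It is a hypothetical reader written for illustration, not the loader used by the server; the class ResourcePropertiesSketch and its method names are assumptions.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical reader, not the server's own configuration loader.
final class ResourcePropertiesSketch {
  static Properties load(final String text) {
    final Properties properties = new Properties();
    try {
      properties.load(new StringReader(text));
    } catch (IOException e) {
      throw new IllegalStateException("Cannot read properties", e);
    }
    return properties;
  }

  // Parses a value such as "[register, contact]" into its element names.
  static List<String> actionNames(final Properties properties, final String resource) {
    final String raw = properties.getProperty("resource.name." + resource, "[]").trim();
    final List<String> names = new ArrayList<>();
    for (final String name : raw.substring(1, raw.length() - 1).split(",")) {
      if (!name.trim().isEmpty()) names.add(name.trim());
    }
    return names;
  }

  // One handler pool is created per dispatcher, so the totals multiply.
  static int totalHandlerActors(final Properties properties, final String resource) {
    final int dispatchers = Integer.parseInt(properties.getProperty("server.dispatcher.pool").trim());
    final int handlers = Integer.parseInt(properties.getProperty("resource." + resource + ".pool").trim());
    return dispatchers * handlers;
  }

  static String sampleText() {
    return String.join("\n",
        "server.dispatcher.pool = 10",
        "resource.name.user = [register, contact]",
        "resource.user.pool = 10",
        "action.user.register.method = POST",
        "action.user.register.uri = /users");
  }

  public static void main(final String[] args) {
    final Properties properties = load(sampleText());
    for (final String action : actionNames(properties, "user")) {
      System.out.println(action + " -> " + properties.getProperty("action.user." + action + ".uri"));
    }
    System.out.println("handler actors: " + totalHandlerActors(properties, "user"));
  }
}
```

In this trimmed-down sample the contact action is listed in the array but has no action.user.contact.* properties, so its URI lookup comes back null. That mirrors the rule in item 1 that every listed name needs a matching set of action properties.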

When using the file-based configuration there is no limit to the number of path parameters. However, the greater the number of parameters the more complex the Java method will be to create and maintain.
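How a URI pattern with path parameters relates to an actual request URI can be shown with a simple segment-by-segment comparison. This is a hypothetical matcher for illustration only and is not how the server resolves routes internally; it assumes parameter values never contain a slash, and the class UriTemplateSketch is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical matcher: compares a URI pattern and an actual URI segment by segment.
final class UriTemplateSketch {
  static Optional<Map<String, String>> match(final String template, final String uri) {
    final String[] expected = template.split("/");
    final String[] actual = uri.split("/");
    if (expected.length != actual.length) return Optional.empty();

    final Map<String, String> parameters = new LinkedHashMap<>();
    for (int idx = 0; idx < expected.length; ++idx) {
      final String segment = expected[idx];
      if (segment.startsWith("{") && segment.endsWith("}")) {
        // A {name} segment captures whatever the actual URI holds at this position.
        parameters.put(segment.substring(1, segment.length() - 1), actual[idx]);
      } else if (!segment.equals(actual[idx])) {
        return Optional.empty();
      }
    }
    return Optional.of(parameters);
  }

  public static void main(final String[] args) {
    System.out.println(match("/users/{userId}/contact", "/users/42/contact"));
    System.out.println(match("/users/{userId}/contact", "/users/42/name"));
  }
}
```

Each captured value would then be converted to the Java type of the method parameter with the same name, such as String userId.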

It is quite simple to start a server from file-based configuration.

final World world = World.startWithDefaults("server");

final Server server = Server.startWith(world.stage());

Using the above examples, this Server is started on port 8080 and has a single resource handler, UserResource. The remaining properties are applied to the server as previously explained.

The ResourceHandler and DynamicResourceHandler base classes provide access to several request parts, such as headers, query parameters, and other environmental objects.

Provided By ResourceHandler and DynamicResourceHandler:

| Type Accessor | Description |
| --- | --- |
| Context context() | The Context type provides numerous details. This contains the Request, which in turn contains Method, URL, Version, Headers, and Body. This gives you access to all parts of the request. |
| ContentType contentType() | The details about the content type of the request. (only on ResourceHandler) |
| Logger logger() | The means to log debug, error, and informational messages. |
| Scheduler scheduler() | Use to manage the scheduling of future tasks. |
| Stage stage() | The Stage of this request. |

Source Code Configuration

The following provides a very simple example of source code configuration along with the XOOM HTTP fluent route mapping API. This provides the minimum code to create an endpoint that responds with "Hello, World!". Both Java and Kotlin examples are available. The tutorial descriptions refer to the Java code.

Create a new project with your favorite editor/IDE and create a Bootstrap class with the following content.

Bootstrap.java
import io.vlingo.xoom.actors.World;
import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.http.Response;
import io.vlingo.xoom.http.resource.Configuration.Sizing;
import io.vlingo.xoom.http.resource.Configuration.Timing;
import io.vlingo.xoom.http.resource.Resource;
import io.vlingo.xoom.http.resource.Resources;
import io.vlingo.xoom.http.resource.Server;

import static io.vlingo.xoom.http.resource.ResourceBuilder.get;
import static io.vlingo.xoom.http.resource.ResourceBuilder.resource;

public class Bootstrap {
  private final static int PORT = 8080;
  private static Bootstrap instance;
    
  public final Server server;
  public final World world;

  private Bootstrap() {
    this.world = World.startWithDefaults("hello world example java");

    final Resources resources = Resources.are(helloWorldResource());

    this.server =
            Server.startWith(
                world.stage(),
                resources,
                PORT,
                Sizing.define(),
                Timing.define());
  }

  // Defines the single GET /helloworld route using the fluent API.
  private Resource<?> helloWorldResource() {
    return resource("Hello World",
      get("/helloworld")
        .handle(() -> Completes.withSuccess(Response.of(Response.Status.Ok, "Hello, World!")))
      );
  }

  // Lazily creates the one Bootstrap, which starts the World and the Server.
  public static final Bootstrap instance() {
    if (instance == null) {
      instance = new Bootstrap();
    }
    return instance;
  }

  public static void main(final String[] args) throws Exception {
    Bootstrap.instance();
    System.out.println("=========================================");
    System.out.println("service: started at http://localhost:" + Bootstrap.PORT);
    System.out.println("try: curl http://localhost:" + Bootstrap.PORT + "/helloworld");
    System.out.println("=========================================");
  }
}

Compile and run the server by means of the Java main() method. Check the results by running the following command.

> curl http://localhost:8080/helloworld

The curl command should print the following text into the terminal window: Hello, World!

The Server logs informational output by means of the World standard Logger configuration.

The Resources (line 23 of Bootstrap.java) is the set of HTTP endpoints, which in this case is only one. The Server instance (line 25) is where the Resources are held and used to match and dispatch on requests. To start a Server you simply provide a Stage and the Resources. When created, the Server starts listening on port 8080 for HTTP requests.

Sizing (line 30) is the configuration parameter with the processor pool size, dispatcher pool size, max buffer pool size, and max message size. Timing (line 31) is the configuration with the probe interval and timeout parameters. For now, we use the default configuration.

The next sections use the previous code as starting point.

Configuring Special Features

There are some additional features available through configuration: static file resources, server-sent events, and feed resources.

Static File Resources

Static file resources are ordinary content in disk files, such as HTML, images, and video. To serve static file resources use the vlingo-http.properties file-based configuration.

#=====================================
# static file resources
#=====================================

static.files.resource.pool = 5
static.files.resource.root = /siteroot/content
static.files.resource.subpaths = [/, /css, /js, /views]

This configuration auto-creates io.vlingo.xoom.http.resource.StaticFilesResource, which serves files from the /siteroot/content resource root directory structure. Any request URI that begins with one of the paths listed in static.files.resource.subpaths will be served.

| Virtual URI | Physical URI |
| --- | --- |
| /{file} | /siteroot/content/{file} |
| /css/{file} | /siteroot/content/css/{file} |
| /js/{file} | /siteroot/content/js/{file} |
| /views/{file} | /siteroot/content/views/{file} |
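The virtual-to-physical mapping can be expressed as a small function. The following is a hypothetical sketch rather than the StaticFilesResource implementation; it assumes that the directory part of the request URI must exactly equal one of the configured subpaths, which is one reading of the mapping above. The class StaticFileMappingSketch is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical mapping; assumes the URI's directory must equal a configured subpath.
final class StaticFileMappingSketch {
  static Optional<String> physicalPathOf(final String root, final List<String> subpaths, final String uri) {
    final int lastSlash = uri.lastIndexOf('/');
    if (lastSlash < 0) return Optional.empty();

    // "/index.html" lives in "/", while "/css/site.css" lives in "/css".
    final String directory = lastSlash == 0 ? "/" : uri.substring(0, lastSlash);
    return subpaths.contains(directory) ? Optional.of(root + uri) : Optional.empty();
  }

  public static void main(final String[] args) {
    final String root = "/siteroot/content";                               // static.files.resource.root
    final List<String> subpaths = Arrays.asList("/", "/css", "/js", "/views"); // static.files.resource.subpaths
    System.out.println(physicalPathOf(root, subpaths, "/index.html"));
    System.out.println(physicalPathOf(root, subpaths, "/css/site.css"));
    System.out.println(physicalPathOf(root, subpaths, "/images/logo.png"));
  }
}
```

The file names used here, such as index.html and site.css, are placeholders for whatever content exists under the resource root.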

Server-Sent Events (SSE)

You may create a pre-packaged SSE resource using the following configuration.

#=====================================
# server-sent events
#=====================================

sse.stream.name.all = /eventstreams/all
sse.stream.all.feed.class = io.vlingo.xoom.http.sample.user.AllSseFeedActor
sse.stream.all.feed.payload = 50
sse.stream.all.feed.interval = 1000
sse.stream.all.feed.default.id = -1
sse.stream.all.pool = 10

This enables clients to register for a long-lasting stream of events from the server. The URI used by clients is the value of sse.stream.name.{name}. In the above example the name is all and the URI is /eventstreams/all.

The sse.stream.all.feed.class is provided by the custom service/application, and in this case is io.vlingo.xoom.http.sample.user.AllSseFeedActor. The fully-qualified class name must be given. There may be up to 50 events in a single feed (one send to a client) with an interval of 1,000 milliseconds between feeds. If the client does not provide an id for the starting event, the default.id is used. In this case it is -1. There will be 10 total feed instances created in the pool.

A client makes a request to subscribe, such as the following.

Client client = Client.using(Configuration.defaultedKeepAliveExceptFor(...), ...);

Request subscribe =
    Request
        .method(Method.GET)
        .uri("/eventstreams/all")
        .and(RequestHeader.host("StreamsRUs.co"))
        .and(RequestHeader.accept("text/event-stream;charset=utf-8"));

client
    .requestWith(subscribe)
    .andThenConsume(response -> {
        switch (response.status) {
        case Ok:
          processEvents(response);
          break;
        default:
          logger().error("Unexpected: " + response.status);
          break;
        }
    })
    .repeat();

In the above example the asynchronous io.vlingo.xoom.http.resource.Client will continue to receive responses as feeds occur because it is configured for keep-alive mode and tells the Completes<Response> to repeat() after every feed is received.

The following is a skeleton of class AllSseFeedActor.

public class AllSseFeedActor extends Actor implements SseFeed {

  public AllSseFeedActor(final String streamName, final int feedPayload, final String feedDefaultId) {
    ...
  }

  @Override
  public void to(final Collection<SseSubscriber> subscribers) {
    ...
  }
}

In the above example the custom feed actor is class AllSseFeedActor, which must extend Actor and implement io.vlingo.xoom.http.resource.sse.SseFeed. See the io.vlingo.xoom.http.resource.sse.SseStreamResource that manages the feed generation process, using the custom SseFeed actor when needed.

The feed must follow the SSE standard definition. The following is one example, but not the only possibility.

event: SecurityTokenIssued
data: {"username": "jclifford", "token": "je;se9727anndnds!@"}

event: SecurityTokenIssued
data: {"username": "l.mary", "token": "l9928**)^^322nand$"}

event: SecurityTokenIssued
data: {"username": "camerontyrone", "token": "pdpjehrhfks'//&dh+a"}
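The text format shown above follows the SSE field rules: each event is a sequence of field: value lines and is terminated by a blank line, and multi-line data is sent as one data: line per line of content. The following standalone sketch produces that format; the class SseEventSketch is hypothetical and is not part of the XOOM HTTP SSE support.

```java
// Hypothetical formatter that writes one event in the SSE text format.
final class SseEventSketch {
  static String format(final String id, final String event, final String data) {
    final StringBuilder builder = new StringBuilder();
    if (id != null) {
      builder.append("id: ").append(id).append('\n');
    }
    builder.append("event: ").append(event).append('\n');
    // Every line of the payload gets its own data: field.
    for (final String line : data.split("\n", -1)) {
      builder.append("data: ").append(line).append('\n');
    }
    // A blank line terminates the event.
    return builder.append('\n').toString();
  }

  public static void main(final String[] args) {
    System.out.print(format(null, "SecurityTokenIssued", "{\"username\": \"jclifford\"}"));
    System.out.print(format("2", "SecurityTokenIssued", "{\"username\": \"l.mary\"}"));
  }
}
```

A custom SseFeed implementation could build its payloads in a similar way before sending them to its subscribers.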

There is a complete example in the xoom-http-frontservice and xoom-http-backservice in the GitHub repository vlingo/xoom-examples.

Feed Resources

There is another kind of feed resource, one that is not an SSE stream but has similar characteristics. It is a stream of any kind and defines its own response body payload. Think of Atom feeds or similar.

#=====================================
# feed resources
#=====================================

feed.resource.name.events = /feeds/events
feed.resource.events.producer.class = io.vlingo.xoom.http.resource.feed.EventsFeedProducerActor
feed.resource.events.elements = 20
feed.resource.events.pool = 10

The feed has a name, in this case declared by feed.resource.name.events, whose value is the URI /feeds/events. Clients may request a feed region by providing an id. The ...elements property indicates that at most 20 elements may be in a given feed. There will be 10 total feed instances created in the pool.

public class EventsFeedProducerActor extends Actor implements FeedProducer {
  ...
  @Override
  public void produceFeedFor(final FeedProductRequest request) {
    ...
  }
}

In the above example the custom feed actor is class EventsFeedProducerActor, which must extend Actor and implement io.vlingo.xoom.http.resource.feed.FeedProducer. The format of the feed itself is not standardized, but may be JSON, XML, or otherwise follow the Atom standard. The EventsFeedProducerActor itself may be backed by the XOOM Lattice exchange feed type, which may stream from a XOOM Symbio Journal, or any kind of EntryReader for the storage types ObjectStore and StateStore.

package io.vlingo.xoom.lattice.exchange.feed;

import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.actors.Stage;
import io.vlingo.xoom.symbio.Entry;
import io.vlingo.xoom.symbio.Source;
import io.vlingo.xoom.symbio.store.EntryReader;

/**
 * Provides support utilities for {@code Feed} and related types.
 * Every {@code Feed} has an {@code exchangeName}.
 */
public interface Feed {
  /** The default number of messages per feed. */
  static final int DefaultMessagesPerFeedItem = 20;

  /**
   * Answer a new {@code Feed} with the given properties.
   * @param stage the Stage used to create my Feeder
   * @param exchangeName the String name of my exchange
   * @param feederType the Actor type of my Feeder
   * @param entryReaderType the EntryReader that my Feeder uses
   * @return Feed
   */
  static Feed defaultFeedWith(final Stage stage, final String exchangeName, final Class<? extends Actor> feederType, final EntryReader<?> entryReaderType) {
    return new DefaultFeed(stage, exchangeName, feederType, entryReaderType);
  }

  ...
}

The above DefaultFeed is a factory for actor-based Feeder types. Invoking the Feed::feeder() will return a new actor-based Feeder of feederType with the given entryReaderType.

There is a default Feeder, the TextEntryReaderFeeder, that can consume text entries from any XOOM Symbio EntryReader implementation and produce simple feeds.

package io.vlingo.xoom.lattice.exchange.feed;

import java.util.ArrayList;
import java.util.List;

import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.symbio.BaseEntry.TextEntry;
import io.vlingo.xoom.symbio.store.EntryReader;

/**
 * The {@code Feeder} serving {@code TextEntry} instances.
 */
public class TextEntryReaderFeeder extends Actor implements Feeder {
  private final EntryReader<TextEntry> entryReader;
  private final Feed feed;

  /**
   * Construct my default state.
   * @param feed the Feed that I serve
   * @param entryReader the {@code EntryReader<TextEntry>} from which content is read
   */
  public TextEntryReaderFeeder(final Feed feed, final EntryReader<TextEntry> entryReader) {
    this.feed = feed;
    this.entryReader = entryReader;
  }

  /**
   * @see io.vlingo.xoom.lattice.exchange.feed.Feeder#feedItemTo(io.vlingo.xoom.lattice.exchange.feed.FeedItemId, io.vlingo.xoom.lattice.exchange.feed.FeedConsumer)
   */
  @Override
  public void feedItemTo(final FeedItemId feedItemId, final FeedConsumer feedInterest) {
    final long feedId = feedItemId.toLong();
    final long id = (feedId - 1L) * feed.messagesPerFeedItem() + 1;

    entryReader
      .readNext(String.valueOf(id), feed.messagesPerFeedItem())
      .andThen(entries -> {
        feedInterest.consumeFeedItem(toFeedItem(feedItemId, entries));
        return entries;
      });
  }

  /**
   * Answer a new {@code FeedItem} from converted {@code entries}.
   * @param feedItemId the FeedItemId of the current item
   * @param entries the List<TextEntry> to convert
   * @return FeedItem
   */
  private FeedItem toFeedItem(final FeedItemId feedItemId, final List<TextEntry> entries) {
    final List<FeedMessage> messages = new ArrayList<>(entries.size());
    for (final TextEntry entry : entries) {
      final FeedMessageBody body = FeedMessageBody.with(entry.entryData());
      final FeedMessage message = FeedMessage.with(entry.id(), body, entry.typeName(), entry.typeVersion());
      messages.add(message);
    }

    if (feed.messagesPerFeedItem() == entries.size()) {
      return FeedItem.archivedFeedItemWith(feedItemId, feedItemId.next(), feedItemId.previous(), messages);
    } else {
      return FeedItem.currentFeedWith(feedItemId, feedItemId.previous(), messages);
    }
  }
}

It should be clear how you can stitch together a number of feeds to provide HTTP feeds of event streams: Response <- http-Feed <- lattice-Feed <- symbio-EntryReader
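
The paging arithmetic in TextEntryReaderFeeder can be isolated in a few lines. The following is a minimal sketch, not part of XOOM Lattice: the class FeedPaging and its method names are hypothetical and exist only to illustrate how a 1-based feed item id maps to the first entry id, and how a full page is distinguished from the current (partial) one.

```java
// Hypothetical helper mirroring the arithmetic used by TextEntryReaderFeeder.
final class FeedPaging {
  private FeedPaging() { }

  /** First entry id covered by the feed item with the given 1-based id. */
  static long firstEntryId(final long feedItemId, final int messagesPerFeedItem) {
    return (feedItemId - 1L) * messagesPerFeedItem + 1;
  }

  /** A full page is treated as archived; a partial page is the current one. */
  static boolean isArchived(final int entriesRead, final int messagesPerFeedItem) {
    return entriesRead == messagesPerFeedItem;
  }

  public static void main(final String[] args) {
    final int perItem = 20; // same value as Feed.DefaultMessagesPerFeedItem
    System.out.println(firstEntryId(1, perItem)); // prints 1
    System.out.println(firstEntryId(3, perItem)); // prints 41
    System.out.println(isArchived(7, perItem));   // prints false
  }
}
```

With 20 messages per feed item, item 3 therefore starts at entry 41, and a read that returns fewer than 20 entries marks the item as the current feed rather than an archived one.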

Request and Response Filters

You can register filters with the Server for both requests and responses. The following is an example of how to start the Server with any number and type of filters:

Filters filters =
    Filters.are(
        Arrays.asList(new DateRequestFilter()),
        Arrays.asList(new DateResponseFilter()));

Server server =
    Server.startWith(
        world.stage(),
        resources,
        filters,
        PORT,
        Sizing.define(),
        Timing.define());

The following is an example of a RequestFilter that is used to ensure that every request has a Date header:

public class DateRequestFilter extends RequestFilter {
  @Override
  public Tuple2<Request, Boolean> filter(final Request request) {
    if (request.headerValueOr(RequestHeader.Date, null) == null) {
      request.header(RequestHeader.Date, Instant.now().toString());
    }
    return Tuple2.from(request, true);
  }
}

The return type of Tuple2<Request, Boolean> provides both the Request results of the filter and a Boolean to indicate whether the filter chain should continue (true) or short circuit (false).
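
The continue-or-short-circuit contract can be pictured with a small, self-contained chain. This is only a sketch under assumptions: FilterChainSketch and its nested Result class are hypothetical stand-ins for the filter types and Tuple2, and a plain String stands in for the Request.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins; these are not the XOOM RequestFilter or Tuple2 types.
final class FilterChainSketch {
  /** Pairs the (possibly modified) value with a continue flag. */
  static final class Result {
    final String value;
    final boolean proceed;

    Result(final String value, final boolean proceed) {
      this.value = value;
      this.proceed = proceed;
    }
  }

  /** Applies filters in order, stopping after the first that answers proceed == false. */
  static String apply(final String request, final List<Function<String, Result>> filters) {
    String current = request;
    for (final Function<String, Result> filter : filters) {
      final Result result = filter.apply(current);
      current = result.value;
      if (!result.proceed) break; // short circuit: remaining filters are skipped
    }
    return current;
  }

  public static void main(final String[] args) {
    final Function<String, Result> addDate = r -> new Result(r + "+date", true);
    final Function<String, Result> reject = r -> new Result(r + "+rejected", false);
    final Function<String, Result> never = r -> new Result(r + "+unreached", true);
    System.out.println(apply("request", Arrays.asList(addDate, reject, never)));
    // prints "request+date+rejected"
  }
}
```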

The following ResponseFilter does the same as the previous, but for responses:

public class DateResponseFilter extends ResponseFilter {
  @Override
  public Tuple2<Response, Boolean> filter(final Response response) {
    if (response.headerValueOr(ResponseHeader.Date, null) == null) {
      response.header(ResponseHeader.Date, Instant.now().toString());
    }
    return Tuple2.from(response, true);
  }
}

The return type, Tuple2<Response, Boolean>, carries the Response rather than the Request; the Boolean has the same meaning as for the RequestFilter.

There is a special ResponseFilter that supports CORS request-response from cross-origin clients:

final CORSResponseFilter filter = new CORSResponseFilter();

final List<ResponseHeader> headers =
        Arrays.asList(
                ResponseHeader.of(ResponseHeader.AccessControlAllowOrigin, "*"),
                ResponseHeader.of(ResponseHeader.AccessControlAllowHeaders, "Content-Type, Content-Length"),
                ResponseHeader.of(ResponseHeader.AccessControlAllowMethods, "POST,GET"));

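// Assumption: headerAcceptOriginAny is the origin string to match; "*" matches any origin.
final String headerAcceptOriginAny = "*";

filter.originHeadersFor(headerAcceptOriginAny, headers);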

final Filters filters = Filters.are(Filters.noRequestFilters(), Arrays.asList(filter));

Server server =
    Server.startWith(
        world.stage(),
        resources,
        filters,
        PORT,
        Sizing.define(),
        Timing.define());

Register the kinds of access control headers that are supported by the receiving application. There must always be an Access-Control-Allow-Origin header, which is used to match the origin sent by the client agent. The matching string may be "*" for any origin or the well-known URI of an acceptable application. The other headers are as follows, and each has a corresponding constant in class ResponseHeader:

| Header | Constant in Class ResponseHeader |
| --- | --- |
| Access-Control-Allow-Origin | AccessControlAllowOrigin |
| Access-Control-Allow-Credentials | AccessControlAllowCredentials |
| Access-Control-Expose-Headers | AccessControlExposeHeaders |
| Access-Control-Max-Age | AccessControlMaxAge |
| Access-Control-Allow-Methods | AccessControlAllowMethods |
| Access-Control-Allow-Headers | AccessControlAllowHeaders |
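
The origin matching described above can be sketched as a lookup from the request's origin to the headers registered for it. This is an illustration only, and the internals of CORSResponseFilter may differ: the class CorsOriginSketch, its headersFor() method, and the example origins are all hypothetical, and the fallback to "*" when no exact origin is registered is an assumption.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; CORSResponseFilter's internals may differ.
final class CorsOriginSketch {
  private final Map<String, Map<String, String>> headersByOrigin = new LinkedHashMap<>();

  /** Registers the response headers to send for a given origin ("*" means any). */
  void originHeadersFor(final String origin, final Map<String, String> headers) {
    headersByOrigin.put(origin, headers);
  }

  /** Answers the headers for the request's origin, or none if it is not accepted. */
  Map<String, String> headersFor(final String requestOrigin) {
    final Map<String, String> exact = headersByOrigin.get(requestOrigin);
    if (exact != null) return exact;
    final Map<String, String> any = headersByOrigin.get("*"); // assumed wildcard fallback
    return any != null ? any : Collections.<String, String>emptyMap();
  }

  public static void main(final String[] args) {
    final CorsOriginSketch cors = new CorsOriginSketch();
    cors.originHeadersFor("*", Collections.singletonMap("Access-Control-Allow-Origin", "*"));
    System.out.println(cors.headersFor("https://shop.example"));
  }
}
```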

Reactive Client

As noted earlier, XOOM HTTP provides a special Client used for making HTTP requests and reacting to responses as they happen. Being reactive, this Client is a non-blocking tool. The above example showed how to use a reactive pipeline to both request and react to the eventual response. Here is a similar example that could be used for GET requests and responses:

import io.vlingo.xoom.http.resource.Client;
...
  Catalog catalog = Catalog.of(CatalogType.SummerOffers);
  Client client = Client.using(configuration, clientConsumerType, poolSize);

  client
    .requestWith(
        Request
          .method(Method.GET)
          .uri("/offers/summer-4528")
          .and(RequestHeader.host("chocolaterocks.io"))
          .and(RequestHeader.accept("application/json")))
    .andThenConsume(response -> {
        switch (response.status) {
        case Ok:
          catalog.offer(offerFrom(response));
          break;
        default:
          logger().error("Unexpected: " + response.status);
          break;
        }
      })
      .close();
    ...

Assume that the catalog object is backed by an actor. In this example the close() occurs just after the offer() message is sent to the catalog. The Client could have remained open if it was configured for keep-alive mode, and if it had used repeat() as the final pipeline expression (demonstrated previously).

The Client provides a factory with the means to configure the resulting Client object along with one of three client types by means of the ClientConsumerType.

The Configuration has a number of factories that arrange for defaults to be set, and one that accepts all possible configuration parameters:

| Factory | Description |
| --- | --- |
| defaultedExceptFor(Stage, ResponseConsumer) | The Stage is used to access standard platform resources, such as the Scheduler. The ResponseConsumer provides the actor-backed protocol for the "consumer of unknown responses." The remaining attributes will use defaults, which means keep-alive is false. |
| defaultedExceptFor(Stage, Address, ResponseConsumer) | The same as above, but with the Address of the host server (host name and port). The remaining attributes will use defaults, which means keep-alive is false. |
| defaultedExceptFor(Stage, Address, ResponseConsumer, int writeBufferSize, int readBufferSize) | The same as above, but provides the size (length) of write and read ByteBuffer instances, respectively, created to be used by the channel. The default for both is 10,240 bytes. The remaining attributes will use defaults, which means keep-alive is false. |
| defaultedKeepAliveExceptFor(Stage, Address, ResponseConsumer) | The same as above, but requests that the connection channel be kept alive (keep-alive is true) until it is explicitly closed by the caller. When keep-alive is used, it is possible and practical to reuse the Completes<Response> that is the return value of Client requestWith(...). To accomplish this, use repeat() as the final expression of the Completes pipeline. The remaining attributes will use defaults. |
| has(Stage, Address, ResponseConsumer, boolean keepAlive, long probeInterval, int writeBufferSize, int readBufferPoolSize, int readBufferSize) | Creates the Configuration with all explicit parameters for full attribute setting. All of the parameters are explained above except for probeInterval and readBufferPoolSize. |

The probeInterval is the interval, in milliseconds, at which the channel is read. We recommend that you use the default of 10. You can experiment with different intervals, but if problems occur the interval is probably too short or too long. The readBufferPoolSize is the number of read ByteBuffer instances allocated in a pool, with each used to read a different channel asynchronously. The default is 10. If the pooled buffers are exhausted, the pool automatically grows to handle a greater number of simultaneous requests. Any extra temporary buffers are discarded as load diminishes.
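
The grow-then-shrink behavior of a read buffer pool can be illustrated with a small sketch. This is not the pool used inside XOOM HTTP: the class ReadBufferPoolSketch and its methods are hypothetical, and only show one plausible way to hand out temporary buffers when the pool is exhausted and drop them again on release.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch; not the pool used inside XOOM HTTP.
final class ReadBufferPoolSketch {
  private final Deque<ByteBuffer> idle = new ArrayDeque<>();
  private final int poolSize;
  private final int bufferSize;

  ReadBufferPoolSketch(final int poolSize, final int bufferSize) {
    this.poolSize = poolSize;
    this.bufferSize = bufferSize;
    for (int i = 0; i < poolSize; i++) idle.push(ByteBuffer.allocate(bufferSize));
  }

  /** Answers a pooled buffer, or a temporary one when the pool is exhausted. */
  synchronized ByteBuffer acquire() {
    return idle.isEmpty() ? ByteBuffer.allocate(bufferSize) : idle.pop();
  }

  /** Returns a buffer; extras beyond poolSize are dropped for garbage collection. */
  synchronized void release(final ByteBuffer buffer) {
    buffer.clear();
    if (idle.size() < poolSize) idle.push(buffer);
  }

  synchronized int idleCount() {
    return idle.size();
  }

  public static void main(final String[] args) {
    final ReadBufferPoolSketch pool = new ReadBufferPoolSketch(10, 10240);
    final ByteBuffer buffer = pool.acquire();
    pool.release(buffer);
    System.out.println(pool.idleCount()); // prints 10
  }
}
```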

The ClientConsumerType is an enum that enables the following dispatcher types:

| Type | Description |
| --- | --- |
| Correlating | Used to create a Client that interacts with the HTTP server using the extended header X-Correlation-ID. Each request is sent with a unique identity (UUID) header value and the server is expected to include the same header and value in its response. The unique identity can be provided as a pre-set X-Correlation-ID header by the using(...) caller, or the header will be created by the internal requester. When used, it is often advantageous that the Client be created with the keep-alive configuration. Keep-alive mode ensures that the underlying connection remains open and is thus reusable for many request-response pairs. Even so, a keep-alive Client is not required. When the response is received and parsed, it is provided through the platform standard Completes<Response> object. |
| LoadBalancing | Used to create a Client that makes requests using a load-balanced dispatch across some number of channel actors. The Client.using() parameter poolSize determines the number of dispatchers. The load balancing selects the dispatcher with the fewest requests currently in its mailbox. When the response is received and parsed, it is provided through the platform standard Completes<Response> that is the return value when calling the Client requestWith(). |
| RoundRobin | Used to create a Client that makes requests using a round-robin dispatch across some number of channel actors. The Client.using() parameter poolSize determines the number of round-robin dispatchers. When the response is received and parsed, it is provided through the platform standard Completes<Response> that is the return value when calling the Client requestWith(). |

Determine which kind of Client to create for various request-response situations.
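
To make the difference between round-robin and load-balanced dispatch concrete, here is a minimal sketch of the two selection strategies. It is not XOOM's routing code: the class DispatchSketch is hypothetical, workers are represented only by their index, and the pending-request counts stand in for mailbox sizes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the two selection strategies; not XOOM's router code.
final class DispatchSketch {
  private final AtomicInteger next = new AtomicInteger(0);

  /** Round-robin: cycle through the workers in order, ignoring their load. */
  int roundRobin(final int poolSize) {
    return Math.floorMod(next.getAndIncrement(), poolSize);
  }

  /** Load-balancing: pick the worker with the fewest pending requests. */
  static int leastLoaded(final int[] pendingPerWorker) {
    int best = 0;
    for (int i = 1; i < pendingPerWorker.length; i++) {
      if (pendingPerWorker[i] < pendingPerWorker[best]) best = i;
    }
    return best;
  }

  public static void main(final String[] args) {
    final DispatchSketch dispatch = new DispatchSketch();
    System.out.println(dispatch.roundRobin(3));               // prints 0
    System.out.println(dispatch.roundRobin(3));               // prints 1
    System.out.println(leastLoaded(new int[] { 4, 1, 3 }));   // prints 1
  }
}
```

Round-robin is cheaper and spreads requests evenly when they cost about the same; selecting the least-loaded worker tends to help when some requests take much longer than others.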

Defining Dynamic Resources

XOOM HTTP provides a fluent API to define HTTP endpoints and corresponding Java handler methods. These kinds of resources are different from those loaded from the file vlingo-http.properties. You had a brief introduction to the handler methods at the outset of this chapter. Here you will see them in more detail.

The following is the ResourceBuilder used to fluently create route mappings.

ResourceBuilder.resource(final String name, final RequestHandler... requestHandlers)

You give the Resource a name and a varargs list of RequestHandler definitions. This method returns a Resource. The Server needs a set of Resource instances to match and route HTTP requests to handlers that respond. Each of the RequestHandler instances is mapped by declaring its HTTP method, as discussed next.

HTTP Methods

The fluent API supports the following HTTP methods.

  • ResourceBuilder.post(final String uri)

  • ResourceBuilder.get(final String uri)

  • ResourceBuilder.put(final String uri)

  • ResourceBuilder.patch(final String uri)

  • ResourceBuilder.delete(final String uri)

  • ResourceBuilder.head(final String uri)

  • ResourceBuilder.options(final String uri)

  • ResourceBuilder.trace(final String uri)

  • ResourceBuilder.connect(final String uri)

All of the above methods answer a new resource handler instance, whether or not it extends the base class DynamicRequestHandler. We recommend using static imports for the most fluent use of the API.

import static io.vlingo.xoom.common.serialization.JsonSerialization.serialized;
import static io.vlingo.xoom.http.Response.Status.Created;
import static io.vlingo.xoom.http.Response.Status.NotFound;
import static io.vlingo.xoom.http.Response.Status.Ok;
import static io.vlingo.xoom.http.ResponseHeader.Location;
import static io.vlingo.xoom.http.ResponseHeader.headers;
import static io.vlingo.xoom.http.ResponseHeader.of;
import static io.vlingo.xoom.http.resource.ResourceBuilder.get;
import static io.vlingo.xoom.http.resource.ResourceBuilder.patch;
import static io.vlingo.xoom.http.resource.ResourceBuilder.post;
import static io.vlingo.xoom.http.resource.ResourceBuilder.resource;

The above shows other common static imports besides those needed for fluent resource wiring.

Request Handler

The request handler enforces type safety on the handler function method definition through the declaration of expected parameters. There are several mapping options, so consider each.

Path Parameters

Map path parameters by identifying them in the URI as a {variable} and then indicating the type in the param() method to be used by your handler.

get("/user/{userId}")
    .param(String.class)
    .handle((userId) -> /* */);

Here the userId will be mapped to a String because the method param(String.class) specifies that type. You can specify parameter mappings to any of the following types.

  • String

  • Long

  • Integer

  • Float

  • Double

  • Boolean

  • Short

  • Character

The param() method must be used before any other route mapping method; otherwise an exception will be thrown when the Server starts.

When using param(), be sure that the URI contains a corresponding path variable in braces, such as {userId}, for each param() call.
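
The path parameter mapping described above amounts to two steps: match the request URI against the template, then convert each raw value to the declared type. The following is a minimal sketch of that idea, not XOOM HTTP's own matcher; the class PathParamSketch and its methods are hypothetical, and only a few of the supported types are handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; XOOM HTTP's own matcher is not shown here.
final class PathParamSketch {
  /** Answers the raw values of {variables} in template, or null when uri does not match. */
  static List<String> extract(final String template, final String uri) {
    final String[] expected = template.split("/");
    final String[] actual = uri.split("/");
    if (expected.length != actual.length) return null;
    final List<String> values = new ArrayList<>();
    for (int i = 0; i < expected.length; i++) {
      if (expected[i].startsWith("{") && expected[i].endsWith("}")) {
        values.add(actual[i]); // a {variable} segment captures the actual value
      } else if (!expected[i].equals(actual[i])) {
        return null;           // a literal segment must match exactly
      }
    }
    return values;
  }

  /** Converts a raw value to one of a few of the supported parameter types. */
  static <T> T convert(final String raw, final Class<T> type) {
    if (type == String.class) return type.cast(raw);
    if (type == Integer.class) return type.cast(Integer.valueOf(raw));
    if (type == Long.class) return type.cast(Long.valueOf(raw));
    if (type == Boolean.class) return type.cast(Boolean.valueOf(raw));
    throw new IllegalArgumentException("Unsupported type: " + type.getName());
  }

  public static void main(final String[] args) {
    final List<String> values = extract("/user/{userId}", "/user/42");
    System.out.println(convert(values.get(0), Integer.class)); // prints 42
  }
}
```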

Body Parameter

The body() method maps the HTTP body into the type you specify.

post("/user")
    .body(NameData.class)
    .handle((nameData) -> /* */);

Query Parameters

You may use query parameters in the expected way. For example, page=5 is a query parameter in this curl command.

> curl http://localhost:8080/user?page=5

You may map this specific query parameter fluently in three different ways.

get("/user")
    .query("page")
    .handle((page) -> /* */);

get("/user")
    .query("page", Integer.class)
    .handle((page) -> /* */);

get("/user")
    .query("page", Integer.class, 0 /* default value */)
    .handle((page) -> /* */);

By default, the type of all query parameters is String. When the query parameter isn't present, the value is null. It's good practice to always specify a default value for query parameters to avoid unexpected behavior.
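
The value of a default becomes clear when the parameter is missing. The sketch below is an illustration under assumptions, not the parser used by XOOM HTTP: QueryParamSketch, parse(), and intOr() are hypothetical names, and URL decoding is omitted for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; not the parser used by XOOM HTTP.
final class QueryParamSketch {
  /** Splits "a=1&b=2" into a name-to-value map (no URL decoding, for brevity). */
  static Map<String, String> parse(final String query) {
    final Map<String, String> params = new LinkedHashMap<>();
    if (query == null || query.isEmpty()) return params;
    for (final String pair : query.split("&")) {
      final int eq = pair.indexOf('=');
      if (eq > 0) params.put(pair.substring(0, eq), pair.substring(eq + 1));
    }
    return params;
  }

  /** Answers the named parameter as an int, or defaultValue when it is absent. */
  static int intOr(final Map<String, String> params, final String name, final int defaultValue) {
    final String raw = params.get(name);
    return raw == null ? defaultValue : Integer.parseInt(raw);
  }

  public static void main(final String[] args) {
    System.out.println(intOr(parse("page=5"), "page", 0)); // prints 5
    System.out.println(intOr(parse(""), "page", 0));       // prints 0
  }
}
```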

Request Headers

You may access request headers as expected.

get("/user")
    .header("Location")
    .handle((location) -> /* */);

Combining Them All

The following is an example showing all of the above parameter options.

post("/user/{userId}")
    .param(String.class)
    .body(UserData.class)
    .header("Location")
    .handle((userId, userData, location) -> {
        // Perform some action
        return Completes.withSuccess(Response.of(Ok, serialized(userData)));
    });

The order of the parameters matters. Try changing the order of body and header. You will see that the type of the userData parameter becomes a header type and location now has the UserData type.

Check the xoom-http-frontservice for a more complete working example.

Content Negotiation

To support media (content) negotiation, request handlers use the ObjectResponse<?> return type. Replace the ? with the actual representation data type that is presented to clients, such as PersonData or EmployeeData.

The from(...) method enables building an ObjectResponse with a response status, headers, and a concrete represented content type. This response is serialized according to the accept headers of the request via the MediaTypeMapper, or JSON if a given MediaTypeMapper is not supplied. If there is no match on the specific media type, the client receives the appropriate HTTP status code.

Once serialized by the supplied MediaTypeMapper, the final Response is generated and the appropriate Content-Type header is automatically inserted into the Response, along with any other headers provided by the handler method. The following is a basic example of how to create a handler that returns MyType as the ObjectResponse type.

// GET /resources/mytype
ObjectResponse<MyType> provideMyType() {
  return ObjectResponse.from(Status.Ok, new MyType("some value"));
}
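
The negotiation step can be pictured as choosing a serializer by the request's accept header. The following is a simplified sketch and does not reproduce the MediaTypeMapper API: NegotiationSketch, register(), and serialize() are hypothetical, q-value ordering and wildcards such as */* are ignored, and the serializers are plain functions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; MediaTypeMapper's real API is not reproduced here.
final class NegotiationSketch {
  private final Map<String, Function<Object, String>> serializers = new LinkedHashMap<>();

  NegotiationSketch register(final String mediaType, final Function<Object, String> serializer) {
    serializers.put(mediaType, serializer);
    return this;
  }

  /** Answers {contentType, body} for the first acceptable media type, or null if none matches. */
  String[] serialize(final Object entity, final String acceptHeader) {
    for (final String candidate : acceptHeader.split(",")) {
      final String mediaType = candidate.split(";")[0].trim(); // drop parameters such as q-values
      final Function<Object, String> serializer = serializers.get(mediaType);
      if (serializer != null) return new String[] { mediaType, serializer.apply(entity) };
    }
    return null; // the caller would answer an error status here
  }

  public static void main(final String[] args) {
    final NegotiationSketch mapper = new NegotiationSketch()
        .register("application/json", e -> "{\"value\":\"" + e + "\"}")
        .register("text/plain", e -> String.valueOf(e));
    final String[] result = mapper.serialize("some value", "text/html, text/plain;q=0.8");
    System.out.println(result[0] + " -> " + result[1]); // prints "text/plain -> some value"
  }
}
```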

The previous Mapper type has been deprecated, which affects places where the existing Mapper is used as an input parameter to the RequestHandler methods. The older Mapper is undesirable because it ignores the accept header and may create confusion, as the deprecation note explains.

Resource Handler Methods

There are a few things to keep in mind when designing a request handler method on a resource.

Resource methods must return one of two types: Completes<Response> or Completes<ObjectResponse>. Using Completes<?> enables the server to operate asynchronously by postponing responses until all necessary processing has completed.

If there are a number of asynchronous steps required to carry out a request to the ultimate response, the steps should be managed through a Completes pipeline.

public class UserResource {
  private final AddressFactory addressFactory;
  private final Stage stage;

  public UserResource(final World world) {
    this.addressFactory = world.addressFactory();
    this.stage = world.stage();
  }
  ...
  public Completes<Response> changeContact(final String userId, final ContactData contactData) {
    return stage.actorOf(User.class, addressFactory.from(userId))
      .andThenTo(user -> user.withContact(new Contact(contactData.emailAddress, contactData.telephoneNumber)))
      .andThenTo(userState -> Completes.withSuccess(Response.of(Ok, serialized(UserData.from(userState)))))
      .otherwise(noUser -> Response.of(NotFound, userLocation(userId)));
  }
  ...
  public Resource<?> routes() {
    return resource("User Resource",
      ...
      patch("/users/{userId}/contact")
        .param(String.class)
        .body(ContactData.class)
        .handle(this::changeContact),
      ...
      );
  }
}

Recall that andThenTo() sends an asynchronous message to an actor and yields its eventual outcome through Completes, otherwise() handles a failure outcome, and recoverFrom() handles exceptions. If you only need to map the outcome value to a new value, possibly of a new type, use andThen(). See the Completes<T> documentation for more details.
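
For readers more familiar with the JDK, the shape of such a pipeline can be approximated with java.util.concurrent.CompletableFuture. This is an analogy only, not the Completes<T> API, and the correspondence is loose: the class PipelineSketch and its methods are hypothetical, strings stand in for the user and the response, and failure is signaled by an exception.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture stands in for Completes<T>; all names are hypothetical.
final class PipelineSketch {
  /** Simulates an asynchronous lookup that may fail. */
  static CompletableFuture<String> findUser(final String userId) {
    if ("missing".equals(userId)) {
      final CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(new IllegalStateException("no such user"));
      return failed;
    }
    return CompletableFuture.completedFuture("user-" + userId);
  }

  /** Simulates an asynchronous state change on the found user. */
  static CompletableFuture<String> withContact(final String user, final String email) {
    return CompletableFuture.completedFuture(user + "<" + email + ">");
  }

  static CompletableFuture<String> changeContact(final String userId, final String email) {
    return findUser(userId)
        .thenCompose(user -> withContact(user, email)) // roughly andThenTo(): chain another async step
        .thenApply(state -> "200 " + state)            // roughly andThen(): map the outcome value
        .exceptionally(error -> "404 Not Found");      // roughly recoverFrom(): handle an exception
  }

  public static void main(final String[] args) {
    System.out.println(changeContact("42", "a@b.io").join());      // prints "200 user-42<a@b.io>"
    System.out.println(changeContact("missing", "a@b.io").join()); // prints "404 Not Found"
  }
}
```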

Because any given handler instance is reused many times, handler methods must be side-effect free. That is, they must not mutate the state of the handler instance. In the above example you can see that all UserResource state is final and the changeContact() method is side-effect free. Handler methods are themselves pure functions, but the components that they use need not be pure functions. In other words, the User domain object actor may be mutable.

Known Limitations

We know of a few limitations.

  • Using the fluent API, we currently support up to six handler function arguments. If you need more than six, please open an issue in the vlingo/xoom-http repository.

  • There was a problem with some Linux servers and the JDK implementation of non-blocking socket channel reads. This has since been fixed.

Would you like to contribute to XOOM HTTP? See the vlingo/xoom-http repository and review any outstanding issues. We are happy to help you with the simple on-boarding steps.
