Publisher, first receiving a stream of elements and then publishing those following a probable transformation. This is perfect for defining streaming pipelines.
Publisher to which it is subscribed. The signals indicate that a
Subscription has been granted, that a next element is available, that an error has occurred, and if the end of stream is reached, that the stream is completed.
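The four signals can be observed with the JDK's own `java.util.concurrent.Flow` interfaces, which follow the same Reactive Streams contract. The following is a minimal sketch, not XOOM Streams code: `SignalDemo` and `RecordingSubscriber` are hypothetical names, and `SubmissionPublisher` stands in for whatever publisher is actually used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SignalDemo {
  // Records each signal it receives and requests one element at a time.
  static final class RecordingSubscriber implements Flow.Subscriber<Long> {
    final List<String> signals = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      signals.add("subscribed");
      subscription.request(1);          // accept only one element for now
    }

    @Override public void onNext(Long element) {
      signals.add("next:" + element);
      subscription.request(1);          // ready for one more
    }

    @Override public void onError(Throwable cause) {
      signals.add("error");
      done.countDown();
    }

    @Override public void onComplete() {
      signals.add("completed");
      done.countDown();
    }
  }

  static List<String> run() {
    RecordingSubscriber subscriber = new RecordingSubscriber();
    try (SubmissionPublisher<Long> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (long value = 1; value <= 3; value++) {
        publisher.submit(value);
      }
    } // close() signals completion once the submitted elements are delivered
    try {
      subscriber.done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return subscriber.signals;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [subscribed, next:1, next:2, next:3, completed]
  }
}
```

Requesting a single element per callback is the simplest form of backpressure: the publisher never delivers more than the subscriber has asked for.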
Subscriber, which is used by the
Subscriber to request 1 to N sequenced elements from the
Publisher, and to cancel its subscription. The value of N may be effectively unbounded, which is indicated by requesting
Processor yourself. Default implementations of these are provided by XOOM Streams. You will instead implement
Publisher and specifies the number of elements it can accept. Specifying that number enforces backpressure on the
Publisher so that it does not overwhelm the
Publisher that produces a sequenced series of
Long values and a
Sink that prints those as
Processor is created, which is both a
Subscriber to an upstream
Publisher to a downstream
Subscriber from the previous example, but injects the
Processor in between the two, where the
Source is the source of a stream of elements relayed by a
Sink is the destination of the elements provided to a
Publisher. Next are ways that these may be used.
Source types, but you will also develop your own implementations. This is the
Source protocol as provided by XOOM Streams.
Source is to take requests for the next elements in a stream, which are returned as a
Completes<Elements<T>>. This means that the
Elements<T> are completed at some future time. See
Completes<T> for details on use.
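The idea of answering the next elements as an eventually completed value can be sketched with plain JDK types. This is an illustration under stated assumptions, not the XOOM Streams API: `CompletableFuture<List<Long>>` stands in for `Completes<Elements<T>>`, and `RangeSource`, `next(int)`, and `drain` are hypothetical names. The futures here complete immediately; a real source may complete them later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class LazyRangeSketch {
  // Hypothetical stand-in for a source: each call to next() answers a future batch.
  static final class RangeSource {
    private long current;
    private final long endExclusive;

    RangeSource(long startInclusive, long endExclusive) {
      this.current = startInclusive;
      this.endExclusive = endExclusive;
    }

    // Answers up to maximum elements, computed on demand rather than preallocated.
    // An empty batch means the range is exhausted.
    CompletableFuture<List<Long>> next(int maximum) {
      List<Long> batch = new ArrayList<>();
      while (batch.size() < maximum && current < endExclusive) {
        batch.add(current++);
      }
      return CompletableFuture.completedFuture(batch);
    }
  }

  static List<Long> drain(RangeSource source, int batchSize) {
    List<Long> all = new ArrayList<>();
    List<Long> batch;
    // join() waits for each future; a non-blocking consumer would chain thenAccept().
    while (!(batch = source.next(batchSize).join()).isEmpty()) {
      all.addAll(batch);
    }
    return all;
  }

  public static void main(String[] args) {
    System.out.println(drain(new RangeSource(1, 6), 2)); // [1, 2, 3, 4, 5]
  }
}
```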
isSlow() protocol answers a
Completes<Boolean> indicating whether the
Source will tend to be slow in providing next elements. The following demonstrates how you may answer.
false accurately is vitally important. If your Source is slow, stating so enables the
Publisher to decide on the kind of
Scheduler to use between probes for next elements. A slow
Source will be managed by repeated schedule-once timer intervals, while a fast
Source will be managed by a single schedule-many timer that repeats at a consistent interval.
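The difference between the two scheduling styles can be shown by analogy with `java.util.concurrent.ScheduledExecutorService`. This sketch does not use the XOOM Scheduler, and `probeSlow` and `probeFast` are hypothetical names; it only contrasts re-arming a one-shot timer after each probe with running one repeating timer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ProbeSchedulingSketch {
  // Slow source: schedule one probe, and only after it runs schedule the next.
  static int probeSlow(int probes, long intervalMillis) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger count = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(1);
    Runnable probe = new Runnable() {
      @Override public void run() {
        if (count.incrementAndGet() < probes) {
          scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); // re-arm once
        } else {
          done.countDown();
        }
      }
    };
    scheduler.schedule(probe, intervalMillis, TimeUnit.MILLISECONDS);
    awaitQuietly(done);
    scheduler.shutdownNow();
    return count.get();
  }

  // Fast source: a single repeating timer fires probes at a fixed interval.
  static int probeFast(int probes, long intervalMillis) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger count = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(probes);
    scheduler.scheduleAtFixedRate(() -> {
      if (count.get() < probes) {       // ignore ticks after the requested number
        count.incrementAndGet();
        done.countDown();
      }
    }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    awaitQuietly(done);
    scheduler.shutdownNow();
    return count.get();
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println(probeSlow(3, 5) + " slow probes, " + probeFast(3, 5) + " fast probes");
  }
}
```

With the one-shot style, a probe that takes a long time simply delays the next one, which suits a source that is slow to answer.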
Source that lazily (not all preallocated in a
List) provides a range of
Source instances, and default implementation types of which these produce instances.
Source types will grow over future releases.
Sink types, but you will develop your own implementations. This is the
Sink protocol as provided by XOOM Streams.
ready() indicates that the
Sink should become prepared to handle incoming values. The
terminate() indicates that the
Sink is being terminated and will no longer receive values. The
whenValue(T value) is used to provide the next available value from the
Sink that prints received values.
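A printing sink built around the three operations named above might look like the following. This is a self-contained sketch: the `Sink` interface here is a local stand-in that mirrors only `ready()`, `terminate()`, and `whenValue(T value)`, and `PrintingSink` and its `log` field are hypothetical additions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class SinkSketch {
  // Local stand-in mirroring the three operations described in the text.
  interface Sink<T> {
    void ready();
    void terminate();
    void whenValue(T value);
  }

  // Prints each received value and keeps a log of calls for inspection.
  static final class PrintingSink<T> implements Sink<T> {
    final List<String> log = new ArrayList<>();
    private boolean terminated;

    @Override public void ready() {
      log.add("ready");
    }

    @Override public void terminate() {
      terminated = true;
      log.add("terminated");
    }

    @Override public void whenValue(T value) {
      if (terminated) return;           // ignore values after termination
      System.out.println(value);
      log.add("value:" + value);
    }
  }

  static List<String> demo() {
    PrintingSink<Long> sink = new PrintingSink<>();
    sink.ready();
    sink.whenValue(1L);
    sink.whenValue(2L);
    sink.terminate();
    sink.whenValue(3L);                 // dropped: the sink was terminated
    return sink.log;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [ready, value:1, value:2, terminated]
  }
}
```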
Sink instances, and default implementation types of which these produce instances.
Sink types will grow over future releases.
Stream that is eventually available inside the
Completes is used to consume the elements of streaming data.
flowInto() overloads are used to start the stream flowing into a given
Sink<S>. Besides causing the flow to begin, additional options are available. To control the initial rate at which elements flow, as in the number of elements that will arrive in a single burst, pass a value for
flowElementsRate. If no parameter is provided, the default is
probeInterval. The default if not explicitly provided by the client is
5 milliseconds. There are other constants that you may use. Ultimately, stream throughput depends on the throughput available to the
Source<S> and the speed at which the
Source<T> is probed by the
StreamPublisher to retrieve that data.
request() operation may be used to change the flow rate after the initial rate has been established. The
stop() method may be used to completely terminate the flow of elements from the
Publisher<T>. Note, however, that since the flow is asynchronous it is possible that some elements are already incoming to the
Sink<S> and thus will not be prevented from arriving. If no parameter is provided, the default is
StreamProcessor you will probably want to transform the
StreamSubscriber side of the processor's incoming data to another type that is outgoing through its
StreamPublisher side. You can use an operator to do that.
Operator<T, R> takes type
T as an input
value and provides type
R as output. Output is delivered to a
R, which means the
R value is produced before the
Consumer<R> receives it. This is where the implementation of the
Operator<T, R> comes into play. Note that there are currently three factory methods provided on the
Mapper, but also flattens the stream by reducing the inner structure to the single one produced by the mapping
filter is created that filters in all
String instances that contain the substring
"1", and filters out all others. Then the
List<String> of six elements is iterated over and the filter's
performInto() operation is used. If the filter
s.contains("1") is satisfied, the match is added to the
results, which then contains elements
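The filtering behavior described here can be sketched without the library. In this illustration the `Operator` interface is a local stand-in exposing only `performInto()`, `filterWith` is a hypothetical factory, and the six input strings are sample data chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class FilterSketch {
  // Local stand-in for an operator: takes a T and may hand an R to the consumer.
  interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);
  }

  // Hypothetical factory: passes a value downstream only when the predicate holds.
  static <T> Operator<T, T> filterWith(Predicate<T> predicate) {
    return (value, consumer) -> {
      if (predicate.test(value)) {
        consumer.accept(value);
      }
    };
  }

  static List<String> demo() {
    Operator<String, String> filter = filterWith(s -> s.contains("1"));
    List<String> results = new ArrayList<>();
    // Sample input chosen for illustration only.
    for (String s : List.of("ABC", "321", "123", "456", "DEF", "214")) {
      filter.performInto(s, results::add);
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [321, 123, 214]
  }
}
```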
mapper is created that maps all
String instances of digit characters to
List<String> of three elements is iterated over and the mapper's
performInto() operation is used. The new value is added to the
results, which then contains elements
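A mapping operator follows the same shape. The sketch below makes several assumptions: the `Operator` interface is a local stand-in, `mapWith` is a hypothetical factory, the target type is taken to be `Integer`, and the three input strings are sample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapSketch {
  // Local stand-in for an operator: takes a T and hands an R to the consumer.
  interface Operator<T, R> {
    void performInto(T value, Consumer<R> consumer);
  }

  // Hypothetical factory: transforms each value before passing it downstream.
  static <T, R> Operator<T, R> mapWith(Function<T, R> mapper) {
    return (value, consumer) -> consumer.accept(mapper.apply(value));
  }

  static List<Integer> demo() {
    Operator<String, Integer> mapper = mapWith(s -> Integer.parseInt(s));
    List<Integer> results = new ArrayList<>();
    // Sample input chosen for illustration only.
    for (String digits : List.of("123", "456", "789")) {
      mapper.performInto(digits, results::add);
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [123, 456, 789]
  }
}
```

The consumer only ever sees the already transformed value, which is what lets a processor accept one type and publish another.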
List<List<String>> lists that is a list of lists. The
mapper is created that internally uses the Java
mapper first streams over the
List<List<String>> lists. It then uses
flatMap to handle each single
List<String> list. It then streams over each of those lists, and each individual
String is transformed to an
FlatMapper operator. Here's a
FlatMapper solution to a different problem:
LongRangeSource, which is a stream source that contains a
endExclusive value. In the first
endExclusive of two greater, which is
3; the first numeric range is thus
2. The second range is
4. The flat-mapper reads every value in each
LongRangeSource and adds it to the collection referenced by
providedLongs. When completed, the two containers are flattened such that only the inner ranges are preserved in the
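Flattening several half-open ranges into one sequence can be sketched with the JDK stream API. This does not use `LongRangeSource` or the flat-mapping operator itself: `LongRange` and `flatten` are hypothetical names, and the two ranges are illustrative bounds.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class FlattenRangesSketch {
  // Hypothetical half-open range [startInclusive, endExclusive).
  static final class LongRange {
    final long startInclusive;
    final long endExclusive;

    LongRange(long startInclusive, long endExclusive) {
      this.startInclusive = startInclusive;
      this.endExclusive = endExclusive;
    }

    LongStream values() {
      return LongStream.range(startInclusive, endExclusive);
    }
  }

  // Reads every value of every range and collects them into one flat list.
  static List<Long> flatten(List<LongRange> ranges) {
    return ranges.stream()
        .flatMapToLong(LongRange::values)
        .boxed()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<LongRange> ranges = List.of(new LongRange(1, 3), new LongRange(3, 5));
    System.out.println(flatten(ranges)); // [1, 2, 3, 4]
  }
}
```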
Operator<T, R> such as these, or use them in your
StateStoreReader is provided as part of the
StateStore protocol. Thus, you may ask a
StateStore to stream data from its internal storage. The two interfaces provided support streaming all of a given state type; that is, a
stateType is a Java
Class<?> of which all states of that type are stored in a single container. In some storage mechanisms that container may be a database table or an in-memory grid region.
StateStoreReader queries that data from the container and provides an instance of
Source<?> types, such as
Command instances. For example, the
Journal<T> provides this streaming interface through the
JournalReader<T>, which in turn is an
JournalReader, then the
EntryBundle instances, and then to flow the stream into the
Sink<EntryBundle> for reporting on each event.
Journal—support persisting a totally ordered stream of
Source<?> entries, such as
Command. You may obtain any of the total streams using the same basic techniques.