Skip to main content

java.util.concurrent.Flow

The Java Flow API, introduced in Java 9, provides a standard and efficient way for components of a reactive system to communicate with each other asynchronously, without requiring them to know each other's implementation details.

Flow API enables us to adopt Reactive Programming using just the JDK, not needing additional libraries such as RxJava or Project Reactor, amongst others.

java-9-flow-api-operations-01.png

Class Flow

Interrelated interfaces and static methods for establishing flow-controlled components in which Publishers produce items consumed by one or more Subscribers, each managed by a Subscription.

These interfaces correspond to the reactive-streams specification. They apply in both concurrent and distributed asynchronous settings: All (seven) methods are defined in void "one-way" message style. Communication relies on a simple form of flow control (method Flow.Subscription.request(long)) that can be used to avoid resource management problems that may otherwise occur in "push" based systems.

Key components:

Publisher

flow-publisher

A Publisher is a provider of data items, which publishes them to one or more Subscriber instances. To implement a custom Publisher, you need to implement the java.util.concurrent.Flow.Publisher interface, which defines the following methods:

  • subscribe(Subscriber<? super T> subscriber): Subscribes a Subscriber to this Publisher, returning a new Subscription instance that represents the connection between them.

Subscriber

flow-publisher

A Subscriber is a consumer of data items, which receives them from a Publisher. To implement a custom Subscriber, you need to implement the java.util.concurrent.Flow.Subscriber interface, which defines the following methods:

  • onSubscribe(Subscription subscription): Receives a Subscription instance from the Publisher, which allows the Subscriber to request and receive data items from the Publisher.
  • onNext(T item): Receives the next data item from the Publisher.
  • onError(Throwable throwable): Receives an error notification from the Publisher.
  • onComplete(): Receives a completion notification from the Publisher.

Subscription

A Subscription is a connection between a Publisher and a Subscriber, which allows the Subscriber to request and receive data items from the Publisher. To implement a custom Subscription, you need to implement the java.util.concurrent.Flow.Subscription interface, which defines the following methods:

  • request(long n): Requests n data items from the Publisher.
  • cancel(): Cancels the Subscription, terminating the connection between the Publisher and Subscriber.

Processor

A Processor is a component that both subscribes to a Publisher and publishes to a Subscriber, allowing it to transform or filter data items. To implement a custom Processor, you need to implement both the java.util.concurrent.Flow.Publisher and java.util.concurrent.Flow.Subscriber interfaces.

SubmissionPublisher

Since Java 9, we can create Reactive Streams by introducing four core interfaces: Publisher, Subscriber, Subscription, Processor, and one concrete class: SubmissionPublisher that implements the Publisher interface. Each interface plays a different role, corresponding to the principles of Reactive Streams. We can use the submit() method of SubmissionPublisher class to publish the provided item to each subscriber.

Example: Flow.Publisher

A Flow.Publisher usually defines its own Flow.Subscription implementation; constructing one in method subscribe and issuing it to the calling Flow.Subscriber. It publishes items to the subscriber asynchronously, normally using an Executor. For example, here is a very simple publisher that only issues (when requested) a single TRUE item to a single subscriber. Because the subscriber receives only a single item, this class does not use buffering and ordering control required in most implementations (for example SubmissionPublisher).

 class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (!completed) {
completed = true;
if (n <= 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}

Example: Flow.Subscriber

A Flow.Subscriber arranges that items be requested and processed. Items (invocations of Flow.Subscriber.onNext(T)) are not issued unless requested, but multiple items may be requested. Many Subscriber implementations can arrange this in the style of the following example, where a buffer size of 1 single-steps, and larger sizes usually allow for more efficient overlapped processing with less communication; for example with a value of 64, this keeps total outstanding requests between 32 and 64. Because Subscriber method invocations for a given Flow.Subscription are strictly ordered, there is no need for these methods to use locks or volatiles unless a Subscriber maintains multiple Subscriptions (in which case it is better to instead define multiple Subscribers, each with its own Subscription).

 class SampleSubscriber<T> implements Subscriber<T> {
final Consumer<? super T> consumer;
Subscription subscription;
final long bufferSize;
long count;
SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
this.bufferSize = bufferSize;
this.consumer = consumer;
}
public void onSubscribe(Subscription subscription) {
long initialRequestSize = bufferSize;
count = bufferSize - bufferSize / 2; // re-request when half consumed
(this.subscription = subscription).request(initialRequestSize);
}
public void onNext(T item) {
if (--count <= 0)
subscription.request(count = bufferSize - bufferSize / 2);
consumer.accept(item);
}
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
}

The default value of defaultBufferSize() may provide a useful starting point for choosing request sizes and capacities in Flow components based on expected rates, resources, and usages. Or, when flow control is never needed, a subscriber may initially request an effectively unbounded number of items, as in:

 class UnboundedSubscriber<T> implements Subscriber<T> {
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE); // effectively unbounded
}
public void onNext(T item) { use(item); }
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
void use(T item) { ... }
}