Skip to main content

Subscription Request

flow-publisher

Description

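Adds the given number n of items to the current unfulfilled demand for this subscription. In the example below the subscriber already requests one more item in onSubscribe and again in every onNext, so submitted items are delivered immediately; the explicit request(1), request(2) and request(3) calls only raise the outstanding demand and therefore print nothing on their own.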

Requirements

JDK 21

Code

package com.jreact;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
 * Demo {@link Flow.Subscriber} of {@code String} items that logs every callback together with
 * the id of the thread it runs on, and exposes {@link #request(long)} so a caller can add demand
 * to the current subscription from outside the callbacks.
 *
 * <p>The subscriber asks for one item when it is subscribed and for one more item after each
 * item it receives.
 */
public class MySubscriber implements Flow.Subscriber<String> {

    /**
     * Subscription handed over in {@link #onSubscribe}; {@code null} until then.
     *
     * <p>Declared {@code volatile} because it is assigned inside a publisher callback while
     * {@link #request(long)} is invoked by the demo from the thread that runs
     * {@link #flowSubscriptionRequest()}.
     */
    private volatile Flow.Subscription subscription;

    /**
     * Runs the demo: subscribes a fresh {@code MySubscriber} to a {@link SubmissionPublisher},
     * submits four items, issues three extra {@code request} calls (1, 2 and 3), submits two
     * more items and finally closes the publisher. A one-second pause separates the steps.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *     is interrupted during one of the pauses; the thread's interrupt flag is restored
     *     before the exception is thrown
     */
    public void flowSubscriptionRequest() {
        System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());

        try (final var publisher = new SubmissionPublisher<String>()) {

            System.out.println("-- subscribe --");

            final var subscriber = new MySubscriber();
            publisher.subscribe(subscriber);

            pause();

            System.out.println("-- submit (a1, a2, a3, a4) --");

            publisher.submit("a1");
            publisher.submit("a2");
            publisher.submit("a3");
            publisher.submit("a4");

            pause();

            System.out.println("-- request 1 --");
            subscriber.request(1);

            pause();

            System.out.println("-- request 2 --");
            subscriber.request(2);

            pause();

            System.out.println("-- request 3 --");
            subscriber.request(3);

            pause();

            System.out.println("-- submit (a5, a6) --");

            publisher.submit("a5");
            publisher.submit("a6");

            pause();

            System.out.println("-- close --");
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Sleeps for one second between demo steps. */
    private static void pause() throws InterruptedException {
        TimeUnit.SECONDS.sleep(1);
    }

    /**
     * Stores the subscription and requests the first item.
     *
     * @param subscription the subscription created for this subscriber
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf(" onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    /**
     * Logs the received item and requests one more.
     *
     * @param item the delivered item
     */
    @Override
    public void onNext(String item) {
        System.out.printf(" onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        subscription.request(1);
    }

    /**
     * Logs the error reported to this subscriber.
     *
     * @param throwable the reported error
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.printf(" onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    /** Logs that no further items will be delivered. */
    @Override
    public void onComplete() {
        System.out.printf(" onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }

    /**
     * Forwards {@code n} to {@link Flow.Subscription#request(long)} of the stored subscription.
     * Does nothing when {@link #onSubscribe} has not been called yet.
     *
     * @param n the value passed on to the subscription
     */
    void request(long n) {
        // Read the volatile field once so the null check and the call use the same reference.
        final Flow.Subscription current = subscription;
        if (current != null) {
            current.request(n);
        }
    }
}

Output

main (tid=1)
-- subscribe --
onSubscribe (tid=29)
-- submit (a1, a2, a3, a4) --
onNext (tid=29) : a1
onNext (tid=29) : a2
onNext (tid=29) : a3
onNext (tid=29) : a4
-- request 1 --
-- request 2 --
-- request 3 --
-- submit (a5, a6) --
onNext (tid=29) : a5
onNext (tid=29) : a6
-- close --
onComplete (tid=29)

Running Example Code