Skip to main content

SubmissionPublisher

flow-publisher

Description

A Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered.

Requirements

JDK 21

Code

package com.jreact;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
 * Demo {@link Flow.Subscriber} of {@code String} items that logs every callback together
 * with the id of the thread it runs on, and pulls items one at a time.
 *
 * <p>{@link #flowSubmissionPublisher()} drives the demo: it creates a
 * {@link SubmissionPublisher}, attaches a fresh {@code MySubscriber}, submits three items
 * and closes the publisher.
 */
public class MySubscriber implements Flow.Subscriber<String> {

    /** Subscription handed over in {@link #onSubscribe}; used to request further items. */
    private Flow.Subscription subscription;

    /**
     * Runs the demo: subscribe, submit {@code "abc"}, {@code "123"}, {@code "XYZ"}, close.
     *
     * <p>The publisher is managed by try-with-resources, so it is closed when the block
     * exits, whether normally or because of an interruption.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling
     *     thread is interrupted while sleeping; the thread's interrupt flag is restored
     *     before the exception is thrown
     */
    public void flowSubmissionPublisher() {
        System.out.printf("main (tid=%d)%n", Thread.currentThread().threadId());


        try (final var publisher = new SubmissionPublisher<String>()) {

            System.out.println("-- subscribe --");
            publisher.subscribe(new MySubscriber());

            // Pause so the subscriber's log lines (presumably printed from another
            // thread) appear before the "-- submit --" marker.
            TimeUnit.SECONDS.sleep(1);
            System.out.println("-- submit --");

            publisher.submit("abc");
            publisher.submit("123");
            publisher.submit("XYZ");

            // Pause again so the items can be logged before the publisher is closed.
            TimeUnit.SECONDS.sleep(1);
            System.out.println("-- close --");
        } catch (InterruptedException e) {
            // sleep() cleared the interrupt flag when it threw; restore it so code
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Logs the call, stores the subscription and requests the first item.
     *
     * @param subscription the subscription used to request items
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.printf(" onSubscribe (tid=%d)%n",
                Thread.currentThread().threadId());

        this.subscription = subscription;
        subscription.request(1);
    }

    /**
     * Logs the received item and requests the next one.
     *
     * @param item the item delivered by the publisher
     */
    @Override
    public void onNext(String item) {
        System.out.printf(" onNext (tid=%d) : %s%n",
                Thread.currentThread().threadId(), item);

        // One item is requested per delivery, so at most one item is outstanding.
        subscription.request(1);
    }

    /**
     * Logs the error reported to this subscriber.
     *
     * @param throwable the reported failure
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.printf(" onError (tid=%d) : %s%n",
                Thread.currentThread().threadId(), throwable);
    }

    /** Logs the completion signal. */
    @Override
    public void onComplete() {
        System.out.printf(" onComplete (tid=%d)%n",
                Thread.currentThread().threadId());
    }
}

Output

main (tid=1)
-- subscribe --
 onSubscribe (tid=29)
-- submit --
 onNext (tid=29) : abc
 onNext (tid=29) : 123
 onNext (tid=29) : XYZ
-- close --
 onComplete (tid=29)

Running Example Code