Skip to main content

Multi

The Multi type

A Multi represents a stream of data. A stream can emit 0, 1, n, or an infinite number of items. A Multi<T> is a data stream that:

  • emits 0..n item events
  • emits a failure event
  • emits a completion event for bounded streams
warning

Failures are terminal events: after having received a failure no further item will be emitted.

Multi<T> provides many operators that create, transform, and orchestrate Multi sequences. The operators can be used to define a processing pipeline. The events flow in this pipeline, and each operator can process or transform the events.

Multis are lazy by nature. To trigger the computation, you must subscribe.

The following snippet provides a simple example of pipeline using Multi:

Multi.createFrom().items(1, 2, 3, 4, 5)
.onItem().transform(i -> i * 2)
.select().first(3)
.onFailure().recoverWithItem(0)
.subscribe().with(System.out::println);
info

Remember, if you don’t subscribe, nothing is going to happen. Also, the pipeline is materialized for each subscription.

When subscribing to a Multi, you can pass an item callback (invoked when the item is emitted), or pass two callbacks, one receiving the item and one receiving the failure, or three callbacks to handle respectively the item, failure and completion events.

Cancellable cancellable = multi
.subscribe().with(
item -> System.out.println(item),
failure -> System.out.println("Failed with " + failure),
() -> System.out.println("Completed"));
note

Note the returned Cancellable: this object allows canceling the stream if need be.

Creating Multi (basics)

Multi

package _01_basics;

import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.smallrye.mutiny.Multi;

public class _13_Multi {

public static void main(String[] args) {
System.out.println("Hello world");

// -------------------------------------------------------------------------------------------------- //

Multi.createFrom().items(1, 2, 3)
.subscribe().with(
subscription -> {
System.out.println("Subscription: " + subscription);
subscription.request(10);
},
item -> System.out.println("Item: " + item),
failure -> System.out.println("Failure: " + failure.getMessage()),
() -> System.out.println("Completed"));

// -------------------------------------------------------------------------------------------------- //

System.out.println("----");

Multi.createFrom().range(10, 15)
.subscribe().with(System.out::println);

var randomNumbers = Stream
.generate(ThreadLocalRandom.current()::nextInt)
.limit(5)
.collect(Collectors.toList());

// -------------------------------------------------------------------------------------------------- //

System.out.println("----");

Multi.createFrom().iterable(randomNumbers)
.subscribe().with(System.out::println);
}
}

Output:

Hello world
Subscription: io.smallrye.mutiny.subscription.Subscribers$CallbackBasedSubscriber@50040f0c
Item: 1
Item: 2
Item: 3
Completed
----
10
11
12
13
14
----
-1003133619
1768110119
-112749840
-701258952
-668099042

Multi_Subscriber


package _01_basics;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import io.smallrye.mutiny.Multi;

public class _14_Multi_Subscriber {

public static void main(String[] args) {
System.out.println("Hello world with subscriber");

Multi.createFrom().items(1, 2, 3).subscribe().withSubscriber(new Subscriber<Integer>() {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe()");
this.subscription = subscription;
this.subscription.request(1);
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext: " + integer);
this.subscription.request(1);
}

@Override
public void onError(Throwable t) {
System.out.println("onError: " + t.getMessage());
}

@Override
public void onComplete() {
System.out.println("onComplete()");
}
});
}
}

Output:

Hello world with subscriber
onSubscribe()
onNext: 1
onNext: 2
onNext: 3
onComplete()

Multi_From_Emitter

package _01_basics;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import io.smallrye.mutiny.Multi;

public class _15_Multi_From_Emitter {

public static void main(String[] args) throws InterruptedException {
System.out.println("Multi from emitter");

ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

AtomicReference<ScheduledFuture<?>> ref = new AtomicReference<>();
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);

Multi.createFrom().emitter(emitter -> {
ref.set(service.scheduleAtFixedRate(() -> {
emitter.emit("tick");
if (counter.getAndIncrement() == 5) {
ref.get().cancel(true);
emitter.complete();
latch.countDown();
}
}, 0, 500, TimeUnit.MILLISECONDS));
})
.subscribe().with(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done!"));

latch.await();
service.shutdown();
}
}

Output:

Multi from emitter
tick
tick
tick
tick
tick
tick
Done!

Multi_Control_Subscription

package _01_basics;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Flow.Subscription;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiSubscriber;

public class _16_Multi_Control_Subscription {

public static void main(String[] args) {
System.out.println("Multi and subscription");

Multi.createFrom()
.ticks().every(Duration.of(1, ChronoUnit.SECONDS))
.subscribe().withSubscriber(new MultiSubscriber<Long>() {

private Subscription subscription;
private int counter = 0;

@Override
public void onItem(Long tick) {
System.out.println("Tick: " + tick);
if (counter++ == 10) {
subscription.cancel();
} else {
subscription.request(1);
}
}

@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void onCompletion() {
System.out.println("Done!");
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
});

}
}

Output:

Multi and subscription
Tick: 0
Tick: 1
Tick: 2
Tick: 3
Tick: 4
Tick: 5
Tick: 6
Tick: 7
Tick: 8
Tick: 9
Tick: 10

Multi_By_Repeating

package _01_basics;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public class _17_Multi_By_Repeating {

public static void main(String[] args) throws InterruptedException {
System.out.println("Multi by repeating");

Multi.createBy()
.repeating()
.supplier(Service::fetchValue)
.until(n -> n > 1_000_000L)
.subscribe().with(System.out::println);

System.out.println("\n----\n");

CountDownLatch latch = new CountDownLatch(1);

Multi.createBy()
.repeating()
.uni(Service::asyncFetchValue)
.atMost(10)
.subscribe().with(System.out::println, Throwable::printStackTrace, latch::countDown);

latch.await();

System.out.println("\n----\n");

Multi.createBy()
.repeating()
.completionStage(Service::queryDb)
.whilst(n -> n < 1_000_000L)
.subscribe().with(System.out::println);
}

static class Service {

static long fetchValue() {
return ThreadLocalRandom.current().nextLong(1_001_000L);
}

static Uni<Long> asyncFetchValue() {
return Uni.createFrom().completionStage(Service::queryDb);
}

static CompletionStage<Long> queryDb() {
return CompletableFuture.supplyAsync(Service::fetchValue);
}
}
}

Multi_From_Resource

package _01_basics;

import io.smallrye.mutiny.Multi;

public class _18_Multi_From_Resource {

public static void main(String[] args) {
System.out.println("Multi from resource");

Multi.createFrom()
.resource(MyResource::new, MyResource::stream)
.withFinalizer(MyResource::close)
.subscribe().with(System.out::println);
}

static class MyResource {

public Multi<Integer> stream() {
System.out.println("stream()");
return Multi.createFrom().range(0, 10);
}

public void close() {
System.out.println("close()");
}
}
}

Output:

Multi from resource
stream()
0
1
2
3
4
5
6
7
8
9
close()

Go further with the Mutiny workshop!

https://github.com/smallrye/smallrye-mutiny/tree/main/workshop-examples