Uni
The Uni type
A Uni represents a stream that can only emit either an item or a failure event.
You rarely create instances of Uni yourself, but, instead, use a reactive client exposing a Mutiny API that provides Uni objects. That being said, it can be handy at times.
A Uni<T> is a specialized stream that emits only an item or a failure. Typically, Uni<T> are great to represent asynchronous actions such as a remote procedure call, an HTTP request, or an operation producing a single result.
Uni<T> provides many operators that create, transform, and orchestrate Uni sequences.
As said, Uni<T> emits either an item or a failure. Note that the item can be null, and the Uni API has specific methods for this case.
Typically, a Uni<Void> always emits null as item event or a failure if the represented operation fails. You can consider the item event as a completion signal indicating the success of the operation.
The offered operators can be used to define a processing pipeline. The event, either the item or failure, flows in this pipeline, and each operator can process or transform the event. Unis are lazy by nature.
To trigger the computation, you must have a final subscriber indicating your interest. The following snippet provides a simple example of pipeline using Uni:
Uni.createFrom().item(1)
.onItem().transform(i -> "hello-" + i)
.onItem().delayIt().by(Duration.ofMillis(100))
.subscribe().with(System.out::println);
Remember: if you don’t subscribe, nothing is going to happen. What’s more, the pipeline is materialized for each subscription.
When subscribing to a Uni, you can pass an item callback (invoked when the item is emitted), or two callbacks (one receiving the item and one receiving the failure):
Cancellable cancellable = uni
.subscribe().with(
item -> System.out.println(item),
failure -> System.out.println("Failed with " + failure));
Note the returned Cancellable: this object allows canceling the operation if need be.
Creating Unis (basics)
Uni
package _01_basics;
public class _01_Uni {
public static void main(String[] args) {
System.out.println("Hello world");
io.smallrye.mutiny.Uni<String> uni = io.smallrye.mutiny.Uni.createFrom().item("Hello, world!");
uni.subscribe().with(System.out::println);
}
}
Output:
Hello world
Hello, world!
UniSubscriber
package _01_basics;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;
public class _02_Uni_UniSubscriber {
public static void main(String[] args) {
System.out.println("Hello world with UniSubscriber");
Uni<String> uni = Uni.createFrom().item("Hello, world!");
uni.subscribe().withSubscriber(new UniSubscriber<String>() {
@Override
public void onSubscribe(UniSubscription subscription) {
System.out.println("onSubscribe");
}
@Override
public void onItem(String item) {
System.out.println("onItem: " + item);
}
@Override
public void onFailure(Throwable failure) {
System.out.println("onFailure: " + failure.getMessage());
}
});
}
}
Output:
Hello world with UniSubscriber
onSubscribe
onItem: Hello, world!
Uni_From_Supplier
package _01_basics;
import java.util.Random;
import io.smallrye.mutiny.Uni;
public class _03_Uni_From_Supplier {
public static void main(String[] args) {
System.out.println("️Uni from supplier");
Random random = new Random();
Uni<Integer> uniFromSupplier = Uni.createFrom().item(random::nextInt);
for (var i = 0; i < 5; i++) {
uniFromSupplier.subscribe().with(System.out::println);
}
}
}
Output:
1190589952
1306938402
2056014134
45940004
-173217289
Uni_From_Supplier_And_State
package _01_basics;
import java.util.concurrent.atomic.AtomicInteger;
import io.smallrye.mutiny.Uni;
public class _04_Uni_From_Supplier_And_State {
public static void main(String[] args) {
System.out.println("️Uni from supplier with state");
Uni<Integer> uniFromSupplierAndState = Uni.createFrom().item(AtomicInteger::new, i -> i.addAndGet(10));
for (var i = 0; i < 5; i++) {
uniFromSupplierAndState.subscribe().with(System.out::println);
}
}
}
Output:
️Uni from supplier with state
10
20
30
40
50
Uni_Deferred
package _01_basics;
import java.util.concurrent.atomic.AtomicLong;
import io.smallrye.mutiny.Uni;
public class _05_Uni_Deferred {
public static void main(String[] args) {
System.out.println("️Deferred Uni");
AtomicLong ids = new AtomicLong();
Uni<Long> deferredUni = Uni.createFrom().deferred(() -> Uni.createFrom().item(ids::incrementAndGet));
for (var i = 0; i < 5; i++) {
deferredUni.subscribe().with(System.out::println);
}
}
}
Output:
️Deferred Uni
1
2
3
4
5
Uni_From_Emitter
package _01_basics;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import io.smallrye.mutiny.Uni;
public class _06_Uni_From_Emitter {
public static void main(String[] args) throws InterruptedException {
System.out.println("Uni from emitter");
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
CountDownLatch emitterLatch = new CountDownLatch(1);
Uni<String> uniFromEmitter = Uni.createFrom().emitter(emitter -> {
forkJoinPool.submit(() -> {
emitter.complete("Hello");
emitterLatch.countDown();
});
});
uniFromEmitter.subscribe().with(System.out::println);
emitterLatch.await();
}
}
Output:
Uni from emitter
Hello
Uni_From_Emitter_And_State
package _01_basics;
import java.util.concurrent.atomic.AtomicInteger;
import io.smallrye.mutiny.Uni;
public class _07_Uni_From_Emitter_And_State {
public static void main(String[] args) {
System.out.println("Uni from emitter and state");
Uni<Integer> uniFromEmitterAndState = Uni.createFrom()
.emitter(AtomicInteger::new, (i, e) -> e.complete(i.addAndGet(10)));
for (var i = 0; i < 5; i++) {
uniFromEmitterAndState.subscribe().with(System.out::println);
}
}
}
Output:
Uni from emitter and state
10
20
30
40
50
Uni_From_Failure
package _01_basics;
import java.io.IOException;
import io.smallrye.mutiny.Uni;
public class _08_Uni_From_Failure {
public static void main(String[] args) {
System.out.println("⚡️ Uni from failure");
Uni.createFrom().failure(new IOException("Boom"))
.subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage()));
Uni.createFrom().failure(() -> new IOException("Badaboom"))
.subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage()));
}
}
Output:
Uni from failure
Boom
Badaboom
Uni_From_CompletionStage
package _01_basics;
import static java.util.concurrent.CompletableFuture.delayedExecutor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import io.smallrye.mutiny.Uni;
public class _09_Uni_From_CompletionStage {
public static void main(String[] args) throws InterruptedException {
System.out.println("Uni from CompletionStage");
var cs = CompletableFuture
.supplyAsync(() -> "Hello!", delayedExecutor(1, TimeUnit.SECONDS))
.thenApply(String::toUpperCase);
Uni.createFrom().completionStage(cs)
.subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage()));
Thread.sleep(2000);
}
}
Output:
Uni from CompletionStage
HELLO!
Go further with the Mutiny workshop!
https://github.com/smallrye/smallrye-mutiny/tree/main/workshop-examples