Skip to main content

Guides

How to deal with CompletionStage?

CompletionStage and CompletableFuture are classes provided by Java to represent asynchronous actions.

Differences between Uni and CompletionStage

While CompletionStage and CompletableFuture are close to Uni in terms of use case, there are some fundamental differences.

CompletionStage are eager. When a method returns a CompletionStage, the operation has already been triggered. The outcome is used to complete the returned CompletionStage. On the other side, Unis are lazy. The operation is only triggered once there is a subscription.

CompletionStage caches the outcome. So, once received, you can retrieve the result. Every retrieval will get the same result. With Uni, every subscription has the opportunity to re-trigger the operation and gets a different result.

From Uni to CompletionStage

CompletionStage<String> cs = uni.subscribeAsCompletionStage();
note

It’s important to understand that retrieving a CompletionStage subscribes to the Uni. If you do this operation twice, it subscribes to the Uni twice and re-trigger the operation.

Creating a Uni from a CompletionStage#

To create a Uni from a CompletionStage, use Uni.createFrom().completionStage(...).

Uni<String> uni1 = Uni
// Create from a Completion Stage
.createFrom().completionStage(
CompletableFuture.supplyAsync(() -> "hello", executor)
)
.onItem().transform(String::toUpperCase);

Uni<String> uni2 = Uni
// Create from a Completion Stage supplier (recommended)
.createFrom().completionStage(
() -> CompletableFuture.supplyAsync(() -> "hello", executor)
)
.onItem().transform(String::toUpperCase);

Creating a Multi from a CompletionStage

To create a Multi from a CompletionStage, use Multi.createFrom().completionStage(...). It produces:

  • a multi emitting an item and completing - if the value produced by the completion stage is not null,
  • an empty multi if the value produced by the completion stage is null,
  • a failed multi is completion stage is completed exceptionally.
Multi<String> multi1 = Multi
.createFrom().completionStage(
CompletableFuture.supplyAsync(() -> "hello", executor)
)
.onItem().transform(String::toUpperCase);

Multi<String> multi2 = Multi
.createFrom().completionStage(() ->
CompletableFuture.supplyAsync(() -> "hello", executor)
)
.onItem().transform(String::toUpperCase);

Mutiny Vert.x bindings

Smallrye Mutiny Vert.x bindings

While the Vert.x core APIs expose asynchronous programming through callbacks and promise / future, code generators offer bindings to other asynchronous programming models, including: Kotlin coroutines, and RxJava 1, 2 and 3.

This project offers Vert.x binding for Mutiny.

What does the Mutiny code generator do?

vertx-mutiny-translations-01.png

Getting the bindings

The bindings can be accessed from the following Maven coordinates:

  • Group: io.smallrye.reactive
  • Artifact: smallrye-mutiny-vertx-<MODULE> where MODULE refers to a Vert.x module, such as core, pg-client, web-client, etc.

The full list of supported modules from the Vert.x stack is available at https://github.com/smallrye/smallrye-mutiny-vertx-bindings/tree/main/vertx-mutiny-clients

Example

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

public class hello {

static class MyVerticle extends AbstractVerticle {

private long counter = 0L;

/*
* Asynchronous start completion notification through a Uni.
* This is the Mutiny variant of `start(Promise<Void>)` in plain Vert.x.
*/
@Override
public Uni<Void> asyncStart() {

/*
* Vert.x stream (ticks every 2 seconds) to Mutiny stream (Multi),
* then increment a counter.
*/
vertx.periodicStream(2000L)
.toMulti()
.subscribe().with(tick -> counter++);

/*
* HTTP endpoint, where `listen` returns a `Uni<HttpServer>`.
* Notifies of the start procedure completion by replacing and
* returning the`Uni<HttpServer>` by `Uni<Void>`.
*/
return vertx.createHttpServer()
.requestHandler(req -> req.response().endAndForget("@" + counter))
.listen(8080)
.onItem()
.invoke(() -> System.out.println("See http://127.0.0.1:8080"))
.onFailure()
.invoke(Throwable::printStackTrace)
.replaceWithVoid();
}
}

/*
* Main method, deploys a verticle and awaits for the completion with
* an `*AndAwait()` method.
*/
public static void main(String... args) {
var vertx = Vertx.vertx();
System.out.println("Deployment Starting");
vertx.deployVerticleAndAwait(MyVerticle::new, new DeploymentOptions());
System.out.println("Deployment completed");
}
}

The deployed verticle uses the Mutiny API, where the start(Promise<Void>) method is replaced by asyncStart() method that returns a Uni<Void>. The code also shows how to convert Vert.x streams into Mutiny Multi streams, and how to await for the verticle deployment to complete.

API translation

The Vert.x Mutiny bindings are generated from the existing Vert.x API translations.