Skip to main content

Observing Events

Observing the events emitted by Uni and Multi instances.

Uni and Multi emit events. Your code is going to observe and process these events:

Events

EventUni / MultiDirectionNote
itemUni + Multiupstream -> downstreamThe upstream sent an item.
failureUni + Multiupstream -> downstreamThe upstream failed.
completionMultiupstream -> downstreamThe upstream completed.
subscribeUni and Multidownstream -> upstreamA downstream subscriber is interested in the data.
subscriptionUni and Multiupstream -> downstreamEvent happening after a subscribe event to indicate that the upstream acknowledged the subscription.
cancellationUni and Multidownstream -> upstreamA downstream subscriber does not want any more events.
overflowMultiupstream -> downstreamThe upstream has emitted more than the downstream can handle.
requestMultidownstream -> upstreamThe downstream indicates its capacity to handle n items.

For each kind of event, there is an associated group providing the methods to handle that specific event: onItem(), onFailure(), onCompletion() and so on. These groups provide two methods to peek at the various events without impacting its distribution: invoke(...) and call(...). It does not transform the received event; it notifies you that something happened and let you react. Once this reaction completes, the event is propagated downstream or upstream depending on the direction of the event.

The invoke method

The invoke method is synchronous and the passed callback does not return anything. Mutiny invokes the configured callback when the observed stream dispatches the event:

Uni<String> u = uni.onItem()
.invoke(i -> System.out.println("Received item: " + i));

Multi<String> m = multi.onItem()
.invoke(i -> System.out.println("Received item: " + i));

As said above, invoke is synchronous. Mutiny invokes the callback and propagates the event downstream when the callback returns. It blocks the dispatching.

invoke-method-01.png

Of course, we highly recommend you not to block.

The following snippets show how you can log the different types of events.

multi
.onSubscription()
.invoke(() -> System.out.println("⬇️ Subscribed"))
.onItem()
.invoke(i -> System.out.println("⬇️ Received item: " + i))
.onFailure()
.invoke(f -> System.out.println("⬇️ Failed with " + f))
.onCompletion()
.invoke(() -> System.out.println("⬇️ Completed"))
.onCancellation()
.invoke(() -> System.out.println("⬆️ Cancelled"))
.onRequest()
.invoke(l -> System.out.println("⬆️ Requested: " + l));

The call method

Unlike invoke, call is asynchronous, and the callback returns a Uni<?> object.

call is often used when you need to implement asynchronous side-effects, such as closing resources.

call-method-01.png

Mutiny does not dispatch the original event downstream until the Uni returned by the callback emits an item:

multi
.onItem().call(i ->
Uni.createFrom().voidItem()
.onItem().delayIt().by(Duration.ofSeconds(1)
)
);

As shown in the previous snippet, you can use this approach to delay items. But, the primary use case is about completing asynchronous actions such as calling an asynchronous close method on a resource:

multi
.onCompletion().call(() -> resource.close());

Under the hood, Mutiny gets the Uni (by invoking the callback) and subscribes to it. It observes the item or failure event from that Uni. It discards the item value as only the emission matters in this case.

If the callback throws an exception or the produced Uni produces a failure, Mutiny propagates that failure (or a CompositeException) downstream, replacing the original event.

Summary

  • The invoke and call methods are handy when you need to observe a Uni or a Multi without changing the transiting events.
  • Use invoke for implementing synchronous side-effects or logging events.
  • The asynchronous nature of call makes it perfect for implementing asynchronous side-effects, such as closing resources, flushing data, delay items, and so on.

The following table highlights the key differences:

invokecall
Naturesynchronousasynchronous
Return typevoidUni<T>
Main use caseslogging, synchronous side-effectsI/O operations, closing resources, flushing data