Mono and Flux

Mono is a Project Reactor publisher of zero or one element of type T. It represents the single result of an asynchronous computation: it emits at most one item and then terminates, either with that item or empty.

Flux, on the other hand, is a container for zero to N elements. It represents a stream of data that can emit multiple items over time. It’s suitable for handling sequences of data.

Mono and Flux provide a rich set of methods to manipulate and work with the emitted data.
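
To make the difference in cardinality concrete, here is a minimal sketch. The class name CardinalitySketch and its helper methods are hypothetical, and block() is used only so that the results are visible from a plain main method.

```java
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper class, not part of the original examples.
public class CardinalitySketch {

    // A Mono holds at most one element; blockOptional() waits for it and
    // returns an empty Optional if the Mono completes without a value.
    static Optional<String> firstOrEmpty(Mono<String> mono) {
        return mono.blockOptional();
    }

    // A Flux may hold any number of elements; collectList() gathers them
    // into a single list once the Flux completes.
    static List<Integer> all(Flux<Integer> flux) {
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(firstOrEmpty(Mono.just("Hello"))); // Optional[Hello]
        System.out.println(firstOrEmpty(Mono.empty()));       // Optional.empty
        System.out.println(all(Flux.just(1, 2, 3)));          // [1, 2, 3]
    }
}
```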

Common Methods and Operations

Example #1 Mono.just

Mono.just creates a new Mono that emits the specified item, which is captured at instantiation time. Create a Mono and subscribe to it:

import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        // create a Mono
        Mono<String> mono = Mono.just("Hello");

        // subscribe to a Mono
        mono.subscribe();
    }
}

We started consuming the data, but we are not doing anything with it.

Example #2 Mono.just

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

    public static void main(String[] args) {

        // create a Mono
        Mono<String> mono = Mono.just("Hello");

        // subscribe to a Mono and print the emitted item
        mono.subscribe(data -> System.out.println(data));

    }
}

Output:

Hello

Example #3 Mono.just

Let’s subscribe and define what should be triggered for each of the 3 events.

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

    public static void main(String[] args) {

        // create a Mono
        Mono<String> mono = Mono.just("Hello");

        // subscribe to a Mono
        mono.subscribe(
                data -> System.out.println(data), // onNext
                err -> System.out.println(err), // onError
                () -> System.out.println("Completed!") // onComplete
        );

    }
}

Output:

Hello
Completed!

Here we used the subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer) method, which subscribes a Consumer to this Mono that consumes the elements in the sequence, handles errors, and reacts to completion.
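
To see which callbacks fire for a Mono that completes without a value, the following sketch records the signals in a list. The class SignalRecorder and the method signalsOf are hypothetical names introduced for illustration, and the sketch assumes synchronous sources such as Mono.just and Mono.empty, so that the list is filled before the method returns.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

// Hypothetical helper class, not part of the original examples.
public class SignalRecorder {

    // Subscribes with all three callbacks and records which ones fire, in order.
    static List<String> signalsOf(Mono<String> mono) {
        List<String> signals = new ArrayList<>();
        mono.subscribe(
                data -> signals.add("onNext(" + data + ")"),          // onNext
                err -> signals.add("onError(" + err.getMessage() + ")"), // onError
                () -> signals.add("onComplete"));                     // onComplete
        return signals;
    }

    public static void main(String[] args) {
        // A Mono with a value: the item, then completion.
        System.out.println(signalsOf(Mono.just("Hello"))); // [onNext(Hello), onComplete]
        // An empty Mono: completion only, onNext is never called.
        System.out.println(signalsOf(Mono.empty()));       // [onComplete]
    }
}
```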

Example #4 Mono.fromSupplier

Let’s invoke the onError signal:

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

    public static void main(String[] args) {

        // create a Mono whose supplier fails when it is invoked
        Mono<String> mono = Mono.fromSupplier(() -> {
            throw new RuntimeException("Exception occurred!");
        });

        // subscribe to a Mono
        mono.subscribe(
                data -> System.out.println(data), // onNext
                err -> System.out.println("ERROR: " + err), // onError
                () -> System.out.println("Completed!") // onComplete
        );

    }
}

Output:

ERROR: java.lang.RuntimeException: Exception occurred!
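
The exception above arrives as an onError signal rather than being thrown while the Mono is built, because Mono.fromSupplier does not call the supplier until a subscriber attaches. The sketch below illustrates that laziness with a counter; the class LazySupplierSketch and the method countingMono are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

// Hypothetical helper class, not part of the original examples.
public class LazySupplierSketch {

    // Builds a Mono whose supplier increments the given counter each time it runs.
    static Mono<Integer> countingMono(AtomicInteger calls) {
        return Mono.fromSupplier(calls::incrementAndGet);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Mono<Integer> mono = countingMono(calls);

        // Nothing has run yet: the supplier is only invoked on subscription.
        System.out.println("calls before subscribe: " + calls.get()); // calls before subscribe: 0

        // Each subscription invokes the supplier again.
        mono.subscribe(v -> System.out.println("value: " + v)); // value: 1
        mono.subscribe(v -> System.out.println("value: " + v)); // value: 2
    }
}
```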

Example #5 zipWith() Method

The zipWith() method combines the result of this Mono with the result of another Mono. Here a combinator function merges the two values into one:

import reactor.core.publisher.Mono;

public class FluxAndMonoServices {

    public Mono<String> fruitsMonoZipWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");

        // combine both results with the given function
        return fruits.zipWith(veggies,
                (first, second) -> first + second).log();
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsMonoZipWith()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

Mono -> s = MangoTomato
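
zipWith() can also be called without a combinator function, in which case the two results are paired in a Tuple2. The following is a minimal sketch; the class ZipTupleSketch, the method fruitWithCount, and the sample values are hypothetical.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

// Hypothetical helper class, not part of the original examples.
public class ZipTupleSketch {

    static Mono<Tuple2<String, Integer>> fruitWithCount() {
        Mono<String> fruit = Mono.just("Mango");
        Mono<Integer> count = Mono.just(3);
        // No combinator: the two results are paired in a Tuple2,
        // which also allows the two Monos to have different types.
        return fruit.zipWith(count);
    }

    public static void main(String[] args) {
        fruitWithCount().subscribe(t ->
                System.out.println(t.getT1() + " x " + t.getT2())); // Mango x 3
    }
}
```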

Example #6 flatMap() and flatMapMany() Methods

  • flatMap() – Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).
  • flatMapMany() – Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoService {

    // Mono -> Mono: the whole list of characters is a single item
    public Mono<List<String>> fruitMonoFlatMap() {
        return Mono.just("Mango")
                .flatMap(s -> Mono.just(List.of(s.split(""))))
                .log();
    }

    // Mono -> Flux: each character becomes its own item
    public Flux<String> fruitMonoFlatMapMany() {
        return Mono.just("Mango")
                .flatMapMany(s -> Flux.just(s.split("")))
                .log();
    }

    public static void main(String[] args) {

        FluxAndMonoService fluxAndMonoServices
                = new FluxAndMonoService();

        fluxAndMonoServices.fruitMonoFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitMonoFlatMapMany()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext([M, a, n, g, o])
s = [M, a, n, g, o]
[ INFO] (main) | onComplete()
[ INFO] (main) onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(M)
Mono -> s = M
[ INFO] (main) onNext(a)
Mono -> s = a
[ INFO] (main) onNext(n)
Mono -> s = n
[ INFO] (main) onNext(g)
Mono -> s = g
[ INFO] (main) onNext(o)
Mono -> s = o
[ INFO] (main) onComplete()

Example #7 Flux.range

The following code shows the most basic form of the subscribe() method, which takes no arguments:

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        // Set up a Flux that produces three values when a subscriber attaches.
        Flux<Integer> ints = Flux.range(1, 3);

        // Subscribe in the simplest way.
        ints.subscribe();
    }
}

Example #8 Flux.range

The preceding code produces no visible output, but it does work. The Flux produces three values. If we provide a lambda, we can make the values visible. The next example for the subscribe method shows one way to make the values appear:

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        // Set up a Flux that produces three values when a subscriber attaches.
        Flux<Integer> ints = Flux.range(1, 3);

        // Subscribe with a subscriber that will print the values.
        ints.subscribe(i -> System.out.println(i));
    }
}

Output:

1
2
3

Example #9 Flux.range

To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        Flux<Integer> ints = Flux.range(1, 4)
                // We need a map so that we can handle some values differently.
                .map(i -> {
                    // For most values, return the value.
                    if (i <= 3) return i;
                    // For one value, force an error.
                    throw new RuntimeException("Got to 4");
                });
        // Subscribe with a subscriber that includes an error handler.
        ints.subscribe(i -> System.out.println(i),
                error -> System.err.println("Error: " + error));
    }
}

Output:

1
2
3
Error: java.lang.RuntimeException: Got to 4

Example #10 Flux.range

The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        Flux<Integer> ints = Flux.range(1, 4);

        // Subscribe with a Subscriber that includes a handler for completion events.
        ints.subscribe(i -> System.out.println(i),
                error -> System.err.println("Error " + error),
                () -> System.out.println("Done"));
    }
}

Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.

The completion callback has no input, as represented by an empty pair of parentheses: it matches the run method in the Runnable interface. The preceding code produces the following output:

1
2
3
4
Done
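
One way to check that error and completion signals are mutually exclusive is to turn the signals themselves into values with materialize() and look at their types. This is only a sketch: the class TerminalSignalSketch and its methods are hypothetical names, and block() is used just to collect the result in a plain main method.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;
import reactor.core.publisher.SignalType;

// Hypothetical helper class, not part of the original examples.
public class TerminalSignalSketch {

    // Converts every signal of the Flux into a value and collects the signal types.
    static List<SignalType> signalTypes(Flux<Integer> flux) {
        return flux.materialize()
                .map(Signal::getType)
                .collectList()
                .block();
    }

    // A Flux that emits 1 and then fails instead of emitting 2.
    static Flux<Integer> failing() {
        return Flux.range(1, 2).map(i -> {
            if (i < 2) return i;
            throw new RuntimeException("Got to 2");
        });
    }

    public static void main(String[] args) {
        // Two onNext signals followed by a completion signal.
        System.out.println(signalTypes(Flux.range(1, 2)));
        // One onNext signal followed by an error signal, and no completion signal.
        System.out.println(signalTypes(failing()));
    }
}
```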

Example #11 Flux.fromIterable

filter() Method Example. The filter() method evaluates each source value against the given Predicate and emits only the values for which the test succeeds.

import java.util.List;

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    // keep only the strings longer than the given number of characters
    public Flux<String> fruitsFluxFilter(int number) {
        return Flux.fromIterable(List.of("zjc","examples","Flux"))
                .filter(s -> s.length() > number);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxFilter(2)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = zjc
s = examples
s = Flux
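
With a threshold of 2, every string in the list passes the predicate, so nothing is filtered out. The sketch below runs the same filter with larger thresholds to show values being dropped; the class FilterThresholdSketch and the method longerThan are hypothetical names, and collectList().block() is used only to obtain a printable list.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of the original examples.
public class FilterThresholdSketch {

    // Returns the strings whose length is strictly greater than the given number.
    static List<String> longerThan(int number) {
        return Flux.fromIterable(List.of("zjc", "examples", "Flux"))
                .filter(s -> s.length() > number)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(longerThan(2)); // [zjc, examples, Flux]
        System.out.println(longerThan(3)); // [examples, Flux]
        System.out.println(longerThan(8)); // []
    }
}
```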

Example #12 Flux.fromIterable

flatMap() and delayElements() Methods Example.

import java.time.Duration;
import java.util.List;
import java.util.Random;

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    // split each string into characters, emitted synchronously
    public Flux<String> stringsFluxFlatMap() {
        return Flux.fromIterable(List.of("zjc","examples","Flux"))
                .flatMap(s -> Flux.just(s.split("")))
                .log();
    }

    // same split, but each inner Flux delays its elements by a random duration
    public Flux<String> stringsFluxFlatMapAsync() {
        return Flux.fromIterable(List.of("zjc","examples","Flux"))
                .flatMap(s -> Flux.just(s.split(""))
                        .delayElements(Duration.ofMillis(
                                new Random().nextInt(1000)
                        )))
                .log();
    }


    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.stringsFluxFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.stringsFluxFlatMapAsync()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(z)
s = z
[ INFO] (main) onNext(j)
s = j
[ INFO] (main) onNext(c)
s = c
[ INFO] (main) onNext(e)
s = e
[ INFO] (main) onNext(x)
s = x
[ INFO] (main) onNext(a)
s = a
[ INFO] (main) onNext(m)
s = m
[ INFO] (main) onNext(p)
s = p
[ INFO] (main) onNext(l)
s = l
[ INFO] (main) onNext(e)
s = e
[ INFO] (main) onNext(s)
s = s
[ INFO] (main) onNext(F)
s = F
[ INFO] (main) onNext(l)
s = l
[ INFO] (main) onNext(u)
s = u
[ INFO] (main) onNext(x)
s = x
[ INFO] (main) onComplete()
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
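
The log of the second subscription stops after request(unbounded). The most likely reason is that delayElements() emits on a separate scheduler thread, while main returns right after subscribing, so the program exits before any delayed element arrives. One way to observe the elements, sketched below under that assumption, is to block until the Flux completes. The class AwaitAsyncSketch, the method delayedLetters, and the fixed 50 ms delay are hypothetical.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of the original examples.
public class AwaitAsyncSketch {

    // Splits each string into characters and delays every character by 50 ms.
    static Flux<String> delayedLetters() {
        return Flux.fromIterable(List.of("zjc", "Flux"))
                .flatMap(s -> Flux.just(s.split(""))
                        .delayElements(Duration.ofMillis(50)));
    }

    public static void main(String[] args) {
        delayedLetters()
                .doOnNext(s -> System.out.println("s = " + s))
                // Keep the main thread waiting until onComplete arrives.
                .blockLast();
    }
}
```

Because flatMap() subscribes to the inner publishers eagerly, the characters of the two words may interleave, so the print order is not fixed.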

Example #13 Flux.fromIterable

transform(), defaultIfEmpty() and switchIfEmpty() Methods Example.

  • transform() – Transform this Flux in order to generate a target Flux.
  • defaultIfEmpty() – Provide a default unique value if this sequence is completed without any data.
  • switchIfEmpty() – Switch to an alternative Publisher if this sequence is completed without any data.

import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxTransform(int number) {

        // reusable operator chain: keep strings longer than the given number
        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        // equivalent to calling .filter(s -> s.length() > number) directly
        return Flux.fromIterable(List.of("zjc12","ex1234","Flux12"))
                .transform(filterData)
                .log();
    }

    public Flux<String> fruitsFluxTransformDefaultIfEmpty(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        // emit "Default" if the filter lets nothing through
        return Flux.fromIterable(List.of("zjc12","ex1234","Flux12"))
                .transform(filterData)
                .defaultIfEmpty("Default")
                .log();

    }

    public Flux<String> fruitsFluxTransformSwitchIfEmpty(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        // switch to a second, equally filtered Flux if the first one is empty
        return Flux.fromIterable(List.of("zjc12","ex1234","Flux12"))
                .transform(filterData)
                .switchIfEmpty(Flux.just("123456789","1234 12345")
                        .transform(filterData))
                .log();

    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxTransform(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformDefaultIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformSwitchIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(ex1234)
s = ex1234
[ INFO] (main) | onNext(Flux12)
s = Flux12
[ INFO] (main) | onComplete()
[ INFO] (main) onSubscribe([Fuseable] FluxDefaultIfEmpty.DefaultIfEmptySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(Default)
s = Default
[ INFO] (main) onComplete()
[ INFO] (main) onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(123456789)
s = 123456789
[ INFO] (main) onNext(1234 12345)
s = 1234 12345
[ INFO] (main) onComplete()

Example #14 FluxConcat

concat() and concatWith() Methods Example.

  • concat() – Concatenate all sources provided as varargs, forwarding the elements emitted by the sources downstream.
  • concatWith() – Concatenate emissions of this Flux with the provided Publisher (no interleave).

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxConcat() {
        var chars = Flux.just("abcde","fghijk");
        var nrs = Flux.just("123456","78901");

        return Flux.concat(chars, nrs);
    }

    public Flux<String> fruitsFluxConcatWith() {
        var chars = Flux.just("abcde","fghijk");
        var nrs = Flux.just("123456","78901");

        return chars.concatWith(nrs);
    }

    // concatenating two Monos yields a Flux
    public Flux<String> fruitsMonoConcatWith() {
        var chars = Mono.just("abcde");
        var nrs = Mono.just("123456");

        return chars.concatWith(nrs);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxConcat()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsMonoConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = 123456

Example #15 Flux.merge

merge() and mergeWith() Methods Example.

  • merge() – Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.
  • mergeWith() – Merge data from this Flux and a Publisher into an interleaved merged sequence.

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxMerge() {
        var chars = Flux.just("abcde","fghijk");
        var nrs = Flux.just("123456","78901");
        return Flux.merge(chars, nrs);
    }

    public Flux<String> fruitsFluxMergeWith() {
        var chars = Flux.just("abcde","fghijk");
        var nrs = Flux.just("123456","78901");
        return chars.mergeWith(nrs);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxMerge()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxMergeWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = fghijk
s = 123456
s = 78901
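
The merge output above looks identical to the concat output because both sources emit synchronously, so the first source finishes before the second one is subscribed. Interleaving only becomes visible when the sources emit over time. The following sketch assumes two delayed sources; the class MergeVsConcatSketch, its method names, and the 200 ms and 300 ms delays are hypothetical.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of the original examples.
public class MergeVsConcatSketch {

    // Emits "a" and "b", one element every 200 ms.
    static Flux<String> letters() {
        return Flux.just("a", "b").delayElements(Duration.ofMillis(200));
    }

    // Emits "1" and "2", one element every 300 ms.
    static Flux<String> digits() {
        return Flux.just("1", "2").delayElements(Duration.ofMillis(300));
    }

    // concat subscribes to digits() only after letters() has completed.
    static List<String> concatenated() {
        return Flux.concat(letters(), digits()).collectList().block();
    }

    // merge subscribes to both sources at once and forwards elements as they arrive.
    static List<String> merged() {
        return Flux.merge(letters(), digits()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // [a, b, 1, 2]
        // With these delays the elements typically interleave, e.g. a, 1, b, 2,
        // but the exact order depends on timing.
        System.out.println(merged());
    }
}
```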

Source Code:

https://github.com/ZbCiok/reactor-mono-and-flux-examples