Mono and Flux
Mono is a Project Reactor Publisher that emits zero or one element of type T. It represents the single result of an asynchronous computation: it emits at most one item and then terminates, either by completing (with or without an item) or by signalling an error.
Flux, on the other hand, is a container for zero to N elements. It represents a stream of data that can emit multiple items over time. It’s suitable for handling sequences of data.
Mono and Flux provide a rich set of methods to manipulate and work with the emitted data.
Common Methods and Operations
Example #1 Mono.just
Create a new Mono that emits the specified item, which is captured at instantiation time. Create and subscribe to Mono:
import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        // create a Mono
        Mono<String> mono = Mono.just("Hello");
        // subscribe to the Mono; no consumer is passed, so the value is discarded
        mono.subscribe();
    }
}
We started consuming the data, but we are not doing anything with it.
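The phrase "captured at instantiation time" can be illustrated with a small sketch. It assumes Reactor is on the classpath; the class name EagerVsLazyMono and the counter are hypothetical and not part of the original example. The argument passed to Mono.just is evaluated once, when the Mono is created, whereas the supplier given to Mono.fromSupplier runs again for every subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class EagerVsLazyMono {
    // Returns the values observed by two subscriptions to each Mono.
    static List<Integer> demo() {
        AtomicInteger counter = new AtomicInteger();
        List<Integer> seen = new ArrayList<>();

        // The argument is evaluated right here, before anyone subscribes.
        Mono<Integer> eager = Mono.just(counter.incrementAndGet());
        // The supplier is only invoked when a subscriber arrives.
        Mono<Integer> lazy = Mono.fromSupplier(counter::incrementAndGet);

        eager.subscribe(seen::add); // replays the captured value
        eager.subscribe(seen::add); // same captured value again
        lazy.subscribe(seen::add);  // supplier runs now
        lazy.subscribe(seen::add);  // supplier runs a second time
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 1, 2, 3]
    }
}
```

If the value is expensive to compute or should be fresh for each subscriber, a supplier-based factory is usually the better fit; Mono.just suits values that already exist.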
Example #2 Mono.just
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {
    public static void main(String[] args) {
        // create a Mono
        Mono<String> mono = Mono.just("Hello");
        // subscribe to the Mono and print the emitted value
        mono.subscribe(data -> System.out.println(data));
    }
}
Output:
Hello
Example #3 Mono.just
Let’s subscribe and define what should be triggered for each of the 3 events.
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {
    public static void main(String[] args) {
        // create a Mono
        Mono<String> mono = Mono.just("Hello");
        // subscribe to the Mono with one callback per signal
        mono.subscribe(
            data -> System.out.println(data),        // onNext
            err -> System.out.println(err),          // onError
            () -> System.out.println("Completed!")   // onComplete
        );
    }
}
Output:
Hello
Completed!
Here we used the
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer)
overload, which subscribes to this Mono with callbacks that respectively consume the emitted element, handle errors, and react to completion.
Example #4 Mono.fromSupplier
Let’s invoke the onError signal:
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {
    public static void main(String[] args) {
        // create a Mono whose supplier throws when a subscriber arrives
        Mono<String> mono = Mono.fromSupplier(() -> {
            throw new RuntimeException("Exception occurred!");
        });
        // subscribe to the Mono; only the error callback is invoked
        mono.subscribe(
            data -> System.out.println(data),            // onNext
            err -> System.out.println("ERROR: " + err),  // onError
            () -> System.out.println("Completed!")       // onComplete
        );
    }
}
Output:
ERROR: java.lang.RuntimeException: Exception occurred!
Example #5 zipWith() Method
The zipWith() method combines the result of this Mono with the result of another Mono, here by passing both values to a combinator function.
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {
    public Mono<String> fruitsMonoZipWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");
        // combine both values once each Mono has emitted
        return fruits.zipWith(veggies,
                (first, second) -> first + second).log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsMonoZipWith()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}
Output:
Mono -> s = MangoTomato
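As a complementary sketch (assuming Reactor on the classpath; the class and method names are hypothetical), zipWith can also be called without a combinator, in which case the two values are paired in a Tuple2. The second method illustrates that the zipped Mono is empty when either side completes without a value.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ZipWithTupleSketch {
    // Without a combinator, zipWith pairs both values in a Tuple2.
    static Mono<Tuple2<String, String>> pair() {
        return Mono.just("Mango").zipWith(Mono.just("Tomato"));
    }

    // If either side completes empty, the zipped Mono is empty too.
    static Mono<String> withEmptySide() {
        return Mono.just("Mango")
                .zipWith(Mono.<String>empty(), (first, second) -> first + second);
    }

    public static void main(String[] args) {
        Tuple2<String, String> t = pair().block();
        System.out.println(t.getT1() + " / " + t.getT2()); // prints Mango / Tomato
        // block() returns null for an empty Mono
        System.out.println(withEmptySide().block());       // prints null
    }
}
```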
Example #6 flatMap() and flatMapMany() Methods
- flatMap() – Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).
- flatMapMany() – Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoService {
    // flatMap: Mono<String> -> Mono<List<String>> (still a single element)
    public Mono<List<String>> fruitMonoFlatMap() {
        return Mono.just("Mango")
                .flatMap(s -> Mono.just(List.of(s.split(""))))
                .log();
    }

    // flatMapMany: Mono<String> -> Flux<String> (one element per letter)
    public Flux<String> fruitMonoFlatMapMany() {
        return Mono.just("Mango")
                .flatMapMany(s -> Flux.just(s.split("")))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoService fluxAndMonoServices
                = new FluxAndMonoService();
        fluxAndMonoServices.fruitMonoFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitMonoFlatMapMany()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}
Output:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext([M, a, n, g, o])
s = [M, a, n, g, o]
[ INFO] (main) | onComplete()
[ INFO] (main) onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(M)
Mono -> s = M
[ INFO] (main) onNext(a)
Mono -> s = a
[ INFO] (main) onNext(n)
Mono -> s = n
[ INFO] (main) onNext(g)
Mono -> s = g
[ INFO] (main) onNext(o)
Mono -> s = o
[ INFO] (main) onComplete()
Example #7 Flux.range
The following code shows the basic subscribe method with no arguments:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        // Set up a Flux that produces three values when a subscriber attaches.
        Flux<Integer> ints = Flux.range(1, 3);
        // Subscribe in the simplest way.
        ints.subscribe();
    }
}
Example #8 Flux.range
The preceding code produces no visible output, but it does work. The Flux produces three values. If we provide a lambda, we can make the values visible. The next example for the subscribe method shows one way to make the values appear:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        // Set up a Flux that produces three values when a subscriber attaches.
        Flux<Integer> ints = Flux.range(1, 3);
        // Subscribe with a subscriber that will print the values.
        ints.subscribe(i -> System.out.println(i));
    }
}
Output:
1
2
3
Example #9 Flux.range
To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        Flux<Integer> ints = Flux.range(1, 4)
                // We need a map so that we can handle some values differently.
                .map(i -> {
                    // For most values, return the value.
                    if (i <= 3) return i;
                    // For one value, force an error.
                    throw new RuntimeException("Got to 4");
                });
        // Subscribe with a subscriber that includes an error handler.
        ints.subscribe(i -> System.out.println(i),
                error -> System.err.println("Error: " + error));
    }
}
Output:
1
2
3
Error: java.lang.RuntimeException: Got to 4
Example #10 Flux.range
The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        Flux<Integer> ints = Flux.range(1, 4);
        // Subscribe with a Subscriber that includes a handler for completion events.
        ints.subscribe(i -> System.out.println(i),
                error -> System.err.println("Error " + error),
                () -> System.out.println("Done"));
    }
}
Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.
The completion callback has no input, as represented by an empty pair of parentheses: it matches the run method in the Runnable interface. The preceding code produces the following output:
1
2
3
4
Done
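As a minimal sketch of the point that error and completion signals are mutually exclusive (assuming Reactor on the classpath; the class name, the run helper, and its failAt parameter are hypothetical), the code below records which callbacks fire for a failing and a non-failing sequence.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class TerminalSignalsSketch {
    // failAt is a hypothetical parameter: the value at which map throws
    // (a value outside 1..4, such as 0, means the sequence never fails).
    static List<String> run(int failAt) {
        List<String> events = new ArrayList<>();
        Flux.range(1, 4)
                .map(i -> {
                    if (i == failAt) {
                        throw new RuntimeException("Got to " + i);
                    }
                    return i;
                })
                .subscribe(
                        i -> events.add("next:" + i),
                        err -> events.add("error:" + err.getMessage()),
                        () -> events.add("complete"));
        return events;
    }

    public static void main(String[] args) {
        // The error terminates the sequence; "complete" is never recorded.
        System.out.println(run(4)); // [next:1, next:2, next:3, error:Got to 4]
        // No error, so the completion callback runs instead.
        System.out.println(run(0)); // [next:1, next:2, next:3, next:4, complete]
    }
}
```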
Example #11 Flux.fromIterable
filter() Method Example. The filter() method evaluates each source value against the given Predicate and emits only the values that pass the test.
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    // keep only the strings longer than the given number of characters
    public Flux<String> fruitsFluxFilter(int number) {
        return Flux.fromIterable(List.of("zjc", "examples", "Flux"))
                .filter(s -> s.length() > number);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsFluxFilter(2)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
s = zjc
s = examples
s = Flux
Example #12 Flux.fromIterable
flatMap() and delayElements() Methods Example.
import java.time.Duration;
import java.util.List;
import java.util.Random;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    // synchronous inner publishers: letters arrive in source order
    public Flux<String> stringsFluxFlatMap() {
        return Flux.fromIterable(List.of("zjc", "examples", "Flux"))
                .flatMap(s -> Flux.just(s.split("")))
                .log();
    }

    // each inner Flux delays its letters by a random amount,
    // so emissions happen later on another thread
    public Flux<String> stringsFluxFlatMapAsync() {
        return Flux.fromIterable(List.of("zjc", "examples", "Flux"))
                .flatMap(s -> Flux.just(s.split(""))
                        .delayElements(Duration.ofMillis(
                                new Random().nextInt(1000)
                        )))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.stringsFluxFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.stringsFluxFlatMapAsync()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(z)
s = z
[ INFO] (main) onNext(j)
s = j
[ INFO] (main) onNext(c)
s = c
[ INFO] (main) onNext(e)
s = e
[ INFO] (main) onNext(x)
s = x
[ INFO] (main) onNext(a)
s = a
[ INFO] (main) onNext(m)
s = m
[ INFO] (main) onNext(p)
s = p
[ INFO] (main) onNext(l)
s = l
[ INFO] (main) onNext(e)
s = e
[ INFO] (main) onNext(s)
s = s
[ INFO] (main) onNext(F)
s = F
[ INFO] (main) onNext(l)
s = l
[ INFO] (main) onNext(u)
s = u
[ INFO] (main) onNext(x)
s = x
[ INFO] (main) onComplete()
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
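The log above stops right after the second request(unbounded). The most likely reason is that delayElements hands the emissions to one of Reactor's timer threads, and main returns before any delayed letter is delivered. The following sketch, which assumes Reactor on the classpath and uses a hypothetical class name and shorter delays, waits for the delayed sequence by collecting it into a list and blocking. Blocking is used here only to keep a demo program alive; the order of the letters is not guaranteed.

```java
import java.time.Duration;
import java.util.List;
import java.util.Random;
import reactor.core.publisher.Flux;

public class FlatMapAsyncBlockingSketch {
    static List<String> lettersAsync() {
        return Flux.fromIterable(List.of("zjc", "examples", "Flux"))
                // each word gets its own random per-letter delay (1..100 ms)
                .flatMap(s -> Flux.just(s.split(""))
                        .delayElements(Duration.ofMillis(1 + new Random().nextInt(100))))
                // gather every letter into one list ...
                .collectList()
                // ... and wait on the calling thread until the Flux completes
                .block();
    }

    public static void main(String[] args) {
        List<String> letters = lettersAsync();
        // All 15 letters arrive, possibly interleaved across the three words.
        System.out.println(letters.size() + " letters: " + letters);
    }
}
```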
Example #13 Flux.fromIterable
- transform(), defaultIfEmpty() and switchIfEmpty() Methods
- transform() – Transform this Flux in order to generate a target Flux.
- defaultIfEmpty() – Provide a default unique value if this sequence is completed without any data.
- switchIfEmpty() – Switch to an alternative Publisher if this sequence is completed without any data.
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public Flux<String> fruitsFluxTransform(int number) {
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);
        // transform applies the reusable filterData function; this is
        // equivalent to calling .filter(s -> s.length() > number) inline
        return Flux.fromIterable(List.of("zjc12", "ex1234", "Flux12"))
                .transform(filterData)
                .log();
    }

    public Flux<String> fruitsFluxTransformDefaultIfEmpty(int number) {
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);
        // emit a single fallback value if nothing passes the filter
        return Flux.fromIterable(List.of("zjc12", "ex1234", "Flux12"))
                .transform(filterData)
                .defaultIfEmpty("Default")
                .log();
    }

    public Flux<String> fruitsFluxTransformSwitchIfEmpty(int number) {
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);
        // switch to a fallback Publisher if nothing passes the filter
        return Flux.fromIterable(List.of("zjc12", "ex1234", "Flux12"))
                .transform(filterData)
                .switchIfEmpty(Flux.just("123456789", "1234 12345")
                        .transform(filterData))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsFluxTransform(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsFluxTransformDefaultIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsFluxTransformSwitchIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(ex1234)
s = ex1234
[ INFO] (main) | onNext(Flux12)
s = Flux12
[ INFO] (main) | onComplete()
[ INFO] (main) onSubscribe([Fuseable] FluxDefaultIfEmpty.DefaultIfEmptySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(Default)
s = Default
[ INFO] (main) onComplete()
[ INFO] (main) onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(123456789)
s = 123456789
[ INFO] (main) onNext(1234 12345)
s = 1234 12345
[ INFO] (main) onComplete()
Example #14 Flux.concat
- concat() and concatWith() Methods
- concat() – Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.
- concatWith() – Concatenate emissions of this Flux with the provided Publisher (no interleave).
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {
    // static factory: subscribe to chars first, then to nrs
    public Flux<String> fruitsFluxConcat() {
        var chars = Flux.just("abcde", "fghijk");
        var nrs = Flux.just("123456", "78901");
        return Flux.concat(chars, nrs);
    }

    // instance method: same result as Flux.concat(chars, nrs)
    public Flux<String> fruitsFluxConcatWith() {
        var chars = Flux.just("abcde", "fghijk");
        var nrs = Flux.just("123456", "78901");
        return chars.concatWith(nrs);
    }

    // concatenating two Monos yields a Flux of up to two elements
    public Flux<String> fruitsMonoConcatWith() {
        var chars = Mono.just("abcde");
        var nrs = Mono.just("123456");
        return chars.concatWith(nrs);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsFluxConcat()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsFluxConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsMonoConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = 123456
Example #15 Flux.merge
- merge() and mergeWith() Methods
- merge() – Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.
- mergeWith() – Merge data from this Flux and a Publisher into an interleaved merged sequence.
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    // static factory: subscribe to both sources eagerly
    public Flux<String> fruitsFluxMerge() {
        var chars = Flux.just("abcde", "fghijk");
        var nrs = Flux.just("123456", "78901");
        return Flux.merge(chars, nrs);
    }

    // instance method: same result as Flux.merge(chars, nrs)
    public Flux<String> fruitsFluxMergeWith() {
        var chars = Flux.just("abcde", "fghijk");
        var nrs = Flux.just("123456", "78901");
        return chars.mergeWith(nrs);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsFluxMerge()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsFluxMergeWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = fghijk
s = 123456
s = 78901
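The output above looks the same as the concat output because both sources emit synchronously, so the first one finishes before the second gets a chance to emit. Interleaving only becomes visible when the sources emit over time. The sketch below assumes Reactor on the classpath; the class name and the delay values are hypothetical, chosen so that the two sources take turns. The exact order depends on timing, so it is described as typical rather than guaranteed.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeInterleaveSketch {
    static List<String> mergedWithDelays() {
        // Emits at roughly 100 ms and 200 ms after subscription.
        var chars = Flux.just("abcde", "fghijk")
                .delayElements(Duration.ofMillis(100));
        // Starts 50 ms later, so it emits at roughly 150 ms and 250 ms.
        var nrs = Flux.just("123456", "78901")
                .delaySubscription(Duration.ofMillis(50))
                .delayElements(Duration.ofMillis(100));
        // merge subscribes to both sources eagerly and forwards items as they arrive;
        // block() waits for completion so the demo can print the result.
        return Flux.merge(chars, nrs).collectList().block();
    }

    public static void main(String[] args) {
        // Typically: [abcde, 123456, fghijk, 78901]
        System.out.println(mergedWithDelays());
    }
}
```

With Flux.concat in place of Flux.merge, the second source would not be subscribed to until the first completes, so the result would keep the strict abcde, fghijk, 123456, 78901 order.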