1. About the Documentation
This section provides a brief overview of Reactor reference documentation. You do not need to read this guide in a linear fashion. Each piece stands on its own, though they often refer to other pieces.
1.1. Latest Version & Copyright Notice
The Reactor reference guide is available as HTML documents. The latest copy is available at https://bcyde.com/docs/core/release/reference/index.html
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.
1.2. Contributing to the Documentation
The reference guide is written in Asciidoc, and you can find its sources at https://github.com/reactor/reactor-core/tree/main/docs/asciidoc.
If you have an improvement or a suggestion, we will be happy to get a pull request from you!
We recommend that you check out a local copy of the repository so that you can
generate the documentation by running the asciidoctor gradle task and checking the
rendering. Some of the sections rely on included files, so GitHub rendering is
not always complete.
To facilitate documentation edits, most sections have a link at the end that opens an edit UI directly on GitHub for the main source file for that section. These links are only present in the HTML5 version of this reference guide. They look like the following: Suggest Edit to About the Documentation. |
1.3. Getting Help
You can reach out for help in several ways with Reactor:
-
Get in touch with the community on Gitter.
-
Ask a question on stackoverflow.com at project-reactor.
-
Report bugs in Github issues. We closely monitor the following repositories: reactor-core (which covers the essential features) and reactor-addons (which covers reactor-test and adapters issues).
All of Reactor is open source, including this documentation. If you find problems with the docs or if you want to improve them, please get involved. |
1.4. Where to Go from Here
-
Head to Getting Started if you feel like jumping straight into the code.
-
If you are new to reactive programming, though, you should probably start with the Introduction to Reactive Programming.
-
If you are familiar with Reactor concepts and are just looking for the right tool for the job but cannot think of a relevant operator, try the Which operator do I need? Appendix.
-
In order to dig deeper into the core features of Reactor, head to Reactor Core Features to learn:
-
More about Reactor’s reactive types in the Flux, an Asynchronous Sequence of 0-N Items and Mono, an Asynchronous 0-1 Result sections.
-
How to switch threading contexts using a scheduler.
-
How to handle errors in the Handling Errors section.
-
Unit testing? Yes it is possible with the reactor-test project! See Testing.
-
Programmatically creating a sequence offers a more advanced way of creating reactive sources.
-
Other advanced topics are covered in Advanced Features and Concepts.
2. Getting Started
This section contains information that should help you get going with Reactor. It includes the following sections:
2.1. Introducing Reactor
Reactor is a fully non-blocking reactive programming foundation for the JVM, with
efficient demand management (in the form of managing “backpressure”). It integrates
directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration.
It offers composable asynchronous sequence APIs, Flux (for [N] elements) and Mono
(for [0|1] elements), and extensively implements the Reactive Streams specification.
Reactor also supports non-blocking inter-process communication with the reactor-netty
project. Suited for Microservices Architecture, Reactor Netty offers
backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP.
Reactive encoding and decoding are fully supported.
2.2. Prerequisites
Reactor Core runs on Java 8 and above.
It has a transitive dependency on org.reactivestreams:reactive-streams:1.0.3.
Android Support
|
2.3. Understanding the BOM and versioning scheme
Reactor 3 uses a BOM (Bill of Materials) model (since reactor-core 3.0.4, with the Aluminium
release train).
This curated list groups artifacts that are meant to work well together, providing
the relevant versions despite potentially divergent versioning schemes in these artifacts.
Note the versioning scheme has changed between 3.3.x and 3.4.x (Dysprosium and Europium).
Artifacts follow a versioning scheme of MAJOR.MINOR.PATCH-QUALIFIER
while the BOM is versioned using a CalVer inspired scheme of YYYY.MINOR.PATCH-QUALIFIER, where:
-
MAJOR is the current generation of Reactor, where each new generation can bring fundamental changes to the structure of the project (which might imply a more significant migration effort)
-
YYYY is the year of the first GA release in a given release cycle (like 3.4.0 for 3.4.x)
-
.MINOR is a 0-based number incrementing with each new release cycle:
-
in the case of projects, it generally reflects wider changes and can indicate a moderate migration effort
-
in the case of the BOM it allows discerning between release cycles in case two get first released the same year
-
.PATCH is a 0-based number incrementing with each service release
-
-QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)
The first release cycle to follow that convention is thus 2020.0.x, codename Europium.
The scheme uses the following qualifiers (note the use of dash separator), in order:
-
-M1..-M9: milestones (we don’t expect more than 9 per service release)
-
-RC1..-RC9: release candidates (we don’t expect more than 9 per service release)
-
-SNAPSHOT: snapshots
-
no qualifier for GA releases
Snapshots appear higher in the order above because, conceptually, they’re always "the freshest pre-release" of any given PATCH. Even though the first deployed artifact of a PATCH cycle will always be a -SNAPSHOT, a similarly named but more up-to-date snapshot would also get released after eg. a milestone or between release candidates. |
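The qualifier ordering above can be sketched in a few lines of Java. This is only an illustration of the ordering rule, not code from Reactor: the class name `QualifierOrderDemo` and both methods are hypothetical, and the sketch assumes that all compared versions share the same YYYY.MINOR.PATCH prefix.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class QualifierOrderDemo {

    /** Rank of a version's qualifier: milestones < release candidates < snapshots < GA. */
    public static int qualifierRank(String version) {
        int dash = version.indexOf('-');
        if (dash < 0) {
            return 30;                                   // GA release: no qualifier
        }
        String qualifier = version.substring(dash + 1);
        if (qualifier.equals("SNAPSHOT")) {
            return 20;                                   // above every M and RC
        }
        if (qualifier.startsWith("RC")) {
            return 10 + Integer.parseInt(qualifier.substring(2)); // RC1..RC9 -> 11..19
        }
        if (qualifier.startsWith("M")) {
            return Integer.parseInt(qualifier.substring(1));      // M1..M9 -> 1..9
        }
        throw new IllegalArgumentException("Unknown qualifier: " + qualifier);
    }

    /** Sorts versions of the same PATCH from earliest pre-release to GA. */
    public static List<String> sortByQualifier(List<String> versions) {
        List<String> sorted = new ArrayList<>(versions);
        sorted.sort(Comparator.comparingInt(QualifierOrderDemo::qualifierRank));
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sortByQualifier(List.of(
                "2020.0.0", "2020.0.0-SNAPSHOT", "2020.0.0-RC1", "2020.0.0-M2", "2020.0.0-M1")));
        // prints [2020.0.0-M1, 2020.0.0-M2, 2020.0.0-RC1, 2020.0.0-SNAPSHOT, 2020.0.0]
    }
}
```

Build tools apply their own version comparison rules, so this sketch only mirrors the conceptual order described in this section.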
Each release cycle is also given a codename, in continuity with the previous codename-based scheme, which can be used to reference it more informally (like in discussions, blog posts, etc…). The codenames represent what would traditionally be the MAJOR.MINOR number. They (mostly) come from the Periodic Table of Elements, in increasing alphabetical order.
Up until Dysprosium, the BOM was versioned using a release train scheme with a codename followed by a qualifier, and the qualifiers were slightly different. For example: Aluminium-RELEASE (first GA release, would now be something like YYYY.0.0), Bismuth-M1, Californium-SR1 (service release, would now be something like YYYY.0.1), Dysprosium-RC1, Dysprosium-BUILD-SNAPSHOT (after each patch, we’d go back to the same snapshot version; would now be something like YYYY.0.X-SNAPSHOT, so we get 1 snapshot per PATCH) |
2.4. Getting Reactor
As mentioned earlier, the easiest way to use Reactor in its core is to use the BOM and add the relevant dependencies to your project. Note that, when you add such a dependency, you must omit the version so that the version gets picked up from the BOM.
However, if you want to force the use of a specific artifact’s version, you can specify it when adding your dependency, as you usually would. You can also forgo the BOM entirely and specify dependencies by their artifact versions.
As of this version (reactor-core 3.5.6), the latest stable BOM in the associated
release train line is 2022.0.7, which is what is used in snippets below.
There might be newer versions since then (including snapshots, milestones and new release train lines),
see https://bcyde.com/docs for the latest artifacts and BOMs. |
2.4.1. Maven Installation
Maven natively supports the BOM concept. First, you need to import the BOM by
adding the following snippet to your pom.xml:
<dependencyManagement> (1)
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2022.0.7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
1 | Notice the dependencyManagement tag. This is in addition to the regular dependencies section. |
If the top section (dependencyManagement) already exists in your pom, add only the contents.
Next, add your dependencies to the relevant reactor projects, as usual, except without a <version>, as follows:
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId> (1)
(2)
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId> (3)
<scope>test</scope>
</dependency>
</dependencies>
1 | Dependency on the core library. |
2 | No version tag here. |
3 | reactor-test provides facilities to unit test reactive streams. |
2.4.2. Gradle Installation
Prior to version 5.0, Gradle has no core support for Maven BOMs, but you can use Spring’s gradle-dependency-management plugin.
First, apply the plugin from the Gradle Plugin Portal, as follows:
plugins {
id "io.spring.dependency-management" version "1.0.7.RELEASE" (1)
}
1 | as of this writing, 1.0.7.RELEASE is the latest version of the plugin. Check for updates. |
Then use it to import the BOM, as follows:
dependencyManagement {
imports {
mavenBom "io.projectreactor:reactor-bom:2022.0.7"
}
}
Finally add a dependency to your project, without a version number, as follows:
dependencies {
compile 'io.projectreactor:reactor-core' (1)
}
1 | There is no third : separated section for the version. It is taken from
the BOM. |
Since Gradle 5.0, you can use the native Gradle support for BOMs:
dependencies {
implementation platform('io.projectreactor:reactor-bom:2022.0.7')
implementation 'io.projectreactor:reactor-core' (1)
}
1 | There is no third : separated section for the version. It is taken from
the BOM. |
2.4.3. Milestones and Snapshots
Milestones and developer previews are distributed through the Spring Milestones repository rather than Maven Central. To add it to your build configuration file, use the following snippet:
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones Repository</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
For Gradle, use the following snippet:
repositories {
maven { url 'https://repo.spring.io/milestone' }
mavenCentral()
}
Similarly, snapshots are also available in a separate dedicated repository, as the following example shows:
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshot Repository</name>
<url>https://repo.spring.io/snapshot</url>
</repository>
</repositories>
repositories {
maven { url 'https://repo.spring.io/snapshot' }
mavenCentral()
}
2.5. Support and policies
The entries below are mirroring https://github.com/reactor/.github/blob/main/SUPPORT.adoc
2.5.1. Do you have a question?
Search Stack Overflow first; discuss if necessary |
If you’re unsure why something isn’t working or wondering if there is a better way of doing it please check on Stack Overflow first and if necessary start a discussion. Use relevant tags among the ones we monitor for that purpose:
-
reactor-netty
for specific reactor-netty questions -
project-reactor
for generic reactor questions
If you prefer real-time discussion, we also have a few Gitter channels:
-
reactor
is the historic most active one, where most of the community can help
-
reactor-core
is intended for more advanced pinpointed discussions around the inner workings of the library
-
reactor-netty
is intended for netty-specific questions
Refer to each project’s README for potential other sources of information.
We generally discourage opening GitHub issues for questions, in favor of the two channels above.
2.5.2. Our policy on deprecations
When dealing with deprecations, given a version A.B.C, we’ll ensure that:
-
deprecations introduced in version A.B.0 will be removed no sooner than version A.B+1.0
-
deprecations introduced in version A.B.1+ will be removed no sooner than version A.B+2.0
-
we’ll strive to mention the following in the deprecation javadoc:
-
target minimum version for removal
-
pointers to replacements for the deprecated method
-
version in which method was deprecated
This policy is officially in effect as of January 2021, for all modules in 2020.0 BOMs and newer release trains, as well as Dysprosium releases after Dysprosium-SR15. |
Deprecation removal targets are not a hard commitment, and the deprecated methods could live on further than these minimum target GA versions (ie. only the most problematic deprecated methods will be removed aggressively). |
That said, deprecated code that has outlived its minimum removal target version may be removed in any subsequent release (including patch releases, aka service releases) without further notice. So users should still strive to update their code as early as possible. |
2.5.3. Active Development
The following table summarises the development status for the various Reactor release trains:
Version | Supported |
---|---|
2020.0.0 (codename Europium) (core 3.4.x, netty 1.0.x) |
|
Dysprosium Train (core 3.3.x, netty 0.9.x) |
|
Californium and below (core < 3.3, netty < 0.9) |
|
Reactor 1.x and 2.x Generations |
3. Introduction to Reactive Programming
Reactor is an implementation of the Reactive Programming paradigm, which can be summed up as follows:
Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
As a first step in the direction of reactive programming, Microsoft created the Reactive
Extensions (Rx) library in the .NET ecosystem. Then RxJava implemented reactive
programming on the JVM. As time went on, a standardization for Java emerged through the
Reactive Streams effort, a specification that defines a set of interfaces and
interaction rules for reactive libraries on the JVM. Its interfaces have been
integrated into Java 9 under the Flow class.
The reactive programming paradigm is often presented in object-oriented languages as an
extension of the Observer design pattern. You can also compare the main reactive streams
pattern with the familiar Iterator design pattern, as there is a duality to the
Iterable-Iterator pair in all of these libraries. One major difference is that, while
an Iterator is pull-based, reactive streams are push-based.
Using an iterator is an imperative programming pattern, even though the method of
accessing values is solely the responsibility of the Iterable. Indeed, it is up to the
developer to choose when to access the next() item in the sequence. In reactive
streams, the equivalent of the above pair is Publisher-Subscriber. But it is the
Publisher that notifies the Subscriber of newly available values as they come, and
this push aspect is the key to being reactive. Also, operations applied to pushed values
are expressed declaratively rather than imperatively: The programmer expresses the logic
of the computation rather than describing its exact control flow.
In addition to pushing values, the error-handling and completion aspects are also covered
in a well defined manner. A Publisher can push new values to its Subscriber (by
calling onNext) but can also signal an error (by calling onError) or completion (by
calling onComplete). Both errors and completion terminate the sequence. This can
be summed up as follows:
onNext x 0..N [onError | onComplete]
This approach is very flexible. The pattern supports use cases where there is no value, one value, or n values (including an infinite sequence of values, such as the continuing ticks of a clock).
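To make the signal sequence concrete, here is a minimal sketch that uses only the JDK's `java.util.concurrent.Flow` interfaces (the Java 9 counterpart of the Reactive Streams interfaces mentioned above) rather than Reactor itself. The class `SignalGrammarDemo` and its methods are hypothetical names introduced for this illustration. The publisher is deliberately simplified: it is single-threaded and does not enforce every rule of the specification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SignalGrammarDemo {

    /** Simplified publisher: emits the requested items in order, then completes. */
    public static Flow.Publisher<String> publisherOf(List<String> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items: zero or more onNext signals.
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    subscriber.onNext(items.get(index++));
                }
                // Exactly one terminal signal once the items are exhausted.
                if (index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes in unbounded mode and records every signal received. */
    public static List<String> collectSignals(Flow.Publisher<String> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { signals.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(collectSignals(publisherOf(List.of("a", "b"))));
        // prints [onNext(a), onNext(b), onComplete]
        System.out.println(collectSignals(publisherOf(List.of())));
        // prints [onComplete]
    }
}
```

The second call shows the "no value" case: an empty sequence still ends with a terminal signal.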
But why do we need such an asynchronous reactive library in the first place?
3.1. Blocking Can Be Wasteful
Modern applications can reach huge numbers of concurrent users, and, even though the capabilities of modern hardware have continued to improve, performance of modern software is still a key concern.
There are, broadly, two ways one can improve a program’s performance:
-
parallelize to use more threads and more hardware resources.
-
seek more efficiency in how current resources are used.
Usually, Java developers write programs by using blocking code. This practice is fine until there is a performance bottleneck. Then it is time to introduce additional threads, running similar blocking code. But this scaling in resource utilization can quickly introduce contention and concurrency problems.
Worse still, blocking wastes resources. If you look closely, as soon as a program involves some latency (notably I/O, such as a database request or a network call), resources are wasted because threads (possibly many threads) now sit idle, waiting for data.
So the parallelization approach is not a silver bullet. It is necessary to access the full power of the hardware, but it is also complex to reason about and susceptible to resource wasting.
3.2. Asynchronicity to the Rescue?
The second approach mentioned earlier, seeking more efficiency, can be a solution to the resource wasting problem. By writing asynchronous, non-blocking code, you let the execution switch to another active task that uses the same underlying resources and later comes back to the current process when the asynchronous processing has finished.
But how can you produce asynchronous code on the JVM? Java offers two models of asynchronous programming:
-
Callbacks: Asynchronous methods do not have a return value but take an extra callback parameter (a lambda or anonymous class) that gets called when the result is available. A well known example is Swing’s EventListener hierarchy.
-
Futures: Asynchronous methods immediately return a Future<T>. The asynchronous process computes a T value, but the Future object wraps access to it. The value is not immediately available, and the object can be polled until the value is available. For instance, an ExecutorService running Callable<T> tasks uses Future objects.
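The two models can be compared side by side with plain JDK classes. The sketch below is illustrative only: `AsyncStylesDemo`, `lengthWithCallback` and `lengthWithFuture` are hypothetical names, and the "asynchronous work" is simply computing the length of a string on another thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncStylesDemo {

    /** Callback style: no return value; the result is handed to the callback later. */
    public static void lengthWithCallback(ExecutorService executor, String text,
                                          Consumer<Integer> callback) {
        executor.execute(() -> callback.accept(text.length()));
    }

    /** Future style: returns immediately with a handle to a value computed later. */
    public static Future<Integer> lengthWithFuture(ExecutorService executor, String text) {
        Callable<Integer> task = text::length;
        return executor.submit(task);
    }

    public static String run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Callback: the demo needs a queue just to get the value back to this thread.
            BlockingQueue<Integer> callbackResults = new LinkedBlockingQueue<>();
            lengthWithCallback(executor, "reactor", callbackResults::add);
            Integer viaCallback = callbackResults.poll(5, TimeUnit.SECONDS);

            // Future: get() blocks the calling thread until the value is available.
            Future<Integer> future = lengthWithFuture(executor, "flux");
            Integer viaFuture = future.get(5, TimeUnit.SECONDS);

            return viaCallback + "," + viaFuture;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 7,4
    }
}
```

Note that both variants end up blocking the calling thread in this demo (on the queue and on `get()`), which hints at the limitations discussed next.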
Are these techniques good enough? Not for every use case, and both approaches have limitations.
Callbacks are hard to compose together, quickly leading to code that is difficult to read and maintain (known as “Callback Hell”).
Consider an example: showing the top five favorites from a user on the UI or suggestions if she does not have a favorite. This goes through three services (one gives favorite IDs, the second fetches favorite details, and the third offers suggestions with details), as follows:
userService.getFavorites(userId, new Callback<List<String>>() { (1)
public void onSuccess(List<String> list) { (2)
if (list.isEmpty()) { (3)
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) { (4)
UiUtils.submitOnUiThread(() -> { (5)
list.stream()
.limit(5)
.forEach(uiList::show); (6)
});
}
public void onError(Throwable error) { (7)
UiUtils.errorPopup(error);
}
});
} else {
list.stream() (8)
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId, (9)
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
1 | We have callback-based services: a Callback interface with a method invoked when
the asynchronous process was successful and one invoked when an error occurs. |
2 | The first service invokes its callback with the list of favorite IDs. |
3 | If the list is empty, we must go to the suggestionService . |
4 | The suggestionService gives a List<Favorite> to a second callback. |
5 | Since we deal with a UI, we need to ensure our consuming code runs in the UI thread. |
6 | We use a Java 8 Stream to limit the number of suggestions processed to five, and we
show them in a graphical list in the UI. |
7 | At each level, we deal with errors the same way: We show them in a popup. |
8 | Back to the favorite ID level. If the service returned a full list, we need to
go to the favoriteService to get detailed Favorite objects. Since we want only five,
we first stream the list of IDs to limit it to five. |
9 | Once again, a callback. This time we get a fully-fledged Favorite object that we
push to the UI inside the UI thread. |
That is a lot of code, and it is a bit hard to follow and has repetitive parts. Consider its equivalent in Reactor:
userService.getFavorites(userId) (1)
.flatMap(favoriteService::getDetails) (2)
.switchIfEmpty(suggestionService.getSuggestions()) (3)
.take(5) (4)
.publishOn(UiUtils.uiThreadScheduler()) (5)
.subscribe(uiList::show, UiUtils::errorPopup); (6)
1 | We start with a flow of favorite IDs. |
2 | We asynchronously transform these into detailed Favorite objects
(flatMap ). We now have a flow of Favorite . |
3 | If the flow of Favorite is empty, we switch to a fallback through the suggestionService . |
4 | We are only interested in, at most, five elements from the resulting flow. |
5 | At the end, we want to process each piece of data in the UI thread. |
6 | We trigger the flow by describing what to do with the final form of the data (show it in a UI list) and what to do in case of an error (show a popup). |
What if you want to ensure the favorite IDs are retrieved in less than 800ms or, if it
takes longer, get them from a cache? In the callback-based code, that is a complicated
task. In Reactor it becomes as easy as adding a timeout
operator in the chain, as follows:
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800)) (1)
.onErrorResume(e -> cacheService.cachedFavoritesFor(userId)) (2)
.flatMap(favoriteService::getDetails) (3)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
1 | If the part above emits nothing for more than 800ms, propagate an error. |
2 | In case of an error, fall back to the cacheService . |
3 | The rest of the chain is similar to the previous example. |
Future objects are a bit better than callbacks, but they still do not do well at composition,
despite the improvements brought in Java 8 by CompletableFuture. Orchestrating multiple
Future objects together is doable but not easy. Also, Future has other problems:
-
It is easy to end up with another blocking situation with Future objects by calling the get() method.
-
They do not support lazy computation.
-
They lack support for multiple values and advanced error handling.
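The lack of lazy computation is easy to observe with the JDK alone. In the following sketch (the class `EagerFutureDemo` and its method are hypothetical names used only for this illustration), the task behind a `CompletableFuture` starts running as soon as the future is created, even though nothing ever calls `get()` or `join()` on it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class EagerFutureDemo {

    /** Returns true if the task started although nobody asked for its result. */
    public static boolean startedWithoutConsumer() {
        CountDownLatch started = new CountDownLatch(1);

        // The task is scheduled immediately, on creation of the future...
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            return "done";
        });

        // ...so the latch is released without any get() or join() on the future.
        try {
            return started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(startedWithoutConsumer()); // prints true
    }
}
```

A reactive `Publisher`, by contrast, only describes the work; the Reactor sections later in this chapter explain how execution is deferred until subscription.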
Consider another example: We get a list of IDs from which we want to fetch a name and a
statistic and combine these pair-wise, all of it asynchronously. The following example
does so with a list of type CompletableFuture:
CompletableFuture combination
CompletableFuture<List<String>> ids = ifhIds(); (1)
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { (2)
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> { (3)
CompletableFuture<String> nameTask = ifhName(i); (4)
CompletableFuture<Integer> statTask = ifhStat(i); (5)
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); (6)
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); (7)
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); (8)
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join) (9)
.collect(Collectors.toList()));
});
List<String> results = result.join(); (10)
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
1 | We start off with a future that gives us a list of id values to process. |
2 | We want to start some deeper asynchronous processing once we get the list. |
3 | For each element in the list: |
4 | Asynchronously get the associated name. |
5 | Asynchronously get the associated statistic. |
6 | Combine both results. |
7 | We now have a list of futures that represent all the combination tasks. To execute these tasks, we need to convert the list to an array. |
8 | Pass the array to CompletableFuture.allOf , which outputs a Future that completes
when all tasks have completed. |
9 | The tricky bit is that allOf returns CompletableFuture<Void> , so we
reiterate over the list of futures, collecting their results by using join()
(which, here, does not block, since allOf ensures the futures are all done). |
10 | Once the whole asynchronous pipeline has been triggered, we wait for it to be processed and return the list of results that we can assert. |
Since Reactor has more combination operators out of the box, this process can be simplified, as follows:
Flux<String> ids = ifhrIds(); (1)
Flux<String> combinations =
ids.flatMap(id -> { (2)
Mono<String> nameTask = ifhrName(id); (3)
Mono<Integer> statTask = ifhrStat(id); (4)
return nameTask.zipWith(statTask, (5)
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List<String>> result = combinations.collectList(); (6)
List<String> results = result.block(); (7)
assertThat(results).containsExactly( (8)
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
1 | This time, we start from an asynchronously provided sequence of ids (a
Flux<String> ). |
2 | For each element in the sequence, we asynchronously process it (inside the function
that is the body of the flatMap call) twice. |
3 | Get the associated name. |
4 | Get the associated statistic. |
5 | Asynchronously combine the two values. |
6 | Aggregate the values into a List as they become available. |
7 | In production, we would continue working with the Flux asynchronously by further
combining it or subscribing to it. Most likely, we would return the result Mono .
Since we are in a test, we instead block, waiting for the processing to finish, and then
directly return the aggregated list of values. |
8 | Assert the result. |
The perils of using callbacks and Future objects are similar and are what reactive programming
addresses with the Publisher-Subscriber pair.
3.3. From Imperative to Reactive Programming
Reactive libraries, such as Reactor, aim to address these drawbacks of “classic” asynchronous approaches on the JVM while also focusing on a few additional aspects:
-
Composability and readability
-
Data as a flow manipulated with a rich vocabulary of operators
-
Nothing happens until you subscribe
-
Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high
-
High level but high value abstraction that is concurrency-agnostic
3.3.1. Composability and Readability
By “composability”, we mean the ability to orchestrate multiple asynchronous tasks, in which we use results from previous tasks to feed input to subsequent ones. Alternatively, we can run several tasks in a fork-join style. In addition, we can reuse asynchronous tasks as discrete components in a higher-level system.
The ability to orchestrate tasks is tightly coupled to the readability and maintainability of code. As the layers of asynchronous processes increase in both number and complexity, being able to compose and read code becomes increasingly difficult. As we saw, the callback model is simple, but one of its main drawbacks is that, for complex processes, you need to have a callback executed from a callback, itself nested inside another callback, and so on. That mess is known as “Callback Hell”. As you can guess (or know from experience), such code is pretty hard to go back to and reason about.
Reactor offers rich composition options, wherein code mirrors the organization of the abstract process, and everything is generally kept at the same level (nesting is minimized).
3.3.2. The Assembly Line Analogy
You can think of data processed by a reactive application as moving through an assembly
line. Reactor is both the conveyor belt and the workstations. The raw material pours from
a source (the original Publisher) and ends up as a finished product ready to be pushed
to the consumer (or Subscriber).
The raw material can go through various transformations and other intermediary steps or be part of a larger assembly line that aggregates intermediate pieces together. If there is a glitch or clogging at one point (perhaps boxing the products takes a disproportionately long time), the afflicted workstation can signal upstream to limit the flow of raw material.
3.3.3. Operators
In Reactor, operators are the workstations in our assembly analogy. Each operator adds
behavior to a Publisher and wraps the previous step’s Publisher into a new instance.
The whole chain is thus linked, such that data originates from the first Publisher and
moves down the chain, transformed by each link. Eventually, a Subscriber finishes the
process. Remember that nothing happens until a Subscriber subscribes to a Publisher,
as we will see shortly.
Understanding that operators create new instances can help you avoid a common mistake that would lead you to believe that an operator you used in your chain is not being applied. See this item in the FAQ. |
While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries, such as Reactor, is the rich vocabulary of operators that they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.
3.3.4. Nothing Happens Until You subscribe()
In Reactor, when you write a Publisher
chain, data does not start pumping into it by
default. Instead, you create an abstract description of your asynchronous process (which
can help with reusability and composition).
By the act of subscribing, you tie the Publisher
to a Subscriber
, which triggers
the flow of data in the whole chain. This is achieved internally by a single request
signal from the Subscriber
that is propagated upstream, all the way back to the source Publisher
.
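The laziness described above can be observed with a small sketch. It assumes reactor-core is on the classpath; the class name LazySubscriptionSketch and the log labels are illustrative only. Side effects are recorded in a list so that the moment at which elements start to flow becomes visible.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class LazySubscriptionSketch {

    // Records side effects so we can observe when the pipeline actually runs.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Assembling the chain only describes the processing; no element flows yet.
        Flux<Integer> doubled = Flux.range(1, 3)
                .doOnNext(i -> log.add("emitted " + i))
                .map(i -> i * 2);
        log.add("assembled");

        // Subscribing sends the request upstream and starts the data flow.
        doubled.subscribe(i -> log.add("received " + i));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch, "assembled" is logged before any "emitted" entry, because the source only starts producing once subscribe() is called.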
3.3.5. Backpressure
Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.
The real mechanism defined by the Reactive Streams specification is pretty close to the
analogy: A subscriber can work in unbounded mode and let the source push all the data
at its fastest achievable rate or it can use the request
mechanism to signal the source
that it is ready to process at most n
elements.
Intermediate operators can also change the request in-transit. Imagine a buffer
operator that groups elements in batches of ten. If the subscriber requests one buffer, it
is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoid request(1)
round-trips and are beneficial
if producing the elements before they are requested is not too costly.
This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But if the elements are not ready, they get pushed by the upstream whenever they are produced.
3.3.6. Hot vs Cold
The Rx family of reactive libraries distinguishes two broad categories of reactive sequences: hot and cold. This distinction mainly has to do with how the reactive stream reacts to subscribers:
- A Cold sequence starts anew for each Subscriber, including at the source of data. For example, if the source wraps an HTTP call, a new HTTP request is made for each subscription.
- A Hot sequence does not start from scratch for each Subscriber. Rather, late subscribers receive signals emitted after they subscribed. Note, however, that some hot reactive streams can cache or replay the history of emissions totally or partially. From a general perspective, a hot sequence can even emit when no subscriber is listening (an exception to the “nothing happens before you subscribe” rule).
For more information on hot vs cold in the context of Reactor, see this reactor-specific section.
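As a rough illustration of the two categories, the following sketch contrasts a cold source built with Flux.defer and a hot source backed by a sink. It assumes reactor-core 3.4 or later (for the Sinks API); the class name HotVsColdSketch and the emitted values are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

final class HotVsColdSketch {

    // Cold: the source work is redone for every subscriber.
    static int coldSourceInvocations() {
        AtomicInteger invocations = new AtomicInteger();
        // Flux.defer builds a fresh inner sequence on each subscription.
        Flux<String> cold = Flux.defer(() -> {
            invocations.incrementAndGet(); // stands in for something like an HTTP call
            return Flux.just("a", "b");
        });
        cold.subscribe();
        cold.subscribe();
        return invocations.get();
    }

    // Hot: emissions do not wait for subscribers, and late subscribers miss earlier signals.
    static List<List<String>> hotLateSubscriber() {
        Sinks.Many<String> sink = Sinks.many().multicast().directBestEffort();
        Flux<String> hot = sink.asFlux();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        sink.tryEmitNext("nobody listening"); // no subscriber yet: delivered to no one
        hot.subscribe(early::add);
        sink.tryEmitNext("a");
        hot.subscribe(late::add);
        sink.tryEmitNext("b");

        List<List<String>> result = new ArrayList<>();
        result.add(early);
        result.add(late);
        return result;
    }
}
```

In this sketch the cold source is invoked once per subscription, while the late subscriber of the hot source only sees the value emitted after it subscribed.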
4. Reactor Core Features
The Reactor project main artifact is reactor-core
, a reactive library that focuses on
the Reactive Streams specification and targets Java 8.
Reactor introduces composable reactive types that implement Publisher
but also provide
a rich vocabulary of operators: Flux
and Mono
. A Flux
object
represents a reactive sequence of 0..N items, while a Mono
object represents a
single-value-or-empty (0..1) result.
This distinction carries a bit of semantic information into the type, indicating the
rough cardinality of the asynchronous processing. For instance, an HTTP request produces
only one response, so there is not much sense in doing a count
operation. Expressing
the result of such an HTTP call as a Mono<HttpResponse>
thus makes more sense than
expressing it as a Flux<HttpResponse>
, as it offers only operators that are relevant to
a context of zero items or one item.
Operators that change the maximum cardinality of the processing also switch to the
relevant type. For instance, the count
operator exists in Flux
, but it returns a Mono<Long>
.
4.1. Flux
, an Asynchronous Sequence of 0-N Items
The following image shows how a Flux
transforms items:
A Flux<T>
is a standard Publisher<T>
that represents an asynchronous sequence of 0 to N
emitted items, optionally terminated by either a completion signal or an error.
As in the Reactive Streams specification, these three types of signal translate to calls to a downstream
Subscriber’s onNext
, onComplete
, and onError
methods.
With this large scope of possible signals, Flux
is the general-purpose reactive type.
Note that all events, even terminating ones, are optional: no onNext
event but an onComplete
event represents an empty finite sequence, but remove the onComplete
and
you have an infinite empty sequence (not particularly useful, except for tests around cancellation).
Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration)
produces a Flux<Long>
that is infinite and emits regular ticks from a clock.
4.2. Mono
, an Asynchronous 0-1 Result
The following image shows how a Mono
transforms an item:
A Mono<T>
is a specialized Publisher<T>
that emits at most one item via the onNext
signal then terminates with an onComplete
signal (successful Mono
,
with or without value), or only emits a single onError
signal (failed Mono
).
Most Mono
implementations are expected to immediately call onComplete
on their Subscriber
after having called onNext
. Mono.never()
is an outlier: it doesn’t
emit any signal, which is not technically forbidden although not terribly useful outside
of tests. On the other hand, a combination of onNext
and onError
is explicitly forbidden.
Mono
offers only a subset of the operators that are available for a Flux
, and
some operators (notably those that combine the Mono
with another Publisher
)
switch to a Flux
.
For example, Mono#concatWith(Publisher)
returns a Flux
while Mono#then(Mono)
returns another Mono
.
Note that you can use a Mono
to represent no-value asynchronous processes that only
have the concept of completion (similar to a Runnable
). To create one, you can use an empty Mono<Void>
.
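The way operators switch between the two types can be sketched as follows. This is a minimal illustration assuming reactor-core on the classpath; the class and method names are hypothetical, and the no-argument then() is used here as a simple way to obtain a completion-only Mono<Void>.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class CardinalitySketch {

    // count() reduces 0..N items to a single value, so the result type is Mono<Long>.
    static Mono<Long> countItems() {
        return Flux.just("foo", "bar", "foobar").count();
    }

    // Concatenating a Mono with another Publisher may yield more than one item: a Flux.
    static Flux<String> oneThenAnother() {
        return Mono.just("foo").concatWith(Mono.just("bar"));
    }

    // then() ignores the value and only propagates completion, as a Mono<Void>.
    static Mono<Void> completionOnly() {
        return Mono.just("foo").then();
    }

    public static void main(String[] args) {
        System.out.println(countItems().block());
        System.out.println(oneThenAnother().collectList().block());
        System.out.println(completionOnly().block());
    }
}
```

The return types alone already tell the reader how many items to expect, which is the point of the Flux and Mono distinction.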
4.3. Simple Ways to Create a Flux or Mono and Subscribe to It
The easiest way to get started with Flux
and Mono
is to use one of the numerous
factory methods found in their respective classes.
For instance, to create a sequence of String
, you can either enumerate them or put them
in a collection and create the Flux from it, as follows:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Other examples of factory methods include the following:
Mono<String> noData = Mono.empty(); (1)
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
1 | Notice the factory method honors the generic type even though it has no value. |
2 | The first parameter is the start of the range, while the second parameter is the number of items to produce. |
When it comes to subscribing, Flux
and Mono
make use of Java 8 lambdas. You
have a wide choice of .subscribe()
variants that take lambdas for different
combinations of callbacks, as shown in the following method signatures:
Lambda-based subscribe variants for Flux
subscribe(); (1)
subscribe(Consumer<? super T> consumer); (2)
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); (3)
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); (4)
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer); (5)
1 | Subscribe and trigger the sequence. |
2 | Do something with each produced value. |
3 | Deal with values but also react to an error. |
4 | Deal with values and errors but also run some code when the sequence successfully completes. |
5 | Deal with values and errors and successful completion but also do something with the Subscription produced by this subscribe call. |
These variants return a reference to the subscription that you can use to cancel the
subscription when no more data is needed. Upon cancellation, the source should stop
producing values and clean up any resources it created. This cancel-and-clean-up behavior
is represented in Reactor by the general-purpose Disposable interface. |
4.3.1. subscribe
Method Examples
This section contains minimal examples of each of the five signatures for the subscribe
method. The following code shows an example of the basic method with no arguments:
Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
1 | Set up a Flux that produces three values when a subscriber attaches. |
2 | Subscribe in the simplest way. |
The preceding code produces no visible output, but it does work. The Flux
produces
three values. If we provide a lambda, we can make the values visible. The next example
for the subscribe
method shows one way to make the values appear:
Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
1 | Set up a Flux that produces three values when a subscriber attaches. |
2 | Subscribe with a subscriber that will print the values. |
The preceding code produces the following output:
1
2
3
To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:
Flux<Integer> ints = Flux.range(1, 4) (1)
.map(i -> { (2)
if (i <= 3) return i; (3)
throw new RuntimeException("Got to 4"); (4)
});
ints.subscribe(i -> System.out.println(i), (5)
error -> System.err.println("Error: " + error));
1 | Set up a Flux that produces four values when a subscriber attaches. |
2 | We need a map so that we can handle some values differently. |
3 | For most values, return the value. |
4 | For one value, force an error. |
5 | Subscribe with a subscriber that includes an error handler. |
We now have two lambda expressions: one for the content we expect and one for errors. The preceding code produces the following output:
1
2
3
Error: java.lang.RuntimeException: Got to 4
The next signature of the subscribe
method includes both an error handler and
a handler for completion events, as shown in the following example:
Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> System.out.println("Done")); (2)
1 | Set up a Flux that produces four values when a subscriber attaches. |
2 | Subscribe with a Subscriber that includes a handler for completion events. |
Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.
The completion callback has no input, as represented by an empty pair of
parentheses: It matches the run
method in the Runnable
interface. The preceding code
produces the following output:
1
2
3
4
Done
4.3.2. Cancelling a subscribe()
with Its Disposable
All these lambda-based variants of subscribe()
have a Disposable
return type.
In this case, the Disposable
interface represents the fact that the subscription
can be cancelled, by calling its dispose()
method.
For a Flux
or Mono
, cancellation is a signal that the source should stop
producing elements. However, it is NOT guaranteed to be immediate: Some sources
might produce elements so fast that they could complete even before receiving the
cancel instruction.
Some utilities around Disposable
are available in the Disposables
class.
Among these, Disposables.swap()
creates a Disposable
wrapper that lets
you atomically cancel and replace a concrete Disposable
. This can be useful,
for instance, in a UI scenario where you want to cancel a request and replace it
with a new one whenever the user clicks on a button. Disposing the wrapper itself
closes it. Doing so disposes the current concrete value and all future attempted replacements.
Another interesting utility is Disposables.composite(…)
. This composite
lets you collect several Disposable
instances (for instance, multiple in-flight requests
associated with a service call) and dispose all of them at once later on.
Once the composite’s dispose()
method has been called, any attempt to add
another Disposable
immediately disposes it.
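The two utilities can be tried out with the following sketch. It assumes reactor-core on the classpath; the class name DisposablesSketch and the use of Flux.never() as a stand-in for long-running requests are choices made for this illustration only.

```java
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

final class DisposablesSketch {

    // Returns the disposed flags of {first, second, late} after disposing the composite.
    static boolean[] compositeDisposesAll() {
        // Flux.never() emits nothing, so these subscriptions stay active until cancelled.
        Disposable first = Flux.never().subscribe();
        Disposable second = Flux.never().subscribe();

        Disposable.Composite inFlight = Disposables.composite();
        inFlight.add(first);
        inFlight.add(second);
        inFlight.dispose(); // cancels both subscriptions at once

        Disposable late = Flux.never().subscribe();
        inFlight.add(late); // composite already disposed: 'late' is disposed right away

        return new boolean[] { first.isDisposed(), second.isDisposed(), late.isDisposed() };
    }

    // Returns the disposed flags of {previous, current} right after the swap.
    static boolean[] swapCancelsPrevious() {
        Disposable previous = Flux.never().subscribe();
        Disposable current = Flux.never().subscribe();

        Disposable.Swap request = Disposables.swap();
        request.update(previous);
        request.update(current); // disposes 'previous' and keeps 'current'

        boolean[] flags = { previous.isDisposed(), current.isDisposed() };
        request.dispose(); // closing the wrapper also disposes 'current'
        return flags;
    }
}
```

Here update() is used for the swap because it disposes the value being replaced, which matches the cancel-and-replace scenario described above.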
4.3.3. An Alternative to Lambdas: BaseSubscriber
There is an additional subscribe
method that is more generic and takes a full-blown
Subscriber
rather than composing one out of lambdas. In order to help with writing
such a Subscriber
, we provide an extendable class called BaseSubscriber
.
Instances of BaseSubscriber (or subclasses of it) are single-use,
meaning that a BaseSubscriber cancels its subscription to the first Publisher if it
is subscribed to a second Publisher.
That is because using an instance twice would violate the Reactive Streams rule that
the onNext method of a Subscriber must not be called in parallel.
As a result, anonymous implementations are fine only if they are declared directly within
the call to Publisher#subscribe(Subscriber).
|
Now we can implement one of these. We call it a SampleSubscriber
. The following
example shows how it would be attached to a Flux
:
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);
The following example shows what SampleSubscriber
could look like, as a minimalistic
implementation of a BaseSubscriber
:
package io.projectreactor.samples;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
public class SampleSubscriber<T> extends BaseSubscriber<T> {
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
public void hookOnNext(T value) {
System.out.println(value);
request(1);
}
}
The SampleSubscriber
class extends BaseSubscriber
, which is the recommended abstract
class for user-defined Subscribers
in Reactor. The class offers hooks that can be
overridden to tune the subscriber’s behavior. By default, it triggers an unbounded
request and behaves exactly as subscribe()
. However, extending BaseSubscriber
is
much more useful when you want a custom request amount.
For a custom request amount, the bare minimum is to implement hookOnSubscribe(Subscription subscription)
and hookOnNext(T value)
, as we did. In our case, the hookOnSubscribe
method
prints a statement to standard out and makes the first request. Then the hookOnNext
method prints a statement and performs additional requests, one request
at a time.
The SampleSubscriber
class produces the following output:
Subscribed
1
2
3
4
BaseSubscriber
also offers a requestUnbounded()
method to switch to unbounded mode
(equivalent to request(Long.MAX_VALUE)
), as well as a cancel()
method.
It also has additional hooks: hookOnComplete
, hookOnError
, hookOnCancel
, and hookFinally
(which is always called when the sequence terminates, with the type of termination passed
in as a SignalType
parameter).
You almost certainly want to implement the hookOnError, hookOnCancel, and hookOnComplete methods. You may also want to implement the hookFinally method. SampleSubscriber is the absolute minimum implementation of a Subscriber that performs
bounded requests.
|
4.3.4. On Backpressure and Ways to Reshape Requests
When implementing backpressure in Reactor, the way consumer pressure is propagated back to the source is by sending a request
to the upstream operator.
The sum of current requests is sometimes referred to as the current “demand”, or “pending request”.
Demand is capped at Long.MAX_VALUE
, representing an unbounded request (meaning “produce as fast as you can”, which basically disables backpressure).
The first request comes from the final subscriber at subscription time, yet the most direct ways of subscribing all immediately trigger an unbounded request of Long.MAX_VALUE
:
- subscribe() and most of its lambda-based variants (with the exception of the one that has a Consumer<Subscription>)
- block(), blockFirst() and blockLast()
- iterating over a toIterable() or toStream()
The simplest way of customizing the original request is to subscribe
with a BaseSubscriber
with the hookOnSubscribe
method overridden, as the following example shows:
Flux.range(1, 10)
.doOnRequest(r -> System.out.println("request of " + r))
.subscribe(new BaseSubscriber<Integer>() {
@Override
public void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
public void hookOnNext(Integer integer) {
System.out.println("Cancelling after having received " + integer);
cancel();
}
});
The preceding snippet prints out the following:
request of 1
Cancelling after having received 1
When manipulating a request, you must be careful to produce enough demand for
the sequence to advance, or your Flux can get “stuck”. That is why BaseSubscriber
defaults to an unbounded request in hookOnSubscribe. When overriding this hook, you should usually
call request at least once. |
Operators that Change the Demand from Downstream
One thing to keep in mind is that demand expressed at the subscribe level can be reshaped by each operator in the upstream chain.
A textbook case is the buffer(N)
operator: If it receives a request(2)
, it is interpreted as a demand for two full buffers.
As a consequence, since buffers need N
elements to be considered full, the buffer
operator reshapes the request to 2 x N
.
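The reshaping done by buffer can be made visible by placing doOnRequest between the source and the operator. The following is a small sketch under the assumption that reactor-core is on the classpath; the class name BufferDemandSketch and the chosen sizes (buffers of 3, a request of 2) are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class BufferDemandSketch {

    // Returns the request amounts observed upstream of buffer(3).
    static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();

        Flux.range(1, 10)
            .doOnRequest(n -> requests.add(n)) // sits between the source and buffer
            .buffer(3)
            .subscribe(new BaseSubscriber<List<Integer>>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(2); // ask for two full buffers only
                }
            });

        return requests;
    }
}
```

With these sizes, the single downstream request(2) reaches the source as one request of 2 x 3 = 6 elements.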
You might also have noticed that some operators have variants that take an int
input parameter called prefetch
.
This is another category of operators that modify the downstream request.
These are usually operators that deal with inner sequences, deriving a Publisher
from each incoming element (like flatMap
).
Prefetch is a way to tune the initial request made on these inner sequences.
If unspecified, most of these operators start with a demand of 32
.
These operators usually also implement a replenishing optimization: Once the operator has seen 75% of the prefetch request fulfilled, it re-requests 75% from upstream. This is a heuristic optimization made so that these operators proactively anticipate the upcoming requests.
Finally, a couple of operators let you directly tune the request: limitRate
and limitRequest
.
limitRate(N)
splits the downstream requests so that they are propagated upstream in smaller batches.
For instance, a request of 100
made to limitRate(10)
would result in, at most, 10
requests of 10
being propagated to the upstream.
Note that, in this form, limitRate
actually implements the replenishing optimization discussed above.
This operator has a variant that also lets you tune the replenishing amount (referred to as the lowTide
in the variant): limitRate(highTide, lowTide)
.
Choosing a lowTide
of 0
results in strict batches of highTide
requests, instead of batches further reworked by the replenishing strategy.
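To see how limitRate caps what reaches the source, one option is again to record the upstream requests with doOnRequest. This sketch assumes reactor-core on the classpath; the class name LimitRateSketch is hypothetical, and the exact sizes of the replenishing requests are deliberately not spelled out, since they depend on the replenishing heuristic.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class LimitRateSketch {

    // Returns the request amounts that reach the source when limitRate(10) is applied.
    static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();

        Flux.range(1, 100)
            .doOnRequest(n -> requests.add(n)) // records what the source is asked for
            .limitRate(10)
            .subscribe(); // the subscriber itself asks for an unbounded amount

        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests());
    }
}
```

Even though the final subscriber requests an unbounded amount, the source should see an initial request of 10 followed by further requests that never exceed 10.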
limitRequest(N)
, on the other hand, caps the downstream request to a maximum total demand.
It adds up requests up to N
. If a single request
does not make the total demand overflow over N
, that particular request is wholly propagated upstream.
After that amount has been emitted by the source, limitRequest
considers the sequence complete, sends an onComplete
signal downstream, and cancels the source.
4.4. Programmatically creating a sequence
In this section, we introduce the creation of a Flux
or a Mono
by
programmatically defining its associated events (onNext
, onError
, and onComplete
). All these methods share the fact that they expose an API to
trigger the events that we call a sink. There are actually a few sink
variants, which we’ll get to shortly.
4.4.1. Synchronous generate
The simplest form of programmatic creation of a Flux
is through the generate
method, which takes a generator function.
This is for synchronous and one-by-one emissions, meaning that
the sink is a SynchronousSink
and that its next()
method can only be called
at most once per callback invocation. You can then additionally call error(Throwable)
or complete()
, but this is optional.
The most useful variant is probably the one that also lets you keep a state
that you can refer to in your sink usage to decide what to emit next. The generator
function then becomes a BiFunction<S, SynchronousSink<T>, S>
, with <S>
the
type of the state object. You have to provide a Supplier<S>
for the initial
state, and your generator function now returns a new state on each round.
For instance, you could use an int
as the state:
Example of state-based generate
Flux<String> flux = Flux.generate(
() -> 0, (1)
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state); (2)
if (state == 10) sink.complete(); (3)
return state + 1; (4)
});
1 | We supply the initial state value of 0. |
2 | We use the state to choose what to emit (a row in the multiplication table of 3). |
3 | We also use it to choose when to stop. |
4 | We return a new state that we use in the next invocation (unless the sequence terminated in this one). |
The preceding code generates the table of 3, as the following sequence:
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
You can also use a mutable <S>
. The example above could for instance be
rewritten using a single AtomicLong
as the state, mutating it on each round:
Flux<String> flux = Flux.generate(
AtomicLong::new, (1)
(state, sink) -> {
long i = state.getAndIncrement(); (2)
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state; (3)
});
1 | This time, we generate a mutable object as the state. |
2 | We mutate the state here. |
3 | We return the same instance as the new state. |
If your state object needs to clean up some resources, use the generate(Supplier<S>, BiFunction, Consumer<S>) variant to clean up the last
state instance. |
The following example uses the generate
method that includes a Consumer
:
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> { (1)
long i = state.getAndIncrement(); (2)
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state; (3)
}, (state) -> System.out.println("state: " + state)); (4)
1 | Again, we generate a mutable object as the state. |
2 | We mutate the state here. |
3 | We return the same instance as the new state. |
4 | We see the last state value (11) as the output of this Consumer lambda. |
In the case of the state containing a database connection or other resource
that needs to be handled at the end of the process, the Consumer
lambda could
close the connection or otherwise handle any tasks that should be done at the
end of the process.
4.4.2. Asynchronous and Multi-threaded: create
create
is a more advanced form of programmatic creation of a Flux
which is
suitable for multiple emissions per round, even from multiple threads.
It exposes a FluxSink
, with its next
, error
, and complete
methods.
Contrary to generate
, it doesn’t have a state-based variant. On the other
hand, it can trigger multi-threaded events in the callback.
create can be very useful to bridge an existing API with the reactive
world, such as an asynchronous API based on listeners. |
create doesn’t parallelize your code nor does it make it asynchronous, even
though it can be used with asynchronous APIs. If you block within the create lambda,
you expose yourself to deadlocks and similar side effects. Even with the use of subscribeOn,
there’s the caveat that a long-blocking create lambda (such as an infinite loop calling sink.next(t)) can lock the pipeline: the requests would never be performed due to the
loop starving the same thread they are supposed to run from. Use the subscribeOn(Scheduler, false)
variant: requestOnSeparateThread = false will use the Scheduler thread for the create
and still let data flow by performing request in the original thread. |
Imagine that you use a listener-based API. It processes data by chunks
and has two events: (1) a chunk of data is ready and (2) the processing is
complete (terminal event), as represented in the MyEventListener
interface:
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
You can use create
to bridge this into a Flux<T>
:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register( (4)
new MyEventListener<String>() { (1)
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s); (2)
}
}
public void processComplete() {
sink.complete(); (3)
}
});
});
1 | Bridge to the MyEventListener API. |
2 | Each element in a chunk becomes an element in the Flux. |
3 | The processComplete event is translated to onComplete. |
4 | All of this is done asynchronously whenever the myEventProcessor executes. |
Additionally, since create
can bridge asynchronous APIs and manages backpressure, you
can refine how to behave backpressure-wise, by indicating an OverflowStrategy
:
- IGNORE to completely ignore downstream backpressure requests. This may yield IllegalStateException when queues get full downstream.
- ERROR to signal an IllegalStateException when the downstream can’t keep up.
- DROP to drop the incoming signal if the downstream is not ready to receive it.
- LATEST to let downstream only get the latest signals from upstream.
- BUFFER (the default) to buffer all signals if the downstream can’t keep up. (This does unbounded buffering and may lead to OutOfMemoryError.)
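As a minimal sketch of how one of these strategies behaves, the following example pushes five items eagerly into a create sink configured with DROP, while the subscriber only ever requests two. It assumes reactor-core on the classpath; the class name OverflowStrategySketch and the numbers involved are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

final class OverflowStrategySketch {

    // Emits five items eagerly to a subscriber that only ever requests two.
    static List<Integer> receivedWithDrop() {
        List<Integer> received = new ArrayList<>();

        Flux<Integer> eager = Flux.create(sink -> {
            for (int i = 1; i <= 5; i++) {
                sink.next(i); // pushed without checking the pending demand
            }
            sink.complete();
        }, FluxSink.OverflowStrategy.DROP);

        eager.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2);
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        });

        return received;
    }
}
```

With DROP, the items emitted once the demand of two is exhausted are discarded rather than buffered, so only the first two values reach the subscriber.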
Mono also has a create generator. The MonoSink of Mono’s create
doesn’t allow several emissions. It will drop all signals after the first one. |
4.4.3. Asynchronous but single-threaded: push
push
is a middle ground between generate
and create
which is suitable for
processing events from a single producer. It is similar to create
in the sense
that it can also be asynchronous and can manage backpressure using any of the
overflow strategies supported by create
. However, only one producing thread
may invoke next
, complete
or error
at a time.
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() { (1)
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s); (2)
}
}
public void processComplete() {
sink.complete(); (3)
}
public void processError(Throwable e) {
sink.error(e); (4)
}
});
});
1 | Bridge to the SingleThreadEventListener API. |
2 | Events are pushed to the sink using next from a single listener thread. |
3 | complete event generated from the same listener thread. |
4 | error event also generated from the same listener thread. |
A hybrid push/pull model
Most Reactor operators, like create
, follow a hybrid push/pull model.
What we mean by that is that despite most of the processing being asynchronous
(suggesting a push approach), there is a small pull component to it: the
request.
The consumer pulls data from the source in the sense that the source won’t emit anything until first requested. The source pushes data to the consumer whenever it becomes available, but within the bounds of its requested amount.
Note that push()
and create()
both allow you to set up an onRequest
consumer
in order to manage the request amount and to ensure that data is pushed through
the sink only when there is pending request.
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
for(String s : messages) {
sink.next(s); (3)
}
}
});
sink.onRequest(n -> {
List<String> messages = myMessageProcessor.getHistory(n); (1)
for(String s : messages) {
sink.next(s); (2)
}
});
});
1 | Poll for messages when requests are made. |
2 | If messages are available immediately, push them to the sink. |
3 | The remaining messages that arrive asynchronously later are also delivered. |
Cleaning up after push()
or create()
Two callbacks, onDispose
and onCancel
, perform any cleanup on cancellation
or termination. onDispose
can be used to perform cleanup when the Flux
completes, errors out, or is cancelled. onCancel
can be used to perform any
action specific to cancellation prior to cleanup with onDispose
.
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel()) (1)
.onDispose(() -> channel.close()); (2)
});
1 | onCancel is invoked first, for cancel signal only. |
2 | onDispose is invoked for complete, error, or cancel signals. |
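The ordering of the two callbacks can be checked with a small sketch that records which of them run on cancellation and on completion. It assumes reactor-core on the classpath; the class name SinkCleanupSketch, the helper tracked, and the log labels are hypothetical and only serve this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

final class SinkCleanupSketch {

    // Builds a Flux whose sink records which cleanup callbacks ran.
    private static Flux<String> tracked(List<String> log, boolean completeImmediately) {
        return Flux.create(sink -> {
            sink.onCancel(() -> log.add("onCancel"))
                .onDispose(() -> log.add("onDispose"));
            if (completeImmediately) {
                sink.complete();
            }
        });
    }

    // The subscriber cancels while the sequence is still active.
    static List<String> callbacksOnCancel() {
        List<String> log = new ArrayList<>();
        Disposable subscription = tracked(log, false).subscribe();
        subscription.dispose();
        return log;
    }

    // The sequence completes on its own, without any cancellation.
    static List<String> callbacksOnComplete() {
        List<String> log = new ArrayList<>();
        tracked(log, true).subscribe();
        return log;
    }
}
```

In this sketch, cancellation triggers onCancel followed by onDispose, whereas normal completion only triggers onDispose.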
4.4.4. Handle
The handle
method is a bit different: it is an instance method, meaning that
it is chained on an existing source (as are the common operators). It is present
in both Mono
and Flux
.
It is close to generate
, in the sense that it uses a SynchronousSink
and
only allows one-by-one emissions. However, handle
can be used to generate an
arbitrary value out of each source element, possibly skipping some elements. In
this way, it can serve as a combination of map
and filter
. The signature of
handle is as follows:
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
Let’s consider an example. The reactive streams specification disallows null
values in a sequence. What if you want to perform a map
but you want to use
a preexisting method as the map function, and that method sometimes returns null?
For instance, the following method can be applied safely to a source of integers:
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
We can then use handle
to remove any nulls:
Using handle
for a "map and eliminate nulls" scenario
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i); (1)
if (letter != null) (2)
sink.next(letter); (3)
});
alphabet.subscribe(System.out::println);
1 | Map to letters. |
2 | If the "map function" returns null…. |
3 | Filter it out by not calling sink.next. |
Which will print out:
M
I
T
4.5. Threading and Schedulers
Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.
Obtaining a Flux
or a Mono
does not necessarily mean that it runs in a dedicated Thread
. Instead, most operators continue working in the Thread
on which the
previous operator executed. Unless specified, the topmost operator (the source)
itself runs on the Thread
in which the subscribe()
call was made. The following
example runs a Mono
in a new thread:
public static void main(String[] args) throws InterruptedException {
final Mono<String> mono = Mono.just("hello "); (1)
Thread t = new Thread(() -> mono .map(msg -> msg + "thread ")
.subscribe(v -> (2)
System.out.println(v + Thread.currentThread().getName()) (3)
)
);
t.start();
t.join();
}
1 | The Mono<String> is assembled in thread main. |
2 | However, it is subscribed to in thread Thread-0. |
3 | As a consequence, both the map and the onNext callback actually run in Thread-0. |
The preceding code produces the following output:
hello thread Thread-0
In Reactor, the execution model and where the execution happens is determined by the Scheduler
that is used. A Scheduler
has scheduling responsibilities similar to an ExecutorService
, but having a
dedicated abstraction lets it do more, notably acting as a clock and enabling
a wider range of implementations (virtual time for tests, trampolining or
immediate scheduling, and so on).
The Schedulers
class has static methods that give access to the following execution contexts:
- No execution context (Schedulers.immediate()): at processing time, the submitted Runnable will be directly executed, effectively running it on the current Thread (can be seen as a "null object" or no-op Scheduler).
- A single, reusable thread (Schedulers.single()). Note that this method reuses the same thread for all callers, until the Scheduler is disposed. If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.
- An unbounded elastic thread pool (Schedulers.elastic()). This one is no longer preferred with the introduction of Schedulers.boundedElastic(), as it has a tendency to hide backpressure problems and lead to too many threads (see below).
- A bounded elastic thread pool (Schedulers.boundedElastic()). Like its predecessor elastic(), it creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. Unlike its elastic() predecessor, it has a cap on the number of backing threads it can create (default is number of CPU cores x 10). Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available (when scheduling with a delay, the delay starts when the thread becomes available). This is a better choice for I/O blocking work. Schedulers.boundedElastic() is a handy way to give a blocking process its own thread so that it does not tie up other resources, without pressuring the system too much with new threads. See How Do I Wrap a Synchronous, Blocking Call?
- A fixed pool of workers that is tuned for parallel work (Schedulers.parallel()). It creates as many workers as you have CPU cores.
Additionally, you can create a Scheduler out of any pre-existing ExecutorService by using Schedulers.fromExecutorService(ExecutorService). (You can also create one from an Executor, although doing so is discouraged.)
You can also create new instances of the various scheduler types by using the newXXX methods. For example, Schedulers.newParallel(yourScheduleName) creates a new parallel scheduler named yourScheduleName.
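As a hedged illustration of the factory methods above, the following sketch creates one scheduler with newParallel and another with fromExecutorService, then reports which thread ran a trivial task. The class name, the scheduler name my-parallel, and the thread name my-executor-thread are assumptions made for this example only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerFactoriesSketch {

    // Runs a trivial task on the given Scheduler and returns the name of the thread that ran it.
    static String threadNameOn(Scheduler scheduler) {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                   .subscribeOn(scheduler)
                   .block();
    }

    // A dedicated parallel scheduler; its threads are named after the given prefix.
    static String namedParallel() {
        Scheduler s = Schedulers.newParallel("my-parallel", 2);
        try {
            return threadNameOn(s);
        } finally {
            s.dispose(); // release the threads we created
        }
    }

    // A scheduler wrapped around a pre-existing ExecutorService (hypothetical thread name).
    static String fromExecutor() {
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "my-executor-thread"));
        Scheduler s = Schedulers.fromExecutorService(executor);
        try {
            return threadNameOn(s);
        } finally {
            s.dispose();
            executor.shutdown(); // harmless if the executor is already shut down
        }
    }

    public static void main(String[] args) {
        System.out.println(namedParallel());
        System.out.println(fromExecutor());
    }
}
```

Schedulers created this way are not shared, so the sketch disposes them once the work is done.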
Time Custom |
Some operators use a specific scheduler from Schedulers by default (and usually give you the option of providing a different one). For instance, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux<Long> that ticks every 300ms. By default, this is enabled by Schedulers.parallel(). The following line changes the Scheduler to a new instance similar to Schedulers.single():
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
Reactor offers two means of switching the execution context (or Scheduler) in a reactive chain: publishOn and subscribeOn. Both take a Scheduler and let you switch the execution context to that scheduler. But the placement of publishOn in the chain matters, while the placement of subscribeOn does not. To understand that difference, you first have to remember that nothing happens until you subscribe.
In Reactor, when you chain operators, you can wrap as many Flux and Mono implementations inside one another as you need. Once you subscribe, a chain of Subscriber objects is created, backward (up the chain) to the first publisher. This is effectively hidden from you. All you can see is the outer layer of Flux (or Mono) and Subscription, but these intermediate operator-specific subscribers are where the real work happens.
With that knowledge, we can have a closer look at the publishOn and subscribeOn operators:
4.5.1. The publishOn Method
publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in), as follows:
-
Changes the execution context to one Thread picked by the Scheduler.
-
As per the specification, onNext calls happen in sequence, so this uses up a single thread.
-
Unless they work on a specific Scheduler, operators after publishOn continue execution on that same thread.
The following example uses the publishOn method:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)
final Flux<String> flux = Flux .range(1, 2)
.map(i -> 10 + i) (2)
.publishOn(s) (3)
.map(i -> "value " + i); (4)
new Thread(() -> flux.subscribe(System.out::println)); (5)
1 | Creates a new Scheduler backed by four Thread instances. |
2 | The first map runs on the anonymous thread in <5>. |
3 | The publishOn switches the whole sequence to a Thread picked from <1>. |
4 | The second map runs on the Thread from <1>. |
5 | This anonymous Thread is the one where the subscription happens. The print happens on the latest execution context, which is the one from publishOn. |
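To make the thread switch observable, the following sketch records the thread name seen by a map placed before publishOn and by a map placed after it. It is a minimal illustration rather than part of the reference example: the class name is hypothetical, and block() is used only so that the calling thread plays the role of the subscribing thread.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnSketch {

    // Each element has the form "<thread of first map> -> <thread of second map>".
    static List<String> run() {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        try {
            return Flux.range(1, 2)
                       // upstream of publishOn: runs on the thread that subscribes
                       .map(i -> Thread.currentThread().getName())
                       .publishOn(s)
                       // downstream of publishOn: runs on a thread picked from the scheduler
                       .map(first -> first + " -> " + Thread.currentThread().getName())
                       .collectList()
                       .block();
        } finally {
            s.dispose();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The left-hand side of each line names the caller's thread, while the right-hand side names one of the parallel-scheduler threads.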
4.5.2. The subscribeOn Method
subscribeOn applies to the subscription process, when the backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission. However, this does not affect the behavior of subsequent calls to publishOn: they still switch the execution context for the part of the chain after them.
-
Changes the Thread from which the whole chain of operators subscribes.
-
Picks one thread from the Scheduler.
Only the earliest subscribeOn call in the chain is actually taken into account. |
The following example uses the subscribeOn method:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)
final Flux<String> flux = Flux .range(1, 2)
.map(i -> 10 + i) (2)
.subscribeOn(s) (3)
.map(i -> "value " + i); (4)
new Thread(() -> flux.subscribe(System.out::println)); (5)
1 | Creates a new Scheduler backed by four Thread instances. |
2 | The first map runs on one of these four threads… |
3 | …because subscribeOn switches the whole sequence right from subscription time (<5>). |
4 | The second map also runs on the same thread. |
5 | This anonymous Thread is the one where the subscription initially happens, but subscribeOn immediately shifts it to one of the four scheduler threads. |
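The same kind of probe can be applied to subscribeOn. In this sketch (hypothetical class name, with block() standing in for the subscribing thread), the map placed before subscribeOn and the map placed after it are expected to report the same scheduler thread, because the whole subscription is shifted to it.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnSketch {

    // Each element has the form "<thread of first map> -> <thread of second map>".
    static List<String> run() {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        try {
            return Flux.range(1, 2)
                       // already on a scheduler thread, even though it sits above subscribeOn
                       .map(i -> Thread.currentThread().getName())
                       .subscribeOn(s)
                       // no publishOn in between, so execution stays on that same thread
                       .map(first -> first + " -> " + Thread.currentThread().getName())
                       .collectList()
                       .block();
        } finally {
            s.dispose();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```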
4.6. Handling Errors
For a quick look at the available operators for error handling, see the relevant operator decision tree. |
In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and gets propagated down the chain of operators to the last step, the Subscriber you defined and its onError method.
Such errors should still be dealt with at the application level. For instance, you might display an error notification in a UI or send a meaningful error payload in a REST endpoint. For this reason, the subscriber’s onError method should always be defined.
If not defined, onError throws an UnsupportedOperationException. You can further detect and triage it with the Exceptions.isErrorCallbackNotImplemented method. |
Reactor also offers alternative means of dealing with errors in the middle of the chain, as error-handling operators. The following example shows how to do so:
Flux.just(1, 2, 0)
.map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
.onErrorReturn("Divided by zero :("); // error handling example
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it. |
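The terminal nature of errors can be checked with a small sketch. It extends the division example with one extra element (4) after the failing one, which is an assumption added for illustration, and collects everything the subscriber receives. The class name is hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class TerminalErrorSketch {

    static List<String> run() {
        return Flux.just(1, 2, 0, 4)
                   .map(i -> "100 / " + i + " = " + (100 / i)) // ArithmeticException on 0
                   .onErrorReturn("Divided by zero :(")        // fallback value, then completion
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        // prints [100 / 1 = 100, 100 / 2 = 50, Divided by zero :(]
        System.out.println(run());
    }
}
```

The element 4 is never processed: the fallback value replaces the terminated sequence rather than letting it resume.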
Now we can consider each means of error handling one-by-one. When relevant, we make a parallel with imperative programming’s try patterns.
4.6.1. Error Handling Operators
You may be familiar with several ways of dealing with exceptions in a try-catch block. Most notably, these include the following:
-
Catch and return a static default value.
-
Catch and execute an alternative path with a fallback method.
-
Catch and dynamically compute a fallback value.
-
Catch, wrap to a BusinessException, and re-throw.
-
Catch, log an error-specific message, and re-throw.
-
Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
All of these have equivalents in Reactor, in the form of error-handling operators. Before looking into these operators, we first want to establish a parallel between a reactive chain and a try-catch block.
When subscribing, the onError callback at the end of the chain is akin to a catch block. There, execution skips to the catch in case an Exception is thrown, as the following example shows:
Flux<String> s = Flux.range(1, 10)
.map(v -> doSomethingDangerous(v)) (1)
.map(v -> doSecondTransform(v)); (2)
s.subscribe(value -> System.out.println("RECEIVED " + value), (3)
error -> System.err.println("CAUGHT " + error) (4)
);
1 | A transformation that can throw an exception is performed. |
2 | If everything went well, a second transformation is performed. |
3 | Each successfully transformed value is printed out. |
4 | In case of an error, the sequence terminates and an error message is displayed. |
The preceding example is conceptually similar to the following try-catch block:
try {
for (int i = 1; i < 11; i++) {
String v1 = doSomethingDangerous(i); (1)
String v2 = doSecondTransform(v1); (2)
System.out.println("RECEIVED " + v2);
}
} catch (Throwable t) {
System.err.println("CAUGHT " + t); (3)
}
1 | If an exception is thrown here… |
2 | …the rest of the loop is skipped… |
3 | …and the execution goes straight to here. |
Now that we have established a parallel, we can look at the different error handling cases and their equivalent operators.
Static Fallback Value
The equivalent of “Catch and return a static default value” is onErrorReturn. The following example shows how to use it:
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
The following example shows the Reactor equivalent:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
You also have the option of applying a Predicate on the exception to decide whether or not to recover, as the following example shows:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); (1)
1 | Recover only if the message of the exception is "boom10". |
Catch and swallow the error
If you don’t even want to replace the exception with a fallback value, but instead to ignore it and only propagate elements that have been produced so far, what you want is essentially replacing the onError signal with an onComplete signal. This can be done by the onErrorComplete operator:
Flux.just(10,20,30)
.map(this::doSomethingDangerousOn30)
.onErrorComplete(); (1)
1 | Recover by turning the onError into an onComplete. |
Like onErrorReturn, onErrorComplete has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate.
Fallback Method
If you want more than a single default value and you have an alternative (safer) way of processing your data, you can use onErrorResume. This would be the equivalent of “Catch and execute an alternative path with a fallback method”.
For example, if your nominal process is fetching data from an external and unreliable service but you also keep a local cache of the same data that can be a bit more out of date but is more reliable, you could do the following:
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
String v2;
try {
v2 = callExternalService("key2");
}
catch (Throwable error) {
v2 = getFromCache("key2");
}
The following example shows the Reactor equivalent:
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k) (1)
.onErrorResume(e -> getFromCache(k)) (2)
);
1 | For each key, asynchronously call the external service. |
2 | If the external service call fails, fall back to the cache for that key. Note that we always apply the same fallback, whatever the source error, e, is. |
Like onErrorReturn, onErrorResume has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate. The fact that it takes a Function also lets you choose a different fallback sequence to switch to, depending on the error encountered. The following example shows how to do so:
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(error -> { (1)
if (error instanceof TimeoutException) (2)
return getFromCache(k);
else if (error instanceof UnknownKeyException) (3)
return registerNewEntry(k, "DEFAULT");
else return Flux.error(error); (4)
})
);
1 | The function allows dynamically choosing how to continue. |
2 | If the source times out, hit the local cache. |
3 | If the source says the key is unknown, create a new entry. |
4 | In all other cases, “re-throw”. |
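The fallback pattern can be tried end to end with stand-ins for the service and the cache. In the sketch below, callExternalService and getFromCache are hypothetical in-memory stubs, the class name is an assumption, and concatMap is used in place of flatMap only to keep the output order deterministic. The class-based onErrorResume variant restricts the fallback to timeouts.

```java
import java.util.List;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FallbackSketch {

    // Hypothetical stand-in for an unreliable remote call.
    static Mono<String> callExternalService(String key) {
        if (key.startsWith("timeout")) {
            return Mono.error(new TimeoutException("no answer for " + key));
        }
        return Mono.just("remote:" + key);
    }

    // Hypothetical stand-in for a local cache lookup.
    static Mono<String> getFromCache(String key) {
        return Mono.just("cached:" + key);
    }

    static List<String> run() {
        return Flux.just("timeout1", "key2")
                   .concatMap(k -> callExternalService(k)
                       // only timeouts fall back to the cache; other errors would propagate
                       .onErrorResume(TimeoutException.class, e -> getFromCache(k)))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        // prints [cached:timeout1, remote:key2]
        System.out.println(run());
    }
}
```

Because the fallback is applied inside the inner sequence, a failure for one key does not terminate processing of the remaining keys.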
Dynamic Fallback Value
Even if you do not have an alternative (safer) way of processing your data, you might want to compute a fallback value out of the exception you received. This would be the equivalent of “Catch and dynamically compute a fallback value”.
For instance, if your return type (MyWrapper) has a variant dedicated to holding an exception (think Future.complete(T success) versus Future.completeExceptionally(Throwable error)), you could instantiate the error-holding variant and pass the exception.
An imperative example would look like the following:
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
You can do this reactively in the same way as the fallback method solution, by using onErrorResume, with a tiny bit of boilerplate, as follows:
erroringFlux.onErrorResume(error -> Mono.just( (1)
MyWrapper.fromError(error) (2)
));
1 | Since you expect a MyWrapper representation of the error, you need to get a Mono<MyWrapper> for onErrorResume. We use Mono.just() for that. |
2 | We need to compute the value out of the exception. Here, we achieved that by wrapping the exception with a relevant MyWrapper factory method. |
Catch and Rethrow
"Catch, wrap to a BusinessException, and re-throw" looks like the following in the imperative world:
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
In the “fallback method” example, the last line inside the flatMap gives us a hint at achieving the same reactively, as follows:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
However, there is a more straightforward way of achieving the same effect with onErrorMap:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
Log or React on the Side
For cases where you want the error to continue propagating but still want to react to it without modifying the sequence (logging it, for instance), you can use the doOnError operator. This is the equivalent of the “Catch, log an error-specific message, and re-throw” pattern, as the following example shows:
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
The doOnError operator, as well as all operators prefixed with doOn, are sometimes referred to as having a “side-effect”. They let you peek inside the sequence’s events without modifying them.
Like the imperative example shown earlier, the following example still propagates the error yet ensures that we at least log that the external service had a failure:
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k) (1)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k); (2)
})
(3)
);
1 | The external service call that can fail… |
2 | …is decorated with a logging and stats side-effect… |
3 | …after which, it still terminates with an error, unless we use an error-recovery operator here. |
We can also imagine we have statistic counters to increment as a second error side-effect.
Using Resources and the Finally Block
The last parallel to draw with imperative programming is the cleaning up that can be done either by using the finally block to clean up resources or by using a Java 7 “try-with-resource” construct, both shown below:
Stats stats = new Stats();
stats.startTimer();
try {
doSomethingDangerous();
}
finally {
stats.stopTimerAndRecordTiming();
}
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
Both have their Reactor equivalents: doFinally and using.
doFinally is about side-effects that you want to be executed whenever the sequence terminates (with onComplete or onError) or is cancelled. It gives you a hint as to what kind of termination triggered the side-effect. The following example shows how to use doFinally:
doFinally()
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> { (1)
stats.stopTimerAndRecordTiming();(2)
if (type == SignalType.CANCEL) (3)
statsCancel.increment();
})
.take(1); (4)
1 | doFinally consumes a SignalType for the type of termination. |
2 | Similarly to finally blocks, we always record the timing. |
3 | Here we also increment statistics in case of cancellation only. |
4 | take(1) requests exactly 1 from upstream, and cancels after one item is emitted. |
On the other hand, using handles the case where a Flux is derived from a resource and that resource must be acted upon whenever processing is done. In the following example, we replace the AutoCloseable interface of “try-with-resource” with a Disposable:
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true); (4)
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
Now we can do the reactive equivalent of “try-with-resource” on it, which looks like the following:
using()
Flux<String> flux =
Flux.using(
() -> disposableInstance, (1)
disposable -> Flux.just(disposable.toString()), (2)
Disposable::dispose (3)
);
1 | The first lambda generates the resource. Here, we return our mock Disposable. |
2 | The second lambda processes the resource, returning a Flux<T>. |
3 | The third lambda is called when the Flux from <2> terminates or is cancelled, to clean up resources. |
4 | After subscription and execution of the sequence, the isDisposed atomic boolean becomes true. |
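The cleanup guarantee can be observed by reading the flag before and after the sequence runs. The following sketch is a compact, self-contained variant of the listing above, under the assumption that a lambda-based Disposable is an acceptable mock resource. The class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class UsingSketch {

    // Returns {disposed before the sequence ran, disposed after it completed}.
    static boolean[] run() {
        AtomicBoolean isDisposed = new AtomicBoolean();
        Disposable resource = () -> isDisposed.set(true); // mock resource

        Flux<String> flux = Flux.using(
            () -> resource,               // acquire the resource on subscription
            r -> Flux.just("DISPOSABLE"), // derive the sequence from the resource
            Disposable::dispose);         // clean up on termination or cancellation

        boolean before = isDisposed.get(); // nothing has been subscribed yet
        flux.blockLast();                  // subscribe and wait for completion
        boolean after = isDisposed.get();
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("before=" + r[0] + ", after=" + r[1]);
    }
}
```

Assembling the Flux does not touch the resource; acquisition and cleanup both happen per subscription.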
Demonstrating the Terminal Aspect of onError
In order to demonstrate that all these operators cause the upstream original sequence to terminate when an error happens, we can use a more visual example with a Flux.interval. The interval operator ticks every x units of time with an increasing Long value. The following example uses an interval operator:
Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.onErrorReturn("Uh oh");
flux.subscribe(System.out::println);
Thread.sleep(2100); (1)
1 | Note that interval executes on a timer Scheduler by default. If we want to run that example in a main class, we would need to add a sleep call here so that the application does not exit immediately without any value being produced. |
The preceding example prints out one line every 250ms, as follows:
tick 0
tick 1
tick 2
Uh oh
Even with one extra second of runtime, no more tick comes in from the interval. The sequence was indeed terminated by the error.
Retrying
There is another operator of interest with regards to error handling, and you might be tempted to use it in the case described in the previous section. retry, as its name indicates, lets you retry an error-producing sequence.
The thing to keep in mind is that it works by re-subscribing to the upstream Flux. This is really a different sequence, and the original one is still terminated. To verify that, we can re-use the previous example and append a retry(1) to retry once instead of using onErrorReturn. The following example shows how to do so:
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed() (1)
.subscribe(System.out::println, System.err::println); (2)
Thread.sleep(2100); (3)
1 | elapsed associates each value with the duration since the previous value was emitted. |
2 | We also want to see when there is an onError. |
3 | Ensure we have enough time for our 4x2 ticks. |
The preceding example produces the following output:
259,tick 0
249,tick 1
251,tick 2
506,tick 0 (1)
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1 | A new interval started, from tick 0. The additional 250ms duration is coming from the 4th tick, the one that causes the exception and subsequent retry. |
As you can see from the preceding example, retry(1) merely re-subscribed to the original interval once, restarting the tick from 0. The second time around, since the exception still occurs, it gives up and propagates the error downstream.
There is a more advanced version of retry (called retryWhen) that uses a “companion” Flux to tell whether or not a particular failure should retry. This companion Flux is created by the operator but decorated by the user, in order to customize the retry condition.
The companion Flux is a Flux<RetrySignal> that gets passed to a Retry strategy/function, supplied as the sole parameter of retryWhen. As the user, you define that function and make it return a new Publisher<?>. The Retry class is an abstract class, but it offers a factory method if you want to transform the companion with a simple lambda (Retry.from(Function)).
Retry cycles go as follows:
-
Each time an error happens (giving potential for a retry), a RetrySignal is emitted into the companion Flux, which has been decorated by your function. Having a Flux here gives a bird’s-eye view of all the attempts so far. The RetrySignal gives access to the error as well as metadata around it.
-
If the companion Flux emits a value, a retry happens.
-
If the companion Flux completes, the error is swallowed, the retry cycle stops, and the resulting sequence completes, too.
-
If the companion Flux produces an error (e), the retry cycle stops and the resulting sequence errors with e.
The distinction between the previous two cases is important. Simply completing the companion would effectively swallow an error. Consider the following way of emulating retry(3) by using retryWhen:
Flux<String> flux = Flux .<String>error(new IllegalArgumentException()) (1)
.doOnError(System.out::println) (2)
.retryWhen(Retry.from(companion -> (3)
companion.take(3))); (4)
1 | This continuously produces errors, calling for retry attempts. |
2 | doOnError before the retry lets us log and see all failures. |
3 | The Retry is adapted from a very simple Function lambda. |
4 | Here, we consider the first three errors as retry-able (take(3)) and then give up. |
In effect, the preceding example results in an empty Flux, but it completes successfully. Since retry(3) on the same Flux would have terminated with the latest error, this retryWhen example is not exactly the same as a retry(3).
Getting to the same behavior involves a few additional tricks:
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.doOnError(e -> errorCount.incrementAndGet())
.retryWhen(Retry.from(companion -> (1)
companion.map(rs -> { (2)
if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
else throw Exceptions.propagate(rs.failure()); (4)
})
));
1 | We customize Retry by adapting from a Function lambda rather than providing a concrete class. |
2 | The companion emits RetrySignal objects, which bear the number of retries so far and the last failure. |
3 | To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index). |
4 | In order to terminate the sequence in error, we throw the original exception after these three retries. |
One can use the builders exposed in Retry to achieve the same in a more fluent manner, as well as more finely tuned retry strategies. For example: errorFlux.retryWhen(Retry.max(3)); |
You can use similar code to implement an “exponential backoff and retry” pattern, as shown in the FAQ. |
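As a rough check of what the Retry.max(3) builder does, the following sketch counts subscriptions to an always-failing source and inspects the error that finally reaches the caller. The class name and the defer-based counting are assumptions made for illustration. Note that, with the builder, the terminal error is the retries-exhausted exception (with the last failure as its cause), not the original exception itself.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class RetryMaxSketch {

    static String run() {
        AtomicInteger subscriptions = new AtomicInteger();

        // Every (re)subscription is counted, then fails immediately.
        Flux<String> failing = Flux.defer(() -> {
            subscriptions.incrementAndGet();
            return Flux.<String>error(new IllegalArgumentException("boom"));
        });

        try {
            failing.retryWhen(Retry.max(3)).blockLast();
            return "completed without error";
        } catch (RuntimeException e) {
            return "subscriptions=" + subscriptions.get()
                + " exhausted=" + Exceptions.isRetryExhausted(e)
                + " cause=" + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // prints subscriptions=4 exhausted=true cause=IllegalArgumentException
        System.out.println(run());
    }
}
```

One initial subscription plus three retries gives four subscriptions in total.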
The core-provided Retry helpers, RetrySpec and RetryBackoffSpec, both allow advanced customizations like:
-
setting the filter(Predicate) for the exceptions that can trigger a retry
-
modifying such a previously set filter through modifyErrorFilter(Function)
-
triggering a side effect like logging around the retry trigger (that is, for backoff, before and after the delay), provided the retry is validated (doBeforeRetry() and doAfterRetry() are additive)
-
triggering an asynchronous Mono<Void> around the retry trigger, which allows adding asynchronous behavior on top of the base delay but thus further delays the trigger (doBeforeRetryAsync and doAfterRetryAsync are additive)
-
customizing the exception in case the maximum number of attempts has been reached, through onRetryExhaustedThrow(BiFunction). By default, Exceptions.retryExhausted(…) is used, which can be distinguished with Exceptions.isRetryExhausted(Throwable)
-
activating the handling of transient errors (see below)
Retrying with transient errors
Some long-lived sources may see sporadic bursts of errors followed by longer periods of time during which all is running smoothly. This documentation refers to this pattern of errors as transient errors.
In such cases, it would be desirable to deal with each burst in isolation, so that the next burst doesn’t inherit the retry state from the previous one. For instance, with an exponential backoff strategy each subsequent burst should delay retry attempts starting from the minimum backoff Duration instead of an ever-growing one.
The RetrySignal interface, which represents retryWhen state, has a totalRetriesInARow() value which can be used for this. Instead of the usual monotonically-increasing totalRetries() index, this secondary index is reset to 0 each time an error is recovered from by the retry (that is, when a retry attempt results in an incoming onNext instead of an onError again).
When setting the transientErrors(boolean) configuration parameter to true in the RetrySpec or RetryBackoffSpec, the resulting strategy makes use of that totalRetriesInARow() index, effectively dealing with transient errors. These specs compute the retry pattern from the index, so in effect all other configuration parameters of the spec apply to each burst of error independently.
AtomicInteger errorCount = new AtomicInteger(); (1)
Flux<Integer> transientFlux = httpRequest.get() (2)
.doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true)) (3)
.blockLast();
assertThat(errorCount).hasValue(6); (4)
1 | We will count the number of errors in the retried sequence for illustration. |
2 | We assume an HTTP request source, for example a streaming endpoint that will sometimes fail two times in a row, then recover. |
3 | We use retryWhen on that source, configured for at most 2 retry attempts, but in transientErrors mode. |
4 | At the end, a valid response is achieved and the transientFlux successfully completes after 6 attempts have been registered in errorCount. |
Without the transientErrors(true), the configured maximum attempt of 2 would be exceeded by the second burst and the whole sequence would have ultimately failed.
If you want to try this locally without an actual remote HTTP endpoint, you can implement a pseudo
|
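One possible way to try the transient-error scenario locally is sketched below. It assumes a pseudo source built with Flux.generate over a shared counter: the source emits a value when the counter is a multiple of 3, fails otherwise (so errors come in bursts of two), and completes at 10. The class name, the counter scheme, and the error message are illustrative choices, not a prescribed implementation.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class TransientErrorsSketch {

    static String run() {
        AtomicInteger counter = new AtomicInteger(); // shared across re-subscriptions

        // Pseudo HTTP source: bursts of two errors between successful values.
        Supplier<Flux<Integer>> httpRequest = () -> Flux.<Integer>generate(sink -> {
            int i = counter.getAndIncrement();
            if (i == 10) {
                sink.next(i);
                sink.complete();
            } else if (i % 3 == 0) {
                sink.next(i);
            } else {
                sink.error(new IllegalStateException("Transient error at " + i));
            }
        });

        AtomicInteger errorCount = new AtomicInteger();
        List<Integer> values = httpRequest.get()
            .doOnError(e -> errorCount.incrementAndGet())
            // at most 2 retries per burst, since the index resets after each onNext
            .retryWhen(Retry.max(2).transientErrors(true))
            .collectList()
            .block();

        return "errors=" + errorCount.get() + " values=" + values;
    }

    public static void main(String[] args) {
        // prints errors=6 values=[0, 3, 6, 9, 10]
        System.out.println(run());
    }
}
```

Each burst contains exactly two consecutive errors, which fits within the per-burst budget of two retries.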
4.6.2. Handling Exceptions in Operators or Functions
In general, all operators can themselves contain code that potentially triggers an exception or calls to a user-defined callback that can similarly fail, so they all contain some form of error handling.
As a rule of thumb, an unchecked exception is always propagated through onError. For instance, throwing a RuntimeException inside a map function translates to an onError event, as the following code shows:
Flux.just("foo")
.map(s -> { throw new IllegalArgumentException(s); })
.subscribe(v -> System.out.println("GOT VALUE"),
e -> System.out.println("ERROR: " + e));
The preceding code prints out the following:
ERROR: java.lang.IllegalArgumentException: foo
You can tune the Exception before it is passed to onError, through the use of a hook. |
Reactor, however, defines a set of exceptions (such as OutOfMemoryError) that are always deemed to be fatal. See the Exceptions.throwIfFatal method. These errors mean that Reactor cannot keep operating and are thrown rather than propagated.
Internally, there are also cases where an unchecked exception still cannot be propagated (most notably during the subscribe and request phases), due to concurrency races that could lead to double onError or onComplete conditions. When these races happen, the error that cannot be propagated is “dropped”. These cases can still be managed to some extent by using customizable hooks. See Dropping Hooks. |
You may ask: “What about checked exceptions?”
If, for example, you need to call some method that declares it throws exceptions, you still have to deal with those exceptions in a try-catch block. You have several options, though:
-
Catch the exception and recover from it. The sequence continues normally.
-
Catch the exception, wrap it into an unchecked exception, and then throw it (interrupting the sequence). The Exceptions utility class can help you with that (we get to that next).
-
If you need to return a Flux (for example, you are in a flatMap), wrap the exception in an error-producing Flux, as follows: return Flux.error(checkedException). (The sequence also terminates.)
Reactor has an Exceptions utility class that you can use to ensure that exceptions are wrapped only if they are checked exceptions:
-
Use the Exceptions.propagate method to wrap exceptions, if necessary. It also calls throwIfFatal first and does not wrap RuntimeException.
-
Use the Exceptions.unwrap method to get the original unwrapped exception (going back to the root cause of a hierarchy of reactor-specific exceptions).
Consider the following example of a map that uses a conversion method that can throw an IOException:
public String convert(int i) throws IOException {
if (i > 3) {
throw new IOException("boom " + i);
}
return "OK " + i;
}
Now imagine that you want to use that method in a map. You must now explicitly catch the exception, and your map function cannot re-throw it. So you can propagate it to the map’s onError method as a RuntimeException, as follows:
Flux<String> converted = Flux.range(1, 10)
.map(i -> {
try { return convert(i); }
catch (IOException e) { throw Exceptions.propagate(e); }
});
Later on, when subscribing to the preceding Flux and reacting to errors (such as in the UI), you could revert back to the original exception if you want to do something special for IOExceptions. The following example shows how to do so:
converted.subscribe(
v -> System.out.println("RECEIVED: " + v),
e -> {
if (Exceptions.unwrap(e) instanceof IOException) {
System.out.println("Something bad happened with I/O");
} else {
System.out.println("Something bad happened");
}
}
);
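The third option listed earlier, returning an error-producing Flux from inside a flattening operator, can be sketched as follows. The convert method mirrors the one above. The class name, the use of concatMap (chosen over flatMap to keep ordering obvious), and the recovery message are assumptions for illustration.

```java
import java.io.IOException;
import java.util.List;
import reactor.core.publisher.Flux;

public class CheckedInFlatMapSketch {

    static String convert(int i) throws IOException {
        if (i > 3) {
            throw new IOException("boom " + i);
        }
        return "OK " + i;
    }

    static List<String> run() {
        return Flux.range(1, 5)
                   .concatMap(i -> {
                       try {
                           return Flux.just(convert(i));
                       } catch (IOException e) {
                           // no wrapping needed: the checked exception becomes an onError signal
                           return Flux.<String>error(e);
                       }
                   })
                   // the original IOException arrives downstream as-is
                   .onErrorResume(IOException.class,
                       e -> Flux.just("recovered from: " + e.getMessage()))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        // prints [OK 1, OK 2, OK 3, recovered from: boom 4]
        System.out.println(run());
    }
}
```

As with the other options, the sequence terminates at the first failure, so the value 5 is never converted.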
4.7. Sinks
In Reactor, a sink is a class that allows safe manual triggering of signals in a standalone fashion, creating a Publisher
-like structure capable of dealing with multiple Subscriber
(with the exception of unicast()
flavors).
Before 3.5.0
, there was also a set of Processor
implementations, which has been phased out.
4.7.1. Safely Produce from Multiple Threads by Using Sinks.One
and Sinks.Many
Default flavors of Sinks
exposed by reactor-core ensure that multi-threaded usage is detected
and cannot lead to spec violations or undefined behavior from the perspective of downstream
subscribers. When using the tryEmit*
API, parallel calls fail fast. When using the emit*
API, the provided EmitFailureHandler
may allow retrying on contention (for example, busy looping);
otherwise the sink terminates with an error.
This is an improvement over Processor.onNext
, which must be synchronized externally or
lead to undefined behavior from the perspective of the downstream subscribers.
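Processors are a special kind of Publisher that are also a Subscriber . A common mistake when coming across a Processor for the first time is to call its exposed onNext , onComplete , and onError methods directly. Such manual calls should be made with care, especially regarding external synchronization
of calls with respect to the Reactive Streams specification.
Processors are actually probably of marginal use, unless one comes across a Reactive Streams
based API that requires a Subscriber to be passed, rather than exposing a Publisher . Sinks are usually a better alternative. |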
The Sinks
builder provides a guided API to the main supported producer types.
You will recognize some of the behaviors found in Flux
, such as onBackpressureBuffer
.
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
Multiple producer threads may concurrently generate data on the sink by doing the following:
//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);
//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);
//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));
//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);
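When using busyLooping , be aware that returned instances of EmitFailureHandler cannot be reused: use one call of busyLooping per emitNext . |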
The Sinks.Many
can be presented to downstream consumers as a Flux
, as in the following example:
Flux<Integer> fluxView = replaySink.asFlux();
fluxView
.takeWhile(i -> i < 10)
.log()
.blockLast();
Similarly, the Sinks.Empty
and Sinks.One
flavors can be viewed as a Mono
with the asMono()
method.
The Sinks
categories are:
-
many().multicast()
: a sink that transmits only newly pushed data to its subscribers, honoring their backpressure (newly pushed as in "after the subscriber’s subscription"). -
many().unicast()
: same as above, with the twist that data pushed before the first subscriber registers is buffered. -
many().replay()
: a sink that replays a specified history size of pushed data to new subscribers and then continues pushing new data live. -
one()
: a sink that plays a single element to its subscribers. -
empty()
: a sink that plays a terminal signal only to its subscribers (error or complete), but can still be viewed as a Mono<T>
(notice the generic type <T>
).
4.7.2. Overview of Available Sinks
Sinks.many().unicast().onBackpressureBuffer(args?)
A unicast Sinks.Many
can deal with backpressure by using an internal buffer.
The trade-off is that it can have at most one Subscriber
.
The basic unicast sink is created via Sinks.many().unicast().onBackpressureBuffer()
.
But there are a few additional unicast
static factory methods in Sinks.many().unicast()
allowing finer tuning.
For instance, by default, it is unbounded: if you push any amount of data through it while
its Subscriber
has not yet requested data, it buffers all of the data.
You can change this by providing a custom Queue
implementation for the internal
buffering in the Sinks.many().unicast().onBackpressureBuffer(Queue)
factory method.
If that queue is bounded, the sink could reject the push of a value when the buffer
is full and not enough requests from downstream have been received.
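The bounded-queue behavior can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath; the class name BoundedUnicastSink and the choice of an ArrayBlockingQueue of capacity 2 are assumptions made for this example, and any bounded java.util.Queue should behave similarly.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

import reactor.core.publisher.Sinks;

public class BoundedUnicastSink {

    // Pushes three values into a unicast sink backed by a queue of capacity 2,
    // before any Subscriber has requested data, and returns the third result.
    static Sinks.EmitResult thirdEmitResult() {
        Queue<Integer> boundedQueue = new ArrayBlockingQueue<>(2);
        Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer(boundedQueue);

        sink.tryEmitNext(1); // buffered
        sink.tryEmitNext(2); // buffered, the queue is now full
        return sink.tryEmitNext(3); // rejected: no room left and no demand yet
    }

    public static void main(String[] args) {
        Sinks.EmitResult result = thirdEmitResult();
        System.out.println("third emit failed: " + result.isFailure());
    }
}
```

Using tryEmitNext here lets the producer inspect the EmitResult and decide what to do with the rejected value, rather than terminating the sink.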
Sinks.many().multicast().onBackpressureBuffer(args?)
A multicast Sinks.Many
can emit to several subscribers while honoring backpressure for each of its subscribers.
Subscribers receive only the signals pushed through the sink after they have subscribed.
The basic multicast sink is created via Sinks.many().multicast().onBackpressureBuffer()
.
By default, if all of its subscribers are cancelled (which basically means they have all
un-subscribed), it clears its internal buffer and stops accepting new subscribers.
You can tune this by using the autoCancel
parameter in the multicast
static factory methods
under Sinks.many().multicast()
.
Sinks.many().multicast().directAllOrNothing()
A multicast Sinks.Many
with a simplistic handling of backpressure: if any of the subscribers
is too slow (has zero demand), the onNext
is dropped for all subscribers.
However, the slow subscribers are not terminated and once the slow subscribers have started requesting again, all will resume receiving elements pushed from there on.
Once the Sinks.Many
has terminated (usually through its emitError(Throwable)
or emitComplete()
methods being called), it lets more subscribers subscribe but replays the
termination signal to them immediately.
Sinks.many().multicast().directBestEffort()
A multicast Sinks.Many
with a best effort handling of backpressure: if a subscriber
is too slow (has zero demand), the onNext
is dropped for this slow subscriber only.
However, the slow subscribers are not terminated and once they have started requesting again they will resume receiving newly pushed elements.
Once the Sinks.Many
has terminated (usually through its emitError(Throwable)
or emitComplete()
methods being called), it lets more subscribers subscribe but replays the
termination signal to them immediately.
Sinks.many().replay()
A replay Sinks.Many
caches emitted elements and replays them to late subscribers.
It can be created in multiple configurations:
-
Caching a limited history (
Sinks.many().replay().limit(int)
) or an unbounded history (Sinks.many().replay().all()
). -
Caching a time-based replay window (
Sinks.many().replay().limit(Duration)
). -
Caching a combination of history size and time window (
Sinks.many().replay().limit(int, Duration)
).
Additional overloads for fine tuning of the above can also be found under Sinks.many().replay()
, as well
as a variant that allows caching of a single element (latest()
and latestOrDefault(T)
).
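The size-limited configuration can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath; the class name ReplayLimitExample is hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Sinks;

public class ReplayLimitExample {

    // Emits three values and completes before anyone subscribes, then
    // returns what a late subscriber receives from a history limited to 2.
    static List<Integer> lateSubscriberView() {
        Sinks.Many<Integer> sink = Sinks.many().replay().limit(2);

        sink.tryEmitNext(1); // evicted once the history exceeds two elements
        sink.tryEmitNext(2);
        sink.tryEmitNext(3);
        sink.tryEmitComplete();

        // The late subscriber gets the retained history, then the completion.
        return sink.asFlux().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberView());
    }
}
```

Swapping limit(2) for all() would retain every element, at the cost of unbounded memory use for long-lived sinks.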
Sinks.unsafe().many()
Advanced users and operators builders might want to consider using Sinks.unsafe().many()
which will provide the same Sinks.Many
factories without the extra producer thread safety.
As a result there will be less overhead per sink, since thread-safe sinks have to detect multi-threaded access.
Library developers should not expose unsafe sinks but can use them internally in a controlled
calling environment where they can ensure external synchronization of the calls that lead to onNext
, onComplete
and onError
signals, in respect of the Reactive Streams specification.
Sinks.one()
This method directly constructs a simple instance of Sinks.One<T>
.
This flavor of Sinks
is viewable as a Mono
(through its asMono()
view method), and
has slightly different emit
methods to better convey these Mono-like semantics:
-
emitValue(T value)
generates an onNext(value)
signal and - in most implementations - will also trigger an implicit onComplete()
-
emitEmpty()
generates an isolated onComplete()
signal, intended as generating the equivalent of an empty Mono
-
emitError(Throwable t)
generates an onError(t)
signal
Sinks.one()
accepts one call of any of these methods, effectively generating a Mono
that either completed with a value, completed empty or failed.
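The single-call contract can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath; the class name SinksOneExample is hypothetical, and the non-blocking tryEmitValue variant is used so that the outcome of each call can be inspected.

```java
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class SinksOneExample {

    // Emits a single value through the sink and reads it back from the Mono view.
    static String emitThenRead(String value) {
        Sinks.One<String> sink = Sinks.one();
        Mono<String> view = sink.asMono();
        sink.tryEmitValue(value); // onNext(value) followed by onComplete()
        return view.block();
    }

    // A second emission on an already terminated sink is reported as a failure.
    static boolean secondEmitIsRejected() {
        Sinks.One<String> sink = Sinks.one();
        sink.tryEmitValue("first");
        return sink.tryEmitValue("second").isFailure();
    }

    public static void main(String[] args) {
        System.out.println(emitThenRead("hello"));
        System.out.println("second emit rejected: " + secondEmitIsRejected());
    }
}
```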
Sinks.empty()
This method directly constructs a simple instance of Sinks.Empty<T>
.
This flavor of Sinks
is like Sinks.One<T>
, except it doesn’t offer the emitValue
method.
As a result, it can only generate a Mono
that completes empty or fails.
The sink is still typed with a generic <T>
despite being unable to trigger an onNext
,
because it allows easy composition and inclusion in chains of operators that require a specific type.
5. Kotlin support
Kotlin is a statically-typed language targeting the JVM (and other platforms), which allows writing concise and elegant code while providing very good interoperability with existing libraries written in Java.
This section describes Reactor’s support for Kotlin.
5.1. Requirements
Reactor supports Kotlin 1.1+ and requires kotlin-stdlib
(or one of its kotlin-stdlib-jdk7
or kotlin-stdlib-jdk8
variants).
5.2. Extensions
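As of Dysprosium-M1 (that is, reactor-core 3.3.0.M1), Kotlin extensions are moved to a dedicated reactor-kotlin-extensions module with new package names that start with reactor.kotlin instead of simply reactor . As a consequence, Kotlin extensions in the reactor-core module are deprecated. |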
Thanks to its great Java interoperability and to Kotlin extensions, Reactor Kotlin APIs leverage regular Java APIs and are additionally enhanced by a few Kotlin-specific APIs that are available out of the box within Reactor artifacts.
Keep in mind that Kotlin extensions need to be imported to be used. This means
for example that the Throwable.toFlux Kotlin extension
is available only if reactor.kotlin.core.publisher.toFlux is imported.
That said, similar to static imports, an IDE should automatically suggest the import in most cases. |
For example, Kotlin reified type parameters provide a workaround for JVM generics type erasure, and Reactor provides some extensions to take advantage of this feature.
The following table compares Reactor with Java versus Reactor with Kotlin and extensions:
Java |
Kotlin with extensions |
|
|
|
|
|
|
|
|
|
|
|
|
The Reactor KDoc API lists and documents all the available Kotlin extensions.
5.3. Null Safety
One of Kotlin’s key features is null safety,
which cleanly deals with null
values at compile time rather than bumping into the famous NullPointerException
at runtime. This makes applications safer through nullability
declarations and expressive “value or no value” semantics without paying the cost of wrappers such as Optional
.
(Kotlin allows using functional constructs with nullable values. See this comprehensive guide to Kotlin null-safety.)
Although Java does not allow one to express null safety in its type-system, Reactor now
provides null safety of the whole Reactor API through tooling-friendly annotations declared
in the reactor.util.annotation
package.
By default, types from Java APIs used in Kotlin are recognized as platform types
for which null-checks are relaxed. Kotlin support for JSR 305 annotations
and Reactor nullability annotations provide null-safety for the whole Reactor API to Kotlin developers,
with the advantage of dealing with null
-related issues at compile time.
You can configure the JSR 305 checks by adding the -Xjsr305
compiler flag with the following
options: -Xjsr305={strict|warn|ignore}
.
For Kotlin versions 1.1.50+, the default behavior is the same as -Xjsr305=warn
.
The strict
value is required to have the Reactor API full null-safety taken into account
but should be considered experimental, since the Reactor API nullability declaration could evolve
even between minor releases, as more checks may be added in the future.
Nullability for generic type arguments, variable arguments, and array elements is not supported yet, but it should be in an upcoming release. See this discussion for up-to-date information. |
6. Testing
Whether you have written a simple chain of Reactor operators or your own operator, automated testing is always a good idea.
Reactor comes with a few elements dedicated to testing, gathered into their own
artifact: reactor-test
. You can find that project on Github,
inside of the reactor-core
repository.
To use it in your tests, you must add it as a test dependency.
The following example shows how to add reactor-test
as a dependency in Maven:
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope> (1)
    </dependency>
</dependencies>
1 | If you use the BOM, you do not need to specify a <version> . |
The following example shows how to add reactor-test
as a dependency in Gradle:
dependencies {
    testImplementation 'io.projectreactor:reactor-test'
}
The three core uses of reactor-test
are as follows:
-
Testing that a sequence follows a given scenario, step-by-step, with
StepVerifier
. -
Producing data in order to test the behavior of downstream operators (including your own operators) with
TestPublisher
. -
In sequences that can go through several alternative
Publisher
(for example, a chain that uses switchIfEmpty
), probing such a Publisher
to ensure it was used (that is, subscribed to).
6.1. Testing a Scenario with StepVerifier
The most common case for testing a Reactor sequence is to have a Flux
or a Mono
defined
in your code (for example, it might be returned by a method) and to want to test how it
behaves when subscribed to.
This situation translates well to defining a “test scenario,” where you define your expectations in terms of events, step-by-step. You can ask and answer questions such as the following:
-
What is the next expected event?
-
Do you expect the
Flux
to emit a particular value? -
Or maybe to do nothing for the next 300ms?
You can express all of that through the StepVerifier
API.
For instance, you could have the following utility method in your codebase that
decorates a Flux
:
public <T> Flux<T> appendBoomError(Flux<T> source) {
return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}
In order to test it, you want to verify the following scenario:
I expect this
Flux
to first emit thing1
, then emit thing2
, and then produce an error with the message, boom
. Subscribe and verify these expectations.
In the StepVerifier
API, this translates to the following test:
@Test
public void testAppendBoomError() {
Flux<String> source = Flux.just("thing1", "thing2"); (1)
StepVerifier.create( (2)
appendBoomError(source)) (3)
.expectNext("thing1") (4)
.expectNext("thing2")
.expectErrorMessage("boom") (5)
.verify(); (6)
}
1 | Since our method needs a source Flux , define a simple one for testing purposes. |
2 | Create a StepVerifier builder that wraps and verifies a Flux . |
3 | Pass the Flux to be tested (the result of calling our utility method). |
4 | The first signal we expect to happen upon subscription is an onNext , with a value
of thing1 . |
5 | The last signal we expect to happen is a termination of the sequence with an onError . The exception should have boom as a message. |
6 | It is important to trigger the test by calling verify() . |
The API is a builder. You start by creating a StepVerifier
and passing the
sequence to be tested. This offers a choice of methods that let you:
-
Express expectations about the next signals to occur. If any other signal is received (or the content of the signal does not match the expectation), the whole test fails with a meaningful
AssertionError
. For example, you might use expectNext(T…)
and expectNextCount(long)
. -
Consume the next signal. This is used when you want to skip part of the sequence or when you want to apply a custom
assertion
on the content of the signal (for example, to check that there is an onNext
event and assert that the emitted item is a list of size 5). For example, you might use consumeNextWith(Consumer<T>)
. -
Take miscellaneous actions such as pausing or running arbitrary code. For example, if you want to manipulate a test-specific state or context. To that effect, you might use
thenAwait(Duration)
and then(Runnable)
.
For terminal events, the corresponding expectation methods (expectComplete()
and expectError()
and all their variants) switch to an API where you cannot express
expectations anymore. In that last step, all you can do is perform some additional
configuration on the StepVerifier
and then trigger the verification, often
with verify()
or one of its variants.
What happens at this point is that the StepVerifier
subscribes to the tested Flux
or Mono
and plays the sequence, comparing each new signal with the next step in the
scenario. As long as these match, the test is considered a success. As soon as there is a
discrepancy, an AssertionError
is thrown.
Remember the verify() step, which triggers the verification. To
help, the API includes a few shortcut methods that combine the terminal expectations with
a call to verify() : verifyComplete() , verifyError() , verifyErrorMessage(String) ,
and others. |
Note that, if one of the lambda-based expectations throws an AssertionError
, it is
reported as is, failing the test. This is useful for custom assertions.
By default, the verify() method and derived shortcut methods (verifyThenAssertThat ,
verifyComplete() , and so on) have no timeout. They can block indefinitely. You can use StepVerifier.setDefaultTimeout(Duration) to globally set a timeout for these methods,
or specify one on a per-call basis with verify(Duration) .
|
6.1.1. Better Identifying Test Failures
StepVerifier
provides two options to better identify exactly which expectation step caused
a test to fail:
-
as(String)
: Used after most expect*
methods to give a description to the preceding expectation. If the expectation fails, its error message contains the description. Terminal expectations and verify
cannot be described that way. -
StepVerifierOptions.create().scenarioName(String)
: By using StepVerifierOptions
to create your StepVerifier
, you can use the scenarioName
method to give the whole scenario a name, which is also used in assertion error messages.
Note that, in both cases, the use of the description or name in messages is only guaranteed for StepVerifier
methods that produce their own AssertionError
(for example, throwing an exception
manually or through an assertion library in assertNext
does not add the description or name to
the error’s message).
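Both options can be combined, as in the following sketch. It assumes reactor-core and reactor-test are on the classpath; the class name DescribedExpectations, the scenario name, and the step descriptions are hypothetical. The second expectation is deliberately wrong so that the resulting AssertionError message can be inspected.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;

public class DescribedExpectations {

    // Runs a scenario whose second step is deliberately wrong and returns
    // the message of the resulting AssertionError (or "" if nothing failed).
    static String failureMessage() {
        try {
            StepVerifier.create(Flux.just("thing1", "thing2"),
                            StepVerifierOptions.create().scenarioName("two things"))
                    .expectNext("thing1").as("first item")
                    .expectNext("thing3").as("second item") // the source emits thing2
                    .expectComplete()
                    .verify();
            return "";
        }
        catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage());
    }
}
```

The returned message mentions both the scenario name and the description of the failing step, which makes it easier to locate the failing expectation in a long scenario.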
6.2. Manipulating Time
You can use StepVerifier
with time-based operators to avoid long run times for
corresponding tests. You can do so through the StepVerifier.withVirtualTime
builder.
It looks like the following example:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here
This virtual time feature plugs in a custom Scheduler
in Reactor’s Schedulers
factory. Since these timed operators usually use the default Schedulers.parallel()
scheduler, replacing it with a VirtualTimeScheduler
does the trick. However, an
important prerequisite is that the operator be instantiated after the virtual time
scheduler has been activated.
To increase the chances that this happens correctly, the StepVerifier
does not take
a simple Flux
as input. withVirtualTime
takes a Supplier
, which guides you into lazily
creating the instance of the tested flux after having done the scheduler set up.
Take extra care to ensure the Supplier<Publisher<T>> can be used in a lazy
fashion. Otherwise, virtual time is not guaranteed. Especially avoid instantiating the Flux earlier in the test code and having the Supplier return that variable. Instead,
always instantiate the Flux inside the lambda. |
There are two expectation methods that deal with time, and they are both valid with or without virtual time:
-
thenAwait(Duration)
: Pauses the evaluation of steps (allowing a few signals to occur or delays to run out). -
expectNoEvent(Duration)
: Also lets the sequence play out for a given duration but fails the test if any signal occurs during that time.
Both methods pause the thread for the given duration in classic mode and advance the virtual clock instead in virtual mode.
expectNoEvent also considers the subscription as an event. If you use it as a
first step, it usually fails because the subscription signal is detected. Use expectSubscription().expectNoEvent(duration) instead. |
In order to quickly evaluate the behavior of our Mono.delay
above, we can finish
writing our code as follows:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
.expectSubscription() (1)
.expectNoEvent(Duration.ofDays(1)) (2)
.expectNext(0L) (3)
.verifyComplete(); (4)
1 | See the preceding tip. |
2 | Expect nothing to happen for a full day. |
3 | Then expect a delay that emits 0 . |
4 | Then expect completion (and trigger the verification). |
We could have used thenAwait(Duration.ofDays(1))
above, but expectNoEvent
has the
benefit of guaranteeing that nothing happened earlier than it should have.
Note that verify()
returns a Duration
value. This is the real-time duration of the
entire test.
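The two time-related methods can be mixed in one scenario, as in the following sketch. It assumes reactor-core and reactor-test are on the classpath; the class name VirtualTimeInterval and the hourly interval are assumptions made for this example.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class VirtualTimeInterval {

    // Verifies two hourly ticks under virtual time and returns the
    // real-time duration that the verification took.
    static Duration verifyHourlyTicks() {
        // The Flux is created inside the lambda, after the virtual scheduler is installed.
        return StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(1)).take(2))
                .expectSubscription()
                .expectNoEvent(Duration.ofMinutes(59)) // nothing may happen before the first tick
                .thenAwait(Duration.ofMinutes(1))      // advance the virtual clock to the first hour
                .expectNext(0L)
                .thenAwait(Duration.ofHours(1))        // advance to the second tick
                .expectNext(1L)
                .verifyComplete();
    }

    public static void main(String[] args) {
        System.out.println("real time spent: " + verifyHourlyTicks());
    }
}
```

Although the scenario covers two virtual hours, the returned Duration reflects only the real time spent, which is normally a fraction of a second.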
Virtual time is not a silver bullet. All Schedulers are
replaced with the same VirtualTimeScheduler . In some cases, you can lock the
verification process because the virtual clock has not moved forward before an
expectation is expressed, resulting in the expectation waiting on data that can only be
produced by advancing time. In most cases, you need to advance the virtual clock for
sequences to emit. Virtual time also gets very limited with infinite sequences, which
might hog the thread on which both the sequence and its verification run. |
6.3. Performing Post-execution Assertions with StepVerifier
After having described the final expectation of your scenario, you can switch to a
complementary assertion API instead of triggering verify()
. To do so, use verifyThenAssertThat()
instead.
verifyThenAssertThat()
returns a StepVerifier.Assertions
object, which you can use to
assert a few elements of state once the whole scenario has played out successfully
(because it also calls verify()
). Typical (albeit advanced) usage is to capture
elements that have been dropped by some operator and assert them (see the section on Hooks).
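A minimal sketch of such post-execution assertions follows. It assumes reactor-core and reactor-test are on the classpath; the class name PostExecutionAssertions and the five-second bound are assumptions made for this example.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class PostExecutionAssertions {

    // Plays the scenario, then asserts on state captured during verification.
    // Any failed assertion surfaces as an AssertionError.
    static boolean verifyDoubling() {
        StepVerifier.create(Flux.just(1, 2, 3).map(i -> i * 2))
                .expectNext(2, 4, 6)
                .expectComplete()
                .verifyThenAssertThat()                 // triggers verify(), then exposes Assertions
                .hasNotDroppedElements()                // no onNext was dropped by an operator
                .hasNotDroppedErrors()                  // no onError was dropped either
                .tookLessThan(Duration.ofSeconds(5));   // the whole scenario ran quickly
        return true;
    }

    public static void main(String[] args) {
        System.out.println("assertions passed: " + verifyDoubling());
    }
}
```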
6.4. Testing the Context
For more information about the Context
, see Adding a Context to a Reactive Sequence.
StepVerifier
comes with a couple of expectations around the propagation of a Context
:
-
expectAccessibleContext
: Returns a ContextExpectations
object that you can use to set up expectations on the propagated Context
. Be sure to call then()
to return to the set of sequence expectations. -
expectNoAccessibleContext
: Sets up an expectation that NO Context
can be propagated up the chain of operators under test. This most likely occurs when the Publisher
under test is not a Reactor one or does not have any operator that can propagate the Context
(for example, a generator source).
Additionally, you can associate a test-specific initial Context
to a StepVerifier
by
using StepVerifierOptions
to create the verifier.
These features are demonstrated in the following snippet:
StepVerifier.create(Mono.just(1).map(i -> i + 10),
StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2"))) (1)
.expectAccessibleContext() (2)
.contains("thing1", "thing2") (3)
.then() (4)
.expectNext(11)
.verifyComplete(); (5)
1 | Create the StepVerifier by using StepVerifierOptions and pass in an initial Context |
2 | Start setting up expectations about Context propagation. This alone ensures that a Context was propagated. |
3 | An example of a Context -specific expectation. It must contain value "thing2" for key "thing1". |
4 | We then() switch back to setting up normal expectations on the data. |
5 | Let us not forget to verify() the whole set of expectations. |
6.5. Manually Emitting with TestPublisher
For more advanced test cases, it might be useful to have complete mastery over the source of data, to trigger finely chosen signals that closely match the particular situation you want to test.
Another situation is when you have implemented your own operator and you want to verify how it behaves with regards to the Reactive Streams specification, especially if its source is not well behaved.
For both cases, reactor-test
offers the TestPublisher
class. This is a Publisher<T>
that lets you programmatically trigger various signals:
-
next(T)
and next(T, T…)
trigger 1-n onNext
signals. -
emit(T…)
triggers 1-n onNext
signals and does complete()
. -
complete()
terminates with an onComplete
signal. -
error(Throwable)
terminates with an onError
signal.
You can get a well behaved TestPublisher
through the create
factory method. Also,
you can create a misbehaving TestPublisher
by using the createNonCompliant
factory
method. The latter takes a value or multiple values from the TestPublisher.Violation
enum. The values define which parts of the specification the publisher can overlook.
These enum values include:
-
REQUEST_OVERFLOW
: Allows next
calls to be made despite an insufficient request, without triggering an IllegalStateException
. -
ALLOW_NULL
: Allows next
calls to be made with a null
value without triggering a NullPointerException
. -
CLEANUP_ON_TERMINATE
: Allows termination signals to be sent several times in a row. This includes complete()
, error()
, and emit()
. -
DEFER_CANCELLATION
: Allows the TestPublisher
to ignore cancellation signals and continue emitting signals as if the cancellation lost the race against said signals.
Finally, the TestPublisher
keeps track of internal state after subscription, which can
be asserted through its various assert*
methods.
You can use it as a Flux
or Mono
by using the conversion methods, flux()
and mono()
.
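A well-behaved TestPublisher can drive an operator chain under test, as in the following sketch. It assumes reactor-core and reactor-test are on the classpath; the class name TestPublisherExample and the upper-casing chain are assumptions made for this example.

```java
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

public class TestPublisherExample {

    // Drives an upper-casing chain with manually emitted signals.
    static boolean verifyUpperCasing() {
        TestPublisher<String> publisher = TestPublisher.create();

        StepVerifier.create(publisher.flux().map(String::toUpperCase))
                .then(() -> publisher.emit("a", "b")) // two onNext signals, then onComplete
                .expectNext("A", "B")
                .verifyComplete();

        // The publisher tracked its own state: it never emitted beyond the request.
        publisher.assertNoRequestOverflow();
        return true;
    }

    public static void main(String[] args) {
        System.out.println("verified: " + verifyUpperCasing());
    }
}
```

The emission is placed in a then(Runnable) step so that it happens after the StepVerifier has subscribed to the chain.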
6.6. Checking the Execution Path with PublisherProbe
When building complex chains of operators, you could come across cases where there exist several possible execution paths, materialized by distinct sub-sequences.
Most of the time, these sub-sequences produce a specific-enough onNext
signal
that you can assert that it was executed by looking at the end result.
For instance, consider the following method, which builds a chain of operators from a
source and uses a switchIfEmpty
to fall back to a particular alternative if the source
is empty:
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
return source .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
.switchIfEmpty(fallback);
}
You can test which logical branch of the switchIfEmpty was used, as follows:
@Test
public void testSplitPathIsUsed() {
StepVerifier.create(processOrFallback(Mono.just("just a phrase with tabs!"),
Mono.just("EMPTY_PHRASE")))
.expectNext("just", "a", "phrase", "with", "tabs!")
.verifyComplete();
}
@Test
public void testEmptyPathIsUsed() {
StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
.expectNext("EMPTY_PHRASE")
.verifyComplete();
}
However, think about an example where the method produces a Mono<Void>
instead. It waits
for the source to complete, performs an additional task, and completes. If the source
is empty, a fallback Runnable
-like task must be performed instead. The following example
shows such a case:
private Mono<String> executeCommand(String command) {
return Mono.just(command + " DONE");
}
public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
return commandSource .flatMap(command -> executeCommand(command).then()) (1)
.switchIfEmpty(doWhenEmpty); (2)
}
1 | then() forgets about the command result. It cares only that it was completed. |
2 | How to distinguish between two cases that are both empty sequences? |
To verify that your processOrFallback
method does indeed go through the doWhenEmpty
path,
you need to write a bit of boilerplate. Namely you need a Mono<Void>
that:
-
Captures the fact that it has been subscribed to.
-
Lets you assert that fact after the whole process has terminated.
Before version 3.1, you would need to manually maintain one AtomicBoolean
per state you
wanted to assert and attach a corresponding doOn*
callback to the publisher you wanted
to evaluate. This could be a lot of boilerplate when having to apply this pattern
regularly. Fortunately, 3.1.0 introduced an alternative with PublisherProbe
. The
following example shows how to use it:
@Test
public void testCommandEmptyPathIsUsed() {
PublisherProbe<Void> probe = PublisherProbe.empty(); (1)
StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) (2)
.verifyComplete();
probe.assertWasSubscribed(); (3)
probe.assertWasRequested(); (4)
probe.assertWasNotCancelled(); (5)
}
1 | Create a probe that translates to an empty sequence. |
2 | Use the probe in place of Mono<Void> by calling probe.mono() . |
3 | After completion of the sequence, the probe lets you assert that it was used. You can check that it was subscribed to… |
4 | …as well as actually requested data… |
5 | …and whether or not it was cancelled. |
You can also use the probe in place of a Flux<T>
by calling .flux()
instead of .mono()
. For cases where you need to probe an execution path but also need the
probe to emit data, you can wrap any Publisher<T>
by using PublisherProbe.of(Publisher)
.
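Wrapping a data-emitting publisher in a probe can be sketched as follows. This assumes reactor-core and reactor-test are on the classpath; the class name ProbeWithData and the fallbackWasUsed helper are hypothetical, and processOrFallback mirrors the String-based method shown earlier in this section.

```java
import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.publisher.PublisherProbe;

public class ProbeWithData {

    // Same shape as the String-based processOrFallback shown earlier.
    static Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
        return source.flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
                .switchIfEmpty(fallback);
    }

    // Wraps a fallback that emits data in a probe and reports whether the
    // chain actually subscribed to it for the given source.
    static boolean fallbackWasUsed(Mono<String> source) {
        PublisherProbe<String> probe = PublisherProbe.of(Mono.just("EMPTY_PHRASE"));
        processOrFallback(source, probe.flux()).blockLast();
        return probe.wasSubscribed();
    }

    public static void main(String[] args) {
        System.out.println("empty source: " + fallbackWasUsed(Mono.empty()));
        System.out.println("non-empty source: " + fallbackWasUsed(Mono.just("a b")));
    }
}
```

In a test, the assertWasSubscribed() and assertWasNotSubscribed() methods would typically replace the boolean check used here.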
7. Debugging Reactor
Switching from an imperative and synchronous programming paradigm to a reactive and asynchronous one can sometimes be daunting. One of the steepest steps in the learning curve is how to analyze and debug when something goes wrong.
In the imperative world, debugging is usually pretty straightforward. You can read the stacktrace and see where the problem originated. Was it entirely a failure of your code? Did the failure occur in some library code? If so, what part of your code called the library, potentially passing in improper parameters that ultimately caused the failure?
7.1. The Typical Reactor Stack Trace
When you shift to asynchronous code, things can get much more complicated.
Consider the following stack trace:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)
There is a lot going on here. We get an IndexOutOfBoundsException
, which tells us that
a source emitted more than one item
.
We can probably quickly come to assume that this source is a Flux or a Mono, as confirmed by
the next line, which mentions MonoSingle
. So it appears to be some sort of complaint
from a single
operator.
Referring to the Javadoc for the Mono#single
operator, we see that single
has a contract:
The source must emit exactly one element. It appears we had a source that emitted more
than one and thus violated that contract.
Can we dig deeper and identify that source? The following rows are not very helpful. They
take us through the internals of what seems to be a reactive chain, through
multiple calls to subscribe
and request
.
By skimming over these rows, we can at least start to form a picture of the kind of chain
that went wrong: It seems to involve a MonoSingle
, a FluxFlatMap
, and a FluxRange
(each gets several rows in the trace, but overall these three classes are involved). So a range().flatMap().single()
chain maybe?
But what if we use that pattern a lot in our application? This still does not tell us
much, and simply searching for single
is not going to find the problem. Then the last
line refers to some of our code. Finally, we are getting close.
Hold on, though. When we go to the source file, all we see is that a
pre-existing Flux
is subscribed to, as follows:
toDebug .subscribeOn(Schedulers.immediate())
.subscribe(System.out::println, Throwable::printStackTrace);
All of this happened at subscription time, but the Flux itself was not declared there. Worse, when we go to where the variable is declared, we see the following:
public Mono<String> toDebug; //please overlook the public class attribute
The variable is not instantiated where it is declared. We must assume a worst-case scenario where we find out that there could be a few different code paths that set it in the application. We remain unsure of which one caused the issue.
This is kind of the Reactor equivalent of a runtime error, as opposed to a compilation error. |
What we want to find out more easily is where the operator was added into the chain - that is, where the Flux was declared. We usually refer to that as the “assembly” of the Flux.
7.2. Activating Debug Mode - aka tracebacks
This section describes the easiest but also the slowest way to enable the debugging capabilities, due to the fact that it captures the stacktrace on every operator.
See The checkpoint() Alternative for a more fine grained way of debugging, and Production-ready Global Debugging for a more advanced and performant global option. |
Even though the stacktrace was still able to convey some information for someone with a bit of experience, we can see that it is not ideal by itself in more advanced cases.
Fortunately, Reactor comes with assembly-time instrumentation that is designed for debugging.
This is done by activating a global debug mode via the Hooks.onOperatorDebug() method at application start (or at least before the incriminated Flux or Mono can be instantiated), as follows:
Hooks.onOperatorDebug();
This starts instrumenting the calls to Reactor operator methods (where they are assembled into the chain) by wrapping the construction of the operator and capturing a stack trace there. Since this is done when the operator chain is declared, the hook should be activated before that, so the safest way is to activate it right at the start of your application.
Later on, if an exception occurs, the failing operator is able to refer to that capture and to rework the stack trace, appending additional information.
We call this captured assembly information (and additional information added to the exceptions by Reactor in general) a traceback. |
In the next section, we see how the stack trace differs and how to interpret that new information.
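As a minimal sketch of the activation described above, the following snippet enables the hook before a chain is assembled and resets it afterwards. The class name, the helper method, and the range().single() chain are assumptions made for illustration only; they are not taken from the guide's test suite.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;

class DebugModeSketch {

    // Hypothetical helper: assembles a chain that violates the single() contract
    // and returns the error it produces once subscribed.
    static Throwable captureError() {
        // Enable assembly tracking BEFORE the chain is declared.
        Hooks.onOperatorDebug();
        try {
            Mono<Integer> toDebug = Flux.range(0, 5).single();
            toDebug.block();
            return null; // not expected: single() rejects a 5-element source
        } catch (RuntimeException e) {
            return e;
        } finally {
            // Switch the costly instrumentation off again.
            Hooks.resetOnOperatorDebug();
        }
    }

    public static void main(String[] args) {
        // With the hook active, the printed trace carries a suppressed traceback.
        Throwable error = captureError();
        if (error != null) {
            error.printStackTrace();
        }
    }
}
```

Resetting the hook in a finally block is only a convenience for short experiments like this one. In an application, the hook would typically be activated once at startup.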
7.3. Reading a Stack Trace in Debug Mode
When we reuse our initial example but activate the operatorStacktrace debug feature, several things happen:
-
The stack trace, which points to the subscription site and is thus less interesting, is cut after the first frame and set aside.
-
A special suppressed exception is added to the original exception (or amended if already there).
-
A message is constructed for that special exception with several sections.
-
The first section traces back to the assembly site of the operator that fails.
-
The second section attempts to display the chain(s) that are built from this operator and have seen the error propagate.
-
The last section is the original stack trace.
The full stack trace, once printed, is as follows:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127) (1)
Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: (2)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (3)
reactor.core.publisher.Flux.single(Flux.java:7915)
reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
Error has been observed at the following site(s): (4)
*_______Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017) (5)
|_ Mono.subscribeOn ⇢ at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1071) (6)
Original Stack Trace: (7)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
...
(8)
...
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4363)
at reactor.core.publisher.Mono.subscribe(Mono.java:4223)
at reactor.core.publisher.Mono.subscribe(Mono.java:4159)
at reactor.core.publisher.Mono.subscribe(Mono.java:4131)
at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1067)
1 | The original stack trace is truncated to a single frame. |
2 | This is new: We see the wrapper operator that captures the stack. This is where the traceback starts to appear. |
3 | First, we get some details about where the operator was assembled. |
4 | Second, we get a notion of operator chain(s) through which the error propagated, from first to last (error site to subscribe site). |
5 | Each operator that saw the error is mentioned along with the user class and line where it was used. Here we have a "root". |
6 | Here we have a simple part of the chain. |
7 | The rest of the stack trace is moved at the end… |
8 | …showing a bit of the operator’s internals (so we removed a bit of the snippet here). |
The captured stack trace is appended to the original error as a suppressed OnAssemblyException. There are three parts to it, but the first section is the most interesting. It shows the path of construction for the operator that triggered the exception. Here, it shows that the single that caused our issue was actually created in the scatterAndGather method.
Now that we are armed with enough information to find the culprit, we can have a meaningful look at that scatterAndGather method:
private Mono<String> scatterAndGather(Flux<String> urls) {
return urls.flatMap(url -> doRequest(url))
.single(); (1)
}
1 | Sure enough, here is our single. |
Now we can see that the root cause of the error was a flatMap that performs several HTTP calls to a few URLs but that is chained with single, which is too restrictive. After a short git blame and a quick discussion with the author of that line, we find out he meant to use the less restrictive take(1) instead.
We have solved our problem.
Now consider the following section in the stack trace:
Error has been observed at the following site(s):
That second part of the traceback was not necessarily interesting in this particular example, because the error was actually happening in the last operator in the chain (the one closest to subscribe). Considering another example might make it more clear:
FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
.transform(FakeUtils1.applyFilters)
.transform(FakeUtils2.enrichUser)
.blockLast();
Now imagine that, inside findAllUserByName, there is a map that fails. Here, we would see the following in the second part of the traceback:
Error has been observed at the following site(s):
*________Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
|_ Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
|_ Flux.filter ⇢ at reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
|_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:39)
|_ Flux.elapsed ⇢ at reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
|_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)
This corresponds to the section of the chain(s) of operators that gets notified of the error:
-
The exception originates in the first map. This one is identified as a root by the * connector and the fact that _ characters are used for indentation.
-
The exception is seen by a second map (both in fact correspond to the findAllUserByName method).
-
It is then seen by a filter and a transform, which indicate that part of the chain is constructed by a reusable transformation function (here, the applyFilters utility method).
-
Finally, it is seen by an elapsed and a transform. Once again, elapsed is applied by the transformation function of that second transform.
In some cases where the same exception is propagated through multiple chains, the "root" marker *_ allows us to better separate such chains.
If a site is seen several times, there will be an (observed x times) after the call site information.
For instance, let us consider the following snippet:
public class MyClass {
public void myMethod() {
Flux<String> source = Flux.error(sharedError);
Flux<String> chain1 = source.map(String::toLowerCase).filter(s -> s.length() < 4);
Flux<String> chain2 = source.filter(s -> s.length() > 5).distinct();
Mono<Void> when = Mono.when(chain1, chain2);
}
}
In the code above, the error propagates to the when, going through two separate chains, chain1 and chain2.
It would lead to a traceback containing the following:
Error has been observed at the following site(s):
*_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
|_ Flux.map ⇢ at myClass.myMethod(MyClass.java:4)
|_ Flux.filter ⇢ at myClass.myMethod(MyClass.java:4)
*_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
|_ Flux.filter ⇢ at myClass.myMethod(MyClass.java:5)
|_ Flux.distinct ⇢ at myClass.myMethod(MyClass.java:5)
*______Mono.when ⇢ at myClass.myMethod(MyClass.java:7)
We see that:
-
there are 3 "root" elements (the when is the true root).
-
two chains starting from Flux.error are visible.
-
both chains seem to be based on the same Flux.error source (observed 2 times).
-
the first chain is Flux.error().map().filter()
-
the second chain is Flux.error().filter().distinct()
A note on tracebacks and suppressed exceptions:
As tracebacks are appended to original errors as suppressed exceptions, this can somewhat interfere with another type of exception that uses this mechanism: composite exceptions.
Such exceptions can be created directly via Exceptions.multiple(Throwable…), or by some operators that might join multiple erroring sources (like Flux#flatMapDelayError). They can be unwrapped into a List via Exceptions.unwrapMultiple(Throwable), in which case the traceback would be considered a component of the composite and be part of the returned List.
If that is somehow not desirable, tracebacks can be identified thanks to the Exceptions.isTraceback(Throwable) check, and excluded from such an unwrap by using Exceptions.unwrapMultipleExcludingTracebacks(Throwable) instead.
|
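To illustrate the note above, here is a minimal sketch that separates tracebacks from other suppressed exceptions by using Exceptions.isTraceback(Throwable). The class and helper names are hypothetical, and the two-element Flux chained with single() is only a stand-in for a failing chain.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

class TracebackFilterSketch {

    // Hypothetical helper: keeps only the suppressed exceptions that are Reactor tracebacks.
    static List<Throwable> tracebacksOf(Throwable error) {
        List<Throwable> result = new ArrayList<>();
        for (Throwable suppressed : error.getSuppressed()) {
            if (Exceptions.isTraceback(suppressed)) {
                result.add(suppressed);
            }
        }
        return result;
    }

    // Hypothetical helper: produces an error that went through debug-mode instrumentation.
    static Throwable failWithTraceback() {
        Hooks.onOperatorDebug();
        try {
            Flux.just(1, 2).single().block();
            return null; // not expected: the source emits two items
        } catch (RuntimeException e) {
            return e;
        } finally {
            Hooks.resetOnOperatorDebug();
        }
    }

    public static void main(String[] args) {
        Throwable error = failWithTraceback();
        System.out.println("tracebacks found: " + tracebacksOf(error).size());
    }
}
```

An exception that never went through instrumented operators has no traceback, so the helper returns an empty list for it.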
We deal with a form of instrumentation here, and creating a stack trace is costly. That is why this debugging feature should only be activated in a controlled manner, as a last resort.
7.3.1. The checkpoint() Alternative
The debug mode is global and affects every single operator assembled into a Flux or a Mono inside the application. This has the benefit of allowing after-the-fact debugging: Whatever the error, we can obtain additional information to debug it.
As we saw earlier, this global knowledge comes at the cost of an impact on performance (due to the number of populated stack traces). That cost can be reduced if we have an idea of likely problematic operators. However, we usually do not know which operators are likely to be problematic unless we observed an error in the wild, saw we were missing assembly information, and then modified the code to activate assembly tracking, hoping to observe the same error again.
In that scenario, we have to switch into debugging mode and make preparations in order to better observe a second occurrence of the error, this time capturing all the additional information.
If you can identify reactive chains that you assemble in your application for which serviceability is critical, you can achieve a mix of both techniques with the checkpoint() operator.
You can chain this operator into a method chain. The checkpoint operator works like the hook version but only for its link of that particular chain.
There is also a checkpoint(String) variant that lets you add a unique String identifier to the assembly traceback. This way, the stack trace is omitted and you rely on the description to identify the assembly site. checkpoint(String) imposes less processing cost than a regular checkpoint.
Last but not least, if you want to add a more generic description to the checkpoint but still rely on the stack trace mechanism to identify the assembly site, you can force that behavior by using the checkpoint("description", true) version. We are now back to the initial message for the traceback, augmented with a description, as shown in the following example:
Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed at the following site(s):
|_ ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
1 | descriptionCorrelation1234 is the description provided in the checkpoint. |
The description could be a static identifier or user-readable description or a wider correlation ID (for instance, coming from a header in the case of an HTTP request).
When global debugging is enabled in conjunction with checkpoints, the global debugging traceback style is applied and checkpoints are only reflected in the "Error has been observed…" section. As a result, the name of heavy checkpoints is not visible in this case. |
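The light checkpoint(String) variant can be sketched as follows. The chain, the class name, and the "divide-step" description are assumptions chosen for illustration. The sketch only relies on the description being part of the traceback message, not on the exact layout of that message.

```java
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

class CheckpointSketch {

    // Hypothetical chain: the division fails on the second element.
    static Flux<Integer> riskyChain() {
        return Flux.just(1, 0)
                .map(i -> 10 / i)
                // Light checkpoint: no stack capture, only the description is recorded.
                .checkpoint("divide-step");
    }

    // Returns the message of the first traceback attached to the error, or null if none is found.
    static String tracebackMessage() {
        try {
            riskyChain().blockLast();
        } catch (RuntimeException e) {
            for (Throwable suppressed : e.getSuppressed()) {
                if (Exceptions.isTraceback(suppressed)) {
                    return suppressed.getMessage();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(tracebackMessage());
    }
}
```

Because no global hook is involved, only the operators upstream of the checkpoint in this one chain are covered.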
7.4. Production-ready Global Debugging
Project Reactor comes with a separate Java Agent that instruments your code and adds debugging info without paying the cost of capturing the stacktrace on every operator call. The behaviour is very similar to Activating Debug Mode - aka tracebacks, but without the runtime performance overhead.
To use it in your app, you must add it as a dependency.
The following example shows how to add reactor-tools as a dependency in Maven:
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
(1)
</dependency>
</dependencies>
1 | If you use the BOM, you do not need to specify a <version>. |
The following example shows how to add reactor-tools as a dependency in Gradle:
dependencies {
compile 'io.projectreactor:reactor-tools'
}
It also needs to be explicitly initialized with:
ReactorDebugAgent.init();
Since the implementation will instrument your classes when they are loaded, the best place to put it is before everything else in your main(String[]) method: |
public static void main(String[] args) {
ReactorDebugAgent.init();
SpringApplication.run(Application.class, args);
}
You may also re-process existing classes with processExistingClasses() if you cannot run the init eagerly. For example, in JUnit5 tests from a TestExecutionListener or even in the class static initializer block:
ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
Be aware that the re-processing takes a couple of seconds due to the need to iterate over all loaded classes and apply the transformation. Use it only if you see that some call-sites are not instrumented. |
7.4.1. Limitations
ReactorDebugAgent is implemented as a Java Agent and uses ByteBuddy to perform the self-attach.
Self-attach may not work on some JVMs. Please refer to ByteBuddy’s documentation for more details.
7.4.2. Running ReactorDebugAgent as a Java Agent
If your environment does not support ByteBuddy’s self-attachment, you can run reactor-tools as a Java Agent:
java -javaagent:reactor-tools.jar -jar app.jar
7.4.3. Running ReactorDebugAgent at build time
It is also possible to run reactor-tools at build time. To do so, you need to apply it as a plugin for ByteBuddy’s build instrumentation.
The transformation will only be applied to your project’s classes. The classpath libraries will not be instrumented. |
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
(1)
<classifier>original</classifier> (2)
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-maven-plugin</artifactId>
<configuration>
<transformations>
<transformation>
<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
</transformation>
</transformations>
</configuration>
</plugin>
</plugins>
</build>
1 | If you use the BOM, you do not need to specify a <version>. |
2 | classifier here is important. |
plugins {
id 'net.bytebuddy.byte-buddy-gradle-plugin' version '1.10.9'
}
configurations {
byteBuddyPlugin
}
dependencies {
byteBuddyPlugin(
group: 'io.projectreactor',
name: 'reactor-tools',
(1)
classifier: 'original', (2)
)
}
byteBuddy {
transformation {
plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin"
classPath = configurations.byteBuddyPlugin
}
}
1 | If you use the BOM, you do not need to specify a version. |
2 | classifier here is important. |
7.5. Logging a Sequence
In addition to stack trace debugging and analysis, another powerful tool to have in your toolkit is the ability to trace and log events in an asynchronous sequence.
The log() operator can do just that. Chained inside a sequence, it peeks at every event of the Flux or Mono upstream of it (including onNext, onError, and onComplete as well as subscriptions, cancellations, and requests).
For instance, suppose we have Logback activated and configured and a chain like range(1,10).take(3). By placing a log() before the take, we can get some insight into how it works and what kind of events it propagates upstream to the range, as the following example shows:
Flux<Integer> flux = Flux.range(1, 10)
.log()
.take(3);
flux.subscribe();
This prints out the following (through the logger’s console appender):
10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(3) (2)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | cancel() (4)
Here, in addition to the logger’s own formatter (time, thread, level, message), the log() operator outputs a few things in its own format:
1 | reactor.Flux.Range.1 is an automatic category for the log, in case you use the operator several times in a chain. It lets you distinguish which operator’s events are logged (in this case, the range). You can overwrite the identifier with your own custom category by using the log(String) method signature. After a few separating characters, the actual event gets printed. Here, we get an onSubscribe call, a request call, three onNext calls, and a cancel call. For the first line, onSubscribe, we get the implementation of the Subscriber, which usually corresponds to the operator-specific implementation. Between square brackets, we get additional information, including whether the operator can be automatically optimized through synchronous or asynchronous fusion. |
2 | On the second line, we can see that take limited the request to upstream to 3. |
3 | Then the range sends three values in a row. |
4 | On the last line, we see cancel(). |
The second (2) and last lines (4) are the most interesting. We can see the take in action there.
It leverages backpressure in order to ask the source for exactly the expected amount of elements.
After having received enough elements, it tells the source no more items will be needed by calling cancel().
Note that if downstream had itself used backpressure, eg. by requesting only 1 element, the take operator would have honored that (it caps the request when propagating it from downstream to upstream).
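The log(String) signature mentioned in the callouts can be sketched as follows. The class name and the "guide.range" category are hypothetical. The log lines themselves depend on the logging backend in use, so the sketch only returns the values that reach the subscriber.

```java
import java.util.List;

import reactor.core.publisher.Flux;

class LogSketch {

    // Logs the events of the range under a custom category, then keeps the first three values.
    static List<Integer> firstThree() {
        return Flux.range(1, 10)
                .log("guide.range") // hypothetical category replacing the automatic one
                .take(3)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstThree());
    }
}
```

Placing log(String) before the take shows the events exchanged with the range. Placing it after the take would show what the downstream subscriber sees instead.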
8. Exposing Reactor metrics
Project Reactor is a library designed for performance and better utilization of resources. But to truly understand the performance of a system, it is best to be able to monitor its various components.
This is why Reactor provides a built-in integration with Micrometer via the reactor-core-micrometer module.
Introduced in the 2022.0 BOM release, the module provides an explicit dependency to Micrometer, which allows it to offer fine-tuned APIs for metrics and observations.
Up to Reactor-Core 3.5.0, metrics were implemented as operators that would be no-op if Micrometer wasn’t on the classpath. |
The reactor-core-micrometer APIs require the user to provide a form of registry explicitly instead of relying on a hardcoded global registry.
When applying instrumentation to classes that have a NATIVE notion of naming or tags, these APIs will attempt to discover such elements in the reactive chain.
Otherwise, the API will expect that a prefix for naming meters is provided alongside the registry.
8.1. Scheduler metrics
Every async operation in Reactor is done via the Scheduler abstraction described in Threading and Schedulers. This is why it is important to monitor your schedulers, watch out for key metrics that start to look suspicious and react accordingly.
The reactor-core-micrometer module offers a "timed" Scheduler wrapper that performs measurements around tasks submitted through it, which can be used as follows:
Scheduler originalScheduler = Schedulers.newParallel("test", 4);
Scheduler schedulerWithMetrics = Micrometer.timedScheduler(
originalScheduler, (1)
applicationDefinedMeterRegistry, (2)
"testingMetrics", (3)
Tags.of(Tag.of("additionalTag", "yes")) (4)
);
1 | the Scheduler to wrap |
2 | the MeterRegistry in which to publish metrics |
3 | the prefix to use in naming meters. This would for example lead to a testingMetrics.scheduler.tasks.completed meter being created. |
4 | optional tags to add to all the meters created for that wrapping Scheduler |
When wrapping a common Scheduler (eg. Schedulers.single()) or a Scheduler that is used in multiple places, only the Runnable tasks that are submitted through the wrapper instance returned by Micrometer#timedScheduler are going to be instrumented. |
See Micrometer.timedScheduler() for produced meters and associated default tags.
8.2. Publisher metrics
Sometimes it is useful to be able to record metrics at some stage in your reactive pipeline.
One way to do it would be to manually push the values to your metrics backend of choice from a custom SignalListener provided to the tap operator.
An out-of-the-box implementation is actually provided by the reactor-core-micrometer module, via Micrometer#metrics APIs.
Consider the following pipeline:
listenToEvents()
.doOnNext(event -> log.info("Received {}", event))
.delayUntil(this::processEvent)
.retry()
.subscribe();
To enable the metrics for this source Flux (returned from listenToEvents()), we need to turn on the metrics collection:
listenToEvents()
.name("events") (1)
.tap(Micrometer.metrics( (2)
applicationDefinedMeterRegistry (3)
))
.doOnNext(event -> log.info("Received {}", event))
.delayUntil(this::processEvent)
.retry()
.subscribe();
1 | Every metric at this stage of the reactive pipeline will use "events" as a naming prefix (optional, defaults to reactor prefix). |
2 | We use the tap operator combined with a SignalListener implementation provided in reactor-core-micrometer for metrics collection. |
3 | As with other APIs in that module, the MeterRegistry into which to publish metrics needs to be explicitly provided. |
The detail of the exposed metrics is available in Micrometer.metrics().
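As a rough sketch of how the collected values could be read back, the snippet below publishes into an in-memory SimpleMeterRegistry and looks up the subscription counter. The class name and the Flux.just source (standing in for listenToEvents()) are assumptions. The events.subscribed meter name is derived from the %s.subscribed entry in the meters list of this guide, so treat it as an assumption to verify against your version.

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;

class PublisherMetricsSketch {

    // Runs a small named pipeline once and returns the recorded subscription count,
    // or -1 if the expected counter was not registered.
    static double subscriptionsRecorded() {
        MeterRegistry registry = new SimpleMeterRegistry(); // in-memory registry for the sketch
        Flux.just("a", "b", "c")                            // stand-in for listenToEvents()
                .name("events")
                .tap(Micrometer.metrics(registry))
                .blockLast();
        Counter subscribed = registry.find("events.subscribed").counter();
        return subscribed == null ? -1 : subscribed.count();
    }

    public static void main(String[] args) {
        System.out.println("subscriptions: " + subscriptionsRecorded());
    }
}
```

In an application, the registry would normally be the one already configured for your monitoring backend rather than a SimpleMeterRegistry.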
8.2.1. Tags
In addition to the common tags described in Micrometer.metrics(), users can add custom tags to their reactive chains via the tag operator:
listenToEvents()
.name("events") (1)
.tag("source", "kafka") (2)
.tap(Micrometer.metrics(applicationDefinedRegistry)) (3)
.doOnNext(event -> log.info("Received {}", event))
.delayUntil(this::processEvent)
.retry()
.subscribe();
1 | Every metric at this stage will be identified with the "events" prefix. |
2 | Set a custom tag "source" to value "kafka". |
3 | All reported metrics will have the source=kafka tag assigned in addition to the common tags. |
Please note that depending on the monitoring system you’re using, using a name can be considered mandatory when using tags, since it would otherwise result in a different set of tags between two default-named sequences. Some systems like Prometheus might also require the exact same set of tags for each metric with the same name.
8.2.2. Observation
In addition to full metrics, the reactor-core-micrometer module offers an alternative based on Micrometer’s Observation.
Depending on the configuration and runtime classpath, an Observation could translate to timers, spans, logging statements or any combination.
A reactive chain can be observed via the tap operator and the Micrometer.observation utility, as follows:
listenToEvents()
.name("events") (1)
.tap(Micrometer.observation( (2)
applicationDefinedRegistry)) (3)
.doOnNext(event -> log.info("Received {}", event))
.delayUntil(this::processEvent)
.retry()
.subscribe();
1 | The Observation for this pipeline will be identified with the "events" prefix. |
2 | We use the tap operator with the observation utility. |
3 | A registry must be provided into which to publish the observation results. Note this is an ObservationRegistry. |
The detail of the observation and its tags is provided in Micrometer.observation().
8.3. Meters and tags for Reactor-Core-Micrometer module
8.3.1. Micrometer.metrics()
Below is the list of meters used by the metrics tap listener feature, as exposed via Micrometer.metrics(MeterRegistry meterRegistry).
Please note that metrics below use a dynamic %s prefix.
When applied on a Flux or Mono that uses the name(String n) operator, this is replaced with n.
Otherwise, this is replaced by the default value of "reactor".
|
Flow Duration
Times the duration elapsed between a subscription and the termination or cancellation of the sequence. A TerminationTags#STATUS tag is added to specify what event caused the timer to end ("completed", "completedEmpty", "error" or "cancelled").
Metric name %s.flow.duration - since it contains %s, the name is dynamic and will be resolved at runtime. Type distribution summary.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Name |
Description |
|
Tag used by FLOW_DURATION when STATUS is |
|
The termination status:
|
|
The type of the sequence ( |
Malformed Source Events
Counts the number of events received from a malformed source (ie an onNext after an onComplete).
Metric name %s.malformed.source - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Name |
Description |
|
The type of the sequence ( |
On Next Delay
Measures the delay between each onNext (or between the first onNext and the onSubscribe event).
Metric name %s.onNext.delay - since it contains %s, the name is dynamic and will be resolved at runtime. Type timer and base unit nanoseconds.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
|
Name |
Description |
|
The type of the sequence ( |
Requested Amount
Counts the amount requested to a named sequence (eg. Flux.name(String)) by all subscribers, until at least one requests an unbounded amount.
Metric name %s.requested - since it contains %s, the name is dynamic and will be resolved at runtime. Type distribution summary.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Name |
Description |
|
The type of the sequence ( |
Subscribed
Counts the number of subscriptions to a sequence.
Metric name %s.subscribed - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Name |
Description |
|
The type of the sequence ( |
8.3.2. Micrometer.timedScheduler()
Below is the list of meters used by the TimedScheduler feature, as exposed via Micrometer.timedScheduler(Scheduler original, MeterRegistry meterRegistry, String metricsPrefix).
Please note that metrics below use a dynamic %s prefix. This is replaced with the provided metricsPrefix in practice. |
Tasks Active
LongTaskTimer reflecting tasks currently running. Note that this reflects all types of active tasks, including tasks scheduled with a delay or periodically (each iteration being considered an active task).
Metric name %s.scheduler.tasks.active - since it contains %s, the name is dynamic and will be resolved at runtime. Type long task timer.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
|
Tasks Completed
Timer reflecting tasks that have finished execution. Note that this reflects all types of active tasks, including tasks with a delay or periodically (each iteration being considered a separate completed task).
Metric name %s.scheduler.tasks.completed - since it contains %s, the name is dynamic and will be resolved at runtime. Type timer.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
|
Tasks Pending
LongTaskTimer reflecting tasks that were submitted for immediate execution but couldn’t be started immediately because the scheduler is already at max capacity. Note that only immediate submissions via Scheduler#schedule(Runnable) and Scheduler.Worker#schedule(Runnable) are considered.
Metric name %s.scheduler.tasks.pending - since it contains %s, the name is dynamic and will be resolved at runtime. Type long task timer.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
|
Tasks Submitted
Counter that increments by one each time a task is submitted (via any of the schedule methods on both Scheduler and Scheduler.Worker).
Note that there are actually 4 counters, which can be differentiated by the SubmittedTags#SUBMISSION tag. The sum of all these can thus be compared with the TASKS_COMPLETED counter.
Metric name %s.scheduler.tasks.submitted - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Name |
Description |
|
The type of submission:
|
8.3.3. Micrometer.observation()
Below is the list of meters used by the observation tap listener feature, as exposed via Micrometer.observation(ObservationRegistry registry).
This is the ANONYMOUS observation, but you can create a similar Observation with a custom name by using the name(String) operator.
Anonymous
Anonymous version of the Micrometer.observation(), when the sequence hasn’t been explicitly named via e.g. Flux#name(String) operator.
Metric name: reactor.observation. Type: timer.
Metric name: reactor.observation.active. Type: long task timer.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (for example, Prometheus uses seconds).
Name |
Description |
|
The status of the sequence, which indicates how it terminated ( |
|
The type of the sequence, i.e. |
9. Advanced Features and Concepts
This chapter covers advanced features and concepts of Reactor, including the following:
9.1. Mutualizing Operator Usage
From a clean-code perspective, code reuse is generally a good thing. Reactor offers a few patterns that can help you reuse and mutualize code, notably for operators or combinations of operators that you might want to apply regularly in your codebase. If you think of a chain of operators as a recipe, you can create a “cookbook” of operator recipes.
9.1.1. Using the transform Operator
The transform operator lets you encapsulate a piece of an operator chain into a function. That function is applied to an original operator chain at assembly time to augment it with the encapsulated operators. Doing so applies the same operations to all the subscribers of a sequence and is basically equivalent to chaining the operators directly. The following code shows an example:
Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
.map(String::toUpperCase);
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.doOnNext(System.out::println)
.transform(filterAndMap)
.subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));
The following image shows how the transform operator encapsulates flows:

The preceding example produces the following output:
blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE
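To illustrate the “cookbook” idea, the following minimal sketch applies one recipe to two unrelated sequences through transform. The class name TransformReuseExample, the constant FILTER_AND_MAP, and the helper apply are hypothetical names chosen for this illustration; only transform, filter, map, collectList, and block come from Reactor.

```java
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;

public class TransformReuseExample {

    // Hypothetical reusable recipe: drop "orange", then upper-case what is left.
    static final Function<Flux<String>, Flux<String>> FILTER_AND_MAP =
            f -> f.filter(color -> !color.equals("orange"))
                  .map(String::toUpperCase);

    // Applies the same recipe to any source of colors and collects the result.
    static List<String> apply(String... colors) {
        return Flux.just(colors)
                   .transform(FILTER_AND_MAP)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(apply("blue", "orange"));
        System.out.println(apply("orange", "purple", "green"));
    }
}
```

Because the function is a plain value, it can live in a shared utility class and be applied wherever the same combination of operators is needed.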
9.1.2. Using the transformDeferred Operator
The transformDeferred operator is similar to transform and also lets you encapsulate operators in a function. The major difference is that this function is applied to the original sequence on a per-subscriber basis. It means that the function can actually produce a different operator chain for each subscription (by maintaining some state). The following code shows an example:
AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
.map(String::toUpperCase);
}
return f.filter(color -> !color.equals("purple"))
.map(String::toUpperCase);
};
Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.doOnNext(System.out::println)
.transformDeferred(filterAndMap);
composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));
The following image shows how the transformDeferred operator works with per-subscriber transformations:

The preceding example produces the following output:
blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple
9.2. Hot Versus Cold
So far, we have considered that all Flux (and Mono) are the same: They all represent an asynchronous sequence of data, and nothing happens before you subscribe.
Really, though, there are two broad families of publishers: hot and cold.
The earlier description applies to the cold family of publishers. They generate data anew for each subscription. If no subscription is created, data never gets generated.
Think of an HTTP request: Each new subscriber triggers an HTTP call, but no call is made if no one is interested in the result.
Hot publishers, on the other hand, do not depend on any number of subscribers. They might start publishing data right away and would continue doing so whenever a new Subscriber comes in (in which case, the subscriber would see only new elements emitted after it subscribed). For hot publishers, something does indeed happen before you subscribe.
One example of the few hot operators in Reactor is just: It directly captures the value at assembly time and replays it to anybody subscribing to it later. To re-use the HTTP call analogy, if the captured data is the result of an HTTP call, then only one network call is made, when instantiating just.
To transform just into a cold publisher, you can use defer. It defers the HTTP request in our example to subscription time (and would result in a separate network call for each new subscription).
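The difference between just and defer can be sketched with a counter standing in for the HTTP call. The class JustVersusDeferExample and the method expensiveCall are hypothetical stand-ins invented for this illustration, not part of Reactor.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class JustVersusDeferExample {

    // Counts how many times the simulated "HTTP call" has been made.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for an expensive call; returns the call number.
    static int expensiveCall() {
        return CALLS.incrementAndGet();
    }

    // Returns {just #1, just #2, defer #1, defer #2}.
    static int[] run() {
        CALLS.set(0);
        // The call is made right here, at assembly time, and its value is captured.
        Mono<Integer> captured = Mono.just(expensiveCall());
        // The call is made again for every subscription.
        Mono<Integer> deferred = Mono.defer(() -> Mono.just(expensiveCall()));
        return new int[] {
                captured.block(), captured.block(),
                deferred.block(), deferred.block()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [1, 1, 2, 3]
    }
}
```

Both subscriptions to the just-based Mono see the value captured by the single call, while each subscription to the defer-based Mono triggers a fresh call.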
On the opposite, share() and replay(…) can be used to turn a cold publisher into a hot one (at least once a first subscription has happened). Both of these also have Sinks.Many equivalents in the Sinks class, which allow programmatically feeding the sequence.
Consider two examples, one that demonstrates a cold Flux and the other that makes use of the Sinks to simulate a hot Flux. The following code shows the first example:
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
This first example produces the following output:
Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE
The following image shows this replay behavior:

Both subscribers catch all four colors, because each subscriber causes the process defined by the operators on the Flux to run.
Compare the first example to the second example, shown in the following code:
Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();
Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.emitNext("blue", FAIL_FAST); (1)
hotSource.tryEmitNext("green").orThrow(); (2)
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.emitNext("orange", FAIL_FAST);
hotSource.emitNext("purple", FAIL_FAST);
hotSource.emitComplete(FAIL_FAST);
1 | For more details about sinks, see Sinks. |
2 | Side note: orThrow() here is an alternative to emitNext + Sinks.EmitFailureHandler.FAIL_FAST that is suitable for tests, since throwing there is acceptable (more so than in reactive applications). |
The second example produces the following output:
Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE
The following figure shows how a subscription is broadcast:

Subscriber 1 catches all four colors. Subscriber 2, having been created after the first two colors were produced, catches only the last two colors. This difference accounts for the doubling of ORANGE and PURPLE in the output. The process described by the operators on this Flux runs regardless of when subscriptions have been attached.
9.3. Broadcasting to Multiple Subscribers with ConnectableFlux
Sometimes, you may want to not defer only some processing to the subscription time of one subscriber, but you might actually want several of them to rendezvous and then trigger the subscription and data generation.
This is what ConnectableFlux is made for. Two main patterns are covered in the Flux API that return a ConnectableFlux: publish and replay.
- publish dynamically tries to respect the demand from its various subscribers, in terms of backpressure, by forwarding these requests to the source. Most notably, if any subscriber has a pending demand of 0, publish pauses its requesting to the source.
- replay buffers data seen through the first subscription, up to configurable limits (in time and buffer size). It replays the data to subsequent subscribers.
A ConnectableFlux offers additional methods to manage subscriptions downstream versus subscriptions to the original source. These additional methods include the following:
- connect() must be called manually once you reach enough subscriptions to the Flux. That triggers the subscription to the upstream source.
- autoConnect(n) can do the same job automatically once n subscriptions have been made.
- refCount(n) not only automatically tracks incoming subscriptions but also detects when these subscriptions are cancelled. If not enough subscribers are tracked, the source is “disconnected”, causing a new subscription to the source later if additional subscribers appear.
- refCount(int, Duration) adds a “grace period.” Once the number of tracked subscribers becomes too low, it waits for the Duration before disconnecting the source, potentially allowing for enough new subscribers to come in and cross the connection threshold again.
Consider the following example:
Flux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
ConnectableFlux<Integer> co = source.publish();
co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");
co.connect();
The preceding code produces the following output:
done subscribing
will now connect
subscribed to source
1
1
2
2
3
3
The following code uses autoConnect:
Flux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
Flux<Integer> autoCo = source.publish().autoConnect(2);
autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});
The preceding code produces the following output:
subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3
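The refCount(n) variant can be sketched along the same lines. The following is a minimal illustration, assuming a synchronous source so that everything runs on the calling thread. The class name RefCountExample and its counters are hypothetical names made up for this sketch.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class RefCountExample {

    // Returns {source subscriptions after 1st subscriber,
    //          source subscriptions after 2nd subscriber,
    //          items seen by 1st subscriber, items seen by 2nd subscriber}.
    static int[] run() {
        AtomicInteger sourceSubscriptions = new AtomicInteger();
        AtomicInteger first = new AtomicInteger();
        AtomicInteger second = new AtomicInteger();

        Flux<Integer> shared = Flux.range(1, 3)
                .doOnSubscribe(s -> sourceSubscriptions.incrementAndGet())
                .publish()
                .refCount(2); // connect only once two subscribers are present

        shared.subscribe(i -> first.incrementAndGet());
        int afterFirst = sourceSubscriptions.get();  // not connected yet

        shared.subscribe(i -> second.incrementAndGet());
        int afterSecond = sourceSubscriptions.get(); // threshold reached, source subscribed

        return new int[] { afterFirst, afterSecond, first.get(), second.get() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

The source is subscribed to only once, when the second subscriber arrives, and both subscribers receive the three values from that single subscription.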
9.4. Three Sorts of Batching
When you have lots of elements and you want to separate them into batches, you have three broad solutions in Reactor: grouping, windowing, and buffering. These three are conceptually close, because they redistribute a Flux<T> into an aggregate. Grouping and windowing create a Flux<Flux<T>>, while buffering aggregates into a Collection<T>.
9.4.1. Grouping with Flux<GroupedFlux<T>>
Grouping is the act of splitting the source Flux<T> into multiple batches, each of which matches a key.
The associated operator is groupBy.
Each group is represented as a GroupedFlux<T>, which lets you retrieve the key by calling its key() method.
There is no necessary continuity in the content of the groups. Once a source element produces a new key, the group for this key is opened and elements that match the key end up in the group (several groups could be open at the same time).
This means that groups:
- Are always disjoint (a source element belongs to one and only one group).
- Can contain elements from different places in the original sequence.
- Are never empty.
The following example groups values by whether they are even or odd:
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.groupBy(i -> i % 2 == 0 ? "even" : "odd")
.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
.map(String::valueOf) //map to string
.startWith(g.key())) //start with the group's key
)
.expectNext("odd", "1", "3", "5", "11", "13")
.expectNext("even", "2", "4", "6", "12")
.verifyComplete();
Grouping is best suited for when you have a medium to low number of groups. The groups must also imperatively be consumed (such as by a flatMap) so that groupBy continues fetching data from upstream and feeding more groups. Sometimes, these two constraints multiply and lead to hangs, such as when you have a high cardinality and the concurrency of the flatMap consuming the groups is too low.
9.4.2. Windowing with Flux<Flux<T>>
Windowing is the act of splitting the source Flux<T> into windows, by criteria of size, time, boundary-defining predicates, or boundary-defining Publisher.
The associated operators are window, windowTimeout, windowUntil, windowWhile, and windowWhen.
Contrary to groupBy, which randomly overlaps according to incoming keys, windows are (most of the time) opened sequentially.
Some variants can still overlap, though. For instance, in window(int maxSize, int skip) the maxSize parameter is the number of elements after which a window closes, and the skip parameter is the number of elements in the source after which a new window is opened. So if maxSize > skip, a new window opens before the previous one closes and the two windows overlap.
The following example shows overlapping windows:
StepVerifier.create(
Flux.range(1, 10)
.window(5, 3) //overlapping windows
.concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
)
.expectNext(1, 2, 3, 4, 5)
.expectNext(4, 5, 6, 7, 8)
.expectNext(7, 8, 9, 10)
.expectNext(10)
.verifyComplete();
With the reverse configuration (maxSize < skip), some elements from the source are dropped and are not part of any window.
In the case of predicate-based windowing through windowUntil and windowWhile, having subsequent source elements that do not match the predicate can also lead to empty windows, as demonstrated in the following example:
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.windowWhile(i -> i % 2 == 0)
.concatMap(g -> g.defaultIfEmpty(-1))
)
.expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
.expectNext(2, 4, 6) // triggered by 11
.expectNext(12) // triggered by 13
// however, no empty completion window is emitted (would contain extra matching elements)
.verifyComplete();
9.4.3. Buffering with Flux<List<T>>
Buffering is similar to windowing, with the following twist: Instead of emitting windows (each of which is a Flux<T>), it emits buffers (which are Collection<T>; by default, List<T>).
The operators for buffering mirror those for windowing: buffer, bufferTimeout, bufferUntil, bufferWhile, and bufferWhen.
Where the corresponding windowing operator opens a window, a buffering operator creates a new collection and starts adding elements to it. Where a window closes, the buffering operator emits the collection.
Buffering can also lead to dropping source elements or having overlapping buffers, as the following example shows:
StepVerifier.create(
Flux.range(1, 10)
.buffer(5, 3) //overlapping buffers
)
.expectNext(Arrays.asList(1, 2, 3, 4, 5))
.expectNext(Arrays.asList(4, 5, 6, 7, 8))
.expectNext(Arrays.asList(7, 8, 9, 10))
.expectNext(Collections.singletonList(10))
.verifyComplete();
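The dropping case (maxSize < skip) can be sketched in the same way. The class name DroppingBufferExample is a hypothetical name for this illustration; buffer(int, int), collectList, and block are the Reactor calls being demonstrated.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class DroppingBufferExample {

    // A new buffer opens every 3 elements but closes after 2,
    // so every third source element (3, 6, 9) belongs to no buffer.
    static List<List<Integer>> run() {
        return Flux.range(1, 10)
                   .buffer(2, 3) // maxSize < skip: dropping buffers
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 2], [4, 5], [7, 8], [10]]
    }
}
```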
Unlike in windowing, bufferUntil and bufferWhile do not emit an empty buffer, as the following example shows:
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.bufferWhile(i -> i % 2 == 0)
)
.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
.expectNext(Collections.singletonList(12)) // triggered by 13
.verifyComplete();
9.5. Parallelizing Work with ParallelFlux
With multi-core architectures being a commodity nowadays, being able to easily parallelize work is important. Reactor helps with that by providing a special type, ParallelFlux, that exposes operators that are optimized for parallelized work.
To obtain a ParallelFlux, you can use the parallel() operator on any Flux. By itself, this method does not parallelize the work. Rather, it divides the workload into “rails” (by default, as many rails as there are CPU cores).
In order to tell the resulting ParallelFlux where to run each rail (and, by extension, to run rails in parallel) you have to use runOn(Scheduler). Note that there is a recommended dedicated Scheduler for parallel work: Schedulers.parallel().
Compare the next two examples:
Flux.range(1, 10)
.parallel(2) (1)
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
1 | We force a number of rails instead of relying on the number of CPU cores. |
Flux.range(1, 10)
.parallel(2)
.runOn(Schedulers.parallel())
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
The first example produces the following output:
main -> 1
main -> 2
main -> 3
main -> 4
main -> 5
main -> 6
main -> 7
main -> 8
main -> 9
main -> 10
The second correctly parallelizes on two threads, as shown in the following output:
parallel-1 -> 1
parallel-2 -> 2
parallel-1 -> 3
parallel-2 -> 4
parallel-1 -> 5
parallel-2 -> 6
parallel-1 -> 7
parallel-1 -> 9
parallel-2 -> 8
parallel-2 -> 10
If, once you process your sequence in parallel, you want to revert back to a “normal” Flux and apply the rest of the operator chain in a sequential manner, you can use the sequential() method on ParallelFlux.
Note that sequential() is implicitly applied if you subscribe to the ParallelFlux with a Subscriber but not when using the lambda-based variants of subscribe.
Note also that subscribe(Subscriber<T>) merges all the rails, while subscribe(Consumer<T>) runs all the rails. If the subscribe() method has a lambda, each lambda is executed as many times as there are rails.
You can also access individual rails or “groups” as a Flux<GroupedFlux<T>> through the groups() method and apply additional operators to them through the composeGroup() method.
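As a minimal sketch of going back to a regular Flux, the following example processes elements on two rails and then merges them with sequential(). The class name ParallelSequentialExample is hypothetical. Because the merge order across rails is not guaranteed, the sketch sorts the result before returning it.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ParallelSequentialExample {

    static List<Integer> run() {
        return Flux.range(1, 10)
                   .parallel(2)                  // split into two rails
                   .runOn(Schedulers.parallel()) // run each rail on a parallel worker
                   .map(i -> i * 10)             // work done per rail
                   .sequential()                 // merge the rails back into a Flux
                   .collectSortedList()          // rail interleaving is not deterministic
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```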
9.6. Replacing Default Schedulers
As we described in the Threading and Schedulers section, Reactor Core comes with several Scheduler implementations. While you can always create new instances through the new* factory methods, each Scheduler flavor also has a default singleton instance that is accessible through the direct factory method (such as Schedulers.boundedElastic() versus Schedulers.newBoundedElastic(…)).
These default instances are the ones used by operators that need a Scheduler to work when you do not explicitly specify one. For example, Flux#delayElements(Duration) uses the Schedulers.parallel() instance.
In some cases, however, you might need to replace these default instances with something else in a cross-cutting fashion, without having to make sure every single operator you call has your specific Scheduler as a parameter. An example is measuring the time every single scheduled task takes by wrapping the real schedulers, for instrumentation purposes. In other words, you might want to change the default Schedulers.
Changing the default schedulers is possible through the Schedulers.Factory class. By default, a Factory creates all the standard Scheduler instances through similarly named methods. You can override each of these with your custom implementation.
Additionally, the factory exposes one additional customization method: decorateExecutorService. It is invoked during the creation of every Reactor Core Scheduler that is backed by a ScheduledExecutorService (even non-default instances, such as those created by calls to Schedulers.newParallel()).
This lets you tune the ScheduledExecutorService to be used: The default one is exposed as a Supplier and, depending on the type of Scheduler being configured, you can choose to entirely bypass that supplier and return your own instance or you can get() the default instance and wrap it.
Once you create a Factory that fits your needs, you must install it by calling Schedulers.setFactory(Factory).
Finally, there is a last customizable hook in Schedulers: onHandleError. This hook is invoked whenever a Runnable task submitted to a Scheduler throws an Exception (note that if there is an UncaughtExceptionHandler set for the Thread that ran the task, both the handler and the hook are invoked).
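The onHandleError hook can be sketched as follows. This is a minimal illustration, assuming a dedicated single-threaded scheduler created only for the demonstration. The class name SchedulerErrorHookExample and the scheduler name "hook-demo" are hypothetical. Expect the failing task to also be logged by the default handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerErrorHookExample {

    // Returns the message of the exception observed by the hook, or null on timeout.
    static String run() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);

        // Global hook: called for exceptions thrown by tasks run on a Scheduler.
        Schedulers.onHandleError((thread, error) -> {
            seen.set(error.getMessage());
            latch.countDown();
        });

        Scheduler scheduler = Schedulers.newSingle("hook-demo"); // hypothetical name
        try {
            scheduler.schedule(() -> {
                throw new IllegalStateException("boom");
            });
            latch.await(5, TimeUnit.SECONDS); // wait for the task to fail
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            scheduler.dispose();              // release the worker thread
            Schedulers.resetOnHandleError();  // do not leak the hook to other code
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Resetting the hook in a finally block matters here because the hook is global state shared by every Scheduler in the JVM.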
9.7. Using Global Hooks
Reactor has another category of configurable callbacks that are invoked by Reactor operators in various situations. They are all set in the Hooks class, and they fall into three categories:
9.7.1. Dropping Hooks
Dropping hooks are invoked when the source of an operator does not comply with the Reactive Streams specification. These kinds of errors are outside of the normal execution path (that is, they cannot be propagated through onError).
Typically, a Publisher calls onNext on the operator despite having already called onCompleted on it previously. In that case, the onNext value is dropped. The same is true for an extraneous onError signal.
The corresponding hooks, onNextDropped and onErrorDropped, let you provide a global Consumer for these drops. For example, you can use it to log the drop and clean up resources associated with a value if needed (as it never makes it to the rest of the reactive chain).
Setting the hooks twice in a row is additive: every consumer you provide is invoked. The hooks can be fully reset to their defaults by using the Hooks.resetOn*Dropped() methods.
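As a minimal sketch of a dropping hook, the following example builds a deliberately non-compliant Publisher that emits onNext after onComplete and collects what the onNextDropped hook receives. The class name DroppedSignalExample and the broken publisher are hypothetical constructs for this illustration and should not be used as a model for real publishers.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;

public class DroppedSignalExample {

    static List<Object> run() {
        List<Object> dropped = new ArrayList<>();
        Hooks.onNextDropped(dropped::add); // global consumer for dropped values
        try {
            // Deliberately broken publisher: signals onNext after onComplete.
            Publisher<String> broken = subscriber -> {
                subscriber.onSubscribe(Operators.emptySubscription());
                subscriber.onComplete();
                subscriber.onNext("late");
            };
            // The map operator has already terminated when "late" arrives, so it drops it.
            Flux.from(broken)
                .map(String::toUpperCase)
                .subscribe();
            return dropped;
        } finally {
            Hooks.resetOnNextDropped(); // restore the default behavior
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```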
9.7.2. Internal Error Hook
One hook, onOperatorError, is invoked by operators when an unexpected Exception is thrown during the execution of their onNext, onError, and onComplete methods.
Unlike the previous category, this is still within the normal execution path. A typical example is the map operator with a map function that throws an Exception (such as division by zero). It is still possible at this point to go through the usual channel of onError, and that is what the operator does.
First, it passes the Exception through onOperatorError. The hook lets you inspect the error (and the incriminating value, if relevant) and change the Exception. Of course, you can also do something on the side, such as log and return the original Exception.
Note that you can set the onOperatorError hook multiple times. You can provide a String identifier for a particular BiFunction, and subsequent calls with different keys concatenate the functions, which are all executed. On the other hand, reusing the same key twice lets you replace a function you previously set.
As a consequence, the default hook behavior can be both fully reset (by using Hooks.resetOnOperatorError()) or partially reset for a specific key only (by using Hooks.resetOnOperatorError(String)).
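The keyed form of the hook can be sketched as follows, using the division-by-zero case mentioned above. The class name OperatorErrorHookExample and the key "wrap-demo" are hypothetical. The hide() call is an assumption made to keep the sketch on the plain onNext path by preventing operator fusion.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

public class OperatorErrorHookExample {

    static final String KEY = "wrap-demo"; // hypothetical hook key

    // Returns the error that finally reaches the subscriber.
    static Throwable run() {
        AtomicReference<Throwable> received = new AtomicReference<>();

        // Wrap the original exception and record the incriminating value.
        Hooks.onOperatorError(KEY, (error, value) ->
                new IllegalStateException("failed on value " + value, error));
        try {
            Flux.just(1, 0)
                .hide()            // assumption: avoid fusion so map sees plain onNext
                .map(i -> 10 / i)  // throws ArithmeticException for 0
                .subscribe(i -> {}, received::set);
            return received.get();
        } finally {
            Hooks.resetOnOperatorError(KEY); // partial reset, for this key only
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subscriber receives the IllegalStateException produced by the hook, with the original ArithmeticException as its cause.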
9.7.3. Assembly Hooks
These hooks tie in the lifecycle of operators. They are invoked when a chain of operators is assembled (that is, instantiated). onEachOperator lets you dynamically change each operator as it is assembled in the chain, by returning a different Publisher.
onLastOperator is similar, except that it is invoked only on the last operator in the chain before the subscribe call.
If you want to decorate all operators with a cross-cutting Subscriber implementation, you can look into the Operators#lift* methods to help you deal with the various types of Reactor Publishers out there (Flux, Mono, ParallelFlux, GroupedFlux, and ConnectableFlux), as well as their Fuseable versions.
Like onOperatorError, these hooks are cumulative and can be identified with a key. They can also be reset partially or totally.
9.7.4. Hook Presets
The Hooks utility class provides two preset hooks. These are alternatives to the default behaviors that you can use by calling their corresponding method, rather than coming up with the hook yourself:
- onNextDroppedFail(): onNextDropped used to throw an Exceptions.failWithCancel() exception. It now defaults to logging the dropped value at the DEBUG level. To go back to the old default behavior of throwing, use onNextDroppedFail().
- onOperatorDebug(): This method activates debug mode. It ties into the onOperatorError hook, so calling resetOnOperatorError() also resets it. You can independently reset it by using resetOnOperatorDebug(), as it uses a specific key internally.
9.8. Adding a Context to a Reactive Sequence
One of the big technical challenges encountered when switching from an imperative programming perspective to a reactive programming mindset lies in how you deal with threading.
Contrary to what you might be used to, in reactive programming, you can use a Thread to process several asynchronous sequences that run at roughly the same time (actually, in non-blocking locksteps). The execution can also easily and often jump from one thread to another.
This arrangement is especially hard for developers that use features dependent on the threading model being more “stable,” such as ThreadLocal. As it lets you associate data with a thread, it becomes tricky to use in a reactive context. As a result, libraries that rely on ThreadLocal at least introduce new challenges when used with Reactor. At worst, they work badly or even fail. Using the MDC of Logback to store and log correlation IDs is a prime example of such a situation.
The usual workaround for ThreadLocal usage is to move the contextual data, C, along your business data, T, in the sequence, by using (for instance) Tuple2<T, C>. This does not look good and leaks an orthogonal concern (the contextual data) into your method and Flux signatures.
Since version 3.1.0, Reactor comes with an advanced feature that is somewhat comparable to ThreadLocal but can be applied to a Flux or a Mono instead of a Thread. This feature is called Context.
As an illustration of what it looks like, the following example both reads from and writes to Context:
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap(s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key))))
.contextWrite(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World")
.verifyComplete();
In the following sections, we cover Context and how to use it, so that you can eventually understand the preceding example.
This is an advanced feature that is more targeted at library developers. It requires good understanding of the lifecycle of a Subscription and is intended for libraries that are responsible for the subscriptions.
9.8.1. The Context API
Context is an interface reminiscent of Map. It stores key-value pairs and lets you fetch a value you stored by its key. It has a simplified version that only exposes read methods, the ContextView. More specifically:
- Both keys and values are of type Object, so a Context (and ContextView) instance can contain any number of highly divergent values from different libraries and sources.
- A Context is immutable. It exposes write methods like put and putAll but they produce a new instance.
- For a read-only API that doesn’t even expose such write methods, there’s the ContextView superinterface since 3.4.0.
- You can check whether the key is present with hasKey(Object key).
- Use getOrDefault(Object key, T defaultValue) to retrieve a value (cast to a T) or fall back to a default one if the Context instance does not have that key.
- Use getOrEmpty(Object key) to get an Optional<T> (the Context instance attempts to cast the stored value to T).
- Use put(Object key, Object value) to store a key-value pair, returning a new Context instance. You can also merge two contexts into a new one by using putAll(ContextView).
- Use delete(Object key) to remove the value associated to a key, returning a new Context.
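When you create a Context, you can create pre-valued Context instances with up to five key-value pairs by using the static Context.of methods. Alternatively, you can also create an empty Context by using Context.empty().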
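The immutability and the read methods described above can be sketched without any reactive sequence at all. The class name ContextApiExample and the keys "user" and "requestId" are hypothetical values chosen for this illustration.

```java
import java.util.Optional;

import reactor.util.context.Context;

public class ContextApiExample {

    static String run() {
        Context base = Context.of("user", "alice");   // one pre-valued pair
        Context extended = base.put("requestId", 42); // new instance, base is unchanged
        Context trimmed = extended.delete("user");    // another new instance

        Optional<Integer> requestId = extended.getOrEmpty("requestId");
        String user = trimmed.getOrDefault("user", "anonymous");

        return base.hasKey("requestId")         // false: put did not mutate base
                + "," + extended.hasKey("requestId") // true
                + "," + requestId.orElse(-1)         // 42
                + "," + user                         // anonymous: key was deleted
                + "," + base.size();                 // 1
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```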
9.8.2. Tying a Context to a Flux and Writing
To make a Context be useful, it must be tied to a specific sequence and be accessible by each operator in a chain. Note that the operator must be a Reactor-native operator, as Context is specific to Reactor.
Actually, a Context is tied to each Subscriber in a chain. It uses the Subscription propagation mechanism to make itself available to each operator, starting with the final subscribe and moving up the chain.
In order to populate the Context, which can only be done at subscription time, you need to use the contextWrite operator.
contextWrite(ContextView) merges the ContextView you provide and the Context from downstream (remember, the Context is propagated from the bottom of the chain towards the top). This is done through a call to putAll, resulting in a NEW Context for upstream.
You can also use the more advanced contextWrite(Function<Context, Context>). It receives a copy of the Context from downstream, lets you put or delete values as you see fit, and returns the new Context to use. You can even decide to return a completely different instance, although it is really not recommended (doing so might impact third-party libraries that depend on the Context).
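Both contextWrite variants can be sketched in one chain. The class name ContextWriteExample and the keys "greeting" and "name" are hypothetical. The reading side uses Mono.deferContextual, as in the example from the introduction of this section.

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextWriteExample {

    static String run() {
        return Mono.deferContextual(ctx ->
                        Mono.just(ctx.get("greeting") + " " + ctx.get("name")))
                   // ContextView variant: merged into the downstream Context
                   .contextWrite(Context.of("greeting", "Hello"))
                   // Function variant: receives the downstream Context, returns a new one
                   .contextWrite(ctx -> ctx.put("name", "World"))
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // Hello World
    }
}
```

The lower write ("name") happens first at subscription time, and the upper write then merges "greeting" into it, so the source sees both keys.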
9.8.3. Reading a Context, through the ContextView
Once you have populated a Context, you may want to peek into it at runtime. Most of the time, the responsibility of putting information into the Context is on the end user’s side, while exploiting that information is on the third-party library’s side, as such libraries are usually upstream of the client code.
The read-oriented operators let you obtain data from the Context in a chain of operators by exposing its ContextView:
- To access the context from a source-like operator, use the deferContextual factory method.
- To access the context from the middle of an operator chain, use transformDeferredContextual(BiFunction).
- Alternatively, when dealing with an inner sequence (like inside a flatMap), the ContextView can be materialized by using Mono.deferContextual(Mono::just). Usually though, you might want to perform meaningful work directly within the defer’s lambda, e.g. Mono.deferContextual(ctx -> doSomethingAsyncWithContextData(v, ctx.get(key))) where v is the value being flatMapped.
In order to read from the Context without misleading users into thinking one can write to it while data is running through the pipeline, only the ContextView is exposed by the operators above. In case one needs to use one of the remaining APIs that still require a Context, one can use Context.of(ContextView) for conversion.
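Reading from the middle of a chain can be sketched with transformDeferredContextual. The class name ReadContextMidChainExample, the key "prefix", and the value "req-1" are hypothetical names used only for this illustration.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.util.context.Context;

public class ReadContextMidChainExample {

    static List<String> run() {
        return Flux.just("a", "b")
                   // The BiFunction receives the upstream Flux and a read-only ContextView.
                   .transformDeferredContextual((flux, ctx) -> {
                       String prefix = ctx.getOrDefault("prefix", "none");
                       return flux.map(v -> prefix + ":" + v);
                   })
                   // Written below the reader, so it is visible to it.
                   .contextWrite(Context.of("prefix", "req-1"))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [req-1:a, req-1:b]
    }
}
```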
9.8.4. Simple Context Examples
The examples in this section are meant as ways to better understand some of the caveats of using a Context.
We first look back at our simple example from the introduction in a bit more detail, as the following example shows:
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap(s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key)))) (2)
.contextWrite(ctx -> ctx.put(key, "World")); (1)
StepVerifier.create(r)
.expectNext("Hello World") (3)
.verifyComplete();
1 | The chain of operators ends with a call to contextWrite(Function) that puts "World" into the Context under a key of "message" . |
2 | We flatMap on the source element, materializing the ContextView with Mono.deferContextual()
and directly extract the data associated with "message" and concatenate that with the original word. |
3 | The resulting Mono<String> emits "Hello World" . |
The numbering above versus the actual line order is not a mistake. It represents
the execution order. Even though contextWrite is the last piece of the chain, it is
the one that gets executed first (due to its subscription-time nature and the fact that
the subscription signal flows from bottom to top).
|
In your chain of operators, the relative positions of where you write to the Context and where you read from it matter. The Context
is immutable and its content can only be seen by operators above it, as demonstrated in
the following example: |
String key = "message";
Mono<String> r = Mono.just("Hello")
.contextWrite(ctx -> ctx.put(key, "World")) (1)
.flatMap( s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.getOrDefault(key, "Stranger")))); (2)
StepVerifier.create(r)
.expectNext("Hello Stranger") (3)
.verifyComplete();
1 | The Context is written to too high in the chain. |
2 | As a result, in the flatMap , there is no value associated with our key. A default value
is used instead. |
3 | The resulting Mono<String> thus emits "Hello Stranger" . |
Similarly, in the case of several attempts to write the same key to the Context
, the
relative order of the writes matters, too. Operators that read the Context
see
the value that was set closest to being under them, as demonstrated in the following example:
String key = "message";
Mono<String> r = Mono.deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))
.contextWrite(ctx -> ctx.put(key, "Reactor")) (1)
.contextWrite(ctx -> ctx.put(key, "World")); (2)
StepVerifier.create(r)
.expectNext("Hello Reactor") (3)
.verifyComplete();
1 | A write attempt on key "message" . |
2 | Another write attempt on key "message" . |
3 | The deferContextual only saw the value set closest to it (and below it): "Reactor" . |
In the preceding example, the Context
is populated with "World"
during subscription.
Then the subscription signal moves upstream and another write happens. This produces a
second immutable Context
with a value of "Reactor"
. After that, data starts flowing.
The deferContextual
sees the Context
closest to it, which is our second Context
with the "Reactor"
value (exposed to the user as a ContextView
).
You might wonder if the Context
is propagated along with the data signal. If that was
the case, putting another flatMap
between these two writes would use the value from
the top Context
. But this is not the case, as demonstrated by the following example:
String key = "message";
Mono<String> r = Mono.deferContextual(ctx -> Mono.just("Hello " + ctx.get(key))) (3)
.contextWrite(ctx -> ctx.put(key, "Reactor")) (2)
.flatMap( s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key)))) (4)
.contextWrite(ctx -> ctx.put(key, "World")); (1)
StepVerifier.create(r)
.expectNext("Hello Reactor World") (5)
.verifyComplete();
1 | This is the first write to happen. |
2 | This is the second write to happen. |
3 | The top context read sees the second write. |
4 | The flatMap concatenates the result from initial read with the value from the first write. |
5 | The Mono emits "Hello Reactor World" . |
The reason is that the Context
is associated with the Subscriber
and each operator
accesses the Context
by requesting it from its downstream Subscriber
.
One last interesting propagation case is the one where the Context
is also written to
inside a flatMap
, as in the following example:
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
)
.flatMap( s -> Mono.deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
.contextWrite(ctx -> ctx.put(key, "Reactor")) (1)
)
.contextWrite(ctx -> ctx.put(key, "World")); (2)
StepVerifier.create(r)
.expectNext("Hello World Reactor")
.verifyComplete();
1 | This contextWrite does not impact anything outside of its flatMap . |
2 | This contextWrite impacts the main sequence’s Context . |
In the preceding example, the final emitted value is "Hello World Reactor"
and not "Hello
Reactor World", because the contextWrite
that writes "Reactor"
does so as part of
the inner sequence of the second flatMap
. As a consequence, it is not visible or propagated
through the main sequence and the first flatMap
does not see it. Propagation and immutability
isolate the Context
in operators that create intermediate inner sequences such as flatMap
.
9.8.5. Full Example
Now we can consider a more real-life example of a library reading information from the Context
:
a reactive HTTP client that takes a Mono<String>
as the source of data for a PUT
but
also looks for a particular Context key to add a correlation ID to the request’s headers.
From the user perspective, it is called as follows:
doPut("www.example.com", Mono.just("Walter"))
In order to propagate a correlation ID, it would be called as follows:
doPut("www.example.com", Mono.just("Walter"))
.contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
As the preceding snippets show, the user code uses contextWrite
to populate
a Context
with an HTTP_CORRELATION_ID
key-value pair. The upstream of the operator is
a Mono<Tuple2<Integer, String>>
(a simplistic representation of the HTTP response)
returned by the HTTP client library. So it effectively passes information from the
user code to the library code.
The following example shows mock code from the library’s perspective that reads the context and “augments the request” if it can find the correlation ID:
static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";
Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
Mono<Tuple2<String, Optional<Object>>> dataAndContext =
data.zipWith(Mono.deferContextual(c -> (1)
Mono.just(c.getOrEmpty(HTTP_CORRELATION_ID))) (2)
);
return dataAndContext.<String>handle((dac, sink) -> {
if (dac.getT2().isPresent()) { (3)
sink.next("PUT <" + dac.getT1() + "> sent to " + url +
" with header X-Correlation-ID = " + dac.getT2().get());
}
else {
sink.next("PUT <" + dac.getT1() + "> sent to " + url);
}
sink.complete();
})
.map(msg -> Tuples.of(200, msg));
}
1 | Materialize the ContextView through Mono.deferContextual and… |
2 | within the defer, extract a value for the correlation ID key, as an Optional . |
3 | If the key was present in the context, use the correlation ID as a header. |
The library snippet zips the data Mono
with Mono.deferContextual(Mono::just)
.
This gives the library a Tuple2<String, ContextView>
, and that
context contains the HTTP_CORRELATION_ID
entry from downstream (as it is on the direct
path to the subscriber).
The library code then uses map
to extract an Optional<String>
for that key, and, if
the entry is present, it uses the passed correlation ID as an X-Correlation-ID
header.
That last part is simulated by the handle
.
The whole test that validates that the library code used the correlation ID can be written as follows:
@Test
public void contextForLibraryReactivePut() {
Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
.contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
.filter(t -> t.getT1() < 300)
.map(Tuple2::getT2);
StepVerifier.create(put)
.expectNext("PUT <Walter> sent to www.example.com" +
" with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
.verifyComplete();
}
10. Context-Propagation Support
Since 3.5.0, Reactor-Core embeds support for the io.micrometer:context-propagation
SPI.
This library is intended as a means to easily adapt between various implementations of the concept of a Context, of which ContextView
/Context
is an example, and between ThreadLocal
variables as well.
ReactorContextAccessor
allows the Context-Propagation library to understand Reactor Context
and ContextView
.
It implements the SPI and is loaded via java.util.ServiceLoader
.
No user action is required, other than having a dependency on both reactor-core and io.micrometer:context-propagation
. The ReactorContextAccessor
class is public but shouldn’t generally be accessed by user code.
On top of that, Reactor-Core 3.5.0 also modifies the behavior of a couple of key operators and introduces the contextCapture
operator
to transparently deal with `ContextSnapshot`s if the library is available at runtime.
10.1. contextCapture
Operator
This operator can be used when one needs to capture ThreadLocal
value(s) at subscription time and reflect these values in the Reactor Context
for the benefit of upstream operators.
It relies on the context-propagation
library and notably the registered ThreadLocalAccessor
(s) to discover relevant ThreadLocal values.
This is a convenient alternative to contextWrite
which uses the context-propagation
API to obtain a ContextSnapshot
and then uses that snapshot to populate the Reactor Context
.
As a result, if there were any ThreadLocal values during the subscription phase, for which there is a registered ThreadLocalAccessor
, their values would now be stored in the Reactor Context
and visible
at runtime in upstream operators.
//assuming TL is known to Context-Propagation as key TLKEY.
static final ThreadLocal<String> TL = new ThreadLocal<>();
//in the main thread, TL is set to "HELLO"
TL.set("HELLO");
Mono.deferContextual(ctx ->
Mono.delay(Duration.ofSeconds(1))
//we're now in another thread, TL is not set
.map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get())
)
.contextCapture()
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null"
10.2. Operators that transparently restore a snapshot: handle
and tap
Both Flux
and Mono
variants of handle
and tap
will have their behavior slightly modified
if the Context-Propagation library is available at runtime.
Namely, if their downstream ContextView
is not empty they will assume a context capture has occurred
(either manually or via the contextCapture()
operator) and will attempt to restore `ThreadLocal`s from
that snapshot transparently.
These operators will ensure restoration is performed around the user-provided code, respectively:
- handle
will wrap the BiConsumer
in one which restores `ThreadLocal`s
- tap
variants will wrap the SignalListener
into one that has the same kind of wrapping around each method (this includes the addToContext
method)
The intent is to have a minimal set of operators transparently perform restoration. As a result we chose operators with rather general and broad applications (one with transformative capabilities, one with side-effect capabilities).
//assuming TL is known to Context-Propagation.
static final ThreadLocal<String> TL = new ThreadLocal<>();
//in the main thread, TL is set to "HELLO"
TL.set("HELLO");
Mono.delay(Duration.ofSeconds(1))
//we're now in another thread, TL is not set yet
.doOnNext(v -> System.out.println(TL.get()))
//inside the handler however, TL _is_ restored
.handle((v, sink) -> sink.next("handled delayed TL=" + TL.get()))
.contextCapture()
.block(); // prints "null" and returns "handled delayed TL=HELLO"
10.3. Dealing with Objects that Need Cleanup
In very specific cases, your application may deal with types that necessitate some form of cleanup once they are no longer in use.
This is an advanced scenario: for example, when you have reference-counted objects or when you deal with off-heap objects.
Netty’s ByteBuf
is a prime example of both.
In order to ensure proper cleanup of such objects, you need to account for it on a Flux
-by-Flux
basis, as well as in several of the global hooks (see Using Global Hooks):
-
The
doOnDiscard
Flux
/Mono
operator -
The
onOperatorError
hook -
The
onNextDropped
hook -
Operator-specific handlers
This is needed because each hook is made with a specific subset of cleanup in mind, and users might want (for example) to implement specific error-handling logic in addition to cleanup logic within onOperatorError
.
Note that some operators are less adapted to dealing with objects that need cleanup.
For example, bufferWhen
can introduce overlapping buffers, and that means that the discard “local hook” we used earlier might see a first buffer as being discarded and cleanup an element in it that is in a second buffer, where it is still valid.
For the purpose of cleaning up, all these hooks MUST be IDEMPOTENT.
They might on some occasions get applied several times to the same object.
Unlike the doOnDiscard operator, which performs a class-level instanceOf check, the global hooks are also dealing with instances that can be any Object . It is up to the user’s implementation to distinguish between which instances need cleanup and which do not. |
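As a minimal sketch of these two requirements (idempotence, and filtering arbitrary Object instances), consider the following plain-Java example. It does not use Reactor at all: TrackedResource and CleanupHooks are hypothetical names introduced only for illustration, and a real application would substitute its own reference-counted or off-heap type.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical resource type (not part of Reactor), standing in for a reference-counted or off-heap object.
final class TrackedResource {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final AtomicInteger effectiveReleases = new AtomicInteger();

    // Releases the resource at most once: only the first call does any work.
    boolean releaseOnce() {
        if (released.compareAndSet(false, true)) {
            effectiveReleases.incrementAndGet();
            return true;
        }
        return false;
    }

    int effectiveReleases() {
        return effectiveReleases.get();
    }
}

final class CleanupHooks {
    // Accepts any Object, like a global hook would, and only touches instances that need cleanup.
    static final Consumer<Object> CLEANUP = candidate -> {
        if (candidate instanceof TrackedResource) {
            ((TrackedResource) candidate).releaseOnce();
        }
    };

    // Applies the callback several times to the same object and reports how many releases really happened.
    static int releaseRepeatedly(TrackedResource resource, int times) {
        for (int i = 0; i < times; i++) {
            CLEANUP.accept(resource);
        }
        return resource.effectiveReleases();
    }
}
```

A callback shaped like CLEANUP could then be registered with a global hook that accepts a Consumer of Object, such as onNextDropped. The guard inside releaseOnce is what makes repeated invocations on the same instance harmless.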
10.3.1. The doOnDiscard
Operator or Local Hook
This hook has been specifically put in place for cleanup of objects that would otherwise never be exposed to user code.
It is intended as a cleanup hook for flows that operate under normal circumstances (not malformed sources that push too many items, which is covered by onNextDropped
).
It is local, in the sense that it is activated through an operator and applies only to a given Flux
or Mono
.
Obvious cases include operators that filter elements from upstream.
These elements never reach the next operator (or final subscriber), but this is part of the normal path of execution.
As such, they are passed to the doOnDiscard
hook.
Examples of when you might use the doOnDiscard
hook include the following:
-
filter
: Items that do not match the filter are considered to be “discarded.” -
skip
: Skipped items are discarded. -
buffer(maxSize, skip)
with maxSize < skip
: A “dropping buffer”: items in between buffers are discarded.
But doOnDiscard
is not limited to filtering operators, and is also used by operators that internally queue data for backpressure purposes.
More specifically, most of the time, this is important during cancellation. An operator that prefetches data from its source and later drains to its subscriber upon demand could have un-emitted data when it gets cancelled.
Such operators use the doOnDiscard
hook during cancellation to clear up their internal backpressure Queue
.
Each call to doOnDiscard(Class, Consumer) is additive with the others, to the extent that it is visible and used only by operators upstream of it. |
10.3.2. The onOperatorError
Hook
The onOperatorError
hook is intended to modify errors in a transverse manner (similar to an AOP catch-and-rethrow).
When the error happens during the processing of an onNext
signal, the element that was being emitted is passed to onOperatorError
.
If that type of element needs cleanup, you need to implement it in the onOperatorError
hook, possibly on top of error-rewriting code.
10.3.3. The onNextDropped
Hook
With malformed Publishers
, there could be cases where an operator receives an element when it expected none (typically, after having received the onError
or onComplete
signals).
In such cases, the unexpected element is “dropped” (that is, passed to the onNextDropped
hook).
If you have types that need cleanup, you must detect these in the onNextDropped
hook and implement cleanup code there as well.
10.3.4. Operator-specific Handlers
Some operators that deal with buffers or collect values as part of their operations have specific handlers for cases where collected data is not propagated downstream. If you use such operators with the type(s) that need cleanup, you need to perform cleanup in these handlers.
For example, distinct
has such a callback that is invoked when the operator terminates (or is cancelled) in order to clear the collection it uses to judge whether an element is distinct or not.
By default, the collection is a HashSet
, and the cleanup callback is a HashSet::clear
.
However, if you deal with reference-counted objects, you might want to change that to a more involved handler that would release
each element in the set before calling clear()
on it.
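The "release, then clear" handler described above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: RefCounted and DistinctStoreCleanup are hypothetical names (not Reactor or Netty types), and the helper method exists solely to show the effect of the callback on a small store.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical reference-counted type (not part of Reactor); real code might deal with Netty's ByteBuf.
final class RefCounted {
    private int refCount = 1;

    void release() {
        if (refCount > 0) {
            refCount--;
        }
    }

    int refCount() {
        return refCount;
    }
}

final class DistinctStoreCleanup {
    // Releases every element still held by the store, then clears the store itself.
    static final Consumer<Collection<RefCounted>> RELEASE_THEN_CLEAR = store -> {
        store.forEach(RefCounted::release);
        store.clear();
    };

    // Builds a small store, runs the cleanup, and returns the sum of the remaining reference counts and store size.
    static int remainingReferencesAfterCleanup() {
        RefCounted first = new RefCounted();
        RefCounted second = new RefCounted();
        Set<RefCounted> store = new HashSet<>();
        store.add(first);
        store.add(second);
        RELEASE_THEN_CLEAR.accept(store);
        return first.refCount() + second.refCount() + store.size();
    }
}
```

A consumer of this shape could presumably be supplied as the cleanup callback of the distinct variant that accepts a custom store; the exact overload and its parameters should be checked against the javadoc.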
10.4. Null Safety
Although Java does not allow expressing null-safety with its type system, Reactor now provides annotations to declare nullability of APIs, similar to those provided by Spring Framework 5.
Reactor uses these annotations, but they can also be used in any Reactor-based Java project to declare null-safe APIs. Nullability of the types used inside method bodies is outside of the scope of this feature.
These annotations are meta-annotated with JSR 305
annotations (a dormant JSR that is supported by tools such as IntelliJ IDEA) to provide
useful warnings to Java developers related to null-safety in order to avoid NullPointerException
at runtime. JSR 305 meta-annotations let tooling vendors
provide null safety support in a generic way, without having to hard-code support for Reactor annotations.
It is neither necessary nor recommended with Kotlin 1.1.5+ to have a dependency on JSR 305 in your project classpath. |
They are also used by Kotlin, which natively supports null safety. See this dedicated section for more details.
The following annotations are provided in the reactor.util.annotation
package:
-
@NonNull
: Indicates that a specific parameter, return value, or field cannot be null
. (It is not needed on parameters and return values where @NonNullApi
applies). -
@Nullable
: Indicates that a parameter, return value, or field can be null
. -
@NonNullApi
: Package-level annotation that indicates non-null is the default behavior for parameters and return values.
Nullability for generic type arguments, variable arguments, and array elements is not yet supported. See issue #878 for up-to-date information. |
Appendix A: Which operator do I need?
In this section, if an operator is specific to Flux or Mono, it is prefixed and linked accordingly, like this: Flux#fromArray. Common operators have no prefix, and links to both implementations are provided, for example: just (Flux|Mono). When a specific use case is covered by a combination of operators, it is presented as a method call, with a leading dot and parameters in parentheses, as follows: .methodCall(parameter) .
|
I want to deal with:
A.1. Creating a New Sequence…
-
that emits a
T
, and I already have: just
(Flux|Mono)-
…from an Optional<T>: Mono#justOrEmpty(Optional<T>)
-
…from a potentially
null
T: Mono#justOrEmpty(T)
-
-
that emits a
T
returned by a method: just
(Flux|Mono) as well-
…but lazily captured: use Mono#fromSupplier or wrap
just
(Flux|Mono) inside defer
(Flux|Mono)
-
-
that emits several
T
I can explicitly enumerate: Flux#just(T…) -
that iterates over:
-
an array: Flux#fromArray
-
a collection or iterable: Flux#fromIterable
-
a range of integers: Flux#range
-
a Stream supplied for each Subscription: Flux#fromStream(Supplier<Stream>)
-
-
that emits from various single-valued sources such as:
-
that completes:
empty
(Flux|Mono) -
that depends on a disposable resource:
using
(Flux|Mono) -
that generates events programmatically (can use state):
-
synchronously and one-by-one: Flux#generate
-
asynchronously (can also be sync), multiple emissions possible in one pass: Flux#create (Mono#create as well, without the multiple emission aspect)
-
A.2. Transforming an Existing Sequence
-
I want to transform existing data:
-
on a 1-to-1 basis (eg. strings to their length):
map
(Flux|Mono)-
…in order to materialize each source value’s index: Flux#index
-
on a 1-to-n basis (eg. strings to their characters):
flatMap
(Flux|Mono) + use a factory method -
on a 1-to-n basis with programmatic behavior for each source element and/or state:
handle
(Flux|Mono) -
running an asynchronous task for each source item (eg. urls to http request):
flatMap
(Flux|Mono) + an async Publisher-returning method-
…ignoring some data: conditionally return a Mono.empty() in the flatMap lambda
-
…retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)
-
…where the async task can return multiple values, from a Mono source: Mono#flatMapMany
-
-
-
I want to add pre-set elements to an existing sequence:
-
at the start: Flux#startWith(T…)
-
at the end: Flux#concatWithValues(T…)
-
-
I want to aggregate a Flux: (the
Flux#
prefix is assumed below)-
into a List: collectList, collectSortedList
-
into a Map: collectMap, collectMultiMap
-
into an arbitrary container: collect
-
into the size of the sequence: count
-
by applying a function between each element (eg. running sum): reduce
-
…but emitting each intermediary value: scan
-
-
into a boolean value from a predicate:
-
applied to all values (AND): all
-
applied to at least one value (OR): any
-
testing the presence of any value: hasElements (there is a Mono equivalent in hasElement)
-
testing the presence of a specific value: hasElement(T)
-
-
-
I want to combine publishers…
-
in sequential order: Flux#concat or
.concatWith(other)
(Flux|Mono)-
…but delaying any error until remaining publishers have been emitted: Flux#concatDelayError
-
…but eagerly subscribing to subsequent publishers: Flux#mergeSequential
-
-
in emission order (combined items emitted as they come): Flux#merge /
.mergeWith(other)
(Flux|Mono)-
…with different types (transforming merge): Flux#zip / Flux#zipWith
-
-
by pairing values:
-
from 2 Monos into a Tuple2: Mono#zipWith
-
from n Monos when they all completed: Mono#zip
-
-
by coordinating their termination:
-
from 1 Mono and any source into a Mono<Void>: Mono#and
-
from n sources when they all completed: Mono#when
-
into an arbitrary container type:
-
each time all sides have emitted: Flux#zip (up to the smallest cardinality)
-
each time a new value arrives at either side: Flux#combineLatest
-
-
-
selecting the first publisher which…
-
triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)
-
triggered by the start of the next publisher in a sequence of publishers: switchOnNext
-
-
I want to repeat an existing sequence:
repeat
(Flux|Mono)-
…but at time intervals:
Flux.interval(duration).flatMap(tick → myExistingPublisher)
-
-
I have an empty sequence but…
-
I have a sequence but I am not interested in values:
ignoreElements
(Flux.ignoreElements()|Mono.ignoreElement())-
…and I want the completion represented as a Mono<Void>:
then
(Flux|Mono) -
…and I want to wait for another task to complete at the end:
thenEmpty
(Flux|Mono) -
…and I want to switch to another Mono at the end: Mono#then(mono)
-
…and I want to emit a single value at the end: Mono#thenReturn(T)
-
…and I want to switch to a Flux at the end:
thenMany
(Flux|Mono)
-
-
I have a Mono for which I want to defer completion…
-
…until another publisher, which is derived from this value, has completed: Mono#delayUntil(Function)
-
-
I want to expand elements recursively into a graph of sequences and emit the combination…
A.3. Peeking into a Sequence
-
Without modifying the final sequence, I want to:
-
get notified of / execute additional behavior (sometimes referred to as "side-effects") on:
-
completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any)
-
cancellation:
doOnCancel
(Flux|Mono) -
"start" of the sequence:
doFirst
(Flux|Mono)-
this is tied to Publisher#subscribe(Subscriber)
-
-
post-subscription :
doOnSubscribe
(Flux|Mono)-
Subscription
acknowledgment after subscribe
-
this is tied to Subscriber#onSubscribe(Subscription)
-
-
any type of signal, represented as a Signal:
doOnEach
(Flux|Mono) -
any terminating condition (complete, error, cancel):
doFinally
(Flux|Mono)
-
-
I want to know of all events:
A.4. Filtering a Sequence
-
I want to filter a sequence:
-
based on an arbitrary criteria:
filter
(Flux|Mono)-
…that is asynchronously computed:
filterWhen
(Flux|Mono)
-
-
restricting on the type of the emitted objects:
ofType
(Flux|Mono) -
by ignoring the values altogether:
ignoreElements
(Flux.ignoreElements()|Mono.ignoreElement()) -
by ignoring duplicates:
-
in the whole sequence (logical set): Flux#distinct
-
between subsequently emitted items (deduplication): Flux#distinctUntilChanged
-
-
-
I want to keep only a subset of the sequence:
-
by taking N elements:
-
at the beginning of the sequence: Flux#take(long)
-
…requesting an unbounded amount from upstream: Flux#take(long, false)
-
…based on a duration: Flux#take(Duration)
-
…only the first element, as a Mono: Flux#next()
-
-
at the end of the sequence: Flux#takeLast
-
until a criteria is met (inclusive): Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based)
-
while a criteria is met (exclusive): Flux#takeWhile
-
-
by taking at most 1 element:
-
at a specific position: Flux#elementAt
-
at the end: .takeLast(1)
-
…and emit an error if empty: Flux#last()
-
…and emit a default value if empty: Flux#last(T)
-
-
-
by skipping elements:
-
at the beginning of the sequence: Flux#skip(long)
-
…based on a duration: Flux#skip(Duration)
-
-
at the end of the sequence: Flux#skipLast
-
until a criteria is met (inclusive): Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based)
-
while a criteria is met (exclusive): Flux#skipWhile
-
-
by sampling items:
-
by duration: Flux#sample(Duration)
-
but keeping the first element in the sampling window instead of the last: sampleFirst
-
-
by a publisher-based window: Flux#sample(Publisher)
-
based on a publisher "timing out": Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher does not overlap with the next)
-
-
-
I expect at most 1 element (error if more than one)…
-
and I want an error if the sequence is empty: Flux#single()
-
and I want a default value if the sequence is empty: Flux#single(T)
-
and I accept an empty sequence as well: Flux#singleOrEmpty
-
A.5. Handling Errors
-
I want the try/catch equivalent of:
-
I want to recover from errors…
-
by falling back:
-
to a completion ("swallowing" the error):
onErrorComplete
(Flux|Mono) -
to a Publisher or Mono, possibly different ones depending on the error: Flux#onErrorResume and Mono#onErrorResume
-
by retrying…
-
…with a simple policy (max number of attempts):
retry()
(Flux|Mono), retry(long)
(Flux|Mono) -
…triggered by a companion control Flux:
retryWhen
(Flux|Mono) -
…using a standard backoff strategy (exponential backoff with jitter):
retryWhen(Retry.backoff(…))
(Flux|Mono) (see also other factory methods in Retry)
-
-
-
I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)…
-
by throwing a special IllegalStateException: Flux#onBackpressureError
-
by dropping excess values: Flux#onBackpressureDrop
-
…except the last one seen: Flux#onBackpressureLatest
-
-
by buffering excess values (bounded or unbounded): Flux#onBackpressureBuffer
-
…and apply a strategy when bounded buffer also overflows: Flux#onBackpressureBuffer with a BufferOverflowStrategy
-
-
A.6. Working with Time
-
I want to associate emissions with a timing measured…
-
…with best available precision and versatility of provided data:
timed
(Flux|Mono)-
Timed<T>#elapsed() for Duration since last
onNext
-
Timed<T>#timestamp() for Instant representation of the epoch timestamp (milliseconds resolution)
-
Timed<T>#elapsedSinceSubcription() for Duration since subscription (rather than last onNext)
-
can have nanoseconds resolution for elapsed Durations
-
-
…as a (legacy) Tuple2<Long, T>…
-
-
I want my sequence to be interrupted if there is too much delay between emissions:
timeout
(Flux|Mono) -
I want to get ticks from a clock, regular time intervals: Flux#interval
-
I want to emit a single
0
after an initial delay: static Mono.delay. -
I want to introduce a delay:
-
between each onNext signal: Mono#delayElement, Flux#delayElements
-
before the subscription happens:
delaySubscription
(Flux|Mono)
-
A.7. Splitting a Flux
-
I want to split a Flux<T> into a
Flux<Flux<T>>
, by a boundary criteria:-
of size: window(int)
-
…with overlapping or dropping windows: window(int, int)
-
-
of time: window(Duration)
-
…with overlapping or dropping windows: window(Duration, Duration)
-
-
of size OR time (window closes when count is reached or timeout elapsed): windowTimeout(int, Duration)
-
based on a predicate on elements: windowUntil
-
…emitting the element that triggered the boundary in the next window (
cutBefore
variant): .windowUntil(predicate, true) -
…keeping the window open while elements match a predicate: windowWhile (non-matching elements are not emitted)
-
-
driven by an arbitrary boundary represented by onNexts in a control Publisher: window(Publisher), windowWhen
-
-
I want to split a Flux<T> and buffer elements within boundaries together…
-
into List:
-
by a size boundary: buffer(int)
-
…with overlapping or dropping buffers: buffer(int, int)
-
-
by a duration boundary: buffer(Duration)
-
…with overlapping or dropping buffers: buffer(Duration, Duration)
-
-
by a size OR duration boundary: bufferTimeout(int, Duration)
-
by an arbitrary criteria boundary: bufferUntil(Predicate)
-
…putting the element that triggered the boundary in the next buffer: .bufferUntil(predicate, true)
-
…buffering while predicate matches and dropping the element that triggered the boundary: bufferWhile(Predicate)
-
-
driven by an arbitrary boundary represented by onNexts in a control Publisher: buffer(Publisher), bufferWhen
-
-
into an arbitrary "collection" type
C
: use variants like buffer(int, Supplier<C>)
-
-
I want to split a Flux<T> so that elements that share a characteristic end up in the same sub-flux: groupBy(Function<T,K>) TIP: Note that this returns a
Flux<GroupedFlux<K, T>>
, each inner GroupedFlux shares the same K
key accessible through key().
A.8. Going Back to the Synchronous World
Note: all of these methods except Mono#toFuture will throw an UnsupportedOperatorException if called from within a Scheduler marked as "non-blocking only" (by default parallel() and single()).
-
I have a Flux<T> and I want to:
-
block until I can get the first element: Flux#blockFirst
-
…with a timeout: Flux#blockFirst(Duration)
-
-
block until I can get the last element (or null if empty): Flux#blockLast
-
…with a timeout: Flux#blockLast(Duration)
-
-
synchronously switch to an Iterable<T>: Flux#toIterable
-
synchronously switch to a Java 8 Stream<T>: Flux#toStream
-
-
I have a Mono<T> and I want:
-
to block until I can get the value: Mono#block
-
…with a timeout: Mono#block(Duration)
-
-
A.9. Multicasting a Flux to several Subscribers
-
I want to connect multiple Subscribers to a Flux:
-
and decide when to trigger the source with connect(): publish() (returns a ConnectableFlux)
-
and trigger the source immediately (late subscribers see later data):
share()
(Flux|Mono) -
and permanently connect the source when enough subscribers have registered: .publish().autoConnect(n)
-
and automatically connect and cancel the source when subscribers go above/below the threshold: .publish().refCount(n)
-
…but giving a chance for new subscribers to come in before cancelling: .publish().refCount(n, Duration)
-
-
-
I want to cache data from a Publisher and replay it to later subscribers:
-
up to
n
elements: cache(int) -
caching latest elements seen within a Duration (Time-To-Live):
cache(Duration)
(Flux|Mono)-
…but retain no more than
n
elements: cache(int, Duration)
-
-
but without immediately triggering the source: Flux#replay (returns a ConnectableFlux)
-
Appendix B: How to read marble diagrams?
When we introduced Flux and Mono, we showed an example of a "marble diagram".
These are found throughout the javadoc in order to explain the behavior of an operator in a more visual way.
In this section we’ll dig a little deeper into the conventions used by the Reactor documentation for these marble diagrams. First, let’s see how the most common patterns of operators are represented.
Some operators are instance methods: their output is produced by calling a method on a source Flux instance (like Flux<T> output = source.fluxOperator()):
Other operators are static methods. They can still take a source as an input parameter, like in Flux<T> output = Flux.merge(sourceFlux1, sourcePublisher2).
These are represented like below:
Note that sometimes we represent multiple variants or behaviors depending on the operator’s input, in which case there’s a single operator "box", but the source and output variants are separated like below:
These are the basic cases, yet some operators display slightly more advanced patterns.
For instance, ParallelFlux creates multiple rails so they have multiple output Flux.
These are represented one below the other, like in the following diagram:
Windowing operators produce a Flux<Flux<T>>: the main Flux notifies of each window opening, while inner Flux represent the windows content and termination.
Windows are represented as branching out of the main Flux, like in the following diagram:
Sometimes, operators take a "companion publisher" as input (a Flux, Mono or arbitrary Reactive Stream Publisher).
Such companion publishers help to customize the operator’s behavior, which will use some of the companion’s signals as triggers for its own internal behavior.
They are represented like in the following diagram:
Now that we’ve seen the most common operator patterns, let’s show the graphical representation of all the different signals, events and elements that can occur in a Flux or Mono:
Finally, in the same vein we have the graphical representation of side effects, which occur alongside the Reactive Stream signals:
Appendix C: FAQ, Best Practices, and "How do I…?"
This section covers the following content:
C.1. How Do I Wrap a Synchronous, Blocking Call?
It is often the case that a source of information is synchronous and blocking. To deal with such sources in your Reactor applications, apply the following pattern:
Mono blockingWrapper = Mono.fromCallable(() -> { (1)
return /* make a remote synchronous call */ (2)
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); (3)
1 | Create a new Mono by using fromCallable . |
2 | Return the synchronous, blocking resource. |
3 | Ensure each subscription happens on a dedicated single-threaded worker
from Schedulers.boundedElastic() . |
You should use a Mono, because the source returns one value. You should use Schedulers.boundedElastic, because it creates a dedicated thread to wait for the blocking resource without impacting other non-blocking processing, while also ensuring that there is a limit on the number of threads that can be created, and on the blocking tasks that can be enqueued and deferred during a spike.
Note that subscribeOn does not subscribe to the Mono. It specifies what kind of Scheduler to use when a subscribe call happens.
C.2. I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?
Make sure that the variable you .subscribe() to has been affected by the operators you think should have been applied to it.
Reactor operators are decorators. They return a different instance that wraps the source sequence and adds behavior. That is why the preferred way of using operators is to chain the calls.
Compare the following two examples:
Flux<String> flux = Flux.just("something", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); (1)
flux.subscribe(next -> System.out.println("Received: " + next));
1 | The mistake is here. The result is not attached to the flux variable. |
Flux<String> flux = Flux.just("something", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));
The following sample is even better (because it is simpler):
Flux.just("something", "chain")
.map(secret -> secret.replaceAll(".", "*"))
.subscribe(next -> System.out.println("Received: " + next));
The first version outputs the following:
Received: something
Received: chain
The two other versions output the expected values, as follows:
Received: *********
Received: *****
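The same pitfall exists with plain java.lang.String, which may make the mistake easier to spot. The sketch below is only an analogy using the JDK (no Reactor involved): like an operator, replaceAll returns a new instance and leaves the receiver untouched, and the regular expression "." matches every character, which is why the expected output consists of asterisks. The class and method names are hypothetical.

```java
// Hypothetical demo class: a JDK-only analogy for the "discarded operator result" mistake.
final class DiscardedResultSketch {

    // Mirrors the first (incorrect) example: the transformed value is computed, then dropped.
    static String discarded(String secret) {
        String value = secret;
        value.replaceAll(".", "*"); // result ignored, 'value' still refers to the original
        return value;
    }

    // Mirrors the corrected examples: the result is reassigned (or chained).
    static String reassigned(String secret) {
        String value = secret;
        value = value.replaceAll(".", "*"); // the regex "." matches each character
        return value;
    }

    public static void main(String[] args) {
        System.out.println("Received: " + discarded("chain"));  // unchanged input
        System.out.println("Received: " + reassigned("chain")); // masked input
    }
}
```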
C.3. My Mono zipWith or zipWhen is never called
Consider the following example:
myMethod.process("a") // this method returns Mono<Void>
.zipWith(myMethod.process("b"), combinator) //this is never called
.subscribe();
If the source Mono is either empty or a Mono<Void> (a Mono<Void> is empty for all intents and purposes), some combinations are never called.
This is the typical case for any transformer such as the zip static method or the zipWith and zipWhen operators, which (by definition) need an element from each source to produce their output.
Using data-suppressing operators on sources of zip is thus problematic.
Examples of data-suppressing operators include then(), thenEmpty(Publisher<Void>), ignoreElements() and ignoreElement(), and when(Publisher…).
Similarly, operators that use a Function<T,?> to tune their behavior, such as flatMap, need at least one element to be emitted for the Function to have a chance to apply. Applying these on an empty (or <Void>) sequence never produces an element.
You can use .defaultIfEmpty(T) and .switchIfEmpty(Publisher<T>) to replace an empty sequence of T with a default value or a fallback Publisher<T> (respectively), which could help avoid some of these situations. Note that this does not apply to Flux<Void>/Mono<Void> sources, as you can only switch to another Publisher<Void>, which is still guaranteed to be empty. The following example uses defaultIfEmpty:
defaultIfEmpty before zipWhen
myMethod.emptySequenceForKey("a") // this method returns empty Mono<String>
.defaultIfEmpty("") // this converts empty sequence to just the empty String
.zipWhen(aString -> myMethod.process("b")) //this is called with the empty String
.subscribe();
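The snippets in this section rely on an unspecified myMethod, so they cannot be run as-is. The following self-contained sketch reproduces the same behavior with Mono.empty() standing in for the empty source. It assumes reactor-core is on the classpath; the class name EmptyZipDemo and the sample values are invented for the illustration.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

// Hypothetical demo class: zipping with an empty source versus providing a default first.
final class EmptyZipDemo {

    // The left-hand source is empty, so zipWith never pairs anything and block() returns null.
    static Tuple2<String, String> zipOnEmpty() {
        return Mono.<String>empty()
                   .zipWith(Mono.just("b"))
                   .block();
    }

    // defaultIfEmpty supplies an element, so the zip has something to pair with "b".
    static Tuple2<String, String> zipWithDefault() {
        return Mono.<String>empty()
                   .defaultIfEmpty("")
                   .zipWith(Mono.just("b"))
                   .block();
    }

    public static void main(String[] args) {
        System.out.println("without default: " + zipOnEmpty());
        System.out.println("with default: " + zipWithDefault());
    }
}
```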
C.4. How to Use retryWhen to Emulate retry(3)?
The retryWhen operator can be quite complex. Hopefully the following snippet of code can help you understand how it works by attempting to emulate a simpler retry(3):
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.doOnError(e -> errorCount.incrementAndGet())
.retryWhen(Retry.from(companion -> (1)
companion.map(rs -> { (2)
if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
else throw Exceptions.propagate(rs.failure()); (4)
})
));
1 | We customize Retry by adapting from a Function lambda rather than providing a concrete class |
2 | The companion emits RetrySignal objects, which bear the number of retries so far and the last failure |
3 | To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index). |
4 | In order to terminate the sequence in error, we throw the original exception after these three retries. |
C.5. How can I use retryWhen for Exponential Backoff?
Exponential backoff produces retry attempts with a growing delay between each of the attempts, so as not to overload the source systems and risk an all-out crash. The rationale is that, if the source produces an error, it is already in an unstable state and is not likely to immediately recover from it. So blindly retrying immediately is likely to produce yet another error and add to the instability.
Since 3.3.4.RELEASE, Reactor comes with a builder for such a retry, to be used with Flux#retryWhen: Retry.backoff.
The following example showcases a simple use of the builder, with hooks logging messages right before and after the retry attempt delays. It delays retries and increases the delay between each attempt (pseudocode: delay = 100ms * 2^attempt_number_starting_at_zero):
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
.doOnError(e -> { (1)
errorCount.incrementAndGet();
System.out.println(e + " at " + LocalTime.now());
})
.retryWhen(Retry
.backoff(3, Duration.ofMillis(100)).jitter(0d) (2)
.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries())) (3)
.onRetryExhaustedThrow((spec, rs) -> rs.failure()) (4)
);
1 | We will log the time of errors emitted by the source and count them. |
2 | We configure an exponential backoff retry with at most 3 attempts and no jitter. |
3 | We also log the time at which the retry happens, and the retry attempt number (starting from 0). |
4 | By default, an Exceptions.retryExhausted exception would be thrown, with the last failure() as a cause.
Here we customize that to directly emit the cause as onError . |
When subscribed to, this fails and terminates after printing out the following:
java.lang.IllegalStateException: boom at 00:00:00.0
retried at 00:00:00.101, attempt 0 (1)
java.lang.IllegalStateException: boom at 00:00:00.101
retried at 00:00:00.304, attempt 1 (2)
java.lang.IllegalStateException: boom at 00:00:00.304
retried at 00:00:00.702, attempt 2 (3)
java.lang.IllegalStateException: boom at 00:00:00.702
1 | First retry after about 100ms |
2 | Second retry after about 200ms |
3 | Third retry after about 400ms |
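The timestamps in that output follow directly from the pseudocode formula. As a worked check, the following JDK-only sketch computes the per-attempt delay and the cumulative elapsed time. It is not Reactor's implementation: it ignores jitter and any maximum backoff cap, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reproduces the arithmetic delay = minBackoff * 2^attempt.
final class BackoffDelays {

    // Delay before the given retry attempt (attempts start at zero, no jitter).
    static Duration delayFor(Duration minBackoff, int attempt) {
        return minBackoff.multipliedBy(1L << attempt);
    }

    // Elapsed time since the first failure at which each retry fires.
    static List<Duration> cumulativeSchedule(Duration minBackoff, int maxAttempts) {
        List<Duration> schedule = new ArrayList<>();
        Duration elapsed = Duration.ZERO;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            elapsed = elapsed.plus(delayFor(minBackoff, attempt));
            schedule.add(elapsed);
        }
        return schedule;
    }

    public static void main(String[] args) {
        // With a 100ms base and 3 attempts the retries fire at 100ms, 300ms and 700ms.
        cumulativeSchedule(Duration.ofMillis(100), 3)
            .forEach(d -> System.out.println(d.toMillis() + "ms"));
    }
}
```

The computed 100ms, 300ms and 700ms line up with the logged 00:00:00.101, 00:00:00.304 and 00:00:00.702, the few extra milliseconds being scheduling overhead.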
C.6. How Do I Ensure Thread Affinity when I Use publishOn()?
As described in Schedulers, publishOn() can be used to switch execution contexts. The publishOn operator influences the threading context where the rest of the operators in the chain below it run, up to a new occurrence of publishOn. So the placement of publishOn is significant.
Consider the following example:
Flux<Integer> source = Sinks.many().unicast().onBackpressureBuffer().asFlux();
source.publishOn(scheduler1)
.map(i -> transform(i))
.publishOn(scheduler2)
.doOnNext(i -> processNext(i))
.subscribe();
The transform function in map() is run on a worker of scheduler1, and the processNext method in doOnNext() is run on a worker of scheduler2. Each subscription gets its own worker, so all elements pushed to the corresponding subscriber are published on the same Thread.
You can use single-threaded schedulers to ensure thread affinity for different stages in the chain or for different subscribers.
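The idea of pinning each stage to its own single thread can be sketched with the JDK alone. The example below is an analogy, not Reactor code: two single-threaded executors play the role of scheduler1 and scheduler2, and CompletableFuture stages play the role of the operators placed after each publishOn. The thread names stage-1 and stage-2 and the class name are assumptions made for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: each stage reports the thread it ran on.
final class StageAffinitySketch {

    static String[] run() {
        // One dedicated, named thread per stage.
        ExecutorService stage1 = Executors.newSingleThreadExecutor(r -> new Thread(r, "stage-1"));
        ExecutorService stage2 = Executors.newSingleThreadExecutor(r -> new Thread(r, "stage-2"));
        try {
            return CompletableFuture
                // First stage: always executed by the single thread of stage1.
                .supplyAsync(() -> Thread.currentThread().getName(), stage1)
                // Second stage: explicitly handed over to the single thread of stage2.
                .thenApplyAsync(first -> new String[] { first, Thread.currentThread().getName() }, stage2)
                .join();
        } finally {
            stage1.shutdown();
            stage2.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("first stage on " + threads[0] + ", second stage on " + threads[1]);
    }
}
```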
C.7. What Is a Good Pattern for Contextual Logging? (MDC)
Most logging frameworks allow contextual logging, letting users store variables that are reflected in the logging pattern, generally by way of a Map called the MDC ("Mapped Diagnostic Context").
This is one of the most recurring uses of ThreadLocal in Java, and as a consequence this pattern assumes that the code being logged is tied in a one-to-one relationship with a Thread.
That might have been a safe assumption to make before Java 8, but with the advent of functional programming elements in the Java language things have changed a bit…
Let’s take the example of an API that was imperative and used the template method pattern, then switches to a more functional style.
With the template method pattern, inheritance was at play. Now in its more functional approach, higher order functions are passed to define the "steps" of the algorithm.
Things are now more declarative than imperative, and that frees the library to make decisions about where each step should run.
For instance, knowing which steps of the underlying algorithm can be parallelized, the library can use an ExecutorService to execute some of the steps in parallel.
One concrete example of such a functional API is the Stream API introduced in Java 8 and its parallel() flavor.
Logging with an MDC in a parallel Stream is not a free lunch: one needs to ensure the MDC is captured and reapplied in each step.
The functional style enables such optimizations, because each step is thread-agnostic and referentially transparent, but it can break the MDC assumption of a single Thread.
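The broken assumption is easy to reproduce with the JDK alone. In the sketch below a plain ThreadLocal stands in for the MDC (real logging frameworks differ in detail), a single-threaded executor stands in for the library's worker threads, and the key value req-42 is made up. It shows that a value set on the calling thread is invisible on the worker unless it is captured and reapplied there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: a ThreadLocal used as a stand-in for an MDC.
final class MdcThreadLocalSketch {

    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Returns { value seen by the caller, value seen by the worker, value after reapplying }.
    static String[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            REQUEST_ID.set("req-42");
            String onCaller = REQUEST_ID.get();

            // The worker thread has its own (empty) slot, so it sees null.
            String onWorker = CompletableFuture.supplyAsync(REQUEST_ID::get, worker).join();

            // Capture on the caller, reapply on the worker, and clean up afterwards.
            String captured = REQUEST_ID.get();
            String reapplied = CompletableFuture.supplyAsync(() -> {
                REQUEST_ID.set(captured);
                try {
                    return REQUEST_ID.get();
                } finally {
                    REQUEST_ID.remove();
                }
            }, worker).join();

            return new String[] { onCaller, onWorker, reapplied };
        } finally {
            REQUEST_ID.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("caller=" + seen[0] + ", worker=" + seen[1] + ", reapplied=" + seen[2]);
    }
}
```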
The most idiomatic way of ensuring any kind of contextual information is accessible to all stages would be to pass that context around through the composition chain.
During the development of Reactor we encountered the same general class of problem, and we wanted to avoid this very hands-on and explicit approach.
This is why the Context was introduced: it propagates through the execution chain as long as Flux and Mono are used as the return value, by letting stages (operators) peek at the Context of their downstream stage.
So instead of using ThreadLocal, Reactor offers this map-like object that is tied to a Subscription and not a Thread.
Now that we’ve established that MDC "just working" is not the best assumption to make in a declarative API, how can we perform contextualized log statements in relation to events in a Reactive Stream (onNext, onError, and onComplete)?
This entry of the FAQ offers a possible intermediate solution when one wants to log in relation to these signals in a straightforward and explicit manner. Make sure to read the Adding a Context to a Reactive Sequence section beforehand, and especially how a write must happen towards the bottom of the operator chain for operators above it to see it.
To get contextual information from the Context to the MDC, the simplest way is to wrap logging statements in a doOnEach operator with a little bit of boilerplate code.
This boilerplate depends on both the logging framework/abstraction of your choice and the information you want to put in the MDC, so it has to be in your codebase.
The following is an example of such a helper function around a single MDC variable and focused on logging onNext events, using Java 9 enhanced Optional API:
public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
return signal -> {
if (!signal.isOnNext()) return; (1)
Optional<String> toPutInMdc = signal.getContext().getOrEmpty("CONTEXT_KEY"); (2)
toPutInMdc.ifPresentOrElse(tpim -> {
try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) { (3)
logStatement.accept(signal.get()); (4)
}
},
() -> logStatement.accept(signal.get())); (5)
};
}
1 | doOnEach signals include onComplete and onError . In this example we’re only interested in logging onNext |
2 | We will extract one interesting value from the Reactor Context (see The Context API section) |
3 | We use the MDCCloseable from SLF4J 2 in this example, allowing try-with-resource syntax for automatic cleanup of the MDC after the log statement is executed |
4 | Proper log statement is provided by the caller as a Consumer<T> (consumer of the onNext value) |
5 | In case the expected key wasn’t set in the Context we use the alternative path where nothing is put in the MDC |
Using this boilerplate code ensures that we are good citizens with the MDC: we set a key right before we execute a logging statement and remove it immediately after. There is no risk of polluting the MDC for subsequent logging statements.
Of course, this is a suggestion. You might be interested in extracting multiple values from the Context or in logging things in case of onError.
You might want to create additional helper methods for these cases or craft a single method that makes use of additional lambdas to cover more ground.
In any case, the usage of the preceding helper method could look like the following reactive web controller:
@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
String apiId = userId == null ? "" : userId; (1)
return restaurantService.byPrice(maxPrice)
.doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}", (2)
r.getName(), r.getPricePerPerson())))
.contextWrite(Context.of("CONTEXT_KEY", apiId)); (3)
}
1 | We need to get the contextual information from the request header to put it in the Context |
2 | Here we apply our helper method to the Flux , using doOnEach . Remember: operators see Context values defined below them. |
3 | We write the value from the header to the Context using the chosen key CONTEXT_KEY . |
In this configuration, the restaurantService can emit its data on a shared thread, yet the logs will still reference the correct X-UserId for each request.
For completeness, we can also see what an error-logging helper could look like:
public static Consumer<Signal<?>> logOnError(Consumer<Throwable> errorLogStatement) {
return signal -> {
if (!signal.isOnError()) return;
Optional<String> toPutInMdc = signal.getContext().getOrEmpty("CONTEXT_KEY");
toPutInMdc.ifPresentOrElse(tpim -> {
try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) {
errorLogStatement.accept(signal.getThrowable());
}
},
() -> errorLogStatement.accept(signal.getThrowable()));
};
}
Nothing much has changed, except for the fact that we check that the Signal is effectively an onError, and that we provide said error (a Throwable) to the log statement lambda.
Applying this helper in the controller is very similar to what we’ve done before:
@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
String apiId = userId == null ? "" : userId;
return restaurantService.byPrice(maxPrice)
.doOnEach(logOnNext(v -> LOG.info("found restaurant {}", v)))
.doOnEach(logOnError(e -> LOG.error("error when searching restaurants", e))) (1)
.contextWrite(Context.of("CONTEXT_KEY", apiId));
}
1 | In case the restaurantService emits an error, it will be logged with MDC context here |
Appendix D: Reactor-Extra
The reactor-extra artifact contains additional operators and utilities that are for users of reactor-core with advanced needs, or incubating operators.
As this is a separate artifact, you need to explicitly add it to your build. The following example shows how to do so in Gradle:
dependencies {
compile 'io.projectreactor:reactor-core'
compile 'io.projectreactor.addons:reactor-extra' (1)
}
1 | Add the reactor extra artifact in addition to core. See Getting Reactor for details about why you do not need to specify a version if you use the BOM, usage in Maven, and other details. |
D.1. TupleUtils and Functional Interfaces
The reactor.function package contains functional interfaces that complement the Java 8 Function, Predicate, and Consumer interfaces, for three to eight values.
TupleUtils offers static methods that act as a bridge between lambdas of those functional interfaces and a similar interface on the corresponding Tuple.
This lets you easily work with independent parts of any Tuple, as the following example shows:
.map(tuple -> {
String firstName = tuple.getT1();
String lastName = tuple.getT2();
String address = tuple.getT3();
return new Customer(firstName, lastName, address);
});
You can rewrite the preceding example as follows:
.map(TupleUtils.function(Customer::new)); (1)
1 | (as the Customer constructor conforms to the Function3 functional interface signature) |
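To show what such a bridge does under the hood, here is a simplified JDK-only sketch. Triple, Function3 and function in this snippet are hypothetical stand-ins written for the illustration, not Reactor's actual Tuple3, Function3 or TupleUtils classes, and the Customer class and sample values are likewise invented.

```java
import java.util.function.Function;

// Hypothetical demo class: adapts a three-argument function to a function over a tuple.
final class TupleBridgeSketch {

    // Simplified stand-in for a three-element tuple.
    static final class Triple<A, B, C> {
        final A t1;
        final B t2;
        final C t3;

        Triple(A t1, B t2, C t3) {
            this.t1 = t1;
            this.t2 = t2;
            this.t3 = t3;
        }
    }

    // Simplified stand-in for a three-argument functional interface.
    @FunctionalInterface
    interface Function3<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    // The bridge: unpacks the tuple and forwards its parts to the three-argument function.
    static <A, B, C, R> Function<Triple<A, B, C>, R> function(Function3<A, B, C, R> f) {
        return tuple -> f.apply(tuple.t1, tuple.t2, tuple.t3);
    }

    static final class Customer {
        final String firstName;
        final String lastName;
        final String address;

        Customer(String firstName, String lastName, String address) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.address = address;
        }
    }

    static Customer demo() {
        // The constructor reference matches the three-argument shape, so no unpacking lambda is needed.
        Function<Triple<String, String, String>, Customer> toCustomer = function(Customer::new);
        return toCustomer.apply(new Triple<>("Ada", "Lovelace", "London"));
    }

    public static void main(String[] args) {
        Customer c = demo();
        System.out.println(c.firstName + " " + c.lastName + ", " + c.address);
    }
}
```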
D.2. Math Operators With MathFlux
The reactor.math package contains a MathFlux specialized version of Flux that offers mathematical operators, including max, min, sumInt, averageDouble, and others.
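A short sketch of three of these operators follows. It assumes both reactor-core and reactor-extra are on the classpath and that the static methods are called with a Flux of Integer as shown; the class name MathFluxDemo and the sample numbers are made up for the example.

```java
import reactor.core.publisher.Flux;
import reactor.math.MathFlux;

// Hypothetical demo class: each method reduces a small Flux to a single numeric result.
final class MathFluxDemo {

    // Largest element of the sequence.
    static Integer max() {
        return MathFlux.max(Flux.just(2, 9, 4)).block();
    }

    // Sum of the elements, as an int.
    static Integer sum() {
        return MathFlux.sumInt(Flux.just(2, 9, 4)).block();
    }

    // Arithmetic mean of the elements, as a double.
    static Double average() {
        return MathFlux.averageDouble(Flux.just(2, 9, 4)).block();
    }

    public static void main(String[] args) {
        System.out.println("max=" + max() + ", sum=" + sum() + ", average=" + average());
    }
}
```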