1. About the Documentation

This section provides a brief overview of Reactor reference documentation. You do not need to read this guide in a linear fashion. Each piece stands on its own, though they often refer to other pieces.

The Reactor reference guide is available as HTML documents. The latest copy is available at https://bcyde.com/docs/core/release/reference/index.html

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.

1.2. Contributing to the Documentation

The reference guide is written in Asciidoc, and you can find its sources at https://github.com/reactor/reactor-core/tree/main/docs/asciidoc.

If you have an improvement or a suggestion, we will be happy to receive a pull request from you!

We recommend that you check out a local copy of the repository so that you can generate the documentation by running the asciidoctor gradle task and checking the rendering. Some of the sections rely on included files, so GitHub rendering is not always complete.

To facilitate documentation edits, most sections have a link at the end that opens an edit UI directly on GitHub for the main source file for that section. These links are only present in the HTML5 version of this reference guide. They look like the following: Suggest Edit to About the Documentation.

1.3. Getting Help

You can reach out for help in several ways with Reactor:

  • Get in touch with the community on Gitter.

  • Ask a question on stackoverflow.com at project-reactor.

  • Report bugs in GitHub issues. We closely monitor the following repositories: reactor-core (which covers the essential features) and reactor-addons (which covers reactor-test and adapters issues).

All of Reactor is open source, including this documentation. If you find problems with the docs or if you want to improve them, please get involved.

1.4. Where to Go from Here

2. Getting Started

This section contains information that should help you get going with Reactor. It includes the following sections:

2.1. Introducing Reactor

Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs — Flux (for [N] elements) and Mono (for [0|1] elements) — and extensively implements the Reactive Streams specification.

Reactor also supports non-blocking inter-process communication with the reactor-netty project. Suited for Microservices Architecture, Reactor Netty offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP. Reactive encoding and decoding are fully supported.

2.2. Prerequisites

Reactor Core runs on Java 8 and above.

It has a transitive dependency on org.reactivestreams:reactive-streams:1.0.3.

Android Support
  • Reactor 3 does not officially support or target Android (consider using RxJava 2 if such support is a strong requirement).

  • However, it should work fine with Android SDK 26 (Android O) and above.

  • It will likely work fine with Android SDK 21 (Android 5.0) and above when desugaring is enabled. See https://developer.android.com/studio/write/java8-support#library-desugaring

  • We are open to evaluating changes that benefit Android support in a best-effort fashion. However, we cannot make guarantees. Each decision must be made on a case-by-case basis.

2.3. Understanding the BOM and versioning scheme

Reactor 3 uses a BOM (Bill of Materials) model (since reactor-core 3.0.4, with the Aluminium release train). This curated list groups artifacts that are meant to work well together, providing the relevant versions despite potentially divergent versioning schemes in these artifacts.

Note that the versioning scheme changed between 3.3.x and 3.4.x (Dysprosium and Europium).

Artifacts follow a versioning scheme of MAJOR.MINOR.PATCH-QUALIFIER while the BOM is versioned using a CalVer inspired scheme of YYYY.MINOR.PATCH-QUALIFIER, where:

  • MAJOR is the current generation of Reactor, where each new generation can bring fundamental changes to the structure of the project (which might imply a more significant migration effort)

  • YYYY is the year of the first GA release in a given release cycle (like 3.4.0 for 3.4.x)

  • .MINOR is a 0-based number incrementing with each new release cycle

    • in the case of projects, it generally reflects wider changes and can indicate a moderate migration effort

    • in the case of the BOM it allows discerning between release cycles in case two get first released the same year

  • .PATCH is a 0-based number incremented with each service release

  • -QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)

The first release cycle to follow that convention is thus 2020.0.x, codename Europium. The scheme uses the following qualifiers (note the use of dash separator), in order:

  • -M1..-M9: milestones (we don’t expect more than 9 per service release)

  • -RC1..-RC9: release candidates (we don’t expect more than 9 per service release)

  • -SNAPSHOT: snapshots

  • no qualifier for GA releases

Snapshots appear higher in the order above because, conceptually, they’re always “the freshest pre-release” of any given PATCH. Even though the first deployed artifact of a PATCH cycle will always be a -SNAPSHOT, a similarly named but more up-to-date snapshot would also get released after, e.g., a milestone or between release candidates.

Each release cycle is also given a codename, in continuity with the previous codename-based scheme, which can be used to reference it more informally (like in discussions, blog posts, etc…​). The codenames represent what would traditionally be the MAJOR.MINOR number. They (mostly) come from the Periodic Table of Elements, in increasing alphabetical order.

Up until Dysprosium, the BOM was versioned using a release train scheme with a codename followed by a qualifier, and the qualifiers were slightly different. For example: Aluminium-RELEASE (first GA release, would now be something like YYYY.0.0), Bismuth-M1, Californium-SR1 (service release, would now be something like YYYY.0.1), Dysprosium-RC1, Dysprosium-BUILD-SNAPSHOT (after each patch, we’d go back to the same snapshot version; would now be something like YYYY.0.X-SNAPSHOT, so we get 1 snapshot per PATCH).

2.4. Getting Reactor

As mentioned earlier, the easiest way to use Reactor in your project is to use the BOM and add the relevant dependencies to your project. Note that, when you add such a dependency, you must omit the version so that the version gets picked up by the BOM.

However, if you need to force the use of a specific artifact’s version, you can specify it when adding your dependency, as you usually would. You can also forgo the BOM entirely and declare dependencies by their artifact versions.

As of this version (reactor-core 3.5.6), the latest stable BOM in the associated release train line is 2022.0.7, which is what is used in snippets below. There might be newer versions since then (including snapshots, milestones, and new release train lines), see https://bcyde.com/docs for the latest artifacts and BOMs.

2.4.1. Maven Installation

Maven natively supports the BOM concept. First, you need to import the BOM by adding the following snippet to your pom.xml:

<dependencyManagement> (1)
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2022.0.7</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
1 Notice the dependencyManagement tag. This is in addition to the regular dependencies section.

If the top section (dependencyManagement) already exists in your pom, add only the contents.

Next, add your dependencies to the relevant reactor projects, as usual, except without a <version>, as follows:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> (1)
        (2)
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> (3)
        <scope>test</scope>
    </dependency>
</dependencies>
1 Dependency on the core library.
2 No version tag here.
3 reactor-test provides facilities to unit test reactive streams.

2.4.2. Gradle Installation

Prior to version 5.0, Gradle has no core support for Maven BOMs, but you can use Spring’s gradle-dependency-management plugin.

First, apply the plugin from the Gradle Plugin Portal, as follows:

plugins {
    id "io.spring.dependency-management" version "1.0.7.RELEASE" (1)
}
1 As of this writing, 1.0.7.RELEASE is the latest version of the plugin. Check for updates.

Then use it to import the BOM, as follows:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2022.0.7"
     }
}

Finally, add a dependency to your project, without a version number, as follows:

dependencies {
     implementation 'io.projectreactor:reactor-core' (1)
}
1 There is no third : separated section for the version. It is taken from the BOM.

Since Gradle 5.0, you can use the native Gradle support for BOMs:

dependencies {
     implementation platform('io.projectreactor:reactor-bom:2022.0.7')
     implementation 'io.projectreactor:reactor-core' (1)
}
1 There is no third : separated section for the version. It is taken from the BOM.

2.4.3. Milestones and Snapshots

Milestones and developer previews are distributed through the Spring Milestones repository rather than Maven Central. To add it to your build configuration file, use the following snippet:

Example 1. Milestones in Maven
<repositories>
	<repository>
		<id>spring-milestones</id>
		<name>Spring Milestones Repository</name>
		<url>https://repo.spring.io/milestone</url>
	</repository>
</repositories>

For Gradle, use the following snippet:

Example 2. Milestones in Gradle
repositories {
  maven { url 'https://repo.spring.io/milestone' }
  mavenCentral()
}

Similarly, snapshots are also available in a separate dedicated repository, as the following example shows:

Example 3. -SNAPSHOTs in Maven
<repositories>
	<repository>
		<id>spring-snapshots</id>
		<name>Spring Snapshot Repository</name>
		<url>https://repo.spring.io/snapshot</url>
	</repository>
</repositories>
Example 4. -SNAPSHOTs in Gradle
repositories {
  maven { url 'https://repo.spring.io/snapshot' }
  mavenCentral()
}

2.5. Support and policies

2.5.1. Do you have a question?

Search Stack Overflow first; discuss if necessary

If you’re unsure why something isn’t working or wondering if there is a better way of doing it, please check on Stack Overflow first and, if necessary, start a discussion. Use relevant tags among the ones we monitor for that purpose:

If you prefer real-time discussion, we also have a few Gitter channels:

  • reactor is the historical most active one, where most of the community can help

  • reactor-core is intended for more advanced pinpointed discussions around the inner workings of the library

  • reactor-netty is intended for netty-specific questions

Refer to each project’s README for potential additional sources of information.

We generally discourage opening GitHub issues for questions, in favor of the two channels above.

2.5.2. Our policy on deprecations

When dealing with deprecations, given a version A.B.C, we’ll ensure that:

  • deprecations introduced in version A.B.0 will be removed no sooner than version A.B+1.0

  • deprecations introduced in version A.B.1+ will be removed no sooner than version A.B+2.0

  • we’ll strive to mention the following in the deprecation javadoc:

    • target minimum version for removal

    • pointers to replacements for the deprecated method

    • version in which the method was deprecated

This policy is officially in effect as of January 2021, for all modules in 2020.0 BOMs and newer release trains, as well as Dysprosium releases after Dysprosium-SR15.
Deprecation removal targets are not a hard commitment, and the deprecated methods could live on further than these minimum target GA versions (i.e., only the most problematic deprecated methods will be removed aggressively).
That said, deprecated code that has outlived its minimum removal target version may be removed in any subsequent release (including patch releases, aka service releases) without further notice. So users should still strive to update their code as early as possible.

2.5.3. Active Development

The following table summarizes the development status for the various Reactor release trains:

Version Supported

2020.0.0 (codename Europium) (core 3.4.x, netty 1.0.x)

Dysprosium Train (core 3.3.x, netty 0.9.x)

Californium and below (core < 3.3, netty < 0.9)

Reactor 1.x and 2.x Generations

3. Introduction to Reactive Programming

Reactor is an implementation of the Reactive Programming paradigm, which can be summed up as follows:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
— https://en.wikipedia.org/wiki/Reactive_programming

As a first step in the direction of reactive programming, Microsoft created the Reactive Extensions (Rx) library in the .NET ecosystem. Then RxJava implemented reactive programming on the JVM. As time went on, a standardization for Java emerged through the Reactive Streams effort, a specification that defines a set of interfaces and interaction rules for reactive libraries on the JVM. Its interfaces have been integrated into Java 9 under the Flow class.

The reactive programming paradigm is often presented in object-oriented languages as an extension of the Observer design pattern. You can also compare the main reactive streams pattern with the familiar Iterator design pattern, as there is a duality to the Iterable-Iterator pair in all of these libraries. One major difference is that, while an Iterator is pull-based, reactive streams are push-based.

Using an iterator is an imperative programming pattern, even though the method of accessing values is solely the responsibility of the Iterable. Indeed, it is up to the developer to choose when to access the next() item in the sequence. In reactive streams, the equivalent of the above pair is Publisher-Subscriber. But it is the Publisher that notifies the Subscriber of newly available values as they come, and this push aspect is the key to being reactive. Also, operations applied to pushed values are expressed declaratively rather than imperatively: The programmer expresses the logic of the computation rather than describing its exact control flow.

In addition to pushing values, the error-handling and completion aspects are also covered in a well defined manner. A Publisher can push new values to its Subscriber (by calling onNext) but can also signal an error (by calling onError) or completion (by calling onComplete). Both errors and completion terminate the sequence. This can be summed up as follows:

onNext x 0..N [onError | onComplete]
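This signal protocol can be observed with the JDK's java.util.concurrent.Flow API, whose interfaces mirror Reactive Streams. The following sketch (class and method names are illustrative, not from Reactor) pushes three onNext signals followed by a terminal onComplete:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SignalProtocolDemo {

    // Subscribes to a publisher of three values and records the onNext signals
    // until a terminal signal (onError or onComplete) ends the sequence.
    public static List<Integer> receiveAll() throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE);     // unbounded demand for this demo
                }
                public void onNext(Integer item) { // 0..N value signals
                    received.add(item);
                }
                public void onError(Throwable t) { // terminal signal: failure
                    terminated.countDown();
                }
                public void onComplete() {         // terminal signal: success
                    terminated.countDown();
                }
            });
            publisher.submit(1);
            publisher.submit(2);
            publisher.submit(3);
        }                                          // close() triggers onComplete
        terminated.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(receiveAll()); // [1, 2, 3]
    }
}
```

Note that once onComplete is delivered, no further onNext signals may follow, which is exactly the 0..N-then-terminal contract summarized above.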

This approach is very flexible. The pattern supports use cases where there is no value, one value, or n values (including an infinite sequence of values, such as the continuing ticks of a clock).

But why do we need such an asynchronous reactive library in the first place?

3.1. Blocking Can Be Wasteful

Modern applications can reach huge numbers of concurrent users, and, even though the capabilities of modern hardware have continued to improve, performance of modern software is still a key concern.

There are, broadly, two ways one can improve a program’s performance:

  • parallelize to use more threads and more hardware resources.

  • seek more efficiency in how current resources are used.

Usually, Java developers write programs by using blocking code. This practice is fine until there is a performance bottleneck. Then it is time to introduce additional threads, running similar blocking code. But this scaling in resource utilization can quickly introduce contention and concurrency problems.

Worse still, blocking wastes resources. If you look closely, as soon as a program involves some latency (notably I/O, such as a database request or a network call), resources are wasted because threads (possibly many threads) now sit idle, waiting for data.

So the parallelization approach is not a silver bullet. It is necessary to access the full power of the hardware, but it is also complex to reason about and susceptible to resource wasting.

3.2. Asynchronicity to the Rescue?

The second approach mentioned earlier, seeking more efficiency, can be a solution to the resource wasting problem. By writing asynchronous, non-blocking code, you let the execution switch to another active task that uses the same underlying resources and later come back to the current process when the asynchronous processing has finished.

But how can you produce asynchronous code on the JVM? Java offers two models of asynchronous programming:

  • Callbacks: Asynchronous methods do not have a return value but take an extra callback parameter (a lambda or anonymous class) that gets called when the result is available. A well known example is Swing’s EventListener hierarchy.

  • Futures: Asynchronous methods immediately return a Future<T>. The asynchronous process computes a T value, but the Future object wraps access to it. The value is not immediately available, and the object can be polled until the value is available. For instance, an ExecutorService running Callable<T> tasks uses Future objects.
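The Future model above can be sketched in a few lines with the JDK's own ExecutorService (the task and its value are made up for illustration): the asynchronous computation starts immediately, and the caller either polls the Future or blocks on get().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureDemo {

    // Submits a Callable<Integer> and retrieves its result through the Future.
    public static int fetchStat() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The asynchronous process computes a value; the Future wraps access to it.
            Future<Integer> future = executor.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(100); // simulated latency (e.g. a remote call)
                return 42;
            });
            while (!future.isDone()) {            // the Future can be polled...
                TimeUnit.MILLISECONDS.sleep(10);
            }
            return future.get();                  // ...or get() blocks until the value is there
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchStat()); // 42
    }
}
```

The polling loop and the blocking get() foreshadow the limitations discussed next: the caller either busy-waits or blocks, and a single Future carries exactly one value.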

Are these techniques good enough? Not for every use case, and both approaches have limitations.

Callbacks are hard to compose together, quickly leading to code that is difficult to read and maintain (known as “Callback Hell”).

Consider an example: showing the top five favorites from a user on the UI, or suggestions if she does not have a favorite. This goes through three services (one gives favorite IDs, the second fetches favorite details, and the third offers suggestions with details), as follows:

Example 5. Example of Callback Hell
userService.getFavorites(userId, new Callback<List<String>>() { (1)
  public void onSuccess(List<String> list) { (2)
    if (list.isEmpty()) { (3)
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { (4)
          UiUtils.submitOnUiThread(() -> { (5)
            list.stream()
                .limit(5)
                .forEach(uiList::show); (6)
            });
        }

        public void onError(Throwable error) { (7)
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() (8)
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, (9)
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
1 We have callback-based services: a Callback interface with a method invoked when the asynchronous process succeeds and one invoked when an error occurs.
2 The first service invokes its callback with the list of favorite IDs.
3 If the list is empty, we must go to the suggestionService.
4 The suggestionService gives a List<Favorite> to a second callback.
5 Since we deal with a UI, we need to ensure our consuming code runs in the UI thread.
6 We use a Java 8 Stream to limit the number of suggestions processed to five, and we show them in a graphical list in the UI.
7 At each level, we deal with errors the same way: We show them in a popup.
8 Back to the favorite ID level. If the service returned a full list, we need to go to the favoriteService to get detailed Favorite objects. Since we want only five, we first stream the list of IDs to limit it to five.
9 Once again, a callback. This time we get a fully-fledged Favorite object that we push to the UI inside the UI thread.

That is a lot of code, and it is a bit hard to follow and has repetitive parts. Consider its equivalent in Reactor:

Example 6. Example of Reactor code equivalent to callback code
userService.getFavorites(userId) (1)
           .flatMap(favoriteService::getDetails) (2)
           .switchIfEmpty(suggestionService.getSuggestions()) (3)
           .take(5) (4)
           .publishOn(UiUtils.uiThreadScheduler()) (5)
           .subscribe(uiList::show, UiUtils::errorPopup); (6)
1 We start with a flow of favorite IDs.
2 We asynchronously transform these into detailed Favorite objects (flatMap). We now have a flow of Favorite.
3 If the flow of Favorite is empty, we switch to a fallback through the suggestionService.
4 We are only interested in, at most, five elements from the resulting flow.
5 At the end, we want to process each piece of data in the UI thread.
6 We trigger the flow by describing what to do with the final form of the data (show it in a UI list) and what to do in case of an error (show a popup).

What if you want to ensure the favorite IDs are retrieved in less than 800ms or, if it takes longer, get them from a cache? In the callback-based code, that is a complicated task. In Reactor it becomes as easy as adding a timeout operator in the chain, as follows:

Example 7. Example of Reactor code with timeout and fallback
userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) (1)
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) (2)
           .flatMap(favoriteService::getDetails) (3)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
1 If the part above emits nothing for more than 800ms, propagate an error.
2 In case of an error, fall back to the cacheService.
3 The rest of the chain is similar to the previous example.

Futures are a bit better than callbacks, but they still do not do well at composition, despite the improvements brought in Java 8 by CompletableFuture. Orchestrating multiple Future objects together is doable but not easy. Also, Future has other problems:

  • It is easy to end up in another blocking situation with Future objects by calling the get() method.

  • They do not support lazy computation.

  • They lack support for multiple values and advanced error handling.

Consider another example: We get a list of IDs from which we want to fetch a name and a statistic and combine these pair-wise, all of it asynchronously. The following example does that with a list of type CompletableFuture:

Example 8. Example of CompletableFuture combination
CompletableFuture<List<String>> ids = ifhIds(); (1)

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { (2)
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { (3)
				CompletableFuture<String> nameTask = ifhName(i); (4)
				CompletableFuture<Integer> statTask = ifhStat(i); (5)

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); (6)
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); (7)
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); (8)
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) (9)
			.collect(Collectors.toList()));
});

List<String> results = result.join(); (10)
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
1 We start off with a future that gives us a list of id values to process.
2 We want to start some deeper asynchronous processing once we get the list.
3 For each element in the list:
4 Asynchronously get the associated name.
5 Asynchronously get the associated statistic.
6 Combine both results.
7 We now have a list of futures that represent all the combination tasks. To execute these tasks, we need to convert the list to an array.
8 Pass the array to CompletableFuture.allOf, which outputs a Future that completes when all tasks have completed.
9 The tricky bit is that allOf returns CompletableFuture<Void>, so we reiterate over the list of futures, collecting their results by using join() (which, here, does not block, since allOf ensures the futures are all done).
10 Once the whole asynchronous pipeline has been triggered, we wait for it to be processed and return the list of results that we can assert.

Since Reactor has more combination operators out of the box, this process can be simplified, as follows:

Example 9. Example of Reactor code equivalent to future code
Flux<String> ids = ifhrIds(); (1)

Flux<String> combinations =
		ids.flatMap(id -> { (2)
			Mono<String> nameTask = ifhrName(id); (3)
			Mono<Integer> statTask = ifhrStat(id); (4)

			return nameTask.zipWith(statTask, (5)
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); (6)

List<String> results = result.block(); (7)
assertThat(results).containsExactly( (8)
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
1 This time, we start from an asynchronously provided sequence of ids (a Flux<String>).
2 For each element in the sequence, we asynchronously process it (inside the function that is the body of the flatMap call) twice.
3 Get the associated name.
4 Get the associated statistic.
5 Asynchronously combine the two values.
6 Aggregate the values into a List as they become available.
7 In production, we would continue working with the Flux asynchronously by further combining it or subscribing to it. Most likely, we would return the result Mono. Since we are in a test, we instead block, waiting for the processing to finish, and then directly return the aggregated list of values.
8 Assert the result.

The perils of using callbacks and Future objects are similar and are what reactive programming addresses with the Publisher-Subscriber pair.

3.3. From Imperative to Reactive Programming

Reactive libraries, such as Reactor, aim to address these drawbacks of “classic” asynchronous approaches on the JVM while also focusing on a few additional aspects:

  • Composability and readability

  • Data as a flow manipulated with a rich vocabulary of operators

  • Nothing happens until you subscribe

  • Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high

  • High level but high value abstraction that is concurrency-agnostic

3.3.1. Composability and Readability

By “composability”, we mean the ability to orchestrate multiple asynchronous tasks, in which we use results from previous tasks to feed input to subsequent ones. Alternatively, we can run several tasks in a fork-join style. In addition, we can reuse asynchronous tasks as discrete components in a higher-level system.

The ability to orchestrate tasks is tightly coupled to the readability and maintainability of code. As the layers of asynchronous processes increase in both number and complexity, being able to compose and read code becomes increasingly difficult. As we saw, the callback model is simple, but one of its main drawbacks is that, for complex processes, you need to have a callback executed from a callback, itself nested inside another callback, and so on. That mess is known as “Callback Hell”. As you can guess (or know from experience), such code is pretty hard to go back to and reason about.

Reactor offers rich composition options, wherein code mirrors the organization of the abstract process, and everything is generally kept at the same level (nesting is minimized).

3.3.2. The Assembly Line Analogy

You can think of data processed by a reactive application as moving through an assembly line. Reactor is both the conveyor belt and the workstations. The raw material pours from a source (the original Publisher) and ends up as a finished product ready to be pushed to the consumer (or Subscriber).

The raw material can go through various transformations and other intermediary steps or be part of a larger assembly line that aggregates intermediate pieces together. If there is a glitch or clogging at one point (perhaps boxing the products takes a disproportionately long time), the afflicted workstation can signal upstream to limit the flow of raw material.

3.3.3. Operators

In Reactor, operators are the workstations in our assembly analogy. Each operator adds behavior to a Publisher and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher, as we will see shortly.

Understanding that operators create new instances can help you avoid a common mistake that would lead you to believe that an operator you used in your chain is not being applied. See this item in the FAQ.
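The pitfall can be sketched without Reactor itself. In the following toy source type (purely illustrative, not Reactor's API), each operator wraps the upstream in a new immutable instance, so discarding the returned instance discards the added behavior:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Minimal operator-chaining sketch: map() does not mutate the source,
// it decorates the upstream in a brand new Source instance.
interface Source<T> {
    void subscribe(Consumer<T> consumer);

    default <R> Source<R> map(Function<T, R> mapper) {
        Source<T> upstream = this; // the previous step in the chain
        return consumer -> upstream.subscribe(t -> consumer.accept(mapper.apply(t)));
    }
}

public class OperatorChainDemo {
    public static List<String> run() {
        List<String> out = new ArrayList<>();
        Source<Integer> source = consumer -> { consumer.accept(1); consumer.accept(2); };

        source.map(i -> i * 10);       // common mistake: returned instance ignored, no effect

        source.map(i -> i * 10)        // correct: keep chaining on each returned instance
              .map(i -> "v" + i)
              .subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [v10, v20]
    }
}
```

The first map call is silently lost because its result is never subscribed to, which is exactly the mistake described above.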

While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries, such as Reactor, is the rich vocabulary of operators that they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.

3.3.4. Nothing Happens Until You subscribe()

In Reactor, when you write a Publisher chain, data does not start pumping into it by default. Instead, you create an abstract description of your asynchronous process (which can help with reusability and composition).

By the act of subscribing, you tie the Publisher to a Subscriber, which triggers the flow of data in the whole chain. This is achieved internally by a single request signal from the Subscriber that is propagated upstream, all the way back to the source Publisher.
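As a small illustration of this laziness (a sketch, assuming reactor-core is on the classpath), a side effect attached with doOnNext fires only once subscribe() is called:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

List<Integer> seen = new ArrayList<>();
// Assembling the chain only describes the processing; no data moves yet.
Flux<Integer> flux = Flux.range(1, 3).doOnNext(seen::add);
System.out.println(seen); // [] because nothing happened yet
flux.subscribe();         // triggers a request that propagates to the source
System.out.println(seen); // [1, 2, 3]
```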

3.3.5. Backpressure

Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.

The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.

Intermediate operators can also change the request in-transit. Imagine a buffer operator that groups elements in batches of ten. If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoid request(1) round-trips and are beneficial if producing the elements before they are requested is not too costly.

This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But if the elements are not ready, they get pushed by the upstream whenever they are produced.

3.3.6. Hot vs Cold

The Rx family of reactive libraries distinguishes two broad categories of reactive sequences: hot and cold. This distinction mainly has to do with how the reactive stream reacts to subscribers:

  • A Cold sequence starts anew for each Subscriber, including at the source of data. For example, if the source wraps an HTTP call, a new HTTP request is made for each subscription.

  • A Hot sequence does not start from scratch for each Subscriber. Rather, late subscribers receive signals emitted after they subscribed. Note, however, that some hot reactive streams can cache or replay the history of emissions totally or partially. From a general perspective, a hot sequence can even emit when no subscriber is listening (an exception to the “nothing happens before you subscribe” rule).

For more information on hot vs cold in the context of Reactor, see this reactor-specific section.
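To make the distinction concrete, here is a sketch (assuming reactor-core 3.4+ is on the classpath, where the Sinks API offers a simple way to build a hot publisher): the cold sequence replays from the start for each subscriber, while the hot one only delivers what is emitted after subscription.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Cold: every subscriber re-triggers the source and gets the full sequence.
Flux<Integer> cold = Flux.range(1, 2);
List<Integer> first = new ArrayList<>();
List<Integer> second = new ArrayList<>();
cold.subscribe(first::add);  // receives 1, 2
cold.subscribe(second::add); // receives 1, 2 again, from a fresh start

// Hot: late subscribers only see what is emitted after they subscribe.
Sinks.Many<Integer> sink = Sinks.many().multicast().directBestEffort();
Flux<Integer> hot = sink.asFlux();
List<Integer> early = new ArrayList<>();
List<Integer> late = new ArrayList<>();
hot.subscribe(early::add);
sink.tryEmitNext(1);         // only the early subscriber sees 1
hot.subscribe(late::add);
sink.tryEmitNext(2);         // both subscribers see 2
```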

4. Reactor Core Features

The Reactor project’s main artifact is reactor-core, a reactive library that focuses on the Reactive Streams specification and targets Java 8.

Reactor introduces composable reactive types that implement Publisher but also provide a rich vocabulary of operators: Flux and Mono. A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result.

This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing. For instance, an HTTP request produces only one response, so there is not much sense in doing a count operation. Expressing the result of such an HTTP call as a Mono<HttpResponse> thus makes more sense than expressing it as a Flux<HttpResponse>, as it offers only operators that are relevant to a context of zero items or one item.

Operators that change the maximum cardinality of the processing also switch to the relevant type. For instance, the count operator exists in Flux, but it returns a Mono<Long>.
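For instance (a sketch, assuming reactor-core is on the classpath), counting a Flux yields a Mono, since the result has a cardinality of one:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// The cardinality drops from 0..N to exactly one value, so the type switches.
Mono<Long> count = Flux.range(1, 5).count();
// block() is used here only to extract the value for demonstration purposes.
Long n = count.block();
System.out.println(n); // 5
```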

4.1. Flux, an Asynchronous Sequence of 0-N Items

The following image shows how a Flux transforms items:

Flux

A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.

With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence (not particularly useful, except for tests around cancellation). Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.

4.2. Mono, an Asynchronous 0-1 Result

The following image shows how a Mono transforms one item:

Mono

A Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).

Most Mono implementations are expected to immediately call onComplete on their Subscriber after having called onNext. Mono.never() is an outlier: it does not emit any signal, which is not technically forbidden although not terribly useful outside of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.

Mono offers only a subset of the operators that are available for a Flux, and some operators (notably those that combine the Mono with another Publisher) switch to a Flux. For example, Mono#concatWith(Publisher) returns a Flux while Mono#then(Mono) returns another Mono.

Note that you can use a Mono to represent no-value asynchronous processes that only have the concept of completion (similar to a Runnable). To create one, you can use an empty Mono<Void>.
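As a quick sketch (assuming reactor-core is on the classpath), Mono.fromRunnable produces such a completion-only Mono<Void>:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

AtomicBoolean done = new AtomicBoolean(false);
// A Mono<Void> carries no value: subscribers only observe completion (or error).
Mono<Void> completion = Mono.fromRunnable(() -> done.set(true));
completion.subscribe(); // runs the task; onComplete is the only meaningful signal
System.out.println(done.get()); // true
```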

4.3. Simple Ways to Create a Flux or Mono and Subscribe to It

The easiest way to get started with Flux and Mono is to use one of the numerous factory methods found in their respective classes.

For instance, to create a sequence of String, you can either enumerate them or put them in a collection and create the Flux from it, as follows:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Other examples of factory methods include the following:

Mono<String> noData = Mono.empty(); (1)

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
1 Notice the factory method honors the generic type even though it has no value.
2 The first parameter is the start of the range, while the second parameter is the number of items to produce.

When it comes to subscribing, Flux and Mono make use of Java 8 lambdas. You have a wide choice of .subscribe() variants that take lambdas for different combinations of callbacks, as shown in the following method signatures:

Example 10. Lambda-based subscribe variants for Flux
subscribe(); (1)

subscribe(Consumer<? super T> consumer); (2)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); (3)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); (4)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); (5)
1 Subscribe and trigger the sequence.
2 Do something with each produced value.
3 Deal with values but also react to an error.
4 Deal with values and errors but also run some code when the sequence successfully completes.
5 Deal with values and errors and successful completion but also do something with the Subscription produced by this subscribe call.
These variants return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel-and-clean-up behavior is represented in Reactor by the general-purpose Disposable interface.

4.3.1. subscribe Method Examples

This section contains minimal examples of each of the five signatures for the subscribe method. The following code shows an example of the basic method with no arguments:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
1 Set up a Flux that produces three values when a subscriber attaches.
2 Subscribe in the simplest way.

The preceding code produces no visible output, but it does work. The Flux produces three values. If we provide a lambda, we can make the values visible. The next example for the subscribe method shows one way to make the values appear:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
1 Set up a Flux that produces three values when a subscriber attaches.
2 Subscribe with a subscriber that will print the values.

The preceding code produces the following output:

1
2
3

To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:

Flux<Integer> ints = Flux.range(1, 4) (1)
      .map(i -> { (2)
        if (i <= 3) return i; (3)
        throw new RuntimeException("Got to 4"); (4)
      });
ints.subscribe(i -> System.out.println(i), (5)
      error -> System.err.println("Error: " + error));
1 Set up a Flux that produces four values when a subscriber attaches.
2 We need a map so that we can handle some values differently.
3 For most values, return the value.
4 For one value, force an error.
5 Subscribe with a subscriber that includes an error handler.

We now have two lambda expressions: one for the content we expect and one for errors. The preceding code produces the following output:

1
2
3
Error: java.lang.RuntimeException: Got to 4

The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:

Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done")); (2)
1 Set up a Flux that produces four values when a subscriber attaches.
2 Subscribe with a Subscriber that includes a handler for completion events.

Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.

The completion callback has no input, as represented by an empty pair of parentheses: It matches the run method in the Runnable interface. The preceding code produces the following output:

1
2
3
4
Done

4.3.2. Cancelling a subscribe() with Its Disposable

All these lambda-based variants of subscribe() have a Disposable return type. In this case, the Disposable interface represents the fact that the subscription can be cancelled, by calling its dispose() method.

For a Flux or Mono, cancellation is a signal that the source should stop producing elements. However, it is NOT guaranteed to be immediate: Some sources might produce elements so fast that they could complete even before receiving the cancel instruction.

Some utilities around Disposable are available in the Disposables class. Among these, Disposables.swap() creates a Disposable wrapper that lets you atomically cancel and replace a concrete Disposable. This can be useful, for instance, in a UI scenario where you want to cancel a request and replace it with a new one whenever the user clicks a button. Disposing the wrapper itself closes it. Doing so disposes the current concrete value and all future attempted replacements.

Another interesting utility is Disposables.composite(…​). This composite lets you collect several Disposables (for instance, multiple in-flight requests associated with a service call) and dispose all of them at once later on. Once the composite’s dispose() method has been called, any attempt to add another Disposable immediately disposes it.
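The following sketch (assuming reactor-core is on the classpath) demonstrates both behaviors of Disposables.composite(…​):

```java
import java.time.Duration;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Collect several subscriptions so they can be cancelled together.
Disposable.Composite composite = Disposables.composite();
composite.add(Flux.interval(Duration.ofMillis(50)).subscribe());
composite.add(Flux.interval(Duration.ofMillis(50)).subscribe());
composite.dispose(); // cancels both in-flight subscriptions at once
System.out.println(composite.isDisposed()); // true

// After disposal, newly added Disposables are disposed immediately.
Disposable extra = Flux.interval(Duration.ofMillis(50)).subscribe();
composite.add(extra);
System.out.println(extra.isDisposed()); // true
```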

4.3.3. An Alternative to Lambdas: BaseSubscriber

There is an additional subscribe method that is more generic and takes a full-blown Subscriber rather than composing one out of lambdas. In order to help with writing such a Subscriber, we provide an extendable class called BaseSubscriber.

Instances of BaseSubscriber (or subclasses of it) are single-use, meaning that a BaseSubscriber cancels its subscription to the first Publisher if it is subscribed to a second Publisher. That is because using an instance twice would violate the Reactive Streams rule that the onNext method of a Subscriber must not be called in parallel. As a result, anonymous implementations are fine only if they are declared directly within the call to Publisher#subscribe(Subscriber).

Now we can implement one of these. We call it a SampleSubscriber. The following example shows how it would be attached to a Flux:

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);

The following example shows what SampleSubscriber could look like, as a minimalistic implementation of a BaseSubscriber:

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}

The SampleSubscriber class extends BaseSubscriber, which is the recommended abstract class for user-defined Subscribers in Reactor. The class offers hooks that can be overridden to tune the subscriber’s behavior. By default, it triggers an unbounded request and behaves exactly as subscribe(). However, extending BaseSubscriber is much more useful when you want a custom request amount.

For a custom request amount, the bare minimum is to implement hookOnSubscribe(Subscription subscription) and hookOnNext(T value), as we did. In our case, the hookOnSubscribe method prints a statement to standard out and makes the first request. Then the hookOnNext method prints a statement and performs additional requests, one request at a time.

The SampleSubscriber class produces the following output:

Subscribed
1
2
3
4

BaseSubscriber also offers a requestUnbounded() method to switch to unbounded mode (equivalent to request(Long.MAX_VALUE)), as well as a cancel() method.

It also has additional hooks: hookOnComplete, hookOnError, hookOnCancel, and hookFinally (which is always called when the sequence terminates, with the type of termination passed in as a SignalType parameter).

You almost certainly want to implement the hookOnError, hookOnCancel, and hookOnComplete methods. You may also want to implement the hookFinally method. SampleSubscriber is the absolute minimum implementation of a Subscriber that performs bounded requests.

4.3.4. On Backpressure and Ways to Reshape Requests

When implementing backpressure in Reactor, the way consumer pressure is propagated back to the source is by sending a request to the upstream operator. The sum of current requests is sometimes referred to as the current “demand”, or “pending request”. Demand is capped at Long.MAX_VALUE, representing an unbounded request (meaning “produce as fast as you can”, basically disabling backpressure).

The first request comes from the final subscriber at subscription time, yet the most direct ways of subscribing all immediately trigger an unbounded request of Long.MAX_VALUE:

  • subscribe() and most of its lambda-based variants (with the exception of the one that has a Consumer<Subscription>)

  • block(), blockFirst() and blockLast()

  • iterating over a toIterable() or toStream()
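A small sketch (assuming reactor-core is on the classpath) makes this unbounded request visible through doOnRequest:

```java
import java.util.concurrent.atomic.AtomicLong;

import reactor.core.publisher.Flux;

AtomicLong demand = new AtomicLong();
Flux.range(1, 3)
    .doOnRequest(demand::set) // observe the request as it passes upstream
    .subscribe();             // the no-argument variant requests unbounded
System.out.println(demand.get() == Long.MAX_VALUE); // true
```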

The most basic way of customizing the original request is to subscribe with a BaseSubscriber with the hookOnSubscribe method overridden, as the following example shows:

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });

The preceding snippet prints out the following:

request of 1
Cancelling after having received 1
When manipulating a request, you must be careful to produce enough demand for the sequence to advance, or your Flux can get “stuck”. That is why BaseSubscriber defaults to an unbounded request in hookOnSubscribe. When overriding this hook, you should usually call request at least once.
Operators that Change the Demand from Downstream

One thing to keep in mind is that demand expressed at the subscribe level can be reshaped by each operator in the upstream chain. A textbook case is the buffer(N) operator: If it receives a request(2), it is interpreted as a demand for two full buffers. As a consequence, since buffers need N elements to be considered full, the buffer operator reshapes the request to 2 x N.
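The following sketch (assuming reactor-core is on the classpath) makes this reshaping observable: a downstream request(1) on buffer(2) reaches the source as a request of 2.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

AtomicLong sourceDemand = new AtomicLong();
Flux.range(1, 10)
    .doOnRequest(sourceDemand::addAndGet) // observe what the source actually receives
    .buffer(2)                            // groups elements two by two
    .subscribe(new BaseSubscriber<List<Integer>>() {
      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1); // ask for a single buffer downstream
      }

      @Override
      public void hookOnNext(List<Integer> buffer) {
        cancel(); // stop after the first buffer
      }
    });
System.out.println(sourceDemand.get()); // 2: one buffer of two elements
```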

You might also have noticed that some operators have variants that take an int input parameter called prefetch. This is another category of operators that modify the downstream request. These are usually operators that deal with inner sequences, deriving a Publisher from each incoming element (like flatMap).

Prefetch is a way to tune the initial request made on these inner sequences. If unspecified, most of these operators start with a demand of 32.

These operators usually also implement a replenishing optimization: Once the operator has seen 75% of the prefetch request fulfilled, it re-requests 75% from upstream. This is a heuristic optimization made so that these operators proactively anticipate the upcoming requests.

Finally, a couple of operators let you directly tune the request: limitRate and limitRequest.

limitRate(N) splits the downstream requests so that they are propagated upstream in smaller batches. For instance, a request of 100 made to limitRate(10) would result in, at most, 10 requests of 10 being propagated to the upstream. Note that, in this form, limitRate actually implements the replenishing optimization discussed earlier.

The operator has a variant that also lets you tune the replenishing amount (referred to as the lowTide in the variant): limitRate(highTide, lowTide). Choosing a lowTide of 0 results in strict batches of highTide requests, instead of batches further reworked by the replenishing strategy.

limitRequest(N), on the other hand, caps the downstream request to a maximum total demand. It adds up requests up to N. If a single request does not make the total demand overflow over N, that particular request is wholly propagated upstream. After that amount has been emitted by the source, limitRequest considers the sequence complete, sends an onComplete signal downstream, and cancels the source.
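As a sketch (assuming reactor-core is on the classpath, with a version that still provides limitRequest), limitRequest(3) truncates an otherwise much larger sequence and still completes normally:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

List<Integer> seen = new ArrayList<>();
Flux.range(1, 100)
    .limitRequest(3) // caps the total demand at 3, then completes and cancels the source
    .subscribe(seen::add,
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Done")); // onComplete fires despite the larger source
System.out.println(seen); // [1, 2, 3]
```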

4.4. Programmatically creating a sequence

In this section, we introduce the creation of a Flux or a Mono by programmatically defining its associated events (onNext, onError, and onComplete). All these methods share the fact that they expose an API to trigger the events that we call a sink. There are actually a few sink variants, which we’ll get to shortly.

4.4.1. Synchronous generate

The simplest form of programmatic creation of a Flux is through the generate method, which takes a generator function.

This is for synchronous and one-by-one emissions, meaning that the sink is a SynchronousSink and that its next() method can only be called at most once per callback invocation. You can then additionally call error(Throwable) or complete(), but this is optional.

The most useful variant is probably the one that also lets you keep a state that you can refer to in your sink usage to decide what to emit next. The generator function then becomes a BiFunction<S, SynchronousSink<T>, S>, with <S> the type of the state object. You have to provide a Supplier<S> for the initial state, and your generator function now returns a new state on each round.

For instance, you could use an int as the state:

Example 11. Example of state-based generate
Flux<String> flux = Flux.generate(
    () -> 0, (1)
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); (2)
      if (state == 10) sink.complete(); (3)
      return state + 1; (4)
    });
1 We supply the initial state value of 0.
2 We use the state to choose what to emit (a row in the multiplication table of 3).
3 We also use it to choose when to stop.
4 We return a new state that we use in the next invocation (unless the sequence terminated in this one).

The preceding code generates the table of 3, as the following sequence:

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

You can also use a mutable <S>. The example above could for instance be rewritten using a single AtomicLong as the state, mutating it on every round:

Example 12. Mutable state variant
Flux<String> flux = Flux.generate(
    AtomicLong::new, (1)
    (state, sink) -> {
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    });
1 This time, we generate a mutable object as the state.
2 We mutate the state here.
3 We return the same instance as the new state.
If your state object needs to clean up some resources, use the generate(Supplier<S>, BiFunction, Consumer<S>) variant to clean up the last state instance.

The following example uses the generate method that includes a Consumer:

Flux<String> flux = Flux.generate(
    AtomicLong::new,
      (state, sink) -> { (1)
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    }, (state) -> System.out.println("state: " + state)); (4)
1 Again, we generate a mutable object as the state.
2 We mutate the state here.
3 We return the same instance as the new state.
4 We see the last state value (11) as the output of this Consumer lambda.

In the case of the state containing a database connection or other resource that needs to be handled at the end of the process, the Consumer lambda could close the connection or otherwise handle any tasks that should be done at the end of the process.

4.4.2. Asynchronous and Multi-threaded: create

create is a more advanced form of programmatic creation of a Flux which is suitable for multiple emissions per round, even from multiple threads.

It exposes a FluxSink, with its next, error, and complete methods. Contrary to generate, it doesn’t have a state-based variant. On the other hand, it can trigger multi-threaded events in the callback.

create can be very useful to bridge an existing API with the reactive world - such as an asynchronous API based on listeners.
create doesn’t parallelize your code nor does it make it asynchronous, even though it can be used with asynchronous APIs. If you block within the create lambda, you expose yourself to deadlocks and similar side effects. Even with the use of subscribeOn, there’s the caveat that a long-blocking create lambda (such as an infinite loop calling sink.next(t)) can lock the pipeline: the requests would never be performed due to the loop starving the same thread they are supposed to run from. Use the subscribeOn(Scheduler, false) variant: requestOnSeparateThread = false will use the Scheduler thread for the create and still let data flow by performing request in the original thread.

Suppose that you use a listener-based API. It processes data in chunks and has two events: (1) a chunk of data is ready and (2) the processing is complete (terminal event), as represented in the MyEventListener interface:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}

You can use create to bridge this into a Flux<T>:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( (4)
      new MyEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }
    });
});
1 Bridge to the MyEventListener API.
2 Each element in a chunk becomes an element in the Flux.
3 The processComplete event is translated to onComplete.
4 All of this is done asynchronously whenever the myEventProcessor executes.

Additionally, since create can bridge asynchronous APIs and manages backpressure, you can refine how to behave backpressure-wise, by indicating an OverflowStrategy:

  • IGNORE to completely ignore downstream backpressure requests. This may yield IllegalStateException when queues get full downstream.

  • ERROR to signal an IllegalStateException when the downstream can’t keep up.

  • DROP to drop the incoming signal if the downstream is not ready to receive it.

  • LATEST to let downstream only get the latest signals from upstream.

  • BUFFER (the default) to buffer all signals if the downstream can’t keep up. (This does unbounded buffering and may lead to OutOfMemoryError.)

Mono also has a create generator. The MonoSink of Mono’s create doesn’t allow several emissions. It will drop all signals after the first one.
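A minimal sketch (assuming reactor-core is on the classpath) shows that only the first signal passed to the MonoSink is kept:

```java
import reactor.core.publisher.Mono;

Mono<String> mono = Mono.create(sink -> {
    sink.success("first");  // completes the Mono with this value
    sink.success("second"); // dropped: the MonoSink ignores later signals
});
// block() is used here only to extract the value for demonstration purposes.
System.out.println(mono.block()); // first
```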

4.4.3. Asynchronous but single-threaded: push

push is a middle ground between generate and create which is suitable for processing events from a single producer. It is similar to create in the sense that it can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. However, only one producing thread may invoke next, complete or error at a time.

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }

        public void processError(Throwable e) {
            sink.error(e); (4)
        }
    });
});
1 Bridge to the SingleThreadEventListener API.
2 Events are pushed to the sink using next from a single listener thread.
3 complete event generated from the same listener thread.
4 error event also generated from the same listener thread.
A hybrid push/pull model

Most Reactor operators, like create, follow a hybrid push/pull model. What we mean by that is that despite most of the processing being asynchronous (suggesting a push approach), there is a small pull component to it: the request.

The consumer pulls data from the source in the sense that it won’t emit anything until first requested. The source pushes data to the consumer whenever it becomes available, but within the bounds of its requested amount.

Note that push() and create() both allow to set up an onRequest consumer in order to handle the request amount and to ensure that data is pushed through the sink only when there is pending request.

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); (3)
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.getHistory(n); (1)
        for(String s : messages) {
           sink.next(s); (2)
        }
    });
});
1 Poll for messages when requests are made.
2 If messages are available immediately, push them to the sink.
3 The remaining messages that arrive asynchronously later are also delivered.
Cleaning up after push() or create()

Two callbacks, onDispose and onCancel, perform any cleanup on cancellation or termination. onDispose can be used to perform cleanup when the Flux completes, errors out, or is cancelled. onCancel can be used to perform any action specific to cancellation prior to cleanup with onDispose.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) (1)
        .onDispose(() -> channel.close()); (2)
});
1 onCancel is invoked first, for cancel signal only.
2 onDispose is invoked for complete, error, or cancel signals.

4.4.4. Handle

The handle method is a bit different: it is an instance method, meaning that it is chained on an existing source (as are the common operators). It is present in both Mono and Flux.

It is close to generate, in the sense that it uses a SynchronousSink and only allows one-by-one emissions. However, handle can be used to generate an arbitrary value out of each source element, possibly skipping some elements. In this way, it can serve as a combination of map and filter. The signature of handle is as follows:

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);

Let’s consider an example. The reactive streams specification disallows null values in a sequence. What if you want to perform a map but you want to use a preexisting method as the map function, and that method sometimes returns null?

For instance, the following method can be applied safely to a source of integers:

public String alphabet(int letterNumber) {
	if (letterNumber < 1 || letterNumber > 26) {
		return null;
	}
	int letterIndexAscii = 'A' + letterNumber - 1;
	return "" + (char) letterIndexAscii;
}

We can then use handle to remove any nulls:

Example 13. Using handle for a "map and eliminate nulls" scenario
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); (1)
        if (letter != null) (2)
            sink.next(letter); (3)
    });

alphabet.subscribe(System.out::println);
1 Map to letters.
2 If the "map function" returns null…​
3 Filter it out by not calling sink.next.

This would print out:

M
I
T

4.5. Threading and Schedulers

Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.

Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made. The following example runs a Mono in a new thread:

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); (1)

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> (2)
          System.out.println(v + Thread.currentThread().getName()) (3)
      )
  );
  t.start();
  t.join();

}
1 The Mono<String> is assembled in thread main.
2 However, it is subscribed to in thread Thread-0.
3 As a consequence, both the map and the onNext callback actually run in Thread-0.

The preceding code produces the following output:

hello thread Thread-0

In Reactor, the execution model and where the execution happens is determined by the Scheduler that is used. A Scheduler has scheduling responsibilities similar to an ExecutorService, but having a dedicated abstraction lets it do more, notably acting as a clock and enabling a wider range of implementations (virtual time for tests, trampolining or immediate scheduling, and so on).

The Schedulers class has static methods that give access to the following execution contexts:

  • No execution context (Schedulers.immediate()): at processing time, the submitted Runnable is directly executed, effectively running it on the current Thread (can be seen as a "null object" or no-op Scheduler).

  • A single, reusable thread (Schedulers.single()). Note that this method reuses the same thread for all callers, until the Scheduler is disposed. If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.

  • An unbounded elastic thread pool (Schedulers.elastic()). This one is no longer preferred with the introduction of Schedulers.boundedElastic(), as it has a tendency to hide backpressure problems and lead to too many threads (see below).

  • A bounded elastic thread pool (Schedulers.boundedElastic()). Like its predecessor elastic(), it creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. Unlike its elastic() predecessor, it has a cap on the number of backing threads it can create (default is number of CPU cores x 10). Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available (when scheduling with a delay, the delay starts when the thread becomes available). This is a better choice for I/O blocking work. Schedulers.boundedElastic() is a handy way to give a blocking process its own thread so that it does not tie up other resources (see How Do I Wrap a Synchronous, Blocking Call?) without pressuring the system too much with new threads.

  • A fixed pool of workers that is tuned for parallel work (Schedulers.parallel()). It creates as many workers as you have CPU cores.

Additionally, you can create a Scheduler out of any pre-existing ExecutorService by using Schedulers.fromExecutorService(ExecutorService). (You can also create one from an Executor, although doing so is discouraged.)
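As a sketch of the kind of ExecutorService you might hand to Schedulers.fromExecutorService, the following stdlib-only example builds a fixed pool sized to the CPU count with named threads. The pool configuration and the "my-parallel" thread name are illustrative assumptions, not what Reactor does internally:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorDemo {
    // Builds a fixed pool sized to the CPU count with named threads,
    // then runs one task and reports which thread executed it.
    static String runOnPool() {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores,
                r -> new Thread(r, "my-parallel"));
        try {
            // Such a pool could then be wrapped via
            // Schedulers.fromExecutorService(pool).
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on " + runOnPool()); // prints ran on my-parallel
    }
}
```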

You can also create new instances of the various scheduler types by using the newXXX methods. For example, Schedulers.newParallel(yourScheduleName) creates a new parallel scheduler named yourScheduleName.

While boundedElastic is made to help with legacy blocking code if it cannot be avoided, single and parallel are not. As a consequence, the use of Reactor blocking APIs (block(), blockFirst(), blockLast() (as well as iterating over toIterable() or toStream()) inside the default single and parallel schedulers) results in an IllegalStateException being thrown.

Custom Schedulers can also be marked as "non blocking only" by creating instances of Thread that implement the NonBlocking marker interface.

Some operators use a specific scheduler from Schedulers by default (and usually give you the option of providing a different one). For instance, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux<Long> that ticks every 300ms. By default, this is enabled by Schedulers.parallel(). The following line changes the Scheduler to a new instance similar to Schedulers.single():

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Reactor offers two means of switching the execution context (or Scheduler) in a reactive chain: publishOn and subscribeOn. Both take a Scheduler and let you switch the execution context to that scheduler. But the placement of publishOn in the chain matters, while the placement of subscribeOn does not. To understand that difference, you first have to remember that nothing happens until you subscribe.

In Reactor, when you chain operators, you can wrap as many Flux and Mono implementations inside one another as you need. Once you subscribe, a chain of Subscriber objects is created, backward (up the chain) to the first publisher. This is effectively hidden from you. All you can see is the outer layer of Flux (or Mono) and Subscription, but these intermediate operator-specific subscribers are where the real work happens.

With that knowledge, we can have a closer look at the publishOn and subscribeOn operators:

4.5.1. The publishOn Method

publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in), as follows:

  • Changes the execution context to one Thread picked by the Scheduler

  • As per the specification, onNext calls happen in sequence, so this uses up a single thread

  • Unless they work on a specific Scheduler, operators after publishOn continue execution on that same thread

The following example uses the publishOn method:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .publishOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println));  (5)
1 Creates a new Scheduler backed by four Thread instances.
2 The first map runs on the anonymous thread in <5>.
3 The publishOn switches the whole sequence onto a Thread picked from <1>.
4 The second map runs on the Thread from <1>.
5 This anonymous Thread is the one where the subscription happens. The print happens on the latest execution context, which is the one from publishOn.

4.5.2. The subscribeOn Method

subscribeOn applies to the subscription process, when the backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission. However, this does not affect the behavior of subsequent calls to publishOn; they still switch the execution context for the part of the chain after them.

  • Changes the Thread from which the whole chain of operators subscribes

  • Picks one thread from the Scheduler

Only the earliest subscribeOn call in the chain is actually taken into account.

The following example uses the subscribeOn method:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .subscribeOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println));  (5)
1 Creates a new Scheduler backed by four Thread instances.
2 The first map runs on one of these four threads…​
3 …​because subscribeOn switches the whole sequence right from subscription time (<5>).
4 The second map also runs on the same thread.
5 This anonymous Thread is the one where the subscription initially happens, but subscribeOn immediately shifts it to one of the four scheduler threads.

4.6. Handling Errors

For a quick look at the available operators for error handling, see the relevant operator decision tree.

In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and gets propagated down the chain of operators to the last step, the Subscriber you defined and its onError method.

Such errors should still be dealt with at the application level. For instance, you might display an error notification in a UI or send a meaningful error payload in a REST endpoint. For that reason, the subscriber’s onError method should always be defined.

If not defined, onError throws an UnsupportedOperationException. You can further detect and triage it with the Exceptions.isErrorCallbackNotImplemented method.

Reactor also offers alternative means of dealing with errors in the middle of the chain, as error-handling operators. The following example shows how to do so:

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling case
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.

Now we can consider each means of error handling one-by-one. When relevant, we make a parallel with imperative programming’s try patterns.

4.6.1. Error Handling Operators

You may be familiar with several ways of dealing with exceptions in a try-catch block. Most notably, these include the following:

  • Catch and return a static default value.

  • Catch and execute an alternative path with a fallback method.

  • Catch and dynamically compute a fallback value.

  • Catch, wrap to a BusinessException, and re-throw.

  • Catch, log an error-specific message, and re-throw.

  • Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.

All of these have equivalents in Reactor, in the form of error-handling operators. Before looking into these operators, we first want to establish a parallel between a reactive chain and a try-catch block.

When subscribing, the onError callback at the end of the chain is akin to a catch block. There, execution skips to the catch in case an Exception is thrown, as the following example shows:

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) (1)
    .map(v -> doSecondTransform(v)); (2)
s.subscribe(value -> System.out.println("RECEIVED " + value), (3)
            error -> System.err.println("CAUGHT " + error) (4)
);
1 A transformation that can throw an exception is performed.
2 If everything went well, a second transformation is performed.
3 Each successfully transformed value is printed out.
4 In case of an error, the sequence terminates and an error message is displayed.

The preceding example is conceptually similar to the following try-catch block:

try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); (1)
        String v2 = doSecondTransform(v1); (2)
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); (3)
}
1 If an exception is thrown here…​
2 …​the rest of the loop is skipped…​
3 …​and the execution goes directly to here.

Now that we have established a parallel, we can look at the different error handling cases and their equivalent operators.

Static Fallback Value

The equivalent of “Catch and return a static default value” is onErrorReturn. The following example shows how to use it:

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

The following example shows the Reactor equivalent:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");

You also have the option of applying a Predicate on the exception to decide whether or not to recover, as the following example shows:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); (1)
1 Recover only if the message of the exception is "boom10"
Catch and swallow the error

If you don’t even want to replace the exception with a fallback value, but instead want to ignore it and only propagate the elements that have been produced so far, what you want is essentially replacing the onError signal with an onComplete signal. This can be done by the onErrorComplete operator:

Flux.just(10,20,30)
    .map(this::doSomethingDangerousOn30)
    .onErrorComplete(); (1)
1 Recover by turning the onError into an onComplete

Like onErrorReturn, onErrorComplete has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate.

Fallback Method

If you want more than a single default value and you have an alternative (safer) way of processing your data, you can use onErrorResume. This would be the equivalent of “Catch and execute an alternative path with a fallback method”.

For example, if your nominal process is fetching data from an external and unreliable service but you also keep a local cache of the same data that can be a bit more out of date but is more reliable, you could do the following:

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}

The following example shows the Reactor equivalent:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) (1)
        .onErrorResume(e -> getFromCache(k)) (2)
    );
1 For each key, asynchronously call the external service.
2 If the external service call fails, fall back to the cache for that key. Note that we always apply the same fallback, whatever the source error, e, is.
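
If you find yourself repeating the same try-catch around many calls, the imperative fallback can also be factored into a small generic helper, which is essentially the shape that onErrorResume captures for you. This is a hedged sketch with hypothetical stand-ins for callExternalService and getFromCache:

```java
import java.util.function.Function;

public class FallbackDemo {
    // Hypothetical stand-ins for the external service and the local cache.
    static String callExternalService(String key) {
        throw new IllegalStateException("service down for " + key);
    }

    static String getFromCache(String key) {
        return "cached-" + key;
    }

    // Generic "catch and execute an alternative path" helper, the imperative
    // cousin of onErrorResume: try the primary path, fall back on failure.
    static <T, R> R withFallback(T input, Function<T, R> primary, Function<T, R> fallback) {
        try {
            return primary.apply(input);
        } catch (RuntimeException error) {
            return fallback.apply(input);
        }
    }

    public static void main(String[] args) {
        String v1 = withFallback("key1", FallbackDemo::callExternalService, FallbackDemo::getFromCache);
        String v2 = withFallback("key2", FallbackDemo::callExternalService, FallbackDemo::getFromCache);
        System.out.println(v1 + ", " + v2); // prints cached-key1, cached-key2
    }
}
```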

Like onErrorReturn, onErrorResume has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate. The fact that it takes a Function also lets you choose a different fallback sequence to switch to, depending on the error encountered. The following example shows how to do so:

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { (1)
            if (error instanceof TimeoutException) (2)
                return getFromCache(k);
            else if (error instanceof UnknownKeyException)  (3)
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error); (4)
        })
    );
1 The function allows dynamically choosing how to continue.
2 If the source times out, hit the local cache.
3 If the source says the key is unknown, create a new entry.
4 In all other cases, “re-throw”.
Dynamic Fallback Value

Even if you do not have an alternative (safer) way of processing your data, you might want to compute a fallback value out of the exception you received. This would be the equivalent of “Catch and dynamically compute a fallback value”.

For instance, if your return type (MyWrapper) has a variant dedicated to holding an exception (think Future.complete(T success) versus Future.completeExceptionally(Throwable error)), you could instantiate the error-holding variant and pass it the exception.

An imperative example would look like the following:

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}

You can do this reactively in the same way as the fallback method solution, by using onErrorResume, with a tiny bit of boilerplate, as follows:

erroringFlux.onErrorResume(error -> Mono.just( (1)
        MyWrapper.fromError(error) (2)
));
1 Since you expect a MyWrapper representation of the error, you need to get a Mono<MyWrapper> for onErrorResume. We use Mono.just() for that.
2 We need to compute the value out of the exception. Here, we achieved that by wrapping the exception with a relevant MyWrapper factory method.
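
The same “compute a fallback out of the exception” shape exists in the JDK itself: CompletableFuture.handle receives either the value or the failure, so it can always produce a wrapper. The MyWrapper class below is a hypothetical illustration of the variant type described above (failedFuture requires Java 9+):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class WrapperDemo {
    // A hypothetical result type with a value variant and an error variant,
    // mirroring the MyWrapper described in the text.
    static class MyWrapper {
        final String value;
        final Throwable error;
        private MyWrapper(String value, Throwable error) {
            this.value = value;
            this.error = error;
        }
        static MyWrapper fromValue(String v) { return new MyWrapper(v, null); }
        static MyWrapper fromError(Throwable t) { return new MyWrapper(null, t); }
    }

    // handle(...) sees either the value or the failure, so we can always
    // produce a MyWrapper, just like onErrorResume + Mono.just in the text.
    static CompletableFuture<MyWrapper> wrap(CompletableFuture<String> source) {
        return source.handle((v, err) -> {
            if (err == null) {
                return MyWrapper.fromValue(v);
            }
            // Depending on how the stage failed, err may arrive wrapped in a
            // CompletionException; unwrap defensively to keep the root cause.
            Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                    ? err.getCause() : err;
            return MyWrapper.fromError(cause);
        });
    }

    public static void main(String[] args) {
        MyWrapper ok = wrap(CompletableFuture.completedFuture("hello")).join();
        MyWrapper ko = wrap(CompletableFuture.failedFuture(new IllegalStateException("boom"))).join();
        System.out.println(ok.value + " / " + ko.error.getMessage()); // prints hello / boom
    }
}
```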
Catch and Rethrow

"Catch, wrap to a BusinessException, and re-throw" looks like the following in the imperative world:

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}

In the “fallback method” example, the last line inside the flatMap gives us a hint at achieving the same reactively, as follows:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );

However, there is a more straightforward way of achieving the same effect with onErrorMap:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
Log or React on the Side

For cases where you want the error to continue propagating but still want to react to it without modifying the sequence (logging it, for instance), you can use the doOnError operator. This is the equivalent of the “Catch, log an error-specific message, and re-throw” pattern, as the following example shows:

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}

The doOnError operator, as well as all operators prefixed with doOn, are sometimes referred to as having a “side-effect”. They let you peek inside the sequence’s events without modifying them.

Like the imperative example shown earlier, the following example still propagates the error yet ensures that we at least log that the external service had a failure:

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) (1)
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); (2)
        })
        (3)
    );
1 The external service call that can fail…​
2 …​is decorated with a logging and stats side-effect…​
3 …​after which, it still terminates with an error, unless we use an error-recovery operator here.

We can also imagine we have statistic counters to increment as a second error side-effect.

Using Resources and the Finally Block

The last parallel to draw with imperative programming is the cleanup that can be done either by using a “finally block to clean up resources” or by using a “Java 7 try-with-resource construct”, both shown below:

Example 14. Imperative use of finally
Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}
Example 15. Imperative use of try-with-resource
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}

Both own their Reactor equivalents: doFinally and using.

doFinally is about side-effects that you want to be executed whenever the sequence terminates (with onComplete or onError) or is cancelled. It gives you a hint as to what kind of termination triggered the side-effect. The following example shows how to use doFinally:

Reactive finally: doFinally()
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { (1)
        stats.stopTimerAndRecordTiming();(2)
        if (type == SignalType.CANCEL) (3)
          statsCancel.increment();
    })
    .take(1); (4)
1 doFinally consumes a SignalType for the type of termination.
2 Similar to finally blocks, we always record the timing.
3 Here we also increment statistics in case of cancellation only.
4 take(1) requests exactly 1 from upstream, and cancels after one item is emitted.

On the other hand, using handles the case where a Flux is derived from a resource and that resource must be acted upon whenever processing is done. In the following example, we replace the AutoCloseable interface of “try-with-resource” with a Disposable:

Example 16. The Disposable resource
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); (4)
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};

Now we can do the reactive equivalent of “try-with-resource” on it, which looks like the following:

Example 17. Reactive try-with-resource: using()
Flux<String> flux =
Flux.using(
        () -> disposableInstance, (1)
        disposable -> Flux.just(disposable.toString()), (2)
        Disposable::dispose (3)
);
1 The first lambda generates the resource. Here, we return our mock Disposable.
2 The second lambda processes the resource, returning a Flux<T>.
3 The third lambda is called when the Flux from <2> terminates or is cancelled, to clean up resources.
4 After subscription and execution of the sequence, the isDisposed atomic boolean becomes true.
Demonstrating the Terminal Aspect of onError

In order to demonstrate that all these operators cause the upstream original sequence to terminate when an error happens, we can use a more visual example with a Flux.interval. The interval operator ticks every x units of time with an increasing Long value. The following example uses an interval operator:

Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100); (1)
1 Note that interval executes on a timer Scheduler by default. If we want to run that example in a main class, we would need to add a sleep call here so that the application does not exit immediately without any value being produced.

The preceding example prints out one line every 250ms, as follows:

tick 0
tick 1
tick 2
Uh oh

Even with one extra second of runtime, no more tick comes in from the interval. The sequence was indeed terminated by the error.

Retrying

There is another operator of interest with regards to error handling, and you might be tempted to use it in the case described in the previous section. retry, as its name indicates, lets you retry an error-producing sequence.

The thing to keep in mind is that it works by re-subscribing to the upstream Flux. This is really a different sequence, and the original one is still terminated. To verify that, we can re-use the previous example and append a retry(1) to retry once instead of using onErrorReturn. The following example shows how to do so:

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() (1)
    .subscribe(System.out::println, System.err::println); (2)

Thread.sleep(2100); (3)
1 elapsed associates each value with the duration since the previous value was emitted.
2 We also want to see when there is an onError.
3 Ensure we have enough time for 4x2 ticks.

The preceding example produces the following output:

259,tick 0
249,tick 1
251,tick 2
506,tick 0 (1)
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1 A new interval started, from tick 0. The additional 250ms duration is coming from the 4th tick, the one that causes the exception and subsequent retry.

As you can see from the preceding example, retry(1) merely re-subscribed to the original interval once, restarting the tick from 0. The second time around, since the exception still occurs, it gives up and propagates the error downstream.
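
The re-subscription semantics of retry can be approximated imperatively as a loop that restarts the whole work from scratch on each failure. This is a sketch of the semantics only; the retry helper below is a hypothetical name, and each attempt stands in for a fresh subscription:

```java
public class RetryDemo {
    // Run work with up to maxRetries re-attempts; returns the number of
    // attempts made on success. Each attempt starts the work from scratch,
    // just as retry re-subscribes to the original sequence.
    static int retry(Runnable work, int maxRetries) {
        int attempts = 0;
        RuntimeException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            attempts++;
            try {
                work.run();
                return attempts; // success: stop retrying
            } catch (RuntimeException e) {
                last = e;        // failure: the loop re-attempts from the start
            }
        }
        throw last; // retries exhausted: propagate the last error downstream
    }

    public static void main(String[] args) {
        try {
            // Always-failing work, like the erroring interval above.
            retry(() -> { throw new IllegalStateException("boom"); }, 1);
        } catch (RuntimeException e) {
            System.out.println("gave up: " + e.getMessage()); // prints gave up: boom
        }
    }
}
```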

There is a more advanced version of retry (called retryWhen) that uses a “companion” Flux to tell whether or not a particular failure should retry. This companion Flux is created by the operator but decorated by the user, in order to customize the retry condition.

The companion Flux is a Flux<RetrySignal> that gets passed to a Retry strategy/function, supplied as the sole parameter of retryWhen. As the user, you define that function and make it return a new Publisher<?>. The Retry class is an abstract class, but it offers a factory method if you want to transform the companion with a simple lambda (Retry.from(Function)).

Retry cycles go as follows:

  1. Each time an error happens (giving potential for a retry), a RetrySignal is emitted into the companion Flux, which has been decorated by your function. Having a Flux here gives a bird’s eye view of all the attempts so far. The RetrySignal gives access to the error as well as metadata around it.

  2. If the companion Flux emits a value, a retry happens.

  3. If the companion Flux completes, the error is swallowed, the retry cycle stops, and the resulting sequence completes, too.

  4. If the companion Flux produces an error (e), the retry cycle stops and the resulting sequence errors with e.

The distinction between the previous two cases is important. Simply completing the companion would effectively swallow an error. Consider the following way of emulating retry(3) by using retryWhen:

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) (1)
    .doOnError(System.out::println) (2)
    .retryWhen(Retry.from(companion -> (3)
        companion.take(3))); (4)
1 This continuously produces errors, calling for retry attempts.
2 doOnError before the retry lets us log and see all failures.
3 The Retry is adapted from a very simple Function lambda.
4 Here, we consider the first three errors as retry-able (take(3)) and then give up.

In effect, the preceding example results in an empty Flux, but it completes successfully. Since retry(3) on the same Flux would have terminated with the latest error, this retryWhen example is not exactly the same as a retry(3).

Getting to the same behavior involves a few additional tricks:

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
		Flux.<String>error(new IllegalArgumentException())
				.doOnError(e -> errorCount.incrementAndGet())
				.retryWhen(Retry.from(companion -> (1)
						companion.map(rs -> { (2)
							if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
							else throw Exceptions.propagate(rs.failure()); (4)
						})
				));
1 We customize Retry by adapting from a Function lambda rather than providing a concrete class.
2 The companion emits RetrySignal objects, which bear the number of retries so far and the last failure.
3 To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index).
4 In order to terminate the sequence in error, we throw the original exception after these three retries.
One can use the builders exposed in Retry to achieve the same in a more fluent way, as well as more finely tuned retry strategies. For example: errorFlux.retryWhen(Retry.max(3));.
You can use similar code to implement an “exponential backoff and retry” pattern, as shown in the FAQ.
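
The delay progression behind such a backoff strategy can be sketched with a few lines of plain Java: double the delay on each retry, capped at a maximum. Real backoff implementations typically also add random jitter, which this illustrative sketch omits:

```java
import java.time.Duration;

public class BackoffDemo {
    // Compute the delay before the Nth retry under a doubling backoff,
    // capped at maxBackoff -- the general shape of exponential backoff.
    static Duration backoffDelay(int retryIndex, Duration firstBackoff, Duration maxBackoff) {
        long millis = firstBackoff.toMillis() << retryIndex; // double per retry
        return millis >= maxBackoff.toMillis() ? maxBackoff : Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        Duration first = Duration.ofMillis(100);
        Duration max = Duration.ofSeconds(1);
        // prints delays of 100, 200, 400, 800, 1000 ms
        for (int i = 0; i < 5; i++) {
            System.out.println("retry " + i + " after "
                    + backoffDelay(i, first, max).toMillis() + "ms");
        }
    }
}
```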

The core-provided Retry helpers, RetrySpec and RetryBackoffSpec, both allow advanced customizations such as:

  • setting the filter(Predicate) for the exceptions that can trigger a retry

  • modifying such a previously set filter through modifyErrorFilter(Function)

  • triggering a side effect like logging around the retry trigger (i.e. for backoff before and after the delay), provided the retry is validated (doBeforeRetry() and doAfterRetry() are additive)

  • triggering an asynchronous Mono<Void> around the retry trigger, which allows to add asynchronous behavior on top of the base delay but thus further delay the trigger (doBeforeRetryAsync and doAfterRetryAsync are additive)

  • customizing the exception in case the maximum number of attempts has been reached, through onRetryExhaustedThrow(BiFunction). By default, Exceptions.retryExhausted(…​) is used, which can be distinguished with Exceptions.isRetryExhausted(Throwable)

  • activating the handling of transient errors (see below)

Retrying with transient errors

Some long-lived sources may see sporadic bursts of errors followed by longer periods of time during which all is running smoothly. This documentation refers to this pattern of errors as transient errors.

In such cases, it would be desirable to deal with each burst in isolation, so that the next burst doesn’t inherit the retry state from the previous one. For instance, with an exponential backoff strategy each subsequent burst should delay retry attempts starting from the minimum backoff Duration instead of an ever-growing one.

The RetrySignal interface, which represents retryWhen state, has a totalRetriesInARow() value which can be used for this. Instead of the usual monotonically-increasing totalRetries() index, this secondary index is reset to 0 each time an error is recovered from by the retry (i.e. when a retry attempt results in an incoming onNext instead of an onError again).

When setting the transientErrors(boolean) configuration parameter to true in the RetrySpec or RetryBackoffSpec, the resulting strategy makes use of that totalRetriesInARow() index, effectively dealing with transient errors. These specs compute the retry pattern from that index, so in effect all other configuration parameters of the spec apply to each burst of errors independently.

AtomicInteger errorCount = new AtomicInteger(); (1)
Flux<Integer> transientFlux = httpRequest.get() (2)
        .doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true))  (3)
             .blockLast();
assertThat(errorCount).hasValue(6); (4)
1 We will count the number of errors in the retried sequence for illustration.
2 We assume an http request source, e.g. a streaming endpoint that will sometimes fail twice in a row, then recover.
3 We use retryWhen on that source, configured for at most 2 retry attempts, but in transientErrors mode.
4 At the end, a valid response is achieved and the transientFlux successfully completes after 6 attempts have been registered in errorCount.

Without the transientErrors(true), the configured maximum attempt of 2 would be reached by the second burst and the whole sequence would have ultimately failed.

If you want to locally try this without an actual http remote endpoint, you can implement a pseudo httpRequest method as a Supplier, as follows:

final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
    Flux.generate(sink -> { (1)
        int i = transientHelper.getAndIncrement();
        if (i == 10) { (2)
            sink.next(i);
            sink.complete();
        }
        else if (i % 3 == 0) { (3)
            sink.next(i);
        }
        else {
            sink.error(new IllegalStateException("Transient error with " + i)); (4)
        }
    });
1 We generate a source that has bursts of errors.
2 It will successfully complete when the counter reaches 10.
3 If the transientHelper atomic is at a multiple of 3, we emit onNext and thus end the current burst.
4 In other cases we emit an onError. That’s 2 out of 3 times, so bursts of 2 onError interrupted by 1 onNext.
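
The arithmetic behind the errorCount of 6 can be checked with a plain loop that simulates both the pseudo source above and the transientErrors(true) semantics: the retries-in-a-row counter resets on every successful emission. This is a sketch of the semantics, not Reactor's implementation:

```java
public class TransientRetryDemo {
    // Simulates the pseudo source: completes at 10, emits at multiples of 3,
    // errors otherwise -- retried with a per-burst budget of maxRetriesInARow.
    static int run(int maxRetriesInARow) {
        int errorCount = 0;
        int retriesInARow = 0;
        for (int i = 0; i != 10; i++) {
            if (i % 3 == 0) {
                retriesInARow = 0;       // onNext: the transient counter resets
            } else {
                errorCount++;            // onError: retry if the burst allows it
                retriesInARow++;
                if (retriesInARow > maxRetriesInARow) {
                    throw new IllegalStateException("retries exhausted after "
                            + errorCount + " errors");
                }
            }
        }
        return errorCount; // reaching 10 corresponds to successful completion
    }

    public static void main(String[] args) {
        // Bursts of 2 errors each, separated by an onNext, never exceed
        // the budget of 2 retries per burst: 6 errors total, then completion.
        System.out.println("completed with " + run(2) + " errors"); // prints 6 errors
    }
}
```

With a monotonic counter instead (no reset on onNext), the budget of 2 would be exceeded during the second burst, matching the failure described above.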

4.6.2. Handling Exceptions in Operators or Functions

In general, all operators can themselves contain code that potentially triggers an exception or calls to a user-defined callback that can similarly fail, so they all contain some form of error handling.

As a rule of thumb, an unchecked exception is always propagated through onError. For instance, throwing a RuntimeException inside a map function translates to an onError event, as the following code shows:

Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));

The preceding code prints out the following:

ERROR: java.lang.IllegalArgumentException: foo
You can tune the Exception before it is passed to onError, through the use of a hook.

Reactor, however, defines a set of exceptions (such as OutOfMemoryError) that are always deemed to be fatal. See the Exceptions.throwIfFatal method. These errors mean that Reactor cannot keep operating and are thrown rather than propagated.
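As a sketch of how this utility distinguishes fatal errors from recoverable ones (this guard is a hypothetical illustration, not Reactor's actual internal code):

```java
import reactor.core.Exceptions;

public class FatalErrorGuard {

    // Hypothetical guard around a user callback: fatal errors are rethrown
    // immediately, anything else is returned so the caller can route it to
    // the error path (for example, a sink's onError).
    static Throwable guard(Runnable userCallback) {
        try {
            userCallback.run();
            return null;
        } catch (Throwable t) {
            Exceptions.throwIfFatal(t); // rethrows OutOfMemoryError and other fatal errors
            return t;                   // non-fatal: caller propagates this through onError
        }
    }

    public static void main(String[] args) {
        Throwable t = guard(() -> { throw new IllegalStateException("boom"); });
        System.out.println(t.getMessage()); // prints "boom"
    }
}
```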

Internally, there are also cases where an unchecked exception still cannot be propagated (most notably during the subscribe and request phases), due to concurrency races that could lead to double onError or onComplete conditions. When these races happen, the error that cannot be propagated is “dropped”. These cases can still be managed to some extent by using customizable hooks. See Dropping Hooks.

You may ask: “What about checked exceptions?”

If, for example, you need to call a method that declares it throws exceptions, you still have to deal with those exceptions in a try-catch block. You have several options, though:

  1. Catch the exception and recover from it. The sequence continues normally.

  2. Catch the exception, wrap it into an unchecked exception, and then throw it (interrupting the sequence). The Exceptions utility class can help you with that (we get to that next).

  3. If you need to return a Flux (for example, you are in a flatMap), wrap the exception in an error-producing Flux, as follows: return Flux.error(checkedException). (The sequence also terminates.)
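The third option can be sketched as follows. The parse method here is a hypothetical checked-exception-throwing helper, not part of Reactor:

```java
import java.io.IOException;
import reactor.core.publisher.Flux;

public class CheckedInFlatMap {

    // Hypothetical helper that declares a checked exception
    static String parse(String s) throws IOException {
        if (s.isEmpty()) {
            throw new IOException("empty input");
        }
        return s.toUpperCase();
    }

    public static void main(String[] args) {
        Flux<String> result = Flux.just("a", "")
                .flatMap(s -> {
                    try {
                        return Flux.just(parse(s));
                    } catch (IOException e) {
                        // Option 3: wrap the checked exception in an error-producing Flux
                        return Flux.error(e);
                    }
                });

        result.subscribe(
                v -> System.out.println("GOT " + v),
                e -> System.out.println("ERROR: " + e.getMessage()));
        // prints GOT A, then ERROR: empty input
    }
}
```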

Reactor has an Exceptions utility class that you can use to ensure that exceptions are wrapped only if they are checked exceptions:

  • Use the Exceptions.propagate method to wrap exceptions, if necessary. It also calls throwIfFatal first and does not wrap RuntimeException.

  • Use the Exceptions.unwrap method to get the original unwrapped exception (going back to the root cause of a hierarchy of reactor-specific exceptions).

Consider the following example of a map that uses a conversion method that can throw an IOException:

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}

Now imagine that you want to use that method in a map. You must now explicitly catch the exception, and your map function cannot re-throw it. So you can propagate it to the map’s onError method as a RuntimeException, as follows:

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });

Later on, when subscribing to the preceding Flux and reacting to errors (such as in the UI), you could revert back to the original exception in case you want to do something special for IOExceptions. The following example shows how to do so:

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

4.7. Sinks

In Reactor a sink is a class that allows safe manual triggering of signals in a standalone fashion, creating a Publisher-like structure capable of dealing with multiple Subscribers (with the exception of unicast() flavors).

Before 3.5.0, there was also a set of Processor implementations which has been phased out.

4.7.1. Safely Produce from Multiple Threads by Using Sinks.One and Sinks.Many

Default flavors of Sinks exposed by reactor-core ensure that multi-threaded usage is detected and cannot lead to spec violations or undefined behavior from the perspective of downstream subscribers. When using the tryEmit* API, parallel calls fail fast. When using the emit* API, the provided EmitFailureHandler may allow to retry on contention (e.g. busy looping), otherwise the sink will terminate with an error.

This is an improvement over Processor.onNext, which must be synchronized externally or lead to undefined behavior from the perspective of the downstream subscribers.

Processors are a special kind of Publisher that are also a Subscriber. They were originally intended as a possible representation of an intermediate step that could then be shared between Reactive Streams implementations. In Reactor however, such steps are rather represented by operators that are Publishers.

A common mistake when coming across a Processor for the first time is the temptation to directly call the exposed onNext, onComplete and onError methods from the Subscriber interface.

Such manual calls should be made with care, especially regarding external synchronization of calls with respect to the Reactive Streams specification. Processors are actually probably of marginal use, unless one comes across a Reactive Streams based API that requires a Subscriber to be passed, rather than exposing a Publisher.

Sinks are usually a better alternative.

The Sinks builders provide a guided API to the main supported producer types. You will recognize some of the behaviors found in Flux such as onBackpressureBuffer.

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();

Multiple producer threads may concurrently generate data on the sink by doing the following:

//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);

//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);

//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));

//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);

When using busyLooping, be aware that returned instances of EmitFailureHandler cannot be reused; that is, there should be one call of busyLooping per emitNext call. Also, it is recommended to use a timeout above 100ms, since smaller values don’t make practical sense.

The Sinks.Many can be presented to downstream consumers as a Flux, as in the following example:

Flux<Integer> fluxView = replaySink.asFlux();
fluxView
	.takeWhile(i -> i < 10)
	.log()
	.blockLast();

Similarly, the Sinks.Empty and Sinks.One flavors can be viewed as a Mono with the asMono() method.

The Sinks categories are:

  1. many().multicast(): a sink that will transmit only newly pushed data to its subscribers, honoring their backpressure (newly pushed as in "after the subscriber’s subscription").

  2. many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.

  3. many().replay(): a sink that will replay a specified history size of pushed data to new subscribers, then continue pushing new data live.

  4. one(): a sink that will play a single element to its subscribers.

  5. empty(): a sink that will play a terminal signal only to its subscribers (error or complete), but can still be viewed as a Mono<T> (notice the generic type <T>).

4.7.2. Overview of Available Sinks

Sinks.many().unicast().onBackpressureBuffer(args?)

A unicast Sinks.Many can deal with backpressure by using an internal buffer. The trade-off is that it can have at most one Subscriber.

The basic unicast sink is created via Sinks.many().unicast().onBackpressureBuffer(), but there are a few additional unicast static factory methods in Sinks.many().unicast() that allow finer tuning.

For instance, by default, it is unbounded: if you push any amount of data through it while its Subscriber has not yet requested data, it buffers all of the data. You can change this by providing a custom Queue implementation for the internal buffering in the Sinks.many().unicast().onBackpressureBuffer(Queue) factory method. If that queue is bounded, the sink could reject the push of a value when the buffer is full and not enough requests from downstream have been received.
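A minimal sketch of the bounded-queue variant (the buffer size of 16 is arbitrary). With no subscriber demand, pushing past the queue capacity makes tryEmitNext fail instead of buffering indefinitely:

```java
import java.util.concurrent.ArrayBlockingQueue;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

public class BoundedUnicast {
    public static void main(String[] args) {
        // Unicast sink backed by a bounded queue of 16 elements (illustrative size)
        Sinks.Many<Integer> sink = Sinks.many().unicast()
                .onBackpressureBuffer(new ArrayBlockingQueue<>(16));

        // No subscriber yet: pushes go to the internal buffer
        for (int i = 0; i < 16; i++) {
            sink.tryEmitNext(i);
        }

        // The buffer is now full, so this push is rejected rather than buffered
        EmitResult overflow = sink.tryEmitNext(16);
        System.out.println(overflow.isSuccess()); // false

        sink.tryEmitComplete();
        // The single allowed subscriber still receives the 16 buffered elements
        System.out.println(sink.asFlux().count().block()); // 16
    }
}
```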

Sinks.many().multicast().onBackpressureBuffer(args?)

A multicast Sinks.Many can emit to several subscribers while honoring backpressure for each of its subscribers. Subscribers receive only the signals pushed through the sink after they have subscribed.

The basic multicast sink is created via Sinks.many().multicast().onBackpressureBuffer().

By default, if all of its subscribers are cancelled (which basically means they have all un-subscribed), it clears its internal buffer and stops accepting new subscribers. You can tune this by using the autoCancel parameter in the multicast static factory methods under Sinks.many().multicast().

Sinks.many().multicast().directAllOrNothing()

A multicast Sinks.Many with a simplistic handling of backpressure: if any of the subscribers is too slow (has zero demand), the onNext is dropped for all subscribers.

However, the slow subscribers are not terminated, and once the slow subscribers have started requesting again, all of them will resume receiving elements pushed from there on.

Once the Sinks.Many has terminated (usually through its emitError(Throwable) or emitComplete() methods being called), it lets more subscribers subscribe but replays the termination signal to them immediately.

Sinks.many().multicast().directBestEffort()

A multicast Sinks.Many with a best effort handling of backpressure: if a subscriber is too slow (has zero demand), the onNext is dropped for this slow subscriber only.

However, the slow subscribers are not terminated, and once they have started requesting again, they will resume receiving newly pushed elements.

Once the Sinks.Many has terminated (usually through its emitError(Throwable) or emitComplete() methods being called), it lets more subscribers subscribe but replays the termination signal to them immediately.

Sinks.many().replay()

A replay Sinks.Many caches emitted elements and replays them to late subscribers.

It can be created with multiple configurations:

  • Caching a limited history (Sinks.many().replay().limit(int)) or an unbounded history (Sinks.many().replay().all()).

  • Caching a time-based replay window (Sinks.many().replay().limit(Duration)).

  • Caching a combination of history size and time window (Sinks.many().replay().limit(int, Duration)).

Additional overloads for fine tuning of the above can also be found under Sinks.many().replay(), as well as a variant that allows caching of a single element (latest() and latestOrDefault(T)).

Sinks.unsafe().many()

Advanced users and operator builders might want to consider using Sinks.unsafe().many(), which provides the same Sinks.Many factories without the extra producer thread safety. As a result there will be less overhead per sink, since thread-safe sinks have to detect multi-threaded access.

Library developers should not expose unsafe sinks but can use them internally in a controlled calling environment where they can ensure external synchronization of the calls that lead to onNext, onComplete and onError signals, with respect to the Reactive Streams specification.

Sinks.one()

This method directly constructs a simple instance of Sinks.One<T>. This flavor of Sinks is viewable as a Mono (through its asMono() view method), and has slightly different emit methods to better convey this Mono-like semantics:

  • emitValue(T value) generates an onNext(value) signal and - in most implementations - will also trigger an implicit onComplete()

  • emitEmpty() generates an isolated onComplete() signal, intended as generating the equivalent of an empty Mono

  • emitError(Throwable t) generates an onError(t) signal

Sinks.one() accepts one call of any of these methods, effectively generating a Mono that either completed with a value, completed empty or failed.
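A minimal sketch of the value-completing case:

```java
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class OneSinkExample {
    public static void main(String[] args) {
        Sinks.One<String> sink = Sinks.one();
        Mono<String> mono = sink.asMono(); // Mono view of the sink

        // Emit the single value; FAIL_FAST means no retry on contention.
        // In the Sinks.One flavor this also implies completion.
        sink.emitValue("hello", Sinks.EmitFailureHandler.FAIL_FAST);

        System.out.println(mono.block()); // prints "hello"
    }
}
```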

Sinks.empty()

This method directly constructs a simple instance of Sinks.Empty<T>. This flavor of Sinks is like Sinks.One<T>, except it doesn’t offer the emitValue method.

As a result, it can only generate a Mono that completes empty or fails.

The sink is still typed with a generic <T> despite being unable to trigger an onNext, because it allows easy composition and inclusion in chains of operators that require a specific type.

5. Kotlin support

Kotlin is a statically typed language targeting the JVM (and other platforms), which allows writing concise and elegant code while providing very good interoperability with existing libraries written in Java.

This section describes Reactor’s support for Kotlin.

5.1. Requirements

Reactor supports Kotlin 1.1+ and requires kotlin-stdlib (or one of its kotlin-stdlib-jdk7 or kotlin-stdlib-jdk8 variants).

5.2. Extensions

As of Dysprosium-M1 (i.e. reactor-core 3.3.0.M1), Kotlin extensions are moved to a dedicated reactor-kotlin-extensions module with new package names that start with reactor.kotlin instead of simply reactor.

As a consequence, Kotlin extensions in the reactor-core module are deprecated. The new dependency’s groupId and artifactId are:

io.projectreactor.kotlin:reactor-kotlin-extensions

Thanks to its great Java interoperability and to Kotlin extensions, Reactor Kotlin APIs leverage regular Java APIs and are additionally enhanced by a few Kotlin-specific APIs that are available out of the box within Reactor artifacts.

Keep in mind that Kotlin extensions need to be imported to be used. This means, for example, that the Throwable.toFlux Kotlin extension is available only if reactor.kotlin.core.publisher.toFlux is imported. That said, similar to static imports, an IDE should automatically suggest the import in most cases.

For example, Kotlin reified type parameters provide a workaround for JVM generics type erasure, and Reactor provides some extensions to take advantage of this feature.

The following table compares Reactor with Java versus Reactor with Kotlin and extensions:

Java                                       | Kotlin with extensions
Mono.just("foo")                           | "foo".toMono()
Flux.fromIterable(list)                    | list.toFlux()
Mono.error(new RuntimeException())         | RuntimeException().toMono()
Flux.error(new RuntimeException())         | RuntimeException().toFlux()
flux.ofType(Foo.class)                     | flux.ofType<Foo>() or flux.ofType(Foo::class)
StepVerifier.create(flux).verifyComplete() | flux.test().verifyComplete()

The Reactor KDoc API lists and documents all the available Kotlin extensions.

5.3. Null Safety

One of Kotlin’s key features is null safety, which cleanly deals with null values at compile time rather than bumping into the famous NullPointerException at runtime. This makes applications safer through nullability declarations and expressive “value or no value” semantics without paying the cost of wrappers such as Optional. (Kotlin allows using functional constructs with nullable values. See this comprehensive guide to Kotlin null-safety.)

Although Java does not allow expressing null safety in its type system, Reactor now provides null safety for the whole Reactor API through tooling-friendly annotations declared in the reactor.util.annotation package. By default, types from Java APIs used in Kotlin are recognized as platform types, for which null-checks are relaxed. Kotlin support for JSR 305 annotations and Reactor nullability annotations provide null-safety for the whole Reactor API to Kotlin developers, with the advantage of dealing with null-related issues at compile time.

You can configure the JSR 305 checks by adding the -Xjsr305 compiler flag with the following options: -Xjsr305={strict|warn|ignore}.

For Kotlin versions 1.1.50+, the default behavior is the same as -Xjsr305=warn. The strict value is required to have the Reactor API full null-safety taken into account but should be considered experimental, since the Reactor API nullability declarations could evolve even between minor releases and more checks may be added in the future.

Nullability for generic type arguments, variable arguments, and array elements is not supported yet, but it should be in an upcoming release. See this discussion for up-to-date information.

6. Testing

Whether you have written a simple chain of Reactor operators or your own operator, automated testing is always a good idea.

Reactor comes with a few elements dedicated to testing, gathered into their own artifact: reactor-test. You can find that project on GitHub, inside of the reactor-core repository.

To use it in your tests, you must add it as a test dependency. The following example shows how to add reactor-test as a dependency in Maven:

Example 18. reactor-test in Maven, in <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    (1)
</dependency>
1 If you use the BOM, you do not need to specify a <version>.

The following example shows how to add reactor-test as a dependency in Gradle:

Example 19. reactor-test in Gradle, amend the dependencies block
dependencies {
   testCompile 'io.projectreactor:reactor-test'
}

The three core uses of reactor-test are as follows:

  • Testing that a sequence follows a given scenario, step-by-step, with StepVerifier.

  • Producing data in order to test the behavior of downstream operators (including your own operators) with TestPublisher.

  • In sequences that can go through several alternative Publishers (for example, a chain that uses switchIfEmpty), probing such a Publisher to ensure it was used (that is, subscribed to).

6.1. Testing a Scenario with StepVerifier

The most common case for testing a Reactor sequence is to have a Flux or a Mono defined in your code (for example, it might be returned by a method) and to want to test how it behaves when subscribed to.

This situation translates well to defining a “test scenario,” where you define your expectations in terms of events, step-by-step. You can ask and answer questions such as the following:

  • What is the next expected event?

  • Do you expect the Flux to emit a particular value?

  • Or maybe to do nothing for the next 300ms?

You can express all of that through the StepVerifier API.

For instance, you could have the following utility method in your codebase that decorates a Flux:

public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}

In order to test it, you want to verify the following scenario:

I expect this Flux to first emit thing1, then emit thing2, and then produce an error with the message, boom. Subscribe and verify these expectations.

In the StepVerifier API, that translates to the following test:

@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("thing1", "thing2"); (1)

  StepVerifier.create( (2)
    appendBoomError(source)) (3)
    .expectNext("thing1") (4)
    .expectNext("thing2")
    .expectErrorMessage("boom") (5)
    .verify(); (6)
}
1 Since our method needs a source Flux, define a simple one for testing purposes.
2 Create a StepVerifier builder that wraps and verifies a Flux.
3 Pass the Flux to be tested (the result of calling our utility method).
4 The first signal we expect to happen upon subscription is an onNext, with a value of thing1.
5 The last signal we expect to happen is a termination of the sequence with an onError. The exception should have boom as a message.
6 It is important to trigger the test by calling verify().

The API is a builder. You start by creating a StepVerifier and passing the sequence to be tested. This offers a choice of methods that let you:

  • Express expectations about the next signals to occur. If any other signal is received (or the content of the signal does not match the expectation), the whole test fails with a meaningful AssertionError. For example, you might use expectNext(T…​) and expectNextCount(long).

  • Consume the next signal. This is used when you want to skip part of the sequence or when you want to apply a custom assertion on the content of the signal (for example, to check that there is an onNext event and assert that the emitted item is a list of size 5). For example, you might use consumeNextWith(Consumer<T>).

  • Take miscellaneous actions such as pausing or running arbitrary code. For example, if you want to manipulate a test-specific state or context. To that effect, you might use thenAwait(Duration) and then(Runnable).

For terminal events, the corresponding expectation methods (expectComplete() and expectError() and all their variants) switch to an API where you cannot express expectations anymore. In that final step, all you can do is perform some additional configuration on the StepVerifier and then trigger the verification, often with verify() or one of its variants.

What happens at that point is that the StepVerifier subscribes to the tested Flux or Mono and plays the sequence, comparing each new signal with the next step in the scenario. As long as these match, the test is considered a success. As soon as there is a discrepancy, an AssertionError is thrown.

Remember the verify() step, which triggers the verification. To help, the API includes a few shortcut methods that combine the terminal expectations with a call to verify(): verifyComplete(), verifyError(), verifyErrorMessage(String), and others.

Note that, if one of the lambda-based expectations throws an AssertionError, it is reported as is, failing the test. This is useful for custom assertions.

By default, the verify() method and derived shortcut methods (verifyThenAssertThat, verifyComplete(), and so on) have no timeout. They can block indefinitely. You can use StepVerifier.setDefaultTimeout(Duration) to globally set a timeout for these methods, or specify one on a per-call basis with verify(Duration).

6.1.1. Better Identifying Test Failures

StepVerifier provides two options to better identify exactly which expectation step caused a test to fail:

  • as(String): Used after most expect* methods to give a description to the preceding expectation. If the expectation fails, its error message contains the description. Terminal expectations and verify cannot be described that way.

  • StepVerifierOptions.create().scenarioName(String): By using StepVerifierOptions to create your StepVerifier, you can use the scenarioName method to give the whole scenario a name, which is also used in assertion failure messages.

Note that, in both cases, the use of the description or name in messages is only guaranteed for StepVerifier methods that produce their own AssertionError (for example, throwing an exception manually or through an assertion library in assertNext does not add the description or name to the error’s message).
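Both options can be sketched as follows (the scenario and step names here are arbitrary):

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;

public class DescribedSteps {
    public static void main(String[] args) {
        StepVerifier.create(Flux.just("foo", "bar"),
                StepVerifierOptions.create().scenarioName("foo-bar scenario")) // names the whole scenario
            .expectNext("foo").as("first element")   // describes the preceding expectation
            .expectNext("bar").as("second element")
            .verifyComplete();
        // If the "second element" step failed, its AssertionError message would
        // contain both the scenario name and the step description.
        System.out.println("scenario passed");
    }
}
```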

6.2. Manipulating Time

You can use StepVerifier with time-based operators to avoid long run times for corresponding tests. You can do so through the StepVerifier.withVirtualTime builder.

It looks like the following example:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here

This virtual time feature plugs in a custom Scheduler in Reactor’s Schedulers factory. Since these timed operators usually use the default Schedulers.parallel() scheduler, replacing it with a VirtualTimeScheduler does the trick. However, an important prerequisite is that the operator be instantiated after the virtual time scheduler has been activated.

To increase the chance that this happens correctly, the StepVerifier does not take a simple Flux as input. withVirtualTime takes a Supplier, which guides you into lazily creating the instance of the tested flux after the scheduler has been set up.

Take extra care to ensure the Supplier<Publisher<T>> can be used in a lazy fashion. Otherwise, virtual time is not guaranteed. Especially avoid instantiating the Flux earlier in the test code and having the Supplier return that variable. Instead, always instantiate the Flux inside the lambda.

There are two expectation methods that deal with time, and they are both valid with or without virtual time:

  • thenAwait(Duration): Pauses the evaluation of steps (allowing a few signals to occur or delays to run out).

  • expectNoEvent(Duration): Also lets the sequence play out for a given duration but fails the test if any signal occurs during that time.

Both methods pause the thread for the given duration in classic mode and advance the virtual clock instead in virtual mode.

expectNoEvent also considers the subscription as an event. If you use it as a first step, it usually fails because the subscription signal is detected. Use expectSubscription().expectNoEvent(duration) instead.

In order to quickly evaluate the behavior of our Mono.delay above, we can finish writing our code as follows:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription() (1)
    .expectNoEvent(Duration.ofDays(1)) (2)
    .expectNext(0L) (3)
    .verifyComplete(); (4)
1 See the preceding tip.
2 Expect nothing to happen for a full day.
3 Then expect a delay that emits 0.
4 Then expect completion (and trigger the verification).

We could have used thenAwait(Duration.ofDays(1)) above, but expectNoEvent has the benefit of guaranteeing that nothing happened earlier than it should have.

Note that verify() returns a Duration value. This is the real-time duration of the entire test.

Virtual time is not a silver bullet. All Schedulers are replaced with the same VirtualTimeScheduler. In some cases, you can lock the verification process because the virtual clock has not moved forward before an expectation is expressed, resulting in the expectation waiting on data that can only be produced by advancing time. In most cases, you need to advance the virtual clock for sequences to emit. Virtual time also gets very limited with infinite sequences, which might hog the thread on which both the sequence and its verification run.

6.3. Performing Post-execution Assertions with StepVerifier

After having described the final expectation of your scenario, you can switch to a complementary assertion API instead of triggering verify(). To do so, use verifyThenAssertThat() instead.

verifyThenAssertThat() returns a StepVerifier.Assertions object, which you can use to assert a few elements of state once the whole scenario has played out successfully (because it also calls verify()). Typical (albeit advanced) usage is to capture elements that have been dropped by some operator and assert them (see the section on Hooks).
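A minimal sketch (the specific assertions chosen here are illustrative; dropped-element assertions follow the same pattern):

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class PostExecutionAssertions {
    public static void main(String[] args) {
        StepVerifier.create(Flux.just(1, 2, 3))
            .expectNext(1, 2, 3)
            .expectComplete()
            .verifyThenAssertThat()               // triggers verify() and returns Assertions
            .hasNotDroppedElements()              // no onNext signal was dropped
            .tookLessThan(Duration.ofSeconds(5)); // real-time duration assertion
        System.out.println("post-execution assertions passed");
    }
}
```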

6.4. Testing the Context

For more information about the Context, see Adding a Context to a Reactive Sequence.

StepVerifier comes with a couple of expectations around the propagation of a Context:

  • expectAccessibleContext: Returns a ContextExpectations object that you can use to set up expectations on the propagated Context. Be sure to call then() to return to the set of sequence expectations.

  • expectNoAccessibleContext: Sets up an expectation that NO Context can be propagated up the chain of operators under test. This most likely occurs when the Publisher under test is not a Reactor one or does not have any operator that can propagate the Context (for example, a generator source).

Additionally, you can associate a test-specific initial Context to a StepVerifier by using StepVerifierOptions to create the verifier.

These features are demonstrated in the following snippet:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
				StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2"))) (1)
		            .expectAccessibleContext() (2)
		            .contains("thing1", "thing2") (3)
		            .then() (4)
		            .expectNext(11)
		            .verifyComplete(); (5)
1 Create the StepVerifier by using StepVerifierOptions and pass in an initial Context.
2 Start setting up expectations about Context propagation. This alone ensures that a Context was propagated.
3 An example of a Context-specific expectation. It must contain the value "thing2" for the key "thing1".
4 We use then() to switch back to setting up normal expectations on the data.
5 Do not forget to verify() the whole set of expectations.

6.5. Manually Emitting with TestPublisher

For more advanced test cases, it might be useful to have complete mastery over the source of data, to trigger finely chosen signals that closely match the particular situation you want to test.

Another situation is when you have implemented your own operator and you want to verify how it behaves with regards to the Reactive Streams specification, especially if its source is not well behaved.

For both situations, reactor-test offers the TestPublisher class. This is a Publisher<T> that lets you programmatically trigger various signals:

  • next(T) and next(T, T…​) triggers 1-n onNext signals.

  • emit(T…​) triggers 1-n onNext signals and performs complete().

  • complete() terminates with an onComplete signal.

  • error(Throwable) terminates with an onError signal.

You can get a well behaved TestPublisher through the create factory method. Also, you can create a misbehaving TestPublisher by using the createNoncompliant factory method. The latter takes one or multiple values from the TestPublisher.Violation enum. The values define which parts of the specification the publisher can overlook. These enum values include:

  • REQUEST_OVERFLOW: Allows next calls to be made despite an insufficient request, without triggering an IllegalStateException.

  • ALLOW_NULL: Allows next calls to be made with a null value without triggering a NullPointerException.

  • CLEANUP_ON_TERMINATE: Allows termination signals to be sent several times in a row. This includes complete(), error(), and emit().

  • DEFER_CANCELLATION: Allows the TestPublisher to ignore cancellation signals and continue emitting signals as if the cancellation lost the race against said signals.

Finally, the TestPublisher keeps track of internal state after subscription, which can be asserted through its various assert* methods.

You can use it as a Flux or Mono by using the conversion methods, flux() and mono().
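A minimal sketch of driving a downstream operator with a TestPublisher (the uppercase mapping is an arbitrary operator under test):

```java
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

public class TestPublisherExample {
    public static void main(String[] args) {
        TestPublisher<String> publisher = TestPublisher.create();

        // Drive the operator under test (here a simple map) from the TestPublisher
        StepVerifier.create(publisher.flux().map(String::toUpperCase))
            .then(() -> publisher.emit("a", "b")) // 2 onNext signals, then complete()
            .expectNext("A", "B")
            .verifyComplete();

        // Assert the internal state tracked by the TestPublisher
        publisher.assertWasSubscribed();
        publisher.assertWasRequested();
        System.out.println("test publisher scenario passed");
    }
}
```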

6.6. Checking the Execution Path with PublisherProbe

When building complex chains of operators, you can come across cases where there are several possible execution paths, materialized by distinct sub-sequences.

Most of the time, these sub-sequences produce a specific-enough onNext signal that you can assert that it was executed by looking at the end result.

For instance, consider the following method, which builds a chain of operators from a source and uses a switchIfEmpty to fall back to a certain alternative if the source is empty:

public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
}

You can test which logical branch of the switchIfEmpty was used, as follows:

@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
            Mono.just("EMPTY_PHRASE")))
                .expectNext("just", "a", "phrase", "with", "tabs!")
                .verifyComplete();
}

@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}

However, consider an example where the method produces a Mono<Void> instead. It waits for the source to complete, performs an additional task, and completes. If the source is empty, a fallback Runnable-like task must be performed instead. The following example shows such a case:

private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}

public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) (1)
            .switchIfEmpty(doWhenEmpty); (2)
}
1 then() forgets about the command result. It cares only that it was completed.
2 How can we distinguish between two cases that are both empty sequences?

To verify that your processOrFallback method does indeed go through the doWhenEmpty path, you need to write a bit of boilerplate. Namely, you need a Mono<Void> that:

  • Captures the fact that it has been subscribed to.

  • Lets you assert that fact after the whole process has terminated.

Before version 3.1, you would have needed to manually maintain one AtomicBoolean per state you wanted to assert and attach a corresponding doOn* callback to the publisher you wanted to evaluate. This could be a lot of boilerplate when having to apply this pattern regularly. Fortunately, 3.1.0 introduced an alternative with PublisherProbe. The following example shows how to use it:

@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); (1)

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) (2)
                .verifyComplete();

    probe.assertWasSubscribed(); (3)
    probe.assertWasRequested(); (4)
    probe.assertWasNotCancelled(); (5)
}
1 Create a probe that translates to an empty sequence.
2 Use the probe in place of Mono<Void> by calling probe.mono().
3 After completion of the sequence, the probe lets you assert that it was used. You can check that it was subscribed to…
4 …as well as actually requested data…
5 …and whether or not it was cancelled.

You can also use the probe in place of a Flux<T> by calling .flux() instead of .mono(). For cases where you need to probe an execution path but also need the probe to emit data, you can wrap any Publisher<T> by using PublisherProbe.of(Publisher).
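A minimal sketch of PublisherProbe.of, using a made-up fallback publisher that must both emit data and be verified:

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;

public class ProbeOfExample {
    public static void main(String[] args) {
        // wrap a real publisher: it still emits its data, but usage is recorded
        PublisherProbe<String> probe = PublisherProbe.of(Flux.just("fallbackValue"));

        // the probe stands in for the fallback branch of a switchIfEmpty
        StepVerifier.create(Flux.<String>empty().switchIfEmpty(probe.flux()))
                    .expectNext("fallbackValue")
                    .verifyComplete();

        probe.assertWasSubscribed(); // the fallback path was indeed taken
        probe.assertWasRequested();
    }
}
```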

7. Debugging Reactor

Switching from an imperative and synchronous programming paradigm to a reactive and asynchronous one can sometimes be daunting. One of the steepest steps in the learning curve is how to analyze and debug when something goes wrong.

In the imperative world, debugging is usually pretty straightforward. You can read the stacktrace and see where the problem originated. Was it entirely a failure of your code? Did the failure occur in some library code? If so, what part of your code called the library, potentially passing in improper parameters that ultimately caused the failure?

7.1. The Typical Reactor Stack Trace

When you shift to asynchronous code, things can get much more complicated.

Consider the following stack trace:

Example 20. A typical Reactor stack trace
java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
    at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
    at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
    at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)

There is a lot going on here. We get an IndexOutOfBoundsException, which tells us that a source emitted more than one item.

We can probably quickly come to assume that the source is a Flux or a Mono, as confirmed by the next line, which mentions MonoSingle. So it appears to be some sort of complaint from a single operator.

Referring to the Javadoc for the Mono#single operator, we see that single has a contract: The source must emit exactly one element. It appears we had a source that emitted more than one and thus violated that contract.

Can we dig deeper and determine that source? The following rows are not very helpful. They take us through the innards of what seems to be a reactive chain, through multiple calls to subscribe and request.

By skimming over these rows, we can at least start to form a picture of the kind of chain that went wrong: It seems to involve a MonoSingle, a FluxFlatMap, and a FluxRange (each gets several rows in the trace, but overall these three classes are involved). So a range().flatMap().single() chain, maybe?

But what if we use that pattern a lot in our application? This still does not tell us much, and simply searching for single is not going to find the problem. Then the last line refers to some of our code. Finally, we are getting close.

Hold on, though. When we go to the source file, all we see is that a pre-existing Flux is subscribed to, as follows:

toDebug
    .subscribeOn(Schedulers.immediate())
    .subscribe(System.out::println, Throwable::printStackTrace);

All of this happened at subscription time, but the Flux itself was not declared there. Worse, when we go to where the variable is declared, we see the following:

public Mono<String> toDebug; //please ignore the public class attribute

The variable is not instantiated where it is declared. We must assume a worst-case scenario where we find out that there could be a few different code paths that set it in the application. We remain unsure of which one caused the issue.

This is kind of the Reactor equivalent of a runtime error, as opposed to a compilation error.

What we want to find out more easily is where the operator was added into the chain - that is, where the Flux was declared. We usually refer to that as the “assembly” of the Flux.

7.2. Activating Debug Mode - aka tracebacks

This section describes the easiest but also the slowest way to enable the debugging capabilities, due to the fact that it captures the stacktrace on every operator. See The checkpoint() Alternative for a more fine-grained way of debugging, and Production-ready Global Debugging for a more advanced and performant global option.

Even though the stacktrace was still able to convey some information for someone with a bit of experience, we can see that it is not ideal by itself in more advanced cases.

Fortunately, Reactor comes with assembly-time instrumentation that is designed for debugging.

This is done by activating a global debug mode via the Hooks.onOperatorDebug() method at application start (or at least before the incriminated Flux or Mono can be instantiated), as follows:

Hooks.onOperatorDebug();

This starts instrumenting the calls to the Reactor operator methods (where they are assembled into the chain) by wrapping the construction of the operator and capturing a stack trace there. Since this is done when the operator chain is declared, the hook should be activated before that, so the safest way is to activate it right at the start of your application.

Later on, if an exception occurs, the failing operator is able to refer to that capture and to rework the stack trace, appending additional information.

We call this captured assembly information (and additional information added to the exceptions by Reactor in general) a traceback.

In the next section, we see how the stack trace differs and how to interpret that new information.
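Putting it together, a self-contained sketch that reproduces the kind of failure discussed earlier with the hook active (Hooks.resetOnOperatorDebug() undoes the instrumentation, e.g. in tests):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

public class DebugModeExample {
    public static void main(String[] args) {
        // must run before the chain below is assembled
        Hooks.onOperatorDebug();
        try {
            Flux.range(1, 5)
                .single() // contract violation: the source emits more than one item
                .block();
        } catch (IndexOutOfBoundsException e) {
            // the suppressed OnAssemblyException carries the traceback
            e.printStackTrace();
        } finally {
            Hooks.resetOnOperatorDebug();
        }
    }
}
```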

7.3. Reading a Stack Trace in Debug Mode

When we reuse our initial example but activate the operatorStacktrace debug feature, several things happen:

  1. The stack trace, which points to subscription sites and is thus less interesting, is cut after the first frame and set aside.

  2. A special suppressed exception is added to the original exception (or amended if already there).

  3. A message is constructed for that special exception with several sections.

  4. The first section traces back to the assembly site of the operator that fails.

  5. The second section attempts to display the chain(s) that are assembled from this operator and have seen the error propagate.

  6. The last section is the original stack trace.

The full stack trace, once printed, is as follows:

java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127) (1)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: (2)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (3)
    reactor.core.publisher.Flux.single(Flux.java:7915)
    reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
Error has been observed at the following site(s): (4)
    *_______Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017) (5)
    |_ Mono.subscribeOn ⇢ at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1071) (6)
Original Stack Trace: (7)
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
...
(8)
...
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4363)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4223)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4159)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4131)
        at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1067)
1 The original stack trace is truncated to a single frame.
2 This is new: We see the wrapper operator that captured the stack. This is where the traceback starts to appear.
3 First, we get some details about where the operator was assembled.
4 Second, we get a notion of operator chain(s) through which the error propagated, from first to last (error site to subscribe site).
5 Each operator that saw the error is mentioned along with the user class and line where it was used. Here we have a "root".
6 Here we have a simple part of the chain.
7 The rest of the stack trace is moved to the end…
8 …showing a bit of the operator's internals (so we removed a bit of the snippet here).

The captured stack trace is appended to the original error as a suppressed OnAssemblyException. There are three parts to it, but the first section is the most interesting. It shows the path of construction for the operator that triggered the exception. Here, it shows that the single that caused our issue was actually created in the scatterAndGather method.

Now that we are armed with enough information to find the culprit, we can have a meaningful look at the scatterAndGather method:

private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
           .single(); (1)
}
1 Sure enough, here is our single.

Now we can see that the root cause of the failure was a flatMap that performs several HTTP calls to a few URLs but that is chained with single, which is too restrictive. After a short git blame and a quick discussion with the author of that line, we find out he meant to use the less restrictive take(1) instead.

We have solved our problem.
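A sketch of the corrected method, with a stand-in doRequest (the real one presumably performs an HTTP call and is not shown in the guide):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ScatterGatherFix {
    // stand-in for the real HTTP call (hypothetical)
    private static Mono<String> doRequest(String url) {
        return Mono.just("response for " + url);
    }

    // take(1) tolerates extra emissions where single() would error
    static Mono<String> scatterAndGather(Flux<String> urls) {
        return urls.flatMap(ScatterGatherFix::doRequest)
                   .take(1)
                   .next(); // take(1) yields a Flux; next() converts back to a Mono
    }

    public static void main(String[] args) {
        // no longer fails with IndexOutOfBoundsException on multiple URLs
        System.out.println(scatterAndGather(Flux.just("a", "b")).block());
    }
}
```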

Now consider the following section of the stack trace:

Error has been observed at the following site(s):

The second part of the traceback was not necessarily interesting in this particular example, because the error was actually happening in the last operator in the chain (the one closest to subscribe). Considering another example might make it more clear:

FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();

Now imagine that, inside findAllUserByName, there is a map that fails. Here, we would see the following in the second part of the traceback:

Error has been observed at the following site(s):
    *________Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
    |_       Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
    |_    Flux.filter ⇢ at reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:39)
    |_   Flux.elapsed ⇢ at reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)

This corresponds to the section of the chain(s) of operators that gets notified of the error:

  1. The exception originates in the first map. This one is identified as a root by the * connector, and the characters _ are used for indentation.

  2. The exception is seen by a second map (both in fact correspond to the findAllUserByName method).

  3. It is then seen by a filter and a transform, which indicate that part of the chain is constructed by a reusable transformation function (here, the applyFilters utility method).

  4. Finally, it is seen by an elapsed and a transform. Once again, elapsed is applied by the transformation function of that second transform.

In some cases where the same exception is propagated through multiple chains, the "root" marker *_ allows us to better separate such chains. If a site is seen several times, there will be an (observed x times) after the call site information.

For instance, let us consider the follow-up snippet:

public class MyClass {
    public void myMethod() {
        Flux<String> source = Flux.error(sharedError);
        Flux<String> chain1 = source.map(String::toLowerCase).filter(s -> s.length() < 4);
        Flux<String> chain2 = source.filter(s -> s.length() > 5).distinct();

        Mono<Void> when = Mono.when(chain1, chain2);
    }
}

In the code above, the error propagates to the when, going through two separate chains, chain1 and chain2. It would lead to a traceback containing the following:

Error has been observed at the following site(s):
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_      Flux.map ⇢ at myClass.myMethod(MyClass.java:4)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:4)
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:5)
    |_ Flux.distinct ⇢ at myClass.myMethod(MyClass.java:5)
    *______Mono.when ⇢ at myClass.myMethod(MyClass.java:7)

We see that:

  1. there are 3 "root" elements (the when is the true root).

  2. two chains starting from Flux.error are visible.

  3. both chains seem to be based on the same Flux.error source (observed 2 times).

  4. the first chain is Flux.error().map().filter()

  5. the second chain is Flux.error().filter().distinct()

A note on tracebacks and suppressed exceptions: As tracebacks are appended to original errors as suppressed exceptions, this can somewhat interfere with another type of exception that uses this mechanism: composite exceptions. Such exceptions can be created directly via Exceptions.multiple(Throwable…), or by some operators that might join multiple erroring sources (like Flux#flatMapDelayError). They can be unwrapped into a List via Exceptions.unwrapMultiple(Throwable), in which case the traceback would be considered a component of the composite and be part of the returned List. If that is somehow not desirable, tracebacks can be identified thanks to the Exceptions.isTraceback(Throwable) check, and excluded from such an unwrap by using Exceptions.unwrapMultipleExcludingTracebacks(Throwable) instead.
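A minimal sketch of unwrapping a composite while excluding tracebacks (with debug mode off, as here, no tracebacks are attached, so both unwrap variants return the same two causes):

```java
import java.util.List;

import reactor.core.Exceptions;

public class TracebackFilterExample {
    public static void main(String[] args) {
        // build a composite from two plain failures
        Throwable composite = Exceptions.multiple(
                new IllegalStateException("first failure"),
                new IllegalArgumentException("second failure"));

        // unwrapMultiple would also surface tracebacks when debug mode is on;
        // the Excluding variant filters them out
        List<Throwable> causes = Exceptions.unwrapMultipleExcludingTracebacks(composite);
        System.out.println(causes.size()); // 2
    }
}
```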

We deal with a form of instrumentation here, and creating a stack trace is costly. That is why this debugging feature should only be activated in a controlled manner, as a last resort.

7.3.1. The checkpoint() Alternative

The debug mode is global and affects every single operator assembled into a Flux or a Mono inside the application. This has the benefit of allowing after-the-fact debugging: Whatever the error, we can obtain additional information to debug it.

As we saw earlier, this global knowledge comes at the cost of an impact on performance (due to the number of populated stack traces). That cost can be reduced if we have an idea of likely problematic operators. However, we usually do not know which operators are likely to be problematic unless we observed an error in the wild, saw we were missing assembly information, and then modified the code to activate assembly tracking, hoping to observe the same error again.

In that scenario, we have to switch into debugging mode and make preparations in order to better observe a second occurrence of the error, this time capturing all the additional information.

If you can identify reactive chains that you assemble in your application for which serviceability is critical, you can achieve a mix of both techniques with the checkpoint() operator.

You can chain this operator into a method chain. The checkpoint operator works like the hook version, but only for its link of that particular chain.

There is also a checkpoint(String) variant that lets you add a unique String identifier to the assembly traceback. This way, the stack trace is omitted and you rely on the description to identify the assembly site. checkpoint(String) imposes less processing cost than a regular checkpoint.
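A sketch of the light checkpoint(String) variant on a failing chain (the "after division" description is made up for illustration):

```java
import reactor.core.publisher.Flux;

public class CheckpointExample {
    public static void main(String[] args) {
        Flux.just(1, 2, 0)
            .map(i -> 100 / i)            // fails on the third element
            .checkpoint("after division") // light variant: description only, no stack capture
            .subscribe(System.out::println,
                       Throwable::printStackTrace); // the traceback mentions "after division"
    }
}
```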

Last but not least, if you want to add a more generic description to the checkpoint but still rely on the stack trace mechanism to identify the assembly site, you can force that behavior by using the checkpoint("description", true) version. We are now back to the initial message for the traceback, augmented with a description, as shown in the following example:

Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed at the following site(s):
	|_	ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
1 descriptionCorrelation1234 is the description provided in the checkpoint.

The description can be a static identifier or user-readable description, or a wider correlation ID (for instance, coming from a header in the case of an HTTP request).

When global debugging is enabled in conjunction with checkpoints, the global debugging traceback style is applied and checkpoints are only reflected in the "Error has been observed…" section. As a consequence, the name of light checkpoints is not visible in that case.

7.4. Production-ready Global Debugging

Project Reactor comes with a separate Java Agent that instruments your code and adds debugging info without paying the cost of capturing the stacktrace on every operator call. The behaviour is very similar to Activating Debug Mode - aka tracebacks, but without the runtime performance overhead.

To use it in your app, you must add it as a dependency.

The following example shows method to total reactor-tools as one dependency in Maven:

Example 21. reactor-tools in Maven, within <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    (1)
</dependency>
1 If you use the BOM, you do not need to specify a <version>.

The following example shows how to add reactor-tools as a dependency in Gradle:

Example 22. reactor-tools in Gradle, amend the dependencies block
dependencies {
   implementation 'io.projectreactor:reactor-tools'
}

It also needs to be explicitly initialized with:

ReactorDebugAgent.init();

Since the implementation will instrument your classes when they are loaded, the best place to put the init call is before everything else in your main(String[]) method:
public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}

You can also re-process existing classes with processExistingClasses() if you cannot run the init eagerly. For example, in JUnit5 tests from a TestExecutionListener or even in the class's static initializer block:

ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
Be aware that the re-processing takes a couple of seconds due to the need to iterate over all loaded classes and apply the transformation. Use it only if you see that some call-sites are not instrumented.

7.4.1. Limitations

ReactorDebugAgent is implemented as a Java Agent and uses ByteBuddy to perform a self-attach. Self-attach may not work on some JVMs; please refer to ByteBuddy's documentation for more details.

7.4.2. Running ReactorDebugAgent as a Java Agent

If your environment does not support ByteBuddy's self-attachment, you can run reactor-tools as a Java Agent:

java -javaagent:reactor-tools.jar -jar app.jar

7.4.3. Running ReactorDebugAgent at build time

It is also possible to run reactor-tools at build time. To do so, you need to apply it as a plugin for ByteBuddy's build instrumentation.

The transformation will only be applied to your project's classes. The classpath libraries will not be instrumented.
Example 23. reactor-tools with ByteBuddy’s Maven plugin
<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-tools</artifactId>
		(1)
		<classifier>original</classifier> (2)
		<scope>runtime</scope>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<groupId>net.bytebuddy</groupId>
			<artifactId>byte-buddy-maven-plugin</artifactId>
			<configuration>
				<transformations>
					<transformation>
						<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
					</transformation>
				</transformations>
			</configuration>
		</plugin>
	</plugins>
</build>
1 If you use the BOM, you do not need to declare a <version>.
2 classifier here is important.
Example 24. reactor-tools include ByteBuddy’s Gradle plugin
plugins {
	id 'net.bytebuddy.byte-buddy-gradle-plugin' version '1.10.9'
}

configurations {
	byteBuddyPlugin
}

dependencies {
	byteBuddyPlugin(
			group: 'io.projectreactor',
			name: 'reactor-tools',
			(1)
			classifier: 'original', (2)
	)
}

byteBuddy {
	transformation {
		plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin"
		classPath = configurations.byteBuddyPlugin
	}
}
1 If you use the BOM, you do not need to declare a version.
2 classifier here is important.

7.5. Logging a Sequence

In addition to stack trace debugging and analysis, another powerful tool to have in your toolkit is the ability to trace and log events in an asynchronous sequence.

The log() operator can do just that. Chained inside a sequence, it peeks at every event of the Flux or Mono upstream of it (including onNext, onError, and onComplete, as well as subscriptions, cancellations, and requests).

A note on logging implementation

The log operator uses the Loggers utility class, which picks up common logging frameworks such as Log4J and Logback through SLF4J and defaults to logging to the console if SLF4J is unavailable.

The console fallback uses System.err for the WARN and ERROR log levels and System.out for everything else.

If you prefer a JDK java.util.logging fallback, as in 3.0.x, you can get it by setting the reactor.logging.fallback system property to JDK.
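For instance, the property can be set programmatically before any Reactor class is loaded (a sketch; passing -Dreactor.logging.fallback=JDK on the command line is equivalent):

```java
public class LoggingFallbackSetup {
    public static void main(String[] args) {
        // must happen before Reactor's Loggers class is first initialized
        System.setProperty("reactor.logging.fallback", "JDK");

        System.out.println(System.getProperty("reactor.logging.fallback")); // JDK
    }
}
```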

In all cases, when logging in production you should take care to configure the underlying logging framework to use its most asynchronous and non-blocking approach (for instance, an AsyncAppender in Logback or an AsyncLogger in Log4j 2).

For instance, suppose we have Logback activated and configured and a chain like range(1,10).take(3). By placing a log() just before the take, we can get some insight into how it works and what kind of events it propagates upstream to the range, as the following example shows:

Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();

This prints out the following (through the logger’s console appender):

10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(3) (2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel() (4)

Here, in addition to the logger's own format (time, thread, level, message), the log() operator outputs a few things in its own format:

1 reactor.Flux.Range.1 is an automatic category for the log, in case you use the operator several times in a chain. It lets you distinguish which operator's events are logged (in this case, the range). You can overwrite the identifier with your own custom category by using the log(String) method signature. After a few separating characters, the actual event gets printed. Here, we get an onSubscribe call, a request call, three onNext calls, and a cancel call. For the first line, onSubscribe, we get the implementation of the Subscriber, which usually corresponds to the operator-specific implementation. Between square brackets, we get additional information, including whether the operator can be automatically optimized through synchronous or asynchronous fusion.
2 On the second line, we can see that take limited the request to upstream to 3.
3 Then the range sends three values in a row.
4 On the last line, we see cancel().

The second (2) and last lines (4) are the most interesting. We can see the take in action there. It leverages backpressure in order to ask the source for exactly the expected amount of elements. After having received enough elements, it tells the source no more items will be needed by calling cancel(). Note that if downstream had itself used backpressure, e.g. by requesting only 1 element, the take operator would have honored that (it caps the request when propagating it from downstream to upstream).

8. Exposing Reactor metrics

Project Reactor is a library designed for performance and better utilization of resources. But to truly understand the performance of a system, it is best to be able to monitor its various components.

This is why Reactor provides a built-in integration with Micrometer via the reactor-core-micrometer module. Introduced in the 2022.0 BOM release, the module provides an explicit dependency to Micrometer, which allows it to offer fine-tuned APIs for metrics and observations.

Up until Reactor-Core 3.5.0, metrics were implemented as operators that would be no-ops if Micrometer wasn't on the classpath.

The reactor-core-micrometer APIs require the user to provide a form of registry explicitly instead of relying on a hardcoded global registry. When applying instrumentation to classes that have a NATIVE notion of naming or tags, these APIs will attempt to discover such elements in the reactive chain. Otherwise, the API will expect that a prefix for naming meters is provided alongside the registry.

8.1. Scheduler metrics

Every async operation in Reactor is done via the Scheduler abstraction described in Threading and Schedulers. That is why it is important to monitor your schedulers, watch out for key metrics that start to look suspicious, and react accordingly.

The reactor-core-micrometer module offers a "timed" Scheduler wrapper that performs measurements around tasks submitted through it, which can be used as follows:

Scheduler originalScheduler = Schedulers.newParallel("test", 4);

Scheduler schedulerWithMetrics = Micrometer.timedScheduler(
	originalScheduler, (1)
	applicationDefinedMeterRegistry, (2)
	"testingMetrics", (3)
	Tags.of(Tag.of("additionalTag", "yes")) (4)
);
1 the Scheduler to wrap
2 the MeterRegistry in which to publish metrics
3 the prefix to use in naming meters. This would for example lead to a testingMetrics.scheduler.tasks.completed meter being created.
4 optional tags to add to all the meters created for that wrapping Scheduler
When wrapping a common Scheduler (eg. Schedulers.single()) or a Scheduler that is used in multiple places, only the Runnable tasks that are submitted through the wrapper instance returned by Micrometer#timedScheduler are going to be instrumented.

See Micrometer.timedScheduler() for produced meters and associated default tags.

8.2. Publisher metrics

Sometimes it is useful to be able to record metrics at some stage in your reactive pipeline.

One way to do it could be to manually push the values to your metrics backend of choice from a custom SignalListener provided to the tap operator.

An out-of-the-box implementation is actually provided by the reactor-core-micrometer module, via the Micrometer#metrics APIs. Consider the following pipeline:

listenToEvents()
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();

To enable the metrics for this source Flux (returned from listenToEvents()), we need to turn on the metrics collection:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.metrics( (2)
        applicationDefinedMeterRegistry (3)
    ))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 Every meter at this stage of the reactive pipeline will use "events" as a naming prefix (optional, defaults to the reactor prefix).
2 We use the tap operator combined with a SignalListener implementation provided in reactor-core-micrometer for metrics collection.
3 As with other APIs in the module, the MeterRegistry into which to publish metrics needs to be explicitly provided.

The detail of the exposed metrics is available in Micrometer.metrics().

8.2.1. Tags

In addition to the common tags described in Micrometer.metrics(), users can add custom tags to their reactive chains via the tag operator:

listenToEvents()
    .name("events") (1)
    .tag("source", "kafka") (2)
    .tap(Micrometer.metrics(applicationDefinedRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 Every meter at this stage will be identified with the "events" prefix.
2 Set a custom tag "source" to value "kafka".
3 All reported meters will have the source=kafka tag attached in addition to the common tags.

Please note that depending on the monitoring system you're using, using a name can be considered mandatory when using tags, since it would otherwise result in a different set of tags between two default-named sequences. Some systems like Prometheus might also require to have the exact same set of tags for each metric with the same name.

8.2.2. Observation

In addition to full metrics, the reactor-core-micrometer module offers an alternative based on Micrometer’s Observation. Depending on the configuration and runtime classpath, an Observation could translate to timers, spans, logging statements, or any combination.

A reactive chain can be observed via the tap operator and the Micrometer.observation utility, as follows:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
		applicationDefinedRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 The Observation for this pipeline will be identified with the "events" prefix.
2 We use the tap operator with the observation utility.
3 A registry must be provided into which to publish the observation results. Note this is an ObservationRegistry.

The details of the observation and its tags are provided in Micrometer.observation().

8.3. Meters and tags for Reactor-Core-Micrometer module

8.3.1. Micrometer.metrics()

Below is the list of meters used by the metrics tap listener feature, as exposed via Micrometer.metrics(MeterRegistry meterRegistry).

Please note that metrics below use a dynamic %s prefix. When used on a Flux or Mono that uses the name(String n) operator, this is replaced with n. Otherwise, it is replaced by the default value of "reactor".
Flow Duration

Times the duration elapsed between a subscription and the termination or cancellation of the sequence. A TerminationTags#STATUS tag is added to specify what event caused the timer to end ("completed", "completedEmpty", "error" or "cancelled").

Metric name %s.flow.duration - since it contains %s, the name is dynamic and will be resolved at runtime. Type distribution summary.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Table 1. Low cardinality Keys

Name

Description

exception (required)

Tag used by FLOW_DURATION when STATUS is "error", to store the exception that occurred.

status (required)

The termination status:

  • "completed" for a sequence that terminates with an onComplete, with onNext(s)

  • "completedEmpty" for a sequence that terminates without any onNext before the onComplete

  • "error" for a sequence that terminates with an onError

  • "cancelled" for a sequence that has cancelled its subscription

type (required)

The type of the sequence ( "Flux" or "Mono").

Malformed Source Events

Counts the number of events received from a malformed source (ie an onNext after an onComplete).

Metric name %s.malformed.source - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Table 2. Low cardinality Keys

Name

Description

type (required)

The type of the sequence ( "Flux" or "Mono").

On Next Delay

Measures the delay between each onNext (or between the first onNext and the onSubscribe event).

Metric name %s.onNext.delay - since it contains %s, the name is dynamic and will be resolved at runtime. Type timer and base unit nanoseconds.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
Table 3. Low cardinality Keys

Name

Description

type (required)

The type of the sequence ( "Flux" or "Mono").

Requested Amount

Counts the amount requested to a named sequence (eg. Flux.name(String)) by all subscribers, until at least one requests an unbounded amount.

Metric name %s.requested - since it contains %s, the name is dynamic and will be resolved at runtime. Type distribution summary.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Table 4. Low cardinality Keys

Name

Description

type (required)

The type of the sequence ( "Flux" or "Mono").

Subscribed

Counts the number of subscriptions to a sequence.

Metric name %s.subscribed - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Table 5. Low cardinality Keys

Name

Description

type (required)

The type of the sequence ( "Flux" or "Mono").

8.3.2. Micrometer.timedScheduler()

Below is the list of meters used by the TimedScheduler feature, as exposed via Micrometer.timedScheduler(Scheduler original, MeterRegistry meterRegistry, String metricsPrefix).

Please note that metrics below use a dynamic %s prefix. This is replaced with the provided metricsPrefix in practice.
Tasks Active

LongTaskTimer reflecting tasks currently running. Note that this reflects all types of active tasks, including tasks scheduled with a delay or periodically (each iteration being considered an active task).

Metric name %s.scheduler.tasks.active - since it contains %s, the name is dynamic and will be resolved at runtime. Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
Tasks Completed

Timer reflecting tasks that have finished execution. Note that this reflects all types of tasks, including tasks scheduled with a delay or periodically (each iteration being considered a separate completed task).

Metric name %s.scheduler.tasks.completed - since it contains %s, the name is dynamic and will be resolved at runtime. Type timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
Tasks Pending

LongTaskTimer reflecting tasks that were submitted for immediate execution but couldn’t be started immediately because the scheduler is already at max capacity. Note that only immediate submissions via Scheduler#schedule(Runnable) and Scheduler.Worker#schedule(Runnable) are considered.

Metric name %s.scheduler.tasks.pending - since it contains %s, the name is dynamic and will be resolved at runtime. Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
Tasks Submitted

Counter that increments by one each time a task is submitted (via any of the schedule methods on both Scheduler and Scheduler.Worker).

Note that there are effectively 4 counters, which can be differentiated by the SubmittedTags#SUBMISSION tag. The sum of all these can thus be compared with the TASKS_COMPLETED counter.

Metric name %s.scheduler.tasks.submitted - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Table 6. Low cardinality Keys

Name

Description

submission.type (required)

The type of submission:

  • "direct" for Scheduler#schedule(Runnable)

  • "delayed" for Scheduler#schedule(Runnable,long,TimeUnit)

  • "periodic_initial" for Scheduler#schedulePeriodically(Runnable,long,long,TimeUnit) after the initial delay

  • "periodic_iteration" for Scheduler#schedulePeriodically(Runnable,long,long,TimeUnit) for further periodic iterations

8.3.3. Micrometer.observation()

Below is the list of meters used by the observation tap listener feature, as exposed via Micrometer.observation(ObservationRegistry registry).

This is the ANONYMOUS observation, but you can create a similar Observation with a custom name by using the name(String) operator.

Anonymous

Anonymous version of the Micrometer.observation(), when the sequence hasn’t been explicitly named via e.g. the Flux#name(String) operator.

Metric name reactor.observation. Type timer.

Metric name reactor.observation.active. Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
Table 7. Low cardinality Keys

Name

Description

reactor.status (required)

The status of the sequence, which indicates how it terminated ( "completed", "completedEmpty", "error" or "cancelled").

reactor.type (required)

The type of the sequence, i.e. "Flux" or "Mono".

9. Advanced Features and Concepts

This chapter covers advanced features and concepts of Reactor, including the following:

9.1. Mutualizing Operation Usage

From a clean-code perspective, code reuse is generally a good thing. Reactor offers a few patterns that can help you reuse and mutualize code, notably for operators or combinations of operators that you might want to apply regularly in your codebase. If you think of a chain of operators as a recipe, you can create a “cookbook” of operator recipes.

9.1.1. Using the transform Operator

The transform operator lets you encapsulate a piece of an operator chain into a function. That function is applied to an original operator chain at assembly time to augment it with the encapsulated operators. Doing so applies the same operations to all the subscribers of a sequence and is basically equivalent to chaining the operators directly. The following code shows an example:

Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
      .map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
	.doOnNext(System.out::println)
	.transform(filterAndMap)
	.subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));

The following image shows how the transform operator encapsulates flows:

Transform Operator : encapsulate flows

The preceding example produces the following output:

blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE

9.1.2. Using the transformDeferred Operator

The transformDeferred operator is similar to transform and also lets you encapsulate operators in a function. The major difference is that this function is applied to the original sequence on a per-subscriber basis. It means that the function can actually produce a different operator chain for each subscription (by maintaining some state). The following code shows an example:

AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
	if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
        .map(String::toUpperCase);
	}
	return f.filter(color -> !color.equals("purple"))
	        .map(String::toUpperCase);
};

Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
    .doOnNext(System.out::println)
    .transformDeferred(filterAndMap);

composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));

The following image shows how the transformDeferred operator works with per-subscriber transformations:

Compose Operator : Per Subscriber transformation

The preceding example produces the following output:

blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple

9.2. Hot Versus Cold

So far, we have considered that all Flux (and Mono) are the same: They all represent an asynchronous sequence of data, and nothing happens before you subscribe.

Really, though, there are two broad families of publishers: hot and cold.

The earlier description applies to the cold family of publishers. They generate data anew for each subscription. If no subscription is created, data never gets generated.

Think of an HTTP request: Each new subscriber triggers an HTTP call, but no call is made if no one is interested in the result.

Hot publishers, on the other hand, do not depend on any number of subscribers. They might start publishing data right away and would continue doing so whenever a new Subscriber comes in (in which case, the subscriber would see only new elements emitted after it subscribed). For hot publishers, something does indeed happen before you subscribe.

One example of the few hot operators in Reactor is just: It directly captures the value at assembly time and replays it to anybody subscribing to it later. To re-use the HTTP call analogy, if the captured data is the result of an HTTP call, then only one network call is made, when instantiating just.

To transform just into a cold publisher, you can use defer. It defers the HTTP request in our example to subscription time (and would result in a separate network call for each new subscription).
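The eager-versus-deferred distinction can be sketched without Reactor, using plain JDK Suppliers. This is a conceptual, hedged sketch only: fetch() stands in for a hypothetical HTTP call, a captured value plays the role of just, and a Supplier plays the role of defer.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerVsDeferred {
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for an expensive HTTP call (hypothetical, for illustration).
    static String fetch() {
        calls.incrementAndGet();
        return "payload";
    }

    public static void main(String[] args) {
        // "just"-like: the value is captured once, at assembly time.
        String captured = fetch();               // one call, regardless of readers
        Supplier<String> hot = () -> captured;

        // "defer"-like: the call happens anew for each read (subscription).
        Supplier<String> cold = EagerVsDeferred::fetch;

        hot.get();
        hot.get();
        System.out.println("calls after two hot reads: " + calls.get());  // 1

        cold.get();
        cold.get();
        System.out.println("calls after two cold reads: " + calls.get()); // 3
    }
}
```

Two reads of the hot supplier never re-trigger the call, while each read of the cold supplier does, mirroring the one-network-call-versus-one-per-subscription behavior described above.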

On the opposite, share() and replay(…​) can be used to turn a cold publisher into a hot one (at least once a first subscription has happened). Both of these also have Sinks.Many equivalents in the Sinks class, which allow programmatically feeding the sequence.

Consider two examples, one that demonstrates a cold Flux and the other that makes use of a Sinks to simulate a hot Flux. The following code shows the first example:

Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                          .map(String::toUpperCase);

source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));

This first example produces the following output:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE

The following image shows this replay behavior:

Replaying behavior

Both subscribers catch all four colors, because each subscriber causes the process defined by the operators on the Flux to run.

Compare the first example to the second example, shown in the following code:

Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();

Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);

hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.emitNext("blue", FAIL_FAST); (1)
hotSource.tryEmitNext("green").orThrow(); (2)

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.emitNext("orange", FAIL_FAST);
hotSource.emitNext("purple", FAIL_FAST);
hotSource.emitComplete(FAIL_FAST);
1 for more details about sinks, see Sinks
2 side note: orThrow() here is an alternative to emitNext + Sinks.EmitFailureHandler.FAIL_FAST that is suitable for tests, since throwing there is acceptable (more so than in reactive applications).

The second example produces the following output:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

The following figure shows how a subscription is broadcast:

Broadcasting a subscription

Subscriber 1 catches all four colors. Subscriber 2, having been created after the first two colors were produced, catches only the last two colors. This difference accounts for the doubling of ORANGE and PURPLE in the output. The process described by the operators on this Flux runs regardless of when subscriptions have been attached.

9.3. Broadcasting to Multiple Subscribers with ConnectableFlux

Sometimes, you may want to not defer only some processing to the subscription time of one subscriber, but you might actually want for several of them to rendezvous and then trigger the subscription and data generation.

This is what ConnectableFlux is made for. Two main patterns are covered by the Flux API that return a ConnectableFlux: publish and replay.

  • publish dynamically tries to respect the demand from its various subscribers, in terms of backpressure, by forwarding these requests to the source. Most notably, if any subscriber has a pending demand of 0, publish pauses its requesting to the source.

  • replay buffers data seen through the first subscription, up to configurable limits (in time and buffer size). It replays the data to subsequent subscribers.

A ConnectableFlux offers additional methods to manage subscriptions downstream versus subscriptions to the original source. These additional methods include the following:

  • connect() can be called manually once you reach enough subscriptions to the Flux. That triggers the subscription to the upstream source.

  • autoConnect(n) can do the same job automatically once n subscriptions have been made.

  • refCount(n) not only automatically tracks incoming subscriptions but also detects when these subscriptions are cancelled. If not enough subscribers are tracked, the source is “disconnected”, causing a new subscription to the source later if additional subscribers appear.

  • refCount(int, Duration) adds a “grace period.” Once the number of tracked subscribers becomes too low, it waits for the Duration before disconnecting the source, potentially allowing for enough new subscribers to come in and cross the connection threshold again.
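Conceptually, autoConnect(n) amounts to counting incoming subscriptions and firing the upstream connection exactly once, on the nth one. The following JDK-only sketch (hypothetical class and method names, not Reactor internals) illustrates that counting mechanism:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Conceptual sketch of autoConnect(n): count subscribers and connect to the
// upstream source exactly once, when the nth subscription arrives.
public class AutoConnectSketch {
    private final int threshold;
    private final AtomicInteger subscribers = new AtomicInteger();
    private final Runnable connectToSource;   // stands in for the upstream connection

    AutoConnectSketch(int threshold, Runnable connectToSource) {
        this.threshold = threshold;
        this.connectToSource = connectToSource;
    }

    void subscribe() {
        // the connection fires exactly once, on the nth subscriber
        if (subscribers.incrementAndGet() == threshold) {
            connectToSource.run();
        }
    }

    public static void main(String[] args) {
        AutoConnectSketch co = new AutoConnectSketch(2,
                () -> System.out.println("subscribed to source"));
        co.subscribe();
        System.out.println("subscribed first");
        co.subscribe();                        // threshold reached: connects now
        System.out.println("subscribed second");
    }
}
```

refCount(n) extends the same idea by also decrementing the counter on cancellation and disconnecting when it falls below the threshold.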

Consider the following example:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> co = source.publish();

co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

co.connect();

The preceding code produces the following output:

done subscribing
will now connect
subscribed to source
1
1
2
2
3
3

The following code uses autoConnect:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});

The preceding code produces the following output:

subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3

9.4. Three Sorts of Batching

When you have lots of elements and you want to separate them into batches, you have three broad solutions in Reactor: grouping, windowing, and buffering. These three are conceptually close, because they redistribute a Flux<T> into an aggregate. Grouping and windowing create a Flux<Flux<T>>, while buffering aggregates into a Collection<T>.

9.4.1. Grouping with Flux<GroupedFlux<T>>

Grouping is the act of splitting the source Flux<T> into multiple batches, each of which matches a key.

The associated operator is groupBy.

Each group is represented as a GroupedFlux<T>, which lets you retrieve the key by calling its key() method.

There is no necessary continuity in the content of the groups. Once a source element produces a new key, the group for that key is opened and elements that match the key end up in the group (several groups could be open at the same time).

This means that groups:

  1. Are always disjoint (a source element belongs to one and only one group).

  2. Can contain elements from different places in the original sequence.

  3. Are never empty.

The following example groups values by whether they are even or odd:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.groupBy(i -> i % 2 == 0 ? "even" : "odd")
		.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
				.map(String::valueOf) //map to string
				.startWith(g.key())) //start with the group's key
	)
	.expectNext("odd", "1", "3", "5", "11", "13")
	.expectNext("even", "2", "4", "6", "12")
	.verifyComplete();
Grouping is best suited for when you have a medium to low number of groups. The groups must also imperatively be consumed (such as by a flatMap) so that groupBy continues fetching data from upstream and feeding more groups. Sometimes, these two constraints multiply and lead to hangs, such as when you have a high cardinality and the concurrency of the flatMap consuming the groups is too low.

9.4.2. Windowing with Flux<Flux<T>>

Windowing is the act of splitting the source Flux<T> into windows, by criteria of size, time, boundary-defining predicates, or boundary-defining Publisher.

The associated operators are window, windowTimeout, windowUntil, windowWhile, and windowWhen.

Contrary to groupBy, which randomly overlaps according to incoming keys, windows are (most of the time) opened sequentially.

Some variants can still overlap, though. For instance, in window(int maxSize, int skip) the maxSize parameter is the number of elements after which a window closes, and the skip parameter is the number of elements in the source after which a new window is opened. So if maxSize > skip, a new window opens before the previous one closes and the two windows overlap.

The following example shows overlapping windows:

StepVerifier.create(
	Flux.range(1, 10)
		.window(5, 3) //overlapping windows
		.concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
	)
		.expectNext(1, 2, 3, 4, 5)
		.expectNext(4, 5, 6, 7, 8)
		.expectNext(7, 8, 9, 10)
		.expectNext(10)
		.verifyComplete();
With the reverse configuration (maxSize < skip), some elements from the source are dropped and are not part of any window.
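The maxSize/skip arithmetic can be sketched with plain collections. This is a hedged re-implementation over an already-complete list, not Reactor's streaming implementation: a new window opens every skip elements and holds at most maxSize elements.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowArithmetic {
    // Sketch of window(maxSize, skip) semantics over a complete list:
    // a new window opens every `skip` elements, each holding up to `maxSize`.
    static List<List<Integer>> windows(List<Integer> source, int maxSize, int skip) {
        List<List<Integer>> result = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + maxSize, source.size());
            result.add(new ArrayList<>(source.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> src = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // maxSize > skip: overlapping windows, matching the output above
        System.out.println(windows(src, 5, 3));
        // [[1, 2, 3, 4, 5], [4, 5, 6, 7, 8], [7, 8, 9, 10], [10]]

        // maxSize < skip: elements 3, 6 and 9 are dropped, belonging to no window
        System.out.println(windows(src, 2, 3));
        // [[1, 2], [4, 5], [7, 8], [10]]
    }
}
```

The second call shows the dropping case: with maxSize 2 and skip 3, every third element falls between two windows and is never emitted.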

In the case of predicate-based windowing through windowUntil and windowWhile, having subsequent source elements that do not match the predicate can also lead to empty windows, as demonstrated in the following example:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.windowWhile(i -> i % 2 == 0)
		.concatMap(g -> g.defaultIfEmpty(-1))
	)
		.expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
		.expectNext(2, 4, 6) // triggered by 11
		.expectNext(12) // triggered by 13
		// no empty completion window is emitted (would contain extra matching elements)
		.verifyComplete();

9.4.3. Buffering to Flux<List<T>>

Buffering is similar to windowing, with the following twist: Instead of emitting windows (each of which is a Flux<T>), it emits buffers (which are Collection<T> — by default, List<T>).

The operators for buffering mirror those for windowing: buffer, bufferTimeout, bufferUntil, bufferWhile, and bufferWhen.

Where the corresponding windowing operator opens a window, a buffering operator creates a new collection and starts adding elements to it. Where a window closes, the buffering operator emits the collection.

Buffering can also lead to dropping source elements or having overlapping buffers, as the following example shows:

StepVerifier.create(
	Flux.range(1, 10)
		.buffer(5, 3) //overlapping buffers
	)
		.expectNext(Arrays.asList(1, 2, 3, 4, 5))
		.expectNext(Arrays.asList(4, 5, 6, 7, 8))
		.expectNext(Arrays.asList(7, 8, 9, 10))
		.expectNext(Collections.singletonList(10))
		.verifyComplete();

Unlike in windowing, bufferUntil and bufferWhile do not emit an empty buffer, as the following example shows:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.bufferWhile(i -> i % 2 == 0)
	)
	.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
	.expectNext(Collections.singletonList(12)) // triggered by 13
	.verifyComplete();

9.5. Parallelizing Work with ParallelFlux

With multi-core architectures being a commodity nowadays, being able to easily parallelize work is important. Reactor helps with that by providing a special type, ParallelFlux, that exposes operators that are optimized for parallelized work.

To obtain a ParallelFlux, you can use the parallel() operator on any Flux. By itself, this method does not parallelize the work. Rather, it divides the workload into “rails” (by default, as many rails as there are CPU cores).

In order to tell the resulting ParallelFlux where to run each rail (and, by extension, to run rails in parallel) you have to use runOn(Scheduler). Note that there is a recommended dedicated Scheduler for parallel work: Schedulers.parallel().

Compare the next two examples:

Flux.range(1, 10)
    .parallel(2) (1)
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
1 We force a number of rails instead of relying on the number of CPU cores.
Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

The first example produces the following output:

main -> 1
main -> 2
main -> 3
main -> 4
main -> 5
main -> 6
main -> 7
main -> 8
main -> 9
main -> 10

The second correctly parallelizes on two threads, as shown in the following output:

parallel-1 -> 1
parallel-2 -> 2
parallel-1 -> 3
parallel-2 -> 4
parallel-1 -> 5
parallel-2 -> 6
parallel-1 -> 7
parallel-1 -> 9
parallel-2 -> 8
parallel-2 -> 10

If, once you process your sequence in parallel, you want to revert back to a “normal” Flux and apply the rest of the operator chain in a sequential manner, you can use the sequential() method on ParallelFlux.

Note that sequential() is implicitly applied if you subscribe to the ParallelFlux with a Subscriber but not when using the lambda-based variants of subscribe.

Note also that subscribe(Subscriber<T>) merges all the rails, while subscribe(Consumer<T>) runs all the rails. If the subscribe() method has a lambda, each lambda is executed as many times as there are rails.

You can also access individual rails or “groups” as a Flux<GroupedFlux<T>> through the groups() method and apply additional operators to them through the composeGroup() method.

9.6. Replacing Default Schedulers

As we described in the Threading and Schedulers section, Reactor Core comes with several Scheduler implementations. While you can always create new instances through the new* factory methods, each Scheduler flavor also has a default singleton instance that is accessible through the direct factory method (such as Schedulers.boundedElastic() versus Schedulers.newBoundedElastic(…​)).

These default instances are the ones used by operators that need a Scheduler to work when you do not explicitly specify one. For example, Flux#delayElements(Duration) uses the Schedulers.parallel() instance.

In some cases, however, you might need to replace these default instances with something else in a cross-cutting fashion, without having to make sure every single operator you call has your specific Scheduler as a parameter. An example is measuring the time every single scheduled task takes by wrapping the real schedulers, for instrumentation purposes. In other words, you might want to change the default Schedulers.

Changing the default schedulers is possible through the Schedulers.Factory class. By default, a Factory creates all the standard Scheduler through similarly named methods. You can override each of these with your own custom implementation.

Additionally, the factory exposes one additional customization method: decorateExecutorService. It is invoked during the creation of every Reactor Core Scheduler that is backed by a ScheduledExecutorService (even non-default instances, such as those created by calls to Schedulers.newParallel()).

This lets you tune the ScheduledExecutorService to be used: The default one is exposed as a Supplier and, depending on the type of Scheduler being configured, you can choose to entirely bypass that supplier and return your own instance or you can get() the default instance and wrap it.
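As an illustration of the kind of wrapping such a decoration enables, the following JDK-only sketch decorates a ScheduledExecutorService with a submission counter via a dynamic proxy. This is not Reactor code and not the decorateExecutorService API itself — only the underlying decoration pattern, with hypothetical names:

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingDecorator {
    public static void main(String[] args) throws Exception {
        AtomicInteger submissions = new AtomicInteger();
        ScheduledExecutorService real = Executors.newSingleThreadScheduledExecutor();

        // Wrap the real executor: every submit/schedule call is counted,
        // then delegated unchanged to the underlying instance.
        ScheduledExecutorService decorated = (ScheduledExecutorService) Proxy.newProxyInstance(
                CountingDecorator.class.getClassLoader(),
                new Class<?>[] { ScheduledExecutorService.class },
                (proxy, method, methodArgs) -> {
                    if (method.getName().startsWith("schedule")
                            || method.getName().equals("submit")) {
                        submissions.incrementAndGet();   // instrumentation hook
                    }
                    return method.invoke(real, methodArgs);
                });

        decorated.submit(() -> {}).get();
        decorated.schedule(() -> {}, 1, TimeUnit.MILLISECONDS).get();
        System.out.println("submissions: " + submissions.get()); // submissions: 2
        decorated.shutdown();
    }
}
```

A real decorateExecutorService implementation would apply the same delegation idea, typically recording timings or meter increments around each task.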

Once you create a Factory that fits your needs, you must install it by calling Schedulers.setFactory(Factory).

Finally, there is a last customizable hook in Schedulers: onHandleError. This hook is invoked whenever a Runnable task submitted to a Scheduler throws an Exception (note that if there is an UncaughtExceptionHandler set for the Thread that ran the task, both the handler and the hook are invoked).

9.7. Using Global Hooks

Reactor has another category of configurable callbacks that are invoked by Reactor operators in various situations. They are all set in the Hooks class, and they fall into three categories:

9.7.1. Dropping Hooks

Dropping hooks are invoked when the source of an operator does not comply with the Reactive Streams specification. These kinds of errors are outside of the normal execution path (that is, they cannot be propagated through onError).

Typically, a Publisher calls onNext on the operator despite having already called onComplete on it previously. In that case, the onNext value is dropped. The same is true for an extraneous onError signal.

The corresponding hooks, onNextDropped and onErrorDropped, let you provide a global Consumer for these drops. For example, you can use it to log the drop and clean up resources associated with a value if needed (as it never makes it to the rest of the reactive chain).

Setting the hooks twice in a row is additive: every consumer you provide is invoked. The hooks can be fully reset to their defaults by using the Hooks.resetOn*Dropped() methods.

9.7.2. Internal Error Hook

One hook, onOperatorError, is invoked by operators when an unexpected Exception is thrown during the execution of their onNext, onError, and onComplete methods.

Unlike the previous category, this is still within the normal execution path. A typical example is the map operator with a map function that throws an Exception (such as division by zero). It is still possible at this point to go through the usual channel of onError, and that is what the operator does.

First, it passes the Exception through onOperatorError. The hook lets you inspect the error (and the incriminating value, if relevant) and change the Exception. Of course, you can also do something on the side, such as log and return the original Exception.

Note that you can set the onOperatorError hook multiple times. You can provide a String identifier for a particular BiFunction and subsequent calls with different keys concatenate the functions, which are all executed. On the other hand, reusing the same key twice lets you replace a function you previously set.

As a consequence, the default hook behavior can be both fully reset (by using Hooks.resetOnOperatorError()) or partially reset for a specific key only (by using Hooks.resetOnOperatorError(String)).

9.7.3. Assembly Hooks

These hooks tie in the lifecycle of operators. Your are invoked when a chaining of operators is assembled (that is, instantiated). onEachOperator lets you dynamically change each operator as it is assembled are the chain, by returning a different Editor. onLastOperator is similar, except ensure a is invoked only on the last operator in the chain before of subscribe call.

If you want in decorate all operators with a cross-cutting Subscriber implementation, you could look into this Operators#lift* methods to get you deal with aforementioned various types of Reactor Publishers out there (Flux, Monophonic, ParallelFlux, GroupedFlux, and ConnectableFlux), as well as their Fuseable versions.

Like onOperatorError, these hooks are cumulative and can be identified with a key. They can also be reset partially or totally.
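A minimal sketch of an assembly hook combined with Operators.lift: the counter below is purely illustrative, and note that while the decoration happens at assembly time, the lifter itself runs once per decorated operator per subscription:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;

AtomicInteger lifted = new AtomicInteger();

// Decorate every operator as it is assembled. Operators.lift lets us wrap each
// operator's Subscriber; here we forward it untouched and merely count.
Hooks.onEachOperator("counter", Operators.lift((scannable, subscriber) -> {
    lifted.incrementAndGet();
    return subscriber;
}));

Flux.just(1, 2, 3).map(i -> i * 2).filter(i -> i > 2).blockLast();
System.out.println("operators traversed: " + lifted.get());

Hooks.resetOnEachOperator("counter"); // partial reset, by key
```
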

9.7.4. Hook Presets

The Hooks utility class provides two preset hooks. These are alternatives to the default behaviors that you can use by calling their corresponding method, rather than coming up with the hook yourself:

  • onNextDroppedFail(): onNextDropped used to throw a Exceptions.failWithCancel() exception. It now defaults to logging the dropped value at the DEBUG level. To go back to the old default behavior of throwing, use onNextDroppedFail().

  • onOperatorDebug(): This method activates debug mode. It ties into the onOperatorError hook, so calling resetOnOperatorError() also resets it. You can independently reset it by using resetOnOperatorDebug(), as it uses a specific key internally.

9.8. Adding a Context to a Reactive Sequence

One of the big technical challenges encountered when switching from an imperative programming perspective to a reactive programming mindset lies in how you deal with threading.

Contrary to what you might be used to, in reactive programming, you can use a Thread to process several asynchronous sequences that run at roughly the same time (actually, in non-blocking lockstep). The execution can also easily and often jump from one thread to another.

This arrangement is especially hard for developers that use features dependent on the threading model being more “stable,” such as ThreadLocal. As it lets you associate data with a thread, it becomes tricky to use in a reactive context. As a result, libraries that rely on ThreadLocal at least introduce new challenges when used with Reactor. At worst, they work badly or even fail. Using the MDC of Logback to store and log correlation IDs is a prime example of such a situation.

The usual workaround for ThreadLocal usage is to move the contextual data, C, along your business data, T, in the sequence, by using (for instance) Tuple2<T, C>. This does not look good and leaks an orthogonal concern (the contextual data) into your method and Flux signatures.
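The Tuple2 workaround can be sketched as follows (the correlation id value is made up for illustration); note how the contextual data has to be re-threaded by hand through every transformation:

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

// T is the business data, C the contextual data (a correlation id here).
Mono<Tuple2<String, String>> data =
    Mono.just(Tuples.of("Walter", "correlation-42"))
        // every transformation must unpack T and re-attach C manually:
        .map(t -> Tuples.of(t.getT1().toUpperCase(), t.getT2()));
```
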

Since version 3.1.0, Reactor comes with an advanced feature that is somewhat comparable to ThreadLocal but can be applied to a Flux or a Mono instead of a Thread. This feature is called Context.

As an illustration of what it looks like, the following example both reads from and writes to Context:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "World"));

StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();

In the following sections, we cover Context and how to use it, so that you can eventually understand the preceding example.

This is an advanced feature that is more targeted at library developers. It requires good understanding of the lifecycle of a Subscription and is intended for libraries that are responsible for the subscriptions.

9.8.1. The Context API

Context is an interface reminiscent of a Map. It stores key-value pairs and lets you fetch a value you stored by its key. It has a simplified version that only exposes read methods, the ContextView. More specifically:

  • Both key and values are of type Object, so a Context (and ContextView) instance can contain any number of highly divergent values from different libraries and sources.

  • A Context is immutable. It exposes write methods like put and putAll but they produce a new instance.

  • For a read-only API that doesn’t even expose such write methods, there’s the ContextView superinterface since 3.4.0.

  • You can check whether the key is present with hasKey(Object key).

  • Use getOrDefault(Object key, T defaultValue) to retrieve a value (cast to a T) or fall back to a default one if the Context instance does not have that key.

  • Use getOrEmpty(Object key) to get an Optional<T> (the Context instance attempts to cast the stored value to T).

  • Use put(Object key, Object value) to store a key-value pair, returning a new Context instance. You can also merge two contexts into a new one by using putAll(ContextView).

  • Use delete(Object key) to remove the value associated to a key, returning a new Context.

When you create a Context, you can create pre-valued Context instances with up to five key-value pairs by using the static Context.of methods. They take 2, 4, 6, 8 or 10 Object instances, each couple of Object instances being a key-value pair to add to the Context.

Alternatively you can also create an empty Context by using Context.empty().
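The immutability and the factory methods above can be demonstrated with a small sketch:

```java
import reactor.util.context.Context;

Context base = Context.of("key1", "A", "key2", "B"); // pre-valued, up to five pairs
Context extended = base.put("key3", "C");            // put returns a NEW instance

// base is untouched by the put:
boolean inBase = base.hasKey("key3");                        // false
String fromExtended = extended.get("key3");                  // "C"
String fallback = extended.getOrDefault("missing", "none");  // "none"
```
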

9.8.2. Tying a Context to a Flux and Writing

To be useful, a Context must be tied to a specific sequence and be accessible by each operator in a chain. Note that the operator must be a Reactor-native operator, as Context is specific to Reactor.

Actually, a Context is tied to each Subscriber in a chain. It uses the Subscription propagation mechanism to make itself available to each operator, starting with the final subscribe and moving up the chain.

In order to populate the Context, which can only be done at subscription time, you need to use the contextWrite operator.

contextWrite(ContextView) merges the ContextView you provide and the Context from downstream (remember, the Context is propagated from the bottom of the chain towards the top). This is done through a call to putAll, resulting in a NEW Context for upstream.

You can also use the more advanced contextWrite(Function<Context, Context>). It receives a copy of the Context from downstream, lets you put or delete values as you see fit, and returns the new Context to use. You can even decide to return a completely different instance, although it is really not recommended (doing so might impact third-party libraries that depend on the Context).

9.8.3. Reading a Context, through the ContextView

Once you have populated a Context, you may want to peek into it at runtime. Most of the time, the responsibility of putting information into the Context is on the end user’s side, while exploiting that information is on the third-party library’s side, as such libraries are usually upstream of the user code.

The read-oriented operators allow you to obtain data from the Context in a chain of operators by exposing its ContextView:

  • to access the context from a source-like operator, use the deferContextual factory method

  • to access the context from the middle of an operator chain, use transformDeferredContextual(BiFunction)

  • alternatively, when dealing with an inner sequence (like inside a flatMap), the ContextView can be materialized by Mono.deferContextual(Mono::just). Usually though, you might want to do meaningful work directly within the defer’s lambda, eg. Mono.deferContextual(ctx → doSomethingAsyncWithContextData(v, ctx.get(key))) where v is the value being flatMapped.

In order to read from the Context without misleading users into thinking one can write to it while data is running through the pipeline, only the ContextView is exposed by the operators above. In case one needs to use one of the remaining APIs that still require a Context, one can use Context.of(contextView) for conversion.
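Since Context is itself a ContextView, the conversion mentioned above goes only one way; a sketch:

```java
import reactor.util.context.Context;
import reactor.util.context.ContextView;

ContextView view = Context.of("message", "World"); // read-only view of the pairs

// An API that requires a full (writable) Context can be satisfied by conversion:
Context writable = Context.of(view).put("other", "value");
```
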

9.8.4. Simple Context Example

The examples in this section are meant as ways to better understand some of the caveats of using a Context.

We first look back at our simple example from the introduction in a bit more detail, as the following example shows:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key)))) (2)
    .contextWrite(ctx -> ctx.put(key, "World")); (1)

StepVerifier.create(r)
            .expectNext("Hello World") (3)
            .verifyComplete();
1 The chain of operators ends with a call to contextWrite(Function) that puts "World" into the Context under a key of "message".
2 We flatMap on the source element, materializing the ContextView with Mono.deferContextual() and directly extracting the data associated to "message" and concatenating that with the original word.
3 The resulting Mono<String> emits "Hello World".
The numbering above versus the actual line order is not a mistake. It represents the execution order. Even though contextWrite is the last piece of the chain, it is the one that gets executed first (due to its subscription-time nature and the fact that the subscription signal flows from bottom to top).
In your chain of operators, the relative positions of where you write to the Context and where you read from it matters. The Context is immutable and its content can only be seen by operators above it, as demonstrated in the following example:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .contextWrite(ctx -> ctx.put(key, "World")) (1)
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.getOrDefault(key, "Stranger")))); (2)

StepVerifier.create(r)
            .expectNext("Hello Stranger") (3)
            .verifyComplete();
1 The Context is written to too high in the chain.
2 As a result, in the flatMap, there is no value associated with our key. A default value is used instead.
3 The resulting Mono<String> thus emits "Hello Stranger".

Similarly, in the case of several attempts to write the same key to the Context, the relative order of the writes matters, too. Operators that read the Context see the value that was set closest to being under them, as demonstrated in the following example:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))
    .contextWrite(ctx -> ctx.put(key, "Reactor")) (1)
    .contextWrite(ctx -> ctx.put(key, "World")); (2)

StepVerifier.create(r)
            .expectNext("Hello Reactor") (3)
            .verifyComplete();
1 A write attempt on key "message".
2 Another write attempt on key "message".
3 The deferContextual only saw the value set closest to it (and below it): "Reactor".

In the preceding example, the Context is populated with "World" during subscription. Then the subscription signal moves upstream and another write happens. This produces a second immutable Context with a value of "Reactor". After that, data starts flowing. The deferContextual sees the Context closest to it, which is our second Context with the "Reactor" value (exposed to the user as a ContextView).

You might wonder if the Context is propagated along with the data signal. If that was the case, putting another flatMap between these two writes would use the value from the top Context. But this is not the case, as demonstrated by the following example:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key))) (3)
    .contextWrite(ctx -> ctx.put(key, "Reactor")) (2)
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.get(key)))) (4)
    .contextWrite(ctx -> ctx.put(key, "World")); (1)

StepVerifier.create(r)
            .expectNext("Hello Reactor World") (5)
            .verifyComplete();
1 This is the first write to happen.
2 This is the second write to happen.
3 The top context read sees the second write.
4 The flatMap concatenates the result from the initial read with the value from the first write.
5 The Mono emits "Hello Reactor World".

The reason is that the Context is associated to the Subscriber and each operator accesses the Context by requesting it from its downstream Subscriber.

One final interesting propagation case is the one where the Context is also written to inside a flatMap, as in the following example:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
    )
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
        .contextWrite(ctx -> ctx.put(key, "Reactor")) (1)
    )
    .contextWrite(ctx -> ctx.put(key, "World")); (2)

StepVerifier.create(r)
            .expectNext("Hello World Reactor")
            .verifyComplete();
1 This contextWrite does not impact anything outside of its flatMap.
2 This contextWrite impacts the main sequence’s Context.

In the preceding example, the final emitted value is "Hello World Reactor" and not "Hello Reactor World", because the contextWrite that writes "Reactor" does so as part of the inner sequence of the second flatMap. As a consequence, it is not visible or propagated through the main sequence, and the first flatMap does not see it. Propagation and immutability isolate the Context in operators that create intermediate inner sequences such as flatMap.

9.8.5. Full Example

Now we can consider a more real-life example of a library reading information from the Context: a reactive HTTP client that takes a Mono<String> as the source of data for a PUT but also looks for a particular Context key to add a correlation ID to the request’s headers.

From the user perspective, it is called as follows:

doPut("www.example.com", Mono.just("Walter"))

In order to propagate a correlation ID, it would be called as follows:

doPut("www.example.com", Mono.just("Walter"))
	.contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))

As the preceding snippets show, the user code uses contextWrite to populate a Context with an HTTP_CORRELATION_ID key-value pair. The upstream of this operator is a Mono<Tuple2<Integer, String>> (a simplistic representation of the HTTP response) returned by the HTTP client library. So it effectively passes information from the user code to the library code.

The following example shows mock code from the library’s perspective that reads the context and “augments the request” if it can find the correlation ID:

static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
  Mono<Tuple2<String, Optional<Object>>> dataAndContext =
      data.zipWith(Mono.deferContextual(c -> (1)
          Mono.just(c.getOrEmpty(HTTP_CORRELATION_ID))) (2)
      );

  return dataAndContext.<String>handle((dac, sink) -> {
      if (dac.getT2().isPresent()) { (3)
        sink.next("PUT <" + dac.getT1() + "> sent to " + url +
            " with header X-Correlation-ID = " + dac.getT2().get());
      }
      else {
        sink.next("PUT <" + dac.getT1() + "> sent to " + url);
      }
        sink.complete();
      })
      .map(msg -> Tuples.of(200, msg));
}
1 Materialize the ContextView through Mono.deferContextual and…​
2 within the defer, extract a value for the correlation ID key, as an Optional.
3 If the key was present in the context, use the correlation ID as a header.

The library snippet zips the data Mono with Mono.deferContextual(Mono::just). This gives the library a Tuple2<String, ContextView>, and that context contains the HTTP_CORRELATION_ID entry from downstream (as it is on the direct path to the subscriber).

The library code then uses map to extract an Optional<String> for that key, and, if the entry is present, it uses the passed correlation ID as a X-Correlation-ID header. That last part is simulated by the handle.

The whole test that validates the library code used the correlation ID can be written as follows:

@Test
public void contextForLibraryReactivePut() {
  Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
      .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
      .filter(t -> t.getT1() < 300)
      .map(Tuple2::getT2);

  StepVerifier.create(put)
              .expectNext("PUT <Walter> sent to www.example.com" +
                  " with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
              .verifyComplete();
}

10. Context-Propagation Support

Since 3.5.0, Reactor-Core embeds support for the io.micrometer:context-propagation SPI. This library is intended as a means to easily adapt between various implementations of the concept of a Context, of which ContextView/Context is an example, and between ThreadLocal variables as well.

ReactorContextAccessor allows the Context-Propagation library to understand Reactor Context and ContextView. It implements the SPI and is loaded via java.util.ServiceLoader. No user action is required, other than having a dependency on both reactor-core and io.micrometer:context-propagation. The ReactorContextAccessor class is public but shouldn’t generally be accessed by user code.

On top of that, Reactor-Core 3.5.0 also modifies the behavior of a couple key operators as well as introduces the contextCapture operator to transparently deal with `ContextSnapshot`s if the library is available at runtime.

10.1. contextCapture Operator

This operator can be used when one needs to capture ThreadLocal value(s) at subscription time and reflect these values in the Reactor Context for the benefit of upstream operators. It relies on the context-propagation library and notably the registered ThreadLocalAccessor(s) to discover relevant ThreadLocal values.

This is a convenient alternative to contextWrite which uses the context-propagation API to obtain a ContextSnapshot and then uses that snapshot to populate the Reactor Context.

As a result, if there were any ThreadLocal values during subscription phase, for which there is a registered ThreadLocalAccessor, their values would now be stored in the Reactor Context and visible at runtime in upstream operators.

//assuming TL is known to Context-Propagation as key TLKEY.
static final ThreadLocal<String> TL = new ThreadLocal<>();

//in the main thread, TL is set to "HELLO"
TL.set("HELLO");

Mono.deferContextual(ctx ->
  Mono.delay(Duration.ofSeconds(1))
      //we're now in another thread, TL is not set
      .map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get())
)
.contextCapture()
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null"

10.2. Operators that transparently restore a snapshot: handle and tap

The Flux and Mono variants of handle and tap will have their behavior slightly modified if the Context-Propagation library is available at runtime.

Namely, if their downstream ContextView is not empty they will assume a context capture has occurred (either manually or via the contextCapture() operator) and will attempt to restore `ThreadLocal`s from that snapshot transparently.

These operators will ensure restoration is performed around the user-provided code, respectively:

  • handle will wrap the BiConsumer in one which restores `ThreadLocal`s

  • tap variants will wrap the SignalListener into one that has the same kind of wrapping around each method (this includes the addToContext method)

The goal is to have a minimalistic set of operators transparently perform restoration. As a result we chose operators with rather general and broad applications (one with transformative capabilities, one with side-effect capabilities).

//assuming TL is known to Context-Propagation.
static final ThreadLocal<String> TL = new ThreadLocal<>();

//in the main thread, TL is set to "HELLO"
TL.set("HELLO");

Mono.delay(Duration.ofSeconds(1))
  //we're now in another thread, TL is not set yet
  .doOnNext(v -> System.out.println(TL.get()))
  //inside the handler however, TL _is_ restored
  .handle((v, sink) -> sink.next("handled delayed TL=" + TL.get()))
  .contextCapture()
  .block(); // prints "null" and returns "handled delayed TL=HELLO"

10.3. Dealing with Objects that Need Cleanup

In very specific cases, your application may deal with types that necessitate some form of cleanup once they are no longer in use. This is an advanced scenario, for example when you have reference-counted objects or when you deal with off-heap objects. Netty’s ByteBuf is a prime example of both.

In order to ensure proper cleanup of such objects, you need to account for it on a Flux-by-Flux basis, as well as in several of the global hooks (see Using Global Hooks):

  • The doOnDiscard Flux/Mono operator

  • The onOperatorError hook

  • The onNextDropped hook

  • Operator-specific handlers

This is needed because each hook is made with a specific subset of cleanup in mind, and users might want (for example) to implement specific error-handling logic in addition to cleanup logic within onOperatorError.

Note that some operators are less adapted to dealing with objects that need cleanup. For example, bufferWhen can introduce overlapping buffers, and that means that the discard “local hook” we used earlier might see a first buffer as being discarded and clean up an element in it that is also in a second buffer, where it is still valid.

For the purpose of cleaning up, all these hooks MUST be IDEMPOTENT. They might on some occasions get applied several times to the same object. Unlike the doOnDiscard operator, which performs a class-level instanceOf check, the global hooks are also dealing with instances that can be any Object. It is up to the user’s implementation to distinguish between which instances need cleanup and which do not.

10.3.1. The doOnDiscard Operator or Local Hooks

This hook has been specifically put in place for cleanup of objects that would otherwise never be exposed to user code. It is intended as a cleanup hook for flows that operate under normal circumstances (not malformed sources that push too many items, which is covered by onNextDropped).

It is local, in the sense that it is activated through an operator and applies only to a given Flux or Mono.

Obvious cases include operators that filter elements from upstream. These elements never reach the next operator (or final subscriber), but this is part of the normal path of execution. As such, they are passed to the doOnDiscard hook. Examples of when you might use the doOnDiscard hook include the following:

  • filter: Items that do not match the filter are considered to be “discarded.”

  • skip: Skipped elements are discarded.

  • buffer(maxSize, skip) with maxSize < skip: A “dropping buffer”, where items in between buffers are discarded.

But doOnDiscard is not limited to filtering operators, and is also used by operators that internally queue data for backpressure purposes. More specifically, most of the time, this is important during cancellation. An operator that prefetches data from its source and later drains to its subscriber upon demand could have un-emitted data when it gets cancelled. Such operators use the doOnDiscard hook during cancellation to clear up their internal backpressure Queue.

Each call to doOnDiscard(Class, Consumer) is additive with the others, to the extent that it is visible and used only by operators upstream of it.
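A minimal sketch: the hook is placed downstream of filter, so that filter (upstream of it) sees and uses it when rejecting elements:

```java
import reactor.core.publisher.Flux;

Flux.range(1, 6)
    .filter(i -> i % 2 == 0)                 // odd values are discarded here
    .doOnDiscard(Integer.class, discarded -> // visible to operators upstream of it
        System.out.println("discarded: " + discarded))
    .blockLast();
// prints "discarded: 1", "discarded: 3", "discarded: 5"
```
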

10.3.2. The onOperatorError hook

The onOperatorError hook is intended to modify errors in a transverse manner (similar to an AOP catch-and-rethrow).

When the error happens during the processing of an onNext signal, the element that was being emitted is passed to onOperatorError.

If that type of element needs cleanup, you need to implement it in the onOperatorError hook, possibly on top of error-rewriting code.

10.3.3. The onNextDropped Hook

With malformed Publishers, there could be cases where an operator receives an element when it expected none (typically, after having received the onError or onComplete signals). In such cases, the unexpected element is “dropped”, that is, passed to the onNextDropped hook. If you have types that need cleanup, you must detect these in the onNextDropped hook and implement cleanup code there as well.
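A sketch of such detection in the global hook; the Resource type below is hypothetical, standing in for any object that needs releasing:

```java
import reactor.core.publisher.Hooks;

// Hypothetical resource needing cleanup; release() is idempotent, as hooks must be.
class Resource {
    private boolean released;
    void release() { released = true; }
    boolean isReleased() { return released; }
}

// The dropped element can be any Object, so the hook must check the type itself:
Hooks.onNextDropped(dropped -> {
    if (dropped instanceof Resource) {
        ((Resource) dropped).release();
    }
});
```
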

10.3.4. Operator-specific Handlers

Some operators that deal with buffers or collect values as part of their operations have specific handlers for cases where collected data is not propagated downstream. If you use such operators with the type(s) that need cleanup, you need to perform cleanup in these handlers.

For example, distinct has such a callback that is invoked when the operator terminates (or is cancelled) in order to clear the collection it uses to judge whether an element is distinct or not. By default, the collection is a HashSet, and the cleanup callback is a HashSet::clear. However, if you deal with reference-counted objects, you might want to change that to a more involved handler that would release each element in the set before calling clear() on it.
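A sketch of the four-argument distinct variant with an explicit store and cleanup callback (here equivalent to the defaults; the cleanup lambda is where a reference-counted type would release each element before clearing):

```java
import java.util.HashSet;
import reactor.core.publisher.Flux;

Flux.just("a", "b", "a", "c")
    .distinct(
        value -> value,               // key selector
        HashSet<String>::new,         // backing store for seen keys
        (set, key) -> set.add(key),   // add returns false for duplicates
        set -> set.clear())           // cleanup on termination or cancellation
    .collectList()
    .block();                         // ["a", "b", "c"]
```
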

10.4. Null Safety

Although Java does not allow expressing null-safety with its type system, Reactor now provides annotations to declare nullability of APIs, similar to those provided by Spring Framework 5.

Reactor uses these annotations, but they can also be used in any Reactor-based Java project to declare null-safe APIs. Nullability of the types used inside method bodies is outside of the scope of this feature.

These annotations are meta-annotated with JSR 305 annotations (a dormant JSR that is supported by tools such as IntelliJ IDEA) to provide useful warnings to Java developers related to null-safety in order to avoid NullPointerException at runtime. JSR 305 meta-annotations let tooling vendors provide null-safety support in a generic way, without having to hard-code support for Reactor annotations.

It is not required nor recommended with Kotlin 1.1.5+ to have a dependency on JSR 305 in your project classpath.

They are also used by Kotlin, which natively supports null safety. See this dedicated section for more details.

The following annotations are provided in the reactor.util.annotation package:

  • @NonNull: Indicates that a specific parameter, return value, or field cannot be null. (It is not needed on parameters and return values where @NonNullApi applies.)

  • @Nullable: Indicates that a parameter, return value, or field can be null.

  • @NonNullApi: Package-level annotation that indicates non-null is the default behavior for parameters and return values.

Nullability for generic type arguments, variable arguments, and array elements is not yet supported. See issue #878 for up-to-date information.
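A sketch of declaring nullability with these annotations (the class and method below are made up for illustration, not part of Reactor):

```java
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

class UserCache {
    @Nullable // tooling warns callers that the return value may be null
    String findNickname(@NonNull String userId) {
        return userId.isEmpty() ? null : "nick-" + userId;
    }
}
```
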

Appendix A: Which operator do I need?

In this section, if an operator is specific to Flux or Mono, it is prefixed and linked accordingly, like this: Flux#fromArray. Common operators have no prefix, and links to both implementations are provided, for example: just (Flux|Mono). When a specific use case is covered by a combination of operators, it is presented as a method call, with a leading dot and parameters in parentheses, as follows: .methodCall(parameter).

I want to deal with:

A.1. Creating a New Sequence…​

A.2. Transforming an Existing Sequence

  • I want to transform existing data:

    • on a 1-to-1 basis (eg. strings to their length): map (Flux|Mono)

      • …​by just casting it: cast (Flux|Mono)

      • …​in order to materialize each source value’s index: Flux#index

    • on a 1-to-n basis (eg. strings to their characters): flatMap (Flux|Mono) + use a factory method

    • on a 1-to-n basis with programmatic behavior for each source element and/or state: handle (Flux|Mono)

    • running an async task for each source item (eg. urls to http request): flatMap (Flux|Mono) + an async Publisher-returning method

      • …​ignoring some data: conditionally return a Mono.empty() in the flatMap lambda

      • …​retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)

      • …​where the async task can return multiple values, from a Mono source: Mono#flatMapMany

  • I want to add pre-set elements to an existing sequence:

  • I want to aggregate a Flux: (the Flux# prefix is assumed below)

  • I want to combine publishers…​

    • in sequential order: Flux#concat or .concatWith(other) (Flux|Mono)

    • in emission order (combined items emitted as they come): Flux#merge / .mergeWith(other) (Flux|Mono)

    • by pairing values:

    • by coordinating their termination:

      • from 1 Mono and any source into a Mono<Void>: Mono#and

      • from n sources when they all completed: Mono#when

      • into an arbitrary container type:

        • each time all sides have emitted: Flux#zip (up to the smallest cardinality)

        • each time a new value arrives at either side: Flux#combineLatest

    • selecting the first publisher which…​

      • produces a value (onNext): firstWithValue (Flux|Mono)

      • produces any signal: firstWithSignal (Flux|Mono)

    • triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)

    • triggered by the start of the next publisher in a sequence of publishers: switchOnNext

  • I want to repeat an existing sequence: repeat (Flux|Mono)

    • …​but at time intervals: Flux.interval(duration).flatMap(tick → myExistingPublisher)

  • I have an empty sequence but…​

    • I want a value instead: defaultIfEmpty (Flux|Mono)

    • I want another sequence instead: switchIfEmpty (Flux|Mono)

  • I have a sequence but I am not interested in values: ignoreElements (Flux.ignoreElements()|Mono.ignoreElement())

    • …​and I want the completion represented as a Mono<Void>: then (Flux|Mono)

    • …​and I want to wait for another task to complete at the end: thenEmpty (Flux|Mono)

    • …​and I want to switch to another Mono at the end: Mono#then(mono)

    • …​and I want to emit a single value at the end: Mono#thenReturn(T)

    • …​and I want to switch to a Flux at the end: thenMany (Flux|Mono)

  • I have a Mono for which I want to defer completion…​

  • I want to expand elements recursively into a graph of sequences and emit the combination…​

    • …​expanding the graph breadth first: expand(Function) (Flux|Mono)

    • …​expanding the graph depth first: expandDeep(Function) (Flux|Mono)

A.3. Peeking into a Sequence

A.4. Filtering a Sequence

A.5. Handling Errors

  • I want to create an erroring sequence: error (Flux|Mono)…​

    • …​to replace the completion of a successful Flux: .concat(Flux.error(e))

    • …​to replace the emission of a successful Mono: .then(Mono.error(e))

    • …​if too much time elapses between onNexts: timeout (Flux|Mono)

    • …​lazily: error(Supplier<Throwable>) (Flux|Mono)

  • I want the try/catch equivalent of:

    • throwing: error (Flux|Mono)

    • catching an exception:

      • and falling back to a default value: onErrorReturn (Flux|Mono)

      • and swallowing the error (ie. complete): onErrorComplete (Flux|Mono)

      • and falling back to another Flux or Mono: onErrorResume (Flux|Mono)

      • and wrapping and re-throwing: .onErrorMap(t → new RuntimeException(t)) (Flux|Mono)

    • the finally block: doFinally (Flux|Mono)

    • the using pattern from Java 7: using (Flux|Mono) factory method

  • I want to recover from errors…​

    • by falling back:

    • by retrying…​

      • …​with a simple policy (max number of attempts): retry() (Flux|Mono), retry(long) (Flux|Mono)

      • …​triggered by a companion control Flux: retryWhen (Flux|Mono)

      • …​using a standard backoff strategy (exponential backoff with jitter): retryWhen(Retry.backoff(…​)) (Flux|Mono) (see also other factory methods in Retry)

  • I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)…​

A.6. Working with Time

A.7. Splitting a Flux

A.8. Going Back to the Synchronous World

Note: all of these methods except Mono#toFuture will throw an UnsupportedOperatorException if called from within a Scheduler marked as "non-blocking only" (by default parallel() and single()).

A.9. Multicasting a Flux to several Subscribers

Appendix B: How to read marble diagrams?

When we introduced Flux and Mono, we showed an example of a "marble diagram". These are found throughout the javadoc in order to explain the behavior of an operator in a more visual way.

In this section we’ll dig a little deeper into the conventions used by the Reactor documentation for these marble diagrams. First, let’s see how the most common patterns of operators are represented.

Some operators are instance methods: their output is produced by calling a method on a source Flux instance (like Flux<T> output = source.fluxOperator()):

A common operator

Other operators are static methods. They can still take a source as an input parameter, like in Flux<T> output = Flux.merge(sourceFlux1, sourcePublisher2). These are represented like below:

A static operator

Note that sometimes we represent multiple variants or behaviors depending on the operator’s input, in which case there’s a single operator "box", but the source and output variants are separated, like below:

An operator with two examples of input

These are the basic cases, yet some operators display slightly more advanced patterns.

For instance, ParallelFlux creates multiple rails, so they have multiple output Flux. These are represented one below the other, as in the following diagram:

A parallel operator

Windowing operators produce a Flux<Flux<T>>: the main Flux notifies of each window opening, while inner Fluxes represent the windows content and termination. Windows are represented as branching out of the main Flux, as in the following diagram:

An output of a windowing operator

Sometimes, operators take a "companion publisher" as input (a Flux, Mono or arbitrary Reactive Streams Publisher). Such companion publishers help to customize the operator’s behavior, which will use some of the companion’s signals as triggers for its own internal behavior. They are represented as in the following image:

An operator with a companion Publisher

Now that we’ve seen the most common operator patterns, let’s show the graphical representation of all the different signals, events, and elements that can occur in a Flux or Mono:

All types of signals and events

Finally, in the same vein we have the graphical representation of side effects, which occur alongside the Reactive Streams signals:

Side effects: representation of doOn* handlers
Side effects: in a diagram

Appendix C: FAQ, Best Practices, and "How do I…​?"

This section covers the following content:

C.1. How Do I Wrap a Synchronous, Blocking Call?

It is often the case that a source of information is synchronous and blocking. To deal with such sources in your Reactor applications, apply the following pattern:

Mono blockingWrapper = Mono.fromCallable(() -> { (1)
    return /* make a remote synchronous call */ (2)
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); (3)
1 Create a new Mono by using fromCallable.
2 Return the asynchronous, blocking resource.
3 Ensure each subscription happens on a special single-threaded worker from Schedulers.boundedElastic().

You should use a Mono, because the source returns one value. You should use Schedulers.boundedElastic, because it creates a dedicated thread to wait for the blocking resource without impacting other non-blocking processing, while also ensuring that there is a limit to the amount of threads that can be created, and that blocking tasks can be enqueued and deferred during a spike.

Note that subscribeOn does not subscribe to the Mono. It specifies what kind of Scheduler to use when a subscribe call happens.
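To make that laziness concrete, here is a plain-Java analogy (not the Reactor API; the class name DeferredCall is made up for illustration): picking the executor, like subscribeOn picking the Scheduler, only configures where the work will run, while nothing executes until the terminal call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

// Illustrative analogy only (not Reactor API): configuring where work will
// run is separate from actually triggering it, just as subscribeOn
// configures the Scheduler without subscribing.
public class DeferredCall<T> {
    private final Callable<T> work;
    private final ExecutorService executor;

    public DeferredCall(Callable<T> work, ExecutorService executor) {
        this.work = work;
        this.executor = executor;
    }

    // Analogous to subscribeOn: returns a reconfigured pipeline, runs nothing.
    public DeferredCall<T> runOn(ExecutorService newExecutor) {
        return new DeferredCall<>(work, newExecutor);
    }

    // Analogous to subscribe (or block): only now does the Callable execute.
    public T trigger() throws Exception {
        return executor.submit(work).get();
    }
}
```

Calling runOn any number of times executes nothing; only trigger does, which mirrors how a Mono pipeline with subscribeOn stays cold until subscribed.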

C.2. I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?

Make sure that the variable you .subscribe() to has been affected by the operators you think should have been applied to it.

Reactor operators are decorators. They return a different instance that wraps the source sequence and adds behavior. That is why the preferred way of using operators is to chain the calls.

Compare the following two examples:

Example 25. without chaining (incorrect)
Flux<String> flux = Flux.just("something", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); (1)
flux.subscribe(next -> System.out.println("Received: " + next));
1 The mistake is here. The result is not connected to the flux variable.
Example 26. without chaining (correct)
Flux<String> flux = Flux.just("something", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));

The following example is even better (because it is simpler):

Example 27. with chaining (best)
Flux.just("something", "chain")
    .map(secret -> secret.replaceAll(".", "*"))
    .subscribe(next -> System.out.println("Received: " + next));

The first version outputs the following:

Received: something
Received: chain

The two other versions output the expected values, as follows:

Received: *********
Received: *****

C.3. My Mono zipWith or zipWhen is never called

Consider the following example:

myMethod.process("a") // this method returns Mono<Void>
        .zipWith(myMethod.process("b"), combinator) //this is never called
        .subscribe();

If the source Mono is either empty or a Mono<Void> (a Mono<Void> is empty for all intents and purposes), some combinations are never called.

This is the typical case for any transformer such as the zip static method or the zipWith/zipWhen operators, which (by definition) need an element from each source to produce their output.
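A plain-Java analogy makes this visible (Optional stands in for Mono here, and ZipAnalogy is a made-up illustration, not Reactor API): the combinator can only be invoked when each side supplies a value.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Illustrative analogy only: Optional stands in for Mono. A zip-style
// combinator needs a value from EACH source, so it is never invoked when
// either side is empty.
public class ZipAnalogy {
    public static <A, B, R> Optional<R> zip(Optional<A> a, Optional<B> b,
                                            BiFunction<A, B, R> combinator) {
        // flatMap/map only invoke their lambdas when a value is present
        return a.flatMap(av -> b.map(bv -> combinator.apply(av, bv)));
    }
}
```

Zipping two present values invokes the combinator once; zipping anything with an empty side yields an empty result without the combinator ever running, which is the same behavior observed with Mono<Void> sources.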

Using data-suppressing operators on sources of zip is thus problematic. Examples of data-suppressing operators include then(), thenEmpty(Publisher<Void>), ignoreElements() and ignoreElement(), and when(Publisher…​).

Similarly, operators that use a Function<T,?> to tune their behavior, such as flatMap, need at least one element to be emitted for the Function to have a chance to apply. Applying these on an empty (or <Void>) sequence never produces an element.

You can use .defaultIfEmpty(T) and .switchIfEmpty(Publisher<T>) to replace an empty sequence of T with a default value or a fallback Publisher<T> (respectively), which could help avoid some of these situations. Note that this does not apply to Flux<Void>/Mono<Void> sources, as you can only switch to another Publisher<Void>, which is still guaranteed to be empty. The following example uses defaultIfEmpty:

Example 28. use defaultIfEmpty before zipWhen
myMethod.emptySequenceForKey("a") // this method returns empty Mono<String>
        .defaultIfEmpty("") // this converts empty sequence to just the empty String
        .zipWhen(aString -> myMethod.process("b")) //this is called with the empty String
        .subscribe();

C.4. How to Use retryWhen to Emulate retry(3)?

The retryWhen operator can be quite complex. Hopefully the following snippet of code can help you understand how it works by attempting to emulate a simpler retry(3):

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
		Flux.<String>error(new IllegalArgumentException())
				.doOnError(e -> errorCount.incrementAndGet())
				.retryWhen(Retry.from(companion -> (1)
						companion.map(rs -> { (2)
							if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
							else throw Exceptions.propagate(rs.failure()); (4)
						})
				));
1 We customize Retry by adapting from a Function lambda rather than providing a concrete class
2 The companion emits RetrySignal objects, which bear the number of retries so far and the last failure
3 To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index).
4 In order to terminate the sequence in error, we throw the original exception after these three retries.

C.5. How can I use retryWhen for Exponential Backoff?

Exponential backoff produces retry attempts with a growing delay between each of the attempts, so as not to overload the source systems and risk an all-out crash. The rationale is that, if the source produces an error, it is already in an unstable state and is not likely to immediately recover from it. So blindly retrying immediately is likely to produce yet another error and add to the instability.

Since 3.3.4.RELEASE, Reactor comes with a builder for such a retry, to be used with Flux#retryWhen: Retry.backoff.

The following example showcases a simple use of the builder, with hooks logging messages just before and after the retry attempt delays. It delays retries and increases the delay between each attempt (pseudocode: delay = 100ms * 2^attempt_number_starting_at_zero):

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
		.doOnError(e -> { (1)
			errorCount.incrementAndGet();
			System.out.println(e + " at " + LocalTime.now());
		})
		.retryWhen(Retry
				.backoff(3, Duration.ofMillis(100)).jitter(0d) (2)
				.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries())) (3)
				.onRetryExhaustedThrow((spec, rs) -> rs.failure()) (4)
		);
1 We will log the time of errors emitted by the source and count them.
2 We configure an exponential backoff retry with at most 3 attempts and no jitter.
3 We also log the time at which the retry happens, and the retry attempt number (starting from 0).
4 By default, an Exceptions.retryExhausted exception would be thrown, with the last failure() as a cause. Here we customize that to directly emit the cause as onError.

When subscribed to, this fails and terminates after printing out the following:

java.lang.IllegalStateException: boom at 00:00:00.0
retried at 00:00:00.101, attempt 0 (1)
java.lang.IllegalStateException: boom at 00:00:00.101
retried at 00:00:00.304, attempt 1 (2)
java.lang.IllegalStateException: boom at 00:00:00.304
retried at 00:00:00.702, attempt 2 (3)
java.lang.IllegalStateException: boom at 00:00:00.702
1 First retry after about 100ms
2 Second retry after about 200ms
3 Third retry after about 400ms
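The base delay progression (before jitter) can be verified with a standalone computation of the pseudocode formula above. No Reactor types are involved, and BackoffDelays is a throwaway name for this sketch:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Standalone check of the pseudocode: delay = firstBackoff * 2^attempt,
// with the attempt number starting at zero (no jitter applied).
public class BackoffDelays {
    public static List<Duration> baseDelays(Duration firstBackoff, int maxAttempts) {
        List<Duration> delays = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            delays.add(firstBackoff.multipliedBy(1L << attempt));
        }
        return delays;
    }

    public static void main(String[] args) {
        // With firstBackoff = 100ms and 3 attempts: 100ms, 200ms, 400ms,
        // matching the retry timestamps shown in the output above.
        System.out.println(baseDelays(Duration.ofMillis(100), 3));
    }
}
```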

C.6. How Do I Ensure Thread Affinity when I Use publishOn()?

As described in Schedulers, publishOn() can be used to switch execution contexts. The publishOn operator influences the threading context where the rest of the operators in the chain below it run, up to a new occurrence of publishOn. So the placement of publishOn is significant.

Consider the following example:

Flux<Integer> source = Sinks.many().unicast().onBackpressureBuffer().asFlux();
source.publishOn(scheduler1)
	  .map(i -> transform(i))
	  .publishOn(scheduler2)
	  .doOnNext(i -> processNext(i))
	  .subscribe();

The transform function in map() is run on a worker of scheduler1, and the processNext method in doOnNext() is run on a worker of scheduler2. Each subscription gets its own worker, so all elements pushed to the corresponding subscriber are published on the same Thread.

You can use single-threaded schedulers to ensure thread affinity for different stages in the chain or for different subscribers.
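The guarantee a single-threaded scheduler gives can be sketched with a plain ExecutorService (AffinitySketch is a made-up illustration, not Reactor API): every task submitted to the same single-threaded executor runs on the same thread, which is exactly what gives a stage its thread affinity.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: a single-threaded executor (akin to a single-threaded
// Scheduler) runs every submitted task on the same worker thread.
public class AffinitySketch {
    public static List<String> threadNamesFor(int tasks) throws Exception {
        ExecutorService stage = Executors.newSingleThreadExecutor();
        List<String> names = new CopyOnWriteArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                // .get() waits for completion so the list is fully populated
                stage.submit(() -> names.add(Thread.currentThread().getName())).get();
            }
        } finally {
            stage.shutdown();
        }
        return names;
    }
}
```

Running this records the same thread name for every task, whereas a pooled executor could spread tasks over several threads.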

C.7. What Is a Good Pattern for Contextual Logging? (MDC)

Most logging frameworks allow contextual logging, letting users store variables that are reflected in the logging pattern, generally by way of a Map called the MDC ("Mapped Diagnostic Context"). This is one of the most recurring uses of ThreadLocal in Java, and as a consequence this pattern assumes that the code being logged is tied in a one-to-one relationship with a Thread.
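To see why that assumption matters, here is a minimal sketch of the mechanism (MiniMdc is an illustrative stand-in, not the real SLF4J MDC): the map of contextual values lives in a ThreadLocal, so it is inherently bound to one Thread.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of how an MDC works under the hood: values put on
// one thread are invisible to code logging from another thread, because
// the backing map is a ThreadLocal.
public class MiniMdc {
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    public static void put(String key, String value) { CONTEXT.get().put(key, value); }
    public static String get(String key) { return CONTEXT.get().get(key); }
    public static void remove(String key) { CONTEXT.get().remove(key); }
}
```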

That might have been a safe assumption to make before Java 8, but with the advent of functional programming elements in the Java language things have changed a bit…​

Let’s take the example of an API that was imperative and used the template method pattern, then switches to a more functional style. With the template method pattern, inheritance was at play. Now in its more functional approach, higher order functions are passed to define the "steps" of the algorithm. Things are now more declarative than imperative, and that frees the library to make decisions about where each step should run. For instance, knowing which steps of the underlying algorithm can be parallelized, the library can use an ExecutorService to execute some of the steps in parallel.

One concrete example of such a functional API is the Stream API introduced in Java 8 and its parallel() flavor. Logging with an MDC in a parallel Stream is not a free lunch: one needs to ensure the MDC is captured and reapplied in each step.

The functional style enables such optimizations, because each step is thread-agnostic and referentially transparent, but it can break the MDC assumption of a single Thread. The most idiomatic way of ensuring any kind of contextual information is accessible to all stages would be to pass that context around through the composition chain. During the development of Reactor we encountered the same general class of problem, and we wanted to avoid this very hands-down and explicit approach. This is why the Context was introduced: it propagates through the execution chain as long as Flux and Mono are used as the return value, by letting stages (operators) peek at the Context of their downstream stage. So instead of using ThreadLocal, Reactor offers this map-like object that is tied to a Subscription and not a Thread.

Now that we’ve established that MDC "just working" is not the best assumption to make in a declarative API, how can we perform contextualized log statements in relation to events in a Reactive Stream (onNext, onError, and onComplete)?

This entry of the FAQ offers a possible intermediate solution when one wants to log in relation to these signals in a straightforward and explicit manner. Make sure to read the Adding a Context to a Reactive Sequence section beforehand, and especially how a write must happen towards the bottom of the operator chain for operators above it to see it.

To get contextual information from the Context to the MDC, the simplest way is to wrap logging statements in a doOnEach operator with a little bit of boilerplate code. This boilerplate depends on both the logging framework/abstraction of your choice and the information you want to put in the MDC, so it has to be in your codebase.

The following is an example of such a helper function, around a single MDC variable and focused on logging onNext events, using the Java 9 enhanced Optional API:

public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
	return signal -> {
		if (!signal.isOnNext()) return; (1)
		Optional<String> toPutInMdc = signal.getContext().getOrEmpty("CONTEXT_KEY"); (2)

		toPutInMdc.ifPresentOrElse(tpim -> {
			try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) { (3)
				logStatement.accept(signal.get()); (4)
			}
		},
		() -> logStatement.accept(signal.get())); (5)
	};
}
1 doOnEach signals include onComplete and onError. In this example we’re only interested in logging onNext
2 We will extract an interesting value from the Reactor Context (see the The Context API section)
3 We use the MDCCloseable from SLF4J 2 in this example, allowing try-with-resource syntax for automatic cleanup of the MDC after the log statement is executed
4 The proper log statement is provided by the caller as a Consumer<T> (consumer of the onNext value)
5 In case the expected key was not set in the Context, we use the alternative path where nothing is put in the MDC

Using this boilerplate code ensures that we are good citizens with the MDC: we set a key right before we execute a logging statement and remove it immediately after. There is no risk of polluting the MDC for subsequent logging statements.

Of course, this is a suggestion. You might be interested in extracting multiple values from the Context or in logging things in case of onError. You might want to create additional helper methods for these cases or craft a single method that makes use of additional lambdas to cover more ground.

In any case, the usage of the preceding helper method could look like the following instrumented web controller:

@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
	String apiId = userId == null ? "" : userId; (1)

	return restaurantService.byPrice(maxPrice)
			   .doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}", (2)
					r.getName(), r.getPricePerPerson())))
			   .contextWrite(Context.of("CONTEXT_KEY", apiId)); (3)
}
1 We need to get the contextual information from the request header and put it in the Context
2 Here we apply the helper method to the Flux, using doOnEach. Remember: operators see Context values defined below them.
3 We write the value from the header to the Context, using the chosen key CONTEXT_KEY.

In this configuration, the restaurantService can emit its data on a shared thread, yet the logs will still reference the correct X-UserId for each request.

For completeness, we can also see what an error-logging helper could look like:

public static Consumer<Signal<?>> logOnError(Consumer<Throwable> errorLogStatement) {
	return signal -> {
		if (!signal.isOnError()) return;
		Optional<String> toPutInMdc = signal.getContext().getOrEmpty("CONTEXT_KEY");

		toPutInMdc.ifPresentOrElse(tpim -> {
			try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) {
				errorLogStatement.accept(signal.getThrowable());
			}
		},
		() -> errorLogStatement.accept(signal.getThrowable()));
	};
}

Nothing much has changed, except for the fact that we check that the Signal is effectively an onError, and that we provide said error (a Throwable) to the log statement lambda.

Applying this helper in the controller is very similar to what we have done before:

@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
	String apiId = userId == null ? "" : userId;

	return restaurantService.byPrice(maxPrice)
			   .doOnEach(logOnNext(v -> LOG.info("found restaurant {}", v)))
			   .doOnEach(logOnError(e -> LOG.error("error when searching restaurants", e))) (1)
			   .contextWrite(Context.of("CONTEXT_KEY", apiId));
}
1 In case the restaurantService emits an error, it is logged with the MDC context here

Appendix D: Reactor-Extra

The reactor-extra artifact contains additional operators and utilities that are for users of reactor-core with advanced needs, or incubating operators.

As this is a separate artifact, you need to explicitly add it to your build. The following example shows how to do so in Gradle:

dependencies {
     compile 'io.projectreactor:reactor-core'
     compile 'io.projectreactor.addons:reactor-extra' (1)
}
1 Add the reactor-extra artifact in addition to core. See Getting Reactor for details about why you do not need to specify a version if you use the BOM, usage in Maven, and other details.

D.1. TupleUtils and Functional Interfaces

The reactor.function package contains functional interfaces that complement the Java 8 Function, Predicate, and Consumer interfaces, for three to eight values.

TupleUtils offers static methods that act as a bridge between lambdas of those functional interfaces and a similar interface on the corresponding Tuple.

This lets you easily work with independent parts of any Tuple, as the following example shows:

.map(tuple -> {
  String firstName = tuple.getT1();
  String lastName = tuple.getT2();
  String address = tuple.getT3();

  return new Customer(firstName, lastName, address);
});

You can rewrite the preceding example as follows:

.map(TupleUtils.function(Customer::new)); (1)
1 (as the Customer constructor conforms to the Function3 functional interface signature)
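The bridge itself is easy to picture. The following self-contained sketch uses simplified Tuple3 and Function3 stand-ins (the real types live in reactor.util.function and reactor.function) to show how a three-argument lambda is adapted into a Function over a tuple:

```java
import java.util.function.Function;

// Simplified stand-ins for reactor.util.function.Tuple3 and
// reactor.function.Function3, to illustrate what TupleUtils.function does.
public class TupleBridge {
    public interface Function3<T1, T2, T3, R> { R apply(T1 t1, T2 t2, T3 t3); }
    public record Tuple3<T1, T2, T3>(T1 t1, T2 t2, T3 t3) {}

    // The bridge: adapt a 3-arg lambda into a Function over the tuple.
    public static <T1, T2, T3, R> Function<Tuple3<T1, T2, T3>, R> function(
            Function3<T1, T2, T3, R> f) {
        return tuple -> f.apply(tuple.t1(), tuple.t2(), tuple.t3());
    }

    public static void main(String[] args) {
        record Customer(String firstName, String lastName, String address) {}
        // Customer::new conforms to the Function3 shape, just as in the
        // TupleUtils example above.
        Function<Tuple3<String, String, String>, Customer> mapper = function(Customer::new);
        System.out.println(mapper.apply(new Tuple3<>("Ada", "Lovelace", "London")));
    }
}
```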

D.2. Math Operators With MathFlux

The reactor.math package contains a MathFlux specialized version of Flux that offers mathematical operators, including max, min, sumInt, averageDouble, and others.

D.3. Schedulers

Reactor-extra comes with a ForkJoinPoolScheduler (in the reactor.scheduler.forkjoin package): it uses the Java ForkJoinPool to execute tasks.