diff --git a/README.md b/README.md index 520e4fb2a6..03cbd1d852 100644 --- a/README.md +++ b/README.md @@ -25,10 +25,10 @@ With Gradle from repo.spring.io or Maven Central repositories (stable releases o } dependencies { - //compile "io.projectreactor:reactor-core:3.1.3.BUILD-SNAPSHOT" - //testCompile("io.projectreactor:reactor-test:3.1.3.BUILD-SNAPSHOT") - compile "io.projectreactor:reactor-core:3.1.2.RELEASE" - testCompile("io.projectreactor:reactor-test:3.1.2.RELEASE") + //compile "io.projectreactor:reactor-core:3.1.4.BUILD-SNAPSHOT" + //testCompile("io.projectreactor:reactor-test:3.1.4.BUILD-SNAPSHOT") + compile "io.projectreactor:reactor-core:3.1.3.RELEASE" + testCompile("io.projectreactor:reactor-test:3.1.3.RELEASE") } ``` @@ -54,7 +54,7 @@ A Reactive Streams Publisher with basic flow operators. - Static factories on Flux allow for source generation from arbitrary callbacks types. - Instance methods allows operational building, materialized on each _Flux#subscribe()_, _Flux#subscribe()_ or multicasting operations such as _Flux#publish_ and _Flux#publishNext_. -[](http://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) +[](http://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) Flux in action : ```java @@ -73,7 +73,7 @@ A Reactive Streams Publisher constrained to *ZERO* or *ONE* element with appropr - Static factories on Mono allow for deterministic *zero or one* sequence generation from arbitrary callbacks types. - Instance methods allows operational building, materialized on each _Mono#subscribe()_ or _Mono#get()_ eventually called. -[](http://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) +[](http://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) Mono in action : ```java diff --git a/reactor-core/src/main/java/reactor/core/publisher/ConnectableFlux.java b/reactor-core/src/main/java/reactor/core/publisher/ConnectableFlux.java index 54a8080fd1..d0592532bc 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ConnectableFlux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ConnectableFlux.java @@ -38,7 +38,7 @@ public abstract class ConnectableFlux extends Flux { * subscribes. * *

- * + * * * @return a {@link Flux} that connects to the upstream source when the first {@link org.reactivestreams.Subscriber} subscribes */ @@ -54,7 +54,7 @@ public final Flux autoConnect() { * triggers the connection. * *

- * + * * * @param minSubscribers the minimum number of subscribers * @@ -71,7 +71,7 @@ public final Flux autoConnect(int minSubscribers) { * @param cancelSupport the consumer that will receive the {@link Disposable} that allows disconnecting * *

- * + * * * @return a {@link Flux} that connects to the upstream source when the given amount of subscribers subscribed */ @@ -118,7 +118,7 @@ public final Disposable connect() { * when all Subscribers cancelled or the upstream source completed. * *

- * + * * * @return a reference counting {@link Flux} */ @@ -131,7 +131,7 @@ public final Flux refCount() { * when all Subscribers cancelled or the upstream source completed. * *

- * + * * * @param minSubscribers the number of subscribers expected to subscribe before connection * @@ -149,7 +149,7 @@ public final Flux refCount(int minSubscribers) { * in during the {@code gracePeriod} that follows, the disconnection is cancelled. * *

- * + * * * @param minSubscribers the number of subscribers expected to subscribe before connection * @param gracePeriod the {@link Duration} for which to wait for new subscribers before actually @@ -169,7 +169,7 @@ public final Flux refCount(int minSubscribers, Duration gracePeriod) { * in during the {@code gracePeriod} that follows, the disconnection is cancelled. * *

- * + * * * @param minSubscribers the number of subscribers expected to subscribe before connection * @param gracePeriod the {@link Duration} for which to wait for new subscribers before actually diff --git a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java index d4566e9784..1b7cc6c8a2 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java @@ -44,7 +44,7 @@ * the parent sequence after a given {@link Subscriber} is subscribed. *

*

- * *

* diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 5fabf210e6..6f1f1fb22e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -74,7 +74,7 @@ * (successfully or with an error). * *

- * + * *

* *

It is intended to be used in implementations and return types. Input parameters should keep using raw @@ -111,7 +111,7 @@ public abstract class Flux implements Publisher { * Build a {@link Flux} whose data are generated by the combination of the most recently published value from each * of the {@link Publisher} sources. *

- * * * @param sources The {@link Publisher} sources to combine values from @@ -131,7 +131,7 @@ public static Flux combineLatest(Function combinator, Pub * Build a {@link Flux} whose data are generated by the combination of the most recently published value from each * of the {@link Publisher} sources. *

- * * * @param sources The {@link Publisher} sources to combine values from @@ -168,7 +168,7 @@ public static Flux combineLatest(Function combinator, int * Build a {@link Flux} whose data are generated by the combination of the most recently published value from each * of two {@link Publisher} sources. *

- * * * @param source1 The first {@link Publisher} source to combine values from @@ -192,7 +192,7 @@ public static Flux combineLatest(Publisher source1, * Build a {@link Flux} whose data are generated by the combination of the most recently published value from each * of three {@link Publisher} sources. *

- * * * @param source1 The first {@link Publisher} source to combine values from @@ -218,7 +218,7 @@ public static Flux combineLatest(Publisher sour * Build a {@link Flux} whose data are generated by the combination of the most recently published value from each * of four {@link Publisher} sources. *

- * * * @param source1 The first {@link Publisher} source to combine values from @@ -247,7 +247,7 @@ public static Flux combineLatest(Publisher * Build a {@link Flux} whose data are generated by the combination of the most recently published value from each * of five {@link Publisher} sources. *

- * * * @param source1 The first {@link Publisher} source to combine values from @@ -279,7 +279,7 @@ public static Flux combineLatest(Publisher - * * * @param source1 The first {@link Publisher} source to combine values from @@ -314,7 +314,7 @@ public static Flux combineLatest(Publisher - * * * @param sources The list of {@link Publisher} sources to combine values from @@ -334,7 +334,7 @@ public static Flux combineLatest(Iterable - * * * @param sources The list of {@link Publisher} sources to combine values from @@ -364,7 +364,7 @@ public static Flux combineLatest(Iterable - * + * * * @param sources The {@link Iterable} of {@link Publisher} to concatenate * @param The type of values in both source and output sequences @@ -384,7 +384,7 @@ public static Flux concat(Iterable> sour * last source completes. Any error interrupts the sequence immediately and is * forwarded downstream. *

- * + * *

* @param sources The {@link Publisher} of {@link Publisher} to concatenate * @param The type of values in both source and output sequences @@ -404,7 +404,7 @@ public static Flux concat(Publisher> sou * last source completes. Any error interrupts the sequence immediately and is * forwarded downstream. *

- * + * *

* @param sources The {@link Publisher} of {@link Publisher} to concatenate * @param prefetch the inner source request size @@ -428,7 +428,7 @@ public static Flux concat(Publisher> sou * last source completes. Any error interrupts the sequence immediately and is * forwarded downstream. *

- * + * *

* @param sources The {@link Publisher} of {@link Publisher} to concat * @param The type of values in both source and output sequences @@ -449,7 +449,7 @@ public static Flux concat(Publisher... sources) { * last source completes. Errors do not interrupt the main sequence but are propagated * after the rest of the sources have had a chance to be concatenated. *

- * + * *

* @param sources The {@link Publisher} of {@link Publisher} to concatenate * @param The type of values in both source and output sequences @@ -469,7 +469,7 @@ public static Flux concatDelayError(Publisher - * + * *

* @param sources The {@link Publisher} of {@link Publisher} to concatenate * @param prefetch the inner source request size @@ -496,7 +496,7 @@ public static Flux concatDelayError(Publisher - * + * *

* @param sources The {@link Publisher} of {@link Publisher} to concatenate * @param delayUntilEnd delay error until all sources have been consumed instead of @@ -523,7 +523,7 @@ public static Flux concatDelayError(Publisher - * + * *

* @param sources The {@link Publisher} of {@link Publisher} to concat * @param The type of values in both source and output sequences @@ -695,7 +695,7 @@ public static Flux push(Consumer> emitter, OverflowSt * effectively behave like {@link #from(Publisher)}. * *

- * + * * * @param supplier the {@link Publisher} {@link Supplier} to call on subscribe * @param the type of values passing through the {@link Flux} @@ -709,7 +709,7 @@ public static Flux defer(Supplier> supplier) { /** * Create a {@link Flux} that completes without emitting any item. *

- * + * *

* @param the reified type of the target {@link Subscriber} * @@ -723,7 +723,7 @@ public static Flux empty() { * Create a {@link Flux} that terminates with the specified error immediately after * being subscribed to. *

- * + * *

* @param error the error to signal to each {@link Subscriber} * @param the reified type of the target {@link Subscriber} @@ -739,7 +739,7 @@ public static Flux error(Throwable error) { * after being subscribed to or after being first requested. * *

- * + * * * @param throwable the error to signal to each {@link Subscriber} * @param whenRequested if true, will onError on the first request instead of subscribe(). @@ -762,7 +762,7 @@ public static Flux error(Throwable throwable, boolean whenRequested) { * fastest of these competing sources. * *

- * + * *

* * @param sources The competing source publishers @@ -781,7 +781,7 @@ public static Flux first(Publisher... sources) { * fastest of these competing sources. * *

- * + * *

* * @param sources The competing source publishers @@ -796,7 +796,7 @@ public static Flux first(Iterable> sourc /** * Decorate the specified {@link Publisher} with the {@link Flux} API. *

- * + * *

* @param source the source to decorate * @param The type of values in both source and output sequences @@ -829,7 +829,7 @@ public static Flux from(Publisher source) { /** * Create a {@link Flux} that emits the items contained in the provided array. *

- * + * *

* @param array the array to read data from * @param The type of values in the source array and resulting Flux @@ -850,7 +850,7 @@ public static Flux fromArray(T[] array) { * Create a {@link Flux} that emits the items contained in the provided {@link Iterable}. * A new iterator will be created for each subscriber. *

- * + * *

* @param it the {@link Iterable} to read data from * @param The type of values in the source {@link Iterable} and resulting Flux @@ -868,7 +868,7 @@ public static Flux fromIterable(Iterable it) { * {@link #retry()}). The {@link Stream} is {@link Stream#close() closed} automatically * by the operator on cancellation, error or completion. *

- * + * *

* @param s the {@link Stream} to read data from * @param The type of values in the source {@link Stream} and resulting Flux @@ -886,7 +886,7 @@ public static Flux fromStream(Stream s) { * {@link Stream#close() closed} automatically by the operator on cancellation, error * or completion. *

- * + * *

* @param streamSupplier the {@link Supplier} that generates the {@link Stream} from * which to read data @@ -902,7 +902,7 @@ public static Flux fromStream(Supplier> streamSupplie * Programmatically create a {@link Flux} by generating signals one-by-one via a * consumer callback. *

- * + * *

* * @param the value type emitted @@ -920,7 +920,7 @@ public static Flux generate(Consumer> generator) { * Programmatically create a {@link Flux} by generating signals one-by-one via a * consumer callback and some state. The {@code stateSupplier} may return {@literal null}. *

- * + * *

* * @param the value type emitted @@ -941,7 +941,7 @@ public static Flux generate(Callable stateSupplier, BiFunction - * + * *

* * @param the value type emitted @@ -968,7 +968,7 @@ public static Flux generate(Callable stateSupplier, BiFunction * Runs on the {@link Schedulers#parallel()} Scheduler. *

- * + * *

* @param period the period {@link Duration} between each increment * @return a new {@link Flux} emitting increasing numbers at regular intervals @@ -987,7 +987,7 @@ public static Flux interval(Duration period) { *

* Runs on the {@link Schedulers#parallel()} Scheduler. *

- * + * * * @param delay the {@link Duration} to wait before emitting 0l * @param period the period {@link Duration} before each following increment @@ -1005,7 +1005,7 @@ public static Flux interval(Duration delay, Duration period) { * {@code IllegalStateException} detailing the tick that couldn't be emitted. * In normal conditions, the {@link Flux} will never complete. *

- * + * *

* @param period the period {@link Duration} between each increment * @param timer a time-capable {@link Scheduler} instance to run on @@ -1024,7 +1024,7 @@ public static Flux interval(Duration period, Scheduler timer) { * detailing the tick that couldn't be emitted. In normal conditions, the {@link Flux} * will never complete. *

- * + * * * @param delay the {@link Duration} to wait before emitting 0l * @param period the period {@link Duration} before each following increment @@ -1039,7 +1039,7 @@ public static Flux interval(Duration delay, Duration period, Scheduler tim /** * Create a {@link Flux} that emits the provided elements and then completes. *

- * + * *

* @param data the elements to emit, as a vararg * @param the emitted data type @@ -1054,7 +1054,7 @@ public static Flux just(T... data) { /** * Create a new {@link Flux} that will only emit a single element then onComplete. *

- * + * *

* @param data the single element to emit * @param the emitted data type @@ -1070,7 +1070,7 @@ public static Flux just(T data) { * into an interleaved merged sequence. Unlike {@link #concat(Publisher) concat}, inner * sources are subscribed to eagerly. *

- * + * *

* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source @@ -1094,7 +1094,7 @@ public static Flux merge(Publisher> sour * sources are subscribed to eagerly (but at most {@code concurrency} sources are * subscribed to at the same time). *

- * + * *

* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source @@ -1117,7 +1117,7 @@ public static Flux merge(Publisher> sour * sources are subscribed to eagerly (but at most {@code concurrency} sources are * subscribed to at the same time). *

- * + * *

* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source @@ -1148,7 +1148,7 @@ public static Flux merge(Publisher> sour * sources are subscribed to eagerly. * A new {@link Iterator} will be created for each subscriber. *

- * + * *

* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source @@ -1169,7 +1169,7 @@ public static Flux merge(Iterable> sourc * into an interleaved merged sequence. Unlike {@link #concat(Publisher) concat}, * sources are subscribed to eagerly. *

- * + * *

* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source @@ -1191,7 +1191,7 @@ public static Flux merge(Publisher... sources) { * into an interleaved merged sequence. Unlike {@link #concat(Publisher) concat}, * sources are subscribed to eagerly. *

- * + * *

* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source @@ -1215,7 +1215,7 @@ public static Flux merge(int prefetch, Publisher... sources) * sources are subscribed to eagerly. * This variant will delay any error until after the rest of the merge backlog has been processed. *

- * + * *

* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source @@ -1239,7 +1239,7 @@ public static Flux mergeDelayError(int prefetch, Publisher.. * eagerly. Unlike merge, their emitted values are merged into the final sequence in * subscription order. *

- * + * *

* @param sources a {@link Publisher} of {@link Publisher} sources to merge * @param the merged type @@ -1257,7 +1257,7 @@ public static Flux mergeSequential(Publisher - * + * *

* @param sources a {@link Publisher} of {@link Publisher} sources to merge * @param prefetch the inner source request size @@ -1278,7 +1278,7 @@ public static Flux mergeSequential(Publisher - * + * *

* @param sources a {@link Publisher} of {@link Publisher} sources to merge * @param prefetch the inner source request size @@ -1297,7 +1297,7 @@ public static Flux mergeSequentialDelayError(Publisher - * + * *

* @param sources a number of {@link Publisher} sequences to merge * @param the merged type @@ -1314,7 +1314,7 @@ public static Flux mergeSequential(Publisher... sources) { * into an ordered merged sequence. Unlike concat, sources are subscribed to * eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order. *

- * + * *

* @param prefetch the inner source request size * @param sources a number of {@link Publisher} sequences to merge @@ -1334,7 +1334,7 @@ public static Flux mergeSequential(int prefetch, Publisher.. * This variant will delay any error until after the rest of the mergeSequential backlog * has been processed. *

- * + * *

* @param prefetch the inner source request size * @param sources a number of {@link Publisher} sequences to merge @@ -1352,7 +1352,7 @@ public static Flux mergeSequentialDelayError(int prefetch, Publisher - * + * *

* @param sources an {@link Iterable} of {@link Publisher} sequences to merge * @param the merged type @@ -1370,7 +1370,7 @@ public static Flux mergeSequential(Iterable - * + * *

* @param sources an {@link Iterable} of {@link Publisher} sequences to merge * @param maxConcurrency the request produced to the main source thus limiting concurrent merge backlog @@ -1392,7 +1392,7 @@ public static Flux mergeSequential(Iterable - * + * *

* @param sources an {@link Iterable} of {@link Publisher} sequences to merge * @param maxConcurrency the request produced to the main source thus limiting concurrent merge backlog @@ -1409,7 +1409,7 @@ public static Flux mergeSequentialDelayError(Iterable - * + * *

* @param the {@link Subscriber} type target * @@ -1425,7 +1425,7 @@ public static Flux never() { * and {@code start + count} (excluded) then complete. * *

- * + * * * @param start the first integer to be emit * @param count the total number of incrementing values to emit, including the first value @@ -1449,7 +1449,7 @@ public static Flux range(int start, int count) { * the source (source has completed) and the last mirrored {@link Publisher} has also * completed. *

- * * * @param mergedPublishers The {@link Publisher} of {@link Publisher} to switch on and mirror. @@ -1469,7 +1469,7 @@ public static Flux switchOnNext(Publisher - * * * @param mergedPublishers The {@link Publisher} of {@link Publisher} to switch on and mirror. @@ -1492,7 +1492,7 @@ public static Flux switchOnNext(Publisher - * * * @param resourceSupplier a {@link Callable} that is called on subscribe to generate the resource @@ -1516,7 +1516,7 @@ public static Flux using(Callable resourceSupplier, Funct *

*

- * * * @param resourceSupplier a {@link Callable} that is called on subscribe to generate the resource @@ -1543,7 +1543,7 @@ public static Flux using(Callable resourceSupplier, Funct * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* * @param source1 The first {@link Publisher} source to zip. @@ -1574,7 +1574,7 @@ public static Flux zip(Publisher source1, * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param source1 The first {@link Publisher} source to zip. * @param source2 The second {@link Publisher} source to zip. @@ -1594,7 +1594,7 @@ public static Flux> zip(Publisher source1, * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param source1 The first upstream {@link Publisher} to subscribe to. * @param source2 The second upstream {@link Publisher} to subscribe to. @@ -1618,7 +1618,7 @@ public static Flux> zip(Publisher * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param source1 The first upstream {@link Publisher} to subscribe to. * @param source2 The second upstream {@link Publisher} to subscribe to. @@ -1645,7 +1645,7 @@ public static Flux> zip(Publisher - * + * *

* @param source1 The first upstream {@link Publisher} to subscribe to. * @param source2 The second upstream {@link Publisher} to subscribe to. @@ -1675,7 +1675,7 @@ public static Flux> zip(Publishe * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param source1 The first upstream {@link Publisher} to subscribe to. * @param source2 The second upstream {@link Publisher} to subscribe to. @@ -1712,7 +1712,7 @@ public static Flux> zip( * The {@link Iterable#iterator()} will be called on each {@link Publisher#subscribe(Subscriber)}. * *

- * + * * * @param sources the {@link Iterable} providing sources to zip * @param combinator The aggregate function that will receive a unique value from each upstream and return the value @@ -1738,7 +1738,7 @@ public static Flux zip(Iterable> sources, * The {@link Iterable#iterator()} will be called on each {@link Publisher#subscribe(Subscriber)}. * *

- * + * * * @param sources the {@link Iterable} providing sources to zip * @param prefetch the inner source request size @@ -1766,7 +1766,7 @@ public static Flux zip(Iterable> sources, * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param combinator The aggregate function that will receive a unique value from each upstream and return the * value to signal downstream @@ -1790,7 +1790,7 @@ public static Flux zip( * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param combinator The aggregate function that will receive a unique value from each upstream and return the * value to signal downstream @@ -1836,7 +1836,7 @@ public static Flux zip(final Function c * Note that the {@link Publisher} sources from the outer {@link Publisher} will * accumulate into an exhaustive list before starting zip operation. *

- * * * @param sources The {@link Publisher} of {@link Publisher} sources to zip. A finite publisher is required. @@ -1872,7 +1872,7 @@ public Publisher apply(List> publishers) { * the predicate doesn't match a value. * *

- * + * * * @param predicate the {@link Predicate} that needs to apply to all emitted items * @@ -1891,7 +1891,7 @@ public final Mono all(Predicate predicate) { * doesn't match the predicate. * *

- * + * * * @param predicate the {@link Predicate} that needs to apply to at least one emitted item * @@ -2002,7 +2002,7 @@ public final T blockLast(Duration timeout) { * Collect all incoming values into a single {@link List} buffer that will be emitted * by the returned {@link Flux} once this Flux completes. *

- * * * @return a buffered {@link Flux} of at most one {@link List} @@ -2017,7 +2017,7 @@ public final Flux> buffer() { * by the returned {@link Flux} each time the given max size is reached or once this * Flux completes. *

- * * * @param maxSize the maximum collected size @@ -2033,7 +2033,7 @@ public final Flux> buffer(int maxSize) { * will be emitted by the returned {@link Flux} each time the given max size is reached * or once this Flux completes. *

- * * * @param maxSize the maximum collected size @@ -2054,17 +2054,17 @@ public final > Flux buffer(int maxSize, Suppl *

* When maxSize < skip : dropping buffers *

- * *

* When maxSize > skip : overlapping buffers *

- * *

* When maxSize == skip : exact buffers *

- * * * @param skip the number of items to count before creating a new buffer @@ -2084,17 +2084,17 @@ public final Flux> buffer(int maxSize, int skip) { *

* When maxSize < skip : dropping buffers *

- * *

* When maxSize > skip : overlapping buffers *

- * *

* When maxSize == skip : exact buffers *

- * * * @param skip the number of items to count before creating a new buffer @@ -2114,7 +2114,7 @@ public final > Flux buffer(int maxSize, * Collect incoming values into multiple {@link List} buffers, as delimited by the * signals of a companion {@link Publisher} this operator will subscribe to. *

- * * * @param other the companion {@link Publisher} whose signals trigger new buffers @@ -2130,7 +2130,7 @@ public final Flux> buffer(Publisher other) { * delimited by the signals of a companion {@link Publisher} this operator will * subscribe to. *

- * * * @param other the companion {@link Publisher} whose signals trigger new buffers @@ -2147,7 +2147,7 @@ public final > Flux buffer(Publisher other * Collect incoming values into multiple {@link List} buffers that will be emitted by * the returned {@link Flux} every {@code timespan}. *

- * * * @param timespan the duration from buffer creation until a buffer is closed and emitted @@ -2165,17 +2165,17 @@ public final Flux> buffer(Duration timespan) { *

* When timespan < timeshift : dropping buffers *

- * *

* When timespan > timeshift : overlapping buffers *

- * *

* When timespan == timeshift : exact buffers *

- * * * @param timespan the duration from buffer creation until a buffer is closed and emitted @@ -2191,7 +2191,7 @@ public final Flux> buffer(Duration timespan, Duration timeshift) { * Collect incoming values into multiple {@link List} buffers that will be emitted by * the returned {@link Flux} every {@code timespan}, as measured on the provided {@link Scheduler}. *

- * * * @param timespan the duration from buffer creation until a buffer is closed and emitted @@ -2211,17 +2211,17 @@ public final Flux> buffer(Duration timespan, Scheduler timer) { *

* When timespan < timeshift : dropping buffers *

- * *

* When timespan > timeshift : overlapping buffers *

- * *

* When timespan == timeshift : exact buffers *

- * * * @param timespan the duration from buffer creation until a buffer is closed and emitted @@ -2243,7 +2243,7 @@ public final Flux> buffer(Duration timespan, Duration timeshift, Schedul * by the returned {@link Flux} each time the buffer reaches a maximum size OR the * timespan {@link Duration} elapses. *

- * * * @param maxSize the max collected size @@ -2260,7 +2260,7 @@ public final Flux> bufferTimeout(int maxSize, Duration timespan) { * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the timespan {@link Duration} elapses. *

- * * * @param maxSize the max collected size @@ -2279,7 +2279,7 @@ public final > Flux bufferTimeout(int maxSize * by the returned {@link Flux} each time the buffer reaches a maximum size OR the * timespan {@link Duration} elapses, as measured on the provided {@link Scheduler}. *

- * * * @param maxSize the max collected size @@ -2297,7 +2297,7 @@ public final Flux> bufferTimeout(int maxSize, Duration timespan, Schedul * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the timespan {@link Duration} elapses, as measured on the provided {@link Scheduler}. *

- * * * @param maxSize the max collected size @@ -2318,7 +2318,7 @@ public final > Flux bufferTimeout(int maxSiz * the element that triggers the predicate to return true (and thus closes a buffer) * is included as last element in the emitted buffer. *

- * *

* On completion, if the latest buffer is non-empty and has not been closed it is @@ -2341,7 +2341,7 @@ public final Flux> bufferUntil(Predicate predicate) { * push it to true to include the boundary element in the newly opened buffer, false to * include it in the closed buffer (as in {@link #bufferUntil(Predicate)}). *

- * *

* On completion, if the latest buffer is non-empty and has not been closed it is @@ -2365,7 +2365,7 @@ public final Flux> bufferUntil(Predicate predicate, boolean c * predicate returns false... Note that the element that triggers the predicate * to return false (and thus closes a buffer) is NOT included in any emitted buffer. *

- * *

* On completion, if the latest buffer is non-empty and has not been closed it is @@ -2387,17 +2387,17 @@ public final Flux> bufferWhile(Predicate predicate) { *

* When Open signal is strictly not overlapping Close signal : dropping buffers *

- * *

* When Open signal is strictly more frequent than Close signal : overlapping buffers *

- * *

* When Open signal is exactly coordinated with Close signal : exact buffers *

- * * * @param bucketOpening a companion {@link Publisher} to subscribe for buffer creation signals. @@ -2421,17 +2421,17 @@ public final Flux> bufferWhen(Publisher bucketOpening, *

* When Open signal is strictly not overlapping Close signal : dropping buffers *

- * *

* When Open signal is strictly more frequent than Close signal : overlapping buffers *

- * *

* When Open signal is exactly coordinated with Close signal : exact buffers *

- * * * @param bucketOpening a companion {@link Publisher} to subscribe for buffer creation signals. @@ -2456,7 +2456,7 @@ public final > Flux bufferWhen(Publishe * retain an unbounded volume of onNext signals. Completion and Error will also be * replayed. *

- * * * @return a replaying {@link Flux} @@ -2473,7 +2473,7 @@ public final Flux cache() { * Note that {@code cache(0)} will only cache the terminal signal without * expiration. *

- * + * * * @param history number of elements retained in cache * @@ -2491,7 +2491,7 @@ public final Flux cache(int history) { * Completion and Error will also be replayed until {@code ttl} triggers in which case * the next {@link Subscriber} will start over a new subscription. *

- * * * @param ttl Time-to-live for each cached item and post termination. @@ -2510,7 +2510,7 @@ public final Flux cache(Duration ttl) { * Completion and Error will also be replayed until {@code ttl} triggers in which case * the next {@link Subscriber} will start over a new subscription. *

- * * * @param history number of elements retained in cache @@ -2526,7 +2526,7 @@ public final Flux cache(int history, Duration ttl) { * Cast the current {@link Flux} produced type into a target produced type. * *

- * + * * * @param the {@link Flux} output type * @param clazz the target class to cast to @@ -2621,7 +2621,7 @@ public final Flux checkpoint(@Nullable String description, boolean forceStack * The collected result will be emitted when this sequence completes. * *

- * + * * * @param the container type * @param containerSupplier the supplier of the container instance for each Subscriber @@ -2640,7 +2640,7 @@ public final Mono collect(Supplier containerSupplier, BiConsumer - * + * * * @param collector the {@link Collector} * @param The mutable accumulation type @@ -2658,7 +2658,7 @@ public final Mono collect(Collector collect * emitted by the resulting {@link Mono} when this sequence completes. * *

- * + * * * @return a {@link Mono} of a {@link List} of all values from this {@link Flux} */ @@ -2704,7 +2704,7 @@ public final Mono> collectList() { * will be the most recently emitted element. * *

- * + * * * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map} * @param the type of the key extracted from each source element @@ -2726,7 +2726,7 @@ public final Mono> collectMap(Function key * from the most recently emitted element. * *

- * + * * * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map} * @param valueExtractor a {@link Function} to map elements to a value for the {@link Map} @@ -2751,7 +2751,7 @@ public final Mono> collectMap(Function * from the most recently emitted element. * *

- * + * * * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map} * @param valueExtractor a {@link Function} to map elements to a value for the {@link Map} @@ -2781,7 +2781,7 @@ public final Mono> collectMap( * associated to said key. * *

- * + * * * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map} * @@ -2801,7 +2801,7 @@ public final Mono>> collectMultimap(Function - * + * * * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map} * @param valueExtractor a {@link Function} to map elements to a value for the {@link Map} @@ -2825,7 +2825,7 @@ public final Mono>> collectMultimap(Function - * + * * * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map} * @param valueExtractor a {@link Function} to map elements to a value for the {@link Map} @@ -2858,7 +2858,7 @@ public final Mono>> collectMultimap( * resulting {@link Mono}. * *

- * + * * * @return a {@link Mono} of a sorted {@link List} of all values from this {@link Flux}, in natural order */ @@ -2872,7 +2872,7 @@ public final Mono> collectSortedList() { * by the resulting {@link Mono}. * *

- * + * * * @param comparator a {@link Comparator} to sort the items of this sequences * @@ -2933,7 +2933,7 @@ public final Flux compose(Function, ? extends Publisher - * + * * * @param mapper the function to transform this sequence of T into concatenated sequences of V * @param the produced concatenated type @@ -2967,7 +2967,7 @@ public final Flux concatMap(Function - * + * * * @param mapper the function to transform this sequence of T into concatenated sequences of V * @param prefetch the inner source produced demand @@ -3003,7 +3003,7 @@ public final Flux concatMap(Function - * + * * * * @param mapper the function to transform this sequence of T into concatenated sequences of V @@ -3039,7 +3039,7 @@ public final Flux concatMapDelayError(Function - * + * * * * @param mapper the function to transform this sequence of T into concatenated sequences of V @@ -3078,7 +3078,7 @@ public final Flux concatMapDelayError(Function - * + * * * * @param mapper the function to transform this sequence of T into concatenated sequences of V @@ -3102,7 +3102,7 @@ public final Flux concatMapDelayError(Function - * + * *

* Note that unlike {@link #flatMap(Function)} and {@link #concatMap(Function)}, with Iterable there is * no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. @@ -3123,7 +3123,7 @@ public final Flux concatMapIterable(Function - * + * *

* Note that unlike {@link #flatMap(Function)} and {@link #concatMap(Function)}, with Iterable there is * no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. @@ -3145,7 +3145,7 @@ public final Flux concatMapIterable(Function - * * * @param other the {@link Publisher} sequence to concat after this {@link Flux} @@ -3167,7 +3167,7 @@ public final Flux concatWith(Publisher other) { * The count will be emitted when onComplete is observed. * *

- * + * * * @return a new {@link Mono} of {@link Long} count */ @@ -3178,7 +3178,7 @@ public final Mono count() { /** * Provide a default unique value if this sequence is completed without any data *

- * + * *

* @param defaultV the alternate value if this sequence is empty * @@ -3195,7 +3195,7 @@ public final Flux defaultIfEmpty(T defaultV) { * immediate error signals are not delayed. * *

- * + * * * @param delay duration by which to delay each {@link Subscriber#onNext} signal * @return a delayed {@link Flux} @@ -3211,7 +3211,7 @@ public final Flux delayElements(Duration delay) { * {@link Scheduler}, but empty sequences or immediate error signals are not delayed. * *

- * + * * * @param delay period to delay each {@link Subscriber#onNext} signal * @param timer a time-capable {@link Scheduler} instance to delay each signal on @@ -3284,7 +3284,7 @@ public final Flux delaySequence(Duration delay, Scheduler timer) { * Note that unlike with the {@link Mono#delayUntil(Function) Mono variant} there is * no fusion of subsequent calls. *

- * + * * * @param triggerProvider a {@link Function} that maps each element into a * {@link Publisher} whose termination will trigger relaying the value. @@ -3301,7 +3301,7 @@ public final Flux delayUntil(Function> trig * period elapses. The delay is introduced through the {@link Schedulers#parallel() parallel} default Scheduler. * *

- * + * * * @param delay duration before subscribing this {@link Flux} * @@ -3317,7 +3317,7 @@ public final Flux delaySubscription(Duration delay) { * period elapses, as measured on the user-provided {@link Scheduler}. * *

- * + * * * @param delay {@link Duration} before subscribing this {@link Flux} * @param timer a time-capable {@link Scheduler} instance to run on @@ -3333,7 +3333,7 @@ public final Flux delaySubscription(Duration delay, Scheduler timer) { * source until another {@link Publisher} signals a value or completes. * *

- * + * * * @param subscriptionDelay a companion {@link Publisher} whose onNext/onComplete signal will trigger the {@link Flux#subscribe(Subscriber) subscription} * @param the other source type @@ -3353,7 +3353,7 @@ public final Flux delaySubscription(Publisher subscriptionDelay) { * onComplete. * *

- * + * * * @param the dematerialized type * @@ -3376,7 +3376,7 @@ public final Flux dematerialize() { * elements as distinct due to a hashcode collision. * *

- * + * * * @return a filtering {@link Flux} only emitting distinct values */ @@ -3390,7 +3390,7 @@ public final Flux distinct() { * provided {@link Function}. * *

- * + * * * @param keySelector function to compute comparison key for each element * @param the type of the key extracted from each value in this sequence @@ -3408,7 +3408,7 @@ public final Flux distinct(Function keySelector) * of the {@link Collection} supplied (typically a {@link Set}). * *

- * + * * * @param keySelector function to compute comparison key for each element * @param distinctCollectionSupplier supplier of the {@link Collection} used for distinct @@ -3434,7 +3434,7 @@ public final > Flux distinct( * one another). * *

- * + * *

* The values themselves are recorded into a {@link HashSet} for distinct detection. * Use {@code distinctUntilChanged(Object::hashcode)} if you want a more lightweight approach that @@ -3454,7 +3454,7 @@ public final Flux distinctUntilChanged() { * using equality. * *

- * + * * * @param keySelector function to compute comparison key for each element @@ -3472,7 +3472,7 @@ public final Flux distinctUntilChanged(Function k * after one another), as compared by a key extracted through the user provided {@link * Function} and then comparing keys with the supplied {@link BiPredicate}. *

- * * * @param keySelector function to compute comparison key for each element @@ -3493,7 +3493,7 @@ public final Flux distinctUntilChanged(Function k /** * Add behavior (side-effect) triggered after the {@link Flux} terminates, either by completing downstream successfully or with an error. *

- * + * *

* @param afterTerminate the callback to call after {@link Subscriber#onComplete} or {@link Subscriber#onError} * @@ -3507,7 +3507,7 @@ public final Flux doAfterTerminate(Runnable afterTerminate) { /** * Add behavior (side-effect) triggered when the {@link Flux} is cancelled. *

- * + * *

* @param onCancel the callback to call on {@link Subscription#cancel} * @@ -3521,7 +3521,7 @@ public final Flux doOnCancel(Runnable onCancel) { /** * Add behavior (side-effect) triggered when the {@link Flux} completes successfully. *

- * + * *

* @param onComplete the callback to call on {@link Subscriber#onComplete} * @@ -3556,7 +3556,7 @@ public final Flux doOnEach(Consumer> signalConsumer) { /** * Add behavior (side-effect) triggered when the {@link Flux} completes with an error. *

- * + * *

* @param onError the callback to call on {@link Subscriber#onError} * @@ -3570,7 +3570,7 @@ public final Flux doOnError(Consumer onError) { /** * Add behavior (side-effect) triggered when the {@link Flux} completes with an error matching the given exception type. *

- * + * * * @param exceptionType the type of exceptions to handle * @param onError the error handler for each error @@ -3590,7 +3590,7 @@ public final Flux doOnError(Class exceptionType, /** * Add behavior (side-effect) triggered when the {@link Flux} completes with an error matching the given exception. *

- * + * * * @param predicate the matcher for exceptions to handle * @param onError the error handler for each error @@ -3611,7 +3611,7 @@ public final Flux doOnError(Predicate predicate, /** * Add behavior (side-effect) triggered when the {@link Flux} emits an item. *

- * + * *

* @param onNext the callback to call on {@link Subscriber#onNext} * @@ -3630,7 +3630,7 @@ public final Flux doOnNext(Consumer onNext) { * will simply trigger {@link Operators#onOperatorError(Throwable, Context)}. * *

- * + * * * @param consumer the consumer to invoke on each request * @@ -3644,7 +3644,7 @@ public final Flux doOnRequest(LongConsumer consumer) { /** * Add behavior (side-effect) triggered when the {@link Flux} is subscribed. *

- * + * *

* @param onSubscribe the callback to call on {@link Subscriber#onSubscribe} * @@ -3659,7 +3659,7 @@ public final Flux doOnSubscribe(Consumer onSubscribe) { * Add behavior (side-effect) triggered when the {@link Flux} terminates, either by * completing successfully or with an error. *

- * + * *

* @param onTerminate the callback to call on {@link Subscriber#onComplete} or {@link Subscriber#onError} * @@ -3708,7 +3708,7 @@ public final Flux doFinally(Consumer onFinally) { * First duration is measured between the subscription and the first element. * *

- * + * * * @return a new {@link Flux} that emits a tuple of time elapsed in milliseconds and matching data */ @@ -3722,7 +3722,7 @@ public final Flux> elapsed() { * between each signal as measured by the provided {@link Scheduler}. * First duration is measured between the subscription and the first element. *

- * * * @param scheduler a {@link Scheduler} instance to {@link Scheduler#now(TimeUnit) read time from} @@ -3739,7 +3739,7 @@ public final Flux> elapsed(Scheduler scheduler) { * if the sequence is shorter. * *

- * + * * * @param index zero-based index of the only item to emit * @@ -3754,7 +3754,7 @@ public final Mono elementAt(int index) { * default value if the sequence is shorter. * *

- * + * * * @param index zero-based index of the only item to emit * @param defaultValue a default value to emit if the sequence is shorter @@ -3924,7 +3924,7 @@ public final Flux expand(Function * emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream. * *

- * + * * * @param p the {@link Predicate} to test values against * @@ -3995,7 +3995,7 @@ public final Flux filterWhen(Function * (similar to merging the inner sequences). * *

- * + * *

* @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher} * @param the merged output sequence type @@ -4026,7 +4026,7 @@ public final Flux flatMap(Function - * + * * * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher} * @param concurrency the maximum number of in-flight inner sequences @@ -4059,7 +4059,7 @@ public final Flux flatMap(Function - * + * * * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher} * @param concurrency the maximum number of in-flight inner sequences @@ -4094,7 +4094,7 @@ public final Flux flatMap(Function - * + * * * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher} * @param concurrency the maximum number of in-flight inner sequences @@ -4127,7 +4127,7 @@ public final Flux flatMapDelayError(Function * OnError will be transformed into completion signal after its mapping callback has been applied. *

- * + * *

* @param mapperOnNext the {@link Function} to call on next data and returning a sequence to merge. * Use {@literal null} to ignore (provided at least one other mapper is specified). @@ -4157,7 +4157,7 @@ public final Flux flatMap( * merging them into a single {@link Flux}. * *

- * + * *

* Note that unlike {@link #flatMap(Function)} and {@link #concatMap(Function)}, with Iterable there is * no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. @@ -4179,7 +4179,7 @@ public final Flux flatMapIterable(Function - * + * *

* Note that unlike {@link #flatMap(Function)} and {@link #concatMap(Function)}, with Iterable there is * no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. @@ -4222,7 +4222,7 @@ public final Flux flatMapIterable(Function - * + * * * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher} * @param the merged output sequence type @@ -4262,7 +4262,7 @@ public final Flux flatMapSequential(Function - * + * * * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher} * @param maxConcurrency the maximum number of in-flight inner sequences @@ -4305,7 +4305,7 @@ public final Flux flatMapSequential(Function - * + * * * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher} * @param maxConcurrency the maximum number of in-flight inner sequences @@ -4350,7 +4350,7 @@ public final Flux flatMapSequential(Function - * + * * * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher} * @param maxConcurrency the maximum number of in-flight inner sequences @@ -4379,7 +4379,7 @@ public int getPrefetch() { * function accordingly. * *

- * + * * *

* The groups need to be drained and consumed downstream for groupBy to work correctly. @@ -4403,7 +4403,7 @@ public final Flux> groupBy(Function - * + * * *

* The groups need to be drained and consumed downstream for groupBy to work correctly. @@ -4429,7 +4429,7 @@ public final Flux> groupBy(Function - * + * * *

* The groups need to be drained and consumed downstream for groupBy to work correctly. @@ -4458,7 +4458,7 @@ public final Flux> groupBy(Function - * + * * *

* The groups need to be drained and consumed downstream for groupBy to work correctly. @@ -4496,7 +4496,7 @@ public final Flux> groupBy(Function - * + * * * @param other the other {@link Publisher} to correlate items with * @param leftEnd a function that returns a Publisher whose emissions indicate the @@ -4552,7 +4552,7 @@ public final Flux handle(BiConsumer> handle * an element matches the value. * *

- * + * * * @param value constant compared to incoming signals * @@ -4571,7 +4571,7 @@ public final Mono hasElement(T value) { * The implementation uses short-circuit logic and completes with true on onNext. * *

- * + * * * @return a new {@link Mono} with true if any value is emitted and false * otherwise @@ -4628,7 +4628,7 @@ public final Flux index(BiFunction * Ignores onNext signals (dropping them) and only propagate termination events. * *

- * + * *

* * @return a new empty {@link Mono} representing the completion of this {@link Flux}. @@ -4646,7 +4646,7 @@ public final Mono ignoreElements() { * one or both source Publishers overlap. * *

- * + * * * * @param other the other {@link Publisher} to correlate items with @@ -4681,7 +4681,7 @@ public final Flux join( * For a passive version use {@link #takeLast(int)} * *

- * + * * * @return a {@link Mono} with the last value in this {@link Flux} */ @@ -4700,7 +4700,7 @@ public final Mono last() { * For a passive version use {@link #takeLast(int)} * *

- * + * * @param defaultValue a single fallback item if this {@link Flux} is empty * @return a {@link Mono} with the last value in this {@link Flux} */ @@ -4809,7 +4809,7 @@ public final Flux limitRequest(long requestCap) { * Default will use {@link Level#INFO} and {@code java.util.logging}. * If SLF4J is available, it will be used instead. *

- * + * *

* The default log category will be "reactor.Flux.", followed by a suffix generated from * the source operator, e.g. "reactor.Flux.Map". @@ -4825,7 +4825,7 @@ public final Flux log() { * Default will use {@link Level#INFO} and {@code java.util.logging}. * If SLF4J is available, it will be used instead. *

- * + * *

* @param category to be mapped into logger configuration (e.g. org.springframework * .reactor). If category ends with "." like "reactor.", a generated operator @@ -4847,7 +4847,7 @@ public final Flux log(String category) { *

 	 *     flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
 	 * 

- * + * *

* @param category to be mapped into logger configuration (e.g. org.springframework * .reactor). If category ends with "." like "reactor.", a generated operator @@ -4872,7 +4872,7 @@ public final Flux log(@Nullable String category, Level level, SignalType... o *

 	 *     flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
 	 *
-	 * 
 	 *
 	 * @param category to be mapped into logger configuration (e.g. org.springframework
@@ -4902,7 +4902,7 @@ public final Flux log(@Nullable String category,
 	 * Transform the items emitted by this {@link Flux} by applying a synchronous function
 	 * to each item.
 	 * 

- * + * *

* @param mapper the synchronous transforming {@link Function} * @param the transformed type @@ -4924,7 +4924,7 @@ public final Flux map(Function mapper) { * All these {@link Signal} have a {@link Context} associated to them. * *

- * + * * * @return a {@link Flux} of materialized {@link Signal} * @see #dematerialize() @@ -4938,7 +4938,7 @@ public final Flux> materialize() { * sequence. Unlike {@link #concatWith(Publisher) concat}, inner sources are subscribed * to eagerly. *

- * + * *

* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source @@ -4971,7 +4971,7 @@ public final Flux name(String name) { /** * Emit only the first item emitted by this {@link Flux}, into a new {@link Mono}. *

- * + * *

* * @return a new {@link Mono} emitting the first value in this {@link Flux} @@ -4991,7 +4991,7 @@ public final Mono next() { * the value is ignored and a request of 1 is emitted. * *

- * + * * * @param clazz the {@link Class} type to test values against * @@ -5008,7 +5008,7 @@ public final Flux ofType(final Class clazz) { * delayed until the buffer gets consumed. * *

- * + * * * @return a backpressured {@link Flux} that buffers with unbounded capacity * @@ -5024,7 +5024,7 @@ public final Flux onBackpressureBuffer() { * immediately emitted on overflow regardless of the pending buffer. * *

- * + * * * @param maxSize maximum buffer backlog size before immediate error * @@ -5042,7 +5042,7 @@ public final Flux onBackpressureBuffer(int maxSize) { * {@link Consumer} will be immediately invoked. * *

- * + * * * @param maxSize maximum buffer backlog size before overflow callback is called * @param onOverflow callback to invoke on overflow @@ -5064,7 +5064,7 @@ public final Flux onBackpressureBuffer(int maxSize, Consumer onOve * error will be delayed after the current backlog is consumed. * *

- * + * * * @param maxSize maximum buffer backlog size before overflow strategy is applied * @param bufferOverflowStrategy strategy to apply to overflowing elements @@ -5094,7 +5094,7 @@ public final Flux onBackpressureBuffer(int maxSize, BufferOverflowStrategy bu * invoked immediately. * *

- * + * * * @param maxSize maximum buffer backlog size before overflow callback is called * @param onBufferOverflow callback to invoke on overflow @@ -5122,7 +5122,7 @@ public final Flux onBackpressureBuffer(int maxSize, Consumer onBuf * is also immediately invoked when there is an overflow. * *

- * + * * * @param ttl maximum {@link Duration} for which an element is kept in the backlog * @param maxSize maximum buffer backlog size before overflow callback is called @@ -5145,7 +5145,7 @@ public final Flux onBackpressureBuffer(Duration ttl, int maxSize, Consumer - * + * * * @param ttl maximum {@link Duration} for which an element is kept in the backlog * @param maxSize maximum buffer backlog size before overflow callback is called @@ -5166,7 +5166,7 @@ public final Flux onBackpressureBuffer(Duration ttl, int maxSize, Consumer - * + * * * @return a backpressured {@link Flux} that drops overflowing elements */ @@ -5180,7 +5180,7 @@ public final Flux onBackpressureDrop() { * is requested downstream. * *

- * + * * * @param onDropped the Consumer called when an value gets dropped due to lack of downstream requests * @return a backpressured {@link Flux} that drops overflowing elements @@ -5195,7 +5195,7 @@ public final Flux onBackpressureDrop(Consumer onDropped) { * downstream. * *

- * + * * * @return a backpressured {@link Flux} that errors on overflowing elements */ @@ -5208,7 +5208,7 @@ public final Flux onBackpressureError() { * the most recent observed item if not enough demand is requested downstream. * *

- * + * * * @return a backpressured {@link Flux} that will only keep a reference to the last observed item */ @@ -5220,7 +5220,7 @@ public final Flux onBackpressureLatest() { * Transform any error emitted by this {@link Flux} by synchronously applying a function to it. *

- * *

* @@ -5236,7 +5236,7 @@ public final Flux onErrorMap(Function * Transform an error emitted by this {@link Flux} by synchronously applying a function * to it if the error matches the given type. Otherwise let the error pass through. *

- * + * *

* @param type the class of the exception type to react to * @param mapper the error transforming {@link Function} @@ -5255,7 +5255,7 @@ public final Flux onErrorMap(Class type, * Transform an error emitted by this {@link Flux} by synchronously applying a function * to it if the error matches the given predicate. Otherwise let the error pass through. *

- * * * @param predicate the error predicate @@ -5273,7 +5273,7 @@ public final Flux onErrorMap(Predicate predicate, * choose the fallback depending on the error. * *

- * + * *

* @param fallback the function to choose the fallback to an alternative {@link Publisher} * @@ -5287,7 +5287,7 @@ public final Flux onErrorResume(Function - * * * @param type the error type to match @@ -5309,7 +5309,7 @@ public final Flux onErrorResume(Class type, * Subscribe to a fallback publisher when an error matching a given predicate * occurs. *

- * * * @param predicate the error predicate to match @@ -5326,7 +5326,7 @@ public final Flux onErrorResume(Predicate predicate, /** * Simply emit a captured fallback value when any error is observed on this {@link Flux}. *

- * + * *

* @param fallbackValue the value to emit if an error occurs * @@ -5340,7 +5340,7 @@ public final Flux onErrorReturn(T fallbackValue) { * Simply emit a captured fallback value when an error of the specified type is * observed on this {@link Flux}. *

- * + * * @param type the error type to match * @param fallbackValue the value to emit if an error occurs that matches the type * @param the error type @@ -5356,7 +5356,7 @@ public final Flux onErrorReturn(Class type, * Simply emit a captured fallback value when an error matching the given predicate is * observed on this {@link Flux}. *

- * + * * @param predicate the error predicate to match * @param fallbackValue the value to emit if an error occurs that matches the predicate * @@ -5385,7 +5385,7 @@ public final Flux onTerminateDetach() { * {@link Publisher}, effectively behaving like the fastest of these competing sources. * *

- * + * *

* @param other the {@link Publisher} to race with * @@ -5410,7 +5410,7 @@ public final Flux or(Publisher other) { * work in parallel, you should call {@link ParallelFlux#runOn(Scheduler)} afterward. * *

- * + * * * @return a new {@link ParallelFlux} instance */ @@ -5426,7 +5426,7 @@ public final ParallelFlux parallel() { * afterward. * *

- * + * * * @param parallelism the number of parallel rails * @@ -5444,7 +5444,7 @@ public final ParallelFlux parallel(int parallelism) { * {@link ParallelFlux#runOn(Scheduler)} afterward. * *

- * + * * * @param parallelism the number of parallel rails * @param prefetch the number of values to prefetch from the source @@ -5468,7 +5468,7 @@ public final ParallelFlux parallel(int parallelism, int prefetch) { * {@link Subscriber} is missing demand (requested = 0), multicast will pause * pushing/pulling. *

- * + * * * @return a new {@link ConnectableFlux} */ @@ -5485,7 +5485,7 @@ public final ConnectableFlux publish() { * {@link Subscriber} is missing demand (requested = 0), multicast will pause * pushing/pulling. *

- * + * * * @param prefetch bounded requested demand * @@ -5534,7 +5534,7 @@ public final Flux publish(Function, ? extends Publisher - * + * * * @return a new {@link Mono} */ @@ -5549,7 +5549,7 @@ public final Mono publishNext() { * This operator influences the threading context where the rest of the operators in * the chain below it will execute, up to a new occurrence of {@code publishOn}. *

- * + * *

* Typically used for fast publisher, slow consumer(s) scenarios. *

@@ -5571,7 +5571,7 @@ public final Flux publishOn(Scheduler scheduler) {
 	 * This operator influences the threading context where the rest of the operators in
 	 * the chain below it will execute, up to a new occurrence of {@code publishOn}.
 	 * 

- * + * *

* Typically used for fast publisher, slow consumer(s) scenarios. *

@@ -5594,7 +5594,7 @@ public final Flux publishOn(Scheduler scheduler, int prefetch) {
 	 * This operator influences the threading context where the rest of the operators in
 	 * the chain below it will execute, up to a new occurrence of {@code publishOn}.
 	 * 

- * + * *

* Typically used for fast publisher, slow consumer(s) scenarios. *

@@ -5639,7 +5639,7 @@ final Flux publishOn(Scheduler scheduler, boolean delayError, int prefetch, i
 	 * elements.
 	 *
 	 * 

- * + * * * @param aggregator the reducing {@link BiFunction} * @@ -5662,7 +5662,7 @@ public final Mono reduce(BiFunction aggregator) { * value, {@literal initial}. * *

- * + * * * @param accumulator the reducing {@link BiFunction} * @param initial the seed, the initial leftmost argument to pass to the reducing {@link BiFunction} @@ -5683,7 +5683,7 @@ public final Mono reduce(A initial, BiFunction accumulat * element is paired with the seed value, supplied via {@literal initial}. * *

- * + * * * @param accumulator the reducing {@link BiFunction} * @param initial a {@link Supplier} of the seed, called on subscription and passed to the the reducing {@link BiFunction} @@ -5701,7 +5701,7 @@ public final Mono reduceWith(Supplier initial, BiFunction - * + * * * @return an indefinitely repeated {@link Flux} on onComplete */ @@ -5713,7 +5713,7 @@ public final Flux repeat() { * Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription. * *

- * + * * * @param predicate the boolean to evaluate on onComplete. * @@ -5727,7 +5727,7 @@ public final Flux repeat(BooleanSupplier predicate) { * Repeatedly subscribe to the source {@literal numRepeat} times. * *

- * + * * * @param numRepeat the number of times to re-subscribe on onComplete * @@ -5745,7 +5745,7 @@ public final Flux repeat(long numRepeat) { * subscription. A specified maximum of repeat will limit the number of re-subscribe. * *

- * + * * * @param numRepeat the number of times to re-subscribe on complete * @param predicate the boolean to evaluate on onComplete @@ -5764,7 +5764,7 @@ public final Flux repeat(long numRepeat, BooleanSupplier predicate) { *

If the companion sequence signals when this {@link Flux} is active, the repeat * attempt is suppressed. *

- * + * * * @param repeatFactory the {@link Function} that returns the associated {@link Publisher} * companion, given a {@link Flux} that signals each onComplete as a {@link Long} @@ -5782,7 +5782,7 @@ public final Flux repeatWhen(Function, ? extends Publisher> rep * retain an unbounded amount of onNext signals. Completion and Error will also be * replayed. *

- * * * @return a replaying {@link ConnectableFlux} @@ -5801,7 +5801,7 @@ public final ConnectableFlux replay() { * expiration. * *

- * + * * * @param history number of events retained in history excluding complete and * error @@ -5822,7 +5822,7 @@ public final ConnectableFlux replay(int history) { * the next {@link Subscriber} will start over a new subscription * *

- * * * @param ttl Per-item and post termination timeout duration @@ -5842,7 +5842,7 @@ public final ConnectableFlux replay(Duration ttl) { * the next {@link Subscriber} will start over a new subscription * *

- * * * @param history number of events retained in history excluding complete and error @@ -5862,7 +5862,7 @@ public final ConnectableFlux replay(int history, Duration ttl) { * Completion and Error will also be replayed until {@code ttl} triggers in which case * the next {@link Subscriber} will start over a new subscription *

- * * * @param ttl Per-item and post termination timeout duration @@ -5882,7 +5882,7 @@ public final ConnectableFlux replay(Duration ttl, Scheduler timer) { * Completion and Error will also be replayed until {@code ttl} triggers in which case * the next {@link Subscriber} will start over a new subscription *

- * * * @param history number of events retained in history excluding complete and error @@ -5899,7 +5899,7 @@ public final ConnectableFlux replay(int history, Duration ttl, Scheduler time /** * Re-subscribes to this {@link Flux} sequence if it signals any error, indefinitely. *

- * + * * * @return a {@link Flux} that retries on onError */ @@ -5913,7 +5913,7 @@ public final Flux retry() { *

* Note that passing {@literal Long.MAX_VALUE} is treated as infinite retry. *

- * + * * * @param numRetries the number of times to tolerate an error * @@ -5929,7 +5929,7 @@ public final Flux retry(long numRetries) { * that matches the given {@link Predicate}, otherwise push the error downstream. * *

- * + * * * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal * @@ -5944,7 +5944,7 @@ public final Flux retry(Predicate retryMatcher) { * error that match the given {@link Predicate}, otherwise push the error downstream. * *

- * + * * * @param numRetries the number of times to tolerate an error * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal @@ -5964,7 +5964,7 @@ public final Flux retry(long numRetries, Predicate retryMa * immediately. * *

- * + * * * @param whenFactory the {@link Function} that returns the associated {@link Publisher} * companion, given a {@link Flux} that signals each onError as a {@link Throwable}. @@ -5984,7 +5984,7 @@ public final Flux retryWhen(Function, ? extends Publisher> * signal. * *

- * + * * * @param timespan the duration of the window after which to emit the latest observed item * @@ -6008,7 +6008,7 @@ public final Flux sample(Duration timespan) { * would interfere with the sampling precision. * *

- * + * * * @param sampler the sampler companion {@link Publisher} * @@ -6025,7 +6025,7 @@ public final Flux sample(Publisher sampler) { * within a given duration. * *

- * + * * * @param timespan the duration during which to skip values after each sample * @@ -6040,7 +6040,7 @@ public final Flux sampleFirst(Duration timespan) { * before the next signal from a companion sampler {@link Publisher}. * *

- * + * * * @param samplerFactory supply a companion sampler {@link Publisher} which signals the end of the skip window * @param the companion reified type @@ -6059,7 +6059,7 @@ public final Flux sampleFirst(Function> * Note that this means that the last value in the sequence is always emitted. * *

- * + * * * @param throttlerFactory supply a companion sampler {@link Publisher} which signals * the end of the window during which no new emission should occur. If it is the case, @@ -6083,7 +6083,7 @@ public final Flux sampleTimeout(Function - * + * * * @param throttlerFactory supply a companion sampler {@link Publisher} which signals * the end of the window during which no new emission should occur. If it is the case, @@ -6117,7 +6117,7 @@ public final Flux sampleTimeout(Function

* *

- * + * * * @param accumulator the accumulating {@link BiFunction} * @@ -6141,7 +6141,7 @@ public final Flux scan(BiFunction accumulator) { *

* *

- * + * * * @param initial the initial leftmost argument to pass to the reduce function * @param accumulator the accumulating {@link BiFunction} @@ -6170,7 +6170,7 @@ public final Flux scan(A initial, BiFunction accumulator *

* *

- * + * * * @param initial the supplier providing the seed, the leftmost parameter initially * passed to the reduce function @@ -6208,7 +6208,7 @@ public final Flux share() { * {@link IndexOutOfBoundsException} for a source with more than one element. * *

- * + * * * @return a {@link Mono} with the single item or an error signal */ @@ -6243,7 +6243,7 @@ public final Mono single() { * source with more than one element. * *

- * + * * @param defaultValue a single fallback item if this {@link Flux} is empty * * @return a {@link Mono} with the expected single item, the supplied default value or @@ -6279,7 +6279,7 @@ public final Mono single(T defaultValue) { * source but signal an {@link IndexOutOfBoundsException} for a source with more than * one element. *

- * + * * * @return a {@link Mono} with the expected single item, no item or an error */ @@ -6297,7 +6297,7 @@ public final Mono singleOrEmpty() { * emit the remaining elements. * *

- * + * * * @param skipped the number of elements to drop * @@ -6317,7 +6317,7 @@ public final Flux skip(long skipped) { * Skip elements from this {@link Flux} emitted within the specified initial duration. * *

- * + * * * @param timespan the initial time window during which to drop elements * @@ -6332,7 +6332,7 @@ public final Flux skip(Duration timespan) { * as measured on the provided {@link Scheduler}. * *

- * + * * * @param timespan the initial time window during which to drop elements * @param timer a time-capable {@link Scheduler} instance to measure the time window on @@ -6352,7 +6352,7 @@ public final Flux skip(Duration timespan, Scheduler timer) { * Skip a specified number of elements at the end of this {@link Flux} sequence. * *

- * + * * * @param n the number of elements to drop before completion * @@ -6372,7 +6372,7 @@ public final Flux skipLast(int n) { * value. The resulting {@link Flux} will include and emit the matching value. * *

- * + * * * @param untilPredicate the {@link Predicate} evaluated to stop skipping. * @@ -6387,7 +6387,7 @@ public final Flux skipUntil(Predicate untilPredicate) { * an onNext or onComplete. * *

- * + * * * @param other the companion {@link Publisher} to coordinate with to stop skipping * @@ -6402,7 +6402,7 @@ public final Flux skipUntilOther(Publisher other) { * Skips values from this {@link Flux} while a {@link Predicate} returns true for the value. * *

- * + * * * @param skipPredicate the {@link Predicate} that causes skipping while evaluating to true. * @@ -6449,7 +6449,7 @@ public final Flux sort(Comparator sortFunction) { * Prepend the given {@link Iterable} before this {@link Flux} sequence. * *

- * + * * * @param iterable the sequence of values to start the resulting {@link Flux} with * @@ -6463,7 +6463,7 @@ public final Flux startWith(Iterable iterable) { * Prepend the given values before this {@link Flux} sequence. * *

- * + * * * @param values the array of values to start the resulting {@link Flux} with * @@ -6478,7 +6478,7 @@ public final Flux startWith(T... values) { * Prepend the given {@link Publisher} sequence to this {@link Flux} sequence. * *

- * + * * * @param publisher the Publisher whose values to prepend * @@ -6499,7 +6499,7 @@ public final Flux startWith(Publisher publisher) { * chain, especially no error handling, so other variants should usually be preferred. * *

- * + * *

* * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription} @@ -6522,7 +6522,7 @@ public final Disposable subscribe() { * not invoked when executing in a main thread or a unit test for instance. * *

- * + * * * @param consumer the consumer to invoke on each value (onNext signal) * @@ -6548,7 +6548,7 @@ public final Disposable subscribe(Consumer consumer) { * not invoked when executing in a main thread or a unit test for instance. * *

- * + * * * @param consumer the consumer to invoke on each next signal * @param errorConsumer the consumer to invoke on error signal @@ -6575,7 +6575,7 @@ public final Disposable subscribe(@Nullable Consumer consumer, Consum * not invoked when executing in a main thread or a unit test for instance. * *

- * + * * * @param consumer the consumer to invoke on each value * @param errorConsumer the consumer to invoke on error signal @@ -6608,7 +6608,7 @@ public final Disposable subscribe( * not invoked when executing in a main thread or a unit test for instance. * *

- * + * * * @param consumer the consumer to invoke on each value * @param errorConsumer the consumer to invoke on error signal @@ -6705,7 +6705,7 @@ public final Flux subscriberContext(Function doOnContext) { * In such case, you should call {@link #subscribeOn(Scheduler, boolean) subscribeOn(scheduler, false)} * instead. *

- * + * *

* Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios. * @@ -6742,7 +6742,7 @@ public final Flux subscribeOn(Scheduler scheduler) { * Thus this operator has a {@code requestOnSeparateThread} parameter, which should be * set to {@code false} in this case. *

- * + * *

* Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios. * @@ -6804,7 +6804,7 @@ public final > E subscribeWith(E subscriber) { /** * Switch to an alternative {@link Publisher} if this sequence is completed without any data. *

- * + * *

* @param alternate the alternative {@link Publisher} if this sequence is empty * @@ -6820,7 +6820,7 @@ public final Flux switchIfEmpty(Publisher alternate) { * Publisher are emitted in the resulting {@link Flux}. * *

- * + * * * @param fn the {@link Function} to generate a {@link Publisher} for each source value * @param the type of the return value of the transformation function @@ -6839,7 +6839,7 @@ public final Flux switchMap(Function> f * Publisher are emitted in the resulting {@link Flux}. * *

- * + * * * @param fn the {@link Function} to generate a {@link Publisher} for each source value * @param prefetch the produced demand for inner sources @@ -6870,7 +6870,7 @@ public final Flux tag(String key, String value) { /** * Take only the first N values from this {@link Flux}, if available. *

- * + * *

* If N is zero, the resulting {@link Flux} completes as soon as this {@link Flux} * signals its first value (which is not not relayed, though). @@ -6883,7 +6883,7 @@ public final Flux tag(String key, String value) { * using {@link #limitRequest(long)} instead. * *

- * + * * @param n the number of items to emit from this {@link Flux} * * @return a {@link Flux} limited to size N @@ -6903,7 +6903,7 @@ public final Flux take(long n) { * signals its first value (which is not not relayed, though). * *

- * + * * * @param timespan the {@link Duration} of the time window during which to emit elements * from this {@link Flux} @@ -6922,7 +6922,7 @@ public final Flux take(Duration timespan) { * signals its first value (which is not not relayed, though). * *

- * + * * * @param timespan the {@link Duration} of the time window during which to emit elements * from this {@link Flux} @@ -6943,7 +6943,7 @@ public final Flux take(Duration timespan, Scheduler timer) { * Emit the last N values this {@link Flux} emitted before its completion. * *

- * + * * * @param n the number of items from this {@link Flux} to retain and emit on onComplete * @@ -6962,7 +6962,7 @@ public final Flux takeLast(int n) { * This includes the matching data (unlike {@link #takeWhile}). * *

- * + * * * @param predicate the {@link Predicate} that stops the taking of values from this {@link Flux} * when returning {@literal true}. @@ -6978,7 +6978,7 @@ public final Flux takeUntil(Predicate predicate) { * Relay values from this {@link Flux} until the given {@link Publisher} emits. * *

- * + * * * @param other the companion {@link Publisher} that signals when to stop taking values from this {@link Flux} * @@ -6995,7 +6995,7 @@ public final Flux takeUntilOther(Publisher other) { * This only includes the matching data (unlike {@link #takeUntil}). * *

- * + * * * @param continuePredicate the {@link Predicate} invoked each onNext returning {@literal TRUE} * to relay a value or {@literal FALSE} to terminate @@ -7010,7 +7010,7 @@ public final Flux takeWhile(Predicate continuePredicate) { * Return a {@code Mono} that completes when this {@link Flux} completes. * This will actively ignore the sequence and only replay completion or error signals. *

- * + * *

* @return a new {@link Mono} representing the termination of this {@link Flux} */ @@ -7028,7 +7028,7 @@ public final Mono then() { * replayed in the resulting {@code Mono}. * *

- * + * * * @param other a {@link Mono} to emit from after termination * @param the element type of the supplied Mono @@ -7044,7 +7044,7 @@ public final Mono then(Mono other) { * for a supplied {@link Publisher Publisher<Void>} to also complete. The * second completion signal is replayed, or any error signal that occurs instead. *

- * * * @param other a {@link Publisher} to wait for after this Flux's termination @@ -7061,7 +7061,7 @@ public final Mono thenEmpty(Publisher other) { * In other words ignore element from this flux and transform the completion signal into a * {@code Publisher} that will emit elements from the provided {@link Publisher}. *

- * + * * * @param other a {@link Publisher} to emit from after termination * @param the element type of the supplied Publisher @@ -7086,7 +7086,7 @@ public final Flux thenMany(Publisher other) { * given {@link Duration} from the previous emission (or the subscription for the first item). * *

- * + * * * @param timeout the timeout between two signals from this {@link Flux} * @@ -7103,7 +7103,7 @@ public final Flux timeout(Duration timeout) { * If the given {@link Publisher} is null, signal a {@link TimeoutException} instead. * *

- * + * * * @param timeout the timeout between two signals from this {@link Flux} * @param fallback the fallback {@link Publisher} to subscribe when a timeout occurs @@ -7120,7 +7120,7 @@ public final Flux timeout(Duration timeout, @Nullable Publisher * item), as measured by the specified {@link Scheduler}. * *

- * + * * * @param timeout the timeout {@link Duration} between two signals from this {@link Flux} * @param timer a time-capable {@link Scheduler} instance to run on @@ -7139,7 +7139,7 @@ public final Flux timeout(Duration timeout, Scheduler timer) { * If the given {@link Publisher} is null, signal a {@link TimeoutException} instead. * *

- * + * * * @param timeout the timeout {@link Duration} between two signals from this {@link Flux} * @param fallback the fallback {@link Publisher} to subscribe when a timeout occurs @@ -7164,7 +7164,7 @@ public final Flux timeout(Duration timeout, * not been emitted before the given {@link Publisher} emits. * *

- * + * * * @param firstTimeout the companion {@link Publisher} that will trigger a timeout if * emitting before the first signal from this {@link Flux} @@ -7186,7 +7186,7 @@ public final Flux timeout(Publisher firstTimeout) { * the latest element signals. * *

- * + * * * @param firstTimeout the timeout {@link Publisher} that must not emit before the first signal from this {@link Flux} * @param nextTimeoutFactory the timeout {@link Publisher} factory for each next item @@ -7215,7 +7215,7 @@ private final Flux timeout(Publisher firstTimeout, * the latest element signals. * *

- * + * * * @param firstTimeout the timeout {@link Publisher} that must not emit before the first signal from this {@link Flux} * @param nextTimeoutFactory the timeout {@link Publisher} factory for each next item @@ -7240,7 +7240,7 @@ public final Flux timeout(Publisher firstTimeout, * Scheduler) and T2 the emitted data (as a {@code T}), for each item from this {@link Flux}. * *

- * + * * * @return a timestamped {@link Flux} */ @@ -7254,7 +7254,7 @@ public final Flux> timestamp() { * the emitted data (as a {@code T}), for each item from this {@link Flux}. * *

- * + * * * @param scheduler the {@link Scheduler} to read time from * @return a timestamped {@link Flux} @@ -7269,7 +7269,7 @@ public final Flux> timestamp(Scheduler scheduler) { * {@link Iterator#next()} calls. * *

- * + * *

* * @return a blocking {@link Iterable} @@ -7286,7 +7286,7 @@ public final Iterable toIterable() { * {@code Integer.MAX_VALUE} for unbounded demand * *

- * + * *

* * @return a blocking {@link Iterable} @@ -7300,7 +7300,7 @@ public final Iterable toIterable(int batchSize) { * {@link Iterator#next()} calls. * *

- * + * * * @param batchSize the bounded capacity to prefetch from this {@link Flux} or * {@code Integer.MAX_VALUE} for unbounded demand @@ -7326,7 +7326,7 @@ public final Iterable toIterable(int batchSize, @Nullable Supplier> * {@link Subscriber#onNext(Object) onNext} call. * *

- * + * * * @return a {@link Stream} of unknown size with onClose attached to {@link Subscription#cancel()} */ @@ -7341,7 +7341,7 @@ public final Stream toStream() { * @param batchSize the bounded capacity to prefetch from this {@link Flux} or * {@code Integer.MAX_VALUE} for unbounded demand *

- * + * * * @return a {@link Stream} of unknown size with onClose attached to {@link Subscription#cancel()} */ @@ -7380,7 +7380,7 @@ public final Flux transform(Function, ? extends Publisher * Each {@link Flux} window will onComplete after {@code maxSize} items have been routed. * *

- * + * * * @param maxSize the maximum number of items to emit in the window before closing it * @@ -7397,15 +7397,15 @@ public final Flux> window(int maxSize) { *

* When maxSize < skip : dropping windows *

- * + * *

* When maxSize > skip : overlapping windows *

- * + * *

* When maxSize == skip : exact windows *

- * + * * * @param maxSize the maximum number of items to emit in the window before closing it * @param skip the number of items to count before opening and emitting a new window @@ -7425,7 +7425,7 @@ public final Flux> window(int maxSize, int skip) { * where the window boundary is signalled by another {@link Publisher} * *

- * + * * * @param boundary a {@link Publisher} to emit any item for a split signal and complete to terminate * @@ -7444,7 +7444,7 @@ public final Flux> window(Publisher boundary) { * Scheduler). * *

- * + * * * @param timespan the {@link Duration} to delimit {@link Flux} windows * @@ -7464,15 +7464,15 @@ public final Flux> window(Duration timespan) { *

* When timespan < timeshift : dropping windows *

- * + * *

* When timespan > timeshift : overlapping windows *

- * + * *

* When timespan == timeshift : exact windows *

- * + * * * @param timespan the maximum {@link Flux} window {@link Duration} * @param timeshift the period of time at which to create new {@link Flux} windows @@ -7490,7 +7490,7 @@ public final Flux> window(Duration timespan, Duration timeshift) { * for a {@code timespan} {@link Duration} (as measured on the provided {@link Scheduler}). * *

- * + * * * @param timespan the {@link Duration} to delimit {@link Flux} windows * @param timer a time-capable {@link Scheduler} instance to run on @@ -7511,15 +7511,15 @@ public final Flux> window(Duration timespan, Scheduler timer) { *

* When timespan < timeshift : dropping windows *

- * + * *

* When timespan > timeshift : overlapping windows *

- * + * *

* When timeshift == timeshift : exact windows *

- * + * * * @param timespan the maximum {@link Flux} window {@link Duration} * @param timeshift the period of time at which to create new {@link Flux} windows @@ -7543,7 +7543,7 @@ public final Flux> window(Duration timespan, Duration timeshift, Schedul * Scheduler). * *

- * + * * * @param maxSize the maximum number of items to emit in the window before closing it * @param timespan the maximum {@link Duration} since the window was opened before closing it @@ -7562,7 +7562,7 @@ public final Flux> windowTimeout(int maxSize, Duration timespan) { * {@link Scheduler}). * *

- * + * * * @param maxSize the maximum number of items to emit in the window before closing it * @param timespan the maximum {@link Duration} since the window was opened before closing it @@ -7580,7 +7580,7 @@ public final Flux> windowTimeout(int maxSize, Duration timespan, Schedul * point the previous window will receive the triggering element then onComplete. * *

- * + * * * @param boundaryTrigger a predicate that triggers the next window when it becomes true. * @return a {@link Flux} of {@link Flux} windows, bounded depending @@ -7599,12 +7599,12 @@ public final Flux> windowUntil(Predicate boundaryTrigger) { * sometimes emitted, eg. if the first element in the sequence immediately matches the * predicate. *

- * + * *

* Otherwise, the triggering element will be emitted in the old window before it does * onComplete, similar to {@link #windowUntil(Predicate)}. *

- * + * * * @param boundaryTrigger a predicate that triggers the next window when it becomes true. * @param cutBefore push to true to include the triggering element in the new window rather than the old. @@ -7625,12 +7625,12 @@ public final Flux> windowUntil(Predicate boundaryTrigger, boolean cut * sometimes emitted, eg. if the first element in the sequence immediately matches the * predicate. *

- * + * *

* Otherwise, the triggering element will be emitted in the old window before it does * onComplete, similar to {@link #windowUntil(Predicate)}. *

- * + * * * @param boundaryTrigger a predicate that triggers the next window when it becomes true. * @param cutBefore push to true to include the triggering element in the new window rather than the old. @@ -7656,7 +7656,7 @@ public final Flux> windowUntil(Predicate boundaryTrigger, boolean cut * separators anywhere in the sequence, each occurrence will lead to an empty window. * *

- * + * * * @param inclusionPredicate a predicate that triggers the next window when it becomes false. * @return a {@link Flux} of {@link Flux} windows, each containing @@ -7675,7 +7675,7 @@ public final Flux> windowWhile(Predicate inclusionPredicate) { * separators anywhere in the sequence, each occurrence will lead to an empty window. * *

- * + * * * @param inclusionPredicate a predicate that triggers the next window when it becomes false. * @param prefetch the request size to use for this {@link Flux}. @@ -7698,15 +7698,15 @@ public final Flux> windowWhile(Predicate inclusionPredicate, int pref *

* When Open signal is strictly not overlapping Close signal : dropping windows *

- * + * *

* When Open signal is strictly more frequent than Close signal : overlapping windows *

- * + * *

* When Open signal is exactly coordinated with Close signal : exact windows *

- * + * * * @param bucketOpening a {@link Publisher} that opens a new window when it emits any item * @param closeSelector a {@link Function} given an opening signal and returning a {@link Publisher} that @@ -7736,7 +7736,7 @@ public final Flux> windowWhen(Publisher bucketOpening, * If the other {@link Publisher} completes without any value, the sequence is completed. * *

- * + * * * @param other the {@link Publisher} to combine with * @param resultSelector the bi-function called with each pair of source and other @@ -7759,7 +7759,7 @@ public final Flux withLatestFrom(Publisher other, BiFunct * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param source2 The second source {@link Publisher} to zip with this {@link Flux}. * @param type of the value from source2 @@ -7779,7 +7779,7 @@ public final Flux> zipWith(Publisher source2) { * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param source2 The second source {@link Publisher} to zip with this {@link Flux}. * @param combinator The aggregate function that will receive a unique value from each @@ -7810,7 +7810,7 @@ public final Flux zipWith(Publisher source2, * Errors will immediately be forwarded. * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. *

- * + * *

* @param source2 The second source {@link Publisher} to zip with this {@link Flux}. * @param prefetch the request size to use for this {@link Flux} and the other {@link Publisher} @@ -7839,7 +7839,7 @@ public final Flux zipWith(Publisher source2, * This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. * *

- * + * *

* @param source2 The second source {@link Publisher} to zip with this {@link Flux}. * @param prefetch the request size to use for this {@link Flux} and the other {@link Publisher} @@ -7856,7 +7856,7 @@ public final Flux> zipWith(Publisher source2, i * to say combine one element from each, pairwise, into a {@link Tuple2}. * *

- * + * * * @param iterable the {@link Iterable} to zip with * @param the value type of the other iterable sequence @@ -7875,7 +7875,7 @@ public final Flux> zipWithIterable(Iterable ite * * *

- * + * * * @param iterable the {@link Iterable} to zip with * @param zipper the {@link BiFunction} pair combinator diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/FluxProcessor.java index fabf4e5782..74279297fe 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxProcessor.java @@ -48,7 +48,7 @@ public abstract class FluxProcessor extends Flux * completed. * *

- * + * * * @param the produced type * @return a {@link FluxProcessor} accepting publishers and producing T diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index e5cc716716..a295007dd6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -64,7 +64,7 @@ * with an error. * *

- * + * *

* *

The rx operators will offer aliases for input {@link Mono} type to preserve the "at most one" @@ -177,7 +177,7 @@ public static Mono create(Consumer> callback) { * each {@link Subscriber} downstream. * *

- * + * *

* @param supplier a {@link Mono} factory * @@ -196,7 +196,7 @@ public static Mono defer(Supplier> supplier) * The delay is introduced through the {@link Schedulers#parallel() parallel} default Scheduler. * *

- * + * *

* @param duration the duration of the delay * @@ -212,7 +212,7 @@ public static Mono delay(Duration duration) { * If the demand cannot be produced in time, an onError will be signalled instead. * *

- * + * *

* @param duration the {@link Duration} of the delay * @param timer a time-capable {@link Scheduler} instance to run on @@ -227,7 +227,7 @@ public static Mono delay(Duration duration, Scheduler timer) { * Create a {@link Mono} that completes without emitting any item. * *

- * + * *

* @param the reified {@link Subscriber} type * @@ -241,7 +241,7 @@ public static Mono empty() { * Create a {@link Mono} that terminates with the specified error immediately after * being subscribed to. *

- * + * *

* @param error the onError signal * @param the reified {@link Subscriber} type @@ -257,7 +257,7 @@ public static Mono error(Throwable error) { * and replay that signal, effectively behaving like the fastest of these competing * sources. *

- * + * *

* @param monos The deferred monos to use. * @param The type of the function result. @@ -273,7 +273,7 @@ public static Mono first(Mono... monos) { * Pick the first available result coming from any of the given monos and populate a new {@literal Mono}. * *

- * + * *

* @param monos The monos to use. * @param The type of the function result. @@ -288,7 +288,7 @@ public static Mono first(Iterable> monos) { * Expose the specified {@link Publisher} with the {@link Mono} API, and ensure it will emit 0 or 1 item. * The source emitter will be cancelled on the first `onNext`. *

- * + * *

* @param source the {@link Publisher} source * @param the source type @@ -314,7 +314,7 @@ public static Mono from(Publisher source) { * the Callable resolves to {@code null}, the resulting Mono completes empty. * *

- * + * *

* @param supplier {@link Callable} that will produce the value * @param type of the expected value @@ -329,7 +329,7 @@ public static Mono fromCallable(Callable supplier) { * Create a {@link Mono}, producing its value using the provided {@link CompletionStage}. * *

- * + * *

* @param completionStage {@link CompletionStage} that will produce a value (or a null to * complete immediately) @@ -376,7 +376,7 @@ public static Mono fromDirect(Publisher source){ * Create a {@link Mono}, producing its value using the provided {@link CompletableFuture}. * *

- * + * *

* @param future {@link CompletableFuture} that will produce a value (or a null to * complete immediately) @@ -393,7 +393,7 @@ public static Mono fromFuture(CompletableFuture future) { * been executed. * *

- * + * *

* @param runnable {@link Runnable} that will be executed before emitting the completion signal * @@ -409,7 +409,7 @@ public static Mono fromRunnable(Runnable runnable) { * the Supplier resolves to {@code null}, the resulting Mono completes empty. * *

- * + * *

* @param supplier {@link Supplier} that will produce the value * @param type of the expected value @@ -426,7 +426,7 @@ public static Mono fromSupplier(Supplier supplier) { * but completes when the source completes. * *

- * + * *

* @param source the {@link Publisher} to ignore * @param the source type of the ignored data @@ -442,7 +442,7 @@ public static Mono ignoreElements(Publisher source) { * instantiation time. * *

- * + * *

* @param data the only item to onNext * @param the type of the produced item @@ -458,7 +458,7 @@ public static Mono just(T data) { * onComplete. * *

- * + * *

* @param data the {@link Optional} item to onNext or onComplete if not present * @param the type of the produced item @@ -474,7 +474,7 @@ public static Mono justOrEmpty(@Nullable Optional data) { * onComplete. * *

- * + * *

* @param data the item to onNext or onComplete if null * @param the type of the produced item @@ -490,7 +490,7 @@ public static Mono justOrEmpty(@Nullable T data) { * Return a {@link Mono} that will never signal any data, error or completion signal, * essentially running indefinitely. *

- * + * *

* @param the {@link Subscriber} type target * @@ -567,7 +567,7 @@ public static Mono sequenceEqual(Publisher source1, * {@link Context#empty() empty Context}. * *

- * + * *

* * @return a new {@link Mono} emitting current context @@ -585,7 +585,7 @@ public static Mono subscriberContext() { *

  • Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup * Consumer may override the terminal event.
  • Non-eager cleanup will drop any exception.
*

- * * * @param resourceSupplier a {@link Callable} that is called on subscribe to create the resource @@ -613,7 +613,7 @@ public static Mono using(Callable resourceSupplier, * Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer * may override the terminal event. *

- * * * @param resourceSupplier a {@link Callable} that is called on subscribe to create the resource @@ -635,7 +635,7 @@ public static Mono using(Callable resourceSupplier, * when all of the given {@literal sources} have been fulfilled. An error will cause * pending results to be cancelled and immediate error emission to the returned {@link Mono}. *

- * + * *

* @param sources The sources to use. * @@ -659,7 +659,7 @@ public static Mono when(Publisher... sources) { * to the returned {@link Mono}. * *

- * + * *

* * @param sources The sources to use. @@ -678,7 +678,7 @@ public static Mono when(final Iterable> sources) { * If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception). * *

- * + * *

* * @param sources The sources to use. @@ -695,7 +695,7 @@ public static Mono whenDelayError(final Iterable> s * If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception). * *

- * + * *

* @param sources The sources to use. * @@ -718,7 +718,7 @@ public static Mono whenDelayError(Publisher... sources) { * returned {@link Mono}. * *

- * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -738,7 +738,7 @@ public static Mono> zip(Mono p1, Mono - * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -762,7 +762,7 @@ public static Mono zip(Mono p1, Mono - * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -785,7 +785,7 @@ public static Mono> zip(Mono p1, M * returned {@link Mono}. * *

- * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -813,7 +813,7 @@ public static Mono> zip(Mono - * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -844,7 +844,7 @@ public static Mono> zip(Mono - * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -877,7 +877,7 @@ public static Mono> zip( * If any Mono terminates without value, the returned sequence will be terminated immediately and pending results cancelled. * *

- * + * *

* * @param monos The monos to use. @@ -897,7 +897,7 @@ public static Mono zip(final Iterable> monos, Function< * An error will cause pending results to be cancelled and immediate error emission to the * returned {@link Mono}. *

- * + * *

* @param monos The monos to use. * @param combinator the function to transform the combined array into an arbitrary @@ -922,7 +922,7 @@ public static Mono zip(Function combinator * If both Monos error, the two exceptions are combined (as suppressed exceptions on a root exception). * *

- * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -942,7 +942,7 @@ public static Mono> zipDelayError(Mono p1, * If several Monos error, the two exceptions are combined (as suppressed exceptions on a root exception). * *

- * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -964,7 +964,7 @@ public static Mono> zipDelayError(Mono - * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -990,7 +990,7 @@ public static Mono> zipDelayError(Mono - * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -1020,7 +1020,7 @@ public static Mono> zipDelayErro * If several Monos error, the exceptions are combined (as suppressed exceptions on a root exception). * *

- * + * *

* @param p1 The first upstream {@link Publisher} to subscribe to. * @param p2 The second upstream {@link Publisher} to subscribe to. @@ -1054,7 +1054,7 @@ public static Mono> zipD * If several Monos error, the exceptions are combined (as suppressed exceptions on a root exception). * *

- * + * *

* * @param monos The monos to use. @@ -1075,7 +1075,7 @@ public static Mono zipDelayError(final Iterable> monos, * If several Monos error, the exceptions are combined (as suppressed exceptions on a root exception). * *

- * + * *

* @param monos The monos to use. * @param combinator the function to transform the combined array into an arbitrary @@ -1122,7 +1122,7 @@ public final

P as(Function, P> transformer) { * void mono * *

- * + * *

* @param other the {@link Publisher} to wait for * complete @@ -1148,7 +1148,7 @@ public final Mono and(Publisher other) { * it was a checked exception). * *

- * + * *

* Note that each block() will trigger a new subscription: in other words, the result * might miss signal from hot publishers. @@ -1170,7 +1170,7 @@ public T block() { * If the provided timeout expires,a {@link RuntimeException} is thrown. * *

- * + * *

* Note that each block() will trigger a new subscription: in other words, the result * might miss signal from hot publishers. @@ -1194,7 +1194,7 @@ public T block(Duration timeout) { * {@link RuntimeException} if it was a checked exception). * *

- * + * *

* Note that each blockOptional() will trigger a new subscription: in other words, the result * might miss signal from hot publishers. @@ -1217,7 +1217,7 @@ public Optional blockOptional() { * If the provided timeout expires, a {@link RuntimeException} is thrown. * *

- * + * *

* Note that each block() will trigger a new subscription: in other words, the result * might miss signal from hot publishers. @@ -1236,7 +1236,7 @@ public Optional blockOptional(Duration timeout) { * Cast the current {@link Mono} produced type into a target produced type. * *

- * + * * * @param the {@link Mono} output type * @param clazz the target type to cast to @@ -1252,7 +1252,7 @@ public final Mono cast(Class clazz) { * Turn this {@link Mono} into a hot source and cache last emitted signals for further {@link Subscriber}. * Completion and Error will also be replayed. *

- * * * @return a replaying {@link Mono} @@ -1268,7 +1268,7 @@ public final Mono cache() { * Completion and Error will also be replayed until {@code ttl} triggers in which case * the next {@link Subscriber} will start over a new subscription. *

- * * * @return a replaying {@link Mono} @@ -1379,7 +1379,7 @@ public final Mono compose(Function, ? extends Publisher - * + * * * @param other the {@link Publisher} sequence to concat after this {@link Flux} * @@ -1393,7 +1393,7 @@ public final Flux concatWith(Publisher other) { * Provide a default single value if this mono is completed without any data * *

- * + * *

* @param defaultV the alternate value if this sequence is empty * @@ -1422,7 +1422,7 @@ public final Mono defaultIfEmpty(T defaultV) { * duration. Empty Monos or error signals are not delayed. * *

- * + * * *

* Note that the scheduler on which the Mono chain continues execution will be the @@ -1441,7 +1441,7 @@ public final Mono delayElement(Duration delay) { * {@link Duration}, on a particular {@link Scheduler}. Empty monos or error signals are not delayed. * *

- * + * * *

* Note that the scheduler on which the mono chain continues execution will be the @@ -1469,7 +1469,7 @@ public final Mono delayElement(Duration delay, Scheduler timer) { * completes. Error is propagated immediately * downstream. In both cases, an error in the source is immediately propagated. *

- * + * * * @param triggerProvider a {@link Function} that maps this Mono's value into a * {@link Publisher} whose termination will trigger relaying the value. @@ -1489,7 +1489,7 @@ public final Mono delayUntil(Function> trig * period elapses. * *

- * + * * * @param delay duration before subscribing this {@link Mono} * @@ -1505,7 +1505,7 @@ public final Mono delaySubscription(Duration delay) { * {@link Duration} elapses. * *

- * + * * * @param delay {@link Duration} before subscribing this {@link Mono} * @param timer a time-capable {@link Scheduler} instance to run on @@ -1522,7 +1522,7 @@ public final Mono delaySubscription(Duration delay, Scheduler timer) { * signals a value or completes. * *

- * + * * * @param subscriptionDelay a * {@link Publisher} to signal by next or complete this {@link Mono#subscribe(Subscriber)} @@ -1543,7 +1543,7 @@ public final Mono delaySubscription(Publisher subscriptionDelay) { * onComplete. * *

- * + * * @param the dematerialized type * * @return a dematerialized {@link Mono} @@ -1565,7 +1565,7 @@ public final Mono dematerialize() { * * *

- * + * *

* @param afterSuccessOrError the callback to call after {@link Subscriber#onNext}, {@link Subscriber#onComplete} without preceding {@link Subscriber#onNext} or {@link Subscriber#onError} * @@ -1579,7 +1579,7 @@ public final Mono doAfterSuccessOrError(BiConsumer afte * Add behavior (side-effect) triggered after the {@link Mono} terminates, either by * completing downstream successfully or with an error. *

- * + * *

* @param afterTerminate the callback to call after {@link Subscriber#onComplete} or {@link Subscriber#onError} * @@ -1619,7 +1619,7 @@ public final Mono doFinally(Consumer onFinally) { * * *

- * + * *

* @param onCancel the callback to call on {@link Subscription#cancel()} * @@ -1635,7 +1635,7 @@ public final Mono doOnCancel(Runnable onCancel) { * Add behavior triggered when the {@link Mono} emits a data successfully. * *

- * + * *

* @param onNext the callback to call on {@link Subscriber#onNext} * @@ -1655,7 +1655,7 @@ public final Mono doOnNext(Consumer onNext) { * * *

- * + * *

* @param onSuccess the callback to call on, argument is null if the {@link Mono} * completes without data @@ -1694,7 +1694,7 @@ public final Mono doOnEach(Consumer> signalConsumer) { * Add behavior triggered when the {@link Mono} completes with an error. * *

- * + * *

* @param onError the error callback to call on {@link Subscriber#onError(Throwable)} * @@ -1709,7 +1709,7 @@ public final Mono doOnError(Consumer onError) { /** * Add behavior triggered when the {@link Mono} completes with an error matching the given exception type. *

- * + * * * @param exceptionType the type of exceptions to handle * @param onError the error handler for relevant errors @@ -1729,7 +1729,7 @@ public final Mono doOnError(Class exceptionType, /** * Add behavior triggered when the {@link Mono} completes with an error matching the given predicate. *

- * + * * * @param predicate the matcher for exceptions to handle * @param onError the error handler for relevant error @@ -1753,7 +1753,7 @@ public final Mono doOnError(Predicate predicate, * will simply trigger {@link Operators#onOperatorError(Throwable, Context)}. * *

- * + * * * @param consumer the consumer to invoke on each request * @@ -1768,7 +1768,7 @@ public final Mono doOnRequest(final LongConsumer consumer) { * Add behavior triggered when the {@link Mono} is subscribed. * *

- * + * *

* @param onSubscribe the callback to call on {@link Subscriber#onSubscribe(Subscription)} * @@ -1789,7 +1789,7 @@ public final Mono doOnSubscribe(Consumer onSubscribe) { * * *

- * + * *

* @param onSuccessOrError the callback to call {@link Subscriber#onNext}, {@link Subscriber#onComplete} without preceding {@link Subscriber#onNext} or {@link Subscriber#onError} * @@ -1804,7 +1804,7 @@ public final Mono doOnSuccessOrError(BiConsumer onSucce * Add behavior triggered when the {@link Mono} terminates, either by completing successfully or with an error. * *

- * + * *

* @param onTerminate the callback to call {@link Subscriber#onNext}, {@link Subscriber#onComplete} without preceding {@link Subscriber#onNext} or {@link Subscriber#onError} * @@ -1827,7 +1827,7 @@ public final Mono doOnTerminate(Runnable onTerminate) { * the subscribe and the first next signal, as measured by the {@link Schedulers#parallel() parallel} scheduler. * *

- * + * * * @return a new {@link Mono} that emits a tuple of time elapsed in milliseconds and matching data */ @@ -1841,7 +1841,7 @@ public final Mono> elapsed() { * next signal, as measured by the provided {@link Scheduler}. * *

- * + * * * @param scheduler a {@link Scheduler} instance to read time from * @return a new {@link Mono} that emits a tuple of time elapsed in milliseconds and matching data @@ -2010,7 +2010,7 @@ public final Flux expand(Function * Otherwise complete without value. * *

- * + * *

* @param tester the predicate to evaluate * @@ -2045,7 +2045,7 @@ public final Mono filterWhen(Function * value emitted by another {@link Mono} (possibly changing the value type). * *

- * + * *

* @param transformer the function to dynamically bind a new {@link Mono} * @param the result type bound @@ -2062,7 +2062,7 @@ public final Mono flatMap(Function * its emissions into the returned {@link Flux}. * *

- * + * *

* @param mapper the * {@link Function} to produce a sequence of R from the the eventual passed {@link Subscriber#onNext} @@ -2079,7 +2079,7 @@ public final Flux flatMapMany(Function - * + * *

* @param mapperOnNext the {@link Function} to call on next data and returning a sequence to merge * @param mapperOnError the {@link Function} to call on error signal and returning a sequence to merge @@ -2102,7 +2102,7 @@ public final Flux flatMapMany(Function - * + * * * @param mapper the {@link Function} to transform input item into a sequence {@link Iterable} * @param the merged output sequence type @@ -2145,7 +2145,7 @@ public final Flux flux() { * Emit a single boolean true if this {@link Mono} has an element. * *

- * + * * * @return a new {@link Mono} with true if a value is emitted and false * otherwise @@ -2188,7 +2188,7 @@ public final Mono hide() { * Ignores onNext signal (dropping it) and only propagates termination events. * *

- * + * *

* * @return a new empty {@link Mono} representing the completion of this {@link Mono}. @@ -2203,7 +2203,7 @@ public final Mono ignoreElement() { * If SLF4J is available, it will be used instead. * *

- * + * *

* The default log category will be "reactor.Mono", followed by a suffix generated from * the source operator, e.g. "reactor.Mono.Map". @@ -2221,7 +2221,7 @@ public final Mono log() { * use {@link Level#INFO} and java.util.logging. If SLF4J is available, it will be used instead. * *

- * + * *

* @param category to be mapped into logger configuration (e.g. org.springframework * .reactor). If category ends with "." like "reactor.", a generated operator @@ -2242,7 +2242,7 @@ public final Mono log(@Nullable String category) { *

 	 *     mono.log("category", SignalType.ON_NEXT, SignalType.ON_ERROR)
 	 * 

- * + * *

* @param category to be mapped into logger configuration (e.g. org.springframework * .reactor). If category ends with "." like "reactor.", a generated operator @@ -2269,7 +2269,7 @@ public final Mono log(@Nullable String category, Level level, SignalType... o *

 	 *     mono.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
 	 * 

- * + * *

* @param category to be mapped into logger configuration (e.g. org.springframework * .reactor). If category ends with "." like "reactor.", a generated operator @@ -2299,7 +2299,7 @@ public final Mono log(@Nullable String category, * Transform the item emitted by this {@link Mono} by applying a synchronous function to it. * *

- * + * *

* @param mapper the synchronous transforming {@link Function} * @param the transformed type @@ -2320,7 +2320,7 @@ public final Mono map(Function mapper) { * emitted. Complete signal will first emit a {@code Signal.complete()} and then effectively complete the flux. * All these {@link Signal} have a {@link Context} associated to them. *

- * + * * * @return a {@link Mono} of materialized {@link Signal} * @see #dematerialize() @@ -2334,7 +2334,7 @@ public final Mono> materialize() { * The element from the Mono may be interleaved with the elements of the Publisher. * *

- * + * *

* @param other the {@link Publisher} to merge with * @@ -2359,7 +2359,7 @@ public final Mono name(String name) { * Emit the first available result from this mono or the other mono. * *

- * + * *

* @param other the racing other {@link Mono} to compete with for the result * @@ -2383,7 +2383,7 @@ public final Mono or(Mono other) { * value is ignored. * *

- * + * * * @param clazz the {@link Class} type to test values against * @@ -2398,7 +2398,7 @@ public final Mono ofType(final Class clazz) { * Transform an error emitted by this {@link Mono} by synchronously applying a function * to it if the error matches the given predicate. Otherwise let the error pass through. *

- * * * @param predicate the error predicate @@ -2415,7 +2415,7 @@ public final Mono onErrorMap(Predicate predicate, /** * Transform any error emitted by this {@link Mono} by synchronously applying a function to it. *

- * + * *

* @param mapper the error transforming {@link Function} * @@ -2429,7 +2429,7 @@ public final Mono onErrorMap(Function * Transform an error emitted by this {@link Mono} by synchronously applying a function * to it if the error matches the given type. Otherwise let the error pass through. *

- * + * *

* @param type the class of the exception type to react to * @param mapper the error transforming {@link Function} @@ -2449,7 +2449,7 @@ public final Mono onErrorMap(Class type, * choose the fallback depending on the error. * *

- * + * *

* @param fallback the function to choose the fallback to an alternative {@link Mono} * @@ -2466,7 +2466,7 @@ public final Mono onErrorResume(Function - * * * @param type the error type to match @@ -2489,7 +2489,7 @@ public final Mono onErrorResume(Class type, * Subscribe to a fallback publisher when an error matching a given predicate * occurs. *

- * * * @param predicate the error predicate to match @@ -2507,7 +2507,7 @@ public final Mono onErrorResume(Predicate predicate, * Simply emit a captured fallback value when any error is observed on this {@link Mono}. * *

- * + * *

* @param fallback the value to emit if an error occurs * @@ -2521,7 +2521,7 @@ public final Mono onErrorReturn(final T fallback) { * Simply emit a captured fallback value when an error of the specified type is * observed on this {@link Mono}. *

- * + * * @param type the error type to match * @param fallbackValue the value to emit if an error occurs that matches the type * @param the error type @@ -2536,7 +2536,7 @@ public final Mono onErrorReturn(Class type, T fallba * Simply emit a captured fallback value when an error matching the given predicate is * observed on this {@link Mono}. *

- * + * * @param predicate the error predicate to match * @param fallbackValue the value to emit if an error occurs that matches the predicate * @@ -2580,7 +2580,7 @@ public final Mono publish(Function, ? extends Mono - * + * *

* Typically used for fast publisher, slow consumer(s) scenarios. * @@ -2615,7 +2615,7 @@ public final Mono publishOn(Scheduler scheduler) { * previous subscription. * *

- * + * * * @return an indefinitely repeated {@link Flux} on onComplete */ @@ -2627,7 +2627,7 @@ public final Flux repeat() { * Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription. * *

- * + * * * @param predicate the boolean to evaluate on onComplete. * @@ -2642,7 +2642,7 @@ public final Flux repeat(BooleanSupplier predicate) { * Repeatedly subscribe to the source {@literal numRepeat} times. * *

- * + * * * @param numRepeat the number of times to re-subscribe on onComplete * @@ -2660,7 +2660,7 @@ public final Flux repeat(long numRepeat) { * subscription. A specified maximum of repeat will limit the number of re-subscribe. * *

- * + * * * @param numRepeat the number of times to re-subscribe on complete * @param predicate the boolean to evaluate on onComplete @@ -2680,7 +2680,7 @@ public final Flux repeat(long numRepeat, BooleanSupplier predicate) { * attempt is suppressed. * *

- * + * * * @param repeatFactory the {@link Function} that returns the associated {@link Publisher} * companion, given a {@link Flux} that signals each onComplete as a {@link Long} @@ -2700,7 +2700,7 @@ public final Flux repeatWhen(Function, ? extends Publisher> rep * Any terminal signal will terminate the resulting {@link Mono} with the same signal immediately. * *

- * + * * * @param repeatFactory the {@link Function} that returns the associated {@link Publisher} * companion, given a {@link Flux} that signals each onComplete as a 0-based incrementing {@link Long}. @@ -2723,7 +2723,7 @@ public final Mono repeatWhenEmpty(Function, ? extends Publisher * it is different from {@code Integer.MAX_VALUE}). * *

- * + * * * @param maxRepeat the maximum number of repeats (infinite if {@code Integer.MAX_VALUE}) * @param repeatFactory the {@link Function} that returns the associated {@link Publisher} @@ -2756,7 +2756,7 @@ public final Mono repeatWhenEmpty(int maxRepeat, Function, ? exten /** * Re-subscribes to this {@link Mono} sequence if it signals any error, indefinitely. *

- * + * * * @return a {@link Mono} that retries on onError */ @@ -2770,7 +2770,7 @@ public final Mono retry() { *

* Note that passing {@literal Long.MAX_VALUE} is treated as infinite retry. *

- * + * * * @param numRetries the number of times to tolerate an error * @@ -2785,7 +2785,7 @@ public final Mono retry(long numRetries) { * that matches the given {@link Predicate}, otherwise push the error downstream. * *

- * + * * * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal * @@ -2800,7 +2800,7 @@ public final Mono retry(Predicate retryMatcher) { * error that match the given {@link Predicate}, otherwise push the error downstream. * *

- * + * * * @param numRetries the number of times to tolerate an error * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal @@ -2821,7 +2821,7 @@ public final Mono retry(long numRetries, Predicate retryMa * immediately. * *

- * + * * * @param whenFactory the {@link Function} that returns the associated {@link Publisher} * companion, given a {@link Flux} that signals each onError as a {@link Throwable}. @@ -2837,7 +2837,7 @@ public final Mono retryWhen(Function, ? extends Publisher> * Expect exactly one item from this {@link Mono} source or signal * {@link java.util.NoSuchElementException} for an empty source. *

- * + * *

* Note Mono doesn't need {@link Flux#single(Object)}, since it is equivalent to * {@link #defaultIfEmpty(Object)} in a {@link Mono}. @@ -2876,7 +2876,7 @@ public final Mono single() { * chain, especially no error handling, so other variants should usually be preferred. * *

- * + * *

* * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription} @@ -2903,7 +2903,7 @@ public final Disposable subscribe() { * not invoked when executing in a main thread or a unit test for instance. * *

- * + * * * @param consumer the consumer to invoke on each value (onNext signal) * @@ -2927,7 +2927,7 @@ public final Disposable subscribe(Consumer consumer) { * not invoked when executing in a main thread or a unit test for instance. * *

- * + * * * @param consumer the consumer to invoke on each next signal * @param errorConsumer the consumer to invoke on error signal @@ -2952,7 +2952,7 @@ public final Disposable subscribe(@Nullable Consumer consumer, Consum * not invoked when executing in a main thread or a unit test for instance. * *

- * + * * * @param consumer the consumer to invoke on each value * @param errorConsumer the consumer to invoke on error signal @@ -2982,7 +2982,7 @@ public final Disposable subscribe( * not invoked when executing in a main thread or a unit test for instance. * *

- * + * * * @param consumer the consumer to invoke on each value * @param errorConsumer the consumer to invoke on error signal @@ -3071,7 +3071,7 @@ public final Mono subscriberContext(Function doOnContext) { * context of onNext/onError/onComplete signals from the beginning of the chain up to * the next occurrence of a {@link #publishOn(Scheduler) publishOn}. *

- * + * * *

 	 * {@code mono.subscribeOn(Schedulers.parallel()).subscribe()) }
@@ -3119,7 +3119,7 @@ public final > E subscribeWith(E subscriber) {
 	 * Fallback to an alternative {@link Mono} if this mono is completed without data
 	 *
 	 * 

- * + * *

* @param alternate the alternate mono if this mono is empty * @@ -3197,7 +3197,7 @@ public final Mono takeUntilOther(Publisher other) { * from this {@link Mono}. * *

- * + * *

* @return a {@link Mono} ignoring its payload (actively dropping) */ @@ -3213,7 +3213,7 @@ public final Mono then() { * replayed in the resulting {@code Mono}. * *

- * + * * * @param other a {@link Mono} to emit from after termination * @param the element type of the supplied Mono @@ -3233,7 +3233,7 @@ public final Mono then(Mono other) { * for a supplied {@link Publisher Publisher<Void>} to also complete. The * second completion signal is replayed, or any error signal that occurs instead. *

- * * * @param other a {@link Publisher} to wait for after this Mono's termination @@ -3251,7 +3251,7 @@ public final Mono thenEmpty(Publisher other) { * {@code Flux} that will emit elements from the provided {@link Publisher}. * *

- * + * * * @param other a {@link Publisher} to emit from after termination * @param the element type of the supplied Publisher @@ -3270,7 +3270,7 @@ public final Flux thenMany(Publisher other) { * {@link Duration}. * *

- * + * * * @param timeout the timeout before the onNext signal from this {@link Mono} * @@ -3287,7 +3287,7 @@ public final Mono timeout(Duration timeout) { * If the fallback {@link Mono} is null, signal a {@link TimeoutException} instead. * *

- * + * * * @param timeout the timeout before the onNext signal from this {@link Mono} * @param fallback the fallback {@link Mono} to subscribe to when a timeout occurs @@ -3303,7 +3303,7 @@ public final Mono timeout(Duration timeout, Mono fallback) { * as measured on the provided {@link Scheduler}. * *

- * + * * * @param timeout the timeout before the onNext signal from this {@link Mono} * @param timer a time-capable {@link Scheduler} instance to run the delay on @@ -3321,7 +3321,7 @@ public final Mono timeout(Duration timeout, Scheduler timer) { *

If the given {@link Mono} is null, signal a {@link TimeoutException}. * *

- * + * * * @param timeout the timeout before the onNext signal from this {@link Mono} * @param fallback the fallback {@link Mono} to subscribe when a timeout occurs @@ -3344,7 +3344,7 @@ public final Mono timeout(Duration timeout, @Nullable Mono fallb * not been emitted before the given {@link Publisher} emits. * *

- * + * * * @param firstTimeout the timeout {@link Publisher} that must not emit before the first signal from this {@link Mono} * @param the element type of the timeout Publisher @@ -3361,7 +3361,7 @@ public final Mono timeout(Publisher firstTimeout) { * not been emitted before the given {@link Publisher} emits. * *

- * + * * * @param firstTimeout the timeout * {@link Publisher} that must not emit before the first signal from this {@link Mono} @@ -3382,7 +3382,7 @@ public final Mono timeout(Publisher firstTimeout, Mono fa * {@link Schedulers#parallel() parallel} Scheduler) and T2 the emitted data (as a {@code T}). * *

- * + * * * @return a timestamped {@link Mono} */ @@ -3396,7 +3396,7 @@ public final Mono> timestamp() { * provided {@link Scheduler}) and T2 the emitted data (as a {@code T}). * *

- * + * * * @param scheduler a {@link Scheduler} instance to read time from * @return a timestamped {@link Mono} @@ -3411,7 +3411,7 @@ public final Mono> timestamp(Scheduler scheduler) { * onError. * *

- * + * *

* * @return a {@link CompletableFuture} @@ -3427,7 +3427,7 @@ public final CompletableFuture toFuture() { * on it). * *

- * + * *

* * @return a {@link MonoProcessor} to use to either retrieve value or cancel the underlying {@link Subscription} @@ -3473,7 +3473,7 @@ public final Mono transform(Function, ? extends Publisher * provided {@code rightGenerator} function and combine both results into a {@link Tuple2}. * *

- * + * *

* @param rightGenerator the {@link Function} to generate a {@code Mono} to combine with * @param the element type of the other Mono instance @@ -3490,7 +3490,7 @@ public final Mono> zipWhen(Function> ri * {@code O} object, as defined by the provided {@code combinator} function. * *

- * + * *

* @param rightGenerator the {@link Function} to generate a {@code Mono} to combine with * @param combinator a {@link BiFunction} combinator function when both sources complete @@ -3510,7 +3510,7 @@ public final Mono zipWhen(Function> rightGenera * Combine the result from this mono and another into a {@link Tuple2}. * *

- * + * *

* @param other the {@link Mono} to combine with * @param the element type of the other Mono instance @@ -3526,7 +3526,7 @@ public final Mono> zipWith(Mono other) { * as defined by the provided {@code combinator} function. * *

- * + * *

* @param other the {@link Mono} to combine with * @param combinator a {@link BiFunction} combinator function when both sources diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/MonoProcessor.java index a9d9a53845..5770722f6c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoProcessor.java @@ -39,7 +39,7 @@ * A {@code MonoProcessor} is a {@link Mono} extension that implements stateful semantics. Multi-subscribe is allowed. * *

- * + * *

* * Once a {@link MonoProcessor} has been resolved, newer subscribers will benefit from the cached result. diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 8c7c3c5918..b5dd0d3e76 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -524,7 +524,7 @@ public static Subscription scalarSubscription(CoreSubscriber subs * starve a calling thread if races are too important and {@link Subscriber} is slower. * *

- * + * * * @param the relayed type * @param subscriber the subscriber to serialize diff --git a/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java b/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java index 3112eaaaef..52d081ce9b 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java @@ -612,7 +612,7 @@ public final ParallelFlux hide() { * implementation. Default will use {@link Level#INFO} and java.util.logging. If SLF4J * is available, it will be used instead. *

- * *

* The default log category will be "reactor.*", a generated operator suffix will @@ -629,7 +629,7 @@ public final ParallelFlux log() { * implementation. Default will use {@link Level#INFO} and java.util.logging. If SLF4J * is available, it will be used instead. *

- * *

* @@ -655,7 +655,7 @@ public final ParallelFlux log(@Nullable String category) { * ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, * SignalType.ON_ERROR) * - * * * @param category to be mapped into logger configuration (e.g. org.springframework @@ -685,7 +685,7 @@ public final ParallelFlux log(@Nullable String category, * ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, * SignalType.ON_ERROR) * - * * * @param category to be mapped into logger configuration (e.g. org.springframework diff --git a/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java index 96ed9c3fef..e1593a2641 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java @@ -39,7 +39,7 @@ /** * Replays all or the last N items to Subscribers. *

- * *

* @@ -53,7 +53,7 @@ public final class ReplayProcessor extends FluxProcessor * replaying it to late subscribers. This is a buffer-based ReplayProcessor with * a history size of 1. *

- * * * @param the type of the pushed elements @@ -71,7 +71,7 @@ public static ReplayProcessor cacheLast() { * any value has been pushed, then the {@code defaultValue} is emitted instead. * This is a buffer-based ReplayProcessor with a history size of 1. *

- * * * @param value a default value to start the sequence with in case nothing has been diff --git a/reactor-core/src/main/java/reactor/core/publisher/TopicProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/TopicProcessor.java index 590d9eeff4..1b6c960d23 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/TopicProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/TopicProcessor.java @@ -37,12 +37,12 @@ ** An implementation of a RingBuffer backed message-passing Processor implementing publish-subscribe with async event * loops. *

- * + * *

* Created from {@link #share}, the {@link TopicProcessor} will authorize concurrent publishing (multi-producer) * from its receiving side {@link Subscriber#onNext(Object)}. * Additionally, any of the {@link TopicProcessor} will stop the event loop thread if an error occurs. - * + * *

* The processor * respects the Reactive Streams contract and must not be signalled concurrently on any diff --git a/reactor-core/src/main/java/reactor/core/publisher/WorkQueueProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/WorkQueueProcessor.java index e59c20003d..d5b949b207 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/WorkQueueProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/WorkQueueProcessor.java @@ -43,14 +43,14 @@ ** An implementation of a RingBuffer backed message-passing Processor implementing work-queue distribution with * async event loops. *

- * + * *

* Created from {@link #share()}, the {@link WorkQueueProcessor} will authorize concurrent publishing * (multi-producer) from its receiving side {@link Subscriber#onNext(Object)}. * {@link WorkQueueProcessor} is able to replay up to its buffer size number of failed signals (either * dropped or fatally throwing on child {@link Subscriber#onNext}). *

- * + * *

* The processor is very similar to {@link TopicProcessor} but * only partially respects the Reactive Streams contract.

The purpose of this diff --git a/src/docs/asciidoc/index.asciidoc b/src/docs/asciidoc/index.asciidoc index c080c4d4ae..167bbbb33e 100644 --- a/src/docs/asciidoc/index.asciidoc +++ b/src/docs/asciidoc/index.asciidoc @@ -1,6 +1,6 @@ = Reactor 3 Reference Guide Stephane Maldini ; Simon Baslé -:appversion: 3.1.2.RELEASE +:appversion: 3.1.3.RELEASE ifndef::host-github[:ext-relative: {outfilesuffix}] {appversion} :doctype: book