Skip to content

Commit

Permalink
ensures addCap always returns value with flag
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
Oleh Dokuka committed Oct 24, 2023
1 parent 65bdb3a commit 0b988cb
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ static <T> long addCap(BaseSink<T> instance, long toAdd) {
s = instance.requested;
r = s & Long.MAX_VALUE;
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
return s;
}
u = Operators.addCap(r, toAdd);
if (REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.core.publisher;

import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -57,6 +58,26 @@

class FluxCreateTest {

@Test
//https://github.com/reactor/reactor-core/issues/3569
void ensuresRequestMaxPlusOneDoesNotFailOnNoRequestConsumer() {
Flux.create(sink -> {
sink.next("1");
sink.next("2");
sink.next("3");
sink.complete();
})
.as(StepVerifier::create)
.expectNext("1")
.thenRequest(1)
.expectNext("2")
.thenRequest(1)
.expectNext("3")
.thenRequest(1)
.expectComplete()
.verify(Duration.ofMillis(1000));
}

@Test
//https://github.com/reactor/reactor-core/issues/1949
void ensuresConcurrentRequestAndSettingOnRequestAlwaysDeliversDemand() throws ExecutionException, InterruptedException {
Expand Down

0 comments on commit 0b988cb

Please sign in to comment.