Skip to content

Commit

Permalink
ensures exception is caught if inner fused for FlatMap #3353
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegDokuka authored Feb 14, 2023
1 parent 6b0c843 commit 2ce1640
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -416,6 +416,7 @@ else if (!delayError || !Exceptions.addThrowable(ERROR, this, e_)) {
onError(Operators.onOperatorError(s, e_, t, ctx));
}
Operators.onDiscard(t, ctx);
tryEmitScalar(null);
return;
}
tryEmitScalar(v);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,6 @@
import reactor.core.TestLoggerExtension;
import reactor.core.publisher.FluxPeekFuseableTest.AssertQueueSubscription;
import reactor.core.scheduler.Schedulers;
import reactor.test.util.LoggerUtils;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
Expand Down Expand Up @@ -1869,4 +1868,41 @@ public void noWrappingOfCheckedExceptions_hide() {
.expectError(NoSuchMethodException.class)
.verify();
}

static class FluxFlatMapDelayError3336Test {

@Test
void workingFlatMapDelayError() {
Flux.just(0, 1, 2, 3)
.flatMapDelayError(integer -> {
throw new RuntimeException(); // Cancels upstream subscription after consuming one event
}, 1, 1)
.as(StepVerifier::create)
.expectError()
.verify(Duration.ofSeconds(1)); // Completes as expected
}

@Test
void hangingFlatMapDelayError() {
Flux.just(0, 1, 2, 3)
.flatMapDelayError(integer -> {
return Flux.error(new RuntimeException()); // Does not cancel upstream subscription
}, 1, 1)
.as(StepVerifier::create)
.expectError()
.verify(Duration.ofSeconds(1)); // Triggers timeout
}

@Test
void deoptimizedFlatMapDelayError() {
Flux.just(0, 1, 2, 3)
.flatMapDelayError(integer -> {
return Flux.error(new RuntimeException())
.hide(); // Does not cancel upstream subscription
}, 1, 1)
.as(StepVerifier::create)
.expectError()
.verify(Duration.ofSeconds(1)); // Completes after consuming all events
}
}
}

0 comments on commit 2ce1640

Please sign in to comment.