From de0e8d501b66e3d3ec7f6f8edac6a9c32cff991c Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Fri, 10 Jun 2016 15:47:12 +0100 Subject: [PATCH] tweaks MultiSubscription cancel --- .../java/reactor/core/publisher/FluxConcatMap.java | 5 ----- src/main/java/reactor/core/publisher/FluxRepeat.java | 5 ----- .../reactor/core/publisher/FluxRepeatPredicate.java | 5 ----- .../java/reactor/core/publisher/FluxRepeatWhen.java | 10 ---------- src/main/java/reactor/core/publisher/FluxResume.java | 5 ----- src/main/java/reactor/core/publisher/FluxRetry.java | 5 ----- .../reactor/core/publisher/FluxRetryPredicate.java | 5 ----- .../java/reactor/core/publisher/FluxRetryWhen.java | 5 ----- .../java/reactor/core/publisher/FluxSwitchIfEmpty.java | 5 ----- src/main/java/reactor/core/publisher/FluxTimeout.java | 5 +++++ .../core/subscriber/MultiSubscriptionSubscriber.java | 2 +- 11 files changed, 6 insertions(+), 51 deletions(-) diff --git a/src/main/java/reactor/core/publisher/FluxConcatMap.java b/src/main/java/reactor/core/publisher/FluxConcatMap.java index 394d48a499..bd99d1a634 100644 --- a/src/main/java/reactor/core/publisher/FluxConcatMap.java +++ b/src/main/java/reactor/core/publisher/FluxConcatMap.java @@ -760,11 +760,6 @@ public ConcatMapInner(StreamConcatMapSupport parent) { this.parent = parent; } - @Override - public void onSubscribe(Subscription s) { - set(s); - } - @Override public void onNext(R t) { produced++; diff --git a/src/main/java/reactor/core/publisher/FluxRepeat.java b/src/main/java/reactor/core/publisher/FluxRepeat.java index bafbdbebc4..149c194599 100644 --- a/src/main/java/reactor/core/publisher/FluxRepeat.java +++ b/src/main/java/reactor/core/publisher/FluxRepeat.java @@ -95,11 +95,6 @@ public void onNext(T t) { subscriber.onNext(t); } - @Override - protected boolean shouldCancelCurrent() { - return false; - } - @Override public void onComplete() { long r = remaining; diff --git a/src/main/java/reactor/core/publisher/FluxRepeatPredicate.java b/src/main/java/reactor/core/publisher/FluxRepeatPredicate.java index 4d360997c9..dde2c523bb 100644 --- a/src/main/java/reactor/core/publisher/FluxRepeatPredicate.java +++ b/src/main/java/reactor/core/publisher/FluxRepeatPredicate.java @@ -82,11 +82,6 @@ public RepeatPredicateSubscriber(Publisher source, this.predicate = predicate; } - @Override - protected boolean shouldCancelCurrent() { - return false; - } - @Override public void onNext(T t) { produced++; diff --git a/src/main/java/reactor/core/publisher/FluxRepeatWhen.java b/src/main/java/reactor/core/publisher/FluxRepeatWhen.java index 78284e4169..d1f9b12e53 100644 --- a/src/main/java/reactor/core/publisher/FluxRepeatWhen.java +++ b/src/main/java/reactor/core/publisher/FluxRepeatWhen.java @@ -121,11 +121,6 @@ public RepeatWhenMainSubscriber(Subscriber actual, Subscriber s this.otherArbiter = new DeferredSubscription(); } - @Override - protected boolean shouldCancelCurrent() { - return false; - } - @Override public void cancel() { if (cancelled) { @@ -138,11 +133,6 @@ public void cancel() { super.cancel(); } - @Override - public void onSubscribe(Subscription s) { - set(s); - } - @Override public void onNext(T t) { subscriber.onNext(t); diff --git a/src/main/java/reactor/core/publisher/FluxResume.java b/src/main/java/reactor/core/publisher/FluxResume.java index 842f44dc57..6020306118 100644 --- a/src/main/java/reactor/core/publisher/FluxResume.java +++ b/src/main/java/reactor/core/publisher/FluxResume.java @@ -69,11 +69,6 @@ public ResumeSubscriber(Subscriber actual, this.nextFactory = nextFactory; } - @Override - protected boolean shouldCancelCurrent() { - return false; - } - @Override public void onSubscribe(Subscription s) { if (!second) { diff --git a/src/main/java/reactor/core/publisher/FluxRetry.java b/src/main/java/reactor/core/publisher/FluxRetry.java index 6bf5148473..aec8083d5a 100644 --- a/src/main/java/reactor/core/publisher/FluxRetry.java +++ b/src/main/java/reactor/core/publisher/FluxRetry.java @@ -103,11 +103,6 @@ public void onError(Throwable t) { resubscribe(); } - @Override - protected boolean shouldCancelCurrent() { - return false; - } - void resubscribe() { if (WIP.getAndIncrement(this) == 0) { do { diff --git a/src/main/java/reactor/core/publisher/FluxRetryPredicate.java b/src/main/java/reactor/core/publisher/FluxRetryPredicate.java index 11b617cd1d..e59417f250 100644 --- a/src/main/java/reactor/core/publisher/FluxRetryPredicate.java +++ b/src/main/java/reactor/core/publisher/FluxRetryPredicate.java @@ -89,11 +89,6 @@ public void onNext(T t) { subscriber.onNext(t); } - @Override - protected boolean shouldCancelCurrent() { - return false; - } - @Override public void onError(Throwable t) { boolean b; diff --git a/src/main/java/reactor/core/publisher/FluxRetryWhen.java b/src/main/java/reactor/core/publisher/FluxRetryWhen.java index 724307abbb..d8a24a79e8 100644 --- a/src/main/java/reactor/core/publisher/FluxRetryWhen.java +++ b/src/main/java/reactor/core/publisher/FluxRetryWhen.java @@ -202,11 +202,6 @@ void whenComplete() { subscriber.onComplete(); } - - @Override - protected boolean shouldCancelCurrent() { - return false; - } } static final class RetryWhenOtherSubscriber diff --git a/src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java b/src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java index 48c02621e3..3d2c0af957 100644 --- a/src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java +++ b/src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java @@ -85,11 +85,6 @@ public void onComplete() { } } - @Override - protected boolean shouldCancelCurrent() { - return false; - } - @Override public Object connectedInput() { return null; diff --git a/src/main/java/reactor/core/publisher/FluxTimeout.java b/src/main/java/reactor/core/publisher/FluxTimeout.java index 72f99a7d4b..2229449a0b 100644 --- a/src/main/java/reactor/core/publisher/FluxTimeout.java +++ b/src/main/java/reactor/core/publisher/FluxTimeout.java @@ -124,6 +124,11 @@ public void onSubscribe(Subscription s) { } } + @Override + protected boolean shouldCancelCurrent() { + return true; + } + @Override public void onNext(T t) { timeout.cancel(); diff --git a/src/main/java/reactor/core/subscriber/MultiSubscriptionSubscriber.java b/src/main/java/reactor/core/subscriber/MultiSubscriptionSubscriber.java index 387a130c8e..d6ee344c4b 100644 --- a/src/main/java/reactor/core/subscriber/MultiSubscriptionSubscriber.java +++ b/src/main/java/reactor/core/subscriber/MultiSubscriptionSubscriber.java @@ -360,6 +360,6 @@ public final boolean isUnbounded() { } protected boolean shouldCancelCurrent() { - return true; + return false; } }