Skip to content

Commit

Permalink
fix cancel on terminal event with MultiSubscriptionSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Jun 10, 2016
1 parent bd8c148 commit 36aa312
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 3 deletions.
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxRepeat.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public void onNext(T t) {
subscriber.onNext(t);
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void onComplete() {
long r = remaining;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxRepeatPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public RepeatPredicateSubscriber(Publisher<? extends T> source,
this.predicate = predicate;
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void onNext(T t) {
produced++;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxRepeatWhen.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public RepeatWhenMainSubscriber(Subscriber<? super T> actual, Subscriber<Long> s
this.otherArbiter = new DeferredSubscription();
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void cancel() {
if (cancelled) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxResume.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public ResumeSubscriber(Subscriber<? super T> actual,
this.nextFactory = nextFactory;
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void onSubscribe(Subscription s) {
if (!second) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public void onError(Throwable t) {
resubscribe();
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

void resubscribe() {
if (WIP.getAndIncrement(this) == 0) {
do {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxRetryPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public void onNext(T t) {
subscriber.onNext(t);
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void onError(Throwable t) {
boolean b;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxRetryWhen.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ void whenComplete() {

subscriber.onComplete();
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}
}

static final class RetryWhenOtherSubscriber
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public void onComplete() {
}
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public Object connectedInput() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public final void set(Subscription s) {

if (wip == 0 && WIP.compareAndSet(this, 0, 1)) {
Subscription a = actual;
if (a != null) {

if (a != null && shouldCancelCurrent()) {
a.cancel();
}

Expand Down Expand Up @@ -311,7 +311,7 @@ final void drainLoop() {
}

if (ms != null) {
if (a != null) {
if (a != null && shouldCancelCurrent()) {
a.cancel();
}
actual = ms;
Expand Down Expand Up @@ -358,4 +358,8 @@ public boolean isStarted() {
public final boolean isUnbounded() {
return unbounded;
}

protected boolean shouldCancelCurrent() {
return true;
}
}

0 comments on commit 36aa312

Please sign in to comment.