Skip to content

Commit

Permalink
tweaks MultiSubscription cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Jun 10, 2016
1 parent 36aa312 commit de0e8d5
Show file tree
Hide file tree
Showing 11 changed files with 6 additions and 51 deletions.
5 changes: 0 additions & 5 deletions src/main/java/reactor/core/publisher/FluxConcatMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -760,11 +760,6 @@ public ConcatMapInner(StreamConcatMapSupport<R> parent) {
this.parent = parent;
}

@Override
public void onSubscribe(Subscription s) {
set(s);
}

@Override
public void onNext(R t) {
produced++;
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/reactor/core/publisher/FluxRepeat.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public void onNext(T t) {
subscriber.onNext(t);
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void onComplete() {
long r = remaining;
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/reactor/core/publisher/FluxRepeatPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@ public RepeatPredicateSubscriber(Publisher<? extends T> source,
this.predicate = predicate;
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void onNext(T t) {
produced++;
Expand Down
10 changes: 0 additions & 10 deletions src/main/java/reactor/core/publisher/FluxRepeatWhen.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,6 @@ public RepeatWhenMainSubscriber(Subscriber<? super T> actual, Subscriber<Long> s
this.otherArbiter = new DeferredSubscription();
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void cancel() {
if (cancelled) {
Expand All @@ -138,11 +133,6 @@ public void cancel() {
super.cancel();
}

@Override
public void onSubscribe(Subscription s) {
set(s);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/reactor/core/publisher/FluxResume.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ public ResumeSubscriber(Subscriber<? super T> actual,
this.nextFactory = nextFactory;
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void onSubscribe(Subscription s) {
if (!second) {
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/reactor/core/publisher/FluxRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ public void onError(Throwable t) {
resubscribe();
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

void resubscribe() {
if (WIP.getAndIncrement(this) == 0) {
do {
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/reactor/core/publisher/FluxRetryPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ public void onNext(T t) {
subscriber.onNext(t);
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public void onError(Throwable t) {
boolean b;
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/reactor/core/publisher/FluxRetryWhen.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,6 @@ void whenComplete() {

subscriber.onComplete();
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}
}

static final class RetryWhenOtherSubscriber
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ public void onComplete() {
}
}

@Override
protected boolean shouldCancelCurrent() {
return false;
}

@Override
public Object connectedInput() {
return null;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/reactor/core/publisher/FluxTimeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public void onSubscribe(Subscription s) {
}
}

@Override
protected boolean shouldCancelCurrent() {
return true;
}

@Override
public void onNext(T t) {
timeout.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,6 @@ public final boolean isUnbounded() {
}

protected boolean shouldCancelCurrent() {
return true;
return false;
}
}

0 comments on commit de0e8d5

Please sign in to comment.