Skip to content

Commit

Permalink
SingleConcatWithPublisher: limit recursion depth to 1 (apple#1654)
Browse files Browse the repository at this point in the history
Motivation:

To mitigate apple#1652 issue, temporary limit the mutual recursion between
`onNext` and `request` to a depth of 1 for `ConcatDeferNextSubscriber`.

Modifications:

- Introduce additional `SINGLE_DELIVERING` state to understand if the
`request` was invoked while we deliver result of the `Single`;
- Remove `boundedDepthOfOnNextAndRequestRecursion()` override from
`SingleConcatWithPublisherDeferSubscribeTckTest`;

Result:

1. `SingleConcatWithPublisher` has a maximum recursion depth of 1.
2. Less risk of encountering issue apple#1652.
  • Loading branch information
idelpivnitskiy authored and bondolo committed Jul 2, 2021
1 parent ea2f56c commit ecd9ff3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,18 @@ private static final class ConcatDeferNextSubscriber<T> extends AbstractConcatSu
*/
private static final Object REQUESTED_ONE = new Object();
/**
* If more than one item was {@link #request(long) requested} before {@link #onSuccess(Object)}.
* If more than one item was {@link #request(long) requested} before {@link #onSuccess(Object)} or while its
* result is delivering to the target.
*/
private static final Object REQUESTED_MORE = new Object();
/**
* If only one item was {@link #request(long) requested} and {@link #onSuccess(Object)} invoked.
*/
private static final Object SINGLE_DELIVERING = new Object();
/**
* If only one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and its result was
* delivered to the target.
*/
private static final Object SINGLE_DELIVERED = new Object();
/**
* If more than one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and we
Expand All @@ -223,6 +229,7 @@ private static final class ConcatDeferNextSubscriber<T> extends AbstractConcatSu
public void onSuccess(@Nullable final T result) {
for (;;) {
final Object oldValue = mayBeResult;
assert oldValue != SINGLE_DELIVERING;
assert oldValue != SINGLE_DELIVERED;
assert oldValue != PUBLISHER_SUBSCRIBED;

Expand All @@ -233,8 +240,8 @@ public void onSuccess(@Nullable final T result) {
break;
}
} else if (oldValue == REQUESTED_ONE) {
if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERED)) {
tryEmitSingleSuccessToTarget(result);
if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERING)) {
emitSingleSuccessToTarget(result);
break;
}
} else if (oldValue == REQUESTED_MORE &&
Expand Down Expand Up @@ -276,7 +283,7 @@ public void request(long n) {
break;
}
}
} else if (oldVal == REQUESTED_ONE) {
} else if (oldVal == REQUESTED_ONE || oldVal == SINGLE_DELIVERING) {
if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_MORE)) {
super.request(n);
break;
Expand All @@ -297,13 +304,26 @@ public void request(long n) {
}
break;
}
} else if (mayBeResultUpdater.compareAndSet(this, oldVal, SINGLE_DELIVERED)) {
} else if (mayBeResultUpdater.compareAndSet(this, oldVal, SINGLE_DELIVERING)) {
@SuppressWarnings("unchecked")
final T tVal = (T) oldVal;
tryEmitSingleSuccessToTarget(tVal);
emitSingleSuccessToTarget(tVal);
break;
}
}
}

private void emitSingleSuccessToTarget(@Nullable final T result) {
if (tryEmitSingleSuccessToTarget(result)) {
if (mayBeResultUpdater.compareAndSet(this, SINGLE_DELIVERING, SINGLE_DELIVERED)) {
// state didn't change, we are done
} else if (mayBeResultUpdater.compareAndSet(this, REQUESTED_MORE, PUBLISHER_SUBSCRIBED)) {
// more demand appeared while we were delivering the single result
next.subscribeInternal(this);
} else {
assert mayBeResult == CANCELLED;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,4 @@ public class SingleConcatWithPublisherDeferSubscribeTckTest extends SingleConcat
boolean deferSubscribe() {
return true;
}

@Override
public long boundedDepthOfOnNextAndRequestRecursion() {
return 2;
}
}

0 comments on commit ecd9ff3

Please sign in to comment.