From e17d2644c7334850ab9774487ecdb4b5abcf2b8a Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Thu, 1 Jul 2021 22:38:57 -0500 Subject: [PATCH] `FromNPublisher`: limit recursion depth to 1 (#1653) Motivation: To mitigate #1652 issue, temporary limit the mutual recursion between `onNext` and `request` to a depth of 1. Modifications: - Use the left 4 bits of the `state` to limit recursion depth to 1; - Remove `boundedDepthOfOnNextAndRequestRecursion()` override from `PublisherFrom2TckTest` and `PublisherFrom3TckTest`; Result: 1. `FromNPublisher` has a maximum recursion depth of 1. 2. Less risk of encountering issue #1652. --- .../concurrent/api/FromNPublisher.java | 52 ++++++++++++++----- .../tck/PublisherFrom2TckTest.java | 5 -- .../tck/PublisherFrom3TckTest.java | 5 -- 3 files changed, 39 insertions(+), 23 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromNPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromNPublisher.java index 7faeff0a95..e9fd3b0efc 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromNPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromNPublisher.java @@ -57,6 +57,9 @@ void doSubscribe(final Subscriber subscriber) { } private final class NValueSubscription implements Subscription { + private static final byte ZERO = 0; + private static final byte ONE = 1; + private static final byte TWO = 2; private static final byte TERMINATED = 3; private byte requested; private byte state; @@ -65,9 +68,9 @@ private final class NValueSubscription implements Subscription { private NValueSubscription(final Subscriber subscriber) { this.subscriber = subscriber; if (v1 == UNUSED_REF) { - // 3-value version - simulate 1 emitted item, start counting from 1. + // 2-value version - simulate 1 emitted item, start counting from 1. requested = 1; - state++; + state = ONE; } } @@ -78,7 +81,7 @@ public void cancel() { @Override public void request(final long n) { - if (state == TERMINATED) { + if (state() == TERMINATED) { return; } if (!isRequestNValid(n)) { @@ -90,20 +93,27 @@ public void request(final long n) { return; } requested = (byte) min(3, addWithOverflowProtection(requested, n)); - boolean successful = true; - while (successful && state < requested) { - if (state == 0) { - successful = deliver(v1); - } else if (state == 1) { - successful = deliver(v2); - } else if (state == 2 && deliver(v3)) { - subscriber.onComplete(); + if (ignoreRequests()) { + return; + } + ignoreRequests(true); + while (state() < requested) { + if (state() == ZERO) { + deliver(v1, ONE); + } else if (state() == ONE) { + deliver(v2, TWO); + } else if (state() == TWO) { + if (deliver(v3, TERMINATED)) { + subscriber.onComplete(); + } + return; } } + ignoreRequests(false); } - private boolean deliver(@Nullable T value) { - ++state; + private boolean deliver(@Nullable final T value, final byte nextState) { + state = (byte) ((state & 0x10) | nextState); try { subscriber.onNext(value); return true; @@ -113,5 +123,21 @@ private boolean deliver(@Nullable T value) { return false; } } + + private byte state() { + return (byte) (state & 0x0F); + } + + private boolean ignoreRequests() { + return (state & 0x10) > 0; + } + + private void ignoreRequests(final boolean ignore) { + if (ignore) { + state |= 0x10; + } else { + state &= 0x0F; + } + } } } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFrom2TckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFrom2TckTest.java index 443bd0a7fd..33fca3dc94 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFrom2TckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFrom2TckTest.java @@ -32,9 +32,4 @@ public Publisher createServiceTalkPublisher(long elements) { public long maxElementsFromPublisher() { return 2; } - - @Override - public long boundedDepthOfOnNextAndRequestRecursion() { - return 2; - } } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFrom3TckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFrom3TckTest.java index 5e6b33705c..bc27bf2136 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFrom3TckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFrom3TckTest.java @@ -32,9 +32,4 @@ public Publisher createServiceTalkPublisher(long elements) { public long maxElementsFromPublisher() { return 3; } - - @Override - public long boundedDepthOfOnNextAndRequestRecursion() { - return 3; - } }