From 3e5afe35c68724ccebe771ceb7d9a387db6c0721 Mon Sep 17 00:00:00 2001 From: alex-butcher <21243172+abutch3r@users.noreply.github.com> Date: Thu, 22 Feb 2024 11:02:55 +0000 Subject: [PATCH 1/5] Make Fail Overflow Test less flaky Fail overflow test can fail to pass if exception is thrown after more then 256 messages have been processed #162 --- .../tck/channel/overflow/FailOverflowStrategyOverflowTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java index 5bf6590a..be5ded32 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java @@ -45,7 +45,7 @@ public void testOverflow() { await().until(() -> bean.exception() != null); assertThat(bean.output()).doesNotContain("999"); - assertThat(bean.output()).hasSizeBetween(0, 256); + assertThat(bean.output()).isNotEmpty().hasSizeLessThan(999); assertThat(bean.failure()).isNotNull().isInstanceOf(Exception.class); } } From 29dfac9262e3f2f98c95221998069336c7d8eaa2 Mon Sep 17 00:00:00 2001 From: alex-butcher <21243172+abutch3r@users.noreply.github.com> Date: Thu, 22 Feb 2024 14:07:40 +0000 Subject: [PATCH 2/5] Add sleep to Drop Emitter test bean --- .../overflow/BeanWithFailOverflowStrategy.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java index da04eea2..46d9062a 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java @@ -95,10 +95,14 @@ public void emitALotOfItems() { @Outgoing("out") public PublisherBuilder consume(final PublisherBuilder values) { return values - .via(ReactiveStreams.builder() - .flatMapCompletionStage(s -> CompletableFuture.supplyAsync(() -> s, executor))) - .onError(err -> downstreamFailure = err); - + .via(ReactiveStreams.builder().flatMapCompletionStage(s -> CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + return s; + }, executor))).onError(err -> downstreamFailure = err); } @Incoming("out") From f0ebe5786ce37ddf6ebdcc5ec6de9bd7a81009b0 Mon Sep 17 00:00:00 2001 From: alex-butcher <21243172+abutch3r@users.noreply.github.com> Date: Thu, 29 Feb 2024 13:37:50 +0000 Subject: [PATCH 3/5] Reorder waits and assertions to provide better result assurance Wait for the failure on the stream to occur instead of the exception on the emitter - this ensures that at least the first message will be sucessfully processed and that a failure did occur before assertions are checked. In the case where the failure may occur sufficiently late in the test execution such that there is a failure, but not yet an exception. In this case emit one more message and wait for the exception before checking emitThree was unused and would close close the stream via `.complete()` - repurpose for being able to send one message and not close the stream if successful. --- .../overflow/BeanWithFailOverflowStrategy.java | 7 ++----- .../overflow/FailOverflowStrategyOverflowTest.java | 12 +++++++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java index 46d9062a..592e60f0 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java @@ -68,12 +68,9 @@ public Exception exception() { return callerException; } - public void emitThree() { + public void emitOne() { try { - emitter.send("1"); - emitter.send("2"); - emitter.send("3"); - emitter.complete(); + emitter.send("1000"); } catch (Exception e) { callerException = e; } diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java index be5ded32..c848f369 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java @@ -43,9 +43,15 @@ public static Archive deployment() { public void testOverflow() { bean.emitALotOfItems(); - await().until(() -> bean.exception() != null); - assertThat(bean.output()).doesNotContain("999"); + await().until(() -> bean.failure() != null); + assertThat(bean.failure()).isInstanceOf(Exception.class); assertThat(bean.output()).isNotEmpty().hasSizeLessThan(999); - assertThat(bean.failure()).isNotNull().isInstanceOf(Exception.class); + //If an exception has not yet been thrown after the failure occurred, try one more message + if (bean.exception() == null) { + bean.emitOne(); + await().until(() -> bean.exception() != null); + } + assertThat(bean.exception()).isInstanceOf(IllegalStateException.class); + } } From cc1b8f11358eab6ba8681933df4bf6a08a673563 Mon Sep 17 00:00:00 2001 From: alex-butcher <21243172+abutch3r@users.noreply.github.com> Date: Fri, 1 Mar 2024 11:07:03 +0000 Subject: [PATCH 4/5] As emitOne is synchronous, no need to wait for exception. --- .../tck/channel/overflow/FailOverflowStrategyOverflowTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java index c848f369..1af3b968 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java @@ -49,7 +49,6 @@ public void testOverflow() { //If an exception has not yet been thrown after the failure occurred, try one more message if (bean.exception() == null) { bean.emitOne(); - await().until(() -> bean.exception() != null); } assertThat(bean.exception()).isInstanceOf(IllegalStateException.class); From 3a5eeb56758f0f8a1943f31c54db6ecf3463580c Mon Sep 17 00:00:00 2001 From: alex-butcher <21243172+abutch3r@users.noreply.github.com> Date: Thu, 21 Mar 2024 11:29:49 +0000 Subject: [PATCH 5/5] Remove isNotEmpty assertion and unneeded force of one messsage --- .../channel/overflow/FailOverflowStrategyOverflowTest.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java index 1af3b968..c4519d4f 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java @@ -45,11 +45,8 @@ public void testOverflow() { await().until(() -> bean.failure() != null); assertThat(bean.failure()).isInstanceOf(Exception.class); - assertThat(bean.output()).isNotEmpty().hasSizeLessThan(999); - //If an exception has not yet been thrown after the failure occurred, try one more message - if (bean.exception() == null) { - bean.emitOne(); - } + assertThat(bean.output()).doesNotContain("999"); + assertThat(bean.output()).hasSizeLessThan(999); assertThat(bean.exception()).isInstanceOf(IllegalStateException.class); }