diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java index da04eea2..592e60f0 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/BeanWithFailOverflowStrategy.java @@ -68,12 +68,9 @@ public Exception exception() { return callerException; } - public void emitThree() { + public void emitOne() { try { - emitter.send("1"); - emitter.send("2"); - emitter.send("3"); - emitter.complete(); + emitter.send("1000"); } catch (Exception e) { callerException = e; } @@ -95,10 +92,14 @@ public void emitALotOfItems() { @Outgoing("out") public PublisherBuilder consume(final PublisherBuilder values) { return values - .via(ReactiveStreams.builder() - .flatMapCompletionStage(s -> CompletableFuture.supplyAsync(() -> s, executor))) - .onError(err -> downstreamFailure = err); - + .via(ReactiveStreams.builder().flatMapCompletionStage(s -> CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + return s; + }, executor))).onError(err -> downstreamFailure = err); } @Incoming("out") diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java index 5bf6590a..c4519d4f 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/channel/overflow/FailOverflowStrategyOverflowTest.java @@ -43,9 +43,11 @@ public static Archive deployment() { public void testOverflow() { bean.emitALotOfItems(); - await().until(() -> bean.exception() != null); + await().until(() -> bean.failure() != null); + assertThat(bean.failure()).isInstanceOf(Exception.class); assertThat(bean.output()).doesNotContain("999"); - assertThat(bean.output()).hasSizeBetween(0, 256); - assertThat(bean.failure()).isNotNull().isInstanceOf(Exception.class); + assertThat(bean.output()).hasSizeLessThan(999); + assertThat(bean.exception()).isInstanceOf(IllegalStateException.class); + } }