diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java index b126b45be12..387e0ee5a72 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java @@ -35,16 +35,11 @@ public abstract class SharedTopicListener implements TopicListener { @Override public Flux listen(TopicMessageFilter filter) { Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); - Flux overflowProcessor = sink.asFlux(); - // moving publishOn from after onBackpressureBuffer to after Flux.merge reduces CPU usage by up to 40% - Flux topicMessageFlux = getSharedListener(filter) + return getSharedListener(filter) .doOnSubscribe(s -> log.info("Subscribing: {}", filter)) - .onBackpressureBuffer( - listenerProperties.getMaxBufferSize(), t -> sink.tryEmitError(Exceptions.failWithOverflow())) - .doFinally(s -> sink.tryEmitComplete()); - - return Flux.merge(listenerProperties.getPrefetch(), topicMessageFlux, overflowProcessor) + .onBackpressureBuffer(listenerProperties.getMaxBufferSize(), BufferOverflowStrategy.ERROR) + .doFinally(s -> sink.tryEmitComplete()) .publishOn(Schedulers.boundedElastic(), false, listenerProperties.getPrefetch()); } diff --git a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java index 2f6823b94bb..499326f66fe 100644 --- a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java +++ b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java @@ -80,7 +80,7 @@ void slowSubscriberOverflowException() { .expectNext(1L, 2L) .thenAwait(Duration.ofMillis(500L)) // stall to overrun backpressure buffer .thenRequest(Long.MAX_VALUE) - .thenConsumeWhile(n -> n < numMessages) + .expectNextCount(maxBufferSize) .expectErrorMatches(Exceptions::isOverflow) .verify(Duration.ofMillis(1000L));