From 722cb6c370b80ea1836e77521b08bd239341bf54 Mon Sep 17 00:00:00 2001 From: "Saai Syvendra (Github key)" Date: Thu, 31 Oct 2024 09:33:47 +0530 Subject: [PATCH] Replace use of deprecated DirectProcessor Signed-off-by: Saai Syvendra (Github key) --- .../mirror/grpc/listener/SharedTopicListener.java | 11 +++-------- .../listener/AbstractSharedTopicListenerTest.java | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java index b126b45be12..387e0ee5a72 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java @@ -35,16 +35,11 @@ public abstract class SharedTopicListener implements TopicListener { @Override public Flux listen(TopicMessageFilter filter) { Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); - Flux overflowProcessor = sink.asFlux(); - // moving publishOn from after onBackpressureBuffer to after Flux.merge reduces CPU usage by up to 40% - Flux topicMessageFlux = getSharedListener(filter) + return getSharedListener(filter) .doOnSubscribe(s -> log.info("Subscribing: {}", filter)) - .onBackpressureBuffer( - listenerProperties.getMaxBufferSize(), t -> sink.tryEmitError(Exceptions.failWithOverflow())) - .doFinally(s -> sink.tryEmitComplete()); - - return Flux.merge(listenerProperties.getPrefetch(), topicMessageFlux, overflowProcessor) + .onBackpressureBuffer(listenerProperties.getMaxBufferSize(), BufferOverflowStrategy.ERROR) + .doFinally(s -> sink.tryEmitComplete()) .publishOn(Schedulers.boundedElastic(), false, listenerProperties.getPrefetch()); } diff --git a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java index 2f6823b94bb..499326f66fe 100644 --- a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java +++ b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java @@ -80,7 +80,7 @@ void slowSubscriberOverflowException() { .expectNext(1L, 2L) .thenAwait(Duration.ofMillis(500L)) // stall to overrun backpressure buffer .thenRequest(Long.MAX_VALUE) - .thenConsumeWhile(n -> n < numMessages) + .expectNextCount(maxBufferSize) .expectErrorMatches(Exceptions::isOverflow) .verify(Duration.ofMillis(1000L));