From 79641635dd7dcdbad4300c18ad93b5dadbd25a16 Mon Sep 17 00:00:00 2001 From: faustin0 Date: Thu, 21 Oct 2021 09:22:33 +0200 Subject: [PATCH] move `.repeat` before `.parJoin` to avoid waiting for all stream completion before starting a new one --- core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala | 9 ++------- .../main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala | 7 ++----- core/src/main/scala/jms4s/JmsTransactedConsumer.scala | 7 ++----- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala b/core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala index f91a6a59..e13838f3 100644 --- a/core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala +++ b/core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala @@ -65,14 +65,9 @@ object JmsAcknowledgerConsumer { ): JmsAcknowledgerConsumer[F] = (f: (JmsMessage, MessageFactory[F]) => F[AckAction[F]]) => { Stream - .emits(0 until concurrencyLevel) - .as( - Stream.eval( - pool.receive(f) - ) - ) - .parJoin(concurrencyLevel) + .emit(Stream.eval(pool.receive(f))) .repeat + .parJoin(concurrencyLevel) .compile .drain } diff --git a/core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala b/core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala index d5d754e1..05e70e63 100644 --- a/core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala +++ b/core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala @@ -66,12 +66,9 @@ object JmsAutoAcknowledgerConsumer { ): JmsAutoAcknowledgerConsumer[F] = (f: (JmsMessage, MessageFactory[F]) => F[AutoAckAction[F]]) => Stream - .emits(0 until concurrencyLevel) - .as( - Stream.eval(pool.receive(f)) - ) - .parJoin(concurrencyLevel) + .emit(Stream.eval(pool.receive(f))) .repeat + .parJoin(concurrencyLevel) .compile .drain diff --git a/core/src/main/scala/jms4s/JmsTransactedConsumer.scala b/core/src/main/scala/jms4s/JmsTransactedConsumer.scala index 022f1bba..41183b53 100644 --- a/core/src/main/scala/jms4s/JmsTransactedConsumer.scala +++ b/core/src/main/scala/jms4s/JmsTransactedConsumer.scala @@ -66,12 +66,9 @@ object JmsTransactedConsumer { ): JmsTransactedConsumer[F] = (f: (JmsMessage, MessageFactory[F]) => F[TransactionAction[F]]) => Stream - .emits(0 until concurrencyLevel) - .as( - Stream.eval(pool.receive(f)) - ) - .parJoin(concurrencyLevel) + .emit(Stream.eval(pool.receive(f))) .repeat + .parJoin(concurrencyLevel) .compile .drain