Skip to content

Commit

Permalink
Merge pull request #257 from fp-in-bo/more-stream-concurrency
Browse files Browse the repository at this point in the history
Improve stream parjoin
  • Loading branch information
faustin0 authored Dec 21, 2021
2 parents a4d1ad9 + 7964163 commit fd324b2
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 17 deletions.
9 changes: 2 additions & 7 deletions core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,9 @@ object JmsAcknowledgerConsumer {
): JmsAcknowledgerConsumer[F] =
(f: (JmsMessage, MessageFactory[F]) => F[AckAction[F]]) => {
Stream
.emits(0 until concurrencyLevel)
.as(
Stream.eval(
pool.receive(f)
)
)
.parJoin(concurrencyLevel)
.emit(Stream.eval(pool.receive(f)))
.repeat
.parJoin(concurrencyLevel)
.compile
.drain
}
Expand Down
7 changes: 2 additions & 5 deletions core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,9 @@ object JmsAutoAcknowledgerConsumer {
): JmsAutoAcknowledgerConsumer[F] =
(f: (JmsMessage, MessageFactory[F]) => F[AutoAckAction[F]]) =>
Stream
.emits(0 until concurrencyLevel)
.as(
Stream.eval(pool.receive(f))
)
.parJoin(concurrencyLevel)
.emit(Stream.eval(pool.receive(f)))
.repeat
.parJoin(concurrencyLevel)
.compile
.drain

Expand Down
7 changes: 2 additions & 5 deletions core/src/main/scala/jms4s/JmsTransactedConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,9 @@ object JmsTransactedConsumer {
): JmsTransactedConsumer[F] =
(f: (JmsMessage, MessageFactory[F]) => F[TransactionAction[F]]) =>
Stream
.emits(0 until concurrencyLevel)
.as(
Stream.eval(pool.receive(f))
)
.parJoin(concurrencyLevel)
.emit(Stream.eval(pool.receive(f)))
.repeat
.parJoin(concurrencyLevel)
.compile
.drain

Expand Down

0 comments on commit fd324b2

Please sign in to comment.