Skip to content

Commit

Permalink
use Chunk.toNel with NonEmptyTraverse
Browse files Browse the repository at this point in the history
  • Loading branch information
zcox committed Dec 28, 2023
1 parent 590ffc2 commit 3253892
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.banno.kafka.producer

import cats.*
import cats.data.NonEmptyList
import cats.syntax.all.*
import fs2.*
import org.apache.kafka.common.*
Expand Down Expand Up @@ -115,6 +116,14 @@ case class ProducerOps[F[_], K, V](producer: ProducerApi[F, K, V]) {
s =>
pipeSendBatch[Chunk](Traverse[Chunk], F)(s.chunks).flatMap(Stream.chunk)

def pipeSendBatchChunksNonEmpty(implicit
F: FlatMap[F]
): Pipe[F, ProducerRecord[K, V], RecordMetadata] =
s =>
pipeSendBatchNonEmpty[NonEmptyList](NonEmptyTraverse[NonEmptyList], F)(
s.chunks.map(_.toNel).unNone
).flatMap(nel => Stream.emits(nel.toList))

/** Calls chunkN on the input stream, to create chunks of size `n`, and sends
* those chunks as batches to the producer.
*/
Expand Down

0 comments on commit 3253892

Please sign in to comment.