diff --git a/build.sbt b/build.sbt
index a0e45409f..20d24cc86 100644
--- a/build.sbt
+++ b/build.sbt
@@ -85,6 +85,7 @@ lazy val root = project
       zioKafka,
       zioKafkaTestkit,
       zioKafkaTest,
+      zioKafkaTracing,
       zioKafkaBench,
       zioKafkaExample,
       docs
@@ -161,6 +162,22 @@ lazy val zioKafkaTest =
     ) ++ `embedded-kafka`.value
   )
 
+lazy val zioKafkaTracing =
+  project
+    .in(file("zio-kafka-tracing"))
+    .dependsOn(zioKafka, zioKafkaTestkit)
+    .enablePlugins(BuildInfoPlugin)
+    .settings(stdSettings("zio-kafka-tracing"))
+    .settings(buildInfoSettings("zio.kafka"))
+    .settings(enableZIO(enableStreaming = true))
+    .settings(publish / skip := true)
+    .settings(
+      libraryDependencies ++= Seq(
+        "dev.zio"          %% "zio-opentracing"           % "3.0.0",
+        "io.opentelemetry"  % "opentelemetry-sdk-testing" % "1.43.0" % Test
+      ) ++ `embedded-kafka`.value
+    )
+
 lazy val zioKafkaBench =
   project
     .in(file("zio-kafka-bench"))
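With the new module registered in the aggregate build above, its test suite can be run on its own. Assuming a standard sbt setup, a typical invocation is:

    sbt zioKafkaTracing/test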
diff --git a/zio-kafka-tracing/src/main/scala/zio/kafka/tracing/TracingProducerAspect.scala b/zio-kafka-tracing/src/main/scala/zio/kafka/tracing/TracingProducerAspect.scala
new file mode 100644
index 000000000..676838e36
--- /dev/null
+++ b/zio-kafka-tracing/src/main/scala/zio/kafka/tracing/TracingProducerAspect.scala
@@ -0,0 +1,75 @@
+package zio.kafka.tracing
+
+import io.opentracing.propagation.{ Format, TextMapAdapter }
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
+import zio.kafka.producer._
+import zio.telemetry.opentracing.OpenTracing
+import zio.{ Chunk, RIO, Task, UIO, ZIO }
+
+import java.nio.charset.StandardCharsets
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+object TracingProducerAspect {
+
+  /**
+   * Adds OpenTracing headers to each outgoing record of a ZIO Kafka [[Producer]].
+   *
+   * WARNING: this aspect mutates the headers in the record by adding the tracing headers directly. Be careful NOT to
+   * reuse the records after passing them to the producer.
+   */
+  def traced: ProducerAspect[Nothing, OpenTracing] = new ProducerAspect[Nothing, OpenTracing] {
+    override def apply[R >: Nothing <: OpenTracing](wrapped: ProducerWithEnv[R]): ProducerWithEnv[R] =
+      new ProducerWithEnv[R] with DefaultProducer[R] {
+        // noinspection YieldingZIOEffectInspection
+        override def produceChunkAsyncWithFailures(
+          records: Chunk[ByteRecord]
+        ): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
+          for {
+            recordsWithHeaders <- ZIO.foreach(records)(withTracingHeaders)
+            result             <- wrapped.produceChunkAsyncWithFailures(recordsWithHeaders)
+          } yield result
+
+        // noinspection YieldingZIOEffectInspection
+        override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
+          for {
+            recordWithHeaders <- withTracingHeaders(record)
+            result            <- wrapped.produceAsync(recordWithHeaders)
+          } yield result
+
+        override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] =
+          wrapped.partitionsFor(topic)
+
+        override def flush: RIO[R, Unit] =
+          wrapped.flush
+
+        override def metrics: RIO[R, Map[MetricName, Metric]] =
+          wrapped.metrics
+
+        private def withTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, ByteRecord] =
+          kafkaTracingHeaders(record).map { headers =>
+            headers.foreach(header => record.headers().add(header))
+            record
+          }
+
+        private def kafkaTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, Seq[Header]] =
+          ZIO.serviceWithZIO[OpenTracing] { tracing =>
+            import tracing.aspects._
+            val headers = mutable.Map.empty[String, String]
+            val buffer  = new TextMapAdapter(headers.asJava)
+            tracing
+              .inject(Format.Builtin.HTTP_HEADERS, buffer)
+              .zipLeft(ZIO.unit @@ spanFrom(Format.Builtin.HTTP_HEADERS, buffer, s"produce to topic ${record.topic()}"))
+              .as(headers.toSeq.map(PairHeader))
+          }
+      }
+  }
+
+  private case class PairHeader(keyValue: (String, String)) extends Header {
+    override def key(): String = keyValue._1
+
+    override def value(): Array[Byte] = keyValue._2.getBytes(StandardCharsets.UTF_8)
+  }
+}
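A minimal usage sketch of the new aspect (the broker address and the OpenTracing layer are assumptions, not part of this PR):

    import zio._
    import zio.kafka.producer._
    import zio.kafka.tracing.TracingProducerAspect
    import zio.telemetry.opentracing.OpenTracing

    // Build a producer and attach the tracing aspect with @@. The resulting
    // producer needs OpenTracing in its environment; every produced record
    // gets the current span's context injected into its headers.
    val traced: ZIO[Scope, Throwable, ProducerWithEnv[OpenTracing]] =
      Producer
        .make(ProducerSettings(List("localhost:9092")))
        .map(_ @@ TracingProducerAspect.traced)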
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
index bc77c6469..377109d32 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -12,45 +12,45 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.jdk.CollectionConverters._
 import scala.util.control.{ NoStackTrace, NonFatal }
 
-trait Producer {
+trait Producer extends ProducerWithEnv[Any]
+
+trait ProducerWithEnv[-R] { self =>
 
   /**
    * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version
    * that allows to avoid round-trip-time penalty for each record.
    */
-  def produce(
-    record: ProducerRecord[Array[Byte], Array[Byte]]
-  ): Task[RecordMetadata]
+  def produce(record: ByteRecord): RIO[R, RecordMetadata]
 
   /**
    * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version
    * that allows to avoid round-trip-time penalty for each record.
    */
-  def produce[R, K, V](
+  def produce[R1, K, V](
     record: ProducerRecord[K, V],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, RecordMetadata]
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R & R1, RecordMetadata]
 
   /**
    * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version
    * that allows to avoid round-trip-time penalty for each record.
    */
-  def produce[R, K, V](
+  def produce[R1, K, V](
     topic: String,
     key: K,
     value: V,
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, RecordMetadata]
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R & R1, RecordMetadata]
 
   /**
    * A stream pipeline that produces all records from the stream.
    */
-  final def produceAll[R, K, V](
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): ZPipeline[R, Throwable, ProducerRecord[K, V], RecordMetadata] =
+  final def produceAll[R1, K, V](
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): ZPipeline[R & R1, Throwable, ProducerRecord[K, V], RecordMetadata] =
     ZPipeline.mapChunksZIO(records => produceChunk(records, keySerializer, valueSerializer))
 
   /**
@@ -61,11 +61,9 @@ trait Producer {
    *
    * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records
    * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
-   * throughput. See [[produce[R,K,V](record*]] for version that awaits broker acknowledgement.
+   * throughput. See [[produce[R,K,V](record*]] for a version that awaits broker acknowledgement.
    */
-  def produceAsync(
-    record: ProducerRecord[Array[Byte], Array[Byte]]
-  ): Task[Task[RecordMetadata]]
+  def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]]
 
   /**
    * Produces a single record. The effect returned from this method has two layers and describes the completion of two
@@ -77,11 +75,11 @@ trait Producer {
    * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
    * throughput. See [[produce[R,K,V](record*]] for version that awaits broker acknowledgement.
    */
-  def produceAsync[R, K, V](
+  def produceAsync[R1, K, V](
     record: ProducerRecord[K, V],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, Task[RecordMetadata]]
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R & R1, Task[RecordMetadata]]
 
   /**
    * Produces a single record. The effect returned from this method has two layers and describes the completion of two
@@ -93,13 +91,13 @@ trait Producer {
    * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
    * throughput. See [[produce[R,K,V](topic*]] for version that awaits broker acknowledgement.
    */
-  def produceAsync[R, K, V](
+  def produceAsync[R1, K, V](
     topic: String,
     key: K,
     value: V,
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, Task[RecordMetadata]]
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R & R1, Task[RecordMetadata]]
 
   /**
    * Produces a chunk of records. See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time
@@ -108,9 +106,7 @@ trait Producer {
    * When publishing any of the records fails, the whole batch fails even though some records might have been published.
    * Use [[produceChunkAsyncWithFailures]] to get results per record.
    */
-  def produceChunk(
-    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
-  ): Task[Chunk[RecordMetadata]]
+  def produceChunk(records: Chunk[ByteRecord]): RIO[R, Chunk[RecordMetadata]]
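The two-layer produce pattern described in the scaladocs above, as a short sketch (assumes some `records: Chunk[ByteRecord]` in scope):

    // The outer effect completes when the records are accepted into the send
    // buffer; the inner effect completes when the broker has acknowledged them.
    val sendAndAwait: RIO[Producer, Chunk[RecordMetadata]] =
      for {
        acks     <- Producer.produceChunkAsync(records) // enqueue the batch
        metadata <- acks                                // await all acknowledgements at once
      } yield metadata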
 
   /**
    * Produces a chunk of records. See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time
@@ -119,11 +115,11 @@ trait Producer {
    * When publishing any of the records fails, the whole batch fails even though some records might have been published.
    * Use [[produceChunkAsyncWithFailures]] to get results per record.
    */
-  def produceChunk[R, K, V](
+  def produceChunk[R1, K, V](
     records: Chunk[ProducerRecord[K, V]],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, Chunk[RecordMetadata]]
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R & R1, Chunk[RecordMetadata]]
 
   /**
    * Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
@@ -138,9 +134,7 @@ trait Producer {
    * When publishing any of the records fails, the whole batch fails even though some records might have been published.
    * Use [[produceChunkAsyncWithFailures]] to get results per record.
    */
-  def produceChunkAsync(
-    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
-  ): Task[Task[Chunk[RecordMetadata]]]
+  def produceChunkAsync(records: Chunk[ByteRecord]): RIO[R, Task[Chunk[RecordMetadata]]]
 
   /**
    * Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
@@ -155,11 +149,11 @@ trait Producer {
    * When publishing any of the records fails, the whole batch fails even though some records might have been published.
    * Use [[produceChunkAsyncWithFailures]] to get results per record.
    */
-  def produceChunkAsync[R, K, V](
+  def produceChunkAsync[R1, K, V](
     records: Chunk[ProducerRecord[K, V]],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, Task[Chunk[RecordMetadata]]]
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R & R1, Task[Chunk[RecordMetadata]]]
 
   /**
    * Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
@@ -180,25 +174,32 @@ trait Producer {
    * This variant does not accept serializers as they may also fail independently of each record and this is not
    * reflected in the return type.
    */
-  def produceChunkAsyncWithFailures(
-    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
-  ): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]]
+  def produceChunkAsyncWithFailures(records: Chunk[ByteRecord]): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]]
 
   /**
    * Get the partition metadata for the given topic
    */
-  def partitionsFor(topic: String): Task[Chunk[PartitionInfo]]
+  def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]]
 
   /**
    * Flushes the producer's internal buffer. This will guarantee that all records currently buffered will be transmitted
    * to the broker.
    */
-  def flush: Task[Unit]
+  def flush: RIO[R, Unit]
 
   /**
    * Expose internal producer metrics
    */
-  def metrics: Task[Map[MetricName, Metric]]
+  def metrics: RIO[R, Map[MetricName, Metric]]
+
+  /**
+   * Attaches an aspect to this producer, wrapping it so that it can be augmented.
+   *
+   * @param aspect
+   *   The aspect that will augment this producer
+   */
+  final def @@[LowerR <: UpperR, UpperR <: R](aspect: ProducerAspect[LowerR, UpperR]): ProducerWithEnv[UpperR] =
+    aspect(self)
 }
 
 object Producer {
@@ -206,7 +207,10 @@
     extends RuntimeException("Publish omitted due to a publish error for a previous record in the chunk")
     with NoStackTrace
 
-  val live: RLayer[ProducerSettings, Producer] =
+  /**
+   * Makes a producer, taking the ProducerSettings from the environment.
+   */
+  def live: RLayer[ProducerSettings, Producer] =
     ZLayer.scoped {
       for {
         settings <- ZIO.service[ProducerSettings]
         producer <- make(settings)
       } yield producer
     }
 
@@ -214,6 +218,9 @@ object Producer {
+  /**
+   * Makes a producer with the given ProducerSettings.
+   */
   def make(settings: ProducerSettings): ZIO[Scope, Throwable, Producer] =
     for {
       _ <- SslHelper.validateEndpoint(settings.driverSettings)
@@ -251,9 +258,7 @@ object Producer {
   /**
    * Accessor method
    */
-  def produce(
-    record: ProducerRecord[Array[Byte], Array[Byte]]
-  ): RIO[Producer, RecordMetadata] =
+  def produce(record: ByteRecord): RIO[Producer, RecordMetadata] =
     ZIO.serviceWithZIO[Producer](_.produce(record))
 
   /**
@@ -263,7 +268,7 @@ object Producer {
     record: ProducerRecord[K, V],
     keySerializer: Serializer[R, K],
     valueSerializer: Serializer[R, V]
-  ): RIO[R & Producer, RecordMetadata] =
+  ): RIO[Producer & R, RecordMetadata] =
     ZIO.serviceWithZIO[Producer](_.produce(record, keySerializer, valueSerializer))
 
   /**
@@ -275,7 +280,7 @@ object Producer {
     value: V,
     keySerializer: Serializer[R, K],
     valueSerializer: Serializer[R, V]
-  ): RIO[R & Producer, RecordMetadata] =
+  ): RIO[Producer & R, RecordMetadata] =
     ZIO.serviceWithZIO[Producer](_.produce(topic, key, value, keySerializer, valueSerializer))
 
   /**
@@ -290,9 +295,7 @@ object Producer {
   /**
    * Accessor method
    */
-  def produceAsync(
-    record: ProducerRecord[Array[Byte], Array[Byte]]
-  ): RIO[Producer, Task[RecordMetadata]] =
+  def produceAsync(record: ByteRecord): RIO[Producer, Task[RecordMetadata]] =
     ZIO.serviceWithZIO[Producer](_.produceAsync(record))
 
   /**
@@ -302,7 +305,7 @@ object Producer {
     record: ProducerRecord[K, V],
     keySerializer: Serializer[R, K],
     valueSerializer: Serializer[R, V]
-  ): RIO[R & Producer, Task[RecordMetadata]] =
+  ): RIO[Producer & R, Task[RecordMetadata]] =
     ZIO.serviceWithZIO[Producer](_.produceAsync(record, keySerializer, valueSerializer))
 
   /**
@@ -314,15 +317,13 @@ object Producer {
     value: V,
     keySerializer: Serializer[R, K],
     valueSerializer: Serializer[R, V]
-  ): RIO[R & Producer, Task[RecordMetadata]] =
+  ): RIO[Producer & R, Task[RecordMetadata]] =
     ZIO.serviceWithZIO[Producer](_.produceAsync(topic, key, value, keySerializer, valueSerializer))
 
   /**
    * Accessor method
    */
-  def produceChunkAsync(
-    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
-  ): RIO[Producer, Task[Chunk[RecordMetadata]]] =
+  def produceChunkAsync(records: Chunk[ByteRecord]): RIO[Producer, Task[Chunk[RecordMetadata]]] =
     ZIO.serviceWithZIO[Producer](_.produceChunkAsync(records))
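A wiring sketch for these accessors (the broker address and `record` are assumptions):

    val producerLayer: ZLayer[Any, Throwable, Producer] =
      ZLayer.succeed(ProducerSettings(List("localhost:9092"))) >>> Producer.live

    // Accessor methods resolve the producer from the environment.
    val send: Task[RecordMetadata] =
      Producer.produce(record).provideLayer(producerLayer) // record: ByteRecord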
 
   /**
@@ -332,23 +333,21 @@ object Producer {
     records: Chunk[ProducerRecord[K, V]],
     keySerializer: Serializer[R, K],
     valueSerializer: Serializer[R, V]
-  ): RIO[R & Producer, Task[Chunk[RecordMetadata]]] =
+  ): RIO[Producer & R, Task[Chunk[RecordMetadata]]] =
     ZIO.serviceWithZIO[Producer](_.produceChunkAsync(records, keySerializer, valueSerializer))
 
   /**
    * Accessor method for [[Producer.produceChunkAsyncWithFailures]]]
    */
   def produceChunkAsyncWithFailures(
-    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
+    records: Chunk[ByteRecord]
   ): RIO[Producer, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
     ZIO.serviceWithZIO[Producer](_.produceChunkAsyncWithFailures(records))
 
   /**
    * Accessor method
    */
-  def produceChunk(
-    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
-  ): RIO[Producer, Chunk[RecordMetadata]] =
+  def produceChunk(records: Chunk[ByteRecord]): RIO[Producer, Chunk[RecordMetadata]] =
    ZIO.serviceWithZIO[Producer](_.produceChunk(records))
 
   /**
@@ -358,7 +357,7 @@ object Producer {
     records: Chunk[ProducerRecord[K, V]],
     keySerializer: Serializer[R, K],
     valueSerializer: Serializer[R, V]
-  ): RIO[R & Producer, Chunk[RecordMetadata]] =
+  ): RIO[Producer & R, Chunk[RecordMetadata]] =
     ZIO.serviceWithZIO[Producer](_.produceChunk(records, keySerializer, valueSerializer))
 
   /**
@@ -370,77 +369,75 @@ object Producer {
   /**
    * Accessor method
    */
-  val flush: RIO[Producer, Unit] = ZIO.serviceWithZIO(_.flush)
+  val flush: RIO[Producer, Unit] =
+    ZIO.serviceWithZIO(_.flush)
 
   /**
    * Accessor method
    */
-  val metrics: RIO[Producer, Map[MetricName, Metric]] = ZIO.serviceWithZIO(_.metrics)
+  val metrics: RIO[Producer, Map[MetricName, Metric]] =
+    ZIO.serviceWithZIO(_.metrics)
 }
 
-private[producer] final class ProducerLive(
-  private[producer] val p: JProducer[Array[Byte], Array[Byte]],
-  runtime: Runtime[Any],
-  sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
-) extends Producer {
+/**
+ * Implements all the `produce` variants by delegating to one of these methods:
+ *
+ *   - `produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]]`
+ *   - `produceChunkAsyncWithFailures(records: Chunk[ByteRecord]): RIO[R, UIO[Chunk[Either[Throwable,
+ *     RecordMetadata]]]]`
+ */
+trait DefaultProducer[R] extends ProducerWithEnv[R] {
 
-  override def produce(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[RecordMetadata] =
+  override def produce(record: ByteRecord): RIO[R, RecordMetadata] =
     produceAsync(record).flatten
 
-  override def produce[R, K, V](
+  override def produce[R1, K, V](
     record: ProducerRecord[K, V],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, RecordMetadata] =
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R1 & R, RecordMetadata] =
     produceAsync(record, keySerializer, valueSerializer).flatten
 
-  override def produce[R, K, V](
+  override def produce[R1, K, V](
     topic: String,
     key: K,
     value: V,
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, RecordMetadata] =
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R1 & R, RecordMetadata] =
     produce(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)
 
-  // noinspection YieldingZIOEffectInspection
-  override def produceAsync(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[Task[RecordMetadata]] =
-    for {
-      done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
-      _    <- sendQueue.offer((Chunk.single(record), done))
-    } yield done.await.flatMap(result => ZIO.fromEither(result.head))
-
-  override def produceAsync[R, K, V](
+  override def produceAsync[R1, K, V](
     record: ProducerRecord[K, V],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, Task[RecordMetadata]] =
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R1 & R, Task[RecordMetadata]] =
     serialize(record, keySerializer, valueSerializer).flatMap(produceAsync)
 
-  override def produceAsync[R, K, V](
+  override def produceAsync[R1, K, V](
     topic: String,
     key: K,
     value: V,
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, Task[RecordMetadata]] =
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R1 & R, Task[RecordMetadata]] =
     produceAsync(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)
 
-  override def produceChunk(records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]): Task[Chunk[RecordMetadata]] =
+  override def produceChunk(records: Chunk[ByteRecord]): RIO[R, Chunk[RecordMetadata]] =
     produceChunkAsync(records).flatten
 
-  override def produceChunk[R, K, V](
+  override def produceChunk[R1, K, V](
     records: Chunk[ProducerRecord[K, V]],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, Chunk[RecordMetadata]] =
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R1 & R, Chunk[RecordMetadata]] =
     produceChunkAsync(records, keySerializer, valueSerializer).flatten
 
   // noinspection YieldingZIOEffectInspection
   override def produceChunkAsync(
-    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
-  ): Task[Task[Chunk[RecordMetadata]]] =
+    records: Chunk[ByteRecord]
+  ): RIO[R, Task[Chunk[RecordMetadata]]] =
     produceChunkAsyncWithFailures(records).map(_.flatMap { chunkResults =>
       val (errors, success) = chunkResults.partitionMap(identity)
       errors.headOption match {
@@ -449,15 +446,41 @@ private[producer] final class ProducerLive(
       }
     })
 
-  override def produceChunkAsync[R, K, V](
+  override def produceChunkAsync[R1, K, V](
     records: Chunk[ProducerRecord[K, V]],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, Task[Chunk[RecordMetadata]]] =
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R1 & R, Task[Chunk[RecordMetadata]]] =
     ZIO
       .foreach(records)(serialize(_, keySerializer, valueSerializer))
       .flatMap(produceChunkAsync)
 
+  private def serialize[R1, K, V](
+    r: ProducerRecord[K, V],
+    keySerializer: Serializer[R1, K],
+    valueSerializer: Serializer[R1, V]
+  ): RIO[R1, ByteRecord] =
+    for {
+      key   <- keySerializer.serialize(r.topic, r.headers, r.key())
+      value <- valueSerializer.serialize(r.topic, r.headers, r.value())
+    } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)
+
+}
+
+private[producer] final class ProducerLive(
+  private[producer] val p: JProducer[Array[Byte], Array[Byte]],
+  runtime: Runtime[Any],
+  sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
+) extends Producer
+    with DefaultProducer[Any] {
+
+  // noinspection YieldingZIOEffectInspection
+  override def produceAsync(record: ByteRecord): Task[Task[RecordMetadata]] =
+    for {
+      done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
+      _    <- sendQueue.offer((Chunk.single(record), done))
+    } yield done.await.flatMap(result => ZIO.fromEither(result.head))
+
   // noinspection YieldingZIOEffectInspection
   override def produceChunkAsyncWithFailures(
     records: Chunk[ByteRecord]
@@ -541,13 +564,4 @@ private[producer] final class ProducerLive(
       }
       .runDrain
 
-  private def serialize[R, K, V](
-    r: ProducerRecord[K, V],
-    keySerializer: Serializer[R, K],
-    valueSerializer: Serializer[R, V]
-  ): RIO[R, ByteRecord] =
-    for {
-      key   <- keySerializer.serialize(r.topic, r.headers, r.key())
-      value <- valueSerializer.serialize(r.topic, r.headers, r.value())
-    } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)
 }
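A sketch of what `DefaultProducer` buys an implementor (the wrapper below is hypothetical and not part of this PR; imports as in Producer.scala): only `produceAsync`, `produceChunkAsyncWithFailures` and the three pass-through members have to be written, all other `produce` variants are derived.

    // Hypothetical wrapper that counts produced records before delegating.
    final class CountingProducer[R](wrapped: ProducerWithEnv[R], counter: Ref[Long])
        extends ProducerWithEnv[R]
        with DefaultProducer[R] {

      override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
        counter.update(_ + 1) *> wrapped.produceAsync(record)

      override def produceChunkAsyncWithFailures(
        records: Chunk[ByteRecord]
      ): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
        counter.update(_ + records.size) *> wrapped.produceChunkAsyncWithFailures(records)

      override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] = wrapped.partitionsFor(topic)
      override def flush: RIO[R, Unit]                                         = wrapped.flush
      override def metrics: RIO[R, Map[MetricName, Metric]]                    = wrapped.metrics
    }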
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerAspect.scala b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerAspect.scala
new file mode 100644
index 000000000..309b95f60
--- /dev/null
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerAspect.scala
@@ -0,0 +1,18 @@
+package zio.kafka.producer
+
+/**
+ * A `ProducerAspect` transforms a Producer into another one, augmenting it with new capabilities or features.
+ */
+trait ProducerAspect[+LowerR, -UpperR] { self =>
+
+  def apply[R >: LowerR <: UpperR](wrapped: ProducerWithEnv[R]): ProducerWithEnv[R]
+
+  def @@[LowerR1 >: LowerR, UpperR1 <: UpperR](
+    other: ProducerAspect[LowerR1, UpperR1]
+  ): ProducerAspect[LowerR1, UpperR1] =
+    new ProducerAspect[LowerR1, UpperR1] {
+      override def apply[R >: LowerR1 <: UpperR1](wrapped: ProducerWithEnv[R]): ProducerWithEnv[R] =
+        other(self(wrapped))
+    }
+}
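A short composition sketch, assuming a `producer: Producer` and a second, hypothetical aspect `metricsAspect: ProducerAspect[Nothing, OpenTracing]`:

    // Aspects chain left to right: `traced` wraps the producer first,
    // `metricsAspect` then wraps the result (per `other(self(wrapped))` above).
    val oneByOne: ProducerWithEnv[OpenTracing] =
      producer @@ TracingProducerAspect.traced @@ metricsAspect

    // Equivalently, compose the aspects first and attach them in one step.
    val composed: ProducerAspect[Nothing, OpenTracing] =
      TracingProducerAspect.traced @@ metricsAspect
    val inOneStep: ProducerWithEnv[OpenTracing] =
      producer @@ composed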