Skip to content

Commit

Permalink
Add Open Tracing support (WIP)
Browse files Browse the repository at this point in the history
Also:
- producer aspects
- use ByteRecord alias

TODO:
- all code to be verified by OT expert
- add support in consumer
- add tests
- consider not mutating record headers in `TracingProducerAspect.traced`
  • Loading branch information
erikvanoosten committed Oct 19, 2024
1 parent 66e57a0 commit d350c65
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 119 deletions.
17 changes: 17 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ lazy val root = project
zioKafka,
zioKafkaTestkit,
zioKafkaTest,
zioKafkaTracing,
zioKafkaBench,
zioKafkaExample,
docs
Expand Down Expand Up @@ -161,6 +162,22 @@ lazy val zioKafkaTest =
) ++ `embedded-kafka`.value
)

lazy val zioKafkaTracing =
project
.in(file("zio-kafka-tracing"))
.dependsOn(zioKafka, zioKafkaTestkit)
.enablePlugins(BuildInfoPlugin)
.settings(stdSettings("zio-kafka-tracing"))
.settings(buildInfoSettings("zio.kafka"))
.settings(enableZIO(enableStreaming = true))
.settings(publish / skip := true)
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio-opentracing" % "3.0.0",
"io.opentelemetry" % "opentelemetry-sdk-testing" % "1.43.0" % Test
) ++ `embedded-kafka`.value
)

lazy val zioKafkaBench =
project
.in(file("zio-kafka-bench"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package zio.kafka.tracing

import io.opentracing.propagation.{ Format, TextMapAdapter }
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
import zio.kafka.producer._
import zio.telemetry.opentracing.OpenTracing
import zio.{ Chunk, RIO, Task, UIO, ZIO }

import java.nio.charset.StandardCharsets
import scala.collection.mutable
import scala.jdk.CollectionConverters._

object TracingProducerAspect {

/**
* Adds open tracing headers to each outgoing record of a ZIO Kafka [[Producer]].
*
* WARNING: this aspect mutates the headers in the record by adding the tracing headers directly. Be careful NOT to
* reuse the records after passing the records to the producer.
*/
def traced: ProducerAspect[Nothing, OpenTracing] = new ProducerAspect[Nothing, OpenTracing] {
override def apply[R >: Nothing <: OpenTracing](wrapped: ProducerWithEnv[R]): ProducerWithEnv[R] =
new ProducerWithEnv[R] with DefaultProducer[R] {
// noinspection YieldingZIOEffectInspection
override def produceChunkAsyncWithFailures(
records: Chunk[ByteRecord]
): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
for {
recordsWithHeaders <- ZIO.foreach(records)(withTracingHeaders)
result <- wrapped.produceChunkAsyncWithFailures(recordsWithHeaders)
} yield result

// noinspection YieldingZIOEffectInspection
override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
for {
recordWithHeaders <- withTracingHeaders(record)
result <- wrapped.produceAsync(recordWithHeaders)
} yield result

override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] =
wrapped.partitionsFor(topic)

override def flush: RIO[R, Unit] =
wrapped.flush

override def metrics: RIO[R, Map[MetricName, Metric]] =
wrapped.metrics

private def withTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, ByteRecord] =
kafkaTracingHeaders(record).map { headers =>
headers.foreach(header => record.headers().add(header))
record
}

private def kafkaTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, Seq[Header]] =
ZIO.serviceWithZIO[OpenTracing] { tracing =>
import tracing.aspects._
val headers = mutable.Map.empty[String, String]
val buffer = new TextMapAdapter(headers.asJava)
tracing
.inject(Format.Builtin.HTTP_HEADERS, buffer)
.zipLeft(ZIO.unit @@ spanFrom(Format.Builtin.HTTP_HEADERS, buffer, s"produce to topic ${record.topic()}"))
.as(headers.toSeq.map(PairHeader))
}
}
}

private case class PairHeader(keyValue: (String, String)) extends Header {
override def key(): String = keyValue._1

override def value(): Array[Byte] = keyValue._2.getBytes(StandardCharsets.UTF_8)
}
}
Loading

0 comments on commit d350c65

Please sign in to comment.