Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract Java Kafka client metrics via Producer/ConsumerMetrics #430

Merged
merged 9 commits into from
Jul 15, 2024
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,29 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
}
```

## Java client metrics example

The example below demonstrates creation of `Consumer`, but same can be done for `Producer` as well.

> :warning: using `ConsumerMetricsOf.withJavaClientMetrics` (or its alternative `metrics.exposeJavaClientMetrics`)
> registers new Prometheus collector under the hood. Please use unique prefixes for each collector
> to avoid duplicated metrics in Prometheus (i.e. runtime exception on registration).
> Prefix can be set as parameter in: `ConsumerMetricsOf.withJavaClientMetrics(prometheus, Some("the_prefix"))`

```scala
import ConsumerMetricsOf.*

val config: ConsumerConfig = ???
val prometheus: CollectorRegistry = ???
val metrics: ConsumerMetrics[IO] = ???

for {
metrics <- metrics.exposeJavaClientMetrics(prometheus)
consumerOf = ConsumerOf.apply1(metrics1.some)
consumer <- consumerOf(config)
} yield ???
```

## Setup

```scala
Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ lazy val commonSettings = Seq(
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.evolutiongaming.skafka.consumer.Consumer.subscribe"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.skafka.Converters#MapJOps.asScalaMap$extension"
)
),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.skafka.consumer.ConsumerMetrics#ConsumerMetricsOps.mapK$extension"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.skafka.producer.ProducerMetrics#ProducerMetricsOps.mapK$extension"),
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.evolutiongaming.skafka.consumer

import cats.effect.{Resource, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.{Topic, TopicPartition}
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import io.prometheus.client.CollectorRegistry

import scala.concurrent.duration.FiniteDuration

object ConsumerMetricsOf {

/**
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
* @param source original [[ConsumerMetrics]]
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
source: ConsumerMetrics[F],
prometheus: CollectorRegistry,
prefix: Option[String],
): Resource[F, ConsumerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
source.call(name, topic, latency, success)

override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] =
source.poll(topic, bytes, records, age)

override def count(name: String, topic: Topic): F[Unit] =
source.count(name, topic)

override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] =
source.rebalance(name, topicPartition)

override def topics(latency: FiniteDuration): F[Unit] =
source.topics(latency)

override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] =
registry.register(consumer.clientMetrics)

}

implicit final class ConsumerMetricsOps[F[_]](val source: ConsumerMetrics[F]) extends AnyVal {

/**
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def exposeJavaClientMetrics(
prometheus: CollectorRegistry,
prefix: Option[String] = None,
)(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ConsumerMetrics[F]] =
withJavaClientMetrics(source, prometheus, prefix)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.evolutiongaming.skafka.metrics

import cats.syntax.all._
import cats.effect.syntax.resource._
import cats.effect.{Resource, Ref, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.ClientMetric
import io.prometheus.client.CollectorRegistry

import java.util.UUID

/**
* Allows reporting metrics of multiple Kafka clients inside a single VM.
*
* Example:
* {{{
* val prometheus: CollectorRegistry = ???
* val consumerOf: ConsumerOf[F] = ???
* val producerOf: ProducerOf[F] = ???
*
* for {
* registry <- KafkaMetricsRegistry.of(prometheus)
*
* consumer <- consumerOf(consumerConfig)
* _ <- registry.register(consumer.clientMetrics)
*
* producer <- producerOf(producerConfig)
* _ <- registry.register(producer.clientMetrics)
* } yield ()
* }}}
*/
trait KafkaMetricsRegistry[F[_]] {

/**
* Register a function to obtain a list of client metrics.
* Normally, you would pass
* [[com.evolutiongaming.skafka.consumer.Consumer#clientMetrics]] or
* [[com.evolutiongaming.skafka.producer.Producer#clientMetrics]]
*/
def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit]
}

object KafkaMetricsRegistry {

def of[F[_]: Sync: ToTry: UUIDGen](
prometheus: CollectorRegistry,
prefix: Option[String] = None,
): Resource[F, KafkaMetricsRegistry[F]] =
for {
sources <- Ref[F].of(Map.empty[UUID, F[Seq[ClientMetric[F]]]]).toResource

metrics = sources
.get
.flatMap { sources =>
sources
.toList
.flatTraverse {
case (uuid, metrics) =>
metrics.map { metrics =>
metrics.toList.map { metric => metric.copy(tags = metric.tags + ("uuid" -> uuid.toString)) }
}
}
}
.widen[Seq[ClientMetric[F]]]

collector = new KafkaMetricsCollector[F](metrics, prefix)
allocate = Sync[F].delay { prometheus.register(collector) }
release = Sync[F].delay { prometheus.unregister(collector) }

_ <- Resource.make(allocate)(_ => release)
} yield new KafkaMetricsRegistry[F] {

def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] =
for {
uuid <- UUIDGen[F].randomUUID.toResource

allocate = sources.update { sources => sources.updated(uuid, metrics) }
release = sources.update { sources => sources - uuid }

_ <- Resource.make(allocate)(_ => release)
} yield {}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.evolutiongaming.skafka.producer

import cats.effect.{Resource, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.Topic
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import io.prometheus.client.CollectorRegistry

import scala.concurrent.duration.FiniteDuration

object ProducerMetricsOf {

/**
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param source original [[ProducerMetrics]]
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
source: ProducerMetrics[F],
prometheus: CollectorRegistry,
prefix: Option[String],
): Resource[F, ProducerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ProducerMetrics[F] {
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)

override def beginTransaction: F[Unit] = source.beginTransaction

override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] = source.sendOffsetsToTransaction(latency)

override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency)

override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency)

override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] = source.send(topic, latency, bytes)

override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency)

override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency)

override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency)

override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency)

override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] =
registry.register(producer.clientMetrics)

}

implicit final class ProducerMetricsOps[F[_]](val source: ProducerMetrics[F]) extends AnyVal {

/**
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def exposeJavaClientMetrics(
prometheus: CollectorRegistry,
prefix: Option[String],
)(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ProducerMetrics[F]] =
withJavaClientMetrics(source, prometheus, prefix)

}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.evolutiongaming.skafka.consumer

import cats.effect.Resource
import cats.effect.{MonadCancel, Resource}
import cats.implicits._
import cats.{Applicative, Monad, ~>}
import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition}
import com.evolutiongaming.smetrics.MetricsHelper._
import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantiles}

import scala.concurrent.duration.FiniteDuration
import scala.annotation.nowarn

trait ConsumerMetrics[F[_]] {

Expand All @@ -20,6 +21,9 @@ trait ConsumerMetrics[F[_]] {
def rebalance(name: String, topicPartition: TopicPartition): F[Unit]

def topics(latency: FiniteDuration): F[Unit]

private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] =
Resource.unit[F]
}

object ConsumerMetrics {
Expand Down Expand Up @@ -104,11 +108,11 @@ object ConsumerMetrics {
bytesSummary <- bytesSummary
rebalancesCounter <- rebalancesCounter
topicsLatency <- topicsLatency
ageSummary <- registry.summary(
name = s"${ prefix }_poll_age",
help = "Poll records age, time since record.timestamp",
ageSummary <- registry.summary(
name = s"${prefix}_poll_age",
help = "Poll records age, time since record.timestamp",
quantiles = Quantiles.Default,
labels = LabelNames("client", "topic")
labels = LabelNames("client", "topic")
)
} yield { (clientId: ClientId) =>
new ConsumerMetrics[F] {
Expand Down Expand Up @@ -162,6 +166,7 @@ object ConsumerMetrics {

implicit class ConsumerMetricsOps[F[_]](val self: ConsumerMetrics[F]) extends AnyVal {

@deprecated("Use mapK(f, g) instead", "16.2.0")
dfakhritdinov marked this conversation as resolved.
Show resolved Hide resolved
def mapK[G[_]](f: F ~> G): ConsumerMetrics[G] = {
new MapK with ConsumerMetrics[G] {

Expand All @@ -186,5 +191,36 @@ object ConsumerMetrics {
}
}
}

def mapK[G[_]](
fg: F ~> G,
gf: G ~> F
)(implicit F: MonadCancel[F, Throwable], G: MonadCancel[G, Throwable]): ConsumerMetrics[G] = {
new MapK with ConsumerMetrics[G] {

def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean) = {
fg(self.call(name, topic, latency, success))
}

def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]) = {
fg(self.poll(topic, bytes, records, age))
}

def count(name: String, topic: Topic) = {
fg(self.count(name, topic))
}

def rebalance(name: String, topicPartition: TopicPartition) = {
fg(self.rebalance(name, topicPartition))
}

def topics(latency: FiniteDuration) = {
fg(self.topics(latency))
}

override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) =
self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,31 @@ object ConsumerOf {
def apply[K, V](config: ConsumerConfig)(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]) = {
Consumer
.of[F, K, V](config)
.map { consumer =>
metrics.fold { consumer } { consumer.withMetrics1[Throwable] }
.flatMap { consumer =>
metrics match {

case None =>
Resource.pure[F, Consumer[F, K, V]](consumer)

case Some(metrics) =>
for {
_ <- metrics.exposeJavaMetrics[K, V](consumer)
dfakhritdinov marked this conversation as resolved.
Show resolved Hide resolved
} yield {
consumer.withMetrics1[Throwable](metrics)
}
}
}
}
}
}

/** The sole purpose of this method is to support binary compatibility with an intermediate
* version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics`
* and `apply2` using `MeasureDuration` from `cats-helper`.
* This should not be used and should be removed in a reasonable amount of time.
*/
* version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics`
* and `apply2` using `MeasureDuration` from `cats-helper`.
* This should not be used and should be removed in a reasonable amount of time.
*/
@deprecated("Use `apply1`", since = "16.0.3")
def apply2[F[_] : Async : ToTry : ToFuture : MeasureDuration](
def apply2[F[_]: Async: ToTry: ToFuture: MeasureDuration](
metrics: Option[ConsumerMetrics[F]] = None
): ConsumerOf[F] = apply1(metrics)

Expand Down
Loading
Loading