From 0f14847da83824e7563241ebabdbb3f7763c1323 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Tue, 28 Nov 2023 09:25:48 +0000 Subject: [PATCH] Changed column width Signed-off-by: Jordan Hall --- .scalafmt.conf | 3 +- .../kafkakewl/common/http/EndpointUtils.scala | 28 +++++------ .../common/kafka/KafkaConsumerUtils.scala | 16 ++---- .../persistence/KafkaPersistentStore.scala | 49 ++++++++----------- .../com/mwam/kafkakewl/deploy/Main.scala | 5 +- .../endpoints/DeploymentsEndpoints.scala | 7 ++- .../services/TopologyDeploymentsService.scala | 4 +- .../mwam/kafkakewl/domain/Deployments.scala | 15 ++---- .../com/mwam/kafkakewl/domain/Topology.scala | 32 ++++-------- .../mwam/kafkakewl/domain/TopologyJson.scala | 9 +--- .../validation/TopologyValidation.scala | 8 +-- .../kafkakewl/domain/validation/package.scala | 12 ++--- .../kafkakewl/metrics/domain/Failures.scala | 3 +- .../domain/KafkaConsumerGroupInfo.scala | 35 +++++++------ .../domain/KafkaConsumerGroupInfoJson.scala | 9 +--- .../domain/KafkaConsumerGroupOffset.scala | 3 +- .../domain/KafkaTopicPartitionInfoJson.scala | 9 +--- .../endpoints/ConsumerGroupEndpoints.scala | 3 +- .../ConsumerGroupServerEndpoints.scala | 14 ++---- .../metrics/endpoints/TopicEndpoints.scala | 5 +- .../endpoints/TopicServerEndpoints.scala | 10 +--- .../services/ConsumerOffsetsSource.scala | 36 +++++--------- .../KafkaConsumerGroupInfoCache.scala | 8 +-- .../services/KafkaTopicInfoCache.scala | 5 +- .../services/KafkaTopicInfoSource.scala | 12 ++--- .../kafkakewl/utils/logging/LogFilter.scala | 6 +-- 26 files changed, 115 insertions(+), 231 deletions(-) diff --git a/.scalafmt.conf b/.scalafmt.conf index 10a81a0..20afb4a 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,2 +1,3 @@ version = 3.7.17 -runner.dialect = scala3 \ No newline at end of file +runner.dialect = scala3 +maxColumn = 120 \ No newline at end of file diff --git a/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/http/EndpointUtils.scala b/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/http/EndpointUtils.scala index 918295a..0731e33 100644 --- a/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/http/EndpointUtils.scala +++ b/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/http/EndpointUtils.scala @@ -13,29 +13,26 @@ import sttp.tapir.EndpointIO.annotations.* import zio.json.{JsonDecoder, JsonEncoder} trait EndpointUtils { - val apiEndpoint: PublicEndpoint[Unit, Unit, Unit, Any] = - endpoint.in("api" / "v1") + val apiEndpoint: PublicEndpoint[Unit, Unit, Unit, Any] = endpoint.in("api" / "v1") } object EndpointUtils { - /** Body codec that will deserialize YAML into domain objects via YAML -> JSON - * -> domain marshalling. This codec does not support encoding responses as - * YAML (since circe-yaml does not support encoding) and YAML response are - * not expected to see use. + /** Body codec that will deserialize YAML into domain objects via YAML -> JSON -> domain marshalling. This codec does + * not support encoding responses as YAML (since circe-yaml does not support encoding) and YAML responses are not + * expected to see use. * * The Content-Type of YAML requests is associated with "text/plain". * - * This works by using circe-yaml to convert the YAML input from the request - * body into JSON and using the existing tapir JSON marshalling onwards. + * This works by using circe-yaml to convert the YAML input from the request body into JSON and using the existing + * tapir JSON marshalling onwards.
* * @tparam T * Domain object target of the model * @return * A body mapping from string to T */ - def yamlRequestBody[T: JsonEncoder: JsonDecoder: Schema] - : EndpointIO.Body[String, T] = { + def yamlRequestBody[T: JsonEncoder: JsonDecoder: Schema]: EndpointIO.Body[String, T] = { val jsonDecode = sttp.tapir.json.zio.zioCodec[T] stringBodyUtf8AnyFormat( zioYamlCodec.mapDecode(jsonDecode.rawDecode)(jsonDecode.encode) @@ -43,12 +40,11 @@ object EndpointUtils { } private val zioYamlCodec: Codec[String, String, TextPlain] = - sttp.tapir.Codec.anyString[String, TextPlain](CodecFormat.TextPlain()) { - s => - io.circe.yaml.parser.parse(s).map(_.spaces2) match { - case Left(error) => DecodeResult.Error(s, error) - case Right(jsonString) => DecodeResult.Value(jsonString) - } + sttp.tapir.Codec.anyString[String, TextPlain](CodecFormat.TextPlain()) { s => + io.circe.yaml.parser.parse(s).map(_.spaces2) match { + case Left(error) => DecodeResult.Error(s, error) + case Right(jsonString) => DecodeResult.Value(jsonString) + } } { _ => throw UnsupportedOperationException("Encoding as YAML is unsupported") } diff --git a/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/kafka/KafkaConsumerUtils.scala b/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/kafka/KafkaConsumerUtils.scala index 47eb82a..d5816e2 100644 --- a/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/kafka/KafkaConsumerUtils.scala +++ b/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/kafka/KafkaConsumerUtils.scala @@ -7,17 +7,9 @@ package com.mwam.kafkakewl.common.kafka import com.mwam.kafkakewl.domain.config.KafkaClientConfig -import org.apache.kafka.clients.consumer.{ - Consumer, - ConsumerRecords, - KafkaConsumer -} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsumer} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.serialization.{ - BytesDeserializer, - Deserializer, - StringDeserializer -} +import org.apache.kafka.common.serialization.{BytesDeserializer, Deserializer, StringDeserializer} import org.apache.kafka.common.utils.Bytes import zio.* @@ -92,9 +84,7 @@ object KafkaConsumerExtensions { if (key == null) { None } else { - val value = valueBytesOption.map(valueBytes => - valueDeserializer.deserialize(topic, valueBytes.get) - ) + val value = valueBytesOption.map(valueBytes => valueDeserializer.deserialize(topic, valueBytes.get)) Some((key, value)) } }, diff --git a/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/persistence/KafkaPersistentStore.scala b/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/persistence/KafkaPersistentStore.scala index 5588eb4..dc3ea64 100644 --- a/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/persistence/KafkaPersistentStore.scala +++ b/kafkakewl-common/src/main/scala/com/mwam/kafkakewl/common/persistence/KafkaPersistentStore.scala @@ -31,12 +31,7 @@ final case class BatchMessageEnvelope[Payload]( ) object BatchMessageEnvelopeJson { - import zio.json.{ - DeriveJsonDecoder, - DeriveJsonEncoder, - JsonDecoder, - JsonEncoder - } + import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder} given [Payload](using JsonEncoder[Payload] @@ -58,9 +53,7 @@ class KafkaPersistentStore( private object KafkaSerde { val key: Serde[Any, TopologyId] = - Serde.string.inmapM(s => ZIO.succeed(TopologyId(s)))(topologyId => - ZIO.succeed(topologyId.value) - ) + Serde.string.inmapM(s => ZIO.succeed(TopologyId(s)))(topologyId => ZIO.succeed(topologyId.value)) val value: Serde[Any, 
BatchMessageEnvelope[TopologyDeployment]] = Serde.string.inmapM[Any, BatchMessageEnvelope[TopologyDeployment]](s => ZIO @@ -86,9 +79,8 @@ class KafkaPersistentStore( (duration, deserializedTopologies) = deserializedTopologiesWithDuration - failedTopologies = deserializedTopologies.collect { - case (id, Left(jsonError)) => - ZIO.logError(s"failed to deserialize topology $id: $jsonError") + failedTopologies = deserializedTopologies.collect { case (id, Left(jsonError)) => + ZIO.logError(s"failed to deserialize topology $id: $jsonError") }.toList topologies = deserializedTopologies @@ -108,24 +100,23 @@ class KafkaPersistentStore( for { txn <- kafkaProducer.createTransaction batchSize = topologyDeployments.size - _ <- ZIO.foreachDiscard(topologyDeployments.values.zipWithIndex) { - (td, indexInBatch) => - for { - recordMetadataWithDuration <- txn - .produce( - topicName, - td.topologyId, - BatchMessageEnvelope(batchSize, indexInBatch, td), - KafkaSerde.key, - KafkaSerde.value, - offset = None - ) - .timed - (duration, recordMetadata) = recordMetadataWithDuration - _ <- ZIO.logInfo( - s"saved topology ${td.topologyId} into $topicName kafka topic: P#${recordMetadata.partition} @${recordMetadata.offset} in ${duration.toMillis / 1000.0} seconds" + _ <- ZIO.foreachDiscard(topologyDeployments.values.zipWithIndex) { (td, indexInBatch) => + for { + recordMetadataWithDuration <- txn + .produce( + topicName, + td.topologyId, + BatchMessageEnvelope(batchSize, indexInBatch, td), + KafkaSerde.key, + KafkaSerde.value, + offset = None ) - } yield () + .timed + (duration, recordMetadata) = recordMetadataWithDuration + _ <- ZIO.logInfo( + s"saved topology ${td.topologyId} into $topicName kafka topic: P#${recordMetadata.partition} @${recordMetadata.offset} in ${duration.toMillis / 1000.0} seconds" + ) + } yield () } } yield () } diff --git a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala index 643be44..f130e52 100644 --- a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala +++ b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala @@ -7,10 +7,7 @@ package com.mwam.kafkakewl.deploy import com.mwam.kafkakewl.common.http.HttpServer -import com.mwam.kafkakewl.common.persistence.{ - KafkaPersistentStore, - PersistentStore -} +import com.mwam.kafkakewl.common.persistence.{KafkaPersistentStore, PersistentStore} import com.mwam.kafkakewl.common.telemetry.GlobalTracer import com.mwam.kafkakewl.deploy.endpoints.* import com.mwam.kafkakewl.deploy.services.TopologyDeploymentsService diff --git a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/endpoints/DeploymentsEndpoints.scala b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/endpoints/DeploymentsEndpoints.scala index d907477..da5f6d7 100644 --- a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/endpoints/DeploymentsEndpoints.scala +++ b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/endpoints/DeploymentsEndpoints.scala @@ -48,10 +48,9 @@ class DeploymentsEndpoints() extends EndpointUtils { .errorOut(queryDeploymentsFailureOutput) .out(jsonBody[TopologyDeployment]) - val getDeploymentsEndpoint - : PublicEndpoint[TopologyDeploymentQuery, QueryDeploymentsFailure, Seq[ - TopologyDeployment - ], Any] = deploymentsEndpoint + val getDeploymentsEndpoint: PublicEndpoint[TopologyDeploymentQuery, QueryDeploymentsFailure, Seq[ + TopologyDeployment + ], Any] = deploymentsEndpoint .in(query[Option[String]]("filter")) 
.in(query[Option[Boolean]]("with_topology")) .in(query[Option[Int]]("offset")) diff --git a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsService.scala b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsService.scala index 758a744..78b94e3 100644 --- a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsService.scala +++ b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsService.scala @@ -45,9 +45,7 @@ class TopologyDeploymentsService private ( ) ) .toMap ++ deployments.delete - .map(tid => - (tid, TopologyDeployment(tid, TopologyDeploymentStatus(), None)) - ) + .map(tid => (tid, TopologyDeployment(tid, TopologyDeploymentStatus(), None))) .toMap _ <- persistentStore diff --git a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Deployments.scala b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Deployments.scala index a23912b..0ee8e81 100644 --- a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Deployments.scala +++ b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Deployments.scala @@ -13,8 +13,7 @@ final case class DeploymentOptions( allowUnsafe: Boolean = false ) -/** A deployment to be done, contains options, topologies to be deployed and - * topology-ids to be removed. +/** A deployment to be done, contains options, topologies to be deployed and topology-ids to be removed. * * @param options * the deployment options @@ -68,17 +67,13 @@ sealed trait PostDeploymentsFailure /** All failures relating to performing/querying deployments. */ object DeploymentsFailure { - final case class NotFound(notFound: Seq[String]) - extends QueryDeploymentsFailure + final case class NotFound(notFound: Seq[String]) extends QueryDeploymentsFailure final case class Authorization(authorizationFailed: Seq[String]) extends PostDeploymentsFailure with QueryDeploymentsFailure - final case class Validation(validationFailed: Seq[String]) - extends PostDeploymentsFailure - final case class Deployment(deploymentFailed: Seq[String]) - extends PostDeploymentsFailure - final case class Persistence(persistFailed: Seq[String]) - extends PostDeploymentsFailure + final case class Validation(validationFailed: Seq[String]) extends PostDeploymentsFailure + final case class Deployment(deploymentFailed: Seq[String]) extends PostDeploymentsFailure + final case class Persistence(persistFailed: Seq[String]) extends PostDeploymentsFailure def notFound(notFound: String*): NotFound = NotFound(notFound) def authorization(throwable: Throwable): Authorization = Authorization( diff --git a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Topology.scala b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Topology.scala index db01c82..380e96c 100644 --- a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Topology.scala +++ b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Topology.scala @@ -24,9 +24,7 @@ final case class Topic( /** The local application id in the current topology's namespace. 
*/ -final case class ApplicationLocalId(value: String) - extends AnyVal - with StringValue +final case class ApplicationLocalId(value: String) extends AnyVal with StringValue final case class UserId(value: String) extends AnyVal with StringValue final case class Application( id: ApplicationLocalId, @@ -34,26 +32,20 @@ final case class Application( // TODO different application types ) -/** The local topic alias id in the current topology's namespace (fully - * qualified isn't really needed anyway because aliases aren't currently - * exposed to other topologies). +/** The local topic alias id in the current topology's namespace (fully qualified isn't really needed anyway because + * aliases aren't currently exposed to other topologies). */ -final case class TopicAliasLocalId(value: String) - extends AnyVal - with StringValue +final case class TopicAliasLocalId(value: String) extends AnyVal with StringValue final case class TopicAlias( id: TopicAliasLocalId, // TODO perhaps support other ways, e.g. list of topic ids, namespace? Although all these are expressible with regex easily regex: String ) -/** The local application alias id in the current topology's namespace (fully - * qualified isn't really needed anyway because aliases aren't currently - * exposed to other topologies). +/** The local application alias id in the current topology's namespace (fully qualified isn't really needed anyway + * because aliases aren't currently exposed to other topologies). */ -final case class ApplicationAliasLocalId(value: String) - extends AnyVal - with StringValue +final case class ApplicationAliasLocalId(value: String) extends AnyVal with StringValue final case class ApplicationAlias( id: ApplicationAliasLocalId, // TODO perhaps support other ways, e.g. list of topic ids, namespace? Although all these are expressible with regex easily @@ -64,15 +56,11 @@ final case class Aliases( applications: Seq[ApplicationAlias] = Seq.empty ) -/** ApplicationFlexId can be an application alias or application id, local or - * fully qualified. +/** ApplicationFlexId can be an application alias or application id, local or fully qualified. */ -final case class ApplicationFlexId(value: String) - extends AnyVal - with StringValue +final case class ApplicationFlexId(value: String) extends AnyVal with StringValue -/** TopicFlexId can be a local or fully qualified topic alias or fully qualified - * topic id. +/** TopicFlexId can be a local or fully qualified topic alias or fully qualified topic id. 
*/ final case class TopicFlexId(value: String) extends AnyVal with StringValue final case class ProducedTopic(topic: TopicFlexId) diff --git a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/TopologyJson.scala b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/TopologyJson.scala index 6918a6c..5499015 100644 --- a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/TopologyJson.scala +++ b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/TopologyJson.scala @@ -6,14 +6,7 @@ package com.mwam.kafkakewl.domain -import zio.json.{ - DeriveJsonDecoder, - DeriveJsonEncoder, - JsonDecoder, - JsonEncoder, - JsonFieldDecoder, - JsonFieldEncoder -} +import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder, JsonFieldDecoder, JsonFieldEncoder} object TopologyJson { given [T <: StringValue]: JsonEncoder[T] = diff --git a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/validation/TopologyValidation.scala b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/validation/TopologyValidation.scala index f809e28..425ab62 100644 --- a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/validation/TopologyValidation.scala +++ b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/validation/TopologyValidation.scala @@ -6,13 +6,7 @@ package com.mwam.kafkakewl.domain.validation -import com.mwam.kafkakewl.domain.{ - Deployments, - Namespace, - Topology, - TopologyDeployments, - TopologyId -} +import com.mwam.kafkakewl.domain.{Deployments, Namespace, Topology, TopologyDeployments, TopologyId} import scala.util.matching.Regex diff --git a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/validation/package.scala b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/validation/package.scala index d17c3ae..07e140c 100644 --- a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/validation/package.scala +++ b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/validation/package.scala @@ -14,11 +14,11 @@ import scala.annotation.targetName package object validation { type ValidationError = String - /** The validation does not produce anything, because the input is already a - * domain object that we're checking whether it's valid or not. + /** The validation does not produce anything, because the input is already a domain object that we're checking whether + * it's valid or not. * - * TODO Maybe an abuse of the Validation[E, A] type? Ultimately all we need - * is a list of errors, when empty it means no errors. + * TODO Maybe an abuse of the Validation[E, A] type? Ultimately all we need is a list of errors, when empty it means + * no errors. 
*/ type ValidationFailures = Validation[ValidationError, Unit] @@ -37,8 +37,8 @@ package object validation { validationFailures1: ValidationFailures, validationFailures2: ValidationFailures ): ValidationFailures = - Validation.validateWith(validationFailures1, validationFailures2) { - (_, _) => successValue + Validation.validateWith(validationFailures1, validationFailures2) { (_, _) => + successValue } extension (validationFailures: ValidationFailures) { diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/Failures.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/Failures.scala index cd0d3c9..76ea560 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/Failures.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/Failures.scala @@ -12,8 +12,7 @@ sealed trait QueryFailure object Failures { final case class NotFound(notFound: Seq[String]) extends QueryFailure - final case class Authorization(authorizationFailed: Seq[String]) - extends QueryFailure + final case class Authorization(authorizationFailed: Seq[String]) extends QueryFailure def notFound(notFound: String*): NotFound = NotFound(notFound) def authorization(throwable: Throwable): Authorization = Authorization( diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupInfo.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupInfo.scala index 394a126..d478643 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupInfo.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupInfo.scala @@ -22,26 +22,25 @@ object KafkaConsumerGroupInfoExtensions { consumerGroupOffsets: KafkaConsumerGroupOffsets ): Map[String, KafkaConsumerGroupInfo] = { // TODO may not be very efficient, but works. 
- consumerGroupOffsets.foldLeft(consumerGroupInfos) { - case (newConsumerGroupInfos, (cgtp, cgo)) => - val group = cgtp.group - val topic = cgtp.topicPartition.topic - val partition = cgtp.topicPartition.partition + consumerGroupOffsets.foldLeft(consumerGroupInfos) { case (newConsumerGroupInfos, (cgtp, cgo)) => + val group = cgtp.group + val topic = cgtp.topicPartition.topic + val partition = cgtp.topicPartition.partition - val currentConsumerGroupInfo = - consumerGroupInfos.getOrElse(group, KafkaConsumerGroupInfo.empty) - val currentConsumerGroupTopic = currentConsumerGroupInfo.topics - .getOrElse(topic, SortedMap.empty[Int, KafkaConsumerGroupOffset]) + val currentConsumerGroupInfo = + consumerGroupInfos.getOrElse(group, KafkaConsumerGroupInfo.empty) + val currentConsumerGroupTopic = currentConsumerGroupInfo.topics + .getOrElse(topic, SortedMap.empty[Int, KafkaConsumerGroupOffset]) - val newConsumerGroupTopic = cgo match { - case Some(consumerGroupOffset) => - currentConsumerGroupTopic + (partition -> consumerGroupOffset) - case None => currentConsumerGroupTopic - partition - } - val newConsumerGroupInfo = KafkaConsumerGroupInfo( - currentConsumerGroupInfo.topics + (topic -> newConsumerGroupTopic) - ) - newConsumerGroupInfos + (cgtp.group -> newConsumerGroupInfo) + val newConsumerGroupTopic = cgo match { + case Some(consumerGroupOffset) => + currentConsumerGroupTopic + (partition -> consumerGroupOffset) + case None => currentConsumerGroupTopic - partition + } + val newConsumerGroupInfo = KafkaConsumerGroupInfo( + currentConsumerGroupInfo.topics + (topic -> newConsumerGroupTopic) + ) + newConsumerGroupInfos + (cgtp.group -> newConsumerGroupInfo) } } } diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupInfoJson.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupInfoJson.scala index ea0729d..bb8edbb 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupInfoJson.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupInfoJson.scala @@ -6,14 +6,7 @@ package com.mwam.kafkakewl.metrics.domain -import zio.json.{ - DeriveJsonDecoder, - DeriveJsonEncoder, - JsonDecoder, - JsonEncoder, - JsonFieldDecoder, - JsonFieldEncoder -} +import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder, JsonFieldDecoder, JsonFieldEncoder} import scala.collection.immutable.{Map, SortedMap} diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupOffset.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupOffset.scala index 7ed8202..3d95b04 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupOffset.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaConsumerGroupOffset.scala @@ -23,7 +23,6 @@ type KafkaConsumerGroupOffsets = Map[ConsumerGroupTopicPartition, Option[KafkaConsumerGroupOffset]] object KafkaConsumerGroupOffsets { - val empty - : Map[ConsumerGroupTopicPartition, Option[KafkaConsumerGroupOffset]] = + val empty: Map[ConsumerGroupTopicPartition, Option[KafkaConsumerGroupOffset]] = Map.empty } diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaTopicPartitionInfoJson.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaTopicPartitionInfoJson.scala index be21f0e..28b5d26 100644 --- 
a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaTopicPartitionInfoJson.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/domain/KafkaTopicPartitionInfoJson.scala @@ -6,14 +6,7 @@ package com.mwam.kafkakewl.metrics.domain -import zio.json.{ - DeriveJsonDecoder, - DeriveJsonEncoder, - JsonDecoder, - JsonEncoder, - JsonFieldDecoder, - JsonFieldEncoder -} +import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder, JsonFieldDecoder, JsonFieldEncoder} import scala.collection.immutable.SortedMap diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/ConsumerGroupEndpoints.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/ConsumerGroupEndpoints.scala index 6207c79..15ced83 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/ConsumerGroupEndpoints.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/ConsumerGroupEndpoints.scala @@ -24,8 +24,7 @@ class ConsumerGroupEndpoints() extends EndpointUtils with EndpointOutputs { .errorOut(queryFailureOutput) .out(jsonBody[Seq[String]]) - val getGroupEndpoint - : PublicEndpoint[String, QueryFailure, KafkaConsumerGroupInfo, Any] = + val getGroupEndpoint: PublicEndpoint[String, QueryFailure, KafkaConsumerGroupInfo, Any] = groupEndpoint .in(path[String]("group_name")) .get diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/ConsumerGroupServerEndpoints.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/ConsumerGroupServerEndpoints.scala index 70db7b9..6661b9c 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/ConsumerGroupServerEndpoints.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/ConsumerGroupServerEndpoints.scala @@ -7,11 +7,7 @@ package com.mwam.kafkakewl.metrics.endpoints import com.mwam.kafkakewl.common.telemetry.zServerLogicWithTracing -import com.mwam.kafkakewl.metrics.domain.{ - Failures, - KafkaConsumerGroupInfo, - QueryFailure -} +import com.mwam.kafkakewl.metrics.domain.{Failures, KafkaConsumerGroupInfo, QueryFailure} import com.mwam.kafkakewl.metrics.services.KafkaConsumerGroupInfoCache import sttp.tapir.ztapir.* import zio.* @@ -25,12 +21,8 @@ class ConsumerGroupServerEndpoints( given Tracing = tracing val endpoints: List[ZServerEndpoint[Any, Any]] = List( - consumerGroupEndpoints.getGroupsEndpoint.zServerLogicWithTracing(_ => - getConsumerGroups - ), - consumerGroupEndpoints.getGroupEndpoint.zServerLogicWithTracing(group => - getConsumerGroup(group) - ) + consumerGroupEndpoints.getGroupsEndpoint.zServerLogicWithTracing(_ => getConsumerGroups), + consumerGroupEndpoints.getGroupEndpoint.zServerLogicWithTracing(group => getConsumerGroup(group)) ) private def getConsumerGroups: ZIO[Any, QueryFailure, Seq[String]] = diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/TopicEndpoints.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/TopicEndpoints.scala index 4c6fffe..edb6785 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/TopicEndpoints.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/TopicEndpoints.scala @@ -7,10 +7,7 @@ package com.mwam.kafkakewl.metrics.endpoints import com.mwam.kafkakewl.common.http.EndpointUtils -import com.mwam.kafkakewl.metrics.domain.{ - KafkaSingleTopicPartitionInfos, - QueryFailure -} +import 
com.mwam.kafkakewl.metrics.domain.{KafkaSingleTopicPartitionInfos, QueryFailure} import com.mwam.kafkakewl.metrics.domain.KafkaTopicPartitionInfoJson.given import com.mwam.kafkakewl.metrics.domain.KafkaTopicPartitionInfoSchema.given import sttp.tapir.json.zio.* diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/TopicServerEndpoints.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/TopicServerEndpoints.scala index ec04200..e085abb 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/TopicServerEndpoints.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/endpoints/TopicServerEndpoints.scala @@ -7,11 +7,7 @@ package com.mwam.kafkakewl.metrics.endpoints import com.mwam.kafkakewl.common.telemetry.zServerLogicWithTracing -import com.mwam.kafkakewl.metrics.domain.{ - Failures, - KafkaSingleTopicPartitionInfos, - QueryFailure -} +import com.mwam.kafkakewl.metrics.domain.{Failures, KafkaSingleTopicPartitionInfos, QueryFailure} import com.mwam.kafkakewl.metrics.services.KafkaTopicInfoCache import sttp.tapir.ztapir.* import zio.* @@ -26,9 +22,7 @@ class TopicServerEndpoints( val endpoints: List[ZServerEndpoint[Any, Any]] = List( topicEndpoints.getTopicsEndpoint.zServerLogicWithTracing(_ => getTopics), - topicEndpoints.getTopicEndpoint.zServerLogicWithTracing(topic => - getTopic(topic) - ) + topicEndpoints.getTopicEndpoint.zServerLogicWithTracing(topic => getTopic(topic)) ) private def getTopics: ZIO[Any, QueryFailure, Seq[String]] = diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/ConsumerOffsetsSource.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/ConsumerOffsetsSource.scala index aaa477f..0ee7d9c 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/ConsumerOffsetsSource.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/ConsumerOffsetsSource.scala @@ -13,10 +13,7 @@ package com.mwam.kafkakewl.metrics.services import com.mwam.kafkakewl.common.kafka.KafkaConsumerExtensions.* -import com.mwam.kafkakewl.common.kafka.{ - CompactedConsumeResult, - KafkaConsumerUtils -} +import com.mwam.kafkakewl.common.kafka.{CompactedConsumeResult, KafkaConsumerUtils} import com.mwam.kafkakewl.domain.config.KafkaClientConfig import com.mwam.kafkakewl.metrics.ConsumerOffsetsSourceConfig import com.mwam.kafkakewl.metrics.domain.{ @@ -44,11 +41,10 @@ object ConsumerOffsetsDeserializers { implicit class ByteBufferExtensions(bb: ByteBuffer) { - /** Reads a length-prefixed string from the byte-buffer. If the 16 bit - * length is -1 (0xFFFF) it considers it an empty string. + /** Reads a length-prefixed string from the byte-buffer. If the 16-bit length is -1 (0xFFFF) it is treated as an + * empty string. * - * This is how the `__consumer_offsets` topic's key and value encodes - * strings. + * This is how the `__consumer_offsets` topic's key and value encode strings. */ def getStringPrefixedWithLength: String = { val length = bb.getShort() @@ -62,14 +58,12 @@ object ConsumerOffsetsDeserializers { } } - /** A kafka deserializer returning a KafkaConsumerGroupTopicPartition from the - * `__consumer_offsets` topic's keys. + /** A kafka deserializer returning a KafkaConsumerGroupTopicPartition from the `__consumer_offsets` topic's keys. * - * It supports only version 0 or 1 and return null for version 2 (which means - * consumer group metadata messages will be filtered out).
+ * It supports only version 0 or 1 and returns null for version 2 (which means consumer group metadata messages will + * be filtered out). */ - class ConsumerOffsetsKeyDeserializer - extends KafkaDeserializer[ConsumerGroupTopicPartition] { + class ConsumerOffsetsKeyDeserializer extends KafkaDeserializer[ConsumerGroupTopicPartition] { override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () @@ -99,14 +93,12 @@ override def close(): Unit = () } - /** A kafka deserializer returning a ConsumerGroupOffset from the - * `__consumer_offsets` topic's values. + /** A kafka deserializer returning a ConsumerGroupOffset from the `__consumer_offsets` topic's values. * - * It can only be used if the key's version is 0 or 1, otherwise the format - * of this is different (the value of consumer group metadata messages). + * It can only be used if the key's version is 0 or 1, otherwise the format of this is different (the value of + * consumer group metadata messages). */ - class ConsumerOffsetsValueDeserializer - extends KafkaDeserializer[KafkaConsumerGroupOffset] { + class ConsumerOffsetsValueDeserializer extends KafkaDeserializer[KafkaConsumerGroupOffset] { override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () @@ -235,9 +227,7 @@ partitionInfos <- consumer.partitionsFor( consumerOffsetsSourceConfig.consumerOffsetsTopicName ) - topicPartitions = partitionInfos.map(pi => - new TopicPartition(pi.topic, pi.partition) - ) + topicPartitions = partitionInfos.map(pi => new TopicPartition(pi.topic, pi.partition)) topicPartitionSet = topicPartitions.toSet // TODO calculate and expose the live lag as a metric diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaConsumerGroupInfoCache.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaConsumerGroupInfoCache.scala index b790a2f..9cff833 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaConsumerGroupInfoCache.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaConsumerGroupInfoCache.scala @@ -7,10 +7,7 @@ package com.mwam.kafkakewl.metrics.services import com.mwam.kafkakewl.metrics.domain.KafkaConsumerGroupInfoExtensions.* -import com.mwam.kafkakewl.metrics.domain.{ - KafkaConsumerGroupInfo, - KafkaConsumerGroupOffsets -} +import com.mwam.kafkakewl.metrics.domain.{KafkaConsumerGroupInfo, KafkaConsumerGroupOffsets} import zio.* class KafkaConsumerGroupInfoCache( @@ -25,8 +22,7 @@ } object KafkaConsumerGroupInfoCache { - def live - : ZLayer[ConsumerOffsetsSource, Nothing, KafkaConsumerGroupInfoCache] = + def live: ZLayer[ConsumerOffsetsSource, Nothing, KafkaConsumerGroupInfoCache] = ZLayer.scoped { for { consumerGroupOffsetsDequeue <- ZIO diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaTopicInfoCache.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaTopicInfoCache.scala index eb37684..2605a38 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaTopicInfoCache.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaTopicInfoCache.scala @@ -7,10 +7,7 @@ package com.mwam.kafkakewl.metrics.services import com.mwam.kafkakewl.metrics.domain.KafkaTopicPartitionInfoExtensions.* -import com.mwam.kafkakewl.metrics.domain.{ - KafkaSingleTopicPartitionInfos, -
KafkaTopicPartitionInfoChanges -} +import com.mwam.kafkakewl.metrics.domain.{KafkaSingleTopicPartitionInfos, KafkaTopicPartitionInfoChanges} import zio.* class KafkaTopicInfoCache( diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaTopicInfoSource.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaTopicInfoSource.scala index 359efb2..a97903a 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaTopicInfoSource.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/services/KafkaTopicInfoSource.scala @@ -47,9 +47,7 @@ object KafkaTopicInfoSource { for { scope <- ZIO.service[Scope] consumer <- ZIO.service[Consumer] - hub <- Hub.bounded[KafkaTopicPartitionInfoChanges](requestedCapacity = - 1 - ) + hub <- Hub.bounded[KafkaTopicPartitionInfoChanges](requestedCapacity = 1) } yield KafkaTopicInfoSource( scope, createKafkaTopicInfosStream(consumer), @@ -71,9 +69,7 @@ object KafkaTopicInfoSource { ) || topic.startsWith("dbz") } .flatMap { case (topic, kafkaPartitionInfos) => - kafkaPartitionInfos.map(tpi => - new TopicPartition(topic, tpi.partition) - ) + kafkaPartitionInfos.map(tpi => new TopicPartition(topic, tpi.partition)) } .toSet beginningEndOffsets <- consumer.beginningOffsets( @@ -111,8 +107,8 @@ object KafkaTopicInfoSource { } .map { case (_, topicPartitionInfos) => topicPartitionInfos } // putting the previous and current topic infos together, so that we can diff them below and publish the changes only - .scan((KafkaTopicPartitionInfos.empty, KafkaTopicPartitionInfos.empty)) { - case ((_, previous), current) => (previous, current) + .scan((KafkaTopicPartitionInfos.empty, KafkaTopicPartitionInfos.empty)) { case ((_, previous), current) => + (previous, current) } // first emitted output of scan (2 empty maps) isn't needed .filter { case (previous, current) => diff --git a/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/LogFilter.scala b/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/LogFilter.scala index c0d9455..45453f5 100644 --- a/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/LogFilter.scala +++ b/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/LogFilter.scala @@ -30,8 +30,7 @@ class LogFilter extends TurboFilter { FilterReply.DENY case ( Level.WARN, - "org.apache.kafka.clients.admin.AdminClientConfig" | - "org.apache.kafka.clients.consumer.ConsumerConfig" + "org.apache.kafka.clients.admin.AdminClientConfig" | "org.apache.kafka.clients.consumer.ConsumerConfig" ) if format.contains("supplied but isn't a known config.") => // "The configuration 'sasl.kerberos.kinit.cmd' was supplied but isn't a known config." can be info logger.info(marker, format, params) @@ -46,8 +45,7 @@ class LogFilter extends TurboFilter { // TGT renewal can be info logger.info(marker, format, params) FilterReply.DENY - case (Level.ERROR, "org.apache.curator.ConnectionState") - if format.contains("Authentication failed") => + case (Level.ERROR, "org.apache.curator.ConnectionState") if format.contains("Authentication failed") => // this curator error can be warning, because we can still talk to ZK un-authenticated logger.warn(marker, format, params) FilterReply.DENY
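
[Reviewer note, not part of the patch itself.] Two short sketches for context on the code this reflow touches; both are illustrative assumptions, not anything this diff changes.

First, how the EndpointUtils.yamlRequestBody codec documented above is typically wired into a tapir endpoint. The Deployment payload type, its fields and the endpoint path are hypothetical; the givens assume zio-json derivation and a tapir Schema:

  import com.mwam.kafkakewl.common.http.EndpointUtils
  import sttp.tapir.*
  import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder}

  // Hypothetical payload type, not part of this patch.
  final case class Deployment(topology: String, allowUnsafe: Boolean)

  object Deployment {
    given JsonEncoder[Deployment] = DeriveJsonEncoder.gen[Deployment]
    given JsonDecoder[Deployment] = DeriveJsonDecoder.gen[Deployment]
    given Schema[Deployment] = Schema.derived
  }

  // The request body arrives as YAML, circe-yaml converts it to JSON, and the
  // existing zio-json codec decodes it; responses remain JSON-only.
  val postDeployment: PublicEndpoint[Deployment, Unit, Unit, Any] =
    endpoint.post
      .in("api" / "v1" / "deployments")
      .in(EndpointUtils.yamlRequestBody[Deployment])

Second, the length-prefixed string format that ConsumerOffsetsDeserializers reads from `__consumer_offsets` keys and values, assuming a big-endian 16-bit length prefix with -1 meaning empty, as the doc comment describes. A sketch mirroring the documented behaviour of getStringPrefixedWithLength, not the patched code:

  import java.nio.ByteBuffer
  import java.nio.charset.StandardCharsets.UTF_8

  def readLengthPrefixedString(bb: ByteBuffer): String = {
    val length = bb.getShort() // 16-bit big-endian length prefix
    if (length == -1) ""       // -1 (0xFFFF) encodes the empty string
    else {
      val bytes = new Array[Byte](length)
      bb.get(bytes)
      new String(bytes, UTF_8)
    }
  }

  // Round-trip with a hypothetical consumer group name.
  val buf = ByteBuffer.allocate(2 + 8)
  buf.putShort(8.toShort).put("group-01".getBytes(UTF_8)).flip()
  assert(readLengthPrefixedString(buf) == "group-01")

The rest of the diff is the mechanical reflow produced by the new maxColumn = 120 setting.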