diff --git a/core/src/main/scala/com/banno/kafka/Topic.scala b/core/src/main/scala/com/banno/kafka/Topic.scala index bd995a2b2..205a17f17 100644 --- a/core/src/main/scala/com/banno/kafka/Topic.scala +++ b/core/src/main/scala/com/banno/kafka/Topic.scala @@ -130,13 +130,15 @@ object Topic { override def setUp[F[_]: Sync]( bootstrapServers: BootstrapServers, schemaRegistryUri: SchemaRegistryUrl, + configs: Map[String, Object] = Map.empty, ): F[Unit] = for { _ <- AdminApi.createTopicsIdempotent( bootstrapServers.bs, - config + List(config), + configs ) - _ <- registerSchemas(schemaRegistryUri) + _ <- registerSchemas(schemaRegistryUri, configs) } yield () } @@ -165,8 +167,9 @@ object Topic { override def setUp[F[_]: Sync]( bootstrapServers: BootstrapServers, - schemaRegistryUri: SchemaRegistryUrl - ): F[Unit] = fa.setUp(bootstrapServers, schemaRegistryUri) + schemaRegistryUri: SchemaRegistryUrl, + configs: Map[String, Object] = Map.empty, + ): F[Unit] = fa.setUp(bootstrapServers, schemaRegistryUri, configs) override def name: TopicName = fa.name override def purpose: TopicPurpose = fa.purpose diff --git a/core/src/main/scala/com/banno/kafka/Topical.scala b/core/src/main/scala/com/banno/kafka/Topical.scala index 9eea99ec0..315da9b58 100644 --- a/core/src/main/scala/com/banno/kafka/Topical.scala +++ b/core/src/main/scala/com/banno/kafka/Topical.scala @@ -49,6 +49,7 @@ trait Topical[A, B] { def setUp[F[_]: Sync]( bootstrapServers: BootstrapServers, schemaRegistryUri: SchemaRegistryUrl, + configs: Map[String, Object] = Map.empty, ): F[Unit] def registerSchemas[F[_]: Sync]( diff --git a/core/src/main/scala/com/banno/kafka/Topics.scala b/core/src/main/scala/com/banno/kafka/Topics.scala index f7c683700..a32db258d 100644 --- a/core/src/main/scala/com/banno/kafka/Topics.scala +++ b/core/src/main/scala/com/banno/kafka/Topics.scala @@ -50,6 +50,7 @@ object Topics { def tailSetUp[F[_]: Sync]( bootstrapServers: BootstrapServers, schemaRegistryUri: SchemaRegistryUrl, + configs: Map[String, Object], ): F[Unit] def tailRegisterSchemas[F[_]: Sync]( @@ -80,9 +81,10 @@ object Topics { final override def setUp[F[_]: Sync]( bootstrapServers: BootstrapServers, schemaRegistryUri: SchemaRegistryUrl, + configs: Map[String, Object] = Map.empty, ): F[Unit] = - topic.setUp(bootstrapServers, schemaRegistryUri) *> - tailSetUp(bootstrapServers, schemaRegistryUri) + topic.setUp(bootstrapServers, schemaRegistryUri, configs) *> + tailSetUp(bootstrapServers, schemaRegistryUri, configs) } private final case class SingletonTopics[K, V]( @@ -99,6 +101,7 @@ object Topics { override def tailSetUp[F[_]: Sync]( bootstrapServers: BootstrapServers, schemaRegistryUri: SchemaRegistryUrl, + configs: Map[String, Object], ): F[Unit] = Applicative[F].unit } @@ -125,7 +128,8 @@ object Topics { override def tailSetUp[F[_]: Sync]( bootstrapServers: BootstrapServers, schemaRegistryUri: SchemaRegistryUrl, - ): F[Unit] = tail.setUp(bootstrapServers, schemaRegistryUri) + configs: Map[String, Object], + ): F[Unit] = tail.setUp(bootstrapServers, schemaRegistryUri, configs) } def uncons[K, V, S <: Coproduct, T <: Coproduct]( diff --git a/core/src/main/scala/com/banno/kafka/admin/AdminApi.scala b/core/src/main/scala/com/banno/kafka/admin/AdminApi.scala index 896d6aae2..b3fd4f8d2 100644 --- a/core/src/main/scala/com/banno/kafka/admin/AdminApi.scala +++ b/core/src/main/scala/com/banno/kafka/admin/AdminApi.scala @@ -130,9 +130,12 @@ object AdminApi { def createTopicsIdempotent[F[_]: Sync]( bootstrapServers: String, - topics: Iterable[NewTopic] + topics: Iterable[NewTopic], + configs: Map[String, Object] = Map.empty, ): F[CreateTopicsResult] = - AdminApi.resource[F](BootstrapServers(bootstrapServers)).use(_.createTopicsIdempotent(topics)) + AdminApi.resource[F]( + Map[String, Object](BootstrapServers(bootstrapServers)) ++ configs + ).use(_.createTopicsIdempotent(topics)) def createTopicsIdempotent[F[_]: Sync]( bootstrapServers: String,