Skip to content

Commit

Permalink
Merge pull request #471 from Banno/setupauth
Browse files Browse the repository at this point in the history
feat: enable plumbing configurations into `setUp`
  • Loading branch information
Kazark authored Aug 18, 2021
2 parents d556345 + 9f5f445 commit ee69768
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
11 changes: 7 additions & 4 deletions core/src/main/scala/com/banno/kafka/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,15 @@ object Topic {
override def setUp[F[_]: Sync](
bootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
configs: Map[String, Object] = Map.empty,
): F[Unit] =
for {
_ <- AdminApi.createTopicsIdempotent(
bootstrapServers.bs,
config
List(config),
configs
)
_ <- registerSchemas(schemaRegistryUri)
_ <- registerSchemas(schemaRegistryUri, configs)
} yield ()
}

Expand Down Expand Up @@ -165,8 +167,9 @@ object Topic {

override def setUp[F[_]: Sync](
bootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl
): F[Unit] = fa.setUp(bootstrapServers, schemaRegistryUri)
schemaRegistryUri: SchemaRegistryUrl,
configs: Map[String, Object] = Map.empty,
): F[Unit] = fa.setUp(bootstrapServers, schemaRegistryUri, configs)

override def name: TopicName = fa.name
override def purpose: TopicPurpose = fa.purpose
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/com/banno/kafka/Topical.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ trait Topical[A, B] {
def setUp[F[_]: Sync](
bootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
configs: Map[String, Object] = Map.empty,
): F[Unit]

def registerSchemas[F[_]: Sync](
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/com/banno/kafka/Topics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object Topics {
def tailSetUp[F[_]: Sync](
bootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
configs: Map[String, Object],
): F[Unit]

def tailRegisterSchemas[F[_]: Sync](
Expand Down Expand Up @@ -80,9 +81,10 @@ object Topics {
final override def setUp[F[_]: Sync](
bootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
configs: Map[String, Object] = Map.empty,
): F[Unit] =
topic.setUp(bootstrapServers, schemaRegistryUri) *>
tailSetUp(bootstrapServers, schemaRegistryUri)
topic.setUp(bootstrapServers, schemaRegistryUri, configs) *>
tailSetUp(bootstrapServers, schemaRegistryUri, configs)
}

private final case class SingletonTopics[K, V](
Expand All @@ -99,6 +101,7 @@ object Topics {
override def tailSetUp[F[_]: Sync](
bootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
configs: Map[String, Object],
): F[Unit] = Applicative[F].unit
}

Expand All @@ -125,7 +128,8 @@ object Topics {
override def tailSetUp[F[_]: Sync](
bootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
): F[Unit] = tail.setUp(bootstrapServers, schemaRegistryUri)
configs: Map[String, Object],
): F[Unit] = tail.setUp(bootstrapServers, schemaRegistryUri, configs)
}

def uncons[K, V, S <: Coproduct, T <: Coproduct](
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/com/banno/kafka/admin/AdminApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,12 @@ object AdminApi {

def createTopicsIdempotent[F[_]: Sync](
bootstrapServers: String,
topics: Iterable[NewTopic]
topics: Iterable[NewTopic],
configs: Map[String, Object] = Map.empty,
): F[CreateTopicsResult] =
AdminApi.resource[F](BootstrapServers(bootstrapServers)).use(_.createTopicsIdempotent(topics))
AdminApi.resource[F](
Map[String, Object](BootstrapServers(bootstrapServers)) ++ configs
).use(_.createTopicsIdempotent(topics))

def createTopicsIdempotent[F[_]: Sync](
bootstrapServers: String,
Expand Down

0 comments on commit ee69768

Please sign in to comment.