fix: Pass through optional consumer settings when checking for offsets (#196)

Co-authored-by: Soham <[email protected]>
Co-authored-by: Pedro Mangabeira <[email protected]>
4 people authored Apr 28, 2023
1 parent 08373d5 commit 8fcd133
Showing 3 changed files with 53 additions and 10 deletions.
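For context, the API being exercised is TopicLoader.load[K, V](topics, strategy, maybeConsumerSettings). A minimal caller-side sketch of the behaviour this commit fixes, assuming key/value deserializers are resolved implicitly (as the test's Codecs import suggests); the bootstrap address and topic name are illustrative:

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.stream.scaladsl.Sink
import cats.data.NonEmptyList
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer, StringDeserializer}
import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}

implicit val system: ActorSystem = ActorSystem("topic-loader-example")
// One implicit Deserializer[String] serves both the key and value sides.
implicit val stringDeserializer: Deserializer[String] = new StringDeserializer

// Custom settings pointing at a cluster other than the one in system config.
val customSettings = ConsumerSettings
  .create(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
  .withBootstrapServers("other-cluster.internal:9092")

// Before this fix, the initial offset lookup ignored customSettings and always
// built a consumer from config; now the settings flow through the offset check too.
TopicLoader
  .load[String, String](NonEmptyList.one("example-topic"), LoadAll, Some(customSettings))
  .runWith(Sink.seq)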
19 changes: 11 additions & 8 deletions src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
@@ -80,7 +80,7 @@ trait TopicLoader extends LazyLogging {
Config
.loadOrThrow(system.settings.config)
.topicLoader
-    load(logOffsetsForTopics(topics, strategy, config), config, maybeConsumerSettings)
+    load(logOffsetsForTopics(topics, strategy, config, maybeConsumerSettings), config, maybeConsumerSettings)
}

/** Source that loads the specified topics from the beginning. When the latest current offsets are reached, the
@@ -91,7 +91,7 @@ trait TopicLoader extends LazyLogging {
maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]] = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
val config = Config.loadOrThrow(system.settings.config).topicLoader
-    val logOffsetsF = logOffsetsForTopics(topics, LoadAll, config)
+    val logOffsetsF = logOffsetsForTopics(topics, LoadAll, config, maybeConsumerSettings)
val postLoadingSource = Source.futureSource(logOffsetsF.map { logOffsets =>
val highestOffsets = logOffsets.map { case (p, o) => p -> o.highest }
kafkaSource[K, V](highestOffsets, config, maybeConsumerSettings)
@@ -105,16 +105,18 @@ trait TopicLoader extends LazyLogging {
protected def logOffsetsForPartitions(
topicPartitions: NonEmptyList[TopicPartition],
strategy: LoadTopicStrategy,
-      config: TopicLoaderConfig
+      config: TopicLoaderConfig,
+      maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]]
)(implicit
system: ActorSystem
): Future[Map[TopicPartition, LogOffsets]] =
-    fetchLogOffsets(_ => topicPartitions.toList, strategy, config)
+    fetchLogOffsets(_ => topicPartitions.toList, strategy, config, maybeConsumerSettings)

protected def logOffsetsForTopics(
topics: NonEmptyList[String],
strategy: LoadTopicStrategy,
-      config: TopicLoaderConfig
+      config: TopicLoaderConfig,
+      maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]]
)(implicit
system: ActorSystem
): Future[Map[TopicPartition, LogOffsets]] = {
@@ -123,13 +123,14 @@ trait TopicLoader extends LazyLogging {
t <- topics.toList
p <- c.partitionsFor(t).asScala
} yield new TopicPartition(t, p.partition)
-    fetchLogOffsets(partitionsFromTopics, strategy, config)
+    fetchLogOffsets(partitionsFromTopics, strategy, config, maybeConsumerSettings)
}

private def fetchLogOffsets(
f: Consumer[Array[Byte], Array[Byte]] => List[TopicPartition],
strategy: LoadTopicStrategy,
-      config: TopicLoaderConfig
+      config: TopicLoaderConfig,
+      maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]]
)(implicit system: ActorSystem): Future[Map[TopicPartition, LogOffsets]] = {
def earliestOffsets(
consumer: Consumer[Array[Byte], Array[Byte]],
@@ -142,7 +142,7 @@ trait TopicLoader extends LazyLogging {
import system.dispatcher

Future {
-      withStandaloneConsumer(consumerSettings(None, config)) { c =>
+      withStandaloneConsumer(consumerSettings(maybeConsumerSettings, config)) { c =>
val offsets = offsetsFrom(f(c)) _
val beginningOffsets = offsets(c.beginningOffsets)
val endOffsets = strategy match {
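The substantive change above is that fetchLogOffsets now builds its standalone consumer from consumerSettings(maybeConsumerSettings, config) instead of consumerSettings(None, config), so the offset check hits the same cluster the caller's settings point at. The helper itself is outside this diff; a hypothetical sketch of the fallback shape its call site implies:

// Hypothetical reconstruction; the real helper lives elsewhere in TopicLoader.
private def consumerSettings(
    maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]],
    config: TopicLoaderConfig
)(implicit system: ActorSystem): ConsumerSettings[Array[Byte], Array[Byte]] =
  maybeConsumerSettings.getOrElse(
    // Fall back to settings derived from the ActorSystem's configuration.
    ConsumerSettings.create(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
  )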
6 changes: 4 additions & 2 deletions src/test/scala/base/IntegrationSpecBase.scala
@@ -82,8 +82,10 @@ abstract class IntegrationSpecBase extends UnitSpecBase {
val testTopic2 = "load-state-topic-2"
val testTopicPartitions = 5

-  def createCustomTopics(topics: NonEmptyList[String], partitions: Int = testTopicPartitions): Unit =
-    topics.toList.foreach(createCustomTopic(_, partitions = partitions))
+  def createCustomTopics(topics: NonEmptyList[String], partitions: Int = testTopicPartitions)(implicit
+      config: EmbeddedKafkaConfig
+  ): Unit =
+    topics.toList.foreach(createCustomTopic(_, partitions = partitions)(config))

/*
* Note: Compaction is only triggered if messages are published as a separate statement.
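With the implicit EmbeddedKafkaConfig parameter, createCustomTopics targets the ambient embedded cluster by default while still accepting an explicit config for a second cluster, which is exactly how the new integration test calls it below. A small sketch of both call shapes (ports and topic name illustrative):

import cats.data.NonEmptyList
import io.github.embeddedkafka.EmbeddedKafkaConfig

implicit val defaultKafka: EmbeddedKafkaConfig = EmbeddedKafkaConfig()
val secondKafka = EmbeddedKafkaConfig(kafkaPort = 6001, zooKeeperPort = 6002)

createCustomTopics(NonEmptyList.one("example-topic"))              // resolves the implicit config
createCustomTopics(NonEmptyList.one("example-topic"))(secondKafka) // targets the second cluster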
38 changes: 38 additions & 0 deletions src/test/scala/integration/TopicLoaderIntSpec.scala
@@ -3,16 +3,20 @@ package integration
import java.util.concurrent.TimeoutException as JavaTimeoutException

import akka.actor.ActorSystem
+import akka.kafka.ConsumerSettings
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.testkit.scaladsl.TestSink
import base.IntegrationSpecBase
import cats.data.NonEmptyList
import com.typesafe.config.ConfigFactory
import io.github.embeddedkafka.Codecs.{stringDeserializer, stringSerializer}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.errors.TimeoutException as KafkaTimeoutException
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringSerializer}
import org.scalatest.prop.TableDrivenPropertyChecks.*
import org.scalatest.prop.Tables.Table
import uk.sky.kafka.topicloader.*
+import utils.RandomPort

import scala.concurrent.Future

@@ -145,6 +145,40 @@ class TopicLoaderIntSpec extends IntegrationSpecBase {
}
}

"Kafka consumer settings" should {

"Override default settings and run two topic loaders on different cluster" in new TestContext {
val topics = NonEmptyList.of(testTopic1, testTopic2)
val (forTopic1, forTopic2) = records(1 to 15).splitAt(10)
val stringSerializer = new StringSerializer

val secondKafkaConfig = EmbeddedKafkaConfig(RandomPort(), RandomPort())
val consumerSettings = ConsumerSettings
.create(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(s"localhost:${secondKafkaConfig.kafkaPort}")

EmbeddedKafka.start()(secondKafkaConfig)

withRunningKafka {

createCustomTopics(topics)
createCustomTopics(topics)(secondKafkaConfig)

publishToKafka(testTopic1, forTopic1)(secondKafkaConfig, stringSerializer, stringSerializer)
publishToKafka(testTopic2, forTopic2)(secondKafkaConfig, stringSerializer, stringSerializer)

val loadedRecords = TopicLoader.load[String, String](topics, LoadAll).runWith(Sink.seq).futureValue
val secondLoadedRecords =
TopicLoader.load[String, String](topics, LoadAll, Some(consumerSettings)).runWith(Sink.seq).futureValue

loadedRecords.map(recordToTuple) shouldBe empty
secondLoadedRecords.map(recordToTuple) should contain theSameElementsAs (forTopic1 ++ forTopic2)
}

EmbeddedKafka.stop()
}
}

"Kafka is misbehaving" should {

"fail if unavailable at startup" in new TestContext {
