You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Add a source that will emit a source for each partition in the given topics. This will work the same way loadAndRun does except it will be per partition, leveraging Alpakka Kafka's source per partition
/** * Same as [[TopicLoader.loadAndRun]], but with one stream per partition. * See [[akka.kafka.scaladsl.Consumer.plainPartitionedSource]] for an * explanation of how the outer Source works.*/defpartitionedLoadAndRun[K:Deserializer, V:Deserializer](
topics: NonEmptyList[String],
)(implicitsystem: ActorSystem):Source[(TopicPartition, Source[ConsumerRecord[K, V], Future[Done]]), Future[Consumer.Control]] =???
The text was updated successfully, but these errors were encountered:
I'm not really sure of the use case for this method. Source per partition supports automatic partition assignment from Kafka, but for example in an application that loads from Kafka to re-create state, why would kafka-topic-loader need to be the one getting the assignment from Kafka? Would it not make more sense for the implementation to use the partitioned source and kafka-topic-loader to just be assigned a partition to read from?
Something like
/** Same as [[TopicLoader.loadAndRun]], but for a specified partitions. See * [[akka.kafka.scaladsl.Consumer.plainPartitionedSource]] for how to get a partition assignment from Kafka.*/defpartitionedLoadAndRun[K:Deserializer, V:Deserializer](
partitions: NonEmptyList[TopicPartition]
)(implicitsystem: ActorSystem):Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] =???
Add a source that will emit a source for each partition in the given topics. This will work the same way
loadAndRun
does except it will be per partition, leveraging Alpakka Kafka's source per partitionThe text was updated successfully, but these errors were encountered: