Add partitioned load and run source #32

Open
lacarvalho91 opened this issue May 21, 2019 · 1 comment

Comments

Contributor

lacarvalho91 commented May 21, 2019

Add a source that emits one inner source for each partition of the given topics. It would work the same way as loadAndRun, except per partition, leveraging Alpakka Kafka's source per partition:

  /**
    * Same as [[TopicLoader.loadAndRun]], but with one stream per partition.
    * See [[akka.kafka.scaladsl.Consumer.plainPartitionedSource]] for an
    * explanation of how the outer Source works.
    */
  def partitionedLoadAndRun[K : Deserializer, V : Deserializer](
      topics: NonEmptyList[String],
  )(implicit system: ActorSystem): Source[(TopicPartition, Source[ConsumerRecord[K, V], Future[Done]]), Future[Consumer.Control]] = ???
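
For context on the nested return type, here is a minimal sketch of how a caller might flatten a source of per-partition sources back into a single stream. It is not part of kafka-topic-loader: the names `PartitionedConsumption`, `flattenPartitions` and `maxPartitions` are hypothetical, and it assumes Akka Streams 2.6+ and kafka-clients are on the classpath. It relies only on `flatMapMerge`, the same operator the Alpakka Kafka documentation uses with `plainPartitionedSource`.

```scala
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, for illustration only.
object PartitionedConsumption {

  // Flattens a source of (partition, inner source) pairs into one stream,
  // tagging every record with the partition whose inner source emitted it.
  // `maxPartitions` caps how many inner sources are consumed concurrently.
  def flattenPartitions[K, V, InnerMat, OuterMat](
      partitioned: Source[(TopicPartition, Source[ConsumerRecord[K, V], InnerMat]), OuterMat],
      maxPartitions: Int
  ): Source[(TopicPartition, ConsumerRecord[K, V]), OuterMat] =
    partitioned.flatMapMerge(
      maxPartitions,
      pair => pair._2.map(record => pair._1 -> record)
    )
}
```

Note that `flatMapMerge` discards the materialized value of each inner source, so a caller that needs the per-partition `Future[Done]` from the proposed signature would have to materialize the inner sources itself.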
Member

bcarter97 commented Jun 3, 2022

I'm not really sure of the use case for this method. Source per partition supports automatic partition assignment from Kafka, but take, for example, an application that loads from Kafka to re-create state: why would kafka-topic-loader need to be the one getting the assignment from Kafka? Would it not make more sense for the application to use the partitioned source itself, and for kafka-topic-loader to simply be given the partitions to read from?

Something like:

  /** Same as [[TopicLoader.loadAndRun]], but for the specified partitions. See
    * [[akka.kafka.scaladsl.Consumer.plainPartitionedSource]] for how to get a partition assignment from Kafka.
    */
  def partitionedLoadAndRun[K : Deserializer, V : Deserializer](
      partitions: NonEmptyList[TopicPartition]
  )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = ???
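
As a rough illustration of reading only partitions that the caller already holds, the sketch below uses Alpakka Kafka's manual assignment subscription. It is an assumption about how such a method could be built, not the library's implementation: `AssignedPartitions` and `assignedSource` are hypothetical names, a plain `Set` stands in for `NonEmptyList`, and the load-until-end-offset behaviour of loadAndRun is deliberately left out.

```scala
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, for illustration only.
object AssignedPartitions {

  // Builds a source that reads exactly the given partitions. The partitions
  // are assigned manually by the caller rather than through a topic
  // subscription, so whoever calls this decides which partitions are read.
  def assignedSource[K, V](
      settings: ConsumerSettings[K, V],
      partitions: Set[TopicPartition]
  ): Source[ConsumerRecord[K, V], Consumer.Control] =
    Consumer.plainSource(settings, Subscriptions.assignment(partitions))
}
```

An application could obtain its assignment from `Consumer.plainPartitionedSource` and pass each assigned `TopicPartition` on to a helper of this shape.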


2 participants