Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example with Akka stream Kafka for consumer #92

Open
mavencode01 opened this issue Mar 20, 2018 · 3 comments
Open

Example with Akka stream Kafka for consumer #92

mavencode01 opened this issue Mar 20, 2018 · 3 comments

Comments

@mavencode01
Copy link

mavencode01 commented Mar 20, 2018

I'm wondering how this might work with Akka stream?
Any example to show how to consume large messages using Akka streams Kafka ?

@becketqin
Copy link
Contributor

I don't have an example of using this in Akka stream. What do you mean by Akka streams Kafka?

@mavencode01
Copy link
Author

Ok, unfortunately I couldn't consume the large message using Akka stream. It needs the consumer to deserialize the message correctly.

yeah I mean Reactive Kafka https://doc.akka.io/docs/akka-stream-kafka/current/home.html

@Roiocam
Copy link

Roiocam commented Jun 1, 2023

Ok, unfortunately I couldn't consume the large message using Akka stream. It needs the consumer to deserialize the message correctly.好的,不幸的是我无法使用 Akka 流来处理大消息。它需要消费者正确反序列化消息。

yeah I mean Reactive Kafka https://doc.akka.io/docs/akka-stream-kafka/current/home.html是的,我的意思是 Reactive Kafka https://doc.akka.io/docs/akka-stream-kafka/current/home.html

work fine with the below code.

the most important code is: .withConsumerFactory(e -> factory.createConsumer(e.getProperties()));

replace ConsumerFactory with linkedin kafka client consumer factory.

ConsumerSettings settings =
        ConsumerSettings.create(
                        config, new StringDeserializer(), new ByteArrayDeserializer())
                .withBootstrapServers(mqAddress)
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .withProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
                .withProperties(this.properties)
                .withProperty(
                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        StringDeserializer.class.getName())
                .withProperty(
                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        ByteArrayDeserializer.class.getName())
                .withConsumerFactory(e -> factory.createConsumer(e.getProperties()));

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants