-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DOCS] Using KafkaReceiver with Ktor Server (cancellation, and terminal events) #76
Comments
I found a way to make it work. It seems on(MonitoringEvent(ApplicationStarted)) { application ->
// .. code to initialise the receiver
CoroutineScope(Dispatchers.IO).launch {
receiver
.receive("my-topic")
.collect {
application.log.info("${it.key()} -> ${it.value()}")
it.offset.acknowledge()
}
}
} I don't know which is the best way to start a receiver in Ktor. @nomisRev Can you recommend or give us an example how to use it with Ktor? Regards |
Hey @ageorgousakis, Thanks for your interest in the library, and opening a ticket! 🙏 So to provide a small snippet: val flow: Flow<Unit> = receiver
.receive("my-topic")
.map { // <-- changed collect to map, so the result is still a Flow
application.log.info("${it.key()} -> ${it.value()}")
it.offset.acknowledge()
}
val application: Application = TODO("This is the Application from Ktor")
flow.launchIn(application) The Ktor NOTE: I've run into some issues where Ktor doesn't properly cancel on |
Alternatively, you could also use ApplicationEngine#addShutdownHook and use it to manually control a The take-away is the the Hope that helps @ageorgousakis ! |
Hi,
I'm trying to user KafkaReceiver inside Ktor 2. I'm not quite sure what is the best way to start KafkaReciever with Ktor.
What I did was to create an application plugin like the following and install it in Ktor engine.
I seems the receiver starts and collects the messages. The question is how can we stop the receiver when Ktor shutdowns. When I stop the Ktor application it seems stuck and I have to kill the process.
The text was updated successfully, but these errors were encountered: