producer.flush() ? #122
Comments
True, there should be some comment regarding flushing the producer. I'll add that now, thanks.
What about sync and async producing? librdkafka has a dedicated wiki page about this: https://github.com/edenhill/librdkafka/wiki/Sync-producer How should a sync producer be implemented using cppkafka?
You can use the Normally I always use this class because it has a smarter flushing mechanism than the |
@mfontanini can you give a small example of how to use BufferedProducer and BufferedProducer::sync_produce()?
Did you try reading the documentation? It's pretty self-explanatory. See the constructor and BufferedProducer::sync_produce. There's also this example which uses the buffered producer (it does async production, but the changes needed are minimal).
It seems that the buffered producer falls into an endless loop when there are connection timeouts. https://github.com/mfontanini/cppkafka/blob/master/include/cppkafka/utils/buffered_producer.h#L600 I don't know why, but producer_.flush() in this code always times out, which leads to an infinite loop. Why would flush time out every time?
You can use |
Yes, probably. But sync_produce doesn't use the timed version of wait_for_acks, and I can't find a way to make it use wait_for_acks with a timeout.
Do you keep getting timeouts when flushing even though you shouldn't? I see timeouts when running tests against a local VM. I'm not sure why I get timeouts, given the connection shouldn't fail. I feel like there's some race condition in rdkafka that's triggered when you flush too quickly, probably before the handle connects to the brokers.
I'm sending only one message via sync_produce to cloudkarafka.com, and it seems something is wrong with my network config or authentication. I expect sync_produce to fail after some time, but it does not. The loop in https://github.com/mfontanini/cppkafka/blob/master/include/cppkafka/utils/buffered_producer.h#L600 is infinite.
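For anyone hitting the same endless loop: one possible workaround is to bound the retry loop with an overall deadline instead of retrying the flush forever. The following is only a minimal sketch under assumptions, not cppkafka code. `flush_with_deadline` and `try_flush` are hypothetical names; `try_flush` stands for any callable you write yourself that attempts one flush with the given timeout and returns true on success.

```cpp
#include <chrono>

// Hypothetical helper, NOT part of cppkafka or librdkafka.
// Calls try_flush(per_call_timeout) repeatedly until it reports success
// or until total_timeout has elapsed since the helper was entered.
// try_flush is assumed to be a caller-supplied adapter returning bool.
template <typename FlushFn>
bool flush_with_deadline(FlushFn try_flush,
                         std::chrono::milliseconds per_call_timeout,
                         std::chrono::milliseconds total_timeout) {
    using clock = std::chrono::steady_clock;
    const auto deadline = clock::now() + total_timeout;
    for (;;) {
        // Always make at least one attempt.
        if (try_flush(per_call_timeout)) {
            return true;   // everything was flushed
        }
        // Give up once the overall deadline has passed. The total wait
        // can overshoot by at most one per-call timeout.
        if (clock::now() >= deadline) {
            return false;  // caller decides how to report the failure
        }
    }
}
```

With this shape, a broker that never becomes reachable makes the call return false after roughly `total_timeout`, so the caller can log the error or throw instead of hanging.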
Try the simple producer example from your first page, but without calling producer.flush() at the end.
Then create one consumer with group-id 0, consume one message, and kill it.
Now create a second consumer with group-id 0; it will be a few minutes before this one gets assigned.
It looks like calling producer.flush() is very important, yet your example code elsewhere doesn't do it?