
producer.flush() ? #122

Open
orwel1984 opened this issue Oct 19, 2018 · 11 comments

@orwel1984

orwel1984 commented Oct 19, 2018

If you run the simple producer example from your front page without calling producer.flush():

#include <cppkafka/cppkafka.h>

using namespace std;
using namespace cppkafka;

int main() {
    // Create the config
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }
    };

    // Create the producer
    Producer producer(config);

    // Produce a message!
    string message = "hey there!";
    producer.produce(MessageBuilder("my_topic").partition(0).payload(message));
    // producer.flush();  // left out on purpose to reproduce the problem
}

And don't call producer.flush() at the end.
Then create a consumer, consume one message with group id 0, and kill it.
Now create a second consumer with group id 0: it will be a few minutes before this one gets assigned.

It looks like calling producer.flush() is very important, but your code examples elsewhere don't call it?

@mfontanini
Owner

True, there should be some comment regarding flushing the producer. I'll add that now, thanks.

@EvilBeaver
Contributor

What about synchronous vs. asynchronous producing? librdkafka has a wiki page dedicated to this: https://github.com/edenhill/librdkafka/wiki/Sync-producer

How should a sync producer be implemented using cppkafka?

@mfontanini
Owner

You can use BufferedProducer and its BufferedProducer::sync_produce method. Under the hood, it does something similar to what the rdkafka wiki entry describes.

I always use this class because it has a smarter flushing mechanism than Producer, which simply calls rd_kafka_flush.

@orwel1984
Author

@mfontanini can you give a small example of how to use BufferedProducer and BufferedProducer::sync_produce()?

@mfontanini
Owner

Did you try reading the documentation? It's pretty self-explanatory. See the constructor and BufferedProducer::sync_produce. There's also this example, which uses the buffered producer (it produces asynchronously, but the changes needed are minimal).

@EvilBeaver
Contributor

EvilBeaver commented Oct 29, 2018

It seems that the buffered producer falls into an endless loop when there are connection timeouts.

https://github.com/mfontanini/cppkafka/blob/master/include/cppkafka/utils/buffered_producer.h#L600

I don't know why, but producer_.flush() in this code always times out, which leads to an infinite loop. Why would flush keep returning timeouts?
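The loop shape being described can be modeled without Kafka at all. This library-independent sketch in standard C++ uses hypothetical names (`FlushOutcome`, `wait_with_deadline`) that are not part of cppkafka: each call to `flush_once` stands in for one `producer_.flush()` attempt. If every attempt times out, "retry on timeout" alone never exits; adding an overall deadline turns the endless wait into a reported failure.

```cpp
#include <chrono>
#include <functional>

// Hypothetical result of one flush attempt: either the queue was drained
// or the attempt hit its per-call timeout.
enum class FlushOutcome { Delivered, TimedOut };

// Unbounded pattern (never exits if every attempt times out):
//   while (pending) { if (flush_once() == TimedOut) continue; ... }
// Bounded variant below: retry timed-out attempts only until an overall
// deadline passes, then report failure instead of spinning forever.
bool wait_with_deadline(const std::function<FlushOutcome()>& flush_once,
                        std::chrono::milliseconds total_timeout) {
    const auto deadline = std::chrono::steady_clock::now() + total_timeout;
    while (true) {
        if (flush_once() == FlushOutcome::Delivered) {
            return true;  // everything was acknowledged
        }
        // This attempt timed out; retry only while time remains.
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;
        }
    }
}
```

With a broker that is unreachable (or rejects authentication), every attempt times out, so only the deadline check lets the caller regain control.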

@accelerated
Contributor

You can use wait_for_acks(timeout) so you don't block indefinitely. See here.

@EvilBeaver
Contributor

Yes, probably. But sync_produce doesn't use the timed version of wait_for_acks, and I can't find a way to make it use wait_for_acks with a timeout.

@mfontanini
Owner

Do you keep getting timeouts when flushing even though you shouldn't? I see timeouts when running tests against a local VM. I'm not sure why I get timeouts, given the connection shouldn't fail. I feel like there's some race condition in rdkafka that's triggered when you flush too quickly, probably before the handle connects to the brokers.

@EvilBeaver
Contributor

I'm sending only one message via sync_produce to cloudkarafka.com, and it seems something is wrong with my network config or authentication. I expect sync_produce to fail after some time, but it does not: the loop in https://github.com/mfontanini/cppkafka/blob/master/include/cppkafka/utils/buffered_producer.h#L600 is infinite.

@accelerated
Contributor

> Yes, probably. But sync_produce doesn't use timeout'ed version of wait_for_acks, and I can't find the way to make it use wait_for_acks with timeout.

sync_produce() = produce() + wait_for_acks() (no timeout here, i.e. it waits indefinitely),
so you can simulate a timed version with
sync_produce(timeout) = produce() + wait_for_acks(timeout)


4 participants