Consuming messages

Matias Fontanini edited this page Apr 26, 2017 · 1 revision

Consumption is performed via the Consumer class, a wrapper around librdkafka's high-level consumer API. cppkafka does not support the low-level API.

Every time you poll a consumer for messages, it returns a Message object. A Message wraps an rd_kafka_message_t pointer, which may be null when no message was available, so always check that a Message is valid before using it.

Subscribing

Subscribing to topics is straightforward: the Consumer::subscribe method takes the list of topics you want to subscribe to. Once you've subscribed, you can start polling the consumer for messages:

// Construct from some config we've defined somewhere
Consumer consumer(config);

// Subscribe to 2 topics
consumer.subscribe({ "topic1", "topic2" });

// Now loop forever polling for messages
while (true) {
    Message msg = consumer.poll();

    // Make sure we have a message before processing it
    if (!msg) {
        continue;
    }

    // Messages can contain error notifications rather than actual data
    if (msg.get_error()) {
        // librdkafka provides an error indicating we've reached the
        // end of a partition every time we do so. Make sure it's not one
        // of those cases, as it's not really an error
        if (!msg.is_eof()) {
            // Handle this somehow...
        }
        continue;
    }

    // We actually have a message. At this point you can check for the
    // message's key and payload via `Message::get_key` and
    // `Message::get_payload`

    cout << "Received message on partition " << msg.get_topic() << "/"
         << msg.get_partition() << ", offset " << msg.get_offset() << endl;
}

Assignment/revocation callbacks

The setters for the partition assignment and revocation callbacks take std::function objects, which are called every time there's a partition rebalance on any topic your consumer is subscribed to. After calling the partition assignment callback, the Consumer calls Consumer::assign with the list of assigned topics/partitions. Any modification made to this list inside the assignment callback is taken into account.

Consumer consumer(config);

// Set the partition assignment callback. Note that a TopicPartitionList is 
// just an alias for a std::vector<TopicPartition>. operator<< is implemented
// so you can easily print these to a standard output stream
consumer.set_assignment_callback([](const TopicPartitionList& topic_partitions) {
    cout << "Topics/partitions assigned: " << topic_partitions << endl;
});
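
The ordering described above (callback first, then assignment of whatever the list contains afterwards) can be sketched without a broker. Everything in the block below is a hypothetical stand-in written for illustration: PartitionRef, PartitionList, RebalanceModel and rewind_on_assignment are not cppkafka types or functions. The stand-in callback takes a non-const reference, which is what makes modifications visible to the caller; check the library's headers for the exact callback signature before relying on this.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; NOT cppkafka's types.
struct PartitionRef {
    std::string topic;
    int partition;
    std::int64_t offset;
};
using PartitionList = std::vector<PartitionRef>;

class RebalanceModel {
public:
    using AssignmentCallback = std::function<void(PartitionList&)>;

    void set_assignment_callback(AssignmentCallback callback) {
        assert(callback);  // an empty std::function would be a caller bug
        callback_ = std::move(callback);
    }

    // Mimics the order described above: run the callback first, then
    // assign whatever the list looks like afterwards.
    void on_partitions_assigned(PartitionList partitions) {
        if (callback_) {
            callback_(partitions);
        }
        assigned_ = std::move(partitions);
    }

    const PartitionList& get_assignment() const { return assigned_; }

private:
    AssignmentCallback callback_;
    PartitionList assigned_;
};

// Installs a callback that rewinds every assigned partition to offset 0.
void rewind_on_assignment(RebalanceModel& consumer) {
    consumer.set_assignment_callback([](PartitionList& partitions) {
        for (PartitionRef& partition : partitions) {
            partition.offset = 0;
        }
    });
}
```

In this model, a list handed to on_partitions_assigned with arbitrary offsets ends up assigned with every offset set to 0, because the callback edited the list before it was stored.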

Message objects

As mentioned above, a Message object is just a wrapper around a librdkafka message pointer. Messages are non-copyable but movable, so you can safely move a Message into some container and later use it to e.g. commit an offset.
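
To illustrate what "non-copyable but movable" means in practice, here is a minimal, self-contained sketch. OwnedMessage, RawMessage, collect_messages and last_offset are hypothetical stand-ins invented for this example; they only imitate the ownership behaviour described above and are not cppkafka's Message class.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for the underlying C handle; NOT a librdkafka type.
struct RawMessage {
    std::int64_t offset;
};

// Hypothetical move-only wrapper; NOT cppkafka's Message.
class OwnedMessage {
public:
    explicit OwnedMessage(std::int64_t offset)
        : handle_(new RawMessage{offset}) {}

    // Movable but not copyable: exactly one wrapper owns the handle.
    OwnedMessage(OwnedMessage&&) = default;
    OwnedMessage& operator=(OwnedMessage&&) = default;
    OwnedMessage(const OwnedMessage&) = delete;
    OwnedMessage& operator=(const OwnedMessage&) = delete;

    // A moved-from wrapper holds no handle and tests as false.
    explicit operator bool() const { return handle_ != nullptr; }

    std::int64_t get_offset() const {
        assert(handle_);  // reading a moved-from message is a bug
        return handle_->offset;
    }

private:
    std::unique_ptr<RawMessage> handle_;
};

// Creates `count` messages with consecutive offsets and moves each one
// into a container, as you might while batching work.
std::vector<OwnedMessage> collect_messages(std::int64_t first_offset,
                                           std::size_t count) {
    std::vector<OwnedMessage> pending;
    for (std::size_t i = 0; i < count; ++i) {
        OwnedMessage msg(first_offset + static_cast<std::int64_t>(i));
        // pending.push_back(msg) would not compile: the copy is deleted.
        pending.push_back(std::move(msg));
    }
    return pending;
}

// Returns the offset of the most recently stored message, i.e. the one
// you would eventually commit after processing the batch.
std::int64_t last_offset(const std::vector<OwnedMessage>& pending) {
    assert(!pending.empty());
    return pending.back().get_offset();
}
```

With the real library, the stored object would be a Message and the final step would be a commit through the Consumer rather than just reading the offset.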

Buffers

The Buffer class is a read-only view of a buffer and is used for both a message's key and its payload. Buffers can't be copied, but you can implicitly convert them to a std::string or use the Buffer::begin and Buffer::end methods to iterate over their contents just as you would with a standard container.
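
The two access patterns mentioned above, conversion to a string and iteration, can be sketched with a small stand-in class. ByteView and checksum are hypothetical names made up for this example and are not part of cppkafka; the sketch only assumes a view that exposes begin/end over bytes it does not own, plus an implicit conversion that copies those bytes out.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a read-only buffer view; NOT cppkafka's Buffer.
class ByteView {
public:
    using const_iterator = const unsigned char*;

    ByteView(const unsigned char* data, std::size_t size)
        : data_(data), size_(size) {
        assert(data != nullptr || size == 0);
    }

    // Movable but not copyable, mirroring the behaviour described above.
    ByteView(ByteView&&) = default;
    ByteView(const ByteView&) = delete;
    ByteView& operator=(const ByteView&) = delete;

    const_iterator begin() const { return data_; }
    const_iterator end() const { return data_ + size_; }

    // Implicit conversion copies the viewed bytes into an owning string.
    operator std::string() const {
        return std::string(begin(), end());
    }

private:
    const unsigned char* data_;  // not owned by the view
    std::size_t size_;
};

// Iterates the view like a standard container and sums its bytes.
unsigned checksum(const ByteView& view) {
    unsigned sum = 0;
    for (unsigned char byte : view) {
        sum += byte;
    }
    return sum;
}
```

Because the view does not own its bytes, converting to a string is the way to keep the contents around after the underlying message has been destroyed.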
