Consuming messages
Consumption is performed via the Consumer class, a wrapper around librdkafka's high-level consumer API. The low-level API is not supported by cppkafka.
Every time a consumer is polled, it returns a Message object. A Message wraps an rd_kafka_message_t pointer, which may be null when nothing was received, so always check that a Message is valid before using it.
Subscribing to topics is straightforward. The Consumer::subscribe method takes the list of topics you want to subscribe to. Once subscribed, you can start polling the consumer for messages:
// Construct from some config we've defined somewhere
Consumer consumer(config);

// Subscribe to 2 topics
consumer.subscribe({ "topic1", "topic2" });

// Now loop forever polling for messages
while (true) {
    Message msg = consumer.poll();

    // Make sure we have a message before processing it
    if (!msg) {
        continue;
    }

    // Messages can contain error notifications rather than actual data
    if (msg.get_error()) {
        // librdkafka provides an error indicating we've reached the
        // end of a partition every time we do so. Make sure it's not one
        // of those cases, as it's not really an error
        if (!msg.is_eof()) {
            // Handle this somehow...
        }
        continue;
    }

    // We actually have a message. At this point you can check for the
    // message's key and payload via `Message::get_key` and
    // `Message::get_payload`
    cout << "Received message on partition " << msg.get_topic() << "/"
         << msg.get_partition() << ", offset " << msg.get_offset() << endl;
}
The partition assignment and revocation callbacks take std::function objects and are called every time there is a partition rebalance on any topic your consumer is subscribed to. After calling the partition assignment callback, the Consumer calls Consumer::assign with the list of assigned topics/partitions. Any modification made to this list inside the assignment callback is taken into account.
Consumer consumer(config);

// Set the partition assignment callback. Note that a TopicPartitionList is
// just an alias for a std::vector<TopicPartition>. operator<< is implemented
// so you can easily print these to a standard output stream
consumer.set_assignment_callback([](const TopicPartitionList& topic_partitions) {
    cout << "Topics/partitions assigned: " << topic_partitions << endl;
});
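To illustrate the ordering described above (callback first, then the assignment with the possibly edited list), here is a minimal, self-contained sketch. MockConsumer, simulate_rebalance, rebalance_with_rewind, and the three-field TopicPartition are hypothetical stand-ins written for this example, not cppkafka types. The sketch also assumes the callback receives a mutable reference, so that edits made inside it are visible afterwards.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, NOT cppkafka's real types: just enough to model
// the "callback first, then assign" ordering.
struct TopicPartition {
    std::string topic;
    int partition;
    long long offset;  // a negative value means "no explicit offset" in this sketch
};
using TopicPartitionList = std::vector<TopicPartition>;

class MockConsumer {
public:
    // Assumption: the callback gets a mutable reference to the list.
    using AssignmentCallback = std::function<void(TopicPartitionList&)>;

    void set_assignment_callback(AssignmentCallback callback) {
        callback_ = std::move(callback);
    }

    // Models a rebalance: the user callback sees the proposed list first
    // (and may edit it), then the edited list is what gets assigned.
    void simulate_rebalance(TopicPartitionList proposed) {
        if (callback_) {
            callback_(proposed);
        }
        assigned_ = std::move(proposed);
    }

    const TopicPartitionList& assignment() const { return assigned_; }

private:
    AssignmentCallback callback_;
    TopicPartitionList assigned_;
};

// Rewinds every proposed partition to offset 0 from inside the callback
// and returns what ended up being assigned.
inline TopicPartitionList rebalance_with_rewind(TopicPartitionList proposed) {
    MockConsumer consumer;
    consumer.set_assignment_callback([](TopicPartitionList& partitions) {
        for (auto& tp : partitions) {
            tp.offset = 0;
        }
    });
    consumer.simulate_rebalance(std::move(proposed));
    return consumer.assignment();
}
```

In this model, the list returned by rebalance_with_rewind carries the offsets set inside the callback, which mirrors the statement that changes made to the list in the assignment callback are taken into account.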
As mentioned above, a Message object is just a wrapper around a librdkafka message pointer. Messages are non-copyable but movable, so you can safely move a Message into a container and later use it to, e.g., commit an offset.
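The move-only behaviour can be sketched with a small stand-in type. RawMessage, MessageSketch, and collect_valid are hypothetical names invented for this example. They only imitate the ownership model described above (a handle that may be empty, cannot be copied, and can be moved into a container) and are not cppkafka's implementation.

```cpp
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for the wrapped C struct (not rd_kafka_message_t).
struct RawMessage {
    std::string payload;
    long long offset;
};

// Move-only wrapper sketch: owns a raw pointer and may be empty.
class MessageSketch {
public:
    MessageSketch() = default;  // empty handle, e.g. nothing was received
    explicit MessageSketch(RawMessage* raw) : handle_(raw) {}

    // True only when there is an underlying message to look at.
    explicit operator bool() const { return handle_ != nullptr; }

    // Only call these on a valid (non-empty) message.
    long long get_offset() const { return handle_->offset; }
    const std::string& get_payload() const { return handle_->payload; }

private:
    // unique_ptr makes the wrapper non-copyable but movable.
    std::unique_ptr<RawMessage> handle_;
};

static_assert(!std::is_copy_constructible<MessageSketch>::value, "not copyable");
static_assert(std::is_move_constructible<MessageSketch>::value, "movable");

// Keeps only the valid messages, moving each one into the returned batch.
inline std::vector<MessageSketch> collect_valid(std::vector<MessageSketch> polled) {
    std::vector<MessageSketch> batch;
    for (auto& msg : polled) {
        if (!msg) {
            continue;  // skip empty handles
        }
        batch.push_back(std::move(msg));
    }
    return batch;
}
```

With the real Message class the same pattern would apply: check validity first, then std::move the message into whatever container holds it until you are ready to act on it.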
The Buffer class is a read-only view of a buffer and is used for both a message's key and its payload. Buffers can't be copied, but you can implicitly convert them to a std::string or use the Buffer::begin and Buffer::end methods to iterate over their contents just as you would with a standard container.
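As a rough illustration of what a read-only, non-copyable view with container-style iteration looks like, here is a self-contained sketch. BufferView, get_size, and checksum are hypothetical names for this example, and the byte type is an assumption. The real Buffer class may differ in its details.

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in, NOT cppkafka's Buffer: a non-owning, read-only view
// over bytes that live somewhere else (e.g. inside a message).
class BufferView {
public:
    using const_iterator = const unsigned char*;

    BufferView(const unsigned char* data, std::size_t size)
        : data_(data), size_(size) {}

    // Not copyable, but movable.
    BufferView(const BufferView&) = delete;
    BufferView& operator=(const BufferView&) = delete;
    BufferView(BufferView&&) = default;

    // begin()/end() are enough to support range-based for loops.
    const_iterator begin() const { return data_; }
    const_iterator end() const { return data_ + size_; }
    std::size_t get_size() const { return size_; }

    // Implicit conversion: copies the viewed bytes into an owning string.
    operator std::string() const { return std::string(begin(), end()); }

private:
    const unsigned char* data_;
    std::size_t size_;
};

// Iterates the view like a standard container and sums its bytes.
inline unsigned checksum(const BufferView& buffer) {
    unsigned sum = 0;
    for (unsigned char byte : buffer) {
        sum += byte;
    }
    return sum;
}
```

Because the view does not own its bytes, converting to a std::string is the way to keep the contents around after the underlying message has been destroyed.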