feat: consume per partition
martijnimhoff committed Mar 8, 2024
1 parent 0b5987a commit 9e15ed8
Showing 8 changed files with 526 additions and 27 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -447,7 +447,7 @@ stream.consumer.commit(); // Commits all locally stored offsets
You can also use the Standard API and manage callbacks and events yourself. You can choose different modes for consuming messages:

* *Flowing mode*. This mode flows all of the messages it can read by maintaining an infinite loop in the event loop. It only stops when it detects the consumer has issued the `unsubscribe` or `disconnect` method.
* *Non-flowing mode*. This mode reads a single message from Kafka at a time manually.
* *Non-flowing mode*. This mode reads a single message from Kafka at a time manually. You can read from a specific partition or from all partitions at once. [The docs include an example of how to consume from a specific partition](https://blizzard.github.io/node-rdkafka/current/tutorial-consumer-per-partition.html).

The following example illustrates flowing mode:
@@ -500,6 +500,7 @@ The following table lists important methods for this API.
|`consumer.unsubscribe()` | Unsubscribes from the currently subscribed topics. <br><br>You cannot subscribe to different topics without calling the `unsubscribe()` method first. |
|`consumer.consume(cb)` | Gets messages from the existing subscription as quickly as possible. If `cb` is specified, invokes `cb(err, message)`. <br><br>This method keeps a background thread running to do the work. Note that the number of threads in nodejs process is limited by `UV_THREADPOOL_SIZE` (default value is 4) and using up all of them blocks other parts of the application that need threads. If you need multiple consumers then consider increasing `UV_THREADPOOL_SIZE` or using `consumer.consume(number, cb)` instead. |
|`consumer.consume(number, cb)` | Gets `number` of messages from the existing subscription. If `cb` is specified, invokes `cb(err, message)`. |
|`consumer.consume(number, topic, partition, cb)` | Gets `number` of messages from the given partition of the given topic. The consumer must be subscribed to the topic. If `cb` is specified, invokes `cb(err, message)`. |
|`consumer.commit()` | Commits all locally stored offsets |
|`consumer.commit(topicPartition)` | Commits offsets specified by the topic partition |
|`consumer.commitMessage(message)` | Commits the offsets specified by the message |
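
For example, a per-partition read using the new overload might look like the following sketch (the topic name, partition number, and batch size are illustrative assumptions):

```js
// Read up to 10 messages from partition 0 of the subscribed topic 'test'.
consumer.consume(10, 'test', 0, function(err, messages) {
  if (err) {
    console.error(err);
    return;
  }
  messages.forEach(function(message) {
    console.log('partition ' + message.partition + ' value ' + message.value.toString());
  });
});
```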
133 changes: 133 additions & 0 deletions examples/consumer-per-partition.md
@@ -0,0 +1,133 @@
A consumer subscribed to multiple partitions can control the mix of messages consumed from each partition. [The librdkafka FAQ explains how this works](https://github.com/confluentinc/librdkafka/wiki/FAQ#what-are-partition-queues-and-why-are-some-partitions-slower-than-others).

The example below simulates a slow partition 0 (2s per consume call), while the other partition consumes at a rate of 0.5s. To use the example, create a topic "test" with two partitions and produce 500 messages to each partition. The example does not require an active producer. Run it to see the result; run multiple instances to see the rebalancing take effect.
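
To seed the topic, a short producer script along these lines can be used (a sketch, assuming a broker on `localhost:9092` and an existing two-partition topic named "test"):

```js
var Kafka = require('../');

var producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092',
});

producer.connect();

producer.on('ready', function() {
  // Produce 500 messages to each of the two partitions.
  for (var partition = 0; partition < 2; partition++) {
    for (var i = 0; i < 500; i++) {
      producer.produce('test', partition, Buffer.from('message ' + i), null);
    }
  }

  // Wait for the messages to be flushed to the broker, then disconnect.
  producer.flush(10000, function() {
    producer.disconnect();
  });
});
```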

```js
/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */

var Kafka = require('../');

var consumer = new Kafka.KafkaConsumer({
  //'debug': 'all',
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'node-rdkafka-consumer-per-partition-example',
  'enable.auto.commit': false,
  'rebalance_cb': true,
}, {
  'auto.offset.reset': 'earliest', // start from the beginning
});

var topicName = 'test';

// Keep track of which partitions are assigned.
var assignments = [];

//logging debug messages, if debug is enabled
consumer.on('event.log', function(log) {
  console.log(log);
});

//logging all errors
consumer.on('event.error', function(err) {
  console.error('Error from consumer');
  console.error(err);
});

consumer.on('ready', function(arg) {
  console.log('consumer ready: ' + JSON.stringify(arg));

  consumer.subscribe([topicName]);

  // Remove the default timeout so that we won't wait on each consume
  consumer.setDefaultConsumeTimeout(0);

  // Start a regular consume loop in flowing mode. This won't return any
  // messages, because we will consume from specific partitions directly.
  // It is still required in order to serve the rebalance events.
  consumer.consume();
});

// Start our own consume loops for all newly assigned partitions
consumer.on('rebalance', function(err, updatedAssignments) {
  console.log('rebalancing done, got partitions assigned: ', updatedAssignments.map(function(a) {
    return a.partition;
  }));

  // Normally messages from all assigned partitions are forwarded to one general queue.
  // However, we want to consume per partition, so forwarding must be disabled.
  updatedAssignments.forEach(function (assignment) {
    consumer.disableQueueForwarding(assignment);
  });

  // Find the newly added assignments
  var newAssignments = updatedAssignments.filter(function (updatedAssignment) {
    return !assignments.some(function (assignment) {
      return assignment.partition === updatedAssignment.partition;
    });
  });

  // Update the global assignments array
  assignments = updatedAssignments;

  // Then start a consume loop for each of the new assignments
  newAssignments.forEach(function (assignment) {
    startConsumeMessages(assignment.partition);
  });
});

function startConsumeMessages(partition) {
  console.log('partition: ' + partition + ' starting to consume');

  function consume() {
    var isPartitionAssigned = assignments.some(function(assignment) {
      return assignment.partition === partition;
    });

    if (!isPartitionAssigned) {
      console.log('partition: ' + partition + ' stop consuming');
      return;
    }

    // Consume up to 5 messages at a time from this partition only
    consumer.consume(5, topicName, partition, callback);
  }

  function callback(err, messages) {
    if (err) {
      console.error(err);
    } else {
      messages.forEach(function(message) {
        // Handle the message
        console.log('partition ' + message.partition + ' value ' + message.value.toString());
      });

      // Committing the last message of the batch also commits all earlier offsets
      if (messages.length > 0) {
        consumer.commitMessage(messages.pop());
      }
    }

    // Simulate performance: partition 0 is slow (2s), the others are fast (0.5s)
    setTimeout(consume, partition === 0 ? 2000 : 500);
  }

  // Kick off the recursive consume loop
  consume();
}

consumer.on('disconnected', function(arg) {
  console.log('consumer disconnected. ' + JSON.stringify(arg));
});

// Start the consumer
consumer.connect();

// Stop this example after 30 seconds
setTimeout(function() {
  consumer.disconnect();
}, 30000);
```
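
The per-partition consume path also forwards `partition.eof` events (see `_consumeNumOfPartition` in `lib/kafka-consumer.js` below). As a sketch, assuming `enable.partition.eof` is set to `true` in the consumer configuration, reaching the end of a partition can be observed like this:

```js
consumer.on('partition.eof', function(eof) {
  console.log('reached end of partition ' + eof.partition + ' at offset ' + eof.offset);
});
```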
11 changes: 7 additions & 4 deletions index.d.ts
@@ -178,7 +178,7 @@ type EventListener<K extends string> = K extends keyof EventListenerMap ? EventL
export abstract class Client<Events extends string> extends EventEmitter {
  constructor(globalConf: GlobalConfig, SubClientType: any, topicConf: TopicConfig);

  connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;
  connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError | null, data: Metadata) => any): this;

  getClient(): any;

@@ -219,8 +219,9 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
  committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
  committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;

  consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
  consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
  consume(number: number, topic: string, partition: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
  consume(number: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
  consume(cb: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
  consume(): void;

  getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
@@ -233,10 +234,12 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {

  resume(topicPartitions: TopicPartition[]): any;

  seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this;
  seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError | undefined) => void): this;

  setDefaultConsumeTimeout(timeoutMs: number): void;

  disableQueueForwarding(topicPartition: TopicPartition): this;

  setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;

  subscribe(topics: SubscribeTopicList): this;
79 changes: 77 additions & 2 deletions lib/kafka-consumer.js
@@ -143,6 +143,11 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
  this._consumeTimeout = timeoutMs;
};

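/**
 * Disable forwarding of messages from the given topic partition to the
 * general consumer queue, so that the partition can be consumed directly
 * via consume(number, topic, partition, cb).
 *
 * @param {TopicPartition} topicPartition - The topic partition to stop forwarding for
 * @return {KafkaConsumer} - Returns itself
 */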
KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) {
  this._client.disableQueueForwarding(topicPartition);
  return this;
};

/**
* Set the default sleep delay for the next consume loop after the previous one has timed out.
* @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out
@@ -361,6 +366,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
};

/**
* Read a number of messages from a specific topic and partition.
*
 * This can be useful when consume performance differs per partition: consuming
 * per partition prevents slow consumption on one partition from affecting
 * the consumption of other partitions.
 *
 * A topic param is required in order to select the right partition, because a
 * consumer can be subscribed to multiple topics.
*
* @param {number} size - Number of messages to read
* @param {string} topic - Name of topic to read
* @param {number} partition - Identifier of partition to read
* @param {KafkaConsumer~readCallback} cb - Callback to return when work is done.
*//**
* Read a number of messages from Kafka.
*
* This method is similar to the main one, except that it reads a number
@@ -384,11 +403,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
* @param {KafkaConsumer~readCallback} cb - Callback to return when a message
* is fetched.
*/
KafkaConsumer.prototype.consume = function(number, cb) {
KafkaConsumer.prototype.consume = function(number, topic, partition, cb) {
  var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
  var self = this;

  if ((number && typeof number === 'number') || (number && cb)) {
  if ((number && typeof number === 'number') && typeof topic === 'string' && typeof partition === 'number') {

    if (cb === undefined) {
      cb = function() {};
    } else if (typeof cb !== 'function') {
      throw new TypeError('Callback must be a function');
    }

    this._consumeNumOfPartition(timeoutMs, number, topic, partition, cb);
  } else if ((number && typeof number === 'number') || (number && cb)) {

    if (cb === undefined) {
      cb = function() {};
@@ -499,6 +527,53 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {

};

/**
 * Consume a number of messages from a specific topic and partition.
 * Wrapped in a try catch with proper error reporting. Should not be
* called directly, and instead should be called using consume.
*
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages, topic, partition, cb) {
  var self = this;

  this._client.consume(timeoutMs, numMessages, topic, partition, function(err, messages, eofEvents) {
    if (err) {
      err = LibrdKafkaError.create(err);
      if (cb) {
        cb(err);
      }
      return;
    }

    var currentEofEventsIndex = 0;

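    // Emit, in order, any partition.eof events that were recorded at the given message index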
    function emitEofEventsFor(messageIndex) {
      while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) {
        delete eofEvents[currentEofEventsIndex].messageIndex;
        self.emit('partition.eof', eofEvents[currentEofEventsIndex]);
        ++currentEofEventsIndex;
      }
    }

    emitEofEventsFor(-1);

    for (var i = 0; i < messages.length; i++) {
      self.emit('data', messages[i]);
      emitEofEventsFor(i);
    }

    emitEofEventsFor(messages.length);

    if (cb) {
      cb(null, messages);
    }

  });

};

/**
* This callback returns the message read from Kafka.
*
