feat: consume from specific partition #1101

Open · wants to merge 5 commits into base: master
3 changes: 2 additions & 1 deletion README.md
@@ -447,7 +447,7 @@ stream.consumer.commit(); // Commits all locally stored offsets
You can also use the Standard API and manage callbacks and events yourself. You can choose different modes for consuming messages:

* *Flowing mode*. This mode flows all of the messages it can read by maintaining an infinite loop in the event loop. It only stops when it detects that the consumer has called the `unsubscribe()` or `disconnect()` method.
* *Non-flowing mode*. This mode reads a single message from Kafka at a time manually.
* *Non-flowing mode*. This mode reads a single message from Kafka at a time manually. You may choose to read from a specific partition or from all assigned partitions at once; see the sketch after this list. [The docs include an example of how to consume from a specific partition](https://blizzard.github.io/node-rdkafka/current/tutorial-consumer-per-partition.html).
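
As a rough sketch of that per-partition pattern (the broker address and the topic name `test` are placeholders; the complete example added in `examples/consumer-per-partition.md` tracks assignments more carefully):

```js
var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': 'localhost:9092', // assumed local broker
  'group.id': 'per-partition-example',
  'rebalance_cb': true,                     // emit 'rebalance' events
}, {
  'auto.offset.reset': 'earliest',
});

consumer.on('ready', function() {
  consumer.subscribe(['test']);
  // A flowing consume() loop is still required so rebalance events are served.
  consumer.consume();
});

consumer.on('rebalance', function(err, assignments) {
  // Only act when partitions are being assigned, not revoked.
  if (err.code !== Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    return;
  }

  assignments.forEach(function(assignment) {
    // Keep this partition's messages on their own queue...
    consumer.disableQueueForwarding(assignment);

    // ...and read up to 10 messages from that specific partition.
    consumer.consume(10, 'test', assignment.partition, function(err, messages) {
      if (err || !messages) {
        return;
      }
      messages.forEach(function(message) {
        console.log('partition ' + message.partition + ': ' + message.value.toString());
      });
    });
  });
});

consumer.connect();
```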

The following example illustrates flowing mode:
```js
// …
```

@@ -500,6 +500,7 @@ The following table lists important methods for this API.
|`consumer.unsubscribe()` | Unsubscribes from the currently subscribed topics. <br><br>You cannot subscribe to different topics without calling the `unsubscribe()` method first. |
|`consumer.consume(cb)` | Gets messages from the existing subscription as quickly as possible. If `cb` is specified, invokes `cb(err, message)`. <br><br>This method keeps a background thread running to do the work. Note that the number of threads in a Node.js process is limited by `UV_THREADPOOL_SIZE` (default 4), and using them all up blocks other parts of the application that need threads. If you need multiple consumers, consider increasing `UV_THREADPOOL_SIZE` or using `consumer.consume(number, cb)` instead. |
|`consumer.consume(number, cb)` | Gets `number` of messages from the existing subscription. If `cb` is specified, invokes `cb(err, message)`. |
|`consumer.consume(number, topic, partition, cb)` | Gets `number` of messages from a partition of the given topic. The consumer must be subscribed to the topic. If `cb` is specified, invokes `cb(err, message)`. |
|`consumer.commit()` | Commits all locally stored offsets |
|`consumer.commit(topicPartition)` | Commits offsets specified by the topic partition |
|`consumer.commitMessage(message)` | Commits the offsets specified by the message |
130 changes: 130 additions & 0 deletions examples/consumer-per-partition.md
@@ -0,0 +1,130 @@
A consumer that is assigned multiple partitions can control the mix of messages consumed from each partition. How this works is explained [in the librdkafka FAQ](https://github.com/confluentinc/librdkafka/wiki/FAQ#what-are-partition-queues-and-why-are-some-partitions-slower-than-others).

The example below simulates a slow partition 0 (2s per consume call), while the other partition consumes at a rate of 0.5s. To use the example, create a topic "test" with two partitions and produce 500 messages to each partition; an active producer is not required. Run the example to see the result, and run multiple instances to see rebalancing take effect.
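
If you prefer to script that setup, here is a minimal sketch using node-rdkafka's `AdminClient` and `Producer` (the broker address, client id, and replication factor are assumptions to adapt):

```js
var Kafka = require('node-rdkafka');

// Create the "test" topic with two partitions.
var admin = Kafka.AdminClient.create({
  'client.id': 'setup-admin',
  'metadata.broker.list': 'localhost:9092',
});

admin.createTopic({ topic: 'test', num_partitions: 2, replication_factor: 1 }, function(err) {
  if (err) {
    console.error(err); // e.g. the topic may already exist
  }

  // Produce 500 messages to each of the two partitions.
  var producer = new Kafka.Producer({ 'metadata.broker.list': 'localhost:9092' });
  producer.connect();

  producer.on('ready', function() {
    for (var partition = 0; partition < 2; partition++) {
      for (var i = 0; i < 500; i++) {
        producer.produce('test', partition, Buffer.from('message ' + i), null);
      }
    }
    // Wait for delivery, then clean up.
    producer.flush(10000, function() {
      producer.disconnect();
      admin.disconnect();
    });
  });
});
```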

```js
/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */

var Kafka = require('../');

var consumer = new Kafka.KafkaConsumer({
  //'debug': 'all',
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'test-group-' + Math.random(),
  'enable.auto.commit': false,
  'rebalance_cb': true,
}, {
  'auto.offset.reset': 'earliest', // start from the beginning
});

var topicName = 'test';

// Keep track of which partitions are assigned.
var assignments = [];

// Log debug messages, if debug is enabled.
consumer.on('event.log', function(log) {
  console.log(log);
});

// Log all errors.
consumer.on('event.error', function(err) {
  console.error('Error from consumer');
  console.error(err);
});

consumer.on('ready', function(arg) {
  console.log('consumer ready: ' + JSON.stringify(arg));

  consumer.subscribe([topicName]);

  // Start a regular consume loop in flowing mode. This won't return any
  // messages because we will start consuming from the partitions directly,
  // but it is required to serve the rebalancing events.
  consumer.consume();
});

// Start our own consume loops for all newly assigned partitions.
consumer.on('rebalance', function(err, updatedAssignments) {
  console.log('rebalancing done, got partitions assigned: ', updatedAssignments.map(function(a) {
    return a.partition;
  }));

  // Normally messages are forwarded to a general queue, which contains messages
  // from all assigned partitions. However, we want to consume per partition,
  // so we need to disable this forwarding.
  updatedAssignments.forEach(function(assignment) {
    consumer.disableQueueForwarding(assignment);
  });

  // Find the new assignments.
  var newAssignments = updatedAssignments.filter(function(updatedAssignment) {
    return !assignments.some(function(assignment) {
      return assignment.partition === updatedAssignment.partition;
    });
  });

  // Update the global assignments array.
  assignments = updatedAssignments;

  // Then start consume loops for the new assignments.
  newAssignments.forEach(function(assignment) {
    startConsumeMessages(assignment.partition);
  });
});

function startConsumeMessages(partition) {
  console.log('partition: ' + partition + ' starting to consume');

  function consume() {
    var isPartitionAssigned = assignments.some(function(assignment) {
      return assignment.partition === partition;
    });

    if (!isPartitionAssigned) {
      console.log('partition: ' + partition + ' stop consuming');
      return;
    }

    // Consume 5 messages at a time.
    consumer.consume(5, topicName, partition, callback);
  }

  function callback(err, messages) {
    if (err) {
      // Stop this partition's loop on error.
      console.error(err);
      return;
    }

    messages.forEach(function(message) {
      // Process the message.
      console.log('partition ' + message.partition + ' value ' + message.value.toString());
    });

    // Committing the last message of the batch commits all earlier offsets too.
    if (messages.length > 0) {
      consumer.commitMessage(messages.pop());
    }

    // Simulate a slow partition 0 (2s per batch) versus fast partitions (0.5s).
    setTimeout(consume, partition === 0 ? 2000 : 500);
  }

  // Kick off the recursive consume loop.
  consume();
}

consumer.on('disconnected', function(arg) {
  console.log('consumer disconnected. ' + JSON.stringify(arg));
});

// Start the consumer.
consumer.connect();

// Stop this example after 30s.
setTimeout(function() {
  consumer.disconnect();
}, 30000);

```
7 changes: 5 additions & 2 deletions index.d.ts
@@ -223,8 +223,9 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;

consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(number: number, topic: string, partition: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
consume(number: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
consume(cb: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
consume(): void;

getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
@@ -239,6 +240,8 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {

seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this;

disableQueueForwarding(topicPartition: TopicPartition): this;

setDefaultConsumeTimeout(timeoutMs: number): void;

setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;
2 changes: 1 addition & 1 deletion lib/error.js
@@ -405,7 +405,7 @@ function LibrdKafkaError(e) {
this.origin = 'kafka';
}
Error.captureStackTrace(this, this.constructor);
} else if (!util.isError(e)) {
} else if (!(Object.prototype.toString.call(e) === "[object Error]" || e instanceof Error)) {
// This is the better way
this.message = e.message;
this.code = e.code;
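
As a quick aside on why `.call` is needed in the line above (an illustrative snippet, not part of the diff): `Object.prototype.toString` classifies its `this` value, so it must be bound to the object being tested.

```js
var e = new Error('boom');

// Bound with .call, toString classifies the tested object:
console.log(Object.prototype.toString.call(e));  // '[object Error]'
console.log(Object.prototype.toString.call({})); // '[object Object]'

// Without .call, the argument is ignored and `this` is Object.prototype:
console.log(Object.prototype.toString(e));       // '[object Object]'
```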
81 changes: 79 additions & 2 deletions lib/kafka-consumer.js
@@ -143,6 +143,11 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
this._consumeTimeout = timeoutMs;
};

/**
* Disable forwarding of messages from the given partition to the consumer's
* general queue, so that the partition can be consumed individually.
*
* @param {TopicPartition} topicPartition - Topic partition to keep on its own queue
* @return {KafkaConsumer} - Returns itself
*/
KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) {
this._client.disableQueueForwarding(topicPartition);
return this;
};

/**
* Set the default sleep delay for the next consume loop after the previous one has timed out.
* @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out
@@ -361,6 +366,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
};

/**
* Read a number of messages from a specific topic and partition.
*
* This can be useful when consume performance differs per partition. Consuming
* per partition can prevent slow consumption on one partition from affecting
* the consumption of other partitions.
*
* Because a consumer can be subscribed to multiple topics, the topic parameter
* is required to select the right partition.
*
* @param {number} size - Number of messages to read
* @param {string} topic - Name of topic to read
* @param {number} partition - Identifier of partition to read
* @param {KafkaConsumer~readCallback} cb - Callback to return when work is done.
*//**
* Read a number of messages from Kafka.
*
* This method is similar to the main one, except that it reads a number
Expand All @@ -384,11 +403,22 @@ KafkaConsumer.prototype.unsubscribe = function() {
* @param {KafkaConsumer~readCallback} cb - Callback to return when a message
* is fetched.
*/
KafkaConsumer.prototype.consume = function(number, cb) {
KafkaConsumer.prototype.consume = function(number, topic, partition, cb) {
var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
var self = this;

if ((number && typeof number === 'number') || (number && cb)) {
if ((number && typeof number === 'number') && typeof topic === 'string' && typeof partition === 'number') {

if (cb === undefined) {
cb = function() {};
} else if (typeof cb !== 'function') {
throw new TypeError('Callback must be a function');
}

this._consumeNumOfPartition(timeoutMs, number, topic, partition, cb);
} else if ((number && typeof number === 'number') || (number && topic)) {
// topic is given as the cb
cb = topic;

if (cb === undefined) {
cb = function() {};
@@ -499,6 +529,53 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {

};

/**
* Consume a number of messages from a specific topic and partition.
* Wrapped in a try-catch with proper error reporting. Should not be
* called directly; instead, call it through consume.
*
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages, topic, partition, cb) {
var self = this;

this._client.consume(timeoutMs, numMessages, topic, partition, function(err, messages, eofEvents) {
if (err) {
err = LibrdKafkaError.create(err);
if (cb) {
cb(err);
}
return;
}

var currentEofEventsIndex = 0;

// Emit any queued 'partition.eof' events that belong at this position
// in the message stream (each eofEvent is tagged with a messageIndex).
function emitEofEventsFor(messageIndex) {
while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) {
delete eofEvents[currentEofEventsIndex].messageIndex;
self.emit('partition.eof', eofEvents[currentEofEventsIndex]);
++currentEofEventsIndex;
}
}

emitEofEventsFor(-1);

for (var i = 0; i < messages.length; i++) {
self.emit('data', messages[i]);
emitEofEventsFor(i);
}

emitEofEventsFor(messages.length);

if (cb) {
cb(null, messages);
}

});

};

/**
* This callback returns the message read from Kafka.
*
Expand Down