From 7bb054ddde6fed254ce24c20c780c52167763546 Mon Sep 17 00:00:00 2001 From: Martijn Imhoff Date: Wed, 6 Nov 2024 15:44:21 +0100 Subject: [PATCH 1/5] feat: consume from specific partition --- README.md | 1 + examples/consumer-per-partition.md | 130 +++++++++++++++++++++ index.d.ts | 7 +- lib/error.js | 2 +- lib/kafka-consumer.js | 80 ++++++++++++- src/kafka-consumer.h | 3 + src/workers.cc | 182 +++++++++++++++++++++++++++++ src/workers.h | 19 +++ 8 files changed, 419 insertions(+), 5 deletions(-) create mode 100644 examples/consumer-per-partition.md diff --git a/README.md b/README.md index b02cd84b..73bee23a 100644 --- a/README.md +++ b/README.md @@ -500,6 +500,7 @@ The following table lists important methods for this API. |`consumer.unsubscribe()` | Unsubscribes from the currently subscribed topics.

You cannot subscribe to different topics without calling the `unsubscribe()` method first. | |`consumer.consume(cb)` | Gets messages from the existing subscription as quickly as possible. If `cb` is specified, invokes `cb(err, message)`.

This method keeps a background thread running to do the work. Note that the number of threads in nodejs process is limited by `UV_THREADPOOL_SIZE` (default value is 4) and using up all of them blocks other parts of the application that need threads. If you need multiple consumers then consider increasing `UV_THREADPOOL_SIZE` or using `consumer.consume(number, cb)` instead. | |`consumer.consume(number, cb)` | Gets `number` of messages from the existing subscription. If `cb` is specified, invokes `cb(err, message)`. | +|`consumer.consume(number, topic, partition, cb)` | Gets `number` of messages from a partition of the given topic. The topic must have a subscription. If `cb` is specified, invokes `cb(err, message)`. |`consumer.commit()` | Commits all locally stored offsets | |`consumer.commit(topicPartition)` | Commits offsets specified by the topic partition | |`consumer.commitMessage(message)` | Commits the offsets specified by the message | diff --git a/examples/consumer-per-partition.md b/examples/consumer-per-partition.md new file mode 100644 index 00000000..6719f7c1 --- /dev/null +++ b/examples/consumer-per-partition.md @@ -0,0 +1,130 @@ +A consumer that is subscribed to multiple partitions can control the mix of messages consumed from each partition. How this is done is explained [here](https://github.com/confluentinc/librdkafka/wiki/FAQ#what-are-partition-queues-and-why-are-some-partitions-slower-than-others). + +The example below simulates a partition 0 which is slow (2s per consume). Other partitions consume at a rate of 0.5s. To use the example, create a topic "test" with two partitions. Produce 500 message to both partitions. This example does not require an active producer. Run the example to see the result. Run multiple instances to see the rebalancing take effect. + +```js +/* + * node-rdkafka - Node.js wrapper for RdKafka C/C++ library + * + * Copyright (c) 2016 Blizzard Entertainment + * + * This software may be modified and distributed under the terms + * of the MIT license. See the LICENSE.txt file for details. + */ + +var Kafka = require('../'); + +var consumer = new Kafka.KafkaConsumer({ + //'debug': 'all', + 'metadata.broker.list': 'localhost:9092', + 'group.id': 'test-group-' + Math.random(), + 'enable.auto.commit': false, + 'rebalance_cb': true, +}, { + 'auto.offset.reset': 'earliest', // start from the beginning +}); + +var topicName = 'test'; + +// Keep track of which partitions are assigned. +var assignments = []; + +//logging debug messages, if debug is enabled +consumer.on('event.log', function(log) { + console.log(log); +}); + +//logging all errors +consumer.on('event.error', function(err) { + console.error('Error from consumer'); + console.error(err); +}); + +consumer.on('ready', function(arg) { + console.log('consumer ready: ' + JSON.stringify(arg)); + + consumer.subscribe([topicName]); + + // start a regular consume loop in flowing mode. This won't result in any + // messages because will we start consuming from a partition directly. + // This is required to serve the rebalancing events + consumer.consume(); +}); + +// Start our own consume loops for all newly assigned partitions +consumer.on('rebalance', function(err, updatedAssignments) { + console.log('rebalancing done, got partitions assigned: ', updatedAssignments.map(function(a) { + return a.partition; + })); + + // Normally messages are forwarded to a general queue, which contains messages from all assigned partitions. + // however we want to consume per partitions, for this we need to disable forwarding. + updatedAssignments.forEach(function (assignment) { + consumer.disableQueueForwarding(assignment); + }); + + // find new assignments + var newAssignments = updatedAssignments.filter(function (updatedAssignment) { + return !assignments.some(function (assignment) { + return assignment.partition === updatedAssignment.partition; + }); + }); + + // update global assignments array + assignments = updatedAssignments; + + // then start consume loops for the new assignments + newAssignments.forEach(function (assignment) { + startConsumeMessages(assignment.partition); + }); +}); + +function startConsumeMessages(partition) { + console.log('partition: ' + partition + ' starting to consume'); + + function consume() { + var isPartitionAssigned = assignments.some(function(assignment) { + return assignment.partition === partition; + }); + + if (!isPartitionAssigned) { + console.log('partition: ' + partition + ' stop consuming'); + return; + } + + // consume per 5 messages + consumer.consume(5, topicName, partition, callback); + } + + function callback(err, messages) { + messages.forEach(function(message) { + // consume the message + console.log('partition ' + message.partition + ' value ' + message.value.toString()); + consumer.commitMessage(message); + }); + + if (messages.length > 0) { + consumer.commitMessage(messages.pop()); + } + + // simulate performance + setTimeout(consume, partition === 0 ? 2000 : 500); + } + + // kick-off recursive consume loop + consume(); +} + +consumer.on('disconnected', function(arg) { + console.log('consumer disconnected. ' + JSON.stringify(arg)); +}); + +//starting the consumer +consumer.connect(); + +//stopping this example after 30s +setTimeout(function() { + consumer.disconnect(); +}, 30000); + +``` \ No newline at end of file diff --git a/index.d.ts b/index.d.ts index 43bedfc9..88bf43ca 100644 --- a/index.d.ts +++ b/index.d.ts @@ -223,8 +223,9 @@ export class KafkaConsumer extends Client { committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this; committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this; - consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void; - consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void; + consume(number: number, topic: string, partition: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void; + consume(number: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void; + consume(cb: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void; consume(): void; getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets; @@ -239,6 +240,8 @@ export class KafkaConsumer extends Client { seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this; + disableQueueForwarding(topicPartition: TopicPartition): this; + setDefaultConsumeTimeout(timeoutMs: number): void; setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void; diff --git a/lib/error.js b/lib/error.js index 1795dbfe..add99cbb 100644 --- a/lib/error.js +++ b/lib/error.js @@ -405,7 +405,7 @@ function LibrdKafkaError(e) { this.origin = 'kafka'; } Error.captureStackTrace(this, this.constructor); - } else if (!util.isError(e)) { + } else if (!(Object.prototype.toString(e) === "[object Error]" || e instanceof Error)) { // This is the better way this.message = e.message; this.code = e.code; diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index c479240f..ab38396f 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -151,6 +151,11 @@ KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs) this._consumeLoopTimeoutDelay = intervalMs; }; +KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) { + this._client.disableQueueForwarding(topicPartition); + return this; +}; + /** * Get a stream representation of this KafkaConsumer * @@ -361,6 +366,20 @@ KafkaConsumer.prototype.unsubscribe = function() { }; /** + * Read a number of messages from a specific topic and partition. + * + * Can be useful if the consume performance differs per partition. Consuming + * per partition could prevent slow performance on one partition from affecting + * the consumption of other partitions. + * + * To select the right partition it is required to set a topic param, because a + * consumer can be subscribed to multiple topics. + * + * @param {number} size - Number of messages to read + * @param {string} topic - Name of topic to read + * @param {number} partition - Identifier of partition to read + * @param {KafkaConsumer~readCallback} cb - Callback to return when work is done. + *//** * Read a number of messages from Kafka. * * This method is similar to the main one, except that it reads a number @@ -384,12 +403,22 @@ KafkaConsumer.prototype.unsubscribe = function() { * @param {KafkaConsumer~readCallback} cb - Callback to return when a message * is fetched. */ -KafkaConsumer.prototype.consume = function(number, cb) { + KafkaConsumer.prototype.consume = function(number, topic, partition, cb) { var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT; var self = this; - if ((number && typeof number === 'number') || (number && cb)) { + if ((number && typeof number === 'number') && typeof topic === 'string' && typeof partition === 'number') { + + if (cb === undefined) { + cb = function() {}; + } else if (typeof cb !== 'function') { + throw new TypeError('Callback must be a function'); + } + this._consumeNumOfPartition(timeoutMs, number, topic, partition, cb); + } else if ((number && typeof number === 'number') || (number && topic)) { + // topic is given as the cb + cb = topic; if (cb === undefined) { cb = function() {}; } else if (typeof cb !== 'function') { @@ -499,6 +528,53 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) { }; +/** + * Consume a number of messages from a specific topic and partition + * Wrapped in a try catch with proper error reporting. Should not be + * called directly, and instead should be called using consume. + * + * @private + * @see consume + */ +KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages, topic, partition, cb, onlyApplyTimeoutToFirstMessage) { + var self = this; + + this._client.consume(timeoutMs, numMessages, topic, partition, function(err, messages, eofEvents) { + if (err) { + err = LibrdKafkaError.create(err); + if (cb) { + cb(err); + } + return; + } + + var currentEofEventsIndex = 0; + + function emitEofEventsFor(messageIndex) { + while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) { + delete eofEvents[currentEofEventsIndex].messageIndex; + self.emit('partition.eof', eofEvents[currentEofEventsIndex]) + ++currentEofEventsIndex; + } + } + + emitEofEventsFor(-1); + + for (var i = 0; i < messages.length; i++) { + self.emit('data', messages[i]); + emitEofEventsFor(i); + } + + emitEofEventsFor(messages.length); + + if (cb) { + cb(null, messages); + } + + }, onlyApplyTimeoutToFirstMessage); + +}; + /** * This callback returns the message read from Kafka. * diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h index c91590ec..c26d96f6 100644 --- a/src/kafka-consumer.h +++ b/src/kafka-consumer.h @@ -73,6 +73,8 @@ class KafkaConsumer : public Connection { Baton Assign(std::vector); Baton Unassign(); + + Baton DisableQueueForwarding(RdKafka::TopicPartition*); Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms); @@ -107,6 +109,7 @@ class KafkaConsumer : public Connection { static NAN_METHOD(NodeAssign); static NAN_METHOD(NodeUnassign); static NAN_METHOD(NodeAssignments); + static NAN_METHOD(NodeDisableQueueForwarding); static NAN_METHOD(NodeUnsubscribe); static NAN_METHOD(NodeCommit); static NAN_METHOD(NodeCommitSync); diff --git a/src/workers.cc b/src/workers.cc index 55d3dd50..2dbaf821 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -783,6 +783,188 @@ void KafkaConsumerConsumeLoop::HandleErrorCallback() { callback->Call(argc, argv); } +/** + * @brief KafkaConsumer get messages per partition worker. + * + * This callback will get a number of messages from a specific partition. + * Can be of use in streams or places where you don't want an infinite + * loop managed in C++land and would rather manage it in Node. + * + * @see RdKafka::KafkaConsumer::Consume + * @see NodeKafka::KafkaConsumer::GetMessage + */ + +KafkaConsumerConsumeNumOfPartition::KafkaConsumerConsumeNumOfPartition(Nan::Callback *callback, + KafkaConsumer* consumer, + const uint32_t & num_messages, + const std::string topic, + const uint32_t & partition, + const int & timeout_ms, + const bool only_apply_timeout_to_first_message) : + ErrorAwareWorker(callback), + m_consumer(consumer), + m_num_messages(num_messages), + m_topic(topic), + m_partition(partition), + m_timeout_ms(timeout_ms), + m_only_apply_timeout_to_first_message(only_apply_timeout_to_first_message) {} + +KafkaConsumerConsumeNumOfPartition::~KafkaConsumerConsumeNumOfPartition() {} + +void KafkaConsumerConsumeNumOfPartition::Execute() { + std::size_t max = static_cast(m_num_messages); + bool looping = true; + int timeout_ms = m_timeout_ms; + std::size_t eof_event_count = 0; + + // Disable forwarding for own partition + RdKafka::TopicPartition *topicPartition = RdKafka::TopicPartition::create( + m_topic, m_partition); + + if (topicPartition == NULL) { + SetErrorBaton(Baton(RdKafka::ERR__STATE, + "TopicPartition not found.")); + + return; + } + + RdKafka::Queue *queue = m_consumer->GetClient()->get_partition_queue( + topicPartition); + + if (queue == NULL) { + SetErrorBaton(Baton(RdKafka::ERR__STATE, + "TopicPartition has an invalid queue.")); + delete topicPartition; + return; + } + + while (m_messages.size() - eof_event_count < max && looping) { + if (!m_consumer->IsConnected()) { + if (m_messages.size() == eof_event_count) { + SetErrorBaton(Baton(RdKafka::ERR__STATE, + "KafkaConsumer is not connected")); + } + looping = false; + continue; + } + + // Get a message + RdKafka::Message *message = queue->consume(timeout_ms); + RdKafka::ErrorCode errorCode = message->err(); + + // If true, do not wait after the first message. This will cause to consume + // only what has also been fetched and then return immediately + if (m_only_apply_timeout_to_first_message) { + timeout_ms = 1; + } + + switch (errorCode) { + case RdKafka::ERR__PARTITION_EOF: + // If partition EOF and have consumed messages, retry with timeout 1 + // This allows getting ready messages, while not waiting for new ones + if (m_messages.size() > eof_event_count) { + timeout_ms = 1; + } + + // We will only go into this code path when `enable.partition.eof` is + // set to true. In this case, consumer is also interested in EOF + // messages, so we return an EOF message + m_messages.push_back(message); + eof_event_count += 1; + break; + case RdKafka::ERR__TIMED_OUT: + case RdKafka::ERR__TIMED_OUT_QUEUE: + // Break of the loop if we timed out + delete message; + looping = false; + break; + case RdKafka::ERR_NO_ERROR: + m_messages.push_back(message); + break; + default: + // Set the error for any other errors and break + delete message; + if (m_messages.size() == eof_event_count) { + SetErrorBaton(Baton(errorCode)); + } + looping = false; + break; + } + } + + delete queue; + delete topicPartition; +} + +void KafkaConsumerConsumeNumOfPartition::HandleOKCallback() { + Nan::HandleScope scope; + const unsigned int argc = 3; + v8::Local argv[argc]; + argv[0] = Nan::Null(); + + v8::Local returnArray = Nan::New(); + v8::Local eofEventsArray = Nan::New(); + + if (m_messages.size() > 0) { + int returnArrayIndex = -1; + int eofEventsArrayIndex = -1; + for (std::vector::iterator it = m_messages.begin(); + it != m_messages.end(); ++it) { + RdKafka::Message* message = *it; + + switch (message->err()) { + case RdKafka::ERR_NO_ERROR: + ++returnArrayIndex; + Nan::Set(returnArray, returnArrayIndex, Conversion::Message::ToV8Object(message)); + break; + case RdKafka::ERR__PARTITION_EOF: + ++eofEventsArrayIndex; + + // create EOF event + v8::Local eofEvent = Nan::New(); + + Nan::Set(eofEvent, Nan::New("topic").ToLocalChecked(), + Nan::New(message->topic_name()).ToLocalChecked()); + Nan::Set(eofEvent, Nan::New("offset").ToLocalChecked(), + Nan::New(message->offset())); + Nan::Set(eofEvent, Nan::New("partition").ToLocalChecked(), + Nan::New(message->partition())); + + // also store index at which position in the message array this event was emitted + // this way, we can later emit it at the right point in time + Nan::Set(eofEvent, Nan::New("messageIndex").ToLocalChecked(), + Nan::New(returnArrayIndex)); + + Nan::Set(eofEventsArray, eofEventsArrayIndex, eofEvent); + } + + delete message; + } + } + + argv[1] = returnArray; + argv[2] = eofEventsArray; + + callback->Call(argc, argv); +} + +void KafkaConsumerConsumeNumOfPartition::HandleErrorCallback() { + Nan::HandleScope scope; + + if (m_messages.size() > 0) { + for (std::vector::iterator it = m_messages.begin(); + it != m_messages.end(); ++it) { + RdKafka::Message* message = *it; + delete message; + } + } + + const unsigned int argc = 1; + v8::Local argv[argc] = { GetErrorObject() }; + + callback->Call(argc, argv); +} + /** * @brief KafkaConsumer get messages worker. * diff --git a/src/workers.h b/src/workers.h index d7d5ac8a..d60856e3 100644 --- a/src/workers.h +++ b/src/workers.h @@ -432,6 +432,25 @@ class KafkaConsumerSeek : public ErrorAwareWorker { const int m_timeout_ms; }; +class KafkaConsumerConsumeNumOfPartition : public ErrorAwareWorker { + public: + KafkaConsumerConsumeNumOfPartition(Nan::Callback*, NodeKafka::KafkaConsumer*, + const uint32_t &, const std::string, const uint32_t &, const int &, const bool); + ~KafkaConsumerConsumeNumOfPartition(); + + void Execute(); + void HandleOKCallback(); + void HandleErrorCallback(); + private: + NodeKafka::KafkaConsumer * m_consumer; + const uint32_t m_num_messages; + const std::string m_topic; + const uint32_t m_partition; + const int m_timeout_ms; + std::vector m_messages; + const bool m_only_apply_timeout_to_first_message; +}; + class KafkaConsumerConsumeNum : public ErrorAwareWorker { public: KafkaConsumerConsumeNum(Nan::Callback*, NodeKafka::KafkaConsumer*, From d9394a440109bd4bf34ba11b912419332b4b05fa Mon Sep 17 00:00:00 2001 From: Martijn Imhoff Date: Wed, 6 Nov 2024 16:31:51 +0100 Subject: [PATCH 2/5] fix: add missing method --- src/kafka-consumer.cc | 55 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 0f5e32ed..ea9110fc 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -487,6 +487,30 @@ Baton KafkaConsumer::RefreshAssignments() { } } +Baton KafkaConsumer::DisableQueueForwarding(RdKafka::TopicPartition * toppar) { + if (!IsConnected()) { + return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected"); + } + + // Disable forwarding for own partition + RdKafka::Queue *queue = m_client->get_partition_queue(toppar); + + if (queue == NULL) { + return Baton(RdKafka::ERR__STATE, + "TopicPartition has an invalid queue."); + } + + RdKafka::ErrorCode err = queue->forward(NULL); + if (err != RdKafka::ERR_NO_ERROR) { + delete queue; + return Baton(RdKafka::ERR__STATE, + "Could not disable queue for given partition."); + } + + delete queue; + return Baton(err); +} + std::string KafkaConsumer::Name() { if (!IsConnected()) { return std::string(""); @@ -548,6 +572,7 @@ void KafkaConsumer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "assign", NodeAssign); Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign); Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments); + Nan::SetPrototypeMethod(tpl, "disableQueueForwarding", NodeDisableQueueForwarding); // NOLINT Nan::SetPrototypeMethod(tpl, "commit", NodeCommit); Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync); @@ -720,6 +745,36 @@ NAN_METHOD(KafkaConsumer::NodeAssignments) { Conversion::TopicPartition::ToV8Array(consumer->m_partitions)); } +NAN_METHOD(KafkaConsumer::NodeDisableQueueForwarding) { + Nan::HandleScope scope; + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + + if (!consumer->IsConnected()) { + Nan::ThrowError("KafkaConsumer is disconnected"); + return; + } + + if (info[0]->IsObject()) { + RdKafka::TopicPartition * toppar = + Conversion::TopicPartition::FromV8Object(info[0].As()); + + if (toppar == NULL) { + Nan::ThrowError("Invalid topic partition provided"); + return; + } + + Baton b = consumer->DisableQueueForwarding(toppar); + + delete toppar; + } else { + Nan::ThrowError("First parameter must be an object"); + return; + } + + info.GetReturnValue().Set(Nan::Null()); +} + NAN_METHOD(KafkaConsumer::NodeAssign) { Nan::HandleScope scope; From 0e4ec00a04a77a8bd93813f4233f46f93e8a9c62 Mon Sep 17 00:00:00 2001 From: Martijn Imhoff Date: Wed, 6 Nov 2024 16:34:46 +0100 Subject: [PATCH 3/5] docs: add note about consume per partition --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 73bee23a..930ff2fb 100644 --- a/README.md +++ b/README.md @@ -447,7 +447,7 @@ stream.consumer.commit(); // Commits all locally stored offsets You can also use the Standard API and manage callbacks and events yourself. You can choose different modes for consuming messages: * *Flowing mode*. This mode flows all of the messages it can read by maintaining an infinite loop in the event loop. It only stops when it detects the consumer has issued the `unsubscribe` or `disconnect` method. -* *Non-flowing mode*. This mode reads a single message from Kafka at a time manually. +* *Non-flowing mode*. This mode reads a single message from Kafka at a time manually. You may choose to read from a specific partition or all at once. [In the docs an example has been included on how to consume from a specific partition](https://blizzard.github.io/node-rdkafka/current/tutorial-consumer-per-partition.html). The following example illustrates flowing mode: ```js From 88aa290c6b5fb64f6779958c1642843cee0494f0 Mon Sep 17 00:00:00 2001 From: Martijn Imhoff Date: Wed, 6 Nov 2024 17:03:14 +0100 Subject: [PATCH 4/5] fix: update consume --- lib/kafka-consumer.js | 13 +++---- src/kafka-consumer.cc | 79 +++++++++++++++++++++++++++++++++---------- 2 files changed, 69 insertions(+), 23 deletions(-) diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index ab38396f..d09092d5 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -143,6 +143,11 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) { this._consumeTimeout = timeoutMs; }; +KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) { + this._client.disableQueueForwarding(topicPartition); + return this; +}; + /** * Set the default sleep delay for the next consume loop after the previous one has timed out. * @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out @@ -151,11 +156,6 @@ KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs) this._consumeLoopTimeoutDelay = intervalMs; }; -KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) { - this._client.disableQueueForwarding(topicPartition); - return this; -}; - /** * Get a stream representation of this KafkaConsumer * @@ -403,7 +403,7 @@ KafkaConsumer.prototype.unsubscribe = function() { * @param {KafkaConsumer~readCallback} cb - Callback to return when a message * is fetched. */ - KafkaConsumer.prototype.consume = function(number, topic, partition, cb) { +KafkaConsumer.prototype.consume = function(number, topic, partition, cb) { var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT; var self = this; @@ -419,6 +419,7 @@ KafkaConsumer.prototype.unsubscribe = function() { } else if ((number && typeof number === 'number') || (number && topic)) { // topic is given as the cb cb = topic; + if (cb === undefined) { cb = function() {}; } else if (typeof cb !== 'function') { diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index ea9110fc..c130d8c5 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -1172,7 +1172,8 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) { Nan::Callback *callback = new Nan::Callback(cb); - consumer->m_consume_loop = new Workers::KafkaConsumerConsumeLoop(callback, consumer, timeout_ms, timeout_sleep_delay_ms); + consumer->m_consume_loop = new Workers::KafkaConsumerConsumeLoop( + callback, consumer, timeout_ms, timeout_sleep_delay_ms); info.GetReturnValue().Set(Nan::Null()); } @@ -1196,27 +1197,71 @@ NAN_METHOD(KafkaConsumer::NodeConsume) { } if (info[1]->IsNumber()) { - if (!info[2]->IsFunction()) { - return Nan::ThrowError("Need to specify a callback"); - } + if (info[2]->IsString() && info[3]->IsNumber()) { + // Consume per partition + if (!info[4]->IsFunction()) { + return Nan::ThrowError("Need to specify a callback"); + } + + v8::Local numMessagesNumber = info[1].As(); + Nan::Maybe numMessagesMaybe = Nan::To(numMessagesNumber); // NOLINT + + uint32_t numMessages; + if (numMessagesMaybe.IsNothing()) { + return Nan::ThrowError("Parameter must be a number over 0"); + } else { + numMessages = numMessagesMaybe.FromJust(); + } + + // Get string pointer for the topic name + Nan::Utf8String topicUTF8(Nan::To(info[2]).ToLocalChecked()); + std::string topic_name(*topicUTF8); + + // Parse partition + v8::Local partitionNumber = info[3].As(); + Nan::Maybe partitionMaybe = Nan::To(partitionNumber); // NOLINT + + uint32_t partition; + if (partitionMaybe.IsNothing()) { + return Nan::ThrowError("Parameter must be a number equal to or over 0"); + } else { + partition = partitionMaybe.FromJust(); + } - v8::Local numMessagesNumber = info[1].As(); - Nan::Maybe numMessagesMaybe = Nan::To(numMessagesNumber); // NOLINT + // Parse onlyApplyTimeoutToFirstMessage + bool only_apply_timeout_to_first_message; + if (!Nan::To(info[5]).To(&only_apply_timeout_to_first_message)) { + only_apply_timeout_to_first_message = false; + } + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); - uint32_t numMessages; - if (numMessagesMaybe.IsNothing()) { - return Nan::ThrowError("Parameter must be a number over 0"); + v8::Local cb = info[4].As(); + Nan::Callback *callback = new Nan::Callback(cb); + Nan::AsyncQueueWorker( + new Workers::KafkaConsumerConsumeNumOfPartition(callback, consumer, numMessages, topic_name, partition, timeout_ms, only_apply_timeout_to_first_message)); // NOLINT } else { - numMessages = numMessagesMaybe.FromJust(); - } + if (!info[2]->IsFunction()) { + return Nan::ThrowError("Need to specify a callback"); + } - KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + v8::Local numMessagesNumber = info[1].As(); + Nan::Maybe numMessagesMaybe = Nan::To(numMessagesNumber); // NOLINT - v8::Local cb = info[2].As(); - Nan::Callback *callback = new Nan::Callback(cb); - Nan::AsyncQueueWorker( - new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT + uint32_t numMessages; + if (numMessagesMaybe.IsNothing()) { + return Nan::ThrowError("Parameter must be a number over 0"); + } else { + numMessages = numMessagesMaybe.FromJust(); + } + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + + v8::Local cb = info[2].As(); + Nan::Callback *callback = new Nan::Callback(cb); + Nan::AsyncQueueWorker( + new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT + } } else { if (!info[1]->IsFunction()) { return Nan::ThrowError("Need to specify a callback"); @@ -1269,7 +1314,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) { // cleanup the async worker consumeLoop->WorkComplete(); consumeLoop->Destroy(); - + consumer->m_consume_loop = nullptr; } From e4c42cbd33541248ba979e4d0807a157b3c1777b Mon Sep 17 00:00:00 2001 From: Martijn Imhoff Date: Wed, 6 Nov 2024 17:35:54 +0100 Subject: [PATCH 5/5] fix: remove only_apply_timeout_to_first_message --- lib/kafka-consumer.js | 4 ++-- src/kafka-consumer.cc | 8 +------- src/workers.cc | 14 +++----------- src/workers.h | 3 +-- 4 files changed, 7 insertions(+), 22 deletions(-) diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index d09092d5..7acfe228 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -537,7 +537,7 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) { * @private * @see consume */ -KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages, topic, partition, cb, onlyApplyTimeoutToFirstMessage) { +KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages, topic, partition, cb) { var self = this; this._client.consume(timeoutMs, numMessages, topic, partition, function(err, messages, eofEvents) { @@ -572,7 +572,7 @@ KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages cb(null, messages); } - }, onlyApplyTimeoutToFirstMessage); + }); }; diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index c130d8c5..a850d0f6 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -1228,18 +1228,12 @@ NAN_METHOD(KafkaConsumer::NodeConsume) { partition = partitionMaybe.FromJust(); } - // Parse onlyApplyTimeoutToFirstMessage - bool only_apply_timeout_to_first_message; - if (!Nan::To(info[5]).To(&only_apply_timeout_to_first_message)) { - only_apply_timeout_to_first_message = false; - } - KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); v8::Local cb = info[4].As(); Nan::Callback *callback = new Nan::Callback(cb); Nan::AsyncQueueWorker( - new Workers::KafkaConsumerConsumeNumOfPartition(callback, consumer, numMessages, topic_name, partition, timeout_ms, only_apply_timeout_to_first_message)); // NOLINT + new Workers::KafkaConsumerConsumeNumOfPartition(callback, consumer, numMessages, topic_name, partition, timeout_ms)); // NOLINT } else { if (!info[2]->IsFunction()) { return Nan::ThrowError("Need to specify a callback"); diff --git a/src/workers.cc b/src/workers.cc index 2dbaf821..a6cf64a0 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -799,15 +799,13 @@ KafkaConsumerConsumeNumOfPartition::KafkaConsumerConsumeNumOfPartition(Nan::Call const uint32_t & num_messages, const std::string topic, const uint32_t & partition, - const int & timeout_ms, - const bool only_apply_timeout_to_first_message) : + const int & timeout_ms) : ErrorAwareWorker(callback), m_consumer(consumer), m_num_messages(num_messages), m_topic(topic), m_partition(partition), - m_timeout_ms(timeout_ms), - m_only_apply_timeout_to_first_message(only_apply_timeout_to_first_message) {} + m_timeout_ms(timeout_ms) {} KafkaConsumerConsumeNumOfPartition::~KafkaConsumerConsumeNumOfPartition() {} @@ -851,13 +849,7 @@ void KafkaConsumerConsumeNumOfPartition::Execute() { // Get a message RdKafka::Message *message = queue->consume(timeout_ms); RdKafka::ErrorCode errorCode = message->err(); - - // If true, do not wait after the first message. This will cause to consume - // only what has also been fetched and then return immediately - if (m_only_apply_timeout_to_first_message) { - timeout_ms = 1; - } - + switch (errorCode) { case RdKafka::ERR__PARTITION_EOF: // If partition EOF and have consumed messages, retry with timeout 1 diff --git a/src/workers.h b/src/workers.h index d60856e3..86955496 100644 --- a/src/workers.h +++ b/src/workers.h @@ -435,7 +435,7 @@ class KafkaConsumerSeek : public ErrorAwareWorker { class KafkaConsumerConsumeNumOfPartition : public ErrorAwareWorker { public: KafkaConsumerConsumeNumOfPartition(Nan::Callback*, NodeKafka::KafkaConsumer*, - const uint32_t &, const std::string, const uint32_t &, const int &, const bool); + const uint32_t &, const std::string, const uint32_t &, const int &); ~KafkaConsumerConsumeNumOfPartition(); void Execute(); @@ -448,7 +448,6 @@ class KafkaConsumerConsumeNumOfPartition : public ErrorAwareWorker { const uint32_t m_partition; const int m_timeout_ms; std::vector m_messages; - const bool m_only_apply_timeout_to_first_message; }; class KafkaConsumerConsumeNum : public ErrorAwareWorker {