Cooperative Rebalance #1081

Merged: 5 commits, Nov 18, 2024

Changes from 1 commit
feat: add cooperative rebalance
serj026 committed Jun 14, 2024
commit a129bbe11ff6f6bbb430184afa7ffb3e08a3316b
4 changes: 4 additions & 0 deletions index.d.ts
@@ -208,6 +208,7 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
  constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);

  assign(assignments: Assignment[]): this;
  incrementalAssign(assignments: Assignment[]): this;

  assignments(): Assignment[];

@@ -248,12 +249,15 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
  subscription(): string[];

  unassign(): this;
  incrementalUnassign(assignments: Assignment[]): this;

  unsubscribe(): this;

  offsetsForTimes(topicPartitions: TopicPartitionTime[], timeout: number, cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
  offsetsForTimes(topicPartitions: TopicPartitionTime[], cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;

  rebalanceProtocol(): string;

  static createReadStream(conf: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
}

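Usage sketch for the new typings (broker, group, and topic names are placeholders; `'partition.assignment.strategy': 'cooperative-sticky'` is the standard librdkafka setting that selects the cooperative assignor):

```js
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',                           // placeholder
  'metadata.broker.list': 'localhost:9092',              // placeholder
  'partition.assignment.strategy': 'cooperative-sticky'  // enables COOPERATIVE
}, {});

consumer.on('ready', () => {
  consumer.subscribe(['example-topic']);
  // Returns 'NONE' until the group has negotiated a protocol,
  // then 'COOPERATIVE' (or 'EAGER' with the default assignors).
  console.log(consumer.rebalanceProtocol());
});

consumer.connect();
```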
46 changes: 44 additions & 2 deletions lib/kafka-consumer.js
@@ -61,9 +61,17 @@ function KafkaConsumer(conf, topicConf) {
      // That's it
      try {
        if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
-         self.assign(assignment);
+         if (self.rebalanceProtocol() === 'COOPERATIVE') {
+           self.incrementalAssign(assignment);
+         } else {
+           self.assign(assignment);
+         }
        } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
-         self.unassign();
+         if (self.rebalanceProtocol() === 'COOPERATIVE') {
+           self.incrementalUnassign(assignment);
+         } else {
+           self.unassign();
+         }
        }
      } catch (e) {
        // Ignore exceptions if we are not connected
@@ -275,6 +283,40 @@ KafkaConsumer.prototype.unassign = function() {
  return this;
};

/**
 * Incrementally assign the given partitions and topics to the consumer.
 * Used for cooperative rebalancing.
 *
 * @param {array} assignments - Assignments array. Should contain
 * objects with topic and partition set. Assignments are additive: they
 * are added to the consumer's current assignment.
 * @return {Client} - Returns itself
 */
KafkaConsumer.prototype.incrementalAssign = function(assignments) {
  this._client.incrementalAssign(TopicPartition.map(assignments));
  return this;
};

/**
 * Incrementally unassign the given partitions and topics from the consumer.
 * Used for cooperative rebalancing.
 *
 * @param {array} assignments - Assignments array. Should contain
 * objects with topic and partition set. Assignments are subtractive: they
 * are removed from the consumer's current assignment.
 * @return {Client} - Returns itself
 */
KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
  this._client.incrementalUnassign(TopicPartition.map(assignments));
  return this;
};

/**
 * Get the type of rebalance protocol used in the consumer group.
 *
 * @returns {string} - "NONE", "COOPERATIVE" or "EAGER".
 */
KafkaConsumer.prototype.rebalanceProtocol = function() {
  return this._client.rebalanceProtocol();
};

/**
 * Get the assignments for the consumer
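For comparison, a custom `rebalance_cb` can mirror the protocol-aware default handler above. A minimal sketch, assuming the error-code constants exposed as `Kafka.CODES.ERRORS`; broker, group, and topic names are placeholders:

```js
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
  'partition.assignment.strategy': 'cooperative-sticky',
  'rebalance_cb': function(err, assignment) {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      if (this.rebalanceProtocol() === 'COOPERATIVE') {
        this.incrementalAssign(assignment);   // add only the newly assigned partitions
      } else {
        this.assign(assignment);              // eager: replace the whole assignment
      }
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      if (this.rebalanceProtocol() === 'COOPERATIVE') {
        this.incrementalUnassign(assignment); // drop only the revoked partitions
      } else {
        this.unassign();                      // eager: drop everything
      }
    } else {
      console.error(err);                     // a real rebalance error
    }
  }
}, {});
```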
12 changes: 12 additions & 0 deletions src/connection.cc
@@ -68,6 +68,18 @@ Connection::~Connection() {
  }
}

Baton Connection::rdkafkaErrorToBaton(RdKafka::Error* error) {
  if (error == NULL) {
    return Baton(RdKafka::ERR_NO_ERROR);
  } else {
    Baton result(error->code(), error->str(), error->is_fatal(),
                 error->is_retriable(), error->txn_requires_abort());
    delete error;
    return result;
  }
}

RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
  return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA);
}
1 change: 1 addition & 0 deletions src/connection.h
@@ -80,6 +80,7 @@ class Connection : public Nan::ObjectWrap {

  static Nan::Persistent<v8::Function> constructor;
  static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
  static Baton rdkafkaErrorToBaton(RdKafka::Error* error);

  bool m_has_been_disconnected;
  bool m_is_closing;
181 changes: 181 additions & 0 deletions src/kafka-consumer.cc
@@ -219,6 +219,59 @@ Baton KafkaConsumer::Unassign() {
  return Baton(RdKafka::ERR_NO_ERROR);
}

Baton KafkaConsumer::IncrementalAssign(std::vector<RdKafka::TopicPartition*> partitions) {
  if (!IsConnected()) {
    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  RdKafka::Error* error = consumer->incremental_assign(partitions);

  if (error == NULL) {
    m_partition_cnt += partitions.size();
    m_partitions.insert(m_partitions.end(), partitions.begin(), partitions.end());
  } else {
    RdKafka::TopicPartition::destroy(partitions);
  }

  return rdkafkaErrorToBaton(error);
}

Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> partitions) {
  if (!IsClosing() && !IsConnected()) {
    return Baton(RdKafka::ERR__STATE);
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  RdKafka::Error* error = consumer->incremental_unassign(partitions);

  std::vector<RdKafka::TopicPartition*> delete_partitions;

  if (error == NULL) {
    for (unsigned int i = 0; i < partitions.size(); i++) {
      for (unsigned int j = 0; j < m_partitions.size(); j++) {
        if (partitions[i]->partition() == m_partitions[j]->partition() &&
            partitions[i]->topic() == m_partitions[j]->topic()) {
          delete_partitions.push_back(m_partitions[j]);
          m_partitions.erase(m_partitions.begin() + j);
          m_partition_cnt--;
          break;
        }
      }
    }
  }

  RdKafka::TopicPartition::destroy(delete_partitions);

  RdKafka::TopicPartition::destroy(partitions);

  return rdkafkaErrorToBaton(error);
}

Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
  if (!IsConnected()) {
    return Baton(RdKafka::ERR__STATE);
@@ -494,6 +547,17 @@ std::string KafkaConsumer::Name() {
  return std::string(m_client->name());
}

std::string KafkaConsumer::RebalanceProtocol() {
  if (!IsConnected()) {
    return std::string("NONE");
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  return consumer->rebalance_protocol();
}

Nan::Persistent<v8::Function> KafkaConsumer::constructor;

void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
@@ -547,7 +611,10 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
  Nan::SetPrototypeMethod(tpl, "position", NodePosition);
  Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
  Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
  Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
  Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
  Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
  Nan::SetPrototypeMethod(tpl, "rebalanceProtocol", NodeRebalanceProtocol);

  Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
  Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
@@ -720,6 +787,12 @@ NAN_METHOD(KafkaConsumer::NodeAssignments) {
    Conversion::TopicPartition::ToV8Array(consumer->m_partitions));
}

NAN_METHOD(KafkaConsumer::NodeRebalanceProtocol) {
  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
  std::string protocol = consumer->RebalanceProtocol();
  info.GetReturnValue().Set(Nan::New<v8::String>(protocol).ToLocalChecked());
}

NAN_METHOD(KafkaConsumer::NodeAssign) {
  Nan::HandleScope scope;

@@ -798,6 +871,114 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
  info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
  Nan::HandleScope scope;

  if (info.Length() < 1 || !info[0]->IsArray()) {
    return Nan::ThrowError("Need to specify an array of partitions");
  }

  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
  std::vector<RdKafka::TopicPartition*> topic_partitions;

  for (unsigned int i = 0; i < partitions->Length(); ++i) {
    v8::Local<v8::Value> partition_obj_value;
    if (!(
        Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
        partition_obj_value->IsObject())) {
      return Nan::ThrowError("Must pass topic-partition objects");
    }

    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();

    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");

    if (!topic.empty()) {
      RdKafka::TopicPartition* part;

      if (partition < 0) {
        part = Connection::GetPartition(topic);
      } else {
        part = Connection::GetPartition(topic, partition);
      }

      int64_t offset = GetParameter<int64_t>(
        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
      if (offset != RdKafka::Topic::OFFSET_INVALID) {
        part->set_offset(offset);
      }

      topic_partitions.push_back(part);
    }
  }

  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

  Baton b = consumer->IncrementalAssign(topic_partitions);

  if (b.err() != RdKafka::ERR_NO_ERROR) {
    v8::Local<v8::Value> errorObject = b.ToObject();
    Nan::ThrowError(errorObject);
  }

  info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
  Nan::HandleScope scope;

  if (info.Length() < 1 || !info[0]->IsArray()) {
    return Nan::ThrowError("Need to specify an array of partitions");
  }

  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
  std::vector<RdKafka::TopicPartition*> topic_partitions;

  for (unsigned int i = 0; i < partitions->Length(); ++i) {
    v8::Local<v8::Value> partition_obj_value;
    if (!(
        Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
        partition_obj_value->IsObject())) {
      return Nan::ThrowError("Must pass topic-partition objects");
    }

    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();

    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");

    if (!topic.empty()) {
      RdKafka::TopicPartition* part;

      if (partition < 0) {
        part = Connection::GetPartition(topic);
      } else {
        part = Connection::GetPartition(topic, partition);
      }

      int64_t offset = GetParameter<int64_t>(
        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
      if (offset != RdKafka::Topic::OFFSET_INVALID) {
        part->set_offset(offset);
      }

      topic_partitions.push_back(part);
    }
  }

  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

  Baton b = consumer->IncrementalUnassign(topic_partitions);

  if (b.err() != RdKafka::ERR_NO_ERROR) {
    v8::Local<v8::Value> errorObject = b.ToObject();
    Nan::ThrowError(errorObject);
  }

  info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
  Nan::HandleScope scope;

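As the parsing loops above show, each array element needs an object with `topic` and `partition` set, and `offset` is applied only when present and valid; failures surface as thrown error objects. A minimal sketch with placeholder names:

```js
try {
  consumer.incrementalAssign([
    { topic: 'example-topic', partition: 0 },
    { topic: 'example-topic', partition: 1, offset: 42 }
  ]);
} catch (e) {
  // Thrown when the consumer is disconnected or when librdkafka
  // rejects the incremental change.
  console.error(e);
}
```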
7 changes: 7 additions & 0 deletions src/kafka-consumer.h
@@ -74,9 +74,13 @@ class KafkaConsumer : public Connection {
  Baton Assign(std::vector<RdKafka::TopicPartition*>);
  Baton Unassign();

  Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
  Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);

  Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);

  std::string Name();
  std::string RebalanceProtocol();

  Baton Subscribe(std::vector<std::string>);
  Baton Consume(int timeout_ms);
@@ -106,6 +110,9 @@ class KafkaConsumer : public Connection {
  static NAN_METHOD(NodeDisconnect);
  static NAN_METHOD(NodeAssign);
  static NAN_METHOD(NodeUnassign);
  static NAN_METHOD(NodeIncrementalAssign);
  static NAN_METHOD(NodeIncrementalUnassign);
  static NAN_METHOD(NodeRebalanceProtocol);
  static NAN_METHOD(NodeAssignments);
  static NAN_METHOD(NodeUnsubscribe);
  static NAN_METHOD(NodeCommit);
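Because the native layer inserts and erases entries in its tracked partition list (`m_partitions`), `assignments()` reflects incremental changes. A sketch with placeholder names, assuming no other partitions are currently assigned:

```js
consumer.incrementalAssign([{ topic: 'example-topic', partition: 0 }]);
consumer.incrementalAssign([{ topic: 'example-topic', partition: 1 }]);
console.log(consumer.assignments().length); // 2: incremental assigns accumulate

consumer.incrementalUnassign([{ topic: 'example-topic', partition: 0 }]);
console.log(consumer.assignments().length); // 1: only the revoked partition is removed
```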