Skip to content

Commit

Permalink
Add rebalancing logic to kafka consumers (#866)
Browse files Browse the repository at this point in the history
Upgrade node-rdkafka to latest
Add cooperative-sticky rebalancing approach as default
  • Loading branch information
ish-bindra authored Feb 4, 2025
1 parent 2cacf6d commit 29389d6
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 12 deletions.
6 changes: 6 additions & 0 deletions .changeset/ten-buses-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@steveojs/storage-postgres": major
"steveo": major
---

Upgrade node-rdkafka, add custom rebalancing
4 changes: 2 additions & 2 deletions apps/tasks-example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
"lodash.merge": "^4.6.2",
"lodash.shuffle": "^4.2.0",
"moment": "2.30.1",
"node-rdkafka": "^2.11.0",
"node-rdkafka": "^3.2.1",
"null-logger": "^2.0.0",
"rsmq": "^0.12.3",
"uuid": "^3.1.0"
}
}
}
2 changes: 1 addition & 1 deletion apps/workflows-example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"lodash.merge": "^4.6.2",
"lodash.shuffle": "^4.2.0",
"moment": "2.30.1",
"node-rdkafka": "^2.11.0",
"node-rdkafka": "^3.2.1",
"null-logger": "^2.0.0",
"rsmq": "^0.12.3",
"ts-dotenv": "^0.9.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/steveo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"lodash.merge": "4.6.2",
"lodash.shuffle": "^4.2.0",
"moment": "2.30.1",
"node-rdkafka": "2.18.0",
"node-rdkafka": "^3.2.1",
"null-logger": "^2.0.0",
"rsmq": "^0.12.4",
"uuid": "^9.0.0"
Expand Down
9 changes: 9 additions & 0 deletions packages/steveo/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ const KafkaConsumerDefault: KafkaConsumerConfig = {
'socket.keepalive.enable': true,
'enable.auto.commit': false,
'group.id': 'KAFKA_CONSUMERS',
/**
* See: https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
* Incremental cooperative rebalancing is available since Kafka 2.4.0. When a consumer joins
* or leaves the group, only the partitions that actually change owner are revoked and
* reassigned; the other consumers keep processing the partitions they already own. This is
* useful for scaling consumers in a group without pausing the entire group.
* librdkafka's default strategies ('range,roundrobin') use the eager protocol instead: every
* consumer gives up all of its partitions on each rebalance, which is STOP THE WORLD behavior.
*/
'partition.assignment.strategy': 'cooperative-sticky',
},
topic: {
'auto.offset.reset': 'latest',
Expand Down
39 changes: 38 additions & 1 deletion packages/steveo/src/consumers/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,43 @@ class KafkaRunner
{
'bootstrap.servers': this.config.bootstrapServers,
'security.protocol': this.config.securityProtocol,
/**
 * Rebalance callback: applies the partition assignment or revocation that
 * accompanies a consumer-group rebalance event.
 *
 * `err.code` selects the action (assign vs. revoke) and `assignment` is the
 * list of topic partitions passed to the matching consumer call. If applying
 * the change throws, the consumer is shut down (when still connected) so it
 * does not linger as a zombie member of the group.
 */
rebalance_cb: (err, assignment) => {
  this.logger.debug('Rebalance event', err, assignment);
  try {
    /**
     * These error codes can mean that the consumer needs to reassign
     *
     * KafkaJS (another kafka client) has a similar implementation
     * See: https://github.com/tulios/kafkajs/pull/1474
     * See: https://github.com/tulios/kafkajs/blob/master/src/consumer/runner.js#L115-L160
     */
    if (
      [
        Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS,
        Kafka.CODES.ERRORS.ERR_NOT_COORDINATOR_FOR_GROUP,
        Kafka.CODES.ERRORS.ERR_REBALANCE_IN_PROGRESS,
        Kafka.CODES.ERRORS.ERR_ILLEGAL_GENERATION,
        Kafka.CODES.ERRORS.ERR_UNKNOWN_MEMBER_ID,
      ].includes(err.code)
    ) {
      // The cooperative protocol must use the incremental API; the eager
      // protocol replaces the whole assignment in one call.
      if (this.consumer.rebalanceProtocol() === 'COOPERATIVE') {
        this.consumer.incrementalAssign(assignment);
      } else {
        this.consumer.assign(assignment);
      }
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      if (this.consumer.rebalanceProtocol() === 'COOPERATIVE') {
        this.consumer.incrementalUnassign(assignment);
      } else {
        this.consumer.unassign();
      }
    }
  } catch (e) {
    this.logger.error('Error during rebalance', e);
    // Shutting down the consumer to not have a zombie consumer
    if (this.consumer.isConnected()) {
      // shutdown() appears to be async (it is awaited in the terminating path).
      // This callback cannot await it, so attach a handler: a failed shutdown
      // is then logged instead of becoming an unhandled promise rejection.
      Promise.resolve(this.shutdown()).catch((shutdownError) => {
        this.logger.error(
          'Error shutting down consumer after failed rebalance',
          shutdownError
        );
      });
    }
  }
},
offset_commit_cb: (err, topicPartitions) => {
if (err) {
this.logger.error(err);
Expand Down Expand Up @@ -208,7 +245,7 @@ class KafkaRunner
if (this.state === 'terminating') {
this.logger.debug(`terminating kafka consumer`);
this.state = 'terminated';
this.disconnect();
await this.shutdown();
return;
}

Expand Down
2 changes: 2 additions & 0 deletions packages/steveo/test/common/config_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ describe('Config', () => {
'socket.keepalive.enable': true,
'enable.auto.commit': false,
'group.id': 'KAFKA_CONSUMERS',
'partition.assignment.strategy': 'cooperative-sticky',
});
expect(config.producer).to.eqls({ global: {}, topic: {} });
expect(config.admin).to.eqls({});
Expand Down Expand Up @@ -55,6 +56,7 @@ describe('Config', () => {
'socket.keepalive.enable': true,
'enable.auto.commit': false,
'group.id': 'TEST_CONSUMERS',
'partition.assignment.strategy': 'cooperative-sticky',
a: 1,
});
expect(config.producer).to.eqls({
Expand Down
2 changes: 1 addition & 1 deletion packages/storage-postgres/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"lodash.merge": "4.6.2",
"lodash.shuffle": "^4.2.0",
"moment": "2.30.1",
"node-rdkafka": "2.18.0",
"node-rdkafka": "3.2.1",
"null-logger": "^2.0.0",
"prisma": "^5.19.1",
"radash": "^12.1.0",
Expand Down
17 changes: 11 additions & 6 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5231,11 +5231,16 @@ mv@~2:
ncp "~2.0.0"
rimraf "~2.4.0"

nan@^2.14.0, nan@^2.16.0, nan@^2.17.0:
nan@^2.14.0, nan@^2.16.0:
version "2.17.0"
resolved "https://registry.yarnpkg.com/nan/-/nan-2.17.0.tgz#c0150a2368a182f033e9aa5195ec76ea41a199cb"
integrity sha512-2ZTgtl0nJsO0KQCjEpxcIr5D+Yv90plTitZt9JBfQvVJDS5seMl3FOvsh3+9CoYWXf/1l5OaZzzF6nDm4cagaQ==

nan@^2.19.0:
version "2.22.0"
resolved "https://registry.yarnpkg.com/nan/-/nan-2.22.0.tgz#31bc433fc33213c97bad36404bb68063de604de3"
integrity sha512-nbajikzWTMwsW+eSsNm3QwlOs7het9gGJU5dDZzRTQGk03vyBOauxgI4VakDzE0PtsGTmXPsXTbbjVhRwR5mpw==

[email protected]:
version "3.1.23"
resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.1.23.tgz#f744086ce7c2bc47ee0a8472574d5c78e4183a81"
Expand Down Expand Up @@ -5336,13 +5341,13 @@ node-preload@^0.2.1:
dependencies:
process-on-spawn "^1.0.0"

node-rdkafka@2.18.0:
version "2.18.0"
resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-2.18.0.tgz#116950e49dfe804932c8bc6dbc68949793e72ee2"
integrity sha512-jYkmO0sPvjesmzhv1WFOO4z7IMiAFpThR6/lcnFDWgSPkYL95CtcuVNo/R5PpjujmqSgS22GMkL1qvU4DTAvEQ==
node-rdkafka@3.2.1, node-rdkafka@^3.2.1:
version "3.2.1"
resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-3.2.1.tgz#1e41ed61e88c551745ca9de097c9e64a7cb1803d"
integrity sha512-lf/U8LLCLA8v2tAh2x4guizfFAZSt/KJSzzaIEXtYGospfeZVf2U0RfHlCo7+XvbUoAG4V3uyMgif/tjHQIYfA==
dependencies:
bindings "^1.3.1"
nan "^2.17.0"
nan "^2.19.0"

node-releases@^2.0.0:
version "2.0.0"
Expand Down

0 comments on commit 29389d6

Please sign in to comment.