diff --git a/README.md b/README.md index d3de26724..5f96756a5 100644 --- a/README.md +++ b/README.md @@ -1155,6 +1155,9 @@ List of available events: * consumer.events.STOP +* consumer.events.CRASH + payload: {`error`, `groupId`} + ### Producer * producer.events.CONNECT diff --git a/src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js b/src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js new file mode 100644 index 000000000..065c817ad --- /dev/null +++ b/src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js @@ -0,0 +1,56 @@ +const createConsumer = require('../index') + +const { + secureRandom, + createCluster, + createTopic, + newLogger, + waitForConsumerToJoinGroup, +} = require('testHelpers') + +describe('Consumer', () => { + let topicNames, groupId, consumer1, consumer2 + + beforeEach(async () => { + topicNames = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`] + groupId = `consumer-group-id-${secureRandom()}` + + await Promise.all(topicNames.map(topicName => createTopic({ topic: topicName }))) + + consumer1 = createConsumer({ + cluster: createCluster({ metadataMaxAge: 50 }), + groupId, + heartbeatInterval: 100, + maxWaitTimeInMs: 100, + logger: newLogger(), + }) + }) + + afterEach(async () => { + consumer1 && (await consumer1.disconnect()) + consumer2 && (await consumer2.disconnect()) + }) + + it('handles receiving assignments for unsubscribed topics', async () => { + await consumer1.connect() + await Promise.all( + topicNames.map(topicName => consumer1.subscribe({ topic: topicName, fromBeginning: true })) + ) + + consumer1.run({ eachMessage: () => {} }) + await waitForConsumerToJoinGroup(consumer1) + + // Second consumer re-uses group id but only subscribes to one of the topics + consumer2 = createConsumer({ + cluster: createCluster({ metadataMaxAge: 50 }), + groupId, + heartbeatInterval: 100, + maxWaitTimeInMs: 100, + logger: newLogger(), + }) + await consumer2.subscribe({ topic: topicNames[0], fromBeginning: true }) + + consumer2.run({ eachMessage: () => {} }) + await waitForConsumerToJoinGroup(consumer2) + }) +}) diff --git a/src/consumer/__tests__/instrumentationEvents.spec.js b/src/consumer/__tests__/instrumentationEvents.spec.js index 61651071c..82c06a7ac 100644 --- a/src/consumer/__tests__/instrumentationEvents.spec.js +++ b/src/consumer/__tests__/instrumentationEvents.spec.js @@ -249,4 +249,39 @@ describe('Consumer > Instrumentation Events', () => { expect(stopListener).toHaveBeenCalled() expect(disconnectListener).toHaveBeenCalled() }) + + it('emits crash events', async () => { + consumer = createConsumer({ + cluster, + groupId, + logger: newLogger(), + heartbeatInterval: 100, + maxWaitTimeInMs: 1, + maxBytesPerPartition: 180, + retry: { + retries: 0, + }, + }) + const crashListener = jest.fn() + consumer.on(consumer.events.CRASH, crashListener) + const error = new Error('💣') + + await consumer.connect() + await consumer.subscribe({ topic: topicName, fromBeginning: true }) + await consumer.run({ + eachMessage: async () => { + throw error + }, + }) + + await producer.send({ acks: 1, topic: topicName, messages: [message] }) + + await waitFor(() => crashListener.mock.calls.length > 0) + expect(crashListener).toHaveBeenCalledWith({ + id: expect.any(Number), + timestamp: expect.any(Number), + type: 'consumer.crash', + payload: { error, groupId }, + }) + }) }) diff --git a/src/consumer/consumerGroup.js b/src/consumer/consumerGroup.js index e8d4a4962..78f0d8140 100644 --- a/src/consumer/consumerGroup.js +++ b/src/consumer/consumerGroup.js @@ -166,7 +166,8 @@ module.exports = class ConsumerGroup { } // Check if the consumer is aware of all assigned partitions - for (let topic of assignedTopics) { + const safeAssignedTopics = keys(currentMemberAssignment) + for (let topic of safeAssignedTopics) { const assignedPartitions = currentMemberAssignment[topic] const knownPartitions = this.partitionsPerSubscribedTopic.get(topic) const isAwareOfAllAssignedPartitions = assignedPartitions.every(partition => diff --git a/src/consumer/index.js b/src/consumer/index.js index f4d92ab46..47b0501ed 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -4,7 +4,7 @@ const ConsumerGroup = require('./consumerGroup') const Runner = require('./runner') const events = require('./instrumentationEvents') const InstrumentationEventEmitter = require('../instrumentation/emitter') -const { CONNECT, DISCONNECT, STOP } = require('./instrumentationEvents') +const { CONNECT, DISCONNECT, STOP, CRASH } = require('./instrumentationEvents') const { KafkaJSNonRetriableError } = require('../errors') const { roundRobin } = require('./assigners') const { EARLIEST_OFFSET, LATEST_OFFSET } = require('../constants') @@ -186,6 +186,11 @@ module.exports = ({ await disconnect() + instrumentationEmitter.emit(CRASH, { + error: e, + groupId, + }) + if (e.name === 'KafkaJSNumberOfRetriesExceeded') { logger.error(`Restarting the consumer in ${e.retryTime}ms`, { retryCount: e.retryCount, diff --git a/src/consumer/instrumentationEvents.js b/src/consumer/instrumentationEvents.js index 59653ad0c..336e8ae9e 100644 --- a/src/consumer/instrumentationEvents.js +++ b/src/consumer/instrumentationEvents.js @@ -11,4 +11,5 @@ module.exports = { CONNECT: consumerType('connect'), DISCONNECT: consumerType('disconnect'), STOP: consumerType('stop'), + CRASH: consumerType('crash'), } diff --git a/testHelpers/index.js b/testHelpers/index.js index b903d9062..3e60e5cf1 100644 --- a/testHelpers/index.js +++ b/testHelpers/index.js @@ -132,6 +132,10 @@ const waitForConsumerToJoinGroup = (consumer, { maxWait = 10000 } = {}) => clearTimeout(timeoutId) resolve() }) + consumer.on(consumer.events.CRASH, event => { + clearTimeout(timeoutId) + reject(event.payload.error) + }) }) const createTopic = async ({ topic, partitions = 1, config = [] }) => {