Skip to content

Commit

Permalink
Merge pull request #221 from tulios/220-handle-unsubscribed-assignments
Browse files Browse the repository at this point in the history
Handle receiving assignments for unsubscribed topics
  • Loading branch information
Nevon committed Nov 28, 2018
1 parent d841354 commit 626405b
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,9 @@ List of available events:

* consumer.events.STOP

* consumer.events.CRASH
payload: {`error`, `groupId`}

### <a name="instrumentation-producer"></a> Producer

* producer.events.CONNECT
Expand Down
56 changes: 56 additions & 0 deletions src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
const createConsumer = require('../index')

const {
secureRandom,
createCluster,
createTopic,
newLogger,
waitForConsumerToJoinGroup,
} = require('testHelpers')

describe('Consumer', () => {
let topicNames, groupId, consumer1, consumer2

beforeEach(async () => {
topicNames = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`]
groupId = `consumer-group-id-${secureRandom()}`

await Promise.all(topicNames.map(topicName => createTopic({ topic: topicName })))

consumer1 = createConsumer({
cluster: createCluster({ metadataMaxAge: 50 }),
groupId,
heartbeatInterval: 100,
maxWaitTimeInMs: 100,
logger: newLogger(),
})
})

afterEach(async () => {
consumer1 && (await consumer1.disconnect())
consumer2 && (await consumer2.disconnect())
})

it('handles receiving assignments for unsubscribed topics', async () => {
await consumer1.connect()
await Promise.all(
topicNames.map(topicName => consumer1.subscribe({ topic: topicName, fromBeginning: true }))
)

consumer1.run({ eachMessage: () => {} })
await waitForConsumerToJoinGroup(consumer1)

// Second consumer re-uses group id but only subscribes to one of the topics
consumer2 = createConsumer({
cluster: createCluster({ metadataMaxAge: 50 }),
groupId,
heartbeatInterval: 100,
maxWaitTimeInMs: 100,
logger: newLogger(),
})
await consumer2.subscribe({ topic: topicNames[0], fromBeginning: true })

consumer2.run({ eachMessage: () => {} })
await waitForConsumerToJoinGroup(consumer2)
})
})
35 changes: 35 additions & 0 deletions src/consumer/__tests__/instrumentationEvents.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,39 @@ describe('Consumer > Instrumentation Events', () => {
expect(stopListener).toHaveBeenCalled()
expect(disconnectListener).toHaveBeenCalled()
})

it('emits crash events', async () => {
consumer = createConsumer({
cluster,
groupId,
logger: newLogger(),
heartbeatInterval: 100,
maxWaitTimeInMs: 1,
maxBytesPerPartition: 180,
retry: {
retries: 0,
},
})
const crashListener = jest.fn()
consumer.on(consumer.events.CRASH, crashListener)
const error = new Error('💣')

await consumer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.run({
eachMessage: async () => {
throw error
},
})

await producer.send({ acks: 1, topic: topicName, messages: [message] })

await waitFor(() => crashListener.mock.calls.length > 0)
expect(crashListener).toHaveBeenCalledWith({
id: expect.any(Number),
timestamp: expect.any(Number),
type: 'consumer.crash',
payload: { error, groupId },
})
})
})
3 changes: 2 additions & 1 deletion src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ module.exports = class ConsumerGroup {
}

// Check if the consumer is aware of all assigned partitions
for (let topic of assignedTopics) {
const safeAssignedTopics = keys(currentMemberAssignment)
for (let topic of safeAssignedTopics) {
const assignedPartitions = currentMemberAssignment[topic]
const knownPartitions = this.partitionsPerSubscribedTopic.get(topic)
const isAwareOfAllAssignedPartitions = assignedPartitions.every(partition =>
Expand Down
7 changes: 6 additions & 1 deletion src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const ConsumerGroup = require('./consumerGroup')
const Runner = require('./runner')
const events = require('./instrumentationEvents')
const InstrumentationEventEmitter = require('../instrumentation/emitter')
const { CONNECT, DISCONNECT, STOP } = require('./instrumentationEvents')
const { CONNECT, DISCONNECT, STOP, CRASH } = require('./instrumentationEvents')
const { KafkaJSNonRetriableError } = require('../errors')
const { roundRobin } = require('./assigners')
const { EARLIEST_OFFSET, LATEST_OFFSET } = require('../constants')
Expand Down Expand Up @@ -186,6 +186,11 @@ module.exports = ({

await disconnect()

instrumentationEmitter.emit(CRASH, {
error: e,
groupId,
})

if (e.name === 'KafkaJSNumberOfRetriesExceeded') {
logger.error(`Restarting the consumer in ${e.retryTime}ms`, {
retryCount: e.retryCount,
Expand Down
1 change: 1 addition & 0 deletions src/consumer/instrumentationEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ module.exports = {
CONNECT: consumerType('connect'),
DISCONNECT: consumerType('disconnect'),
STOP: consumerType('stop'),
CRASH: consumerType('crash'),
}
4 changes: 4 additions & 0 deletions testHelpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ const waitForConsumerToJoinGroup = (consumer, { maxWait = 10000 } = {}) =>
clearTimeout(timeoutId)
resolve()
})
consumer.on(consumer.events.CRASH, event => {
clearTimeout(timeoutId)
reject(event.payload.error)
})
})

const createTopic = async ({ topic, partitions = 1, config = [] }) => {
Expand Down

0 comments on commit 626405b

Please sign in to comment.