
Commit

Merge pull request #116 from tulios/fix-fetch-v4-for-partial-messages-on-record-batch

Fix fetch v4 for partial messages on record batch
Nevon authored Aug 19, 2018
2 parents ade12dd + 8349812 commit 6b1d5d3
Showing 6 changed files with 149 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .travis.yml
@@ -24,6 +24,11 @@ install:
   - test $MD_ONLY && echo "Skipped!" || yarn install
 jobs:
   include:
+    - stage: test
+      name: 'Lint'
+      script:
+        - env
+        - yarn lint
     - stage: test
       name: 'Broker'
       script:
1 change: 0 additions & 1 deletion src/admin/index.spec.js
@@ -1,6 +1,5 @@
 const createAdmin = require('./index')
 const { createCluster, newLogger } = require('testHelpers')
-const { KafkaJSNonRetriableError } = require('../errors')

 describe('Admin', () => {
   it('gives access to its logger', () => {
134 changes: 134 additions & 0 deletions src/consumer/__tests__/consumeMessages.spec.js
@@ -10,6 +10,7 @@ const {
   newLogger,
   waitFor,
   waitForMessages,
+  testIfKafka011,
 } = require('testHelpers')

 describe('Consumer', () => {
@@ -177,6 +178,139 @@ describe('Consumer', () => {
     ])
   })

+  testIfKafka011('consume messages with 0.11 format', async () => {
+    const topicName2 = `test-topic2-${secureRandom()}`
+    await createTopic({ topic: topicName2 })
+
+    cluster = createCluster({ allowExperimentalV011: true })
+    producer = createProducer({
+      cluster,
+      createPartitioner: createModPartitioner,
+      logger: newLogger(),
+    })
+
+    consumer = createConsumer({
+      cluster,
+      groupId,
+      maxWaitTimeInMs: 100,
+      logger: newLogger(),
+    })
+
+    jest.spyOn(cluster, 'refreshMetadataIfNecessary')
+
+    await consumer.connect()
+    await producer.connect()
+    await consumer.subscribe({ topic: topicName, fromBeginning: true })
+    await consumer.subscribe({ topic: topicName2, fromBeginning: true })
+
+    const messagesConsumed = []
+    await consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
+
+    const generateMessages = () =>
+      Array(103)
+        .fill()
+        .map(() => {
+          const value = secureRandom()
+          return {
+            key: `key-${value}`,
+            value: `value-${value}`,
+            headers: {
+              'header-keyA': `header-valueA-${value}`,
+              'header-keyB': `header-valueB-${value}`,
+              'header-keyC': `header-valueC-${value}`,
+            },
+          }
+        })
+
+    const messages1 = generateMessages()
+    const messages2 = generateMessages()
+
+    await producer.sendBatch({
+      acks: 1,
+      topicMessages: [
+        { topic: topicName, messages: messages1 },
+        { topic: topicName2, messages: messages2 },
+      ],
+    })
+
+    await waitForMessages(messagesConsumed, { number: messages1.length + messages2.length })
+
+    expect(cluster.refreshMetadataIfNecessary).toHaveBeenCalled()
+
+    const messagesFromTopic1 = messagesConsumed.filter(m => m.topic === topicName)
+    const messagesFromTopic2 = messagesConsumed.filter(m => m.topic === topicName2)
+
+    expect(messagesFromTopic1[0]).toEqual({
+      topic: topicName,
+      partition: 0,
+      message: expect.objectContaining({
+        key: Buffer.from(messages1[0].key),
+        value: Buffer.from(messages1[0].value),
+        headers: {
+          'header-keyA': Buffer.from(messages1[0].headers['header-keyA']),
+          'header-keyB': Buffer.from(messages1[0].headers['header-keyB']),
+          'header-keyC': Buffer.from(messages1[0].headers['header-keyC']),
+        },
+        magicByte: 2,
+        offset: '0',
+      }),
+    })
+
+    const lastMessage1 = messages1[messages1.length - 1]
+    expect(messagesFromTopic1[messagesFromTopic1.length - 1]).toEqual({
+      topic: topicName,
+      partition: 0,
+      message: expect.objectContaining({
+        key: Buffer.from(lastMessage1.key),
+        value: Buffer.from(lastMessage1.value),
+        headers: {
+          'header-keyA': Buffer.from(lastMessage1.headers['header-keyA']),
+          'header-keyB': Buffer.from(lastMessage1.headers['header-keyB']),
+          'header-keyC': Buffer.from(lastMessage1.headers['header-keyC']),
+        },
+        magicByte: 2,
+        offset: '102',
+      }),
+    })
+
+    expect(messagesFromTopic2[0]).toEqual({
+      topic: topicName2,
+      partition: 0,
+      message: expect.objectContaining({
+        key: Buffer.from(messages2[0].key),
+        value: Buffer.from(messages2[0].value),
+        headers: {
+          'header-keyA': Buffer.from(messages2[0].headers['header-keyA']),
+          'header-keyB': Buffer.from(messages2[0].headers['header-keyB']),
+          'header-keyC': Buffer.from(messages2[0].headers['header-keyC']),
+        },
+        magicByte: 2,
+        offset: '0',
+      }),
+    })
+
+    const lastMessage2 = messages2[messages2.length - 1]
+    expect(messagesFromTopic2[messagesFromTopic2.length - 1]).toEqual({
+      topic: topicName2,
+      partition: 0,
+      message: expect.objectContaining({
+        key: Buffer.from(lastMessage2.key),
+        value: Buffer.from(lastMessage2.value),
+        headers: {
+          'header-keyA': Buffer.from(lastMessage2.headers['header-keyA']),
+          'header-keyB': Buffer.from(lastMessage2.headers['header-keyB']),
+          'header-keyC': Buffer.from(lastMessage2.headers['header-keyC']),
+        },
+        magicByte: 2,
+        offset: '102',
+      }),
+    })
+
+    // check if all offsets are present
+    expect(messagesFromTopic1.map(m => m.message.offset)).toEqual(messages1.map((_, i) => `${i}`))
+    expect(messagesFromTopic2.map(m => m.message.offset)).toEqual(messages2.map((_, i) => `${i}`))
+  })
+
   it('stops consuming messages when running = false', async () => {
     await consumer.connect()
     await producer.connect()
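The new test is gated on broker support for the 0.11 message format via the `testIfKafka011` helper added to the imports. Its implementation lives in the repository's testHelpers module and is not part of this diff; a minimal sketch of the pattern it likely follows, assuming a `KAFKA_VERSION` environment variable (both the variable and the code below are illustrative, not the actual helper):

```js
// Hypothetical sketch of a version-gated Jest test helper in the spirit of
// testIfKafka011; the real testHelpers implementation may differ.
// KAFKA_VERSION is an assumed environment variable naming the broker version.
const kafkaVersion = process.env.KAFKA_VERSION || '0.10'

// Run the test only against brokers that speak the 0.11 record batch format
const testIfKafka011 = kafkaVersion.startsWith('0.11') ? test : test.skip

testIfKafka011('runs only against 0.11 brokers', async () => {
  // test body
})
```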
1 change: 0 additions & 1 deletion src/consumer/__tests__/instrumentationEvents.spec.js
@@ -1,6 +1,5 @@
 const createProducer = require('../../producer')
 const createConsumer = require('../index')
-const { KafkaJSNonRetriableError } = require('../../errors')

 const {
   secureRandom,
4 changes: 3 additions & 1 deletion src/network/connection.js
@@ -320,7 +320,9 @@ module.exports = class Connection {
     this.buffer = decoder.readAll()

     if (this.authHandlers) {
-      return this.authHandlers.onSuccess(data.slice(0, Decoder.int32Size() + expectedResponseSize))
+      const rawResponseSize = Decoder.int32Size() + expectedResponseSize
+      const rawResponseBuffer = data.slice(0, rawResponseSize)
+      return this.authHandlers.onSuccess(rawResponseBuffer)
     }

     const correlationId = response.readInt32()
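The connection change is behavior-preserving: the raw response handed to `authHandlers.onSuccess` is still the 4-byte size prefix plus the payload, only now named through intermediate variables. As a minimal illustration of the same frame arithmetic outside the `Connection` class (the function and constant names here are made up for the example):

```js
// Illustrative sketch of the buffer slice in the diff above. A Kafka
// response frame is a 4-byte big-endian length followed by the payload.
const INT32_SIZE = 4 // stands in for Decoder.int32Size()

const sliceRawResponse = (data, expectedResponseSize) => {
  const rawResponseSize = INT32_SIZE + expectedResponseSize
  return data.slice(0, rawResponseSize)
}

// A frame declaring a 3-byte payload, followed by the start of the next frame
const data = Buffer.from([0x00, 0x00, 0x00, 0x03, 0x61, 0x62, 0x63, 0xff])
console.log(sliceRawResponse(data, 3)) // <Buffer 00 00 00 03 61 62 63>
```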
8 changes: 7 additions & 1 deletion src/protocol/requests/fetch/v4/response.js
@@ -26,7 +26,13 @@ const MAGIC_OFFSET = 16
 */

 const decodeMessages = async decoder => {
-  const messagesBuffer = decoder.readBytes()
+  const messagesSize = decoder.readInt32()
+
+  if (messagesSize <= 0 || !decoder.canReadBytes(messagesSize)) {
+    return []
+  }
+
+  const messagesBuffer = decoder.readBytes(messagesSize)
   const magicByte = messagesBuffer.slice(MAGIC_OFFSET).readInt8()

   if (magicByte === MAGIC_BYTE) {
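This hunk is the heart of the fix. A fetch v4 response can end with a partial record batch when the batch exceeds the remaining fetch size, so the declared byte length may be larger than what is actually in the buffer; the previous single `decoder.readBytes()` call assumed the full length was present. The guard now reads the size first and returns an empty message list when the bytes cannot be read in full. A minimal sketch of that behavior, with a stripped-down decoder standing in for the KafkaJS `Decoder` (only the two calls from the diff are modeled):

```js
// Stripped-down stand-in for the KafkaJS Decoder, modeling only readInt32
// and canReadBytes as used by the guard above.
class MiniDecoder {
  constructor(buffer) {
    this.buffer = buffer
    this.offset = 0
  }

  readInt32() {
    const value = this.buffer.readInt32BE(this.offset)
    this.offset += 4
    return value
  }

  canReadBytes(length) {
    return this.buffer.length - this.offset >= length
  }
}

// A truncated response: the size field declares 100 bytes, but only 4 follow.
const truncated = Buffer.alloc(8)
truncated.writeInt32BE(100, 0)

const decoder = new MiniDecoder(truncated)
const messagesSize = decoder.readInt32()

// Same condition as decodeMessages: a partial batch is skipped, not decoded
console.log(messagesSize <= 0 || !decoder.canReadBytes(messagesSize)) // true
```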
