diff --git a/.travis.yml b/.travis.yml index 98d7ba20e..14d84d554 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,6 +24,11 @@ install: - test $MD_ONLY && echo "Skipped!" || yarn install jobs: include: + - stage: test + name: 'Lint' + script: + - env + - yarn lint - stage: test name: 'Broker' script: diff --git a/src/admin/index.spec.js b/src/admin/index.spec.js index d3db38838..d7e0dd22f 100644 --- a/src/admin/index.spec.js +++ b/src/admin/index.spec.js @@ -1,6 +1,5 @@ const createAdmin = require('./index') const { createCluster, newLogger } = require('testHelpers') -const { KafkaJSNonRetriableError } = require('../errors') describe('Admin', () => { it('gives access to its logger', () => { diff --git a/src/consumer/__tests__/consumeMessages.spec.js b/src/consumer/__tests__/consumeMessages.spec.js index 68054dda6..570c46244 100644 --- a/src/consumer/__tests__/consumeMessages.spec.js +++ b/src/consumer/__tests__/consumeMessages.spec.js @@ -10,6 +10,7 @@ const { newLogger, waitFor, waitForMessages, + testIfKafka011, } = require('testHelpers') describe('Consumer', () => { @@ -177,6 +178,139 @@ describe('Consumer', () => { ]) }) + testIfKafka011('consume messages with 0.11 format', async () => { + const topicName2 = `test-topic2-${secureRandom()}` + await createTopic({ topic: topicName2 }) + + cluster = createCluster({ allowExperimentalV011: true }) + producer = createProducer({ + cluster, + createPartitioner: createModPartitioner, + logger: newLogger(), + }) + + consumer = createConsumer({ + cluster, + groupId, + maxWaitTimeInMs: 100, + logger: newLogger(), + }) + + jest.spyOn(cluster, 'refreshMetadataIfNecessary') + + await consumer.connect() + await producer.connect() + await consumer.subscribe({ topic: topicName, fromBeginning: true }) + await consumer.subscribe({ topic: topicName2, fromBeginning: true }) + + const messagesConsumed = [] + await consumer.run({ eachMessage: async event => messagesConsumed.push(event) }) + + const generateMessages = () => + Array(103) + .fill() + .map(() => { + const value = secureRandom() + return { + key: `key-${value}`, + value: `value-${value}`, + headers: { + 'header-keyA': `header-valueA-${value}`, + 'header-keyB': `header-valueB-${value}`, + 'header-keyC': `header-valueC-${value}`, + }, + } + }) + + const messages1 = generateMessages() + const messages2 = generateMessages() + + await producer.sendBatch({ + acks: 1, + topicMessages: [ + { topic: topicName, messages: messages1 }, + { topic: topicName2, messages: messages2 }, + ], + }) + + await waitForMessages(messagesConsumed, { number: messages1.length + messages2.length }) + + expect(cluster.refreshMetadataIfNecessary).toHaveBeenCalled() + + const messagesFromTopic1 = messagesConsumed.filter(m => m.topic === topicName) + const messagesFromTopic2 = messagesConsumed.filter(m => m.topic === topicName2) + + expect(messagesFromTopic1[0]).toEqual({ + topic: topicName, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(messages1[0].key), + value: Buffer.from(messages1[0].value), + headers: { + 'header-keyA': Buffer.from(messages1[0].headers['header-keyA']), + 'header-keyB': Buffer.from(messages1[0].headers['header-keyB']), + 'header-keyC': Buffer.from(messages1[0].headers['header-keyC']), + }, + magicByte: 2, + offset: '0', + }), + }) + + const lastMessage1 = messages1[messages1.length - 1] + expect(messagesFromTopic1[messagesFromTopic1.length - 1]).toEqual({ + topic: topicName, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(lastMessage1.key), + value: Buffer.from(lastMessage1.value), + headers: { + 'header-keyA': Buffer.from(lastMessage1.headers['header-keyA']), + 'header-keyB': Buffer.from(lastMessage1.headers['header-keyB']), + 'header-keyC': Buffer.from(lastMessage1.headers['header-keyC']), + }, + magicByte: 2, + offset: '102', + }), + }) + + expect(messagesFromTopic2[0]).toEqual({ + topic: topicName2, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(messages2[0].key), + value: Buffer.from(messages2[0].value), + headers: { + 'header-keyA': Buffer.from(messages2[0].headers['header-keyA']), + 'header-keyB': Buffer.from(messages2[0].headers['header-keyB']), + 'header-keyC': Buffer.from(messages2[0].headers['header-keyC']), + }, + magicByte: 2, + offset: '0', + }), + }) + + const lastMessage2 = messages2[messages2.length - 1] + expect(messagesFromTopic2[messagesFromTopic2.length - 1]).toEqual({ + topic: topicName2, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(lastMessage2.key), + value: Buffer.from(lastMessage2.value), + headers: { + 'header-keyA': Buffer.from(lastMessage2.headers['header-keyA']), + 'header-keyB': Buffer.from(lastMessage2.headers['header-keyB']), + 'header-keyC': Buffer.from(lastMessage2.headers['header-keyC']), + }, + magicByte: 2, + offset: '102', + }), + }) + + // check if all offsets are present + expect(messagesFromTopic1.map(m => m.message.offset)).toEqual(messages1.map((_, i) => `${i}`)) + expect(messagesFromTopic2.map(m => m.message.offset)).toEqual(messages2.map((_, i) => `${i}`)) + }) + it('stops consuming messages when running = false', async () => { await consumer.connect() await producer.connect() diff --git a/src/consumer/__tests__/instrumentationEvents.spec.js b/src/consumer/__tests__/instrumentationEvents.spec.js index 1a94baafb..e93c93b34 100644 --- a/src/consumer/__tests__/instrumentationEvents.spec.js +++ b/src/consumer/__tests__/instrumentationEvents.spec.js @@ -1,6 +1,5 @@ const createProducer = require('../../producer') const createConsumer = require('../index') -const { KafkaJSNonRetriableError } = require('../../errors') const { secureRandom, diff --git a/src/network/connection.js b/src/network/connection.js index da9f803e5..9080e0233 100644 --- a/src/network/connection.js +++ b/src/network/connection.js @@ -320,7 +320,9 @@ module.exports = class Connection { this.buffer = decoder.readAll() if (this.authHandlers) { - return this.authHandlers.onSuccess(data.slice(0, Decoder.int32Size() + expectedResponseSize)) + const rawResponseSize = Decoder.int32Size() + expectedResponseSize + const rawResponseBuffer = data.slice(0, rawResponseSize) + return this.authHandlers.onSuccess(rawResponseBuffer) } const correlationId = response.readInt32() diff --git a/src/protocol/requests/fetch/v4/response.js b/src/protocol/requests/fetch/v4/response.js index 882042e9e..b9d062cc5 100644 --- a/src/protocol/requests/fetch/v4/response.js +++ b/src/protocol/requests/fetch/v4/response.js @@ -26,7 +26,13 @@ const MAGIC_OFFSET = 16 */ const decodeMessages = async decoder => { - const messagesBuffer = decoder.readBytes() + const messagesSize = decoder.readInt32() + + if (messagesSize <= 0 || !decoder.canReadBytes(messagesSize)) { + return [] + } + + const messagesBuffer = decoder.readBytes(messagesSize) const magicByte = messagesBuffer.slice(MAGIC_OFFSET).readInt8() if (magicByte === MAGIC_BYTE) {