Skip to content

Commit

Permalink
Merge pull request #114 from tulios/fix-produce-v3-decode
Browse files Browse the repository at this point in the history
Fix producer v3 decode format
  • Loading branch information
tulios authored Aug 14, 2018
2 parents 0f04b0b + cdef78e commit a1e6c4b
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 11 deletions.
12 changes: 6 additions & 6 deletions src/broker/__tests__/produce.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ describe('Broker > Produce', () => {

const response1 = await broker2.produce({ topicData: createTopicData() })
expect(response1).toEqual({
responses: [
topics: [
{
topicName,
partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
Expand All @@ -169,7 +169,7 @@ describe('Broker > Produce', () => {

const response2 = await broker2.produce({ topicData: createTopicData() })
expect(response2).toEqual({
responses: [
topics: [
{
topicName,
partitions: [{ baseOffset: '3', errorCode: 0, logAppendTime: '-1', partition: 0 }],
Expand All @@ -195,7 +195,7 @@ describe('Broker > Produce', () => {

const response1 = await broker2.produce({ topicData: createTopicData(true) })
expect(response1).toEqual({
responses: [
topics: [
{
topicName,
partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
Expand All @@ -206,7 +206,7 @@ describe('Broker > Produce', () => {

const response2 = await broker2.produce({ topicData: createTopicData() })
expect(response2).toEqual({
responses: [
topics: [
{
topicName,
partitions: [{ baseOffset: '3', errorCode: 0, logAppendTime: '-1', partition: 0 }],
Expand Down Expand Up @@ -236,7 +236,7 @@ describe('Broker > Produce', () => {
})

expect(response1).toEqual({
responses: [
topics: [
{
topicName,
partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
Expand All @@ -251,7 +251,7 @@ describe('Broker > Produce', () => {
})

expect(response2).toEqual({
responses: [
topics: [
{
topicName,
partitions: [{ baseOffset: '3', errorCode: 0, logAppendTime: '-1', partition: 0 }],
Expand Down
91 changes: 90 additions & 1 deletion src/producer/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const createProducer = require('./index')
const { KafkaJSNonRetriableError } = require('../errors')
const {
secureRandom,
connectionOpts,
Expand All @@ -11,6 +10,7 @@ const {
sslBrokers,
saslBrokers,
newLogger,
testIfKafka011,
} = require('testHelpers')

const { KafkaJSSASLAuthenticationError } = require('../errors')
Expand Down Expand Up @@ -203,6 +203,95 @@ describe('Producer', () => {
])
})

testIfKafka011('produce messages for Kafka 0.11', async () => {
const cluster = createCluster(
Object.assign(connectionOpts(), {
allowExperimentalV011: true,
createPartitioner: createModPartitioner,
})
)

producer = createProducer({ cluster, logger: newLogger() })
await producer.connect()

const sendMessages = async () =>
await producer.send({
acks: 1,
topic: topicName,
messages: new Array(10).fill().map((_, i) => ({
key: `key-${i}`,
value: `value-${i}`,
})),
})

expect(await sendMessages()).toEqual([
{
topicName,
baseOffset: '0',
errorCode: 0,
logAppendTime: '-1',
partition: 0,
},
])

expect(await sendMessages()).toEqual([
{
topicName,
baseOffset: '10',
errorCode: 0,
logAppendTime: '-1',
partition: 0,
},
])
})

testIfKafka011('produce messages for Kafka 0.11 with headers', async () => {
const cluster = createCluster(
Object.assign(connectionOpts(), {
allowExperimentalV011: true,
createPartitioner: createModPartitioner,
})
)

producer = createProducer({ cluster, logger: newLogger() })
await producer.connect()

const sendMessages = async () =>
await producer.send({
acks: 1,
topic: topicName,
messages: new Array(10).fill().map((_, i) => ({
key: `key-${i}`,
value: `value-${i}`,
headers: {
[`header-a${i}`]: `header-value-a${i}`,
[`header-b${i}`]: `header-value-b${i}`,
[`header-c${i}`]: `header-value-c${i}`,
},
})),
})

expect(await sendMessages()).toEqual([
{
topicName,
baseOffset: '0',
errorCode: 0,
logAppendTime: '-1',
partition: 0,
},
])

expect(await sendMessages()).toEqual([
{
topicName,
baseOffset: '10',
errorCode: 0,
logAppendTime: '-1',
partition: 0,
},
])
})

test('produce messages to multiple topics', async () => {
const topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`]
const cluster = createCluster({
Expand Down
6 changes: 3 additions & 3 deletions src/protocol/requests/produce/v3/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@ const partition = decoder => ({

const decode = async rawData => {
const decoder = new Decoder(rawData)
const responses = decoder.readArray(decoder => ({
const topics = decoder.readArray(decoder => ({
topicName: decoder.readString(),
partitions: decoder.readArray(partition),
}))

const throttleTime = decoder.readInt32()

return {
responses,
topics,
throttleTime,
}
}

const parse = async data => {
const partitionsWithError = data.responses.map(response => {
const partitionsWithError = data.topics.map(response => {
return response.partitions.filter(partition => failure(partition.errorCode))
})

Expand Down
2 changes: 1 addition & 1 deletion src/protocol/requests/produce/v3/response.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ describe('Protocol > Requests > Produce > v3', () => {
test('response', async () => {
const data = await decode(Buffer.from(require('../fixtures/v3_response.json')))
expect(data).toEqual({
responses: [
topics: [
{
partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
topicName: 'test-topic-ebba68879c6f5081d8c2',
Expand Down

0 comments on commit a1e6c4b

Please sign in to comment.