From 28fa55dc5199c3d5da074b171dc59d84f7b18fd6 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 8 Mar 2022 17:24:22 +0100 Subject: [PATCH 1/2] Respect default topic replication factor when creating topics Fixes #1216 --- docker-compose.0_10.yml | 3 + docker-compose.0_11.yml | 3 + docker-compose.1_1.yml | 3 + docker-compose.2_2.yml | 3 + docker-compose.2_3.yml | 3 + docker-compose.2_4.yml | 5 +- docker-compose.2_4_oauthbearer.yml | 3 + src/broker/__tests__/createTopics.spec.js | 57 ++++++++++++++++++- .../createTopics/fixtures/v0_request.json | 2 +- .../createTopics/fixtures/v1_request.json | 2 +- .../createTopics/fixtures/v2_request.json | 2 +- .../requests/createTopics/v0/request.js | 2 +- .../requests/createTopics/v1/request.js | 2 +- 13 files changed, 82 insertions(+), 8 deletions(-) diff --git a/docker-compose.0_10.yml b/docker-compose.0_10.yml index b35227ae3..4059bc236 100644 --- a/docker-compose.0_10.yml +++ b/docker-compose.0_10.yml @@ -35,6 +35,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -82,6 +83,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -129,6 +131,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.0_11.yml b/docker-compose.0_11.yml index 956a3aa72..aaf7fac90 100644 --- a/docker-compose.0_11.yml +++ b/docker-compose.0_11.yml @@ -35,6 +35,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -82,6 +83,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -129,6 +131,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.1_1.yml b/docker-compose.1_1.yml index f3acdfeda..545f2cce5 100644 --- a/docker-compose.1_1.yml +++ b/docker-compose.1_1.yml @@ -35,6 +35,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -82,6 +83,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -129,6 +131,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.2_2.yml b/docker-compose.2_2.yml index 824bbd759..efe77ea7a 100644 --- a/docker-compose.2_2.yml +++ b/docker-compose.2_2.yml @@ -35,6 +35,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -81,6 +82,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -127,6 +129,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.2_3.yml b/docker-compose.2_3.yml index 970ff1f11..a992caba6 100644 --- a/docker-compose.2_3.yml +++ b/docker-compose.2_3.yml @@ -35,6 +35,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -81,6 +82,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -127,6 +129,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.2_4.yml b/docker-compose.2_4.yml index a7f30ff67..0cfc912cb 100644 --- a/docker-compose.2_4.yml +++ b/docker-compose.2_4.yml @@ -35,6 +35,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -46,7 +47,7 @@ services: KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' KAFKA_OPTS: '-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf' KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer" - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' # suppress verbosity # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template KAFKA_LOG4J_LOGGERS: 'kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO' @@ -81,6 +82,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -127,6 +129,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.2_4_oauthbearer.yml b/docker-compose.2_4_oauthbearer.yml index 05ea24dd5..1979a9fe7 100644 --- a/docker-compose.2_4_oauthbearer.yml +++ b/docker-compose.2_4_oauthbearer.yml @@ -35,6 +35,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -80,6 +81,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -125,6 +127,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_DEFAULT_REPLICATION_FACTOR: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/src/broker/__tests__/createTopics.spec.js b/src/broker/__tests__/createTopics.spec.js index 4c32b05c3..d8ddbe7c4 100644 --- a/src/broker/__tests__/createTopics.spec.js +++ b/src/broker/__tests__/createTopics.spec.js @@ -1,10 +1,18 @@ -const { createConnection, connectionOpts, secureRandom, newLogger } = require('testHelpers') +const { + createCluster, + createConnection, + connectionOpts, + secureRandom, + newLogger, +} = require('testHelpers') +const { ConfigResourceTypes } = require('../../..') +const createAdmin = require('../../admin') const Broker = require('../index') const topicNameComparator = (a, b) => a.topic.localeCompare(b.topic) describe('Broker > createTopics', () => { - let seedBroker, broker + let seedBroker, broker, admin beforeEach(async () => { seedBroker = new Broker({ @@ -19,12 +27,16 @@ describe('Broker > createTopics', () => { broker = new Broker({ connection: createConnection(newBrokerData), logger: newLogger(), + allowAutoTopicCreation: false, }) + + admin = createAdmin({ logger: newLogger(), cluster: createCluster() }) }) afterEach(async () => { seedBroker && (await seedBroker.disconnect()) broker && (await broker.disconnect()) + admin && (await admin.disconnect()) }) test('request', async () => { @@ -73,4 +85,45 @@ describe('Broker > createTopics', () => { ].sort(topicNameComparator), }) }) + + it('should use the default replication factor', async () => { + await broker.connect() + const topicName = `test-topic-${secureRandom()}` + const response = await broker.createTopics({ + topics: [{ topic: topicName }], + }) + + expect(response).toEqual( + expect.objectContaining({ + topicErrors: expect.arrayContaining([ + expect.objectContaining({ + topic: topicName, + errorCode: 0, + }), + ]), + }) + ) + + await admin.connect() + const describeResponse = await admin.describeConfigs({ + resources: [ + { + type: ConfigResourceTypes.BROKER, + name: '0', + configNames: ['default.replication.factor'], + }, + ], + }) + const defaultReplicationFactor = parseInt( + describeResponse.resources[0].configEntries[0].configValue, + null, + 10 + ) + expect(defaultReplicationFactor).toBeGreaterThan(0) + + const metadata = await broker.metadata([topicName]) + expect(metadata.topicMetadata[0].partitionMetadata[0].replicas.length).toEqual( + defaultReplicationFactor + ) + }) }) diff --git a/src/protocol/requests/createTopics/fixtures/v0_request.json b/src/protocol/requests/createTopics/fixtures/v0_request.json index b5e57da2a..b9bc6558e 100644 --- a/src/protocol/requests/createTopics/fixtures/v0_request.json +++ b/src/protocol/requests/createTopics/fixtures/v0_request.json @@ -1 +1 @@ -{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,19,136]} +{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,0,19,136]} diff --git a/src/protocol/requests/createTopics/fixtures/v1_request.json b/src/protocol/requests/createTopics/fixtures/v1_request.json index 3fc8fd120..cdc7fc8dd 100644 --- a/src/protocol/requests/createTopics/fixtures/v1_request.json +++ b/src/protocol/requests/createTopics/fixtures/v1_request.json @@ -1 +1 @@ -{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,19,136,1]} +{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,0,19,136,1]} diff --git a/src/protocol/requests/createTopics/fixtures/v2_request.json b/src/protocol/requests/createTopics/fixtures/v2_request.json index a75dfda90..34cf92f14 100644 --- a/src/protocol/requests/createTopics/fixtures/v2_request.json +++ b/src/protocol/requests/createTopics/fixtures/v2_request.json @@ -1 +1 @@ -{"type":"Buffer","data":[0,0,0,2,0,74,116,101,115,116,45,116,111,112,105,99,45,102,100,101,54,55,98,53,97,55,57,55,57,56,52,97,99,48,56,51,55,45,53,53,52,57,50,45,49,98,102,50,102,51,48,97,45,99,99,101,56,45,52,48,51,100,45,56,56,57,55,45,54,57,48,50,97,48,98,56,54,102,98,48,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,74,116,101,115,116,45,116,111,112,105,99,45,51,100,54,99,53,51,97,102,50,101,48,102,57,98,49,100,49,55,53,55,45,53,53,52,57,50,45,99,98,100,101,50,51,52,52,45,100,57,100,51,45,52,97,100,55,45,98,52,48,56,45,57,57,54,99,100,97,49,51,101,54,101,53,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,19,136,0]} +{"type":"Buffer","data":[0,0,0,2,0,74,116,101,115,116,45,116,111,112,105,99,45,102,100,101,54,55,98,53,97,55,57,55,57,56,52,97,99,48,56,51,55,45,53,53,52,57,50,45,49,98,102,50,102,51,48,97,45,99,99,101,56,45,52,48,51,100,45,56,56,57,55,45,54,57,48,50,97,48,98,56,54,102,98,48,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,74,116,101,115,116,45,116,111,112,105,99,45,51,100,54,99,53,51,97,102,50,101,48,102,57,98,49,100,49,55,53,55,45,53,53,52,57,50,45,99,98,100,101,50,51,52,52,45,100,57,100,51,45,52,97,100,55,45,98,52,48,56,45,57,57,54,99,100,97,49,51,101,54,101,53,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,0,19,136,0]} diff --git a/src/protocol/requests/createTopics/v0/request.js b/src/protocol/requests/createTopics/v0/request.js index 65de02ddc..91fb2e32f 100644 --- a/src/protocol/requests/createTopics/v0/request.js +++ b/src/protocol/requests/createTopics/v0/request.js @@ -28,7 +28,7 @@ module.exports = ({ topics, timeout = 5000 }) => ({ const encodeTopics = ({ topic, numPartitions = 1, - replicationFactor = 1, + replicationFactor = -1, replicaAssignment = [], configEntries = [], }) => { diff --git a/src/protocol/requests/createTopics/v1/request.js b/src/protocol/requests/createTopics/v1/request.js index 06d791d06..1d6ec5606 100644 --- a/src/protocol/requests/createTopics/v1/request.js +++ b/src/protocol/requests/createTopics/v1/request.js @@ -32,7 +32,7 @@ module.exports = ({ topics, validateOnly = false, timeout = 5000 }) => ({ const encodeTopics = ({ topic, numPartitions = 1, - replicationFactor = 1, + replicationFactor = -1, replicaAssignment = [], configEntries = [], }) => { From 715afe9fe3e04fcaf67452a0c177b47fb53d4efb Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 8 Mar 2022 17:56:59 +0100 Subject: [PATCH 2/2] Respect default num partitions configuration when creating topics --- docker-compose.0_10.yml | 3 +++ docker-compose.0_11.yml | 3 +++ docker-compose.1_1.yml | 3 +++ docker-compose.2_2.yml | 3 +++ docker-compose.2_3.yml | 3 +++ docker-compose.2_4.yml | 3 +++ docker-compose.2_4_oauthbearer.yml | 3 +++ src/admin/__tests__/createPartitions.spec.js | 8 ++++---- src/admin/__tests__/fetchOffsets.spec.js | 6 +++++- src/admin/__tests__/getTopicMetadata.spec.js | 2 +- src/broker/__tests__/createTopics.spec.js | 11 +++++++++-- .../requests/createTopics/fixtures/v0_request.json | 2 +- .../requests/createTopics/fixtures/v1_request.json | 2 +- .../requests/createTopics/fixtures/v2_request.json | 2 +- src/protocol/requests/createTopics/v0/request.js | 2 +- src/protocol/requests/createTopics/v1/request.js | 2 +- 16 files changed, 45 insertions(+), 13 deletions(-) diff --git a/docker-compose.0_10.yml b/docker-compose.0_10.yml index 4059bc236..c48613b25 100644 --- a/docker-compose.0_10.yml +++ b/docker-compose.0_10.yml @@ -36,6 +36,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -84,6 +85,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -132,6 +134,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.0_11.yml b/docker-compose.0_11.yml index aaf7fac90..ec384dbc6 100644 --- a/docker-compose.0_11.yml +++ b/docker-compose.0_11.yml @@ -36,6 +36,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -84,6 +85,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -132,6 +134,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.1_1.yml b/docker-compose.1_1.yml index 545f2cce5..a070c41b6 100644 --- a/docker-compose.1_1.yml +++ b/docker-compose.1_1.yml @@ -36,6 +36,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -84,6 +85,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -132,6 +134,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.2_2.yml b/docker-compose.2_2.yml index efe77ea7a..508dc4d50 100644 --- a/docker-compose.2_2.yml +++ b/docker-compose.2_2.yml @@ -36,6 +36,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -83,6 +84,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -130,6 +132,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.2_3.yml b/docker-compose.2_3.yml index a992caba6..c930b850e 100644 --- a/docker-compose.2_3.yml +++ b/docker-compose.2_3.yml @@ -36,6 +36,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -83,6 +84,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -130,6 +132,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.2_4.yml b/docker-compose.2_4.yml index 0cfc912cb..dce014477 100644 --- a/docker-compose.2_4.yml +++ b/docker-compose.2_4.yml @@ -36,6 +36,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -83,6 +84,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -130,6 +132,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/docker-compose.2_4_oauthbearer.yml b/docker-compose.2_4_oauthbearer.yml index 1979a9fe7..5639369fa 100644 --- a/docker-compose.2_4_oauthbearer.yml +++ b/docker-compose.2_4_oauthbearer.yml @@ -36,6 +36,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -82,6 +83,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' @@ -128,6 +130,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_NUM_PARTITIONS: '2' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0' KAFKA_SSL_KEYSTORE_FILENAME: 'kafka.server.keystore.jks' diff --git a/src/admin/__tests__/createPartitions.spec.js b/src/admin/__tests__/createPartitions.spec.js index 1978a4844..d8171694e 100644 --- a/src/admin/__tests__/createPartitions.spec.js +++ b/src/admin/__tests__/createPartitions.spec.js @@ -56,7 +56,7 @@ describe('Admin', () => { await expect( admin.createPartitions({ - topicPartitions: [{ topic: topicName + 'x', count: 2 }], + topicPartitions: [{ topic: topicName + 'x', count: 3 }], }) ).rejects.toHaveProperty('message', 'This server does not host this topic-partition') }) @@ -75,7 +75,7 @@ describe('Admin', () => { await expect( admin.createPartitions({ - topicPartitions: [{ topic: topicName, count: 2, assignments: [[10]] }], + topicPartitions: [{ topic: topicName, count: 3, assignments: [[10]] }], }) ).rejects.toHaveProperty('message', 'Replica assignment is invalid') }) @@ -112,7 +112,7 @@ describe('Admin', () => { await expect( admin.createPartitions({ - topicPartitions: [{ topic: topicName, count: 2 }], + topicPartitions: [{ topic: topicName, count: 3 }], }) ).resolves.not.toThrow() }) @@ -132,7 +132,7 @@ describe('Admin', () => { admin = createAdmin({ cluster, logger: newLogger() }) await expect( admin.createPartitions({ - topicPartitions: [{ topic: topicName, count: 2 }], + topicPartitions: [{ topic: topicName, count: 3 }], }) ).resolves.not.toThrow() diff --git a/src/admin/__tests__/fetchOffsets.spec.js b/src/admin/__tests__/fetchOffsets.spec.js index 39cf1fd36..b9f1835d6 100644 --- a/src/admin/__tests__/fetchOffsets.spec.js +++ b/src/admin/__tests__/fetchOffsets.spec.js @@ -22,7 +22,11 @@ describe('Admin', () => { yetAnotherTopicName = `yet-another-topic-${secureRandom()}` groupId = `consumer-group-id-${secureRandom()}` - await createTopic({ topic: topicName }) + await Promise.all( + [topicName, anotherTopicName, yetAnotherTopicName].map(topic => + createTopic({ topic, numPartitions: 1 }) + ) + ) logger = newLogger() cluster = createCluster() diff --git a/src/admin/__tests__/getTopicMetadata.spec.js b/src/admin/__tests__/getTopicMetadata.spec.js index 3dded2f2d..13c8ca65a 100644 --- a/src/admin/__tests__/getTopicMetadata.spec.js +++ b/src/admin/__tests__/getTopicMetadata.spec.js @@ -78,7 +78,7 @@ describe('Admin', () => { expect(topicsMetadata[0]).toHaveProperty('name', existingTopicName) expect(topicsMetadata[1]).toHaveProperty('name', newTopicName) - expect(topicsMetadata[1].partitions).toHaveLength(1) + expect(topicsMetadata[1].partitions).toHaveLength(2) }) test('throws an error if the topic does not exist and "allowAutoTopicCreation" is false', async () => { diff --git a/src/broker/__tests__/createTopics.spec.js b/src/broker/__tests__/createTopics.spec.js index d8ddbe7c4..0d97be56d 100644 --- a/src/broker/__tests__/createTopics.spec.js +++ b/src/broker/__tests__/createTopics.spec.js @@ -86,7 +86,7 @@ describe('Broker > createTopics', () => { }) }) - it('should use the default replication factor', async () => { + it('should use the default replication factor and num partitions if not specified', async () => { await broker.connect() const topicName = `test-topic-${secureRandom()}` const response = await broker.createTopics({ @@ -110,7 +110,7 @@ describe('Broker > createTopics', () => { { type: ConfigResourceTypes.BROKER, name: '0', - configNames: ['default.replication.factor'], + configNames: ['default.replication.factor', 'num.partitions'], }, ], }) @@ -119,11 +119,18 @@ describe('Broker > createTopics', () => { null, 10 ) + const defaultNumPartitions = parseInt( + describeResponse.resources[0].configEntries[1].configValue, + null, + 10 + ) expect(defaultReplicationFactor).toBeGreaterThan(0) + expect(defaultNumPartitions).toBeGreaterThan(0) const metadata = await broker.metadata([topicName]) expect(metadata.topicMetadata[0].partitionMetadata[0].replicas.length).toEqual( defaultReplicationFactor ) + expect(metadata.topicMetadata[0].partitionMetadata).toHaveLength(defaultNumPartitions) }) }) diff --git a/src/protocol/requests/createTopics/fixtures/v0_request.json b/src/protocol/requests/createTopics/fixtures/v0_request.json index b9bc6558e..69ab946a0 100644 --- a/src/protocol/requests/createTopics/fixtures/v0_request.json +++ b/src/protocol/requests/createTopics/fixtures/v0_request.json @@ -1 +1 @@ -{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,0,19,136]} +{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,0,19,136]} diff --git a/src/protocol/requests/createTopics/fixtures/v1_request.json b/src/protocol/requests/createTopics/fixtures/v1_request.json index cdc7fc8dd..1408b47d0 100644 --- a/src/protocol/requests/createTopics/fixtures/v1_request.json +++ b/src/protocol/requests/createTopics/fixtures/v1_request.json @@ -1 +1 @@ -{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,0,19,136,1]} +{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,0,19,136,1]} diff --git a/src/protocol/requests/createTopics/fixtures/v2_request.json b/src/protocol/requests/createTopics/fixtures/v2_request.json index 34cf92f14..ed4c687d2 100644 --- a/src/protocol/requests/createTopics/fixtures/v2_request.json +++ b/src/protocol/requests/createTopics/fixtures/v2_request.json @@ -1 +1 @@ -{"type":"Buffer","data":[0,0,0,2,0,74,116,101,115,116,45,116,111,112,105,99,45,102,100,101,54,55,98,53,97,55,57,55,57,56,52,97,99,48,56,51,55,45,53,53,52,57,50,45,49,98,102,50,102,51,48,97,45,99,99,101,56,45,52,48,51,100,45,56,56,57,55,45,54,57,48,50,97,48,98,56,54,102,98,48,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,74,116,101,115,116,45,116,111,112,105,99,45,51,100,54,99,53,51,97,102,50,101,48,102,57,98,49,100,49,55,53,55,45,53,53,52,57,50,45,99,98,100,101,50,51,52,52,45,100,57,100,51,45,52,97,100,55,45,98,52,48,56,45,57,57,54,99,100,97,49,51,101,54,101,53,0,0,0,1,255,255,0,0,0,0,0,0,0,0,0,0,19,136,0]} +{"type":"Buffer","data":[0,0,0,2,0,74,116,101,115,116,45,116,111,112,105,99,45,102,100,101,54,55,98,53,97,55,57,55,57,56,52,97,99,48,56,51,55,45,53,53,52,57,50,45,49,98,102,50,102,51,48,97,45,99,99,101,56,45,52,48,51,100,45,56,56,57,55,45,54,57,48,50,97,48,98,56,54,102,98,48,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,74,116,101,115,116,45,116,111,112,105,99,45,51,100,54,99,53,51,97,102,50,101,48,102,57,98,49,100,49,55,53,55,45,53,53,52,57,50,45,99,98,100,101,50,51,52,52,45,100,57,100,51,45,52,97,100,55,45,98,52,48,56,45,57,57,54,99,100,97,49,51,101,54,101,53,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,0,19,136,0]} diff --git a/src/protocol/requests/createTopics/v0/request.js b/src/protocol/requests/createTopics/v0/request.js index 91fb2e32f..2a635d8d9 100644 --- a/src/protocol/requests/createTopics/v0/request.js +++ b/src/protocol/requests/createTopics/v0/request.js @@ -27,7 +27,7 @@ module.exports = ({ topics, timeout = 5000 }) => ({ const encodeTopics = ({ topic, - numPartitions = 1, + numPartitions = -1, replicationFactor = -1, replicaAssignment = [], configEntries = [], diff --git a/src/protocol/requests/createTopics/v1/request.js b/src/protocol/requests/createTopics/v1/request.js index 1d6ec5606..151a01476 100644 --- a/src/protocol/requests/createTopics/v1/request.js +++ b/src/protocol/requests/createTopics/v1/request.js @@ -31,7 +31,7 @@ module.exports = ({ topics, validateOnly = false, timeout = 5000 }) => ({ const encodeTopics = ({ topic, - numPartitions = 1, + numPartitions = -1, replicationFactor = -1, replicaAssignment = [], configEntries = [],