diff --git a/README.md b/README.md
index 8341bba40..e1704c6ad 100644
--- a/README.md
+++ b/README.md
@@ -76,11 +76,11 @@ The minimum configuration is the zookeeper hosts which are to be used for CMAK (
 This can be found in the application.conf file in conf directory. The same file will be packaged in the distribution zip file; you may modify settings after unzipping the file on the desired server.
 
-    kafka-manager.zkhosts="my.zookeeper.host.com:2181"
+    cmak.zkhosts="my.zookeeper.host.com:2181"
 
 You can specify multiple zookeeper hosts by comma delimiting them, like so:
 
-    kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
+    cmak.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
 
 Alternatively, use the environment variable `ZK_HOSTS` if you don't want to hardcode any values.
@@ -97,22 +97,22 @@ You can optionally enable/disable the following functionality by modifying the d
 
 Consider setting these parameters for larger clusters with jmx enabled :
 
- - kafka-manager.broker-view-thread-pool-size=< 3 * number_of_brokers>
- - kafka-manager.broker-view-max-queue-size=< 3 * total # of partitions across all topics>
- - kafka-manager.broker-view-update-seconds=< kafka-manager.broker-view-max-queue-size / (10 * number_of_brokers) >
+ - cmak.broker-view-thread-pool-size=< 3 * number_of_brokers>
+ - cmak.broker-view-max-queue-size=< 3 * total # of partitions across all topics>
+ - cmak.broker-view-update-seconds=< cmak.broker-view-max-queue-size / (10 * number_of_brokers) >
 
 Here is an example for a kafka cluster with 10 brokers, 100 topics, with each topic having 10 partitions giving 1000 total partitions with JMX enabled :
 
- - kafka-manager.broker-view-thread-pool-size=30
- - kafka-manager.broker-view-max-queue-size=3000
- - kafka-manager.broker-view-update-seconds=30
+ - cmak.broker-view-thread-pool-size=30
+ - cmak.broker-view-max-queue-size=3000
+ - cmak.broker-view-update-seconds=30
 
 The follow control consumer offset cache's thread pool and queue :
 
- - kafka-manager.offset-cache-thread-pool-size=< default is # of processors>
- - kafka-manager.offset-cache-max-queue-size=< default is 1000>
- - kafka-manager.kafka-admin-client-thread-pool-size=< default is # of processors>
- - kafka-manager.kafka-admin-client-max-queue-size=< default is 1000>
+ - cmak.offset-cache-thread-pool-size=< default is # of processors>
+ - cmak.offset-cache-max-queue-size=< default is 1000>
+ - cmak.kafka-admin-client-thread-pool-size=< default is # of processors>
+ - cmak.kafka-admin-client-max-queue-size=< default is 1000>
 
 You should increase the above for large # of consumers with consumer polling enabled. Though it mainly affects ZK based consumer polling.
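For reference, the sizing rule renamed above can be restated as a small Scala sketch. This is a hypothetical helper (the names `BrokerViewSizing`, `BrokerViewSettings` and `recommended` are not part of CMAK) that just encodes the README heuristic and reproduces its 10-broker / 1000-partition example:

```scala
// Hypothetical sketch of the README sizing heuristic; not code from this change.
object BrokerViewSizing {
  final case class BrokerViewSettings(threadPoolSize: Int, maxQueueSize: Int, updateSeconds: Int)

  def recommended(numBrokers: Int, totalPartitions: Int): BrokerViewSettings = {
    val threadPoolSize = 3 * numBrokers                   // cmak.broker-view-thread-pool-size
    val maxQueueSize   = 3 * totalPartitions              // cmak.broker-view-max-queue-size
    val updateSeconds  = maxQueueSize / (10 * numBrokers) // cmak.broker-view-update-seconds
    BrokerViewSettings(threadPoolSize, maxQueueSize, updateSeconds)
  }

  def main(args: Array[String]): Unit = {
    // 10 brokers, 100 topics x 10 partitions = 1000 partitions, as in the README example.
    println(recommended(numBrokers = 10, totalPartitions = 1000)) // BrokerViewSettings(30,3000,30)
  }
}
```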
diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala
index 7e563c3d2..206f886fa 100644
--- a/app/kafka/manager/KafkaManager.scala
+++ b/app/kafka/manager/KafkaManager.scala
@@ -57,33 +57,37 @@ object ApiError extends Logging {
   }
 }
 
-object KafkaManager {
-
-  val ConsumerPropertiesFile = "kafka-manager.consumer.properties.file"
-  val BaseZkPath = "kafka-manager.base-zk-path"
-  val PinnedDispatchName = "kafka-manager.pinned-dispatcher-name"
-  val ZkHosts = "kafka-manager.zkhosts"
-  val BrokerViewUpdateSeconds = "kafka-manager.broker-view-update-seconds"
-  val KafkaManagerUpdateSeconds = "kafka-manager.kafka-manager-update-seconds"
-  val DeleteClusterUpdateSeconds = "kafka-manager.delete-cluster-update-seconds"
-  val DeletionBatchSize = "kafka-manager.deletion-batch-size"
-  val MaxQueueSize = "kafka-manager.max-queue-size"
-  val ThreadPoolSize = "kafka-manager.thread-pool-size"
-  val MutexTimeoutMillis = "kafka-manager.mutex-timeout-millis"
-  val StartDelayMillis = "kafka-manager.start-delay-millis"
-  val ApiTimeoutMillis = "kafka-manager.api-timeout-millis"
-  val ClusterActorsAskTimeoutMillis = "kafka-manager.cluster-actors-ask-timeout-millis"
-  val PartitionOffsetCacheTimeoutSecs = "kafka-manager.partition-offset-cache-timeout-secs"
-  val SimpleConsumerSocketTimeoutMillis = "kafka-manager.simple-consumer-socket-timeout-millis"
-  val BrokerViewThreadPoolSize = "kafka-manager.broker-view-thread-pool-size"
-  val BrokerViewMaxQueueSize = "kafka-manager.broker-view-max-queue-size"
-  val OffsetCacheThreadPoolSize = "kafka-manager.offset-cache-thread-pool-size"
-  val OffsetCacheMaxQueueSize = "kafka-manager.offset-cache-max-queue-size"
-  val KafkaAdminClientThreadPoolSize = "kafka-manager.kafka-admin-client-thread-pool-size"
-  val KafkaAdminClientMaxQueueSize = "kafka-manager.kafka-admin-client-max-queue-size"
-  val KafkaManagedOffsetMetadataCheckMillis = "kafka-manager.kafka-managed-offset-metadata-check-millis"
-  val KafkaManagedOffsetGroupCacheSize = "kafka-manager.kafka-managed-offset-group-cache-size"
-  val KafkaManagedOffsetGroupExpireDays = "kafka-manager.kafka-managed-offset-group-expire-days"
+import akka.pattern._
+import scalaz.{-\/, \/, \/-}
+class KafkaManager(akkaConfig: Config) extends Logging {
+
+  def getPrefixedKey(key: String): String = if (akkaConfig.hasPathOrNull(s"cmak.$key")) s"cmak.$key" else s"kafka-manager.$key"
+
+  val ConsumerPropertiesFile = getPrefixedKey("consumer.properties.file")
+  val BaseZkPath = getPrefixedKey("base-zk-path")
+  val PinnedDispatchName = getPrefixedKey("pinned-dispatcher-name")
+  val ZkHosts = getPrefixedKey("zkhosts")
+  val BrokerViewUpdateSeconds = getPrefixedKey("broker-view-update-seconds")
+  val KafkaManagerUpdateSeconds = getPrefixedKey("kafka-manager-update-seconds")
+  val DeleteClusterUpdateSeconds = getPrefixedKey("delete-cluster-update-seconds")
+  val DeletionBatchSize = getPrefixedKey("deletion-batch-size")
+  val MaxQueueSize = getPrefixedKey("max-queue-size")
+  val ThreadPoolSize = getPrefixedKey("thread-pool-size")
+  val MutexTimeoutMillis = getPrefixedKey("mutex-timeout-millis")
+  val StartDelayMillis = getPrefixedKey("start-delay-millis")
+  val ApiTimeoutMillis = getPrefixedKey("api-timeout-millis")
+  val ClusterActorsAskTimeoutMillis = getPrefixedKey("cluster-actors-ask-timeout-millis")
+  val PartitionOffsetCacheTimeoutSecs = getPrefixedKey("partition-offset-cache-timeout-secs")
+  val SimpleConsumerSocketTimeoutMillis = getPrefixedKey("simple-consumer-socket-timeout-millis")
+  val BrokerViewThreadPoolSize = getPrefixedKey("broker-view-thread-pool-size")
+  val BrokerViewMaxQueueSize = getPrefixedKey("broker-view-max-queue-size")
+  val OffsetCacheThreadPoolSize = getPrefixedKey("offset-cache-thread-pool-size")
+  val OffsetCacheMaxQueueSize = getPrefixedKey("offset-cache-max-queue-size")
+  val KafkaAdminClientThreadPoolSize = getPrefixedKey("kafka-admin-client-thread-pool-size")
+  val KafkaAdminClientMaxQueueSize = getPrefixedKey("kafka-admin-client-max-queue-size")
+  val KafkaManagedOffsetMetadataCheckMillis = getPrefixedKey("kafka-managed-offset-metadata-check-millis")
+  val KafkaManagedOffsetGroupCacheSize = getPrefixedKey("kafka-managed-offset-group-cache-size")
+  val KafkaManagedOffsetGroupExpireDays = getPrefixedKey("kafka-managed-offset-group-expire-days")
 
   val DefaultConfig: Config = {
     val defaults: Map[String, _ <: AnyRef] = Map(
@@ -114,12 +118,7 @@ object KafkaManager {
     import scala.collection.JavaConverters._
     ConfigFactory.parseMap(defaults.asJava)
   }
-}
-import KafkaManager._
-import akka.pattern._
-import scalaz.{-\/, \/, \/-}
-class KafkaManager(akkaConfig: Config) extends Logging {
 
   private[this] val system = ActorSystem("kafka-manager-system", akkaConfig)
 
   private[this] val configWithDefaults = akkaConfig.withFallback(DefaultConfig)
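The fallback in `getPrefixedKey` can be exercised in isolation. The sketch below is illustration only and not code from this change (`PrefixFallbackDemo` is a made-up name); it shows that a `cmak.`-prefixed setting wins when present, while the legacy `kafka-manager.` key is still honored otherwise:

```scala
// Standalone illustration of the getPrefixedKey fallback above; PrefixFallbackDemo
// is a made-up name and not part of CMAK.
import com.typesafe.config.{Config, ConfigFactory}

object PrefixFallbackDemo {
  // Same rule as getPrefixedKey: prefer 'cmak.<key>' if present, else 'kafka-manager.<key>'.
  def prefixedKey(config: Config, key: String): String =
    if (config.hasPathOrNull(s"cmak.$key")) s"cmak.$key" else s"kafka-manager.$key"

  def main(args: Array[String]): Unit = {
    val legacyOnly = ConfigFactory.parseString("kafka-manager.zkhosts = \"zk1:2181\"")
    val newStyle   = ConfigFactory.parseString("cmak.zkhosts = \"zk2:2181\"")

    println(prefixedKey(legacyOnly, "zkhosts")) // kafka-manager.zkhosts
    println(prefixedKey(newStyle, "zkhosts"))   // cmak.zkhosts

    // Either way, the resolved key can be read back from the same config:
    println(legacyOnly.getString(prefixedKey(legacyOnly, "zkhosts"))) // zk1:2181
    println(newStyle.getString(prefixedKey(newStyle, "zkhosts")))     // zk2:2181
  }
}
```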
diff --git a/conf/application.conf b/conf/application.conf
index b28670d6f..f2f334004 100644
--- a/conf/application.conf
+++ b/conf/application.conf
@@ -21,8 +21,13 @@ play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
 play.http.context = "/"
 play.application.loader=loader.KafkaManagerLoader
 
+# Settings prefixed with 'kafka-manager.' will be deprecated, use 'cmak.' instead.
+# https://github.com/yahoo/CMAK/issues/713
 kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
 kafka-manager.zkhosts=${?ZK_HOSTS}
+cmak.zkhosts="kafka-manager-zookeeper:2181"
+cmak.zkhosts=${?ZK_HOSTS}
+
 pinned-dispatcher.type="PinnedDispatcher"
 pinned-dispatcher.executor="thread-pool-executor"
 application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
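The `${?ZK_HOSTS}` lines above rely on HOCON's optional substitution: the override only takes effect when `ZK_HOSTS` is defined (in the config or as an environment variable); otherwise the preceding default is kept. A minimal sketch of that behavior, for illustration only (`ZkHostsOverrideDemo` is a made-up name):

```scala
// Illustration of the optional ${?ZK_HOSTS} override used in application.conf above;
// ZkHostsOverrideDemo is a made-up name and not part of CMAK.
import com.typesafe.config.ConfigFactory

object ZkHostsOverrideDemo {
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.parseString(
      """
        |cmak.zkhosts = "kafka-manager-zookeeper:2181"
        |cmak.zkhosts = ${?ZK_HOSTS}
      """.stripMargin
    ).resolve() // resolve() falls back to environment variables for undefined substitutions

    // Prints the default above unless the ZK_HOSTS environment variable is set,
    // in which case the environment value wins.
    println(conf.getString("cmak.zkhosts"))
  }
}
```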
diff --git a/test/controller/api/TestKafkaStateCheck.scala b/test/controller/api/TestKafkaStateCheck.scala
index 81736a252..5905fd061 100644
--- a/test/controller/api/TestKafkaStateCheck.scala
+++ b/test/controller/api/TestKafkaStateCheck.scala
@@ -45,8 +45,8 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M
     val configMap: Map[String, AnyRef] = Map(
       "pinned-dispatcher.type" -> "PinnedDispatcher",
       "pinned-dispatcher.executor" -> "thread-pool-executor",
-      "kafka-manager.zkhosts" -> kafkaServerZkPath,
-      KafkaManager.ConsumerPropertiesFile -> "conf/consumer.properties"
+      "cmak.zkhosts" -> kafkaServerZkPath,
+      "cmak.consumer.properties.file" -> "conf/consumer.properties"
     )
     val loader = new KafkaManagerLoaderForTests
     application = Option(loader.load(ApplicationLoader.createContext(
diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala
index f6b032e24..195a92422 100644
--- a/test/kafka/manager/TestKafkaManager.scala
+++ b/test/kafka/manager/TestKafkaManager.scala
@@ -28,11 +28,11 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
   private[this] val akkaConfig: Properties = new Properties()
   akkaConfig.setProperty("pinned-dispatcher.type","PinnedDispatcher")
   akkaConfig.setProperty("pinned-dispatcher.executor","thread-pool-executor")
-  akkaConfig.setProperty(KafkaManager.ZkHosts,testServer.getConnectString)
-  akkaConfig.setProperty(KafkaManager.BrokerViewUpdateSeconds,"1")
-  akkaConfig.setProperty(KafkaManager.KafkaManagerUpdateSeconds,"1")
-  akkaConfig.setProperty(KafkaManager.DeleteClusterUpdateSeconds,"1")
-  akkaConfig.setProperty(KafkaManager.ConsumerPropertiesFile,"conf/consumer.properties")
+  akkaConfig.setProperty("cmak.zkhosts",testServer.getConnectString)
+  akkaConfig.setProperty("cmak.broker-view-update-seconds","1")
+  akkaConfig.setProperty("cmak.kafka-manager-update-seconds","1")
+  akkaConfig.setProperty("cmak.delete-cluster-update-seconds","1")
+  akkaConfig.setProperty("cmak.consumer.properties.file","conf/consumer.properties")
   private[this] val config : Config = ConfigFactory.parseProperties(akkaConfig)
 
   private[this] val kafkaManager : KafkaManager = new KafkaManager(config)
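For completeness, the same test configuration expressed as HOCON rather than `java.util.Properties`; a minimal sketch assuming `localhost:2181` in place of `testServer.getConnectString`, purely to show the new `cmak.`-prefixed keys side by side (the actual test above keeps the Properties-based setup):

```scala
// Sketch only: the Properties block from TestKafkaManager above, rewritten as HOCON.
// "localhost:2181" is a placeholder for testServer.getConnectString.
import com.typesafe.config.{Config, ConfigFactory}

object CmakTestConfigSketch {
  val config: Config = ConfigFactory.parseString(
    """
      |pinned-dispatcher.type = "PinnedDispatcher"
      |pinned-dispatcher.executor = "thread-pool-executor"
      |cmak.zkhosts = "localhost:2181"
      |cmak.broker-view-update-seconds = "1"
      |cmak.kafka-manager-update-seconds = "1"
      |cmak.delete-cluster-update-seconds = "1"
      |cmak.consumer.properties.file = "conf/consumer.properties"
    """.stripMargin
  )

  // As in the test above: val kafkaManager = new KafkaManager(config)
}
```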