Skip to content

Commit

Permalink
Merge pull request #726 from yahoo/surabhip-issue713
Browse files Browse the repository at this point in the history
Rename config keys to cmak to address issue#713
  • Loading branch information
patelh authored Feb 20, 2020
2 parents b38d85b + 5a4a96d commit 555167a
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 51 deletions.
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ The minimum configuration is the zookeeper hosts which are to be used for CMAK (
This can be found in the application.conf file in conf directory. The same file will be packaged
in the distribution zip file; you may modify settings after unzipping the file on the desired server.

kafka-manager.zkhosts="my.zookeeper.host.com:2181"
cmak.zkhosts="my.zookeeper.host.com:2181"

You can specify multiple zookeeper hosts by comma delimiting them, like so:

kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
cmak.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"

Alternatively, use the environment variable `ZK_HOSTS` if you don't want to hardcode any values.

Expand All @@ -97,22 +97,22 @@ You can optionally enable/disable the following functionality by modifying the d

Consider setting these parameters for larger clusters with jmx enabled :

- kafka-manager.broker-view-thread-pool-size=< 3 * number_of_brokers>
- kafka-manager.broker-view-max-queue-size=< 3 * total # of partitions across all topics>
- kafka-manager.broker-view-update-seconds=< kafka-manager.broker-view-max-queue-size / (10 * number_of_brokers) >
- cmak.broker-view-thread-pool-size=< 3 * number_of_brokers>
- cmak.broker-view-max-queue-size=< 3 * total # of partitions across all topics>
- cmak.broker-view-update-seconds=< cmak.broker-view-max-queue-size / (10 * number_of_brokers) >

Here is an example for a kafka cluster with 10 brokers, 100 topics, with each topic having 10 partitions giving 1000 total partitions with JMX enabled :

- kafka-manager.broker-view-thread-pool-size=30
- kafka-manager.broker-view-max-queue-size=3000
- kafka-manager.broker-view-update-seconds=30
- cmak.broker-view-thread-pool-size=30
- cmak.broker-view-max-queue-size=3000
- cmak.broker-view-update-seconds=30

The follow control consumer offset cache's thread pool and queue :

- kafka-manager.offset-cache-thread-pool-size=< default is # of processors>
- kafka-manager.offset-cache-max-queue-size=< default is 1000>
- kafka-manager.kafka-admin-client-thread-pool-size=< default is # of processors>
- kafka-manager.kafka-admin-client-max-queue-size=< default is 1000>
- cmak.offset-cache-thread-pool-size=< default is # of processors>
- cmak.offset-cache-max-queue-size=< default is 1000>
- cmak.kafka-admin-client-thread-pool-size=< default is # of processors>
- cmak.kafka-admin-client-max-queue-size=< default is 1000>

You should increase the above for large # of consumers with consumer polling enabled. Though it mainly affects ZK based consumer polling.

Expand Down
63 changes: 31 additions & 32 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,37 @@ object ApiError extends Logging {
}
}

object KafkaManager {

val ConsumerPropertiesFile = "kafka-manager.consumer.properties.file"
val BaseZkPath = "kafka-manager.base-zk-path"
val PinnedDispatchName = "kafka-manager.pinned-dispatcher-name"
val ZkHosts = "kafka-manager.zkhosts"
val BrokerViewUpdateSeconds = "kafka-manager.broker-view-update-seconds"
val KafkaManagerUpdateSeconds = "kafka-manager.kafka-manager-update-seconds"
val DeleteClusterUpdateSeconds = "kafka-manager.delete-cluster-update-seconds"
val DeletionBatchSize = "kafka-manager.deletion-batch-size"
val MaxQueueSize = "kafka-manager.max-queue-size"
val ThreadPoolSize = "kafka-manager.thread-pool-size"
val MutexTimeoutMillis = "kafka-manager.mutex-timeout-millis"
val StartDelayMillis = "kafka-manager.start-delay-millis"
val ApiTimeoutMillis = "kafka-manager.api-timeout-millis"
val ClusterActorsAskTimeoutMillis = "kafka-manager.cluster-actors-ask-timeout-millis"
val PartitionOffsetCacheTimeoutSecs = "kafka-manager.partition-offset-cache-timeout-secs"
val SimpleConsumerSocketTimeoutMillis = "kafka-manager.simple-consumer-socket-timeout-millis"
val BrokerViewThreadPoolSize = "kafka-manager.broker-view-thread-pool-size"
val BrokerViewMaxQueueSize = "kafka-manager.broker-view-max-queue-size"
val OffsetCacheThreadPoolSize = "kafka-manager.offset-cache-thread-pool-size"
val OffsetCacheMaxQueueSize = "kafka-manager.offset-cache-max-queue-size"
val KafkaAdminClientThreadPoolSize = "kafka-manager.kafka-admin-client-thread-pool-size"
val KafkaAdminClientMaxQueueSize = "kafka-manager.kafka-admin-client-max-queue-size"
val KafkaManagedOffsetMetadataCheckMillis = "kafka-manager.kafka-managed-offset-metadata-check-millis"
val KafkaManagedOffsetGroupCacheSize = "kafka-manager.kafka-managed-offset-group-cache-size"
val KafkaManagedOffsetGroupExpireDays = "kafka-manager.kafka-managed-offset-group-expire-days"
import akka.pattern._
import scalaz.{-\/, \/, \/-}
class KafkaManager(akkaConfig: Config) extends Logging {

def getPrefixedKey(key: String): String = if (akkaConfig.hasPathOrNull(s"cmak.$key")) s"cmak.$key" else s"kafka-manager.$key"

val ConsumerPropertiesFile = getPrefixedKey("consumer.properties.file")
val BaseZkPath = getPrefixedKey("base-zk-path")
val PinnedDispatchName = getPrefixedKey("pinned-dispatcher-name")
val ZkHosts = getPrefixedKey("zkhosts")
val BrokerViewUpdateSeconds = getPrefixedKey("broker-view-update-seconds")
val KafkaManagerUpdateSeconds = getPrefixedKey("kafka-manager-update-seconds")
val DeleteClusterUpdateSeconds = getPrefixedKey("delete-cluster-update-seconds")
val DeletionBatchSize = getPrefixedKey("deletion-batch-size")
val MaxQueueSize = getPrefixedKey("max-queue-size")
val ThreadPoolSize = getPrefixedKey("thread-pool-size")
val MutexTimeoutMillis = getPrefixedKey("mutex-timeout-millis")
val StartDelayMillis = getPrefixedKey("start-delay-millis")
val ApiTimeoutMillis = getPrefixedKey("api-timeout-millis")
val ClusterActorsAskTimeoutMillis = getPrefixedKey("cluster-actors-ask-timeout-millis")
val PartitionOffsetCacheTimeoutSecs = getPrefixedKey("partition-offset-cache-timeout-secs")
val SimpleConsumerSocketTimeoutMillis = getPrefixedKey("simple-consumer-socket-timeout-millis")
val BrokerViewThreadPoolSize = getPrefixedKey("broker-view-thread-pool-size")
val BrokerViewMaxQueueSize = getPrefixedKey("broker-view-max-queue-size")
val OffsetCacheThreadPoolSize = getPrefixedKey("offset-cache-thread-pool-size")
val OffsetCacheMaxQueueSize = getPrefixedKey("offset-cache-max-queue-size")
val KafkaAdminClientThreadPoolSize = getPrefixedKey("kafka-admin-client-thread-pool-size")
val KafkaAdminClientMaxQueueSize = getPrefixedKey("kafka-admin-client-max-queue-size")
val KafkaManagedOffsetMetadataCheckMillis = getPrefixedKey("kafka-managed-offset-metadata-check-millis")
val KafkaManagedOffsetGroupCacheSize = getPrefixedKey("kafka-managed-offset-group-cache-size")
val KafkaManagedOffsetGroupExpireDays = getPrefixedKey("kafka-managed-offset-group-expire-days")

val DefaultConfig: Config = {
val defaults: Map[String, _ <: AnyRef] = Map(
Expand Down Expand Up @@ -114,12 +118,7 @@ object KafkaManager {
import scala.collection.JavaConverters._
ConfigFactory.parseMap(defaults.asJava)
}
}

import KafkaManager._
import akka.pattern._
import scalaz.{-\/, \/, \/-}
class KafkaManager(akkaConfig: Config) extends Logging {
private[this] val system = ActorSystem("kafka-manager-system", akkaConfig)

private[this] val configWithDefaults = akkaConfig.withFallback(DefaultConfig)
Expand Down
5 changes: 5 additions & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader

# Settings prefixed with 'kafka-manager.' will be deprecated, use 'cmak.' instead.
# https://github.com/yahoo/CMAK/issues/713
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
kafka-manager.zkhosts=${?ZK_HOSTS}
cmak.zkhosts="kafka-manager-zookeeper:2181"
cmak.zkhosts=${?ZK_HOSTS}

pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
Expand Down
4 changes: 2 additions & 2 deletions test/controller/api/TestKafkaStateCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M
val configMap: Map[String, AnyRef] = Map(
"pinned-dispatcher.type" -> "PinnedDispatcher",
"pinned-dispatcher.executor" -> "thread-pool-executor",
"kafka-manager.zkhosts" -> kafkaServerZkPath,
KafkaManager.ConsumerPropertiesFile -> "conf/consumer.properties"
"cmak.zkhosts" -> kafkaServerZkPath,
"cmak.consumer.properties.file" -> "conf/consumer.properties"
)
val loader = new KafkaManagerLoaderForTests
application = Option(loader.load(ApplicationLoader.createContext(
Expand Down
10 changes: 5 additions & 5 deletions test/kafka/manager/TestKafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
private[this] val akkaConfig: Properties = new Properties()
akkaConfig.setProperty("pinned-dispatcher.type","PinnedDispatcher")
akkaConfig.setProperty("pinned-dispatcher.executor","thread-pool-executor")
akkaConfig.setProperty(KafkaManager.ZkHosts,testServer.getConnectString)
akkaConfig.setProperty(KafkaManager.BrokerViewUpdateSeconds,"1")
akkaConfig.setProperty(KafkaManager.KafkaManagerUpdateSeconds,"1")
akkaConfig.setProperty(KafkaManager.DeleteClusterUpdateSeconds,"1")
akkaConfig.setProperty(KafkaManager.ConsumerPropertiesFile,"conf/consumer.properties")
akkaConfig.setProperty("cmak.zkhosts",testServer.getConnectString)
akkaConfig.setProperty("cmak.broker-view-update-seconds","1")
akkaConfig.setProperty("cmak.kafka-manager-update-seconds","1")
akkaConfig.setProperty("cmak.delete-cluster-update-seconds","1")
akkaConfig.setProperty("cmak.consumer.properties.file","conf/consumer.properties")
private[this] val config : Config = ConfigFactory.parseProperties(akkaConfig)

private[this] val kafkaManager : KafkaManager = new KafkaManager(config)
Expand Down

0 comments on commit 555167a

Please sign in to comment.