Skip to content

Commit

Permalink
Log zk retry, reduce default to 10
Browse files Browse the repository at this point in the history
  • Loading branch information
patelh committed Feb 27, 2020
1 parent d5c3c0c commit 165e29f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
12 changes: 11 additions & 1 deletion app/kafka/manager/base/CuratorAwareActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@

package kafka.manager.base

import akka.actor.ActorLogging
import kafka.manager.model.CuratorConfig
import org.apache.curator.RetrySleeper
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.BoundedExponentialBackoffRetry

import scala.util.Try

class LoggingRetryPolicy(curatorConfig: CuratorConfig, actorLogging: ActorLogging
) extends BoundedExponentialBackoffRetry(curatorConfig.baseSleepTimeMs
, curatorConfig.maxSleepTimeMs, curatorConfig.zkMaxRetry) {
override def allowRetry(retryCount: Int, elapsedTimeMs: Long, sleeper: RetrySleeper): Boolean = {
actorLogging.log.info(s"retryCount=$retryCount maxRetries=${curatorConfig.zkMaxRetry} zkConnect=${curatorConfig.zkConnect}")
super.allowRetry(retryCount, elapsedTimeMs, sleeper)
}
}

trait CuratorAwareActor extends BaseActor {

Expand All @@ -23,7 +33,7 @@ trait CuratorAwareActor extends BaseActor {
protected def getCurator(config: CuratorConfig) : CuratorFramework = {
val curator: CuratorFramework = CuratorFrameworkFactory.newClient(
config.zkConnect,
new BoundedExponentialBackoffRetry(config.baseSleepTimeMs, config.maxSleepTimeMs, config.zkMaxRetry))
new LoggingRetryPolicy(config, this))
curator
}

Expand Down
4 changes: 2 additions & 2 deletions app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scalaz.Validation.FlatMap._
/**
* @author hiral
*/
case class CuratorConfig(zkConnect: String, zkMaxRetry: Int = 100, baseSleepTimeMs : Int = 100, maxSleepTimeMs: Int = 1000)
case class CuratorConfig(zkConnect: String, zkMaxRetry: Int = 10, baseSleepTimeMs : Int = 100, maxSleepTimeMs: Int = 1000)

sealed trait KafkaVersion
case object Kafka_0_8_1_1 extends KafkaVersion {
Expand Down Expand Up @@ -185,7 +185,7 @@ object ClusterConfig {
def apply(name: String
, version : String
, zkHosts: String
, zkMaxRetry: Int = 100
, zkMaxRetry: Int = 10
, jmxEnabled: Boolean
, jmxUser: Option[String]
, jmxPass: Option[String]
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
name := """cmak"""

/* For packaging purposes, -SNAPSHOT MUST contain a digit */
version := "3.0.0.1"
version := "3.0.0.2"

scalaVersion := "2.12.10"

Expand Down

0 comments on commit 165e29f

Please sign in to comment.