
Commit

Merge pull request #727 from yahoo/hp-update-kf
Update kafka dependency to 2.4
patelh authored Feb 20, 2020
2 parents 555167a + 72acd31 commit 2740a58
Showing 24 changed files with 1,390 additions and 513 deletions.
21 changes: 12 additions & 9 deletions README.md
@@ -172,15 +172,15 @@ The command below will create a zip file which can be used to deploy the applica
Please refer to play framework documentation on [production deployment/configuration](https://www.playframework.com/documentation/2.4.x/ProductionConfiguration).

If java is not in your path, or you need to build against a specific java version,
-please use the following (the example assumes oracle java8):
+please use the following (the example assumes zulu java11):

-    $ PATH=/usr/local/oracle-java-8/bin:$PATH \
-      JAVA_HOME=/usr/local/oracle-java-8 \
-      /path/to/sbt -java-home /usr/local/oracle-java-8 clean dist
+    $ PATH=/usr/lib/jvm/zulu-11-amd64/bin:$PATH \
+      JAVA_HOME=/usr/lib/jvm/zulu-11-amd64 \
+      /path/to/sbt -java-home /usr/lib/jvm/zulu-11-amd64 clean dist

This ensures that the 'java' and 'javac' binaries in your path are first looked up in the
-oracle java8 release. Next, for all downstream tools that only listen to JAVA_HOME, it points
-them to the oracle java8 location. Lastly, it tells sbt to use the oracle java8 location as
+correct location. Next, for all downstream tools that only listen to JAVA_HOME, it points
+them to the java11 location. Lastly, it tells sbt to use the java11 location as
well.

Starting the service
@@ -199,7 +199,7 @@ configuration file. For example:
Again, if java is not in your path, or you need to run against a different version of java,
add the -java-home option as follows:

-    $ bin/cmak -java-home /usr/local/oracle-java-8
+    $ bin/cmak -java-home /usr/lib/jvm/zulu-11-amd64

Starting the service with Security
----------------------------------
@@ -223,8 +223,6 @@ If you'd like to create a Debian or RPM package instead, you can run one of:
Credits
-------

-Logo/favicon used is from [Apache Kafka](http://kafka.apache.org) project and a registered trademark of the Apache Software Foundation.
-
Most of the utils code has been adapted to work with [Apache Curator](http://curator.apache.org) from [Apache Kafka](http://kafka.apache.org).

Name and Management
@@ -236,3 +234,8 @@ License
-------

Licensed under the terms of the Apache License 2.0. See accompanying LICENSE file for terms.
+
+Consumer/Producer Lag
+-------
+
+Producer offset is polled. Consumer offset is read from the offset topic for Kafka-based consumers. This means the reported lag may be negative, because the consumer offset is read from the offset topic faster than the producer offset is polled. This is normal and not a problem.
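
The lag note above can be made concrete with a small sketch (illustrative only, not code from this PR; the offset numbers are made up): the end offset comes from a periodic poll, while the committed offset streams in from the __consumer_offsets topic, so a fresher committed offset can briefly exceed a stale end offset.

```scala
// Illustrative only -- not code from this PR; offsets are made up.
object LagExample {
  // End offset is captured by a periodic poll (producer side); the committed
  // offset is read continuously from the offset topic and can be fresher.
  final case class OffsetSample(endOffsetAtPoll: Long, latestCommittedOffset: Long)

  def lag(s: OffsetSample): Long = s.endOffsetAtPoll - s.latestCommittedOffset

  def main(args: Array[String]): Unit = {
    // The end offset was 1000 when last polled; since then the producer wrote and
    // the consumer committed 3 more messages, so the committed offset is 1003.
    val sample = OffsetSample(endOffsetAtPoll = 1000L, latestCommittedOffset = 1003L)
    println(lag(sample)) // -3: briefly negative, corrected on the next poll
  }
}
```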
6 changes: 3 additions & 3 deletions app/kafka/manager/actor/cluster/ClusterManagerActor.scala
@@ -12,17 +12,17 @@ import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import akka.actor.{ActorPath, Props}
import akka.pattern._
import akka.util.Timeout
-import kafka.common.TopicAndPartition
import kafka.manager.base._
import kafka.manager.base.cluster.BaseClusterQueryCommandActor
import kafka.manager.features.{ClusterFeatures, KMJMXMetricsFeature, KMLogKafkaFeature}
import kafka.manager.logkafka._
-import kafka.manager.model.{ClusterContext, ClusterConfig, CuratorConfig}
+import kafka.manager.model.{ClusterConfig, ClusterContext, CuratorConfig}
import kafka.manager.utils.AdminUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.PathChildrenCache
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
+import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.CreateMode

import scala.concurrent.duration.{FiniteDuration, _}
@@ -513,7 +513,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
      bl <- eventualBrokerList
      tds <- eventualDescriptions
      tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, None, clusterContext, None))
-     toElect = tis.flatMap(ti => ti.partitionsIdentity.values.filter(!_.isPreferredLeader).map(tpi => TopicAndPartition(ti.topic, tpi.partNum))).toSet
+     toElect = tis.flatMap(ti => ti.partitionsIdentity.values.filter(!_.isPreferredLeader).map(tpi => new TopicPartition(ti.topic, tpi.partNum))).toSet
    } yield toElect
    preferredLeaderElections.map { toElect =>
      withKafkaCommandActor(KCPreferredReplicaLeaderElection(toElect)) { kcResponse: KCCommandResult =>
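
The hunk above shows the pattern repeated throughout this PR: kafka.common.TopicAndPartition from the old core module is replaced by the Java client's org.apache.kafka.common.TopicPartition. A minimal sketch of constructing and reading the replacement type (illustrative only; the topic name and partition numbers are made up):

```scala
import org.apache.kafka.common.TopicPartition

object TopicPartitionExample extends App {
  // The Java client class is constructed with `new`, unlike the old Scala case class.
  val tp = new TopicPartition("my-topic", 0) // values are illustrative

  // Fields are read through accessor methods rather than case-class fields.
  println(s"${tp.topic()}-${tp.partition()}")

  // equals/hashCode are defined, so it still works as a Set element or Map key.
  val pending: Set[TopicPartition] = Set(tp, new TopicPartition("my-topic", 1))
  println(pending.size) // 2
}
```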
26 changes: 11 additions & 15 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
@@ -17,7 +17,7 @@ import akka.pattern._
import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import grizzled.slf4j.Logging
-import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.common.OffsetAndMetadata
import kafka.manager._
import kafka.manager.base.cluster.{BaseClusterQueryActor, BaseClusterQueryCommandActor}
import kafka.manager.base.{LongRunningPoolActor, LongRunningPoolConfig}
@@ -26,7 +26,7 @@ import kafka.manager.model.ActorModel._
import kafka.manager.model._
import kafka.manager.utils.ZkUtils
import kafka.manager.utils.zero81.{PreferredReplicaLeaderElectionCommand, ReassignPartitionCommand}
-import kafka.manager.utils.one10.{GroupMetadata, GroupMetadataKey, MemberMetadata, OffsetKey}
+import kafka.manager.utils.two40.{GroupMetadata, GroupMetadataKey, MemberMetadata, OffsetKey}
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.framework.recipes.cache._
@@ -115,9 +115,6 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
    AdminClient.create(props)
  }

-  private def isValidConsumerGroupResponse(metadata: DescribeGroupsResponse.GroupMetadata): Boolean =
-    metadata.error == Errors.NONE && (metadata.state == "Dead" || metadata.state == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
-
  override def processQueryRequest(request: QueryRequest): Unit = {
    if(adminClientOption.isEmpty) {
      context.actorSelection(config.kafkaStateActorPath).tell(KSGetBrokers, self)
@@ -238,8 +235,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
  private[this] var lastGroupMemberMetadataCheckMillis : Long = System.currentTimeMillis()

  import KafkaManagedOffsetCache._
-  //import kafka.manager.utils.zero90.GroupMetadataManager._
-  import kafka.manager.utils.one10.GroupMetadataManager._
+  import kafka.manager.utils.two40.GroupMetadataManager._

  require(isSupported(clusterContext.config.version), s"Kafka version not support : ${clusterContext.config}")

@@ -506,8 +502,8 @@ trait OffsetCache extends Logging {
    val kafkaConsumer = getKafkaConsumer()
    val f: Future[Map[TopicPartition, java.lang.Long]] = Future {
      try {
-        val topicAndPartitions = parts.map(tpl => (TopicAndPartition(topic, tpl._2), PartitionOffsetRequestInfo(time, nOffsets)))
-        val request: List[TopicPartition] = topicAndPartitions.map(f => new TopicPartition(f._1.topic, f._1.partition))
+        val topicAndPartitions = parts.map(tpl => (new TopicPartition(topic, tpl._2), PartitionOffsetRequestInfo(time, nOffsets)))
+        val request: List[TopicPartition] = topicAndPartitions.map(f => new TopicPartition(f._1.topic(), f._1.partition()))
        kafkaConsumer.endOffsets(request.asJava).asScala.toMap
      } finally {
        kafkaConsumer.close()
@@ -1336,7 +1332,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
    request match {
      case KSUpdatePreferredLeaderElection(millis,json) =>
        safeExecute {
-          val s: Set[TopicAndPartition] = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(json)
+          val s: Set[TopicPartition] = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(json)
          preferredLeaderElection.fold {
            //nothing there, add as new
            preferredLeaderElection = Some(PreferredReplicaElection(getDateTime(millis), s, None, config.clusterContext))
@@ -1353,7 +1349,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
          }
      case KSUpdateReassignPartition(millis,json) =>
        safeExecute {
-          val m : Map[TopicAndPartition, Seq[Int]] = ReassignPartitionCommand.parsePartitionReassignmentZkData(json)
+          val m : Map[TopicPartition, Seq[Int]] = ReassignPartitionCommand.parsePartitionReassignmentZkData(json)
          reassignPartitions.fold {
            //nothing there, add as new
            reassignPartitions = Some(ReassignPartitions(getDateTime(millis),m, None, config.clusterContext))
@@ -1433,7 +1429,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
      cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
    }.fold {
    } { data: java.util.Map[String, ChildData] =>
-      var broker2TopicPartitionMap: Map[BrokerIdentity, List[(TopicAndPartition, PartitionOffsetRequestInfo)]] = Map()
+      var broker2TopicPartitionMap: Map[BrokerIdentity, List[(TopicPartition, PartitionOffsetRequestInfo)]] = Map()

      breakable {
        data.asScala.keys.toIndexedSeq.foreach(topic => {
@@ -1446,13 +1442,13 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
          leaders.foreach(leader => {
            leader._2 match {
              case Some(brokerIden) =>
-                var tlList : List[(TopicAndPartition, PartitionOffsetRequestInfo)] = null
+                var tlList : List[(TopicPartition, PartitionOffsetRequestInfo)] = null
                if (broker2TopicPartitionMap.contains(brokerIden)) {
                  tlList = broker2TopicPartitionMap(brokerIden)
                } else {
                  tlList = List()
                }
-                tlList = (TopicAndPartition(topic, leader._1), PartitionOffsetRequestInfo(-1, 1)) +: tlList
+                tlList = (new TopicPartition(topic, leader._1), PartitionOffsetRequestInfo(-1, 1)) +: tlList
                broker2TopicPartitionMap += (brokerIden -> tlList)
              case None =>
            }
@@ -1487,7 +1483,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
    var kafkaConsumer: Option[KafkaConsumer[Any, Any]] = None
    try {
      kafkaConsumer = Option(new KafkaConsumer(consumerProperties))
-      val request = tpList.map(f => new TopicPartition(f._1.topic, f._1.partition))
+      val request = tpList.map(f => new TopicPartition(f._1.topic(), f._1.partition()))
      var tpOffsetMapOption = kafkaConsumer.map(_.endOffsets(request.asJavaCollection).asScala)

      var topicOffsetMap: Map[Int, Long] = null
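
The hunks above build TopicPartition keys and fetch the latest offsets through KafkaConsumer#endOffsets. A self-contained sketch of that call, assuming a placeholder broker address, topic name, and partition numbers (not taken from CMAK's actual wiring):

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object EndOffsetsExample extends App {
  // Placeholder connection settings -- adjust for a real cluster.
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    // endOffsets returns the latest offset per partition without subscribing or polling records.
    val partitions = List(new TopicPartition("my-topic", 0), new TopicPartition("my-topic", 1))
    val endOffsets: Map[TopicPartition, Long] =
      consumer.endOffsets(partitions.asJava).asScala.map { case (tp, o) => tp -> o.longValue() }.toMap
    endOffsets.foreach { case (tp, offset) => println(s"$tp -> $offset") }
  } finally {
    consumer.close()
  }
}
```

CMAK builds its consumer properties from the cluster configuration; the sketch only isolates the endOffsets call.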
23 changes: 13 additions & 10 deletions app/kafka/manager/model/ActorModel.scala
@@ -8,18 +8,17 @@ package kafka.manager.model
import java.util.Properties

import grizzled.slf4j.Logging
-import kafka.common.TopicAndPartition
import kafka.manager.jmx._
import kafka.manager.utils
-import kafka.manager.utils.one10.MemberMetadata
+import kafka.manager.utils.two40.MemberMetadata
import kafka.manager.utils.zero81.ForceReassignmentCommand
-import org.apache.kafka.common.requests.DescribeGroupsResponse
+import org.apache.kafka.common.TopicPartition
import org.joda.time.DateTime

import scala.collection.immutable.Queue
import scala.concurrent.{Await, Future}
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
import scalaz.{NonEmptyList, Validation}

import scala.collection.immutable.Map

/**
@@ -128,7 +127,7 @@ object ActorModel {
                                readVersions: Map[String, Int]) extends CommandRequest
  case class KCUpdateTopicConfig(topic: String, config: Properties, readVersion: Int) extends CommandRequest
  case class KCDeleteTopic(topic: String) extends CommandRequest
-  case class KCPreferredReplicaLeaderElection(topicAndPartition: Set[TopicAndPartition]) extends CommandRequest
+  case class KCPreferredReplicaLeaderElection(topicAndPartition: Set[TopicPartition]) extends CommandRequest
  case class KCReassignPartition(currentTopicIdentity: Map[String, TopicIdentity]
                                , generatedTopicIdentity: Map[String, TopicIdentity]
                                , forceSet: Set[ForceReassignmentCommand]) extends CommandRequest
@@ -207,13 +206,17 @@ object ActorModel {
  case class BrokerList(list: IndexedSeq[BrokerIdentity], clusterContext: ClusterContext) extends QueryResponse

  case class PreferredReplicaElection(startTime: DateTime,
-                                      topicAndPartition: Set[TopicAndPartition],
+                                      topicAndPartition: Set[TopicPartition],
                                      endTime: Option[DateTime],
-                                      clusterContext: ClusterContext) extends QueryResponse
+                                      clusterContext: ClusterContext) extends QueryResponse {
+    def sortedTopicPartitionList: List[(String, Int)] = topicAndPartition.toList.map(tp => (tp.topic(), tp.partition())).sortBy(_._1)
+  }
  case class ReassignPartitions(startTime: DateTime,
-                                partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
+                                partitionsToBeReassigned: Map[TopicPartition, Seq[Int]],
                                endTime: Option[DateTime],
-                                clusterContext: ClusterContext) extends QueryResponse
+                                clusterContext: ClusterContext) extends QueryResponse {
+    def sortedTopicPartitionAssignmentList : List[((String, Int), Seq[Int])] = partitionsToBeReassigned.toList.sortBy(partition => (partition._1.topic(), partition._1.partition())).map { case (tp, a) => ((tp.topic(), tp.partition()), a)}
+  }

  case class ConsumedTopicDescription(consumer: String,
                                      topic: String,
6 changes: 3 additions & 3 deletions app/kafka/manager/utils/ZkUtils.scala
@@ -19,10 +19,10 @@ package kafka.manager.utils

import java.nio.charset.StandardCharsets

-import kafka.common.TopicAndPartition
import org.apache.curator.framework.CuratorFramework
+import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.CreateMode
-import org.apache.zookeeper.KeeperException.{NodeExistsException, NoNodeException}
+import org.apache.zookeeper.KeeperException.{NoNodeException, NodeExistsException}
import org.apache.zookeeper.data.Stat

/**
@@ -121,7 +121,7 @@ object ZkUtils {
  }


-  def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
+  def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]]): String = {
    toJson(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
      "replicas" -> e._2))))
  }
4 changes: 2 additions & 2 deletions app/kafka/manager/utils/one10/GroupMetadataManager.scala
@@ -749,8 +749,8 @@ object GroupMetadataManager {
        val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
        val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
        val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
-        val subscription: PartitionAssignor.Subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
-        val assignment: PartitionAssignor.Assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
+        val subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
+        val assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
        val member = new MemberMetadata(memberId
          , groupId
          , clientId
22 changes: 0 additions & 22 deletions app/kafka/manager/utils/one10/MemberMetadata.scala
@@ -37,28 +37,6 @@ object MemberMetadata {
      , assignment
    )
  }
-  def from(groupId: String, groupSummary: DescribeGroupsResponse.GroupMetadata, memberSummary: DescribeGroupsResponse.GroupMember) : MemberMetadata = {
-    val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(memberSummary.memberAssignment)))
-    val topics: Set[String] = {
-      try {
-        val subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(Utils.readBytes(memberSummary.memberMetadata())))
-        subscription.topics().asScala.toSet
-      } catch {
-        case e: Exception =>
-          assignment.partitions().asScala.map(tp => tp.topic()).toSet
-      }
-    }
-    MemberMetadata(
-      memberSummary.memberId
-      , groupId
-      , memberSummary.clientId
-      , memberSummary.clientHost
-      , groupSummary.protocolType()
-      , List((groupSummary.protocol, topics))
-      , assignment.partitions().asScala.map(tp => tp.topic() -> tp.partition()).toSet
-    )
-
-  }
}

/**
