Support Resource Configuration #20

Merged: 11 commits (Oct 11, 2018)
Changes from 10 commits
src/main/protobuf/collector.proto: 19 additions, 8 deletions
@@ -10,14 +10,14 @@ message CollectorEvent {
   // Types of Entities supported
   enum EntityType {
     CLUSTER = 0;
-    NODE = 1;
+    BROKER = 1;
     TOPIC = 2;
   }
   EntityType entityType = 1;
   string entityId = 2;
   oneof value {
     ClusterEvent clusterEvent = 3;
-    NodeEvent nodeEvent = 4;
+    BrokerEvent brokerEvent = 4;
     TopicEvent topicEvent = 5;
   }
 }
@@ -40,20 +40,22 @@ message ClusterUpdated {
 }

 // Supported Node Events
-message NodeEvent {
-  int32 id = 1;
+message BrokerEvent {
+  string id = 1;
   oneof event {
-    NodeCreated nodeCreated = 2;
-    NodeUpdated nodeUpdated = 3;
+    BrokerCreated brokerCreated = 2;
+    BrokerUpdated brokerUpdated = 3;
   }
 }

-message NodeCreated {
+message BrokerCreated {
   Node node = 1;
+  Config config = 2;
 }

-message NodeUpdated {
+message BrokerUpdated {
   Node node = 1;
+  Config config = 2;
 }

 // Supported Topic Events
@@ -71,6 +73,7 @@ message TopicCreated {

 message TopicUpdated {
   TopicDescription topicDescription = 1;
+  Config config = 2;
 }

 message TopicDescription {
@@ -93,4 +96,12 @@ message Node {
   string host = 2;
   int32 port = 3;
   string rack = 4;
 }
+
+message Config {
+  message Entry {
+    string name = 1;
+    string value = 2;
+  }
+  repeated Entry entries = 1;
+}
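For reference, a minimal sketch of how the renamed broker messages and the new Config message compose, assuming the ScalaPB-generated case classes for this schema (field values below are illustrative only):

import no.sysco.middleware.kafka.event.proto.collector._

// Sketch only: a BrokerCreated event now carries the broker's Node description
// plus its Config entries, and broker ids are strings.
val node = Node(id = 1, host = "broker-1", port = 9092, rack = "rack-a")
val config = Config(entries = Seq(Config.Entry(name = "log.retention.hours", value = "168")))
val event = BrokerEvent("1", BrokerEvent.Event.BrokerCreated(BrokerCreated(Some(node), Some(config))))

The same Config message is reused by TopicUpdated, so broker and topic configuration travel through the same event pipeline.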
@@ -15,8 +15,8 @@ class CollectorConfig(config: Config) {
     object Topic {
       val pollInterval: Duration = config.getDuration("collector.topic.poll-interval")
       val includeInternalTopics: Boolean = config.getBoolean("collector.topic.include-internal-topics")
-      val whitelist: List[String] = config.getString("collector.topic.whitelist").split(",").toList
-      val blacklist: List[String] = config.getString("collector.topic.blacklist").split(",").toList
+      val whitelist: List[String] = config.getString("collector.topic.whitelist").split(",").filterNot(s => s.isEmpty).toList
+      val blacklist: List[String] = config.getString("collector.topic.blacklist").split(",").filterNot(s => s.isEmpty).toList
     }
   }
   object Kafka {
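For context, a quick sketch of what the added filterNot guards against (illustration only, not part of the diff): splitting an empty whitelist or blacklist value yields a single empty string rather than an empty list.

// Sketch only: behaviour of the expressions above when the property is set to "".
"".split(",").toList                                          // List("") - one bogus, empty topic name
"".split(",").filterNot(s => s.isEmpty).toList                // List()   - no entries, as intended
"topic-a,topic-b".split(",").filterNot(s => s.isEmpty).toList // List("topic-a", "topic-b")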
@@ -5,11 +5,11 @@ import akka.stream.ActorMaterializer
 import com.typesafe.config.ConfigFactory
 import no.sysco.middleware.kafka.event.collector.cluster.ClusterManager
 import no.sysco.middleware.kafka.event.collector.cluster.ClusterManager.GetCluster
-import no.sysco.middleware.kafka.event.collector.cluster.NodeManager.ListNodes
+import no.sysco.middleware.kafka.event.collector.cluster.BrokerManager.ListBrokers
 import no.sysco.middleware.kafka.event.collector.internal.{ EventConsumer, EventProducer, EventRepository }
 import no.sysco.middleware.kafka.event.collector.topic.TopicManager
 import no.sysco.middleware.kafka.event.collector.topic.TopicManager.ListTopics
-import no.sysco.middleware.kafka.event.proto.collector.{ ClusterEvent, CollectorEvent, NodeEvent, TopicEvent }
+import no.sysco.middleware.kafka.event.proto.collector._

 import scala.concurrent.ExecutionContext

@@ -66,14 +66,14 @@ class CollectorManager(implicit
   override def receive: Receive = {
     case collectorEvent: CollectorEvent => handleEvent(collectorEvent)
     case getCluster: GetCluster => clusterEventCollector forward getCluster
-    case listNodes: ListNodes => clusterEventCollector forward listNodes
+    case listNodes: ListBrokers => clusterEventCollector forward listNodes
     case listTopics: ListTopics => topicEventCollector forward listTopics
   }

   private def handleEvent(event: CollectorEvent): Unit = {
     event.value match {
       case value: CollectorEvent.Value if value.isClusterEvent => handleClusterEvent(value.clusterEvent)
-      case value: CollectorEvent.Value if value.isNodeEvent => handleNodeEvent(value.nodeEvent)
+      case value: CollectorEvent.Value if value.isBrokerEvent => handleNodeEvent(value.brokerEvent)
       case value: CollectorEvent.Value if value.isTopicEvent => handleTopicEvent(value.topicEvent)
     }
   }
@@ -85,10 +85,10 @@
     }
   }

-  private def handleNodeEvent(nodeEvent: Option[NodeEvent]): Unit = {
-    nodeEvent match {
-      case Some(nodeEventValue) => clusterEventCollector ! nodeEventValue
-      case None =>
+  private def handleNodeEvent(brokerEvent: Option[BrokerEvent]): Unit = {
+    brokerEvent match {
+      case Some(brokerEventValue) => clusterEventCollector ! brokerEventValue
+      case None =>
     }
   }

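To make the dispatch above concrete, here is a sketch of the envelope handleEvent matches on, assuming ScalaPB's standard enum and oneof accessors (the same isBrokerEvent and brokerEvent accessors used in the hunk):

import no.sysco.middleware.kafka.event.proto.collector._

// Sketch only: a CollectorEvent wrapping a broker event; handleEvent routes it
// to handleNodeEvent(value.brokerEvent), which forwards it to the cluster collector.
val brokerEvent = BrokerEvent("1", BrokerEvent.Event.BrokerCreated(BrokerCreated()))
val envelope = CollectorEvent(
  CollectorEvent.EntityType.BROKER,
  "1",
  CollectorEvent.Value.BrokerEvent(brokerEvent))

envelope.value.isBrokerEvent // true
envelope.value.brokerEvent   // Some(brokerEvent)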
@@ -0,0 +1,132 @@
package no.sysco.middleware.kafka.event.collector.cluster

import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import akka.pattern.ask
import akka.util.Timeout
import io.opencensus.scala.Stats
import io.opencensus.scala.stats.Measurement
import no.sysco.middleware.kafka.event.collector.internal.EventRepository
import no.sysco.middleware.kafka.event.collector.internal.EventRepository.DescribeConfig
import no.sysco.middleware.kafka.event.collector.internal.Parser._
import no.sysco.middleware.kafka.event.collector.model._
import no.sysco.middleware.kafka.event.proto.collector.{ BrokerCreated, BrokerEvent, BrokerUpdated }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{ Failure, Success }

object BrokerManager {
  def props(
    eventRepository: ActorRef,
    eventProducer: ActorRef)(implicit executionContext: ExecutionContext): Props =
    Props(new BrokerManager(eventRepository, eventProducer))

  case class ListBrokers()

}

/**
 * Manage Cluster Brokers state.
 *
 * @param eventRepository Reference to event repository, to query broker configurations.
 * @param eventProducer Reference to producer, to publish events.
 */
class BrokerManager(eventRepository: ActorRef, eventProducer: ActorRef)(implicit executionContext: ExecutionContext)
  extends Actor with ActorLogging {

  import BrokerManager._
  import no.sysco.middleware.kafka.event.collector.metrics.Metrics._

  var brokers: Map[String, Broker] = Map()

  implicit val timeout: Timeout = 10 seconds

  override def receive: Receive = {
    case nodesDescribed: NodesDescribed => handleNodesDescribed(nodesDescribed)
    case brokerEvent: BrokerEvent => handleBrokerEvent(brokerEvent)
    case ListBrokers() => handleListBrokers()
  }

  def handleNodesDescribed(nodesDescribed: NodesDescribed): Unit = {
    log.info("Handling {} nodes described event.", nodesDescribed.nodes.size)
    evaluateCurrentNodes(brokers.values.map(_.node).toList, nodesDescribed.nodes)
    evaluateNodesDescribed(nodesDescribed.nodes)
  }

  private def evaluateCurrentNodes(currentBrokers: List[Node], nodes: List[Node]): Unit = {
    currentBrokers match {
      case Nil =>
      case node :: ns =>
        if (!nodes.contains(node)) {
          log.warning("{} is not listed", node)
        }
        evaluateCurrentNodes(ns, nodes)
    }
  }

  private def evaluateNodesDescribed(listedNodes: List[Node]): Unit = {
    listedNodes match {
      case Nil =>
      case node :: ns =>
        val brokerId = String.valueOf(node.id)
        brokers.get(brokerId) match {
          case None =>
            val configFuture =
              (eventRepository ? DescribeConfig(EventRepository.ResourceType.Broker, brokerId)).mapTo[ConfigDescribed]
            configFuture onComplete {
              case Success(configDescribed) =>
                Stats.record(
                  List(brokerTypeTag, createdOperationTypeTag),
                  Measurement.double(totalMessageProducedMeasure, 1))
                eventProducer !
                  BrokerEvent(brokerId, BrokerEvent.Event.BrokerCreated(BrokerCreated(Some(toPb(node)), Some(toPb(configDescribed.config)))))
              case Failure(t) => log.error(t, "Error querying config")
            }
          case Some(thisBroker) =>
            val configFuture =
              (eventRepository ? DescribeConfig(EventRepository.ResourceType.Broker, brokerId)).mapTo[ConfigDescribed]
            configFuture onComplete {
              case Success(configDescribed) =>
                if (!thisBroker.equals(Broker(brokerId, node, configDescribed.config))) {
                  Stats.record(
                    List(brokerTypeTag, updatedOperationTypeTag),
                    Measurement.double(totalMessageProducedMeasure, 1))
                  eventProducer !
                    BrokerEvent(brokerId, BrokerEvent.Event.BrokerUpdated(BrokerUpdated(Some(toPb(node)), Some(toPb(configDescribed.config)))))
                }
              case Failure(t) => log.error(t, "Error querying config")
            }
        }
        evaluateNodesDescribed(ns)
    }
  }

  def handleBrokerEvent(brokerEvent: BrokerEvent): Unit = {
    log.info("Handling node {} event.", brokerEvent.id)
    val brokerId = String.valueOf(brokerEvent.id)
    brokerEvent.event match {
      case event if event.isBrokerCreated =>
        Stats.record(
          List(brokerTypeTag, createdOperationTypeTag),
          Measurement.double(totalMessageConsumedMeasure, 1))
        event.brokerCreated match {
          case Some(brokerCreated) =>
            val broker = Broker(brokerId, fromPb(brokerCreated.getNode), fromPb(brokerCreated.config))
            brokers = brokers + (brokerId -> broker)
          case None =>
        }
      case event if event.isBrokerUpdated =>
        Stats.record(
          List(brokerTypeTag, updatedOperationTypeTag),
          Measurement.double(totalMessageConsumedMeasure, 1))
        event.brokerUpdated match {
          case Some(brokerUpdated) =>
            brokers = brokers + (brokerId -> Broker(brokerId, fromPb(brokerUpdated.getNode), fromPb(brokerUpdated.config)))
          case None =>
        }
    }
  }

  def handleListBrokers(): Unit = sender() ! Brokers(brokers.values.toList)

}
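For completeness, a sketch of a possible call site for the new ListBrokers query. The helper below is hypothetical; it assumes Brokers is the reply type from the collector model package (as used in handleListBrokers above) and that the caller holds the broker-manager ActorRef created by ClusterManager:

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import no.sysco.middleware.kafka.event.collector.cluster.BrokerManager
import no.sysco.middleware.kafka.event.collector.model.Brokers
import scala.concurrent.Future

// Sketch only: ask the manager for its current broker snapshot.
def currentBrokers(brokerManager: ActorRef)(implicit timeout: Timeout): Future[Brokers] =
  (brokerManager ? BrokerManager.ListBrokers()).mapTo[Brokers]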
@@ -5,8 +5,8 @@ import java.time.Duration
 import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
 import io.opencensus.scala.Stats
 import io.opencensus.scala.stats.Measurement
-import no.sysco.middleware.kafka.event.collector.cluster.NodeManager.ListNodes
-import no.sysco.middleware.kafka.event.collector.internal.Parser
+import no.sysco.middleware.kafka.event.collector.cluster.BrokerManager.ListBrokers
+import no.sysco.middleware.kafka.event.collector.internal.Parser._
 import no.sysco.middleware.kafka.event.collector.model.{ Cluster, ClusterDescribed, NodesDescribed }
 import no.sysco.middleware.kafka.event.proto.collector._

@@ -40,7 +40,8 @@ class ClusterManager(
   import no.sysco.middleware.kafka.event.collector.internal.EventRepository._
   import no.sysco.middleware.kafka.event.collector.metrics.Metrics._

-  val nodeManager: ActorRef = context.actorOf(NodeManager.props(eventProducer), "node-manager")
+  val brokerManager: ActorRef =
+    context.actorOf(BrokerManager.props(eventRepository, eventProducer), "broker-manager")

   var cluster: Option[Cluster] = None

@@ -51,8 +52,8 @@
     case clusterDescribed: ClusterDescribed => handleClusterDescribed(clusterDescribed)
     case clusterEvent: ClusterEvent => handleClusterEvent(clusterEvent)
     case GetCluster() => handleGetCluster()
-    case nodeEvent: NodeEvent => nodeManager forward nodeEvent
-    case listNodes: ListNodes => nodeManager forward listNodes
+    case brokerEvent: BrokerEvent => brokerManager forward brokerEvent
+    case listNodes: ListBrokers => brokerManager forward listNodes
   }

   def handleDescribeCluster(): Unit = {
@@ -69,7 +70,7 @@
   def handleClusterDescribed(clusterDescribed: ClusterDescribed): Unit = {
     log.info("Handling cluster {} described event.", clusterDescribed.id)
     val controller: Option[Node] = clusterDescribed.controller match {
-      case Some(c) => Some(Parser.toPb(c))
+      case Some(c) => Some(toPb(c))
       case None => None
     }
     cluster match {
@@ -89,7 +90,7 @@
         eventProducer ! ClusterEvent(clusterDescribed.id, ClusterEvent.Event.ClusterUpdated(ClusterUpdated(controller)))
       }
     }
-    nodeManager ! NodesDescribed(clusterDescribed.nodes)
+    brokerManager ! NodesDescribed(clusterDescribed.nodes)
   }

   def handleClusterEvent(clusterEvent: ClusterEvent): Unit = {
@@ -103,7 +104,7 @@
       case Some(clusterCreated) =>
         val controller = clusterCreated.controller match {
           case None => None
-          case Some(node) => Some(Parser.fromPb(node))
+          case Some(node) => Some(fromPb(node))
         }
         cluster = Some(Cluster(clusterEvent.id, controller))
       case None =>
@@ -116,7 +117,7 @@
       case Some(clusterUpdated) =>
         val controller = clusterUpdated.controller match {
           case None => None
-          case Some(node) => Some(Parser.fromPb(node))
+          case Some(node) => Some(fromPb(node))
         }
         cluster = Some(Cluster(clusterEvent.id, controller))
       case None =>