Skip to content

Commit

Permalink
Merge pull request #20 from sysco-middleware/config
Browse files Browse the repository at this point in the history
Support Resource Configuration
  • Loading branch information
jeqo authored Oct 11, 2018
2 parents 9702ed6 + 1aadc75 commit 146fa8c
Show file tree
Hide file tree
Showing 20 changed files with 576 additions and 330 deletions.
27 changes: 19 additions & 8 deletions src/main/protobuf/collector.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ message CollectorEvent {
// Types of Entities supported
enum EntityType {
CLUSTER = 0;
NODE = 1;
BROKER = 1;
TOPIC = 2;
}
EntityType entityType = 1;
string entityId = 2;
oneof value {
ClusterEvent clusterEvent = 3;
NodeEvent nodeEvent = 4;
BrokerEvent brokerEvent = 4;
TopicEvent topicEvent = 5;
}
}
Expand All @@ -40,20 +40,22 @@ message ClusterUpdated {
}

// Supported Node Events
message NodeEvent {
int32 id = 1;
message BrokerEvent {
string id = 1;
oneof event {
NodeCreated nodeCreated = 2;
NodeUpdated nodeUpdated = 3;
BrokerCreated brokerCreated = 2;
BrokerUpdated brokerUpdated = 3;
}
}

message NodeCreated {
message BrokerCreated {
Node node = 1;
Config config = 2;
}

message NodeUpdated {
message BrokerUpdated {
Node node = 1;
Config config = 2;
}

// Supported Topic Events
Expand All @@ -71,6 +73,7 @@ message TopicCreated {

message TopicUpdated {
TopicDescription topicDescription = 1;
Config config = 2;
}

message TopicDescription {
Expand All @@ -93,4 +96,12 @@ message Node {
string host = 2;
int32 port = 3;
string rack = 4;
}

message Config {
message Entry {
string name = 1;
string value = 2;
}
repeated Entry entries = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class CollectorConfig(config: Config) {
object Topic {
val pollInterval: Duration = config.getDuration("collector.topic.poll-interval")
val includeInternalTopics: Boolean = config.getBoolean("collector.topic.include-internal-topics")
val whitelist: List[String] = config.getString("collector.topic.whitelist").split(",").toList
val blacklist: List[String] = config.getString("collector.topic.blacklist").split(",").toList
val whitelist: List[String] = config.getString("collector.topic.whitelist").split(",").filterNot(s => s.isEmpty).toList
val blacklist: List[String] = config.getString("collector.topic.blacklist").split(",").filterNot(s => s.isEmpty).toList
}
}
object Kafka {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import no.sysco.middleware.kafka.event.collector.cluster.ClusterManager
import no.sysco.middleware.kafka.event.collector.cluster.ClusterManager.GetCluster
import no.sysco.middleware.kafka.event.collector.cluster.NodeManager.ListNodes
import no.sysco.middleware.kafka.event.collector.cluster.BrokerManager.ListBrokers
import no.sysco.middleware.kafka.event.collector.internal.{ EventConsumer, EventProducer, EventRepository }
import no.sysco.middleware.kafka.event.collector.topic.TopicManager
import no.sysco.middleware.kafka.event.collector.topic.TopicManager.ListTopics
import no.sysco.middleware.kafka.event.proto.collector.{ ClusterEvent, CollectorEvent, NodeEvent, TopicEvent }
import no.sysco.middleware.kafka.event.proto.collector._

import scala.concurrent.ExecutionContext

Expand Down Expand Up @@ -66,14 +66,14 @@ class CollectorManager(implicit
override def receive: Receive = {
case collectorEvent: CollectorEvent => handleEvent(collectorEvent)
case getCluster: GetCluster => clusterEventCollector forward getCluster
case listNodes: ListNodes => clusterEventCollector forward listNodes
case listNodes: ListBrokers => clusterEventCollector forward listNodes
case listTopics: ListTopics => topicEventCollector forward listTopics
}

private def handleEvent(event: CollectorEvent): Unit = {
event.value match {
case value: CollectorEvent.Value if value.isClusterEvent => handleClusterEvent(value.clusterEvent)
case value: CollectorEvent.Value if value.isNodeEvent => handleNodeEvent(value.nodeEvent)
case value: CollectorEvent.Value if value.isBrokerEvent => handleNodeEvent(value.brokerEvent)
case value: CollectorEvent.Value if value.isTopicEvent => handleTopicEvent(value.topicEvent)
}
}
Expand All @@ -85,10 +85,10 @@ class CollectorManager(implicit
}
}

private def handleNodeEvent(nodeEvent: Option[NodeEvent]): Unit = {
nodeEvent match {
case Some(nodeEventValue) => clusterEventCollector ! nodeEventValue
case None =>
private def handleNodeEvent(brokerEvent: Option[BrokerEvent]): Unit = {
brokerEvent match {
case Some(brokerEventValue) => clusterEventCollector ! brokerEventValue
case None =>
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package no.sysco.middleware.kafka.event.collector.cluster

import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import akka.pattern.ask
import akka.util.Timeout
import io.opencensus.scala.Stats
import io.opencensus.scala.stats.Measurement
import no.sysco.middleware.kafka.event.collector.internal.EventRepository
import no.sysco.middleware.kafka.event.collector.internal.EventRepository.DescribeConfig
import no.sysco.middleware.kafka.event.collector.internal.Parser._
import no.sysco.middleware.kafka.event.collector.model._
import no.sysco.middleware.kafka.event.proto.collector.{ BrokerCreated, BrokerEvent, BrokerUpdated }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{ Failure, Success }

object BrokerManager {
def props(
eventRepository: ActorRef,
eventProducer: ActorRef)(implicit executionContext: ExecutionContext): Props =
Props(new BrokerManager(eventRepository, eventProducer))

case class ListBrokers()

}

/**
* Manage Cluster Nodes state.
*
* @param eventProducer Reference to producer, to publish events.
*/
class BrokerManager(eventRepository: ActorRef, eventProducer: ActorRef)(implicit executionContext: ExecutionContext)
extends Actor with ActorLogging {

import BrokerManager._
import no.sysco.middleware.kafka.event.collector.metrics.Metrics._

var brokers: Map[String, Broker] = Map()

implicit val timeout: Timeout = 10 seconds

override def receive(): Receive = {
case nodesDescribed: NodesDescribed => handleNodesDescribed(nodesDescribed)
case brokerEvent: BrokerEvent => handleBrokerEvent(brokerEvent)
case ListBrokers() => handleListBrokers()
}

def handleNodesDescribed(nodesDescribed: NodesDescribed): Unit = {
log.info("Handling {} nodes described event.", nodesDescribed.nodes.size)
evaluateCurrentNodes(brokers.values.map(_.node).toList, nodesDescribed.nodes)
evaluateNodesDescribed(nodesDescribed.nodes)
}

private def evaluateCurrentNodes(currentBrokers: List[Node], nodes: List[Node]): Unit = {
currentBrokers match {
case Nil =>
case node :: ns =>
if (!nodes.contains(node)) {
log.warning("{} is not listed", node)
}
evaluateCurrentNodes(ns, nodes)
}
}

private def evaluateNodesDescribed(listedNodes: List[Node]): Unit = {
listedNodes match {
case Nil =>
case node :: ns =>
val brokerId = String.valueOf(node.id)
brokers.get(brokerId) match {
case None =>
val configFuture =
(eventRepository ? DescribeConfig(EventRepository.ResourceType.Broker, brokerId)).mapTo[ConfigDescribed]
configFuture onComplete {
case Success(configDescribed) =>
Stats.record(
List(brokerTypeTag, createdOperationTypeTag),
Measurement.double(totalMessageProducedMeasure, 1))
eventProducer !
BrokerEvent(brokerId, BrokerEvent.Event.BrokerCreated(BrokerCreated(Some(toPb(node)), Some(toPb(configDescribed.config)))))
case Failure(t) => log.error(t, "Error querying config")
}
case Some(thisBroker) =>
val configFuture =
(eventRepository ? DescribeConfig(EventRepository.ResourceType.Broker, brokerId)).mapTo[ConfigDescribed]
configFuture onComplete {
case Success(configDescribed) =>
if (!thisBroker.equals(Broker(brokerId, node, configDescribed.config))) {
Stats.record(
List(brokerTypeTag, updatedOperationTypeTag),
Measurement.double(totalMessageProducedMeasure, 1))
eventProducer !
BrokerEvent(brokerId, BrokerEvent.Event.BrokerUpdated(BrokerUpdated(Some(toPb(node)), Some(toPb(configDescribed.config)))))
}
case Failure(t) => log.error(t, "Error querying config")
}
}
evaluateNodesDescribed(ns)
}
}

def handleBrokerEvent(brokerEvent: BrokerEvent): Unit = {
log.info("Handling node {} event.", brokerEvent.id)
val brokerId = String.valueOf(brokerEvent.id)
brokerEvent.event match {
case event if event.isBrokerCreated =>
Stats.record(
List(brokerTypeTag, createdOperationTypeTag),
Measurement.double(totalMessageConsumedMeasure, 1))
event.brokerCreated match {
case Some(brokerCreated) =>
val broker = Broker(brokerId, fromPb(brokerCreated.getNode), fromPb(brokerCreated.config))
brokers = brokers + (brokerId -> broker)
case None => //This scenario should not happen as event is validated before.
}
case event if event.isBrokerUpdated =>
Stats.record(
List(brokerTypeTag, updatedOperationTypeTag),
Measurement.double(totalMessageConsumedMeasure, 1))
event.brokerUpdated match {
case Some(brokerUpdated) =>
val broker = Broker(brokerId, fromPb(brokerUpdated.getNode), fromPb(brokerUpdated.config))
brokers = brokers + (brokerId -> broker)
case None => //This scenario should not happen as event is validated before.
}
}
}

def handleListBrokers(): Unit = sender() ! Brokers(brokers.values.toList)

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import java.time.Duration
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import io.opencensus.scala.Stats
import io.opencensus.scala.stats.Measurement
import no.sysco.middleware.kafka.event.collector.cluster.NodeManager.ListNodes
import no.sysco.middleware.kafka.event.collector.internal.Parser
import no.sysco.middleware.kafka.event.collector.cluster.BrokerManager.ListBrokers
import no.sysco.middleware.kafka.event.collector.internal.Parser._
import no.sysco.middleware.kafka.event.collector.model.{ Cluster, ClusterDescribed, NodesDescribed }
import no.sysco.middleware.kafka.event.proto.collector._

Expand Down Expand Up @@ -40,7 +40,8 @@ class ClusterManager(
import no.sysco.middleware.kafka.event.collector.internal.EventRepository._
import no.sysco.middleware.kafka.event.collector.metrics.Metrics._

val nodeManager: ActorRef = context.actorOf(NodeManager.props(eventProducer), "node-manager")
val brokerManager: ActorRef =
context.actorOf(BrokerManager.props(eventRepository, eventProducer), "broker-manager")

var cluster: Option[Cluster] = None

Expand All @@ -51,8 +52,8 @@ class ClusterManager(
case clusterDescribed: ClusterDescribed => handleClusterDescribed(clusterDescribed)
case clusterEvent: ClusterEvent => handleClusterEvent(clusterEvent)
case GetCluster() => handleGetCluster()
case nodeEvent: NodeEvent => nodeManager forward nodeEvent
case listNodes: ListNodes => nodeManager forward listNodes
case brokerEvent: BrokerEvent => brokerManager forward brokerEvent
case listNodes: ListBrokers => brokerManager forward listNodes
}

def handleDescribeCluster(): Unit = {
Expand All @@ -69,7 +70,7 @@ class ClusterManager(
def handleClusterDescribed(clusterDescribed: ClusterDescribed): Unit = {
log.info("Handling cluster {} described event.", clusterDescribed.id)
val controller: Option[Node] = clusterDescribed.controller match {
case Some(c) => Some(Parser.toPb(c))
case Some(c) => Some(toPb(c))
case None => None
}
cluster match {
Expand All @@ -89,7 +90,7 @@ class ClusterManager(
eventProducer ! ClusterEvent(clusterDescribed.id, ClusterEvent.Event.ClusterUpdated(ClusterUpdated(controller)))
}
}
nodeManager ! NodesDescribed(clusterDescribed.nodes)
brokerManager ! NodesDescribed(clusterDescribed.nodes)
}

def handleClusterEvent(clusterEvent: ClusterEvent): Unit = {
Expand All @@ -103,10 +104,10 @@ class ClusterManager(
case Some(clusterCreated) =>
val controller = clusterCreated.controller match {
case None => None
case Some(node) => Some(Parser.fromPb(node))
case Some(node) => Some(fromPb(node))
}
cluster = Some(Cluster(clusterEvent.id, controller))
case None =>
case None => //This scenario should not happen as event is validated before.
}
case event if event.isClusterUpdated =>
Stats.record(
Expand All @@ -116,10 +117,10 @@ class ClusterManager(
case Some(clusterUpdated) =>
val controller = clusterUpdated.controller match {
case None => None
case Some(node) => Some(Parser.fromPb(node))
case Some(node) => Some(fromPb(node))
}
cluster = Some(Cluster(clusterEvent.id, controller))
case None =>
case None => //This scenario should not happen as event is validated before.
}
}
}
Expand Down
Loading

0 comments on commit 146fa8c

Please sign in to comment.