Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Resource Configuration #20

Merged
merged 11 commits into from
Oct 11, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ object Parser {
case s => Some(s)
})

def fromPb(config: proto.collector.Config): Config =
Config(config.entries.map(entry => entry.name -> entry.value).toMap)
def fromPb(configOption: Option[proto.collector.Config]): Config =
configOption match {
case Some(config) =>
Config(config.entries.map(entry => entry.name -> entry.value).toMap)
case None => Config()
}
def toPb(topicDescription: TopicDescription, config: Config): proto.collector.TopicUpdated =
proto.collector.TopicUpdated(
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TopicManager(
import no.sysco.middleware.kafka.event.collector.internal.EventRepository._
import no.sysco.middleware.kafka.event.collector.metrics.Metrics._

var topics: Map[String, Option[Topic]] = Map()
var topics: Map[String, Topic] = Map()

implicit val timeout: Timeout = 5 seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could u add to javadoc info about timeout and why 5 secs? It is implicit value, so it used somewhere here in class (I guess its time out for Future completion).


Expand Down Expand Up @@ -161,8 +161,7 @@ class TopicManager(
}
}

// assume that topic name already collected, no null pointers
topics(topicName) match {
topics.get(topicName) match {
case None =>
val configFuture =
(eventRepository ? DescribeConfig(EventRepository.ResourceType.Topic, topicName)).mapTo[Config]
Expand Down Expand Up @@ -201,7 +200,6 @@ class TopicManager(
Measurement.double(totalMessageConsumedMeasure, 1))
event.topicCreated match {
case Some(_) =>
topics = topics + (topicEvent.name -> None)
eventRepository ! DescribeTopic(topicEvent.name)
case None =>
}
Expand All @@ -212,8 +210,8 @@ class TopicManager(
event.topicUpdated match {
case Some(topicUpdated) =>
val topicDescription = fromPb(topicEvent.name, topicUpdated.topicDescription.get)
val config = fromPb(topicUpdated.config.get)
topics = topics + (topicEvent.name -> Some(Topic(topicEvent.name, topicDescription, config)))
val config = fromPb(topicUpdated.config)
topics = topics + (topicEvent.name -> Topic(topicEvent.name, topicDescription, config))
case None =>
}
case event if event.isTopicDeleted =>
Expand All @@ -229,5 +227,5 @@ class TopicManager(
}

def handleListTopics(): Unit =
sender() ! Topics(topics.values.filter(_.isDefined).map(_.get).toList)
sender() ! Topics(topics.values.toList)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import java.time.Duration

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.testkit.{ ImplicitSender, TestKit, TestProbe }
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import net.manub.embeddedkafka.EmbeddedKafkaConfig
import no.sysco.middleware.kafka.event.collector.internal.EventRepository.CollectTopics
import no.sysco.middleware.kafka.event.collector.model.{ TopicDescription, _ }
import no.sysco.middleware.kafka.event.collector.internal.EventRepository.{CollectTopics, DescribeConfig, DescribeTopic, ResourceType}
import no.sysco.middleware.kafka.event.collector.model.{TopicDescription, _}
import no.sysco.middleware.kafka.event.collector.topic.TopicManager.ListTopics
import no.sysco.middleware.kafka.event.proto
import no.sysco.middleware.kafka.event.proto.collector._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import no.sysco.middleware.kafka.event.proto.collector.{TopicCreated, TopicDeleted, TopicEvent, TopicUpdated}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps

class TopicManagerSpec
extends TestKit(ActorSystem("test-topic-manager"))
Expand Down Expand Up @@ -69,12 +70,16 @@ class TopicManagerSpec
manager ! TopicEvent("topic-2", TopicEvent.Event.TopicCreated(TopicCreated()))
manager ! TopicEvent("topic-3", TopicEvent.Event.TopicCreated(TopicCreated()))

// then, current state should have 3 topics
// then, 3 topics should be ask to be described
eventRepository.expectMsg(DescribeTopic("topic-1"))
eventRepository.expectMsg(DescribeTopic("topic-2"))
eventRepository.expectMsg(DescribeTopic("topic-3"))

// and, current state should not have 3 topics, as empty topics are not returned
manager ! ListTopics()

val topicsV0 = expectMsgType[Topics]
assert(topicsV0.topics.size == 3)
//FIXME assert(topicsV0.topicsAndDescription.count(_._2.isEmpty) == 3)
assert(topicsV0.topics.isEmpty)

// if a topic is updated
manager !
Expand All @@ -87,21 +92,31 @@ class TopicManagerSpec
internal = false,
List(proto.collector.TopicDescription.TopicPartitionInfo(0, Some(proto.collector.Node(0, "localhost", 9092, "1")))))))))

manager !
TopicEvent(
"topic-2",
TopicEvent.Event.TopicUpdated(
TopicUpdated(
Some(
proto.collector.TopicDescription(
internal = false,
List(proto.collector.TopicDescription.TopicPartitionInfo(0, Some(proto.collector.Node(0, "localhost", 9092, "1")))))))))


// then, current state should reflect topic updated
manager ! ListTopics()
val topicsV1 = expectMsgType[Topics]
//FIXME assert(topicsV1.topicsAndDescription.count(_._2.isEmpty) == 2)
//FIXME assert(!topicsV1.topicsAndDescription("topic-1").get.internal)
//FIXME assert(topicsV1.topicsAndDescription("topic-1").get.partitions.size == 1)
assert(topicsV1.topics.size == 2)
assert(!topicsV1.topics.find(s => s.name.equals("topic-1")).get.description.internal)
assert(topicsV1.topics.find(s => s.name.equals("topic-1")).get.description.partitions.size == 1)

// finally, if topic is deleted
manager ! TopicEvent("topic-2", TopicEvent.Event.TopicDeleted(TopicDeleted()))

// then, current state should have just 2 topics.
manager ! ListTopics()
val topicsV2 = expectMsgType[Topics]
//FIXME assert(topicsV2.topicsAndDescription.count(_._2.isEmpty) == 1)
//FIXME assert(topicsV2.topicsAndDescription.get("topic-2").isEmpty)
assert(topicsV2.topics.size == 1)
}
}

Expand All @@ -117,18 +132,19 @@ class TopicManagerSpec
eventRepository = eventRepositoryProbe.ref,
eventProducer = eventProducerProbe.ref))

// collect 3 topics
manager ! TopicEvent("topic-1", TopicEvent.Event.TopicCreated(TopicCreated()))
manager ! TopicEvent("topic-2", TopicEvent.Event.TopicCreated(TopicCreated()))
manager ! TopicEvent("topic-3", TopicEvent.Event.TopicCreated(TopicCreated()))

// describe 2 internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont forget also comments :) As I see now, U describe 2 NON-internal topic

manager ! TopicDescribed(("topic-1", TopicDescription(internal = true, List.empty)))
manager ! TopicDescribed(("topic-1", TopicDescription(internal = false, List.empty)))
eventRepositoryProbe.expectMsg(DescribeConfig(ResourceType.Topic, "topic-1"))
eventRepositoryProbe.reply(Config())
eventProducerProbe.expectMsgType[TopicEvent]
manager ! TopicDescribed(("topic-2", TopicDescription(internal = true, List.empty)))
manager ! TopicDescribed(("topic-2", TopicDescription(internal = false, List.empty)))
eventRepositoryProbe.expectMsg(DescribeConfig(ResourceType.Topic, "topic-2"))
eventRepositoryProbe.reply(Config())
eventProducerProbe.expectMsgType[TopicEvent]
// describe 1 NOT internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And opposite here U describe 1 INTERNAL topic

manager ! TopicDescribed(("topic-3", TopicDescription(internal = false, List.empty)))
manager ! TopicDescribed(("topic-3", TopicDescription(internal = true, List.empty)))
eventRepositoryProbe.expectMsg(DescribeConfig(ResourceType.Topic, "topic-3"))
eventRepositoryProbe.reply(Config())
eventProducerProbe.expectMsgType[TopicEvent]
}
}
Expand All @@ -142,24 +158,23 @@ class TopicManagerSpec
system.actorOf(
TopicManager.props(
pollInterval = Duration.ofSeconds(100),
includeInternalTopics = false,
eventRepository = eventRepositoryProbe.ref,
eventProducer = eventProducerProbe.ref))

// collect 3 topics
manager ! TopicEvent("topic-1", TopicEvent.Event.TopicCreated(TopicCreated()))
manager ! TopicEvent("topic-2", TopicEvent.Event.TopicCreated(TopicCreated()))
manager ! TopicEvent("topic-3", TopicEvent.Event.TopicCreated(TopicCreated()))
eventProducer = eventProducerProbe.ref,
includeInternalTopics = false))

// describe 2 internal
manager ! TopicDescribed(("topic-1", TopicDescription(internal = true, List.empty)))
manager ! TopicDescribed(("topic-2", TopicDescription(internal = true, List.empty)))

eventProducerProbe.expectNoMessage(500 millis)

// describe 1 NOT internal
manager ! TopicDescribed(("topic-3", TopicDescription(internal = false, List.empty)))
manager ! TopicDescribed(("topic-1", TopicDescription(internal = false, List.empty)))
eventRepositoryProbe.expectMsg(DescribeConfig(ResourceType.Topic, "topic-1"))
eventRepositoryProbe.reply(Config())
eventProducerProbe.expectMsgType[TopicEvent]
manager ! TopicDescribed(("topic-2", TopicDescription(internal = false, List.empty)))
eventRepositoryProbe.expectMsg(DescribeConfig(ResourceType.Topic, "topic-2"))
eventRepositoryProbe.reply(Config())
eventProducerProbe.expectMsgType[TopicEvent]
// describe 1 NOT internal
manager ! TopicDescribed(("topic-3", TopicDescription(internal = true, List.empty)))
eventRepositoryProbe.expectNoMessage()
eventProducerProbe.expectNoMessage()
}
}

Expand Down Expand Up @@ -253,9 +268,33 @@ class TopicManagerSpec
eventProducer = eventProducerProbe.ref))

// collect 3 topics
manager ! TopicEvent("topic-1", TopicEvent.Event.TopicCreated(TopicCreated()))
manager ! TopicEvent("topic-2", TopicEvent.Event.TopicCreated(TopicCreated()))
manager ! TopicEvent("topic-3", TopicEvent.Event.TopicCreated(TopicCreated()))
manager !
TopicEvent(
"topic-1",
TopicEvent.Event.TopicUpdated(
TopicUpdated(
Some(
proto.collector.TopicDescription(
internal = false,
List(proto.collector.TopicDescription.TopicPartitionInfo(0, Some(proto.collector.Node(0, "localhost", 9092, "1")))))))))
manager !
TopicEvent(
"topic-2",
TopicEvent.Event.TopicUpdated(
TopicUpdated(
Some(
proto.collector.TopicDescription(
internal = false,
List(proto.collector.TopicDescription.TopicPartitionInfo(0, Some(proto.collector.Node(0, "localhost", 9092, "1")))))))))
manager !
TopicEvent(
"topic-3",
TopicEvent.Event.TopicUpdated(
TopicUpdated(
Some(
proto.collector.TopicDescription(
internal = false,
List(proto.collector.TopicDescription.TopicPartitionInfo(0, Some(proto.collector.Node(0, "localhost", 9092, "1")))))))))

// if topic-3 is not collected
manager ! TopicsCollected(List("topic-1", "topic-2"))
Expand Down