Skip to content

Commit

Permalink
Merge pull request #66 from nitayk/feature/removeTagsFromMetricNameDD…
Browse files Browse the repository at this point in the history
…Reporter

feature/removeTagsFromMetricNameDDReporter
  • Loading branch information
hjacobs authored May 2, 2019
2 parents 071b24e + 1441579 commit f66e579
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 22 deletions.
2 changes: 2 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ metrics {
port = ${?DATADOG_AGENT_PORT}
tracked_consumer_group = []
tracked_consumer_group = ${?DATADOG_CONSUMER_GROUPS}
remove_tags_from_metric_name = false
remove_tags_from_metric_name = ${?DATADOG_REMOVE_TAGS_FROM_METRIC_NAME}
}
toregistry {
enabled = false
Expand Down
34 changes: 24 additions & 10 deletions src/main/scala/config/MetricsSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,39 @@ package config
import com.typesafe.config.Config
import scala.collection.JavaConverters._

case class RegistryOptions(enabled: Boolean = true, intervalSeconds : Int)
case class CloudWatch(enabled: Boolean = false, name : String, intervalMinutes: Int)
case class DataDog(enabled: Boolean = false, name : String, intervalMinutes: Int, agentHost: String, agentPort: Int, trackedConsumerGroups: List[String])
case class RegistryOptions(enabled: Boolean = true, intervalSeconds: Int)
case class CloudWatch(enabled: Boolean = false, name: String, intervalMinutes: Int)

case class DataDog(enabled: Boolean = false,
name: String,
intervalMinutes: Int,
agentHost: String,
agentPort: Int,
trackedConsumerGroups: List[String],
removeTagsFromMetricName: Boolean)

case class MetricsSettings(cloudWatch: CloudWatch, dataDog: DataDog, registryOptions: RegistryOptions)

object MetricsSettings {
def apply(config: Config): MetricsSettings =
MetricsSettings(
CloudWatch(config.getBoolean("metrics.cloudwatch.enabled"),
CloudWatch(
config.getBoolean("metrics.cloudwatch.enabled"),
config.getString("metrics.cloudwatch.name"),
config.getInt("metrics.cloudwatch.interval_minutes")),
DataDog(config.getBoolean("metrics.datadog.enabled"),
config.getInt("metrics.cloudwatch.interval_minutes")
),
DataDog(
config.getBoolean("metrics.datadog.enabled"),
config.getString("metrics.datadog.name"),
config.getInt("metrics.datadog.interval_minutes"),
config.getString("metrics.datadog.host"),
config.getInt("metrics.datadog.port"),
config.getStringList("metrics.datadog.tracked_consumer_group").asScala.toList
config.getStringList("metrics.datadog.tracked_consumer_group").asScala.toList,
config.getBoolean("metrics.datadog.remove_tags_from_metric_name")
),
RegistryOptions(config.getBoolean("metrics.toregistry.enabled"),
config.getInt("metrics.toregistry.interval_seconds")))
}
RegistryOptions(
config.getBoolean("metrics.toregistry.enabled"),
config.getInt("metrics.toregistry.interval_seconds")
)
)
}
16 changes: 8 additions & 8 deletions src/main/scala/reporter/RemoraDatadogReporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.coursera.metrics.datadog.{DatadogReporter, MetricNameFormatter}

class RemoraDatadogReporter(metricRegistry: MetricRegistry, datadogConfig: DataDog) {


private val transport = new UdpTransport.Builder().withStatsdHost(datadogConfig.agentHost).withPort(datadogConfig.agentPort).build

/**
Expand All @@ -24,12 +23,13 @@ class RemoraDatadogReporter(metricRegistry: MetricRegistry, datadogConfig: DataD
}
}

private val metricNameFormatter: MetricNameFormatter = new MetricNameFormatter {
private def metricNameFormatter(removeTagsFromMetricName: Boolean): MetricNameFormatter = new MetricNameFormatter {
override def format(nameWithPrefix: String, path: String*): String = {
RegistryKafkaMetric.decode(nameWithPrefix.replaceFirst(s"${datadogConfig.name}\\.","")) match {
case Some(registryKafkaMetric) =>
val builder = new TaggedNameBuilder().metricName(nameWithPrefix)
.addTag("topic", registryKafkaMetric.topic)
val builder = new TaggedNameBuilder().metricName(
if (removeTagsFromMetricName) buildNameWithoutTags(registryKafkaMetric) else nameWithPrefix
).addTag("topic", registryKafkaMetric.topic)
.addTag("group", registryKafkaMetric.group)
registryKafkaMetric.partition.foreach(p => builder.addTag("partition", p))
builder.build().encode()
Expand All @@ -38,15 +38,15 @@ class RemoraDatadogReporter(metricRegistry: MetricRegistry, datadogConfig: DataD
}
}

private def buildNameWithoutTags(registryKafkaMetric: RegistryKafkaMetric): String =
s"${datadogConfig.name}.${registryKafkaMetric.prefix}.${registryKafkaMetric.suffix}"

def startReporter(): Unit = DatadogReporter
.forRegistry(metricRegistry)
.withPrefix(datadogConfig.name)
.withTransport(transport)
.filter(kafkaConsumerGroupFilter)
.withMetricNameFormatter(metricNameFormatter)
.withMetricNameFormatter(metricNameFormatter(datadogConfig.removeTagsFromMetricName))
.build
.start(datadogConfig.intervalMinutes, TimeUnit.MINUTES)


}

21 changes: 17 additions & 4 deletions src/test/scala/reporter/RemoraDatadogReporterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class RemoraDatadogReporterSpec extends FlatSpec with Matchers with PrivateMetho

private val metricRegistry: MetricRegistry = new MetricRegistry
private val metric: Metric = mock[Metric]
private val config = DataDog(enabled = true, "test", 1, "localhost", 8125, List.empty)
private val config = DataDog(enabled = true, "test", 1, "localhost", 8125, List.empty, removeTagsFromMetricName = false)
private val configRemoveTags = DataDog(enabled = true, "test", 1, "localhost", 8125, List.empty, removeTagsFromMetricName = true)

"Metrics filter" should "match any metric when no filter is given" in {
val filter = buildMetricFilter(List.empty)
Expand Down Expand Up @@ -50,14 +51,26 @@ class RemoraDatadogReporterSpec extends FlatSpec with Matchers with PrivateMetho
formatter.format(s"${config.name}.gauge.test_1_faulty_test-consumer__lag") should be(s"${config.name}.gauge.test_1_faulty_test-consumer__lag")
}

private def buildMetricFilter(kafkaConsumerList: List[String]): MetricFilter = {
val config = DataDog(enabled = true, "test", 1, "localhost", 8125, kafkaConsumerList)
"Metric name formatter without tags" should "add tag information if metric is well formatted" in {
val formatter = getMetricNameFormatter(configRemoveTags)

formatter.format(s"${configRemoveTags.name}.gauge.test.1.test-consumer.lag") should be(s"${configRemoveTags.name}.gauge.lag[topic:test,group:test-consumer,partition:1]")
}

it should "not add partition tag information if no partition" in {
val formatter = getMetricNameFormatter(configRemoveTags)

formatter.format(s"${configRemoveTags.name}.gauge.test-topic.test-consumer.totalLag") should be(s"${configRemoveTags.name}.gauge.totalLag[topic:test-topic,group:test-consumer]")
}

private def buildMetricFilter(kafkaConsumerList: List[String], removeTags: Boolean = false): MetricFilter = {
val config = DataDog(enabled = true, "test", 1, "localhost", 8125, kafkaConsumerList, removeTags)
val reporter = new RemoraDatadogReporter(metricRegistry, config)
reporter invokePrivate PrivateMethod[MetricFilter]('kafkaConsumerGroupFilter)()
}

private def getMetricNameFormatter(config: DataDog): MetricNameFormatter = {
val reporter = new RemoraDatadogReporter(metricRegistry, config)
reporter invokePrivate PrivateMethod[MetricNameFormatter]('metricNameFormatter)()
reporter invokePrivate PrivateMethod[MetricNameFormatter]('metricNameFormatter)(config.removeTagsFromMetricName)
}
}

0 comments on commit f66e579

Please sign in to comment.