Metrics Logging

When we process items through a stream we probably want to know how many items are passing through. So let's look at using MetricsLoggers to capture that data. Fleam includes two types of MetricsLoggers: CloudWatch and TextLogging.

Before we get into configuring our loggers, let's look at how we use one. We'll start with a flow that does nothing but log how many items pass through it. As you can see, a logger is nothing more than a flow that takes in some type and passes that type through unchanged.

import org.apache.pekko.stream.scaladsl._
case class Item()

class Pipeline(logger: Flow[Item, Item, _]) {
  val flow = {
    Flow[Item]
      .via(logger)
  }
}

This shows us that we can count anywhere along the stream. Now let's look at how we create a logger using TextLogging. Creating one is easy: we just need to give it a function String => Unit. Here we'll use an slf4j Logger's info function.

import com.nike.fleam.cloudwatch._
import com.nike.fleam.logging._
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global

val textLogger = TextLogging(LoggerFactory.getLogger("Example").info)
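Any function String => Unit will do; for a quick local check you could just as well hand it println. This is only an illustrative sketch, and printlnLogger is a made-up name:

val printlnLogger = TextLogging(println(_: String))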

If we tried to use our textLogger now we'd run into some trouble.

val pipeline1 = new Pipeline(textLogger.logCount[Item])
// error: could not find implicit value for parameter f: com.nike.fleam.logging.Counter[repl.MdocSession.MdocApp.Item,com.nike.fleam.logging.LogMessage]
// val pipeline1 = new Pipeline(textLogger.logCount[Item])
//                              ^^^^^^^^^^^^^^^^^^^^^^^^^

We can see that the logger needs a Counter for Item. So, easy, let's create one.

First we'll start with the configuration.

import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import com.nike.fleam.configuration._
import com.typesafe.config.ConfigFactory

val config1 = ConfigFactory.parseString("""
  {
    textLogger {
      batchSize = 100000
      within = 1 seconds
    }
  }
""")

val textLoggingConfig = config1.as[GroupedWithinConfiguration]("textLogger")
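As a side note, for a quick experiment you can likely skip Typesafe config and build the configuration directly. This is only a sketch and assumes GroupedWithinConfiguration is a case class with batchSize and within fields matching the keys above:

import scala.concurrent.duration._

// Assumed shape: GroupedWithinConfiguration(batchSize: Int, within: FiniteDuration)
val directTextLoggingConfig = GroupedWithinConfiguration(batchSize = 100000, within = 1.second)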

Now we can create a Counter for Items. A counter defines how the item is counted. The most basic one is countWithin, which sums items up to some max batchSize or within some time frame. This saves us the overhead of a log message for each item. The counter doesn't impede the flow of the stream.

implicit val itemTextCounter: Counter[Item, LogMessage] = new Counter[Item, LogMessage] {
  val flow = Counters.countWithin[Item](textLoggingConfig).map(count => LogMessage(s"Processed $count items"))
}

Now when we try to create our pipeline it finds our counter.

val pipeline2 = new Pipeline(textLogger.logCount[Item])

Okay, so how much harder is it to create a CloudWatch logger? Let's start building one.

import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.regions.Region

val awsClient = CloudWatchAsyncClient.builder().region(Region.of("us-west-2")).build()
val cloudWatchLogger = CloudWatch(awsClient)

The main difference in creating this logger is that we need a CloudWatchAsyncClient instead of a function String => Unit.

Now we're going to need a Counter of Item for CloudWatch too, so let's define the config.

val config2 = ConfigFactory.parseString("""
  {
    cloudwatchLogger {
      batchSize = 100000
      within = 1 seconds
    }
  }
""")

Defining the Counter is a little different. Instead of creating a LogMessage we need to create a PutMetricDataRequest. CloudWatch.wrap helps us do this easily.

import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest

val cloudwatchLoggingConfig = config2.as[GroupedWithinConfiguration]("cloudwatchLogger")

implicit val itemCloudwatchCounter: Counter[Item, PutMetricDataRequest] = new Counter[Item, PutMetricDataRequest] {
  val flow = Counters.countWithin[Item](cloudwatchLoggingConfig).map(count =>
    CloudWatch.wrap(
      namespace = "Example",
      metricName = "Item",
      count = count))
}

So now we can use our CloudWatch logger in our pipeline.

val pipeline3 = new Pipeline(cloudWatchLogger.logCount[Item])

So great, we can swap them out. But what if we want to log text and send data off to CloudWatch at the same time? Well, since they're both Flow[Item, Item, _], we can compose them together.

val pipeline4 = new Pipeline(textLogger.logCount[Item].via(cloudWatchLogger.logCount[Item]))
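Finally, to see both loggers fire, you can run a small stream through the composed pipeline. This is just a minimal sketch: it assumes an implicit Pekko ActorSystem is in scope, and the system name and item count are illustrative.

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("metrics-logging-example")

// Push a handful of items through the composed pipeline; both counters see every item.
val done = Source(List.fill(10)(Item()))
  .via(pipeline4.flow)
  .runWith(Sink.ignore)

With within = 1 seconds in the configurations above, the counts are flushed roughly once a second (or every 100000 items, whichever comes first) while items are flowing.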