Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for pluggable metric reporting #81

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ lazy val docs = project
.settings(dontPublish)
.settings(
name := "Akka Persistence plugin for Amazon DynamoDB",
libraryDependencies ++= Dependencies.docs,
libraryDependencies ++= (Dependencies.TestDeps.cloudwatchMetricPublisher +: Dependencies.docs),
makeSite := makeSite.dependsOn(LocalRootProject / ScalaUnidoc / doc).value,
previewPath := (Paradox / siteSubdirName).value,
Preprocess / siteSubdirName := s"api/akka-persistence-dynamodb/${projectInfoVersion.value}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.metrics.MetricPublisher

object ClientProvider extends ExtensionId[ClientProvider] {
def createExtension(system: ActorSystem[_]): ClientProvider = new ClientProvider(system)
Expand All @@ -35,6 +36,7 @@ object ClientProvider extends ExtensionId[ClientProvider] {
class ClientProvider(system: ActorSystem[_]) extends Extension {
private val clients = new ConcurrentHashMap[String, DynamoDbAsyncClient]
private val clientSettings = new ConcurrentHashMap[String, ClientSettings]
private val metricsProvider = SDKClientMetricsResolver.resolve(system)

CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "close DynamoDB clients") { () =>
Expand All @@ -48,7 +50,7 @@ class ClientProvider(system: ActorSystem[_]) extends Extension {
configLocation,
configLocation => {
val settings = clientSettingsFor(configLocation)
createClient(settings)
createClient(settings, metricsProvider.map(_.metricsPublisherFor(configLocation)))
})
}

Expand All @@ -63,7 +65,7 @@ class ClientProvider(system: ActorSystem[_]) extends Extension {
}
}

private def createClient(settings: ClientSettings): DynamoDbAsyncClient = {
private def createClient(settings: ClientSettings, metricsPublisher: Option[MetricPublisher]): DynamoDbAsyncClient = {
val httpClientBuilder = NettyNioAsyncHttpClient.builder
.maxConcurrency(settings.http.maxConcurrency)
.maxPendingConnectionAcquires(settings.http.maxPendingConnectionAcquires)
Expand Down Expand Up @@ -98,6 +100,8 @@ class ClientProvider(system: ActorSystem[_]) extends Extension {
overrideConfigurationBuilder = overrideConfigurationBuilder.apiCallAttemptTimeout(timeout.toJava)
}

metricsPublisher.foreach { mp => overrideConfigurationBuilder.addMetricPublisher(mp) }

overrideConfigurationBuilder.build()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.util

import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi

import software.amazon.awssdk.metrics.MetricCollection
import software.amazon.awssdk.metrics.MetricPublisher

import scala.jdk.CollectionConverters.ListHasAsScala

import java.util.concurrent.ConcurrentHashMap

/**
* Service Provider Interface for injecting AWS SDK MetricPublisher into the underlying DynamoDB client (see
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/metrics-list.html).
*
* Implementations must include a single constructor with one argument: an `akka.actor.ClassicActorSystemProvider`. To
* setup your implementation, add a setting to your 'application.conf':
*
* {{{
* akka.persistence.dynamodb.client.metrics-providers += com.myexample.MyAWSMetricsProvider
* }}}
*/
@ApiMayChange
trait SDKClientMetricsProvider {

/**
* Given an overall config path for Akka Persistence DynamoDB (e.g. 'akka.persistence.dynamodb') returns an instance
* of an AWS SDK MetricPublisher which publishes SDK client metrics to the location of this implementation's choosing.
*/
def metricsPublisherFor(configLocation: String): MetricPublisher
}

/** INTERNAL API */
@InternalApi
private[dynamodb] object SDKClientMetricsResolver {
def resolve(system: ClassicActorSystemProvider): Option[SDKClientMetricsProvider] = {
val providersPath = "akka.persistence.dynamodb.client.metrics-providers"
val config = system.classicSystem.settings.config
if (!config.hasPath(providersPath)) {
None
} else {
val fqcns = config.getStringList(providersPath)

fqcns.size match {
case 0 => None
case 1 => Some(createProvider(system, fqcns.get(0)))
case _ =>
val providers = fqcns.asScala.toSeq.map(fqcn => createProvider(system, fqcn))
Some(new EnsembleSDKClientMetricsProvider(providers))
}
}
}

def createProvider(system: ClassicActorSystemProvider, fqcn: String): SDKClientMetricsProvider = {
system.classicSystem
.asInstanceOf[ExtendedActorSystem]
.dynamicAccess
.createInstanceFor[SDKClientMetricsProvider](fqcn, List(classOf[ClassicActorSystemProvider] -> system))
.get
}

// This technically does not follow the construction convention that would allow it
// to be reflectively constructed, but we don't reflectively construct it
private class EnsembleSDKClientMetricsProvider(providers: Seq[SDKClientMetricsProvider])
extends SDKClientMetricsProvider {
def metricsPublisherFor(configLocation: String): MetricPublisher =
instances.computeIfAbsent(
configLocation,
path =>
new MetricPublisher {
private val publishers = providers.map(_.metricsPublisherFor(configLocation))

def publish(metricCollection: MetricCollection): Unit = {
publishers.foreach(_.publish(metricCollection))
}

def close(): Unit = {
publishers.foreach(_.close())
}
})

private val instances = new ConcurrentHashMap[String, MetricPublisher]()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package akka.persistence.dynamodb.util

import scala.concurrent.duration._
import scala.jdk.CollectionConverters.ListHasAsScala
import scala.jdk.OptionConverters._

import akka.actor.ClassicActorSystemProvider
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.ActorTestKitBase
import akka.util.JavaDurationConverters._
Expand All @@ -17,6 +19,8 @@ import org.scalatest.wordspec.AnyWordSpec
import software.amazon.awssdk.core.retry.RetryMode
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.metrics.MetricCollection
import software.amazon.awssdk.metrics.MetricPublisher

class ClientProviderSpec extends AnyWordSpec with Matchers with OptionValues {

Expand Down Expand Up @@ -62,6 +66,20 @@ class ClientProviderSpec extends AnyWordSpec with Matchers with OptionValues {
compressionConfiguration.minimumCompressionThresholdInBytes shouldBe 10240
}

"create client with a MetricPublisher" in withActorTestKit("""
akka.persistence.dynamodb.client {
region = "us-east-1"
metrics-providers += akka.persistence.dynamodb.util.TestNoopMetricsProvider
}
""") { testKit =>
val clientConfigLocation = "akka.persistence.dynamodb.client"
val client = ClientProvider(testKit.system).clientFor(clientConfigLocation)

val clientConfiguration = client.serviceClientConfiguration
val overrideConfiguration = clientConfiguration.overrideConfiguration
overrideConfiguration.metricPublishers.asScala shouldNot be(empty)
}

"create client with configured settings" in withActorTestKit("""
akka.persistence.dynamodb.client {
call-timeout = 3 seconds
Expand Down Expand Up @@ -158,3 +176,11 @@ class ClientProviderSpec extends AnyWordSpec with Matchers with OptionValues {
}

}

class TestNoopMetricsProvider(system: ClassicActorSystemProvider) extends SDKClientMetricsProvider {
def metricsPublisherFor(configLocation: String): MetricPublisher =
new MetricPublisher {
def publish(collection: MetricCollection): Unit = ()
def close(): Unit = ()
}
}
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This Akka Persistence plugin allows for using Amazon DynamoDB as a backend for A
* [Query Plugin](query.md)
* [Projection](projection.md)
* [Configuration](config.md)
* [Observability](observability.md)
* [Database cleanup](cleanup.md)
* [Contributing](contributing.md)

Expand Down
23 changes: 23 additions & 0 deletions docs/src/main/paradox/observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Observability

This plugin supports injecting an [AWS `MetricPublisher`](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/design/core/metrics/Design.md) into the underlying DynamoDB SDK client. This injection is accomplished by defining a class @scala[extending]@java[implementing] @apidoc[akka.persistence.dynamodb.util.SDKClientMetricsProvider].

Your implementation must expose a single constructor with one argument: an `akka.actor.ClassicActorSystemProvider`. Its `metricsPublisherFor` method will take the config path to the `client` section of this instance of the plugin @ref:[configuration](config.md#multiple-plugins).

The AWS SDK provides an implementation of `MetricPublisher` which publishes to [Amazon CloudWatch](https://docs.aws.amazon.com/cloudwatch/). An `SDKClientMetricsProvider` providing [this `MetricPublisher`](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/metrics.html) with defaults would look like:

Scala
: @@snip [cloudwatch default](/docs/src/test/scala/docs/CloudWatchProvider.scala) { #cloudwatch-default }

Java
: @@snip [cloudwatch default](/docs/src/test/java/jdocs/CloudWatchWithDefaultConfigurationMetricsProvider.java) { #cloudwatch-default }

To register your provider implementation with the plugin, add its fully-qualified class name to the configuration path `akka.persistence.dynamodb.client.metrics-providers` (e.g. in `application.conf`):

```
akka.persistence.dynamodb.client.metrics-providers += domain.package.CloudWatchWithDefaultConfigurationMetricsProvider
```

If multiple providers are specified, they will automatically be combined into a "meta-provider" which provides a publisher which will publish using _all_ of the specified providers' respective publishers.

If implementing your own `MetricPublisher`, [Amazon recommends that care be taken to not block the thread calling the methods of the `MetricPublisher`](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/design/core/metrics/Design.md#performance): all I/O and "heavy" computation should be performed asynchronously (e.g., since you have an `ActorSystem`, by `tell`ing the metrics to an actor) and control immediately returned to the caller.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package jdocs;

// #cloudwatch-default
import akka.actor.ClassicActorSystemProvider;
import akka.persistence.dynamodb.util.SDKClientMetricsProvider;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher;

public class CloudWatchWithDefaultConfigurationMetricsProvider implements SDKClientMetricsProvider {
public CloudWatchWithDefaultConfigurationMetricsProvider(ClassicActorSystemProvider system) {
}

@Override
public MetricPublisher metricsPublisherFor(String configLocation) {
// These are just the defaults... a more elaborate configuration using its builder is possible
return CloudWatchMetricPublisher.create();
}
}
// #cloudwatch-default
16 changes: 16 additions & 0 deletions docs/src/test/scala/docs/CloudWatchProvider.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package docs

// #cloudwatch-default
import akka.actor.ClassicActorSystemProvider
import akka.persistence.dynamodb.util.SDKClientMetricsProvider
import software.amazon.awssdk.metrics.MetricPublisher
import software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher

class CloudWatchWithDefaultConfigurationMetricsProvider(system: ClassicActorSystemProvider)
extends SDKClientMetricsProvider {
def metricsPublisherFor(configLocation: String): MetricPublisher = {
// These are just the defaults... a more elaborate configuration using its builder is possible
CloudWatchMetricPublisher.create()
}
}
// #cloudwatch-default
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ object Dependencies {
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.12" % Test // ApacheV2
val junit = "junit" % "junit" % "4.12" % Test // Eclipse Public License 1.0
val junitInterface = "com.novocode" % "junit-interface" % "0.11" % Test // "BSD 2-Clause"

val cloudwatchMetricPublisher = "software.amazon.awssdk" % "cloudwatch-metric-publisher" % AwsSdkVersion % Test
}

import Compile._
Expand Down