Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 Migrated metrics to prometheus (rev…
Browse files Browse the repository at this point in the history
…iew fixes)
  • Loading branch information
nastassia-dailidava committed Oct 1, 2024
1 parent 638ec83 commit 3c93684
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 38 deletions.
6 changes: 3 additions & 3 deletions docs/deployment/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ Envoy Control Runner exposes a set of metrics on standard Spring Actuator's `/ac

Metric | Description | Labels
----------------------|----------------------------------------------------|------------------------------------
**grpc.connections** | Number of running gRPC connections of a given type | type (cds/xds/lds/rds/sds/unknown)
**connections** | Number of running gRPC connections of a given type | stream-type (cds/xds/lds/rds/sds/unknown), connection-type (grpc)

#### xDS requests

Metric | Description | Labels
-------------------------|---------------------------------------------------|--------------------------------------------------------------
**grpc.requests.count** | Counter of received gRPC requests of a given type | type (cds/xds/lds/rds/sds/unknown), metric-type(total/delta)
**requests.total** | Counter of received gRPC requests of a given type | stream-type (cds/xds/lds/rds/sds/unknown), connection-type (grpc), discovery-request-type(total/delta)

#### Snapshot

Expand All @@ -62,4 +62,4 @@ Envoy Control Runner exposes a set of metrics on standard Spring Actuator's `/ac

Metric | Description | Labels
-------------------------------------------|----------------------------------------------------------------|----------------------------------------------
**cross-dc-synchronization.errors.total** | Counter of synchronization errors for a given DC and operation | cluster, operation (get-instances/get-state)
**cross.dc.synchronization.errors.total** | Counter of synchronization errors for a given DC and operation | cluster, operation (get-instances/get-state)
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
.map { type -> type to AtomicInteger(0) }
.toMap()

meterRegistry.gauge("grpc.connections", Tags.of("type", "all"), connections)
connectionsByType.forEach { (type, typeConnections) ->
meterRegistry.gauge("grpc.connections", Tags.of("type", type.name.lowercase()), typeConnections)
meterRegistry.gauge("connections", Tags.of("connection-type", "grpc", "stream-type", type.name.lowercase()), typeConnections)
}
}

Expand All @@ -54,8 +53,8 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)

override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) {
meterRegistry.counter(
"grpc.requests.count",
Tags.of("type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "metric-type", "total")
"requests.total",
Tags.of("connection-type", "grpc", "stream-type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "discovery-request-type", "total")
)
.increment()
}
Expand All @@ -65,8 +64,8 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
request: V3DeltaDiscoveryRequest
) {
meterRegistry.counter(
"grpc.requests.count",
Tags.of("type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "metric-type", "delta")
"requests.total",
Tags.of("connection-type", "grpc", "stream-type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "discovery-request-type", "delta")
)
.increment()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class EnvoySnapshotFactory(
)
sample.stop(
meterRegistry.timer(
"snapshot-factory.seconds",
"snapshot.factory.seconds",
Tags.of("operation", "new-snapshot", "type", "global")
)
)
Expand Down Expand Up @@ -163,7 +163,7 @@ class EnvoySnapshotFactory(
val newSnapshotForGroup = newSnapshotForGroup(group, globalSnapshot)
groupSample.stop(
meterRegistry.timer(
"snapshot-factory.seconds",
"snapshot.factory.seconds",
Tags.of("operation", "new-snapshot", "type", "group")
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class SnapshotUpdater(
// see GroupChangeWatcher
return onGroupAdded
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot-updater.count.total", meterRegistry)
.measureBuffer("snapshot.updater.count.total", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name("snapshot-updater.count.total")
.tag("type", "groups")
Expand All @@ -101,7 +101,7 @@ class SnapshotUpdater(
}
.onErrorResume { e ->
meterRegistry.counter(
"snapshot-updater.errors.total",
"snapshot.updater.errors.total",
Tags.of("type", "groups")
)
.increment()
Expand All @@ -112,16 +112,16 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name("snapshot-updater.count.total")
.name("snapshot.updater.count.total")
.tag("type", "services")
.tag("status", "sampled")
.metrics()
.onBackpressureLatestMeasured("snapshot-updater.count.total", meterRegistry)
.onBackpressureLatestMeasured("snapshot.updater.count.total", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater.count.total", meterRegistry) // todo
.measureBuffer("snapshot.updater.count.total", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name("snapshot-updater.count.total")
.name("snapshot.updater.count.total")
.tag("type", "services")
.tag("status", "published")
.metrics()
Expand Down Expand Up @@ -152,7 +152,7 @@ class SnapshotUpdater(
.filter { it != emptyUpdateResult }
.onErrorResume { e ->
meterRegistry.counter(
"snapshot-updater.errors.total",
"snapshot.updater.errors.total",
Tags.of("type", "services")
).increment()
logger.error("Unable to process service changes", e)
Expand All @@ -176,14 +176,14 @@ class SnapshotUpdater(
}
} catch (e: Throwable) {
meterRegistry.counter(
"snapshot-updater.errors.total", Tags.of("service", group.serviceName)
"snapshot.updater.errors.total", Tags.of("service", group.serviceName)
).increment()
logger.error("Unable to create snapshot for group ${group.serviceName}", e)
}
}

private val updateSnapshotForGroupsTimer =
meterRegistry.timer("snapshot-updater.duration.seconds", Tags.of("type", "groups"))
meterRegistry.timer("snapshot.updater.duration.seconds", Tags.of("type", "groups"))

private fun updateSnapshotForGroups(
groups: Collection<Group>,
Expand All @@ -198,7 +198,7 @@ class SnapshotUpdater(
} else if (result.xdsSnapshot != null && group.communicationMode == XDS) {
updateSnapshotForGroup(group, result.xdsSnapshot)
} else {
meterRegistry.counter("snapshot-updater.errors.total", Tags.of("type", "communication-mode"))
meterRegistry.counter("snapshot.updater.errors.total", Tags.of("type", "communication-mode"))
.increment()
logger.error(
"Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ class RemoteClusterStateChanges(
.getChanges(properties.sync.pollingInterval)
.startWith(MultiClusterState.empty())
.distinctUntilChanged()
.name("cross-dc-changes-distinct").metrics()
.name("cross.dc.synchronization.distinct").metrics()
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ class RemoteServices(
fun getChanges(interval: Long): Flux<MultiClusterState> {
val aclFlux: Flux<MultiClusterState> = Flux.create({ sink ->
scheduler.scheduleWithFixedDelay({
meterRegistry.timer("cross-dc-synchronization.seconds", Tags.of("operation", "get-multi-cluster-state"))
meterRegistry.timer("cross.dc.synchronization.seconds", Tags.of("operation", "get-multi-cluster-state"))
.recordCallable {
getChanges(sink::next, interval)
}
}, 0, interval, TimeUnit.SECONDS)
}, FluxSink.OverflowStrategy.LATEST)
return aclFlux.doOnCancel {
meterRegistry.counter("cross-dc-synchronization.cancelled").increment()
meterRegistry.counter("cross.dc.synchronization.cancelled").increment()
logger.warn("Cancelling cross dc sync")
}
}
Expand All @@ -62,7 +62,7 @@ class RemoteServices(
.orTimeout(interval, TimeUnit.SECONDS)
.exceptionally {
meterRegistry.counter(
"cross-dc-synchronization.errors.total",
"cross.dc.synchronization.errors.total",
Tags.of("cluster", cluster, "operation", "get-state")
).increment()
logger.warn("Error synchronizing instances ${it.message}", it)
Expand All @@ -76,7 +76,7 @@ class RemoteServices(
cluster to instances
} catch (e: Exception) {
meterRegistry.counter(
"cross-dc-synchronization.errors.total",
"cross.dc.synchronization.errors.total",
Tags.of("cluster", cluster, "operation", "get-instances")
).increment()
logger.warn("Failed fetching instances from $cluster", e)
Expand All @@ -89,7 +89,7 @@ class RemoteServices(
state: ServicesState
): ClusterState {
meterRegistry.counter(
"cross-dc-synchronization.total", Tags.of("cluster", cluster)
"cross.dc.synchronization.total", Tags.of("cluster", cluster)
)
.increment()
val clusterState = ClusterState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ class SnapshotUpdaterTest {
val snapshot = cache.getSnapshot(servicesGroup)
assertThat(snapshot).isEqualTo(null)
assertThat(
simpleMeterRegistry.find("snapshot-updater.errors.total")
simpleMeterRegistry.find("snapshot.updater.errors.total")
.tags(Tags.of("service", "example-service"))
.counter()?.count()
).isEqualTo(1.0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ class RestTemplateControlPlaneClient(
}

private fun <T> timed(function: () -> T): T {
return meterRegistry.timer("cross-dc-synchronization.seconds", Tags.of("operation", "get-state"))
return meterRegistry.timer("cross.dc.synchronization.seconds", Tags.of("operation", "get-state"))
.record(function)
}

private fun success() {
meterRegistry.counter("cross-dc-synchronization", Tags.of("operation", "get-state", "status", "success"))
meterRegistry.counter("cross.dc.synchronization", Tags.of("operation", "get-state", "status", "success"))
.increment()
}

private fun failure() {
meterRegistry.counter("cross-dc-synchronization", Tags.of("operation", "get-state", "status", "failure"))
meterRegistry.counter("cross.dc.synchronization", Tags.of("operation", "get-state", "status", "failure"))
.increment()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.micrometer.core.instrument.Tags
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.platform.commons.util.Preconditions.condition
import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted
import pl.allegro.tech.servicemesh.envoycontrol.config.Ads
import pl.allegro.tech.servicemesh.envoycontrol.config.DeltaAds
Expand Down Expand Up @@ -233,14 +232,15 @@ interface MetricsDiscoveryServerCallbacksTest {
// expect
untilAsserted {
expectedGrpcConnectionsGaugeValues().forEach { (type, value) ->
val metric = "grpc.connections"
val metric = "connections"
assertThat(
meterRegistry.find(metric)
.tags(Tags.of("type", type.name.lowercase())).gauge()
.tags(Tags.of("stream-type", type.name.lowercase(), "connection-type", "grpc")).gauge()
).isNotNull
assertThat(
meterRegistry.get(metric)
.tags(Tags.of("type", type.name.lowercase())).gauge().value().toInt()
.tags(Tags.of("stream-type", type.name.lowercase(), "connection-type", "grpc")).gauge().value()
.toInt()
).isEqualTo(value)
}
}
Expand All @@ -259,10 +259,10 @@ interface MetricsDiscoveryServerCallbacksTest {
}
}

private fun assertCondition(type: String, condition: Predicate<Int?>, metricType: String) {
private fun assertCondition(type: String, condition: Predicate<Int?>, reqTpe: String) {
val counterValue =
envoyControl().app.meterRegistry().find("grpc.requests.count")
.tags(Tags.of("type", type, "metric-type", metricType))
envoyControl().app.meterRegistry().find("requests.total")
.tags(Tags.of("stream-type", type, "discovery-request-type", reqTpe, "connection-type", "grpc"))
.counter()?.count()?.toInt()
logger.info("$type $counterValue")
assertThat(counterValue).satisfies(Consumer { condition.test(it) })
Expand Down

0 comments on commit 3c93684

Please sign in to comment.