diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index fa0872b08..cccc757ed 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -60,8 +60,9 @@ class SnapshotUpdater( // step 2: only watches groups. if groups change we use the last services state and update those groups groups().subscribeOn(globalSnapshotScheduler) ) - .measureBuffer("snapshot-updater", meterRegistry, innerSources = 2) + .measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2) .checkpoint("snapshot-updater-merged") + .name("snapshot-updater-merged").metrics() // step 3: group updates don't provide a snapshot, // so we piggyback the last updated snapshot state for use .scan { previous: UpdateResult, newUpdate: UpdateResult -> @@ -96,8 +97,9 @@ class SnapshotUpdater( // see GroupChangeWatcher return onGroupAdded .publishOn(globalSnapshotScheduler) - .measureBuffer("snapshot-updater", meterRegistry) + .measureBuffer("snapshot-updater-groups-published", meterRegistry) .checkpoint("snapshot-updater-groups-published") + .name("snapshot-updater-groups-published").metrics() .map { groups -> UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups) } @@ -119,11 +121,13 @@ class SnapshotUpdater( internal fun services(states: Flux): Flux { return states - .onBackpressureLatestMeasured("snapshot-updater", meterRegistry) + .name("snapshot-updater-services-sampled").metrics() + .onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry) // prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure .publishOn(globalSnapshotScheduler, 1) - .measureBuffer("snapshot-updater", meterRegistry) + .measureBuffer("snapshot-updater-services-published", meterRegistry) .checkpoint("snapshot-updater-services-published") + .name("snapshot-updater-services-published").metrics() .createClusterConfigurations() .map { (states, clusters) -> var lastXdsSnapshot: GlobalSnapshot? = null