Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 returned snapshot updater metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 18, 2024
1 parent 23c13aa commit 8979a5c
Showing 1 changed file with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.envoyproxy.controlplane.cache.SnapshotCache
import io.envoyproxy.controlplane.cache.v3.Snapshot
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.ADS
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
Expand All @@ -17,6 +18,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
Expand Down Expand Up @@ -60,6 +62,11 @@ class SnapshotUpdater(
)
.measureBuffer("snapshot-updater", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "merged")
.tag(UPDATE_TRIGGER_TAG, "global")
.metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -117,11 +124,18 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "sampled")
.onBackpressureLatestMeasured("snapshot-updater", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "published")
.metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand Down Expand Up @@ -184,10 +198,13 @@ class SnapshotUpdater(
}
}

private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot.update.duration.seconds")

private fun updateSnapshotForGroups(
groups: Collection<Group>,
result: UpdateResult
): Mono<UpdateResult> {
val sample = Timer.start()
versions.retainGroups(cache.groups())
val results = Flux.fromIterable(groups)
.doOnNextScheduledOn(groupSnapshotScheduler) { group ->
Expand All @@ -205,6 +222,7 @@ class SnapshotUpdater(
}
}
return results.then(Mono.fromCallable {
sample.stop(updateSnapshotForGroupsTimer)
result
})
}
Expand Down

0 comments on commit 8979a5c

Please sign in to comment.