Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 rolled back reactor metrics
Browse files (browse the repository at this point in the history)
  • Loading branch information
nastassia-dailidava committed Oct 19, 2024
1 parent f43cedc commit 736d27c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 54 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -14,11 +14,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
Expand Down Expand Up @@ -60,13 +57,9 @@ class SnapshotUpdater(
// step 2: only watches groups; if groups change, we use the last services state and update those groups
groups().subscribeOn(globalSnapshotScheduler)
)
.measureBuffer("snapshot-updater", meterRegistry, innerSources = 2)
.measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "merged")
.tag(UPDATE_TRIGGER_TAG, "global")
.metrics()
.name("snapshot-updater-merged").metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -101,16 +94,12 @@ class SnapshotUpdater(
// see GroupChangeWatcher
return onGroupAdded
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot-updater", meterRegistry)
.measureBuffer("snapshot-updater-groups-published", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name("snapshot-updater-groups-published").metrics()
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "published")
.tag(UPDATE_TRIGGER_TAG, "groups")
.metrics()
.onErrorResume { e ->
meterRegistry.counter(
ERRORS_TOTAL_METRIC,
Expand All @@ -124,18 +113,13 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "sampled")
.onBackpressureLatestMeasured("snapshot-updater", meterRegistry)
.name("snapshot-updater-services-sampled").metrics()
.onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater", meterRegistry)
.measureBuffer("snapshot-updater-services-published", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "published")
.metrics()
.name("snapshot-updater-services-published").metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand All @@ -162,10 +146,7 @@ class SnapshotUpdater(
}
.filter { it != emptyUpdateResult }
.onErrorResume { e ->
meterRegistry.counter(
ERRORS_TOTAL_METRIC,
Tags.of(METRIC_EMITTER_TAG, "snapshot-updater", UPDATE_TRIGGER_TAG, "services")
).increment()
meterRegistry.counter("snapshot-updater.services.updates.errors").increment()
logger.error("Unable to process service changes", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -4,9 +4,6 @@ import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
Expand Down Expand Up @@ -45,11 +42,9 @@ class GlobalStateChanges(
.toMultiClusterState()
}
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combinator", meterRegistry)
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes-combinator")
.metrics()
.name("global-service-changes-emitted").metrics()
}

private fun combinedExperimentalFlow(
Expand All @@ -76,13 +71,10 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes")
.tag(CHECKPOINT_TAG, "emitted")
.name("global-service-changes-emitted").metrics()
.onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry)
.publishOn(scheduler, 1)
.checkpoint("global-service-changes-published")
.tag(CHECKPOINT_TAG, "published")
.metrics()
.name("global-service-changes-published").metrics()
}
}
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,8 +3,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import reactor.core.publisher.Flux

class RemoteClusterStateChanges(
Expand All @@ -16,7 +14,5 @@ class RemoteClusterStateChanges(
.getChanges(properties.sync.pollingInterval)
.startWith(MultiClusterState.empty())
.distinctUntilChanged()
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "cross-dc-synchronisation")
.metrics()
.name("cross-dc-changes-distinct").metrics()
}
Original file line number | Diff line number | Diff line change
Expand Up @@ -15,9 +15,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.time.Duration
Expand Down Expand Up @@ -55,14 +52,11 @@ class ConsulServiceChanges(
},
FluxSink.OverflowStrategy.LATEST
)
.measureDiscardedItems("consul-service-changes", metrics.meterRegistry)
.measureDiscardedItems("consul-service-changes-emitted", metrics.meterRegistry)
.checkpoint("consul-service-changes-emitted")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "consul-service-changes")
.tag(CHECKPOINT_TAG, "emitted")
.name("consul-service-changes-emitted").metrics()
.checkpoint("consul-service-changes-emitted-distinct")
.tag(CHECKPOINT_TAG, "distinct")
.metrics()
.name("consul-service-changes-emitted-distinct").metrics()
.doOnCancel {
logger.warn("Cancelling watching consul service changes")
watcher.close()
Expand Down

0 comments on commit 736d27c

Please sign in to comment.