Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 Migrated metrics to prometheus (lint)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 7, 2024
1 parent cd0947b commit 179fe51
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ class ControlPlane private constructor(
ExecutorServiceMetrics(
executor,
executorServiceName,
"envoy-control",
Tags.of("executor", executorServiceName)
)
.bindTo(meterRegistry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.reactorMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.watchMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.watchTypeTag
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
Expand All @@ -37,7 +40,8 @@ internal class GroupChangeWatcher(
return groupsChanged
.measureBuffer("group-change-watcher", meterRegistry)
.checkpoint("group-change-watcher-emitted")
.name("group_change_watcher")
.name(reactorMetricName)
.tag(watchTypeTag, "group")
.tap(Micrometer.metrics(meterRegistry))
.doOnSubscribe {
logger.info("Watching group changes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Compa
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
import pl.allegro.tech.servicemesh.envoycontrol.utils.reactorMetricName
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
Expand All @@ -17,11 +19,10 @@ class GlobalStateChanges(
private val meterRegistry: MeterRegistry,
private val properties: SyncProperties
) {
private val scheduler = Micrometer.timedScheduler(
private val scheduler =
Schedulers.newBoundedElastic(
Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator"
), meterRegistry, "schedulers", Tags.of("name", "global-service-changes-combinator")
)
)

fun combined(): Flux<MultiClusterState> {
val clusterStatesStreams: List<Flux<MultiClusterState>> = clusterStateChanges.map { it.stream() }
Expand All @@ -45,10 +46,11 @@ class GlobalStateChanges(
.toMultiClusterState()
}
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.measureBuffer("global-service-changes-combinator", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(reactorMetricName)
.tag(metricEmitterTag, "global-service-changes-combinator")
.tap(Micrometer.metrics(meterRegistry))
.name("global-service-changes-emitted").metrics()
}

// todo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.utils.clusterTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.crossDcSyncCancelledMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.crossDcSyncSecondsMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.crossDcSyncTotalMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.operationTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag
Expand All @@ -35,7 +38,7 @@ class RemoteServices(
val aclFlux: Flux<MultiClusterState> = Flux.create({ sink ->
scheduler.scheduleWithFixedDelay({
meterRegistry.timer(
"cross_dc_synchronization.seconds",
crossDcSyncSecondsMetricName,
Tags.of(operationTag, "get-multi-cluster-state")
)
.recordCallable {
Expand All @@ -44,7 +47,7 @@ class RemoteServices(
}, 0, interval, TimeUnit.SECONDS)
}, FluxSink.OverflowStrategy.LATEST)
return aclFlux.doOnCancel {
meterRegistry.counter("cross_dc_synchronization.cancelled").increment()
meterRegistry.counter(crossDcSyncCancelledMetricName).increment()
logger.warn("Cancelling cross dc sync")
}
}
Expand Down Expand Up @@ -104,7 +107,7 @@ class RemoteServices(
state: ServicesState
): ClusterState {
meterRegistry.counter(
"cross_dc_synchronization.total", Tags.of(clusterTag, cluster)
crossDcSyncTotalMetricName, Tags.of(clusterTag, cluster)
)
.increment()
val clusterState = ClusterState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,17 @@ const val reactorMetricName = "reactor"
const val errorsMetricName = "errors.total"
const val connectionsMetricName = "connections"
const val requestsMetricName = "requests.total"
const val watchMetricName = "watch"
const val envoyControlWarmUpMetricName = "envoy.control.warmup.seconds"
const val crossDcSyncMetricName = "cross.dc.synchronization"
const val crossDcSyncCancelledMetricName = "$crossDcSyncMetricName.cancelled.total"
const val crossDcSyncSecondsMetricName = "$crossDcSyncMetricName.seconds"
const val crossDcSyncTotalMetricName = "$crossDcSyncMetricName.total"

const val connectionTypeTag = "connection-type"
const val streamTypeTag = "stream-type"
const val checkpointTag = "checkpoint"
const val watchTypeTag = "watch-type"
const val discoveryReqTypeTag = "discovery-request-type"
const val metricTypeTag = "metric-type"
const val metricEmitterTag = "metric-emitter"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.RegexServi
import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.ServiceInstancesTransformer
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters
import pl.allegro.tech.servicemesh.envoycontrol.synchronization.GlobalStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.statusTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.streamTypeTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.watchMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.watchTypeTag
import reactor.core.scheduler.Schedulers
import java.net.URI

Expand Down Expand Up @@ -173,14 +179,29 @@ class ControlPlaneConfig {
ConsulClient(properties.host, properties.port).agentSelf.value?.config?.datacenter ?: "local"

fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics {
val metricName = "watched-services"
return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also {
meterRegistry.gauge(metricName, Tags.of("status", "added"), it.servicesAdded)
meterRegistry.gauge(metricName, Tags.of("status", "removed"), it.servicesRemoved)
meterRegistry.gauge(metricName, Tags.of("status", "instance-changed"), it.instanceChanges)
meterRegistry.gauge(metricName, Tags.of("status", "snapshot-changed"), it.snapshotChanges)
meterRegistry.gauge(watchMetricName, Tags.of(statusTag, "added", watchTypeTag, "service"), it.servicesAdded)
meterRegistry.gauge(
watchMetricName,
Tags.of(statusTag, "removed", watchTypeTag, "service"),
it.servicesRemoved
)
meterRegistry.gauge(
watchMetricName,
Tags.of(statusTag, "instance-changed", watchTypeTag, "service"),
it.instanceChanges
)
meterRegistry.gauge(
watchMetricName,
Tags.of(statusTag, "snapshot-changed", watchTypeTag, "service"),
it.snapshotChanges
)
meterRegistry.gauge("cache.groups.count", it.cacheGroupsCount)
it.meterRegistry.more().counter("services.watch.errors.total", listOf(), it.errorWatchingServices)
it.meterRegistry.more().counter(
errorsMetricName,
Tags.of(metricEmitterTag, watchMetricName, watchTypeTag, "service"),
it.errorWatchingServices
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.server.ReadinessStateHandler
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.utils.envoyControlWarmUpMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems
import pl.allegro.tech.servicemesh.envoycontrol.utils.checkpointTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.reactorMetricName
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.time.Duration
Expand Down Expand Up @@ -51,11 +56,14 @@ class ConsulServiceChanges(
},
FluxSink.OverflowStrategy.LATEST
)
.measureDiscardedItems("consul-service-changes-emitted", metrics.meterRegistry)
.measureDiscardedItems("consul-service-changes", metrics.meterRegistry)
.checkpoint("consul-service-changes-emitted")
.name("consul-service-changes-emitted").metrics()
.name(reactorMetricName)
.tag(metricEmitterTag, "consul-service-changes")
.tag(checkpointTag, "emitted")
.checkpoint("consul-service-changes-emitted-distinct")
.name("consul-service-changes-emitted-distinct").metrics()
.tag(checkpointTag, "distinct")
.tap(Micrometer.metrics(metrics.meterRegistry))
.doOnCancel {
logger.warn("Cancelling watching consul service changes")
watcher.close()
Expand Down Expand Up @@ -226,7 +234,7 @@ class ConsulServiceChanges(
if (ready) {
val stopTimer = System.currentTimeMillis()
readinessStateHandler.ready()
metrics.meterRegistry.timer("envoy-control.warmup.seconds")
metrics.meterRegistry.timer(envoyControlWarmUpMetricName)
.record(
stopTimer - startTimer,
TimeUnit.SECONDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ interface MetricsDiscoveryServerCallbacksTest {
// given
val meterRegistry = envoyControl().app.meterRegistry()
consul().server.operations.registerService(service(), name = "echo")

for (meter in meterRegistry.meters) {
print(meter.toString())
}
// expect
untilAsserted {
expectedGrpcConnectionsGaugeValues().forEach { (type, value) ->
Expand Down

0 comments on commit 179fe51

Please sign in to comment.