Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 Migrated metrics to prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 7, 2024
1 parent c435189 commit 4da0741
Show file tree
Hide file tree
Showing 19 changed files with 273 additions and 108 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ allprojects {
bytebuddy : '1.15.1',
re2j : '1.3',
xxhash : '0.10.1',
dropwizard : '4.2.26'
dropwizard : '4.2.26',
reactor_core_micrometer: '1.0.6'
]

dependencyManagement {
Expand Down
2 changes: 2 additions & 0 deletions envoy-control-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencies {
implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect'
api group: 'io.dropwizard.metrics', name: 'metrics-core', version: versions.dropwizard
api group: 'io.micrometer', name: 'micrometer-core'
api group: 'io.projectreactor', name: 'reactor-core-micrometer', version: versions.reactor_core_micrometer

implementation group: 'com.google.re2j', name: 're2j', version: versions.re2j

api group: 'io.envoyproxy.controlplane', name: 'server', version: versions.java_controlplane
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ class ControlPlane private constructor(
ExecutorServiceMetrics(
executor,
executorServiceName,
"envoy-control",
Tags.of("executor", executorServiceName)
)
.bindTo(meterRegistry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.util.function.Consumer
Expand All @@ -34,9 +37,14 @@ internal class GroupChangeWatcher(

fun onGroupAdded(): Flux<List<Group>> {
return groupsChanged
.measureBuffer("group-change-watcher-emitted", meterRegistry)
.measureBuffer("group-change-watcher", meterRegistry)
.checkpoint("group-change-watcher-emitted")
.name("group-change-watcher-emitted").metrics()
.name(REACTOR_METRIC)
.tag(WATCH_TYPE_TAG, "group")
.tap(Micrometer.metrics(meterRegistry))
.doOnSubscribe {
logger.info("Watching group changes")
}
.doOnCancel {
logger.warn("Cancelling watching group changes")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import java.util.function.Supplier

import io.envoyproxy.controlplane.server.serializer.DefaultProtoResourcesSerializer
import io.micrometer.core.instrument.Timer
import pl.allegro.tech.servicemesh.envoycontrol.utils.PROTOBUF_CACHE_METRIC

internal class CachedProtoResourcesSerializer(
private val meterRegistry: MeterRegistry,
Expand All @@ -27,7 +28,7 @@ internal class CachedProtoResourcesSerializer(
}

private val cache: Cache<Message, Any> = createCache("protobuf-cache")
private val timer = createTimer(reportMetrics, meterRegistry, "protobuf-cache.serialize.time")
private val timer = createTimer(reportMetrics, meterRegistry, PROTOBUF_CACHE_METRIC)

private fun <K, V> createCache(cacheName: String): Cache<K, V> {
return if (reportMetrics) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks

import com.google.common.net.InetAddresses.increment
import io.envoyproxy.controlplane.cache.Resources
import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG
import java.util.concurrent.atomic.AtomicInteger

class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks {
Expand Down Expand Up @@ -38,8 +42,8 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)

connectionsByType.forEach { (type, typeConnections) ->
meterRegistry.gauge(
"connections",
Tags.of("connection-type", "grpc", "stream-type", type.name.lowercase()),
CONNECTIONS_METRIC,
Tags.of(CONNECTION_TYPE_TAG, "grpc", STREAM_TYPE_TAG, type.name.lowercase()),
typeConnections
)
}
Expand All @@ -57,11 +61,11 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)

override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) {
meterRegistry.counter(
"requests.total",
REQUESTS_METRIC,
Tags.of(
"connection-type", "grpc",
"stream-type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
"discovery-request-type", "total"
CONNECTION_TYPE_TAG, "grpc",
STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
DISCOVERY_REQ_TYPE_TAG, "total"
)
)
.increment()
Expand All @@ -72,11 +76,11 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
request: V3DeltaDiscoveryRequest
) {
meterRegistry.counter(
"requests.total",
REQUESTS_METRIC,
Tags.of(
"connection-type",
"grpc", "stream-type",
StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "discovery-request-type", "delta"
CONNECTION_TYPE_TAG, "grpc",
STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
DISCOVERY_REQ_TYPE_TAG, "delta"
)
)
.increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.noopTimer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
Expand Down Expand Up @@ -51,12 +61,13 @@ class SnapshotUpdater(
// step 2: only watches groups. if groups change we use the last services state and update those groups
groups().subscribeOn(globalSnapshotScheduler)
)
.measureBuffer("snapshot.updater.count.total", meterRegistry, innerSources = 2)
.measureBuffer("snapshot-updater", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name("snapshot.updater.count.total")
.tag("status", "merged")
.tag("type", "global")
.metrics()
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "merged")
.tag(UPDATE_TRIGGER_TAG, "global")
.tap(Micrometer.metrics(meterRegistry))
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -91,18 +102,20 @@ class SnapshotUpdater(
// see GroupChangeWatcher
return onGroupAdded
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot.updater.count.total", meterRegistry)
.measureBuffer("snapshot-updater", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name("snapshot.updater.count.total")
.tag("type", "groups")
.tag("status", "published").metrics()
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "published")
.tag(UPDATE_TRIGGER_TAG, "groups")
.tap(Micrometer.metrics(meterRegistry))
.onErrorResume { e ->
meterRegistry.counter(
"snapshot.updater.errors.total",
Tags.of("type", "groups")
ERRORS_TOTAL_METRIC,
Tags.of(UPDATE_TRIGGER_TAG, "groups", METRIC_EMITTER_TAG, "snapshot-updater")
)
.increment()
logger.error("Unable to process new group", e)
Expand All @@ -112,19 +125,19 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name("snapshot.updater.count.total")
.tag("type", "services")
.tag("status", "sampled")
.metrics()
.onBackpressureLatestMeasured("snapshot.updater.count.total", meterRegistry)
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "sampled")
.tap(Micrometer.metrics(meterRegistry))
.onBackpressureLatestMeasured("snapshot-updater", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot.updater.count.total", meterRegistry)
.measureBuffer("snapshot-updater", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name("snapshot.updater.count.total")
.tag("type", "services")
.tag("status", "published")
.metrics()
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "published")
.tap(Micrometer.metrics(meterRegistry))
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand Down Expand Up @@ -152,8 +165,8 @@ class SnapshotUpdater(
.filter { it != emptyUpdateResult }
.onErrorResume { e ->
meterRegistry.counter(
"snapshot.updater.errors.total",
Tags.of("type", "services")
ERRORS_TOTAL_METRIC,
Tags.of(METRIC_EMITTER_TAG, "snapshot-updater", UPDATE_TRIGGER_TAG, "services")
).increment()
logger.error("Unable to process service changes", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
Expand All @@ -162,7 +175,7 @@ class SnapshotUpdater(

private fun snapshotTimer(serviceName: String) = if (properties.metrics.cacheSetSnapshot) {
meterRegistry.timer(
"simple-cache.duration.seconds", Tags.of("service", serviceName, "operation", "set-snapshot")
SIMPLE_CACHE_METRIC, Tags.of(SERVICE_TAG, serviceName, OPERATION_TAG, "set-snapshot")
)
} else {
noopTimer
Expand All @@ -176,14 +189,18 @@ class SnapshotUpdater(
}
} catch (e: Throwable) {
meterRegistry.counter(
"snapshot.updater.errors.total", Tags.of("service", group.serviceName)
ERRORS_TOTAL_METRIC,
Tags.of(
SERVICE_TAG, group.serviceName,
OPERATION_TAG, "create-snapshot",
METRIC_EMITTER_TAG, "snapshot-updater"
)
).increment()
logger.error("Unable to create snapshot for group ${group.serviceName}", e)
}
}

private val updateSnapshotForGroupsTimer =
meterRegistry.timer("snapshot.updater.duration.seconds", Tags.of("type", "groups"))
private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot.update.duration.seconds")

private fun updateSnapshotForGroups(
groups: Collection<Group>,
Expand All @@ -198,8 +215,7 @@ class SnapshotUpdater(
} else if (result.xdsSnapshot != null && group.communicationMode == XDS) {
updateSnapshotForGroup(group, result.xdsSnapshot)
} else {
meterRegistry.counter("snapshot.updater.errors.total", Tags.of("type", "communication-mode"))
.increment()
meterRegistry.counter(ERRORS_TOTAL_METRIC, Tags.of("type", "communication-mode")).increment()
logger.error(
"Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
"Handling Envoy with not supported communication mode should have been rejected before." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

Expand All @@ -15,9 +19,10 @@ class GlobalStateChanges(
private val meterRegistry: MeterRegistry,
private val properties: SyncProperties
) {
private val scheduler = Schedulers.newBoundedElastic(
Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator"
)
private val scheduler =
Schedulers.newBoundedElastic(
Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator"
)

fun combined(): Flux<MultiClusterState> {
val clusterStatesStreams: List<Flux<MultiClusterState>> = clusterStateChanges.map { it.stream() }
Expand All @@ -41,9 +46,11 @@ class GlobalStateChanges(
.toMultiClusterState()
}
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.measureBuffer("global-service-changes-combinator", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name("global-service-changes-emitted").metrics()
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes-combinator")
.tap(Micrometer.metrics(meterRegistry))
}

private fun combinedExperimentalFlow(
Expand All @@ -70,10 +77,13 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name("global-service-changes-emitted").metrics()
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes")
.tag(CHECKPOINT_TAG, "emitted")
.onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry)
.publishOn(scheduler, 1)
.checkpoint("global-service-changes-published")
.name("global-service-changes-published").metrics()
.tag(CHECKPOINT_TAG, "published")
.tap(Micrometer.metrics(meterRegistry))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import reactor.core.publisher.Flux

class RemoteClusterStateChanges(
Expand All @@ -14,5 +16,7 @@ class RemoteClusterStateChanges(
.getChanges(properties.sync.pollingInterval)
.startWith(MultiClusterState.empty())
.distinctUntilChanged()
.name("cross.dc.synchronization.distinct").metrics()
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "cross-dc-synchronisation")
.metrics()
}
Loading

0 comments on commit 4da0741

Please sign in to comment.