Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 returned reactor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 18, 2024
1 parent d843f31 commit 23c13aa
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.servicemesh.envoycontrol.utils

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import org.reactivestreams.Subscription
import org.slf4j.LoggerFactory
import reactor.core.Disposable
Expand All @@ -11,6 +12,7 @@ import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.streams.asSequence

private val logger = LoggerFactory.getLogger("pl.allegro.tech.servicemesh.envoycontrol.utils.ReactorUtils")
private val defaultScheduler by lazy { Schedulers.newSingle("reactor-utils-scheduler") }
Expand Down Expand Up @@ -110,7 +112,12 @@ private fun measureQueueSubscriptionBuffer(
name: String,
meterRegistry: MeterRegistry
) {
logger.info("subscription $subscription name: $name meterRegistry: $meterRegistry")
meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
subscription,
queueSubscriptionBufferExtractor
)
}

private fun measureScannableBuffer(
Expand All @@ -119,7 +126,49 @@ private fun measureScannableBuffer(
innerSources: Int,
meterRegistry: MeterRegistry
) {
logger.info("scannable $scannable name: $name innerSources: $innerSources meterRegistry: $meterRegistry")
val buffered = scannable.scan(Scannable.Attr.BUFFERED)
if (buffered == null) {
logger.error(
"Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG: $name'. Buffer size not available. " +
"Use measureBuffer() only on supported reactor operators"
)
return
}

meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
scannable,
scannableBufferExtractor
)

/**
* Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual
* buffer (that is controlled by `prefetch` parameter) size. Instead it returns simply number of connected sources.
*
* To access actual buffer size, we need to extract it from inners(). We don't know how many sources will
* be available, so it must be stated explicitly as innerSources parameter.
*/
for (i in 0 until innerSources) {
meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "${(name)}_$i"),
scannable,
innerBufferExtractor(i)
)
}
}

private val scannableBufferExtractor = { s: Scannable -> s.scan(Scannable.Attr.BUFFERED)?.toDouble() ?: -1.0 }
private fun innerBufferExtractor(index: Int) = { s: Scannable ->
s.inners().asSequence()
.elementAtOrNull(index)
?.let(scannableBufferExtractor)
?: -1.0
}

private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> ->
s.size.toDouble()
}

sealed class ParallelizableScheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.fail
import org.testcontainers.shaded.org.awaitility.Awaitility
Expand All @@ -13,7 +12,6 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.function.BiFunction

@Disabled
class ReactorUtilsTest {

@Test
Expand Down

0 comments on commit 23c13aa

Please sign in to comment.