From d7be7ae47545208cb289786bab18ace5f9bb47d5 Mon Sep 17 00:00:00 2001 From: Anton Date: Wed, 13 Nov 2024 01:20:07 +0800 Subject: [PATCH] Metrics plugin implementation --- composer.json | 1 + .../HttpServer/HttpServerProcess.php | 2 +- .../HttpServer/Internal/HttpServer.php | 22 +-- src/BundledPlugin/HttpServer/Listen.php | 6 + src/BundledPlugin/Metrics/Counter.php | 49 ++++++ .../Exception/LabelsNotMatchException.php | 21 +++ .../Exception/MetricNotFoundException.php | 13 ++ src/BundledPlugin/Metrics/Gauge.php | 92 ++++++++++++ src/BundledPlugin/Metrics/Histogram.php | 92 ++++++++++++ .../Internal/Message/GetMetricMessage.php | 45 ++++++ .../Internal/Message/GetMetricResponse.php | 17 +++ .../Message/IncreaseCounterMessage.php | 21 +++ .../Message/ObserveHistorgamMessage.php | 25 ++++ .../Message/ObserveSummaryMessage.php | 25 ++++ .../Message/RegisterMetricMessage.php | 48 ++++++ .../Internal/Message/SetGaugeMessage.php | 22 +++ .../Metrics/Internal/MessageBusRegistry.php | 141 ++++++++++++++++++ .../Internal/MessageBusRegistryHandler.php | 115 ++++++++++++++ src/BundledPlugin/Metrics/Internal/Metric.php | 36 +++++ .../Metrics/Internal/NotFoundPage.php | 48 ++++++ src/BundledPlugin/Metrics/MetricsPlugin.php | 97 ++++++++++++ .../Metrics/RegistryInterface.php | 52 +++++++ src/BundledPlugin/Metrics/Summary.php | 50 +++++++ src/Internal/MasterProcess.php | 2 +- src/Process.php | 4 +- 25 files changed, 1034 insertions(+), 12 deletions(-) create mode 100644 src/BundledPlugin/Metrics/Counter.php create mode 100644 src/BundledPlugin/Metrics/Exception/LabelsNotMatchException.php create mode 100644 src/BundledPlugin/Metrics/Exception/MetricNotFoundException.php create mode 100644 src/BundledPlugin/Metrics/Gauge.php create mode 100644 src/BundledPlugin/Metrics/Histogram.php create mode 100644 src/BundledPlugin/Metrics/Internal/Message/GetMetricMessage.php create mode 100644 src/BundledPlugin/Metrics/Internal/Message/GetMetricResponse.php create mode 100644 src/BundledPlugin/Metrics/Internal/Message/IncreaseCounterMessage.php create mode 100644 src/BundledPlugin/Metrics/Internal/Message/ObserveHistorgamMessage.php create mode 100644 src/BundledPlugin/Metrics/Internal/Message/ObserveSummaryMessage.php create mode 100644 src/BundledPlugin/Metrics/Internal/Message/RegisterMetricMessage.php create mode 100644 src/BundledPlugin/Metrics/Internal/Message/SetGaugeMessage.php create mode 100644 src/BundledPlugin/Metrics/Internal/MessageBusRegistry.php create mode 100644 src/BundledPlugin/Metrics/Internal/MessageBusRegistryHandler.php create mode 100644 src/BundledPlugin/Metrics/Internal/Metric.php create mode 100644 src/BundledPlugin/Metrics/Internal/NotFoundPage.php create mode 100644 src/BundledPlugin/Metrics/MetricsPlugin.php create mode 100644 src/BundledPlugin/Metrics/RegistryInterface.php create mode 100644 src/BundledPlugin/Metrics/Summary.php diff --git a/composer.json b/composer.json index f85255b..4f218e9 100644 --- a/composer.json +++ b/composer.json @@ -22,6 +22,7 @@ "amphp/http-server": "^3.3.1", "amphp/socket": "^2.3.1", "luzrain/polyfill-inotify": "^1.0", + "promphp/prometheus_client_php": "^2.12", "psr/container": "^2.0", "psr/http-message": "^2.0", "psr/log": "^3.0", diff --git a/src/BundledPlugin/HttpServer/HttpServerProcess.php b/src/BundledPlugin/HttpServer/HttpServerProcess.php index c8ec145..9a2eb9f 100644 --- a/src/BundledPlugin/HttpServer/HttpServerProcess.php +++ b/src/BundledPlugin/HttpServer/HttpServerProcess.php @@ -113,7 +113,7 @@ public function handleRequest(Request $request): Response /** * @return list */ - private static function normalizeListenList(self|string|array $listen): array + private static function normalizeListenList(Listen|string|array $listen): array { $listen = \is_array($listen) ? $listen : [$listen]; $ret = []; diff --git a/src/BundledPlugin/HttpServer/Internal/HttpServer.php b/src/BundledPlugin/HttpServer/Internal/HttpServer.php index 0f9f02c..ca98027 100644 --- a/src/BundledPlugin/HttpServer/Internal/HttpServer.php +++ b/src/BundledPlugin/HttpServer/Internal/HttpServer.php @@ -113,7 +113,7 @@ public function start(): void ); foreach ($this->listen as $listen) { - $socketHttpServer->expose(...$this->createInternetAddressAndContext($listen)); + $socketHttpServer->expose(...self::createInternetAddressAndContext($listen, true, self::DEFAULT_TCP_BACKLOG)); } $socketHttpServer->start($this->requestHandler, $errorHandler); @@ -122,20 +122,24 @@ public function start(): void /** * @return array{0: InternetAddress, 1: BindContext} */ - private function createInternetAddressAndContext(Listen $listen): array + public static function createInternetAddressAndContext(Listen $listen, bool $reusePort = false, int $backlog = 0): array { $internetAddress = new InternetAddress($listen->host, $listen->port); + $context = new BindContext(); - $context = (new BindContext()) - ->withReusePort() - ->withBacklog(self::DEFAULT_TCP_BACKLOG) - ; + if ($reusePort) { + $context = $context->withReusePort(); + } + + if ($backlog > 0) { + $context = $context->withBacklog($backlog); + } if ($listen->tls) { \assert($listen->tlsCertificate !== null); - $context = $context->withTlsContext( - (new ServerTlsContext())->withDefaultCertificate(new Certificate($listen->tlsCertificate, $listen->tlsCertificateKey)), - ); + $cert = new Certificate($listen->tlsCertificate, $listen->tlsCertificateKey); + $tlsContext = (new ServerTlsContext())->withDefaultCertificate($cert); + $context = $context->withTlsContext($tlsContext); } return [$internetAddress, $context]; diff --git a/src/BundledPlugin/HttpServer/Listen.php b/src/BundledPlugin/HttpServer/Listen.php index 4dfcd30..e30cca0 100644 --- a/src/BundledPlugin/HttpServer/Listen.php +++ b/src/BundledPlugin/HttpServer/Listen.php @@ -31,4 +31,10 @@ public function __construct( throw new \InvalidArgumentException('Certificate file must be provided'); } } + + public function getAddress(): string + { + return ($this->tls ? 'https://' : 'http://') . $this->host . + (($this->tls && $this->port === 443) || (!$this->tls && $this->port === 80) ? '' : ':' . $this->port); + } } diff --git a/src/BundledPlugin/Metrics/Counter.php b/src/BundledPlugin/Metrics/Counter.php new file mode 100644 index 0000000..1155d70 --- /dev/null +++ b/src/BundledPlugin/Metrics/Counter.php @@ -0,0 +1,49 @@ + $labels + * @throws LabelsNotMatchException + */ + public function inc(array $labels = []): void + { + $this->add(1, $labels); + } + + /** + * @param array $labels + * @throws LabelsNotMatchException + */ + public function add(int $value, array $labels = []): void + { + $this->checkLabels($labels); + + $key = \hash('xxh128', \json_encode($labels)); + $this->buffer[$key] ??= [0, '']; + $buffer = &$this->buffer[$key][0]; + $callbackId = &$this->buffer[$key][1]; + $buffer += $value; + + if ($callbackId !== '') { + return; + } + + $callbackId = EventLoop::delay(self::FLUSH_TIMEOUT, function() use($labels, &$buffer, $key) { + $value = $buffer; + unset($this->buffer[$key]); + $this->messageBus->dispatch(new IncreaseCounterMessage($this->namespace, $this->name, $labels, $value)); + }); + } +} diff --git a/src/BundledPlugin/Metrics/Exception/LabelsNotMatchException.php b/src/BundledPlugin/Metrics/Exception/LabelsNotMatchException.php new file mode 100644 index 0000000..a2dff68 --- /dev/null +++ b/src/BundledPlugin/Metrics/Exception/LabelsNotMatchException.php @@ -0,0 +1,21 @@ + $labels + * @throws LabelsNotMatchException + */ + public function set(float $value, array $labels = []): void + { + $this->checkLabels($labels); + + $key = \hash('xxh128', \json_encode($labels).'set'); + $this->buffer[$key] ??= [0, '']; + $buffer = &$this->buffer[$key][0]; + $callbackId = &$this->buffer[$key][1]; + $buffer = $value; + + if ($callbackId !== '') { + return; + } + + $callbackId = EventLoop::delay(self::FLUSH_TIMEOUT, function() use($labels, &$buffer, $key) { + $value = $buffer; + unset($this->buffer[$key]); + $this->messageBus->dispatch(new SetGaugeMessage($this->namespace, $this->name, $labels, $value, false)); + }); + } + + /** + * @param array $labels + * @throws LabelsNotMatchException + */ + public function inc(array $labels = []): void + { + $this->add(1, $labels); + } + + /** + * @param array $labels + * @throws LabelsNotMatchException + */ + public function dec(array $labels = []): void + { + $this->add(-1, $labels); + } + + /** + * @param array $labels + * @throws LabelsNotMatchException + */ + public function add(float $value, array $labels = []): void + { + $this->checkLabels($labels); + + $key = \hash('xxh128', \json_encode($labels).'add'); + $this->buffer[$key] ??= [0, '']; + $buffer = &$this->buffer[$key][0]; + $callbackId = &$this->buffer[$key][1]; + $buffer += $value; + + if ($callbackId !== '') { + return; + } + + $callbackId = EventLoop::delay(self::FLUSH_TIMEOUT, function() use($labels, &$buffer, $key) { + $value = $buffer; + unset($this->buffer[$key]); + $this->messageBus->dispatch(new SetGaugeMessage($this->namespace, $this->name, $labels, $value, true)); + }); + } + + /** + * @param array $labels + * @throws LabelsNotMatchException + */ + public function sub(float $value, array $labels = []): void + { + $this->add(-$value, $labels); + } +} diff --git a/src/BundledPlugin/Metrics/Histogram.php b/src/BundledPlugin/Metrics/Histogram.php new file mode 100644 index 0000000..ba0e306 --- /dev/null +++ b/src/BundledPlugin/Metrics/Histogram.php @@ -0,0 +1,92 @@ + $labels + * @throws LabelsNotMatchException + */ + public function observe(float $value, array $labels = []): void + { + $this->checkLabels($labels); + + $key = \hash('xxh128', \json_encode($labels)); + $this->buffer[$key] ??= [[], '']; + $buffer = &$this->buffer[$key][0]; + $callbackId = &$this->buffer[$key][1]; + $buffer[] = $value; + + if ($callbackId !== '') { + return; + } + + $callbackId = EventLoop::delay(self::FLUSH_TIMEOUT, function() use($labels, &$buffer, $key) { + $values = $buffer; + unset($this->buffer[$key]); + $this->messageBus->dispatch(new ObserveHistorgamMessage($this->namespace, $this->name, $labels, $values)); + }); + } + + /** + * Creates count buckets, where the lowest bucket has an upper bound of start and each following bucket's upper + * bound is factor times the previous bucket's upper bound. + * The returned slice is meant to be used for the Buckets field of HistogramOpts. + * + * @return list + */ + public static function exponentialBuckets(float $start, float $factor, int $count): array + { + $start > 0 ?: throw new \InvalidArgumentException('$start must be a positive integer'); + $factor > 0 ?: throw new \InvalidArgumentException('$factor must greater than 1'); + $count >= 1 ?: throw new \InvalidArgumentException('$count must be a positive integer'); + + $buckets = []; + for ($i = 0; $i < $count; $i++) { + $buckets[] = $start; + $start *= $factor; + } + + return $buckets; + } + + /** + * Creates count regular buckets, each width wide, where the lowest bucket has an upper bound of start. + * The returned slice is meant to be used for the Buckets field of HistogramOpts. + * + * @return list + */ + public static function linearBuckets(float $start, float $width, int $count): array + { + $width > 0 ?: throw new \InvalidArgumentException('$width must greater than 1'); + $count >= 1 ?: throw new \InvalidArgumentException('$count must be a positive integer'); + + $buckets = []; + for ($i = 0; $i < $count; $i++) { + $buckets[] = $start; + $start += $width; + } + + return $buckets; + } + + /** + * Creates default buckets. + * + * @return list + */ + public static function defaultBuckets(): array + { + return [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]; + } +} diff --git a/src/BundledPlugin/Metrics/Internal/Message/GetMetricMessage.php b/src/BundledPlugin/Metrics/Internal/Message/GetMetricMessage.php new file mode 100644 index 0000000..7284624 --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/Message/GetMetricMessage.php @@ -0,0 +1,45 @@ + + */ +final readonly class GetMetricMessage implements MessageInterface +{ + public const TYPE_COUNTER = 'counter'; + public const TYPE_GAUGE = 'gauge'; + public const TYPE_HISTOGRAM = 'histogram'; + public const TYPE_SUMMARY = 'summary'; + + private function __construct( + public string $type, + public string $namespace, + public string $name, + ) { + } + + public static function counter(string $namespace, string $name): self + { + return new self(self::TYPE_COUNTER, $namespace, $name); + } + + public static function gauge(string $namespace, string $name): self + { + return new self(self::TYPE_GAUGE, $namespace, $name); + } + + public static function histogram(string $namespace, string $name): self + { + return new self(self::TYPE_HISTOGRAM, $namespace, $name); + } + + public static function summary(string $namespace, string $name): self + { + return new self(self::TYPE_SUMMARY, $namespace, $name); + } +} diff --git a/src/BundledPlugin/Metrics/Internal/Message/GetMetricResponse.php b/src/BundledPlugin/Metrics/Internal/Message/GetMetricResponse.php new file mode 100644 index 0000000..0f95d5f --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/Message/GetMetricResponse.php @@ -0,0 +1,17 @@ + + */ +final readonly class IncreaseCounterMessage implements MessageInterface +{ + public function __construct( + public string $namespace, + public string $name, + public array $labels, + public int $value, + ) { + } +} diff --git a/src/BundledPlugin/Metrics/Internal/Message/ObserveHistorgamMessage.php b/src/BundledPlugin/Metrics/Internal/Message/ObserveHistorgamMessage.php new file mode 100644 index 0000000..5da7486 --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/Message/ObserveHistorgamMessage.php @@ -0,0 +1,25 @@ + + */ +final readonly class ObserveHistorgamMessage implements MessageInterface +{ + /** + * @param array $labels + * @param list $values + */ + public function __construct( + public string $namespace, + public string $name, + public array $labels, + public array $values, + ) { + } +} diff --git a/src/BundledPlugin/Metrics/Internal/Message/ObserveSummaryMessage.php b/src/BundledPlugin/Metrics/Internal/Message/ObserveSummaryMessage.php new file mode 100644 index 0000000..80abecd --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/Message/ObserveSummaryMessage.php @@ -0,0 +1,25 @@ + + */ +final readonly class ObserveSummaryMessage implements MessageInterface +{ + /** + * @param array $labels + * @param list $values + */ + public function __construct( + public string $namespace, + public string $name, + public array $labels, + public array $values, + ) { + } +} diff --git a/src/BundledPlugin/Metrics/Internal/Message/RegisterMetricMessage.php b/src/BundledPlugin/Metrics/Internal/Message/RegisterMetricMessage.php new file mode 100644 index 0000000..db7f0fb --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/Message/RegisterMetricMessage.php @@ -0,0 +1,48 @@ + + */ +final readonly class RegisterMetricMessage implements MessageInterface +{ + public const TYPE_COUNTER = 'counter'; + public const TYPE_GAUGE = 'gauge'; + public const TYPE_HISTOGRAM = 'histogram'; + public const TYPE_SUMMARY = 'summary'; + + private function __construct( + public string $type, + public string $namespace, + public string $name, + public string $help, + public array $labels, + public array|null $buckets = null, + ) { + } + + public static function counter(string $namespace, string $name, string $help, array $labels): self + { + return new self(self::TYPE_COUNTER, $namespace, $name, $help, $labels); + } + + public static function gauge(string $namespace, string $name, string $help, array $labels): self + { + return new self(self::TYPE_GAUGE, $namespace, $name, $help, $labels); + } + + public static function histogram(string $namespace, string $name, string $help, array $labels, array $buckets): self + { + return new self(self::TYPE_HISTOGRAM, $namespace, $name, $help, $labels, $buckets); + } + + public static function summary(string $namespace, string $name, string $help, array $labels, array|null $buckets = null): self + { + return new self(self::TYPE_SUMMARY, $namespace, $name, $help, $labels, $buckets); + } +} diff --git a/src/BundledPlugin/Metrics/Internal/Message/SetGaugeMessage.php b/src/BundledPlugin/Metrics/Internal/Message/SetGaugeMessage.php new file mode 100644 index 0000000..4c3cdb3 --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/Message/SetGaugeMessage.php @@ -0,0 +1,22 @@ + + */ +final readonly class SetGaugeMessage implements MessageInterface +{ + public function __construct( + public string $namespace, + public string $name, + public array $labels, + public float $value, + public bool $increase = false, + ) { + } +} diff --git a/src/BundledPlugin/Metrics/Internal/MessageBusRegistry.php b/src/BundledPlugin/Metrics/Internal/MessageBusRegistry.php new file mode 100644 index 0000000..5389ab7 --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/MessageBusRegistry.php @@ -0,0 +1,141 @@ +map[$key])) { + return $this->map[$key]; + } + + $this->messageBus->dispatch(RegisterMetricMessage::counter($namespace, $name, $help, $labels))->await(); + + return $this->map[$key] = new Counter($this->messageBus, $namespace, $name, $help, $labels); + } + + public function getCounter(string $namespace, string $name): Counter + { + $key = \hash('xxh128', 'counter'.$namespace.$name); + if (isset($this->map[$key])) { + return $this->map[$key]; + } + + /** @var GetMetricResponse|false $answer */ + $answer = $this->messageBus->dispatch(GetMetricMessage::counter($namespace, $name))->await(); + + if ($answer === false) { + throw new MetricNotFoundException('counter', $namespace, $name); + } + + return $this->map[$key] = new Counter($this->messageBus, $namespace, $name, $answer->help, $answer->labels); + } + + public function registerGauge(string $namespace, string $name, string $help, array $labels = []): Gauge + { + $key = \hash('xxh128', 'gauge'.$namespace.$name); + if (isset($this->map[$key])) { + return $this->map[$key]; + } + + $this->messageBus->dispatch(RegisterMetricMessage::gauge($namespace, $name, $help, $labels))->await(); + + return $this->map[$key] = new Gauge($this->messageBus, $namespace, $name, $help, $labels); + } + + public function getGauge(string $namespace, string $name): Gauge + { + $key = \hash('xxh128', 'gauge'.$namespace.$name); + if (isset($this->map[$key])) { + return $this->map[$key]; + } + + /** @var GetMetricResponse|false $answer */ + $answer = $this->messageBus->dispatch(GetMetricMessage::gauge($namespace, $name))->await(); + + if ($answer === false) { + throw new MetricNotFoundException('gauge', $namespace, $name); + } + + return $this->map[$key] = new Gauge($this->messageBus, $namespace, $name, $answer->help, $answer->labels); + } + + public function registerHistogram(string $namespace, string $name, string $help, array $labels = [], array|null $buckets = null): Histogram + { + $key = \hash('xxh128', 'histogram'.$namespace.$name); + if (isset($this->map[$key])) { + return $this->map[$key]; + } + + $buckets ??= Histogram::defaultBuckets(); + $this->messageBus->dispatch(RegisterMetricMessage::histogram($namespace, $name, $help, $labels, $buckets))->await(); + + return $this->map[$key] = new Histogram($this->messageBus, $namespace, $name, $help, $labels); + } + + public function getHistogram(string $namespace, string $name): Histogram + { + $key = \hash('xxh128', 'histogram'.$namespace.$name); + if (isset($this->map[$key])) { + return $this->map[$key]; + } + + /** @var GetMetricResponse|false $answer */ + $answer = $this->messageBus->dispatch(GetMetricMessage::histogram($namespace, $name))->await(); + + if ($answer === false) { + throw new MetricNotFoundException('histogram', $namespace, $name); + } + + return $this->map[$key] = new Histogram($this->messageBus, $namespace, $name, $answer->help, $answer->labels); + } + + public function registerSummary(string $namespace, string $name, string $help, array $labels = [], array|null $quantiles = null): Summary + { + $key = \hash('xxh128', 'summary'.$namespace.$name); + if (isset($this->map[$key])) { + return $this->map[$key]; + } + + $quantiles ??= Summary::getDefaultQuantiles(); + $this->messageBus->dispatch(RegisterMetricMessage::summary($namespace, $name, $help, $labels, $quantiles))->await(); + + return $this->map[$key] = new Summary($this->messageBus, $namespace, $name, $help, $labels); + } + + public function getSummary(string $namespace, string $name): Summary + { + $key = \hash('xxh128', 'summary'.$namespace.$name); + if (isset($this->map[$key])) { + return $this->map[$key]; + } + + /** @var GetMetricResponse|false $answer */ + $answer = $this->messageBus->dispatch(GetMetricMessage::summary($namespace, $name))->await(); + + if ($answer === false) { + throw new MetricNotFoundException('summary', $namespace, $name); + } + + return $this->map[$key] = new Summary($this->messageBus, $namespace, $name, $answer->help, $answer->labels); + } +} diff --git a/src/BundledPlugin/Metrics/Internal/MessageBusRegistryHandler.php b/src/BundledPlugin/Metrics/Internal/MessageBusRegistryHandler.php new file mode 100644 index 0000000..3a52a13 --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/MessageBusRegistryHandler.php @@ -0,0 +1,115 @@ +registry = new CollectorRegistry(new InMemory(), false); + + $messageHandler->subscribe(RegisterMetricMessage::class, weakClosure($this->registerMetric(...))); + $messageHandler->subscribe(GetMetricMessage::class, weakClosure($this->getMetric(...))); + $messageHandler->subscribe(IncreaseCounterMessage::class, weakClosure($this->increaseCounter(...))); + $messageHandler->subscribe(SetGaugeMessage::class, weakClosure($this->setGauge(...))); + $messageHandler->subscribe(ObserveHistorgamMessage::class, weakClosure($this->observeHistogram(...))); + $messageHandler->subscribe(ObserveSummaryMessage::class, weakClosure($this->observeSummary(...))); + } + + private function registerMetric(RegisterMetricMessage $message): bool + { + try { + match ($message->type) { + RegisterMetricMessage::TYPE_COUNTER => $this->registry->registerCounter( + $message->namespace, $message->name, $message->help, $message->labels, + ), + RegisterMetricMessage::TYPE_GAUGE => $this->registry->registerGauge( + $message->namespace, $message->name, $message->help, $message->labels, + ), + RegisterMetricMessage::TYPE_HISTOGRAM => $this->registry->registerHistogram( + $message->namespace, $message->name, $message->help, $message->labels, $message->buckets, + ), + RegisterMetricMessage::TYPE_SUMMARY => $this->registry->registerSummary( + $message->namespace, $message->name, $message->help, $message->labels, 600, $message->buckets, + ), + }; + } catch (PrometheusMetricRegistrationException) { + return false; + } + + return true; + } + + private function getMetric(GetMetricMessage $message): GetMetricResponse|false + { + try { + $counter = match ($message->type) { + GetMetricMessage::TYPE_COUNTER => $this->registry->getCounter($message->namespace, $message->name), + GetMetricMessage::TYPE_GAUGE => $this->registry->getGauge($message->namespace, $message->name), + GetMetricMessage::TYPE_HISTOGRAM => $this->registry->getHistogram($message->namespace, $message->name), + GetMetricMessage::TYPE_SUMMARY => $this->registry->getSummary($message->namespace, $message->name), + }; + } catch (PrometheusMetricNotFoundException) { + return false; + } + + return new GetMetricResponse($message->type, $message->namespace, $message->name, $counter->getHelp(), $counter->getLabelNames()); + } + + private function increaseCounter(IncreaseCounterMessage $message): void + { + $counter = $this->registry->getCounter($message->namespace, $message->name); + $labels = [...\array_flip($counter->getLabelNames()), ...$message->labels]; + $counter->incBy($message->value, $labels); + } + + private function setGauge(SetGaugeMessage $message): void + { + $gauge = $this->registry->getGauge($message->namespace, $message->name); + $labels = [...\array_flip($gauge->getLabelNames()), ...$message->labels]; + $message->increase ? $gauge->incBy($message->value, $labels) : $gauge->set($message->value, $labels); + } + + private function observeHistogram(ObserveHistorgamMessage $message): void + { + $histogram = $this->registry->getHistogram($message->namespace, $message->name); + $labels = [...\array_flip($histogram->getLabelNames()), ...$message->labels]; + foreach ($message->values as $value) { + $histogram->observe($value, $labels); + } + } + + private function observeSummary(ObserveSummaryMessage $message): void + { + $summary = $this->registry->getSummary($message->namespace, $message->name); + $labels = [...\array_flip($summary->getLabelNames()), ...$message->labels]; + foreach ($message->values as $value) { + $summary->observe($value, $labels); + } + } + + public function render(): string + { + $renderer = new RenderTextFormat(); + + return $renderer->render($this->registry->getMetricFamilySamples()); + } +} diff --git a/src/BundledPlugin/Metrics/Internal/Metric.php b/src/BundledPlugin/Metrics/Internal/Metric.php new file mode 100644 index 0000000..ff06892 --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/Metric.php @@ -0,0 +1,36 @@ +labelsCount = \count($this->labels); + } + + /** + * @throws LabelsNotMatchException + */ + protected final function checkLabels(array $labels = []): void + { + $assignedLabels = \array_keys($labels); + if ($this->labelsCount !== \count($assignedLabels) || \array_diff($this->labels, $assignedLabels) !== []) { + throw new LabelsNotMatchException($this->labels, $assignedLabels); + } + } +} diff --git a/src/BundledPlugin/Metrics/Internal/NotFoundPage.php b/src/BundledPlugin/Metrics/Internal/NotFoundPage.php new file mode 100644 index 0000000..48a2b94 --- /dev/null +++ b/src/BundledPlugin/Metrics/Internal/NotFoundPage.php @@ -0,0 +1,48 @@ +status = $status; + $this->reason = HttpStatus::getReason($status); + $this->server = Server::getVersionString(); + } + + public function toHtml(): string + { + return << + + + {$this->status} {$this->reason} + + + +

{$this->status} {$this->reason}

+
Prometheus metrics link: /metrics
+
+
{$this->server}
+ + + HTML; + } +} diff --git a/src/BundledPlugin/Metrics/MetricsPlugin.php b/src/BundledPlugin/Metrics/MetricsPlugin.php new file mode 100644 index 0000000..e303d55 --- /dev/null +++ b/src/BundledPlugin/Metrics/MetricsPlugin.php @@ -0,0 +1,97 @@ +masterContainer->register(RegistryInterface::class, static function (Container $container): RegistryInterface { + return new MessageBusRegistry($container->get('bus')); + }); + + $this->workerContainer->register(RegistryInterface::class, static function (Container $container): RegistryInterface { + return new MessageBusRegistry($container->get('bus')); + }); + } + + public function start(): void + { + $listen = \is_string($this->listen) ? new Listen($this->listen) : $this->listen; + + /** @var LoggerInterface $logger */ + $logger = &$this->masterContainer->get('logger'); + $handler = &$this->masterContainer->get('handler'); + + $nullLogger = new NullLogger(); + $serverSocketFactory = new ResourceServerSocketFactory(); + $clientFactory = new SocketClientFactory($nullLogger); + $errorHandler = new HttpErrorHandler($nullLogger); + $socketHttpServer = new SocketHttpServer( + logger: $nullLogger, + serverSocketFactory: $serverSocketFactory, + clientFactory: $clientFactory, + allowedMethods: ['GET'], + httpDriverFactory: new DefaultHttpDriverFactory(logger: $nullLogger), + ); + + $socketHttpServer->expose(...HttpServer::createInternetAddressAndContext($listen)); + + $messageBusRegistryHandler = new MessageBusRegistryHandler($handler); + + EventLoop::defer(function () use ($logger, $socketHttpServer, $messageBusRegistryHandler, $errorHandler, $listen) { + $requestHandler = $this->createRequestHandler($messageBusRegistryHandler); + $socketHttpServer->start($requestHandler, $errorHandler); + $logger->info(\sprintf('Prometheus metrics available on %s/metrics', $listen->getAddress())); + }); + } + + private function createRequestHandler(MessageBusRegistryHandler $messageBusRegistryHandler): RequestHandler + { + return new class($messageBusRegistryHandler) implements RequestHandler + { + public function __construct(private readonly MessageBusRegistryHandler $messageBusRegistryHandler) + { + } + + public function handleRequest(Request $request): Response + { + if ($request->getUri()->getPath() !== '/metrics') { + return new Response(body: (new NotFoundPage())->toHtml(), status: HttpStatus::NOT_FOUND); + } + + $result = $this->messageBusRegistryHandler->render(); + $headers = ['content-type' => 'text/plain; version=0.0.4']; + + return new Response(body: $result, headers: $headers); + } + }; + } +} diff --git a/src/BundledPlugin/Metrics/RegistryInterface.php b/src/BundledPlugin/Metrics/RegistryInterface.php new file mode 100644 index 0000000..a7ab4f5 --- /dev/null +++ b/src/BundledPlugin/Metrics/RegistryInterface.php @@ -0,0 +1,52 @@ + $labels + */ + public function registerCounter(string $namespace, string $name, string $help, array $labels = []): Counter; + + /** + * @throws MetricNotFoundException + */ + public function getCounter(string $namespace, string $name): Counter; + + /** + * @param list $labels + */ + public function registerGauge(string $namespace, string $name, string $help, array $labels = []): Gauge; + + /** + * @throws MetricNotFoundException + */ + public function getGauge(string $namespace, string $name): Gauge; + + /** + * @param list $labels + * @param null|list $buckets + */ + public function registerHistogram(string $namespace, string $name, string $help, array $labels = [], array|null $buckets = null): Histogram; + + /** + * @throws MetricNotFoundException + */ + public function getHistogram(string $namespace, string $name): Histogram; + + /** + * @param list $labels + * @param null|list> $quantiles + */ + public function registerSummary(string $namespace, string $name, string $help, array $labels = [], array|null $quantiles = null): Summary; + + /** + * @throws MetricNotFoundException + */ + public function getSummary(string $namespace, string $name): Summary; +} diff --git a/src/BundledPlugin/Metrics/Summary.php b/src/BundledPlugin/Metrics/Summary.php new file mode 100644 index 0000000..a782101 --- /dev/null +++ b/src/BundledPlugin/Metrics/Summary.php @@ -0,0 +1,50 @@ + $labels + * @throws LabelsNotMatchException + */ + public function observe(float $value, array $labels = []): void + { + $this->checkLabels($labels); + + $key = \hash('xxh128', \json_encode($labels)); + $this->buffer[$key] ??= [[], '']; + $buffer = &$this->buffer[$key][0]; + $callbackId = &$this->buffer[$key][1]; + $buffer[] = $value; + + if ($callbackId !== '') { + return; + } + + $callbackId = EventLoop::delay(self::FLUSH_TIMEOUT, function() use($labels, &$buffer, $key) { + $values = $buffer; + unset($this->buffer[$key]); + $this->messageBus->dispatch(new ObserveSummaryMessage($this->namespace, $this->name, $labels, $values)); + }); + } + + /** + * Creates default quantiles. + * + * @return list> + */ + public static function getDefaultQuantiles(): array + { + return [0.01, 0.05, 0.5, 0.95, 0.99]; + } +} diff --git a/src/Internal/MasterProcess.php b/src/Internal/MasterProcess.php index 1d7882c..530a71c 100644 --- a/src/Internal/MasterProcess.php +++ b/src/Internal/MasterProcess.php @@ -199,7 +199,7 @@ private function start(): void }); foreach ($this->plugins as $plugin) { - EventLoop::queue(function () use ($plugin) { + EventLoop::defer(function () use ($plugin) { $plugin->start(); }); } diff --git a/src/Process.php b/src/Process.php index eac85a2..891ec29 100644 --- a/src/Process.php +++ b/src/Process.php @@ -106,7 +106,9 @@ final public function run(Container $workerContainer): int ]))->await(); $this->start(); if ($this->onStart !== null) { - ($this->onStart)($this); + EventLoop::queue(function () { + ($this->onStart)($this); + }); } $this->status = Status::RUNNING; $this->startingFuture->complete();