Skip to content

Commit

Permalink
Faucet Routes (#19692)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
leecchh authored Oct 15, 2024
1 parent e510867 commit 8aac6c2
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 50 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sui-faucet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ttl_cache.workspace = true
eyre.workspace = true
tempfile.workspace = true
parking_lot.workspace = true
tonic.workspace = true

sui-json-rpc-types.workspace = true
sui-types.workspace = true
Expand All @@ -38,6 +39,7 @@ telemetry-subscribers.workspace = true
typed-store.workspace = true
shared-crypto.workspace = true
async-recursion.workspace = true
mysten-network.workspace = true

[dev-dependencies]
test-cluster.workspace = true
Expand Down
90 changes: 74 additions & 16 deletions crates/sui-faucet/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use mysten_network::metrics::MetricsCallbackProvider;
use prometheus::{
register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramVec,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use std::time::Duration;
use tonic::Code;

/// Prometheus metrics which can be displayed in Grafana, queried and alerted on

/// Metrics relevant to the requests coming into the service
#[derive(Clone, Debug)]
pub struct RequestMetrics {
pub(crate) total_requests_received: IntCounter,
pub(crate) total_requests_succeeded: IntCounter,
pub(crate) total_requests_shed: IntCounter,
pub(crate) total_requests_failed: IntCounter,
pub(crate) total_requests_disconnected: IntCounter,
pub(crate) current_requests_in_flight: IntGauge,
pub(crate) process_latency: Histogram,
pub(crate) total_requests_received: IntCounterVec,
pub(crate) total_requests_succeeded: IntCounterVec,
pub(crate) total_requests_shed: IntCounterVec,
pub(crate) total_requests_failed: IntCounterVec,
pub(crate) total_requests_disconnected: IntCounterVec,
pub(crate) current_requests_in_flight: IntGaugeVec,
pub(crate) process_latency: HistogramVec,
}

/// Metrics relevant to the running of the service
Expand All @@ -38,46 +42,53 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[
impl RequestMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
total_requests_received: register_int_counter_with_registry!(
total_requests_received: register_int_counter_vec_with_registry!(
"total_requests_received",
"Total number of requests received in Faucet",
&["path"],
registry,
)
.unwrap(),
total_requests_succeeded: register_int_counter_with_registry!(
total_requests_succeeded: register_int_counter_vec_with_registry!(
"total_requests_succeeded",
"Total number of requests processed successfully in Faucet",
&["path"],
registry,
)
.unwrap(),
total_requests_shed: register_int_counter_with_registry!(
total_requests_shed: register_int_counter_vec_with_registry!(
"total_requests_shed",
"Total number of requests that were dropped because the service was saturated",
&["path"],
registry,
)
.unwrap(),
total_requests_failed: register_int_counter_with_registry!(
total_requests_failed: register_int_counter_vec_with_registry!(
"total_requests_failed",
"Total number of requests that started but failed with an uncaught error",
&["path"],
registry,
)
.unwrap(),
total_requests_disconnected: register_int_counter_with_registry!(
total_requests_disconnected: register_int_counter_vec_with_registry!(
"total_requests_disconnected",
"Total number of requests where the client disconnected before the service \
returned a response",
&["path"],
registry,
)
.unwrap(),
current_requests_in_flight: register_int_gauge_with_registry!(
current_requests_in_flight: register_int_gauge_vec_with_registry!(
"current_requests_in_flight",
"Current number of requests being processed in Faucet",
&["path"],
registry,
)
.unwrap(),
process_latency: register_histogram_with_registry!(
process_latency: register_histogram_vec_with_registry!(
"process_latency",
"Latency of processing a Faucet request",
&["path"],
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
Expand Down Expand Up @@ -121,3 +132,50 @@ impl FaucetMetrics {
}
}
}

impl MetricsCallbackProvider for RequestMetrics {
fn on_request(&self, path: String) {
self.total_requests_received
.with_label_values(&[path.as_str()])
.inc();
}

fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
self.process_latency
.with_label_values(&[path.as_str()])
.observe(latency.as_secs_f64());

match grpc_status_code {
Code::Ok => {
self.total_requests_succeeded
.with_label_values(&[path.as_str()])
.inc();
}
Code::Unavailable | Code::ResourceExhausted => {
self.total_requests_shed
.with_label_values(&[path.as_str()])
.inc();
}
_ => {
self.total_requests_failed
.with_label_values(&[path.as_str()])
.inc();
}
}
}

fn on_start(&self, path: &str) {
self.current_requests_in_flight
.with_label_values(&[path])
.inc();
}

fn on_drop(&self, path: &str) {
self.total_requests_disconnected
.with_label_values(&[path])
.inc();
self.current_requests_in_flight
.with_label_values(&[path])
.dec();
}
}
115 changes: 81 additions & 34 deletions crates/sui-faucet/src/metrics_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tower::{load_shed::error::Overloaded, BoxError, Layer, Service, ServiceExt};
use tracing::{error, info, warn};

use crate::metrics::RequestMetrics;
use http::Request;

/// Tower Layer for tracking metrics in Prometheus related to number, success-rate and latency of
/// requests running through service.
Expand All @@ -35,6 +36,7 @@ pub struct RequestMetricsFuture<Res> {
struct MetricsGuard {
timer: Option<HistogramTimer>,
metrics: Arc<RequestMetrics>,
path: String,
}

impl RequestMetricsLayer {
Expand All @@ -55,11 +57,14 @@ impl<Inner> Layer<Inner> for RequestMetricsLayer {
}
}

impl<Inner, Req, Body> Service<Req> for RequestMetricsService<Inner>
impl<Inner, Body> Service<Request<Body>> for RequestMetricsService<Inner>
where
Inner: Service<Req, Response = http::Response<Body>, Error = BoxError> + Clone + Send + 'static,
Inner: Service<Request<Body>, Response = http::Response<Body>, Error = BoxError>
+ Clone
+ Send
+ 'static,
Inner::Future: Send,
Req: Send + 'static,
Body: Send + 'static,
{
type Response = Inner::Response;
type Error = BoxError;
Expand All @@ -69,18 +74,19 @@ where
self.inner.poll_ready(ctx)
}

fn call(&mut self, req: Req) -> Self::Future {
let metrics = MetricsGuard::new(self.metrics.clone());
fn call(&mut self, req: Request<Body>) -> Self::Future {
let path = req.uri().path().to_string();
let metrics = MetricsGuard::new(self.metrics.clone(), &path);
let inner = self.inner.clone();

let future = Box::pin(async move {
let resp = inner.oneshot(req).await;
match &resp {
Result::Ok(resp) if !resp.status().is_success() => {
Ok(resp) if !resp.status().is_success() => {
metrics.failed(None, Some(resp.status()))
}
Result::Ok(_) => metrics.succeeded(),
Result::Err(err) => {
Ok(_) => metrics.succeeded(),
Err(err) => {
if err.is::<Overloaded>() {
metrics.shed();
} else {
Expand All @@ -104,54 +110,95 @@ impl<Res> Future for RequestMetricsFuture<Res> {
}

impl MetricsGuard {
fn new(metrics: Arc<RequestMetrics>) -> Self {
metrics.total_requests_received.inc();
metrics.current_requests_in_flight.inc();
fn new(metrics: Arc<RequestMetrics>, path: &str) -> Self {
metrics
.total_requests_received
.with_label_values(&[path])
.inc();
metrics
.current_requests_in_flight
.with_label_values(&[path])
.inc();
MetricsGuard {
timer: Some(metrics.process_latency.start_timer()),
timer: Some(
metrics
.process_latency
.with_label_values(&[path])
.start_timer(),
),
metrics,
path: path.to_string(),
}
}

fn succeeded(mut self) {
let elapsed = self.timer.take().unwrap().stop_and_record();
self.metrics.total_requests_succeeded.inc();
info!("Request succeeded in {:.2}s", elapsed);
if let Some(timer) = self.timer.take() {
let elapsed = timer.stop_and_record();
self.metrics
.total_requests_succeeded
.with_label_values(&[&self.path])
.inc();
info!(
"Request succeeded for path {} in {:.2}s",
self.path, elapsed
);
}
}

fn failed(mut self, error: Option<&BoxError>, status: Option<StatusCode>) {
let elapsed = self.timer.take().unwrap().stop_and_record();
let code = status
.map(|c| c.as_str().to_string())
.unwrap_or_else(|| "no_code".to_string());
self.metrics.total_requests_failed.inc();

if let Some(err) = error {
error!(
"Request failed in {:.2}s, error {:?}, code {}",
elapsed, err, code
);
} else {
warn!("Request failed in {:.2}s, code: {}", elapsed, code);
if let Some(timer) = self.timer.take() {
let elapsed = timer.stop_and_record();
self.metrics
.total_requests_failed
.with_label_values(&[&self.path])
.inc();

if let Some(err) = error {
error!(
"Request failed for path {} in {:.2}s, error {:?}",
self.path, elapsed, err
);
} else if let Some(status) = status {
error!(
"Request failed for path {} in {:.2}s with status: {}",
self.path, elapsed, status
);
} else {
warn!("Request failed for path {} in {:.2}s", self.path, elapsed);
}
}
}

fn shed(mut self) {
let elapsed = self.timer.take().unwrap().stop_and_record();
self.metrics.total_requests_shed.inc();
info!("Request shed in {:.2}s", elapsed);
if let Some(timer) = self.timer.take() {
let elapsed = timer.stop_and_record();
self.metrics
.total_requests_shed
.with_label_values(&[&self.path])
.inc();
info!("Request shed for path {} in {:.2}s", self.path, elapsed);
}
}
}

impl Drop for MetricsGuard {
fn drop(&mut self) {
self.metrics.current_requests_in_flight.dec();
self.metrics
.current_requests_in_flight
.with_label_values(&[&self.path])
.dec();

// Request was still in flight when the guard was dropped, implying the client disconnected.
if let Some(timer) = self.timer.take() {
let elapsed = timer.stop_and_record();
self.metrics.total_requests_disconnected.inc();
info!("Request disconnected in {:.2}s", elapsed);
self.metrics
.total_requests_disconnected
.with_label_values(&[&self.path])
.inc();
info!(
"Request disconnected for path {} in {:.2}s",
self.path, elapsed
);
}
}
}

0 comments on commit 8aac6c2

Please sign in to comment.