diff --git a/Cargo.lock b/Cargo.lock index f6fc2d77..fb53a41d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1422,12 +1422,15 @@ name = "obstore" version = "0.5.0-beta.1" dependencies = [ "arrow", + "async-trait", "bytes", "chrono", "futures", "http", "indexmap", "object_store", + "parking_lot", + "pin-project", "pyo3", "pyo3-arrow", "pyo3-async-runtimes", @@ -1527,6 +1530,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" diff --git a/obstore/Cargo.toml b/obstore/Cargo.toml index f0917f55..596724a9 100644 --- a/obstore/Cargo.toml +++ b/obstore/Cargo.toml @@ -19,12 +19,15 @@ crate-type = ["cdylib"] [dependencies] arrow = "54.2.1" +async-trait = "0.1.83" bytes = { workspace = true } chrono = { workspace = true } futures = { workspace = true } http = { workspace = true } indexmap = { workspace = true } object_store = { workspace = true } +parking_lot = "0.12" +pin-project = "1.1.7" pyo3 = { workspace = true, features = ["chrono"] } pyo3-arrow = "0.7.2" pyo3-async-runtimes = { workspace = true, features = ["tokio-runtime"] } diff --git a/obstore/src/lib.rs b/obstore/src/lib.rs index 9662bcda..fb62cf18 100644 --- a/obstore/src/lib.rs +++ b/obstore/src/lib.rs @@ -8,6 +8,7 @@ mod delete; mod get; mod head; mod list; +mod metrics; mod path; mod put; mod rename; @@ -84,5 +85,7 @@ fn _obstore(py: Python, m: &Bound) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(signer::sign_async))?; m.add_wrapped(wrap_pyfunction!(signer::sign))?; + m.add_class::()?; + Ok(()) } diff --git a/obstore/src/metrics/iox_time.rs b/obstore/src/metrics/iox_time.rs new file mode 100644 index 00000000..99db55f9 --- /dev/null +++ b/obstore/src/metrics/iox_time.rs @@ -0,0 +1,741 @@ +use chrono::{DateTime, DurationRound, TimeDelta, TimeZone, Timelike, Utc}; +use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock}; +use std::{ + fmt::{Debug, Display}, + future::Future, + ops::{Add, Sub}, + pin::Pin, + sync::Arc, + task::{Context, Poll, Waker}, + time::Duration, +}; + +/// A UTC Timestamp returned by a [`TimeProvider`] +/// +/// Purposefully does not provide [`std::convert::From`] implementations +/// as intended to be an opaque type returned by a `TimeProvider` - the construction methods +/// provided are intended for serialization/deserialization and tests only +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] +pub struct Time(DateTime); + +impl Add for Time { + type Output = Self; + + fn add(self, rhs: Duration) -> Self::Output { + let duration = chrono::Duration::from_std(rhs).unwrap(); + Self(self.0 + duration) + } +} + +impl Sub for Time { + type Output = Self; + + fn sub(self, rhs: Duration) -> Self::Output { + let duration = chrono::Duration::from_std(rhs).unwrap(); + Self(self.0 - duration) + } +} + +impl Debug for Time { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self, f) + } +} + +impl std::fmt::Display for Time { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.to_rfc3339()) + } +} + +impl Time { + pub const MAX: Self = Self(DateTime::::MAX_UTC); + pub const MIN: Self = Self(DateTime::::MIN_UTC); + + /// Makes a new `Time` from the number of non-leap nanoseconds + /// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp"). + pub fn from_timestamp_nanos(nanos: i64) -> Self { + Self(Utc.timestamp_nanos(nanos)) + } + + /// Makes a new `Time` from the number of non-leap microseconds + /// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp"). + pub fn from_timestamp_micros(micros: i64) -> Option { + Utc.timestamp_micros(micros).single().map(Self) + } + + /// Makes a new `DateTime` from the number of non-leap milliseconds + /// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp"). + pub fn from_timestamp_millis(millis: i64) -> Option { + Utc.timestamp_millis_opt(millis).single().map(Self) + } + + /// Makes a new `Time` from the number of non-leap seconds + /// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp") + /// and the number of nanoseconds since the last whole non-leap second. + pub fn from_timestamp(secs: i64, nanos: u32) -> Option { + Utc.timestamp_opt(secs, nanos).single().map(Self) + } + + /// Makes a new `Time` from the provided [`DateTime`] + pub const fn from_datetime(time: DateTime) -> Self { + Self(time) + } + + /// Returns an RFC 3339 and ISO 8601 date and time string such as `1996-12-19T16:39:57+00:00`. + pub fn to_rfc3339(&self) -> String { + self.0.to_rfc3339() + } + + /// Parses data from RFC 3339 format. + pub fn from_rfc3339(s: &str) -> Result> { + Ok(Self(DateTime::::from( + DateTime::parse_from_rfc3339(s).map_err(Box::new)?, + ))) + } + + /// Returns the number of non-leap-nanoseconds since January 1, 1970 UTC + pub fn timestamp_nanos(&self) -> i64 { + // TODO: ensure that this can never over-/underflow + self.0.timestamp_nanos_opt().expect("nanos in range") + } + + /// Returns the number of seconds since January 1, 1970 UTC + pub fn timestamp(&self) -> i64 { + self.0.timestamp() + } + + /// Returns the hour number from 0 to 23. + pub fn hour(&self) -> u32 { + self.0.hour() + } + + /// Returns the minute number from 0 to 59. + pub fn minute(&self) -> u32 { + self.0.minute() + } + + /// Returns the second number from 0 to 59. + pub fn second(&self) -> u32 { + self.0.second() + } + + /// Returns the number of nanoseconds since the last second boundary + pub fn timestamp_subsec_nanos(&self) -> u32 { + self.0.timestamp_subsec_nanos() + } + + /// Returns the number of non-leap-milliseconds since January 1, 1970 UTC + pub fn timestamp_millis(&self) -> i64 { + self.0.timestamp_millis() + } + + /// Returns the duration since the provided time or None if it would be negative + pub fn checked_duration_since(&self, other: Self) -> Option { + self.0.signed_duration_since(other.0).to_std().ok() + } + + /// Adds given [`Duration`] to the current date and time. + /// + /// Returns `None` if it would result in overflow + pub fn checked_add(&self, duration: Duration) -> Option { + let duration = chrono::Duration::from_std(duration).ok()?; + Some(Self(self.0.checked_add_signed(duration)?)) + } + + /// Subtracts the given [`Duration`] from the current date and time. + /// + /// Returns `None` if it would result in overflow + pub fn checked_sub(&self, duration: Duration) -> Option { + let duration = chrono::Duration::from_std(duration).ok()?; + Some(Self(self.0.checked_sub_signed(duration)?)) + } + + /// Returns `Time` as a [`DateTime`] + pub fn date_time(&self) -> DateTime { + self.0 + } + + /// Returns a new instance truncated to the most recently passed hour. + /// + /// # Example + /// + /// ``` + /// let time = iox_time::Time::from_timestamp_nanos(1723129005000000000); + /// assert_eq!(time.to_rfc3339(), "2024-08-08T14:56:45+00:00"); + /// let truncated = time.truncate_to_hour().unwrap(); + /// assert_eq!(truncated.to_rfc3339(), "2024-08-08T14:00:00+00:00"); + /// ``` + pub fn truncate_to_hour(&self) -> Result { + self.0.duration_trunc(TimeDelta::hours(1)).map(Self) + } +} + +pub trait TimeProvider: Debug + Display + Send + Sync + 'static { + /// Returns the current `Time`. No guarantees are made about monotonicity + fn now(&self) -> Time; + + /// Sleep for the given duration. + fn sleep(&self, d: Duration) -> Pin + Send + 'static>> { + self.sleep_until(self.now() + d) + } + + /// Sleep until given time. + fn sleep_until(&self, t: Time) -> Pin + Send + 'static>>; + + /// Return a time that is the specified number of minutes in the future relative to this + /// provider's `now`. + fn minutes_into_future(&self, minutes: u64) -> Time { + self.now() + Duration::from_secs(60 * minutes) + } + + /// Return a time that is the specified number of minutes in the past relative to this + /// provider's `now`. + fn minutes_ago(&self, minutes_ago: u64) -> Time { + self.now() - Duration::from_secs(60 * minutes_ago) + } + + /// Return a time that is the specified number of hours in the past relative to this provider's + /// `now`. + fn hours_ago(&self, hours_ago: u64) -> Time { + self.now() - Duration::from_secs(60 * 60 * hours_ago) + } +} + +/// A [`TimeProvider`] that uses [`Utc::now`] as a clock source +#[derive(Debug, Default, Clone, Copy)] +pub struct SystemProvider {} + +impl SystemProvider { + pub fn new() -> Self { + Self::default() + } +} + +impl Display for SystemProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "System") + } +} + +impl TimeProvider for SystemProvider { + fn now(&self) -> Time { + Time(Utc::now()) + } + + fn sleep_until(&self, t: Time) -> Pin + Send + 'static>> { + let d = t.checked_duration_since(TimeProvider::now(self)); + + Box::pin(async move { + if let Some(d) = d { + tokio::time::sleep(d).await; + } + }) + } +} + +/// Internal state fo [`MockProvider`] +#[derive(Debug)] +struct MockProviderInner { + now: Time, + waiting: Vec, +} + +/// A [`TimeProvider`] that returns a fixed `Time` that can be set by [`MockProvider::set`] +#[derive(Debug, Clone)] +pub struct MockProvider { + inner: Arc>, +} + +impl MockProvider { + pub fn new(start: Time) -> Self { + Self { + inner: Arc::new(RwLock::new(MockProviderInner { + now: start, + waiting: vec![], + })), + } + } + + pub fn set(&self, time: Time) { + let mut inner = self.inner.write(); + inner.now = time; + for waiter in inner.waiting.drain(..) { + waiter.wake() + } + } + + pub fn inc(&self, duration: Duration) -> Time { + let mut inner = self.inner.write(); + inner.now = inner.now + duration; + for waiter in inner.waiting.drain(..) { + waiter.wake() + } + inner.now + } +} + +impl Display for MockProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Mock") + } +} + +impl TimeProvider for MockProvider { + fn now(&self) -> Time { + self.inner.read().now + } + + fn sleep_until(&self, t: Time) -> Pin + Send + 'static>> { + Box::pin(MockSleep { + inner: Arc::clone(&self.inner), + deadline: t, + }) + } +} + +struct MockSleep { + inner: Arc>, + deadline: Time, +} + +impl Future for MockSleep { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.inner.upgradable_read(); + if inner.now >= self.deadline { + Poll::Ready(()) + } else { + let mut inner = RwLockUpgradableReadGuard::upgrade(inner); + inner.waiting.push(cx.waker().clone()); + Poll::Pending + } + } +} + +impl TimeProvider for Arc +where + T: TimeProvider + ?Sized, +{ + fn now(&self) -> Time { + (**self).now() + } + + fn sleep(&self, d: Duration) -> Pin + Send + 'static>> { + (**self).sleep(d) + } + + fn sleep_until(&self, t: Time) -> Pin + Send + 'static>> { + (**self).sleep_until(t) + } +} + +/// An asynchronous version of [`TimeProvider`]. This trait provides +/// the same functionality as `TimeProvider` - but the idea is that +/// looking at the clock is async. +pub trait AsyncTimeProvider: Debug + Display + Send + Sync + 'static { + type Error: std::error::Error + Send + Sync + 'static; + + /// Returns the current `Time`. No guarantees are made about monotonicity + fn now(&self) -> impl std::future::Future> + Send; +} + +impl AsyncTimeProvider for SystemProvider { + type Error = std::convert::Infallible; + + async fn now(&self) -> Result { + Ok(TimeProvider::now(self)) + } +} + +impl AsyncTimeProvider for MockProvider { + type Error = std::convert::Infallible; + + async fn now(&self) -> Result { + Ok(TimeProvider::now(self)) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_system_provider_now() { + let provider = SystemProvider::new(); + let (a, async_a) = ( + TimeProvider::now(&provider), + AsyncTimeProvider::now(&provider).await.unwrap(), + ); + std::thread::sleep(Duration::from_secs(1)); + let (b, async_b) = ( + TimeProvider::now(&provider), + AsyncTimeProvider::now(&provider).await.unwrap(), + ); + let (c, async_c) = ( + TimeProvider::now(&provider), + AsyncTimeProvider::now(&provider).await.unwrap(), + ); + + let delta = b.checked_duration_since(a).unwrap(); + let async_delta = async_b.checked_duration_since(async_a).unwrap(); + + // Assert that the deltas are within a millisecond of each other, + // they should be relatively close but not necessarily equal. + assert!(delta.abs_diff(async_delta) <= Duration::from_millis(1)); + + assert!(delta > Duration::from_millis(500)); + assert!(delta < Duration::from_secs(5)); + assert!(b <= c); + + assert!(async_delta > Duration::from_millis(500)); + assert!(async_delta < Duration::from_secs(5)); + assert!(async_b <= async_c); + } + + #[tokio::test] + async fn test_system_provider_sleep() { + let provider = SystemProvider::new(); + + let (a, async_a) = ( + TimeProvider::now(&provider), + AsyncTimeProvider::now(&provider).await.unwrap(), + ); + TimeProvider::sleep(&provider, Duration::from_secs(1)).await; + let (b, async_b) = ( + TimeProvider::now(&provider), + AsyncTimeProvider::now(&provider).await.unwrap(), + ); + + let delta = b.checked_duration_since(a).unwrap(); + let async_delta = async_b.checked_duration_since(async_a).unwrap(); + + // Assert that the deltas are within a millisecond of each other, + // they should be relatively close but not necessarily equal. + assert!(delta.abs_diff(async_delta) <= Duration::from_millis(1)); + assert!(delta > Duration::from_millis(500)); + assert!(delta < Duration::from_secs(5)); + + assert!(async_delta > Duration::from_millis(500)); + assert!(async_delta < Duration::from_secs(5)); + } + + #[tokio::test] + async fn test_system_provider_sleep_until() { + let provider = SystemProvider::new(); + + let (a, async_a) = ( + TimeProvider::now(&provider), + AsyncTimeProvider::now(&provider).await.unwrap(), + ); + TimeProvider::sleep_until(&provider, a + Duration::from_secs(1)).await; + let (b, async_b) = ( + TimeProvider::now(&provider), + AsyncTimeProvider::now(&provider).await.unwrap(), + ); + + let delta = b.checked_duration_since(a).unwrap(); + let async_delta = async_b.checked_duration_since(async_a).unwrap(); + + // Assert that the deltas are within a millisecond of each other, + // they should be relatively close but not necessarily equal. + assert!(delta.abs_diff(async_delta) <= Duration::from_millis(1)); + assert!(delta > Duration::from_millis(500)); + assert!(delta < Duration::from_secs(5)); + + assert!(async_delta > Duration::from_millis(500)); + assert!(async_delta < Duration::from_secs(5)); + } + + #[tokio::test] + async fn test_mock_provider_now() { + let provider = MockProvider::new(Time::from_timestamp_nanos(0)); + assert_eq!(TimeProvider::now(&provider).timestamp_nanos(), 0); + assert_eq!( + AsyncTimeProvider::now(&provider).await.unwrap(), + TimeProvider::now(&provider) + ); + assert_eq!( + AsyncTimeProvider::now(&provider) + .await + .unwrap() + .timestamp_nanos(), + 0 + ); + + provider.set(Time::from_timestamp_nanos(12)); + assert_eq!(TimeProvider::now(&provider).timestamp_nanos(), 12); + assert_eq!( + AsyncTimeProvider::now(&provider).await.unwrap(), + TimeProvider::now(&provider) + ); + assert_eq!( + AsyncTimeProvider::now(&provider) + .await + .unwrap() + .timestamp_nanos(), + 12 + ); + } + + #[tokio::test] + async fn test_mock_provider_sleep() { + let provider = MockProvider::new(Time::from_timestamp_nanos(0)); + + // not sleeping finishes instantly + provider.sleep(Duration::from_secs(0)).await; + + // ==== sleep with `inc` ==== + let fut = provider.sleep(Duration::from_millis(100)); + let handle = tokio::task::spawn(async move { + fut.await; + }); + + // does not finish immediately + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // does not finish when not incremented enough + provider.inc(Duration::from_millis(50)); + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // finishes once incremented at least to the duration + provider.inc(Duration::from_millis(50)); + handle.await.unwrap(); + + // finishes also when "overshooting" the duration + let fut = provider.sleep(Duration::from_millis(100)); + let handle = tokio::task::spawn(async move { + fut.await; + }); + provider.inc(Duration::from_millis(101)); + handle.await.unwrap(); + + // ==== sleep with `set` ==== + provider.set(Time::from_timestamp_millis(100).unwrap()); + let fut = provider.sleep(Duration::from_millis(100)); + let handle = tokio::task::spawn(async move { + fut.await; + }); + + // does not finish immediately + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // does not finish when time goes backwards + provider.set(Time::from_timestamp_millis(0).unwrap()); + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // does not finish when time goes forward but not enough + provider.set(Time::from_timestamp_millis(150).unwrap()); + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // finishes when time is set at least to the wait duration + provider.set(Time::from_timestamp_millis(200).unwrap()); + handle.await.unwrap(); + + // also finishes when "overshooting" + let fut = provider.sleep(Duration::from_millis(100)); + let handle = tokio::task::spawn(async move { + fut.await; + }); + provider.set(Time::from_timestamp_millis(301).unwrap()); + handle.await.unwrap(); + } + + #[tokio::test] + async fn test_mock_provider_sleep_until() { + let provider = MockProvider::new(Time::from_timestamp_nanos(0)); + + // not sleeping finishes instantly + provider.sleep(Duration::from_secs(0)).await; + + // ==== sleep with `inc` ==== + let fut = provider.sleep_until(Time::from_timestamp_millis(100).unwrap()); + let handle = tokio::task::spawn(async move { + fut.await; + }); + + // does not finish immediately + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // does not finish when not incremented enough + provider.inc(Duration::from_millis(50)); + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // finishes once incremented at least to the duration + provider.inc(Duration::from_millis(50)); + handle.await.unwrap(); + + // finishes also when "overshooting" the duration + let fut = provider.sleep_until(Time::from_timestamp_millis(200).unwrap()); + let handle = tokio::task::spawn(async move { + fut.await; + }); + provider.inc(Duration::from_millis(101)); + handle.await.unwrap(); + + // ==== sleep with `set` ==== + provider.set(Time::from_timestamp_millis(100).unwrap()); + let fut = provider.sleep_until(Time::from_timestamp_millis(200).unwrap()); + let handle = tokio::task::spawn(async move { + fut.await; + }); + + // does not finish immediately + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // does not finish when time goes backwards + provider.set(Time::from_timestamp_millis(0).unwrap()); + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // does not finish when time goes forward but not enough + provider.set(Time::from_timestamp_millis(150).unwrap()); + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(!handle.is_finished()); + + // finishes when time is set at least to the wait duration + provider.set(Time::from_timestamp_millis(200).unwrap()); + handle.await.unwrap(); + + // also finishes when "overshooting" + let fut = provider.sleep_until(Time::from_timestamp_millis(300).unwrap()); + let handle = tokio::task::spawn(async move { + fut.await; + }); + provider.set(Time::from_timestamp_millis(301).unwrap()); + handle.await.unwrap(); + } + + #[test] + fn test_time() { + let verify = |date_time: DateTime| { + let time = Time::from_datetime(date_time); + + assert_eq!(time.hour(), date_time.hour()); + assert_eq!(time.minute(), date_time.minute()); + assert_eq!(time.second(), date_time.second()); + + assert_eq!(time.date_time(), date_time); + assert_eq!( + time, + Time::from_timestamp(date_time.timestamp(), date_time.timestamp_subsec_nanos()) + .unwrap(), + ); + assert_eq!( + time, + Time::from_timestamp_nanos(date_time.timestamp_nanos_opt().unwrap()) + ); + assert_eq!( + Time::from_timestamp_millis(date_time.timestamp_millis()).unwrap(), + Time::from_datetime( + Utc.timestamp_millis_opt(date_time.timestamp_millis()) + .unwrap() + ) + ); + + assert_eq!( + time.timestamp_nanos(), + date_time.timestamp_nanos_opt().unwrap() + ); + assert_eq!(time.timestamp_millis(), date_time.timestamp_millis()); + assert_eq!(time.to_rfc3339(), date_time.to_rfc3339()); + + let duration = Duration::from_millis(265367345); + + assert_eq!( + time + duration, + Time::from_datetime(date_time + chrono::Duration::from_std(duration).unwrap()) + ); + + assert_eq!( + time - duration, + Time::from_datetime(date_time - chrono::Duration::from_std(duration).unwrap()) + ); + + assert_eq!(time, Time::from_rfc3339(&time.to_rfc3339()).unwrap()); + }; + + verify(Utc.timestamp_nanos(3406960448958394583)); + verify(Utc.timestamp_nanos(0)); + verify(Utc.timestamp_nanos(-3659396346346)); + } + + #[test] + fn test_overflow() { + let time = Time::MAX; + assert!(time.checked_add(Duration::from_nanos(1)).is_none()); + assert!(time.checked_sub(Duration::from_nanos(1)).is_some()); + + let time = Time::MIN; + assert!(time.checked_add(Duration::from_nanos(1)).is_some()); + assert!(time.checked_sub(Duration::from_nanos(1)).is_none()); + + let duration = Duration::from_millis(i64::MAX as u64 + 1); + + let time = Time::from_timestamp_nanos(0); + assert!(chrono::Duration::from_std(duration).is_err()); + assert!(time.checked_add(duration).is_none()); + assert!(time.checked_sub(duration).is_none()); + } + + #[test] + fn test_duration_since() { + assert_eq!( + Time::from_timestamp_nanos(5056) + .checked_duration_since(Time::from_timestamp_nanos(-465)) + .unwrap(), + Duration::from_nanos(5056 + 465) + ); + + assert!(Time::MAX.checked_duration_since(Time::MIN).is_some()); + + assert!(Time::from_timestamp_nanos(505) + .checked_duration_since(Time::from_timestamp_nanos(506)) + .is_none()); + } + + #[tokio::test] + async fn test_minutes_ago() { + let now = "2022-07-07T00:00:00+00:00"; + let ago = "2022-07-06T22:38:00+00:00"; + + let provider = MockProvider::new(Time::from_rfc3339(now).unwrap()); + + let t = TimeProvider::minutes_ago(&provider, 82); + assert_eq!(t, Time::from_timestamp_nanos(1657147080000000000)); + assert_eq!(t.to_rfc3339(), ago); + } + + #[tokio::test] + async fn test_minutes_into_future() { + let now = "2022-07-07T00:00:00+00:00"; + let future = "2022-07-07T00:10:00+00:00"; + + let provider = MockProvider::new(Time::from_rfc3339(now).unwrap()); + + let t = TimeProvider::minutes_into_future(&provider, 10); + assert_eq!(t, Time::from_timestamp_nanos(1657152600000000000)); + assert_eq!(t.to_rfc3339(), future); + } + + #[tokio::test] + async fn test_hours_ago() { + let now = "2022-07-07T00:00:00+00:00"; + let ago = "2022-07-03T14:00:00+00:00"; + + let provider = MockProvider::new(Time::from_rfc3339(now).unwrap()); + + let t = TimeProvider::hours_ago(&provider, 82); + assert_eq!(t, Time::from_timestamp_nanos(1656856800000000000)); + assert_eq!(t.to_rfc3339(), ago); + } +} diff --git a/obstore/src/metrics/metric/counter.rs b/obstore/src/metrics/metric/counter.rs new file mode 100644 index 00000000..f9b3e0b9 --- /dev/null +++ b/obstore/src/metrics/metric/counter.rs @@ -0,0 +1,86 @@ +use super::{MetricKind, MetricObserver, Observation}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +/// A monotonic counter. +/// +/// A [`U64Counter`]` is an internally reference counted type, and all mutations +/// to cloned instances mutate the same underlying counter. +#[derive(Debug, Clone, Default)] +pub struct U64Counter { + state: Arc, +} + +impl U64Counter { + pub fn inc(&self, count: u64) { + self.state.fetch_add(count, Ordering::Relaxed); + } + + pub fn fetch(&self) -> u64 { + self.state.load(Ordering::Relaxed) + } +} + +impl MetricObserver for U64Counter { + type Recorder = Self; + + fn kind() -> MetricKind { + MetricKind::U64Counter + } + + fn recorder(&self) -> Self::Recorder { + self.clone() + } + + fn observe(&self) -> Observation { + Observation::U64Counter(self.fetch()) + } +} + +/// A concise helper to assert the value of a metric counter, regardless of underlying type. +#[macro_export] +macro_rules! assert_counter { + ( + $metrics:expr, + $counter:ty, + $name:expr, + $(labels = $attr:expr,)* + $(value = $value:expr,)* + ) => { + // Default to an empty set of attributes if not specified. + #[allow(unused)] + let mut attr = None; + $(attr = Some($attr);)* + let attr = attr.unwrap_or_else(|| metric::Attributes::from(&[])); + + let counter = $metrics + .get_instrument::>($name) + .expect("failed to find metric with provided name") + .get_observer(&attr) + .expect("failed to find metric with provided attributes") + .fetch(); + + $(assert_eq!(counter, $value, "counter value mismatch");)* + }; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_counter() { + let counter = U64Counter::default(); + assert_eq!(counter.fetch(), 0); + counter.inc(12); + assert_eq!(counter.fetch(), 12); + counter.inc(34); + assert_eq!(counter.fetch(), 46); + + assert_eq!(counter.observe(), Observation::U64Counter(46)); + + // Expect counter to wrap around + counter.inc(u64::MAX); + assert_eq!(counter.observe(), Observation::U64Counter(45)); + } +} diff --git a/obstore/src/metrics/metric/duration.rs b/obstore/src/metrics/metric/duration.rs new file mode 100644 index 00000000..9bba15bb --- /dev/null +++ b/obstore/src/metrics/metric/duration.rs @@ -0,0 +1,331 @@ +use std::time::Duration; + +use super::{ + HistogramObservation, MakeMetricObserver, MetricKind, MetricObserver, Observation, + ObservationBucket, U64Counter, U64Gauge, U64Histogram, +}; + +use std::convert::TryInto; + +/// The maximum duration that can be stored in the duration measurements +pub const DURATION_MAX: Duration = Duration::from_nanos(u64::MAX); + +/// A monotonic counter of `std::time::Duration` +#[derive(Debug, Clone, Default)] +pub(crate) struct DurationCounter { + inner: U64Counter, +} + +impl DurationCounter { + pub fn inc(&self, duration: Duration) { + self.inner.inc( + duration + .as_nanos() + .try_into() + .expect("cannot fit duration into u64"), + ) + } + + pub fn fetch(&self) -> Duration { + Duration::from_nanos(self.inner.fetch()) + } +} + +impl MetricObserver for DurationCounter { + type Recorder = Self; + + fn kind() -> MetricKind { + MetricKind::DurationCounter + } + + fn recorder(&self) -> Self::Recorder { + self.clone() + } + + fn observe(&self) -> Observation { + Observation::DurationCounter(self.fetch()) + } +} + +/// An observation of a single `std::time::Duration` +/// +/// NOTE: If the same `DurationGauge` is used in multiple locations, e.g. a non-unique set +/// of attributes is provided to `Metric::recorder`, the reported value +/// will oscillate between those reported by the separate locations +#[derive(Debug, Clone, Default)] +pub(crate) struct DurationGauge { + inner: U64Gauge, +} + +impl DurationGauge { + pub fn set(&self, value: Duration) { + self.inner.set( + value + .as_nanos() + .try_into() + .expect("cannot fit duration into u64"), + ) + } + + pub fn fetch(&self) -> Duration { + Duration::from_nanos(self.inner.fetch()) + } +} + +impl MetricObserver for DurationGauge { + type Recorder = Self; + + fn kind() -> MetricKind { + MetricKind::DurationGauge + } + + fn recorder(&self) -> Self::Recorder { + self.clone() + } + + fn observe(&self) -> Observation { + Observation::DurationGauge(self.fetch()) + } +} + +/// A `DurationHistogram` provides bucketed observations of `Durations` +/// +/// This provides insight into the distribution beyond a simple count or total +#[derive(Debug, Clone)] +pub(crate) struct DurationHistogram { + inner: U64Histogram, +} + +impl DurationHistogram { + pub fn fetch(&self) -> HistogramObservation { + let inner = self.inner.fetch(); + + HistogramObservation { + total: Duration::from_nanos(inner.total), + buckets: inner + .buckets + .into_iter() + .map(|bucket| ObservationBucket { + le: Duration::from_nanos(bucket.le), + count: bucket.count, + }) + .collect(), + } + } + + pub fn record(&self, value: Duration) { + self.record_multiple(value, 1) + } + + pub fn record_multiple(&self, value: Duration, count: u64) { + self.inner.record_multiple( + value + .as_nanos() + .try_into() + .expect("cannot fit duration into u64"), + count, + ) + } + + pub fn reset(&self) { + self.inner.reset(); + } + + pub fn percentile(&self, percentile: u64) -> Duration { + Duration::from_nanos(self.inner.percentile(percentile)) + } +} + +/// `DurationHistogramOptions` allows configuring the buckets used by `DurationHistogram` +#[derive(Debug, Clone)] +pub(crate) struct DurationHistogramOptions { + buckets: Vec, +} + +impl DurationHistogramOptions { + /// Create a new `DurationHistogramOptions` with a list of thresholds to delimit the buckets + pub fn new(thresholds: impl IntoIterator) -> Self { + let mut buckets: Vec<_> = thresholds.into_iter().collect(); + buckets.sort_unstable(); + Self { buckets } + } +} + +impl Default for DurationHistogramOptions { + fn default() -> Self { + Self { + buckets: vec![ + Duration::from_millis(1), + Duration::from_micros(2_500), + Duration::from_millis(5), + Duration::from_millis(10), + Duration::from_millis(25), + Duration::from_millis(50), + Duration::from_millis(100), + Duration::from_millis(250), + Duration::from_millis(500), + Duration::from_millis(1000), + Duration::from_millis(2500), + Duration::from_millis(5000), + Duration::from_millis(10000), + DURATION_MAX, + ], + } + } +} + +impl MakeMetricObserver for DurationHistogram { + type Options = DurationHistogramOptions; + + fn create(options: &DurationHistogramOptions) -> Self { + Self { + inner: U64Histogram::new(options.buckets.iter().map(|duration| { + duration + .as_nanos() + .try_into() + .expect("cannot fit duration into u64") + })), + } + } +} + +impl MetricObserver for DurationHistogram { + type Recorder = Self; + + fn kind() -> MetricKind { + MetricKind::DurationHistogram + } + + fn recorder(&self) -> Self::Recorder { + self.clone() + } + + fn observe(&self) -> Observation { + Observation::DurationHistogram(self.fetch()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_gauge() { + let gauge = DurationGauge::default(); + + assert_eq!( + gauge.observe(), + Observation::DurationGauge(Duration::from_nanos(0)) + ); + + gauge.set(Duration::from_nanos(10002)); + assert_eq!( + gauge.observe(), + Observation::DurationGauge(Duration::from_nanos(10002)) + ); + + gauge.set(Duration::from_nanos(12)); + assert_eq!( + gauge.observe(), + Observation::DurationGauge(Duration::from_nanos(12)) + ); + + let r2 = gauge.recorder(); + + gauge.set(Duration::from_secs(12)); + assert_eq!( + gauge.observe(), + Observation::DurationGauge(Duration::from_secs(12)) + ); + + std::mem::drop(r2); + + assert_eq!( + gauge.observe(), + Observation::DurationGauge(Duration::from_secs(12)) + ); + } + + #[test] + fn test_counter() { + let counter = DurationCounter::default(); + assert_eq!(counter.fetch(), Duration::from_nanos(0)); + counter.inc(Duration::from_nanos(120)); + assert_eq!(counter.fetch(), Duration::from_nanos(120)); + counter.inc(Duration::from_secs(1)); + assert_eq!(counter.fetch(), Duration::from_nanos(1_000_000_120)); + + assert_eq!( + counter.observe(), + Observation::DurationCounter(Duration::from_nanos(1_000_000_120)) + ) + } + + #[test] + #[should_panic(expected = "cannot fit duration into u64: TryFromIntError(())")] + fn test_bucket_overflow() { + let options = DurationHistogramOptions::new([Duration::MAX]); + DurationHistogram::create(&options); + } + + #[test] + #[should_panic(expected = "cannot fit duration into u64: TryFromIntError(())")] + fn test_record_overflow() { + let histogram = DurationHistogram::create(&Default::default()); + histogram.record(Duration::MAX); + } + + #[test] + fn test_histogram() { + let buckets = [ + Duration::from_millis(10), + Duration::from_millis(15), + Duration::from_millis(100), + DURATION_MAX, + ]; + + let options = DurationHistogramOptions::new(buckets); + let histogram = DurationHistogram::create(&options); + + let buckets = |expected: &[u64; 4], total| -> Observation { + Observation::DurationHistogram(HistogramObservation { + total, + buckets: expected + .iter() + .cloned() + .zip(buckets) + .map(|(count, le)| ObservationBucket { le, count }) + .collect(), + }) + }; + + assert_eq!( + histogram.observe(), + buckets(&[0, 0, 0, 0], Duration::from_millis(0)) + ); + + histogram.record(Duration::from_millis(20)); + assert_eq!( + histogram.observe(), + buckets(&[0, 0, 1, 0], Duration::from_millis(20)) + ); + + histogram.record(Duration::from_millis(0)); + assert_eq!( + histogram.observe(), + buckets(&[1, 0, 1, 0], Duration::from_millis(20)) + ); + + histogram.record(DURATION_MAX); + + // Expect total to overflow and wrap around + assert_eq!( + histogram.observe(), + buckets( + &[1, 0, 1, 1], + Duration::from_millis(20) - Duration::from_nanos(1) + ) + ); + } +} diff --git a/obstore/src/metrics/metric/gauge.rs b/obstore/src/metrics/metric/gauge.rs new file mode 100644 index 00000000..4dfedd4b --- /dev/null +++ b/obstore/src/metrics/metric/gauge.rs @@ -0,0 +1,107 @@ +use super::{MetricKind, MetricObserver, Observation}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +/// An observation of a single u64 value +/// +/// NOTE: If the same `U64Gauge` is used in multiple locations, e.g. a non-unique set +/// of attributes is provided to `Metric::recorder`, the reported value +/// will oscillate between those reported by the separate locations +#[derive(Debug, Clone, Default)] +pub struct U64Gauge { + state: Arc, +} + +impl U64Gauge { + /// Sets the value of this U64Gauge + pub fn set(&self, value: u64) { + self.state.store(value, Ordering::Relaxed); + } + + /// Increments the value of this U64Gauge by the specified amount. + pub fn inc(&self, delta: u64) { + self.state.fetch_add(delta, Ordering::Relaxed); + } + + /// Decrements the value of this U64Gauge by the specified amount. + /// + /// # Underflow / Overflow + /// + /// This operation wraps around on over/underflow. + pub fn dec(&self, delta: u64) { + self.state.fetch_sub(delta, Ordering::Relaxed); + } + + /// Adjusts the value of this U64Gauge by the specified delta. + /// + /// # Underflow / Overflow + /// + /// This operation wraps around on over/underflow. + pub fn delta(&self, delta: i64) { + if delta > 0 { + self.inc(delta as _); + } else { + self.dec(delta.unsigned_abs()); + } + } + + /// Fetches the value of this U64Gauge + pub fn fetch(&self) -> u64 { + self.state.load(Ordering::Relaxed) + } +} + +impl MetricObserver for U64Gauge { + type Recorder = Self; + + fn kind() -> MetricKind { + MetricKind::U64Gauge + } + + fn recorder(&self) -> Self::Recorder { + self.clone() + } + + fn observe(&self) -> Observation { + Observation::U64Gauge(self.fetch()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_gauge() { + let gauge = U64Gauge::default(); + + assert_eq!(gauge.observe(), Observation::U64Gauge(0)); + + gauge.set(345); + assert_eq!(gauge.observe(), Observation::U64Gauge(345)); + + gauge.set(23); + assert_eq!(gauge.observe(), Observation::U64Gauge(23)); + + gauge.inc(10); + assert_eq!(gauge.observe(), Observation::U64Gauge(33)); + + gauge.dec(10); + assert_eq!(gauge.observe(), Observation::U64Gauge(23)); + + gauge.delta(19); + assert_eq!(gauge.observe(), Observation::U64Gauge(42)); + + gauge.delta(-19); + assert_eq!(gauge.observe(), Observation::U64Gauge(23)); + + let r2 = gauge.recorder(); + + r2.set(34); + assert_eq!(gauge.observe(), Observation::U64Gauge(34)); + + std::mem::drop(r2); + + assert_eq!(gauge.observe(), Observation::U64Gauge(34)); + } +} diff --git a/obstore/src/metrics/metric/histogram.rs b/obstore/src/metrics/metric/histogram.rs new file mode 100644 index 00000000..bfd015fc --- /dev/null +++ b/obstore/src/metrics/metric/histogram.rs @@ -0,0 +1,257 @@ +use super::{ + HistogramObservation, MakeMetricObserver, MetricKind, MetricObserver, Observation, + ObservationBucket, +}; +use parking_lot::Mutex; +use std::sync::Arc; + +/// Determines the bucketing used by the `U64Histogram` +#[derive(Debug, Clone)] +pub struct U64HistogramOptions { + buckets: Vec, +} + +impl U64HistogramOptions { + /// Create a new `U64HistogramOptions` with a list of thresholds to delimit the buckets + pub fn new(thresholds: impl IntoIterator) -> Self { + let mut buckets: Vec<_> = thresholds.into_iter().collect(); + buckets.sort_unstable(); + Self { buckets } + } +} + +/// A `U64Histogram` provides bucketed observations of u64 values +/// +/// This provides insight into the distribution of values beyond a simple count or total +#[derive(Debug, Clone)] +pub struct U64Histogram { + shared: Arc>>, +} + +impl U64Histogram { + pub(crate) fn new(sorted_buckets: impl Iterator) -> Self { + let buckets = sorted_buckets + .map(|le| ObservationBucket { + le, + count: Default::default(), + }) + .collect(); + + Self { + shared: Arc::new(Mutex::new(HistogramObservation { + total: Default::default(), + buckets, + })), + } + } + + pub fn fetch(&self) -> HistogramObservation { + self.shared.lock().clone() + } + + pub fn record(&self, value: u64) { + self.record_multiple(value, 1) + } + + pub fn record_multiple(&self, value: u64, count: u64) { + let mut state = self.shared.lock(); + if let Some(bucket) = state + .buckets + .iter_mut() + .find(|bucket| value <= bucket.le) + .as_mut() + { + bucket.count = bucket.count.wrapping_add(count); + state.total = state.total.wrapping_add(value * count); + } + } + + pub fn reset(&self) { + let mut state = self.shared.lock(); + for bucket in &mut state.buckets { + bucket.count = 0; + } + state.total = 0; + } + + /// percentile returns the bucket threshold for the given percentile. + /// For example, if you want the median value, percentile(50) will return the 'le' threshold + /// for the histogram bucket that contains the median sample. + /// + /// A use case for for this function is: + /// Use a histogram tracks the load placed on a system. + /// Set the buckets so they represent load levels of idle/low/medium/high/overloaded. + /// Then use percentile to determine how much of the time is spent at various load levels. + /// e.g. if percentile(50) comes come back with the low load threshold, the median load on the system is low + pub fn percentile(&self, percentile: u64) -> u64 { + let state = self.shared.lock(); + + // we need the total quantity of samples, not the sum of samples. + let total: u64 = state.buckets.iter().map(|bucket| bucket.count).sum(); + + let target = total * percentile / 100; + + let mut sum = 0; + for bucket in &state.buckets { + sum += bucket.count; + if sum >= target { + return bucket.le; + } + } + 0 + } +} + +impl MakeMetricObserver for U64Histogram { + type Options = U64HistogramOptions; + + fn create(options: &U64HistogramOptions) -> Self { + Self::new(options.buckets.iter().cloned()) + } +} + +impl MetricObserver for U64Histogram { + type Recorder = Self; + + fn kind() -> MetricKind { + MetricKind::U64Histogram + } + + fn recorder(&self) -> Self::Recorder { + self.clone() + } + + fn observe(&self) -> Observation { + Observation::U64Histogram(self.fetch()) + } +} + +/// A concise helper to assert the value of a metric histogram, regardless of underlying type. +#[macro_export] +macro_rules! assert_histogram { + ( + $metrics:expr, + $hist:ty, + $name:expr, + $(labels = $attr:expr,)* + $(samples = $samples:expr,)* + $(sum = $sum:expr,)* + ) => { + // Default to an empty set of attributes if not specified. + #[allow(unused)] + let mut attr = None; + $(attr = Some($attr);)* + let attr = attr.unwrap_or_else(|| metric::Attributes::from(&[])); + + let hist = $metrics + .get_instrument::>($name) + .expect("failed to find metric with provided name") + .get_observer(&attr) + .expect("failed to find metric with provided attributes") + .fetch(); + + $(assert_eq!(hist.sample_count(), $samples, "sample count mismatch");)* + $(assert_eq!(hist.total, $sum, "sum value mismatch");)* + }; +} + +#[cfg(test)] +mod tests { + use super::HistogramObservation; + use super::*; + + #[test] + fn test_histogram() { + let buckets = [20, 40, 50]; + let options = U64HistogramOptions::new(buckets); + let histogram = U64Histogram::create(&options); + + let buckets = |expected: &[u64; 3], total: u64| -> Observation { + Observation::U64Histogram(HistogramObservation { + total, + buckets: expected + .iter() + .cloned() + .zip(buckets) + .map(|(count, le)| ObservationBucket { le, count }) + .collect(), + }) + }; + + assert_eq!(histogram.observe(), buckets(&[0, 0, 0], 0)); + + histogram.record(30); + + assert_eq!(histogram.observe(), buckets(&[0, 1, 0], 30)); + + histogram.record(50); + + assert_eq!(histogram.observe(), buckets(&[0, 1, 1], 80)); + + histogram.record(51); + + // Exceeds max bucket - ignored + assert_eq!(histogram.observe(), buckets(&[0, 1, 1], 80)); + + histogram.record(0); + histogram.record(0); + + assert_eq!(histogram.observe(), buckets(&[2, 1, 1], 80)); + + // Now test the percentile reporting function + let options = U64HistogramOptions::new(vec![0, 1, 2, 4, 8, 16, 32, u64::MAX]); + let histogram = U64Histogram::create(&options); + + histogram.record(0); // bucket 0, le 0 + histogram.record(2); // bucket 2, le 2 + histogram.record(3); // bucket 3, le 4 + histogram.record(3); // bucket 3, le 4 + histogram.record(20); // bucket 6, le 32 + histogram.record(20000); // bucket 7, le u64::MAX + histogram.record(20000); // bucket 7, le u64::MAX + histogram.record(20000); // bucket 7, le u64::MAX + histogram.record(20000); // bucket 7, le u64::MAX + histogram.record(20000); // bucket 7, le u64::MAX + + // Of the 10 samples above: + // 1 (10%) is in bucket 0, le 0 + // 1 (10%) is in bucket 2, le 2 + // 2 (20%) are in bucket 3, le 4 + // 1 (10%) is in bucket 6, le 32 + // 5 (50%) are in bucket 7, le u64::MAX + + // request percentiles falling in bucket 0, le 0 + assert_eq!(histogram.percentile(3), 0); + assert_eq!(histogram.percentile(10), 0); + assert_eq!(histogram.percentile(19), 0); + + // request percentiles falling in bucket 2, le 2 + assert_eq!(histogram.percentile(20), 2); + assert_eq!(histogram.percentile(29), 2); + + // requests percentiles falling in bucket 3, le 4 + assert_eq!(histogram.percentile(30), 4); + assert_eq!(histogram.percentile(49), 4); + + // requests percentiles falling in bucket 6, le 32 + assert_eq!(histogram.percentile(50), 32); + assert_eq!(histogram.percentile(59), 32); + + // requests percentiles falling in bucket 6, le 32 + assert_eq!(histogram.percentile(60), u64::MAX); + assert_eq!(histogram.percentile(80), u64::MAX); + assert_eq!(histogram.percentile(100), u64::MAX); + + // test reset + histogram.reset(); + assert_eq!(histogram.percentile(100), 0); + histogram.record(1); // bucket 1, le 1 + histogram.record(2); // bucket 2, le 2 + histogram.record(3); // bucket 3, le 4 + histogram.record(3); // bucket 3, le 4 + assert_eq!(histogram.percentile(0), 0); + assert_eq!(histogram.percentile(25), 1); + assert_eq!(histogram.percentile(49), 1); + assert_eq!(histogram.percentile(50), 2); + } +} diff --git a/obstore/src/metrics/metric/metric.rs b/obstore/src/metrics/metric/metric.rs new file mode 100644 index 00000000..fd190f17 --- /dev/null +++ b/obstore/src/metrics/metric/metric.rs @@ -0,0 +1,337 @@ +use std::any::Any; +use std::collections::BTreeMap; +use std::sync::Arc; + +use parking_lot::{MappedMutexGuard, Mutex, MutexGuard}; + +use super::{Attributes, Instrument, MetricKind, Observation, Reporter}; + +/// A `Metric` collects `Observation` for each unique set of `Attributes` +/// +/// It is templated by `T: MetricObserver` which determines the type of +/// `Observation` made by this `Metric` along with its semantics +#[derive(Debug)] +pub struct Metric { + name: &'static str, + description: &'static str, + shared: Arc>, +} + +#[derive(Debug)] +struct MetricShared { + options: T::Options, + values: Mutex>, +} + +/// Manually implement Clone to avoid constraint T: Clone +impl Clone for Metric { + fn clone(&self) -> Self { + Self { + name: self.name, + description: self.description, + shared: Arc::clone(&self.shared), + } + } +} + +impl Metric { + pub(crate) fn new(name: &'static str, description: &'static str, options: T::Options) -> Self { + Self { + name, + description, + shared: Arc::new(MetricShared { + options, + values: Default::default(), + }), + } + } + + /// Retrieves a type that can be used to report observations for a given set of attributes + /// + /// If this is the first time this method has been called with this set of attributes, + /// it will initialize the corresponding `MetricObserver` with the default observation + /// + /// ``` + /// use ::metric::{U64Gauge, Registry, Metric}; + /// + /// let registry = Registry::new(); + /// let metric: Metric = registry.register_metric("metric_name", "description"); + /// + /// metric.recorder(&[("foo", "bar")]).set(34); + /// + /// let recorder = metric.recorder(&[("foo", "biz")]); + /// recorder.set(34); + /// + /// ``` + pub fn recorder(&self, attributes: impl Into) -> T::Recorder { + self.observer(attributes).recorder() + } + + /// Retrieves the observer for a given set of attributes + /// + /// If this is the first time this method has been called with this set of attributes, + /// it will initialize the corresponding `MetricObserver` with the default observation + pub fn observer(&self, attributes: impl Into) -> MappedMutexGuard<'_, T> { + MutexGuard::map(self.shared.values.lock(), |values| { + values + .entry(attributes.into()) + .or_insert_with(|| T::create(&self.shared.options)) + }) + } + + /// Gets the observer for a given set of attributes if one has + /// been registered by a call to `Metric::recorder` + /// + /// This is primarily useful for testing + pub fn get_observer(&self, attributes: &Attributes) -> Option> { + MutexGuard::try_map(self.shared.values.lock(), |values| { + values.get_mut(attributes) + }) + .ok() + } +} + +impl Instrument for Metric { + fn report(&self, reporter: &mut dyn Reporter) { + reporter.start_metric(self.name, self.description, T::kind()); + + let values = self.shared.values.lock(); + for (attributes, metric_value) in &*values { + reporter.report_observation(attributes, metric_value.observe()) + } + + reporter.finish_metric(); + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Types that wish to be used with `Metric` must implement this trait +/// that exposes the necessary reporting API +/// +/// `Metric` maintains a distinct `MetricObserver` for each unique set of `Attributes` +pub trait MetricObserver: MakeMetricObserver + std::fmt::Debug + Send + 'static { + /// The type that is used to modify the value reported by this MetricObserver + /// + /// Most commonly this will be `Self` but see `CumulativeGauge` for an example + /// of where it is not + type Recorder; + + /// The `MetricKind` reported by this `MetricObserver` + fn kind() -> MetricKind; + + /// Return a `Self::Recorder` that can be used to mutate the value reported + /// by this `MetricObserver` + fn recorder(&self) -> Self::Recorder; + + /// Return the current value for this + fn observe(&self) -> Observation; +} + +/// All `MetricObserver` must also implement `MakeMetricObserver` which defines +/// how to construct new instances of `Self` +/// +/// A blanket impl is provided for types that implement Default +/// +/// See `U64Histogram` for an example of how this is used +pub trait MakeMetricObserver { + type Options: Sized + Send + Sync + std::fmt::Debug; + + fn create(options: &Self::Options) -> Self; +} + +impl MakeMetricObserver for T { + type Options = (); + + fn create(_: &Self::Options) -> Self { + Default::default() + } +} + +/// In most cases the recorder for a `MetricObserver` is stateless, in fact in many cases +/// `MetricObserver::Recorder = Self`. This means applications wishing to record observations +/// for many different sets of attributes can just use `Metric` and construct reporters +/// dynamically +/// +/// ``` +/// use metric::{Registry, Metric, U64Gauge, Attributes}; +/// +/// let registry = Registry::new(); +/// let metric: Metric = registry.register_metric("foo", "description"); +/// +/// metric.recorder(&[("foo", "bar")]).set(21); +/// metric.recorder(&[("fiz", "bar")]).set(34); +/// +/// assert_eq!(metric.get_observer(&Attributes::from(&[("foo", "bar")])).unwrap().fetch(), 21); +/// assert_eq!(metric.get_observer(&Attributes::from(&[("fiz", "bar")])).unwrap().fetch(), 34); +/// ``` +/// +/// However, some `MetricObserver` are stateful, for example, `CumulativeGauge`. In this case +/// dropping the recorder clears any contribution it made to the metric's total +/// +/// ``` +/// use metric::{Registry, Metric, CumulativeGauge, Attributes}; +/// +/// let registry = Registry::new(); +/// let metric: Metric = registry.register_metric("foo", "description"); +/// +/// metric.recorder(&[("foo", "bar")]).set(21); +/// metric.recorder(&[("fiz", "bar")]).set(34); +/// +/// // Recorders dropped immediately and so they don't record anything! +/// assert_eq!(metric.get_observer(&Attributes::from(&[("foo", "bar")])).unwrap().fetch(), 0); +/// assert_eq!(metric.get_observer(&Attributes::from(&[("fiz", "bar")])).unwrap().fetch(), 0); +/// ``` +/// +/// `RecorderCollection` exists to address this situation, as unlike `Metric` it retains the +/// `MetricObserver::Recorder` and ensures they live as long as the `RecorderCollection` +/// +/// ``` +/// use metric::{Registry, Metric, CumulativeGauge, RecorderCollection, Attributes}; +/// +/// let registry = Registry::new(); +/// let metric: Metric = registry.register_metric("foo", "description"); +/// +/// let mut r1 = RecorderCollection::new(metric.clone()); +/// let mut r2 = RecorderCollection::new(metric.clone()); +/// +/// r1.recorder(&[("foo", "bar")]).set(21); +/// r1.recorder(&[("fiz", "bar")]).set(34); +/// r2.recorder(&[("foo", "bar")]).set(12); +/// +/// assert_eq!(metric.get_observer(&Attributes::from(&[("foo", "bar")])).unwrap().fetch(), 21 + 12); +/// assert_eq!(metric.get_observer(&Attributes::from(&[("fiz", "bar")])).unwrap().fetch(), 34); +/// +/// std::mem::drop(r1); +/// +/// assert_eq!(metric.get_observer(&Attributes::from(&[("foo", "bar")])).unwrap().fetch(), 12); +/// assert_eq!(metric.get_observer(&Attributes::from(&[("fiz", "bar")])).unwrap().fetch(), 0); +/// +/// std::mem::drop(r2); +/// +/// assert_eq!(metric.get_observer(&Attributes::from(&[("foo", "bar")])).unwrap().fetch(), 0); +/// assert_eq!(metric.get_observer(&Attributes::from(&[("fiz", "bar")])).unwrap().fetch(), 0); +/// ``` +/// +#[derive(Debug)] +pub struct RecorderCollection { + metric: Metric, + recorders: BTreeMap, +} + +impl RecorderCollection { + /// Create a new `RecorderCollection` from the provided `Metric` + pub fn new(metric: Metric) -> Self { + Self { + metric, + recorders: Default::default(), + } + } + + /// Create a new unregistered `RecorderCollection` from the provided options + pub fn new_unregistered_options(options: T::Options) -> Self { + Self { + metric: Metric::new("unregistered", "unregistered", options), + recorders: Default::default(), + } + } + + /// Retrieves a type that can be used to report observations for a given set of attributes + /// + /// The value returned is cached on this `RecorderCollection` and lives as long as it does + pub fn recorder(&mut self, attributes: impl Into) -> &mut T::Recorder { + let metric = &self.metric; + self.recorders + .entry(attributes.into()) + .or_insert_with_key(|key| metric.recorder(key.clone())) + } +} + +impl RecorderCollection +where + T::Options: Default, +{ + /// Create a new unregistered `RecorderCollection` with the default options + pub fn new_unregistered() -> Self { + Self::new_unregistered_options(Default::default()) + } +} + +/// A common grouping of `MetricObserver` for reporting on fallible code paths +#[derive(Debug, Clone)] +pub struct ResultMetric { + pub ok: T, + pub client_error: T, + pub server_error: T, + pub unexpected_response: T, +} + +impl ResultMetric +where + T: MetricObserver, +{ + pub fn new(metric: &Metric, mut attributes: Attributes) -> Self { + attributes.insert("status", "ok"); + let ok = metric.recorder(attributes.clone()); + + attributes.insert("status", "client_error"); + let client_error = metric.recorder(attributes.clone()); + + attributes.insert("status", "server_error"); + let server_error = metric.recorder(attributes.clone()); + + attributes.insert("status", "unexpected_response"); + let unexpected_response = metric.recorder(attributes); + + Self { + ok, + client_error, + server_error, + unexpected_response, + } + } +} + +#[cfg(test)] +mod tests { + use super::super::U64Counter; + use super::*; + + #[test] + fn test_metric() { + let metric: Metric = Metric::new("foo", "description", ()); + + let r1 = metric.recorder(&[("tag1", "val1"), ("tag2", "val2")]); + let r2 = metric.recorder(&[("tag1", "val1")]); + let r3 = metric.recorder(&[("tag1", "val2")]); + let r4 = metric.recorder(&[("tag1", "val1"), ("tag2", "val2")]); + + assert_eq!(r1.fetch(), 0); + assert_eq!(r2.fetch(), 0); + assert_eq!(r3.fetch(), 0); + assert_eq!(r4.fetch(), 0); + + r2.inc(32); + + assert_eq!(r1.fetch(), 0); + assert_eq!(r2.fetch(), 32); + assert_eq!(r3.fetch(), 0); + assert_eq!(r4.fetch(), 0); + + r1.inc(30); + + assert_eq!(r1.fetch(), 30); + assert_eq!(r2.fetch(), 32); + assert_eq!(r3.fetch(), 0); + assert_eq!(r4.fetch(), 30); + + r4.inc(21); + + assert_eq!(r1.fetch(), 51); + assert_eq!(r2.fetch(), 32); + assert_eq!(r3.fetch(), 0); + assert_eq!(r4.fetch(), 51); + } +} diff --git a/obstore/src/metrics/metric/mod.rs b/obstore/src/metrics/metric/mod.rs new file mode 100644 index 00000000..28bfc7f7 --- /dev/null +++ b/obstore/src/metrics/metric/mod.rs @@ -0,0 +1,456 @@ +use parking_lot::Mutex; +use std::any::Any; +use std::borrow::Cow; +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; + +mod counter; +mod duration; +mod gauge; +mod histogram; +#[allow(clippy::module_inception)] +mod metric; + +pub(crate) use counter::*; +pub(crate) use duration::*; +pub(crate) use gauge::*; +pub(crate) use histogram::*; +pub(crate) use metric::*; + +/// A `Registry` stores a map of metric names to `Instrument` +/// +/// It allows retrieving them by name, registering new instruments and generating +/// reports of all registered instruments +#[derive(Debug, Default)] +pub struct Registry { + /// A list of instruments indexed by metric name + /// + /// A BTreeMap is used to provide a consistent ordering + instruments: Mutex>>, +} + +impl Registry { + pub fn new() -> Self { + Self::default() + } + + /// Register a new `Metric` with the provided name and description + /// + /// ``` + /// use ::metric::{Registry, Metric, U64Counter}; + /// + /// let registry = Registry::new(); + /// let counter: Metric = registry.register_metric("metric_name", "description"); + /// ``` + /// + /// Note: `&'static str` is intentionally used to ensure the metric name appears "in-the-plain" + /// and can easily be searched for within the codebase + /// + pub fn register_metric(&self, name: &'static str, description: &'static str) -> Metric + where + T: MetricObserver, + T::Options: Default, + { + self.register_metric_with_options(name, description, Default::default) + } + + /// If a metric with the provided `name` already exists, returns it + /// + /// Otherwise, invokes `options` and creates a new Metric from the + /// returned options, stores it in this `Registry` and returns it + /// + /// Panics if an `Instrument` has already been registered with this + /// name but a different type + /// + /// ``` + /// use ::metric::{Registry, Metric, U64Histogram, U64HistogramOptions}; + /// + /// let registry = Registry::new(); + /// let histogram: Metric = registry.register_metric_with_options( + /// "metric_name", + /// "description", + /// || U64HistogramOptions::new([10, 20, u64::MAX]), + /// ); + /// ``` + /// + pub fn register_metric_with_options T::Options>( + &self, + name: &'static str, + description: &'static str, + options: F, + ) -> Metric { + self.register_instrument(name, move || Metric::new(name, description, options())) + } + + /// If an instrument already exists with the provided `name`, returns it + /// + /// Otherwise, invokes `create` to create a new `Instrument`, stores it in this `Registry`, + /// and returns it + /// + /// An application might choose to register a custom Instrument, instead of using a Metric, + /// when it wishes to defer some computation to report time. For example, reporting + /// metrics from systems that cannot be instrumented with `Metric` directly (e.g. jemalloc) + /// + /// Note: An instrument name is not required to match the metric name(s) reported by the + /// instrument, however: + /// + /// - instruments will report in order of their instrument name + /// - not all reporters may handle the same metric name being reported multiple times + /// + /// Panics if an `Instrument` has already been registered with this name but a different type + /// + /// Panics if the instrument name is illegal + pub fn register_instrument I, I: Instrument + Clone + 'static>( + &self, + name: &'static str, + create: F, + ) -> I { + assert_legal_key(name); + + let mut instruments = self.instruments.lock(); + + let instrument = match instruments.entry(name) { + Entry::Occupied(o) => match o.get().as_any().downcast_ref::() { + Some(instrument) => instrument.clone(), + None => panic!("instrument {name} registered with two different types"), + }, + Entry::Vacant(v) => { + let instrument = create(); + v.insert(Box::new(instrument.clone())); + instrument + } + }; + + instrument + } + + /// Returns the already registered `Instrument` if any + /// + /// This is primarily useful for testing + pub fn get_instrument(&self, name: &'static str) -> Option { + let instruments = self.instruments.lock(); + instruments + .get(name) + .map(|instrument| match instrument.as_any().downcast_ref::() { + Some(metric) => metric.clone(), + None => panic!("instrument {name} registered with two different types"), + }) + } + + /// Record the current state of every metric in this registry to the provided `Reporter` + /// + /// Will iterate through all registered metrics in alphabetical order and for each: + /// - call start_metric once + /// - call report_observation once for each set of attributes in alphabetical order + /// - call finish_metric once complete + pub fn report(&self, reporter: &mut dyn Reporter) { + let instruments = self.instruments.lock(); + for instrument in instruments.values() { + instrument.report(reporter) + } + } +} + +/// `Instrument` is a type that knows how to write its observations to a `Reporter` +pub trait Instrument: std::fmt::Debug + Send + Sync { + /// Record the current state of this metric to the provided `Reporter` + /// + /// Guaranteed to: + /// - call start_metric once + /// - call report_observation once for each set of attributes in alphabetical order + /// - call finish_metric once complete + fn report(&self, reporter: &mut dyn Reporter); + + /// Returns the type as [`Any`] so that it can be downcast to its underlying type + fn as_any(&self) -> &dyn Any; +} + +/// `Reporter` is the trait that should be implemented by anything that wants to +/// extract the state of all metrics within a `Registry` and export them +pub trait Reporter { + /// Start recording the observations of a single metric + /// + /// Successive calls are guaranteed to not occur without an intervening + /// call to finish_metric + fn start_metric( + &mut self, + metric_name: &'static str, + description: &'static str, + kind: MetricKind, + ); + + /// Record an observation for the metric started by start_metric + /// + /// Must not be called without a prior call to start_metric with + /// no intervening call to finish_metric + fn report_observation(&mut self, attributes: &Attributes, observation: Observation); + + /// Finish recording a given metric + /// + /// Must not be called without a prior call to start_metric with + /// no intervening call to finish_metric + fn finish_metric(&mut self); +} + +/// A set of observations for a particular metric +/// +/// This is solely used by `RawReporter` to buffer up observations, the `Reporter` +/// trait streams `Observation` and does not perform intermediate aggregation +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ObservationSet { + pub metric_name: &'static str, + pub description: &'static str, + pub kind: MetricKind, + pub observations: Vec<(Attributes, Observation)>, +} + +impl ObservationSet { + /// Returns the observation for a given set of attributes if any + pub fn observation(&self, attributes: impl Into) -> Option<&Observation> { + let attributes = attributes.into(); + self.observations + .iter() + .find_map(|(a, o)| if a == &attributes { Some(o) } else { None }) + } +} + +/// A `Reporter` that records the raw data submitted +#[derive(Debug, Clone, Default)] +pub struct RawReporter { + completed: Vec, + in_progress: Option, +} + +impl Reporter for RawReporter { + fn start_metric( + &mut self, + metric_name: &'static str, + description: &'static str, + kind: MetricKind, + ) { + assert!(self.in_progress.is_none(), "metric already in progress"); + self.in_progress = Some(ObservationSet { + metric_name, + description, + kind, + observations: Default::default(), + }) + } + + fn report_observation(&mut self, attributes: &Attributes, observation: Observation) { + let metric = self + .in_progress + .as_mut() + .expect("metric should be in progress"); + metric.observations.push((attributes.clone(), observation)) + } + + fn finish_metric(&mut self) { + let metric = self + .in_progress + .take() + .expect("metric should be in progress"); + self.completed.push(metric) + } +} + +impl RawReporter { + /// Returns the observation set for a given metric name if any + pub fn metric(&self, metric_name: &str) -> Option<&ObservationSet> { + self.observations() + .iter() + .find(|observation| observation.metric_name == metric_name) + } + + /// Returns a list of `ObservationSet` for each reported metric + pub fn observations(&self) -> &Vec { + assert!(self.in_progress.is_none(), "metric observation in progress"); + &self.completed + } +} + +/// Identifies the type of `Observation` reported by this `Metric` +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum MetricKind { + U64Counter, + U64Gauge, + U64Histogram, + DurationCounter, + DurationGauge, + DurationHistogram, +} + +/// A `Metric` records an `Observation` for each unique set of `Attributes` +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum Observation { + U64Counter(u64), + U64Gauge(u64), + DurationCounter(std::time::Duration), + DurationGauge(std::time::Duration), + U64Histogram(HistogramObservation), + DurationHistogram(HistogramObservation), +} + +/// A histogram measurement +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct HistogramObservation { + /// The sum of all observations + pub total: T, + /// The buckets + pub buckets: Vec>, +} + +impl HistogramObservation { + pub fn sample_count(&self) -> u64 { + self.buckets.iter().map(|bucket| bucket.count).sum() + } +} + +/// A bucketed observation +/// +/// Stores the number of values that were less than or equal to `le` and +/// strictly greater than the `le` of the previous bucket +/// +/// NB: Unlike prometheus histogram bins the buckets are not cumulative +/// i.e. `count` is just the count of values that fell into this bucket +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ObservationBucket { + pub le: T, + pub count: u64, +} + +/// A set of key-value pairs with unique keys +/// +/// A `Metric` records observations for each unique set of `Attributes` +#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] +pub struct Attributes(BTreeMap<&'static str, Cow<'static, str>>); + +impl Attributes { + pub fn iter(&self) -> std::collections::btree_map::Iter<'_, &'static str, Cow<'static, str>> { + self.0.iter() + } + + /// Sets the given key, overriding it if already set + pub fn insert(&mut self, key: &'static str, value: impl Into>) { + self.0.insert(key, value.into()); + } +} + +impl<'a, const N: usize> From<&'a [(&'static str, &'static str); N]> for Attributes { + fn from(iterator: &'a [(&'static str, &'static str); N]) -> Self { + Self( + iterator + .iter() + .map(|(key, value)| { + assert_legal_key(key); + (*key, Cow::Borrowed(*value)) + }) + .collect(), + ) + } +} + +impl From<[(&'static str, Cow<'static, str>); N]> for Attributes { + fn from(iterator: [(&'static str, Cow<'static, str>); N]) -> Self { + Self( + IntoIterator::into_iter(iterator) + .map(|(key, value)| { + assert_legal_key(key); + (key, value) + }) + .collect(), + ) + } +} + +/// Panics if the provided string matches [0-9a-z_]+ +pub fn assert_legal_key(s: &str) { + assert!(!s.is_empty(), "string must not be empty"); + assert!( + s.chars().all(|c| matches!(c, '0'..='9' | 'a'..='z' | '_')), + "string must be [0-9a-z_]+ got: \"{s}\"" + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_registry() { + let registry = Registry::new(); + let counter: Metric = registry.register_metric("foo", "my magic description"); + let gauge: Metric = registry.register_metric("bar", "my magic description"); + + counter.recorder(&[("tag1", "foo")]).inc(23); + counter.recorder(&[("tag1", "bar")]).inc(53); + gauge.recorder(&[("tag1", "value")]).set(49); + + let mut reporter = RawReporter::default(); + registry.report(&mut reporter); + + let observations = reporter.observations(); + + assert_eq!(observations.len(), 2); + + // Results should be alphabetical in metric name + let gauge = &observations[0]; + assert_eq!(gauge.metric_name, "bar"); + assert_eq!(gauge.kind, MetricKind::U64Gauge); + assert_eq!(gauge.observations.len(), 1); + + let (attributes, observation) = &gauge.observations[0]; + assert_eq!(attributes.0.get("tag1").unwrap(), "value"); + assert_eq!(observation, &Observation::U64Gauge(49)); + + let counter = &observations[1]; + assert_eq!(counter.metric_name, "foo"); + assert_eq!(counter.kind, MetricKind::U64Counter); + + assert_eq!(counter.observations.len(), 2); + + // Attributes should be alphabetical + let (attributes, observation) = &counter.observations[0]; + assert_eq!(attributes.0.get("tag1").unwrap(), "bar"); + assert_eq!(observation, &Observation::U64Counter(53)); + + let (attributes, observation) = &counter.observations[1]; + assert_eq!(attributes.0.get("tag1").unwrap(), "foo"); + assert_eq!(observation, &Observation::U64Counter(23)); + + assert!(registry + .get_instrument::>("unregistered") + .is_none()); + + let counter = registry + .get_instrument::>("foo") + .unwrap(); + + let new_attributes = Attributes::from(&[("foo", "bar")]); + assert!(counter.get_observer(&new_attributes).is_none()); + let observation = counter.get_observer(attributes).unwrap().observe(); + + assert_eq!(observation, Observation::U64Counter(23)); + } + + #[test] + #[should_panic(expected = "instrument foo registered with two different types")] + fn test_type_mismatch() { + let registry = Registry::new(); + registry.register_metric::("foo", "my magic description"); + registry.register_metric::("foo", "my magic description"); + } + + #[test] + #[should_panic(expected = "string must be [0-9a-z_]+ got: \"foo sdf\"")] + fn illegal_metric_name() { + let registry = Registry::new(); + registry.register_metric::("foo sdf", "my magic description"); + } + + #[test] + #[should_panic(expected = "string must be [0-9a-z_]+ got: \"foo bar\"")] + fn illegal_attribute_name() { + let _ = Attributes::from(&[("foo bar", "value")]); + } +} diff --git a/obstore/src/metrics/mod.rs b/obstore/src/metrics/mod.rs new file mode 100644 index 00000000..48feab0f --- /dev/null +++ b/obstore/src/metrics/mod.rs @@ -0,0 +1,6 @@ +mod iox_time; +mod metric; +mod multipart_upload_metrics; +mod store; + +pub use store::PyObjectStoreMetrics; diff --git a/obstore/src/metrics/multipart_upload_metrics.rs b/obstore/src/metrics/multipart_upload_metrics.rs new file mode 100644 index 00000000..2c79bfcb --- /dev/null +++ b/obstore/src/metrics/multipart_upload_metrics.rs @@ -0,0 +1,93 @@ +use std::sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, +}; + +use super::iox_time::{Time, TimeProvider}; +use async_trait::async_trait; +use futures::{channel::oneshot::Sender, FutureExt}; +use object_store::{MultipartUpload, PutPayload, PutResult, UploadPart}; + +/// Able to perform metrics tracking bytes written +/// with object_store::put_multipart. +#[derive(Debug)] +pub(crate) struct MultipartUploadWrapper { + /// current inner [`MultipartUpload`] + inner: Box, + + /// calculate end time + time_provider: Arc, + + /// current subtotaled size, regardless of success or failure + bytes_attempted_so_far: Arc, + /// current outcome + outcome: Arc, + /// action upon complete + tx: Option, Time)>>, +} + +impl MultipartUploadWrapper { + pub(crate) fn new( + inner: Box, + time_provider: Arc, + tx: Sender<(bool, Option, Time)>, + ) -> Self { + Self { + inner, + time_provider, + bytes_attempted_so_far: Arc::new(AtomicU64::new(0)), + outcome: Arc::new(AtomicBool::new(true)), + tx: Some(tx), + } + } +} + +#[async_trait] +impl MultipartUpload for MultipartUploadWrapper { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + let attempted_size = data.content_length(); + let res = self.inner.as_mut().put_part(data); + let bytes_attempted_so_far = self.bytes_attempted_so_far.clone(); + let outcome = self.outcome.clone(); + + async move { + match res.await { + Ok(_) => { + bytes_attempted_so_far.fetch_add(attempted_size as u64, Ordering::AcqRel); + Ok(()) + } + Err(e) => { + outcome.fetch_and(false, Ordering::AcqRel); // mark result failure + bytes_attempted_so_far.fetch_add(attempted_size as u64, Ordering::AcqRel); + Err(e) + } + } + } + .boxed() + } + + async fn complete(&mut self) -> object_store::Result { + let res = self.inner.complete().await; + if res.is_err() { + self.outcome.fetch_and(false, Ordering::AcqRel); // mark result failure + } + res + } + + async fn abort(&mut self) -> object_store::Result<()> { + self.outcome.fetch_and(false, Ordering::AcqRel); // mark result failure + self.inner.abort().await + } +} + +impl Drop for MultipartUploadWrapper { + fn drop(&mut self) { + let outcome = self.outcome.load(Ordering::Acquire); + let bytes = self.bytes_attempted_so_far.load(Ordering::Acquire); + + if let Some(tx) = self.tx.take() { + tx.send((outcome, Some(bytes), self.time_provider.now())) + .expect("should send object store metrics back from MultipartUploadWrapper"); + } + } +} diff --git a/obstore/src/metrics/store.rs b/obstore/src/metrics/store.rs new file mode 100644 index 00000000..1cb80881 --- /dev/null +++ b/obstore/src/metrics/store.rs @@ -0,0 +1,2436 @@ +//! A metric instrumentation wrapper over [`ObjectStore`] implementations. + +#![allow(clippy::clone_on_ref_ptr)] + +use super::multipart_upload_metrics::MultipartUploadWrapper; +use object_store::{ + GetOptions, GetResultPayload, MultipartUpload, PutMultipartOpts, PutOptions, PutPayload, + PutResult, +}; +use pyo3_object_store::PyObjectStore; + +use std::sync::Arc; +use std::{borrow::Cow, ops::Range}; +use std::{ + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use super::iox_time::{Time, TimeProvider}; +use super::metric::{DurationHistogram, Metric, U64Counter}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::{stream::BoxStream, Stream, StreamExt}; +use pin_project::{pin_project, pinned_drop}; + +use crate::metrics::metric; +use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; +use tokio::{sync::Mutex, task::JoinSet}; + +use pyo3::prelude::*; + +/// A typed name of a scope / type to report the metrics under. +#[derive(Debug, Clone)] +pub struct StoreType(Cow<'static, str>); + +impl From for StoreType +where + T: Into>, +{ + fn from(v: T) -> Self { + Self(v.into()) + } +} + +#[derive(Debug, Clone)] +struct Metrics { + success_duration: DurationHistogram, + error_duration: DurationHistogram, +} + +impl Metrics { + fn new(registry: &metric::Registry, store_type: &StoreType, op: &'static str) -> Self { + // Call durations broken down by op & result + let duration: Metric = registry.register_metric( + "object_store_op_duration", + "object store operation duration", + ); + + Self { + success_duration: duration.recorder([ + ("store_type", store_type.0.clone()), + ("op", Cow::Borrowed(op)), + ("result", Cow::Borrowed("success")), + ]), + error_duration: duration.recorder([ + ("store_type", store_type.0.clone()), + ("op", Cow::Borrowed(op)), + ("result", Cow::Borrowed("error")), + ]), + } + } + + fn record(&self, t_begin: Time, t_end: Time, success: bool) { + // Avoid exploding if time goes backwards - simply drop the measurement + // if it happens. + let Some(delta) = t_end.checked_duration_since(t_begin) else { + return; + }; + + if success { + self.success_duration.record(delta); + } else { + self.error_duration.record(delta); + } + } +} + +#[derive(Debug, Clone)] +struct MetricsWithBytes { + inner: Metrics, + success_bytes: U64Counter, + error_bytes: U64Counter, +} + +impl MetricsWithBytes { + fn new(registry: &metric::Registry, store_type: &StoreType, op: &'static str) -> Self { + // Byte counts up/down + let bytes = registry.register_metric::( + "object_store_transfer_bytes", + "cumulative count of file content bytes transferred to/from the object store", + ); + + Self { + inner: Metrics::new(registry, store_type, op), + success_bytes: bytes.recorder([ + ("store_type", store_type.0.clone()), + ("op", Cow::Borrowed(op)), + ("result", Cow::Borrowed("success")), + ]), + error_bytes: bytes.recorder([ + ("store_type", store_type.0.clone()), + ("op", Cow::Borrowed(op)), + ("result", Cow::Borrowed("error")), + ]), + } + } + + fn record_bytes_only(&self, success: bool, bytes: u64) { + if success { + self.success_bytes.inc(bytes); + } else { + self.error_bytes.inc(bytes); + } + } + + fn record(&self, t_begin: Time, t_end: Time, success: bool, bytes: Option) { + if let Some(bytes) = bytes { + self.record_bytes_only(success, bytes); + } + + self.inner.record(t_begin, t_end, success); + } +} + +#[derive(Debug, Clone)] +struct MetricsWithBytesAndTtfb { + inner: MetricsWithBytes, + success_duration: DurationHistogram, + error_duration: DurationHistogram, +} + +impl MetricsWithBytesAndTtfb { + fn new(registry: &metric::Registry, store_type: &StoreType, op: &'static str) -> Self { + // Call durations broken down by op & result + let duration: Metric = registry.register_metric( + "object_store_op_ttfb", + "Time to first byte for object store operation", + ); + + Self { + inner: MetricsWithBytes::new(registry, store_type, op), + success_duration: duration.recorder([ + ("store_type", store_type.0.clone()), + ("op", Cow::Borrowed(op)), + ("result", Cow::Borrowed("success")), + ]), + error_duration: duration.recorder([ + ("store_type", store_type.0.clone()), + ("op", Cow::Borrowed(op)), + ("result", Cow::Borrowed("error")), + ]), + } + } + + fn record_bytes_only(&self, success: bool, bytes: u64) { + self.inner.record_bytes_only(success, bytes); + } + + fn record( + &self, + t_begin: Time, + t_first_byte: Time, + t_end: Time, + success: bool, + bytes: Option, + ) { + if let Some(delta) = t_first_byte.checked_duration_since(t_begin) { + if success { + self.success_duration.record(delta); + } else { + self.error_duration.record(delta); + } + } + + self.inner.record(t_begin, t_end, success, bytes); + } +} + +#[derive(Debug, Clone)] +struct MetricsWithCount { + inner: Metrics, + success_count: U64Counter, + error_count: U64Counter, +} + +impl MetricsWithCount { + fn new(registry: &metric::Registry, store_type: &StoreType, op: &'static str) -> Self { + let count = registry.register_metric::( + "object_store_transfer_objects", + "cumulative count of objects transferred to/from the object store", + ); + + Self { + inner: Metrics::new(registry, store_type, op), + success_count: count.recorder([ + ("store_type", store_type.0.clone()), + ("op", Cow::Borrowed(op)), + ("result", Cow::Borrowed("success")), + ]), + error_count: count.recorder([ + ("store_type", store_type.0.clone()), + ("op", Cow::Borrowed(op)), + ("result", Cow::Borrowed("error")), + ]), + } + } + + fn record_count_only(&self, success: bool, count: u64) { + if success { + self.success_count.inc(count); + } else { + self.error_count.inc(count); + } + } + + fn record(&self, t_begin: Time, t_end: Time, success: bool, count: Option) { + if let Some(count) = count { + self.record_count_only(success, count); + } + + self.inner.record(t_begin, t_end, success); + } +} + +/// An instrumentation decorator, wrapping an underlying [`ObjectStore`] +/// implementation and recording bytes transferred and call latency. +/// +/// # Stream Duration +/// +/// The [`ObjectStore::get()`] call can return a [`Stream`] which is polled +/// by the caller and may yield chunks of a file over a series of polls (as +/// opposed to all of the file data in one go). Because the caller drives the +/// polling and therefore fetching of data from the object store over the +/// lifetime of the [`Stream`], the duration of a [`ObjectStore::get()`] +/// request is measured to be the wall clock difference between the moment the +/// caller executes the [`ObjectStore::get()`] call, up until the last chunk +/// of data is yielded to the caller. +/// +/// This means the duration metrics measuring consumption of returned streams +/// are recording the rate at which the application reads the data, as opposed +/// to the duration of time taken to fetch that data. +/// +/// # Stream Errors +/// +/// The [`ObjectStore::get()`] method can return a [`Stream`] of [`Result`] +/// instances, and returning an error when polled is not necessarily a terminal +/// state. The metric recorder allows for a caller to observe a transient error +/// and subsequently go on to complete reading the stream, recording this read +/// in the "success" histogram. +/// +/// If a stream is not polled again after observing an error, the operation is +/// recorded in the "error" histogram. +/// +/// A stream can return an arbitrary sequence of success and error states before +/// terminating, with the last observed poll result that yields a [`Result`] +/// dictating which histogram the operation is recorded in. +/// +/// # Bytes Transferred +/// +/// The metric recording bytes transferred accounts for only object data, and +/// not object metadata (such as that returned by list methods). +/// +/// The total data transferred will be greater than the metric value due to +/// metadata queries, read errors, etc. The metric tracks the amount of object +/// data successfully yielded to the caller. +/// +/// # Backwards Clocks +/// +/// If the system clock is observed as moving backwards in time, call durations +/// are not recorded. The bytes transferred metric is not affected. +#[derive(Debug)] +pub struct ObjectStoreMetrics { + inner: Arc, + time_provider: Arc, + + put: MetricsWithBytes, + put_multipart: Arc, + inprogress_multipart: Mutex>, + get: MetricsWithBytesAndTtfb, + get_range: MetricsWithBytes, + get_ranges: MetricsWithBytes, + head: Metrics, + delete: Metrics, + delete_stream: MetricsWithCount, + list: MetricsWithCount, + list_with_offset: MetricsWithCount, + list_with_delimiter: MetricsWithCount, + copy: Metrics, + rename: Metrics, + copy_if_not_exists: Metrics, + rename_if_not_exists: Metrics, +} + +impl ObjectStoreMetrics { + /// Instrument `T`, pushing to `registry`. + pub fn new( + inner: Arc, + time_provider: Arc, + store_type: impl Into, + registry: &metric::Registry, + ) -> Self { + let store_type = store_type.into(); + + Self { + inner, + time_provider, + + put: MetricsWithBytes::new(registry, &store_type, "put"), + put_multipart: Arc::new(MetricsWithBytes::new( + registry, + &store_type, + "put_multipart", + )), + inprogress_multipart: Default::default(), + get: MetricsWithBytesAndTtfb::new(registry, &store_type, "get"), + get_range: MetricsWithBytes::new(registry, &store_type, "get_range"), + get_ranges: MetricsWithBytes::new(registry, &store_type, "get_ranges"), + head: Metrics::new(registry, &store_type, "head"), + delete: Metrics::new(registry, &store_type, "delete"), + delete_stream: MetricsWithCount::new(registry, &store_type, "delete_stream"), + list: MetricsWithCount::new(registry, &store_type, "list"), + list_with_offset: MetricsWithCount::new(registry, &store_type, "list_with_offset"), + list_with_delimiter: MetricsWithCount::new( + registry, + &store_type, + "list_with_delimiter", + ), + copy: Metrics::new(registry, &store_type, "copy"), + rename: Metrics::new(registry, &store_type, "rename"), + copy_if_not_exists: Metrics::new(registry, &store_type, "copy_if_not_exists"), + rename_if_not_exists: Metrics::new(registry, &store_type, "rename_if_not_exists"), + } + } + + #[cfg(test)] + async fn close(&self) { + let _ = self.inprogress_multipart.lock().await.join_next().await; + } +} + +impl std::fmt::Display for ObjectStoreMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ObjectStoreMetrics({})", self.inner) + } +} + +#[async_trait] +impl ObjectStore for ObjectStoreMetrics { + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + opts: PutOptions, + ) -> Result { + let t = self.time_provider.now(); + let size = bytes.content_length(); + let res = self.inner.put_opts(location, bytes, opts).await; + self.put + .record(t, self.time_provider.now(), res.is_ok(), Some(size as _)); + res + } + + async fn put_multipart_opts( + &self, + location: &Path, + _opts: PutMultipartOpts, + ) -> Result> { + let t = self.time_provider.now(); + let inner = self.inner.put_multipart(location).await?; + + let (tx, rx) = futures::channel::oneshot::channel(); + let reporter = Arc::clone(&self.put_multipart); + self.inprogress_multipart.lock().await.spawn(async move { + if let Ok((res, bytes, t_end)) = rx.await { + reporter.record(t, t_end, res, bytes); + } + }); + + let multipart_upload = + MultipartUploadWrapper::new(inner, Arc::clone(&self.time_provider), tx); + Ok(Box::new(multipart_upload)) + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let started_at = self.time_provider.now(); + + let res = self.inner.get_opts(location, options).await; + + match res { + Ok(mut res) => { + res.payload = match res.payload { + GetResultPayload::File(file, path) => { + let file = tokio::fs::File::from_std(file); + let size = file.metadata().await.ok().map(|m| m.len()); + let file = file.into_std().await; + + let end = self.time_provider.now(); + self.get.record( + started_at, + // first byte wasn't really measured, so take "end" instead + end, end, true, size, + ); + GetResultPayload::File(file, path) + } + GetResultPayload::Stream(s) => { + // Wrap the object store data stream in a decorator to track the + // yielded data / wall clock, inclusive of the inner call above. + GetResultPayload::Stream(Box::pin(Box::new( + StreamMetricRecorder::new( + s, + started_at, + BytesStreamDelegate::new(self.get.clone()), + Arc::clone(&self.time_provider), + ) + .fuse(), + ))) + } + }; + Ok(res) + } + Err(e) => { + let end = self.time_provider.now(); + self.get.record(started_at, end, end, false, None); + Err(e) + } + } + } + + async fn get_range(&self, location: &Path, range: Range) -> Result { + let t = self.time_provider.now(); + let res = self.inner.get_range(location, range).await; + self.get_range.record( + t, + self.time_provider.now(), + res.is_ok(), + res.as_ref().ok().map(|b| b.len() as _), + ); + res + } + + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + let t = self.time_provider.now(); + let res = self.inner.get_ranges(location, ranges).await; + self.get_ranges.record( + t, + self.time_provider.now(), + res.is_ok(), + res.as_ref() + .ok() + .map(|b| b.iter().map(|b| b.len() as u64).sum()), + ); + res + } + + async fn head(&self, location: &Path) -> Result { + let t = self.time_provider.now(); + let res = self.inner.head(location).await; + self.head.record(t, self.time_provider.now(), res.is_ok()); + res + } + + async fn delete(&self, location: &Path) -> Result<()> { + let t = self.time_provider.now(); + let res = self.inner.delete(location).await; + self.delete.record(t, self.time_provider.now(), res.is_ok()); + res + } + + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, Result>, + ) -> BoxStream<'a, Result> { + let started_at = self.time_provider.now(); + + let s = self.inner.delete_stream(locations); + + // Wrap the object store data stream in a decorator to track the + // yielded data / wall clock, inclusive of the inner call above. + StreamMetricRecorder::new( + s, + started_at, + CountStreamDelegate::new(self.delete_stream.clone()), + Arc::clone(&self.time_provider), + ) + .fuse() + .boxed() + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + let started_at = self.time_provider.now(); + + let s = self.inner.list(prefix); + + // Wrap the object store data stream in a decorator to track the + // yielded data / wall clock, inclusive of the inner call above. + StreamMetricRecorder::new( + s, + started_at, + CountStreamDelegate::new(self.list.clone()), + Arc::clone(&self.time_provider), + ) + .fuse() + .boxed() + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, Result> { + let started_at = self.time_provider.now(); + + let s = self.inner.list_with_offset(prefix, offset); + + // Wrap the object store data stream in a decorator to track the + // yielded data / wall clock, inclusive of the inner call above. + StreamMetricRecorder::new( + s, + started_at, + CountStreamDelegate::new(self.list_with_offset.clone()), + Arc::clone(&self.time_provider), + ) + .fuse() + .boxed() + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + let t = self.time_provider.now(); + let res = self.inner.list_with_delimiter(prefix).await; + self.list_with_delimiter.record( + t, + self.time_provider.now(), + res.is_ok(), + res.as_ref().ok().map(|res| res.objects.len() as _), + ); + res + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let t = self.time_provider.now(); + let res = self.inner.copy(from, to).await; + self.copy.record(t, self.time_provider.now(), res.is_ok()); + res + } + + async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + let t = self.time_provider.now(); + let res = self.inner.rename(from, to).await; + self.rename.record(t, self.time_provider.now(), res.is_ok()); + res + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let t = self.time_provider.now(); + let res = self.inner.copy_if_not_exists(from, to).await; + self.copy_if_not_exists + .record(t, self.time_provider.now(), res.is_ok()); + res + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let t = self.time_provider.now(); + let res = self.inner.rename_if_not_exists(from, to).await; + self.rename_if_not_exists + .record(t, self.time_provider.now(), res.is_ok()); + res + } +} + +#[pyclass] +pub struct PyObjectStoreMetrics { + store: ObjectStoreMetrics, + metrics: Arc, +} + +#[pymethods] +impl PyObjectStoreMetrics { + #[new] + fn new(store: PyObjectStore, name: String) -> PyResult { + let metrics = Arc::new(metric::Registry::new()); + let store = ObjectStoreMetrics::new( + store.into_inner(), + Arc::new(super::iox_time::SystemProvider::new()), + name, + &metrics, + ); + Ok(Self { store, metrics }) + } + + fn get(&self) { + self.metrics.get_instrument("name"); + } +} + +/// A [`MetricDelegate`] is called whenever the [`StreamMetricRecorder`] +/// observes an `Ok(Item)` in the stream. +trait MetricDelegate { + /// The type this delegate observes. + type Item; + + /// Invoked when the stream yields an `Ok(Item)`. + fn observe_ok(&mut self, value: &Self::Item, t: Time); + + /// Finish stream. + fn finish(&mut self, t_begin: Time, t_end: Time, success: bool); +} + +/// A [`MetricDelegate`] for instrumented streams of [`Bytes`]. +/// +/// This impl is used to record the number of bytes yielded for +/// [`ObjectStore::get()`] calls. +#[derive(Debug)] +struct BytesStreamDelegate { + metrics: MetricsWithBytesAndTtfb, + first_byte: Option