diff --git a/Cargo.toml b/Cargo.toml index 74a437f..f683477 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ crate-type = ["dylib"] [dependencies] bitflags = "1.0" libc = "0.2.0" -time = "0.1" +time = { version = "0.3.21", features = ["formatting"] } [build-dependencies] -cc = "1.0.28" +cc = "1.0.28" \ No newline at end of file diff --git a/src/cell/mod.rs b/src/cell/mod.rs index 41f84cc..fa5cacc 100644 --- a/src/cell/mod.rs +++ b/src/cell/mod.rs @@ -2,6 +2,9 @@ extern crate time; pub mod store; +use std::convert::TryInto; +use time::{format_description::well_known, OffsetDateTime}; + use error::CellError; // Maximum number of times to retry set_if_not_exists/compare_and_swap @@ -30,7 +33,7 @@ impl Rate { /// we wanted to have 10 actions every 2 seconds, the period produced would /// be 200 ms. pub fn per_period(n: i64, period: time::Duration) -> Rate { - let ns: i64 = period.num_nanoseconds().unwrap(); + let ns: i128 = period.whole_nanoseconds(); // Don't rely on floating point math to get here. if n == 0 || ns == 0 { @@ -76,7 +79,10 @@ impl RateLimiter { pub fn new(store: T, quota: &RateQuota) -> Self { RateLimiter { delay_variation_tolerance: time::Duration::nanoseconds( - quota.max_rate.period.num_nanoseconds().unwrap() * (quota.max_burst + 1), + (quota.max_rate.period.whole_nanoseconds() + * (quota.max_burst as i128 + 1)) + .try_into() + .unwrap(), ), emission_interval: quota.max_rate.period, limit: quota.max_burst + 1, @@ -110,9 +116,7 @@ impl RateLimiter { return Err(error!("Zero rates are not supported")); } - let increment = time::Duration::nanoseconds( - self.emission_interval.num_nanoseconds().unwrap() * quantity, - ); + let increment = self.emission_interval * quantity as f64; self.log_start(key, quantity, increment); // Rust actually detects that this variable can only ever be assigned @@ -140,12 +144,12 @@ impl RateLimiter { let tat = match tat_val { -1 => now, - _ => from_nanoseconds(tat_val), + _ => OffsetDateTime::from_unix_timestamp_nanos(tat_val).unwrap(), }; log_debug!( self.store, "tat = {} (from store = {})", - tat.rfc3339(), + tat.format(&well_known::Rfc3339).unwrap(), tat_val ); @@ -154,7 +158,11 @@ impl RateLimiter { } else { tat + increment }; - log_debug!(self.store, "new_tat = {}", new_tat.rfc3339()); + log_debug!( + self.store, + "new_tat = {}", + new_tat.format(&well_known::Rfc3339).unwrap() + ); // Block the request if the next permitted time is in the future. let allow_at = new_tat - self.delay_variation_tolerance; @@ -162,14 +170,14 @@ impl RateLimiter { log_debug!( self.store, "diff = {}ms (now - allow_at)", - diff.num_milliseconds() + diff.whole_milliseconds() ); - if diff < time::Duration::zero() { + if diff < time::Duration::ZERO { log_debug!( self.store, "BLOCKED retry_after = {}ms", - -diff.num_milliseconds() + -diff.whole_milliseconds() ); if increment <= self.delay_variation_tolerance { @@ -181,7 +189,7 @@ impl RateLimiter { break; } - let new_tat_ns = nanoseconds(new_tat); + let new_tat_ns = new_tat.unix_timestamp_nanos(); ttl = new_tat - now; log_debug!(self.store, "ALLOWED"); @@ -215,8 +223,8 @@ impl RateLimiter { let next = self.delay_variation_tolerance - ttl; if next > -self.emission_interval { - rlc.remaining = (next.num_microseconds().unwrap() as f64 - / self.emission_interval.num_microseconds().unwrap() as f64) + rlc.remaining = (next.whole_microseconds() as f64 + / self.emission_interval.whole_microseconds() as f64) as i64; } rlc.reset_after = ttl; @@ -235,12 +243,12 @@ impl RateLimiter { log_debug!( self.store, "retry_after = {}ms", - rlc.retry_after.num_milliseconds() + rlc.retry_after.whole_milliseconds() ); log_debug!( self.store, "reset_after = {}ms (ttl)", - rlc.reset_after.num_milliseconds() + rlc.reset_after.whole_milliseconds() ); } @@ -252,17 +260,17 @@ impl RateLimiter { log_debug!( self.store, "delay_variation_tolerance = {}ms", - self.delay_variation_tolerance.num_milliseconds() + self.delay_variation_tolerance.whole_milliseconds() ); log_debug!( self.store, "emission_interval = {}ms", - self.emission_interval.num_milliseconds() + self.emission_interval.whole_milliseconds() ); log_debug!( self.store, "tat_increment = {}ms (emission_interval * quantity)", - increment.num_milliseconds() + increment.whole_milliseconds() ); } } @@ -273,19 +281,6 @@ pub struct RateQuota { pub max_rate: Rate, } -fn from_nanoseconds(x: i64) -> time::Tm { - let ns = 10_i64.pow(9); - time::at(time::Timespec { - sec: x / ns, - nsec: (x % ns) as i32, - }) -} - -fn nanoseconds(x: time::Tm) -> i64 { - let ts = x.to_timespec(); - ts.sec * 10_i64.pow(9) + i64::from(ts.nsec) -} - #[cfg(test)] mod tests { extern crate time; @@ -372,7 +367,7 @@ mod tests { max_burst: limit - 1, max_rate: Rate::per_second(1), }; - let start = time::now_utc(); + let start = OffsetDateTime::now_utc(); let mut memory_store = store::MemoryStore::new_verbose(); let mut test_store = TestStore::new(&mut memory_store); let mut limiter = RateLimiter::new(&mut test_store, "a); @@ -383,7 +378,7 @@ mod tests { // // You can never make a request larger than the maximum. - RateLimitCase::new(0, start, 6, 5, time::Duration::zero(), + RateLimitCase::new(0, start, 6, 5, time::Duration::ZERO, time::Duration::seconds(-1), true), // Rate limit normal requests appropriately. @@ -484,7 +479,7 @@ mod tests { #[derive(Debug, Eq, PartialEq)] struct RateLimitCase { num: i64, - now: time::Tm, + now: time::OffsetDateTime, volume: i64, remaining: i64, reset_after: time::Duration, @@ -495,7 +490,7 @@ mod tests { impl RateLimitCase { fn new( num: i64, - now: time::Tm, + now: time::OffsetDateTime, volume: i64, remaining: i64, reset_after: time::Duration, @@ -518,7 +513,7 @@ mod tests { /// us to tweak certain behavior, like for example setting the effective /// system clock. struct TestStore<'a> { - clock: time::Tm, + clock: time::OffsetDateTime, fail_updates: bool, store: &'a mut store::MemoryStore, } @@ -526,7 +521,7 @@ mod tests { impl<'a> TestStore<'a> { fn new(store: &'a mut store::MemoryStore) -> TestStore { TestStore { - clock: time::empty_tm(), + clock: OffsetDateTime::UNIX_EPOCH, fail_updates: false, store, } @@ -537,8 +532,8 @@ mod tests { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: i128, + new: i128, ttl: time::Duration, ) -> Result { if self.fail_updates { @@ -548,7 +543,10 @@ mod tests { } } - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> { + fn get_with_time( + &self, + key: &str, + ) -> Result<(i128, time::OffsetDateTime), CellError> { let tup = self.store.get_with_time(key)?; Ok((tup.0, self.clock)) } @@ -560,7 +558,7 @@ mod tests { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: i128, ttl: time::Duration, ) -> Result { if self.fail_updates { diff --git a/src/cell/store.rs b/src/cell/store.rs index 8d521ea..bd878b3 100644 --- a/src/cell/store.rs +++ b/src/cell/store.rs @@ -3,6 +3,7 @@ extern crate time; use error::CellError; use redis; use std::collections::HashMap; +use time::OffsetDateTime; /// Store exposes the atomic data store operations that the GCRA rate limiter /// needs to function correctly. @@ -19,8 +20,8 @@ pub trait Store { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: i128, + new: i128, ttl: time::Duration, ) -> Result; @@ -28,7 +29,8 @@ pub trait Store { /// store (this is done so that rate limiters running on a variety of /// different nodes can operate with a consistent clock instead of using /// their own). If the key was unset, -1 is returned. - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError>; + fn get_with_time(&self, key: &str) + -> Result<(i128, time::OffsetDateTime), CellError>; /// Logs a debug message to the data store. fn log_debug(&self, message: &str); @@ -38,7 +40,7 @@ pub trait Store { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: i128, ttl: time::Duration, ) -> Result; } @@ -50,14 +52,17 @@ impl<'a, T: Store> Store for &'a mut T { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: i128, + new: i128, ttl: time::Duration, ) -> Result { (**self).compare_and_swap_with_ttl(key, old, new, ttl) } - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> { + fn get_with_time( + &self, + key: &str, + ) -> Result<(i128, time::OffsetDateTime), CellError> { (**self).get_with_time(key) } @@ -68,7 +73,7 @@ impl<'a, T: Store> Store for &'a mut T { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: i128, ttl: time::Duration, ) -> Result { (**self).set_if_not_exists_with_ttl(key, value, ttl) @@ -82,7 +87,7 @@ impl<'a, T: Store> Store for &'a mut T { /// mutex added if it's ever used for anything serious. #[derive(Default)] pub struct MemoryStore { - map: HashMap, + map: HashMap, verbose: bool, } @@ -103,8 +108,8 @@ impl Store for MemoryStore { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: i128, + new: i128, _: time::Duration, ) -> Result { match self.map.get(key) { @@ -116,10 +121,13 @@ impl Store for MemoryStore { Ok(true) } - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> { + fn get_with_time( + &self, + key: &str, + ) -> Result<(i128, time::OffsetDateTime), CellError> { match self.map.get(key) { - Some(n) => Ok((*n, time::now_utc())), - None => Ok((-1, time::now_utc())), + Some(n) => Ok((*n, OffsetDateTime::now_utc())), + None => Ok((-1, OffsetDateTime::now_utc())), } } @@ -132,7 +140,7 @@ impl Store for MemoryStore { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: i128, _: time::Duration, ) -> Result { match self.map.get(key) { @@ -154,27 +162,33 @@ pub struct InternalRedisStore<'a> { } impl<'a> InternalRedisStore<'a> { + const KEY_VERSION: &str = "cell_v2_"; + pub fn new(r: &'a redis::Redis) -> InternalRedisStore<'a> { InternalRedisStore { r } } + + pub fn versioned_key(key: &str) -> String { + Self::KEY_VERSION.to_owned() + key + } } impl<'a> Store for InternalRedisStore<'a> { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: i128, + new: i128, ttl: time::Duration, ) -> Result { - let key = self.r.open_key_writable(key); + let key = self.r.open_key_writable(&Self::versioned_key(key)); match key.read()? { Some(s) => { // While we will usually have a value here to parse, it's possible that // in the case of a very fast rate the key's already been // expired even since the beginning of this operation. // Check whether the value is empty to handle that possibility. - if !s.is_empty() && s.parse::()? == old { + if !s.is_empty() && s.parse::()? == old { // Still the old value: perform the swap. key.write(new.to_string().as_str())?; key.set_expire(ttl)?; @@ -191,16 +205,19 @@ impl<'a> Store for InternalRedisStore<'a> { } } - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> { + fn get_with_time( + &self, + key: &str, + ) -> Result<(i128, time::OffsetDateTime), CellError> { // TODO: currently leveraging that CommandError and CellError are the // same thing, but we should probably reconcile this. - let key = self.r.open_key(key); + let key = self.r.open_key(&Self::versioned_key(key)); match key.read()? { Some(s) => { - let n = s.parse::()?; - Ok((n, time::now_utc())) + let n = s.parse::()?; + Ok((n, OffsetDateTime::now_utc())) } - None => Ok((-1, time::now_utc())), + None => Ok((-1, OffsetDateTime::now_utc())), } } @@ -211,10 +228,10 @@ impl<'a> Store for InternalRedisStore<'a> { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: i128, ttl: time::Duration, ) -> Result { - let key = self.r.open_key_writable(key); + let key = self.r.open_key_writable(&Self::versioned_key(key)); let res = if key.is_empty()? { key.write(value.to_string().as_str())?; Ok(true) @@ -237,20 +254,17 @@ mod tests { let mut store = MemoryStore::default(); // First attempt obviously works. - let res1 = - store.compare_and_swap_with_ttl("foo", 123, 124, time::Duration::zero()); + let res1 = store.compare_and_swap_with_ttl("foo", 123, 124, time::Duration::ZERO); assert_eq!(true, res1.unwrap()); // Second attempt succeeds: we use the value we just set combined with // a new value. - let res2 = - store.compare_and_swap_with_ttl("foo", 124, 125, time::Duration::zero()); + let res2 = store.compare_and_swap_with_ttl("foo", 124, 125, time::Duration::ZERO); assert_eq!(true, res2.unwrap()); // Third attempt fails: we try to overwrite using a value that is // incorrect. - let res2 = - store.compare_and_swap_with_ttl("foo", 123, 126, time::Duration::zero()); + let res2 = store.compare_and_swap_with_ttl("foo", 123, 126, time::Duration::ZERO); assert_eq!(false, res2.unwrap()); } @@ -263,7 +277,7 @@ mod tests { // Now try setting a value. let _ = store - .set_if_not_exists_with_ttl("foo", 123, time::Duration::zero()) + .set_if_not_exists_with_ttl("foo", 123, time::Duration::ZERO) .unwrap(); let res2 = store.get_with_time("foo"); @@ -274,10 +288,10 @@ mod tests { fn it_performs_set_if_not_exists_with_ttl() { let mut store = MemoryStore::default(); - let res1 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::zero()); + let res1 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::ZERO); assert_eq!(true, res1.unwrap()); - let res2 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::zero()); + let res2 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::ZERO); assert_eq!(false, res2.unwrap()); } } diff --git a/src/lib.rs b/src/lib.rs index fd5f0f1..7a852ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,12 +66,12 @@ impl Command for ThrottleCommand { // If either time had a partial component, but it up to the next full // second because otherwise a fast-paced caller could try again too // early. - let mut retry_after = rate_limit_result.retry_after.num_seconds(); - if rate_limit_result.retry_after.num_milliseconds() > 0 { + let mut retry_after = rate_limit_result.retry_after.whole_seconds(); + if rate_limit_result.retry_after.subsec_milliseconds() > 0 { retry_after += 1 } - let mut reset_after = rate_limit_result.reset_after.num_seconds(); - if rate_limit_result.reset_after.num_milliseconds() > 0 { + let mut reset_after = rate_limit_result.reset_after.whole_seconds(); + if rate_limit_result.reset_after.subsec_milliseconds() > 0 { reset_after += 1 } diff --git a/src/redis/mod.rs b/src/redis/mod.rs index a625f32..f901a51 100644 --- a/src/redis/mod.rs +++ b/src/redis/mod.rs @@ -11,6 +11,7 @@ pub mod raw; use error::CellError; use libc::{c_int, c_long, c_longlong, size_t}; +use std::convert::TryInto; use std::ptr; use std::string; use time; @@ -352,7 +353,10 @@ impl RedisKeyWritable { } pub fn set_expire(&self, expire: time::Duration) -> Result<(), CellError> { - match raw::set_expire(self.key_inner, expire.num_milliseconds()) { + match raw::set_expire( + self.key_inner, + expire.whole_milliseconds().try_into().unwrap(), + ) { raw::Status::Ok => Ok(()), // Error may occur if the key wasn't open for writing or is an