Skip to content
This repository has been archived by the owner on Apr 2, 2018. It is now read-only.

Adds new function interval_range #33

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ Timer facilities for Tokio
[dependencies]
futures = "0.1"
slab = "0.3.0"
rand = "0.3"
45 changes: 40 additions & 5 deletions src/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,58 @@ use {Sleep, TimerError};

use std::time::Duration;

/// A stream representing notifications at fixed interval
use rand::{thread_rng, Rng};

/// A stream representing notifications at given interval
///
/// Intervals are created through `Timer::interval`.
#[derive(Debug)]
pub struct Interval {
sleep: Sleep,
duration: Duration,
min_duration: Duration,
max_duration: Duration,
}

/// Create a new interval
pub fn new(sleep: Sleep, dur: Duration) -> Interval {
pub fn new(sleep: Sleep, min_dur: Duration, max_dur: Duration) -> Interval {
Interval {
sleep: sleep,
duration: dur,
min_duration: min_dur,
max_duration: max_dur,
}
}

const NANOS_PER_SEC: u32 = 1_000_000_000;

/// Returns the next duration for an interval
/// If `min` and `max` are equal, the duration is fixed.
/// If `min` and `max` are not equal, a duration in the range [`min`, `max`] is returned.
///
/// # Panics
///
/// Panics if `max < min`.
pub(crate) fn next_duration(min: Duration, max: Duration) -> Duration {
let mut rng = thread_rng();

let secs = if min.as_secs() == max.as_secs() {
min.as_secs()
} else {
rng.gen_range(min.as_secs(), max.as_secs() + 1)
};

let nsecs = if min.subsec_nanos() == max.subsec_nanos() {
min.subsec_nanos()
} else if secs == min.as_secs() {
rng.gen_range(min.subsec_nanos(), NANOS_PER_SEC)
} else if secs == max.as_secs() {
rng.gen_range(0, max.subsec_nanos() + 1)
} else {
rng.gen_range(0, NANOS_PER_SEC)
};

Duration::new(secs, nsecs)
}

impl Stream for Interval {
type Item = ();
type Error = TimerError;
Expand All @@ -29,7 +64,7 @@ impl Stream for Interval {
let _ = try_ready!(self.sleep.poll());

// Reset the timeout
self.sleep = self.sleep.timer().sleep(self.duration);
self.sleep = self.sleep.timer().sleep(next_duration(self.min_duration, self.max_duration));

Ok(Async::Ready(Some(())))
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
#[macro_use]
extern crate futures;
extern crate slab;
extern crate rand;

mod interval;
mod mpmc;
Expand Down
14 changes: 12 additions & 2 deletions src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Timer {
/// Creates a new interval which will fire at `dur` time into the future,
/// and will repeat every `dur` interval after
pub fn interval(&self, dur: Duration) -> Interval {
interval::new(self.sleep(dur), dur)
interval::new(self.sleep(dur), dur, dur)
}

/// Creates a new interval which will fire at the time specified by `at`,
Expand All @@ -126,7 +126,17 @@ impl Timer {
self.sleep(Duration::from_millis(0))
};

interval::new(sleep, dur)
interval::new(sleep, dur, dur)
}

/// Creates a new interval which will fire at a time in the range [`min`, `max`] into the
/// future, and will repeat every time with a new interval in the range [`min`, `max`] after.
///
/// # Panics
///
/// Panics if `max < min`.
pub fn interval_range(&self, min: Duration, max: Duration) -> Interval {
interval::new(self.sleep(interval::next_duration(min, max)), min, max)
}
}

Expand Down