Skip to content

Commit

Permalink
Merge pull request TimelyDataflow#19 from namibj/arrangement-fix-outp…
Browse files Browse the repository at this point in the history
…ut-interval

Fix arrangement profiling's --output-interval
  • Loading branch information
comnik authored Feb 10, 2021
2 parents c65b6db + 56e2309 commit 0fa322c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
14 changes: 5 additions & 9 deletions tdiag/src/commands/arrangements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::convert::TryFrom;

use crate::DiagError;

Expand Down Expand Up @@ -60,8 +61,6 @@ pub fn listen(
)
.expect("failed to open differential tcp readers");

let window_size = Duration::from_millis(output_interval_ms);

worker.dataflow::<Duration, _, _>(|scope| {
let operates = timely_replayer
.replay_with_shutdown_into(scope, is_running_w.clone())
Expand Down Expand Up @@ -109,15 +108,12 @@ pub fn listen(
})
.as_collection()
.delay(move |t| {
let w_secs = window_size.as_secs();
let timestamp: u64 = u64::try_from(t.as_millis())
.expect("Why are the timestamps larger than humans are old?");

let secs_coarsened = if w_secs == 0 {
t.as_secs()
} else {
(t.as_secs() / w_secs + 1) * w_secs
};
let window_idx = (timestamp / output_interval_ms) + 1;

Duration::new(secs_coarsened, 0)
Duration::from_millis(window_idx * output_interval_ms)
})
.count()
.inner
Expand Down
2 changes: 1 addition & 1 deletion tdiag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ You can customize the interface and port for the receiver (this program) with --
.value_name("MS")
.help("Interval (in ms) at which to print arrangement sizes; defaults to 1000ms")
.default_value("1000"))
.help("
.after_help("
Add the following snippet to your Differential computation:
```
Expand Down

0 comments on commit 0fa322c

Please sign in to comment.