Skip to content

Commit

Permalink
[WIP] Track time spent merging
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Jul 31, 2019
1 parent 32e04c8 commit c756b68
Showing 1 changed file with 74 additions and 12 deletions.
86 changes: 74 additions & 12 deletions tdiag/src/commands/arrangements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use std::time::Duration;

use crate::DiagError;

use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::operators::{Filter, Map};
use timely::logging::{TimelyEvent, WorkerIdentifier};
use TimelyEvent::Operates;

use differential_dataflow::collection::AsCollection;
use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::operators::{Count, Join};
use DifferentialEvent::{Batch, Merge};
use differential_dataflow::operators::{Consolidate, Count, Join};
use DifferentialEvent::{Batch, Merge, MergeShortfall};

use tdiag_connect::receive::ReplayWithShutdown;

Expand Down Expand Up @@ -58,31 +59,92 @@ pub fn listen(

let operates = timely_replayer
.replay_with_shutdown_into(scope, is_running_w.clone())
.filter(|(_, worker, _)| *worker == 0)
.flat_map(|(t, _, x)| {
.flat_map(|(t, worker, x)| {
if let Operates(event) = x {
Some(((event.id, format!("{} ({:?})", event.name, event.addr)), t, 1 as isize))
Some((
(
(worker, event.id),
format!("{} ({:?})", event.name, event.addr),
),
t,
1 as isize,
))
} else {
None
}
})
.as_collection();

differential_replayer
.replay_with_shutdown_into(scope, is_running_w.clone())
.filter(|(_, worker, _)| *worker == 0)
.flat_map(|(t, _, x)| match x {
Batch(x) => Some((x.operator, t, x.length as isize)),
let events =
differential_replayer.replay_with_shutdown_into(scope, is_running_w.clone());

// Track time spent merging.
events
.flat_map(|(t, w, event)| {
if let Merge(event) = event {
Some((t, w, event))
} else {
None
}
})
.unary(
timely::dataflow::channels::pact::Pipeline,
"MergeTimes",
|_, _| {
let mut map = std::collections::HashMap::new();
let mut vec = Vec::new();

move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vec);
let mut session = output.session(&time);

for (ts, worker, event) in vec.drain(..) {
let key = (worker, event.operator);

match event.complete {
None => {
assert!(!map.contains_key(&key));
map.insert(key, ts);
}
Some(_) => {
assert!(map.contains_key(&key));
let end = map.remove(&key).unwrap();
let ts_clip =
std::time::Duration::from_secs(ts.as_secs() + 1);
let elapsed = ts - end;
let elapsed_ns = (elapsed.as_secs() as isize)
* 1_000_000_000
+ (elapsed.subsec_nanos() as isize);
session.give((key.1, ts_clip, elapsed_ns));
}
}
}
});
}
},
)
.as_collection()
.consolidate()
.inspect(|x| println!("time {:?}", x));

// Track sizes.
events
.flat_map(|(t, worker, x)| match x {
Batch(x) => Some(((worker, x.operator), t, x.length as isize)),
Merge(x) => match x.complete {
None => None,
Some(complete_size) => {
let size_diff =
(complete_size as isize) - (x.length1 + x.length2) as isize;

Some((x.operator, t, size_diff as isize))
Some(((worker, x.operator), t, size_diff as isize))
}
},
_ => None,
MergeShortfall(x) => {
eprintln!("MergeShortfall {:?}", x);
None
}
})
.as_collection()
.count()
Expand Down

0 comments on commit c756b68

Please sign in to comment.