Skip to content

Commit

Permalink
Cli tool to log logical trace sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Jul 31, 2019
1 parent 1d2657c commit 583fe65
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 4 deletions.
74 changes: 74 additions & 0 deletions tdiag/src/commands/arrangements.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! "arrangements" subcommand: cli tool to extract logical arrangement
//! sizes over time.
use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::DiagError;

use timely::dataflow::operators::{Filter, Map};

use differential_dataflow::collection::AsCollection;
use differential_dataflow::operators::reduce::Count;
use differential_dataflow::logging::DifferentialEvent;
use DifferentialEvent::{Merge, Batch};

use tdiag_connect::receive::ReplayWithShutdown;

/// @TODO
pub fn listen(
timely_configuration: timely::Configuration,
sockets: Vec<Option<std::net::TcpStream>>,) -> Result<(), crate::DiagError> {

let sockets = Arc::new(Mutex::new(sockets));

let is_running = Arc::new(std::sync::atomic::AtomicBool::new(true));
let is_running_w = is_running.clone();

timely::execute(timely_configuration, move |worker| {
let sockets = sockets.clone();

// create replayer from disjoint partition of source worker identifiers.
let replayer = tdiag_connect::receive::make_readers::<Duration, (Duration, timely::logging::WorkerIdentifier, DifferentialEvent)>(
tdiag_connect::receive::ReplaySource::Tcp(sockets), worker.index(), worker.peers())
.expect("failed to open tcp readers");

worker.dataflow::<Duration, _, _>(|scope| {

let window_size = Duration::from_secs(1);

replayer.replay_with_shutdown_into(scope, is_running_w.clone())
.filter(|(_, worker, _)| *worker == 0)
.flat_map(|(t, _, x)| match x {
Batch(x) => Some((x.operator, t, x.length as isize)),
Merge(x) => {
match x.complete {
None => None,
Some(complete_size) => {
let size_diff = (complete_size as isize) - (x.length1 + x.length2) as isize;

Some((x.operator, t, size_diff as isize))
}
}
}
_ => None,
})
.as_collection()
.count()
.delay(move |t| {
let w_secs = window_size.as_secs();

let secs_coarsened = if w_secs == 0 {
t.as_secs()
} else {
(t.as_secs() / w_secs + 1) * w_secs
};

Duration::new(secs_coarsened, 0)
})
.inspect(|x| println!("{:?}", x));
})
}).map_err(|x| DiagError(format!("error in the timely computation: {}", x)))?;

Ok(())
}
1 change: 1 addition & 0 deletions tdiag/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
pub mod graph;
pub mod profile;
pub mod arrangements;
19 changes: 15 additions & 4 deletions tdiag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ You can customize the interface and port for the receiver (this program) with --
.help("The output path for the generated html file (don't forget the .html extension)")
.required(true))
)
.subcommand(clap::SubCommand::with_name("profile")
.about("Print total time spent running each operator")
.subcommand(
clap::SubCommand::with_name("profile")
.about("Print total time spent running each operator")
)
.subcommand(
clap::SubCommand::with_name("arrangements")
.about("Track the logical size of arrangements over the course of a computation")
)
.get_matches();

Expand Down Expand Up @@ -79,13 +84,19 @@ You can customize the interface and port for the receiver (this program) with --
let sockets = tdiag_connect::receive::open_sockets(ip_addr, port, source_peers)?;
println!("Trace sources connected");
crate::commands::graph::listen_and_render(timely_configuration, sockets, output_path)
},
}
("profile", Some(_profile_args)) => {
println!("Listening for {} connections on {}:{}", source_peers, ip_addr, port);
let sockets = tdiag_connect::receive::open_sockets(ip_addr, port, source_peers)?;
println!("Trace sources connected");
crate::commands::profile::listen_and_profile(timely_configuration, sockets)
},
}
("arrangements", Some(_args)) => {
println!("Listening for {} connections on {}:{}", source_peers, ip_addr, port);
let sockets = tdiag_connect::receive::open_sockets(ip_addr, port, source_peers)?;
println!("Trace sources connected");
crate::commands::arrangements::listen(timely_configuration, sockets)
}
_ => panic!("Invalid subcommand"),
}
}
Expand Down

0 comments on commit 583fe65

Please sign in to comment.