|
| 1 | +//! "profile" subcommand |
| 2 | +
|
| 3 | +use std::sync::{Arc, Mutex}; |
| 4 | + |
| 5 | +use crate::DiagError; |
| 6 | + |
| 7 | +use timely::dataflow::operators::{Map, Filter, generic::Operator}; |
| 8 | + |
| 9 | +use differential_dataflow::trace::TraceReader; |
| 10 | +use differential_dataflow::collection::AsCollection; |
| 11 | +use differential_dataflow::operators::{Join, reduce::Threshold, Consolidate, arrange::{Arrange, Arranged}}; |
| 12 | + |
| 13 | +use timely::logging::TimelyEvent::{Operates, Schedule}; |
| 14 | + |
| 15 | +use tdiag_connect::receive::ReplayWithShutdown; |
| 16 | + |
| 17 | +// static GRAPH_HTML: &str = include_str!("graph/dataflow-graph.html"); |
| 18 | + |
| 19 | +/// Creates TODO |
| 20 | +/// |
| 21 | +/// 1. Listens to incoming connection from a timely-dataflow program with |
| 22 | +/// logging enabled; |
| 23 | +/// 2. runs a differential-dataflow program TODO |
| 24 | +// /// TODO This module includes `graph/dataflow-graph.html` as a static resource. |
| 25 | +pub fn listen_and_compute( |
| 26 | + timely_configuration: timely::Configuration, |
| 27 | + sockets: Vec<Option<std::net::TcpStream>>) -> Result<(), crate::DiagError> { |
| 28 | + |
| 29 | + let sockets = Arc::new(Mutex::new(sockets)); |
| 30 | + |
| 31 | + let (output_send, output_recv) = ::std::sync::mpsc::channel(); |
| 32 | + let output_send = Arc::new(Mutex::new(output_send)); |
| 33 | + |
| 34 | + // let (channels_send, channels_recv) = ::std::sync::mpsc::channel(); |
| 35 | + // let channels_send = Arc::new(Mutex::new(channels_send)); |
| 36 | + |
| 37 | + let is_running = Arc::new(std::sync::atomic::AtomicBool::new(true)); |
| 38 | + let is_running_w = is_running.clone(); |
| 39 | + |
| 40 | + let worker_handles = timely::execute(timely_configuration, move |worker| { |
| 41 | + let output_send: std::sync::mpsc::Sender<_> = output_send.lock().expect("cannot lock output_send").clone(); |
| 42 | + |
| 43 | + let sockets = sockets.clone(); |
| 44 | + |
| 45 | + // create replayer from disjoint partition of source worker identifiers. |
| 46 | + let replayer = tdiag_connect::receive::make_readers::< |
| 47 | + // TODO: type alias |
| 48 | + std::time::Duration, (std::time::Duration, timely::logging::WorkerIdentifier, timely::logging::TimelyEvent)>( |
| 49 | + tdiag_connect::receive::ReplaySource::Tcp(sockets), worker.index(), worker.peers()) |
| 50 | + .expect("failed to open tcp readers"); |
| 51 | + |
| 52 | + let profile_trace = worker.dataflow(|scope| { |
| 53 | + let stream = replayer.replay_with_shutdown_into(scope, is_running_w.clone()); |
| 54 | + |
| 55 | + let operates = stream |
| 56 | + .filter(|(_, w, _)| *w== 0) |
| 57 | + .flat_map(|(t, _, x)| if let Operates(event) = x { Some((event, t, 1 as isize)) } else { None }) |
| 58 | + .as_collection(); |
| 59 | + |
| 60 | + let schedule = stream |
| 61 | + .flat_map(|(t, w, x)| if let Schedule(event) = x { Some((t, w, event)) } else { None }) |
| 62 | + .unary(timely::dataflow::channels::pact::Pipeline, "Schedules", |_,_| { |
| 63 | + let mut map = std::collections::HashMap::new(); |
| 64 | + let mut vec = Vec::new(); |
| 65 | + move |input, output| { |
| 66 | + input.for_each(|time, data| { |
| 67 | + data.swap(&mut vec); |
| 68 | + let mut session = output.session(&time); |
| 69 | + for (ts, worker, event) in vec.drain(..) { |
| 70 | + let key = (worker, event.id); |
| 71 | + match event.start_stop { |
| 72 | + timely::logging::StartStop::Start => { |
| 73 | + assert!(!map.contains_key(&key)); |
| 74 | + map.insert(key, ts); |
| 75 | + }, |
| 76 | + timely::logging::StartStop::Stop => { |
| 77 | + assert!(map.contains_key(&key)); |
| 78 | + let end = map.remove(&key).unwrap(); |
| 79 | + let ts_clip = std::time::Duration::from_secs(ts.as_secs() + 1); |
| 80 | + let elapsed = ts - end; |
| 81 | + let elapsed_ns = (elapsed.as_secs() as isize) * 1_000_000_000 + (elapsed.subsec_nanos() as isize); |
| 82 | + session.give((key.1, ts_clip, elapsed_ns)); |
| 83 | + } |
| 84 | + } |
| 85 | + } |
| 86 | + }); |
| 87 | + } |
| 88 | + }).as_collection().consolidate(); // (operator_id) |
| 89 | + |
| 90 | + // FIXME |
| 91 | + // == Re-construct the dataflow graph (re-wire channels crossing a scope boundary) == |
| 92 | + // |
| 93 | + // A timely dataflow graph has a hierarchical structure: a "scope" looks like an |
| 94 | + // operator to the outside but can contain a subgraph of operators (and other scopes) |
| 95 | + // |
| 96 | + // We flatten this hierarchy to display it as a simple directed graph, but preserve the |
| 97 | + // information on scope boundaries so that they can be drawn as graph cuts. |
| 98 | + |
| 99 | + let operates = operates.map(|event| (event.addr, (event.id, event.name))); |
| 100 | + |
| 101 | + // Addresses of potential scopes (excluding leaf operators) |
| 102 | + let scopes = operates.map(|(mut addr, _)| { |
| 103 | + addr.pop(); |
| 104 | + addr |
| 105 | + }).distinct(); |
| 106 | + |
| 107 | + // Exclusively leaf operators |
| 108 | + let operates_without_subg = operates.antijoin(&scopes).map(|(addr, (id, name))| (id, (addr, name, false))); |
| 109 | + let subg = operates.semijoin(&scopes).map(|(addr, (id, name))| (id, (addr, name, true))); |
| 110 | + |
| 111 | + let all_operators = operates_without_subg.concat(&subg).distinct(); |
| 112 | + |
| 113 | + use differential_dataflow::trace::implementations::ord::OrdKeySpine; |
| 114 | + let Arranged { trace: profile_trace, .. } = all_operators.semijoin(&schedule) |
| 115 | + .map(|(id, (addr, name, is_scope))| (id, addr, name, is_scope)) |
| 116 | + .consolidate() |
| 117 | + .arrange::<OrdKeySpine<_, _, _>>(); |
| 118 | + |
| 119 | + profile_trace |
| 120 | + }); |
| 121 | + |
| 122 | + while worker.step() { } |
| 123 | + |
| 124 | + let mut profile_trace = profile_trace; |
| 125 | + |
| 126 | + profile_trace.distinguish_since(&[]); |
| 127 | + |
| 128 | + let (mut cursor, storage) = profile_trace.cursor(); |
| 129 | + |
| 130 | + use differential_dataflow::trace::cursor::Cursor; |
| 131 | + while cursor.key_valid(&storage) { |
| 132 | + let key = cursor.key(&storage); |
| 133 | + if cursor.val_valid(&storage) { |
| 134 | + let mut ns = 0; |
| 135 | + cursor.map_times(&storage, |_, r| ns += r); |
| 136 | + output_send.send((key.clone(), ns)).expect("failed to send output to mpsc channel"); |
| 137 | + } |
| 138 | + cursor.step_key(&storage); |
| 139 | + } |
| 140 | + |
| 141 | + }).map_err(|x| DiagError(format!("error in the timely computation: {}", x)))?; |
| 142 | + |
| 143 | + { |
| 144 | + use std::io; |
| 145 | + use std::io::prelude::*; |
| 146 | + |
| 147 | + let mut stdin = io::stdin(); |
| 148 | + let mut stdout = io::stdout(); |
| 149 | + |
| 150 | + write!(stdout, "Press enter to stop collecting profile data (this will crash the source computation if it hasn't terminated).") |
| 151 | + .expect("failed to write to stdout"); |
| 152 | + stdout.flush().unwrap(); |
| 153 | + |
| 154 | + // Read a single byte and discard |
| 155 | + let _ = stdin.read(&mut [0u8]).expect("failed to read from stdin"); |
| 156 | + } |
| 157 | + |
| 158 | + is_running.store(false, std::sync::atomic::Ordering::Release); |
| 159 | + |
| 160 | + worker_handles.join().into_iter().collect::<Result<Vec<_>, _>>().expect("Timely error"); |
| 161 | + |
| 162 | + let mut data = output_recv.into_iter().collect::<Vec<_>>(); |
| 163 | + data.sort_unstable_by_key(|&(_, ns)| std::cmp::Reverse(ns)); |
| 164 | + for ((id, addr, name, is_scope), ns) in data.into_iter() { |
| 165 | + println!("{}\t{}\t(id={}, addr={:?}):\t{:e} s", |
| 166 | + if is_scope { "[scope]" } else { "" }, |
| 167 | + name, |
| 168 | + id, |
| 169 | + addr, |
| 170 | + (ns as f64) / 1_000_000_000f64); |
| 171 | + } |
| 172 | + |
| 173 | + Ok(()) |
| 174 | +} |
0 commit comments