Skip to content

Commit

Permalink
Join operator names
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Jul 31, 2019
1 parent 583fe65 commit d7ee392
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 28 deletions.
86 changes: 62 additions & 24 deletions tdiag/src/commands/arrangements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,90 @@ use std::time::Duration;
use crate::DiagError;

use timely::dataflow::operators::{Filter, Map};
use timely::logging::{TimelyEvent, WorkerIdentifier};
use TimelyEvent::Operates;

use differential_dataflow::collection::AsCollection;
use differential_dataflow::operators::reduce::Count;
use differential_dataflow::logging::DifferentialEvent;
use DifferentialEvent::{Merge, Batch};
use differential_dataflow::operators::{Count, Join};
use DifferentialEvent::{Batch, Merge};

use tdiag_connect::receive::ReplayWithShutdown;

/// @TODO
pub fn listen(
timely_configuration: timely::Configuration,
sockets: Vec<Option<std::net::TcpStream>>,) -> Result<(), crate::DiagError> {

let sockets = Arc::new(Mutex::new(sockets));
timely_sockets: Vec<Option<std::net::TcpStream>>,
differential_sockets: Vec<Option<std::net::TcpStream>>,
) -> Result<(), crate::DiagError> {
let timely_sockets = Arc::new(Mutex::new(timely_sockets));
let differential_sockets = Arc::new(Mutex::new(differential_sockets));

let is_running = Arc::new(std::sync::atomic::AtomicBool::new(true));
let is_running_w = is_running.clone();

timely::execute(timely_configuration, move |worker| {
let sockets = sockets.clone();
let timely_sockets = timely_sockets.clone();
let differential_sockets = differential_sockets.clone();

// create replayer from disjoint partition of source worker identifiers.
let replayer = tdiag_connect::receive::make_readers::<Duration, (Duration, timely::logging::WorkerIdentifier, DifferentialEvent)>(
tdiag_connect::receive::ReplaySource::Tcp(sockets), worker.index(), worker.peers())
.expect("failed to open tcp readers");

worker.dataflow::<Duration, _, _>(|scope| {
let timely_replayer = tdiag_connect::receive::make_readers::<
Duration,
(Duration, WorkerIdentifier, TimelyEvent),
>(
tdiag_connect::receive::ReplaySource::Tcp(timely_sockets),
worker.index(),
worker.peers(),
)
.expect("failed to open timely tcp readers");

let differential_replayer = tdiag_connect::receive::make_readers::<
Duration,
(Duration, WorkerIdentifier, DifferentialEvent),
>(
tdiag_connect::receive::ReplaySource::Tcp(differential_sockets),
worker.index(),
worker.peers(),
)
.expect("failed to open differential tcp readers");

worker.dataflow::<Duration, _, _>(|scope| {
let window_size = Duration::from_secs(1);

replayer.replay_with_shutdown_into(scope, is_running_w.clone())

let operates = timely_replayer
.replay_with_shutdown_into(scope, is_running_w.clone())
.filter(|(_, worker, _)| *worker == 0)
.flat_map(|(t, _, x)| {
if let Operates(event) = x {
Some(((event.id, event.name), t, 1 as isize))
} else {
None
}
})
.as_collection();

differential_replayer
.replay_with_shutdown_into(scope, is_running_w.clone())
.filter(|(_, worker, _)| *worker == 0)
.flat_map(|(t, _, x)| match x {
Batch(x) => Some((x.operator, t, x.length as isize)),
Merge(x) => {
match x.complete {
None => None,
Some(complete_size) => {
let size_diff = (complete_size as isize) - (x.length1 + x.length2) as isize;

Some((x.operator, t, size_diff as isize))
}
Merge(x) => match x.complete {
None => None,
Some(complete_size) => {
let size_diff =
(complete_size as isize) - (x.length1 + x.length2) as isize;

Some((x.operator, t, size_diff as isize))
}
}
},
_ => None,
})
.as_collection()
.count()
.inner
// We do not bother with retractions here, because the
// user is only interested in the current count.
.filter(|(_, _, count)| count >= &0)
.as_collection()
.delay(move |t| {
let w_secs = window_size.as_secs();

Expand All @@ -66,9 +102,11 @@ pub fn listen(

Duration::new(secs_coarsened, 0)
})
.join(&operates)
.inspect(|x| println!("{:?}", x));
})
}).map_err(|x| DiagError(format!("error in the timely computation: {}", x)))?;
})
.map_err(|x| DiagError(format!("error in the timely computation: {}", x)))?;

Ok(())
}
26 changes: 22 additions & 4 deletions tdiag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,30 @@ You can customize the interface and port for the receiver (this program) with --
crate::commands::profile::listen_and_profile(timely_configuration, sockets)
}
("arrangements", Some(_args)) => {
println!("Listening for {} connections on {}:{}", source_peers, ip_addr, port);
let sockets = tdiag_connect::receive::open_sockets(ip_addr, port, source_peers)?;
println!(
"Listening for {} Timely connections on {}:{}",
source_peers, ip_addr, port
);
let timely_sockets =
tdiag_connect::receive::open_sockets(ip_addr.clone(), port, source_peers)?;

println!(
"Listening for {} Differential connections on {}:{}",
source_peers,
ip_addr,
port + 1
);
let differential_sockets =
tdiag_connect::receive::open_sockets(ip_addr.clone(), port + 1, source_peers)?;

println!("Trace sources connected");
crate::commands::arrangements::listen(timely_configuration, sockets)
crate::commands::arrangements::listen(
timely_configuration,
timely_sockets,
differential_sockets,
)
}
_ => panic!("Invalid subcommand"),
_ => panic!("Invalid subcommand"),
}
}

Expand Down

0 comments on commit d7ee392

Please sign in to comment.