Skip to content

Commit

Permalink
Add comments to the logic that re-constructs the dataflow graph
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Lattuada <[email protected]>
  • Loading branch information
utaal committed May 28, 2019
1 parent 400d293 commit 19795d8
Showing 1 changed file with 47 additions and 22 deletions.
69 changes: 47 additions & 22 deletions tdiag/src/commands/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,31 @@ pub fn listen_and_render(
.flat_map(|(t, _, x)| if let Channels(event) = x { Some((event, t, 1 as isize)) } else { None })
.as_collection();

// == Fix addresses so we can connect operators outside and inside subgraphs ==

let operates = operates.map(|event| (event.addr, event.name)); // .inspect(|x| eprintln!("Operates: {:?}", x.0));

let operates_anti = operates.map(|(mut addr, _)| {
// == Re-construct the dataflow graph (re-wire channels crossing a scope boundary) ==
//
// A timely dataflow graph has a hierarchical structure: a "scope" looks like an
// operator to the outside but can contain a subgraph of operators (and other scopes)
//
// We flatten this hierarchy to display it as a simple directed graph, but preserve the
// information on scope boundaries so that they can be drawn as graph cuts.

let operates = operates.map(|event| (event.addr, event.name));

// Addresses of potential scopes (excluding leaf operators)
let scopes = operates.map(|(mut addr, _)| {
addr.pop();
addr
});
}).distinct();

let operates_without_subg = operates.antijoin(&operates_anti.distinct());
// Exclusively leaf operators
let operates_without_subg = operates.antijoin(&scopes);

let subgraphs = operates.map(|(addr, _)| (addr, ())).semijoin(&operates_anti.distinct()).map(|(addr, ())| addr);
// subgraphs.inspect(|x| eprintln!("Subgraph: {:?}", x));
// Retain only subscopes that correspond to scopes observed in the logs (remove empty [] addrs)
let subgraphs = operates.map(|(addr, _)| (addr, ())).semijoin(&scopes).map(|(addr, ())| addr);

let channels = channels.map(
|event| (event.id, (event.scope_addr, event.source, event.target))); // .inspect(|x| eprintln!("Channels: {:?}", x.0));
let channels = channels.map(|event| (event.id, (event.scope_addr, event.source, event.target)));

// Output leaf operators
{
operates_without_subg
.consolidate()
Expand All @@ -90,26 +98,39 @@ pub fn listen_and_render(
.capture_into(operators_send);
}

// Output channels
{
let subg_channels_outside_egress = channels
// Channels that enter a subscope (as seen from outside the subscope)
// Their source is the operator representing the subscope
let subg_channels_outside_ingress = channels
.map(|(id, (scope_addr, from, to))| {
let mut subscope_addr = scope_addr.clone();
subscope_addr.push(from.0);
(subscope_addr, (id, from.1, (scope_addr, to)))
subscope_addr.push(to.0);
(subscope_addr, (id, (scope_addr, from), to.1))
})
.semijoin(&subgraphs);

let subg_channels_outside_ingress = channels
// Channels that leave a subscope (as seen from outside the subscope)
// Their destination is the operator representing the subscope
let subg_channels_outside_egress = channels
.map(|(id, (scope_addr, from, to))| {
let mut subscope_addr = scope_addr.clone();
subscope_addr.push(to.0);
(subscope_addr, (id, (scope_addr, from), to.1))
subscope_addr.push(from.0);
(subscope_addr, (id, from.1, (scope_addr, to)))
})
.semijoin(&subgraphs);

// subg_channels_outside_ingress.inspect(|x| eprintln!("Subg channel ingress: {:?}", x));
// subg_channels_outside_egress.inspect(|x| eprintln!("Subg channel egress: {:?}", x));

// Join the external and internal representation of channels that enter a subscope
//
// subscope [0, 1, 4]
// +-----------------------+
// ([0, 1], 3) | ([0, 1, 4], 1) |
// o------------->|------------->o |
// external | internal |
// +-----------------------+
//
// The external channel has addr [0, 1], source 3, destination 4 ([0, 1, 4] is the subscope).
// The internal channel has addr [0, 1, 4], source 0 (special!), destination 1.
let subg_ingress = subg_channels_outside_ingress
.map(|(subscope_addr, (id, orig, subscope_port))| ((subscope_addr, (0, subscope_port)), (id, orig)))
.join_map(
Expand All @@ -120,8 +141,11 @@ pub fn listen_and_render(
let mut to_addr = scope_addr.clone();
to_addr.push(to.0);
(vec![*id1, *id2], true, orig_addr, to_addr, orig_from.1, to.1)
}); // .inspect(|x| eprintln!("Subg channel: {:?}", x));
});

// Join the external and internal representation of channels that leave a subscope
//
// The structure depicted above is inverted for channels leaving a subscope
let subg_egress = subg_channels_outside_egress
.map(|(subscope_addr, (id, subscope_port, dest))| ((subscope_addr, (0, subscope_port)), (id, dest)))
.join_map(
Expand All @@ -132,8 +156,9 @@ pub fn listen_and_render(
let mut dest_addr = dest_addr.clone();
dest_addr.push(dest_to.0);
(vec![*id1, *id2], true, from_addr, dest_addr, to.1, dest_to.1)
}); // .inspect(|x| eprintln!("Subg channel: {:?}", x));
});

// Select all other channels (those that don't enter/leave a subscope)
let non_subg = channels
.map(|(id, (scope_addr, from, to))| {
let mut subscope_addr = scope_addr.clone();
Expand Down

0 comments on commit 19795d8

Please sign in to comment.