From 4cf70b617376900a0a264641d15d26352c69ceaf Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Thu, 27 Jun 2024 17:45:56 +0200 Subject: [PATCH] improve wording --- crates/polars-stream/src/execute.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/crates/polars-stream/src/execute.rs b/crates/polars-stream/src/execute.rs index cb9304f7c93c..baeeed61ea53 100644 --- a/crates/polars-stream/src/execute.rs +++ b/crates/polars-stream/src/execute.rs @@ -20,19 +20,19 @@ fn find_pipeline_blockers(graph: &Graph) -> Vec { .outputs .iter() .all(|o| graph.pipes[*o].send_state != PortState::Ready); - let has_ready_input = node + let has_input_ready = node .inputs .iter() .any(|o| graph.pipes[*o].send_state == PortState::Ready); - if no_output_ready && has_ready_input { + if no_output_ready && has_input_ready { blockers.push(node_key); } } blockers } -/// Given a set of nodes expand this set to all nodes which are inputs to the -/// set and whose connecting pipe is ready on both sides. +/// Given a set of nodes expand this set with all nodes which are inputs to the +/// set and whose connecting pipe is ready on both sides, recursively. /// /// Returns the set of nodes as well as the pipes connecting them. fn expand_ready_subgraph( @@ -61,8 +61,8 @@ fn expand_ready_subgraph( /// Finds a part of the graph which we can run. fn find_runnable_subgraph(graph: &mut Graph) -> (PlHashSet, Vec) { - // Find pipeline blockers, choose a subset with at most expensive pipeline - // blocker, and return the subgraph needed to feed them. + // Find pipeline blockers, choose a subset with at most one memory intensive + // pipeline blocker, and return the subgraph needed to feed them. let blockers = find_pipeline_blockers(graph); let (expensive, cheap): (Vec<_>, Vec<_>) = blockers.into_iter().partition(|b| { graph.nodes[*b] @@ -137,6 +137,7 @@ fn run_subgraph( } } + // Spawn a task per pipeline. for pipeline in 0..num_pipes { join_handles.push(node.compute.spawn( scope,