diff --git a/crates/polars-stream/src/nodes/mod.rs b/crates/polars-stream/src/nodes/mod.rs
index 66e64626d307..82ad0f8293e9 100644
--- a/crates/polars-stream/src/nodes/mod.rs
+++ b/crates/polars-stream/src/nodes/mod.rs
@@ -11,6 +11,7 @@ pub mod reduce;
 pub mod select;
 pub mod simple_projection;
 pub mod streaming_slice;
+pub mod with_row_index;
 pub mod zip;
 
 /// The imports you'll always need for implementing a ComputeNode.
diff --git a/crates/polars-stream/src/nodes/with_row_index.rs b/crates/polars-stream/src/nodes/with_row_index.rs
new file mode 100644
index 000000000000..942d23219fec
--- /dev/null
+++ b/crates/polars-stream/src/nodes/with_row_index.rs
@@ -0,0 +1,95 @@
+use polars_core::prelude::*;
+use polars_core::utils::Container;
+use polars_utils::pl_str::PlSmallStr;
+
+use super::compute_node_prelude::*;
+use crate::async_primitives::distributor_channel::distributor_channel;
+use crate::async_primitives::wait_group::WaitGroup;
+use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;
+
+/// Compute node that adds a row index column to every morsel passing through it.
+///
+/// `offset` is a running counter holding the index that the first row of the next
+/// incoming morsel will receive.
+pub struct WithRowIndexNode {
+    name: PlSmallStr,
+    offset: IdxSize,
+}
+
+impl WithRowIndexNode {
+    /// Creates a node that adds a row index column called `name`.
+    ///
+    /// Numbering starts at `offset`, or at zero when `offset` is `None`.
+    pub fn new(name: PlSmallStr, offset: Option<IdxSize>) -> Self {
+        Self {
+            name,
+            offset: offset.unwrap_or(0),
+        }
+    }
+}
+
+impl ComputeNode for WithRowIndexNode {
+    fn name(&self) -> &str {
+        "with_row_index"
+    }
+
+    fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
+        assert!(recv.len() == 1 && send.len() == 1);
+        // Exactly one input and one output port: their states are swapped with each other.
+        recv.swap_with_slice(send);
+        Ok(())
+    }
+
+    fn spawn<'env, 's>(
+        &'env mut self,
+        scope: &'s TaskScope<'s, 'env>,
+        recv: &mut [Option<RecvPort<'_>>],
+        send: &mut [Option<SendPort<'_>>],
+        _state: &'s ExecutionState,
+        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
+    ) {
+        assert!(recv.len() == 1 && send.len() == 1);
+        let mut receiver = recv[0].take().unwrap().serial();
+        let senders = send[0].take().unwrap().parallel();
+
+        let (mut distributor, distr_receivers) =
+            distributor_channel(senders.len(), DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
+
+        let name = self.name.clone();
+
+        // To figure out the correct offsets we need to be serial.
+        join_handles.push(scope.spawn_task(TaskPriority::High, async move {
+            while let Ok(morsel) = receiver.recv().await {
+                let offset = self.offset;
+                self.offset = self
+                    .offset
+                    .checked_add(morsel.df().len().try_into().unwrap())
+                    .unwrap();
+                if distributor.send((morsel, offset)).await.is_err() {
+                    break;
+                }
+            }
+
+            Ok(())
+        }));
+
+        // But adding the new row index column can be done in parallel.
+        for (mut send, mut recv) in senders.into_iter().zip(distr_receivers) {
+            let name = name.clone();
+            join_handles.push(scope.spawn_task(TaskPriority::High, async move {
+                let wait_group = WaitGroup::default();
+                while let Ok((morsel, offset)) = recv.recv().await {
+                    let mut morsel =
+                        morsel.try_map(|df| df.with_row_index(name.clone(), Some(offset)))?;
+                    morsel.set_consume_token(wait_group.token());
+                    if send.send(morsel).await.is_err() {
+                        break;
+                    }
+                    wait_group.wait().await;
+                }
+
+                Ok(())
+            }));
+        }
+    }
+}
diff --git a/crates/polars-stream/src/physical_plan/fmt.rs b/crates/polars-stream/src/physical_plan/fmt.rs
index 0ec15047f0a1..21dd8e9dd634 100644
--- a/crates/polars-stream/src/physical_plan/fmt.rs
+++ b/crates/polars-stream/src/physical_plan/fmt.rs
@@ -59,6 +59,14 @@ fn visualize_plan_rec(
                 from_ref(input),
             )
         },
+        PhysNodeKind::WithRowIndex {
+            input,
+            name,
+            offset,
+        } => (
+            format!("with-row-index\\nname: {name}\\noffset: {offset:?}"),
+            from_ref(input),
+        ),
         PhysNodeKind::InputIndependentSelect { selectors } => (
             format!(
                 "input-independent-select\\n{}",
diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs
index c00f4adc6003..f2b532ca1ca3 100644
--- a/crates/polars-stream/src/physical_plan/lower_ir.rs
+++ b/crates/polars-stream/src/physical_plan/lower_ir.rs
@@ -254,18 +254,32 @@ pub fn lower_ir(
                 expr_cache,
             )?;
 
-            if function.is_streamable() {
-                let map = Arc::new(move |df| function.evaluate(df));
-                PhysNodeKind::Map {
-                    input: phys_input,
-                    map,
-                }
-            } else {
-                let map = Arc::new(move |df| function.evaluate(df));
-                PhysNodeKind::InMemoryMap {
+            match function {
+                FunctionIR::RowIndex {
+                    name,
+                    offset,
+                    schema: _,
+                } => PhysNodeKind::WithRowIndex {
                     input: phys_input,
-                    map,
-                }
+                    name,
+                    offset,
+                },
+
+                function if function.is_streamable() => {
+                    let map = Arc::new(move |df| function.evaluate(df));
+                    PhysNodeKind::Map {
+                        input: phys_input,
+                        map,
+                    }
+                },
+
+                function => {
+                    let map = Arc::new(move |df| function.evaluate(df));
+                    PhysNodeKind::InMemoryMap {
+                        input: phys_input,
+                        map,
+                    }
+                },
             }
         },
 
diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs
index d60b251c99c2..eddbc87bda99 100644
--- a/crates/polars-stream/src/physical_plan/mod.rs
+++ b/crates/polars-stream/src/physical_plan/mod.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use polars_core::frame::DataFrame;
-use polars_core::prelude::{InitHashMaps, PlHashMap, SortMultipleOptions};
+use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};
 use polars_core::schema::{Schema, SchemaRef};
 use polars_error::PolarsResult;
 use polars_plan::plans::hive::HivePartitions;
@@ -58,6 +58,12 @@ pub enum PhysNodeKind {
         extend_original: bool,
     },
 
+    WithRowIndex {
+        input: PhysNodeKey,
+        name: PlSmallStr,
+        offset: Option<IdxSize>,
+    },
+
     InputIndependentSelect {
         selectors: Vec<ExprIR>,
     },
@@ -164,6 +170,7 @@ fn insert_multiplexers(
             | PhysNodeKind::FileScan { .. }
             | PhysNodeKind::InputIndependentSelect { .. } => {},
             PhysNodeKind::Select { input, .. }
+            | PhysNodeKind::WithRowIndex { input, .. }
             | PhysNodeKind::Reduce { input, .. }
             | PhysNodeKind::StreamingSlice { input, .. }
             | PhysNodeKind::Filter { input, .. }
diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs
index 7b11e44cc04b..a4d58847033e 100644
--- a/crates/polars-stream/src/physical_plan/to_graph.rs
+++ b/crates/polars-stream/src/physical_plan/to_graph.rs
@@ -131,6 +131,18 @@ fn to_graph_rec<'a>(
             )
         },
 
+        WithRowIndex {
+            input,
+            name,
+            offset,
+        } => {
+            let input_key = to_graph_rec(*input, ctx)?;
+            ctx.graph.add_node(
+                nodes::with_row_index::WithRowIndexNode::new(name.clone(), *offset),
+                [input_key],
+            )
+        },
+
         InputIndependentSelect { selectors } => {
             let phys_selectors = selectors
                 .iter()