diff --git a/crates/polars-arrow/src/io/ipc/read/common.rs b/crates/polars-arrow/src/io/ipc/read/common.rs index 6b893c0e8ce3..0a1297bf1184 100644 --- a/crates/polars-arrow/src/io/ipc/read/common.rs +++ b/crates/polars-arrow/src/io/ipc/read/common.rs @@ -318,10 +318,14 @@ pub fn read_dictionary( Ok(()) } -pub fn prepare_projection( - schema: &ArrowSchema, - mut projection: Vec, -) -> (Vec, PlHashMap, ArrowSchema) { +#[derive(Clone)] +pub struct ProjectionInfo { + pub columns: Vec, + pub map: PlHashMap, + pub schema: ArrowSchema, +} + +pub fn prepare_projection(schema: &ArrowSchema, mut projection: Vec) -> ProjectionInfo { let schema = projection .iter() .map(|x| { @@ -355,7 +359,11 @@ pub fn prepare_projection( } } - (projection, map, schema) + ProjectionInfo { + columns: projection, + map, + schema, + } } pub fn apply_projection( diff --git a/crates/polars-arrow/src/io/ipc/read/file.rs b/crates/polars-arrow/src/io/ipc/read/file.rs index a83e1b758d80..e75fae36730e 100644 --- a/crates/polars-arrow/src/io/ipc/read/file.rs +++ b/crates/polars-arrow/src/io/ipc/read/file.rs @@ -305,7 +305,7 @@ fn get_message_from_block_offset<'a, R: Read + Seek>( .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err))) } -fn get_message_from_block<'a, R: Read + Seek>( +pub(super) fn get_message_from_block<'a, R: Read + Seek>( reader: &mut R, block: &arrow_format::ipc::Block, message_scratch: &'a mut Vec, diff --git a/crates/polars-arrow/src/io/ipc/read/mod.rs b/crates/polars-arrow/src/io/ipc/read/mod.rs index 88411f9b905f..f4430db7dea2 100644 --- a/crates/polars-arrow/src/io/ipc/read/mod.rs +++ b/crates/polars-arrow/src/io/ipc/read/mod.rs @@ -19,6 +19,7 @@ mod schema; mod stream; pub(crate) use common::first_dict_field; +pub use common::{prepare_projection, ProjectionInfo}; pub use error::OutOfSpecKind; pub use file::{ deserialize_footer, get_row_count, read_batch, read_file_dictionaries, read_file_metadata, diff --git a/crates/polars-arrow/src/io/ipc/read/reader.rs b/crates/polars-arrow/src/io/ipc/read/reader.rs index 8369d2960233..e9523477fe39 100644 --- a/crates/polars-arrow/src/io/ipc/read/reader.rs +++ b/crates/polars-arrow/src/io/ipc/read/reader.rs @@ -1,9 +1,9 @@ use std::io::{Read, Seek}; use polars_error::PolarsResult; -use polars_utils::aliases::PlHashMap; use super::common::*; +use super::file::{get_message_from_block, get_record_batch}; use super::{read_batch, read_file_dictionaries, Dictionaries, FileMetadata}; use crate::array::Array; use crate::datatypes::ArrowSchema; @@ -16,7 +16,7 @@ pub struct FileReader { // the dictionaries are going to be read dictionaries: Option, current_block: usize, - projection: Option<(Vec, PlHashMap, ArrowSchema)>, + projection: Option, remaining: usize, data_scratch: Vec, message_scratch: Vec, @@ -32,10 +32,29 @@ impl FileReader { projection: Option>, limit: Option, ) -> Self { - let projection = projection.map(|projection| { - let (p, h, schema) = prepare_projection(&metadata.schema, projection); - (p, h, schema) - }); + let projection = + projection.map(|projection| prepare_projection(&metadata.schema, projection)); + Self { + reader, + metadata, + dictionaries: Default::default(), + projection, + remaining: limit.unwrap_or(usize::MAX), + current_block: 0, + data_scratch: Default::default(), + message_scratch: Default::default(), + } + } + + /// Creates a new [`FileReader`]. Use `projection` to only take certain columns. + /// # Panic + /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid) + pub fn new_with_projection_info( + reader: R, + metadata: FileMetadata, + projection: Option, + limit: Option, + ) -> Self { Self { reader, metadata, @@ -52,7 +71,7 @@ impl FileReader { pub fn schema(&self) -> &ArrowSchema { self.projection .as_ref() - .map(|x| &x.2) + .map(|x| &x.schema) .unwrap_or(&self.metadata.schema) } @@ -66,9 +85,23 @@ impl FileReader { self.reader } + pub fn set_current_block(&mut self, idx: usize) { + self.current_block = idx; + } + + pub fn get_current_block(&self) -> usize { + self.current_block + } + + /// Get the inner memory scratches so they can be reused in a new writer. + /// This can be utilized to save memory allocations for performance reasons. + pub fn take_projection_info(&mut self) -> Option { + std::mem::take(&mut self.projection) + } + /// Get the inner memory scratches so they can be reused in a new writer. /// This can be utilized to save memory allocations for performance reasons. - pub fn get_scratches(&mut self) -> (Vec, Vec) { + pub fn take_scratches(&mut self) -> (Vec, Vec) { ( std::mem::take(&mut self.data_scratch), std::mem::take(&mut self.message_scratch), @@ -91,6 +124,43 @@ impl FileReader { }; Ok(()) } + + /// Skip over blocks until we have seen at most `offset` rows, returning how many rows we are + /// still too see. + /// + /// This will never go over the `offset`. Meaning that if the `offset < current_block.len()`, + /// the block will not be skipped. + pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult { + let mut remaining_offset = offset; + + for (i, block) in self.metadata.blocks.iter().enumerate() { + let message = + get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?; + let record_batch = get_record_batch(message)?; + + let length = record_batch.length()?; + let length = length as u64; + + if length > remaining_offset { + self.current_block = i; + return Ok(remaining_offset); + } + + remaining_offset -= length; + } + + self.current_block = self.metadata.blocks.len(); + Ok(remaining_offset) + } + + pub fn next_record_batch( + &mut self, + ) -> Option>> { + let block = self.metadata.blocks.get(self.current_block)?; + self.current_block += 1; + let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch); + Some(message.and_then(|m| get_record_batch(m))) + } } impl Iterator for FileReader { @@ -114,7 +184,7 @@ impl Iterator for FileReader { &mut self.reader, self.dictionaries.as_ref().unwrap(), &self.metadata, - self.projection.as_ref().map(|x| x.0.as_ref()), + self.projection.as_ref().map(|x| x.columns.as_ref()), Some(self.remaining), block, &mut self.message_scratch, @@ -122,7 +192,7 @@ impl Iterator for FileReader { ); self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default(); - let chunk = if let Some((_, map, _)) = &self.projection { + let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection { // re-order according to projection chunk.map(|chunk| apply_projection(chunk, map)) } else { diff --git a/crates/polars-arrow/src/io/ipc/read/stream.rs b/crates/polars-arrow/src/io/ipc/read/stream.rs index 87241596cdbe..b2cfb727b385 100644 --- a/crates/polars-arrow/src/io/ipc/read/stream.rs +++ b/crates/polars-arrow/src/io/ipc/read/stream.rs @@ -2,7 +2,6 @@ use std::io::Read; use arrow_format::ipc::planus::ReadAsRoot; use polars_error::{polars_bail, polars_err, PolarsError, PolarsResult}; -use polars_utils::aliases::PlHashMap; use super::super::CONTINUATION_MARKER; use super::common::*; @@ -93,7 +92,7 @@ fn read_next( dictionaries: &mut Dictionaries, message_buffer: &mut Vec, data_buffer: &mut Vec, - projection: &Option<(Vec, PlHashMap, ArrowSchema)>, + projection: &Option, scratch: &mut Vec, ) -> PolarsResult> { // determine metadata length @@ -169,7 +168,7 @@ fn read_next( batch, &metadata.schema, &metadata.ipc_schema, - projection.as_ref().map(|x| x.0.as_ref()), + projection.as_ref().map(|x| x.columns.as_ref()), None, dictionaries, metadata.version, @@ -179,7 +178,7 @@ fn read_next( scratch, ); - if let Some((_, map, _)) = projection { + if let Some(ProjectionInfo { map, .. }) = projection { // re-order according to projection chunk .map(|chunk| apply_projection(chunk, map)) @@ -238,7 +237,7 @@ pub struct StreamReader { finished: bool, data_buffer: Vec, message_buffer: Vec, - projection: Option<(Vec, PlHashMap, ArrowSchema)>, + projection: Option, scratch: Vec, } @@ -249,10 +248,8 @@ impl StreamReader { /// encounter a schema. /// To check if the reader is done, use `is_finished(self)` pub fn new(reader: R, metadata: StreamMetadata, projection: Option>) -> Self { - let projection = projection.map(|projection| { - let (p, h, schema) = prepare_projection(&metadata.schema, projection); - (p, h, schema) - }); + let projection = + projection.map(|projection| prepare_projection(&metadata.schema, projection)); Self { reader, @@ -275,7 +272,7 @@ impl StreamReader { pub fn schema(&self) -> &ArrowSchema { self.projection .as_ref() - .map(|x| &x.2) + .map(|x| &x.schema) .unwrap_or(&self.metadata.schema) } diff --git a/crates/polars-arrow/src/record_batch.rs b/crates/polars-arrow/src/record_batch.rs index f58d129831f1..2b0b8112ea9e 100644 --- a/crates/polars-arrow/src/record_batch.rs +++ b/crates/polars-arrow/src/record_batch.rs @@ -9,7 +9,7 @@ use crate::array::{Array, ArrayRef}; /// the same length, [`RecordBatchT::len`]. #[derive(Debug, Clone, PartialEq, Eq)] pub struct RecordBatchT> { - length: usize, + height: usize, arrays: Vec, } @@ -29,14 +29,14 @@ impl> RecordBatchT { /// /// # Error /// - /// I.f.f. the length does not match the length of any of the arrays - pub fn try_new(length: usize, arrays: Vec) -> PolarsResult { + /// I.f.f. the height does not match the length of any of the arrays + pub fn try_new(height: usize, arrays: Vec) -> PolarsResult { polars_ensure!( - arrays.iter().all(|arr| arr.as_ref().len() == length), + arrays.iter().all(|arr| arr.as_ref().len() == height), ComputeError: "RecordBatch requires all its arrays to have an equal number of rows", ); - Ok(Self { length, arrays }) + Ok(Self { height, arrays }) } /// returns the [`Array`]s in [`RecordBatchT`] @@ -51,7 +51,17 @@ impl> RecordBatchT { /// returns the number of rows of every array pub fn len(&self) -> usize { - self.length + self.height + } + + /// returns the number of rows of every array + pub fn height(&self) -> usize { + self.height + } + + /// returns the number of arrays + pub fn width(&self) -> usize { + self.arrays.len() } /// returns whether the columns have any rows diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 7e2d7b050dcf..0d8fef7f4c4a 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::{mem, ops}; +use polars_row::ArrayRef; use polars_utils::itertools::Itertools; use rayon::prelude::*; @@ -3334,6 +3335,31 @@ impl DataFrame { pub(crate) fn infer_height(cols: &[Column]) -> usize { cols.first().map_or(0, Column::len) } + + pub fn append_record_batch(&mut self, rb: RecordBatchT) -> PolarsResult<()> { + polars_ensure!( + rb.arrays().len() == self.width(), + InvalidOperation: "attempt to extend dataframe of width {} with record batch of width {}", + self.width(), + rb.arrays().len(), + ); + + if rb.height() == 0 { + return Ok(()); + } + + // SAFETY: + // - we don't adjust the names of the columns + // - each column gets appended the same number of rows, which is an invariant of + // record_batch. + let columns = unsafe { self.get_columns_mut() }; + for (col, arr) in columns.iter_mut().zip(rb.into_arrays()) { + let arr_series = Series::from_arrow_chunks(PlSmallStr::EMPTY, vec![arr])?.into_column(); + col.append(&arr_series)?; + } + + Ok(()) + } } pub struct RecordBatchIter<'a> { diff --git a/crates/polars-core/src/frame/upstream_traits.rs b/crates/polars-core/src/frame/upstream_traits.rs index 38b346ace652..1392f87c052f 100644 --- a/crates/polars-core/src/frame/upstream_traits.rs +++ b/crates/polars-core/src/frame/upstream_traits.rs @@ -1,5 +1,7 @@ use std::ops::{Index, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive}; +use arrow::record_batch::RecordBatchT; + use crate::prelude::*; impl FromIterator for DataFrame { @@ -22,6 +24,32 @@ impl FromIterator for DataFrame { } } +impl TryExtend>> for DataFrame { + fn try_extend>>>( + &mut self, + iter: I, + ) -> PolarsResult<()> { + for record_batch in iter { + self.append_record_batch(record_batch)?; + } + + Ok(()) + } +} + +impl TryExtend>>> for DataFrame { + fn try_extend>>>>( + &mut self, + iter: I, + ) -> PolarsResult<()> { + for record_batch in iter { + self.append_record_batch(record_batch?)?; + } + + Ok(()) + } +} + impl Index for DataFrame { type Output = Column; diff --git a/crates/polars-core/src/scalar/from.rs b/crates/polars-core/src/scalar/from.rs index 3af8671dadd1..c104c2ea8573 100644 --- a/crates/polars-core/src/scalar/from.rs +++ b/crates/polars-core/src/scalar/from.rs @@ -1,3 +1,5 @@ +use polars_utils::pl_str::PlSmallStr; + use super::{AnyValue, DataType, Scalar}; macro_rules! impl_from { @@ -25,4 +27,5 @@ impl_from! { (u64, UInt64, UInt64) (f32, Float32, Float32) (f64, Float64, Float64) + (PlSmallStr, StringOwned, String) } diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs index 4e039124933f..f4ef629821a9 100644 --- a/crates/polars-io/src/utils/other.rs +++ b/crates/polars-io/src/utils/other.rs @@ -45,7 +45,7 @@ pub fn get_reader_bytes( feature = "parquet", feature = "avro" ))] -pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema { +pub fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema { projection .iter() .map(|idx| schema.get_at_index(*idx).unwrap()) @@ -59,14 +59,14 @@ pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> Ar feature = "avro", feature = "parquet" ))] -pub(crate) fn columns_to_projection( - columns: &[String], +pub fn columns_to_projection>( + columns: &[T], schema: &ArrowSchema, ) -> PolarsResult> { let mut prj = Vec::with_capacity(columns.len()); for column in columns { - let i = schema.try_index_of(column)?; + let i = schema.try_index_of(column.as_ref())?; prj.push(i); } diff --git a/crates/polars-plan/src/plans/optimizer/mod.rs b/crates/polars-plan/src/plans/optimizer/mod.rs index 70880ca78359..dc0d330d8b86 100644 --- a/crates/polars-plan/src/plans/optimizer/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/mod.rs @@ -89,6 +89,7 @@ pub fn optimize( let simplify_expr = opt_state.contains(OptFlags::SIMPLIFY_EXPR); let slice_pushdown = opt_state.contains(OptFlags::SLICE_PUSHDOWN); let streaming = opt_state.contains(OptFlags::STREAMING); + let new_streaming = opt_state.contains(OptFlags::NEW_STREAMING); let fast_projection = opt_state.contains(OptFlags::FAST_PROJECTION); // Don't run optimizations that don't make sense on a single node. @@ -181,7 +182,7 @@ pub fn optimize( } if slice_pushdown { - let slice_pushdown_opt = SlicePushDown::new(streaming); + let slice_pushdown_opt = SlicePushDown::new(streaming, new_streaming); let alp = lp_arena.take(lp_top); let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?; diff --git a/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs b/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs index 9c2f8497fac8..a5ff806abae9 100644 --- a/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs +++ b/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs @@ -5,6 +5,7 @@ use crate::prelude::*; pub(super) struct SlicePushDown { streaming: bool, + new_streaming: bool, pub scratch: Vec, } @@ -59,9 +60,10 @@ fn can_pushdown_slice_past_projections(exprs: &[ExprIR], arena: &Arena) - } impl SlicePushDown { - pub(super) fn new(streaming: bool) -> Self { + pub(super) fn new(streaming: bool, new_streaming: bool) -> Self { Self { streaming, + new_streaming, scratch: vec![], } } @@ -211,6 +213,32 @@ impl SlicePushDown { Ok(lp) }, + + #[cfg(feature = "ipc")] + (Scan { + sources, + file_info, + hive_parts, + output_schema, + mut file_options, + predicate, + scan_type: scan_type @ FileScan::Ipc { .. }, + }, Some(state)) if self.new_streaming && predicate.is_none() => { + file_options.slice = Some((state.offset, state.len as usize)); + + let lp = Scan { + sources, + file_info, + hive_parts, + output_schema, + scan_type, + file_options, + predicate, + }; + + Ok(lp) + }, + // TODO! we currently skip slice pushdown if there is a predicate. (Scan { sources, diff --git a/crates/polars-stream/src/nodes/io_sources/ipc.rs b/crates/polars-stream/src/nodes/io_sources/ipc.rs new file mode 100644 index 000000000000..3a83c8e3132c --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sources/ipc.rs @@ -0,0 +1,557 @@ +use std::cmp::Reverse; +use std::io::Cursor; +use std::ops::Range; +use std::sync::Arc; + +use polars_core::config; +use polars_core::frame::DataFrame; +use polars_core::prelude::{Column, DataType}; +use polars_core::scalar::Scalar; +use polars_core::utils::arrow::array::TryExtend; +use polars_core::utils::arrow::io::ipc::read::{ + prepare_projection, read_file_metadata, FileMetadata, FileReader, ProjectionInfo, +}; +use polars_error::{ErrString, PolarsError, PolarsResult}; +use polars_expr::prelude::PhysicalExpr; +use polars_expr::state::ExecutionState; +use polars_io::cloud::CloudOptions; +use polars_io::ipc::IpcScanOptions; +use polars_io::utils::columns_to_projection; +use polars_io::RowIndex; +use polars_plan::plans::hive::HivePartitions; +use polars_plan::plans::{FileInfo, ScanSources}; +use polars_plan::prelude::FileScanOptions; +use polars_utils::mmap::MemSlice; +use polars_utils::pl_str::PlSmallStr; +use polars_utils::priority::Priority; +use polars_utils::IdxSize; + +use crate::async_primitives::distributor_channel::distributor_channel; +use crate::async_primitives::linearizer::Linearizer; +use crate::morsel::{get_ideal_morsel_size, SourceToken}; +use crate::nodes::{ + ComputeNode, JoinHandle, Morsel, MorselSeq, PortState, TaskPriority, TaskScope, +}; +use crate::pipe::{RecvPort, SendPort}; +use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE}; + +const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static( + "\ +IPC file produces more than 2^32 rows; \ +consider compiling with polars-bigidx feature (polars-u64-idx package on python)", +)); + +pub struct IpcSourceNode { + sources: ScanSources, + + config: IpcSourceNodeConfig, + num_pipelines: usize, + + /// Every phase we need to be able to continue from where we left off, so we save the state of + /// the Walker task. + state: IpcSourceNodeState, +} + +pub struct IpcSourceNodeConfig { + row_index: Option, + projection_info: Option, + + rechunk: bool, + include_file_paths: Option, + + first_metadata: FileMetadata, +} + +pub struct IpcSourceNodeState { + morsel_seq: u64, + row_idx_offset: IdxSize, + + slice: Range, + + source_idx: usize, + source: Option, +} + +pub struct Source { + file_path: Option>, + + memslice: Arc, + metadata: Arc, + + block_offset: usize, +} + +impl IpcSourceNode { + #[allow(clippy::too_many_arguments)] + pub fn new( + sources: ScanSources, + _file_info: FileInfo, + _hive_parts: Option>>, // @TODO + predicate: Option>, + options: IpcScanOptions, + _cloud_options: Option, + file_options: FileScanOptions, + mut first_metadata: Option, + ) -> PolarsResult { + // These should have all been removed during lower_ir + assert!(predicate.is_none()); + assert!(!sources.is_empty()); + + let IpcScanOptions = options; + + let FileScanOptions { + slice, + with_columns, + cache: _, // @TODO + row_index, + rechunk, + file_counter: _, // @TODO + hive_options: _, // @TODO + glob: _, // @TODO + include_file_paths, + allow_missing_columns: _, // @TODO + } = file_options; + + let first_metadata = match first_metadata.take() { + Some(md) => md, + None => { + let source = sources.iter().next().unwrap(); + let source = source.to_memslice()?; + read_file_metadata(&mut std::io::Cursor::new(&*source))? + }, + }; + + let projection = with_columns + .as_ref() + .map(|cols| columns_to_projection(cols, &first_metadata.schema)) + .transpose()?; + let projection_info = projection + .as_ref() + .map(|p| prepare_projection(&first_metadata.schema, p.clone())); + + let state = IpcSourceNodeState { + morsel_seq: 0, + row_idx_offset: row_index.as_ref().map_or(0, |ri| ri.offset), + + // Always create a slice. If no slice was given, just make the biggest slice possible. + slice: slice.map_or(0..usize::MAX, |(offset, length)| { + let offset = offset as usize; + offset..offset + length + }), + + source_idx: 0, + source: None, + }; + + Ok(IpcSourceNode { + sources, + + config: IpcSourceNodeConfig { + row_index, + projection_info, + + rechunk, + include_file_paths, + + first_metadata, + }, + + num_pipelines: 0, + + state, + }) + } +} + +/// Move `slice` forward by `n` and return the slice until then. +fn slice_take(slice: &mut Range, n: usize) -> Range { + let offset = slice.start; + let length = slice.len(); + + assert!(offset < n); + + let chunk_length = (n - offset).min(length); + let rng = offset..offset + chunk_length; + *slice = 0..length - chunk_length; + + rng +} + +fn get_max_morsel_size() -> usize { + std::env::var("POLARS_STREAMING_IPC_SOURCE_MAX_MORSEL_SIZE") + .map_or_else( + |_| get_ideal_morsel_size(), + |v| { + v.parse::().expect( + "POLARS_STREAMING_IPC_SOURCE_MAX_MORSEL_SIZE does not contain valid size", + ) + }, + ) + .max(1) +} + +impl ComputeNode for IpcSourceNode { + fn name(&self) -> &str { + "ipc_source" + } + + fn initialize(&mut self, num_pipelines: usize) { + self.num_pipelines = num_pipelines; + } + + fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> { + assert!(recv.is_empty()); + assert_eq!(send.len(), 1); + + if self.state.slice.is_empty() || self.state.source_idx >= self.sources.len() { + send[0] = PortState::Done; + } + + if send[0] != PortState::Done { + send[0] = PortState::Ready; + } + + Ok(()) + } + + fn spawn<'env, 's>( + &'env mut self, + scope: &'s TaskScope<'s, 'env>, + recv_ports: &mut [Option>], + send_ports: &mut [Option>], + _state: &'s ExecutionState, + join_handles: &mut Vec>>, + ) { + assert!(recv_ports.is_empty()); + assert_eq!(send_ports.len(), 1); + + // Split size for morsels. + let max_morsel_size = get_max_morsel_size(); + let source_token = SourceToken::new(); + + let num_pipelines = self.num_pipelines; + let config = &self.config; + let sources = &self.sources; + let state = &mut self.state; + + /// Messages sent from Walker task to Decoder tasks. + struct BatchMessage { + memslice: Arc, + metadata: Arc, + file_path: Option>, + row_idx_offset: IdxSize, + slice: Range, + block_range: Range, + morsel_seq_base: u64, + } + + // Walker task -> Decoder tasks. + let (mut batch_tx, batch_rxs) = + distributor_channel::(num_pipelines, DEFAULT_DISTRIBUTOR_BUFFER_SIZE); + // Decoder tasks -> Distributor task. + let (mut decoded_rx, decoded_tx) = Linearizer::, Morsel>>::new( + num_pipelines, + DEFAULT_LINEARIZER_BUFFER_SIZE, + ); + // Distributor task -> output. + let mut sender = send_ports[0].take().unwrap().serial(); + + // Distributor task. + // + // Shuffles morsels from `n` producers amongst `n` consumers. + // + // If record batches in the source IPC file are large, one decoder might produce many + // morsels at the same time. At the same time, other decoders might not produce anything. + // Therefore, we would like to distribute the output of a single decoder task over the + // available output pipelines. + join_handles.push(scope.spawn_task(TaskPriority::High, async move { + while let Some(morsel) = decoded_rx.get().await { + if sender.send(morsel.1).await.is_err() { + break; + } + } + PolarsResult::Ok(()) + })); + + // Decoder tasks. + // + // Tasks a IPC file and certain number of blocks and decodes each block as a record batch. + // Then, all record batches are concatenated into a DataFrame. If the resulting DataFrame + // is too large, which happens when we have one very large block, the DataFrame is split + // into smaller pieces an spread among the pipelines. + let decoder_tasks = decoded_tx.into_iter().zip(batch_rxs) + .map(|(mut send, mut rx)| { + let source_token = source_token.clone(); + scope.spawn_task(TaskPriority::Low, async move { + // Amortize allocations. + let mut data_scratch = Vec::new(); + let mut message_scratch = Vec::new(); + let mut projection_info = config.projection_info.clone(); + + let schema = projection_info.as_ref().map_or(config.first_metadata.schema.as_ref(), |ProjectionInfo { schema, .. }| schema); + let pl_schema = schema + .iter() + .map(|(n, f)| (n.clone(), DataType::from_arrow(&f.dtype, true))) + .collect(); + + while let Ok(m) = rx.recv().await { + let BatchMessage { + memslice: source, + metadata, + file_path, + row_idx_offset, + slice, + morsel_seq_base, + block_range, + } = m; + + let mut reader = FileReader::new_with_projection_info( + Cursor::new(source.as_ref()), + metadata.as_ref().clone(), + std::mem::take(&mut projection_info), + None, + ); + reader.set_current_block(block_range.start); + reader.set_scratches(( + std::mem::take(&mut data_scratch), + std::mem::take(&mut message_scratch), + )); + + // Create the DataFrame with the appropriate schema and append all the record + // batches to it. This will perform schema validation as well. + let mut df = DataFrame::empty_with_schema(&pl_schema); + df.try_extend(reader.by_ref().take(block_range.len()))?; + + df = df.slice(slice.start as i64, slice.len()); + + if config.rechunk { + df.rechunk_mut(); + } + + if let Some(RowIndex { name, offset: _ }) = &config.row_index { + let offset = row_idx_offset + slice.start as IdxSize; + df = df.with_row_index(name.clone(), Some(offset))?; + } + + if let Some(col) = config.include_file_paths.as_ref() { + let file_path = file_path.unwrap(); + let file_path = Scalar::from(PlSmallStr::from(file_path.as_ref())); + df.with_column(Column::new_scalar( + col.clone(), + file_path, + df.height(), + ))?; + } + + // If the block is very large, we want to split the block amongst the + // pipelines. That will at least allow some parallelism. + if df.height() > max_morsel_size && config::verbose() { + eprintln!("IPC source encountered a (too) large record batch of {} rows. Splitting and continuing.", df.height()); + } + for i in 0..df.height().div_ceil(max_morsel_size) { + let morsel = df.slice((i * max_morsel_size) as i64, max_morsel_size); + let seq = MorselSeq::new(morsel_seq_base + i as u64); + let morsel = Morsel::new( + morsel, + seq, + source_token.clone(), + ); + if send.insert(Priority(Reverse(seq), morsel)).await.is_err() { + break; + } + } + + (data_scratch, message_scratch) = reader.take_scratches(); + projection_info = reader.take_projection_info(); + } + + PolarsResult::Ok(()) + }) + }) + .collect::>(); + + // Walker task. + // + // Walks all the sources and supplies block ranges to the decoder tasks. + join_handles.push(scope.spawn_task(TaskPriority::Low, async move { + struct Batch { + row_idx_offset: IdxSize, + block_start: usize, + num_rows: usize, + } + + // Batch completion parameters + let batch_size_limit = get_ideal_morsel_size(); + let sliced_batch_size_limit = state.slice.len().div_ceil(num_pipelines); + let batch_block_limit = if sources.len() >= num_pipelines { + // If there are more files than decoder tasks, try to subdivide the files instead + // of the blocks. + usize::MAX + } else { + config.first_metadata.blocks.len().div_ceil(num_pipelines) + }; + + // Amortize allocations + let mut data_scratch = Vec::new(); + let mut message_scratch = Vec::new(); + let mut projection_info = config.projection_info.clone(); + + 'source_loop: while !state.slice.is_empty() { + let source = match state.source { + Some(ref mut source) => source, + None => { + let Some(source) = sources.get(state.source_idx) else { + break; + }; + + let file_path: Option> = config + .include_file_paths + .as_ref() + .map(|_| source.to_include_path_name().into()); + let memslice = source.to_memslice()?; + let metadata = if state.source_idx == 0 { + config.first_metadata.clone() + } else { + read_file_metadata(&mut std::io::Cursor::new(memslice.as_ref()))? + }; + + state.source.insert(Source { + file_path, + memslice: Arc::new(memslice), + metadata: Arc::new(metadata), + block_offset: 0, + }) + }, + }; + + let mut reader = FileReader::new_with_projection_info( + Cursor::new(source.memslice.as_ref()), + source.metadata.as_ref().clone(), + std::mem::take(&mut projection_info), + None, + ); + reader.set_current_block(source.block_offset); + reader.set_scratches(( + std::mem::take(&mut data_scratch), + std::mem::take(&mut message_scratch), + )); + + if state.slice.start > 0 { + // Skip over all blocks that the slice would skip anyway. + let new_offset = reader.skip_blocks_till_limit(state.slice.start as u64)?; + + state.row_idx_offset += (state.slice.start as u64 - new_offset) as IdxSize; + state.slice = new_offset as usize..new_offset as usize + state.slice.len(); + + // If we skip the entire file. Don't even try to read from it. + if reader.get_current_block() == reader.metadata().blocks.len() { + (data_scratch, message_scratch) = reader.take_scratches(); + projection_info = reader.take_projection_info(); + state.source.take(); + state.source_idx += 1; + continue; + } + } + + let mut batch = Batch { + row_idx_offset: state.row_idx_offset, + block_start: reader.get_current_block(), + num_rows: 0, + }; + + // We don't yet want to commit these values to the state in case this batch gets + // cancelled. + let mut uncommitted_slice = state.slice.clone(); + let mut uncommitted_row_idx_offset = state.row_idx_offset; + while !state.slice.is_empty() { + let mut is_batch_complete = false; + + match reader.next_record_batch() { + None if batch.num_rows == 0 => break, + + // If we have no more record batches available, we want to send what is + // left. + None => is_batch_complete = true, + Some(record_batch) => { + let rb_num_rows = record_batch?.length()? as usize; + batch.num_rows += rb_num_rows; + + // We need to ensure that we are not overflowing the IdxSize maximum + // capacity. + let rb_num_rows = IdxSize::try_from(rb_num_rows) + .map_err(|_| ROW_COUNT_OVERFLOW_ERR)?; + uncommitted_row_idx_offset = uncommitted_row_idx_offset + .checked_add(rb_num_rows) + .ok_or(ROW_COUNT_OVERFLOW_ERR)?; + }, + } + + let current_block = reader.get_current_block(); + + // Subdivide into batches for large files. + is_batch_complete |= batch.num_rows >= batch_size_limit; + // Subdivide into batches if the file is sliced. + is_batch_complete |= batch.num_rows >= sliced_batch_size_limit; + // Subdivide into batches for small files. + is_batch_complete |= current_block - batch.block_start >= batch_block_limit; + + // Batch blocks such that we send appropriately sized morsels. We guarantee a + // lower bound here, but not an upper bound. + if is_batch_complete { + let batch_slice = slice_take(&mut uncommitted_slice, batch.num_rows); + let batch_slice_len = batch_slice.len(); + let block_range = batch.block_start..current_block; + + let message = BatchMessage { + memslice: source.memslice.clone(), + metadata: source.metadata.clone(), + file_path: source.file_path.clone(), + row_idx_offset: batch.row_idx_offset, + slice: batch_slice, + morsel_seq_base: state.morsel_seq, + block_range, + }; + + if source_token.stop_requested() { + break 'source_loop; + } + + if batch_tx.send(message).await.is_err() { + // This should only happen if the receiver of the decoder + // has broken off, meaning no further input will be needed. + break 'source_loop; + } + + // Commit the changes to the state. + // Now, we know that the a decoder will process it. + // + // This might generate several morsels if the record batch is very large. + state.morsel_seq += batch_slice_len.div_ceil(max_morsel_size) as u64; + state.slice = uncommitted_slice.clone(); + state.row_idx_offset = uncommitted_row_idx_offset; + source.block_offset = current_block; + + batch = Batch { + row_idx_offset: state.row_idx_offset, + block_start: current_block, + num_rows: 0, + }; + } + } + + (data_scratch, message_scratch) = reader.take_scratches(); + projection_info = reader.take_projection_info(); + + state.source.take(); + state.source_idx += 1; + } + + drop(batch_tx); // Inform decoder tasks to stop. + for decoder_task in decoder_tasks { + decoder_task.await?; + } + + PolarsResult::Ok(()) + })); + } +} diff --git a/crates/polars-stream/src/nodes/io_sources/mod.rs b/crates/polars-stream/src/nodes/io_sources/mod.rs new file mode 100644 index 000000000000..ce14ad3b0f7a --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sources/mod.rs @@ -0,0 +1 @@ +pub mod ipc; diff --git a/crates/polars-stream/src/nodes/mod.rs b/crates/polars-stream/src/nodes/mod.rs index 936c0ceb3ada..effebe67c34b 100644 --- a/crates/polars-stream/src/nodes/mod.rs +++ b/crates/polars-stream/src/nodes/mod.rs @@ -5,6 +5,7 @@ pub mod in_memory_sink; pub mod in_memory_source; pub mod input_independent_select; pub mod io_sinks; +pub mod io_sources; pub mod joins; pub mod map; pub mod multiplexer; diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index 95e3ae72224d..063c94081dbc 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -1,10 +1,11 @@ use std::sync::Arc; +use polars_core::frame::DataFrame; use polars_core::prelude::{InitHashMaps, PlHashMap, PlIndexMap}; use polars_core::schema::Schema; use polars_error::{polars_ensure, PolarsResult}; use polars_plan::plans::expr_ir::{ExprIR, OutputName}; -use polars_plan::plans::{AExpr, FunctionIR, IRAggExpr, IR}; +use polars_plan::plans::{AExpr, FileScan, FunctionIR, IRAggExpr, IR}; use polars_plan::prelude::{FileType, SinkType}; use polars_utils::arena::{Arena, Node}; use polars_utils::itertools::Itertools; @@ -314,23 +315,67 @@ pub fn lower_ir( sources: scan_sources, file_info, hive_parts, - output_schema, + output_schema: scan_output_schema, scan_type, - predicate, + mut predicate, file_options, } = v.clone() else { unreachable!(); }; - PhysNodeKind::FileScan { - scan_sources, - file_info, - hive_parts, - output_schema, - scan_type, - predicate, - file_options, + if scan_sources.is_empty() { + // If there are no sources, just provide an empty in-memory source with the right + // schema. + PhysNodeKind::InMemorySource { + df: Arc::new(DataFrame::empty_with_schema(output_schema.as_ref())), + } + } else { + if matches!(scan_type, FileScan::Ipc { .. }) { + // @TODO: All the things the IPC source does not support yet. + if hive_parts.is_some() + || scan_sources.is_cloud_url() + || file_options.allow_missing_columns + || file_options.slice.is_some_and(|(offset, _)| offset < 0) + { + todo!(); + } + } + + // If the node itself would just filter on the whole output then there is no real + // reason to do it in the source node itself. + let do_filter_in_separate_node = + predicate.is_some() && matches!(scan_type, FileScan::Ipc { .. }); + + if do_filter_in_separate_node { + assert!(file_options.slice.is_none()); // Invariant of the scan + let predicate = predicate.take().unwrap(); + + let input = phys_sm.insert(PhysNode::new( + output_schema.clone(), + PhysNodeKind::FileScan { + scan_sources, + file_info, + hive_parts, + output_schema: scan_output_schema, + scan_type, + predicate: None, + file_options, + }, + )); + + PhysNodeKind::Filter { input, predicate } + } else { + PhysNodeKind::FileScan { + scan_sources, + file_info, + hive_parts, + output_schema: scan_output_schema, + scan_type, + predicate, + file_options, + } + } } }, diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index befa9c3a93b9..b701696972a9 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -367,6 +367,23 @@ fn to_graph_rec<'a>( todo!() } }, + FileScan::Ipc { + options, + cloud_options, + metadata: first_metadata, + } => ctx.graph.add_node( + nodes::io_sources::ipc::IpcSourceNode::new( + scan_sources, + file_info, + hive_parts, + predicate, + options, + cloud_options, + file_options, + first_metadata, + )?, + [], + ), _ => todo!(), } } diff --git a/py-polars/tests/unit/io/test_lazy_ipc.py b/py-polars/tests/unit/io/test_lazy_ipc.py index 0d67b6b06f89..ec75d495ce8d 100644 --- a/py-polars/tests/unit/io/test_lazy_ipc.py +++ b/py-polars/tests/unit/io/test_lazy_ipc.py @@ -88,6 +88,7 @@ def test_ipc_list_arg(io_files_path: Path) -> None: assert df.row(0) == ("vegetables", 45, 0.5, 2) +@pytest.mark.may_fail_auto_streaming def test_scan_ipc_local_with_async( capfd: Any, monkeypatch: Any,