diff --git a/crates/polars-core/src/utils/mod.rs b/crates/polars-core/src/utils/mod.rs
index 90e23998f07c..2a8069705d49 100644
--- a/crates/polars-core/src/utils/mod.rs
+++ b/crates/polars-core/src/utils/mod.rs
@@ -143,6 +143,10 @@ pub fn split_df_as_ref(
     extend_sub_chunks: bool,
 ) -> PolarsResult<Vec<DataFrame>> {
     let total_len = df.height();
+    if total_len == 0 {
+        return Ok(vec![df.clone()]);
+    }
+
     let chunk_size = std::cmp::max(total_len / n, 1);
 
     if df.n_chunks() == n
diff --git a/crates/polars-io/src/ipc/ipc_stream.rs b/crates/polars-io/src/ipc/ipc_stream.rs
index e748f670ad3b..697a5fdbec2f 100644
--- a/crates/polars-io/src/ipc/ipc_stream.rs
+++ b/crates/polars-io/src/ipc/ipc_stream.rs
@@ -248,8 +248,7 @@ where
         );
         ipc_stream_writer.start(&df.schema().to_arrow(self.pl_flavor), None)?;
-
-        df.align_chunks();
+        let df = chunk_df_for_writing(df, 512 * 512)?;
         let iter = df.iter_chunks(self.pl_flavor);
 
         for batch in iter {
diff --git a/crates/polars-io/src/parquet/write.rs b/crates/polars-io/src/parquet/write.rs
index c3aa6e85fb6f..9d59f577adcf 100644
--- a/crates/polars-io/src/parquet/write.rs
+++ b/crates/polars-io/src/parquet/write.rs
@@ -1,4 +1,3 @@
-use std::borrow::Cow;
 use std::collections::VecDeque;
 use std::io::Write;
 use std::sync::Mutex;
@@ -7,7 +6,6 @@ use arrow::array::Array;
 use arrow::datatypes::PhysicalType;
 use arrow::record_batch::RecordBatch;
 use polars_core::prelude::*;
-use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
 use polars_core::POOL;
 use polars_parquet::read::ParquetError;
 pub use polars_parquet::write::RowGroupIter;
@@ -19,6 +17,8 @@ use write::{
     BrotliLevel as BrotliLevelParquet, GzipLevel as GzipLevelParquet, ZstdLevel as ZstdLevelParquet,
 };
 
+use crate::prelude::chunk_df_for_writing;
+
 #[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct GzipLevel(u8);
@@ -191,29 +191,8 @@ where
     /// Write the given DataFrame in the writer `W`. Returns the total size of the file.
     pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
-        // ensures all chunks are aligned.
-        df.align_chunks();
-
-        let n_splits = df.height() / self.row_group_size.unwrap_or(512 * 512);
-        let chunked_df = if n_splits > 0 {
-            Cow::Owned(accumulate_dataframes_vertical_unchecked(
-                split_df_as_ref(df, n_splits, false)?
-                    .into_iter()
-                    .map(|mut df| {
-                        // If the chunks are small enough, writing many small chunks
-                        // leads to slow writing performance, so in that case we
-                        // merge them.
-                        let n_chunks = df.n_chunks();
-                        if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
-                            df.as_single_chunk_par();
-                        }
-                        df
-                    }),
-            ))
-        } else {
-            Cow::Borrowed(df)
-        };
-        let mut batched = self.batched(&df.schema())?;
+        let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
+        let mut batched = self.batched(&chunked_df.schema())?;
         batched.write_batch(&chunked_df)?;
         batched.finish()
     }
 
diff --git a/crates/polars-io/src/utils.rs b/crates/polars-io/src/utils.rs
index 114fc979b6eb..d64a702ddc17 100644
--- a/crates/polars-io/src/utils.rs
+++ b/crates/polars-io/src/utils.rs
@@ -1,8 +1,12 @@
+#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
+use std::borrow::Cow;
 use std::io::Read;
 use std::path::{Path, PathBuf};
 
 use once_cell::sync::Lazy;
 use polars_core::prelude::*;
+#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
+use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
 use regex::{Regex, RegexBuilder};
 
 use crate::mmap::{MmapBytesReader, ReaderBytes};
@@ -256,6 +260,40 @@ pub fn check_projected_schema(
     check_projected_schema_impl(a, b, projected_names, msg)
 }
 
+/// Split DataFrame into chunks in preparation for writing. The chunks have a
+/// maximum number of rows per chunk to ensure reasonable memory efficiency when
+/// reading the resulting file, and a minimum size per chunk to ensure
+/// reasonable performance when writing.
+#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
+pub(crate) fn chunk_df_for_writing(
+    df: &mut DataFrame,
+    row_group_size: usize,
+) -> PolarsResult<Cow<'_, DataFrame>> {
+    // ensures all chunks are aligned.
+    df.align_chunks();
+
+    let n_splits = df.height() / row_group_size;
+    let result = if n_splits > 0 {
+        Cow::Owned(accumulate_dataframes_vertical_unchecked(
+            split_df_as_ref(df, n_splits, false)?
+                .into_iter()
+                .map(|mut df| {
+                    // If the chunks are small enough, writing many small chunks
+                    // leads to slow writing performance, so in that case we
+                    // merge them.
+                    let n_chunks = df.n_chunks();
+                    if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
+                        df.as_single_chunk_par();
+                    }
+                    df
+                }),
+        ))
+    } else {
+        Cow::Borrowed(df)
+    };
+    Ok(result)
+}
+
 #[cfg(test)]
 mod tests {
     use std::path::PathBuf;
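
For review context, a rough sketch of how the shared helper surfaces through the public Python API; the file name and row counts are illustrative, not part of this patch. `chunk_df_for_writing` computes `n_splits = height / row_group_size`, with `row_group_size` defaulting to `512 * 512 = 262_144` for both writers:

import polars as pl

# 1_000_000 // 262_144 = 3, so the default settings split this frame
# into three roughly equal chunks before writing.
df = pl.DataFrame({"x": range(1_000_000)})
df.write_parquet("example.parquet")

# An explicit row_group_size feeds the same helper:
# 1_000_000 // 100_000 = 10 chunks of ~100_000 rows each.
df.write_parquet("example.parquet", row_group_size=100_000)
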
diff --git a/py-polars/tests/unit/io/test_ipc.py b/py-polars/tests/unit/io/test_ipc.py
index 8c03e3ee3136..50dc8a19db9c 100644
--- a/py-polars/tests/unit/io/test_ipc.py
+++ b/py-polars/tests/unit/io/test_ipc.py
@@ -13,6 +13,7 @@
     from pathlib import Path
 
     from polars.type_aliases import IpcCompression
+    from tests.unit.conftest import MemoryUsage
 
 COMPRESSIONS = ["uncompressed", "lz4", "zstd"]
 
@@ -266,3 +267,42 @@ def test_ipc_view_gc_14448() -> None:
     df.write_ipc(f, future=True)
     f.seek(0)
     assert_frame_equal(pl.read_ipc(f), df)
+
+
+@pytest.mark.slow()
+@pytest.mark.write_disk()
+@pytest.mark.parametrize("stream", [True, False])
+def test_read_ipc_only_loads_selected_columns(
+    memory_usage_without_pyarrow: MemoryUsage,
+    tmp_path: Path,
+    stream: bool,
+) -> None:
+    """Only requested columns are loaded by ``read_ipc()``/``read_ipc_stream()``."""
+    tmp_path.mkdir(exist_ok=True)
+
+    # Each column will be about 16MB of RAM. There's a fixed overhead tied to
+    # block size, so smaller file sizes can be misleading in terms of memory
+    # usage.
+    series = pl.arange(0, 2_000_000, dtype=pl.Int64, eager=True)
+
+    file_path = tmp_path / "multicolumn.ipc"
+    df = pl.DataFrame(
+        {
+            "a": series,
+            "b": series,
+        }
+    )
+    write_ipc(df, stream, file_path)
+    del df, series
+
+    memory_usage_without_pyarrow.reset_tracking()
+
+    # Only load one column:
+    kwargs = {}
+    if not stream:
+        kwargs["memory_map"] = False
+    df = read_ipc(stream, str(file_path), columns=["b"], rechunk=False, **kwargs)
+    del df
+    # Only one column's worth of memory should be used; 2 columns would be
+    # 32_000_000 at least, but there's some overhead.
+    assert 16_000_000 < memory_usage_without_pyarrow.get_peak() < 23_000_000
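
A condensed, standalone version of what the new test exercises (the path is illustrative, and the `MemoryUsage` fixture is test-only, so this sketch demonstrates only the column projection, not the peak-memory assertion):

import polars as pl

series = pl.arange(0, 2_000_000, dtype=pl.Int64, eager=True)
pl.DataFrame({"a": series, "b": series}).write_ipc("multicolumn.ipc")

# memory_map=False mirrors the non-stream branch of the test: the read then
# goes through tracked allocations rather than an OS-level mapping.
out = pl.read_ipc("multicolumn.ipc", columns=["b"], memory_map=False, rechunk=False)
assert out.columns == ["b"]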