perf(rust): read_ipc memory usage tests, and writing fix (#15599)
Co-authored-by: Itamar Turner-Trauring <[email protected]>
itamarst and pythonspeed authored Apr 12, 2024
1 parent 48615d5 commit 8cdacc1
Showing 5 changed files with 87 additions and 27 deletions.
4 changes: 4 additions & 0 deletions crates/polars-core/src/utils/mod.rs
@@ -143,6 +143,10 @@ pub fn split_df_as_ref(
     extend_sub_chunks: bool,
 ) -> PolarsResult<Vec<DataFrame>> {
     let total_len = df.height();
+    if total_len == 0 {
+        return Ok(vec![df.clone()]);
+    }
+
     let chunk_size = std::cmp::max(total_len / n, 1);
 
     if df.n_chunks() == n
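
A quick sanity check of the new early return, as a minimal sketch (it uses `polars_core::utils::split_df_as_ref` from the hunk above; the column name is illustrative):

```rust
use polars_core::prelude::*;
use polars_core::utils::split_df_as_ref;

fn main() -> PolarsResult<()> {
    // A frame with a schema but zero rows.
    let df = DataFrame::new(vec![Series::new("a", Vec::<i64>::new())])?;

    // With the guard above, asking for 4 splits of an empty frame returns a
    // single empty clone instead of going through the chunk-size arithmetic.
    let parts = split_df_as_ref(&df, 4, false)?;
    assert_eq!(parts.len(), 1);
    assert_eq!(parts[0].height(), 0);
    Ok(())
}
```
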
3 changes: 1 addition & 2 deletions crates/polars-io/src/ipc/ipc_stream.rs
@@ -248,8 +248,7 @@ where
         );
 
         ipc_stream_writer.start(&df.schema().to_arrow(self.pl_flavor), None)?;
-
-        df.align_chunks();
+        let df = chunk_df_for_writing(df, 512 * 512)?;
         let iter = df.iter_chunks(self.pl_flavor);
 
         for batch in iter {
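
Callers of the IPC stream writer see no API change; `finish` now routes the frame through `chunk_df_for_writing` (with a 512 * 512 row target) before serializing. A minimal usage sketch, assuming the `polars` crate with the `ipc_streaming` feature (the file name is illustrative):

```rust
use std::fs::File;

use polars::prelude::*;

fn write_stream(df: &mut DataFrame) -> PolarsResult<()> {
    let file = File::create("frame.ipc")?;
    // finish() aligns and re-chunks the frame via chunk_df_for_writing
    // before writing the record batches.
    let mut writer = IpcStreamWriter::new(file);
    writer.finish(df)
}
```
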
29 changes: 4 additions & 25 deletions crates/polars-io/src/parquet/write.rs
@@ -1,4 +1,3 @@
-use std::borrow::Cow;
 use std::collections::VecDeque;
 use std::io::Write;
 use std::sync::Mutex;
@@ -7,7 +6,6 @@ use arrow::array::Array;
 use arrow::datatypes::PhysicalType;
 use arrow::record_batch::RecordBatch;
 use polars_core::prelude::*;
-use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
 use polars_core::POOL;
 use polars_parquet::read::ParquetError;
 pub use polars_parquet::write::RowGroupIter;
@@ -19,6 +17,8 @@ use write::{
     BrotliLevel as BrotliLevelParquet, GzipLevel as GzipLevelParquet, ZstdLevel as ZstdLevelParquet,
 };
 
+use crate::prelude::chunk_df_for_writing;
+
 #[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct GzipLevel(u8);
@@ -191,29 +191,8 @@
 
     /// Write the given DataFrame in the writer `W`. Returns the total size of the file.
     pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
-        // ensures all chunks are aligned.
-        df.align_chunks();
-
-        let n_splits = df.height() / self.row_group_size.unwrap_or(512 * 512);
-        let chunked_df = if n_splits > 0 {
-            Cow::Owned(accumulate_dataframes_vertical_unchecked(
-                split_df_as_ref(df, n_splits, false)?
-                    .into_iter()
-                    .map(|mut df| {
-                        // If the chunks are small enough, writing many small chunks
-                        // leads to slow writing performance, so in that case we
-                        // merge them.
-                        let n_chunks = df.n_chunks();
-                        if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
-                            df.as_single_chunk_par();
-                        }
-                        df
-                    }),
-            ))
-        } else {
-            Cow::Borrowed(df)
-        };
-        let mut batched = self.batched(&df.schema())?;
+        let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
+        let mut batched = self.batched(&chunked_df.schema())?;
         batched.write_batch(&chunked_df)?;
         batched.finish()
     }
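
The Parquet writer keeps its public behaviour: the chunking that used to be inlined in `finish` now lives in the shared helper, still driven by `row_group_size` with the same 512 * 512 default. A rough usage sketch, assuming the `polars` crate with the `parquet` feature (path and row group size are illustrative):

```rust
use std::fs::File;

use polars::prelude::*;

fn write_parquet(df: &mut DataFrame) -> PolarsResult<u64> {
    let file = File::create("frame.parquet")?;
    // The row group size set here is what finish() forwards to
    // chunk_df_for_writing; left unset, it falls back to 512 * 512 rows.
    ParquetWriter::new(file)
        .with_row_group_size(Some(250_000))
        .finish(df)
}
```
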
38 changes: 38 additions & 0 deletions crates/polars-io/src/utils.rs
@@ -1,8 +1,12 @@
+#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
+use std::borrow::Cow;
 use std::io::Read;
 use std::path::{Path, PathBuf};
 
 use once_cell::sync::Lazy;
 use polars_core::prelude::*;
+#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
+use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
 use regex::{Regex, RegexBuilder};
 
 use crate::mmap::{MmapBytesReader, ReaderBytes};
@@ -256,6 +260,40 @@ pub fn check_projected_schema(
     check_projected_schema_impl(a, b, projected_names, msg)
 }
 
+/// Split DataFrame into chunks in preparation for writing. The chunks have a
+/// maximum number of rows per chunk to ensure reasonable memory efficiency when
+/// reading the resulting file, and a minimum size per chunk to ensure
+/// reasonable performance when writing.
+#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
+pub(crate) fn chunk_df_for_writing(
+    df: &mut DataFrame,
+    row_group_size: usize,
+) -> PolarsResult<Cow<DataFrame>> {
+    // ensures all chunks are aligned.
+    df.align_chunks();
+
+    let n_splits = df.height() / row_group_size;
+    let result = if n_splits > 0 {
+        Cow::Owned(accumulate_dataframes_vertical_unchecked(
+            split_df_as_ref(df, n_splits, false)?
+                .into_iter()
+                .map(|mut df| {
+                    // If the chunks are small enough, writing many small chunks
+                    // leads to slow writing performance, so in that case we
+                    // merge them.
+                    let n_chunks = df.n_chunks();
+                    if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
+                        df.as_single_chunk_par();
+                    }
+                    df
+                }),
+        ))
+    } else {
+        Cow::Borrowed(df)
+    };
+    Ok(result)
+}
+
 #[cfg(test)]
 mod tests {
     use std::path::PathBuf;
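
A back-of-the-envelope illustration of the two thresholds in the helper above (the constants come from the diff; the snippet itself is only a sketch):

```rust
fn main() {
    // Default row group size used by the IPC-stream and Parquet writers.
    let row_group_size = 512 * 512; // 262_144 rows

    // Integer division decides how many splits are requested.
    assert_eq!(1_000_000 / row_group_size, 3); // large frame: split into 3 pieces
    assert_eq!(200_000 / row_group_size, 0); // small frame: passed through (Cow::Borrowed)

    // The merge heuristic: several chunks averaging under 128 KiB get merged.
    let (estimated_size, n_chunks) = (512 * 1024, 8); // ~64 KiB per chunk
    assert!(n_chunks > 1 && estimated_size / n_chunks < 128 * 1024);
}
```
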
40 changes: 40 additions & 0 deletions py-polars/tests/unit/io/test_ipc.py
@@ -13,6 +13,7 @@
     from pathlib import Path
 
     from polars.type_aliases import IpcCompression
+    from tests.unit.conftest import MemoryUsage
 
 COMPRESSIONS = ["uncompressed", "lz4", "zstd"]
 
@@ -266,3 +267,42 @@ def test_ipc_view_gc_14448() -> None:
     df.write_ipc(f, future=True)
     f.seek(0)
     assert_frame_equal(pl.read_ipc(f), df)
+
+
+@pytest.mark.slow()
+@pytest.mark.write_disk()
+@pytest.mark.parametrize("stream", [True, False])
+def test_read_ipc_only_loads_selected_columns(
+    memory_usage_without_pyarrow: MemoryUsage,
+    tmp_path: Path,
+    stream: bool,
+) -> None:
+    """Only requested columns are loaded by ``read_ipc()``/``read_ipc_stream()``."""
+    tmp_path.mkdir(exist_ok=True)
+
+    # Each column will be about 16MB of RAM. There's a fixed overhead tied to
+    # block size so smaller file sizes can be misleading in terms of memory
+    # usage.
+    series = pl.arange(0, 2_000_000, dtype=pl.Int64, eager=True)
+
+    file_path = tmp_path / "multicolumn.ipc"
+    df = pl.DataFrame(
+        {
+            "a": series,
+            "b": series,
+        }
+    )
+    write_ipc(df, stream, file_path)
+    del df, series
+
+    memory_usage_without_pyarrow.reset_tracking()
+
+    # Only load one column:
+    kwargs = {}
+    if not stream:
+        kwargs["memory_map"] = False
+    df = read_ipc(stream, str(file_path), columns=["b"], rechunk=False, **kwargs)
+    del df
+    # Only one column's worth of memory should be used; 2 columns would be
+    # 32_000_000 at least, but there's some overhead.
+    assert 16_000_000 < memory_usage_without_pyarrow.get_peak() < 23_000_000
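
For reference, the Rust-side column projection this test leans on looks roughly like the sketch below (assuming `IpcReader::with_columns` from the `polars` crate's `ipc` feature; path and column name are illustrative):

```rust
use std::fs::File;

use polars::prelude::*;

fn read_one_column(path: &str) -> PolarsResult<DataFrame> {
    let file = File::open(path)?;
    // Projecting a single column is what keeps peak memory close to one
    // column's worth of data in the test above.
    IpcReader::new(file)
        .with_columns(Some(vec!["b".to_string()]))
        .finish()
}
```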
