From 81154edc6e218ec56cd64cee039d3799c6b2b2c0 Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Mon, 21 Oct 2024 23:48:26 +1100 Subject: [PATCH] refactor(rust): Reduce memcopy in parquet (#19350) --- crates/polars-io/src/csv/read/read_impl.rs | 2 +- .../src/csv/read/schema_inference.rs | 4 +-- crates/polars-io/src/mmap.rs | 26 +++++++++---------- .../polars-io/src/parquet/read/read_impl.rs | 12 +++------ crates/polars-io/src/utils/other.rs | 18 +++++-------- crates/polars-utils/src/mmap.rs | 6 +++++ 6 files changed, 31 insertions(+), 37 deletions(-) diff --git a/crates/polars-io/src/csv/read/read_impl.rs b/crates/polars-io/src/csv/read/read_impl.rs index 9a7508b57af2..520f6bee729d 100644 --- a/crates/polars-io/src/csv/read/read_impl.rs +++ b/crates/polars-io/src/csv/read/read_impl.rs @@ -191,7 +191,7 @@ impl<'a> CoreReader<'a> { if let Some(b) = decompress(&reader_bytes, total_n_rows, separator, quote_char, eol_char) { - reader_bytes = ReaderBytes::Owned(b); + reader_bytes = ReaderBytes::Owned(b.into()); } } diff --git a/crates/polars-io/src/csv/read/schema_inference.rs b/crates/polars-io/src/csv/read/schema_inference.rs index 3684df1a2bac..a01ec5ddef3f 100644 --- a/crates/polars-io/src/csv/read/schema_inference.rs +++ b/crates/polars-io/src/csv/read/schema_inference.rs @@ -296,7 +296,7 @@ fn infer_file_schema_inner( buf.push(eol_char); return infer_file_schema_inner( - &ReaderBytes::Owned(buf), + &ReaderBytes::Owned(buf.into()), separator, max_read_rows, has_header, @@ -481,7 +481,7 @@ fn infer_file_schema_inner( rb.extend_from_slice(reader_bytes); rb.push(eol_char); return infer_file_schema_inner( - &ReaderBytes::Owned(rb), + &ReaderBytes::Owned(rb.into()), separator, max_read_rows, has_header, diff --git a/crates/polars-io/src/mmap.rs b/crates/polars-io/src/mmap.rs index df91f32942f9..2373257469e7 100644 --- a/crates/polars-io/src/mmap.rs +++ b/crates/polars-io/src/mmap.rs @@ -1,9 +1,8 @@ use std::fs::File; use std::io::{BufReader, Cursor, Read, Seek}; -use 
std::sync::Arc; use polars_core::config::verbose; -use polars_utils::mmap::{MMapSemaphore, MemSlice}; +use polars_utils::mmap::MemSlice; /// Trait used to get a hold to file handler or to the underlying bytes /// without performing a Read. @@ -67,8 +66,7 @@ impl MmapBytesReader for &mut T { // Handle various forms of input bytes pub enum ReaderBytes<'a> { Borrowed(&'a [u8]), - Owned(Vec<u8>), - Mapped(MMapSemaphore, &'a File), + Owned(MemSlice), } impl std::ops::Deref for ReaderBytes<'_> { @@ -77,19 +75,21 @@ impl std::ops::Deref for ReaderBytes<'_> { match self { Self::Borrowed(ref_bytes) => ref_bytes, Self::Owned(vec) => vec, - Self::Mapped(mmap, _) => mmap.as_ref(), } } } -/// Require 'static to force the caller to do any transmute as it's usually much -/// clearer to see there whether it's sound. +/// There are some places that perform manual lifetime management after transmuting `ReaderBytes` +/// to have a `'static` inner lifetime. The advantage to doing this is that it lets you construct a +/// `MemSlice` from the `ReaderBytes` in a zero-copy manner regardless of the underlying enum +/// variant. impl ReaderBytes<'static> { - pub fn into_mem_slice(self) -> MemSlice { + /// Construct a `MemSlice` in a zero-copy manner from the underlying bytes, with the assumption + /// that the underlying bytes have a `'static` lifetime. 
+ pub fn to_memslice(&self) -> MemSlice { match self { ReaderBytes::Borrowed(v) => MemSlice::from_static(v), - ReaderBytes::Owned(v) => MemSlice::from_vec(v), - ReaderBytes::Mapped(v, _) => MemSlice::from_mmap(Arc::new(v)), + ReaderBytes::Owned(v) => v.clone(), } } } @@ -104,16 +104,14 @@ impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> { }, None => { if let Some(f) = m.to_file() { - let f = unsafe { std::mem::transmute::<&File, &'a File>(f) }; - let mmap = MMapSemaphore::new_from_file(f).unwrap(); - ReaderBytes::Mapped(mmap, f) + ReaderBytes::Owned(MemSlice::from_file(f).unwrap()) } else { if verbose() { eprintln!("could not memory map file; read to buffer.") } let mut buf = vec![]; m.read_to_end(&mut buf).expect("could not read"); - ReaderBytes::Owned(buf) + ReaderBytes::Owned(MemSlice::from_vec(buf)) } }, } diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 7d926e434a72..aa86bfccce51 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -15,7 +15,6 @@ use polars_parquet::parquet::statistics::Statistics; use polars_parquet::read::{ self, ColumnChunkMetadata, FileMetadata, Filter, PhysicalType, RowGroupMetadata, }; -use polars_utils::mmap::MemSlice; use rayon::prelude::*; #[cfg(feature = "cloud")] @@ -908,10 +907,9 @@ pub fn read_parquet( } let reader = ReaderBytes::from(&mut reader); - let store = mmap::ColumnStore::Local( - unsafe { std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader) } - .into_mem_slice(), - ); + let store = mmap::ColumnStore::Local(unsafe { + std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice() + }); let dfs = rg_to_dfs( &store, @@ -959,9 +957,7 @@ impl FetchRowGroupsFromMmapReader { fn fetch_row_groups(&mut self, _row_groups: Range) -> PolarsResult { // @TODO: we can something smarter here with mmap - Ok(mmap::ColumnStore::Local(MemSlice::from_vec( - self.0.deref().to_vec(), - ))) + 
Ok(mmap::ColumnStore::Local(self.0.to_memslice())) } } diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs index 023d61fe525b..7033d55e1b0b 100644 --- a/crates/polars-io/src/utils/other.rs +++ b/crates/polars-io/src/utils/other.rs @@ -6,14 +6,14 @@ use once_cell::sync::Lazy; use polars_core::prelude::*; #[cfg(any(feature = "ipc_streaming", feature = "parquet"))] use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref}; -use polars_utils::mmap::MMapSemaphore; +use polars_utils::mmap::{MMapSemaphore, MemSlice}; use regex::{Regex, RegexBuilder}; use crate::mmap::{MmapBytesReader, ReaderBytes}; -pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>( - reader: &'a mut R, -) -> PolarsResult<ReaderBytes<'a>> { +pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>( + reader: &mut R, +) -> PolarsResult<ReaderBytes<'_>> { // we have a file so we can mmap // only seekable files are mmap-able if let Some((file, offset)) = reader @@ -23,14 +23,8 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>( { let mut options = memmap::MmapOptions::new(); options.offset(offset); - - // somehow bck thinks borrows alias - // this is sound as file was already bound to 'a - use std::fs::File; - - let file = unsafe { std::mem::transmute::<&File, &'a File>(file) }; let mmap = MMapSemaphore::new_from_file_with_options(file, options)?; - Ok(ReaderBytes::Mapped(mmap, file)) + Ok(ReaderBytes::Owned(MemSlice::from_mmap(Arc::new(mmap)))) } else { // we can get the bytes for free if reader.to_bytes().is_some() { @@ -40,7 +34,7 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>( // we have to read to an owned buffer to get the bytes. 
let mut bytes = Vec::with_capacity(1024 * 128); reader.read_to_end(&mut bytes)?; - Ok(ReaderBytes::Owned(bytes)) + Ok(ReaderBytes::Owned(bytes.into())) } } } diff --git a/crates/polars-utils/src/mmap.rs b/crates/polars-utils/src/mmap.rs index 29651d5eb56a..0ac1a643d93d 100644 --- a/crates/polars-utils/src/mmap.rs +++ b/crates/polars-utils/src/mmap.rs @@ -61,6 +61,12 @@ mod private { } } + impl From<Vec<u8>> for MemSlice { + fn from(value: Vec<u8>) -> Self { + Self::from_vec(value) + } + } + impl MemSlice { pub const EMPTY: Self = Self::from_static(&[]);