diff --git a/crates/polars-io/src/ipc/ipc_stream.rs b/crates/polars-io/src/ipc/ipc_stream.rs index 35bd22dab5c3..c8429e1b2d80 100644 --- a/crates/polars-io/src/ipc/ipc_stream.rs +++ b/crates/polars-io/src/ipc/ipc_stream.rs @@ -42,7 +42,7 @@ use arrow::io::ipc::{read, write}; use polars_core::prelude::*; use crate::prelude::*; -use crate::shared::{finish_reader, ArrowReader, WriterFactory}; +use crate::shared::{finish_reader, ArrowReader}; /// Read Arrows Stream IPC format into a DataFrame /// @@ -290,13 +290,3 @@ impl Default for IpcStreamWriterOption { Self::new() } } - -impl WriterFactory for IpcStreamWriterOption { - fn create_writer(&self, writer: W) -> Box> { - Box::new(IpcStreamWriter::new(writer).with_compression(self.compression)) - } - - fn extension(&self) -> PathBuf { - self.extension.to_owned() - } -} diff --git a/crates/polars-io/src/ipc/mod.rs b/crates/polars-io/src/ipc/mod.rs index f6b647979938..d78362f5555f 100644 --- a/crates/polars-io/src/ipc/mod.rs +++ b/crates/polars-io/src/ipc/mod.rs @@ -16,4 +16,4 @@ pub use ipc_file::{IpcReader, IpcScanOptions}; pub use ipc_reader_async::*; #[cfg(feature = "ipc_streaming")] pub use ipc_stream::*; -pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption, IpcWriterOptions}; +pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOptions}; diff --git a/crates/polars-io/src/ipc/write.rs b/crates/polars-io/src/ipc/write.rs index b187ff8edc07..ca6b33dd23fb 100644 --- a/crates/polars-io/src/ipc/write.rs +++ b/crates/polars-io/src/ipc/write.rs @@ -1,5 +1,4 @@ use std::io::Write; -use std::path::PathBuf; use arrow::io::ipc::write; use arrow::io::ipc::write::WriteOptions; @@ -8,7 +7,7 @@ use polars_core::prelude::*; use serde::{Deserialize, Serialize}; use crate::prelude::*; -use crate::shared::{schema_to_arrow_checked, WriterFactory}; +use crate::shared::schema_to_arrow_checked; #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -19,6 +18,12 @@ pub struct IpcWriterOptions { pub maintain_order: bool, } +impl IpcWriterOptions { + pub fn to_writer(&self, writer: W) -> IpcWriter { + IpcWriter::new(writer).with_compression(self.compression) + } +} + /// Write a DataFrame to Arrow's IPC format /// /// # Example @@ -153,45 +158,3 @@ impl From for write::Compression { } } } - -pub struct IpcWriterOption { - compression: Option, - extension: PathBuf, -} - -impl IpcWriterOption { - pub fn new() -> Self { - Self { - compression: None, - extension: PathBuf::from(".ipc"), - } - } - - /// Set the compression used. Defaults to None. - pub fn with_compression(mut self, compression: Option) -> Self { - self.compression = compression; - self - } - - /// Set the extension. Defaults to ".ipc". - pub fn with_extension(mut self, extension: PathBuf) -> Self { - self.extension = extension; - self - } -} - -impl Default for IpcWriterOption { - fn default() -> Self { - Self::new() - } -} - -impl WriterFactory for IpcWriterOption { - fn create_writer(&self, writer: W) -> Box> { - Box::new(IpcWriter::new(writer).with_compression(self.compression)) - } - - fn extension(&self) -> PathBuf { - self.extension.to_owned() - } -} diff --git a/crates/polars-io/src/partition.rs b/crates/polars-io/src/partition.rs index de821efb0d60..439441eec085 100644 --- a/crates/polars-io/src/partition.rs +++ b/crates/polars-io/src/partition.rs @@ -1,8 +1,6 @@ //! Functionality for writing a DataFrame partitioned into multiple files. 
-use std::fs::File; -use std::io::BufWriter; -use std::path::{Path, PathBuf}; +use std::path::Path; use polars_core::prelude::*; use polars_core::series::IsSorted; @@ -10,135 +8,36 @@ use polars_core::POOL; use rayon::prelude::*; use crate::parquet::write::ParquetWriteOptions; -use crate::utils::resolve_homedir; -use crate::WriterFactory; - -/// Write a DataFrame with disk partitioning -/// -/// # Example -/// ``` -/// use polars_core::prelude::*; -/// use polars_io::ipc::IpcWriterOption; -/// use polars_io::partition::PartitionedWriter; -/// -/// fn example(df: &mut DataFrame) -> PolarsResult<()> { -/// let option = IpcWriterOption::default(); -/// PartitionedWriter::new(option, "./rootdir", ["a", "b"]) -/// .finish(df) -/// } -/// ``` - -pub struct PartitionedWriter { - option: F, - rootdir: PathBuf, - by: Vec, - parallel: bool, -} - -impl PartitionedWriter -where - F: WriterFactory + Send + Sync, -{ - pub fn new(option: F, rootdir: P, by: I) -> Self - where - P: Into, - I: IntoIterator, - S: AsRef, - { - Self { - option, - rootdir: rootdir.into(), - by: by.into_iter().map(|s| s.as_ref().to_string()).collect(), - parallel: true, - } - } - - /// Write the parquet file in parallel (default). - pub fn with_parallel(mut self, parallel: bool) -> Self { - self.parallel = parallel; - self - } - - fn write_partition_df(&self, partition_df: &mut DataFrame, i: usize) -> PolarsResult<()> { - let mut path = resolve_partition_dir(&self.rootdir, &self.by, partition_df); - std::fs::create_dir_all(&path)?; - - path.push(format!( - "data-{:04}.{}", - i, - self.option.extension().display() - )); - - let file = std::fs::File::create(path)?; - let writer = BufWriter::new(file); - - self.option - .create_writer::>(writer) - .finish(partition_df) - } - - pub fn finish(self, df: &DataFrame) -> PolarsResult<()> { - let groups = df.group_by(self.by.clone())?; - let groups = groups.get_groups(); - - // don't parallelize this - // there is a lot of parallelization in take and this may easily SO - POOL.install(|| { - match groups { - GroupsProxy::Idx(idx) => { - idx.par_iter() - .enumerate() - .map(|(i, (_, group))| { - // groups are in bounds - // and sorted - let mut part_df = unsafe { - df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending) - }; - self.write_partition_df(&mut part_df, i) - }) - .collect::>>() - }, - GroupsProxy::Slice { groups, .. } => groups - .par_iter() - .enumerate() - .map(|(i, [first, len])| { - let mut part_df = df.slice(*first as i64, *len as usize); - self.write_partition_df(&mut part_df, i) - }) - .collect::>>(), - } - })?; +#[cfg(feature = "ipc")] +use crate::prelude::IpcWriterOptions; +use crate::{SerWriter, WriteDataFrameToFile}; +impl WriteDataFrameToFile for ParquetWriteOptions { + fn write_df_to_file(&self, mut df: DataFrame, file: W) -> PolarsResult<()> { + self.to_writer(file).finish(&mut df)?; Ok(()) } } -/// `partition_df` must be created in the same way as `partition_by`. 
-fn resolve_partition_dir(rootdir: &Path, by: I, partition_df: &DataFrame) -> PathBuf -where - I: IntoIterator, - S: AsRef, -{ - let mut path = PathBuf::new(); - path.push(resolve_homedir(rootdir)); - - for key in by.into_iter() { - let value = partition_df[key.as_ref()].get(0).unwrap().to_string(); - path.push(format!("{}={}", key.as_ref(), value)) +#[cfg(feature = "ipc")] +impl WriteDataFrameToFile for IpcWriterOptions { + fn write_df_to_file(&self, mut df: DataFrame, file: W) -> PolarsResult<()> { + self.to_writer(file).finish(&mut df)?; + Ok(()) } - path } /// Write a partitioned parquet dataset. This functionality is unstable. -pub fn write_partitioned_dataset( +pub fn write_partitioned_dataset( df: &DataFrame, path: &Path, partition_by: &[S], - file_write_options: &ParquetWriteOptions, + file_write_options: &O, chunk_size: usize, ) -> PolarsResult<()> where S: AsRef, + O: WriteDataFrameToFile + Send + Sync, { // Note: When adding support for formats other than Parquet, avoid writing the partitioned // columns into the file. We write them for parquet because they are encoded efficiently with @@ -210,9 +109,9 @@ where (n_files, rows_per_file) }; - let write_part = |mut df: DataFrame, path: &Path| { + let write_part = |df: DataFrame, path: &Path| { let f = std::fs::File::create(path)?; - file_write_options.to_writer(f).finish(&mut df)?; + file_write_options.write_df_to_file(df, f)?; PolarsResult::Ok(()) }; @@ -258,7 +157,7 @@ where .par_iter() .map(|group| { let df = unsafe { - df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending) + df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending) }; finish_part_df(df) }) diff --git a/crates/polars-io/src/shared.rs b/crates/polars-io/src/shared.rs index 2788d10a54fd..735490b0bcb3 100644 --- a/crates/polars-io/src/shared.rs +++ b/crates/polars-io/src/shared.rs @@ -1,5 +1,4 @@ use std::io::{Read, Write}; -use std::path::PathBuf; use std::sync::Arc; use arrow::array::new_empty_array; @@ -41,9 +40,8 @@ where fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()>; } -pub trait WriterFactory { - fn create_writer(&self, writer: W) -> Box>; - fn extension(&self) -> PathBuf; +pub trait WriteDataFrameToFile { + fn write_df_to_file(&self, df: DataFrame, file: W) -> PolarsResult<()>; } pub trait ArrowReader { diff --git a/docs/src/python/user-guide/io/hive.py b/docs/src/python/user-guide/io/hive.py index 215b4eea6f87..072b0fb3c34a 100644 --- a/docs/src/python/user-guide/io/hive.py +++ b/docs/src/python/user-guide/io/hive.py @@ -25,7 +25,8 @@ Path(path).parent.mkdir(exist_ok=True, parents=True) df.write_parquet(path) -Path("docs/data/hive_mixed/description.txt").touch() +# Make sure the file is not empty because path expansion ignores empty files. 
+Path("docs/data/hive_mixed/description.txt").write_text("A") def print_paths(path: str) -> None: @@ -123,7 +124,7 @@ def dir_recurse(path: Path): # --8<-- [end:write_parquet_partitioned_show_data] # --8<-- [start:write_parquet_partitioned] -df.write_parquet_partitioned("docs/data/hive_write/", ["a", "b"]) +df.write_parquet("docs/data/hive_write/", partition_by=["a", "b"]) # --8<-- [end:write_parquet_partitioned] # --8<-- [start:write_parquet_partitioned_show_paths] diff --git a/docs/user-guide/io/hive.md b/docs/user-guide/io/hive.md index 27af6b6c18ee..d0de9ada2c59 100644 --- a/docs/user-guide/io/hive.md +++ b/docs/user-guide/io/hive.md @@ -26,8 +26,8 @@ path included in the output: ### Handling mixed files -Passing a directory to `scan_parquet` may not work if there are extra non-data files next to the -data files. +Passing a directory to `scan_parquet` may not work if there are files with different extensions in +the directory. For this example the following directory structure is used: @@ -80,7 +80,7 @@ Polars supports writing hive partitioned parquet datasets, with planned support For this example the following DataFrame is used: -{{code_block('user-guide/io/hive','write_parquet_partitioned_show_data',['write_parquet_partitioned'])}} +{{code_block('user-guide/io/hive','write_parquet_partitioned_show_data',[])}} ```python exec="on" result="text" session="user-guide/io/hive" --8<-- "python/user-guide/io/hive.py:write_parquet_partitioned_show_data" @@ -88,7 +88,7 @@ For this example the following DataFrame is used: We will write it to a hive-partitioned parquet dataset, partitioned by the columns `a` and `b`: -{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet_partitioned'])}} +{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet'])}} ```python exec="on" result="text" session="user-guide/io/hive" --8<-- "python/user-guide/io/hive.py:write_parquet_partitioned" diff --git a/py-polars/docs/source/reference/io.rst b/py-polars/docs/source/reference/io.rst index 0085f3d943fb..1f088958a3c0 100644 --- a/py-polars/docs/source/reference/io.rst +++ b/py-polars/docs/source/reference/io.rst @@ -107,7 +107,6 @@ Parquet read_parquet_schema scan_parquet DataFrame.write_parquet - DataFrame.write_parquet_partitioned LazyFrame.sink_parquet PyArrow Datasets diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 43450fc6e753..355a8b937c4e 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -3446,6 +3446,8 @@ def write_parquet( data_page_size: int | None = None, use_pyarrow: bool = False, pyarrow_options: dict[str, Any] | None = None, + partition_by: str | Sequence[str] | None = None, + partition_chunk_size_bytes: int = 4_294_967_296, ) -> None: """ Write to Apache Parquet file. @@ -3454,6 +3456,7 @@ def write_parquet( ---------- file File path or writable file-like object to which the result will be written. + This should be a path to a directory if writing a partitioned dataset. compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'} Choose "zstd" for good compression performance. Choose "lz4" for fast compression/decompression. @@ -3497,6 +3500,14 @@ def write_parquet( using `pyarrow.parquet.write_to_dataset`. The `partition_cols` parameter leads to write the dataset to a directory. Similar to Spark's partitioned datasets. + partition_by + Column(s) to partition by. A partitioned dataset will be written if this is + specified. 
This parameter is considered unstable and is subject to change. + partition_chunk_size_bytes + Approximate size to split DataFrames within a single partition when + writing. Note this is calculated using the size of the DataFrame in + memory - the size of the output file may differ depending on the + file format / compression. Examples -------- @@ -3528,7 +3539,11 @@ def write_parquet( if compression is None: compression = "uncompressed" if isinstance(file, (str, Path)): - if pyarrow_options is not None and pyarrow_options.get("partition_cols"): + if ( + partition_by is not None + or pyarrow_options is not None + and pyarrow_options.get("partition_cols") + ): file = normalize_filepath(file, check_not_directory=False) else: file = normalize_filepath(file) @@ -3594,6 +3609,13 @@ def write_parquet( "null_count": True, } + if partition_by is not None: + msg = "The `partition_by` parameter of `write_parquet` is considered unstable." + issue_unstable_warning(msg) + + if isinstance(partition_by, str): + partition_by = [partition_by] + self._df.write_parquet( file, compression, @@ -3601,100 +3623,10 @@ def write_parquet( statistics, row_group_size, data_page_size, + partition_by=partition_by, + partition_chunk_size_bytes=partition_chunk_size_bytes, ) - @unstable() - def write_parquet_partitioned( - self, - path: str | Path, - partition_by: str | Collection[str], - *, - chunk_size_bytes: int = 4_294_967_296, - compression: ParquetCompression = "zstd", - compression_level: int | None = None, - statistics: bool | str | dict[str, bool] = True, - row_group_size: int | None = None, - data_page_size: int | None = None, - ) -> None: - """ - Write a partitioned directory of parquet files. - - Parameters - ---------- - path - Path to the base directory for the partitioned dataset. - partition_by - Columns to partition by. - chunk_size_bytes - Approximate size to split DataFrames within a single partition when - writing. Note this is calculated using the size of the DataFrame in - memory - the size of the output file may differ depending on the - file format / compression. - compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'} - Choose "zstd" for good compression performance. - Choose "lz4" for fast compression/decompression. - Choose "snappy" for more backwards compatibility guarantees - when you deal with older parquet readers. - compression_level - The level of compression to use. Higher compression means smaller files on - disk. - - - "gzip" : min-level: 0, max-level: 10. - - "brotli" : min-level: 0, max-level: 11. - - "zstd" : min-level: 1, max-level: 22. - - statistics - Write statistics to the parquet headers. This is the default behavior. - - Possible values: - - - `True`: enable default set of statistics (default) - - `False`: disable all statistics - - "full": calculate and write all available statistics. Cannot be - combined with `use_pyarrow`. - - `{ "statistic-key": True / False, ... }`. Cannot be combined with - `use_pyarrow`. Available keys: - - - "min": column minimum value (default: `True`) - - "max": column maximum value (default: `True`) - - "distinct_count": number of unique column values (default: `False`) - - "null_count": number of null values in column (default: `True`) - row_group_size - Size of the row groups in number of rows. Defaults to 512^2 rows. - data_page_size - Size of the data page in bytes. Defaults to 1024^2 bytes. 
- """ - path = normalize_filepath(path, check_not_directory=False) - partition_by = [partition_by] if isinstance(partition_by, str) else partition_by - - if isinstance(statistics, bool) and statistics: - statistics = { - "min": True, - "max": True, - "distinct_count": False, - "null_count": True, - } - elif isinstance(statistics, bool) and not statistics: - statistics = {} - elif statistics == "full": - statistics = { - "min": True, - "max": True, - "distinct_count": True, - "null_count": True, - } - - self._df.write_parquet_partitioned( - path, - partition_by, - chunk_size_bytes, - compression, - compression_level, - statistics, - row_group_size, - data_page_size, - ) - def write_database( self, table_name: str, diff --git a/py-polars/src/dataframe/io.rs b/py-polars/src/dataframe/io.rs index 61afd6fc1b74..e28a07a7d819 100644 --- a/py-polars/src/dataframe/io.rs +++ b/py-polars/src/dataframe/io.rs @@ -378,7 +378,7 @@ impl PyDataFrame { } #[cfg(feature = "parquet")] - #[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size))] + #[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size, partition_by, partition_chunk_size_bytes))] pub fn write_parquet( &mut self, py: Python, @@ -388,8 +388,37 @@ impl PyDataFrame { statistics: Wrap, row_group_size: Option, data_page_size: Option, + partition_by: Option>, + partition_chunk_size_bytes: usize, ) -> PyResult<()> { + use polars_io::partition::write_partitioned_dataset; + let compression = parse_parquet_compression(compression, compression_level)?; + + if let Some(partition_by) = partition_by { + let path = py_f.extract::(py)?; + + py.allow_threads(|| { + let write_options = ParquetWriteOptions { + compression, + statistics: statistics.0, + row_group_size, + data_page_size, + maintain_order: true, + }; + write_partitioned_dataset( + &self.df, + std::path::Path::new(path.as_str()), + partition_by.as_slice(), + &write_options, + partition_chunk_size_bytes, + ) + .map_err(PyPolarsErr::from) + })?; + + return Ok(()); + }; + let buf = get_file_like(py_f, true)?; py.allow_threads(|| { ParquetWriter::new(buf) @@ -403,49 +432,6 @@ impl PyDataFrame { Ok(()) } - #[cfg(feature = "parquet")] - #[pyo3(signature = (py_f, partition_by, chunk_size_bytes, compression, compression_level, statistics, row_group_size, data_page_size))] - pub fn write_parquet_partitioned( - &mut self, - py: Python, - py_f: PyObject, - partition_by: Vec, - chunk_size_bytes: usize, - compression: &str, - compression_level: Option, - statistics: Wrap, - row_group_size: Option, - data_page_size: Option, - ) -> PyResult<()> { - use std::path::Path; - - use polars_io::partition::write_partitioned_dataset; - - let Ok(path) = py_f.extract::(py) else { - return Err(PyPolarsErr::from(polars_err!(ComputeError: "expected path-like")).into()); - }; - let path = Path::new(&*path); - let compression = parse_parquet_compression(compression, compression_level)?; - - let write_options = ParquetWriteOptions { - compression, - statistics: statistics.0, - row_group_size, - data_page_size, - maintain_order: true, - }; - - write_partitioned_dataset( - &self.df, - path, - partition_by.as_slice(), - &write_options, - chunk_size_bytes, - ) - .map_err(PyPolarsErr::from)?; - Ok(()) - } - #[cfg(feature = "json")] pub fn write_json(&mut self, py_f: PyObject) -> PyResult<()> { let file = BufWriter::new(get_file_like(py_f, true)?); diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 
bd84389c5919..7a7217e9cfba 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -672,7 +672,7 @@ def test_projection_only_hive_parts_gives_correct_number_of_rows( @pytest.mark.write_disk() def test_hive_write(tmp_path: Path, df: pl.DataFrame) -> None: root = tmp_path - df.write_parquet_partitioned(root, ["a", "b"]) + df.write_parquet(root, partition_by=["a", "b"]) lf = pl.scan_parquet(root) assert_frame_equal(lf.collect(), df) @@ -693,7 +693,7 @@ def test_hive_write_multiple_files(tmp_path: Path) -> None: assert n_files > 1, "increase df size or decrease file size" root = tmp_path - df.write_parquet_partitioned(root, ["a"], chunk_size_bytes=chunk_size) + df.write_parquet(root, partition_by="a", partition_chunk_size_bytes=chunk_size) assert sum(1 for _ in (root / "a=0").iterdir()) == n_files assert_frame_equal(pl.scan_parquet(root).collect(), df) @@ -721,7 +721,7 @@ def test_hive_write_dates(tmp_path: Path) -> None: ) root = tmp_path - df.write_parquet_partitioned(root, ["date1", "date2"]) + df.write_parquet(root, partition_by=["date1", "date2"]) lf = pl.scan_parquet(root) assert_frame_equal(lf.collect(), df) @@ -738,8 +738,8 @@ def test_hive_predicate_dates_14712( tmp_path: Path, monkeypatch: Any, capfd: Any ) -> None: monkeypatch.setenv("POLARS_VERBOSE", "1") - pl.DataFrame({"a": [datetime(2024, 1, 1)]}).write_parquet_partitioned( - tmp_path, ["a"] + pl.DataFrame({"a": [datetime(2024, 1, 1)]}).write_parquet( + tmp_path, partition_by="a" ) pl.scan_parquet(tmp_path).filter(pl.col("a") != datetime(2024, 1, 1)).collect() assert "hive partitioning: skipped 1 files" in capfd.readouterr().err
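
Usage sketch (reviewer note, not part of the diff): the snippet below illustrates the Python API after this change. The DataFrame contents and the `hive_dataset/` output paths are illustrative only; the grounded pieces are the new `partition_by` and `partition_chunk_size_bytes` parameters of `DataFrame.write_parquet` and the fact that the resulting directory scans back with `scan_parquet`. Since `partition_by` is marked unstable in this diff, using it emits an unstable-feature warning.

```python
import polars as pl

df = pl.DataFrame(
    {
        "a": [1, 1, 2, 2],
        "b": ["x", "y", "x", "y"],
        "value": [1.0, 2.0, 3.0, 4.0],
    }
)

# Replacement for the removed `df.write_parquet_partitioned(path, ["a", "b"])`:
# pass `partition_by` to `write_parquet`. The target should be a directory;
# hive-style `a=.../b=.../` subdirectories are written per partition key.
df.write_parquet("hive_dataset/", partition_by=["a", "b"])

# Large partitions are split into multiple files once they exceed
# `partition_chunk_size_bytes` (an approximate in-memory size, not the
# on-disk size after compression).
df.write_parquet(
    "hive_dataset_chunked/",
    partition_by="a",
    partition_chunk_size_bytes=512_000_000,
)

# The partitioned dataset scans back with hive partitioning.
out = pl.scan_parquet("hive_dataset/").collect()
```

On the Rust side, the same mechanism is what makes `write_partitioned_dataset` format-agnostic: it now accepts any `O: WriteDataFrameToFile + Send + Sync`, and both `ParquetWriteOptions` and (behind the `ipc` feature) `IpcWriterOptions` implement that trait, replacing the old `WriterFactory`/`PartitionedWriter` machinery.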