From 80113a0dd1cde946f29800a4dfef912683469574 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 12 Jul 2024 15:02:20 +1000 Subject: [PATCH 1/6] c --- crates/polars-io/src/ipc/ipc_stream.rs | 12 +-- crates/polars-io/src/ipc/mod.rs | 2 +- crates/polars-io/src/ipc/write.rs | 51 ++-------- crates/polars-io/src/partition.rs | 135 +++---------------------- crates/polars-io/src/shared.rs | 6 +- docs/_build/API_REFERENCE_LINKS.yml | 1 + docs/src/python/user-guide/io/hive.py | 4 +- docs/user-guide/io/hive.md | 8 +- py-polars/docs/source/reference/io.rst | 8 +- py-polars/polars/__init__.py | 2 + py-polars/polars/dataframe/frame.py | 102 ++----------------- py-polars/polars/io/__init__.py | 2 + py-polars/polars/io/hive.py | 41 ++++++++ py-polars/src/dataframe/io.rs | 70 +++++-------- py-polars/src/io/hive.rs | 25 +++++ py-polars/src/io/mod.rs | 1 + py-polars/src/lib.rs | 2 + py-polars/tests/unit/io/test_hive.py | 12 ++- 18 files changed, 157 insertions(+), 327 deletions(-) create mode 100644 py-polars/polars/io/hive.py create mode 100644 py-polars/src/io/hive.rs create mode 100644 py-polars/src/io/mod.rs diff --git a/crates/polars-io/src/ipc/ipc_stream.rs b/crates/polars-io/src/ipc/ipc_stream.rs index 35bd22dab5c3..c8429e1b2d80 100644 --- a/crates/polars-io/src/ipc/ipc_stream.rs +++ b/crates/polars-io/src/ipc/ipc_stream.rs @@ -42,7 +42,7 @@ use arrow::io::ipc::{read, write}; use polars_core::prelude::*; use crate::prelude::*; -use crate::shared::{finish_reader, ArrowReader, WriterFactory}; +use crate::shared::{finish_reader, ArrowReader}; /// Read Arrows Stream IPC format into a DataFrame /// @@ -290,13 +290,3 @@ impl Default for IpcStreamWriterOption { Self::new() } } - -impl WriterFactory for IpcStreamWriterOption { - fn create_writer(&self, writer: W) -> Box> { - Box::new(IpcStreamWriter::new(writer).with_compression(self.compression)) - } - - fn extension(&self) -> PathBuf { - self.extension.to_owned() - } -} diff --git a/crates/polars-io/src/ipc/mod.rs b/crates/polars-io/src/ipc/mod.rs index f6b647979938..d78362f5555f 100644 --- a/crates/polars-io/src/ipc/mod.rs +++ b/crates/polars-io/src/ipc/mod.rs @@ -16,4 +16,4 @@ pub use ipc_file::{IpcReader, IpcScanOptions}; pub use ipc_reader_async::*; #[cfg(feature = "ipc_streaming")] pub use ipc_stream::*; -pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption, IpcWriterOptions}; +pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOptions}; diff --git a/crates/polars-io/src/ipc/write.rs b/crates/polars-io/src/ipc/write.rs index b187ff8edc07..ca6b33dd23fb 100644 --- a/crates/polars-io/src/ipc/write.rs +++ b/crates/polars-io/src/ipc/write.rs @@ -1,5 +1,4 @@ use std::io::Write; -use std::path::PathBuf; use arrow::io::ipc::write; use arrow::io::ipc::write::WriteOptions; @@ -8,7 +7,7 @@ use polars_core::prelude::*; use serde::{Deserialize, Serialize}; use crate::prelude::*; -use crate::shared::{schema_to_arrow_checked, WriterFactory}; +use crate::shared::schema_to_arrow_checked; #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -19,6 +18,12 @@ pub struct IpcWriterOptions { pub maintain_order: bool, } +impl IpcWriterOptions { + pub fn to_writer(&self, writer: W) -> IpcWriter { + IpcWriter::new(writer).with_compression(self.compression) + } +} + /// Write a DataFrame to Arrow's IPC format /// /// # Example @@ -153,45 +158,3 @@ impl From for write::Compression { } } } - -pub struct IpcWriterOption { - compression: Option, - extension: PathBuf, -} - 
-impl IpcWriterOption { - pub fn new() -> Self { - Self { - compression: None, - extension: PathBuf::from(".ipc"), - } - } - - /// Set the compression used. Defaults to None. - pub fn with_compression(mut self, compression: Option) -> Self { - self.compression = compression; - self - } - - /// Set the extension. Defaults to ".ipc". - pub fn with_extension(mut self, extension: PathBuf) -> Self { - self.extension = extension; - self - } -} - -impl Default for IpcWriterOption { - fn default() -> Self { - Self::new() - } -} - -impl WriterFactory for IpcWriterOption { - fn create_writer(&self, writer: W) -> Box> { - Box::new(IpcWriter::new(writer).with_compression(self.compression)) - } - - fn extension(&self) -> PathBuf { - self.extension.to_owned() - } -} diff --git a/crates/polars-io/src/partition.rs b/crates/polars-io/src/partition.rs index de821efb0d60..141067704c4f 100644 --- a/crates/polars-io/src/partition.rs +++ b/crates/polars-io/src/partition.rs @@ -1,8 +1,6 @@ //! Functionality for writing a DataFrame partitioned into multiple files. -use std::fs::File; -use std::io::BufWriter; -use std::path::{Path, PathBuf}; +use std::path::Path; use polars_core::prelude::*; use polars_core::series::IsSorted; @@ -10,135 +8,34 @@ use polars_core::POOL; use rayon::prelude::*; use crate::parquet::write::ParquetWriteOptions; -use crate::utils::resolve_homedir; -use crate::WriterFactory; - -/// Write a DataFrame with disk partitioning -/// -/// # Example -/// ``` -/// use polars_core::prelude::*; -/// use polars_io::ipc::IpcWriterOption; -/// use polars_io::partition::PartitionedWriter; -/// -/// fn example(df: &mut DataFrame) -> PolarsResult<()> { -/// let option = IpcWriterOption::default(); -/// PartitionedWriter::new(option, "./rootdir", ["a", "b"]) -/// .finish(df) -/// } -/// ``` - -pub struct PartitionedWriter { - option: F, - rootdir: PathBuf, - by: Vec, - parallel: bool, -} - -impl PartitionedWriter -where - F: WriterFactory + Send + Sync, -{ - pub fn new(option: F, rootdir: P, by: I) -> Self - where - P: Into, - I: IntoIterator, - S: AsRef, - { - Self { - option, - rootdir: rootdir.into(), - by: by.into_iter().map(|s| s.as_ref().to_string()).collect(), - parallel: true, - } - } - - /// Write the parquet file in parallel (default). - pub fn with_parallel(mut self, parallel: bool) -> Self { - self.parallel = parallel; - self - } - - fn write_partition_df(&self, partition_df: &mut DataFrame, i: usize) -> PolarsResult<()> { - let mut path = resolve_partition_dir(&self.rootdir, &self.by, partition_df); - std::fs::create_dir_all(&path)?; - - path.push(format!( - "data-{:04}.{}", - i, - self.option.extension().display() - )); - - let file = std::fs::File::create(path)?; - let writer = BufWriter::new(file); - - self.option - .create_writer::>(writer) - .finish(partition_df) - } - - pub fn finish(self, df: &DataFrame) -> PolarsResult<()> { - let groups = df.group_by(self.by.clone())?; - let groups = groups.get_groups(); - - // don't parallelize this - // there is a lot of parallelization in take and this may easily SO - POOL.install(|| { - match groups { - GroupsProxy::Idx(idx) => { - idx.par_iter() - .enumerate() - .map(|(i, (_, group))| { - // groups are in bounds - // and sorted - let mut part_df = unsafe { - df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending) - }; - self.write_partition_df(&mut part_df, i) - }) - .collect::>>() - }, - GroupsProxy::Slice { groups, .. 
} => groups - .par_iter() - .enumerate() - .map(|(i, [first, len])| { - let mut part_df = df.slice(*first as i64, *len as usize); - self.write_partition_df(&mut part_df, i) - }) - .collect::>>(), - } - })?; +use crate::prelude::IpcWriterOptions; +use crate::{SerWriter, WriteDataFrameToFile}; +impl WriteDataFrameToFile for ParquetWriteOptions { + fn write_df_to_file(&self, mut df: DataFrame, file: W) -> PolarsResult<()> { + self.to_writer(file).finish(&mut df)?; Ok(()) } } -/// `partition_df` must be created in the same way as `partition_by`. -fn resolve_partition_dir(rootdir: &Path, by: I, partition_df: &DataFrame) -> PathBuf -where - I: IntoIterator, - S: AsRef, -{ - let mut path = PathBuf::new(); - path.push(resolve_homedir(rootdir)); - - for key in by.into_iter() { - let value = partition_df[key.as_ref()].get(0).unwrap().to_string(); - path.push(format!("{}={}", key.as_ref(), value)) +impl WriteDataFrameToFile for IpcWriterOptions { + fn write_df_to_file(&self, mut df: DataFrame, file: W) -> PolarsResult<()> { + self.to_writer(file).finish(&mut df)?; + Ok(()) } - path } /// Write a partitioned parquet dataset. This functionality is unstable. -pub fn write_partitioned_dataset( +pub fn write_partitioned_dataset( df: &DataFrame, path: &Path, partition_by: &[S], - file_write_options: &ParquetWriteOptions, + file_write_options: &O, chunk_size: usize, ) -> PolarsResult<()> where S: AsRef, + O: WriteDataFrameToFile + Send + Sync, { // Note: When adding support for formats other than Parquet, avoid writing the partitioned // columns into the file. We write them for parquet because they are encoded efficiently with @@ -210,9 +107,9 @@ where (n_files, rows_per_file) }; - let write_part = |mut df: DataFrame, path: &Path| { + let write_part = |df: DataFrame, path: &Path| { let f = std::fs::File::create(path)?; - file_write_options.to_writer(f).finish(&mut df)?; + file_write_options.write_df_to_file(df, f)?; PolarsResult::Ok(()) }; @@ -258,7 +155,7 @@ where .par_iter() .map(|group| { let df = unsafe { - df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending) + df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending) }; finish_part_df(df) }) diff --git a/crates/polars-io/src/shared.rs b/crates/polars-io/src/shared.rs index 2788d10a54fd..735490b0bcb3 100644 --- a/crates/polars-io/src/shared.rs +++ b/crates/polars-io/src/shared.rs @@ -1,5 +1,4 @@ use std::io::{Read, Write}; -use std::path::PathBuf; use std::sync::Arc; use arrow::array::new_empty_array; @@ -41,9 +40,8 @@ where fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()>; } -pub trait WriterFactory { - fn create_writer(&self, writer: W) -> Box>; - fn extension(&self) -> PathBuf; +pub trait WriteDataFrameToFile { + fn write_df_to_file(&self, df: DataFrame, file: W) -> PolarsResult<()>; } pub trait ArrowReader { diff --git a/docs/_build/API_REFERENCE_LINKS.yml b/docs/_build/API_REFERENCE_LINKS.yml index 4cd8c18a3b05..4762e8b8a5aa 100644 --- a/docs/_build/API_REFERENCE_LINKS.yml +++ b/docs/_build/API_REFERENCE_LINKS.yml @@ -83,6 +83,7 @@ python: lazy: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.lazy.html explain: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.explain.html fetch: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.fetch.html + PartitionedWriteOptions: https://docs.pola.rs/api/python/stable/reference/api/polars.PartitionedWriteOptions.html SQLContext: https://docs.pola.rs/api/python/stable/reference/sql 
SQLregister: name: register diff --git a/docs/src/python/user-guide/io/hive.py b/docs/src/python/user-guide/io/hive.py index 215b4eea6f87..6df23dbf16a9 100644 --- a/docs/src/python/user-guide/io/hive.py +++ b/docs/src/python/user-guide/io/hive.py @@ -25,7 +25,7 @@ Path(path).parent.mkdir(exist_ok=True, parents=True) df.write_parquet(path) -Path("docs/data/hive_mixed/description.txt").touch() +Path("docs/data/hive_mixed/description.txt").write_text("A") def print_paths(path: str) -> None: @@ -123,7 +123,7 @@ def dir_recurse(path: Path): # --8<-- [end:write_parquet_partitioned_show_data] # --8<-- [start:write_parquet_partitioned] -df.write_parquet_partitioned("docs/data/hive_write/", ["a", "b"]) +df.write_parquet(pl.PartitionedWriteOptions("docs/data/hive_write/", ["a", "b"])) # --8<-- [end:write_parquet_partitioned] # --8<-- [start:write_parquet_partitioned_show_paths] diff --git a/docs/user-guide/io/hive.md b/docs/user-guide/io/hive.md index 27af6b6c18ee..128810bdd0e8 100644 --- a/docs/user-guide/io/hive.md +++ b/docs/user-guide/io/hive.md @@ -26,8 +26,8 @@ path included in the output: ### Handling mixed files -Passing a directory to `scan_parquet` may not work if there are extra non-data files next to the -data files. +Passing a directory to `scan_parquet` may not work if there are files with different extensions in +the directory. For this example the following directory structure is used: @@ -80,7 +80,7 @@ Polars supports writing hive partitioned parquet datasets, with planned support For this example the following DataFrame is used: -{{code_block('user-guide/io/hive','write_parquet_partitioned_show_data',['write_parquet_partitioned'])}} +{{code_block('user-guide/io/hive','write_parquet_partitioned_show_data',[])}} ```python exec="on" result="text" session="user-guide/io/hive" --8<-- "python/user-guide/io/hive.py:write_parquet_partitioned_show_data" @@ -88,7 +88,7 @@ For this example the following DataFrame is used: We will write it to a hive-partitioned parquet dataset, partitioned by the columns `a` and `b`: -{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet_partitioned'])}} +{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet', 'PartitionedWriteOptions'])}} ```python exec="on" result="text" session="user-guide/io/hive" --8<-- "python/user-guide/io/hive.py:write_parquet_partitioned" diff --git a/py-polars/docs/source/reference/io.rst b/py-polars/docs/source/reference/io.rst index 0085f3d943fb..babdcef0e1c8 100644 --- a/py-polars/docs/source/reference/io.rst +++ b/py-polars/docs/source/reference/io.rst @@ -107,9 +107,15 @@ Parquet read_parquet_schema scan_parquet DataFrame.write_parquet - DataFrame.write_parquet_partitioned LazyFrame.sink_parquet +Partitioned Writing +~~~~~~~~~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + PartitionedWriteOptions + PyArrow Datasets ~~~~~~~~~~~~~~~~ Connect to pyarrow datasets. 
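The user-guide and reference changes above switch the documented entry point from `DataFrame.write_parquet_partitioned` to passing a `PartitionedWriteOptions` object into `write_parquet`. A minimal usage sketch of that entry point as this patch documents it — the frame values and output directory are illustrative, not taken from the patch:

```python
import polars as pl

# Illustrative frame; the user guide builds its own example data.
df = pl.DataFrame(
    {
        "a": [1, 1, 2, 2, 3],
        "b": [1, 1, 1, 2, 2],
        "c": [1, 2, 3, 4, 5],
    }
)

# Write a hive-partitioned parquet dataset under the root directory,
# with one directory level per partition key (e.g. a=1/b=1/...).
df.write_parquet(pl.PartitionedWriteOptions("docs/data/hive_write/", ["a", "b"]))

# The dataset can be scanned back from the root path.
print(pl.scan_parquet("docs/data/hive_write/").collect())
```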
diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py index 5931eb8c6c7d..682a575cadde 100644 --- a/py-polars/polars/__init__.py +++ b/py-polars/polars/__init__.py @@ -149,6 +149,7 @@ zeros, ) from polars.io import ( + PartitionedWriteOptions, read_avro, read_clipboard, read_csv, @@ -238,6 +239,7 @@ "Unknown", "Utf8", # polars.io + "PartitionedWriteOptions", "read_avro", "read_clipboard", "read_csv", diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 43450fc6e753..31f8c423bfa4 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -171,6 +171,7 @@ ) from polars._utils.various import NoDefault from polars.interchange.dataframe import PolarsDataFrame + from polars.io import PartitionedWriteOptions from polars.ml.torch import PolarsDataset if sys.version_info >= (3, 10): @@ -3437,7 +3438,7 @@ def write_ipc_stream( def write_parquet( self, - file: str | Path | BytesIO, + file: str | Path | BytesIO | PartitionedWriteOptions, *, compression: ParquetCompression = "zstd", compression_level: int | None = None, @@ -3454,6 +3455,8 @@ def write_parquet( ---------- file File path or writable file-like object to which the result will be written. + This can also accept `PartitionedWriteOptions` to write a partitioned + dataset. compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'} Choose "zstd" for good compression performance. Choose "lz4" for fast compression/decompression. @@ -3533,6 +3536,11 @@ def write_parquet( else: file = normalize_filepath(file) + from polars.io import PartitionedWriteOptions + + if isinstance(file, PartitionedWriteOptions): + file = file._inner + if use_pyarrow: if statistics == "full" or isinstance(statistics, dict): msg = "write_parquet with `use_pyarrow=True` allows only boolean values for `statistics`" @@ -3603,98 +3611,6 @@ def write_parquet( data_page_size, ) - @unstable() - def write_parquet_partitioned( - self, - path: str | Path, - partition_by: str | Collection[str], - *, - chunk_size_bytes: int = 4_294_967_296, - compression: ParquetCompression = "zstd", - compression_level: int | None = None, - statistics: bool | str | dict[str, bool] = True, - row_group_size: int | None = None, - data_page_size: int | None = None, - ) -> None: - """ - Write a partitioned directory of parquet files. - - Parameters - ---------- - path - Path to the base directory for the partitioned dataset. - partition_by - Columns to partition by. - chunk_size_bytes - Approximate size to split DataFrames within a single partition when - writing. Note this is calculated using the size of the DataFrame in - memory - the size of the output file may differ depending on the - file format / compression. - compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'} - Choose "zstd" for good compression performance. - Choose "lz4" for fast compression/decompression. - Choose "snappy" for more backwards compatibility guarantees - when you deal with older parquet readers. - compression_level - The level of compression to use. Higher compression means smaller files on - disk. - - - "gzip" : min-level: 0, max-level: 10. - - "brotli" : min-level: 0, max-level: 11. - - "zstd" : min-level: 1, max-level: 22. - - statistics - Write statistics to the parquet headers. This is the default behavior. - - Possible values: - - - `True`: enable default set of statistics (default) - - `False`: disable all statistics - - "full": calculate and write all available statistics. 
Cannot be - combined with `use_pyarrow`. - - `{ "statistic-key": True / False, ... }`. Cannot be combined with - `use_pyarrow`. Available keys: - - - "min": column minimum value (default: `True`) - - "max": column maximum value (default: `True`) - - "distinct_count": number of unique column values (default: `False`) - - "null_count": number of null values in column (default: `True`) - row_group_size - Size of the row groups in number of rows. Defaults to 512^2 rows. - data_page_size - Size of the data page in bytes. Defaults to 1024^2 bytes. - """ - path = normalize_filepath(path, check_not_directory=False) - partition_by = [partition_by] if isinstance(partition_by, str) else partition_by - - if isinstance(statistics, bool) and statistics: - statistics = { - "min": True, - "max": True, - "distinct_count": False, - "null_count": True, - } - elif isinstance(statistics, bool) and not statistics: - statistics = {} - elif statistics == "full": - statistics = { - "min": True, - "max": True, - "distinct_count": True, - "null_count": True, - } - - self._df.write_parquet_partitioned( - path, - partition_by, - chunk_size_bytes, - compression, - compression_level, - statistics, - row_group_size, - data_page_size, - ) - def write_database( self, table_name: str, diff --git a/py-polars/polars/io/__init__.py b/py-polars/polars/io/__init__.py index 9d63cc5a3780..de32a7763f95 100644 --- a/py-polars/polars/io/__init__.py +++ b/py-polars/polars/io/__init__.py @@ -5,6 +5,7 @@ from polars.io.csv import read_csv, read_csv_batched, scan_csv from polars.io.database import read_database, read_database_uri from polars.io.delta import read_delta, scan_delta +from polars.io.hive import PartitionedWriteOptions from polars.io.iceberg import scan_iceberg from polars.io.ipc import read_ipc, read_ipc_schema, read_ipc_stream, scan_ipc from polars.io.json import read_json @@ -32,6 +33,7 @@ "read_parquet_schema", "scan_csv", "scan_delta", + "PartitionedWriteOptions", "scan_iceberg", "scan_ipc", "scan_ndjson", diff --git a/py-polars/polars/io/hive.py b/py-polars/polars/io/hive.py new file mode 100644 index 000000000000..3c2003d00555 --- /dev/null +++ b/py-polars/polars/io/hive.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Sequence + +from polars._utils.various import ( + is_str_sequence, + normalize_filepath, +) + + +class PartitionedWriteOptions: + """ + Configuration for writing a partitioned dataset. + + This is passed to `write_*` functions that support writing partitioned datasets. 
+ """ + + def __init__( + self, + path: str | Path, + partition_by: str | Sequence[str], + *, + chunk_size_bytes: int = 4_294_967_296, + ): + if not isinstance(path, (str, Path)): + msg = f"`path` should be of type str or Path, got {type(path).__name__!r}" + raise TypeError(msg) + + path = normalize_filepath(path, check_not_directory=False) + + if isinstance(partition_by, str): + partition_by = [partition_by] + + if not is_str_sequence(partition_by): + msg = f"`partition_by` should be of type str or Collection[str], got {type(path).__name__!r}" + raise TypeError(msg) + + from polars.polars import PartitionedWriteOptions + + self._inner = PartitionedWriteOptions(path, partition_by, chunk_size_bytes) diff --git a/py-polars/src/dataframe/io.rs b/py-polars/src/dataframe/io.rs index 61afd6fc1b74..ccaa82282a77 100644 --- a/py-polars/src/dataframe/io.rs +++ b/py-polars/src/dataframe/io.rs @@ -389,7 +389,34 @@ impl PyDataFrame { row_group_size: Option, data_page_size: Option, ) -> PyResult<()> { + use polars_io::partition::write_partitioned_dataset; + + use crate::io::hive::PartitionedWriteOptions; let compression = parse_parquet_compression(compression, compression_level)?; + + if let Ok(part_opts) = py_f.downcast_bound::(py) { + let part_opts = part_opts.get(); + py.allow_threads(|| { + let write_options = ParquetWriteOptions { + compression, + statistics: statistics.0, + row_group_size, + data_page_size, + maintain_order: true, + }; + write_partitioned_dataset( + &self.df, + std::path::Path::new(AsRef::::as_ref(&part_opts.path)), + part_opts.partition_by.as_slice(), + &write_options, + part_opts.chunk_size_bytes, + ) + .map_err(PyPolarsErr::from) + })?; + + return Ok(()); + }; + let buf = get_file_like(py_f, true)?; py.allow_threads(|| { ParquetWriter::new(buf) @@ -403,49 +430,6 @@ impl PyDataFrame { Ok(()) } - #[cfg(feature = "parquet")] - #[pyo3(signature = (py_f, partition_by, chunk_size_bytes, compression, compression_level, statistics, row_group_size, data_page_size))] - pub fn write_parquet_partitioned( - &mut self, - py: Python, - py_f: PyObject, - partition_by: Vec, - chunk_size_bytes: usize, - compression: &str, - compression_level: Option, - statistics: Wrap, - row_group_size: Option, - data_page_size: Option, - ) -> PyResult<()> { - use std::path::Path; - - use polars_io::partition::write_partitioned_dataset; - - let Ok(path) = py_f.extract::(py) else { - return Err(PyPolarsErr::from(polars_err!(ComputeError: "expected path-like")).into()); - }; - let path = Path::new(&*path); - let compression = parse_parquet_compression(compression, compression_level)?; - - let write_options = ParquetWriteOptions { - compression, - statistics: statistics.0, - row_group_size, - data_page_size, - maintain_order: true, - }; - - write_partitioned_dataset( - &self.df, - path, - partition_by.as_slice(), - &write_options, - chunk_size_bytes, - ) - .map_err(PyPolarsErr::from)?; - Ok(()) - } - #[cfg(feature = "json")] pub fn write_json(&mut self, py_f: PyObject) -> PyResult<()> { let file = BufWriter::new(get_file_like(py_f, true)?); diff --git a/py-polars/src/io/hive.rs b/py-polars/src/io/hive.rs new file mode 100644 index 000000000000..a835112d2d12 --- /dev/null +++ b/py-polars/src/io/hive.rs @@ -0,0 +1,25 @@ +use pyo3::prelude::*; +use pyo3::pybacked::PyBackedStr; + +#[pyclass(frozen)] +pub struct PartitionedWriteOptions { + pub path: PyBackedStr, + pub partition_by: Vec, + pub chunk_size_bytes: usize, +} + +#[pymethods] +impl PartitionedWriteOptions { + #[new] + pub fn __init__( + path: PyBackedStr, + 
partition_by: Vec, + chunk_size_bytes: usize, + ) -> PyResult { + Ok(Self { + path, + partition_by, + chunk_size_bytes, + }) + } +} diff --git a/py-polars/src/io/mod.rs b/py-polars/src/io/mod.rs new file mode 100644 index 000000000000..5ba92ac1f6c1 --- /dev/null +++ b/py-polars/src/io/mod.rs @@ -0,0 +1 @@ +pub mod hive; diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index 4e91dbb63e1f..99c4e489b90b 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -23,6 +23,7 @@ mod file; mod functions; mod gil_once_cell; mod interop; +mod io; mod lazyframe; mod lazygroupby; mod map; @@ -125,6 +126,7 @@ fn polars(py: Python, m: &Bound) -> PyResult<()> { m.add_class::().unwrap(); #[cfg(feature = "sql")] m.add_class::().unwrap(); + m.add_class::().unwrap(); // Submodules // LogicalPlan objects diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index bd84389c5919..77d1b1506c40 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -672,7 +672,7 @@ def test_projection_only_hive_parts_gives_correct_number_of_rows( @pytest.mark.write_disk() def test_hive_write(tmp_path: Path, df: pl.DataFrame) -> None: root = tmp_path - df.write_parquet_partitioned(root, ["a", "b"]) + df.write_parquet(pl.PartitionedWriteOptions(root, ["a", "b"])) lf = pl.scan_parquet(root) assert_frame_equal(lf.collect(), df) @@ -693,7 +693,9 @@ def test_hive_write_multiple_files(tmp_path: Path) -> None: assert n_files > 1, "increase df size or decrease file size" root = tmp_path - df.write_parquet_partitioned(root, ["a"], chunk_size_bytes=chunk_size) + df.write_parquet( + pl.PartitionedWriteOptions(root, ["a"], chunk_size_bytes=chunk_size) + ) assert sum(1 for _ in (root / "a=0").iterdir()) == n_files assert_frame_equal(pl.scan_parquet(root).collect(), df) @@ -721,7 +723,7 @@ def test_hive_write_dates(tmp_path: Path) -> None: ) root = tmp_path - df.write_parquet_partitioned(root, ["date1", "date2"]) + df.write_parquet(pl.PartitionedWriteOptions(root, ["date1", "date2"])) lf = pl.scan_parquet(root) assert_frame_equal(lf.collect(), df) @@ -738,8 +740,8 @@ def test_hive_predicate_dates_14712( tmp_path: Path, monkeypatch: Any, capfd: Any ) -> None: monkeypatch.setenv("POLARS_VERBOSE", "1") - pl.DataFrame({"a": [datetime(2024, 1, 1)]}).write_parquet_partitioned( - tmp_path, ["a"] + pl.DataFrame({"a": [datetime(2024, 1, 1)]}).write_parquet( + pl.PartitionedWriteOptions(tmp_path, ["a"]) ) pl.scan_parquet(tmp_path).filter(pl.col("a") != datetime(2024, 1, 1)).collect() assert "hive partitioning: skipped 1 files" in capfd.readouterr().err From 9ed13f516e6b2be0856e0487816e27c68a445114 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 12 Jul 2024 15:07:31 +1000 Subject: [PATCH 2/6] c --- docs/src/python/user-guide/io/hive.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/src/python/user-guide/io/hive.py b/docs/src/python/user-guide/io/hive.py index 6df23dbf16a9..ef8885d2f53a 100644 --- a/docs/src/python/user-guide/io/hive.py +++ b/docs/src/python/user-guide/io/hive.py @@ -25,6 +25,7 @@ Path(path).parent.mkdir(exist_ok=True, parents=True) df.write_parquet(path) +# Make sure the file is not empty because path expansion ignores empty files. 
Path("docs/data/hive_mixed/description.txt").write_text("A") From 17085ed2d4839a88836b8fcfbc27edc49662240a Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 12 Jul 2024 15:15:24 +1000 Subject: [PATCH 3/6] c --- py-polars/polars/io/hive.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/py-polars/polars/io/hive.py b/py-polars/polars/io/hive.py index 3c2003d00555..b8d7231bf7b9 100644 --- a/py-polars/polars/io/hive.py +++ b/py-polars/polars/io/hive.py @@ -3,6 +3,7 @@ from pathlib import Path from typing import Sequence +from polars._utils.unstable import unstable from polars._utils.various import ( is_str_sequence, normalize_filepath, @@ -14,8 +15,25 @@ class PartitionedWriteOptions: Configuration for writing a partitioned dataset. This is passed to `write_*` functions that support writing partitioned datasets. + + .. warning:: + This functionality is considered **unstable**. It may be changed + at any point without it being considered a breaking change. + + Parameters + ---------- + path + Path to the base directory for the partitioned dataset. + partition_by + Column(s) to partition by. + chunk_size_bytes + Approximate size to split DataFrames within a single partition when + writing. Note this is calculated using the size of the DataFrame in + memory - the size of the output file may differ depending on the + file format / compression. """ + @unstable() def __init__( self, path: str | Path, From d06d336a474fabd53bc6c9d8bddfd7bd601dfb51 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 12 Jul 2024 15:16:47 +1000 Subject: [PATCH 4/6] c --- py-polars/polars/dataframe/frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 31f8c423bfa4..d2ee9f87d982 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -3456,7 +3456,7 @@ def write_parquet( file File path or writable file-like object to which the result will be written. This can also accept `PartitionedWriteOptions` to write a partitioned - dataset. + dataset (however note this functionality is unstable). compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'} Choose "zstd" for good compression performance. Choose "lz4" for fast compression/decompression. 
From 21114f8a644ed62048b7f3a56d5a15d2d4bc9c8b Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 12 Jul 2024 15:25:42 +1000 Subject: [PATCH 5/6] c --- crates/polars-io/src/partition.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/polars-io/src/partition.rs b/crates/polars-io/src/partition.rs index 141067704c4f..439441eec085 100644 --- a/crates/polars-io/src/partition.rs +++ b/crates/polars-io/src/partition.rs @@ -8,6 +8,7 @@ use polars_core::POOL; use rayon::prelude::*; use crate::parquet::write::ParquetWriteOptions; +#[cfg(feature = "ipc")] use crate::prelude::IpcWriterOptions; use crate::{SerWriter, WriteDataFrameToFile}; @@ -18,6 +19,7 @@ impl WriteDataFrameToFile for ParquetWriteOptions { } } +#[cfg(feature = "ipc")] impl WriteDataFrameToFile for IpcWriterOptions { fn write_df_to_file(&self, mut df: DataFrame, file: W) -> PolarsResult<()> { self.to_writer(file).finish(&mut df)?; From e0123c3a49247ecbef6cdb2b21e80ef96ff2f6dc Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 12 Jul 2024 19:48:13 +1000 Subject: [PATCH 6/6] c --- docs/_build/API_REFERENCE_LINKS.yml | 1 - docs/src/python/user-guide/io/hive.py | 2 +- docs/user-guide/io/hive.md | 2 +- py-polars/docs/source/reference/io.rst | 7 --- py-polars/polars/__init__.py | 2 - py-polars/polars/dataframe/frame.py | 36 +++++++++++----- py-polars/polars/io/__init__.py | 2 - py-polars/polars/io/hive.py | 59 -------------------------- py-polars/src/dataframe/io.rs | 16 ++++--- py-polars/src/io/hive.rs | 25 ----------- py-polars/src/io/mod.rs | 1 - py-polars/src/lib.rs | 2 - py-polars/tests/unit/io/test_hive.py | 10 ++--- 13 files changed, 41 insertions(+), 124 deletions(-) delete mode 100644 py-polars/polars/io/hive.py delete mode 100644 py-polars/src/io/hive.rs delete mode 100644 py-polars/src/io/mod.rs diff --git a/docs/_build/API_REFERENCE_LINKS.yml b/docs/_build/API_REFERENCE_LINKS.yml index 4762e8b8a5aa..4cd8c18a3b05 100644 --- a/docs/_build/API_REFERENCE_LINKS.yml +++ b/docs/_build/API_REFERENCE_LINKS.yml @@ -83,7 +83,6 @@ python: lazy: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.lazy.html explain: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.explain.html fetch: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.fetch.html - PartitionedWriteOptions: https://docs.pola.rs/api/python/stable/reference/api/polars.PartitionedWriteOptions.html SQLContext: https://docs.pola.rs/api/python/stable/reference/sql SQLregister: name: register diff --git a/docs/src/python/user-guide/io/hive.py b/docs/src/python/user-guide/io/hive.py index ef8885d2f53a..072b0fb3c34a 100644 --- a/docs/src/python/user-guide/io/hive.py +++ b/docs/src/python/user-guide/io/hive.py @@ -124,7 +124,7 @@ def dir_recurse(path: Path): # --8<-- [end:write_parquet_partitioned_show_data] # --8<-- [start:write_parquet_partitioned] -df.write_parquet(pl.PartitionedWriteOptions("docs/data/hive_write/", ["a", "b"])) +df.write_parquet("docs/data/hive_write/", partition_by=["a", "b"]) # --8<-- [end:write_parquet_partitioned] # --8<-- [start:write_parquet_partitioned_show_paths] diff --git a/docs/user-guide/io/hive.md b/docs/user-guide/io/hive.md index 128810bdd0e8..d0de9ada2c59 100644 --- a/docs/user-guide/io/hive.md +++ b/docs/user-guide/io/hive.md @@ -88,7 +88,7 @@ For this example the following DataFrame is used: We will write it to a hive-partitioned parquet dataset, partitioned by the columns `a` and `b`: 
-{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet', 'PartitionedWriteOptions'])}} +{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet'])}} ```python exec="on" result="text" session="user-guide/io/hive" --8<-- "python/user-guide/io/hive.py:write_parquet_partitioned" diff --git a/py-polars/docs/source/reference/io.rst b/py-polars/docs/source/reference/io.rst index babdcef0e1c8..1f088958a3c0 100644 --- a/py-polars/docs/source/reference/io.rst +++ b/py-polars/docs/source/reference/io.rst @@ -109,13 +109,6 @@ Parquet DataFrame.write_parquet LazyFrame.sink_parquet -Partitioned Writing -~~~~~~~~~~~~~~~~~~~ -.. autosummary:: - :toctree: api/ - - PartitionedWriteOptions - PyArrow Datasets ~~~~~~~~~~~~~~~~ Connect to pyarrow datasets. diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py index 682a575cadde..5931eb8c6c7d 100644 --- a/py-polars/polars/__init__.py +++ b/py-polars/polars/__init__.py @@ -149,7 +149,6 @@ zeros, ) from polars.io import ( - PartitionedWriteOptions, read_avro, read_clipboard, read_csv, @@ -239,7 +238,6 @@ "Unknown", "Utf8", # polars.io - "PartitionedWriteOptions", "read_avro", "read_clipboard", "read_csv", diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index d2ee9f87d982..355a8b937c4e 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -171,7 +171,6 @@ ) from polars._utils.various import NoDefault from polars.interchange.dataframe import PolarsDataFrame - from polars.io import PartitionedWriteOptions from polars.ml.torch import PolarsDataset if sys.version_info >= (3, 10): @@ -3438,7 +3437,7 @@ def write_ipc_stream( def write_parquet( self, - file: str | Path | BytesIO | PartitionedWriteOptions, + file: str | Path | BytesIO, *, compression: ParquetCompression = "zstd", compression_level: int | None = None, @@ -3447,6 +3446,8 @@ def write_parquet( data_page_size: int | None = None, use_pyarrow: bool = False, pyarrow_options: dict[str, Any] | None = None, + partition_by: str | Sequence[str] | None = None, + partition_chunk_size_bytes: int = 4_294_967_296, ) -> None: """ Write to Apache Parquet file. @@ -3455,8 +3456,7 @@ def write_parquet( ---------- file File path or writable file-like object to which the result will be written. - This can also accept `PartitionedWriteOptions` to write a partitioned - dataset (however note this functionality is unstable). + This should be a path to a directory if writing a partitioned dataset. compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'} Choose "zstd" for good compression performance. Choose "lz4" for fast compression/decompression. @@ -3500,6 +3500,14 @@ def write_parquet( using `pyarrow.parquet.write_to_dataset`. The `partition_cols` parameter leads to write the dataset to a directory. Similar to Spark's partitioned datasets. + partition_by + Column(s) to partition by. A partitioned dataset will be written if this is + specified. This parameter is considered unstable and is subject to change. + partition_chunk_size_bytes + Approximate size to split DataFrames within a single partition when + writing. Note this is calculated using the size of the DataFrame in + memory - the size of the output file may differ depending on the + file format / compression. 
Examples -------- @@ -3531,16 +3539,15 @@ def write_parquet( if compression is None: compression = "uncompressed" if isinstance(file, (str, Path)): - if pyarrow_options is not None and pyarrow_options.get("partition_cols"): + if ( + partition_by is not None + or pyarrow_options is not None + and pyarrow_options.get("partition_cols") + ): file = normalize_filepath(file, check_not_directory=False) else: file = normalize_filepath(file) - from polars.io import PartitionedWriteOptions - - if isinstance(file, PartitionedWriteOptions): - file = file._inner - if use_pyarrow: if statistics == "full" or isinstance(statistics, dict): msg = "write_parquet with `use_pyarrow=True` allows only boolean values for `statistics`" @@ -3602,6 +3609,13 @@ def write_parquet( "null_count": True, } + if partition_by is not None: + msg = "The `partition_by` parameter of `write_parquet` is considered unstable." + issue_unstable_warning(msg) + + if isinstance(partition_by, str): + partition_by = [partition_by] + self._df.write_parquet( file, compression, @@ -3609,6 +3623,8 @@ def write_parquet( statistics, row_group_size, data_page_size, + partition_by=partition_by, + partition_chunk_size_bytes=partition_chunk_size_bytes, ) def write_database( diff --git a/py-polars/polars/io/__init__.py b/py-polars/polars/io/__init__.py index de32a7763f95..9d63cc5a3780 100644 --- a/py-polars/polars/io/__init__.py +++ b/py-polars/polars/io/__init__.py @@ -5,7 +5,6 @@ from polars.io.csv import read_csv, read_csv_batched, scan_csv from polars.io.database import read_database, read_database_uri from polars.io.delta import read_delta, scan_delta -from polars.io.hive import PartitionedWriteOptions from polars.io.iceberg import scan_iceberg from polars.io.ipc import read_ipc, read_ipc_schema, read_ipc_stream, scan_ipc from polars.io.json import read_json @@ -33,7 +32,6 @@ "read_parquet_schema", "scan_csv", "scan_delta", - "PartitionedWriteOptions", "scan_iceberg", "scan_ipc", "scan_ndjson", diff --git a/py-polars/polars/io/hive.py b/py-polars/polars/io/hive.py deleted file mode 100644 index b8d7231bf7b9..000000000000 --- a/py-polars/polars/io/hive.py +++ /dev/null @@ -1,59 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import Sequence - -from polars._utils.unstable import unstable -from polars._utils.various import ( - is_str_sequence, - normalize_filepath, -) - - -class PartitionedWriteOptions: - """ - Configuration for writing a partitioned dataset. - - This is passed to `write_*` functions that support writing partitioned datasets. - - .. warning:: - This functionality is considered **unstable**. It may be changed - at any point without it being considered a breaking change. - - Parameters - ---------- - path - Path to the base directory for the partitioned dataset. - partition_by - Column(s) to partition by. - chunk_size_bytes - Approximate size to split DataFrames within a single partition when - writing. Note this is calculated using the size of the DataFrame in - memory - the size of the output file may differ depending on the - file format / compression. 
- """ - - @unstable() - def __init__( - self, - path: str | Path, - partition_by: str | Sequence[str], - *, - chunk_size_bytes: int = 4_294_967_296, - ): - if not isinstance(path, (str, Path)): - msg = f"`path` should be of type str or Path, got {type(path).__name__!r}" - raise TypeError(msg) - - path = normalize_filepath(path, check_not_directory=False) - - if isinstance(partition_by, str): - partition_by = [partition_by] - - if not is_str_sequence(partition_by): - msg = f"`partition_by` should be of type str or Collection[str], got {type(path).__name__!r}" - raise TypeError(msg) - - from polars.polars import PartitionedWriteOptions - - self._inner = PartitionedWriteOptions(path, partition_by, chunk_size_bytes) diff --git a/py-polars/src/dataframe/io.rs b/py-polars/src/dataframe/io.rs index ccaa82282a77..e28a07a7d819 100644 --- a/py-polars/src/dataframe/io.rs +++ b/py-polars/src/dataframe/io.rs @@ -378,7 +378,7 @@ impl PyDataFrame { } #[cfg(feature = "parquet")] - #[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size))] + #[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size, partition_by, partition_chunk_size_bytes))] pub fn write_parquet( &mut self, py: Python, @@ -388,14 +388,16 @@ impl PyDataFrame { statistics: Wrap, row_group_size: Option, data_page_size: Option, + partition_by: Option>, + partition_chunk_size_bytes: usize, ) -> PyResult<()> { use polars_io::partition::write_partitioned_dataset; - use crate::io::hive::PartitionedWriteOptions; let compression = parse_parquet_compression(compression, compression_level)?; - if let Ok(part_opts) = py_f.downcast_bound::(py) { - let part_opts = part_opts.get(); + if let Some(partition_by) = partition_by { + let path = py_f.extract::(py)?; + py.allow_threads(|| { let write_options = ParquetWriteOptions { compression, @@ -406,10 +408,10 @@ impl PyDataFrame { }; write_partitioned_dataset( &self.df, - std::path::Path::new(AsRef::::as_ref(&part_opts.path)), - part_opts.partition_by.as_slice(), + std::path::Path::new(path.as_str()), + partition_by.as_slice(), &write_options, - part_opts.chunk_size_bytes, + partition_chunk_size_bytes, ) .map_err(PyPolarsErr::from) })?; diff --git a/py-polars/src/io/hive.rs b/py-polars/src/io/hive.rs deleted file mode 100644 index a835112d2d12..000000000000 --- a/py-polars/src/io/hive.rs +++ /dev/null @@ -1,25 +0,0 @@ -use pyo3::prelude::*; -use pyo3::pybacked::PyBackedStr; - -#[pyclass(frozen)] -pub struct PartitionedWriteOptions { - pub path: PyBackedStr, - pub partition_by: Vec, - pub chunk_size_bytes: usize, -} - -#[pymethods] -impl PartitionedWriteOptions { - #[new] - pub fn __init__( - path: PyBackedStr, - partition_by: Vec, - chunk_size_bytes: usize, - ) -> PyResult { - Ok(Self { - path, - partition_by, - chunk_size_bytes, - }) - } -} diff --git a/py-polars/src/io/mod.rs b/py-polars/src/io/mod.rs deleted file mode 100644 index 5ba92ac1f6c1..000000000000 --- a/py-polars/src/io/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod hive; diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index 99c4e489b90b..4e91dbb63e1f 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -23,7 +23,6 @@ mod file; mod functions; mod gil_once_cell; mod interop; -mod io; mod lazyframe; mod lazygroupby; mod map; @@ -126,7 +125,6 @@ fn polars(py: Python, m: &Bound) -> PyResult<()> { m.add_class::().unwrap(); #[cfg(feature = "sql")] m.add_class::().unwrap(); - m.add_class::().unwrap(); // Submodules // LogicalPlan objects diff --git 
a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 77d1b1506c40..7a7217e9cfba 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -672,7 +672,7 @@ def test_projection_only_hive_parts_gives_correct_number_of_rows( @pytest.mark.write_disk() def test_hive_write(tmp_path: Path, df: pl.DataFrame) -> None: root = tmp_path - df.write_parquet(pl.PartitionedWriteOptions(root, ["a", "b"])) + df.write_parquet(root, partition_by=["a", "b"]) lf = pl.scan_parquet(root) assert_frame_equal(lf.collect(), df) @@ -693,9 +693,7 @@ def test_hive_write_multiple_files(tmp_path: Path) -> None: assert n_files > 1, "increase df size or decrease file size" root = tmp_path - df.write_parquet( - pl.PartitionedWriteOptions(root, ["a"], chunk_size_bytes=chunk_size) - ) + df.write_parquet(root, partition_by="a", partition_chunk_size_bytes=chunk_size) assert sum(1 for _ in (root / "a=0").iterdir()) == n_files assert_frame_equal(pl.scan_parquet(root).collect(), df) @@ -723,7 +721,7 @@ def test_hive_write_dates(tmp_path: Path) -> None: ) root = tmp_path - df.write_parquet(pl.PartitionedWriteOptions(root, ["date1", "date2"])) + df.write_parquet(root, partition_by=["date1", "date2"]) lf = pl.scan_parquet(root) assert_frame_equal(lf.collect(), df) @@ -741,7 +739,7 @@ def test_hive_predicate_dates_14712( ) -> None: monkeypatch.setenv("POLARS_VERBOSE", "1") pl.DataFrame({"a": [datetime(2024, 1, 1)]}).write_parquet( - pl.PartitionedWriteOptions(tmp_path, ["a"]) + tmp_path, partition_by="a" ) pl.scan_parquet(tmp_path).filter(pl.col("a") != datetime(2024, 1, 1)).collect() assert "hive partitioning: skipped 1 files" in capfd.readouterr().err
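After the final commit, partitioned writing is requested directly through `write_parquet` via the `partition_by` keyword, with `partition_chunk_size_bytes` as the optional size knob. A closing sketch modelled on the updated tests and user guide, with an illustrative output path:

```python
from datetime import datetime

import polars as pl

df = pl.DataFrame(
    {
        "date1": [datetime(2024, 1, 1), datetime(2024, 1, 2)],
        "date2": [datetime(2023, 1, 1), datetime(2023, 1, 1)],
        "x": [1, 2],
    }
)

# Partition keys are given straight to write_parquet; the target must then be
# a directory path rather than a file or file-like object.
df.write_parquet("/tmp/hive_dates/", partition_by=["date1", "date2"])

# Scanning the root directory reads the hive-partitioned dataset back.
print(pl.scan_parquet("/tmp/hive_dates/").collect())
```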