feat: Change API for writing partitioned Parquet to reduce code duplication #17586

Merged: 6 commits merged on Jul 14, 2024
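In short: the object-safe `WriterFactory` trait (a boxed `SerWriter` plus a file extension) is removed in favour of a single `WriteDataFrameToFile` trait, and `write_partitioned_dataset` becomes generic over it. A condensed sketch of the new shape, pieced together from the diff below rather than copied verbatim from the crate:

```rust
use polars_core::prelude::*;

/// New trait in crates/polars-io/src/shared.rs; it replaces `WriterFactory`.
pub trait WriteDataFrameToFile {
    fn write_df_to_file<W: std::io::Write>(&self, df: DataFrame, file: W) -> PolarsResult<()>;
}

// `ParquetWriteOptions` and (behind the `ipc` feature) `IpcWriterOptions` both
// implement the trait by delegating to their existing writers, so the
// partitioned-dataset writer only needs `O: WriteDataFrameToFile + Send + Sync`
// instead of being hard-wired to Parquet.
```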
12 changes: 1 addition & 11 deletions crates/polars-io/src/ipc/ipc_stream.rs
@@ -42,7 +42,7 @@ use arrow::io::ipc::{read, write};
use polars_core::prelude::*;

use crate::prelude::*;
use crate::shared::{finish_reader, ArrowReader, WriterFactory};
use crate::shared::{finish_reader, ArrowReader};

/// Read Arrows Stream IPC format into a DataFrame
///
@@ -290,13 +290,3 @@ impl Default for IpcStreamWriterOption {
Self::new()
}
}

impl WriterFactory for IpcStreamWriterOption {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>> {
Box::new(IpcStreamWriter::new(writer).with_compression(self.compression))
}

fn extension(&self) -> PathBuf {
self.extension.to_owned()
}
}
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/mod.rs
@@ -16,4 +16,4 @@ pub use ipc_file::{IpcReader, IpcScanOptions};
pub use ipc_reader_async::*;
#[cfg(feature = "ipc_streaming")]
pub use ipc_stream::*;
pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption, IpcWriterOptions};
pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOptions};
51 changes: 7 additions & 44 deletions crates/polars-io/src/ipc/write.rs
@@ -1,5 +1,4 @@
use std::io::Write;
use std::path::PathBuf;

use arrow::io::ipc::write;
use arrow::io::ipc::write::WriteOptions;
@@ -8,7 +7,7 @@ use polars_core::prelude::*;
use serde::{Deserialize, Serialize};

use crate::prelude::*;
use crate::shared::{schema_to_arrow_checked, WriterFactory};
use crate::shared::schema_to_arrow_checked;

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -19,6 +18,12 @@ pub struct IpcWriterOptions {
pub maintain_order: bool,
}

impl IpcWriterOptions {
pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
IpcWriter::new(writer).with_compression(self.compression)
}
}

/// Write a DataFrame to Arrow's IPC format
///
/// # Example
@@ -153,45 +158,3 @@ impl From<IpcCompression> for write::Compression {
}
}
}

pub struct IpcWriterOption {
compression: Option<IpcCompression>,
extension: PathBuf,
}

impl IpcWriterOption {
pub fn new() -> Self {
Self {
compression: None,
extension: PathBuf::from(".ipc"),
}
}

/// Set the compression used. Defaults to None.
pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
self.compression = compression;
self
}

/// Set the extension. Defaults to ".ipc".
pub fn with_extension(mut self, extension: PathBuf) -> Self {
self.extension = extension;
self
}
}

impl Default for IpcWriterOption {
fn default() -> Self {
Self::new()
}
}

impl WriterFactory for IpcWriterOption {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>> {
Box::new(IpcWriter::new(writer).with_compression(self.compression))
}

fn extension(&self) -> PathBuf {
self.extension.to_owned()
}
}
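The new `IpcWriterOptions::to_writer` mirrors `ParquetWriteOptions::to_writer`: configure once, then materialize a concrete writer over any `Write` sink. A minimal usage sketch; the prelude imports and the `IpcCompression::ZSTD` variant are assumptions, not part of this diff:

```rust
use std::fs::File;
use std::io::BufWriter;

use polars_core::prelude::*;
use polars_io::prelude::*;

fn write_ipc(df: &mut DataFrame) -> PolarsResult<()> {
    // Build the options once; `to_writer` turns them into an `IpcWriter<W>`.
    let options = IpcWriterOptions {
        compression: Some(IpcCompression::ZSTD), // assumed variant name
        ..Default::default()
    };
    let sink = BufWriter::new(File::create("frame.ipc")?);
    options.to_writer(sink).finish(df)
}
```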
137 changes: 18 additions & 119 deletions crates/polars-io/src/partition.rs
@@ -1,144 +1,43 @@
//! Functionality for writing a DataFrame partitioned into multiple files.

use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use std::path::Path;

use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::POOL;
use rayon::prelude::*;

use crate::parquet::write::ParquetWriteOptions;
use crate::utils::resolve_homedir;
use crate::WriterFactory;

/// Write a DataFrame with disk partitioning
///
/// # Example
/// ```
/// use polars_core::prelude::*;
/// use polars_io::ipc::IpcWriterOption;
/// use polars_io::partition::PartitionedWriter;
///
/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
/// let option = IpcWriterOption::default();
/// PartitionedWriter::new(option, "./rootdir", ["a", "b"])
/// .finish(df)
/// }
/// ```

pub struct PartitionedWriter<F> {
option: F,
rootdir: PathBuf,
by: Vec<String>,
parallel: bool,
}

impl<F> PartitionedWriter<F>
where
F: WriterFactory + Send + Sync,
{
pub fn new<P, I, S>(option: F, rootdir: P, by: I) -> Self
where
P: Into<PathBuf>,
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
Self {
option,
rootdir: rootdir.into(),
by: by.into_iter().map(|s| s.as_ref().to_string()).collect(),
parallel: true,
}
}

/// Write the parquet file in parallel (default).
pub fn with_parallel(mut self, parallel: bool) -> Self {
self.parallel = parallel;
self
}

fn write_partition_df(&self, partition_df: &mut DataFrame, i: usize) -> PolarsResult<()> {
let mut path = resolve_partition_dir(&self.rootdir, &self.by, partition_df);
std::fs::create_dir_all(&path)?;

path.push(format!(
"data-{:04}.{}",
i,
self.option.extension().display()
));

let file = std::fs::File::create(path)?;
let writer = BufWriter::new(file);

self.option
.create_writer::<BufWriter<File>>(writer)
.finish(partition_df)
}

pub fn finish(self, df: &DataFrame) -> PolarsResult<()> {
let groups = df.group_by(self.by.clone())?;
let groups = groups.get_groups();

// don't parallelize this
// there is a lot of parallelization in take and this may easily SO
POOL.install(|| {
match groups {
GroupsProxy::Idx(idx) => {
idx.par_iter()
.enumerate()
.map(|(i, (_, group))| {
// groups are in bounds
// and sorted
let mut part_df = unsafe {
df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
};
self.write_partition_df(&mut part_df, i)
})
.collect::<PolarsResult<Vec<_>>>()
},
GroupsProxy::Slice { groups, .. } => groups
.par_iter()
.enumerate()
.map(|(i, [first, len])| {
let mut part_df = df.slice(*first as i64, *len as usize);
self.write_partition_df(&mut part_df, i)
})
.collect::<PolarsResult<Vec<_>>>(),
}
})?;
#[cfg(feature = "ipc")]
use crate::prelude::IpcWriterOptions;
use crate::{SerWriter, WriteDataFrameToFile};

impl WriteDataFrameToFile for ParquetWriteOptions {
fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
self.to_writer(file).finish(&mut df)?;
Ok(())
}
}

/// `partition_df` must be created in the same way as `partition_by`.
fn resolve_partition_dir<I, S>(rootdir: &Path, by: I, partition_df: &DataFrame) -> PathBuf
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut path = PathBuf::new();
path.push(resolve_homedir(rootdir));

for key in by.into_iter() {
let value = partition_df[key.as_ref()].get(0).unwrap().to_string();
path.push(format!("{}={}", key.as_ref(), value))
#[cfg(feature = "ipc")]
impl WriteDataFrameToFile for IpcWriterOptions {
fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
self.to_writer(file).finish(&mut df)?;
Ok(())
}
path
}

/// Write a partitioned parquet dataset. This functionality is unstable.
pub fn write_partitioned_dataset<S>(
pub fn write_partitioned_dataset<S, O>(
df: &DataFrame,
path: &Path,
partition_by: &[S],
file_write_options: &ParquetWriteOptions,
file_write_options: &O,
chunk_size: usize,
) -> PolarsResult<()>
where
S: AsRef<str>,
O: WriteDataFrameToFile + Send + Sync,
{
// Note: When adding support for formats other than Parquet, avoid writing the partitioned
// columns into the file. We write them for parquet because they are encoded efficiently with
@@ -210,9 +109,9 @@ where
(n_files, rows_per_file)
};

let write_part = |mut df: DataFrame, path: &Path| {
let write_part = |df: DataFrame, path: &Path| {
let f = std::fs::File::create(path)?;
file_write_options.to_writer(f).finish(&mut df)?;
file_write_options.write_df_to_file(df, f)?;
PolarsResult::Ok(())
};

@@ -258,7 +157,7 @@ where
.par_iter()
.map(|group| {
let df = unsafe {
df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending)
[Review comment from @nameexhaustion (Collaborator, Author), Jul 12, 2024]
drive-by - we can enable parallel gather now that we are processing in chunks.
};
finish_part_df(df)
})
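With both option types implementing the trait, the same entry point now writes hive-partitioned datasets in either format. A usage sketch for the reworked signature; the `chunk_size` value is illustrative, and deriving `Default` for `ParquetWriteOptions` is assumed (the diff only shows the derive for `IpcWriterOptions`):

```rust
use std::path::Path;

use polars_core::prelude::*;
use polars_io::ipc::IpcWriterOptions;
use polars_io::parquet::write::ParquetWriteOptions;
use polars_io::partition::write_partitioned_dataset;

fn write_hive(df: &DataFrame) -> PolarsResult<()> {
    // Hive-style Parquet layout: ./out_parquet/a=<value>/b=<value>/...
    write_partitioned_dataset(
        df,
        Path::new("./out_parquet"),
        &["a", "b"],
        &ParquetWriteOptions::default(),
        1_000_000, // rows per chunk; illustrative value
    )?;

    // Same call shape for IPC output (requires the `ipc` feature).
    write_partitioned_dataset(
        df,
        Path::new("./out_ipc"),
        &["a", "b"],
        &IpcWriterOptions::default(),
        1_000_000,
    )
}
```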
6 changes: 2 additions & 4 deletions crates/polars-io/src/shared.rs
@@ -1,5 +1,4 @@
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::Arc;

use arrow::array::new_empty_array;
@@ -41,9 +40,8 @@
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()>;
}

pub trait WriterFactory {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>>;
fn extension(&self) -> PathBuf;
pub trait WriteDataFrameToFile {
fn write_df_to_file<W: std::io::Write>(&self, df: DataFrame, file: W) -> PolarsResult<()>;
}

pub trait ArrowReader {
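Supporting another format now means one small trait impl rather than a new `WriterFactory`. A hypothetical sketch for CSV output: `MyCsvWriteOptions` is invented for illustration, `CsvWriter` and its `include_header` builder come from polars-io's `csv` feature, and the crate-root re-export of `WriteDataFrameToFile` is assumed from the `use crate::{SerWriter, WriteDataFrameToFile}` seen above:

```rust
use polars_core::prelude::*;
use polars_io::csv::write::CsvWriter;
use polars_io::{SerWriter, WriteDataFrameToFile};

/// Invented options type, shown only to illustrate the extension point.
pub struct MyCsvWriteOptions {
    pub include_header: bool,
}

impl WriteDataFrameToFile for MyCsvWriteOptions {
    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
        // Delegate to the existing CSV writer, mirroring the Parquet/IPC impls above.
        CsvWriter::new(file)
            .include_header(self.include_header)
            .finish(&mut df)?;
        Ok(())
    }
}
```

Any such type can then be passed straight to `write_partitioned_dataset` as the `file_write_options` argument.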
5 changes: 3 additions & 2 deletions docs/src/python/user-guide/io/hive.py
@@ -25,7 +25,8 @@
Path(path).parent.mkdir(exist_ok=True, parents=True)
df.write_parquet(path)

Path("docs/data/hive_mixed/description.txt").touch()
# Make sure the file is not empty because path expansion ignores empty files.
Path("docs/data/hive_mixed/description.txt").write_text("A")


def print_paths(path: str) -> None:
@@ -123,7 +124,7 @@ def dir_recurse(path: Path):
# --8<-- [end:write_parquet_partitioned_show_data]

# --8<-- [start:write_parquet_partitioned]
df.write_parquet_partitioned("docs/data/hive_write/", ["a", "b"])
df.write_parquet("docs/data/hive_write/", partition_by=["a", "b"])
# --8<-- [end:write_parquet_partitioned]

# --8<-- [start:write_parquet_partitioned_show_paths]
8 changes: 4 additions & 4 deletions docs/user-guide/io/hive.md
@@ -26,8 +26,8 @@ path included in the output:

### Handling mixed files

Passing a directory to `scan_parquet` may not work if there are extra non-data files next to the
data files.
Passing a directory to `scan_parquet` may not work if there are files with different extensions in
the directory.

For this example the following directory structure is used:

@@ -80,15 +80,15 @@ Polars supports writing hive partitioned parquet datasets, with planned support

For this example the following DataFrame is used:

{{code_block('user-guide/io/hive','write_parquet_partitioned_show_data',['write_parquet_partitioned'])}}
{{code_block('user-guide/io/hive','write_parquet_partitioned_show_data',[])}}

```python exec="on" result="text" session="user-guide/io/hive"
--8<-- "python/user-guide/io/hive.py:write_parquet_partitioned_show_data"
```

We will write it to a hive-partitioned parquet dataset, partitioned by the columns `a` and `b`:

{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet_partitioned'])}}
{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet'])}}

```python exec="on" result="text" session="user-guide/io/hive"
--8<-- "python/user-guide/io/hive.py:write_parquet_partitioned"
1 change: 0 additions & 1 deletion py-polars/docs/source/reference/io.rst
@@ -107,7 +107,6 @@ Parquet
read_parquet_schema
scan_parquet
DataFrame.write_parquet
DataFrame.write_parquet_partitioned
LazyFrame.sink_parquet

PyArrow Datasets