feat: Added DataFrameWriteOptions option when writing as csv, json, parquet.
allinux committed Sep 7, 2024
1 parent 6409af3 commit ff45187
Showing 2 changed files with 82 additions and 11 deletions.
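For orientation, here is a minimal sketch of how the new options are meant to be called from Python once this commit lands. The SessionContext setup, sample data, and output paths are illustrative assumptions, not part of the commit:

    from datafusion import SessionContext

    # Illustrative setup; any DataFrame works.
    ctx = SessionContext()
    df = ctx.from_pydict({"color": ["red", "blue", "red"], "value": [1, 2, 3]})

    # Every new argument defaults to the previous behavior when omitted.
    df.write_csv("out_csv", with_header=True, single_file_output=True)
    df.write_parquet("out_parquet", compression="zstd", compression_level=4, overwrite=True)
    df.write_json("out_json", partition_by=["color"])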
35 changes: 30 additions & 5 deletions python/datafusion/dataframe.py
@@ -409,37 +409,62 @@ def except_all(self, other: DataFrame) -> DataFrame:
"""
return DataFrame(self.df.except_all(other.df))

def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None:
def write_csv(
self,
path: str | pathlib.Path,
with_header: bool = False,
overwrite: bool = False,
single_file_output: bool = False,
partition_by: Optional[List[str]] = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a CSV file.
Args:
path: Path of the CSV file to write.
with_header: If true, output the CSV header row.
write_options_overwrite: Controls if existing data should be overwritten
write_options_single_file_output: Controls if all partitions should be coalesced into a single output file. Generally will have slower performance when set to true.
write_options_partition_by: Sets which columns should be used for hive-style partitioned writes by name.
"""
self.df.write_csv(str(path), with_header)
self.df.write_csv(str(path), with_header, overwrite, single_file_output, partition_by)

     def write_parquet(
         self,
         path: str | pathlib.Path,
         compression: str = "uncompressed",
         compression_level: int | None = None,
+        overwrite: bool = False,
+        single_file_output: bool = False,
+        partition_by: Optional[List[str]] = None,
     ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a Parquet file.
 
         Args:
             path: Path of the Parquet file to write.
             compression: Compression type to use.
             compression_level: Compression level to use.
+            overwrite: Controls if existing data should be overwritten.
+            single_file_output: Controls if all partitions should be coalesced
+                into a single output file. Generally slower when set to true.
+            partition_by: Columns to use, by name, for hive-style partitioned writes.
         """
-        self.df.write_parquet(str(path), compression, compression_level)
+        self.df.write_parquet(
+            str(path),
+            compression,
+            compression_level,
+            overwrite,
+            single_file_output,
+            # The Rust binding expects a list of column names; map None to [].
+            partition_by if partition_by is not None else [],
+        )

-    def write_json(self, path: str | pathlib.Path) -> None:
+    def write_json(
+        self,
+        path: str | pathlib.Path,
+        overwrite: bool = False,
+        single_file_output: bool = False,
+        partition_by: Optional[List[str]] = None,
+    ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a JSON file.
 
         Args:
             path: Path of the JSON file to write.
+            overwrite: Controls if existing data should be overwritten.
+            single_file_output: Controls if all partitions should be coalesced
+                into a single output file. Generally slower when set to true.
+            partition_by: Columns to use, by name, for hive-style partitioned writes.
         """
-        self.df.write_json(str(path))
+        self.df.write_json(
+            str(path),
+            overwrite,
+            single_file_output,
+            # The Rust binding expects a list of column names; map None to [].
+            partition_by if partition_by is not None else [],
+        )

     def to_arrow_table(self) -> pa.Table:
         """Execute the :py:class:`DataFrame` and convert it into an Arrow Table.
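Because partition_by drives hive-style writes, a sketch of the resulting on-disk layout may help; this continues the illustrative df above, and the file names inside each directory are engine-chosen:

    import os

    # One subdirectory per distinct value of the partition column,
    # named key=value in hive style; the column itself is encoded in the path.
    df.write_parquet("out_partitioned", partition_by=["color"])
    print(sorted(os.listdir("out_partitioned")))  # e.g. ['color=blue', 'color=red']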
58 changes: 52 additions & 6 deletions src/dataframe.rs
@@ -402,7 +402,22 @@ impl PyDataFrame {
     }
 
     /// Write a `DataFrame` to a CSV file.
-    fn write_csv(&self, path: &str, with_header: bool, py: Python) -> PyResult<()> {
+    #[pyo3(signature = (
+        path,
+        with_header=false,
+        overwrite=false,
+        single_file_output=false,
+        partition_by=vec![],
+    ))]
+    fn write_csv(
+        &self,
+        path: &str,
+        with_header: bool,
+        overwrite: bool,
+        single_file_output: bool,
+        partition_by: Vec<String>,
+        py: Python,
+    ) -> PyResult<()> {
         let csv_options = CsvOptions {
             has_header: Some(with_header),
             ..Default::default()
@@ -411,7 +426,10 @@ impl PyDataFrame {
             py,
             self.df.as_ref().clone().write_csv(
                 path,
-                DataFrameWriteOptions::new(),
+                DataFrameWriteOptions::default()
+                    .with_overwrite(overwrite)
+                    .with_single_file_output(single_file_output)
+                    .with_partition_by(partition_by),
                 Some(csv_options),
             ),
         )?;
@@ -422,13 +440,19 @@ impl PyDataFrame {
     #[pyo3(signature = (
         path,
         compression="uncompressed",
-        compression_level=None
+        compression_level=None,
+        overwrite=false,
+        single_file_output=false,
+        partition_by=vec![],
     ))]
     fn write_parquet(
         &self,
         path: &str,
         compression: &str,
         compression_level: Option<u32>,
+        overwrite: bool,
+        single_file_output: bool,
+        partition_by: Vec<String>,
         py: Python,
     ) -> PyResult<()> {
         fn verify_compression_level(cl: Option<u32>) -> Result<u32, PyErr> {
@@ -472,21 +496,43 @@ impl PyDataFrame {
             py,
             self.df.as_ref().clone().write_parquet(
                 path,
-                DataFrameWriteOptions::new(),
+                DataFrameWriteOptions::default()
+                    .with_overwrite(overwrite)
+                    .with_single_file_output(single_file_output)
+                    .with_partition_by(partition_by),
                 Option::from(options),
             ),
         )?;
         Ok(())
     }

     /// Executes a query and writes the results to a partitioned JSON file.
-    fn write_json(&self, path: &str, py: Python) -> PyResult<()> {
+    #[pyo3(signature = (
+        path,
+        overwrite=false,
+        single_file_output=false,
+        partition_by=vec![],
+    ))]
+    fn write_json(
+        &self,
+        path: &str,
+        overwrite: bool,
+        single_file_output: bool,
+        partition_by: Vec<String>,
+        py: Python,
+    ) -> PyResult<()> {
         wait_for_future(
             py,
             self.df
                 .as_ref()
                 .clone()
-                .write_json(path, DataFrameWriteOptions::new(), None),
+                .write_json(
+                    path,
+                    DataFrameWriteOptions::default()
+                        .with_overwrite(overwrite)
+                        .with_single_file_output(single_file_output)
+                        .with_partition_by(partition_by),
+                    None,
+                ),
         )?;
         Ok(())
     }
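Because the #[pyo3(signature = ...)] attributes give every new parameter a default (false, or an empty vec), the new arguments are opt-in keywords on the Python side and pre-existing call sites keep working unchanged. A sketch, again with the illustrative df and hypothetical paths from above:

    # Calls written before this commit are unaffected:
    df.write_csv("legacy_out", with_header=True)

    # New behavior is opt-in:
    df.write_csv("fresh_out", with_header=True, overwrite=True, partition_by=["color"])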
