Support Write Options in DataFrame::write_* methods #7435

Merged (6 commits) on Sep 6, 2023
Changes from 4 commits
12 changes: 9 additions & 3 deletions datafusion-examples/examples/dataframe-to-s3.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
@@ -61,15 +62,20 @@ async fn main() -> Result<()> {
let df = ctx.sql("SELECT * from test").await?;

let out_path = format!("s3://{bucket_name}/test_write/");
df.clone().write_parquet(&out_path, None).await?;
df.clone()
.write_parquet(&out_path, DataFrameWriteOptions::new(), None)
.await?;

//write as JSON to s3
let json_out = format!("s3://{bucket_name}/json_out");
df.clone().write_json(&json_out).await?;
df.clone()
.write_json(&json_out, DataFrameWriteOptions::new())
.await?;

//write as csv to s3
let csv_out = format!("s3://{bucket_name}/csv_out");
df.write_csv(&csv_out).await?;
df.write_csv(&csv_out, DataFrameWriteOptions::new(), None)
.await?;

let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions::new(Arc::new(file_format))
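For illustration only, here is a sketch of the same example with non-default options, using the builder methods this PR adds to DataFrameWriteOptions in dataframe.rs further down. The bucket paths are placeholders, the compression setting applies only to the CSV and JSON writers, and datafusion-common is assumed to be available for the CompressionTypeVariant import:

use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::error::Result;
use datafusion_common::parsers::CompressionTypeVariant;

// Sketch: the three write_* calls again, this time with explicit options.
async fn write_with_options(df: DataFrame, bucket_name: &str) -> Result<()> {
    // Coalesce all partitions into a single gzip-compressed CSV file.
    df.clone()
        .write_csv(
            &format!("s3://{bucket_name}/csv_single/"),
            DataFrameWriteOptions::new()
                .with_single_file_output(true)
                .with_compression(CompressionTypeVariant::GZIP),
            None, // keep the default arrow CSV WriterBuilder
        )
        .await?;

    // JSON takes the same options struct but no writer-specific properties.
    df.clone()
        .write_json(
            &format!("s3://{bucket_name}/json_single/"),
            DataFrameWriteOptions::new().with_compression(CompressionTypeVariant::GZIP),
        )
        .await?;

    // Parquet compression must be set through WriterProperties (third argument);
    // DataFrameWriteOptions must stay UNCOMPRESSED for write_parquet.
    df.write_parquet(
        &format!("s3://{bucket_name}/parquet_out/"),
        DataFrameWriteOptions::new(),
        None, // fall back to session-level parquet settings
    )
    .await?;
    Ok(())
}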
13 changes: 13 additions & 0 deletions datafusion/common/src/file_options/csv_writer.rs
@@ -46,6 +46,19 @@ pub struct CsvWriterOptions {
// https://github.com/apache/arrow-rs/issues/4735
}

impl CsvWriterOptions {
pub fn new(
writer_options: WriterBuilder,
compression: CompressionTypeVariant,
) -> Self {
Self {
writer_options,
compression,
has_header: true,
}
}
}

impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
type Error = DataFusionError;

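As a small illustrative sketch (the tab delimiter and gzip choice are arbitrary example values), the new constructor can be combined with arrow's WriterBuilder like so:

use arrow::csv::WriterBuilder;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;

// Tab-separated output, gzip-compressed by DataFusion after serialization;
// `new` fills in `has_header: true` as shown above.
fn example_csv_writer_options() -> CsvWriterOptions {
    CsvWriterOptions::new(
        WriterBuilder::new().with_delimiter(b'\t'),
        CompressionTypeVariant::GZIP,
    )
}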
6 changes: 6 additions & 0 deletions datafusion/common/src/file_options/json_writer.rs
@@ -33,6 +33,12 @@ pub struct JsonWriterOptions {
pub compression: CompressionTypeVariant,
}

impl JsonWriterOptions {
pub fn new(compression: CompressionTypeVariant) -> Self {
Self { compression }
}
}

impl TryFrom<(&ConfigOptions, &StatementOptions)> for JsonWriterOptions {
type Error = DataFusionError;

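A correspondingly minimal sketch for JSON, with gzip chosen purely as an example:

use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;

// Newline-delimited JSON, compressed with gzip after serialization.
fn example_json_writer_options() -> JsonWriterOptions {
    JsonWriterOptions::new(CompressionTypeVariant::GZIP)
}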
8 changes: 7 additions & 1 deletion datafusion/common/src/file_options/parquet_writer.rs
@@ -36,6 +36,12 @@ pub struct ParquetWriterOptions {
pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
pub fn new(writer_options: WriterProperties) -> Self {
Self { writer_options }
}
}

impl ParquetWriterOptions {
pub fn writer_options(&self) -> &WriterProperties {
&self.writer_options
@@ -44,7 +50,7 @@ impl ParquetWriterOptions {

/// Constructs a default Parquet WriterPropertiesBuilder using
/// Session level ConfigOptions to initialize settings
fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
pub fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
let parquet_session_options = &options.execution.parquet;
let mut builder = WriterProperties::builder()
.set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
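Since default_builder is made pub here, callers can seed WriterProperties from session-level settings before overriding individual knobs. A hedged sketch (the page-size override is just an example value):

use datafusion_common::config::ConfigOptions;
use datafusion_common::file_options::parquet_writer::{default_builder, ParquetWriterOptions};
use datafusion_common::Result;

// Start from the session-level parquet settings, tweak one of them, and wrap
// the resulting WriterProperties in the new options type.
fn example_parquet_writer_options() -> Result<ParquetWriterOptions> {
    let mut config = ConfigOptions::new();
    config.execution.parquet.data_pagesize_limit = 64 * 1024;
    let props = default_builder(&config)?.build();
    Ok(ParquetWriterOptions::new(props))
}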
142 changes: 126 additions & 16 deletions datafusion/core/src/dataframe.rs
@@ -22,10 +22,19 @@ use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::file_options::StatementOptions;
use datafusion_common::{DataFusionError, FileType, SchemaError, UnnestOptions};
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::file_options::parquet_writer::{
default_builder, ParquetWriterOptions,
};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions,
};
use datafusion_expr::dml::CopyOptions;
use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -57,21 +66,42 @@ use crate::prelude::SessionContext;
/// written out from a DataFrame
pub struct DataFrameWriteOptions {
/// Controls if existing data should be overwritten
overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
// settings such as LOCATION and FILETYPE can be set here
// e.g. add location: Option<Path>
overwrite: bool,
/// Controls if all partitions should be coalesced into a single output file
/// Generally will have slower performance when set to true.
single_file_output: bool,
/// Sets compression by DataFusion applied after file serialization.
/// Allows compression of CSV and JSON.
/// Not supported for parquet.
compression: CompressionTypeVariant,
}

impl DataFrameWriteOptions {
/// Create a new DataFrameWriteOptions with default values
pub fn new() -> Self {
DataFrameWriteOptions { overwrite: false }
DataFrameWriteOptions {
overwrite: false,
single_file_output: false,
compression: CompressionTypeVariant::UNCOMPRESSED,
}
}
/// Set the overwrite option to true or false
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = overwrite;
self
}

/// Set the single_file_output value to true or false
pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
self.single_file_output = single_file_output;
self
}

/// Sets the compression type applied to the output file(s)
pub fn with_compression(mut self, compression: CompressionTypeVariant) -> Self {
self.compression = compression;
self
}
}

impl Default for DataFrameWriteOptions {
@@ -995,14 +1025,29 @@ impl DataFrame {
pub async fn write_csv(
self,
path: &str,
options: DataFrameWriteOptions,
writer_properties: Option<WriterBuilder>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
if options.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented for DataFrame::write_csv.".to_owned(),
));
}
let props = match writer_properties {
Some(props) => props,
None => WriterBuilder::new(),
};

let file_type_writer_options =
FileTypeWriterOptions::CSV(CsvWriterOptions::new(props, options.compression));
let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));

let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
FileType::CSV,
false,
// TODO implement options
StatementOptions::new(vec![]),
options.single_file_output,
copy_options,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
@@ -1012,15 +1057,31 @@ impl DataFrame {
pub async fn write_parquet(
self,
path: &str,
_writer_properties: Option<WriterProperties>,
options: DataFrameWriteOptions,
writer_properties: Option<WriterProperties>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
if options.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented for DataFrame::write_parquet.".to_owned(),
));
}
match options.compression{
CompressionTypeVariant::UNCOMPRESSED => (),
_ => return Err(DataFusionError::Configuration("DataFrame::write_parquet method does not support compression set via DataFrameWriteOptions. Set parquet compression via writer_properties instead.".to_owned()))
}
let props = match writer_properties {
Some(props) => props,
None => default_builder(self.session_state.config_options())?.build(),
};
let file_type_writer_options =
FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(props));
let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
FileType::PARQUET,
false,
// TODO implement options
StatementOptions::new(vec![]),
options.single_file_output,
copy_options,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
@@ -1030,14 +1091,22 @@ impl DataFrame {
pub async fn write_json(
self,
path: &str,
options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError> {
if options.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented for DataFrame::write_json.".to_owned(),
));
}
let file_type_writer_options =
FileTypeWriterOptions::JSON(JsonWriterOptions::new(options.compression));
let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
FileType::JSON,
false,
// TODO implement options
StatementOptions::new(vec![]),
options.single_file_output,
copy_options,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
@@ -1249,6 +1318,10 @@ mod tests {
WindowFunction,
};
use datafusion_physical_expr::expressions::Column;
use object_store::local::LocalFileSystem;
use parquet::file::reader::FileReader;
use tempfile::TempDir;
use url::Url;

use crate::execution::context::SessionConfig;
use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
@@ -2292,4 +2365,41 @@

Ok(())
}

#[tokio::test]
async fn write_parquet_with_compression() -> Result<()> {
let test_df = test_table().await?;
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
let ctx = &test_df.session_state;
ctx.runtime_env().register_object_store(&local_url, local);

let output_path = "file://local/test.parquet";
test_df
.write_parquet(
output_path,
DataFrameWriteOptions::new().with_single_file_output(true),
Some(
WriterProperties::builder()
.set_compression(parquet::basic::Compression::SNAPPY)
Review comment (Member):
The test looks good. I wonder if it is worth looping over a list of supported compressions rather than just testing this with one codec? If the default changed to SNAPPY in the future then this test would not really be testing that the WriterProperties value is respected.

Reply (Contributor, author):
I expanded the test to include all supported compression codecs. We unfortunately cannot test all compression levels for those codecs that support levels, since in general they do not include the used compression level in the file metadata. The parquet crate reader always reports the compression level as the "default" level.

Reply (Contributor, author):
Relevant lines in parquet crate:
https://github.com/apache/arrow-rs/blob/eeba0a3792a2774dee1d10a25340b2741cf95c9e/parquet/src/file/metadata.rs#L640
https://github.com/apache/arrow-rs/blob/eeba0a3792a2774dee1d10a25340b2741cf95c9e/parquet/src/format.rs#L3495
I suppose outside of testing, there is no compelling reason for the parquet file to store the compression level that was used for each column chunk.

.build(),
),
)
.await?;

// Check that file actually used snappy compression
let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;

let reader =
parquet::file::serialized_reader::SerializedFileReader::new(file).unwrap();

let parquet_metadata = reader.metadata();

let written_compression = parquet_metadata.row_group(0).column(0).compression();

assert_eq!(written_compression, parquet::basic::Compression::SNAPPY);

Ok(())
}
}
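The expanded test that loops over codecs, mentioned in the review discussion above, landed in a later commit and is not part of this 4-commit view. Purely as a sketch of that idea, reusing the same ctx/tmp_dir/test_df setup as write_parquet_with_compression (the codec list and file naming are assumptions; level-bearing codecs use their default levels so the read-back comparison matches):

// Sketch: write the same DataFrame once per codec, each to its own file,
// then assert the codec recorded in the parquet metadata matches.
for (i, codec) in [
    parquet::basic::Compression::SNAPPY,
    parquet::basic::Compression::GZIP(parquet::basic::GzipLevel::default()),
    parquet::basic::Compression::ZSTD(parquet::basic::ZstdLevel::default()),
    parquet::basic::Compression::LZ4_RAW,
]
.into_iter()
.enumerate()
{
    let out_path = format!("file://local/test_{i}.parquet");
    test_df
        .clone()
        .write_parquet(
            &out_path,
            DataFrameWriteOptions::new().with_single_file_output(true),
            Some(WriterProperties::builder().set_compression(codec).build()),
        )
        .await?;

    let file = std::fs::File::open(tmp_dir.path().join(format!("test_{i}.parquet")))?;
    let reader =
        parquet::file::serialized_reader::SerializedFileReader::new(file).unwrap();
    let written = reader.metadata().row_group(0).column(0).compression();
    assert_eq!(written, codec);
}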
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -577,6 +577,7 @@ pub async fn plan_to_csv(
#[cfg(test)]
mod tests {
use super::*;
use crate::dataframe::DataFrameWriteOptions;
use crate::prelude::*;
use crate::test::{partitioned_csv_config, partitioned_file_groups};
use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
@@ -1071,7 +1072,7 @@

let out_dir_url = "file://local/out";
let e = df
.write_csv(out_dir_url)
.write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
.await
.expect_err("should fail because input file does not match inferred schema");
assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}"));
@@ -1106,7 +1107,8 @@
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
df.write_csv(out_dir_url).await?;
df.write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
.await?;

// create a new context and verify that the results were saved to a partitioned csv file
let ctx = SessionContext::new();
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -321,6 +321,7 @@ mod tests {
use object_store::local::LocalFileSystem;

use crate::assert_batches_eq;
use crate::dataframe::DataFrameWriteOptions;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
@@ -696,7 +697,8 @@
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let df = ctx.sql("SELECT a, b FROM test").await?;
df.write_json(out_dir_url).await?;
df.write_json(out_dir_url, DataFrameWriteOptions::new())
.await?;

// create a new context and verify that the results were saved to a partitioned csv file
let ctx = SessionContext::new();
@@ -789,7 +791,7 @@
let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
let out_dir_url = "file://local/out";
let e = df
.write_json(out_dir_url)
.write_json(out_dir_url, DataFrameWriteOptions::new())
.await
.expect_err("should fail because input file does not match inferred schema");
assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}"));
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -736,6 +736,7 @@
// See also `parquet_exec` integration test

use super::*;
use crate::dataframe::DataFrameWriteOptions;
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
@@ -932,7 +933,7 @@
let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
let out_dir_url = "file://local/out";
let e = df
.write_parquet(out_dir_url, None)
.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
.await
.expect_err("should fail because input file does not match inferred schema");
assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}"));
@@ -1951,7 +1952,8 @@
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
df.write_parquet(out_dir_url, None).await?;
df.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
.await?;
// write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?;

// create a new context and verify that the results were saved to a partitioned parquet file