diff --git a/datafusion-examples/examples/dataframe-to-s3.rs b/datafusion-examples/examples/dataframe-to-s3.rs
index 0d8d0a9fc15f..883da7d0d13d 100644
--- a/datafusion-examples/examples/dataframe-to-s3.rs
+++ b/datafusion-examples/examples/dataframe-to-s3.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
@@ -61,15 +62,20 @@ async fn main() -> Result<()> {
     let df = ctx.sql("SELECT * from test").await?;
 
     let out_path = format!("s3://{bucket_name}/test_write/");
-    df.clone().write_parquet(&out_path, None).await?;
+    df.clone()
+        .write_parquet(&out_path, DataFrameWriteOptions::new(), None)
+        .await?;
 
     //write as JSON to s3
     let json_out = format!("s3://{bucket_name}/json_out");
-    df.clone().write_json(&json_out).await?;
+    df.clone()
+        .write_json(&json_out, DataFrameWriteOptions::new())
+        .await?;
 
     //write as csv to s3
     let csv_out = format!("s3://{bucket_name}/csv_out");
-    df.write_csv(&csv_out).await?;
+    df.write_csv(&csv_out, DataFrameWriteOptions::new(), None)
+        .await?;
 
     let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
     let listing_options = ListingOptions::new(Arc::new(file_format))
diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs
index ebf177fdce98..336180b25633 100644
--- a/datafusion/common/src/file_options/csv_writer.rs
+++ b/datafusion/common/src/file_options/csv_writer.rs
@@ -46,6 +46,19 @@ pub struct CsvWriterOptions {
     // https://github.com/apache/arrow-rs/issues/4735
 }
 
+impl CsvWriterOptions {
+    pub fn new(
+        writer_options: WriterBuilder,
+        compression: CompressionTypeVariant,
+    ) -> Self {
+        Self {
+            writer_options,
+            compression,
+            has_header: true,
+        }
+    }
+}
+
 impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
     type Error = DataFusionError;
 
diff --git a/datafusion/common/src/file_options/json_writer.rs b/datafusion/common/src/file_options/json_writer.rs
index b3ea76b6510a..7f988016c69d 100644
--- a/datafusion/common/src/file_options/json_writer.rs
+++ b/datafusion/common/src/file_options/json_writer.rs
@@ -33,6 +33,12 @@ pub struct JsonWriterOptions {
     pub compression: CompressionTypeVariant,
 }
 
+impl JsonWriterOptions {
+    pub fn new(compression: CompressionTypeVariant) -> Self {
+        Self { compression }
+    }
+}
+
 impl TryFrom<(&ConfigOptions, &StatementOptions)> for JsonWriterOptions {
     type Error = DataFusionError;
 
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index ea3276b062ac..fed773f29e27 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -36,6 +36,12 @@ pub struct ParquetWriterOptions {
     pub writer_options: WriterProperties,
 }
 
+impl ParquetWriterOptions {
+    pub fn new(writer_options: WriterProperties) -> Self {
+        Self { writer_options }
+    }
+}
+
 impl ParquetWriterOptions {
     pub fn writer_options(&self) -> &WriterProperties {
         &self.writer_options
@@ -44,7 +50,7 @@ impl ParquetWriterOptions {
 
 /// Constructs a default Parquet WriterPropertiesBuilder using
 /// Session level ConfigOptions to initialize settings
-fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
+pub fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
     let parquet_session_options = &options.execution.parquet;
     let mut builder = WriterProperties::builder()
         .set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
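
Side note (a sketch, not part of the patch): the constructors added above can be assembled into a `FileTypeWriterOptions` directly; the imports mirror the ones this PR adds in dataframe.rs.

    use arrow::csv::WriterBuilder;
    use datafusion_common::file_options::csv_writer::CsvWriterOptions;
    use datafusion_common::file_options::json_writer::JsonWriterOptions;
    use datafusion_common::parsers::CompressionTypeVariant;
    use datafusion_common::FileTypeWriterOptions;

    fn example_writer_options() -> (FileTypeWriterOptions, FileTypeWriterOptions) {
        // CSV: an arrow WriterBuilder plus a DataFusion-applied compression codec
        let csv = FileTypeWriterOptions::CSV(CsvWriterOptions::new(
            WriterBuilder::new(),
            CompressionTypeVariant::GZIP,
        ));
        // JSON: only the post-serialization compression is configurable
        let json = FileTypeWriterOptions::JSON(JsonWriterOptions::new(
            CompressionTypeVariant::UNCOMPRESSED,
        ));
        (csv, json)
    }
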
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index fbd78ec9d7d8..648b2340d46e 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -22,10 +22,19 @@ use std::sync::Arc;
 
 use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
 use arrow::compute::{cast, concat};
+use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
-use datafusion_common::file_options::StatementOptions;
-use datafusion_common::{DataFusionError, FileType, SchemaError, UnnestOptions};
+use datafusion_common::file_options::csv_writer::CsvWriterOptions;
+use datafusion_common::file_options::json_writer::JsonWriterOptions;
+use datafusion_common::file_options::parquet_writer::{
+    default_builder, ParquetWriterOptions,
+};
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::{
+    DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions,
+};
+use datafusion_expr::dml::CopyOptions;
 use parquet::file::properties::WriterProperties;
 
 use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -57,21 +66,42 @@ use crate::prelude::SessionContext;
/// written out from a DataFrame
 pub struct DataFrameWriteOptions {
     /// Controls if existing data should be overwritten
-    overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
-    // settings such as LOCATION and FILETYPE can be set here
-    // e.g. add location: Option<Path>
+    overwrite: bool,
+    /// Controls if all partitions should be coalesced into a single output file
+    /// Generally will have slower performance when set to true.
+    single_file_output: bool,
+    /// Sets compression by DataFusion applied after file serialization.
+    /// Allows compression of CSV and JSON.
+    /// Not supported for parquet.
+    compression: CompressionTypeVariant,
 }
 
 impl DataFrameWriteOptions {
     /// Create a new DataFrameWriteOptions with default values
     pub fn new() -> Self {
-        DataFrameWriteOptions { overwrite: false }
+        DataFrameWriteOptions {
+            overwrite: false,
+            single_file_output: false,
+            compression: CompressionTypeVariant::UNCOMPRESSED,
+        }
     }
     /// Set the overwrite option to true or false
     pub fn with_overwrite(mut self, overwrite: bool) -> Self {
         self.overwrite = overwrite;
         self
     }
+
+    /// Set the single_file_output value to true or false
+    pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
+        self.single_file_output = single_file_output;
+        self
+    }
+
+    /// Sets the compression type applied to the output file(s)
+    pub fn with_compression(mut self, compression: CompressionTypeVariant) -> Self {
+        self.compression = compression;
+        self
+    }
 }
 
 impl Default for DataFrameWriteOptions {
@@ -995,14 +1025,29 @@ impl DataFrame {
     pub async fn write_csv(
         self,
         path: &str,
+        options: DataFrameWriteOptions,
+        writer_properties: Option<WriterBuilder>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        if options.overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrites are not implemented for DataFrame::write_csv.".to_owned(),
+            ));
+        }
+        let props = match writer_properties {
+            Some(props) => props,
+            None => WriterBuilder::new(),
+        };
+
+        let file_type_writer_options =
+            FileTypeWriterOptions::CSV(CsvWriterOptions::new(props, options.compression));
+        let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
+
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
             FileType::CSV,
-            false,
-            // TODO implement options
-            StatementOptions::new(vec![]),
+            options.single_file_output,
+            copy_options,
         )?
         .build()?;
         DataFrame::new(self.session_state, plan).collect().await
@@ -1012,15 +1057,31 @@ impl DataFrame {
     pub async fn write_parquet(
         self,
         path: &str,
-        _writer_properties: Option<WriterProperties>,
+        options: DataFrameWriteOptions,
+        writer_properties: Option<WriterProperties>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        if options.overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrites are not implemented for DataFrame::write_parquet.".to_owned(),
+            ));
+        }
+        match options.compression{
+            CompressionTypeVariant::UNCOMPRESSED => (),
+            _ => return Err(DataFusionError::Configuration("DataFrame::write_parquet method does not support compression set via DataFrameWriteOptions. Set parquet compression via writer_properties instead.".to_owned()))
+        }
+        let props = match writer_properties {
+            Some(props) => props,
+            None => default_builder(self.session_state.config_options())?.build(),
+        };
+        let file_type_writer_options =
+            FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(props));
+        let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
             FileType::PARQUET,
-            false,
-            // TODO implement options
-            StatementOptions::new(vec![]),
+            options.single_file_output,
+            copy_options,
         )?
         .build()?;
         DataFrame::new(self.session_state, plan).collect().await
@@ -1030,14 +1091,22 @@ impl DataFrame {
     pub async fn write_json(
         self,
         path: &str,
+        options: DataFrameWriteOptions,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        if options.overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrites are not implemented for DataFrame::write_json.".to_owned(),
+            ));
+        }
+        let file_type_writer_options =
+            FileTypeWriterOptions::JSON(JsonWriterOptions::new(options.compression));
+        let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
             FileType::JSON,
-            false,
-            // TODO implement options
-            StatementOptions::new(vec![]),
+            options.single_file_output,
+            copy_options,
         )?
         .build()?;
         DataFrame::new(self.session_state, plan).collect().await
@@ -1249,6 +1318,11 @@ mod tests {
         WindowFunction,
     };
     use datafusion_physical_expr::expressions::Column;
+    use object_store::local::LocalFileSystem;
+    use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+    use parquet::file::reader::FileReader;
+    use tempfile::TempDir;
+    use url::Url;
 
     use crate::execution::context::SessionConfig;
     use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
@@ -2292,4 +2366,53 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn write_parquet_with_compression() -> Result<()> {
+        let test_df = test_table().await?;
+
+        let output_path = "file://local/test.parquet";
+        let test_compressions = vec![
+            parquet::basic::Compression::SNAPPY,
+            parquet::basic::Compression::LZ4,
+            parquet::basic::Compression::LZ4_RAW,
+            parquet::basic::Compression::GZIP(GzipLevel::default()),
+            parquet::basic::Compression::BROTLI(BrotliLevel::default()),
+            parquet::basic::Compression::ZSTD(ZstdLevel::default()),
+        ];
+        for compression in test_compressions.into_iter() {
+            let df = test_df.clone();
+            let tmp_dir = TempDir::new()?;
+            let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+            let local_url = Url::parse("file://local").unwrap();
+            let ctx = &test_df.session_state;
+            ctx.runtime_env().register_object_store(&local_url, local);
+            df.write_parquet(
+                output_path,
+                DataFrameWriteOptions::new().with_single_file_output(true),
+                Some(
+                    WriterProperties::builder()
+                        .set_compression(compression)
+                        .build(),
+                ),
+            )
+            .await?;
+
+            // Check that file actually used the specified compression
+            let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+
+            let reader =
+                parquet::file::serialized_reader::SerializedFileReader::new(file)
+                    .unwrap();
+
+            let parquet_metadata = reader.metadata();
+
+            let written_compression =
+                parquet_metadata.row_group(0).column(0).compression();
+
+            assert_eq!(written_compression, compression);
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 9f670431cb70..e692810381f4 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -577,6 +577,7 @@ pub async fn plan_to_csv(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::dataframe::DataFrameWriteOptions;
     use crate::prelude::*;
     use crate::test::{partitioned_csv_config, partitioned_file_groups};
     use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
@@ -1071,7 +1072,7 @@ mod tests {
         let out_dir_url = "file://local/out";
         let e = df
-            .write_csv(out_dir_url)
+            .write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
             .await
             .expect_err("should fail because input file does not match inferred schema");
         assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}"));
@@ -1106,7 +1107,8 @@ mod tests {
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
         let out_dir_url = "file://local/out";
         let df = ctx.sql("SELECT c1, c2 FROM test").await?;
-        df.write_csv(out_dir_url).await?;
+        df.write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
+            .await?;
 
         // create a new context and verify that the results were saved to a partitioned csv file
         let ctx = SessionContext::new();
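
For context, a rough usage sketch (not part of the patch) of the new `write_csv` signature exercised above; the query and output path are made up, and passing `None` keeps the default arrow `WriterBuilder`:

    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::*;
    use datafusion_common::parsers::CompressionTypeVariant;

    async fn write_csv_example() -> Result<()> {
        let ctx = SessionContext::new();
        let df = ctx.sql("SELECT 1 AS a, 'x' AS b").await?;
        // Coalesce to a single file and gzip it after CSV serialization
        df.write_csv(
            "/tmp/example_csv_out",
            DataFrameWriteOptions::new()
                .with_single_file_output(true)
                .with_compression(CompressionTypeVariant::GZIP),
            None,
        )
        .await?;
        Ok(())
    }
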
match inferred schema"); assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}")); @@ -1106,7 +1107,8 @@ mod tests { let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; let out_dir_url = "file://local/out"; let df = ctx.sql("SELECT c1, c2 FROM test").await?; - df.write_csv(out_dir_url).await?; + df.write_csv(out_dir_url, DataFrameWriteOptions::new(), None) + .await?; // create a new context and verify that the results were saved to a partitioned csv file let ctx = SessionContext::new(); diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 4ad268a97652..ec5a4c1cefb8 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -321,6 +321,7 @@ mod tests { use object_store::local::LocalFileSystem; use crate::assert_batches_eq; + use crate::dataframe::DataFrameWriteOptions; use crate::datasource::file_format::{json::JsonFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; @@ -696,7 +697,8 @@ mod tests { let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; let out_dir_url = "file://local/out"; let df = ctx.sql("SELECT a, b FROM test").await?; - df.write_json(out_dir_url).await?; + df.write_json(out_dir_url, DataFrameWriteOptions::new()) + .await?; // create a new context and verify that the results were saved to a partitioned csv file let ctx = SessionContext::new(); @@ -789,7 +791,7 @@ mod tests { let df = ctx.read_csv("tests/data/corrupt.csv", options).await?; let out_dir_url = "file://local/out"; let e = df - .write_json(out_dir_url) + .write_json(out_dir_url, DataFrameWriteOptions::new()) .await .expect_err("should fail because input file does not match inferred schema"); assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}")); diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index d973f13fad0a..861a37a3027b 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -736,6 +736,7 @@ mod tests { // See also `parquet_exec` integration test use super::*; + use crate::dataframe::DataFrameWriteOptions; use crate::datasource::file_format::options::CsvReadOptions; use crate::datasource::file_format::parquet::test_util::store_parquet; use crate::datasource::file_format::test_util::scan_format; @@ -932,7 +933,7 @@ mod tests { let df = ctx.read_csv("tests/data/corrupt.csv", options).await?; let out_dir_url = "file://local/out"; let e = df - .write_parquet(out_dir_url, None) + .write_parquet(out_dir_url, DataFrameWriteOptions::new(), None) .await .expect_err("should fail because input file does not match inferred schema"); assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}")); @@ -1951,7 +1952,8 @@ mod tests { let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; let out_dir_url = "file://local/out"; let df = ctx.sql("SELECT c1, c2 FROM test").await?; - df.write_parquet(out_dir_url, None).await?; + df.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None) + .await?; // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?; // create a new context and verify that the results were saved to a partitioned parquet 
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 25b11b1f973f..3a59f40eded8 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -40,7 +40,7 @@ use crate::logical_expr::{
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::file_options::FileTypeWriterOptions;
 use datafusion_common::FileType;
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyOptions, CopyTo};
 
 use crate::logical_expr::{Limit, Values};
 use crate::physical_expr::create_physical_expr;
@@ -557,7 +557,7 @@ impl DefaultPhysicalPlanner {
                 output_url,
                 file_format,
                 single_file_output,
-                statement_options,
+                copy_options,
             }) => {
                 let input_exec = self.create_initial_plan(input, session_state).await?;
 
@@ -569,10 +569,15 @@ impl DefaultPhysicalPlanner {
 
                 let schema: Schema = (**input.schema()).clone().into();
 
-                let file_type_writer_options = FileTypeWriterOptions::build(
-                    file_format,
-                    session_state.config_options(),
-                    statement_options)?;
+                let file_type_writer_options = match copy_options{
+                    CopyOptions::SQLOptions(statement_options) => {
+                        FileTypeWriterOptions::build(
+                            file_format,
+                            session_state.config_options(),
+                            statement_options)?
+                    },
+                    CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
+                };
 
                 // Set file sink related options
                 let config = FileSinkConfig {
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index d3cc42afbe66..eb7833504bd3 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -17,7 +17,7 @@
 
 //! This module provides a builder for creating LogicalPlans
 
-use crate::dml::CopyTo;
+use crate::dml::{CopyOptions, CopyTo};
 use crate::expr::Alias;
 use crate::expr_rewriter::{
     coerce_plan_expr_for_schema, normalize_col,
@@ -41,7 +41,6 @@ use crate::{
     Expr, ExprSchemable, TableSource,
 };
 use arrow::datatypes::{DataType, Schema, SchemaRef};
-use datafusion_common::file_options::StatementOptions;
 use datafusion_common::plan_err;
 use datafusion_common::UnnestOptions;
 use datafusion_common::{
@@ -240,14 +239,14 @@ impl LogicalPlanBuilder {
         output_url: String,
         file_format: FileType,
         single_file_output: bool,
-        statement_options: StatementOptions,
+        copy_options: CopyOptions,
     ) -> Result<Self> {
         Ok(Self::from(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url,
             file_format,
             single_file_output,
-            statement_options,
+            copy_options,
         })))
     }
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 501f2eeba146..4cd56b89ac63 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -21,7 +21,8 @@ use std::{
 };
 
 use datafusion_common::{
-    file_options::StatementOptions, DFSchemaRef, FileType, OwnedTableReference,
+    file_options::StatementOptions, DFSchemaRef, FileType, FileTypeWriterOptions,
+    OwnedTableReference,
 };
 
 use crate::LogicalPlan;
@@ -40,7 +41,47 @@ pub struct CopyTo {
     /// to which each output partition is written to its own output file
     pub single_file_output: bool,
     /// Arbitrary options as tuples
-    pub statement_options: StatementOptions,
+    pub copy_options: CopyOptions,
+}
+
+/// When the logical plan is constructed from SQL, CopyOptions
+/// will contain arbitrary string tuples which must be parsed into
+/// FileTypeWriterOptions. When the logical plan is constructed directly
+/// from rust code (such as via the DataFrame API), FileTypeWriterOptions
+/// can be provided directly, avoiding the run time cost and fallibility of
+/// parsing string based options.
+#[derive(Clone)]
+pub enum CopyOptions {
+    /// Holds StatementOptions parsed from a SQL statement
+    SQLOptions(StatementOptions),
+    /// Holds FileTypeWriterOptions directly provided
+    WriterOptions(Box<FileTypeWriterOptions>),
+}
+
+impl PartialEq for CopyOptions {
+    fn eq(&self, other: &CopyOptions) -> bool {
+        match self {
+            Self::SQLOptions(statement1) => match other {
+                Self::SQLOptions(statement2) => statement1.eq(statement2),
+                Self::WriterOptions(_) => false,
+            },
+            Self::WriterOptions(_) => false,
+        }
+    }
+}
+
+impl Eq for CopyOptions {}
+
+impl std::hash::Hash for CopyOptions {
+    fn hash<H>(&self, hasher: &mut H)
+    where
+        H: std::hash::Hasher,
+    {
+        match self {
+            Self::SQLOptions(statement) => statement.hash(hasher),
+            Self::WriterOptions(_) => (),
+        }
+    }
 }
 
 /// The operator that modifies the content of a database (adapted from
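
To show the intended flow end to end, a sketch (not part of the patch) of building a COPY plan directly with pre-parsed writer options, along the lines of what `DataFrame::write_json` does earlier in this diff; `input_plan` stands in for any `LogicalPlan`:

    use datafusion_common::file_options::json_writer::JsonWriterOptions;
    use datafusion_common::parsers::CompressionTypeVariant;
    use datafusion_common::{FileType, FileTypeWriterOptions, Result};
    use datafusion_expr::dml::CopyOptions;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};

    fn copy_to_json(input_plan: LogicalPlan, output_url: &str) -> Result<LogicalPlan> {
        // Pre-built writer options skip the string parsing needed for SQL statement options
        let writer_options = FileTypeWriterOptions::JSON(JsonWriterOptions::new(
            CompressionTypeVariant::UNCOMPRESSED,
        ));
        LogicalPlanBuilder::copy_to(
            input_plan,
            output_url.into(),
            FileType::JSON,
            true, // single_file_output
            CopyOptions::WriterOptions(Box::new(writer_options)),
        )?
        .build()
    }
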
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index b17db245e6b1..083ee230c785 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -17,6 +17,7 @@
 
 //! Logical plan types
 
+use crate::dml::CopyOptions;
 use crate::expr::{Alias, Exists, InSubquery, Placeholder};
 use crate::expr_rewriter::create_col_from_scalar_expr;
 use crate::expr_vec_fmt;
@@ -1118,15 +1119,18 @@ impl LogicalPlan {
                     output_url,
                     file_format,
                     single_file_output,
-                    statement_options,
+                    copy_options,
                 }) => {
-                    let op_str = statement_options
-                        .clone()
-                        .into_inner()
-                        .iter()
-                        .map(|(k, v)| format!("{k} {v}"))
-                        .collect::<Vec<String>>()
-                        .join(", ");
+                    let op_str = match copy_options {
+                        CopyOptions::SQLOptions(statement) => statement
+                            .clone()
+                            .into_inner()
+                            .iter()
+                            .map(|(k, v)| format!("{k} {v}"))
+                            .collect::<Vec<String>>()
+                            .join(", "),
+                        CopyOptions::WriterOptions(_) => "".into(),
+                    };
 
                     write!(f, "CopyTo: format={file_format} output_url={output_url} single_file_output={single_file_output} options: ({op_str})")
                 }
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index fd6a65b312c6..be48418cb848 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -756,13 +756,13 @@ pub fn from_plan(
             output_url,
             file_format,
             single_file_output,
-            statement_options,
+            copy_options,
         }) => Ok(LogicalPlan::Copy(CopyTo {
             input: Arc::new(inputs[0].clone()),
             output_url: output_url.clone(),
             file_format: file_format.clone(),
             single_file_output: *single_file_output,
-            statement_options: statement_options.clone(),
+            copy_options: copy_options.clone(),
         })),
         LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
             schema: schema.clone(),
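
Worth noting for code that compares plans (such as the `from_plan` round-trip above): with the manual `PartialEq`/`Hash` impls introduced in dml.rs, a `WriterOptions` value never compares equal, not even to a clone of itself, and its contents are ignored when hashing. A small illustration (sketch, not part of the patch):

    use datafusion_common::file_options::json_writer::JsonWriterOptions;
    use datafusion_common::parsers::CompressionTypeVariant;
    use datafusion_common::FileTypeWriterOptions;
    use datafusion_expr::dml::CopyOptions;

    fn writer_options_are_never_equal() {
        let opts = CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::JSON(
            JsonWriterOptions::new(CompressionTypeVariant::UNCOMPRESSED),
        )));
        // Both eq arms return false for WriterOptions
        assert!(opts != opts.clone());
    }
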
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 655442d7e353..0b0c39113468 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -32,7 +32,7 @@ use datafusion_common::{
     DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference, Result,
     SchemaReference, TableReference, ToDFSchema,
 };
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyOptions, CopyTo};
 use datafusion_expr::expr::Placeholder;
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
 use datafusion_expr::logical_plan::builder::project;
@@ -617,12 +617,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // COPY defaults to outputting a single file if not otherwise specified
         let single_file_output = single_file_output.unwrap_or(true);
 
+        let copy_options = CopyOptions::SQLOptions(statement_options);
+
         Ok(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url: statement.target,
             file_format,
             single_file_output,
-            statement_options,
+            copy_options,
         }))
     }
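
Finally, the SQL path still flows through `CopyOptions::SQLOptions`; a sketch of exercising it (the COPY option names follow the existing COPY TO support in this repo, and the table and file names are made up):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    async fn copy_via_sql() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b')").await?;
        // The string options are parsed into FileTypeWriterOptions later, in the physical planner
        ctx.sql("COPY (SELECT * FROM t) TO '/tmp/t.csv' (format csv)")
            .await?
            .collect()
            .await?;
        Ok(())
    }
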