Skip to content

Commit 3892499

Browse files
fmeringdal and alamb
authored
Support REPLACE INTO for INSERT statements (#12516)
* Add support for REPLACE INTO statements. This commit introduces an `InsertOp` enum to replace the boolean `overwrite` flag to provide a more clear and flexible control over how data is inserted. This change updates the following APIs and configs to reflect the change: `TableProvider::insert_into`, `FileSinkConfig` and `DataFrameWriteOptions`. * fix clippy and add license * Update vendored code --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 341ec9a commit 3892499

File tree

23 files changed

+447
-107
lines changed

23 files changed

+447
-107
lines changed

datafusion/catalog/src/table.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ use arrow_schema::SchemaRef;
2525
use async_trait::async_trait;
2626
use datafusion_common::Result;
2727
use datafusion_common::{not_impl_err, Constraints, Statistics};
28+
use datafusion_expr::dml::InsertOp;
2829
use datafusion_expr::{
2930
CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
3031
};
@@ -274,7 +275,7 @@ pub trait TableProvider: Debug + Sync + Send {
274275
&self,
275276
_state: &dyn Session,
276277
_input: Arc<dyn ExecutionPlan>,
277-
_overwrite: bool,
278+
_insert_op: InsertOp,
278279
) -> Result<Arc<dyn ExecutionPlan>> {
279280
not_impl_err!("Insert into not implemented for this table")
280281
}

datafusion/core/src/dataframe/mod.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
5252
use datafusion_common::{
5353
plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
5454
};
55+
use datafusion_expr::dml::InsertOp;
5556
use datafusion_expr::{case, is_null, lit, SortExpr};
5657
use datafusion_expr::{
5758
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
@@ -66,8 +67,9 @@ use datafusion_catalog::Session;
6667
/// Contains options that control how data is
6768
/// written out from a DataFrame
6869
pub struct DataFrameWriteOptions {
69-
/// Controls if existing data should be overwritten
70-
overwrite: bool,
70+
/// Controls how new data should be written to the table, determining whether
71+
/// to append, overwrite, or replace existing data.
72+
insert_op: InsertOp,
7173
/// Controls if all partitions should be coalesced into a single output file
7274
/// Generally will have slower performance when set to true.
7375
single_file_output: bool,
@@ -80,14 +82,15 @@ impl DataFrameWriteOptions {
8082
/// Create a new DataFrameWriteOptions with default values
8183
pub fn new() -> Self {
8284
DataFrameWriteOptions {
83-
overwrite: false,
85+
insert_op: InsertOp::Append,
8486
single_file_output: false,
8587
partition_by: vec![],
8688
}
8789
}
88-
/// Set the overwrite option to true or false
89-
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
90-
self.overwrite = overwrite;
90+
91+
/// Set the insert operation
92+
pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
93+
self.insert_op = insert_op;
9194
self
9295
}
9396

@@ -1525,7 +1528,7 @@ impl DataFrame {
15251528
self.plan,
15261529
table_name.to_owned(),
15271530
&arrow_schema,
1528-
write_options.overwrite,
1531+
write_options.insert_op,
15291532
)?
15301533
.build()?;
15311534

@@ -1566,10 +1569,11 @@ impl DataFrame {
15661569
options: DataFrameWriteOptions,
15671570
writer_options: Option<CsvOptions>,
15681571
) -> Result<Vec<RecordBatch>, DataFusionError> {
1569-
if options.overwrite {
1570-
return Err(DataFusionError::NotImplemented(
1571-
"Overwrites are not implemented for DataFrame::write_csv.".to_owned(),
1572-
));
1572+
if options.insert_op != InsertOp::Append {
1573+
return Err(DataFusionError::NotImplemented(format!(
1574+
"{} is not implemented for DataFrame::write_csv.",
1575+
options.insert_op
1576+
)));
15731577
}
15741578

15751579
let format = if let Some(csv_opts) = writer_options {
@@ -1626,10 +1630,11 @@ impl DataFrame {
16261630
options: DataFrameWriteOptions,
16271631
writer_options: Option<JsonOptions>,
16281632
) -> Result<Vec<RecordBatch>, DataFusionError> {
1629-
if options.overwrite {
1630-
return Err(DataFusionError::NotImplemented(
1631-
"Overwrites are not implemented for DataFrame::write_json.".to_owned(),
1632-
));
1633+
if options.insert_op != InsertOp::Append {
1634+
return Err(DataFusionError::NotImplemented(format!(
1635+
"{} is not implemented for DataFrame::write_json.",
1636+
options.insert_op
1637+
)));
16331638
}
16341639

16351640
let format = if let Some(json_opts) = writer_options {

datafusion/core/src/dataframe/parquet.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use super::{
2626
};
2727

2828
use datafusion_common::config::TableParquetOptions;
29+
use datafusion_expr::dml::InsertOp;
2930

3031
impl DataFrame {
3132
/// Execute the `DataFrame` and write the results to Parquet file(s).
@@ -57,10 +58,11 @@ impl DataFrame {
5758
options: DataFrameWriteOptions,
5859
writer_options: Option<TableParquetOptions>,
5960
) -> Result<Vec<RecordBatch>, DataFusionError> {
60-
if options.overwrite {
61-
return Err(DataFusionError::NotImplemented(
62-
"Overwrites are not implemented for DataFrame::write_parquet.".to_owned(),
63-
));
61+
if options.insert_op != InsertOp::Append {
62+
return Err(DataFusionError::NotImplemented(format!(
63+
"{} is not implemented for DataFrame::write_parquet.",
64+
options.insert_op
65+
)));
6466
}
6567

6668
let format = if let Some(parquet_opts) = writer_options {

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use datafusion_common::{
4747
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
4848
};
4949
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
50+
use datafusion_expr::dml::InsertOp;
5051
use datafusion_physical_expr::PhysicalExpr;
5152
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
5253
use datafusion_physical_plan::metrics::MetricsSet;
@@ -181,7 +182,7 @@ impl FileFormat for ArrowFormat {
181182
conf: FileSinkConfig,
182183
order_requirements: Option<LexRequirement>,
183184
) -> Result<Arc<dyn ExecutionPlan>> {
184-
if conf.overwrite {
185+
if conf.insert_op != InsertOp::Append {
185186
return not_impl_err!("Overwrites are not implemented yet for Arrow format");
186187
}
187188

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use datafusion_common::{
4646
exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
4747
};
4848
use datafusion_execution::TaskContext;
49+
use datafusion_expr::dml::InsertOp;
4950
use datafusion_physical_expr::PhysicalExpr;
5051
use datafusion_physical_plan::metrics::MetricsSet;
5152

@@ -382,7 +383,7 @@ impl FileFormat for CsvFormat {
382383
conf: FileSinkConfig,
383384
order_requirements: Option<LexRequirement>,
384385
) -> Result<Arc<dyn ExecutionPlan>> {
385-
if conf.overwrite {
386+
if conf.insert_op != InsertOp::Append {
386387
return not_impl_err!("Overwrites are not implemented yet for CSV");
387388
}
388389

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
4646
use datafusion_common::file_options::json_writer::JsonWriterOptions;
4747
use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
4848
use datafusion_execution::TaskContext;
49+
use datafusion_expr::dml::InsertOp;
4950
use datafusion_physical_expr::PhysicalExpr;
5051
use datafusion_physical_plan::metrics::MetricsSet;
5152
use datafusion_physical_plan::ExecutionPlan;
@@ -252,7 +253,7 @@ impl FileFormat for JsonFormat {
252253
conf: FileSinkConfig,
253254
order_requirements: Option<LexRequirement>,
254255
) -> Result<Arc<dyn ExecutionPlan>> {
255-
if conf.overwrite {
256+
if conf.insert_op != InsertOp::Append {
256257
return not_impl_err!("Overwrites are not implemented yet for Json");
257258
}
258259

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use datafusion_common::{
5353
use datafusion_common_runtime::SpawnedTask;
5454
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
5555
use datafusion_execution::TaskContext;
56+
use datafusion_expr::dml::InsertOp;
5657
use datafusion_expr::Expr;
5758
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
5859
use datafusion_physical_expr::PhysicalExpr;
@@ -403,7 +404,7 @@ impl FileFormat for ParquetFormat {
403404
conf: FileSinkConfig,
404405
order_requirements: Option<LexRequirement>,
405406
) -> Result<Arc<dyn ExecutionPlan>> {
406-
if conf.overwrite {
407+
if conf.insert_op != InsertOp::Append {
407408
return not_impl_err!("Overwrites are not implemented yet for Parquet");
408409
}
409410

@@ -2269,7 +2270,7 @@ mod tests {
22692270
table_paths: vec![ListingTableUrl::parse("file:///")?],
22702271
output_schema: schema.clone(),
22712272
table_partition_cols: vec![],
2272-
overwrite: true,
2273+
insert_op: InsertOp::Overwrite,
22732274
keep_partition_by_columns: false,
22742275
};
22752276
let parquet_sink = Arc::new(ParquetSink::new(
@@ -2364,7 +2365,7 @@ mod tests {
23642365
table_paths: vec![ListingTableUrl::parse("file:///")?],
23652366
output_schema: schema.clone(),
23662367
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
2367-
overwrite: true,
2368+
insert_op: InsertOp::Overwrite,
23682369
keep_partition_by_columns: false,
23692370
};
23702371
let parquet_sink = Arc::new(ParquetSink::new(
@@ -2447,7 +2448,7 @@ mod tests {
24472448
table_paths: vec![ListingTableUrl::parse("file:///")?],
24482449
output_schema: schema.clone(),
24492450
table_partition_cols: vec![],
2450-
overwrite: true,
2451+
insert_op: InsertOp::Overwrite,
24512452
keep_partition_by_columns: false,
24522453
};
24532454
let parquet_sink = Arc::new(ParquetSink::new(

datafusion/core/src/datasource/listing/table.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::datasource::{
3434
use crate::execution::context::SessionState;
3535
use datafusion_catalog::TableProvider;
3636
use datafusion_common::{DataFusionError, Result};
37+
use datafusion_expr::dml::InsertOp;
3738
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
3839
use datafusion_expr::{SortExpr, TableType};
3940
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
@@ -916,7 +917,7 @@ impl TableProvider for ListingTable {
916917
&self,
917918
state: &dyn Session,
918919
input: Arc<dyn ExecutionPlan>,
919-
overwrite: bool,
920+
insert_op: InsertOp,
920921
) -> Result<Arc<dyn ExecutionPlan>> {
921922
// Check that the schema of the plan matches the schema of this table.
922923
if !self
@@ -975,7 +976,7 @@ impl TableProvider for ListingTable {
975976
file_groups,
976977
output_schema: self.schema(),
977978
table_partition_cols: self.options.table_partition_cols.clone(),
978-
overwrite,
979+
insert_op,
979980
keep_partition_by_columns,
980981
};
981982

@@ -1990,7 +1991,8 @@ mod tests {
19901991
// Therefore, we will have 8 partitions in the final plan.
19911992
// Create an insert plan to insert the source data into the initial table
19921993
let insert_into_table =
1993-
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
1994+
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
1995+
.build()?;
19941996
// Create a physical plan from the insert plan
19951997
let plan = session_ctx
19961998
.state()

datafusion/core/src/datasource/memory.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use arrow::datatypes::SchemaRef;
3939
use arrow::record_batch::RecordBatch;
4040
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
4141
use datafusion_execution::TaskContext;
42+
use datafusion_expr::dml::InsertOp;
4243
use datafusion_physical_plan::metrics::MetricsSet;
4344

4445
use async_trait::async_trait;
@@ -262,7 +263,7 @@ impl TableProvider for MemTable {
262263
&self,
263264
_state: &dyn Session,
264265
input: Arc<dyn ExecutionPlan>,
265-
overwrite: bool,
266+
insert_op: InsertOp,
266267
) -> Result<Arc<dyn ExecutionPlan>> {
267268
// If we are inserting into the table, any sort order may be messed up so reset it here
268269
*self.sort_order.lock() = vec![];
@@ -289,8 +290,8 @@ impl TableProvider for MemTable {
289290
.collect::<Vec<_>>()
290291
);
291292
}
292-
if overwrite {
293-
return not_impl_err!("Overwrite not implemented for MemoryTable yet");
293+
if insert_op != InsertOp::Append {
294+
return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
294295
}
295296
let sink = Arc::new(MemSink::new(self.batches.clone()));
296297
Ok(Arc::new(DataSinkExec::new(
@@ -638,7 +639,8 @@ mod tests {
638639
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
639640
// Create an insert plan to insert the source data into the initial table
640641
let insert_into_table =
641-
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
642+
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
643+
.build()?;
642644
// Create a physical plan from the insert plan
643645
let plan = session_ctx
644646
.state()

datafusion/core/src/datasource/physical_plan/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactor
3636
pub use arrow_file::ArrowExec;
3737
pub use avro::AvroExec;
3838
pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
39+
use datafusion_expr::dml::InsertOp;
3940
pub use file_groups::FileGroupPartitioner;
4041
pub use file_scan_config::{
4142
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
@@ -83,8 +84,9 @@ pub struct FileSinkConfig {
8384
/// A vector of column names and their corresponding data types,
8485
/// representing the partitioning columns for the file
8586
pub table_partition_cols: Vec<(String, DataType)>,
86-
/// Controls whether existing data should be overwritten by this sink
87-
pub overwrite: bool,
87+
/// Controls how new data should be written to the file, determining whether
88+
/// to append to, overwrite, or replace records in existing files.
89+
pub insert_op: InsertOp,
8890
/// Controls whether partition columns are kept for the file
8991
pub keep_partition_by_columns: bool,
9092
}

datafusion/core/src/datasource/stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use arrow_schema::SchemaRef;
3333
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
3434
use datafusion_common_runtime::SpawnedTask;
3535
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
36+
use datafusion_expr::dml::InsertOp;
3637
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
3738
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
3839
use datafusion_physical_plan::metrics::MetricsSet;
@@ -350,7 +351,7 @@ impl TableProvider for StreamTable {
350351
&self,
351352
_state: &dyn Session,
352353
input: Arc<dyn ExecutionPlan>,
353-
_overwrite: bool,
354+
_insert_op: InsertOp,
354355
) -> Result<Arc<dyn ExecutionPlan>> {
355356
let ordering = match self.0.order.first() {
356357
Some(x) => {

datafusion/core/src/physical_planner.rs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ use datafusion_common::{
7171
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
7272
ScalarValue,
7373
};
74-
use datafusion_expr::dml::CopyTo;
74+
use datafusion_expr::dml::{CopyTo, InsertOp};
7575
use datafusion_expr::expr::{
7676
physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
7777
};
@@ -529,7 +529,7 @@ impl DefaultPhysicalPlanner {
529529
file_groups: vec![],
530530
output_schema: Arc::new(schema),
531531
table_partition_cols,
532-
overwrite: false,
532+
insert_op: InsertOp::Append,
533533
keep_partition_by_columns,
534534
};
535535

@@ -542,31 +542,15 @@ impl DefaultPhysicalPlanner {
542542
}
543543
LogicalPlan::Dml(DmlStatement {
544544
table_name,
545-
op: WriteOp::InsertInto,
545+
op: WriteOp::Insert(insert_op),
546546
..
547547
}) => {
548548
let name = table_name.table();
549549
let schema = session_state.schema_for_ref(table_name.clone())?;
550550
if let Some(provider) = schema.table(name).await? {
551551
let input_exec = children.one()?;
552552
provider
553-
.insert_into(session_state, input_exec, false)
554-
.await?
555-
} else {
556-
return exec_err!("Table '{table_name}' does not exist");
557-
}
558-
}
559-
LogicalPlan::Dml(DmlStatement {
560-
table_name,
561-
op: WriteOp::InsertOverwrite,
562-
..
563-
}) => {
564-
let name = table_name.table();
565-
let schema = session_state.schema_for_ref(table_name.clone())?;
566-
if let Some(provider) = schema.table(name).await? {
567-
let input_exec = children.one()?;
568-
provider
569-
.insert_into(session_state, input_exec, true)
553+
.insert_into(session_state, input_exec, *insert_op)
570554
.await?
571555
} else {
572556
return exec_err!("Table '{table_name}' does not exist");

0 commit comments

Comments
 (0)