
Commit 1d7e8bd

POC: Remove ListingTable Append Support (apache#7994)
1 parent 656c6a9 commit 1d7e8bd

20 files changed: +94, -792 lines

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 5 additions & 69 deletions
@@ -34,10 +34,10 @@ use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
 use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::physical_plan::{
     CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
 };
@@ -465,11 +465,7 @@ impl DisplayAs for CsvSink {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "CsvSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "CsvSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -481,55 +477,6 @@ impl CsvSink {
     fn new(config: FileSinkConfig) -> Self {
         Self { config }
     }
-
-    async fn append_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        if !self.config.table_partition_cols.is_empty() {
-            return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
-        }
-        let writer_options = self.config.file_type_writer_options.try_into_csv()?;
-        let (builder, compression) =
-            (&writer_options.writer_options, &writer_options.compression);
-        let compression = FileCompressionType::from(*compression);
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.config.object_store_url)?;
-        let file_groups = &self.config.file_groups;
-
-        let builder_clone = builder.clone();
-        let options_clone = writer_options.clone();
-        let get_serializer = move |file_size| {
-            let inner_clone = builder_clone.clone();
-            // In append mode, consider has_header flag only when file is empty (at the start).
-            // For other modes, use has_header flag as is.
-            let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
-                CsvSerializer::new()
-                    .with_builder(inner_clone)
-                    .with_header(false)
-            } else {
-                CsvSerializer::new()
-                    .with_builder(inner_clone)
-                    .with_header(options_clone.writer_options.header())
-            });
-            serializer
-        };
-
-        stateless_append_all(
-            data,
-            context,
-            object_store,
-            file_groups,
-            self.config.unbounded_input,
-            compression,
-            Box::new(get_serializer),
-        )
-        .await
-    }
-
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
@@ -577,19 +524,8 @@ impl DataSink for CsvSink {
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                let total_count = self.append_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::PutMultipart => {
-                let total_count = self.multipartput_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::Put => {
-                return not_impl_err!("FileWriterMode::Put is not supported yet!")
-            }
-        }
+        let total_count = self.multipartput_all(data, context).await?;
+        Ok(total_count)
     }
 }
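
The net effect in csv.rs: DataSink::write_all for CsvSink no longer branches on FileWriterMode. A minimal sketch of the resulting method, assembled from the hunks above (surrounding impl block and imports as in the file):

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        // Every insert now takes the multipart-put path; the append and
        // single-Put branches are gone along with FileWriterMode itself.
        let total_count = self.multipartput_all(data, context).await?;
        Ok(total_count)
    }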

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 5 additions & 54 deletions
@@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec;
 use crate::physical_plan::SendableRecordBatchStream;
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
 
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
 use crate::error::Result;
@@ -245,11 +245,7 @@ impl DisplayAs for JsonSink {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "JsonSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "JsonSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -262,40 +258,6 @@ impl JsonSink {
         Self { config }
     }
 
-    async fn append_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        if !self.config.table_partition_cols.is_empty() {
-            return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
-        }
-
-        let writer_options = self.config.file_type_writer_options.try_into_json()?;
-        let compression = &writer_options.compression;
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.config.object_store_url)?;
-        let file_groups = &self.config.file_groups;
-
-        let get_serializer = move |_| {
-            let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
-            serializer
-        };
-
-        stateless_append_all(
-            data,
-            context,
-            object_store,
-            file_groups,
-            self.config.unbounded_input,
-            (*compression).into(),
-            Box::new(get_serializer),
-        )
-        .await
-    }
-
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
@@ -336,19 +298,8 @@ impl DataSink for JsonSink {
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                let total_count = self.append_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::PutMultipart => {
-                let total_count = self.multipartput_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::Put => {
-                return not_impl_err!("FileWriterMode::Put is not supported yet!")
-            }
-        }
+        let total_count = self.multipartput_all(data, context).await?;
+        Ok(total_count)
     }
 }
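
JsonSink makes the same simplification, and both sinks drop writer_mode from their Display output, which changes how they render in EXPLAIN plans. An illustrative before/after (the PutMultipart value and the file-group contents are assumed, not taken from this diff):

    before: JsonSink(writer_mode=PutMultipart, file_groups=[...])
    after:  JsonSink(file_groups=[...])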

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 1 addition & 31 deletions
@@ -28,7 +28,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType;
 #[cfg(feature = "parquet")]
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
-use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
+use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::{
     file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
     listing::ListingOptions,
@@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a> {
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for CsvReadOptions<'a> {
@@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a> {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }
 
@@ -184,12 +181,6 @@ impl<'a> CsvReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 /// Options that control the reading of Parquet files.
@@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a> {
     pub schema: Option<&'a Schema>,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {
@@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> {
             skip_metadata: None,
             schema: None,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendNewFiles,
         }
     }
 }
@@ -272,12 +260,6 @@ impl<'a> ParquetReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 /// Options that control the reading of ARROW files.
@@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a> {
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for NdJsonReadOptions<'a> {
@@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }
 }
@@ -464,12 +443,6 @@ impl<'a> NdJsonReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 #[async_trait]
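
For callers, the migration is mechanical: drop the insert_mode builder call. A hedged sketch of the change at a call site, assuming a typical builder chain (insert_mode, ListingTableInsertMode, and file_sort_order appear in the hunks above; the exact chain is illustrative):

    // Before this commit:
    // let options = CsvReadOptions::new()
    //     .file_sort_order(vec![])
    //     .insert_mode(ListingTableInsertMode::AppendToFile);

    // After: there is no insert-mode knob left to set here.
    let options = CsvReadOptions::new().file_sort_order(vec![]);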
@@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_file_sort_order(self.file_sort_order.clone())
             .with_infinite_source(self.infinite)
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
@@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
             .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_file_sort_order(self.file_sort_order.clone())
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
@@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_infinite_source(self.infinite)
             .with_file_sort_order(self.file_sort_order.clone())
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
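
Assembled from the three ReadOptions hunks above: each to_listing_options implementation now ends without the insert-mode step. A sketch of the resulting CSV implementation (the ListingOptions::new line and the format wiring are assumptions; the with_* calls appear in the hunks above):

    fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {
        let file_format = CsvFormat::default(); // per-field option wiring elided
        ListingOptions::new(Arc::new(file_format))
            .with_file_extension(self.file_extension) // assumed
            .with_target_partitions(config.target_partitions())
            .with_table_partition_cols(self.table_partition_cols.clone())
            .with_file_sort_order(self.file_sort_order.clone())
            .with_infinite_source(self.infinite)
    }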
