Commit 76624a6

Remove ListingTable Append Support (apache#7994)

1 parent 020b8fc commit 76624a6

File tree

18 files changed: +67 -989 lines changed

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 5 additions & 69 deletions
@@ -34,10 +34,10 @@ use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
 use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::physical_plan::{
     CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
 };
@@ -465,11 +465,7 @@ impl DisplayAs for CsvSink
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "CsvSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "CsvSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -481,55 +477,6 @@ impl CsvSink
     fn new(config: FileSinkConfig) -> Self {
         Self { config }
     }
-
-    async fn append_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        if !self.config.table_partition_cols.is_empty() {
-            return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
-        }
-        let writer_options = self.config.file_type_writer_options.try_into_csv()?;
-        let (builder, compression) =
-            (&writer_options.writer_options, &writer_options.compression);
-        let compression = FileCompressionType::from(*compression);
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.config.object_store_url)?;
-        let file_groups = &self.config.file_groups;
-
-        let builder_clone = builder.clone();
-        let options_clone = writer_options.clone();
-        let get_serializer = move |file_size| {
-            let inner_clone = builder_clone.clone();
-            // In append mode, consider has_header flag only when file is empty (at the start).
-            // For other modes, use has_header flag as is.
-            let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
-                CsvSerializer::new()
-                    .with_builder(inner_clone)
-                    .with_header(false)
-            } else {
-                CsvSerializer::new()
-                    .with_builder(inner_clone)
-                    .with_header(options_clone.writer_options.header())
-            });
-            serializer
-        };
-
-        stateless_append_all(
-            data,
-            context,
-            object_store,
-            file_groups,
-            self.config.unbounded_input,
-            compression,
-            Box::new(get_serializer),
-        )
-        .await
-    }
-
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
@@ -577,19 +524,8 @@ impl DataSink for CsvSink
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                let total_count = self.append_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::PutMultipart => {
-                let total_count = self.multipartput_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::Put => {
-                return not_impl_err!("FileWriterMode::Put is not supported yet!")
-            }
-        }
+        let total_count = self.multipartput_all(data, context).await?;
+        Ok(total_count)
     }
 }
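For readability, here is the CsvSink::write_all that remains after this change, assembled from the post-image of the hunks above. The fn write_all opening line is assumed from the surrounding impl DataSink for CsvSink context; the body is the surviving lines verbatim.

    // Sketch of the surviving write path: the FileWriterMode match
    // (Append / PutMultipart / Put) is gone, and every insert goes
    // through the multipart-put path.
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let total_count = self.multipartput_all(data, context).await?;
        Ok(total_count)
    }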

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 5 additions & 54 deletions
@@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec;
 use crate::physical_plan::SendableRecordBatchStream;
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
 
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
 use crate::error::Result;
@@ -245,11 +245,7 @@ impl DisplayAs for JsonSink
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "JsonSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "JsonSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -268,40 +264,6 @@ impl JsonSink
         &self.config
     }
 
-    async fn append_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        if !self.config.table_partition_cols.is_empty() {
-            return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
-        }
-
-        let writer_options = self.config.file_type_writer_options.try_into_json()?;
-        let compression = &writer_options.compression;
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.config.object_store_url)?;
-        let file_groups = &self.config.file_groups;
-
-        let get_serializer = move |_| {
-            let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
-            serializer
-        };
-
-        stateless_append_all(
-            data,
-            context,
-            object_store,
-            file_groups,
-            self.config.unbounded_input,
-            (*compression).into(),
-            Box::new(get_serializer),
-        )
-        .await
-    }
-
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
@@ -342,19 +304,8 @@ impl DataSink for JsonSink
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                let total_count = self.append_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::PutMultipart => {
-                let total_count = self.multipartput_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::Put => {
-                return not_impl_err!("FileWriterMode::Put is not supported yet!")
-            }
-        }
+        let total_count = self.multipartput_all(data, context).await?;
+        Ok(total_count)
     }
 }
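A note on the DisplayAs hunks in csv.rs and json.rs: plan output (e.g. from EXPLAIN) no longer reports a writer mode. A hedged illustration of the rendered text, not a captured plan; the bracketed list stands in for whatever FileGroupDisplay prints:

    before: JsonSink(writer_mode=PutMultipart, file_groups=[...])
    after:  JsonSink(file_groups=[...])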

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 1 addition & 31 deletions
@@ -28,7 +28,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType;
 #[cfg(feature = "parquet")]
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
-use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
+use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::{
     file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
     listing::ListingOptions,
@@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a>
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for CsvReadOptions<'a> {
@@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a>
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }
 
@@ -184,12 +181,6 @@
         self.file_sort_order = file_sort_order;
         self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 /// Options that control the reading of Parquet files.
@@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a>
     pub schema: Option<&'a Schema>,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {
@@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> {
             skip_metadata: None,
             schema: None,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendNewFiles,
         }
     }
 }
@@ -272,12 +260,6 @@
         self.file_sort_order = file_sort_order;
        self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 /// Options that control the reading of ARROW files.
@@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a>
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for NdJsonReadOptions<'a> {
@@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }
 }
@@ -464,12 +443,6 @@
         self.file_sort_order = file_sort_order;
         self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 #[async_trait]
@@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_>
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_file_sort_order(self.file_sort_order.clone())
             .with_infinite_source(self.infinite)
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
@@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_>
             .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_file_sort_order(self.file_sort_order.clone())
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
@@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_>
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_infinite_source(self.infinite)
             .with_file_sort_order(self.file_sort_order.clone())
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
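To make the options.rs changes concrete, a minimal usage sketch. This is a hypothetical snippet: has_header is a pre-existing CsvReadOptions builder that does not appear in this diff, and the import paths are assumed from the crate layout shown above.

    use datafusion::prelude::CsvReadOptions;

    // Still compiles after this commit:
    let opts = CsvReadOptions::new()
        .has_header(true)
        .file_sort_order(vec![]);

    // Removed by this commit; these lines no longer compile:
    // use datafusion::datasource::listing::ListingTableInsertMode;
    // let opts = CsvReadOptions::new()
    //     .insert_mode(ListingTableInsertMode::AppendToFile);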
