
Commit 3d917a0

Extend insert into support to include Json backed tables (#7212)

* JsonSink and tests implemented
* fix tests and clean up
* clippy
* minor refactor
* comments + append-to-existing-file test checks that no new files are added
* format comments

Co-authored-by: Metehan Yıldırım <[email protected]>

1 parent 99e2cd4 commit 3d917a0
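For orientation, this commit makes JSON-backed listing tables valid targets for INSERT INTO, alongside the existing CSV support. A minimal sketch of how the feature could be exercised, assuming a newline-delimited JSON file; the table name, file path, and values are made up for illustration, and whether inserts are accepted also depends on how the table is registered:

use datafusion::error::Result;
use datafusion::prelude::{NdJsonReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register an existing newline-delimited JSON file as a table
    // ("events" and "events.json" are hypothetical names).
    ctx.register_json("events", "events.json", NdJsonReadOptions::default())
        .await?;

    // With this commit, a JSON-backed table can be the target of
    // INSERT INTO; rows are serialized back to JSON by the new JsonSink.
    ctx.sql("INSERT INTO events VALUES (1, 'login')")
        .await?
        .collect()
        .await?;

    Ok(())
}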

File tree

5 files changed, +518 -138 lines

datafusion/core/src/datasource/file_format/csv.rs
15 additions, 63 deletions
@@ -36,9 +36,9 @@ use bytes::{Buf, Bytes};
 use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;

-use super::FileFormat;
+use super::{stateless_serialize_and_write_files, FileFormat};
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::FileWriterMode;
 use crate::datasource::file_format::{
@@ -274,6 +274,12 @@ impl FileFormat for CsvFormat {
                 "Overwrites are not implemented yet for CSV".into(),
             ));
         }
+
+        if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
+            return Err(DataFusionError::NotImplemented(
+                "Inserting compressed CSV is not implemented yet.".into(),
+            ));
+        }
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(
             conf,
@@ -439,28 +445,6 @@ impl BatchSerializer for CsvSerializer {
     }
 }

-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
-    result: Result<T>,
-    writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
-    match result {
-        Ok(value) => Ok(value),
-        Err(e) => {
-            // Abort all writers before returning the error:
-            for writer in writers {
-                let mut abort_future = writer.abort_writer();
-                if let Ok(abort_future) = &mut abort_future {
-                    let _ = abort_future.await;
-                }
-                // Ignore errors that occur during abortion,
-                // We do try to abort all writers before returning error.
-            }
-            // After aborting writers return original error.
-            Err(e)
-        }
-    }
-}
-
 /// Implements [`DataSink`] for writing to a CSV file.
 struct CsvSink {
     /// Config options for writing data
@@ -566,7 +550,7 @@ impl CsvSink {
 impl DataSink for CsvSink {
     async fn write_all(
         &self,
-        mut data: Vec<SendableRecordBatchStream>,
+        data: Vec<SendableRecordBatchStream>,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
         let num_partitions = data.len();
@@ -576,7 +560,7 @@ impl DataSink for CsvSink {
             .object_store(&self.config.object_store_url)?;

         // Construct serializer and writer for each file group
-        let mut serializers = vec![];
+        let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
         let mut writers = vec![];
         match self.config.writer_mode {
             FileWriterMode::Append => {
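Boxing the serializers as Vec<Box<dyn BatchSerializer>>, rather than a vector of concrete CsvSerializer values, is what lets the shared write helper (called at the end of this file) drive CSV and the new JSON sink through one code path. For reference, the trait being boxed has roughly this shape in this module; the exact signature here is reproduced from memory, so treat it as approximate:

use async_trait::async_trait;
use bytes::Bytes;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

// Approximate shape of the trait implemented by CsvSerializer (and by the
// new JsonSerializer added elsewhere in this commit); not copied from source.
#[async_trait]
pub trait BatchSerializer: Unpin + Send {
    /// Serialize a RecordBatch into bytes in the target file format.
    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
}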
@@ -590,7 +574,7 @@ impl DataSink for CsvSink {
                 let serializer = CsvSerializer::new()
                     .with_builder(builder)
                     .with_header(header);
-                serializers.push(serializer);
+                serializers.push(Box::new(serializer));

                 let file = file_group.clone();
                 let writer = self
608592
))
609593
}
610594
FileWriterMode::PutMultipart => {
611-
//currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
595+
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
612596
let base_path = &self.config.table_paths[0];
613-
//uniquely identify this batch of files with a random string, to prevent collisions overwriting files
597+
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
614598
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
615599
for part_idx in 0..num_partitions {
616600
let header = true;
617601
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
618602
let serializer = CsvSerializer::new()
619603
.with_builder(builder)
620604
.with_header(header);
621-
serializers.push(serializer);
605+
serializers.push(Box::new(serializer));
622606
let file_path = base_path
623607
.prefix()
624608
.child(format!("/{}_{}.csv", write_id, part_idx));
@@ -636,39 +620,7 @@ impl DataSink for CsvSink {
             }
         }

-        let mut row_count = 0;
-        // Map errors to DatafusionError.
-        let err_converter =
-            |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
-        // TODO parallelize serialization accross partitions and batches within partitions
-        // see: https://github.com/apache/arrow-datafusion/issues/7079
-        for idx in 0..num_partitions {
-            while let Some(maybe_batch) = data[idx].next().await {
-                // Write data to files in a round robin fashion:
-                let serializer = &mut serializers[idx];
-                let batch = check_for_errors(maybe_batch, &mut writers).await?;
-                row_count += batch.num_rows();
-                let bytes =
-                    check_for_errors(serializer.serialize(batch).await, &mut writers)
-                        .await?;
-                let writer = &mut writers[idx];
-                check_for_errors(
-                    writer.write_all(&bytes).await.map_err(err_converter),
-                    &mut writers,
-                )
-                .await?;
-            }
-        }
-        // Perform cleanup:
-        let n_writers = writers.len();
-        for idx in 0..n_writers {
-            check_for_errors(
-                writers[idx].shutdown().await.map_err(err_converter),
-                &mut writers,
-            )
-            .await?;
-        }
-        Ok(row_count as u64)
+        stateless_serialize_and_write_files(data, serializers, writers).await
     }
 }
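The round-robin write loop and writer shutdown deleted above now live in the shared stateless_serialize_and_write_files helper imported at the top of the file, so the new JsonSink can reuse the same logic. A rough reconstruction of what that helper must do, inferred from the removed code rather than copied from its actual definition; the writer type and exact signature are assumptions:

use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use tokio::io::{AsyncWrite, AsyncWriteExt};

// Sketch only: reconstructed from the loop removed in this hunk. Assumes it
// lives in the file_format module next to AbortableWrite and the relocated
// check_for_errors helper.
async fn stateless_serialize_and_write_files(
    mut data: Vec<SendableRecordBatchStream>,
    mut serializers: Vec<Box<dyn BatchSerializer>>,
    mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
) -> Result<u64> {
    let err_converter =
        |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
    let mut row_count = 0;
    for idx in 0..data.len() {
        while let Some(maybe_batch) = data[idx].next().await {
            // Serialize each incoming batch and write it to its partition's
            // file, aborting every writer if any step fails.
            let batch = check_for_errors(maybe_batch, &mut writers).await?;
            row_count += batch.num_rows();
            let bytes =
                check_for_errors(serializers[idx].serialize(batch).await, &mut writers)
                    .await?;
            let result = writers[idx].write_all(&bytes).await.map_err(err_converter);
            check_for_errors(result, &mut writers).await?;
        }
    }
    // Flush and finalize every writer before reporting the total row count.
    for idx in 0..writers.len() {
        let result = writers[idx].shutdown().await.map_err(err_converter);
        check_for_errors(result, &mut writers).await?;
    }
    Ok(row_count as u64)
}

Centralizing this in one helper keeps the abort-on-error behavior identical for every format that adopts the DataSink write path.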
