Skip to content

Commit 3a001ad

Browse files
committed
buffer writes to shuffle files and spill files
1 parent ae6b0f3 commit 3a001ad

File tree

1 file changed

+97
-26
lines changed

1 file changed

+97
-26
lines changed

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 97 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
4646
use futures::executor::block_on;
4747
use futures::{StreamExt, TryFutureExt, TryStreamExt};
4848
use itertools::Itertools;
49-
use std::io::Error;
49+
use std::io::{Cursor, Error, SeekFrom};
5050
use std::{
5151
any::Any,
5252
fmt,
@@ -589,15 +589,13 @@ impl ShuffleRepartitioner {
589589
offsets[i] = output_data.stream_position()?;
590590

591591
// Write in memory batches to output data file
592-
let partition_iter = partitioned_batches.produce(i);
593-
for batch in partition_iter {
594-
let batch = batch?;
595-
self.shuffle_block_writer.write_batch(
596-
&batch,
597-
&mut output_data,
598-
&self.metrics.encode_time,
599-
)?;
600-
}
592+
let mut partition_iter = partitioned_batches.produce(i);
593+
Self::shuffle_write_partition(
594+
&mut partition_iter,
595+
&mut self.shuffle_block_writer,
596+
&mut output_data,
597+
&self.metrics.encode_time,
598+
)?;
601599

602600
// if we wrote a spill file for this partition then copy the
603601
// contents into the shuffle file
@@ -634,6 +632,21 @@ impl ShuffleRepartitioner {
634632
))))
635633
}
636634

635+
fn shuffle_write_partition(
636+
partition_iter: &mut BatchIterator,
637+
shuffle_block_writer: &mut ShuffleBlockWriter,
638+
output_data: &mut BufWriter<File>,
639+
encode_time: &Time,
640+
) -> Result<()> {
641+
let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data);
642+
for batch in partition_iter {
643+
let batch = batch?;
644+
buf_batch_writer.write(&batch, encode_time)?;
645+
}
646+
buf_batch_writer.flush()?;
647+
Ok(())
648+
}
649+
637650
fn to_df_err(e: Error) -> DataFusionError {
638651
DataFusionError::Execution(format!("shuffle write error: {:?}", e))
639652
}
@@ -692,11 +705,8 @@ impl ShuffleRepartitioner {
692705

693706
for partition_id in 0..num_output_partitions {
694707
let partition_writer = &mut self.partition_writers[partition_id];
695-
let iter = partitioned_batches.produce(partition_id);
696-
for batch in iter {
697-
let batch = batch?;
698-
spilled_bytes += partition_writer.spill(&batch, &self.runtime, &self.metrics)?;
699-
}
708+
let mut iter = partitioned_batches.produce(partition_id);
709+
spilled_bytes += partition_writer.spill(&mut iter, &self.runtime, &self.metrics)?;
700710
}
701711

702712
let mut timer = self.metrics.mempool_time.timer();
@@ -927,11 +937,37 @@ impl PartitionWriter {
927937

928938
fn spill(
929939
&mut self,
930-
batch: &RecordBatch,
940+
iter: &mut BatchIterator,
931941
runtime: &RuntimeEnv,
932942
metrics: &ShuffleRepartitionerMetrics,
933943
) -> Result<usize> {
934-
let mut write_timer = metrics.write_time.timer();
944+
if let Some(batch) = iter.next() {
945+
let mut write_timer = metrics.write_time.timer();
946+
self.ensure_spill_file_created(runtime)?;
947+
948+
let total_bytes_written = {
949+
let mut buf_batch_writer = BufBatchWriter::new(
950+
&mut self.shuffle_block_writer,
951+
&mut self.spill_file.as_mut().unwrap().file,
952+
);
953+
let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time)?;
954+
for batch in iter {
955+
let batch = batch?;
956+
bytes_written += buf_batch_writer.write(&batch, &metrics.encode_time)?;
957+
}
958+
buf_batch_writer.flush()?;
959+
bytes_written
960+
};
961+
962+
write_timer.stop();
963+
964+
Ok(total_bytes_written)
965+
} else {
966+
Ok(0)
967+
}
968+
}
969+
970+
fn ensure_spill_file_created(&mut self, runtime: &RuntimeEnv) -> Result<()> {
935971
if self.spill_file.is_none() {
936972
// Spill file is not yet created, create it
937973
let spill_file = runtime
@@ -950,20 +986,55 @@ impl PartitionWriter {
950986
file: spill_data,
951987
});
952988
}
989+
Ok(())
990+
}
991+
}
953992

954-
let bytes_written = {
955-
let mut writer = BufWriter::new(&self.spill_file.as_mut().unwrap().file);
956-
let bytes_written =
957-
self.shuffle_block_writer
958-
.write_batch(batch, &mut writer, &metrics.encode_time)?;
959-
writer.flush()?;
960-
bytes_written
961-
};
993+
/// Write batches to writer while using a buffer to avoid frequent system calls.
994+
/// The record batches were first written by ShuffleBlockWriter into an internal buffer.
995+
/// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
996+
struct BufBatchWriter<'a, W: Write> {
997+
shuffle_block_writer: &'a mut ShuffleBlockWriter,
998+
writer: &'a mut W,
999+
buffer: Vec<u8>,
1000+
buffer_max_size: usize,
1001+
}
9621002

963-
write_timer.stop();
1003+
impl<'a, W: Write> BufBatchWriter<'a, W> {
1004+
fn new(shuffle_block_writer: &'a mut ShuffleBlockWriter, writer: &'a mut W) -> Self {
1005+
// 1MB should be good enough to avoid frequent system calls,
1006+
// and also won't cause too much memory usage
1007+
let buffer_max_size = 1024 * 1024;
1008+
Self {
1009+
shuffle_block_writer,
1010+
writer,
1011+
buffer: vec![],
1012+
buffer_max_size,
1013+
}
1014+
}
9641015

1016+
fn write(&mut self, batch: &RecordBatch, encode_time: &Time) -> Result<usize> {
1017+
let mut cursor = Cursor::new(&mut self.buffer);
1018+
cursor.seek(SeekFrom::End(0))?;
1019+
let bytes_written =
1020+
self.shuffle_block_writer
1021+
.write_batch(batch, &mut cursor, encode_time)?;
1022+
let pos = cursor.position();
1023+
if pos > self.buffer_max_size as u64 {
1024+
self.writer.write_all(&self.buffer)?;
1025+
self.buffer.clear();
1026+
}
9651027
Ok(bytes_written)
9661028
}
1029+
1030+
fn flush(&mut self) -> Result<()> {
1031+
if !self.buffer.is_empty() {
1032+
self.writer.write_all(&self.buffer)?;
1033+
self.buffer.clear();
1034+
}
1035+
self.writer.flush()?;
1036+
Ok(())
1037+
}
9671038
}
9681039

9691040
fn pmod(hash: u32, n: usize) -> usize {

0 commit comments

Comments
 (0)