From f66924d7e4dee28eda485ac11c7a4d8add5e30be Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Tue, 11 Mar 2025 21:51:02 +0800 Subject: [PATCH 1/9] Reimplement ShuffleWriterExec using interleave_record_batch --- native/core/src/execution/planner.rs | 2 +- native/core/src/execution/shuffle/builders.rs | 599 -------------- native/core/src/execution/shuffle/codec.rs | 4 +- native/core/src/execution/shuffle/mod.rs | 1 - .../src/execution/shuffle/shuffle_writer.rs | 732 ++++++++---------- 5 files changed, 335 insertions(+), 1003 deletions(-) delete mode 100644 native/core/src/execution/shuffle/builders.rs diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 60803dfeb8..9496acce38 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1214,7 +1214,7 @@ impl PhysicalPlanner { }?; let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( - Arc::clone(&child.native_plan), + Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), partitioning, codec, writer.output_data_file.clone(), diff --git a/native/core/src/execution/shuffle/builders.rs b/native/core/src/execution/shuffle/builders.rs deleted file mode 100644 index 75a0068f4b..0000000000 --- a/native/core/src/execution/shuffle/builders.rs +++ /dev/null @@ -1,599 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::builder::{make_builder, ArrayBuilder}; -use std::sync::Arc; - -use crate::common::bit::ceil; -use arrow::datatypes::*; -use datafusion::arrow::{ - array::*, - datatypes::{DataType, SchemaRef, TimeUnit}, - error::Result as ArrowResult, - record_batch::RecordBatch, -}; - -pub(crate) fn new_array_builders( - schema: &SchemaRef, - batch_size: usize, -) -> Vec> { - schema - .fields() - .iter() - .map(|field| { - let dt = field.data_type(); - if matches!(dt, DataType::Dictionary(_, _)) { - make_dict_builder(dt, batch_size) - } else { - make_builder(dt, batch_size) - } - }) - .collect::>() -} - -macro_rules! primitive_dict_builder_inner_helper { - ($kt:ty, $vt:ty, $capacity:ident) => { - Box::new(PrimitiveDictionaryBuilder::<$kt, $vt>::with_capacity( - $capacity, - $capacity / 100, - )) - }; -} - -macro_rules! primitive_dict_builder_helper { - ($kt:ty, $vt:ident, $capacity:ident) => { - match $vt.as_ref() { - DataType::Int8 => { - primitive_dict_builder_inner_helper!($kt, Int8Type, $capacity) - } - DataType::Int16 => { - primitive_dict_builder_inner_helper!($kt, Int16Type, $capacity) - } - DataType::Int32 => { - primitive_dict_builder_inner_helper!($kt, Int32Type, $capacity) - } - DataType::Int64 => { - primitive_dict_builder_inner_helper!($kt, Int64Type, $capacity) - } - DataType::UInt8 => { - primitive_dict_builder_inner_helper!($kt, UInt8Type, $capacity) - } - DataType::UInt16 => { - primitive_dict_builder_inner_helper!($kt, UInt16Type, $capacity) - } - DataType::UInt32 => { - primitive_dict_builder_inner_helper!($kt, UInt32Type, $capacity) - } - DataType::UInt64 => { - primitive_dict_builder_inner_helper!($kt, UInt64Type, $capacity) - } - DataType::Float32 => { - primitive_dict_builder_inner_helper!($kt, Float32Type, $capacity) - } - DataType::Float64 => { - primitive_dict_builder_inner_helper!($kt, Float64Type, $capacity) - } - DataType::Decimal128(p, s) => { - let keys_builder = PrimitiveBuilder::<$kt>::new(); - let values_builder = - Decimal128Builder::new().with_data_type(DataType::Decimal128(*p, *s)); - Box::new( - PrimitiveDictionaryBuilder::<$kt, Decimal128Type>::new_from_empty_builders( - keys_builder, - values_builder, - ), - ) - } - DataType::Timestamp(TimeUnit::Microsecond, timezone) => { - let keys_builder = PrimitiveBuilder::<$kt>::new(); - let values_builder = TimestampMicrosecondBuilder::new() - .with_data_type(DataType::Timestamp(TimeUnit::Microsecond, timezone.clone())); - Box::new( - PrimitiveDictionaryBuilder::<$kt, TimestampMicrosecondType>::new_from_empty_builders( - keys_builder, - values_builder, - ), - ) - } - DataType::Date32 => { - primitive_dict_builder_inner_helper!($kt, Date32Type, $capacity) - } - DataType::Date64 => { - primitive_dict_builder_inner_helper!($kt, Date64Type, $capacity) - } - t => unimplemented!("{:?} is not supported", t), - } - }; -} - -macro_rules! byte_dict_builder_inner_helper { - ($kt:ty, $capacity:ident, $builder:ident) => { - Box::new($builder::<$kt>::with_capacity( - $capacity, - $capacity / 100, - $capacity, - )) - }; -} - -/// Returns a dictionary array builder with capacity `capacity` that corresponds to the datatype -/// `DataType` This function is useful to construct arrays from an arbitrary vectors with -/// known/expected schema. -/// TODO: move this to the upstream. -fn make_dict_builder(datatype: &DataType, capacity: usize) -> Box { - match datatype { - DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => { - match key_type.as_ref() { - DataType::Int8 => primitive_dict_builder_helper!(Int8Type, value_type, capacity), - DataType::Int16 => primitive_dict_builder_helper!(Int16Type, value_type, capacity), - DataType::Int32 => primitive_dict_builder_helper!(Int32Type, value_type, capacity), - DataType::Int64 => primitive_dict_builder_helper!(Int64Type, value_type, capacity), - DataType::UInt8 => primitive_dict_builder_helper!(UInt8Type, value_type, capacity), - DataType::UInt16 => { - primitive_dict_builder_helper!(UInt16Type, value_type, capacity) - } - DataType::UInt32 => { - primitive_dict_builder_helper!(UInt32Type, value_type, capacity) - } - DataType::UInt64 => { - primitive_dict_builder_helper!(UInt64Type, value_type, capacity) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Utf8) => - { - match key_type.as_ref() { - DataType::Int8 => { - byte_dict_builder_inner_helper!(Int8Type, capacity, StringDictionaryBuilder) - } - DataType::Int16 => { - byte_dict_builder_inner_helper!(Int16Type, capacity, StringDictionaryBuilder) - } - DataType::Int32 => { - byte_dict_builder_inner_helper!(Int32Type, capacity, StringDictionaryBuilder) - } - DataType::Int64 => { - byte_dict_builder_inner_helper!(Int64Type, capacity, StringDictionaryBuilder) - } - DataType::UInt8 => { - byte_dict_builder_inner_helper!(UInt8Type, capacity, StringDictionaryBuilder) - } - DataType::UInt16 => { - byte_dict_builder_inner_helper!(UInt16Type, capacity, StringDictionaryBuilder) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!(UInt32Type, capacity, StringDictionaryBuilder) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!(UInt64Type, capacity, StringDictionaryBuilder) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeUtf8) => - { - match key_type.as_ref() { - DataType::Int8 => byte_dict_builder_inner_helper!( - Int8Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int16 => byte_dict_builder_inner_helper!( - Int16Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int32 => byte_dict_builder_inner_helper!( - Int32Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int64 => byte_dict_builder_inner_helper!( - Int64Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::UInt8 => byte_dict_builder_inner_helper!( - UInt8Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::UInt16 => { - byte_dict_builder_inner_helper!( - UInt16Type, - capacity, - LargeStringDictionaryBuilder - ) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!( - UInt32Type, - capacity, - LargeStringDictionaryBuilder - ) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!( - UInt64Type, - capacity, - LargeStringDictionaryBuilder - ) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Binary) => - { - match key_type.as_ref() { - DataType::Int8 => { - byte_dict_builder_inner_helper!(Int8Type, capacity, BinaryDictionaryBuilder) - } - DataType::Int16 => { - byte_dict_builder_inner_helper!(Int16Type, capacity, BinaryDictionaryBuilder) - } - DataType::Int32 => { - byte_dict_builder_inner_helper!(Int32Type, capacity, BinaryDictionaryBuilder) - } - DataType::Int64 => { - byte_dict_builder_inner_helper!(Int64Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt8 => { - byte_dict_builder_inner_helper!(UInt8Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt16 => { - byte_dict_builder_inner_helper!(UInt16Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!(UInt32Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!(UInt64Type, capacity, BinaryDictionaryBuilder) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeBinary) => - { - match key_type.as_ref() { - DataType::Int8 => byte_dict_builder_inner_helper!( - Int8Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int16 => byte_dict_builder_inner_helper!( - Int16Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int32 => byte_dict_builder_inner_helper!( - Int32Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int64 => byte_dict_builder_inner_helper!( - Int64Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::UInt8 => byte_dict_builder_inner_helper!( - UInt8Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::UInt16 => { - byte_dict_builder_inner_helper!( - UInt16Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!( - UInt32Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!( - UInt64Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - _ => unreachable!(""), - } - } - t => panic!("Data type {t:?} is not currently supported"), - } -} - -pub(crate) fn slot_size(len: usize, data_type: &DataType) -> usize { - match data_type { - DataType::Boolean => ceil(len, 8), - DataType::Int8 => len, - DataType::Int16 => len * 2, - DataType::Int32 => len * 4, - DataType::Int64 => len * 8, - DataType::UInt8 => len, - DataType::UInt16 => len * 2, - DataType::UInt32 => len * 4, - DataType::UInt64 => len * 8, - DataType::Float32 => len * 4, - DataType::Float64 => len * 8, - DataType::Date32 => len * 4, - DataType::Date64 => len * 8, - DataType::Time32(TimeUnit::Second) => len * 4, - DataType::Time32(TimeUnit::Millisecond) => len * 4, - DataType::Time64(TimeUnit::Microsecond) => len * 8, - DataType::Time64(TimeUnit::Nanosecond) => len * 8, - // TODO: this is not accurate, but should be good enough for now - DataType::Utf8 => len * 100 + len * 4, - DataType::LargeUtf8 => len * 100 + len * 8, - DataType::Decimal128(_, _) => len * 16, - DataType::Dictionary(key_type, value_type) => { - // TODO: this is not accurate, but should be good enough for now - slot_size(len, key_type.as_ref()) + slot_size(len / 10, value_type.as_ref()) - } - // TODO: this is not accurate, but should be good enough for now - DataType::Binary => len * 100 + len * 4, - DataType::LargeBinary => len * 100 + len * 8, - DataType::FixedSizeBinary(s) => len * (*s as usize), - DataType::Timestamp(_, _) => len * 8, - dt => unimplemented!( - "{}", - format!("data type {dt} not supported in shuffle write") - ), - } -} - -pub(crate) fn append_columns( - to: &mut Box, - from: &Arc, - indices: &[usize], - data_type: &DataType, -) { - /// Append values from `from` to `to` using `indices`. - macro_rules! append { - ($arrowty:ident) => {{ - type B = paste::paste! {[< $arrowty Builder >]}; - type A = paste::paste! {[< $arrowty Array >]}; - let t = to.as_any_mut().downcast_mut::().unwrap(); - let f = from.as_any().downcast_ref::().unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)); - } else { - t.append_null(); - } - } - }}; - } - - /// Some array builder (e.g. `FixedSizeBinary`) its `append_value` method returning - /// a `Result`. - macro_rules! append_unwrap { - ($arrowty:ident) => {{ - type B = paste::paste! {[< $arrowty Builder >]}; - type A = paste::paste! {[< $arrowty Array >]}; - let t = to.as_any_mut().downcast_mut::().unwrap(); - let f = from.as_any().downcast_ref::().unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)).unwrap(); - } else { - t.append_null(); - } - } - }}; - } - - /// Appends values from a dictionary array to a dictionary builder. - macro_rules! append_dict { - ($kt:ty, $builder:ty, $dict_array:ty) => {{ - let t = to.as_any_mut().downcast_mut::<$builder>().unwrap(); - let f = from - .as_any() - .downcast_ref::>() - .unwrap() - .downcast_dict::<$dict_array>() - .unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)); - } else { - t.append_null(); - } - } - }}; - } - - macro_rules! append_dict_helper { - ($kt:ident, $ty:ty, $dict_array:ty) => {{ - match $kt.as_ref() { - DataType::Int8 => append_dict!(Int8Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::Int16 => append_dict!(Int16Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::Int32 => append_dict!(Int32Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::Int64 => append_dict!(Int64Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::UInt8 => append_dict!(UInt8Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::UInt16 => { - append_dict!(UInt16Type, PrimitiveDictionaryBuilder, $dict_array) - } - DataType::UInt32 => { - append_dict!(UInt32Type, PrimitiveDictionaryBuilder, $dict_array) - } - DataType::UInt64 => { - append_dict!(UInt64Type, PrimitiveDictionaryBuilder, $dict_array) - } - _ => unreachable!("Unknown key type for dictionary"), - } - }}; - } - - macro_rules! primitive_append_dict_helper { - ($kt:ident, $vt:ident) => { - match $vt.as_ref() { - DataType::Int8 => { - append_dict_helper!($kt, Int8Type, Int8Array) - } - DataType::Int16 => { - append_dict_helper!($kt, Int16Type, Int16Array) - } - DataType::Int32 => { - append_dict_helper!($kt, Int32Type, Int32Array) - } - DataType::Int64 => { - append_dict_helper!($kt, Int64Type, Int64Array) - } - DataType::UInt8 => { - append_dict_helper!($kt, UInt8Type, UInt8Array) - } - DataType::UInt16 => { - append_dict_helper!($kt, UInt16Type, UInt16Array) - } - DataType::UInt32 => { - append_dict_helper!($kt, UInt32Type, UInt32Array) - } - DataType::UInt64 => { - append_dict_helper!($kt, UInt64Type, UInt64Array) - } - DataType::Float32 => { - append_dict_helper!($kt, Float32Type, Float32Array) - } - DataType::Float64 => { - append_dict_helper!($kt, Float64Type, Float64Array) - } - DataType::Decimal128(_, _) => { - append_dict_helper!($kt, Decimal128Type, Decimal128Array) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - append_dict_helper!($kt, TimestampMicrosecondType, TimestampMicrosecondArray) - } - DataType::Date32 => { - append_dict_helper!($kt, Date32Type, Date32Array) - } - DataType::Date64 => { - append_dict_helper!($kt, Date64Type, Date64Array) - } - t => unimplemented!("{:?} is not supported for appending dictionary builder", t), - } - }; - } - - macro_rules! append_byte_dict { - ($kt:ident, $byte_type:ty, $array_type:ty) => {{ - match $kt.as_ref() { - DataType::Int8 => { - append_dict!(Int8Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::Int16 => { - append_dict!(Int16Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::Int32 => { - append_dict!(Int32Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::Int64 => { - append_dict!(Int64Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::UInt8 => { - append_dict!(UInt8Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::UInt16 => { - append_dict!(UInt16Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::UInt32 => { - append_dict!(UInt32Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::UInt64 => { - append_dict!(UInt64Type, GenericByteDictionaryBuilder, $array_type) - } - _ => unreachable!("Unknown key type for dictionary"), - } - }}; - } - - match data_type { - DataType::Boolean => append!(Boolean), - DataType::Int8 => append!(Int8), - DataType::Int16 => append!(Int16), - DataType::Int32 => append!(Int32), - DataType::Int64 => append!(Int64), - DataType::UInt8 => append!(UInt8), - DataType::UInt16 => append!(UInt16), - DataType::UInt32 => append!(UInt32), - DataType::UInt64 => append!(UInt64), - DataType::Float32 => append!(Float32), - DataType::Float64 => append!(Float64), - DataType::Date32 => append!(Date32), - DataType::Date64 => append!(Date64), - DataType::Time32(TimeUnit::Second) => append!(Time32Second), - DataType::Time32(TimeUnit::Millisecond) => append!(Time32Millisecond), - DataType::Time64(TimeUnit::Microsecond) => append!(Time64Microsecond), - DataType::Time64(TimeUnit::Nanosecond) => append!(Time64Nanosecond), - DataType::Timestamp(TimeUnit::Microsecond, _) => { - append!(TimestampMicrosecond) - } - DataType::Utf8 => append!(String), - DataType::LargeUtf8 => append!(LargeString), - DataType::Decimal128(_, _) => append!(Decimal128), - DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => { - primitive_append_dict_helper!(key_type, value_type) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Utf8) => - { - append_byte_dict!(key_type, GenericStringType, StringArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeUtf8) => - { - append_byte_dict!(key_type, GenericStringType, LargeStringArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Binary) => - { - append_byte_dict!(key_type, GenericBinaryType, BinaryArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeBinary) => - { - append_byte_dict!(key_type, GenericBinaryType, LargeBinaryArray) - } - DataType::Binary => append!(Binary), - DataType::LargeBinary => append!(LargeBinary), - DataType::FixedSizeBinary(_) => append_unwrap!(FixedSizeBinary), - t => unimplemented!( - "{}", - format!("data type {} not supported in shuffle write", t) - ), - } -} - -pub(crate) fn make_batch( - schema: SchemaRef, - mut arrays: Vec>, - row_count: usize, -) -> ArrowResult { - let columns = arrays.iter_mut().map(|array| array.finish()).collect(); - let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); - RecordBatch::try_new_with_options(schema, columns, &options) -} diff --git a/native/core/src/execution/shuffle/codec.rs b/native/core/src/execution/shuffle/codec.rs index ff02b933f8..55f968ba4d 100644 --- a/native/core/src/execution/shuffle/codec.rs +++ b/native/core/src/execution/shuffle/codec.rs @@ -750,14 +750,16 @@ mod test { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub enum CompressionCodec { + #[default] None, Lz4Frame, Zstd(i32), Snappy, } +#[derive(Debug, Clone, Default)] pub struct ShuffleBlockWriter { fast_encoding: bool, codec: CompressionCodec, diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index acd7ff551c..8aed74b997 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -pub(crate) mod builders; pub(crate) mod codec; mod list; mod map; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 16f605acbb..04a04fb12a 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -17,11 +17,10 @@ //! Defines the External shuffle repartition plan. -use crate::execution::shuffle::builders::{ - append_columns, make_batch, new_array_builders, slot_size, -}; use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter}; +use arrow::compute::interleave_record_batch; use async_trait::async_trait; +use datafusion::common::utils::proxy::VecAllocExt; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::EmptyRecordBatchStream; @@ -53,7 +52,7 @@ use std::{ fmt, fmt::{Debug, Formatter}, fs::{File, OpenOptions}, - io::{BufReader, BufWriter, Cursor, Seek, SeekFrom, Write}, + io::{BufReader, BufWriter, Seek, Write}, sync::Arc, }; use tokio::time::Instant; @@ -287,17 +286,35 @@ struct ShuffleRepartitioner { output_data_file: String, output_index_file: String, schema: SchemaRef, - buffered_partitions: Vec, + buffered_batches: Vec, + partition_indices: Vec>, + partition_writers: Vec, + shuffle_block_writer: ShuffleBlockWriter, /// Partitioning scheme to use partitioning: Partitioning, runtime: Arc, metrics: ShuffleRepartitionerMetrics, - /// Hashes for each row in the current batch - hashes_buf: Vec, - /// Partition ids for each row in the current batch - partition_ids: Vec, + /// Reused scratch space for computing partition indices + scratch: ScratchSpace, /// The configured batch size batch_size: usize, + /// Reservation for repartitioning + reservation: MemoryReservation, +} + +#[derive(Default)] +struct ScratchSpace { + /// Hashes for each row in the current batch. + hashes_buf: Vec, + /// Partition ids for each row in the current batch. + partition_ids: Vec, + /// The row indices of the rows in each partition. This array is conceptually dividied into + /// partitions, where each partition contains the row indices of the rows in that partition. + /// The length of this array is the same as the number of rows in the batch. + shuffle_partition_ids: Vec, + /// The start indices of partitions in shuffle_partition_ids. The length of this array is the + /// same as the number of partitions. + partition_starts: Vec, } impl ShuffleRepartitioner { @@ -314,17 +331,25 @@ impl ShuffleRepartitioner { codec: CompressionCodec, enable_fast_encoding: bool, ) -> Result { - let mut hashes_buf = Vec::with_capacity(batch_size); - let mut partition_ids = Vec::with_capacity(batch_size); - - // Safety: `hashes_buf` will be filled with valid values before being used. - // `partition_ids` will be filled with valid values before being used. - unsafe { - hashes_buf.set_len(batch_size); - partition_ids.set_len(batch_size); - } + let num_output_partitions = partitioning.partition_count(); + + // Vectors in the scratch space will be filled with valid values before being used, this + // initialization code is simply initializing the vectors to the desired size. + // The initial values are not used. + let scratch = ScratchSpace { + hashes_buf: vec![0; batch_size], + partition_ids: vec![0; batch_size], + shuffle_partition_ids: vec![0; batch_size], + partition_starts: vec![0; num_output_partitions + 1], + }; + + let shuffle_block_writer = + ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec.clone())?; + + let partition_writers = (0..num_output_partitions) + .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) + .collect::>>()?; - // This will be split into each PartitionBuffer Reservations, and does not need to be kept itself let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition)) .with_can_spill(true) .register(&runtime.memory_pool); @@ -333,23 +358,16 @@ impl ShuffleRepartitioner { output_data_file, output_index_file, schema: Arc::clone(&schema), - buffered_partitions: (0..partitioning.partition_count()) - .map(|_| { - PartitionBuffer::try_new( - Arc::clone(&schema), - batch_size, - reservation.new_empty(), - codec.clone(), - enable_fast_encoding, - ) - }) - .collect::>>()?, + buffered_batches: vec![], + partition_indices: vec![vec![]; num_output_partitions], + partition_writers, + shuffle_block_writer, partitioning, runtime, metrics, - hashes_buf, - partition_ids, + scratch, batch_size, + reservation, }) } @@ -399,20 +417,17 @@ impl ShuffleRepartitioner { match &self.partitioning { any if any.partition_count() == 1 => { - let buffered_partitions = &mut self.buffered_partitions; - - assert_eq!(buffered_partitions.len(), 1, "Expected 1 partition"); + assert_eq!(self.partition_writers.len(), 1, "Expected 1 partition"); // TODO the single partition case could be optimized to avoid appending all // rows from the batch into builders and then recreating the batch // https://github.com/apache/datafusion-comet/issues/1453 - let indices = (0..input.num_rows()).collect::>(); - self.append_rows_to_partition(input.columns(), &indices, 0) - .await?; + self.buffer_batch_may_spill(input).await?; } Partitioning::Hash(exprs, num_output_partitions) => { - let (partition_starts, shuffled_partition_ids): (Vec, Vec) = { + let mut scratch = std::mem::take(&mut self.scratch); + let (partition_starts, shuffled_partition_ids): (&Vec, &Vec) = { let mut timer = self.metrics.repart_time.timer(); // evaluate partition expressions @@ -422,27 +437,29 @@ impl ShuffleRepartitioner { .collect::>>()?; // use identical seed as spark hash partition - let hashes_buf = &mut self.hashes_buf[..arrays[0].len()]; + let hashes_buf = &mut scratch.hashes_buf[..arrays[0].len()]; hashes_buf.fill(42_u32); // Hash arrays and compute buckets based on number of partitions - let partition_ids = &mut self.partition_ids[..arrays[0].len()]; + let partition_ids = &mut scratch.partition_ids[..arrays[0].len()]; create_murmur3_hashes(&arrays, hashes_buf)? .iter() .enumerate() .for_each(|(idx, hash)| { - partition_ids[idx] = pmod(*hash, *num_output_partitions) as u64 + partition_ids[idx] = pmod(*hash, *num_output_partitions) as u32; }); // count each partition size - let mut partition_counters = vec![0usize; *num_output_partitions]; + let partition_counters = &mut scratch.partition_starts; + partition_counters.resize(num_output_partitions + 1, 0); + partition_counters.fill(0); partition_ids .iter() .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); // accumulate partition counters into partition ends // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7] - let mut partition_ends = partition_counters; + let partition_ends = partition_counters; let mut accum = 0; partition_ends.iter_mut().for_each(|v| { *v += accum; @@ -453,38 +470,30 @@ impl ShuffleRepartitioner { // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] => [6, 1, 2, 3, 4, 5, 0] which is the // row indices for rows ordered by their partition id. For example, first partition // 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. - let mut shuffled_partition_ids = vec![0usize; input.num_rows()]; + let shuffled_partition_ids = &mut scratch.shuffle_partition_ids; + shuffled_partition_ids.resize(input.num_rows(), 0); for (index, partition_id) in partition_ids.iter().enumerate().rev() { partition_ends[*partition_id as usize] -= 1; let end = partition_ends[*partition_id as usize]; - shuffled_partition_ids[end] = index; + shuffled_partition_ids[end as usize] = index as u32; } // after calculating, partition ends become partition starts - let mut partition_starts = partition_ends; - partition_starts.push(input.num_rows()); + let partition_starts = partition_ends; timer.stop(); - Ok::<(Vec, Vec), DataFusionError>(( + Ok::<(&Vec, &Vec), DataFusionError>(( partition_starts, shuffled_partition_ids, )) }?; - // For each interval of row indices of partition, taking rows from input batch and - // appending into output buffer. - for (partition_id, (&start, &end)) in partition_starts - .iter() - .tuple_windows() - .enumerate() - .filter(|(_, (start, end))| start < end) - { - self.append_rows_to_partition( - input.columns(), - &shuffled_partition_ids[start..end], - partition_id, - ) - .await?; - } + self.buffer_partitioned_batch_may_spill( + input, + shuffled_partition_ids, + partition_starts, + ) + .await?; + self.scratch = scratch; } other => { // this should be unreachable as long as the validation logic @@ -498,17 +507,68 @@ impl ShuffleRepartitioner { Ok(()) } + async fn buffer_partitioned_batch_may_spill( + &mut self, + input: RecordBatch, + shuffled_partition_ids: &[u32], + partition_starts: &[u32], + ) -> Result<()> { + let mut mem_growth: usize = input.get_array_memory_size(); + let buffered_partition_idx = self.buffered_batches.len() as u32; + self.buffered_batches.push(input); + + for (partition_id, (&start, &end)) in partition_starts + .iter() + .tuple_windows() + .enumerate() + .filter(|(_, (start, end))| start < end) + { + let row_indices = &shuffled_partition_ids[start as usize..end as usize]; + let indices = &mut self.partition_indices[partition_id]; + let before_size = indices.allocated_size(); + indices.reserve(row_indices.len()); + for row_idx in row_indices { + indices.push((buffered_partition_idx, *row_idx)); + } + let after_size = indices.allocated_size(); + mem_growth += after_size.saturating_sub(before_size); + } + + let grow_result = { + let mut timer = self.metrics.mempool_time.timer(); + let result = self.reservation.try_grow(mem_growth); + timer.stop(); + result + }; + if grow_result.is_err() { + self.spill().await?; + } + + Ok(()) + } + + async fn buffer_batch_may_spill(&mut self, input: RecordBatch) -> Result<()> { + let size = input.get_array_memory_size(); + self.buffered_batches.push(input); + + let grow_result = { + let mut timer = self.metrics.mempool_time.timer(); + let result = self.reservation.try_grow(size); + timer.stop(); + result + }; + if grow_result.is_err() { + self.spill().await?; + } + Ok(()) + } + /// Writes buffered shuffled record batches into Arrow IPC bytes. async fn shuffle_write(&mut self) -> Result { + let mut partitioned_batches = self.partitioned_batches(); let mut elapsed_compute = self.metrics.baseline.elapsed_compute().timer(); - let buffered_partitions = &mut self.buffered_partitions; - let num_output_partitions = buffered_partitions.len(); - let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; + let num_output_partitions = self.partition_indices.len(); let mut offsets = vec![0; num_output_partitions + 1]; - for i in 0..num_output_partitions { - buffered_partitions[i].flush(&self.metrics)?; - output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); - } let data_file = self.output_data_file.clone(); let index_file = self.output_index_file.clone(); @@ -524,14 +584,24 @@ impl ShuffleRepartitioner { let mut output_data = BufWriter::new(output_data); + #[allow(clippy::needless_range_loop)] for i in 0..num_output_partitions { offsets[i] = output_data.stream_position()?; - output_data.write_all(&output_batches[i])?; - output_batches[i].clear(); + + // Write in memory batches to output data file + let partition_iter = partitioned_batches.produce(i); + for batch in partition_iter { + let batch = batch?; + self.shuffle_block_writer.write_batch( + &batch, + &mut output_data, + &self.metrics.encode_time, + )?; + } // if we wrote a spill file for this partition then copy the // contents into the shuffle file - if let Some(spill_data) = self.buffered_partitions[i].spill_file.as_ref() { + if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() { let mut spill_file = BufReader::new( File::open(spill_data.temp_file.path()).map_err(Self::to_df_err)?, ); @@ -569,10 +639,7 @@ impl ShuffleRepartitioner { } fn used(&self) -> usize { - self.buffered_partitions - .iter() - .map(|b| b.reservation.size()) - .sum() + self.reservation.size() } fn spilled_bytes(&self) -> usize { @@ -587,6 +654,26 @@ impl ShuffleRepartitioner { self.metrics.data_size.value() } + /// This function transfers the ownership of the buffered batches and partition indices from the + /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned PartitionedBatches struct + /// can be used to produce shuffled batches. + fn partitioned_batches(&mut self) -> PartitionedBatchesProducer { + let num_output_partitions = self.partition_indices.len(); + let buffered_batches = std::mem::take(&mut self.buffered_batches); + // let indices = std::mem::take(&mut self.partition_indices); + let indices = std::mem::replace( + &mut self.partition_indices, + vec![vec![]; num_output_partitions], + ); + let single_partition_mode = num_output_partitions == 1; + PartitionedBatchesProducer::new( + buffered_batches, + indices, + self.batch_size, + single_partition_mode, + ) + } + async fn spill(&mut self) -> Result<()> { log::debug!( "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)", @@ -595,63 +682,28 @@ impl ShuffleRepartitioner { ); // we could always get a chance to free some memory as long as we are holding some - if self.buffered_partitions.is_empty() { + if self.buffered_batches.is_empty() { return Ok(()); } + let num_output_partitions = self.partition_writers.len(); + let mut partitioned_batches = self.partitioned_batches(); let mut spilled_bytes = 0; - for p in &mut self.buffered_partitions { - spilled_bytes += p.spill(&self.runtime, &self.metrics)?; - } - self.metrics.spill_count.add(1); - self.metrics.spilled_bytes.add(spilled_bytes); - Ok(()) - } - - /// Appends rows of specified indices from columns into active array builders in the specified partition. - async fn append_rows_to_partition( - &mut self, - columns: &[ArrayRef], - indices: &[usize], - partition_id: usize, - ) -> Result<()> { - let output = &mut self.buffered_partitions[partition_id]; - - // If the range of indices is not big enough, just appending the rows into - // active array builders instead of directly adding them as a record batch. - let mut start_index: usize = 0; - let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics)?; - - loop { - match output_ret { - AppendRowStatus::Appended => { - break; - } - AppendRowStatus::StartIndex(new_start) => { - // Cannot allocate enough memory for the array builders in this partition, - // spill all partitions and retry. - self.spill().await?; - - start_index = new_start; - let output = &mut self.buffered_partitions[partition_id]; - output_ret = - output.append_rows(columns, indices, start_index, &self.metrics)?; - - if let AppendRowStatus::StartIndex(new_start) = output_ret { - if new_start == start_index { - // If the start index is not updated, it means that the partition - // is still not able to allocate enough memory for the array builders. - return Err(DataFusionError::Internal( - "Partition is still not able to allocate enough memory for the array builders after spilling." - .to_string(), - )); - } - } - } + for partition_id in 0..num_output_partitions { + let partition_writer = &mut self.partition_writers[partition_id]; + let iter = partitioned_batches.produce(partition_id); + for batch in iter { + let batch = batch?; + spilled_bytes += partition_writer.spill(&batch, &self.runtime, &self.metrics)?; } } + let mut timer = self.metrics.mempool_time.timer(); + self.reservation.free(); + timer.stop(); + self.metrics.spill_count.add(1); + self.metrics.spilled_bytes.add(spilled_bytes); Ok(()) } } @@ -667,175 +719,169 @@ impl Debug for ShuffleRepartitioner { } } -/// The status of appending rows to a partition buffer. -#[derive(Debug)] -enum AppendRowStatus { - /// Rows were appended - Appended, - /// Not all rows were appended due to lack of available memory - StartIndex(usize), +/// A helper struct to produce shuffled batches. +/// This struct takes ownership of the buffered batches and partition indices from the +/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions. +struct PartitionedBatchesProducer { + buffered_batches: Vec, + partition_indices: Vec>, + batch_size: usize, + single_partition_mode: bool, } -struct PartitionBuffer { - /// The schema of batches to be partitioned. - schema: SchemaRef, - /// The "frozen" Arrow IPC bytes of active data. They are frozen when `flush` is called. - frozen: Vec, - /// Array builders for appending rows into buffering batches. - active: Vec>, - /// The estimation of memory size of active builders in bytes when they are filled. - active_slots_mem_size: usize, - /// Number of rows in active builders. - num_active_rows: usize, - /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`, - /// the active array builders will be frozen and appended to frozen buffer `frozen`. +impl PartitionedBatchesProducer { + fn new( + buffered_batches: Vec, + indices: Vec>, + batch_size: usize, + single_partition_mode: bool, + ) -> Self { + Self { + partition_indices: indices, + buffered_batches, + batch_size, + single_partition_mode, + } + } + + fn produce(&mut self, partition_id: usize) -> BatchIterator { + if self.single_partition_mode { + assert!( + partition_id == 0, + "Single partition mode can only be used for the first partition" + ); + BatchIterator::SinglePartition(SinglePartitionBatchIterator::new(std::mem::take( + &mut self.buffered_batches, + ))) + } else { + BatchIterator::Partitioned(PartitionedBatchIterator::new( + &self.partition_indices[partition_id], + &self.buffered_batches, + self.batch_size, + )) + } + } +} + +enum BatchIterator<'a> { + Partitioned(PartitionedBatchIterator<'a>), + SinglePartition(SinglePartitionBatchIterator), +} + +struct SinglePartitionBatchIterator { + buffered_batches: std::vec::IntoIter, +} + +struct PartitionedBatchIterator<'a> { + record_batches: Vec<&'a RecordBatch>, batch_size: usize, - /// Memory reservation for this partition buffer. - reservation: MemoryReservation, - /// Spill file for intermediate shuffle output for this partition. Each spill event - /// will append to this file and the contents will be copied to the shuffle file at - /// the end of processing. - spill_file: Option, - /// Writer that performs encoding and compression - shuffle_block_writer: ShuffleBlockWriter, + indices: Vec<(usize, usize)>, + pos: usize, } -struct SpillFile { - temp_file: RefCountedTempFile, - file: File, +impl Iterator for BatchIterator<'_> { + type Item = Result; + + fn next(&mut self) -> Option { + match self { + BatchIterator::Partitioned(iter) => iter.next(), + BatchIterator::SinglePartition(iter) => iter.next(), + } + } } -impl PartitionBuffer { - fn try_new( - schema: SchemaRef, +impl SinglePartitionBatchIterator { + fn new(buffered_batches: Vec) -> Self { + Self { + buffered_batches: buffered_batches.into_iter(), + } + } + + fn next(&mut self) -> Option> { + self.buffered_batches.next().map(Ok) + } +} + +impl<'a> PartitionedBatchIterator<'a> { + fn new( + indices: &'a [(u32, u32)], + buffered_batches: &'a [RecordBatch], batch_size: usize, - reservation: MemoryReservation, - codec: CompressionCodec, - enable_fast_encoding: bool, - ) -> Result { - let shuffle_block_writer = - ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec)?; - let active_slots_mem_size = schema - .fields() + ) -> Self { + if indices.is_empty() { + // Avoid unnecessary allocations when the partition is empty + return Self { + record_batches: vec![], + batch_size, + indices: vec![], + pos: 0, + }; + } + let record_batches = buffered_batches.iter().collect::>(); + let current_indices = indices .iter() - .map(|field| slot_size(batch_size, field.data_type())) - .sum::(); - Ok(Self { - schema, - frozen: vec![], - active: vec![], - active_slots_mem_size, - num_active_rows: 0, + .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize)) + .collect::>(); + Self { + record_batches, batch_size, - reservation, - spill_file: None, - shuffle_block_writer, - }) - } - - /// Initializes active builders if necessary. - /// Returns error if memory reservation fails. - fn allocate_active_builders(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> { - if self.active.is_empty() { - let mut mempool_timer = metrics.mempool_time.timer(); - self.reservation.try_grow(self.active_slots_mem_size)?; - mempool_timer.stop(); - - let mut repart_timer = metrics.repart_time.timer(); - self.active = new_array_builders(&self.schema, self.batch_size); - repart_timer.stop(); + indices: current_indices, + pos: 0, } - Ok(()) } - /// Appends rows of specified indices from columns into active array builders. - fn append_rows( - &mut self, - columns: &[ArrayRef], - indices: &[usize], - start_index: usize, - metrics: &ShuffleRepartitionerMetrics, - ) -> Result { - let mut start = start_index; - - // loop until all indices are processed - while start < indices.len() { - let end = (start + self.batch_size).min(indices.len()); - - // allocate builders - if self.allocate_active_builders(metrics).is_err() { - // could not allocate memory for builders, so abort - // and return the current index - return Ok(AppendRowStatus::StartIndex(start)); - } - - let mut repart_timer = metrics.repart_time.timer(); - self.active - .iter_mut() - .zip(columns) - .for_each(|(builder, column)| { - append_columns(builder, column, &indices[start..end], column.data_type()); - }); - self.num_active_rows += end - start; - repart_timer.stop(); - start = end; + fn next(&mut self) -> Option> { + if self.pos >= self.indices.len() { + return None; + } - if self.num_active_rows >= self.batch_size { - self.flush(metrics)?; + let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); + let indices = &self.indices[self.pos..indices_end]; + match interleave_record_batch(&self.record_batches, indices) { + Ok(batch) => { + self.pos = indices_end; + Some(Ok(batch)) } + Err(e) => Some(Err(DataFusionError::ArrowError( + e, + Some(DataFusionError::get_back_trace()), + ))), } - Ok(AppendRowStatus::Appended) } +} - /// Flush active data into frozen bytes. This can reduce memory usage because the frozen - /// bytes are compressed. - fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> { - if self.num_active_rows == 0 { - return Ok(()); - } +#[derive(Default)] +struct PartitionWriter { + /// Spill file for intermediate shuffle output for this partition. Each spill event + /// will append to this file and the contents will be copied to the shuffle file at + /// the end of processing. + spill_file: Option, + /// Writer that performs encoding and compression + shuffle_block_writer: ShuffleBlockWriter, +} - // active -> staging - let active = std::mem::take(&mut self.active); - let num_rows = self.num_active_rows; - self.num_active_rows = 0; - - let mut repart_timer = metrics.repart_time.timer(); - let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?; - repart_timer.stop(); - - let mut cursor = Cursor::new(&mut self.frozen); - cursor.seek(SeekFrom::End(0))?; - let bytes_written = self.shuffle_block_writer.write_batch( - &frozen_batch, - &mut cursor, - &metrics.encode_time, - )?; - - // we typically expect the frozen bytes to take up less memory than - // the builders due to compression but there could be edge cases where - // this is not the case - let mut mempool_timer = metrics.mempool_time.timer(); - if self.active_slots_mem_size >= bytes_written { - self.reservation - .try_shrink(self.active_slots_mem_size - bytes_written)?; - } else { - self.reservation - .try_grow(bytes_written - self.active_slots_mem_size)?; - } - mempool_timer.stop(); +struct SpillFile { + temp_file: RefCountedTempFile, + file: File, +} - Ok(()) +impl PartitionWriter { + fn try_new(shuffle_block_writer: ShuffleBlockWriter) -> Result { + Ok(Self { + spill_file: None, + shuffle_block_writer, + }) } fn spill( &mut self, + batch: &RecordBatch, runtime: &RuntimeEnv, metrics: &ShuffleRepartitionerMetrics, ) -> Result { - self.flush(metrics).unwrap(); - let output_batches = std::mem::take(&mut self.frozen); let mut write_timer = metrics.write_time.timer(); if self.spill_file.is_none() { + // Spill file is not yet created, create it let spill_file = runtime .disk_manager .create_tmp_file("shuffle writer spill")?; @@ -852,16 +898,19 @@ impl PartitionBuffer { file: spill_data, }); } - self.spill_file - .as_mut() - .unwrap() - .file - .write_all(&output_batches)?; + + let bytes_written = { + let mut writer = BufWriter::new(&self.spill_file.as_mut().unwrap().file); + let bytes_written = + self.shuffle_block_writer + .write_batch(batch, &mut writer, &metrics.encode_time)?; + writer.flush()?; + bytes_written + }; + write_timer.stop(); - let mut timer = metrics.mempool_time.timer(); - self.reservation.free(); - timer.stop(); - Ok(output_batches.len()) + + Ok(bytes_written) } } @@ -885,7 +934,7 @@ mod test { use datafusion::physical_expr::expressions::Column; use datafusion::physical_plan::common::collect; use datafusion::prelude::SessionContext; - use parquet::file::reader::Length; + use std::io::Cursor; use tokio::runtime::Runtime; #[test] @@ -919,34 +968,6 @@ mod test { } } - #[test] - fn test_slot_size() { - let batch_size = 1usize; - // not inclusive of all supported types, but enough to test the function - let supported_primitive_types = [ - DataType::Int32, - DataType::Int64, - DataType::UInt32, - DataType::UInt64, - DataType::Float32, - DataType::Float64, - DataType::Boolean, - DataType::Utf8, - DataType::LargeUtf8, - DataType::Binary, - DataType::LargeBinary, - DataType::FixedSizeBinary(16), - ]; - let expected_slot_size = [4, 8, 4, 8, 4, 8, 1, 104, 108, 104, 108, 16]; - supported_primitive_types - .iter() - .zip(expected_slot_size.iter()) - .for_each(|(data_type, expected)| { - let slot_size = slot_size(batch_size, data_type); - assert_eq!(slot_size, *expected); - }) - } - #[test] #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` fn test_insert_larger_batch() { @@ -973,75 +994,6 @@ mod test { shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024)); } - #[test] - fn partition_buffer_memory() { - let batch = create_batch(900); - let runtime_env = create_runtime(128 * 1024); - let reservation = MemoryConsumer::new("ShuffleRepartitioner[0]") - .with_can_spill(true) - .register(&runtime_env.memory_pool); - let mut buffer = PartitionBuffer::try_new( - batch.schema(), - 1024, - reservation, - CompressionCodec::Lz4Frame, - true, - ) - .unwrap(); - let metrics_set = ExecutionPlanMetricsSet::new(); - let metrics = ShuffleRepartitionerMetrics::new(&metrics_set, 0); - let indices: Vec = (0..batch.num_rows()).collect(); - - assert_eq!(0, buffer.reservation.size()); - assert!(buffer.spill_file.is_none()); - - // append first batch - should fit in memory - let status = buffer - .append_rows(batch.columns(), &indices, 0, &metrics) - .unwrap(); - assert_eq!( - format!("{status:?}"), - format!("{:?}", AppendRowStatus::Appended) - ); - assert_eq!(900, buffer.num_active_rows); - assert_eq!(106496, buffer.reservation.size()); - assert_eq!(0, buffer.frozen.len()); - assert!(buffer.spill_file.is_none()); - - // append second batch - should trigger flush to frozen bytes - let status = buffer - .append_rows(batch.columns(), &indices, 0, &metrics) - .unwrap(); - assert_eq!( - format!("{status:?}"), - format!("{:?}", AppendRowStatus::Appended) - ); - assert_eq!(0, buffer.num_active_rows); - assert_eq!(9914, buffer.frozen.len()); - assert_eq!(9914, buffer.reservation.size()); - assert!(buffer.spill_file.is_none()); - - // spill - buffer.spill(&runtime_env, &metrics).unwrap(); - assert_eq!(0, buffer.num_active_rows); - assert_eq!(0, buffer.frozen.len()); - assert_eq!(0, buffer.reservation.size()); - assert!(buffer.spill_file.is_some()); - assert_eq!(9914, buffer.spill_file.as_ref().unwrap().file.len()); - - // append after spill - let status = buffer - .append_rows(batch.columns(), &indices, 0, &metrics) - .unwrap(); - assert_eq!( - format!("{status:?}"), - format!("{:?}", AppendRowStatus::Appended) - ); - assert_eq!(900, buffer.num_active_rows); - assert_eq!(106496, buffer.reservation.size()); - assert_eq!(0, buffer.frozen.len()); - } - #[tokio::test] async fn shuffle_repartitioner_memory() { let batch = create_batch(900); @@ -1067,41 +1019,19 @@ mod test { repartitioner.insert_batch(batch.clone()).await.unwrap(); - assert_eq!(2, repartitioner.buffered_partitions.len()); + assert_eq!(2, repartitioner.partition_writers.len()); - assert!(repartitioner.buffered_partitions[0].spill_file.is_none()); - assert!(repartitioner.buffered_partitions[1].spill_file.is_none()); - - assert_eq!( - 106496, - repartitioner.buffered_partitions[0].reservation.size() - ); - assert_eq!( - 106496, - repartitioner.buffered_partitions[1].reservation.size() - ); + assert!(repartitioner.partition_writers[0].spill_file.is_none()); + assert!(repartitioner.partition_writers[1].spill_file.is_none()); repartitioner.spill().await.unwrap(); // after spill, there should be spill files - assert!(repartitioner.buffered_partitions[0].spill_file.is_some()); - assert!(repartitioner.buffered_partitions[1].spill_file.is_some()); - - // after spill, all reservations should be freed - assert_eq!(0, repartitioner.buffered_partitions[0].reservation.size()); - assert_eq!(0, repartitioner.buffered_partitions[1].reservation.size()); + assert!(repartitioner.partition_writers[0].spill_file.is_some()); + assert!(repartitioner.partition_writers[1].spill_file.is_some()); // insert another batch after spilling repartitioner.insert_batch(batch.clone()).await.unwrap(); - - assert_eq!( - 106496, - repartitioner.buffered_partitions[0].reservation.size() - ); - assert_eq!( - 106496, - repartitioner.buffered_partitions[1].reservation.size() - ); } fn create_runtime(memory_limit: usize) -> Arc { From ae6b0f3b421999cc2f74221494d9d79bd15a89c4 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Tue, 18 Mar 2025 20:36:06 +0800 Subject: [PATCH 2/9] Coalesce small batches into larger batches when shuffle writing single partition --- .../src/execution/shuffle/shuffle_writer.rs | 62 +++++++++++++++++-- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 04a04fb12a..f2be940957 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -750,9 +750,10 @@ impl PartitionedBatchesProducer { partition_id == 0, "Single partition mode can only be used for the first partition" ); - BatchIterator::SinglePartition(SinglePartitionBatchIterator::new(std::mem::take( - &mut self.buffered_batches, - ))) + BatchIterator::SinglePartition(SinglePartitionBatchIterator::new( + std::mem::take(&mut self.buffered_batches), + self.batch_size, + )) } else { BatchIterator::Partitioned(PartitionedBatchIterator::new( &self.partition_indices[partition_id], @@ -770,6 +771,9 @@ enum BatchIterator<'a> { struct SinglePartitionBatchIterator { buffered_batches: std::vec::IntoIter, + batch_size: usize, + pending_batch: Option, + concatenating_batches: Vec, } struct PartitionedBatchIterator<'a> { @@ -791,14 +795,62 @@ impl Iterator for BatchIterator<'_> { } impl SinglePartitionBatchIterator { - fn new(buffered_batches: Vec) -> Self { + fn new(buffered_batches: Vec, batch_size: usize) -> Self { Self { buffered_batches: buffered_batches.into_iter(), + batch_size, + pending_batch: None, + concatenating_batches: vec![], } } fn next(&mut self) -> Option> { - self.buffered_batches.next().map(Ok) + if self.pending_batch.is_none() { + // Get the first batch if we don't have a pending batch + self.pending_batch = self.buffered_batches.next(); + self.pending_batch.as_ref()?; + } + + let pending_batch = self.pending_batch.take().unwrap(); + let mut current_row_count = pending_batch.num_rows(); + let schema = pending_batch.schema(); + self.concatenating_batches.clear(); + self.concatenating_batches.push(pending_batch); + + // Keep accumulating batches until we reach/exceed batch_size or run out of batches + while current_row_count < self.batch_size { + if let Some(next_batch) = self.buffered_batches.next() { + let next_row_count = next_batch.num_rows(); + + // If adding this batch would exceed batch_size, save it for next time + current_row_count += next_row_count; + if current_row_count > self.batch_size { + self.pending_batch = Some(next_batch); + break; + } + + // Otherwise, concatenate the batches + self.concatenating_batches.push(next_batch); + } else { + // No more batches to concatenate + break; + } + } + + if self.concatenating_batches.len() > 1 { + match arrow::compute::concat_batches(&schema, self.concatenating_batches.iter()) { + Ok(concatenated) => Some(Ok(concatenated)), + Err(e) => { + // If concatenation fails, still return what we have so far + Some(Err(DataFusionError::ArrowError( + e, + Some(DataFusionError::get_back_trace()), + ))) + } + } + } else { + Some(Ok(self.concatenating_batches.pop().unwrap())) + } } } From 3a001ad8c00f3058eb62bb25529e392a7cc4178a Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Fri, 21 Mar 2025 20:37:31 +0800 Subject: [PATCH 3/9] buffer writes to shuffle files and spill files --- .../src/execution/shuffle/shuffle_writer.rs | 123 ++++++++++++++---- 1 file changed, 97 insertions(+), 26 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index f2be940957..c895be2a79 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -46,7 +46,7 @@ use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes; use futures::executor::block_on; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; -use std::io::Error; +use std::io::{Cursor, Error, SeekFrom}; use std::{ any::Any, fmt, @@ -589,15 +589,13 @@ impl ShuffleRepartitioner { offsets[i] = output_data.stream_position()?; // Write in memory batches to output data file - let partition_iter = partitioned_batches.produce(i); - for batch in partition_iter { - let batch = batch?; - self.shuffle_block_writer.write_batch( - &batch, - &mut output_data, - &self.metrics.encode_time, - )?; - } + let mut partition_iter = partitioned_batches.produce(i); + Self::shuffle_write_partition( + &mut partition_iter, + &mut self.shuffle_block_writer, + &mut output_data, + &self.metrics.encode_time, + )?; // if we wrote a spill file for this partition then copy the // contents into the shuffle file @@ -634,6 +632,21 @@ impl ShuffleRepartitioner { )))) } + fn shuffle_write_partition( + partition_iter: &mut BatchIterator, + shuffle_block_writer: &mut ShuffleBlockWriter, + output_data: &mut BufWriter, + encode_time: &Time, + ) -> Result<()> { + let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data); + for batch in partition_iter { + let batch = batch?; + buf_batch_writer.write(&batch, encode_time)?; + } + buf_batch_writer.flush()?; + Ok(()) + } + fn to_df_err(e: Error) -> DataFusionError { DataFusionError::Execution(format!("shuffle write error: {:?}", e)) } @@ -692,11 +705,8 @@ impl ShuffleRepartitioner { for partition_id in 0..num_output_partitions { let partition_writer = &mut self.partition_writers[partition_id]; - let iter = partitioned_batches.produce(partition_id); - for batch in iter { - let batch = batch?; - spilled_bytes += partition_writer.spill(&batch, &self.runtime, &self.metrics)?; - } + let mut iter = partitioned_batches.produce(partition_id); + spilled_bytes += partition_writer.spill(&mut iter, &self.runtime, &self.metrics)?; } let mut timer = self.metrics.mempool_time.timer(); @@ -927,11 +937,37 @@ impl PartitionWriter { fn spill( &mut self, - batch: &RecordBatch, + iter: &mut BatchIterator, runtime: &RuntimeEnv, metrics: &ShuffleRepartitionerMetrics, ) -> Result { - let mut write_timer = metrics.write_time.timer(); + if let Some(batch) = iter.next() { + let mut write_timer = metrics.write_time.timer(); + self.ensure_spill_file_created(runtime)?; + + let total_bytes_written = { + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.spill_file.as_mut().unwrap().file, + ); + let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time)?; + for batch in iter { + let batch = batch?; + bytes_written += buf_batch_writer.write(&batch, &metrics.encode_time)?; + } + buf_batch_writer.flush()?; + bytes_written + }; + + write_timer.stop(); + + Ok(total_bytes_written) + } else { + Ok(0) + } + } + + fn ensure_spill_file_created(&mut self, runtime: &RuntimeEnv) -> Result<()> { if self.spill_file.is_none() { // Spill file is not yet created, create it let spill_file = runtime @@ -950,20 +986,55 @@ impl PartitionWriter { file: spill_data, }); } + Ok(()) + } +} - let bytes_written = { - let mut writer = BufWriter::new(&self.spill_file.as_mut().unwrap().file); - let bytes_written = - self.shuffle_block_writer - .write_batch(batch, &mut writer, &metrics.encode_time)?; - writer.flush()?; - bytes_written - }; +/// Write batches to writer while using a buffer to avoid frequent system calls. +/// The record batches were first written by ShuffleBlockWriter into an internal buffer. +/// Once the buffer exceeds the max size, the buffer will be flushed to the writer. +struct BufBatchWriter<'a, W: Write> { + shuffle_block_writer: &'a mut ShuffleBlockWriter, + writer: &'a mut W, + buffer: Vec, + buffer_max_size: usize, +} - write_timer.stop(); +impl<'a, W: Write> BufBatchWriter<'a, W> { + fn new(shuffle_block_writer: &'a mut ShuffleBlockWriter, writer: &'a mut W) -> Self { + // 1MB should be good enough to avoid frequent system calls, + // and also won't cause too much memory usage + let buffer_max_size = 1024 * 1024; + Self { + shuffle_block_writer, + writer, + buffer: vec![], + buffer_max_size, + } + } + fn write(&mut self, batch: &RecordBatch, encode_time: &Time) -> Result { + let mut cursor = Cursor::new(&mut self.buffer); + cursor.seek(SeekFrom::End(0))?; + let bytes_written = + self.shuffle_block_writer + .write_batch(batch, &mut cursor, encode_time)?; + let pos = cursor.position(); + if pos > self.buffer_max_size as u64 { + self.writer.write_all(&self.buffer)?; + self.buffer.clear(); + } Ok(bytes_written) } + + fn flush(&mut self) -> Result<()> { + if !self.buffer.is_empty() { + self.writer.write_all(&self.buffer)?; + self.buffer.clear(); + } + self.writer.flush()?; + Ok(()) + } } fn pmod(hash: u32, n: usize) -> usize { From ef79ab5c65ceb593959218876f2c1e83ec2c27ed Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Wed, 26 Mar 2025 15:49:25 +0800 Subject: [PATCH 4/9] Reimplemented single partition shuffle write --- .../src/execution/shuffle/shuffle_writer.rs | 587 ++++++++++-------- 1 file changed, 331 insertions(+), 256 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index c895be2a79..0884530518 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -46,6 +46,7 @@ use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes; use futures::executor::block_on; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; +use std::borrow::Borrow; use std::io::{Cursor, Error, SeekFrom}; use std::{ any::Any, @@ -214,18 +215,30 @@ async fn external_shuffle( enable_fast_encoding: bool, ) -> Result { let schema = input.schema(); - let mut repartitioner = ShuffleRepartitioner::try_new( - partition, - output_data_file, - output_index_file, - Arc::clone(&schema), - partitioning, - metrics, - context.runtime_env(), - context.session_config().batch_size(), - codec, - enable_fast_encoding, - )?; + + let mut repartitioner: Box = match &partitioning { + any if any.partition_count() == 1 => Box::new(SinglePartitionShufflePartitioner::try_new( + output_data_file, + output_index_file, + Arc::clone(&schema), + metrics, + context.session_config().batch_size(), + codec, + enable_fast_encoding, + )?), + _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( + partition, + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning, + metrics, + context.runtime_env(), + context.session_config().batch_size(), + codec, + enable_fast_encoding, + )?), + }; while let Some(batch) = input.next().await { // Block on the repartitioner to insert the batch and shuffle the rows @@ -234,7 +247,11 @@ async fn external_shuffle( // current batch in the repartitioner. block_on(repartitioner.insert_batch(batch?))?; } - repartitioner.shuffle_write().await + + repartitioner.shuffle_write().await?; + + // shuffle writer always has empty output + Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema)))) } struct ShuffleRepartitionerMetrics { @@ -282,10 +299,18 @@ impl ShuffleRepartitionerMetrics { } } -struct ShuffleRepartitioner { +#[async_trait::async_trait] +trait ShufflePartitioner: Send + Sync { + /// Insert a batch into the partitioner + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>; + /// Write shuffle data and shuffle index file to disk + async fn shuffle_write(&mut self) -> Result<()>; +} + +/// A partitioner that uses a hash function to partition data into multiple partitions +struct MultiPartitionShuffleRepartitioner { output_data_file: String, output_index_file: String, - schema: SchemaRef, buffered_batches: Vec, partition_indices: Vec>, partition_writers: Vec, @@ -317,7 +342,7 @@ struct ScratchSpace { partition_starts: Vec, } -impl ShuffleRepartitioner { +impl MultiPartitionShuffleRepartitioner { #[allow(clippy::too_many_arguments)] pub fn try_new( partition: usize, @@ -357,7 +382,6 @@ impl ShuffleRepartitioner { Ok(Self { output_data_file, output_index_file, - schema: Arc::clone(&schema), buffered_batches: vec![], partition_indices: vec![vec![]; num_output_partitions], partition_writers, @@ -371,26 +395,6 @@ impl ShuffleRepartitioner { }) } - /// Shuffles rows in input batch into corresponding partition buffer. - /// This function will slice input batch according to configured batch size and then - /// shuffle rows into corresponding partition buffer. - async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { - let start_time = Instant::now(); - let mut start = 0; - while start < batch.num_rows() { - let end = (start + self.batch_size).min(batch.num_rows()); - let batch = batch.slice(start, end - start); - self.partitioning_batch(batch).await?; - start = end; - } - self.metrics.input_batches.add(1); - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - Ok(()) - } - /// Shuffles rows in input batch into corresponding partition buffer. /// This function first calculates hashes for rows and then takes rows in same /// partition as a record batch which is appended into partition buffer. @@ -416,15 +420,6 @@ impl ShuffleRepartitioner { self.metrics.baseline.record_output(input.num_rows()); match &self.partitioning { - any if any.partition_count() == 1 => { - assert_eq!(self.partition_writers.len(), 1, "Expected 1 partition"); - - // TODO the single partition case could be optimized to avoid appending all - // rows from the batch into builders and then recreating the batch - // https://github.com/apache/datafusion-comet/issues/1453 - - self.buffer_batch_may_spill(input).await?; - } Partitioning::Hash(exprs, num_output_partitions) => { let mut scratch = std::mem::take(&mut self.scratch); let (partition_starts, shuffled_partition_ids): (&Vec, &Vec) = { @@ -547,93 +542,8 @@ impl ShuffleRepartitioner { Ok(()) } - async fn buffer_batch_may_spill(&mut self, input: RecordBatch) -> Result<()> { - let size = input.get_array_memory_size(); - self.buffered_batches.push(input); - - let grow_result = { - let mut timer = self.metrics.mempool_time.timer(); - let result = self.reservation.try_grow(size); - timer.stop(); - result - }; - if grow_result.is_err() { - self.spill().await?; - } - Ok(()) - } - - /// Writes buffered shuffled record batches into Arrow IPC bytes. - async fn shuffle_write(&mut self) -> Result { - let mut partitioned_batches = self.partitioned_batches(); - let mut elapsed_compute = self.metrics.baseline.elapsed_compute().timer(); - let num_output_partitions = self.partition_indices.len(); - let mut offsets = vec![0; num_output_partitions + 1]; - - let data_file = self.output_data_file.clone(); - let index_file = self.output_index_file.clone(); - - let mut write_time = self.metrics.write_time.timer(); - - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; - - let mut output_data = BufWriter::new(output_data); - - #[allow(clippy::needless_range_loop)] - for i in 0..num_output_partitions { - offsets[i] = output_data.stream_position()?; - - // Write in memory batches to output data file - let mut partition_iter = partitioned_batches.produce(i); - Self::shuffle_write_partition( - &mut partition_iter, - &mut self.shuffle_block_writer, - &mut output_data, - &self.metrics.encode_time, - )?; - - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file - if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() { - let mut spill_file = BufReader::new( - File::open(spill_data.temp_file.path()).map_err(Self::to_df_err)?, - ); - std::io::copy(&mut spill_file, &mut output_data).map_err(Self::to_df_err)?; - } - } - output_data.flush()?; - - // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = output_data.stream_position().map_err(Self::to_df_err)?; - - let mut output_index = - BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {:?}", e)) - })?); - for offset in offsets { - output_index - .write_all(&(offset as i64).to_le_bytes()[..]) - .map_err(Self::to_df_err)?; - } - output_index.flush()?; - - write_time.stop(); - - elapsed_compute.stop(); - - // shuffle writer always has empty output - Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - )))) - } - fn shuffle_write_partition( - partition_iter: &mut BatchIterator, + partition_iter: &mut PartitionedBatchIterator, shuffle_block_writer: &mut ShuffleBlockWriter, output_data: &mut BufWriter, encode_time: &Time, @@ -647,10 +557,6 @@ impl ShuffleRepartitioner { Ok(()) } - fn to_df_err(e: Error) -> DataFusionError { - DataFusionError::Execution(format!("shuffle write error: {:?}", e)) - } - fn used(&self) -> usize { self.reservation.size() } @@ -678,13 +584,7 @@ impl ShuffleRepartitioner { &mut self.partition_indices, vec![vec![]; num_output_partitions], ); - let single_partition_mode = num_output_partitions == 1; - PartitionedBatchesProducer::new( - buffered_batches, - indices, - self.batch_size, - single_partition_mode, - ) + PartitionedBatchesProducer::new(buffered_batches, indices, self.batch_size) } async fn spill(&mut self) -> Result<()> { @@ -718,7 +618,98 @@ impl ShuffleRepartitioner { } } -impl Debug for ShuffleRepartitioner { +#[async_trait::async_trait] +impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { + /// Shuffles rows in input batch into corresponding partition buffer. + /// This function will slice input batch according to configured batch size and then + /// shuffle rows into corresponding partition buffer. + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + let start_time = Instant::now(); + let mut start = 0; + while start < batch.num_rows() { + let end = (start + self.batch_size).min(batch.num_rows()); + let batch = batch.slice(start, end - start); + self.partitioning_batch(batch).await?; + start = end; + } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } + + /// Writes buffered shuffled record batches into Arrow IPC bytes. + async fn shuffle_write(&mut self) -> Result<()> { + let start_time = Instant::now(); + + let mut partitioned_batches = self.partitioned_batches(); + let num_output_partitions = self.partition_indices.len(); + let mut offsets = vec![0; num_output_partitions + 1]; + + let data_file = self.output_data_file.clone(); + let index_file = self.output_index_file.clone(); + + let mut write_time = self.metrics.write_time.timer(); + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; + + let mut output_data = BufWriter::new(output_data); + + #[allow(clippy::needless_range_loop)] + for i in 0..num_output_partitions { + offsets[i] = output_data.stream_position()?; + + // Write in memory batches to output data file + let mut partition_iter = partitioned_batches.produce(i); + Self::shuffle_write_partition( + &mut partition_iter, + &mut self.shuffle_block_writer, + &mut output_data, + &self.metrics.encode_time, + )?; + + // if we wrote a spill file for this partition then copy the + // contents into the shuffle file + if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() { + let mut spill_file = + BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?); + std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?; + } + } + output_data.flush()?; + + // add one extra offset at last to ease partition length computation + offsets[num_output_partitions] = output_data.stream_position().map_err(to_df_err)?; + + let mut output_index = + BufWriter::new(File::create(index_file).map_err(|e| { + DataFusionError::Execution(format!("shuffle write error: {:?}", e)) + })?); + for offset in offsets { + output_index + .write_all(&(offset as i64).to_le_bytes()[..]) + .map_err(to_df_err)?; + } + output_index.flush()?; + + write_time.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } +} + +impl Debug for MultiPartitionShuffleRepartitioner { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("ShuffleRepartitioner") .field("memory_used", &self.used()) @@ -729,6 +720,175 @@ impl Debug for ShuffleRepartitioner { } } +/// A partitioner that writes all shuffle data to a single file and a single index file +struct SinglePartitionShufflePartitioner { + // output_data_file: File, + output_data_writer: BufBatchWriter, + output_index_path: String, + /// Batches that are smaller than the batch size and to be concatenated + buffered_batches: Vec, + /// Number of rows in the concatenating batches + num_buffered_rows: usize, + /// Metrics for the repartitioner + metrics: ShuffleRepartitionerMetrics, + /// The configured batch size + batch_size: usize, +} + +impl SinglePartitionShufflePartitioner { + fn try_new( + output_data_path: String, + output_index_path: String, + schema: SchemaRef, + metrics: ShuffleRepartitionerMetrics, + batch_size: usize, + codec: CompressionCodec, + enable_fast_encoding: bool, + ) -> Result { + let shuffle_block_writer = + ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec.clone())?; + + let output_data_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_data_path) + .map_err(to_df_err)?; + + let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file); + + Ok(Self { + output_data_writer, + output_index_path, + buffered_batches: vec![], + num_buffered_rows: 0, + metrics, + batch_size, + }) + } + + /// Add a batch to the buffer of the partitioner, these buffered batches will be concatenated + /// and written to the output data file when the number of rows in the buffer reaches the batch size. + fn add_buffered_batch(&mut self, batch: RecordBatch) { + self.num_buffered_rows += batch.num_rows(); + self.buffered_batches.push(batch); + } + + /// Consumes buffered batches and return a concatenated batch if successful + fn concat_buffered_batches(&mut self) -> Result> { + if self.buffered_batches.is_empty() { + Ok(None) + } else if self.buffered_batches.len() == 1 { + let batch = self.buffered_batches.remove(0); + self.num_buffered_rows = 0; + Ok(Some(batch)) + } else { + let schema = &self.buffered_batches[0].schema(); + match arrow::compute::concat_batches(schema, self.buffered_batches.iter()) { + Ok(concatenated) => { + self.buffered_batches.clear(); + self.num_buffered_rows = 0; + Ok(Some(concatenated)) + } + Err(e) => Err(DataFusionError::ArrowError( + e, + Some(DataFusionError::get_back_trace()), + )), + } + } + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for SinglePartitionShufflePartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + let start_time = Instant::now(); + let num_rows = batch.num_rows(); + + if num_rows > 0 { + self.metrics.data_size.add(batch.get_array_memory_size()); + self.metrics.baseline.record_output(num_rows); + + if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size { + let concatenated_batch = self.concat_buffered_batches()?; + + let write_start_time = Instant::now(); + + // Write the concatenated buffered batch + if let Some(batch) = concatenated_batch { + self.output_data_writer + .write(&batch, &self.metrics.encode_time)?; + } + + if num_rows >= self.batch_size { + // Write the new batch + self.output_data_writer + .write(&batch, &self.metrics.encode_time)?; + } else { + // Add the new batch to the buffer + self.add_buffered_batch(batch); + } + + self.metrics + .write_time + .add_duration(write_start_time.elapsed()); + } else { + self.add_buffered_batch(batch); + } + } + + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } + + async fn shuffle_write(&mut self) -> Result<()> { + let start_time = Instant::now(); + let concatenated_batch = self.concat_buffered_batches()?; + + // Write the concatenated buffered batch + let mut write_time = self.metrics.write_time.timer(); + if let Some(batch) = concatenated_batch { + self.output_data_writer + .write(&batch, &self.metrics.encode_time)?; + } + self.output_data_writer.flush()?; + write_time.stop(); + + // Write index file. It should only contain 2 entires: 0 and the total number of bytes written + let mut index_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(self.output_index_path.clone()) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; + let data_file_length = self + .output_data_writer + .writer + .stream_position() + .map_err(to_df_err)?; + for offset in [0, data_file_length] { + index_file + .write_all(&(offset as i64).to_le_bytes()[..]) + .map_err(to_df_err)?; + } + index_file.flush()?; + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } +} + +fn to_df_err(e: Error) -> DataFusionError { + DataFusionError::Execution(format!("shuffle write error: {:?}", e)) +} + /// A helper struct to produce shuffled batches. /// This struct takes ownership of the buffered batches and partition indices from the /// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions. @@ -736,7 +896,6 @@ struct PartitionedBatchesProducer { buffered_batches: Vec, partition_indices: Vec>, batch_size: usize, - single_partition_mode: bool, } impl PartitionedBatchesProducer { @@ -744,48 +903,23 @@ impl PartitionedBatchesProducer { buffered_batches: Vec, indices: Vec>, batch_size: usize, - single_partition_mode: bool, ) -> Self { Self { partition_indices: indices, buffered_batches, batch_size, - single_partition_mode, } } - fn produce(&mut self, partition_id: usize) -> BatchIterator { - if self.single_partition_mode { - assert!( - partition_id == 0, - "Single partition mode can only be used for the first partition" - ); - BatchIterator::SinglePartition(SinglePartitionBatchIterator::new( - std::mem::take(&mut self.buffered_batches), - self.batch_size, - )) - } else { - BatchIterator::Partitioned(PartitionedBatchIterator::new( - &self.partition_indices[partition_id], - &self.buffered_batches, - self.batch_size, - )) - } + fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator { + PartitionedBatchIterator::new( + &self.partition_indices[partition_id], + &self.buffered_batches, + self.batch_size, + ) } } -enum BatchIterator<'a> { - Partitioned(PartitionedBatchIterator<'a>), - SinglePartition(SinglePartitionBatchIterator), -} - -struct SinglePartitionBatchIterator { - buffered_batches: std::vec::IntoIter, - batch_size: usize, - pending_batch: Option, - concatenating_batches: Vec, -} - struct PartitionedBatchIterator<'a> { record_batches: Vec<&'a RecordBatch>, batch_size: usize, @@ -793,77 +927,6 @@ struct PartitionedBatchIterator<'a> { pos: usize, } -impl Iterator for BatchIterator<'_> { - type Item = Result; - - fn next(&mut self) -> Option { - match self { - BatchIterator::Partitioned(iter) => iter.next(), - BatchIterator::SinglePartition(iter) => iter.next(), - } - } -} - -impl SinglePartitionBatchIterator { - fn new(buffered_batches: Vec, batch_size: usize) -> Self { - Self { - buffered_batches: buffered_batches.into_iter(), - batch_size, - pending_batch: None, - concatenating_batches: vec![], - } - } - - fn next(&mut self) -> Option> { - if self.pending_batch.is_none() { - // Get the first batch if we don't have a pending batch - self.pending_batch = self.buffered_batches.next(); - self.pending_batch.as_ref()?; - } - - let pending_batch = self.pending_batch.take().unwrap(); - let mut current_row_count = pending_batch.num_rows(); - let schema = pending_batch.schema(); - self.concatenating_batches.clear(); - self.concatenating_batches.push(pending_batch); - - // Keep accumulating batches until we reach/exceed batch_size or run out of batches - while current_row_count < self.batch_size { - if let Some(next_batch) = self.buffered_batches.next() { - let next_row_count = next_batch.num_rows(); - - // If adding this batch would exceed batch_size, save it for next time - current_row_count += next_row_count; - if current_row_count > self.batch_size { - self.pending_batch = Some(next_batch); - break; - } - - // Otherwise, concatenate the batches - self.concatenating_batches.push(next_batch); - } else { - // No more batches to concatenate - break; - } - } - - if self.concatenating_batches.len() > 1 { - match arrow::compute::concat_batches(&schema, self.concatenating_batches.iter()) { - Ok(concatenated) => Some(Ok(concatenated)), - Err(e) => { - // If concatenation fails, still return what we have so far - Some(Err(DataFusionError::ArrowError( - e, - Some(DataFusionError::get_back_trace()), - ))) - } - } - } else { - Some(Ok(self.concatenating_batches.pop().unwrap())) - } - } -} - impl<'a> PartitionedBatchIterator<'a> { fn new( indices: &'a [(u32, u32)], @@ -891,8 +954,12 @@ impl<'a> PartitionedBatchIterator<'a> { pos: 0, } } +} + +impl Iterator for PartitionedBatchIterator<'_> { + type Item = Result; - fn next(&mut self) -> Option> { + fn next(&mut self) -> Option { if self.pos >= self.indices.len() { return None; } @@ -937,7 +1004,7 @@ impl PartitionWriter { fn spill( &mut self, - iter: &mut BatchIterator, + iter: &mut PartitionedBatchIterator, runtime: &RuntimeEnv, metrics: &ShuffleRepartitionerMetrics, ) -> Result { @@ -993,15 +1060,15 @@ impl PartitionWriter { /// Write batches to writer while using a buffer to avoid frequent system calls. /// The record batches were first written by ShuffleBlockWriter into an internal buffer. /// Once the buffer exceeds the max size, the buffer will be flushed to the writer. -struct BufBatchWriter<'a, W: Write> { - shuffle_block_writer: &'a mut ShuffleBlockWriter, - writer: &'a mut W, +struct BufBatchWriter, W: Write> { + shuffle_block_writer: S, + writer: W, buffer: Vec, buffer_max_size: usize, } -impl<'a, W: Write> BufBatchWriter<'a, W> { - fn new(shuffle_block_writer: &'a mut ShuffleBlockWriter, writer: &'a mut W) -> Self { +impl, W: Write> BufBatchWriter { + fn new(shuffle_block_writer: S, writer: W) -> Self { // 1MB should be good enough to avoid frequent system calls, // and also won't cause too much memory usage let buffer_max_size = 1024 * 1024; @@ -1018,9 +1085,10 @@ impl<'a, W: Write> BufBatchWriter<'a, W> { cursor.seek(SeekFrom::End(0))?; let bytes_written = self.shuffle_block_writer + .borrow() .write_batch(batch, &mut cursor, encode_time)?; let pos = cursor.position(); - if pos > self.buffer_max_size as u64 { + if pos >= self.buffer_max_size as u64 { self.writer.write_all(&self.buffer)?; self.buffer.clear(); } @@ -1091,6 +1159,13 @@ mod test { } } + #[test] + #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` + fn test_single_partition_shuffle_writer() { + shuffle_write_test(1000, 100, 1, None); + // shuffle_write_test(10000, 10, 1, None); + } + #[test] #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` fn test_insert_larger_batch() { @@ -1126,7 +1201,7 @@ mod test { let num_partitions = 2; let runtime_env = create_runtime(memory_limit); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut repartitioner = ShuffleRepartitioner::try_new( + let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( 0, "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), From 7ad9dfc19f4048c7fbf6b80063c93fac386bd267 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Thu, 27 Mar 2025 22:59:33 +0800 Subject: [PATCH 5/9] Clean up --- native/core/src/execution/shuffle/codec.rs | 5 ++--- native/core/src/execution/shuffle/shuffle_writer.rs | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/native/core/src/execution/shuffle/codec.rs b/native/core/src/execution/shuffle/codec.rs index 55f968ba4d..edfc84e225 100644 --- a/native/core/src/execution/shuffle/codec.rs +++ b/native/core/src/execution/shuffle/codec.rs @@ -750,16 +750,15 @@ mod test { } } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub enum CompressionCodec { - #[default] None, Lz4Frame, Zstd(i32), Snappy, } -#[derive(Debug, Clone, Default)] +#[derive(Clone)] pub struct ShuffleBlockWriter { fast_encoding: bool, codec: CompressionCodec, diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 0884530518..e142b18ea8 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -979,7 +979,6 @@ impl Iterator for PartitionedBatchIterator<'_> { } } -#[derive(Default)] struct PartitionWriter { /// Spill file for intermediate shuffle output for this partition. Each spill event /// will append to this file and the contents will be copied to the shuffle file at @@ -1163,7 +1162,7 @@ mod test { #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` fn test_single_partition_shuffle_writer() { shuffle_write_test(1000, 100, 1, None); - // shuffle_write_test(10000, 10, 1, None); + shuffle_write_test(10000, 10, 1, None); } #[test] From 6ec846d68cb47aca98c17999eead1a8608d8891f Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Fri, 28 Mar 2025 11:53:36 +0800 Subject: [PATCH 6/9] Address review comments --- .../src/execution/shuffle/shuffle_writer.rs | 58 +++++++++++++------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index e142b18ea8..01e29c94a1 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -333,12 +333,13 @@ struct ScratchSpace { hashes_buf: Vec, /// Partition ids for each row in the current batch. partition_ids: Vec, - /// The row indices of the rows in each partition. This array is conceptually dividied into + /// The row indices of the rows in each partition. This array is conceptually divided into /// partitions, where each partition contains the row indices of the rows in that partition. /// The length of this array is the same as the number of rows in the batch. - shuffle_partition_ids: Vec, - /// The start indices of partitions in shuffle_partition_ids. The length of this array is the - /// same as the number of partitions. + partition_row_indices: Vec, + /// The start indices of partitions in partition_row_indices. partition_starts[K] and + /// partition_starts[K + 1] are the start and end indices of partition K in partition_row_indices. + /// The length of this array is 1 + the number of partitions. partition_starts: Vec, } @@ -357,6 +358,10 @@ impl MultiPartitionShuffleRepartitioner { enable_fast_encoding: bool, ) -> Result { let num_output_partitions = partitioning.partition_count(); + assert_ne!( + num_output_partitions, 1, + "Use SinglePartitionShufflePartitioner for 1 output partition." + ); // Vectors in the scratch space will be filled with valid values before being used, this // initialization code is simply initializing the vectors to the desired size. @@ -364,7 +369,7 @@ impl MultiPartitionShuffleRepartitioner { let scratch = ScratchSpace { hashes_buf: vec![0; batch_size], partition_ids: vec![0; batch_size], - shuffle_partition_ids: vec![0; batch_size], + partition_row_indices: vec![0; batch_size], partition_starts: vec![0; num_output_partitions + 1], }; @@ -422,7 +427,7 @@ impl MultiPartitionShuffleRepartitioner { match &self.partitioning { Partitioning::Hash(exprs, num_output_partitions) => { let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, shuffled_partition_ids): (&Vec, &Vec) = { + let (partition_starts, partition_row_indices): (&Vec, &Vec) = { let mut timer = self.metrics.repart_time.timer(); // evaluate partition expressions @@ -444,7 +449,7 @@ impl MultiPartitionShuffleRepartitioner { partition_ids[idx] = pmod(*hash, *num_output_partitions) as u32; }); - // count each partition size + // count each partition size, while leaving the last extra element as 0 let partition_counters = &mut scratch.partition_starts; partition_counters.resize(num_output_partitions + 1, 0); partition_counters.fill(0); @@ -453,7 +458,7 @@ impl MultiPartitionShuffleRepartitioner { .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); // accumulate partition counters into partition ends - // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7] + // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7] let partition_ends = partition_counters; let mut accum = 0; partition_ends.iter_mut().for_each(|v| { @@ -461,16 +466,23 @@ impl MultiPartitionShuffleRepartitioner { accum = *v; }); - // calculate shuffled partition ids - // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] => [6, 1, 2, 3, 4, 5, 0] which is the - // row indices for rows ordered by their partition id. For example, first partition - // 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. - let shuffled_partition_ids = &mut scratch.shuffle_partition_ids; - shuffled_partition_ids.resize(input.num_rows(), 0); + // calculate partition row indices and partition starts + // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the following partition_row_indices + // and partition_starts arrays: + // + // partition_row_indices: [6, 1, 2, 3, 4, 5, 0] + // partition_starts: [0, 1, 4, 6, 7] + // + // partition_starts conceptually splits partition_row_indices into smaller slices. + // Each slice partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the + // row indices of the input batch that are partitioned into partition K. For example, + // first partition 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. + let partition_row_indices = &mut scratch.partition_row_indices; + partition_row_indices.resize(input.num_rows(), 0); for (index, partition_id) in partition_ids.iter().enumerate().rev() { partition_ends[*partition_id as usize] -= 1; let end = partition_ends[*partition_id as usize]; - shuffled_partition_ids[end as usize] = index as u32; + partition_row_indices[end as usize] = index as u32; } // after calculating, partition ends become partition starts @@ -478,13 +490,13 @@ impl MultiPartitionShuffleRepartitioner { timer.stop(); Ok::<(&Vec, &Vec), DataFusionError>(( partition_starts, - shuffled_partition_ids, + partition_row_indices, )) }?; self.buffer_partitioned_batch_may_spill( input, - shuffled_partition_ids, + partition_row_indices, partition_starts, ) .await?; @@ -505,20 +517,28 @@ impl MultiPartitionShuffleRepartitioner { async fn buffer_partitioned_batch_may_spill( &mut self, input: RecordBatch, - shuffled_partition_ids: &[u32], + partition_row_indices: &[u32], partition_starts: &[u32], ) -> Result<()> { let mut mem_growth: usize = input.get_array_memory_size(); let buffered_partition_idx = self.buffered_batches.len() as u32; self.buffered_batches.push(input); + // partition_starts conceptually slices partition_row_indices into smaller slices, + // each slice contains the indices of rows in input that will go into the corresponding + // partition. The following loop iterates over the slices and put the row indices into + // the indices array of the corresponding partition. for (partition_id, (&start, &end)) in partition_starts .iter() .tuple_windows() .enumerate() .filter(|(_, (start, end))| start < end) { - let row_indices = &shuffled_partition_ids[start as usize..end as usize]; + let row_indices = &partition_row_indices[start as usize..end as usize]; + + // Put row indices for the current partition into the indices array of that partition. + // This indices array will be used for calling interleave_record_batch to produce + // shuffled batches. let indices = &mut self.partition_indices[partition_id]; let before_size = indices.allocated_size(); indices.reserve(row_indices.len()); From aab2943c9d790ba8c9011e2fdff3e98c638e20be Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Tue, 1 Apr 2025 16:45:35 +0800 Subject: [PATCH 7/9] Address review comments --- .../src/execution/shuffle/shuffle_writer.rs | 64 ++++++++++++------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 01e29c94a1..ec559ad6d8 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -567,13 +567,14 @@ impl MultiPartitionShuffleRepartitioner { shuffle_block_writer: &mut ShuffleBlockWriter, output_data: &mut BufWriter, encode_time: &Time, + write_time: &Time, ) -> Result<()> { let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data); for batch in partition_iter { let batch = batch?; - buf_batch_writer.write(&batch, encode_time)?; + buf_batch_writer.write(&batch, encode_time, write_time)?; } - buf_batch_writer.flush()?; + buf_batch_writer.flush(write_time)?; Ok(()) } @@ -671,8 +672,6 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { let data_file = self.output_data_file.clone(); let index_file = self.output_index_file.clone(); - let mut write_time = self.metrics.write_time.timer(); - let output_data = OpenOptions::new() .write(true) .create(true) @@ -693,6 +692,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { &mut self.shuffle_block_writer, &mut output_data, &self.metrics.encode_time, + &self.metrics.write_time, )?; // if we wrote a spill file for this partition then copy the @@ -719,8 +719,6 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { } output_index.flush()?; - write_time.stop(); - self.metrics .baseline .elapsed_compute() @@ -836,14 +834,20 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { // Write the concatenated buffered batch if let Some(batch) = concatenated_batch { - self.output_data_writer - .write(&batch, &self.metrics.encode_time)?; + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; } if num_rows >= self.batch_size { // Write the new batch - self.output_data_writer - .write(&batch, &self.metrics.encode_time)?; + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; } else { // Add the new batch to the buffer self.add_buffered_batch(batch); @@ -870,15 +874,16 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { let concatenated_batch = self.concat_buffered_batches()?; // Write the concatenated buffered batch - let mut write_time = self.metrics.write_time.timer(); if let Some(batch) = concatenated_batch { - self.output_data_writer - .write(&batch, &self.metrics.encode_time)?; + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; } - self.output_data_writer.flush()?; - write_time.stop(); + self.output_data_writer.flush(&self.metrics.write_time)?; - // Write index file. It should only contain 2 entires: 0 and the total number of bytes written + // Write index file. It should only contain 2 entries: 0 and the total number of bytes written let mut index_file = OpenOptions::new() .write(true) .create(true) @@ -1028,7 +1033,6 @@ impl PartitionWriter { metrics: &ShuffleRepartitionerMetrics, ) -> Result { if let Some(batch) = iter.next() { - let mut write_timer = metrics.write_time.timer(); self.ensure_spill_file_created(runtime)?; let total_bytes_written = { @@ -1036,17 +1040,20 @@ impl PartitionWriter { &mut self.shuffle_block_writer, &mut self.spill_file.as_mut().unwrap().file, ); - let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time)?; + let mut bytes_written = + buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; for batch in iter { let batch = batch?; - bytes_written += buf_batch_writer.write(&batch, &metrics.encode_time)?; + bytes_written += buf_batch_writer.write( + &batch, + &metrics.encode_time, + &metrics.write_time, + )?; } - buf_batch_writer.flush()?; + buf_batch_writer.flush(&metrics.write_time)?; bytes_written }; - write_timer.stop(); - Ok(total_bytes_written) } else { Ok(0) @@ -1099,9 +1106,15 @@ impl, W: Write> BufBatchWriter { } } - fn write(&mut self, batch: &RecordBatch, encode_time: &Time) -> Result { + fn write( + &mut self, + batch: &RecordBatch, + encode_time: &Time, + write_time: &Time, + ) -> Result { let mut cursor = Cursor::new(&mut self.buffer); cursor.seek(SeekFrom::End(0))?; + let mut write_timer = write_time.timer(); let bytes_written = self.shuffle_block_writer .borrow() @@ -1111,15 +1124,18 @@ impl, W: Write> BufBatchWriter { self.writer.write_all(&self.buffer)?; self.buffer.clear(); } + write_timer.stop(); Ok(bytes_written) } - fn flush(&mut self) -> Result<()> { + fn flush(&mut self, write_time: &Time) -> Result<()> { + let mut write_timer = write_time.timer(); if !self.buffer.is_empty() { self.writer.write_all(&self.buffer)?; self.buffer.clear(); } self.writer.flush()?; + write_timer.stop(); Ok(()) } } From a78ac0924dd01e5102161b1655b5d96b7441ecbc Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Tue, 1 Apr 2025 22:13:36 +0800 Subject: [PATCH 8/9] Fix timer in write and flush to measure nothing but write_all and flush --- native/core/src/execution/shuffle/shuffle_writer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index ec559ad6d8..c80cf8906e 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -1114,17 +1114,17 @@ impl, W: Write> BufBatchWriter { ) -> Result { let mut cursor = Cursor::new(&mut self.buffer); cursor.seek(SeekFrom::End(0))?; - let mut write_timer = write_time.timer(); let bytes_written = self.shuffle_block_writer .borrow() .write_batch(batch, &mut cursor, encode_time)?; let pos = cursor.position(); if pos >= self.buffer_max_size as u64 { + let mut write_timer = write_time.timer(); self.writer.write_all(&self.buffer)?; + write_timer.stop(); self.buffer.clear(); } - write_timer.stop(); Ok(bytes_written) } @@ -1132,10 +1132,10 @@ impl, W: Write> BufBatchWriter { let mut write_timer = write_time.timer(); if !self.buffer.is_empty() { self.writer.write_all(&self.buffer)?; - self.buffer.clear(); } self.writer.flush()?; write_timer.stop(); + self.buffer.clear(); Ok(()) } } From f2e76d8304bb753870ac434d6052364ce143a28f Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Tue, 1 Apr 2025 22:23:26 +0800 Subject: [PATCH 9/9] Measure more disk writing operations --- native/core/src/execution/shuffle/shuffle_writer.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index c80cf8906e..a2026ce955 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -700,14 +700,20 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() { let mut spill_file = BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?); + let mut write_timer = self.metrics.write_time.timer(); std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?; + write_timer.stop(); } } + + let mut write_timer = self.metrics.write_time.timer(); output_data.flush()?; + write_timer.stop(); // add one extra offset at last to ease partition length computation offsets[num_output_partitions] = output_data.stream_position().map_err(to_df_err)?; + let mut write_timer = self.metrics.write_time.timer(); let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| { DataFusionError::Execution(format!("shuffle write error: {:?}", e)) @@ -718,6 +724,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { .map_err(to_df_err)?; } output_index.flush()?; + write_timer.stop(); self.metrics .baseline