diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 60803dfeb8..9496acce38 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1214,7 +1214,7 @@ impl PhysicalPlanner { }?; let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( - Arc::clone(&child.native_plan), + Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), partitioning, codec, writer.output_data_file.clone(), diff --git a/native/core/src/execution/shuffle/builders.rs b/native/core/src/execution/shuffle/builders.rs deleted file mode 100644 index 75a0068f4b..0000000000 --- a/native/core/src/execution/shuffle/builders.rs +++ /dev/null @@ -1,599 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::builder::{make_builder, ArrayBuilder}; -use std::sync::Arc; - -use crate::common::bit::ceil; -use arrow::datatypes::*; -use datafusion::arrow::{ - array::*, - datatypes::{DataType, SchemaRef, TimeUnit}, - error::Result as ArrowResult, - record_batch::RecordBatch, -}; - -pub(crate) fn new_array_builders( - schema: &SchemaRef, - batch_size: usize, -) -> Vec> { - schema - .fields() - .iter() - .map(|field| { - let dt = field.data_type(); - if matches!(dt, DataType::Dictionary(_, _)) { - make_dict_builder(dt, batch_size) - } else { - make_builder(dt, batch_size) - } - }) - .collect::>() -} - -macro_rules! primitive_dict_builder_inner_helper { - ($kt:ty, $vt:ty, $capacity:ident) => { - Box::new(PrimitiveDictionaryBuilder::<$kt, $vt>::with_capacity( - $capacity, - $capacity / 100, - )) - }; -} - -macro_rules! 
primitive_dict_builder_helper { - ($kt:ty, $vt:ident, $capacity:ident) => { - match $vt.as_ref() { - DataType::Int8 => { - primitive_dict_builder_inner_helper!($kt, Int8Type, $capacity) - } - DataType::Int16 => { - primitive_dict_builder_inner_helper!($kt, Int16Type, $capacity) - } - DataType::Int32 => { - primitive_dict_builder_inner_helper!($kt, Int32Type, $capacity) - } - DataType::Int64 => { - primitive_dict_builder_inner_helper!($kt, Int64Type, $capacity) - } - DataType::UInt8 => { - primitive_dict_builder_inner_helper!($kt, UInt8Type, $capacity) - } - DataType::UInt16 => { - primitive_dict_builder_inner_helper!($kt, UInt16Type, $capacity) - } - DataType::UInt32 => { - primitive_dict_builder_inner_helper!($kt, UInt32Type, $capacity) - } - DataType::UInt64 => { - primitive_dict_builder_inner_helper!($kt, UInt64Type, $capacity) - } - DataType::Float32 => { - primitive_dict_builder_inner_helper!($kt, Float32Type, $capacity) - } - DataType::Float64 => { - primitive_dict_builder_inner_helper!($kt, Float64Type, $capacity) - } - DataType::Decimal128(p, s) => { - let keys_builder = PrimitiveBuilder::<$kt>::new(); - let values_builder = - Decimal128Builder::new().with_data_type(DataType::Decimal128(*p, *s)); - Box::new( - PrimitiveDictionaryBuilder::<$kt, Decimal128Type>::new_from_empty_builders( - keys_builder, - values_builder, - ), - ) - } - DataType::Timestamp(TimeUnit::Microsecond, timezone) => { - let keys_builder = PrimitiveBuilder::<$kt>::new(); - let values_builder = TimestampMicrosecondBuilder::new() - .with_data_type(DataType::Timestamp(TimeUnit::Microsecond, timezone.clone())); - Box::new( - PrimitiveDictionaryBuilder::<$kt, TimestampMicrosecondType>::new_from_empty_builders( - keys_builder, - values_builder, - ), - ) - } - DataType::Date32 => { - primitive_dict_builder_inner_helper!($kt, Date32Type, $capacity) - } - DataType::Date64 => { - primitive_dict_builder_inner_helper!($kt, Date64Type, $capacity) - } - t => unimplemented!("{:?} is not supported", t), - } - }; -} - -macro_rules! byte_dict_builder_inner_helper { - ($kt:ty, $capacity:ident, $builder:ident) => { - Box::new($builder::<$kt>::with_capacity( - $capacity, - $capacity / 100, - $capacity, - )) - }; -} - -/// Returns a dictionary array builder with capacity `capacity` that corresponds to the datatype -/// `DataType` This function is useful to construct arrays from an arbitrary vectors with -/// known/expected schema. -/// TODO: move this to the upstream. 
-fn make_dict_builder(datatype: &DataType, capacity: usize) -> Box { - match datatype { - DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => { - match key_type.as_ref() { - DataType::Int8 => primitive_dict_builder_helper!(Int8Type, value_type, capacity), - DataType::Int16 => primitive_dict_builder_helper!(Int16Type, value_type, capacity), - DataType::Int32 => primitive_dict_builder_helper!(Int32Type, value_type, capacity), - DataType::Int64 => primitive_dict_builder_helper!(Int64Type, value_type, capacity), - DataType::UInt8 => primitive_dict_builder_helper!(UInt8Type, value_type, capacity), - DataType::UInt16 => { - primitive_dict_builder_helper!(UInt16Type, value_type, capacity) - } - DataType::UInt32 => { - primitive_dict_builder_helper!(UInt32Type, value_type, capacity) - } - DataType::UInt64 => { - primitive_dict_builder_helper!(UInt64Type, value_type, capacity) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Utf8) => - { - match key_type.as_ref() { - DataType::Int8 => { - byte_dict_builder_inner_helper!(Int8Type, capacity, StringDictionaryBuilder) - } - DataType::Int16 => { - byte_dict_builder_inner_helper!(Int16Type, capacity, StringDictionaryBuilder) - } - DataType::Int32 => { - byte_dict_builder_inner_helper!(Int32Type, capacity, StringDictionaryBuilder) - } - DataType::Int64 => { - byte_dict_builder_inner_helper!(Int64Type, capacity, StringDictionaryBuilder) - } - DataType::UInt8 => { - byte_dict_builder_inner_helper!(UInt8Type, capacity, StringDictionaryBuilder) - } - DataType::UInt16 => { - byte_dict_builder_inner_helper!(UInt16Type, capacity, StringDictionaryBuilder) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!(UInt32Type, capacity, StringDictionaryBuilder) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!(UInt64Type, capacity, StringDictionaryBuilder) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeUtf8) => - { - match key_type.as_ref() { - DataType::Int8 => byte_dict_builder_inner_helper!( - Int8Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int16 => byte_dict_builder_inner_helper!( - Int16Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int32 => byte_dict_builder_inner_helper!( - Int32Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int64 => byte_dict_builder_inner_helper!( - Int64Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::UInt8 => byte_dict_builder_inner_helper!( - UInt8Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::UInt16 => { - byte_dict_builder_inner_helper!( - UInt16Type, - capacity, - LargeStringDictionaryBuilder - ) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!( - UInt32Type, - capacity, - LargeStringDictionaryBuilder - ) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!( - UInt64Type, - capacity, - LargeStringDictionaryBuilder - ) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Binary) => - { - match key_type.as_ref() { - DataType::Int8 => { - byte_dict_builder_inner_helper!(Int8Type, capacity, BinaryDictionaryBuilder) - } - DataType::Int16 => { - byte_dict_builder_inner_helper!(Int16Type, capacity, BinaryDictionaryBuilder) - } - DataType::Int32 => { - byte_dict_builder_inner_helper!(Int32Type, capacity, BinaryDictionaryBuilder) - } - DataType::Int64 => { 
- byte_dict_builder_inner_helper!(Int64Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt8 => { - byte_dict_builder_inner_helper!(UInt8Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt16 => { - byte_dict_builder_inner_helper!(UInt16Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!(UInt32Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!(UInt64Type, capacity, BinaryDictionaryBuilder) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeBinary) => - { - match key_type.as_ref() { - DataType::Int8 => byte_dict_builder_inner_helper!( - Int8Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int16 => byte_dict_builder_inner_helper!( - Int16Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int32 => byte_dict_builder_inner_helper!( - Int32Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int64 => byte_dict_builder_inner_helper!( - Int64Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::UInt8 => byte_dict_builder_inner_helper!( - UInt8Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::UInt16 => { - byte_dict_builder_inner_helper!( - UInt16Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!( - UInt32Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!( - UInt64Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - _ => unreachable!(""), - } - } - t => panic!("Data type {t:?} is not currently supported"), - } -} - -pub(crate) fn slot_size(len: usize, data_type: &DataType) -> usize { - match data_type { - DataType::Boolean => ceil(len, 8), - DataType::Int8 => len, - DataType::Int16 => len * 2, - DataType::Int32 => len * 4, - DataType::Int64 => len * 8, - DataType::UInt8 => len, - DataType::UInt16 => len * 2, - DataType::UInt32 => len * 4, - DataType::UInt64 => len * 8, - DataType::Float32 => len * 4, - DataType::Float64 => len * 8, - DataType::Date32 => len * 4, - DataType::Date64 => len * 8, - DataType::Time32(TimeUnit::Second) => len * 4, - DataType::Time32(TimeUnit::Millisecond) => len * 4, - DataType::Time64(TimeUnit::Microsecond) => len * 8, - DataType::Time64(TimeUnit::Nanosecond) => len * 8, - // TODO: this is not accurate, but should be good enough for now - DataType::Utf8 => len * 100 + len * 4, - DataType::LargeUtf8 => len * 100 + len * 8, - DataType::Decimal128(_, _) => len * 16, - DataType::Dictionary(key_type, value_type) => { - // TODO: this is not accurate, but should be good enough for now - slot_size(len, key_type.as_ref()) + slot_size(len / 10, value_type.as_ref()) - } - // TODO: this is not accurate, but should be good enough for now - DataType::Binary => len * 100 + len * 4, - DataType::LargeBinary => len * 100 + len * 8, - DataType::FixedSizeBinary(s) => len * (*s as usize), - DataType::Timestamp(_, _) => len * 8, - dt => unimplemented!( - "{}", - format!("data type {dt} not supported in shuffle write") - ), - } -} - -pub(crate) fn append_columns( - to: &mut Box, - from: &Arc, - indices: &[usize], - data_type: &DataType, -) { - /// Append values from `from` to `to` using `indices`. - macro_rules! append { - ($arrowty:ident) => {{ - type B = paste::paste! {[< $arrowty Builder >]}; - type A = paste::paste! 
{[< $arrowty Array >]}; - let t = to.as_any_mut().downcast_mut::().unwrap(); - let f = from.as_any().downcast_ref::().unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)); - } else { - t.append_null(); - } - } - }}; - } - - /// Some array builder (e.g. `FixedSizeBinary`) its `append_value` method returning - /// a `Result`. - macro_rules! append_unwrap { - ($arrowty:ident) => {{ - type B = paste::paste! {[< $arrowty Builder >]}; - type A = paste::paste! {[< $arrowty Array >]}; - let t = to.as_any_mut().downcast_mut::().unwrap(); - let f = from.as_any().downcast_ref::().unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)).unwrap(); - } else { - t.append_null(); - } - } - }}; - } - - /// Appends values from a dictionary array to a dictionary builder. - macro_rules! append_dict { - ($kt:ty, $builder:ty, $dict_array:ty) => {{ - let t = to.as_any_mut().downcast_mut::<$builder>().unwrap(); - let f = from - .as_any() - .downcast_ref::>() - .unwrap() - .downcast_dict::<$dict_array>() - .unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)); - } else { - t.append_null(); - } - } - }}; - } - - macro_rules! append_dict_helper { - ($kt:ident, $ty:ty, $dict_array:ty) => {{ - match $kt.as_ref() { - DataType::Int8 => append_dict!(Int8Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::Int16 => append_dict!(Int16Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::Int32 => append_dict!(Int32Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::Int64 => append_dict!(Int64Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::UInt8 => append_dict!(UInt8Type, PrimitiveDictionaryBuilder, $dict_array), - DataType::UInt16 => { - append_dict!(UInt16Type, PrimitiveDictionaryBuilder, $dict_array) - } - DataType::UInt32 => { - append_dict!(UInt32Type, PrimitiveDictionaryBuilder, $dict_array) - } - DataType::UInt64 => { - append_dict!(UInt64Type, PrimitiveDictionaryBuilder, $dict_array) - } - _ => unreachable!("Unknown key type for dictionary"), - } - }}; - } - - macro_rules! primitive_append_dict_helper { - ($kt:ident, $vt:ident) => { - match $vt.as_ref() { - DataType::Int8 => { - append_dict_helper!($kt, Int8Type, Int8Array) - } - DataType::Int16 => { - append_dict_helper!($kt, Int16Type, Int16Array) - } - DataType::Int32 => { - append_dict_helper!($kt, Int32Type, Int32Array) - } - DataType::Int64 => { - append_dict_helper!($kt, Int64Type, Int64Array) - } - DataType::UInt8 => { - append_dict_helper!($kt, UInt8Type, UInt8Array) - } - DataType::UInt16 => { - append_dict_helper!($kt, UInt16Type, UInt16Array) - } - DataType::UInt32 => { - append_dict_helper!($kt, UInt32Type, UInt32Array) - } - DataType::UInt64 => { - append_dict_helper!($kt, UInt64Type, UInt64Array) - } - DataType::Float32 => { - append_dict_helper!($kt, Float32Type, Float32Array) - } - DataType::Float64 => { - append_dict_helper!($kt, Float64Type, Float64Array) - } - DataType::Decimal128(_, _) => { - append_dict_helper!($kt, Decimal128Type, Decimal128Array) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - append_dict_helper!($kt, TimestampMicrosecondType, TimestampMicrosecondArray) - } - DataType::Date32 => { - append_dict_helper!($kt, Date32Type, Date32Array) - } - DataType::Date64 => { - append_dict_helper!($kt, Date64Type, Date64Array) - } - t => unimplemented!("{:?} is not supported for appending dictionary builder", t), - } - }; - } - - macro_rules! 
append_byte_dict { - ($kt:ident, $byte_type:ty, $array_type:ty) => {{ - match $kt.as_ref() { - DataType::Int8 => { - append_dict!(Int8Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::Int16 => { - append_dict!(Int16Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::Int32 => { - append_dict!(Int32Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::Int64 => { - append_dict!(Int64Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::UInt8 => { - append_dict!(UInt8Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::UInt16 => { - append_dict!(UInt16Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::UInt32 => { - append_dict!(UInt32Type, GenericByteDictionaryBuilder, $array_type) - } - DataType::UInt64 => { - append_dict!(UInt64Type, GenericByteDictionaryBuilder, $array_type) - } - _ => unreachable!("Unknown key type for dictionary"), - } - }}; - } - - match data_type { - DataType::Boolean => append!(Boolean), - DataType::Int8 => append!(Int8), - DataType::Int16 => append!(Int16), - DataType::Int32 => append!(Int32), - DataType::Int64 => append!(Int64), - DataType::UInt8 => append!(UInt8), - DataType::UInt16 => append!(UInt16), - DataType::UInt32 => append!(UInt32), - DataType::UInt64 => append!(UInt64), - DataType::Float32 => append!(Float32), - DataType::Float64 => append!(Float64), - DataType::Date32 => append!(Date32), - DataType::Date64 => append!(Date64), - DataType::Time32(TimeUnit::Second) => append!(Time32Second), - DataType::Time32(TimeUnit::Millisecond) => append!(Time32Millisecond), - DataType::Time64(TimeUnit::Microsecond) => append!(Time64Microsecond), - DataType::Time64(TimeUnit::Nanosecond) => append!(Time64Nanosecond), - DataType::Timestamp(TimeUnit::Microsecond, _) => { - append!(TimestampMicrosecond) - } - DataType::Utf8 => append!(String), - DataType::LargeUtf8 => append!(LargeString), - DataType::Decimal128(_, _) => append!(Decimal128), - DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => { - primitive_append_dict_helper!(key_type, value_type) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Utf8) => - { - append_byte_dict!(key_type, GenericStringType, StringArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeUtf8) => - { - append_byte_dict!(key_type, GenericStringType, LargeStringArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Binary) => - { - append_byte_dict!(key_type, GenericBinaryType, BinaryArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeBinary) => - { - append_byte_dict!(key_type, GenericBinaryType, LargeBinaryArray) - } - DataType::Binary => append!(Binary), - DataType::LargeBinary => append!(LargeBinary), - DataType::FixedSizeBinary(_) => append_unwrap!(FixedSizeBinary), - t => unimplemented!( - "{}", - format!("data type {} not supported in shuffle write", t) - ), - } -} - -pub(crate) fn make_batch( - schema: SchemaRef, - mut arrays: Vec>, - row_count: usize, -) -> ArrowResult { - let columns = arrays.iter_mut().map(|array| array.finish()).collect(); - let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); - RecordBatch::try_new_with_options(schema, columns, &options) -} diff --git a/native/core/src/execution/shuffle/codec.rs b/native/core/src/execution/shuffle/codec.rs index ff02b933f8..edfc84e225 100644 --- 
a/native/core/src/execution/shuffle/codec.rs +++ b/native/core/src/execution/shuffle/codec.rs @@ -758,6 +758,7 @@ pub enum CompressionCodec { Snappy, } +#[derive(Clone)] pub struct ShuffleBlockWriter { fast_encoding: bool, codec: CompressionCodec, diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index acd7ff551c..8aed74b997 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -pub(crate) mod builders; pub(crate) mod codec; mod list; mod map; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 16f605acbb..a2026ce955 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -17,11 +17,10 @@ //! Defines the External shuffle repartition plan. -use crate::execution::shuffle::builders::{ - append_columns, make_batch, new_array_builders, slot_size, -}; use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter}; +use arrow::compute::interleave_record_batch; use async_trait::async_trait; +use datafusion::common::utils::proxy::VecAllocExt; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::EmptyRecordBatchStream; @@ -47,13 +46,14 @@ use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes; use futures::executor::block_on; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; -use std::io::Error; +use std::borrow::Borrow; +use std::io::{Cursor, Error, SeekFrom}; use std::{ any::Any, fmt, fmt::{Debug, Formatter}, fs::{File, OpenOptions}, - io::{BufReader, BufWriter, Cursor, Seek, SeekFrom, Write}, + io::{BufReader, BufWriter, Seek, Write}, sync::Arc, }; use tokio::time::Instant; @@ -215,18 +215,30 @@ async fn external_shuffle( enable_fast_encoding: bool, ) -> Result { let schema = input.schema(); - let mut repartitioner = ShuffleRepartitioner::try_new( - partition, - output_data_file, - output_index_file, - Arc::clone(&schema), - partitioning, - metrics, - context.runtime_env(), - context.session_config().batch_size(), - codec, - enable_fast_encoding, - )?; + + let mut repartitioner: Box = match &partitioning { + any if any.partition_count() == 1 => Box::new(SinglePartitionShufflePartitioner::try_new( + output_data_file, + output_index_file, + Arc::clone(&schema), + metrics, + context.session_config().batch_size(), + codec, + enable_fast_encoding, + )?), + _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( + partition, + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning, + metrics, + context.runtime_env(), + context.session_config().batch_size(), + codec, + enable_fast_encoding, + )?), + }; while let Some(batch) = input.next().await { // Block on the repartitioner to insert the batch and shuffle the rows @@ -235,7 +247,11 @@ async fn external_shuffle( // current batch in the repartitioner. 
block_on(repartitioner.insert_batch(batch?))?; } - repartitioner.shuffle_write().await + + repartitioner.shuffle_write().await?; + + // shuffle writer always has empty output + Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema)))) } struct ShuffleRepartitionerMetrics { @@ -283,24 +299,51 @@ impl ShuffleRepartitionerMetrics { } } -struct ShuffleRepartitioner { +#[async_trait::async_trait] +trait ShufflePartitioner: Send + Sync { + /// Insert a batch into the partitioner + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>; + /// Write shuffle data and shuffle index file to disk + async fn shuffle_write(&mut self) -> Result<()>; +} + +/// A partitioner that uses a hash function to partition data into multiple partitions +struct MultiPartitionShuffleRepartitioner { output_data_file: String, output_index_file: String, - schema: SchemaRef, - buffered_partitions: Vec, + buffered_batches: Vec, + partition_indices: Vec>, + partition_writers: Vec, + shuffle_block_writer: ShuffleBlockWriter, /// Partitioning scheme to use partitioning: Partitioning, runtime: Arc, metrics: ShuffleRepartitionerMetrics, - /// Hashes for each row in the current batch - hashes_buf: Vec, - /// Partition ids for each row in the current batch - partition_ids: Vec, + /// Reused scratch space for computing partition indices + scratch: ScratchSpace, /// The configured batch size batch_size: usize, + /// Reservation for repartitioning + reservation: MemoryReservation, +} + +#[derive(Default)] +struct ScratchSpace { + /// Hashes for each row in the current batch. + hashes_buf: Vec, + /// Partition ids for each row in the current batch. + partition_ids: Vec, + /// The row indices of the rows in each partition. This array is conceptually divided into + /// partitions, where each partition contains the row indices of the rows in that partition. + /// The length of this array is the same as the number of rows in the batch. + partition_row_indices: Vec, + /// The start indices of partitions in partition_row_indices. partition_starts[K] and + /// partition_starts[K + 1] are the start and end indices of partition K in partition_row_indices. + /// The length of this array is 1 + the number of partitions. + partition_starts: Vec, } -impl ShuffleRepartitioner { +impl MultiPartitionShuffleRepartitioner { #[allow(clippy::too_many_arguments)] pub fn try_new( partition: usize, @@ -314,17 +357,29 @@ impl ShuffleRepartitioner { codec: CompressionCodec, enable_fast_encoding: bool, ) -> Result { - let mut hashes_buf = Vec::with_capacity(batch_size); - let mut partition_ids = Vec::with_capacity(batch_size); - - // Safety: `hashes_buf` will be filled with valid values before being used. - // `partition_ids` will be filled with valid values before being used. - unsafe { - hashes_buf.set_len(batch_size); - partition_ids.set_len(batch_size); - } + let num_output_partitions = partitioning.partition_count(); + assert_ne!( + num_output_partitions, 1, + "Use SinglePartitionShufflePartitioner for 1 output partition." + ); + + // Vectors in the scratch space will be filled with valid values before being used, this + // initialization code is simply initializing the vectors to the desired size. + // The initial values are not used. 
+ let scratch = ScratchSpace { + hashes_buf: vec![0; batch_size], + partition_ids: vec![0; batch_size], + partition_row_indices: vec![0; batch_size], + partition_starts: vec![0; num_output_partitions + 1], + }; + + let shuffle_block_writer = + ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec.clone())?; + + let partition_writers = (0..num_output_partitions) + .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) + .collect::>>()?; - // This will be split into each PartitionBuffer Reservations, and does not need to be kept itself let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition)) .with_can_spill(true) .register(&runtime.memory_pool); @@ -332,47 +387,19 @@ impl ShuffleRepartitioner { Ok(Self { output_data_file, output_index_file, - schema: Arc::clone(&schema), - buffered_partitions: (0..partitioning.partition_count()) - .map(|_| { - PartitionBuffer::try_new( - Arc::clone(&schema), - batch_size, - reservation.new_empty(), - codec.clone(), - enable_fast_encoding, - ) - }) - .collect::>>()?, + buffered_batches: vec![], + partition_indices: vec![vec![]; num_output_partitions], + partition_writers, + shuffle_block_writer, partitioning, runtime, metrics, - hashes_buf, - partition_ids, + scratch, batch_size, + reservation, }) } - /// Shuffles rows in input batch into corresponding partition buffer. - /// This function will slice input batch according to configured batch size and then - /// shuffle rows into corresponding partition buffer. - async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { - let start_time = Instant::now(); - let mut start = 0; - while start < batch.num_rows() { - let end = (start + self.batch_size).min(batch.num_rows()); - let batch = batch.slice(start, end - start); - self.partitioning_batch(batch).await?; - start = end; - } - self.metrics.input_batches.add(1); - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - Ok(()) - } - /// Shuffles rows in input batch into corresponding partition buffer. /// This function first calculates hashes for rows and then takes rows in same /// partition as a record batch which is appended into partition buffer. 
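// Illustrative sketch (not the Comet code): the dispatch pattern used by
// external_shuffle above, i.e. a ShufflePartitioner-style trait with one
// implementation for a single output partition and one for the hash-partitioned
// case, selected at runtime behind a `Box<dyn ...>`. The toy `Batch` type and
// the `async-trait`/`tokio` dependencies are assumptions made for this sketch.
use async_trait::async_trait;

struct Batch(usize); // stand-in for a RecordBatch: just a row count

#[async_trait]
trait Partitioner: Send + Sync {
    async fn insert_batch(&mut self, batch: Batch) -> Result<(), String>;
    async fn shuffle_write(&mut self) -> Result<(), String>;
}

// Everything goes to the single output partition; no hashing is needed.
struct SinglePartition { rows: usize }

#[async_trait]
impl Partitioner for SinglePartition {
    async fn insert_batch(&mut self, batch: Batch) -> Result<(), String> {
        self.rows += batch.0;
        Ok(())
    }
    async fn shuffle_write(&mut self) -> Result<(), String> {
        println!("single partition wrote {} rows", self.rows);
        Ok(())
    }
}

// Rows are spread across partitions (round-robin here; Comet hashes them).
struct MultiPartition { per_partition_rows: Vec<usize> }

#[async_trait]
impl Partitioner for MultiPartition {
    async fn insert_batch(&mut self, batch: Batch) -> Result<(), String> {
        for row in 0..batch.0 {
            self.per_partition_rows[row % self.per_partition_rows.len()] += 1;
        }
        Ok(())
    }
    async fn shuffle_write(&mut self) -> Result<(), String> {
        println!("per-partition row counts: {:?}", self.per_partition_rows);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let num_partitions = 4;
    let mut partitioner: Box<dyn Partitioner> = if num_partitions == 1 {
        Box::new(SinglePartition { rows: 0 })
    } else {
        Box::new(MultiPartition { per_partition_rows: vec![0; num_partitions] })
    };
    partitioner.insert_batch(Batch(10)).await?;
    partitioner.shuffle_write().await
}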
@@ -398,21 +425,9 @@ impl ShuffleRepartitioner { self.metrics.baseline.record_output(input.num_rows()); match &self.partitioning { - any if any.partition_count() == 1 => { - let buffered_partitions = &mut self.buffered_partitions; - - assert_eq!(buffered_partitions.len(), 1, "Expected 1 partition"); - - // TODO the single partition case could be optimized to avoid appending all - // rows from the batch into builders and then recreating the batch - // https://github.com/apache/datafusion-comet/issues/1453 - let indices = (0..input.num_rows()).collect::>(); - - self.append_rows_to_partition(input.columns(), &indices, 0) - .await?; - } Partitioning::Hash(exprs, num_output_partitions) => { - let (partition_starts, shuffled_partition_ids): (Vec, Vec) = { + let mut scratch = std::mem::take(&mut self.scratch); + let (partition_starts, partition_row_indices): (&Vec, &Vec) = { let mut timer = self.metrics.repart_time.timer(); // evaluate partition expressions @@ -422,69 +437,70 @@ impl ShuffleRepartitioner { .collect::>>()?; // use identical seed as spark hash partition - let hashes_buf = &mut self.hashes_buf[..arrays[0].len()]; + let hashes_buf = &mut scratch.hashes_buf[..arrays[0].len()]; hashes_buf.fill(42_u32); // Hash arrays and compute buckets based on number of partitions - let partition_ids = &mut self.partition_ids[..arrays[0].len()]; + let partition_ids = &mut scratch.partition_ids[..arrays[0].len()]; create_murmur3_hashes(&arrays, hashes_buf)? .iter() .enumerate() .for_each(|(idx, hash)| { - partition_ids[idx] = pmod(*hash, *num_output_partitions) as u64 + partition_ids[idx] = pmod(*hash, *num_output_partitions) as u32; }); - // count each partition size - let mut partition_counters = vec![0usize; *num_output_partitions]; + // count each partition size, while leaving the last extra element as 0 + let partition_counters = &mut scratch.partition_starts; + partition_counters.resize(num_output_partitions + 1, 0); + partition_counters.fill(0); partition_ids .iter() .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); // accumulate partition counters into partition ends - // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7] - let mut partition_ends = partition_counters; + // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7] + let partition_ends = partition_counters; let mut accum = 0; partition_ends.iter_mut().for_each(|v| { *v += accum; accum = *v; }); - // calculate shuffled partition ids - // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] => [6, 1, 2, 3, 4, 5, 0] which is the - // row indices for rows ordered by their partition id. For example, first partition - // 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. - let mut shuffled_partition_ids = vec![0usize; input.num_rows()]; + // calculate partition row indices and partition starts + // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the following partition_row_indices + // and partition_starts arrays: + // + // partition_row_indices: [6, 1, 2, 3, 4, 5, 0] + // partition_starts: [0, 1, 4, 6, 7] + // + // partition_starts conceptually splits partition_row_indices into smaller slices. + // Each slice partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the + // row indices of the input batch that are partitioned into partition K. For example, + // first partition 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. 
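// Standalone sketch of the grouping step described in the comment above: given
// per-row partition ids, one counting-sort pass produces `partition_starts` and
// `partition_row_indices`. Names mirror the patch, but this is an illustration,
// not the Comet implementation (the patch's in-place version continues below).
fn group_rows_by_partition(partition_ids: &[u32], num_partitions: usize) -> (Vec<u32>, Vec<u32>) {
    // Count rows per partition, with one extra trailing slot so that after the
    // prefix sum the array can serve as exclusive start offsets.
    let mut partition_starts = vec![0u32; num_partitions + 1];
    for &p in partition_ids {
        partition_starts[p as usize] += 1;
    }
    // Prefix-sum counts into end offsets, e.g. [1, 3, 2, 1, 0] -> [1, 4, 6, 7, 7].
    let mut accum = 0;
    for v in partition_starts.iter_mut() {
        *v += accum;
        accum = *v;
    }
    // Walk rows in reverse, writing each row index into its partition's slice;
    // decrementing an end offset as rows are placed turns it into a start offset.
    let mut partition_row_indices = vec![0u32; partition_ids.len()];
    for (row, &p) in partition_ids.iter().enumerate().rev() {
        partition_starts[p as usize] -= 1;
        partition_row_indices[partition_starts[p as usize] as usize] = row as u32;
    }
    (partition_starts, partition_row_indices)
}

fn main() {
    // The worked example from the comment above.
    let (starts, rows) = group_rows_by_partition(&[3, 1, 1, 1, 2, 2, 0], 4);
    assert_eq!(rows, vec![6, 1, 2, 3, 4, 5, 0]);
    assert_eq!(starts, vec![0, 1, 4, 6, 7]);
}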
+ let partition_row_indices = &mut scratch.partition_row_indices; + partition_row_indices.resize(input.num_rows(), 0); for (index, partition_id) in partition_ids.iter().enumerate().rev() { partition_ends[*partition_id as usize] -= 1; let end = partition_ends[*partition_id as usize]; - shuffled_partition_ids[end] = index; + partition_row_indices[end as usize] = index as u32; } // after calculating, partition ends become partition starts - let mut partition_starts = partition_ends; - partition_starts.push(input.num_rows()); + let partition_starts = partition_ends; timer.stop(); - Ok::<(Vec, Vec), DataFusionError>(( + Ok::<(&Vec, &Vec), DataFusionError>(( partition_starts, - shuffled_partition_ids, + partition_row_indices, )) }?; - // For each interval of row indices of partition, taking rows from input batch and - // appending into output buffer. - for (partition_id, (&start, &end)) in partition_starts - .iter() - .tuple_windows() - .enumerate() - .filter(|(_, (start, end))| start < end) - { - self.append_rows_to_partition( - input.columns(), - &shuffled_partition_ids[start..end], - partition_id, - ) - .await?; - } + self.buffer_partitioned_batch_may_spill( + input, + partition_row_indices, + partition_starts, + ) + .await?; + self.scratch = scratch; } other => { // this should be unreachable as long as the validation logic @@ -498,81 +514,72 @@ impl ShuffleRepartitioner { Ok(()) } - /// Writes buffered shuffled record batches into Arrow IPC bytes. - async fn shuffle_write(&mut self) -> Result { - let mut elapsed_compute = self.metrics.baseline.elapsed_compute().timer(); - let buffered_partitions = &mut self.buffered_partitions; - let num_output_partitions = buffered_partitions.len(); - let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; - let mut offsets = vec![0; num_output_partitions + 1]; - for i in 0..num_output_partitions { - buffered_partitions[i].flush(&self.metrics)?; - output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); - } - - let data_file = self.output_data_file.clone(); - let index_file = self.output_index_file.clone(); - - let mut write_time = self.metrics.write_time.timer(); - - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; - - let mut output_data = BufWriter::new(output_data); - - for i in 0..num_output_partitions { - offsets[i] = output_data.stream_position()?; - output_data.write_all(&output_batches[i])?; - output_batches[i].clear(); - - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file - if let Some(spill_data) = self.buffered_partitions[i].spill_file.as_ref() { - let mut spill_file = BufReader::new( - File::open(spill_data.temp_file.path()).map_err(Self::to_df_err)?, - ); - std::io::copy(&mut spill_file, &mut output_data).map_err(Self::to_df_err)?; + async fn buffer_partitioned_batch_may_spill( + &mut self, + input: RecordBatch, + partition_row_indices: &[u32], + partition_starts: &[u32], + ) -> Result<()> { + let mut mem_growth: usize = input.get_array_memory_size(); + let buffered_partition_idx = self.buffered_batches.len() as u32; + self.buffered_batches.push(input); + + // partition_starts conceptually slices partition_row_indices into smaller slices, + // each slice contains the indices of rows in input that will go into the corresponding + // partition. 
The following loop iterates over the slices and put the row indices into + // the indices array of the corresponding partition. + for (partition_id, (&start, &end)) in partition_starts + .iter() + .tuple_windows() + .enumerate() + .filter(|(_, (start, end))| start < end) + { + let row_indices = &partition_row_indices[start as usize..end as usize]; + + // Put row indices for the current partition into the indices array of that partition. + // This indices array will be used for calling interleave_record_batch to produce + // shuffled batches. + let indices = &mut self.partition_indices[partition_id]; + let before_size = indices.allocated_size(); + indices.reserve(row_indices.len()); + for row_idx in row_indices { + indices.push((buffered_partition_idx, *row_idx)); } + let after_size = indices.allocated_size(); + mem_growth += after_size.saturating_sub(before_size); } - output_data.flush()?; - - // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = output_data.stream_position().map_err(Self::to_df_err)?; - let mut output_index = - BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {:?}", e)) - })?); - for offset in offsets { - output_index - .write_all(&(offset as i64).to_le_bytes()[..]) - .map_err(Self::to_df_err)?; + let grow_result = { + let mut timer = self.metrics.mempool_time.timer(); + let result = self.reservation.try_grow(mem_growth); + timer.stop(); + result + }; + if grow_result.is_err() { + self.spill().await?; } - output_index.flush()?; - write_time.stop(); - - elapsed_compute.stop(); - - // shuffle writer always has empty output - Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - )))) + Ok(()) } - fn to_df_err(e: Error) -> DataFusionError { - DataFusionError::Execution(format!("shuffle write error: {:?}", e)) + fn shuffle_write_partition( + partition_iter: &mut PartitionedBatchIterator, + shuffle_block_writer: &mut ShuffleBlockWriter, + output_data: &mut BufWriter, + encode_time: &Time, + write_time: &Time, + ) -> Result<()> { + let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data); + for batch in partition_iter { + let batch = batch?; + buf_batch_writer.write(&batch, encode_time, write_time)?; + } + buf_batch_writer.flush(write_time)?; + Ok(()) } fn used(&self) -> usize { - self.buffered_partitions - .iter() - .map(|b| b.reservation.size()) - .sum() + self.reservation.size() } fn spilled_bytes(&self) -> usize { @@ -587,6 +594,20 @@ impl ShuffleRepartitioner { self.metrics.data_size.value() } + /// This function transfers the ownership of the buffered batches and partition indices from the + /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned PartitionedBatches struct + /// can be used to produce shuffled batches. 
+ fn partitioned_batches(&mut self) -> PartitionedBatchesProducer { + let num_output_partitions = self.partition_indices.len(); + let buffered_batches = std::mem::take(&mut self.buffered_batches); + // let indices = std::mem::take(&mut self.partition_indices); + let indices = std::mem::replace( + &mut self.partition_indices, + vec![vec![]; num_output_partitions], + ); + PartitionedBatchesProducer::new(buffered_batches, indices, self.batch_size) + } + async fn spill(&mut self) -> Result<()> { log::debug!( "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)", @@ -595,68 +616,125 @@ impl ShuffleRepartitioner { ); // we could always get a chance to free some memory as long as we are holding some - if self.buffered_partitions.is_empty() { + if self.buffered_batches.is_empty() { return Ok(()); } + let num_output_partitions = self.partition_writers.len(); + let mut partitioned_batches = self.partitioned_batches(); let mut spilled_bytes = 0; - for p in &mut self.buffered_partitions { - spilled_bytes += p.spill(&self.runtime, &self.metrics)?; + + for partition_id in 0..num_output_partitions { + let partition_writer = &mut self.partition_writers[partition_id]; + let mut iter = partitioned_batches.produce(partition_id); + spilled_bytes += partition_writer.spill(&mut iter, &self.runtime, &self.metrics)?; } + let mut timer = self.metrics.mempool_time.timer(); + self.reservation.free(); + timer.stop(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(spilled_bytes); Ok(()) } +} - /// Appends rows of specified indices from columns into active array builders in the specified partition. - async fn append_rows_to_partition( - &mut self, - columns: &[ArrayRef], - indices: &[usize], - partition_id: usize, - ) -> Result<()> { - let output = &mut self.buffered_partitions[partition_id]; +#[async_trait::async_trait] +impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { + /// Shuffles rows in input batch into corresponding partition buffer. + /// This function will slice input batch according to configured batch size and then + /// shuffle rows into corresponding partition buffer. + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + let start_time = Instant::now(); + let mut start = 0; + while start < batch.num_rows() { + let end = (start + self.batch_size).min(batch.num_rows()); + let batch = batch.slice(start, end - start); + self.partitioning_batch(batch).await?; + start = end; + } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } - // If the range of indices is not big enough, just appending the rows into - // active array builders instead of directly adding them as a record batch. - let mut start_index: usize = 0; - let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics)?; + /// Writes buffered shuffled record batches into Arrow IPC bytes. + async fn shuffle_write(&mut self) -> Result<()> { + let start_time = Instant::now(); - loop { - match output_ret { - AppendRowStatus::Appended => { - break; - } - AppendRowStatus::StartIndex(new_start) => { - // Cannot allocate enough memory for the array builders in this partition, - // spill all partitions and retry. 
- self.spill().await?; - - start_index = new_start; - let output = &mut self.buffered_partitions[partition_id]; - output_ret = - output.append_rows(columns, indices, start_index, &self.metrics)?; - - if let AppendRowStatus::StartIndex(new_start) = output_ret { - if new_start == start_index { - // If the start index is not updated, it means that the partition - // is still not able to allocate enough memory for the array builders. - return Err(DataFusionError::Internal( - "Partition is still not able to allocate enough memory for the array builders after spilling." - .to_string(), - )); - } - } - } + let mut partitioned_batches = self.partitioned_batches(); + let num_output_partitions = self.partition_indices.len(); + let mut offsets = vec![0; num_output_partitions + 1]; + + let data_file = self.output_data_file.clone(); + let index_file = self.output_index_file.clone(); + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; + + let mut output_data = BufWriter::new(output_data); + + #[allow(clippy::needless_range_loop)] + for i in 0..num_output_partitions { + offsets[i] = output_data.stream_position()?; + + // Write in memory batches to output data file + let mut partition_iter = partitioned_batches.produce(i); + Self::shuffle_write_partition( + &mut partition_iter, + &mut self.shuffle_block_writer, + &mut output_data, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + + // if we wrote a spill file for this partition then copy the + // contents into the shuffle file + if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() { + let mut spill_file = + BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?); + let mut write_timer = self.metrics.write_time.timer(); + std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?; + write_timer.stop(); } } + let mut write_timer = self.metrics.write_time.timer(); + output_data.flush()?; + write_timer.stop(); + + // add one extra offset at last to ease partition length computation + offsets[num_output_partitions] = output_data.stream_position().map_err(to_df_err)?; + + let mut write_timer = self.metrics.write_time.timer(); + let mut output_index = + BufWriter::new(File::create(index_file).map_err(|e| { + DataFusionError::Execution(format!("shuffle write error: {:?}", e)) + })?); + for offset in offsets { + output_index + .write_all(&(offset as i64).to_le_bytes()[..]) + .map_err(to_df_err)?; + } + output_index.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); Ok(()) } } -impl Debug for ShuffleRepartitioner { +impl Debug for MultiPartitionShuffleRepartitioner { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("ShuffleRepartitioner") .field("memory_used", &self.used()) @@ -667,175 +745,331 @@ impl Debug for ShuffleRepartitioner { } } -/// The status of appending rows to a partition buffer. -#[derive(Debug)] -enum AppendRowStatus { - /// Rows were appended - Appended, - /// Not all rows were appended due to lack of available memory - StartIndex(usize), -} - -struct PartitionBuffer { - /// The schema of batches to be partitioned. - schema: SchemaRef, - /// The "frozen" Arrow IPC bytes of active data. They are frozen when `flush` is called. - frozen: Vec, - /// Array builders for appending rows into buffering batches. 
- active: Vec>, - /// The estimation of memory size of active builders in bytes when they are filled. - active_slots_mem_size: usize, - /// Number of rows in active builders. - num_active_rows: usize, - /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`, - /// the active array builders will be frozen and appended to frozen buffer `frozen`. +/// A partitioner that writes all shuffle data to a single file and a single index file +struct SinglePartitionShufflePartitioner { + // output_data_file: File, + output_data_writer: BufBatchWriter, + output_index_path: String, + /// Batches that are smaller than the batch size and to be concatenated + buffered_batches: Vec, + /// Number of rows in the concatenating batches + num_buffered_rows: usize, + /// Metrics for the repartitioner + metrics: ShuffleRepartitionerMetrics, + /// The configured batch size batch_size: usize, - /// Memory reservation for this partition buffer. - reservation: MemoryReservation, - /// Spill file for intermediate shuffle output for this partition. Each spill event - /// will append to this file and the contents will be copied to the shuffle file at - /// the end of processing. - spill_file: Option, - /// Writer that performs encoding and compression - shuffle_block_writer: ShuffleBlockWriter, } -struct SpillFile { - temp_file: RefCountedTempFile, - file: File, -} - -impl PartitionBuffer { +impl SinglePartitionShufflePartitioner { fn try_new( + output_data_path: String, + output_index_path: String, schema: SchemaRef, + metrics: ShuffleRepartitionerMetrics, batch_size: usize, - reservation: MemoryReservation, codec: CompressionCodec, enable_fast_encoding: bool, ) -> Result { let shuffle_block_writer = - ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec)?; - let active_slots_mem_size = schema - .fields() - .iter() - .map(|field| slot_size(batch_size, field.data_type())) - .sum::(); + ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec.clone())?; + + let output_data_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_data_path) + .map_err(to_df_err)?; + + let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file); + Ok(Self { - schema, - frozen: vec![], - active: vec![], - active_slots_mem_size, - num_active_rows: 0, + output_data_writer, + output_index_path, + buffered_batches: vec![], + num_buffered_rows: 0, + metrics, batch_size, - reservation, - spill_file: None, - shuffle_block_writer, }) } - /// Initializes active builders if necessary. - /// Returns error if memory reservation fails. - fn allocate_active_builders(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> { - if self.active.is_empty() { - let mut mempool_timer = metrics.mempool_time.timer(); - self.reservation.try_grow(self.active_slots_mem_size)?; - mempool_timer.stop(); - - let mut repart_timer = metrics.repart_time.timer(); - self.active = new_array_builders(&self.schema, self.batch_size); - repart_timer.stop(); - } - Ok(()) + /// Add a batch to the buffer of the partitioner, these buffered batches will be concatenated + /// and written to the output data file when the number of rows in the buffer reaches the batch size. + fn add_buffered_batch(&mut self, batch: RecordBatch) { + self.num_buffered_rows += batch.num_rows(); + self.buffered_batches.push(batch); } - /// Appends rows of specified indices from columns into active array builders. 
- fn append_rows( - &mut self, - columns: &[ArrayRef], - indices: &[usize], - start_index: usize, - metrics: &ShuffleRepartitionerMetrics, - ) -> Result { - let mut start = start_index; - - // loop until all indices are processed - while start < indices.len() { - let end = (start + self.batch_size).min(indices.len()); - - // allocate builders - if self.allocate_active_builders(metrics).is_err() { - // could not allocate memory for builders, so abort - // and return the current index - return Ok(AppendRowStatus::StartIndex(start)); + /// Consumes buffered batches and return a concatenated batch if successful + fn concat_buffered_batches(&mut self) -> Result> { + if self.buffered_batches.is_empty() { + Ok(None) + } else if self.buffered_batches.len() == 1 { + let batch = self.buffered_batches.remove(0); + self.num_buffered_rows = 0; + Ok(Some(batch)) + } else { + let schema = &self.buffered_batches[0].schema(); + match arrow::compute::concat_batches(schema, self.buffered_batches.iter()) { + Ok(concatenated) => { + self.buffered_batches.clear(); + self.num_buffered_rows = 0; + Ok(Some(concatenated)) + } + Err(e) => Err(DataFusionError::ArrowError( + e, + Some(DataFusionError::get_back_trace()), + )), } + } + } +} - let mut repart_timer = metrics.repart_time.timer(); - self.active - .iter_mut() - .zip(columns) - .for_each(|(builder, column)| { - append_columns(builder, column, &indices[start..end], column.data_type()); - }); - self.num_active_rows += end - start; - repart_timer.stop(); - start = end; +#[async_trait::async_trait] +impl ShufflePartitioner for SinglePartitionShufflePartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + let start_time = Instant::now(); + let num_rows = batch.num_rows(); + + if num_rows > 0 { + self.metrics.data_size.add(batch.get_array_memory_size()); + self.metrics.baseline.record_output(num_rows); + + if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size { + let concatenated_batch = self.concat_buffered_batches()?; - if self.num_active_rows >= self.batch_size { - self.flush(metrics)?; + let write_start_time = Instant::now(); + + // Write the concatenated buffered batch + if let Some(batch) = concatenated_batch { + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + } + + if num_rows >= self.batch_size { + // Write the new batch + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + } else { + // Add the new batch to the buffer + self.add_buffered_batch(batch); + } + + self.metrics + .write_time + .add_duration(write_start_time.elapsed()); + } else { + self.add_buffered_batch(batch); } } - Ok(AppendRowStatus::Appended) + + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) } - /// Flush active data into frozen bytes. This can reduce memory usage because the frozen - /// bytes are compressed. 
- fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> { - if self.num_active_rows == 0 { - return Ok(()); + async fn shuffle_write(&mut self) -> Result<()> { + let start_time = Instant::now(); + let concatenated_batch = self.concat_buffered_batches()?; + + // Write the concatenated buffered batch + if let Some(batch) = concatenated_batch { + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; } + self.output_data_writer.flush(&self.metrics.write_time)?; + + // Write index file. It should only contain 2 entries: 0 and the total number of bytes written + let mut index_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(self.output_index_path.clone()) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; + let data_file_length = self + .output_data_writer + .writer + .stream_position() + .map_err(to_df_err)?; + for offset in [0, data_file_length] { + index_file + .write_all(&(offset as i64).to_le_bytes()[..]) + .map_err(to_df_err)?; + } + index_file.flush()?; + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } +} - // active -> staging - let active = std::mem::take(&mut self.active); - let num_rows = self.num_active_rows; - self.num_active_rows = 0; +fn to_df_err(e: Error) -> DataFusionError { + DataFusionError::Execution(format!("shuffle write error: {:?}", e)) +} - let mut repart_timer = metrics.repart_time.timer(); - let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?; - repart_timer.stop(); +/// A helper struct to produce shuffled batches. +/// This struct takes ownership of the buffered batches and partition indices from the +/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions. 
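// Hedged sketch of the single-partition buffering rule used by
// SinglePartitionShufflePartitioner above: small batches are buffered and, once
// enough rows have accumulated, concatenated with `arrow::compute::concat_batches`
// and written out as one block. The schema, the data, and the toy BATCH_SIZE are
// assumptions for illustration only.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

const BATCH_SIZE: usize = 4; // Comet takes this from the session config

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let make_batch = |values: Vec<i32>| {
        RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values)) as ArrayRef],
        )
    };

    let mut buffered: Vec<RecordBatch> = vec![];
    let mut buffered_rows = 0usize;
    for batch in [make_batch(vec![1, 2])?, make_batch(vec![3])?, make_batch(vec![4, 5])?] {
        if buffered_rows + batch.num_rows() > BATCH_SIZE {
            // Threshold exceeded: concatenate the buffer and "write" it.
            let combined = concat_batches(&schema, buffered.iter())?;
            println!("writing {} buffered rows", combined.num_rows());
            buffered.clear();
            buffered_rows = 0;
        }
        buffered_rows += batch.num_rows();
        buffered.push(batch);
    }
    // Final flush, mirroring shuffle_write above.
    if !buffered.is_empty() {
        let combined = concat_batches(&schema, buffered.iter())?;
        println!("writing {} remaining rows", combined.num_rows());
    }
    Ok(())
}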
+struct PartitionedBatchesProducer { + buffered_batches: Vec, + partition_indices: Vec>, + batch_size: usize, +} - let mut cursor = Cursor::new(&mut self.frozen); - cursor.seek(SeekFrom::End(0))?; - let bytes_written = self.shuffle_block_writer.write_batch( - &frozen_batch, - &mut cursor, - &metrics.encode_time, - )?; - - // we typically expect the frozen bytes to take up less memory than - // the builders due to compression but there could be edge cases where - // this is not the case - let mut mempool_timer = metrics.mempool_time.timer(); - if self.active_slots_mem_size >= bytes_written { - self.reservation - .try_shrink(self.active_slots_mem_size - bytes_written)?; - } else { - self.reservation - .try_grow(bytes_written - self.active_slots_mem_size)?; +impl PartitionedBatchesProducer { + fn new( + buffered_batches: Vec, + indices: Vec>, + batch_size: usize, + ) -> Self { + Self { + partition_indices: indices, + buffered_batches, + batch_size, } - mempool_timer.stop(); + } - Ok(()) + fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator { + PartitionedBatchIterator::new( + &self.partition_indices[partition_id], + &self.buffered_batches, + self.batch_size, + ) + } +} + +struct PartitionedBatchIterator<'a> { + record_batches: Vec<&'a RecordBatch>, + batch_size: usize, + indices: Vec<(usize, usize)>, + pos: usize, +} + +impl<'a> PartitionedBatchIterator<'a> { + fn new( + indices: &'a [(u32, u32)], + buffered_batches: &'a [RecordBatch], + batch_size: usize, + ) -> Self { + if indices.is_empty() { + // Avoid unnecessary allocations when the partition is empty + return Self { + record_batches: vec![], + batch_size, + indices: vec![], + pos: 0, + }; + } + let record_batches = buffered_batches.iter().collect::>(); + let current_indices = indices + .iter() + .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize)) + .collect::>(); + Self { + record_batches, + batch_size, + indices: current_indices, + pos: 0, + } + } +} + +impl Iterator for PartitionedBatchIterator<'_> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.pos >= self.indices.len() { + return None; + } + + let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); + let indices = &self.indices[self.pos..indices_end]; + match interleave_record_batch(&self.record_batches, indices) { + Ok(batch) => { + self.pos = indices_end; + Some(Ok(batch)) + } + Err(e) => Some(Err(DataFusionError::ArrowError( + e, + Some(DataFusionError::get_back_trace()), + ))), + } + } +} + +struct PartitionWriter { + /// Spill file for intermediate shuffle output for this partition. Each spill event + /// will append to this file and the contents will be copied to the shuffle file at + /// the end of processing. 
+ spill_file: Option, + /// Writer that performs encoding and compression + shuffle_block_writer: ShuffleBlockWriter, +} + +struct SpillFile { + temp_file: RefCountedTempFile, + file: File, +} + +impl PartitionWriter { + fn try_new(shuffle_block_writer: ShuffleBlockWriter) -> Result { + Ok(Self { + spill_file: None, + shuffle_block_writer, + }) } fn spill( &mut self, + iter: &mut PartitionedBatchIterator, runtime: &RuntimeEnv, metrics: &ShuffleRepartitionerMetrics, ) -> Result { - self.flush(metrics).unwrap(); - let output_batches = std::mem::take(&mut self.frozen); - let mut write_timer = metrics.write_time.timer(); + if let Some(batch) = iter.next() { + self.ensure_spill_file_created(runtime)?; + + let total_bytes_written = { + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.spill_file.as_mut().unwrap().file, + ); + let mut bytes_written = + buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; + for batch in iter { + let batch = batch?; + bytes_written += buf_batch_writer.write( + &batch, + &metrics.encode_time, + &metrics.write_time, + )?; + } + buf_batch_writer.flush(&metrics.write_time)?; + bytes_written + }; + + Ok(total_bytes_written) + } else { + Ok(0) + } + } + + fn ensure_spill_file_created(&mut self, runtime: &RuntimeEnv) -> Result<()> { if self.spill_file.is_none() { + // Spill file is not yet created, create it let spill_file = runtime .disk_manager .create_tmp_file("shuffle writer spill")?; @@ -852,16 +1086,64 @@ impl PartitionBuffer { file: spill_data, }); } - self.spill_file - .as_mut() - .unwrap() - .file - .write_all(&output_batches)?; + Ok(()) + } +} + +/// Write batches to writer while using a buffer to avoid frequent system calls. +/// The record batches were first written by ShuffleBlockWriter into an internal buffer. +/// Once the buffer exceeds the max size, the buffer will be flushed to the writer. 
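// Hedged, self-contained sketch of the `interleave_record_batch` call that
// PartitionedBatchIterator above relies on: rows belonging to one output
// partition are gathered from several buffered input batches via
// (batch index, row index) pairs. The schema and values are made up for
// illustration.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::interleave_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch0 = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef],
    )?;
    let batch1 = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![20, 21])) as ArrayRef],
    )?;

    // (buffered batch index, row index within that batch), as stored in
    // `partition_indices` for one output partition.
    let indices: Vec<(usize, usize)> = vec![(0, 2), (1, 0), (0, 1)];
    let shuffled = interleave_record_batch(&[&batch0, &batch1], &indices)?;
    assert_eq!(shuffled.num_rows(), 3); // column "a" is now [12, 20, 11]
    Ok(())
}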
+struct BufBatchWriter, W: Write> { + shuffle_block_writer: S, + writer: W, + buffer: Vec, + buffer_max_size: usize, +} + +impl, W: Write> BufBatchWriter { + fn new(shuffle_block_writer: S, writer: W) -> Self { + // 1MB should be good enough to avoid frequent system calls, + // and also won't cause too much memory usage + let buffer_max_size = 1024 * 1024; + Self { + shuffle_block_writer, + writer, + buffer: vec![], + buffer_max_size, + } + } + + fn write( + &mut self, + batch: &RecordBatch, + encode_time: &Time, + write_time: &Time, + ) -> Result { + let mut cursor = Cursor::new(&mut self.buffer); + cursor.seek(SeekFrom::End(0))?; + let bytes_written = + self.shuffle_block_writer + .borrow() + .write_batch(batch, &mut cursor, encode_time)?; + let pos = cursor.position(); + if pos >= self.buffer_max_size as u64 { + let mut write_timer = write_time.timer(); + self.writer.write_all(&self.buffer)?; + write_timer.stop(); + self.buffer.clear(); + } + Ok(bytes_written) + } + + fn flush(&mut self, write_time: &Time) -> Result<()> { + let mut write_timer = write_time.timer(); + if !self.buffer.is_empty() { + self.writer.write_all(&self.buffer)?; + } + self.writer.flush()?; write_timer.stop(); - let mut timer = metrics.mempool_time.timer(); - self.reservation.free(); - timer.stop(); - Ok(output_batches.len()) + self.buffer.clear(); + Ok(()) } } @@ -885,7 +1167,7 @@ mod test { use datafusion::physical_expr::expressions::Column; use datafusion::physical_plan::common::collect; use datafusion::prelude::SessionContext; - use parquet::file::reader::Length; + use std::io::Cursor; use tokio::runtime::Runtime; #[test] @@ -920,31 +1202,10 @@ mod test { } #[test] - fn test_slot_size() { - let batch_size = 1usize; - // not inclusive of all supported types, but enough to test the function - let supported_primitive_types = [ - DataType::Int32, - DataType::Int64, - DataType::UInt32, - DataType::UInt64, - DataType::Float32, - DataType::Float64, - DataType::Boolean, - DataType::Utf8, - DataType::LargeUtf8, - DataType::Binary, - DataType::LargeBinary, - DataType::FixedSizeBinary(16), - ]; - let expected_slot_size = [4, 8, 4, 8, 4, 8, 1, 104, 108, 104, 108, 16]; - supported_primitive_types - .iter() - .zip(expected_slot_size.iter()) - .for_each(|(data_type, expected)| { - let slot_size = slot_size(batch_size, data_type); - assert_eq!(slot_size, *expected); - }) + #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` + fn test_single_partition_shuffle_writer() { + shuffle_write_test(1000, 100, 1, None); + shuffle_write_test(10000, 10, 1, None); } #[test] @@ -973,75 +1234,6 @@ mod test { shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024)); } - #[test] - fn partition_buffer_memory() { - let batch = create_batch(900); - let runtime_env = create_runtime(128 * 1024); - let reservation = MemoryConsumer::new("ShuffleRepartitioner[0]") - .with_can_spill(true) - .register(&runtime_env.memory_pool); - let mut buffer = PartitionBuffer::try_new( - batch.schema(), - 1024, - reservation, - CompressionCodec::Lz4Frame, - true, - ) - .unwrap(); - let metrics_set = ExecutionPlanMetricsSet::new(); - let metrics = ShuffleRepartitionerMetrics::new(&metrics_set, 0); - let indices: Vec = (0..batch.num_rows()).collect(); - - assert_eq!(0, buffer.reservation.size()); - assert!(buffer.spill_file.is_none()); - - // append first batch - should fit in memory - let status = buffer - .append_rows(batch.columns(), &indices, 0, &metrics) - .unwrap(); - assert_eq!( - format!("{status:?}"), - format!("{:?}", 
AppendRowStatus::Appended) - ); - assert_eq!(900, buffer.num_active_rows); - assert_eq!(106496, buffer.reservation.size()); - assert_eq!(0, buffer.frozen.len()); - assert!(buffer.spill_file.is_none()); - - // append second batch - should trigger flush to frozen bytes - let status = buffer - .append_rows(batch.columns(), &indices, 0, &metrics) - .unwrap(); - assert_eq!( - format!("{status:?}"), - format!("{:?}", AppendRowStatus::Appended) - ); - assert_eq!(0, buffer.num_active_rows); - assert_eq!(9914, buffer.frozen.len()); - assert_eq!(9914, buffer.reservation.size()); - assert!(buffer.spill_file.is_none()); - - // spill - buffer.spill(&runtime_env, &metrics).unwrap(); - assert_eq!(0, buffer.num_active_rows); - assert_eq!(0, buffer.frozen.len()); - assert_eq!(0, buffer.reservation.size()); - assert!(buffer.spill_file.is_some()); - assert_eq!(9914, buffer.spill_file.as_ref().unwrap().file.len()); - - // append after spill - let status = buffer - .append_rows(batch.columns(), &indices, 0, &metrics) - .unwrap(); - assert_eq!( - format!("{status:?}"), - format!("{:?}", AppendRowStatus::Appended) - ); - assert_eq!(900, buffer.num_active_rows); - assert_eq!(106496, buffer.reservation.size()); - assert_eq!(0, buffer.frozen.len()); - } - #[tokio::test] async fn shuffle_repartitioner_memory() { let batch = create_batch(900); @@ -1051,7 +1243,7 @@ mod test { let num_partitions = 2; let runtime_env = create_runtime(memory_limit); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut repartitioner = ShuffleRepartitioner::try_new( + let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( 0, "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), @@ -1067,41 +1259,19 @@ mod test { repartitioner.insert_batch(batch.clone()).await.unwrap(); - assert_eq!(2, repartitioner.buffered_partitions.len()); + assert_eq!(2, repartitioner.partition_writers.len()); - assert!(repartitioner.buffered_partitions[0].spill_file.is_none()); - assert!(repartitioner.buffered_partitions[1].spill_file.is_none()); - - assert_eq!( - 106496, - repartitioner.buffered_partitions[0].reservation.size() - ); - assert_eq!( - 106496, - repartitioner.buffered_partitions[1].reservation.size() - ); + assert!(repartitioner.partition_writers[0].spill_file.is_none()); + assert!(repartitioner.partition_writers[1].spill_file.is_none()); repartitioner.spill().await.unwrap(); // after spill, there should be spill files - assert!(repartitioner.buffered_partitions[0].spill_file.is_some()); - assert!(repartitioner.buffered_partitions[1].spill_file.is_some()); - - // after spill, all reservations should be freed - assert_eq!(0, repartitioner.buffered_partitions[0].reservation.size()); - assert_eq!(0, repartitioner.buffered_partitions[1].reservation.size()); + assert!(repartitioner.partition_writers[0].spill_file.is_some()); + assert!(repartitioner.partition_writers[1].spill_file.is_some()); // insert another batch after spilling repartitioner.insert_batch(batch.clone()).await.unwrap(); - - assert_eq!( - 106496, - repartitioner.buffered_partitions[0].reservation.size() - ); - assert_eq!( - 106496, - repartitioner.buffered_partitions[1].reservation.size() - ); } fn create_runtime(memory_limit: usize) -> Arc {