From b1d9e8f9e09780a86b50441557dc18d2a3944c09 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Sun, 30 Mar 2025 16:41:34 +0800 Subject: [PATCH] Revert "Remove CoalescePartitions insertion from HashJoinExec (#15476)" This reverts commit 7e0738a979054e95d1935d6e3e58b27385679031. --- .../physical-plan/src/joins/hash_join.rs | 70 ++++++++++--------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d6c0a682c271..c2a313edd156 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -40,6 +40,7 @@ use crate::projection::{ use crate::spill::get_record_batch_memory_size; use crate::ExecutionPlanProperties; use crate::{ + coalesce_partitions::CoalescePartitionsExec, common::can_project, handle_state, hash_utils::create_hashes, @@ -791,44 +792,34 @@ impl ExecutionPlan for HashJoinExec { ); } - if self.mode == PartitionMode::CollectLeft && left_partitions != 1 { - return internal_err!( - "Invalid HashJoinExec,the output partition count of the left child must be 1 in CollectLeft mode,\ - consider using CoalescePartitionsExec" - ); - } - let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { - PartitionMode::CollectLeft => { - let left_stream = self.left.execute(0, Arc::clone(&context))?; - - self.left_fut.once(|| { - let reservation = MemoryConsumer::new("HashJoinInput") - .register(context.memory_pool()); - - collect_left_input( - self.random_state.clone(), - left_stream, - on_left.clone(), - join_metrics.clone(), - reservation, - need_produce_result_in_final(self.join_type), - self.right().output_partitioning().partition_count(), - ) - }) - } + PartitionMode::CollectLeft => self.left_fut.once(|| { + let reservation = + MemoryConsumer::new("HashJoinInput").register(context.memory_pool()); + collect_left_input( + None, + self.random_state.clone(), + Arc::clone(&self.left), + on_left.clone(), + Arc::clone(&context), + join_metrics.clone(), + reservation, + need_produce_result_in_final(self.join_type), + self.right().output_partitioning().partition_count(), + ) + }), PartitionMode::Partitioned => { - let left_stream = self.left.execute(partition, Arc::clone(&context))?; - let reservation = MemoryConsumer::new(format!("HashJoinInput[{partition}]")) .register(context.memory_pool()); OnceFut::new(collect_left_input( + Some(partition), self.random_state.clone(), - left_stream, + Arc::clone(&self.left), on_left.clone(), + Arc::clone(&context), join_metrics.clone(), reservation, need_produce_result_in_final(self.join_type), @@ -939,22 +930,36 @@ impl ExecutionPlan for HashJoinExec { /// Reads the left (build) side of the input, buffering it in memory, to build a /// hash table (`LeftJoinData`) +#[allow(clippy::too_many_arguments)] async fn collect_left_input( + partition: Option, random_state: RandomState, - left_stream: SendableRecordBatchStream, + left: Arc, on_left: Vec, + context: Arc, metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, with_visited_indices_bitmap: bool, probe_threads_count: usize, ) -> Result { - let schema = left_stream.schema(); + let schema = left.schema(); + + let (left_input, left_input_partition) = if let Some(partition) = partition { + (left, partition) + } else if left.output_partitioning().partition_count() != 1 { + (Arc::new(CoalescePartitionsExec::new(left)) as _, 0) + } else { + (left, 0) + }; + + // Depending on partition argument load single partition or whole left side in memory + let stream = left_input.execute(left_input_partition, Arc::clone(&context))?; // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. let initial = (Vec::new(), 0, metrics, reservation); - let (batches, num_rows, metrics, mut reservation) = left_stream + let (batches, num_rows, metrics, mut reservation) = stream .try_fold(initial, |mut acc, batch| async { let batch_size = get_record_batch_memory_size(&batch); // Reserve memory for incoming batch @@ -1650,7 +1655,6 @@ impl EmbeddedProjection for HashJoinExec { #[cfg(test)] mod tests { use super::*; - use crate::coalesce_partitions::CoalescePartitionsExec; use crate::test::TestMemoryExec; use crate::{ common, expressions::Column, repartition::RepartitionExec, test::build_table_i32, @@ -2101,7 +2105,6 @@ mod tests { let left = TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None) .unwrap(); - let left = Arc::new(CoalescePartitionsExec::new(left)); let right = build_table( ("a1", &vec![1, 2, 3]), @@ -2174,7 +2177,6 @@ mod tests { let left = TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None) .unwrap(); - let left = Arc::new(CoalescePartitionsExec::new(left)); let right = build_table( ("a2", &vec![20, 30, 10]), ("b2", &vec![5, 6, 4]),