Skip to content

Commit ed95de9

Browse files
committed
refine
1 parent 11d7789 commit ed95de9

File tree

9 files changed

+56
-50
lines changed

9 files changed

+56
-50
lines changed
461 Bytes
Binary file not shown.
461 Bytes
Binary file not shown.
461 Bytes
Binary file not shown.
461 Bytes
Binary file not shown.

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -949,14 +949,14 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
949949
let should_preserve_ordering = input.plan.output_ordering().is_some();
950950

951951
let new_plan = if should_preserve_ordering {
952-
SortPreservingMergeExec::new(
952+
Arc::new(SortPreservingMergeExec::new(
953953
input
954954
.plan
955955
.output_ordering()
956956
.unwrap_or(&LexOrdering::default())
957957
.clone(),
958958
Arc::clone(&input.plan),
959-
) as _
959+
)) as _
960960
} else {
961961
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
962962
};

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ pub fn parallelize_sorts(
394394
SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan));
395395
Ok(Transformed::yes(
396396
PlanWithCorrespondingCoalescePartitions::new(
397-
spm.with_fetch(fetch).unwrap(),
397+
Arc::new(spm.with_fetch(fetch)),
398398
false,
399399
vec![requirements],
400400
),
@@ -509,9 +509,10 @@ fn analyze_immediate_sort_removal(
509509
// Replace the sort with a sort-preserving merge:
510510
let expr = LexOrdering::new(sort_exec.expr().to_vec());
511511

512-
SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
513-
.with_fetch(sort_exec.fetch())
514-
.unwrap()
512+
Arc::new(
513+
SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
514+
.with_fetch(sort_exec.fetch()),
515+
) as _
515516
} else {
516517
// Remove the sort:
517518
node.children = node.children.swap_remove(0).children;
@@ -737,9 +738,10 @@ fn remove_corresponding_sort_from_sub_plan(
737738
let plan = Arc::clone(&node.plan);
738739
let fetch = plan.fetch();
739740
let plan = if let Some(ordering) = plan.output_ordering() {
740-
SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
741-
.with_fetch(fetch)
742-
.unwrap()
741+
Arc::new(
742+
SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
743+
.with_fetch(fetch),
744+
) as _
743745
} else {
744746
Arc::new(CoalescePartitionsExec::new(plan)) as _
745747
};

datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,10 @@ fn plan_with_order_preserving_variants(
140140
if let Some(ordering) = child.output_ordering() {
141141
// When the input of a `CoalescePartitionsExec` has an ordering,
142142
// replace it with a `SortPreservingMergeExec` if appropriate:
143-
let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child))
144-
.with_fetch(fetch)
145-
.unwrap();
143+
let spm = Arc::new(
144+
SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child))
145+
.with_fetch(fetch),
146+
);
146147
sort_input.plan = spm;
147148
sort_input.children[0].data = true;
148149
return Ok(sort_input);

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use crate::{
2626
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
2727
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
2828
};
29-
use itertools::Itertools;
3029
use std::any::Any;
3130
use std::sync::Arc;
3231

@@ -38,8 +37,6 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
3837

3938
use crate::sorts::progressive_eval::ProgressiveEvalExec;
4039
use crate::statistics::MinMaxStatistics;
41-
use crate::stream::RecordBatchStreamAdapter;
42-
use futures::StreamExt;
4340
use log::{debug, trace};
4441

4542
/// Sort preserving merge execution plan
@@ -102,14 +99,13 @@ pub struct SortPreservingMergeExec {
10299
///
103100
/// See [`Self::with_round_robin_repartition`] for more information.
104101
enable_round_robin_repartition: bool,
102+
///
103+
progressive_eval_exec: Option<Arc<ProgressiveEvalExec>>,
105104
}
106105

107106
impl SortPreservingMergeExec {
108107
/// Create a new sort execution plan
109-
pub fn new(
110-
expr: LexOrdering,
111-
input: Arc<dyn ExecutionPlan>,
112-
) -> Arc<dyn ExecutionPlan> {
108+
pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
113109
let cache = Self::compute_properties(&input, expr.clone());
114110

115111
// Todo: check if partition statistic is accurate
@@ -136,24 +132,23 @@ impl SortPreservingMergeExec {
136132
.filter(|groups| {
137133
groups.len() < input.properties().partitioning.partition_count()
138134
});
135+
let mut progressive_eval_exec = None;
139136
if let Some(partition_groups) = partition_groups {
140-
// Return ProgressiveEvalExec when partition_groups exists
141-
Arc::new(ProgressiveEvalExec::new(
142-
input,
137+
progressive_eval_exec = Some(Arc::new(ProgressiveEvalExec::new(
138+
input.clone(),
143139
None,
144140
None,
145141
partition_groups,
146-
))
147-
} else {
148-
// Return SortPreservingMergeExec otherwise
149-
Arc::new(Self {
150-
input,
151-
expr,
152-
metrics: ExecutionPlanMetricsSet::new(),
153-
fetch: None,
154-
cache,
155-
enable_round_robin_repartition: true,
156-
})
142+
)));
143+
}
144+
Self {
145+
input,
146+
expr,
147+
metrics: ExecutionPlanMetricsSet::new(),
148+
fetch: None,
149+
cache,
150+
enable_round_robin_repartition: true,
151+
progressive_eval_exec,
157152
}
158153
}
159154

@@ -221,11 +216,14 @@ impl DisplayAs for SortPreservingMergeExec {
221216
) -> std::fmt::Result {
222217
match t {
223218
DisplayFormatType::Default | DisplayFormatType::Verbose => {
224-
write!(f, "SortPreservingMergeExec: [{}]", self.expr)?;
225-
if let Some(fetch) = self.fetch {
226-
write!(f, ", fetch={fetch}")?;
227-
};
228-
219+
if let Some(progressive_eval) = &self.progressive_eval_exec {
220+
progressive_eval.fmt_as(t, f)?;
221+
} else {
222+
write!(f, "SortPreservingMergeExec: [{}]", self.expr)?;
223+
if let Some(fetch) = self.fetch {
224+
write!(f, ", fetch={fetch}")?;
225+
};
226+
}
229227
Ok(())
230228
}
231229
DisplayFormatType::TreeRender => {
@@ -272,6 +270,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
272270
fetch: limit,
273271
cache: self.cache.clone(),
274272
enable_round_robin_repartition: true,
273+
progressive_eval_exec: self.progressive_eval_exec.clone(),
275274
}))
276275
}
277276

@@ -299,18 +298,20 @@ impl ExecutionPlan for SortPreservingMergeExec {
299298
self: Arc<Self>,
300299
children: Vec<Arc<dyn ExecutionPlan>>,
301300
) -> Result<Arc<dyn ExecutionPlan>> {
302-
Ok(
301+
Ok(Arc::new(
303302
SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(&children[0]))
304-
.with_fetch(self.fetch)
305-
.unwrap(),
306-
)
303+
.with_fetch(self.fetch),
304+
))
307305
}
308306

309307
fn execute(
310308
&self,
311309
partition: usize,
312310
context: Arc<TaskContext>,
313311
) -> Result<SendableRecordBatchStream> {
312+
if let Some(progressive_eval) = &self.progressive_eval_exec {
313+
return progressive_eval.execute(partition, context);
314+
}
314315
trace!(
315316
"Start SortPreservingMergeExec::execute for partition: {}",
316317
partition
@@ -418,11 +419,13 @@ impl ExecutionPlan for SortPreservingMergeExec {
418419
});
419420
}
420421

421-
Ok(SortPreservingMergeExec::new(
422-
updated_exprs,
423-
make_with_child(projection, self.input())?,
424-
)
425-
.with_fetch(self.fetch()))
422+
Ok(Some(Arc::new(
423+
SortPreservingMergeExec::new(
424+
updated_exprs,
425+
make_with_child(projection, self.input())?,
426+
)
427+
.with_fetch(self.fetch()),
428+
)))
426429
}
427430
}
428431

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -966,9 +966,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
966966
} else {
967967
Some(sort.fetch as usize)
968968
};
969-
Ok(SortPreservingMergeExec::new(exprs, input)
970-
.with_fetch(fetch)
971-
.unwrap())
969+
Ok(Arc::new(
970+
SortPreservingMergeExec::new(exprs, input).with_fetch(fetch),
971+
))
972972
}
973973
PhysicalPlanType::Extension(extension) => {
974974
let inputs: Vec<Arc<dyn ExecutionPlan>> = extension

0 commit comments

Comments
 (0)