15
15
// specific language governing permissions and limitations
16
16
// under the License.
17
17
18
- //! Defines the sort preserving merge plan
18
+ //! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream.
19
19
20
20
use std:: any:: Any ;
21
21
use std:: sync:: Arc ;
@@ -38,10 +38,22 @@ use log::{debug, trace};
38
38
39
39
/// Sort preserving merge execution plan
40
40
///
41
- /// This takes an input execution plan and a list of sort expressions, and
42
- /// provided each partition of the input plan is sorted with respect to
43
- /// these sort expressions, this operator will yield a single partition
44
- /// that is also sorted with respect to them
41
+ /// # Overview
42
+ ///
43
+ /// This operator implements a K-way merge. It is used to merge multiple sorted
44
+ /// streams into a single sorted stream and is highly optimized.
45
+ ///
46
+ /// ## Inputs:
47
+ ///
48
+ /// 1. A list of sort expressions
49
+ /// 2. An input plan, where each partition is sorted with respect to
50
+ /// these sort expressions.
51
+ ///
52
+ /// ## Output:
53
+ ///
54
+ /// 1. A single partition that is also sorted with respect to the expressions.
55
+ ///
56
+ /// ## Diagram
45
57
///
46
58
/// ```text
47
59
/// ┌─────────────────────────┐
@@ -55,12 +67,12 @@ use log::{debug, trace};
55
67
/// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘
56
68
/// │ ╔═══╦═══╗ │ │
57
69
/// │ ║ B ║ E ║ ... │──┘ │
58
- /// │ ╚═══╩═══╝ │ Note Stable Sort: the merged stream
59
- /// └─────────────────────────┘ places equal rows from stream 1
70
+ /// │ ╚═══╩═══╝ │ Stable sort if `enable_round_robin_repartition=false`:
71
+ /// └─────────────────────────┘ the merged stream places equal rows from stream 1
60
72
/// Stream 2
61
73
///
62
74
///
63
- /// Input Streams Output stream
75
+ /// Input Partitions Output Partition
64
76
/// (sorted) (sorted)
65
77
/// ```
66
78
///
@@ -70,7 +82,7 @@ use log::{debug, trace};
70
82
/// the output and inputs are not polled again.
71
83
#[ derive( Debug , Clone ) ]
72
84
pub struct SortPreservingMergeExec {
73
- /// Input plan
85
+ /// Input plan with sorted partitions
74
86
input : Arc < dyn ExecutionPlan > ,
75
87
/// Sort expressions
76
88
expr : LexOrdering ,
@@ -80,7 +92,9 @@ pub struct SortPreservingMergeExec {
80
92
fetch : Option < usize > ,
81
93
/// Cache holding plan properties like equivalences, output partitioning etc.
82
94
cache : PlanProperties ,
83
- /// Configuration parameter to enable round-robin selection of tied winners of loser tree.
95
+ /// Use round-robin selection of tied winners of the loser tree
96
+ ///
97
+ /// See [`Self::with_round_robin_repartition`] for more information.
84
98
enable_round_robin_repartition : bool ,
85
99
}
86
100
@@ -105,6 +119,14 @@ impl SortPreservingMergeExec {
105
119
}
106
120
107
121
/// Sets the selection strategy of tied winners of the loser tree algorithm
122
+ ///
123
+ /// If true (the default), equal output rows are placed in the merged stream
124
+ /// in round-robin fashion. This approach consumes input streams at more
125
+ /// even rates when there are many rows with the same sort key.
126
+ ///
127
+ /// If false, equal output rows are always placed in the merged stream in
128
+ /// the order of the inputs, resulting in potentially slower execution but a
129
+ /// stable output order.
108
130
pub fn with_round_robin_repartition (
109
131
mut self ,
110
132
enable_round_robin_repartition : bool ,
@@ -128,7 +150,8 @@ impl SortPreservingMergeExec {
128
150
self . fetch
129
151
}
130
152
131
- /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
153
+ /// Creates the cache object that stores the plan properties
154
+ /// such as schema, equivalence properties, ordering, partitioning, etc.
132
155
fn compute_properties (
133
156
input : & Arc < dyn ExecutionPlan > ,
134
157
ordering : LexOrdering ,
0 commit comments