@@ -68,8 +68,54 @@ use crate::{
68
68
RecordBatchStream , SendableRecordBatchStream , Statistics ,
69
69
} ;
70
70
71
- /// join execution plan executes partitions in parallel and combines them into a set of
72
- /// partitions.
71
+ /// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
72
+ /// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large
73
+ /// inputs where one or both of the inputs don't fit in the available memory.
74
+ ///
75
+ /// # Join Expressions
76
+ ///
77
+ /// Equi-join predicate (e.g. `<col1> = <col2>`) expressions are represented by [`Self::on`].
78
+ ///
79
+ /// Non-equality predicates, which can not be pushed down to join inputs (e.g.
80
+ /// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
81
+ /// after the equijoin predicates. They are represented by [`Self::filter`]. These are optional
82
+ /// expressions.
83
+ ///
84
+ /// # Sorting
85
+ ///
86
+ /// Assumes that both the left and right input to the join are pre-sorted. It is not the
87
+ /// responisibility of this execution plan to sort the inputs.
88
+ ///
89
+ /// # "Streamed" vs "Buffered"
90
+ ///
91
+ /// The number of record batches of streamed input currently present in the memory will depend
92
+ /// on the output batch size of the execution plan. There is no spilling support for streamed input.
93
+ /// The comparisons are performed from values of join keys in streamed input with the values of
94
+ /// join keys in buffered input. One row in streamed record batch could be matched with multiple rows in
95
+ /// buffered input batches. The streamed input is managed through the states in `StreamedState`
96
+ /// and streamed input batches are represented by `StreamedBatch`.
97
+ ///
98
+ /// Buffered input is buffered for all record batches having the same value of join key.
99
+ /// If the memory limit increases beyond the specified value and spilling is enabled,
100
+ /// buffered batches could be spilled to disk. If spilling is disabled, the execution
101
+ /// will fail under the same conditions. Multiple record batches of buffered could currently reside
102
+ /// in memory/disk during the exectution. The number of buffered batches residing in
103
+ /// memory/disk depends on the number of rows of buffered input having the same value
104
+ /// of join key as that of streamed input rows currently present in memory. Due to pre-sorted inputs,
105
+ /// the algorithm understands when it is not needed anymore, and releases the buffered batches
106
+ /// from memory/disk. The buffered input is managed through the states in `BufferedState`
107
+ /// and buffered input batches are represented by `BufferedBatch`.
108
+ ///
109
+ /// Depending on the type of join, left or right input may be selected as streamed or buffered
110
+ /// respectively. For example, in a left-outer join, the left execution plan will be selected as
111
+ /// streamed input while in a right-outer join, the right execution plan will be selected as the
112
+ /// streamed input.
113
+ ///
114
+ /// Reference for the algorithm:
115
+ /// <https://en.wikipedia.org/wiki/Sort-merge_join>.
116
+ ///
117
+ /// Helpful short video demonstration:
118
+ /// <https://www.youtube.com/watch?v=jiWCPJtDE2c>.
73
119
#[ derive( Debug , Clone ) ]
74
120
pub struct SortMergeJoinExec {
75
121
/// Left sorted joining execution plan
@@ -529,6 +575,9 @@ struct StreamedJoinedChunk {
529
575
buffered_indices : UInt64Builder ,
530
576
}
531
577
578
+ /// Represents a record batch from streamed input.
579
+ ///
580
+ /// Also stores information of matching rows from buffered batches.
532
581
struct StreamedBatch {
533
582
/// The streamed record batch
534
583
pub batch : RecordBatch ,
@@ -667,8 +716,8 @@ impl BufferedBatch {
667
716
}
668
717
}
669
718
670
- /// Sort-merge join stream that consumes streamed and buffered data stream
671
- /// and produces joined output
719
+ /// Sort-Merge join stream that consumes streamed and buffered data streams
720
+ /// and produces joined output stream.
672
721
struct SortMergeJoinStream {
673
722
/// Current state of the stream
674
723
pub state : SortMergeJoinState ,
0 commit comments