Skip to content

Commit eeb9d58

Browse files
alambkorowa
andauthored
Minor: Improve documentation about OnceAsync (#13223)
* Minor: add documentation about OnceAsync * More refinement * Fix docs CI * Update datafusion/physical-plan/src/joins/hash_join.rs Co-authored-by: Eduard Karacharov <[email protected]> --------- Co-authored-by: Eduard Karacharov <[email protected]>
1 parent 0458d30 commit eeb9d58

File tree

4 files changed

+55
-21
lines changed

4 files changed

+55
-21
lines changed

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties;
4646
use async_trait::async_trait;
4747
use futures::{ready, Stream, StreamExt, TryStreamExt};
4848

49-
/// Data of the left side
49+
/// Data of the left side that is buffered into memory
5050
#[derive(Debug)]
5151
struct JoinLeftData {
5252
/// Single RecordBatch with all rows from the left side
@@ -58,12 +58,20 @@ struct JoinLeftData {
5858
}
5959

6060
#[allow(rustdoc::private_intra_doc_links)]
61-
/// executes partitions in parallel and combines them into a set of
62-
/// partitions by combining all values from the left with all values on the right
61+
/// Cross Join Execution Plan
6362
///
64-
/// Note that the `Clone` trait is not implemented for this struct due to the
65-
/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
66-
/// left side with the processing in each output stream.
63+
/// This operator is used when there are no predicates between two tables and
64+
/// returns the Cartesian product of the two tables.
65+
///
66+
/// Buffers the left input into memory and then streams batches from each
67+
/// partition on the right input combining them with the buffered left input
68+
/// to generate the output.
69+
///
70+
/// # Clone / Shared State
71+
///
72+
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
73+
/// loading of the left side with the processing in each output stream.
74+
/// Therefore it can not be [`Clone`]
6775
#[derive(Debug)]
6876
pub struct CrossJoinExec {
6977
/// left (build) side which gets loaded in memory
@@ -72,10 +80,16 @@ pub struct CrossJoinExec {
7280
pub right: Arc<dyn ExecutionPlan>,
7381
/// The schema once the join is applied
7482
schema: SchemaRef,
75-
/// Build-side data
83+
/// Buffered copy of left (build) side in memory.
84+
///
85+
/// This structure is *shared* across all output streams.
86+
///
87+
/// Each output stream waits on the `OnceAsync` to signal the completion of
88+
/// the left side loading.
7689
left_fut: OnceAsync<JoinLeftData>,
7790
/// Execution plan metrics
7891
metrics: ExecutionPlanMetricsSet,
92+
/// Properties such as schema, equivalence properties, ordering, partitioning, etc.
7993
cache: PlanProperties,
8094
}
8195

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,11 @@ impl JoinLeftData {
295295
/// └───────────────┘ └───────────────┘
296296
/// ```
297297
///
298-
/// Note that the `Clone` trait is not implemented for this struct due to the
299-
/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
300-
/// left side with the processing in each output stream.
298+
/// # Clone / Shared State
299+
///
300+
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
301+
/// loading of the left side with the processing in each output stream.
302+
/// Therefore it can not be [`Clone`]
301303
#[derive(Debug)]
302304
pub struct HashJoinExec {
303305
/// left (build) side which gets hashed
@@ -314,6 +316,11 @@ pub struct HashJoinExec {
314316
/// if there is a projection, the schema isn't the same as the output schema.
315317
join_schema: SchemaRef,
316318
/// Future that consumes left input and builds the hash table
319+
///
320+
/// For CollectLeft partition mode, this structure is *shared* across all output streams.
321+
///
322+
/// Each output stream waits on the `OnceAsync` to signal the completion of
323+
/// the hash table creation.
317324
left_fut: OnceAsync<JoinLeftData>,
318325
/// Shared the `RandomState` for the hashing algorithm
319326
random_state: RandomState,

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Defines the nested loop join plan, it supports all [`JoinType`].
19-
//! The nested loop join can execute in parallel by partitions and it is
20-
//! determined by the [`JoinType`].
18+
//! [`NestedLoopJoinExec`]: joins without equijoin (equality predicates).
2119
2220
use std::any::Any;
2321
use std::fmt::Formatter;
@@ -141,9 +139,11 @@ impl JoinLeftData {
141139
/// "reports" about probe phase completion (which means that "visited" bitmap won't be
142140
/// updated anymore), and only the last thread, reporting about completion, will return output.
143141
///
144-
/// Note that the `Clone` trait is not implemented for this struct due to the
145-
/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
146-
/// left side with the processing in each output stream.
142+
/// # Clone / Shared State
143+
///
144+
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
145+
/// loading of the left side with the processing in each output stream.
146+
/// Therefore it can not be [`Clone`]
147147
#[derive(Debug)]
148148
pub struct NestedLoopJoinExec {
149149
/// left side
@@ -156,7 +156,12 @@ pub struct NestedLoopJoinExec {
156156
pub(crate) join_type: JoinType,
157157
/// The schema once the join is applied
158158
schema: SchemaRef,
159-
/// Build-side data
159+
/// Future that consumes left input and buffers it in memory
160+
///
161+
/// This structure is *shared* across all output streams.
162+
///
163+
/// Each output stream waits on the `OnceAsync` to signal the completion of
164+
/// the hash table creation.
160165
inner_table: OnceAsync<JoinLeftData>,
161166
/// Information of index and left / right placement of columns
162167
column_indices: Vec<ColumnIndex>,

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -700,11 +700,19 @@ pub fn build_join_schema(
700700
(fields.finish().with_metadata(metadata), column_indices)
701701
}
702702

703-
/// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls
704-
/// to [`OnceAsync::once`] returning a [`OnceFut`] to the same asynchronous computation
703+
/// A [`OnceAsync`] runs an `async` closure once, where multiple calls to
704+
/// [`OnceAsync::once`] return a [`OnceFut`] that resolves to the result of the
705+
/// same computation.
705706
///
706-
/// This is useful for joins where the results of one child are buffered in memory
707-
/// and shared across potentially multiple output partitions
707+
/// This is useful for joins where the results of one child are needed to proceed
708+
/// with multiple output stream
709+
///
710+
///
711+
/// For example, in a hash join, one input is buffered and shared across
712+
/// potentially multiple output partitions. Each output partition must wait for
713+
/// the hash table to be built before proceeding.
714+
///
715+
/// Each output partition waits on the same `OnceAsync` before proceeding.
708716
pub(crate) struct OnceAsync<T> {
709717
fut: Mutex<Option<OnceFut<T>>>,
710718
}

0 commit comments

Comments
 (0)