From 4b7381f8d1fa712fc2bc5ce325eefb6c101aa6f0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 27 Sep 2024 08:06:33 -0400 Subject: [PATCH 1/4] Minor: document execution handling on shutdown better --- datafusion/physical-plan/src/execution_plan.rs | 10 ++++++++++ datafusion/physical-plan/src/repartition/mod.rs | 5 +++++ datafusion/physical-plan/src/sorts/merge.rs | 1 + .../physical-plan/src/sorts/sort_preserving_merge.rs | 5 +++++ 4 files changed, 21 insertions(+) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 542861688dfe..b14021f4a99b 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -228,6 +228,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// [`TryStreamExt`]: futures::stream::TryStreamExt /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter /// + /// # Error handling + /// + /// Any error that occurs during execution is sent as an `Err` in the output + /// stream. + /// + /// `ExecutionPlan` implementations in DataFusion cancel additional work + /// immediately once an error occurs. The rationale is that if the overall + /// query will return an error, any additional work such as continued + /// polling of inputs will be wasted as it will be thrown away. + /// /// # Cancellation / Aborting Execution /// /// The [`Stream`] that is returned must ensure that any allocated resources diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 093803e3c8b3..b814995c2ef5 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -385,6 +385,11 @@ impl BatchPartitioner { /// `───────' `───────' ///``` /// +/// # Error Handling +/// +/// If any of the input partitions return an error, the error is propagated to +/// all output partitions and inputs are not polled again. +/// /// # Output Ordering /// /// If more than one stream is being repartitioned, the output will be some diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 875922ac34b5..e0644e3d99e5 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -39,6 +39,7 @@ use futures::Stream; /// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`] type CursorStream = Box>>; +/// Merges a stream of sorted cursors and record batches into a single sorted stream #[derive(Debug)] pub(crate) struct SortPreservingMergeStream { in_progress: BatchBuilder, diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index f83bb58d08dd..b00a11a5355f 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -65,6 +65,11 @@ use log::{debug, trace}; /// Input Streams Output stream /// (sorted) (sorted) /// ``` +/// +/// # Error Handling +/// +/// If any of the input partitions return an error, the error is propagated to +/// the output and inputs are not polled again. #[derive(Debug)] pub struct SortPreservingMergeExec { /// Input plan From 4b7c484960c9433303eb4a5e45e7fc641f9421d2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 27 Sep 2024 08:17:34 -0400 Subject: [PATCH 2/4] Improve RecordBatchStream docs --- datafusion/execution/src/stream.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs index 7fc5e458b86b..7fea9da4bb09 100644 --- a/datafusion/execution/src/stream.rs +++ b/datafusion/execution/src/stream.rs @@ -20,7 +20,9 @@ use datafusion_common::Result; use futures::Stream; use std::pin::Pin; -/// Trait for types that stream [arrow::record_batch::RecordBatch] +/// Trait for types that stream [RecordBatch] +/// +/// See [`SendableRecordBatchStream`] for more details. pub trait RecordBatchStream: Stream> { /// Returns the schema of this `RecordBatchStream`. /// @@ -29,5 +31,23 @@ pub trait RecordBatchStream: Stream> { fn schema(&self) -> SchemaRef; } -/// Trait for a [`Stream`] of [`RecordBatch`]es +/// Trait for a [`Stream`] of [`RecordBatch`]es that can be passed between threads +/// +/// This trait is used to retrieve the results of DataFusion execution plans. +/// +/// The trait is a specialized Rust Async [`Stream`] that also knows the schema +/// of the data it will return (even if the stream has no data). Every +/// `RecordBatch` returned by the stream should have the same schema as returned +/// by [`schema`](`RecordBatchStream::schema`). +/// +/// # Error Handling +/// +/// One a stream returns an error, it should not be polled again (the caller +/// should stop calling `next`) and handle the error. +/// +/// However, returning `Ready(None)` (end of stream) is likely the safest +/// behavior after an error. Like [`Stream`]s, `RecordBatchStream`s should not +/// be polled after end of stream or returning an error. However, also like +/// [`Stream`]s there is no mechanism to prevent callers polling so returning +/// `Ready(None)` is recommended. pub type SendableRecordBatchStream = Pin>; From 7f113fc9cdf3d83be578b050f13ad794d79022f5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 27 Sep 2024 19:27:39 -0400 Subject: [PATCH 3/4] Update datafusion/execution/src/stream.rs Co-authored-by: Andy Grove --- datafusion/execution/src/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs index 7fea9da4bb09..d8b00a4f1f50 100644 --- a/datafusion/execution/src/stream.rs +++ b/datafusion/execution/src/stream.rs @@ -42,7 +42,7 @@ pub trait RecordBatchStream: Stream> { /// /// # Error Handling /// -/// One a stream returns an error, it should not be polled again (the caller +/// Once a stream returns an error, it should not be polled again (the caller /// should stop calling `next`) and handle the error. /// /// However, returning `Ready(None)` (end of stream) is likely the safest From a04b8a870aa3fb64e6ada474478f4e32e7d97e0a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 28 Sep 2024 11:48:27 -0400 Subject: [PATCH 4/4] Update datafusion/execution/src/stream.rs Co-authored-by: Oleks V --- datafusion/execution/src/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs index d8b00a4f1f50..f3eb7b77e03c 100644 --- a/datafusion/execution/src/stream.rs +++ b/datafusion/execution/src/stream.rs @@ -33,7 +33,7 @@ pub trait RecordBatchStream: Stream> { /// Trait for a [`Stream`] of [`RecordBatch`]es that can be passed between threads /// -/// This trait is used to retrieve the results of DataFusion execution plans. +/// This trait is used to retrieve the results of DataFusion execution plan nodes. /// /// The trait is a specialized Rust Async [`Stream`] that also knows the schema /// of the data it will return (even if the stream has no data). Every