diff --git a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs index 7fc5e458b86b..f3eb7b77e03c 100644 --- a/datafusion/execution/src/stream.rs +++ b/datafusion/execution/src/stream.rs @@ -20,7 +20,9 @@ use datafusion_common::Result; use futures::Stream; use std::pin::Pin; -/// Trait for types that stream [arrow::record_batch::RecordBatch] +/// Trait for types that stream [RecordBatch] +/// +/// See [`SendableRecordBatchStream`] for more details. pub trait RecordBatchStream: Stream> { /// Returns the schema of this `RecordBatchStream`. /// @@ -29,5 +31,23 @@ pub trait RecordBatchStream: Stream> { fn schema(&self) -> SchemaRef; } -/// Trait for a [`Stream`] of [`RecordBatch`]es +/// Trait for a [`Stream`] of [`RecordBatch`]es that can be passed between threads +/// +/// This trait is used to retrieve the results of DataFusion execution plan nodes. +/// +/// The trait is a specialized Rust Async [`Stream`] that also knows the schema +/// of the data it will return (even if the stream has no data). Every +/// `RecordBatch` returned by the stream should have the same schema as returned +/// by [`schema`](`RecordBatchStream::schema`). +/// +/// # Error Handling +/// +/// Once a stream returns an error, it should not be polled again (the caller +/// should stop calling `next`) and handle the error. +/// +/// However, returning `Ready(None)` (end of stream) is likely the safest +/// behavior after an error. Like [`Stream`]s, `RecordBatchStream`s should not +/// be polled after end of stream or returning an error. However, also like +/// [`Stream`]s there is no mechanism to prevent callers polling so returning +/// `Ready(None)` is recommended. pub type SendableRecordBatchStream = Pin>; diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 542861688dfe..b14021f4a99b 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -228,6 +228,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// [`TryStreamExt`]: futures::stream::TryStreamExt /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter /// + /// # Error handling + /// + /// Any error that occurs during execution is sent as an `Err` in the output + /// stream. + /// + /// `ExecutionPlan` implementations in DataFusion cancel additional work + /// immediately once an error occurs. The rationale is that if the overall + /// query will return an error, any additional work such as continued + /// polling of inputs will be wasted as it will be thrown away. + /// /// # Cancellation / Aborting Execution /// /// The [`Stream`] that is returned must ensure that any allocated resources diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 093803e3c8b3..b814995c2ef5 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -385,6 +385,11 @@ impl BatchPartitioner { /// `───────' `───────' ///``` /// +/// # Error Handling +/// +/// If any of the input partitions return an error, the error is propagated to +/// all output partitions and inputs are not polled again. +/// /// # Output Ordering /// /// If more than one stream is being repartitioned, the output will be some diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 875922ac34b5..e0644e3d99e5 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -39,6 +39,7 @@ use futures::Stream; /// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`] type CursorStream = Box>>; +/// Merges a stream of sorted cursors and record batches into a single sorted stream #[derive(Debug)] pub(crate) struct SortPreservingMergeStream { in_progress: BatchBuilder, diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index f83bb58d08dd..b00a11a5355f 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -65,6 +65,11 @@ use log::{debug, trace}; /// Input Streams Output stream /// (sorted) (sorted) /// ``` +/// +/// # Error Handling +/// +/// If any of the input partitions return an error, the error is propagated to +/// the output and inputs are not polled again. #[derive(Debug)] pub struct SortPreservingMergeExec { /// Input plan