feat: add macros for DataFusionError variants #15946

Draft: wants to merge 10 commits into main
7 changes: 4 additions & 3 deletions datafusion-cli/src/main.rs
@@ -21,7 +21,7 @@ use std::path::Path;
use std::process::ExitCode;
use std::sync::{Arc, LazyLock};

use datafusion::error::{DataFusionError, Result};
use datafusion::error::Result;
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
@@ -38,7 +38,7 @@ use datafusion_cli::{
};

use clap::Parser;
use datafusion::common::config_err;
use datafusion::common::{config_err, external_datafusion_err};
use datafusion::config::ConfigOptions;
use datafusion::execution::disk_manager::DiskManagerConfig;
use mimalloc::MiMalloc;
@@ -233,7 +233,7 @@ async fn main_inner() -> Result<()> {
// TODO maybe we can have thiserror for cli but for now let's keep it simple
return exec::exec_from_repl(&ctx, &mut print_options)
.await
.map_err(|e| DataFusionError::External(Box::new(e)));
.map_err(|e| external_datafusion_err!(e));
}

if !files.is_empty() {
@@ -381,6 +381,7 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
mod tests {
use super::*;
use datafusion::common::test_util::batches_to_string;
use datafusion::common::DataFusionError;
use insta::assert_snapshot;

fn assert_conversion(input: &str, expected: Result<usize, String>) {
6 changes: 2 additions & 4 deletions datafusion-cli/src/print_options.rs
@@ -24,8 +24,8 @@ use crate::print_format::PrintFormat;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::common::external_err;
use datafusion::common::instant::Instant;
use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::physical_plan::RecordBatchStream;

@@ -143,9 +143,7 @@ impl PrintOptions {
format_options: &FormatOptions,
) -> Result<()> {
if self.format == PrintFormat::Table {
return Err(DataFusionError::External(
"PrintFormat::Table is not implemented".to_string().into(),
));
return external_err!("PrintFormat::Table is not implemented");
[Contributor review comment: I have noticed the lack of this macro recently too -- thank you for adding it 🙏]
};

let stdout = std::io::stdout();
@@ -18,6 +18,7 @@
use std::sync::Arc;

use datafusion::{
common::external_datafusion_err,
error::{DataFusionError, Result},
prelude::SessionContext,
};
@@ -32,12 +33,12 @@ async fn main() -> Result<()> {
// so you will need to change the approach here based on your use case.
let target: &std::path::Path = "../../../../target/".as_ref();
let library_path = compute_library_path::<TableProviderModuleRef>(target)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
.map_err(|e| external_datafusion_err!(e))?;

// Load the module
let table_provider_module =
TableProviderModuleRef::load_from_directory(&library_path)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
.map_err(|e| external_datafusion_err!(e))?;

// By calling the code below, the table provided will be created within
// the module's code.
9 changes: 5 additions & 4 deletions datafusion-examples/examples/sql_query.rs
@@ -18,11 +18,13 @@
use datafusion::arrow::array::{UInt64Array, UInt8Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{assert_batches_eq, exec_datafusion_err};
use datafusion::common::{
assert_batches_eq, exec_datafusion_err, external_datafusion_err,
};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::MemTable;
use datafusion::error::{DataFusionError, Result};
use datafusion::error::Result;
use datafusion::prelude::*;
use object_store::local::LocalFileSystem;
use std::path::Path;
@@ -168,8 +170,7 @@ async fn query_parquet() -> Result<()> {

let local_fs = Arc::new(LocalFileSystem::default());

let u = url::Url::parse("file://./")
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let u = url::Url::parse("file://./").map_err(|e| external_datafusion_err!(e))?;
ctx.register_object_store(&u, local_fs);

// Register a listing table - this will use all files in the directory as data sources
7 changes: 5 additions & 2 deletions datafusion/catalog/src/stream.rs
@@ -28,7 +28,10 @@ use std::sync::Arc;
use crate::{Session, TableProvider, TableProviderFactory};
use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
use arrow::datatypes::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common::{
config_err, execution_join_datafusion_err, plan_err, Constraints, DataFusionError,
Result,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -440,6 +443,6 @@ impl DataSink for StreamWrite {
write_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)?
.map_err(|e| execution_join_datafusion_err!(e))?
}
}
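Side note on the change above: later in this diff, error.rs also adds impl From<JoinError> for DataFusionError, so a bare ? would convert the join error too, just without a backtrace. The following minimal sketch (not part of the diff; the helper name is illustrative) contrasts the two, assuming SpawnedTask::join_unwind returns Result<T, JoinError> as in datafusion_common_runtime:

use datafusion_common::{execution_join_datafusion_err, Result};
use datafusion_common_runtime::SpawnedTask;

async fn wait_for_writer(task: SpawnedTask<Result<u64>>) -> Result<u64> {
    // Option 1 (relies on the new `From<JoinError>` impl, no backtrace recorded):
    //     Ok(task.join_unwind().await??)
    //
    // Option 2, as used throughout this PR: the macro wraps the JoinError in
    // `DataFusionError::ExecutionJoin(e, Some(backtrace))`.
    task.join_unwind().await.map_err(|e| execution_join_datafusion_err!(e))?
}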
4 changes: 2 additions & 2 deletions datafusion/common/src/config.rs
@@ -20,7 +20,7 @@
use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
use crate::{external_datafusion_err, DataFusionError, Result};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
@@ -1186,7 +1186,7 @@
input,
std::any::type_name::<T>()
),
Box::new(DataFusionError::External(Box::new(e))),
Box::new(external_datafusion_err!(e)),
)
})
}
73 changes: 67 additions & 6 deletions datafusion/common/src/error.rs
@@ -118,7 +118,9 @@ pub enum DataFusionError {
/// [`JoinError`] during execution of the query.
///
/// This error can't occur for unjoined tasks, such as execution shutdown.
ExecutionJoin(JoinError),
///
/// The second argument holds an optional backtrace.
ExecutionJoin(JoinError, Option<String>),
/// Error when resources (such as memory or scratch disk space) are exhausted.
///
/// This error is thrown when a consumer cannot acquire additional memory
@@ -280,6 +282,12 @@ impl From<ArrowError> for DataFusionError {
}
}

impl From<JoinError> for DataFusionError {
fn from(e: JoinError) -> Self {
DataFusionError::ExecutionJoin(e, None)
}
}

impl From<DataFusionError> for ArrowError {
fn from(e: DataFusionError) -> Self {
match e {
@@ -376,7 +384,7 @@ impl Error for DataFusionError {
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e, _) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ExecutionJoin(e) => Some(e),
DataFusionError::ExecutionJoin(e, _) => Some(e),
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
DataFusionError::Context(_, e) => Some(e.as_ref()),
@@ -508,7 +516,7 @@
}
DataFusionError::SchemaError(_, _) => "Schema error: ",
DataFusionError::Execution(_) => "Execution error: ",
DataFusionError::ExecutionJoin(_) => "ExecutionJoin error: ",
DataFusionError::ExecutionJoin(_, _) => "ExecutionJoin error: ",
DataFusionError::ResourcesExhausted(_) => {
"Resources exhausted: "
}
@@ -552,7 +560,10 @@
Cow::Owned(format!("{desc}{backtrace}"))
}
DataFusionError::Execution(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::ExecutionJoin(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::ExecutionJoin(ref desc, ref backtrace) => {
let backtrace = backtrace.clone().unwrap_or_else(|| "".to_owned());
Cow::Owned(format!("{desc}{backtrace}"))
}
DataFusionError::ResourcesExhausted(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::External(ref desc) => Cow::Owned(desc.to_string()),
#[cfg(feature = "object_store")]
@@ -900,8 +911,58 @@ macro_rules! schema_err {
let err = err.with_diagnostic($DIAG);
)?
Err(err)
}
};
}};
}

// Exposes a macro to create `DataFusionError::External` with optional backtrace
#[macro_export]
macro_rules! external_datafusion_err {
($CONVERTIBLE_TO_ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
let err = $crate::error::DataFusionError::External($crate::error::GenericError::from($CONVERTIBLE_TO_ERR)).context($crate::error::DataFusionError::get_back_trace());
$(
let err = err.with_diagnostic($DIAG);
)?
err
}};
}

// Exposes a macro to create `Err(DataFusionError::External)` with optional backtrace
#[macro_export]
macro_rules! external_err {
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
let err = $crate::external_datafusion_err!($ERR);
$(
let err = err.with_diagnostic($DIAG);
)?
Err(err)
}};
}

// Exposes a macro to create `DataFusionError::ExecutionJoin` with optional backtrace
#[macro_export]
macro_rules! execution_join_datafusion_err {
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
let err = $crate::error::DataFusionError::ExecutionJoin(
$ERR,
Some($crate::error::DataFusionError::get_back_trace())
);
$(
let err = err.with_diagnostic($DIAG);
)?
err
}};
}

// Exposes a macro to create `Err(DataFusionError::ExecutionJoin)` with optional backtrace
#[macro_export]
macro_rules! execution_join_err {
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
let err = $crate::execution_join_datafusion_err!($ERR);
$(
let err = err.with_diagnostic($DIAG);
)?
Err(err)
}};
}

// To avoid compiler error when using macro in the same crate:
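For orientation, here is a brief usage sketch of the four new macros defined above (not part of this diff). The function names are illustrative; the sketch assumes the macros are reachable from the datafusion_common crate root via #[macro_export], as the call sites elsewhere in this PR use them, and assumes a direct tokio dependency for JoinError, the type stored in the variant:

use datafusion_common::{
    execution_join_datafusion_err, execution_join_err, external_datafusion_err,
    external_err, DataFusionError, Result,
};
use tokio::task::JoinError;

// Wrap any boxable error as `DataFusionError::External`; the macro also
// attaches a backtrace (via `DataFusionError::Context`) when backtraces are
// enabled.
fn parse_flag(s: &str) -> Result<bool> {
    s.parse::<bool>().map_err(|e| external_datafusion_err!(e))
}

// `external_err!` is the `Err(...)`-returning form; a `&str` or `String`
// message works too, since `GenericError` can be built from either.
fn unsupported() -> Result<()> {
    external_err!("this code path is not supported")
}

// Build `DataFusionError::ExecutionJoin(e, Some(backtrace))` from a JoinError.
fn wrap_join_error(e: JoinError) -> DataFusionError {
    execution_join_datafusion_err!(e)
}

// Same, already wrapped in `Err(...)`. Each macro also accepts an optional
// `; diagnostic = ...` argument, mirroring the existing `*_err!` macros.
fn join_failed(e: JoinError) -> Result<()> {
    execution_join_err!(e)
}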
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -44,7 +44,8 @@ use arrow::ipc::{root_as_message, CompressionType};
use datafusion_catalog::Session;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
execution_join_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics,
DEFAULT_ARROW_EXTENSION,
};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
@@ -294,7 +295,7 @@ impl FileSink for ArrowFileSink {
demux_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
.map_err(|e| execution_join_datafusion_err!(e))??;
Ok(row_count as u64)
}
}
4 changes: 2 additions & 2 deletions datafusion/core/src/test/mod.rs
@@ -38,7 +38,7 @@ use crate::test_util::{aggr_test_schema, arrow_test_data};
use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_common::external_datafusion_err;
use datafusion_datasource::source::DataSourceExec;

#[cfg(feature = "compression")]
@@ -135,7 +135,7 @@ pub fn partitioned_file_groups(
#[cfg(feature = "compression")]
FileCompressionType::ZSTD => {
let encoder = ZstdEncoder::new(file, 0)
.map_err(|e| DataFusionError::External(Box::new(e)))?
.map_err(|e| external_datafusion_err!(e))?
.auto_finish();
Box::new(encoder)
}
13 changes: 7 additions & 6 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -42,8 +42,9 @@ use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION,
execution_join_datafusion_err, internal_datafusion_err, internal_err, not_impl_err,
ColumnStatistics, DataFusionError, GetExt, HashSet, Result,
DEFAULT_PARQUET_EXTENSION,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
@@ -1359,7 +1360,7 @@ impl FileSink for ParquetSink {
demux_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
.map_err(|e| execution_join_datafusion_err!(e))??;

Ok(row_count as u64)
}
@@ -1487,7 +1488,7 @@ fn spawn_rg_join_and_finalize_task(
let (writer, _col_reservation) = task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
.map_err(|e| execution_join_datafusion_err!(e))??;
let encoded_size = writer.get_estimated_total_bytes();
rg_reservation.grow(encoded_size);
finalized_rg.push(writer.close()?);
@@ -1624,7 +1625,7 @@ async fn concatenate_parallel_row_groups(
let result = task.join_unwind().await;
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, mut rg_reservation, _cnt) =
result.map_err(DataFusionError::ExecutionJoin)??;
result.map_err(|e| execution_join_datafusion_err!(e))??;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
rg_reservation.free();
@@ -1691,7 +1692,7 @@ async fn output_single_parquet_file_parallelized(
launch_serialization_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
.map_err(|e| execution_join_datafusion_err!(e))??;
Ok(file_metadata)
}

4 changes: 2 additions & 2 deletions datafusion/datasource/src/file_compression_type.rs
@@ -19,7 +19,7 @@

use std::str::FromStr;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common::{external_datafusion_err, DataFusionError, Result};

use datafusion_common::parsers::CompressionTypeVariant::{self, *};
use datafusion_common::GetExt;
@@ -231,7 +231,7 @@ impl FileCompressionType {
#[cfg(feature = "compression")]
ZSTD => match ZstdDecoder::new(r) {
Ok(decoder) => Box::new(decoder),
Err(e) => return Err(DataFusionError::External(Box::new(e))),
Err(e) => return Err(external_datafusion_err!(e)),
},
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
11 changes: 4 additions & 7 deletions datafusion/datasource/src/url.rs
@@ -17,7 +17,7 @@

use std::sync::Arc;

use datafusion_common::{DataFusionError, Result};
use datafusion_common::{external_datafusion_err, DataFusionError, Result};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_session::Session;

@@ -112,7 +112,7 @@ impl ListingTableUrl {
Ok(url) => Self::try_new(url, None),
#[cfg(not(target_arch = "wasm32"))]
Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
Err(e) => Err(DataFusionError::External(Box::new(e))),
Err(e) => Err(external_datafusion_err!(e)),
}
}

@@ -121,17 +121,14 @@
fn parse_path(s: &str) -> Result<Self> {
let (path, glob) = match split_glob_expression(s) {
Some((prefix, glob)) => {
let glob = Pattern::new(glob)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let glob = Pattern::new(glob).map_err(|e| external_datafusion_err!(e))?;
(prefix, Some(glob))
}
None => (s, None),
};

let url = url_from_filesystem_path(path).ok_or_else(|| {
DataFusionError::External(
format!("Failed to convert path to URL: {path}").into(),
)
external_datafusion_err!(format!("Failed to convert path to URL: {path}"))
})?;

Self::try_new(url, glob)