Skip to content

Commit dee95c2

Browse files
Cheng-Yuan-LaiIan Lai
Cheng-Yuan-Lai
authored and
Ian Lai
committed
feat: refactor error handling to use new external error macros and improve error messages
1 parent 63eff0b commit dee95c2

File tree

13 files changed

+44
-33
lines changed

13 files changed

+44
-33
lines changed

datafusion-cli/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::path::Path;
2121
use std::process::ExitCode;
2222
use std::sync::{Arc, LazyLock};
2323

24-
use datafusion::error::{DataFusionError, Result};
24+
use datafusion::error::Result;
2525
use datafusion::execution::context::SessionConfig;
2626
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
2727
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
@@ -381,6 +381,7 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
381381
mod tests {
382382
use super::*;
383383
use datafusion::common::test_util::batches_to_string;
384+
use datafusion::common::DataFusionError;
384385
use insta::assert_snapshot;
385386

386387
fn assert_conversion(input: &str, expected: Result<usize, String>) {

datafusion-cli/src/print_options.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use crate::print_format::PrintFormat;
2424

2525
use arrow::datatypes::SchemaRef;
2626
use arrow::record_batch::RecordBatch;
27+
use datafusion::common::external_err;
2728
use datafusion::common::instant::Instant;
28-
use datafusion::common::{external_err, DataFusionError};
2929
use datafusion::error::Result;
3030
use datafusion::physical_plan::RecordBatchStream;
3131

@@ -143,9 +143,11 @@ impl PrintOptions {
143143
format_options: &FormatOptions,
144144
) -> Result<()> {
145145
if self.format == PrintFormat::Table {
146-
return external_err!("PrintFormat::Table is not implemented"
147-
.to_string()
148-
.into());
146+
let err = std::io::Error::new(
147+
std::io::ErrorKind::Other,
148+
"PrintFormat::Table is not implemented",
149+
);
150+
return external_err!(err);
149151
};
150152

151153
let stdout = std::io::stdout();

datafusion-examples/examples/sql_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion::common::{
2424
use datafusion::datasource::file_format::parquet::ParquetFormat;
2525
use datafusion::datasource::listing::ListingOptions;
2626
use datafusion::datasource::MemTable;
27-
use datafusion::error::{DataFusionError, Result};
27+
use datafusion::error::Result;
2828
use datafusion::prelude::*;
2929
use object_store::local::LocalFileSystem;
3030
use std::path::Path;

datafusion/common/src/error.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,10 @@ macro_rules! schema_err {
923923
#[macro_export]
924924
macro_rules! external_datafusion_err {
925925
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
926-
let err = DataFusionError::External(Box::new($ERR), Some(DataFusionError::get_back_trace()));
926+
let err = $crate::error::DataFusionError::External(
927+
Box::new($ERR),
928+
Some($crate::error::DataFusionError::get_back_trace())
929+
);
927930
$(
928931
let err = err.with_diagnostic($DIAG);
929932
)?
@@ -935,7 +938,7 @@ macro_rules! external_datafusion_err {
935938
#[macro_export]
936939
macro_rules! external_err {
937940
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
938-
let err = datafusion_common::external_datafusion_err!($ERR);
941+
let err = $crate::external_datafusion_err!($ERR);
939942
$(
940943
let err = err.with_diagnostic($DIAG);
941944
)?
@@ -947,7 +950,10 @@ macro_rules! external_err {
947950
#[macro_export]
948951
macro_rules! execution_join_datafusion_err {
949952
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
950-
let err = DataFusionError::ExecutionJoin($ERR, Some(DataFusionError::get_back_trace()));
953+
let err = $crate::error::DataFusionError::ExecutionJoin(
954+
$ERR,
955+
Some($crate::error::DataFusionError::get_back_trace())
956+
);
951957
$(
952958
let err = err.with_diagnostic($DIAG);
953959
)?
@@ -959,7 +965,7 @@ macro_rules! execution_join_datafusion_err {
959965
#[macro_export]
960966
macro_rules! execution_join_err {
961967
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
962-
let err = datafusion_common::execution_join_datafusion_err!($ERR);
968+
let err = $crate::execution_join_datafusion_err!($ERR);
963969
$(
964970
let err = err.with_diagnostic($DIAG);
965971
)?

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ use arrow::ipc::{root_as_message, CompressionType};
4444
use datafusion_catalog::Session;
4545
use datafusion_common::parsers::CompressionTypeVariant;
4646
use datafusion_common::{
47-
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
47+
execution_join_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics,
48+
DEFAULT_ARROW_EXTENSION,
4849
};
4950
use datafusion_common_runtime::{JoinSet, SpawnedTask};
5051
use datafusion_datasource::display::FileGroupDisplay;
@@ -294,7 +295,7 @@ impl FileSink for ArrowFileSink {
294295
demux_task
295296
.join_unwind()
296297
.await
297-
.map_err(DataFusionError::ExecutionJoin)??;
298+
.map_err(|e| execution_join_datafusion_err!(e))??;
298299
Ok(row_count as u64)
299300
}
300301
}

datafusion/core/src/test/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::test_util::{aggr_test_schema, arrow_test_data};
3838
use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
3939
use arrow::datatypes::{DataType, Field, Schema};
4040
use arrow::record_batch::RecordBatch;
41-
use datafusion_common::{external_datafusion_err, DataFusionError};
41+
use datafusion_common::external_datafusion_err;
4242
use datafusion_datasource::source::DataSourceExec;
4343

4444
#[cfg(feature = "compression")]

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions
4040
use datafusion_common::parsers::CompressionTypeVariant;
4141
use datafusion_common::stats::Precision;
4242
use datafusion_common::{
43-
internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
44-
DataFusionError, GetExt, Result, DEFAULT_PARQUET_EXTENSION,
43+
execution_join_datafusion_err, internal_datafusion_err, internal_err, not_impl_err,
44+
ColumnStatistics, DataFusionError, GetExt, Result, DEFAULT_PARQUET_EXTENSION,
4545
};
4646
use datafusion_common::{HashMap, Statistics};
4747
use datafusion_common_runtime::{JoinSet, SpawnedTask};
@@ -1210,7 +1210,7 @@ impl FileSink for ParquetSink {
12101210
demux_task
12111211
.join_unwind()
12121212
.await
1213-
.map_err(DataFusionError::ExecutionJoin)??;
1213+
.map_err(|e| execution_join_datafusion_err!(e))??;
12141214

12151215
Ok(row_count as u64)
12161216
}
@@ -1338,7 +1338,7 @@ fn spawn_rg_join_and_finalize_task(
13381338
let (writer, _col_reservation) = task
13391339
.join_unwind()
13401340
.await
1341-
.map_err(DataFusionError::ExecutionJoin)??;
1341+
.map_err(|e| execution_join_datafusion_err!(e))??;
13421342
let encoded_size = writer.get_estimated_total_bytes();
13431343
rg_reservation.grow(encoded_size);
13441344
finalized_rg.push(writer.close()?);
@@ -1475,7 +1475,7 @@ async fn concatenate_parallel_row_groups(
14751475
let result = task.join_unwind().await;
14761476
let mut rg_out = parquet_writer.next_row_group()?;
14771477
let (serialized_columns, mut rg_reservation, _cnt) =
1478-
result.map_err(DataFusionError::ExecutionJoin)??;
1478+
result.map_err(|e| execution_join_datafusion_err!(e))??;
14791479
for chunk in serialized_columns {
14801480
chunk.append_to_row_group(&mut rg_out)?;
14811481
rg_reservation.free();
@@ -1542,7 +1542,7 @@ async fn output_single_parquet_file_parallelized(
15421542
launch_serialization_task
15431543
.join_unwind()
15441544
.await
1545-
.map_err(DataFusionError::ExecutionJoin)??;
1545+
.map_err(|e| execution_join_datafusion_err!(e))??;
15461546
Ok(file_metadata)
15471547
}
15481548

datafusion/datasource/src/url.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ impl ListingTableUrl {
128128
};
129129

130130
let url = url_from_filesystem_path(path).ok_or_else(|| {
131-
external_datafusion_err!(
132-
format!("Failed to convert path to URL: {path}").into()
133-
)
131+
let err = std::io::Error::new(
132+
std::io::ErrorKind::Other,
133+
format!("Failed to convert path to URL: {path}"),
134+
);
135+
external_datafusion_err!(err)
134136
})?;
135137

136138
Self::try_new(url, glob)

datafusion/datasource/src/write/orchestration.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use crate::file_compression_type::FileCompressionType;
2727
use datafusion_common::error::Result;
2828

2929
use arrow::array::RecordBatch;
30-
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
30+
use datafusion_common::{
31+
execution_join_datafusion_err, internal_datafusion_err, internal_err, DataFusionError,
32+
};
3133
use datafusion_common_runtime::{JoinSet, SpawnedTask};
3234
use datafusion_execution::TaskContext;
3335

@@ -285,8 +287,8 @@ pub async fn spawn_writer_tasks_and_join(
285287
write_coordinator_task.join_unwind(),
286288
demux_task.join_unwind()
287289
);
288-
r1.map_err(DataFusionError::ExecutionJoin)??;
289-
r2.map_err(DataFusionError::ExecutionJoin)??;
290+
r1.map_err(|e| execution_join_datafusion_err!(e))??;
291+
r2.map_err(|e| execution_join_datafusion_err!(e))??;
290292

291293
// Return total row count:
292294
rx_row_cnt.await.map_err(|_| {

datafusion/functions/src/regex/regexpreplace.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ use datafusion_common::external_datafusion_err;
3030
use datafusion_common::external_err;
3131
use datafusion_common::plan_err;
3232
use datafusion_common::ScalarValue;
33-
use datafusion_common::{
34-
cast::as_generic_string_array, internal_err, DataFusionError, Result,
35-
};
33+
use datafusion_common::{cast::as_generic_string_array, internal_err, Result};
3634
use datafusion_expr::function::Hint;
3735
use datafusion_expr::ColumnarValue;
3836
use datafusion_expr::TypeSignature;

datafusion/optimizer/src/simplify_expressions/regex.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion_common::{external_datafusion_err, DataFusionError, Result, ScalarValue};
18+
use datafusion_common::{external_datafusion_err, Result, ScalarValue};
1919
use datafusion_expr::{lit, BinaryExpr, Expr, Like, Operator};
2020
use regex_syntax::hir::{Capture, Hir, HirKind, Literal, Look};
2121

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@ use arrow::datatypes::{Schema, SchemaRef};
3333
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
3434
use arrow::record_batch::RecordBatch;
3535

36-
use datafusion_common::{
37-
exec_datafusion_err, external_err, DataFusionError, HashSet, Result,
38-
};
36+
use datafusion_common::{exec_datafusion_err, external_err, HashSet, Result};
3937
use datafusion_common_runtime::SpawnedTask;
4038
use datafusion_execution::disk_manager::RefCountedTempFile;
4139
use datafusion_execution::RecordBatchStream;

datafusion/sqllogictest/bin/sqllogictests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,8 @@ async fn run_file_in_runner<D: AsyncDB, M: MakeConnection<Conn = D>>(
271271
}
272272
msg.push_str(&format!("{}. {err}\n\n", i + 1));
273273
}
274-
return external_err!(msg.into());
274+
let err = std::io::Error::new(std::io::ErrorKind::Other, msg);
275+
return external_err!(err);
275276
}
276277

277278
Ok(())

0 commit comments

Comments
 (0)