Skip to content

Commit 7e944ed

Browse files
authored
fix: panic when use Scheduler to execute plan (#4097)
1 parent f61b43a commit 7e944ed

File tree

2 files changed

+31
-14
lines changed

2 files changed

+31
-14
lines changed

datafusion/core/src/scheduler/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -323,7 +323,11 @@ mod tests {
323323
async fn test_simple() {
324324
init_logging();
325325

326-
let scheduler = Scheduler::new(4);
326+
let scheduler = SchedulerBuilder::new(4)
327+
.panic_handler(|panic| {
328+
unreachable!("not expect panic: {:?}", panic);
329+
})
330+
.build();
327331

328332
let config = SessionConfig::new().with_target_partitions(4);
329333
let context = SessionContext::with_config(config);
@@ -341,6 +345,8 @@ mod tests {
341345
"select id, b from (select id, b from table1 union all select id, b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
342346
"select id, MIN(b), MAX(b), AVG(b) from table1 group by id order by id",
343347
"select count(*) from table1 where table1.a > 4",
348+
"WITH gp AS (SELECT id FROM table1 GROUP BY id)
349+
SELECT COUNT(CAST(CAST(gp.id || 'xx' AS TIMESTAMP) AS BIGINT)) FROM gp",
344350
];
345351

346352
for sql in queries {
@@ -353,8 +359,8 @@ mod tests {
353359
info!("Plan: {}", displayable(plan.as_ref()).indent());
354360

355361
let stream = scheduler.schedule(plan, task).unwrap().stream();
356-
let scheduled: Vec<_> = stream.try_collect().await.unwrap();
357-
let expected = query.collect().await.unwrap();
362+
let scheduled: Vec<_> = stream.try_collect().await.unwrap_or_default();
363+
let expected = query.collect().await.unwrap_or_default();
358364

359365
let total_expected = expected.iter().map(|x| x.num_rows()).sum::<usize>();
360366
let total_scheduled = scheduled.iter().map(|x| x.num_rows()).sum::<usize>();

datafusion/core/src/scheduler/task.rs

Lines changed: 22 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -108,17 +108,24 @@ impl Task {
108108
routable: &RoutablePipeline,
109109
error: DataFusionError,
110110
) {
111-
self.context.send_query_output(partition, Err(error));
112-
if let Some(link) = routable.output {
113-
trace!(
114-
"Closing pipeline: {:?}, partition: {}, due to error",
115-
link,
116-
self.waker.partition,
117-
);
118-
119-
self.context.pipelines[link.pipeline]
120-
.pipeline
121-
.close(link.child, self.waker.partition);
111+
match routable.output {
112+
Some(link) => {
113+
// The query output partitioning may not match the current pipeline's
114+
// but the query output has at least one partition
115+
// so send error to the first partition of the query output.
116+
self.context.send_query_output(0, Err(error));
117+
118+
trace!(
119+
"Closing pipeline: {:?}, partition: {}, due to error",
120+
link,
121+
self.waker.partition,
122+
);
123+
124+
self.context.pipelines[link.pipeline]
125+
.pipeline
126+
.close(link.child, self.waker.partition);
127+
}
128+
None => self.context.send_query_output(partition, Err(error)),
122129
}
123130
}
124131

@@ -303,6 +310,10 @@ impl ExecutionContext {
303310

304311
/// Sends `output` to this query's output stream
305312
fn send_query_output(&self, partition: usize, output: Result<RecordBatch>) {
313+
debug_assert!(
314+
self.output.len() > partition,
315+
"the specified partition exceeds the total number of output partitions"
316+
);
306317
let _ = self.output[partition].unbounded_send(Some(output));
307318
}
308319

0 commit comments

Comments
 (0)