Skip to content

Commit 83d15e8

Browse files
authored
Move SpawnedTask from datafusion_physical_plan to new datafusion_common_runtime crate (#9414)
* Initial commit * Tmp * Remove clippy ignores * Move SpawnedTask to under new crate * Minor changes * re-export from core
1 parent 10fbf42 commit 83d15e8

File tree

22 files changed

+186
-66
lines changed

22 files changed

+186
-66
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
exclude = ["datafusion-cli"]
2020
members = [
2121
"datafusion/common",
22+
"datafusion/common_runtime",
2223
"datafusion/core",
2324
"datafusion/expr",
2425
"datafusion/execution",
@@ -72,6 +73,7 @@ ctor = "0.2.0"
7273
dashmap = "5.4.0"
7374
datafusion = { path = "datafusion/core", version = "36.0.0", default-features = false }
7475
datafusion-common = { path = "datafusion/common", version = "36.0.0", default-features = false }
76+
datafusion-common-runtime = { path = "datafusion/common_runtime", version = "36.0.0" }
7577
datafusion-execution = { path = "datafusion/execution", version = "36.0.0" }
7678
datafusion-expr = { path = "datafusion/expr", version = "36.0.0" }
7779
datafusion-functions = { path = "datafusion/functions", version = "36.0.0" }

datafusion-cli/Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common_runtime/Cargo.toml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "datafusion-common-runtime"
20+
description = "Common Runtime functionality for DataFusion query engine"
21+
keywords = ["arrow", "query", "sql"]
22+
readme = "README.md"
23+
version = { workspace = true }
24+
edition = { workspace = true }
25+
homepage = { workspace = true }
26+
repository = { workspace = true }
27+
license = { workspace = true }
28+
authors = { workspace = true }
29+
rust-version = { workspace = true }
30+
31+
[lib]
32+
name = "datafusion_common_runtime"
33+
path = "src/lib.rs"
34+
35+
[dependencies]
36+
tokio = { workspace = true }

datafusion/common_runtime/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# DataFusion Common Runtime
21+
22+
[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
23+
24+
This crate is a submodule of DataFusion that provides common utilities.
25+
26+
[df]: https://crates.io/crates/datafusion
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::future::Future;
19+
20+
use tokio::task::{JoinError, JoinSet};
21+
22+
/// Helper that provides a simple API to spawn a single task and join it.
23+
/// Provides guarantees of aborting on `Drop` to keep it cancel-safe.
24+
///
25+
/// Technically, it's just a wrapper of `JoinSet` (with size=1).
26+
#[derive(Debug)]
27+
pub struct SpawnedTask<R> {
28+
inner: JoinSet<R>,
29+
}
30+
31+
impl<R: 'static> SpawnedTask<R> {
32+
pub fn spawn<T>(task: T) -> Self
33+
where
34+
T: Future<Output = R>,
35+
T: Send + 'static,
36+
R: Send,
37+
{
38+
let mut inner = JoinSet::new();
39+
inner.spawn(task);
40+
Self { inner }
41+
}
42+
43+
pub fn spawn_blocking<T>(task: T) -> Self
44+
where
45+
T: FnOnce() -> R,
46+
T: Send + 'static,
47+
R: Send,
48+
{
49+
let mut inner = JoinSet::new();
50+
inner.spawn_blocking(task);
51+
Self { inner }
52+
}
53+
54+
pub async fn join(mut self) -> Result<R, JoinError> {
55+
self.inner
56+
.join_next()
57+
.await
58+
.expect("`SpawnedTask` instance always contains exactly 1 task")
59+
}
60+
}

datafusion/common_runtime/src/lib.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub mod common;
19+
20+
pub use common::SpawnedTask;

datafusion/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ bzip2 = { version = "0.4.3", optional = true }
8989
chrono = { workspace = true }
9090
dashmap = { workspace = true }
9191
datafusion-common = { workspace = true, features = ["object_store"] }
92+
datafusion-common-runtime = { workspace = true }
9293
datafusion-execution = { workspace = true }
9394
datafusion-expr = { workspace = true }
9495
datafusion-functions = { workspace = true }

datafusion/core/src/dataframe/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,6 +1510,7 @@ mod tests {
15101510
use arrow::array::{self, Int32Array};
15111511
use arrow::datatypes::DataType;
15121512
use datafusion_common::{Constraint, Constraints};
1513+
use datafusion_common_runtime::SpawnedTask;
15131514
use datafusion_expr::{
15141515
avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum,
15151516
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
@@ -2169,15 +2170,14 @@ mod tests {
21692170
}
21702171

21712172
#[tokio::test]
2172-
#[allow(clippy::disallowed_methods)]
21732173
async fn sendable() {
21742174
let df = test_table().await.unwrap();
21752175
// dataframes should be sendable between threads/tasks
2176-
let task = tokio::task::spawn(async move {
2176+
let task = SpawnedTask::spawn(async move {
21772177
df.select_columns(&["c1"])
21782178
.expect("should be usable in a task")
21792179
});
2180-
task.await.expect("task completed successfully");
2180+
task.join().await.expect("task completed successfully");
21812181
}
21822182

21832183
#[tokio::test]

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ use arrow::datatypes::SchemaRef;
4040
use arrow::datatypes::{Fields, Schema};
4141
use bytes::{BufMut, BytesMut};
4242
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
43+
use datafusion_common_runtime::SpawnedTask;
4344
use datafusion_execution::TaskContext;
4445
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
45-
use datafusion_physical_plan::common::SpawnedTask;
4646
use futures::{StreamExt, TryStreamExt};
4747
use hashbrown::HashMap;
4848
use object_store::path::Path;

datafusion/core/src/datasource/file_format/write/demux.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,14 @@ use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArr
3333
use arrow_schema::{DataType, Schema};
3434
use datafusion_common::cast::as_string_array;
3535
use datafusion_common::{exec_datafusion_err, DataFusionError};
36-
36+
use datafusion_common_runtime::SpawnedTask;
3737
use datafusion_execution::TaskContext;
3838

3939
use futures::StreamExt;
4040
use object_store::path::Path;
4141

4242
use rand::distributions::DistString;
4343

44-
use datafusion_physical_plan::common::SpawnedTask;
4544
use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
4645

4746
type RecordBatchReceiver = Receiver<RecordBatch>;

datafusion/core/src/datasource/file_format/write/orchestration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ use crate::physical_plan::SendableRecordBatchStream;
3030

3131
use arrow_array::RecordBatch;
3232
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
33+
use datafusion_common_runtime::SpawnedTask;
3334
use datafusion_execution::TaskContext;
3435

3536
use bytes::Bytes;
36-
use datafusion_physical_plan::common::SpawnedTask;
3737
use futures::try_join;
3838
use tokio::io::{AsyncWrite, AsyncWriteExt};
3939
use tokio::sync::mpsc::{self, Receiver};

datafusion/core/src/datasource/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ use async_trait::async_trait;
3131
use futures::StreamExt;
3232

3333
use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
34+
use datafusion_common_runtime::SpawnedTask;
3435
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3536
use datafusion_expr::{CreateExternalTable, Expr, TableType};
36-
use datafusion_physical_plan::common::SpawnedTask;
3737
use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
3838
use datafusion_physical_plan::metrics::MetricsSet;
3939
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;

datafusion/core/src/execution/context/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2222,6 +2222,7 @@ mod tests {
22222222
use crate::test_util::{plan_and_collect, populate_csv_partitions};
22232223
use crate::variable::VarType;
22242224
use async_trait::async_trait;
2225+
use datafusion_common_runtime::SpawnedTask;
22252226
use datafusion_expr::Expr;
22262227
use std::env;
22272228
use std::path::PathBuf;
@@ -2321,7 +2322,6 @@ mod tests {
23212322
}
23222323

23232324
#[tokio::test]
2324-
#[allow(clippy::disallowed_methods)]
23252325
async fn send_context_to_threads() -> Result<()> {
23262326
// ensure SessionContexts can be used in a multi-threaded
23272327
// environment. Usecase is for concurrent planing.
@@ -2332,7 +2332,7 @@ mod tests {
23322332
let threads: Vec<_> = (0..2)
23332333
.map(|_| ctx.clone())
23342334
.map(|ctx| {
2335-
tokio::spawn(async move {
2335+
SpawnedTask::spawn(async move {
23362336
// Ensure we can create logical plan code on a separate thread.
23372337
ctx.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
23382338
.await
@@ -2341,7 +2341,7 @@ mod tests {
23412341
.collect();
23422342

23432343
for handle in threads {
2344-
handle.await.unwrap().unwrap();
2344+
handle.join().await.unwrap().unwrap();
23452345
}
23462346
Ok(())
23472347
}

datafusion/core/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,11 @@ pub use parquet;
480480
/// re-export of [`datafusion_common`] crate
481481
pub mod common {
482482
pub use datafusion_common::*;
483+
484+
/// re-export of [`datafusion_common_runtime`] crate
485+
pub mod runtime {
486+
pub use datafusion_common_runtime::*;
487+
}
483488
}
484489

485490
// Backwards compatibility

datafusion/core/tests/fuzz_cases/window_fuzz.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use datafusion::physical_plan::windows::{
3030
use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode};
3131
use datafusion::prelude::{SessionConfig, SessionContext};
3232
use datafusion_common::{Result, ScalarValue};
33+
use datafusion_common_runtime::SpawnedTask;
3334
use datafusion_expr::type_coercion::aggregates::coerce_types;
3435
use datafusion_expr::{
3536
AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
@@ -123,8 +124,7 @@ async fn window_bounded_window_random_comparison() -> Result<()> {
123124
for i in 0..n {
124125
let idx = i % test_cases.len();
125126
let (pb_cols, ob_cols, search_mode) = test_cases[idx].clone();
126-
#[allow(clippy::disallowed_methods)] // spawn allowed only in tests
127-
let job = tokio::spawn(run_window_test(
127+
let job = SpawnedTask::spawn(run_window_test(
128128
make_staggered_batches::<true>(1000, n_distinct, i as u64),
129129
i as u64,
130130
pb_cols,
@@ -134,7 +134,7 @@ async fn window_bounded_window_random_comparison() -> Result<()> {
134134
handles.push(job);
135135
}
136136
for job in handles {
137-
job.await.unwrap()?;
137+
job.join().await.unwrap()?;
138138
}
139139
}
140140
Ok(())

datafusion/physical-plan/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ arrow-schema = { workspace = true }
4343
async-trait = { workspace = true }
4444
chrono = { workspace = true }
4545
datafusion-common = { workspace = true, default-features = true }
46+
datafusion-common-runtime = { workspace = true, default-features = true }
4647
datafusion-execution = { workspace = true }
4748
datafusion-expr = { workspace = true }
4849
datafusion-physical-expr = { workspace = true, default-features = true }

0 commit comments

Comments
 (0)