Skip to content

Commit 7440c1c

Browse files
committed
Add function to wrap any stream
1 parent 5e93cfc commit 7440c1c

File tree

4 files changed

+50
-14
lines changed

4 files changed

+50
-14
lines changed

datafusion-examples/examples/thread_pools.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
//! issues described in the [Architecture section] such as throttled bandwidth
2525
//! due to congestion control and increased latencies for processing network
2626
//! messages.
27-
2827
use arrow::util::pretty::pretty_format_batches;
2928
use datafusion::error::Result;
3029
use datafusion::execution::SendableRecordBatchStream;
@@ -144,8 +143,6 @@ async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()>
144143

145144
/// Demonstrates how to run queries on a different runtime than the current run
146145
/// and how to handle IO operations.
147-
///
148-
149146
async fn different_runtime_advanced() -> Result<()> {
150147
// In this example, we will configure access to a remote object store
151148
// over the network during the plan

datafusion/physical-plan/src/cross_rt_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ where
103103
{
104104
/// Create new stream based on an existing stream that transports [`Result`]s.
105105
///
106-
/// Also receives an executor that actually executes the underlying stream as well as a converter that convets
106+
/// Also receives an executor that actually executes the underlying stream as well as a converter that converts
107107
/// [`executor::JobError`] to the error type of the stream (so we can send potential crashes/panics).
108108
pub fn new_with_error_stream<S, C>(
109109
stream: S,

datafusion/physical-plan/src/dedicated_executor.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@ use crate::io_object_store::IoObjectStore;
2525
use crate::stream::RecordBatchStreamAdapter;
2626
use crate::SendableRecordBatchStream;
2727
use datafusion_common::DataFusionError;
28-
use futures::{
29-
future::{BoxFuture, Shared},
30-
Future, FutureExt, TryFutureExt,
31-
};
28+
use futures::{future::{BoxFuture, Shared}, Future, FutureExt, Stream, TryFutureExt};
3229
use log::{info, warn};
3330
use object_store::ObjectStore;
3431
use parking_lot::RwLock;
@@ -59,6 +56,8 @@ impl From<Builder> for DedicatedExecutorBuilder {
5956
/// tasks on the same threadpool by running futures (and any `tasks` that are
6057
/// `tokio::task::spawned` by them) on a separate tokio [`Executor`].
6158
///
59+
/// DedicatedExecutor can be `clone`ed and all clones share the same threadpool.
60+
///
6261
/// TODO add note about `io_thread`
6362
///
6463
/// TODO: things we use in InfluxData
@@ -119,7 +118,7 @@ impl From<Builder> for DedicatedExecutorBuilder {
119118
/// happens when a runtime is dropped from within an asynchronous
120119
/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
121120
///
122-
#[derive(Clone)]
121+
#[derive(Clone, Debug)]
123122
pub struct DedicatedExecutor {
124123
state: Arc<RwLock<State>>,
125124
}
@@ -222,8 +221,8 @@ impl DedicatedExecutor {
222221
/// Note that this object store will only work correctly if run on this
223222
/// dedicated executor. If you try and use it on another executor, it will
224223
/// panic with "no IO runtime registered" type error.
225-
pub fn wrap_object_store(object_store: Arc<dyn ObjectStore>) -> Arc<IoObjectStore> {
226-
Arc::new(IoObjectStore::new(object_store))
224+
pub fn wrap_object_store(&self, object_store: Arc<dyn ObjectStore>) -> Arc<IoObjectStore> {
225+
Arc::new(IoObjectStore::new(self.clone(), object_store))
227226
}
228227

229228
/// Returns a SendableRecordBatchStream that will run on this executor's thread pool
@@ -237,6 +236,28 @@ impl DedicatedExecutor {
237236
Box::pin(RecordBatchStreamAdapter::new(schema, cross_rt_stream))
238237
}
239238

239+
/// Runs an stream that produces Results on the executor's thread pool
240+
///
241+
/// Ths stream must produce Results so that any errors on the dedicated
242+
/// executor (like a panic or shutdown) can be communicated back.
243+
///
244+
/// # Arguments:
245+
/// - stream: the stream to run on this dedicated executor
246+
/// - converter: a function that converts a [`JobError`] to the error type of the stream
247+
pub fn run_stream<X, E, S, C>(
248+
&self,
249+
stream: S,
250+
converter: C,
251+
) -> impl Stream<Item = Result<X, E>> + Send + 'static
252+
where
253+
X: Send + 'static,
254+
E: Send + 'static,
255+
S: Stream<Item = Result<X, E>> + Send + 'static,
256+
C: Fn(JobError) -> E + Send + 'static,
257+
{
258+
CrossRtStream::new_with_error_stream(stream, self.clone(), converter)
259+
}
260+
240261
/// Registers `handle` as the IO runtime for this thread
241262
///
242263
/// This sets a thread-local variable
@@ -306,6 +327,7 @@ impl<T> Future for DropGuard<T> {
306327
/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
307328
/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
308329
/// [`completed_shutdown`](Self::completed_shutdown) (for which is owns the sender side).
330+
#[derive(Debug)]
309331
struct State {
310332
/// Runtime handle.
311333
///

datafusion/physical-plan/src/io_object_store.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ use std::sync::Arc;
2020
use crate::DedicatedExecutor;
2121
use async_trait::async_trait;
2222
use futures::stream::BoxStream;
23+
use futures::StreamExt;
2324
use object_store::{
2425
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
2526
ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
2627
};
28+
use crate::dedicated_executor::JobError;
2729

2830
/// 'ObjectStore' that wraps an inner `ObjectStore` and wraps all the underlying
2931
/// methods with [`DedicatedExecutor::spawn_io`] so that they are run on the Tokio Runtime
@@ -33,12 +35,14 @@ use object_store::{
3335
///
3436
#[derive(Debug)]
3537
pub struct IoObjectStore {
38+
executor: DedicatedExecutor,
3639
inner: Arc<dyn ObjectStore>,
3740
}
3841

3942
impl IoObjectStore {
40-
pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
43+
pub fn new(executor: DedicatedExecutor, object_store: Arc<dyn ObjectStore>) -> Self {
4144
Self {
45+
executor,
4246
inner: object_store,
4347
}
4448
}
@@ -50,9 +54,15 @@ impl std::fmt::Display for IoObjectStore {
5054
}
5155
}
5256

57+
fn convert_error(e: JobError) -> object_store::Error {
58+
object_store::Error::Generic {
59+
store: "IoObjectStore",
60+
source: Box::new(e)
61+
}
62+
}
63+
5364
#[async_trait]
5465
impl ObjectStore for IoObjectStore {
55-
/// TODO wrap the resulting stream in CrossRTStream
5666
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
5767
let location = location.clone();
5868
let store = Arc::clone(&self.inner);
@@ -83,7 +93,12 @@ impl ObjectStore for IoObjectStore {
8393
}
8494

8595
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
86-
self.inner.list(prefix)
96+
// run the inner list on the dedicated executor
97+
let inner_stream = self.inner.list(prefix);
98+
99+
inner_stream
100+
//self.executor.run_stream(inner_stream, convert_error)
101+
// .boxed()
87102
}
88103

89104
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -121,4 +136,6 @@ impl ObjectStore for IoObjectStore {
121136
})
122137
.await
123138
}
139+
140+
124141
}

0 commit comments

Comments
 (0)