Add DedicatedExecutor to FlightSQL Server #247
src/execution/dedicated_executor.rs
// also register the IO runtime for the current thread, since it might be used as well (esp. for the
// current thread RT)
register_io_runtime(io_handle.clone());
It's not clear to me what this means in practice. Maybe it will become more obvious once I start plugging this into the rest of the app.
It basically means that the IO (e.g. for object store) will be done on the "current" tokio runtime (aka the implicit one created by #[tokio::main]).
That part I was clear on; I meant more under what circumstances it's used in actual app code. No real concern right now, just memorializing my thoughts as I work on this.
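To make the "current thread" registration concrete, here is a std-only model of the idea. It assumes the registration is a per-thread (thread-local) slot holding a handle to the IO runtime; `IoHandle` is a hypothetical stand-in for `tokio::runtime::Handle`, and these `register_io_runtime`/`current_io_runtime` functions are illustrative, not the PR's code. What it shows: registering on the executor's worker threads does not cover the thread that created the executor, so IO issued from that thread (e.g. under a current-thread runtime) would find no handle unless it registers too.

```rust
use std::cell::RefCell;
use std::thread;

/// Hypothetical stand-in for a `tokio::runtime::Handle` to the IO runtime.
#[derive(Clone, Debug, PartialEq)]
pub struct IoHandle {
    pub name: String,
}

thread_local! {
    // Per-thread slot for the IO runtime handle; empty until registered.
    static IO_RUNTIME: RefCell<Option<IoHandle>> = RefCell::new(None);
}

/// Registers `handle` for the calling thread only.
pub fn register_io_runtime(handle: IoHandle) {
    IO_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle));
}

/// Returns the IO runtime registered on the calling thread, if any.
pub fn current_io_runtime() -> Option<IoHandle> {
    IO_RUNTIME.with(|slot| slot.borrow().clone())
}

fn main() {
    let io = IoHandle { name: "io".to_string() };
    // Worker threads register via their start hook; the thread that created
    // the executor has to register explicitly, as the PR does.
    register_io_runtime(io.clone());
    assert_eq!(current_io_runtime(), Some(io));

    // Registration does not leak across threads: a new thread starts empty.
    let seen = thread::spawn(current_io_runtime).join().unwrap();
    assert_eq!(seen, None);
}
```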
src/execution/dedicated_executor.rs
let runtime = runtime_builder
    .on_thread_start(move || register_io_runtime(io_handle.clone()))
    .build()
    .expect("Creating tokio runtime");
I think we want to allow configuring the number of threads, but I guess that could be done by the caller, since this takes a tokio::runtime::Builder.
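One way to read this: the executor accepts a builder the caller has already configured, so the thread count never has to be an executor parameter. With tokio that would mean calling `worker_threads(n)` on `tokio::runtime::Builder::new_multi_thread()` before passing the builder in. The std-only sketch below models the same split; `WorkerPoolBuilder`, `start_pool`, and `IoHandle` are hypothetical, and each worker runs a start hook that registers the IO handle, mirroring `on_thread_start` above.

```rust
use std::cell::RefCell;
use std::thread;

/// Hypothetical stand-in for a handle to the IO runtime.
#[derive(Clone, Debug, PartialEq)]
pub struct IoHandle(pub String);

thread_local! {
    // Per-thread slot filled in by the start hook below.
    static IO_RUNTIME: RefCell<Option<IoHandle>> = RefCell::new(None);
}

fn register_io_runtime(handle: IoHandle) {
    IO_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle));
}

fn current_io_runtime() -> Option<IoHandle> {
    IO_RUNTIME.with(|slot| slot.borrow().clone())
}

/// Hypothetical builder that the caller configures before handing it over.
pub struct WorkerPoolBuilder {
    worker_threads: usize,
}

impl WorkerPoolBuilder {
    /// Defaults to the available parallelism, falling back to one thread.
    pub fn new() -> Self {
        let n = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        Self { worker_threads: n }
    }

    /// Caller-side knob for the thread count.
    pub fn worker_threads(mut self, n: usize) -> Self {
        assert!(n > 0, "need at least one worker thread");
        self.worker_threads = n;
        self
    }
}

/// Executor-side code: it only consumes the builder. Each worker first runs the
/// start hook (registering the IO handle), then reports which handle it sees.
pub fn start_pool(builder: WorkerPoolBuilder, io: IoHandle) -> Vec<Option<IoHandle>> {
    let workers: Vec<_> = (0..builder.worker_threads)
        .map(|_| {
            let io = io.clone();
            thread::spawn(move || {
                register_io_runtime(io); // start hook, like `on_thread_start`
                current_io_runtime()
            })
        })
        .collect();
    workers
        .into_iter()
        .map(|w| w.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let builder = WorkerPoolBuilder::new().worker_threads(4);
    let seen = start_pool(builder, IoHandle("io".to_string()));
    assert_eq!(seen.len(), 4);
    assert!(seen.iter().all(|h| h.as_ref() == Some(&IoHandle("io".to_string()))));
}
```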
src/execution/dedicated_executor.rs
if tx_handle.send(Handle::current()).is_err() {
    return;
}
Maybe we want to log something here.
I think typically when a send handle (tx) fails to send, it means the receiving side has hung up (no one is there to get the message), which can happen during a normal shutdown.
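Both points can be reconciled: a failed send only means the receiver is gone, so it need not be treated as an error, while a low-level log line may still help when debugging shutdown. The sketch uses `std::sync::mpsc`, whose `send` likewise fails only when the receiving half has been dropped (tokio's oneshot `Sender::send` behaves analogously, handing the value back in the `Err`). `send_or_note_shutdown` is a hypothetical helper, and `eprintln!` stands in for whatever logging the server ends up using.

```rust
use std::sync::mpsc;

/// Sends `value`, treating a disconnected receiver as a normal shutdown rather
/// than an error. Returns `true` if the value was delivered.
fn send_or_note_shutdown<T>(tx: &mpsc::Sender<T>, value: T) -> bool {
    match tx.send(value) {
        Ok(()) => true,
        Err(mpsc::SendError(_unsent)) => {
            // The receiver was dropped, so nobody is waiting for this value.
            // That is expected while shutting down; a debug-level note suffices.
            eprintln!("receiver hung up; dropping value (likely shutdown)");
            false
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    assert!(send_or_note_shutdown(&tx, 1));
    assert_eq!(rx.recv().unwrap(), 1);

    drop(rx); // simulate the receiving side going away during shutdown
    assert!(!send_or_note_shutdown(&tx, 2));
}
```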
src/execution/mod.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

pub mod dedicated_executor;
I will probably end up including this in the flightsql feature.
Very cool @matthewmturner, thank you!
cc @crepererum and @tustvold
src/execution/dedicated_executor.rs
/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
/// [`completed_shutdown`](Self::completed_shutdown) (for which it owns the sender side).
struct State {
We might be able to use a tokio JoinSet https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html instead of this now (I think this code predates JoinSet).
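For reference, the shutdown handshake the doc comment describes can be modeled with std channels and a thread: the outer API holds the sender for `start_shutdown` and the receiver for `completed_shutdown`, while the worker owns the other halves. This is a hypothetical sketch of the existing protocol, not of the `JoinSet` alternative; the `shutdown` and `join` methods are illustrative names, and a `JoinSet` may remove the need for some of this bookkeeping.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical outer-API state. The worker thread owns the receiving half of
/// `start_shutdown` and the sending half of `completed_shutdown`.
struct State {
    /// Dropping this sender is the "start shutdown" signal.
    start_shutdown: Option<mpsc::Sender<()>>,
    /// Yields once the worker has finished its cleanup.
    completed_shutdown: mpsc::Receiver<()>,
    thread: Option<thread::JoinHandle<()>>,
}

impl State {
    fn new() -> Self {
        let (start_tx, start_rx) = mpsc::channel::<()>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let thread = thread::spawn(move || {
            // Blocks until a message arrives or the sender is dropped; both
            // count as a shutdown request here.
            let _ = start_rx.recv();
            // ... cancel or drain outstanding work here ...
            // Ignore the error if the outer API is no longer listening.
            let _ = done_tx.send(());
        });
        Self {
            start_shutdown: Some(start_tx),
            completed_shutdown: done_rx,
            thread: Some(thread),
        }
    }

    /// Requests shutdown without waiting; calling it again is a no-op.
    fn shutdown(&mut self) {
        drop(self.start_shutdown.take());
    }

    /// Requests shutdown if needed, then waits for the worker to confirm and exit.
    fn join(&mut self) {
        self.shutdown();
        // Err means the worker is gone without signalling (or already signalled
        // during an earlier `join`), so there is nothing left to wait for.
        let _ = self.completed_shutdown.recv();
        if let Some(t) = self.thread.take() {
            t.join().expect("worker thread panicked");
        }
    }
}

fn main() {
    let mut state = State::new();
    state.shutdown();
    state.join();
    state.join(); // idempotent: returns immediately the second time
}
```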
src/execution/dedicated_executor.rs
// specific language governing permissions and limitations
// under the License.

use std::{
This is great, thank you @matthewmturner!
Once we get this sorted out, I definitely think we should consider merging it back upstream into DataFusion (with documentation). I can totally help with that.
Sounds good
BTW, I wrote up a bunch of backstory about why a separate executor is needed in apache/datafusion#13423. I hope to get an example up soon that will show why this DedicatedExecutor is much nicer.
@alamb thanks for that, it looks great. I'm just plugging along getting this integrated into the FlightSQL server here. My first objective is to get all the pipes working and all the CPU-bound work executed by the current implementation of the dedicated executor. After that I will probably look into updating the implementation to use the
Adds a dedicated executor for running CPU-bound work on the FlightSQL server.
There is interest in this from the DataFusion community. It was already on our roadmap, and I think the DFT FlightSQL server is a great place to have a reference implementation.
Initial inspiration and context can be found here.
Most of the initial implementation was copied from here, with some tweaks for our current setup. In particular, we don't have metrics in the FlightSQL server implementation yet (though they are on the roadmap), so I expect to do a follow-up where metrics are integrated.
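As a rough illustration of what running CPU-bound work on a dedicated executor means, the sketch below sends closures to a separate pool of OS threads and returns a channel for each result, so the threads serving network IO are never occupied by query computation. It deliberately swaps the PR's separate tokio runtime for plain std threads and channels; `DedicatedExecutor`, `spawn`, and `join` here are hypothetical and do not reflect the PR's API. In an async server the caller would await something like a tokio oneshot instead of blocking on `recv`.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// A unit of CPU-bound work, boxed so different closures share one queue.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical, minimal dedicated executor built on std threads.
pub struct DedicatedExecutor {
    jobs: mpsc::Sender<Job>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl DedicatedExecutor {
    /// Starts `num_threads` worker threads that pull jobs from a shared queue.
    pub fn new(num_threads: usize) -> Self {
        let (tx, rx) = mpsc::channel::<Job>();
        let rx = Arc::new(Mutex::new(rx));
        let workers = (0..num_threads)
            .map(|i| {
                let rx = Arc::clone(&rx);
                thread::Builder::new()
                    .name(format!("dedicated-cpu-{i}"))
                    .spawn(move || loop {
                        // The lock guard is dropped at the end of this statement,
                        // so other workers can dequeue while this job runs.
                        let job = match rx.lock().unwrap().recv() {
                            Ok(job) => job,
                            Err(_) => break, // queue empty and all senders dropped
                        };
                        job();
                    })
                    .expect("failed to spawn worker thread")
            })
            .collect();
        Self { jobs: tx, workers }
    }

    /// Runs `f` on a dedicated thread and returns a receiver for its result.
    pub fn spawn<T, F>(&self, f: F) -> mpsc::Receiver<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let (result_tx, result_rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // If the caller dropped its receiver, nobody wants the result.
            let _ = result_tx.send(f());
        });
        self.jobs.send(job).expect("all worker threads have exited");
        result_rx
    }

    /// Stops accepting work, lets queued jobs finish, and joins the workers.
    pub fn join(self) {
        let DedicatedExecutor { jobs, workers } = self;
        drop(jobs); // workers exit once the queue drains
        for worker in workers {
            worker.join().expect("worker thread panicked");
        }
    }
}

fn main() {
    let exec = DedicatedExecutor::new(2);
    // Stand-in for CPU-heavy query work.
    let sum = exec.spawn(|| (1..=1_000u64).sum::<u64>());
    assert_eq!(sum.recv().unwrap(), 500_500);
    exec.join();
}
```

Dropping the job sender in `join` is what lets each worker finish the queued jobs and then exit, because `recv` only returns an error once the queue is empty and every sender is gone.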