GH-43631: [C++] Add C++ implementation of Async C Data Interface #44495
Status operator()(const std::shared_ptr<RecordBatch>& record) {
  std::unique_lock<std::mutex> lock(state_->mutex_);
  if (state_->pending_requests_ == 0) {
    state_->cv_.wait(lock, [this]() -> bool {
Are we supposed to have a blocking call in consuming an AsyncGenerator?
I honestly couldn't figure out a better way to manage the backpressure from the calls to request. I'm open to suggestions for a better route here, other than simply saying this should not be run on the main thread (shrug)
I guess this works for now
I suppose the way it "should" work is that we never call into the AsyncGenerator unless we have a pending request? But we'd then need to handle more of the control flow here ourselves
Co-authored-by: Benjamin Kietzman <[email protected]>
I don't see anything out of place here and I checked that the documentation/interface makes sense to me (and it does!).
Not at all essential, but it might be worth marking these as experimental (e.g., so that it is less surprising if you have to update how the Executor works).
@bkietz I addressed your suggestions! @paleolimbot I've updated the doc briefs to include
Co-authored-by: Benjamin Kietzman <[email protected]>
This adds a basic implementation of helpers for managing an ArrowAsyncDeviceStreamHandler when using the Async Arrow C Device interface. The corresponding C++ helper implementation can be found at apache/arrow#44495, with the discussion on the actual C structures located at apache/arrow#43632.
---------
Co-authored-by: Sutou Kouhei <[email protected]>
After merging your PR, Conbench analyzed the 3 benchmarking runs that have been run so far on merge-commit aab7d81. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about 8 possible false positives for unstable benchmarks that are known to sometimes produce them.
Rationale for this change
Building on #43632, which created the Async C Data structures, this adds functions to bridge.h/bridge.cc to implement helpers for managing the Async C Data interfaces.
What changes are included in this PR?
Two functions added to bridge.h:
CreateAsyncDeviceStreamHandler populates an ArrowAsyncDeviceStreamHandler and an Executor to provide a future that resolves to an AsyncRecordBatchGenerator to produce record batches as they are pushed asynchronously. The ArrowAsyncDeviceStreamHandler can then be passed to any asynchronous producer.
ExportAsyncRecordBatchReader takes a record batch generator and a schema, along with an ArrowAsyncDeviceStreamHandler to use for calling the callbacks to push data as it is available from the generator.
Are these changes tested?
Unit tests are added (currently only one test, more tests to be added)
Are there any user-facing changes?
No