Skip to content

Commit

Permalink
[GraphBolt][io_uring] Remove redundant mechanism. (#7686)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfbalin authored Aug 12, 2024
1 parent 4c1e14c commit bdbba5f
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions graphbolt/src/cnumpy.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
}
// If this is the first thread exiting, release the master thread's
// ticket as well by releasing 2 slots. Otherwise, release 1 slot.
const auto releasing = acquirer_->exiting_first_.test_and_set() ? 1 : 2;
const auto releasing =
acquirer_->exiting_first_.test_and_set(std::memory_order_relaxed)
? 1
: 2;
semaphore_.release(releasing);
acquirer_->num_acquisitions_.fetch_add(
-releasing, std::memory_order_relaxed);
}

::io_uring& get() const { return io_uring_queue_[thread_id_]; }
Expand All @@ -179,17 +180,16 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
}

~QueueAndBufferAcquirer() {
// If any of the worker threads exit early without being able to release
// the semaphore, we make sure to release it for them in the main thread.
const auto releasing = num_acquisitions_.load(std::memory_order_relaxed);
// If none of the worker threads acquire the semaphore, we make sure to
// release the ticket taken in the constructor.
const auto releasing =
exiting_first_.test_and_set(std::memory_order_relaxed) ? 0 : 1;
semaphore_.release(releasing);
TORCH_CHECK(releasing == 0, "An io_uring worker thread didn't not exit.");
}

std::pair<UniqueQueue, char*> get() {
// We consume a slot from the semaphore to use a queue.
semaphore_.acquire();
num_acquisitions_.fetch_add(1, std::memory_order_relaxed);
const auto thread_id = [&] {
std::lock_guard lock(available_queues_mtx_);
TORCH_CHECK(!available_queues_.empty());
Expand All @@ -205,7 +205,6 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
private:
const OnDiskNpyArray* array_;
std::atomic_flag exiting_first_ = ATOMIC_FLAG_INIT;
std::atomic<int> num_acquisitions_ = 1;
};

#endif // HAVE_LIBRARY_LIBURING
Expand Down

0 comments on commit bdbba5f

Please sign in to comment.