diff --git a/include/rigtorp/MPMCQueue.h b/include/rigtorp/MPMCQueue.h index 01ca713..da47d17 100644 --- a/include/rigtorp/MPMCQueue.h +++ b/include/rigtorp/MPMCQueue.h @@ -29,6 +29,7 @@ SOFTWARE. #include #include // std::hardware_destructive_interference_size #include +#include #ifndef __cpp_aligned_new #ifdef _WIN32 @@ -253,6 +254,33 @@ class Queue { } } + template bool try_consume(Consumer &&c) noexcept { +#ifdef __cpp_lib_is_invocable + static_assert(std::is_nothrow_invocable_v, + "Consumer must be noexcept invocable with T&&"); +#endif + auto tail = tail_.load(std::memory_order_acquire); + for (;;) { + auto &slot = slots_[idx(tail)]; + if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) { + if (tail_.compare_exchange_strong(tail, tail + 1)) { + static_assert(noexcept(c(slot.move())), + "Consumer must be noexcept invocable with T&&"); + c(slot.move()); + slot.destroy(); + slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release); + return true; + } + } else { + auto const prevTail = tail; + tail = tail_.load(std::memory_order_acquire); + if (tail == prevTail) { + return false; + } + } + } + } + private: constexpr size_t idx(size_t i) const noexcept { return i % capacity_; } diff --git a/src/MPMCQueueTest.cpp b/src/MPMCQueueTest.cpp index d4ea4a2..54cb63d 100644 --- a/src/MPMCQueueTest.cpp +++ b/src/MPMCQueueTest.cpp @@ -122,16 +122,77 @@ int main(int argc, char *argv[]) { { MPMCQueue> q(16); // lvalue - auto v = std::unique_ptr(new int(1)); + // auto v = std::unique_ptr(new int(1)); // q.emplace(v); // q.try_emplace(v); // q.push(v); // q.try_push(v); // xvalue q.emplace(std::unique_ptr(new int(1))); - q.try_emplace(std::unique_ptr(new int(1))); - q.push(std::unique_ptr(new int(1))); - q.try_push(std::unique_ptr(new int(1))); + q.try_emplace(std::unique_ptr(new int(2))); + q.push(std::unique_ptr(new int(3))); + q.try_push(std::unique_ptr(new int(4))); + + std::unique_ptr v; + q.pop(v); + assert(*v == 1); + assert(q.try_pop(v) == true); + assert(*v == 2); + assert(q.try_consume([&v](auto &&w) noexcept { v = std::move(w); }) == + true); + assert(*v == 3); + } + + // Check that pointer have same address and value after moving through queue + { + MPMCQueue> q(1); + { + auto up = std::make_unique(1); + auto *rp = up.get(); + std::unique_ptr up2; + assert(q.try_emplace(std::move(up)) == true); + assert(q.try_consume([&](std::unique_ptr v) noexcept { + up2 = std::move(v); + }) == true); + assert(rp == up2.get()); + assert(*up2 == 1); + } + + { + auto up = std::make_unique(1); + auto *rp = up.get(); + std::unique_ptr up2; + q.emplace(std::move(up)); + assert(q.try_consume([&](std::unique_ptr v) noexcept { + up2 = std::move(v); + }) == true); + assert(rp == up2.get()); + assert(*up2 == 1); + } + + { + auto up = std::make_unique(1); + auto *rp = up.get(); + std::unique_ptr up2; + q.push(std::move(up)); + assert(q.try_consume([&](std::unique_ptr v) noexcept { + up2 = std::move(v); + }) == true); + assert(rp == up2.get()); + assert(*up2 == 1); + } + + { + auto up = std::make_unique(1); + auto *rp = up.get(); + std::unique_ptr up2; + assert(q.try_push(std::move(up)) == true); + assert(q.try_consume([&](std::unique_ptr v) noexcept { + up2 = std::move(v); + }) == true); + assert(rp == up2.get()); + assert(*up2 == 1); + } } {