Skip to content

WIP add try_consume interface #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions include/rigtorp/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ SOFTWARE.
#include <memory>
#include <new> // std::hardware_destructive_interference_size
#include <stdexcept>
#include <type_traits>

#ifndef __cpp_aligned_new
#ifdef _WIN32
Expand Down Expand Up @@ -253,6 +254,33 @@ class Queue {
}
}

template <typename Consumer> bool try_consume(Consumer &&c) noexcept {
#ifdef __cpp_lib_is_invocable
static_assert(std::is_nothrow_invocable_v<Consumer, T &&>,
"Consumer must be noexcept invocable with T&&");
#endif
auto tail = tail_.load(std::memory_order_acquire);
for (;;) {
auto &slot = slots_[idx(tail)];
if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) {
if (tail_.compare_exchange_strong(tail, tail + 1)) {
static_assert(noexcept(c(slot.move())),
"Consumer must be noexcept invocable with T&&");
c(slot.move());
slot.destroy();
slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
return true;
}
} else {
auto const prevTail = tail;
tail = tail_.load(std::memory_order_acquire);
if (tail == prevTail) {
return false;
}
}
}
}

private:
constexpr size_t idx(size_t i) const noexcept { return i % capacity_; }

Expand Down
69 changes: 65 additions & 4 deletions src/MPMCQueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,77 @@ int main(int argc, char *argv[]) {
{
MPMCQueue<std::unique_ptr<int>> q(16);
// lvalue
auto v = std::unique_ptr<int>(new int(1));
// auto v = std::unique_ptr<int>(new int(1));
// q.emplace(v);
// q.try_emplace(v);
// q.push(v);
// q.try_push(v);
// xvalue
q.emplace(std::unique_ptr<int>(new int(1)));
q.try_emplace(std::unique_ptr<int>(new int(1)));
q.push(std::unique_ptr<int>(new int(1)));
q.try_push(std::unique_ptr<int>(new int(1)));
q.try_emplace(std::unique_ptr<int>(new int(2)));
q.push(std::unique_ptr<int>(new int(3)));
q.try_push(std::unique_ptr<int>(new int(4)));

std::unique_ptr<int> v;
q.pop(v);
assert(*v == 1);
assert(q.try_pop(v) == true);
assert(*v == 2);
assert(q.try_consume([&v](auto &&w) noexcept { v = std::move(w); }) ==
true);
assert(*v == 3);
}

// Check that pointer have same address and value after moving through queue
{
MPMCQueue<std::unique_ptr<int>> q(1);
{
auto up = std::make_unique<int>(1);
auto *rp = up.get();
std::unique_ptr<int> up2;
assert(q.try_emplace(std::move(up)) == true);
assert(q.try_consume([&](std::unique_ptr<int> v) noexcept {
up2 = std::move(v);
}) == true);
assert(rp == up2.get());
assert(*up2 == 1);
}

{
auto up = std::make_unique<int>(1);
auto *rp = up.get();
std::unique_ptr<int> up2;
q.emplace(std::move(up));
assert(q.try_consume([&](std::unique_ptr<int> v) noexcept {
up2 = std::move(v);
}) == true);
assert(rp == up2.get());
assert(*up2 == 1);
}

{
auto up = std::make_unique<int>(1);
auto *rp = up.get();
std::unique_ptr<int> up2;
q.push(std::move(up));
assert(q.try_consume([&](std::unique_ptr<int> v) noexcept {
up2 = std::move(v);
}) == true);
assert(rp == up2.get());
assert(*up2 == 1);
}

{
auto up = std::make_unique<int>(1);
auto *rp = up.get();
std::unique_ptr<int> up2;
assert(q.try_push(std::move(up)) == true);
assert(q.try_consume([&](std::unique_ptr<int> v) noexcept {
up2 = std::move(v);
}) == true);
assert(rp == up2.get());
assert(*up2 == 1);
}
}

{
Expand Down