Skip to content

Commit

Permalink
improve MPSC performance (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
geseq authored Feb 29, 2024
1 parent 0b965d0 commit 2c62633
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 19 deletions.
19 changes: 12 additions & 7 deletions include/mpsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ class MPSC {
MPSC() = default;

put_t put(const T &value) noexcept {
auto write_index = next_free_index_.load(std::memory_order_acquire);
alignas(64) thread_local static std::size_t reader_index_cache;
alignas(64) thread_local static std::size_t write_index;
do {
while (write_index > (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_)) {
while (write_index > (reader_index_cache + common_.index_mask_)) {
write_index = next_free_index_.load(std::memory_order_acquire);
reader_index_cache = consumer_.reader_index_.load(std::memory_order_relaxed);
if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
return false;
} else {
common_.put_wait_.wait(
[this, write_index] { return write_index <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); });
common_.put_wait_.wait([this] { return write_index <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); });
}
}
} while (!next_free_index_.compare_exchange_strong(write_index, write_index + 1, std::memory_order_acq_rel, std::memory_order_acquire));
Expand All @@ -36,7 +38,7 @@ class MPSC {
// commit in the correct order to avoid problems
while (last_committed_index_.load(std::memory_order_relaxed) != write_index) {
// we don't return at this point even in case of ReturnImmediatelyStrategy as we've already taken the token
common_.put_wait_.wait([this, write_index] { return last_committed_index_.load(std::memory_order_relaxed) == write_index; });
common_.put_wait_.wait([this] { return last_committed_index_.load(std::memory_order_relaxed) == write_index; });
}

last_committed_index_.store(++write_index, std::memory_order_release);
Expand All @@ -50,7 +52,8 @@ class MPSC {
}

get_t get() noexcept {
while (consumer_.reader_index_2_ >= last_committed_index_.load(std::memory_order_relaxed)) {
while (consumer_.reader_index_2_ >= consumer_.last_committed_index_cache_) {
consumer_.last_committed_index_cache_ = last_committed_index_.load(std::memory_order_relaxed);
if constexpr (std::is_same<GetWaitStrategy, ReturnImmediateStrategy>::value) {
return std::nullopt;
} else {
Expand All @@ -66,7 +69,9 @@ class MPSC {
return contents;
}

// for testing only
void empty() noexcept {
consumer_.last_committed_index_cache_ = 0;
consumer_.reader_index_2_ = 0;
next_free_index_.store(0, std::memory_order_release);
last_committed_index_.store(0, std::memory_order_release);
Expand Down Expand Up @@ -98,7 +103,7 @@ class MPSC {
};

struct alignas(64) Consumer {
std::size_t next_free_index_cache_{0};
std::size_t last_committed_index_cache_{0};
std::size_t reader_index_2_{0};
std::atomic<std::size_t> reader_index_{0};
};
Expand Down
24 changes: 12 additions & 12 deletions include/wait_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,51 @@ template <typename Implementation>
class WaitStrategyInterface {
public:
template <class Predicate>
void wait(Predicate p) {}
void notify() {}
inline void wait(Predicate p) {}
inline void notify() {}
};

class ReturnImmediateStrategy : public WaitStrategyInterface<ReturnImmediateStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {}
void notify() {}
inline void wait(Predicate p) {}
inline void notify() {}
};

class NoOpWaitStrategy : public WaitStrategyInterface<NoOpWaitStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {}
void notify() {}
inline void wait(Predicate p) {}
inline void notify() {}
};

class PauseWaitStrategy : public WaitStrategyInterface<PauseWaitStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {
inline void wait(Predicate p) {
cpu_pause();
}
void notify() {}
inline void notify() {}
};

class YieldWaitStrategy : public WaitStrategyInterface<YieldWaitStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {
inline void wait(Predicate p) {
std::this_thread::yield();
}
void notify() {}
inline void notify() {}
};

class CVWaitStrategy : public WaitStrategyInterface<PauseWaitStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {
inline void wait(Predicate p) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait_for(lock, std::chrono::nanoseconds(100), p);
}

void notify() { cv_.notify_all(); }
inline void notify() { cv_.notify_all(); }

private:
std::condition_variable cv_;
Expand Down

0 comments on commit 2c62633

Please sign in to comment.