diff --git a/include/mpsc.hpp b/include/mpsc.hpp index 3a70fd9..678946e 100644 --- a/include/mpsc.hpp +++ b/include/mpsc.hpp @@ -19,14 +19,16 @@ class MPSC { MPSC() = default; put_t put(const T &value) noexcept { - auto write_index = next_free_index_.load(std::memory_order_acquire); + alignas(64) thread_local static std::size_t reader_index_cache; + alignas(64) thread_local static std::size_t write_index; do { - while (write_index > (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_)) { + while (write_index > (reader_index_cache + common_.index_mask_)) { + write_index = next_free_index_.load(std::memory_order_acquire); + reader_index_cache = consumer_.reader_index_.load(std::memory_order_relaxed); if constexpr (std::is_same::value) { return false; } else { - common_.put_wait_.wait( - [this, write_index] { return write_index <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); }); + common_.put_wait_.wait([this] { return write_index <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); }); } } } while (!next_free_index_.compare_exchange_strong(write_index, write_index + 1, std::memory_order_acq_rel, std::memory_order_acquire)); @@ -36,7 +38,7 @@ class MPSC { // commit in the correct order to avoid problems while (last_committed_index_.load(std::memory_order_relaxed) != write_index) { // we don't return at this point even in case of ReturnImmediatelyStrategy as we've already taken the token - common_.put_wait_.wait([this, write_index] { return last_committed_index_.load(std::memory_order_relaxed) == write_index; }); + common_.put_wait_.wait([this] { return last_committed_index_.load(std::memory_order_relaxed) == write_index; }); } last_committed_index_.store(++write_index, std::memory_order_release); @@ -50,7 +52,8 @@ class MPSC { } get_t get() noexcept { - while (consumer_.reader_index_2_ >= last_committed_index_.load(std::memory_order_relaxed)) { + while (consumer_.reader_index_2_ >= consumer_.last_committed_index_cache_) { + consumer_.last_committed_index_cache_ = last_committed_index_.load(std::memory_order_relaxed); if constexpr (std::is_same::value) { return std::nullopt; } else { @@ -66,7 +69,9 @@ class MPSC { return contents; } + // for testing only void empty() noexcept { + consumer_.last_committed_index_cache_ = 0; consumer_.reader_index_2_ = 0; next_free_index_.store(0, std::memory_order_release); last_committed_index_.store(0, std::memory_order_release); @@ -98,7 +103,7 @@ class MPSC { }; struct alignas(64) Consumer { - std::size_t next_free_index_cache_{0}; + std::size_t last_committed_index_cache_{0}; std::size_t reader_index_2_{0}; std::atomic reader_index_{0}; }; diff --git a/include/wait_strategy.hpp b/include/wait_strategy.hpp index 366005b..959ec99 100644 --- a/include/wait_strategy.hpp +++ b/include/wait_strategy.hpp @@ -15,51 +15,51 @@ template class WaitStrategyInterface { public: template - void wait(Predicate p) {} - void notify() {} + inline void wait(Predicate p) {} + inline void notify() {} }; class ReturnImmediateStrategy : public WaitStrategyInterface { public: template - void wait(Predicate p) {} - void notify() {} + inline void wait(Predicate p) {} + inline void notify() {} }; class NoOpWaitStrategy : public WaitStrategyInterface { public: template - void wait(Predicate p) {} - void notify() {} + inline void wait(Predicate p) {} + inline void notify() {} }; class PauseWaitStrategy : public WaitStrategyInterface { public: template - void wait(Predicate p) { + inline void wait(Predicate p) { cpu_pause(); } - void notify() {} + inline void notify() {} }; class YieldWaitStrategy : public WaitStrategyInterface { public: template - void wait(Predicate p) { + inline void wait(Predicate p) { std::this_thread::yield(); } - void notify() {} + inline void notify() {} }; class CVWaitStrategy : public WaitStrategyInterface { public: template - void wait(Predicate p) { + inline void wait(Predicate p) { std::unique_lock lock(mutex_); cv_.wait_for(lock, std::chrono::nanoseconds(100), p); } - void notify() { cv_.notify_all(); } + inline void notify() { cv_.notify_all(); } private: std::condition_variable cv_;