Skip to content

Commit

Permalink
Merge pull request #2214 from albtam/iox-2177-update-sofi
Browse files Browse the repository at this point in the history
iox-#2177 Update SPSC SoFi
  • Loading branch information
elBoberido authored Aug 29, 2024
2 parents a285e98 + 8e0af98 commit 1549c96
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 198 deletions.
2 changes: 2 additions & 0 deletions doc/website/release-notes/iceoryx-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
- Listener examples need to take all samples in the callback [#2251](https://github.com/eclipse-iceoryx/iceoryx/issues/2251)
- 'iox::string' tests can exceed the translation unit compilation timeout [#2278](https://github.com/eclipse-iceoryx/iceoryx/issues/2278)
- Building iceoryx with bazel on Windows is broken [#2320](https://github.com/eclipse-iceoryx/iceoryx/issues/2320)
- Fix wrong memory orders in SpscSoFi [#2177](https://github.com/eclipse-iceoryx/iceoryx/issues/2177)
-

**Refactoring:**

Expand Down
166 changes: 84 additions & 82 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,117 +25,118 @@

#include <cstdint>
#include <cstring>
#include <utility>

namespace iox
{
namespace concurrent
{
/// @brief
/// Thread safe producer and consumer queue with a safe overflowing behavior.
/// SpscSofi is designed in a FIFO Manner but prevents data loss when pushing into
/// a full SpscSofi. When SpscSofi is full and a Sender tries to push, the data at the
/// current read position will be returned. SpscSofi is threadsafe without using
/// locks. When the buffer is filled, new data is written starting at the
/// beginning of the buffer and overwriting the old.The SpscSofi is especially
/// designed to provide fixed capacity storage. When its capacity is exhausted,
/// newly inserted elements will cause elements either at the beginning
/// to be overwritten.The SpscSofi only allocates memory when
/// created , capacity can be is adjusted explicitly.
///
/// @brief Thread safe lock-free single producer and single consumer queue with a safe
/// overflowing behavior
/// @note When SpscSoFi is full and a sender tries to push, the data at the current read pos will be
/// returned. This behavior mimics a FiFo queue but prevents resource leaks when pushing into
/// a full SpscSoFi.
/// SpscSoFi is especially designed to provide fixed capacity storage.
/// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as
/// many elements as specified with 'CapacityValue' can be removed
/// @param[in] ValueType DataType to be stored, must be trivially copyable
/// @param[in] CapacityValue Capacity of the SpscSofi
template <class ValueType, uint64_t CapacityValue>
class SpscSofi
{
static_assert(std::is_trivially_copyable<ValueType>::value,
"SpscSofi can handle only trivially copyable data types");
"SpscSofi can only handle trivially copyable data types since 'memcpy' is used internally");
/// @brief Check if Atomic integer is lockfree on platform
/// ATOMIC_INT_LOCK_FREE = 2 - is always lockfree
/// ATOMIC_INT_LOCK_FREE = 1 - is sometimes lockfree
/// ATOMIC_INT_LOCK_FREE = 0 - is never lockfree
static_assert(2 <= ATOMIC_INT_LOCK_FREE, "SpscSofi is not able to run lock free on this data type");

/// @brief Internal size needs to be bigger than the size desirred by the user
/// This is because of buffer empty detection and overflow handling
static constexpr uint32_t INTERNAL_SIZE_ADD_ON = 1;

/// @brief This is the resulting internal size on creation
static constexpr uint32_t INTERNAL_SPSC_SOFI_SIZE = CapacityValue + INTERNAL_SIZE_ADD_ON;
// To ensure a consumer gets at least the amount of capacity of data when a queue is full, an additional free
// slot (add-on) is required.
// ========================================================================
// Consider the following scenario when there is no "capacity add-on":
// 1. CapacityValue = 2
// |--A--|--B--|
// ^
// w=2, r=0
// 2. The producer thread pushes a new element
// 3. Increment the read position (this effectively reduces the capacity and is the reason the internal capacity
// needs to be larger;
// |--A--|--B--|
// ^ ^
// w=2 r=1
// 4. The producer thread is suspended, the consumer thread pops a value
// |--A--|-----|
// ^
// w=2, r=2
// 5. The consumer tries to pop another value but the queue looks empty as
// write position == read position: the consumer cannot pop
// out CAPACITY amount of samples even though the queue was full
// ========================================================================
// With "capacity add-on"
// 1. CapacityValue = 2, InternalCapacity = 3
// |--A--|--B--|----|
// ^ ^
// r=0 w=2
// 2. The producer threads pushes a new element
// 3. First write the element at index 2 % capacity and increment the write index
// |--A--|--B--|--C--|
// ^
// w=3, r=0,
// 4. Then increment the read position and return the overflowing 'A'
// |-----|--B--|--C--|
// ^ ^
// w=3 r=1
// 5. The producer thread is suspended, the consumer thread pops a value
// |--A--|-----|--C--|
// ^ ^
// w=3 r=2
// 6. The consumer thread pops another value
// |--A--|-----|-----|
// ^
// w=3, r=3
// 7. Now, write position == read position so we cannot pop another element: the queue looks empty. We managed to
// pop CapacityValue elements
// ========================================================================
static constexpr uint32_t INTERNAL_CAPACITY_ADDON = 1;

/// @brief Internal capacity of the queue at creation
static constexpr uint32_t INTERNAL_SPSC_SOFI_CAPACITY = CapacityValue + INTERNAL_CAPACITY_ADDON;

public:
/// @brief default constructor which constructs an empty sofi
/// @brief default constructor which constructs an empty SpscSofi
SpscSofi() noexcept = default;

/// @brief pushs an element into SpscSofi. if SpscSofi is full the oldest data will be
/// @brief push an element into SpscSofi. if SpscSofi is full the oldest data will be
/// returned and the pushed element is stored in its place instead.
/// @param[in] valueIn value which should be stored
/// @param[out] valueOut if SpscSofi is overflowing the value of the overridden value
/// @param[in] value_in value which should be stored
/// @param[out] value_out if SpscSofi is overflowing the value of the overridden value
/// is stored here
/// @concurrent restricted thread safe: single pop, single push no
/// push calls from multiple contexts
/// @return return true if push was sucessfull else false.
/// @code
/// (initial situation, SpscSofi is FULL)
/// Start|-----A-------|
/// |-----B-------|
/// |-----C-------|
/// |-----D-------|
///
///
/// (calling push with data ’E’)
/// Start|-----E-------|
/// |-----A-------|
/// |-----B-------|
/// |-----C-------|
/// (’D’ is returned as valueOut)
///
/// ###################################################################
///
/// (if SpscSofi is not FULL , calling push() add new data)
/// Start|-------------|
/// |-------------| ( Initial SpscSofi )
/// (push() Called two times)
///
/// |-------------|
/// (New Data)
/// |-------------|
/// (New Data)
/// @endcode
/// @note restricted thread safe: can only be called from one thread. The authorization to push into the
/// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used.
/// @return return true if push was successful else false.
/// @remarks
/// 1. SpscSofi is empty |-----|-----|
/// 2. push an element |--A--|-----|
/// 3. push an element |--A--|--B--|
/// 5. SpscSofi is full
/// 6. push an element |--C--|--B--| -> value_out is set to 'A'
bool push(const ValueType& valueIn, ValueType& valueOut) noexcept;

/// @brief pop the oldest element
/// @param[out] valueOut storage of the pop'ed value
/// @concurrent restricted thread safe: single pop, single push no
/// pop or popIf calls from multiple contexts
/// @concurrent restricted thread safe: can only be called from one thread. The authorization to pop from the
/// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used.
/// @return false if SpscSofi is empty, otherwise true
bool pop(ValueType& valueOut) noexcept;

/// @brief conditional pop call to provide an alternative for a peek
/// and pop approach. If the verificator returns true the
/// peeked element is returned.
/// @param[out] valueOut storage of the pop'ed value
/// @param[in] verificator callable of type bool(const ValueType& peekValue)
/// which takes the value which would be pop'ed as argument and returns
/// true if it should be pop'ed, otherwise false
/// @code
/// int limit = 7128;
/// mysofi.popIf(value, [=](const ValueType & peek)
/// {
/// return peek < limit; // pop only when peek is smaller than limit
/// }
/// ); // pop's a value only if it is smaller than 9012
/// @endcode
/// @concurrent restricted thread safe: single pop, single push no
/// pop or popIf calls from multiple contexts
/// @return false if SpscSofi is empty or when verificator returns false, otherwise true
template <typename Verificator_T>
bool popIf(ValueType& valueOut, const Verificator_T& verificator) noexcept;

/// @brief returns true if SpscSofi is empty, otherwise false
/// @note the use of this function is limited in the concurrency case. if you
/// call this and in another thread pop is called the result can be out
/// of date as soon as you require it
/// @concurrent unrestricted thread safe
/// @concurrent unrestricted thread safe (the result might already be outdated when used). Expected to be called
/// from either the producer or the consumer thread but not from a third thread
bool empty() const noexcept;

/// @brief resizes SpscSofi
Expand All @@ -150,15 +151,16 @@ class SpscSofi
uint64_t capacity() const noexcept;

/// @brief returns the current size of SpscSofi
/// @concurrent unrestricted thread safe
/// @concurrent unrestricted thread safe (the result might already be outdated when used). Expected to be called
/// from either the producer or the consumer thread but not from a third thread
uint64_t size() const noexcept;

private:
UninitializedArray<ValueType, INTERNAL_SPSC_SOFI_SIZE> m_data;
uint64_t m_size = INTERNAL_SPSC_SOFI_SIZE;
std::pair<uint64_t, uint64_t> getReadWritePositions() const noexcept;

/// @brief the write/read pointers are "atomic pointers" so that they are not
/// reordered (read or written too late)
private:
UninitializedArray<ValueType, INTERNAL_SPSC_SOFI_CAPACITY> m_data;
uint64_t m_size = INTERNAL_SPSC_SOFI_CAPACITY;
Atomic<uint64_t> m_readPosition{0};
Atomic<uint64_t> m_writePosition{0};
};
Expand Down
Loading

0 comments on commit 1549c96

Please sign in to comment.