From f917d4c8280e669aea350013f4f537fbed9f943f Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 3 Nov 2024 12:57:37 -0800 Subject: [PATCH 01/13] Use a reference instead of a pointer. --- catkit_core/Synchronization.cpp | 6 +++--- catkit_core/Synchronization.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp index 8da03b86e..54e6aa2b9 100644 --- a/catkit_core/Synchronization.cpp +++ b/catkit_core/Synchronization.cpp @@ -9,15 +9,15 @@ #include "Timing.h" -SynchronizationLock::SynchronizationLock(Synchronization *sync) +SynchronizationLock::SynchronizationLock(Synchronization &sync) : m_Sync(sync) { - m_Sync->Lock(); + m_Sync.Lock(); } SynchronizationLock::~SynchronizationLock() { - m_Sync->Unlock(); + m_Sync.Unlock(); } Synchronization::Synchronization() diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h index 45ec8dfa9..2aa1a8b6f 100644 --- a/catkit_core/Synchronization.h +++ b/catkit_core/Synchronization.h @@ -30,11 +30,11 @@ struct SynchronizationSharedData class SynchronizationLock { public: - SynchronizationLock(Synchronization *sync); + SynchronizationLock(Synchronization &sync); ~SynchronizationLock(); private: - Synchronization *m_Sync; + Synchronization &m_Sync; }; class Synchronization From 8b6a01e7c102822243493c53237a251aa2435f33 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 17:29:29 -0800 Subject: [PATCH 02/13] Better scope the synchronization lock. It should only be around the tme-critical part. --- catkit_core/DataStream.cpp | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index 09d20a822..64c0f1924 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -173,22 +173,27 @@ void DataStream::SubmitFrame(size_t id) DataFrameMetadata *meta = m_Header->m_FrameMetadata + (id % m_Header->m_NumFramesInBuffer); meta->m_TimeStamp = GetTimeStamp(); - // Obtain a lock as we are about to modify the condition of the - // synchronization. - auto lock = SynchronizationLock(&m_Synchronization); - - // Make frame available: - // Use a do-while loop to ensure we are never decrementing the last id. - size_t last_id; - do { - last_id = m_Header->m_LastId; - - if (last_id >= id + 1) - break; - } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + // Obtain a lock as we are about to modify the condition of the + // synchronization. + auto lock = SynchronizationLock(m_Synchronization); + + // Make frame available: + // Use a do-while loop to ensure we are never decrementing the last id. + size_t last_id; + do + { + last_id = m_Header->m_LastId; + + if (last_id >= id + 1) + break; + } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + + m_Synchronization.Signal(); + } - m_Synchronization.Signal(); + auto ts = GetTimeStamp(); + tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); // Don't update the framerate counter for the first frame. if (id == 0) @@ -207,9 +212,6 @@ void DataStream::SubmitFrame(size_t id) m_Header->m_FrameRateCounter = m_Header->m_FrameRateCounter * std::exp(-FRAMERATE_DECAY * time_delta) + FRAMERATE_DECAY; - - auto ts = GetTimeStamp(); - tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); } void DataStream::SubmitData(const void *data) From b07c6445d9784719c9dc9b2fad6ae947160b9340 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:30:32 -0800 Subject: [PATCH 03/13] Use CRTP instead of preprocessor directives for operating systems. This allows us to separate different types of synchronization within one operating system. --- catkit_core/Synchronization.cpp | 224 -------------------------------- catkit_core/Synchronization.h | 74 +++++------ catkit_core/Synchronization.inl | 121 +++++++++++++++++ 3 files changed, 152 insertions(+), 267 deletions(-) delete mode 100644 catkit_core/Synchronization.cpp create mode 100644 catkit_core/Synchronization.inl diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp deleted file mode 100644 index 54e6aa2b9..000000000 --- a/catkit_core/Synchronization.cpp +++ /dev/null @@ -1,224 +0,0 @@ -#include "Synchronization.h" - -#include -#include - -#ifndef _WIN32 - #include -#endif - -#include "Timing.h" - -SynchronizationLock::SynchronizationLock(Synchronization &sync) - : m_Sync(sync) -{ - m_Sync.Lock(); -} - -SynchronizationLock::~SynchronizationLock() -{ - m_Sync.Unlock(); -} - -Synchronization::Synchronization() - : m_IsOwner(false), m_SharedData(nullptr) -{ -} - -Synchronization::~Synchronization() -{ - if (m_SharedData) - { -#ifdef _WIN32 - CloseHandle(m_Semaphore); -#else - -#endif - } -} - -void Synchronization::Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create) -{ - if (create) - { - Create(id, shared_data); - } - else - { - Open(id, shared_data); - } -} - -void Synchronization::Create(const std::string &id, SynchronizationSharedData *shared_data) -{ - if (m_SharedData) - throw std::runtime_error("Create called on an already initialized Synchronization object."); - - if (!shared_data) - throw std::runtime_error("The passed shared data was a nullptr."); - -#ifdef _WIN32 - m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); - - if (m_Semaphore == NULL) - throw std::runtime_error("Something went wrong while creating semaphore."); - - shared_data->m_NumReadersWaiting = 0; -#else - pthread_mutexattr_t mutex_attr; - pthread_mutexattr_init(&mutex_attr); - pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&(shared_data->m_Mutex), &mutex_attr); - pthread_mutexattr_destroy(&mutex_attr); - - pthread_condattr_t cond_attr; - pthread_condattr_init(&cond_attr); - pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ - pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); -#endif // __APPLE__ - pthread_cond_init(&(shared_data->m_Condition), &cond_attr); - pthread_condattr_destroy(&cond_attr); -#endif // _WIN32 - - m_SharedData = shared_data; -} - -void Synchronization::Open(const std::string &id, SynchronizationSharedData *shared_data) -{ - if (m_SharedData) - throw std::runtime_error("Open called on an already initialized Synchronization object."); - - if (!shared_data) - throw std::runtime_error("The passed shared data was a nullptr."); - -#ifdef _WIN32 - m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); - - if (m_Semaphore == NULL) - throw std::runtime_error("Something went wrong while opening semaphore."); -#else -#endif - - m_SharedData = shared_data; -} - -void Synchronization::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) -{ - if (!m_SharedData) - throw std::runtime_error("Wait() was called before the synchronization was intialized."); - -#ifdef _WIN32 - Timer timer; - DWORD res = WAIT_OBJECT_0; - - while (!condition()) - { - if (res == WAIT_OBJECT_0) - { - // Increment the number of readers that are waiting, making sure the counter - // is at least 1 after the increment. This can occur when a previous reader got - // interrupted and the trigger happening before decrementing the - // m_NumReadersWaiting counter. - while (m_SharedData->m_NumReadersWaiting++ < 0) - { - } - } - - // Wait for a maximum of 20ms to perform periodic error checking. - auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); - - if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) - { - m_SharedData->m_NumReadersWaiting--; - throw std::runtime_error("Waiting time has expired."); - } - - if (res == WAIT_FAILED) - { - m_SharedData->m_NumReadersWaiting--; - throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); - } - - if (error_check != nullptr) - { - try - { - error_check(); - } - catch (...) - { - m_SharedData->m_NumReadersWaiting--; - throw; - } - } - } -#else - Timer timer; - - while (!condition()) - { - // Wait for a maximum of 20ms to perform periodic error checking. - long timeout_wait = std::min(20L, timeout_in_ms); - -#ifdef __APPLE__ - // Relative timespec. - timespec timeout; - timeout.tv_sec = timeout_wait / 1000; - timeout.tv_nsec = 1000000 * (timeout_wait % 1000); - - int res = pthread_cond_timedwait_relative_np(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); -#else - // Absolute timespec. - timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += timeout_wait / 1000; - timeout.tv_nsec += 1000000 * (timeout_wait % 1000); - - int res = pthread_cond_timedwait(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); -#endif // __APPLE__ - if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) - { - throw std::runtime_error("Waiting time has expired."); - } - - if (error_check != nullptr) - error_check(); - } -#endif // _WIN32 -} - -void Synchronization::Signal() -{ - if (!m_SharedData) - throw std::runtime_error("Signal() was called before the synchronization was intialized."); - -#ifdef _WIN32 - // Notify waiting processes. - long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); - - // If a reader times out in between us reading the number of readers that are waiting - // and us releasing the semaphore, we are releasing one too many readers. This - // results in a future reader being released immediately, which is not a problem, - // as there are checks in place for that. - - if (num_readers_waiting > 0) - ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); -#else - pthread_cond_broadcast(&(m_SharedData->m_Condition)); -#endif // _WIN32 -} - -void Synchronization::Lock() -{ -#ifndef _WIN32 - pthread_mutex_lock(&(m_SharedData->m_Mutex)); -#endif // _WIN32 -} - -void Synchronization::Unlock() -{ -#ifndef _WIN32 - pthread_mutex_unlock(&(m_SharedData->m_Mutex)); -#endif // _WIN32 -} diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h index 2aa1a8b6f..a00fce299 100644 --- a/catkit_core/Synchronization.h +++ b/catkit_core/Synchronization.h @@ -6,47 +6,24 @@ #include #include -#ifdef _WIN32 - #define WIN32_LEAN_AND_MEAN - #define NOMINMAX - #include -#else - #include - #include -#endif // _WIN32 - -class Synchronization; - -struct SynchronizationSharedData -{ -#ifdef _WIN32 - std::atomic_long m_NumReadersWaiting; -#else - pthread_cond_t m_Condition; - pthread_mutex_t m_Mutex; -#endif -}; - -class SynchronizationLock +template +class SynchronizationBase { public: - SynchronizationLock(Synchronization &sync); - ~SynchronizationLock(); + using SharedState = SharedStateType; -private: - Synchronization &m_Sync; -}; + SynchronizationBase(); + SynchronizationBase(const SynchronizationBase &other) = delete; + ~SynchronizationBase(); -class Synchronization -{ -public: - Synchronization(); - Synchronization(const Synchronization &other) = delete; - ~Synchronization(); + SynchronizationBase &operator=(const SynchronizationBase &other) = delete; + + void Initialize(const std::string &id, SharedState *shared_state, bool create); - Synchronization &operator=(const Synchronization &other) = delete; + void Create(const std::string &id, SharedState *shared_state); - void Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create); + void Open(const std::string &id, SharedState *shared_state); + void Close(); void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); @@ -54,17 +31,28 @@ class Synchronization void Lock(); void Unlock(); -private: - void Create(const std::string &id, SynchronizationSharedData *shared_data); - void Open(const std::string &id, SynchronizationSharedData *shared_data); +protected: + void CreateImpl(const std::string &id, SharedState *shared_state); + + void OpenImpl(const std::string &id, SharedState *shared_state); + void CloseImpl(); bool m_IsOwner; - SynchronizationSharedData *m_SharedData; - std::string m_Id; + bool m_IsOpen; + SharedState *m_SharedState; +}; + +template +class SynchronizationLock +{ +public: + SynchronizationLock(T &sync); + ~SynchronizationLock(); -#ifdef _WIN32 - HANDLE m_Semaphore; -#endif +private: + T &m_Sync; }; +#include "Synchronization.inl" + #endif // SYNCHRONIZATION_H diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl new file mode 100644 index 000000000..be6f89571 --- /dev/null +++ b/catkit_core/Synchronization.inl @@ -0,0 +1,121 @@ +#include "Synchronization.h" + +template +SynchronizationLock::SynchronizationLock(T &sync) + : m_Sync(sync) +{ + m_Sync.Lock(); +} + +template +SynchronizationLock::~SynchronizationLock() +{ + m_Sync.Unlock(); +} + +template +SynchronizationBase::SynchronizationBase() + : m_IsOwner(false), m_SharedState(nullptr), m_IsOpen(false) +{ +} + +template +SynchronizationBase::~SynchronizationBase() +{ + Close(); +} + +template +void SynchronizationBase::Initialize(const std::string &id, SharedState *shared_state, bool create) +{ + if (create) + { + Create(id, shared_state); + } + else + { + Open(id, shared_state); + } +} + +template +void SynchronizationBase::Create(const std::string &id, SharedState *shared_state) +{ + if (m_IsOpen) + throw std::runtime_error("Create called on an already initialized Synchronization object."); + + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + static_cast(this)->CreateImpl(id, shared_state); + + m_IsOpen = true; + m_IsOwner = true; + + m_SharedState = shared_state; +} + +template +void SynchronizationBase::Open(const std::string &id, SharedState *shared_state) +{ + if (m_IsOpen) + throw std::runtime_error("Open called on an already initialized Synchronization object."); + + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + + static_cast(this)->CreateImpl(id, shared_state); + + m_IsOpen = true; + m_IsOwner = false; + + m_SharedState = shared_state; +} + +template +void SynchronizationBase::Close() +{ + if (!m_IsOpen) + return; + + static_cast(this)->CloseImpl(); + + m_IsOpen = false; + m_SharedState = nullptr; +} + +template +void SynchronizationBase::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ +} + +template +void SynchronizationBase::Signal() +{ +} + +template +void SynchronizationBase::Lock() +{ +} + +template +void SynchronizationBase::Unlock() +{ +} + +template +void SynchronizationBase::CreateImpl(const std::string &id, SharedState *shared_state) +{ +} + +template +void SynchronizationBase::OpenImpl(const std::string &id, SharedState *shared_state) +{ +} + +template +void SynchronizationBase::CloseImpl() +{ +} From 7a4a07065981837d30c1f6944aa452c2f21d5842 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:31:01 -0800 Subject: [PATCH 04/13] Add Windows semaphore synchronization. --- catkit_core/SynchronizationSemaphore.cpp | 95 ++++++++++++++++++++++++ catkit_core/SynchronizationSemaphore.h | 43 +++++++++++ 2 files changed, 138 insertions(+) create mode 100644 catkit_core/SynchronizationSemaphore.cpp create mode 100644 catkit_core/SynchronizationSemaphore.h diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp new file mode 100644 index 000000000..459821bae --- /dev/null +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -0,0 +1,95 @@ +#include "SynchronizationSemaphore.h" + +#ifdef _WIN32 +void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void *(error_check)()) +{ + Timer timer; + DWORD res = WAIT_OBJECT_0; + + while (!condition()) + { + if (res == WAIT_OBJECT_0) + { + // Increment the number of readers that are waiting, making sure the counter + // is at least 1 after the increment. This can occur when a previous reader got + // interrupted and the trigger happening before decrementing the + // m_NumReadersWaiting counter. + while (m_SharedData->m_NumReadersWaiting++ < 0) + { + } + } + + // Wait for a maximum of 20ms to perform periodic error checking. + auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); + + if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + m_SharedData->m_NumReadersWaiting--; + throw std::runtime_error("Waiting time has expired."); + } + + if (res == WAIT_FAILED) + { + m_SharedData->m_NumReadersWaiting--; + throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); + } + + if (error_check != nullptr) + { + try + { + error_check(); + } + catch (...) + { + m_SharedData->m_NumReadersWaiting--; + throw; + } + } + } +} + +void SynchronizationSemaphore::Signal() +{ + // Notify waiting processes. + long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); + + // If a reader times out in between us reading the number of readers that are waiting + // and us releasing the semaphore, we are releasing one too many readers. This + // results in a future reader being released immediately, which is not a problem, + // as there are checks in place for that. + + if (num_readers_waiting > 0) + ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); +} + +void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +{ + m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while creating semaphore."); + + shared_data->m_NumReadersWaiting = 0; +} + +void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +{ + m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while opening semaphore."); +} + +void SynchronizationSempahore::CloseImpl() +{ + CloseHandle(m_Semaphore); +} + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +#endif diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h new file mode 100644 index 000000000..c2ad265d2 --- /dev/null +++ b/catkit_core/SynchronizationSemaphore.h @@ -0,0 +1,43 @@ +#ifndef SYNCHRONIZATION_SEMAPHORE_H +#define SYNCHRONIZATION_SEMAPHORE_H + +#include "Synchronization.h" + +#include + +#ifdef _WIN32 + #define WIN32_LEAN_AND_MEAN + #define NOMINMAX + #include +#endif + +#ifdef _WIN32 +struct SharedDataWindowsSemaphore +{ + std::atomic_long m_NumReadersWaiting; +}; + +class SynchronizationWindowsSemaphore : public SynchronizationBase +{ +public: + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + void Create(const std::string &id, SharedState *shared_state); + void Open(const std::string &id, SharedState *shared_state); + + HANDLE m_Semaphore; +}; +#endif // _WIN32 + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +# endif // SYNCHRONIZATION_SEMAPHORE_H From fd1b739d5e5a8ab104dfbfa82e412d13c34e64f8 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:31:18 -0800 Subject: [PATCH 05/13] Add Posix condition variable synchronization. --- .../SynchronizationConditionVariable.cpp | 75 +++++++++++++++++++ .../SynchronizationConditionVariable.h | 32 ++++++++ 2 files changed, 107 insertions(+) create mode 100644 catkit_core/SynchronizationConditionVariable.cpp create mode 100644 catkit_core/SynchronizationConditionVariable.h diff --git a/catkit_core/SynchronizationConditionVariable.cpp b/catkit_core/SynchronizationConditionVariable.cpp new file mode 100644 index 000000000..8bc97eaff --- /dev/null +++ b/catkit_core/SynchronizationConditionVariable.cpp @@ -0,0 +1,75 @@ +#include "SynchronizationConditionVariable.h" + +#include "Timing.h" + +#if defined(__linux__) || defined(__APPLE__) + +void SynchronizationConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + Timer timer; + + while (!condition()) + { + // Wait for a maximum of 20ms to perform periodic error checking. + long timeout_wait = std::min(20L, timeout_in_ms); + +#ifdef __APPLE__ + // Relative timespec. + timespec timeout; + timeout.tv_sec = timeout_wait / 1000; + timeout.tv_nsec = 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait_relative_np(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#else + // Absolute timespec. + timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += timeout_wait / 1000; + timeout.tv_nsec += 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#endif // __APPLE__ + if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + throw std::runtime_error("Waiting time has expired."); + } + + if (error_check != nullptr) + error_check(); + } +} + +void SynchronizationConditionVariable::Signal() +{ + pthread_cond_broadcast(&(m_SharedState->m_Condition)); +} + +void SynchronizationConditionVariable::Lock() +{ + pthread_mutex_lock(&(m_SharedState->m_Mutex)); +} + +void SynchronizationConditionVariable::Unlock() +{ + pthread_mutex_unlock(&(m_SharedState->m_Mutex)); +} + +void SynchronizationConditionVariable::CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state) +{ + pthread_mutexattr_t mutex_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&(shared_state->m_Mutex), &mutex_attr); + pthread_mutexattr_destroy(&mutex_attr); + + pthread_condattr_t cond_attr; + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); +#endif // __APPLE__ + pthread_cond_init(&(shared_state->m_Condition), &cond_attr); + pthread_condattr_destroy(&cond_attr); +} + +#endif diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h new file mode 100644 index 000000000..f2cabf908 --- /dev/null +++ b/catkit_core/SynchronizationConditionVariable.h @@ -0,0 +1,32 @@ +#ifndef SYNCHRONIZATION_CONDITION_VARIABLE_H +#define SYNCHRONIZATION_CONDITION_VARIABLE_H + +#include "Synchronization.h" + +#if defined(__linux__) || defined(__APPLE__) + +#include + +struct SharedStateConditionVariable +{ + pthread_cond_t m_Condition; + pthread_mutex_t m_Mutex; +}; + +class SynchronizationConditionVariable : public SynchronizationBase +{ + friend SynchronizationBase; +public: + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + void CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state); +}; + +#endif // Linux or Apple + +#endif // SYNCHRONIZATION_CONDITION_VARIABLE_H From 6de99c7a0a4480323dae68f4283267a41ef15cc1 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:32:45 -0800 Subject: [PATCH 06/13] Set default Synchronization method for DataStreams. --- catkit_core/DataStream.cpp | 2 +- catkit_core/DataStream.h | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index 64c0f1924..b80754b6f 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -334,7 +334,7 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che // Wait until frame becomes available. // Obtain a lock first. - auto lock = SynchronizationLock(&m_Synchronization); + auto lock = SynchronizationLock(m_Synchronization); m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); } diff --git a/catkit_core/DataStream.h b/catkit_core/DataStream.h index 322a2ea2d..440f26000 100644 --- a/catkit_core/DataStream.h +++ b/catkit_core/DataStream.h @@ -8,8 +8,16 @@ #include "SharedMemory.h" #include "Synchronization.h" +#include "SynchronizationConditionVariable.h" +#include "SynchronizationSemaphore.h" #include "Tensor.h" +#ifdef _WIN32 +using Synchronization = SynchronizationSemaphore; +#elif defined(__linux__) or defined(__APPLE__) +using Synchronization = SynchronizationConditionVariable; +#endif + const char * const CURRENT_DATASTREAM_VERSION = "0.2"; const size_t MAX_NUM_FRAMES_IN_BUFFER = 20; const long INFINITE_WAIT_TIME = LONG_MAX; @@ -46,7 +54,7 @@ struct DataStreamHeader double m_FrameRateCounter; - SynchronizationSharedData m_SynchronizationSharedData; + Synchronization::SharedState m_SynchronizationSharedData; }; class DataFrame : public Tensor From 74be966aed44c6044deaf5e5f5d1285a35c8c558 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:33:19 -0800 Subject: [PATCH 07/13] Compile synchronization methods. --- catkit_core/CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index af141bddb..42798a69f 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -14,7 +14,8 @@ add_compile_options($<$:/MP1>) add_library(catkit_core STATIC DataStream.cpp SharedMemory.cpp - Synchronization.cpp + SynchronizationConditionVariable.cpp + SynchronizationSemaphore.cpp Timing.cpp Log.cpp LogConsole.cpp From 0614ed2df3fa20d9165c9ba811fac822f37dbdd8 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:48:33 -0800 Subject: [PATCH 08/13] Fix variable names. --- catkit_core/SynchronizationSemaphore.cpp | 10 +++++----- catkit_core/SynchronizationSemaphore.h | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 459821bae..68fdcab3d 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -14,7 +14,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co // is at least 1 after the increment. This can occur when a previous reader got // interrupted and the trigger happening before decrementing the // m_NumReadersWaiting counter. - while (m_SharedData->m_NumReadersWaiting++ < 0) + while (m_SharedState->m_NumReadersWaiting++ < 0) { } } @@ -24,13 +24,13 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw std::runtime_error("Waiting time has expired."); } if (res == WAIT_FAILED) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); } @@ -42,7 +42,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co } catch (...) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw; } } @@ -52,7 +52,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co void SynchronizationSemaphore::Signal() { // Notify waiting processes. - long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); + long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); // If a reader times out in between us reading the number of readers that are waiting // and us releasing the semaphore, we are releasing one too many readers. This diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index c2ad265d2..0b6881dac 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -12,12 +12,12 @@ #endif #ifdef _WIN32 -struct SharedDataWindowsSemaphore +struct SharedStateSemaphore { std::atomic_long m_NumReadersWaiting; }; -class SynchronizationWindowsSemaphore : public SynchronizationBase +class SynchronizationSemaphore : public SynchronizationBase { public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); From a26733c6a87ae92d6404b02281e166413ec41317 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:50:51 -0800 Subject: [PATCH 09/13] Add missing includes. --- catkit_core/Synchronization.inl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl index be6f89571..1f842e62c 100644 --- a/catkit_core/Synchronization.inl +++ b/catkit_core/Synchronization.inl @@ -1,5 +1,8 @@ #include "Synchronization.h" +#include +#include + template SynchronizationLock::SynchronizationLock(T &sync) : m_Sync(sync) From b71806c7cf8b19321845604e9d8020f418238648 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 00:00:03 -0800 Subject: [PATCH 10/13] Fix the function pointer type. --- catkit_core/SynchronizationSemaphore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 68fdcab3d..563d44af2 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -1,7 +1,7 @@ #include "SynchronizationSemaphore.h" #ifdef _WIN32 -void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void *(error_check)()) +void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; DWORD res = WAIT_OBJECT_0; From 8d0462b77e377f418fb9547e91430435539b16b3 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 10:41:50 -0800 Subject: [PATCH 11/13] Add missing include. --- catkit_core/SynchronizationSemaphore.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 563d44af2..63a986ffb 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -1,4 +1,5 @@ #include "SynchronizationSemaphore.h" +#include "Timing.h" #ifdef _WIN32 void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) From 56431e5eb6031b5283df55146a9c578a091e701d Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 11:37:41 -0800 Subject: [PATCH 12/13] Fix function names. --- catkit_core/SynchronizationSemaphore.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index 0b6881dac..778007bad 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -27,8 +27,8 @@ class SynchronizationSemaphore : public SynchronizationBase Date: Wed, 6 Nov 2024 12:11:00 -0800 Subject: [PATCH 13/13] Add friend classes and fix class name typos. --- catkit_core/SynchronizationConditionVariable.h | 1 + catkit_core/SynchronizationSemaphore.cpp | 4 ++-- catkit_core/SynchronizationSemaphore.h | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h index f2cabf908..2027215c0 100644 --- a/catkit_core/SynchronizationConditionVariable.h +++ b/catkit_core/SynchronizationConditionVariable.h @@ -16,6 +16,7 @@ struct SharedStateConditionVariable class SynchronizationConditionVariable : public SynchronizationBase { friend SynchronizationBase; + public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 63a986ffb..b381011ff 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -71,7 +71,7 @@ void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *sh if (m_Semaphore == NULL) throw std::runtime_error("Something went wrong while creating semaphore."); - shared_data->m_NumReadersWaiting = 0; + shared_state->m_NumReadersWaiting = 0; } void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) @@ -82,7 +82,7 @@ void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shar throw std::runtime_error("Something went wrong while opening semaphore."); } -void SynchronizationSempahore::CloseImpl() +void SynchronizationSemaphore::CloseImpl() { CloseHandle(m_Semaphore); } diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index 778007bad..46cd2f923 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -19,16 +19,16 @@ struct SharedStateSemaphore class SynchronizationSemaphore : public SynchronizationBase { + friend SynchronizationBase; + public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); - void Lock(); - void Unlock(); - protected: void CreateImpl(const std::string &id, SharedState *shared_state); void OpenImpl(const std::string &id, SharedState *shared_state); + void CloseImpl(); HANDLE m_Semaphore; };