diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index af141bddb..42798a69f 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -14,7 +14,8 @@ add_compile_options($<$:/MP1>) add_library(catkit_core STATIC DataStream.cpp SharedMemory.cpp - Synchronization.cpp + SynchronizationConditionVariable.cpp + SynchronizationSemaphore.cpp Timing.cpp Log.cpp LogConsole.cpp diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index 09d20a822..b80754b6f 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -173,22 +173,27 @@ void DataStream::SubmitFrame(size_t id) DataFrameMetadata *meta = m_Header->m_FrameMetadata + (id % m_Header->m_NumFramesInBuffer); meta->m_TimeStamp = GetTimeStamp(); - // Obtain a lock as we are about to modify the condition of the - // synchronization. - auto lock = SynchronizationLock(&m_Synchronization); - - // Make frame available: - // Use a do-while loop to ensure we are never decrementing the last id. - size_t last_id; - do { - last_id = m_Header->m_LastId; - - if (last_id >= id + 1) - break; - } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + // Obtain a lock as we are about to modify the condition of the + // synchronization. + auto lock = SynchronizationLock(m_Synchronization); + + // Make frame available: + // Use a do-while loop to ensure we are never decrementing the last id. + size_t last_id; + do + { + last_id = m_Header->m_LastId; + + if (last_id >= id + 1) + break; + } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + + m_Synchronization.Signal(); + } - m_Synchronization.Signal(); + auto ts = GetTimeStamp(); + tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); // Don't update the framerate counter for the first frame. if (id == 0) @@ -207,9 +212,6 @@ void DataStream::SubmitFrame(size_t id) m_Header->m_FrameRateCounter = m_Header->m_FrameRateCounter * std::exp(-FRAMERATE_DECAY * time_delta) + FRAMERATE_DECAY; - - auto ts = GetTimeStamp(); - tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); } void DataStream::SubmitData(const void *data) @@ -332,7 +334,7 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che // Wait until frame becomes available. // Obtain a lock first. - auto lock = SynchronizationLock(&m_Synchronization); + auto lock = SynchronizationLock(m_Synchronization); m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); } diff --git a/catkit_core/DataStream.h b/catkit_core/DataStream.h index 322a2ea2d..440f26000 100644 --- a/catkit_core/DataStream.h +++ b/catkit_core/DataStream.h @@ -8,8 +8,16 @@ #include "SharedMemory.h" #include "Synchronization.h" +#include "SynchronizationConditionVariable.h" +#include "SynchronizationSemaphore.h" #include "Tensor.h" +#ifdef _WIN32 +using Synchronization = SynchronizationSemaphore; +#elif defined(__linux__) or defined(__APPLE__) +using Synchronization = SynchronizationConditionVariable; +#endif + const char * const CURRENT_DATASTREAM_VERSION = "0.2"; const size_t MAX_NUM_FRAMES_IN_BUFFER = 20; const long INFINITE_WAIT_TIME = LONG_MAX; @@ -46,7 +54,7 @@ struct DataStreamHeader double m_FrameRateCounter; - SynchronizationSharedData m_SynchronizationSharedData; + Synchronization::SharedState m_SynchronizationSharedData; }; class DataFrame : public Tensor diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp deleted file mode 100644 index 8da03b86e..000000000 --- a/catkit_core/Synchronization.cpp +++ /dev/null @@ -1,224 +0,0 @@ -#include "Synchronization.h" - -#include -#include - -#ifndef _WIN32 - #include -#endif - -#include "Timing.h" - -SynchronizationLock::SynchronizationLock(Synchronization *sync) - : m_Sync(sync) -{ - m_Sync->Lock(); -} - -SynchronizationLock::~SynchronizationLock() -{ - m_Sync->Unlock(); -} - -Synchronization::Synchronization() - : m_IsOwner(false), m_SharedData(nullptr) -{ -} - -Synchronization::~Synchronization() -{ - if (m_SharedData) - { -#ifdef _WIN32 - CloseHandle(m_Semaphore); -#else - -#endif - } -} - -void Synchronization::Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create) -{ - if (create) - { - Create(id, shared_data); - } - else - { - Open(id, shared_data); - } -} - -void Synchronization::Create(const std::string &id, SynchronizationSharedData *shared_data) -{ - if (m_SharedData) - throw std::runtime_error("Create called on an already initialized Synchronization object."); - - if (!shared_data) - throw std::runtime_error("The passed shared data was a nullptr."); - -#ifdef _WIN32 - m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); - - if (m_Semaphore == NULL) - throw std::runtime_error("Something went wrong while creating semaphore."); - - shared_data->m_NumReadersWaiting = 0; -#else - pthread_mutexattr_t mutex_attr; - pthread_mutexattr_init(&mutex_attr); - pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&(shared_data->m_Mutex), &mutex_attr); - pthread_mutexattr_destroy(&mutex_attr); - - pthread_condattr_t cond_attr; - pthread_condattr_init(&cond_attr); - pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ - pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); -#endif // __APPLE__ - pthread_cond_init(&(shared_data->m_Condition), &cond_attr); - pthread_condattr_destroy(&cond_attr); -#endif // _WIN32 - - m_SharedData = shared_data; -} - -void Synchronization::Open(const std::string &id, SynchronizationSharedData *shared_data) -{ - if (m_SharedData) - throw std::runtime_error("Open called on an already initialized Synchronization object."); - - if (!shared_data) - throw std::runtime_error("The passed shared data was a nullptr."); - -#ifdef _WIN32 - m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); - - if (m_Semaphore == NULL) - throw std::runtime_error("Something went wrong while opening semaphore."); -#else -#endif - - m_SharedData = shared_data; -} - -void Synchronization::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) -{ - if (!m_SharedData) - throw std::runtime_error("Wait() was called before the synchronization was intialized."); - -#ifdef _WIN32 - Timer timer; - DWORD res = WAIT_OBJECT_0; - - while (!condition()) - { - if (res == WAIT_OBJECT_0) - { - // Increment the number of readers that are waiting, making sure the counter - // is at least 1 after the increment. This can occur when a previous reader got - // interrupted and the trigger happening before decrementing the - // m_NumReadersWaiting counter. - while (m_SharedData->m_NumReadersWaiting++ < 0) - { - } - } - - // Wait for a maximum of 20ms to perform periodic error checking. - auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); - - if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) - { - m_SharedData->m_NumReadersWaiting--; - throw std::runtime_error("Waiting time has expired."); - } - - if (res == WAIT_FAILED) - { - m_SharedData->m_NumReadersWaiting--; - throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); - } - - if (error_check != nullptr) - { - try - { - error_check(); - } - catch (...) - { - m_SharedData->m_NumReadersWaiting--; - throw; - } - } - } -#else - Timer timer; - - while (!condition()) - { - // Wait for a maximum of 20ms to perform periodic error checking. - long timeout_wait = std::min(20L, timeout_in_ms); - -#ifdef __APPLE__ - // Relative timespec. - timespec timeout; - timeout.tv_sec = timeout_wait / 1000; - timeout.tv_nsec = 1000000 * (timeout_wait % 1000); - - int res = pthread_cond_timedwait_relative_np(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); -#else - // Absolute timespec. - timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += timeout_wait / 1000; - timeout.tv_nsec += 1000000 * (timeout_wait % 1000); - - int res = pthread_cond_timedwait(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); -#endif // __APPLE__ - if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) - { - throw std::runtime_error("Waiting time has expired."); - } - - if (error_check != nullptr) - error_check(); - } -#endif // _WIN32 -} - -void Synchronization::Signal() -{ - if (!m_SharedData) - throw std::runtime_error("Signal() was called before the synchronization was intialized."); - -#ifdef _WIN32 - // Notify waiting processes. - long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); - - // If a reader times out in between us reading the number of readers that are waiting - // and us releasing the semaphore, we are releasing one too many readers. This - // results in a future reader being released immediately, which is not a problem, - // as there are checks in place for that. - - if (num_readers_waiting > 0) - ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); -#else - pthread_cond_broadcast(&(m_SharedData->m_Condition)); -#endif // _WIN32 -} - -void Synchronization::Lock() -{ -#ifndef _WIN32 - pthread_mutex_lock(&(m_SharedData->m_Mutex)); -#endif // _WIN32 -} - -void Synchronization::Unlock() -{ -#ifndef _WIN32 - pthread_mutex_unlock(&(m_SharedData->m_Mutex)); -#endif // _WIN32 -} diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h index 45ec8dfa9..a00fce299 100644 --- a/catkit_core/Synchronization.h +++ b/catkit_core/Synchronization.h @@ -6,47 +6,24 @@ #include #include -#ifdef _WIN32 - #define WIN32_LEAN_AND_MEAN - #define NOMINMAX - #include -#else - #include - #include -#endif // _WIN32 - -class Synchronization; - -struct SynchronizationSharedData -{ -#ifdef _WIN32 - std::atomic_long m_NumReadersWaiting; -#else - pthread_cond_t m_Condition; - pthread_mutex_t m_Mutex; -#endif -}; - -class SynchronizationLock +template +class SynchronizationBase { public: - SynchronizationLock(Synchronization *sync); - ~SynchronizationLock(); + using SharedState = SharedStateType; -private: - Synchronization *m_Sync; -}; + SynchronizationBase(); + SynchronizationBase(const SynchronizationBase &other) = delete; + ~SynchronizationBase(); -class Synchronization -{ -public: - Synchronization(); - Synchronization(const Synchronization &other) = delete; - ~Synchronization(); + SynchronizationBase &operator=(const SynchronizationBase &other) = delete; + + void Initialize(const std::string &id, SharedState *shared_state, bool create); - Synchronization &operator=(const Synchronization &other) = delete; + void Create(const std::string &id, SharedState *shared_state); - void Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create); + void Open(const std::string &id, SharedState *shared_state); + void Close(); void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); @@ -54,17 +31,28 @@ class Synchronization void Lock(); void Unlock(); -private: - void Create(const std::string &id, SynchronizationSharedData *shared_data); - void Open(const std::string &id, SynchronizationSharedData *shared_data); +protected: + void CreateImpl(const std::string &id, SharedState *shared_state); + + void OpenImpl(const std::string &id, SharedState *shared_state); + void CloseImpl(); bool m_IsOwner; - SynchronizationSharedData *m_SharedData; - std::string m_Id; + bool m_IsOpen; + SharedState *m_SharedState; +}; + +template +class SynchronizationLock +{ +public: + SynchronizationLock(T &sync); + ~SynchronizationLock(); -#ifdef _WIN32 - HANDLE m_Semaphore; -#endif +private: + T &m_Sync; }; +#include "Synchronization.inl" + #endif // SYNCHRONIZATION_H diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl new file mode 100644 index 000000000..1f842e62c --- /dev/null +++ b/catkit_core/Synchronization.inl @@ -0,0 +1,124 @@ +#include "Synchronization.h" + +#include +#include + +template +SynchronizationLock::SynchronizationLock(T &sync) + : m_Sync(sync) +{ + m_Sync.Lock(); +} + +template +SynchronizationLock::~SynchronizationLock() +{ + m_Sync.Unlock(); +} + +template +SynchronizationBase::SynchronizationBase() + : m_IsOwner(false), m_SharedState(nullptr), m_IsOpen(false) +{ +} + +template +SynchronizationBase::~SynchronizationBase() +{ + Close(); +} + +template +void SynchronizationBase::Initialize(const std::string &id, SharedState *shared_state, bool create) +{ + if (create) + { + Create(id, shared_state); + } + else + { + Open(id, shared_state); + } +} + +template +void SynchronizationBase::Create(const std::string &id, SharedState *shared_state) +{ + if (m_IsOpen) + throw std::runtime_error("Create called on an already initialized Synchronization object."); + + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + static_cast(this)->CreateImpl(id, shared_state); + + m_IsOpen = true; + m_IsOwner = true; + + m_SharedState = shared_state; +} + +template +void SynchronizationBase::Open(const std::string &id, SharedState *shared_state) +{ + if (m_IsOpen) + throw std::runtime_error("Open called on an already initialized Synchronization object."); + + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + + static_cast(this)->CreateImpl(id, shared_state); + + m_IsOpen = true; + m_IsOwner = false; + + m_SharedState = shared_state; +} + +template +void SynchronizationBase::Close() +{ + if (!m_IsOpen) + return; + + static_cast(this)->CloseImpl(); + + m_IsOpen = false; + m_SharedState = nullptr; +} + +template +void SynchronizationBase::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ +} + +template +void SynchronizationBase::Signal() +{ +} + +template +void SynchronizationBase::Lock() +{ +} + +template +void SynchronizationBase::Unlock() +{ +} + +template +void SynchronizationBase::CreateImpl(const std::string &id, SharedState *shared_state) +{ +} + +template +void SynchronizationBase::OpenImpl(const std::string &id, SharedState *shared_state) +{ +} + +template +void SynchronizationBase::CloseImpl() +{ +} diff --git a/catkit_core/SynchronizationConditionVariable.cpp b/catkit_core/SynchronizationConditionVariable.cpp new file mode 100644 index 000000000..8bc97eaff --- /dev/null +++ b/catkit_core/SynchronizationConditionVariable.cpp @@ -0,0 +1,75 @@ +#include "SynchronizationConditionVariable.h" + +#include "Timing.h" + +#if defined(__linux__) || defined(__APPLE__) + +void SynchronizationConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + Timer timer; + + while (!condition()) + { + // Wait for a maximum of 20ms to perform periodic error checking. + long timeout_wait = std::min(20L, timeout_in_ms); + +#ifdef __APPLE__ + // Relative timespec. + timespec timeout; + timeout.tv_sec = timeout_wait / 1000; + timeout.tv_nsec = 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait_relative_np(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#else + // Absolute timespec. + timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += timeout_wait / 1000; + timeout.tv_nsec += 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#endif // __APPLE__ + if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + throw std::runtime_error("Waiting time has expired."); + } + + if (error_check != nullptr) + error_check(); + } +} + +void SynchronizationConditionVariable::Signal() +{ + pthread_cond_broadcast(&(m_SharedState->m_Condition)); +} + +void SynchronizationConditionVariable::Lock() +{ + pthread_mutex_lock(&(m_SharedState->m_Mutex)); +} + +void SynchronizationConditionVariable::Unlock() +{ + pthread_mutex_unlock(&(m_SharedState->m_Mutex)); +} + +void SynchronizationConditionVariable::CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state) +{ + pthread_mutexattr_t mutex_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&(shared_state->m_Mutex), &mutex_attr); + pthread_mutexattr_destroy(&mutex_attr); + + pthread_condattr_t cond_attr; + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); +#endif // __APPLE__ + pthread_cond_init(&(shared_state->m_Condition), &cond_attr); + pthread_condattr_destroy(&cond_attr); +} + +#endif diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h new file mode 100644 index 000000000..2027215c0 --- /dev/null +++ b/catkit_core/SynchronizationConditionVariable.h @@ -0,0 +1,33 @@ +#ifndef SYNCHRONIZATION_CONDITION_VARIABLE_H +#define SYNCHRONIZATION_CONDITION_VARIABLE_H + +#include "Synchronization.h" + +#if defined(__linux__) || defined(__APPLE__) + +#include + +struct SharedStateConditionVariable +{ + pthread_cond_t m_Condition; + pthread_mutex_t m_Mutex; +}; + +class SynchronizationConditionVariable : public SynchronizationBase +{ + friend SynchronizationBase; + +public: + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + void CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state); +}; + +#endif // Linux or Apple + +#endif // SYNCHRONIZATION_CONDITION_VARIABLE_H diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp new file mode 100644 index 000000000..b381011ff --- /dev/null +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -0,0 +1,96 @@ +#include "SynchronizationSemaphore.h" +#include "Timing.h" + +#ifdef _WIN32 +void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + Timer timer; + DWORD res = WAIT_OBJECT_0; + + while (!condition()) + { + if (res == WAIT_OBJECT_0) + { + // Increment the number of readers that are waiting, making sure the counter + // is at least 1 after the increment. This can occur when a previous reader got + // interrupted and the trigger happening before decrementing the + // m_NumReadersWaiting counter. + while (m_SharedState->m_NumReadersWaiting++ < 0) + { + } + } + + // Wait for a maximum of 20ms to perform periodic error checking. + auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); + + if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + m_SharedState->m_NumReadersWaiting--; + throw std::runtime_error("Waiting time has expired."); + } + + if (res == WAIT_FAILED) + { + m_SharedState->m_NumReadersWaiting--; + throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); + } + + if (error_check != nullptr) + { + try + { + error_check(); + } + catch (...) + { + m_SharedState->m_NumReadersWaiting--; + throw; + } + } + } +} + +void SynchronizationSemaphore::Signal() +{ + // Notify waiting processes. + long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); + + // If a reader times out in between us reading the number of readers that are waiting + // and us releasing the semaphore, we are releasing one too many readers. This + // results in a future reader being released immediately, which is not a problem, + // as there are checks in place for that. + + if (num_readers_waiting > 0) + ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); +} + +void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +{ + m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while creating semaphore."); + + shared_state->m_NumReadersWaiting = 0; +} + +void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +{ + m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while opening semaphore."); +} + +void SynchronizationSemaphore::CloseImpl() +{ + CloseHandle(m_Semaphore); +} + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +#endif diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h new file mode 100644 index 000000000..46cd2f923 --- /dev/null +++ b/catkit_core/SynchronizationSemaphore.h @@ -0,0 +1,43 @@ +#ifndef SYNCHRONIZATION_SEMAPHORE_H +#define SYNCHRONIZATION_SEMAPHORE_H + +#include "Synchronization.h" + +#include + +#ifdef _WIN32 + #define WIN32_LEAN_AND_MEAN + #define NOMINMAX + #include +#endif + +#ifdef _WIN32 +struct SharedStateSemaphore +{ + std::atomic_long m_NumReadersWaiting; +}; + +class SynchronizationSemaphore : public SynchronizationBase +{ + friend SynchronizationBase; + +public: + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + +protected: + void CreateImpl(const std::string &id, SharedState *shared_state); + void OpenImpl(const std::string &id, SharedState *shared_state); + void CloseImpl(); + + HANDLE m_Semaphore; +}; +#endif // _WIN32 + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +# endif // SYNCHRONIZATION_SEMAPHORE_H