Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor synchronization structure #263

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
3 changes: 2 additions & 1 deletion catkit_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ add_compile_options($<$<CXX_COMPILER_ID:MSVC>:/MP1>)
add_library(catkit_core STATIC
DataStream.cpp
SharedMemory.cpp
Synchronization.cpp
SynchronizationConditionVariable.cpp
SynchronizationSemaphore.cpp
Timing.cpp
Log.cpp
LogConsole.cpp
Expand Down
38 changes: 20 additions & 18 deletions catkit_core/DataStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,22 +173,27 @@ void DataStream::SubmitFrame(size_t id)
DataFrameMetadata *meta = m_Header->m_FrameMetadata + (id % m_Header->m_NumFramesInBuffer);
meta->m_TimeStamp = GetTimeStamp();

// Obtain a lock as we are about to modify the condition of the
// synchronization.
auto lock = SynchronizationLock(&m_Synchronization);

// Make frame available:
// Use a do-while loop to ensure we are never decrementing the last id.
size_t last_id;
do
{
last_id = m_Header->m_LastId;

if (last_id >= id + 1)
break;
} while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1));
// Obtain a lock as we are about to modify the condition of the
// synchronization.
auto lock = SynchronizationLock(m_Synchronization);

// Make frame available:
// Use a do-while loop to ensure we are never decrementing the last id.
size_t last_id;
do
{
last_id = m_Header->m_LastId;

if (last_id >= id + 1)
break;
} while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1));

m_Synchronization.Signal();
}

m_Synchronization.Signal();
auto ts = GetTimeStamp();
tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0);

// Don't update the framerate counter for the first frame.
if (id == 0)
Expand All @@ -207,9 +212,6 @@ void DataStream::SubmitFrame(size_t id)
m_Header->m_FrameRateCounter =
m_Header->m_FrameRateCounter * std::exp(-FRAMERATE_DECAY * time_delta)
+ FRAMERATE_DECAY;

auto ts = GetTimeStamp();
tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0);
}

void DataStream::SubmitData(const void *data)
Expand Down Expand Up @@ -332,7 +334,7 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che

// Wait until frame becomes available.
// Obtain a lock first.
auto lock = SynchronizationLock(&m_Synchronization);
auto lock = SynchronizationLock(m_Synchronization);
m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check);
}

Expand Down
10 changes: 9 additions & 1 deletion catkit_core/DataStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,16 @@

#include "SharedMemory.h"
#include "Synchronization.h"
#include "SynchronizationConditionVariable.h"
#include "SynchronizationSemaphore.h"
#include "Tensor.h"

#ifdef _WIN32
using Synchronization = SynchronizationSemaphore;
#elif defined(__linux__) or defined(__APPLE__)
using Synchronization = SynchronizationConditionVariable;
#endif

const char * const CURRENT_DATASTREAM_VERSION = "0.2";
const size_t MAX_NUM_FRAMES_IN_BUFFER = 20;
const long INFINITE_WAIT_TIME = LONG_MAX;
Expand Down Expand Up @@ -46,7 +54,7 @@ struct DataStreamHeader

double m_FrameRateCounter;

SynchronizationSharedData m_SynchronizationSharedData;
Synchronization::SharedState m_SynchronizationSharedData;
};

class DataFrame : public Tensor
Expand Down
224 changes: 0 additions & 224 deletions catkit_core/Synchronization.cpp

This file was deleted.

Loading
Loading