From d175952da44855cc283e3a2081d404bf4ce9a159 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 22 Dec 2024 12:21:16 -0800 Subject: [PATCH] Trigger synchronization for each (parent) topic. --- catkit_core/MessageBroker.cpp | 41 ++++++++++++++++++++++++++--------- catkit_core/MessageBroker.h | 11 ++++++---- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp index 1fc6df6d..89245a03 100644 --- a/catkit_core/MessageBroker.cpp +++ b/catkit_core/MessageBroker.cpp @@ -61,11 +61,11 @@ Message MessageBroker::PrepareMessage(const std::string &topic, Uuid trace_id, s if (device_id < 0) { - message.m_Payload = m_CpuPayloadMemory->GetAddress() + offset; + message.m_Payload = m_CpuPayloadMemory->GetAddress(offset); } else { - message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress() + offset; + message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress(offset); } // Allocate a message header. @@ -114,9 +114,10 @@ void MessageBroker::PublishMessage(Message &message, bool is_final) return; } - auto topic_header = m_TopicHeaders.Find(message.m_Header->topic); + auto topic = std::string_view(message.m_Header->topic); + auto topic_header = m_TopicHeaders.Find(topic); - if (message.m_Header->partial_frame_id == INVALID_FRAME_ID) + if (message.m_Header->frame_id == INVALID_FRAME_ID) { // First partial frame. Assign a new frame ID. message.m_Header->frame_id = topic_header->next_frame_id.fetch_add(1, std::memory_order_relaxed); @@ -128,11 +129,25 @@ void MessageBroker::PublishMessage(Message &message, bool is_final) message.m_Header->partial_frame_id++; } - // Get timestamp. + // Set the timestamp. message.m_Header->producer_timestamp = GetTimeStamp(); + // TODO: put message offsets. + // Go to synchronization structures and signal them. + // This includes parent topics. + for (std::size_t i = 0; i <= topic.size(); ++i) + { + std::size_t size = topic.size() - i; + + if (i == 0 || topic[size] == '/') + { + auto synchronization = GetSynchronization(topic.substr(0, size)); + if (synchronization) + synchronization->Signal(); + } + } if (!is_final) { @@ -152,7 +167,7 @@ void MessageBroker::PublishMessage(Message &message, bool is_final) message.m_HasBeenPublished = is_final; } -FreeListAllocator *MessageBroker::GetAllocator(int8_t device_id) +std::shared_ptr MessageBroker::GetAllocator(int8_t device_id) { if (device_id < -1 || device_id >= MAX_NUM_GPUS) { @@ -161,13 +176,13 @@ FreeListAllocator *MessageBroker::GetAllocator(int8_t device_id) if (device_id == -1) { - return &m_CpuPayloadAllocator; + return m_CpuPayloadAllocator; } - return &m_GpuPayloadAllocator[device_id]; + return m_GpuPayloadAllocator[device_id]; } -Synchronization *MessageBroker::GetSynchronization(const std::string &topic) +std::shared_ptr MessageBroker::GetSynchronization(std::string_view topic) { auto topic_header = m_TopicHeaders.Find(topic); @@ -176,5 +191,11 @@ Synchronization *MessageBroker::GetSynchronization(const std::string &topic) return nullptr; } - // TODO: look up the synchronization structure (not the shared data). + // Look up the synchronization structure (not the shared data). + if (m_Synchronizations.find(topic) == m_Synchronizations.end()) + { + m_Synchronizations[topic] = std::make_shared(topic_header->synchronization); + } + + return m_Synchronizations[topic]; } diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h index 25fdf79c..c547c9b8 100644 --- a/catkit_core/MessageBroker.h +++ b/catkit_core/MessageBroker.h @@ -10,6 +10,7 @@ #include "UuidGenerator.h" #include +#include const char * const MESSAGE_BROKER_VERSION = "0.1"; @@ -174,8 +175,8 @@ class MessageBroker : std::enable_shared_from_this Message GetMessage(const std::string &topic, size_t frame_id); private: - FreeListAllocator *GetAllocator(int8_t device_id); - Synchronization *GetSynchronization(const std::string &topic); + std::shared_ptr GetAllocator(int8_t device_id); + std::shared_ptr GetSynchronization(std::string_view topic); MessageBrokerHeader &m_Header; @@ -184,13 +185,15 @@ class MessageBroker : std::enable_shared_from_this MessageHeader *m_MessageHeaders; - FreeListAllocator m_CpuPayloadAllocator; + std::shared_ptr m_CpuPayloadAllocator; std::shared_ptr m_CpuPayloadMemory; - FreeListAllocator m_GpuPayloadAllocator[MAX_NUM_GPUS]; + std::shared_ptr m_GpuPayloadAllocator[MAX_NUM_GPUS]; std::shared_ptr m_GpuPayloadMemory[MAX_NUM_GPUS]; UuidGenerator m_UuidGenerator; + + std::map> m_Synchronizations; }; #endif // MESSAGE_BROKER_H