Skip to content

Commit

Permalink
Trigger synchronization for each (parent) topic.
Browse files Browse the repository at this point in the history
  • Loading branch information
ehpor committed Dec 23, 2024
1 parent e8f6086 commit d175952
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
41 changes: 31 additions & 10 deletions catkit_core/MessageBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ Message MessageBroker::PrepareMessage(const std::string &topic, Uuid trace_id, s

if (device_id < 0)
{
message.m_Payload = m_CpuPayloadMemory->GetAddress() + offset;
message.m_Payload = m_CpuPayloadMemory->GetAddress(offset);
}
else
{
message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress() + offset;
message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress(offset);
}

// Allocate a message header.
Expand Down Expand Up @@ -114,9 +114,10 @@ void MessageBroker::PublishMessage(Message &message, bool is_final)
return;
}

auto topic_header = m_TopicHeaders.Find(message.m_Header->topic);
auto topic = std::string_view(message.m_Header->topic);
auto topic_header = m_TopicHeaders.Find(topic);

if (message.m_Header->partial_frame_id == INVALID_FRAME_ID)
if (message.m_Header->frame_id == INVALID_FRAME_ID)
{
// First partial frame. Assign a new frame ID.
message.m_Header->frame_id = topic_header->next_frame_id.fetch_add(1, std::memory_order_relaxed);
Expand All @@ -128,11 +129,25 @@ void MessageBroker::PublishMessage(Message &message, bool is_final)
message.m_Header->partial_frame_id++;
}

// Get timestamp.
// Set the timestamp.
message.m_Header->producer_timestamp = GetTimeStamp();

// TODO: put message offsets.

// Go to synchronization structures and signal them.
// This includes parent topics.
for (std::size_t i = 0; i <= topic.size(); ++i)
{
std::size_t size = topic.size() - i;

if (i == 0 || topic[size] == '/')
{
auto synchronization = GetSynchronization(topic.substr(0, size));

if (synchronization)
synchronization->Signal();
}
}

if (!is_final)
{
Expand All @@ -152,7 +167,7 @@ void MessageBroker::PublishMessage(Message &message, bool is_final)
message.m_HasBeenPublished = is_final;
}

FreeListAllocator *MessageBroker::GetAllocator(int8_t device_id)
std::shared_ptr<FreeListAllocator> MessageBroker::GetAllocator(int8_t device_id)
{
if (device_id < -1 || device_id >= MAX_NUM_GPUS)
{
Expand All @@ -161,13 +176,13 @@ FreeListAllocator *MessageBroker::GetAllocator(int8_t device_id)

if (device_id == -1)
{
return &m_CpuPayloadAllocator;
return m_CpuPayloadAllocator;
}

return &m_GpuPayloadAllocator[device_id];
return m_GpuPayloadAllocator[device_id];
}

Synchronization *MessageBroker::GetSynchronization(const std::string &topic)
std::shared_ptr<Synchronization> MessageBroker::GetSynchronization(std::string_view topic)
{
auto topic_header = m_TopicHeaders.Find(topic);

Expand All @@ -176,5 +191,11 @@ Synchronization *MessageBroker::GetSynchronization(const std::string &topic)
return nullptr;
}

// TODO: look up the synchronization structure (not the shared data).
// Look up the synchronization structure (not the shared data).
if (m_Synchronizations.find(topic) == m_Synchronizations.end())
{
m_Synchronizations[topic] = std::make_shared<Synchronization>(topic_header->synchronization);
}

return m_Synchronizations[topic];
}
11 changes: 7 additions & 4 deletions catkit_core/MessageBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "UuidGenerator.h"

#include <memory>
#include <map>

const char * const MESSAGE_BROKER_VERSION = "0.1";

Expand Down Expand Up @@ -174,8 +175,8 @@ class MessageBroker : std::enable_shared_from_this<MessageBroker>
Message GetMessage(const std::string &topic, size_t frame_id);

private:
FreeListAllocator *GetAllocator(int8_t device_id);
Synchronization *GetSynchronization(const std::string &topic);
std::shared_ptr<FreeListAllocator> GetAllocator(int8_t device_id);
std::shared_ptr<Synchronization> GetSynchronization(std::string_view topic);

MessageBrokerHeader &m_Header;

Expand All @@ -184,13 +185,15 @@ class MessageBroker : std::enable_shared_from_this<MessageBroker>

MessageHeader *m_MessageHeaders;

FreeListAllocator m_CpuPayloadAllocator;
std::shared_ptr<FreeListAllocator> m_CpuPayloadAllocator;
std::shared_ptr<SharedMemory> m_CpuPayloadMemory;

FreeListAllocator m_GpuPayloadAllocator[MAX_NUM_GPUS];
std::shared_ptr<FreeListAllocator> m_GpuPayloadAllocator[MAX_NUM_GPUS];
std::shared_ptr<CudaSharedMemory> m_GpuPayloadMemory[MAX_NUM_GPUS];

UuidGenerator m_UuidGenerator;

std::map<std::string_view, std::shared_ptr<Synchronization>> m_Synchronizations;
};

#endif // MESSAGE_BROKER_H

0 comments on commit d175952

Please sign in to comment.