Skip to content

Commit

Permalink
Add ackNotify usage, to keep message flow consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
ben committed Aug 25, 2024
1 parent 82f8519 commit 8bc4e7d
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 88 deletions.
3 changes: 1 addition & 2 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ struct ConnectionsInfo

int main(int argc, char *argv[])
{
const std::string shared_memory_name{"/_shmem4"};
const std::string shared_memory_name{"/_shmem10"};
bool isStopRequested{false}, connectionConfirmed{false};
std::unique_ptr<ProcCommunicator> slave = std::make_unique<ProcCommunicator>(false, true, shared_memory_name);
am::configuration::Configuration default_conf{75, 10, 1, 50, 5, 10.0};
Expand All @@ -107,7 +107,6 @@ int main(int argc, char *argv[])
// confirm master connection by Handshake message, and then set configuration
// such order defines expected message flow
ConnectionsInfo connections;
// confirm handshake, entry point

bool isRunning = true;
while (isRunning)
Expand Down
7 changes: 4 additions & 3 deletions sh_mem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
set(CMAKE_CXX_EXTENSIONS OFF) # Turn off compiler-specific extensions

ADD_LIBRARY( shared_mem STATIC
ProcCommunicator.cpp SharedMemoryReceiver.cpp SharedMemorySender.cpp )
ADD_LIBRARY(shared_mem STATIC
ProcCommunicator.cpp SharedMemoryReceiver.cpp SharedMemorySender.cpp )

add_executable(sh_mem_test ProcCommunicator.cpp SharedMemoryReceiver.cpp SharedMemorySender.cpp main.cpp)
add_executable(sh_mem_test main.cpp)
target_link_libraries(sh_mem_test shared_mem)
Empty file modified sh_mem/Message.hpp
100644 → 100755
Empty file.
94 changes: 47 additions & 47 deletions sh_mem/ProcCommunicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ static constexpr int SEMAPHORE_ENABLED = 1;

ProcCommunicator::ProcCommunicator(const bool isMasterMode,
const bool isMultipleMasters,
const std::string &shMemName) : m_master_mode(isMasterMode),
m_multiple_master(isMultipleMasters)/*,
m_master_received((sem_t *)-1),
m_slave_received((sem_t *)-1),
m_master_sent((sem_t *)-1),
m_slave_sent((sem_t *)-1)*/
const std::string &shMemName) : m_master_mode(isMasterMode)
{
const std::string master_mem_name = shMemName + "_master";
const std::string slave_mem_name = shMemName + "_slave";
Expand All @@ -29,9 +24,7 @@ ProcCommunicator::ProcCommunicator(const bool isMasterMode,
m_slave_received = sem_open((shMemName + "_s_rsem").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_master_sent = sem_open((shMemName + "_m_sent").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_slave_sent = sem_open((shMemName + "_s_sent").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);

if (isMultipleMasters)
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_RDWR, 0666, SEMAPHORE_ENABLED);
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_RDWR, 0666, SEMAPHORE_ENABLED);
}
else
{
Expand All @@ -42,20 +35,15 @@ ProcCommunicator::ProcCommunicator(const bool isMasterMode,
m_slave_received = sem_open((shMemName + "_s_rsem").c_str(), O_CREAT, 0666, SEMAPHORE_DISABLED);
m_master_sent = sem_open((shMemName + "_m_sent").c_str(), O_CREAT, 0666, SEMAPHORE_DISABLED);
m_slave_sent = sem_open((shMemName + "_s_sent").c_str(), O_CREAT, 0666, SEMAPHORE_DISABLED);

if (isMultipleMasters)
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_CREAT, 0666, SEMAPHORE_ENABLED);
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_CREAT, 0666, SEMAPHORE_ENABLED);
}
if (m_master_received == SEM_FAILED || m_slave_received == SEM_FAILED ||
m_master_sent == SEM_FAILED || m_slave_sent == SEM_FAILED || m_slave_ready == SEM_FAILED)
m_master_sent == SEM_FAILED || m_slave_sent == SEM_FAILED || m_slave_ready == SEM_FAILED || m_slave_ready == SEM_FAILED)
{
perror("ProcCommunicator sem_open failure.");
exit(1);
}
if(isMultipleMasters && m_slave_ready==SEM_FAILED){
perror("ProcCommunicator MultiMaster mode sem_open failure.");
exit(1);
}

#else
m_sender = std::make_unique<SharedMemorySender>(slave_mem_name.c_str());
m_receiver = std::make_unique<SharedMemoryReceiver>(master_mem_name.c_str());
Expand All @@ -74,14 +62,11 @@ ProcCommunicator::ProcCommunicator(const bool isMasterMode,
if (!(m_slave_sent = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_s_sent").c_str())))
m_slave_sent = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_s_sent").c_str());

if (isMultipleMasters)
{
if (!(m_slave_ready = CreateSemaphoreW(NULL, SEMAPHORE_ENABLED, MAXLONG, (wshMemName + L"_s_ready").c_str())))
m_slave_ready = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, SEMAPHORE_ENABLED, (wshMemName + L"_s_ready").c_str());
}
if (!(m_slave_ready = CreateSemaphoreW(NULL, SEMAPHORE_ENABLED, MAXLONG, (wshMemName + L"_s_ready").c_str())))
m_slave_ready = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, SEMAPHORE_ENABLED, (wshMemName + L"_s_ready").c_str());

if (m_master_received == NULL || m_slave_received == NULL ||
m_master_sent == NULL || m_slave_sent == NULL || m_slave_ready == NULL )
m_master_sent == NULL || m_slave_sent == NULL || m_slave_ready == NULL)
{
perror("ProcCommunicator sem_open failure.");
exit(1);
Expand All @@ -93,12 +78,12 @@ ProcCommunicator::ProcCommunicator(const bool isMasterMode,
ProcCommunicator::~ProcCommunicator()
{
#ifndef _WIN32
if ( sem_close(m_master_received) == -1)
if (sem_close(m_master_received) == -1)
{
perror("Failed to destroy m_master_received semaphore");
}

if ( sem_close(m_slave_received) == -1)
if (sem_close(m_slave_received) == -1)
{
perror("Failed to destroy m_slave_received semaphore");
}
Expand All @@ -113,7 +98,7 @@ ProcCommunicator::~ProcCommunicator()
perror("Failed to destroy m_slave_sent semaphore");
}

if (m_multiple_master && sem_close(m_slave_ready) == -1)
if (sem_close(m_slave_ready) == -1)
{
perror("Failed to destroy m_slave_ready semaphore");
}
Expand All @@ -123,8 +108,6 @@ ProcCommunicator::~ProcCommunicator()
{
perror("Failed to destroy m_master_received semaphore");
}


if (m_slave_received && !CloseHandle(m_slave_received))
{
perror("Failed to destroy m_slave_received semaphore");
Expand All @@ -137,24 +120,22 @@ ProcCommunicator::~ProcCommunicator()
{
perror("Failed to destroy m_slave_sent semaphore");
}
if (m_multiple_master && m_slave_ready && !CloseHandle(m_slave_ready))
if (m_slave_ready && !CloseHandle(m_slave_ready))
{
perror("Failed to destroy m_slave_ready semaphore");
}
#endif
}
#ifndef _WIN32

void ProcCommunicator::send(const Message *msg)
{
if (m_multiple_master && m_master_mode)
if (m_master_mode)
sem_wait(m_slave_ready);

m_sender->sendMessage(msg);
sem_post(m_master_mode ? m_master_sent : m_slave_sent);
sem_wait(m_master_mode ? m_slave_received : m_master_received);

if (m_multiple_master && !m_master_mode)
sem_post(m_slave_ready);
}

Message *ProcCommunicator::receive()
Expand All @@ -165,10 +146,22 @@ Message *ProcCommunicator::receive()

return response;
}

void ProcCommunicator::ackNotify()
{
if (m_master_mode)
sem_post(m_slave_ready);
else
{
std::cerr << "ProcCommunicator::ackNotify must be sent only from master, when response processed.\n";
}
}

#else
void ProcCommunicator::send(const Message *msg)
{
if (m_multiple_master && m_master_mode) {
if (m_master_mode)
{
WaitForSingleObject(m_slave_ready, INFINITE); // INFINITE timeout to wait indefinitely
}
// Send the message
Expand All @@ -177,31 +170,38 @@ void ProcCommunicator::send(const Message *msg)
ReleaseSemaphore(m_master_mode ? m_master_sent : m_slave_sent, 1, NULL);
// Wait for the semaphore to be signaled
WaitForSingleObject(m_master_mode ? m_slave_received : m_master_received, INFINITE);
// Post (release) the semaphore if needed
if (m_multiple_master && !m_master_mode) {
ReleaseSemaphore(m_slave_ready, 1, NULL);
}
}
}

Message *ProcCommunicator::receive()
{
//std::cout << "ProcCommunicator::receive.\n";
DWORD waitResult = WaitForSingleObject(m_master_mode ? m_slave_sent : m_master_sent, INFINITE);
if (waitResult != WAIT_OBJECT_0) {
std::cout << "ProcCommunicator::receive FAIL\n";

if (waitResult != WAIT_OBJECT_0)
{
std::cerr << "ProcCommunicator::receive WaitForSingleObject FAIL\n";
return nullptr;
}
// Receive the message

Message *response = m_receiver->receiveMessage();
// Post (release) the semaphore

BOOL releaseResult = ReleaseSemaphore(m_master_mode ? m_master_received : m_slave_received, 1, NULL);
if (!releaseResult) {
// Handle error
if (!releaseResult)
{
std::cerr << "ProcCommunicator::receive ReleaseSemaphore FAIL\n";
return nullptr;
}
return response;
}
void ProcCommunicator::ackNotify()
{
if (m_master_mode)
{
ReleaseSemaphore(m_slave_ready, 1, NULL);
}
else
{
std::cerr << "ProcCommunicator::ackNotify must be sent only from master, when response processed.\n";
}
}
#endif

Message *ProcCommunicator::sendAndGetResponse(const Message *msg)
Expand Down
3 changes: 2 additions & 1 deletion sh_mem/ProcCommunicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ class ProcCommunicator
void send(const Message *msg);
Message *receive();
Message *sendAndGetResponse(const Message *msg);
void ackNotify();

private:
std::unique_ptr<SharedMemorySender> m_sender;
std::unique_ptr<SharedMemoryReceiver> m_receiver;
bool m_master_mode;
bool m_multiple_master;

#ifndef _WIN32
sem_t *m_master_received;
sem_t *m_slave_received;
Expand Down
13 changes: 5 additions & 8 deletions sh_mem/SharedMemoryReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ SharedMemoryReceiver::SharedMemoryReceiver(const char *shMemName) : m_name(shMem
#ifndef _WIN32
void SharedMemoryReceiver::init()
{
// Try to create the shared memory segment
m_shm_fd = shm_open(m_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
if (m_shm_fd == -1)
{
if (errno == EEXIST)
{
// Segment already exists, open it without O_CREAT | O_EXCL
m_shm_fd = shm_open(m_name.c_str(), O_RDWR, 0666);
if (m_shm_fd == -1)
{
Expand All @@ -52,7 +50,6 @@ void SharedMemoryReceiver::init()
}
}

// Map the shared memory object into the address space of the process
m_ptr = mmap(0, SHARED_MEMORY_SIZE, PROT_READ, MAP_SHARED, m_shm_fd, 0);
if (m_ptr == MAP_FAILED)
{
Expand All @@ -77,9 +74,9 @@ void SharedMemoryReceiver::init()
{
std::wstring wshMemName(m_name.begin(), m_name.end());
m_shm_fd = OpenFileMappingW(
FILE_MAP_ALL_ACCESS, // read/write access
FALSE, // do not inherit the name
wshMemName.c_str()); // name of mapping object
FILE_MAP_ALL_ACCESS,
FALSE,
wshMemName.c_str());

if (m_shm_fd == NULL)
{
Expand All @@ -89,12 +86,12 @@ void SharedMemoryReceiver::init()
PAGE_READWRITE, // read/write access
0, // maximum object size (high-order DWORD)
SHARED_MEMORY_SIZE, // maximum object size (low-order DWORD)
wshMemName.c_str()); // name of mapping object
wshMemName.c_str()); // name of mapping object

if (m_shm_fd == NULL)
{
printf(("Could not open file mapping object (%d).\n"),
GetLastError());
GetLastError());
return;
}
}
Expand Down
9 changes: 3 additions & 6 deletions sh_mem/SharedMemorySender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ SharedMemorySender::SharedMemorySender(const char *shMemName) : m_name(shMemName
#ifndef _WIN32
void SharedMemorySender::init()
{
// Try to create the shared memory segment
m_shm_fd = shm_open(m_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
if (m_shm_fd == -1)
{
if (errno == EEXIST)
{
// Segment already exists, open it without O_CREAT | O_EXCL
m_shm_fd = shm_open(m_name.c_str(), O_RDWR, 0666);
if (m_shm_fd == -1)
{
Expand All @@ -42,7 +40,6 @@ void SharedMemorySender::init()
}
else
{
// Segment created successfully, truncate it to the desired size
if (ftruncate(m_shm_fd, SHARED_MEMORY_SIZE) == -1)
{
perror("ftruncate failed");
Expand Down Expand Up @@ -97,12 +94,12 @@ void SharedMemorySender::init()
PAGE_READWRITE, // read/write access
0, // maximum object size (high-order DWORD)
SHARED_MEMORY_SIZE, // maximum object size (low-order DWORD)
wshMemName.c_str()); // name of mapping object
wshMemName.c_str()); // name of mapping object

if (m_shm_fd == NULL)
{
printf("Could not create file mapping object (%d).\n",
GetLastError());
GetLastError());
}
m_ptr = (void *)MapViewOfFile(m_shm_fd, // handle to map object
FILE_MAP_ALL_ACCESS, // read/write permission
Expand All @@ -125,7 +122,7 @@ void SharedMemorySender::finish()

void SharedMemorySender::sendMessage(const Message *msg)
{
if (msg->type == MessageType::SET_CONFIG)
if (msg->type == MessageType::SET_CONFIG)
CopyMemory(m_ptr, msg, sizeof(MessageSetConfig));
else if (msg->type == MessageType::COMPARE_REQUEST)
CopyMemory(m_ptr, msg, sizeof(MessageCompareRequest));
Expand Down
Loading

0 comments on commit 8bc4e7d

Please sign in to comment.