From 86f6c46af507614e6b5ab4bebf252d49e92dddec Mon Sep 17 00:00:00 2001 From: Micah Chambers Date: Thu, 25 Feb 2021 21:19:19 -0800 Subject: [PATCH] compiles again --- cmd/BUILD | 13 +-- cmd/pinger.cc | 2 +- cmd/ponger.cc | 2 +- cmd/standalone_publisher.cc | 167 +------------------------------- cmd/standalone_subscriber.cc | 181 +---------------------------------- cmd/topology_node.cc | 4 +- lib/BUILD | 63 ++++++++++-- lib/IPCMessage.cc | 67 +++++++++++++ lib/IPCMessage.h | 35 +++++++ lib/IPCNode.cc | 2 +- lib/IPCNode.h | 2 +- lib/Publisher.cc | 139 +++++++++++++++++++++++++++ lib/Publisher.h | 19 ++++ lib/Subscriber.cc | 97 +++++++++++++++++++ lib/Subscriber.h | 48 ++++++++++ lib/TopologyManager.cc | 6 +- lib/TopologyServer.cc | 4 +- lib/UDSClient.cc | 4 +- lib/UDSServer.cc | 4 +- lib/Utils.cc | 2 +- lib/ipc_node_test.cc | 2 +- lib/test_topology_manager.cc | 4 +- 22 files changed, 491 insertions(+), 376 deletions(-) create mode 100644 lib/IPCMessage.cc create mode 100644 lib/IPCMessage.h create mode 100644 lib/Publisher.cc create mode 100644 lib/Publisher.h create mode 100644 lib/Subscriber.cc create mode 100644 lib/Subscriber.h diff --git a/cmd/BUILD b/cmd/BUILD index 7010a75..8103c50 100644 --- a/cmd/BUILD +++ b/cmd/BUILD @@ -41,19 +41,14 @@ cc_binary( cc_binary( name = "standalone_publisher", srcs = ["standalone_publisher.cc"], - linkopts = [ - "-pthread", - "-lpthread", - "-lrt", - ], + deps = ["//lib:Publisher"], ) cc_binary( name = "standalone_subscriber", srcs = ["standalone_subscriber.cc"], - linkopts = [ - "-pthread", - "-lpthread", - "-lrt", + deps = [ + "//lib:IPCMessage", + "//lib:Subscriber", ], ) diff --git a/cmd/pinger.cc b/cmd/pinger.cc index 4e5f2f9..9ae328a 100644 --- a/cmd/pinger.cc +++ b/cmd/pinger.cc @@ -4,7 +4,7 @@ #include #include -#include "ips/IPCNode.h" +#include "ipc_pubsub/IPCNode.h" using namespace ips; int main() { diff --git a/cmd/ponger.cc b/cmd/ponger.cc index 61b47aa..be60faa 100644 --- a/cmd/ponger.cc +++ b/cmd/ponger.cc @@ -4,7 +4,7 @@ #include #include -#include "ips/IPCNode.h" +#include "ipc_pubsub/IPCNode.h" using namespace ips; int main() { diff --git a/cmd/standalone_publisher.cc b/cmd/standalone_publisher.cc index 5736722..efdf7c1 100644 --- a/cmd/standalone_publisher.cc +++ b/cmd/standalone_publisher.cc @@ -1,169 +1,6 @@ -#include // For O_* constants -#include -#include // for shm_open -#include -#include // For mode constants -#include -#include -#include +#include -#include -#include -#include -#include -#include -#include -#include -#include - -struct OnReturn { - OnReturn(std::function cb) : mCb(cb) {} - ~OnReturn() { mCb(); } - std::function mCb; -}; - -static std::string RandomString(size_t len) { - static const char alphanum[] = - "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; - - const size_t seed1 = std::chrono::system_clock::now().time_since_epoch().count(); - std::minstd_rand0 g1(seed1); - - std::string tmp(len, ' '); - for (size_t i = 0; i < len; ++i) { - tmp[i] = alphanum[g1() % (sizeof(alphanum) - 1)]; - } - return tmp; -} - -class Publisher { - public: - Publisher(); - bool Send(const std::string& meta, size_t len, const uint8_t* data); - - int mFd; - std::mutex mMtx; - std::vector mClients; - std::thread mAcceptThread; -}; - -Publisher::Publisher() { - struct sockaddr_un addr; - const std::string socket_path = "\0socket"; - assert(socket_path.size() + 1 < sizeof(addr.sun_path)); - - if ((mFd = socket(AF_UNIX, SOCK_SEQPACKET, 0)) == -1) { - perror("socket error"); - exit(-1); - } - - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - std::copy(socket_path.c_str(), socket_path.c_str() + socket_path.size(), addr.sun_path); - if (bind(mFd, reinterpret_cast(&addr), sizeof(addr)) == -1) { - perror("bind error"); - exit(-1); - } - - if (listen(mFd, 5) == -1) { - perror("listen error"); - exit(-1); - } - - mAcceptThread = std::thread([this]() { - while (1) { - int cl; - if ((cl = accept(mFd, nullptr, nullptr)) == -1) { - perror("accept error"); - } else { - std::cerr << "Accepted" << std::endl; - std::lock_guard lk(mMtx); - mClients.push_back(cl); - } - } - }); -} - -bool Publisher::Send(const std::string& meta, size_t len, const uint8_t* data) { - const std::string name = RandomString(8); - - // open and write data - int shmFdWrite = shm_open(name.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); - if (shmFdWrite == -1) { - perror("Failed to create shm"); - return false; - } - - // unlink when we leave this function, since it will have been shared by - // then or failed - OnReturn onRet1([&name, shmFdWrite]() { - shm_unlink(name.c_str()); - close(shmFdWrite); - }); - - if (ssize_t written = write(shmFdWrite, data, len); written != ssize_t(len)) { - perror("Failed to write to shm"); - return false; - } - - // send read only version to other processes, close after we finish sending - int shmFdSend = shm_open(name.c_str(), O_RDONLY, S_IRUSR); - OnReturn onRet2([shmFdSend]() { close(shmFdSend); }); - - // Construct header - struct msghdr msgh; - - std::vector metadata; - std::copy(meta.begin(), meta.end(), std::back_inserter(metadata)); - metadata.push_back(0); - - // Vector of data to send, NOTE: we must always send at least one byte - // This is the data that will be sent in the actual socket stream - struct iovec iov; - iov.iov_base = metadata.data(); - iov.iov_len = metadata.size(); - - // Don't need destination because we are using a connection - msgh.msg_name = nullptr; - msgh.msg_namelen = 0; - msgh.msg_iov = &iov; - msgh.msg_iovlen = 1; // sending one message - - // Allocate a char array of suitable size to hold the ancillary data. - // However, since this buffer is in reality a 'struct cmsghdr', use a - // union to ensure that it is aligned as required for that structure. - union { - char buf[CMSG_SPACE(sizeof(shmFdSend))]; - /* Space large enough to hold an 'int' */ - struct cmsghdr align; - } controlMsg; - msgh.msg_control = controlMsg.buf; - msgh.msg_controllen = sizeof(controlMsg.buf); - - // The control message buffer must be zero-initialized in order - // for the CMSG_NXTHDR() macro to work correctly. - memset(controlMsg.buf, 0, sizeof(controlMsg.buf)); - - struct cmsghdr* cmsgp = CMSG_FIRSTHDR(&msgh); - cmsgp->cmsg_len = CMSG_LEN(sizeof(shmFdSend)); - cmsgp->cmsg_level = SOL_SOCKET; - cmsgp->cmsg_type = SCM_RIGHTS; - memcpy(CMSG_DATA(cmsgp), &shmFdSend, sizeof(shmFdSend)); - - std::cerr << (char*)(msgh.msg_iov->iov_base) << std::endl; - std::cerr << strlen((char*)msgh.msg_iov->iov_base) << std::endl; - std::cerr << msgh.msg_iov[0].iov_len << std::endl; - // Send - for (const int client : mClients) { - ssize_t ns = sendmsg(client, &msgh, 0); - if (ns == -1) { - std::cerr << "Failed to send to " << client << std::endl; - } - } - return true; -} +using ipc_pubsub::Publisher; int main(int argc, char** argv) { Publisher publisher; diff --git a/cmd/standalone_subscriber.cc b/cmd/standalone_subscriber.cc index ee06abc..b5e6dc4 100644 --- a/cmd/standalone_subscriber.cc +++ b/cmd/standalone_subscriber.cc @@ -1,183 +1,10 @@ -#include -#include -#include -#include -#include +#include -#include -#include -#include #include -#include -#include -#include -#include -struct Message { - static std::shared_ptr Create(size_t vecLen, struct msghdr msgh); - Message(std::string_view meta, uint8_t* ptr, size_t len, int fd) - : metaData(meta), blobPtr(ptr), blobSize(len), mFd(fd) {} - - ~Message() { - munmap(blobPtr, blobSize); - if (mFd != -1) close(mFd); - } - - std::string_view Contents() { - return std::string_view(reinterpret_cast(blobPtr), blobSize); - } - std::string_view MetaData() { return std::string_view(metaData); } - - // small amount of data informing the meaning of the larger blob - std::string metaData; - - // large chunk of memory mapped / shared memory data - uint8_t* blobPtr; - size_t blobSize; - - int mFd = -1; -}; - -std::shared_ptr Message::Create(size_t vecLen, struct msghdr msgh) { - int recvFd; - struct cmsghdr* cmsgp = CMSG_FIRSTHDR(&msgh); - if (cmsgp == nullptr || cmsgp->cmsg_len != CMSG_LEN(sizeof(recvFd))) { - std::cerr << "bad cmsg header / message length" << std::endl; - return nullptr; - } - if (cmsgp->cmsg_level != SOL_SOCKET) { - std::cerr << "cmsg_level != SOL_SOCKET" << std::endl; - return nullptr; - } - if (cmsgp->cmsg_type != SCM_RIGHTS) { - std::cerr << "cmsg_type != SCM_RIGHTS" << std::endl; - return nullptr; - } - - if (msgh.msg_iovlen == 0) { - std::cerr << "No vector data!?" << std::endl; - return nullptr; - } - if (msgh.msg_iovlen != 1) { - std::cerr << "Expected exactly 1 pieces of vector data, others will be ignoed" << std::endl; - return nullptr; - } - - // construct metadata from first vector payload - std::string_view meta(reinterpret_cast(msgh.msg_iov[0].iov_base), vecLen); - - /* The data area of the 'cmsghdr' is an 'int' (a file descriptor); - copy that integer to a local variable. (The received file descriptor - is typically a different file descriptor number than was used in the - sending process.) */ - memcpy(&recvFd, CMSG_DATA(cmsgp), sizeof(recvFd)); - - // seek to end so we know the length of the file to map - ssize_t blobLen = lseek(recvFd, 0, SEEK_END); - if (blobLen < 0) { - perror("Failed to see to end of received file descriptor"); - return nullptr; - } - - void* mappedData = mmap(nullptr, blobLen, PROT_READ, MAP_PRIVATE, recvFd, 0); - if (mappedData == nullptr) { - perror("Failed to map"); - return nullptr; - } - - return std::make_shared(meta, reinterpret_cast(mappedData), blobLen, recvFd); -} - -class Subscriber { - public: - Subscriber(std::function)> callback); - ~Subscriber(); - void WaitForShutdown(); - - int mFd; - std::thread mReadThread; - std::function)> mCallback; - - int mShutdownFd = -1; -}; - -Subscriber::~Subscriber() { - // trigger shutdown - size_t val = UINT32_MAX; - write(mShutdownFd, &val, sizeof(size_t)); - - mReadThread.join(); -} - -void Subscriber::WaitForShutdown() { - size_t data; - read(mShutdownFd, &data, sizeof(data)); -} - -Subscriber::Subscriber(std::function)> callback) - : mCallback(callback) { - mShutdownFd = eventfd(0, EFD_SEMAPHORE); - - struct sockaddr_un addr; - const std::string socket_path = "\0socket"; - assert(socket_path.size() + 1 < sizeof(addr.sun_path)); - - if ((mFd = socket(AF_UNIX, SOCK_SEQPACKET, 0)) == -1) { - perror("socket error"); - exit(-1); - } - - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - std::copy(socket_path.c_str(), socket_path.c_str() + socket_path.size(), addr.sun_path); - if (connect(mFd, reinterpret_cast(&addr), sizeof(addr)) == -1) { - perror("connect error"); - exit(-1); - } - - mReadThread = std::thread([this]() { - struct msghdr msgh; - - msgh.msg_name = nullptr; // doesn't matter because we are connected - msgh.msg_namelen = 0; - - constexpr size_t BUFFLEN = 1 << 12; - char buffer[BUFFLEN]; - struct iovec iov; - msgh.msg_iov = &iov; - msgh.msg_iovlen = 1; - iov.iov_base = buffer; - iov.iov_len = BUFFLEN; - - int recvFd = -1; - union { - char buf[CMSG_SPACE(sizeof(recvFd))]; - /* Space large enough to hold an 'int' */ - struct cmsghdr align; - } controlMsg; - msgh.msg_control = controlMsg.buf; - msgh.msg_controllen = sizeof(controlMsg.buf); - - while (ssize_t ns = recvmsg(mFd, &msgh, 0)) { - if (ns < 0) { - perror("Bad recv"); - return; - } - - std::cerr << ns << std::endl; - auto msg = Message::Create(ns, msgh); - if (msg != nullptr) { - mCallback(msg); - } - } - - // if we exit due to error, make sure to notify any blocking processes - size_t val = UINT32_MAX; - write(mShutdownFd, &val, sizeof(size_t)); - }); -} - -static void Callback(std::shared_ptr msg) { +using ipc_pubsub::IPCMessage; +using ipc_pubsub::Subscriber; +static void Callback(std::shared_ptr msg) { std::cerr << "Message Received" << std::endl; std::cerr << "Meta: " << msg->MetaData() << std::endl; std::cerr << "Msg: " << msg->Contents() << std::endl; diff --git a/cmd/topology_node.cc b/cmd/topology_node.cc index 6caa54f..36ea0c1 100644 --- a/cmd/topology_node.cc +++ b/cmd/topology_node.cc @@ -1,8 +1,8 @@ #include #include -#include "ips/TopologyManager.h" -#include "ips/Utils.h" +#include "ipc_pubsub/TopologyManager.h" +#include "ipc_pubsub/Utils.h" using ips::NodeChange; using ips::TopicChange; diff --git a/lib/BUILD b/lib/BUILD index ae2af60..c45a607 100644 --- a/lib/BUILD +++ b/lib/BUILD @@ -10,11 +10,62 @@ cc_test( ], ) +cc_library( + name = "IPCMessage", + srcs = ["IPCMessage.cc"], + hdrs = ["IPCMessage.h"], + include_prefix = "ipc_pubsub", + includes = ["."], + linkopts = [ + "-pthread", + "-lpthread", + "-lrt", + ], + visibility = ["//visibility:public"], + deps = [ + "@com_github_gabime_spdlog//:spdlog", + ], +) + +cc_library( + name = "Subscriber", + srcs = ["Subscriber.cc"], + hdrs = ["Subscriber.h"], + include_prefix = "ipc_pubsub", + includes = ["."], + linkopts = [ + "-pthread", + "-lpthread", + "-lrt", + ], + visibility = ["//visibility:public"], + deps = [ + ":IPCMessage", + ], +) + +cc_library( + name = "Publisher", + srcs = ["Publisher.cc"], + hdrs = ["Publisher.h"], + include_prefix = "ipc_pubsub", + includes = ["."], + linkopts = [ + "-pthread", + "-lpthread", + "-lrt", + ], + visibility = ["//visibility:public"], + deps = [ + ":Utils", + ], +) + cc_library( name = "TopologyServer", srcs = ["TopologyServer.cc"], hdrs = ["TopologyServer.h"], - include_prefix = "ips", + include_prefix = "ipc_pubsub", includes = ["."], deps = [ ":UDSServer", @@ -28,7 +79,7 @@ cc_library( name = "IPCNode", srcs = ["IPCNode.cc"], hdrs = ["IPCNode.h"], - include_prefix = "ips", + include_prefix = "ipc_pubsub", includes = ["."], visibility = ["//visibility:public"], deps = [":TopologyManager"], @@ -38,7 +89,7 @@ cc_library( name = "TopologyManager", srcs = ["TopologyManager.cc"], hdrs = ["TopologyManager.h"], - include_prefix = "ips", + include_prefix = "ipc_pubsub", includes = ["."], visibility = ["//visibility:public"], deps = [ @@ -54,7 +105,7 @@ cc_library( name = "UDSClient", srcs = ["UDSClient.cc"], hdrs = ["UDSClient.h"], - include_prefix = "ips", + include_prefix = "ipc_pubsub", includes = ["."], deps = [ ":Utils", @@ -66,7 +117,7 @@ cc_library( name = "UDSServer", srcs = ["UDSServer.cc"], hdrs = ["UDSServer.h"], - include_prefix = "ips", + include_prefix = "ipc_pubsub", includes = ["."], deps = [ ":Utils", @@ -78,7 +129,7 @@ cc_library( name = "Utils", srcs = ["Utils.cc"], hdrs = ["Utils.h"], - include_prefix = "ips", + include_prefix = "ipc_pubsub", includes = ["."], visibility = ["//visibility:public"], ) diff --git a/lib/IPCMessage.cc b/lib/IPCMessage.cc new file mode 100644 index 0000000..c7f972f --- /dev/null +++ b/lib/IPCMessage.cc @@ -0,0 +1,67 @@ +#include "ipc_pubsub/IPCMessage.h" + +#include +#include +#include +#include +#include + +namespace ipc_pubsub { + +IPCMessage::~IPCMessage() { + munmap(mBlobPtr, mBlobSize); + if (mFd != -1) close(mFd); +} + +std::shared_ptr IPCMessage::Create(size_t vecLen, const struct msghdr& msgh) { + int recvFd; + struct cmsghdr* cmsgp = CMSG_FIRSTHDR(&msgh); + if (cmsgp == nullptr || cmsgp->cmsg_len != CMSG_LEN(sizeof(recvFd))) { + SPDLOG_ERROR("bad cmsg header / message length"); + return nullptr; + } + if (cmsgp->cmsg_level != SOL_SOCKET) { + SPDLOG_ERROR("cmsg_level != SOL_SOCKET"); + return nullptr; + } + if (cmsgp->cmsg_type != SCM_RIGHTS) { + SPDLOG_ERROR("cmsg_type != SCM_RIGHTS"); + return nullptr; + } + + if (msgh.msg_iovlen == 0) { + SPDLOG_ERROR("No vector data!?"); + return nullptr; + } + if (msgh.msg_iovlen != 1) { + SPDLOG_ERROR("Expected exactly 1 pieces of vector data, others will be ignoed"); + return nullptr; + } + + // construct metadata from first vector payload + std::string_view meta(reinterpret_cast(msgh.msg_iov[0].iov_base), vecLen); + + /* The data area of the 'cmsghdr' is an 'int' (a file descriptor); + copy that integer to a local variable. (The received file descriptor + is typically a different file descriptor number than was used in the + sending process.) */ + memcpy(&recvFd, CMSG_DATA(cmsgp), sizeof(recvFd)); + + // seek to end so we know the length of the file to map + ssize_t blobLen = lseek(recvFd, 0, SEEK_END); + if (blobLen < 0) { + perror("Failed to see to end of received file descriptor"); + return nullptr; + } + + void* mappedData = mmap(nullptr, blobLen, PROT_READ, MAP_PRIVATE, recvFd, 0); + if (mappedData == nullptr) { + perror("Failed to map"); + return nullptr; + } + + return std::make_shared(meta, reinterpret_cast(mappedData), blobLen, + recvFd); +} + +} // namespace ipc_pubsub diff --git a/lib/IPCMessage.h b/lib/IPCMessage.h new file mode 100644 index 0000000..b697352 --- /dev/null +++ b/lib/IPCMessage.h @@ -0,0 +1,35 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +struct msghdr; + +namespace ipc_pubsub { +struct IPCMessage { + public: + static std::shared_ptr Create(size_t vecLen, const struct msghdr& msgh); + IPCMessage(std::string_view meta, uint8_t* ptr, size_t len, int fd) + : mMetaData(meta), mBlobPtr(ptr), mBlobSize(len), mFd(fd) {} + ~IPCMessage(); + + const std::string_view Contents() const { + return std::string_view(reinterpret_cast(mBlobPtr), mBlobSize); + } + const std::string_view MetaData() const { return std::string_view(mMetaData); } + + private: + // small amount of data informing the meaning of the larger blob + std::string mMetaData; + + // large chunk of memory mapped / shared memory data + uint8_t* mBlobPtr; + size_t mBlobSize; + + // File Descriptor of Shared Memory + int mFd = -1; +}; +} // namespace ipc_pubsub diff --git a/lib/IPCNode.cc b/lib/IPCNode.cc index 14051f1..178607d 100644 --- a/lib/IPCNode.cc +++ b/lib/IPCNode.cc @@ -1,4 +1,4 @@ -#include "ips/IPCNode.h" +#include "ipc_pubsub/IPCNode.h" #include #include diff --git a/lib/IPCNode.h b/lib/IPCNode.h index 5c3b368..3db1a33 100644 --- a/lib/IPCNode.h +++ b/lib/IPCNode.h @@ -4,7 +4,7 @@ #include #include -#include "ips/TopologyManager.h" +#include "ipc_pubsub/TopologyManager.h" namespace ips { struct IPCNeighbor; diff --git a/lib/Publisher.cc b/lib/Publisher.cc new file mode 100644 index 0000000..8e694fc --- /dev/null +++ b/lib/Publisher.cc @@ -0,0 +1,139 @@ +#include "ipc_pubsub/Publisher.h" + +#include // For O_* constants +#include +#include // for shm_open +#include +#include // For mode constants +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ipc_pubsub/Utils.h" + +namespace ipc_pubsub { +Publisher::Publisher() { + struct sockaddr_un addr; + const std::string socket_path = "\0socket"; + assert(socket_path.size() + 1 < sizeof(addr.sun_path)); + + if ((mFd = socket(AF_UNIX, SOCK_SEQPACKET, 0)) == -1) { + perror("socket error"); + exit(-1); + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + std::copy(socket_path.c_str(), socket_path.c_str() + socket_path.size(), addr.sun_path); + if (bind(mFd, reinterpret_cast(&addr), sizeof(addr)) == -1) { + perror("bind error"); + exit(-1); + } + + if (listen(mFd, 5) == -1) { + perror("listen error"); + exit(-1); + } + + mAcceptThread = std::thread([this]() { + while (1) { + int cl; + if ((cl = accept(mFd, nullptr, nullptr)) == -1) { + perror("accept error"); + } else { + std::cerr << "Accepted" << std::endl; + std::lock_guard lk(mMtx); + mClients.push_back(cl); + } + } + }); +} + +bool Publisher::Send(const std::string& meta, size_t len, const uint8_t* data) { + const std::string name = GenRandom(8); + + // open and write data + int shmFdWrite = shm_open(name.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (shmFdWrite == -1) { + perror("Failed to create shm"); + return false; + } + + // unlink when we leave this function, since it will have been shared by + // then or failed + OnRet onRet1([&name, shmFdWrite]() { + shm_unlink(name.c_str()); + close(shmFdWrite); + }); + + if (ssize_t written = write(shmFdWrite, data, len); written != ssize_t(len)) { + perror("Failed to write to shm"); + return false; + } + + // send read only version to other processes, close after we finish sending + int shmFdSend = shm_open(name.c_str(), O_RDONLY, S_IRUSR); + OnRet onRet2([shmFdSend]() { close(shmFdSend); }); + + // Construct header + struct msghdr msgh; + + std::vector metadata; + std::copy(meta.begin(), meta.end(), std::back_inserter(metadata)); + metadata.push_back(0); + + // Vector of data to send, NOTE: we must always send at least one byte + // This is the data that will be sent in the actual socket stream + struct iovec iov; + iov.iov_base = metadata.data(); + iov.iov_len = metadata.size(); + + // Don't need destination because we are using a connection + msgh.msg_name = nullptr; + msgh.msg_namelen = 0; + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; // sending one message + + // Allocate a char array of suitable size to hold the ancillary data. + // However, since this buffer is in reality a 'struct cmsghdr', use a + // union to ensure that it is aligned as required for that structure. + union { + char buf[CMSG_SPACE(sizeof(shmFdSend))]; + /* Space large enough to hold an 'int' */ + struct cmsghdr align; + } controlMsg; + msgh.msg_control = controlMsg.buf; + msgh.msg_controllen = sizeof(controlMsg.buf); + + // The control message buffer must be zero-initialized in order + // for the CMSG_NXTHDR() macro to work correctly. + memset(controlMsg.buf, 0, sizeof(controlMsg.buf)); + + struct cmsghdr* cmsgp = CMSG_FIRSTHDR(&msgh); + cmsgp->cmsg_len = CMSG_LEN(sizeof(shmFdSend)); + cmsgp->cmsg_level = SOL_SOCKET; + cmsgp->cmsg_type = SCM_RIGHTS; + memcpy(CMSG_DATA(cmsgp), &shmFdSend, sizeof(shmFdSend)); + + std::cerr << reinterpret_cast(msgh.msg_iov->iov_base) << std::endl; + std::cerr << strlen(reinterpret_cast(msgh.msg_iov->iov_base)) << std::endl; + std::cerr << msgh.msg_iov[0].iov_len << std::endl; + // Send + for (const int client : mClients) { + ssize_t ns = sendmsg(client, &msgh, 0); + if (ns == -1) { + std::cerr << "Failed to send to " << client << std::endl; + } + } + return true; +} +} // namespace ipc_pubsub diff --git a/lib/Publisher.h b/lib/Publisher.h new file mode 100644 index 0000000..4d71c1d --- /dev/null +++ b/lib/Publisher.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include +#include + +namespace ipc_pubsub { +class Publisher { + public: + Publisher(); + bool Send(const std::string& meta, size_t len, const uint8_t* data); + + int mFd; + std::mutex mMtx; + std::vector mClients; + std::thread mAcceptThread; +}; +} // namespace ipc_pubsub diff --git a/lib/Subscriber.cc b/lib/Subscriber.cc new file mode 100644 index 0000000..6792ff6 --- /dev/null +++ b/lib/Subscriber.cc @@ -0,0 +1,97 @@ +#include "ipc_pubsub/Subscriber.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ipc_pubsub { + +Subscriber::~Subscriber() { + // trigger shutdown + size_t val = UINT32_MAX; + write(mShutdownFd, &val, sizeof(size_t)); + + mReadThread.join(); +} + +void Subscriber::WaitForShutdown() { + size_t data; + read(mShutdownFd, &data, sizeof(data)); +} + +Subscriber::Subscriber(std::function)> callback) + : mCallback(callback) { + mShutdownFd = eventfd(0, EFD_SEMAPHORE); + + struct sockaddr_un addr; + const std::string socket_path = "\0socket"; + assert(socket_path.size() + 1 < sizeof(addr.sun_path)); + + if ((mFd = socket(AF_UNIX, SOCK_SEQPACKET, 0)) == -1) { + perror("socket error"); + exit(-1); + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + std::copy(socket_path.c_str(), socket_path.c_str() + socket_path.size(), addr.sun_path); + if (connect(mFd, reinterpret_cast(&addr), sizeof(addr)) == -1) { + perror("connect error"); + exit(-1); + } + + mReadThread = std::thread([this]() { + struct msghdr msgh; + + msgh.msg_name = nullptr; // doesn't matter because we are connected + msgh.msg_namelen = 0; + + constexpr size_t BUFFLEN = 1 << 12; + char buffer[BUFFLEN]; + struct iovec iov; + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; + iov.iov_base = buffer; + iov.iov_len = BUFFLEN; + + int recvFd = -1; + union { + char buf[CMSG_SPACE(sizeof(recvFd))]; + /* Space large enough to hold an 'int' */ + struct cmsghdr align; + } controlMsg; + msgh.msg_control = controlMsg.buf; + msgh.msg_controllen = sizeof(controlMsg.buf); + + while (ssize_t ns = recvmsg(mFd, &msgh, 0)) { + if (ns < 0) { + perror("Bad recv"); + return; + } + + std::cerr << ns << std::endl; + auto msg = IPCMessage::Create(ns, msgh); + if (msg != nullptr) { + mCallback(msg); + } + } + + // if we exit due to error, make sure to notify any blocking processes + size_t val = UINT32_MAX; + write(mShutdownFd, &val, sizeof(size_t)); + }); +} + +} // namespace ipc_pubsub diff --git a/lib/Subscriber.h b/lib/Subscriber.h new file mode 100644 index 0000000..8481b53 --- /dev/null +++ b/lib/Subscriber.h @@ -0,0 +1,48 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +struct msghdr; + +namespace ipc_pubsub { +struct IPCMessage { + public: + static std::shared_ptr Create(size_t vecLen, const struct msghdr& msgh); + IPCMessage(std::string_view meta, uint8_t* ptr, size_t len, int fd) + : metaData(meta), blobPtr(ptr), blobSize(len), mFd(fd) {} + ~IPCMessage(); + + const std::string_view Contents() const { + return std::string_view(reinterpret_cast(blobPtr), blobSize); + } + const std::string_view MetaData() const { return std::string_view(metaData); } + + private: + // small amount of data informing the meaning of the larger blob + std::string metaData; + + // large chunk of memory mapped / shared memory data + uint8_t* blobPtr; + size_t blobSize; + + // File Descriptor of Shared Memory + int mFd = -1; +}; + +class Subscriber { + public: + Subscriber(std::function)> callback); + ~Subscriber(); + void WaitForShutdown(); + + int mFd; + std::thread mReadThread; + std::function)> mCallback; + + int mShutdownFd = -1; +}; +} // namespace ipc_pubsub diff --git a/lib/TopologyManager.cc b/lib/TopologyManager.cc index ab8355e..02cc0c0 100644 --- a/lib/TopologyManager.cc +++ b/lib/TopologyManager.cc @@ -1,4 +1,4 @@ -#include "ips/TopologyManager.h" +#include "ipc_pubsub/TopologyManager.h" #include #include @@ -16,8 +16,8 @@ #include #include -#include "ips/TopologyServer.h" -#include "ips/UDSClient.h" +#include "ipc_pubsub/TopologyServer.h" +#include "ipc_pubsub/UDSClient.h" #include "protos/index.pb.h" namespace ips { diff --git a/lib/TopologyServer.cc b/lib/TopologyServer.cc index f4652f0..1c6000a 100644 --- a/lib/TopologyServer.cc +++ b/lib/TopologyServer.cc @@ -1,4 +1,4 @@ -#include "ips/TopologyServer.h" +#include "ipc_pubsub/TopologyServer.h" #include #include @@ -17,7 +17,7 @@ #include #include -#include "ips/UDSServer.h" +#include "ipc_pubsub/UDSServer.h" #include "protos/index.pb.h" namespace ips { diff --git a/lib/UDSClient.cc b/lib/UDSClient.cc index fb15fd3..31f43e1 100644 --- a/lib/UDSClient.cc +++ b/lib/UDSClient.cc @@ -1,4 +1,4 @@ -#include "ips/UDSClient.h" +#include "ipc_pubsub/UDSClient.h" #include #include @@ -13,7 +13,7 @@ #include #include -#include "ips/Utils.h" +#include "ipc_pubsub/Utils.h" namespace ips { std::shared_ptr UDSClient::Create(std::string_view sockPath, OnDataCallback onData, std::function onDisconnect) { diff --git a/lib/UDSServer.cc b/lib/UDSServer.cc index ba8d275..6ec801b 100644 --- a/lib/UDSServer.cc +++ b/lib/UDSServer.cc @@ -1,4 +1,4 @@ -#include "ips/UDSServer.h" +#include "ipc_pubsub/UDSServer.h" #include #include @@ -11,7 +11,7 @@ #include #include -#include "ips/Utils.h" +#include "ipc_pubsub/Utils.h" namespace ips { std::shared_ptr UDSServer::Create(std::string_view sockPath, ConnHandler onConnect, ConnHandler onDisconnect, DataHandler onData) { diff --git a/lib/Utils.cc b/lib/Utils.cc index ccb0f45..47b7429 100644 --- a/lib/Utils.cc +++ b/lib/Utils.cc @@ -1,5 +1,5 @@ -#include "ips/Utils.h" +#include "ipc_pubsub/Utils.h" #include #include diff --git a/lib/ipc_node_test.cc b/lib/ipc_node_test.cc index 804b468..1df3f7d 100644 --- a/lib/ipc_node_test.cc +++ b/lib/ipc_node_test.cc @@ -1,7 +1,7 @@ #include #include -#include "ips/IPCNode.h" +#include "ipc_pubsub/IPCNode.h" int main() { auto node = IPCNode::Create("group"); std::this_thread::sleep_for(std::chrono::seconds(5)); diff --git a/lib/test_topology_manager.cc b/lib/test_topology_manager.cc index a23ccd8..350e154 100644 --- a/lib/test_topology_manager.cc +++ b/lib/test_topology_manager.cc @@ -1,8 +1,8 @@ #include #include -#include "ips/TopologyManager.h" -#include "ips/Utils.h" +#include "ipc_pubsub/TopologyManager.h" +#include "ipc_pubsub/Utils.h" using namespace ips;