Skip to content

Commit

Permalink
broken, but not too broken
Browse files Browse the repository at this point in the history
  • Loading branch information
micahcc committed Jan 28, 2021
1 parent fa7faab commit d5b4f0c
Show file tree
Hide file tree
Showing 23 changed files with 1,120 additions and 331 deletions.
9 changes: 7 additions & 2 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ build --incompatible_strict_action_env
build --cxxopt=-std=c++20
build --cxxopt=-Wall
build --cxxopt=-Weverything
build --cxxopt=-Wno-c++98-compat
build --cxxopt=-Wno-zero-length-array
#build --cxxopt=-Wno-c++98-compat
#build --cxxopt=-Wno-zero-length-array
build --cxxopt=-Wno-padded
build --cxxopt=-Wno-exit-time-destructors
#build --cxxopt=-Wno-suggest-destructor-override
build --cxxopt=-Wno-global-constructors
#build --cxxopt=-Wno-zero-as-null-pointer-constant
#build --cxxopt=-Wno-double-promotion

# so that protobuf doesn't trigger warnings
build --cxxopt=-Wno-c++98-compat-pedantic
Expand Down
10 changes: 10 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,13 @@ http_archive(
strip_prefix = "fmt-%s" % FMTLIB_VERSION,
urls = ["https://github.com/fmtlib/fmt/releases/download/%s/fmt-%s.zip" % (FMTLIB_VERSION, FMTLIB_VERSION)],
)

http_archive(
name = "gtest",
build_file = "@com_github_micahcc_ipc_pubsub//bazel/external:gtest.BUILD",
sha256 = "9dc9157a9a1551ec7a7e43daea9a694a0bb5fb8bec81235d8a1e6ef64c716dcb",
strip_prefix = "googletest-release-1.10.0",
urls = [
"https://github.com/google/googletest/archive/release-1.10.0.tar.gz",
],
)
59 changes: 59 additions & 0 deletions bazel/external/gtest.BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package(default_visibility = ["//visibility:public"])

# Library that defines the FRIEND_TEST macro.
cc_library(
name = "gtest_prod",
hdrs = ["googletest/include/gtest/gtest_prod.h"],
includes = ["googletest/include"],
)

# Google Test including Google Mock
cc_library(
name = "gtest",
srcs = glob(
include = [
"googletest/src/*.cc",
"googletest/src/*.h",
"googletest/include/gtest/**/*.h",
"googlemock/src/*.cc",
"googlemock/include/gmock/**/*.h",
],
exclude = [
"googletest/src/gtest-all.cc",
"googletest/src/gtest_main.cc",
"googlemock/src/gmock-all.cc",
"googlemock/src/gmock_main.cc",
],
),
hdrs = glob([
"googletest/include/gtest/*.h",
"googlemock/include/gmock/*.h",
]),
copts = [
"-pthread",
"-Wno-undef",
"-Wno-unused-member-function",
"-Wno-zero-as-null-pointer-constant",
"-Wno-used-but-marked-unused",
"-Wno-missing-noreturn",
"-Wno-covered-switch-default",
"-Wno-disabled-macro-expansion",
"-Wno-weak-vtables",
"-Wno-switch-enum",
"-Wno-missing-prototypes",
"-Wno-deprecated-copy-dtor",
],
includes = [
"googlemock",
"googlemock/include",
"googletest",
"googletest/include",
],
linkopts = ["-pthread"],
)

cc_library(
name = "gtest_main",
srcs = ["googlemock/src/gmock_main.cc"],
deps = [":gtest"],
)
46 changes: 15 additions & 31 deletions protos/index.proto
Original file line number Diff line number Diff line change
@@ -1,40 +1,17 @@
syntax = "proto3";

package ipc_pubsub;
message InFlight {
string topic = 1;

// shared memory containing the data
string payload_name = 2;
};

message Node {
string name = 1;
uint64 id = 2;
string notify = 3; // name of OS semaphore to up when sending a message
int32 pid = 4; // used to clean up dead nodes

// Messages that need to be processed
repeated InFlight in_flight = 10;
};

message Topic {
string name = 1;
// Storage for actual IPC messages, may contain inline data
message MetadataMessage {
string topic = 1;

// if the topic contains protobuf then this will be the type infor otherwise
// a mimetype
string mime = 2;
// if the message is small then it will be inlined here
bytes inlined = 2;

// node ids of topic users
repeated uint64 publishers = 3;
repeated uint64 subscribers = 4;
// TODO(micah) add send timestamp
};

message Index {
repeated Node nodes = 1;
repeated Topic topics = 2;
}

enum NodeOperation {
NODE_UNSET = 0;
JOIN = 1;
Expand Down Expand Up @@ -65,6 +42,13 @@ message TopicChange {
};

message TopologyMessage {
repeated NodeChange node_changes = 1;
repeated TopicChange topic_changes = 2;
// sequences are minted by the server, when sending from client to server seq
// should be 0, in a message containing a digest, the seq will be the maximum
// value in the digest
uint64 seq = 1;

oneof Op {
NodeChange node_change = 3;
TopicChange topic_change = 4;
};
};
30 changes: 20 additions & 10 deletions src/BUILD
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library")

cc_test(
name = "test_topology_manager",
srcs = ["test_topology_manager.cc"],
deps = [
":TopologyManager",
":Utils",
"@gtest",
],
)

cc_binary(
name = "topology_node",
srcs = ["topology_node.cc"],
deps = [
":TopologyManager",
":Utils",
"@com_github_gabime_spdlog//:spdlog",
],
)
Expand All @@ -19,22 +30,21 @@ cc_binary(
srcs = ["unix_dgram_writer.cc"],
)

cc_library(
name = "TopologyStore",
srcs = ["TopologyStore.cc"],
hdrs = ["TopologyStore.h"],
deps = [
"//protos:index_cc_proto",
"@com_github_gabime_spdlog//:spdlog",
],
)
#cc_library(
# name = "TopologyStore",
# srcs = ["TopologyStore.cc"],
# hdrs = ["TopologyStore.h"],
# deps = [
# "//protos:index_cc_proto",
# "@com_github_gabime_spdlog//:spdlog",
# ],
#)

cc_library(
name = "TopologyServer",
srcs = ["TopologyServer.cc"],
hdrs = ["TopologyServer.h"],
deps = [
":TopologyStore",
":UDSServer",
":Utils",
"//protos:index_cc_proto",
Expand Down
183 changes: 183 additions & 0 deletions src/IPCNode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#include "IPCNode.h"

#include <google/protobuf/message_lite.h>
#include <poll.h>

void IPCNode::Publish(std::string_view topic, int64_t len, uint8_t* data) {
thread_local std::vector<int> fds;
{
std::scoped_lock lk(mMtx);
fds = mFdsByTopic[topic];
}
for (const int fd : fds) write(fd, data, len);
}

void IPCNode::Publish(std::string_view topic, const MessageLite& msg) {
thread_local std::vector<int> fds;
{
std::scoped_lock lk(mMtx);
fds = mFdsByTopic[topic];
}
for (const int fd : fds) msg.SerializeToFileDescriptor(fd);
}

void IPCNode::Unsubscribe(std::string_view topic) {
// publish that we want the messages
mTopologyManager->Unsubscribe(topic);

// Remove callbacks
auto& topicObject = mTopics[topic];
topicObject.rawCb = nullptr;
topicObject.protoCb = nullptr;
}

void IPCNode::Subscribe(std::string_view topic, RawCallback cb) {
// publish that we want the messages
mTopologyManager->Subscribe(topic);

// add callback
auto& topicObject = mTopics[topic];
topicObject.rawCb = cb;
}

void IPCNode::Subscribe(std::string_view topic, ProtoCallback cb) {
// publish that we want the messages
mTopologyManager->Subscribe(topic);

// add callback
auto& topicObject = mTopics[topic];
topicObject.protoCb = cb;
}

void IPCNode::Announce(std::string_view topic, std::string_view mime) {
mTopologyManager->Announce(topic, mime);
}

void IPCNode::Retract(std::string_view topic) { mTopologyManager->Retract(topic, mime); }

void IPCNode::OnJoin() {}
void IPCNode::OnLeave() {}
void IPCNode::OnAnnounce() {}
void IPCNode::OnRetract() {}
void IPCNode::OnSubscribe() {}
void IPCNode::OnUnsubscribe() {}

void IPCNode::Create(std::string_view groupName, std::string_view nodeName) {
std::random_device rd;
std::mt19937_64 e2(rd());
nodeId = e2();

// add ourselves to the list of nodes
std::ostringstream oss;
oss << '\0' << std::hex << std::setw(16) << std::setfill('0') << nodeId;
dataPath = oss.str();

// create socket to read from
int sock;
struct sockaddr_un name;

/* Create socket from which to read. */
sock = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock < 0) {
perror("opening datagram socket");
return nullptr;
}

/* Create name. */
name.sun_family = AF_UNIX;
std::copy(dataPath.begin(), dataPath.end(), name.sun_path);
name.sun_path[dataPath.size()] = 0;

/* Bind the UNIX domain address to the created socket */
if (bind(sock, reinterpret_cast<struct sockaddr*>(&name), sizeof(struct sockaddr_un))) {
perror("binding name to datagram socket");
exit(1);
}
}

void IPCNode::IPCNode(std::string_view groupName, std::string_view nodeName, uint64_t nodeId,
std::string_view dataPath)
: mGroupName(groupName), mNodeName(nodeName), mNodeId(nodeId), mDataPath(dataPath) {
NodeChangeHandler onJoin = nullptr, auto onJoin = [this](const NodeChange&msg) { OnJoin(msg); };
auto onLeave = [this](const NodeChange& msg) { OnLeave(msg); };
auto onAnnounce = [this](const TopicChange& msg) { OnAnnounce(msg); };
auto onRetract = [this](const TopicChange& msg) { OnRetract(msg); };
auto onSubscribe = [this](const TopicChange& msg) { OnSubscribe(msg); };
auto onUnsubscribe = [this](const TopicChange& msg) { OnUnsubscribe(msg); };

auto topologyManager = std::make_shared<TopologyManager>(nodeId, groupName, dataPath, nodeName,
callbacks, onJoin, onLeave, onAnnounce,
onRetract, onSubscribe, onUnsubscribe);

mMainThread = std::thread([this]() { MainLoop(); });
}

void IPCNode::OnData(int64_t len, uint8_t* data) {
static thread_local MetadataMessage msg;
if (!msg.ParseFromArray(len, data)) {
SPDLOG_ERROR("Failed to parse data of size {}", len);
return;
}

// TODO(micah) get data out of shared memory
if (!msg.inlined.empty()) {
rawCb(msg.size(), msg.data());
}

RawCallback rawCb;
ProtoCallback protoCb;
{
std::scoped_lock lk(mMtx);
auto it = mTopics->find(msg.topic);
if (it == mTopics.end()) return;
}
}

int IPCNode::MainLoop() {
// Read from data loop
struct pollfd fds[2];
fds[0].fd = mShutdownFd;
fds[0].events = POLLIN;
fds[0].revents = 0;

fds[1].fd = mInputFd;
fds[1].events = POLLIN;
fds[1].revents = 0;
// now that we are connected field events from leader OR shutdown event
// wait for it to close or shutdown event
while (1) {
int ret = poll(fds, 2, -1);
if (ret < 0) {
SPDLOG_ERROR("Failed to Poll: {}", strerror(errno));
return -1;
}

if (fds[0].revents != 0) {
SPDLOG_INFO("Polled shutdown");
// shutdown event received, exit
return 0;
}

if (fds[1].revents != 0) {
if (fds[1].revents & POLLERR) {
SPDLOG_ERROR("poll error");
return -1;
} else if (fds[1].revents & POLLNVAL) {
SPDLOG_INFO("File descriptor not open");
return -1;
} else if (fds[1].revents & POLLIN) {
// socket has data, read it
uint8_t buffer[UINT16_MAX];
SPDLOG_INFO("onData");
int64_t nBytes = read(mFd, buffer, UINT16_MAX);
if (nBytes < 0) {
SPDLOG_ERROR("Error reading: {}", strerror(errno));
} else {
OnData(nBytes, buffer);
}
}
}
}
}

void IPCNode::~IPCNode() { close(sock); }
Loading

0 comments on commit d5b4f0c

Please sign in to comment.