Skip to content

Commit

Permalink
added uid to messages
Browse files Browse the repository at this point in the history
  • Loading branch information
micahcc committed Mar 1, 2021
1 parent 86f914b commit 68af5d5
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 102 deletions.
65 changes: 33 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,39 @@ Simplified IPC over POSIX shared memory, with a publisher/subscriber interface.

# Design Notes

## Thoughts
- Unix / Posix queues need tuning to have a reasonable amount of file descriptors available. I would prefer not to require that.
- UDP requires proper networking setup which doesn't seem desirable
- Semaphores don't allow any data through which means all actual payloads must
be sent through complex shared memory structures (even metadata about what
nodes are available)
- Pipes have somewhat complex state that can result in spurious blocking of there
is too much data in the pipes
## Topology
- Topology is hub-and-spoke with randomly chosen hub.
- The first Node to create the hub wins

## Design
- A single shared memory object is kept with an index and in-flight messages
### Hub
- Hub maintains a list of topology events that it sends to each new client.
- Hub is responsible for minting new Topology Events when a Client sends updates
about themselves
- New hubs may be missing history (for instance if a just-started client wins
the race to create a hub) or create history that conflicts with the spokes (
for instance if a new client with no history attachs and registers itself
befure the rest of the clients send the full history).
- Upon connection clients will send their full version of history which the hub
will integrate into a complete history. The hub is the only source of truth on
history so if it sends updates that contradict clients, the clients must accept
the new history and trigger events for them. This should be rare though
- The hub 'mints' new history events by giving them a sequence (seq) and unique
ID (uid) number. It then sends these events to all clients.

### Adding a Node
- write-lock the metadata file
- read the current metadata
- append node information to data
- write metadata
- unlock the metadata file
### Spoke / Client
- The purpose of the Topology Client is to call callbacks for topology update
events. In general duplicate callbacks for events should be avoided, however
during Hub transfer (the hub dies and is restarted) there may occasionaly be
a duplicate.
- New clients will have no history and be sent the complete history upon
connecting to the hub.
- Its possible through various fail conditions that a client could have conflicting
history with the hub. In this case all conflicts are removed and any events
sent by the Hub should trigger additional Topology Events
- During Hub transfer the Client will reconnect to the new hub and send 1. the
entire history 2. and information about itself

### Sending a Message
- (optional) retrieve and use a shm buffer ::Allocate() -> ShmBuffer();
- Send(ShmBuffer) or Send(data, len)
- write-lock the metadata file
- read the current metadata
- append to the appropriate in-flight queues
- unlock the metadata file
- sem_up() on all the readers

### Reading a Message
- While true, wait for sem_down()
- write-lock the metadata file
- update current metadata
- pop all the in-flight shm objects
- unlock the metadata file
- pass off shm objects to handler thread
## Point-To-Point Messages
- Messages consist of a shared memory segment, the associated file descriptor is
then sent over Unix Domain Socket to all subscribers, then the segment (and fd)
is closed at the sender
7 changes: 4 additions & 3 deletions lib/TopologyManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ void TopologyManager::IntroduceOurselves(std::shared_ptr<UDSClient> client) {
}

// craft messages about ourselves
// TODO(micah) this will result in duplicates in the history everywhere, purge unecessary
// messages on the server and notify the clients
// Its possible this will be a duplicate of something in our history, but
// its up to the server to handle that
TopologyMessage msg;
auto nodeMsg = msg.mutable_node_change();
nodeMsg->set_id(mNodeId);
Expand Down Expand Up @@ -242,7 +242,8 @@ void TopologyManager::MainLoop() {
// we're connected send our history so that the server can integrate it
IntroduceOurselves(client);

// clear backlog and update client
// clear backlog (messages sent while the client was disconnected)
// and update client
std::vector<TopologyMessage> backlog;
{
std::lock_guard<std::mutex> lk(mMtx);
Expand Down
232 changes: 168 additions & 64 deletions lib/TopologyServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ void TopologyServer::OnConnect(int fd) {
// new client, send complete state message including all
// nodes that we have a connection to
std::lock_guard<std::mutex> lk(mMtx);

SPDLOG_DEBUG("onConnect, sending digest");
auto& client = mClients[fd];
for (const auto& msg : mHistory) {
Expand All @@ -41,8 +42,37 @@ void TopologyServer::OnConnect(int fd) {
}
}

void TopologyServer::PruneRemoved() {
std::unordered_set<uint64_t> left;
for (const auto& msg : mHistory) {
if (msg.has_node_change() && msg.node_change().op() == NodeOperation::LEAVE)
left.emplace(msg.node_change().id());
}

// remove all messages for nodes that have been removed (nodes with LEAVE messages)
for (auto it = mHistory.begin(); it != mHistory.end();) {
if (it->has_node_change()) {
if (left.count(it->node_change().id()) > 0 &&
it->node_change().op() != NodeOperation::LEAVE) {
// node has left and this isn't the leave message, remove
it = mHistory.erase(it);
} else {
++it;
}
} else if (it->has_topic_change()) {
if (left.count(it->topic_change().node_id() > 0)) {
// topic change for a node that doesn't exist, remove it
it = mHistory.erase(it);
} else {
++it;
}
} else {
++it;
}
}
}

void TopologyServer::Broadcast() {
std::lock_guard<std::mutex> lk(mMtx);
for (auto& pair : mClients) {
for (const auto& msg : mHistory) {
if (msg.seq() > pair.second.seq) {
Expand All @@ -55,99 +85,173 @@ void TopologyServer::Broadcast() {
}

void TopologyServer::OnDisconnect(int fd) {
{
std::lock_guard<std::mutex> lk(mMtx);
auto it = mClients.find(fd);
assert(it != mClients.end()); // how can a fd disconnect that wasn't connected?
auto& msg = mHistory.emplace_back();
msg.set_seq(mNextSeq++);
auto nodeChangePtr = msg.mutable_node_change();
nodeChangePtr->set_id(it->second.nodeId);
nodeChangePtr->set_op(NodeOperation::LEAVE);

// notifiy all clients that a client has been removed
mClients.erase(it);
}
std::lock_guard<std::mutex> lk(mMtx);
auto it = mClients.find(fd);
assert(it != mClients.end()); // how can a fd disconnect that wasn't connected?
auto& msg = mHistory.emplace_back();
msg.set_seq(mNextSeq++);
auto nodeChangePtr = msg.mutable_node_change();
nodeChangePtr->set_id(it->second.nodeId);
nodeChangePtr->set_op(NodeOperation::LEAVE);

// notifiy all clients that a client has been removed
mClients.erase(it);

PruneRemoved();
Broadcast();
}

// adapted from String.hashCode()

bool operator==(const NodeChange& lhs, const NodeChange& rhs) {
return lhs.id() == rhs.id() && lhs.op() == rhs.op() && lhs.name() == rhs.name() &&
lhs.address() == rhs.address();
}

bool operator==(const TopicChange& lhs, const TopicChange& rhs) {
return lhs.name() == rhs.name() && lhs.mime() == rhs.mime() && lhs.node_id() == rhs.node_id() &&
lhs.op() == rhs.op() && lhs.socket() == rhs.socket();
}

bool operator==(const TopologyMessage& lhs, const TopologyMessage& rhs) {
return lhs.seq() == rhs.seq() && lhs.uid() == rhs.uid() &&
lhs.has_node_change() == rhs.has_node_change() &&
lhs.has_topic_change() == rhs.has_topic_change() &&
((lhs.has_node_change() && lhs.node_change() == rhs.node_change()) ||
(lhs.has_topic_change() && lhs.topic_change() == rhs.topic_change()));
}

bool TopologyServer::IsRedundant(const TopologyMessage& msg, const Client& client) {
for (const auto& hMsg : client.history) {
if (hMsg.has_node_change() && msg.has_node_change() &&
hMsg.node_change() == msg.node_change()) {
return true;
} else if (hMsg.has_topic_change() && msg.has_topic_change() &&
hMsg.topic_change() == msg.topic_change()) {
return true;
}
}
return false;
}

void TopologyServer::AddNovelMessage(const TopologyMessage& msg, Client* client) {
static std::random_device randomDevice;
static std::mt19937_64 randomGen(randomDevice());
// mint new seq and add to history if the message is fresh (no sequence)
if (IsRedundant(msg, *client)) {
SPDLOG_INFO("Redundant message ignored: {}", msg.ShortDebugString());
} else {
TopologyMessage tmp = msg;
tmp.set_seq(mNextSeq++);
tmp.set_uid(randomGen());
mHistory.push_back(tmp);
client->history.push_back(tmp);
}
}

void TopologyServer::AddOldMessage(const TopologyMessage& msg, Client* client) {
// see if we have this message, if not then insert into history
auto hit = std::lower_bound(mHistory.begin(), mHistory.end(), msg,
[](const TopologyMessage& lhs, const TopologyMessage& rhs) {
return lhs.seq() < rhs.seq();
});
if (hit == mHistory.end()) {
// we don't have this yet and it is the newest, accept
assert(msg.uid() != 0);
mHistory.insert(hit, msg);
client->history.push_back(msg);
mNextSeq = msg.seq() + 1;
} else if (hit->seq() == msg.seq() && hit->uid() == msg.uid()) {
// we have a match, just ignore
assert(*hit == msg);
} else {
// either the sequence isn't recent or has a different UID.
// The reason not to fill non-recent messages is that presumably
// the hole is there fore some reason -- for instance because the
// node is shutdown and the nodes history has been purged.
SPDLOG_ERROR("Received conflicting historic message, rejecting\n{}",
msg.ShortDebugString());
}
}

void TopologyServer::OnData(int fd, int64_t len, uint8_t* data) {
if (len == 0) return;

// notify all clients that a new client has been updated
TopologyMessage msg;
msg.ParseFromArray(data, int(len));

// get an ID out of the message (should only have one, clients should only
// send information about themselves)
uint64_t id = UINT64_MAX;
uint64_t nodeId = 0;
if (msg.has_node_change())
id = msg.node_change().id();
nodeId = msg.node_change().id();
else if (msg.has_topic_change())
id = msg.topic_change().node_id();
assert(id != UINT64_MAX);
nodeId = msg.topic_change().node_id();
if (nodeId == 0) {
SPDLOG_ERROR("Ignoring msg with no node ID: {}", msg.ShortDebugString());
return;
}

// Update fd -> nodeId map
{
std::lock_guard<std::mutex> lk(mMtx);
std::lock_guard<std::mutex> lk(mMtx);

auto cit = mClients.find(fd);
assert(cit != mClients.end()); // should have created when the client connected
cit->second.nodeId = id;
// Update fd -> nodeId map
auto cit = mClients.find(fd);
assert(cit != mClients.end()); // should have created when the client connected
if (cit->second.nodeId != nodeId) {
SPDLOG_ERROR("Node changed ID?");
}
cit->second.nodeId = nodeId;

// mint new seq and add to history if the message is fresh (no sequence)
if (msg.seq() == 0) {
msg.set_seq(mNextSeq++);
mHistory.push_back(msg);
} else {
// see if we have this message, if not then insert into history
auto hit = std::lower_bound(mHistory.begin(), mHistory.end(), msg,
[](const TopologyMessage& lhs, const TopologyMessage& rhs) {
return lhs.seq() < rhs.seq();
});
if (hit == mHistory.end() || hit->seq() != msg.seq()) {
// new message, insert
mHistory.insert(hit, msg);
}
}
// Update Message History
if (msg.seq() == 0) {
AddNovelMessage(msg, &cit->second);
} else {
AddOldMessage(msg, &cit->second);
}

// Remove Unused
PruneRemoved();

// ensure all clients are up-to-date with changes
Broadcast();
}

void TopologyServer::PurgeDisconnected() {
{
// one time, at the beginning of the server any disconnected nodes have left messages sent
std::lock_guard<std::mutex> lk(mMtx);
std::unordered_map<uint64_t, int> connected;
for (const auto& pair : mClients) {
connected.emplace(pair.second.nodeId, pair.first);
}
// one time, at the beginning of the server nodes that
// aren't connected will be remove (we'll send LEAVE messages)
std::lock_guard<std::mutex> lk(mMtx);
std::unordered_map<uint64_t, int> connected;
for (const auto& pair : mClients) {
connected.emplace(pair.second.nodeId, pair.first);
}

std::unordered_set<uint64_t> historicallyActive;
for (const auto& msg : mHistory) {
if (!msg.has_node_change()) continue;
std::unordered_set<uint64_t> historicallyActive;
for (const auto& msg : mHistory) {
if (!msg.has_node_change()) continue;

if (msg.node_change().op() == ips::JOIN) {
historicallyActive.emplace(msg.node_change().id());
} else if (msg.node_change().op() == ips::LEAVE) {
historicallyActive.erase(msg.node_change().id());
}
if (msg.node_change().op() == ips::JOIN) {
historicallyActive.emplace(msg.node_change().id());
} else if (msg.node_change().op() == ips::LEAVE) {
historicallyActive.erase(msg.node_change().id());
}
}

// construct LEAVE messages for each historically active node that isn't connected
for (uint64_t id : historicallyActive) {
if (connected.count(id) == 0) {
auto& msg = mHistory.emplace_back();
msg.set_seq(mNextSeq++);
auto nodeChangePtr = msg.mutable_node_change();
nodeChangePtr->set_id(id);
nodeChangePtr->set_op(NodeOperation::LEAVE);
}
// construct LEAVE messages for each historically active node that isn't
// connected
for (uint64_t id : historicallyActive) {
if (connected.count(id) == 0) {
auto& msg = mHistory.emplace_back();
msg.set_seq(mNextSeq++);
auto nodeChangePtr = msg.mutable_node_change();
nodeChangePtr->set_id(id);
nodeChangePtr->set_op(NodeOperation::LEAVE);
}
}

// Remove Unused
PruneRemoved();

// Update Clients
Broadcast();
}

Expand Down
Loading

0 comments on commit 68af5d5

Please sign in to comment.