Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport(ptp): move from protobuf to fixed-size c-struct #388

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 6 additions & 15 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/transport/PointToPointClient.h>
#include <faabric/transport/PointToPointMessage.h>
#include <faabric/util/config.h>
#include <faabric/util/locks.h>

Expand Down Expand Up @@ -120,27 +121,16 @@ class PointToPointBroker

void updateHostForIdx(int groupId, int groupIdx, std::string newHost);

void sendMessage(int groupId,
int sendIdx,
int recvIdx,
const uint8_t* buffer,
size_t bufferSize,
void sendMessage(const PointToPointMessage& msg,
std::string hostHint,
bool mustOrderMsg = false);

void sendMessage(int groupId,
int sendIdx,
int recvIdx,
const uint8_t* buffer,
size_t bufferSize,
void sendMessage(const PointToPointMessage& msg,
bool mustOrderMsg = false,
int sequenceNum = NO_SEQUENCE_NUM,
std::string hostHint = "");

std::vector<uint8_t> recvMessage(int groupId,
int sendIdx,
int recvIdx,
bool mustOrderMsg = false);
void recvMessage(PointToPointMessage& msg, bool mustOrderMsg = false);

void clearGroup(int groupId);

Expand All @@ -163,7 +153,8 @@ class PointToPointBroker

std::shared_ptr<faabric::util::FlagWaiter> getGroupFlag(int groupId);

Message doRecvMessage(int groupId, int sendIdx, int recvIdx);
// Returns the message response code and the sequence number
std::pair<MessageResponseCode, int> doRecvMessage(PointToPointMessage& msg);

void initSequenceCounters(int groupId);

Expand Down
7 changes: 4 additions & 3 deletions include/faabric/transport/PointToPointClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/transport/PointToPointCall.h>
#include <faabric/transport/PointToPointMessage.h>

namespace faabric::transport {

std::vector<std::pair<std::string, faabric::PointToPointMappings>>
getSentMappings();

std::vector<std::pair<std::string, faabric::PointToPointMessage>>
std::vector<std::pair<std::string, PointToPointMessage>>
getSentPointToPointMessages();

std::vector<std::tuple<std::string,
faabric::transport::PointToPointCall,
faabric::PointToPointMessage>>
PointToPointMessage>>
getSentLockMessages();

void clearSentMessages();
Expand All @@ -26,7 +27,7 @@ class PointToPointClient : public faabric::transport::MessageEndpointClient

void sendMappings(faabric::PointToPointMappings& mappings);

void sendMessage(faabric::PointToPointMessage& msg,
void sendMessage(const PointToPointMessage& msg,
int sequenceNum = NO_SEQUENCE_NUM);

void groupLock(int appId,
Expand Down
45 changes: 45 additions & 0 deletions include/faabric/transport/PointToPointMessage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <cstdint>
#include <span>

namespace faabric::transport {

/* Simple fixed-size C-struct to capture the state of a PTP message moving
* through Faabric.
*
* We require fixed-size, and no unique pointers to be able to use
* high-throughput ring-buffers to send the messages around. This also means
* that we manually malloc/free the data pointer. The message size is:
* 4 * int32_t = 4 * 4 bytes = 16 bytes
* 1 * size_t = 1 * 8 bytes = 8 bytes
* 1 * void* = 1 * 8 bytes = 8 bytes
* total = 32 bytes = 4 * 8 so the struct is naturally 8 byte-aligned
*/
struct PointToPointMessage
{
int32_t appId;
int32_t groupId;
int32_t sendIdx;
int32_t recvIdx;
size_t dataSize;
void* dataPtr;
};
static_assert((sizeof(PointToPointMessage) % 8) == 0,
"PTP message mus be 8-aligned!");

// The wire format for a PTP message is very simple: the fixed-size struct,
// followed by dataSize bytes containing the payload.
void serializePtpMsg(std::span<uint8_t> buffer, const PointToPointMessage& msg);

// This parsing function mallocs space for the message payload. This is to
// keep the PTP message at fixed-size, and be able to efficiently move it
// around in-memory queues
void parsePtpMsg(std::span<const uint8_t> bytes, PointToPointMessage* msg);

// Alternative signature for parsing PTP messages for when the caller can
// provide an already-allocated buffer to write into
void parsePtpMsg(std::span<const uint8_t> bytes,
PointToPointMessage* msg,
std::span<uint8_t> preAllocBuffer);
}
39 changes: 26 additions & 13 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <faabric/mpi/MpiWorld.h>
#include <faabric/mpi/mpi.pb.h>
#include <faabric/planner/PlannerClient.h>
#include <faabric/transport/PointToPointMessage.h>
#include <faabric/transport/macros.h>
#include <faabric/util/ExecGraph.h>
#include <faabric/util/batch.h>
Expand Down Expand Up @@ -60,14 +61,16 @@
throw std::runtime_error("Error serialising message");
}
try {
broker.sendMessage(
thisRankMsg->groupid(),
sendRank,
recvRank,
reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
serialisedBuffer.size(),
dstHost,
true);
// It is safe to send a pointer to a stack-allocated object
// because the broker will make an additional copy (and so will NNG!)
faabric::transport::PointToPointMessage msg(
{ .groupId = thisRankMsg->groupid(),
.sendIdx = sendRank,
.recvIdx = recvRank,
.dataSize = serialisedBuffer.size(),
.dataPtr = (void*)serialisedBuffer.data() });

Check warning on line 71 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L66-L71

Added lines #L66 - L71 were not covered by tests

broker.sendMessage(msg, dstHost, true);

Check warning on line 73 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L73

Added line #L73 was not covered by tests
} catch (std::runtime_error& e) {
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - send {} -> {}",
thisRankMsg->appid(),
Expand All @@ -82,10 +85,12 @@
std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
int recvRank)
{
std::vector<uint8_t> msg;
faabric::transport::PointToPointMessage msg(
{ .groupId = thisRankMsg->groupid(),
.sendIdx = sendRank,
.recvIdx = recvRank });

Check warning on line 91 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L88-L91

Added lines #L88 - L91 were not covered by tests
try {
msg =
broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true);
broker.recvMessage(msg, true);

Check warning on line 93 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L93

Added line #L93 was not covered by tests
} catch (std::runtime_error& e) {
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - recv (remote) {} -> {}",
thisRankMsg->appid(),
Expand All @@ -95,7 +100,12 @@
recvRank);
throw e;
}
PARSE_MSG(MPIMessage, msg.data(), msg.size());

// Parsing into the protobuf makes a copy of the message, so we can
// free the heap pointer after
PARSE_MSG(MPIMessage, msg.dataPtr, msg.dataSize);
faabric::util::free(msg.dataPtr);

Check warning on line 107 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L106-L107

Added lines #L106 - L107 were not covered by tests

return std::make_shared<MPIMessage>(parsedMsg);
}

Expand Down Expand Up @@ -599,7 +609,10 @@
// Assert message integrity
// Note - this checks won't happen in Release builds
if (m->messagetype() != messageType) {
SPDLOG_ERROR("Different message types (got: {}, expected: {})",
SPDLOG_ERROR("{}:{}:{} Different message types (got: {}, expected: {})",
m->worldid(),
m->sender(),
m->destination(),

Check warning on line 615 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L612-L615

Added lines #L612 - L615 were not covered by tests
m->messagetype(),
messageType);
}
Expand Down
8 changes: 0 additions & 8 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,6 @@ message StateAppendedResponse {
// POINT-TO-POINT
// ---------------------------------------------

message PointToPointMessage {
int32 appId = 1;
int32 groupId = 2;
int32 sendIdx = 3;
int32 recvIdx = 4;
bytes data = 5;
}

message PointToPointMappings {
int32 appId = 1;
int32 groupId = 2;
Expand Down
26 changes: 23 additions & 3 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,32 @@
auto groupIdxs = broker.getIdxsRegisteredForGroup(groupId);
groupIdxs.erase(0);
for (const auto& recvIdx : groupIdxs) {
broker.sendMessage(
groupId, 0, recvIdx, BYTES_CONST(&newGroupId), sizeof(int));
// It is safe to send a pointer to the stack, because the
// transport layer will perform an additional copy of the PTP
// message to put it in the message body
// TODO(no-inproc): this may not be true once we move the inproc
// sockets to in-memory queues
faabric::transport::PointToPointMessage msg(
{ .groupId = groupId,
.sendIdx = 0,
.recvIdx = recvIdx,
.dataSize = sizeof(int),
.dataPtr = &newGroupId });

broker.sendMessage(msg);
}
} else if (overwriteNewGroupId == 0) {
std::vector<uint8_t> bytes = broker.recvMessage(groupId, 0, groupIdx);
faabric::transport::PointToPointMessage msg(
{ .groupId = groupId, .sendIdx = 0, .recvIdx = groupIdx });

Check warning on line 478 in src/scheduler/Scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

src/scheduler/Scheduler.cpp#L477-L478

Added lines #L477 - L478 were not covered by tests
// TODO(no-order): when we remove the need to order ptp messages we
// should be able to call recv giving it a pre-allocated buffer,
// avoiding the hassle of malloc-ing and free-ing
broker.recvMessage(msg);
std::vector<uint8_t> bytes((uint8_t*)msg.dataPtr,
(uint8_t*)msg.dataPtr + msg.dataSize);

Check warning on line 484 in src/scheduler/Scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

src/scheduler/Scheduler.cpp#L482-L484

Added lines #L482 - L484 were not covered by tests
newGroupId = faabric::util::bytesToInt(bytes);
// The previous call makes a copy, so safe to free now
faabric::util::free(msg.dataPtr);

Check warning on line 487 in src/scheduler/Scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

src/scheduler/Scheduler.cpp#L487

Added line #L487 was not covered by tests
} else {
// In some settings, like tests, we already know the new group id, so
// we can set it here (and in fact, we need to do so when faking two
Expand Down
1 change: 1 addition & 0 deletions src/transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ faabric_lib(transport
MessageEndpointServer.cpp
PointToPointBroker.cpp
PointToPointClient.cpp
PointToPointMessage.cpp
PointToPointServer.cpp
)

Expand Down
1 change: 1 addition & 0 deletions src/transport/MessageEndpointClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void MessageEndpointClient::asyncSend(int header,
sequenceNum);
}

// TODO: consider making an iovec-style scatter/gather alternative signature
void MessageEndpointClient::asyncSend(int header,
const uint8_t* buffer,
size_t bufferSize,
Expand Down
Loading
Loading