threads: handle zero-sized requests and preload a la mpi
csegarragonz committed May 9, 2024
1 parent 0c44a40 commit df2c945
Showing 7 changed files with 94 additions and 39 deletions.
7 changes: 7 additions & 0 deletions src/batch-scheduler/BinPackScheduler.cpp
@@ -307,6 +307,13 @@ std::shared_ptr<SchedulingDecision> BinPackScheduler::makeSchedulingDecision(
     auto decisionType = getDecisionType(inFlightReqs, req);
     auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType);
 
+    // For an OpenMP request with the single host hint, we only consider
+    // scheduling in one VM
+    bool isOmp = req->messages_size() > 0 && req->messages(0).isomp();
+    if (req->singlehosthint() && isOmp) {
+        sortedHosts.erase(sortedHosts.begin() + 1, sortedHosts.end());
+    }
+
     // Assign slots from the list (i.e. bin-pack)
     auto it = sortedHosts.begin();
     int numLeftToSchedule = req->messages_size();
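As an aside (not part of the commit), the truncation idiom above keeps only the best-ranked host, so bin-packing cannot spill an OpenMP app onto a second VM. A minimal self-contained sketch, assuming the sorted host list is non-empty (true for any schedulable app):

#include <cassert>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    // Hosts sorted from most to least preferred by the bin-pack scheduler
    std::vector<std::string> sortedHosts = { "vm-a", "vm-b", "vm-c" };

    bool singleHostHint = true;
    bool isOmp = true;
    if (singleHostHint && isOmp) {
        // Drop everything but the first host; begin() + 1 is only valid
        // because the list is non-empty
        sortedHosts.erase(sortedHosts.begin() + 1, sortedHosts.end());
    }

    assert(sortedHosts.size() == 1);
    std::cout << "scheduling on: " << sortedHosts.front() << std::endl;
    return 0;
}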
2 changes: 1 addition & 1 deletion src/batch-scheduler/SchedulingDecision.cpp
@@ -15,7 +15,7 @@ bool SchedulingDecision::isSingleHost() const
 
     std::string thisHost = conf.endpointHost;
     std::set<std::string> hostSet(hosts.begin(), hosts.end());
-    return hostSet.size() == 1;
+    return hostSet.size() <= 1;
 }
 
 void SchedulingDecision::addMessage(const std::string& host,
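The one-character change above matters for zero-sized requests: a THREADS decision that schedules no messages has an empty host list, so the old == 1 check wrongly reported it as multi-host. A stand-alone sketch of the fixed predicate (a hypothetical simplification of SchedulingDecision::isSingleHost, which in faabric also consults conf.endpointHost):

#include <cassert>
#include <set>
#include <string>
#include <vector>

bool isSingleHost(const std::vector<std::string>& hosts)
{
    // Zero or one distinct hosts both count as "single host"
    std::set<std::string> hostSet(hosts.begin(), hosts.end());
    return hostSet.size() <= 1;
}

int main()
{
    assert(isSingleHost({ "vm-a", "vm-a" })); // one distinct host
    assert(isSingleHost({}));                 // zero-sized request: empty set
    assert(!isSingleHost({ "vm-a", "vm-b" }));
    return 0;
}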
98 changes: 65 additions & 33 deletions src/planner/Planner.cpp
@@ -19,7 +19,7 @@
 
 // Special group ID magic to indicate MPI decisions that we have preemptively
 // scheduled
-#define MPI_PRELOADED_DECISION_GROUPID -99
+#define FIXED_SIZE_PRELOADED_DECISION_GROUPID -99
 
 namespace faabric::planner {
 
@@ -781,7 +781,8 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
         decisionType == faabric::batch_scheduler::DecisionType::SCALE_CHANGE;
     bool isDistChange =
         decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE;
-    bool existsPreloadedDec = state.preloadedSchedulingDecisions.contains(appId);
+    bool existsPreloadedDec =
+      state.preloadedSchedulingDecisions.contains(appId);
 
     // For a SCALE_CHANGE decision (i.e. fork) with the elastic flag set, we
     // want to scale up to as many available cores as possible in the app's
@@ -793,26 +794,49 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
 
         int numAvail = availableSlots(state.hostMap.at(mainHost));
         int numRequested = req->messages_size();
-        int lastMsgIdx = req->messages(numRequested - 1).groupidx();
-        for (int itr = 0; itr < (numAvail - numRequested); itr++) {
+        int lastMsgIdx =
+          numRequested == 0 ? 0 : req->messages(numRequested - 1).groupidx();
+        for (int itr = 0; itr < (numAvail - numRequested); itr++) {
             // Differentiate between the position in the message array (itr)
             // and the new group index. Usually, in a fork, they would be
             // offset by one
             int msgIdx = lastMsgIdx + itr + 1;
-            SPDLOG_DEBUG("Adding elastically scaled up msg idx {} (app: {})", msgIdx, appId);
+            SPDLOG_DEBUG("Adding elastically scaled up msg idx {} (app: {})",
+                         msgIdx,
+                         appId);
 
-            // To add a new message, copy from the last, and update the indexes
-            *req->add_messages() = req->messages(numRequested - 1);
-            req->mutable_messages(numRequested + itr)->set_appidx(msgIdx);
-            req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx);
+            if (numRequested == 0) {
+                // This is a special case where we scale up from zero
+                // parallelism (i.e. 1 OpenMP thread) that requires special
+                // care
+                auto* newMsg = req->add_messages();
+                *newMsg = state.inFlightReqs.at(appId).first->messages(0);
+                newMsg->set_mainhost(mainHost);
+                newMsg->set_appidx(msgIdx);
+                newMsg->set_groupidx(msgIdx);
+
+                // For requests that elastically scale from 1 (i.e. zero-
+                // parallelism) we make use of the group id field to pass the
+                // actual function pointer as a hack
+                newMsg->set_funcptr(req->groupid());
+            } else {
+                // To add a new message, copy from the last, and update the indexes
+                *req->add_messages() = req->messages(numRequested - 1);
+                req->mutable_messages(numRequested + itr)->set_appidx(msgIdx);
+                req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx);
+            }
 
             // Also update the message id to make sure we can wait-for and
             // clean-up the resources we use
-            req->mutable_messages(numRequested + itr)->set_id(faabric::util::generateGid());
+            req->mutable_messages(numRequested + itr)
+              ->set_id(faabric::util::generateGid());
         }
 
         if (numAvail > numRequested) {
-            SPDLOG_INFO("Elastically scaled-up app {} ({} -> {})", appId, numRequested, numAvail);
+            SPDLOG_INFO("Elastically scaled-up app {} ({} -> {})",
+                        appId,
+                        numRequested,
+                        numAvail);
         } else {
             SPDLOG_INFO("Decided NOT to elastically scaled-up app {}", appId);
         }
@@ -831,13 +855,14 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
         }
     }
 
-    // For a NEW decision of an MPI application, we know that it will be
-    // followed-up by a SCALE_CHANGE one, and that the mpi_world_size parameter
-    // must be set. Thus, we can schedule slots for all the MPI ranks, and
-    // consume them later as a preloaded scheduling decision
+    // For a NEW decision of an MPI/OpenMP application, we know that it will be
+    // followed-up by a SCALE_CHANGE one, and that the size parameter
+    // must be set. Thus, we can schedule slots for all the MPI ranks/OMP
+    // threads, and consume them later as a preloaded scheduling decision
     bool isNew = decisionType == faabric::batch_scheduler::DecisionType::NEW;
-    bool isMpi = req->messages(0).ismpi();
-    std::shared_ptr<BatchExecuteRequest> mpiReq = nullptr;
+    bool isMpi = req->messages_size() > 0 && req->messages(0).ismpi();
+    bool isOmp = req->messages_size() > 0 && req->messages(0).isomp();
+    std::shared_ptr<BatchExecuteRequest> knownSizeReq = nullptr;
 
     // Check if there exists a pre-loaded scheduling decision for this app
     // (e.g. if we want to force a migration). Note that we don't want to check
@@ -849,25 +874,29 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
         // In general, after a scale change decision (that has been preloaded)
        // it is safe to remove it
        if (isScaleChange) {
            SPDLOG_DEBUG("Removing pre-loaded scheduling decision for app {}",
                         appId);
            state.preloadedSchedulingDecisions.erase(appId);
        }
-    } else if (isNew && isMpi) {
-        mpiReq = std::make_shared<BatchExecuteRequest>();
-        *mpiReq = *req;
+    } else if (isNew && (isMpi || isOmp)) {
+        knownSizeReq = std::make_shared<BatchExecuteRequest>();
+        *knownSizeReq = *req;
 
         // Deep-copy as many messages we can from the original BER, and mock
         // the rest
-        for (int i = req->messages_size(); i < req->messages(0).mpiworldsize();
-             i++) {
-            auto* newMpiMsg = mpiReq->add_messages();
+        size_t reqSize = isMpi ? req->messages(0).mpiworldsize()
+                               : req->messages(0).ompnumthreads();
+        assert(reqSize > 0);
+        for (int i = req->messages_size(); i < reqSize; i++) {
+            auto* newMpiMsg = knownSizeReq->add_messages();
 
             newMpiMsg->set_appid(req->appid());
             newMpiMsg->set_groupidx(i);
         }
-        assert(mpiReq->messages_size() == req->messages(0).mpiworldsize());
+        assert(knownSizeReq->messages_size() == reqSize);
 
         decision = batchScheduler->makeSchedulingDecision(
-          hostMapCopy, state.inFlightReqs, mpiReq);
+          hostMapCopy, state.inFlightReqs, knownSizeReq);
     } else {
         decision = batchScheduler->makeSchedulingDecision(
           hostMapCopy, state.inFlightReqs, req);
@@ -953,7 +982,7 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
     }
 
     // Skip claiming slots and ports if we have preemptively allocated them
-    bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID;
+    bool skipClaim = decision->groupId == FIXED_SIZE_PRELOADED_DECISION_GROUPID;
 
     // A scheduling decision will create a new PTP mapping and, as a
     // consequence, a new group ID
@@ -990,17 +1019,19 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
             // For a NEW MPI decision that was not preloaded we have
             // preemptively scheduled all MPI messages but now we just need to
             // return the first one, and preload the rest
-            if (isMpi && mpiReq != nullptr) {
-                auto mpiDecision = std::make_shared<
+            if ((isMpi || isOmp) && knownSizeReq != nullptr) {
+                auto knownSizeDecision = std::make_shared<
                   faabric::batch_scheduler::SchedulingDecision>(req->appid(),
                                                                 req->groupid());
-                *mpiDecision = *decision;
-                mpiDecision->groupId = MPI_PRELOADED_DECISION_GROUPID;
-                state.preloadedSchedulingDecisions[appId] = mpiDecision;
+                *knownSizeDecision = *decision;
+                knownSizeDecision->groupId =
+                  FIXED_SIZE_PRELOADED_DECISION_GROUPID;
+                state.preloadedSchedulingDecisions[appId] = knownSizeDecision;
 
                 // Remove all messages that we do not have to dispatch now
-                for (int i = 1; i < mpiDecision->messageIds.size(); i++) {
-                    decision->removeMessage(mpiDecision->messageIds.at(i));
+                for (int i = 1; i < knownSizeDecision->messageIds.size(); i++) {
+                    decision->removeMessage(
+                      knownSizeDecision->messageIds.at(i));
                 }
             }
 
@@ -1192,7 +1223,8 @@ void Planner::dispatchSchedulingDecision(
             // Propagate the single host hint
             hostRequests[thisHost]->set_singlehosthint(req->singlehosthint());
             // Propagate the elastic scaling hint
-            hostRequests[thisHost]->set_elasticscalehint(req->elasticscalehint());
+            hostRequests[thisHost]->set_elasticscalehint(
+              req->elasticscalehint());
         }
 
         *hostRequests[thisHost]->add_messages() = msg;
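To summarise the flow the Planner.cpp changes generalise from MPI to OpenMP: on a NEW request whose final size is known up-front (mpiworldsize for MPI, ompnumthreads for OpenMP), the planner schedules slots for every rank/thread at once, dispatches only the first message, and stashes the full decision under a sentinel group id so the follow-up SCALE_CHANGE consumes it instead of re-scheduling. A toy model of that flow, with illustrative names rather than faabric's real API:

#include <cassert>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <vector>

constexpr int32_t FIXED_SIZE_PRELOADED_DECISION_GROUPID = -99;

struct Decision
{
    int32_t groupId = 0;
    std::vector<int> messageIds;
};

// Preloaded decisions keyed by app id, mirroring
// state.preloadedSchedulingDecisions in the planner
std::map<int, std::shared_ptr<Decision>> preloaded;

std::shared_ptr<Decision> callBatchNew(int appId, int knownSize)
{
    // Schedule slots for *all* ranks/threads immediately
    auto full = std::make_shared<Decision>();
    full->groupId = FIXED_SIZE_PRELOADED_DECISION_GROUPID;
    for (int i = 0; i < knownSize; i++) {
        full->messageIds.push_back(i);
    }
    preloaded[appId] = full;

    // ... but only dispatch the first message now
    auto first = std::make_shared<Decision>(*full);
    first->messageIds.resize(1);
    return first;
}

std::shared_ptr<Decision> callBatchScaleChange(int appId)
{
    // The follow-up fork consumes (and erases) the preloaded decision
    auto dec = preloaded.at(appId);
    preloaded.erase(appId);
    return dec;
}

int main()
{
    auto first = callBatchNew(42, 4);
    assert(first->messageIds.size() == 1);

    auto rest = callBatchScaleChange(42);
    assert(rest->messageIds.size() == 4);
    assert(rest->groupId == FIXED_SIZE_PRELOADED_DECISION_GROUPID);
    std::cout << "preloaded decision consumed for app 42" << std::endl;
    return 0;
}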
13 changes: 8 additions & 5 deletions src/planner/PlannerClient.cpp
@@ -307,13 +307,16 @@ faabric::batch_scheduler::SchedulingDecision PlannerClient::callFunctions(
     // to other hosts. Given that we don't support nested threading, if we
     // have a THREADS request here it means that we are being called from the
     // main thread (which holds the main snapshot)
-    const std::string funcStr =
-      faabric::util::funcToString(req->messages(0), false);
+    std::string snapshotKey;
+    auto& reg = faabric::snapshot::getSnapshotRegistry();
 
-    std::string snapshotKey;
-    const auto firstMsg = req->messages(0);
-
-    if (isThreads) {
+    // Note that with threads we may have 0-sized BERs
+    if (isThreads && req->messages_size() > 0) {
+        const std::string funcStr =
+          faabric::util::funcToString(req->messages(0), false);
+
+        const auto firstMsg = req->messages(0);
+
         if (!firstMsg.snapshotkey().empty()) {
             SPDLOG_ERROR("{} should not provide snapshot key for {} threads",
                          funcStr,
5 changes: 5 additions & 0 deletions src/planner/PlannerEndpointHandler.cpp
@@ -187,6 +187,11 @@ void PlannerEndpointHandler::onRequest(
                 inFlightPair.first->messages(0).mpiworldsize());
         }
 
+        if (inFlightPair.first->messages(0).isomp()) {
+            inFlightAppResp->set_size(
+              inFlightPair.first->messages(0).ompnumthreads());
+        }
+
         for (const auto& hostIp : decision->hosts) {
             inFlightAppResp->add_hostips(hostIp);
         }
4 changes: 4 additions & 0 deletions src/proto/faabric.proto
@@ -145,6 +145,10 @@ message Message {
     repeated int32 chainedMsgIds = 36;
     map<string, int32> intExecGraphDetails = 37;
     map<string, string> execGraphDetails = 38;
+
+    // OpenMP
+    bool isOmp = 39;
+    int32 ompNumThreads = 40;
 }
 
 // ---------------------------------------------
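As a usage sketch (not part of the commit), the two new fields would be set by the OpenMP runtime before the batch reaches the planner, mirroring how ismpi/mpiworldsize drive MPI preloading. Note that protoc lower-cases field names in the generated C++ setters; the include path is an assumption based on faabric's layout:

#include <faabric/proto/faabric.pb.h>

// Hypothetical helper: tag a message as OpenMP so the planner can preload
// a scheduling decision for all ompNumThreads threads at NEW time
void tagOmpMessage(faabric::Message& msg, int numThreads)
{
    msg.set_isomp(true);
    msg.set_ompnumthreads(numThreads);
}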
4 changes: 4 additions & 0 deletions src/scheduler/Scheduler.cpp
@@ -249,6 +249,10 @@ long Scheduler::getFunctionExecutorCount(const faabric::Message& msg)
 
 void Scheduler::executeBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
 {
+    if (req->messages_size() == 0) {
+        return;
+    }
+
     faabric::util::FullLock lock(mx);
 
     bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
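The guard above makes a zero-sized batch a silent no-op, which matters once zero-sized thread BERs (as noted in the PlannerClient change) can reach the executor. A toy illustration with stand-in types, not faabric's real ones:

#include <iostream>
#include <memory>
#include <vector>

struct BatchExecuteRequest
{
    std::vector<int> messages;
    int messages_size() const { return static_cast<int>(messages.size()); }
};

void executeBatch(const std::shared_ptr<BatchExecuteRequest>& req)
{
    // Zero-sized requests are valid with zero-parallelism threads: do nothing
    if (req->messages_size() == 0) {
        return;
    }

    std::cout << "executing " << req->messages_size() << " messages"
              << std::endl;
}

int main()
{
    executeBatch(std::make_shared<BatchExecuteRequest>()); // no-op
    return 0;
}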
