Skip to content

Commit 258c63a

Browse files
authored
Executor context (#228)
* Add faabric execution context * Trailing whitespace * Add impl to cmake * Exec context tests * Fix EC test * Add isSet * Tidy up imports * Add convenience method to test fixture * Add test for context in executor
1 parent f524080 commit 258c63a

File tree

9 files changed

+263
-34
lines changed

9 files changed

+263
-34
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#pragma once
2+
3+
#include <faabric/proto/faabric.pb.h>
4+
#include <faabric/scheduler/Scheduler.h>
5+
6+
namespace faabric::scheduler {
7+
8+
/**
9+
* Globally-accessible wrapper that allows executing applications to query
10+
* their execution context. The context is thread-local, so applications can
11+
* query which specific message they are executing.
12+
*/
13+
class ExecutorContext
14+
{
15+
public:
16+
ExecutorContext(Executor* executorIn,
17+
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
18+
int msgIdx);
19+
20+
static bool isSet();
21+
22+
static void set(Executor* executorIn,
23+
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
24+
int msgIdxIn);
25+
26+
static void unset();
27+
28+
static std::shared_ptr<ExecutorContext> get();
29+
30+
Executor* getExecutor() { return executor; }
31+
32+
std::shared_ptr<faabric::BatchExecuteRequest> getBatchRequest()
33+
{
34+
return req;
35+
}
36+
37+
faabric::Message& getMsg()
38+
{
39+
if (req == nullptr) {
40+
throw std::runtime_error(
41+
"Getting message when no request set in context");
42+
}
43+
return req->mutable_messages()->at(msgIdx);
44+
}
45+
46+
int getMsgIdx() { return msgIdx; }
47+
48+
private:
49+
Executor* executor = nullptr;
50+
std::shared_ptr<faabric::BatchExecuteRequest> req = nullptr;
51+
int msgIdx = 0;
52+
};
53+
}

include/faabric/scheduler/Scheduler.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,11 @@ class ExecutorTask
3838

3939
ExecutorTask(int messageIndexIn,
4040
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
41-
std::shared_ptr<std::atomic<int>> batchCounterIn,
42-
bool skipResetIn);
41+
std::shared_ptr<std::atomic<int>> batchCounterIn);
4342

4443
std::shared_ptr<faabric::BatchExecuteRequest> req;
4544
std::shared_ptr<std::atomic<int>> batchCounter;
4645
int messageIndex = 0;
47-
bool skipReset = false;
4846
};
4947

5048
class Executor
@@ -122,10 +120,6 @@ class Executor
122120
void threadPoolThread(int threadPoolIdx);
123121
};
124122

125-
Executor* getExecutingExecutor();
126-
127-
void setExecutingExecutor(Executor* exec);
128-
129123
class Scheduler
130124
{
131125
public:

src/scheduler/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
faabric_lib(scheduler
22
ExecGraph.cpp
3+
ExecutorContext.cpp
34
ExecutorFactory.cpp
45
Executor.cpp
56
FunctionCallClient.cpp

src/scheduler/Executor.cpp

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <faabric/proto/faabric.pb.h>
2+
#include <faabric/scheduler/ExecutorContext.h>
23
#include <faabric/scheduler/MpiWorldRegistry.h>
34
#include <faabric/scheduler/Scheduler.h>
45
#include <faabric/snapshot/SnapshotRegistry.h>
@@ -31,26 +32,12 @@
3132

3233
namespace faabric::scheduler {
3334

34-
static thread_local Executor* executingExecutor = nullptr;
35-
36-
Executor* getExecutingExecutor()
37-
{
38-
return executingExecutor;
39-
}
40-
41-
void setExecutingExecutor(Executor* exec)
42-
{
43-
executingExecutor = exec;
44-
}
45-
4635
ExecutorTask::ExecutorTask(int messageIndexIn,
4736
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
48-
std::shared_ptr<std::atomic<int>> batchCounterIn,
49-
bool skipResetIn)
37+
std::shared_ptr<std::atomic<int>> batchCounterIn)
5038
: req(std::move(reqIn))
5139
, batchCounter(std::move(batchCounterIn))
5240
, messageIndex(messageIndexIn)
53-
, skipReset(skipResetIn)
5441
{}
5542

5643
// TODO - avoid the copy of the message here?
@@ -87,7 +74,7 @@ void Executor::finish()
8774
// Send a kill message
8875
SPDLOG_TRACE("Executor {} killing thread pool {}", id, i);
8976
threadTaskQueues[i].enqueue(
90-
ExecutorTask(POOL_SHUTDOWN, nullptr, nullptr, false));
77+
ExecutorTask(POOL_SHUTDOWN, nullptr, nullptr));
9178

9279
faabric::util::UniqueLock threadsLock(threadsMutex);
9380
// Copy shared_ptr to avoid racing
@@ -277,11 +264,6 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
277264
// Set up shared counter for this batch of tasks
278265
auto batchCounter = std::make_shared<std::atomic<int>>(msgIdxs.size());
279266

280-
// Work out if we should skip the reset after this batch. This happens
281-
// for threads, as they will be restored from their respective snapshot
282-
// on the next execution.
283-
bool skipReset = isThreads;
284-
285267
// Iterate through and invoke tasks. By default, we allocate tasks
286268
// one-to-one with thread pool threads. Only once the pool is exhausted
287269
// do we start overloading
@@ -328,7 +310,7 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
328310

329311
// Enqueue the task
330312
threadTaskQueues[threadPoolIdx].enqueue(
331-
ExecutorTask(msgIdx, req, batchCounter, skipReset));
313+
ExecutorTask(msgIdx, req, batchCounter));
332314

333315
// Lazily create the thread
334316
if (threadPoolThreads.at(threadPoolIdx) == nullptr) {
@@ -452,8 +434,8 @@ void Executor::threadPoolThread(int threadPoolIdx)
452434
isThreads,
453435
msg.groupid());
454436

455-
// Set executing executor
456-
setExecutingExecutor(this);
437+
// Set up context
438+
ExecutorContext::set(this, task.req, task.messageIndex);
457439

458440
// Execute the task
459441
int32_t returnValue;
@@ -488,6 +470,9 @@ void Executor::threadPoolThread(int threadPoolIdx)
488470
msg.set_outputdata(errorMessage);
489471
}
490472

473+
// Unset context
474+
ExecutorContext::unset();
475+
491476
// Handle thread-local diffing for every thread
492477
if (doDirtyTracking) {
493478
// Stop dirty tracking
@@ -590,7 +575,9 @@ void Executor::threadPoolThread(int threadPoolIdx)
590575
// claim. Note that we have to release the claim _after_ resetting,
591576
// otherwise the executor won't be ready for reuse
592577
if (isLastInBatch) {
593-
if (task.skipReset) {
578+
// Threads skip the reset as they will be restored from their
579+
// respective snapshot on the next execution.
580+
if (isThreads) {
594581
SPDLOG_TRACE("Skipping reset for {}",
595582
faabric::util::funcToString(msg, true));
596583
} else {

src/scheduler/ExecutorContext.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#include <faabric/scheduler/ExecutorContext.h>
2+
3+
namespace faabric::scheduler {
4+
5+
static thread_local std::shared_ptr<ExecutorContext> context = nullptr;
6+
7+
ExecutorContext::ExecutorContext(
8+
Executor* executorIn,
9+
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
10+
int msgIdxIn)
11+
: executor(executorIn)
12+
, req(reqIn)
13+
, msgIdx(msgIdxIn)
14+
{}
15+
16+
bool ExecutorContext::isSet()
17+
{
18+
return context != nullptr;
19+
}
20+
21+
void ExecutorContext::set(Executor* executorIn,
22+
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
23+
int appIdxIn)
24+
{
25+
context = std::make_shared<ExecutorContext>(executorIn, reqIn, appIdxIn);
26+
}
27+
28+
void ExecutorContext::unset()
29+
{
30+
context = nullptr;
31+
}
32+
33+
std::shared_ptr<ExecutorContext> ExecutorContext::get()
34+
{
35+
if (context == nullptr) {
36+
SPDLOG_ERROR("No executor context set");
37+
throw std::runtime_error("No executor context set");
38+
}
39+
return context;
40+
}
41+
}

tests/test/scheduler/test_executor.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <faabric/proto/faabric.pb.h>
88
#include <faabric/redis/Redis.h>
9+
#include <faabric/scheduler/ExecutorContext.h>
910
#include <faabric/scheduler/ExecutorFactory.h>
1011
#include <faabric/scheduler/FunctionCallClient.h>
1112
#include <faabric/scheduler/Scheduler.h>
@@ -225,6 +226,36 @@ int32_t TestExecutor::executeTask(
225226
return 20;
226227
}
227228

229+
if (msg.function() == "context-check") {
230+
std::shared_ptr<faabric::scheduler::ExecutorContext> ctx =
231+
faabric::scheduler::ExecutorContext::get();
232+
if (ctx == nullptr) {
233+
SPDLOG_ERROR("Executor context is null");
234+
return 999;
235+
}
236+
237+
if (ctx->getExecutor() != this) {
238+
SPDLOG_ERROR("Executor not equal to this one");
239+
return 999;
240+
}
241+
242+
if (ctx->getBatchRequest()->id() != reqOrig->id()) {
243+
SPDLOG_ERROR("Context request does not match ({} != {})",
244+
ctx->getBatchRequest()->id(),
245+
reqOrig->id());
246+
return 999;
247+
}
248+
249+
if (ctx->getMsgIdx() != msgIdx) {
250+
SPDLOG_ERROR("Context message idx does not match ({} != {})",
251+
ctx->getMsgIdx(),
252+
msgIdx);
253+
return 999;
254+
}
255+
256+
return 123;
257+
}
258+
228259
if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) {
229260
SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id());
230261
return msg.id() / 100;
@@ -1029,4 +1060,23 @@ TEST_CASE_METHOD(TestExecutorFixture,
10291060

10301061
setMockMode(false);
10311062
}
1063+
1064+
TEST_CASE_METHOD(TestExecutorFixture,
1065+
"Test executor sees context",
1066+
"[executor]")
1067+
{
1068+
int nMessages = 5;
1069+
std::shared_ptr<BatchExecuteRequest> req =
1070+
faabric::util::batchExecFactory("dummy", "context-check", nMessages);
1071+
int expectedResult = 123;
1072+
1073+
sch.callFunctions(req);
1074+
1075+
for (int i = 0; i < nMessages; i++) {
1076+
faabric::Message res =
1077+
sch.getFunctionResult(req->messages().at(i).id(), 2000);
1078+
1079+
REQUIRE(res.returnvalue() == expectedResult);
1080+
}
1081+
}
10321082
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#include <catch2/catch.hpp>
2+
3+
#include "faabric_utils.h"
4+
5+
#include <faabric/scheduler/ExecutorContext.h>
6+
#include <faabric/util/func.h>
7+
8+
using namespace faabric::scheduler;
9+
10+
namespace tests {
11+
12+
TEST_CASE_METHOD(ExecutorContextTestFixture,
13+
"Test executor context",
14+
"[scheduler]")
15+
{
16+
REQUIRE(!ExecutorContext::isSet());
17+
18+
// Getting with no context should fail
19+
REQUIRE_THROWS(ExecutorContext::get());
20+
21+
faabric::Message msg = faabric::util::messageFactory("foo", "bar");
22+
23+
std::shared_ptr<DummyExecutorFactory> fac =
24+
std::make_shared<DummyExecutorFactory>();
25+
auto exec = fac->createExecutor(msg);
26+
27+
auto req = faabric::util::batchExecFactory("foo", "bar", 5);
28+
29+
SECTION("Set both executor and request")
30+
{
31+
ExecutorContext::set(exec.get(), req, 3);
32+
33+
std::shared_ptr<ExecutorContext> ctx = ExecutorContext::get();
34+
REQUIRE(ctx->getExecutor() == exec.get());
35+
REQUIRE(ctx->getBatchRequest() == req);
36+
REQUIRE(ctx->getMsgIdx() == 3);
37+
REQUIRE(ctx->getMsg().id() == req->mutable_messages()->at(3).id());
38+
}
39+
40+
SECTION("Just set executor")
41+
{
42+
ExecutorContext::set(exec.get(), nullptr, 0);
43+
44+
std::shared_ptr<ExecutorContext> ctx = ExecutorContext::get();
45+
REQUIRE(ctx->getExecutor() == exec.get());
46+
REQUIRE(ctx->getBatchRequest() == nullptr);
47+
REQUIRE(ctx->getMsgIdx() == 0);
48+
49+
REQUIRE_THROWS(ctx->getMsg());
50+
}
51+
52+
SECTION("Just set request")
53+
{
54+
ExecutorContext::set(nullptr, req, 3);
55+
56+
std::shared_ptr<ExecutorContext> ctx = ExecutorContext::get();
57+
REQUIRE(ctx->getExecutor() == nullptr);
58+
REQUIRE(ctx->getBatchRequest() == req);
59+
REQUIRE(ctx->getMsgIdx() == 3);
60+
REQUIRE(ctx->getMsg().id() == req->mutable_messages()->at(3).id());
61+
}
62+
63+
ExecutorContext::unset();
64+
REQUIRE_THROWS(ExecutorContext::get());
65+
}
66+
}

tests/utils/DummyExecutorFactory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ class DummyExecutorFactory : public ExecutorFactory
1111

1212
int getFlushCount();
1313

14-
protected:
1514
std::shared_ptr<Executor> createExecutor(faabric::Message& msg) override;
1615

16+
protected:
1717
void flushHost() override;
1818

1919
private:

0 commit comments

Comments
 (0)