Skip to content

Commit

Permalink
An attempt to fix potential deadlock issues on Windows.
Browse files Browse the repository at this point in the history
Removes synchronization with worker threads on shutdown. Also removes the "search" for the main executor in the worker threads.

Instead, we simply pass the main executor to the thread as a parameter. We also pass the underlying shared_ptr to avoid potential edge cases where the reference count drops to zero before some threads initialize.

I made run_worker static to avoid any confusion about "this" vs "executor->ptr"; it now uses the shared_ptr to reference the shared memory.

The last worker thread will delete the shared memory, via the shared_ptr reference count.
  • Loading branch information
mathgeekcoder committed Sep 13, 2024
1 parent 0f31d20 commit 338c47d
Showing 1 changed file with 16 additions and 23 deletions.
39 changes: 16 additions & 23 deletions src/parallel/HighsTaskExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,31 +85,23 @@ class HighsTaskExecutor {
return nullptr;
}

void run_worker(int workerId) {
// spin until the global executor pointer is set up
ExecutorHandle* executor;
// Following yields warning C4706: assignment within conditional
// expression when building libhighs on Windows (/W4):
//
// while (!(executor = mainWorkerHandle.load(std::memory_order_acquire)))
// HighsSpinMutex::yieldProcessor();
while (true) {
executor = mainWorkerHandle.load(std::memory_order_acquire);
if (executor != nullptr) break;
HighsSpinMutex::yieldProcessor();
}
static void run_worker(
int workerId,
ExecutorHandle* executor,
highs::cache_aligned::shared_ptr<HighsTaskExecutor> ref) {

// now acquire a reference count of the global executor
threadLocalExecutorHandle() = *executor;
HighsSplitDeque* localDeque = workerDeques[workerId].get();
HighsSplitDeque* localDeque = ref->workerDeques[workerId].get();
threadLocalWorkerDeque() = localDeque;
HighsTask* currentTask = workerBunk->waitForNewTask(localDeque);
HighsTask* currentTask = ref->workerBunk->waitForNewTask(localDeque);
while (currentTask != nullptr) {
localDeque->runStolenTask(currentTask);

currentTask = random_steal_loop(localDeque);
currentTask = ref->random_steal_loop(localDeque);
if (currentTask != nullptr) continue;

currentTask = workerBunk->waitForNewTask(localDeque);
currentTask = ref->workerBunk->waitForNewTask(localDeque);
}
}

Expand All @@ -124,8 +116,12 @@ class HighsTaskExecutor {
workerBunk, workerDeques.data(), i, numThreads);

threadLocalWorkerDeque() = workerDeques[0].get();
for (int i = 1; i < numThreads; ++i)
std::thread([&](int id) { run_worker(id); }, i).detach();
}

// Launches the worker threads for this executor: one detached std::thread per
// worker deque with index 1 .. workerDeques.size() - 1, each running the
// static run_worker.
//
// executor: handle forwarded to every worker; it must be non-null, since it is
//           dereferenced here (executor->ptr) and again inside run_worker.
//
// Each thread is also handed executor->ptr as a separate argument, so a worker
// receives the executor's shared pointer directly at thread creation instead
// of having to look it up after it starts running.
//
// NOTE(review): the threads are detached and read *executor when they start;
// presumably the handle must stay valid until every worker has copied it —
// confirm against the callers of init().
void init(ExecutorHandle* executor) {
// Index 0 is skipped: no thread is spawned for workerDeques[0].
for (int i = 1, numThreads = workerDeques.size(); i < numThreads; ++i) {
std::thread(&HighsTaskExecutor::run_worker, i, executor, executor->ptr).detach();
}
}

static HighsSplitDeque* getThisWorkerDeque() {
Expand All @@ -143,16 +139,13 @@ class HighsTaskExecutor {
cache_aligned::make_shared<HighsTaskExecutor>(numThreads);
executorHandle.ptr->mainWorkerHandle.store(&executorHandle,
std::memory_order_release);
executorHandle.ptr->init(&executorHandle);
}
}

static void shutdown(bool blocking = false) {
auto& executorHandle = threadLocalExecutorHandle();
if (executorHandle.ptr) {
// first spin until every worker has acquired its executor reference
while (executorHandle.ptr.use_count() !=
(long)executorHandle.ptr->workerDeques.size())
HighsSpinMutex::yieldProcessor();
// set the active flag to false first with release ordering
executorHandle.ptr->mainWorkerHandle.store(nullptr,
std::memory_order_release);
Expand Down

0 comments on commit 338c47d

Please sign in to comment.