Skip to content

Commit

Permalink
Revision: add celerity blockchain for task divergence checking
Browse files Browse the repository at this point in the history
  • Loading branch information
GagaLP committed Dec 18, 2023
1 parent 1968c3d commit 3fe7ae2
Show file tree
Hide file tree
Showing 17 changed files with 163 additions and 150 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- Introduce new experimental `for_each_item` utility to iterate over a celerity range (#199)
- Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199)
- Add new `experimental::constrain_split` API to limit how a kernel can be split (#?)
- Add divergence check blockchain for automatic detection of diverging tasks in debug mode (#217)
- Add automatic detection of diverging execution in debug mode (#217)
- `distr_queue::fence` and `buffer_snapshot` are now stable, subsuming the `experimental::` APIs of the same name (#225)
- Celerity now warns at runtime when a task declares reads from uninitialized buffers or writes with overlapping ranges between nodes (#224)
- Introduce new `experimental::hint` API for providing the runtime with additional information on how to execute a task (#227)
Expand Down
8 changes: 7 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ endif()

option(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "Diagnose uninitialized reads and overlapping writes" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_DIVERGENCE_CHECK "Enable divergence check" ${DEFAULT_ENABLE_DEBUG_CHECKS})

if(CELERITY_ACCESSOR_BOUNDARY_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Accessor boundary check enabled - this will impact kernel performance")
endif()

if(CELERITY_DIVERGENCE_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Divergence checker enabled - this will impact the overall performance")
endif()

set(CELERITY_CMAKE_DIR "${PROJECT_SOURCE_DIR}/cmake")
set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH}" "${CELERITY_CMAKE_DIR}")
find_package(MPI 2.0 REQUIRED)
Expand Down Expand Up @@ -186,7 +191,7 @@ set(SOURCES
src/command_graph.cc
src/config.cc
src/device_queue.cc
src/divergence_block_chain.cc
src/divergence_checker.cc
src/executor.cc
src/distributed_graph_generator.cc
src/graph_serializer.cc
Expand Down Expand Up @@ -289,6 +294,7 @@ target_compile_definitions(celerity_runtime PUBLIC
CELERITY_FEATURE_UNNAMED_KERNELS=$<BOOL:${CELERITY_FEATURE_UNNAMED_KERNELS}>
CELERITY_DETAIL_HAS_NAMED_THREADS=$<BOOL:${CELERITY_DETAIL_HAS_NAMED_THREADS}>
CELERITY_ACCESSOR_BOUNDARY_CHECK=$<BOOL:${CELERITY_ACCESSOR_BOUNDARY_CHECK}>
CELERITY_DIVERGENCE_CHECK=$<BOOL:${CELERITY_DIVERGENCE_CHECK}>
CELERITY_ACCESS_PATTERN_DIAGNOSTICS=$<BOOL:${CELERITY_ACCESS_PATTERN_DIAGNOSTICS}>
)

Expand Down
4 changes: 4 additions & 0 deletions docs/pitfalls.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ if(rand() > 1337) {
celerity::buffer<float, 2> my_buffer(...);
}
```

> Diverging host execution can be detected at runtime by enabling the
> `CELERITY_DIVERGENCE_CHECK` CMake option, at the cost of some runtime
> overhead (the option is enabled by default in debug builds).
27 changes: 13 additions & 14 deletions include/divergence_block_chain.h → include/divergence_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
#include "communicator.h"
#include "recorders.h"

namespace celerity::detail {
struct runtime_testspy;
}

namespace celerity::detail::divergence_checker_detail {
using task_hash = size_t;
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;
Expand Down Expand Up @@ -72,29 +68,32 @@ class divergence_block_chain {
std::mutex m_task_records_mutex;

std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
std::chrono::seconds m_time_of_last_warning = std::chrono::seconds(0);

std::unique_ptr<communicator> m_communicator;

void divergence_out(const divergence_map& check_map, const int task_num);
void reprot_divergence(const divergence_map& check_map, const int task_num);

void add_new_hashes();
void clear(const int min_progress);
std::pair<int, int> collect_hash_counts();
per_node_task_hashes collect_hashes(const int min_hash_count) const;
divergence_map create_check_map(const per_node_task_hashes& task_hashes, const int task_num) const;
divergence_map create_divergence_map(const per_node_task_hashes& task_hashes, const int task_num) const;

void check_for_deadlock() const;
void check_for_deadlock();

static void log_node_divergences(const divergence_map& check_map, const int task_num);
static void log_node_divergences(const divergence_map& check_map, const int task_id);
static void log_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash);
void log_task_record_once(const divergence_map& check_map, const int task_num);

void add_new_task(const task_record& task);
task_record thread_save_get_task_record(const size_t task_num);
};
}; // namespace celerity::detail::divergence_checker_detail

namespace celerity::detail {
class divergence_checker {
friend struct ::celerity::detail::runtime_testspy;
friend struct runtime_testspy;

public:
divergence_checker(task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool test_mode = false)
Expand All @@ -111,6 +110,10 @@ class divergence_checker {
~divergence_checker() { stop(); }

private:
std::thread m_thread;
bool m_is_running = false;
divergence_checker_detail::divergence_block_chain m_block_chain;

void start() {
m_thread = std::thread(&divergence_checker::run, this);
m_is_running = true;
Expand All @@ -129,9 +132,5 @@ class divergence_checker {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

std::thread m_thread;
bool m_is_running = false;
divergence_block_chain m_block_chain;
};
}; // namespace celerity::detail::divergence_checker_detail
};
2 changes: 1 addition & 1 deletion include/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ template <int Dims>
struct std::hash<celerity::detail::region<Dims>> {
std::size_t operator()(const celerity::detail::region<Dims> r) {
std::size_t seed = 0;
for(auto box : r.get_boxes()) {
for(auto& box : r.get_boxes()) {
celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::box<Dims>>{}(box));
}
return seed;
Expand Down
7 changes: 4 additions & 3 deletions include/mpi_communicator.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once

#include <memory>

#include <mpi.h>
Expand All @@ -11,6 +12,8 @@ class mpi_communicator : public communicator {
mpi_communicator(MPI_Comm comm) : m_comm(comm) {}

private:
MPI_Comm m_comm;

void allgather_inplace_impl(std::byte* sendrecvbuf, const int sendrecvcount) override {
MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, sendrecvbuf, sendrecvcount, MPI_BYTE, m_comm);
};
Expand All @@ -32,7 +35,5 @@ class mpi_communicator : public communicator {
MPI_Comm_rank(m_comm, &rank);
return static_cast<node_id>(rank);
}

MPI_Comm m_comm;
};
} // namespace celerity::detail
} // namespace celerity::detail
2 changes: 1 addition & 1 deletion include/ranges.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ struct std::hash<celerity::detail::coordinate<Interface, Dims>> {
std::size_t operator()(const celerity::detail::coordinate<Interface, Dims>& r) const noexcept {
std::size_t seed = 0;
for(int i = 0; i < Dims; ++i) {
celerity::detail::utils::hash_combine(seed, std::hash<int>{}(r[i]));
celerity::detail::utils::hash_combine(seed, std::hash<size_t>{}(r[i]));
}
return seed;
};
Expand Down
9 changes: 5 additions & 4 deletions include/recorders.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ class task_recorder {
void record_task(const task& tsk);

void add_callback(task_callback callback);
void invoke_callbacks(const task_record& tsk) const;

const task_records& get_tasks() const { return m_recorded_tasks; }

private:
task_records m_recorded_tasks;
std::vector<task_callback> m_callbacks{};
const buffer_manager* m_buff_mngr;

void invoke_callbacks(const task_record& tsk) const;
};

// Command recording
Expand Down Expand Up @@ -104,16 +105,16 @@ struct command_record {

class command_recorder {
public:
using command_record = std::vector<command_record>;
using command_records = std::vector<command_record>;

command_recorder(const task_manager* task_mngr, const buffer_manager* buff_mngr = nullptr) : m_task_mngr(task_mngr), m_buff_mngr(buff_mngr) {}

void record_command(const abstract_command& com);

const command_record& get_commands() const { return m_recorded_commands; }
const command_records& get_commands() const { return m_recorded_commands; }

private:
command_record m_recorded_commands;
command_records m_recorded_commands;
const task_manager* m_task_mngr;
const buffer_manager* m_buff_mngr;
};
Expand Down
4 changes: 2 additions & 2 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "command.h"
#include "config.h"
#include "device_queue.h"
#include "divergence_block_chain.h"
#include "divergence_checker.h"
#include "frame.h"
#include "host_queue.h"
#include "recorders.h"
Expand Down Expand Up @@ -102,7 +102,7 @@ namespace detail {
size_t m_num_nodes;
node_id m_local_nid;

std::unique_ptr<divergence_checker_detail::divergence_checker> m_divergence_check;
std::unique_ptr<divergence_checker> m_divergence_check;

// These management classes are only constructed on the master node.
std::unique_ptr<command_graph> m_cdag;
Expand Down
5 changes: 5 additions & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ namespace detail {
const auto has_dry_run_nodes = parsed_and_validated_envs.get(env_dry_run_nodes);
if(has_dry_run_nodes) { m_dry_run_nodes = *has_dry_run_nodes; }

#if CELERITY_DIVERGENCE_CHECK
// divergence checker needs recording
m_recording = true;
#else
m_recording = parsed_and_validated_envs.get_or(env_recording, false);
#endif
m_horizon_step = parsed_and_validated_envs.get(env_horizon_step);
m_horizon_max_parallelism = parsed_and_validated_envs.get(env_horizon_max_para);

Expand Down
122 changes: 61 additions & 61 deletions src/divergence_block_chain.cc → src/divergence_checker.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,46 @@
#include "divergence_block_chain.h"
#include "divergence_checker.h"

namespace celerity::detail::divergence_checker_detail {
bool divergence_block_chain::check_for_divergence() {
add_new_hashes();

const auto [min_hash_count, max_hash_count] = collect_hash_counts();

if(min_hash_count == 0) {
if(max_hash_count != 0 && m_local_nid == 0) {
check_for_deadlock();
} else if(max_hash_count == 0) {
return true;
}
return false;
}

const per_node_task_hashes task_graphs = collect_hashes(min_hash_count);

for(int j = 0; j < min_hash_count; ++j) {
const divergence_map check_map = create_divergence_map(task_graphs, j);

// If there is more than one hash for this task, we have a divergence!
if(check_map.size() > 1) { reprot_divergence(check_map, j); }
}

clear(min_hash_count);

return false;
}

/// Reports a detected divergence and aborts by throwing.
///
/// @param check_map  map handed on unchanged to the logging helpers.
/// @param task_num   index of the diverging task; node 0 adds m_tasks_checked + 1 to it
///                   before passing it to log_node_divergences().
/// @throws std::runtime_error always, after the communicator barrier has been passed.
void divergence_block_chain::reprot_divergence(const divergence_map& check_map, const int task_num) {
    // Only node 0 emits the per-node overview.
    if(m_local_nid == 0) {
        const int reported_task_id = task_num + static_cast<int>(m_tasks_checked) + 1;
        log_node_divergences(check_map, reported_task_id);
    }

    // Stagger the nodes by 100 ms per node id so their output is ordered without taking a lock.
    const auto stagger = std::chrono::milliseconds(m_local_nid * 100);
    std::this_thread::sleep_for(stagger);

    log_task_record_once(check_map, task_num);

    // Wait on the communicator barrier before raising the error.
    m_communicator->barrier();

    throw std::runtime_error("Divergence in task graph detected");
}

void divergence_block_chain::add_new_hashes() {
std::lock_guard<std::mutex> lock(m_task_records_mutex);
Expand Down Expand Up @@ -38,36 +78,36 @@ per_node_task_hashes divergence_block_chain::collect_hashes(const int min_hash_c
}


divergence_map divergence_block_chain::create_check_map(const per_node_task_hashes& task_hashes, const int task_num) const {
divergence_map divergence_block_chain::create_divergence_map(const per_node_task_hashes& task_hashes, const int task_num) const {
divergence_map check_map;
for(size_t i = 0; i < m_num_nodes; ++i) {
check_map[task_hashes(i, task_num)].push_back(i);
for(node_id nid = 0; nid < m_num_nodes; ++nid) {
check_map[task_hashes(nid, task_num)].push_back(nid);
}
return check_map;
}

void divergence_block_chain::check_for_deadlock() const {
void divergence_block_chain::check_for_deadlock() {
auto diff = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - m_last_cleared);
static auto last = std::chrono::seconds(0);

if(diff >= std::chrono::seconds(10) && diff - last >= std::chrono::seconds(5)) {
std::string warning = fmt::format("After {} seconds of waiting nodes", diff.count());
if(diff >= std::chrono::seconds(10) && diff - m_time_of_last_warning >= std::chrono::seconds(5)) {
std::string warning = fmt::format("After {} seconds of waiting, node(s)", diff.count());

for(size_t i = 0; i < m_num_nodes; ++i) {
if(m_per_node_hash_counts[i] == 0) { warning += fmt::format(" {},", i); }
std::vector<node_id> stuck_nodes;
for(node_id nid = 0; nid < m_num_nodes; ++nid) {
if(m_per_node_hash_counts[nid] == 0) stuck_nodes.push_back(nid);
}

warning += " did not move to the next task. The runtime might be stuck.";
warning += fmt::format(" {} ", fmt::join(stuck_nodes, ","));
warning += "did not move to the next task. The runtime might be stuck.";

CELERITY_WARN("{}", warning);
last = diff;
m_time_of_last_warning = diff;
}
}

void divergence_block_chain::log_node_divergences(const divergence_map& check_map, const int task_num) {
std::string error = fmt::format("Divergence detected in task graph at index {}:\n\n", task_num);
void divergence_block_chain::log_node_divergences(const divergence_map& check_map, const int task_id) {
std::string error = fmt::format("Divergence detected. Task Nr {} diverges on nodes:\n\n", task_id);
for(auto& [hash, nodes] : check_map) {
error += fmt::format("{:#x} on nodes ", hash);
error += fmt::format("Following task-hash {:#x} resulted on {} ", hash, nodes.size() > 1 ? "nodes" : "node ");
for(auto& node : nodes) {
error += fmt::format("{} ", node);
}
Expand Down Expand Up @@ -115,11 +155,6 @@ void divergence_block_chain::log_task_record(const divergence_map& check_map, co
CELERITY_ERROR("{}", task_record_output);
}

/// Returns a copy of the stored task record at index `task_num`.
///
/// The copy is made while m_task_records_mutex is held. The index is not range-checked here.
task_record divergence_block_chain::thread_save_get_task_record(const size_t task_num) {
    const std::lock_guard<std::mutex> guard(m_task_records_mutex);
    task_record snapshot = m_task_records[task_num];
    return snapshot;
}

void divergence_block_chain::log_task_record_once(const divergence_map& check_map, const int task_num) {
for(auto& [hash, nodes] : check_map) {
if(nodes[0] == m_local_nid) {
Expand All @@ -129,49 +164,14 @@ void divergence_block_chain::log_task_record_once(const divergence_map& check_ma
}
}

bool divergence_block_chain::check_for_divergence() {
add_new_hashes();

const auto [min_hash_count, max_hash_count] = collect_hash_counts();

if(min_hash_count == 0) {
if(max_hash_count != 0 && m_local_nid == 0) {
check_for_deadlock();
} else if(max_hash_count == 0) {
return true;
}
return false;
}

const per_node_task_hashes task_graphs = collect_hashes(min_hash_count);

for(int j = 0; j < min_hash_count; ++j) {
const divergence_map check_map = create_check_map(task_graphs, j);

if(check_map.size() > 1) { divergence_out(check_map, j); }
}

clear(min_hash_count);

return false;
}

/// Reports a detected divergence and aborts by throwing.
///
/// @param check_map  map handed on unchanged to the logging helpers.
/// @param task_num   index of the diverging task, passed through as-is.
/// @throws std::runtime_error always, after the communicator barrier has been passed.
void divergence_block_chain::divergence_out(const divergence_map& check_map, const int task_num) {
    // Only node 0 emits the per-node overview.
    const bool is_reporting_node = (m_local_nid == 0);
    if(is_reporting_node) { log_node_divergences(check_map, task_num); }

    // Stagger the nodes by 100 ms per node id so their output is ordered without taking a lock.
    const auto stagger = std::chrono::milliseconds(m_local_nid * 100);
    std::this_thread::sleep_for(stagger);

    log_task_record_once(check_map, task_num);

    // Wait on the communicator barrier before raising the error.
    m_communicator->barrier();

    throw std::runtime_error("Divergence in task graph detected");
}

/// Stores a copy of `task` at the end of m_task_records while holding m_task_records_mutex.
void divergence_block_chain::add_new_task(const task_record& task) {
    const std::lock_guard<std::mutex> guard(m_task_records_mutex);
    // Keep our own copy of the record so it can still be read later on.
    m_task_records.push_back(task);
}

/// Returns a copy of the stored task record at index `task_num`.
///
/// The copy is made while m_task_records_mutex is held. The index is not range-checked here.
task_record divergence_block_chain::thread_save_get_task_record(const size_t task_num) {
    const std::lock_guard<std::mutex> guard(m_task_records_mutex);
    task_record snapshot = m_task_records[task_num];
    return snapshot;
}
} // namespace celerity::detail::divergence_checker_detail
Loading

0 comments on commit 3fe7ae2

Please sign in to comment.