Skip to content

Commit

Permalink
[no ci] Revision: add celerity blockchain for task divergence checking
Browse files Browse the repository at this point in the history
  • Loading branch information
GagaLP committed Dec 19, 2023
1 parent fac5661 commit 1a6e3ee
Show file tree
Hide file tree
Showing 17 changed files with 170 additions and 149 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- Introduce new experimental `for_each_item` utility to iterate over a celerity range (#199)
- Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199)
- Add new `experimental::constrain_split` API to limit how a kernel can be split (#?)
- Add divergence check blockchain for automatic detection of diverging tasks in debug mode (#217)
- Add automatic detection of diverging execution in debug mode (#217)
- `distr_queue::fence` and `buffer_snapshot` are now stable, subsuming the `experimental::` APIs of the same name (#225)
- Celerity now warns at runtime when a task declares reads from uninitialized buffers or writes with overlapping ranges between nodes (#224)
- Introduce new `experimental::hint` API for providing the runtime with additional information on how to execute a task (#227)
Expand Down
8 changes: 7 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ endif()

option(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "Diagnose uninitialized reads and overlapping writes" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_DIVERGENCE_CHECK "Enable divergence check" ${DEFAULT_ENABLE_DEBUG_CHECKS})

if(CELERITY_ACCESSOR_BOUNDARY_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Accessor boundary check enabled - this will impact kernel performance")
endif()

if(CELERITY_DIVERGENCE_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Divergence checker enabled - this will impact the overall performance")
endif()

set(CELERITY_CMAKE_DIR "${PROJECT_SOURCE_DIR}/cmake")
set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH}" "${CELERITY_CMAKE_DIR}")
find_package(MPI 2.0 REQUIRED)
Expand Down Expand Up @@ -186,7 +191,7 @@ set(SOURCES
src/command_graph.cc
src/config.cc
src/device_queue.cc
src/divergence_block_chain.cc
src/divergence_checker.cc
src/executor.cc
src/distributed_graph_generator.cc
src/graph_serializer.cc
Expand Down Expand Up @@ -289,6 +294,7 @@ target_compile_definitions(celerity_runtime PUBLIC
CELERITY_FEATURE_UNNAMED_KERNELS=$<BOOL:${CELERITY_FEATURE_UNNAMED_KERNELS}>
CELERITY_DETAIL_HAS_NAMED_THREADS=$<BOOL:${CELERITY_DETAIL_HAS_NAMED_THREADS}>
CELERITY_ACCESSOR_BOUNDARY_CHECK=$<BOOL:${CELERITY_ACCESSOR_BOUNDARY_CHECK}>
CELERITY_DIVERGENCE_CHECK=$<BOOL:${CELERITY_DIVERGENCE_CHECK}>
CELERITY_ACCESS_PATTERN_DIAGNOSTICS=$<BOOL:${CELERITY_ACCESS_PATTERN_DIAGNOSTICS}>
)

Expand Down
4 changes: 4 additions & 0 deletions docs/pitfalls.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ if(rand() > 1337) {
celerity::buffer<float, 2> my_buffer(...);
}
```

> Diverging host execution can be detected at runtime by enabling the
> `CELERITY_DIVERGENCE_CHECK` CMake option, at the cost of some runtime
> overhead (the option is enabled by default in debug builds).
28 changes: 14 additions & 14 deletions include/divergence_block_chain.h → include/divergence_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
#include "communicator.h"
#include "recorders.h"

namespace celerity::detail {
struct runtime_testspy;
}

namespace celerity::detail::divergence_checker_detail {
using task_hash = size_t;
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;
Expand Down Expand Up @@ -67,34 +63,38 @@ class divergence_block_chain {
std::vector<task_record> m_task_records;
size_t m_tasks_checked = 0;
size_t m_hashes_added = 0;
task_hash m_last_hash = 0;

std::vector<int> m_per_node_hash_counts;
std::mutex m_task_records_mutex;

std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
std::chrono::seconds m_time_of_last_warning = std::chrono::seconds(0);

std::unique_ptr<communicator> m_communicator;

void divergence_out(const divergence_map& check_map, const int task_num);
void reprot_divergence(const divergence_map& check_map, const int task_num);

void add_new_hashes();
void clear(const int min_progress);
std::pair<int, int> collect_hash_counts();
per_node_task_hashes collect_hashes(const int min_hash_count) const;
divergence_map create_check_map(const per_node_task_hashes& task_hashes, const int task_num) const;
divergence_map create_divergence_map(const per_node_task_hashes& task_hashes, const int task_num) const;

void check_for_deadlock() const;
void check_for_deadlock();

static void log_node_divergences(const divergence_map& check_map, const int task_num);
static void log_node_divergences(const divergence_map& check_map, const int task_id);
static void log_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash);
void log_task_record_once(const divergence_map& check_map, const int task_num);

void add_new_task(const task_record& task);
task_record thread_save_get_task_record(const size_t task_num);
};
}; // namespace celerity::detail::divergence_checker_detail

namespace celerity::detail {
class divergence_checker {
friend struct ::celerity::detail::runtime_testspy;
friend struct runtime_testspy;

public:
divergence_checker(task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool test_mode = false)
Expand All @@ -111,6 +111,10 @@ class divergence_checker {
// Calls stop() when the checker is destroyed.
// NOTE(review): stop() is not visible in this hunk; presumably it shuts down the thread launched by start() - confirm.
~divergence_checker() { stop(); }

private:
std::thread m_thread;
bool m_is_running = false;
divergence_checker_detail::divergence_block_chain m_block_chain;

void start() {
m_thread = std::thread(&divergence_checker::run, this);
m_is_running = true;
Expand All @@ -129,9 +133,5 @@ class divergence_checker {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

std::thread m_thread;
bool m_is_running = false;
divergence_block_chain m_block_chain;
};
}; // namespace celerity::detail::divergence_checker_detail
}; // namespace celerity::detail
2 changes: 1 addition & 1 deletion include/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ template <int Dims>
struct std::hash<celerity::detail::region<Dims>> {
std::size_t operator()(const celerity::detail::region<Dims> r) {
std::size_t seed = 0;
for(auto box : r.get_boxes()) {
for(auto& box : r.get_boxes()) {
celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::box<Dims>>{}(box));
}
return seed;
Expand Down
7 changes: 4 additions & 3 deletions include/mpi_communicator.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once

#include <memory>

#include <mpi.h>
Expand All @@ -11,6 +12,8 @@ class mpi_communicator : public communicator {
mpi_communicator(MPI_Comm comm) : m_comm(comm) {}

private:
MPI_Comm m_comm;

// In-place all-gather of raw bytes over m_comm.
// `sendrecvbuf` serves as both input and output, and `sendrecvcount` is the per-rank byte count (MPI_BYTE).
// Because MPI_IN_PLACE is passed as the send buffer, the send count and send type (0, MPI_DATATYPE_NULL) are placeholders.
// NOTE(review): per MPI_Allgather semantics the buffer must hold sendrecvcount bytes for every rank - confirm callers size it so.
void allgather_inplace_impl(std::byte* sendrecvbuf, const int sendrecvcount) override {
MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, sendrecvbuf, sendrecvcount, MPI_BYTE, m_comm);
};
Expand All @@ -32,7 +35,5 @@ class mpi_communicator : public communicator {
MPI_Comm_rank(m_comm, &rank);
return static_cast<node_id>(rank);
}

MPI_Comm m_comm;
};
} // namespace celerity::detail
} // namespace celerity::detail
2 changes: 1 addition & 1 deletion include/ranges.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ struct std::hash<celerity::detail::coordinate<Interface, Dims>> {
std::size_t operator()(const celerity::detail::coordinate<Interface, Dims>& r) const noexcept {
std::size_t seed = 0;
for(int i = 0; i < Dims; ++i) {
celerity::detail::utils::hash_combine(seed, std::hash<int>{}(r[i]));
celerity::detail::utils::hash_combine(seed, std::hash<size_t>{}(r[i]));
}
return seed;
};
Expand Down
9 changes: 5 additions & 4 deletions include/recorders.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ class task_recorder {
void record_task(const task& tsk);

void add_callback(task_callback callback);
void invoke_callbacks(const task_record& tsk) const;

// Returns a read-only reference to the recorder's internal task list (m_recorded_tasks); no copy is made.
const task_records& get_tasks() const { return m_recorded_tasks; }

private:
task_records m_recorded_tasks;
std::vector<task_callback> m_callbacks{};
const buffer_manager* m_buff_mngr;

void invoke_callbacks(const task_record& tsk) const;
};

// Command recording
Expand Down Expand Up @@ -104,16 +105,16 @@ struct command_record {

class command_recorder {
public:
using command_record = std::vector<command_record>;
using command_records = std::vector<command_record>;

// Stores the given task manager pointer and the optional buffer manager pointer (defaults to nullptr).
// Both are kept as raw pointers to const; nothing is copied here.
command_recorder(const task_manager* task_mngr, const buffer_manager* buff_mngr = nullptr) : m_task_mngr(task_mngr), m_buff_mngr(buff_mngr) {}

void record_command(const abstract_command& com);

const command_record& get_commands() const { return m_recorded_commands; }
const command_records& get_commands() const { return m_recorded_commands; }

private:
command_record m_recorded_commands;
command_records m_recorded_commands;
const task_manager* m_task_mngr;
const buffer_manager* m_buff_mngr;
};
Expand Down
4 changes: 2 additions & 2 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "command.h"
#include "config.h"
#include "device_queue.h"
#include "divergence_block_chain.h"
#include "divergence_checker.h"
#include "frame.h"
#include "host_queue.h"
#include "recorders.h"
Expand Down Expand Up @@ -102,7 +102,7 @@ namespace detail {
size_t m_num_nodes;
node_id m_local_nid;

std::unique_ptr<divergence_checker_detail::divergence_checker> m_divergence_check;
std::unique_ptr<divergence_checker> m_divergence_check;

// These management classes are only constructed on the master node.
std::unique_ptr<command_graph> m_cdag;
Expand Down
5 changes: 5 additions & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ namespace detail {
const auto has_dry_run_nodes = parsed_and_validated_envs.get(env_dry_run_nodes);
if(has_dry_run_nodes) { m_dry_run_nodes = *has_dry_run_nodes; }

#if CELERITY_DIVERGENCE_CHECK
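// The divergence checker consumes recorded tasks, so recording is always enabled here and the recording environment variable is not consulted.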
m_recording = true;
#else
m_recording = parsed_and_validated_envs.get_or(env_recording, false);
#endif
m_horizon_step = parsed_and_validated_envs.get(env_horizon_step);
m_horizon_max_parallelism = parsed_and_validated_envs.get(env_horizon_max_para);

Expand Down
Loading

0 comments on commit 1a6e3ee

Please sign in to comment.