Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add celerity blockchain for task divergence checking #217

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- Introduce new experimental `for_each_item` utility to iterate over a celerity range (#199)
- Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199)
- Add new `experimental::constrain_split` API to limit how a kernel can be split (#?)
- Add automatic detection of diverging execution in debug mode (#217)
- `distr_queue::fence` and `buffer_snapshot` are now stable, subsuming the `experimental::` APIs of the same name (#225)
- Celerity now warns at runtime when a task declares reads from uninitialized buffers or writes with overlapping ranges between nodes (#224)
- Introduce new `experimental::hint` API for providing the runtime with additional information on how to execute a task (#227)
Expand Down
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ endif()

option(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "Diagnose uninitialized reads and overlapping writes" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_DIVERGENCE_CHECK "Enable divergence check" ${DEFAULT_ENABLE_DEBUG_CHECKS})

if(CELERITY_ACCESSOR_BOUNDARY_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Accessor boundary check enabled - this will impact kernel performance")
endif()

# Outside of Debug builds, point out at configure time that the divergence check comes with a performance cost.
if(CELERITY_DIVERGENCE_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Divergence checker enabled - this will impact the overall performance")
endif()

set(CELERITY_CMAKE_DIR "${PROJECT_SOURCE_DIR}/cmake")
set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH}" "${CELERITY_CMAKE_DIR}")
find_package(MPI 2.0 REQUIRED)
Expand Down Expand Up @@ -186,6 +191,7 @@ set(SOURCES
src/command_graph.cc
src/config.cc
src/device_queue.cc
src/divergence_checker.cc
src/executor.cc
src/distributed_graph_generator.cc
src/graph_serializer.cc
Expand Down Expand Up @@ -288,6 +294,7 @@ target_compile_definitions(celerity_runtime PUBLIC
CELERITY_FEATURE_UNNAMED_KERNELS=$<BOOL:${CELERITY_FEATURE_UNNAMED_KERNELS}>
CELERITY_DETAIL_HAS_NAMED_THREADS=$<BOOL:${CELERITY_DETAIL_HAS_NAMED_THREADS}>
CELERITY_ACCESSOR_BOUNDARY_CHECK=$<BOOL:${CELERITY_ACCESSOR_BOUNDARY_CHECK}>
CELERITY_DIVERGENCE_CHECK=$<BOOL:${CELERITY_DIVERGENCE_CHECK}>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also needs to be added to cmake/celerity-config.cmake.in!

CELERITY_ACCESS_PATTERN_DIAGNOSTICS=$<BOOL:${CELERITY_ACCESS_PATTERN_DIAGNOSTICS}>
)

Expand Down
4 changes: 4 additions & 0 deletions docs/pitfalls.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ if(rand() > 1337) {
celerity::buffer<float, 2> my_buffer(...);
}
```

> Diverging host execution can be detected at runtime by enabling the
> `CELERITY_DIVERGENCE_CHECK` CMake option, at the cost of some runtime
> overhead (enabled by default in debug builds).
47 changes: 47 additions & 0 deletions include/communicator.h
psalz marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include "types.h"

namespace celerity::detail {
psalz marked this conversation as resolved.
Show resolved Hide resolved

/**
 * @brief Defines an interface for a communicator that can be used to communicate between nodes.
 *
 * This interface is used to abstract away the communication between nodes. This allows us to use different communication backends during testing and
 * runtime. For example, we can use MPI for the runtime and a custom implementation for testing.
 *
 * The public, typed entry points translate element counts into byte counts and forward to the protected, byte-based `*_impl` hooks
 * that backends override.
 */
class communicator {
  public:
    communicator() = default;
    communicator(const communicator&) = delete;
    communicator(communicator&&) noexcept = default;

    communicator& operator=(const communicator&) = delete;
    communicator& operator=(communicator&&) noexcept = default;

    virtual ~communicator() = default;

    /**
     * @brief All-gather that uses a single buffer for both sending and receiving.
     *
     * @param sendrecvbuf Buffer of `S` elements that is handed to the backend as raw bytes.
     * @param sendrecvcount Number of `S` elements; it is converted to a byte count before being forwarded.
     */
    template <typename S>
    void allgather_inplace(S* sendrecvbuf, const int sendrecvcount) {
        // The byte count is computed in size_t; narrow it explicitly to the int expected by the backend hook.
        allgather_inplace_impl(reinterpret_cast<std::byte*>(sendrecvbuf), static_cast<int>(sendrecvcount * sizeof(S)));
    }

    /**
     * @brief All-gather with separate send and receive buffers.
     *
     * @param sendbuf Buffer of `S` elements to send, handed to the backend as raw bytes.
     * @param sendcount Number of `S` elements to send; converted to a byte count before being forwarded.
     * @param recvbuf Buffer of `R` elements that receives the gathered data, handed to the backend as raw bytes.
     * @param recvcount Number of `R` elements; converted to a byte count before being forwarded.
     */
    template <typename S, typename R>
    void allgather(const S* sendbuf, const int sendcount, R* recvbuf, const int recvcount) {
        // Both byte counts are computed in size_t; narrow them explicitly to the int expected by the backend hook.
        allgather_impl(reinterpret_cast<const std::byte*>(sendbuf), static_cast<int>(sendcount * sizeof(S)), reinterpret_cast<std::byte*>(recvbuf),
            static_cast<int>(recvcount * sizeof(R)));
    }

    /// Forwards to the backend's barrier implementation.
    void barrier() { barrier_impl(); }

    /// Returns the number of nodes reported by the backend.
    size_t get_num_nodes() { return num_nodes_impl(); }

    /// Returns the id of the local node as reported by the backend.
    node_id get_local_nid() { return local_nid_impl(); }

  protected:
    // Backend hooks. All counts passed to the allgather hooks are in bytes.
    virtual void allgather_inplace_impl(std::byte* sendrecvbuf, const int sendrecvcount) = 0;
    virtual void allgather_impl(const std::byte* sendbuf, const int sendcount, std::byte* recvbuf, const int recvcount) = 0;
    virtual void barrier_impl() = 0;
    virtual size_t num_nodes_impl() = 0;
    virtual node_id local_nid_impl() = 0;
};
} // namespace celerity::detail
137 changes: 137 additions & 0 deletions include/divergence_checker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#pragma once

#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

#include "communicator.h"
#include "recorders.h"

namespace celerity::detail::divergence_checker_detail {
using task_hash = size_t;
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;

/**
 * @brief Dense table holding a fixed number of task-hash slots for every node.
 *
 * All slots live in one contiguous vector (node after node), so the whole table can be handed to
 * MPI collective operations through data().
 */
struct per_node_task_hashes {
  public:
    /// Allocates `max_hash_count` value-initialized slots for each of the `num_nodes` nodes.
    per_node_task_hashes(const size_t max_hash_count, const size_t num_nodes) : m_data(max_hash_count * num_nodes), m_max_hash_count(max_hash_count) {}

    /// Bounds-checked read access to the i-th hash slot of node `nid`.
    const task_hash& operator()(const node_id nid, const size_t i) const {
        const auto flat_index = nid * m_max_hash_count + i;
        return m_data.at(flat_index);
    }

    /// Raw pointer to the contiguous storage.
    task_hash* data() { return m_data.data(); }

  private:
    std::vector<task_hash> m_data;
    size_t m_max_hash_count;
};

/**
 * @brief This class checks for divergences of tasks between nodes.
 *
 * It is responsible for collecting the task hashes from all nodes and checking for differences -> divergence.
 * When a divergence is found, the task record for the diverging task is printed and the program is terminated.
 * Additionally it will also print a warning when a deadlock is suspected.
 *
 * Only declarations are visible in this header; the member function definitions live in the corresponding source file.
 */

class divergence_block_chain {
friend struct divergence_block_chain_testspy;

public:
// Caches the local node id and the node count from the communicator, sizes m_per_node_hash_counts to one entry per node,
// takes ownership of the communicator, and registers a callback on the task recorder that forwards every task record to add_new_task().
divergence_block_chain(task_recorder& task_recorder, std::unique_ptr<communicator> comm)
psalz marked this conversation as resolved.
Show resolved Hide resolved
psalz marked this conversation as resolved.
Show resolved Hide resolved
: m_local_nid(comm->get_local_nid()), m_num_nodes(comm->get_num_nodes()), m_per_node_hash_counts(comm->get_num_nodes()),
m_communicator(std::move(comm)) {
task_recorder.add_callback([this](const task_record& task) { add_new_task(task); });
}

// Neither copyable nor movable: the callback registered in the constructor captures `this`.
divergence_block_chain(const divergence_block_chain&) = delete;
divergence_block_chain(divergence_block_chain&&) = delete;

~divergence_block_chain() = default;

divergence_block_chain& operator=(const divergence_block_chain&) = delete;
divergence_block_chain& operator=(divergence_block_chain&&) = delete;

// NOTE(review): the meaning of the return value is not visible in this header. divergence_checker::run() stores it in a variable
// named `is_finished` and keeps polling while it is false, so `true` presumably means "nothing left to check" rather than
// "divergence found" - TODO confirm against the definition and document it here.
bool check_for_divergence();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a comment on what a true / false return value means. It reads like this would return true when there was divergence, but the function actually throws in that case!


private:
// Snapshot of the communicator's local node id and node count, taken in the constructor.
node_id m_local_nid;
size_t m_num_nodes;

std::vector<task_hash> m_local_hashes;
std::vector<task_record> m_task_records;
size_t m_tasks_checked = 0;
size_t m_hashes_added = 0;
task_hash m_last_hash = 0;

// One entry per node (sized from the communicator's node count in the constructor).
std::vector<int> m_per_node_hash_counts;
// NOTE(review): presumably guards m_task_records, which is filled through the task recorder callback - confirm in the source file.
std::mutex m_task_records_mutex;

std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
std::chrono::seconds m_time_of_last_warning = std::chrono::seconds(0);

std::unique_ptr<communicator> m_communicator;

// NOTE(review): "reprot" looks like a typo for "report"; renaming requires changing the out-of-line definition as well.
void reprot_divergence(const divergence_map& check_map, const int task_num);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void reprot_divergence(const divergence_map& check_map, const int task_num);
void report_divergence(const divergence_map& check_map, const int task_num);


void add_new_hashes();
void clear(const int min_progress);
std::pair<int, int> collect_hash_counts();
per_node_task_hashes collect_hashes(const int min_hash_count) const;
divergence_map create_divergence_map(const per_node_task_hashes& task_hashes, const int task_num) const;

void check_for_deadlock();

static void log_node_divergences(const divergence_map& check_map, const int task_id);
static void log_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash);
void log_task_record_once(const divergence_map& check_map, const int task_num);

// Invoked by the task recorder callback registered in the constructor.
void add_new_task(const task_record& task);
// NOTE(review): "thread_save" is presumably meant to read "thread_safe" - confirm before renaming.
task_record thread_save_get_task_record(const size_t task_num);
};
}; // namespace celerity::detail::divergence_checker_detail

namespace celerity::detail {
class divergence_checker {
friend struct runtime_testspy;

public:
divergence_checker(task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool test_mode = false)
: m_block_chain(task_recorder, std::move(comm)) {
if(!test_mode) { start(); }
}

divergence_checker(const divergence_checker&) = delete;
divergence_checker(const divergence_checker&&) = delete;

divergence_checker& operator=(const divergence_checker&) = delete;
divergence_checker& operator=(divergence_checker&&) = delete;

~divergence_checker() { stop(); }

private:
std::thread m_thread;
bool m_is_running = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m_is_running must be protected by a mutex!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just use an atomic.

divergence_checker_detail::divergence_block_chain m_block_chain;

void start() {
m_thread = std::thread(&divergence_checker::run, this);
m_is_running = true;
Comment on lines +119 to +120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like there is a race between setting m_is_running = true here and the check for m_is_running in run(). I suggest you reverse the order to fix this.

Suggested change
m_thread = std::thread(&divergence_checker::run, this);
m_is_running = true;
m_is_running = true;
m_thread = std::thread(&divergence_checker::run, this);

}

void stop() {
m_is_running = false;
if(m_thread.joinable()) { m_thread.join(); }
}

void run() {
psalz marked this conversation as resolved.
Show resolved Hide resolved
bool is_finished = false;
while(!is_finished || m_is_running) {
is_finished = m_block_chain.check_for_divergence();

std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
};
}; // namespace celerity::detail
22 changes: 22 additions & 0 deletions include/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <gch/small_vector.hpp>

#include "ranges.h"
#include "utils.h"
#include "workaround.h"

namespace celerity::detail {
Expand Down Expand Up @@ -257,6 +258,27 @@ class region {

} // namespace celerity::detail

/// std::hash specialization for boxes: combines the hashes of the box's minimum and maximum corner.
template <int Dims>
struct std::hash<celerity::detail::box<Dims>> {
    // The call operator has to be const-qualified: the standard Hash requirements (and with them std::unordered_map /
    // std::unordered_set) invoke the hasher through a const object. The box is taken by reference to avoid a copy.
    std::size_t operator()(const celerity::detail::box<Dims>& r) const {
        std::size_t seed = 0;
        celerity::detail::utils::hash_combine(seed, std::hash<celerity::id<Dims>>{}(r.get_min()), std::hash<celerity::id<Dims>>{}(r.get_max()));
        return seed;
    }
};

/// std::hash specialization for regions: folds the hash of every box returned by get_boxes(), in iteration order, into one value.
template <int Dims>
struct std::hash<celerity::detail::region<Dims>> {
    // The call operator has to be const-qualified: the standard Hash requirements (and with them std::unordered_map /
    // std::unordered_set) invoke the hasher through a const object. The region is taken by reference so that its
    // boxes are not copied just to compute a hash.
    std::size_t operator()(const celerity::detail::region<Dims>& r) const {
        std::size_t seed = 0;
        for(const auto& box : r.get_boxes()) {
            celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::box<Dims>>{}(box));
        }
        return seed;
    }
};


namespace celerity::detail::grid_detail {

// forward-declaration for tests (explicitly instantiated)
Expand Down
39 changes: 39 additions & 0 deletions include/mpi_communicator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include <memory>
psalz marked this conversation as resolved.
Show resolved Hide resolved

#include <mpi.h>

#include "communicator.h"

namespace celerity::detail {
psalz marked this conversation as resolved.
Show resolved Hide resolved
class mpi_communicator : public communicator {
public:
mpi_communicator(MPI_Comm comm) : m_comm(comm) {}

private:
MPI_Comm m_comm;

void allgather_inplace_impl(std::byte* sendrecvbuf, const int sendrecvcount) override {
psalz marked this conversation as resolved.
Show resolved Hide resolved
MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, sendrecvbuf, sendrecvcount, MPI_BYTE, m_comm);
};

void allgather_impl(const std::byte* sendbuf, const int sendcount, std::byte* recvbuf, const int recvcount) override {
psalz marked this conversation as resolved.
Show resolved Hide resolved
psalz marked this conversation as resolved.
Show resolved Hide resolved
MPI_Allgather(sendbuf, sendcount, MPI_BYTE, recvbuf, recvcount, MPI_BYTE, m_comm);
};

void barrier_impl() override { MPI_Barrier(m_comm); }

size_t num_nodes_impl() override {
int size = -1;
MPI_Comm_size(m_comm, &size);
return static_cast<size_t>(size);
}

node_id local_nid_impl() override {
int rank = -1;
MPI_Comm_rank(m_comm, &rank);
return static_cast<node_id>(rank);
}
};
} // namespace celerity::detail
23 changes: 23 additions & 0 deletions include/ranges.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "sycl_wrappers.h"
#include "utils.h"
#include "workaround.h"

namespace celerity {
Expand Down Expand Up @@ -229,6 +230,17 @@ struct ones_t {

}; // namespace celerity::detail

/// std::hash specialization for coordinates: folds the hash of each of the Dims components, in order, into one value.
template <typename Interface, int Dims>
struct std::hash<celerity::detail::coordinate<Interface, Dims>> {
    std::size_t operator()(const celerity::detail::coordinate<Interface, Dims>& r) const noexcept {
        const std::hash<size_t> component_hasher{};
        std::size_t combined = 0;
        for(int dim = 0; dim < Dims; ++dim) {
            const std::size_t component_hash = component_hasher(r[dim]);
            celerity::detail::utils::hash_combine(combined, component_hash);
        }
        return combined;
    }
};

namespace celerity {

template <int Dims>
Expand Down Expand Up @@ -401,6 +413,17 @@ nd_range(range<3> global_range, range<3> local_range)->nd_range<3>;

} // namespace celerity


/// std::hash specialization for ranges; delegates to the hash of the underlying coordinate type.
template <int Dims>
struct std::hash<celerity::range<Dims>> {
    std::size_t operator()(const celerity::range<Dims>& r) const noexcept {
        using coordinate_type = celerity::detail::coordinate<celerity::range<Dims>, Dims>;
        return std::hash<coordinate_type>{}(r);
    }
};

/// std::hash specialization for ids; delegates to the hash of the underlying coordinate type.
template <int Dims>
struct std::hash<celerity::id<Dims>> {
    std::size_t operator()(const celerity::id<Dims>& r) const noexcept {
        using coordinate_type = celerity::detail::coordinate<celerity::id<Dims>, Dims>;
        return std::hash<coordinate_type>{}(r);
    }
};

namespace celerity {
namespace detail {

Expand Down
Loading