Skip to content

Commit

Permalink
add celerity blockchain for task divergence checking
Browse files Browse the repository at this point in the history
  • Loading branch information
GagaLP committed Dec 6, 2023
1 parent 0822c32 commit 388c399
Show file tree
Hide file tree
Showing 19 changed files with 1,058 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- Introduce new experimental `for_each_item` utility to iterate over a celerity range (#199)
- Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199)
- Add new `experimental::constrain_split` API to limit how a kernel can be split (#?)
- Add divergence check blockchain for automatic detection of diverging tasks in debug mode (#217)

## Changed

Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ set(SOURCES
src/command_graph.cc
src/config.cc
src/device_queue.cc
src/divergence_block_chain.cc
src/executor.cc
src/distributed_graph_generator.cc
src/graph_serializer.cc
Expand Down
47 changes: 47 additions & 0 deletions include/communicator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include "types.h"

namespace celerity::detail {

/*
* @brief Defines an interface for a communicator that can be used to communicate between nodes.
*
* This interface is used to abstract away the communication between nodes. This allows us to use different communication backends during testing and
* runtime. For example, we can use MPI for the runtime and a custom implementation for testing.
*/
class communicator {
public:
communicator() = default;
communicator(const communicator&) = delete;
communicator(communicator&&) noexcept = default;

communicator& operator=(const communicator&) = delete;
communicator& operator=(communicator&&) noexcept = default;

virtual ~communicator() = default;

template <typename S>
void allgather_inplace(S* sendrecvbuf, const int sendrecvcount) {
allgather_inplace_impl(reinterpret_cast<std::byte*>(sendrecvbuf), sendrecvcount * sizeof(S));
}

template <typename S, typename R>
void allgather(const S* sendbuf, const int sendcount, R* recvbuf, const int recvcount) {
allgather_impl(reinterpret_cast<const std::byte*>(sendbuf), sendcount * sizeof(S), reinterpret_cast<std::byte*>(recvbuf), recvcount * sizeof(R));
}

void barrier() { barrier_impl(); }

size_t get_num_nodes() { return num_nodes_impl(); }

node_id get_local_nid() { return local_nid_impl(); }

protected:
virtual void allgather_inplace_impl(std::byte* sendrecvbuf, const int sendrecvcount) = 0;
virtual void allgather_impl(const std::byte* sendbuf, const int sendcount, std::byte* recvbuf, const int recvcount) = 0;
virtual void barrier_impl() = 0;
virtual size_t num_nodes_impl() = 0;
virtual node_id local_nid_impl() = 0;
};
} // namespace celerity::detail
137 changes: 137 additions & 0 deletions include/divergence_block_chain.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#pragma once

#include <mutex>
#include <thread>
#include <vector>

#include "communicator.h"
#include "recorders.h"

namespace celerity::detail {
struct runtime_testspy;
}

namespace celerity::detail::divergence_checker_detail {
using task_hash = size_t;
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;

/**
* @brief Stores the hashes of tasks for each node.
*
* The data is stored densely so it can easily be exchanged through MPI collective operations.
*/
struct per_node_task_hashes {
public:
per_node_task_hashes(const size_t max_hash_count, const size_t num_nodes) : m_data(max_hash_count * num_nodes), m_max_hash_count(max_hash_count){};
const task_hash& operator()(const node_id nid, const size_t i) const { return m_data.at(nid * m_max_hash_count + i); }
task_hash* data() { return m_data.data(); }

private:
std::vector<task_hash> m_data;
size_t m_max_hash_count;
};

/**
* @brief This class checks for divergences of tasks between nodes.
*
* It is responsible for collecting the task hashes from all nodes and checking for differences -> divergence.
* When a divergence is found, the task record for the diverging task is printed and the program is terminated.
* Additionally it will also print a warning when a deadlock is suspected.
*/

class divergence_block_chain {
friend struct divergence_block_chain_testspy;

public:
divergence_block_chain(task_recorder& task_recorder, std::unique_ptr<communicator> comm)
: m_local_nid(comm->get_local_nid()), m_num_nodes(comm->get_num_nodes()), m_per_node_hash_counts(comm->get_num_nodes()),
m_communicator(std::move(comm)) {
task_recorder.add_callback([this](const task_record& task) { add_new_task(task); });
}

divergence_block_chain(const divergence_block_chain&) = delete;
divergence_block_chain(divergence_block_chain&&) = delete;

~divergence_block_chain() = default;

divergence_block_chain& operator=(const divergence_block_chain&) = delete;
divergence_block_chain& operator=(divergence_block_chain&&) = delete;

bool check_for_divergence();

private:
node_id m_local_nid;
size_t m_num_nodes;

std::vector<task_hash> m_local_hashes;
std::vector<task_record> m_task_records;
size_t m_tasks_checked = 0;
size_t m_hashes_added = 0;

std::vector<int> m_per_node_hash_counts;
std::mutex m_task_records_mutex;

std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();

std::unique_ptr<communicator> m_communicator;

void divergence_out(const divergence_map& check_map, const int task_num);

void add_new_hashes();
void clear(const int min_progress);
std::pair<int, int> collect_hash_counts();
per_node_task_hashes collect_hashes(const int min_hash_count) const;
divergence_map create_check_map(const per_node_task_hashes& task_hashes, const int task_num) const;

void check_for_deadlock() const;

static void log_node_divergences(const divergence_map& check_map, const int task_num);
static void log_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash);
void log_task_record_once(const divergence_map& check_map, const int task_num);

void add_new_task(const task_record& task);
task_record thread_save_get_task_record(const size_t task_num);
};

class divergence_checker {
friend struct ::celerity::detail::runtime_testspy;

public:
divergence_checker(task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool test_mode = false)
: m_block_chain(task_recorder, std::move(comm)) {
if(!test_mode) { start(); }
}

divergence_checker(const divergence_checker&) = delete;
divergence_checker(const divergence_checker&&) = delete;

divergence_checker& operator=(const divergence_checker&) = delete;
divergence_checker& operator=(divergence_checker&&) = delete;

~divergence_checker() { stop(); }

private:
void start() {
m_thread = std::thread(&divergence_checker::run, this);
m_is_running = true;
}

void stop() {
m_is_running = false;
if(m_thread.joinable()) { m_thread.join(); }
}

void run() {
bool is_finished = false;
while(!is_finished || m_is_running) {
is_finished = m_block_chain.check_for_divergence();

std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

std::thread m_thread;
bool m_is_running = false;
divergence_block_chain m_block_chain;
};
}; // namespace celerity::detail::divergence_checker_detail
22 changes: 22 additions & 0 deletions include/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <gch/small_vector.hpp>

#include "ranges.h"
#include "utils.h"
#include "workaround.h"

namespace celerity::detail {
Expand Down Expand Up @@ -257,6 +258,27 @@ class region {

} // namespace celerity::detail

template <int Dims>
struct std::hash<celerity::detail::box<Dims>> {
std::size_t operator()(const celerity::detail::box<Dims> r) {
std::size_t seed = 0;
celerity::detail::utils::hash_combine(seed, std::hash<celerity::id<Dims>>{}(r.get_min()), std::hash<celerity::id<Dims>>{}(r.get_max()));
return seed;
};
};

template <int Dims>
struct std::hash<celerity::detail::region<Dims>> {
std::size_t operator()(const celerity::detail::region<Dims> r) {
std::size_t seed = 0;
for(auto box : r.get_boxes()) {
celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::box<Dims>>{}(box));
}
return seed;
};
};


namespace celerity::detail::grid_detail {

// forward-declaration for tests (explicitly instantiated)
Expand Down
38 changes: 38 additions & 0 deletions include/mpi_communicator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once
#include <memory>

#include <mpi.h>

#include "communicator.h"

namespace celerity::detail {
class mpi_communicator : public communicator {
public:
mpi_communicator(MPI_Comm comm) : m_comm(comm) {}

private:
void allgather_inplace_impl(std::byte* sendrecvbuf, const int sendrecvcount) override {
MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, sendrecvbuf, sendrecvcount, MPI_BYTE, m_comm);
};

void allgather_impl(const std::byte* sendbuf, const int sendcount, std::byte* recvbuf, const int recvcount) override {
MPI_Allgather(sendbuf, sendcount, MPI_BYTE, recvbuf, recvcount, MPI_BYTE, m_comm);
};

void barrier_impl() override { MPI_Barrier(m_comm); }

size_t num_nodes_impl() override {
int size = -1;
MPI_Comm_size(m_comm, &size);
return static_cast<size_t>(size);
}

node_id local_nid_impl() override {
int rank = -1;
MPI_Comm_rank(m_comm, &rank);
return static_cast<node_id>(rank);
}

MPI_Comm m_comm;
};
} // namespace celerity::detail
23 changes: 23 additions & 0 deletions include/ranges.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "sycl_wrappers.h"
#include "utils.h"
#include "workaround.h"

namespace celerity {
Expand Down Expand Up @@ -229,6 +230,17 @@ struct ones_t {

}; // namespace celerity::detail

template <typename Interface, int Dims>
struct std::hash<celerity::detail::coordinate<Interface, Dims>> {
std::size_t operator()(const celerity::detail::coordinate<Interface, Dims>& r) const noexcept {
std::size_t seed = 0;
for(int i = 0; i < Dims; ++i) {
celerity::detail::utils::hash_combine(seed, std::hash<int>{}(r[i]));
}
return seed;
};
};

namespace celerity {

template <int Dims>
Expand Down Expand Up @@ -401,6 +413,17 @@ nd_range(range<3> global_range, range<3> local_range)->nd_range<3>;

} // namespace celerity


template <int Dims>
struct std::hash<celerity::range<Dims>> {
std::size_t operator()(const celerity::range<Dims>& r) const noexcept { return std::hash<celerity::detail::coordinate<celerity::range<Dims>, Dims>>{}(r); };
};

template <int Dims>
struct std::hash<celerity::id<Dims>> {
std::size_t operator()(const celerity::id<Dims>& r) const noexcept { return std::hash<celerity::detail::coordinate<celerity::id<Dims>, Dims>>{}(r); };
};

namespace celerity {
namespace detail {

Expand Down
Loading

0 comments on commit 388c399

Please sign in to comment.