Skip to content

Commit

Permalink
[no ci] add celerity blockchain for task divergence checking
Browse files Browse the repository at this point in the history
  • Loading branch information
GagaLP committed Oct 12, 2023
1 parent 0822c32 commit 4af1341
Show file tree
Hide file tree
Showing 15 changed files with 924 additions and 5 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ set(SOURCES
src/command_graph.cc
src/config.cc
src/device_queue.cc
src/divergence_block_chain.cc
src/executor.cc
src/distributed_graph_generator.cc
src/graph_serializer.cc
Expand Down
161 changes: 161 additions & 0 deletions include/divergence_block_chain.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#pragma once

#include "recorders.h"
#include <mutex>
#include <thread>
#include <vector>

namespace celerity::detail {
/**
 * @brief Row-major 2D view on top of a contiguous 1D vector.
 *
 * The buffer holds `width * height` value-initialized elements and element `{i, j}` is stored at flat index
 * `i * width + j`. Because the storage is contiguous, `data()` can be handed to MPI directly when sending the
 * task hashes to other nodes, while the calling code keeps readable 2D indexing.
 */
template <typename T>
struct mpi_2d_send_wrapper {
  public:
    /// Allocates `width * height` value-initialized elements; `width` is the row stride used by operator[].
    mpi_2d_send_wrapper(size_t width, size_t height) : m_data(width * height), m_width(width) {}

    /**
     * @brief Read-only access to the element at `{ij.first, ij.second}`.
     *
     * Bounds are verified with assert only, so there is no checking when NDEBUG is defined.
     */
    const T& operator[](std::pair<int, int> ij) const {
        // Negative indices must be rejected before the conversion to size_t: after wrap-around a pair such as
        // {-1, width} would map to flat index 0 and slip through the range check below.
        assert(ij.first >= 0 && ij.second >= 0);
        const size_t flat_index = static_cast<size_t>(ij.first) * m_width + static_cast<size_t>(ij.second);
        assert(flat_index < m_data.size());
        return m_data[flat_index];
    }

    /// Pointer to the first element of the contiguous buffer (writable, e.g. as a receive buffer).
    T* data() { return m_data.data(); }

    /// Read-only counterpart of data() for const wrappers.
    const T* data() const { return m_data.data(); }

  private:
    std::vector<T> m_data;
    const size_t m_width;
};

/**
 * @brief Sliding, read-only view onto the tail of a vector that is owned elsewhere.
 *
 * Only a reference to the vector is kept, so the vector has to outlive the window. The view starts at an offset
 * that is advanced by slide(); its width is a snapshot that is refreshed by size(). It is used to expose the
 * task records that have not been hashed yet.
 *
 * The width is 0 until size() has been called, so size() must be called before operator[] or a non-zero slide().
 */
template <typename T>
struct window {
  public:
    /// Creates a view starting at the first element of `value`; nothing is copied.
    window(const std::vector<T>& value) : m_value(value) {}

    /// Element `index` positions past the current offset; asserts that it lies inside the last width snapshot.
    const T& operator[](size_t index) const {
        assert(index < m_width);
        const size_t absolute_index = m_offset + index;
        return m_value[absolute_index];
    }

    /// Re-reads the vector's size, stores the number of elements behind the offset as the new width and returns it.
    size_t size() { return m_width = m_value.size() - m_offset; }

    /// Advances the start of the view by `count` elements, shrinking the width snapshot accordingly.
    void slide(size_t count) {
        assert(count <= m_width);
        m_width -= count;
        m_offset += count;
    }

  private:
    const std::vector<T>& m_value;
    size_t m_offset = 0;
    size_t m_width = 0;
};

// Hash value of a single task record.
using task_hash = size_t;
// Contiguous buffer of task hashes with 2D indexing (see mpi_2d_send_wrapper).
using task_hash_data = mpi_2d_send_wrapper<task_hash>;
// Maps a task hash to a list of node ids - presumably the nodes that reported this hash; confirm against create_check_map().
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;

/**
* @brief Base implementation of the task divergence check.
*
* It is responsible for collecting the task hashes from all nodes and checking them for differences (= divergence).
* When a divergence is found, the task record of the diverging task is printed and the program is terminated.
* Additionally it checks for deadlocks and prints a warning if one is detected.
*
* The class is abstract to allow a different divergence check implementation in tests.
*
* Copying and all assignments are disabled; only move construction is available.
*/
class abstract_block_chain {
friend struct abstract_block_chain_testspy;

public:
/**
* @param num_nodes Number of nodes; also used as the initial element count of m_sizes.
* @param local_nid Id of the local node.
* @param task_recorder Vector of task records that is viewed through m_task_recorder_window. Only a reference is
*                      kept (see window), so the vector must outlive this object.
* @param comm MPI communicator stored in m_comm.
*/
abstract_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm)
: m_local_nid(local_nid), m_num_nodes(num_nodes), m_sizes(num_nodes), m_task_recorder_window(task_recorder), m_comm(comm) {}

abstract_block_chain(const abstract_block_chain&) = delete;
// NOTE(review): m_task_recorder_window holds a reference member; a moved-to object refers to the same vector.
abstract_block_chain(abstract_block_chain&&) = default;

virtual ~abstract_block_chain() = default;

abstract_block_chain& operator=(const abstract_block_chain&) = delete;
abstract_block_chain& operator=(abstract_block_chain&&) = delete;

// Clears the running flag. Virtual so that derived classes can add their own shutdown steps.
virtual void stop() { m_is_running = false; };

protected:
node_id m_local_nid;
size_t m_num_nodes;

// Task hashes kept by this object - presumably the local hashes not yet compared; confirm in the .cc file.
std::vector<task_hash> m_hashes;
// One int per node (constructed with num_nodes elements) - presumably the per-node hash counts; TODO confirm.
std::vector<int> m_sizes;

// NOTE(review): plain bool. If stop() and run() execute on different threads (divergence_block_chain owns a
// std::thread), access to this flag needs synchronization - verify against the implementation.
bool m_is_running = true;

// View onto the task records passed to the constructor.
window<task_record> m_task_recorder_window;

// Initialized to the construction time - presumably reset by clear() and read by check_for_deadlock(); TODO confirm.
std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();

MPI_Comm m_comm;

// Sets the running flag. Non-virtual; divergence_block_chain declares its own start() which hides this one.
void start() { m_is_running = true; };

// Entry point of the check, supplied by the derived class.
virtual void run() = 0;

// Called with the check map and task number of a divergence - presumably to report/terminate; supplied by the derived class.
virtual void divergence_out(const divergence_map& check_map, const int task_num) = 0;

// The functions below are defined out of line; their exact behavior is not visible in this header.
void add_new_hashes();
void clear(const int min_progress);
// Virtual, presumably so that tests can replace the MPI communication - TODO confirm.
virtual void allgather_sizes();
virtual void allgather_hashes(const int max_size, task_hash_data& data);
std::pair<int, int> collect_sizes();
task_hash_data collect_hashes(const int max_size);
divergence_map create_check_map(const task_hash_data& task_graphs, const int task_num) const;

void check_for_deadlock() const;

static void print_node_divergences(const divergence_map& check_map, const int task_num);

static void print_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash);

// NOTE(review): "dedub" is presumably a typo of "dedup"; renaming would change the interface, so it is kept.
virtual void dedub_print_task_record(const divergence_map& check_map, const int task_num) const;

bool check_for_divergence();
};

/**
 * @brief Main implementation of the divergence check.
 *
 * Extends abstract_block_chain with a std::thread member and a test-mode flag. The constructor calls start() and the
 * destructor calls stop(); both are defined out of line, presumably launching and joining m_thread - confirm in the
 * .cc file.
 */
class divergence_block_chain : public abstract_block_chain {
  public:
    /**
     * @param num_nodes Number of nodes, forwarded to the base class.
     * @param local_nid Id of the local node, forwarded to the base class.
     * @param task_recorder Vector of task records, forwarded to the base class, which keeps a reference to it.
     *                      (Named like the base-class parameter so that it no longer shadows the task_record type.)
     * @param comm MPI communicator, forwarded to the base class.
     * @param test_mode Stored in m_test_mode; its effect is implemented out of line.
     */
    divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm, bool test_mode = false)
        : abstract_block_chain(num_nodes, local_nid, task_recorder, comm), m_test_mode(test_mode) {
        // Qualified call: selects this class' start(), which hides the non-virtual base-class start().
        divergence_block_chain::start();
    }

    divergence_block_chain(const divergence_block_chain&) = delete;
    // NOTE(review): if start() launches m_thread on `this`, a moved-from object would still be used by that thread -
    // verify whether move construction is actually needed.
    divergence_block_chain(divergence_block_chain&&) = default;

    divergence_block_chain& operator=(const divergence_block_chain&) = delete;
    divergence_block_chain& operator=(divergence_block_chain&&) = delete;

    // Qualified call avoids virtual dispatch from within the destructor.
    ~divergence_block_chain() override { divergence_block_chain::stop(); }

    void start();
    void stop() override;

  private:
    std::thread m_thread;
    bool m_test_mode = false;

    void run() override;
    void divergence_out(const divergence_map& check_map, const int task_num) override;
};
}; // namespace celerity::detail
22 changes: 22 additions & 0 deletions include/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <gch/small_vector.hpp>

#include "ranges.h"
#include "utils.h"
#include "workaround.h"

namespace celerity::detail {
Expand Down Expand Up @@ -257,6 +258,27 @@ class region {

} // namespace celerity::detail

/// Hashes a box by combining the hashes of its minimum and maximum corner.
template <int Dims>
struct std::hash<celerity::detail::box<Dims>> {
    // Takes the box by const reference and is const-callable, as required of hash function objects
    // and consistent with the other hashers of this code base.
    std::size_t operator()(const celerity::detail::box<Dims>& r) const noexcept {
        std::size_t seed = 0;
        celerity::detail::utils::hash_combine(seed, std::hash<celerity::id<Dims>>{}(r.get_min()), std::hash<celerity::id<Dims>>{}(r.get_max()));
        return seed;
    }
};

/// Hashes a region by combining the hashes of its boxes in the order returned by get_boxes().
template <int Dims>
struct std::hash<celerity::detail::region<Dims>> {
    // The region is taken by const reference and the boxes are iterated by reference, so hashing copies nothing.
    std::size_t operator()(const celerity::detail::region<Dims>& r) const noexcept {
        std::size_t seed = 0;
        for(const auto& box : r.get_boxes()) {
            celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::box<Dims>>{}(box));
        }
        return seed;
    }
};


namespace celerity::detail::grid_detail {

// forward-declaration for tests (explicitly instantiated)
Expand Down
23 changes: 23 additions & 0 deletions include/ranges.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "sycl_wrappers.h"
#include "utils.h"
#include "workaround.h"

namespace celerity {
Expand Down Expand Up @@ -229,6 +230,17 @@ struct ones_t {

}; // namespace celerity::detail

/// Hashes a coordinate by combining the hashes of its `Dims` components, in dimension order.
template <typename Interface, int Dims>
struct std::hash<celerity::detail::coordinate<Interface, Dims>> {
    std::size_t operator()(const celerity::detail::coordinate<Interface, Dims>& r) const noexcept {
        std::size_t seed = 0;
        for(int i = 0; i < Dims; ++i) {
            // Copy the component so that decltype names its plain value type. Hashing at that type, instead of
            // through std::hash<int>, avoids narrowing components that are wider than int.
            auto component = r[i];
            celerity::detail::utils::hash_combine(seed, std::hash<decltype(component)>{}(component));
        }
        return seed;
    }
};

namespace celerity {

template <int Dims>
Expand Down Expand Up @@ -401,6 +413,17 @@ nd_range(range<3> global_range, range<3> local_range)->nd_range<3>;

} // namespace celerity


/// Hashes a range by forwarding it to the hash of coordinate<range<Dims>, Dims>.
template <int Dims>
struct std::hash<celerity::range<Dims>> {
    std::size_t operator()(const celerity::range<Dims>& r) const noexcept {
        using coordinate_type = celerity::detail::coordinate<celerity::range<Dims>, Dims>;
        return std::hash<coordinate_type>{}(r);
    }
};

/// Hashes an id by forwarding it to the hash of coordinate<id<Dims>, Dims>.
template <int Dims>
struct std::hash<celerity::id<Dims>> {
    std::size_t operator()(const celerity::id<Dims>& r) const noexcept {
        using coordinate_type = celerity::detail::coordinate<celerity::id<Dims>, Dims>;
        return std::hash<coordinate_type>{}(r);
    }
};

namespace celerity {
namespace detail {

Expand Down
82 changes: 82 additions & 0 deletions include/recorders.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,85 @@ class command_recorder {
};

} // namespace celerity::detail

/// Hashes a reduction record from its reduction id, buffer id, buffer name and init_from_buffer flag.
template <>
struct std::hash<celerity::detail::reduction_record> {
    std::size_t operator()(const celerity::detail::reduction_record& r) const noexcept {
        // Hash every field on its own, then fold the results into the seed in field order.
        const std::size_t rid_hash = std::hash<celerity::detail::reduction_id>{}(r.rid);
        const std::size_t bid_hash = std::hash<celerity::detail::buffer_id>{}(r.bid);
        const std::size_t name_hash = std::hash<std::string>{}(r.buffer_name);
        const std::size_t init_hash = std::hash<bool>{}(r.init_from_buffer);

        std::size_t seed = 0;
        celerity::detail::utils::hash_combine(seed, rid_hash, bid_hash, name_hash, init_hash);
        return seed;
    }
};

/// Hashes an access record from its buffer id, buffer name, access mode and requested region.
template <>
struct std::hash<celerity::detail::access_record> {
    // const noexcept like the sibling record hashers, so the hasher is also usable through a const object.
    std::size_t operator()(const celerity::detail::access_record& r) const noexcept {
        std::size_t seed = 0;
        celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::buffer_id>{}(r.bid), std::hash<std::string>{}(r.buffer_name),
            std::hash<celerity::access_mode>{}(r.mode), std::hash<celerity::detail::region<3>>{}(r.req));
        return seed;
    }
};

/// Hashes a dependency record from the id of the node it refers to, the dependency kind and the dependency origin.
template <typename IdType>
struct std::hash<celerity::detail::dependency_record<IdType>> {
    std::size_t operator()(const celerity::detail::dependency_record<IdType>& r) const noexcept {
        // Hash every field on its own, then fold the results into the seed in field order.
        const std::size_t node_hash = std::hash<IdType>{}(r.node);
        const std::size_t kind_hash = std::hash<celerity::detail::dependency_kind>{}(r.kind);
        const std::size_t origin_hash = std::hash<celerity::detail::dependency_origin>{}(r.origin);

        std::size_t seed = 0;
        celerity::detail::utils::hash_combine(seed, node_hash, kind_hash, origin_hash);
        return seed;
    }
};

/**
 * Hashes a side effect map from its (host object id, side effect order) entries.
 *
 * The per-entry hashes are added up, so the result does not depend on the order in which the map is iterated.
 * NOTE(review): side_effect_map's definition is not visible here; if it is an unordered container, equal maps may
 * iterate in different orders, and an order-dependent combination could then hash equal maps differently.
 */
template <>
struct std::hash<celerity::detail::side_effect_map> {
    std::size_t operator()(const celerity::detail::side_effect_map& m) const noexcept {
        std::size_t seed = 0;
        for(const auto& [hoid, order] : m) {
            std::size_t entry_hash = 0;
            celerity::detail::utils::hash_combine(
                entry_hash, std::hash<celerity::detail::host_object_id>{}(hoid), std::hash<celerity::experimental::side_effect_order>{}(order));
            // Unsigned addition is commutative and wraps around with well-defined behavior.
            seed += entry_hash;
        }
        return seed;
    }
};

/**
 * Hashes a task record from its task id, debug name, collective group id, task type, geometry, reductions, accesses,
 * side effects and dependencies. This is the value that is compared between nodes by the divergence check.
 */
template <>
struct std::hash<celerity::detail::task_record> {
    std::size_t operator()(const celerity::detail::task_record& t) const noexcept {
        // Scalar fields
        const std::size_t tid_hash = std::hash<celerity::detail::task_id>{}(t.tid);
        const std::size_t name_hash = std::hash<std::string>{}(t.debug_name);
        const std::size_t cgid_hash = std::hash<celerity::detail::collective_group_id>{}(t.cgid);
        const std::size_t type_hash = std::hash<celerity::detail::task_type>{}(t.type);
        const std::size_t geometry_hash = std::hash<celerity::detail::task_geometry>{}(t.geometry);

        // Container fields
        const std::size_t reductions_hash = celerity::detail::utils::vector_hash{}(t.reductions);
        const std::size_t accesses_hash = celerity::detail::utils::vector_hash{}(t.accesses);
        const std::size_t side_effects_hash = std::hash<celerity::detail::side_effect_map>{}(t.side_effect_map);
        const std::size_t dependencies_hash = celerity::detail::utils::vector_hash{}(t.dependencies);

        std::size_t seed = 0;
        celerity::detail::utils::hash_combine(seed, tid_hash, name_hash, cgid_hash, type_hash, geometry_hash, reductions_hash, accesses_hash,
            side_effects_hash, dependencies_hash);
        return seed;
    }
};

/// Formats a dependency_kind as "anti-dep" or "true-dep"; any other value produces no output.
template <>
struct fmt::formatter<celerity::detail::dependency_kind> : fmt::formatter<std::string> {
    static format_context::iterator format(const celerity::detail::dependency_kind& dk, format_context& ctx) {
        using celerity::detail::dependency_kind;

        // Stays empty for values without a case below, in which case nothing is written.
        fmt::string_view label;
        switch(dk) {
        case dependency_kind::anti_dep: label = "anti-dep"; break;
        case dependency_kind::true_dep: label = "true-dep"; break;
        }
        return std::copy_n(label.data(), label.size(), ctx.out());
    }
};

/**
 * Formats a dependency_origin as "dataflow", "collective-group-serialization", "execution-front" or "last-epoch";
 * any other value produces no output.
 */
template <>
struct fmt::formatter<celerity::detail::dependency_origin> : fmt::formatter<std::string> {
    static format_context::iterator format(const celerity::detail::dependency_origin& dk, format_context& ctx) {
        using celerity::detail::dependency_origin;

        // Stays empty for values without a case below, in which case nothing is written.
        fmt::string_view label;
        switch(dk) {
        case dependency_origin::dataflow: label = "dataflow"; break;
        case dependency_origin::collective_group_serialization: label = "collective-group-serialization"; break;
        case dependency_origin::execution_front: label = "execution-front"; break;
        case dependency_origin::last_epoch: label = "last-epoch"; break;
        }
        // The number of characters is taken from the label itself. A hand-counted length is easy to get wrong:
        // copying 31 characters of the 30 character "collective-group-serialization" literal also emits its
        // terminating '\0'.
        return std::copy_n(label.data(), label.size(), ctx.out());
    }
};
3 changes: 3 additions & 0 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "command.h"
#include "config.h"
#include "device_queue.h"
#include "divergence_block_chain.h"
#include "frame.h"
#include "host_queue.h"
#include "recorders.h"
Expand Down Expand Up @@ -101,6 +102,8 @@ namespace detail {
size_t m_num_nodes;
node_id m_local_nid;

std::unique_ptr<abstract_block_chain> m_divergence_check;

// These management classes are only constructed on the master node.
std::unique_ptr<command_graph> m_cdag;
std::unique_ptr<scheduler> m_schdlr;
Expand Down
29 changes: 29 additions & 0 deletions include/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "lifetime_extending_state.h"
#include "range_mapper.h"
#include "types.h"
#include "utils.h"

namespace celerity {

Expand Down Expand Up @@ -273,3 +274,31 @@ namespace detail {

} // namespace detail
} // namespace celerity

/// Hashes a task geometry from its dimensionality, global size, global offset and granularity.
template <>
struct std::hash<celerity::detail::task_geometry> {
    std::size_t operator()(const celerity::detail::task_geometry& g) const noexcept {
        // Hash every field on its own, then fold the results into the seed in field order.
        const std::size_t dimensions_hash = std::hash<int>{}(g.dimensions);
        const std::size_t size_hash = std::hash<celerity::range<3>>{}(g.global_size);
        const std::size_t offset_hash = std::hash<celerity::id<3>>{}(g.global_offset);
        const std::size_t granularity_hash = std::hash<celerity::range<3>>{}(g.granularity);

        std::size_t seed = 0;
        celerity::detail::utils::hash_combine(seed, dimensions_hash, size_hash, offset_hash, granularity_hash);
        return seed;
    }
};

/// Formats a task_type as its lower-case, hyphenated name; values without a dedicated case are printed as "unknown".
template <>
struct fmt::formatter<celerity::detail::task_type> : fmt::formatter<std::string> {
    static format_context::iterator format(const celerity::detail::task_type& tt, format_context& ctx) {
        using celerity::detail::task_type;

        // Fallback for values that have no case below.
        fmt::string_view label = "unknown";
        switch(tt) {
        case task_type::epoch: label = "epoch"; break;
        case task_type::host_compute: label = "host-compute"; break;
        case task_type::device_compute: label = "device-compute"; break;
        case task_type::collective: label = "collective"; break;
        case task_type::master_node: label = "master-node"; break;
        case task_type::horizon: label = "horizon"; break;
        case task_type::fence: label = "fence"; break;
        default: break;
        }
        return std::copy_n(label.data(), label.size(), ctx.out());
    }
};
Loading

0 comments on commit 4af1341

Please sign in to comment.