Skip to content

Commit

Permalink
added celerity blockchain for task divergence checking
Browse files Browse the repository at this point in the history
  • Loading branch information
GagaLP committed Oct 9, 2023
1 parent cde3587 commit 6c3f128
Show file tree
Hide file tree
Showing 18 changed files with 929 additions and 25 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ set(SOURCES
src/command_graph.cc
src/config.cc
src/device_queue.cc
src/divergence_block_chain.cc
src/executor.cc
src/distributed_graph_generator.cc
src/graph_serializer.cc
Expand Down
4 changes: 1 addition & 3 deletions include/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ namespace debug {
return detail::get_buffer_name(buff);
}

inline void set_task_name(celerity::handler& cgh, const std::string& debug_name) {
detail::set_task_name(cgh, debug_name);
}
inline void set_task_name(celerity::handler& cgh, const std::string& debug_name) { detail::set_task_name(cgh, debug_name); }
} // namespace debug
} // namespace celerity
179 changes: 179 additions & 0 deletions include/divergence_block_chain.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#pragma once

#include "recorders.h"
#include <mutex>
#include <thread>
#include <vector>

namespace celerity::detail {
// in c++23 replace this with mdspan
template <typename T>
struct mpi_multidim_send_wrapper {
public:
const T& operator[](std::pair<int, int> ij) const {
assert(ij.first * m_width + ij.second < m_data.size());
return m_data[ij.first * m_width + ij.second];
}

T* data() { return m_data.data(); }

mpi_multidim_send_wrapper(size_t width, size_t height) : m_data(width * height), m_width(width){};

private:
std::vector<T> m_data;
const size_t m_width;
};

// Probably replace this in c++20 with span
template <typename T>
struct window {
public:
window(const std::vector<T>& value) : m_value(value) {}

const T& operator[](size_t i) const {
assert(i >= 0 && i < m_width);
return m_value[m_offset + i];
}

size_t size() {
m_width = m_value.size() - m_offset;
return m_width;
}

void slide(size_t i) {
assert(i == 0 || (i >= 0 && i <= m_width));
m_offset += i;
m_width -= i;
}

private:
const std::vector<T>& m_value;
size_t m_offset = 0;
size_t m_width = 0;
};

using task_hash = size_t;
using task_hash_data = mpi_multidim_send_wrapper<task_hash>;
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;

class abstract_block_chain {
friend struct abstract_block_chain_testspy;

public:
virtual void stop() { m_is_running = false; };

abstract_block_chain(const abstract_block_chain&) = delete;
abstract_block_chain& operator=(const abstract_block_chain&) = delete;
abstract_block_chain& operator=(abstract_block_chain&&) = delete;

abstract_block_chain(abstract_block_chain&&) = default;

abstract_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm)
: m_local_nid(local_nid), m_num_nodes(num_nodes), m_sizes(num_nodes), m_task_recorder_window(task_recorder), m_comm(comm) {}

virtual ~abstract_block_chain() = default;

protected:
void start() { m_is_running = true; };

virtual void run() = 0;

virtual void divergence_out(const divergence_map& check_map, const int task_num) = 0;

void add_new_hashes();
void clear(const int min_progress);
virtual void allgather_sizes();
virtual void allgather_hashes(const int max_size, task_hash_data& data);
std::pair<int, int> collect_sizes();
task_hash_data collect_hashes(const int max_size);
divergence_map create_check_map(const task_hash_data& task_graphs, const int task_num) const;

void check_for_deadlock() const;

static void print_node_divergences(const divergence_map& check_map, const int task_num);

static void print_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash);

virtual void dedub_print_task_record(const divergence_map& check_map, const int task_num) const;

bool check_for_divergence();

protected:
node_id m_local_nid;
size_t m_num_nodes;

std::vector<task_hash> m_hashes;
std::vector<int> m_sizes;

bool m_is_running = true;

window<task_record> m_task_recorder_window;

std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();

MPI_Comm m_comm;
};

class single_node_test_divergence_block_chain : public abstract_block_chain {
public:
single_node_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm,
const std::vector<std::reference_wrapper<const std::vector<task_record>>>& other_task_records)
: abstract_block_chain(num_nodes, local_nid, task_recorder, comm), m_other_hashes(other_task_records.size()) {
for(auto& tsk_rcd : other_task_records) {
m_other_task_records.push_back(window<task_record>(tsk_rcd));
}
}

private:
void run() override {}

void divergence_out(const divergence_map& check_map, const int task_num) override;
void allgather_sizes() override;
void allgather_hashes(const int max_size, task_hash_data& data) override;

void dedub_print_task_record(const divergence_map& check_map, const int task_num) const override;

std::vector<std::vector<task_hash>> m_other_hashes;
std::vector<window<task_record>> m_other_task_records;

int m_injected_delete_size = 0;
};

class distributed_test_divergence_block_chain : public abstract_block_chain {
public:
distributed_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm)
: abstract_block_chain(num_nodes, local_nid, task_record, comm) {}

private:
void run() override {}

void divergence_out(const divergence_map& check_map, const int task_num) override;
};

class divergence_block_chain : public abstract_block_chain {
public:
void start();
void stop() override;

divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm)
: abstract_block_chain(num_nodes, local_nid, task_record, comm) {
divergence_block_chain::start();
}

divergence_block_chain(const divergence_block_chain&) = delete;
divergence_block_chain& operator=(const divergence_block_chain&) = delete;
divergence_block_chain& operator=(divergence_block_chain&&) = delete;

divergence_block_chain(divergence_block_chain&&) = default;

~divergence_block_chain() override { divergence_block_chain::stop(); }

private:
void run() override;

void divergence_out(const divergence_map& check_map, const int task_num) override;

private:
std::thread m_thread;
};
} // namespace celerity::detail
22 changes: 22 additions & 0 deletions include/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <gch/small_vector.hpp>

#include "ranges.h"
#include "utils.h"
#include "workaround.h"

namespace celerity::detail {
Expand Down Expand Up @@ -257,6 +258,27 @@ class region {

} // namespace celerity::detail

template <int Dims>
struct std::hash<celerity::detail::box<Dims>> {
std::size_t operator()(const celerity::detail::box<Dims> r) {
std::size_t seed = 0;
celerity::detail::utils::hash_combine(seed, std::hash<celerity::id<Dims>>{}(r.get_min()), std::hash<celerity::id<Dims>>{}(r.get_max()));
return seed;
};
};

template <int Dims>
struct std::hash<celerity::detail::region<Dims>> {
std::size_t operator()(const celerity::detail::region<Dims> r) {
std::size_t seed = 0;
for(size_t i = 0; i < r.get_boxes().size(); ++i) {
celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::box<Dims>>{}(r.get_boxes()[i]));
}
return seed;
};
};


namespace celerity::detail::grid_detail {

// forward-declaration for tests (explicitly instantiated)
Expand Down
10 changes: 3 additions & 7 deletions include/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ class handler {
friend void detail::add_reduction(handler& cgh, const detail::reduction_info& rinfo);
friend void detail::extend_lifetime(handler& cgh, std::shared_ptr<detail::lifetime_extending_state> state);

friend void detail::set_task_name(handler &cgh, const std::string& debug_name);
friend void detail::set_task_name(handler& cgh, const std::string& debug_name);

detail::task_id m_tid;
detail::buffer_access_map m_access_map;
Expand Down Expand Up @@ -462,11 +462,9 @@ class handler {
}
// Note that cgf_diagnostics has a similar check, but we don't catch void side effects there.
if(!m_side_effects.empty()) { throw std::runtime_error{"Side effects cannot be used in device kernels"}; }
m_task =
detail::task::make_device_compute(m_tid, geometry, std::move(launcher), std::move(m_access_map), std::move(m_reductions));
m_task = detail::task::make_device_compute(m_tid, geometry, std::move(launcher), std::move(m_access_map), std::move(m_reductions));

m_task->set_debug_name(m_usr_def_task_name.value_or(debug_name));

}

void create_collective_task(detail::collective_group_id cgid, std::unique_ptr<detail::command_launcher_storage_base> launcher) {
Expand Down Expand Up @@ -588,9 +586,7 @@ namespace detail {

inline void extend_lifetime(handler& cgh, std::shared_ptr<detail::lifetime_extending_state> state) { cgh.extend_lifetime(std::move(state)); }

inline void set_task_name(handler& cgh, const std::string& debug_name) {
cgh.m_usr_def_task_name = {debug_name};
}
inline void set_task_name(handler& cgh, const std::string& debug_name) { cgh.m_usr_def_task_name = {debug_name}; }

// TODO: The _impl functions in detail only exist during the grace period for deprecated reductions on const buffers; move outside again afterwards.
template <typename DataT, int Dims, typename BinaryOperation>
Expand Down
4 changes: 3 additions & 1 deletion include/print_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "grid.h"
#include "ranges.h"

#include "intrusive_graph.h"

#include <spdlog/fmt/fmt.h>

template <typename Interface, int Dims>
Expand Down Expand Up @@ -70,4 +72,4 @@ struct fmt::formatter<celerity::chunk<Dims>> : fmt::formatter<celerity::subrange
out = formatter<celerity::id<Dims>>::format(celerity::id(chunk.global_size), ctx); // cast to id to avoid multiple inheritance
return out;
}
};
};
23 changes: 23 additions & 0 deletions include/ranges.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "sycl_wrappers.h"
#include "utils.h"
#include "workaround.h"

namespace celerity {
Expand Down Expand Up @@ -229,6 +230,17 @@ struct ones_t {

}; // namespace celerity::detail

template <typename Interface, int Dims>
struct std::hash<celerity::detail::coordinate<Interface, Dims>> {
std::size_t operator()(const celerity::detail::coordinate<Interface, Dims>& r) const noexcept {
std::size_t seed = 0;
for(int i = 0; i < Dims; ++i) {
celerity::detail::utils::hash_combine(seed, std::hash<int>{}(r[i]));
}
return seed;
};
};

namespace celerity {

template <int Dims>
Expand Down Expand Up @@ -401,6 +413,17 @@ nd_range(range<3> global_range, range<3> local_range)->nd_range<3>;

} // namespace celerity


template <int Dims>
struct std::hash<celerity::range<Dims>> {
std::size_t operator()(const celerity::range<Dims>& r) const noexcept { return std::hash<celerity::detail::coordinate<celerity::range<Dims>, Dims>>{}(r); };
};

template <int Dims>
struct std::hash<celerity::id<Dims>> {
std::size_t operator()(const celerity::id<Dims>& r) const noexcept { return std::hash<celerity::detail::coordinate<celerity::id<Dims>, Dims>>{}(r); };
};

namespace celerity {
namespace detail {

Expand Down
Loading

0 comments on commit 6c3f128

Please sign in to comment.