From d5d2e905e5c0f230ab6f6596acab036e7ff38143 Mon Sep 17 00:00:00 2001 From: Gabriel Mitterrutzner Date: Mon, 2 Oct 2023 08:37:10 +0200 Subject: [PATCH] added celerity blockchain for task divergence checking --- CMakeLists.txt | 1 + include/divergence_block_chain.h | 179 ++++++++++++++++++++++ include/grid.h | 22 +++ include/print_utils.h | 2 +- include/ranges.h | 23 +++ include/recorders.h | 82 ++++++++++ include/runtime.h | 3 + include/task.h | 29 ++++ include/utils.h | 23 ++- src/divergence_block_chain.cc | 250 +++++++++++++++++++++++++++++++ src/runtime.cc | 18 +++ test/CMakeLists.txt | 1 + test/divergence_check_tests.cc | 199 ++++++++++++++++++++++++ test/system/distr_tests.cc | 79 ++++++++++ test/test_utils.h | 10 ++ 15 files changed, 916 insertions(+), 5 deletions(-) create mode 100644 include/divergence_block_chain.h create mode 100644 src/divergence_block_chain.cc create mode 100644 test/divergence_check_tests.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index a7335e206..4fd0f75c5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -187,6 +187,7 @@ set(SOURCES src/command_graph.cc src/config.cc src/device_queue.cc + src/divergence_block_chain.cc src/executor.cc src/distributed_graph_generator.cc src/graph_serializer.cc diff --git a/include/divergence_block_chain.h b/include/divergence_block_chain.h new file mode 100644 index 000000000..349721f58 --- /dev/null +++ b/include/divergence_block_chain.h @@ -0,0 +1,179 @@ +#pragma once + +#include "recorders.h" +#include +#include +#include + +namespace celerity::detail { +// in c++23 replace this with mdspan +template +struct mpi_2d_send_wrapper { + public: + const T& operator[](std::pair ij) const { + assert(ij.first * m_width + ij.second < m_data.size()); + return m_data[ij.first * m_width + ij.second]; + } + + T* data() { return m_data.data(); } + + mpi_2d_send_wrapper(size_t width, size_t height) : m_data(width * height), m_width(width){}; + + private: + std::vector m_data; + const size_t m_width; +}; + +// Probably replace this in c++20 with span +template +struct window { + public: + window(const std::vector& value) : m_value(value) {} + + const T& operator[](size_t i) const { + assert(i >= 0 && i < m_width); + return m_value[m_offset + i]; + } + + size_t size() { + m_width = m_value.size() - m_offset; + return m_width; + } + + void slide(size_t i) { + assert(i == 0 || (i >= 0 && i <= m_width)); + m_offset += i; + m_width -= i; + } + + private: + const std::vector& m_value; + size_t m_offset = 0; + size_t m_width = 0; +}; + +using task_hash = size_t; +using task_hash_data = mpi_2d_send_wrapper; +using divergence_map = std::unordered_map>; + +class abstract_block_chain { + friend struct abstract_block_chain_testspy; + + public: + virtual void stop() { m_is_running = false; }; + + abstract_block_chain(const abstract_block_chain&) = delete; + abstract_block_chain& operator=(const abstract_block_chain&) = delete; + abstract_block_chain& operator=(abstract_block_chain&&) = delete; + + abstract_block_chain(abstract_block_chain&&) = default; + + abstract_block_chain(size_t num_nodes, node_id local_nid, const std::vector& task_recorder, MPI_Comm comm) + : m_local_nid(local_nid), m_num_nodes(num_nodes), m_sizes(num_nodes), m_task_recorder_window(task_recorder), m_comm(comm) {} + + virtual ~abstract_block_chain() = default; + + protected: + void start() { m_is_running = true; }; + + virtual void run() = 0; + + virtual void divergence_out(const divergence_map& check_map, const int task_num) = 0; + + void add_new_hashes(); + void clear(const int min_progress); + virtual void allgather_sizes(); + virtual void allgather_hashes(const int max_size, task_hash_data& data); + std::pair collect_sizes(); + task_hash_data collect_hashes(const int max_size); + divergence_map create_check_map(const task_hash_data& task_graphs, const int task_num) const; + + void check_for_deadlock() const; + + static void print_node_divergences(const divergence_map& check_map, const int task_num); + + static void print_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash); + + virtual void dedub_print_task_record(const divergence_map& check_map, const int task_num) const; + + bool check_for_divergence(); + + protected: + node_id m_local_nid; + size_t m_num_nodes; + + std::vector m_hashes; + std::vector m_sizes; + + bool m_is_running = true; + + window m_task_recorder_window; + + std::chrono::time_point m_last_cleared = std::chrono::steady_clock::now(); + + MPI_Comm m_comm; +}; + +class single_node_test_divergence_block_chain : public abstract_block_chain { + public: + single_node_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector& task_recorder, MPI_Comm comm, + const std::vector>>& other_task_records) + : abstract_block_chain(num_nodes, local_nid, task_recorder, comm), m_other_hashes(other_task_records.size()) { + for(auto& tsk_rcd : other_task_records) { + m_other_task_records.push_back(window(tsk_rcd)); + } + } + + private: + void run() override {} + + void divergence_out(const divergence_map& check_map, const int task_num) override; + void allgather_sizes() override; + void allgather_hashes(const int max_size, task_hash_data& data) override; + + void dedub_print_task_record(const divergence_map& check_map, const int task_num) const override; + + std::vector> m_other_hashes; + std::vector> m_other_task_records; + + int m_injected_delete_size = 0; +}; + +class distributed_test_divergence_block_chain : public abstract_block_chain { + public: + distributed_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector& task_record, MPI_Comm comm) + : abstract_block_chain(num_nodes, local_nid, task_record, comm) {} + + private: + void run() override {} + + void divergence_out(const divergence_map& check_map, const int task_num) override; +}; + +class divergence_block_chain : public abstract_block_chain { + public: + void start(); + void stop() override; + + divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector& task_record, MPI_Comm comm) + : abstract_block_chain(num_nodes, local_nid, task_record, comm) { + divergence_block_chain::start(); + } + + divergence_block_chain(const divergence_block_chain&) = delete; + divergence_block_chain& operator=(const divergence_block_chain&) = delete; + divergence_block_chain& operator=(divergence_block_chain&&) = delete; + + divergence_block_chain(divergence_block_chain&&) = default; + + ~divergence_block_chain() override { divergence_block_chain::stop(); } + + private: + void run() override; + + void divergence_out(const divergence_map& check_map, const int task_num) override; + + private: + std::thread m_thread; +}; +} // namespace celerity::detail \ No newline at end of file diff --git a/include/grid.h b/include/grid.h index df3a3a8f8..9eb2ca29a 100644 --- a/include/grid.h +++ b/include/grid.h @@ -8,6 +8,7 @@ #include #include "ranges.h" +#include "utils.h" #include "workaround.h" namespace celerity::detail { @@ -257,6 +258,27 @@ class region { } // namespace celerity::detail +template +struct std::hash> { + std::size_t operator()(const celerity::detail::box r) { + std::size_t seed = 0; + celerity::detail::utils::hash_combine(seed, std::hash>{}(r.get_min()), std::hash>{}(r.get_max())); + return seed; + }; +}; + +template +struct std::hash> { + std::size_t operator()(const celerity::detail::region r) { + std::size_t seed = 0; + for(auto box : r.get_boxes()) { + celerity::detail::utils::hash_combine(seed, std::hash>{}(box)); + } + return seed; + }; +}; + + namespace celerity::detail::grid_detail { // forward-declaration for tests (explicitly instantiated) diff --git a/include/print_utils.h b/include/print_utils.h index 97d6ce653..bb67db8de 100644 --- a/include/print_utils.h +++ b/include/print_utils.h @@ -70,4 +70,4 @@ struct fmt::formatter> : fmt::formatter>::format(celerity::id(chunk.global_size), ctx); // cast to id to avoid multiple inheritance return out; } -}; +}; \ No newline at end of file diff --git a/include/ranges.h b/include/ranges.h index 110676933..5a62a7c74 100644 --- a/include/ranges.h +++ b/include/ranges.h @@ -1,6 +1,7 @@ #pragma once #include "sycl_wrappers.h" +#include "utils.h" #include "workaround.h" namespace celerity { @@ -229,6 +230,17 @@ struct ones_t { }; // namespace celerity::detail +template +struct std::hash> { + std::size_t operator()(const celerity::detail::coordinate& r) const noexcept { + std::size_t seed = 0; + for(int i = 0; i < Dims; ++i) { + celerity::detail::utils::hash_combine(seed, std::hash{}(r[i])); + } + return seed; + }; +}; + namespace celerity { template @@ -401,6 +413,17 @@ nd_range(range<3> global_range, range<3> local_range)->nd_range<3>; } // namespace celerity + +template +struct std::hash> { + std::size_t operator()(const celerity::range& r) const noexcept { return std::hash, Dims>>{}(r); }; +}; + +template +struct std::hash> { + std::size_t operator()(const celerity::id& r) const noexcept { return std::hash, Dims>>{}(r); }; +}; + namespace celerity { namespace detail { diff --git a/include/recorders.h b/include/recorders.h index caf45b8c6..28e4e43d7 100644 --- a/include/recorders.h +++ b/include/recorders.h @@ -114,3 +114,85 @@ class command_recorder { }; } // namespace celerity::detail + +template <> +struct std::hash { + std::size_t operator()(const celerity::detail::reduction_record& r) const noexcept { + std::size_t seed = 0; + celerity::detail::utils::hash_combine(seed, std::hash{}(r.rid), std::hash{}(r.bid), + std::hash{}(r.buffer_name), std::hash{}(r.init_from_buffer)); + return seed; + }; +}; + +template <> +struct std::hash { + std::size_t operator()(const celerity::detail::access_record& r) { + std::size_t seed = 0; + celerity::detail::utils::hash_combine(seed, std::hash{}(r.bid), std::hash{}(r.buffer_name), + std::hash{}(r.mode), std::hash>{}(r.req)); + return seed; + }; +}; + +template +struct std::hash> { + std::size_t operator()(const celerity::detail::dependency_record& r) const noexcept { + std::size_t seed = 0; + celerity::detail::utils::hash_combine(seed, std::hash{}(r.node), std::hash{}(r.kind), + std::hash{}(r.origin)); + return seed; + }; +}; + +template <> +struct std::hash { + std::size_t operator()(const celerity::detail::side_effect_map& m) const noexcept { + std::size_t seed = 0; + for(auto& [hoid, order] : m) { + celerity::detail::utils::hash_combine( + seed, std::hash{}(hoid), std::hash{}(order)); + } + return seed; + }; +}; + +template <> +struct std::hash { + std::size_t operator()(const celerity::detail::task_record& t) const noexcept { + std::size_t seed = 0; + celerity::detail::utils::hash_combine(seed, std::hash{}(t.tid), std::hash{}(t.debug_name), + std::hash{}(t.cgid), std::hash{}(t.type), + std::hash{}(t.geometry), celerity::detail::utils::vector_hash{}(t.reductions), + celerity::detail::utils::vector_hash{}(t.accesses), std::hash{}(t.side_effect_map), + celerity::detail::utils::vector_hash{}(t.dependencies)); + + return seed; + }; +}; + +template <> +struct fmt::formatter : fmt::formatter { + static format_context::iterator format(const celerity::detail::dependency_kind& dk, format_context& ctx) { + auto out = ctx.out(); + switch(dk) { + case celerity::detail::dependency_kind::anti_dep: out = std::copy_n("anti-dep", 8, out); break; + case celerity::detail::dependency_kind::true_dep: out = std::copy_n("true-dep", 8, out); break; + } + return out; + } +}; + +template <> +struct fmt::formatter : fmt::formatter { + static format_context::iterator format(const celerity::detail::dependency_origin& dk, format_context& ctx) { + auto out = ctx.out(); + switch(dk) { + case celerity::detail::dependency_origin::dataflow: out = std::copy_n("dataflow", 8, out); break; + case celerity::detail::dependency_origin::collective_group_serialization: out = std::copy_n("collective-group-serialization", 31, out); break; + case celerity::detail::dependency_origin::execution_front: out = std::copy_n("execution-front", 15, out); break; + case celerity::detail::dependency_origin::last_epoch: out = std::copy_n("last-epoch", 10, out); break; + } + return out; + } +}; \ No newline at end of file diff --git a/include/runtime.h b/include/runtime.h index fb2672619..2b2ea213d 100644 --- a/include/runtime.h +++ b/include/runtime.h @@ -7,6 +7,7 @@ #include "command.h" #include "config.h" #include "device_queue.h" +#include "divergence_block_chain.h" #include "frame.h" #include "host_queue.h" #include "recorders.h" @@ -101,6 +102,8 @@ namespace detail { size_t m_num_nodes; node_id m_local_nid; + std::unique_ptr m_divergence_check; + // These management classes are only constructed on the master node. std::unique_ptr m_cdag; std::unique_ptr m_schdlr; diff --git a/include/task.h b/include/task.h index e4d09e529..7744f6b65 100644 --- a/include/task.h +++ b/include/task.h @@ -13,6 +13,7 @@ #include "lifetime_extending_state.h" #include "range_mapper.h" #include "types.h" +#include "utils.h" namespace celerity { @@ -273,3 +274,31 @@ namespace detail { } // namespace detail } // namespace celerity + +template <> +struct std::hash { + std::size_t operator()(const celerity::detail::task_geometry& g) const noexcept { + std::size_t seed = 0; + celerity::detail::utils::hash_combine(seed, std::hash{}(g.dimensions), std::hash>{}(g.global_size), + std::hash>{}(g.global_offset), std::hash>{}(g.granularity)); + return seed; + }; +}; + +template <> +struct fmt::formatter : fmt::formatter { + static format_context::iterator format(const celerity::detail::task_type& tt, format_context& ctx) { + auto out = ctx.out(); + switch(tt) { + case celerity::detail::task_type::epoch: out = std::copy_n("epoch", 5, out); break; + case celerity::detail::task_type::host_compute: out = std::copy_n("host-compute", 12, out); break; + case celerity::detail::task_type::device_compute: out = std::copy_n("device-compute", 14, out); break; + case celerity::detail::task_type::collective: out = std::copy_n("collective", 10, out); break; + case celerity::detail::task_type::master_node: out = std::copy_n("master-node", 11, out); break; + case celerity::detail::task_type::horizon: out = std::copy_n("horizon", 7, out); break; + case celerity::detail::task_type::fence: out = std::copy_n("fence", 5, out); break; + default: out = std::copy_n("unknown", 7, out); break; + } + return out; + } +}; diff --git a/include/utils.h b/include/utils.h index e46f980fe..96c7ec094 100644 --- a/include/utils.h +++ b/include/utils.h @@ -41,15 +41,30 @@ decltype(auto) match(Variant&& v, Arms&&... arms) { return std::visit(overload{std::forward(arms)...}, std::forward(v)); } -// Implementation from Boost.ContainerHash, licensed under the Boost Software License, Version 1.0. -inline void hash_combine(std::size_t& seed, std::size_t value) { seed ^= value + 0x9e3779b9 + (seed << 6) + (seed >> 2); } +// A parameter pack extension to the implementation from Boost.ContainerHash, licensed under the Boost Software License, Version 1.0. +template +inline void hash_combine(std::size_t& seed, const T& v, const Rest&... rest) { + seed ^= std::hash{}(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + (hash_combine(seed, rest), ...); +} struct pair_hash { template std::size_t operator()(const std::pair& p) const { std::size_t seed = 0; - hash_combine(seed, std::hash{}(p.first)); - hash_combine(seed, std::hash{}(p.second)); + hash_combine(seed, std::hash{}(p.first), std::hash{}(p.second)); + return seed; + } +}; + + +struct vector_hash { + template + std::size_t operator()(const std::vector& v) const { + std::size_t seed = 0; + for(auto& e : v) { + hash_combine(seed, std::hash{}(e)); + } return seed; } }; diff --git a/src/divergence_block_chain.cc b/src/divergence_block_chain.cc new file mode 100644 index 000000000..e934cae83 --- /dev/null +++ b/src/divergence_block_chain.cc @@ -0,0 +1,250 @@ +#include "divergence_block_chain.h" + +namespace celerity::detail { +void abstract_block_chain::add_new_hashes() { + for(size_t i = 0; i < m_task_recorder_window.size(); ++i) { + std::size_t seed = m_hashes.empty() ? 0 : m_hashes.back(); + celerity::detail::utils::hash_combine(seed, std::hash{}(m_task_recorder_window[i])); + m_hashes.push_back(seed); + } +} + +void abstract_block_chain::clear(const int min_progress) { + m_hashes.erase(m_hashes.begin(), m_hashes.begin() + min_progress); + m_last_cleared = std::chrono::steady_clock::now(); +} + +void abstract_block_chain::allgather_sizes() { MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, m_sizes.data(), 1, MPI_INT, m_comm); } + +std::pair abstract_block_chain::collect_sizes() { + m_sizes[m_local_nid] = static_cast(m_hashes.size()); + + allgather_sizes(); + + const auto [min, max] = std::minmax_element(m_sizes.cbegin(), m_sizes.cend()); + + return {*min, *max}; +} + +void abstract_block_chain::allgather_hashes(const int max_size, task_hash_data& data) { + MPI_Allgather(m_hashes.data(), max_size, MPI_UNSIGNED_LONG, data.data(), max_size, MPI_UNSIGNED_LONG, m_comm); +} + +task_hash_data abstract_block_chain::collect_hashes(const int max_size) { + if(m_hashes.size() < static_cast(max_size)) { m_hashes.resize(max_size); } + + task_hash_data data(max_size, m_num_nodes); + + allgather_hashes(max_size, data); + return data; +} + + +divergence_map abstract_block_chain::create_check_map(const task_hash_data& task_graphs, const int task_num) const { + divergence_map check_map; + for(int i = 0; i < static_cast(m_num_nodes); ++i) { + check_map[task_graphs[{i, task_num}]].push_back(i); + } + return check_map; +} + +void abstract_block_chain::check_for_deadlock() const { + auto diff = std::chrono::duration_cast(std::chrono::steady_clock::now() - m_last_cleared); + static auto last = std::chrono::seconds(0); + + if(diff >= std::chrono::seconds(10) && diff - last >= std::chrono::seconds(5)) { + std::string warning = fmt::format("After {} seconds of waiting nodes", diff.count()); + + for(size_t i = 0; i < m_num_nodes; ++i) { + if(m_sizes[i] == 0) { warning += fmt::format(" {},", i); } + } + + warning += " did not move to the next task. The runtime might be stuck."; + + CELERITY_WARN("{}", warning); + last = diff; + } +} + +void abstract_block_chain::print_node_divergences(const divergence_map& check_map, const int task_num) { + std::string error = fmt::format("Divergence detected in task graph at index {}:\n\n", task_num); + for(auto& [hash, nodes] : check_map) { + error += fmt::format("{:#x} on nodes ", hash); + for(auto& node : nodes) { + error += fmt::format("{} ", node); + } + error += "\n"; + } + CELERITY_ERROR("{}", error); +} + +void abstract_block_chain::print_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash) { + std::string task_record_output = fmt::format("Task record for hash {:#x}:\n\n", hash); + task_record_output += fmt::format("id: {}, debug_name: {}, type: {}, cgid: {}\n", task.tid, task.debug_name, task.type, task.cgid); + const auto& geometry = task.geometry; + task_record_output += fmt::format("geometry:\n"); + task_record_output += fmt::format("\t dimensions: {}, global_size: {}, global_offset: {}, granularity: {}\n", geometry.dimensions, geometry.global_size, + geometry.global_offset, geometry.granularity); + + if(!task.reductions.empty()) { + task_record_output += fmt::format("reductions: \n"); + for(const auto& red : task.reductions) { + task_record_output += fmt::format( + "\t id: {}, bid: {}, buffer_name: {}, init_from_buffer: {}\n", red.rid, red.bid, red.buffer_name, red.init_from_buffer ? "true" : "false"); + } + } + + if(!task.accesses.empty()) { + task_record_output += fmt::format("accesses: \n"); + for(const auto& acc : task.accesses) { + task_record_output += fmt::format("\t bid: {}, buffer_name: {}, mode: {}, req: {}\n", acc.bid, acc.buffer_name, acc.mode, acc.req); + } + } + + if(!task.side_effect_map.empty()) { + task_record_output += fmt::format("side_effect_map: \n"); + for(const auto& [hoid, order] : task.side_effect_map) { + task_record_output += fmt::format("\t hoid: {}, order: {}\n", hoid, order); + } + } + + if(!task.dependencies.empty()) { + task_record_output += fmt::format("dependencies: \n"); + for(const auto& dep : task.dependencies) { + task_record_output += fmt::format("\t node: {}, kind: {}, origin: {}\n", dep.node, dep.kind, dep.origin); + } + } + CELERITY_ERROR("{}", task_record_output); +} + +void abstract_block_chain::dedub_print_task_record(const divergence_map& check_map, const int task_num) const { + for(auto& [hash, nodes] : check_map) { + if(nodes[0] == m_local_nid) { print_task_record(check_map, m_task_recorder_window[task_num], hash); } + } +} + +bool abstract_block_chain::check_for_divergence() { + add_new_hashes(); + + const auto [min_size, max_size] = collect_sizes(); + + if(min_size == 0) { + if(max_size != 0 && m_local_nid == 0) { + check_for_deadlock(); + } else if(max_size == 0 && !m_is_running) { + return true; + } + return false; + } + + task_hash_data task_graphs = collect_hashes(max_size); + + for(int j = 0; j < min_size; ++j) { + divergence_map check_map = create_check_map(task_graphs, j); + + if(check_map.size() > 1) { divergence_out(check_map, j); } + } + + m_task_recorder_window.slide(min_size); + clear(min_size); + + return false; +} + +void single_node_test_divergence_block_chain::divergence_out(const divergence_map& check_map, const int task_num) { + if(m_local_nid == 0) { print_node_divergences(check_map, task_num); } + + dedub_print_task_record(check_map, task_num); +} + +void single_node_test_divergence_block_chain::allgather_sizes() { + for(size_t i = 0; i < m_num_nodes - 1; ++i) { + auto& other_hashes = m_other_hashes[i]; + other_hashes.erase(other_hashes.begin(), other_hashes.begin() + m_injected_delete_size); + m_other_task_records[i].slide(m_injected_delete_size); + } + + for(size_t i = 0; i < m_num_nodes - 1; ++i) { + for(size_t j = 0; j < m_other_task_records[i].size(); ++j) { + std::size_t seed = m_other_hashes[i].empty() ? 0 : m_other_hashes[i].back(); + celerity::detail::utils::hash_combine(seed, std::hash{}(m_other_task_records[i][j])); + m_other_hashes[i].push_back(seed); + } + } + + for(size_t i = 1; i < m_num_nodes; ++i) { + m_sizes[i] = static_cast(m_other_hashes[i - 1].size()); + } + + m_injected_delete_size = *std::min_element(m_sizes.cbegin(), m_sizes.cend()); +} + +void single_node_test_divergence_block_chain::allgather_hashes(const int max_size, task_hash_data& data) { + auto data_data = data.data(); + for(size_t i = 0; i < m_num_nodes - 1; ++i) { + for(int j = 0; j < max_size; ++j) { + auto index = (i + 1) * max_size + j; + if(m_other_hashes[i].size() > static_cast(j)) { + data_data[index] = m_other_hashes[i][j]; + } else { + data_data[index] = 0; + } + } + } + + for(int j = 0; j < max_size; ++j) { + data_data[j] = m_hashes[j]; + } +} + +void single_node_test_divergence_block_chain::dedub_print_task_record(const divergence_map& check_map, const int task_num) const { + for(auto& [hash, nodes] : check_map) { + if(nodes[0] == m_local_nid) { + print_task_record(check_map, m_task_recorder_window[task_num], hash); + } else { + print_task_record(check_map, m_other_task_records[nodes[0] - 1][task_num], hash); + } + } +} + +void distributed_test_divergence_block_chain::divergence_out(const divergence_map& check_map, const int task_num) { + if(m_local_nid == 0) { print_node_divergences(check_map, task_num); } + + // sleep for local_nid * 100 ms such that we have a no lock synchronized output + std::this_thread::sleep_for(std::chrono::milliseconds(m_local_nid * 100)); + dedub_print_task_record(check_map, task_num); +} + + +void divergence_block_chain::divergence_out(const divergence_map& check_map, const int task_num) { + if(m_local_nid == 0) { print_node_divergences(check_map, task_num); } + + // sleep for local_nid * 100 ms such that we have a no lock synchronized output + std::this_thread::sleep_for(std::chrono::milliseconds(m_local_nid * 100)); + + dedub_print_task_record(check_map, task_num); + + MPI_Barrier(m_comm); + + throw std::runtime_error("Divergence in task graph detected"); +} + +void divergence_block_chain::start() { + stop(); + m_thread = std::thread(&divergence_block_chain::run, this); + m_is_running = true; +} + +void divergence_block_chain::run() { + bool is_finished = false; + while(!is_finished) { + is_finished = check_for_divergence(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } +} + +void divergence_block_chain::stop() { + m_is_running = false; + if(m_thread.joinable()) { m_thread.join(); } +} +} // namespace celerity::detail \ No newline at end of file diff --git a/src/runtime.cc b/src/runtime.cc index a73ed3b6e..8dd1f18c0 100644 --- a/src/runtime.cc +++ b/src/runtime.cc @@ -159,6 +159,16 @@ namespace detail { m_schdlr = std::make_unique(is_dry_run(), std::move(dggen), *m_exec); m_task_mngr->register_task_callback([this](const task* tsk) { m_schdlr->notify_task_created(tsk); }); + if(m_cfg->is_recording()) { + MPI_Comm comm = nullptr; + MPI_Comm_dup(MPI_COMM_WORLD, &comm); + if(m_test_mode) { + m_divergence_check = std::make_unique(m_num_nodes, m_local_nid, m_task_recorder->get_tasks(), comm); + } else { + m_divergence_check = std::make_unique(m_num_nodes, m_local_nid, m_task_recorder->get_tasks(), comm); + } + } + CELERITY_INFO("Celerity runtime version {} running on {}. PID = {}, build type = {}, {}", get_version_string(), get_sycl_version(), get_pid(), get_build_type(), get_mimalloc_string()); m_d_queue->init(*m_cfg, user_device_or_selector); @@ -219,6 +229,14 @@ namespace detail { std::this_thread::sleep_for(std::chrono::milliseconds(500)); // Avoid racing on stdout with other nodes (funneled through mpirun) CELERITY_TRACE("Command graph:\n\n{}\n", cmd_graph); } + + if(m_divergence_check != nullptr) { + // Sychronize all nodes before reseting shuch that we don't get into a deadlock + MPI_Barrier(MPI_COMM_WORLD); + m_divergence_check.reset(); + } else { + CELERITY_WARN("Divergence block chain not initialized"); + } } // Shutting down the task_manager will cause all buffers captured inside command group functions to unregister. diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0ee0566fd..fd0f88341 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,6 +46,7 @@ set(TEST_TARGETS test_utils_tests utils_tests device_selection_tests + divergence_check_tests ) add_library(test_main test_main.cc grid_test_utils.cc) diff --git a/test/divergence_check_tests.cc b/test/divergence_check_tests.cc new file mode 100644 index 000000000..b7dd5479c --- /dev/null +++ b/test/divergence_check_tests.cc @@ -0,0 +1,199 @@ +#include +#include +#include + +#include + +#include "log_test_utils.h" +#include "test_utils.h" + + +using namespace celerity; +using namespace celerity::detail; +using namespace celerity::test_utils; +using celerity::access::fixed; + +TEST_CASE("Test diverged task execution on device tasks", "[divergence]") { + using namespace cl::sycl::access; + + auto tt = test_utils::task_test_context{}; + auto tt_two = test_utils::task_test_context{}; + std::vector>> task_records{tt_two.trec.get_tasks()}; + + single_node_test_divergence_block_chain div_test{2, 0, tt.trec.get_tasks(), nullptr, task_records}; + + auto buf = tt.mbf.create_buffer(range<1>(128)); + auto buf_two = tt_two.mbf.create_buffer(range<1>(128)); + + test_utils::add_compute_task(tt.tm, [&](handler& cgh) { buf.get_access(cgh, fixed<1>{{0, 64}}); }); + test_utils::add_compute_task(tt.tm, [&](handler& cgh) { buf.get_access(cgh, fixed<1>{{0, 128}}); }); + test_utils::add_compute_task(tt_two.tm, [&](handler& cgh) { buf_two.get_access(cgh, fixed<1>{{64, 128}}); }); + + test_utils::log_capture log_capture; + + abstract_block_chain_testspy::call_check_for_divergence(div_test); + + CHECK_THAT(log_capture.get_log(), Catch::Matchers::ContainsSubstring("Divergence detected")); +} + +TEST_CASE("Test divergence free task execution on device", "[divergence]") { + using namespace cl::sycl::access; + + auto tt = test_utils::task_test_context{}; + auto tt_two = test_utils::task_test_context{}; + std::vector>> task_records{tt_two.trec.get_tasks()}; + + single_node_test_divergence_block_chain div_test{2, 0, tt.trec.get_tasks(), nullptr, task_records}; + + auto buf = tt.mbf.create_buffer(range<1>(128)); + auto buf_two = tt_two.mbf.create_buffer(range<1>(128)); + + test_utils::add_compute_task(tt.tm, [&](handler& cgh) { + // manually set the name because SYCL needs the class tag to be unique making the default name different. + celerity::debug::set_task_name(cgh, "task_a"); + buf.get_access(cgh, fixed<1>{{0, 64}}); + }); + + test_utils::add_compute_task(tt_two.tm, [&](handler& cgh) { + // manually set the name because SYCL needs the class tag to be unique making the default name different. + celerity::debug::set_task_name(cgh, "task_a"); + buf_two.get_access(cgh, fixed<1>{{0, 64}}); + }); + + test_utils::log_capture log_capture; + + abstract_block_chain_testspy::call_check_for_divergence(div_test); + + CHECK_THAT(log_capture.get_log(), !Catch::Matchers::ContainsSubstring("Divergence detected")); + + div_test.stop(); + + CHECK(abstract_block_chain_testspy::call_check_for_divergence(div_test)); +} + +TEST_CASE("Test diverged task execution on host task", "[divergence]") { + using namespace cl::sycl::access; + + auto tt = test_utils::task_test_context{}; + auto tt_two = test_utils::task_test_context{}; + std::vector>> task_records{tt_two.trec.get_tasks()}; + + single_node_test_divergence_block_chain div_test{2, 0, tt.trec.get_tasks(), nullptr, task_records}; + + auto buf = tt.mbf.create_buffer(range<1>(128)); + auto buf_two = tt_two.mbf.create_buffer(range<1>(128)); + + test_utils::add_host_task(tt.tm, on_master_node, [&](handler& cgh) { buf.get_access(cgh, fixed<1>({0, 128})); }); + test_utils::add_host_task(tt.tm, on_master_node, [&](handler& cgh) { buf.get_access(cgh, fixed<1>({64, 128})); }); + test_utils::add_host_task(tt_two.tm, on_master_node, [&](handler& cgh) { buf_two.get_access(cgh, fixed<1>({64, 128})); }); + + test_utils::log_capture log_capture; + + abstract_block_chain_testspy::call_check_for_divergence(div_test); + + CHECK_THAT(log_capture.get_log(), Catch::Matchers::ContainsSubstring("Divergence detected")); +} + +TEST_CASE("Test divergence free task execution on host task", "[divergence]") { + using namespace cl::sycl::access; + + auto tt = test_utils::task_test_context{}; + auto tt_two = test_utils::task_test_context{}; + std::vector>> task_records{tt_two.trec.get_tasks()}; + + single_node_test_divergence_block_chain div_test{2, 0, tt.trec.get_tasks(), nullptr, task_records}; + + auto buf = tt.mbf.create_buffer(range<1>(128)); + auto buf_two = tt_two.mbf.create_buffer(range<1>(128)); + + test_utils::add_host_task(tt.tm, on_master_node, [&](handler& cgh) { buf.get_access(cgh, fixed<1>({0, 128})); }); + test_utils::add_host_task(tt.tm, on_master_node, [&](handler& cgh) { buf.get_access(cgh, fixed<1>({64, 128})); }); + + test_utils::add_host_task(tt_two.tm, on_master_node, [&](handler& cgh) { buf_two.get_access(cgh, fixed<1>({0, 128})); }); + test_utils::add_host_task(tt_two.tm, on_master_node, [&](handler& cgh) { buf_two.get_access(cgh, fixed<1>({64, 128})); }); + + test_utils::log_capture log_capture; + + abstract_block_chain_testspy::call_check_for_divergence(div_test); + + CHECK_THAT(log_capture.get_log(), !Catch::Matchers::ContainsSubstring("Divergence detected")); + + div_test.stop(); + + CHECK(abstract_block_chain_testspy::call_check_for_divergence(div_test)); +} + +TEST_CASE("Test divergence warning for tasks that are stale longer than 10 seconds", "[divergence]") { + using namespace cl::sycl::access; + + auto tt = test_utils::task_test_context{}; + auto tt_two = test_utils::task_test_context{}; + std::vector>> task_records{tt_two.trec.get_tasks()}; + + single_node_test_divergence_block_chain div_test{2, 0, tt.trec.get_tasks(), nullptr, task_records}; + + auto buf = tt.mbf.create_buffer(range<1>(128)); + + test_utils::add_host_task(tt.tm, on_master_node, [&](handler& cgh) { buf.get_access(cgh, fixed<1>({0, 128})); }); + + test_utils::log_capture log_capture; + + // call two times because first time the start task has to be cleared + abstract_block_chain_testspy::call_check_for_divergence(div_test); + + abstract_block_chain_testspy::set_last_cleared(div_test, (std::chrono::steady_clock::now() - std::chrono::seconds(10))); + abstract_block_chain_testspy::call_check_for_divergence(div_test); + + CHECK_THAT(log_capture.get_log(), + Catch::Matchers::ContainsSubstring("After 10 seconds of waiting nodes 1, did not move to the next task. The runtime might be stuck.")); +} + +size_t get_hash(const std::vector& tasks, size_t start, size_t end) { + size_t seed = 0; + for(size_t i = start; i <= end; i++) { + utils::hash_combine(seed, std::hash{}(tasks[i])); + } + return seed; +} + +TEST_CASE("Test correct output of 3 different divergent tasks", "[divergence]") { + using namespace cl::sycl::access; + + auto tt = test_utils::task_test_context{}; + auto tt_two = test_utils::task_test_context{}; + auto tt_three = test_utils::task_test_context{}; + std::vector>> task_records{tt_two.trec.get_tasks(), tt_three.trec.get_tasks()}; + + single_node_test_divergence_block_chain div_test{task_records.size() + 1, 0, tt.trec.get_tasks(), nullptr, task_records}; + + auto buf = tt.mbf.create_buffer(range<1>(128)); + auto buf_two = tt_two.mbf.create_buffer(range<1>(128)); + auto buf_three = tt_three.mbf.create_buffer(range<1>(128)); + + test_utils::add_compute_task(tt.tm, [&](handler& cgh) { + celerity::debug::set_task_name(cgh, "task_a"); + buf.get_access(cgh, fixed<1>{{0, 64}}); + }); + + test_utils::add_compute_task(tt_two.tm, [&](handler& cgh) { + celerity::debug::set_task_name(cgh, "task_a"); + buf_two.get_access(cgh, fixed<1>{{64, 128}}); + }); + + test_utils::add_compute_task(tt_three.tm, [&](handler& cgh) { + celerity::debug::set_task_name(cgh, "task_a"); + buf_three.get_access(cgh, fixed<1>{{0, 128}}); + }); + + test_utils::log_capture log_capture; + + abstract_block_chain_testspy::call_check_for_divergence(div_test); + + std::string error_msg = "Divergence detected in task graph at index 1:\n\n"; + + error_msg += fmt::format("{:#x} on nodes 2 \n", get_hash(tt_three.trec.get_tasks(), 0, 1)); + error_msg += fmt::format("{:#x} on nodes 1 \n", get_hash(tt_two.trec.get_tasks(), 0, 1)); + error_msg += fmt::format("{:#x} on nodes 0 \n", get_hash(tt.trec.get_tasks(), 0, 1)); + + CHECK_THAT(log_capture.get_log(), Catch::Matchers::ContainsSubstring(error_msg)); +} \ No newline at end of file diff --git a/test/system/distr_tests.cc b/test/system/distr_tests.cc index 788ee42d8..1ce6a0948 100644 --- a/test/system/distr_tests.cc +++ b/test/system/distr_tests.cc @@ -436,5 +436,84 @@ namespace detail { } } + TEST_CASE_METHOD(test_utils::runtime_fixture, "Check divergence of different nodes", "[divergence]") { + env::scoped_test_environment tenv(recording_enabled_env_setting); + + runtime::init(nullptr, nullptr); + + test_utils::log_capture log_capture; + + size_t n = 0; + size_t rank = 0; + + { + distr_queue queue; + + n = runtime::get_instance().get_num_nodes(); + REQUIRE(n > 1); + + auto& div_chek = runtime_testspy::get_divergence_block_chain(runtime::get_instance()); + + const auto range = celerity::range<1>(10000); + celerity::buffer buff(range); + + celerity::debug::set_buffer_name(buff, "mat_a"); + + rank = celerity::detail::runtime::get_instance().get_local_nid(); + + abstract_block_chain_testspy::call_check_for_divergence(div_chek); + + // here we need a divergence which doesn't result in a deadlock, because else we would run into ether a failed test or a incompletable test... + if(rank % 2 == 0) { + queue.submit([&](celerity::handler& cgh) { + celerity::accessor dw{buff, cgh, celerity::access::one_to_one{}, celerity::write_only, celerity::no_init}; + const auto range = buff.get_range(); + cgh.parallel_for(range, [=](celerity::item<1> item) { + if(item[0] % 2 == 0) { dw[item] = 2.5; } + }); + }); + } + + abstract_block_chain_testspy::set_last_cleared(div_chek, std::chrono::steady_clock::now() - std::chrono::seconds(10)); + abstract_block_chain_testspy::call_check_for_divergence(div_chek); + + if(rank % 2 == 1) { + queue.submit([&](celerity::handler& cgh) { + celerity::accessor dw{buff, cgh, celerity::access::one_to_one{}, celerity::write_only, celerity::no_init}; + const auto range = buff.get_range(); + cgh.parallel_for(range, [=](celerity::item<1> item) { + if(item[0] % 2 == 0) { dw[item] = 0.5; } + }); + }); + } + + queue.submit([&](celerity::handler& cgh) { + celerity::accessor acc{buff, cgh, celerity::access::all{}, celerity::read_only_host_task}; + const auto range = buff.get_range(); + cgh.host_task(celerity::on_master_node, [=] { + for(size_t i = 0; i < range.get(0); ++i) { + if(acc[i] == 3) { break; } + } + }); + }); + + abstract_block_chain_testspy::call_check_for_divergence(div_chek); + } + + // create the check text + std::string check_text = "After 10 seconds of waiting nodes "; + + for(unsigned long i = 0; i < n; ++i) { + if(i % 2 == 1) { check_text += std::to_string(i) + ", "; } + } + + check_text += "did not move to the next task. The runtime might be stuck."; + + if(rank == 0) { + const auto log = log_capture.get_log(); + CHECK_THAT(log, Catch::Matchers::ContainsSubstring(check_text)); + CHECK_THAT(log, Catch::Matchers::ContainsSubstring("Task record for ")); + } + } } // namespace detail } // namespace celerity \ No newline at end of file diff --git a/test/test_utils.h b/test/test_utils.h index c2a321061..820839bd3 100644 --- a/test/test_utils.h +++ b/test/test_utils.h @@ -21,6 +21,7 @@ #include "command_graph.h" #include "device_queue.h" #include "distributed_graph_generator.h" +#include "divergence_block_chain.h" #include "graph_serializer.h" #include "print_graph.h" #include "range_mapper.h" @@ -60,6 +61,7 @@ namespace detail { static command_graph& get_cdag(runtime& rt) { return *rt.m_cdag; } static std::string print_task_graph(runtime& rt) { return detail::print_task_graph(*rt.m_task_recorder); } static std::string print_command_graph(const node_id local_nid, runtime& rt) { return detail::print_command_graph(local_nid, *rt.m_command_recorder); } + static abstract_block_chain& get_divergence_block_chain(runtime& rt) { return *rt.m_divergence_check; } }; struct task_ring_buffer_testspy { @@ -109,6 +111,14 @@ namespace detail { } return false; } + + struct abstract_block_chain_testspy { + static bool call_check_for_divergence(abstract_block_chain& div_test) { return div_test.check_for_divergence(); } + + static void set_last_cleared(abstract_block_chain& div_test, std::chrono::time_point time) { + div_test.m_last_cleared = time; + } + }; } // namespace detail namespace test_utils {