-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added celerity blockchain for task divergence checking
- Loading branch information
Showing
18 changed files
with
929 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
#pragma once | ||
|
||
#include "recorders.h" | ||
#include <mutex> | ||
#include <thread> | ||
#include <vector> | ||
|
||
namespace celerity::detail { | ||
// in c++23 replace this with mdspan | ||
template <typename T> | ||
struct mpi_multidim_send_wrapper { | ||
public: | ||
const T& operator[](std::pair<int, int> ij) const { | ||
assert(ij.first * m_width + ij.second < m_data.size()); | ||
return m_data[ij.first * m_width + ij.second]; | ||
} | ||
|
||
T* data() { return m_data.data(); } | ||
|
||
mpi_multidim_send_wrapper(size_t width, size_t height) : m_data(width * height), m_width(width){}; | ||
|
||
private: | ||
std::vector<T> m_data; | ||
const size_t m_width; | ||
}; | ||
|
||
// Probably replace this in c++20 with span | ||
template <typename T> | ||
struct window { | ||
public: | ||
window(const std::vector<T>& value) : m_value(value) {} | ||
|
||
const T& operator[](size_t i) const { | ||
assert(i >= 0 && i < m_width); | ||
return m_value[m_offset + i]; | ||
} | ||
|
||
size_t size() { | ||
m_width = m_value.size() - m_offset; | ||
return m_width; | ||
} | ||
|
||
void slide(size_t i) { | ||
assert(i == 0 || (i >= 0 && i <= m_width)); | ||
m_offset += i; | ||
m_width -= i; | ||
} | ||
|
||
private: | ||
const std::vector<T>& m_value; | ||
size_t m_offset = 0; | ||
size_t m_width = 0; | ||
}; | ||
|
||
using task_hash = size_t; | ||
using task_hash_data = mpi_multidim_send_wrapper<task_hash>; | ||
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>; | ||
|
||
class abstract_block_chain { | ||
friend struct abstract_block_chain_testspy; | ||
|
||
public: | ||
virtual void stop() { m_is_running = false; }; | ||
|
||
abstract_block_chain(const abstract_block_chain&) = delete; | ||
abstract_block_chain& operator=(const abstract_block_chain&) = delete; | ||
abstract_block_chain& operator=(abstract_block_chain&&) = delete; | ||
|
||
abstract_block_chain(abstract_block_chain&&) = default; | ||
|
||
abstract_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm) | ||
: m_local_nid(local_nid), m_num_nodes(num_nodes), m_sizes(num_nodes), m_task_recorder_window(task_recorder), m_comm(comm) {} | ||
|
||
virtual ~abstract_block_chain() = default; | ||
|
||
protected: | ||
void start() { m_is_running = true; }; | ||
|
||
virtual void run() = 0; | ||
|
||
virtual void divergence_out(const divergence_map& check_map, const int task_num) = 0; | ||
|
||
void add_new_hashes(); | ||
void clear(const int min_progress); | ||
virtual void allgather_sizes(); | ||
virtual void allgather_hashes(const int max_size, task_hash_data& data); | ||
std::pair<int, int> collect_sizes(); | ||
task_hash_data collect_hashes(const int max_size); | ||
divergence_map create_check_map(const task_hash_data& task_graphs, const int task_num) const; | ||
|
||
void check_for_deadlock() const; | ||
|
||
static void print_node_divergences(const divergence_map& check_map, const int task_num); | ||
|
||
static void print_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash); | ||
|
||
virtual void dedub_print_task_record(const divergence_map& check_map, const int task_num) const; | ||
|
||
bool check_for_divergence(); | ||
|
||
protected: | ||
node_id m_local_nid; | ||
size_t m_num_nodes; | ||
|
||
std::vector<task_hash> m_hashes; | ||
std::vector<int> m_sizes; | ||
|
||
bool m_is_running = true; | ||
|
||
window<task_record> m_task_recorder_window; | ||
|
||
std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now(); | ||
|
||
MPI_Comm m_comm; | ||
}; | ||
|
||
class single_node_test_divergence_block_chain : public abstract_block_chain { | ||
public: | ||
single_node_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm, | ||
const std::vector<std::reference_wrapper<const std::vector<task_record>>>& other_task_records) | ||
: abstract_block_chain(num_nodes, local_nid, task_recorder, comm), m_other_hashes(other_task_records.size()) { | ||
for(auto& tsk_rcd : other_task_records) { | ||
m_other_task_records.push_back(window<task_record>(tsk_rcd)); | ||
} | ||
} | ||
|
||
private: | ||
void run() override {} | ||
|
||
void divergence_out(const divergence_map& check_map, const int task_num) override; | ||
void allgather_sizes() override; | ||
void allgather_hashes(const int max_size, task_hash_data& data) override; | ||
|
||
void dedub_print_task_record(const divergence_map& check_map, const int task_num) const override; | ||
|
||
std::vector<std::vector<task_hash>> m_other_hashes; | ||
std::vector<window<task_record>> m_other_task_records; | ||
|
||
int m_injected_delete_size = 0; | ||
}; | ||
|
||
class distributed_test_divergence_block_chain : public abstract_block_chain { | ||
public: | ||
distributed_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm) | ||
: abstract_block_chain(num_nodes, local_nid, task_record, comm) {} | ||
|
||
private: | ||
void run() override {} | ||
|
||
void divergence_out(const divergence_map& check_map, const int task_num) override; | ||
}; | ||
|
||
class divergence_block_chain : public abstract_block_chain { | ||
public: | ||
void start(); | ||
void stop() override; | ||
|
||
divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm) | ||
: abstract_block_chain(num_nodes, local_nid, task_record, comm) { | ||
divergence_block_chain::start(); | ||
} | ||
|
||
divergence_block_chain(const divergence_block_chain&) = delete; | ||
divergence_block_chain& operator=(const divergence_block_chain&) = delete; | ||
divergence_block_chain& operator=(divergence_block_chain&&) = delete; | ||
|
||
divergence_block_chain(divergence_block_chain&&) = default; | ||
|
||
~divergence_block_chain() override { divergence_block_chain::stop(); } | ||
|
||
private: | ||
void run() override; | ||
|
||
void divergence_out(const divergence_map& check_map, const int task_num) override; | ||
|
||
private: | ||
std::thread m_thread; | ||
}; | ||
} // namespace celerity::detail |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.