From 3bff2340abc1e4a49a63c2ba2e05ba86a3c391fc Mon Sep 17 00:00:00 2001
From: Fabian Knorr
Date: Mon, 28 Oct 2024 13:22:12 +0100
Subject: [PATCH] [RM] Durante feedback

---
 include/graph.h        | 10 ++++++----
 include/task_manager.h | 14 ++++----------
 src/scheduler.cc       |  2 +-
 src/task_manager.cc    | 11 ++++++-----
 4 files changed, 17 insertions(+), 20 deletions(-)

diff --git a/include/graph.h b/include/graph.h
index 3dfab999a..05050ab4b 100644
--- a/include/graph.h
+++ b/include/graph.h
@@ -2,12 +2,14 @@
 
 #include "types.h"
 
+#include <algorithm>
+#include <deque>
 #include <memory>
 #include <vector>
 
 namespace celerity::detail {
 
-/// A `graph` keeps ownership of all graph nodes that have not been pruned by epoch or horizon application.
+/// An `epoch_partitioned_graph` keeps ownership of all graph nodes that have not been pruned by epoch or horizon application.
 template <typename Node>
 class epoch_partitioned_graph {
    friend struct graph_testspy;
@@ -28,8 +30,8 @@ class epoch_partitioned_graph {
    }
 
    // Free all graph nodes that were pushed before begin_epoch(tid) was called.
-   void prune_before_epoch(const task_id tid) {
-       const auto first_retained = std::partition_point(m_epochs.begin(), m_epochs.end(), [=](const graph_epoch& epoch) { return epoch.epoch_tid < tid; });
+   void delete_before_epoch(const task_id tid) {
+       const auto first_retained = std::find_if(m_epochs.begin(), m_epochs.end(), [=](const graph_epoch& epoch) { return epoch.epoch_tid >= tid; });
        assert(first_retained != m_epochs.end() && first_retained->epoch_tid == tid);
        m_epochs.erase(m_epochs.begin(), first_retained);
    }
@@ -40,7 +42,7 @@ class epoch_partitioned_graph {
        std::vector<std::unique_ptr<Node>> nodes; // graph node pointers are stable, so it is safe to hand them to another thread
    };
 
-   std::vector<graph_epoch> m_epochs;
+   std::deque<graph_epoch> m_epochs;
 };
 
 } // namespace celerity::detail
diff --git a/include/task_manager.h b/include/task_manager.h
index fef255c9c..6145d1cf2 100644
--- a/include/task_manager.h
+++ b/include/task_manager.h
@@ -85,16 +85,8 @@ namespace detail {
            auto unique_tsk = invoke_command_group_function(tid, m_num_collective_nodes, std::forward<CGF>(cgf));
            auto& tsk = register_task_internal(std::move(unique_tsk));
            compute_dependencies(tsk);
-
-           // the following deletion is intentionally redundant with the one happening when waiting for free task slots
-           // we want to free tasks earlier than just when running out of slots,
-           // so that we can potentially reclaim additional resources such as buffers earlier
-           m_task_graph.prune_before_epoch(m_latest_epoch_reached.get());
-
            invoke_callbacks(&tsk);
-
            if(need_new_horizon()) { generate_horizon_task(); }
-
            ++m_num_user_command_groups_submitted;
            return tid;
        }
@@ -168,13 +160,15 @@ namespace detail {
            task* last_side_effect = nullptr;
        };
 
+       static constexpr task_id initial_epoch_task = 0;
+
        delegate* m_delegate;
        const size_t m_num_collective_nodes;
        policy_set m_policy;
 
        task_graph m_task_graph;
 
-       task_id m_next_tid = 0;
+       task_id m_next_tid = initial_epoch_task;
 
        // The active epoch is used as the last writer for host-initialized buffers.
        // This is useful so we can correctly generate anti-dependencies onto tasks that read host-initialized buffers.
@@ -212,7 +206,7 @@ namespace detail {
        std::optional<task_id> m_latest_horizon_reached;
 
        // The last epoch task that has been processed by the executor. Behind a monitor to allow awaiting this change from the main thread.
-       epoch_monitor m_latest_epoch_reached{0};
+       epoch_monitor m_latest_epoch_reached{initial_epoch_task};
 
        // Track the number of user-generated task and epochs to heuristically detect programs that lose performance by frequently calling `queue::wait()`.
        size_t m_num_user_command_groups_submitted = 0;
diff --git a/src/scheduler.cc b/src/scheduler.cc
index 3335474f0..98d1bb874 100644
--- a/src/scheduler.cc
+++ b/src/scheduler.cc
@@ -97,7 +97,7 @@ namespace detail {
            // The cggen automatically prunes the CDAG on generation, which is safe because commands are not shared across threads.
            // We might want to refactor this to match the IDAG behavior in the future.
            CELERITY_DETAIL_TRACY_ZONE_SCOPED("scheduler::prune_idag", Gray);
-           m_idag->prune_before_epoch(e.tid);
+           m_idag->delete_before_epoch(e.tid);
        }
 
        // The scheduler will receive the shutdown-epoch completion event via the runtime even if executor destruction has already begun.
diff --git a/src/task_manager.cc b/src/task_manager.cc
index cbaf59045..48def28c5 100644
--- a/src/task_manager.cc
+++ b/src/task_manager.cc
@@ -45,7 +45,7 @@ namespace detail {
        // m_latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
        assert(!m_latest_horizon_reached || *m_latest_horizon_reached < epoch_tid);
 
-       assert(epoch_tid == 0 || m_latest_epoch_reached.get() < epoch_tid);
+       assert(epoch_tid == initial_epoch_task || m_latest_epoch_reached.get() < epoch_tid);
        m_latest_epoch_reached.set(epoch_tid);
        m_latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
 
@@ -177,7 +177,7 @@ namespace detail {
 
        // Tasks without any other true-dependency must depend on the last epoch to ensure they cannot be re-ordered before the epoch
        // Exception is the initial epoch, which is the only TDAG node without a predecessor.
-       assert(m_epoch_for_new_tasks != nullptr || tsk.get_type() == task_type::epoch);
+       assert(m_epoch_for_new_tasks != nullptr || (tsk.get_type() == task_type::epoch && tsk.get_epoch_action() == epoch_action::init));
        if(m_epoch_for_new_tasks != nullptr) {
            if(const auto deps = tsk.get_dependencies();
               std::none_of(deps.begin(), deps.end(), [](const task::dependency d) { return d.kind == dependency_kind::true_dep; })) {
@@ -188,11 +188,12 @@ namespace detail {
    task& task_manager::register_task_internal(std::unique_ptr<task> task) {
        // register_task_internal() is called for all task types, so we use this location to assert that the init epoch is submitted first and exactly once
-       assert((task->get_id() == 0) == (task->get_type() == task_type::epoch && task->get_epoch_action() == epoch_action::init)
+       assert((task->get_id() == initial_epoch_task) == (task->get_type() == task_type::epoch && task->get_epoch_action() == epoch_action::init)
            && "first task submitted is not an init epoch, or init epoch is not the first task submitted");
 
        auto& task_ref = *task;
        assert(task != nullptr);
+       m_task_graph.delete_before_epoch(m_latest_epoch_reached.get());
        m_task_graph.append(std::move(task));
        m_execution_front.insert(&task_ref);
        return task_ref;
    }
@@ -234,8 +235,8 @@ namespace detail {
    void task_manager::set_epoch_for_new_tasks(task* const epoch) {
        // apply the new epoch to buffers_last_writers and last_collective_tasks data structs
        for(auto& [_, buffer] : m_buffers) {
-           buffer.last_writers.apply_to_values([epoch](task* const tsk) {
-               if(tsk == nullptr) return tsk;
+           buffer.last_writers.apply_to_values([epoch](task* const tsk) -> task* {
+               if(tsk == nullptr) return nullptr;
                return tsk->get_id() < epoch->get_id() ? epoch : tsk;
            });
        }
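
Not part of the patch above: a minimal, standalone sketch of the epoch-partitioned ownership scheme that the renamed delete_before_epoch() operates on, for readers who only see the diff. The names node, epoch_partitioned_graph_sketch, and the begin_epoch()/append() signatures are simplified stand-ins (the real class in include/graph.h is templated on the node type and its full interface is not shown in the hunks); only delete_before_epoch() mirrors the patched method.

// sketch.cpp -- illustrative only, assumptions as stated above
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <memory>
#include <vector>

using task_id = std::uint64_t;

struct node {
    task_id tid;
};

class epoch_partitioned_graph_sketch {
  public:
    // Open a new epoch partition; nodes appended afterwards belong to it.
    void begin_epoch(const task_id tid) {
        assert(m_epochs.empty() || m_epochs.back().epoch_tid < tid);
        m_epochs.push_back(graph_epoch{tid, {}});
    }

    // Append a node to the most recent partition; the returned pointer stays
    // valid until that partition is deleted, even as m_epochs grows.
    node* append(std::unique_ptr<node> n) {
        assert(!m_epochs.empty());
        m_epochs.back().nodes.push_back(std::move(n));
        return m_epochs.back().nodes.back().get();
    }

    // Free every node that was appended before begin_epoch(tid) was called,
    // i.e. drop all partitions whose epoch id is smaller than tid.
    void delete_before_epoch(const task_id tid) {
        const auto first_retained = std::find_if(m_epochs.begin(), m_epochs.end(), [=](const graph_epoch& epoch) { return epoch.epoch_tid >= tid; });
        assert(first_retained != m_epochs.end() && first_retained->epoch_tid == tid);
        m_epochs.erase(m_epochs.begin(), first_retained);
    }

  private:
    struct graph_epoch {
        task_id epoch_tid;
        std::vector<std::unique_ptr<node>> nodes;
    };

    std::deque<graph_epoch> m_epochs; // deque keeps erasing whole partitions at the front cheap
};

int main() {
    epoch_partitioned_graph_sketch g;
    g.begin_epoch(0); // init epoch
    g.append(std::make_unique<node>(node{1}));
    g.begin_epoch(5); // a later epoch task
    g.append(std::make_unique<node>(node{6}));
    g.delete_before_epoch(5); // frees the partition holding node 1
}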