Skip to content

Commit

Permalink
[RM] Durante feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
fknorr committed Oct 28, 2024
1 parent 784bdd9 commit 3bff234
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 20 deletions.
10 changes: 6 additions & 4 deletions include/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

#include "types.h"

#include <algorithm>
#include <deque>
#include <memory>
#include <vector>

namespace celerity::detail {

/// A `graph` keeps ownership of all graph nodes that have not been pruned by epoch or horizon application.
/// An `epoch_partitioned_graph` keeps ownership of all graph nodes that have not been pruned by epoch or horizon application.
template <typename Node, typename NodeIdLess>
class epoch_partitioned_graph {
friend struct graph_testspy;
Expand All @@ -28,8 +30,8 @@ class epoch_partitioned_graph {
}

// Free all graph nodes that were pushed before begin_epoch(tid) was called.
void prune_before_epoch(const task_id tid) {
const auto first_retained = std::partition_point(m_epochs.begin(), m_epochs.end(), [=](const graph_epoch& epoch) { return epoch.epoch_tid < tid; });
// Free all graph nodes that were pushed before begin_epoch(tid) was called.
void delete_before_epoch(const task_id tid) {
	// Walk forward to the first epoch that is not older than `tid`; epochs are kept in
	// submission order, so everything before that point is safe to drop.
	auto keep_from = m_epochs.begin();
	while(keep_from != m_epochs.end() && keep_from->epoch_tid < tid) { ++keep_from; }
	// The epoch `tid` itself must still be present — callers only delete up to a reached epoch.
	assert(keep_from != m_epochs.end() && keep_from->epoch_tid == tid);
	m_epochs.erase(m_epochs.begin(), keep_from);
}
Expand All @@ -40,7 +42,7 @@ class epoch_partitioned_graph {
std::vector<std::unique_ptr<Node>> nodes; // graph node pointers are stable, so it is safe to hand them to another thread
};

std::vector<graph_epoch> m_epochs;
std::deque<graph_epoch> m_epochs;
};

} // namespace celerity::detail
14 changes: 4 additions & 10 deletions include/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,8 @@ namespace detail {
auto unique_tsk = invoke_command_group_function(tid, m_num_collective_nodes, std::forward<CGF>(cgf));
auto& tsk = register_task_internal(std::move(unique_tsk));
compute_dependencies(tsk);

// the following deletion is intentionally redundant with the one happening when waiting for free task slots
// we want to free tasks earlier than just when running out of slots,
// so that we can potentially reclaim additional resources such as buffers earlier
m_task_graph.prune_before_epoch(m_latest_epoch_reached.get());

invoke_callbacks(&tsk);

if(need_new_horizon()) { generate_horizon_task(); }

++m_num_user_command_groups_submitted;
return tid;
}
Expand Down Expand Up @@ -168,13 +160,15 @@ namespace detail {
task* last_side_effect = nullptr;
};

static constexpr task_id initial_epoch_task = 0;

delegate* m_delegate;

const size_t m_num_collective_nodes;
policy_set m_policy;

task_graph m_task_graph;
task_id m_next_tid = 0;
task_id m_next_tid = initial_epoch_task;

// The active epoch is used as the last writer for host-initialized buffers.
// This is useful so we can correctly generate anti-dependencies onto tasks that read host-initialized buffers.
Expand Down Expand Up @@ -212,7 +206,7 @@ namespace detail {
std::optional<task_id> m_latest_horizon_reached;

// The last epoch task that has been processed by the executor. Behind a monitor to allow awaiting this change from the main thread.
epoch_monitor m_latest_epoch_reached{0};
epoch_monitor m_latest_epoch_reached{initial_epoch_task};

// Track the number of user-generated task and epochs to heuristically detect programs that lose performance by frequently calling `queue::wait()`.
size_t m_num_user_command_groups_submitted = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ namespace detail {
// The cggen automatically prunes the CDAG on generation, which is safe because commands are not shared across threads.
// We might want to refactor this to match the IDAG behavior in the future.
CELERITY_DETAIL_TRACY_ZONE_SCOPED("scheduler::prune_idag", Gray);
m_idag->prune_before_epoch(e.tid);
m_idag->delete_before_epoch(e.tid);
}

// The scheduler will receive the shutdown-epoch completion event via the runtime even if executor destruction has already begun.
Expand Down
11 changes: 6 additions & 5 deletions src/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace detail {
// m_latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.

assert(!m_latest_horizon_reached || *m_latest_horizon_reached < epoch_tid);
assert(epoch_tid == 0 || m_latest_epoch_reached.get() < epoch_tid);
assert(epoch_tid == initial_epoch_task || m_latest_epoch_reached.get() < epoch_tid);

m_latest_epoch_reached.set(epoch_tid);
m_latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
Expand Down Expand Up @@ -177,7 +177,7 @@ namespace detail {

// Tasks without any other true-dependency must depend on the last epoch to ensure they cannot be re-ordered before the epoch
// Exception is the initial epoch, which is the only TDAG node without a predecessor.
assert(m_epoch_for_new_tasks != nullptr || tsk.get_type() == task_type::epoch);
assert(m_epoch_for_new_tasks != nullptr || (tsk.get_type() == task_type::epoch && tsk.get_epoch_action() == epoch_action::init));
if(m_epoch_for_new_tasks != nullptr) {
if(const auto deps = tsk.get_dependencies();
std::none_of(deps.begin(), deps.end(), [](const task::dependency d) { return d.kind == dependency_kind::true_dep; })) {
Expand All @@ -188,11 +188,12 @@ namespace detail {

task& task_manager::register_task_internal(std::unique_ptr<task> task) {
// register_task_internal() is called for all task types, so we use this location to assert that the init epoch is submitted first and exactly once
assert((task->get_id() == 0) == (task->get_type() == task_type::epoch && task->get_epoch_action() == epoch_action::init)
assert((task->get_id() == initial_epoch_task) == (task->get_type() == task_type::epoch && task->get_epoch_action() == epoch_action::init)
&& "first task submitted is not an init epoch, or init epoch is not the first task submitted");

auto& task_ref = *task;
assert(task != nullptr);
m_task_graph.delete_before_epoch(m_latest_epoch_reached.get());
m_task_graph.append(std::move(task));
m_execution_front.insert(&task_ref);
return task_ref;
Expand Down Expand Up @@ -234,8 +235,8 @@ namespace detail {
void task_manager::set_epoch_for_new_tasks(task* const epoch) {
// apply the new epoch to buffers_last_writers and last_collective_tasks data structs
for(auto& [_, buffer] : m_buffers) {
buffer.last_writers.apply_to_values([epoch](task* const tsk) {
if(tsk == nullptr) return tsk;
buffer.last_writers.apply_to_values([epoch](task* const tsk) -> task* {
if(tsk == nullptr) return nullptr;
return tsk->get_id() < epoch->get_id() ? epoch : tsk;
});
}
Expand Down

0 comments on commit 3bff234

Please sign in to comment.