Skip to content

Commit

Permalink
Only poll events of instructions that are actively executing
Browse files Browse the repository at this point in the history
  • Loading branch information
fknorr committed Oct 17, 2024
1 parent 784e789 commit cfd3d53
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 19 deletions.
5 changes: 5 additions & 0 deletions include/out_of_order_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <memory>
#include <optional>
#include <unordered_set>

namespace celerity::detail::out_of_order_engine_detail {
struct engine_impl;
Expand Down Expand Up @@ -67,6 +68,10 @@ class out_of_order_engine {
/// True when all submitted instructions have completed.
bool is_idle() const;

/// Returns the set of all current incomplete instructions that do not have incomplete predecessors themselves, i.e. that are actively executing from the
/// out-of-order engine's perspective, not merely eagerly assigned.
const std::unordered_set<instruction_id>& get_execution_front() const;

/// Returns the number of instructions currently awaiting normal or eager assignment (diagnostic use only).
size_t get_assignment_queue_length() const;

Expand Down
36 changes: 19 additions & 17 deletions src/live_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ struct boundary_check_info {


struct async_instruction_state {
instruction_id iid = -1;
allocation_id alloc_aid = null_allocation_id; ///< non-null iff instruction is an alloc_instruction
async_event event;
CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(std::unique_ptr<boundary_check_info> oob_info;) // unique_ptr: oob_info is optional and rather large
Expand All @@ -331,7 +330,7 @@ struct executor_impl {
out_of_order_engine engine{backend->get_system_info()};

bool expecting_more_submissions = true; ///< shutdown epoch has not been executed yet
std::vector<async_instruction_state> in_flight_async_instructions;
std::unordered_map<instruction_id, async_instruction_state> in_flight_async_instructions;
std::unordered_map<allocation_id, void*> allocations{{null_allocation_id, nullptr}}; ///< obtained from alloc_instruction or track_user_allocation
std::unordered_map<host_object_id, std::unique_ptr<host_object_instance>> host_object_instances; ///< passed in through track_host_object_instance
std::unordered_map<collective_group_id, std::unique_ptr<communicator>> cloned_communicators; ///< transitive clones of root_communicator
Expand All @@ -350,7 +349,7 @@ struct executor_impl {
void poll_in_flight_async_instructions();
void poll_submission_queue();
void try_issue_one_instruction();
void retire_async_instruction(async_instruction_state& async);
void retire_async_instruction(instruction_id iid, async_instruction_state& async);
void check_progress();

// Instruction types that complete synchronously within the executor.
Expand Down Expand Up @@ -437,12 +436,16 @@ void executor_impl::run() {
}

void executor_impl::poll_in_flight_async_instructions() {
utils::erase_if(in_flight_async_instructions, [&](async_instruction_state& async) {
if(!async.event.is_complete()) return false;
retire_async_instruction(async);
// collect completed instruction ids up-front, since retire_async_instruction would alter the execution front
std::vector<instruction_id> completed_now; // std::vector because it will be empty in the common case
for(const auto iid : engine.get_execution_front()) {
if(in_flight_async_instructions.at(iid).event.is_complete()) { completed_now.push_back(iid); }
}
for(const auto iid : completed_now) {
retire_async_instruction(iid, in_flight_async_instructions.at(iid));
in_flight_async_instructions.erase(iid);
made_progress = true;
return true;
});
}

CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->assigned_instructions_plot.update(in_flight_async_instructions.size()));
}
Expand Down Expand Up @@ -478,12 +481,12 @@ void executor_impl::poll_submission_queue() {
}
}

void executor_impl::retire_async_instruction(async_instruction_state& async) {
void executor_impl::retire_async_instruction(const instruction_id iid, async_instruction_state& async) {
CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::retire", Brown);

#if CELERITY_ACCESSOR_BOUNDARY_CHECK
if(async.oob_info != nullptr) {
CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("executor::oob_check", Red, "I{} bounds check", async.iid);
CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("executor::oob_check", Red, "I{} bounds check", iid);
const auto& oob_info = *async.oob_info;
for(size_t i = 0; i < oob_info.accessors.size(); ++i) {
if(const auto oob_box = oob_info.illegal_access_bounding_boxes[i].into_box(); !oob_box.empty()) {
Expand All @@ -502,9 +505,9 @@ void executor_impl::retire_async_instruction(async_instruction_state& async) {

if(spdlog::should_log(spdlog::level::trace)) {
if(const auto native_time = async.event.get_native_execution_time(); native_time.has_value()) {
CELERITY_TRACE("[executor] retired I{} after {:.2f}", async.iid, as_sub_second(*native_time));
CELERITY_TRACE("[executor] retired I{} after {:.2f}", iid, as_sub_second(*native_time));
} else {
CELERITY_TRACE("[executor] retired I{}", async.iid);
CELERITY_TRACE("[executor] retired I{}", iid);
}
}

Expand All @@ -518,7 +521,7 @@ void executor_impl::retire_async_instruction(async_instruction_state& async) {
allocations.emplace(async.alloc_aid, ptr);
}

engine.complete_assigned(async.iid);
engine.complete_assigned(iid);

CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->assignment_queue_length_plot.update(engine.get_assignment_queue_length()));
}
Expand Down Expand Up @@ -565,8 +568,7 @@ auto executor_impl::dispatch(const Instr& instr, const out_of_order_engine::assi
CELERITY_DETAIL_IF_TRACY_SUPPORTED(tracy_integration::instruction_info info);
CELERITY_DETAIL_IF_TRACY_ENABLED(info = tracy_integration::make_instruction_info(instr));

auto& async = in_flight_async_instructions.emplace_back();
async.iid = assignment.instruction->get_id();
auto& async = in_flight_async_instructions.emplace(assignment.instruction->get_id(), async_instruction_state{}).first->second;
issue_async(instr, assignment, async); // stores event in `async` and completes asynchronously
// instr may now dangle

Expand Down Expand Up @@ -599,9 +601,9 @@ void executor_impl::check_progress() {
const auto elapsed_since_last_progress = std::chrono::steady_clock::now() - *last_progress_timestamp;
if(elapsed_since_last_progress > *policy.progress_warning_timeout && !progress_warning_emitted) {
std::string instr_list;
for(auto& in_flight : in_flight_async_instructions) {
for(auto& [iid, async] : in_flight_async_instructions) {
if(!instr_list.empty()) instr_list += ", ";
fmt::format_to(std::back_inserter(instr_list), "I{}", in_flight.iid);
fmt::format_to(std::back_inserter(instr_list), "I{}", iid);
}
CELERITY_WARN("[executor] no progress for {:.0f}, might be stuck. Active instructions: {}", as_sub_second(elapsed_since_last_progress),
in_flight_async_instructions.empty() ? "none" : instr_list);
Expand Down
16 changes: 14 additions & 2 deletions src/out_of_order_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ struct engine_impl {
/// is assumed to have completed earlier (triggering its removal from the map).
std::unordered_map<instruction_id, incomplete_instruction_state> incomplete_instructions;

/// The subset of assigned, incomplete_instructions that to not have incomplete predecessors themselves.
std::unordered_set<instruction_id> execution_front;

/// Queue of all instructions in `conditional_eagerly_assignable_state` and `unconditional_assignable_state`, in decreasing order of instruction priority.
std::priority_queue<const instruction*, std::vector<const instruction*>, instruction_priority_less> assignment_queue;

Expand Down Expand Up @@ -315,6 +318,8 @@ void engine_impl::complete(const instruction_id iid) {
auto deleted_node = std::move(node_it->second); // move so we can access members / iterate successors after erasure
incomplete_instructions.erase(node_it);

execution_front.erase(iid);

auto& was_assigned = std::get<assigned_state>(deleted_node.assignment);
if(deleted_node.target == target::host_queue || deleted_node.target == target::device_queue) {
// "remove" instruction from assigned lane
Expand All @@ -333,7 +338,11 @@ void engine_impl::complete(const instruction_id iid) {
auto& successor = succ_it->second;
assert(successor.num_incomplete_predecessors > 0);
--successor.num_incomplete_predecessors;
try_mark_for_assignment(successor);
if(!std::holds_alternative<assigned_state>(successor.assignment)) {
try_mark_for_assignment(successor);
} else if(successor.num_incomplete_predecessors == 0) {
execution_front.insert(succ_iid);
}
}
}
}
Expand Down Expand Up @@ -438,7 +447,9 @@ std::optional<assignment> engine_impl::assign_one() {
}
}

return assignment{node.instr, node.target, assigned.device, assigned.lane};
if(node.num_incomplete_predecessors == 0) { execution_front.insert(node.instr->get_id()); }

return assignment(node.instr, node.target, assigned.device, assigned.lane);
}

} // namespace celerity::detail::out_of_order_engine_detail
Expand All @@ -448,6 +459,7 @@ namespace celerity::detail {
out_of_order_engine::out_of_order_engine(const system_info& system) : m_impl(new out_of_order_engine_detail::engine_impl(system)) {}
out_of_order_engine::~out_of_order_engine() = default;
bool out_of_order_engine::is_idle() const { return m_impl->is_idle(); }
const std::unordered_set<instruction_id>& out_of_order_engine::get_execution_front() const { return m_impl->execution_front; }
size_t out_of_order_engine::get_assignment_queue_length() const { return m_impl->assignment_queue.size(); }
void out_of_order_engine::submit(const instruction* const instr) { m_impl->submit(instr); }
void out_of_order_engine::complete_assigned(const instruction_id iid) { m_impl->complete(iid); }
Expand Down
37 changes: 37 additions & 0 deletions test/out_of_order_engine_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ class out_of_order_test_context {

bool is_idle() const { return m_engine.is_idle(); }

std::unordered_set<const instruction*> get_execution_front() const {
const auto& iid_front = m_engine.get_execution_front();
std::unordered_set<const instruction*> instr_front;
for(const auto& instr : m_instrs) {
if(iid_front.contains(instr->get_id())) { instr_front.insert(instr.get()); }
}
return instr_front;
}

private:
instruction_id m_next_iid = 0;
std::vector<std::unique_ptr<instruction>> m_instrs;
Expand Down Expand Up @@ -203,6 +212,8 @@ TEST_CASE("out_of_order_engine schedules independent chains concurrently", "[out
const auto h1 = octx.host_task({h0});
const auto h2 = octx.host_task({h0, h1});

CHECK(octx.get_execution_front().empty());

{
const auto iq = octx.assign_all();
CHECK(iq.assigned_instructions_are({d0_k0, d0_k1, d1_k0, d1_k1}));
Expand All @@ -215,6 +226,8 @@ TEST_CASE("out_of_order_engine schedules independent chains concurrently", "[out
CHECK(iq.assigned_concurrently({d0_k0, d0_k1}, {d1_k0, d1_k1}));
}

CHECK(octx.get_execution_front() == std::unordered_set{d0_k0, d1_k0});

octx.complete(d0_k0);
octx.complete(d0_k1);
octx.complete(d1_k0);
Expand All @@ -224,13 +237,18 @@ TEST_CASE("out_of_order_engine schedules independent chains concurrently", "[out
CHECK(iq.assigned_instructions_are({}));
}

CHECK(octx.get_execution_front() == std::unordered_set{d1_k1});

octx.complete(d1_k1);
CHECK(octx.get_execution_front().empty());

{
const auto iq = octx.assign_all();
CHECK(iq.assigned_instructions_are({h0, h1, h2}));
CHECK(iq.assigned_in_order({h0, h1, h2}));
}

CHECK(octx.get_execution_front() == std::unordered_set{h0});
}

TEST_CASE("out_of_order_engine eagerly assigns copy-instructions to the lanes of their dependencies", "[out_of_order_engine]") {
Expand All @@ -247,6 +265,8 @@ TEST_CASE("out_of_order_engine eagerly assigns copy-instructions to the lanes of

CHECK_FALSE(octx.is_idle());

CHECK(octx.get_execution_front().empty());

{
const auto iq = octx.assign_all();
CHECK(iq.assigned_instructions_are({d0_k0, d1_k0, d2_k0, d3_k0, copy_dep0, copy_dep1, copy_dep2, copy_dep3}));
Expand All @@ -260,11 +280,18 @@ TEST_CASE("out_of_order_engine eagerly assigns copy-instructions to the lanes of
CHECK(iq.assigned_in_order({d3_k0, copy_dep3}));
}

CHECK(octx.get_execution_front() == std::unordered_set{d0_k0, d1_k0, d2_k0, d3_k0});

octx.complete(d0_k0);
octx.complete(d1_k0);

CHECK(octx.get_execution_front() == std::unordered_set{d2_k0, d3_k0, copy_dep0, copy_dep1});

octx.complete(d2_k0);
octx.complete(d3_k0);

CHECK_FALSE(octx.is_idle());
CHECK(octx.get_execution_front() == std::unordered_set{copy_dep0, copy_dep1, copy_dep2, copy_dep3});

const auto copy_indep0 = octx.copy({d0_k0 /* already complete */}, first_device_memory_id + 1, first_device_memory_id);
const auto copy_indep1 = octx.copy({d1_k0 /* already complete */}, first_device_memory_id, first_device_memory_id + 1);
Expand Down Expand Up @@ -361,17 +388,23 @@ TEST_CASE("out_of_order_engine does not attempt to assign instructions more than
out_of_order_test_context octx(1);
auto k1 = octx.device_kernel({}, device_id(0));
auto k2 = octx.device_kernel({k1}, device_id(0));
CHECK(octx.get_execution_front().empty());

const auto assigned = octx.assign_one();
REQUIRE(assigned.has_value());
CHECK(assigned->instruction == k1);
CHECK(octx.get_execution_front() == std::unordered_set{k1});

octx.complete(k1);

CHECK(octx.get_execution_front().empty());

const auto other = octx.assign_all();
CHECK(other.assigned_instructions_are({k2}));
CHECK(octx.get_execution_front() == std::unordered_set{k2});

octx.complete(k2);
CHECK(octx.get_execution_front().empty());
CHECK(octx.is_idle());
}

Expand All @@ -381,15 +414,19 @@ TEST_CASE("assigned sets of instructions with internal dependencies can be compl
out_of_order_test_context octx(1);
auto k1 = octx.device_kernel({}, device_id(0));
auto k2 = octx.device_kernel({k1}, device_id(0));
CHECK(octx.get_execution_front().empty());

octx.assign_all();
CHECK_FALSE(octx.is_idle());
CHECK(octx.get_execution_front() == std::unordered_set{k1});

octx.complete(k2);
CHECK_FALSE(octx.is_idle());
CHECK(octx.get_execution_front() == std::unordered_set{k1});

octx.complete(k1);
CHECK(octx.is_idle());
CHECK(octx.get_execution_front().empty());
}

TEST_CASE("concurrent instructions are assigned in decreasing priority", "[out_of_order_engine]") {
Expand Down

0 comments on commit cfd3d53

Please sign in to comment.