diff --git a/include/out_of_order_engine.h b/include/out_of_order_engine.h
index 176c3ac47..b60862472 100644
--- a/include/out_of_order_engine.h
+++ b/include/out_of_order_engine.h
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include <unordered_set>
 
 namespace celerity::detail::out_of_order_engine_detail {
 struct engine_impl;
@@ -67,6 +68,10 @@ class out_of_order_engine {
     /// True when all submitted instructions have completed.
     bool is_idle() const;
 
+    /// Returns the set of all current incomplete instructions that do not have incomplete predecessors themselves, i.e. that are actively executing from the
+    /// out-of-order engine's perspective, not merely eagerly assigned.
+    const std::unordered_set<instruction_id>& get_execution_front() const;
+
     /// Returns the number of instructions currently awaiting normal or eager assignment (diagnostic use only).
     size_t get_assignment_queue_length() const;
 
diff --git a/src/live_executor.cc b/src/live_executor.cc
index e01209275..bd410b2e5 100644
--- a/src/live_executor.cc
+++ b/src/live_executor.cc
@@ -313,7 +313,6 @@ struct boundary_check_info {
 
 struct async_instruction_state {
-    instruction_id iid = -1;
     allocation_id alloc_aid = null_allocation_id; ///< non-null iff instruction is an alloc_instruction
     async_event event;
     CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(std::unique_ptr<boundary_check_info> oob_info;) // unique_ptr: oob_info is optional and rather large
@@ -331,7 +330,7 @@ struct executor_impl {
     out_of_order_engine engine{backend->get_system_info()};
 
     bool expecting_more_submissions = true; ///< shutdown epoch has not been executed yet
-    std::vector<async_instruction_state> in_flight_async_instructions;
+    std::unordered_map<instruction_id, async_instruction_state> in_flight_async_instructions;
     std::unordered_map<allocation_id, void*> allocations{{null_allocation_id, nullptr}}; ///< obtained from alloc_instruction or track_user_allocation
     std::unordered_map<host_object_id, std::unique_ptr<host_object_instance>> host_object_instances; ///< passed in through track_host_object_instance
    std::unordered_map<collective_group_id, std::unique_ptr<communicator>> cloned_communicators; ///< transitive clones of root_communicator
@@ -350,7 +349,7 @@ struct executor_impl {
     void poll_in_flight_async_instructions();
     void poll_submission_queue();
     void try_issue_one_instruction();
-    void retire_async_instruction(async_instruction_state& async);
+    void retire_async_instruction(instruction_id iid, async_instruction_state& async);
     void check_progress();
 
     // Instruction types that complete synchronously within the executor.
@@ -437,12 +436,16 @@ void executor_impl::run() {
 }
 
 void executor_impl::poll_in_flight_async_instructions() {
-    utils::erase_if(in_flight_async_instructions, [&](async_instruction_state& async) {
-        if(!async.event.is_complete()) return false;
-        retire_async_instruction(async);
+    // collect completed instruction ids up-front, since retire_async_instruction would alter the execution front
+    std::vector<instruction_id> completed_now; // std::vector because it will be empty in the common case
+    for(const auto iid : engine.get_execution_front()) {
+        if(in_flight_async_instructions.at(iid).event.is_complete()) { completed_now.push_back(iid); }
+    }
+    for(const auto iid : completed_now) {
+        retire_async_instruction(iid, in_flight_async_instructions.at(iid));
+        in_flight_async_instructions.erase(iid);
         made_progress = true;
-        return true;
-    });
+    }
     CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->assigned_instructions_plot.update(in_flight_async_instructions.size()));
 }
 
@@ -478,12 +481,12 @@ void executor_impl::poll_submission_queue() {
     }
 }
 
-void executor_impl::retire_async_instruction(async_instruction_state& async) {
+void executor_impl::retire_async_instruction(const instruction_id iid, async_instruction_state& async) {
     CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::retire", Brown);
 
 #if CELERITY_ACCESSOR_BOUNDARY_CHECK
     if(async.oob_info != nullptr) {
-        CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("executor::oob_check", Red, "I{} bounds check", async.iid);
+        CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("executor::oob_check", Red, "I{} bounds check", iid);
         const auto& oob_info = *async.oob_info;
         for(size_t i = 0; i < oob_info.accessors.size(); ++i) {
             if(const auto oob_box = oob_info.illegal_access_bounding_boxes[i].into_box(); !oob_box.empty()) {
@@ -502,9 +505,9 @@ void executor_impl::retire_async_instruction(async_instruction_state& async) {
 
     if(spdlog::should_log(spdlog::level::trace)) {
         if(const auto native_time = async.event.get_native_execution_time(); native_time.has_value()) {
-            CELERITY_TRACE("[executor] retired I{} after {:.2f}", async.iid, as_sub_second(*native_time));
+            CELERITY_TRACE("[executor] retired I{} after {:.2f}", iid, as_sub_second(*native_time));
         } else {
-            CELERITY_TRACE("[executor] retired I{}", async.iid);
+            CELERITY_TRACE("[executor] retired I{}", iid);
         }
     }
 
@@ -518,7 +521,7 @@ void executor_impl::retire_async_instruction(async_instruction_state& async) {
         allocations.emplace(async.alloc_aid, ptr);
     }
 
-    engine.complete_assigned(async.iid);
+    engine.complete_assigned(iid);
 
     CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->assignment_queue_length_plot.update(engine.get_assignment_queue_length()));
 }
 
@@ -565,8 +568,7 @@ auto executor_impl::dispatch(const Instr& instr, const out_of_order_engine::assi
     CELERITY_DETAIL_IF_TRACY_SUPPORTED(tracy_integration::instruction_info info);
     CELERITY_DETAIL_IF_TRACY_ENABLED(info = tracy_integration::make_instruction_info(instr));
 
-    auto& async = in_flight_async_instructions.emplace_back();
-    async.iid = assignment.instruction->get_id();
+    auto& async = in_flight_async_instructions.emplace(assignment.instruction->get_id(), async_instruction_state{}).first->second;
     issue_async(instr, assignment, async); // stores event in `async` and completes asynchronously
     // instr may now dangle
 
@@ -599,9 +601,9 @@ void executor_impl::check_progress() {
     const auto elapsed_since_last_progress = std::chrono::steady_clock::now() - *last_progress_timestamp;
     if(elapsed_since_last_progress > *policy.progress_warning_timeout && !progress_warning_emitted) {
         std::string instr_list;
-        for(auto& in_flight : in_flight_async_instructions) {
+        for(auto& [iid, async] : in_flight_async_instructions) {
             if(!instr_list.empty()) instr_list += ", ";
-            fmt::format_to(std::back_inserter(instr_list), "I{}", in_flight.iid);
+            fmt::format_to(std::back_inserter(instr_list), "I{}", iid);
         }
         CELERITY_WARN("[executor] no progress for {:.0f}, might be stuck. Active instructions: {}", as_sub_second(elapsed_since_last_progress),
             in_flight_async_instructions.empty() ? "none" : instr_list);
diff --git a/src/out_of_order_engine.cc b/src/out_of_order_engine.cc
index 5828b081f..c6d0a6344 100644
--- a/src/out_of_order_engine.cc
+++ b/src/out_of_order_engine.cc
@@ -94,6 +94,9 @@ struct engine_impl {
     /// is assumed to have completed earlier (triggering its removal from the map).
     std::unordered_map<instruction_id, incomplete_instruction_state> incomplete_instructions;
 
+    /// The subset of assigned incomplete_instructions that do not have incomplete predecessors themselves.
+    std::unordered_set<instruction_id> execution_front;
+
     /// Queue of all instructions in `conditional_eagerly_assignable_state` and `unconditional_assignable_state`, in decreasing order of instruction priority.
     std::priority_queue<incomplete_instruction_state*, std::vector<incomplete_instruction_state*>, instruction_priority_less> assignment_queue;
 
@@ -315,6 +318,8 @@ void engine_impl::complete(const instruction_id iid) {
     auto deleted_node = std::move(node_it->second); // move so we can access members / iterate successors after erasure
     incomplete_instructions.erase(node_it);
 
+    execution_front.erase(iid);
+
     auto& was_assigned = std::get<assigned_state>(deleted_node.assignment);
     if(deleted_node.target == target::host_queue || deleted_node.target == target::device_queue) {
         // "remove" instruction from assigned lane
@@ -333,7 +338,11 @@ void engine_impl::complete(const instruction_id iid) {
             auto& successor = succ_it->second;
             assert(successor.num_incomplete_predecessors > 0);
             --successor.num_incomplete_predecessors;
-            try_mark_for_assignment(successor);
+            if(!std::holds_alternative<assigned_state>(successor.assignment)) {
+                try_mark_for_assignment(successor);
+            } else if(successor.num_incomplete_predecessors == 0) {
+                execution_front.insert(succ_iid);
+            }
         }
     }
 }
@@ -438,7 +447,9 @@ std::optional<out_of_order_engine::assignment> engine_impl::assign_one() {
         }
     }
 
-    return assignment{node.instr, node.target, assigned.device, assigned.lane};
+    if(node.num_incomplete_predecessors == 0) { execution_front.insert(node.instr->get_id()); }
+
+    return assignment(node.instr, node.target, assigned.device, assigned.lane);
 }
 
 } // namespace celerity::detail::out_of_order_engine_detail
@@ -448,6 +459,7 @@ namespace celerity::detail {
 out_of_order_engine::out_of_order_engine(const system_info& system) : m_impl(new out_of_order_engine_detail::engine_impl(system)) {}
 out_of_order_engine::~out_of_order_engine() = default;
 bool out_of_order_engine::is_idle() const { return m_impl->is_idle(); }
+const std::unordered_set<instruction_id>& out_of_order_engine::get_execution_front() const { return m_impl->execution_front; }
 size_t out_of_order_engine::get_assignment_queue_length() const { return m_impl->assignment_queue.size(); }
 void out_of_order_engine::submit(const instruction* const instr) { m_impl->submit(instr); }
 void out_of_order_engine::complete_assigned(const instruction_id iid) { m_impl->complete(iid); }
diff --git a/test/out_of_order_engine_tests.cc b/test/out_of_order_engine_tests.cc
index 147526237..1c0cec985 100644
--- a/test/out_of_order_engine_tests.cc
+++ b/test/out_of_order_engine_tests.cc
@@ -176,6 +176,15 @@ class out_of_order_test_context {
 
     bool is_idle() const { return m_engine.is_idle(); }
 
+    std::unordered_set<const instruction*> get_execution_front() const {
+        const auto& iid_front = m_engine.get_execution_front();
+        std::unordered_set<const instruction*> instr_front;
+        for(const auto& instr : m_instrs) {
+            if(iid_front.contains(instr->get_id())) { instr_front.insert(instr.get()); }
+        }
+        return instr_front;
+    }
+
   private:
     instruction_id m_next_iid = 0;
     std::vector<std::unique_ptr<instruction>> m_instrs;
@@ -203,6 +212,8 @@ TEST_CASE("out_of_order_engine schedules independent chains concurrently", "[out
     const auto h1 = octx.host_task({h0});
     const auto h2 = octx.host_task({h0, h1});
 
+    CHECK(octx.get_execution_front().empty());
+
     {
         const auto iq = octx.assign_all();
         CHECK(iq.assigned_instructions_are({d0_k0, d0_k1, d1_k0, d1_k1}));
@@ -215,6 +226,8 @@
         CHECK(iq.assigned_concurrently({d0_k0, d0_k1}, {d1_k0, d1_k1}));
     }
 
+    CHECK(octx.get_execution_front() == std::unordered_set{d0_k0, d1_k0});
+
     octx.complete(d0_k0);
     octx.complete(d0_k1);
     octx.complete(d1_k0);
@@ -224,13 +237,18 @@
         CHECK(iq.assigned_instructions_are({}));
     }
 
+    CHECK(octx.get_execution_front() == std::unordered_set{d1_k1});
+
     octx.complete(d1_k1);
+    CHECK(octx.get_execution_front().empty());
 
     {
         const auto iq = octx.assign_all();
         CHECK(iq.assigned_instructions_are({h0, h1, h2}));
         CHECK(iq.assigned_in_order({h0, h1, h2}));
     }
+
+    CHECK(octx.get_execution_front() == std::unordered_set{h0});
 }
 
 TEST_CASE("out_of_order_engine eagerly assigns copy-instructions to the lanes of their dependencies", "[out_of_order_engine]") {
@@ -247,6 +265,8 @@ TEST_CASE("out_of_order_engine eagerly assigns copy-instructions to the lanes of
 
     CHECK_FALSE(octx.is_idle());
 
+    CHECK(octx.get_execution_front().empty());
+
     {
         const auto iq = octx.assign_all();
         CHECK(iq.assigned_instructions_are({d0_k0, d1_k0, d2_k0, d3_k0, copy_dep0, copy_dep1, copy_dep2, copy_dep3}));
@@ -260,11 +280,18 @@ TEST_CASE("out_of_order_engine eagerly assigns copy-instructions to the lanes of
         CHECK(iq.assigned_in_order({d3_k0, copy_dep3}));
     }
 
+    CHECK(octx.get_execution_front() == std::unordered_set{d0_k0, d1_k0, d2_k0, d3_k0});
+
     octx.complete(d0_k0);
     octx.complete(d1_k0);
+
+    CHECK(octx.get_execution_front() == std::unordered_set{d2_k0, d3_k0, copy_dep0, copy_dep1});
+
     octx.complete(d2_k0);
     octx.complete(d3_k0);
+
     CHECK_FALSE(octx.is_idle());
+    CHECK(octx.get_execution_front() == std::unordered_set{copy_dep0, copy_dep1, copy_dep2, copy_dep3});
 
     const auto copy_indep0 = octx.copy({d0_k0 /* already complete */}, first_device_memory_id + 1, first_device_memory_id);
     const auto copy_indep1 = octx.copy({d1_k0 /* already complete */}, first_device_memory_id, first_device_memory_id + 1);
@@ -361,17 +388,23 @@ TEST_CASE("out_of_order_engine does not attempt to assign instructions more than
     out_of_order_test_context octx(1);
     auto k1 = octx.device_kernel({}, device_id(0));
     auto k2 = octx.device_kernel({k1}, device_id(0));
+    CHECK(octx.get_execution_front().empty());
 
     const auto assigned = octx.assign_one();
     REQUIRE(assigned.has_value());
     CHECK(assigned->instruction == k1);
+    CHECK(octx.get_execution_front() == std::unordered_set{k1});
 
     octx.complete(k1);
+    CHECK(octx.get_execution_front().empty());
+
     const auto other = octx.assign_all();
     CHECK(other.assigned_instructions_are({k2}));
+    CHECK(octx.get_execution_front() == std::unordered_set{k2});
 
     octx.complete(k2);
+    CHECK(octx.get_execution_front().empty());
     CHECK(octx.is_idle());
 }
 
@@ -381,15 +414,19 @@ TEST_CASE("assigned sets of instructions with internal dependencies can be compl
     out_of_order_test_context octx(1);
     auto k1 = octx.device_kernel({}, device_id(0));
     auto k2 = octx.device_kernel({k1}, device_id(0));
+    CHECK(octx.get_execution_front().empty());
 
     octx.assign_all();
     CHECK_FALSE(octx.is_idle());
+    CHECK(octx.get_execution_front() == std::unordered_set{k1});
 
     octx.complete(k2);
     CHECK_FALSE(octx.is_idle());
+    CHECK(octx.get_execution_front() == std::unordered_set{k1});
 
     octx.complete(k1);
     CHECK(octx.is_idle());
+    CHECK(octx.get_execution_front().empty());
 }
 
 TEST_CASE("concurrent instructions are assigned in decreasing priority", "[out_of_order_engine]") {
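
The gist of the change, as a minimal standalone sketch: the engine tracks an execution front (assigned instructions whose predecessors have all completed), and the executor polls only that front, collecting completed ids before retiring any of them because retiring mutates the front being iterated. The names toy_engine, toy_async_state and poll_execution_front below are hypothetical stand-ins, not Celerity types.

#include <cstdint>
#include <functional>
#include <unordered_map>
#include <unordered_set>
#include <vector>

using instruction_id = std::uint64_t;

struct toy_engine {
    std::unordered_set<instruction_id> execution_front; // assigned instructions without incomplete predecessors

    const std::unordered_set<instruction_id>& get_execution_front() const { return execution_front; }

    // Completing an instruction removes it from the front; a real engine would also promote now-unblocked successors.
    void complete_assigned(const instruction_id iid) { execution_front.erase(iid); }
};

struct toy_async_state {
    std::function<bool()> is_complete; // stands in for querying the instruction's async event
};

// Poll only the execution front, and collect completed ids up-front, since retiring alters the front.
bool poll_execution_front(toy_engine& engine, std::unordered_map<instruction_id, toy_async_state>& in_flight) {
    std::vector<instruction_id> completed_now;
    for(const auto iid : engine.get_execution_front()) {
        if(in_flight.at(iid).is_complete()) { completed_now.push_back(iid); }
    }
    for(const auto iid : completed_now) {
        engine.complete_assigned(iid); // may insert newly unblocked successors into the front
        in_flight.erase(iid);
    }
    return !completed_now.empty(); // "made progress"
}

int main() {
    toy_engine engine;
    std::unordered_map<instruction_id, toy_async_state> in_flight;
    engine.execution_front.insert(1);
    in_flight.emplace(1, toy_async_state{[] { return true; }});
    return poll_execution_front(engine, in_flight) ? 0 : 1;
}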