Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduler lookahead to elide buffer resizes #298

Merged
merged 3 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- `handler::parallel_for(size_t, [size_t,] ...)` now acts as a shorthand for `parallel_for(range<1>, [id<1>,] ...)` (#288)
- Experimental support for the AdaptiveCpp generic single-pass compiler (#294)
- Constructor overloads to the `access::neighborhood` range mapper for reads in 3/5/7-point stencil codes (#292)
- The SYCL backend now uses per-device submission threads to dispatch commands for better performance.
- The SYCL backend now uses per-device submission threads to dispatch commands for better performance.
This new behaviour is enabled by default, and can be disabled via `CELERITY_BACKEND_DEVICE_SUBMISSION_THREADS` (#303)
- Celerity now has a thread pinning mechanism to control how threads are pinned to CPU cores.
- Celerity now has a thread pinning mechanism to control how threads are pinned to CPU cores.
This can be controlled via the `CELERITY_THREAD_PINNING` environment variable (#309)

### Changed
Expand All @@ -33,6 +33,8 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- On systems that do not support device-to-device copies, data is now staged in linearized buffers for better performance (#287)
- The `access::neighborhood` built-in range mapper now receives a `range` instead of a coordinate list (#292)
- Overhauled the [installation](docs/installation.md) and [configuration](docs/configuration.md) documentation (#309)
- Celerity will now queue up several command groups in order to combine allocations and elide resize operations.
This behavior can be influenced using the new `experimental::set_lookahead` and `experimental::flush` APIs (#298)

### Fixed

Expand Down
382 changes: 191 additions & 191 deletions ci/perf/gpuc2_bench.csv

Large diffs are not rendered by default.

384 changes: 192 additions & 192 deletions ci/perf/gpuc2_bench.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ specifically in development, debugging, and profiling scenarios:

## Environment Variables for Performance Tuning

The following environment variables can be used to tune Celerity's performance.
The following environment variables can be used to tune Celerity's performance.
Generally, these might need to be changed depending on the specific application and hardware setup to achieve the best possible performance, but the default values should work reasonably well in all cases:
| Option | Values | Description |
| --- | --- | --- |
| `CELERITY_HORIZON_STEP` | *number* | Determines the maximum number of sequential tasks before a new [horizon task](https://doi.org/10.1007/s42979-024-02749-w) is introduced. |
| `CELERITY_HORIZON_MAX_PARALLELISM` | *number* | Determines the maximum number of parallel tasks before a new horizon task is introduced. |
| `CELERITY_LOOKAHEAD` | `none`, `auto`, `infinite` | Controls how the scheduler will queue commands to optimize buffer allocations and avoid resizes. `none` will flush all commands immediately, `auto` will queue heuristically based on `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM`, and `infinite` will always queue up to the next synchronization point. |
| `CELERITY_BACKEND_DEVICE_SUBMISSION_THREADS` | `on`, `off` | Controls whether device commands are submitted in a separate backend thread for each local device. This improves performance particularly in cases where kernel runtimes are very short. (default: `on`) |
| `CELERITY_THREAD_PINNING` | `off`, `auto`, `from:#`, *core list* | Controls if and how threads are pinned to CPU cores. `off` disables pinning, `auto` lets Celerity decide, `from:#` starts pinning sequentially from the given core, and a core list specifies the exact pinning (see below). (default: `auto`) |

Expand Down
2 changes: 2 additions & 0 deletions include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace detail {
int get_dry_run_nodes() const { return m_dry_run_num_nodes; }
std::optional<int> get_horizon_step() const { return m_horizon_step; }
std::optional<int> get_horizon_max_parallelism() const { return m_horizon_max_parallelism; }
experimental::lookahead get_lookahead() { return m_lookahead; }
tracy_mode get_tracy_mode() const { return m_tracy_mode; }

private:
Expand All @@ -48,6 +49,7 @@ namespace detail {
bool m_should_print_graphs = false;
std::optional<int> m_horizon_step;
std::optional<int> m_horizon_max_parallelism;
experimental::lookahead m_lookahead = experimental::lookahead::automatic;
tracy_mode m_tracy_mode = tracy_mode::off;
};

Expand Down
10 changes: 10 additions & 0 deletions include/instruction_graph_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class instruction_graph_generator {
error_policy overlapping_write_error = error_policy::panic;
};

/// Hint returned by instruction_graph_generator::anticipate.
enum class scheduling_hint {
fknorr marked this conversation as resolved.
Show resolved Hide resolved
is_self_contained, ///< The instructions emitted for this command will not change even when invoking anticipate() on future commands.
could_merge_with_future_commands, ///< Queueing this command in anticipation of future commands may produce an optimized instruction graph.
};

/// Instruction graph generation requires information about the target system. `num_nodes` and `local_nid` affect the generation of communication
/// instructions and reductions, and `system` is used to determine work assignment, memory allocation and data migration between memories.
///
Expand Down Expand Up @@ -111,6 +117,10 @@ class instruction_graph_generator {
/// End tracking the host object with id `hoid`. Emits `destroy_host_object_instruction` if `create_host_object` was called with `owns_instance == true`.
void notify_host_object_destroyed(host_object_id hoid);

/// Inform the graph generator that a command is to follow in the future, without immediately generating any instructions for it. The scheduler will use
/// this to inject knowledge about future allocation sizes into the graph generator in order to avoid buffer resizes.
scheduling_hint anticipate(const command& cmd);

/// Compiles a command-graph node into a set of instructions, which are inserted into the shared instruction graph, and updates tracking structures.
void compile(const command& cmd);

Expand Down
21 changes: 21 additions & 0 deletions include/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "fence.h"
#include "runtime.h"
#include "tracy.h"
#include "types.h"

namespace celerity::detail {
struct barrier_tag {};
Expand Down Expand Up @@ -112,3 +113,23 @@ class queue {
};

} // namespace celerity

namespace celerity::experimental {

/// Controls the lookahead window size for all future submissions on the queue. The default setting is `lookahead::automatic`.
///
/// Use this function if the default configuration either does not eliminate enough buffer resizes in your application (`lookahead::infinite`), or host tasks
/// and kernels interact with the rest of your application in ways that require immediate flushing of every submitted command group (`lookahead::none`).
//
// Attached to the queue to signal that semantics are in-order with other submissions. Still applies to all queues.
// Experimental: This is only applicable to fully static work assignment, which might not remain the default forever.
inline void set_lookahead(celerity::queue& /* queue */, const experimental::lookahead lookahead) {
fknorr marked this conversation as resolved.
Show resolved Hide resolved
detail::runtime::get_instance().set_scheduler_lookahead(lookahead);
}

/// Flushes all command groups asynchronously enqueued in the scheduler.
///
/// This is beneficial only in rare situations where host-side code needs to synchronize with kernels or host tasks in a manner that is opaque to the runtime.
PeterTh marked this conversation as resolved.
Show resolved Hide resolved
inline void flush(celerity::queue& /* queue */) { detail::runtime::get_instance().flush_scheduler(); }

} // namespace celerity::experimental
5 changes: 5 additions & 0 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ namespace detail {

bool is_dry_run() const { return m_cfg->is_dry_run(); }

void set_scheduler_lookahead(experimental::lookahead lookahead);

void flush_scheduler();

private:
inline static bool s_mpi_initialized = false;
inline static bool s_mpi_finalized = false;
Expand All @@ -103,6 +107,7 @@ namespace detail {
std::unordered_set<buffer_id> m_live_buffers;
std::unordered_set<host_object_id> m_live_host_objects;

// Note: buffer / host object ids must not be re-used even after being freed, since scheduler lookahead may create these objects out-of-order.
buffer_id m_next_buffer_id = 0;
raw_allocation_id m_next_user_allocation_id = 1;
host_object_id m_next_host_object_id = 0;
Expand Down
199 changes: 75 additions & 124 deletions include/scheduler.h
Original file line number Diff line number Diff line change
@@ -1,140 +1,91 @@
#pragma once

#include <thread>
#include <variant>

#include "command_graph.h"
#include "command_graph_generator.h"
fknorr marked this conversation as resolved.
Show resolved Hide resolved
#include "double_buffered_queue.h"
#include "instruction_graph_generator.h"
#include "ranges.h"
#include "types.h"

#include <cstddef>
#include <functional>
fknorr marked this conversation as resolved.
Show resolved Hide resolved
#include <memory>
#include <string>

fknorr marked this conversation as resolved.
Show resolved Hide resolved
namespace celerity {
namespace detail {

class command_graph;
class command_recorder;
class instruction;
class instruction_graph;
class instruction_recorder;
struct outbound_pilot;
class task;

// Abstract base class to allow different threading implementation in tests
class abstract_scheduler {
protected:
friend struct scheduler_testspy;

public:
using delegate = instruction_graph_generator::delegate;

struct policy_set {
detail::command_graph_generator::policy_set command_graph_generator;
detail::instruction_graph_generator::policy_set instruction_graph_generator;
};

abstract_scheduler(size_t num_nodes, node_id local_node_id, const system_info& system_info, abstract_scheduler::delegate* delegate,
command_recorder* crec, instruction_recorder* irec, const policy_set& policy = {});

abstract_scheduler(const abstract_scheduler&) = delete;
abstract_scheduler(abstract_scheduler&&) = delete;
abstract_scheduler& operator=(const abstract_scheduler&) = delete;
abstract_scheduler& operator=(abstract_scheduler&&) = delete;

virtual ~abstract_scheduler();

/**
* @brief Notifies the scheduler that a new task has been created and is ready for scheduling.
*/
void notify_task_created(const task* const tsk) { notify(event_task_available{tsk}); }

void notify_buffer_created(
const buffer_id bid, const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_allocation_id) {
notify(event_buffer_created{bid, range, elem_size, elem_align, user_allocation_id});
}

void notify_buffer_debug_name_changed(const buffer_id bid, const std::string& name) { notify(event_buffer_debug_name_changed{bid, name}); }

void notify_buffer_destroyed(const buffer_id bid) { notify(event_buffer_destroyed{bid}); }

void notify_host_object_created(const host_object_id hoid, const bool owns_instance) { notify(event_host_object_created{hoid, owns_instance}); }

void notify_host_object_destroyed(const host_object_id hoid) { notify(event_host_object_destroyed{hoid}); }

void notify_epoch_reached(const task_id tid) { notify(event_epoch_reached{tid}); }

protected:
/**
* This is called by the worker thread.
*/
void schedule();

private:
struct event_task_available {
const task* tsk;
};
struct event_buffer_created {
buffer_id bid;
celerity::range<3> range;
size_t elem_size;
size_t elem_align;
allocation_id user_allocation_id;
};
struct event_buffer_debug_name_changed {
buffer_id bid;
std::string debug_name;
};
struct event_buffer_destroyed {
buffer_id bid;
};
struct event_host_object_created {
host_object_id hoid;
bool owns_instance;
};
struct event_host_object_destroyed {
host_object_id hoid;
};
struct event_epoch_reached {
task_id tid;
};
struct event_test_inspect { // only used by scheduler_testspy
std::function<void()> inspect; // executed inside scheduler thread, making it safe to access scheduler members
};
using event = std::variant<event_task_available, event_buffer_created, event_buffer_debug_name_changed, event_buffer_destroyed,
event_host_object_created, event_host_object_destroyed, event_epoch_reached, event_test_inspect>;

std::unique_ptr<command_graph> m_cdag;
command_recorder* m_crec;
std::unique_ptr<command_graph_generator> m_cggen;
std::unique_ptr<instruction_graph> m_idag;
instruction_recorder* m_irec;
std::unique_ptr<instruction_graph_generator> m_iggen;

double_buffered_queue<event> m_event_queue;

void notify(event&& evt);
};

class scheduler final : public abstract_scheduler {
friend struct scheduler_testspy;
namespace celerity::detail::scheduler_detail {

/// executed inside scheduler thread, making it safe to access scheduler members
struct test_state {
const command_graph* cdag = nullptr;
const instruction_graph* idag = nullptr;
experimental::lookahead lookahead = experimental::lookahead::automatic;
};
using test_inspector = std::function<void(const test_state&)>;

struct scheduler_impl;

} // namespace celerity::detail::scheduler_detail

public:
scheduler(size_t num_nodes, node_id local_node_id, const system_info& system, scheduler::delegate* delegate, command_recorder* crec,
instruction_recorder* irec, const policy_set& policy = {});
namespace celerity::detail {

scheduler(const scheduler&) = delete;
scheduler(scheduler&&) = delete;
scheduler& operator=(const scheduler&) = delete;
scheduler& operator=(scheduler&&) = delete;
class command_recorder;
class instruction_recorder;
class task;

~scheduler() override;
// Abstract base class to allow different threading implementation in tests
class scheduler {
private:
friend struct scheduler_testspy;

private:
std::thread m_thread;
public:
using delegate = instruction_graph_generator::delegate;

void thread_main();
struct policy_set {
detail::command_graph_generator::policy_set command_graph_generator;
detail::instruction_graph_generator::policy_set instruction_graph_generator;
};

} // namespace detail
} // namespace celerity
scheduler(size_t num_nodes, node_id local_node_id, const system_info& system_info, scheduler::delegate* delegate, command_recorder* crec,
instruction_recorder* irec, const policy_set& policy = {});

scheduler(const scheduler&) = delete;
scheduler(scheduler&&) = default;
scheduler& operator=(const scheduler&) = delete;
scheduler& operator=(scheduler&&) = default;

~scheduler();

/**
* @brief Notifies the scheduler that a new task has been created and is ready for scheduling.
*/
void notify_task_created(const task* tsk);

void notify_buffer_created(buffer_id bid, const range<3>& range, size_t elem_size, size_t elem_align, allocation_id user_allocation_id);

void notify_buffer_debug_name_changed(buffer_id bid, const std::string& name);

void notify_buffer_destroyed(buffer_id bid);

void notify_host_object_created(host_object_id hoid, bool owns_instance);

void notify_host_object_destroyed(host_object_id hoid);

void notify_epoch_reached(task_id tid);

void set_lookahead(experimental::lookahead lookahead);

void flush_commands();

private:
struct test_threadless_tag {};

std::unique_ptr<scheduler_detail::scheduler_impl> m_impl;

// used in scheduler_testspy
scheduler(test_threadless_tag, size_t num_nodes, node_id local_node_id, const system_info& system_info, scheduler::delegate* delegate,
command_recorder* crec, instruction_recorder* irec, const policy_set& policy = {});
void test_scheduling_loop();
void test_inspect(scheduler_detail::test_inspector inspector);
};

} // namespace celerity::detail
20 changes: 20 additions & 0 deletions include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,23 @@ enum class epoch_action {
};

} // namespace celerity::detail

namespace celerity::experimental {

/// Controls how many command groups the runtime can queue up until starting execution of the first one.
enum class lookahead {
fknorr marked this conversation as resolved.
Show resolved Hide resolved
/// Command groups begin executing as soon as possible, minimizing latency. This is the right choice when asynchronous command groups must overlap with user
/// code in the application thread. Depending on the application, this might trigger frequent and expensive buffer resizes which can limit the maximum
/// buffer allocation (and thus problem size) per device.
none,

/// Queue up a window of command groups at the runtime's discretion. This is the default, and will combine buffer allocations and eliminate resizes and
/// out-of-memory conditions in most applications.
automatic,

/// Always queue up all command groups until the next synchronization point, i.e. `queue::fence`, `queue::wait` or runtime shutdown. This maximizes
/// throughput and avoids suboptimal buffer allocations at the expense of higher up-front scheduling latency.
infinite,
};

} // namespace celerity::experimental
Loading
Loading