diff --git a/src/c/backend/include/containers.hpp b/src/c/backend/include/containers.hpp
index 1431570c..c3886b71 100644
--- a/src/c/backend/include/containers.hpp
+++ b/src/c/backend/include/containers.hpp
@@ -35,6 +35,18 @@
 // as well as std::atomic_flag.
 // template <> auto constexpr is_atomic = true;
 
+// struct taskStructure { // Structure declaration
+//   int priority;       // Member (int variable)
+//   int wait_time;
+//   InnerTask *task;    // Member (pointer to the task)
+//   bool operator<(const taskStructure &b) {
+//     if (priority < b.priority) {
+//       return true;  // this element sorts before b; no swap needed
+//     }
+//     return false;   // b sorts before this element
+//   }
+// };
+
 template <typename T> class ProtectedVector {
 
 private:
@@ -475,4 +487,160 @@ template <typename T> class ProtectedQueue {
   inline bool empty_unsafe() { return this->q.empty(); }
 };
 
+template <typename T> class ComparePriority {
+public:
+  // Comparator for std::priority_queue. Returning true when a's priority is
+  // less than b's yields a max-heap: the element with the largest priority
+  // value sits at top().
+  bool operator()(T a, T b) { return a->priority < b->priority; }
+};
+
+template <typename T> class ProtectedPriorityQueue {
+
+private:
+  std::priority_queue<T, std::vector<T>, ComparePriority<T>> pq;
+  std::atomic<size_t> length{0};
+  std::mutex mtx;
+  std::string name;
+
+public:
+  ProtectedPriorityQueue() = default;
+
+  ProtectedPriorityQueue(std::string name) {
+    this->mtx.lock();
+    this->name = name;
+    this->mtx.unlock();
+  }
+
+  ProtectedPriorityQueue(std::string name,
+                         std::priority_queue<T, std::vector<T>, ComparePriority<T>> pq) {
+    this->mtx.lock();
+    this->name = name;
+    this->pq = pq;
+    this->mtx.unlock();
+  }
+
+  ProtectedPriorityQueue(std::string name, size_t size) {
+    this->mtx.lock();
+    this->name = name;
+    // NOTE: std::priority_queue does not expose reserve(), so the size hint
+    // is currently unused.
+    this->mtx.unlock();
+  }
+
+  void lock() { this->mtx.lock(); }
+
+  void unlock() { this->mtx.unlock(); }
+
+  void push(T a) {
+    this->mtx.lock();
+    this->pq.push(a);
+    this->mtx.unlock();
+    this->length++;
+  }
+
+  inline void push_unsafe(T a) {
+    this->pq.push(a);
+    this->length++;
+  }
+
+  void push(std::vector<T> &a) {
+    this->mtx.lock();
+    for (auto val : a) {
+      this->push_unsafe(val);
+    }
+    this->mtx.unlock();
+  }
+
+  inline void push_unsafe(std::vector<T> &a) {
+    for (auto val : a) {
+      this->push_unsafe(val);
+    }
+  }
+
+  void pop() {
+    this->mtx.lock();
+    this->pq.pop();
+    this->mtx.unlock();
+    this->length--;
+  }
+
+  inline void pop_unsafe() {
+    this->pq.pop();
+    this->length--;
+  }
+
+  size_t atomic_size() { return this->length.load(); }
+
+  size_t size() {
+    this->mtx.lock();
+    size_t size = this->pq.size();
+    this->mtx.unlock();
+    return size;
+  }
+
+  size_t size_unsafe() { return this->pq.size(); }
+
+  // std::priority_queue has no random access, so the operator[] / at() /
+  // at_unsafe() accessors provided by ProtectedVector cannot be supported here.
+
+  T front() {
+    this->mtx.lock();
+    T val = this->pq.top();
+    this->mtx.unlock();
+    return val;
+  }
+
+  inline T front_unsafe() { return this->pq.top(); }
+
+  T front_and_pop() {
+    this->mtx.lock();
+    T val = this->front_unsafe();
+    this->pop_unsafe();
+    this->mtx.unlock();
+    return val;
+  }
+
+  inline T front_and_pop_unsafe() {
+    T val = this->front_unsafe();
+    this->pop_unsafe();
+    return val;
+  }
+
+  // TODO: Add an implementation of clear(). std::priority_queue has no
+  // clear(); swapping in an empty queue would be needed:
+  // void clear() {
+  //   this->mtx.lock();
+  //   this->pq = decltype(this->pq)();
+  //   this->length = 0;
+  //   this->mtx.unlock();
+  // }
+  //
+  // inline void clear_unsafe() {
+  //   this->pq = decltype(this->pq)();
+  //   this->length = 0;
+  // }
+
+  bool empty() {
+    this->mtx.lock();
+    bool empty = this->pq.empty();
+    this->mtx.unlock();
+    return empty;
+  }
+
+  inline bool empty_unsafe() { return this->pq.empty(); }
+};
+
 #endif // PARLA_CONTAINERS_HPP
diff --git a/src/c/backend/include/device_queues.hpp b/src/c/backend/include/device_queues.hpp
index feb9a4b2..37cdc552 100644
--- a/src/c/backend/include/device_queues.hpp
+++ b/src/c/backend/include/device_queues.hpp
@@ -7,6 +7,7 @@
 
 #include "device_manager.hpp"
 #include "runtime.hpp"
+#include "containers.hpp"
 
 // TODO(wlr): FIXME Change these back to smart pointers. I'm leaking memory
 // here...
@@ -21,7 +22,7 @@
  * queue supervises the phase of.
  */
 template class DeviceQueue {
-  using MixedQueue_t = TaskQueue;
+  using MixedQueue_t = PriorityTaskQueue;
   using MDQueue_t = TaskQueue;
 
 public:
@@ -33,17 +34,29 @@ template class DeviceQueue {
    */
   Device *get_device() { return device; }
 
+  /**
+   * Calculates a priority for the task. The intent is a low-priority-first
+   * scheme: tasks with lower priority values should be dequeued earlier.
+   * @param task the task to set priority for
+   */
+  void set_priority(InnerTask *task) {
+    // Inversely proportional: the more dependents a task has, the earlier it
+    // should be scheduled. Add 1 to handle tasks with zero dependents.
+    int num_dependents = task->dependents.size() + 1;
+    // Directly proportional: the more GPUs a task requires, the later it
+    // should be scheduled.
+    int num_gpus_required = task->assigned_devices.size();
+    // TODO: normalize and revisit this formula.
+    int priority = total_num_tasks + (num_gpus_required / num_dependents);
+    // std::cout << total_num_tasks << std::endl;
+    task->set_priority(priority);
+    // Other candidate signals: critical path length to the most recently
+    // spawned task, estimated completion time.
+  }
+
   /**
    * Enqueues a task on this device.
    * @param task the task to enqueue
    */
   void enqueue(InnerTask *task) {
-    // std::cout << "DeviceQueue::enqueue() - " << task->get_name() <<
-    // std::endl;
-
-    // std::cout << "Mixed Queue size: " << mixed_queue.size() << std::endl;
-    this->mixed_queue.push_back(task);
+    this->set_priority(task);
+    this->mixed_queue.push(task);
     num_tasks++;
+    total_num_tasks++;
   };
@@ -115,7 +128,7 @@ template class DeviceQueue {
         std::cout << "Moving task to waiting queue: " << head->get_name()
                   << std::endl;
         waiting_queue.push_back(head);
-        mixed_queue.pop_front();
+        mixed_queue.pop();
         // TODO(wlr): Should num_tasks include waiting tasks?
         // (2)
@@ -142,7 +155,7 @@ template class DeviceQueue {
     InnerTask *task = front();
     if (task != nullptr) {
       // std::cout << "Popping task: " << task->get_name() << std::endl;
-      mixed_queue.pop_front();
+      mixed_queue.pop();
       // Set removed status so task can be pruned from other queues
       task->set_removed(true);
       num_tasks--;
@@ -158,6 +171,7 @@ template class DeviceQueue {
   MixedQueue_t mixed_queue;
   MDQueue_t waiting_queue;
   std::atomic<int> num_tasks{0};
+  std::atomic<int> total_num_tasks{0};
 };
 
 // TODO(wlr): I don't know what to name this.
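Note on the ordering (editor's sketch, not part of the patch): with the comparator written as `a->priority < b->priority`, std::priority_queue behaves as a max-heap, so the task with the largest priority value is dequeued first. If the intended "low-priority-first" scheme means the smallest value should come out first, the comparison would need to be flipped to `>`. The standalone example below uses a hypothetical FakeTask type and a copy of the comparator (named ComparePrioritySketch here) to make the resulting order visible.

#include <iostream>
#include <queue>
#include <vector>

// Hypothetical stand-in for InnerTask; only a public `priority` field is needed.
struct FakeTask {
  int priority;
};

// Same shape as the ComparePriority template added in containers.hpp.
template <typename T> struct ComparePrioritySketch {
  bool operator()(T a, T b) { return a->priority < b->priority; }
};

int main() {
  FakeTask t1{1}, t2{5}, t3{3};
  std::priority_queue<FakeTask *, std::vector<FakeTask *>,
                      ComparePrioritySketch<FakeTask *>>
      pq;
  pq.push(&t1);
  pq.push(&t2);
  pq.push(&t3);
  // `a->priority < b->priority` makes this a max-heap: top() holds the task
  // with the largest priority value, so the output is 5 3 1.
  while (!pq.empty()) {
    std::cout << pq.top()->priority << " ";
    pq.pop();
  }
  std::cout << std::endl;
  return 0;
}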
diff --git a/src/c/backend/include/phases.hpp b/src/c/backend/include/phases.hpp
index d8ed3bc2..07ec4079 100644
--- a/src/c/backend/include/phases.hpp
+++ b/src/c/backend/include/phases.hpp
@@ -380,6 +380,7 @@ class RuntimeReserver : virtual public SchedulerPhase {
   }
 
   void enqueue(InnerTask *task);
+  // void enqueue(taskStructure *task_with_priority);
   void enqueue(std::vector<InnerTask *> &tasks);
   void run(SchedulerPhase *next_phase);
   size_t get_count();
diff --git a/src/c/backend/include/runtime.hpp b/src/c/backend/include/runtime.hpp
index 7a22090c..020be7db 100644
--- a/src/c/backend/include/runtime.hpp
+++ b/src/c/backend/include/runtime.hpp
@@ -44,6 +44,7 @@
 #include "profiling.hpp"
 #include "resource_requirements.hpp"
 #include "memory_manager.hpp"
+// #include "task.hpp"
 
 using namespace std::chrono_literals;
 using namespace parray;
@@ -59,10 +60,25 @@
 class TaskBarrier;
 class InnerWorker;
 class InnerScheduler;
 
+// struct taskStructure { // Structure declaration
+//   int priority;       // Member (int variable)
+//   int wait_time;
+//   InnerTask *task;    // Member (pointer to the task)
+//   // bool operator<(const taskStructure &b) const {
+//   //   return priority < b.priority;
+//   //   // if (priority < b.priority) {
+//   //   //   return true;  // this element sorts before b; no swap needed
+//   //   // }
+//   //   // return false;   // b sorts before this element
+//   // }
+// };
+
 // Type Aliases for common containers
 using WorkerQueue = ProtectedQueue<InnerWorker *>;
 using WorkerList = ProtectedVector<InnerWorker *>;
 using TaskQueue = ProtectedQueue<InnerTask *>;
+using PriorityTaskQueue = ProtectedPriorityQueue<InnerTask *>;
 using TaskList = ProtectedVector<InnerTask *>;
 using SpaceList = ProtectedVector;
 using PointerList = ProtectedVector;
diff --git a/src/python/parla/cython/cyparray.pyx b/src/python/parla/cython/cyparray.pyx
index 88f7a6bf..9e17c8be 100644
--- a/src/python/parla/cython/cyparray.pyx
+++ b/src/python/parla/cython/cyparray.pyx
@@ -7,7 +7,6 @@
 from .cyparray cimport InnerPArray
 from .cyparray_state cimport CyPArrayState
 
-
 # a Cython wrapper class around C++ PArray
 cdef class CyPArray:
diff --git a/src/python/parla/cython/device_manager.pyx b/src/python/parla/cython/device_manager.pyx
index f12af63b..34d4c205 100644
--- a/src/python/parla/cython/device_manager.pyx
+++ b/src/python/parla/cython/device_manager.pyx
@@ -83,7 +83,6 @@ class PrintableFrozenSet(frozenset):
     def __repr__(self):
         return self.get_name()
 
-
 # TODO(wlr): - Allow device manager to initialize non-contiguous gpu ids.
 # TODO(wlr): - Provide a way to iterate over these real device ids
diff --git a/src/python/parla/cython/tasks.pyx b/src/python/parla/cython/tasks.pyx
index ea3331c0..c5ff57c5 100644
--- a/src/python/parla/cython/tasks.pyx
+++ b/src/python/parla/cython/tasks.pyx
@@ -1786,4 +1786,4 @@ class BackendTaskSpace(TaskSpace):
         return return_list
 
     def wait(self):
-        self.inner_space.wait()
+        self.inner_space.wait()
\ No newline at end of file
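On the formula in DeviceQueue::set_priority (editor's sketch, not part of the patch; priority_as_written is an illustrative helper, not a function in the codebase): because num_gpus_required / num_dependents is integer division, the ratio truncates to zero whenever a task needs fewer GPUs than it has dependents plus one, so the running total_num_tasks counter ends up dominating the ordering. The small program below evaluates the expression for two hypothetical task shapes; computing the ratio in floating point would preserve the intended inverse weighting of dependents.

#include <iostream>

// Mirrors `total_num_tasks + (num_gpus_required / num_dependents)` with the
// same integer arithmetic used in the patch.
int priority_as_written(int total_num_tasks, int num_gpus_required,
                        int num_dependents) {
  return total_num_tasks + (num_gpus_required / num_dependents);
}

int main() {
  // 10 tasks enqueued so far, 1 GPU required, 4 dependents (+1 => 5):
  // 1 / 5 truncates to 0, so the ratio contributes nothing.
  std::cout << priority_as_written(10, 1, 5) << std::endl; // prints 10
  // Same enqueue count, 4 GPUs required, no dependents (0 + 1 => 1):
  // 4 / 1 == 4, so this task receives a larger priority value.
  std::cout << priority_as_written(10, 4, 1) << std::endl; // prints 14
  return 0;
}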