From 6c7d01537dd5673aa85fc3eb177ff35bf0bc8236 Mon Sep 17 00:00:00 2001
From: dialecticDolt
Date: Fri, 10 Nov 2023 16:46:00 -0600
Subject: [PATCH] style: fix cython lint

---
 src/python/parla/cython/core.pxd           |  3 --
 src/python/parla/cython/core.pyx           | 16 ++-----
 src/python/parla/cython/cyparray.pyx       |  1 -
 src/python/parla/cython/device.pxd         |  4 +-
 src/python/parla/cython/device.pyx         | 14 ++----
 src/python/parla/cython/device_manager.pxd |  3 +-
 src/python/parla/cython/device_manager.pyx |  9 ++--
 src/python/parla/cython/mm.pyx             |  6 +--
 src/python/parla/cython/scheduler.pyx      | 56 +++++++---------------
 src/python/parla/cython/stream_pool.pxd    |  2 -
 src/python/parla/cython/stream_pool.pyx    | 16 ++-----
 src/python/parla/cython/tasks.pyx          |  4 +-
 src/python/parla/cython/variants.pyx       |  3 +-
 13 files changed, 41 insertions(+), 96 deletions(-)

diff --git a/src/python/parla/cython/core.pxd b/src/python/parla/cython/core.pxd
index 1b646d54..a783ff33 100644
--- a/src/python/parla/cython/core.pxd
+++ b/src/python/parla/cython/core.pxd
@@ -29,8 +29,6 @@ cdef extern from "include/runtime.hpp" nogil:
         void stop_callback(stopfunc_t func, void* scheduler) noexcept
         void create_parray(InnerPArray* parray, int parray_dev_id) noexcept
 
-        #ctypedef void* Ptr_t
-        #ctypedef InnerTask* InnerTaskPtr_t
 
     cdef cppclass _StatusFlags "TaskStatusFlags":
         bool spawnable
@@ -153,7 +151,6 @@ cdef extern from "include/runtime.hpp" nogil:
         void increase_num_active_tasks()
         void decrease_num_active_tasks()
 
-        #int get_num_active_workers()
         int get_num_active_tasks()
         int get_num_running_tasks()
         int get_num_ready_tasks()
diff --git a/src/python/parla/cython/core.pyx b/src/python/parla/cython/core.pyx
index eae059b5..f1d34987 100644
--- a/src/python/parla/cython/core.pyx
+++ b/src/python/parla/cython/core.pyx
@@ -7,17 +7,13 @@
 
 import cython
 
-from ..common.parray.core import PArray
-from ..common.dataflow import Dataflow
-from ..common.globals import AccessMode, cupy
+from ..common.globals import cupy
 
 from .device cimport Device
 from .cyparray cimport CyPArray
 from .device_manager cimport CyDeviceManager, DeviceManager
 from .mm cimport CyMM
 
-import threading
-from enum import IntEnum, auto
 from libc.stdint cimport uintptr_t
 
 LOG_TRACE = 0
@@ -627,7 +623,6 @@ cdef class PyInnerScheduler:
         cdef InnerScheduler* c_self = self.inner_scheduler
         c_self.remove_parray_from_tracker(cy_parray.get_cpp_parray(), dev_id)
 
-    #TODO(wlr): Should we release the GIL here? Or is it better to keep it?
     cpdef task_cleanup(self, PyInnerWorker worker, PyInnerTask task, int state):
         cdef InnerScheduler* c_self = self.inner_scheduler
         cdef InnerWorker* c_worker = worker.inner_worker
@@ -689,13 +684,11 @@ cdef class PyInnerScheduler:
         cdef InnerScheduler* c_self = self.inner_scheduler
         c_self.create_parray(cy_parray.get_cpp_parray(), parray_dev_id)
 
-    cpdef get_mapped_parray_state(\
-            self, int global_dev_id, long long int parray_parent_id):
+    cpdef get_mapped_parray_state(self, int global_dev_id, long long int parray_parent_id):
         cdef InnerScheduler* c_self = self.inner_scheduler
         return c_self.get_mapped_parray_state(global_dev_id, parray_parent_id)
 
-    cpdef get_reserved_parray_state(\
-            self, int global_dev_id, long long int parray_parent_id):
+    cpdef get_reserved_parray_state(self, int global_dev_id, long long int parray_parent_id):
         cdef InnerScheduler* c_self = self.inner_scheduler
         return c_self.get_reserved_parray_state(global_dev_id, parray_parent_id)
@@ -744,8 +737,7 @@ class DataMovementTaskAttributes:
    This is declared to avoid circular imports that could happen when we
    import tasks.pyx in here.
    """
-    def __init__(self, name, py_parray, access_mode, assigned_devices, \
-                 c_attrs: CyDataMovementTaskAttributes, dev_id):
+    def __init__(self, name, py_parray, access_mode, assigned_devices, c_attrs: CyDataMovementTaskAttributes, dev_id):
         self.name = name
         self.parray = py_parray
         self.access_mode = access_mode
diff --git a/src/python/parla/cython/cyparray.pyx b/src/python/parla/cython/cyparray.pyx
index c194f020..88f7a6bf 100644
--- a/src/python/parla/cython/cyparray.pyx
+++ b/src/python/parla/cython/cyparray.pyx
@@ -7,7 +7,6 @@
 
 from .cyparray cimport InnerPArray
 from .cyparray_state cimport CyPArrayState
-from ..common.parray.core import PArray
 
 # a Cython wrapper class around C++ PArray
 cdef class CyPArray:
diff --git a/src/python/parla/cython/device.pxd b/src/python/parla/cython/device.pxd
index b8875c3f..193e6b9a 100644
--- a/src/python/parla/cython/device.pxd
+++ b/src/python/parla/cython/device.pxd
@@ -1,5 +1,5 @@
-#cython: language_level=3
-#cython: language=c++
+# cython: language_level=3
+# cython: language=c++
 
 from ..cython.resources cimport Resource
 import cython
diff --git a/src/python/parla/cython/device.pyx b/src/python/parla/cython/device.pyx
index d3dbc486..8aa637a2 100644
--- a/src/python/parla/cython/device.pyx
+++ b/src/python/parla/cython/device.pyx
@@ -35,8 +35,7 @@ cdef class CyGPUDevice(CyDevice):
     def __cinit__(self, int dev_id, long mem_sz, long num_vcus, py_device):
         # C++ device object.
         # This object is deallocated by the C++ device manager.
-        self._cpp_device = new GPUDevice(dev_id, mem_sz, num_vcus, \
-                                         py_device)
+        self._cpp_device = new GPUDevice(dev_id, mem_sz, num_vcus, py_device)
 
     def __init__(self, int dev_id, long mem_sz, long num_vcus, py_device):
         pass
@@ -197,11 +196,6 @@ class PyDevice:
         return self._device_id
 
 
-"""
-Device instances in Python manage resource status.
-TODO(hc): the device configuration will be packed in a data class soon.
-"""
-
 class PyGPUDevice(PyDevice):
     """
     An inherited class from `PyDevice` for a device object specialized to CUDA.
@@ -209,7 +203,6 @@ class PyGPUDevice(PyDevice):
 
     def __init__(self, dev_id: int = 0, mem_sz: long = 0, num_vcus: long = 1):
         super().__init__(DeviceType.GPU, "GPU", dev_id)
-        #TODO(wlr): If we ever support VECs, we might need to move this device initialization
         self._cy_device = CyGPUDevice(dev_id, int(mem_sz*0.8), num_vcus, self)
 
     @property
@@ -391,6 +384,7 @@ class PyGPUArchitecture(PyArchitecture):
     def __init__(self):
         super().__init__("GPUArch", DeviceType.GPU)
 
+
 class ImportableGPUArchitecture(PyGPUArchitecture, ImportableArchitecture):
     def __init__(self):
         ImportableArchitecture.__init__(self, "GPUArch", DeviceType.GPU)
@@ -496,11 +490,11 @@ class CupyStream(Stream):
         return self.__repr__()
 
     def __enter__(self):
-        #Set the device to the stream's device.
+        # Set the device to the stream's device.
         self.active_device = cupy.cuda.Device(self._device_id)
         self.active_device.__enter__()
 
-        #Set the stream to the current stream.
+        # Set the stream to the current stream.
         self._stream.__enter__()
         Locals.push_stream(self)
 
diff --git a/src/python/parla/cython/device_manager.pxd b/src/python/parla/cython/device_manager.pxd
index dbc28fe2..efd5d7fe 100644
--- a/src/python/parla/cython/device_manager.pxd
+++ b/src/python/parla/cython/device_manager.pxd
@@ -20,8 +20,7 @@ cdef extern from "include/device_manager.hpp" nogil:
         void free_memory_by_parray_id(int parray_dev_id, unsigned long memory_size)
         void free_memory(unsigned int global_dev_id, unsigned long memory_size)
 
-
-
+
 cdef class CyDeviceManager:
     cdef DeviceManager* cpp_device_manager_
     cpdef register_device(self, CyDevice cy_device)
diff --git a/src/python/parla/cython/device_manager.pyx b/src/python/parla/cython/device_manager.pyx
index b4e4dcf8..f12af63b 100644
--- a/src/python/parla/cython/device_manager.pyx
+++ b/src/python/parla/cython/device_manager.pyx
@@ -34,7 +34,6 @@
 gpu = ImportableGPUArchitecture()
 cpu = ImportableCPUArchitecture()
 
 from . import stream_pool
-from .stream_pool cimport CyStreamPool
 
 cdef class CyDeviceManager:
     """
@@ -246,9 +245,11 @@ class PyDeviceManager:
 
         for dev_id in range(num_of_gpus):
             if self.num_real_gpus > 0:
-                py_cuda_device = PyGPUDevice(dev_id % self.num_real_gpus, \
-                                             gpu_mem_sizes[dev_id], \
-                                             VCU_BASELINE)
+                py_cuda_device = PyGPUDevice(
+                    dev_id % self.num_real_gpus,
+                    gpu_mem_sizes[dev_id],
+                    VCU_BASELINE
+                )
             else:
                 py_cuda_device = PyCPUDevice(
diff --git a/src/python/parla/cython/mm.pyx b/src/python/parla/cython/mm.pyx
index 268e7cfc..c62ea8f2 100644
--- a/src/python/parla/cython/mm.pyx
+++ b/src/python/parla/cython/mm.pyx
@@ -3,7 +3,7 @@
 from parla.cython import device_manager
 from parla.cython.core cimport LRUGlobalEvictionManager
 from parla.cython cimport device_manager
-#from parla.cython.core import LRUGlobalEvictionManager
+
 
 class PyMM:
     def __init__(self, dm: device_manager.PyDeviceManager):
@@ -20,13 +20,11 @@ class PyMM:
         return self._cy_mm
 
     def print_memory_stats(self, device_id, label: str):
-        import psutil
-        import os
         print(f"[{label}] Memory tracking", flush=True)
         try:
             import cupy
             mempool = cupy.get_default_memory_pool()
-            pinned_mempool = cupy.get_default_pinned_memory_pool()
+            _pinned_mempool = cupy.get_default_pinned_memory_pool()
             print((
                 f"\t GPU{device_id} {label} CuPy used bytes: {mempool.used_bytes()} \n"
                 f"\t GPU{device_id} {label} Free bytes: {mempool.free_bytes()} \n"
diff --git a/src/python/parla/cython/scheduler.pyx b/src/python/parla/cython/scheduler.pyx
index 2e51aa58..14f16d13 100644
--- a/src/python/parla/cython/scheduler.pyx
+++ b/src/python/parla/cython/scheduler.pyx
@@ -8,7 +8,6 @@
 from abc import abstractmethod
 import threading
 import inspect
-import os
 from ..common.globals import DeviceType, cupy, CUPY_ENABLED
 from ..common.globals import SynchronizationType as SyncType
 from ..common.globals import _Locals as Locals
@@ -26,9 +25,6 @@
 from . import tasks
 from . cimport core
 from . import core
 from .cyparray import CyPArray
-from .mm import PyMM
-
-from ..common.parray.core import PArray
 from ..utility.tracer import NVTXTracer
 
 Task = tasks.Task
@@ -208,12 +204,8 @@ class WorkerThread(ControllableThread, SchedulerContext):
                 self.task = self.inner_worker.get_task()
                 if(self.task is None):
                     self.status = "Waiting"
-                    # print("WAITING", flush=True)
-                    #with self._monitor:
-                    #    if not self.task:
-                    #        self._monitor.wait()
                     nvtx.push_range(message="worker::wait", domain="Python Runtime", color="blue")
-                    self.inner_worker.wait_for_task() # GIL Release
+                    self.inner_worker.wait_for_task()  # GIL Release
                     self.task = self.inner_worker.get_task()
                 if isinstance(self.task, core.DataMovementTaskAttributes):
                     self.task_attrs = self.task
@@ -237,13 +229,11 @@ class WorkerThread(ControllableThread, SchedulerContext):
                     # Save device_context with task object
                     active_task.environment = device_context
 
-                    #Writes all 'default' streams and event pointers to c++ task
-                    #This allows their synchronization without the GIL and faster iteration over them
-                    #(only saves initial runtime ones, TODO(wlr): save any user added events or streams after body returns)
+                    # Writes all 'default' streams and event pointers to c++ task
+                    # This allows their synchronization without the GIL and faster iteration over them
+                    # (only saves initial runtime ones, TODO(wlr): save any user added events or streams after body returns)
                     device_context.write_to_task(active_task)
 
-                    #print("Wrote enviornment to task", active_task, flush=True)
-
                     # Wait / Enqueue event for dependencies to complete
                     if USE_PYTHON_RUNAHEAD:
                         active_task.py_handle_runahead_dependencies()
@@ -296,23 +286,22 @@ class WorkerThread(ControllableThread, SchedulerContext):
                     elif isinstance(final_state, tasks.TaskRunahead):
                         core.binlog_2("Worker", "Runahead task: ", active_task.inner_task, " on worker: ", self.inner_worker)
 
                     if USE_PYTHON_RUNAHEAD:
-                        #Handle synchronization in Python (for debugging, works!)
-                        #print("Should run before cleanup_and_wait", self._should_run, active_task.inner_task, flush=True)
+                        # Handle synchronization in Python (for debugging, works!)
+ # print("Should run before cleanup_and_wait", self._should_run, active_task.inner_task, flush=True) if self._should_run: - #print("In if", flush=True) + # print("In if", flush=True) self.status = "Waiting" nvtx.push_range(message="worker::wait::2", domain="Python Runtime", color="red") self.scheduler.inner_scheduler.task_cleanup_and_wait_for_task(self.inner_worker, active_task.inner_task, active_task.state.value) else: - #print("In else", flush=True) + # print("In else", flush=True) self.scheduler.inner_scheduler.task_cleanup_presync(self.inner_worker, active_task.inner_task, active_task.state.value) if active_task.runahead != SyncType.NONE: device_context.synchronize(events=True) self.scheduler.inner_scheduler.task_cleanup_postsync(self.inner_worker, active_task.inner_task, active_task.state.value) else: - #Handle synchronization in C++ + # Handle synchronization in C++ # self.scheduler.inner_scheduler.task_cleanup(self.inner_worker, active_task.inner_task, active_task.state.value) # Adding wait here to reduce context switch between GIL print("Should run before cleanup_and_wait", self._should_run, active_task.inner_task, flush=True) @@ -320,14 +309,14 @@ class WorkerThread(ControllableThread, SchedulerContext): self.status = "Waiting" nvtx.push_range(message="worker::wait::2", domain="Python Runtime", color="red") self.scheduler.inner_scheduler.task_cleanup_and_wait_for_task(self.inner_worker, active_task.inner_task, active_task.state.value) - #self.task = self.inner_worker.get_task() + # self.task = self.inner_worker.get_task() else: self.scheduler.inner_scheduler.task_cleanup(self.inner_worker, active_task.inner_task, active_task.state.value) # print("Finished Cleaning up Task", active_task, flush=True) - #print("Should run before device_context", self._should_run, task, flush=True) + # print("Should run before device_context", self._should_run, task, flush=True) if active_task.runahead != SyncType.NONE: device_context.return_streams() - #print("Should run before final_state cleanup", self._should_run, task, flush=True) + # print("Should run before final_state cleanup", self._should_run, task, flush=True) if isinstance(final_state, tasks.TaskRunahead): final_state = tasks.TaskCompleted(final_state.return_value) active_task.cleanup() @@ -339,7 +328,6 @@ class WorkerThread(ControllableThread, SchedulerContext): self.task = None nvtx.pop_range(domain="Python Runtime") - # Adding wait here to reduce context switch between GIL # if self._should_run: # self.status = "Waiting" @@ -442,7 +430,7 @@ class Scheduler(ControllableThread, SchedulerContext): def parray_eviction(self): py_mm = self.memory_manager - print("Eviction policy is activated") + # print("Eviction policy is activated") for cuda_device in self.device_manager.get_devices(DeviceType.CUDA): global_id = cuda_device.get_global_id() parray_id = self.device_manager.globalid_to_parrayid(global_id) @@ -453,31 +441,19 @@ class Scheduler(ControllableThread, SchedulerContext): # from Python eviction manager. num_evictable_parray = py_mm.size(global_id) # TODO(hc): remove this. this is for test. - #import cupy + # import cupy for i in range(0, num_evictable_parray): try: # Get a PArray from a memory manager to evict. evictable_parray = \ py_mm.remove_and_return_head_from_zrlist(global_id) if evictable_parray is not None: - # TODO(hc): remove this. this is for test. - #for k in range(0, 4): - # with cupy.cuda.Device(k): - # mempool = cupy.get_default_memory_pool() - # print(f"\t OK? 
-
                         evictable_parray.evict(parray_id)
 
-                        # TODO(hc): remove this. this is for test.
-                        #for k in range(0, 4):
-                        #    with cupy.cuda.Device(k):
-                        #        mempool = cupy.get_default_memory_pool()
-                        #        print(f"\t OK {k} Used GPU{k}: {mempool.used_bytes()}, Free Mmeory: {mempool.free_bytes()}", flush=True)
-
                         # Repeat eviction until it gets enough memory.
                         memory_size_to_evict -= \
                             evictable_parray.nbytes_at(parray_id)
-                        #print("\t Remaining size to evict:", memory_size_to_evict, flush=True)
+                        # print("\t Remaining size to evict:", memory_size_to_evict, flush=True)
                         if memory_size_to_evict <= 0:
                             break
                 except Exception as e:
@@ -490,7 +466,7 @@
             print("Scheduler: Running", flush=True)
             self.inner_scheduler.run()
             should_run = self.inner_scheduler.get_should_run()
-            if should_run == False:
+            if should_run is False:
                 break
             # This case is executed if PArray eviction
             # mechanism was invoked by C++ scheduler.
diff --git a/src/python/parla/cython/stream_pool.pxd b/src/python/parla/cython/stream_pool.pxd
index deab2c14..fee81941 100644
--- a/src/python/parla/cython/stream_pool.pxd
+++ b/src/python/parla/cython/stream_pool.pxd
@@ -10,7 +10,6 @@ cdef extern from "include/device_contexts.hpp":
         void push_event(int device_id, uintptr_t event)
         uintptr_t pop_event(int device_id)
 
-
 
 cdef class CyStreamPool:
@@ -18,4 +17,3 @@ cdef class CyStreamPool:
     cdef dict _pool
     cdef int _per_device
     cdef list _device_list
-
diff --git a/src/python/parla/cython/stream_pool.pyx b/src/python/parla/cython/stream_pool.pyx
index 3ba98b11..addac192 100644
--- a/src/python/parla/cython/stream_pool.pyx
+++ b/src/python/parla/cython/stream_pool.pyx
@@ -11,9 +11,6 @@ cdef class CupyStream(Stream):
 
     def __init__(self, device: int = 0, stream = None):
         super().__init__(device=device, stream=stream)
-
-
-
 
 cdef class CyStreamPool:
 
     def __cinit__(self):
@@ -42,7 +39,7 @@ cdef class CyStreamPool:
 
     def get_stream(self, device):
         if len(self._pool[device]) == 0:
-            #Create a new stream if the pool is empty.
+            # Create a new stream if the pool is empty.
             new_stream = self.StreamClass(device=device)
             return new_stream
@@ -52,7 +49,7 @@ cdef class CyStreamPool:
         self._pool[stream.device].append(stream)
 
     def __summarize__(self):
-        summary = "" 
+        summary = ""
         for device in self._device_list:
             summary += f"({device} : {len(self._pool[device])})"
@@ -62,10 +59,6 @@ cdef class CyStreamPool:
 
         return f"StreamPool({self.__summarize__()})"
 
-
-
-
-
 
 class StreamPool:
 
     def __init__(self, device_list, per_device=8, cupy_flag: bool = True):
@@ -88,7 +81,7 @@ class StreamPool:
 
     def get_stream(self, device):
         if len(self._pool[device]) == 0:
-            #Create a new stream if the pool is empty.
+            # Create a new stream if the pool is empty.
             new_stream = self.StreamClass(device=device)
             return new_stream
@@ -98,7 +91,7 @@ class StreamPool:
         self._pool[stream.device].append(stream)
 
     def __summarize__(self):
-        summary = "" 
+        summary = ""
         for device in self._device_list:
             summary += f"({device} : {len(self._pool[device])})"
@@ -106,4 +99,3 @@ class StreamPool:
 
     def __repr__(self):
         return f"StreamPool({self.__summarize__()})"
-
diff --git a/src/python/parla/cython/tasks.pyx b/src/python/parla/cython/tasks.pyx
index 2eb7fa4f..ea3331c0 100644
--- a/src/python/parla/cython/tasks.pyx
+++ b/src/python/parla/cython/tasks.pyx
@@ -17,11 +17,10 @@
 from .cyparray import CyPArray
 
 from ..common.globals import _Locals as Locals
 from ..common.globals import DeviceType
-from ..common.globals import get_stream_pool, get_scheduler
+from ..common.globals import get_stream_pool
 from ..common.globals import AccessMode, Storage
 from ..common.globals import SynchronizationType as SyncType
 from ..common.parray.core import PArray
-from ..common.globals import _global_data_tasks
 
 DeviceType = PyDeviceType
@@ -34,6 +33,7 @@
 import traceback
 
 import cython
 cimport cython
 
+
 class TaskState(object, metaclass=ABCMeta):
     """!
     @brief Abstract base class for Task State.
diff --git a/src/python/parla/cython/variants.pyx b/src/python/parla/cython/variants.pyx
index 35f93eaa..7fae811b 100644
--- a/src/python/parla/cython/variants.pyx
+++ b/src/python/parla/cython/variants.pyx
@@ -8,7 +8,7 @@
 import functools
 
 from ..common.globals import _Locals as Locals
-from .device import PyArchitecture
+
 
 class VariantDefinitionError(ValueError):
     """!
@@ -31,7 +31,6 @@ class _VariantFunction(object):
         self._variants = {}
         functools.update_wrapper(self, func)
 
-
    def variant(self, spec_list=None, override=False, architecture=None, max_amount=8):
        """!
        @brief Decorator to declare a variant of this function for a specific architecture.
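
The hunks above are mechanical cython-lint style fixes rather than behavioral changes: unused imports and dead commented-out code are removed, comments get a space after the `#`, backslash line continuations are replaced with parenthesized ones, singleton comparisons use `is`, intentionally unused bindings get a leading underscore, and blank lines between top-level definitions are normalized. For quick reference, here is a minimal before/after sketch of those conventions; the code is hypothetical and does not come from the Parla sources:

    # Before: patterns the linter flags (hypothetical module, illustrative only)
    import os                           # unused import
    #comment missing a space after the hash
    def make_pair(first, second, \
                  swap=False):          # backslash continuation
        if swap == True:                # equality comparison against a singleton
            first, second = second, first
        unused = len((first, second))   # assigned but never used
        return first, second

    # After: the lint-clean equivalents applied throughout this patch
    # comment with a space after the hash
    def make_pair(first, second,
                  swap=False):          # implicit continuation inside parentheses
        if swap is True:                # identity check, as in `should_run is False` above
            first, second = second, first
        _unused = len((first, second))  # leading underscore marks an intentionally unused binding
        return first, second

The same rules explain hunks such as `pinned_mempool` becoming `_pinned_mempool` in mm.pyx and `should_run == False` becoming `should_run is False` in scheduler.pyx.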