From 5ccb9d756aa74845c96c35b0cb8dcddc6dc1c46d Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Fri, 2 Dec 2022 02:20:02 -0600 Subject: [PATCH 01/16] baseline of LRU garbage collector --- parla/tracking.py | 584 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 584 insertions(+) create mode 100644 parla/tracking.py diff --git a/parla/tracking.py b/parla/tracking.py new file mode 100644 index 0000000..00f3567 --- /dev/null +++ b/parla/tracking.py @@ -0,0 +1,584 @@ +""" +Contains data structures for logging and tracking PArray instances across devices. + +DataTracker: +- Hash-map of PArrays to devices +- Hash-map of Devices to EvictionManagers + +EvictionManager: + - Implements an eviction policy for a managed list of PArray objects on a single device +""" + +#TODO(hc): how to guarantee that the evicted data can be refetched correctly when a later task needs it. +# synchronization between data movement tasks and data eviction; for example, if a data is evicted +# after a data movement task is executed, then it breaks correctness. +# also, it could cause thrashing. +# condition 1. if a mapped task in a queue will use an evicted data, its data movement task should exist. +# condition 2. + +#TODO(hc): so, eviction should happen at the proper timing. +# let's assume that we evicted a proper data at a proper timing. then what do we need? +# 1) parray coherency protocol should be aware of that. +# 2) update priority of the data object; or we can just remove that data from the list. + +#TODO(hc): how to handle slicing? + + + +import threading + +#TODO(wlr): Nothing in this file is threadsafe at the moment. Developing structure first, then we'll add locks. + +#TODO(wlr): I assume PArrays hash to a unique value during their lifetime. If not, we'll need to add such a hash function to PArray. + +#TODO(wlr): This is developed without considering sliced PArrays. +# For slices, I imagine we might need something with the following rules: +# - An access of a slice, locks the parent on that device +# - An eviction of a slice, may not evict the parent +# - An eviction of a parent, evicts all slices + +# I'm less certain about the following: +# - Updating the priority of a slice, updates the priority of the parent +# - Updating the priority of a parent, updates the priority of all slices + + +#Needs: +# - wrap in locks (use external locks on base class) + + +class DataNode: + """ + A node containing a data object (PArray) on a specific device. + Used in the linked lists in the EvictionManager + """ + def __init__(self, data, device, priority=0): + self.data = data + self.device = device + self.priority = priority + + self.next = None + self.prev = None + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return f"DataNode({self.data}, {self.device})" + +class ListNode: + """ + A node containing a linked list of DataNodes with an associated value (typically priority). + Useful for more complex eviction data structures. 
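+    In the LFU manager below, a ListNode groups the DataNodes that currently share the same use count.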
+ """ + + def __init__(self, value, list): + self.value = value + self.list = list + + self.next = None + self.prev = None + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return f"ListNode({self.list})" + +class DLList: + """ + A doubly linked list used in the EvictionManager + """ + def __init__(self): + self.head = None + self.tail = None + self.length = 0 + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return f"DLList({self.head}, {self.tail})" + + def append(self, node): + if self.head is None: + self.head = node + self.tail = node + else: + self.tail.next = node + node.prev = self.tail + self.tail = node + + self.length += 1 + + def remove(self, node): + edit = False + + if self.head == node: + self.head = node.next + edit = True + + if self.tail == node: + self.tail = node.prev + edit = True + + if node.prev is not None: + node.prev.next = node.next + edit = True + + if node.next is not None: + node.next.prev = node.prev + edit = True + + node.prev = None + node.next = None + + if edit: + self.length -= 1 + + return edit + + def insert_before(self, node, new_node): + if node.prev is not None: + node.prev.next = new_node + new_node.prev = node.prev + else: + self.head = new_node + node.prev = new_node + new_node.next = node + + self.length += 1 + + def insert_after(self, node, new_node): + if node.next is not None: + node.next.prev = new_node + new_node.next = node.next + else: + self.tail = new_node + node.next = new_node + new_node.prev = node + + self.length += 1 + + def __len__(self): + return self.length +class EvictionManager: + """ + Track usage of data objects on devices. Used to chose which blocks to evict. + """ + + def __init__(self, device, memory_limit): + self.device = device + + #values in bytes + self.memory_limit = memory_limit + self.used_memory = 0 + self.evictable_memory = 0 + + self.lock = threading.Condition(threading.Lock()) + + def map_data(self, data): + """ + Called when a data object is mapped to a device. + """ + with self.lock: + self._map_data(data) + + def _map_data(self, data): + """ + Called when a data object is mapped to a device. + """ + pass + + def _unmap_data(self, data): + pass + + def unmap_data(self, data): + """ + Called when a data object is unmapped from a device. + """ + with self.lock: + self._unmap_data(self, data) + + def _start_prefetch_data(self, data): + pass + + def start_prefetch_data(self, data): + """ + Called when a data object starts a prefetch. + Updates the used memory size. + + Can update the priority of the data object. + """ + with self.lock: + self._start_prefetch_data(data) + + def _stop_prefetch_data(self, data): + """ + Called when a data object is no longer being prefetched. + + Updates the priority of the data object. + """ + # TODO(hc): what does it mean? + pass + + def stop_prefetch_data(self, data): + """ + Called when a data object is no longer being prefetched. + + Updates the priority of the data object. + """ + with self.lock: + self._stop_prefetch_data(data) + + def _access_data(self, data): + pass + + + def access_data(self, data): + """ + Called when a data object is accessed. + + Can update the priority of the data object. + Locks the data object (cannot be evicted while in use) + Updates the evictable memory size. + """ + with self.lock: + self._access_data(data) + + + def _release_data(self, data): + pass + + def release_data(self, data): + """ + Called when a data object is no longer in use. + + Updates the priority of the data object. 
+ Unlocks the data object (can be evicted) + Updates the evictable memory size. + """ + with self.lock: + self._release_data(data) + + def _evict_data(self, data): + pass + + def evict_data(self, data): + """ + Called to evict a specific data object. + Updates the used memory size and evictable memory size. + """ + with self.lock: + self._evict_data(data) + + def _evict(self): + """ + Called when memory is needed. + + Evicts the data object with the highest priority (based on the policy). + """ + pass + + + def evict(self): + """ + Called when memory is needed. + + Evicts the data object with the highest priority (based on the policy). + """ + with self.lock: + self._evict() + +class LRUManager(EvictionManager): + """ + Eviction manager for a LRU (least recently used) policy. + + Use is updated when a data object is accessed and released by a task. + """ + + def __init__(self, device, memory_limit): + super().__init__(device, memory_limit) + self.data_list = DLList() + self.data_map = {} + + #Note(wlr): These tracking dictionaries are optional, I just think it's interesting to track. + #Holds data objects on this device that are being prefetched. + self.prefetch_map = {} + #Holds data objects on this device that are needed by tasks that have not yet completed (this includes data in the process of being prefetched). + self.active_map = {} + #holds data objects that are currently being used by tasks. + self.used_map = {} + + + def _start_prefetch_data(self, data): + + data.add_prefetch(self.device) + data.add_active(self.device) + + if data in self.data_map: + #This is a prefetch of a data object that is already on the device (or is being prefetched). + #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. + #Remove it from the evictable list. + + # TODO(hc): but if a data movement task will be executed after a very long time, that also can be evictable. + # if memory is full and any task cannot proceed, we can still evict one of data that was prefetched. + # but this is very rare case and I am gonna leave it as the future work. + success = self.data_list.remove(data) + + if success: + #This is the first prefetch of a data object that is already on the device. + #Update the evictable memory size (as this data object is no longer evictable). + self.evictable_memory -= data.size + else: + #This is a new block, update the used memory size. + self.used_memory += data.size + + self.prefetch_map[data] = data + self.active_map[data] = data + + assert(self.used_memory <= self.memory_limit) + + def _stop_prefetch_data(self, data): + + count = data.get_prefetch(self.device) + data.remove_prefetch(self.device) + + if count == 1: + del self.prefetch_map[data] + + def _access_data(self, data): + + #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. + # Any logic here would be a sanity check. I'm removing it for now. + + #node = self.data_map.get(data, None) + #if node is not None: + # #Remove the node from the eviction list while it's in use + # success = self.data_list.remove(node) + + self.used_map[data] = data + data.add_use(self.device) + + def _release_data(self, data): + node = self.data_map[data] + + active_count = data.get_active(self.device) + use_count = data.get_use(self.device) + + data.remove_active(self.device) + data.remove_use(self.device) + + if active_count == 1: + del self.active_map[data] + + #If the data object is no longer needed by any already prefetched tasks, it can be evicted. 
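+            #Appending here keeps the evictable list ordered from least recently
+            #released (head) to most recently released (tail), so _evict below can
+            #simply take the head node as the LRU victim.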
+ self.data_list.append(node) + self.evictable_memory += data.nbytes + + if use_count == 1: + del self.used_map[data] + + def _evict_data(self, data): + node = self.data_map[data] + + assert(data.get_use(self.device) == 0) + assert(data.get_active(self.device) == 0) + + #Call internal data object evict method + #This should: + # - Backup the data if its not in a SHARED state + # (SHARED state means the data has a valid copy on multiple devices. Eviction should never destroy the only remaining copy) + # - Mark the data for deletion (this may be done by the CuPy/Python GC) + data.evict(self.device) + + self.data_list.remove(node) + del self.data_map[data] + + self.used_memory -= data.nbytes + self.evictable_memory -= data.nbytes + + def _evict(self): + # Get the oldest data object + # Because we append after use this is at the front of the list + + node = self.data_list.head + self._evict_data(node) + +class LFUManager(EvictionManager): + """ + Eviction Manager for a LFU (Least Frequently Used) policy. + Use is updated when a data object is accessed and released by a task. + + The data structure follows the O(1) implementation described by (Ketan Shah/Anirban Mitra/Dhruv Matani, 2010): http://dhruvbird.com/lfu.pdf + """ + + def __init__(self, device, memory_limit): + super().__init__(device, memory_limit) + self.priority_map = {} + self.data_map = {} + + self.priority_list = DLList() + + #NOTE(wlr): These tracking dictionaries are optional, I just think it's interesting to track. + #Holds data objects on this device that are being prefetched. + self.prefetch_map = {} + #Holds data objects on this device that are needed by tasks that have not yet completed (this includes data in the process of being prefetched). + self.active_map = {} + #holds data objects that are currently being used by tasks. 
+ self.used_map = {} + + def _add(self, node): + + #Increment usage count + node.priority += 1 + + #Lookup ListNode in priority table + list_node = self.priority_map.get(node.priority, None) + + if list_node is None: + #Create new list node + list_node = ListNode(node.priority, DLList()) + self.priority_list[node.priority] = list_node + + if node.priority == 1: + #Add to the head of the list + self.priority_list.append(list_node) + else: + #Add as the next node after the previous priority + self.priority_list.insert_after(list_node, self.priority_list[node.priority - 1]) + + #Add data node to internal list + list_node.list.append(node) + + def _remove(self, node, delete=False): + + #Lookup ListNode in priority table + list_node = self.priority_map.get(node.priority, None) + + assert(list_node is not None) + + success = list_node.list.remove(node) + + if len(list_node.list) == 0: + #Remove the list node from the priority list + + self.priority_list.remove(list_node) + del self.priority_map[list_node.priority] + + if delete: + del self.data_map[node] + + return success + + + def _get_evict_target(self): + + #Get least frequently used node + + #Get the first list node in the priority list + + list_node = self.priority_list.head + data_node = list_node.list.head + + while list_node is not None: + + #Check all data nodes with the same priority + while data_node is not None: + data_node = data_node.next + + if data_node.data.get_active(self.device) == 0: + break + + #Continue search in next priority list + if data_node.data.get_active(self.device) == 0: + break + + list_node = list_node.next + + return data_node + + def _start_prefetch_data(self, data): + + data.add_prefetch(self.device) + data.add_active(self.device) + + if data in self.data_map: + #This is a prefetch of a data object that is already on the device (or is being prefetched). + #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. + + if data.get_active(self.device) == 1: + #This is the first prefetch of a data object that is already on the device. + #Update the evictable memory size (as this data object is no longer evictable). + self.evictable_memory -= data.size + else: + #This is a new block, update the used memory size. + self.used_memory += data.size + + self.prefetch_map[data] = data + self.active_map[data] = data + + assert(self.used_memory <= self.memory_limit) + + def _stop_prefetch_data(self, data): + + count = data.get_prefetch(self.device) + data.remove_prefetch(self.device) + + if count == 1: + del self.prefetch_map[data] + + def _access_data(self, data): + + #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. + # Any logic here would be a sanity check. I'm removing it for now. 
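+        #Note that the use count (node.priority) is only bumped when a task
+        #releases the data (_release_data calls _add), so an access by itself
+        #does not move the node between frequency lists.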
+ + #node = self.data_map.get(data, None) + #if node is not None: + # #Remove the node from the eviction list while it's in use + # success = self.data_list.remove(node) + + self.used_map[data] = data + data.add_use(self.device) + + + def _release_data(self, data): + node = self.data_map[data] + + active_count = data.get_active(self.device) + use_count = data.get_use(self.device) + + data.remove_active(self.device) + data.remove_use(self.device) + + self._add(node) + + if active_count == 1: + del self.active_map[data] + self.evictable_memory += data.nbytes + + if use_count == 1: + del self.used_map[data] + + def _evict_data(self, data): + node = self.data_map[data] + + assert(data.get_use(self.device) == 0) + assert(data.get_active(self.device) == 0) + + #Call internal data object evict method + #This should: + # - Backup the data if its not in a SHARED state + # (SHARED state means the data has a valid copy on multiple devices. Eviction should never destroy the only remaining copy) + # - Mark the data for deletion (this may be done by the CuPy/Python GC) + data.evict(self.device) + + self._remove(node, delete=True) + + self.used_memory -= data.nbytes + self.evictable_memory -= data.nbytes + + + def _evict(self): + # Get the oldest data object and remove it + node = self._get_evict_target() + self._evict_data(node) From c61d00818b3fec8aa1c3b08e99df7783fe728b17 Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Fri, 6 Jan 2023 00:10:50 -0600 Subject: [PATCH 02/16] Design a baseline and prefetch start --- parla/tracking.py | 309 +++++++++++++++++++++++++--------------------- 1 file changed, 170 insertions(+), 139 deletions(-) diff --git a/parla/tracking.py b/parla/tracking.py index 00f3567..4dc59af 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -24,9 +24,9 @@ #TODO(hc): how to handle slicing? - import threading + #TODO(wlr): Nothing in this file is threadsafe at the moment. Developing structure first, then we'll add locks. #TODO(wlr): I assume PArrays hash to a unique value during their lifetime. If not, we'll need to add such a hash function to PArray. @@ -45,6 +45,7 @@ #Needs: # - wrap in locks (use external locks on base class) +from typing import TypedDict, Dict class DataNode: """ @@ -65,6 +66,7 @@ def __str__(self): def __repr__(self): return f"DataNode({self.data}, {self.device})" + class ListNode: """ A node containing a linked list of DataNodes with an associated value (typically priority). @@ -84,9 +86,10 @@ def __str__(self): def __repr__(self): return f"ListNode({self.list})" + class DLList: """ - A doubly linked list used in the EvictionManager + A doubly linked list used in the EvictionManager. """ def __init__(self): self.head = None @@ -161,161 +164,56 @@ def insert_after(self, node, new_node): def __len__(self): return self.length -class EvictionManager: - """ - Track usage of data objects on devices. Used to chose which blocks to evict. - """ - - def __init__(self, device, memory_limit): - self.device = device - - #values in bytes - self.memory_limit = memory_limit - self.used_memory = 0 - self.evictable_memory = 0 - - self.lock = threading.Condition(threading.Lock()) - - def map_data(self, data): - """ - Called when a data object is mapped to a device. - """ - with self.lock: - self._map_data(data) - - def _map_data(self, data): - """ - Called when a data object is mapped to a device. - """ - pass - - def _unmap_data(self, data): - pass - - def unmap_data(self, data): - """ - Called when a data object is unmapped from a device. 
- """ - with self.lock: - self._unmap_data(self, data) - - def _start_prefetch_data(self, data): - pass - - def start_prefetch_data(self, data): - """ - Called when a data object starts a prefetch. - Updates the used memory size. - - Can update the priority of the data object. - """ - with self.lock: - self._start_prefetch_data(data) - - def _stop_prefetch_data(self, data): - """ - Called when a data object is no longer being prefetched. - - Updates the priority of the data object. - """ - # TODO(hc): what does it mean? - pass - - def stop_prefetch_data(self, data): - """ - Called when a data object is no longer being prefetched. - Updates the priority of the data object. - """ - with self.lock: - self._stop_prefetch_data(data) - - def _access_data(self, data): - pass - - - def access_data(self, data): - """ - Called when a data object is accessed. - - Can update the priority of the data object. - Locks the data object (cannot be evicted while in use) - Updates the evictable memory size. - """ - with self.lock: - self._access_data(data) - - - def _release_data(self, data): - pass - - def release_data(self, data): - """ - Called when a data object is no longer in use. - - Updates the priority of the data object. - Unlocks the data object (can be evicted) - Updates the evictable memory size. - """ - with self.lock: - self._release_data(data) - def _evict_data(self, data): - pass - - def evict_data(self, data): - """ - Called to evict a specific data object. - Updates the used memory size and evictable memory size. - """ - with self.lock: - self._evict_data(data) - - def _evict(self): - """ - Called when memory is needed. - - Evicts the data object with the highest priority (based on the policy). - """ - pass - - - def evict(self): - """ - Called when memory is needed. +class DataMapType(TypedDict): + """ + Track information of data instance in a device + """ + state: str + ref_count: int - Evicts the data object with the highest priority (based on the policy). - """ - with self.lock: - self._evict() -class LRUManager(EvictionManager): +#class LRUManager(EvictionManager): +class LRUManager: """ - Eviction manager for a LRU (least recently used) policy. - - Use is updated when a data object is accessed and released by a task. + LRU policy for garbage collecting. + It mantains a list of the zero-referenced data objects for each device. + The head of the list is the target task to be evicted + and the tail of the list is the data used most recently. """ def __init__(self, device, memory_limit): super().__init__(device, memory_limit) - self.data_list = DLList() - self.data_map = {} + # A list containig zero-reference data objects in a specified device. + self.zr_data_list = DLList() + # A dictionary containing all data information on a device. + self.data_map = Dict[DataMapType] + # A lock for guarding a reference count. + self.ref_count_lock = threading.Condition(threading.Lock()) #Note(wlr): These tracking dictionaries are optional, I just think it's interesting to track. #Holds data objects on this device that are being prefetched. + #XXX(hc): This might be necessary as data being prefetched cannot be used yet but it can avoid + # unnecessary future data prefetching or move. self.prefetch_map = {} #Holds data objects on this device that are needed by tasks that have not yet completed (this includes data in the process of being prefetched). self.active_map = {} #holds data objects that are currently being used by tasks. 
self.used_map = {} + def _increase_ref_count(self, data): + with self.ref_count_lock: + assert(self.data_map[data.ID]["ref_count"] >= 0) + self.data_map[data.ID]["ref_count"] += 1 - def _start_prefetch_data(self, data): - - data.add_prefetch(self.device) - data.add_active(self.device) + def _decrease_ref_count(self, data): + with self.ref_count_lock: + self.data_map[data.ID]["ref_count"] -= 1 + assert(self.data_map[data.ID]["ref_count"] >= 0) - if data in self.data_map: + def _start_prefetch_data(self, data): + if data.ID in self.data_map: #This is a prefetch of a data object that is already on the device (or is being prefetched). #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. #Remove it from the evictable list. @@ -323,21 +221,23 @@ def _start_prefetch_data(self, data): # TODO(hc): but if a data movement task will be executed after a very long time, that also can be evictable. # if memory is full and any task cannot proceed, we can still evict one of data that was prefetched. # but this is very rare case and I am gonna leave it as the future work. - success = self.data_list.remove(data) + success = self.zr_data_list.remove(data) if success: #This is the first prefetch of a data object that is already on the device. #Update the evictable memory size (as this data object is no longer evictable). self.evictable_memory -= data.size + self._increase_ref_count(data) else: + self.data_map[data.ID] = {"state" : "Reserved", "ref_count" : 1} #This is a new block, update the used memory size. self.used_memory += data.size - self.prefetch_map[data] = data self.active_map[data] = data assert(self.used_memory <= self.memory_limit) + ''' def _stop_prefetch_data(self, data): count = data.get_prefetch(self.device) @@ -403,7 +303,9 @@ def _evict(self): node = self.data_list.head self._evict_data(node) + ''' +''' class LFUManager(EvictionManager): """ Eviction Manager for a LFU (Least Frequently Used) policy. @@ -582,3 +484,132 @@ def _evict(self): # Get the oldest data object and remove it node = self._get_evict_target() self._evict_data(node) + +class EvictionManager: + """ + Track usage of data objects on devices. Used to chose which blocks to evict. + """ + + def __init__(self, device, memory_limit): + self.device = device + + #values in bytes + self.memory_limit = memory_limit + self.used_memory = 0 + self.evictable_memory = 0 + + self.lock = threading.Condition(threading.Lock()) + + def map_data(self, data): + """ + Called when a data object is mapped to a device. + """ + with self.lock: + self._map_data(data) + + def _map_data(self, data): + """ + Called when a data object is mapped to a device. + """ + pass + + def _unmap_data(self, data): + pass + + def unmap_data(self, data): + """ + Called when a data object is unmapped from a device. + """ + with self.lock: + self._unmap_data(self, data) + + def _start_prefetch_data(self, data): + pass + + def start_prefetch_data(self, data): + """ + Called when a data object starts a prefetch. + Updates the used memory size. + + Can update the priority of the data object. + """ + with self.lock: + self._start_prefetch_data(data) + + def _stop_prefetch_data(self, data): + """ + Called when a data object is no longer being prefetched. + + Updates the priority of the data object. + """ + # TODO(hc): what does it mean? + pass + + def stop_prefetch_data(self, data): + """ + Called when a data object is no longer being prefetched. + + Updates the priority of the data object. 
+ """ + with self.lock: + self._stop_prefetch_data(data) + + def _access_data(self, data): + pass + + + def access_data(self, data): + """ + Called when a data object is accessed. + + Can update the priority of the data object. + Locks the data object (cannot be evicted while in use) + Updates the evictable memory size. + """ + with self.lock: + self._access_data(data) + + + def _release_data(self, data): + pass + + def release_data(self, data): + """ + Called when a data object is no longer in use. + + Updates the priority of the data object. + Unlocks the data object (can be evicted) + Updates the evictable memory size. + """ + with self.lock: + self._release_data(data) + + def _evict_data(self, data): + pass + + def evict_data(self, data): + """ + Called to evict a specific data object. + Updates the used memory size and evictable memory size. + """ + with self.lock: + self._evict_data(data) + + def _evict(self): + """ + Called when memory is needed. + + Evicts the data object with the highest priority (based on the policy). + """ + pass + + + def evict(self): + """ + Called when memory is needed. + + Evicts the data object with the highest priority (based on the policy). + """ + with self.lock: + self._evict() +''' From c6dc1a0b588ded8d859c0bb05941e7f92b028147 Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Fri, 6 Jan 2023 00:13:13 -0600 Subject: [PATCH 03/16] Add prefetching end function --- parla/tracking.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/parla/tracking.py b/parla/tracking.py index 4dc59af..3840aab 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -170,6 +170,7 @@ class DataMapType(TypedDict): """ Track information of data instance in a device """ + # TODO(hc): state should be an enum type. state: str ref_count: int @@ -229,7 +230,7 @@ def _start_prefetch_data(self, data): self.evictable_memory -= data.size self._increase_ref_count(data) else: - self.data_map[data.ID] = {"state" : "Reserved", "ref_count" : 1} + self.data_map[data.ID] = {"state" : "Prefetching", "ref_count" : 1} #This is a new block, update the used memory size. self.used_memory += data.size self.prefetch_map[data] = data @@ -237,15 +238,10 @@ def _start_prefetch_data(self, data): assert(self.used_memory <= self.memory_limit) - ''' def _stop_prefetch_data(self, data): + self.data_map[data.ID]["state"] = "Reserved" - count = data.get_prefetch(self.device) - data.remove_prefetch(self.device) - - if count == 1: - del self.prefetch_map[data] - + ''' def _access_data(self, data): #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. From ca9cecf63f7d8efbf02e55822b49f885eda9d1f9 Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Fri, 6 Jan 2023 11:57:51 -0600 Subject: [PATCH 04/16] Implement a baseline of the LRU policy --- parla/tracking.py | 78 ++++++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/parla/tracking.py b/parla/tracking.py index 3840aab..b6da9ca 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -47,18 +47,35 @@ from typing import TypedDict, Dict +# TODO(hc): It should be declared on PArray. class DataNode: """ A node containing a data object (PArray) on a specific device. 
Used in the linked lists in the EvictionManager """ def __init__(self, data, device, priority=0): - self.data = data - self.device = device - self.priority = priority + self._data = data + self._device = device + self._priority = priority - self.next = None - self.prev = None + self._next = None + self._prev = None + + @property + def data(self): + return self._data + + @property + def device(self): + return self._device + + @property + def next(self): + return self._next + + @property + def prev(self): + return self._prev def __str__(self): return self.__repr__() @@ -222,7 +239,9 @@ def _start_prefetch_data(self, data): # TODO(hc): but if a data movement task will be executed after a very long time, that also can be evictable. # if memory is full and any task cannot proceed, we can still evict one of data that was prefetched. # but this is very rare case and I am gonna leave it as the future work. - success = self.zr_data_list.remove(data) + + # TODO(hc): PArray should point to a corresponding data node. + success = self.zr_data_list.remove(data.ref_list_node) if success: #This is the first prefetch of a data object that is already on the device. @@ -233,17 +252,14 @@ def _start_prefetch_data(self, data): self.data_map[data.ID] = {"state" : "Prefetching", "ref_count" : 1} #This is a new block, update the used memory size. self.used_memory += data.size - self.prefetch_map[data] = data - self.active_map[data] = data - + #self.prefetch_map[data] = data + #self.active_map[data] = data assert(self.used_memory <= self.memory_limit) def _stop_prefetch_data(self, data): self.data_map[data.ID]["state"] = "Reserved" - ''' - def _access_data(self, data): - + def _acquire_data(self, data): #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. # Any logic here would be a sanity check. I'm removing it for now. @@ -256,40 +272,39 @@ def _access_data(self, data): data.add_use(self.device) def _release_data(self, data): - node = self.data_map[data] + assert(data.ID in self.data_map) - active_count = data.get_active(self.device) - use_count = data.get_use(self.device) + self._decrease_ref_count(data) + assert(self.data_map[data.ID].ref_count >= 0) - data.remove_active(self.device) - data.remove_use(self.device) + #active_count = data.get_active(self.device) + #use_count = data.get_use(self.device) - if active_count == 1: - del self.active_map[data] + #data.remove_active(self.device) + #data.remove_use(self.device) + if self.data_map[data.ID].ref_count == 0: + #del self.active_map[data] #If the data object is no longer needed by any already prefetched tasks, it can be evicted. + node = data.ref_list_node self.data_list.append(node) self.evictable_memory += data.nbytes - if use_count == 1: - del self.used_map[data] + #if use_count == 1: + #del self.used_map[data] def _evict_data(self, data): - node = self.data_map[data] - - assert(data.get_use(self.device) == 0) - assert(data.get_active(self.device) == 0) + assert(self.data_map[data.ID].ref_count == 0) #Call internal data object evict method #This should: # - Backup the data if its not in a SHARED state # (SHARED state means the data has a valid copy on multiple devices. 
Eviction should never destroy the only remaining copy) # - Mark the data for deletion (this may be done by the CuPy/Python GC) - data.evict(self.device) - - self.data_list.remove(node) - del self.data_map[data] + data.evict() + self.zr_data_list.remove(data.ref_list_node) + del self.data_map[data.ID] self.used_memory -= data.nbytes self.evictable_memory -= data.nbytes @@ -297,9 +312,8 @@ def _evict(self): # Get the oldest data object # Because we append after use this is at the front of the list - node = self.data_list.head - self._evict_data(node) - ''' + node = self.zr_data_list.head + self._evict_data(node.data) ''' class LFUManager(EvictionManager): From f03240b03c441f0f11857e7364a0e267f4446137 Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Fri, 6 Jan 2023 15:12:01 -0600 Subject: [PATCH 05/16] Add device information to the data node of the zero-referenced list --- parla/tracking.py | 131 +++++++++++++++++++++++++++++++--------------- 1 file changed, 88 insertions(+), 43 deletions(-) diff --git a/parla/tracking.py b/parla/tracking.py index b6da9ca..4d3a908 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -104,6 +104,7 @@ def __repr__(self): return f"ListNode({self.list})" +# TODO(hc): This list should be protected by a lock. class DLList: """ A doubly linked list used in the EvictionManager. @@ -190,6 +191,7 @@ class DataMapType(TypedDict): # TODO(hc): state should be an enum type. state: str ref_count: int + ref_list_node: DataNode #class LRUManager(EvictionManager): @@ -206,9 +208,11 @@ def __init__(self, device, memory_limit): # A list containig zero-reference data objects in a specified device. self.zr_data_list = DLList() # A dictionary containing all data information on a device. - self.data_map = Dict[DataMapType] + self.data_dict = Dict[DataMapType] # A lock for guarding a reference count. - self.ref_count_lock = threading.Condition(threading.Lock()) + self._ref_count_lock = threading.Condition(threading.Lock()) + # A lock for guarding a data state. + self._data_state_lock = threading.Condition(threading.Lock()) #Note(wlr): These tracking dictionaries are optional, I just think it's interesting to track. #Holds data objects on this device that are being prefetched. @@ -220,18 +224,52 @@ def __init__(self, device, memory_limit): #holds data objects that are currently being used by tasks. 
self.used_map = {} - def _increase_ref_count(self, data): - with self.ref_count_lock: - assert(self.data_map[data.ID]["ref_count"] >= 0) - self.data_map[data.ID]["ref_count"] += 1 - - def _decrease_ref_count(self, data): - with self.ref_count_lock: - self.data_map[data.ID]["ref_count"] -= 1 - assert(self.data_map[data.ID]["ref_count"] >= 0) - - def _start_prefetch_data(self, data): - if data.ID in self.data_map: + def _increase_ref_count(self, data_info): + with self._ref_count_lock: + assert(data_info["ref_count"] >= 0) + data_info["ref_count"] += 1 + + def _decrease_ref_count(self, data_info): + with self._ref_count_lock: + data_info["ref_count"] -= 1 + assert(data_info["ref_count"] >= 0) + + def _check_ref_count_zero(self, data_info): + with self._ref_count_lock: + return data_info["ref_count"] == 0 + + def _update_data_state(self, data_id, new_state): + data_state = self.data_dict[data_id]["state"] + # prefetching, reserved, using, free + with self._data_state_lock: + if data_state == new_state: + return + if new_state == "prefetching": + if data_state == "free": + data_state = new_state + return + elif new_state == "reserved": + if data_state == "prefetching": + data_state = new_state + return + elif new_state == "using": + if data_state == "reserved": + data_state = new_state + return + elif new_state == "free": + if data_state != "prefetching": + assert(data_state != reserved) + data_state = new_state + return + + def _dict_id(self, data, dev): + """ Genereate an ID of a data on a data information dictionary. """ + dev_index = "G" + str(dev.index) if (dev.architecture is not cpu) else "C" + return str(data.ID + "." + dev_index) + + def _start_prefetch_data(self, data, dev): + data_id = self._dict_id(data, dev) + if data_id in self.data_dict: #This is a prefetch of a data object that is already on the device (or is being prefetched). #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. #Remove it from the evictable list. @@ -241,41 +279,48 @@ def _start_prefetch_data(self, data): # but this is very rare case and I am gonna leave it as the future work. # TODO(hc): PArray should point to a corresponding data node. - success = self.zr_data_list.remove(data.ref_list_node) + data_info = self.data_dict[data_id] + success = self.zr_data_list.remove(data_info.ref_list_node) if success: #This is the first prefetch of a data object that is already on the device. #Update the evictable memory size (as this data object is no longer evictable). self.evictable_memory -= data.size - self._increase_ref_count(data) + self._increase_ref_count(data_info) else: - self.data_map[data.ID] = {"state" : "Prefetching", "ref_count" : 1} + self.data_dict[data_id] = { "state" : "prefetching", \ + "ref_count" : 1, \ + "ref_list_node" : DataNode(data, dev) } #This is a new block, update the used memory size. self.used_memory += data.size #self.prefetch_map[data] = data #self.active_map[data] = data assert(self.used_memory <= self.memory_limit) - def _stop_prefetch_data(self, data): - self.data_map[data.ID]["state"] = "Reserved" + def _stop_prefetch_data(self, data, dev): + data_id = self._dict_id(data, dev) + assert(data_id in self.data_dict) + self._update_data_state(data_id, "reserved") - def _acquire_data(self, data): + def _acquire_data(self, data, dev): #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. # Any logic here would be a sanity check. I'm removing it for now. 
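+        # The reference count is already tracked by the prefetch path
+        # (_start_prefetch_data), so acquiring a block only needs to mark it
+        # as in use here.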
- - #node = self.data_map.get(data, None) + #node = self.data_dict.get(data, None) #if node is not None: # #Remove the node from the eviction list while it's in use # success = self.data_list.remove(node) - - self.used_map[data] = data - data.add_use(self.device) - - def _release_data(self, data): - assert(data.ID in self.data_map) - - self._decrease_ref_count(data) - assert(self.data_map[data.ID].ref_count >= 0) + #self.used_map[data] = data + #data.add_use(self.device) + data_id = self._dict_id(data, dev) + assert(data_id in self.data_dict) + self._update_data_state(data_id, "using") + + def _release_data(self, data, dev): + data_id = self._dict_id(data, dev) + assert(data_id in self.data_dict) + data_info = self.data_dict[data_id] + self._decrease_ref_count(data_info) + assert(self._check_ref_count_zero(data_info)) #active_count = data.get_active(self.device) #use_count = data.get_use(self.device) @@ -283,37 +328,37 @@ def _release_data(self, data): #data.remove_active(self.device) #data.remove_use(self.device) - if self.data_map[data.ID].ref_count == 0: + if data_info["ref_count"] == 0: #del self.active_map[data] #If the data object is no longer needed by any already prefetched tasks, it can be evicted. - node = data.ref_list_node + node = data_info["ref_list_node"] self.data_list.append(node) self.evictable_memory += data.nbytes - #if use_count == 1: #del self.used_map[data] - def _evict_data(self, data): - assert(self.data_map[data.ID].ref_count == 0) - + def _evict_data(self, target_data, target_dev): + data_id = self._dict_id(target_data, target_dev) + data_info = self.data_dict[data_id] + assert(self._check_ref_count_zero(data_info)) #Call internal data object evict method #This should: # - Backup the data if its not in a SHARED state # (SHARED state means the data has a valid copy on multiple devices. 
Eviction should never destroy the only remaining copy) # - Mark the data for deletion (this may be done by the CuPy/Python GC) - data.evict() - - self.zr_data_list.remove(data.ref_list_node) - del self.data_map[data.ID] + target_data.evict(target_dev) + self.zr_data_list.remove(data_info["ref_list_node"]) + del data_info self.used_memory -= data.nbytes self.evictable_memory -= data.nbytes def _evict(self): # Get the oldest data object # Because we append after use this is at the front of the list - node = self.zr_data_list.head - self._evict_data(node.data) + n_data = node.data + n_dev = node.dev + self._evict_data(n_data, n_dev) ''' class LFUManager(EvictionManager): From 187b78a05f8b551a63abf7a010070460f4e8af4f Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Sat, 7 Jan 2023 01:07:55 -0600 Subject: [PATCH 06/16] Add log printing and debug codes --- parla/task_runtime.py | 32 +++++++++++++++++- parla/tracking.py | 77 +++++++++++++++++++++++++++++-------------- 2 files changed, 83 insertions(+), 26 deletions(-) diff --git a/parla/task_runtime.py b/parla/task_runtime.py index 2c902ef..efb740d 100644 --- a/parla/task_runtime.py +++ b/parla/task_runtime.py @@ -17,6 +17,8 @@ from parla.cpu_impl import cpu from parla.dataflow import Dataflow +from parla.tracking import LRUManager + # Logger configuration (uncomment and adjust level if needed) #logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) @@ -611,8 +613,27 @@ def _handle_dependency_spawn(self, dependency: "Task"): if self.num_unspawned_dependencies == 0: self._ready_to_map() + def acquire_parray(self): + ctx = get_scheduler_context() + for parray in (self.dataflow.input + \ + self.dataflow.inout + \ + self.dataflow.output): + for d in self.req.devices: + ctx.scheduler.lrum._acquire_data(parray, d) + + def release_parray(self): + ctx = get_scheduler_context() + for parray in (self.dataflow.input + \ + self.dataflow.inout + \ + self.dataflow.output): + for d in self.req.devices: + ctx.scheduler.lrum._release_data(parray, d) + def _execute_task(self): - return self._state.func(self, *self._state.args) + self.acquire_parray() + result = self._state.func(self, *self._state.args) + self.release_parray() + return result def cleanup(self): self._func = None @@ -673,7 +694,10 @@ def _execute_task(self): dev_no = -1 if (dev_type.architecture is not cpu): dev_no = dev_type.index + ctx = get_scheduler_context() + ctx.scheduler.lrum._start_prefetch_data(self._target_data, dev_type) self._target_data._auto_move(device_id=dev_no, do_write=write_flag) + ctx.scheduler.lrum._stop_prefetch_data(self._target_data, dev_type) return TaskCompleted(None) def cleanup(self): @@ -1479,6 +1503,8 @@ def __init__(self, environments: Collection[TaskEnvironment], n_threads: Optiona self._device_launched_datamove_task_counts = { dev: 0 for dev in self._available_resources.get_resources()} + self._lrum = LRUManager() + # Dictionary mapping data block to task lists. 
self._datablock_dict = defaultdict(list) @@ -1506,6 +1532,10 @@ def components(self) -> List[EnvironmentComponentInstance]: def scheduler(self): return self + @ property + def lrum(self): + return self._lrum + def __enter__(self): if self._active_task_count != 1: raise InvalidSchedulerAccessException( diff --git a/parla/tracking.py b/parla/tracking.py index 4d3a908..38046a3 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -46,6 +46,7 @@ # - wrap in locks (use external locks on base class) from typing import TypedDict, Dict +from parla.cpu_impl import cpu # TODO(hc): It should be declared on PArray. class DataNode: @@ -58,8 +59,8 @@ def __init__(self, data, device, priority=0): self._device = device self._priority = priority - self._next = None - self._prev = None + self.next = None + self.prev = None @property def data(self): @@ -69,14 +70,6 @@ def data(self): def device(self): return self._device - @property - def next(self): - return self._next - - @property - def prev(self): - return self._prev - def __str__(self): return self.__repr__() @@ -112,6 +105,8 @@ class DLList: def __init__(self): self.head = None self.tail = None + self.next = None + self.prev = None self.length = 0 def __str__(self): @@ -183,6 +178,15 @@ def insert_after(self, node, new_node): def __len__(self): return self.length + def __repr__(self): + repr_str = ":\n" + tmp_node = self.head + while (tmp_node != None): + repr_str += str(id(tmp_node)) + " -> " + tmp_node = tmp_node.next + repr_str += "\n" + return repr_str + class DataMapType(TypedDict): """ @@ -203,12 +207,12 @@ class LRUManager: and the tail of the list is the data used most recently. """ - def __init__(self, device, memory_limit): - super().__init__(device, memory_limit) + def __init__(self, memory_limit = 999999): +#super().__init__(device, memory_limit) # A list containig zero-reference data objects in a specified device. self.zr_data_list = DLList() # A dictionary containing all data information on a device. - self.data_dict = Dict[DataMapType] + self.data_dict: Dict[str, DataMapType] = {} # A lock for guarding a reference count. self._ref_count_lock = threading.Condition(threading.Lock()) # A lock for guarding a data state. @@ -236,12 +240,15 @@ def _decrease_ref_count(self, data_info): def _check_ref_count_zero(self, data_info): with self._ref_count_lock: + print("Check:", data_info["ref_count"], flush=True) return data_info["ref_count"] == 0 def _update_data_state(self, data_id, new_state): data_state = self.data_dict[data_id]["state"] # prefetching, reserved, using, free with self._data_state_lock: + print(f"[GC] Data (ID: {data_id})'s state is updated from "+ + f"{data_state} to {new_state}", flush=True) if data_state == new_state: return if new_state == "prefetching": @@ -265,7 +272,7 @@ def _update_data_state(self, data_id, new_state): def _dict_id(self, data, dev): """ Genereate an ID of a data on a data information dictionary. """ dev_index = "G" + str(dev.index) if (dev.architecture is not cpu) else "C" - return str(data.ID + "." + dev_index) + return str(data.ID) + "." + dev_index def _start_prefetch_data(self, data, dev): data_id = self._dict_id(data, dev) @@ -280,27 +287,39 @@ def _start_prefetch_data(self, data, dev): # TODO(hc): PArray should point to a corresponding data node. 
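+            # Caching the DLList node in "ref_list_node" is what makes removal
+            # from the zero-reference list O(1); no search over the list is needed.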
data_info = self.data_dict[data_id] - success = self.zr_data_list.remove(data_info.ref_list_node) + success = self.zr_data_list.remove(data_info["ref_list_node"]) + self._update_data_state(data_id, "prefetching") - if success: + #if success: #This is the first prefetch of a data object that is already on the device. #Update the evictable memory size (as this data object is no longer evictable). - self.evictable_memory -= data.size + #self.evictable_memory -= data.size self._increase_ref_count(data_info) + print(f"[GC] Existing data (ID: {data_id}) is updated through prefetching"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) else: self.data_dict[data_id] = { "state" : "prefetching", \ "ref_count" : 1, \ "ref_list_node" : DataNode(data, dev) } + print(f"[GC] New data (ID: {data_id}) is added through prefetching"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + print(f"[GC] Zero-referenced list after prefetching data: \n{self.zr_data_list}", flush=True) #This is a new block, update the used memory size. - self.used_memory += data.size + #self.used_memory += data.size #self.prefetch_map[data] = data #self.active_map[data] = data - assert(self.used_memory <= self.memory_limit) + #assert(self.used_memory <= self.memory_limit) def _stop_prefetch_data(self, data, dev): data_id = self._dict_id(data, dev) assert(data_id in self.data_dict) self._update_data_state(data_id, "reserved") + print(f"[GC] Existing data (ID: {data_id}) is updated as prefetching completes"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + def _acquire_data(self, data, dev): #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. @@ -314,13 +333,16 @@ def _acquire_data(self, data, dev): data_id = self._dict_id(data, dev) assert(data_id in self.data_dict) self._update_data_state(data_id, "using") + print(f"[GC] Existing data (ID: {data_id}) is updated as a compute task acquires"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + def _release_data(self, data, dev): data_id = self._dict_id(data, dev) assert(data_id in self.data_dict) data_info = self.data_dict[data_id] self._decrease_ref_count(data_info) - assert(self._check_ref_count_zero(data_info)) #active_count = data.get_active(self.device) #use_count = data.get_use(self.device) @@ -329,14 +351,19 @@ def _release_data(self, data, dev): #data.remove_use(self.device) if data_info["ref_count"] == 0: + assert(self._check_ref_count_zero(data_info)) #del self.active_map[data] #If the data object is no longer needed by any already prefetched tasks, it can be evicted. node = data_info["ref_list_node"] - self.data_list.append(node) - self.evictable_memory += data.nbytes + self.zr_data_list.append(node) + #self.evictable_memory += data.nbytes #if use_count == 1: #del self.used_map[data] - + print(f"[GC] Existing data (ID: {data_id}) is updated as a compute task releases"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. 
node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + print(f"[GC] Zero-referenced list after releasing data: \n{self.zr_data_list}", flush=True) + def _evict_data(self, target_data, target_dev): data_id = self._dict_id(target_data, target_dev) data_info = self.data_dict[data_id] @@ -349,8 +376,8 @@ def _evict_data(self, target_data, target_dev): target_data.evict(target_dev) self.zr_data_list.remove(data_info["ref_list_node"]) del data_info - self.used_memory -= data.nbytes - self.evictable_memory -= data.nbytes + #self.used_memory -= data.nbytes + #self.evictable_memory -= data.nbytes def _evict(self): # Get the oldest data object From ff623e778d0b1d929952956a89992990458d98ab Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Sat, 7 Jan 2023 01:13:37 -0600 Subject: [PATCH 07/16] Add a list lock that guards modifying list --- parla/tracking.py | 108 ++++++++++++++++++++++++---------------------- 1 file changed, 57 insertions(+), 51 deletions(-) diff --git a/parla/tracking.py b/parla/tracking.py index 38046a3..79b03c4 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -108,6 +108,7 @@ def __init__(self): self.next = None self.prev = None self.length = 0 + self._list_lock = threading.Condition(threading.Lock()) def __str__(self): return self.__repr__() @@ -116,76 +117,81 @@ def __repr__(self): return f"DLList({self.head}, {self.tail})" def append(self, node): - if self.head is None: - self.head = node - self.tail = node - else: - self.tail.next = node - node.prev = self.tail - self.tail = node - - self.length += 1 + with self._list_lock: + if self.head is None: + self.head = node + self.tail = node + else: + self.tail.next = node + node.prev = self.tail + self.tail = node + self.length += 1 def remove(self, node): - edit = False + with self._list_lock: + edit = False - if self.head == node: - self.head = node.next - edit = True + if self.head == node: + self.head = node.next + edit = True - if self.tail == node: - self.tail = node.prev - edit = True + if self.tail == node: + self.tail = node.prev + edit = True - if node.prev is not None: - node.prev.next = node.next - edit = True + if node.prev is not None: + node.prev.next = node.next + edit = True - if node.next is not None: - node.next.prev = node.prev - edit = True + if node.next is not None: + node.next.prev = node.prev + edit = True - node.prev = None - node.next = None + node.prev = None + node.next = None - if edit: - self.length -= 1 + if edit: + self.length -= 1 - return edit + return edit def insert_before(self, node, new_node): - if node.prev is not None: - node.prev.next = new_node - new_node.prev = node.prev - else: - self.head = new_node - node.prev = new_node - new_node.next = node + with self._list_lock: + if node.prev is not None: + node.prev.next = new_node + new_node.prev = node.prev + else: + self.head = new_node + node.prev = new_node + new_node.next = node - self.length += 1 + self.length += 1 def insert_after(self, node, new_node): - if node.next is not None: - node.next.prev = new_node - new_node.next = node.next - else: - self.tail = new_node - node.next = new_node - new_node.prev = node + with self._list_lock: + if node.next is not None: + node.next.prev = new_node + new_node.next = node.next + else: + self.tail = new_node + node.next = new_node + new_node.prev = node - self.length += 1 + self.length += 1 def __len__(self): - return self.length + with self._list_lock: + return self.length def __repr__(self): - repr_str = ":\n" - tmp_node = self.head - while (tmp_node != None): - repr_str += 
str(id(tmp_node)) + " -> " - tmp_node = tmp_node.next - repr_str += "\n" - return repr_str + with self._list_lock: + repr_str = ":\n" + tmp_node = self.head + while (tmp_node != None): + repr_str += str(id(tmp_node)) + " -> " + tmp_node = tmp_node.next + repr_str += "\n" + return repr_str class DataMapType(TypedDict): From 1058d8a1db891a7f2bf8bf02f36bff315133e5d3 Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Sat, 7 Jan 2023 01:34:59 -0600 Subject: [PATCH 08/16] Add enum for data states --- parla/tracking.py | 44 ++++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/parla/tracking.py b/parla/tracking.py index 79b03c4..abcbef0 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -45,9 +45,11 @@ #Needs: # - wrap in locks (use external locks on base class) +from enum import Enum from typing import TypedDict, Dict from parla.cpu_impl import cpu + # TODO(hc): It should be declared on PArray. class DataNode: """ @@ -194,12 +196,22 @@ def __repr__(self): return repr_str +class GCDataState(Enum): + """ + Enum of data states. + """ + PREFETCHING = "Prefetching" # Data is being prefetched. + RESERVED = "Reserved" # Data's ref. count is >= 1, but not acquired. + ACQUIRED = "Acquired" # A task is using data. + FREE = "Free" # None of tasks (mapped/running) does not need data. + + class DataMapType(TypedDict): """ Track information of data instance in a device """ # TODO(hc): state should be an enum type. - state: str + state: GCDataState ref_count: int ref_list_node: DataNode @@ -257,22 +269,22 @@ def _update_data_state(self, data_id, new_state): f"{data_state} to {new_state}", flush=True) if data_state == new_state: return - if new_state == "prefetching": - if data_state == "free": + if new_state == GCDataState.PREFETCHING: + if data_state == GCDataState.FREE: data_state = new_state return - elif new_state == "reserved": - if data_state == "prefetching": + elif new_state == GCDataState.RESERVED: + if data_state == GCDataState.PREFETCHING or \ + data_state == GCDataState.FREE: data_state = new_state return - elif new_state == "using": - if data_state == "reserved": - data_state = new_state + elif new_state == GCDataState.ACQUIRED: + assert(data_state == GCDataState.ACQUIRED) + data_state = new_state return - elif new_state == "free": - if data_state != "prefetching": - assert(data_state != reserved) - data_state = new_state + elif new_state == GCDataState.FREE: + assert(data_state == GCDataState.ACQUIRED) + data_state = new_state return def _dict_id(self, data, dev): @@ -294,7 +306,7 @@ def _start_prefetch_data(self, data, dev): # TODO(hc): PArray should point to a corresponding data node. data_info = self.data_dict[data_id] success = self.zr_data_list.remove(data_info["ref_list_node"]) - self._update_data_state(data_id, "prefetching") + self._update_data_state(data_id, GCDataState.PREFETCHING) #if success: #This is the first prefetch of a data object that is already on the device. @@ -305,7 +317,7 @@ def _start_prefetch_data(self, data, dev): f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ f"Ref. 
node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) else: - self.data_dict[data_id] = { "state" : "prefetching", \ + self.data_dict[data_id] = { "state" : GCDataState.PREFETCHING, \ "ref_count" : 1, \ "ref_list_node" : DataNode(data, dev) } print(f"[GC] New data (ID: {data_id}) is added through prefetching"+ @@ -321,7 +333,7 @@ def _start_prefetch_data(self, data, dev): def _stop_prefetch_data(self, data, dev): data_id = self._dict_id(data, dev) assert(data_id in self.data_dict) - self._update_data_state(data_id, "reserved") + self._update_data_state(data_id, GCDataState.RESERVED) print(f"[GC] Existing data (ID: {data_id}) is updated as prefetching completes"+ f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) @@ -338,7 +350,7 @@ def _acquire_data(self, data, dev): #data.add_use(self.device) data_id = self._dict_id(data, dev) assert(data_id in self.data_dict) - self._update_data_state(data_id, "using") + self._update_data_state(data_id, GCDataState.ACQUIRED) print(f"[GC] Existing data (ID: {data_id}) is updated as a compute task acquires"+ f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) From 849f2a49aa287c6335f69309310ccd57cf7194ee Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Sat, 7 Jan 2023 13:11:50 -0600 Subject: [PATCH 09/16] debug --- parla/task_runtime.py | 8 +- parla/tracking.py | 230 ++++++++++++++++++++++-------------------- 2 files changed, 122 insertions(+), 116 deletions(-) diff --git a/parla/task_runtime.py b/parla/task_runtime.py index efb740d..4a35435 100644 --- a/parla/task_runtime.py +++ b/parla/task_runtime.py @@ -619,7 +619,7 @@ def acquire_parray(self): self.dataflow.inout + \ self.dataflow.output): for d in self.req.devices: - ctx.scheduler.lrum._acquire_data(parray, d) + ctx.scheduler.lrum._acquire_data(parray, d, str(self.taskid)) def release_parray(self): ctx = get_scheduler_context() @@ -627,7 +627,7 @@ def release_parray(self): self.dataflow.inout + \ self.dataflow.output): for d in self.req.devices: - ctx.scheduler.lrum._release_data(parray, d) + ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) def _execute_task(self): self.acquire_parray() @@ -695,9 +695,9 @@ def _execute_task(self): if (dev_type.architecture is not cpu): dev_no = dev_type.index ctx = get_scheduler_context() - ctx.scheduler.lrum._start_prefetch_data(self._target_data, dev_type) + ctx.scheduler.lrum._start_prefetch_data(self._target_data, dev_type, str(self.taskid)) self._target_data._auto_move(device_id=dev_no, do_write=write_flag) - ctx.scheduler.lrum._stop_prefetch_data(self._target_data, dev_type) + ctx.scheduler.lrum._stop_prefetch_data(self._target_data, dev_type, str(self.taskid)) return TaskCompleted(None) def cleanup(self): diff --git a/parla/tracking.py b/parla/tracking.py index abcbef0..da1b650 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -247,99 +247,100 @@ def __init__(self, memory_limit = 999999): self.used_map = {} def _increase_ref_count(self, data_info): - with self._ref_count_lock: - assert(data_info["ref_count"] >= 0) - data_info["ref_count"] += 1 + assert(data_info["ref_count"] >= 0) + data_info["ref_count"] += 1 def _decrease_ref_count(self, data_info): - with self._ref_count_lock: - data_info["ref_count"] -= 1 - assert(data_info["ref_count"] >= 0) + data_info["ref_count"] -= 1 + assert(data_info["ref_count"] >= 0) def _check_ref_count_zero(self, data_info): 
- with self._ref_count_lock: - print("Check:", data_info["ref_count"], flush=True) - return data_info["ref_count"] == 0 + print("Check:", data_info["ref_count"], flush=True) + return data_info["ref_count"] == 0 - def _update_data_state(self, data_id, new_state): - data_state = self.data_dict[data_id]["state"] + def _update_data_state(self, data_id, new_state, taskid): # prefetching, reserved, using, free - with self._data_state_lock: - print(f"[GC] Data (ID: {data_id})'s state is updated from "+ - f"{data_state} to {new_state}", flush=True) - if data_state == new_state: - return - if new_state == GCDataState.PREFETCHING: - if data_state == GCDataState.FREE: - data_state = new_state - return - elif new_state == GCDataState.RESERVED: - if data_state == GCDataState.PREFETCHING or \ - data_state == GCDataState.FREE: - data_state = new_state - return - elif new_state == GCDataState.ACQUIRED: - assert(data_state == GCDataState.ACQUIRED) - data_state = new_state - return - elif new_state == GCDataState.FREE: - assert(data_state == GCDataState.ACQUIRED) - data_state = new_state - return + data_info = self.data_dict[data_id] + data_state = data_info["state"] + print(f"[GC] (Task: {taskid}) Data (ID: {data_id})'s state is updated from "+ + f"{data_state} to {new_state}", flush=True) + if data_state == new_state: + return + if new_state == GCDataState.PREFETCHING: + if data_state == GCDataState.FREE: + data_info["state"] = new_state + return + elif new_state == GCDataState.RESERVED: + if data_state == GCDataState.PREFETCHING or \ + data_state == GCDataState.FREE: + data_info["state"] = new_state + return + elif new_state == GCDataState.ACQUIRED: + print(">>>>> ", data_state, flush=True) + assert(data_state == GCDataState.RESERVED) + data_info["state"] = new_state + return + elif new_state == GCDataState.FREE: + assert(data_state == GCDataState.ACQUIRED) + data_info["state"] = new_state + return def _dict_id(self, data, dev): """ Genereate an ID of a data on a data information dictionary. """ dev_index = "G" + str(dev.index) if (dev.architecture is not cpu) else "C" return str(data.ID) + "." + dev_index - def _start_prefetch_data(self, data, dev): + def _start_prefetch_data(self, data, dev, taskid = ""): data_id = self._dict_id(data, dev) - if data_id in self.data_dict: - #This is a prefetch of a data object that is already on the device (or is being prefetched). - #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. - #Remove it from the evictable list. - - # TODO(hc): but if a data movement task will be executed after a very long time, that also can be evictable. - # if memory is full and any task cannot proceed, we can still evict one of data that was prefetched. - # but this is very rare case and I am gonna leave it as the future work. - - # TODO(hc): PArray should point to a corresponding data node. - data_info = self.data_dict[data_id] - success = self.zr_data_list.remove(data_info["ref_list_node"]) - self._update_data_state(data_id, GCDataState.PREFETCHING) - - #if success: - #This is the first prefetch of a data object that is already on the device. - #Update the evictable memory size (as this data object is no longer evictable). - #self.evictable_memory -= data.size - self._increase_ref_count(data_info) - print(f"[GC] Existing data (ID: {data_id}) is updated through prefetching"+ - f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. 
node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - else: - self.data_dict[data_id] = { "state" : GCDataState.PREFETCHING, \ - "ref_count" : 1, \ - "ref_list_node" : DataNode(data, dev) } - print(f"[GC] New data (ID: {data_id}) is added through prefetching"+ - f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - print(f"[GC] Zero-referenced list after prefetching data: \n{self.zr_data_list}", flush=True) + with self._data_state_lock: + if data_id in self.data_dict: + #This is a prefetch of a data object that is already on the device (or is being prefetched). + #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. + #Remove it from the evictable list. + + # TODO(hc): but if a data movement task will be executed after a very long time, that also can be evictable. + # if memory is full and any task cannot proceed, we can still evict one of data that was prefetched. + # but this is very rare case and I am gonna leave it as the future work. + + # TODO(hc): PArray should point to a corresponding data node. + data_info = self.data_dict[data_id] + success = self.zr_data_list.remove(data_info["ref_list_node"]) + self._update_data_state(data_id, GCDataState.PREFETCHING, taskid) + + #if success: + #This is the first prefetch of a data object that is already on the device. + #Update the evictable memory size (as this data object is no longer evictable). + #self.evictable_memory -= data.size + self._increase_ref_count(data_info) + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated "+ + f"through prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + else: + self.data_dict[data_id] = { "state" : GCDataState.PREFETCHING, \ + "ref_count" : 1, \ + "ref_list_node" : DataNode(data, dev) } + print(f"[GC] (Task: {taskid}) New data (ID: {data_id}) is added through "+ + f"prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + print(f"[GC] (Task: {taskid}) Zero-referenced list after prefetching data: "+ + f"\n{self.zr_data_list}", flush=True) #This is a new block, update the used memory size. #self.used_memory += data.size #self.prefetch_map[data] = data #self.active_map[data] = data #assert(self.used_memory <= self.memory_limit) - def _stop_prefetch_data(self, data, dev): + def _stop_prefetch_data(self, data, dev, taskid=""): data_id = self._dict_id(data, dev) - assert(data_id in self.data_dict) - self._update_data_state(data_id, GCDataState.RESERVED) - print(f"[GC] Existing data (ID: {data_id}) is updated as prefetching completes"+ - f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + with self._data_state_lock: + assert(data_id in self.data_dict) + self._update_data_state(data_id, GCDataState.RESERVED, taskid) + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ + f"prefetching completes (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - def _acquire_data(self, data, dev): + def _acquire_data(self, data, dev, taskid = ""): #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. 
# Any logic here would be a sanity check. I'm removing it for now. #node = self.data_dict.get(data, None) @@ -349,53 +350,58 @@ def _acquire_data(self, data, dev): #self.used_map[data] = data #data.add_use(self.device) data_id = self._dict_id(data, dev) - assert(data_id in self.data_dict) - self._update_data_state(data_id, GCDataState.ACQUIRED) - print(f"[GC] Existing data (ID: {data_id}) is updated as a compute task acquires"+ - f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + with self._data_state_lock: + assert(data_id in self.data_dict) + self._update_data_state(data_id, GCDataState.ACQUIRED, taskid) + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ + f"a compute task acquires"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - def _release_data(self, data, dev): + def _release_data(self, data, dev, taskid = ""): data_id = self._dict_id(data, dev) - assert(data_id in self.data_dict) - data_info = self.data_dict[data_id] - self._decrease_ref_count(data_info) - - #active_count = data.get_active(self.device) - #use_count = data.get_use(self.device) - - #data.remove_active(self.device) - #data.remove_use(self.device) - - if data_info["ref_count"] == 0: - assert(self._check_ref_count_zero(data_info)) - #del self.active_map[data] - #If the data object is no longer needed by any already prefetched tasks, it can be evicted. - node = data_info["ref_list_node"] - self.zr_data_list.append(node) - #self.evictable_memory += data.nbytes - #if use_count == 1: - #del self.used_map[data] - print(f"[GC] Existing data (ID: {data_id}) is updated as a compute task releases"+ - f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - print(f"[GC] Zero-referenced list after releasing data: \n{self.zr_data_list}", flush=True) + with self._data_state_lock: + assert(data_id in self.data_dict) + data_info = self.data_dict[data_id] + self._decrease_ref_count(data_info) + + #active_count = data.get_active(self.device) + #use_count = data.get_use(self.device) + + #data.remove_active(self.device) + #data.remove_use(self.device) + + if data_info["ref_count"] == 0: + assert(self._check_ref_count_zero(data_info)) + #del self.active_map[data] + #If the data object is no longer needed by any already prefetched tasks, it can be evicted. + node = data_info["ref_list_node"] + self.zr_data_list.append(node) + #self.evictable_memory += data.nbytes + #if use_count == 1: + #del self.used_map[data] + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as a compute "+ + f"task releases (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + print(f"[GC] (Task: {taskid}) Zero-referenced list after releasing data: "+ + f"\n{self.zr_data_list}", flush=True) def _evict_data(self, target_data, target_dev): data_id = self._dict_id(target_data, target_dev) - data_info = self.data_dict[data_id] - assert(self._check_ref_count_zero(data_info)) - #Call internal data object evict method - #This should: - # - Backup the data if its not in a SHARED state - # (SHARED state means the data has a valid copy on multiple devices. 
Eviction should never destroy the only remaining copy) - # - Mark the data for deletion (this may be done by the CuPy/Python GC) - target_data.evict(target_dev) - self.zr_data_list.remove(data_info["ref_list_node"]) - del data_info - #self.used_memory -= data.nbytes - #self.evictable_memory -= data.nbytes + with self._data_state_lock: + data_info = self.data_dict[data_id] + assert(self._check_ref_count_zero(data_info)) + #Call internal data object evict method + #This should: + # - Backup the data if its not in a SHARED state + # (SHARED state means the data has a valid copy on multiple devices. Eviction should never destroy the only remaining copy) + # - Mark the data for deletion (this may be done by the CuPy/Python GC) + target_data.evict(target_dev) + self.zr_data_list.remove(data_info["ref_list_node"]) + del data_info + #self.used_memory -= data.nbytes + #self.evictable_memory -= data.nbytes def _evict(self): # Get the oldest data object From da7dd271664e04b48fbb9d3a7bafbfc8d06364b5 Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Sat, 7 Jan 2023 13:18:16 -0600 Subject: [PATCH 10/16] Use separate locks for guarding data information. --- parla/tracking.py | 223 +++++++++++++++++++++++----------------------- 1 file changed, 111 insertions(+), 112 deletions(-) diff --git a/parla/tracking.py b/parla/tracking.py index da1b650..3269a54 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -231,10 +231,10 @@ def __init__(self, memory_limit = 999999): self.zr_data_list = DLList() # A dictionary containing all data information on a device. self.data_dict: Dict[str, DataMapType] = {} - # A lock for guarding a reference count. - self._ref_count_lock = threading.Condition(threading.Lock()) # A lock for guarding a data state. self._data_state_lock = threading.Condition(threading.Lock()) + # A lock for guarding a reference count. + self._data_ref_count_lock = threading.Condition(threading.Lock()) #Note(wlr): These tracking dictionaries are optional, I just think it's interesting to track. #Holds data objects on this device that are being prefetched. 
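# A minimal, self-contained sketch (not part of the patch) of the bookkeeping that the
# two locks introduced above are meant to guard: `_data_state_lock` protects the per-data
# state field, while `_data_ref_count_lock` protects the reference count that decides
# whether a copy is evictable. `TrackedData` is an illustrative name only; the real
# manager keeps these fields in `data_dict`, keyed by the "<PArray ID>.<device>" string
# produced by `_dict_id`.

import threading

class TrackedData:
    def __init__(self):
        self.state = "FREE"      # one of PREFETCHING / RESERVED / ACQUIRED / FREE
        self.ref_count = 0       # number of prefetching / running tasks still needing it
        self.state_lock = threading.Lock()
        self.ref_count_lock = threading.Lock()

    def retain(self):
        # Called when a prefetch starts; a non-zero count keeps the copy off the
        # zero-referenced (evictable) list.
        with self.ref_count_lock:
            assert self.ref_count >= 0
            self.ref_count += 1

    def release(self):
        # Called when a compute task finishes with the copy; returns True when the
        # copy has become evictable and should be appended to the zero-referenced list.
        with self.ref_count_lock:
            self.ref_count -= 1
            assert self.ref_count >= 0
            return self.ref_count == 0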
@@ -247,43 +247,47 @@ def __init__(self, memory_limit = 999999): self.used_map = {} def _increase_ref_count(self, data_info): - assert(data_info["ref_count"] >= 0) - data_info["ref_count"] += 1 + with self._data_ref_count_lock: + assert(data_info["ref_count"] >= 0) + data_info["ref_count"] += 1 def _decrease_ref_count(self, data_info): - data_info["ref_count"] -= 1 - assert(data_info["ref_count"] >= 0) + with self._data_ref_count_lock: + data_info["ref_count"] -= 1 + assert(data_info["ref_count"] >= 0) def _check_ref_count_zero(self, data_info): - print("Check:", data_info["ref_count"], flush=True) - return data_info["ref_count"] == 0 + with self._data_ref_count_lock: + print("Check:", data_info["ref_count"], flush=True) + return data_info["ref_count"] == 0 def _update_data_state(self, data_id, new_state, taskid): - # prefetching, reserved, using, free - data_info = self.data_dict[data_id] - data_state = data_info["state"] - print(f"[GC] (Task: {taskid}) Data (ID: {data_id})'s state is updated from "+ - f"{data_state} to {new_state}", flush=True) - if data_state == new_state: - return - if new_state == GCDataState.PREFETCHING: - if data_state == GCDataState.FREE: + with self._data_state_lock: + # prefetching, reserved, using, free + data_info = self.data_dict[data_id] + data_state = data_info["state"] + print(f"[GC] (Task: {taskid}) Data (ID: {data_id})'s state is updated from "+ + f"{data_state} to {new_state}", flush=True) + if data_state == new_state: + return + if new_state == GCDataState.PREFETCHING: + if data_state == GCDataState.FREE: + data_info["state"] = new_state + return + elif new_state == GCDataState.RESERVED: + if data_state == GCDataState.PREFETCHING or \ + data_state == GCDataState.FREE: + data_info["state"] = new_state + return + elif new_state == GCDataState.ACQUIRED: + print(">>>>> ", data_state, flush=True) + assert(data_state == GCDataState.RESERVED) data_info["state"] = new_state - return - elif new_state == GCDataState.RESERVED: - if data_state == GCDataState.PREFETCHING or \ - data_state == GCDataState.FREE: + return + elif new_state == GCDataState.FREE: + assert(data_state == GCDataState.ACQUIRED) data_info["state"] = new_state - return - elif new_state == GCDataState.ACQUIRED: - print(">>>>> ", data_state, flush=True) - assert(data_state == GCDataState.RESERVED) - data_info["state"] = new_state - return - elif new_state == GCDataState.FREE: - assert(data_state == GCDataState.ACQUIRED) - data_info["state"] = new_state - return + return def _dict_id(self, data, dev): """ Genereate an ID of a data on a data information dictionary. """ @@ -292,36 +296,35 @@ def _dict_id(self, data, dev): def _start_prefetch_data(self, data, dev, taskid = ""): data_id = self._dict_id(data, dev) - with self._data_state_lock: - if data_id in self.data_dict: - #This is a prefetch of a data object that is already on the device (or is being prefetched). - #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. - #Remove it from the evictable list. - - # TODO(hc): but if a data movement task will be executed after a very long time, that also can be evictable. - # if memory is full and any task cannot proceed, we can still evict one of data that was prefetched. - # but this is very rare case and I am gonna leave it as the future work. - - # TODO(hc): PArray should point to a corresponding data node. 
- data_info = self.data_dict[data_id] - success = self.zr_data_list.remove(data_info["ref_list_node"]) - self._update_data_state(data_id, GCDataState.PREFETCHING, taskid) - - #if success: - #This is the first prefetch of a data object that is already on the device. - #Update the evictable memory size (as this data object is no longer evictable). - #self.evictable_memory -= data.size - self._increase_ref_count(data_info) - print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated "+ - f"through prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - else: - self.data_dict[data_id] = { "state" : GCDataState.PREFETCHING, \ - "ref_count" : 1, \ - "ref_list_node" : DataNode(data, dev) } - print(f"[GC] (Task: {taskid}) New data (ID: {data_id}) is added through "+ - f"prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if data_id in self.data_dict: + #This is a prefetch of a data object that is already on the device (or is being prefetched). + #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. + #Remove it from the evictable list. + + # TODO(hc): but if a data movement task will be executed after a very long time, that also can be evictable. + # if memory is full and any task cannot proceed, we can still evict one of data that was prefetched. + # but this is very rare case and I am gonna leave it as the future work. + + # TODO(hc): PArray should point to a corresponding data node. + data_info = self.data_dict[data_id] + success = self.zr_data_list.remove(data_info["ref_list_node"]) + self._update_data_state(data_id, GCDataState.PREFETCHING, taskid) + + #if success: + #This is the first prefetch of a data object that is already on the device. + #Update the evictable memory size (as this data object is no longer evictable). + #self.evictable_memory -= data.size + self._increase_ref_count(data_info) + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated "+ + f"through prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + else: + self.data_dict[data_id] = { "state" : GCDataState.PREFETCHING, \ + "ref_count" : 1, \ + "ref_list_node" : DataNode(data, dev) } + print(f"[GC] (Task: {taskid}) New data (ID: {data_id}) is added through "+ + f"prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) print(f"[GC] (Task: {taskid}) Zero-referenced list after prefetching data: "+ f"\n{self.zr_data_list}", flush=True) #This is a new block, update the used memory size. @@ -332,12 +335,11 @@ def _start_prefetch_data(self, data, dev, taskid = ""): def _stop_prefetch_data(self, data, dev, taskid=""): data_id = self._dict_id(data, dev) - with self._data_state_lock: - assert(data_id in self.data_dict) - self._update_data_state(data_id, GCDataState.RESERVED, taskid) - print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ - f"prefetching completes (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. 
node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + assert(data_id in self.data_dict) + self._update_data_state(data_id, GCDataState.RESERVED, taskid) + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ + f"prefetching completes (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) def _acquire_data(self, data, dev, taskid = ""): @@ -350,58 +352,55 @@ def _acquire_data(self, data, dev, taskid = ""): #self.used_map[data] = data #data.add_use(self.device) data_id = self._dict_id(data, dev) - with self._data_state_lock: - assert(data_id in self.data_dict) - self._update_data_state(data_id, GCDataState.ACQUIRED, taskid) - print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ - f"a compute task acquires"+ - f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + assert(data_id in self.data_dict) + self._update_data_state(data_id, GCDataState.ACQUIRED, taskid) + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ + f"a compute task acquires"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) def _release_data(self, data, dev, taskid = ""): data_id = self._dict_id(data, dev) - with self._data_state_lock: - assert(data_id in self.data_dict) - data_info = self.data_dict[data_id] - self._decrease_ref_count(data_info) - - #active_count = data.get_active(self.device) - #use_count = data.get_use(self.device) - - #data.remove_active(self.device) - #data.remove_use(self.device) - - if data_info["ref_count"] == 0: - assert(self._check_ref_count_zero(data_info)) - #del self.active_map[data] - #If the data object is no longer needed by any already prefetched tasks, it can be evicted. - node = data_info["ref_list_node"] - self.zr_data_list.append(node) - #self.evictable_memory += data.nbytes - #if use_count == 1: - #del self.used_map[data] - print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as a compute "+ - f"task releases (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - print(f"[GC] (Task: {taskid}) Zero-referenced list after releasing data: "+ - f"\n{self.zr_data_list}", flush=True) + assert(data_id in self.data_dict) + data_info = self.data_dict[data_id] + self._decrease_ref_count(data_info) + + #active_count = data.get_active(self.device) + #use_count = data.get_use(self.device) + + #data.remove_active(self.device) + #data.remove_use(self.device) + + if data_info["ref_count"] == 0: + assert(self._check_ref_count_zero(data_info)) + #del self.active_map[data] + #If the data object is no longer needed by any already prefetched tasks, it can be evicted. + node = data_info["ref_list_node"] + self.zr_data_list.append(node) + #self.evictable_memory += data.nbytes + #if use_count == 1: + #del self.used_map[data] + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as a compute "+ + f"task releases (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. 
node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + print(f"[GC] (Task: {taskid}) Zero-referenced list after releasing data: "+ + f"\n{self.zr_data_list}", flush=True) def _evict_data(self, target_data, target_dev): data_id = self._dict_id(target_data, target_dev) - with self._data_state_lock: - data_info = self.data_dict[data_id] - assert(self._check_ref_count_zero(data_info)) - #Call internal data object evict method - #This should: - # - Backup the data if its not in a SHARED state - # (SHARED state means the data has a valid copy on multiple devices. Eviction should never destroy the only remaining copy) - # - Mark the data for deletion (this may be done by the CuPy/Python GC) - target_data.evict(target_dev) - self.zr_data_list.remove(data_info["ref_list_node"]) - del data_info - #self.used_memory -= data.nbytes - #self.evictable_memory -= data.nbytes + data_info = self.data_dict[data_id] + assert(self._check_ref_count_zero(data_info)) + #Call internal data object evict method + #This should: + # - Backup the data if its not in a SHARED state + # (SHARED state means the data has a valid copy on multiple devices. Eviction should never destroy the only remaining copy) + # - Mark the data for deletion (this may be done by the CuPy/Python GC) + target_data.evict(target_dev) + self.zr_data_list.remove(data_info["ref_list_node"]) + del data_info + #self.used_memory -= data.nbytes + #self.evictable_memory -= data.nbytes def _evict(self): # Get the oldest data object From 34fda4a9b869a58e512a49f1e0906157394e5f4d Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Sat, 7 Jan 2023 15:15:30 -0600 Subject: [PATCH 11/16] Add eager GC eviction whenever a task completes --- parla/task_runtime.py | 1 + parla/tracking.py | 20 +++++++++++--------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/parla/task_runtime.py b/parla/task_runtime.py index 4a35435..6c0033c 100644 --- a/parla/task_runtime.py +++ b/parla/task_runtime.py @@ -628,6 +628,7 @@ def release_parray(self): self.dataflow.output): for d in self.req.devices: ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) + ctx.scheduler.lrum._evict() def _execute_task(self): self.acquire_parray() diff --git a/parla/tracking.py b/parla/tracking.py index 3269a54..7632c9c 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -258,7 +258,6 @@ def _decrease_ref_count(self, data_info): def _check_ref_count_zero(self, data_info): with self._data_ref_count_lock: - print("Check:", data_info["ref_count"], flush=True) return data_info["ref_count"] == 0 def _update_data_state(self, data_id, new_state, taskid): @@ -280,7 +279,6 @@ def _update_data_state(self, data_id, new_state, taskid): data_info["state"] = new_state return elif new_state == GCDataState.ACQUIRED: - print(">>>>> ", data_state, flush=True) assert(data_state == GCDataState.RESERVED) data_info["state"] = new_state return @@ -341,7 +339,6 @@ def _stop_prefetch_data(self, data, dev, taskid=""): f"prefetching completes (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - def _acquire_data(self, data, dev, taskid = ""): #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. # Any logic here would be a sanity check. I'm removing it for now. @@ -359,7 +356,6 @@ def _acquire_data(self, data, dev, taskid = ""): f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ f"Ref. 
node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - def _release_data(self, data, dev, taskid = ""): data_id = self._dict_id(data, dev) assert(data_id in self.data_dict) @@ -378,6 +374,7 @@ def _release_data(self, data, dev, taskid = ""): #If the data object is no longer needed by any already prefetched tasks, it can be evicted. node = data_info["ref_list_node"] self.zr_data_list.append(node) + print(">>>>>>>>>>>>>>>>>>>> ", type(node), " and ", type(node.data)) #self.evictable_memory += data.nbytes #if use_count == 1: #del self.used_map[data] @@ -390,13 +387,16 @@ def _release_data(self, data, dev, taskid = ""): def _evict_data(self, target_data, target_dev): data_id = self._dict_id(target_data, target_dev) data_info = self.data_dict[data_id] + dev_id = -1 if target_dev.architecture is cpu else target_dev.index + print(f"[GC] Zero-referenced data (ID: {data_id}) is evicted", flush=True) assert(self._check_ref_count_zero(data_info)) #Call internal data object evict method #This should: # - Backup the data if its not in a SHARED state # (SHARED state means the data has a valid copy on multiple devices. Eviction should never destroy the only remaining copy) # - Mark the data for deletion (this may be done by the CuPy/Python GC) - target_data.evict(target_dev) + print("type:", type(target_data)) + target_data.evict(dev_id) self.zr_data_list.remove(data_info["ref_list_node"]) del data_info #self.used_memory -= data.nbytes @@ -405,10 +405,12 @@ def _evict_data(self, target_data, target_dev): def _evict(self): # Get the oldest data object # Because we append after use this is at the front of the list - node = self.zr_data_list.head - n_data = node.data - n_dev = node.dev - self._evict_data(n_data, n_dev) + while self.zr_data_list.head != None: + node = self.zr_data_list.head + n_data = node.data + n_dev = node.device + self._evict_data(n_data, n_dev) + ''' class LFUManager(EvictionManager): From ba8651b1938a2d68280f0cbe42aa1e503d84263f Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Sun, 8 Jan 2023 11:10:01 -0600 Subject: [PATCH 12/16] Temporary bug fixes --- parla/parray/core.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/parla/parray/core.py b/parla/parray/core.py index 0b469a8..742a9e9 100644 --- a/parla/parray/core.py +++ b/parla/parray/core.py @@ -206,8 +206,8 @@ def update(self, array) -> None: # update shape self._array.shape = array.shape - - cupy.cuda.stream.get_current_stream().synchronize() + if num_gpu > 0: + cupy.cuda.stream.get_current_stream().synchronize() # slicing/indexing @@ -282,8 +282,7 @@ def evict(self, device_id: int = None, keep_one_copy: bool = True) -> None: with self._coherence_cv[device_id]: operations = self._coherence.evict(device_id, keep_one_copy) - for op in operations: - self._process_operation(op) + self._process_operations(operations) # Coherence update operations: @@ -362,7 +361,8 @@ def _process_operations(self, operations: List[MemoryOperation], slices: SlicesT self._array.copy_data_between_device(op.dst, op.src, dst_is_current_device) # sync stream before set it as ready, so asyc call is ensured to be done - cupy.cuda.stream.get_current_stream().synchronize() + if num_gpu > 0: + cupy.cuda.stream.get_current_stream().synchronize() # data is ready now if MemoryOperation.LOAD_SUBARRAY in op.flag: From e87d7f4acef8fa09cf3b62f9e780608eb757686c Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Sun, 8 Jan 2023 13:09:11 -0600 Subject: [PATCH 13/16] Add a test for garbage collecting --- tests/test_gc.py | 37 
+++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 tests/test_gc.py diff --git a/tests/test_gc.py b/tests/test_gc.py new file mode 100644 index 0000000..f3d284b --- /dev/null +++ b/tests/test_gc.py @@ -0,0 +1,37 @@ +# Import Parla +from parla import Parla, spawn, parray, TaskSpace +# Import the 'cpu' device type +from parla.cpu import cpu +from parla.cuda import gpu + +import parla.tracking + +import numpy as np + +import time + +def main(): + A = parray.asarray(np.random.rand(10000, 10000)) + B = parray.asarray(np.random.rand(10000, 10000)) + t = TaskSpace("AxB") + + for i in range(0, 4): + # Spawn a task to be scheduled by the Parla runtime + @spawn(t[i], input=[A, B], placement=gpu(i)) + def axb(): + print(f"{A} >>>> ", flush=True) + C = A @ B + print("C is ", type(C)) + time.sleep(10) + + @spawn(dependencies=[t]) + def apb(): + print("apb", flush=True) + time.sleep(20) + + + +# Execute the Parla program within the Parla context +if __name__ == "__main__": + with Parla(): + main() From cd04f4c13a98df690afcb2c8a3af6c3f73103c9a Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Tue, 10 Jan 2023 01:16:13 -0600 Subject: [PATCH 14/16] Add logs and fix data list manimulation --- parla/parray/core.py | 15 +++ parla/parray/memory.py | 22 +++++ parla/task_runtime.py | 47 +++++---- parla/tracking.py | 213 +++++++++++++++++++++++++++-------------- 4 files changed, 208 insertions(+), 89 deletions(-) diff --git a/parla/parray/core.py b/parla/parray/core.py index 742a9e9..20da5d3 100644 --- a/parla/parray/core.py +++ b/parla/parray/core.py @@ -282,7 +282,9 @@ def evict(self, device_id: int = None, keep_one_copy: bool = True) -> None: with self._coherence_cv[device_id]: operations = self._coherence.evict(device_id, keep_one_copy) + print("Evict operations", flush=True) self._process_operations(operations) + print("Evict operations done", flush=True) # Coherence update operations: @@ -336,13 +338,19 @@ def _process_operations(self, operations: List[MemoryOperation], slices: SlicesT """ for op in operations: if op.inst == MemoryOperation.NOOP: + print("No-op", flush=True) pass # do nothing elif op.inst == MemoryOperation.CHECK_DATA: + print("Check starts", flush=True) if not self._coherence.data_is_ready(op.src): # if data is not ready, wait + print("Check starts - 1", flush=True) with self._coherence_cv[op.src]: + print("Check starts - 2", flush=True) while not self._coherence.data_is_ready(op.src): self._coherence_cv[op.src].wait() + print("Check completes", flush=True) elif op.inst == MemoryOperation.LOAD: + print("Load starts", flush=True) with self._coherence_cv[op.dst]: # hold the CV when moving data # if the flag is set, skip this checking @@ -370,10 +378,13 @@ def _process_operations(self, operations: List[MemoryOperation], slices: SlicesT else: self._coherence.set_data_as_ready(op.dst) self._coherence_cv[op.dst].notify_all() # let other threads know the data is ready + print("Load completes", flush=True) elif op.inst == MemoryOperation.EVICT: + print("Evict starts", flush=True) self._array.clear(op.src) # decrement the reference counter, relying on GC to free the memory if MemoryOperation.NO_MARK_AS_READY not in op.flag: self._coherence.set_data_as_ready(op.src, None) # mark it as done + print("Evict completes", flush=True) elif op.inst == MemoryOperation.ERROR: raise RuntimeError("PArray gets an error from coherence protocol") else: @@ -410,9 +421,13 @@ def _auto_move(self, device_id: int = None, do_write: bool = False) -> None: slices = None if not 
self._slices else self._slices[0] if do_write: + print("write starts", flush=True) self._coherence_write(device_id, slices) + print("write complets", flush=True) else: + print("read starts", flush=True) self._coherence_read(device_id, slices) + print("read complets", flush=True) def _on_same_device(self, other: "PArray") -> bool: """ diff --git a/parla/parray/memory.py b/parla/parray/memory.py index 5ead20d..109516d 100644 --- a/parla/parray/memory.py +++ b/parla/parray/memory.py @@ -3,6 +3,9 @@ import numpy +import sys +#import gc + #TODO: Fix this to be more stable and less of a hack. try: import cupy @@ -514,5 +517,24 @@ def clear(self, device_id) -> None: """ Clear data in device_id """ + """ + import psutil + import os + mempool = cupy.get_default_memory_pool() + pinned_mempool = cupy.get_default_pinned_memory_pool() +#proc = psutil.Process(os.getpid()) +#mem0 = proc.memory_info().rss + print("Before:\n\t Used bytes:", mempool.used_bytes(), " total bytes: ", mempool.total_bytes(), " free blocks:", pinned_mempool.n_free_blocks(), flush=True) + print("device id:", device_id, " is cleared: ", len(self._buffer[device_id]), ", ", sys.getrefcount(self._buffer[device_id])) +#del self._buffer[device_id] + import psutil + """ self._indices_map[device_id] = None self._buffer[device_id] = None + """ +#mem1 = proc.memory_info().rss +#gc.collect() + mempool.free_all_blocks() + print("After:\n\t Used bytes:", mempool.used_bytes(), " total bytes: ", mempool.total_bytes(), " free blocks:", pinned_mempool.n_free_blocks(), flush=True) +#print("\t Before deallcation: ", mem0, " after: ", mem1, flush=True) + """ diff --git a/parla/task_runtime.py b/parla/task_runtime.py index 6c0033c..082bba3 100644 --- a/parla/task_runtime.py +++ b/parla/task_runtime.py @@ -382,6 +382,7 @@ def run(self): events = env.get_events_from_components() self._wait_for_dependency_events(env) task_state = self._execute_task() + print(str(self.taskid), " execution complets", flush=True) # Events could be multiple for multiple devices task. env.record_events() if len(events) > 0: @@ -389,7 +390,9 @@ def run(self): # notify dependents and make them wait for that event, # not Parla task completion. if not isinstance(task_state, TaskRunning): + print(str(self.taskid), " notifies", flush=True) self._notify_dependents(events) + print(str(self.taskid), " notifies done", flush=True) env.sync_events() task_state = task_state or TaskCompleted(None) except Exception as e: @@ -488,6 +491,7 @@ def _notify_dependents_mutex(self, events=None): def _notify_dependents(self, events=None): for dependent in self._dependents: + print("dependent:", str(dependent.taskid), flush=True) dependent._handle_dependency_event_mutex(events) self._dependents = [] @@ -515,6 +519,7 @@ def check_all_dependency_mapped(self) -> bool: def _handle_dependency_event_mutex(self, events): with self._mutex: self._num_blocking_dependencies -= 1 + print(str(self.taskid), " has dependency:", self._num_blocking_dependencies, flush=True) # Add events from one dependent task. 
# (We are aiming to multiple device tasks, and it would # be possible to have multiple events) @@ -615,25 +620,28 @@ def _handle_dependency_spawn(self, dependency: "Task"): def acquire_parray(self): ctx = get_scheduler_context() - for parray in (self.dataflow.input + \ - self.dataflow.inout + \ - self.dataflow.output): - for d in self.req.devices: - ctx.scheduler.lrum._acquire_data(parray, d, str(self.taskid)) + if self.dataflow is not None: + for parray in (self.dataflow.input + \ + self.dataflow.inout + \ + self.dataflow.output): + for d in self.req.devices: + ctx.scheduler.lrum._acquire_data(parray, d, str(self.taskid)) def release_parray(self): ctx = get_scheduler_context() - for parray in (self.dataflow.input + \ - self.dataflow.inout + \ - self.dataflow.output): - for d in self.req.devices: - ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) - ctx.scheduler.lrum._evict() + if self.dataflow is not None: + for parray in (self.dataflow.input + \ + self.dataflow.inout + \ + self.dataflow.output): + for d in self.req.devices: + ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) + ctx.scheduler.lrum._evict() def _execute_task(self): self.acquire_parray() result = self._state.func(self, *self._state.args) self.release_parray() + print("Execution done", flush=True) return result def cleanup(self): @@ -687,18 +695,23 @@ def __init__(self, computation_task: ComputeTask, taskid, self._operand_type = operand_type def _execute_task(self): + print(str(self.taskid), " starts data move", flush=True) write_flag = True if (self._operand_type == OperandType.IN): write_flag = False # Move data to current device - dev_type = get_current_devices()[0] + self.dev_type = get_current_devices()[0] dev_no = -1 - if (dev_type.architecture is not cpu): - dev_no = dev_type.index + if (self.dev_type.architecture is not cpu): + dev_no = self.dev_type.index ctx = get_scheduler_context() - ctx.scheduler.lrum._start_prefetch_data(self._target_data, dev_type, str(self.taskid)) + print(str(self.taskid), " starts data move - 1", flush=True) + ctx.scheduler.lrum._start_prefetch_data(self._target_data, self.dev_type, str(self.taskid)) + print(str(self.taskid), " starts data move - 2", flush=True) self._target_data._auto_move(device_id=dev_no, do_write=write_flag) - ctx.scheduler.lrum._stop_prefetch_data(self._target_data, dev_type, str(self.taskid)) + print(str(self.taskid), " starts data move - 3", flush=True) + ctx.scheduler.lrum._stop_prefetch_data(self._target_data, self.dev_type, str(self.taskid)) + print(str(self.taskid), " starts data move - 4", flush=True) return TaskCompleted(None) def cleanup(self): @@ -713,7 +726,7 @@ def _finish(self, ctx): # Don't update parray tracking information either # The scheduler already registered the new location # If size changes, the ComputeTask will take care of that - + ctx = get_scheduler_context() # Decrease the number of running tasks on the device d. for d in self.req.devices: ctx.scheduler.update_mapped_task_count_mutex(self, d, -1) diff --git a/parla/tracking.py b/parla/tracking.py index 7632c9c..fbeb17e 100644 --- a/parla/tracking.py +++ b/parla/tracking.py @@ -49,6 +49,8 @@ from typing import TypedDict, Dict from parla.cpu_impl import cpu +print_log=False + # TODO(hc): It should be declared on PArray. 
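# A minimal sketch (not part of the patch) of the call ordering that the task_runtime
# changes above aim for. `lrum`, `parray`, `dev`, `dev_no`, `write_flag`, and `taskid`
# are stand-ins for the scheduler's LRU manager, a PArray, a device, its index, the
# write flag, and a task id string; only the ordering of the calls matters here.

def move_data_sketch(lrum, parray, dev, dev_no, write_flag, taskid):
    # Pin the copy first so it cannot be evicted while the transfer is in flight.
    lrum._start_prefetch_data(parray, dev, taskid)
    parray._auto_move(device_id=dev_no, do_write=write_flag)
    # The prefetch is done; the copy is now RESERVED for the compute task.
    lrum._stop_prefetch_data(parray, dev, taskid)

def run_compute_sketch(lrum, parray, dev, taskid, body):
    # The compute task marks the prefetched copy as in use before running.
    lrum._acquire_data(parray, dev, taskid)
    result = body()
    # Releasing drops the reference count; zero-referenced copies become evictable
    # and the eager eviction pass added in this series reclaims them.
    lrum._release_data(parray, dev, taskid)
    lrum._evict()
    return result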
class DataNode: @@ -115,48 +117,64 @@ def __init__(self): def __str__(self): return self.__repr__() - def __repr__(self): - return f"DLList({self.head}, {self.tail})" - def append(self, node): with self._list_lock: - if self.head is None: + if self.length == 0: self.head = node self.tail = node + node = self.tail = self.head else: self.tail.next = node node.prev = self.tail + node.next = None self.tail = node self.length += 1 - def remove(self, node): + def remove_mutex(self, node): with self._list_lock: - edit = False + self.remove(node) - if self.head == node: - self.head = node.next - edit = True - - if self.tail == node: - self.tail = node.prev - edit = True + def remove(self, node): + edit = False + if self.length == 1 and (node == self.head or node == self.tail): + edit = True + self.head = None + self.tail = None + node.prev = None + node.next = None + self.length = 0 + return edit + elif node.prev is None and node.next is None: + return edit - if node.prev is not None: - node.prev.next = node.next - edit = True + if self.head == node: + self.head = node.next + edit = True + elif self.tail == node: + self.tail = node.prev + edit = True + else: + node.prev.next = node.next + node.next.prev = node.prev + edit = True - if node.next is not None: - node.next.prev = node.prev - edit = True + node.prev = None + node.next = None + if print_log: + print(f"After remove {id(node)} next: {id(node.next)} and prev: {id(node.prev)}: new head: {id(self.head)} and tail: {id(self.tail)}", flush=True) - node.prev = None - node.next = None + if edit: + self.length -= 1 + return edit - if edit: - self.length -= 1 + def remove_head(self): + with self._list_lock: + if self.head is None: + return None + old_head = self.head + self.remove(old_head) + return old_head - return edit - def insert_before(self, node, new_node): with self._list_lock: if node.prev is not None: @@ -181,16 +199,24 @@ def insert_after(self, node, new_node): self.length += 1 - def __len__(self): + def is_head_none(self): with self._list_lock: - return self.length + if self.head is None: + return True + return False + + def __len__(self): + return self.length def __repr__(self): with self._list_lock: repr_str = ":\n" tmp_node = self.head - while (tmp_node != None): + while (tmp_node is not None): repr_str += str(id(tmp_node)) + " -> " + print("Head:", id(self.head), " tail:", id(self.tail), " and current node:", id(tmp_node), flush=True) + if id(tmp_node) == id(self.tail): + break tmp_node = tmp_node.next repr_str += "\n" return repr_str @@ -265,8 +291,9 @@ def _update_data_state(self, data_id, new_state, taskid): # prefetching, reserved, using, free data_info = self.data_dict[data_id] data_state = data_info["state"] - print(f"[GC] (Task: {taskid}) Data (ID: {data_id})'s state is updated from "+ - f"{data_state} to {new_state}", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Data (ID: {data_id})'s state is updated from "+ + f"{data_state} to {new_state}", flush=True) if data_state == new_state: return if new_state == GCDataState.PREFETCHING: @@ -279,12 +306,14 @@ def _update_data_state(self, data_id, new_state, taskid): data_info["state"] = new_state return elif new_state == GCDataState.ACQUIRED: - assert(data_state == GCDataState.RESERVED) + assert(data_state == GCDataState.RESERVED or data_state == GCDataState.ACQUIRED) data_info["state"] = new_state return elif new_state == GCDataState.FREE: assert(data_state == GCDataState.ACQUIRED) - data_info["state"] = new_state + with self._data_ref_count_lock: + if 
data_info["ref_count"] == 0: + data_info["state"] = new_state return def _dict_id(self, data, dev): @@ -292,6 +321,9 @@ def _dict_id(self, data, dev): dev_index = "G" + str(dev.index) if (dev.architecture is not cpu) else "C" return str(data.ID) + "." + dev_index + def _get_devid(self, dev): + return -1 if dev.architecture is cpu else dev.index + def _start_prefetch_data(self, data, dev, taskid = ""): data_id = self._dict_id(data, dev) if data_id in self.data_dict: @@ -305,26 +337,44 @@ def _start_prefetch_data(self, data, dev, taskid = ""): # TODO(hc): PArray should point to a corresponding data node. data_info = self.data_dict[data_id] - success = self.zr_data_list.remove(data_info["ref_list_node"]) + if print_log: + print(f"[GC] (Task: {taskid}) Before Zero-referenced list after prefetching data: "+ + f" node id: {id(data_info['ref_list_node'])}", + f"\n{self.zr_data_list}", flush=True) + + self._increase_ref_count(data_info) + success = self.zr_data_list.remove_mutex(data_info["ref_list_node"]) self._update_data_state(data_id, GCDataState.PREFETCHING, taskid) + if success and print_log: + print(f"[GC] (Task: {taskid}) Data (ID: {data_id}) is removed "+ + f"from zero-referenced list during prefetching "+ + f"(Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is ready to be updated "+ + f"through prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + #if success: #This is the first prefetch of a data object that is already on the device. #Update the evictable memory size (as this data object is no longer evictable). #self.evictable_memory -= data.size - self._increase_ref_count(data_info) - print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated "+ - f"through prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated "+ + f"through prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) else: self.data_dict[data_id] = { "state" : GCDataState.PREFETCHING, \ "ref_count" : 1, \ "ref_list_node" : DataNode(data, dev) } - print(f"[GC] (Task: {taskid}) New data (ID: {data_id}) is added through "+ - f"prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - print(f"[GC] (Task: {taskid}) Zero-referenced list after prefetching data: "+ - f"\n{self.zr_data_list}", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) New data (ID: {data_id}) is added through "+ + f"prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Zero-referenced list after prefetching data: "+ + f"\n{self.zr_data_list}", flush=True) #This is a new block, update the used memory size. 
#self.used_memory += data.size #self.prefetch_map[data] = data @@ -335,9 +385,12 @@ def _stop_prefetch_data(self, data, dev, taskid=""): data_id = self._dict_id(data, dev) assert(data_id in self.data_dict) self._update_data_state(data_id, GCDataState.RESERVED, taskid) - print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ - f"prefetching completes (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ + f"prefetching completes (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if data._array.get(self._get_devid(dev)) is None: + print(f"[GC] prefetching-stop (Task: {taskid}) ID: {data_id} is None", flush=True) def _acquire_data(self, data, dev, taskid = ""): #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. @@ -351,10 +404,13 @@ def _acquire_data(self, data, dev, taskid = ""): data_id = self._dict_id(data, dev) assert(data_id in self.data_dict) self._update_data_state(data_id, GCDataState.ACQUIRED, taskid) - print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ - f"a compute task acquires"+ - f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ + f"a compute task acquires"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if data._array.get(self._get_devid(dev)) is None: + print(f"[GC] acquire (Task: {taskid}) ID: {data_id} is None", flush=True) def _release_data(self, data, dev, taskid = ""): data_id = self._dict_id(data, dev) @@ -368,48 +424,61 @@ def _release_data(self, data, dev, taskid = ""): #data.remove_active(self.device) #data.remove_use(self.device) - if data_info["ref_count"] == 0: - assert(self._check_ref_count_zero(data_info)) - #del self.active_map[data] - #If the data object is no longer needed by any already prefetched tasks, it can be evicted. - node = data_info["ref_list_node"] - self.zr_data_list.append(node) - print(">>>>>>>>>>>>>>>>>>>> ", type(node), " and ", type(node.data)) - #self.evictable_memory += data.nbytes + self._update_data_state(data_id, GCDataState.FREE, taskid) + with self._data_ref_count_lock: + if data_info["ref_count"] == 0: + #del self.active_map[data] + #If the data object is no longer needed by any already prefetched tasks, it can be evicted. + node = data_info["ref_list_node"] + self.zr_data_list.remove(node) + self.zr_data_list.append(node) + #self.evictable_memory += data.nbytes + """ #if use_count == 1: #del self.used_map[data] - print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as a compute "+ - f"task releases (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ - f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) - print(f"[GC] (Task: {taskid}) Zero-referenced list after releasing data: "+ - f"\n{self.zr_data_list}", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as a compute "+ + f"task releases (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. 
node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + print(f"[GC] (Task: {taskid}) Zero-referenced list after releasing data: "+ + f"\n{self.zr_data_list}", flush=True) + if data._array.get(self._get_devid(dev)) is None: + print(f"[GC] release (Task: {taskid}) ID: {data_id} is None", flush=True) + """ def _evict_data(self, target_data, target_dev): data_id = self._dict_id(target_data, target_dev) data_info = self.data_dict[data_id] dev_id = -1 if target_dev.architecture is cpu else target_dev.index - print(f"[GC] Zero-referenced data (ID: {data_id}) is evicted", flush=True) - assert(self._check_ref_count_zero(data_info)) + if print_log: + print(f"[GC] Zero-referenced data (ID: {data_id}) is evicted: " + + f"reference count = {data_info['ref_count']}", flush=True) + # TODO(hc): It is possible that this data has a reference count being bigger + # than 1 in parallel environment. + #assert(self._check_ref_count_zero(data_info)) #Call internal data object evict method #This should: # - Backup the data if its not in a SHARED state # (SHARED state means the data has a valid copy on multiple devices. Eviction should never destroy the only remaining copy) # - Mark the data for deletion (this may be done by the CuPy/Python GC) - print("type:", type(target_data)) target_data.evict(dev_id) - self.zr_data_list.remove(data_info["ref_list_node"]) - del data_info +#del data_info #self.used_memory -= data.nbytes #self.evictable_memory -= data.nbytes def _evict(self): + if print_log: + print(f"[GC] Zero-referenced list before evict starts: "+ + f"\n{self.zr_data_list}", flush=True) + # Get the oldest data object # Because we append after use this is at the front of the list - while self.zr_data_list.head != None: - node = self.zr_data_list.head - n_data = node.data - n_dev = node.device - self._evict_data(n_data, n_dev) + while not self.zr_data_list.is_head_none(): + node = self.zr_data_list.remove_head() + if node is not None: + n_data = node.data + n_dev = node.device + self._evict_data(n_data, n_dev) ''' From 8f474b9996fb8d6d56db5ad831957716e392fbcd Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Fri, 13 Jan 2023 14:23:07 -0600 Subject: [PATCH 15/16] (temp) add examples --- .../cholesky/blocked_cholesky_automatic.py | 16 ++++++++-------- tests/test_gc.py | 19 +++++++++++++++---- tutorial/0_hello_world/hello.py | 14 ++++++++++++-- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/examples/cholesky/blocked_cholesky_automatic.py b/examples/cholesky/blocked_cholesky_automatic.py index 0c705e3..87fcd5f 100644 --- a/examples/cholesky/blocked_cholesky_automatic.py +++ b/examples/cholesky/blocked_cholesky_automatic.py @@ -208,7 +208,7 @@ def cholesky_blocked_inplace(a, block_size): @spawn(gemm1[j, k], [solve[j, k], gemm1[j, 0:k]], input=[a[j][k]], inout=[a[j][j]], placement=loc_syrk, memory=mem) def t1(): - #print(f"+SYRK: ({j}, {k}) - Requires rw({j},{j}) r({j}, {k})", flush=True) + print(f"+SYRK: ({j}, {k}) - Requires rw({j},{j}) r({j}, {k})", flush=True) out = a[j][j].array rhs = a[j][k].array out = update(rhs, rhs, out) @@ -217,7 +217,7 @@ def t1(): stream.synchronize() a[j][j].update(out) stream.synchronize() - #print(f"-SYRK: ({j}, {k}) - Requires rw({j},{j}) r({j}, {k})", flush=True) + print(f"-SYRK: ({j}, {k}) - Requires rw({j},{j}) r({j}, {k})", flush=True) # Cholesky on block mem = 8*block_size**2 @@ -229,7 +229,7 @@ def t1(): @spawn(subcholesky[j], [gemm1[j, 0:j]], inout=[a[j][j]], placement=loc_potrf, memory=mem) def t2(): - #print(f"+POTRF: ({j}) - Requires 
rw({j},{j})", flush=True) + print(f"+POTRF: ({j}) - Requires rw({j},{j})", flush=True) dblock = a[j][j].array log_memory() @@ -239,7 +239,7 @@ def t2(): stream.synchronize() a[j][j].update(dblock) stream.synchronize() - #print(f"-POTRF: ({j}) - Requires rw({j},{j})", flush=True) + print(f"-POTRF: ({j}) - Requires rw({j},{j})", flush=True) for i in range(j+1, len(a)): for k in range(j): # Inter-block GEMM @@ -251,7 +251,7 @@ def t2(): @spawn(gemm2[i, j, k], [solve[j, k], solve[i, k], gemm2[i, j, 0:k]], inout=[a[i][j]], input=[a[i][k], a[j][k]], placement=loc_gemm, memory=mem) def t3(): - #print(f"+GEMM: ({i}, {j}, {k}) - Requires rw({i},{j}), r({i}, {k}), r({j}, {k})", flush=True) + print(f"+GEMM: ({i}, {j}, {k}) - Requires rw({i},{j}), r({i}, {k}), r({j}, {k})", flush=True) out = a[i][j].array rhs1 = a[i][k].array rhs2 = a[j][k].array @@ -263,7 +263,7 @@ def t3(): stream.synchronize() a[i][j].update(out) stream.synchronize() - #print(f"-GEMM: ({i}, {j}, {k}) - Requires rw({i},{j}), r({i}, {k}), r({j}, {k})", flush=True) + print(f"-GEMM: ({i}, {j}, {k}) - Requires rw({i},{j}), r({i}, {k}), r({j}, {k})", flush=True) # Triangular solve mem = 8*2*block_size**2 @@ -275,7 +275,7 @@ def t3(): @spawn(solve[i, j], [gemm2[i, j, 0:j], subcholesky[j]], inout=[a[i][j]], input=[a[j][j]], placement=loc_trsm, memory=mem) def t4(): - #print(f"+TRSM: ({i}, {j}) - Requires rw({i},{j}), r({j}, {j})", flush=True) + print(f"+TRSM: ({i}, {j}) - Requires rw({i},{j}), r({j}, {j})", flush=True) factor = a[j][j].array panel = a[i][j].array @@ -285,7 +285,7 @@ def t4(): stream.synchronize() a[i][j].update(out) stream.synchronize() - #print(f"-TRSM: ({i}, {j}) - Requires rw({i},{j}), r({j}, {j})", flush=True) + print(f"-TRSM: ({i}, {j}) - Requires rw({i},{j}), r({j}, {j})", flush=True) return subcholesky[len(a) - 1] diff --git a/tests/test_gc.py b/tests/test_gc.py index f3d284b..64a8b57 100644 --- a/tests/test_gc.py +++ b/tests/test_gc.py @@ -17,17 +17,28 @@ def main(): for i in range(0, 4): # Spawn a task to be scheduled by the Parla runtime + """ @spawn(t[i], input=[A, B], placement=gpu(i)) def axb(): - print(f"{A} >>>> ", flush=True) C = A @ B - print("C is ", type(C)) + print("axb is called", flush=True) time.sleep(10) + """ + @spawn(t[i], input=[A], placement=gpu(i)) + def axb(): + print("axb is called", flush=True) + time.sleep(3) - @spawn(dependencies=[t]) + @spawn(t[4], dependencies=[t[:4]]) def apb(): print("apb", flush=True) - time.sleep(20) + time.sleep(2) + + @spawn(dependencies=[t[4]], input=[A], placement=gpu(1)) + def apb(): + print("apb2", flush=True) + print(A.array) + time.sleep(2) diff --git a/tutorial/0_hello_world/hello.py b/tutorial/0_hello_world/hello.py index caeeaf0..f591293 100644 --- a/tutorial/0_hello_world/hello.py +++ b/tutorial/0_hello_world/hello.py @@ -10,20 +10,30 @@ ''' # Import Parla -from parla import Parla, spawn +from parla import Parla, spawn, parray # Import the 'cpu' device type from parla.cpu import cpu +import parla.tracking + +import numpy as np # Define the main function (recommended when using Parla) # Tasks cannot be defined in the global scope def main(): + A = parray.asarray([[1, 2], [3, 4]]) + data = np.random.rand(2, 2) + B = parray.asarray(data) # Spawn a task to be scheduled by the Parla runtime - @spawn() + @spawn(input=[A], output=[B], placement=cpu) def hello_world(): print("Hello, World!", flush=True) + @spawn(input=[A], placement=cpu) + def hello_world2(): + print("Grab task A", flush=True) + # Execute the Parla program within the Parla context if __name__ == 
"__main__": with Parla(): From d11aec4b569a871046c8be7bf5b3d80b3fc4f0f5 Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Thu, 19 Jan 2023 13:03:41 -0600 Subject: [PATCH 16/16] workaround lru --- parla/parray/coherence.py | 50 ++++++++++++++++++--------------------- parla/parray/core.py | 15 ------------ parla/parray/memory.py | 9 ++----- parla/task_runtime.py | 48 ++++++++++++++++++++++++------------- 4 files changed, 56 insertions(+), 66 deletions(-) diff --git a/parla/parray/coherence.py b/parla/parray/coherence.py index 9fdd07b..6547270 100644 --- a/parla/parray/coherence.py +++ b/parla/parray/coherence.py @@ -535,6 +535,7 @@ def evict(self, device_id: int, keep_one_copy: bool = True) -> List[MemoryOperat """ device_local_state = self._local_states[device_id] operations = [] + evict_last_copy = False if device_local_state == self.INVALID: # already evicted, do nothing operations.append(MemoryOperation.noop()) @@ -547,40 +548,35 @@ def evict(self, device_id: int, keep_one_copy: bool = True) -> List[MemoryOperat new_owner = device break - # this device owns the last copy - if new_owner is None: - if keep_one_copy: - if device_id == CPU_INDEX: - # the last copy is already at CPU, - # do nothing and skip the rest of the code - return [MemoryOperation.noop()] - else: - # write back the last copy to CPU - operations.append(MemoryOperation.load(CPU_INDEX, device_id)) - - # now CPU has exclusive access to the data - self._global_state = self.MODIFIED - self._local_states[CPU_INDEX] = self.MODIFIED - - new_owner = CPU_INDEX - else: - self._global_state = self.INVALID # the system lose the last copy - self.owner = new_owner + if new_owner is None: + evict_last_copy = True + else: + # update states + self._local_states[device_id] = self.INVALID + operations.append(MemoryOperation.evict(device_id)) + self._versions[device_id] = -1 + self._is_complete[device_id] = None - # update states - self._local_states[device_id] = self.INVALID - operations.append(MemoryOperation.evict(device_id)) else: # Modified, this device owns the last copy + evict_last_copy = True + + if evict_last_copy: if keep_one_copy: # write back to CPU - self.owner = CPU_INDEX - self._local_states[CPU_INDEX] = self.MODIFIED + if device_id != CPU_INDEX: + operations.extend(self._write_back_to(CPU_INDEX, self.MODIFIED, on_different_device=True, this_device_id=device_id)[0]) - operations.append(MemoryOperation.load(CPU_INDEX, device_id)) + self.owner = CPU_INDEX + self._local_states[CPU_INDEX] = self.MODIFIED + self._is_complete[device_id] = True + else: + return [MemoryOperation.noop()] else: self._global_state = self.INVALID # the system lose the last copy self.owner = None + self._versions[device_id] = -1 + self._is_complete[device_id] = None - self._local_states[device_id] = self.INVALID - operations.append(MemoryOperation.evict(device_id)) + self._local_states[device_id] = self.INVALID + operations.append(MemoryOperation.evict(device_id)) return operations diff --git a/parla/parray/core.py b/parla/parray/core.py index 20da5d3..742a9e9 100644 --- a/parla/parray/core.py +++ b/parla/parray/core.py @@ -282,9 +282,7 @@ def evict(self, device_id: int = None, keep_one_copy: bool = True) -> None: with self._coherence_cv[device_id]: operations = self._coherence.evict(device_id, keep_one_copy) - print("Evict operations", flush=True) self._process_operations(operations) - print("Evict operations done", flush=True) # Coherence update operations: @@ -338,19 +336,13 @@ def _process_operations(self, operations: List[MemoryOperation], slices: 
SlicesT """ for op in operations: if op.inst == MemoryOperation.NOOP: - print("No-op", flush=True) pass # do nothing elif op.inst == MemoryOperation.CHECK_DATA: - print("Check starts", flush=True) if not self._coherence.data_is_ready(op.src): # if data is not ready, wait - print("Check starts - 1", flush=True) with self._coherence_cv[op.src]: - print("Check starts - 2", flush=True) while not self._coherence.data_is_ready(op.src): self._coherence_cv[op.src].wait() - print("Check completes", flush=True) elif op.inst == MemoryOperation.LOAD: - print("Load starts", flush=True) with self._coherence_cv[op.dst]: # hold the CV when moving data # if the flag is set, skip this checking @@ -378,13 +370,10 @@ def _process_operations(self, operations: List[MemoryOperation], slices: SlicesT else: self._coherence.set_data_as_ready(op.dst) self._coherence_cv[op.dst].notify_all() # let other threads know the data is ready - print("Load completes", flush=True) elif op.inst == MemoryOperation.EVICT: - print("Evict starts", flush=True) self._array.clear(op.src) # decrement the reference counter, relying on GC to free the memory if MemoryOperation.NO_MARK_AS_READY not in op.flag: self._coherence.set_data_as_ready(op.src, None) # mark it as done - print("Evict completes", flush=True) elif op.inst == MemoryOperation.ERROR: raise RuntimeError("PArray gets an error from coherence protocol") else: @@ -421,13 +410,9 @@ def _auto_move(self, device_id: int = None, do_write: bool = False) -> None: slices = None if not self._slices else self._slices[0] if do_write: - print("write starts", flush=True) self._coherence_write(device_id, slices) - print("write complets", flush=True) else: - print("read starts", flush=True) self._coherence_read(device_id, slices) - print("read complets", flush=True) def _on_same_device(self, other: "PArray") -> bool: """ diff --git a/parla/parray/memory.py b/parla/parray/memory.py index 109516d..db97a6b 100644 --- a/parla/parray/memory.py +++ b/parla/parray/memory.py @@ -517,24 +517,19 @@ def clear(self, device_id) -> None: """ Clear data in device_id """ - """ import psutil import os mempool = cupy.get_default_memory_pool() pinned_mempool = cupy.get_default_pinned_memory_pool() #proc = psutil.Process(os.getpid()) #mem0 = proc.memory_info().rss - print("Before:\n\t Used bytes:", mempool.used_bytes(), " total bytes: ", mempool.total_bytes(), " free blocks:", pinned_mempool.n_free_blocks(), flush=True) - print("device id:", device_id, " is cleared: ", len(self._buffer[device_id]), ", ", sys.getrefcount(self._buffer[device_id])) + #print("Before:\n\t Used bytes:", mempool.used_bytes(), " total bytes: ", mempool.total_bytes(), " free blocks:", pinned_mempool.n_free_blocks(), flush=True) #del self._buffer[device_id] import psutil - """ self._indices_map[device_id] = None self._buffer[device_id] = None - """ #mem1 = proc.memory_info().rss #gc.collect() mempool.free_all_blocks() - print("After:\n\t Used bytes:", mempool.used_bytes(), " total bytes: ", mempool.total_bytes(), " free blocks:", pinned_mempool.n_free_blocks(), flush=True) + #print("After:\n\t Used bytes:", mempool.used_bytes(), " total bytes: ", mempool.total_bytes(), " free blocks:", pinned_mempool.n_free_blocks(), flush=True) #print("\t Before deallcation: ", mem0, " after: ", mem1, flush=True) - """ diff --git a/parla/task_runtime.py b/parla/task_runtime.py index 082bba3..f65ba11 100644 --- a/parla/task_runtime.py +++ b/parla/task_runtime.py @@ -355,6 +355,11 @@ def _finish(self, ctx: 'SchedulerContext'): """Cleanup works after 
executing the task.""" raise NotImplementedError() + @abstractmethod + def _invoke_garbage_collector(self): + """Invoke a garbage collector; for now, invoke it for + each task execution.""" + raise NotImplementedError() def run(self): assert self._assigned, "Task was not assigned before running." @@ -364,6 +369,7 @@ def run(self): with self._mutex: # A default state to avoid exceptions during catch task_state = TaskException(RuntimeError("Unknown fatal error")) + event_exists = False # Run the task and assign the new task state try: assert isinstance(self._state, TaskRunning), " Task is not running state: {} on task: {}".format(self._state, self.taskid) @@ -382,17 +388,16 @@ def run(self): events = env.get_events_from_components() self._wait_for_dependency_events(env) task_state = self._execute_task() - print(str(self.taskid), " execution complets", flush=True) # Events could be multiple for multiple devices task. env.record_events() if len(events) > 0: + event_exists = True # If any event created by the current task exist, # notify dependents and make them wait for that event, # not Parla task completion. if not isinstance(task_state, TaskRunning): - print(str(self.taskid), " notifies", flush=True) + self._invoke_garbage_collector() self._notify_dependents(events) - print(str(self.taskid), " notifies done", flush=True) env.sync_events() task_state = task_state or TaskCompleted(None) except Exception as e: @@ -409,10 +414,12 @@ def run(self): # new dependents could be added after the above # notifications, while other devices are running # their kernels asynchronously. - if not isinstance(task_state, TaskRunning): + if event_exists == False and not isinstance(task_state, TaskRunning): + self._invoke_garbage_collector() self._notify_dependents() self._set_state(task_state) - self._finish(ctx) + if isinstance(self._state, TaskCompleted): + self._finish(ctx) except Exception as e: logger.exception("Task %r: Exception in task handling", self) raise e @@ -491,7 +498,6 @@ def _notify_dependents_mutex(self, events=None): def _notify_dependents(self, events=None): for dependent in self._dependents: - print("dependent:", str(dependent.taskid), flush=True) dependent._handle_dependency_event_mutex(events) self._dependents = [] @@ -519,7 +525,6 @@ def check_all_dependency_mapped(self) -> bool: def _handle_dependency_event_mutex(self, events): with self._mutex: self._num_blocking_dependencies -= 1 - print(str(self.taskid), " has dependency:", self._num_blocking_dependencies, flush=True) # Add events from one dependent task. # (We are aiming to multiple device tasks, and it would # be possible to have multiple events) @@ -627,7 +632,8 @@ def acquire_parray(self): for d in self.req.devices: ctx.scheduler.lrum._acquire_data(parray, d, str(self.taskid)) - def release_parray(self): + def _invoke_garbage_collector(self): + print(f"Garbage collector is activated", flush=True) ctx = get_scheduler_context() if self.dataflow is not None: for parray in (self.dataflow.input + \ @@ -640,8 +646,6 @@ def release_parray(self): def _execute_task(self): self.acquire_parray() result = self._state.func(self, *self._state.args) - self.release_parray() - print("Execution done", flush=True) return result def cleanup(self): @@ -660,14 +664,26 @@ def _finish(self, ctx): ctx.scheduler.update_mapped_task_count_mutex(self, d, -1) ctx.scheduler.update_launched_task_count_mutex(self, d, -1) - # _finish() can be called more than once on global task. 
if (self.dataflow != None): + """ + self.release_parray() + for parray in self.dataflow.input: + for d in self.req.devices: + ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) + """ # Update OUT parrays which may have changed size from 0 to something # We assume all IN and INOUT params don't change size - for parray in (self.dataflow.output): + for parray in (self.dataflow.output + self.dataflow.inout): + """ + for d in self.req.devices: + ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) + """ ctx.scheduler._available_resources.update_parray_nbytes( parray, self.req.devices) + """ + ctx.scheduler.lrum._evict() + """ ctx.scheduler.decr_active_compute_tasks() self.cleanup() @@ -695,7 +711,6 @@ def __init__(self, computation_task: ComputeTask, taskid, self._operand_type = operand_type def _execute_task(self): - print(str(self.taskid), " starts data move", flush=True) write_flag = True if (self._operand_type == OperandType.IN): write_flag = False @@ -705,15 +720,14 @@ def _execute_task(self): if (self.dev_type.architecture is not cpu): dev_no = self.dev_type.index ctx = get_scheduler_context() - print(str(self.taskid), " starts data move - 1", flush=True) ctx.scheduler.lrum._start_prefetch_data(self._target_data, self.dev_type, str(self.taskid)) - print(str(self.taskid), " starts data move - 2", flush=True) self._target_data._auto_move(device_id=dev_no, do_write=write_flag) - print(str(self.taskid), " starts data move - 3", flush=True) ctx.scheduler.lrum._stop_prefetch_data(self._target_data, self.dev_type, str(self.taskid)) - print(str(self.taskid), " starts data move - 4", flush=True) return TaskCompleted(None) + def _invoke_garbage_collector(self): + pass + def cleanup(self): self._target_data = None
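
The reworked `evict()` in coherence.py above funnels both the shared-last-copy and modified cases through a single `evict_last_copy` path: when `keep_one_copy` is set and the evicted device is not the CPU, the data is first written back to `CPU_INDEX` (which then becomes the owner in the MODIFIED state) before the device copy is invalidated; otherwise the global state drops to INVALID and the owner is cleared. A minimal standalone sketch of that decision flow follows; the state constants, device ids, and the symbolic operation strings are illustrative stand-ins, not the PArray coherence API.

# Toy sketch of last-copy eviction with optional write-back to the CPU.
# CPU_INDEX, the state names, and the returned operation strings are hypothetical.
from typing import Dict, List

CPU_INDEX = -1
INVALID, SHARED, MODIFIED = "I", "S", "M"


def evict(states: Dict[int, str], device_id: int, keep_one_copy: bool = True) -> List[str]:
    """Return symbolic memory operations needed to evict `device_id`."""
    state = states[device_id]
    if state == INVALID:                        # already evicted, nothing to do
        return ["noop"]

    others_hold_copy = any(s != INVALID and d != device_id for d, s in states.items())
    if state == SHARED and others_hold_copy:    # another valid replica exists, just drop this one
        states[device_id] = INVALID
        return [f"evict({device_id})"]

    # This device holds the last (possibly modified) copy.
    ops: List[str] = []
    if keep_one_copy:
        if device_id == CPU_INDEX:              # the last copy already lives on the CPU
            return ["noop"]
        ops.append(f"writeback({device_id} -> CPU)")
        states[CPU_INDEX] = MODIFIED            # CPU becomes the sole owner
    states[device_id] = INVALID                 # device copy is invalidated either way
    ops.append(f"evict({device_id})")
    return ops


if __name__ == "__main__":
    states = {CPU_INDEX: INVALID, 0: MODIFIED, 1: INVALID}
    print(evict(states, 0))                     # ['writeback(0 -> CPU)', 'evict(0)']
    print(states)                               # CPU now holds the only (modified) copy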
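
The memory.py change above re-enables the actual clearing in `clear()`: it drops the Python references held in the per-device buffer and then calls `cupy.get_default_memory_pool().free_all_blocks()`, since CuPy caches freed allocations in its pool instead of returning them to the driver. A small standalone illustration of that pattern is below; it requires CuPy and a visible GPU, and the `buffer` dictionary is just a stand-in for PArray's internal storage.

# Dropping references alone keeps the allocation cached in CuPy's pool;
# free_all_blocks() is what actually hands the cached blocks back to the device.
import cupy

buffer = {0: cupy.zeros((1024, 1024), dtype=cupy.float64)}   # ~8 MB on the current device
mempool = cupy.get_default_memory_pool()

print("allocated:", mempool.used_bytes(), "cached:", mempool.total_bytes(), flush=True)

buffer[0] = None                       # drop the last reference; bytes move from used to cached
print("after drop:", mempool.used_bytes(), "cached:", mempool.total_bytes(), flush=True)

mempool.free_all_blocks()              # return the cached blocks to the CUDA driver
print("after free:", mempool.used_bytes(), "cached:", mempool.total_bytes(), flush=True)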
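
In task_runtime.py above, `run()` calls the new `_invoke_garbage_collector()` hook once per finished task, immediately before dependents are notified, and only on the path that actually fires: the event path when the devices recorded events, otherwise the fallback path guarded by `event_exists == False`. The stripped-down sketch below shows only that "collect, then notify" ordering; the classes and method bodies are invented for illustration and do not mirror Parla's real task signatures.

# Toy illustration of the collect-then-notify ordering; all names are hypothetical.
class ToyTask:
    def __init__(self, name, releases):
        self.name = name
        self._releases = releases          # callables that release pinned PArray references

    def _invoke_garbage_collector(self):
        # Compute tasks release the PArrays they used; a data-movement task would
        # override this with a no-op so freshly prefetched data stays resident.
        for release in self._releases:
            release()

    def _notify_dependents(self):
        print(f"{self.name}: dependents notified", flush=True)

    def _execute_task(self):
        print(f"{self.name}: body executed", flush=True)
        return False                        # pretend the task ran to completion

    def run(self):
        still_running = self._execute_task()
        if not still_running:
            # Collection happens before dependents are woken, mirroring the patch's ordering.
            self._invoke_garbage_collector()
            self._notify_dependents()


if __name__ == "__main__":
    ToyTask("t0", [lambda: print("t0: released a[0][0]", flush=True)]).run()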
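
Finally, the data movement task's `_execute_task` above brackets the transfer with `lrum._start_prefetch_data(...)` and `lrum._stop_prefetch_data(...)` around `_auto_move`, and its `_invoke_garbage_collector` is deliberately a no-op, presumably so that a block still in flight is never selected for eviction. The sketch below shows one way such a bracket can be enforced with a reference count; the `PrefetchGuard` class and its method names are assumptions, not the scheduler's actual LRU manager interface.

# Hypothetical prefetch guard: data with an active prefetch count is never evictable.
import threading
from collections import defaultdict


class PrefetchGuard:
    def __init__(self):
        self._lock = threading.Lock()
        self._prefetching = defaultdict(int)      # (data_id, device_id) -> active prefetches

    def start_prefetch(self, data_id, device_id):
        with self._lock:
            self._prefetching[(data_id, device_id)] += 1

    def stop_prefetch(self, data_id, device_id):
        with self._lock:
            self._prefetching[(data_id, device_id)] -= 1

    def evictable(self, data_id, device_id):
        with self._lock:
            return self._prefetching[(data_id, device_id)] == 0


def move_data(guard, data_id, device_id, do_write):
    """Bracket the transfer so the eviction policy skips in-flight data."""
    guard.start_prefetch(data_id, device_id)
    try:
        print(f"moving {data_id} to device {device_id} (write={do_write})", flush=True)
    finally:
        guard.stop_prefetch(data_id, device_id)


if __name__ == "__main__":
    g = PrefetchGuard()
    move_data(g, "a[0][0]", 0, do_write=False)
    print(g.evictable("a[0][0]", 0))              # True again once the transfer finishes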