diff --git a/parla/parray/coherence.py b/parla/parray/coherence.py index 9fdd07b..49528cd 100644 --- a/parla/parray/coherence.py +++ b/parla/parray/coherence.py @@ -21,14 +21,11 @@ class MemoryOperation: NOOP = 0 LOAD = 1 EVICT = 2 - CHECK_DATA = 3 # Flag SWITCH_DEVICE_FLAG = 101 # if the flag is set, it means dst is not the current device - SKIP_SRC_CHECK = 102 # if the flag is set, no need to hold condition variable of src - LOAD_SUBARRAY = 103 # if the flag is set, it means a subarray of src should be loaded - ENSURE_IS_COMPLETE = 104 # if the flag is set, check data will also check if the data is complete - NO_MARK_AS_READY = 105 # if the flag is set, the operation won't mark the data as ready after completed + LOAD_SUBARRAY = 102 # if the flag is set, it means a subarray of src should be loaded + ENSURE_IS_COMPLETE = 103 # if the flag is set, check data will also check if the data is complete def __init__(self, inst: int = NOOP, dst: int = -1, src: int = -1, flag: int = []): self.inst = inst @@ -47,14 +44,11 @@ def error() -> MemoryOperation: return MemoryOperation(MemoryOperation.ERROR) @staticmethod - def load(dst: int, src: int, on_different_device: bool = False, skip_src_check: bool = False, is_subarray: bool = False) -> MemoryOperation: + def load(dst: int, src: int, on_different_device: bool = False, is_subarray: bool = False) -> MemoryOperation: """ load all data from src to dst Need to switch device if `on_different_device` is true This could known by checking flag = SWITCH_DEVICE_FLAG - - Skip checking condition variable of src if `skip_src_check` is set - This could known by checking flag = MemoryOperation.SKIP_SRC_CHECK If `is_subarray` is True, it means a subarray of src will be loaded This could known by checking flag = MemoryOperation.LOAD_SUBARRAY @@ -64,34 +58,13 @@ def load(dst: int, src: int, on_different_device: bool = False, skip_src_check: flag.append(MemoryOperation.LOAD_SUBARRAY) if on_different_device: flag.append(MemoryOperation.SWITCH_DEVICE_FLAG) - if skip_src_check: - flag.append(MemoryOperation.SKIP_SRC_CHECK) return MemoryOperation(MemoryOperation.LOAD, dst, src, flag) @staticmethod - def evict(src: int, mark_as_ready: bool = True) -> MemoryOperation: + def evict(src: int) -> MemoryOperation: """ invalidate the data in src """ - flag = [] - if not mark_as_ready: - flag.append(MemoryOperation.NO_MARK_AS_READY) - - return MemoryOperation(MemoryOperation.EVICT, src=src, flag=flag) - - @staticmethod - def check_data(src: int, ensure_is_complete: bool = False) -> MemoryOperation: - """ check if the data is ready, wait if not - - If `ensure_is_complete` is True - if means need to wait until and data is completed and ready. - This could known by checking flag = MemoryOperation.ENSURE_IS_COMPLETE - """ - flag = [] - if ensure_is_complete: - flag.append(MemoryOperation.ENSURE_IS_COMPLETE) - - return MemoryOperation(MemoryOperation.CHECK_DATA, src=src, flag=flag) - + return MemoryOperation(MemoryOperation.EVICT, src=src) class Coherence: """ @@ -107,7 +80,6 @@ class Coherence: _local_states: Dict[int, int | Dict[int, int]] _versions: Dict[int, int | Dict[int, int]] _is_complete: Dict[int, bool] - _data_ready: Dict[int, bool | Dict[int, bool]] owner: int _latest_version: int _lock: threading.Lock @@ -138,52 +110,30 @@ def __init__(self, init_owner: int, num_gpu: int): self._is_complete[init_owner] = True # the copy is complete self._latest_version = 0 # the latest version in the system - # is data ready to use in this device? 
- # data is not ready if it need to be copied from somewhere (has an load operations in progress) - # and becomes ready is no data movement in progress - # if copy is subarray, value would be a Dict{slices_hash: bool} - # this provide a order when multiple threads are accessing the same data - # for example, if multiple threads read the same data on the same device, - # only one of them will need to performance datamovement and other are just wait ready = True - self._data_ready = {n: True for n in range(num_gpu)} - self._data_ready[CPU_INDEX] = True - # held the lock when updating states self._lock = threading.Lock() - def data_is_ready(self, device_id: int, ensure_is_complete: bool = False) -> bool: + def reset(self, new_owner: int): """ - Return True if data on `device_id` is ready to use, and there is no copy in progress. - - If `ensure_is_complete` is True, return false is the data is not complete even if it is ready + Reset the coherence state to only `new_owner` has a valid modified copy """ - if isinstance(self._data_ready[device_id], dict): # there are subarrays on this devices - if ensure_is_complete: - return False # data should be complete - return not (False in self._data_ready[device_id].values()) # all subarray need to be ready - else: - return self._data_ready[device_id] - - def set_data_as_ready(self, device_id: int, slices_hash: int = None) -> None: - """ - Mark data on `device_id` as ready to use, and there is no copy in progress. - - Args: - device_id: id of this device - slices_hash: hash code of the slices of the subarray to be manipulated - by default equals to None, which means the whole array is manipulated - """ - if slices_hash is not None: # move a subarray - self._data_ready[device_id][slices_hash] = True - else: - self._data_ready[device_id] = True + for device_id in self._local_states.keys(): + self._local_states[device_id] = self.INVALID + self._versions[device_id] = -1 + self._is_complete[device_id] = None + + self._local_states[new_owner] = self.MODIFIED + self.owner = new_owner + self._versions[new_owner] = 0 + self._is_complete[new_owner] = True + self._latest_version = 0 def _owner_is_latest(self) -> bool: """True if owner's has latest version""" return self._versions[self.owner] == self._latest_version def _write_back_to(self, device_id:int, new_state:int, on_different_device:bool = False, - this_device_id: int = None, skip_src_check_id: int = None) -> Tuple[List[MemoryOperation], bool]: + this_device_id: int = None) -> List[MemoryOperation]: """ Generate the list of write back MemoryOperation. Which make `device_id` has the latest version with a complete copy. @@ -193,7 +143,6 @@ def _write_back_to(self, device_id:int, new_state:int, on_different_device:bool new_state: new state of `dcvice_id` on_different_device: True if this device is not current deivce this_device_id: if `on_different_device` is True, this means current device ID. 
If None, ignore - skip_src_check_id: skip checking this device's src condition variable when it is not None Return: operations, has_copy (true if need to copy data to the device) @@ -261,8 +210,8 @@ def _write_back_to(self, device_id:int, new_state:int, on_different_device:bool # update latest version self._versions[device_id] = self._latest_version - return [MemoryOperation.load(device_id, t, on_different_device=on_different_device, skip_src_check=(skip_src_check_id == t)) for t in target] \ - + [MemoryOperation.evict(t) for t in evict_list], (len(target) != 0) + return [MemoryOperation.load(device_id, t, on_different_device=on_different_device) for t in target] \ + + [MemoryOperation.evict(t) for t in evict_list] def read(self, device_id: int, slices_hash: int = None) -> List[MemoryOperation]: """ Tell the protocol that this device read from the copy. @@ -279,109 +228,88 @@ def read(self, device_id: int, slices_hash: int = None) -> List[MemoryOperation] """ operations = [] - with self._lock: - if slices_hash is not None: # move a subarray - if self._is_complete[device_id] is True: # use existing complete data at this device - device_local_state = self._local_states[device_id] + if slices_hash is not None: # move a subarray + if self._is_complete[device_id] is True: # use existing complete data at this device + device_local_state = self._local_states[device_id] + else: + if not isinstance(self._local_states[device_id], dict): + self._versions[device_id] = {} + self._local_states[device_id] = {} + device_local_state = self.INVALID + elif slices_hash in self._local_states[device_id]: + device_local_state = self._local_states[device_id][slices_hash] else: - if not isinstance(self._local_states[device_id], dict): - self._versions[device_id] = {} - self._local_states[device_id] = {} - self._data_ready[device_id] = {} - device_local_state = self.INVALID - elif slices_hash in self._local_states[device_id]: - device_local_state = self._local_states[device_id][slices_hash] - else: - device_local_state = self.INVALID - self._is_complete[device_id] = False # this is a subarray - elif self._is_complete[device_id] is False: - # special case: need a complete copy but there are already subarrays in this deivce - # writeback this subarrays and then copy complete data from owner - - # write back to owner - # skip_src_check_id is required since device_id will be marked as not ready later - operations.extend(self._write_back_to(self.owner, self.SHARED, on_different_device=True, skip_src_check_id=device_id)[0]) - - # evict previous subarries at device_id - # should not mark the array as ready after eviction since load is not completed - operations.append(MemoryOperation.evict(device_id, mark_as_ready=False)) - - # copy from owner - # skip_src_checking is NOT required since owner will be ready after the above write back operaton - operations.append(MemoryOperation.load(device_id, self.owner)) - - # update status - self._data_ready[self.owner] = False - - # if there is on going operations on subarrays, no need to set it as False - if self.data_is_ready(device_id): - self._data_ready[device_id] = False - - self._is_complete[device_id] = True - self._versions[device_id] = self._versions[self.owner] - self._local_states[self.owner] = self.SHARED # owner is updated, so it is in SHARED states - self._local_states[device_id] = self.SHARED + device_local_state = self.INVALID + self._is_complete[device_id] = False # this is a subarray + elif self._is_complete[device_id] is False: + # special case: need a complete copy 
but there are already subarrays in this deivce + # writeback this subarrays and then copy complete data from owner - # change owner - if self._owner_is_latest(): - if self.owner == CPU_INDEX: - self.owner = device_id - elif device_id % 2 == 0 and self.owner % 2 != 0: # prefer device 0/2 over 1/3 - self.owner = device_id - else: - self.owner = device_id + # write back to owner + operations.extend(self._write_back_to(self.owner, self.SHARED, on_different_device=True)) - # skip the rest code - return operations - else: # move a complete copy and current device has no subarrays - device_local_state = self._local_states[device_id] - self._is_complete[device_id] = True # this is a complete array - - - if device_id == self.owner: - if device_local_state == self.SHARED: - operations.append(MemoryOperation.check_data(device_id, ensure_is_complete=True)) # check if the data is ready - elif device_local_state == self.MODIFIED: - operations.append(MemoryOperation.check_data(device_id, ensure_is_complete=True)) - # no need to check data is ready since assume no overlapping writers - else: # update it to latest - operations.extend(self._write_back_to(device_id, self.SHARED)[0]) - self._data_ready[device_id] = False + # evict previous subarries at device_id + operations.append(MemoryOperation.evict(device_id)) + + # copy from owner + operations.append(MemoryOperation.load(device_id, self.owner)) + + self._is_complete[device_id] = True + self._versions[device_id] = self._versions[self.owner] + self._local_states[self.owner] = self.SHARED # owner is updated, so it is in SHARED states + self._local_states[device_id] = self.SHARED + + # change owner + if self._owner_is_latest(): + if self.owner == CPU_INDEX: + self.owner = device_id + elif device_id % 2 == 0 and self.owner % 2 != 0: # prefer device 0/2 over 1/3 + self.owner = device_id else: - if device_local_state == self.INVALID: - if self._is_complete[device_id]: - operations.extend(self._write_back_to(device_id, self.SHARED)[0]) - - self._data_ready[device_id] = False - - # change owner - if self._owner_is_latest(): - if self.owner == CPU_INDEX: - self.owner = device_id - elif device_id % 2 == 0 and self.owner % 2 != 0: # prefer device 0/2 over 1/3 - self.owner = device_id - else: + self.owner = device_id + + # skip the rest code + return operations + else: # move a complete copy and current device has no subarrays + device_local_state = self._local_states[device_id] + self._is_complete[device_id] = True # this is a complete array + + + if device_id == self.owner: + if device_local_state == self.INVALID: # update it to latest + operations.extend(self._write_back_to(device_id, self.SHARED)) + else: + operations.append(MemoryOperation.noop()) + else: + if device_local_state == self.INVALID: + if self._is_complete[device_id]: + operations.extend(self._write_back_to(device_id, self.SHARED)) + + # change owner + if self._owner_is_latest(): + if self.owner == CPU_INDEX: self.owner = device_id - self._versions[device_id] = self._latest_version - else: # since we assume all array are disjoint, so could load directly - operations.append(MemoryOperation.load(dst=device_id, src=self.owner, is_subarray=True)) - - self._data_ready[device_id][slices_hash] = False - self._versions[device_id][slices_hash] = self._versions[self.owner] - elif device_local_state == self.MODIFIED: - operations.append(MemoryOperation.check_data(device_id, ensure_is_complete=self._is_complete[device_id])) - else: - operations.append(MemoryOperation.check_data(device_id, 
ensure_is_complete=self._is_complete[device_id])) + elif device_id % 2 == 0 and self.owner % 2 != 0: # prefer device 0/2 over 1/3 + self.owner = device_id + else: + self.owner = device_id + self._versions[device_id] = self._latest_version + else: # since we assume all array are disjoint, so could load directly + operations.append(MemoryOperation.load(dst=device_id, src=self.owner, is_subarray=True)) - # update status - if self._is_complete[device_id]: - if device_local_state == self.INVALID: - self._local_states[device_id] = self.SHARED + self._versions[device_id][slices_hash] = self._versions[self.owner] else: - if device_local_state == self.INVALID: - self._local_states[device_id][slices_hash] = self.SHARED + operations.append(MemoryOperation.noop()) - return operations + # update status + if self._is_complete[device_id]: + if device_local_state == self.INVALID: + self._local_states[device_id] = self.SHARED + else: + if device_local_state == self.INVALID: + self._local_states[device_id][slices_hash] = self.SHARED + + return operations def write(self, device_id: int, slices_hash: int = None) -> List[MemoryOperation]: """ Tell the protocol that this device write to the copy. @@ -396,145 +324,123 @@ def write(self, device_id: int, slices_hash: int = None) -> List[MemoryOperation """ operations = [] - with self._lock: - if slices_hash is not None: # move a subarray - if self._is_complete[device_id] is True: # use existing complete data at this device - device_local_state = self._local_states[device_id] + if slices_hash is not None: # move a subarray + if self._is_complete[device_id] is True: # use existing complete data at this device + device_local_state = self._local_states[device_id] + else: + if not isinstance(self._local_states[device_id], dict): + self._versions[device_id] = {} + self._local_states[device_id] = {} + device_local_state = self.INVALID + elif slices_hash in self._local_states[device_id]: + device_local_state = self._local_states[device_id][slices_hash] else: - if not isinstance(self._local_states[device_id], dict): - self._versions[device_id] = {} - self._local_states[device_id] = {} - self._data_ready[device_id] = {} - device_local_state = self.INVALID - elif slices_hash in self._local_states[device_id]: - device_local_state = self._local_states[device_id][slices_hash] - else: - device_local_state = self.INVALID - self._is_complete[device_id] = False # this is a subarray - elif self._is_complete[device_id] is False: - # special case: need a complete copy but there are already subarrays in this deivce - # writeback this subarrays and then copy complete data from owner - # - # no need to check this since assume no multiwriters - # operations.append(MemoryOperation.check_data(device_id)) - - # write back to owner - operations.extend(self._write_back_to(self.owner, self.MODIFIED, - on_different_device=True, skip_src_check_id=device_id)[0]) - - # copy from owner - operations.append(MemoryOperation.load(device_id, self.owner)) - - # update status - self._data_ready[self.owner] = False - self._data_ready[device_id] = False # won't deadlock since `skip_src_check_id` is set - - self._is_complete[device_id] = True - self._versions[device_id] = self._versions[self.owner] - self._local_states[device_id] = self.MODIFIED - self._local_states[self.owner] = self.INVALID # owner is invalid too + device_local_state = self.INVALID + self._is_complete[device_id] = False # this is a subarray + elif self._is_complete[device_id] is False: + # special case: need a complete copy but there are 
already subarrays in this deivce + # writeback this subarrays and then copy complete data from owner + + # write back to owner + operations.extend(self._write_back_to(self.owner, self.MODIFIED, + on_different_device=True)) + + # copy from owner + operations.append(MemoryOperation.load(device_id, self.owner)) - # change owner - self.owner = device_id + self._is_complete[device_id] = True + self._versions[device_id] = self._versions[self.owner] + self._local_states[device_id] = self.MODIFIED + self._local_states[self.owner] = self.INVALID # owner is invalid too - # skip the rest code - return operations - else: - device_local_state = self._local_states[device_id] - self._is_complete[device_id] = True # this is a complete array + # change owner + self.owner = device_id - if device_id == self.owner: - if device_local_state != self.MODIFIED: - ops, has_copy = self._write_back_to(device_id, self.MODIFIED) - operations.extend(ops) + # skip the rest code + return operations + else: + device_local_state = self._local_states[device_id] + self._is_complete[device_id] = True # this is a complete array - # if need to from other device - if has_copy: - self._data_ready[device_id] = False - self._latest_version += 1 - self._versions[device_id] = self._latest_version - else: - operations.append(MemoryOperation.noop()) - else: - if device_local_state == self.INVALID: - if self._is_complete[device_id]: - operations.extend(self._write_back_to(device_id, self.MODIFIED)[0]) + if device_id == self.owner: + if device_local_state != self.MODIFIED: + operations.extend(self._write_back_to(device_id, self.MODIFIED)) - self._data_ready[device_id] = False - self._latest_version += 1 - self._versions[device_id] = self._latest_version - - # change owner - self.owner = device_id - else: # since we assume all subarrays are disjoint, could load directly - operations.append(MemoryOperation.load(dst=device_id, src=self.owner, is_subarray=True)) - - self._data_ready[device_id][slices_hash] = False - self._versions[device_id][slices_hash] = self._versions[self.owner] + 1 - if self._owner_is_latest(): - self._latest_version += 1 - self._local_states[self.owner] = self.INVALID # invalidate overlapping copy - elif device_local_state == self.SHARED: - if self._is_complete[device_id]: - self._latest_version += 1 - self._versions[device_id] = self._latest_version + self._latest_version += 1 + self._versions[device_id] = self._latest_version + else: + operations.append(MemoryOperation.noop()) + else: + if device_local_state == self.INVALID: + if self._is_complete[device_id]: + operations.extend(self._write_back_to(device_id, self.MODIFIED)) - # change owner - self.owner = device_id + self._latest_version += 1 + self._versions[device_id] = self._latest_version - # evict others - ops, has_copy = self._write_back_to(device_id, self.MODIFIED) - operations.extend(ops) + # change owner + self.owner = device_id + else: # since we assume all subarrays are disjoint, could load directly + operations.append(MemoryOperation.load(dst=device_id, src=self.owner, is_subarray=True)) - # if need to from other device - if has_copy: - self._data_ready[device_id] = False - else: + self._versions[device_id][slices_hash] = self._versions[self.owner] + 1 + if self._owner_is_latest(): self._latest_version += 1 - self._versions[device_id][slices_hash] = self._latest_version - - # invalidate other complete copies - for id, state in self._local_states.items(): - # since we assume all subarrays are disjoint, so don't need to evict other subarrays - if not 
isinstance(state, dict): - if id != device_id: - self._local_states[id] = self.INVALID + self._local_states[self.owner] = self.INVALID # invalidate overlapping copy + elif device_local_state == self.SHARED: + if self._is_complete[device_id]: + self._latest_version += 1 + self._versions[device_id] = self._latest_version - if id != self.owner: # owner's buffer will be kept (so won't lost the last complete copy) - self._versions[id] = -1 - self._is_complete[id] = None - operations.append(MemoryOperation.evict(id)) + # change owner + self.owner = device_id - operations.append(MemoryOperation.check_data(device_id, ensure_is_complete=self._is_complete[device_id])) + # evict others + operations.extend(self._write_back_to(device_id, self.MODIFIED)) else: - operations.append(MemoryOperation.check_data(device_id, ensure_is_complete=self._is_complete[device_id])) - - # update status - if self._is_complete[device_id]: - if device_local_state != self.MODIFIED: - self._local_states[device_id] = self.MODIFIED + self._latest_version += 1 + self._versions[device_id][slices_hash] = self._latest_version + + # invalidate other complete copies + for id, state in self._local_states.items(): + # since we assume all subarrays are disjoint, so don't need to evict other subarrays + if not isinstance(state, dict): + if id != device_id: + self._local_states[id] = self.INVALID + + if id != self.owner: # owner's buffer will be kept (so won't lost the last complete copy) + self._versions[id] = -1 + self._is_complete[id] = None + operations.append(MemoryOperation.evict(id)) + if len(operations) == 0: + operations.append(MemoryOperation.noop()) else: - if device_local_state != self.MODIFIED: - self._local_states[device_id][slices_hash] = self.MODIFIED - return operations + operations.append(MemoryOperation.noop()) + + # update status + if self._is_complete[device_id]: + if device_local_state != self.MODIFIED: + self._local_states[device_id] = self.MODIFIED + else: + if device_local_state != self.MODIFIED: + self._local_states[device_id][slices_hash] = self.MODIFIED + return operations def evict(self, device_id: int, keep_one_copy: bool = True) -> List[MemoryOperation]: """ Tell the protocol that this device want to clear the copy. - Args: device_id: id of this device keep_one_copy: if true, writeback the last copy to CPU - Return: List[MemoryOperation], could return several MemoryOperations. And the order operations matter. - Note: if this device has the last copy and `keep_one_copy` is false, the whole protocol state will be INVALID then. And the system will lose the copy. Be careful when evict the last copy. 
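
        Example (illustrative sketch; `coherence` names a Coherence instance and device 0 is a GPU):
            # clear this device's copy but keep the data alive by writing the last copy back to CPU
            ops = coherence.evict(device_id=0, keep_one_copy=True)
            # `ops` is an ordered list of MemoryOperation, applied later by PArray._process_operations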
""" device_local_state = self._local_states[device_id] operations = [] + evict_last_copy = False if device_local_state == self.INVALID: # already evicted, do nothing operations.append(MemoryOperation.noop()) @@ -548,39 +454,35 @@ def evict(self, device_id: int, keep_one_copy: bool = True) -> List[MemoryOperat break # this device owns the last copy - if new_owner is None: - if keep_one_copy: - if device_id == CPU_INDEX: - # the last copy is already at CPU, - # do nothing and skip the rest of the code - return [MemoryOperation.noop()] - else: - # write back the last copy to CPU - operations.append(MemoryOperation.load(CPU_INDEX, device_id)) - - # now CPU has exclusive access to the data - self._global_state = self.MODIFIED - self._local_states[CPU_INDEX] = self.MODIFIED - - new_owner = CPU_INDEX - else: - self._global_state = self.INVALID # the system lose the last copy - self.owner = new_owner + if new_owner is None: + evict_last_copy = True + else: + # update states + self._local_states[device_id] = self.INVALID + self._versions[device_id] = -1 + self._is_complete[device_id] = None - # update states - self._local_states[device_id] = self.INVALID - operations.append(MemoryOperation.evict(device_id)) + operations.append(MemoryOperation.evict(device_id)) else: # Modified, this device owns the last copy + evict_last_copy = True + + if evict_last_copy: if keep_one_copy: # write back to CPU - self.owner = CPU_INDEX - self._local_states[CPU_INDEX] = self.MODIFIED + if device_id != CPU_INDEX: + operations.extend(self._write_back_to(CPU_INDEX, self.MODIFIED, on_different_device=True, this_device_id=device_id)) - operations.append(MemoryOperation.load(CPU_INDEX, device_id)) + self.owner = CPU_INDEX + self._local_states[CPU_INDEX] = self.MODIFIED + self._is_complete[device_id] = True + else: + return [MemoryOperation.noop()] else: self._global_state = self.INVALID # the system lose the last copy self.owner = None + self._versions[device_id] = -1 + self._is_complete[device_id] = None - self._local_states[device_id] = self.INVALID - operations.append(MemoryOperation.evict(device_id)) + self._local_states[device_id] = self.INVALID + operations.append(MemoryOperation.evict(device_id)) return operations diff --git a/parla/parray/core.py b/parla/parray/core.py index 0b469a8..286d9da 100644 --- a/parla/parray/core.py +++ b/parla/parray/core.py @@ -24,6 +24,8 @@ SlicesType = Union[slice, int, tuple] +UINT64_LIMIT = (1 << 64 - 1) + class PArray: """Multi-dimensional array on a CPU or CUDA device. 
@@ -40,7 +42,7 @@ class PArray: _slices: List[SlicesType] _coherence_cv: Dict[int, threading.Condition] - def __init__(self, array: ndarray, parent: "PArray" = None, slices=None) -> None: + def __init__(self, array: ndarray, parent: "PArray" = None, slices=None, name: str = "NA") -> None: if parent is not None: # create a view (a subarray) of a PArray # inherit parent's buffer and coherence states # so this PArray will becomes a 'view' of its parents @@ -52,6 +54,9 @@ def __init__(self, array: ndarray, parent: "PArray" = None, slices=None) -> None # add current slices to the end self._slices.append(slices) + # self._name = parent._name + "::subarray::" + str(slices) + self._name = parent._name + "::subarray::" + str(self._slices) + # inherit parent's condition variables self._coherence_cv = parent._coherence_cv @@ -61,7 +66,9 @@ def __init__(self, array: ndarray, parent: "PArray" = None, slices=None) -> None # a unique ID for this subarray # which is the combine of parent id and slice hash self.parent_ID = parent.ID - self.ID = parent.ID * 31 + self._slices_hash # use a prime number to avoid collision + # use a prime number to avoid collision + # modulo over uint64 to make it compatible with C++ end + self.ID = (parent.ID * 31 + self._slices_hash) % UINT64_LIMIT self.nbytes = parent.nbytes # the bytes used by the complete array self.subarray_nbytes = array.nbytes # the bytes used by this subarray @@ -92,11 +99,19 @@ def __init__(self, array: ndarray, parent: "PArray" = None, slices=None) -> None self.nbytes = array.nbytes self.subarray_nbytes = self.nbytes # no subarray + self._name = name + # Register the parray with the scheduler task_runtime.get_scheduler_context().scheduler._available_resources.track_parray(self) # Properties: + @property + def name(self): + if self._name is None: + return "PArray::"+str(self.ID) + return self._name + @property def array(self) -> ndarray: """ @@ -174,6 +189,7 @@ def update(self, array) -> None: Note: should be called within the current task context Note: data should be put in OUT/INOUT fields of spawn + Note: should not call this over an sliced array """ this_device = self._current_device_index @@ -200,18 +216,68 @@ def update(self, array) -> None: # update size self.nbytes = array.nbytes + self.subarray_nbytes = self.nbytes + + self._slices = [] # reset coherence - self._coherence = Coherence(this_device, num_gpu) + self._coherence.reset(this_device) # update shape self._array.shape = array.shape - cupy.cuda.stream.get_current_stream().synchronize() + if num_gpu > 0: + cupy.cuda.stream.get_current_stream().synchronize() + + def print_overview(self) -> None: + """ + Print global overview of current PArray for debugging + """ + state_str_map = {0: "INVALID", + 1: "SHARED", + 2: "MODIFIED"} + + print(f"---Overview of PArray\n" + f"ID: {self.ID}, " + f"Name: {self._name}, " + f"Parent_ID: {self.parent_ID if self.ID != self.parent_ID else None}, " + f"Slice: {self._slices[0] if self.ID != self.parent_ID else None}, " + f"Bytes: {self.subarray_nbytes}, " + f"Owner: {'GPU ' + str(self._coherence.owner) if self._coherence.owner != CPU_INDEX else 'CPU'}") + for device_id, state in self._coherence._local_states.items(): + if device_id == CPU_INDEX: + device_name = "CPU" + else: + device_name = f"GPU {device_id}" + print(f"At {device_name}: ", end="") + + if isinstance(state, dict): + print( + f"state: {[state_str_map[s] for s in list(state.values())]}, including sliced copy: # states of slices is unordered wrt the below slices") + for slice, slice_id in 
zip(self._array._indices_map[device_id], range(len(self._array._indices_map[device_id]))): + print( + f"\tslice {slice_id} - indices: {slice}, bytes: {self._array._buffer[device_id][slice_id].nbytes}") + else: + print(f"state: {state_str_map[state]}") + print("---End of Overview") # slicing/indexing - def __getitem__(self, slices: SlicesType) -> PArray | Any: + def __getitem__(self, slices: SlicesType | numpy.ndarray | cupy.ndarray) -> PArray | Any: + """ + Acceptable Slices: Slice, Int, tuple of (Slice, Int, List of Int), ndarray of numpy/cupy + Example: + A[0] # int + A[:] # slice + A[0,:,10] # tuple of int slice int + A[2:10:2, 0, [1, 3, 5]] # tuple of slice int list of Int + Note: `:` equals to slice(None, None, None) + Note: `None` or tuple of `None` is not acceptable (even if `numpy.ndarray` accept `None`) + # TODO: support `None` + """ + if isinstance(slices, numpy.ndarray) or isinstance(slices, cupy.ndarray): + slices = slices.tolist() + if self._slices: # resolve saved slices first ret = self.array[slices] else: @@ -233,9 +299,9 @@ def __getitem__(self, slices: SlicesType) -> PArray | Any: else: return ret - def __setitem__(self, slices: SlicesType, value: PArray | ndarray | Any) -> None: + def __setitem__(self, slices: SlicesType | numpy.ndarray | cupy.ndarray, value: PArray | ndarray | Any) -> None: """ - Acceptable Slices: Slice, Int, tuple of (Slice, Int, List of Int) + Acceptable Slices: Slice, Int, tuple of (Slice, Int, List of Int), ndarray of numpy/cupy Example: A[0] # int A[:] # slice @@ -244,11 +310,14 @@ def __setitem__(self, slices: SlicesType, value: PArray | ndarray | Any) -> None Note: `:` equals to slice(None, None, None) Note: `None` or tuple of `None` is not acceptable (even if `numpy.ndarray` accept `None`) - # TODO: support `None` and `ndarray` as slices + # TODO: support `None` """ if isinstance(value, PArray): value = value.array + if isinstance(slices, numpy.ndarray) or isinstance(slices, cupy.ndarray): + slices = slices.tolist() + if self._slices: # resolve saved slices first self.array.__setitem__(slices, value) else: @@ -282,9 +351,7 @@ def evict(self, device_id: int = None, keep_one_copy: bool = True) -> None: with self._coherence_cv[device_id]: operations = self._coherence.evict(device_id, keep_one_copy) - for op in operations: - self._process_operation(op) - + self._process_operations(operations) # Coherence update operations: @@ -305,8 +372,10 @@ def _coherence_read(self, device_id: int = None, slices: SlicesType = None) -> N device_id = self._current_device_index # update protocol and get operation - operations = self._coherence.read(device_id, self._slices_hash) # locks involve - self._process_operations(operations, slices) # condition variable involve + with self._coherence._lock: # locks involve + operations = self._coherence.read(device_id, self._slices_hash) + self._process_operations(operations, slices) + print(f"read: {self._name}") def _coherence_write(self, device_id: int = None, slices: SlicesType = None) -> None: """Tell the coherence protocol a write happened on a device. 
@@ -325,8 +394,10 @@ def _coherence_write(self, device_id: int = None, slices: SlicesType = None) -> device_id = self._current_device_index # update protocol and get operation - operations = self._coherence.write(device_id, self._slices_hash) # locks involve - self._process_operations(operations, slices) # condition variable involve + with self._coherence._lock: # locks involve + operations = self._coherence.write(device_id, self._slices_hash) + self._process_operations(operations, slices) + print(f"write: {self._name}") # Device management methods: @@ -338,42 +409,27 @@ def _process_operations(self, operations: List[MemoryOperation], slices: SlicesT for op in operations: if op.inst == MemoryOperation.NOOP: pass # do nothing - elif op.inst == MemoryOperation.CHECK_DATA: - if not self._coherence.data_is_ready(op.src): # if data is not ready, wait - with self._coherence_cv[op.src]: - while not self._coherence.data_is_ready(op.src): - self._coherence_cv[op.src].wait() elif op.inst == MemoryOperation.LOAD: - with self._coherence_cv[op.dst]: # hold the CV when moving data - - # if the flag is set, skip this checking - if not MemoryOperation.SKIP_SRC_CHECK in op.flag: - with self._coherence_cv[op.src]: # wait on src until it is ready - while not self._coherence.data_is_ready(op.src): - self._coherence_cv[op.src].wait() - - if MemoryOperation.LOAD_SUBARRAY in op.flag: - self._array.set_slices_mapping(op.dst, slices) # build slices mapping first + if MemoryOperation.LOAD_SUBARRAY in op.flag: + # build slices mapping first + self._array.set_slices_mapping(op.dst, slices) - # check flag to see if dst is current device - dst_is_current_device = op.flag != MemoryOperation.SWITCH_DEVICE_FLAG + # check flag to see if dst is current device + dst_is_current_device = op.flag != MemoryOperation.SWITCH_DEVICE_FLAG - # copy data - self._array.copy_data_between_device(op.dst, op.src, dst_is_current_device) - - # sync stream before set it as ready, so asyc call is ensured to be done + # copy data + if num_gpu > 0: cupy.cuda.stream.get_current_stream().synchronize() + + self._array.copy_data_between_device( + op.dst, op.src, dst_is_current_device) - # data is ready now - if MemoryOperation.LOAD_SUBARRAY in op.flag: - self._coherence.set_data_as_ready(op.dst, self._slices_hash) # mark it as done - else: - self._coherence.set_data_as_ready(op.dst) - self._coherence_cv[op.dst].notify_all() # let other threads know the data is ready + # sync stream before set it as ready, so asyc call is ensured to be done + if num_gpu > 0: + cupy.cuda.stream.get_current_stream().synchronize() elif op.inst == MemoryOperation.EVICT: - self._array.clear(op.src) # decrement the reference counter, relying on GC to free the memory - if MemoryOperation.NO_MARK_AS_READY not in op.flag: - self._coherence.set_data_as_ready(op.src, None) # mark it as done + # decrement the reference counter, relying on GC to free the memor + self._array.clear(op.src) elif op.inst == MemoryOperation.ERROR: raise RuntimeError("PArray gets an error from coherence protocol") else: @@ -480,7 +536,6 @@ def __ge__(x, y): else: return x.array.__ge__(y) - # Truth value of an array (bool): def __nonzero__(self): diff --git a/parla/parray/from_data.py b/parla/parray/from_data.py index a934f02..47e232d 100644 --- a/parla/parray/from_data.py +++ b/parla/parray/from_data.py @@ -7,7 +7,7 @@ cupy = numpy # work around of cupy.ndarray -def array(object, dtype=None, copy=True, order='K', subok=False, ndmin=0, like=None, on_gpu=False): +def array(object, dtype=None, copy=True, 
order='K', subok=False, ndmin=0, like=None, on_gpu=False, name: str = "NA"): """ Create a Parla array on the specific device (CPU by default). @@ -48,26 +48,29 @@ def array(object, dtype=None, copy=True, order='K', subok=False, ndmin=0, like=N # if the input is already an ndarray if isinstance(object, (numpy.ndarray, cupy.ndarray)): if copy: - parray = PArray(object.copy()) + parray = PArray(object.copy(), name=name) else: - parray = PArray(object) - elif isinstance(object, PArray): # Already an PArray + parray = PArray(object, name=name) + elif isinstance(object, PArray): # Already an PArray if copy: - parray = PArray(object.array.copy()) + parray = PArray(object.array.copy(), name=name) else: - parray = PArray(object.array) - else: # create one if it is not an ndarray + parray = PArray(object.array, name=name) + else: # create one if it is not an ndarray if on_gpu: - parray = PArray(cupy.array(object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin)) + parray = PArray(cupy.array(object, dtype=dtype, + copy=copy, order=order, subok=subok, ndmin=ndmin), name=name) else: if like: - parray = PArray(numpy.array(object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin, like=like)) + parray = PArray(numpy.array( + object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin, like=like), name=name) else: - parray = PArray(numpy.array(object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin)) + parray = PArray(numpy.array( + object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin), name=name) return parray -def asarray(a, dtype=None, order=None, like=None, on_gpu=False): +def asarray(a, dtype=None, order=None, like=None, on_gpu=False, name: str = "NA"): """Converts an object to Parla array. This is equivalent to :class:``array(a, dtype, on_gpu, copy=False)``. @@ -104,7 +107,7 @@ def asarray(a, dtype=None, order=None, like=None, on_gpu=False): .. seealso:: :func:`numpy.asarray` """ - return array(a, dtype=dtype, copy=False, order=order, like=like, on_gpu=on_gpu) + return array(a, dtype=dtype, copy=False, order=order, like=like, on_gpu=on_gpu, name=name) def asarray_batch(*args): @@ -140,12 +143,12 @@ def get_parray(object): # recursively process Sequence or Dictionary return type(object)(accumulator) else: raise TypeError(f"Unsupported Type: {type(object)}") - + parla_arrays = [] for arg in args: parla_arrays.append(get_parray(arg)) - + if len(parla_arrays) == 1: - return parla_arrays[0] + return parla_arrays[0] else: return parla_arrays # recommend user to unpack this diff --git a/parla/parray/memory.py b/parla/parray/memory.py index 5ead20d..e582e80 100644 --- a/parla/parray/memory.py +++ b/parla/parray/memory.py @@ -2,6 +2,7 @@ from typing import TYPE_CHECKING, Union, List, Dict, Tuple, Any import numpy +import ctypes #TODO: Fix this to be more stable and less of a hack. try: @@ -481,11 +482,13 @@ def get_slices_hash(self, global_slices: SlicesType) -> int: prime = 31 if not isinstance(global_slices, tuple): if isinstance(global_slices, list): - hash_value = hash_value * prime + hash(tuple(global_slices)) + # Built-int hash() method might return negtive value. 
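+                # For example (illustrative, not part of the original comment):
+                #   h = hash(tuple(global_slices))   # CPython's built-in hash() may return a negative int
+                #   ctypes.c_size_t(h).value         # the same value wrapped into the non-negative unsigned range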
+ # c_size_t is to ensure it is not negative + hash_value = hash_value * prime + ctypes.c_size_t(hash(tuple(global_slices))).value elif isinstance(global_slices, slice): - hash_value = hash_value * prime + hash(global_slices.indices(self.shape[0])) + hash_value = hash_value * prime + ctypes.c_size_t(hash(global_slices.indices(self.shape[0]))).value else: - hash_value = hash_value * prime + hash(global_slices) + hash_value = hash_value * prime + ctypes.c_size_t(hash(global_slices)).value else: if len(self.shape) < len(global_slices): raise IndexError(f"index out of range, index:{global_slices}") @@ -493,11 +496,11 @@ def get_slices_hash(self, global_slices: SlicesType) -> int: for d in range(len(global_slices)): index = global_slices[d] if isinstance(index, list): - hash_value = hash_value * prime + hash(tuple(index)) + hash_value = hash_value * prime + ctypes.c_size_t(hash(tuple(index))).value elif isinstance(index, slice): - hash_value = hash_value * prime + hash(index.indices(self.shape[d])) + hash_value = hash_value * prime + ctypes.c_size_t(hash(index.indices(self.shape[d]))).value else: - hash_value = hash_value * prime + hash(index) + hash_value = hash_value * prime + ctypes.c_size_t(hash(index)).value return hash_value diff --git a/tests/test_parray.py b/tests/test_parray.py new file mode 100644 index 0000000..d3d20e0 --- /dev/null +++ b/tests/test_parray.py @@ -0,0 +1,92 @@ +import pytest + +from parla import Parla, spawn, TaskSpace, parray +from parla.cuda import gpu +from parla.cpu import cpu +from parla.parray.coherence import Coherence +import numpy as np + +def test_parray_creation(): + with Parla(): + A = parray.asarray([[1, 2], [3, 4]]) + + a = A[0] + assert A[0,1] == 2 + assert A[1,0] == 3 + assert A[1,1] == 4 + assert np.array_equal(A, np.asarray([[1, 2], [3, 4]])) + +def test_parray_task(): + with Parla(): + @spawn(placement=cpu) + def test(): + np.random.seed(10) + # Construct input data + a = np.array([[1, 2, 4, 5, 6], [1, 2, 4, 5, 6], [1, 2, 4, 5, 6], [1, 2, 4, 5, 6]]) + b = np.array([[1, 2, 4, 5, 6], [1, 2, 4, 5, 6], [1, 2, 4, 5, 6], [1, 2, 4, 5, 6]]) + a = parray.asarray(a, name="A") + b = parray.asarray(b, name="B") + + ts = TaskSpace("CopyBack") + + assert a._current_device_index == -1 + assert a._array._buffer[1] is None + assert a._array._buffer[-1] is not None + assert a._coherence._local_states[1] == Coherence.INVALID + assert a._coherence._local_states[-1] == Coherence.MODIFIED + + @spawn(ts[1], placement=gpu[1], output=[b]) + def check_array_write(): + assert b[0,0] == 1 + assert b._current_device_index == 1 + b.print_overview() + + b[1,1] = 0 + assert b[1,1] == 0 + assert b._array._buffer[1] is not None + assert b._array._buffer[-1] is None + assert b._coherence._local_states[-1] == Coherence.INVALID + assert b._coherence._local_states[1] == Coherence.MODIFIED + + @spawn(ts[2], dependencies=[ts[1]], placement=gpu[0], output=[a[0:2], a[2]]) + def check_array_slicing(): + assert a[1,0] == 1 + assert a._current_device_index == 0 + a[0:2].print_overview() + + a[1,1] = 0 + assert a[1,1] == 0 + assert a._array._buffer[-1] is not None + assert isinstance(a._array._buffer[0], list) + assert a._coherence._local_states[-1] == Coherence.INVALID + assert isinstance(a._coherence._local_states[0], dict) + + @spawn(ts[3], dependencies=[ts[2]], placement=cpu, output=[a]) + def check_array_write_back(): + assert a[1,1] == 0 + assert a._current_device_index == -1 + a.print_overview() + + assert a._array._buffer[-1] is not None + assert a._array._buffer[0] is None + assert 
a._coherence._local_states[-1] == Coherence.MODIFIED
+            assert a._coherence._local_states[0] == Coherence.INVALID
+
+        @spawn(ts[4], dependencies=[ts[3]], placement=gpu[1], inout=[a])
+        def check_array_update():
+            a.update(np.array([1, 2, 3, 4]))  # here the np array is converted to a cupy array
+
+            assert len(a) == 4
+            assert a[-1] == 4
+            assert a._coherence.owner == 1
+
+        @spawn(ts[5], dependencies=[ts[4]], placement=gpu[0], inout=[a])
+        def check_array_indexing():
+            a[np.array([0, 2])] = 0
+            assert len(a) == 4
+            assert a[0] == 0
+            assert a._coherence.owner == 0
+
+if __name__ == "__main__":
+    test_parray_creation()
+    test_parray_task()
\ No newline at end of file
diff --git a/tutorial/6_automatic_data_movement/README.md b/tutorial/6_automatic_data_movement/README.md
index 2a3ae81..c7a9229 100644
--- a/tutorial/6_automatic_data_movement/README.md
+++ b/tutorial/6_automatic_data_movement/README.md
@@ -120,3 +120,34 @@ What's more, this memory model requires the application to be data race free in
 For example, if there is no dependence between task A and task B, they may be scheduled to run in parallel. If A writes to a PArray and B reads from it, a data race will happen between the two tasks and the behavior is undefined.
 For fine grained data movement, a read and a write on two different portions of a PArray are not considered a race and will run correctly with parallel tasks.
 However, a read and a write on a portion and on a complete copy are considered a race.
+
+#### Writeback of Subarray
+Since we assume subarrays do not overlap, the PArray runtime will not write back changes to a subarray to other devices unless a writeback is triggered.
+
+That means the following example won't work, because task_b will not see the change made in task_a.
+```
+@spawn(t[0], output=[arr[0]], placement=gpu[0])
+def task_a():
+    arr[0] = 1
+
+@spawn(t[1], dependencies=[t[0]], input=[arr[0]], placement=gpu[1])
+def task_b():
+    assert arr[0] == 1  # will fail since task_a's change has not been written back to other devices
+```
+
+Therefore, whenever you want to access overlapping subarrays (including the same subarray on different devices), you need to insert a writeback task between them to sync the data:
+
+```
+@spawn(t[0], output=[arr[0]], placement=gpu[0])
+def task_a():
+    arr[0] = 1
+
+@spawn(a[0], dependencies=[t[0]], inout=[arr], placement=cpu)  # accessing the array with `inout` triggers a writeback to that device
+def write_back():
+    pass  # doing nothing is okay
+
+@spawn(t[1], dependencies=[t[0], a[0]], input=[arr[0]], placement=gpu[1])
+def task_b():
+    assert arr[0] == 1  # succeeds now
+```
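+
+#### Inspecting PArray State
+For debugging, a PArray can be given a human-readable name and can print an overview of which devices currently hold its copies. A minimal sketch (assuming `data` is any NumPy array and `t` is a TaskSpace, as in the examples above):
+
+```
+a = parray.asarray(data, name="A")
+
+@spawn(t[0], input=[a], placement=gpu[0])
+def inspect():
+    a.print_overview()  # prints ID, name, owner, and per-device coherence states
+```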