Skip to content

Commit

Permalink
Merge pull request #150 from ut-parla/fix/parla-update
Browse files Browse the repository at this point in the history
import new parray code from parla-experiment main
  • Loading branch information
yinengy authored Apr 16, 2023
2 parents cbe1469 + edd27a8 commit 4843c13
Show file tree
Hide file tree
Showing 6 changed files with 455 additions and 369 deletions.
506 changes: 204 additions & 302 deletions parla/parray/coherence.py

Large diffs are not rendered by default.

147 changes: 101 additions & 46 deletions parla/parray/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
SlicesType = Union[slice, int, tuple]


# Maximum value representable by an unsigned 64-bit integer (2**64 - 1).
# NOTE(review): the original `(1 << 64 - 1)` evaluated to 2**63 because `-`
# binds tighter than `<<` in Python; parenthesized so the constant matches
# its name. IDs are reduced modulo this value for the C++ side — confirm the
# C++ end expects max-uint64 rather than natural 2**64 overflow.
UINT64_LIMIT = (1 << 64) - 1

class PArray:
"""Multi-dimensional array on a CPU or CUDA device.
Expand All @@ -40,7 +42,7 @@ class PArray:
_slices: List[SlicesType]
_coherence_cv: Dict[int, threading.Condition]

def __init__(self, array: ndarray, parent: "PArray" = None, slices=None) -> None:
def __init__(self, array: ndarray, parent: "PArray" = None, slices=None, name: str = "NA") -> None:
if parent is not None: # create a view (a subarray) of a PArray
# inherit parent's buffer and coherence states
# so this PArray will becomes a 'view' of its parents
Expand All @@ -52,6 +54,9 @@ def __init__(self, array: ndarray, parent: "PArray" = None, slices=None) -> None
# add current slices to the end
self._slices.append(slices)

# self._name = parent._name + "::subarray::" + str(slices)
self._name = parent._name + "::subarray::" + str(self._slices)

# inherit parent's condition variables
self._coherence_cv = parent._coherence_cv

Expand All @@ -61,7 +66,9 @@ def __init__(self, array: ndarray, parent: "PArray" = None, slices=None) -> None
# a unique ID for this subarray
# which is the combine of parent id and slice hash
self.parent_ID = parent.ID
self.ID = parent.ID * 31 + self._slices_hash # use a prime number to avoid collision
# use a prime number to avoid collision
# modulo over uint64 to make it compatible with C++ end
self.ID = (parent.ID * 31 + self._slices_hash) % UINT64_LIMIT

self.nbytes = parent.nbytes # the bytes used by the complete array
self.subarray_nbytes = array.nbytes # the bytes used by this subarray
Expand Down Expand Up @@ -92,11 +99,19 @@ def __init__(self, array: ndarray, parent: "PArray" = None, slices=None) -> None
self.nbytes = array.nbytes
self.subarray_nbytes = self.nbytes # no subarray

self._name = name

# Register the parray with the scheduler
task_runtime.get_scheduler_context().scheduler._available_resources.track_parray(self)

# Properties:

@property
def name(self):
    """Human-readable identifier of this PArray.

    Falls back to a generated "PArray::<ID>" label when no explicit
    name was stored on the instance.
    """
    stored = self._name
    if stored is not None:
        return stored
    return "PArray::" + str(self.ID)

@property
def array(self) -> ndarray:
"""
Expand Down Expand Up @@ -174,6 +189,7 @@ def update(self, array) -> None:
Note: should be called within the current task context
Note: data should be put in OUT/INOUT fields of spawn
Note: should not call this over a sliced array
"""
this_device = self._current_device_index

Expand All @@ -200,18 +216,68 @@ def update(self, array) -> None:

# update size
self.nbytes = array.nbytes
self.subarray_nbytes = self.nbytes

self._slices = []

# reset coherence
self._coherence = Coherence(this_device, num_gpu)
self._coherence.reset(this_device)

# update shape
self._array.shape = array.shape

cupy.cuda.stream.get_current_stream().synchronize()
if num_gpu > 0:
cupy.cuda.stream.get_current_stream().synchronize()

def print_overview(self) -> None:
    """
    Print a global overview of the current PArray for debugging.

    Emits the array's identity (ID, name, parent, slice, size, owner
    device) followed by the per-device coherence state. A dict-valued
    state indicates sliced copies on that device; each slice's indices
    and byte size are listed (slice order is unrelated to state order).
    """
    # Numeric coherence states -> human-readable labels.
    state_str_map = {0: "INVALID",
                     1: "SHARED",
                     2: "MODIFIED"}

    print(f"---Overview of PArray\n"
          f"ID: {self.ID}, "
          f"Name: {self._name}, "
          f"Parent_ID: {self.parent_ID if self.ID != self.parent_ID else None}, "
          f"Slice: {self._slices[0] if self.ID != self.parent_ID else None}, "
          f"Bytes: {self.subarray_nbytes}, "
          f"Owner: {'GPU ' + str(self._coherence.owner) if self._coherence.owner != CPU_INDEX else 'CPU'}")
    for device_id, state in self._coherence._local_states.items():
        if device_id == CPU_INDEX:
            device_name = "CPU"
        else:
            device_name = f"GPU {device_id}"
        print(f"At {device_name}: ", end="")

        if isinstance(state, dict):
            # A dict state means this device holds sliced copies.
            print(
                f"state: {[state_str_map[s] for s in list(state.values())]}, including sliced copy: # states of slices is unordered wrt the below slices")
            # enumerate instead of zip(..., range(len(...))); also avoids
            # shadowing the builtin `slice`.
            for slice_id, indices in enumerate(self._array._indices_map[device_id]):
                print(
                    f"\tslice {slice_id} - indices: {indices}, bytes: {self._array._buffer[device_id][slice_id].nbytes}")
        else:
            print(f"state: {state_str_map[state]}")
    print("---End of Overview")

# slicing/indexing

def __getitem__(self, slices: SlicesType) -> PArray | Any:
def __getitem__(self, slices: SlicesType | numpy.ndarray | cupy.ndarray) -> PArray | Any:
"""
Acceptable Slices: Slice, Int, tuple of (Slice, Int, List of Int), ndarray of numpy/cupy
Example:
A[0] # int
A[:] # slice
A[0,:,10] # tuple of int slice int
A[2:10:2, 0, [1, 3, 5]] # tuple of slice int list of Int
Note: `:` equals to slice(None, None, None)
Note: `None` or tuple of `None` is not acceptable (even if `numpy.ndarray` accept `None`)
# TODO: support `None`
"""
if isinstance(slices, numpy.ndarray) or isinstance(slices, cupy.ndarray):
slices = slices.tolist()

if self._slices: # resolve saved slices first
ret = self.array[slices]
else:
Expand All @@ -233,9 +299,9 @@ def __getitem__(self, slices: SlicesType) -> PArray | Any:
else:
return ret

def __setitem__(self, slices: SlicesType, value: PArray | ndarray | Any) -> None:
def __setitem__(self, slices: SlicesType | numpy.ndarray | cupy.ndarray, value: PArray | ndarray | Any) -> None:
"""
Acceptable Slices: Slice, Int, tuple of (Slice, Int, List of Int)
Acceptable Slices: Slice, Int, tuple of (Slice, Int, List of Int), ndarray of numpy/cupy
Example:
A[0] # int
A[:] # slice
Expand All @@ -244,11 +310,14 @@ def __setitem__(self, slices: SlicesType, value: PArray | ndarray | Any) -> None
Note: `:` equals to slice(None, None, None)
Note: `None` or tuple of `None` is not acceptable (even if `numpy.ndarray` accept `None`)
# TODO: support `None` and `ndarray` as slices
# TODO: support `None`
"""
if isinstance(value, PArray):
value = value.array

if isinstance(slices, numpy.ndarray) or isinstance(slices, cupy.ndarray):
slices = slices.tolist()

if self._slices: # resolve saved slices first
self.array.__setitem__(slices, value)
else:
Expand Down Expand Up @@ -282,9 +351,7 @@ def evict(self, device_id: int = None, keep_one_copy: bool = True) -> None:

with self._coherence_cv[device_id]:
operations = self._coherence.evict(device_id, keep_one_copy)
for op in operations:
self._process_operation(op)

self._process_operations(operations)

# Coherence update operations:

Expand All @@ -305,8 +372,10 @@ def _coherence_read(self, device_id: int = None, slices: SlicesType = None) -> N
device_id = self._current_device_index

# update protocol and get operation
operations = self._coherence.read(device_id, self._slices_hash) # locks involve
self._process_operations(operations, slices) # condition variable involve
with self._coherence._lock: # locks involve
operations = self._coherence.read(device_id, self._slices_hash)
self._process_operations(operations, slices)
print(f"read: {self._name}")

def _coherence_write(self, device_id: int = None, slices: SlicesType = None) -> None:
"""Tell the coherence protocol a write happened on a device.
Expand All @@ -325,8 +394,10 @@ def _coherence_write(self, device_id: int = None, slices: SlicesType = None) ->
device_id = self._current_device_index

# update protocol and get operation
operations = self._coherence.write(device_id, self._slices_hash) # locks involve
self._process_operations(operations, slices) # condition variable involve
with self._coherence._lock: # locks involve
operations = self._coherence.write(device_id, self._slices_hash)
self._process_operations(operations, slices)
print(f"write: {self._name}")

# Device management methods:

Expand All @@ -338,42 +409,27 @@ def _process_operations(self, operations: List[MemoryOperation], slices: SlicesT
for op in operations:
if op.inst == MemoryOperation.NOOP:
pass # do nothing
elif op.inst == MemoryOperation.CHECK_DATA:
if not self._coherence.data_is_ready(op.src): # if data is not ready, wait
with self._coherence_cv[op.src]:
while not self._coherence.data_is_ready(op.src):
self._coherence_cv[op.src].wait()
elif op.inst == MemoryOperation.LOAD:
with self._coherence_cv[op.dst]: # hold the CV when moving data

# if the flag is set, skip this checking
if not MemoryOperation.SKIP_SRC_CHECK in op.flag:
with self._coherence_cv[op.src]: # wait on src until it is ready
while not self._coherence.data_is_ready(op.src):
self._coherence_cv[op.src].wait()

if MemoryOperation.LOAD_SUBARRAY in op.flag:
self._array.set_slices_mapping(op.dst, slices) # build slices mapping first
if MemoryOperation.LOAD_SUBARRAY in op.flag:
# build slices mapping first
self._array.set_slices_mapping(op.dst, slices)

# check flag to see if dst is current device
dst_is_current_device = op.flag != MemoryOperation.SWITCH_DEVICE_FLAG
# check flag to see if dst is current device
dst_is_current_device = op.flag != MemoryOperation.SWITCH_DEVICE_FLAG

# copy data
self._array.copy_data_between_device(op.dst, op.src, dst_is_current_device)

# sync stream before setting it as ready, so the async call is ensured to be done
# copy data
if num_gpu > 0:
cupy.cuda.stream.get_current_stream().synchronize()

self._array.copy_data_between_device(
op.dst, op.src, dst_is_current_device)

# data is ready now
if MemoryOperation.LOAD_SUBARRAY in op.flag:
self._coherence.set_data_as_ready(op.dst, self._slices_hash) # mark it as done
else:
self._coherence.set_data_as_ready(op.dst)
self._coherence_cv[op.dst].notify_all() # let other threads know the data is ready
# sync stream before setting it as ready, so the async call is ensured to be done
if num_gpu > 0:
cupy.cuda.stream.get_current_stream().synchronize()
elif op.inst == MemoryOperation.EVICT:
self._array.clear(op.src) # decrement the reference counter, relying on GC to free the memory
if MemoryOperation.NO_MARK_AS_READY not in op.flag:
self._coherence.set_data_as_ready(op.src, None) # mark it as done
# decrement the reference counter, relying on GC to free the memory
self._array.clear(op.src)
elif op.inst == MemoryOperation.ERROR:
raise RuntimeError("PArray gets an error from coherence protocol")
else:
Expand Down Expand Up @@ -480,7 +536,6 @@ def __ge__(x, y):
else:
return x.array.__ge__(y)


# Truth value of an array (bool):

def __nonzero__(self):
Expand Down
33 changes: 18 additions & 15 deletions parla/parray/from_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
cupy = numpy # work around of cupy.ndarray


def array(object, dtype=None, copy=True, order='K', subok=False, ndmin=0, like=None, on_gpu=False):
def array(object, dtype=None, copy=True, order='K', subok=False, ndmin=0, like=None, on_gpu=False, name: str = "NA"):
"""
Create a Parla array on the specific device (CPU by default).
Expand Down Expand Up @@ -48,26 +48,29 @@ def array(object, dtype=None, copy=True, order='K', subok=False, ndmin=0, like=N
# if the input is already an ndarray
if isinstance(object, (numpy.ndarray, cupy.ndarray)):
if copy:
parray = PArray(object.copy())
parray = PArray(object.copy(), name=name)
else:
parray = PArray(object)
elif isinstance(object, PArray): # Already an PArray
parray = PArray(object, name=name)
elif isinstance(object, PArray): # Already a PArray
if copy:
parray = PArray(object.array.copy())
parray = PArray(object.array.copy(), name=name)
else:
parray = PArray(object.array)
else: # create one if it is not an ndarray
parray = PArray(object.array, name=name)
else: # create one if it is not an ndarray
if on_gpu:
parray = PArray(cupy.array(object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin))
parray = PArray(cupy.array(object, dtype=dtype,
copy=copy, order=order, subok=subok, ndmin=ndmin), name=name)
else:
if like:
parray = PArray(numpy.array(object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin, like=like))
parray = PArray(numpy.array(
object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin, like=like), name=name)
else:
parray = PArray(numpy.array(object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin))
parray = PArray(numpy.array(
object, dtype=dtype, copy=copy, order=order, subok=subok, ndmin=ndmin), name=name)
return parray


def asarray(a, dtype=None, order=None, like=None, on_gpu=False):
def asarray(a, dtype=None, order=None, like=None, on_gpu=False, name: str = "NA"):
"""Converts an object to Parla array.
This is equivalent to :class:``array(a, dtype, on_gpu, copy=False)``.
Expand Down Expand Up @@ -104,7 +107,7 @@ def asarray(a, dtype=None, order=None, like=None, on_gpu=False):
.. seealso:: :func:`numpy.asarray`
"""
return array(a, dtype=dtype, copy=False, order=order, like=like, on_gpu=on_gpu)
return array(a, dtype=dtype, copy=False, order=order, like=like, on_gpu=on_gpu, name=name)


def asarray_batch(*args):
Expand Down Expand Up @@ -140,12 +143,12 @@ def get_parray(object): # recursively process Sequence or Dictionary
return type(object)(accumulator)
else:
raise TypeError(f"Unsupported Type: {type(object)}")

parla_arrays = []
for arg in args:
parla_arrays.append(get_parray(arg))

if len(parla_arrays) == 1:
return parla_arrays[0]
return parla_arrays[0]
else:
return parla_arrays # recommend user to unpack this
15 changes: 9 additions & 6 deletions parla/parray/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import TYPE_CHECKING, Union, List, Dict, Tuple, Any

import numpy
import ctypes

#TODO: Fix this to be more stable and less of a hack.
try:
Expand Down Expand Up @@ -481,23 +482,25 @@ def get_slices_hash(self, global_slices: SlicesType) -> int:
prime = 31
if not isinstance(global_slices, tuple):
if isinstance(global_slices, list):
hash_value = hash_value * prime + hash(tuple(global_slices))
# Built-in hash() method might return a negative value.
# c_size_t is to ensure it is not negative
hash_value = hash_value * prime + ctypes.c_size_t(hash(tuple(global_slices))).value
elif isinstance(global_slices, slice):
hash_value = hash_value * prime + hash(global_slices.indices(self.shape[0]))
hash_value = hash_value * prime + ctypes.c_size_t(hash(global_slices.indices(self.shape[0]))).value
else:
hash_value = hash_value * prime + hash(global_slices)
hash_value = hash_value * prime + ctypes.c_size_t(hash(global_slices)).value
else:
if len(self.shape) < len(global_slices):
raise IndexError(f"index out of range, index:{global_slices}")

for d in range(len(global_slices)):
index = global_slices[d]
if isinstance(index, list):
hash_value = hash_value * prime + hash(tuple(index))
hash_value = hash_value * prime + ctypes.c_size_t(hash(tuple(index))).value
elif isinstance(index, slice):
hash_value = hash_value * prime + hash(index.indices(self.shape[d]))
hash_value = hash_value * prime + ctypes.c_size_t(hash(index.indices(self.shape[d]))).value
else:
hash_value = hash_value * prime + hash(index)
hash_value = hash_value * prime + ctypes.c_size_t(hash(index)).value

return hash_value

Expand Down
Loading

0 comments on commit 4843c13

Please sign in to comment.