From d0d99994b759d3db410d1da7e88dd393492e5597 Mon Sep 17 00:00:00 2001
From: Hochan Lee
Date: Thu, 28 Apr 2022 00:14:04 -0500
Subject: [PATCH] Use per-device locks to update resource status

---
 parla/task_runtime.py | 47 +++++++++++++++++++++----------------------
 1 file changed, 23 insertions(+), 24 deletions(-)

diff --git a/parla/task_runtime.py b/parla/task_runtime.py
index 5b8020f7..826bc169 100644
--- a/parla/task_runtime.py
+++ b/parla/task_runtime.py
@@ -1074,14 +1074,12 @@ class ResourcePool:
     # Resource pools track device resources. Environments are a separate issue and are not tracked here. Instead,
     # tasks will consume resources based on their devices even though those devices are bundled into an environment.
-
     def __init__(self):
         self._monitor = threading.Condition(threading.Lock())
-
         # Devices are stored in a dict keyed by the device.
         # Each entry stores a dict with cores, memory, etc. info based on the architecture
         self._devices = self._initial_resources()
-
+        self._device_locks = [threading.Lock() for d in range(len(self._devices))]
         # Parla tracks managed PArrays' locations
         # Index into dict with id(array), then with device. True means the array is present there
         # We use the unique id of the array as the key because PArray is an unhashable class
 
@@ -1089,7 +1087,15 @@ def __init__(self):
 
     @staticmethod
     def _initial_resources():
-        return {dev: {name: amt for name, amt in dev.resources.items()} for dev in get_all_devices()}
+        res_dict = {}
+        i = 0
+        for dev in get_all_devices():
+            res_dict[dev] = {}
+            res_dict[dev]["dev_id"] = i
+            i += 1
+            for name, amt in dev.resources.items():
+                res_dict[dev][name] = amt
+        return res_dict
 
     ### RESOURCE ALLOCATION CALLS ###
     # These may be over-engineered, by I (Sean) haven't touched them.
@@ -1126,8 +1132,7 @@ def check_resources_availability(self, d: Device, resources: ResourceDict):
         :param resources: The resources to deallocate.
         """
         logger.debug("[ResourcePool] Acquiring monitor in check_resources_availability()")
-        with self._monitor:
-
+        with self._device_locks[self._devices[d]["dev_id"]]:
             is_available = True
             for name, amount in resources.items():
                 dres = self._devices[d]
@@ -1144,7 +1149,7 @@ def check_resources_availability(self, d: Device, resources: ResourceDict):
 
     def _atomically_update_resources(self, d: Device, resources: ResourceDict, multiplier, block: bool):
         logger.debug("[ResourcePool] Acquiring monitor in atomically_update_resources()")
-        with self._monitor:
+        with self._device_locks[self._devices[d]["dev_id"]]:
             to_release = []
             success = True
             for name, v in resources.items():
@@ -1174,23 +1179,17 @@ def _update_resource(self, dev: Device, res: str, amount: float, block: bool):
         if dev.architecture.id == 'gpu' and res == 'vcus':
             return True
         try:
-            while True:  # contains return
-                logger.debug("Trying to allocate %d %s on %r", amount, res, dev)
-                dres = self._devices[dev]
-                if -amount <= dres[res]:
-                    dres[res] += amount
-                    if amount > 0:
-                        self._monitor.notify_all()
-                    assert dres[res] <= dev.resources[res], "{}.{} was over deallocated".format(dev, res)
-                    assert dres[res] >= 0, "{}.{} was over allocated".format(dev, res)
-                    return True
-                else:
-                    logger.info("If you're seeing this message, you probably have an issue.")
-                    logger.info("The current mapper should never try to allocate resources that aren't actually available")
-                    if block:
-                        self._monitor.wait()
-                    else:
-                        return False
+            logger.debug("Trying to allocate %d %s on %r", amount, res, dev)
+            dres = self._devices[dev]
+            if -amount <= dres[res]:
+                dres[res] += amount
+                assert dres[res] <= dev.resources[res], "{}.{} was over deallocated".format(dev, res)
+                assert dres[res] >= 0, "{}.{} was over allocated".format(dev, res)
+                return True
+            else:
+                logger.info("If you're seeing this message, you probably have an issue.")
+                logger.info("The current mapper should never try to allocate resources that aren't actually available")
+                return False
         except KeyError:
             raise ValueError("Resource {}.{} does not exist".format(dev, res))
 