Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use per-device locks to update resource status #123

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions parla/task_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,22 +1074,28 @@ class ResourcePool:

# Resource pools track device resources. Environments are a separate issue and are not tracked here. Instead,
# tasks will consume resources based on their devices even though those devices are bundled into an environment.

def __init__(self):
    """Create the monitor, the per-device resource table, and one lock per device."""
    self._monitor = threading.Condition(threading.Lock())

    # Devices are stored in a dict keyed by the device.
    # Each entry stores a dict with cores, memory, etc. info based on the architecture
    self._devices = self._initial_resources()

    # One lock per device entry. Other methods pick a device's lock with
    # self._device_locks[self._devices[d]["dev_id"]], so the list must hold
    # exactly as many locks as there are entries in self._devices.
    self._device_locks = [threading.Lock() for _ in self._devices]

    # Parla tracks managed PArrays' locations
    # Index into dict with id(array), then with device. True means the array is present there
    # We use the unique id of the array as the key because PArray is an unhashable class
    self._managed_parrays = {}

@staticmethod
def _initial_resources():
return {dev: {name: amt for name, amt in dev.resources.items()} for dev in get_all_devices()}
res_dict = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to give each device an index, since an individual lock exists for each device. But I have no idea how to write it in a more Pythonic way :-)

i = 0
for dev in get_all_devices():
res_dict[dev] = {}
res_dict[dev]["dev_id"] = i
i += 1
for name, amt in dev.resources.items():
res_dict[dev][name] = amt
return res_dict

### RESOURCE ALLOCATION CALLS ###
# These may be over-engineered, but I (Sean) haven't touched them.
Expand Down Expand Up @@ -1126,8 +1132,7 @@ def check_resources_availability(self, d: Device, resources: ResourceDict):
:param resources: The resources to deallocate.
"""
logger.debug("[ResourcePool] Acquiring monitor in check_resources_availability()")
with self._monitor:

with self._device_locks[self._devices[d]["dev_id"]]:
is_available = True
for name, amount in resources.items():
dres = self._devices[d]
Expand All @@ -1144,7 +1149,7 @@ def check_resources_availability(self, d: Device, resources: ResourceDict):

def _atomically_update_resources(self, d: Device, resources: ResourceDict, multiplier, block: bool):
logger.debug("[ResourcePool] Acquiring monitor in atomically_update_resources()")
with self._monitor:
with self._device_locks[self._devices[d]["dev_id"]]:
to_release = []
success = True
for name, v in resources.items():
Expand Down Expand Up @@ -1174,23 +1179,17 @@ def _update_resource(self, dev: Device, res: str, amount: float, block: bool):
if dev.architecture.id == 'gpu' and res == 'vcus':
return True
try:
while True: # contains return
logger.debug("Trying to allocate %d %s on %r", amount, res, dev)
dres = self._devices[dev]
if -amount <= dres[res]:
dres[res] += amount
if amount > 0:
self._monitor.notify_all()
assert dres[res] <= dev.resources[res], "{}.{} was over deallocated".format(dev, res)
assert dres[res] >= 0, "{}.{} was over allocated".format(dev, res)
return True
else:
logger.info("If you're seeing this message, you probably have an issue.")
logger.info("The current mapper should never try to allocate resources that aren't actually available")
if block:
self._monitor.wait()
else:
return False
logger.debug("Trying to allocate %d %s on %r", amount, res, dev)
dres = self._devices[dev]
if -amount <= dres[res]:
dres[res] += amount
assert dres[res] <= dev.resources[res], "{}.{} was over deallocated".format(dev, res)
assert dres[res] >= 0, "{}.{} was over allocated".format(dev, res)
return True
else:
logger.info("If you're seeing this message, you probably have an issue.")
logger.info("The current mapper should never try to allocate resources that aren't actually available")
return False
except KeyError:
raise ValueError("Resource {}.{} does not exist".format(dev, res))

Expand Down