[WIP] first draft of a controller-worker wrapper for heat #823
base: main
Conversation
GPU cluster tests are currently disabled on this Pull Request.
Force-pushed from 0d7edce to d9b8c92.
I used black versions 19 and 21 locally, always accepting current formatting (I used flags from pyproject.toml).
If you want to run the hooks without a commit, the recommended command is
Codecov Report
@@ Coverage Diff @@
## main #823 +/- ##
==========================================
- Coverage 94.62% 87.04% -7.59%
==========================================
Files 65 69 +4
Lines 9976 10443 +467
==========================================
- Hits 9440 9090 -350
- Misses 536 1353 +817
Force-pushed from aafd280 to 4a2727d.
Force-pushed from d81e612 to 8c34fd9.
Force-pushed from a99ba9a to 1b0a4bf.
Things look pretty good here. Good work!
There are some places where the documentation could be a bit better, but I think the best way to review this in the future would be to look at the distributor before the __init__.py. I left a few comments and questions in the code itself as well.
from array arguments. For this we assume that array arguments never occur
after non-array arguments. Each function task handles and passes array-typed
and non-array-typed arguments separately.
"""
It would be nice to have a simple code block to show how to run in CW mode. I can see it in the class below, but maybe point to it here for clarity.
def _setComm(c):
    # return impl.use_comm(impl.MPICommunication(c.Create(c.group.Excl([0]))))
dead code?
Probably. In some version of this the controller did not participate in computation so the workers had their own communicator. It turned out that the communication overhead triggered by the non-HeAT code on the controller quickly becomes too high.
For now we assume all ranks (controller and workers) are started through mpirun;
workers will never leave distributor.start(), and so never leave this function.
I think that this assumption is something that we should make generally. If we assume that the MPI ranks are spawned by mpirun/exec/slurm, then we can move between systems much more easily.
What "systems" are you referring to?
The systems that I refer to here are general MPI-capable clusters. If we build on the assumption that the ranks are started by the scheduler, it is probably a bit easier to work with in the future. I think that this comment was a bit superfluous on my end; if we were to set up some way to get the same general calls, we could just as easily use NCCL or gloo. My intention originally was to make the assumptions clear, but you have done that in this comment.
def reset():
    """
    Reset all internal state.
    Distributed objects created before calling reset cannot be used afterwards.
    """
    _runner.distributor.reset()


def sync():
    """
    Trigger all computation.
    """
    _runner.distributor.go(True)
Do these functions need to have global _runner called before, to make sure that they are using the right one?
Yes.
def __exit__(self, exc_type, exc_value, exc_traceback):
    if _runner.comm.rank == 0:
        fini()
I am seeing _runner a lot already; should this be a class parameter instead of a global? Would that break the syntax with Ray or the array API?
That's right. Passing it around would require a class variable anyway, unless we want to break the API. Semantically, _runner is a module-private singleton, needed at various places within the module (e.g. in more than one class). Making it a module-global object looks like the most Pythonic approach.
That makes sense to me.
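The module-private singleton pattern being discussed can be sketched in a few lines; this is purely illustrative (Runner, init and sync here are stand-ins, not the PR's actual classes or functions):

```python
# Hypothetical sketch of a module-private singleton: one shared object,
# reachable from every function in the module without being passed around.
class Runner:
    def __init__(self):
        self.synced = False

_runner = None  # module-global, private by the leading-underscore convention

def init():
    """Create the singleton on first use; later calls return the same one."""
    global _runner
    if _runner is None:
        _runner = Runner()
    return _runner

def sync():
    """Module-level functions reach the singleton directly."""
    _runner.synced = True

r = init()
sync()
print(init() is r, r.synced)  # True True
```

The trade-off mentioned above is visible here: every module-level function can touch the singleton, but callers must ensure init() ran first.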
""" | ||
A task queue, each rank holds one for queuing up local tasks. | ||
We currently dissallow submitting tasks by on-root ranks. | ||
Non-root ranks get their TaskQueue set in the recv-lop if init(). |
Suggested change:
- Non-root ranks get their TaskQueue set in the recv-lop if init().
+ Non-root ranks get their TaskQueue set in the recv-lop of init().
else:
    print("Entering worker loop", flush=True)
    done = False
    header = None
    while not done:
        # wait in bcast for work
        header = self._comm.bcast(header, 0)
        # then see what we need to do
        if header[0] == TASK:
            self._tQueue._taskQueue = header[1]
        elif header[0] == GET:
            # We do not support arrays yet, scalars do not need communication
            assert False
        elif header[0] == GO:
            self._tQueue.go()
            if header[1]:
                self._comm.Barrier()
        elif header[0] == GETPART:
            if self._comm.rank == header[1]:
                val = _RemoteTask.getVal(header[2])
                attr = getattr(val, header[3])
                self._comm.send(attr, dest=0, tag=GETPART)
        elif header[0] == PUBPART:
            val = _RemoteTask.getVal(header[1])
            attr = header[3](getattr(val, header[2]))
            self._comm.gather(attr, root=0)
        elif header[0] == RESET:
            _RemoteTask.reset()
            self._tQueue.clear()
            # Handle._reset()
        elif header[0] == END:
            done = True
            self._comm.Barrier()
            break
        else:
            raise Exception("Worker received unknown tag")
    MPI.Finalize()
    if doExit:
        sys.exit()
    return False
All of this can be dropped down a tab block; the other if block returns.
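As a purely illustrative, single-process sketch of the worker loop's tag-dispatch protocol (the worker function and the reduced tag set here are mine, not the PR's), the structure boils down to branching on a broadcast (tag, payload) header:

```python
# Hypothetical sketch: a plain list of headers stands in for the MPI
# broadcast; TASK/GO/RESET/END mirror some of the tags in the real loop.
TASK, GO, RESET, END = 0, 1, 2, 3

def worker(headers):
    queue, ran = [], []
    for tag, payload in headers:
        if tag == TASK:
            queue.append(payload)   # queue work for later
        elif tag == GO:
            ran.extend(queue)       # "execute" everything queued
            queue.clear()
        elif tag == RESET:
            queue.clear()           # drop queued work
        elif tag == END:
            break
        else:
            raise ValueError("Worker received unknown tag")
    return ran

print(worker([(TASK, "a"), (TASK, "b"), (GO, None), (END, None)]))
# ['a', 'b']
```

This also shows why workers never leave the loop until an END header arrives: every other tag just mutates local state and waits for the next broadcast.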
We keep a static dictionary mapping globally unique identifiers to dependent
global objects (like heat.DNDarrays). This keeps the objects alive and allows
communicating through simple integers.
Could this become a memory leak for larger applications?
I lately added some GC for arrays: whenever an array gets deleted on the controller, the corresponding objects will eventually get deleted from all these dicts, too. This does not work yet for non-array objects, and I am not sure if/how we could do this automatically. There is a reset available to allow explicitly emptying the dicts.
with ht.cw4h() as cw:
    if cw.controller():
        a = ht.arange(8, split=0)
        b = ht.ones(8, split=0)
        c = a @ b
        # assert hasattr(c, "__partitioned__")
        print(type(c))
        p = c.__partitioned__()
        print(c.shape, c, p)
        for k, v in p["partitions"].items():
            print(k, p["get"](v["data"]))
Regarding the possible memory leaks mentioned before: does cleanup of the cw object clean up the handle dictionary?
Hm, I don't think we can just reset the dict when cw goes out of scope, because we might want some of the arrays/values to persist (e.g. to maintain Python semantics). As mentioned above, at least the arrays should get correctly removed from the dict anyway.
Since we are targeting larger datasets in general, I think that removing the arrays is enough. If the dict gets so large that it's a problem, then the reset is always an option.
Description
A first draft of a simple wrapper allowing controller-worker style execution of NumPy code using HeAT.
Changes proposed:
Type of change
Due Diligence
Tested with 3 benchmarks: https://github.com/IntelPython/dpbench/tree/feature/dist/distributed
Just change the import statement in the heat versions to activate cw4heat.
Does this change modify the behaviour of other functions? If so, which?
No.
skip ci