
[WIP] first draft of a controller-worker wrapper for heat #823

Open
wants to merge 24 commits into main from Enhancement/cw4heat

Conversation

fschlimb
Contributor

Description

A first draft of a simple wrapper allowing controller-worker style execution of NumPy code using HeAT.

Changes proposed:

  • The current implementation is limited and still requires mpirun, but only rank 0 executes the program; workers sit in a loop and wait for work
  • Work is ongoing on abstractions that will allow attaching to existing processes and collaborating with Ray and the like
  • Right now it is all or nothing; later we might want to allow entering/leaving a controller-worker region

Type of change

  • New feature (non-breaking change which adds functionality)

Due Diligence

Tested with 3 benchmarks: https://github.com/IntelPython/dpbench/tree/feature/dist/distributed
Just change the import statement in the heat versions to activate cw4heat.
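For illustration, a minimal sketch of that import swap; the module path heat.cw4heat comes from this PR, while the benchmark script name and launch command are just examples:

# import heat as ht              # original heat version of the benchmark
import heat.cw4heat as ht        # switch to the controller-worker wrapper instead

# the rest of the benchmark stays unchanged and is still launched through MPI, e.g.
#   mpirun -n 4 python benchmark.py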

  • All split configurations tested
  • Multiple dtypes tested in relevant functions
  • Documentation updated (if needed)
  • Updated changelog.md under the title "Pending Additions"

Does this change modify the behaviour of other functions? If so, which?

no

skip ci

@mtar
Collaborator

mtar commented Jun 25, 2021

GPU cluster tests are currently disabled on this Pull Request.

@fschlimb fschlimb changed the title from "first draft of a controller-worker wrapper for heat" to "[WIP] first draft of a controller-worker wrapper for heat" on Jun 25, 2021
@fschlimb fschlimb force-pushed the Enhancement/cw4heat branch from 0d7edce to d9b8c92 on July 5, 2021 08:39
@fschlimb
Contributor Author

fschlimb commented Jul 5, 2021

I used black versions 19 and 21 locally, always accepting current formatting (I used flags from pyproject.toml).
Any idea why it's behaving differently in CI?

@coquelin77
Member

coquelin77 commented Jul 5, 2021

The recommended way to get black running in a way that is unified with the codebase is to use pre-commit (pip install pre-commit or conda install -c conda-forge pre-commit) and then run pre-commit install. This adds the commit hooks to git, and on every commit everything should run automatically.

If you want to run the hooks without a commit, the command is pre-commit run --all-files to run on all files; individual files can also be named.

@codecov

codecov bot commented Jul 5, 2021

Codecov Report

Merging #823 (68dcc20) into main (4a1fc1a) will decrease coverage by 7.58%.
The diff coverage is 0.42%.

@@            Coverage Diff             @@
##             main     #823      +/-   ##
==========================================
- Coverage   94.62%   87.04%   -7.59%     
==========================================
  Files          65       69       +4     
  Lines        9976    10443     +467     
==========================================
- Hits         9440     9090     -350     
- Misses        536     1353     +817     
Flag Coverage Δ
gpu ?
unit 87.04% <0.42%> (?)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
heat/cw4heat/__init__.py 0.00% <0.00%> (ø)
heat/cw4heat/arrayapi.py 0.00% <0.00%> (ø)
heat/cw4heat/distributor.py 0.00% <0.00%> (ø)
heat/cw4heat/ray_runner.py 0.00% <0.00%> (ø)
heat/core/dndarray.py 94.11% <8.69%> (-2.80%) ⬇️
heat/optim/dp_optimizer.py 24.19% <0.00%> (-67.06%) ⬇️
heat/core/devices.py 86.66% <0.00%> (-11.12%) ⬇️
heat/core/communication.py 89.94% <0.00%> (-6.08%) ⬇️
heat/core/signal.py 95.08% <0.00%> (-4.92%) ⬇️
heat/core/stride_tricks.py 81.53% <0.00%> (-4.62%) ⬇️
... and 15 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4a1fc1a...68dcc20.

@fschlimb fschlimb mentioned this pull request Jul 13, 2021
@fschlimb fschlimb force-pushed the Enhancement/cw4heat branch from aafd280 to 4a2727d on July 22, 2021 10:53
@fschlimb fschlimb force-pushed the Enhancement/cw4heat branch 3 times, most recently from d81e612 to 8c34fd9 on August 18, 2021 11:44
@fschlimb fschlimb force-pushed the Enhancement/cw4heat branch from a99ba9a to 1b0a4bf on September 1, 2021 09:36
Member

@coquelin77 coquelin77 left a comment


Things look pretty good here. Good work!

There are some places where the documentation could be a bit better, but I think the best way to review this in the future is to look at the distributor before the __init__.py. I left a few comments and questions in the code itself as well.

from array arguments. For this we assume that array arguments never occur
after non-array arguments. Each function.task handles and passes array-typed
and non-array-typed arguments separately.
"""
Member


It would be nice to have a simple code block showing how to run in CW mode. I can see it in the class below, but maybe point to it here for clarity.
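For reference, the example script added in this PR (quoted in full further below) enters CW mode roughly like this; the import line is an assumption based on the PR description:

import heat.cw4heat as ht

with ht.cw4h() as cw:
    if cw.controller():        # only the controller (rank 0) runs the application code
        a = ht.arange(8, split=0)
        b = ht.ones(8, split=0)
        c = a @ b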



def _setComm(c):
    # return impl.use_comm(impl.MPICommunication(c.Create(c.group.Excl([0]))))
Member


dead code?

Contributor Author


Probably. In some version of this, the controller did not participate in the computation, so the workers had their own communicator. It turned out that the communication overhead triggered by the non-HeAT code on the controller quickly becomes too high.
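For context, a minimal mpi4py sketch of what that commented-out line was doing, i.e. building a worker-only communicator that excludes the controller; the variable names here are illustrative, not the PR's:

from mpi4py import MPI

world = MPI.COMM_WORLD
workers_group = world.Get_group().Excl([0])  # drop rank 0 (the controller) from the group
workers_comm = world.Create(workers_group)   # communicator spanning only the workers
# on rank 0 this yields MPI.COMM_NULL, which must not be used for communication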

Comment on lines +106 to +107
For now we assume all ranks (controller and workers) are started through mpirun;
workers will never leave distributor.start() and hence never return from this function.
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this assumption is something we should make generally. If we assume that the MPI ranks are spawned by mpirun/mpiexec/slurm, then we can move between systems much more easily.

Contributor Author


What "systems" are you referring to?

Member


The systems I refer to here are general MPI-capable clusters. If we build on the assumption that the ranks are started by the scheduler, it is probably a bit easier to work with in the future. I think this comment was a bit superfluous on my end; if we were to set up some way to get the same general calls, we could just as easily use NCCL or Gloo. My intention originally was to make the assumptions clear, and you have done that in this comment.

Comment on lines +164 to +176
def reset():
"""
Reset all internal state.
Distributed objects created before calling reset cannot be used afterwards.
"""
_runner.distributor.reset()


def sync():
"""
Trigger all computation.
"""
_runner.distributor.go(True)
Member


do these functions need to have global _runner called before to make sure that they are using the right one?

Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
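A hedged usage sketch of the reset() and sync() functions quoted above, assuming the module is imported as in the PR's example; that operations are queued rather than run eagerly is an assumption based on the TaskQueue described further below:

import heat.cw4heat as ht

a = ht.ones(8, split=0)   # operations are recorded as tasks rather than run eagerly
ht.sync()                 # trigger all queued computation on the controller and the workers
ht.reset()                # drop all internal state; arrays created before this call can no longer be used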

Comment on lines +196 to +198
def __exit__(self, exc_type, exc_value, exc_traceback):
if _runner.comm.rank == 0:
fini()
Member


I am seeing _runner a lot already; should this be a class parameter instead of a global? Would that break the syntax with Ray or the arrayapi?

Contributor Author


That's right. Passing it around would require a class variable anyway, unless we want to break the API. Semantically, _runner is a module-private singleton, needed at various places within the module (e.g. in more than one class). Making it a module-global object looks like the most Pythonic approach.

Member


That makes sense to me.
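A minimal sketch of the module-private singleton pattern described above; _Runner, init, and fini are illustrative stand-ins rather than the PR's exact definitions:

class _Runner:
    """Illustrative stand-in for the module's runner object."""
    def __init__(self, comm):
        self.comm = comm

_runner = None  # module-private singleton, shared by all classes/functions in the module

def init(comm):
    global _runner
    _runner = _Runner(comm)  # created exactly once during initialization

def fini():
    global _runner
    _runner = None           # explicit teardown

class cw4h:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if _runner.comm.rank == 0:  # read the module-global singleton directly
            fini()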

"""
A task queue, each rank holds one for queuing up local tasks.
We currently disallow submitting tasks by non-root ranks.
Non-root ranks get their TaskQueue set in the recv-lop if init().
Member


Suggested change:
- Non-root ranks get their TaskQueue set in the recv-lop if init().
+ Non-root ranks get their TaskQueue set in the recv-lop of init().

Comment on lines +145 to +184
else:
    print("Entering worker loop", flush=True)
    done = False
    header = None
    while not done:
        # wait in bcast for work
        header = self._comm.bcast(header, 0)
        # then see what we need to do
        if header[0] == TASK:
            self._tQueue._taskQueue = header[1]
        elif header[0] == GET:
            # We do not support arrays yet, scalars do not need communication
            assert False
        elif header[0] == GO:
            self._tQueue.go()
            if header[1]:
                self._comm.Barrier()
        elif header[0] == GETPART:
            if self._comm.rank == header[1]:
                val = _RemoteTask.getVal(header[2])
                attr = getattr(val, header[3])
                self._comm.send(attr, dest=0, tag=GETPART)
        elif header[0] == PUBPART:
            val = _RemoteTask.getVal(header[1])
            attr = header[3](getattr(val, header[2]))
            self._comm.gather(attr, root=0)
        elif header[0] == RESET:
            _RemoteTask.reset()
            self._tQueue.clear()
            # Handle._reset()
        elif header[0] == END:
            done = True
            self._comm.Barrier()
            break
        else:
            raise Exception("Worker received unknown tag")
    MPI.Finalize()
    if doExit:
        sys.exit()
return False
Member


All of this can be dropped down an indentation level; the other if branch returns.
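For orientation, a hypothetical sketch of the controller-side counterpart to this worker loop, inferred only from the tags handled above; the real code in distributor.py may look quite different:

from mpi4py import MPI

TASK, GO, END = 1, 3, 7         # illustrative tag values; the real ones live in distributor.py
comm = MPI.COMM_WORLD

if comm.rank == 0:
    comm.bcast((TASK, []), 0)   # ship the (here: empty) task queue to every worker
    comm.bcast((GO, True), 0)   # let the workers run their queues; True requests a Barrier
    comm.Barrier()
    comm.bcast((END, None), 0)  # ask the workers to leave their loop and finalize
    comm.Barrier()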

Comment on lines +348 to +350
We keep a static dictionary mapping globally unique identifiers to dependent
global objects (like heat.DNDarrays). This keeps the objects alive and allows
communicating through simple integers.
Member


could this become a memory leak for larger applications?

Contributor Author


I recently added some GC for arrays: whenever an array gets deleted on the controller, the corresponding objects will eventually get deleted from all these dicts, too. This does not work yet for non-array objects, and I am not sure if/how we could do this automatically. There is a reset available to allow explicitly emptying the dicts.
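A minimal sketch of the kind of id-to-object registry described here, including the array-triggered cleanup mentioned above; all names are illustrative, not the PR's:

_objects = {}   # globally unique integer id -> live object (e.g. a heat.DNDarray)
_next_id = 0

def register(obj):
    """Keep obj alive and return an integer handle that can be communicated to other ranks."""
    global _next_id
    _next_id += 1
    _objects[_next_id] = obj
    return _next_id

def release(handle):
    """Drop one entry, e.g. triggered when the controller-side array proxy is garbage-collected."""
    _objects.pop(handle, None)

def reset():
    """Explicitly empty the registry, e.g. for non-array objects without automatic cleanup."""
    _objects.clear()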

Comment on lines +8 to +18
with ht.cw4h() as cw:
    if cw.controller():
        a = ht.arange(8, split=0)
        b = ht.ones(8, split=0)
        c = a @ b
        # assert hasattr(c, "__partitioned__")
        print(type(c))
        p = c.__partitioned__()
        print(c.shape, c, p)
        for k, v in p["partitions"].items():
            print(k, p["get"](v["data"]))
Member


Regarding the possible memory leaks mentioned before, does cleanup of the cw object clean up the handle dictionary?

Contributor Author

@fschlimb fschlimb Oct 26, 2021


Hm, I don't think we can just reset the dict when cw goes out of scope, because we might want some of the arrays/values to persist (e.g. to maintain Python semantics). As mentioned above, at least the arrays should get correctly removed from the dict anyway.

Member


Since we are targeting larger datasets in general, I think that removing the arrays is enough. If the dict gets so large that it's a problem, then the reset is always an option.

