Is there any way of having `.map_blocks` be even more opaque to dask? #8414
Comments
You have a shuffle-like thing going from Tower 3 to Tower 4. That's probably it. What's happening there? Also, you can write from within `map_blocks` and return 0. Does that fit your workload?
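For illustration, a minimal sketch of that write-inside-`map_blocks` pattern, under assumptions that aren't stated in the thread: a made-up dataset and store path, an integer `x` coordinate equal to its position so each block can locate its region, and a target store whose metadata is written up front with `compute=False`:

```python
import numpy as np
import xarray as xr

# Hypothetical dataset: one variable chunked along "x", with an integer "x"
# coordinate equal to its position (0..7) so each block can locate its region.
ds = xr.Dataset(
    {"a": (("x", "y"), np.arange(80.0).reshape(8, 10))},
    coords={"x": np.arange(8), "y": np.arange(10)},
).chunk(x=1)

store = "out.zarr"  # made-up target path

# Write metadata only; each block fills in its own region at compute time.
ds.to_zarr(store, compute=False, mode="w")


def write_block(block):
    # Side effect: write this block's data into its slot of the store. Only
    # variables spanning the region dimension may be written, so drop coords.
    start = int(block.x.values[0])
    block[["a"]].drop_vars(["x", "y"]).to_zarr(
        store, region={"x": slice(start, start + block.sizes["x"])}
    )
    # Return only a tiny marker so compute() gathers almost nothing.
    return block.isel(x=slice(0, 1), y=slice(0, 1))


# The template matches the per-block return ("x" keeps its size-1 chunks, "y"
# shrinks to length 1) and also skips xarray's dummy meta-run of the function,
# which would otherwise attempt a zero-sized region write.
template = ds.isel(y=slice(0, 1))
ds.map_blocks(write_block, template=template).compute()
```

The xpartition package mentioned in the next comment appears to build on a similar region-writing idea.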
@max-sixty I have run into similar issues, and wrote a package called xpartition to facilitate this kind of crude / pragmatic writing approach. I have been hesitant to advertise it widely as I have many of the same reservations as you, but despite that I find myself using it regularly for important things.
@max-sixty I was doing a similar thing last week, hit the same issue and after trying a thousand strategies I ended up working around it with a custom scheduler inside the map_blocks as you did 😢
FYI EDIT: Also, doing this is essentially what I would like to do with
Eeeek, sounds like this is a common issue then... Without wanting to blast (it goes without saying that I am very happy to be wrong, and I'm not trying to throw stones at the hard work your team has done)
That would work well, but it needs the indexes, and that's effectively prevented by #8409, because we get >1GB task graphs.
Yes, good observation. Speculatively — I think this is splitting the source read by data variable, then shuffling to do the calc, then writing each destination data variable. If that's correct, then another approach could be to try and keep the tasks for each block together, rather than organizing by each data variable. I'll try and get to checking whether that assertion is correct.
I'm very up for checking this out, and some of the team pointed me towards it (hence my DM last week @TomNicholas !). I am way over on trying new things vs getting work done, and I would need to set up an env for running cubed. I'd also like to try beam!
I'll defer to @fjetter, who has been playing a lot with Dask graph ordering recently and having decent success. He may have a new version for you to try. If you're able to provide an MCVE that recreates your currently unhappy graph, I'd encourage you to do so. There's pretty good activity in this space right now.
IDK, you have one dask array per variable anyway. But maybe the viz is misleading. This kind of shuffle-type workload is where distributed falls apart. Also, why are you splitting by variable? In general, I'm also confused as to how a shuffle-type workload gets expressed as a blockwise workload without a major rechunking/rearrangement (i.e. a shuffle) in the middle.
Could try with the current state of #8412
It wasn't me splitting by variable! 😄 dask seems to be doing it. I'll try and get together an MCVE...
OK, here's an MCVE. It requires more specificity than I expected — I think my read that things were working last week was correct actually, because the calcs were slightly different. It relies on the nested calcs, such that (though that was the beauty of

```python
import numpy as np
import xarray as xr

ds = xr.Dataset(
    data_vars=dict(
        a=(("x", "y"), np.arange(80).reshape(8, 10)),
        b=(("y"), np.arange(10)),
    )
).chunk(x=1)


def f(ds):
    d = ds.a * ds.b
    return (d * ds.b).sum("y")


result = ds.map_blocks(f).compute()
```

Here's the graph — you can see the lack of basic parallelism that @dcherian pointed out above, though obviously less severe than the big example above:

[task graph image]
Excellent @max-sixty! @tomwhite we should add this to the distributed arrays repo examples, and try it with cubed
This is basically the same as pangeo-data/distributed-array-examples#2, which @fjetter tried to fix with dask/dask#10535. EDIT: And yeah, the viz was misleading. It's not a shuffle, it's blockwise ops on different arrays, but only purely blockwise if you're unchunked on
@dcherian right yeah, I see that now. We should still try it out!
Nice! Yes I've been following that, it looks quite promising. Unfortunately I have been running on
Yes, indeed. That already includes the fix. However, I'm working on a follow-up that should work even better here: dask/dask#10557. At least for your MCVE, this new PR works as intended. The following graphs show the prioritization of tasks, i.e. the order in which we run tasks.
Note how all tasks after the
The tasks are only loaded when necessary. The PR in question has a couple of performance issues to figure out, but it would help a lot if you could try it out on your real problem and let me know if this is indeed helping you. FWIW, the performance problems of this PR would show up as a longer delay before your computation kicks off. However, judging by the graph I'm seeing, you may not even be impacted by this. I really appreciate that you put in the effort to create the minimal example. We can put this into our benchmark suite and add a unit test to
That's awesome @fjetter ! (FYI it's difficult for me to try out code that's not on PyPI for this problem, but I will do when it's there)
One thought a couple of days later — having dask execute this graph reasonably would be ideal. But the original issue was whether we could be more opaque to dask, which is a plausible way of reducing the risk of poor execution. IIUC, I don't know how feasible it is to do that. I previously used
Isn't this pretty much the model of xarray-beam?
This would be a bigger change than I envisioned, because it would need to handle cases where some variables don't share dimensions with the chunked dimension. (I'm actually not sure how xarray-beam handles this — maybe it requires all variables to be chunked on the "distributed" dimension?) I'm not sure there's much to do here apart from waiting for upstream / finding alternatives in the meantime. So I'm planning to close unless anyone has ideas for this.
I still don't understand what's stopping us rewriting
I apologize that this is almost off-topic, but I just merged dask/dask#10660, which should fix the performance problems that triggered this conversation. From what I can tell this fixes the issue, and I believe the new approach is more robust to your problems. If you encounter more issues like this, please let me know. I believe that with the new implementation we can be more responsive to those issues.
You have to pass around more than arrays; random dicts as
Honestly, I'd much rather work on
YEAH! Thanks @fjetter. IMO we can close.
This looks like a big improvement @fjetter ! Thank you for all your hard work here; the user experience should be greatly improved by these efforts. I'm still confused by why we can't just dispatch to
Is your feature request related to a problem?
Currently I have a workload which does something a bit like:
(the actual calc is a bit more complicated! And while I don't have an MCVE of the full calc, I pasted a task graph below)
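The snippet itself isn't reproduced above, so the following is only an illustration of the general shape described, with placeholder paths and a stand-in calculation:

```python
import xarray as xr

# Illustration only: the paths and the calculation here are placeholders for
# "read a chunked dataset, compute something over it, write the result out".
ds = xr.open_zarr("source.zarr")  # large, lazily-loaded, chunked source


def calc(dataset):
    # stand-in for the real, more complicated calculation
    return dataset * 2


calc(ds).to_zarr("dest.zarr", mode="w")  # ideally this streams chunk by chunk
```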
Dask — while very impressive in many ways — handles this extremely badly, because it attempts to load the whole of `ds` into memory before writing out any chunks. There are lots of issues on this in the dask repo; it seems like an intractable problem for dask.
Describe the solution you'd like
I was hoping to make the internals of this task opaque to dask, so it became a much dumber task runner — just map over the blocks, running the function and writing the result, block by block. I thought I had some success with `.map_blocks` last week — the internals of the calc are now opaque at least. But the dask cluster is falling over again, I think because the write is seen as a separate task. Is there any way to make the write more opaque too?
Describe alternatives you've considered
I've built a homegrown thing which is really hacky, which does this on a custom scheduler — it just runs the functions and writes with `region`. I'd much prefer to use & contribute to the broader ecosystem...
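For concreteness, a rough sketch of what such a homegrown `region`-writing loop might look like; this is not the author's implementation, and the dataset, destination path, slab size, and calculation are all made up:

```python
import numpy as np
import xarray as xr

# Made-up source dataset, chunked along "x"; stands in for the real data.
ds = xr.Dataset(
    {"a": (("x", "y"), np.arange(80.0).reshape(8, 10))},
    coords={"x": np.arange(8), "y": np.arange(10)},
).chunk(x=2)

dest = "dest.zarr"  # made-up target path
dim, step = "x", 2  # the chunked dimension and the slab size written per step


def calc(d):
    return d * 2  # stand-in for the real calculation


# Write metadata (and the eagerly-loaded coordinates) once, computing nothing.
calc(ds).to_zarr(dest, mode="w", compute=False)

for start in range(0, ds.sizes[dim], step):
    stop = min(start + step, ds.sizes[dim])
    # Compute one slab in memory, then write only that region of the store.
    slab = calc(ds.isel({dim: slice(start, stop)})).compute()
    # Keep only data variables that span the region dimension, and drop the
    # coordinates, which were already written in the metadata-only pass.
    slab = slab[[name for name in slab.data_vars if dim in slab[name].dims]]
    slab.drop_vars(list(slab.coords)).to_zarr(dest, region={dim: slice(start, stop)})
```

Each slab here aligns with the store's chunk boundaries, so the serial loop could in principle be fanned out across threads or workers writing distinct regions.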
Additional context
(It's also possible I'm making some basic error — and I do remember it working much better last week — so please feel free to direct me / ask me for more examples, if this doesn't ring true)