Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback functionality for dask-delayed dataset saving #168

Merged
merged 44 commits into from
Aug 31, 2023

Conversation

gerritholl
Copy link
Member

@gerritholl gerritholl commented Nov 8, 2022

Add functionality for a callback function that is called as soon as a file has been written. This could be used to ship out written files as soon as they are done, rather than waiting for all files to be complete.

Added a test for functionality of a callback to be executed upon every
completed delayed writing.
Added functionality for a callback function that is called as soon as
the computation is completed.
Fix documentation for call_on_done and add example to the example
playlist.
@gerritholl
Copy link
Member Author

With thanks to @martindurant for the StackOverflow answer at https://stackoverflow.com/a/74354842/974555 pointing out how to do this :)

@codecov
Copy link

codecov bot commented Nov 8, 2022

Codecov Report

Merging #168 (6bd0218) into main (ae95cb8) will increase coverage by 0.16%.
The diff coverage is 99.29%.

@@            Coverage Diff             @@
##             main     #168      +/-   ##
==========================================
+ Coverage   95.56%   95.73%   +0.16%     
==========================================
  Files          13       13              
  Lines        2821     2954     +133     
==========================================
+ Hits         2696     2828     +132     
- Misses        125      126       +1     
Flag Coverage Δ
unittests 95.73% <99.29%> (+0.16%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Changed Coverage Δ
trollflow2/plugins/__init__.py 94.18% <98.36%> (+0.40%) ⬆️
trollflow2/tests/test_trollflow2.py 98.51% <100.00%> (+0.10%) ⬆️

@gerritholl gerritholl marked this pull request as draft November 9, 2022 07:31
@gerritholl
Copy link
Member Author

Putting on draft as I'm trying to figure out a good way for the callback function to get the information it may need.

For the call-on-done functionality, allow multiple callbacks to be
called each in turn.
When having mulitp.le call on dones, don't call save_dataset multiple
times
Add stronger test to confirm writing was successful.  Turns out it
wasn't.
@gerritholl
Copy link
Member Author

Works wonderfully in the unit tests, but in the real world, the files to be written are empty when the callbacks are called, at least for GeoTIFF.

And the trollflow2 launcher complains about missing files.

@gerritholl
Copy link
Member Author

It works for some writers but not others. Example:

import os.path
import xarray as xr
import dask.array as da
from dask import delayed
from pyresample import create_area_def
from satpy.tests.utils import make_fake_scene

fake_area = create_area_def("sargasso", 4326, resolution=1, width=5, height=5, center=(0, 0))
fake_scene = make_fake_scene(
    {"dragon_top_height": (dat := xr.DataArray(
        dims=("y", "x"),
        data=da.arange(25).reshape((5, 5)))),
     "penguin_bottom_height": dat,
     "kraken_depth": dat},
    daskify=True,
    area=fake_area)

def on_done(result, fn):
    print("Size for", fn, "is", os.path.getsize(fn))
    return result

objs = []
for i in range(5):
    fn = f"/tmp/test{i:d}.tif"
    obj = fake_scene.save_dataset("dragon_top_height", filename=fn,
        compute=False)
    objs.append(delayed(on_done)(obj, fn))
print(objs)
da.compute(objs)

For .tif, this results in:

Size for /tmp/test0.tif is 0
Size for /tmp/test4.tif is 0
Size for /tmp/test3.tif is 0
Size for /tmp/test1.tif is 0
Size for /tmp/test2.tif is 0

For .png:

Size for /tmp/test3.png is 100
Size for /tmp/test2.png is 100
Size for /tmp/test0.png is 100
Size for /tmp/test4.png is 100
Size for /tmp/test1.png is 100

For .nc:

Size for /tmp/test4.nc is 22742
Size for /tmp/test1.nc is 22742
Size for /tmp/test2.nc is 22742
Size for /tmp/test0.nc is 22742
Size for /tmp/test3.nc is 22742

@gerritholl
Copy link
Member Author

The difference is that for the NetCDF and simple_image writers, the result of save_dataset is a Delayed object. For geotiff, it's a list of a dask array with a RIODataset object.

@gerritholl
Copy link
Member Author

gerritholl commented Nov 10, 2022

I also tried to replace

obj = fake_scene.save_dataset("dragon_top_height", filename=fn, compute=False)
obs.append(delayed(on_done)(obj, fn))

with

obj = fake_scene.save_dataset("dragon_top_height", filename=fn, compute=False)
(src, targ) = obj
objs.append(delayed(on_done)(da.store(src, targ, compute=False), fn))

but the problem remains, when on_done is called the files still have size 0.

@gerritholl
Copy link
Member Author

When I add

targ.close()

it results in

Size for /tmp/test0.tif is 551
Size for /tmp/test2.tif is 551
Size for /tmp/test1.tif is 551
Size for /tmp/test4.tif is 551
Size for /tmp/test3.tif is 551

which corresponds to only the header

@djhoese
Copy link
Member

djhoese commented Nov 10, 2022

When you run this code that you tried:

obj = fake_scene.save_dataset("dragon_top_height", filename=fn, compute=False)
(src, targ) = obj
objs.append(delayed(on_done)(da.store(src, targ, compute=False), fn))

Can you add a print to on_done to print out what the argument is? Is it a Delayed object? Or is it None? If it is None then I'm extremely surprised at the size 0 files. Oh but it sounds like you needed to close the file for it to flush everything to disk, maybe?

@gerritholl
Copy link
Member Author

The value of result is (None, None):

Size for /tmp/test4.tif is 551
[None, None]
Size for /tmp/test3.tif is 551
[None, None]
Size for /tmp/test2.tif is 551
[None, None]
Size for /tmp/test0.tif is 551
[None, None]
Size for /tmp/test1.tif is 551
[None, None]

Oh but it sounds like you needed to close the file for it to flush everything to disk, maybe?

Right. Example without any pytroll components:

from dask import delayed
import dask.array as da
import os.path
import os
import h5py

ds = da.arange(100).reshape(10, 10)
out = "/tmp/myfile.h5"
os.unlink(out)
f = h5py.File(out, mode='a')

dset = f.create_dataset("data", ds.shape, dtype='f8')

def on_done(result, fn):
    print("Size for", fn, "is now", os.path.getsize(fn))
    return result

x = da.store(ds, dset, compute=False)

da.compute(delayed(on_done)(x, out))
print("Size after computing for", out, "is", os.path.getsize(out))
f.close()
print("Final size for", out, "is", os.path.getsize(out))

which results in

Size for /tmp/myfile.h5 is now 96
Size after computing for /tmp/myfile.h5 is 96
Final size for /tmp/myfile.h5 is 2848

So the issue here appears to be that computing does not result in the file being flushed/closed. I guess for the other writers, the file gets closed before computation is finished.

We might need to put a .flush() somewhere. Not sure where, and not sure what the impacts of doing so might be.

Add a callback that closes the file.  This should be used before logging
or moving.
@gerritholl
Copy link
Member Author

I've added a callback that does a close (there's no flush exposed) and it seems to work for GeoTIFF now, at least the unit tests pass. Will do a live test later today.

@gerritholl gerritholl marked this pull request as ready for review November 11, 2022 11:47
@gerritholl gerritholl marked this pull request as draft November 11, 2022 12:38
@gerritholl
Copy link
Member Author

gerritholl commented Nov 11, 2022

Passes unit tests, but fails the field test with

Traceback (most recent call last):
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/trollflow2/launcher.py", line 377, in process
    cwrk.pop('fun')(job, **cwrk)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/trollflow2/plugins/__init__.py", line 334, in save_datasets
    compute_writer_results(objs)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/satpy/writers/__init__.py", line 536, in compute_writer_results
    da.compute(delayeds)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/base.py", line 602, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/trollflow2/plugins/__init__.py", line 974, in callback_close
    obj[1].close()
AttributeError: 'tuple' object has no attribute 'close'

inspection reveals that obj here is:

[(array([[[ 27,  27,  20, ...,  29,  29,  29],
        [ 27,  27,  20, ...,  28,  28,  28],
        [ 27,  27,  20, ...,  28,  28,  28],
        ...,
        [ 24,  23,  23, ..., 103, 103, 100],
        [ 23,  23,  23, ..., 103, 103, 100],
        [ 23,  23,  23, ..., 100,  97,  97]]], dtype=uint8), 6, 119), (<trollimage.xrimage.RIODataset object at 0x7effbea3d480>, <trollimage.xrimage.RIOTag object at 0x7effbea3eef0>, <trollimage.xrimage.RIOTag object at 0x7eff7109c910>)]

meaning that rather than target being a single RIODataset, it is actually a tuple of (RIODataset, RIOTag, RIOTag) objects… now how/why does that happen?

@gerritholl
Copy link
Member Author

A quick test shows that for this case, everything seems to work as expected if I loop through obj[1] and close all members, but why is obj[1] sometimes a RIODataset and sometimes a tuple?

@djhoese
Copy link
Member

djhoese commented Nov 11, 2022

I mentioned the reasoning for the multi-element target on slack (tiff tags using dask arrays) and that we should maybe add a wrapper object in XRImage. For the on_done callback, should it maybe be doing the closing? Or maybe be chained after another function that does the closing?

Make callback_close more flexible, such that it can handle the case
where there are dask-dependent attributes, that must also be closed.
Fix documentation for save_datasets: the callbacks pass four, not five,
parameters.
@pnuu
Copy link
Member

pnuu commented Mar 3, 2023

I guess it makes sense that for large area the RAM usage is lower when the fully used data can be released and cleaned.

Copy link
Member

@pnuu pnuu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just couple of comments, otherwise LGTM.

trollflow2/plugins/__init__.py Outdated Show resolved Hide resolved
trollflow2/plugins/__init__.py Outdated Show resolved Hide resolved
trollflow2/plugins/__init__.py Outdated Show resolved Hide resolved
@mraspaud
Copy link
Member

mraspaud commented Mar 6, 2023

@gerritholl I made an attempt at fixing the merge conflicts

@pnuu
Copy link
Member

pnuu commented Mar 9, 2023

I fixed the two logger names that were not present in my logging update already merged in main.

Add a test for a callback when the writer returns a delayed-object, and
fix a bug exposed by this test.

Avoid duplicate documentation, just refer from one place to the other.
@gerritholl gerritholl requested a review from pnuu March 10, 2023 10:57
pnuu
pnuu previously approved these changes Mar 10, 2023
Copy link
Member

@pnuu pnuu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@mraspaud mraspaud left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR looks sound in the logic. However it adds quite some complexity in the code, which I think calls for being extra careful on code cleanliness. Herein, some comments that could help with that.

trollflow2/plugins/__init__.py Outdated Show resolved Hide resolved
trollflow2/plugins/__init__.py Outdated Show resolved Hide resolved
trollflow2/plugins/__init__.py Outdated Show resolved Hide resolved
Rename the private functions for applying to either single or multiple
sources and targets.
@pnuu
Copy link
Member

pnuu commented May 2, 2023

I fixed the couple merge conflicts. At least I hope I did.

Copy link
Member

@mraspaud mraspaud left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for adding this feature!

@mraspaud mraspaud merged commit 5b50693 into pytroll:main Aug 31, 2023
@mraspaud mraspaud added the enhancement New feature or request label Aug 31, 2023
@gerritholl gerritholl deleted the call-on-done branch August 23, 2024 10:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

Eager processing for faster and more continuous image delivery
5 participants