Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback functionality for dask-delayed dataset saving #168

Merged
merged 44 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
17af3e5
Added callback test
gerritholl Nov 8, 2022
22587aa
Add functionality for callback when computation done
gerritholl Nov 8, 2022
07216d2
Fix call_on_done docu
gerritholl Nov 8, 2022
a2e92fe
Documentation clarifiraciotn
gerritholl Nov 8, 2022
fa30b70
Allow multiple call-on-done callbacks
gerritholl Nov 9, 2022
40d1c30
Added logging and moving callbacks
gerritholl Nov 9, 2022
21e1a3d
Fix recursive calling
gerritholl Nov 9, 2022
b07b4eb
Add stronger test for successful writing
gerritholl Nov 9, 2022
8c828be
When logging, report on the written size.
gerritholl Nov 9, 2022
07d2ac5
Add closer-callback
gerritholl Nov 11, 2022
f14d7f9
Make callback_close more flexible
gerritholl Nov 11, 2022
0a82a27
Fixing style errors.
stickler-ci Nov 11, 2022
14fdce8
callback_move should update filename
gerritholl Nov 11, 2022
ff60ddd
Add unit test for missing file problem
gerritholl Nov 11, 2022
d30feb1
Fixing style errors.
stickler-ci Nov 11, 2022
849df63
revert the moving logic
gerritholl Nov 11, 2022
9da76fb
Merge branch 'call-on-done' of github.com:gerritholl/trollflow2 into …
gerritholl Nov 11, 2022
a462de6
Fixing style errors.
stickler-ci Nov 11, 2022
253365e
Merge branch 'main' into call-on-done
gerritholl Nov 11, 2022
e7396a7
Merge branch 'call-on-done' of github.com:gerritholl/trollflow2 into …
gerritholl Nov 11, 2022
ec02361
Clarify order of callbacks
gerritholl Nov 14, 2022
6ea3d2c
more realistic test expectation
gerritholl Nov 14, 2022
1d0bbc7
Compare contents, not size
gerritholl Nov 15, 2022
b1567d9
Apply callback once per set of targets
gerritholl Nov 17, 2022
ab88f83
Fixing style errors.
stickler-ci Nov 17, 2022
a9608ff
Do not pass src to callbacks
gerritholl Nov 18, 2022
b5f989f
Merge branch 'main' into call-on-done
gerritholl Feb 6, 2023
a833153
Add missing import
gerritholl Feb 6, 2023
4fdce8d
add early_moving to example playlist
gerritholl Feb 17, 2023
7a6ac4a
Doc fix: four, not five arguments
gerritholl Feb 28, 2023
8226e8c
Merge branch 'main' into call-on-done
mraspaud Mar 6, 2023
fdee225
Merge branch 'main' into call-on-done
pnuu Mar 8, 2023
78b646f
Fix logger names
pnuu Mar 9, 2023
1f56dd1
Add fix and test for delayed-based collback
gerritholl Mar 10, 2023
e350c10
Rename private function source/s target/s
gerritholl Mar 10, 2023
8a37c43
Change conditional context manager logic
gerritholl Mar 10, 2023
863ffb2
Merge branch 'main' into call-on-done
pnuu Mar 13, 2023
aebc323
Merge branch 'main' into call-on-done
pnuu Mar 27, 2023
21fd916
Merge branch 'main' into call-on-done
pnuu Apr 20, 2023
ddd70c8
Merge branch 'main' into call-on-done
pnuu May 2, 2023
a7940dd
Merge branch 'main' into call-on-done
pnuu Aug 11, 2023
13afd9c
Merge branch 'main' into call-on-done
gerritholl Aug 30, 2023
fc75b4d
Merge remote-tracking branch 'origin/call-on-done' into call-on-done
gerritholl Aug 30, 2023
6bd0218
Use None as a sentinel
gerritholl Aug 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/pl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ product_list: &product_list
# # pass extra keyword arguments to Scene.load
# scene_load_kwargs:
# upper_right_corner: "NE"
# call_on_done:
# - !!python/name:trollflow2.plugins.callback_close
# - !!python/name:trollflow2.plugins.callback_move
# - !!python/name:trollflow2.plugins.callback_log
# early_moving: True # must be set with callback_move; see docs for details

areas:
omerc_bb:
Expand Down
191 changes: 181 additions & 10 deletions trollflow2/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

"""Trollflow2 plugins."""

import collections.abc
import datetime as dt
import os
import pathlib
from contextlib import contextmanager, suppress
from logging import getLogger
from tempfile import NamedTemporaryFile
Expand All @@ -29,8 +31,10 @@
import hdf5plugin # noqa

import dask
import dask.array as da
import dpath.util
import rasterio
from dask.delayed import Delayed
from posttroll.message import Message
from posttroll.publisher import create_publisher_from_dict_config
from pyorbital.astronomy import sun_zenith_angle
Expand All @@ -41,7 +45,7 @@
from satpy import Scene
from satpy.resample import get_area_def
from satpy.version import version as satpy_version
from satpy.writers import compute_writer_results
from satpy.writers import compute_writer_results, group_results_by_output_file
from trollsift import compose

from trollflow2.dict_tools import get_config_value, plist_iter
Expand Down Expand Up @@ -260,16 +264,17 @@ def _create_data_query(product, res):


@contextmanager
def renamed_files():
def renamed_files(do_renames):
"""Context renaming files."""
renames = {}

yield renames

for tmp_name, actual_name in renames.items():
target_scheme = urlsplit(actual_name).scheme
if target_scheme in ('', 'file'):
os.rename(tmp_name, actual_name)
if do_renames:
gerritholl marked this conversation as resolved.
Show resolved Hide resolved
for tmp_name, actual_name in renames.items():
target_scheme = urlsplit(actual_name).scheme
if target_scheme in ('', 'file'):
os.rename(tmp_name, actual_name)


def save_datasets(job):
Expand All @@ -292,6 +297,23 @@ def save_datasets(job):
``staging_zone`` directory, such that the filename written to the
headers remains meaningful.

The product list may contain a ``call_on_done`` parameter.
This parameter has effect if and only if ``eager_writing`` is False
(which is the default). It should contain a list of references
to callables. Upon computation time, each callable will be
called with four arguments: the result of ``save_dataset``,
targets (if applicable), the full job dictionary, and the
dictionary describing the format config and output filename
that was written. The parameter ``targets`` is set to None
if using a writer where :meth:`~satpy.Scene.save_datasets`
does not return this. The callables must return again the
``save_dataset`` return value (possibly altered). This callback
could be used, for example, to ship products as soon as they are
successfully produced.

Three built-in are provided with Trollflow2: :func:`callback_close`,
:func:`callback_move` and :func:`callback_log`.

Other arguments defined in the job list (either directly under
``product_list``, or under ``formats``) are passed on to the satpy writer. The
arguments ``use_tmp_file``, ``staging_zone``, ``output_dir``,
Expand All @@ -302,16 +324,106 @@ def save_datasets(job):
base_config = job['input_mda'].copy()
base_config.pop('dataset', None)
eager_writing = job['product_list']['product_list'].get("eager_writing", False)
with renamed_files() as renames:
early_moving = job['product_list']['product_list'].get("early_moving", False)
sentinel = object()
call_on_done = job["product_list"]["product_list"].get("call_on_done", sentinel)
if call_on_done is not sentinel:
callbacks = [dask.delayed(c) for c in call_on_done]
gerritholl marked this conversation as resolved.
Show resolved Hide resolved
else:
callbacks = None
with renamed_files(not early_moving) as renames:
for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config):
obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
if obj is not None:
objs.append(obj)
late_saver = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
late_saver = _apply_callbacks(late_saver, callbacks, job, fmat_config)
if late_saver is not None:
objs.append(late_saver)
job['produced_files'].put(fmat_config['filename'])
if not eager_writing:
compute_writer_results(objs)


def _apply_callbacks(late_saver, callbacks, *args):
"""Apply callbacks if there are any.

If we are using callbacks via the ``call_on_done`` parameter, wrap
``late_saver`` with those iteratively. If not, return ``late_saver`` as is.
Here, ``late_saver`` is whatever :meth:`satpy.Scene.save_datasets`
returns.
"""
if callbacks is None:
return late_saver
if isinstance(late_saver, Delayed):
return _apply_callbacks_to_delayed(late_saver, callbacks, None, *args)
if isinstance(late_saver, collections.abc.Sequence) and len(late_saver) == 2:
if isinstance(late_saver[0], collections.abc.Sequence):
return _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args)
return _apply_callbacks_to_source_and_target(late_saver, callbacks, *args)
raise ValueError(
"Unrecognised return value type from ``save_datasets``, "
"don't know how to apply wrappers.")


def _apply_callbacks_to_delayed(delayed, callbacks, *args):
"""Recursively apply the callbacks to the delayed object.

Args:
delayed: dask Delayed object to which callbacks are applied
callbacks: list of dask Delayed objects to apply
*args: remaining arguments passed to callbacks

Returns:
delayed type with callbacks applied
"""
delayed = callbacks[0](delayed, *args)
for callback in callbacks[1:]:
delayed = callback(delayed, *args)
return delayed


def _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args):
gerritholl marked this conversation as resolved.
Show resolved Hide resolved
"""Apply callbacks to multiple sources/targets pairs.

Taking source/target pairs such as returned by
:meth:`satpy.Scene.save_datasets`, split those by file and turn them all in
delayed types by calling :func:`dask.array.store`, then apply callbacks.

Args:
late_saver: tuple of ``(sources, targets)`` such as may be returned
by :meth:`satpy.Scene.save_datasets`.
callbacks: list of dask Delayed objects to apply
*args: remaining arguments passed to callbacks

Returns:
list of delayed types
"""
delayeds = []
for (src, targ) in group_results_by_output_file(*late_saver):
delayed = da.store(src, targ, compute=False)
delayeds.append(_apply_callbacks_to_delayed(delayed, callbacks, targ, *args))
return delayeds


def _apply_callbacks_to_source_and_target(late_saver, callbacks, *args):
"""Apply callbacks to single source/target pairs.

Taking a single source/target pair such as may be returned by
:meth:`satpy.Scene.save_datasets`, turn this into a delayed type
type by calling :func:`dask.array.store`, then apply callbacks.

Args:
late_saver: tuple of ``(source, target)`` such as may be returned
by :meth:`satpy.Scene.save_datasets`.
callbacks: list of dask Delayed objects to apply
*args: remaining arguments passed to callbacks

Returns:
delayed types
"""
(src, targ) = late_saver
delayed = da.store(src, targ, compute=False)
return _apply_callbacks_to_delayed(delayed, callbacks, [targ], *args)


def product_missing_from_scene(product, scene):
"""Check if product is missing from the scene."""
if not isinstance(product, (tuple, list)):
Expand Down Expand Up @@ -920,3 +1032,62 @@ def _product_meets_min_valid_data_fraction(
logger.debug(f"Found {rel_valid:%}>{min_frac:%}, keeping "
f"{prod_name:s} for area {area_name:s} in the worklist")
return True


def callback_log(obj, targs, job, fmat_config):
"""Log written files as callback for save_datasets call_on_done.

Callback function that can be used with the :func:`save_datasets`
``call_on_done`` functionality. Will log a message with loglevel INFO to
report that the filename was written successfully along with its size.

If using :func:`callback_move` in combination with
:func:`callback_log`, you must call :func:`callback_log` AFTER
:func:`callback_move`, because the logger looks for the final
destination of the file, not the temporary one.
"""
filename = fmat_config["filename"]
size = os.path.getsize(filename)
logger.info(f"Wrote {filename:s} successfully, total {size:d} bytes.")
return obj


def callback_move(obj, targs, job, fmat_config):
"""Move files as a callback by save_datasets call_on_done.

Callback function that can be used with the :func:`save_datasets`
``call_on_done`` functionality. Moves the file to the directory indicated
with ``output_dir`` in the configuration. This directory will be
created if needed.

This callback must be used with ``staging_zone`` and ``early_moving`` MUST
be set in the configuration. If used in combination with
:func:`callback_log`, you must call :func:`callback_log` AFTER
:func:`callback_move`, because the logger looks for the final destination
of the file, not the temporary one.
"""
destfile = pathlib.Path(fmat_config["filename"])
srcdir = pathlib.Path(job["product_list"]["product_list"]["staging_zone"])
srcfile = srcdir / destfile.name
logger.debug(f"Moving {srcfile!s} to {destfile!s}")
srcfile.rename(destfile)
return obj


def callback_close(obj, targs, job, fmat_config):
"""Close files as a callback where needed.

When using callbacks with writers that return a ``(src, target)`` pair for
``da.store``, satpy doesn't close the file until after computation is
completed. That means there may be data that have been computed, but not
yet written to disk. This is normally the case for the geotiff writer.
For callbacks that depend on the files to be complete, the file should be
closed first. This callback should be prepended in this case.

If passed a ``dask.Delayed`` object, this callback does nothing. If passed
a ``(src, targ)`` pair, it closes the target.
"""
if targs:
for targ in targs:
targ.close()
return obj
Loading