Merge pull request #168 from gerritholl/call-on-done
Add callback functionality for dask-delayed dataset saving
mraspaud authored Aug 31, 2023
2 parents ae95cb8 + 6bd0218 commit 5b50693
Showing 3 changed files with 359 additions and 10 deletions.
5 changes: 5 additions & 0 deletions examples/pl.yaml
@@ -66,6 +66,11 @@ product_list: &product_list
# # pass extra keyword arguments to Scene.load
# scene_load_kwargs:
# upper_right_corner: "NE"
# call_on_done:
# - !!python/name:trollflow2.plugins.callback_close
# - !!python/name:trollflow2.plugins.callback_move
# - !!python/name:trollflow2.plugins.callback_log
# early_moving: True # must be set with callback_move; see docs for details

areas:
omerc_bb:
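A custom callback only needs to accept the four arguments described in the ``save_datasets`` docstring below and pass the first one through. A minimal sketch, assuming a hypothetical ``callback_checksum`` that logs an MD5 digest of the finished file (not part of trollflow2):

import hashlib
import logging

logger = logging.getLogger(__name__)


def callback_checksum(obj, targs, job, fmat_config):
    """Log an MD5 digest of the written file (hypothetical example)."""
    # Like callback_log, this reads the finished file, so list it after
    # callback_close (and after callback_move, if moving).
    filename = fmat_config["filename"]
    with open(filename, "rb") as fp:
        digest = hashlib.md5(fp.read()).hexdigest()
    logger.info("Wrote %s, md5 %s", filename, digest)
    # Every callback must return the save_dataset return value.
    return obj

It would be referenced from the product list like the built-in callbacks, e.g. ``!!python/name:mypackage.callback_checksum`` (module name hypothetical).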
185 changes: 179 additions & 6 deletions trollflow2/plugins/__init__.py
@@ -18,10 +18,12 @@

"""Trollflow2 plugins."""

import collections.abc
import copy
import datetime as dt
import os
from contextlib import contextmanager, suppress
import pathlib
from contextlib import contextmanager, suppress, nullcontext
from logging import getLogger
from tempfile import NamedTemporaryFile
from urllib.parse import urlsplit, urlunsplit
@@ -30,8 +32,10 @@
import hdf5plugin # noqa

import dask
import dask.array as da
import dpath.util
import rasterio
from dask.delayed import Delayed
from posttroll.message import Message
from posttroll.publisher import create_publisher_from_dict_config
from pyorbital.astronomy import sun_zenith_angle
@@ -42,7 +46,7 @@
from satpy import Scene
from satpy.resample import get_area_def
from satpy.version import version as satpy_version
from satpy.writers import compute_writer_results
from satpy.writers import compute_writer_results, group_results_by_output_file
from trollsift import compose

from trollflow2.dict_tools import get_config_value, plist_iter
@@ -313,6 +317,23 @@ def save_datasets(job):
``staging_zone`` directory, such that the filename written to the
headers remains meaningful.

The product list may contain a ``call_on_done`` parameter.
This parameter takes effect if and only if ``eager_writing`` is False
(which is the default). It should contain a list of references
to callables. At computation time, each callable will be
called with four arguments: the result of ``save_dataset``,
targets (if applicable), the full job dictionary, and the
dictionary describing the format config and output filename
that was written. The parameter ``targets`` is set to None
if using a writer where :meth:`~satpy.Scene.save_datasets`
does not return targets. Each callable must return the
``save_dataset`` return value (possibly altered). Such a callback
could be used, for example, to ship products as soon as they are
successfully produced.

Three built-in callbacks are provided with Trollflow2: :func:`callback_close`,
:func:`callback_move`, and :func:`callback_log`.

Other arguments defined in the job list (either directly under
``product_list``, or under ``formats``) are passed on to the satpy writer. The
arguments ``use_tmp_file``, ``staging_zone``, ``output_dir``,
@@ -323,16 +344,109 @@ def save_datasets(job):
base_config = job['input_mda'].copy()
base_config.pop('dataset', None)
eager_writing = job['product_list']['product_list'].get("eager_writing", False)
with renamed_files() as renames:
early_moving = job['product_list']['product_list'].get("early_moving", False)
call_on_done = job["product_list"]["product_list"].get("call_on_done", None)
if call_on_done is not None:
callbacks = [dask.delayed(c) for c in call_on_done]
else:
callbacks = None
if early_moving:
cm = nullcontext({})
else:
cm = renamed_files()
with cm as renames:
for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config):
obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
if obj is not None:
objs.append(obj)
late_saver = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
late_saver = _apply_callbacks(late_saver, callbacks, job, fmat_config)
if late_saver is not None:
objs.append(late_saver)
job['produced_files'].put(fmat_config['filename'])
if not eager_writing:
compute_writer_results(objs)
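
# Illustration: with call_on_done set to [callback_close, callback_move,
# callback_log] as in the example configuration, each element of ``objs``
# is a Delayed chain equivalent to
#
#     callback_log(callback_move(callback_close(result, targs, job, fmat_config), ...), ...)
#
# so compute_writer_results(objs) fires the callbacks for each output file
# as soon as that file's saving has completed.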


def _apply_callbacks(late_saver, callbacks, *args):
"""Apply callbacks if there are any.
If we are using callbacks via the ``call_on_done`` parameter, wrap
``late_saver`` with those iteratively. If not, return ``late_saver`` as is.
Here, ``late_saver`` is whatever :meth:`satpy.Scene.save_datasets`
returns.
"""
if callbacks is None:
return late_saver
if isinstance(late_saver, Delayed):
return _apply_callbacks_to_delayed(late_saver, callbacks, None, *args)
if isinstance(late_saver, collections.abc.Sequence) and len(late_saver) == 2:
if isinstance(late_saver[0], collections.abc.Sequence):
return _apply_callbacks_to_multiple_sources_and_targets(late_saver, callbacks, *args)
return _apply_callbacks_to_single_source_and_target(late_saver, callbacks, *args)
raise ValueError(
"Unrecognised return value type from ``save_datasets``, "
"don't know how to apply wrappers.")


def _apply_callbacks_to_delayed(delayed, callbacks, *args):
"""Recursively apply the callbacks to the delayed object.
Args:
delayed: dask Delayed object to which callbacks are applied
callbacks: list of dask Delayed objects to apply
*args: remaining arguments passed to callbacks
Returns:
delayed type with callbacks applied
"""
delayed = callbacks[0](delayed, *args)
for callback in callbacks[1:]:
delayed = callback(delayed, *args)
return delayed


def _apply_callbacks_to_multiple_sources_and_targets(late_saver, callbacks, *args):
"""Apply callbacks to multiple sources/targets pairs.
Taking source/target pairs such as returned by
:meth:`satpy.Scene.save_datasets`, split those by file and turn them all in
delayed types by calling :func:`dask.array.store`, then apply callbacks.
Args:
late_saver: tuple of ``(sources, targets)`` such as may be returned
by :meth:`satpy.Scene.save_datasets`.
callbacks: list of dask Delayed objects to apply
*args: remaining arguments passed to callbacks
Returns:
list of delayed types
"""
delayeds = []
for (src, targ) in group_results_by_output_file(*late_saver):
delayed = da.store(src, targ, compute=False)
delayeds.append(_apply_callbacks_to_delayed(delayed, callbacks, targ, *args))
return delayeds
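
# Illustration (assumed semantics of the satpy helper): if ``save_datasets``
# returned sources and targets spanning two output files, then
#
#     group_results_by_output_file(sources, targets)
#
# yields one (sources, targets) pair per file, so each file gets its own
# delayed store task and, through _apply_callbacks_to_delayed, its own
# callback chain that fires as soon as that file is finished.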


def _apply_callbacks_to_single_source_and_target(late_saver, callbacks, *args):
"""Apply callbacks to single source/target pairs.
Taking a single source/target pair such as may be returned by
:meth:`satpy.Scene.save_datasets`, turn this into a delayed type
type by calling :func:`dask.array.store`, then apply callbacks.
Args:
late_saver: tuple of ``(source, target)`` such as may be returned
by :meth:`satpy.Scene.save_datasets`.
callbacks: list of dask Delayed objects to apply
*args: remaining arguments passed to callbacks
Returns:
delayed types
"""
(src, targ) = late_saver
delayed = da.store(src, targ, compute=False)
return _apply_callbacks_to_delayed(delayed, callbacks, [targ], *args)


def product_missing_from_scene(product, scene):
"""Check if product is missing from the scene."""
if not isinstance(product, (tuple, list)):
@@ -941,3 +1055,62 @@ def _product_meets_min_valid_data_fraction(
logger.debug(f"Found {rel_valid:%}>{min_frac:%}, keeping "
f"{prod_name:s} for area {area_name:s} in the worklist")
return True


def callback_log(obj, targs, job, fmat_config):
"""Log written files as callback for save_datasets call_on_done.
Callback function that can be used with the :func:`save_datasets`
``call_on_done`` functionality. Will log a message with loglevel INFO to
report that the filename was written successfully along with its size.
If using :func:`callback_move` in combination with
:func:`callback_log`, you must call :func:`callback_log` AFTER
:func:`callback_move`, because the logger looks for the final
destination of the file, not the temporary one.
"""
filename = fmat_config["filename"]
size = os.path.getsize(filename)
logger.info(f"Wrote {filename:s} successfully, total {size:d} bytes.")
return obj


def callback_move(obj, targs, job, fmat_config):
"""Move files as a callback by save_datasets call_on_done.
Callback function that can be used with the :func:`save_datasets`
``call_on_done`` functionality. Moves the file to the directory indicated
with ``output_dir`` in the configuration. This directory will be
created if needed.
This callback must be used with ``staging_zone`` and ``early_moving`` MUST
be set in the configuration. If used in combination with
:func:`callback_log`, you must call :func:`callback_log` AFTER
:func:`callback_move`, because the logger looks for the final destination
of the file, not the temporary one.
"""
destfile = pathlib.Path(fmat_config["filename"])
srcdir = pathlib.Path(job["product_list"]["product_list"]["staging_zone"])
srcfile = srcdir / destfile.name
logger.debug(f"Moving {srcfile!s} to {destfile!s}")
srcfile.rename(destfile)
return obj


def callback_close(obj, targs, job, fmat_config):
"""Close files as a callback where needed.
When using callbacks with writers that return a ``(src, target)`` pair for
``da.store``, satpy doesn't close the file until after computation is
completed. That means there may be data that have been computed, but not
yet written to disk. This is normally the case for the geotiff writer.
For callbacks that depend on the files to be complete, the file should be
closed first. This callback should be prepended in this case.
If passed a ``dask.Delayed`` object, this callback does nothing. If passed
a ``(src, targ)`` pair, it closes the target.
"""
if targs:
for targ in targs:
targ.close()
return obj
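
The wrapping pattern used by these callbacks can be reproduced in isolation with plain dask. A self-contained sketch, where ``ListTarget`` and ``shipit`` are hypothetical stand-ins for a writer target and a user callback:

import dask
import dask.array as da


class ListTarget:
    """Toy store target collecting written chunks (illustration only)."""

    def __init__(self):
        self.chunks = []

    def __setitem__(self, slices, values):
        self.chunks.append((slices, values))


def shipit(obj, targ):
    """Hypothetical callback: runs only after the store has completed."""
    print(f"stored {len(targ.chunks):d} chunk(s)")
    return obj


src = da.ones((4, 4), chunks=2)
targ = ListTarget()
stored = da.store(src, targ, compute=False)  # a Delayed; nothing written yet
# Same wrapping as _apply_callbacks_to_delayed: the callback becomes a
# Delayed depending on the store task.
wrapped = dask.delayed(shipit)(stored, targ)
wrapped.compute()  # writes the chunks, then calls shipit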