
Generating a recipe's hash fails when using process_chunk and process_input functions #427

Closed · andersy005 opened this issue Oct 25, 2022 · 14 comments · Fixed by #429

andersy005 (Member) commented Oct 25, 2022

Recently, I've encountered an Apache Beam issue that has not been easy to reproduce. It seems to affect recipes that use the process_input and process_chunk functions.

Everything works perfectly fine when running Apache Beam locally. However, for reasons I don't understand, pangeo-forge-recipes is unable to hash these two functions when running on Dataflow, resulting in the error below.

There are plenty of recipes/feedstocks blocked by this issue, and any help diagnosing it would be greatly appreciated. Cc @alxmrs, @rabernat, @derekocallaghan, @yuvipanda

Apologies for the lack of a minimal reproducible example. I haven't been able to come up with one :(

  File "/srv/conda/envs/notebook/lib/python3.9/inspect.py", line 827, in findsource
    raise OSError('source code not available')
OSError: source code not available [while running 'Start|cache_input|Reshuffle_000|prepare_target|Reshuffle_001|store_chunk|Reshuffle_002|finalize_target|Reshuffle_003/prepare_target-ptransform-56']

  a6170692e70616e67656f2d66-10201802-cyzc-harness-sbr9
      Root cause: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/core.py", line 1877, in <lambda>
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/executors/beam.py", line 14, in _no_arg_stage
    fun(config=config)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py", line 587, in prepare_target
    for k, v in config.get_execution_context().items():
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/base.py", line 59, in get_execution_context
    recipe_hash=self.sha256().hex(),
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/base.py", line 53, in sha256
    return dataclass_sha256(self, ignore_keys=self._hash_exclude_)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/serialization.py", line 73, in dataclass_sha256
    return dict_to_sha256(d)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/serialization.py", line 34, in dict_to_sha256
    b = dumps(
  File "/srv/conda/envs/notebook/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/srv/conda/envs/notebook/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/srv/conda/envs/notebook/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/serialization.py", line 22, in either_encode_or_hash
    return inspect.getsource(obj)
  File "/srv/conda/envs/notebook/lib/python3.9/inspect.py", line 1024, in getsource
    lines, lnum = getsourcelines(object)
  File "/srv/conda/envs/notebook/lib/python3.9/inspect.py", line 1006, in getsourcelines
    lines, lnum = findsource(object)
  File "/srv/conda/envs/notebook/lib/python3.9/inspect.py", line 827, in findsource
    raise OSError('source code not available')
OSError: source code not available

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/core.py", line 1877, in <lambda>
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/executors/beam.py", line 14, in _no_arg_stage
    fun(config=config)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py", line 587, in prepare_target
    for k, v in config.get_execution_context().items():
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/base.py", line 59, in get_execution_context
    recipe_hash=self.sha256().hex(),
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/base.py", line 53, in sha256
    return dataclass_sha256(self, ignore_keys=self._hash_exclude_)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/serialization.py", line 73, in dataclass_sha256
    return dict_to_sha256(d)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/serialization.py", line 34, in dict_to_sha256
    b = dumps(
  File "/srv/conda/envs/notebook/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/srv/conda/envs/notebook/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/srv/conda/envs/notebook/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/serialization.py", line 22, in either_encode_or_hash
    return inspect.getsource(obj)
  File "/srv/conda/envs/notebook/lib/python3.9/inspect.py", line 1024, in getsource
    lines, lnum = getsourcelines(object)
  File "/srv/conda/envs/notebook/lib/python3.9/inspect.py", line 1006, in getsourcelines
    lines, lnum = findsource(object)
  File "/srv/conda/envs/notebook/lib/python3.9/inspect.py", line 827, in findsource
    raise OSError('source code not available')
OSError: source code not available [while running 'Start|cache_input|Reshuffle_000|prepare_target|Reshuffle_001|store_chunk|Reshuffle_002|finalize_target|Reshuffle_003/prepare_target-ptransform-56']
@andersy005 (Member Author)

Looks like this issue was documented by @cisaacstern in pangeo-forge/staged-recipes#183 (comment)

alxmrs (Contributor) commented Oct 25, 2022

@andersy005: I'm happy to debug with you sometime this week. I have some time tomorrow and Friday mornings PST -- when are you most available?

alxmrs (Contributor) commented Oct 25, 2022

Looking a bit at the sources: The inspect call here seems like a stress test for any python serializer. Are we sure that we want to serialize methods in python dataclasses?
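For illustration, here is a minimal snippet (hypothetical, not taken from any recipe) showing the same failure mode outside of Beam: inspect.getsource needs the function's defining source file to be readable, so a function whose source isn't available on the worker raises OSError, which is what the Dataflow traceback above shows (the exact message wording varies by case).

```python
# Hypothetical illustration: a function with no backing source file makes
# inspect.getsource raise OSError, mirroring the Dataflow worker error above.
import inspect

namespace = {}
exec("def process_input(ds, fname):\n    return ds", namespace)  # compiled from a string

try:
    inspect.getsource(namespace["process_input"])
except OSError as err:
    print(err)  # e.g. "could not get source code"
```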

alxmrs (Contributor) commented Oct 25, 2022

My leading hypothesis is that the inspect call cannot find dataclass fields that are Callables, like _compiler:

_compiler: ClassVar[RecipeCompiler]

This is defined as an arbitrary callable:

RecipeCompiler = Callable[[BaseRecipe], Pipeline]

My first suggestion is to try to exclude the _compiler from the hash:

_hash_exclude_ = ["storage_config"]

If that doesn't work: is there another Callable in the target subclass that is being serialized as part of the dataclass? For example, process_input:

process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]] = None

To refine my question above: do we want to include callables like process_input in the hash? Of the recipes linked in the OP that I've inspected, all of them use process_input.

If these are essential for the hash function -- I am happy to look into how we can get Beam's serializer to include these sources. Happy to do some pickle engineering with you 🥒.

@andersy005 (Member Author)

Thank you for looking into this, @alxmrs!

Happy to do some pickle engineering with you 🥒.

That would be great. I'm available tomorrow (Wed 10/27) anytime between 9am-11am PT and 12:30pm-3:30pm PT.

alxmrs (Contributor) commented Oct 26, 2022

Let's meet at 9:30 PST! I'll send you a calendar invite in a bit. :)

@rabernat (Contributor)

Let me provide some background on the hash function. The goal here is to be able to determine if two invocations of a recipe are "the same": same inputs, same options, same processing steps, etc. In order to do that, Charles and I spent quite a bit of time developing these hashing capabilities and ensuring they are deterministic in #349 and #359. Our approach to hashing preprocessing functions, described here - #359 (comment), indeed involves using inspect.getsource and hashing the function source code.

Alternative solutions might investigate hashing the bytecode, or whatever else is available within the beam runtime.

As a more extreme solution, we could simply not compute the hash for any recipe with processing functions. I think that moving those blocked recipes forward is much more important than maintaining this somewhat fragile approach to hashing recipes. I'm not very optimistic that hashing can survive the beam refactor anyway--can we deterministically hash a beam Pipeline? I doubt it. So let's not spend too much time maintaining this feature.
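To make the bytecode idea concrete, a rough, hedged sketch (not a concrete proposal for pangeo-forge-recipes): prefer the source text, and fall back to hashing the function's bytecode when inspect.getsource fails. As noted elsewhere in this thread, bytecode is not stable across Python versions, so the fallback only yields hashes comparable within a single environment.

```python
# Hedged sketch, not the library's implementation: hash a callable's source if
# available, otherwise fall back to its (version-dependent) bytecode.
import hashlib
import inspect
from typing import Callable


def hash_callable(fn: Callable) -> str:
    try:
        payload = inspect.getsource(fn).encode()
    except (OSError, TypeError):
        # co_code is always present on a plain function, but it is only stable
        # within a single Python version/implementation.
        payload = fn.__code__.co_code
    return hashlib.sha256(payload).hexdigest()
```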

yuvipanda added a commit to yuvipanda/pangeo-forge-recipes that referenced this issue Oct 26, 2022
This is guaranteed to be *not* stable across python versions (and
probably other implementation details), but without this stuff just
crashes

Fixes pangeo-forge#427
@yuvipanda (Contributor)

Yeah, I fell down this rabbit hole a few months (or weeks?) ago, and I just pushed my local branch up as this PR: #428. It might temporarily unblock us, but I agree that maybe we should just drop the hashing instead?

@yuvipanda (Contributor)

pangeo-forge/pangeo-forge-runner#28 was my fix for pangeo-forge-runner, which allowed at least some functions (not all) to go through

@yuvipanda (Contributor)

Note that pangeo-forge/staged-recipes#183 (comment) is not just about the hash; it's more complex and relates (partially) to how Beam runs code with the FnAPI. While #428 will unblock some recipes, it won't unblock all of them.

@alxmrs can you add me to the invite too? :)

@derekocallaghan (Contributor)

Note that pangeo-forge/staged-recipes#183 (comment) is not just about the hash; it's more complex and relates (partially) to how Beam runs code with the FnAPI. While #428 will unblock some recipes, it won't unblock all of them.

Yep, the issue I had with the ASCAT recipes is separate: it relates to subclassing XarrayZarrRecipe in order to insert a custom stage in the pipeline, and it would currently fail even without a defined process_input or process_chunk. As @rabernat mentioned the other day, subclassing isn't recommended; instead, it'll eventually be possible to do something like this with custom Beam PTransforms.
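For illustration, something roughly like the following (hypothetical names, not the eventual pangeo-forge-recipes API), where the processing step becomes a composable transform rather than a recipe attribute:

```python
# Hypothetical sketch: express a process_input-style step as a Beam PTransform.
import apache_beam as beam


class ApplyProcessInput(beam.PTransform):
    """Apply a user-supplied process_input(ds, fname) to (fname, dataset) pairs."""

    def __init__(self, process_input):
        super().__init__()
        self.process_input = process_input

    def expand(self, pcoll):
        return pcoll | "ProcessInput" >> beam.MapTuple(
            lambda fname, ds: (fname, self.process_input(ds, fname))
        )
```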

derekocallaghan (Contributor) commented Oct 26, 2022

I looked into this a bit last week while trying to debug the CCMP recipe issue; I've copied some of the comment text here along with a few other thoughts:

Looking at the staged/feedstock recipes that failed last week (EOOffshore CCMP, AGDC, LMR...) vs the recipe that ran successfully (eNATL60), the latter is the only one that doesn't define either a process_input or process_chunk function. The CCMP stack trace appears to be related to the inspect.getsource(obj) call where obj is either recipe.process_input or recipe.process_chunk.

I had a look at reproducing the scenario with a combination of code snippets similar to what's used in pangeo_forge_recipes (XarrayZarrRecipe, BaseRecipe etc) and pangeo_forge_runner:

In [43]: from fsspec.implementations.local import LocalFileSystem
    ...: import inspect
    ...: from pangeo_forge_recipes.serialization import dataclass_sha256, either_encode_or_hash
    ...: from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget, StorageConfig, temporary_storage_config
    ...: from pangeo_forge_runner import Feedstock
    ...: from pathlib import Path
    ...: 
    ...: feedstock = Feedstock(Path(".../staged-recipes/recipes/eooffshore_ics_ccmp_v02_1_nrt_wind"))
    ...: 
    ...: recipes = feedstock.parse_recipes()
    ...: 
    ...: recipes = {k: r.copy_pruned() for k, r in recipes.items()}
    ...: 
    ...: recipe = recipes['eooffshore_ics_ccmp_v02_1_nrt_wind']
    ...: 
    ...: storage_config = temporary_storage_config()
    ...: storage_config.target = FSSpecTarget(LocalFileSystem(), f'./ccmp.zarr')
    ...: storage_config.cache = CacheFSSpecTarget(LocalFileSystem(), './input-cache')
    ...: recipe.storage_config = storage_config
    ...: 

In [44]: recipe
Out[44]: XarrayZarrRecipe(file_pattern=<FilePattern {'time': 2}>, storage_config=StorageConfig(target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fbf1b443370>, root_path='./ccmp.zarr'), cache=CacheFSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fbf1b443370>, root_path='./input-cache'), metadata=MetadataTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fbf1b443370>, root_path='/tmp/tmps62mr0gl/urkdQjLc')), inputs_per_chunk=2000, target_chunks={'time': 8000, 'latitude': -1, 'longitude': -1}, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, consolidate_dimension_coordinates=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=<function ics_wind_speed_direction at 0x7fbf19f7df70>, process_chunk=None, lock_timeout=None, subset_inputs={}, open_input_with_kerchunk=False)

In [45]: recipe.sha256()
Out[45]: b'\xe6\x85\xd6\xfa\xb0\xe7\x8a\x80\xf7ST\xa1M\xed\xae\x8e\x9e\xbe\xe8!d\xb2\xc6)\n\x8f3}}\x85\x7f\xf5'

In [46]: dataclass_sha256(recipe, ignore_keys=recipe._hash_exclude_)
Out[46]: b'\xe6\x85\xd6\xfa\xb0\xe7\x8a\x80\xf7ST\xa1M\xed\xae\x8e\x9e\xbe\xe8!d\xb2\xc6)\n\x8f3}}\x85\x7f\xf5'

In [47]: either_encode_or_hash(recipe.process_input)
Out[47]: 'def ics_wind_speed_direction(ds, fname):\n    """\n    Selects a subset for the Irish Continental Shelf (ICS) region, and computes wind speed and\n    direction for the u and v components in the specified product. Dask arrays are\n    created for delayed execution.\n    """\n    import dask\n    import dask.array as da\n    from datetime import datetime\n    from metpy.calc import wind_direction, wind_speed\n    import xarray as xr\n\n    @dask.delayed\n    def delayed_metpy_fn(fn, u, v):\n        return fn(u, v).values\n\n    # ICS grid\n    geospatial_lat_min = 45.75\n    geospatial_lat_max = 58.25\n    geospatial_lon_min = 333.85\n    geospatial_lon_max = 355.35\n    icds = ds.sel(\n        latitude=slice(geospatial_lat_min, geospatial_lat_max),\n        longitude=slice(geospatial_lon_min, geospatial_lon_max),\n    )\n\n    # Remove subset of original attrs as they\'re no longer relevant\n    for attr in ["base_date", "date_created", "history"]:\n        del icds.attrs[attr]\n\n    # Update the grid attributes\n    icds.attrs.update(\n        {\n            "geospatial_lat_min": geospatial_lat_min,\n            "geospatial_lat_max": geospatial_lat_max,\n            "geospatial_lon_min": geospatial_lon_min,\n            "geospatial_lon_max": geospatial_lon_max,\n        }\n    )\n    u = icds.uwnd\n    v = icds.vwnd\n    # Original wind speed \'units\': \'m s-1\' attribute not accepted by MetPy,\n    # use the unit contained in ERA5 data\n    ccmp_wind_speed_units = u.units\n    era5_wind_speed_units = "m s**-1"\n    u.attrs["units"] = era5_wind_speed_units\n    v.attrs["units"] = era5_wind_speed_units\n\n    variables = [\n        {\n            "name": "wind_speed",\n            "metpy_fn": wind_speed,\n            "attrs": {"long_name": "Wind speed", "units": ccmp_wind_speed_units},\n        },\n        {\n            "name": "wind_direction",\n            "metpy_fn": wind_direction,\n            "attrs": {"long_name": "Wind direction", "units": "degree"},\n        },\n    ]\n\n    # CCMP provides u/v at a single height, 10m\n    for variable in variables:\n        icds[variable["name"]] = (\n            xr.DataArray(\n                da.from_delayed(\n                    delayed_metpy_fn(variable["metpy_fn"], u, v), u.shape, dtype=u.dtype\n                ),\n                coords=u.coords,\n                dims=u.dims,\n            )\n            .assign_coords(height=10)\n            .expand_dims(["height"])\n        )\n        icds[variable["name"]].attrs.update(variable["attrs"])\n\n    icds.height.attrs.update(\n        {\n            "long_name": "Height above the surface",\n            "standard_name": "height",\n            "units": "m",\n        }\n    )\n    # Restore units\n    for variable in ["uwnd", "vwnd"]:\n        icds[variable].attrs["units"] = ccmp_wind_speed_units\n\n    icds.attrs["eooffshore_zarr_creation_time"] = datetime.strftime(\n        datetime.now(), "%Y-%m-%dT%H:%M:%SZ"\n    )\n    icds.attrs[\n        "eooffshore_zarr_details"\n    ] = "EOOffshore Project: Concatenated CCMP v0.2.1.NRT 6-hourly wind products provided by Remote Sensing Systems (RSS), for Irish Continental Shelf. Wind speed and direction have been calculated from the uwnd and vwnd variables. CCMP Version-2 vector wind analyses are produced by Remote Sensing Systems. Data are available at www.remss.com."\n    return icds\n'

In [48]: inspect.isfunction(recipe.process_input)
Out[48]: True

In [49]: inspect.getsource(recipe.process_input)
Out[49]: 'def ics_wind_speed_direction(ds, fname):\n    """\n    Selects a subset for the Irish Continental Shelf (ICS) region, and computes wind speed and\n    direction for the u and v components in the specified product. Dask arrays are\n    created for delayed execution.\n    """\n    import dask\n    import dask.array as da\n    from datetime import datetime\n    from metpy.calc import wind_direction, wind_speed\n    import xarray as xr\n\n    @dask.delayed\n    def delayed_metpy_fn(fn, u, v):\n        return fn(u, v).values\n\n    # ICS grid\n    geospatial_lat_min = 45.75\n    geospatial_lat_max = 58.25\n    geospatial_lon_min = 333.85\n    geospatial_lon_max = 355.35\n    icds = ds.sel(\n        latitude=slice(geospatial_lat_min, geospatial_lat_max),\n        longitude=slice(geospatial_lon_min, geospatial_lon_max),\n    )\n\n    # Remove subset of original attrs as they\'re no longer relevant\n    for attr in ["base_date", "date_created", "history"]:\n        del icds.attrs[attr]\n\n    # Update the grid attributes\n    icds.attrs.update(\n        {\n            "geospatial_lat_min": geospatial_lat_min,\n            "geospatial_lat_max": geospatial_lat_max,\n            "geospatial_lon_min": geospatial_lon_min,\n            "geospatial_lon_max": geospatial_lon_max,\n        }\n    )\n    u = icds.uwnd\n    v = icds.vwnd\n    # Original wind speed \'units\': \'m s-1\' attribute not accepted by MetPy,\n    # use the unit contained in ERA5 data\n    ccmp_wind_speed_units = u.units\n    era5_wind_speed_units = "m s**-1"\n    u.attrs["units"] = era5_wind_speed_units\n    v.attrs["units"] = era5_wind_speed_units\n\n    variables = [\n        {\n            "name": "wind_speed",\n            "metpy_fn": wind_speed,\n            "attrs": {"long_name": "Wind speed", "units": ccmp_wind_speed_units},\n        },\n        {\n            "name": "wind_direction",\n            "metpy_fn": wind_direction,\n            "attrs": {"long_name": "Wind direction", "units": "degree"},\n        },\n    ]\n\n    # CCMP provides u/v at a single height, 10m\n    for variable in variables:\n        icds[variable["name"]] = (\n            xr.DataArray(\n                da.from_delayed(\n                    delayed_metpy_fn(variable["metpy_fn"], u, v), u.shape, dtype=u.dtype\n                ),\n                coords=u.coords,\n                dims=u.dims,\n            )\n            .assign_coords(height=10)\n            .expand_dims(["height"])\n        )\n        icds[variable["name"]].attrs.update(variable["attrs"])\n\n    icds.height.attrs.update(\n        {\n            "long_name": "Height above the surface",\n            "standard_name": "height",\n            "units": "m",\n        }\n    )\n    # Restore units\n    for variable in ["uwnd", "vwnd"]:\n        icds[variable].attrs["units"] = ccmp_wind_speed_units\n\n    icds.attrs["eooffshore_zarr_creation_time"] = datetime.strftime(\n        datetime.now(), "%Y-%m-%dT%H:%M:%SZ"\n    )\n    icds.attrs[\n        "eooffshore_zarr_details"\n    ] = "EOOffshore Project: Concatenated CCMP v0.2.1.NRT 6-hourly wind products provided by Remote Sensing Systems (RSS), for Irish Continental Shelf. Wind speed and direction have been calculated from the uwnd and vwnd variables. CCMP Version-2 vector wind analyses are produced by Remote Sensing Systems. Data are available at www.remss.com."\n    return icds\n'

In [50]: dataclass_sha256(recipe, ignore_keys=recipe._hash_exclude_)
Out[50]: b'\xe6\x85\xd6\xfa\xb0\xe7\x8a\x80\xf7ST\xa1M\xed\xae\x8e\x9e\xbe\xe8!d\xb2\xc6)\n\x8f3}}\x85\x7f\xf5'

As we're excluding storage_config when generating the hash, we could possibly also exclude process_input and process_chunk, as I imagine the recipe file_pattern will mostly be sufficient for the hash?

In [51]: dataclass_sha256(recipe, ignore_keys=recipe._hash_exclude_ + ['process_input'])
Out[51]: b'\x14w\xd5\xcbH\x15At:\x1bY;\xbb/P\x90%t\xf3\xd9T\\P\x9a\xf4\xf5\xa18\x0b\xa7M\xa8'

I don't think _compiler needs to be explicitly excluded: it's a ClassVar rather than a regular dataclass field, so it's already omitted by dataclasses.asdict():

_compiler = xarray_zarr_recipe_compiler

d = asdict(dclass, dict_factory=dict_drop_empty)

E.g.

In [27]: from dataclasses import asdict

In [28]: from pangeo_forge_recipes.serialization import dict_drop_empty

In [29]: asdict(recipe, dict_factory=dict_drop_empty)
Out[29]: 
{'file_pattern': <FilePattern {'time': 2}>,
 'storage_config': {'target': {'fs': <fsspec.implementations.local.LocalFileSystem at 0x7f79d31b3fd0>,
   'root_path': '/tmp/tmpz2tbd4o2/VTmsSMTh'},
  'cache': {'fs': <fsspec.implementations.local.LocalFileSystem at 0x7f79d31b3fd0>,
   'root_path': '/tmp/tmpz2tbd4o2/HC5bJHu4'},
  'metadata': {'fs': <fsspec.implementations.local.LocalFileSystem at 0x7f79d31b3fd0>,
   'root_path': '/tmp/tmpz2tbd4o2/80Soxcza'}},
 'inputs_per_chunk': 1,
 'target_chunks': {'time': 8000, 'latitude': -1, 'longitude': -1},
 'cache_inputs': True,
 'copy_input_to_local_file': False,
 'consolidate_zarr': True,
 'consolidate_dimension_coordinates': True,
 'delete_input_encoding': True,
 'process_input': <function ics_wind_speed_direction(ds: xarray.core.dataset.Dataset, fname: str) -> xarray.core.dataset.Dataset>,
 'open_input_with_kerchunk': False,
 'concat_dim': 'time',
 'concat_dim_chunks': 8000,
 'cache_metadata': True}

Suggested changes:

In pangeo_forge_recipes.recipes.base.BaseRecipe

_hash_exclude_ = ["process_chunk", "process_input", "storage_config"]

and in pangeo_forge_recipes.serialization.dataclass_sha256()

if k in d:
    del d[k]
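
Putting those together, the amended dataclass_sha256() could look roughly like this (a sketch only; dict_drop_empty and dict_to_sha256 are the existing helpers in pangeo_forge_recipes.serialization):

```python
# Sketch of the suggested change, not the actual serialization module.
from dataclasses import asdict

from pangeo_forge_recipes.serialization import dict_drop_empty, dict_to_sha256


def dataclass_sha256(dclass, ignore_keys):
    d = asdict(dclass, dict_factory=dict_drop_empty)  # existing behaviour
    for k in ignore_keys:
        # Excluded fields such as process_chunk may already be absent from the
        # asdict() output (dict_drop_empty drops None values), so guard the delete.
        if k in d:
            del d[k]
    return dict_to_sha256(d)
```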

Afaict, apart from tests, BaseRecipe.sha256() is currently (only?) called from BaseRecipe.get_execution_context(), which is itself called from XarrayZarrRecipe.prepare_target() (this scenario), and when generating a default job name in pangeo_forge_runner.commands.bake.Bake.

Although a recipe file_pattern can define a custom format_function Callable, it isn't affected by the issue, as the FilePattern has its own separate sha256() that is called when hashing the recipe, and any format_function definition is excluded there:

# we exclude the format function and combine dims from ``root`` because they determine the

I'd still be curious why this previously worked for feedstocks (e.g. AGDC); it might be worth looking into replicating the environment used for the previous successful feedstock runs.

rabernat (Contributor) commented Oct 26, 2022

we could possibly also exclude process_input and process_chunk

We could exclude them, but this would defeat the point of hashing, because two recipes with very different processing code (and therefore very different results) would end up with the same hash. At that point, the hash is useless, and we should just drop this feature.

The logic for excluding storage config is very different. That is about where the data are going, but not the contents of the dataset itself.

derekocallaghan (Contributor) commented Oct 26, 2022

we could possibly also exclude process_input and process_chunk

We could exclude them, but this would defeat the point of hashing, because two recipes with very different processing code (and therefore very different results) would end up with the same hash. At that point, the hash is useless, and we should just drop this feature.

The logic for excluding storage config is very different. That is about where the data are going, but not the contents of the dataset itself.

Thanks, that makes sense.

I'd still be curious about what changed between the previous successful feedstock recipe bakes and the current failures. E.g. AGDC seems to have worked in the staged environment, but not in production. If the hash issue is resolved here, e.g. with #429, it might be the case that other issues appear. Having said that, if I understand correctly, the CCMP recipe will have called process_input as part of the cache_input stage, which precedes the problematic prepare_target stage, so maybe no further issues will occur.
