def cmip6_weighted_mean_recipe(
    facets: dict,
    weight_coord: str,
    mean_dims: Union[List[str], str],
    coarsen_time: Optional[int] = None,
) -> XarrayZarrRecipe:
    """Build a recipe that stores the weighted mean of one CMIP6 dataset.

    The dataset is looked up in the Google Cloud CMIP6 catalog via ``facets``.
    Each input is preprocessed with ``combined_preprocessing``, matched with a
    grid metric (``weight_coord``) from the same ``source_id``/``grid_label``,
    and reduced with a weighted mean over ``mean_dims``.

    Args:
        facets: Keyword arguments for the catalog ``search``. Must contain
            ``"source_id"`` and ``"grid_label"`` and must select exactly one
            dataset.
        weight_coord: ``variable_id`` of the metric used as weights
            (looked up in the same catalog).
        mean_dims: Dimension name(s) the weighted mean is taken over.
        coarsen_time: If given, the result is additionally averaged over
            windows of this many steps along ``time`` (incomplete trailing
            windows are trimmed).

    Returns:
        An ``XarrayZarrRecipe`` reading the selected Zarr store with
        ``engine="zarr"`` and writing with ``target_chunks={"time": 1}``.

    Raises:
        KeyError: If ``facets`` lacks ``"source_id"`` or ``"grid_label"``.
        ValueError: If ``facets`` does not match exactly one dataset.
    """
    # Fail fast on malformed facets before doing any remote catalog I/O.
    missing = [key for key in ("source_id", "grid_label") if key not in facets]
    if missing:
        raise KeyError(f"`facets` is missing required key(s): {missing}")

    # Initialize CMIP6 catalog
    col = google_cmip_col()

    # Get path to variable data according to facets
    # Note: might be superseded by opener refactor (custom opener which takes facets as kwargs)
    cat = col.search(**facets)
    path_list = cat.df["zstore"].tolist()
    # Explicit check instead of `assert`: asserts are stripped under `python -O`,
    # which would silently pick the first of several matches (or fail obscurely on none).
    if len(path_list) != 1:
        raise ValueError(
            f"`facets` must select exactly one dataset, but matched {len(path_list)}: {facets}"
        )
    path = path_list[0]

    # Load all available weights to enable imperfect matching, see:
    # https://cmip6-preprocessing.readthedocs.io/en/latest/postprocessing.html#Handling-grid-metrics-in-CMIP6
    cat_weight = col.search(
        variable_id=weight_coord,
        source_id=facets["source_id"],
        grid_label=facets["grid_label"],
    )
    ddict_weight = cat_weight.to_dataset_dict(
        zarr_kwargs={"consolidated": True, "use_cftime": True},
        preprocess=combined_preprocessing,
        aggregate=False,
    )

    def process_input(ds, fname):
        """Preprocess ``ds``, attach the metric and reduce it to a weighted mean.

        ``fname`` is unused; it is kept because the recipe's ``process_input``
        hook is called with two arguments (presumably dataset and file name).
        """
        ds = combined_preprocessing(ds)

        # `match_metrics` works on dicts of datasets, so wrap the single input
        # under a placeholder key and unwrap it again afterwards.
        ddict = match_metrics({"blank": ds}, ddict_weight, [weight_coord])

        assert len(ddict) == 1, "match_metrics must return exactly one dataset"
        _, ds_out = ddict.popitem()

        # TODO: write weight provenance to ds_out attributes here
        # Missing weights are treated as zero so they do not contribute to the mean.
        ds_out = ds_out.weighted(ds_out[weight_coord].fillna(0)).mean(mean_dims)

        if coarsen_time is not None:
            ds_out = ds_out.coarsen(time=coarsen_time, boundary="trim").mean()

        return ds_out

    # HACK: the input is a Zarr store, not an OPeNDAP endpoint. `is_opendap=True`
    # is borrowed here because OPeNDAP inputs are not opened with `fsspec` and are
    # not cached, so the store path reaches xarray unchanged.
    pattern = pattern_from_file_sequence([path], "time", is_opendap=True)

    return XarrayZarrRecipe(
        pattern,
        target_chunks={"time": 1},
        xarray_open_kwargs={"engine": "zarr"},
        process_input=process_input,
    )
+ # ) def __repr__(self): return f"" diff --git a/pangeo_forge_recipes/recipes/xarray_zarr.py b/pangeo_forge_recipes/recipes/xarray_zarr.py index 23ce9c1a..e286bb16 100644 --- a/pangeo_forge_recipes/recipes/xarray_zarr.py +++ b/pangeo_forge_recipes/recipes/xarray_zarr.py @@ -716,15 +716,15 @@ def __post_init__(self): self.cache_inputs = False if self.open_input_with_fsspec_reference: raise ValueError("Can't generate references on opendap inputs") - if "engine" in self.xarray_open_kwargs: - if self.xarray_open_kwargs["engine"] != "netcdf4": - raise ValueError( - "Opendap inputs only work with `xarray_open_kwargs['engine'] == 'netcdf4'`" - ) - else: - new_kw = self.xarray_open_kwargs.copy() - new_kw["engine"] = "netcdf4" - self.xarray_open_kwargs = new_kw + # if "engine" in self.xarray_open_kwargs: + # if self.xarray_open_kwargs["engine"] != "netcdf4": + # raise ValueError( + # "Opendap inputs only work with `xarray_open_kwargs['engine'] == 'netcdf4'`" + # ) + # else: + # new_kw = self.xarray_open_kwargs.copy() + # new_kw["engine"] = "netcdf4" + # self.xarray_open_kwargs = new_kw elif self.cache_inputs is None: self.cache_inputs = True # old defult diff --git a/setup.cfg b/setup.cfg index d4a8d695..deafc820 100644 --- a/setup.cfg +++ b/setup.cfg @@ -60,7 +60,7 @@ max-line-length = 100 [isort] known_first_party=pangeo_forge_recipes -known_third_party=aiohttp,apache_beam,click,dask,fsspec,fsspec_reference_maker,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr +known_third_party=aiohttp,apache_beam,click,cmip6_preprocessing,dask,fsspec,fsspec_reference_maker,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr multi_line_output=3 include_trailing_comma=True force_grid_wrap=0