Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Derived CMIP6 data recipe builder (WIP) #252

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
67 changes: 67 additions & 0 deletions pangeo_forge_recipes/builders/cmip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import List, Optional, Union

from cmip6_preprocessing.postprocessing import match_metrics
from cmip6_preprocessing.preprocessing import combined_preprocessing
from cmip6_preprocessing.utils import google_cmip_col

from ..patterns import pattern_from_file_sequence
from ..recipes import XarrayZarrRecipe


def cmip6_weighted_mean_recipe(
    facets: dict,
    weight_coord: str,
    mean_dims: Union[List[str], str],
    coarsen_time: Optional[int] = None,
) -> XarrayZarrRecipe:
    """Build a recipe that writes a weighted mean of one CMIP6 zarr store.

    The catalog returned by ``google_cmip_col()`` is searched with ``facets``;
    the search must resolve to exactly one ``zstore`` path. Every catalog entry
    for ``weight_coord`` that shares the ``source_id`` and ``grid_label`` of
    ``facets`` is loaded once, up front, so that ``match_metrics`` can pick a
    weight for the dataset when the input is processed.

    Parameters
    ----------
    facets : dict
        Keyword arguments forwarded to the catalog ``search``. Must contain
        ``"source_id"`` and ``"grid_label"``.
    weight_coord : str
        ``variable_id`` of the metric used as weights. After matching it is
        read from the dataset as ``ds[weight_coord]``.
    mean_dims : list of str or str
        Dimension(s) passed to the weighted ``mean``.
    coarsen_time : int, optional
        If given, the result is additionally averaged over windows of this many
        steps along ``time`` (``boundary="trim"``).

    Returns
    -------
    XarrayZarrRecipe
        Recipe whose ``process_input`` performs the weighting and averaging.

    Raises
    ------
    KeyError
        If ``facets`` lacks ``"source_id"`` or ``"grid_label"``.
    ValueError
        If ``facets`` does not match exactly one zarr store, or if matching the
        metrics does not yield exactly one dataset.
    """
    # Check the keys needed for the weight search before touching the catalog,
    # so a malformed `facets` fails fast and with a readable message.
    missing = [key for key in ("source_id", "grid_label") if key not in facets]
    if missing:
        raise KeyError(f"`facets` is missing required key(s): {missing}")

    # Initialize CMIP6 catalog
    col = google_cmip_col()

    # Get path to variable data according to facets
    # Note: might be superseded by opener refactor (custom opener which takes facets as kwargs)
    cat = col.search(**facets)
    path_list = cat.df["zstore"].tolist()
    # Explicit check instead of `assert`: assertions are stripped under `python -O`.
    if len(path_list) != 1:
        raise ValueError(
            f"Expected `facets` to match exactly one zarr store, found {len(path_list)}."
        )
    path = path_list[0]

    # Load all available weights to enable imperfect matching, see:
    # https://cmip6-preprocessing.readthedocs.io/en/latest/postprocessing.html#Handling-grid-metrics-in-CMIP6
    cat_weight = col.search(
        variable_id=weight_coord,
        source_id=facets["source_id"],
        grid_label=facets["grid_label"],
    )
    ddict_weight = cat_weight.to_dataset_dict(
        zarr_kwargs={"consolidated": True, "use_cftime": True},
        preprocess=combined_preprocessing,
        aggregate=False,
    )

    def process_input(ds, fname):
        """Attach the matching weight to ``ds`` and reduce it to a weighted mean.

        ``fname`` is accepted because it is part of the call signature used here
        for ``process_input``; it is not used.
        """
        ds = combined_preprocessing(ds)

        # `match_metrics` operates on dicts of datasets, so wrap the single
        # dataset under a placeholder key.
        matched = match_metrics({"blank": ds}, ddict_weight, [weight_coord])

        if len(matched) != 1:
            raise ValueError(
                f"Expected exactly one dataset after matching metrics, found {len(matched)}."
            )
        _, ds_out = matched.popitem()

        # TODO: write weight provenance to ds_out attributes here
        # Missing weights are filled with 0 before being passed to `weighted`.
        ds_out = ds_out.weighted(ds_out[weight_coord].fillna(0)).mean(mean_dims)

        if coarsen_time is not None:
            ds_out = ds_out.coarsen(time=coarsen_time, boundary="trim").mean()

        return ds_out

    # HACK: `is_opendap` is set for a zarr store that is not OPeNDAP.
    # NOTE(review): presumably this is only to make the recipe skip its usual
    # file opening/caching path - confirm, and replace with a dedicated option.
    bypass_open = True
    pattern = pattern_from_file_sequence([path], "time", is_opendap=bypass_open)
    xarray_open_kwargs = dict(engine="zarr")
    recipe = XarrayZarrRecipe(
        pattern,
        target_chunks={"time": 1},
        xarray_open_kwargs=xarray_open_kwargs,
        process_input=process_input,
    )
    return recipe
10 changes: 5 additions & 5 deletions pangeo_forge_recipes/patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ def __init__(
self.fsspec_open_kwargs = fsspec_open_kwargs if fsspec_open_kwargs else {}
self.query_string_secrets = query_string_secrets if query_string_secrets else {}
self.is_opendap = is_opendap
if self.fsspec_open_kwargs and self.is_opendap:
raise ValueError(
"OPeNDAP inputs are not opened with `fsspec`. "
"`is_opendap` must be `False` when passing `fsspec_open_kwargs`."
)
# if self.fsspec_open_kwargs and self.is_opendap:
# raise ValueError(
# "OPeNDAP inputs are not opened with `fsspec`. "
# "`is_opendap` must be `False` when passing `fsspec_open_kwargs`."
# )

def __repr__(self):
    """Return a short debug string that embeds this object's ``dims``."""
    return "<FilePattern {}>".format(self.dims)
Expand Down
18 changes: 9 additions & 9 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,15 +716,15 @@ def __post_init__(self):
self.cache_inputs = False
if self.open_input_with_fsspec_reference:
raise ValueError("Can't generate references on opendap inputs")
if "engine" in self.xarray_open_kwargs:
if self.xarray_open_kwargs["engine"] != "netcdf4":
raise ValueError(
"Opendap inputs only work with `xarray_open_kwargs['engine'] == 'netcdf4'`"
)
else:
new_kw = self.xarray_open_kwargs.copy()
new_kw["engine"] = "netcdf4"
self.xarray_open_kwargs = new_kw
# if "engine" in self.xarray_open_kwargs:
# if self.xarray_open_kwargs["engine"] != "netcdf4":
# raise ValueError(
# "Opendap inputs only work with `xarray_open_kwargs['engine'] == 'netcdf4'`"
# )
# else:
# new_kw = self.xarray_open_kwargs.copy()
# new_kw["engine"] = "netcdf4"
# self.xarray_open_kwargs = new_kw
elif self.cache_inputs is None:
self.cache_inputs = True # old defult

Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ max-line-length = 100

[isort]
known_first_party=pangeo_forge_recipes
known_third_party=aiohttp,apache_beam,click,dask,fsspec,fsspec_reference_maker,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
known_third_party=aiohttp,apache_beam,click,cmip6_preprocessing,dask,fsspec,fsspec_reference_maker,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
Expand Down