Skip to content

Commit

Permalink
Also support NetCDFtoZarrMultiVarSequentialRecipe
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Mar 6, 2021
1 parent a468066 commit 89ffe45
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
8 changes: 6 additions & 2 deletions pangeo_forge/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class NetCDFtoZarrRecipe(BaseRecipe):
:param sequence_dim: The dimension name along which the inputs will be concatenated.
:param inputs_per_chunk: The number of inputs to use in each chunk.
:param nitems_per_input: The length of each input along the `sequence_dim` dimension.
:param processed_input_nb: The number of inputs already used to generate the existing dataset.
:param target: A location in which to put the dataset. Can also be assigned at run time.
:param input_cache: A location in which to cache temporary data.
:param require_cache: Whether to allow opening inputs directly which have not
Expand All @@ -154,6 +155,7 @@ class NetCDFtoZarrRecipe(BaseRecipe):
sequence_dim: Optional[str] = None
inputs_per_chunk: int = 1
nitems_per_input: int = 1
processed_input_nb: int = 0
target: Optional[AbstractTarget] = field(default_factory=UninitializedTarget)
input_cache: Optional[AbstractTarget] = field(default_factory=UninitializedTarget)
require_cache: bool = True
Expand Down Expand Up @@ -436,16 +438,18 @@ class NetCDFtoZarrMultiVarSequentialRecipe(NetCDFtoZarrRecipe):
"""There are muliples sequences of input files (but all along the same dimension.)
Different variables live in different files.
:param processed_input_urls: The inputs already used to generate the existing dataset.
:param input_pattern: An pattern used to generate the input file names.
"""

processed_input_urls: Iterable[str] = field(repr=False, default_factory=list)
input_pattern: VariableSequencePattern = field(default_factory=VariableSequencePattern)

def __post_init__(self):
super().__post_init__()
self._variables = self.input_pattern.keys["variable"]
self.processed_input_nb = 0 # TODO
self._inputs = {k: {"url": v, "processed": False} for k, v in self.input_pattern}
self.processed_input_nb = len(self.processed_input_urls) // len(self._variables)
self._inputs = {k: {"url": v, "processed": v in self.processed_input_urls} for k, v in self.input_pattern}
# input keys are tuples like
# ("temp", 0)
# ("temp", 1)
Expand Down
39 changes: 39 additions & 0 deletions tests/test_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,42 @@ def test_NetCDFtoZarrMultiVarSequentialRecipe(

ds_target = xr.open_zarr(tmp_target.get_mapper(), consolidated=True).compute()
assert ds_target.identical(daily_xarray_dataset)


def test_NetCDFtoZarrMultiVarSequentialRecipeIncremental(
daily_xarray_dataset, netcdf_local_paths_by_variable, tmp_target, tmp_cache
):
paths, items_per_file, fnames_by_variable, path_format = netcdf_local_paths_by_variable
pattern1 = VariableSequencePattern(
path_format, keys={"variable": ["foo", "bar"], "n": list(range(len(paths) // 4))}
)
r1 = recipe.NetCDFtoZarrMultiVarSequentialRecipe(
input_pattern=pattern1,
sequence_dim="time",
inputs_per_chunk=1,
nitems_per_input=items_per_file,
target=tmp_target,
input_cache=tmp_cache,
)

processed_input_urls = [v for k, v in pattern1]
pattern2 = VariableSequencePattern(
path_format, keys={"variable": ["foo", "bar"], "n": list(range(len(paths) // 2))}
)
r2 = recipe.NetCDFtoZarrMultiVarSequentialRecipe(
processed_input_urls=processed_input_urls,
input_pattern=pattern2,
sequence_dim="time",
inputs_per_chunk=1,
nitems_per_input=items_per_file,
target=tmp_target,
input_cache=tmp_cache,
)
# check that r2 needs r1 to be executed first
with pytest.raises(FileNotFoundError):
_manually_execute_recipe(r2)
_manually_execute_recipe(r1)
_manually_execute_recipe(r2)

ds_target = xr.open_zarr(tmp_target.get_mapper(), consolidated=True).compute()
assert ds_target.identical(daily_xarray_dataset)

0 comments on commit 89ffe45

Please sign in to comment.