diff --git a/pangeo_forge/recipe.py b/pangeo_forge/recipe.py
index b07f0b18f..96a809668 100644
--- a/pangeo_forge/recipe.py
+++ b/pangeo_forge/recipe.py
@@ -132,6 +132,7 @@ class NetCDFtoZarrRecipe(BaseRecipe):
     :param sequence_dim: The dimension name along which the inputs will be concatenated.
     :param inputs_per_chunk: The number of inputs to use in each chunk.
     :param nitems_per_input: The length of each input along the `sequence_dim` dimension.
+    :param processed_input_nb: The number of inputs already used to generate the existing dataset.
     :param target: A location in which to put the dataset. Can also be assigned at run time.
     :param input_cache: A location in which to cache temporary data.
     :param require_cache: Whether to allow opening inputs directly which have not
@@ -154,6 +155,7 @@ class NetCDFtoZarrRecipe(BaseRecipe):
     sequence_dim: Optional[str] = None
     inputs_per_chunk: int = 1
     nitems_per_input: int = 1
+    processed_input_nb: int = 0
     target: Optional[AbstractTarget] = field(default_factory=UninitializedTarget)
     input_cache: Optional[AbstractTarget] = field(default_factory=UninitializedTarget)
     require_cache: bool = True
@@ -436,16 +438,25 @@ class NetCDFtoZarrMultiVarSequentialRecipe(NetCDFtoZarrRecipe):
-    """There are muliples sequences of input files (but all along the same dimension.)
+    """There are multiple sequences of input files (but all along the same dimension.)
     Different variables live in different files.
 
+    :param processed_input_urls: The inputs already used to generate the existing dataset.
-    :param input_pattern: An pattern used to generate the input file names.
+    :param input_pattern: A pattern used to generate the input file names.
     """
 
+    processed_input_urls: Iterable[str] = field(repr=False, default_factory=list)
     input_pattern: VariableSequencePattern = field(default_factory=VariableSequencePattern)
 
     def __post_init__(self):
         super().__post_init__()
         self._variables = self.input_pattern.keys["variable"]
-        self.processed_input_nb = 0  # TODO
-        self._inputs = {k: {"url": v, "processed": False} for k, v in self.input_pattern}
+        # processed_input_urls is only typed as Iterable: materialize it once so that a
+        # one-shot iterator is not exhausted, duplicates are not counted twice, and the
+        # membership tests below are O(1) instead of a list scan per input.
+        processed_urls = set(self.processed_input_urls)
+        n_variables = len(self._variables)
+        # One sequence step needs one file per variable; guard against an empty pattern.
+        self.processed_input_nb = len(processed_urls) // n_variables if n_variables else 0
+        self._inputs = {
+            k: {"url": v, "processed": v in processed_urls} for k, v in self.input_pattern
+        }
         # input keys are tuples like
         # ("temp", 0)
         # ("temp", 1)
diff --git a/tests/test_recipe.py b/tests/test_recipe.py
index 8ebf3ee24..d97318220 100644
--- a/tests/test_recipe.py
+++ b/tests/test_recipe.py
@@ -200,3 +200,42 @@ def test_NetCDFtoZarrMultiVarSequentialRecipe(
 
     ds_target = xr.open_zarr(tmp_target.get_mapper(), consolidated=True).compute()
     assert ds_target.identical(daily_xarray_dataset)
+
+
+def test_NetCDFtoZarrMultiVarSequentialRecipeIncremental(
+    daily_xarray_dataset, netcdf_local_paths_by_variable, tmp_target, tmp_cache
+):
+    paths, items_per_file, fnames_by_variable, path_format = netcdf_local_paths_by_variable
+    pattern1 = VariableSequencePattern(
+        path_format, keys={"variable": ["foo", "bar"], "n": list(range(len(paths) // 4))}
+    )
+    r1 = recipe.NetCDFtoZarrMultiVarSequentialRecipe(
+        input_pattern=pattern1,
+        sequence_dim="time",
+        inputs_per_chunk=1,
+        nitems_per_input=items_per_file,
+        target=tmp_target,
+        input_cache=tmp_cache,
+    )
+
+    processed_input_urls = [url for _, url in pattern1]
+    pattern2 = VariableSequencePattern(
+        path_format, keys={"variable": ["foo", "bar"], "n": list(range(len(paths) // 2))}
+    )
+    r2 = recipe.NetCDFtoZarrMultiVarSequentialRecipe(
+        processed_input_urls=processed_input_urls,
+        input_pattern=pattern2,
+        sequence_dim="time",
+        inputs_per_chunk=1,
+        nitems_per_input=items_per_file,
+        target=tmp_target,
+        input_cache=tmp_cache,
+    )
+    # check that r2 needs r1 to be executed first
+    with pytest.raises(FileNotFoundError):
+        _manually_execute_recipe(r2)
+    _manually_execute_recipe(r1)
+    _manually_execute_recipe(r2)
+
+    ds_target = xr.open_zarr(tmp_target.get_mapper(), consolidated=True).compute()
+    assert ds_target.identical(daily_xarray_dataset)