From a46806614fcab1e627400749c60f82243e7f0cf4 Mon Sep 17 00:00:00 2001
From: David Brochart
Date: Wed, 3 Mar 2021 23:02:47 +0100
Subject: [PATCH 1/2] Support incremental appending

---
 pangeo_forge/recipe.py | 73 ++++++++++++++++++++++++++----------------
 tests/test_recipe.py   | 37 +++++++++++++++++++--
 2 files changed, 80 insertions(+), 30 deletions(-)

diff --git a/pangeo_forge/recipe.py b/pangeo_forge/recipe.py
index ad74e622..b07f0b18 100644
--- a/pangeo_forge/recipe.py
+++ b/pangeo_forge/recipe.py
@@ -204,7 +204,7 @@ def _prepare_target():
                 chunk_key
             ).chunk()  # make sure data are not in memory
             init_dsets.append(chunk_ds)
-        # TODO: create csutomizable option for this step
+        # TODO: create customizable option for this step
         # How to combine attrs is particularly important. It seems like
         # xarray is missing a "minimal" option to only keep the attrs
         # that are the same among all input variables.
@@ -234,36 +234,47 @@ def _prepare_target():
     @property
     def cache_input(self) -> Callable:
         def cache_func(input_key: Hashable) -> None:
-            logger.info(f"Caching input {input_key}")
-            fname = self._inputs[input_key]
-            # TODO: add a check for whether the input is already cached?
-            with input_opener(fname, mode="rb", **self.fsspec_open_kwargs) as source:
-                with self.input_cache.open(fname, mode="wb") as target:
-                    # TODO: make this configurable? Would we ever want to change it?
-                    BLOCK_SIZE = 10_000_000  # 10 MB
-                    while True:
-                        data = source.read(BLOCK_SIZE)
-                        if not data:
-                            break
-                        target.write(data)
+            if self._inputs[input_key]["processed"]:
+                logger.info(f"Dry-run: caching input {input_key}")
+            else:
+                logger.info(f"Caching input {input_key}")
+                fname = self._inputs[input_key]["url"]
+                # TODO: add a check for whether the input is already cached?
+                with input_opener(fname, mode="rb", **self.fsspec_open_kwargs) as source:
+                    with self.input_cache.open(fname, mode="wb") as target:
+                        # TODO: make this configurable? Would we ever want to change it?
+                        BLOCK_SIZE = 10_000_000  # 10 MB
+                        while True:
+                            data = source.read(BLOCK_SIZE)
+                            if not data:
+                                break
+                            target.write(data)

         return cache_func

     @property
     def store_chunk(self) -> Callable:
         def _store_chunk(chunk_key):
-            ds_chunk = self.open_chunk(chunk_key)
+            write_region = self.region_for_chunk(chunk_key)
+            if all(
+                [
+                    self._inputs[input_key]["processed"]
+                    for input_key in self.inputs_for_chunk(chunk_key)
+                ]
+            ):
+                logger.info(f"Dry-run: storing chunk '{chunk_key}' to Zarr region {write_region}")
+            else:
+                ds_chunk = self.open_chunk(chunk_key)

-            def drop_vars(ds):
-                # writing a region means that all the variables MUST have sequence_dim
-                to_drop = [v for v in ds.variables if self.sequence_dim not in ds[v].dims]
-                return ds.drop_vars(to_drop)
+                def drop_vars(ds):
+                    # writing a region means that all the variables MUST have sequence_dim
+                    to_drop = [v for v in ds.variables if self.sequence_dim not in ds[v].dims]
+                    return ds.drop_vars(to_drop)

-            ds_chunk = drop_vars(ds_chunk)
-            target_mapper = self.target.get_mapper()
-            write_region = self.region_for_chunk(chunk_key)
-            logger.info(f"Storing chunk '{chunk_key}' to Zarr region {write_region}")
-            ds_chunk.to_zarr(target_mapper, region=write_region)
+                ds_chunk = drop_vars(ds_chunk)
+                target_mapper = self.target.get_mapper()
+                logger.info(f"Storing chunk '{chunk_key}' to Zarr region {write_region}")
+                ds_chunk.to_zarr(target_mapper, region=write_region)

         return _store_chunk

@@ -297,7 +308,7 @@ def input_opener(self, fname: str):
             yield f

     def open_input(self, input_key: Hashable):
-        fname = self._inputs[input_key]
+        fname = self._inputs[input_key]["url"]
         with self.input_opener(fname) as f:
             logger.info(f"Opening input with Xarray {input_key}: '{fname}'")
             ds = xr.open_dataset(f, **self.xarray_open_kwargs)
@@ -357,7 +368,7 @@ def expand_target_dim(self, dim, dimsize):
         # now explicitly write the sequence coordinate to avoid missing data
         # when reopening
         if dim in zgroup:
-            zgroup[dim][:] = 0
+            zgroup[dim][self.nitems_per_input * self.processed_input_nb :] = 0  # noqa: E203

     def inputs_for_chunk(self, chunk_key):
         return self._chunks_inputs[chunk_key]
@@ -392,15 +403,20 @@ class NetCDFtoZarrSequentialRecipe(NetCDFtoZarrRecipe):
     """There is only one sequence of input files. Each file can contain
     many variables.

+    :param processed_input_urls: The inputs already used to generate the existing dataset.
     :param input_urls: The inputs used to generate the dataset.
""" + processed_input_urls: Iterable[str] = field(repr=False, default_factory=list) input_urls: Iterable[str] = field(repr=False, default_factory=list) def __post_init__(self): super().__post_init__() - input_pattern = ExplicitURLSequence(self.input_urls) - self._inputs = {k: v for k, v in input_pattern} + self.processed_input_nb = len(self.processed_input_urls) + input_pattern = ExplicitURLSequence(self.processed_input_urls + self.input_urls) + self._inputs = { + k: {"url": v, "processed": k < self.processed_input_nb} for k, v in input_pattern + } self._chunks_inputs = { k: v for k, v in enumerate(chunked_iterable(self._inputs, self.inputs_per_chunk)) } @@ -428,7 +444,8 @@ class NetCDFtoZarrMultiVarSequentialRecipe(NetCDFtoZarrRecipe): def __post_init__(self): super().__post_init__() self._variables = self.input_pattern.keys["variable"] - self._inputs = {k: v for k, v in self.input_pattern} + self.processed_input_nb = 0 # TODO + self._inputs = {k: {"url": v, "processed": False} for k, v in self.input_pattern} # input keys are tuples like # ("temp", 0) # ("temp", 1) diff --git a/tests/test_recipe.py b/tests/test_recipe.py index 761c5a40..8ebf3ee2 100644 --- a/tests/test_recipe.py +++ b/tests/test_recipe.py @@ -27,6 +27,41 @@ def _manually_execute_recipe(r): r.finalize_target() +def test_NetCDFtoZarrSequentialRecipeIncremental( + daily_xarray_dataset, netcdf_local_paths, tmp_target, tmp_cache +): + + paths, items_per_file = netcdf_local_paths + n = len(paths) // 2 + + paths1 = paths[:n] + r = recipe.NetCDFtoZarrSequentialRecipe( + input_urls=paths1, + sequence_dim="time", + inputs_per_chunk=1, + nitems_per_input=items_per_file, + target=tmp_target, + input_cache=tmp_cache, + ) + _manually_execute_recipe(r) + + paths2 = paths[n:] + r = recipe.NetCDFtoZarrSequentialRecipe( + processed_input_urls=paths1, + input_urls=paths2, + sequence_dim="time", + inputs_per_chunk=1, + nitems_per_input=items_per_file, + target=tmp_target, + input_cache=tmp_cache, + ) + _manually_execute_recipe(r) + + ds_target = xr.open_zarr(tmp_target.get_mapper(), consolidated=True).load() + ds_expected = daily_xarray_dataset.compute() + assert ds_target.identical(ds_expected) + + @pytest.mark.parametrize( "username, password", [("foo", "bar"), ("foo", "wrong"),], # noqa: E231 ) @@ -164,6 +199,4 @@ def test_NetCDFtoZarrMultiVarSequentialRecipe( _manually_execute_recipe(r) ds_target = xr.open_zarr(tmp_target.get_mapper(), consolidated=True).compute() - print(ds_target) - print(daily_xarray_dataset) assert ds_target.identical(daily_xarray_dataset) From 29d5fd0f75998fc1db1d0514289a9ef8c605d67a Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sat, 6 Mar 2021 22:27:29 +0100 Subject: [PATCH 2/2] Also support NetCDFtoZarrMultiVarSequentialRecipe --- pangeo_forge/recipe.py | 11 +++++++++-- tests/test_recipe.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pangeo_forge/recipe.py b/pangeo_forge/recipe.py index b07f0b18..375e5f45 100644 --- a/pangeo_forge/recipe.py +++ b/pangeo_forge/recipe.py @@ -132,6 +132,7 @@ class NetCDFtoZarrRecipe(BaseRecipe): :param sequence_dim: The dimension name along which the inputs will be concatenated. :param inputs_per_chunk: The number of inputs to use in each chunk. :param nitems_per_input: The length of each input along the `sequence_dim` dimension. + :param processed_input_nb: The number of inputs already used to generate the existing dataset. :param target: A location in which to put the dataset. Can also be assigned at run time. 
     :param input_cache: A location in which to cache temporary data.
     :param require_cache: Whether to allow opening inputs directly which have not
@@ -154,6 +155,7 @@
     sequence_dim: Optional[str] = None
     inputs_per_chunk: int = 1
     nitems_per_input: int = 1
+    processed_input_nb: int = 0
     target: Optional[AbstractTarget] = field(default_factory=UninitializedTarget)
     input_cache: Optional[AbstractTarget] = field(default_factory=UninitializedTarget)
     require_cache: bool = True
@@ -436,16 +438,21 @@ class NetCDFtoZarrMultiVarSequentialRecipe(NetCDFtoZarrRecipe):
     """There are multiple sequences of input files (all along the same
     dimension). Different variables live in different files.

+    :param processed_input_urls: The inputs already used to generate the existing dataset.
     :param input_pattern: A pattern used to generate the input file names.
     """

+    processed_input_urls: Iterable[str] = field(repr=False, default_factory=list)
     input_pattern: VariableSequencePattern = field(default_factory=VariableSequencePattern)

     def __post_init__(self):
         super().__post_init__()
         self._variables = self.input_pattern.keys["variable"]
-        self.processed_input_nb = 0  # TODO
-        self._inputs = {k: {"url": v, "processed": False} for k, v in self.input_pattern}
+        self.processed_input_nb = len(self.processed_input_urls) // len(self._variables)
+        self._inputs = {
+            k: {"url": v, "processed": v in self.processed_input_urls}
+            for k, v in self.input_pattern
+        }
         # input keys are tuples like
         # ("temp", 0)
         # ("temp", 1)
diff --git a/tests/test_recipe.py b/tests/test_recipe.py
index 8ebf3ee2..d9731822 100644
--- a/tests/test_recipe.py
+++ b/tests/test_recipe.py
@@ -200,3 +200,42 @@

     ds_target = xr.open_zarr(tmp_target.get_mapper(), consolidated=True).compute()
     assert ds_target.identical(daily_xarray_dataset)
+
+
+def test_NetCDFtoZarrMultiVarSequentialRecipeIncremental(
+    daily_xarray_dataset, netcdf_local_paths_by_variable, tmp_target, tmp_cache
+):
+    paths, items_per_file, fnames_by_variable, path_format = netcdf_local_paths_by_variable
+    pattern1 = VariableSequencePattern(
+        path_format, keys={"variable": ["foo", "bar"], "n": list(range(len(paths) // 4))}
+    )
+    r1 = recipe.NetCDFtoZarrMultiVarSequentialRecipe(
+        input_pattern=pattern1,
+        sequence_dim="time",
+        inputs_per_chunk=1,
+        nitems_per_input=items_per_file,
+        target=tmp_target,
+        input_cache=tmp_cache,
+    )
+
+    processed_input_urls = [v for k, v in pattern1]
+    pattern2 = VariableSequencePattern(
+        path_format, keys={"variable": ["foo", "bar"], "n": list(range(len(paths) // 2))}
+    )
+    r2 = recipe.NetCDFtoZarrMultiVarSequentialRecipe(
+        processed_input_urls=processed_input_urls,
+        input_pattern=pattern2,
+        sequence_dim="time",
+        inputs_per_chunk=1,
+        nitems_per_input=items_per_file,
+        target=tmp_target,
+        input_cache=tmp_cache,
+    )
+    # check that r2 needs r1 to be executed first
+    with pytest.raises(FileNotFoundError):
+        _manually_execute_recipe(r2)
+    _manually_execute_recipe(r1)
+    _manually_execute_recipe(r2)
+
+    ds_target = xr.open_zarr(tmp_target.get_mapper(), consolidated=True).compute()
+    assert ds_target.identical(daily_xarray_dataset)
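
--
A note on the mechanism both commits share: appending works by growing the target arrays along sequence_dim (expand_target_dim) and then writing each new chunk into its own slice via ds_chunk.to_zarr(target_mapper, region=write_region) in _store_chunk, while inputs flagged "processed" are replayed as dry runs. Below is a minimal standalone sketch of that resize-then-region-write cycle in plain xarray and zarr. It is not pangeo_forge code: the store path and variable names are invented, and it assumes an xarray new enough to support region writes (>= 0.16.2) plus the _ARRAY_DIMENSIONS attribute that xarray records in zarr v2 stores.

    import numpy as np
    import xarray as xr
    import zarr

    store = "append_demo.zarr"  # hypothetical local path; a target mapper works the same way
    ds = xr.Dataset(
        {"temp": ("time", np.arange(10.0))},
        coords={"time": np.arange(10)},
    )

    # Initial run: write the first five items, chunked along sequence_dim ("time").
    ds.isel(time=slice(0, 5)).chunk({"time": 5}).to_zarr(store, mode="w")

    # Incremental run, step 1: grow every array that carries the "time"
    # dimension, analogous to what expand_target_dim does on the zarr group.
    zgroup = zarr.open_group(store)
    for _, arr in zgroup.arrays():
        dims = arr.attrs.get("_ARRAY_DIMENSIONS", [])
        if "time" in dims:
            shape = list(arr.shape)
            shape[dims.index("time")] = 10
            arr.resize(*shape)

    # Incremental run, step 2: write only the freshly added region, analogous
    # to _store_chunk; variables lacking sequence_dim must be dropped first.
    ds.isel(time=slice(5, 10)).to_zarr(store, mode="r+", region={"time": slice(5, 10)})

The patches compute the region start as nitems_per_input * self.processed_input_nb, so nothing on disk before that offset is rewritten. That is also why test_NetCDFtoZarrMultiVarSequentialRecipeIncremental checks that running the incremental recipe before the initial one raises FileNotFoundError: the region write requires a target the first run has already built.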