From 7b09a9a30b8aa52c16830d2dd3ca41887c35a62b Mon Sep 17 00:00:00 2001 From: Dr Griffith Rees Date: Mon, 16 Sep 2024 14:55:35 +0100 Subject: [PATCH] refactor: `crop` methods in `resample.py` to classes in `crop.py` --- python/clim_recal/config.py | 75 ++- python/clim_recal/crop.py | 837 ++++++++++++++++++++++++++++++ python/clim_recal/pipeline.py | 6 +- python/clim_recal/resample.py | 745 +++++++++++++------------- python/clim_recal/utils/core.py | 16 + python/clim_recal/utils/data.py | 6 + python/clim_recal/utils/xarray.py | 32 +- python/conftest.py | 7 +- python/tests/test_crop.py | 109 ++++ python/tests/test_resample.py | 143 +++-- python/tests/test_utils_xarray.py | 14 +- 11 files changed, 1523 insertions(+), 467 deletions(-) create mode 100644 python/clim_recal/crop.py create mode 100644 python/tests/test_crop.py diff --git a/python/clim_recal/config.py b/python/clim_recal/config.py index 2d5ef35f..570bc808 100644 --- a/python/clim_recal/config.py +++ b/python/clim_recal/config.py @@ -8,17 +8,23 @@ from osgeo import gdal from tqdm import TqdmExperimentalWarning, tqdm +from .crop import CPMRegionCropManager, HADsRegionCropManager from .debiasing.debias_wrapper import BaseRunConfig, RunConfig, RunConfigType from .resample import ( - CPM_OUTPUT_LOCAL_PATH, - HADS_OUTPUT_LOCAL_PATH, RAW_CPM_PATH, RAW_HADS_PATH, CPMResamplerManager, HADsResamplerManager, ) from .utils.core import console, product_dict, results_path -from .utils.data import MethodOptions, RegionOptions, RunOptions, VariableOptions +from .utils.data import ( + CPM_OUTPUT_PATH, + HADS_OUTPUT_PATH, + MethodOptions, + RegionOptions, + RunOptions, + VariableOptions, +) warnings.filterwarnings("ignore", category=TqdmExperimentalWarning) @@ -104,8 +110,8 @@ class ClimRecalConfig(BaseRunConfig): output_path: PathLike = DEFAULT_OUTPUT_PATH resample_folder: PathLike = DEFAULT_RESAMPLE_FOLDER crops_folder: PathLike = DEFAULT_CROPS_FOLDER - hads_output_folder: PathLike = HADS_OUTPUT_LOCAL_PATH - cpm_output_folder: PathLike = CPM_OUTPUT_LOCAL_PATH + hads_output_folder: PathLike = HADS_OUTPUT_PATH + cpm_output_folder: PathLike = CPM_OUTPUT_PATH cpm_kwargs: dict = field(default_factory=dict) hads_kwargs: dict = field(default_factory=dict) resample_start_index: int = 0 @@ -228,28 +234,63 @@ def __post_init__(self) -> None: input_paths=self.cpm_input_path, variables=self.variables, runs=self.runs, - resample_paths=self.resample_cpm_path, - crop_paths=self.crops_path, - resample_start_index=self.resample_start_index, - resample_stop_index=self.resample_stop_index, - crop_start_index=self.crop_start_index, - crop_stop_index=self.crop_stop_index, + # resample_paths=self.resample_cpm_path, + output_paths=self.resample_cpm_path, + # crop_paths=self.crops_path, + start_index=self.resample_start_index, + stop_index=self.resample_stop_index, + # resample_start_index=self.resample_start_index, + # resample_stop_index=self.resample_stop_index, + # crop_start_index=self.crop_start_index, + # crop_stop_index=self.crop_stop_index, **self.cpm_kwargs, ) self.set_cpm_for_coord_alignment() self.hads_manager = HADsResamplerManager( input_paths=self.hads_input_path, variables=self.variables, - resample_paths=self.resample_hads_path, - crop_paths=self.crops_path, - resample_start_index=self.resample_start_index, - resample_stop_index=self.resample_stop_index, - crop_start_index=self.crop_start_index, - crop_stop_index=self.crop_stop_index, + # resample_paths=self.resample_hads_path, + output_paths=self.resample_hads_path, + # crop_paths=self.crops_path, + start_index=self.resample_start_index, + stop_index=self.resample_stop_index, + # resample_start_index=self.resample_start_index, + # resample_stop_index=self.resample_stop_index, + # crop_start_index=self.crop_start_index, + # crop_stop_index=self.crop_stop_index, cpm_for_coord_alignment=self.cpm_for_coord_alignment, cpm_for_coord_alignment_path_converted=self.cpm_for_coord_alignment_path_converted, **self.hads_kwargs, ) + self.cpm_crops_manager = CPMRegionCropManager( + input_paths=self.cpm_output_folder, + variables=self.variables, + runs=self.runs, + # resample_paths=self.resample_cpm_path, + output_paths=self.cropped_cpm_path, + # crop_paths=self.crops_path, + start_index=self.crop_start_index, + stop_index=self.crop_stop_index, + # resample_start_index=self.resample_start_index, + # resample_stop_index=self.resample_stop_index, + # crop_start_index=self.crop_start_index, + # crop_stop_index=self.crop_stop_index, + **self.cpm_kwargs, + ) + self.hads_crops_manager = HADsRegionCropManager( + input_paths=self.hads_output_folder, + variables=self.variables, + # resample_paths=self.resample_cpm_path, + output_paths=self.cropped_hads_path, + # crop_paths=self.crops_path, + start_index=self.crop_start_index, + stop_index=self.crop_stop_index, + # resample_start_index=self.resample_start_index, + # resample_stop_index=self.resample_stop_index, + # crop_start_index=self.crop_start_index, + # crop_stop_index=self.crop_stop_index, + **self.cpm_kwargs, + ) self.total_cpus: int | None = cpu_count() if self.cpus == None or (self.total_cpus and self.cpus >= self.total_cpus): self.cpus = 1 if not self.total_cpus else self.total_cpus - 1 diff --git a/python/clim_recal/crop.py b/python/clim_recal/crop.py new file mode 100644 index 00000000..7ec806ae --- /dev/null +++ b/python/clim_recal/crop.py @@ -0,0 +1,837 @@ +from dataclasses import dataclass, field +from datetime import date +from logging import getLogger +from os import PathLike, cpu_count +from pathlib import Path +from typing import Any, Iterable, Iterator, Sequence + +from tqdm.rich import trange +from xarray import Dataset +from xarray.core.types import T_Dataset + +from .resample import ( + RAW_CPM_TASMAX_PATH, + RAW_HADS_TASMAX_PATH, + RESAMPLING_OUTPUT_PATH, + ResamplerBase, + ResamplerManagerBase, +) +from .utils.core import console, multiprocess_execute +from .utils.data import ( + CPM_CROP_OUTPUT_PATH, + CPM_END_DATE, + CPM_OUTPUT_PATH, + CPM_START_DATE, + HADS_CROP_OUTPUT_PATH, + HADS_END_DATE, + HADS_OUTPUT_PATH, + HADS_START_DATE, + RegionOptions, + RunOptions, + VariableOptions, +) +from .utils.xarray import crop_xarray, region_crop_file_name + +logger = getLogger(__name__) + +# RESAMPLING_OUTPUT_PATH: Final[PathLike] = ( +# CLIMATE_DATA_MOUNT_PATH / "CPM-365/andys-two-gdal-step-approach/resample" +# ) +# CPM_CROP_OUTPUT_PATH: Final[Path] = Path("cpm-crop") +# HADS_CROP_OUTPUT_PATH: Final[Path] = Path("hads-crop") + + +@dataclass(kw_only=True, repr=False) +class RegionCropperBase(ResamplerBase): + """Manage resampling HADs datafiles for modelling. + + Attributes + ---------- + input_path + `Path` to `HADs` files to process. + output + `Path` to save processed `HADS` files. + input_files + `Path` or `Paths` of `NCF` files to resample. + crop + Path or file to spatially crop `input_files` with. + start_index + First index of file to iterate processing from. + stop_index + Last index of files to iterate processing from as a count from `resample_start_index`. + If `None`, this will simply iterate over all available files. + """ + + crop_region: RegionOptions | str | None = RegionOptions.GLASGOW + crop_path: PathLike = RESAMPLING_OUTPUT_PATH + + def range_crop_projection( + self, + start: int | None = None, + stop: int | None = None, + step: int = 1, + override_export_path: Path | None = None, + return_results: bool = False, + # possible meanse of reducing memory issues by removing + # xarray instance while keeping paths for logging purposes + # delete_xarray_after_save: bool = True, + **kwargs, + ) -> Iterator[Path]: + start = start or self.stop_index + stop = stop or self.stop_index + self._export_paths: list[Path | T_Dataset] = [] + if stop is None: + stop = len(self) + console.print(f"Cropping to '{self.output_path}'") + for index in trange(start, stop, step): + self._export_paths.append( + self.crop_projection( + # region=region, + index=index, + override_export_path=override_export_path, + return_results=return_results, + **kwargs, + ) + ) + yield self._export_paths[-1] + # return export_paths + + def crop_projection( + self, + index: int = 0, + override_export_path: Path | None = None, + return_results: bool = False, + sync_reprojection_paths: bool = True, + **kwargs, + ) -> Path | T_Dataset: + """Crop a projection to `region` geometry.""" + console.log(f"Preparing to crop `_reprojected_paths` index {index} from {self}") + try: + assert hasattr(self, "_reprojected_paths") + except AssertionError: + if sync_reprojection_paths: + self._sync_reprojected_paths() + else: + raise AttributeError( + f"'_reprojected_paths' must be set. " + "Run after 'self.to_reprojection()' or set as a " + "list directly." + ) + try: + assert self.crop_region in RegionOptions + except AttributeError: + raise ValueError( + f"'{self.crop_path}' not in 'RegionOptions': {RegionOptions.all()}" + ) + path: PathLike = override_export_path or Path(self.crop_path) # / (region) + path.mkdir(exist_ok=True, parents=True) + resampled_xr: Dataset = self._reprojected_paths[index] + console.log(f"From {self} crop {resampled_xr}") + cropped: Dataset = crop_xarray( + xr_time_series=resampled_xr, + crop_box=RegionOptions.bounding_box(self.crop_region), + **kwargs, + ) + cropped_file_name: str = region_crop_file_name( + self.crop_region, resampled_xr.name + ) + export_path: Path = path / cropped_file_name + cropped.to_netcdf(export_path) + if not hasattr(self, "_cropped_paths"): + self._cropped_paths: list[PathLike] = [] + self._cropped_paths.append(export_path) + if return_results: + return cropped + else: + return export_path + + # def execute_crops(self, skip_crop: bool = False, **kwargs) -> list[Path] | None: + # """Run all specified crops.""" + # return self.range_crop_projection(**kwargs) if not skip_crop else None + + def execute(self, skip_crop: bool = False, **kwargs) -> Iterator[Path] | None: + """Run all specified crops.""" + return self.range_crop_projection(**kwargs) if not skip_crop else None + + +@dataclass(kw_only=True, repr=False) +class HADsRegionCropper(RegionCropperBase): + """Manage resampling HADs datafiles for modelling. + + Attributes + ---------- + input_path + `Path` to `HADs` files to process. + output + `Path` to save processed `HADS` files. + input_files + `Path` or `Paths` of `NCF` files to resample. + crop + Path or file to spatially crop `input_files` with. + start_index + First index of file to iterate processing from. + stop_index + Last index of files to iterate processing from as a count from `resample_start_index`. + If `None`, this will simply iterate over all available files. + + Examples + -------- + >>> if not is_data_mounted: + ... pytest.skip(mount_doctest_skip_message) + >>> resample_test_hads_output_path: Path = getfixture( + ... 'resample_test_hads_output_path') + >>> hads_cropper: HADsResampler = HADsResampler( # doctest: +SKIP + ... output_path=resample_test_hads_output_path, + ... ) + >>> hads_cropper # doctest: +SKIP + + >>> pprint(hads_resampler.input_files) # doctest: +SKIP + (...Path('.../tasmax/day/tasmax_hadukgrid_uk_1km_day_19800101-19800131.nc'), + ...Path('.../tasmax/day/tasmax_hadukgrid_uk_1km_day_19800201-19800229.nc'), + ..., + ...Path('.../tasmax/day/tasmax_hadukgrid_uk_1km_day_20211201-20211231.nc')) + """ + + input_path: PathLike | None = RAW_HADS_TASMAX_PATH + crop_path: PathLike = RESAMPLING_OUTPUT_PATH / HADS_CROP_OUTPUT_PATH + + +@dataclass(kw_only=True, repr=False) +class CPMRegionCropper(RegionCropperBase): + """Manage resampling HADs datafiles for modelling. + + Attributes + ---------- + input_path + `Path` to `HADs` files to process. + output + `Path` to save processed `HADS` files. + input_files + `Path` or `Paths` of `NCF` files to resample. + crop + Path or file to spatially crop `input_files` with. + start_index + First index of file to iterate processing from. + stop_index + Last index of files to iterate processing from as a count from `resample_start_index`. + If `None`, this will simply iterate over all available files. + + Examples + -------- + >>> if not is_data_mounted: + ... pytest.skip(mount_doctest_skip_message) + >>> resample_test_hads_output_path: Path = getfixture( + ... 'resample_test_hads_output_path') + >>> hads_cropper: HADsResampler = HADsResampler( # doctest: +SKIP + ... output_path=resample_test_hads_output_path, + ... ) + >>> hads_cropper # doctest: +SKIP + + >>> pprint(hads_resampler.input_files) # doctest: +SKIP + (...Path('.../tasmax/day/tasmax_hadukgrid_uk_1km_day_19800101-19800131.nc'), + ...Path('.../tasmax/day/tasmax_hadukgrid_uk_1km_day_19800201-19800229.nc'), + ..., + ...Path('.../tasmax/day/tasmax_hadukgrid_uk_1km_day_20211201-20211231.nc')) + """ + + input_path: PathLike | None = RAW_CPM_TASMAX_PATH + crop_path: PathLike = RESAMPLING_OUTPUT_PATH / CPM_CROP_OUTPUT_PATH + + +@dataclass(kw_only=True, repr=False) +class RegionCropperManagerBase(ResamplerManagerBase): + """Base class to inherit for `HADs` and `CPM` resampler managers.""" + + # input_paths: PathLike | Sequence[PathLike] = Path() + # resample_paths: PathLike | Sequence[PathLike] = Path() + output_paths: PathLike | Sequence[PathLike] = Path() + # variables: Sequence[VariableOptions | str] = (VariableOptions.default(),) + crop_regions: tuple[RegionOptions | str, ...] = RegionOptions.all() + # crop_paths: Sequence[PathLike] | PathLike = Path() + # sub_path: Path = Path() + # start_index: int = 0 + # stop_index: int | None = None + # crop_start_index: int = 0 + # crop_stop_index: int | None = None + # start_date: date | None = None + # end_date: date | None = None + configs: list[HADsRegionCropper | CPMRegionCropper] = field(default_factory=list) + config_default_kwargs: dict[str, Any] = field(default_factory=dict) + # resampler_class: type[HADsResampler | CPMResampler] | None = None + calc_class: type[HADsRegionCropper | CPMRegionCropper] | None = None + # cpus: int | None = None + # _input_path_dict: dict[Path, str] = field(default_factory=dict) + # _output_path_dict: dict[PathLike, VariableOptions | str] = field( + # default_factory=dict + # ) + # _output_path_dict: dict[PathLike, VariableOptions | str] = field( + # default_factory=dict + # ) + # _strict_fail_if_var_in_input_path: bool = True + # _allow_check_fail: bool = False + _raw_input_path_dict: dict[Path, VariableOptions | str] = field( + default_factory=dict + ) + + # class VarirableInBaseImportPathError(Exception): + # """Checking import path validity for `self.variables`.""" + # + # pass + + def __post_init__(self) -> None: + """Populate config attributes.""" + if not self.crop_regions: + self.crop_regions = () + self.check_paths() + self.total_cpus: int | None = cpu_count() + if not self.cpus: + self.cpus = 1 if not self.total_cpus else self.total_cpus + # self.cpm_for_coord_alignment: T_Dataset | PathLike = RAW_CPM_TASMAX_PATH + + # + # @property + # def input_folder(self) -> Path | None: + # """Return `self._input_path` set by `set_input_paths()`.""" + # if hasattr(self, "_input_path"): + # return Path(self._input_path) + # else: + # return None + + # @property + # def resample_folder(self) -> Path | None: + # """Return `self._output_path` set by `set_resample_paths()`.""" + # if hasattr(self, "_output_path"): + # return Path(self._output_path) + # else: + # return None + + # @property + # def output_folder(self) -> Path | None: + # """Return `self._output_path` set by `set_resample_paths()`.""" + # if hasattr(self, "_output_path"): + # return Path(self._output_path) + # else: + # return None + + @property + def crop_folder(self) -> Path | None: + """Return `self._output_path` set by `set_resample_paths()`.""" + if hasattr(self, "_crop_path"): + return Path(self._crop_path) + else: + return None + + def __repr__(self) -> str: + """Summary of `self` configuration as a `str`.""" + return ( + f"<{self.__class__.__name__}(" + f"variables_count={len(self.variables)}, " + f"input_paths_count={len(self.input_paths) if isinstance(self.input_paths, Sequence) else 1})>" + ) + + # def _gen_output_folder_paths( + # self, + # path: PathLike, + # append_input_path_dict: bool = False, + # append_output_path_dict: bool = False, + # ) -> Iterator[tuple[Path, Path]]: + # """Yield paths of resampled `self.variables` and `self.runs`.""" + # for var in self.variables: + # input_path: Path = Path(path) / var / self.sub_path + # resample_path: Path = Path(path) / var + # if append_input_path_dict: + # self._input_path_dict[input_path] = var + # if append_output_path_dict: + # self._output_path_dict[resample_path] = var + # yield input_path, resample_path + + def check_paths( + self, run_set_input_paths: bool = True, run_set_output_paths: bool = True + ): + """Check and set `input`, `resample` and `crop` paths.""" + if run_set_input_paths: + self.set_input_paths() + + # if run_set_data_paths: + # self.set_resample_paths() + if run_set_output_paths: + self.set_output_paths() + assert isinstance(self.input_paths, Iterable) + # assert isinstance(self.resample_paths, Iterable) + if self.output_paths: + try: + assert isinstance(self.output_paths, Iterable) + except AssertionError: + raise ValueError( + f"'output_paths' not iterable for {self}. Hint: try setting 'run_set_output_paths' to 'True'." + ) + # assert len(self.input_paths) == len(self.resample_paths) + for path in self.input_paths: + try: + assert Path(path).exists() + assert Path(path).is_dir() + except AssertionError: + message: str = ( + f"One of 'self.input_paths' in {self} not valid: '{path}'" + ) + if self._allow_check_fail: + logger.error(message) + else: + raise FileExistsError(message) + try: + assert path in self._input_path_dict + except AssertionError: + NotImplementedError( + f"Syncing `self._input_path_dict` with changes to `self.input_paths`." + ) + + def set_input_paths(self): + """Propagate `self.input_paths` if needed.""" + if isinstance(self.input_paths, PathLike): + self._input_path = self.input_paths + self.input_paths = tuple( + input_path + for input_path, _ in self._gen_input_folder_paths( + self.input_paths, + append_input_path_dict=True, + append_raw_input_path_dict=True, + ) + ) + if self._strict_fail_if_var_in_input_path: + for var in self.variables: + try: + assert var not in str(self._input_path) + except AssertionError: + raise self.VarirableInBaseImportPathError( + f"Folder named '{var}' in self._input_path: " + f"'{self._input_path}'. Try passing a parent path or " + f"set '_strict_fail_if_var_in_input_path' to 'False'." + ) + + # + # def set_resample_paths(self): + # """Propagate `self.resample_paths` if needed.""" + # self._set_input_paths() + # if isinstance(self.resample_paths, PathLike): + # self._output_path = self.resample_paths + # self.resample_paths = tuple( + # resample_path + # for _, resample_path in self._gen_output_folder_paths( + # self.resample_paths, append_output_path_dict=True + # ) + # ) + def _gen_output_folder_paths( + self, + path: PathLike, + append_output_path_dict: bool = False, + ) -> Iterator[Path | None]: + """Return a Generator of paths of `self.variables` and `self.crops`.""" + raise NotImplementedError + + def _gen_input_folder_paths( + self, + path: PathLike, + append_input_path_dict: bool = False, + # append_output_path_dict: bool = False, + append_raw_input_path_dict: bool = False, + ) -> Iterator[tuple[Path, Path]]: + raise NotImplementedError + + def set_output_paths(self) -> None: + """Propagate `self.resample_paths` if needed.""" + if isinstance(self.output_paths, PathLike): + self._output_paths = self.output_paths + self.output_paths = tuple( + self._gen_output_folder_paths( + self.output_paths, + append_output_path_dict=True, # append_raw_input_path_dict=True + ) + ) + + # def yield_configs(self) -> Iterable[ResamplerBase]: + # """Generate a `CPMResampler` or `HADsResampler` for `self.input_paths`.""" + # self.check_paths() + # assert isinstance(self.resample_paths, Iterable) + # for index, var_path in enumerate(self._input_path_dict.items()): + # yield self.calc_class( + # input_path=var_path[0], + # output_path=self.resample_paths[index], + # variable_name=var_path[1], + # start_index=self.start_index, + # stop_index=self.stop_index, + # **self.config_default_kwargs, + # ) + # + def yield_crop_configs(self) -> Iterable[ResamplerBase]: + """Generate a `CPMResampler` or `HADsResampler` for `self.input_paths`.""" + self.check_paths() + try: + assert isinstance(self.input_paths, Iterable) + assert isinstance(self.output_paths, Iterable) + except AssertionError as error: + raise error + # assert isinstance(self.resample_paths, Iterable) + # assert isinstance(self.output_paths, Iterable) + for index, input_paths in enumerate(self._input_path_dict.items()): + for crop_path, region in self._output_path_dict.items(): + yield self.calc_class( + input_path=input_paths[0], + # output_path=self.resample_paths[index], + output_path=crop_path, + variable_name=input_resample_paths[1], + start_index=self.start_index, + stop_index=self.stop_index, + # crop_path=crop_path, + # Todo: remove below if single crop configs iterate over all + # crop_regions=self.crop_regions, + # crop_regions=(region,), + crop_region=region, + **self.config_default_kwargs, + ) + + # def __len__(self) -> int: + # """Return the length of `self.input_files`.""" + # return ( + # len(self.input_paths[self.start_index : self.stop_index]) + # if isinstance(self.input_paths, Sequence) + # else 0 + # ) + + # @property + # def max_count(self) -> int: + # """Maximum length of `self.input_files` ignoring `start_index` and `start_index`.""" + # return len(self.input_paths) if isinstance(self.input_paths, Sequence) else 0 + + # def __iter__(self) -> Iterator[Path] | None: + # if isinstance(self.input_paths, Sequence): + # for file_path in self.input_paths[ + # self.start_index : self.stop_index + # ]: + # yield Path(file_path) + # else: + # return None + + # def __getitem__(self, key: int | slice) -> Path | tuple[Path, ...] | None: + # if not self.input_paths or not isinstance(self.input_paths, Sequence): + # return None + # elif isinstance(key, int): + # return Path(self.input_paths[key]) + # elif isinstance(key, slice): + # return tuple(Path(path) for path in self.input_paths[key]) + # else: + # raise IndexError(f"Can only index with 'int', not: '{key}'") + # # + # def execute_resample_configs( + # self, multiprocess: bool = False, cpus: int | None = None + # ) -> tuple[ResamplerBase, ...]: + # """Run all resampler configurations + # + # Parameters + # ---------- + # multiprocess + # If `True` run parameters in `resample_configs` with `multiprocess_execute`. + # cpus + # Number of `cpus` to pass to `multiprocess_execute`. + # """ + # resamplers: tuple[ResamplerBase, ...] = tuple(self.yield_configs()) + # results: list[list[Path] | None] = [] + # if multiprocess: + # cpus = cpus or self.cpus + # if self.total_cpus and cpus: + # cpus = min(cpus, self.total_cpus - 1) + # results = multiprocess_execute(resamplers, method_name="execute", cpus=cpus) + # else: + # for resampler in resamplers: + # print(resampler) + # results.append(resampler.execute()) + # return resamplers + + def execute_configs( + self, multiprocess: bool = False, cpus: int | None = None + ) -> tuple[ResamplerBase, ...]: + """Run all resampler configurations + + Parameters + ---------- + multiprocess + If `True` run parameters in `resample_configs` with `multiprocess_execute`. + cpus + Number of `cpus` to pass to `multiprocess_execute`. + """ + croppers: tuple[ResamplerBase, ...] = tuple(self.yield_crop_configs()) + results: list[list[Path] | None] = [] + if multiprocess: + cpus = cpus or self.cpus + if self.total_cpus and cpus: + cpus = min(cpus, self.total_cpus - 1) + results = multiprocess_execute(croppers, method_name="execute", cpus=cpus) + else: + for cropper in croppers: + print(cropper) + results.append(cropper.execute()) + return croppers + + +@dataclass(kw_only=True, repr=False) +class HADsRegionCropManager(RegionCropperManagerBase): + """Class to manage processing HADs resampling. + + Attributes + ---------- + input_paths + `Path` or `Paths` to `CPM` files to process. If `Path`, will be propegated with files matching + resample_paths + `Path` or `Paths` to to save processed `CPM` files to. If `Path` will be propagated to match `input_paths`. + variables + Which `VariableOptions` to include. + crop_regions + `RegionOptions` (like Manchester, Scotland etc.) to crop results to. + crop_paths + Where to save region crop files. + sub_path + `Path` to include at the stem of `input_paths`. + start_index + Index to begin iterating input files for `resampling` or `cropping`. + stop_index + Index to to run from `start_index` to when `resampling` or + `cropping`. If `None`, iterate full list of paths. + start_date + Not yet implemented, but in future from what date to generate start index from. + end_date + Not yet implemented, but in future from what date to generate stop index from. + configs + List of `HADsResampler` instances to iterate `resampling` or `cropping`. + config_default_kwargs + Parameters passed to all running `self.configs`. + calc_class + `class` to construct all `self.configs` instances with. + cpus + Number of `cpu` cores to use during multiprocessing. + cpm_for_coord_alignment + `CPM` `Path` or `Dataset` to match alignment with. + cpm_for_coord_alignment_path_converted + Whether a `Path` passed to `cpm_for_coord_alignment` should be processed. + + Examples + -------- + >>> if not is_data_mounted: + ... pytest.skip(mount_doctest_skip_message) + >>> resample_test_hads_output_path: Path = getfixture( + ... 'resample_test_hads_output_path') + >>> hads_resampler_manager: HADsResamplerManager = HADsResamplerManager( + ... variables=VariableOptions.all(), + ... output_paths=resample_test_hads_output_path, + ... ) + >>> hads_resampler_manager + + """ + + input_paths: PathLike | Sequence[PathLike] = HADS_OUTPUT_PATH + # resample_paths: PathLike | Sequence[PathLike] = ( + # RESAMPLING_OUTPUT_PATH / HADS_OUTPUT_PATH + # ) + output_paths: Sequence[PathLike] | PathLike = ( + RESAMPLING_OUTPUT_PATH / HADS_CROP_OUTPUT_PATH + ) + # sub_path: Path = Path() + start_date: date = HADS_START_DATE + end_date: date = HADS_END_DATE + configs: list[HADsRegionCropper] = field(default_factory=list) + config_default_kwargs: dict[str, Any] = field(default_factory=dict) + calc_class: type[HADsRegionCropper] = HADsRegionCropper + # cpm_for_coord_alignment: T_Dataset | PathLike = RAW_CPM_TASMAX_PATH + # cpm_for_coord_alignment_path_converted: bool = False + + def _gen_input_folder_paths( + self, + path: PathLike, + append_input_path_dict: bool = False, + append_raw_input_path_dict: bool = False, + # append_output_path_dict: bool = False, + ) -> Iterator[tuple[Path, Path]]: + """Yield paths of resampled `self.variables` and `self.runs`.""" + for var in self.variables: + raw_input_path: Path = Path(path) / var / self.sub_path + input_path: Path = Path(path) / var + if append_input_path_dict: + self._input_path_dict[input_path] = var + if append_raw_input_path_dict: + self._raw_input_path_dict[raw_input_path] = var + # if append_output_path_dict: + # self._output_path_dict[output_path] = var + yield input_path, raw_input_path + + def _gen_output_folder_paths( + self, path: PathLike, append_output_path_dict: bool = False + ) -> Iterator[Path | None]: + """Return a Generator of paths of `self.variables` and `self.crops`.""" + if not self.crop_regions: + return None + if not self.input_paths: + self._gen_input_folder_paths( + self.input_paths, + append_input_path_dict=True, + # Assuming crop paths need to take over + # append_output_path_dict=True + ) + # if not self._output_path_dict: + # self._gen_output_folder_paths( + # self.input_paths, + # append_input_path_dict=True, + # append_output_path_dict=True, + # ) + for var in self.variables: + for region in self.crop_regions: + crop_path = Path(path) / HADS_OUTPUT_PATH / region / var + if append_output_path_dict: + self._output_path_dict[crop_path] = region + yield crop_path + + +@dataclass(kw_only=True, repr=False) +class CPMRegionCropManager(RegionCropperManagerBase): + """Class to manage processing CPM resampling. + + Attributes + ---------- + input_paths + `Path` or `Paths` to `CPM` files to process. If `Path`, will be propegated with files matching + resample_paths + `Path` or `Paths` to to save processed `CPM` files to. If `Path` will be propagated to match `input_paths`. + variables + Which `VariableOptions` to include. + runs + Which `RunOptions` to include. + crop_regions + `RegionOptions` (like Manchester, Scotland etc.) to crop results to. + crop_paths + Where to save region crop files. + sub_path + `Path` to include at the stem of `input_paths`. + start_index + Index to begin iterating input files for `resampling` or `cropping`. + stop_index + Index to to run from `start_index` to when `resampling` or + `cropping`. If `None`, iterate full list of paths. + start_date + Not yet implemented, but in future from what date to generate start index from. + end_date + Not yet implemented, but in future from what date to generate stop index from. + configs + List of `HADsResampler` instances to iterate `resampling` or `cropping`. + config_default_kwargs + Parameters passed to all running `self.configs`. + calc_class + `class` to construct all `self.configs` instances with. + cpus + Number of `cpu` cores to use during multiprocessing. + + + Examples + -------- + >>> if not is_data_mounted: + ... pytest.skip(mount_doctest_skip_message) + >>> resample_test_cpm_output_path: Path = getfixture( + ... 'resample_test_cpm_output_path') + >>> cpm_resampler_manager: CPMResamplerManager = CPMResamplerManager( + ... stop_index=9, + ... resample_paths=resample_test_cpm_output_path, + ... output_paths=resample_test_cpm_output_path, + ... ) + >>> cpm_resampler_manager + + >>> configs: tuple[CPMResampler, ...] = tuple( + ... cpm_resampler_manager.yield_configs()) + >>> pprint(configs) + (, + , + , + ) + """ + + input_paths: PathLike | Sequence[PathLike] = ( + RESAMPLING_OUTPUT_PATH / CPM_OUTPUT_PATH + ) + # resample_paths: PathLike | Sequence[PathLike] = ( + # RESAMPLING_OUTPUT_PATH / CPM_OUTPUT_PATH + # ) + output_paths: PathLike | Sequence[PathLike] = ( + RESAMPLING_OUTPUT_PATH / CPM_CROP_OUTPUT_PATH + ) + # sub_path: Path = CPM_SUB_PATH + start_date: date = CPM_START_DATE + end_date: date = CPM_END_DATE + configs: list[CPMRegionCropper] = field(default_factory=list) + calc_class: type[CPMRegionCropper] = CPMRegionCropper + # Runs are CPM simulations, not applicalbe to HADs + runs: Sequence[RunOptions | str] = RunOptions.preferred() + # crop_paths = RESAMPLING_OUTPUT_PATH / CPM_CROP_OUTPUT_PATH + + def _gen_input_folder_paths( + self, + path: PathLike, + append_input_path_dict: bool = False, + # append_output_path_dict: bool = False, + append_raw_input_path_dict: bool = False, + cpm_paths: bool = True, + ) -> Iterator[tuple[Path, Path]]: + """Return a Generator of paths of `self.variables` and `self.runs`.""" + for var in self.variables: + for run_type in self.runs: + if cpm_paths: + input_raw_path: Path = ( + Path(path) + / VariableOptions.cpm_value(var) + / run_type + / self.sub_path + ) + input_path: Path = ( + Path(path) / VariableOptions.cpm_value(var) / run_type + ) + else: + input_raw_path = Path(path) / var / run_type / self.sub_path + input_path = Path(path) / var / run_type + if append_input_path_dict: + self._input_path_dict[input_path] = var + if append_raw_input_path_dict: + self._raw_input_path_dict[input_raw_path] = var + yield input_path, input_raw_path + + def _gen_output_folder_paths( + self, + path: PathLike, + append_output_path_dict: bool = False, + cpm_paths: bool = True, + ) -> Iterator[Path]: + """Return a Generator of paths of `self.variables` and `self.crops`.""" + for var in self.variables: + for region in self.crop_regions: + for run_type in self.runs: + if cpm_paths: + crop_path: Path = ( + Path(path) + / CPM_OUTPUT_PATH + / region + / VariableOptions.cpm_value(var) + / run_type + ) + else: + crop_path: Path = ( + Path(path) / CPM_OUTPUT_PATH / region / var / run_type + ) + if append_output_path_dict: + self._output_path_dict[crop_path] = region + yield crop_path diff --git a/python/clim_recal/pipeline.py b/python/clim_recal/pipeline.py index 1de25001..ecd2aaa4 100644 --- a/python/clim_recal/pipeline.py +++ b/python/clim_recal/pipeline.py @@ -227,10 +227,10 @@ def main( Note the `_allow_check_fail` parameters support running the examples without data mounted from a server. + >>> if not is_data_mounted: + ... pytest.skip(mount_doctest_skip_message) >>> main(variables=("rainfall", "tasmin"), ... output_path=test_runs_output_path, - ... cpm_kwargs=dict(_allow_check_fail=True), - ... hads_kwargs=dict(_allow_check_fail=True), ... ) 'set_cpm_for_coord_alignment' for 'HADs' not speficied. Defaulting to 'self.cpm_input_path': '...' @@ -239,7 +239,7 @@ def main( regions_count=1, methods_count=1, cpm_folders_count=2, hads_folders_count=2, resample_start_index=0, resample_stop_index=None, - cpus=...)> + crop_star_index=0, cpus=...)> diff --git a/python/clim_recal/resample.py b/python/clim_recal/resample.py index 4eec9df1..87006eb3 100644 --- a/python/clim_recal/resample.py +++ b/python/clim_recal/resample.py @@ -10,31 +10,36 @@ from logging import getLogger from os import PathLike, cpu_count from pathlib import Path -from typing import Any, Callable, Final, Iterable, Iterator, Literal, Sequence +from typing import Any, Final, Iterable, Iterator, Literal, Sequence import dill as pickle import rioxarray # nopycln: import from osgeo.gdal import Dataset as GDALDataset from rich import print -from tqdm.rich import trange from xarray import Dataset from xarray.core.types import T_Dataset from clim_recal.debiasing.debias_wrapper import VariableOptions -from .utils.core import climate_data_mount_path, console, multiprocess_execute +from .utils.core import ( + _get_source_path, + climate_data_mount_path, + console, + multiprocess_execute, +) from .utils.data import ( CPM_END_DATE, + CPM_OUTPUT_PATH, CPM_RAW_X_COLUMN_NAME, CPM_RAW_Y_COLUMN_NAME, CPM_START_DATE, CPM_SUB_PATH, HADS_END_DATE, + HADS_OUTPUT_PATH, HADS_START_DATE, HADS_SUB_PATH, HADS_XDIM, HADS_YDIM, - RegionOptions, RunOptions, VariableOptions, ) @@ -45,10 +50,9 @@ ReprojectFuncType, apply_geo_func, cpm_reproject_with_standard_calendar, - crop_xarray, get_cpm_for_coord_alignment, hads_resample_and_reproject, - region_crop_file_name, + trange_wrapper, ) logger = getLogger(__name__) @@ -68,10 +72,10 @@ RAW_HADS_TASMAX_PATH: Final[PathLike] = RAW_HADS_PATH / "tasmax/day" RAW_CPM_TASMAX_PATH: Final[PathLike] = RAW_CPM_PATH / "tasmax/01/latest" -CPM_OUTPUT_LOCAL_PATH: Final[Path] = Path("cpm") -HADS_OUTPUT_LOCAL_PATH: Final[Path] = Path("hads") -CPM_CROP_OUTPUT_LOCAL_PATH: Final[Path] = Path("cpm-crop") -HADS_CROP_OUTPUT_LOCAL_PATH: Final[Path] = Path("hads-crop") +# CPM_OUTPUT_PATH: Final[Path] = Path(CPM_OUTPUT_PATH) +# HADS_OUTPUT_LOCAL_PATH: Final[Path] = Path(HADS_OUTPUT_PATH) +# CPM_CROP_OUTPUT_LOCAL_PATH: Final[Path] = Path("cpm-crop") +# HADS_CROP_OUTPUT_LOCAL_PATH: Final[Path] = Path("hads-crop") NETCDF_OR_TIF = Literal[TIF_EXTENSION_STR, NETCDF_EXTENSION_STR] @@ -88,7 +92,7 @@ def reproject_2_2km_filename(path: Path) -> Path: @dataclass(kw_only=True) -class ResamblerBase: +class ResamplerBase: """Base class to inherit for `HADs` and `CPM`.""" input_path: PathLike | None = Path() @@ -96,17 +100,17 @@ class ResamblerBase: variable_name: VariableOptions | str = VariableOptions.default() input_files: Iterable[PathLike] | None = None cpus: int | None = None - crop_region: RegionOptions | str | None = RegionOptions.GLASGOW - crop_path: PathLike = RESAMPLING_OUTPUT_PATH + # crop_region: RegionOptions | str | None = RegionOptions.GLASGOW + # crop_path: PathLike = RESAMPLING_OUTPUT_PATH final_crs: str = BRITISH_NATIONAL_GRID_EPSG input_file_extension: NETCDF_OR_TIF = NETCDF_EXTENSION_STR export_file_extension: NETCDF_OR_TIF = NETCDF_EXTENSION_STR input_file_x_column_name: str = "" input_file_y_column_name: str = "" - resample_start_index: int = 0 - resample_stop_index: int | None = None - crop_start_index: int = 0 - crop_stop_index: int | None = None + start_index: int = 0 + stop_index: int | None = None + # resample_start_index: int = 0 + # resample_stop_index: int | None = None def __post_init__(self) -> None: """Generate related attributes.""" @@ -118,8 +122,8 @@ def __post_init__(self) -> None: ) self.set_input_files() Path(self.output_path).mkdir(parents=True, exist_ok=True) - if self.crop_region: - Path(self.crop_path).mkdir(parents=True, exist_ok=True) + # if self.crop_region: + # Path(self.crop_path).mkdir(parents=True, exist_ok=True) self.total_cpus: int | None = cpu_count() if not self.cpus: self.cpus = 1 if not self.total_cpus else self.total_cpus @@ -127,21 +131,19 @@ def __post_init__(self) -> None: def __len__(self) -> int: """Return the length of `self.input_files`.""" return ( - len(self.input_files[self.resample_start_index : self.resample_stop_index]) + len(self.input_files[self.start_index : self.stop_index]) if isinstance(self.input_files, Sequence) else 0 ) @property def max_count(self) -> int: - """Maximum length of `self.input_files` ignoring `resample_start_index` and `resample_start_index`.""" + """Maximum length of `self.input_files` ignoring `start_index` and `start_index`.""" return len(self.input_files) if isinstance(self.input_files, Sequence) else 0 def __iter__(self) -> Iterator[Path] | None: if self.input_files and isinstance(self.input_files, Sequence): - for file_path in self.input_files[ - self.resample_start_index : self.resample_stop_index - ]: + for file_path in self.input_files[self.start_index : self.stop_index]: yield Path(file_path) else: return None @@ -199,27 +201,27 @@ def _output_path( path.mkdir(exist_ok=True, parents=True) return path - def _range_call( - self, - method: Callable, - start: int, - stop: int | None, - step: int, - override_export_path: Path | None = None, - source_to_index: Iterable | None = None, - ) -> list[Path | T_Dataset]: - export_paths: list[Path | T_Dataset] = [] - if stop is None: - stop = len(self) - for index in trange(start, stop, step): - export_paths.append( - method( - index=index, - override_export_path=override_export_path, - source_to_index=source_to_index, - ) - ) - return export_paths + # def _range_call( + # self, + # method: Callable, + # start: int, + # stop: int | None, + # step: int, + # override_export_path: Path | None = None, + # source_to_index: Iterable | None = None, + # ) -> list[Path | T_Dataset]: + # export_paths: list[Path | T_Dataset] = [] + # if stop is None: + # stop = len(self) + # for index in trange(start, stop, step): + # export_paths.append( + # method( + # index=index, + # override_export_path=override_export_path, + # source_to_index=source_to_index, + # ) + # ) + # return export_paths def range_to_reprojection( self, @@ -228,25 +230,42 @@ def range_to_reprojection( step: int = 1, override_export_path: Path | None = None, source_to_index: Sequence | None = None, - ) -> list[Path]: - start = start or self.resample_start_index - stop = stop or self.resample_stop_index - return self._range_call( - method=self.to_reprojection, + ) -> Iterator[Path]: + # start = start or self.resample_start_index + # stop = stop or self.resample_stop_index + start = start or self.start_index + stop = stop or self.stop_index + # for + # export_paths: list[Path | T_Dataset] = [] + if stop is None: + stop = len(self) + return trange_wrapper( + instance=self, + calc=self.to_reprojection, start=start, stop=stop, step=step, + default_export_path=Path(self.output_path), override_export_path=override_export_path, source_to_index=source_to_index, ) + # yield self.to_reprojection(index=start, stop=stop, step=step) + # return self._range_call( + # method=self.to_reprojection, + # start=start, + # stop=stop, + # step=step, + # override_export_path=override_export_path, + # source_to_index=source_to_index, + # ) def execute(self, skip_spatial: bool = False, **kwargs) -> list[Path] | None: """Run all steps for processing""" return self.range_to_reprojection(**kwargs) if not skip_spatial else None - def execute_crops(self, skip_crop: bool = False, **kwargs) -> list[Path] | None: - """Run all specified crops.""" - return self.range_crop_projection(**kwargs) if not skip_crop else None + # def execute_crops(self, skip_crop: bool = False, **kwargs) -> list[Path] | None: + # """Run all specified crops.""" + # return self.range_crop_projection(**kwargs) if not skip_crop else None def _sync_reprojected_paths( self, overwrite_output_path: PathLike | None = None @@ -262,89 +281,89 @@ def _sync_reprojected_paths( if local_path.is_file() and local_path.suffix == f".{NETCDF_EXTENSION_STR}" ] - def range_crop_projection( - self, - start: int | None = None, - stop: int | None = None, - step: int = 1, - override_export_path: Path | None = None, - return_results: bool = False, - # possible meanse of reducing memory issues by removing - # xarray instance while keeping paths for logging purposes - # delete_xarray_after_save: bool = True, - **kwargs, - ) -> list[Path]: - start = start or self.crop_stop_index - stop = stop or self.crop_stop_index - export_paths: list[Path | T_Dataset] = [] - if stop is None: - stop = len(self) - console.print(f"Cropping to '{self.crop_path}'") - for index in trange(start, stop, step): - export_paths.append( - self.crop_projection( - # region=region, - index=index, - override_export_path=override_export_path, - return_results=return_results, - **kwargs, - ) - ) - return export_paths - - def crop_projection( - self, - index: int = 0, - override_export_path: Path | None = None, - return_results: bool = False, - sync_reprojection_paths: bool = True, - **kwargs, - ) -> Path | T_Dataset: - """Crop a projection to `region` geometry.""" - console.log(f"Preparing to crop `_reprojected_paths` index {index} from {self}") - try: - assert hasattr(self, "_reprojected_paths") - except AssertionError: - if sync_reprojection_paths: - self._sync_reprojected_paths() - else: - raise AttributeError( - f"'_reprojected_paths' must be set. " - "Run after 'self.to_reprojection()' or set as a " - "list directly." - ) - try: - assert self.crop_region in RegionOptions - except AttributeError: - raise ValueError( - f"'{self.crop_path}' not in 'RegionOptions': {RegionOptions.all()}" - ) - path: PathLike = override_export_path or Path(self.crop_path) # / (region) - path.mkdir(exist_ok=True, parents=True) - resampled_xr: Dataset = self._reprojected_paths[index] - - console.log(f"From {self} crop {resampled_xr}") - cropped: Dataset = crop_xarray( - xr_time_series=resampled_xr, - crop_box=RegionOptions.bounding_box(self.crop_region), - **kwargs, - ) - cropped_file_name: str = region_crop_file_name( - self.crop_region, resampled_xr.name - ) - export_path: Path = path / cropped_file_name - cropped.to_netcdf(export_path) - if not hasattr(self, "_cropped_paths"): - self._cropped_paths: list[PathLike] = [] - self._cropped_paths.append(export_path) - if return_results: - return cropped - else: - return export_path + # def range_crop_projection( + # self, + # start: int | None = None, + # stop: int | None = None, + # step: int = 1, + # override_export_path: Path | None = None, + # return_results: bool = False, + # # possible meanse of reducing memory issues by removing + # # xarray instance while keeping paths for logging purposes + # # delete_xarray_after_save: bool = True, + # **kwargs, + # ) -> list[Path]: + # start = start or self.crop_stop_index + # stop = stop or self.crop_stop_index + # export_paths: list[Path | T_Dataset] = [] + # if stop is None: + # stop = len(self) + # console.print(f"Cropping to '{self.crop_path}'") + # for index in trange(start, stop, step): + # export_paths.append( + # self.crop_projection( + # # region=region, + # index=index, + # override_export_path=override_export_path, + # return_results=return_results, + # **kwargs, + # ) + # ) + # return export_paths + # + # def crop_projection( + # self, + # index: int = 0, + # override_export_path: Path | None = None, + # return_results: bool = False, + # sync_reprojection_paths: bool = True, + # **kwargs, + # ) -> Path | T_Dataset: + # """Crop a projection to `region` geometry.""" + # console.log(f"Preparing to crop `_reprojected_paths` index {index} from {self}") + # try: + # assert hasattr(self, "_reprojected_paths") + # except AssertionError: + # if sync_reprojection_paths: + # self._sync_reprojected_paths() + # else: + # raise AttributeError( + # f"'_reprojected_paths' must be set. " + # "Run after 'self.to_reprojection()' or set as a " + # "list directly." + # ) + # try: + # assert self.crop_region in RegionOptions + # except AttributeError: + # raise ValueError( + # f"'{self.crop_path}' not in 'RegionOptions': {RegionOptions.all()}" + # ) + # path: PathLike = override_export_path or Path(self.crop_path) # / (region) + # path.mkdir(exist_ok=True, parents=True) + # resampled_xr: Dataset = self._reprojected_paths[index] + # + # console.log(f"From {self} crop {resampled_xr}") + # cropped: Dataset = crop_xarray( + # xr_time_series=resampled_xr, + # crop_box=RegionOptions.bounding_box(self.crop_region), + # **kwargs, + # ) + # cropped_file_name: str = region_crop_file_name( + # self.crop_region, resampled_xr.name + # ) + # export_path: Path = path / cropped_file_name + # cropped.to_netcdf(export_path) + # if not hasattr(self, "_cropped_paths"): + # self._cropped_paths: list[PathLike] = [] + # self._cropped_paths.append(export_path) + # if return_results: + # return cropped + # else: + # return export_path @dataclass(kw_only=True, repr=False) -class HADsResampler(ResamblerBase): +class HADsResampler(ResamplerBase): """Manage resampling HADs datafiles for modelling. Attributes @@ -367,10 +386,10 @@ class HADsResampler(ResamblerBase): Column name in `input_files` or `input` for `y` coordinates. input_file_extension File extensions to glob `input_files` with. - resample_start_index + start_index First index of file to iterate processing from. - resample_stop_index - Last index of files to iterate processing from as a count from `resample_start_index`. + stop_index + Last index of files to iterate processing from as a count from `start_index`. If `None`, this will simply iterate over all available files. cpm_for_coord_alignment `CPM` `Path` or `Dataset` to match alignment with. @@ -404,11 +423,11 @@ class HADsResampler(ResamblerBase): """ input_path: PathLike | None = RAW_HADS_TASMAX_PATH - output_path: PathLike = RESAMPLING_OUTPUT_PATH / HADS_OUTPUT_LOCAL_PATH + output_path: PathLike = RESAMPLING_OUTPUT_PATH / HADS_OUTPUT_PATH input_files: Iterable[PathLike] | None = None cpus: int | None = None - crop_path: PathLike = RESAMPLING_OUTPUT_PATH / HADS_CROP_OUTPUT_LOCAL_PATH + # crop_path: PathLike = RESAMPLING_OUTPUT_PATH / HADS_CROP_OUTPUT_LOCAL_PATH final_crs: str = BRITISH_NATIONAL_GRID_EPSG input_file_extension: NETCDF_OR_TIF = NETCDF_EXTENSION_STR export_file_extension: NETCDF_OR_TIF = NETCDF_EXTENSION_STR @@ -427,22 +446,27 @@ def set_cpm_for_coord_alignment(self) -> None: def to_reprojection( self, + export_path: PathLike, index: int = 0, - override_export_path: Path | None = None, - return_results: bool = False, + # override_export_path: Path | None = None, + # return_results: bool = False, + # export_path: PathLike = path source_to_index: Sequence | None = None, ) -> Path | T_Dataset: - source_path: Path = self._get_source_path( - index=index, source_to_index=source_to_index + # source_path: Path = self._get_source_path( + # index=index, source_to_index=source_to_index + # ) + source_path: Path = _get_source_path( + self, index=index, source_to_index=source_to_index ) - path: PathLike = self.output_path + # path PathLike = self.output_path console.log(f"Setting 'cpm_for_coord_alignment' for {self}") self.set_cpm_for_coord_alignment() console.log(f"Set 'cpm_for_coord_alignment' for {self}") return apply_geo_func( source_path=source_path, func=self._resample_func, - export_folder=path, + export_folder=export_path, # Leaving in case we return to using warp # export_path_as_output_path_kwarg=True, # to_netcdf=False, @@ -457,7 +481,7 @@ def to_reprojection( @dataclass(kw_only=True, repr=False) -class CPMResampler(ResamblerBase): +class CPMResampler(ResamplerBase): """CPM specific changes to HADsResampler. Attributes @@ -492,23 +516,21 @@ class CPMResampler(ResamblerBase): >>> cpm_resampler: CPMResampler = CPMResampler( ... input_path=RAW_CPM_TASMAX_PATH, ... output_path=resample_test_cpm_output_path, - ... input_file_extension=TIF_EXTENSION_STR, ... ) >>> cpm_resampler - >>> pprint(cpm_resampler.input_files) - (...Path('.../tasmax/05/latest/tasmax_...-cpm_uk_2.2km_05_day_19801201-19811130.tif'), - ...Path('.../tasmax/05/latest/tasmax_...-cpm_uk_2.2km_05_day_19811201-19821130.tif'), + (...Path('.../tasmax/01/latest/tasmax_...-cpm_uk_2.2km_01_day_19801201-19811130.nc'), + ...Path('.../tasmax/01/latest/tasmax_...-cpm_uk_2.2km_01_day_19811201-19821130.nc'), ... - ...Path('.../tasmax/05/latest/tasmax_...-cpm_uk_2.2km_05_day_20791201-20801130.tif')) - + ...Path('.../tasmax/01/latest/tasmax_...-cpm_uk_2.2km_01_day_20791201-20801130.nc')) """ input_path: PathLike | None = RAW_CPM_TASMAX_PATH - output_path: PathLike = RESAMPLING_OUTPUT_PATH / CPM_OUTPUT_LOCAL_PATH - crop_path: PathLike = RESAMPLING_OUTPUT_PATH / CPM_CROP_OUTPUT_LOCAL_PATH + output_path: PathLike = RESAMPLING_OUTPUT_PATH / CPM_OUTPUT_PATH + # crop_path: PathLike = RESAMPLING_OUTPUT_PATH / CPM_CROP_OUTPUT_LOCAL_PATH input_file_x_column_name: str = CPM_RAW_X_COLUMN_NAME input_file_y_column_name: str = CPM_RAW_Y_COLUMN_NAME prior_time_series: PathLike | Dataset | None = None @@ -525,8 +547,11 @@ def to_reprojection( return_results: bool = False, source_to_index: Sequence | None = None, ) -> Path | T_Dataset: - source_path: Path = self._get_source_path( - index=index, source_to_index=source_to_index + # source_path: Path = self._get_source_path( + # index=index, source_to_index=source_to_index + # ) + source_path: Path = _get_source_path( + self, index=index, source_to_index=source_to_index ) path: PathLike = self.output_path console.log(f"Reprojecting index CPM {index}...") @@ -565,32 +590,34 @@ def __getstate__(self): @dataclass(kw_only=True) -class ResamblerManagerBase: +class ResamplerManagerBase: """Base class to inherit for `HADs` and `CPM` resampler managers.""" input_paths: PathLike | Sequence[PathLike] = Path() - resample_paths: PathLike | Sequence[PathLike] = Path() + # resample_paths: PathLike | Sequence[PathLike] = Path() + output_paths: PathLike | Sequence[PathLike] = Path() variables: Sequence[VariableOptions | str] = (VariableOptions.default(),) - crop_regions: tuple[RegionOptions | str, ...] | None = RegionOptions.all() - crop_paths: Sequence[PathLike] | PathLike = Path() + # crop_regions: tuple[RegionOptions | str, ...] | None = RegionOptions.all() + # crop_paths: Sequence[PathLike] | PathLike = Path() sub_path: Path = Path() - resample_start_index: int = 0 - resample_stop_index: int | None = None - crop_start_index: int = 0 - crop_stop_index: int | None = None + start_index: int = 0 + stop_index: int | None = None + # crop_start_index: int = 0 + # crop_stop_index: int | None = None start_date: date | None = None end_date: date | None = None configs: list[HADsResampler | CPMResampler] = field(default_factory=list) config_default_kwargs: dict[str, Any] = field(default_factory=dict) - resampler_class: type[HADsResampler | CPMResampler] | None = None + # resampler_class: type[HADsResampler | CPMResampler] | None = None + calc_class: type[HADsResampler | CPMResampler] | None = None cpus: int | None = None _input_path_dict: dict[Path, str] = field(default_factory=dict) - _resampled_path_dict: dict[PathLike, VariableOptions | str] = field( - default_factory=dict - ) - _cropped_path_dict: dict[PathLike, VariableOptions | str] = field( + _output_path_dict: dict[PathLike, VariableOptions | str] = field( default_factory=dict ) + # _cropped_path_dict: dict[PathLike, VariableOptions | str] = field( + # default_factory=dict + # ) _strict_fail_if_var_in_input_path: bool = True _allow_check_fail: bool = False @@ -601,8 +628,8 @@ class VarirableInBaseImportPathError(Exception): def __post_init__(self) -> None: """Populate config attributes.""" - if not self.crop_regions: - self.crop_regions = () + # if not self.crop_regions: + # self.crop_regions = () self.check_paths() self.total_cpus: int | None = cpu_count() if not self.cpus: @@ -618,20 +645,20 @@ def input_folder(self) -> Path | None: return None @property - def resample_folder(self) -> Path | None: - """Return `self._output_path` set by `set_resample_paths()`.""" + def output_folder(self) -> Path | None: + """Return `self._output_path` set by `set_output_paths()`.""" if hasattr(self, "_output_path"): return Path(self._output_path) else: return None - @property - def crop_folder(self) -> Path | None: - """Return `self._output_path` set by `set_resample_paths()`.""" - if hasattr(self, "_crop_path"): - return Path(self._crop_path) - else: - return None + # @property + # def crop_folder(self) -> Path | None: + # """Return `self._output_path` set by `set_output_paths()`.""" + # if hasattr(self, "_crop_path"): + # return Path(self._crop_path) + # else: + # return None def __repr__(self) -> str: """Summary of `self` configuration as a `str`.""" @@ -641,41 +668,42 @@ def __repr__(self) -> str: f"input_paths_count={len(self.input_paths) if isinstance(self.input_paths, Sequence) else 1})>" ) - def _gen_resample_folder_paths( + def _gen_output_folder_paths( self, path: PathLike, append_input_path_dict: bool = False, - append_resampled_path_dict: bool = False, + append_output_path_dict: bool = False, ) -> Iterator[tuple[Path, Path]]: """Yield paths of resampled `self.variables` and `self.runs`.""" for var in self.variables: input_path: Path = Path(path) / var / self.sub_path - resample_path: Path = Path(path) / var + output_path: Path = Path(path) / var if append_input_path_dict: self._input_path_dict[input_path] = var - if append_resampled_path_dict: - self._resampled_path_dict[resample_path] = var - yield input_path, resample_path + if append_output_path_dict: + self._output_path_dict[output_path] = var + yield input_path, output_path def check_paths( - self, run_set_data_paths: bool = True, run_set_crop_paths: bool = True + self, + run_set_data_paths: bool = True, # run_set_crop_paths: bool = True ): """Check and set `input`, `resample` and `crop` paths.""" if run_set_data_paths: - self.set_resample_paths() - if run_set_crop_paths: - self.set_crop_paths() + self.set_output_paths() + # if run_set_crop_paths: + # self.set_crop_paths() assert isinstance(self.input_paths, Iterable) - assert isinstance(self.resample_paths, Iterable) - if self.crop_paths: - try: - assert isinstance(self.crop_paths, Iterable) - except AssertionError: - raise ValueError( - f"'crop_paths' not iterable for {self}. Hint: try setting 'run_set_crop_paths' to 'True'." - ) - assert len(self.input_paths) == len(self.resample_paths) + assert isinstance(self.output_paths, Iterable) + # if self.crop_paths: + # try: + # assert isinstance(self.crop_paths, Iterable) + # except AssertionError: + # raise ValueError( + # f"'crop_paths' not iterable for {self}. Hint: try setting 'run_set_crop_paths' to 'True'." + # ) + assert len(self.input_paths) == len(self.output_paths) for path in self.input_paths: try: assert Path(path).exists() @@ -701,7 +729,7 @@ def _set_input_paths(self): self._input_path = self.input_paths self.input_paths = tuple( input_path - for input_path, _ in self._gen_resample_folder_paths( + for input_path, _ in self._gen_output_folder_paths( self.input_paths, append_input_path_dict=True ) ) @@ -716,82 +744,82 @@ def _set_input_paths(self): f"set '_strict_fail_if_var_in_input_path' to 'False'." ) - def set_resample_paths(self): - """Propagate `self.resample_paths` if needed.""" + def set_output_paths(self): + """Propagate `self.output_paths` if needed.""" self._set_input_paths() - if isinstance(self.resample_paths, PathLike): - self._output_path = self.resample_paths - self.resample_paths = tuple( - resample_path - for _, resample_path in self._gen_resample_folder_paths( - self.resample_paths, append_resampled_path_dict=True - ) - ) - - def set_crop_paths(self) -> None: - """Propagate `self.resample_paths` if needed.""" - if isinstance(self.crop_paths, PathLike): - self._crop_path = self.crop_paths - self.crop_paths = tuple( - self._gen_crop_folder_paths( - self.crop_paths, append_cropped_path_dict=True + if isinstance(self.output_paths, PathLike): + self._output_path = self.output_paths + self.output_paths = tuple( + output_path + for _, output_path in self._gen_output_folder_paths( + self.output_paths, append_output_path_dict=True ) ) - def yield_configs(self) -> Iterable[ResamblerBase]: + # def set_crop_paths(self) -> None: + # """Propagate `self.output_paths` if needed.""" + # if isinstance(self.crop_paths, PathLike): + # self._crop_path = self.crop_paths + # self.crop_paths = tuple( + # self._gen_crop_folder_paths( + # self.crop_paths, append_cropped_path_dict=True + # ) + # ) + + def yield_configs(self) -> Iterable[ResamplerBase]: """Generate a `CPMResampler` or `HADsResampler` for `self.input_paths`.""" self.check_paths() - assert isinstance(self.resample_paths, Iterable) + # try: + assert isinstance(self.output_paths, Iterable) + assert isinstance(self.calc_class, type(ResamplerBase)) for index, var_path in enumerate(self._input_path_dict.items()): - yield self.resampler_class( + yield self.calc_class( input_path=var_path[0], - output_path=self.resample_paths[index], + output_path=self.output_paths[index], variable_name=var_path[1], - resample_start_index=self.resample_start_index, - resample_stop_index=self.resample_stop_index, + start_index=self.start_index, + stop_index=self.stop_index, **self.config_default_kwargs, ) - def yield_crop_configs(self) -> Iterable[ResamblerBase]: - """Generate a `CPMResampler` or `HADsResampler` for `self.input_paths`.""" - self.check_paths() - assert isinstance(self.input_paths, Iterable) - assert isinstance(self.resample_paths, Iterable) - assert isinstance(self.crop_paths, Iterable) - for index, input_resample_paths in enumerate(self._resampled_path_dict.items()): - for crop_path, region in self._cropped_path_dict.items(): - yield self.resampler_class( - input_path=input_resample_paths[0], - output_path=self.resample_paths[index], - variable_name=input_resample_paths[1], - resample_start_index=self.resample_start_index, - resample_stop_index=self.resample_stop_index, - crop_path=crop_path, - # Todo: remove below if single crop configs iterate over all - # crop_regions=self.crop_regions, - # crop_regions=(region,), - crop_region=region, - **self.config_default_kwargs, - ) + # def yield_crop_configs(self) -> Iterable[ResamplerBase]: + # """Generate a `CPMResampler` or `HADsResampler` for `self.input_paths`.""" + # self.check_paths() + # assert isinstance(self.input_paths, Iterable) + # assert isinstance(self.output_paths, Iterable) + # assert isinstance(self.crop_paths, Iterable) + # for index, input_output_paths in enumerate(self._output_path_dict.items()): + # for crop_path, region in self._cropped_path_dict.items(): + # yield self.calc_class( + # input_path=input_output_paths[0], + # output_path=self.output_paths[index], + # variable_name=input_output_paths[1], + # start_index=self.start_index, + # stop_index=self.stop_index, + # crop_path=crop_path, + # # Todo: remove below if single crop configs iterate over all + # # crop_regions=self.crop_regions, + # # crop_regions=(region,), + # crop_region=region, + # **self.config_default_kwargs, + # ) def __len__(self) -> int: """Return the length of `self.input_files`.""" return ( - len(self.input_paths[self.resample_start_index : self.resample_stop_index]) + len(self.input_paths[self.start_index : self.stop_index]) if isinstance(self.input_paths, Sequence) else 0 ) @property def max_count(self) -> int: - """Maximum length of `self.input_files` ignoring `resample_start_index` and `resample_start_index`.""" + """Maximum length of `self.input_files` ignoring `start_index` and `start_index`.""" return len(self.input_paths) if isinstance(self.input_paths, Sequence) else 0 def __iter__(self) -> Iterator[Path] | None: if isinstance(self.input_paths, Sequence): - for file_path in self.input_paths[ - self.resample_start_index : self.resample_stop_index - ]: + for file_path in self.input_paths[self.start_index : self.stop_index]: yield Path(file_path) else: return None @@ -806,9 +834,9 @@ def __getitem__(self, key: int | slice) -> Path | tuple[Path, ...] | None: else: raise IndexError(f"Can only index with 'int', not: '{key}'") - def execute_resample_configs( + def execute_configs( self, multiprocess: bool = False, cpus: int | None = None - ) -> tuple[ResamblerBase, ...]: + ) -> tuple[ResamplerBase, ...]: """Run all resampler configurations Parameters @@ -818,7 +846,7 @@ def execute_resample_configs( cpus Number of `cpus` to pass to `multiprocess_execute`. """ - resamplers: tuple[ResamblerBase, ...] = tuple(self.yield_configs()) + resamplers: tuple[ResamplerBase, ...] = tuple(self.yield_configs()) results: list[list[Path] | None] = [] if multiprocess: cpus = cpus or self.cpus @@ -831,43 +859,43 @@ def execute_resample_configs( results.append(resampler.execute()) return resamplers - def execute_crop_configs( - self, multiprocess: bool = False, cpus: int | None = None - ) -> tuple[ResamblerBase, ...]: - """Run all resampler configurations - - Parameters - ---------- - multiprocess - If `True` run parameters in `resample_configs` with `multiprocess_execute`. - cpus - Number of `cpus` to pass to `multiprocess_execute`. - """ - croppers: tuple[ResamblerBase, ...] = tuple(self.yield_crop_configs()) - results: list[list[Path] | None] = [] - if multiprocess: - cpus = cpus or self.cpus - if self.total_cpus and cpus: - cpus = min(cpus, self.total_cpus - 1) - results = multiprocess_execute( - croppers, method_name="execute_crops", cpus=cpus - ) - else: - for cropper in croppers: - print(cropper) - results.append(cropper.execute_crops()) - return croppers + # def execute_crop_configs( + # self, multiprocess: bool = False, cpus: int | None = None + # ) -> tuple[ResamplerBase, ...]: + # """Run all resampler configurations + # + # Parameters + # ---------- + # multiprocess + # If `True` run parameters in `resample_configs` with `multiprocess_execute`. + # cpus + # Number of `cpus` to pass to `multiprocess_execute`. + # """ + # croppers: tuple[ResamplerBase, ...] = tuple(self.yield_crop_configs()) + # results: list[list[Path] | None] = [] + # if multiprocess: + # cpus = cpus or self.cpus + # if self.total_cpus and cpus: + # cpus = min(cpus, self.total_cpus - 1) + # results = multiprocess_execute( + # croppers, method_name="execute_crops", cpus=cpus + # ) + # else: + # for cropper in croppers: + # print(cropper) + # results.append(cropper.execute_crops()) + # return croppers @dataclass(kw_only=True, repr=False) -class HADsResamplerManager(ResamblerManagerBase): +class HADsResamplerManager(ResamplerManagerBase): """Class to manage processing HADs resampling. Attributes ---------- input_paths `Path` or `Paths` to `CPM` files to process. If `Path`, will be propegated with files matching - resample_paths + output_paths `Path` or `Paths` to to save processed `CPM` files to. If `Path` will be propagated to match `input_paths`. variables Which `VariableOptions` to include. @@ -877,10 +905,10 @@ class HADsResamplerManager(ResamblerManagerBase): Where to save region crop files. sub_path `Path` to include at the stem of `input_paths`. - resample_start_index + start_index Index to begin iterating input files for `resampling` or `cropping`. - resample_stop_index - Index to to run from `resample_start_index` to when `resampling` or + stop_index + Index to to run from `start_index` to when `resampling` or `cropping`. If `None`, iterate full list of paths. start_date Not yet implemented, but in future from what date to generate start index from. @@ -890,7 +918,7 @@ class HADsResamplerManager(ResamblerManagerBase): List of `HADsResampler` instances to iterate `resampling` or `cropping`. config_default_kwargs Parameters passed to all running `self.configs`. - resampler_class + calc_class `class` to construct all `self.configs` instances with. cpus Number of `cpu` cores to use during multiprocessing. @@ -907,25 +935,25 @@ class HADsResamplerManager(ResamblerManagerBase): ... 'resample_test_hads_output_path') >>> hads_resampler_manager: HADsResamplerManager = HADsResamplerManager( ... variables=VariableOptions.all(), - ... resample_paths=resample_test_hads_output_path, + ... output_paths=resample_test_hads_output_path, ... ) >>> hads_resampler_manager """ input_paths: PathLike | Sequence[PathLike] = RAW_HADS_PATH - resample_paths: PathLike | Sequence[PathLike] = ( - RESAMPLING_OUTPUT_PATH / HADS_OUTPUT_LOCAL_PATH - ) - crop_paths: Sequence[PathLike] | PathLike = ( - RESAMPLING_OUTPUT_PATH / HADS_CROP_OUTPUT_LOCAL_PATH + output_paths: PathLike | Sequence[PathLike] = ( + RESAMPLING_OUTPUT_PATH / HADS_OUTPUT_PATH ) + # crop_paths: Sequence[PathLike] | PathLike = ( + # RESAMPLING_OUTPUT_PATH / HADS_CROP_OUTPUT_LOCAL_PATH + # ) sub_path: Path = HADS_SUB_PATH start_date: date = HADS_START_DATE end_date: date = HADS_END_DATE configs: list[HADsResampler] = field(default_factory=list) config_default_kwargs: dict[str, Any] = field(default_factory=dict) - resampler_class: type[HADsResampler] = HADsResampler + calc_class: type[HADsResampler] = HADsResampler cpm_for_coord_alignment: T_Dataset | PathLike = RAW_CPM_TASMAX_PATH cpm_for_coord_alignment_path_converted: bool = False @@ -937,24 +965,24 @@ def __repr__(self) -> str: f"input_paths_count={len(self.input_paths) if isinstance(self.input_paths, Sequence) else 1})>" ) - def _gen_crop_folder_paths( - self, path: PathLike, append_cropped_path_dict: bool = False - ) -> Iterator[Path | None]: - """Return a Generator of paths of `self.variables` and `self.crops`.""" - if not self.crop_regions: - return None - if not self._resampled_path_dict: - self._gen_resample_folder_paths( - self.input_paths, - append_input_path_dict=True, - append_resampled_path_dict=True, - ) - for var in self.variables: - for region in self.crop_regions: - crop_path = Path(path) / "hads" / region / var - if append_cropped_path_dict: - self._cropped_path_dict[crop_path] = region - yield crop_path + # def _gen_crop_folder_paths( + # self, path: PathLike, append_cropped_path_dict: bool = False + # ) -> Iterator[Path | None]: + # """Return a Generator of paths of `self.variables` and `self.crops`.""" + # if not self.crop_regions: + # return None + # if not self._output_path_dict: + # self._gen_output_folder_paths( + # self.input_paths, + # append_input_path_dict=True, + # append_output_path_dict=True, + # ) + # for var in self.variables: + # for region in self.crop_regions: + # crop_path = Path(path) / HADS_OUTPUT_PATH / region / var + # if append_cropped_path_dict: + # self._cropped_path_dict[crop_path] = region + # yield crop_path def set_cpm_for_coord_alignment(self) -> None: """Check if `cpm_for_coord_alignment` is a `Dataset`, process if a `Path`.""" @@ -966,15 +994,15 @@ def set_cpm_for_coord_alignment(self) -> None: def yield_configs(self) -> Iterable[HADsResampler]: """Generate a `CPMResampler` or `HADsResampler` for `self.input_paths`.""" self.check_paths() - assert isinstance(self.resample_paths, Iterable) + assert isinstance(self.output_paths, Iterable) # assert isinstance(self.crop_paths, Iterable) for index, var_path in enumerate(self._input_path_dict.items()): - yield self.resampler_class( + yield self.calc_class( input_path=var_path[0], - output_path=self.resample_paths[index], + output_path=self.output_paths[index], variable_name=var_path[1], - resample_start_index=self.resample_start_index, - resample_stop_index=self.resample_stop_index, + start_index=self.start_index, + stop_index=self.stop_index, cpm_for_coord_alignment=self.cpm_for_coord_alignment, cpm_for_coord_alignment_path_converted=self.cpm_for_coord_alignment_path_converted, **self.config_default_kwargs, @@ -982,14 +1010,14 @@ def yield_configs(self) -> Iterable[HADsResampler]: @dataclass(kw_only=True, repr=False) -class CPMResamplerManager(ResamblerManagerBase): +class CPMResamplerManager(ResamplerManagerBase): """Class to manage processing CPM resampling. Attributes ---------- input_paths `Path` or `Paths` to `CPM` files to process. If `Path`, will be propegated with files matching - resample_paths + output_paths `Path` or `Paths` to to save processed `CPM` files to. If `Path` will be propagated to match `input_paths`. variables Which `VariableOptions` to include. @@ -1001,10 +1029,10 @@ class CPMResamplerManager(ResamblerManagerBase): Where to save region crop files. sub_path `Path` to include at the stem of `input_paths`. - resample_start_index + start_index Index to begin iterating input files for `resampling` or `cropping`. - resample_stop_index - Index to to run from `resample_start_index` to when `resampling` or + stop_index + Index to to run from `start_index` to when `resampling` or `cropping`. If `None`, iterate full list of paths. start_date Not yet implemented, but in future from what date to generate start index from. @@ -1014,7 +1042,7 @@ class CPMResamplerManager(ResamblerManagerBase): List of `HADsResampler` instances to iterate `resampling` or `cropping`. config_default_kwargs Parameters passed to all running `self.configs`. - resampler_class + calc_class `class` to construct all `self.configs` instances with. cpus Number of `cpu` cores to use during multiprocessing. @@ -1027,9 +1055,8 @@ class CPMResamplerManager(ResamblerManagerBase): >>> resample_test_cpm_output_path: Path = getfixture( ... 'resample_test_cpm_output_path') >>> cpm_resampler_manager: CPMResamplerManager = CPMResamplerManager( - ... resample_stop_index=9, - ... resample_paths=resample_test_cpm_output_path, - ... crop_paths=resample_test_cpm_output_path, + ... stop_index=9, + ... output_paths=resample_test_cpm_output_path, ... ) >>> cpm_resampler_manager str: f"input_paths_count={len(self.input_paths) if isinstance(self.input_paths, Sequence) else 1})>" ) - def _gen_resample_folder_paths( + def _gen_output_folder_paths( self, path: PathLike, append_input_path_dict: bool = False, - append_resampled_path_dict: bool = False, + append_output_path_dict: bool = False, cpm_paths: bool = True, ) -> Iterator[tuple[Path, Path]]: """Return a Generator of paths of `self.variables` and `self.runs`.""" @@ -1097,38 +1124,38 @@ def _gen_resample_folder_paths( / run_type / self.sub_path ) - resample_path: Path = ( + output_path: Path = ( Path(path) / VariableOptions.cpm_value(var) / run_type ) else: input_path = Path(path) / var / run_type / self.sub_path - resample_path = Path(path) / var / run_type + output_path = Path(path) / var / run_type if append_input_path_dict: self._input_path_dict[input_path] = var - if append_resampled_path_dict: - self._resampled_path_dict[resample_path] = var - yield input_path, resample_path - - def _gen_crop_folder_paths( - self, - path: PathLike, - append_cropped_path_dict: bool = False, - cpm_paths: bool = True, - ) -> Iterator[Path]: - """Return a Generator of paths of `self.variables` and `self.crops`.""" - for var in self.variables: - for region in self.crop_regions: - for run_type in self.runs: - if cpm_paths: - crop_path: Path = ( - Path(path) - / "cpm" - / region - / VariableOptions.cpm_value(var) - / run_type - ) - else: - crop_path: Path = Path(path) / "cpm" / region / var / run_type - if append_cropped_path_dict: - self._cropped_path_dict[crop_path] = region - yield crop_path + if append_output_path_dict: + self._output_path_dict[output_path] = var + yield input_path, output_path + + # def _gen_crop_folder_paths( + # self, + # path: PathLike, + # append_cropped_path_dict: bool = False, + # cpm_paths: bool = True, + # ) -> Iterator[Path]: + # """Return a Generator of paths of `self.variables` and `self.crops`.""" + # for var in self.variables: + # for region in self.crop_regions: + # for run_type in self.runs: + # if cpm_paths: + # crop_path: Path = ( + # Path(path) + # / CPM_OUTPUT_PATH + # / region + # / VariableOptions.cpm_value(var) + # / run_type + # ) + # else: + # crop_path: Path = Path(path) / CPM_OUTPUT_PATH / region / var / run_type + # if append_cropped_path_dict: + # self._cropped_path_dict[crop_path] = region + # yield crop_path diff --git a/python/clim_recal/utils/core.py b/python/clim_recal/utils/core.py index 919c472d..1f2662b7 100644 --- a/python/clim_recal/utils/core.py +++ b/python/clim_recal/utils/core.py @@ -25,9 +25,13 @@ Union, ) +# try: from rich.console import Console from tqdm import TqdmExperimentalWarning, tqdm +# except ImportError: +# pass + logger = getLogger(__name__) console = Console() @@ -763,6 +767,18 @@ def climate_data_mount_path( return path +def _get_source_path( + instance, index: int, source_to_index: Sequence | None = None +) -> Path: + """Return a path indexed from `source_to_index` or `self`.""" + if source_to_index is None: + return instance[index] + elif isinstance(source_to_index, str): + return getattr(instance, source_to_index)[index] + else: + return source_to_index[index] + + def is_climate_data_mounted(mount_path: PathLike | None = None) -> bool: """Check if `CLIMATE_DATA_MOUNT_PATH` is mounted. diff --git a/python/clim_recal/utils/data.py b/python/clim_recal/utils/data.py index 274a8526..2bb7446b 100644 --- a/python/clim_recal/utils/data.py +++ b/python/clim_recal/utils/data.py @@ -40,6 +40,12 @@ CPM_RESOLUTION_METERS: Final[int] = 2200 +HADS_OUTPUT_PATH: Final[Path] = Path("hads") +CPM_OUTPUT_PATH: Final[Path] = Path("cpm") + +CPM_CROP_OUTPUT_PATH: Final[Path] = Path("cpm-crop") +HADS_CROP_OUTPUT_PATH: Final[Path] = Path("hads-crop") + AuthorshipType = Union[ str | tuple[str, ...], dict[str, str] | dict[str, dict[str, str]] | dict[str, Collection[str]] diff --git a/python/clim_recal/utils/xarray.py b/python/clim_recal/utils/xarray.py index f5264f08..5f46cf40 100644 --- a/python/clim_recal/utils/xarray.py +++ b/python/clim_recal/utils/xarray.py @@ -4,7 +4,7 @@ from os import PathLike from pathlib import Path from tempfile import NamedTemporaryFile, _TemporaryFileWrapper -from typing import Any, Callable, Final +from typing import Any, Callable, Final, Iterable, Iterator, Sequence import numpy as np import rioxarray # nopycln: import @@ -24,6 +24,7 @@ from osgeo.gdal import config_option as config_GDAL_option from rasterio.enums import Resampling from tqdm import tqdm +from tqdm.rich import trange from xarray import CFTimeIndex, DataArray, Dataset, cftime_range, open_dataset from xarray.coding.calendar_ops import convert_calendar from xarray.core.types import ( @@ -1312,3 +1313,32 @@ def region_crop_file_name( else: final_suffix = str(file_name) return "_".join(("crop", str(crop_region), final_suffix)) + + +def trange_wrapper( + instance: Sequence, + calc: Callable, + start: int, + stop: int | None, + step: int, + default_export_path: Path, + override_export_path: Path | None = None, + source_to_index: Iterable | None = None, +) -> Iterator[Path | T_Dataset]: + # export_paths: list[Path | T_Dataset] = [] + if stop is None: + stop = len(instance) + export_path: Path = override_export_path or default_export_path + for index in trange(start, stop, step): + # export_paths.append( + # method( + yield calc( + # path=export_path, + index=index, + # override_export_path=override_export_path, + export_path=export_path, + source_to_index=source_to_index, + ) + # ) + # ) + # return export_paths diff --git a/python/conftest.py b/python/conftest.py index 3c3a8cc2..3e9abc1c 100644 --- a/python/conftest.py +++ b/python/conftest.py @@ -13,7 +13,6 @@ from clim_recal.config import ClimRecalConfig from clim_recal.debiasing.debias_wrapper import RegionOptions -from clim_recal.resample import CPM_OUTPUT_LOCAL_PATH, HADS_OUTPUT_LOCAL_PATH from clim_recal.utils.core import ( ISO_DATE_FORMAT_STR, check_package_path, @@ -22,7 +21,7 @@ is_platform_darwin, results_path, ) -from clim_recal.utils.data import BoundsTupleType +from clim_recal.utils.data import CPM_OUTPUT_PATH, HADS_OUTPUT_PATH, BoundsTupleType from clim_recal.utils.server import CondaLockFileManager from clim_recal.utils.xarray import ( GLASGOW_GEOM_LOCAL_PATH, @@ -460,14 +459,14 @@ def clim_runner( def resample_test_cpm_output_path( test_runs_output_path: Path, ) -> Path: - return test_runs_output_path / CPM_OUTPUT_LOCAL_PATH + return test_runs_output_path / CPM_OUTPUT_PATH @pytest.fixture def resample_test_hads_output_path( test_runs_output_path: Path, ) -> Path: - return test_runs_output_path / HADS_OUTPUT_LOCAL_PATH + return test_runs_output_path / HADS_OUTPUT_PATH @pytest.fixture diff --git a/python/tests/test_crop.py b/python/tests/test_crop.py new file mode 100644 index 00000000..10382a8f --- /dev/null +++ b/python/tests/test_crop.py @@ -0,0 +1,109 @@ +from pathlib import Path +from typing import Any + +import pytest +from numpy.testing import assert_allclose +from xarray import open_dataset +from xarray.core.types import T_Dataset + +from clim_recal.crop import ( + CPMRegionCropManager, + CPMRegionCropper, + HADsRegionCropManager, + HADsRegionCropper, + RegionCropperManagerBase, +) +from clim_recal.resample import ( + CPMResamplerManager, + HADsResamplerManager, + ResamplerBase, + ResamplerManagerBase, +) +from clim_recal.utils.data import ( + CPM_CROP_OUTPUT_PATH, + HADS_CROP_OUTPUT_PATH, + RegionOptions, + RunOptions, +) + + +@pytest.mark.localcache +@pytest.mark.slow +@pytest.mark.mount +@pytest.mark.parametrize("crop_manager", (HADsRegionCropManager, CPMRegionCropManager)) +# @pytest.mark.parametrize("multiprocess", (False, True)) +def test_execute_crop_configs( + crop_manager: type[RegionCropperManagerBase], + # multiprocess: bool, + tmp_path: Path, + resample_test_hads_output_path: Path, + resample_test_cpm_output_path: Path, + tasmax_hads_1980_raw_path: Path, + tasmax_cpm_1980_raw_path: Path, + tasmax_cpm_1980_converted_path: Path, +) -> None: + """Test running default HADs spatial projection.""" + multiprocess: bool = False + input_path: Path + crop_path: Path + resampler_manager_kwargs: dict[str, Any] = {} + crop_manager_kwargs: dict[str, Any] = {} + resampler: ResamplerManagerBase + if crop_manager is HADsRegionCropManager: + input_path = tasmax_hads_1980_raw_path.parent + crop_path = resample_test_hads_output_path / "manage" / HADS_CROP_OUTPUT_PATH + resampler_manager_kwargs["cpm_for_coord_alignment"] = ( + tasmax_cpm_1980_converted_path + ) + resampler_manager_kwargs["cpm_for_coord_alignment_path_converted"] = True + resampler = HADsResamplerManager( + input_paths=input_path, + output_paths=tmp_path, + # crop_paths=crop_path, + stop_index=1, + _strict_fail_if_var_in_input_path=False, + **resampler_manager_kwargs, + ) + else: + input_path = tasmax_cpm_1980_raw_path.parent + crop_path = resample_test_cpm_output_path / "manage" / CPM_CROP_OUTPUT_PATH + resampler_manager_kwargs["runs"] = crop_manager_kwargs["runs"] = ( + RunOptions.ONE, + ) + resampler = CPMResamplerManager( + input_paths=input_path, + output_paths=tmp_path, + stop_index=1, + _strict_fail_if_var_in_input_path=False, + **resampler_manager_kwargs, + ) + crop_config: RegionCropperManagerBase = crop_manager( + input_paths=tmp_path, + output_paths=crop_path, + stop_index=1, + _strict_fail_if_var_in_input_path=False, + **crop_manager_kwargs, + ) + if isinstance(resampler, HADsResamplerManager): + resampler.set_cpm_for_coord_alignment() + + _: tuple[ResamplerBase, ...] = resampler.execute_configs(multiprocess=multiprocess) + region_crops: tuple[HADsRegionCropper | CPMRegionCropper, ...] = ( + crop_config.execute_configs(multiprocess=multiprocess) + ) + region_crop_dict: dict[str, tuple[Path, ...]] = { + crop.crop_region: tuple(Path(crop.output_paths).iterdir()) + for crop in region_crops + } + assert len(region_crop_dict) == len(region_crops) == len(RegionOptions) + for region, path in region_crop_dict.items(): + cropped_region: T_Dataset = open_dataset(path[0]) + bbox = RegionOptions.bounding_box(region) + assert_allclose(cropped_region["x"].max(), bbox.xmax, rtol=0.1) + assert_allclose(cropped_region["x"].min(), bbox.xmin, rtol=0.1) + assert_allclose(cropped_region["y"].max(), bbox.ymax, rtol=0.1) + assert_allclose(cropped_region["y"].min(), bbox.ymin, rtol=0.1) + if isinstance(crop_config, HADsResamplerManager): + assert len(cropped_region["time"]) == 31 + else: + assert len(cropped_region["time"]) == 365 diff --git a/python/tests/test_resample.py b/python/tests/test_resample.py index fc6d9557..7752ade5 100644 --- a/python/tests/test_resample.py +++ b/python/tests/test_resample.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Any, Final +from typing import Final import numpy as np import pytest @@ -9,16 +9,13 @@ from xarray.core.types import T_Dataset from clim_recal.resample import ( - CPM_CROP_OUTPUT_LOCAL_PATH, - HADS_CROP_OUTPUT_LOCAL_PATH, CPMResampler, CPMResamplerManager, HADsResampler, HADsResamplerManager, - ResamblerManagerBase, + ResamplerBase, ) from clim_recal.utils.core import CLI_DATE_FORMAT_STR -from clim_recal.utils.data import RegionOptions, RunOptions from clim_recal.utils.xarray import ( FINAL_CONVERTED_CPM_WIDTH, FINAL_RESAMPLE_LON_COL, @@ -170,11 +167,11 @@ def test_execute_resample_configs( test_config = HADsResamplerManager( input_paths=tasmax_hads_1980_raw_path.parent, resample_paths=tmp_path, - crop_paths=tmp_path, + # crop_paths=tmp_path, stop_index=1, ) - resamplers: tuple[HADsResampler | CPMResampler, ...] = ( - test_config.execute_resample_configs(multiprocess=multiprocess) + resamplers: tuple[ResamplerBase, ...] = test_config.execute_configs( + multiprocess=multiprocess ) export: T_Dataset = open_dataset(resamplers[0][0]) assert len(export.time) == 31 @@ -185,68 +182,68 @@ def test_execute_resample_configs( ).all() -@pytest.mark.localcache -@pytest.mark.slow -@pytest.mark.mount -@pytest.mark.parametrize("manager", (CPMResamplerManager, HADsResamplerManager)) -# @pytest.mark.parametrize("multiprocess", (False, True)) -def test_execute_crop_configs( - manager: ResamblerManagerBase, - # multiprocess: bool, - tmp_path: Path, - resample_test_hads_output_path: Path, - resample_test_cpm_output_path: Path, - tasmax_hads_1980_raw_path: Path, - tasmax_cpm_1980_raw_path: Path, - tasmax_cpm_1980_converted_path: Path, -) -> None: - """Test running default HADs spatial projection.""" - multiprocess: bool = False - input_path: Path - crop_path: Path - manager_kwargs: dict[str, Any] = {} - if manager is HADsResamplerManager: - input_path = tasmax_hads_1980_raw_path.parent - crop_path = ( - resample_test_hads_output_path / "manage" / HADS_CROP_OUTPUT_LOCAL_PATH - ) - manager_kwargs["cpm_for_coord_alignment"] = tasmax_cpm_1980_converted_path - manager_kwargs["cpm_for_coord_alignment_path_converted"] = True - else: - input_path = tasmax_cpm_1980_raw_path.parent - crop_path = ( - resample_test_cpm_output_path / "manage" / CPM_CROP_OUTPUT_LOCAL_PATH - ) - manager_kwargs["runs"] = (RunOptions.ONE,) - test_config: ResamblerManagerBase = manager( - input_paths=input_path, - resample_paths=tmp_path, - crop_paths=crop_path, - stop_index=1, - _strict_fail_if_var_in_input_path=False, - **manager_kwargs, - ) - if isinstance(test_config, HADsResamplerManager): - test_config.set_cpm_for_coord_alignment = tasmax_cpm_1980_converted_path - - _: tuple[HADsResampler | CPMResampler, ...] = test_config.execute_resample_configs( - multiprocess=multiprocess - ) - region_crops: tuple[HADsResampler | CPMResampler, ...] = ( - test_config.execute_crop_configs(multiprocess=multiprocess) - ) - region_crop_dict: dict[str, tuple[Path, ...]] = { - crop.crop_region: tuple(Path(crop.crop_path).iterdir()) for crop in region_crops - } - assert len(region_crop_dict) == len(region_crops) == len(RegionOptions) - for region, path in region_crop_dict.items(): - cropped_region: T_Dataset = open_dataset(path[0]) - bbox = RegionOptions.bounding_box(region) - assert_allclose(cropped_region["x"].max(), bbox.xmax, rtol=0.1) - assert_allclose(cropped_region["x"].min(), bbox.xmin, rtol=0.1) - assert_allclose(cropped_region["y"].max(), bbox.ymax, rtol=0.1) - assert_allclose(cropped_region["y"].min(), bbox.ymin, rtol=0.1) - if isinstance(test_config, HADsResamplerManager): - assert len(cropped_region["time"]) == 31 - else: - assert len(cropped_region["time"]) == 365 +# @pytest.mark.localcache +# @pytest.mark.slow +# @pytest.mark.mount +# @pytest.mark.parametrize("manager", (CPMResamplerManager, HADsResamplerManager)) +# # @pytest.mark.parametrize("multiprocess", (False, True)) +# def test_execute_crop_configs( +# manager: ResamplerManagerBase, +# # multiprocess: bool, +# tmp_path: Path, +# resample_test_hads_output_path: Path, +# resample_test_cpm_output_path: Path, +# tasmax_hads_1980_raw_path: Path, +# tasmax_cpm_1980_raw_path: Path, +# tasmax_cpm_1980_converted_path: Path, +# ) -> None: +# """Test running default HADs spatial projection.""" +# multiprocess: bool = False +# input_path: Path +# crop_path: Path +# manager_kwargs: dict[str, Any] = {} +# if manager is HADsResamplerManager: +# input_path = tasmax_hads_1980_raw_path.parent +# crop_path = ( +# resample_test_hads_output_path / "manage" / HADS_CROP_OUTPUT_LOCAL_PATH +# ) +# manager_kwargs["cpm_for_coord_alignment"] = tasmax_cpm_1980_converted_path +# manager_kwargs["cpm_for_coord_alignment_path_converted"] = True +# else: +# input_path = tasmax_cpm_1980_raw_path.parent +# crop_path = ( +# resample_test_cpm_output_path / "manage" / CPM_CROP_OUTPUT_LOCAL_PATH +# ) +# manager_kwargs["runs"] = (RunOptions.ONE,) +# test_config: ResamplerManagerBase = manager( +# input_paths=input_path, +# resample_paths=tmp_path, +# crop_paths=crop_path, +# stop_index=1, +# _strict_fail_if_var_in_input_path=False, +# **manager_kwargs, +# ) +# if isinstance(test_config, HADsResamplerManager): +# test_config.set_cpm_for_coord_alignment = tasmax_cpm_1980_converted_path +# +# _: tuple[HADsResampler | CPMResampler, ...] = test_config.execute_resample_configs( +# multiprocess=multiprocess +# ) +# region_crops: tuple[HADsResampler | CPMResampler, ...] = ( +# test_config.execute_crop_configs(multiprocess=multiprocess) +# ) +# region_crop_dict: dict[str, tuple[Path, ...]] = { +# crop.crop_region: tuple(Path(crop.crop_path).iterdir()) for crop in region_crops +# } +# assert len(region_crop_dict) == len(region_crops) == len(RegionOptions) +# for region, path in region_crop_dict.items(): +# cropped_region: T_Dataset = open_dataset(path[0]) +# bbox = RegionOptions.bounding_box(region) +# assert_allclose(cropped_region["x"].max(), bbox.xmax, rtol=0.1) +# assert_allclose(cropped_region["x"].min(), bbox.xmin, rtol=0.1) +# assert_allclose(cropped_region["y"].max(), bbox.ymax, rtol=0.1) +# assert_allclose(cropped_region["y"].min(), bbox.ymin, rtol=0.1) +# if isinstance(test_config, HADsResamplerManager): +# assert len(cropped_region["time"]) == 31 +# else: +# assert len(cropped_region["time"]) == 365 diff --git a/python/tests/test_utils_xarray.py b/python/tests/test_utils_xarray.py index bc4194c3..09e14468 100644 --- a/python/tests/test_utils_xarray.py +++ b/python/tests/test_utils_xarray.py @@ -11,12 +11,8 @@ from xarray import Dataset, open_dataset from xarray.core.types import T_DataArray, T_Dataset -from clim_recal.resample import ( - CPM_CROP_OUTPUT_LOCAL_PATH, - HADS_CROP_OUTPUT_LOCAL_PATH, - CPMResampler, - HADsResampler, -) +from clim_recal.crop import CPM_CROP_OUTPUT_PATH, HADS_CROP_OUTPUT_PATH +from clim_recal.resample import CPMResampler, HADsResampler from clim_recal.utils.core import ( CLI_DATE_FORMAT_STR, DateType, @@ -407,7 +403,7 @@ def test_crop_xarray( if data_type == HadUKGrid: output_path: Path = resample_test_hads_output_path / config crop_path: Path = ( - resample_test_hads_output_path / config / HADS_CROP_OUTPUT_LOCAL_PATH + resample_test_hads_output_path / config / HADS_CROP_OUTPUT_PATH ) test_config = HADsResampler( @@ -418,9 +414,7 @@ def test_crop_xarray( else: assert data_type == UKCPLocalProjections output_path: Path = resample_test_cpm_output_path / config - crop_path: Path = ( - resample_test_cpm_output_path / config / CPM_CROP_OUTPUT_LOCAL_PATH - ) + crop_path: Path = resample_test_cpm_output_path / config / CPM_CROP_OUTPUT_PATH test_config = CPMResampler( input_path=tasmax_cpm_1980_raw_path.parent, output_path=output_path,