From 0f416066f519cc017071643519b0e8034df47ae4 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Wed, 12 Jun 2024 15:50:26 +0100
Subject: [PATCH] fix(gfs-etl): Use open_mfdataset

---
 containers/gfs/download_combine_gfs.py | 53 +++++++++----------------
 1 file changed, 18 insertions(+), 35 deletions(-)

diff --git a/containers/gfs/download_combine_gfs.py b/containers/gfs/download_combine_gfs.py
index 0f1cd27..e24df8e 100644
--- a/containers/gfs/download_combine_gfs.py
+++ b/containers/gfs/download_combine_gfs.py
@@ -150,31 +150,6 @@ def convert_file(file: str, outfolder: str) -> str | None:
         os.remove(file)
     return outfile
 
-def _combine_datasets(dsp1: str, dsp2: str) -> str:
-    """Consolidate two datasets into a single one.
-
-    Args:
-        dsp1: Path to the first dataset.
-        dsp2: Path to the second dataset.
-        step: The dimension to concatenate on.
-    """
-    cdsp = pathlib.Path(dsp1).parent / f"{uuid.uuid4().hex}.zarr"
-    with(
-        zarr.DirectoryStore(cdsp) as store,
-    ):
-        ds1 = xr.open_zarr(dsp1)
-        ds2 = xr.open_zarr(dsp2)
-        if ds1.coords["init_time"].values == ds2.coords["init_time"].values:
-            ds = xr.concat([ds1, ds2], dim="step")
-        elif ds1.coords["step"].values == ds2.coords["step"].values:
-            ds = xr.concat([ds1, ds2], dim="init_time")
-        else:
-            raise ValueError("No combination of datasets implemented for given dimensions.")
-        ds.to_zarr(store, compute=True)
-    shutil.rmtree(dsp1, ignore_errors=True)
-    shutil.rmtree(dsp2, ignore_errors=True)
-    return cdsp.as_posix()
-
 def run(path: str, config: Config, date: dt.date, run: str) -> str:
     """Download GFS data, combine, and save for a single run."""
     # Dowload files first
@@ -198,13 +173,12 @@ def run(path: str, config: Config, date: dt.date, run: str) -> str:
             )
             pool.close()
             pool.join()
+            results = [r for r in results if r is not None]
         else:
-            results: list[str] = []
             for url in urls:
                 result = download_url(url, f"{path}/{date:%Y%m%d}/{run}/")
                 if result is not None:
                     results.append(result)
-        not_done = False
 
     except Exception as e:
         log.error(e)
@@ -217,18 +191,27 @@ def run(path: str, config: Config, date: dt.date, run: str) -> str:
     run_files: list[str] = list(glob(f"{path}/{date:%Y%m%d}/{run}/*{run}.*.grib2"))
 
     dataset_paths: list[str] = []
-    for file in run_files:
-        log.debug(f"Converting {file}")
-        ds_path = convert_file(file=file, outfolder=path + "/.work")
-        if ds_path is not None:
-            dataset_paths.append(ds_path)
+    # Only parallelize if there are more files than CPUs
+    if len(run_files) > cpu_count():
+        pool = Pool(cpu_count())
+        dataset_paths = pool.starmap(
+            convert_file,
+            [(file, path + "/.work") for file in run_files],
+        )
+        pool.close()
+        pool.join()
+        dataset_paths = [dp for dp in dataset_paths if dp is not None]
+    else:
+        for file in run_files:
+            log.debug(f"Converting {file}")
+            ds_path = convert_file(file=file, outfolder=path + "/.work")
+            if ds_path is not None:
+                dataset_paths.append(ds_path)
     log.debug(f"Converted {len(dataset_paths)} files for {date}:{run}")
-    run_ds_path: str = functools.reduce(_combine_datasets, dataset_paths)
-    log.debug(f"Combined {len(dataset_paths)} datasets for {date}:{run}")
 
     log.info("Combining run datasets and applying compression")
-    run_ds: xr.Dataset = xr.open_zarr(run_ds_path)
+    run_ds: xr.Dataset = xr.open_mfdataset(dataset_paths)
     encoding = {var: {"compressor": Blosc2("zstd", clevel=9)} for var in run_ds.data_vars}
     encoding["init_time"] = {"units": "nanoseconds since 1970-01-01"}
     outpath = f"{path}/{date:%Y%m%d}/{date:%Y%m%d}{run}.zarr.zip"
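
Note on the change: the pairwise _combine_datasets()/functools.reduce() step is removed in favour of a single xr.open_mfdataset(dataset_paths) call, which opens every converted store lazily and concatenates them along their shared coordinates (init_time/step) without materialising intermediate zarr copies. Below is a minimal, self-contained sketch of that combine-and-compress step. The toy datasets, the "t2m" variable name, the temporary paths, and the use of numcodecs.Blosc (standing in for the Blosc2 codec imported elsewhere in the module, which the hunks do not show) are illustrative assumptions; only the open_mfdataset call, the per-variable encoding dict, and the zipped-zarr output come from the diff.

    import tempfile
    from pathlib import Path

    import numpy as np
    import xarray as xr
    import zarr
    from numcodecs import Blosc  # stand-in for the Blosc2("zstd", clevel=9) codec used in the patch

    workdir = Path(tempfile.mkdtemp())

    # Fake "converted" stores: same init_time, differing steps, mimicking convert_file() output.
    dataset_paths: list[str] = []
    for i, step in enumerate([0, 3, 6]):
        ds = xr.Dataset(
            {"t2m": (("init_time", "step", "latitude", "longitude"), np.random.rand(1, 1, 4, 4))},
            coords={
                "init_time": [np.datetime64("2024-06-12T00:00")],
                "step": [np.timedelta64(step, "h")],
                "latitude": np.arange(4.0),
                "longitude": np.arange(4.0),
            },
        )
        store_path = (workdir / f"part_{i}.zarr").as_posix()
        ds.to_zarr(store_path)
        dataset_paths.append(store_path)

    # One lazy open-and-concatenate over all stores, replacing the pairwise reduce.
    # engine="zarr" is passed explicitly here; the patch relies on xarray inferring
    # it from the ".zarr" suffix of the paths.
    run_ds = xr.open_mfdataset(dataset_paths, engine="zarr").load()

    # Per-variable compression plus the init_time units, then a single zipped zarr per run.
    encoding = {var: {"compressor": Blosc(cname="zstd", clevel=9)} for var in run_ds.data_vars}
    encoding["init_time"] = {"units": "nanoseconds since 1970-01-01"}
    with zarr.ZipStore((workdir / "run.zarr.zip").as_posix(), mode="w") as store:
        run_ds.to_zarr(store, encoding=encoding, compute=True)

The .load() is a sketch-only simplification to keep the zip write single-shot on toy data; the patched code keeps the dask-backed dataset returned by open_mfdataset and writes it out downstream of the shown hunks.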