fix(gfs-etl): Use open_mfdataset
devsjc committed Jun 12, 2024
1 parent 051b99d commit 0f41606
Showing 1 changed file with 18 additions and 35 deletions.
53 changes: 18 additions & 35 deletions containers/gfs/download_combine_gfs.py
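This change drops the hand-rolled pairwise combination (the _combine_datasets helper, reduced over the dataset list with functools.reduce and picking the concat dimension by inspecting coords) in favour of a single xr.open_mfdataset call that aligns all per-file stores on their shared coordinates. A minimal sketch of that call, assuming the per-file zarr stores share compatible init_time/step coordinates; the paths and the explicit engine/combine arguments are illustrative, not the ETL's actual values:

import xarray as xr

# Placeholder paths standing in for the per-step zarr stores that
# convert_file() writes into the .work folder (hypothetical names).
dataset_paths = [
    ".work/step000.zarr",
    ".work/step003.zarr",
    ".work/step006.zarr",
]

# open_mfdataset lazily opens every store and concatenates them along the
# coordinate that differs between them, replacing the manual pairwise
# xr.concat calls in the removed helper.
run_ds = xr.open_mfdataset(
    dataset_paths,
    engine="zarr",        # the inputs are zarr stores, not netCDF files
    combine="by_coords",  # infer the concat dimension from the coordinates
)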
@@ -150,31 +150,6 @@ def convert_file(file: str, outfolder: str) -> str | None:
os.remove(file)
return outfile

def _combine_datasets(dsp1: str, dsp2: str) -> str:
"""Consolidate two datasets into a single one.
Args:
dsp1: Path to the first dataset.
dsp2: Path to the second dataset.
"""
cdsp = pathlib.Path(dsp1).parent / f"{uuid.uuid4().hex}.zarr"
with(
zarr.DirectoryStore(cdsp) as store,
):
ds1 = xr.open_zarr(dsp1)
ds2 = xr.open_zarr(dsp2)
if ds1.coords["init_time"].values == ds2.coords["init_time"].values:
ds = xr.concat([ds1, ds2], dim="step")
elif ds1.coords["step"].values == ds2.coords["step"].values:
ds = xr.concat([ds1, ds2], dim="init_time")
else:
raise ValueError("No combination of datasets implemented for given dimensions.")
ds.to_zarr(store, compute=True)
shutil.rmtree(dsp1, ignore_errors=True)
shutil.rmtree(dsp2, ignore_errors=True)
return cdsp.as_posix()

def run(path: str, config: Config, date: dt.date, run: str) -> str:
"""Download GFS data, combine, and save for a single run."""
# Download files first
@@ -198,13 +173,12 @@ def run(path: str, config: Config, date: dt.date, run: str) -> str:
)
pool.close()
pool.join()
results = [r for r in results if r is not None]
else:
results: list[str] = []
for url in urls:
result = download_url(url, f"{path}/{date:%Y%m%d}/{run}/")
if result is not None:
results.append(result)

not_done = False
except Exception as e:
log.error(e)
@@ -217,18 +191,27 @@ def run(path: str, config: Config, date: dt.date, run: str) -> str:

run_files: list[str] = list(glob(f"{path}/{date:%Y%m%d}/{run}/*{run}.*.grib2"))
dataset_paths: list[str] = []
for file in run_files:
log.debug(f"Converting {file}")
ds_path = convert_file(file=file, outfolder=path + "/.work")
if ds_path is not None:
dataset_paths.append(ds_path)
# Only parallelize if there are more files than CPUs
if len(run_files) > cpu_count():
pool = Pool(cpu_count())
dataset_paths = pool.starmap(
convert_file,
[(file, path + "/.work") for file in run_files],
)
pool.close()
pool.join()
dataset_paths = [dp for dp in dataset_paths if dp is not None]
else:
for file in run_files:
log.debug(f"Converting {file}")
ds_path = convert_file(file=file, outfolder=path + "/.work")
if ds_path is not None:
dataset_paths.append(ds_path)
log.debug(f"Converted {len(dataset_paths)} files for {date}:{run}")

run_ds_path: str = functools.reduce(_combine_datasets, dataset_paths)
log.debug(f"Combined {len(dataset_paths)} datasets for {date}:{run}")

log.info("Combining run datasets and applying compression")
run_ds: xr.Dataset = xr.open_zarr(run_ds_path)
run_ds: xr.Dataset = xr.open_mfdataset(dataset_paths)
encoding = {var: {"compressor": Blosc2("zstd", clevel=9)} for var in run_ds.data_vars}
encoding["init_time"] = {"units": "nanoseconds since 1970-01-01"}
outpath = f"{path}/{date:%Y%m%d}/{date:%Y%m%d}{run}.zarr.zip"
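The remainder of run() is below the fold and not shown in this diff. Purely as a hedged sketch of how an encoding dict like the one above is typically applied when writing a zipped zarr store (an assumption about the unshown lines, not the file's actual code):

import zarr

# Assumed continuation: run_ds, encoding and outpath are the names defined in
# the hunk above; the write itself is a guess at a typical pattern.
with zarr.ZipStore(outpath, mode="w") as store:
    run_ds.to_zarr(store, encoding=encoding, compute=True)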
