From e78f080736c9f2584a713418f373db55f2108154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Cousin?= <42619415+remicousin@users.noreply.github.com> Date: Mon, 26 Feb 2024 14:47:02 -0500 Subject: [PATCH] Remic/entozud (#404) * pandas not in use * arguments rather than loop * refactoring of enactstozarr to enable appending to zarr as well as create zarr * typo * let to_zar fail and explain path it not a directory * no more shutil * removing useless legacy code * not general enough * removing strftime from printing * f strings are now short enough to fit on one line * convert a datetime to datatime64 before being able to compare it to such * set_up_dims words just as well with np.datetime64 --- enacts/enactstozarr.py | 245 +++++++++++++++++++++++++++++++++-------- 1 file changed, 201 insertions(+), 44 deletions(-) diff --git a/enacts/enactstozarr.py b/enacts/enactstozarr.py index 65e068787..5dfede5d7 100644 --- a/enacts/enactstozarr.py +++ b/enacts/enactstozarr.py @@ -1,20 +1,85 @@ import os -import shutil import sys import numpy as np import xarray as xr import datetime as dt from pathlib import Path import pingrid -import pandas as pd from functools import partial +import calc CONFIG = pingrid.load_config(os.environ["CONFIG"]) +VARIABLE = sys.argv[1] #e.g. precip, tmax, tmin -- check your config +TIME_RES = sys.argv[2] #e.g. daily, or dekadal -- check in your config +INPUT_PATH = ( + f'{CONFIG["datasets"][TIME_RES]["nc_path"]}' + f'{CONFIG["datasets"][TIME_RES]["vars"][VARIABLE][0]}' +) +OUTPUT_PATH = ( + ( + f'{CONFIG["datasets"][TIME_RES]["zarr_path"]}' + f'{CONFIG["datasets"][TIME_RES]["vars"][VARIABLE][0]}' + ) if CONFIG['datasets'][TIME_RES]['vars'][VARIABLE][1] is None + else ( + f'{CONFIG["datasets"][TIME_RES]["zarr_path"]}' + f'{CONFIG["datasets"][TIME_RES]["vars"][VARIABLE][1]}' + ) +) +CHUNKS = CONFIG['datasets'][TIME_RES]['chunks'] +ZARR_RESOLUTION = CONFIG['datasets'][TIME_RES]["zarr_resolution"] + def set_up_dims(xda, time_res="daily"): + """Sets up spatial and temporal dimensions from a set of time-dependent netcdf + ENACTS files. + + To be used in `preprocess` of `xarray.open_mfdataset` . + Using some Ingrid naming conventions. + + Parameters + ---------- + xda : DataArray + from the list of `paths` from `xarray.open_mfdataset` + time_res : str, optional + indicates the time resolution of the set of time-dependent files. + Default is "daily" and other option is "dekadal" - datestr = Path(xda.encoding["source"]).name.split("_")[2] + Returns + ------- + DataArray of X (longitude), Y (latitude) and T (time, daily or dekadal) + + See Also + -------- + xarray.open_mfdataset, filename2datetime64 + """ + return xda.expand_dims(T = [filename2datetime64( + Path(xda.encoding["source"]), time_res=time_res, + )]).rename({'Lon': 'X','Lat': 'Y'}) + + +def filename2datetime64(file, time_res="daily"): + """Return time associated with an ENACTS filename in datetime + + In case of dekadal, returns the first day of the dekad (i.e. 1, 11, or 21) + + Parameters + ---------- + file : pathlib(Path) + file to extract date from name + time_res : str, optional + indicates the time resolution of the file. + Default is "daily" and other option is "dekadal" + + Returns + ------- + numpy.datetime64 + + See Also + -------- + set_up_dims, convert + """ + datestr = file.name.split("_")[2] year = int(datestr[0:4]) month = int(datestr[4:6]) if time_res == "daily": @@ -25,12 +90,25 @@ def set_up_dims(xda, time_res="daily"): raise Exception( "time resolution must be 'daily' or 'dekadal' " ) - xda = xda.expand_dims(T = [dt.datetime(year, month, day)]) - xda = xda.rename({'Lon': 'X','Lat': 'Y'}) - - return xda + return np.datetime64(dt.datetime(year, month, day)) + def regridding(data, resolution): + """Spatial regridding of `data` to `resolution` . + + Does nothing if current resolution is close (according to numpy) to resolution. + + Parameters + ---------- + data : DataArray + data of X and Y to regrid. + resolution : real + resolution to regrid to. + + Returns + ------- + DataArray of `data` regridded to `resolution` . + """ if not np.isclose(data['X'][1] - data['X'][0], resolution): # TODO this method of regridding is inaccurate because it pretends # that (X, Y) define a Euclidian space. In reality, grid cells @@ -41,55 +119,134 @@ def regridding(data, resolution): # and look into xESMF. # # [1] https://climatedataguide.ucar.edu/climate-data-tools-and-analysis/regridding-overview - print("Your data will be regridded. Refer to function documentation for more information on this.") + print(( + f"Your data will be regridded." + f"Refer to function documentation for more information on this." + )) data = data.interp( X=np.arange(data.X.min(), data.X.max() + resolution, resolution), Y=np.arange(data.Y.min(),data.Y.max() + resolution, resolution), ) return data -def convert(variable, time_res="daily"): - print(f"converting files for: {time_res} {variable}") - zarr_resolution = CONFIG['datasets'][time_res]["zarr_resolution"] - input_path, output_path, var_name = CONFIG['datasets'][time_res]['vars'][variable] - nc_path = f"{CONFIG['datasets'][time_res]['nc_path']}{input_path}" - netcdf = list(sorted(Path(nc_path).glob("*.nc"))) +def nc2xr(paths, var_name, time_res="daily", zarr_resolution=None, chunks={}): + """Open mutiple daily or dekadal ENACTS files as a single dataset. + + Optionally spatially regrids and + coerces all arrays in this dataset into dask arrays with the given chunks. + + Parameters + ---------- + paths : str or nested sequence of paths + Either a string glob in the form "path/to/my/files/*.nc" + or an explicit list of files to open. Paths can be given as strings + or as pathlib Paths. + var_name : str + name of the ENACTS variable in the nc files + time_res : str, optional + indicates the time resolution of the set of files. + Default is "daily" and other option is "dekadal" + zarr_resolution : real, optional + spatial resolution to regrid to. + chunks : int, tuple of int, "auto" or mapping of hashable to int, optional + Chunk sizes along each dimension X, Y and T. + Returns + ------- + Xarray.Dataset containing variable `var_name` and coordinates X, Y and T + + See Also + -------- + xarray.open_mfdataset, set_up_dims, regridding, xarray.DataArray.chunk + """ data = xr.open_mfdataset( - netcdf, - preprocess = partial(set_up_dims, time_res=time_res), - parallel=False + paths, + preprocess=partial(set_up_dims, time_res=time_res), + parallel=False, )[var_name] if zarr_resolution != None: - print("attempting regrid") + print("attempting regrid") data = regridding(data, zarr_resolution) + return xr.Dataset().merge(data.chunk(chunks=chunks)) - data = data.chunk(chunks=CONFIG['datasets'][time_res]['chunks']) - - if output_path == None: - zarr = f"{CONFIG['datasets'][time_res]['zarr_path']}{input_path}" + +def convert( + input_path, + output_path, + var_name, + time_res="daily", + zarr_resolution=None, + chunks={} +): + """Converts a set of ENACTS files into zarr store. + + Either create a new one or append an existing one + + Parameters + ---------- + input_path : str + path where the ENACTS nc files are + output_path : str + path where the zarr store is (to append to) or will be (to create). + To create, (last element of the) path is expected not to exist. + To append, path is expected to point to a zarr store. + var_name : str + name of the ENACTS variable in the nc files + time_res : str, optional + indicates the time resolution of the set of files. + Default is "daily" and other option is "dekadal" + zarr_resolution : real, optional + spatial resolution to regrid to. + chunks : int, tuple of int, "auto" or mapping of hashable to int, optional + Chunk sizes along each dimension X, Y and T. + + Returns + ------- + output_path : where the zarr store has been written + + See Also + -------- + calc.read_zarr_data, filename2datetime64, nc2xr, xarray.Dataset.to_zarr + """ + print(f"converting files for: {time_res} {var_name}") + netcdf = list(sorted(Path(input_path).glob("*.nc"))) + if Path(output_path).is_dir() : + current_zarr = calc.read_zarr_data(output_path) + last_T_zarr = current_zarr["T"][-1] + last_T_nc = filename2datetime64(netcdf[-1], time_res=time_res) + if last_T_nc < last_T_zarr.values : + print(f'nc set ({last_T_nc}) ends before zarrs ({last_T_zarr})') + print("Not changing existing zarr") + elif last_T_nc == last_T_zarr.values : + print(f'both sets end same date: {last_T_nc}') + print("Not changing existing zarr") + else : + print(f'appending nc to zarr from {last_T_zarr.values} to {last_T_nc}') + nc2xr( + netcdf, + var_name, + time_res=time_res, + zarr_resolution=zarr_resolution, + chunks=chunks, + ).to_zarr(store=output_path, append_dim="T") else: - zarr = f"{CONFIG['datasets'][time_res]['zarr_path']}{output_path}" - - shutil.rmtree(zarr, ignore_errors=True) - os.mkdir(zarr) - - xr.Dataset().merge(data).to_zarr( - store = zarr - ) - - if not os.access(zarr, os.W_OK | os.X_OK): - sys.exit("can't write to output directory") - - print(f"conversion for {variable} complete.") - return zarr - -temporal_resolutions = [ - "daily", - "dekadal", -] -for t in temporal_resolutions: - for v in CONFIG['datasets'][t]['vars']: - convert(v, time_res=t) + nc2xr( + netcdf, + var_name, + time_res=time_res, + zarr_resolution=zarr_resolution, + chunks=chunks, + ).to_zarr(store=output_path) + print(f"conversion for {var_name} complete.") + return output_path + +convert( + INPUT_PATH, + OUTPUT_PATH, + VARIABLE, + time_res=TIME_RES, + zarr_resolution=ZARR_RESOLUTION, + chunks=CHUNKS, +)