Skip to content

Commit

Permalink
Remic/entozud (#404)
Browse files Browse the repository at this point in the history
* pandas not in use

* arguments rather than loop

* refactoring of enactstozarr to enable appending to zarr as well as create zarr

* typo

* let to_zarr fail and explain if path is not a directory

* no more shutil

* removing useless legacy code

* not general enough

* removing strftime from printing

* f strings are now short enough to fit on one line

* convert a datetime to datetime64 before being able to compare it to such

* set_up_dims works just as well with np.datetime64
  • Loading branch information
remicousin authored Feb 26, 2024
1 parent e81d6b0 commit e78f080
Showing 1 changed file with 201 additions and 44 deletions.
245 changes: 201 additions & 44 deletions enacts/enactstozarr.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,85 @@
import os
import shutil
import sys
import numpy as np
import xarray as xr
import datetime as dt
from pathlib import Path
import pingrid
import pandas as pd
from functools import partial
import calc


# Runtime configuration: the dataset to convert is selected by the CONFIG
# environment variable plus two positional command-line arguments.
CONFIG = pingrid.load_config(os.environ["CONFIG"])
VARIABLE = sys.argv[1]  # e.g. precip, tmax, tmin -- check your config
TIME_RES = sys.argv[2]  # e.g. daily, or dekadal -- check your config
_dataset_conf = CONFIG["datasets"][TIME_RES]
_var_conf = _dataset_conf["vars"][VARIABLE]
INPUT_PATH = f'{_dataset_conf["nc_path"]}{_var_conf[0]}'
# The zarr store name falls back to the input name when no dedicated
# output name (second element of the var entry) is configured.
OUTPUT_PATH = (
    f'{_dataset_conf["zarr_path"]}'
    f'{_var_conf[0] if _var_conf[1] is None else _var_conf[1]}'
)
CHUNKS = _dataset_conf["chunks"]
ZARR_RESOLUTION = _dataset_conf["zarr_resolution"]


def set_up_dims(xda, time_res="daily"):
    """Sets up spatial and temporal dimensions from a set of time-dependent netcdf
    ENACTS files.
    To be used in `preprocess` of `xarray.open_mfdataset` .
    Using some Ingrid naming conventions.
    Parameters
    ----------
    xda : DataArray
        from the list of `paths` from `xarray.open_mfdataset`
    time_res : str, optional
        indicates the time resolution of the set of time-dependent files.
        Default is "daily" and other option is "dekadal"
    Returns
    -------
    DataArray of X (longitude), Y (latitude) and T (time, daily or dekadal)
    See Also
    --------
    xarray.open_mfdataset, filename2datetime64
    """
    # The date is recovered from the source file's name; Lon/Lat are renamed
    # to the Ingrid conventions X/Y, and T carries the (single) time step.
    return xda.expand_dims(
        T=[filename2datetime64(Path(xda.encoding["source"]), time_res=time_res)]
    ).rename({'Lon': 'X', 'Lat': 'Y'})


def filename2datetime64(file, time_res="daily"):
    """Return time associated with an ENACTS filename in datetime
    In case of dekadal, returns the first day of the dekad (i.e. 1, 11, or 21)
    Parameters
    ----------
    file : pathlib(Path)
        file to extract date from name
    time_res : str, optional
        indicates the time resolution of the file.
        Default is "daily" and other option is "dekadal"
    Returns
    -------
    numpy.datetime64
    See Also
    --------
    set_up_dims, convert
    """
    # The third "_"-separated field of the file name carries the date string.
    datestr = file.name.split("_")[2]
    year = int(datestr[0:4])
    month = int(datestr[4:6])
    if time_res == "daily":
        day = int(datestr[6:8])
    elif time_res == "dekadal":
        # NOTE(review): assumes the 7th character is the dekad index 1-3,
        # mapped to days 1, 11, 21 -- confirm against actual ENACTS
        # dekadal file names.
        day = (int(datestr[6]) - 1) * 10 + 1
    else:
        raise Exception(
            "time resolution must be 'daily' or 'dekadal' "
        )
    # np.datetime64 so results compare cleanly against zarr "T" coordinates.
    return np.datetime64(dt.datetime(year, month, day))


def regridding(data, resolution):
    """Spatial regridding of `data` to `resolution` .
    Does nothing if current resolution is close (according to numpy) to resolution.
    Parameters
    ----------
    data : DataArray
        data of X and Y to regrid.
    resolution : real
        resolution to regrid to.
    Returns
    -------
    DataArray of `data` regridded to `resolution` .
    """
    # Current resolution is inferred from the spacing of the first two X points.
    if not np.isclose(data['X'][1] - data['X'][0], resolution):
        # TODO this method of regridding is inaccurate because it pretends
        # that (X, Y) define a Euclidian space. In reality, grid cells
        # are not rectangles on the sphere; consider conservative
        # regridding (see [1] for an overview of regridding approaches)
        # and look into xESMF.
        #
        # [1] https://climatedataguide.ucar.edu/climate-data-tools-and-analysis/regridding-overview
        print(
            "Your data will be regridded. "
            "Refer to function documentation for more information on this."
        )
        data = data.interp(
            X=np.arange(data.X.min(), data.X.max() + resolution, resolution),
            Y=np.arange(data.Y.min(), data.Y.max() + resolution, resolution),
        )
    return data

def convert(variable, time_res="daily"):
print(f"converting files for: {time_res} {variable}")

zarr_resolution = CONFIG['datasets'][time_res]["zarr_resolution"]
input_path, output_path, var_name = CONFIG['datasets'][time_res]['vars'][variable]
nc_path = f"{CONFIG['datasets'][time_res]['nc_path']}{input_path}"
netcdf = list(sorted(Path(nc_path).glob("*.nc")))
def nc2xr(paths, var_name, time_res="daily", zarr_resolution=None, chunks={}):
    """Open multiple daily or dekadal ENACTS files as a single dataset.
    Optionally spatially regrids and
    coerces all arrays in this dataset into dask arrays with the given chunks.
    Parameters
    ----------
    paths : str or nested sequence of paths
        Either a string glob in the form "path/to/my/files/*.nc"
        or an explicit list of files to open. Paths can be given as strings
        or as pathlib Paths.
    var_name : str
        name of the ENACTS variable in the nc files
    time_res : str, optional
        indicates the time resolution of the set of files.
        Default is "daily" and other option is "dekadal"
    zarr_resolution : real, optional
        spatial resolution to regrid to.
    chunks : int, tuple of int, "auto" or mapping of hashable to int, optional
        Chunk sizes along each dimension X, Y and T.
        The default {} is only read, never mutated, so the shared
        mutable-default pitfall does not apply here.
    Returns
    -------
    Xarray.Dataset containing variable `var_name` and coordinates X, Y and T
    See Also
    --------
    xarray.open_mfdataset, set_up_dims, regridding, xarray.DataArray.chunk
    """
    # Each file contributes one T step; set_up_dims derives it from the
    # file name and renames Lon/Lat to X/Y.
    data = xr.open_mfdataset(
        paths,
        preprocess=partial(set_up_dims, time_res=time_res),
        parallel=False,
    )[var_name]
    # Regrid only when a target resolution was requested.
    if zarr_resolution is not None:
        print("attempting regrid")
        data = regridding(data, zarr_resolution)
    return xr.Dataset().merge(data.chunk(chunks=chunks))

data = data.chunk(chunks=CONFIG['datasets'][time_res]['chunks'])

if output_path == None:
zarr = f"{CONFIG['datasets'][time_res]['zarr_path']}{input_path}"

def convert(
    input_path,
    output_path,
    var_name,
    time_res="daily",
    zarr_resolution=None,
    chunks={},
):
    """Converts a set of ENACTS files into zarr store.
    Either create a new one or append an existing one
    Parameters
    ----------
    input_path : str
        path where the ENACTS nc files are
    output_path : str
        path where the zarr store is (to append to) or will be (to create).
        To create, (last element of the) path is expected not to exist.
        To append, path is expected to point to a zarr store.
    var_name : str
        name of the ENACTS variable in the nc files
    time_res : str, optional
        indicates the time resolution of the set of files.
        Default is "daily" and other option is "dekadal"
    zarr_resolution : real, optional
        spatial resolution to regrid to.
    chunks : int, tuple of int, "auto" or mapping of hashable to int, optional
        Chunk sizes along each dimension X, Y and T.
    Returns
    -------
    output_path : where the zarr store has been written
    See Also
    --------
    calc.read_zarr_data, filename2datetime64, nc2xr, xarray.Dataset.to_zarr
    """
    print(f"converting files for: {time_res} {var_name}")
    netcdf = list(sorted(Path(input_path).glob("*.nc")))
    if Path(output_path).is_dir() :
        # Appending: compare the last date in the existing store against the
        # last date available as nc files.
        current_zarr = calc.read_zarr_data(output_path)
        last_T_zarr = current_zarr["T"][-1]
        last_T_nc = filename2datetime64(netcdf[-1], time_res=time_res)
        if last_T_nc < last_T_zarr.values :
            print(f'nc set ({last_T_nc}) ends before zarrs ({last_T_zarr})')
            print("Not changing existing zarr")
        elif last_T_nc == last_T_zarr.values :
            print(f'both sets end same date: {last_T_nc}')
            print("Not changing existing zarr")
        else :
            print(f'appending nc to zarr from {last_T_zarr.values} to {last_T_nc}')
            # Append only files strictly newer than the store's last date;
            # appending the full nc set would duplicate overlapping dates
            # along T.
            new_files = [
                f for f in netcdf
                if filename2datetime64(f, time_res=time_res) > last_T_zarr.values
            ]
            nc2xr(
                new_files,
                var_name,
                time_res=time_res,
                zarr_resolution=zarr_resolution,
                chunks=chunks,
            ).to_zarr(store=output_path, append_dim="T")
    else:
        # Creating: let to_zarr fail and explain if output_path exists but
        # is not a directory.
        nc2xr(
            netcdf,
            var_name,
            time_res=time_res,
            zarr_resolution=zarr_resolution,
            chunks=chunks,
        ).to_zarr(store=output_path)
    print(f"conversion for {var_name} complete.")
    return output_path

# Script entry point: convert the variable / time resolution requested on the
# command line, using the paths and settings resolved from CONFIG above.
convert(
    INPUT_PATH,
    OUTPUT_PATH,
    VARIABLE,
    time_res=TIME_RES,
    zarr_resolution=ZARR_RESOLUTION,
    chunks=CHUNKS,
)

0 comments on commit e78f080

Please sign in to comment.