Skip to content

Commit

Permalink
More Generic Layers2Pressure Class (#25)
Browse files Browse the repository at this point in the history
* generalizes layers2pressure object to other grids, uses pfull not level

* generalize to use any level name

* remove the sys.path stuff

* bump version to 0.2.0

* note about hydrostatic approximation
  • Loading branch information
timothyas authored Sep 26, 2024
1 parent 85e5b9c commit 83f03a6
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 87 deletions.
56 changes: 29 additions & 27 deletions docs/example_pressure_interpolation.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "ufs2arco"
version = "0.1.2"
version = "0.2.0"
description = "Tools for converting Unified Forecast System (UFS) output to Analysis Ready, Cloud Optimized (ARCO) format"
authors = [
{name="Timothy Smith", email="[email protected]"},
Expand Down
2 changes: 1 addition & 1 deletion ufs2arco/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.1.2"
__version__ = "0.2.0"

from .cice6dataset import CICE6Dataset
from .fv3dataset import FV3Dataset
Expand Down
226 changes: 168 additions & 58 deletions ufs2arco/layers2pressure.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,135 @@
import os
import yaml
from typing import Optional
import numpy as np
import xarray as xr

class Layers2Pressure():
"""A class to interpolate from Lagrangian layers to pressure levels, and also compute layer thickness, etc"""
"""A class to interpolate from Lagrangian layers to pressure levels, and also compute layer thickness, etc
Args:
ak, bk (np.ndarray, optional): coefficients that define the vertical grid. If not provided, will
default to the 127 vertical levels defined for NOAA GFS
pfull, phalf (np.ndarray, optional): if ak and bk are provided, these are not necessary since they
can be computed from those coefficients. However, these can be provided in case there are slight
numerical differences between how these values are computed here, versus the grid information
from an existing dataset.
level_name, interface_name (str, optional): names to use for the vertical coordinate at
cell center and interfaces, defaulting to "pfull" and "phalf" as in FV3. However,
new names can be provided in order to work with datasets that use a different name.
"""
g = 9.80665 # m / s^2
Rd = 287.05 # J / kg / K
Rv = 461.5 # J / kg / K
q_min = 1e-10 # kg / kg
z_vir = Rv / Rd - 1.
xds = {"ak": None, "bk": None, "pfull": None, "phalf": None}

def __init__(self):
def __init__(
self,
ak: Optional[np.ndarray]=None,
bk: Optional[np.ndarray]=None,
pfull: Optional[np.ndarray]=None,
phalf: Optional[np.ndarray]=None,
level_name: Optional[str]="pfull",
interface_name: Optional[str]="phalf",
):

self.level_name = level_name
self.interface_name = interface_name

if ak is None and bk is None:
self.xds = self._get_default_vertical_grid()
elif (ak is not None and bk is None) or (ak is None and bk is not None):
raise TypeError("Must provide ak and bk, or neither, not just one.")
else:

newxds = self._get_xds(ak, bk, pfull, phalf)
self.xds = newxds


def _get_default_vertical_grid(self):

# read vertical coordinate information
vcoord_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "replay_vertical_levels.yaml")
with open(vcoord_path, "r") as f:
vcoords = yaml.safe_load(f)

k = np.arange(len(vcoords["pfull"]))
kp1 = np.arange(len(vcoords["ak"]))
self.ak = xr.DataArray(
vcoords["ak"],
coords={"kp1": kp1},
dims=("kp1",),
name="ak",
ak = np.array(vcoords["ak"])
bk = np.array(vcoords["bk"])
pfull = np.array(vcoords["pfull"])
return self._get_xds(ak, bk, pfull)


def _get_xds(
self,
ak: np.ndarray,
bk: np.ndarray,
pfull: Optional[np.ndarray]=None,
phalf: Optional[np.ndarray]=None,
) -> xr.Dataset:

ak = ak.values if isinstance(ak, xr.DataArray) else ak
bk = bk.values if isinstance(bk, xr.DataArray) else bk

# Compute phalf and pfull if necessary
if phalf is None:
phalf = (ak + 100_000 * bk) / 100 # hPa

if pfull is None:
pfull = 0.5*phalf[1:] + 0.5*phalf[:-1]


# Now convert
phalf = xr.DataArray(
phalf,
coords={self.interface_name: phalf},
dims=(self.interface_name,),
)
pfull = xr.DataArray(
pfull,
coords={self.level_name: pfull},
dims=(self.level_name,),
)
self.bk = xr.DataArray(
vcoords["bk"],
coords=self.ak.coords,
dims=self.ak.dims,
name="bk",

ak = xr.DataArray(
ak,
coords={self.interface_name: phalf},
dims=(self.interface_name,),
)
self.level = xr.DataArray(
vcoords["pfull"],
coords={"k": k},
dims=("k",),
name="level",
bk = xr.DataArray(
bk,
coords=ak.coords,
dims=ak.dims,
)

def calc_pressure_interfaces(self, pressfc) -> xr.DataArray:
xds = xr.Dataset({
"ak": ak,
"bk": bk,
self.level_name: pfull,
self.interface_name: phalf,
})
xds = xds.set_coords(["ak", "bk", self.interface_name, self.level_name])
return xds

@property
def ak(self):
return self.xds["ak"]

@property
def bk(self):
return self.xds["bk"]

@property
def pfull(self):
return self.xds[self.level_name]

@property
def phalf(self):
return self.xds[self.interface_name]


def calc_pressure_interfaces(self, pressfc: xr.DataArray) -> xr.DataArray:
"""Compute pressure at vertical grid interfaces
Args:
Expand All @@ -51,7 +141,7 @@ def calc_pressure_interfaces(self, pressfc) -> xr.DataArray:

return self.ak.astype(pressfc.dtype) + pressfc*self.bk.astype(pressfc.dtype)

def calc_pressure_thickness(self, prsi) -> xr.DataArray:
def calc_pressure_thickness(self, prsi: xr.DataArray) -> xr.DataArray:
"""Compute pressure thickness at each vertical level
Args:
Expand All @@ -60,23 +150,28 @@ def calc_pressure_thickness(self, prsi) -> xr.DataArray:
Returns:
dpres (xr.DataArray): Pressure thickness at each vertical level (Pa)
"""
dpres = prsi.diff("kp1", label="lower")
return self._dkp1_to_level(dpres)
dpres = prsi.diff(self.interface_name, label="lower")
return self._dphalf_to_pfull(dpres, name="dpres")

def calc_dlogp(self, prsi) -> xr.DataArray:
def calc_dlogp(self, prsi: xr.DataArray) -> xr.DataArray:
"""Compute difference of log of interface pressure
Args:
prsi (xr.DataArray): Pressure at vertical grid interfaces
Returns:
dlogp (xr.DataArray): np.log(prsi).diff("kp1")
dlogp (xr.DataArray): np.log(prsi).diff("phalf")
"""
dlogp = np.log(prsi).diff("kp1", label="lower")
return self._dkp1_to_level(dlogp)
dlogp = np.log(prsi).diff(self.interface_name, label="lower")
return self._dphalf_to_pfull(dlogp, name="dlogp")

def calc_delz(self, pressfc, temp, spfh) -> xr.DataArray:
"""Compute layer thickness at each vertical level
def calc_delz(
self,
pressfc: xr.DataArray,
temp: xr.DataArray,
spfh: xr.DataArray,
) -> xr.DataArray:
"""Compute a hydrostatic approximation of the layer thickness at each vertical level.
Args:
pressfc, temp, spfh (xr.DataArray): surface pressure, temperature, and specific humidity
Expand All @@ -86,13 +181,22 @@ def calc_delz(self, pressfc, temp, spfh) -> xr.DataArray:
"""
prsi = self.calc_pressure_interfaces(pressfc)
dlogp = self.calc_dlogp(prsi)
dlogp = dlogp.sel(level=temp["level"])
dlogp = dlogp.sel({self.level_name: temp[self.level_name]})
spfh_thresh = spfh.where(spfh > self.q_min, self.q_min)
return -self.Rd / self.g * temp * (1. + self.z_vir*spfh_thresh) * dlogp

def calc_layer_mean_pressure(self, pressfc, temp, spfh, delz) -> xr.DataArray:
def calc_layer_mean_pressure(
self,
pressfc: xr.DataArray,
temp: xr.DataArray,
spfh: xr.DataArray,
delz: xr.DataArray,
) -> xr.DataArray:
"""Compute pressure at vertical grid cell center (i.e., layer mean)
Note:
If ``delz`` is computed by :meth:`calc_delz`, then a hydrostatic approximation is used. This may be inconsistent with the original simulation/dataset.
Args:
pressfc, temp, spfh, delz (xr.DataArray): surface pressure, temperature, specific humidity, and layer thickness
Expand All @@ -102,7 +206,7 @@ def calc_layer_mean_pressure(self, pressfc, temp, spfh, delz) -> xr.DataArray:

prsi = self.calc_pressure_interfaces(pressfc)
dpres = self.calc_pressure_thickness(prsi)
dpres = dpres.sel(level=temp["level"])
dpres = dpres.sel({self.level_name: temp[self.level_name]})

# rTv computed here:
# https://github.com/NOAA-GFDL/GFDL_atmos_cubed_sphere/blob/ab195d5026ca4c221b6cbb3888c8ae92d711f89a/driver/fvGFS/atmosphere.F90#L2199-L2200
Expand All @@ -117,9 +221,12 @@ def calc_layer_mean_pressure(self, pressfc, temp, spfh, delz) -> xr.DataArray:
return dpres*rTv / (-self.g*delz)


def calc_geopotential(self, hgtsfc, delz):
def calc_geopotential(self, hgtsfc: xr.DataArray, delz: xr.DataArray) -> xr.DataArray:
"""Compute geopotential field
Note:
If ``delz`` is computed by :meth:`calc_delz`, then a hydrostatic approximation is used. This may be inconsistent with the original simulation/dataset.
Args:
hgtsfc, delz (xr.DataArray): surface/orographic height, and vertical layer thickness
Expand All @@ -129,18 +236,18 @@ def calc_geopotential(self, hgtsfc, delz):

# a coordinate helper
kp1_left = xr.DataArray(
np.arange(len(self.level)),
coords={"level": self.level.values},
np.arange(len(self.pfull)),
coords={self.level_name: self.pfull.values},
)

# Geopotential at the surface
phi0 = self.g * hgtsfc
phi0 = phi0.expand_dims({"kp1": [len(self.level)]})
phi0 = phi0.expand_dims({"kp1": [len(self.pfull)]})

# Concatenate, cumulative sum from the ground to TOA
dz = self.g*np.abs(delz)
dz["kp1"] = kp1_left.sel(level=delz["level"])
dz = dz.swap_dims({"level": "kp1"}).drop_vars("level")
dz["kp1"] = kp1_left.sel({self.level_name: delz[self.level_name]})
dz = dz.swap_dims({self.level_name: "kp1"}).drop_vars(self.level_name)

phii = xr.concat([dz,phi0], dim="kp1")

Expand All @@ -150,12 +257,18 @@ def calc_geopotential(self, hgtsfc, delz):

# At last, geopotential is interfacial value + .5 * layer thickness * gravity
geopotential = phii - 0.5 * dz
geopotential = geopotential.swap_dims({"kp1": "level"}).drop_vars("kp1")
geopotential = geopotential.swap_dims({"kp1": self.level_name}).drop_vars("kp1")
geopotential.attrs["units"] = "m**2 / s**2"
geopotential.attrs["description"] = "Diagnosed using ufs2arco.Layers2Pressure.calc_geopotential"
return geopotential

def interp2pressure(self, xda, pstar, prsl, cds=None):
def interp2pressure(
self,
xda: xr.DataArray,
pstar: float,
prsl: xr.DataArray,
cds: Optional[xr.Dataset]=None,
):
"""Interpolate data on FV3 native vertical grid to pressure level (p*)
Args:
Expand All @@ -171,17 +284,16 @@ def interp2pressure(self, xda, pstar, prsl, cds=None):
if cds is None:
cds = self.get_interp_coefficients(pstar, prsl)

xda_left = xda.where(cds["is_left"]).sum("level")
xda_right = xda.where(cds["is_right"]).sum("level")
xda_left = xda.where(cds["is_left"]).sum(self.level_name)
xda_right = xda.where(cds["is_right"]).sum(self.level_name)

result = xda_left + (xda_right - xda_left) * cds["factor"]

mask = (cds["is_right"].sum("level") > 0) & (cds["is_left"].sum("level") > 0)
mask = (cds["is_right"].sum(self.level_name) > 0) & (cds["is_left"].sum(self.level_name) > 0)
return result.where(mask)


@staticmethod
def get_interp_coefficients(pstar: float, prsl: xr.DataArray) -> xr.Dataset:
def get_interp_coefficients(self, pstar: float, prsl: xr.DataArray) -> xr.Dataset:
"""Compute the coefficients needed to interpolate between pressure levels
Args:
Expand All @@ -195,12 +307,12 @@ def get_interp_coefficients(pstar: float, prsl: xr.DataArray) -> xr.Dataset:
dloglev = np.log(pstar) - np.log(prsl)

# get coefficients representing pressure distance left and right of pstar
is_left = dlev == dlev.where(dlev>=0).min("level")
is_right = dlev == dlev.where(dlev<=0).max("level")
is_left = dlev == dlev.where(dlev>=0).min(self.level_name)
is_right = dlev == dlev.where(dlev<=0).max(self.level_name)

# TODO: add extrapolation from level to ground here
p_left = dloglev.where(is_left).sum("level")
p_right = -dloglev.where(is_right).sum("level")
p_left = dloglev.where(is_left).sum(self.level_name)
p_right = -dloglev.where(is_right).sum(self.level_name)

denominator = p_left + p_right
denominator = denominator.where(denominator > 1e-6, 1e-6)
Expand All @@ -221,12 +333,10 @@ def get_interp_coefficients(pstar: float, prsl: xr.DataArray) -> xr.Dataset:
"factor": factor,
})




def _dkp1_to_level(self, xda):
xda = xda.rename({"kp1": "k"})
xda["level"] = self.level
xda = xda.swap_dims({"k": "level"})
xda = xda.drop_vars({"k"})
return xda
def _dphalf_to_pfull(self, xda, name):
xds = xda.to_dataset(name=name)
xds = xds.assign_coords(self.pfull.coords)
xds[self.level_name] = xds[self.level_name].swap_dims({self.level_name: self.interface_name})
xds = xds.swap_dims({self.interface_name: self.level_name})
xds = xds.drop_vars(self.interface_name)
return xds[name]

0 comments on commit 83f03a6

Please sign in to comment.