Merge pull request #81 from steinadio/feature/bmorph_calibration
Feature/bmorph calibration
arbennett authored Jul 22, 2021
2 parents 20146d0 + b66d8fc commit b520de1
Showing 2 changed files with 49 additions and 17 deletions.
20 changes: 13 additions & 7 deletions bmorph/core/bmorph.py
@@ -51,7 +51,8 @@ def marginalize_cdf(y_raw, z_raw, vals):

def cqm(raw_x: pd.Series, train_x: pd.Series, ref_x: pd.Series,
raw_y: pd.Series, train_y: pd.Series, ref_y: pd.Series=None,
-method='hist', xbins=200, ybins=10, bw=3, rtol=1e-7, atol=0, nsmooth=5) -> pd.Series:
+method='hist', xbins=200, ybins=10, bw=3, rtol=1e-7, atol=0,
+nsmooth=5, train_cdf_min=1e-4) -> pd.Series:
"""Conditional Quantile Mapping
Multidimensional conditional equidistant CDF matching function:
@@ -89,15 +90,15 @@ def cqm(raw_x: pd.Series, train_x: pd.Series, ref_x: pd.Series,
# Lookup the associated flow values in the CDFs for the reference and training periods
mapped_ref = x_ref[np.argmin(np.abs(u_t - ref_cdf[nx_raw, :]), axis=1)]
mapped_train = x_train[np.argmin(np.abs(u_t - train_cdf[nx_raw, :]), axis=1)]
-mapped_train[mapped_train < 1e-6] = 1e-6
+mapped_train[mapped_train < train_cdf_min] = train_cdf_min
multipliers = pd.Series(mapped_ref / mapped_train, index=raw_x.index, name='multiplier')
if method == 'hist':
# Do some smoothing just to reduce effects of histogram bin edges
multipliers = multipliers.rolling(nsmooth, win_type='triang', min_periods=1).mean()
return multipliers
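
A minimal standalone sketch (not part of the commit) of why flooring the mapped training flows with `train_cdf_min` keeps the multipliers bounded; all array values are illustrative:

```python
import numpy as np

# Illustrative values: flows looked up from the reference and training CDFs.
mapped_ref = np.array([0.50, 0.20, 0.30])
mapped_train = np.array([0.40, 1e-9, 0.25])  # a near-zero training flow would blow up the ratio

train_cdf_min = 1e-4
mapped_train[mapped_train < train_cdf_min] = train_cdf_min

# With the floor, the second multiplier is 0.20 / 1e-4 = 2000 rather than 0.20 / 1e-9 = 2e8.
multipliers = mapped_ref / mapped_train
```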


-def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf):
+def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf, train_cdf_min=1e-4):
'''Calculate multipliers using an adapted version of the EDCDFm technique
This routine implements part of the PresRat bias correction method from
@@ -156,7 +157,7 @@ def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf):

# Given u_t and train_cdf determine train_x
train_x = np.percentile(train_cdf, u_t)
-train_x[train_x < 1e-6] = 1e-6
+train_x[train_x < train_cdf_min] = train_cdf_min

# Given u_t and ref_cdf determine ref_x
ref_x = np.percentile(ref_cdf, u_t)
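
For orientation (a sketch under assumptions, not the library's code), the percentile-based EDCDFm multiplier step with the new floor; `u_t` is assumed to hold non-exceedance probabilities in percent and the CDF arrays are synthetic:

```python
import numpy as np

rng = np.random.default_rng(0)
train_cdf = np.sort(rng.gamma(2.0, 5.0, size=1000))  # synthetic training-period flows
ref_cdf = np.sort(rng.gamma(2.0, 6.0, size=1000))    # synthetic reference-period flows
u_t = np.array([10.0, 50.0, 90.0])                   # quantiles of the raw flows, in percent

train_cdf_min = 1e-4
train_x = np.percentile(train_cdf, u_t)
train_x[train_x < train_cdf_min] = train_cdf_min     # floor the training flows

ref_x = np.percentile(ref_cdf, u_t)
multipliers = ref_x / train_x                        # EDCDFm-style multiplicative correction
```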
@@ -170,7 +171,7 @@ def bmorph(raw_ts, train_ts, ref_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
raw_y=None, ref_y=None, train_y=None,
nsmooth=12, bw=3, xbins=200, ybins=10, rtol=1e-7, atol=0, method='hist',
-smooth_multipliers=True):
+smooth_multipliers=True, train_cdf_min=1e-4):
'''Morph `raw_ts` based on differences between `ref_ts` and `train_ts`
bmorph is an adaptation of the PresRat bias correction procedure from
@@ -221,6 +222,10 @@ def bmorph(raw_ts, train_ts, ref_ts,
Bins for the flow time series
ybins : int
Bins for the second time series
+train_cdf_min : float (optional)
+    Minimum value used when looking up flows from the training CDF. Defaults
+    to 1e-4. Flooring these values prevents spikes in the corrections that
+    occur when the ratio of reference to training flows, and hence the
+    multiplier, becomes very large.
Returns
-------
@@ -253,7 +258,7 @@ def bmorph(raw_ts, train_ts, ref_ts,
# Calculate the bmorph multipliers based on the smoothed time series and
# PDFs
bmorph_multipliers = edcdfm(raw_smoothed_ts[raw_apply_window],
-raw_cdf, train_cdf, ref_cdf)
+raw_cdf, train_cdf, ref_cdf, train_cdf_min)

# Apply the bmorph multipliers to the raw time series
bmorph_ts = bmorph_multipliers * raw_ts[raw_apply_window]
@@ -277,7 +282,8 @@ def bmorph(raw_ts, train_ts, ref_ts,
raw_smoothed_y[raw_cdf_window],
train_smoothed_y, ref_smoothed_y,
bw=bw, xbins=xbins, ybins=ybins,
-rtol=rtol, atol=atol, method=method)
+rtol=rtol, atol=atol, method=method,
+train_cdf_min=train_cdf_min)
if smooth_multipliers:
bmorph_multipliers = bmorph_multipliers.rolling(window=nsmooth, min_periods=1, center=True).mean()
bmorph_ts = bmorph_multipliers[raw_apply_window] * raw_ts[raw_apply_window]
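
A hedged usage sketch of the updated `bmorph` signature. The series are synthetic and the window arguments are assumed here to be slices over the datetime index; only `train_cdf_min` is new in this commit:

```python
import numpy as np
import pandas as pd
from bmorph.core import bmorph

times = pd.date_range('1981-01-01', '2000-12-31', freq='D')
rng = np.random.default_rng(0)
raw_ts = pd.Series(rng.gamma(2.0, 5.0, len(times)), index=times)    # simulated flows
train_ts = pd.Series(rng.gamma(2.0, 5.0, len(times)), index=times)  # simulated flows, training period
ref_ts = pd.Series(rng.gamma(2.0, 6.0, len(times)), index=times)    # reference (observed) flows

apply_window = slice('1991-01-01', '2000-12-31')  # assumed window form
train_window = slice('1981-01-01', '1990-12-31')

bc_flows, bc_mults = bmorph.bmorph(
    raw_ts, train_ts, ref_ts,
    apply_window, train_window, train_window, apply_window,
    train_cdf_min=1e-4)  # floor applied inside edcdfm/cqm
```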
46 changes: 36 additions & 10 deletions bmorph/core/workflows.py
@@ -5,6 +5,7 @@
from functools import partial
from tqdm.autonotebook import tqdm
from bmorph.util import mizuroute_utils as mizutil
+import os

def apply_bmorph(raw_ts, train_ts, ref_ts,
apply_window, raw_train_window, ref_train_window,
@@ -13,7 +14,7 @@ def apply_bmorph(raw_ts, train_ts, ref_ts,
interval=pd.DateOffset(years=1),
overlap=60, n_smooth_long=None, n_smooth_short=5,
bw=3, xbins=200, ybins=10,
-rtol=1e-6, atol=1e-8, method='hist', **kwargs):
+rtol=1e-6, atol=1e-8, method='hist', train_cdf_min=1e-4, **kwargs):
"""Bias correction is performed by bmorph on user-defined intervals.
Parameters
@@ -122,7 +123,7 @@ def apply_bmorph(raw_ts, train_ts, ref_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
raw_y, ref_y, train_y,
n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
-method=method)
+method=method, train_cdf_min=train_cdf_min)
bmorph_ts = bmorph_ts.append(bc_total)
bmorph_multipliers = bmorph_multipliers.append(bc_mult)
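
For intuition only (this is not the library's actual loop), a minimal sketch of the interval-by-interval idea behind `apply_bmorph`: correct each yearly chunk, then concatenate. The constant multiplier is a placeholder for the per-interval bmorph multipliers:

```python
import numpy as np
import pandas as pd

times = pd.date_range('1991-01-01', '1995-12-31', freq='D')
raw_ts = pd.Series(np.random.default_rng(0).gamma(2.0, 5.0, len(times)), index=times)

corrected_pieces = []
for year, chunk in raw_ts.groupby(raw_ts.index.year):
    multiplier = 1.05  # placeholder; bmorph computes these per interval from the CDFs
    corrected_pieces.append(multiplier * chunk)

corrected = pd.concat(corrected_pieces).sort_index()
```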

@@ -150,7 +151,8 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts,
train_upstream_y=None, train_downstream_y=None,
ref_upstream_y=None, ref_downstream_y=None,
n_smooth_long=None, n_smooth_short=5,
-bw=3, xbins=200, ybins=10, rtol=1e-6, atol=1e-8, method='hist', **kwargs):
+bw=3, xbins=200, ybins=10, rtol=1e-6, atol=1e-8,
+method='hist', train_cdf_min=1e-4, **kwargs):
"""Bias correction performed by blending bmorphed flows on user defined intervals.
Blendmorph is used to perform spatially consistent bias correction, this function
@@ -298,22 +300,22 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
raw_upstream_y, ref_upstream_y, train_upstream_y,
n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
-method=method)
+method=method, train_cdf_min=train_cdf_min)
bc_down_total, bc_down_mult = bmorph.bmorph(
raw_downstream_ts, train_downstream_ts, ref_downstream_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
raw_downstream_y, ref_downstream_y, train_downstream_y,
n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
-method=method)
+method=method, train_cdf_min=train_cdf_min)
else:
bc_up_total, bc_up_mult = bmorph.bmorph(
raw_upstream_ts, train_upstream_ts, ref_upstream_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
-n_smooth_short)
+n_smooth_short, train_cdf_min=train_cdf_min)
bc_down_total, bc_down_mult = bmorph.bmorph(
raw_downstream_ts, train_downstream_ts, ref_downstream_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
-n_smooth_short)
+n_smooth_short, train_cdf_min=train_cdf_min)

bc_multiplier = (blend_factor * bc_up_mult) + ((1 - blend_factor) * bc_down_mult)
bc_total = (blend_factor * bc_up_total) + ((1 - blend_factor) * bc_down_total)
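
The blend itself is a convex combination of the upstream and downstream corrections; a standalone sketch with made-up numbers:

```python
import numpy as np

blend_factor = 0.7  # weight given to the upstream correction at this reach
bc_up_mult = np.array([1.10, 0.95, 1.02])    # illustrative upstream multipliers
bc_down_mult = np.array([1.00, 0.90, 1.05])  # illustrative downstream multipliers

bc_multiplier = blend_factor * bc_up_mult + (1 - blend_factor) * bc_down_mult
```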
@@ -392,7 +394,7 @@ def _scbc_u_seg(ds, apply_window, raw_train_window, ref_train_window,
return scbc_u_flows, scbc_u_mults, scbc_u_locals


-def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None):
+def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None, save_mults=False):
"""
Applies Spatially Consistent Bias Correction (SCBC) by
bias correcting local flows and re-routing them through
@@ -413,13 +415,17 @@ def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None):
for descriptions of the options and their choices.
client: dask.Client (optional)
A `client` object to manage parallel computation.
+save_mults : boolean (optional)
+    Whether to save the bmorph multipliers for diagnostics. If True, the
+    multipliers are written to the same directory as the local flows.
+    Defaults to False.
Returns
-------
region_totals: xr.Dataset
The rerouted, total, bias corrected flows for the region
"""
-def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'):
+def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow', mult_path=None):
flows = [r[0] for r in results]
mults = [r[1] for r in results]
local = [r[2] for r in results]
@@ -428,8 +434,23 @@ def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'):
local_ds['time'] = flows[0].index
local_ds.name = out_varname
local_ds = local_ds.where(local_ds >= 1e-4, other=1e-4)
+try:
+    os.remove(file_path)
+except OSError:
+    pass
local_ds.transpose().to_netcdf(file_path)

+if mult_path:
+    try:
+        os.remove(mult_path)
+    except OSError:
+        pass
+    mult_ds = xr.DataArray(np.vstack(mults), dims=('seg','time'))
+    mult_ds['seg'] = segs
+    mult_ds['time'] = flows[0].index
+    mult_ds.name = 'mults'
+    mult_ds.transpose().to_netcdf(mult_path)
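
The overwrite-safe write pattern used above, as a standalone sketch; the output path, segment IDs, and multiplier arrays are hypothetical:

```python
import os
import numpy as np
import xarray as xr

mult_path = 'example_mults.nc'                   # hypothetical output path
segs = [101, 102, 103]                           # hypothetical segment IDs
times = np.arange('2000-01-01', '2000-01-11', dtype='datetime64[D]')
mults = [np.random.rand(len(times)) for _ in segs]

# Delete any existing file first; ignore the OSError raised when it is absent.
try:
    os.remove(mult_path)
except OSError:
    pass

mult_ds = xr.DataArray(np.vstack(mults), dims=('seg', 'time'),
                       coords={'seg': segs, 'time': times}, name='mults')
mult_ds.transpose().to_netcdf(mult_path)
```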

if 'condition_var' in bmorph_config.keys() and bmorph_config['condition_var']:
scbc_type = 'conditional'
scbc_fun = partial(_scbc_c_seg, **bmorph_config)
@@ -447,7 +468,12 @@ def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'):

out_file = (f'{bmorph_config["data_path"]}/input/'
f'{bmorph_config["output_prefix"]}_local_{scbc_type}_scbc.nc')
-unpack_and_write_netcdf(results, ds['seg'], out_file)
+if save_mults:
+    mult_file = (f'{bmorph_config["data_path"]}/input/'
+                 f'{bmorph_config["output_prefix"]}_local_mult_{scbc_type}_scbc.nc')
+else:
+    mult_file = None
+unpack_and_write_netcdf(results, ds['seg'], out_file, mult_path=mult_file)
config_path, mizuroute_config = mizutil.write_mizuroute_config(
bmorph_config["output_prefix"],
scbc_type, bmorph_config['apply_window'],
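A hedged usage sketch of the new `save_mults` option on `apply_scbc`; the dataset, executable path, and configuration dictionary below are placeholders and do not show every key a real `bmorph_config` needs:

```python
import xarray as xr
from bmorph.core import workflows

ds = xr.open_dataset('reach_flows.nc')           # hypothetical dataset with a 'seg' dimension
mizuroute_exe = '/usr/local/bin/route_runoff'    # hypothetical path to the mizuRoute executable
bmorph_config = {                                # minimal placeholder; real configs carry many more keys
    'data_path': './bmorph_data',
    'output_prefix': 'example_basin',
    'apply_window': slice('1991-01-01', '2000-12-31'),
}

# With save_mults=True the per-segment multipliers are written alongside the local flows.
region_totals = workflows.apply_scbc(ds, mizuroute_exe, bmorph_config, save_mults=True)
```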
