Skip to content

Commit

Permalink
Expedite time slice interpolation routine (#536)
Browse files Browse the repository at this point in the history
* lake-gage crosswalk creation

* use cross walk dataframes and parallelize interpolation

* name index

* return hybrid lake-gage crosswalks

* save link_gage_df for preprocess

* pass cross walk dataframe to get_obs_from_timeslices

* simplify crosswalk with dataframes

* fix link_lake_df creation

* change waterbody types to 1 if USGS or USACE hybrid obs are unavailable

* save usgs and usace crosswalk dfs to preprocess dict

* add reservoir crosswalk dfs to list of things to unpack from preprocess and return

* add crosswalk to list of returns from unpack_nwm_preprocess

Co-authored-by: awlostowski-noaa <[email protected]>
  • Loading branch information
awlostowski-noaa and awlostowski-noaa authored Mar 13, 2022
1 parent 8fb6226 commit 29897f2
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 83 deletions.
129 changes: 71 additions & 58 deletions src/troute-network/troute/nhd_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ def read_reservoir_parameter_file(
usace_hybrid,
rfc_forecast,
lake_index_field="lake_id",
usgs_gage_id_field = "usgs_gage_id",
usgs_lake_id_field = "usgs_lake_id",
usace_gage_id_field = "usace_gage_id",
usace_lake_id_field = "usace_lake_id",
lake_id_mask=None,
):

Expand Down Expand Up @@ -275,12 +279,25 @@ def read_reservoir_parameter_file(
-----
"""

with xr.open_dataset(reservoir_parameter_file) as ds:
ds = ds.swap_dims({"feature_id": lake_index_field})
ds_new = ds["reservoir_type"]
df1 = ds_new.sel({lake_index_field: list(lake_id_mask)}).to_dataframe()

usgs_crosswalk = pd.DataFrame(
data = ds[usgs_gage_id_field].to_numpy(),
index = ds[usgs_lake_id_field].to_numpy(),
columns = [usgs_gage_id_field]
)
usgs_crosswalk.index.name = usgs_lake_id_field

usace_crosswalk = pd.DataFrame(
data = ds[usace_gage_id_field].to_numpy(),
index = ds[usace_lake_id_field].to_numpy(),
columns = [usace_gage_id_field]
)
usace_crosswalk.index.name = usace_lake_id_field

# drop duplicate indices
df1 = (df1.reset_index()
.drop_duplicates(subset="lake_id")
Expand All @@ -294,8 +311,8 @@ def read_reservoir_parameter_file(
df1[df1['reservoir_type'] == 3] = 1
if rfc_forecast == False:
df1[df1['reservoir_type'] == 4] = 1

return df1
return df1, usgs_crosswalk, usace_crosswalk


def get_ql_from_csv(qlat_input_file, index_col=0):
Expand Down Expand Up @@ -998,30 +1015,40 @@ def _read_timeslice_file(f):

return timeslice_observations, observation_quality

def _interpolate_one(df, interpolation_limit, frequency):
    """
    Interpolate a block of time-indexed observations onto a one-minute grid,
    then sample the result at the requested output frequency.

    Arguments
    ---------
    - df (DataFrame): observations to interpolate. Must have a
                      datetime-like index, as required by
                      `DataFrame.resample`.
    - interpolation_limit (int): passed to `interpolate` as `limit`, i.e.
                      the maximum number of consecutive missing one-minute
                      values that will be filled.
    - frequency (str): pandas frequency string of the output grid
                      (for example "5min").

    Returns
    -------
    - interp_out (ndarray): interpolated values sampled at `frequency`.
                      Index and column labels are dropped by `to_numpy`.
    """
    # upsample to one-minute resolution and fill gaps, working outward in
    # both directions from the available observations
    minutely = df.resample('min').interpolate(
        limit=interpolation_limit,
        limit_direction='both',
    )

    # keep only the rows that land on the requested output frequency and
    # hand back a bare array
    interp_out = minutely.resample(frequency).asfreq().to_numpy()
    return interp_out

def get_obs_from_timeslices(
crosswalk_file,
crosswalk_df,
crosswalk_gage_field,
crosswalk_dest_field,
timeslice_files,
qc_threshold,
interpolation_limit,
frequency_secs,
t0,
cpu_pool
cpu_pool,
):
"""
Read observations from TimeSlice files, interpolate available observations
and organize into a Pandas DataFrame
Arguments
--------
- crosswalk_file (str): full directory path to channel segment
cross walk file (RouteLink.nc)
- crosswalk_gage_field (str): fieldname of gage ID data in crosswalk file
--------
- crosswalk_gage_field (str): fieldname of gage ID data in crosswalk dataframe
- crosswalk_dest_field (str): fieldname of destination data in crosswalk
file. For streamflow DA, this is the field
- crosswalk_dest_field (str): fieldname of destination data in link_gage_df.
For streamflow DA, this is the field
containing segment IDs. For reservoir DA,
this is the field containing waterbody IDs.
Expand All @@ -1038,7 +1065,7 @@ def get_obs_from_timeslices(
- t0 (datetime): Initialization time of simulation set
- cpu_pool (int): Number of CPUs used for parallel
TimeSlice reading
TimeSlice reading and interpolation
Returns
-------
Expand Down Expand Up @@ -1075,51 +1102,20 @@ def get_obs_from_timeslices(
# concatenate dataframes
timeslice_obs_df = pd.concat(timeslice_obs_frames, axis = 1)
timeslice_qual_df = pd.concat(timeslice_qual_frames, axis = 1)

# Open the cross walk file and build a dataframe of gages and their corresponding
# linkID locations. This dataframe will be joined to the `timeslice_obs_df`, so
# that gage observations are indexed by linkID, rather than gageID.
#
# TODO: Re-write this section to allow the crosswalk file to be flexible. For
# reservoir DA, we would use a different crosswalk (gage<>waterbody) than
# we would for streamflow DA.
#
with xr.open_dataset(crosswalk_file) as ds:

# assemble list of gage IDs as bytestrings, this list includes
# the entire gages variable from the crosswalk file, most of which
# are empty because most segments are not co-located with gages
gage_list = list(map(bytes.strip, ds[crosswalk_gage_field].values))

# create mask for populated (alphanumeric) gage IDs
gage_mask = list(map(bytes.isalnum, gage_list))

# apply mask to gage_list, isolating only populated gage IDs
gage_da = list(map(bytes.strip, ds[crosswalk_gage_field][gage_mask].values))

# convert gage IDs to unicode strings
gage_da = np.asarray(gage_da).astype('<U15')

# package crosswalk data into a dictionary
data_var_dict = {
crosswalk_gage_field: gage_da,
crosswalk_dest_field: ds[crosswalk_dest_field].values[gage_mask]
}

# construct crosswalk dataframe, set gage ID field as index
df = (pd.DataFrame(data = data_var_dict).
set_index(crosswalk_gage_field))


# Link <> gage crosswalk data
df = crosswalk_df.reset_index()
df[crosswalk_gage_field] = np.asarray(df[crosswalk_gage_field]).astype('<U15')
df = df.set_index(crosswalk_gage_field)

# join crosswalk data with timeslice data, indexed on crosswalk destination field
observation_df = (df.join(timeslice_obs_df).
reset_index().
rename(columns={"index": crosswalk_gage_field}).
set_index(crosswalk_dest_field).
drop([crosswalk_gage_field], axis=1))

observation_qual_df = (df.join(timeslice_qual_df).
reset_index().
rename(columns={"index": crosswalk_gage_field}).
set_index(crosswalk_dest_field).
drop([crosswalk_gage_field], axis=1))

Expand All @@ -1146,14 +1142,31 @@ def get_obs_from_timeslices(
frequency = str(int(frequency_secs/60))+"min"

# interpolate and resample frequency
observation_df_T = (observation_df_T.resample('min').
interpolate(
limit = interpolation_limit,
limit_direction = 'both'
).
resample(frequency).
asfreq()
)
buffer_df = observation_df_T.resample(frequency).asfreq()
with Parallel(n_jobs=cpu_pool) as parallel:

jobs = []
interp_chunks = ()
step = 200
for a, i in enumerate(range(0, len(observation_df_T.columns), step)):

start = i
if (i+step-1) < buffer_df.shape[1]:
stop = i+(step)
else:
stop = buffer_df.shape[1]

jobs.append(
delayed(_interpolate_one)(observation_df_T.iloc[:,start:stop], interpolation_limit, frequency)
)

interp_chunks = parallel(jobs)

observation_df_T = pd.DataFrame(
data = np.concatenate(interp_chunks, axis = 1),
columns = buffer_df.columns,
index = buffer_df.index
)

# re-transpose, making link the index
observation_df_new = observation_df_T.transpose()
Expand Down
9 changes: 9 additions & 0 deletions src/troute-nwm/src/nwm_routing/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ def main_v03(argv):
reaches_bytw,
rconn,
link_gage_df,
usgs_lake_gage_crosswalk,
usace_lake_gage_crosswalk,
diffusive_network_data,
topobathy_data,
) = unpack_nwm_preprocess_data(
Expand All @@ -311,6 +313,8 @@ def main_v03(argv):
reaches_bytw,
rconn,
link_gage_df,
usgs_lake_gage_crosswalk,
usace_lake_gage_crosswalk,
diffusive_network_data,
topobathy_data,
) = nwm_network_preprocess(
Expand Down Expand Up @@ -381,11 +385,14 @@ def main_v03(argv):
break_network_at_waterbodies,
segment_index,
link_gage_df,
usgs_lake_gage_crosswalk,
usace_lake_gage_crosswalk,
link_lake_crosswalk,
lastobs_df.index,
cpu_pool,
t0,
)


if showtiming:
forcing_end_time = time.time()
Expand Down Expand Up @@ -469,6 +476,8 @@ def main_v03(argv):
break_network_at_waterbodies,
segment_index,
link_gage_df,
usgs_lake_gage_crosswalk,
usace_lake_gage_crosswalk,
link_lake_crosswalk,
lastobs_df.index,
cpu_pool,
Expand Down
Loading

0 comments on commit 29897f2

Please sign in to comment.