diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py index b515623df..ae39f1a9c 100644 --- a/src/troute-network/troute/nhd_io.py +++ b/src/troute-network/troute/nhd_io.py @@ -238,6 +238,10 @@ def read_reservoir_parameter_file( usace_hybrid, rfc_forecast, lake_index_field="lake_id", + usgs_gage_id_field = "usgs_gage_id", + usgs_lake_id_field = "usgs_lake_id", + usace_gage_id_field = "usace_gage_id", + usace_lake_id_field = "usace_lake_id", lake_id_mask=None, ): @@ -275,12 +279,25 @@ def read_reservoir_parameter_file( ----- """ - with xr.open_dataset(reservoir_parameter_file) as ds: ds = ds.swap_dims({"feature_id": lake_index_field}) ds_new = ds["reservoir_type"] df1 = ds_new.sel({lake_index_field: list(lake_id_mask)}).to_dataframe() + usgs_crosswalk = pd.DataFrame( + data = ds[usgs_gage_id_field].to_numpy(), + index = ds[usgs_lake_id_field].to_numpy(), + columns = [usgs_gage_id_field] + ) + usgs_crosswalk.index.name = usgs_lake_id_field + + usace_crosswalk = pd.DataFrame( + data = ds[usace_gage_id_field].to_numpy(), + index = ds[usace_lake_id_field].to_numpy(), + columns = [usace_gage_id_field] + ) + usace_crosswalk.index.name = usace_lake_id_field + # drop duplicate indices df1 = (df1.reset_index() .drop_duplicates(subset="lake_id") @@ -294,8 +311,8 @@ def read_reservoir_parameter_file( df1[df1['reservoir_type'] == 3] = 1 if rfc_forecast == False: df1[df1['reservoir_type'] == 4] = 1 - - return df1 + + return df1, usgs_crosswalk, usace_crosswalk def get_ql_from_csv(qlat_input_file, index_col=0): @@ -998,8 +1015,21 @@ def _read_timeslice_file(f): return timeslice_observations, observation_quality +def _interpolate_one(df, interpolation_limit, frequency): + + interp_out = (df.resample('min'). + interpolate( + limit = interpolation_limit, + limit_direction = 'both' + ). + resample(frequency). + asfreq(). + to_numpy() + ) + return interp_out + def get_obs_from_timeslices( - crosswalk_file, + crosswalk_df, crosswalk_gage_field, crosswalk_dest_field, timeslice_files, @@ -1007,21 +1037,18 @@ def get_obs_from_timeslices( interpolation_limit, frequency_secs, t0, - cpu_pool + cpu_pool, ): """ Read observations from TimeSlice files, interpolate available observations and organize into a Pandas DataFrame Aguments - -------- - - crosswalk_file (str): full directory path to channel segment - cross walk file (RouteLink.nc) - - - crosswalk_gage_field (str): fieldname of gage ID data in crosswalk file + -------- + - crosswalk_gage_field (str): fieldname of gage ID data in crosswalk dataframe - - crosswalk_dest_field (str): fieldname of destination data in crosswalk - file. For streamflow DA, this is the field + - crosswalk_dest_field (str): fieldname of destination data in link_gage_df. + For streamflow DA, this is the field containing segment IDs. For reservoir DA, this is the field containing waterbody IDs. @@ -1038,7 +1065,7 @@ def get_obs_from_timeslices( - t0 (datetime): Initialization time of simulation set - cpu_pool (int): Number of CPUs used for parallel - TimeSlice reading + TimeSlice reading and interolation Returns ------- @@ -1075,51 +1102,20 @@ def get_obs_from_timeslices( # concatenate dataframes timeslice_obs_df = pd.concat(timeslice_obs_frames, axis = 1) timeslice_qual_df = pd.concat(timeslice_qual_frames, axis = 1) - - # Open the cross walk file and build a dataframe of gages and their corresponding - # linkID locations. This dataframe will be joined to the `timeslice_obs_df`, so - # that gage observations are indexed by linkID, rather than gageID. - # - # TODO: Re-write this section to allow the crosswalk file to be flexile. For - # reservoir DA, we would use a differenct cross walk (gage<>waterbody) than - # we would for streamflow DA. - # - with xr.open_dataset(crosswalk_file) as ds: - - # assemble list of gage IDs as bytestrings, this list includes - # the entire gages variable from the crosswalk file, most of which - # are empty because most segments are not co-loacated with gages - gage_list = list(map(bytes.strip, ds[crosswalk_gage_field].values)) - - # create mask for populated (alphanumeric) gage IDs - gage_mask = list(map(bytes.isalnum, gage_list)) - - # apply mask to gage_list, isolating only populated gage IDs - gage_da = list(map(bytes.strip, ds[crosswalk_gage_field][gage_mask].values)) - - # convert gage IDs to unicode strings - gage_da = np.asarray(gage_da).astype(' gage crosswalk data + df = crosswalk_df.reset_index() + df[crosswalk_gage_field] = np.asarray(df[crosswalk_gage_field]).astype('