compatible with SelfMadeHydroDataset; param_range_dict file could be None and model use default setting; param_range_dict could be set for ga calibration
OuyangWenyu committed Aug 15, 2024
1 parent 9202ec2 commit 9f9df57
Showing 14 changed files with 204 additions and 127 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -90,7 +90,7 @@ No more unnecessary columns are allowed.
For time series csv files, et and node1_flow are optional. If you don't have them, you can ignore them.
The units of the variables may differ, but they must not be omitted and should be put in `()` in the column name.

1. Use [prepare_data.py](https://github.com/OuyangWenyu/hydro-model-xaj/tree/master/scripts) -- run the following code to transform the data format to the required format:
1. Use [prepare_data.py](https://github.com/OuyangWenyu/hydromodel/tree/master/scripts) -- run the following code to transform the data format to the required format:
```Shell
$ python prepare_data.py --origin_data_dir <your_data_directory_for_hydromodel>
```
@@ -101,7 +101,7 @@ To run calibration with CAMELS dataset, you can use the following code:

```Shell
# just an example; the hyper-parameters of the model and the algorithm should be tuned by yourself
$ python calibrate_xaj.py --data_type camels --data_dir camels_us --exp expcamels001 --cv_fold 2 --warmup 365 --period 2007-01-01 2014-01-01 --calibrate_period 2007-01-01 2014-01-01 --test_period 2007-01-01 2014-01-01 --basin_id 01439500 06885500 08104900 09510200 --model "{\"name\": \"xaj\", \"source_type\": \"sources5mm\", \"source_book\": \"HF\"}" --algorithm "{\"name\": \"SCE_UA\", \"random_seed\": 1234, \"rep\": 10, \"ngs\": 10, \"kstop\": 5, \"peps\": 0.1, \"pcento\": 0.1}" --loss "{\"type\": \"time_series\", \"obj_func\": \"RMSE\", \"events\": null}"
$ python calibrate_xaj.py --data_type camels --data_dir "C:/Users/wenyu/OneDrive/data/camels/camels_us" --exp expcamels001 --cv_fold 2 --warmup 365 --period 2007-01-01 2014-01-01 --calibrate_period 2007-01-01 2014-01-01 --test_period 2007-01-01 2014-01-01 --basin_id 01439500 06885500 08104900 09510200 --model "{\"name\": \"xaj\", \"source_type\": \"sources5mm\", \"source_book\": \"HF\"}" --algorithm "{\"name\": \"SCE_UA\", \"random_seed\": 1234, \"rep\": 10, \"ngs\": 10, \"kstop\": 5, \"peps\": 0.1, \"pcento\": 0.1}" --loss "{\"type\": \"time_series\", \"obj_func\": \"RMSE\", \"events\": null}"
```
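
The `--model`, `--algorithm`, and `--loss` arguments in the command above are passed as JSON strings. A minimal sketch of how such arguments could be parsed, assuming an `argparse`-based script. The `build_parser` function is hypothetical and is not taken from `calibrate_xaj.py`; only the flag names come from the command above.

```python
import argparse
import json


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical parser: only the flag names are taken from the README command.
    parser = argparse.ArgumentParser()
    # Basin ids are kept as strings so that leading zeros survive
    parser.add_argument("--basin_id", nargs="+", default=[])
    parser.add_argument("--period", nargs=2)
    # json.loads converts the quoted JSON string into a dict
    parser.add_argument("--model", type=json.loads, default={})
    parser.add_argument("--algorithm", type=json.loads, default={})
    return parser


args = build_parser().parse_args(
    [
        "--basin_id", "01439500", "06885500",
        "--period", "2007-01-01", "2014-01-01",
        "--model", '{"name": "xaj", "source_type": "sources5mm", "source_book": "HF"}',
        "--algorithm", '{"name": "SCE_UA", "random_seed": 1234, "rep": 10}',
    ]
)
print(args.model["name"], args.algorithm["rep"])
```

Parsing the JSON at the command-line boundary means the rest of the script only sees plain dictionaries.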

To use your own data, run the following code:
@@ -192,9 +192,9 @@ needed, and it is not implemented yet. The following links may be useful:
- https://github.com/ecoon/watershed-workflow
- https://github.com/ConnectedSystems/Streamfall.jl

**NOTE: We also provide a differentiable version of XAJ, which is based on the [PyTorch](https://pytorch.org/) framework.**
We also provide a differentiable version of XAJ, which is based on the [PyTorch](https://pytorch.org/) framework.

The idea comes from this paper: [From calibration to parameter learning: Harnessing the scaling effects of big data in geoscientific modeling](http://dx.doi.org/10.1038/s41467-021-26107-z) by Tsai et al. (2021). We use the same structure as the original XAJ model but replace the original Numpy code with PyTorch. Then we can use the automatic differentiation technique and stochastic gradient descent algorithms to optimize all parameters. The advantage of this method is that we can use the same code to optimize many basins at once and use big data to improve the model performance. Generally, with the native parallel computing ability of PyTorch, the differentiable version is faster than the original version without any parallel processing. The differentiable version is also in the `models` directory.
The idea comes from this paper: [From calibration to parameter learning: Harnessing the scaling effects of big data in geoscientific modeling](http://dx.doi.org/10.1038/s41467-021-26107-z) by Tsai et al. (2021). We use the same structure as the original XAJ model but replace the original Numpy code with PyTorch. Then we can use the automatic differentiation technique and stochastic gradient descent algorithms to optimize all parameters. The advantage of this method is that we can use the same code to optimize many basins at once and use big data to improve the model performance. Generally, with the native parallel computing ability of PyTorch, the differentiable version is faster than the original version without any parallel processing. The differentiable version is in [torchhydro](https://github.com/OuyangWenyu/torchhydro).

Other implementations for XAJ:

29 changes: 29 additions & 0 deletions hydromodel/datasets/__init__.py
@@ -1,3 +1,16 @@
"""
Author: Wenyu Ouyang
Date: 2024-08-14 16:34:32
LastEditTime: 2024-08-15 12:01:25
LastEditors: Wenyu Ouyang
Description: Some common functions and variables for datasets.
FilePath: \hydromodel\hydromodel\datasets\__init__.py
Copyright (c) 2023-2024 Wenyu Ouyang. All rights reserved.
"""

from hydrodataset import Camels
from hydrodatasource.reader.data_source import SelfMadeHydroDataset

PRCP_NAME = "prcp(mm/day)"
PET_NAME = "pet(mm/day)"
ET_NAME = "et(mm/day)"
@@ -49,3 +62,19 @@ def get_unit_from_name(name_with_unit):
        The unit of the variable, e.g., "mm/day".
    """
    return name_with_unit.split("(")[1].strip(")") if "(" in name_with_unit else ""


datasource_dict = {
    "camels": Camels,
    "selfmadehydrodataset": SelfMadeHydroDataset,
}

datasource_vars_dict = {
    # all vars are in the sequence of [pr, pet, flow] with different names
    "camels": ["prcp", "PET", "streamflow"],
    "selfmadehydrodataset": [
        "total_precipitation_hourly",
        "potential_evaporation_hourly",
        "streamflow",
    ],
}
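
The two dictionaries added here act as a registry: the data type string selects both the reader class and the variable names to request. A minimal sketch of that dispatch pattern, assuming a stand-in class instead of the real `Camels` reader. `FakeCamels`, `registry`, `registry_vars`, and `read_area` are hypothetical names, and the areas are made up.

```python
from typing import Dict, List, Type


class FakeCamels:
    """Stand-in for a data source class (hypothetical, for illustration only)."""

    def __init__(self, data_dir: str, **kwargs):
        self.data_dir = data_dir

    def read_area(self, basin_ids: List[str]) -> Dict[str, float]:
        # Made-up areas; a real data source would read them from disk
        return {basin: 1.0 for basin in basin_ids}


# Same shape as datasource_dict and datasource_vars_dict in the diff
registry: Dict[str, Type] = {"camels": FakeCamels}
registry_vars: Dict[str, List[str]] = {"camels": ["prcp", "PET", "streamflow"]}


def read_area(basin_ids, data_type, data_dir, **kwargs):
    # Look up the class by name, build it, then call the shared method
    if data_type not in registry:
        raise NotImplementedError(f"Unknown data type: {data_type}")
    source = registry[data_type](data_dir, **kwargs)
    return source.read_area(basin_ids)


def get_unit_from_name(name_with_unit: str) -> str:
    # Same expression as in the diff: take the text between "(" and ")"
    return name_with_unit.split("(")[1].strip(")") if "(" in name_with_unit else ""


print(read_area(["01439500"], "camels", "camels_us"))
print(get_unit_from_name("prcp(mm/day)"))
```

With this layout, supporting another data source only needs one new entry in each dictionary, provided the class exposes the same methods.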
113 changes: 58 additions & 55 deletions hydromodel/datasets/data_preprocess.py
@@ -1,7 +1,7 @@
"""
Author: Wenyu Ouyang
Date: 2022-10-25 21:16:22
LastEditTime: 2024-05-20 20:08:10
LastEditTime: 2024-08-15 14:26:02
LastEditors: Wenyu Ouyang
Description: preprocess data for models in hydro-model-xaj
FilePath: \hydromodel\hydromodel\datasets\data_preprocess.py
@@ -393,34 +393,37 @@ def cross_valid_data(ts_data, period, warmup, cv_fold, freq="1D"):
    return train_test_data


def get_basin_area(data_type, data_dir, basin_ids) -> xr.Dataset:
def get_basin_area(basin_ids, data_type, data_dir, **kwargs) -> xr.Dataset:
    """_summary_
    Parameters
    ----------
    data_type : _type_
        _description_
    data_dir : _type_
        _description_
    basin_ids : _type_
        _description_
    basin_ids : list of str
        all the basin ids, sorted by the order of the id
    data_type : str
        the type of the data source, type in datasource_dict.keys() or 'owndata'
    data_dir : str
        the directory of the data source
    **kwargs
        some optional parameters for the data source
    Returns
    -------
    xr.Dataset
        _description_
    """
    area_name = remove_unit_from_name(AREA_NAME)
    if data_type == "camels":
        camels_data_dir = os.path.join(
            SETTING["local_data_path"]["datasets-origin"], "camels", data_dir
        )
        camels = Camels(camels_data_dir)
        basin_area = camels.read_area(basin_ids)
    if data_type in datasource_dict.keys():
        datasource = datasource_dict[data_type](data_dir, **kwargs)
        basin_area = datasource.read_area(basin_ids)
    elif data_type == "owndata":
        attr_data = xr.open_dataset(os.path.join(data_dir, "attributes.nc"))
        # to guarantee the column name is same as the column name in the time series data
        basin_area = attr_data[[area_name]].rename({"id": "basin"})
    else:
        raise NotImplementedError(
            "You should set the data type as 'owndata' or type in datasource_dict.keys()"
        )
    return basin_area


@@ -451,30 +454,36 @@ def get_ts_from_diffsource(data_type, data_dir, periods, basin_ids):
    prcp_name = remove_unit_from_name(PRCP_NAME)
    pet_name = remove_unit_from_name(PET_NAME)
    flow_name = remove_unit_from_name(FLOW_NAME)
    basin_area = get_basin_area(data_type, data_dir, basin_ids)
    if data_type == "camels":
        camels_data_dir = os.path.join(
            SETTING["local_data_path"]["datasets-origin"], "camels", data_dir
    basin_area = get_basin_area(basin_ids, data_type, data_dir)
    if data_type in datasource_dict.keys():
        datasource = datasource_dict[data_type](data_dir)
        p_pet_flow_vars = datasource_vars_dict[data_type]
        ts_data = datasource.read_ts_xrdataset(basin_ids, periods, p_pet_flow_vars)
        if isinstance(ts_data, dict):
            # We only support one time-unit in the data source, we select the first
            ts_data = ts_data[list(ts_data.keys())[0]]
        # get streamflow and convert the unit
        qobs_ = ts_data[p_pet_flow_vars[-1:]]
        target_unit = ts_data[p_pet_flow_vars[0]].attrs.get("units", "unknown")
        if qobs_[p_pet_flow_vars[-1]].attrs.get("units", "unknown") != target_unit:
            r_mmd = streamflow_unit_conv(qobs_, basin_area, target_unit=target_unit)
            ts_data[flow_name] = r_mmd[p_pet_flow_vars[-1]]
            ts_data[flow_name].attrs["units"] = target_unit
        ts_data = ts_data.rename(
            {
                p_pet_flow_vars[0]: prcp_name,
                p_pet_flow_vars[1]: pet_name,
                p_pet_flow_vars[2]: flow_name,
            }
        )
        camels = Camels(camels_data_dir)
        ts_data = camels.read_ts_xrdataset(
            basin_ids, periods, ["prcp", "PET", "streamflow"]
        )
        # trans unit to mm/time_interval
        qobs_ = ts_data[["streamflow"]]
        target_unit = ts_data["prcp"].attrs.get("units", "unknown")
        r_mmd = streamflow_unit_conv(qobs_, basin_area, target_unit=target_unit)
        ts_data[flow_name] = r_mmd["streamflow"]
        ts_data[flow_name].attrs["units"] = target_unit
        ts_data = ts_data.rename({"PET": pet_name})
        # ts_data = ts_data.drop_vars('streamflow')
    elif data_type == "owndata":
        ts_data = xr.open_dataset(os.path.join(data_dir, "timeseries.nc"))
        target_unit = ts_data[prcp_name].attrs.get("units", "unknown")
        qobs_ = ts_data[[flow_name]]
        r_mmd = streamflow_unit_conv(qobs_, basin_area, target_unit=target_unit)
        ts_data[flow_name] = r_mmd[flow_name]
        ts_data[flow_name].attrs["units"] = target_unit
        if qobs_[flow_name].attrs.get("units", "unknown") != target_unit:
            r_mmd = streamflow_unit_conv(qobs_, basin_area, target_unit=target_unit)
            ts_data[flow_name] = r_mmd[flow_name]
            ts_data[flow_name].attrs["units"] = target_unit
        ts_data = ts_data.sel(time=slice(periods[0], periods[1]))
    else:
        raise NotImplementedError(
@@ -512,16 +521,14 @@ def cross_val_split_tsdata(
    data_type, data_dir, cv_fold, train_period, test_period, periods, warmup, basin_ids
):
    ts_data = get_ts_from_diffsource(data_type, data_dir, periods, basin_ids)
    if cv_fold <= 1:
        # no cross validation
        periods = np.sort(
            [train_period[0], train_period[1], test_period[0], test_period[1]]
        )
        train_and_test_data = split_train_test(ts_data, train_period, test_period)
    else:
    if cv_fold > 1:
        # cross validation
        train_and_test_data = cross_valid_data(ts_data, periods, warmup, cv_fold)
    return train_and_test_data
        return cross_valid_data(ts_data, periods, warmup, cv_fold)
    # no cross validation
    periods = np.sort(
        [train_period[0], train_period[1], test_period[0], test_period[1]]
    )
    return split_train_test(ts_data, train_period, test_period)


def get_rr_events(rain, flow, basin_area):
@@ -532,21 +539,17 @@ def get_rr_events(rain, flow, basin_area):
        basin_area.isel(basin=0).to_array().to_numpy() * ureg.km**2,
        target_unit=flow.units,
    )
    # the regular expression matches the mm/xh and mm/xd formats
    match = re.match(r"mm/(\d+)(h|d)", flow.units)

    if match:
        num, unit = match.groups()
        num = int(num)
        if unit == "h":
            multiple = num
        elif unit == "d":
            multiple = num * 24
        else:
            raise ValueError(f"Unsupported unit: {unit}")
    else:
    if not (match := re.match(r"mm/(\d+)(h|d)", flow.units)):
        raise ValueError(f"Invalid unit format: {flow.units}")

    num, unit = match.groups()
    num = int(num)
    if unit == "h":
        multiple = num
    elif unit == "d":
        multiple = num * 24
    else:
        raise ValueError(f"Unsupported unit: {unit}")
    print(f"flow.units = { flow.units}, multiple = {multiple}")
    rr_events = {}
    for basin in basin_area.basin.values:
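
The refactored unit check in `get_rr_events` can be tried in isolation. The sketch below reuses the regular expression from the diff inside a standalone helper; the name `hours_per_step` is hypothetical and does not exist in hydromodel.

```python
import re


def hours_per_step(units: str) -> int:
    """Return the number of hours in one time step for units such as 'mm/3h'."""
    # Same pattern as in the diff: a count followed by h (hours) or d (days)
    if not (match := re.match(r"mm/(\d+)(h|d)", units)):
        raise ValueError(f"Invalid unit format: {units}")
    num, unit = match.groups()
    return int(num) if unit == "h" else int(num) * 24


print(hours_per_step("mm/3h"))
print(hours_per_step("mm/1d"))
```

Because the pattern requires a number, a unit string such as `mm/day` is rejected, while `mm/1d` is accepted. The assignment expression (`:=`) needs Python 3.8 or later.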
21 changes: 11 additions & 10 deletions hydromodel/models/gr4j.py
Expand Up @@ -3,7 +3,7 @@
import numpy as np
from numba import jit

from hydromodel.models.model_config import read_model_param_dict
from hydromodel.models.model_config import MODEL_PARAM_DICT
from hydromodel.models.xaj import uh_conv


@@ -204,12 +204,15 @@ def gr4j(p_and_e, parameters, warmup_length: int, return_state=False, **kwargs):
    Union[np.array, tuple]
        streamflow or (streamflow, states)
    """
    pr_file = kwargs.get("param_range_file", None)
    model_param_dict = read_model_param_dict(pr_file)
    x1_scale = model_param_dict["gr4j"]["param_range"]["x1"]
    x2_sacle = model_param_dict["gr4j"]["param_range"]["x2"]
    x3_scale = model_param_dict["gr4j"]["param_range"]["x3"]
    x4_scale = model_param_dict["gr4j"]["param_range"]["x4"]
    model_param_dict = kwargs.get("gr4j", None)
    if model_param_dict is None:
        model_param_dict = MODEL_PARAM_DICT["gr4j"]
    # params
    param_ranges = model_param_dict["param_range"]
    x1_scale = param_ranges["x1"]
    x2_sacle = param_ranges["x2"]
    x3_scale = param_ranges["x3"]
    x4_scale = param_ranges["x4"]
    x1 = x1_scale[0] + parameters[:, 0] * (x1_scale[1] - x1_scale[0])
    x2 = x2_sacle[0] + parameters[:, 1] * (x2_sacle[1] - x2_sacle[0])
    x3 = x3_scale[0] + parameters[:, 2] * (x3_scale[1] - x3_scale[0])
@@ -253,6 +256,4 @@ def gr4j(p_and_e, parameters, warmup_length: int, return_state=False, **kwargs):
            q, r = routing(q9[i, :, 0], q1[i, :, 0], x2, x3, r)
        streamflow_[i, :] = q
    streamflow = np.expand_dims(streamflow_, axis=2)
    if return_state:
        return streamflow, ets, s, r
    return streamflow, ets
    return (streamflow, ets, s, r) if return_state else (streamflow, ets)
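
The same change is applied to `gr4j`, `hymod`, and `xaj`: the parameter ranges are read from a keyword argument named after the model and fall back to built-in defaults when that argument is missing or `None`. A minimal sketch of that lookup plus the scaling from normalized parameters to physical values follows. It uses plain Python floats instead of NumPy arrays, `DEFAULT_PARAMS`, `resolve_ranges`, and `denormalize` are hypothetical names, and the range values are made up rather than taken from `MODEL_PARAM_DICT`.

```python
from typing import Dict, List, Optional

# Hypothetical defaults; the real ones live in hydromodel's MODEL_PARAM_DICT
DEFAULT_PARAMS: Dict[str, dict] = {
    "gr4j": {"param_range": {"x1": [100.0, 1200.0], "x2": [-5.0, 3.0]}}
}


def resolve_ranges(model_name: str, **kwargs) -> Dict[str, List[float]]:
    # A dict passed under the model's name wins; None means "use the defaults"
    model_param_dict: Optional[dict] = kwargs.get(model_name, None)
    if model_param_dict is None:
        model_param_dict = DEFAULT_PARAMS[model_name]
    return model_param_dict["param_range"]


def denormalize(value: float, bounds: List[float]) -> float:
    # Map a normalized parameter in [0, 1] onto [lower, upper]
    lower, upper = bounds
    return lower + value * (upper - lower)


default_ranges = resolve_ranges("gr4j")
custom_ranges = resolve_ranges("gr4j", gr4j={"param_range": {"x1": [0.0, 10.0]}})
print(denormalize(0.5, default_ranges["x1"]))
print(denormalize(0.5, custom_ranges["x1"]))
```

Passing the ranges as an in-memory dictionary, rather than a file path read inside the model function, lets a calibration routine supply its own ranges without touching the disk on every model call.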
19 changes: 11 additions & 8 deletions hydromodel/models/hymod.py
@@ -2,7 +2,7 @@
import numpy as np
from numba import jit

from hydromodel.models.model_config import read_model_param_dict
from hydromodel.models.model_config import MODEL_PARAM_DICT


def hymod(p_and_e, parameters, warmup_length=30, return_state=False, **kwargs):
@@ -30,14 +30,17 @@ def hymod(p_and_e, parameters, warmup_length=30, return_state=False, **kwargs):
    Union[list, np.array]
        streamflow, x_slow, x_quick, x_loss or streamflow
    """
    pr_file = kwargs.get("param_range_file", None)
    model_param_dict = read_model_param_dict(pr_file)
    model_param_dict = kwargs.get("hymod", None)
    if model_param_dict is None:
        model_param_dict = MODEL_PARAM_DICT["hymod"]
    # params
    param_ranges = model_param_dict["param_range"]
    # parameter, 2-dim variable: [parameter=1, basin]
    cmax_scale = model_param_dict["hymod"]["param_range"]["cmax"]
    bexp_sacle = model_param_dict["hymod"]["param_range"]["bexp"]
    alpha_scale = model_param_dict["hymod"]["param_range"]["alpha"]
    ks_scale = model_param_dict["hymod"]["param_range"]["ks"]
    kq_scale = model_param_dict["hymod"]["param_range"]["kq"]
    cmax_scale = param_ranges["cmax"]
    bexp_sacle = param_ranges["bexp"]
    alpha_scale = param_ranges["alpha"]
    ks_scale = param_ranges["ks"]
    kq_scale = param_ranges["kq"]
    cmax = cmax_scale[0] + parameters[:, 0] * (cmax_scale[1] - cmax_scale[0])
    bexp = bexp_sacle[0] + parameters[:, 1] * (bexp_sacle[1] - bexp_sacle[0])
    alpha = alpha_scale[0] + parameters[:, 2] * (alpha_scale[1] - alpha_scale[0])
33 changes: 26 additions & 7 deletions hydromodel/models/xaj.py
@@ -1,7 +1,7 @@
"""
Author: Wenyu Ouyang
Date: 2021-12-10 23:01:02
LastEditTime: 2024-05-19 11:41:06
LastEditTime: 2024-08-15 16:18:47
LastEditors: Wenyu Ouyang
Description: Core code for XinAnJiang model
FilePath: /hydro-model-xaj/hydromodel/models/xaj.py
@@ -14,7 +14,7 @@
from numba import jit
from scipy.special import gamma

from hydromodel.models.model_config import read_model_param_dict
from hydromodel.models.model_config import MODEL_PARAM_DICT

PRECISION = 1e-5

@@ -746,10 +746,11 @@ def xaj(
    source_type = kwargs.get("source_type", "sources")
    source_book = kwargs.get("source_book", "HF")
    time_interval_hours = kwargs.get("time_interval_hours", 1)
    pr_file = kwargs.get("param_range_file", None)
    model_param_dict = read_model_param_dict(pr_file)
    model_param_dict = kwargs.get(f"{model_name}", None)
    if model_param_dict is None:
        model_param_dict = MODEL_PARAM_DICT[f"{model_name}"]
    # params
    param_ranges = model_param_dict[model_name]["param_range"]
    param_ranges = model_param_dict["param_range"]
    if model_name == "xaj":
        route_method = "CSL"
    elif model_name == "xaj_mz":
@@ -832,7 +833,16 @@ def xaj(
                )
            elif source_type == "sources5mm":
                (rs, ri, rg), (s, fr) = sources5mm(
                    pe, r, sm, ex, ki, kg, s0, fr0, time_interval_hours=time_interval_hours, book=source_book
                    pe,
                    r,
                    sm,
                    ex,
                    ki,
                    kg,
                    s0,
                    fr0,
                    time_interval_hours=time_interval_hours,
                    book=source_book,
                )
            else:
                raise NotImplementedError("No such divide-sources method")
@@ -846,7 +856,16 @@ def xaj(
                )
            elif source_type == "sources5mm":
                (rs, ri, rg), (s, fr) = sources5mm(
                    pe, r, sm, ex, ki, kg, s, fr, time_interval_hours=time_interval_hours, book=source_book
                    pe,
                    r,
                    sm,
                    ex,
                    ki,
                    kg,
                    s,
                    fr,
                    time_interval_hours=time_interval_hours,
                    book=source_book,
                )
            else:
                raise NotImplementedError("No such divide-sources method")