Skip to content

Commit

Permalink
add frequency validation and futr_df debugging methods (Nixtla#833)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmoralez authored and twobitunicorn committed Dec 7, 2023
1 parent 2a5c3d9 commit c9b84e5
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 42 deletions.
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ dependencies:
- black
- polars
- "ray[tune]>=2.2.0"
- utilsforecast>=0.0.17
- utilsforecast>=0.0.19
92 changes: 73 additions & 19 deletions nbs/core.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
" LocalRobustScaler, \n",
" LocalStandardScaler,\n",
")\n",
"from utilsforecast.validation import validate_freq\n",
"\n",
"import neuralforecast.config as nf_config\n",
"from neuralforecast.tsdataset import TimeSeriesDataset\n",
Expand Down Expand Up @@ -259,7 +260,7 @@
" \n",
" def __init__(self, \n",
" models: List[Any],\n",
" freq: str,\n",
" freq: Union[str, int],\n",
" local_scaler_type: Optional[str] = None):\n",
" \"\"\"\n",
" The `core.StatsForecast` class allows you to efficiently fit multiple `NeuralForecast` models \n",
Expand All @@ -272,9 +273,8 @@
" models : List[typing.Any]\n",
" Instantiated `neuralforecast.models` \n",
" see [collection here](https://nixtla.github.io/neuralforecast/models.html).\n",
" freq : str\n",
" Frequency of the data, \n",
" see [panda's available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).\n",
" freq : str or int\n",
" Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.\n",
" local_scaler_type : str, optional (default=None)\n",
" Scaler to apply per-serie to all features before fitting, which is inverted after predicting.\n",
" Can be 'standard', 'robust', 'robust-iqr', 'minmax' or 'boxcox'\n",
Expand Down Expand Up @@ -381,6 +381,7 @@
"\n",
" # Process and save new dataset (in self)\n",
" if df is not None:\n",
" validate_freq(df['ds'], self.freq)\n",
" self.dataset, self.uids, self.last_dates, self.ds \\\n",
" = self._prepare_fit(df=df, static_df=static_df, sort_df=sort_df, predict_only=False)\n",
" self.sort_df = sort_df\n",
Expand All @@ -402,6 +403,51 @@
"\n",
" self._fitted = True\n",
"\n",
" def make_future_dataframe(self, df: Optional[DataFrame] = None) -> DataFrame:\n",
" \"\"\"Create a dataframe with all ids and future times in the forecasting horizon.\n",
"\n",
" Parameters\n",
" ----------\n",
" df : pandas or polars DataFrame, optional (default=None)\n",
" DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.\n",
" Only required if this is different than the one used in the fit step.\n",
" \"\"\"\n",
" if df is not None:\n",
" df = ufp.sort(df, by=['unique_id', 'ds'])\n",
" last_times_by_id = ufp.group_by_agg(\n",
" df, by='unique_id', aggs={'ds': 'max'}, maintain_order=True\n",
" )\n",
" uids = last_times_by_id['unique_id']\n",
" last_times = last_times_by_id['ds']\n",
" else:\n",
" uids = self.uids\n",
" last_times = self.last_dates\n",
" return ufp.make_future_dataframe(\n",
" uids=uids,\n",
" last_times=last_times,\n",
" freq=self.freq,\n",
" h=self.h,\n",
" id_col='unique_id',\n",
" time_col='ds',\n",
" )\n",
"\n",
" def get_missing_future(\n",
" self, futr_df: DataFrame, df: Optional[DataFrame] = None\n",
" ) -> DataFrame:\n",
" \"\"\"Get the missing ids and times combinations in `futr_df`.\n",
" \n",
" Parameters\n",
" ----------\n",
" futr_df : pandas or polars DataFrame\n",
" DataFrame with [`unique_id`, `ds`] columns and `df`'s future exogenous.\n",
" df : pandas or polars DataFrame, optional (default=None)\n",
" DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.\n",
" Only required if this is different than the one used in the fit step.\n",
" \"\"\"\n",
" expected = self.make_future_dataframe(df)\n",
" ids = ['unique_id', 'ds']\n",
" return ufp.anti_join(expected, futr_df[ids], on=ids)\n",
"\n",
" def predict(self,\n",
" df: Optional[DataFrame] = None,\n",
" static_df: Optional[DataFrame] = None,\n",
Expand Down Expand Up @@ -455,6 +501,7 @@
"\n",
" # Process new dataset but does not store it.\n",
" if df is not None:\n",
" validate_freq(df['ds'], self.freq)\n",
" dataset, uids, last_dates, _ = self._prepare_fit(\n",
" df=df, static_df=static_df, sort_df=sort_df, predict_only=True\n",
" )\n",
Expand All @@ -474,16 +521,13 @@
" cols += [model_name + n for n in model.loss.output_names]\n",
"\n",
" # Placeholder dataframe for predictions with unique_id and ds\n",
" if isinstance(self.uids, pl_Series):\n",
" df_constructor = pl_DataFrame\n",
" else:\n",
" df_constructor = pd.DataFrame\n",
" starts = ufp.offset_times(last_dates, self.freq, 1)\n",
" fcsts_df = df_constructor(\n",
" {\n",
" 'unique_id': ufp.repeat(self.uids, self.h),\n",
" 'ds': ufp.time_ranges(starts, freq=self.freq, periods=self.h),\n",
" }\n",
" fcsts_df = ufp.make_future_dataframe(\n",
" uids=uids,\n",
" last_times=last_dates,\n",
" freq=self.freq,\n",
" h=self.h,\n",
" id_col='unique_id',\n",
" time_col='ds',\n",
" )\n",
"\n",
" # Update and define new forecasting dataset\n",
Expand All @@ -492,13 +536,22 @@
" else:\n",
" futr_orig_rows = futr_df.shape[0]\n",
" futr_df = ufp.join(futr_df, fcsts_df, on=['unique_id', 'ds'])\n",
" base_err_msg = f'`futr_df` must have one row per id and ds in the forecasting horizon ({self.h}).'\n",
" if futr_df.shape[0] < fcsts_df.shape[0]:\n",
" raise ValueError(base_err_msg)\n",
" if df is None:\n",
" expected_cmd = 'make_future_dataframe()'\n",
" missing_cmd = 'get_missing_future(futr_df)'\n",
" else:\n",
" expected_cmd = 'make_future_dataframe(df)'\n",
" missing_cmd = 'get_missing_future(futr_df, df)'\n",
" raise ValueError(\n",
" 'There are missing combinations of ids and times in `futr_df`.\\n'\n",
" f'You can run the `{expected_cmd}` method to get the expected combinations or '\n",
" f'the `{missing_cmd}` method to get the missing combinations.'\n",
" )\n",
" if futr_orig_rows > futr_df.shape[0]:\n",
" dropped_rows = futr_orig_rows - futr_df.shape[0]\n",
" warnings.warn(\n",
" f'Dropped {dropped_rows:,} unused rows from `futr_df`. ' + base_err_msg\n",
" f'Dropped {dropped_rows:,} unused rows from `futr_df`.'\n",
" )\n",
" if any(ufp.is_none(futr_df[col]).any() for col in needed_futr_exog):\n",
" raise ValueError('Found null values in `futr_df`')\n",
Expand All @@ -522,7 +575,7 @@
" fcsts = self._scalers_target_inverse_transform(fcsts, indptr)\n",
"\n",
" # Declare predictions pd.DataFrame\n",
" if df_constructor is pl_DataFrame:\n",
" if isinstance(self.uids, pl_Series):\n",
" fcsts = pl_DataFrame(dict(zip(cols, fcsts.T)))\n",
" else:\n",
" fcsts = pd.DataFrame(fcsts, columns=cols)\n",
Expand Down Expand Up @@ -583,6 +636,7 @@
"\n",
" # Process and save new dataset (in self)\n",
" if df is not None:\n",
" validate_freq(df['ds'], self.freq)\n",
" self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit(\n",
" df=df, static_df=static_df, sort_df=sort_df, predict_only=False\n",
" )\n",
Expand Down Expand Up @@ -1250,7 +1304,7 @@
"nf = NeuralForecast(models=models, freq='M')\n",
"nf.fit(AirPassengersPanel_train)\n",
"# not enough rows in futr_df raises an error\n",
"test_fail(lambda: nf.predict(futr_df=AirPassengersPanel_test.head()), contains='must have one row per id and ds')\n",
"test_fail(lambda: nf.predict(futr_df=AirPassengersPanel_test.head()), contains='There are missing combinations')\n",
"# extra rows issues a warning\n",
"with warnings.catch_warnings(record=True) as issued_warnings:\n",
" warnings.simplefilter('always', UserWarning)\n",
Expand Down
4 changes: 4 additions & 0 deletions neuralforecast/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@
'neuralforecast.core.NeuralForecast.cross_validation': ( 'core.html#neuralforecast.cross_validation',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.fit': ('core.html#neuralforecast.fit', 'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.get_missing_future': ( 'core.html#neuralforecast.get_missing_future',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.load': ('core.html#neuralforecast.load', 'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.make_future_dataframe': ( 'core.html#neuralforecast.make_future_dataframe',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.predict': ( 'core.html#neuralforecast.predict',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.predict_insample': ( 'core.html#neuralforecast.predict_insample',
Expand Down
96 changes: 75 additions & 21 deletions neuralforecast/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
LocalRobustScaler,
LocalStandardScaler,
)
from utilsforecast.validation import validate_freq

import neuralforecast.config as nf_config
from .tsdataset import TimeSeriesDataset
Expand Down Expand Up @@ -158,7 +159,10 @@ def _warn_id_as_idx():
# %% ../nbs/core.ipynb 10
class NeuralForecast:
def __init__(
self, models: List[Any], freq: str, local_scaler_type: Optional[str] = None
self,
models: List[Any],
freq: Union[str, int],
local_scaler_type: Optional[str] = None,
):
"""
The `core.NeuralForecast` class allows you to efficiently fit multiple `NeuralForecast` models
Expand All @@ -171,9 +175,8 @@ def __init__(
models : List[typing.Any]
Instantiated `neuralforecast.models`
see [collection here](https://nixtla.github.io/neuralforecast/models.html).
freq : str
Frequency of the data,
see [panda's available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).
freq : str or int
Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.
local_scaler_type : str, optional (default=None)
Scaler to apply per-serie to all features before fitting, which is inverted after predicting.
Can be 'standard', 'robust', 'robust-iqr', 'minmax' or 'boxcox'
Expand Down Expand Up @@ -289,6 +292,7 @@ def fit(

# Process and save new dataset (in self)
if df is not None:
validate_freq(df["ds"], self.freq)
self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit(
df=df, static_df=static_df, sort_df=sort_df, predict_only=False
)
Expand All @@ -314,6 +318,51 @@ def fit(

self._fitted = True

def make_future_dataframe(self, df: Optional[DataFrame] = None) -> DataFrame:
    """Build the frame of every id paired with each future time in the horizon.

    Parameters
    ----------
    df : pandas or polars DataFrame, optional (default=None)
        DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
        Only required if this is different than the one used in the fit step;
        when omitted, the ids and last times stored on the instance
        (`self.uids`, `self.last_dates`) are used.

    Returns
    -------
    pandas or polars DataFrame
        Output of `ufp.make_future_dataframe` for `self.h` steps at
        `self.freq`, using `unique_id` as the id column and `ds` as the
        time column.
    """
    if df is None:
        # No new data supplied: reuse what the instance already holds.
        uids = self.uids
        last_times = self.last_dates
    else:
        # Sort first so that, with maintain_order=True, the per-id
        # aggregation comes back ordered by id.
        sorted_df = ufp.sort(df, by=["unique_id", "ds"])
        last_by_id = ufp.group_by_agg(
            sorted_df,
            by="unique_id",
            aggs={"ds": "max"},
            maintain_order=True,
        )
        uids = last_by_id["unique_id"]
        last_times = last_by_id["ds"]
    future_df = ufp.make_future_dataframe(
        uids=uids,
        last_times=last_times,
        freq=self.freq,
        h=self.h,
        id_col="unique_id",
        time_col="ds",
    )
    return future_df

def get_missing_future(
    self, futr_df: DataFrame, df: Optional[DataFrame] = None
) -> DataFrame:
    """Return the expected (id, time) pairs that are absent from `futr_df`.

    Parameters
    ----------
    futr_df : pandas or polars DataFrame
        DataFrame with [`unique_id`, `ds`] columns and `df`'s future exogenous.
    df : pandas or polars DataFrame, optional (default=None)
        DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
        Only required if this is different than the one used in the fit step.

    Returns
    -------
    pandas or polars DataFrame
        Anti-join of the frame produced by `make_future_dataframe(df)`
        against the `unique_id`/`ds` columns of `futr_df`.
    """
    key_cols = ["unique_id", "ds"]
    # Full grid of combinations the forecasting horizon requires.
    expected_combinations = self.make_future_dataframe(df)
    # Only the key columns of the user's frame matter for the comparison.
    provided_combinations = futr_df[key_cols]
    return ufp.anti_join(expected_combinations, provided_combinations, on=key_cols)

def predict(
self,
df: Optional[DataFrame] = None,
Expand Down Expand Up @@ -373,6 +422,7 @@ def predict(

# Process new dataset but does not store it.
if df is not None:
validate_freq(df["ds"], self.freq)
dataset, uids, last_dates, _ = self._prepare_fit(
df=df, static_df=static_df, sort_df=sort_df, predict_only=True
)
Expand All @@ -393,16 +443,13 @@ def predict(
cols += [model_name + n for n in model.loss.output_names]

# Placeholder dataframe for predictions with unique_id and ds
if isinstance(self.uids, pl_Series):
df_constructor = pl_DataFrame
else:
df_constructor = pd.DataFrame
starts = ufp.offset_times(last_dates, self.freq, 1)
fcsts_df = df_constructor(
{
"unique_id": ufp.repeat(self.uids, self.h),
"ds": ufp.time_ranges(starts, freq=self.freq, periods=self.h),
}
fcsts_df = ufp.make_future_dataframe(
uids=uids,
last_times=last_dates,
freq=self.freq,
h=self.h,
id_col="unique_id",
time_col="ds",
)

# Update and define new forecasting dataset
Expand All @@ -411,15 +458,21 @@ def predict(
else:
futr_orig_rows = futr_df.shape[0]
futr_df = ufp.join(futr_df, fcsts_df, on=["unique_id", "ds"])
base_err_msg = f"`futr_df` must have one row per id and ds in the forecasting horizon ({self.h})."
if futr_df.shape[0] < fcsts_df.shape[0]:
raise ValueError(base_err_msg)
if df is None:
expected_cmd = "make_future_dataframe()"
missing_cmd = "get_missing_future(futr_df)"
else:
expected_cmd = "make_future_dataframe(df)"
missing_cmd = "get_missing_future(futr_df, df)"
raise ValueError(
"There are missing combinations of ids and times in `futr_df`.\n"
f"You can run the `{expected_cmd}` method to get the expected combinations or "
f"the `{missing_cmd}` method to get the missing combinations."
)
if futr_orig_rows > futr_df.shape[0]:
dropped_rows = futr_orig_rows - futr_df.shape[0]
warnings.warn(
f"Dropped {dropped_rows:,} unused rows from `futr_df`. "
+ base_err_msg
)
warnings.warn(f"Dropped {dropped_rows:,} unused rows from `futr_df`.")
if any(ufp.is_none(futr_df[col]).any() for col in needed_futr_exog):
raise ValueError("Found null values in `futr_df`")
futr_dataset = dataset.align(futr_df)
Expand All @@ -442,7 +495,7 @@ def predict(
fcsts = self._scalers_target_inverse_transform(fcsts, indptr)

# Declare predictions pd.DataFrame
if df_constructor is pl_DataFrame:
if isinstance(self.uids, pl_Series):
fcsts = pl_DataFrame(dict(zip(cols, fcsts.T)))
else:
fcsts = pd.DataFrame(fcsts, columns=cols)
Expand Down Expand Up @@ -505,6 +558,7 @@ def cross_validation(

# Process and save new dataset (in self)
if df is not None:
validate_freq(df["ds"], self.freq)
self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit(
df=df, static_df=static_df, sort_df=sort_df, predict_only=False
)
Expand Down
2 changes: 1 addition & 1 deletion settings.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ language = English
custom_sidebar = True
license = apache2
status = 2
requirements = numpy>=1.21.6 pandas>=1.3.5 torch>=2.0.0 pytorch-lightning>=2.0.0 ray[tune]>=2.2.0 optuna utilsforecast>=0.0.17 numba
requirements = numpy>=1.21.6 pandas>=1.3.5 torch>=2.0.0 pytorch-lightning>=2.0.0 ray[tune]>=2.2.0 optuna utilsforecast>=0.0.19 numba
dev_requirements = nbdev black mypy flake8 matplotlib hyperopt polars pyarrow
nbs_path = nbs
doc_path = _docs
Expand Down

0 comments on commit c9b84e5

Please sign in to comment.