Hello, I really like your blog post and the tutorial.
This is a question instead of an issue.
I am trying to replicate your ARIMA code using a UDTF. The code below works great with the AutoML PyCaret package.
Now I want to use exogenous variables for the final prediction:
prediction = predict_model(final_model, fh=8)
# now becomes:
prediction = predict_model(final_model, X=future_exog)
where future_exog is a pandas DataFrame built from a Snowflake table holding the next 8 weeks' forecast of the exogenous variables. It is just like your example of using temperature for a sales forecast: you have the next 8 weeks' weather forecast, and you pass that weather information into the sales forecast.
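For concreteness, a minimal sketch of the shape future_exog might take (the FIRM and TEMPERATURE columns and the dates are illustrative, not from the actual data):

import pandas as pd

# Hypothetical future_exog: one row per week of the 8-week horizon, indexed by
# the same weekly PeriodIndex the model is trained on. FIRM would only be used
# for partitioning in Snowflake; just the exogenous columns, indexed by the
# horizon, would be passed as X to predict_model.
future_exog = pd.DataFrame(
    {
        "FIRM": ["A"] * 8,                                       # partition column
        "TEMPERATURE": [3.1, 2.8, 4.0, 5.5, 6.2, 7.0, 7.4, 8.1]  # exogenous forecast
    },
    index=pd.period_range("2024-01-01", periods=8, freq="W", name="YEAR_WEEK"),
)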
Details are in here.
The question is: how do I pass the future_exog table (which has a firm/store column for partitioning) in the Snowflake UDTF setting? Do I pass it in the partition part of the final SQL statement in the code below? A sketch of one possible approach follows the code.
from snowflake.snowpark.types import DateType, FloatType, IntegerType, PandasDataFrameType
from snowflake.snowpark.types import StructType, StructField
import pandas as pd
from datetime import date
from pycaret.time_series import *
import numpy as np
from sktime.transformations.series.summarize import WindowSummarizer
class generate_auto_ml_predictions_pycaret:
    # Define __init__ method that acts
    # on the full partition before rows are processed
    def __init__(self):
        # from snowpark_upload import con
        # Create an empty list to store inputs
        self._data = []

    # Define process method that acts
    # on each individual input row
    def process(self, input_date: date, input_measure: float):
        # Accumulate each row for conversion to a DataFrame in end_partition
        self._data.append([input_date, input_measure])

    # Define end_partition method that acts
    # on the full partition after rows are processed
    def end_partition(self):
        # Convert inputs to a DataFrame indexed by a weekly PeriodIndex
        df = pd.DataFrame(data=self._data, columns=["DATE", "MEASURE"])
        df['DATE'] = pd.PeriodIndex(df['DATE'], freq='W')
        df.set_index('DATE', inplace=True)
        # Reindex onto a complete weekly range so missing weeks become NaN
        idx = pd.period_range(min(df.index), max(df.index), freq='W')
        df = df.reindex(idx, fill_value=np.nan)
        # Set up and run the experiment, engineering lag and
        # rolling mean/std features on the target
        kwargs1 = {"lag_feature": {"lag": [1, 2, 3, 4, 5, 6, 8, 10, 12],
                                   "mean": [[1, 8]], 'std': [[1, 8]]}}
        fe_target_rr = [WindowSummarizer(n_jobs=-1, truncate="bfill", **kwargs1)]
        setup(data=df,
              target='MEASURE',
              numeric_imputation_target='drift',
              fh=8,
              fe_target_rr=fe_target_rr,
              verbose=False)
        try:
            best_model = compare_models(sort='RMSE', verbose=True, n_select=2,
                                        include=["lasso_cds_dt",
                                                 "ada_cds_dt", "snaive",
                                                 "ets", "croston"])
        except Exception:
            # Otherwise, fall back to a list without the seasonality-specific models
            best_model = compare_models(sort='RMSE', verbose=True, n_select=2,
                                        include=["lasso_cds_dt",
                                                 "ada_cds_dt",
                                                 "croston"])
        # Tune the shortlisted models, blend them, and finalize on the full data
        best_tuned_models = [tune_model(model, n_iter=3, round=2, optimize='RMSE')
                             for model in best_model]
        best_blend_model = blend_models(best_tuned_models, method='mean')
        final_model = finalize_model(best_blend_model)
        prediction = predict_model(final_model, fh=8)
        prediction.reset_index(inplace=True)
        prediction.rename(columns={'index': 'year_week',
                                   'y_pred': 'value'}, inplace=True)
        prediction['year_week'] = prediction['year_week'].dt.to_timestamp()
        # Output the result as tuples matching the output schema
        return list(prediction.itertuples(index=False, name=None))
        # snowpark_session.write_pandas(prediction, table_name='udf_arima_test', auto_create_table=True, overwrite=True)
# Add packages and data types
snowpark_session.add_packages('pandas', 'numpy', 'pycaret', 'sktime')
# 'pycaret-models', 'pycaret-parallel')
# snowpark_session.add_import("@STG_FILES_FOR_UDTFS/snowpark_upload.py")
# Define output schema
output_schema = StructType([
StructField("DATE", DateType()), StructField("MEASURE", FloatType())
# , StructField("PREDICTION_TRAIN", FloatType())
# , StructField("PREDICTION_TEST", FloatType())
])
# Upload UDTF to Snowflake
snowpark_session.udtf.register(
    handler=generate_auto_ml_predictions_pycaret,
    output_schema=output_schema,
    input_types=[DateType(), IntegerType()],
    # input_types=[DateType(), FloatType()]
    is_permanent=True,
    name='SNOWPARK_GENERATE_AUTO_ARIMA_PREDICTIONS_PYCARET',
    replace=True,
    stage_location='@UDTF_STAGE'
)
output = snowpark_session.sql("""
select FIRM, DATE, MEASURE
from WORKSPACE_DATASCIENCE."top_firms_task_weekly"
,table(SNOWPARK_GENERATE_AUTO_ARIMA_PREDICTIONS_PYCARET(YEAR_WEEK, TOTAL_TASKS)
over (
partition by FIRM
order by YEAR_WEEK asc
)
)
""").to_pandas()