passing a dataframe in udtf?  #2

Open
@shuaiwang88

Description

Hello, I really like your blog post and the tutorial.
This is a question rather than an issue.

I am trying to reproduce your ARIMA code using a UDTF. The code below works great using the AutoML pycaret package.

Now, I want to use exogenous variables for the final prediction:

    prediction = predict_model(final_model, fh=8)
    # now becomes:
    prediction = predict_model(final_model, X=future_exog)

where future_exog is a pandas DataFrame backed by a Snowflake table: it stores the next 8 weeks' exogenous-variable forecasts. Think of using temperature for a sales forecast: you have the next 8 weeks' weather forecast, and you pass that weather information into the sales forecast.
Details are in here

The question is: how do I pass the future_exog table (which has a firm/store column for partitioning) in the Snowflake UDTF setting? Do I pass it in the last SQL partition part of the code below? (One possible shape is sketched after the code.)

from snowflake.snowpark.types import DateType, FloatType
from snowflake.snowpark.types import StructType, StructField
import pandas as pd
from datetime import date
from pycaret.time_series import *
import numpy as np
from sktime.transformations.series.summarize import WindowSummarizer


class generate_auto_ml_predictions_pycaret:
  # Define __init__ method that acts
  # on full partition before rows are processed

  def __init__(self):
    # from snowpark_upload  import con
    # Create empty list to store inputs
    self._data = []

  # Define process method that acts
  # on each individual input row
  def process(
      self, input_date: date, input_measure: float
  ):

    # Ingest rows into pandas DataFrame
    data = [input_date, input_measure]
    self._data.append(data)

  # Define end_partition method that acts
  # on full partition after rows are processed
  def end_partition(self):
    # Convert inputs to DataFrame
    df = pd.DataFrame(data=self._data, columns=["DATE", "MEASURE"])

    df['DATE'] = pd.PeriodIndex(df['DATE'], freq='W')
    df.set_index('DATE', inplace=True)
    idx = pd.period_range(min(df.index), max(df.index))
    df = df.reindex(idx, fill_value=np.nan)

    # setup and run the experiment

    kwargs1 = {"lag_feature": {"lag": [1, 2, 3, 4, 5, 6, 8, 10, 12],
                               "mean": [[1, 8]], 'std': [[1, 8]]}}

    fe_target_rr = [WindowSummarizer(n_jobs=-1, truncate="bfill", **kwargs1)]
    setup(data=df,
          target='MEASURE',
          numeric_imputation_target='drift',
          fh=8,
          fe_target_rr=fe_target_rr,
          verbose=False)

    try:
      best_model = compare_models(sort='RMSE', verbose=True, n_select=2,
                                  include=[
                                      "lasso_cds_dt",
                                      "ada_cds_dt", "snaive",
                                      "ets", "croston"
                                  ])
    except Exception:
      # otherwise, remove the seasonality-specific models
      best_model = compare_models(sort='RMSE', verbose=True, n_select=2,
                                  include=["lasso_cds_dt",
                                           "ada_cds_dt",
                                           "croston"])
    best_tuned_models = [tune_model(
        model, n_iter=3, round=2, optimize='RMSE') for model in best_model]
    best_blend_model = blend_models(best_tuned_models, method='mean')
    final_model = finalize_model(best_blend_model)

    prediction = predict_model(final_model, fh=8)
    prediction.reset_index(inplace=True)
    prediction.rename(columns={'index': 'year_week',
                      'y_pred': 'value'}, inplace=True)

    prediction['year_week'] = prediction['year_week'].dt.to_timestamp()

    # Output the result as rows of (date, value)
    return list(prediction.itertuples(index=False, name=None))
    # Alternative considered: write directly with
    # snowpark_session.write_pandas(prediction, table_name='udf_arima_test',
    #                               auto_create_table=True, overwrite=True)


# Add packages and data types
snowpark_session.add_packages('pandas', 'numpy', 'pycaret', 'sktime')
# 'pycaret-models', 'pycaret-parallel')
# snowpark_session.add_import("@STG_FILES_FOR_UDTFS/snowpark_upload.py")
# Define output schema
output_schema = StructType([
    StructField("DATE", DateType()), StructField("MEASURE", FloatType())
    # , StructField("PREDICTION_TRAIN", FloatType())
    # , StructField("PREDICTION_TEST", FloatType())
])

# Upload UDTF to Snowflake
snowpark_session.udtf.register(
    handler=generate_auto_ml_predictions_pycaret,
    output_schema=output_schema,
    # FloatType matches process(self, input_date: date, input_measure: float)
    input_types=[DateType(), FloatType()],
    is_permanent=True,
    name='SNOWPARK_GENERATE_AUTO_ARIMA_PREDICTIONS_PYCARET',
    replace=True,
    stage_location='@UDTF_STAGE'
)


output = snowpark_session.sql("""
  select FIRM, DATE, MEASURE
  from WORKSPACE_DATASCIENCE."top_firms_task_weekly"
  ,table(SNOWPARK_GENERATE_AUTO_ARIMA_PREDICTIONS_PYCARET(YEAR_WEEK, TOTAL_TASKS)
        over (
          partition by FIRM
          order by YEAR_WEEK asc
        )
      )
""").to_pandas()
