passing a dataframe in udtf? #2

Open
shuaiwang88 opened this issue Feb 10, 2024 · 0 comments

Hello, I really like your blog post and the tutorial.
This is a question instead of an issue.

I am trying to replicate your ARIMA code using a UDTF. The code below works great using the AutoML pycaret package.

Now, I want to use the exogenous variables for the final prediction:

    prediction = predict_model(final_model, fh=8)
    # now becomes:
    prediction = predict_model(final_slim_model, X=future_exog)

where future_exog is a pandas DataFrame built from a Snowflake table: it stores the next 8 weeks' exogenous-variable forecasts. It is like your example of using temperature for a sales forecast: you have the next 8 weeks' weather forecast, and you pass the weather info into the sales forecast.
Details are in here
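
For concreteness, future_exog looks roughly like this (TEMPERATURE is just a placeholder column name and the values are made up):

import pandas as pd

# Hypothetical shape of future_exog: one row per future week for a given firm,
# holding the exogenous forecasts.
future_exog = pd.DataFrame(
    {"TEMPERATURE": [5.1, 6.0, 7.2, 8.5, 9.0, 10.3, 11.1, 12.4]},
    index=pd.period_range("2024-02-12", periods=8, freq="W"),
)
future_exog.index.name = "DATE"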

The question is: how do I pass the future_exog table (which has a firm/store column for partitioning) in the Snowflake UDTF setting, and do I pass it in the last SQL partition part of the code below? (A rough sketch of what I am imagining is at the bottom, after the current code.)

from snowflake.snowpark.types import DateType, FloatType, IntegerType, PandasDataFrameType
from snowflake.snowpark.types import StructType, StructField
import pandas as pd
from datetime import date
from pycaret.time_series import *
import numpy as np
from sktime.transformations.series.summarize import WindowSummarizer


class generate_auto_ml_predictions_pycaret:
  # Define __init__ method that acts
  # on full partition before rows are processed

  def __init__(self):
    # from snowpark_upload  import con
    # Create empty list to store inputs
    self._data = []

  # Define process method that acts
  # on each individual input row
  def process(
      self, input_date: date, input_measure: float
  ):

    # Ingest rows into pandas DataFrame
    data = [input_date, input_measure]
    self._data.append(data)

  # Define end_partition method that acts
  # on full partition after rows are processed
  def end_partition(self):
    # Convert inputs to DataFrame
    df = pd.DataFrame(data=self._data, columns=["DATE", "MEASURE"])

    df['DATE'] = pd.PeriodIndex(df['DATE'], freq='W')
    df.set_index('DATE', inplace=True)
    idx = pd.period_range(min(df.index), max(df.index))
    df = df.reindex(idx, fill_value=np.nan)

    # setup and run the experiment

    kwargs1 = {"lag_feature": {"lag": [1, 2, 3, 4, 5, 6, 8, 10, 12],
                               "mean": [[1, 8]], 'std': [[1, 8]]}}

    fe_target_rr = [WindowSummarizer(n_jobs=-1, truncate="bfill", **kwargs1)]
    prediction_df = pd.DataFrame()
    setup(data=df,
          target='MEASURE',
          numeric_imputation_target='drift',
          fh=8,

          fe_target_rr=fe_target_rr,
          verbose=False)

    try:
      best_model = compare_models(sort='RMSE', verbose=True, n_select=2,
                                  include=[
                                      "lasso_cds_dt",
                                      "ada_cds_dt", "snaive",
                                      "ets", "croston"
                                  ])

      # otherwise, remove the seasonality specific models
    except Exception as e:
        best_model = compare_models(sort='RMSE', verbose=True, n_select=2,
                                    include=["lasso_cds_dt",
                                             "ada_cds_dt",
                                             "croston"])
    best_tuned_models = [tune_model(
        model, n_iter=3, round=2, optimize='RMSE') for model in best_model]
    best_blend_model = blend_models(best_tuned_models, method='mean')
    final_model = finalize_model(best_blend_model)

    prediction = predict_model(final_model, fh=8)
    prediction.reset_index(inplace=True)
    prediction.rename(columns={'index': 'year_week',
                      'y_pred': 'value'}, inplace=True)

    prediction['year_week'] = prediction['year_week'].dt.to_timestamp()

    return list(prediction.itertuples(index=False, name=None))
    # snowpark_session.write_pandas(prediction, table_name='udf_arima_test', auto_create_table=True, overwrite=True)
    # Output the result


# Add packages and data types
snowpark_session.add_packages('pandas', 'numpy', 'pycaret', 'sktime')
# 'pycaret-models', 'pycaret-parallel')
# snowpark_session.add_import("@STG_FILES_FOR_UDTFS/snowpark_upload.py")
# Define output schema
output_schema = StructType([
    StructField("DATE", DateType()), StructField("MEASURE", FloatType())
    # , StructField("PREDICTION_TRAIN", FloatType())
    # , StructField("PREDICTION_TEST", FloatType())
])

# Upload UDTF to Snowflake
snowpark_session.udtf.register(
    handler=generate_auto_ml_predictions_pycaret,
    output_schema=output_schema,
    # input_types = [DateType(), FloatType()]
    input_types=[DateType(), IntegerType()],
    is_permanent=True,
    name='SNOWPARK_GENERATE_AUTO_ARIMA_PREDICTIONS_PYCARET',
    replace=True,
    stage_location='@UDTF_STAGE'
)


output = snowpark_session.sql("""
  select FIRM, DATE, MEASURE
  from WORKSPACE_DATASCIENCE."top_firms_task_weekly"
  ,table(SNOWPARK_GENERATE_AUTO_ARIMA_PREDICTIONS_PYCARET(YEAR_WEEK, TOTAL_TASKS)
        over (
          partition by FIRM
          order by YEAR_WEEK asc
        )
      )
""").to_pandas()
