This README contains the overview and lab instructions for the Data Engineering Pipelines with Snowpark Python code in this repository.
If you are embarking on the Data Science lab, jump to the Data Science Lab Instructions in the steps/00_data_science_lab
folder.
➡️ For prerequisites to both the Data Science and Data Engineering labs, see the Snowpark Python Prerequisites guide for detailed steps.
Here is an overview of what we'll build in this lab:
This step will create the necessary role, warehouse, database, and schemas for the hands-on lab. We will create five schemas for our data maturity model: EXTERNAL, RAW_POS, RAW_CUSTOMER, HARMONIZED, and ANALYTICS.
Before executing the SQL script, ensure you are logged into Snowflake via the extension in Codespaces. Select the Snowflake icon in the left navigation bar and verify you are connected; otherwise, log in with your credentials. Once logged in, open the steps/01_setup_snowflake.sql script from the file Explorer in the left navigation bar.
To run all the queries in this script, use the "Execute All Statements" button in the upper right corner of the editor window.
During this step we will be loading the raw Tasty Bytes POS and Customer loyalty data from raw Parquet files in s3://sfquickstarts/data-engineering-with-snowpark-python/
to our RAW_POS
and RAW_CUSTOMER
schemas in Snowflake. We are going to be orchestrating this process from Codespaces in Python using the Snowpark Python API. To put this in context, we are on step #2 in our data flow overview:
To load the raw data, we will be executing the steps/02_load_raw.py script. To execute this script, ensure that the pysnowpark Conda environment is active, then run the following commands (which assume that your terminal has the root of your repository open):
cd steps
python 02_load_raw.py
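The kind of Snowpark calls involved in a load like this can be sketched as follows. This is a minimal sketch under stated assumptions, not the contents of the lab script: the stage name `external.frostbyte_raw_stage`, the helper names, and the `year=` folder layout are hypothetical, and `session` is assumed to be an already-connected Snowpark `Session`.

```python
from typing import Optional


def stage_location(s3dir: str, tname: str, year: Optional[int] = None) -> str:
    """Build the stage path that holds one table's Parquet files.

    The stage name is hypothetical; use the stage created by your setup script.
    """
    location = f"@external.frostbyte_raw_stage/{s3dir}/{tname}"
    if year is not None:
        # Assumes large tables are split into year=YYYY sub-folders.
        location += f"/year={year}"
    return location


def load_raw_table(session, tname: str, s3dir: str, schema: str,
                   year: Optional[int] = None) -> None:
    """Copy staged Parquet files into the table `tname` in `schema`."""
    session.use_schema(schema)
    df = session.read.option("compression", "snappy").parquet(
        stage_location(s3dir, tname, year)
    )
    # copy_into_table issues a COPY INTO, so the data is loaded inside Snowflake.
    df.copy_into_table(tname)
```

With these assumed names, `load_raw_table(session, tname="order_header", s3dir="pos", schema="RAW_POS", year=2021)` would read from `@external.frostbyte_raw_stage/pos/order_header/year=2021`.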
During this step we will be "loading" the raw weather data to Snowflake. But "loading" is really the wrong word here. Because we're using Snowflake's unique data-sharing capability we don't actually need to copy the data to our Snowflake account with a custom ETL process. Instead, we can directly access the weather data shared by Weather Source in the Snowflake Data Marketplace. To put this in context, we are on step #3 in our data flow overview:
Weather Source is a leading provider of global weather and climate data and their OnPoint Product Suite provides businesses with the necessary weather and climate data to quickly generate meaningful and actionable insights for a wide range of use cases across industries. Let's connect to the Weather Source LLC: frostbyte
feed from Weather Source in the Snowflake Data Marketplace by following these steps:
- Log in to Snowsight
- Ensure you are using the ACCOUNTADMIN role
- Click on the Marketplace link in the left navigation bar
- Enter "Weather Source LLC: frostbyte" in the search box and press Return
- Click on the "Weather Source LLC: frostbyte" listing tile
- Click the blue "Get" button
- Expand the "Options" dialog
- Change the "Database name" to read "FROSTBYTE_WEATHERSOURCE" (all capital letters)
- Select the "HOL_ROLE" role to have access to the new database
- Click on the blue "Get" button
Open the steps/03_load_weather.sql
script in Codespaces from the file Explorer in the left navigation bar, and run the script, which contains the following SQL query.
SELECT * FROM FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES LIMIT 100;
During this step we will be creating a view to simplify the raw POS schema by joining together 6 different tables and picking only the columns we need. What's really cool is that we're going to define that view with the Snowpark DataFrame API, then we're going to create a Snowflake stream on that view so that we can incrementally process changes to any of the POS tables. To put this in context, we are on step #4 in our data flow overview:
To create the view and stream, execute the steps/04_create_pos_view.py script. As in step 2, we'll execute it from the terminal. The following command assumes that your terminal is already in the steps directory; if not, run cd steps first.
python 04_create_pos_view.py
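Creating a stream on a view comes down to a single DDL statement, which can be issued from Python. The sketch below is illustrative only: the view name `HARMONIZED.POS_FLATTENED_V` and both helper names are assumptions, and `session` is assumed to be an existing Snowpark `Session`.

```python
def stream_ddl(view_name: str, stream_name: str) -> str:
    """Return DDL for a stream that tracks changes to a view."""
    return (
        f"CREATE OR REPLACE STREAM {stream_name} "
        f"ON VIEW {view_name} "
        # Makes the first read of the stream return the rows that already exist.
        "SHOW_INITIAL_ROWS = TRUE"
    )


def create_pos_view_stream(session) -> None:
    """Create the stream; the view name here is a hypothetical placeholder."""
    ddl = stream_ddl("HARMONIZED.POS_FLATTENED_V",
                     "HARMONIZED.POS_FLATTENED_V_STREAM")
    session.sql(ddl).collect()
```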
During this step we will be creating and deploying our first Snowpark Python object to Snowflake, a user-defined function (or UDF). The UDF will be very basic to begin with, but in a future step, we'll update it to include a third-party Python package. Additionally, you will be introduced to SnowCLI, a new developer command-line tool. SnowCLI makes building and deploying Snowpark Python objects to Snowflake a consistent experience for the developer. To put this in context, we are on step #5 in our data flow overview:
To test the UDF locally, you will execute the steps/05_fahrenheit_to_celsius_udf/fahrenheit_to_celsius_udf/function.py script. As in the previous steps, we'll execute it from the terminal. The following commands assume you are already in the steps directory of the repository.
cd 05_fahrenheit_to_celsius_udf
python fahrenheit_to_celsius_udf/function.py 35
While developing a UDF, you can simply run it locally. If your UDF doesn't need to query data from Snowflake, this process will be entirely local.
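A local run like the one above works because the UDF is an ordinary Python function. As a rough sketch of how such a file might be wired up (the `run_from_args` helper and the default of 35 are assumptions for illustration, not necessarily what the repository's script does):

```python
import sys


def main(temp_f: float) -> float:
    """Convert a Fahrenheit temperature to Celsius."""
    return (float(temp_f) - 32) * (5 / 9)


def run_from_args(args: list) -> float:
    """Call main() with the first command-line argument, defaulting to 35."""
    temp_f = args[0] if args else 35
    return main(temp_f)


if __name__ == "__main__":
    # No Snowflake connection is needed: the function runs entirely locally.
    print(run_from_args(sys.argv[1:]))
```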
To deploy your UDF to Snowflake we will use the SnowCLI tool. The SnowCLI tool will do all the heavy lifting of packaging up your application, copying it to a Snowflake stage, and creating the object in Snowflake.
snow snowpark build
snow snowpark deploy
In order to run the UDF in Snowflake we have a few options. Any UDF in Snowflake can be invoked through SQL as follows:
SELECT ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF(35);
With the SnowCLI utility you can also invoke the UDF from the terminal in Codespaces as follows:
snow snowpark execute function "fahrenheit_to_celsius_udf(35)"
During this step we will be creating and deploying our first Snowpark Python stored procedure (or sproc) to Snowflake. This sproc will merge changes from the HARMONIZED.POS_FLATTENED_V_STREAM
stream into our target HARMONIZED.ORDERS
table. To put this in context, we are on step #6 in our data flow overview:
To test the procedure locally, you will execute the steps/06_orders_update_sp/orders_update_sp/procedure.py script. These commands assume you are still in the directory from the previous step, so we first need to go up one directory.
cd ..
cd 06_orders_update_sp
python orders_update_sp/procedure.py
While we’re developing the sproc, the Python code will run locally on your laptop, but the Snowpark DataFrame code will issue SQL queries to your Snowflake account.
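To make the merge/upsert idea concrete, here is a toy in-memory model in plain Python. It is not Snowpark code and not the lab's procedure: the `ORDER_DETAIL_ID` key and the sample rows are made up, and the real sproc performs the equivalent work inside Snowflake.

```python
def merge_upsert(target: dict, changes: list, key: str) -> dict:
    """Apply change rows to `target`, a dict of rows indexed by `key`.

    Rows whose key already exists are updated; rows with new keys are inserted.
    Returns counts of updated and inserted rows.
    """
    counts = {"updated": 0, "inserted": 0}
    for row in changes:
        row_key = row[key]
        if row_key in target:
            target[row_key].update(row)  # like WHEN MATCHED THEN UPDATE
            counts["updated"] += 1
        else:
            target[row_key] = dict(row)  # like WHEN NOT MATCHED THEN INSERT
            counts["inserted"] += 1
    return counts


# Hypothetical sample data: one existing order row, two rows from the stream.
orders = {1: {"ORDER_DETAIL_ID": 1, "QUANTITY": 2}}
stream_rows = [
    {"ORDER_DETAIL_ID": 1, "QUANTITY": 5},
    {"ORDER_DETAIL_ID": 2, "QUANTITY": 1},
]
result = merge_upsert(orders, stream_rows, key="ORDER_DETAIL_ID")
print(result)
```

In Snowpark, the corresponding operation is `Table.merge()` with `when_matched().update(...)` and `when_not_matched().insert(...)` clauses, which Snowflake executes as a single MERGE statement.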
To deploy your sproc to Snowflake we will again use the SnowCLI tool.
snow snowpark build
snow snowpark deploy
In order to run the sproc in Snowflake we have a few options. Any sproc in Snowflake can be invoked through SQL as follows:
CALL HARMONIZED.ORDERS_UPDATE_SP();
With the SnowCLI utility you can also invoke the sproc from the terminal in Codespaces as follows:
snow snowpark execute procedure "harmonized.orders_update_sp()"
During this step we will be creating and deploying our second Snowpark Python sproc to Snowflake. This sproc will join the HARMONIZED.ORDERS
data with the Weather Source data to create a final, aggregated table for analysis named ANALYTICS.DAILY_CITY_METRICS. We will process the data incrementally from the HARMONIZED.ORDERS table using another Snowflake stream, and we will again use the Snowpark DataFrame merge() method to merge/upsert the data. To put this in context, we are on step #7 in our data flow overview:
To test the procedure locally, you will execute the steps/07_daily_city_metrics_update_sp/daily_city_metrics_update_sp/procedure.py script.
cd ..
cd 07_daily_city_metrics_update_sp
python daily_city_metrics_update_sp/procedure.py
To deploy your sproc to Snowflake we will again use the SnowCLI tool.
snow snowpark build
snow snowpark deploy
In order to run the sproc in Snowflake you have a few options. Any sproc in Snowflake can be invoked through SQL as follows:
CALL ANALYTICS.DAILY_CITY_METRICS_UPDATE_SP();
With the SnowCLI utility you can also invoke the sproc from the terminal in Codespaces as follows:
snow snowpark execute procedure "daily_city_metrics_update_sp()"
During this step we will be orchestrating our new Snowpark pipelines with Snowflake's native orchestration feature named Tasks. We will create two tasks, one for each stored procedure, and chain them together. We will then run the tasks. To put this in context, we are on step #8 in our data flow overview:
Since this is a SQL script we will be using the native Codespaces extension to execute it. Open the steps/08_orchestrate_jobs.sql
script in Codespaces and run the whole thing using the "Execute All Statements" button in the upper right corner of the editor window.
In this step we did not create a schedule for our task DAG, so it will not run on its own at this point. In this script you will notice that we manually execute the DAG, like this:
EXECUTE TASK ORDERS_UPDATE_TASK;
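The shape of a two-task chain can be sketched by generating the statements from Python. This is an illustration under assumptions, not the contents of the lab's SQL script: the child task name `DAILY_CITY_METRICS_UPDATE_TASK` and the warehouse name passed in are hypothetical.

```python
def task_dag_statements(warehouse: str) -> list:
    """Return SQL statements for a root task followed by one child task."""
    return [
        # Root task: no SCHEDULE, so it only runs when executed manually.
        "CREATE OR REPLACE TASK ORDERS_UPDATE_TASK "
        f"WAREHOUSE = {warehouse} "
        "AS CALL HARMONIZED.ORDERS_UPDATE_SP()",
        # Child task: AFTER chains it to run once the root task completes.
        "CREATE OR REPLACE TASK DAILY_CITY_METRICS_UPDATE_TASK "
        f"WAREHOUSE = {warehouse} "
        "AFTER ORDERS_UPDATE_TASK "
        "AS CALL ANALYTICS.DAILY_CITY_METRICS_UPDATE_SP()",
        # Tasks are created suspended, so resume the child before running the root.
        "ALTER TASK DAILY_CITY_METRICS_UPDATE_TASK RESUME",
        "EXECUTE TASK ORDERS_UPDATE_TASK",
    ]
```

Given an existing Snowpark `session` (assumed), the statements could be run in order with `for stmt in task_dag_statements("HOL_WH"): session.sql(stmt).collect()`, where `HOL_WH` stands in for your warehouse name.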
To see what happened when you ran this task just now, highlight and run (using CMD/CTRL+Enter) this commented query in the script:
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START=>DATEADD('DAY',-1,CURRENT_TIMESTAMP()),
RESULT_LIMIT => 100))
ORDER BY SCHEDULED_TIME DESC
;
During this step, we will be adding new data to our POS order tables, then running our entire end-to-end pipeline to process the new data. This entire pipeline will be processing data incrementally thanks to Snowflake's advanced stream/CDC capabilities. To put this in context, we are on step #9 in our data flow overview:
Since this is a SQL script we will be using our native Codespaces extension to execute it. Open the steps/09_process_incrementally.sql
script in Codespaces and run the whole thing using the "Execute All Statements" button in the upper right corner of the editor window.
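The idea behind incremental processing with a stream can be pictured with a toy model in plain Python. This is only an analogy with made-up data: a real Snowflake stream tracks changes to a table or view and advances its offset when it is consumed in a DML statement, but the effect on the pipeline is similar in that each run sees only what is new.

```python
class ToyStream:
    """Minimal model of a stream: remembers how far into a table it has read."""

    def __init__(self, table: list):
        self.table = table
        self.offset = 0

    def consume(self) -> list:
        """Return rows added since the last consume and advance the offset."""
        new_rows = self.table[self.offset:]
        self.offset = len(self.table)
        return new_rows


orders = ["order-1", "order-2"]
stream = ToyStream(orders)
first_batch = stream.consume()   # the two rows that already existed
orders.append("order-3")
second_batch = stream.consume()  # only the newly added row
third_batch = stream.consume()   # empty: nothing new since the last run
```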
Like in the previous step, to see what happened when you ran this task DAG, highlight and run (using CMD/CTRL+Enter) this commented query in the script:
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START=>DATEADD('DAY',-1,CURRENT_TIMESTAMP()),
RESULT_LIMIT => 100))
ORDER BY SCHEDULED_TIME DESC
;
During this step we will be making a change to our FAHRENHEIT_TO_CELSIUS_UDF()
UDF and then deploying it via a CI/CD pipeline. We will be updating the FAHRENHEIT_TO_CELSIUS_UDF()
UDF to use a third-party Python package, pushing it to your forked GitHub repo, and finally deploying it using the SnowCLI in a GitHub Actions workflow! To put this in context, we are on step #10 in our data flow overview:
We will be replacing our hard-coded temperature conversion with a function from the scipy package. First we will make a few changes to the steps/05_fahrenheit_to_celsius_udf/fahrenheit_to_celsius_udf/function.py script: we will add an import statement and replace the body of the main() function. Open the script in Codespaces and replace this section:
import sys

def main(temp_f: float) -> float:
    return (float(temp_f) - 32) * (5/9)
With this:
import sys
from scipy.constants import convert_temperature

def main(temp_f: float) -> float:
    return convert_temperature(float(temp_f), 'F', 'C')
The second change we need to make is to add scipy to our requirements.txt file. Open the steps/05_fahrenheit_to_celsius_udf/requirements.txt file in Codespaces, add a new line containing scipy, and save it.
To test the UDF locally, you will again execute the steps/05_fahrenheit_to_celsius_udf/fahrenheit_to_celsius_udf/function.py script.
cd ..
cd 05_fahrenheit_to_celsius_udf
pip install -r requirements.txt
python fahrenheit_to_celsius_udf/function.py 35
Notice that this time we're also running pip install to make sure that our dependent packages are installed. Once your function runs successfully we'll be ready to deploy it via CI/CD.
In order for your GitHub Actions workflow to be able to connect to your Snowflake account, you will need to store your Snowflake credentials in GitHub. This should already have been done as part of the prerequisites. If you have not yet set up the secrets, follow the steps in the prerequisites guide to complete this step.
Now that we have changes ready and tested, and our Snowflake credentials stored in GitHub, let's commit them to our local repository and then push them to your forked repository. This can certainly be done from the command line, but in this step we'll do so through Codespaces to make it easy.
Start by opening the "Source Control" extension in the left-hand nav bar; you should see two files with changes. Click the + (plus) sign at the right of each file name to stage the changes. Then enter a message in the "Message" box and click the blue Commit button to commit the changes locally.
At this point those changes are only committed locally and have not yet been pushed to your forked repository in GitHub. To do that, simply click the blue Sync Changes
button to push these commits to GitHub.
This repository is already set up with a very simple GitHub Actions CI/CD pipeline. You can review the code for the workflow by opening the .github/workflows/build_and_deploy.yaml
file in Codespaces.
Now that we've created a data pipeline to combine POS data with weather data, let's create a simple Streamlit application to visualize the data and see if we can identify any correlation that warrants further investigation by the Analytics or Data Science team.
To launch the application, we will use the streamlit run command:
cd ..
streamlit run 11_create_streamlit_app.py --server.enableCORS false --server.enableXsrfProtection false
Note: All of the packages for this application have already been installed in the Conda environment via conda_env.yml. Additionally, the --server arguments in the streamlit run command are specific to launching the Streamlit app on GitHub's compute. If you are running Streamlit locally, they are not necessary.
To stop the Streamlit app, press Ctrl+C in the terminal.
Now that you’re finished with the lab, you can simply run the steps/12_teardown.sql
script to tear down the Snowflake environment. Open the steps/12_teardown.sql
script in Codespaces and run the whole thing using the "Execute All Statements" button in the upper right corner of the editor window.
In this lab we have built a robust data engineering pipeline using Snowpark Python stored procedures. This pipeline processes data incrementally, is orchestrated with Snowflake tasks, and is deployed via a CI/CD pipeline. You also learned how to use Snowflake's new developer CLI tool and Visual Studio Code/Codespaces extension! This lab has only just touched on what's possible with Snowpark. Hopefully, you now have the building blocks and examples that you need to get started building your own data engineering pipeline with Snowpark Python. So, what will you build next?
If you want to go through this lab again in a self-paced format, visit Data Engineering Pipelines with Snowpark Python. This course is hosted by phData using the content from Snowflake's Data Engineering Pipelines with Snowpark Python quickstart, with customized content from the BUILD.Local Snowpark series you attended.