Updating to UC and Streaming Tables #1

Open · wants to merge 10 commits into base: main
50 changes: 25 additions & 25 deletions 01-ingest-autoloader/01-data-ingestion.py
@@ -1,19 +1,19 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # 1.1: Load the raw data using Databricks Autoloader
# MAGIC
# MAGIC
# MAGIC Our first step is to extract messages from the external system into our Lakehouse.
# MAGIC
# MAGIC
# MAGIC This is typically done by consuming a message queue (Kafka) or by incrementally picking up files uploaded to blob storage.
# MAGIC
# MAGIC
# MAGIC We want to be able to ingest the new data so that our dbt pipeline can do the remaining steps.
# MAGIC
# MAGIC
# MAGIC In this example, we'll consume files from blob storage. However, we could just as easily consume from any other system such as a Kafka queue.
# MAGIC
# MAGIC
# MAGIC We'll be using Databricks Autoloader (`cloudFiles` format) to incrementally load new data and append it to our raw tables. Re-running this job will only consume new data, handling all schema inference, evolution and scalability for us.
# MAGIC
# MAGIC
# MAGIC For more details on Autoloader, run `dbdemos.install('auto-loader')`
# MAGIC
# MAGIC
# MAGIC <img width="1px" src="https://www.google-analytics.com/collect?v=1&gtm=GTM-NKQ8TT7&tid=UA-163989034-1&cid=555&aip=1&t=event&ec=field_demos&ea=display&dp=%2F42_field_demos%2Ffeatures%2Fdbt%2Fnotebook_01&dt=FEATURE_DBT" />

# COMMAND ----------
@@ -23,23 +23,23 @@
# COMMAND ----------

# DBTITLE 1,Incrementally ingest all folders
def incrementally_ingest_folder(path, format, table):
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", format)
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaLocation", f"/dbdemos/dbt-retail/_schemas/{table}")
.load(path)
.writeStream
.format("delta")
.option("checkpointLocation", f"/dbdemos/dbt-retail/_checkpoints/{table}")
.trigger(availableNow = True)
.outputMode("append")
.toTable(table))
# def incrementally_ingest_folder(path, format, table):
# (spark.readStream
# .format("cloudFiles")
# .option("cloudFiles.format", format)
# .option("cloudFiles.inferColumnTypes", "true")
# .option("cloudFiles.schemaLocation", f"/dbdemos/dbt-retail/_schemas/{table}")
# .load(path)
# .writeStream
# .format("delta")
# .option("checkpointLocation", f"/dbdemos/dbt-retail/_checkpoints/{table}")
# .trigger(availableNow = True)
# .outputMode("append")
# .toTable(table))

spark.sql('create database if not exists dbdemos;')
incrementally_ingest_folder('/dbdemos/dbt-retail/users', 'json', 'dbdemos.dbt_c360_bronze_users')
incrementally_ingest_folder('/dbdemos/dbt-retail/orders', 'json', 'dbdemos.dbt_c360_bronze_orders')
incrementally_ingest_folder('/dbdemos/dbt-retail/events', 'csv', 'dbdemos.dbt_c360_bronze_events')
# spark.sql('create database if not exists dbdemos;')
# incrementally_ingest_folder('/dbdemos/dbt-retail/users', 'json', 'dbdemos.dbt_c360_bronze_users')
# incrementally_ingest_folder('/dbdemos/dbt-retail/orders', 'json', 'dbdemos.dbt_c360_bronze_orders')
# incrementally_ingest_folder('/dbdemos/dbt-retail/events', 'csv', 'dbdemos.dbt_c360_bronze_events')

print('Congrats, our new data has been consumed and incrementally added to our bronze tables')
# print('Congrats, our new data has been consumed and incrementally added to our bronze tables')
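As a side note: if you did want to keep the notebook-based Auto Loader ingestion and point it at the new Unity Catalog Volumes instead of DBFS, a minimal sketch could look like the code below. This is not part of this PR; the `_checkpoints` volume and the fully qualified bronze table name are assumptions layered on top of the paths created in `_resources/00-setup.py`.

```python
# Sketch only -- not part of this PR. Assumes the UC volumes from _resources/00-setup.py,
# plus a hypothetical `_checkpoints` volume for Auto Loader schema/checkpoint state.
def incrementally_ingest_volume(path, file_format, table):
    (spark.readStream
      .format("cloudFiles")                                   # Databricks Auto Loader
      .option("cloudFiles.format", file_format)
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.schemaLocation", f"/Volumes/dbdemos/dbt-retail/_checkpoints/schemas/{table}")
      .load(path)
      .writeStream
      .option("checkpointLocation", f"/Volumes/dbdemos/dbt-retail/_checkpoints/state/{table}")
      .trigger(availableNow=True)                             # process all new files, then stop
      .outputMode("append")
      .toTable(table))

# Example call against one of the volumes created in 00-setup.py:
# incrementally_ingest_volume('/Volumes/dbdemos/dbt-retail/users', 'json',
#                             'dbdemos.`dbt-retail`.dbt_c360_bronze_users')
```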
19 changes: 15 additions & 4 deletions 01-ingest-autoloader/_resources/00-setup.py
@@ -3,9 +3,20 @@

# COMMAND ----------

# MAGIC %sql
# MAGIC
# MAGIC CREATE CATALOG IF NOT EXISTS `dbdemos`;
# MAGIC CREATE SCHEMA IF NOT EXISTS `dbdemos`.`dbt-retail`;
# MAGIC
# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`orders`;
# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`users`;
# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`events`

# COMMAND ----------

reset_all_data = dbutils.widgets.get("reset_all_data") == "true"
raw_data_location = "dbdemos/dbt-retail"
folder = "/dbdemos/dbt-retail"
raw_data_location = "Volumes/dbdemos/dbt-retail"
folder = "/Volumes/dbdemos/dbt-retail"

#Return true if the folder is empty or does not exist
def is_folder_empty(folder):
@@ -16,11 +27,11 @@ def is_folder_empty(folder):

if reset_all_data or is_folder_empty(folder+"/orders") or is_folder_empty(folder+"/users") or is_folder_empty(folder+"/events"):
#data generation on another notebook to avoid installing libraries (takes a few seconds to setup pip env)
print(f"Generating data under {folder} , please wait a few sec...")
print(f"Generating data under {folder} , please wait a few secs...")
path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
parent_count = path[path.rfind("01-ingest-autoloader"):].count('/') - 1
prefix = "./" if parent_count == 0 else parent_count*"../"
prefix = f'{prefix}_resources/'
dbutils.notebook.run(prefix+"01-load-data", 600)
else:
print("data already existing. Run with reset_all_data=true to force a data cleanup for your local demo.")
print("Data already exists. Run with reset_all_data=true to force a data cleanup for your local demo.")
2 changes: 1 addition & 1 deletion 01-ingest-autoloader/_resources/01-load-data.py
@@ -21,7 +21,7 @@ def cleanup_folder(path):

# COMMAND ----------

folder = "/dbdemos/dbt-retail"
folder = "/Volumes/dbdemos/dbt-retail"
if reset_all_data:
print("resetting all data...")
if folder.count('/') > 2:
4 changes: 2 additions & 2 deletions README.md
@@ -3,7 +3,7 @@

This content demos how Databricks can run dbt pipelines, integrated with Databricks Workflows.

This demo replicate the Delta Live Table (DLT) pipeline in the lakehouse c360 databricks demo available in `dbdemos.install('lakehouse-retail-c360')`
This demo replicates the Delta Live Tables (DLT) pipeline in the lakehouse c360 Databricks demo available in `dbdemos.install('lakehouse-retail-c360')`

## Running dbt on Databricks

@@ -87,7 +87,7 @@ This demo is broken up into the following building blocks. View the sub-folders
### Feedback
---
Got comments and feedback? <br/>
Feel free to reach out to ```mendelsohn.chan@databricks.com``` or ```quentin.ambard.databricks.com```
Feel free to reach out to ```shabbir.khanbhai@databricks.com``` or ```quentin.ambard@databricks.com```



8 changes: 8 additions & 0 deletions models/dbt_c360_bronze_events.sql
@@ -0,0 +1,8 @@
{{
config(
materialized='streaming_table'
)
}}
select
*
from stream read_files('/Volumes/dbdemos/dbt-retail/events', format=>'csv')
8 changes: 8 additions & 0 deletions models/dbt_c360_bronze_orders.sql
@@ -0,0 +1,8 @@
{{
config(
materialized='streaming_table'
)
}}
select
*
from stream read_files('/Volumes/dbdemos/dbt-retail/orders', format=>'json')
8 changes: 8 additions & 0 deletions models/dbt_c360_bronze_users.sql
@@ -0,0 +1,8 @@
{{
config(
materialized='streaming_table'
)
}}
select
*
from stream read_files('/Volumes/dbdemos/dbt-retail/users', format=>'json')
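Each of these bronze models uses the `streaming_table` materialization from the dbt-databricks adapter, so a `dbt run` only processes files that have landed in the volume since the previous refresh. A quick post-run sanity check might look like the sketch below; it assumes the `dbdemos` catalog and `dbt-retail` schema from `profiles.yml`, and that dbt keeps the model names as table names (the default behaviour).

```python
# Sketch: check row counts of the bronze streaming tables after `dbt run`.
# Catalog/schema names are assumed from profiles.yml in this PR.
for t in ["dbt_c360_bronze_users", "dbt_c360_bronze_orders", "dbt_c360_bronze_events"]:
    n = spark.table(f"dbdemos.`dbt-retail`.{t}").count()
    print(f"{t}: {n} rows")
```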
2 changes: 1 addition & 1 deletion models/dbt_c360_silver_events.sql
@@ -10,4 +10,4 @@ select
platform,
action,
url
from dbdemos.dbt_c360_bronze_events
from {{ ref('dbt_c360_bronze_events') }}
2 changes: 1 addition & 1 deletion models/dbt_c360_silver_orders.sql
@@ -9,4 +9,4 @@ select
user_id,
cast(item_count as int),
to_timestamp(transaction_date, "MM-dd-yyyy HH:mm:ss") as creation_date
from dbdemos.dbt_c360_bronze_orders
from {{ ref('dbt_c360_bronze_orders') }}
2 changes: 1 addition & 1 deletion models/dbt_c360_silver_users.sql
@@ -16,4 +16,4 @@ select
cast(gender as int),
cast(age_group as int),
cast(churn as int) as churn
from dbdemos.dbt_c360_bronze_users
from {{ ref('dbt_c360_bronze_users') }}
6 changes: 3 additions & 3 deletions profiles.yml
@@ -1,14 +1,14 @@
dbdemos_dbt_c360:
target: local
outputs:
#run DBT locally from your IDEA and execute on a SQL warehouse (https://docs.getdbt.com/reference/warehouse-setups/databricks-setup)
#run DBT locally from your IDE and execute on a SQL warehouse (https://docs.getdbt.com/reference/warehouse-setups/databricks-setup)
#Make sure you have run `pip install dbt-databricks` in your local env
#Run the project locally with:
#DBT_DATABRICKS_HOST=xxx.cloud.databricks.com DBT_DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/xxxx DBT_DATABRICKS_TOKEN=dapixxxx dbt run
local:
type: databricks
catalog: hive_metastore
schema: dbdemos
catalog: dbdemos
schema: dbt-retail
host: "{{ env_var('DBT_DATABRICKS_HOST') }}"
http_path: "{{ env_var('DBT_DATABRICKS_HTTP_PATH') }}" #SQL warehouse Connection details
token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" # Personal Access Token (PAT)