diff --git a/01-ingest-autoloader/01-data-ingestion.py b/01-ingest-autoloader/01-data-ingestion.py
index a6eccad..28dc97e 100644
--- a/01-ingest-autoloader/01-data-ingestion.py
+++ b/01-ingest-autoloader/01-data-ingestion.py
@@ -1,19 +1,19 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # 1.1: Load the raw data using Databricks Autoloader
-# MAGIC
+# MAGIC
# MAGIC Our first step is to extract messages from an external system into our Lakehouse.
-# MAGIC
+# MAGIC
# MAGIC This is typically done by consuming a message queue (Kafka) or by incrementally ingesting files uploaded to blob storage.
-# MAGIC
+# MAGIC
# MAGIC We want to be able to ingest the new data so that our dbt pipeline can do the remaining steps.
-# MAGIC
+# MAGIC
# MAGIC In this example, we'll consume files from blob storage. However, we could just as easily consume from any other system, such as a Kafka queue.
-# MAGIC
+# MAGIC
# MAGIC We'll be using Databricks Autoloader (`cloudFiles` format) to incrementally load new data and append it to our raw tables. Re-running this job will only consume new data, with schema inference, schema evolution and scalability handled for us.
-# MAGIC
+# MAGIC
# MAGIC For more details on Autoloader, run `dbdemos.install('auto-loader')`
-# MAGIC
+# MAGIC
# MAGIC
# COMMAND ----------
@@ -23,23 +23,23 @@
# COMMAND ----------
# DBTITLE 1,Incrementally ingest all folders
-def incrementally_ingest_folder(path, format, table):
- (spark.readStream
- .format("cloudFiles")
- .option("cloudFiles.format", format)
- .option("cloudFiles.inferColumnTypes", "true")
- .option("cloudFiles.schemaLocation", f"/dbdemos/dbt-retail/_schemas/{table}")
- .load(path)
- .writeStream
- .format("delta")
- .option("checkpointLocation", f"/dbdemos/dbt-retail/_checkpoints/{table}")
- .trigger(availableNow = True)
- .outputMode("append")
- .toTable(table))
+# def incrementally_ingest_folder(path, format, table):
+# (spark.readStream
+# .format("cloudFiles")
+# .option("cloudFiles.format", format)
+# .option("cloudFiles.inferColumnTypes", "true")
+# .option("cloudFiles.schemaLocation", f"/dbdemos/dbt-retail/_schemas/{table}")
+# .load(path)
+# .writeStream
+# .format("delta")
+# .option("checkpointLocation", f"/dbdemos/dbt-retail/_checkpoints/{table}")
+# .trigger(availableNow = True)
+# .outputMode("append")
+# .toTable(table))
-spark.sql('create database if not exists dbdemos;')
-incrementally_ingest_folder('/dbdemos/dbt-retail/users', 'json', 'dbdemos.dbt_c360_bronze_users')
-incrementally_ingest_folder('/dbdemos/dbt-retail/orders', 'json', 'dbdemos.dbt_c360_bronze_orders')
-incrementally_ingest_folder('/dbdemos/dbt-retail/events', 'csv', 'dbdemos.dbt_c360_bronze_events')
+# spark.sql('create database if not exists dbdemos;')
+# incrementally_ingest_folder('/dbdemos/dbt-retail/users', 'json', 'dbdemos.dbt_c360_bronze_users')
+# incrementally_ingest_folder('/dbdemos/dbt-retail/orders', 'json', 'dbdemos.dbt_c360_bronze_orders')
+# incrementally_ingest_folder('/dbdemos/dbt-retail/events', 'csv', 'dbdemos.dbt_c360_bronze_events')
-print('Congrats, our new data has been consumed and incrementally added to our bronze tables')
+# print('Congrats, our new data has been consumed and incrementally added to our bronze tables')
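For readers comparing the two approaches: below is a minimal, illustrative sketch of the same Autoloader pattern re-pointed at the Unity Catalog Volume paths this PR introduces. It is not part of the change (ingestion now happens in the dbt streaming-table models), and the `_schemas`/`_checkpoints` locations are assumptions for the sketch, not paths defined elsewhere in the repo.

```python
# Illustrative sketch only: the notebook's Autoloader pattern re-pointed at the
# Unity Catalog Volume paths introduced in this PR. The _schemas/_checkpoints
# sub-paths are assumptions for this sketch, not locations defined in the repo.
# Assumes a Databricks notebook context where `spark` is already available.
def incrementally_ingest_volume(path, file_format, table):
    (spark.readStream
        .format("cloudFiles")                                   # Autoloader
        .option("cloudFiles.format", file_format)               # 'json' or 'csv'
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaLocation", f"/Volumes/dbdemos/dbt-retail/_schemas/{table}")
        .load(path)
        .writeStream
        .format("delta")
        .option("checkpointLocation", f"/Volumes/dbdemos/dbt-retail/_checkpoints/{table}")
        .trigger(availableNow=True)                             # consume what's new, then stop
        .outputMode("append")
        .toTable(table))

# Example call (hypothetical, mirroring the commented-out notebook code above):
# incrementally_ingest_volume('/Volumes/dbdemos/dbt-retail/users', 'json',
#                             'dbdemos.dbt_c360_bronze_users')
```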
diff --git a/01-ingest-autoloader/_resources/00-setup.py b/01-ingest-autoloader/_resources/00-setup.py
index 8d309d5..8d196ec 100644
--- a/01-ingest-autoloader/_resources/00-setup.py
+++ b/01-ingest-autoloader/_resources/00-setup.py
@@ -3,9 +3,20 @@
# COMMAND ----------
+# MAGIC %sql
+# MAGIC
+# MAGIC CREATE CATALOG IF NOT EXISTS `dbdemos`;
+# MAGIC CREATE SCHEMA IF NOT EXISTS `dbdemos`.`dbt-retail`;
+# MAGIC
+# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`orders`;
+# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`users`;
+# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`events`
+
+# COMMAND ----------
+
reset_all_data = dbutils.widgets.get("reset_all_data") == "true"
-raw_data_location = "dbdemos/dbt-retail"
-folder = "/dbdemos/dbt-retail"
+raw_data_location = "Volumes/dbdemos/dbt-retail"
+folder = "/Volumes/dbdemos/dbt-retail"
#Return true if the folder is empty or does not exist
def is_folder_empty(folder):
@@ -16,11 +27,11 @@ def is_folder_empty(folder):
if reset_all_data or is_folder_empty(folder+"/orders") or is_folder_empty(folder+"/users") or is_folder_empty(folder+"/events"):
    #data generation is done in another notebook to avoid installing libraries here (takes a few seconds to set up the pip env)
- print(f"Generating data under {folder} , please wait a few sec...")
+ print(f"Generating data under {folder} , please wait a few secs...")
path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
parent_count = path[path.rfind("01-ingest-autoloader"):].count('/') - 1
prefix = "./" if parent_count == 0 else parent_count*"../"
prefix = f'{prefix}_resources/'
dbutils.notebook.run(prefix+"01-load-data", 600)
else:
- print("data already existing. Run with reset_all_data=true to force a data cleanup for your local demo.")
+ print("Data already exists. Run with reset_all_data=true to force a data cleanup for your local demo.")
diff --git a/01-ingest-autoloader/_resources/01-load-data.py b/01-ingest-autoloader/_resources/01-load-data.py
index 6b5b07b..b5da66e 100644
--- a/01-ingest-autoloader/_resources/01-load-data.py
+++ b/01-ingest-autoloader/_resources/01-load-data.py
@@ -21,7 +21,7 @@ def cleanup_folder(path):
# COMMAND ----------
-folder = "/dbdemos/dbt-retail"
+folder = "/Volumes/dbdemos/dbt-retail"
if reset_all_data:
print("resetting all data...")
if folder.count('/') > 2:
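The `folder.count('/') > 2` guard protects against wiping a short, root-level path. The actual cleanup body sits outside this hunk, so the snippet below is only an assumed illustration of a guarded reset against the new Volume layout, not the repo's `cleanup_folder` implementation.

```python
# Assumed illustration of a guarded reset against the Volume layout; the repo's
# real cleanup code lives outside this hunk. `dbutils` comes from the Databricks
# notebook context.
folder = "/Volumes/dbdemos/dbt-retail"

if folder.count('/') > 2:                      # refuse to act on short/root-level paths
    for sub in ("orders", "users", "events"):
        for f in dbutils.fs.ls(f"{folder}/{sub}"):
            dbutils.fs.rm(f.path, True)        # True = recursive delete of each entry
```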
diff --git a/README.md b/README.md
index 1e0697c..2456a5c 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
This content demonstrates how Databricks can run dbt pipelines, integrated with Databricks Workflows.
-This demo replicate the Delta Live Table (DLT) pipeline in the lakehouse c360 databricks demo available in `dbdemos.install('lakehouse-retail-c360')`
+This demo replicates the Delta Live Tables (DLT) pipeline from the lakehouse c360 Databricks demo, available in `dbdemos.install('lakehouse-retail-c360')`
## Running dbt on Databricks
@@ -87,7 +87,7 @@ This demo is broken up into the following building blocks. View the sub-folders
### Feedback
---
Got comments and feedback?
-Feel free to reach out to ```mendelsohn.chan@databricks.com``` or ```quentin.ambard.databricks.com```
+Feel free to reach out to ```shabbir.khanbhai@databricks.com``` or ```quentin.ambard@databricks.com```
diff --git a/models/dbt_c360_bronze_events.sql b/models/dbt_c360_bronze_events.sql
new file mode 100644
index 0000000..85baa33
--- /dev/null
+++ b/models/dbt_c360_bronze_events.sql
@@ -0,0 +1,8 @@
+{{
+ config(
+ materialized='streaming_table'
+ )
+}}
+select
+ *
+from stream read_files('/Volumes/dbdemos/dbt-retail/events', format=>'csv')
\ No newline at end of file
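This model replaces the notebook's Autoloader ingestion of events with a dbt streaming table over `read_files`. If you want to eyeball the CSV schema before dbt materializes the table, a small batch-read sketch follows; the header/infer options are assumptions about the generated files, not settings from this model.

```python
# Sketch: preview the event CSV files before dbt materializes the streaming table.
# Plain batch read in a Databricks notebook (`spark` available); the header/infer
# options are assumptions about the generated files, not settings from this model.
events_preview = (spark.read
                    .format("csv")
                    .option("header", "true")        # assumption: files carry a header row
                    .option("inferSchema", "true")
                    .load("/Volumes/dbdemos/dbt-retail/events"))

events_preview.printSchema()
events_preview.show(10, truncate=False)
```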
diff --git a/models/dbt_c360_bronze_orders.sql b/models/dbt_c360_bronze_orders.sql
new file mode 100644
index 0000000..4318818
--- /dev/null
+++ b/models/dbt_c360_bronze_orders.sql
@@ -0,0 +1,8 @@
+{{
+ config(
+ materialized='streaming_table'
+ )
+}}
+select
+ *
+from stream read_files('/Volumes/dbdemos/dbt-retail/orders', format=>'json')
diff --git a/models/dbt_c360_bronze_users.sql b/models/dbt_c360_bronze_users.sql
new file mode 100644
index 0000000..a85be81
--- /dev/null
+++ b/models/dbt_c360_bronze_users.sql
@@ -0,0 +1,8 @@
+{{
+ config(
+ materialized='streaming_table'
+ )
+}}
+select
+ *
+from stream read_files('/Volumes/dbdemos/dbt-retail/users', format=>'json')
\ No newline at end of file
diff --git a/models/dbt_c360_silver_events.sql b/models/dbt_c360_silver_events.sql
index 6fc4b86..eb0f110 100644
--- a/models/dbt_c360_silver_events.sql
+++ b/models/dbt_c360_silver_events.sql
@@ -10,4 +10,4 @@ select
platform,
action,
url
-from dbdemos.dbt_c360_bronze_events
\ No newline at end of file
+from {{ ref('dbt_c360_bronze_events') }}
\ No newline at end of file
diff --git a/models/dbt_c360_silver_orders.sql b/models/dbt_c360_silver_orders.sql
index 6af2716..98ba569 100644
--- a/models/dbt_c360_silver_orders.sql
+++ b/models/dbt_c360_silver_orders.sql
@@ -9,4 +9,4 @@ select
user_id,
cast(item_count as int),
to_timestamp(transaction_date, "MM-dd-yyyy HH:mm:ss") as creation_date
-from dbdemos.dbt_c360_bronze_orders
\ No newline at end of file
+from {{ ref('dbt_c360_bronze_orders') }}
\ No newline at end of file
diff --git a/models/dbt_c360_silver_users.sql b/models/dbt_c360_silver_users.sql
index f01d12b..f5cbde3 100644
--- a/models/dbt_c360_silver_users.sql
+++ b/models/dbt_c360_silver_users.sql
@@ -16,4 +16,4 @@ select
cast(gender as int),
cast(age_group as int),
cast(churn as int) as churn
-from dbdemos.dbt_c360_bronze_users
\ No newline at end of file
+from {{ ref('dbt_c360_bronze_users') }}
\ No newline at end of file
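With the hard-coded `dbdemos.*` table names replaced by `{{ ref(...) }}`, dbt now knows the bronze-to-silver dependencies. A hedged sketch of checking that lineage from Python via the dbt CLI (assuming `dbt-databricks` is installed and the profile env vars from `profiles.yml` are set):

```python
# Sketch: list each silver model together with its upstream bronze dependency to
# confirm the ref()-based lineage. Assumes dbt-databricks is installed and the
# DBT_DATABRICKS_* environment variables referenced in profiles.yml are set.
import subprocess

subprocess.run(
    ["dbt", "ls", "--select",
     "+dbt_c360_silver_users", "+dbt_c360_silver_orders", "+dbt_c360_silver_events"],
    check=True,
)
```

The `+` graph operator should list each bronze model as an ancestor of its silver counterpart.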
diff --git a/profiles.yml b/profiles.yml
index 3fcf14c..061c5f9 100644
--- a/profiles.yml
+++ b/profiles.yml
@@ -1,14 +1,14 @@
dbdemos_dbt_c360:
target: local
outputs:
- #run DBT locally from your IDEA and execute on a SQL warehouse (https://docs.getdbt.com/reference/warehouse-setups/databricks-setup)
+ #run DBT locally from your IDE and execute on a SQL warehouse (https://docs.getdbt.com/reference/warehouse-setups/databricks-setup)
    #Make sure you have run pip install dbt-databricks in your local env
#Run the project locally with:
#DBT_DATABRICKS_HOST=xxx.cloud.databricks.com DBT_DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/xxxx DBT_DATABRICKS_TOKEN=dapixxxx dbt run
local:
type: databricks
- catalog: hive_metastore
- schema: dbdemos
+ catalog: dbdemos
+ schema: dbt-retail
host: "{{ env_var('DBT_DATABRICKS_HOST') }}"
http_path: "{{ env_var('DBT_DATABRICKS_HTTP_PATH') }}" #SQL warehouse Connection details
token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" # Personal Access Token (PAT)