diff --git a/01-ingest-autoloader/01-data-ingestion.py b/01-ingest-autoloader/01-data-ingestion.py
index a6eccad..28dc97e 100644
--- a/01-ingest-autoloader/01-data-ingestion.py
+++ b/01-ingest-autoloader/01-data-ingestion.py
@@ -1,19 +1,19 @@
 # Databricks notebook source
 # MAGIC %md
 # MAGIC # 1.1: Load the raw data using Databricks Autoloader
-# MAGIC 
+# MAGIC
 # MAGIC Our first step is to extract messages from external system into our Lakehouse.
-# MAGIC 
+# MAGIC
 # MAGIC This is typically done consuming a message queue (kafka), or files being uploaded in a blob storage in an incremental fashion.
-# MAGIC 
+# MAGIC
 # MAGIC We want to be able to ingest the new data so that our dbt pipeline can do the remaining steps.
-# MAGIC 
+# MAGIC
 # MAGIC In this example, we'll consume files from a blob storage. However we could easily have consume from any other system like a kafka queue.
-# MAGIC 
+# MAGIC
 # MAGIC We'll be using Databricks Autoloader (`cloudFile` format) to incrementally load new data and append them to our raw tables. Re-running this job will only consume new data, handling all schema inference, evolution and scalability for us.
-# MAGIC 
+# MAGIC
 # MAGIC For more details on Autoloader, install `dbdemos.install('auto-loader')`
-# MAGIC 
+# MAGIC
 # MAGIC
 
 # COMMAND ----------
@@ -23,23 +23,23 @@
 # COMMAND ----------
 
 # DBTITLE 1,Incrementally ingest all folders
-def incrementally_ingest_folder(path, format, table):
-  (spark.readStream
-      .format("cloudFiles")
-      .option("cloudFiles.format", format)
-      .option("cloudFiles.inferColumnTypes", "true")
-      .option("cloudFiles.schemaLocation", f"/dbdemos/dbt-retail/_schemas/{table}")
-      .load(path)
-      .writeStream
-      .format("delta")
-      .option("checkpointLocation", f"/dbdemos/dbt-retail/_checkpoints/{table}")
-      .trigger(availableNow = True)
-      .outputMode("append")
-      .toTable(table))
+# def incrementally_ingest_folder(path, format, table):
+#   (spark.readStream
+#       .format("cloudFiles")
+#       .option("cloudFiles.format", format)
+#       .option("cloudFiles.inferColumnTypes", "true")
+#       .option("cloudFiles.schemaLocation", f"/dbdemos/dbt-retail/_schemas/{table}")
+#       .load(path)
+#       .writeStream
+#       .format("delta")
+#       .option("checkpointLocation", f"/dbdemos/dbt-retail/_checkpoints/{table}")
+#       .trigger(availableNow = True)
+#       .outputMode("append")
+#       .toTable(table))
 
-spark.sql('create database if not exists dbdemos;')
-incrementally_ingest_folder('/dbdemos/dbt-retail/users', 'json', 'dbdemos.dbt_c360_bronze_users')
-incrementally_ingest_folder('/dbdemos/dbt-retail/orders', 'json', 'dbdemos.dbt_c360_bronze_orders')
-incrementally_ingest_folder('/dbdemos/dbt-retail/events', 'csv', 'dbdemos.dbt_c360_bronze_events')
+# spark.sql('create database if not exists dbdemos;')
+# incrementally_ingest_folder('/dbdemos/dbt-retail/users', 'json', 'dbdemos.dbt_c360_bronze_users')
+# incrementally_ingest_folder('/dbdemos/dbt-retail/orders', 'json', 'dbdemos.dbt_c360_bronze_orders')
+# incrementally_ingest_folder('/dbdemos/dbt-retail/events', 'csv', 'dbdemos.dbt_c360_bronze_events')
 
-print('Congrats, our new data has been consumed and incrementally added to our bronze tables')
+# print('Congrats, our new data has been consumed and incrementally added to our bronze tables')
diff --git a/01-ingest-autoloader/_resources/00-setup.py b/01-ingest-autoloader/_resources/00-setup.py
index 8d309d5..8d196ec 100644
--- a/01-ingest-autoloader/_resources/00-setup.py
+++ b/01-ingest-autoloader/_resources/00-setup.py
@@ -3,9 +3,20 @@
 
 # COMMAND ----------
 
+# MAGIC %sql
+# MAGIC
+# MAGIC CREATE CATALOG IF NOT EXISTS `dbdemos`;
+# MAGIC CREATE SCHEMA IF NOT EXISTS `dbdemos`.`dbt-retail`;
+# MAGIC
+# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`orders`;
+# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`users`;
+# MAGIC CREATE VOLUME IF NOT EXISTS `dbdemos`.`dbt-retail`.`events`
+
+# COMMAND ----------
+
 reset_all_data = dbutils.widgets.get("reset_all_data") == "true"
-raw_data_location = "dbdemos/dbt-retail"
-folder = "/dbdemos/dbt-retail"
+raw_data_location = "Volumes/dbdemos/dbt-retail"
+folder = "/Volumes/dbdemos/dbt-retail"
 
 #Return true if the folder is empty or does not exists
 def is_folder_empty(folder):
@@ -16,11 +27,11 @@ def is_folder_empty(folder):
 
 if reset_all_data or is_folder_empty(folder+"/orders") or is_folder_empty(folder+"/users") or is_folder_empty(folder+"/events"):
   #data generation on another notebook to avoid installing libraries (takes a few seconds to setup pip env)
-  print(f"Generating data under {folder} , please wait a few sec...")
+  print(f"Generating data under {folder} , please wait a few secs...")
   path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
   parent_count = path[path.rfind("01-ingest-autoloader"):].count('/') - 1
   prefix = "./" if parent_count == 0 else parent_count*"../"
   prefix = f'{prefix}_resources/'
   dbutils.notebook.run(prefix+"01-load-data", 600)
 else:
-  print("data already existing. Run with reset_all_data=true to force a data cleanup for your local demo.")
+  print("Data already exists. Run with reset_all_data=true to force a data cleanup for your local demo.")
diff --git a/01-ingest-autoloader/_resources/01-load-data.py b/01-ingest-autoloader/_resources/01-load-data.py
index 6b5b07b..b5da66e 100644
--- a/01-ingest-autoloader/_resources/01-load-data.py
+++ b/01-ingest-autoloader/_resources/01-load-data.py
@@ -21,7 +21,7 @@ def cleanup_folder(path):
 
 # COMMAND ----------
 
-folder = "/dbdemos/dbt-retail"
+folder = "/Volumes/dbdemos/dbt-retail"
 if reset_all_data:
   print("resetting all data...")
   if folder.count('/') > 2:
diff --git a/README.md b/README.md
index 1e0697c..2456a5c 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
 
 This content demo how Databricks can run dbt pipelines, integrated with Databricks Workflow.
 
-This demo replicate the Delta Live Table (DLT) pipeline in the lakehouse c360 databricks demo available in `dbdemos.install('lakehouse-retail-c360')`
+This demo replicate the Delta Live Tables (DLT) pipeline in the lakehouse c360 databricks demo available in `dbdemos.install('lakehouse-retail-c360')`
 
 ## Running dbt on Databricks
 
@@ -87,7 +87,7 @@ This demo is broken up into the following building blocks. View the sub-folders
 
 ### Feedback
 
 ---
 
 Got comments and feedback?
-Feel free to reach out to ```mendelsohn.chan@databricks.com``` or ```quentin.ambard.databricks.com```
+Feel free to reach out to ```shabbir.khanbhai@databricks.com``` or ```quentin.ambard.databricks.com```
diff --git a/models/dbt_c360_bronze_events.sql b/models/dbt_c360_bronze_events.sql
new file mode 100644
index 0000000..85baa33
--- /dev/null
+++ b/models/dbt_c360_bronze_events.sql
@@ -0,0 +1,8 @@
+{{
+  config(
+    materialized='streaming_table'
+  )
+}}
+select
+  *
+from stream read_files('/Volumes/dbdemos/dbt-retail/events', format=>'csv')
\ No newline at end of file
diff --git a/models/dbt_c360_bronze_orders.sql b/models/dbt_c360_bronze_orders.sql
new file mode 100644
index 0000000..4318818
--- /dev/null
+++ b/models/dbt_c360_bronze_orders.sql
@@ -0,0 +1,8 @@
+{{
+  config(
+    materialized='streaming_table'
+  )
+}}
+select
+  *
+from stream read_files('/Volumes/dbdemos/dbt-retail/orders', format=>'json')
diff --git a/models/dbt_c360_bronze_users.sql b/models/dbt_c360_bronze_users.sql
new file mode 100644
index 0000000..a85be81
--- /dev/null
+++ b/models/dbt_c360_bronze_users.sql
@@ -0,0 +1,8 @@
+{{
+  config(
+    materialized='streaming_table'
+  )
+}}
+select
+  *
+from stream read_files('/Volumes/dbdemos/dbt-retail/users', format=>'json')
\ No newline at end of file
diff --git a/models/dbt_c360_silver_events.sql b/models/dbt_c360_silver_events.sql
index 6fc4b86..eb0f110 100644
--- a/models/dbt_c360_silver_events.sql
+++ b/models/dbt_c360_silver_events.sql
@@ -10,4 +10,4 @@ select
   platform,
   action,
   url
-from dbdemos.dbt_c360_bronze_events
\ No newline at end of file
+from {{ ref('dbt_c360_bronze_events') }}
\ No newline at end of file
diff --git a/models/dbt_c360_silver_orders.sql b/models/dbt_c360_silver_orders.sql
index 6af2716..98ba569 100644
--- a/models/dbt_c360_silver_orders.sql
+++ b/models/dbt_c360_silver_orders.sql
@@ -9,4 +9,4 @@ select
   user_id,
   cast(item_count as int),
   to_timestamp(transaction_date, "MM-dd-yyyy HH:mm:ss") as creation_date
-from dbdemos.dbt_c360_bronze_orders
\ No newline at end of file
+from {{ ref('dbt_c360_bronze_orders') }}
\ No newline at end of file
diff --git a/models/dbt_c360_silver_users.sql b/models/dbt_c360_silver_users.sql
index f01d12b..f5cbde3 100644
--- a/models/dbt_c360_silver_users.sql
+++ b/models/dbt_c360_silver_users.sql
@@ -16,4 +16,4 @@ select
   cast(gender as int),
   cast(age_group as int),
   cast(churn as int) as churn
-from dbdemos.dbt_c360_bronze_users
\ No newline at end of file
+from {{ ref('dbt_c360_bronze_users') }}
\ No newline at end of file
diff --git a/profiles.yml b/profiles.yml
index 3fcf14c..061c5f9 100644
--- a/profiles.yml
+++ b/profiles.yml
@@ -1,14 +1,14 @@
 dbdemos_dbt_c360:
   target: local
   outputs:
-    #run DBT locally from your IDEA and execute on a SQL warehouse (https://docs.getdbt.com/reference/warehouse-setups/databricks-setup)
+    #run DBT locally from your IDE and execute on a SQL warehouse (https://docs.getdbt.com/reference/warehouse-setups/databricks-setup)
     #Make sure you have pip install dbt-databricks in your local env
     #Run the project locally with:
     #DBT_DATABRICKS_HOST=xxx.cloud.databricks.com DBT_DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/xxxx DBT_DATABRICKS_TOKEN=dapixxxx dbt run
     local:
       type: databricks
-      catalog: hive_metastore
-      schema: dbdemos
+      catalog: dbdemos
+      schema: dbt-retail
       host: "{{ env_var('DBT_DATABRICKS_HOST') }}"
       http_path: "{{ env_var('DBT_DATABRICKS_HTTP_PATH') }}" #SQL warehouse Connection details
       token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" # Personal Access Token (PAT)
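The three new bronze models above replace the commented-out Autoloader notebook: each is materialized as a dbt `streaming_table` and reads incrementally from a Unity Catalog volume via `read_files`. As a rough sketch only, assuming the dbt-databricks adapter and the `dbdemos` / `dbt-retail` target from profiles.yml (the exact DDL dbt generates may differ), a model such as dbt_c360_bronze_users corresponds to Databricks SQL along these lines:

-- Illustrative approximation of what the dbt `streaming_table` materialization produces;
-- the catalog and schema come from profiles.yml (catalog: dbdemos, schema: dbt-retail).
CREATE OR REFRESH STREAMING TABLE `dbdemos`.`dbt-retail`.`dbt_c360_bronze_users`
AS SELECT *
FROM STREAM read_files('/Volumes/dbdemos/dbt-retail/users', format => 'json');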
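The silver models now reference the bronze tables through `{{ ref(...) }}` rather than hard-coded `dbdemos.*` names, so they resolve against whatever catalog and schema the active profile targets. With the updated profiles.yml (catalog: dbdemos, schema: dbt-retail), a quick post-run sanity check could look like the hypothetical query below, assuming `dbt run` has completed against that target:

-- Hypothetical check that the silver orders model landed in the new catalog and schema.
SELECT count(*) AS row_count
FROM `dbdemos`.`dbt-retail`.`dbt_c360_silver_orders`;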