Add data contextualization using sql graph to single_tech_samples #619

Merged · 9 commits · Nov 12, 2024
1 change: 1 addition & 0 deletions archive/README.md
@@ -18,6 +18,7 @@ Here is a list of the archived samples:
- [Azure Databricks - Basic IaC](./../archive/single_tech_samples/databricks_basic_azure_databricks_environment/README.md)
- [Azure Databricks - Data exfiltration](./../archive/single_tech_samples/databricks_enterprise_azure_databricks_environment/README.md)
- [Azure Databricks - IaC cluster provisioning](./../archive/single_tech_samples/databricks_cluster_provisioning_and_data_access/README.md)
- [Azure Databricks - Data Contextualization based on SQL graph](./../archive/single_tech_samples/databricks_data_contextualization_sql_graph/README.md)
- [Azure Data Factory - CI/CD](./../archive/single_tech_samples/adf_cicd/README.md)
- [Azure Data Share](./../archive/single_tech_samples/datashare_automated_data_sharing/README.md)
- [Microsoft Purview IaC](./../archive/single_tech_samples/purview_iac/README.md)

@@ -0,0 +1,33 @@
# Introduction

Data contextualization is the process of putting related pieces of information together so that data becomes more useful and easier to digest and interpret. This sample demonstrates how to contextualize data by looking up the relevant context from a graph model stored in Azure SQL Database.
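
At its core, the pattern is a graph lookup joined onto raw data. The sketch below is illustrative only (it assumes a running Spark session, a hypothetical `raw_alarms` DataFrame, and the node and edge tables created in the steps that follow): a SQL graph `MATCH` query is pushed down to Azure SQL Database over JDBC, and the result is joined onto incoming records.

```python
# Illustrative sketch only: jdbc_url is a placeholder connection string and
# raw_alarms is an assumed DataFrame of incoming alarm records.
jdbc_url = "jdbc:sqlserver://<Hostname>.database.windows.net:1433;database=<DatabaseName>;user=<Username>;password=<Password>;encrypt=true"

# Traverse Alarm -(belongs_to)-> Quality_System -(is_associated_with)-> Asset.
lookup_query = """(SELECT a.Alarm_Type, b.Asset_ID
    FROM Alarm a, belongs_to, Quality_System c, is_associated_with, Asset b
    WHERE MATCH (a-(belongs_to)->c-(is_associated_with)->b)) AS ctx"""

context = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", lookup_query)
    .load()
)

# Contextualize: attach the mapped asset to each raw alarm record.
contextualized = raw_alarms.join(
    context, raw_alarms.alarm_type == context.Alarm_Type, "left"
)
```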

## Getting Started

Take the following steps to run the sample solution:

1. If you don't have an Azure account, create one for free [here](https://azure.microsoft.com/en-us/free/).
2. Create an [Azure Databricks workspace](https://learn.microsoft.com/en-us/azure/databricks/getting-started/).
3. Create a single database in [Azure SQL Database](https://learn.microsoft.com/en-us/azure/azure-sql/database/single-database-create-quickstart?view=azuresql&tabs=azure-portal).
4. Create the graph tables
   - Open the Query editor pane for your database in the Azure portal.

     ![Create-SQL-graph](./images/Create-SQL-graph.png)
   - Run the [Create Graph Table script](./src/sql/create-graph.sql) in the Query editor pane.

     ![Run-SQL-graph](./images/Run-SQL-graph.png)
5. Import the notebooks into Azure Databricks
   - Select the Import button from the workspace menu.

     ![Import notebooks](./images/import_databricks.png)
   - Select the notebook sample code to upload:
     - [demo-setup.py](./src/notebooks/demo-setup.py)
     - [demo-contextualization.py](./src/notebooks/demo-contextualizaion.py)
   - Replace the database connection information with your own.

     ![Database connection settings](./images/demo-databricks-notebook-01.png)
   - Run the demo-setup notebook.
   - Run the demo-contextualization notebook.
   - If the notebooks run successfully, you will see contextualized tables in the database; a quick verification sketch follows this list.

     ![Contextualized table](./images/contextualizedTable.png)
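
To verify the result, one option (a minimal sketch, not part of the original sample; it assumes the `jdbc_url` defined in the demo-contextualization notebook) is to read the contextualized table back over the same JDBC connection:

```python
# Read the contextualized table back from Azure SQL Database.
result = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "Tbl_Alarm_Master")
    .load()
)
# context_asset holds the asset looked up from the SQL graph.
display(result.select("alarm_id", "alarm_type", "context_asset"))
```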
@@ -0,0 +1,84 @@
# Databricks notebook source
# Replace the placeholders below with your Azure SQL Database connection
# details (for real deployments, prefer a Databricks secret scope over
# plain-text credentials).
jdbcUsername = "<Username>"
jdbcPassword = "<Password>"
jdbcHostname = "<Hostname>.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "<DatabaseName>"

jdbc_url = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase};user={jdbcUsername};password={jdbcPassword};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;"

# Graph lookup pushed down to Azure SQL Database: map each alarm type to an
# asset by traversing Alarm -(belongs_to)-> Quality_System -(is_associated_with)-> Asset.
pushdown_query = """(SELECT a.Alarm_Type, b.Asset_ID
    FROM Alarm a, belongs_to, Asset b, is_associated_with, Quality_System c
    WHERE MATCH (a-(belongs_to)->c-(is_associated_with)->b)) AS new_tbl"""

ala2ass = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", pushdown_query) \
    .load()


# COMMAND ----------

display(ala2ass)

# COMMAND ----------

from pyspark.sql.functions import current_timestamp, lit, to_timestamp

def save_df(df, table_name):
    # Stamp each row with a validity window before persisting it as a Delta table.
    df = df.withColumn("valid_from", current_timestamp())
    df = df.withColumn("valid_till", to_timestamp(lit("2099-12-31 23:59:59.0000"), "yyyy-MM-dd HH:mm:ss.SSSS"))
    df.write.mode("overwrite").format("delta") \
        .option("path", "/mnt/example/reference/{}".format(table_name)) \
        .option("overwriteSchema", "true") \
        .saveAsTable(table_name)

# COMMAND ----------

save_df(ala2ass, "ala2ass")

# COMMAND ----------

# MAGIC %sql
# MAGIC select *
# MAGIC from tbl_alarm_master

# COMMAND ----------

def process_data(system):
    # Last change-feed commit version already processed for this source table.
    last_commit_version = spark.sql(f"select max(last_commit_version) as last_commit_version from table_commit_version where table_name='{system}'").collect()[0].asDict()['last_commit_version']
    # Highest commit version currently available in the change data feed.
    max_commit_version = spark.sql(f"select max(_commit_version) as max_commit_version from table_changes('{system}',1)").collect()[0].asDict()['max_commit_version']

    # Rows added since the last processed commit version.
    df_tlb_change = spark.sql(f"select * from table_changes('{system}',{last_commit_version})")

    # Nothing new to process: the bookkeeping table already points one past
    # the newest commit version.
    if last_commit_version == max_commit_version + 1:
        return None, None

    # Contextualize the new rows by joining them to the alarm-to-asset lookup.
    df = spark.sql(f"select raw.alarm_id, raw.alarm_type, raw.alarm_desc, raw.valid_from, raw.valid_till, a.asset_id context_asset from table_changes('{system}',{last_commit_version}) raw left join ala2ass a on raw.alarm_type = a.alarm_type")

    # Record that everything up to and including max_commit_version is processed.
    max_commit_version = max_commit_version + 1
    spark.sql(f"update table_commit_version set last_commit_version={max_commit_version} where table_name='{system}'")

    return df, df_tlb_change

# COMMAND ----------

df_alarm_master, df_tlb_change = process_data('tbl_alarm_master')

# COMMAND ----------

display(df_alarm_master)

# COMMAND ----------

display(df_tlb_change)

# COMMAND ----------

# Append the contextualized alarms back to Azure SQL Database.
df_alarm_master.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "Tbl_Alarm_Master") \
    .mode("append") \
    .save()
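
# COMMAND ----------

# Optional check (an addition to this sample, not part of the original
# notebook): confirm the bookkeeping table advanced past the processed
# commit version.
display(spark.sql("select * from table_commit_version"))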
@@ -0,0 +1,46 @@
# Databricks notebook source
# Clean up artifacts from any previous run of this demo.
dbutils.fs.rm("/mnt/example/raw/tbl_alarm_master", True)
dbutils.fs.rm("/mnt/example/table_commit_version", True)

# COMMAND ----------

# MAGIC %sql
# MAGIC drop table if exists tbl_alarm_master;
# MAGIC drop table if exists table_commit_version;

# COMMAND ----------

# MAGIC %sql
# MAGIC -- Raw alarm table with the Delta change data feed enabled, so that only
# MAGIC -- newly added rows need to be contextualized downstream.
# MAGIC CREATE TABLE tbl_alarm_master (alarm_id INT, alarm_type STRING, alarm_desc STRING, valid_from TIMESTAMP, valid_till TIMESTAMP)
# MAGIC USING DELTA
# MAGIC LOCATION '/mnt/example/raw/tbl_alarm_master'
# MAGIC TBLPROPERTIES (delta.enableChangeDataFeed = true)

# COMMAND ----------

# MAGIC %sql
# MAGIC INSERT INTO tbl_alarm_master VALUES (1, "Carbon Monoxide Warning", "TAG_1", "2023-01-01 00:00:00.0000", "2999-12-31 23:59:59.0000"),
# MAGIC (2, "Fire Warning", "TAG_2", "2023-01-01 00:00:00.0001", "2999-12-31 23:59:59.0000"),
# MAGIC (3, "Flood Warning", "TAG_3", "2023-01-01 00:00:00.0002", "2999-12-31 23:59:59.0000")

# COMMAND ----------

# MAGIC %sql
# MAGIC -- Bookkeeping table: tracks the last change-feed commit version processed
# MAGIC -- for each source table.
# MAGIC CREATE TABLE table_commit_version (table_name STRING, last_commit_version LONG, updated_at TIMESTAMP)
# MAGIC USING DELTA
# MAGIC LOCATION '/mnt/example/table_commit_version'

# COMMAND ----------

# MAGIC %sql
# MAGIC INSERT INTO table_commit_version VALUES('tbl_alarm_master', 1, current_timestamp())

# COMMAND ----------

# MAGIC %sql
# MAGIC select * from table_changes('tbl_alarm_master',1)

# COMMAND ----------

# MAGIC %sql
# MAGIC -- Simulate newly arriving data; the demo-contextualization notebook picks
# MAGIC -- this row up from the change feed.
# MAGIC INSERT INTO tbl_alarm_master VALUES (4, "Flood Warning", "TAG_4", "2023-01-01 00:00:00.0000", "2999-12-31 23:59:59.0000");
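
# COMMAND ----------

# Optional check (an addition to this sample): the insert above should appear
# as a new commit version in the change feed, which demo-contextualization
# then picks up.
display(spark.sql("select max(_commit_version) as max_commit_version from table_changes('tbl_alarm_master',1)"))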
@@ -0,0 +1,39 @@
-- Node tables: alarms, assets, and the quality systems that connect them.
DROP TABLE IF EXISTS Alarm;
DROP TABLE IF EXISTS Asset;
DROP TABLE IF EXISTS Quality_System;

CREATE TABLE Alarm (ID INTEGER PRIMARY KEY, Alarm_Type VARCHAR(100)) AS NODE;
CREATE TABLE Asset (ID INTEGER PRIMARY KEY, Asset_ID VARCHAR(100)) AS NODE;
CREATE TABLE Quality_System (ID INTEGER PRIMARY KEY, Quality_ID VARCHAR(100)) AS NODE;

INSERT INTO Alarm (ID, Alarm_Type)
VALUES (1, 'Fire Warning'),
(2, 'Flood Warning'),
(3, 'Carbon Monoxide Warning');

INSERT INTO Asset (ID, Asset_ID)
VALUES (1, 'AE0520'),
(2, 'AE0530'),
(3, 'AE0690');

INSERT INTO Quality_System (ID, Quality_ID)
VALUES (1, 'MA_0520_001'),
(2, 'MA_0530_002'),
(3, 'MA_0690_003');

-- Edge table: Alarm -(belongs_to)-> Quality_System.
DROP TABLE IF EXISTS belongs_to;
CREATE TABLE belongs_to AS EDGE;

INSERT INTO [dbo].[belongs_to]
VALUES ((SELECT $node_id FROM Alarm WHERE ID = '1'), (SELECT $node_id FROM Quality_System WHERE Quality_ID = 'MA_0520_001')),
((SELECT $node_id FROM Alarm WHERE ID = '2'), (SELECT $node_id FROM Quality_System WHERE Quality_ID = 'MA_0530_002')),
((SELECT $node_id FROM Alarm WHERE ID = '3'), (SELECT $node_id FROM Quality_System WHERE Quality_ID = 'MA_0690_003'));

-- Edge table: Quality_System -(is_associated_with)-> Asset.
DROP TABLE IF EXISTS is_associated_with;
CREATE TABLE is_associated_with AS EDGE;

INSERT INTO [dbo].[is_associated_with]
VALUES ((SELECT $node_id FROM Quality_System WHERE Quality_ID = 'MA_0520_001'), (SELECT $node_id FROM Asset WHERE ID = '1')),
((SELECT $node_id FROM Quality_System WHERE Quality_ID = 'MA_0530_002'), (SELECT $node_id FROM Asset WHERE ID = '2')),
((SELECT $node_id FROM Quality_System WHERE Quality_ID = 'MA_0690_003'), (SELECT $node_id FROM Asset WHERE ID = '3'));
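
-- Sanity check (an addition to this sample): traverse the graph to map each
-- alarm to its asset. This is the same MATCH pattern that the
-- demo-contextualization notebook pushes down over JDBC.
SELECT a.Alarm_Type, b.Asset_ID
FROM Alarm a, belongs_to, Quality_System c, is_associated_with, Asset b
WHERE MATCH (a-(belongs_to)->c-(is_associated_with)->b);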