Skip to content

Commit

Permalink
feat(ingest/mssql): populate dataTransformLogic aspect for stored procs (#12244)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 22, 2025
1 parent bb4d6bc commit ac44f4a
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class MSSQLDataJob:
entity: Union[StoredProcedure, JobStep]
type: str = "dataJob"
source: str = "mssql"
external_url: str = ""
external_url: Optional[str] = None
description: Optional[str] = None
status: Optional[str] = None
incoming: List[str] = field(default_factory=list)
Expand Down Expand Up @@ -228,7 +228,7 @@ class MSSQLDataFlow:
entity: Union[MSSQLJob, MSSQLProceduresContainer]
type: str = "dataFlow"
source: str = "mssql"
external_url: str = ""
external_url: Optional[str] = None
flow_properties: Dict[str, str] = field(default_factory=dict)

def add_property(
Expand Down
37 changes: 26 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import ProgrammingError, ResourceClosedError

import datahub.metadata.schema_classes as models
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
Expand Down Expand Up @@ -49,21 +50,15 @@
make_sqlalchemy_uri,
)
from datahub.ingestion.source.sql.sql_report import SQLSourceReport
from datahub.metadata.schema_classes import (
BooleanTypeClass,
NumberTypeClass,
StringTypeClass,
UnionTypeClass,
)
from datahub.utilities.file_backed_collections import FileBackedList

logger: logging.Logger = logging.getLogger(__name__)

register_custom_type(sqlalchemy.dialects.mssql.BIT, BooleanTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.MONEY, NumberTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.SMALLMONEY, NumberTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.SQL_VARIANT, UnionTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.UNIQUEIDENTIFIER, StringTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.BIT, models.BooleanTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.MONEY, models.NumberTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.SMALLMONEY, models.NumberTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.SQL_VARIANT, models.UnionTypeClass)
register_custom_type(sqlalchemy.dialects.mssql.UNIQUEIDENTIFIER, models.StringTypeClass)


class SQLServerConfig(BasicSQLAlchemyConfig):
Expand Down Expand Up @@ -651,6 +646,26 @@ def construct_job_workunits(
entityUrn=data_job.urn,
aspect=data_job.as_datajob_input_output_aspect,
).as_workunit()

if (
self.config.include_stored_procedures_code
and isinstance(data_job.entity, StoredProcedure)
and data_job.entity.code is not None
):
yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
aspect=models.DataTransformLogicClass(
transforms=[
models.DataTransformClass(
queryStatement=models.QueryStatementClass(
value=data_job.entity.code,
language=models.QueryLanguageClass.SQL,
),
)
]
),
).as_workunit()

# TODO: Add SubType when it appears

def construct_flow_workunits(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
"aspect": {
"json": {
"customProperties": {},
"externalUrl": "",
"name": "Weekly Demo Data Backup"
}
},
Expand All @@ -113,17 +112,16 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "a06cfdca-b65e-42de-8db2-8c21c183c5dd",
"job_id": "f5a6c120-500a-4300-9b21-0c3225af1f80",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2024-12-26 12:03:35.420000",
"date_modified": "2024-12-26 12:03:35.590000",
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
"command": "ALTER DATABASE DemoData SET READ_ONLY"
},
"externalUrl": "",
"name": "Weekly Demo Data Backup",
"type": {
"string": "MSSQL_JOB_STEP"
Expand Down Expand Up @@ -2259,7 +2257,6 @@
"aspect": {
"json": {
"customProperties": {},
"externalUrl": "",
"name": "DemoData.Foo.stored_procedures"
}
},
Expand All @@ -2282,10 +2279,9 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2024-12-26 12:03:35.230000",
"date_modified": "2024-12-26 12:03:35.230000"
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000"
},
"externalUrl": "",
"name": "DemoData.Foo.Proc.With.SpecialChar",
"type": {
"string": "MSSQL_STORED_PROCEDURE"
Expand All @@ -2298,6 +2294,29 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
Expand All @@ -2310,10 +2329,9 @@
"depending_on_procedure": "{}",
"code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
"input parameters": "[]",
"date_created": "2024-12-26 12:03:35.237000",
"date_modified": "2024-12-26 12:03:35.237000"
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000"
},
"externalUrl": "",
"name": "DemoData.Foo.NewProc",
"type": {
"string": "MSSQL_STORED_PROCEDURE"
Expand All @@ -2326,6 +2344,29 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:250ce23f940485303fa5e5d4f5194975",
Expand Down Expand Up @@ -4928,7 +4969,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1735214618898,
"time": 1735588784503,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down Expand Up @@ -5051,7 +5092,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1735214618906,
"time": 1735588784511,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
"aspect": {
"json": {
"customProperties": {},
"externalUrl": "",
"name": "Weekly Demo Data Backup"
}
},
Expand All @@ -113,17 +112,16 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "a06cfdca-b65e-42de-8db2-8c21c183c5dd",
"job_id": "f5a6c120-500a-4300-9b21-0c3225af1f80",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2024-12-26 12:03:35.420000",
"date_modified": "2024-12-26 12:03:35.590000",
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
"command": "ALTER DATABASE DemoData SET READ_ONLY"
},
"externalUrl": "",
"name": "Weekly Demo Data Backup",
"type": {
"string": "MSSQL_JOB_STEP"
Expand Down Expand Up @@ -2259,7 +2257,6 @@
"aspect": {
"json": {
"customProperties": {},
"externalUrl": "",
"name": "DemoData.Foo.stored_procedures"
}
},
Expand All @@ -2282,10 +2279,9 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2024-12-26 12:03:35.230000",
"date_modified": "2024-12-26 12:03:35.230000"
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000"
},
"externalUrl": "",
"name": "DemoData.Foo.Proc.With.SpecialChar",
"type": {
"string": "MSSQL_STORED_PROCEDURE"
Expand All @@ -2298,6 +2294,29 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
"changeType": "UPSERT",
"aspectName": "dataTransformLogic",
"aspect": {
"json": {
"transforms": [
{
"queryStatement": {
"value": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"language": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:250ce23f940485303fa5e5d4f5194975",
Expand Down Expand Up @@ -2675,7 +2694,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1735214621644,
"time": 1735588789629,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down
Loading

0 comments on commit ac44f4a

Please sign in to comment.