From cbc50f515f192badf14f66ac6891d17ddce71217 Mon Sep 17 00:00:00 2001
From: Brian Mesick
Date: Wed, 5 Jun 2024 17:04:09 -0400
Subject: [PATCH] feat: Add command to generate a dbt exposures file

We'll use this in aspects-dbt CI to keep the model to dashboard mappings up
to date.
---
 tutoraspects/commands_v1.py                         |  21 ++
 .../pythonpath/collect_dbt_lineage.py               | 195 ++++++++++++++++++
 .../build/aspects-superset/requirements.txt         |   1 +
 .../templates/base-docker-compose-services          |   2 +
 4 files changed, 219 insertions(+)
 create mode 100644 tutoraspects/templates/aspects/apps/superset/pythonpath/collect_dbt_lineage.py

diff --git a/tutoraspects/commands_v1.py b/tutoraspects/commands_v1.py
index f792b4a51..499c32ced 100644
--- a/tutoraspects/commands_v1.py
+++ b/tutoraspects/commands_v1.py
@@ -190,6 +190,26 @@ def performance_metrics(
     ]
 
 
+# Ex: "tutor local do collect-dbt-lineage"
+@click.command(context_settings={"ignore_unknown_options": True})
+def collect_dbt_lineage() -> list[tuple[str, str]]:
+    """
+    Job to dump a list of dbt resources used in Superset.
+
+    aspects-dbt uses this to update the list of exposures so we can identify which
+    models are being used, and where.
+    """
+    return [
+        (
+            "superset",
+            "set -e && "
+            "echo 'Collecting dbt lineage...' && "
+            "python /app/pythonpath/collect_dbt_lineage.py && "
+            "echo 'Done!';",
+        ),
+    ]
+
+
 # Ex: "tutor local do dump_data_to_clickhouse "
 @click.command(context_settings={"ignore_unknown_options": True})
 @click.option(
@@ -375,6 +395,7 @@ def check_superset_assets():
     import_assets,
     performance_metrics,
     init_clickhouse,
+    collect_dbt_lineage,
 )
 
 COMMANDS = (aspects,)
diff --git a/tutoraspects/templates/aspects/apps/superset/pythonpath/collect_dbt_lineage.py b/tutoraspects/templates/aspects/apps/superset/pythonpath/collect_dbt_lineage.py
new file mode 100644
index 000000000..f8edf0648
--- /dev/null
+++ b/tutoraspects/templates/aspects/apps/superset/pythonpath/collect_dbt_lineage.py
@@ -0,0 +1,195 @@
+"""
+Parse all Superset datasets for SQL, find links back to dbt assets.
+
+This is used in aspects-dbt to update the exposures.
+"""
+
+# Necessary to access Superset
+from create_assets import app
+
+import json
+import logging
+import os
+import re
+
+import click
+import ruamel.yaml
+from superset.extensions import db
+from superset.models.dashboard import Dashboard
+
+
+logger = logging.getLogger("collect_dbt_lineage")
+
+DBT_DATA_PATH = "/app/dbt_manifest/"
+DBT_MANIFEST_PATH = os.path.join(DBT_DATA_PATH, "manifest.json")
+
+
+def get_tables_from_dbt():
+    """
+    Take generated metadata from the last dbt run to find known models
+    """
+    with open(DBT_MANIFEST_PATH, encoding="utf-8") as f:
+        dbt_manifest = json.load(f)
+
+    tables = {}
+    for table_type in ["nodes", "sources"]:
+        manifest_subset = dbt_manifest[table_type]
+
+        for table_key_long in manifest_subset:
+            table = manifest_subset[table_key_long]
+            name = table["name"]
+            schema = table["schema"]
+            database = table["database"]
+            source = table["unique_id"].split(".")[-2]
+            table_key = schema + "." + name
+
+            # fail if it breaks uniqueness constraint
+            assert table_key not in tables, \
+                f"Table {table_key} is a duplicate name (schema + table) across databases. " \
+                "This would result in incorrect matching between Superset and dbt. "
+
+            tables[table_key] = {
+                "name": name,
+                "schema": schema,
+                "database": database,
+                "type": table_type[:-1],
+                "ref":
+                    f"ref('{name}')" if table_type == "nodes"
+                    else f"source('{source}', '{name}')"
+            }
+
+    assert tables, "Manifest is empty!"
+
+    return tables
+
+
+def get_tables_from_sql(sql):
+    """
+    Find table names in sql.
+
+    This is a hack around how sqlfluff and sqlparse both choke on Superset Jinja
+    currently. We should invest in making one of those tools work if this proves
+    useful. This likely doesn't catch everything or has false positives where
+    Superset virtual datasets share names with dbt models.
+    """
+    sql = re.sub(r'(--.*)|(#.*)', '', sql) # remove line comments
+    sql = re.sub(r'\s+', ' ', sql).lower() # make it one line
+    sql = re.sub(r'(/\*(.|\n)*\*/)', '', sql) # remove block comments
+
+    regex = re.compile(r'\b(from|join)\b\s+(\"?(\w+)\"?(\.))?\"?(\w+)\"?\b') # regex for tables
+    tables_match = regex.findall(sql)
+    tables = [table[2] + '.' + table[4] if table[2] != '' else table[4] # full name if with schema
+              for table in tables_match
+              if table[4] != 'unnest'] # remove false positive
+    tables = set(tables) # remove duplicates
+
+    return list(tables)
+
+
+class YamlFormatted(ruamel.yaml.YAML):
+    def __init__(self):
+        super(YamlFormatted, self).__init__()
+        self.default_flow_style = False
+        self.allow_unicode = True
+        self.encoding = 'utf-8'
+        self.block_seq_indent = 2
+        self.indent = 4
+
+
+def get_slice_tables(chart, dbt_tables):
+    """
+    Find the tables used in a dataset.
+    """
+    name = chart.table.name
+    schema = chart.table.schema
+    dataset_key = f"{schema}.{name}"
+
+    # only add datasets that are in dashboards, optionally limit to one database
+    kind = "virtual" if chart.table.is_virtual else "table"
+    if kind == "virtual": # built on custom sql
+        sql = chart.table.get_rendered_sql()
+        tables = get_tables_from_sql(sql)
+    else:
+        tables = [dataset_key]
+
+    logger.info(f"Found tables {tables}")
+    dbt_refs = [
+        dbt_tables[table]["ref"]
+        for table in tables
+        if table in dbt_tables
+    ]
+
+    return dbt_refs
+
+
+def get_dashboard_dict(dashboard, dbt_tables):
+    """
+    Get dashboard metadata and all dataset dbt dependencies.
+    """
+    logger.info(f"Dashboard: {dashboard.slug}")
+
+    dependencies = set()
+    for chart in dashboard.slices:
+        logger.info(chart)
+        dependencies.update(
+            get_slice_tables(chart, dbt_tables=dbt_tables)
+        )
+
+    return {
+        "name": dashboard.slug,
+        "label": dashboard.dashboard_title,
+        "type": "dashboard",
+        "description": dashboard.description or "",
+        "url": dashboard.external_url or "",
+        "depends_on": list(sorted(dependencies)),
+        "owner": {"name": ""},
+    }
+
+
+def write_exposures_yaml(exposure_dashboards):
+    exposures_yaml = ruamel.yaml.comments.CommentedSeq(exposure_dashboards)
+
+    exposures_yaml_schema = {
+        'version': 2,
+        'exposures': exposures_yaml
+    }
+
+    outfile = os.path.join(DBT_DATA_PATH, "superset_exposures.yaml")
+    logger.info(f"Writing exposures to {outfile}")
+    exposures_yaml_file = YamlFormatted()
+    with open(outfile, 'w+', encoding='utf-8') as f:
+        exposures_yaml_file.dump(exposures_yaml_schema, f)
+
+
+@click.command(
+    help="""Creates a superset_exposures.yaml file in a shared data dir so that
+    aspects_dbt can add it to the project in CI and allow dbt docs to show
+    which models are being used in Superset."""
+)
+def collect_dbt_lineage():
+    """
+    Pull SQL from datasets, compare with dbt manifest.json to find used assets.
+    """
+    dbt_tables = get_tables_from_dbt()
+
+    target_dashboards = {{SUPERSET_EMBEDDABLE_DASHBOARDS}}
+
+    dashboards = (
+        db.session.query(Dashboard)
+        .filter(Dashboard.slug.in_(target_dashboards))
+        .all()
+    )
+
+    if not dashboards:
+        logger.warning("No dashboard found!")
+
+    exposure_dashboards = []
+    for dashboard in dashboards:
+        exposure_dashboards.append(get_dashboard_dict(dashboard, dbt_tables))
+
+    write_exposures_yaml(exposure_dashboards)
+
+
+if __name__ == "__main__":
+    logger.info(f"Collecting dbt lineage, will write output to {DBT_DATA_PATH}.")
+    collect_dbt_lineage()
diff --git a/tutoraspects/templates/aspects/build/aspects-superset/requirements.txt b/tutoraspects/templates/aspects/build/aspects-superset/requirements.txt
index da4a4f4b9..21f06b891 100644
--- a/tutoraspects/templates/aspects/build/aspects-superset/requirements.txt
+++ b/tutoraspects/templates/aspects/build/aspects-superset/requirements.txt
@@ -1,4 +1,5 @@
 authlib # OAuth2
 clickhouse-connect>0.5,<1.0
 sentry-sdk[flask]
+ruamel-yaml==0.18.6
 urllib3>=1.26.15,<2
diff --git a/tutoraspects/templates/base-docker-compose-services b/tutoraspects/templates/base-docker-compose-services
index 9ebb790a5..7835371b6 100644
--- a/tutoraspects/templates/base-docker-compose-services
+++ b/tutoraspects/templates/base-docker-compose-services
@@ -8,6 +8,8 @@ image: {{ DOCKER_IMAGE_SUPERSET }}
     - ../../env/plugins/aspects/apps/superset/scripts:/app/scripts
     - ../../env/plugins/aspects/build/aspects-superset/localization:/app/localization
    - ../../env/plugins/aspects/build/aspects-superset/openedx-assets:/app/openedx-assets
+    # Shared with Aspects job so it has access to the state file for the lineage push job
+    - ../../data/aspects-dbt:/app/dbt_manifest
     restart: unless-stopped
     environment:
       DATABASE_DIALECT: {{ SUPERSET_DB_DIALECT }}