Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/dbt): add only_include_if_in_catalog flag for dbt core #11314

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:
upstream_nodes=upstream_nodes,
materialization=materialization,
catalog_type=catalog_type,
missing_from_catalog=False, # This doesn't really apply to dbt Cloud.
meta=meta,
query_tag={}, # TODO: Get this from the dbt API.
tags=tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class DBTSourceReport(StaleEntityRemovalSourceReport):
default_factory=LossyList
)

in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList)
nodes_filtered: LossyList[str] = field(default_factory=LossyList)


class EmitDirective(ConfigEnum):
Expand Down Expand Up @@ -528,6 +528,7 @@ class DBTNode:
materialization: Optional[str] # table, view, ephemeral, incremental, snapshot
# see https://docs.getdbt.com/reference/artifacts/manifest-json
catalog_type: Optional[str]
missing_from_catalog: bool # indicates if the node was missing from the catalog.json

owner: Optional[str]

Expand Down Expand Up @@ -853,6 +854,9 @@ def get_column_type(
TypeClass = resolve_postgres_modified_type(column_type)
elif dbt_adapter == "vertica":
TypeClass = resolve_vertica_modified_type(column_type)
elif dbt_adapter == "snowflake":
# Snowflake types are uppercase, so we check that.
TypeClass = _field_type_mapping.get(column_type.upper())

# if still not found, report the warning
if TypeClass is None:
Expand Down Expand Up @@ -1034,6 +1038,7 @@ def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
key = node.dbt_name

if not self.config.node_name_pattern.allowed(key):
self.report.nodes_filtered.append(key)
continue

nodes.append(node)
Expand Down
46 changes: 37 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ class DBTCoreConfig(DBTCommonConfig):
"See https://docs.getdbt.com/reference/artifacts/run-results-json.",
)

only_include_if_in_catalog: bool = Field(
default=False,
description="[experimental] If true, only include nodes that are also present in the catalog file. "
"This is useful if you only want to include models that have been built by the associated run.",
)

# Because we now also collect model performance metadata, the "test_results" field was renamed to "run_results".
_convert_test_results_path = pydantic_renamed_field(
"test_results_path", "run_results_paths", transform=lambda x: [x] if x else []
Expand Down Expand Up @@ -156,6 +162,7 @@ def extract_dbt_entities(
manifest_adapter: str,
use_identifiers: bool,
tag_prefix: str,
only_include_if_in_catalog: bool,
report: DBTSourceReport,
) -> List[DBTNode]:
sources_by_id = {x["unique_id"]: x for x in sources_results}
Expand Down Expand Up @@ -194,12 +201,22 @@ def extract_dbt_entities(

# It's a source
catalog_node = all_catalog_entities.get(key)
missing_from_catalog = catalog_node is None
catalog_type = None

if catalog_node is None:
if materialization not in {"test", "ephemeral"}:
# Test and ephemeral nodes will never show up in the catalog.
report.in_manifest_but_missing_catalog.append(key)
missing_from_catalog = False

if not only_include_if_in_catalog:
report.warning(
title="Node missing from catalog",
message="Found a node in the manifest file but not in the catalog. "
"This usually means the catalog file was not generated by `dbt docs generate` and so is incomplete. "
"Some metadata, such as column types and descriptions, will be impacted.",
context=key,
)
else:
catalog_type = all_catalog_entities[key]["metadata"]["type"]

Expand Down Expand Up @@ -264,6 +281,7 @@ def extract_dbt_entities(
upstream_nodes=upstream_nodes,
materialization=materialization,
catalog_type=catalog_type,
missing_from_catalog=missing_from_catalog,
meta=meta,
query_tag=query_tag_props,
tags=tags,
Expand Down Expand Up @@ -291,14 +309,6 @@ def extract_dbt_entities(

dbt_entities.append(dbtNode)

if report.in_manifest_but_missing_catalog:
# We still want this to show up as a warning, but don't want to spam the warnings section
# if there's a lot of them.
report.warning(
"in_manifest_but_missing_catalog",
f"Found {len(report.in_manifest_but_missing_catalog)} nodes in manifest but not in catalog. See in_manifest_but_missing_catalog for details.",
)

return dbt_entities


Expand Down Expand Up @@ -535,6 +545,7 @@ def loadManifestAndCatalog(
manifest_adapter,
self.config.use_identifiers,
self.config.tag_prefix,
self.config.only_include_if_in_catalog,
self.report,
)

Expand Down Expand Up @@ -588,6 +599,23 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:

return all_nodes, additional_custom_props

def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
nodes = super()._filter_nodes(all_nodes)

if not self.config.only_include_if_in_catalog:
return nodes

filtered_nodes = []
for node in nodes:
if node.missing_from_catalog:
# TODO: We need to do some additional testing of this flag to validate that it doesn't
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds a bit scary. :)

# drop important things entirely (e.g. sources).
self.report.nodes_filtered.append(node.dbt_name)
else:
filtered_nodes.append(node)

return filtered_nodes

def get_external_url(self, node: DBTNode) -> Optional[str]:
if self.config.git_info and node.dbt_file_path:
return self.config.git_info.get_url_for_file_path(node.dbt_file_path)
Expand Down
Loading