From d5602378e17aad1f4375553ed38d3d7fca0da999 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Sun, 6 Oct 2024 23:36:03 +0100 Subject: [PATCH] add glossaryTerm aspects --- .../ingestion/source/dremio/dremio_aspects.py | 16 +- .../ingestion/source/dremio/dremio_source.py | 143 ++++++++++++------ 2 files changed, 106 insertions(+), 53 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py index 1cdded7ae65d5d..3e8fe7ba101a08 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py @@ -20,7 +20,6 @@ make_domain_urn, ) from datahub.ingestion.source.dremio.dremio_profiling import DremioProfiler -from datahub.ingestion.source.metadata.business_glossary import make_glossary_term_urn from datahub.metadata.schema_classes import ( DatasetPropertiesClass, SchemaMetadataClass, @@ -41,7 +40,6 @@ TimeStampClass, SubTypesClass, DataPlatformInstanceClass, - DatasetKeyClass, FabricTypeClass, OwnershipClass, OwnerClass, @@ -52,7 +50,6 @@ AuditStampClass, ContainerClass, ContainerPropertiesClass, - GlossaryTermKeyClass, GlossaryTermInfoClass, StatusClass, DatasetProfileClass, @@ -156,6 +153,10 @@ def populate_container_aspects(self, container: DremioContainer) -> Dict[str, _A aspects[DataPlatformInstanceClass.get_aspect_name()] = self._create_data_platform_instance() aspects[SubTypesClass.ASPECT_NAME] = SubTypesClass(typeNames=[container.subclass]) aspects[StatusClass.ASPECT_NAME] = StatusClass(removed=False) + + if container.glossary_terms: + aspects[GlossaryTermsClass.ASPECT_NAME] = self._create_glossary_terms(container) + return aspects def populate_dataset_aspects(self, dataset: DremioDataset) -> Dict[str, _Aspect]: @@ -189,7 +190,6 @@ def populate_dataset_aspects(self, dataset: DremioDataset) -> Dict[str, _Aspect] def populate_glossary_term_aspects(self, 
glossary_term: DremioGlossaryTerm) -> Dict[str, _Aspect]: return { GlossaryTermInfoClass.ASPECT_NAME: self._create_glossary_term_info(glossary_term), - DataPlatformInstanceClass.ASPECT_NAME: self._create_data_platform_instance() } def populate_profile_aspect(self, profile_data: Dict) -> DatasetProfileClass: @@ -267,7 +267,13 @@ def _create_ownership(self, dataset: DremioDataset) -> OwnershipClass: ] ) - def _create_glossary_terms(self, dataset: DremioDataset) -> GlossaryTermsClass: + def _create_glossary_terms( + self, + dataset: Union[ + DremioDataset, + DremioContainer, + ], + ) -> GlossaryTermsClass: return GlossaryTermsClass( terms=[ GlossaryTermAssociationClass(urn=term.urn) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py index b34b14a63a7f8e..78f93b5e96944d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py @@ -4,7 +4,7 @@ import logging import re from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Iterable, List, Optional, Dict, Any, Callable +from typing import Iterable, List, Optional, Dict, Any from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.decorators import ( @@ -47,7 +47,7 @@ DremioDataset, DremioEdition, DremioContainer, - DremioDatasetType, + DremioDatasetType, DremioGlossaryTerm, ) from datahub.ingestion.source.dremio.dremio_config import ( DremioSourceConfig, @@ -220,17 +220,22 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: containers = self.dremio_catalog.get_containers() - self.thread_execution( - function=self.process_container, - entities=containers, - ) + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + future_to_dataset = { + executor.submit(self.process_container, container): container + for container in 
containers + } - datasets = self.dremio_catalog.get_datasets() + for future in as_completed(future_to_dataset): + container_info = future_to_dataset[future] + try: + yield from future.result() + except Exception as exc: + self.report.report_failure( + f"Failed to process container {container_info.path}.{container_info.resource_name}: {exc}" + ) - self.thread_execution( - function=self.process_dataset, - entities=datasets, - ) + datasets = self.dremio_catalog.get_datasets() with ThreadPoolExecutor(max_workers=self.max_workers) as executor: future_to_dataset = { @@ -250,6 +255,23 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: if self.config.include_query_lineage: self.get_query_lineage_workunits() + glossary_terms = self.dremio_catalog.get_glossary_terms() + + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + future_to_dataset = { + executor.submit(self.process_glossary_term, glossary_term): glossary_term + for glossary_term in glossary_terms + } + + for future in as_completed(future_to_dataset): + glossary_term_info = future_to_dataset[future] + try: + yield from future.result() + except Exception as exc: + self.report.report_failure( + f"Failed to process glossary term {glossary_term_info.glossary_term}: {exc}" + ) + # Generate workunit for aggregated SQL parsing results for mcp in self.sql_parsing_aggregator.gen_metadata(): self.report.report_workunit(mcp.as_workunit()) @@ -277,19 +299,20 @@ def process_container(self, container_info: DremioContainer) -> List[MetadataWor ) for aspect_name, aspect in aspects.items(): - mcp = MetadataChangeProposalWrapper( - entityType="container", - entityUrn=container_urn, - aspectName=aspect_name, - aspect=aspect, - changeType=ChangeTypeClass.UPSERT, - ) - workunits.append( - MetadataWorkUnit( - id=f"{container_urn}-{aspect_name}", - mcp=mcp, + if aspect is not None: + mcp = MetadataChangeProposalWrapper( + entityType="container", + entityUrn=container_urn, + aspectName=aspect_name, + 
aspect=aspect, + changeType=ChangeTypeClass.UPSERT, + ) + workunits.append( + MetadataWorkUnit( + id=f"{container_urn}-{aspect_name}", + mcp=mcp, + ) ) - ) return workunits @@ -326,19 +349,20 @@ def process_dataset(self, dataset_info: DremioDataset) -> List[MetadataWorkUnit] ) for aspect_name, aspect in aspects.items(): - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - entityUrn=dataset_urn, - aspectName=aspect_name, - aspect=aspect, - changeType=ChangeTypeClass.UPSERT, - ) - workunits.append( - MetadataWorkUnit( - id=f"{dataset_urn}-{aspect_name}", - mcp=mcp, + if aspect is not None: + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + aspectName=aspect_name, + aspect=aspect, + changeType=ChangeTypeClass.UPSERT, + ) + workunits.append( + MetadataWorkUnit( + id=f"{dataset_urn}-{aspect_name}", + mcp=mcp, + ) ) - ) if dataset_info.dataset_type == DremioDatasetType.VIEW: if self.dremio_catalog.edition == DremioEdition.ENTERPRISE: @@ -396,6 +420,42 @@ def process_dataset(self, dataset_info: DremioDataset) -> List[MetadataWorkUnit] return workunits + def process_glossary_term(self, glossary_term_info: DremioGlossaryTerm) -> List[MetadataWorkUnit]: + """ + Process a Dremio glossary term and generate metadata workunits. 
+ """ + + workunits = [] + + glossary_term_urn = glossary_term_info.urn + + self.stale_entity_removal_handler.add_entity_to_state( + type="glossaryTerm", + urn=glossary_term_urn + ) + + aspects = self.dremio_aspects.populate_glossary_term_aspects( + glossary_term=glossary_term_info, + ) + + for aspect_name, aspect in aspects.items(): + if aspect is not None: + mcp = MetadataChangeProposalWrapper( + entityType="glossaryTerm", + entityUrn=glossary_term_urn, + aspectName=aspect_name, + aspect=aspect, + changeType=ChangeTypeClass.UPSERT, + ) + workunits.append( + MetadataWorkUnit( + id=f"{glossary_term_urn}-{aspect_name}", + mcp=mcp, + ) + ) + + return workunits + def generate_view_lineage( self, parents: List[str], dataset_urn: str ) -> Iterable[MetadataWorkUnit]: @@ -545,19 +605,6 @@ def _map_dremio_dataset_to_urn( env=self.config.env ) - def thread_execution(self, function: Callable, entities): - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - future_to_dataset = { - executor.submit(function, entity): entity - for entity in entities - } - - for future in as_completed(future_to_dataset): - try: - yield from future.result() - except Exception as exc: - self.report.report_failure(exc) - def get_report(self) -> SourceReport: """ Get the source report.