Skip to content

Commit

Permalink
add glossaryTerm aspects
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Oct 6, 2024
1 parent f598fa6 commit d560237
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
make_domain_urn,
)
from datahub.ingestion.source.dremio.dremio_profiling import DremioProfiler
from datahub.ingestion.source.metadata.business_glossary import make_glossary_term_urn
from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
SchemaMetadataClass,
Expand All @@ -41,7 +40,6 @@
TimeStampClass,
SubTypesClass,
DataPlatformInstanceClass,
DatasetKeyClass,
FabricTypeClass,
OwnershipClass,
OwnerClass,
Expand All @@ -52,7 +50,6 @@
AuditStampClass,
ContainerClass,
ContainerPropertiesClass,
GlossaryTermKeyClass,
GlossaryTermInfoClass,
StatusClass,
DatasetProfileClass,
Expand Down Expand Up @@ -156,6 +153,10 @@ def populate_container_aspects(self, container: DremioContainer) -> Dict[str, _A
aspects[DataPlatformInstanceClass.get_aspect_name()] = self._create_data_platform_instance()
aspects[SubTypesClass.ASPECT_NAME] = SubTypesClass(typeNames=[container.subclass])
aspects[StatusClass.ASPECT_NAME] = StatusClass(removed=False)

if container.glossary_terms:
aspects[GlossaryTermsClass.ASPECT_NAME] = self._create_glossary_terms(container)

return aspects

def populate_dataset_aspects(self, dataset: DremioDataset) -> Dict[str, _Aspect]:
Expand Down Expand Up @@ -189,7 +190,6 @@ def populate_dataset_aspects(self, dataset: DremioDataset) -> Dict[str, _Aspect]
def populate_glossary_term_aspects(
    self, glossary_term: DremioGlossaryTerm
) -> Dict[str, _Aspect]:
    """Build the aspect map emitted for a Dremio glossary term.

    Returns a mapping of aspect name -> aspect instance, keyed by the
    aspect's registered name so the caller can wrap each entry in an MCP.
    Only glossaryTermInfo is emitted for glossary terms (per this commit,
    the dataPlatformInstance aspect is no longer attached to terms).

    Args:
        glossary_term: the Dremio glossary term to describe.

    Returns:
        Dict of aspect name to populated aspect.
    """
    return {
        GlossaryTermInfoClass.ASPECT_NAME: self._create_glossary_term_info(
            glossary_term
        ),
    }

def populate_profile_aspect(self, profile_data: Dict) -> DatasetProfileClass:
Expand Down Expand Up @@ -267,7 +267,13 @@ def _create_ownership(self, dataset: DremioDataset) -> OwnershipClass:
]
)

def _create_glossary_terms(self, dataset: DremioDataset) -> GlossaryTermsClass:
def _create_glossary_terms(
self,
dataset: Union[
DremioDataset,
DremioContainer,
],
) -> GlossaryTermsClass:
return GlossaryTermsClass(
terms=[
GlossaryTermAssociationClass(urn=term.urn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Iterable, List, Optional, Dict, Any, Callable
from typing import Iterable, List, Optional, Dict, Any

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
Expand Down Expand Up @@ -47,7 +47,7 @@
DremioDataset,
DremioEdition,
DremioContainer,
DremioDatasetType,
DremioDatasetType, DremioGlossaryTerm,
)
from datahub.ingestion.source.dremio.dremio_config import (
DremioSourceConfig,
Expand Down Expand Up @@ -220,17 +220,22 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

containers = self.dremio_catalog.get_containers()

self.thread_execution(
function=self.process_container,
entities=containers,
)
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_dataset = {
executor.submit(self.process_container, container): container
for container in containers
}

datasets = self.dremio_catalog.get_datasets()
for future in as_completed(future_to_dataset):
container_info = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.report_failure(
f"Failed to process dataset {container_info.path}.{container_info.resource_name}: {exc}"
)

self.thread_execution(
function=self.process_dataset,
entities=datasets,
)
datasets = self.dremio_catalog.get_datasets()

with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_dataset = {
Expand All @@ -250,6 +255,23 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.config.include_query_lineage:
self.get_query_lineage_workunits()

glossary_terms = self.dremio_catalog.get_glossary_terms()

with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_dataset = {
executor.submit(self.process_glossary_term, glossary_term): glossary_term
for glossary_term in glossary_terms
}

for future in as_completed(future_to_dataset):
glossary_term_info = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.report_failure(
f"Failed to process glossary term {glossary_term_info.glossary_term}: {exc}"
)

# Generate workunit for aggregated SQL parsing results
for mcp in self.sql_parsing_aggregator.gen_metadata():
self.report.report_workunit(mcp.as_workunit())
Expand Down Expand Up @@ -277,19 +299,20 @@ def process_container(self, container_info: DremioContainer) -> List[MetadataWor
)

for aspect_name, aspect in aspects.items():
mcp = MetadataChangeProposalWrapper(
entityType="container",
entityUrn=container_urn,
aspectName=aspect_name,
aspect=aspect,
changeType=ChangeTypeClass.UPSERT,
)
workunits.append(
MetadataWorkUnit(
id=f"{container_urn}-{aspect_name}",
mcp=mcp,
if aspect is not None:
mcp = MetadataChangeProposalWrapper(
entityType="container",
entityUrn=container_urn,
aspectName=aspect_name,
aspect=aspect,
changeType=ChangeTypeClass.UPSERT,
)
workunits.append(
MetadataWorkUnit(
id=f"{container_urn}-{aspect_name}",
mcp=mcp,
)
)
)

return workunits

Expand Down Expand Up @@ -326,19 +349,20 @@ def process_dataset(self, dataset_info: DremioDataset) -> List[MetadataWorkUnit]
)

for aspect_name, aspect in aspects.items():
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
entityUrn=dataset_urn,
aspectName=aspect_name,
aspect=aspect,
changeType=ChangeTypeClass.UPSERT,
)
workunits.append(
MetadataWorkUnit(
id=f"{dataset_urn}-{aspect_name}",
mcp=mcp,
if aspect is not None:
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
entityUrn=dataset_urn,
aspectName=aspect_name,
aspect=aspect,
changeType=ChangeTypeClass.UPSERT,
)
workunits.append(
MetadataWorkUnit(
id=f"{dataset_urn}-{aspect_name}",
mcp=mcp,
)
)
)

if dataset_info.dataset_type == DremioDatasetType.VIEW:
if self.dremio_catalog.edition == DremioEdition.ENTERPRISE:
Expand Down Expand Up @@ -396,6 +420,42 @@ def process_dataset(self, dataset_info: DremioDataset) -> List[MetadataWorkUnit]

return workunits

def process_glossary_term(
    self, glossary_term_info: DremioGlossaryTerm
) -> List[MetadataWorkUnit]:
    """
    Process a Dremio glossary term and generate metadata workunits.

    Fixes the copy-pasted docstring (this handles glossary terms, not
    containers). Registers the term with the stale-entity handler, builds
    its aspects, and wraps each non-None aspect in an UPSERT MCP workunit.

    Args:
        glossary_term_info: the Dremio glossary term to emit metadata for.

    Returns:
        List of workunits, one per populated aspect.
    """
    workunits: List[MetadataWorkUnit] = []

    glossary_term_urn = glossary_term_info.urn

    # Register the term so stale-entity removal can soft-delete it if it
    # disappears from Dremio on a later ingestion run.
    self.stale_entity_removal_handler.add_entity_to_state(
        type="glossaryTerm",
        urn=glossary_term_urn,
    )

    aspects = self.dremio_aspects.populate_glossary_term_aspects(
        glossary_term=glossary_term_info,
    )

    for aspect_name, aspect in aspects.items():
        if aspect is None:
            # Aspect could not be populated; skip rather than emit an
            # empty MCP (mirrors process_container/process_dataset).
            continue
        mcp = MetadataChangeProposalWrapper(
            entityType="glossaryTerm",
            entityUrn=glossary_term_urn,
            aspectName=aspect_name,
            aspect=aspect,
            changeType=ChangeTypeClass.UPSERT,
        )
        workunits.append(
            MetadataWorkUnit(
                id=f"{glossary_term_urn}-{aspect_name}",
                mcp=mcp,
            )
        )

    return workunits

def generate_view_lineage(
self, parents: List[str], dataset_urn: str
) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -545,19 +605,6 @@ def _map_dremio_dataset_to_urn(
env=self.config.env
)

def thread_execution(
    self,
    function: "Callable[[Any], List[MetadataWorkUnit]]",
    entities: "Iterable[Any]",
) -> "Iterable[MetadataWorkUnit]":
    """Run *function* over *entities* concurrently and yield its workunits.

    NOTE: this is a generator — a bare ``self.thread_execution(...)`` call
    is a no-op; the caller must ``yield from`` (or otherwise consume) the
    result for any work to happen.

    Args:
        function: per-entity worker returning a list of workunits.
        entities: entities to fan out across the thread pool.

    Yields:
        Workunits from each completed future, in completion order.
    """
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_entity = {
            executor.submit(function, entity): entity for entity in entities
        }

        for future in as_completed(future_to_entity):
            entity = future_to_entity[future]
            try:
                yield from future.result()
            except Exception as exc:
                # Report a formatted message (not the bare exception) for
                # consistency with the other report_failure call sites in
                # this file, and include the failing entity for context.
                self.report.report_failure(
                    f"Failed to process entity {entity}: {exc}"
                )

def get_report(self) -> SourceReport:
"""
Get the source report.
Expand Down

0 comments on commit d560237

Please sign in to comment.