Skip to content

Commit

Permalink
feat(ingest): add datahub apply source (#12482)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jan 29, 2025
1 parent 22e012a commit cc0d43f
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 64 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@
"looker = datahub.ingestion.source.looker.looker_source:LookerDashboardSource",
"lookml = datahub.ingestion.source.looker.lookml_source:LookMLSource",
"datahub-gc = datahub.ingestion.source.gc.datahub_gc:DataHubGcSource",
"datahub-apply = datahub.ingestion.source.apply.datahub_apply:DataHubApplySource",
"datahub-lineage-file = datahub.ingestion.source.metadata.lineage:LineageFileSource",
"datahub-business-glossary = datahub.ingestion.source.metadata.business_glossary:BusinessGlossaryFileSource",
"mlflow = datahub.ingestion.source.mlflow:MLflowSource",
Expand Down
65 changes: 1 addition & 64 deletions metadata-ingestion/src/datahub/cli/container_cli.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,8 @@
import logging
from typing import Any, List

import click
import progressbar

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.schema_classes import (
DomainsClass,
GlossaryTermAssociationClass,
OwnerClass,
OwnershipTypeClass,
TagAssociationClass,
)
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.ingestion.source.apply.datahub_apply import apply_association_to_container

logger = logging.getLogger(__name__)

Expand All @@ -24,58 +13,6 @@ def container() -> None:
pass


def apply_association_to_container(
container_urn: str,
association_urn: str,
association_type: str,
) -> None:
"""
Common function to add either tags, terms, domains, or owners to child datasets (for now).
Args:
container_urn: The URN of the container
association_urn: The URN of the tag, term, or user to apply
association_type: One of 'tag', 'term', 'domain' or 'owner'
"""
urns: List[str] = []
graph = get_default_graph()
logger.info(f"Using {graph}")
urns.extend(
graph.get_urns_by_filter(
container=container_urn, batch_size=1000, entity_types=["dataset"]
)
)

all_patches: List[Any] = []
for urn in urns:
builder = DatasetPatchBuilder(urn)
patches: List[Any] = []
if association_type == "tag":
patches = builder.add_tag(TagAssociationClass(association_urn)).build()
elif association_type == "term":
patches = builder.add_term(
GlossaryTermAssociationClass(association_urn)
).build()
elif association_type == "owner":
patches = builder.add_owner(
OwnerClass(
owner=association_urn,
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
).build()
elif association_type == "domain":
patches = [
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=DomainsClass(domains=[association_urn]),
)
]
all_patches.extend(patches)
mcps_iter = progressbar.progressbar(all_patches, redirect_stdout=True)
for mcp in mcps_iter:
graph.emit(mcp)


@container.command()
@click.option("--container-urn", required=True, type=str)
@click.option("--tag-urn", required=True, type=str)
Expand Down
Empty file.
223 changes: 223 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/apply/datahub_apply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import logging
from functools import partial
from typing import Any, Iterable, List, Optional, Union

import progressbar
from pydantic import Field

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.metadata.schema_classes import (
DomainsClass,
GlossaryTermAssociationClass,
MetadataChangeProposalClass,
OwnerClass,
OwnershipTypeClass,
TagAssociationClass,
)
from datahub.specific.dataset import DatasetPatchBuilder

logger = logging.getLogger(__name__)


def apply_association_to_container(
container_urn: str,
association_urn: str,
association_type: str,
emit: bool = True,
graph: Optional[DataHubGraph] = None,
) -> Optional[List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]]:
"""
Common function to add either tags, terms, domains, or owners to child datasets (for now).
Args:
container_urn: The URN of the container
association_urn: The URN of the tag, term, or user to apply
association_type: One of 'tag', 'term', 'domain' or 'owner'
"""
urns: List[str] = [container_urn]
if not graph:
graph = get_default_graph()
logger.info(f"Using {graph}")
urns.extend(
graph.get_urns_by_filter(
container=container_urn,
batch_size=1000,
entity_types=["dataset", "container"],
)
)

all_patches: List[Any] = []
for urn in urns:
builder = DatasetPatchBuilder(urn)
patches: List[Any] = []
if association_type == "tag":
patches = builder.add_tag(TagAssociationClass(association_urn)).build()
elif association_type == "term":
patches = builder.add_term(
GlossaryTermAssociationClass(association_urn)
).build()
elif association_type == "owner":
patches = builder.add_owner(
OwnerClass(
owner=association_urn,
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
).build()
elif association_type == "domain":
patches = [
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=DomainsClass(domains=[association_urn]),
)
]
all_patches.extend(patches)
if emit:
mcps_iter = progressbar.progressbar(all_patches, redirect_stdout=True)
for mcp in mcps_iter:
graph.emit(mcp)
return None
else:
return all_patches


class DomainApplyConfig(ConfigModel):
assets: List[str] = Field(
default_factory=list,
description="List of assets to apply domain hierarchichaly. Currently only containers and datasets are supported",
)
domain_urn: str = Field(default="")


class TagApplyConfig(ConfigModel):
assets: List[str] = Field(
default_factory=list,
description="List of assets to apply tag hierarchichaly. Currently only containers and datasets are supported",
)
tag_urn: str = Field(default="")


class TermApplyConfig(ConfigModel):
assets: List[str] = Field(
default_factory=list,
description="List of assets to apply term hierarchichaly. Currently only containers and datasets are supported",
)
term_urn: str = Field(default="")


class OwnerApplyConfig(ConfigModel):
assets: List[str] = Field(
default_factory=list,
description="List of assets to apply owner hierarchichaly. Currently only containers and datasets are supported",
)
owner_urn: str = Field(default="")


class DataHubApplyConfig(ConfigModel):
domain_apply: Optional[List[DomainApplyConfig]] = Field(
default=None,
description="List to apply domains to assets",
)
tag_apply: Optional[List[TagApplyConfig]] = Field(
default=None,
description="List to apply tags to assets",
)
term_apply: Optional[List[TermApplyConfig]] = Field(
default=None,
description="List to apply terms to assets",
)
owner_apply: Optional[List[OwnerApplyConfig]] = Field(
default=None,
description="List to apply owners to assets",
)


@platform_name("DataHubApply")
@config_class(DataHubApplyConfig)
@support_status(SupportStatus.TESTING)
class DataHubApplySource(Source):
"""
This source is a helper over CLI
so people can use the helper to apply various metadata changes to DataHub
via Managed Ingestion
"""

def __init__(self, ctx: PipelineContext, config: DataHubApplyConfig):
self.ctx = ctx
self.config = config
self.report = SourceReport()
self.graph = ctx.require_graph()

def _yield_workunits(
self,
proposals: List[
Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
],
) -> Iterable[MetadataWorkUnit]:
for proposal in proposals:
if isinstance(proposal, MetadataChangeProposalWrapper):
yield proposal.as_workunit()
else:
yield MetadataWorkUnit(
id=MetadataWorkUnit.generate_workunit_id(proposal),
mcp_raw=proposal,
)

def _handle_assets(
self, assets: List[str], apply_urn: str, apply_type: str
) -> Iterable[MetadataWorkUnit]:
for asset in assets:
change_proposals = apply_association_to_container(
asset, apply_urn, apply_type, emit=False, graph=self.graph
)
assert change_proposals is not None
yield from self._yield_workunits(change_proposals)

def _yield_domain(self) -> Iterable[MetadataWorkUnit]:
if not self.config.domain_apply:
return
for apply in self.config.domain_apply:
yield from self._handle_assets(apply.assets, apply.domain_urn, "domain")

def _yield_tag(self) -> Iterable[MetadataWorkUnit]:
if not self.config.tag_apply:
return
for apply in self.config.tag_apply:
yield from self._handle_assets(apply.assets, apply.tag_urn, "tag")

def _yield_term(self) -> Iterable[MetadataWorkUnit]:
if not self.config.term_apply:
return
for apply in self.config.term_apply:
yield from self._handle_assets(apply.assets, apply.term_urn, "term")

def _yield_owner(self) -> Iterable[MetadataWorkUnit]:
if not self.config.owner_apply:
return
for apply in self.config.owner_apply:
yield from self._handle_assets(apply.assets, apply.owner_urn, "owner")

def get_workunits_internal(
self,
) -> Iterable[MetadataWorkUnit]:
yield from self._yield_domain()
yield from self._yield_tag()
yield from self._yield_term()
yield from self._yield_owner()

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [partial(auto_workunit_reporter, self.get_report())]

def get_report(self) -> SourceReport:
return self.report

0 comments on commit cc0d43f

Please sign in to comment.