Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add datahub apply source #12482

Merged
merged 3 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@
"looker = datahub.ingestion.source.looker.looker_source:LookerDashboardSource",
"lookml = datahub.ingestion.source.looker.lookml_source:LookMLSource",
"datahub-gc = datahub.ingestion.source.gc.datahub_gc:DataHubGcSource",
"datahub-apply = datahub.ingestion.source.apply.datahub_apply:DataHubApplySource",
"datahub-lineage-file = datahub.ingestion.source.metadata.lineage:LineageFileSource",
"datahub-business-glossary = datahub.ingestion.source.metadata.business_glossary:BusinessGlossaryFileSource",
"mlflow = datahub.ingestion.source.mlflow:MLflowSource",
Expand Down
65 changes: 1 addition & 64 deletions metadata-ingestion/src/datahub/cli/container_cli.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,8 @@
import logging
from typing import Any, List

import click
import progressbar

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.schema_classes import (
DomainsClass,
GlossaryTermAssociationClass,
OwnerClass,
OwnershipTypeClass,
TagAssociationClass,
)
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.ingestion.source.apply.datahub_apply import apply_association_to_container

logger = logging.getLogger(__name__)

Expand All @@ -24,58 +13,6 @@ def container() -> None:
pass


def apply_association_to_container(
    container_urn: str,
    association_urn: str,
    association_type: str,
) -> None:
    """
    Attach a tag, term, domain, or owner to every dataset found under a container.

    All patches are built first and then emitted one by one through the
    default graph client, with a progress bar.

    Args:
        container_urn: The URN of the container
        association_urn: The URN of the tag, term, or user to apply
        association_type: One of 'tag', 'term', 'domain' or 'owner'
    """
    graph = get_default_graph()
    logger.info(f"Using {graph}")

    # Materialize the full URN list before building anything.
    dataset_urns: List[str] = list(
        graph.get_urns_by_filter(
            container=container_urn, batch_size=1000, entity_types=["dataset"]
        )
    )

    pending: List[Any] = []
    for dataset_urn in dataset_urns:
        patch_builder = DatasetPatchBuilder(dataset_urn)
        if association_type == "tag":
            pending += patch_builder.add_tag(
                TagAssociationClass(association_urn)
            ).build()
            continue
        if association_type == "term":
            pending += patch_builder.add_term(
                GlossaryTermAssociationClass(association_urn)
            ).build()
            continue
        if association_type == "owner":
            technical_owner = OwnerClass(
                owner=association_urn,
                type=OwnershipTypeClass.TECHNICAL_OWNER,
            )
            pending += patch_builder.add_owner(technical_owner).build()
            continue
        if association_type == "domain":
            # Domains go out as a full aspect proposal, not via the patch builder.
            pending.append(
                MetadataChangeProposalWrapper(
                    entityUrn=dataset_urn,
                    aspect=DomainsClass(domains=[association_urn]),
                )
            )
        # Any other association_type contributes nothing for this URN.

    for proposal in progressbar.progressbar(pending, redirect_stdout=True):
        graph.emit(proposal)


@container.command()
@click.option("--container-urn", required=True, type=str)
@click.option("--tag-urn", required=True, type=str)
Expand Down
Empty file.
223 changes: 223 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/apply/datahub_apply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import logging
from functools import partial
from typing import Any, Iterable, List, Optional, Union

import progressbar
from pydantic import Field

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.metadata.schema_classes import (
DomainsClass,
GlossaryTermAssociationClass,
MetadataChangeProposalClass,
OwnerClass,
OwnershipTypeClass,
TagAssociationClass,
)
from datahub.specific.dataset import DatasetPatchBuilder

logger = logging.getLogger(__name__)


def apply_association_to_container(
    container_urn: str,
    association_urn: str,
    association_type: str,
    emit: bool = True,
    graph: Optional[DataHubGraph] = None,
) -> Optional[List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]]:
    """
    Common function to add either tags, terms, domains, or owners to a container
    and to the datasets and containers found under it (for now).

    The container URN itself is always processed first; the URNs returned by
    ``graph.get_urns_by_filter(container=container_urn, ...)`` follow.

    Args:
        container_urn: The URN of the container
        association_urn: The URN of the tag, term, domain, or user to apply
        association_type: One of 'tag', 'term', 'domain' or 'owner'
        emit: When True, emit every proposal through ``graph`` (with a progress
            bar) and return None. When False, emit nothing and return the
            proposals to the caller.
        graph: Graph client to use. Defaults to ``get_default_graph()``.

    Returns:
        None when ``emit`` is True, otherwise the list of generated proposals.

    Raises:
        ValueError: If ``association_type`` is not one of the supported values.
    """
    # Fail fast: an unknown type would otherwise query the graph and then
    # silently produce no proposals at all.
    supported_types = ("tag", "term", "owner", "domain")
    if association_type not in supported_types:
        raise ValueError(
            f"Unsupported association_type {association_type!r}; "
            f"expected one of {supported_types}"
        )

    urns: List[str] = [container_urn]
    if graph is None:
        graph = get_default_graph()
    logger.info(f"Using {graph}")
    urns.extend(
        graph.get_urns_by_filter(
            container=container_urn,
            batch_size=1000,
            entity_types=["dataset", "container"],
        )
    )

    all_patches: List[Any] = []
    for urn in urns:
        if association_type == "domain":
            # Domains are written as a full DomainsClass aspect, not a patch.
            # NOTE(review): this presumably replaces any domains already set
            # on the entity rather than adding to them - confirm.
            all_patches.append(
                MetadataChangeProposalWrapper(
                    entityUrn=urn,
                    aspect=DomainsClass(domains=[association_urn]),
                )
            )
            continue

        # NOTE(review): DatasetPatchBuilder is used for container URNs as
        # well as dataset URNs here - confirm the patches apply to both.
        builder = DatasetPatchBuilder(urn)
        if association_type == "tag":
            patches = builder.add_tag(TagAssociationClass(association_urn)).build()
        elif association_type == "term":
            patches = builder.add_term(
                GlossaryTermAssociationClass(association_urn)
            ).build()
        else:  # "owner" - the only remaining validated type
            patches = builder.add_owner(
                OwnerClass(
                    owner=association_urn,
                    type=OwnershipTypeClass.TECHNICAL_OWNER,
                )
            ).build()
        all_patches.extend(patches)

    if not emit:
        return all_patches

    for mcp in progressbar.progressbar(all_patches, redirect_stdout=True):
        graph.emit(mcp)
    return None

Check warning on line 92 in metadata-ingestion/src/datahub/ingestion/source/apply/datahub_apply.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/apply/datahub_apply.py#L92

Added line #L92 was not covered by tests


class DomainApplyConfig(ConfigModel):
    """One domain together with the assets it should be applied to."""

    assets: List[str] = Field(
        default_factory=list,
        description="List of assets to apply domain hierarchically. Currently only containers and datasets are supported",
    )
    # Defaults to an empty string, so the field is optional in the recipe.
    domain_urn: str = Field(
        default="",
        description="URN of the domain to apply to the assets",
    )


class TagApplyConfig(ConfigModel):
    """One tag together with the assets it should be applied to."""

    assets: List[str] = Field(
        default_factory=list,
        description="List of assets to apply tag hierarchically. Currently only containers and datasets are supported",
    )
    # Defaults to an empty string, so the field is optional in the recipe.
    tag_urn: str = Field(
        default="",
        description="URN of the tag to apply to the assets",
    )


class TermApplyConfig(ConfigModel):
    """One glossary term together with the assets it should be applied to."""

    assets: List[str] = Field(
        default_factory=list,
        description="List of assets to apply term hierarchically. Currently only containers and datasets are supported",
    )
    # Defaults to an empty string, so the field is optional in the recipe.
    term_urn: str = Field(
        default="",
        description="URN of the glossary term to apply to the assets",
    )


class OwnerApplyConfig(ConfigModel):
    """One owner together with the assets it should be applied to."""

    assets: List[str] = Field(
        default_factory=list,
        description="List of assets to apply owner hierarchically. Currently only containers and datasets are supported",
    )
    # Defaults to an empty string, so the field is optional in the recipe.
    owner_urn: str = Field(
        default="",
        description="URN of the owner to apply to the assets",
    )


class DataHubApplyConfig(ConfigModel):
    # Top-level recipe config. Every section is optional and defaults to None.

    # Domain assignments.
    domain_apply: Optional[List[DomainApplyConfig]] = Field(
        None, description="List to apply domains to assets"
    )
    # Tag assignments.
    tag_apply: Optional[List[TagApplyConfig]] = Field(
        None, description="List to apply tags to assets"
    )
    # Glossary term assignments.
    term_apply: Optional[List[TermApplyConfig]] = Field(
        None, description="List to apply terms to assets"
    )
    # Owner assignments.
    owner_apply: Optional[List[OwnerApplyConfig]] = Field(
        None, description="List to apply owners to assets"
    )


@platform_name("DataHubApply")
@config_class(DataHubApplyConfig)
@support_status(SupportStatus.TESTING)
class DataHubApplySource(Source):
    """
    This source is a helper over CLI
    so people can use the helper to apply various metadata changes to DataHub
    via Managed Ingestion
    """

    def __init__(self, ctx: PipelineContext, config: DataHubApplyConfig):
        """Store the context and config and obtain the graph client from ``ctx``."""
        self.ctx = ctx
        self.config = config
        self.report = SourceReport()
        self.graph = ctx.require_graph()

    def _yield_workunits(
        self,
        proposals: List[
            Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
        ],
    ) -> Iterable[MetadataWorkUnit]:
        """Wrap each proposal in a ``MetadataWorkUnit``, preserving order."""
        for proposal in proposals:
            if isinstance(proposal, MetadataChangeProposalWrapper):
                yield proposal.as_workunit()
            else:
                # Raw proposals get a generated workunit id.
                yield MetadataWorkUnit(
                    id=MetadataWorkUnit.generate_workunit_id(proposal),
                    mcp_raw=proposal,
                )

    def _handle_assets(
        self, assets: List[str], apply_urn: str, apply_type: str
    ) -> Iterable[MetadataWorkUnit]:
        """
        Yield workunits that apply ``apply_urn`` to every asset in ``assets``.

        Args:
            assets: URNs of the assets to apply the association to.
            apply_urn: URN of the tag, term, domain, or owner to apply.
            apply_type: One of 'tag', 'term', 'domain' or 'owner'.
        """
        # The apply configs default their URN to "", so a recipe entry that
        # omits it would otherwise build associations to an empty URN.
        if not apply_urn:
            logger.warning(
                "Skipping %s apply for %d asset(s): no %s URN configured",
                apply_type,
                len(assets),
                apply_type,
            )
            return

        for asset in assets:
            change_proposals = apply_association_to_container(
                asset, apply_urn, apply_type, emit=False, graph=self.graph
            )
            # Invariant: this call passes emit=False, expecting the proposals back.
            assert change_proposals is not None
            yield from self._yield_workunits(change_proposals)

    def _yield_domain(self) -> Iterable[MetadataWorkUnit]:
        """Yield workunits for every configured domain assignment."""
        for apply in self.config.domain_apply or []:
            yield from self._handle_assets(apply.assets, apply.domain_urn, "domain")

    def _yield_tag(self) -> Iterable[MetadataWorkUnit]:
        """Yield workunits for every configured tag assignment."""
        for apply in self.config.tag_apply or []:
            yield from self._handle_assets(apply.assets, apply.tag_urn, "tag")

    def _yield_term(self) -> Iterable[MetadataWorkUnit]:
        """Yield workunits for every configured term assignment."""
        for apply in self.config.term_apply or []:
            yield from self._handle_assets(apply.assets, apply.term_urn, "term")

    def _yield_owner(self) -> Iterable[MetadataWorkUnit]:
        """Yield workunits for every configured owner assignment."""
        for apply in self.config.owner_apply or []:
            yield from self._handle_assets(apply.assets, apply.owner_urn, "owner")

    def get_workunits_internal(
        self,
    ) -> Iterable[MetadataWorkUnit]:
        """Yield all workunits in a fixed order: domains, tags, terms, owners."""
        yield from self._yield_domain()
        yield from self._yield_tag()
        yield from self._yield_term()
        yield from self._yield_owner()

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        """Return only the workunit reporter, bound to this source's report."""
        return [partial(auto_workunit_reporter, self.get_report())]

    def get_report(self) -> SourceReport:
        """Return the report created in ``__init__``."""
        return self.report

Check warning on line 223 in metadata-ingestion/src/datahub/ingestion/source/apply/datahub_apply.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/apply/datahub_apply.py#L223

Added line #L223 was not covered by tests
Loading