From c8d6d96d0bfdd9d7d1c7534d591336286b09204b Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 21 Jan 2025 14:19:16 -0800 Subject: [PATCH 1/4] improve source/sink api docs --- .../src/datahub/ingestion/api/decorators.py | 3 +++ metadata-ingestion/src/datahub/ingestion/api/sink.py | 12 ++++++++++++ .../src/datahub/ingestion/api/source.py | 9 ++------- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/decorators.py b/metadata-ingestion/src/datahub/ingestion/api/decorators.py index d32c0b85ceef4c..6aa534d5920eba 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/decorators.py +++ b/metadata-ingestion/src/datahub/ingestion/api/decorators.py @@ -1,3 +1,4 @@ +import abc from dataclasses import dataclass from enum import Enum, auto from typing import Callable, Dict, Optional, Type @@ -25,6 +26,8 @@ def wrapper(cls: Type) -> Type: # add the create method only if it has not been overridden from the base Source.create method cls.create = classmethod(default_create) + abc.update_abstractmethods(cls) + return cls return wrapper diff --git a/metadata-ingestion/src/datahub/ingestion/api/sink.py b/metadata-ingestion/src/datahub/ingestion/api/sink.py index 655e6bb22fa8d1..44148c751df828 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/sink.py +++ b/metadata-ingestion/src/datahub/ingestion/api/sink.py @@ -110,6 +110,10 @@ def __init__(self, ctx: PipelineContext, config: SinkConfig): self.__post_init__() def __post_init__(self) -> None: + """Hook called after the sink's main initialization is complete. + + Sink subclasses can override this method to customize initialization. + """ pass @classmethod @@ -117,9 +121,17 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Self": return cls(ctx, cls.get_config_class().parse_obj(config_dict)) def handle_work_unit_start(self, workunit: WorkUnit) -> None: + """Called at the start of each new workunit. + + This method is deprecated and will be removed in a future release. + """ pass def handle_work_unit_end(self, workunit: WorkUnit) -> None: + """Called at the end of each workunit. + + This method is deprecated and will be removed in a future release. + """ pass @abstractmethod diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index b04ffdb3258934..e8f1a2fc3e4ff4 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -400,13 +400,8 @@ class Source(Closeable, metaclass=ABCMeta): ctx: PipelineContext @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Self: - # Technically, this method should be abstract. However, the @config_class - # decorator automatically generates a create method at runtime if one is - # not defined. Python still treats the class as abstract because it thinks - # the create method is missing. To avoid the class becoming abstract, we - # can't make this method abstract. - raise NotImplementedError('sources must implement "create"') + @abstractmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> Self: ... def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: """A list of functions that transforms the workunits produced by this source. From 2131890116b16e6da280494bb2f20ceebd9c4433 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 21 Jan 2025 14:24:30 -0800 Subject: [PATCH 2/4] various other improvements --- metadata-ingestion/src/datahub/emitter/enum_helpers.py | 6 ++++-- metadata-ingestion/src/datahub/emitter/mce_builder.py | 4 ++++ .../src/datahub/utilities/urns/_urn_base.py | 8 ++++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/enum_helpers.py b/metadata-ingestion/src/datahub/emitter/enum_helpers.py index 89949ab3717ff3..89916ffd899c3f 100644 --- a/metadata-ingestion/src/datahub/emitter/enum_helpers.py +++ b/metadata-ingestion/src/datahub/emitter/enum_helpers.py @@ -1,11 +1,13 @@ from typing import List, Type +from typing_extensions import LiteralString -def get_enum_options(_class: Type[object]) -> List[str]: + +def get_enum_options(class_: Type[object]) -> List[LiteralString]: """Get the valid values for an enum in the datahub.metadata.schema_classes module.""" return [ value - for name, value in vars(_class).items() + for name, value in vars(class_).items() if not callable(value) and not name.startswith("_") ] diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index f5da90a86c9ef6..9fa060266a7ab8 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -440,6 +440,10 @@ def can_add_aspect_to_snapshot( def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool: + # TODO: This is specific to snapshot types. We have a more general method + # in `entity_supports_aspect`, which should be used instead. This method + # should be deprecated, and all usages should be replaced. + SnapshotType = type(mce.proposedSnapshot) return can_add_aspect_to_snapshot(SnapshotType, AspectType) diff --git a/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py b/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py index e8e22cd85ac9ff..55144ebf61b197 100644 --- a/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py +++ b/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py @@ -6,6 +6,7 @@ from deprecated import deprecated from typing_extensions import Self +from datahub._codegen.aspect import _Aspect from datahub.utilities.urns.error import InvalidUrnError URN_TYPES: Dict[str, Type["_SpecificUrn"]] = {} @@ -270,7 +271,7 @@ def make_form_urn(form: str) -> str: class _SpecificUrn(Urn): - ENTITY_TYPE: str = "" + ENTITY_TYPE: ClassVar[str] = "" def __init_subclass__(cls) -> None: # Validate the subclass. @@ -286,7 +287,10 @@ def __init_subclass__(cls) -> None: return super().__init_subclass__() @classmethod - def underlying_key_aspect_type(cls) -> Type: + def underlying_key_aspect_type(cls) -> Type[_Aspect]: + raise NotImplementedError() + + def to_key_aspect(self) -> _Aspect: raise NotImplementedError() @classmethod From da27014755dd1a48c62fee90eaed4790a09ff9ad Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 22 Jan 2025 15:03:38 -0800 Subject: [PATCH 3/4] remove abc.update_abstractmethods --- .../src/datahub/ingestion/api/decorators.py | 3 --- .../src/datahub/ingestion/api/source.py | 12 ++++++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/decorators.py b/metadata-ingestion/src/datahub/ingestion/api/decorators.py index 6aa534d5920eba..d32c0b85ceef4c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/decorators.py +++ b/metadata-ingestion/src/datahub/ingestion/api/decorators.py @@ -1,4 +1,3 @@ -import abc from dataclasses import dataclass from enum import Enum, auto from typing import Callable, Dict, Optional, Type @@ -26,8 +25,6 @@ def wrapper(cls: Type) -> Type: # add the create method only if it has not been overridden from the base Source.create method cls.create = classmethod(default_create) - abc.update_abstractmethods(cls) - return cls return wrapper diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index e8f1a2fc3e4ff4..e401d6e976b6d0 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -400,8 +400,16 @@ class Source(Closeable, metaclass=ABCMeta): ctx: PipelineContext @classmethod - @abstractmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Self: ... + def create(cls, config_dict: dict, ctx: PipelineContext) -> Self: + # Technically, this method should be abstract. However, the @config_class + # decorator automatically generates a create method at runtime if one is + # not defined. Python still treats the class as abstract because it thinks + # the create method is missing. + # + # Once we're on Python 3.10, we can use the abc.update_abstractmethods(cls) + # method in the config_class decorator. That would allow us to make this + # method abstract. + raise NotImplementedError('sources must implement "create"') def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: """A list of functions that transforms the workunits produced by this source. From f0cbecf6b8b6fe02a62f864e3f90fe1728e08991 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 22 Jan 2025 15:08:15 -0800 Subject: [PATCH 4/4] add comment --- metadata-ingestion/src/datahub/ingestion/api/decorators.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/api/decorators.py b/metadata-ingestion/src/datahub/ingestion/api/decorators.py index d32c0b85ceef4c..b521dc5e9e7f5d 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/decorators.py +++ b/metadata-ingestion/src/datahub/ingestion/api/decorators.py @@ -25,6 +25,8 @@ def wrapper(cls: Type) -> Type: # add the create method only if it has not been overridden from the base Source.create method cls.create = classmethod(default_create) + # TODO: Once we're on Python 3.10, we should call abc.update_abstractmethods here. + return cls return wrapper