diff --git a/src/cdf/core/component/__init__.py b/src/cdf/core/component/__init__.py index 9c9140c..308b450 100644 --- a/src/cdf/core/component/__init__.py +++ b/src/cdf/core/component/__init__.py @@ -1,10 +1,10 @@ import typing as t from .base import Component, Entrypoint, ServiceLevelAgreement -from .operation import Operation -from .pipeline import DataPipeline -from .publisher import DataPublisher -from .service import Service +from .operation import Operation, OperationProto +from .pipeline import DataPipeline, DataPipelineProto +from .publisher import DataPublisher, DataPublisherProto +from .service import Service, ServiceProto __all__ = [ "DataPipeline", @@ -20,10 +20,26 @@ "ServiceLevelAgreement", ] -ServiceDef = t.Union[Service, t.Dict[str, t.Any]] -DataPipelineDef = t.Union[DataPipeline, t.Dict[str, t.Any]] -DataPublisherDef = t.Union[DataPublisher, t.Dict[str, t.Any]] -OperationDef = t.Union[Operation, t.Dict[str, t.Any]] +ServiceDef = t.Union[ + Service, + t.Callable[..., ServiceProto], + t.Dict[str, t.Any], +] +DataPipelineDef = t.Union[ + DataPipeline, + t.Callable[..., DataPipelineProto], + t.Dict[str, t.Any], +] +DataPublisherDef = t.Union[ + DataPublisher, + t.Callable[..., DataPublisherProto], + t.Dict[str, t.Any], +] +OperationDef = t.Union[ + Operation, + t.Callable[..., OperationProto], + t.Dict[str, t.Any], +] TComponent = t.TypeVar("TComponent", bound=t.Union[Component, Entrypoint]) TComponentDef = t.TypeVar( diff --git a/src/cdf/core/component/operation.py b/src/cdf/core/component/operation.py index d5324f6..2feb527 100644 --- a/src/cdf/core/component/operation.py +++ b/src/cdf/core/component/operation.py @@ -1,5 +1,7 @@ from .base import Entrypoint +OperationProto = int -class Operation(Entrypoint[int], frozen=True): + +class Operation(Entrypoint[OperationProto], frozen=True): """A generic callable that returns an exit code.""" diff --git a/src/cdf/core/component/pipeline.py b/src/cdf/core/component/pipeline.py index c9ac985..d90f411 100644 --- a/src/cdf/core/component/pipeline.py +++ b/src/cdf/core/component/pipeline.py @@ -1,9 +1,7 @@ import inspect import typing as t -import pydantic - -from .base import Entrypoint, _get_bind_func, _unwrap_entrypoint +from .base import Entrypoint if t.TYPE_CHECKING: from dlt.common.destination import Destination as DltDestination @@ -11,17 +9,18 @@ from dlt.pipeline.pipeline import Pipeline as DltPipeline +DataPipelineProto = t.Tuple[ + "DltPipeline", + t.Union[ + t.Callable[..., "LoadInfo"], + t.Callable[..., t.Iterator["LoadInfo"]], + ], # run + t.List[t.Callable[..., None]], # tests +] + + class DataPipeline( - Entrypoint[ - t.Tuple[ - "DltPipeline", - t.Union[ - t.Callable[..., "LoadInfo"], - t.Callable[..., t.Iterator["LoadInfo"]], - ], - t.List[t.Callable[..., None]], - ] - ], + Entrypoint[DataPipelineProto], frozen=True, ): """A data pipeline which loads data from a source to a destination.""" diff --git a/src/cdf/core/component/publisher.py b/src/cdf/core/component/publisher.py index e810ded..6a8af17 100644 --- a/src/cdf/core/component/publisher.py +++ b/src/cdf/core/component/publisher.py @@ -2,21 +2,16 @@ from .base import Entrypoint - -def _ping() -> bool: - """A default preflight check which always returns True.""" - return bool("pong") +DataPublisherProto = t.Tuple[ + t.Callable[..., None], # run + t.Callable[..., bool], # preflight + t.Optional[t.Callable[..., None]], # success hook + t.Optional[t.Callable[..., None]], # failure hook +] class DataPublisher( - Entrypoint[ - t.Tuple[ - t.Callable[..., None], # run - t.Callable[..., bool], # preflight - t.Optional[t.Callable[..., None]], # success hook - t.Optional[t.Callable[..., None]], # failure hook - ] - ], + Entrypoint[DataPublisherProto], frozen=True, ): """A data publisher which pushes data to an operational system.""" diff --git a/src/cdf/core/component/service.py b/src/cdf/core/component/service.py index 5f8449d..e33331b 100644 --- a/src/cdf/core/component/service.py +++ b/src/cdf/core/component/service.py @@ -2,6 +2,8 @@ from .base import Component +ServiceProto = t.Any -class Service(Component[t.Any], frozen=True): + +class Service(Component[ServiceProto], frozen=True): """A service that the workspace provides. IE an API, database, requests client, etc.""" diff --git a/src/cdf/core/workspace.py b/src/cdf/core/workspace.py index a8c4851..60425ea 100644 --- a/src/cdf/core/workspace.py +++ b/src/cdf/core/workspace.py @@ -448,7 +448,8 @@ def run(): name="exchangerate_pipeline", owner="Alex", description="A test pipeline", - ) + ), + test_pipeline, # we can use the proto directly with assumptions ], )