From 51b973f69a4e53c14ec995b7b63b4123200f3f35 Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Sat, 27 Jul 2024 02:01:49 -0700 Subject: [PATCH] feat: update top level import namespace --- src/cdf/__init__.py | 201 +++++++------------------------------------- src/cdf/cli.py | 3 + 2 files changed, 34 insertions(+), 170 deletions(-) diff --git a/src/cdf/__init__.py b/src/cdf/__init__.py index 002441d..4df446a 100644 --- a/src/cdf/__init__.py +++ b/src/cdf/__init__.py @@ -1,173 +1,34 @@ -import os -import pdb -import sys -import traceback -import typing as t -from pathlib import Path - -import dlt -from dlt.sources.helpers import requests -from sqlmesh.core.config import ConnectionConfig, GatewayConfig, parse_connection_config - -import cdf.legacy.constants as c -import cdf.legacy.context as context -import cdf.legacy.logger as logger -from cdf.legacy.project import Project, Workspace, load_project -from cdf.legacy.runtime import pipeline -from cdf.types import M, PathLike - - -@M.result -def find_nearest(path: t.Optional[PathLike] = None) -> Project: - """Find the nearest project. - - Recursively searches for a project file in the parent directories. - - Args: - path (PathLike, optional): The path to start searching from. Defaults to ".". - - Raises: - FileNotFoundError: If no project is found. - - Returns: - Project: The nearest project. - """ - if path is None: - path = os.getenv("CDF_ROOT", ".") - project = None - path = Path(path).resolve() - if path.is_file(): - path = path.parent - errors = [] - tree = [path, *list(path.parents)] - while tree: - node = tree.pop(0) - logger.debug(f"Searching for project in {node}") - p = load_project(node) - if p.is_err(): - errors.append(p.unwrap_err()) - else: - project = p.unwrap() - break - if project is None: - for error in errors: - logger.error(error) - raise FileNotFoundError("No project found.") from errors[0] - return project - - -def is_main(module_name: t.Optional[str] = None) -> bool: - """Check if the current module is being run as the main program in cdf context. - - Also injects a hook in debug mode to allow dropping into user code via pdb. - - Args: - module_name (str, optional): The name of the module to check. If None, the calling module is - checked. The most idiomatic usage is to pass `__name__` to check the current module. - - Returns: - bool: True if the current module is the main program in cdf context. - """ - proceed = False - frame = sys._getframe() - while frame.f_back: - program_name = frame.f_globals.get(c.CDF_MAIN) - caller_mod = module_name or frame.f_globals["__name__"] - if proceed := (module_name == "__main__" or caller_mod == program_name): - break - frame = frame.f_back - - if proceed and context.debug_mode.get(): - - def debug_hook(etype, value, tb) -> None: - traceback.print_exception(etype, value, tb) - pdb.post_mortem(tb) - - sys.excepthook = debug_hook - - return proceed - - -def get_active_project() -> Project: - """Get the active project. - - Raises: - ValueError: If no valid project is found in the context. - - Returns: - Project: The active project. - """ - return context.active_project.get() - - -def get_workspace(path: t.Optional[PathLike] = None) -> M.Result[Workspace, Exception]: - """Get a workspace from a path. - - Args: - path (PathLike, optional): The path to get the workspace from. Defaults to ".". - - Returns: - M.Result[Workspace, Exception]: The workspace or an error. - """ - return find_nearest(path).bind( - lambda p: ( - p.get_workspace_from_path(path) - if path - else p.get_workspace(os.environ["CDF_WORKSPACE"]) - ) - ) - - -with_config = dlt.sources.config.with_config - -inject_config = dlt.config.value -inject_secret = dlt.secrets.value - -session = requests.Client - -transform_gateway = GatewayConfig -"""Gateway configuration for transforms.""" - - -def transform_connection(type_: str, /, **kwargs) -> ConnectionConfig: - """Create a connection configuration for transforms.""" - return parse_connection_config({"type": type_, **kwargs}) - - -# TODO: lets add a param which allows us to pass a tuple of sources -# and maybe some lifecycle hooks -def run_script( - module_name: str, - /, - source: t.Union[t.Callable[..., dlt.sources.DltSource], dlt.sources.DltSource], - *, - run_options: t.Optional[t.Dict[str, t.Any]] = None, - **kwargs: t.Any, -) -> None: - """A shorthand syntax for a cdf script with a single source which should run as a pipeline. - - The first argument should almost always be `__name__`. This function conditionally executes - the source if the module is the main program in cdf context. This occurs either when invoked - through cdf pipeline command or when the script is run directly by python. - """ - if is_main(module_name): - if callable(source): - source = source(**kwargs) - run_options = run_options or {} - print(pipeline().run(source, **run_options)) - +import cdf.core.configuration as conf +from cdf.core.component import ( + DataPipeline, + DataPublisher, + Destination, + Operation, + Service, + Source, +) +from cdf.core.configuration import ( + ConfigResolver, + Request, + map_config_section, + map_config_values, +) +from cdf.core.injector import Dependency, DependencyRegistry +from cdf.core.workspace import Workspace __all__ = [ - "pipeline", - "is_main", - "load_project", - "find_nearest", - "get_active_project", - "get_workspace", - "logger", - "with_config", - "inject_config", - "inject_secret", - "requests", - "session", + "conf", + "DataPipeline", + "DataPublisher", + "Destination", + "Operation", + "Service", + "Source", + "ConfigResolver", + "Request", + "map_config_section", + "map_config_values", + "Workspace", + "Dependency", + "DependencyRegistry", ] diff --git a/src/cdf/cli.py b/src/cdf/cli.py index afdad94..26cf1ca 100644 --- a/src/cdf/cli.py +++ b/src/cdf/cli.py @@ -105,6 +105,9 @@ def main( context.debug_mode.set(True) logger.configure(log_level.upper() if log_level else "INFO") logger.apply_patches() + logger.warning( + "The CDF CLI command is DEPRECATED and will be removed in a future release. A local python file which imports cdf and exposes the Workspace.cli method is the way to interact with CDF" + ) ctx.obj = load_project(path).bind(lambda p: p.get_workspace(workspace))