Skip to content

Commit

Permalink
feat: update top level import namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed Jul 27, 2024
1 parent e0c4d43 commit 51b973f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 170 deletions.
201 changes: 31 additions & 170 deletions src/cdf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,173 +1,34 @@
import os
import pdb
import sys
import traceback
import typing as t
from pathlib import Path

import dlt
from dlt.sources.helpers import requests
from sqlmesh.core.config import ConnectionConfig, GatewayConfig, parse_connection_config

import cdf.legacy.constants as c
import cdf.legacy.context as context
import cdf.legacy.logger as logger
from cdf.legacy.project import Project, Workspace, load_project
from cdf.legacy.runtime import pipeline
from cdf.types import M, PathLike


@M.result
def find_nearest(path: t.Optional[PathLike] = None) -> Project:
"""Find the nearest project.
Recursively searches for a project file in the parent directories.
Args:
path (PathLike, optional): The path to start searching from. Defaults to ".".
Raises:
FileNotFoundError: If no project is found.
Returns:
Project: The nearest project.
"""
if path is None:
path = os.getenv("CDF_ROOT", ".")
project = None
path = Path(path).resolve()
if path.is_file():
path = path.parent
errors = []
tree = [path, *list(path.parents)]
while tree:
node = tree.pop(0)
logger.debug(f"Searching for project in {node}")
p = load_project(node)
if p.is_err():
errors.append(p.unwrap_err())
else:
project = p.unwrap()
break
if project is None:
for error in errors:
logger.error(error)
raise FileNotFoundError("No project found.") from errors[0]
return project


def is_main(module_name: t.Optional[str] = None) -> bool:
"""Check if the current module is being run as the main program in cdf context.
Also injects a hook in debug mode to allow dropping into user code via pdb.
Args:
module_name (str, optional): The name of the module to check. If None, the calling module is
checked. The most idiomatic usage is to pass `__name__` to check the current module.
Returns:
bool: True if the current module is the main program in cdf context.
"""
proceed = False
frame = sys._getframe()
while frame.f_back:
program_name = frame.f_globals.get(c.CDF_MAIN)
caller_mod = module_name or frame.f_globals["__name__"]
if proceed := (module_name == "__main__" or caller_mod == program_name):
break
frame = frame.f_back

if proceed and context.debug_mode.get():

def debug_hook(etype, value, tb) -> None:
traceback.print_exception(etype, value, tb)
pdb.post_mortem(tb)

sys.excepthook = debug_hook

return proceed


def get_active_project() -> Project:
"""Get the active project.
Raises:
ValueError: If no valid project is found in the context.
Returns:
Project: The active project.
"""
return context.active_project.get()


def get_workspace(path: t.Optional[PathLike] = None) -> M.Result[Workspace, Exception]:
"""Get a workspace from a path.
Args:
path (PathLike, optional): The path to get the workspace from. Defaults to ".".
Returns:
M.Result[Workspace, Exception]: The workspace or an error.
"""
return find_nearest(path).bind(
lambda p: (
p.get_workspace_from_path(path)
if path
else p.get_workspace(os.environ["CDF_WORKSPACE"])
)
)


with_config = dlt.sources.config.with_config

inject_config = dlt.config.value
inject_secret = dlt.secrets.value

session = requests.Client

transform_gateway = GatewayConfig
"""Gateway configuration for transforms."""


def transform_connection(type_: str, /, **kwargs) -> ConnectionConfig:
"""Create a connection configuration for transforms."""
return parse_connection_config({"type": type_, **kwargs})


# TODO: lets add a param which allows us to pass a tuple of sources
# and maybe some lifecycle hooks
def run_script(
module_name: str,
/,
source: t.Union[t.Callable[..., dlt.sources.DltSource], dlt.sources.DltSource],
*,
run_options: t.Optional[t.Dict[str, t.Any]] = None,
**kwargs: t.Any,
) -> None:
"""A shorthand syntax for a cdf script with a single source which should run as a pipeline.
The first argument should almost always be `__name__`. This function conditionally executes
the source if the module is the main program in cdf context. This occurs either when invoked
through cdf <workspace> pipeline command or when the script is run directly by python.
"""
if is_main(module_name):
if callable(source):
source = source(**kwargs)
run_options = run_options or {}
print(pipeline().run(source, **run_options))

import cdf.core.configuration as conf
from cdf.core.component import (
DataPipeline,
DataPublisher,
Destination,
Operation,
Service,
Source,
)
from cdf.core.configuration import (
ConfigResolver,
Request,
map_config_section,
map_config_values,
)
from cdf.core.injector import Dependency, DependencyRegistry
from cdf.core.workspace import Workspace

__all__ = [
"pipeline",
"is_main",
"load_project",
"find_nearest",
"get_active_project",
"get_workspace",
"logger",
"with_config",
"inject_config",
"inject_secret",
"requests",
"session",
"conf",
"DataPipeline",
"DataPublisher",
"Destination",
"Operation",
"Service",
"Source",
"ConfigResolver",
"Request",
"map_config_section",
"map_config_values",
"Workspace",
"Dependency",
"DependencyRegistry",
]
3 changes: 3 additions & 0 deletions src/cdf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ def main(
context.debug_mode.set(True)
logger.configure(log_level.upper() if log_level else "INFO")
logger.apply_patches()
logger.warning(
"The CDF CLI command is DEPRECATED and will be removed in a future release. A local python file which imports cdf and exposes the Workspace.cli method is the way to interact with CDF"
)
ctx.obj = load_project(path).bind(lambda p: p.get_workspace(workspace))


Expand Down

0 comments on commit 51b973f

Please sign in to comment.