Skip to content

Commit

Permalink
feat: script source runner and more robust is main func
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed Apr 27, 2024
1 parent 3c72a8a commit c38d4bd
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions src/cdf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,26 @@ def find_nearest(path: PathLike = ".") -> Project:
return project


def is_main(name: t.Optional[str] = None) -> bool:
def is_main(module_name: t.Optional[str] = None) -> bool:
"""Check if the current module is being run as the main program in cdf context.
Also injects a hook in debug mode to allow dropping into user code via pdb.
Args:
name (str, optional): The name of the module to check. If None, the calling module is
module_name (str, optional): The name of the module to check. If None, the calling module is
checked. The most idiomatic usage is to pass `__name__` to check the current module.
Returns:
bool: True if the current module is the main program in cdf context.
"""
frame = sys._getframe(1)

_main = frame.f_globals.get(c.CDF_MAIN)
_name = name or frame.f_globals["__name__"]
proceed = _name in ("__main__", _main)
proceed = False
frame = sys._getframe()
while frame.f_back:
program_name = frame.f_globals.get(c.CDF_MAIN)
caller_mod = module_name or frame.f_globals["__name__"]
if proceed := (module_name == "__main__" or caller_mod == program_name):
break
frame = frame.f_back

if proceed and context.debug_mode.get():

Expand Down Expand Up @@ -116,6 +119,29 @@ def transform_connection(type_: str, /, **kwargs) -> ConnectionConfig:
return parse_connection_config({"type": type_, **kwargs})


# TODO: lets add a param which allows us to pass a tuple of sources
# and maybe some lifecycle hooks
def run_script(
module_name: str,
/,
source: t.Union[t.Callable[..., dlt.sources.DltSource], dlt.sources.DltSource],
*,
run_options: t.Optional[t.Dict[str, t.Any]] = None,
**kwargs: t.Any,
) -> None:
"""A shorthand syntax for a cdf script with a single source which should run as a pipeline.
The first argument should almost always be `__name__`. This function conditionally executes
the source if the module is the main program in cdf context. This occurs either when invoked
through cdf <workspace> pipeline command or when the script is run directly by python.
"""
if is_main(module_name):
if callable(source):
source = source(**kwargs)
run_options = run_options or {}
print(pipeline().run(source, **run_options))


__all__ = [
"pipeline",
"is_main",
Expand Down

0 comments on commit c38d4bd

Please sign in to comment.