From c38d4bd815a6b6363b4f8ce04cbc22ce56b3a6dd Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Fri, 26 Apr 2024 20:34:18 -0700 Subject: [PATCH] feat: script source runner and more robust is main func --- src/cdf/__init__.py | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/src/cdf/__init__.py b/src/cdf/__init__.py index f032e12..f687bed 100644 --- a/src/cdf/__init__.py +++ b/src/cdf/__init__.py @@ -42,23 +42,26 @@ def find_nearest(path: PathLike = ".") -> Project: return project -def is_main(name: t.Optional[str] = None) -> bool: +def is_main(module_name: t.Optional[str] = None) -> bool: """Check if the current module is being run as the main program in cdf context. Also injects a hook in debug mode to allow dropping into user code via pdb. Args: - name (str, optional): The name of the module to check. If None, the calling module is + module_name (str, optional): The name of the module to check. If None, the calling module is checked. The most idiomatic usage is to pass `__name__` to check the current module. Returns: bool: True if the current module is the main program in cdf context. """ - frame = sys._getframe(1) - - _main = frame.f_globals.get(c.CDF_MAIN) - _name = name or frame.f_globals["__name__"] - proceed = _name in ("__main__", _main) + proceed = False + frame = sys._getframe() + while frame.f_back: + program_name = frame.f_globals.get(c.CDF_MAIN) + caller_mod = module_name or frame.f_globals["__name__"] + if proceed := (module_name == "__main__" or caller_mod == program_name): + break + frame = frame.f_back if proceed and context.debug_mode.get(): @@ -116,6 +119,29 @@ def transform_connection(type_: str, /, **kwargs) -> ConnectionConfig: return parse_connection_config({"type": type_, **kwargs}) +# TODO: lets add a param which allows us to pass a tuple of sources +# and maybe some lifecycle hooks +def run_script( + module_name: str, + /, + source: t.Union[t.Callable[..., dlt.sources.DltSource], dlt.sources.DltSource], + *, + run_options: t.Optional[t.Dict[str, t.Any]] = None, + **kwargs: t.Any, +) -> None: + """A shorthand syntax for a cdf script with a single source which should run as a pipeline. + + The first argument should almost always be `__name__`. This function conditionally executes + the source if the module is the main program in cdf context. This occurs either when invoked + through cdf pipeline command or when the script is run directly by python. + """ + if is_main(module_name): + if callable(source): + source = source(**kwargs) + run_options = run_options or {} + print(pipeline().run(source, **run_options)) + + __all__ = [ "pipeline", "is_main",