From 5b070757a73fc870c077da821d532a64bd88317b Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 9 Oct 2024 14:11:19 +0200 Subject: [PATCH 1/5] make cli commands pluggable --- dlt/cli/_dlt.py | 530 +----------------- dlt/cli/command_wrappers.py | 187 ++++++ dlt/cli/plugins.py | 415 ++++++++++++++ dlt/common/configuration/plugins.py | 12 +- .../specs/pluggable_cli_command.py | 18 + tests/cli/common/test_telemetry_command.py | 15 +- 6 files changed, 661 insertions(+), 516 deletions(-) create mode 100644 dlt/cli/command_wrappers.py create mode 100644 dlt/cli/plugins.py create mode 100644 dlt/common/configuration/specs/pluggable_cli_command.py diff --git a/dlt/cli/_dlt.py b/dlt/cli/_dlt.py index 0a4a86b9de..e2a1b02c48 100644 --- a/dlt/cli/_dlt.py +++ b/dlt/cli/_dlt.py @@ -1,199 +1,19 @@ -from typing import Any, Sequence, Optional -import yaml -import os +from typing import Any, Sequence, Type, cast, List, Dict import argparse -import click from dlt.version import __version__ -from dlt.common.json import json -from dlt.common.schema import Schema -from dlt.common.typing import DictStrAny from dlt.common.runners import Venv +from dlt.common.configuration.specs.pluggable_cli_command import SupportsCliCommand import dlt.cli.echo as fmt -from dlt.cli import utils -from dlt.pipeline.exceptions import CannotRestorePipelineException -from dlt.cli.init_command import ( - init_command, - list_sources_command, - DLT_INIT_DOCS_URL, - DEFAULT_VERIFIED_SOURCES_REPO, +from dlt.cli.command_wrappers import ( + deploy_command_wrapper, + telemetry_change_status_command_wrapper, ) -from dlt.cli.pipeline_command import pipeline_command, DLT_PIPELINE_COMMAND_DOCS_URL -from dlt.cli.telemetry_command import ( - DLT_TELEMETRY_DOCS_URL, - change_telemetry_status_command, - telemetry_status_command, -) - -try: - from dlt.cli import deploy_command - from dlt.cli.deploy_command import ( - PipelineWasNotRun, - DLT_DEPLOY_DOCS_URL, - DeploymentMethods, - COMMAND_DEPLOY_REPO_LOCATION, - SecretFormats, - 
) -except ModuleNotFoundError: - pass DEBUG_FLAG = False - - -def on_exception(ex: Exception, info: str) -> None: - click.secho(str(ex), err=True, fg="red") - fmt.note("Please refer to %s for further assistance" % fmt.bold(info)) - if DEBUG_FLAG: - raise ex - - -@utils.track_command("init", False, "source_name", "destination_type") -def init_command_wrapper( - source_name: str, - destination_type: str, - repo_location: str, - branch: str, - omit_core_sources: bool = False, -) -> int: - try: - init_command( - source_name, - destination_type, - repo_location, - branch, - omit_core_sources, - ) - except Exception as ex: - on_exception(ex, DLT_INIT_DOCS_URL) - return -1 - return 0 - - -@utils.track_command("list_sources", False) -def list_sources_command_wrapper(repo_location: str, branch: str) -> int: - try: - list_sources_command(repo_location, branch) - except Exception as ex: - on_exception(ex, DLT_INIT_DOCS_URL) - return -1 - return 0 - - -@utils.track_command("deploy", False, "deployment_method") -def deploy_command_wrapper( - pipeline_script_path: str, - deployment_method: str, - repo_location: str, - branch: Optional[str] = None, - **kwargs: Any, -) -> int: - try: - utils.ensure_git_command("deploy") - except Exception as ex: - click.secho(str(ex), err=True, fg="red") - return -1 - - from git import InvalidGitRepositoryError, NoSuchPathError - - try: - deploy_command.deploy_command( - pipeline_script_path=pipeline_script_path, - deployment_method=deployment_method, - repo_location=repo_location, - branch=branch, - **kwargs, - ) - except (CannotRestorePipelineException, PipelineWasNotRun) as ex: - fmt.note( - "You must run the pipeline locally successfully at least once in order to deploy it." - ) - on_exception(ex, DLT_DEPLOY_DOCS_URL) - return -2 - except InvalidGitRepositoryError: - click.secho( - "No git repository found for pipeline script %s." 
% fmt.bold(pipeline_script_path), - err=True, - fg="red", - ) - fmt.note("If you do not have a repository yet, you can do either of:") - fmt.note( - "- Run the following command to initialize new repository: %s" % fmt.bold("git init") - ) - fmt.note( - "- Add your local code to Github as described here: %s" - % fmt.bold( - "https://docs.github.com/en/get-started/importing-your-projects-to-github/importing-source-code-to-github/adding-locally-hosted-code-to-github" - ) - ) - fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL)) - return -3 - except NoSuchPathError as path_ex: - click.secho("The pipeline script does not exist\n%s" % str(path_ex), err=True, fg="red") - return -4 - except Exception as ex: - on_exception(ex, DLT_DEPLOY_DOCS_URL) - return -5 - return 0 - - -@utils.track_command("pipeline", True, "operation") -def pipeline_command_wrapper( - operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, **command_kwargs: Any -) -> int: - try: - pipeline_command(operation, pipeline_name, pipelines_dir, verbosity, **command_kwargs) - return 0 - except CannotRestorePipelineException as ex: - click.secho(str(ex), err=True, fg="red") - click.secho( - "Try command %s to restore the pipeline state from destination" - % fmt.bold(f"dlt pipeline {pipeline_name} sync") - ) - return -1 - except Exception as ex: - on_exception(ex, DLT_PIPELINE_COMMAND_DOCS_URL) - return -2 - - -@utils.track_command("schema", False, "operation") -def schema_command_wrapper(file_path: str, format_: str, remove_defaults: bool) -> int: - with open(file_path, "rb") as f: - if os.path.splitext(file_path)[1][1:] == "json": - schema_dict: DictStrAny = json.load(f) - else: - schema_dict = yaml.safe_load(f) - s = Schema.from_dict(schema_dict) - if format_ == "json": - schema_str = json.dumps(s.to_dict(remove_defaults=remove_defaults), pretty=True) - else: - schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults) - fmt.echo(schema_str) - return 0 - 
- -@utils.track_command("telemetry", False) -def telemetry_status_command_wrapper() -> int: - try: - telemetry_status_command() - except Exception as ex: - on_exception(ex, DLT_TELEMETRY_DOCS_URL) - return -1 - return 0 - - -@utils.track_command("telemetry_switch", False, "enabled") -def telemetry_change_status_command_wrapper(enabled: bool) -> int: - try: - change_telemetry_status_command(enabled) - except Exception as ex: - on_exception(ex, DLT_TELEMETRY_DOCS_URL) - return -1 - return 0 - - ACTION_EXECUTED = False @@ -304,276 +124,23 @@ def main() -> int: ) subparsers = parser.add_subparsers(dest="command") - init_cmd = subparsers.add_parser( - "init", - help=( - "Creates a pipeline project in the current folder by adding existing verified source or" - " creating a new one from template." - ), - ) - init_cmd.add_argument( - "--list-sources", - "-l", - default=False, - action="store_true", - help="List available sources", - ) - init_cmd.add_argument( - "source", - nargs="?", - help=( - "Name of data source for which to create a pipeline. Adds existing verified source or" - " creates a new pipeline template if verified source for your data source is not yet" - " implemented." - ), - ) - init_cmd.add_argument( - "destination", nargs="?", help="Name of a destination ie. bigquery or redshift" - ) - init_cmd.add_argument( - "--location", - default=DEFAULT_VERIFIED_SOURCES_REPO, - help="Advanced. Uses a specific url or local path to verified sources repository.", - ) - init_cmd.add_argument( - "--branch", - default=None, - help="Advanced. Uses specific branch of the init repository to fetch the template.", - ) - - init_cmd.add_argument( - "--omit-core-sources", - default=False, - action="store_true", - help=( - "When present, will not create the new pipeline with a core source of the given name" - " but will take a source of this name from the default or provided location." 
- ), - ) - - # deploy command requires additional dependencies - try: - # make sure the name is defined - _ = deploy_command - deploy_comm = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False - ) - deploy_comm.add_argument( - "--location", - default=COMMAND_DEPLOY_REPO_LOCATION, - help="Advanced. Uses a specific url or local path to pipelines repository.", - ) - deploy_comm.add_argument( - "--branch", - help="Advanced. Uses specific branch of the deploy repository to fetch the template.", - ) - - deploy_cmd = subparsers.add_parser( - "deploy", help="Creates a deployment package for a selected pipeline script" - ) - deploy_cmd.add_argument( - "pipeline_script_path", metavar="pipeline-script-path", help="Path to a pipeline script" - ) - deploy_sub_parsers = deploy_cmd.add_subparsers(dest="deployment_method") - - # deploy github actions - deploy_github_cmd = deploy_sub_parsers.add_parser( - DeploymentMethods.github_actions.value, - help="Deploys the pipeline to Github Actions", - parents=[deploy_comm], - ) - deploy_github_cmd.add_argument( - "--schedule", - required=True, - help=( - "A schedule with which to run the pipeline, in cron format. Example: '*/30 * * * *'" - " will run the pipeline every 30 minutes. Remember to enclose the scheduler" - " expression in quotation marks!" 
- ), - ) - deploy_github_cmd.add_argument( - "--run-manually", - default=True, - action="store_true", - help="Allows the pipeline to be run manually form Github Actions UI.", - ) - deploy_github_cmd.add_argument( - "--run-on-push", - default=False, - action="store_true", - help="Runs the pipeline with every push to the repository.", - ) - - # deploy airflow composer - deploy_airflow_cmd = deploy_sub_parsers.add_parser( - DeploymentMethods.airflow_composer.value, - help="Deploys the pipeline to Airflow", - parents=[deploy_comm], - ) - deploy_airflow_cmd.add_argument( - "--secrets-format", - default=SecretFormats.toml.value, - choices=[v.value for v in SecretFormats], - required=False, - help="Format of the secrets", - ) - except NameError: - # create placeholder command - deploy_cmd = subparsers.add_parser( - "deploy", - help=( - 'Install additional dependencies with pip install "dlt[cli]" to create deployment' - " packages" - ), - add_help=False, - ) - deploy_cmd.add_argument("--help", "-h", nargs="?", const=True) - deploy_cmd.add_argument( - "pipeline_script_path", metavar="pipeline-script-path", nargs=argparse.REMAINDER - ) - - schema = subparsers.add_parser("schema", help="Shows, converts and upgrades schemas") - schema.add_argument( - "file", help="Schema file name, in yaml or json format, will autodetect based on extension" - ) - schema.add_argument( - "--format", choices=["json", "yaml"], default="yaml", help="Display schema in this format" - ) - schema.add_argument( - "--remove-defaults", - action="store_true", - help="Does not show default hint values", - default=True, - ) - - pipe_cmd = subparsers.add_parser( - "pipeline", help="Operations on pipelines that were ran locally" - ) - pipe_cmd.add_argument( - "--list-pipelines", "-l", default=False, action="store_true", help="List local pipelines" - ) - pipe_cmd.add_argument( - "--hot-reload", - default=False, - action="store_true", - help="Reload streamlit app (for core development)", - ) - 
pipe_cmd.add_argument("pipeline_name", nargs="?", help="Pipeline name") - pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None) - pipe_cmd.add_argument( - "--verbose", - "-v", - action="count", - default=0, - help="Provides more information for certain commands.", - dest="verbosity", - ) - - pipeline_subparsers = pipe_cmd.add_subparsers(dest="operation", required=False) - - pipe_cmd_sync_parent = argparse.ArgumentParser(add_help=False) - pipe_cmd_sync_parent.add_argument( - "--destination", help="Sync from this destination when local pipeline state is missing." - ) - pipe_cmd_sync_parent.add_argument( - "--dataset-name", help="Dataset name to sync from when local pipeline state is missing." - ) - - pipeline_subparsers.add_parser( - "info", help="Displays state of the pipeline, use -v or -vv for more info" - ) - pipeline_subparsers.add_parser( - "show", - help="Generates and launches Streamlit app with the loading status and dataset explorer", - ) - pipeline_subparsers.add_parser( - "failed-jobs", - help=( - "Displays information on all the failed loads in all completed packages, failed jobs" - " and associated error messages" - ), - ) - pipeline_subparsers.add_parser( - "drop-pending-packages", - help=( - "Deletes all extracted and normalized packages including those that are partially" - " loaded." - ), - ) - pipeline_subparsers.add_parser( - "sync", - help=( - "Drops the local state of the pipeline and resets all the schemas and restores it from" - " destination. The destination state, data and schemas are left intact." 
- ), - parents=[pipe_cmd_sync_parent], - ) - pipeline_subparsers.add_parser( - "trace", help="Displays last run trace, use -v or -vv for more info" - ) - pipe_cmd_schema = pipeline_subparsers.add_parser("schema", help="Displays default schema") - pipe_cmd_schema.add_argument( - "--format", - choices=["json", "yaml"], - default="yaml", - help="Display schema in this format", - ) - pipe_cmd_schema.add_argument( - "--remove-defaults", - action="store_true", - help="Does not show default hint values", - default=True, - ) + # load plugins + from dlt.common.configuration import plugins - pipe_cmd_drop = pipeline_subparsers.add_parser( - "drop", - help="Selectively drop tables and reset state", - parents=[pipe_cmd_sync_parent], - epilog=( - f"See {DLT_PIPELINE_COMMAND_DOCS_URL}#selectively-drop-tables-and-reset-state for more" - " info" - ), - ) - pipe_cmd_drop.add_argument( - "resources", - nargs="*", - help=( - "One or more resources to drop. Can be exact resource name(s) or regex pattern(s)." - " Regex patterns must start with re:" - ), - ) - pipe_cmd_drop.add_argument( - "--drop-all", - action="store_true", - default=False, - help="Drop all resources found in schema. Supersedes [resources] argument.", - ) - pipe_cmd_drop.add_argument( - "--state-paths", nargs="*", help="State keys or json paths to drop", default=() - ) - pipe_cmd_drop.add_argument( - "--schema", - help="Schema name to drop from (if other than default schema).", - dest="schema_name", - ) - pipe_cmd_drop.add_argument( - "--state-only", - action="store_true", - help="Only wipe state for matching resources without dropping tables.", - default=False, - ) - - pipe_cmd_package = pipeline_subparsers.add_parser( - "load-package", help="Displays information on load package, use -v or -vv for more info" - ) - pipe_cmd_package.add_argument( - "load_id", - metavar="load-id", - nargs="?", - help="Load id of completed or normalized package. 
Defaults to the most recent package.", - ) + m = plugins.manager() + commands = cast(List[Type[SupportsCliCommand]], m.hook.plug_cli()) - subparsers.add_parser("telemetry", help="Shows telemetry status") + # install available commands + installed_commands: Dict[str, SupportsCliCommand] = {} + for c in commands: + command = c() + # perevent plugins overwriting internal commands + assert ( + command.command not in installed_commands.keys() + ), f"Command {command.command} is already installed" + command_parser = subparsers.add_parser(command.command, help=command.help_string) + command.configure_parser(command_parser) + installed_commands[command.command] = command args = parser.parse_args() @@ -585,61 +152,8 @@ def main() -> int: " the current virtual environment instead." ) - if args.command == "schema": - return schema_command_wrapper(args.file, args.format, args.remove_defaults) - elif args.command == "pipeline": - if args.list_pipelines: - return pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity) - else: - command_kwargs = dict(args._get_kwargs()) - if not command_kwargs.get("pipeline_name"): - pipe_cmd.print_usage() - return -1 - command_kwargs["operation"] = args.operation or "info" - del command_kwargs["command"] - del command_kwargs["list_pipelines"] - return pipeline_command_wrapper(**command_kwargs) - elif args.command == "init": - if args.list_sources: - return list_sources_command_wrapper(args.location, args.branch) - else: - if not args.source or not args.destination: - init_cmd.print_usage() - return -1 - else: - return init_command_wrapper( - args.source, - args.destination, - args.location, - args.branch, - args.omit_core_sources, - ) - elif args.command == "deploy": - try: - deploy_args = vars(args) - if deploy_args.get("deployment_method") is None: - print_help(deploy_cmd) - return -1 - else: - return deploy_command_wrapper( - pipeline_script_path=deploy_args.pop("pipeline_script_path"), - 
deployment_method=deploy_args.pop("deployment_method"), - repo_location=deploy_args.pop("location"), - branch=deploy_args.pop("branch"), - **deploy_args, - ) - except (NameError, KeyError): - fmt.warning( - "Please install additional command line dependencies to use deploy command:" - ) - fmt.secho('pip install "dlt[cli]"', bold=True) - fmt.echo( - "We ask you to install those dependencies separately to keep our core library small" - " and make it work everywhere." - ) - return -1 - elif args.command == "telemetry": - return telemetry_status_command_wrapper() + if args.command in installed_commands: + return installed_commands[args.command].execute(args) else: print_help(parser) return -1 diff --git a/dlt/cli/command_wrappers.py b/dlt/cli/command_wrappers.py new file mode 100644 index 0000000000..3deb38bf83 --- /dev/null +++ b/dlt/cli/command_wrappers.py @@ -0,0 +1,187 @@ +from typing import Any, Optional +import yaml +import os +import click + +from dlt.version import __version__ +from dlt.common.json import json +from dlt.common.schema import Schema +from dlt.common.typing import DictStrAny + +import dlt.cli.echo as fmt +from dlt.cli import utils +from dlt.pipeline.exceptions import CannotRestorePipelineException + +from dlt.cli.init_command import ( + init_command, + list_sources_command, + DLT_INIT_DOCS_URL, +) +from dlt.cli.pipeline_command import pipeline_command, DLT_PIPELINE_COMMAND_DOCS_URL +from dlt.cli.telemetry_command import ( + DLT_TELEMETRY_DOCS_URL, + change_telemetry_status_command, + telemetry_status_command, +) + +try: + from dlt.cli import deploy_command + from dlt.cli.deploy_command import ( + PipelineWasNotRun, + DLT_DEPLOY_DOCS_URL, + ) +except ModuleNotFoundError: + pass + +DEBUG_FLAG = False + + +def on_exception(ex: Exception, info: str) -> None: + click.secho(str(ex), err=True, fg="red") + fmt.note("Please refer to %s for further assistance" % fmt.bold(info)) + if DEBUG_FLAG: + raise ex + + +@utils.track_command("init", False, 
"source_name", "destination_type") +def init_command_wrapper( + source_name: str, + destination_type: str, + repo_location: str, + branch: str, + omit_core_sources: bool = False, +) -> int: + try: + init_command( + source_name, + destination_type, + repo_location, + branch, + omit_core_sources, + ) + except Exception as ex: + on_exception(ex, DLT_INIT_DOCS_URL) + return -1 + return 0 + + +@utils.track_command("list_sources", False) +def list_sources_command_wrapper(repo_location: str, branch: str) -> int: + try: + list_sources_command(repo_location, branch) + except Exception as ex: + on_exception(ex, DLT_INIT_DOCS_URL) + return -1 + return 0 + + +@utils.track_command("pipeline", True, "operation") +def pipeline_command_wrapper( + operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, **command_kwargs: Any +) -> int: + try: + pipeline_command(operation, pipeline_name, pipelines_dir, verbosity, **command_kwargs) + return 0 + except CannotRestorePipelineException as ex: + click.secho(str(ex), err=True, fg="red") + click.secho( + "Try command %s to restore the pipeline state from destination" + % fmt.bold(f"dlt pipeline {pipeline_name} sync") + ) + return -1 + except Exception as ex: + on_exception(ex, DLT_PIPELINE_COMMAND_DOCS_URL) + return -2 + + +@utils.track_command("deploy", False, "deployment_method") +def deploy_command_wrapper( + pipeline_script_path: str, + deployment_method: str, + repo_location: str, + branch: Optional[str] = None, + **kwargs: Any, +) -> int: + try: + utils.ensure_git_command("deploy") + except Exception as ex: + click.secho(str(ex), err=True, fg="red") + return -1 + + from git import InvalidGitRepositoryError, NoSuchPathError + + try: + deploy_command.deploy_command( + pipeline_script_path=pipeline_script_path, + deployment_method=deployment_method, + repo_location=repo_location, + branch=branch, + **kwargs, + ) + except (CannotRestorePipelineException, PipelineWasNotRun) as ex: + fmt.note( + "You must run the pipeline 
locally successfully at least once in order to deploy it." + ) + on_exception(ex, DLT_DEPLOY_DOCS_URL) + return -2 + except InvalidGitRepositoryError: + click.secho( + "No git repository found for pipeline script %s." % fmt.bold(pipeline_script_path), + err=True, + fg="red", + ) + fmt.note("If you do not have a repository yet, you can do either of:") + fmt.note( + "- Run the following command to initialize new repository: %s" % fmt.bold("git init") + ) + fmt.note( + "- Add your local code to Github as described here: %s" + % fmt.bold( + "https://docs.github.com/en/get-started/importing-your-projects-to-github/importing-source-code-to-github/adding-locally-hosted-code-to-github" + ) + ) + fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL)) + return -3 + except NoSuchPathError as path_ex: + click.secho("The pipeline script does not exist\n%s" % str(path_ex), err=True, fg="red") + return -4 + except Exception as ex: + on_exception(ex, DLT_DEPLOY_DOCS_URL) + return -5 + return 0 + + +@utils.track_command("schema", False, "operation") +def schema_command_wrapper(file_path: str, format_: str, remove_defaults: bool) -> int: + with open(file_path, "rb") as f: + if os.path.splitext(file_path)[1][1:] == "json": + schema_dict: DictStrAny = json.load(f) + else: + schema_dict = yaml.safe_load(f) + s = Schema.from_dict(schema_dict) + if format_ == "json": + schema_str = json.dumps(s.to_dict(remove_defaults=remove_defaults), pretty=True) + else: + schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults) + fmt.echo(schema_str) + return 0 + + +@utils.track_command("telemetry", False) +def telemetry_status_command_wrapper() -> int: + try: + telemetry_status_command() + except Exception as ex: + on_exception(ex, DLT_TELEMETRY_DOCS_URL) + return -1 + return 0 + + +@utils.track_command("telemetry_switch", False, "enabled") +def telemetry_change_status_command_wrapper(enabled: bool) -> int: + try: + change_telemetry_status_command(enabled) + 
except Exception as ex: + on_exception(ex, DLT_TELEMETRY_DOCS_URL) + return -1 + return 0 diff --git a/dlt/cli/plugins.py b/dlt/cli/plugins.py new file mode 100644 index 0000000000..af74825905 --- /dev/null +++ b/dlt/cli/plugins.py @@ -0,0 +1,415 @@ +from typing import Type + +import argparse +import dlt.cli.echo as fmt + + +from dlt.common.configuration import plugins +from dlt.common.configuration.specs.pluggable_cli_command import SupportsCliCommand +from dlt.cli.init_command import ( + DEFAULT_VERIFIED_SOURCES_REPO, +) +from dlt.cli.command_wrappers import ( + init_command_wrapper, + list_sources_command_wrapper, + pipeline_command_wrapper, + schema_command_wrapper, + telemetry_status_command_wrapper, + deploy_command_wrapper, +) +from dlt.cli.pipeline_command import DLT_PIPELINE_COMMAND_DOCS_URL + +try: + from dlt.cli.deploy_command import ( + DeploymentMethods, + COMMAND_DEPLOY_REPO_LOCATION, + SecretFormats, + ) +except ModuleNotFoundError: + pass + + +@plugins.hookspec() +def plug_cli() -> SupportsCliCommand: + """Spec for plugin hook that returns current run context.""" + + +class InitCommand(SupportsCliCommand): + command = "init" + help_string = ( + "Creates a pipeline project in the current folder by adding existing verified source or" + " creating a new one from template." + ) + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + self.parser = parser + + parser.add_argument( + "--list-sources", + "-l", + default=False, + action="store_true", + help="List available sources", + ) + parser.add_argument( + "source", + nargs="?", + help=( + "Name of data source for which to create a pipeline. Adds existing verified" + " source or creates a new pipeline template if verified source for your data" + " source is not yet implemented." + ), + ) + parser.add_argument( + "destination", nargs="?", help="Name of a destination ie. 
bigquery or redshift" + ) + parser.add_argument( + "--location", + default=DEFAULT_VERIFIED_SOURCES_REPO, + help="Advanced. Uses a specific url or local path to verified sources repository.", + ) + parser.add_argument( + "--branch", + default=None, + help="Advanced. Uses specific branch of the init repository to fetch the template.", + ) + + parser.add_argument( + "--omit-core-sources", + default=False, + action="store_true", + help=( + "When present, will not create the new pipeline with a core source of the given" + " name but will take a source of this name from the default or provided" + " location." + ), + ) + + def execute(self, args: argparse.Namespace) -> int: + if args.list_sources: + return list_sources_command_wrapper(args.location, args.branch) + else: + if not args.source or not args.destination: + self.parser.print_usage() + return -1 + else: + return init_command_wrapper( + args.source, + args.destination, + args.location, + args.branch, + args.omit_core_sources, + ) + + +class PipelineCommand(SupportsCliCommand): + command = "pipeline" + help_string = "Operations on pipelines that were ran locally" + + def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None: + self.parser = pipe_cmd + + pipe_cmd.add_argument( + "--list-pipelines", + "-l", + default=False, + action="store_true", + help="List local pipelines", + ) + pipe_cmd.add_argument( + "--hot-reload", + default=False, + action="store_true", + help="Reload streamlit app (for core development)", + ) + pipe_cmd.add_argument("pipeline_name", nargs="?", help="Pipeline name") + pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None) + pipe_cmd.add_argument( + "--verbose", + "-v", + action="count", + default=0, + help="Provides more information for certain commands.", + dest="verbosity", + ) + + pipeline_subparsers = pipe_cmd.add_subparsers(dest="operation", required=False) + + pipe_cmd_sync_parent = argparse.ArgumentParser(add_help=False) + 
pipe_cmd_sync_parent.add_argument( + "--destination", help="Sync from this destination when local pipeline state is missing." + ) + pipe_cmd_sync_parent.add_argument( + "--dataset-name", help="Dataset name to sync from when local pipeline state is missing." + ) + + pipeline_subparsers.add_parser( + "info", help="Displays state of the pipeline, use -v or -vv for more info" + ) + pipeline_subparsers.add_parser( + "show", + help=( + "Generates and launches Streamlit app with the loading status and dataset explorer" + ), + ) + pipeline_subparsers.add_parser( + "failed-jobs", + help=( + "Displays information on all the failed loads in all completed packages, failed" + " jobs and associated error messages" + ), + ) + pipeline_subparsers.add_parser( + "drop-pending-packages", + help=( + "Deletes all extracted and normalized packages including those that are partially" + " loaded." + ), + ) + pipeline_subparsers.add_parser( + "sync", + help=( + "Drops the local state of the pipeline and resets all the schemas and restores it" + " from destination. The destination state, data and schemas are left intact." 
+ ), + parents=[pipe_cmd_sync_parent], + ) + pipeline_subparsers.add_parser( + "trace", help="Displays last run trace, use -v or -vv for more info" + ) + pipe_cmd_schema = pipeline_subparsers.add_parser("schema", help="Displays default schema") + pipe_cmd_schema.add_argument( + "--format", + choices=["json", "yaml"], + default="yaml", + help="Display schema in this format", + ) + pipe_cmd_schema.add_argument( + "--remove-defaults", + action="store_true", + help="Does not show default hint values", + default=True, + ) + + pipe_cmd_drop = pipeline_subparsers.add_parser( + "drop", + help="Selectively drop tables and reset state", + parents=[pipe_cmd_sync_parent], + epilog=( + f"See {DLT_PIPELINE_COMMAND_DOCS_URL}#selectively-drop-tables-and-reset-state for" + " more info" + ), + ) + pipe_cmd_drop.add_argument( + "resources", + nargs="*", + help=( + "One or more resources to drop. Can be exact resource name(s) or regex pattern(s)." + " Regex patterns must start with re:" + ), + ) + pipe_cmd_drop.add_argument( + "--drop-all", + action="store_true", + default=False, + help="Drop all resources found in schema. Supersedes [resources] argument.", + ) + pipe_cmd_drop.add_argument( + "--state-paths", nargs="*", help="State keys or json paths to drop", default=() + ) + pipe_cmd_drop.add_argument( + "--schema", + help="Schema name to drop from (if other than default schema).", + dest="schema_name", + ) + pipe_cmd_drop.add_argument( + "--state-only", + action="store_true", + help="Only wipe state for matching resources without dropping tables.", + default=False, + ) + + pipe_cmd_package = pipeline_subparsers.add_parser( + "load-package", help="Displays information on load package, use -v or -vv for more info" + ) + pipe_cmd_package.add_argument( + "load_id", + metavar="load-id", + nargs="?", + help="Load id of completed or normalized package. 
Defaults to the most recent package.", + ) + + def execute(self, args: argparse.Namespace) -> int: + if args.list_pipelines: + return pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity) + else: + command_kwargs = dict(args._get_kwargs()) + if not command_kwargs.get("pipeline_name"): + self.parser.print_usage() + return -1 + command_kwargs["operation"] = args.operation or "info" + del command_kwargs["command"] + del command_kwargs["list_pipelines"] + return pipeline_command_wrapper(**command_kwargs) + + +class SchemaCommand(SupportsCliCommand): + command = "schema" + help_string = "Shows, converts and upgrades schemas" + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + self.parser = parser + + parser.add_argument( + "file", + help="Schema file name, in yaml or json format, will autodetect based on extension", + ) + parser.add_argument( + "--format", + choices=["json", "yaml"], + default="yaml", + help="Display schema in this format", + ) + parser.add_argument( + "--remove-defaults", + action="store_true", + help="Does not show default hint values", + default=True, + ) + + def execute(self, args: argparse.Namespace) -> int: + return schema_command_wrapper(args.file, args.format, args.remove_defaults) + + +class TelemetryCommand(SupportsCliCommand): + command = "telemetry" + help_string = "Shows telemetry status" + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + self.parser = parser + + def execute(self, args: argparse.Namespace) -> int: + return telemetry_status_command_wrapper() + + +# TODO: ensure the command reacts the correct way if dependencies are not installed +# thsi has changed a bit in this impl +class DeployCommand(SupportsCliCommand): + command = "deploy" + help_string = "Creates a deployment package for a selected pipeline script" + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + self.parser = parser + deploy_cmd = parser + deploy_comm = 
argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False + ) + deploy_comm.add_argument( + "--location", + default=COMMAND_DEPLOY_REPO_LOCATION, + help="Advanced. Uses a specific url or local path to pipelines repository.", + ) + deploy_comm.add_argument( + "--branch", + help="Advanced. Uses specific branch of the deploy repository to fetch the template.", + ) + + deploy_cmd.add_argument( + "pipeline_script_path", metavar="pipeline-script-path", help="Path to a pipeline script" + ) + deploy_sub_parsers = deploy_cmd.add_subparsers(dest="deployment_method") + + # deploy github actions + deploy_github_cmd = deploy_sub_parsers.add_parser( + DeploymentMethods.github_actions.value, + help="Deploys the pipeline to Github Actions", + parents=[deploy_comm], + ) + deploy_github_cmd.add_argument( + "--schedule", + required=True, + help=( + "A schedule with which to run the pipeline, in cron format. Example: '*/30 * * * *'" + " will run the pipeline every 30 minutes. Remember to enclose the scheduler" + " expression in quotation marks!" 
+            ),
+        )
+        deploy_github_cmd.add_argument(
+            "--run-manually",
+            default=True,
+            action="store_true",
+            help="Allows the pipeline to be run manually from Github Actions UI.",
+        )
+        deploy_github_cmd.add_argument(
+            "--run-on-push",
+            default=False,
+            action="store_true",
+            help="Runs the pipeline with every push to the repository.",
+        )
+
+        # deploy airflow composer
+        deploy_airflow_cmd = deploy_sub_parsers.add_parser(
+            DeploymentMethods.airflow_composer.value,
+            help="Deploys the pipeline to Airflow",
+            parents=[deploy_comm],
+        )
+        deploy_airflow_cmd.add_argument(
+            "--secrets-format",
+            default=SecretFormats.toml.value,
+            choices=[v.value for v in SecretFormats],
+            required=False,
+            help="Format of the secrets",
+        )
+
+    def execute(self, args: argparse.Namespace) -> int:
+        try:
+            deploy_args = vars(args)
+            if deploy_args.get("deployment_method") is None:
+                self.parser.print_help()
+                return -1
+            else:
+                return deploy_command_wrapper(
+                    pipeline_script_path=deploy_args.pop("pipeline_script_path"),
+                    deployment_method=deploy_args.pop("deployment_method"),
+                    repo_location=deploy_args.pop("location"),
+                    branch=deploy_args.pop("branch"),
+                    **deploy_args,
+                )
+        except (NameError, KeyError):
+            fmt.warning(
+                "Please install additional command line dependencies to use deploy command:"
+            )
+            fmt.secho('pip install "dlt[cli]"', bold=True)
+            fmt.echo(
+                "We ask you to install those dependencies separately to keep our core library small"
+                " and make it work everywhere."
+ ) + return -1 + + +# +# Register all commands +# +@plugins.hookimpl(specname="plug_cli") +def plug_cli_init() -> Type[SupportsCliCommand]: + return InitCommand + + +@plugins.hookimpl(specname="plug_cli") +def plug_cli_pipeline() -> Type[SupportsCliCommand]: + return PipelineCommand + + +@plugins.hookimpl(specname="plug_cli") +def plug_cli_schema() -> Type[SupportsCliCommand]: + return SchemaCommand + + +@plugins.hookimpl(specname="plug_cli") +def plug_cli_telemetry() -> Type[SupportsCliCommand]: + return TelemetryCommand + + +@plugins.hookimpl(specname="plug_cli") +def plug_cli_deploy() -> Type[SupportsCliCommand]: + return DeployCommand diff --git a/dlt/common/configuration/plugins.py b/dlt/common/configuration/plugins.py index 727725a758..ac9cdd56a8 100644 --- a/dlt/common/configuration/plugins.py +++ b/dlt/common/configuration/plugins.py @@ -17,12 +17,20 @@ def __init__(self) -> None: super().__init__() self.manager = pluggy.PluginManager("dlt") - # we need to solve circular deps somehow + # TODO: we need to solve circular deps somehow + + # run_context from dlt.common.runtime import run_context - # register self.manager.add_hookspecs(run_context) self.manager.register(run_context) + + # cli + from dlt.cli import plugins + + self.manager.add_hookspecs(plugins) + self.manager.register(plugins) + load_setuptools_entrypoints(self.manager) diff --git a/dlt/common/configuration/specs/pluggable_cli_command.py b/dlt/common/configuration/specs/pluggable_cli_command.py new file mode 100644 index 0000000000..fd4fbb35f7 --- /dev/null +++ b/dlt/common/configuration/specs/pluggable_cli_command.py @@ -0,0 +1,18 @@ +from typing import Protocol + +import argparse + + +class SupportsCliCommand(Protocol): + """Protocol for defining one dlt cli command""" + + command: str + help_string: str + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + """Configures the parser for the given argument""" + ... 
+ + def execute(self, args: argparse.Namespace) -> int: + """Executes the command with the given arguments""" + ... diff --git a/tests/cli/common/test_telemetry_command.py b/tests/cli/common/test_telemetry_command.py index fc67dde5fa..b0a3ff502c 100644 --- a/tests/cli/common/test_telemetry_command.py +++ b/tests/cli/common/test_telemetry_command.py @@ -140,14 +140,17 @@ def instrument_raises_2(in_raises_2: bool) -> int: def test_instrumentation_wrappers() -> None: - from dlt.cli._dlt import ( - init_command_wrapper, - list_sources_command_wrapper, + from dlt.cli.deploy_command import ( + DeploymentMethods, + COMMAND_DEPLOY_REPO_LOCATION, + ) + from dlt.cli.init_command import ( DEFAULT_VERIFIED_SOURCES_REPO, - pipeline_command_wrapper, + ) + from dlt.cli.command_wrappers import ( + init_command_wrapper, deploy_command_wrapper, - COMMAND_DEPLOY_REPO_LOCATION, - DeploymentMethods, + list_sources_command_wrapper, ) with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send): From d19192ef63b2613aa6db33ab6d65edd7fa1d7b02 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 9 Oct 2024 14:33:57 +0200 Subject: [PATCH 2/5] make deploy command behave correctly if not available --- dlt/cli/plugins.py | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/dlt/cli/plugins.py b/dlt/cli/plugins.py index af74825905..0cdd3e7e96 100644 --- a/dlt/cli/plugins.py +++ b/dlt/cli/plugins.py @@ -25,8 +25,10 @@ COMMAND_DEPLOY_REPO_LOCATION, SecretFormats, ) + + deploy_command_available = True except ModuleNotFoundError: - pass + deploy_command_available = False @plugins.hookspec() @@ -304,6 +306,14 @@ def configure_parser(self, parser: argparse.ArgumentParser) -> None: deploy_comm = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False ) + + deploy_cmd.add_argument( + "pipeline_script_path", metavar="pipeline-script-path", help="Path to a pipeline script" + ) + + if not 
deploy_command_available: + return + deploy_comm.add_argument( "--location", default=COMMAND_DEPLOY_REPO_LOCATION, @@ -314,9 +324,6 @@ def configure_parser(self, parser: argparse.ArgumentParser) -> None: help="Advanced. Uses specific branch of the deploy repository to fetch the template.", ) - deploy_cmd.add_argument( - "pipeline_script_path", metavar="pipeline-script-path", help="Path to a pipeline script" - ) deploy_sub_parsers = deploy_cmd.add_subparsers(dest="deployment_method") # deploy github actions @@ -362,20 +369,8 @@ def configure_parser(self, parser: argparse.ArgumentParser) -> None: ) def execute(self, args: argparse.Namespace) -> int: - try: - deploy_args = vars(args) - if deploy_args.get("deployment_method") is None: - self.parser.print_help() - return -1 - else: - return deploy_command_wrapper( - pipeline_script_path=deploy_args.pop("pipeline_script_path"), - deployment_method=deploy_args.pop("deployment_method"), - repo_location=deploy_args.pop("location"), - branch=deploy_args.pop("branch"), - **deploy_args, - ) - except (NameError, KeyError): + # exit if deploy command is not available + if not deploy_command_available: fmt.warning( "Please install additional command line dependencies to use deploy command:" ) @@ -386,6 +381,19 @@ def execute(self, args: argparse.Namespace) -> int: ) return -1 + deploy_args = vars(args) + if deploy_args.get("deployment_method") is None: + self.parser.print_help() + return -1 + else: + return deploy_command_wrapper( + pipeline_script_path=deploy_args.pop("pipeline_script_path"), + deployment_method=deploy_args.pop("deployment_method"), + repo_location=deploy_args.pop("location"), + branch=deploy_args.pop("branch"), + **deploy_args, + ) + # # Register all commands From 88760b1993f2285aa07cedf82112116f9dc44557 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 9 Oct 2024 16:40:05 +0200 Subject: [PATCH 3/5] add global debug flag --- dlt/cli/_dlt.py | 5 ++--- dlt/cli/command_wrappers.py | 5 ++--- dlt/cli/debug.py | 18 
++++++++++++++++++ tests/cli/common/test_cli_invoke.py | 4 ++-- 4 files changed, 24 insertions(+), 8 deletions(-) create mode 100644 dlt/cli/debug.py diff --git a/dlt/cli/_dlt.py b/dlt/cli/_dlt.py index e2a1b02c48..5ca26fcc8b 100644 --- a/dlt/cli/_dlt.py +++ b/dlt/cli/_dlt.py @@ -11,9 +11,9 @@ deploy_command_wrapper, telemetry_change_status_command_wrapper, ) +from dlt.cli import debug -DEBUG_FLAG = False ACTION_EXECUTED = False @@ -88,9 +88,8 @@ def __call__( values: Any, option_string: str = None, ) -> None: - global DEBUG_FLAG # will show stack traces (and maybe more debug things) - DEBUG_FLAG = True + debug.enable_debug() def main() -> int: diff --git a/dlt/cli/command_wrappers.py b/dlt/cli/command_wrappers.py index 3deb38bf83..6b98bac0e1 100644 --- a/dlt/cli/command_wrappers.py +++ b/dlt/cli/command_wrappers.py @@ -23,6 +23,7 @@ change_telemetry_status_command, telemetry_status_command, ) +from dlt.cli import debug try: from dlt.cli import deploy_command @@ -33,13 +34,11 @@ except ModuleNotFoundError: pass -DEBUG_FLAG = False - def on_exception(ex: Exception, info: str) -> None: click.secho(str(ex), err=True, fg="red") fmt.note("Please refer to %s for further assistance" % fmt.bold(info)) - if DEBUG_FLAG: + if debug.is_debug_enabled(): raise ex diff --git a/dlt/cli/debug.py b/dlt/cli/debug.py new file mode 100644 index 0000000000..18cfd284ce --- /dev/null +++ b/dlt/cli/debug.py @@ -0,0 +1,18 @@ +"""Provides a global debug setting for the CLI""" + +_DEBUG_FLAG = False + + +def enable_debug() -> None: + global _DEBUG_FLAG + _DEBUG_FLAG = True + + +def disable_debug() -> None: + global _DEBUG_FLAG + _DEBUG_FLAG = False + + +def is_debug_enabled() -> bool: + global _DEBUG_FLAG + return _DEBUG_FLAG diff --git a/tests/cli/common/test_cli_invoke.py b/tests/cli/common/test_cli_invoke.py index 97db8ab86b..eef1af03ad 100644 --- a/tests/cli/common/test_cli_invoke.py +++ b/tests/cli/common/test_cli_invoke.py @@ -86,9 +86,9 @@ def test_invoke_pipeline(script_runner: 
ScriptRunner) -> None: assert "LoadPackageNotFound" in result.stderr finally: # reset debug flag so other tests may pass - from dlt.cli import _dlt + from dlt.cli import debug - _dlt.DEBUG_FLAG = False + debug.disable_debug() def test_invoke_init_chess_and_template(script_runner: ScriptRunner) -> None: From 143b412168f1c1ec248907da3462c8133454a779 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 10 Oct 2024 15:50:28 +0200 Subject: [PATCH 4/5] * move plugin interface * add tests for cli plugin discovery * allow plugins to overwrite core cli commands --- dlt/cli/__init__.py | 1 + dlt/cli/_dlt.py | 13 ++++--- dlt/cli/plugins.py | 2 +- .../reference.py} | 0 .../dlt_example_plugin/__init__.py | 39 ++++++++++++++++++- tests/plugins/test_plugin_discovery.py | 13 +++++++ 6 files changed, 61 insertions(+), 7 deletions(-) rename dlt/{common/configuration/specs/pluggable_cli_command.py => cli/reference.py} (100%) diff --git a/dlt/cli/__init__.py b/dlt/cli/__init__.py index e69de29bb2..2c129d95b7 100644 --- a/dlt/cli/__init__.py +++ b/dlt/cli/__init__.py @@ -0,0 +1 @@ +from dlt.cli.reference import SupportsCliCommand diff --git a/dlt/cli/_dlt.py b/dlt/cli/_dlt.py index 5ca26fcc8b..5c5adbf914 100644 --- a/dlt/cli/_dlt.py +++ b/dlt/cli/_dlt.py @@ -3,7 +3,7 @@ from dlt.version import __version__ from dlt.common.runners import Venv -from dlt.common.configuration.specs.pluggable_cli_command import SupportsCliCommand +from dlt.cli import SupportsCliCommand import dlt.cli.echo as fmt @@ -67,6 +67,10 @@ def __call__( option_string: str = None, ) -> None: fmt.ALWAYS_CHOOSE_DEFAULT = True + fmt.note( + "Non interactive mode. Default choices are automatically made for confirmations and" + " prompts." 
+ ) class DebugAction(argparse.Action): @@ -128,15 +132,14 @@ def main() -> int: m = plugins.manager() commands = cast(List[Type[SupportsCliCommand]], m.hook.plug_cli()) + # NOTE: plugin commands are added in reverse order so that the last added command (coming from external plugin) + # overwrites internal commands + commands.reverse() # install available commands installed_commands: Dict[str, SupportsCliCommand] = {} for c in commands: command = c() - # perevent plugins overwriting internal commands - assert ( - command.command not in installed_commands.keys() - ), f"Command {command.command} is already installed" command_parser = subparsers.add_parser(command.command, help=command.help_string) command.configure_parser(command_parser) installed_commands[command.command] = command diff --git a/dlt/cli/plugins.py b/dlt/cli/plugins.py index 0cdd3e7e96..2041d6b369 100644 --- a/dlt/cli/plugins.py +++ b/dlt/cli/plugins.py @@ -5,7 +5,7 @@ from dlt.common.configuration import plugins -from dlt.common.configuration.specs.pluggable_cli_command import SupportsCliCommand +from dlt.cli import SupportsCliCommand from dlt.cli.init_command import ( DEFAULT_VERIFIED_SOURCES_REPO, ) diff --git a/dlt/common/configuration/specs/pluggable_cli_command.py b/dlt/cli/reference.py similarity index 100% rename from dlt/common/configuration/specs/pluggable_cli_command.py rename to dlt/cli/reference.py diff --git a/tests/plugins/dlt_example_plugin/dlt_example_plugin/__init__.py b/tests/plugins/dlt_example_plugin/dlt_example_plugin/__init__.py index 345559e701..4377196320 100644 --- a/tests/plugins/dlt_example_plugin/dlt_example_plugin/__init__.py +++ b/tests/plugins/dlt_example_plugin/dlt_example_plugin/__init__.py @@ -1,8 +1,11 @@ import os -from typing import ClassVar +import argparse + +from typing import ClassVar, Type from dlt.common.configuration import plugins from dlt.common.configuration.specs.pluggable_run_context import SupportsRunContext +from dlt.cli import SupportsCliCommand 
from dlt.common.runtime.run_context import RunContext, DOT_DLT from tests.utils import TEST_STORAGE_ROOT @@ -27,3 +30,37 @@ def data_dir(self) -> str: @plugins.hookimpl(specname="plug_run_context") def plug_run_context_impl() -> SupportsRunContext: return RunContextTest() + + +class ExampleCommand(SupportsCliCommand): + command: str = "example" + help_string: str = "Example command" + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + parser.add_argument("--name", type=str, help="Name to print") + + def execute(self, args: argparse.Namespace) -> int: + print(f"Example command executed with name: {args.name}") + return 33 + + +class InitCommand(SupportsCliCommand): + command: str = "init" + help_string: str = "Init command" + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + pass + + def execute(self, args: argparse.Namespace) -> int: + print("Plugin overwrote init command") + return 55 + + +@plugins.hookimpl(specname="plug_cli") +def plug_cli_example() -> Type[SupportsCliCommand]: + return ExampleCommand + + +@plugins.hookimpl(specname="plug_cli") +def plug_cli_init_new() -> Type[SupportsCliCommand]: + return InitCommand diff --git a/tests/plugins/test_plugin_discovery.py b/tests/plugins/test_plugin_discovery.py index 3fe18860d7..6bb85d04f5 100644 --- a/tests/plugins/test_plugin_discovery.py +++ b/tests/plugins/test_plugin_discovery.py @@ -11,6 +11,7 @@ from dlt.common.configuration import plugins from dlt.common.runtime import run_context from tests.utils import TEST_STORAGE_ROOT +from pytest_console_scripts import ScriptRunner @pytest.fixture(scope="module", autouse=True) @@ -51,3 +52,15 @@ def test_example_plugin() -> None: context = run_context.current() assert context.name == "dlt-test" assert context.data_dir == os.path.abspath(TEST_STORAGE_ROOT) + + +def test_cli_hook(script_runner: ScriptRunner) -> None: + # new command + result = script_runner.run(["dlt", "example", "--name", "John"]) + assert result.returncode 
== 33 + assert "Example command executed with name: John" in result.stdout + + # overwritten pipeline command + result = script_runner.run(["dlt", "init"]) + assert result.returncode == 55 + assert "Plugin overwrote init command" in result.stdout From 76ca3686eeadddd6fb63d960601dea393a5b4ec7 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 10 Oct 2024 16:32:38 +0200 Subject: [PATCH 5/5] ensure plugins take precedence over core commands --- dlt/cli/_dlt.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dlt/cli/_dlt.py b/dlt/cli/_dlt.py index 5c5adbf914..4b7f217e24 100644 --- a/dlt/cli/_dlt.py +++ b/dlt/cli/_dlt.py @@ -132,14 +132,13 @@ def main() -> int: m = plugins.manager() commands = cast(List[Type[SupportsCliCommand]], m.hook.plug_cli()) - # NOTE: plugin commands are added in reverse order so that the last added command (coming from external plugin) - # overwrites internal commands - commands.reverse() # install available commands installed_commands: Dict[str, SupportsCliCommand] = {} for c in commands: command = c() + if command.command in installed_commands.keys(): + continue command_parser = subparsers.add_parser(command.command, help=command.help_string) command.configure_parser(command_parser) installed_commands[command.command] = command