diff --git a/lint-workflow/Taskfile.yml b/lint-workflow/Taskfile.yml index 83315751..83c9b785 100644 --- a/lint-workflow/Taskfile.yml +++ b/lint-workflow/Taskfile.yml @@ -12,9 +12,17 @@ tasks: cmds: - pipenv run pytest tests - test:unit:cov: + test:unit: + cmds: + - pipenv run pytest tests + + test:cov: + cmds: + - pipenv run pytest --cov-report term --cov=src tests + + test:cov:detailed: cmds: - - pipenv run pytest tests --cov=src + - pipenv run pytest --cov-report term-missing --cov=src tests test:e2e:lint: cmds: diff --git a/lint-workflow/cli.py b/lint-workflow/cli.py index b5d995c5..f659e09c 100644 --- a/lint-workflow/cli.py +++ b/lint-workflow/cli.py @@ -6,13 +6,13 @@ # from src.rules import workflow_rules, job_rules, step_rules, uses_step_rules, run_step_rules import settings -from src.actions import Actions +from src.actions import ActionsCmd from src.utils import Settings, SettingsError -from src.lint import Linter +from src.lint import LinterCmd try: - lint_settings = Settings( + local_settings = Settings( enabled_rules=settings.enabled_rules, approved_actions=settings.approved_actions ) except: @@ -23,49 +23,24 @@ ) ) -linter = Linter(settings=lint_settings) -actions = Actions(settings=lint_settings) -# print(lint_rules.workflow) +def main(input_args=None): + """CLI utility to lint GitHub Action Workflows. + A CLI utility to enforce coding standards on GitHub Action workflows. The + utility also provides other sub-commands to assist with other workflow + maintenance tasks; such as maintaining the list of approved GitHub Actions. + """ + linter_cmd = LinterCmd(settings=local_settings) + actions_cmd = ActionsCmd(settings=local_settings) -def main(input_args=None): # Read arguments from command line. parser = argparse.ArgumentParser(prog="workflow-linter") parser.add_argument("-v", "--verbose", action="store_true", default=False) subparsers = parser.add_subparsers(required=True, dest="command") - parser_actions = subparsers.add_parser("actions", help="actions help") - parser_actions.add_argument( - "-o", "--output", action="store", default="actions.json" - ) - subparsers_actions = parser_actions.add_subparsers( - required=True, dest="actions_command" - ) - - parser_actions_update = subparsers_actions.add_parser( - "update", help="update action versions" - ) - - parser_actions_add = subparsers_actions.add_parser( - "add", help="add action to approved list" - ) - parser_actions_add.add_argument("name", help="action name [git owener/repo]") - - parser_lint = subparsers.add_parser("lint", help="lint help") - parser_lint.add_argument( - "-s", - "--strict", - action="store_true", - help="return non-zero exit code on warnings as well as errors", - ) - parser_lint.add_argument("-f", "--files", action="append", help="files to lint") - parser_lint.add_argument( - "--output", - action="store", - help="output format: [stdout|json|md]", - default="stdout", - ) + subparsers = LinterCmd.extend_parser(subparsers) + subparsers = ActionsCmd.extend_parser(subparsers) # Pull the arguments from the command line input_args = sys.argv[1:] @@ -78,17 +53,16 @@ def main(input_args=None): print(f"Args:\n{args}") if args.command == "lint": - return linter.run(args.files) + return linter_cmd.run(args.files) if args.command == "actions": if args.actions_command == "add": - return actions.add(args.name, args.output) + return actions_cmd.add(args.name, args.output) elif args.actions_command == "update": - return actions.update(args.output) + return actions_cmd.update(args.output) return -1 if __name__ == "__main__": return_code = main() - # print(memoized_action_update_urls) sys.exit(return_code) diff --git a/lint-workflow/src/actions.py b/lint-workflow/src/actions.py index e85f6c83..f25f9917 100644 --- a/lint-workflow/src/actions.py +++ b/lint-workflow/src/actions.py @@ -1,3 +1,4 @@ +import argparse import json import logging import os @@ -9,10 +10,34 @@ from src.utils import Colors, LintFinding, Settings, SettingsError, Action -class Actions: +class ActionsCmd: def __init__(self, settings: Settings = None) -> None: self.settings = settings + @staticmethod + def extend_parser(subparsers: argparse.ArgumentParser) -> argparse.ArgumentParser: + """Extends the CLI subparser with the options for ActionCmd. + + Add 'actions add' and 'actions update' to the CLI as sub-commands + along with the options and arguments for each. + """ + parser_actions = subparsers.add_parser("actions", help="actions help") + parser_actions.add_argument( + "-o", "--output", action="store", default="actions.json" + ) + subparsers_actions = parser_actions.add_subparsers( + required=True, dest="actions_command" + ) + parser_actions_update = subparsers_actions.add_parser( + "update", help="update action versions" + ) + parser_actions_add = subparsers_actions.add_parser( + "add", help="add action to approved list" + ) + parser_actions_add.add_argument("name", help="action name [git owener/repo]") + + return subparsers + def get_github_api_response( self, url: str, action_name: str ) -> Union[urllib.response.BaseHTTPResponse, None]: @@ -89,7 +114,26 @@ def get_latest_version(self, action: Action) -> Tuple[str, str]: return Action(name=action.name, version=tag_name, sha=sha) + def save_actions(self, updated_actions: dict[str, Action], filename: str) -> None: + """Save Actions to disk. + + This is used to track the list of approved actions. + """ + with open(filename, "w") as action_file: + converted_updated_actions = { + name: asdict(action) for name, action in updated_actions.items() + } + action_file.write( + json.dumps(converted_updated_actions, indent=2, sort_keys=True) + ) + def add(self, new_action_name: str, filename: str) -> None: + """Sub-command to add a new Action to the list of approved Actions. + + 'actions add' will add an Action and all of its metadata and dump all + approved actions (including the new one) to either the default JSON file + or the one provided by '--output' + """ print("Actions: add") updated_actions = self.settings.approved_actions proposed_action = Action(name=new_action_name) @@ -98,15 +142,15 @@ def add(self, new_action_name: str, filename: str) -> None: latest = self.get_latest_version(proposed_action) updated_actions[latest.name] = latest - with open(filename, "w") as action_file: - converted_updated_actions = { - name: asdict(action) for name, action in updated_actions.items() - } - action_file.write( - json.dumps(converted_updated_actions, indent=2, sort_keys=True) - ) + self.save_actions(updated_actions, filename) def update(self, filename: str) -> None: + """Sub-command to update all of the versions of the approved actions. + + 'actions update' will update all of the approved to the newest version + and dump all of the new data to either the default JSON file or the + one provided by '--output' + """ print("Actions: update") updated_actions = {} for action in self.settings.approved_actions.values(): @@ -123,10 +167,5 @@ def update(self, filename: str) -> None: print(f" - {action.name} \033[{Colors.green}ok\033[0m") updated_actions[action.name] = latest_release - with open(filename, "w") as action_file: - converted_updated_actions = { - name: asdict(action) for name, action in updated_actions.items() - } - action_file.write( - json.dumps(converted_updated_actions, indent=2, sort_keys=True) - ) + self.save_actions(updated_actions, filename) + diff --git a/lint-workflow/src/lint.py b/lint-workflow/src/lint.py index f08a85e6..97ba8c6f 100644 --- a/lint-workflow/src/lint.py +++ b/lint-workflow/src/lint.py @@ -1,3 +1,4 @@ +import argparse import os from src.load import WorkflowBuilder, Rules @@ -10,10 +11,32 @@ } -class Linter: +class LinterCmd: def __init__(self, settings: Settings = None, verbose: bool = True) -> None: self.rules = Rules(settings=settings, verbose=verbose) + @staticmethod + def extend_parser(subparsers: argparse.ArgumentParser) -> argparse.ArgumentParser: + """Extends the CLI subparser with the options for LintCmd. + + Add 'lint' as a sub command along with its options and arguments + """ + parser_lint = subparsers.add_parser("lint", help="lint help") + parser_lint.add_argument( + "-s", + "--strict", + action="store_true", + help="return non-zero exit code on warnings as well as errors", + ) + parser_lint.add_argument("-f", "--files", action="append", help="files to lint") + parser_lint.add_argument( + "--output", + action="store", + help="output format: [stdout|json|md]", + default="stdout", + ) + return subparsers + def get_max_error_level(self, findings: list[LintFinding]) -> int: """Get max error level from list of findings.""" if len(findings) == 0: diff --git a/lint-workflow/tests/rules/test_name_capitalized.py b/lint-workflow/tests/rules/test_name_capitalized.py index bee36fae..12acf926 100644 --- a/lint-workflow/tests/rules/test_name_capitalized.py +++ b/lint-workflow/tests/rules/test_name_capitalized.py @@ -13,14 +13,10 @@ @pytest.fixture def correct_workflow(): - return WorkflowBuilder.build(f"{FIXTURE_DIR}/test-min.yaml") - - -@pytest.fixture -def incorrect_workflow_name(): workflow = """\ --- -name: test +name: Test Workflow + on: workflow_dispatch: @@ -36,10 +32,10 @@ def incorrect_workflow_name(): @pytest.fixture -def incorrect_job_name(): +def incorrect_workflow(): workflow = """\ --- -name: Test +name: test on: workflow_dispatch: @@ -48,31 +44,29 @@ def incorrect_job_name(): name: test runs-on: ubuntu-latest steps: - - name: Test + - name: test run: echo test """ return WorkflowBuilder.build(yaml=yaml.load(workflow), from_file=False) @pytest.fixture -def incorrect_step_name(): +def missing_name_workflow(): workflow = """\ --- -name: Test on: workflow_dispatch: jobs: job-key: - name: Test runs-on: ubuntu-latest steps: - - name: test - run: echo test + - run: echo test """ return WorkflowBuilder.build(yaml=yaml.load(workflow), from_file=False) + @pytest.fixture def rule(): return RuleNameCapitalized() @@ -89,16 +83,27 @@ def test_rule_on_correct_workflow(rule, correct_workflow): assert result == True -def test_rule_on_incorrect_workflow_name(rule, incorrect_workflow_name): - result, message = rule.fn(incorrect_workflow_name) +def test_rule_on_incorrect_workflow_name(rule, incorrect_workflow): + result, message = rule.fn(incorrect_workflow) assert result == False -def test_rule_on_incorrect_workflow_name(rule, incorrect_job_name): - result, message = rule.fn(incorrect_job_name.jobs["job-key"]) +def test_rule_on_incorrect_job_name(rule, incorrect_workflow): + result, message = rule.fn(incorrect_workflow.jobs["job-key"]) assert result == False -def test_rule_on_incorrect_workflow_name(rule, incorrect_step_name): - result, message = rule.fn(incorrect_step_name.jobs["job-key"].steps[0]) +def test_rule_on_incorrect_step_name(rule, incorrect_workflow): + result, message = rule.fn(incorrect_workflow.jobs["job-key"].steps[0]) assert result == False + + +def test_rule_on_missing_names(rule, missing_name_workflow): + result, message = rule.fn(missing_name_workflow) + assert result == True + + result, message = rule.fn(missing_name_workflow.jobs["job-key"]) + assert result == True + + result, message = rule.fn(missing_name_workflow.jobs["job-key"].steps[0]) + assert result == True diff --git a/lint-workflow/tests/rules/test_step_approved.py b/lint-workflow/tests/rules/test_step_approved.py index 2d3109b9..7f2d83a7 100644 --- a/lint-workflow/tests/rules/test_step_approved.py +++ b/lint-workflow/tests/rules/test_step_approved.py @@ -47,6 +47,12 @@ def correct_workflow(): - name: Test Bitwarden Action uses: bitwarden/gh-actions/get-keyvault-secrets@main + + - name: Test Local Action + uses: ./actions/test-action + + - name: Test Run Action + run: echo "test" """ return WorkflowBuilder.build(yaml=yaml.load(workflow), from_file=False) @@ -83,6 +89,12 @@ def test_rule_on_correct_workflow(rule, correct_workflow): result, message = rule.fn(correct_workflow.jobs["job-key"].steps[1]) assert result == True + result, message = rule.fn(correct_workflow.jobs["job-key"].steps[2]) + assert result == True + + result, message = rule.fn(correct_workflow.jobs["job-key"].steps[3]) + assert result == True + def test_rule_on_incorrect_workflow(rule, incorrect_workflow): result, message = rule.fn(incorrect_workflow.jobs["job-key"].steps[0]) @@ -92,3 +104,11 @@ def test_rule_on_incorrect_workflow(rule, incorrect_workflow): result, message = rule.fn(incorrect_workflow.jobs["job-key"].steps[1]) assert result == False assert "Action is out of date" in message + + +def test_fail_compatibility(rule, correct_workflow): + finding = rule.execute(correct_workflow) + assert "Workflow not compatible with" in finding.description + + finding = rule.execute(correct_workflow.jobs["job-key"]) + assert "Job not compatible with" in finding.description diff --git a/lint-workflow/tests/rules/test_step_pinned.py b/lint-workflow/tests/rules/test_step_pinned.py index 9206c74a..c917c769 100644 --- a/lint-workflow/tests/rules/test_step_pinned.py +++ b/lint-workflow/tests/rules/test_step_pinned.py @@ -27,6 +27,12 @@ def correct_workflow(): - name: Test Internal Action uses: bitwarden/gh-actions/get-keyvault-secrets@main + + - name: Test Local Action + uses: ./actions/test-action + + - name: Test Run Action + run: echo "test" """ return WorkflowBuilder.build(yaml=yaml.load(workflow), from_file=False) @@ -66,6 +72,12 @@ def test_rule_on_correct_workflow(rule, correct_workflow): result, message = rule.fn(correct_workflow.jobs["job-key"].steps[1]) assert result == True + result, message = rule.fn(correct_workflow.jobs["job-key"].steps[2]) + assert result == True + + result, message = rule.fn(correct_workflow.jobs["job-key"].steps[3]) + assert result == True + def test_rule_on_incorrect_workflow_external_branch(rule, incorrect_workflow): result, message = rule.fn(incorrect_workflow.jobs["job-key"].steps[0]) @@ -83,3 +95,11 @@ def test_rule_on_incorrect_workflow_internal_commit(rule, incorrect_workflow): result, message = rule.fn(incorrect_workflow.jobs["job-key"].steps[2]) assert result == False assert "Please pin to main" in message + + +def test_fail_compatibility(rule, correct_workflow): + finding = rule.execute(correct_workflow) + assert "Workflow not compatible with" in finding.description + + finding = rule.execute(correct_workflow.jobs["job-key"]) + assert "Job not compatible with" in finding.description diff --git a/lint-workflow/tests/test_load.py b/lint-workflow/tests/test_load.py index 7ed6317e..38131563 100644 --- a/lint-workflow/tests/test_load.py +++ b/lint-workflow/tests/test_load.py @@ -7,6 +7,8 @@ from .conftest import FIXTURE_DIR from .context import src +from src.utils import Settings + yaml = YAML() @@ -43,5 +45,4 @@ def test_load_workflow_from_file(workflow_filename: str) -> None: def test_load_workflow_from_yaml(workflow_yaml: CommentedMap) -> None: workflow = src.load.WorkflowBuilder.build(yaml=workflow_yaml, from_file=False) - assert type(workflow) == src.models.Workflow diff --git a/lint-workflow/tests/test_rule.py b/lint-workflow/tests/test_rule.py index 5fb5e782..4e43a8a1 100644 --- a/lint-workflow/tests/test_rule.py +++ b/lint-workflow/tests/test_rule.py @@ -67,6 +67,16 @@ def fn(self, obj: Union[Workflow, Job, Step]) -> bool: return obj.name is not None, self.message +class RuleException(Rule): + def __init__(self): + self.message = "should raise Exception" + self.on_fail = "error" + + def fn(self, obj: Union[Workflow, Job, Step]) -> bool: + raise Exception("test Exception") + return True, self.message + + @pytest.fixture def step_rule(): return RuleStep() @@ -77,6 +87,11 @@ def exists_rule(): return RuleNameExists() +@pytest.fixture +def exception_rule(): + return RuleException() + + def test_build_lint_message(step_rule, correct_workflow): assert step_rule.build_lint_message("test", correct_workflow) == "Workflow => test" @@ -119,3 +134,7 @@ def test_incorrect_rule_execution(exists_rule, incorrect_workflow): "name must exist" in exists_rule.execute(incorrect_workflow.jobs["job-key"].steps[0]).description ) + + +def test_exception_rule_execution(exception_rule, incorrect_workflow): + assert "failed to apply" in exception_rule.execute(incorrect_workflow).description diff --git a/lint-workflow/tests/test_utils.py b/lint-workflow/tests/test_utils.py index 87c6d5fa..e9b667c8 100644 --- a/lint-workflow/tests/test_utils.py +++ b/lint-workflow/tests/test_utils.py @@ -6,3 +6,35 @@ from .conftest import FIXTURE_DIR from .context import src + +from src.utils import Action + + +def test_action_eq(): + action_def = { + "name": "bitwarden/sm-action", + "version": "1.0.0", + "sha": "some-sha" + } + + action_a = Action(**action_def) + action_b = Action(**action_def) + + assert (action_a == action_b) == True + assert (action_a != action_b) == False + + +def test_action_ne(): + action_a = Action( + name = "bitwarden/sm-action", + version = "1.0.0", + sha = "some-sha" + ) + action_b = Action( + name = "bitwarden/sm-action", + version = "1.1.0", + sha = "some-other-sha" + ) + + assert (action_a == action_b) == False + assert (action_a != action_b) == True