diff --git a/tf_authoritative_scanner/scanner.py b/tf_authoritative_scanner/scanner.py index 3643879..5dc5c0d 100755 --- a/tf_authoritative_scanner/scanner.py +++ b/tf_authoritative_scanner/scanner.py @@ -5,7 +5,7 @@ import sys import argparse -VERSION = "1.0.1" # Define the version constant +VERSION = "1.0.2" class TFAuthoritativeScanner: @@ -38,8 +38,7 @@ class TFAuthoritativeScanner: exception_comment_pattern = re.compile(r"#\s*terraform_authoritative_scanner_ok") - def __init__(self, file_or_directory, include_dotdirs, verbosity=0): - self.file_or_directory = file_or_directory + def __init__(self, include_dotdirs, verbosity=0): self.include_dotdirs = include_dotdirs self.verbosity = verbosity @@ -49,7 +48,7 @@ def check_file_for_authoritative_resources(self, file_path): authoritative_lines = [] excepted_lines = [] - non_authoritative = True + authoritative = False previous_line = "" for line_number, line in enumerate(lines, start=1): stripped_line = line.strip() @@ -63,97 +62,90 @@ def check_file_for_authoritative_resources(self, file_path): previous_line ): authoritative_lines.append({"line_number": line_number, "line": stripped_line}) - non_authoritative = False + authoritative = True else: excepted_lines.append({"line_number": line_number, "line": stripped_line}) previous_line = stripped_line return { - # arrays of dicts + "file_path": file_path, + "authoritative": authoritative, "authoritative_lines": authoritative_lines, "excepted_lines": excepted_lines, - # boolean - "non_authoritative": non_authoritative, } - def check_directory_for_authoritative_resources(self): - all_authoritative_lines = [] - non_authoritative_files = [] - all_excluded_lines = [] - total_files = 0 - for root, dirs, files in os.walk(self.file_or_directory): + def _scan_directory(self, directory): + for root, dirs, files in os.walk(directory): if not self.include_dotdirs: - # Exclude directories starting with '.' dirs[:] = [d for d in dirs if not d.startswith(".")] for file in files: if file.endswith(".tf"): - total_files += 1 - file_path = os.path.join(root, file) - result = self.check_file_for_authoritative_resources(file_path) - authoritative_lines = result["authoritative_lines"] - non_authoritative = result["non_authoritative"] - excepted_lines = result["excepted_lines"] - if authoritative_lines: - all_authoritative_lines.append( - {"file_path": file_path, "authoritative_lines": authoritative_lines} - ) - if excepted_lines: - all_excluded_lines.append({"file_path": file_path, "excepted_lines": excepted_lines}) - if non_authoritative: - non_authoritative_files.append({"file_path": file_path}) + yield os.path.join(root, file) - return { - # arrays of dicts - "all_excluded_lines": all_excluded_lines, - "all_authoritative_lines": all_authoritative_lines, - "non_authoritative_files": non_authoritative_files, - # integer - "total_files": total_files, - } + def check_paths_for_authoritative_resources(self, directory): + results = [] + total_files = 0 + for path in directory: + if os.path.isdir(path): + files = self._scan_directory(path) + else: + files = [path] + + for file_path in files: + total_files += 1 + file_entry = self.check_file_for_authoritative_resources(file_path) + results.append(file_entry) + return {"files_scanned": total_files, "results": results} + + def run(self, paths): + total_files = 0 + results = [] + authoritative_files_found = 0 + + call_result = self.check_paths_for_authoritative_resources(paths) + results = call_result.get("results") + total_files = call_result.get("files_scanned") + + for file_entry in results: + file_path = file_entry["file_path"] - def run(self): - if os.path.isdir(self.file_or_directory): - result = self.check_directory_for_authoritative_resources() - all_authoritative_lines = result["all_authoritative_lines"] - total_files = result["total_files"] - non_authoritative_files = result["non_authoritative_files"] - excluded_files = result["all_excluded_lines"] - # TODO: would be nicer to have a data structure keyed on file_path vs just put into different arrrays - # - ordering of output messages could be a bit odd (out of normal order) - if self.verbosity: - for item in excluded_files: - file_path = item["file_path"] - lines = item["excepted_lines"] - for item in lines: + if file_entry["authoritative"]: + authoritative_files_found += 1 + if self.verbosity: + authoritative_lines = file_entry["authoritative_lines"] + for item in authoritative_lines: line_number = item["line_number"] line = item["line"] - print(f"EXCLUDED: {file_path}:{line_number}: {line}") - for item in non_authoritative_files: - file_path = item["file_path"] - print(f"OK: {file_path}") - if all_authoritative_lines: - for item in all_authoritative_lines: - file_path = item["file_path"] - lines = item["authoritative_lines"] - for item in lines: + print(f"AUTHORITATIVE: {file_path}:{line_number}: {line}") + elif file_entry["excepted_lines"]: + if self.verbosity: + excepted_lines = file_entry["excepted_lines"] + for item in excepted_lines: line_number = item["line_number"] line = item["line"] - print(f"AUTHORITATIVE: {file_path}:{line_number}: {line}") - authoritative_files = len(all_authoritative_lines) - print(f"FAIL: {authoritative_files} of {total_files} scanned files are authoritative.") - sys.exit(1) + print(f"EXCEPTED: {file_path}:{line_number}: {line}") else: - authoritative_files = len(all_authoritative_lines) - print(f"PASS: {authoritative_files} of {total_files} scanned files are authoritative.") - sys.exit(0) + if self.verbosity: + print(f"OK: {file_path}") + + if authoritative_files_found > 0: + print(f"FAIL: {authoritative_files_found} of {total_files} scanned files are authoritative.") + sys.exit(1) else: - # a file was given - raise SystemExit("Error: File mode doesn't work yet.") + print(f"PASS: {authoritative_files_found} of {total_files} scanned files are authoritative.") + sys.exit(0) + + +def _verify_paths(paths): + for path in paths: + if not os.path.exists(path): + print(f"Error: The path '{path}' does not exist.") + sys.exit(1) def main(): parser = argparse.ArgumentParser(description="Static analysis of Terraform files for authoritative GCP resources.") - parser.add_argument("file_or_directory", help="the Terraform file or directory to scan") + parser.add_argument("paths", metavar="path", type=str, nargs="+", help="File or directory to scan") parser.add_argument( "-i", "--include-dotdirs", @@ -175,14 +167,9 @@ def main(): ) args = parser.parse_args() - # check if args.directory is a file, if it is, set args.directory to the parent directory - - if not os.path.exists(args.file_or_directory): - print(f"Error: The directory {args.file_or_directory} does not exist.", file=sys.stderr) - sys.exit(1) - - scanner = TFAuthoritativeScanner(args.file_or_directory, args.include_dotdirs, args.verbose) - scanner.run() + _verify_paths(args.paths) + scanner = TFAuthoritativeScanner(args.include_dotdirs, args.verbose) + scanner.run(args.paths) if __name__ == "__main__": diff --git a/tf_authoritative_scanner/scanner_test.py b/tf_authoritative_scanner/scanner_test.py index 68939db..575b326 100644 --- a/tf_authoritative_scanner/scanner_test.py +++ b/tf_authoritative_scanner/scanner_test.py @@ -1,218 +1,204 @@ -import os import pytest +import os import tempfile import subprocess + from tf_authoritative_scanner.scanner import TFAuthoritativeScanner -@pytest.fixture -def temp_tf_file(): - with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: - temp_file.write(b""" - resource "google_project_iam_binding" "binding" { - project = "my-project" - role = "roles/viewer" - members = [ - "user:viewer@example.com", - ] - } - """) - temp_file.close() - yield temp_file.name - os.remove(temp_file.name) - - -@pytest.fixture -def temp_tf_dir(temp_tf_file): - temp_dir = tempfile.TemporaryDirectory() - tf_file_path = os.path.join(temp_dir.name, os.path.basename(temp_tf_file)) - with open(tf_file_path, "w") as f: - f.write(""" - resource "google_project_iam_binding" "binding" { - project = "my-project" - role = "roles/viewer" - members = [ - "user:viewer@example.com", - ] - } - - # terraform_authoritative_scanner_ok - resource "google_project_iam_binding" "binding" { - project = "my-project" - role = "roles/viewer" - members = [ - "user:viewer@example.com", - ] - } - """) - yield temp_dir.name - temp_dir.cleanup() - - -@pytest.fixture -def temp_tf_file_with_exception_same_line(): - with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: - temp_file.write(b""" - resource "google_project_iam_binding" "binding" { # terraform_authoritative_scanner_ok - project = "my-project" - role = "roles/viewer" - members = [ - "user:viewer@example.com", - ] - } - """) - temp_file.close() - yield temp_file.name - os.remove(temp_file.name) - - -@pytest.fixture -def temp_tf_file_with_exception_previous_line(): - with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: - temp_file.write(b""" - # terraform_authoritative_scanner_ok - resource "google_project_iam_binding" "binding" { - project = "my-project" - role = "roles/viewer" - members = [ - "user:viewer@example.com", - ] - } - """) - temp_file.close() - yield temp_file.name - os.remove(temp_file.name) - - -@pytest.fixture -def temp_tf_file_authoritative_resource_name_but_not_resource(): - with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: - temp_file.write(b""" - resource "non_authoritative" "google_deployment_accounts_compute_admin_google_project_iam_binding" { - project = "fxci-production-level3-workers" - role = "roles/compute.admin" - member = "serviceAccount:test3333" - } - """) - temp_file.close() - yield temp_file.name - os.remove(temp_file.name) - - -@pytest.fixture -def temp_non_authoritative_tf_file(): - with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: - temp_file.write(b""" - resource "google_compute_instance" "instance" { - name = "test-instance" - machine_type = "n1-standard-1" - zone = "us-central1-a" - } - """) - temp_file.close() - yield temp_file.name - os.remove(temp_file.name) - - -def test_check_file_for_authoritative_resources(temp_tf_file): - scanner = TFAuthoritativeScanner(temp_tf_file, include_dotdirs=False) - authoritative_lines = scanner.check_file_for_authoritative_resources(temp_tf_file) - assert len(authoritative_lines) > 0 - - -def test_check_directory_for_authoritative_resources(temp_tf_dir): - scanner = TFAuthoritativeScanner(temp_tf_dir, include_dotdirs=False) - result = scanner.check_directory_for_authoritative_resources() - all_authoritative_lines = result.get("all_authoritative_lines") - non_authoritative_files = result.get("non_authoritative_files") - excepted_lines = result.get("all_excluded_lines") - total_files = len(all_authoritative_lines) + len(non_authoritative_files) - - assert total_files == 1 - assert len(all_authoritative_lines) > 0 - assert len(excepted_lines) == 1 - - -def test_directory_not_exists(): - with pytest.raises(SystemExit): - scanner = TFAuthoritativeScanner("non_existent_directory", include_dotdirs=False) - scanner.run() - - -def test_run_verbosity(temp_tf_dir, capsys): - scanner = TFAuthoritativeScanner(temp_tf_dir, include_dotdirs=False, verbosity=1) - with pytest.raises(SystemExit): - scanner.run() - captured = capsys.readouterr() - assert "AUTHORITATIVE" in captured.out - - -def test_run_non_verbosity(temp_tf_dir, capsys): - scanner = TFAuthoritativeScanner(temp_tf_dir, include_dotdirs=False, verbosity=0) - with pytest.raises(SystemExit): - scanner.run() - captured = capsys.readouterr() - assert "1 of 1 scanned files are authoritative" in captured.out - - -def test_temp_tf_file_authoritative_resource_name_but_not_resource( - temp_tf_dir, temp_tf_file_authoritative_resource_name_but_not_resource -): - scanner = TFAuthoritativeScanner(temp_tf_dir, include_dotdirs=False) - result = scanner.check_file_for_authoritative_resources(temp_tf_file_authoritative_resource_name_but_not_resource) - authoritative_lines = result["authoritative_lines"] - excepted_lines = result["excepted_lines"] - # see 'Known Issues' in README.md - # - ideally this would be 0 authoritative lines and 1 excepted line - assert len(authoritative_lines) == 1 - assert len(excepted_lines) == 0 - - -def test_run_verbose_level_2(temp_non_authoritative_tf_file, capsys): - temp_dir = tempfile.TemporaryDirectory() - temp_file_path = os.path.join(temp_dir.name, os.path.basename(temp_non_authoritative_tf_file)) - with open(temp_file_path, "w") as f: - f.write(""" - resource "google_compute_instance" "instance" { - name = "test-instance" - machine_type = "n1-standard-1" - zone = "us-central1-a" - } - """) - - scanner = TFAuthoritativeScanner(temp_dir.name, include_dotdirs=False, verbosity=2) - with pytest.raises(SystemExit): - scanner.run() - captured = capsys.readouterr() - assert "OK: " in captured.out - - -def test_exception_comment_same_line(temp_tf_file_with_exception_same_line): - scanner = TFAuthoritativeScanner(temp_tf_file_with_exception_same_line, include_dotdirs=False) - result = scanner.check_file_for_authoritative_resources(temp_tf_file_with_exception_same_line) - authoritative_lines = result["authoritative_lines"] - excepted_lines = result["excepted_lines"] - assert len(authoritative_lines) == 0 - assert len(excepted_lines) == 1 - - -def test_exception_comment_previous_line(temp_tf_file_with_exception_previous_line): - scanner = TFAuthoritativeScanner(temp_tf_file_with_exception_previous_line, include_dotdirs=False) - result = scanner.check_file_for_authoritative_resources(temp_tf_file_with_exception_previous_line) - authoritative_lines = result["authoritative_lines"] - _non_authoritative = result["non_authoritative"] - excepted_lines = result["excepted_lines"] - assert len(authoritative_lines) == 0 - assert len(excepted_lines) == 1 - - -def test_main_function(temp_tf_dir): - result = subprocess.run(["tfas", temp_tf_dir], capture_output=True, text=True) - assert "1 of 1 scanned files are authoritative" in result.stdout - assert result.returncode == 1 - - -def test_main_function_invalid(): - result = subprocess.run(["tfas", "bad_path_xyy888"], capture_output=True, text=True) - # assert "1 of 1 scanned files are authoritative" in result.stdout - assert result.returncode == 1 +class TestTFAuthoritativeScanner: + @pytest.fixture + def scanner(self): + return TFAuthoritativeScanner(include_dotdirs=False, verbosity=1) + + @pytest.fixture + def mock_file(self, tmp_path): + file = tmp_path / "test.tf" + file.write_text('resource "google_project_iam_binding" "test" {}') + return file + + @pytest.fixture + def temp_tf_file(self): + with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: + temp_file.write(b""" + resource "google_project_iam_binding" "binding" { + project = "my-project" + role = "roles/viewer" + members = [ + "user:viewer@example.com", + ] + } + """) + temp_file.close() + yield temp_file.name + os.remove(temp_file.name) + + @pytest.fixture + def temp_tf_dir(self, temp_tf_file): + temp_dir = tempfile.TemporaryDirectory() + tf_file_path = os.path.join(temp_dir.name, os.path.basename(temp_tf_file)) + with open(tf_file_path, "w") as f: + f.write(""" + resource "google_project_iam_binding" "binding" { + project = "my-project" + role = "roles/viewer" + members = [ + "user:viewer@example.com", + ] + } + + # terraform_authoritative_scanner_ok + resource "google_project_iam_binding" "binding" { + project = "my-project" + role = "roles/viewer" + members = [ + "user:viewer@example.com", + ] + } + """) + yield temp_dir.name + temp_dir.cleanup() + + @pytest.fixture + def temp_tf_file_with_exception_same_line(self): + with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: + temp_file.write(b""" + resource "google_project_iam_binding" "binding" { # terraform_authoritative_scanner_ok + project = "my-project" + role = "roles/viewer" + members = [ + "user:viewer@example.com", + ] + } + """) + temp_file.close() + yield temp_file.name + os.remove(temp_file.name) + + @pytest.fixture + def temp_tf_file_with_exception_previous_line(self): + with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: + temp_file.write(b""" + # terraform_authoritative_scanner_ok + resource "google_project_iam_binding" "binding" { + project = "my-project" + role = "roles/viewer" + members = [ + "user:viewer@example.com", + ] + } + """) + temp_file.close() + yield temp_file.name + os.remove(temp_file.name) + + @pytest.fixture + def temp_tf_file_authoritative_resource_name_but_not_resource(self): + with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: + temp_file.write(b""" + resource "non_authoritative" "google_deployment_accounts_compute_admin_google_project_iam_binding" { + project = "fxci-production-level3-workers" + role = "roles/compute.admin" + member = "serviceAccount:test3333" + } + """) + temp_file.close() + yield temp_file.name + os.remove(temp_file.name) + + @pytest.fixture + def temp_non_authoritative_tf_file(self): + with tempfile.NamedTemporaryFile(suffix=".tf", delete=False) as temp_file: + temp_file.write(b""" + resource "google_compute_instance" "instance" { + name = "test-instance" + machine_type = "n1-standard-1" + zone = "us-central1-a" + } + """) + temp_file.close() + yield temp_file.name + os.remove(temp_file.name) + + def test_initialization(self, scanner): + assert not scanner.include_dotdirs + assert scanner.verbosity == 1 + + def test_check_file_for_authoritative_resources(self, scanner, mock_file): + result = scanner.check_file_for_authoritative_resources(mock_file) + assert result["authoritative"] + + def test_check_file_for_authoritative_resources_with_exception(self, scanner, tmp_path): + file = tmp_path / "test_exception.tf" + file.write_text('# terraform_authoritative_scanner_ok\nresource "google_project_iam_binding" "test" {}') + result = scanner.check_file_for_authoritative_resources(file) + assert len(result["excepted_lines"]) == 1 + + def test_check_directory_fail(self, scanner, temp_tf_dir): + r = scanner.check_paths_for_authoritative_resources([temp_tf_dir]) + assert r["files_scanned"] == 1 + assert len(r["results"]) == 1 + assert r["results"][0]["authoritative"] + assert len(r["results"][0]["authoritative_lines"]) == 1 + assert len(r["results"][0]["excepted_lines"]) == 1 + + def test_check_directory_ok(self, scanner, temp_non_authoritative_tf_file): + r = scanner.check_paths_for_authoritative_resources([temp_non_authoritative_tf_file]) + assert r["files_scanned"] == 1 + assert len(r["results"]) == 1 + assert not r["results"][0]["authoritative"] + assert len(r["results"][0]["authoritative_lines"]) == 0 + assert len(r["results"][0]["excepted_lines"]) == 0 + + def test_check_known_issue(self, scanner, temp_tf_file_authoritative_resource_name_but_not_resource): + # see 'Known Issues' in README.md + # - ideally this would be 0 authoritative lines and 1 excepted line + r = scanner.check_paths_for_authoritative_resources([temp_tf_file_authoritative_resource_name_but_not_resource]) + assert r["files_scanned"] == 1 + assert len(r["results"][0]["authoritative_lines"]) == 1 + assert len(r["results"][0]["excepted_lines"]) == 0 + + def test_check_exclude_comment_inline(self, scanner, temp_tf_file_with_exception_same_line): + r = scanner.check_paths_for_authoritative_resources([temp_tf_file_with_exception_same_line]) + assert r["files_scanned"] == 1 + assert len(r["results"][0]["authoritative_lines"]) == 0 + assert len(r["results"][0]["excepted_lines"]) == 1 + + def test_check_exclude_comment_previous_line(self, scanner, temp_tf_file_with_exception_previous_line): + r = scanner.check_paths_for_authoritative_resources([temp_tf_file_with_exception_previous_line]) + assert r["files_scanned"] == 1 + assert len(r["results"][0]["authoritative_lines"]) == 0 + assert len(r["results"][0]["excepted_lines"]) == 1 + + # main tests + + def test_main_function(self, temp_tf_dir): + result = subprocess.run(["tfas", temp_tf_dir], capture_output=True, text=True) + assert "1 of 1 scanned files are authoritative" in result.stdout + assert result.returncode == 1 + + def test_main_function_invalid(self): + result = subprocess.run(["tfas", "bad_path_xyy888"], capture_output=True, text=True) + assert result.returncode == 1 + + def test_main_directory_fail(self, temp_tf_dir): + result = subprocess.run(["tfas", temp_tf_dir], capture_output=True, text=True) + assert result.stderr == "" + assert result.stdout == "FAIL: 1 of 1 scanned files are authoritative.\n" + assert result.returncode == 1 + + def test_main_directory_ok(self, temp_non_authoritative_tf_file): + result = subprocess.run(["tfas", "-v", temp_non_authoritative_tf_file], capture_output=True, text=True) + assert result.stderr == "" + assert "PASS: 0 of 1 scanned files are authoritative.\n" in result.stdout + assert result.returncode == 0 + + def test_main_directory_verbose(self, temp_tf_dir): + result = subprocess.run(["tfas", "-v", temp_tf_dir], capture_output=True, text=True) + assert result.stderr == "" + assert result.returncode == 1 + + def test_main_directory_exception(self, temp_tf_file_with_exception_same_line): + result = subprocess.run(["tfas", "-v", temp_tf_file_with_exception_same_line], capture_output=True, text=True) + assert result.stderr == "" + assert result.returncode == 0