Skip to content

Commit

Permalink
handle multiple paths (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
aerickson authored Jul 30, 2024
1 parent 8636f50 commit 677f4bb
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 290 deletions.
143 changes: 65 additions & 78 deletions tf_authoritative_scanner/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import argparse

VERSION = "1.0.1" # Define the version constant
VERSION = "1.0.2"


class TFAuthoritativeScanner:
Expand Down Expand Up @@ -38,8 +38,7 @@ class TFAuthoritativeScanner:

exception_comment_pattern = re.compile(r"#\s*terraform_authoritative_scanner_ok")

def __init__(self, file_or_directory, include_dotdirs, verbosity=0):
self.file_or_directory = file_or_directory
def __init__(self, include_dotdirs, verbosity=0):
self.include_dotdirs = include_dotdirs
self.verbosity = verbosity

Expand All @@ -49,7 +48,7 @@ def check_file_for_authoritative_resources(self, file_path):

authoritative_lines = []
excepted_lines = []
non_authoritative = True
authoritative = False
previous_line = ""
for line_number, line in enumerate(lines, start=1):
stripped_line = line.strip()
Expand All @@ -63,97 +62,90 @@ def check_file_for_authoritative_resources(self, file_path):
previous_line
):
authoritative_lines.append({"line_number": line_number, "line": stripped_line})
non_authoritative = False
authoritative = True
else:
excepted_lines.append({"line_number": line_number, "line": stripped_line})
previous_line = stripped_line

return {
# arrays of dicts
"file_path": file_path,
"authoritative": authoritative,
"authoritative_lines": authoritative_lines,
"excepted_lines": excepted_lines,
# boolean
"non_authoritative": non_authoritative,
}

def check_directory_for_authoritative_resources(self):
all_authoritative_lines = []
non_authoritative_files = []
all_excluded_lines = []
total_files = 0
for root, dirs, files in os.walk(self.file_or_directory):
def _scan_directory(self, directory):
for root, dirs, files in os.walk(directory):
if not self.include_dotdirs:
# Exclude directories starting with '.'
dirs[:] = [d for d in dirs if not d.startswith(".")]
for file in files:
if file.endswith(".tf"):
total_files += 1
file_path = os.path.join(root, file)
result = self.check_file_for_authoritative_resources(file_path)
authoritative_lines = result["authoritative_lines"]
non_authoritative = result["non_authoritative"]
excepted_lines = result["excepted_lines"]
if authoritative_lines:
all_authoritative_lines.append(
{"file_path": file_path, "authoritative_lines": authoritative_lines}
)
if excepted_lines:
all_excluded_lines.append({"file_path": file_path, "excepted_lines": excepted_lines})
if non_authoritative:
non_authoritative_files.append({"file_path": file_path})
yield os.path.join(root, file)

return {
# arrays of dicts
"all_excluded_lines": all_excluded_lines,
"all_authoritative_lines": all_authoritative_lines,
"non_authoritative_files": non_authoritative_files,
# integer
"total_files": total_files,
}
def check_paths_for_authoritative_resources(self, directory):
results = []
total_files = 0
for path in directory:
if os.path.isdir(path):
files = self._scan_directory(path)
else:
files = [path]

for file_path in files:
total_files += 1
file_entry = self.check_file_for_authoritative_resources(file_path)
results.append(file_entry)
return {"files_scanned": total_files, "results": results}

def run(self, paths):
total_files = 0
results = []
authoritative_files_found = 0

call_result = self.check_paths_for_authoritative_resources(paths)
results = call_result.get("results")
total_files = call_result.get("files_scanned")

for file_entry in results:
file_path = file_entry["file_path"]

def run(self):
if os.path.isdir(self.file_or_directory):
result = self.check_directory_for_authoritative_resources()
all_authoritative_lines = result["all_authoritative_lines"]
total_files = result["total_files"]
non_authoritative_files = result["non_authoritative_files"]
excluded_files = result["all_excluded_lines"]
# TODO: would be nicer to have a data structure keyed on file_path vs just put into different arrrays
# - ordering of output messages could be a bit odd (out of normal order)
if self.verbosity:
for item in excluded_files:
file_path = item["file_path"]
lines = item["excepted_lines"]
for item in lines:
if file_entry["authoritative"]:
authoritative_files_found += 1
if self.verbosity:
authoritative_lines = file_entry["authoritative_lines"]
for item in authoritative_lines:
line_number = item["line_number"]
line = item["line"]
print(f"EXCLUDED: {file_path}:{line_number}: {line}")
for item in non_authoritative_files:
file_path = item["file_path"]
print(f"OK: {file_path}")
if all_authoritative_lines:
for item in all_authoritative_lines:
file_path = item["file_path"]
lines = item["authoritative_lines"]
for item in lines:
print(f"AUTHORITATIVE: {file_path}:{line_number}: {line}")
elif file_entry["excepted_lines"]:
if self.verbosity:
excepted_lines = file_entry["excepted_lines"]
for item in excepted_lines:
line_number = item["line_number"]
line = item["line"]
print(f"AUTHORITATIVE: {file_path}:{line_number}: {line}")
authoritative_files = len(all_authoritative_lines)
print(f"FAIL: {authoritative_files} of {total_files} scanned files are authoritative.")
sys.exit(1)
print(f"EXCEPTED: {file_path}:{line_number}: {line}")
else:
authoritative_files = len(all_authoritative_lines)
print(f"PASS: {authoritative_files} of {total_files} scanned files are authoritative.")
sys.exit(0)
if self.verbosity:
print(f"OK: {file_path}")

if authoritative_files_found > 0:
print(f"FAIL: {authoritative_files_found} of {total_files} scanned files are authoritative.")
sys.exit(1)
else:
# a file was given
raise SystemExit("Error: File mode doesn't work yet.")
print(f"PASS: {authoritative_files_found} of {total_files} scanned files are authoritative.")
sys.exit(0)


def _verify_paths(paths):
for path in paths:
if not os.path.exists(path):
print(f"Error: The path '{path}' does not exist.")
sys.exit(1)


def main():
parser = argparse.ArgumentParser(description="Static analysis of Terraform files for authoritative GCP resources.")
parser.add_argument("file_or_directory", help="the Terraform file or directory to scan")
parser.add_argument("paths", metavar="path", type=str, nargs="+", help="File or directory to scan")
parser.add_argument(
"-i",
"--include-dotdirs",
Expand All @@ -175,14 +167,9 @@ def main():
)
args = parser.parse_args()

# check if args.directory is a file, if it is, set args.directory to the parent directory

if not os.path.exists(args.file_or_directory):
print(f"Error: The directory {args.file_or_directory} does not exist.", file=sys.stderr)
sys.exit(1)

scanner = TFAuthoritativeScanner(args.file_or_directory, args.include_dotdirs, args.verbose)
scanner.run()
_verify_paths(args.paths)
scanner = TFAuthoritativeScanner(args.include_dotdirs, args.verbose)
scanner.run(args.paths)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 677f4bb

Please sign in to comment.