Skip to content

Commit

Permalink
Fix GHAS data issues (#10)
Browse files Browse the repository at this point in the history
* Fix issues with GHAS scanning and add some columns that were missing from the output
  • Loading branch information
austimkelly authored Feb 6, 2024
1 parent f1aacc7 commit e704b87
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 23 deletions.
2 changes: 1 addition & 1 deletion example_report/report_202402021149.html
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ <h1>About Secret Synth</h1><p>Secret Synth is a meta-secret scanner solution tha
<td id="T_4fc0a_row7_col1" class="data row7 col1" >132</td>
</tr>
<tr>
<td id="T_4fc0a_row8_col0" class="data row8 col0" >Repos without GHAS Secrets Scanning Enabled</td>
<td id="T_4fc0a_row8_col0" class="data row8 col0" >Repos with GHAS Secrets Scanning Disabled</td>
<td id="T_4fc0a_row8_col1" class="data row8 col1" >0</td>
</tr>
<tr>
Expand Down
23 changes: 21 additions & 2 deletions org-scan/csv_coalesce.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,34 @@ def merge_csv_all_tools(keep_secrets,
row['repo_name'] = row.pop('repo', '')
row['file'] = row.pop('html_url', '')
row['line'] = row.pop('unavailable - see alert in Github', '')
row['secret'] = row.pop('unavailable - see alert in Github', '')
if not keep_secrets:
row['secret'] = hash_secret(row.pop('secret', ''))
else:
row['secret'] = row.pop('secret', '')
row['match'] = row.pop('unavailable - see alert in Github', '')
row['detector'] = row.pop('rule', '')
row['detector'] = row.pop('secret_type_display_name', '')

# only in ghas
row['ghas_number'] = row.pop('number', '')
row['ghas_rule'] = row.pop('rule', '')
row['ghas_state'] = row.pop('state', '')
row['ghas_created_at'] = row.pop('created_at', '')
row['ghas_updated_at'] = row.pop('updated_at', '')
row['ghas_url'] = row.pop('url', '')
row['ghas_html_url'] = row.pop('html_url', '')
row['ghas_locations_url'] = row.pop('locations_url', '')
row['ghas_secret_type'] = row.pop('secret_type', '')
row['ghas_secret_type_display_name'] = row.pop('secret_type_display_name', '')
row['ghas_validity'] = row.pop('validity', '')
row['ghas_resolution'] = row.pop('resolution', '')
row['ghas_resolved_by'] = row.pop('resolved_by', '')
row['ghas_resolved_at'] = row.pop('resolved_at', '')
row['ghas_resolution_comment'] = row.pop('resolution_comment', '')
row['ghas_push_protection_bypassed'] = row.pop('push_protection_bypassed', '')
row['ghas_push_protection_bypassed_by'] = row.pop('push_protection_bypassed_by', '')
row['ghas_push_protection_bypassed_at'] = row.pop('push_protection_bypassed_at', '')



writer.writerow(row)

Expand Down
67 changes: 54 additions & 13 deletions org-scan/ghas_secret_alerts_fetch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import requests
import csv

verbose_logging = True
verbose_logging = False

def fetch_repos(account_type, account, headers, logger=None, page=1, per_page=100):
repos = []
Expand All @@ -26,6 +26,7 @@ def fetch_repos(account_type, account, headers, logger=None, page=1, per_page=10

return repos

# Returns a list of repos where secret scanning is disabled (returns None on a dry run)
def fetch_ghas_secret_scanning_alerts(owner_type,
owners, headers,
report_name,
Expand All @@ -35,25 +36,42 @@ def fetch_ghas_secret_scanning_alerts(owner_type,
if dry_run:
print(f"dry-run: Calling Github REST API for all repos under orgs: {owners}")
return

# Open the CSV file
with open(report_name, 'w', newline='') as csvfile:
fieldnames = ['owner', 'repo', 'number', 'rule', 'state', 'created_at', 'html_url']
fieldnames = ['repo', 'rule', 'owner', 'number', 'created_at', 'updated_at', 'url', 'html_url', 'locations_url', 'state', 'secret_type',
'secret_type_display_name', 'secret', 'validity', 'resolution', 'resolved_by', 'resolved_at',
'resolution_comment', 'push_protection_bypassed', 'push_protection_bypassed_by',
'push_protection_bypassed_at']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
repos_without_secrets_scanning = []
repos_secret_scanning_disabled = []

writer.writeheader()
for owner in owners:
repos = fetch_repos(owner_type, owner, headers, logger=logger)
# For each repo, get the secret scanning alerts

# If verbose logging is enabled, print only the repo names and the total number of repos
if verbose_logging:
print(f"List of repos for {owner}: {', '.join([repo['name'] for repo in repos])}")
print(f"Total number of repos to get GHAS Security Alerts for {owner}: {len(repos)}")

for repo in repos:

# If verbose logging is enabled, print the repo name
if verbose_logging:
print(f"****************Getting GHAS Security Alerts for {owner}/{repo['name']}")

if verbose_logging:
print(f"Calling https://api.github.com/repos/{owner}/{repo['name']}/secret-scanning/alerts ...")

# https://docs.github.com/en/rest/secret-scanning/secret-scanning?apiVersion=2022-11-28#list-secret-scanning-alerts-for-a-repository
alerts_response = requests.get(f'https://api.github.com/repos/{owner}/{repo["name"]}/secret-scanning/alerts', headers=headers)
alerts = alerts_response.json()

# print alerts json response for each repo
# If verbose logging is enabled, print the alerts response for the repo
if verbose_logging:
print(f"Calling https://api.github.com/repos/{owner}/{repo['name']}/secret-scanning/alerts ...")
print(alerts)
print(f"Alerts for {owner}/{repo['name']}: {alerts}")

# Check if message contains {'message': 'Resource not accessible by personal access token'}
if isinstance(alerts, dict) and 'message' in alerts and alerts['message'] == 'Resource not accessible by personal access token':
Expand All @@ -64,21 +82,44 @@ def fetch_ghas_secret_scanning_alerts(owner_type,

# If the response is a dictionary with a 'message' key, skip this iteration
if isinstance(alerts, dict) and 'message' in alerts:
print(f"Skipping {repo['name']}: {alerts['message']}")
if verbose_logging:
print(f"Skipping {repo['name']}: {alerts['message']}")

if 'message' in alerts and alerts['message'] == 'Secret scanning is disabled on this repository.':
repos_without_secrets_scanning.append(repo)
repos_secret_scanning_disabled.append(repo)
if logger:
logger.info(f"GHAS Secret scanning is disabled on repository: {repo['name']}")
continue

# Write each alert to the CSV file
for alert in alerts:

# if verbose, print the row to be written to the CSV file
# if verbose_logging:
# print(f"Alert found: {alert}")

writer.writerow({
'owner': owner, # org or user
'repo': repo['name'],
'number': alert['number'],
'rule': alert['secret_type'], # Use 'secret_type' instead of 'rule'
'state': alert['state'],
'owner': owner, # org or user
'number': alert['number'],
'created_at': alert['created_at'],
'updated_at': alert['updated_at'],
'url': alert['url'],
'html_url': alert['html_url'],
'locations_url': alert['locations_url'],
'state': alert['state'],
'secret_type': alert['secret_type'],
'secret_type_display_name': alert['secret_type_display_name'],
'secret': alert['secret'],
'validity': alert['validity'],
'resolution': alert['resolution'],
'resolved_by': alert['resolved_by'],
'resolved_at': alert['resolved_at'],
'resolution_comment': alert['resolution_comment'],
'push_protection_bypassed': alert['push_protection_bypassed'],
'push_protection_bypassed_by': alert['push_protection_bypassed_by'],
'push_protection_bypassed_at': alert['push_protection_bypassed_at']
})

return repos_without_secrets_scanning
return repos_secret_scanning_disabled
2 changes: 1 addition & 1 deletion org-scan/html_report_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def output_to_html(metrics,
repo_level_summary_text = '<p>This section provides detailed metrics for each repository scanned. This just gives you an idea of the quantity of secrets discovered by each tool and the total number of secrets in the entire repository.</p>'
detector_summary_text = '<p>Every tool emits a detector type. The table below just gives you an aggregated view of the types of secrets that have been found and the magnitude of each. This does not indicate which tool found the secret.</p>'
report_links_summary_text = '<p>Here you can find the raw data of all the secrets in the merged_scan_results_report. The first few columns represent the generic information found among all tools. Any fields starting with np_, gl_, gh_, or th_ are specifics to those tools.</p>'
timing_metrics_summary_text = '<p>Total scan time for each tool and as a percentage of whole.</p>'
timing_metrics_summary_text = '<p>Total scan time for each tool and as a percentage of whole. GHAS Secrets is never included here since local scanning is not supported.</p>'

# Write the HTML to a file
with open(report_path, 'w') as f:
Expand Down
18 changes: 12 additions & 6 deletions org-scan/secretsynth.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@
REPORTS_DIR = f"./reports/reports_{timestamp}" # This is where aggregated results are saved
ERROR_LOG_FILE = f"./reports/reports_{timestamp}/error_log_{timestamp}.log" # This is where error messages are saved
checkout_dir = "./checkout"
headers = {"Authorization": f"token {TOKEN}"}
github_rest_headers = {
"Authorization": f"token {TOKEN}",
"X-GitHub-Api-Version": "2022-11-28",
"Accept": "application/vnd.github+json"
}

def check_commands():
commands = {
Expand Down Expand Up @@ -102,7 +106,7 @@ def check_commands():
exit(1)


def fetch_repos(account_type, account, headers, internal_type=False, page=1, per_page=100):
def fetch_repos(account_type, account, github_rest_headers, internal_type=False, page=1, per_page=100):

repos = []
while True:
Expand All @@ -114,7 +118,7 @@ def fetch_repos(account_type, account, headers, internal_type=False, page=1, per
print(f"dry-run: Calling {repos_url}...")
break;

response = requests.get(repos_url, headers=headers)
response = requests.get(repos_url, headers=github_rest_headers)
data = response.json()

if isinstance(data, dict) and "message" in data:
Expand Down Expand Up @@ -173,7 +177,7 @@ def analyze_merged_results(merged_results,

# Create a DataFrame with the metrics
metrics = pd.DataFrame({
'Metric': ['Time of Report', 'Arguments', 'Owners', 'Scanning Source Tools', 'Total Repos on Disk', 'Total Repos with Secrets', 'Total Secrets by Source', 'Total Secrets (all tools)', 'Repos without GHAS Secrets Scanning Enabled', 'Total Distinct Secrets', 'Secret Matches Count (Experimental)', 'Total Errors in Log'],
'Metric': ['Time of Report', 'Arguments', 'Owners', 'Scanning Source Tools', 'Total Repos on Disk', 'Total Repos with Secrets', 'Total Secrets by Source', 'Total Secrets (all tools)', 'Repos with GHAS Secrets Scanning Disabled', 'Total Distinct Secrets', 'Secret Matches Count (Experimental)', 'Total Errors in Log'],
'Value': [now, cmd_args, owners, distinct_sources, total_repos_on_disk, total_repos_with_secrets, total_secrets_by_source, total_secrets, repos_without_ghas_secrets_scanning, total_distinct_secrets, matches_line_count, err_line_count]
})

Expand Down Expand Up @@ -285,7 +289,7 @@ def count_top_level_dirs(directory):
url = f"https://api.github.com/{ORG_TYPE}/{owner}/repos"
print(f"Getting list of repositories from {url}...")

repos = fetch_repos(ORG_TYPE, owner, headers, INTERNAL_REPOS_FLAG,)
repos = fetch_repos(ORG_TYPE, owner, github_rest_headers, INTERNAL_REPOS_FLAG,)

# Check if the response is a dictionary containing an error message
if isinstance(repos, dict) and "message" in repos:
Expand Down Expand Up @@ -356,7 +360,7 @@ def count_top_level_dirs(directory):

ghas_secret_alerts_filename = f"{REPORTS_DIR}/ghas_secret_alerts_{timestamp}.csv"
if not SKIP_GHAS:
repos_without_ghas_secrets_enabled = fetch_ghas_secret_scanning_alerts(ORG_TYPE, OWNERS, headers, ghas_secret_alerts_filename, DRY_RUN, LOGGER)
repos_without_ghas_secrets_enabled = fetch_ghas_secret_scanning_alerts(ORG_TYPE, OWNERS, github_rest_headers, ghas_secret_alerts_filename, DRY_RUN, LOGGER)
else:
repos_without_ghas_secrets_enabled = None

Expand Down Expand Up @@ -387,6 +391,8 @@ def count_top_level_dirs(directory):
os.remove(trufflehog_report_filename)
if os.path.isfile(noseyparker_report_filename):
os.remove(noseyparker_report_filename)
if os.path.isfile(ghas_secret_alerts_filename):
os.remove(ghas_secret_alerts_filename)

# Aggregate report results
metrics, repo_metrics, detector_metrics = analyze_merged_results(merged_report_name, matches_report_name, ERROR_LOG_FILE, repos_without_ghas_secrets_enabled)
Expand Down
3 changes: 3 additions & 0 deletions org-scan/ss_unittests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,23 @@ def test_1_dry_run(self):
# Run the command and capture the output
result = subprocess.run(['python3', 'secretsynth.py', '--dry-run', '--owners', 'foo,bar', '--org-type', 'orgs'], capture_output=True)

print(result.stderr)
# Check that the command completed successfully
self.assertEqual(result.returncode, 0)

def test_2_invalid_args(self):
    """Run secretsynth.py with '--invalid-arg' and expect a non-zero exit code."""
    # Build the command separately so the invocation itself reads cleanly.
    command = ['python3', 'secretsynth.py', '--invalid-arg']
    completed = subprocess.run(command, capture_output=True)

    # Echo the captured stderr so a failing run can be diagnosed from the test output.
    print(completed.stderr)
    # The command is expected to fail.
    self.assertNotEqual(completed.returncode, 0)

def test_3_skip_all_scanners(self):
    """Run secretsynth.py with all four --skip-* flags and expect a zero exit code."""
    # Arguments are laid out one option per line; order matches the original call.
    command = [
        'python3', 'secretsynth.py',
        '--org-type', 'users',
        '--owners', 'swell-consulting',
        '--skip-ghas',
        '--skip-trufflehog',
        '--skip-gitleaks',
        '--skip-noseyparker',
    ]
    completed = subprocess.run(command, capture_output=True)

    # Echo the captured stderr so a failing run can be diagnosed from the test output.
    print(completed.stderr)
    # The command is expected to complete successfully.
    self.assertEqual(completed.returncode, 0)

Expand Down

0 comments on commit e704b87

Please sign in to comment.