From 5989b8963edfed36f8c824a16861e508b9b0c0a9 Mon Sep 17 00:00:00 2001
From: arng4108
Date: Fri, 3 Jan 2025 15:46:33 -0500
Subject: [PATCH] Added retry statements and improved logging for ASM sync code

---
 src/pe_asm/data/cyhy_db_query.py            | 14 ++++--
 .../helpers/enumerate_subs_from_root.py     | 17 ++++++--
 .../helpers/fill_cidrs_from_cyhy_assets.py  |  2 +-
 src/pe_asm/helpers/get_cyhy_assets.py       | 10 +++++
 .../helpers/link_subs_and_ips_from_ips.py   | 43 ++++++++++---------
 .../helpers/link_subs_and_ips_from_subs.py  |  8 ++--
 src/pe_asm/helpers/shodan_dedupe.py         | 12 +++---
 7 files changed, 67 insertions(+), 39 deletions(-)

diff --git a/src/pe_asm/data/cyhy_db_query.py b/src/pe_asm/data/cyhy_db_query.py
index 529dfe7d..de4cce10 100644
--- a/src/pe_asm/data/cyhy_db_query.py
+++ b/src/pe_asm/data/cyhy_db_query.py
@@ -234,7 +234,7 @@ def insert_sectors(conn, sectors_list):
     password = db_password_key()
     for sector in sectors_list:
         try:
-            print(sector)
+            # print(sector)
            cur = conn.cursor()
            sql = """
            INSERT INTO sectors(id, acronym, name, email, contact_name, retired, first_seen, last_seen, password) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, PGP_SYM_ENCRYPT(%s, %s))
@@ -342,6 +342,7 @@ def query_pe_report_on_orgs(conn):
     SELECT organizations_uid, cyhy_db_name, name, agency_type
     FROM organizations o
     WHERE report_on = True
+    ORDER BY cyhy_db_name ASC
     """
     # Option 2: Run on orgs where run_scans/fceb/fceb_child/demo are true (excluding the 142 PE orgs)
     # sql = """
@@ -351,6 +352,7 @@
     # o.report_on = False
     # AND
     # (o.run_scans OR o.fceb OR o.fceb_child OR o.demo)
+    # ORDER BY cyhy_db_name ASC
     # """

     df = pd.read_sql(sql, conn)
@@ -489,7 +491,7 @@ def execute_ips(conn, df):
         cursor = conn.cursor()
         extras.execute_values(cursor, sql.format(table, cols), tpls, page_size=100000)
         conn.commit()
-        LOGGER.info("%s new IPs successfully upserted into ip table...", len(df))
+        # LOGGER.info("%s new IPs successfully upserted into ip table...", len(df))  # reducing logging
     except (Exception, psycopg2.DatabaseError) as err:
         # Show error and close connection if failed
         LOGGER.error("There was a problem with your database query %s", err)
@@ -519,6 +521,8 @@ def query_roots(conn):
     o.report_on = True
     AND
     r.enumerate_subs = True
+    ORDER BY
+    cyhy_db_name ASC
     """
     # Option 2: Run on orgs where run_scans/fceb/fceb_child/demo are true (excluding the 142 PE orgs)
     # sql = """
@@ -536,6 +540,8 @@
     # (o.run_scans OR o.fceb OR o.fceb_child OR o.demo)
     # AND
     # r.enumerate_subs = True
+    # ORDER BY
+    # cyhy_db_name ASC
     # """

     df = pd.read_sql(sql, conn)
@@ -548,7 +554,7 @@ def insert_sub_domains(conn, df):
     try:
         # Execute insert query
         df = df.drop_duplicates()
-        df["current"] = True
+        df.insert(len(df.columns), "current", True)
        tpls = [tuple(x) for x in df.to_numpy()]
        cols = ",".join(list(df.columns))
        table = "sub_domains"
@@ -623,7 +629,7 @@ def update_shodan_ips(conn, df):
     try:
         extras.execute_values(cursor, sql.format(table, cols), tpls)
         conn.commit()
-        print("Data inserted using execute_values() successfully..")
+        print("Shodan data inserted using execute_values() successfully.")
     except (Exception, psycopg2.DatabaseError) as err:
         show_psycopg2_exception(err)
         cursor.close()
diff --git a/src/pe_asm/helpers/enumerate_subs_from_root.py b/src/pe_asm/helpers/enumerate_subs_from_root.py
index 2a4e5967..be215aee 100644
--- a/src/pe_asm/helpers/enumerate_subs_from_root.py
+++ b/src/pe_asm/helpers/enumerate_subs_from_root.py
@@ -3,6 +3,7 @@
 import datetime
 import json
 import logging
+import time

 # Third-Party Libraries
 import pandas as pd
@@ -36,10 +37,18 @@ def enumerate_roots(root_domain, root_uid):
     )
     headers = {"Content-Type": "application/json"}
     response = requests.request("POST", url, headers=headers, data=payload)
+
+    # Retry clause
+    retry_count, max_retries, time_delay = 1, 10, 5
+    while response.status_code != 200 and retry_count <= max_retries:
+        LOGGER.warning(f"Retrying WhoisXML API endpoint (code {response.status_code}), attempt {retry_count} of {max_retries} (url: {url})")
+        time.sleep(time_delay)
+        response = requests.request("POST", url, headers=headers, data=payload)
+        retry_count += 1
+
     data = response.json()
     sub_domains = data["domainsList"]
-    print(len(sub_domains))
-
+    # print(len(sub_domains))
     data_source = get_data_source_uid("WhoisXML")

     # First add the root domain to the subs table
@@ -87,7 +96,7 @@ def get_subdomains(staging=False, orgs_df=None):
         roots_df = sqs_query_roots(conn, orgs_df["organizations_uid"][0])

     total_roots = len(roots_df.index)
-    LOGGER.info("Got %d root domains.", total_roots)
+    LOGGER.info("Found %d root domains", total_roots)

     # Loop through roots
     count = 0
@@ -104,7 +113,7 @@

         count += 1
         if count % 10 == 0 or count == total_roots:
-            LOGGER.info("\t\t%d/%d complete.", count, total_roots)
+            LOGGER.info("\t\t%d/%d roots enumerated", count, total_roots)

     # Close database connection
     conn.close()
diff --git a/src/pe_asm/helpers/fill_cidrs_from_cyhy_assets.py b/src/pe_asm/helpers/fill_cidrs_from_cyhy_assets.py
index 42e1fa86..0b67c679 100644
--- a/src/pe_asm/helpers/fill_cidrs_from_cyhy_assets.py
+++ b/src/pe_asm/helpers/fill_cidrs_from_cyhy_assets.py
@@ -49,7 +49,7 @@ def fill_cidrs(staging, orgs):
                     (network["network"], org_id, "cyhy_db", first_seen, last_seen),
                 )
             except Exception as e:
-                print(e)
+                LOGGER.error(e)
                continue
     conn.commit()
     cur.close()
diff --git a/src/pe_asm/helpers/get_cyhy_assets.py b/src/pe_asm/helpers/get_cyhy_assets.py
index 169af32f..5ec59895 100644
--- a/src/pe_asm/helpers/get_cyhy_assets.py
+++ b/src/pe_asm/helpers/get_cyhy_assets.py
@@ -4,6 +4,7 @@
 # Standard Python Libraries
 import datetime
 import logging
+import time

 # Third-Party Libraries
 from bs4 import BeautifulSoup
@@ -37,6 +38,15 @@ def dotgov_domains():
     """Get list of dotgov domains from the github repo."""
     URL = "https://github.com/cisagov/dotgov-data/blob/main/current-federal.csv"
     r = requests.get(URL)
+
+    # Retry clause
+    retry_count, max_retries, time_delay = 1, 10, 5
+    while r.status_code != 200 and retry_count <= max_retries:
+        LOGGER.warning(f"Retrying GitHub dotgov repo (code {r.status_code}), attempt {retry_count} of {max_retries} (url: {URL})")
+        time.sleep(time_delay)
+        r = requests.get(URL)
+        retry_count += 1
+
     soup = BeautifulSoup(r.content, features="lxml")
     table = soup.find_all("table")
     df = pd.read_html(str(table))[0]
diff --git a/src/pe_asm/helpers/link_subs_and_ips_from_ips.py b/src/pe_asm/helpers/link_subs_and_ips_from_ips.py
index 8ccea3b9..94d9e1bb 100644
--- a/src/pe_asm/helpers/link_subs_and_ips_from_ips.py
+++ b/src/pe_asm/helpers/link_subs_and_ips_from_ips.py
@@ -33,15 +33,21 @@ def reverseLookup(ip_obj, failed_ips, conn, thread):
     url = f"https://dns-history.whoisxmlapi.com/api/v1?apiKey={WHOIS_KEY}&ip={ip_obj['ip']}"
     payload = {}
     headers = {}
-    response = requests.request("GET", url, headers=headers, data=payload).json()
-    if response.get("code") == 429:
-        response = requests.request("GET", url, headers=headers, data=payload).json()
-        if response.get("code") == 429:
-            response = requests.request(
-                "GET", url, headers=headers, data=payload
-            ).json()
-            if response.get("code") == 429:
-                failed_ips.append(ip_obj["ip"])
+    response = requests.request("GET", url, headers=headers, data=payload)
+
+    # Retry clause
+    retry_count, max_retries, time_delay = 1, 3, 3
+    while response.status_code != 200 and retry_count <= max_retries:
+        LOGGER.warning(f"Retrying WhoisXML API endpoint (code {response.status_code}), attempt {retry_count} of {max_retries} (url: {url})")
+        time.sleep(time_delay)
+        response = requests.request("GET", url, headers=headers, data=payload)
+        retry_count += 1
+    # If API call still unsuccessful
+    if response.status_code != 200:
+        bad_ip = ip_obj["ip"]
+        LOGGER.error(f"Max retries reached for {bad_ip}, labeling as failed")
+        failed_ips.append(bad_ip)
+    response = response.json()

     found_domains = []
     try:
@@ -52,7 +58,7 @@

         result = response["result"]
         for domain in result:
-            print(domain)
+            # print(domain)
             try:
                 found_domains.append(
                     {
@@ -90,8 +96,8 @@ def link_domain_from_ip(ip_obj, org_uid, data_source, failed_ips, conn, thread):
         ),
     )
     row = cur.fetchone()
-    print("Row after procedure")
-    print(row)
+    # print("Row after procedure")
+    # print(row)
    conn.commit()
    cur.close()
    return found_domains
@@ -100,17 +106,14 @@ def run_ip_chunk(org_uid, ips_df, thread, conn):
     """Run the provided chunk through the linking process."""
     count = 0
-    last_100 = time.time()
+    last_chunk = time.time()
     failed_ips = []

     for ip_index, ip in ips_df.iterrows():
-        # Set up logging for every 100 IPs
+        # Log progress
        count += 1
        if count % 10000 == 0:
-            LOGGER.info(f"{thread}: Currently Running ips: {count}/{len(ips_df)}")
-            LOGGER.info(
-                f"{thread}: {time.time() - last_100} seconds for the last 50 IPs"
-            )
-            last_100 = time.time()
+            LOGGER.info(f"{thread}: Running IPs: {count}/{len(ips_df)}, {time.time() - last_chunk} seconds for the last IP chunk")
+            last_chunk = time.time()

        # Link domain from IP
        try:
@@ -147,7 +150,7 @@ def connect_subs_from_ips(staging, orgs_df=None):
        else:
            conn = pe_db_connect()
        LOGGER.info(
-            "Running on %s. %d/%d complete.", org["cyhy_db_name"], org_count, num_orgs
+            "Running on %s, %d/%d", org["cyhy_db_name"], org_count, num_orgs
        )
        # Query IPs
        org_uid = org["organizations_uid"]
diff --git a/src/pe_asm/helpers/link_subs_and_ips_from_subs.py b/src/pe_asm/helpers/link_subs_and_ips_from_subs.py
index 5fba04f2..cdf02f18 100644
--- a/src/pe_asm/helpers/link_subs_and_ips_from_subs.py
+++ b/src/pe_asm/helpers/link_subs_and_ips_from_subs.py
@@ -43,7 +43,7 @@ def link_ip_from_domain(sub, root_uid, org_uid, data_source, conn):
         (DATE, ip_hash, ip, org_uid, sub, data_source, root_uid, None),
     )
     row = cur.fetchone()
-    print(row)
+    # print(row)
    conn.commit()
    cur.close()
    return 1
@@ -66,7 +66,7 @@ def connect_ips_from_subs(staging, orgs_df=None):
     conn.close()

     # Loop through orgs
-    org_count = 0
+    org_count = 1
     for org_index, org_row in orgs_df.iterrows():
         # Connect to database
         if staging:
@@ -74,13 +74,13 @@
             conn = pe_db_staging_connect()
         else:
             conn = pe_db_connect()
         LOGGER.info(
-            "Running on %s. %d/%d complete.",
+            "Running on %s, %d/%d",
            org_row["cyhy_db_name"],
            org_count,
            num_orgs,
        )
        org_uid = org_row["organizations_uid"]
-        print(org_uid)
+        # print(org_uid)
        # Query sub-domains
        subs_df = query_subs(str(org_uid), conn)
diff --git a/src/pe_asm/helpers/shodan_dedupe.py b/src/pe_asm/helpers/shodan_dedupe.py
index 8a0efa55..d1195ce0 100644
--- a/src/pe_asm/helpers/shodan_dedupe.py
+++ b/src/pe_asm/helpers/shodan_dedupe.py
@@ -185,7 +185,7 @@ def ip_dedupe(api, ips, agency_type, conn):
         try:
             hosts = api.host(ips[i * 100 : (i + 1) * 100])
         except shodan.APIError as err:
-            print("Error: {}".format(err))
+            LOGGER.error(f"Error: {err}")
            continue
        if isinstance(hosts, list):
            for h in hosts:
@@ -324,7 +324,7 @@ def dedupe(staging, orgs_df=None):
     api = shodan_api_init()[0]

     # Loop through orgs
-    org_count = 0
+    org_count = 1
     for org_index, org in orgs_df.iterrows():
         # Connect to database
         if staging:
@@ -332,23 +332,23 @@
             conn = pe_db_staging_connect()
         else:
             conn = pe_db_connect()
         LOGGER.info(
-            "Running on %s. %d/%d complete.",
+            "Running on %s, %d/%d",
            org["cyhy_db_name"],
            org_count,
            num_orgs,
        )

        # Query CIDRS
        cidrs = query_cidrs_by_org(conn, org["organizations_uid"])
-        LOGGER.info(f"{len(cidrs)} cidrs found")
+        LOGGER.info(f"{len(cidrs)} CIDRs found")
        # Run cidr dedupe if there are CIDRs
        if len(cidrs) > 0:
            cidr_dedupe(cidrs, api, org["agency_type"], conn)
        # Get IPs related to current sub-domains
-        LOGGER.info("Grabbing floating IPs")
+        LOGGER.info("Retrieving floating IPs")
        ips = query_floating_ips(conn, org["organizations_uid"])
-        LOGGER.info("Got Ips")
+        LOGGER.info("Floating IPs retrieved")
        if len(ips) > 0:
            LOGGER.info("Running dedupe on IPs")
            ip_dedupe(api, ips, org["agency_type"], conn)