Added retry statements and improved logging for ASM sync code
arng4108 committed Jan 3, 2025
1 parent defc8fc commit 5989b89
Showing 7 changed files with 67 additions and 39 deletions.
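
The retry logic added in this commit follows one pattern throughout: reissue the HTTP request whenever the status code is not 200, sleep a fixed delay between attempts, and stop after a maximum number of retries. Below is a minimal sketch of that pattern as a standalone helper; the request_with_retry name and signature are illustrative only and are not part of the committed code, which inlines the loop at each call site (max_retries/time_delay of 10/5 for the WhoisXML and dotgov fetches, 3/3 for the reverse-lookup calls).

# Sketch of the shared retry pattern (assumed helper; not in the commit).
import logging
import time

import requests

LOGGER = logging.getLogger(__name__)


def request_with_retry(method, url, max_retries=10, time_delay=5, **kwargs):
    """Issue an HTTP request, retrying on any non-200 response with a fixed delay."""
    response = requests.request(method, url, **kwargs)
    retry_count = 1
    while response.status_code != 200 and retry_count <= max_retries:
        LOGGER.warning(
            "Retrying %s (code %s), attempt %d of %d",
            url,
            response.status_code,
            retry_count,
            max_retries,
        )
        time.sleep(time_delay)
        response = requests.request(method, url, **kwargs)
        retry_count += 1
    return response
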
14 changes: 10 additions & 4 deletions src/pe_asm/data/cyhy_db_query.py
@@ -234,7 +234,7 @@ def insert_sectors(conn, sectors_list):
password = db_password_key()
for sector in sectors_list:
try:
print(sector)
# print(sector)
cur = conn.cursor()
sql = """
INSERT INTO sectors(id, acronym, name, email, contact_name, retired, first_seen, last_seen, password) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, PGP_SYM_ENCRYPT(%s, %s))
@@ -342,6 +342,7 @@ def query_pe_report_on_orgs(conn):
SELECT organizations_uid, cyhy_db_name, name, agency_type
FROM organizations o
WHERE report_on = True
ORDER BY cyhy_db_name ASC
"""
# Option 2: Run on orgs where run_scans/fceb/fceb_child/demo are true (excluding the 142 PE orgs)
# sql = """
@@ -351,6 +352,7 @@ def query_pe_report_on_orgs(conn):
# o.report_on = False
# AND
# (o.run_scans OR o.fceb OR o.fceb_child OR o.demo)
# ORDER BY cyhy_db_name ASC
# """

df = pd.read_sql(sql, conn)
@@ -489,7 +491,7 @@ def execute_ips(conn, df):
cursor = conn.cursor()
extras.execute_values(cursor, sql.format(table, cols), tpls, page_size=100000)
conn.commit()
LOGGER.info("%s new IPs successfully upserted into ip table...", len(df))
# LOGGER.info("%s new IPs successfully upserted into ip table...", len(df)) # reducing logging
except (Exception, psycopg2.DatabaseError) as err:
# Show error and close connection if failed
LOGGER.error("There was a problem with your database query %s", err)
@@ -519,6 +521,8 @@ def query_roots(conn):
o.report_on = True
AND
r.enumerate_subs = True
ORDER BY
cyhy_db_name ASC
"""
# Option 2: Run on orgs where run_scans/fceb/fceb_child/demo are true (excluding the 142 PE orgs)
# sql = """
@@ -536,6 +540,8 @@
# (o.run_scans OR o.fceb OR o.fceb_child OR o.demo)
# AND
# r.enumerate_subs = True
# ORDER BY
# cyhy_db_name ASC
# """

df = pd.read_sql(sql, conn)
@@ -548,7 +554,7 @@ def insert_sub_domains(conn, df):
try:
# Execute insert query
df = df.drop_duplicates()
df["current"] = True
df.insert(len(df.columns), "current", True)
tpls = [tuple(x) for x in df.to_numpy()]
cols = ",".join(list(df.columns))
table = "sub_domains"
@@ -623,7 +629,7 @@ def update_shodan_ips(conn, df):
try:
extras.execute_values(cursor, sql.format(table, cols), tpls)
conn.commit()
print("Data inserted using execute_values() successfully..")
print("Shodan data inserted using execute_values() successfully..")
except (Exception, psycopg2.DatabaseError) as err:
show_psycopg2_exception(err)
cursor.close()
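
One non-logging change in this file is the switch from df["current"] = True to df.insert(len(df.columns), "current", True) in insert_sub_domains. Both forms add a constant "current" column; .insert() places it at an explicit position and raises if the column already exists rather than silently overwriting — the commit does not state the motivation, so this is only a plausible reading. A small illustrative comparison (the sample data is made up):

import pandas as pd

df = pd.DataFrame({"sub_domain": ["a.example.gov", "b.example.gov"]})

# Direct assignment would also add the column:
# df["current"] = True

# DataFrame.insert appends the column at an explicit position, in place,
# and raises ValueError if "current" is already present.
df.insert(len(df.columns), "current", True)
print(df)
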
17 changes: 13 additions & 4 deletions src/pe_asm/helpers/enumerate_subs_from_root.py
@@ -3,6 +3,7 @@
import datetime
import json
import logging
import time

# Third-Party Libraries
import pandas as pd
@@ -36,10 +37,18 @@ def enumerate_roots(root_domain, root_uid):
)
headers = {"Content-Type": "application/json"}
response = requests.request("POST", url, headers=headers, data=payload)

# Retry clause
retry_count, max_retries, time_delay = 1, 10, 5
while response.status_code != 200 and retry_count <= max_retries:
LOGGER.warning(f"Retrying WhoisXML API endpoint (code {response.status_code}), attempt {retry_count} of {max_retries} (url: {url})")
time.sleep(time_delay)
response = requests.request("POST", url, headers=headers, data=payload)
retry_count += 1

data = response.json()
sub_domains = data["domainsList"]
print(len(sub_domains))

# print(len(sub_domains))
data_source = get_data_source_uid("WhoisXML")

# First add the root domain to the subs table
@@ -87,7 +96,7 @@ def get_subdomains(staging=False, orgs_df=None):
roots_df = sqs_query_roots(conn, orgs_df["organizations_uid"][0])

total_roots = len(roots_df.index)
LOGGER.info("Got %d root domains.", total_roots)
LOGGER.info("Found %d root domains", total_roots)

# Loop through roots
count = 0
@@ -104,7 +113,7 @@ def get_subdomains(staging=False, orgs_df=None):

count += 1
if count % 10 == 0 or count == total_roots:
LOGGER.info("\t\t%d/%d complete.", count, total_roots)
LOGGER.info("\t\t%d/%d roots enumerated", count, total_roots)

# Close database connection
conn.close()
2 changes: 1 addition & 1 deletion src/pe_asm/helpers/fill_cidrs_from_cyhy_assets.py
@@ -49,7 +49,7 @@ def fill_cidrs(staging, orgs):
(network["network"], org_id, "cyhy_db", first_seen, last_seen),
)
except Exception as e:
print(e)
LOGGER.error(e)
continue
conn.commit()
cur.close()
10 changes: 10 additions & 0 deletions src/pe_asm/helpers/get_cyhy_assets.py
@@ -4,6 +4,7 @@
# Standard Python Libraries
import datetime
import logging
import time

# Third-Party Libraries
from bs4 import BeautifulSoup
@@ -37,6 +38,15 @@ def dotgov_domains():
"""Get list of dotgov domains from the github repo."""
URL = "https://github.com/cisagov/dotgov-data/blob/main/current-federal.csv"
r = requests.get(URL)

# Retry clause
retry_count, max_retries, time_delay = 1, 10, 5
while r.status_code != 200 and retry_count <= max_retries:
LOGGER.warning(f"Retrying Github dotgov repo (code {r.status_code}), attempt {retry_count} of {max_retries} (url: {URL})")
time.sleep(time_delay)
r = requests.get(URL)
retry_count += 1

soup = BeautifulSoup(r.content, features="lxml")
table = soup.find_all("table")
df = pd.read_html(str(table))[0]
43 changes: 23 additions & 20 deletions src/pe_asm/helpers/link_subs_and_ips_from_ips.py
@@ -33,15 +33,21 @@ def reverseLookup(ip_obj, failed_ips, conn, thread):
url = f"https://dns-history.whoisxmlapi.com/api/v1?apiKey={WHOIS_KEY}&ip={ip_obj['ip']}"
payload = {}
headers = {}
response = requests.request("GET", url, headers=headers, data=payload).json()
if response.get("code") == 429:
response = requests.request("GET", url, headers=headers, data=payload).json()
if response.get("code") == 429:
response = requests.request(
"GET", url, headers=headers, data=payload
).json()
if response.get("code") == 429:
failed_ips.append(ip_obj["ip"])
response = requests.request("GET", url, headers=headers, data=payload)

# Retry clause
retry_count, max_retries, time_delay = 1, 3, 3
while response.status_code != 200 and retry_count <= max_retries:
LOGGER.warning(f"Retrying WhoisXML API endpoint (code {response.status_code}), attempt {retry_count} of {max_retries} (url: {url})")
time.sleep(time_delay)
response = requests.request("GET", url, headers=headers, data=payload)
retry_count += 1
# If API call still unsuccessful
if response.status_code != 200:
bad_ip = ip_obj["ip"]
LOGGER.error(f"Max retries reached for {bad_ip}, labeling as failed")
failed_ips.append(ip_obj["ip"])
response = response.json()

found_domains = []
try:
@@ -52,7 +58,7 @@ def reverseLookup(ip_obj, failed_ips, conn, thread):

result = response["result"]
for domain in result:
print(domain)
# print(domain)
try:
found_domains.append(
{
@@ -90,8 +96,8 @@ def link_domain_from_ip(ip_obj, org_uid, data_source, failed_ips, conn, thread):
),
)
row = cur.fetchone()
print("Row after procedure")
print(row)
# print("Row after procedure")
# print(row)
conn.commit()
cur.close()
return found_domains
@@ -100,17 +106,14 @@ def link_domain_from_ip(ip_obj, org_uid, data_source, failed_ips, conn, thread):
def run_ip_chunk(org_uid, ips_df, thread, conn):
"""Run the provided chunk through the linking process."""
count = 0
last_100 = time.time()
last_chunk = time.time()
failed_ips = []
for ip_index, ip in ips_df.iterrows():
# Set up logging for every 100 IPs
# Log progress
count += 1
if count % 10000 == 0:
LOGGER.info(f"{thread}: Currently Running ips: {count}/{len(ips_df)}")
LOGGER.info(
f"{thread}: {time.time() - last_100} seconds for the last 50 IPs"
)
last_100 = time.time()
LOGGER.info(f"{thread}: Running IPs: {count}/{len(ips_df)}, {time.time() - last_chunk} seconds for the last IP chunk")
last_chunk = time.time()

# Link domain from IP
try:
@@ -147,7 +150,7 @@ def connect_subs_from_ips(staging, orgs_df=None):
else:
conn = pe_db_connect()
LOGGER.info(
"Running on %s. %d/%d complete.", org["cyhy_db_name"], org_count, num_orgs
"Running on %s, %d/%d", org["cyhy_db_name"], org_count, num_orgs
)
# Query IPs
org_uid = org["organizations_uid"]
8 changes: 4 additions & 4 deletions src/pe_asm/helpers/link_subs_and_ips_from_subs.py
@@ -43,7 +43,7 @@ def link_ip_from_domain(sub, root_uid, org_uid, data_source, conn):
(DATE, ip_hash, ip, org_uid, sub, data_source, root_uid, None),
)
row = cur.fetchone()
print(row)
# print(row)
conn.commit()
cur.close()
return 1
@@ -66,21 +66,21 @@ def connect_ips_from_subs(staging, orgs_df=None):
conn.close()

# Loop through orgs
org_count = 0
org_count = 1
for org_index, org_row in orgs_df.iterrows():
# Connect to database
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
LOGGER.info(
"Running on %s. %d/%d complete.",
"Running on %s, %d/%d",
org_row["cyhy_db_name"],
org_count,
num_orgs,
)
org_uid = org_row["organizations_uid"]
print(org_uid)
# print(org_uid)

# Query sub-domains
subs_df = query_subs(str(org_uid), conn)
12 changes: 6 additions & 6 deletions src/pe_asm/helpers/shodan_dedupe.py
@@ -185,7 +185,7 @@ def ip_dedupe(api, ips, agency_type, conn):
try:
hosts = api.host(ips[i * 100 : (i + 1) * 100])
except shodan.APIError as err:
print("Error: {}".format(err))
LOGGER.error(f"Error: {err}")
continue
if isinstance(hosts, list):
for h in hosts:
@@ -324,31 +324,31 @@ def dedupe(staging, orgs_df=None):
api = shodan_api_init()[0]

# Loop through orgs
org_count = 0
org_count = 1
for org_index, org in orgs_df.iterrows():
# Connect to database
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
LOGGER.info(
"Running on %s. %d/%d complete.",
"Running on %s, %d/%d",
org["cyhy_db_name"],
org_count,
num_orgs,
)
# Query CIDRS
cidrs = query_cidrs_by_org(conn, org["organizations_uid"])
LOGGER.info(f"{len(cidrs)} cidrs found")
LOGGER.info(f"{len(cidrs)} CIDRs found")

# Run cidr dedupe if there are CIDRs
if len(cidrs) > 0:
cidr_dedupe(cidrs, api, org["agency_type"], conn)

# Get IPs related to current sub-domains
LOGGER.info("Grabbing floating IPs")
LOGGER.info("Retrieving floating IPs")
ips = query_floating_ips(conn, org["organizations_uid"])
LOGGER.info("Got Ips")
LOGGER.info("Floating IPs retrieved")
if len(ips) > 0:
LOGGER.info("Running dedupe on IPs")
ip_dedupe(api, ips, org["agency_type"], conn)
