[urlhaus] Caching and state output (#1026)
sommerda authored Mar 10, 2023
1 parent 6eb02c6 commit 23f01f5
Showing 1 changed file with 72 additions and 28 deletions.
external-import/urlhaus/src/urlhaus.py
@@ -4,6 +4,7 @@
import ssl
import sys
import time
+import traceback
import urllib.request

import certifi
@@ -31,7 +32,7 @@ def __init__(self):
"URLHAUS_IMPORT_OFFLINE", ["urlhaus", "import_offline"], config, False, True
)
self.urlhaus_interval = get_config_variable(
"URLHAUS_INTERVAL", ["urlhaus", "interval"], config, True
"URLHAUS_INTERVAL", ["urlhaus", "interval"], config, False
)
self.create_indicators = get_config_variable(
"URLHAUS_CREATE_INDICATORS",
@@ -58,8 +59,8 @@ def __init__(self):
description="abuse.ch is operated by a random swiss guy fighting malware for non-profit, running a couple of projects helping internet service providers and network operators protecting their infrastructure from malware.",
)

-def get_interval(self):
-    return int(self.urlhaus_interval) * 60 * 60 * 24
+def get_interval(self, offset=0):
+    return (float(self.urlhaus_interval) * 60 * 60 * 24) + offset

def next_run(self, seconds):
return
@@ -84,15 +85,19 @@ def run(self):
self.helper.log_info("Connector has never run")
# If the last_run is more than interval-1 day
if last_run is None or (
-    (timestamp - last_run)
-    > ((int(self.urlhaus_interval) - 1) * 60 * 60 * 24)
+    (timestamp - last_run) > self.get_interval(offset=-1)
):
self.helper.log_info("Connector will run!")
now = datetime.datetime.utcfromtimestamp(timestamp)
friendly_name = "URLhaus run @ " + now.strftime("%Y-%m-%d %H:%M:%S")
work_id = self.helper.api.work.initiate_work(
self.helper.connect_id, friendly_name
)

+# initialize the threat cache with each run
+if self.threats_from_labels:
+    treat_cache = {}

try:
response = urllib.request.urlopen(
self.urlhaus_csv_url,
@@ -110,8 +115,39 @@
)
rdr = csv.reader(filter(lambda row: row[0] != "#", fp))
bundle_objects = []
+## the csv file has the following columns
+# id,dateadded,url,url_status,last_online,threat,tags,urlhaus_link,reporter
-for row in rdr:

+if (
+    current_state is not None
+    and "last_processed_entry" in current_state
+):
+    last_processed_entry = current_state[
+        "last_processed_entry"
+    ]  # epoch time format
+else:
+    self.helper.log_info(
+        "'last_processed_entry' state not found, setting it to epoch start."
+    )
+    last_processed_entry = 0  # start of the epoch
+
+last_processed_entry_running_max = last_processed_entry
+
+for i, row in enumerate(rdr):
+    entry_date = parse(row[1])
+
+    if i % 5000 == 0:
+        self.helper.log_info(
+            f"Process entry {i} with dateadded='{entry_date.strftime('%Y-%m-%d %H:%M:%S')}'"
+        )
+
+    # skip entry if newer events already processed in the past
+    if last_processed_entry > entry_date.timestamp():
+        continue
+    last_processed_entry_running_max = max(
+        entry_date.timestamp(), last_processed_entry_running_max
+    )

if row[3] == "online" or self.urlhaus_import_offline:
external_reference = stix2.ExternalReference(
source_name="Abuse.ch URLhaus",
@@ -139,13 +175,16 @@ def run(self):
if self.threats_from_labels:
for label in row[6].split(","):
if label:
-custom_attributes = """
-    id
-    standard_id
-    entity_type
-"""
-threat = (
-    self.helper.api.stix_domain_object.read(
+# implementing a primitive caching
+try:
+    threat = treat_cache[label]
+except KeyError:
+    custom_attributes = """
+        id
+        standard_id
+        entity_type
+    """
+    threat = self.helper.api.stix_domain_object.read(
filters=[
{
"key": "name",
@@ -155,22 +194,22 @@
first=1,
customAttributes=custom_attributes,
)
-    )
+    treat_cache[label] = threat

if threat is not None:
-date = parse(row[1])
relation = stix2.Relationship(
id=StixCoreRelationship.generate_id(
"related-to",
stix_observable.id,
threat["standard_id"],
-    date,
-    date,
+    entry_date,
+    entry_date,
),
source_ref=stix_observable.id,
target_ref=threat["standard_id"],
relationship_type="related-to",
-start_time=date,
-stop_time=date
+start_time=entry_date,
+stop_time=entry_date
+ datetime.timedelta(0, 3),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity[
Expand All @@ -179,8 +218,8 @@ def run(self):
object_marking_refs=[
stix2.TLP_WHITE
],
-created=date,
-modified=date,
+created=entry_date,
+modified=entry_date,
allow_custom=True,
)
bundle_objects.append(relation)
@@ -199,14 +238,19 @@
os.remove(
os.path.dirname(os.path.abspath(__file__)) + "/data.csv"
)
-except Exception as e:
-    self.helper.log_error(str(e))
+except Exception:
+    self.helper.log_error(traceback.format_exc())
# Store the current timestamp as a last run
message = "Connector successfully run, storing last_run as " + str(
timestamp
)
self.helper.log_info(message)
-self.helper.set_state({"last_run": timestamp})
+self.helper.set_state(
+    {
+        "last_run": timestamp,
+        "last_processed_entry": last_processed_entry_running_max,
+    }
+)
self.helper.api.work.to_processed(work_id, message)
self.helper.log_info(
"Last_run stored, next run in: "
@@ -224,8 +268,8 @@
except (KeyboardInterrupt, SystemExit):
self.helper.log_info("Connector stop")
sys.exit(0)
-except Exception as e:
-    self.helper.log_error(str(e))
+except Exception:
+    self.helper.log_error(traceback.format_exc())

if self.helper.connect_run_and_terminate:
self.helper.log_info("Connector stop")
@@ -238,7 +282,7 @@ def run(self):
try:
URLhausConnector = URLhaus()
URLhausConnector.run()
-except Exception as e:
-    print(e)
+except Exception:
+    print(traceback.format_exc())
time.sleep(10)
sys.exit(0)
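
The label lookup added in this commit is a primitive per-run memoization: the first row carrying a given tag pays the OpenCTI API round-trip, and every later row with the same tag is served from a dict. A minimal self-contained sketch of that pattern follows; fetch_threat is a hypothetical stand-in for self.helper.api.stix_domain_object.read(), which is not reproduced here.

# Sketch of the per-run lookup cache introduced by this commit.
# fetch_threat is a hypothetical stand-in for the OpenCTI read() call.
def fetch_threat(label):
    print(f"expensive lookup for {label!r}")
    return {"standard_id": f"threat--{label}"}

threat_cache = {}  # rebuilt at the start of every run, as in the diff

def resolve(label):
    try:
        return threat_cache[label]  # hit: no API call
    except KeyError:
        threat = fetch_threat(label)  # miss: exactly one API call
        threat_cache[label] = threat  # remember for later rows
        return threat

for label in ["emotet", "emotet", "mirai"]:
    resolve(label)  # "emotet" is fetched once, then served from the cache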

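The state output works the same way in isolation: each row's dateadded timestamp is compared against the persisted last_processed_entry, older rows are skipped, and the running maximum is written back next to last_run. A rough sketch under the assumption that rows arrive as (dateadded_epoch, url) tuples; the state dict stands in for the connector's get_state()/set_state().

# Sketch of the incremental-state pattern from this commit; rows and
# state are illustrative stand-ins for the URLhaus CSV and OpenCTI state.
rows = [(100, "http://a.example"), (200, "http://b.example"),
        (150, "http://c.example"), (300, "http://d.example")]
state = {"last_processed_entry": 150}

last_processed_entry = state.get("last_processed_entry", 0)  # epoch-start fallback
running_max = last_processed_entry

for dateadded, url in rows:
    if last_processed_entry > dateadded:  # already covered by a past run
        continue
    running_max = max(dateadded, running_max)
    print("processing", url)

state["last_processed_entry"] = running_max  # persisted alongside last_run

Note the strict greater-than: a row whose timestamp equals the stored state is processed again, so boundary entries are never dropped, at the cost of occasionally re-emitting one.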