Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update yeti analyzer #2930

Merged
merged 11 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 49 additions & 22 deletions timesketch/lib/analyzers/yetiindicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import json
import re

from typing import Dict, List

from flask import current_app
import requests

Expand All @@ -15,8 +17,8 @@ class YetiIndicators(interface.BaseAnalyzer):
"""Analyzer for Yeti threat intel indicators."""

NAME = "yetiindicators"
DISPLAY_NAME = "Yeti threat intel indicators"
DESCRIPTION = "Mark events using Yeti threat intel indicators"
DISPLAY_NAME = "Yeti CTI indicators"
DESCRIPTION = "Mark events using CTI indicators from Yeti"

DEPENDENCIES = frozenset(["domain"])

Expand All @@ -30,23 +32,32 @@ def __init__(self, index_name, sketch_id, timeline_id=None):
"""
super().__init__(index_name, sketch_id, timeline_id=timeline_id)
self.intel = {}
self.yeti_api_root = current_app.config.get("YETI_API_ROOT")
self.yeti_web_root = current_app.config.get("YETI_API_ROOT")
self.yeti_web_root.replace("/api", "")
root = current_app.config.get("YETI_API_ROOT")
if root.endswith("/"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this handled at the Yeti web layer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but it's to avoid sending queries like /api/v2//blah. It also depends on how people have their webserver setup, I suppose.

root = root[:-1]
self.yeti_api_root = root
self.yeti_web_root = root.replace("/api/v2", "")
Comment on lines +38 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is to support old configurations?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is to ensure that it's easier to maintain future versions of the Yeti API.

self.yeti_api_key = current_app.config.get("YETI_API_KEY")

def get_neighbors(self, entity_id):
def get_neighbors(self, yeti_object: Dict) -> List[Dict]:
"""Retrieves a list of neighbors associated to a given entity.

Args:
entity_id (str): STIX ID of the entity to get associated inticators
from. (typically an Intrusion Set or an Incident)
yeti_object: The Yeti object to get neighbors from.

Returns:
A list of JSON objects describing a Yeti object.
A list of JSON objects describing a Yeti entity.
"""
extended_id = f"{yeti_object['root_type']}/{yeti_object['id']}"
results = requests.post(
f"{self.yeti_api_root}/entities/{entity_id}/neighbors/",
f"{self.yeti_api_root}/graph/search",
json={
"source": extended_id,
"link_type": "",
"hops": 1,
"direction": "any",
"include_original": False,
},
headers={"X-Yeti-API": self.yeti_api_key},
)
if results.status_code != 200:
Expand All @@ -57,10 +68,10 @@ def get_neighbors(self, entity_id):

return neighbors

def get_indicators(self, indicator_type):
def get_indicators(self, indicator_type: str) -> None:
"""Populates the intel attribute with entities from Yeti."""
response = requests.post(
self.yeti_api_root + "/indicators/filter/",
self.yeti_api_root + "/indicators/search",
json={"name": "", "type": indicator_type},
headers={"X-Yeti-API": self.yeti_api_key},
)
Expand All @@ -69,15 +80,22 @@ def get_indicators(self, indicator_type):
f"Error {response.status_code} retrieving indicators from Yeti:"
+ response.json()
)
self.intel = {item["id"]: item for item in response.json()}
for item in response.json():
item["compiled_regexp"] = re.compile(item["pattern"])
self.intel[item["id"]] = item
data = response.json()
self.intel = {item["id"]: item for item in data["indicators"]}
for _id, indicator in self.intel.items():
indicator["compiled_regexp"] = re.compile(indicator["pattern"])
self.intel[_id] = indicator

def mark_event(self, indicator, event, neighbors):
def mark_event(
self, indicator: Dict, event: interface.Event, neighbors: List[Dict]
):
"""Annotate an event with data from indicators and neighbors.

Tags with skull emoji, adds a comment to the event.
Args:
indicator: a dictionary representing a Yeti indicator object.
event: a Timesketch sketch Event object.
neighbors: a list of Yeti entities related to the indicator.
"""
event.add_emojis([emojis.get_emoji("SKULL")])
tags = []
Expand All @@ -104,7 +122,7 @@ def run(self):
if not self.yeti_api_root or not self.yeti_api_key:
return "No Yeti configuration settings found, aborting."

self.get_indicators("x-regex")
self.get_indicators("regex")

entities_found = set()
total_matches = 0
Expand All @@ -131,7 +149,7 @@ def run(self):
}

events = self.event_stream(query_dsl=query_dsl, return_fields=["message"])
neighbors = self.get_neighbors(indicator["id"])
neighbors = self.get_neighbors(indicator)

for event in events:
total_matches += 1
Expand All @@ -140,7 +158,7 @@ def run(self):
for n in neighbors:
entities_found.add(f"{n['name']}:{n['type']}")

uri = f"{self.yeti_web_root}/entities/indicator/{indicator['id']}"
uri = f"{self.yeti_web_root}/indicators/{indicator['id']}"
intel = {
"externalURI": uri,
"ioc": indicator["pattern"],
Expand All @@ -153,7 +171,11 @@ def run(self):
new_indicators.add(indicator["id"])

if not total_matches:
return "No indicators were found in the timeline."
self.output.result_status = "SUCCESS"
self.output.result_priority = "NOTE"
note = "No indicators were found in the timeline."
self.output.result_summary = note
return str(self.output)

for entity in entities_found:
name, _type = entity.split(":")
Expand All @@ -171,10 +193,15 @@ def run(self):
overwrite=True,
)

return (
success_note = (
f"{total_matches} events matched {len(new_indicators)} "
f"new indicators. Found: {', '.join(entities_found)}"
)
self.output.result_status = "SUCCESS"
self.output.result_priority = "HIGH"
self.output.result_summary = success_note

return str(self.output)


manager.AnalysisManager.register_analyzer(YetiIndicators)
71 changes: 38 additions & 33 deletions timesketch/lib/analyzers/yetiindicators_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Tests for ThreatintelPlugin."""
from __future__ import unicode_literals

import copy
import json
import re
import mock

Expand All @@ -12,29 +10,39 @@
from timesketch.lib.testlib import MockDataStore

MOCK_YETI_INTEL = {
"x-regex--6ebc9344-1111-4d65-8bdd-b6dddf613068": {
"id": "x-regex--6ebc9344-1111-4d65-8bdd-b6dddf613068",
"name": "Secret Fancy Bear c2",
"pattern": "baddomain\\.com",
"compiled_regexp": re.compile("baddomain\\.com"),
"type": "x-regex",
"12345": {
"id": "12345",
"name": "Random regex",
"pattern": "[0-9a-f]",
"compiled_regexp": re.compile(r"[0-9a-f]+\.com"),
"type": "regex",
}
}

MOCK_YETI_NEIGHBORS = [
{
"id": "x-incident--6ebc9344-1111-4d65-8bdd-b6dddf613068",
"name": "Random incident",
"type": "x-incident",
"id": "98765",
"name": "Bad malware",
"type": "malware",
}
]

MATCHING_DOMAIN_MESSAGE = {"message": "baddomain.com"}
OK_DOMAIN_MESSAGE = {"message": "okdomain.com"}
MATCHING_DOMAIN_MESSAGE = {
"__ts_timeline_id": 1,
"es_index": "",
"es_id": "",
"label": "",
"timestamp": 1410895419859714,
"timestamp_desc": "",
"datetime": "2014-09-16T19:23:40+00:00",
"source_short": "",
"source_long": "",
"message": "c0ffeebabe.com",
}


class TestThreatintelPlugin(BaseTest):
"""Tests the functionality of the analyzer."""
class TestYetiIndicators(BaseTest):
"""Tests the functionality of the YetiIndicators analyzer."""

def setUp(self):
super().setUp()
Expand All @@ -51,22 +59,21 @@ def setUp(self):
)
def test_indicator_match(self, mock_get_indicators, mock_get_neighbors):
"""Test that ES queries for indicators are correctly built."""
analyzer = yetiindicators.YetiIndicators("test_index", 1)
analyzer = yetiindicators.YetiIndicators("test_index", 1, 123)
analyzer.datastore.client = mock.Mock()
analyzer.intel = MOCK_YETI_INTEL
mock_get_neighbors.return_value = MOCK_YETI_NEIGHBORS

event = copy.deepcopy(MockDataStore.event_dict)
event["_source"].update(MATCHING_DOMAIN_MESSAGE)
analyzer.datastore.import_event("test_index", event["_source"], "0")
analyzer.datastore.import_event("test_index", MATCHING_DOMAIN_MESSAGE, "0")

message = analyzer.run()
message = json.loads(analyzer.run())
self.assertEqual(
message,
("1 events matched 1 new indicators. Found: Random incident:x-incident"),
message["result_summary"],
"1 events matched 1 new indicators. Found: Bad malware:malware",
)
mock_get_indicators.assert_called_once()
mock_get_neighbors.assert_called_once()
self.assertEqual(analyzer.tagged_events["0"]["tags"], ["bad-malware"])

# Mock the OpenSearch datastore.
@mock.patch("timesketch.lib.analyzers.interface.OpenSearchDataStore", MockDataStore)
Expand All @@ -78,29 +85,27 @@ def test_indicator_match(self, mock_get_indicators, mock_get_neighbors):
)
def test_indicator_nomatch(self, mock_get_indicators, mock_get_neighbors):
"""Test that ES queries for indicators are correctly built."""
analyzer = yetiindicators.YetiIndicators("test_index", 1)
analyzer = yetiindicators.YetiIndicators("test_index", 1, 123)
analyzer.datastore.client = mock.Mock()
analyzer.intel = MOCK_YETI_INTEL
mock_get_neighbors.return_value = MOCK_YETI_NEIGHBORS

# event = copy.deepcopy(MockDataStore.event_dict)
# event["_source"].update(OK_DOMAIN_MESSAGE)
# analyzer.datastore.import_event("test_index", event["_source"], "0")

message = analyzer.run()
self.assertEqual(message, "No indicators were found in the timeline.")
message = json.loads(analyzer.run())
self.assertEqual(
message["result_summary"], "No indicators were found in the timeline."
)
mock_get_indicators.assert_called_once()
mock_get_neighbors.asset_called_once()

@mock.patch("timesketch.lib.analyzers.interface.OpenSearchDataStore", MockDataStore)
def test_slug(self):
analyzer = yetiindicators.YetiIndicators("test_index", 1)
analyzer = yetiindicators.YetiIndicators("test_index", 1, 123)
mock_event = mock.Mock()
mock_event.get_comments.return_value = []
analyzer.mark_event(
MOCK_YETI_INTEL["x-regex--6ebc9344-1111-4d65-8bdd-b6dddf613068"],
MOCK_YETI_INTEL["12345"],
mock_event,
MOCK_YETI_NEIGHBORS,
)
# The name of the entity is "Random incident"
mock_event.add_tags.assert_called_once_with(["random-incident"])
mock_event.add_tags.assert_called_once_with(["bad-malware"])
Loading