-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update yeti analyzer #2930
Merged
Merged
Update yeti analyzer #2930
Changes from 8 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
076be3e
Update yeti analyzer
tomchop 743ede9
Fix test attempt 1
tomchop 9d02c27
Fix test attempt 2
tomchop ca1d33c
Try updated source attributes
tomchop 0f4f1c8
Fix test
tomchop 98eb168
fix import order
tomchop e19c94e
More linter fixes
tomchop 0cbda64
No colors anymore
tomchop 05c7822
Merge branch 'master' into yetianalyzer
berggren e7088c2
Merge branch 'master' into yetianalyzer
jkppr 80ec853
Merge branch 'master' into yetianalyzer
jkppr File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ | |
import json | ||
import re | ||
|
||
from typing import Dict, List | ||
|
||
from flask import current_app | ||
import requests | ||
|
||
|
@@ -15,8 +17,8 @@ class YetiIndicators(interface.BaseAnalyzer): | |
"""Analyzer for Yeti threat intel indicators.""" | ||
|
||
NAME = "yetiindicators" | ||
DISPLAY_NAME = "Yeti threat intel indicators" | ||
DESCRIPTION = "Mark events using Yeti threat intel indicators" | ||
DISPLAY_NAME = "Yeti CTI indicators" | ||
DESCRIPTION = "Mark events using CTI indicators from Yeti" | ||
|
||
DEPENDENCIES = frozenset(["domain"]) | ||
|
||
|
@@ -30,23 +32,32 @@ def __init__(self, index_name, sketch_id, timeline_id=None): | |
""" | ||
super().__init__(index_name, sketch_id, timeline_id=timeline_id) | ||
self.intel = {} | ||
self.yeti_api_root = current_app.config.get("YETI_API_ROOT") | ||
self.yeti_web_root = current_app.config.get("YETI_API_ROOT") | ||
self.yeti_web_root.replace("/api", "") | ||
root = current_app.config.get("YETI_API_ROOT") | ||
if root.endswith("/"): | ||
root = root[:-1] | ||
self.yeti_api_root = root | ||
self.yeti_web_root = root.replace("/api/v2", "") | ||
Comment on lines
+38
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess this is to support old configurations? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah this is to ensure that it's easier to maintain future versions of the Yeti API. |
||
self.yeti_api_key = current_app.config.get("YETI_API_KEY") | ||
|
||
def get_neighbors(self, entity_id): | ||
def get_neighbors(self, yeti_object: Dict) -> List[Dict]: | ||
"""Retrieves a list of neighbors associated to a given entity. | ||
|
||
Args: | ||
entity_id (str): STIX ID of the entity to get associated inticators | ||
from. (typically an Intrusion Set or an Incident) | ||
yeti_object: The Yeti object to get neighbors from. | ||
|
||
Returns: | ||
A list of JSON objects describing a Yeti object. | ||
A list of JSON objects describing a Yeti entity. | ||
""" | ||
extended_id = f"{yeti_object['root_type']}/{yeti_object['id']}" | ||
results = requests.post( | ||
f"{self.yeti_api_root}/entities/{entity_id}/neighbors/", | ||
f"{self.yeti_api_root}/graph/search", | ||
json={ | ||
"source": extended_id, | ||
"link_type": "", | ||
"hops": 1, | ||
"direction": "any", | ||
"include_original": False, | ||
}, | ||
headers={"X-Yeti-API": self.yeti_api_key}, | ||
) | ||
if results.status_code != 200: | ||
|
@@ -57,10 +68,10 @@ def get_neighbors(self, entity_id): | |
|
||
return neighbors | ||
|
||
def get_indicators(self, indicator_type): | ||
def get_indicators(self, indicator_type: str) -> None: | ||
"""Populates the intel attribute with entities from Yeti.""" | ||
response = requests.post( | ||
self.yeti_api_root + "/indicators/filter/", | ||
self.yeti_api_root + "/indicators/search", | ||
json={"name": "", "type": indicator_type}, | ||
headers={"X-Yeti-API": self.yeti_api_key}, | ||
) | ||
|
@@ -69,15 +80,22 @@ def get_indicators(self, indicator_type): | |
f"Error {response.status_code} retrieving indicators from Yeti:" | ||
+ response.json() | ||
) | ||
self.intel = {item["id"]: item for item in response.json()} | ||
for item in response.json(): | ||
item["compiled_regexp"] = re.compile(item["pattern"]) | ||
self.intel[item["id"]] = item | ||
data = response.json() | ||
self.intel = {item["id"]: item for item in data["indicators"]} | ||
for _id, indicator in self.intel.items(): | ||
indicator["compiled_regexp"] = re.compile(indicator["pattern"]) | ||
self.intel[_id] = indicator | ||
|
||
def mark_event(self, indicator, event, neighbors): | ||
def mark_event( | ||
self, indicator: Dict, event: interface.Event, neighbors: List[Dict] | ||
): | ||
"""Annotate an event with data from indicators and neighbors. | ||
|
||
Tags with skull emoji, adds a comment to the event. | ||
Args: | ||
indicator: a dictionary representing a Yeti indicator object. | ||
event: a Timesketch sketch Event object. | ||
neighbors: a list of Yeti entities related to the indicator. | ||
""" | ||
event.add_emojis([emojis.get_emoji("SKULL")]) | ||
tags = [] | ||
|
@@ -104,7 +122,7 @@ def run(self): | |
if not self.yeti_api_root or not self.yeti_api_key: | ||
return "No Yeti configuration settings found, aborting." | ||
|
||
self.get_indicators("x-regex") | ||
self.get_indicators("regex") | ||
|
||
entities_found = set() | ||
total_matches = 0 | ||
|
@@ -131,7 +149,7 @@ def run(self): | |
} | ||
|
||
events = self.event_stream(query_dsl=query_dsl, return_fields=["message"]) | ||
neighbors = self.get_neighbors(indicator["id"]) | ||
neighbors = self.get_neighbors(indicator) | ||
|
||
for event in events: | ||
total_matches += 1 | ||
|
@@ -140,7 +158,7 @@ def run(self): | |
for n in neighbors: | ||
entities_found.add(f"{n['name']}:{n['type']}") | ||
|
||
uri = f"{self.yeti_web_root}/entities/indicator/{indicator['id']}" | ||
uri = f"{self.yeti_web_root}/indicators/{indicator['id']}" | ||
intel = { | ||
"externalURI": uri, | ||
"ioc": indicator["pattern"], | ||
|
@@ -153,7 +171,11 @@ def run(self): | |
new_indicators.add(indicator["id"]) | ||
|
||
if not total_matches: | ||
return "No indicators were found in the timeline." | ||
self.output.result_status = "SUCCESS" | ||
self.output.result_priority = "NOTE" | ||
note = "No indicators were found in the timeline." | ||
self.output.result_summary = note | ||
return str(self.output) | ||
|
||
for entity in entities_found: | ||
name, _type = entity.split(":") | ||
|
@@ -171,10 +193,15 @@ def run(self): | |
overwrite=True, | ||
) | ||
|
||
return ( | ||
success_note = ( | ||
f"{total_matches} events matched {len(new_indicators)} " | ||
f"new indicators. Found: {', '.join(entities_found)}" | ||
) | ||
self.output.result_status = "SUCCESS" | ||
self.output.result_priority = "HIGH" | ||
self.output.result_summary = success_note | ||
|
||
return str(self.output) | ||
|
||
|
||
manager.AnalysisManager.register_analyzer(YetiIndicators) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this handled at the Yeti web layer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but it's to avoid sending queries like
/api/v2//blah
. It also depends on how people have their webserver setup, I suppose.