-
Notifications
You must be signed in to change notification settings - Fork 517
/
utils.py
134 lines (101 loc) · 4.5 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import inspect
import tomllib
from pathlib import Path
from typing import Union
import click
import urllib3
import yaml
from detection_rules.misc import get_elasticsearch_client
from .definitions import HUNTING_DIR, Hunt
def get_hunt_path(uuid: str, file_path: str) -> tuple[Union[Path, None], Union[str, None]]:
    """Resolve the path of the hunting query using either a UUID or file path.

    The UUID takes precedence when both arguments are supplied.

    Args:
        uuid: UUID of the hunt to look up in the hunting index; may be empty.
        file_path: Path to a hunt file; only consulted when ``uuid`` is empty.

    Returns:
        A ``(path, error)`` pair. On success ``path`` is the resolved hunt path
        and ``error`` is ``None``; on failure ``path`` is ``None`` and ``error``
        is a message describing the problem.
    """
    if uuid:
        # An empty index.yml parses to None, so fall back to an empty mapping
        # instead of failing on the attribute access below.
        index_data = load_index_file() or {}
        for hunts in index_data.values():
            # Only UUID-keyed mappings can be searched; skip empty sections or
            # any other shape rather than raising on the lookup.
            if isinstance(hunts, dict) and uuid in hunts:
                # Combine the relative path from the index with the HUNTING_DIR
                hunt_path = HUNTING_DIR / hunts[uuid]['path']
                return hunt_path.resolve(), None
        return None, f"No hunt found for UUID: {uuid}"

    if file_path:
        # Use the provided file path
        hunt_path = Path(file_path)
        if not hunt_path.is_file():
            return None, f"No file found at path: {file_path}"
        return hunt_path.resolve(), None

    return None, "Either UUID or file path must be provided."
def load_index_file() -> dict:
    """Load the hunting index.yml file.

    Returns:
        The parsed contents of ``index.yml`` located in ``HUNTING_DIR``. An
        empty dict is returned when the file does not exist (a notice is
        echoed in that case) or when the file has no content.
    """
    index_file = HUNTING_DIR / "index.yml"
    if not index_file.exists():
        click.echo(f"No index.yml found at {index_file}.")
        return {}

    # Read as UTF-8 explicitly so parsing does not depend on the platform's
    # default locale encoding.
    with open(index_file, 'r', encoding="utf-8") as f:
        hunting_index = yaml.safe_load(f)

    # safe_load yields None for an empty document; honour the dict contract so
    # callers can iterate or index the result without a None check.
    return hunting_index or {}
def load_toml(source: Union[Path, str]) -> Hunt:
    """Parse TOML into a Hunt dataclass.

    Args:
        source: Either a ``Path`` to a TOML file, or a string that already
            holds TOML text.

    Returns:
        A ``Hunt`` built from the keys of the top-level ``hunt`` table.

    Raises:
        FileNotFoundError: If ``source`` is a ``Path`` that is not a file.
    """
    if not isinstance(source, Path):
        # Anything that is not a Path is treated as raw TOML text.
        raw_toml = source
    elif source.is_file():
        raw_toml = source.read_text(encoding="utf-8")
    else:
        raise FileNotFoundError(f"TOML file not found: {source}")

    hunt_table = tomllib.loads(raw_toml)["hunt"]
    # The Hunt constructor receives the table's keys as keyword arguments.
    return Hunt(**hunt_table)
def load_all_toml(base_path: Path):
    """Recursively load every ``*.toml`` file under ``base_path``.

    Returns:
        A list of ``(hunt, path)`` tuples, one per TOML file found, where
        ``hunt`` is the result of ``load_toml`` for that file.
    """
    return [(load_toml(toml_path), toml_path) for toml_path in base_path.rglob("*.toml")]
def save_index_file(base_path: Path, directories: dict) -> None:
    """Write the index mapping to ``index.yml`` inside ``base_path``.

    Args:
        base_path: Directory that receives the ``index.yml`` file.
        directories: Index content to serialise; key order is preserved
            because sorting is disabled.
    """
    target = base_path / "index.yml"
    with target.open('w') as stream:
        yaml.safe_dump(directories, stream, default_flow_style=False, sort_keys=False)
    print(f"Index YAML updated at: {target}")
def validate_link(link: str) -> str:
    """Validate and return the link.

    Issues a GET request for ``link`` and treats any response status other
    than 200 as invalid.

    Args:
        link: URL to check.

    Returns:
        The same ``link`` once the request has come back with status 200.

    Raises:
        ValueError: If the response status is not 200.
    """
    # Use the pool as a context manager so its connections are released once
    # the request has completed.
    with urllib3.PoolManager() as http:
        response = http.request('GET', link)

    if response.status != 200:
        raise ValueError(f"Invalid link: {link}")
    # Return the link as the docstring promises; callers that ignore the
    # return value are unaffected.
    return link
def update_index_yml(base_path: Path) -> None:
    """Update index.yml based on the current TOML files.

    Every ``queries/*.toml`` file found recursively under ``base_path`` is
    loaded and recorded in the index under the name of the directory that
    contains its ``queries`` folder, keyed by the hunt's UUID. Existing
    entries with the same UUID are overwritten; other entries are kept.

    NOTE: the current index is read through ``load_index_file`` (which looks
    in ``HUNTING_DIR``), while the result is written to ``base_path``.

    Args:
        base_path: Root directory to scan and to write ``index.yml`` into.
    """
    # An empty index.yml parses to None; start from an empty mapping in that
    # case so the section lookups below cannot fail.
    directories = load_index_file() or {}

    # Load all TOML files recursively
    for toml_file in base_path.rglob("queries/*.toml"):
        # Load TOML and extract hunt configuration
        hunt_config = load_toml(toml_file)
        # The parent is the "queries" folder; its parent names the section.
        folder_name = toml_file.parent.parent.name

        entry = {
            'name': hunt_config.name,
            'path': f"./{toml_file.relative_to(base_path).as_posix()}",
            'mitre': hunt_config.mitre,
        }

        section = directories.get(folder_name)
        if isinstance(section, list):
            # Convert the list to a dictionary, using UUIDs as keys
            section = {item['uuid']: item for item in section}
        elif section is None:
            # Missing section, or a section key present with no content.
            section = {}

        section[hunt_config.uuid] = entry
        directories[folder_name] = section

    # Save the updated index.yml
    save_index_file(base_path, directories)
def filter_elasticsearch_params(config: dict) -> dict:
    """Keep only the config entries accepted by ``get_elasticsearch_client``.

    Args:
        config: Arbitrary configuration mapping.

    Returns:
        A new dict holding the items of ``config`` whose keys match a
        parameter name in the signature of ``get_elasticsearch_client``.
    """
    # Parameter names come from the client factory's signature.
    accepted = inspect.signature(get_elasticsearch_client).parameters

    filtered = {}
    for key, value in config.items():
        if key in accepted:
            filtered[key] = value
    return filtered