-
Notifications
You must be signed in to change notification settings - Fork 127
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(nethunt): multiobj network watch example
- Loading branch information
Showing
6 changed files
with
267 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,225 @@ | ||
#!/usr/local/bin/python | ||
# Copyright © 2022 The vt-py authors. All Rights Reserved. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Manage a set of YARA nethunting rules by adding/removing domains. | ||
This script automatically updates a set of nethunting rules (file, url, domain, | ||
ip) in your VT account by adding and removing domains. | ||
Read more: | ||
https://www.virustotal.com/gui/hunting-overview | ||
https://developers.virustotal.com/v3.0/reference#livehunt | ||
https://developers.virustotal.com/docs/nethunt | ||
""" | ||
|
||
import argparse | ||
import asyncio | ||
import copy | ||
import logging | ||
import os | ||
import re | ||
import vt | ||
import sys | ||
|
||
|
||
API_KEY_ENV_VAR = 'VT_API_KEY' | ||
RULESET_PREFIX = "auto_network_watch_" | ||
|
||
|
||
def extract_domains_from_rule(rules): | ||
"""Extract the domain list from the comment of a yara rule.""" | ||
domain_list = [] | ||
for line in rules.split('*/')[0].split('---', 2)[1].splitlines(): | ||
if not line: | ||
continue | ||
domain = line.split('* ', 2)[1] | ||
if domain: | ||
domain_list.append(domain) | ||
return domain_list | ||
|
||
|
||
def get_rulesets(): | ||
"""Retrieve a rule from VT to get currently monitored properties.""" | ||
rulesets = {} | ||
with vt.Client(os.environ.get(API_KEY_ENV_VAR)) as client: | ||
try: | ||
rulesets_it = client.iterator( | ||
'/intelligence/hunting_rulesets', | ||
params={'filter': f'name:{RULESET_PREFIX}* tag:autogenerated'}, | ||
limit=10) | ||
except vt.error.APIError as e: | ||
print(f'Error retrieving {RULESET_PREFIX}* rulesets: {e}') | ||
|
||
if rulesets_it: | ||
for ruleset in rulesets_it: | ||
kind = ruleset.name.split(RULESET_PREFIX)[1] | ||
rulesets[kind] = {'id': ruleset.id, | ||
'name': ruleset.name, | ||
'rules': ruleset.rules, | ||
'domains': extract_domains_from_rule(ruleset.rules)} | ||
return rulesets | ||
|
||
|
||
def render_template(kind, domains): | ||
template_dir = os.path.join(os.path.dirname(__file__), 'netwatch_templates') | ||
body_template = os.path.join(template_dir, '_body.yara') | ||
if not os.path.exists(body_template): | ||
print(f'ERROR: file {body_template} not found.') | ||
sys.exit(1) | ||
|
||
domain_list = '\n * '.join(domains) | ||
template = '' | ||
with open(body_template, encoding='utf-8') as f: | ||
template += f.read().replace('${domain_list}', domain_list) | ||
template += '\n' | ||
|
||
kind_template = os.path.join(template_dir, kind + '.yara') | ||
if not os.path.exists(kind_template): | ||
print(f'ERROR: file {kind_template} not found.') | ||
sys.exit(1) | ||
with open(kind_template, encoding='utf-8') as f: | ||
rule_block = f.read() | ||
|
||
for domain in domains: | ||
domain_escaped = re.compile(r'[^\w\d]').sub('_', domain) | ||
template += (rule_block. | ||
replace('${domain}', domain). | ||
replace('${domain_escaped}', domain_escaped)) | ||
template += '\n' | ||
return template | ||
|
||
|
||
async def build_rulesets(queue, rulesets, domains): | ||
for kind in ('file', 'url', 'domain', 'ip_address'): | ||
task = { | ||
'name': RULESET_PREFIX + kind, | ||
'kind': kind, | ||
'rules': render_template(kind, domains)} | ||
if rulesets.get(kind): | ||
task['id'] = rulesets[kind].get('id') | ||
await queue.put(task) | ||
|
||
|
||
async def upload_rulesets(queue): | ||
"""Uploads selected files to VirusTotal.""" | ||
async with vt.Client(os.environ.get(API_KEY_ENV_VAR)) as client: | ||
while not queue.empty(): | ||
task = await queue.get() | ||
|
||
name = task.get('name') | ||
if task.get('id'): | ||
ruleset = vt.Object( | ||
obj_type='hunting_ruleset', | ||
obj_attributes={ | ||
'rules': task.get('rules')}) | ||
try: | ||
# Fix for https://github.com/VirusTotal/vt-py/issues/155 issue. | ||
await client.patch_async( | ||
path='/intelligence/hunting_rulesets/' + task.get('id'), | ||
json_data={'data': ruleset.to_dict()}) | ||
print(f'Ruleset {name} updated.') | ||
except vt.error.APIError as e: | ||
print(f'Error updating {name}: {e}') | ||
|
||
else: | ||
ruleset = vt.Object( | ||
obj_type='hunting_ruleset', | ||
obj_attributes={ | ||
'name': name, | ||
'match_object_type': task.get('kind'), | ||
'enabled': True, | ||
'tags': ('autogenerated',), | ||
'rules': task.get('rules')}) | ||
try: | ||
await client.post_object_async( | ||
path='/intelligence/hunting_rulesets', obj=ruleset) | ||
print(f'Ruleset {name} created.') | ||
except vt.error.APIError as e: | ||
print(f'Error saving {name}: {e}') | ||
|
||
queue.task_done() | ||
|
||
|
||
def main(): | ||
parser = argparse.ArgumentParser(description=( | ||
'Manage a set of YARA nethunting rules by adding/removing domains.')) | ||
parser.add_argument('-l', '--list', action='store_true', | ||
help='List current monitored domains.') | ||
parser.add_argument('-a', '--add-domain', | ||
help='Add a domain to the list.') | ||
parser.add_argument('-d', '--delete-domain', | ||
help='Remove a domain from the list.') | ||
parser.add_argument('--workers', type=int, required=False, default=4, | ||
help='number of concurrent workers') | ||
args = parser.parse_args() | ||
|
||
|
||
if os.environ.get(API_KEY_ENV_VAR) is None: | ||
logging.critical(f'Please set {API_KEY_ENV_VAR} environment variable') | ||
return | ||
|
||
rulesets = get_rulesets() | ||
if not rulesets and not args.add_domain: | ||
print('* Empty domain list, use --add-domain domain.tld to register one') | ||
sys.exit(1) | ||
|
||
domains = rulesets.get('url', {}).get('domains', []) | ||
if args.list: | ||
if not domains: | ||
print('* Empty domain list, use --add-domain domain.tld to register one') | ||
sys.exit(0) | ||
|
||
print('Currently monitored domains:') | ||
for domain in domains: | ||
print(f'- {domain}') | ||
sys.exit(0) | ||
|
||
new_domain_list = copy.copy(domains) | ||
if args.add_domain: | ||
new_domain_list.append(args.add_domain) | ||
|
||
if args.delete_domain: | ||
if not args.delete_domain in new_domain_list: | ||
print(f'* {args.delete_domain} not in list') | ||
sys.exit(1) | ||
new_domain_list.remove(args.delete_domain) | ||
|
||
new_domain_list = list(set(new_domain_list)) | ||
new_domain_list.sort() | ||
|
||
if new_domain_list != domains: | ||
print('Updating monitored list:') | ||
for domain in new_domain_list: | ||
print(f'- {domain}') | ||
|
||
# Update the rulesets | ||
loop = asyncio.get_event_loop() | ||
queue = asyncio.Queue(loop=loop) | ||
|
||
loop.create_task(build_rulesets(queue, rulesets, new_domain_list)) | ||
|
||
worker_tasks = [] | ||
for _ in range(args.workers): | ||
worker_tasks.append( | ||
loop.create_task(upload_rulesets(queue))) | ||
|
||
# Wait until all worker tasks has completed. | ||
loop.run_until_complete(asyncio.gather(*worker_tasks)) | ||
loop.close() | ||
|
||
else: | ||
print('Nothing to do') | ||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
/* | ||
DO NOT MODIFY, THIS RULE WAS AUTOMATICALLY GENERATED USING | ||
https://github.com/VirusTotal/vt-py/tree/master/examples/livehunt_network_watch.py | ||
Monitored domains: | ||
--- | ||
* ${domain_list} | ||
*/ | ||
|
||
import "vt" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
rule network_watch_${domain_escaped} : ${domain_escaped} { | ||
meta: | ||
description = "Monitor new domains for ${domain}" | ||
target_entity = "domain" | ||
condition: | ||
vt.net.domain.new_domain and | ||
vt.net.domain.raw endswith "${domain}" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
rule network_watch_${domain_escaped} : ${domain_escaped} { | ||
meta: | ||
description = "New files downloaded from domain ${domain}" | ||
target_entity = "file" | ||
condition: | ||
vt.metadata.new_file and | ||
vt.metadata.itw.domain.raw iendswith "${domain}" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
rule network_watch_${domain_escaped} : ${domain_escaped} { | ||
meta: | ||
description = "New IP addresses resolving domain ${domain}" | ||
target_entity = "ip_address" | ||
condition: | ||
vt.net.ip.reverse_lookup iendswith "${domain}" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
rule network_watch_${domain_escaped} : ${domain_escaped} { | ||
meta: | ||
description = "Monitor new URLs in ${domain}" | ||
target_entity = "url" | ||
condition: | ||
vt.net.url.new_url and | ||
vt.net.domain.raw == "${domain}" | ||
} |