diff --git a/examples/livehunt_network_watch.py b/examples/livehunt_network_watch.py new file mode 100644 index 0000000..a02cd6b --- /dev/null +++ b/examples/livehunt_network_watch.py @@ -0,0 +1,225 @@ +#!/usr/local/bin/python +# Copyright © 2022 The vt-py authors. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Manage a set of YARA nethunting rules by adding/removing domains. + +This script automatically updates a set of nethunting rules (file, url, domain, +ip) in your VT account by adding and removing domains. + +Read more: +https://www.virustotal.com/gui/hunting-overview +https://developers.virustotal.com/v3.0/reference#livehunt +https://developers.virustotal.com/docs/nethunt +""" + +import argparse +import asyncio +import copy +import logging +import os +import re +import vt +import sys + + +API_KEY_ENV_VAR = 'VT_API_KEY' +RULESET_PREFIX = "auto_network_watch_" + + +def extract_domains_from_rule(rules): + """Extract the domain list from the comment of a yara rule.""" + domain_list = [] + for line in rules.split('*/')[0].split('---', 2)[1].splitlines(): + if not line: + continue + domain = line.split('* ', 2)[1] + if domain: + domain_list.append(domain) + return domain_list + + +def get_rulesets(): + """Retrieve a rule from VT to get currently monitored properties.""" + rulesets = {} + with vt.Client(os.environ.get(API_KEY_ENV_VAR)) as client: + try: + rulesets_it = client.iterator( + '/intelligence/hunting_rulesets', + params={'filter': f'name:{RULESET_PREFIX}* tag:autogenerated'}, + limit=10) + except vt.error.APIError as e: + print(f'Error retrieving {RULESET_PREFIX}* rulesets: {e}') + + if rulesets_it: + for ruleset in rulesets_it: + kind = ruleset.name.split(RULESET_PREFIX)[1] + rulesets[kind] = {'id': ruleset.id, + 'name': ruleset.name, + 'rules': ruleset.rules, + 'domains': extract_domains_from_rule(ruleset.rules)} + return rulesets + + +def render_template(kind, domains): + template_dir = os.path.join(os.path.dirname(__file__), 'netwatch_templates') + body_template = os.path.join(template_dir, '_body.yara') + if not os.path.exists(body_template): + print(f'ERROR: file {body_template} not found.') + sys.exit(1) + + domain_list = '\n * '.join(domains) + template = '' + with open(body_template, encoding='utf-8') as f: + template += f.read().replace('${domain_list}', domain_list) + template += '\n' + + kind_template = os.path.join(template_dir, kind + '.yara') + if not os.path.exists(kind_template): + print(f'ERROR: file {kind_template} not found.') + sys.exit(1) + with open(kind_template, encoding='utf-8') as f: + rule_block = f.read() + + for domain in domains: + domain_escaped = re.compile(r'[^\w\d]').sub('_', domain) + template += (rule_block. + replace('${domain}', domain). + replace('${domain_escaped}', domain_escaped)) + template += '\n' + return template + + +async def build_rulesets(queue, rulesets, domains): + for kind in ('file', 'url', 'domain', 'ip_address'): + task = { + 'name': RULESET_PREFIX + kind, + 'kind': kind, + 'rules': render_template(kind, domains)} + if rulesets.get(kind): + task['id'] = rulesets[kind].get('id') + await queue.put(task) + + +async def upload_rulesets(queue): + """Uploads selected files to VirusTotal.""" + async with vt.Client(os.environ.get(API_KEY_ENV_VAR)) as client: + while not queue.empty(): + task = await queue.get() + + name = task.get('name') + if task.get('id'): + ruleset = vt.Object( + obj_type='hunting_ruleset', + obj_attributes={ + 'rules': task.get('rules')}) + try: + # Fix for https://github.com/VirusTotal/vt-py/issues/155 issue. + await client.patch_async( + path='/intelligence/hunting_rulesets/' + task.get('id'), + json_data={'data': ruleset.to_dict()}) + print(f'Ruleset {name} updated.') + except vt.error.APIError as e: + print(f'Error updating {name}: {e}') + + else: + ruleset = vt.Object( + obj_type='hunting_ruleset', + obj_attributes={ + 'name': name, + 'match_object_type': task.get('kind'), + 'enabled': True, + 'tags': ('autogenerated',), + 'rules': task.get('rules')}) + try: + await client.post_object_async( + path='/intelligence/hunting_rulesets', obj=ruleset) + print(f'Ruleset {name} created.') + except vt.error.APIError as e: + print(f'Error saving {name}: {e}') + + queue.task_done() + + +def main(): + parser = argparse.ArgumentParser(description=( + 'Manage a set of YARA nethunting rules by adding/removing domains.')) + parser.add_argument('-l', '--list', action='store_true', + help='List current monitored domains.') + parser.add_argument('-a', '--add-domain', + help='Add a domain to the list.') + parser.add_argument('-d', '--delete-domain', + help='Remove a domain from the list.') + parser.add_argument('--workers', type=int, required=False, default=4, + help='number of concurrent workers') + args = parser.parse_args() + + + if os.environ.get(API_KEY_ENV_VAR) is None: + logging.critical(f'Please set {API_KEY_ENV_VAR} environment variable') + return + + rulesets = get_rulesets() + if not rulesets and not args.add_domain: + print('* Empty domain list, use --add-domain domain.tld to register one') + sys.exit(1) + + domains = rulesets.get('url', {}).get('domains', []) + if args.list: + if not domains: + print('* Empty domain list, use --add-domain domain.tld to register one') + sys.exit(0) + + print('Currently monitored domains:') + for domain in domains: + print(f'- {domain}') + sys.exit(0) + + new_domain_list = copy.copy(domains) + if args.add_domain: + new_domain_list.append(args.add_domain) + + if args.delete_domain: + if not args.delete_domain in new_domain_list: + print(f'* {args.delete_domain} not in list') + sys.exit(1) + new_domain_list.remove(args.delete_domain) + + new_domain_list = list(set(new_domain_list)) + new_domain_list.sort() + + if new_domain_list != domains: + print('Updating monitored list:') + for domain in new_domain_list: + print(f'- {domain}') + + # Update the rulesets + loop = asyncio.get_event_loop() + queue = asyncio.Queue(loop=loop) + + loop.create_task(build_rulesets(queue, rulesets, new_domain_list)) + + worker_tasks = [] + for _ in range(args.workers): + worker_tasks.append( + loop.create_task(upload_rulesets(queue))) + + # Wait until all worker tasks has completed. + loop.run_until_complete(asyncio.gather(*worker_tasks)) + loop.close() + + else: + print('Nothing to do') + +if __name__ == '__main__': + main() diff --git a/examples/netwatch_templates/_body.yara b/examples/netwatch_templates/_body.yara new file mode 100644 index 0000000..d81950f --- /dev/null +++ b/examples/netwatch_templates/_body.yara @@ -0,0 +1,11 @@ +/* + DO NOT MODIFY, THIS RULE WAS AUTOMATICALLY GENERATED USING + https://github.com/VirusTotal/vt-py/tree/master/examples/livehunt_network_watch.py + + Monitored domains: + --- + * ${domain_list} + +*/ + +import "vt" diff --git a/examples/netwatch_templates/domain.yara b/examples/netwatch_templates/domain.yara new file mode 100644 index 0000000..e703011 --- /dev/null +++ b/examples/netwatch_templates/domain.yara @@ -0,0 +1,8 @@ +rule network_watch_${domain_escaped} : ${domain_escaped} { +meta: + description = "Monitor new domains for ${domain}" + target_entity = "domain" +condition: + vt.net.domain.new_domain and + vt.net.domain.raw endswith "${domain}" +} diff --git a/examples/netwatch_templates/file.yara b/examples/netwatch_templates/file.yara new file mode 100644 index 0000000..b37a619 --- /dev/null +++ b/examples/netwatch_templates/file.yara @@ -0,0 +1,8 @@ +rule network_watch_${domain_escaped} : ${domain_escaped} { +meta: + description = "New files downloaded from domain ${domain}" + target_entity = "file" +condition: + vt.metadata.new_file and + vt.metadata.itw.domain.raw iendswith "${domain}" +} diff --git a/examples/netwatch_templates/ip_address.yara b/examples/netwatch_templates/ip_address.yara new file mode 100644 index 0000000..b000adf --- /dev/null +++ b/examples/netwatch_templates/ip_address.yara @@ -0,0 +1,7 @@ +rule network_watch_${domain_escaped} : ${domain_escaped} { +meta: + description = "New IP addresses resolving domain ${domain}" + target_entity = "ip_address" +condition: + vt.net.ip.reverse_lookup iendswith "${domain}" +} diff --git a/examples/netwatch_templates/url.yara b/examples/netwatch_templates/url.yara new file mode 100644 index 0000000..0785094 --- /dev/null +++ b/examples/netwatch_templates/url.yara @@ -0,0 +1,8 @@ +rule network_watch_${domain_escaped} : ${domain_escaped} { +meta: + description = "Monitor new URLs in ${domain}" + target_entity = "url" +condition: + vt.net.url.new_url and + vt.net.domain.raw == "${domain}" +}