Skip to content

Commit

Permalink
Add feed integration
Browse files Browse the repository at this point in the history
  • Loading branch information
pabloperezj committed Jul 8, 2024
1 parent 0a10bc3 commit a50361c
Show file tree
Hide file tree
Showing 5 changed files with 452 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
import demistomock as demisto # noqa: F401
from CommonServerPython import * # noqa: F401

import bz2
import io
import json
import tarfile
import urllib3

# Disable insecure warnings.
urllib3.disable_warnings()


FEED_STR = {
'apt': 'APT',
'cve': 'CVE',
'iot': 'IoT',
'mobile': 'Mobile',
'ransomware': 'Ransomware',
}


def _get_current_hour():
"""Gets current hour for Threat feeds."""
time_obj = datetime.utcnow() - timedelta(hours=2)
hour = time_obj.strftime('%Y%m%d%H')
return hour


def _get_indicators(response):
"""Gets indicators from response."""
indicators = []
decompressed_data = bz2.decompress(response)
tar_bytes = io.BytesIO(decompressed_data)
with tarfile.open(fileobj=tar_bytes, mode='r:') as tar:
for member in tar.getmembers():
file_data = tar.extractfile(member)
if file_data:
while line := file_data.readline():
decoded_data = line.decode('utf-8')
indicator = json.loads(decoded_data)
indicators.append(indicator)
return indicators


class Client(BaseClient):
"""Client for Google Threat Intelligence API."""

def fetch_indicators(self, feed_type: str = 'apt', hour: str = None):
"""Fetches indicators given a feed type and an hour."""
if not hour:
hour = _get_current_hour()
return self._http_request(
'GET',
f'threat_feeds/{feed_type}/hourly/{hour}',
resp_type='content',
)

def get_threat_feed(self, feed_type: str) -> list:
"""Retrieves matches for a given feed type."""
last_threat_feed = demisto.getIntegrationContext().get('last_threat_feed')

hour = _get_current_hour()

if last_threat_feed == hour:
return []

response = self.fetch_indicators(feed_type, hour)
matches = _get_indicators(response)
demisto.setIntegrationContext({'last_threat_feed': hour})
return matches


def test_module(client: Client) -> str:
client.fetch_indicators()
return 'ok'


def fetch_indicators_command(client: Client,
feed_type: str,
tlp_color: str = None,
feed_tags: list = None,
limit: int = 40) -> list[dict]:
"""Retrieves indicators from the feed
Args:
client (Client): Client object with request
tlp_color (str): Traffic Light Protocol color
feed_tags (list): Tags to assign fetched indicators
limit (int): limit the results
Returns:
Indicators.
"""
iterator = client.get_threat_feed(feed_type)
indicators = []
if limit > 0:
iterator = iterator[:limit]

# extract values from iterator
for item in iterator:
attributes = item.get('attributes', {})
type_ = FeedIndicatorType.File
raw_data = {
'value': attributes,
'type': type_,
}

# Create indicator object for each value.
# The object consists of a dictionary with required and optional keys and values, as described blow.
indicator_obj = {
# The indicator value.
'value': attributes['sha256'],
# The indicator type as defined in Cortex XSOAR.
# One can use the FeedIndicatorType class under CommonServerPython to populate this field.
'type': type_,
# The name of the service supplying this feed.
'service': 'Google Threat Intelligence',
# A dictionary that maps values to existing indicator fields defined in Cortex XSOAR.
# One can use this section in order to map custom indicator fields previously defined
# in Cortex XSOAR to their values.
'fields': {
'md5': attributes.get('md5'),
'sha1': attributes.get('sha1'),
'sha256': attributes.get('sha256'),
},
# A dictionary of the raw data returned from the feed source about the indicator.
'rawJSON': raw_data,
'sha256': attributes['sha256'],
'fileType': attributes.get('type_description'),
}

if feed_tags:
indicator_obj['fields']['tags'] = feed_tags

if tlp_color:
indicator_obj['fields']['trafficlightprotocol'] = tlp_color

indicators.append(indicator_obj)

return indicators


def get_indicators_command(client: Client,
params: Dict[str, str],
args: Dict[str, str]) -> CommandResults:
"""Wrapper for retrieving indicators from the feed to the war-room.
Args:
client: Client object with request
params: demisto.params()
args: demisto.args()
Returns:
Outputs.
"""
feed_type = params.get('feed_type', 'apt')
limit = int(args.get('limit', params.get('limit', 40)))
tlp_color = params.get('tlp_color')
feed_tags = argToList(params.get('feedTags', ''))
indicators = fetch_indicators_command(client, feed_type, tlp_color, feed_tags, limit)

human_readable = tableToMarkdown(
f'Indicators from Google Threat Intelligence {FEED_STR.get(feed_type, feed_type)} Feeds:',
indicators,
headers=[
'sha256',
'fileType',
],
headerTransform=string_to_table_header,
removeNull=True,
)

return CommandResults(
readable_output=human_readable,
outputs_prefix='',
outputs_key_field='',
raw_response=indicators,
outputs={},
)


def reset_last_threat_feed():
"""Reset last threat feed from the integration context"""
demisto.setIntegrationContext({})
return CommandResults(readable_output='Fetch history deleted successfully')


def main():
"""main function, parses params and runs command functions"""
params = demisto.params()

# If your Client class inherits from BaseClient, SSL verification is
# handled out of the box by it, just pass ``verify_certificate`` to
# the Client constructor
insecure = not params.get('insecure', False)

# If your Client class inherits from BaseClient, system proxy is handled
# out of the box by it, just pass ``proxy`` to the Client constructor
proxy = params.get('proxy', False)

command = demisto.command()
args = demisto.args()

demisto.debug(f'Command being called is {command}')

try:
client = Client(
base_url='https://www.virustotal.com/api/v3/',
verify=insecure,
proxy=proxy,
headers={
'x-apikey': params['credentials']['password'],
'x-tool': 'CortexGTIFeeds',
}
)

if command == 'test-module':
# This is the call made when pressing the integration Test button.
return_results(test_module(client))

elif command == 'gti-feed-get-indicators':
# This is the command that fetches a limited number of indicators
# from the feed source and displays them in the war room.
return_results(get_indicators_command(client, params, args))

elif command == 'gti-feed-reset-fetch-indicators':
return_results(reset_last_threat_feed())

elif command == 'fetch-indicators':
# This is the command that initiates a request to the feed endpoint
# and create new indicators objects from the data fetched. If the
# integration instance is configured to fetch indicators, then this
# is the commandthat will be executed at the specified feed fetch
# interval.
feed_type = params.get('feed_type', 'apt')
tlp_color = params.get('tlp_color')
feed_tags = argToList(params.get('feedTags'))
limit = int(params.get('limit', 40))
indicators = fetch_indicators_command(client, feed_type, tlp_color, feed_tags, limit)
for iter_ in batch(indicators, batch_size=2000):
demisto.createIndicators(iter_)

else:
raise NotImplementedError(f'Command {command} is not implemented.')

# Log exceptions and return errors
except Exception as e:
demisto.error(traceback.format_exc()) # Print the traceback
return_error(f'Failed to execute {command} command.\nError:\n{str(e)}')


if __name__ in ['__main__', 'builtin', 'builtins']:
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
category: Data Enrichment & Threat Intelligence
commonfields:
id: Google Threat Intelligence Feeds
version: -1
configuration:
- display: API Key (leave empty. Fill in the API key in the password field.)
displaypassword: API Key
name: credentials
type: 9
required: true
hiddenusername: true
- display: Feed type
name: feed_type
defaultvalue: apt
type: 15
options:
- apt
- cve
- iot
- mobile
- ransomware
- display: Limit
name: limit
defaultvalue: 40
type: 0
additionalinfo: Limit of indicators to fetch from retrohunt job results.
required: false
- display: Fetch indicators
name: feed
defaultvalue: "true"
type: 8
required: false
- display: Indicator Reputation
name: feedReputation
defaultvalue: feedInstanceReputationNotSet
type: 18
options:
- None
- Good
- Suspicious
- Bad
additionalinfo: Indicators from this integration instance will be marked with this reputation.
required: false
- display: Source Reliability
name: feedReliability
defaultvalue: F - Reliability cannot be judged
type: 15
required: true
options:
- A - Completely reliable
- B - Usually reliable
- C - Fairly reliable
- D - Not usually reliable
- E - Unreliable
- F - Reliability cannot be judged
additionalinfo: Reliability of the source providing the intelligence data.
- display: ""
name: feedExpirationPolicy
defaultvalue: indicatorType
type: 17
options:
- never
- interval
- indicatorType
- suddenDeath
required: false
- display: ""
name: feedExpirationInterval
defaultvalue: "20160"
type: 1
required: false
- display: Feed Fetch Interval
name: feedFetchInterval
defaultvalue: "30"
type: 19
required: false
- display: Bypass exclusion list
name: feedBypassExclusionList
type: 8
additionalinfo: When selected, the exclusion list is ignored for indicators from this feed. This means that if an indicator from this feed is on the exclusion list, the indicator might still be added to the system.
required: false
- name: feedTags
display: Tags
type: 0
additionalinfo: Supports CSV values.
required: false
- name: tlp_color
display: Traffic Light Protocol Color
options:
- RED
- AMBER
- GREEN
- WHITE
type: 15
additionalinfo: The Traffic Light Protocol (TLP) designation to apply to indicators fetched from the feed.
required: false
- additionalinfo: Incremental feeds pull only new or modified indicators that have been sent from the integration. The determination if the indicator is new or modified happens on the 3rd-party vendor's side, so only indicators that are new or modified are sent to Cortex XSOAR. Therefore, all indicators coming from these feeds are labeled new or modified.
defaultvalue: 'true'
display: Incremental feed
hidden: true
name: feedIncremental
required: false
type: 8
description: Use this feed integration to fetch Google Threat Intelligence Feeds matches.
display: Google Threat Intelligence Feeds
name: Google Threat Intelligence Feeds
script:
commands:
- arguments:
- name: limit
defaultValue: "40"
description: The maximum number of results to return.
description: Gets the matches from the latest Feed.
name: gti-feed-get-indicators
- description: "This command will reset your fetch history."
name: gti-feed-reset-fetch-indicators
dockerimage: demisto/python3:3.10.13.84405
feed: true
runonce: false
script: "-"
subtype: python3
type: python
fromversion: 5.5.0
tests:
- No tests (auto formatted)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Authorization:
Your API key can be found in your Google Threat Intelligence account user menu.
Your API key carries all your privileges, so keep it secure and don't share it with anyone.

### Cortex XSOAR version 6.0.0 and below:
Fill anything in the "Username" fill box. Use you API key in the password fill box.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit a50361c

Please sign in to comment.