Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update GRROsqueryCollector to use ThreadPoolExecutor #696

Merged
merged 5 commits into from
Jan 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 79 additions & 50 deletions dftimewolf/lib/collectors/grr_hosts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""Definition of modules for collecting data from GRR hosts."""

from concurrent.futures import ThreadPoolExecutor
import datetime
import os
import re
Expand Down Expand Up @@ -376,6 +377,7 @@ def GetThreadPoolSize(self) -> int:
"""Thread pool size."""
return GRR_THREAD_POOL_SIZE


class GRRYaraScanner(GRRFlow):
"""GRR Yara scanner.

Expand Down Expand Up @@ -980,64 +982,87 @@ def _DownloadResults(self,
results.append(data_frame)
return results

def _ProcessQuery(
self,
hostname: str,
client: Client,
osquery_container: containers.OsqueryQuery
) -> None:
"""Processes an osquery flow on a GRR client.

Args:
hostname: the GRR client hostname.
client: the GRR Client.
osquery_container: the OSQuery.
"""
hunt_args = osquery_flows.OsqueryFlowArgs(
query=osquery_container.query,
timeout_millis=self.timeout_millis,
ignore_stderr_errors=self.ignore_stderr_errors)

try:
flow_id = self._LaunchFlow(client, 'OsqueryFlow', hunt_args)
self._AwaitFlow(client, flow_id)
except DFTimewolfError as error:
self.ModuleError(
f'Error raised while launching/awaiting flow: {error.message}')
return
sydp marked this conversation as resolved.
Show resolved Hide resolved

name = osquery_container.name
description = osquery_container.description
query = osquery_container.query
flow_identifier = flow_id
client_identifier = client.client_id

results = self._DownloadResults(client, flow_id)
if not results:
results_container = containers.OsqueryResult(
name=name,
description=description,
query=query,
hostname=hostname,
data_frame=pd.DataFrame(),
flow_identifier=flow_identifier,
client_identifier=client_identifier)
self.state.StoreContainer(results_container)
Comment on lines +1019 to +1027
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point in storing an empty dataframe here? Wouldn't it be better just to not store any container?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have it as an empty dataframe as the corresponding container attribute is currently not optional. It also simplifies the logic in downstream processing of the container.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question was more "why add a container at all" if the dataframe is going to be empty anyways.

Copy link
Collaborator Author

@sydp sydp Dec 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, my bad, I missed the second question. My rationale for doing it this way was that no result (i.e. empty data) is still a result, and is useful feedback downstream to let the module/user know that the query was successful and there was no result.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, that makes sense, thanks!

return

for data_frame in results:
self.logger.info(
f'{str(flow_id)} ({hostname}): {len(data_frame)} rows collected')

dataframe_container = containers.OsqueryResult(
name=name,
description=description,
query=query,
hostname=hostname,
data_frame=data_frame,
flow_identifier=flow_identifier,
client_identifier=client_identifier)

self.state.StoreContainer(dataframe_container)

def Process(self, container: containers.Host) -> None:
"""Collect osquery results from a host with GRR.

Raises:
DFTimewolfError: if no artifacts specified nor resolved by platform.
"""
for client in self._FindClients([container.hostname]):
osquery_containers = self.GetContainers(containers.OsqueryQuery)

for osquery_container in osquery_containers:
hunt_args = osquery_flows.OsqueryFlowArgs(
query=osquery_container.query,
timeout_millis=self.timeout_millis,
ignore_stderr_errors=self.ignore_stderr_errors)

try:
flow_id = self._LaunchFlow(client, 'OsqueryFlow', hunt_args)
self._AwaitFlow(client, flow_id)
except DFTimewolfError as error:
self.ModuleError(
f'Error raised while launching/awaiting flow: {error.message}')
continue
client = self._GetClientBySelector(container.hostname)

name = osquery_container.name
description = osquery_container.description
query = osquery_container.query
hostname = container.hostname
flow_identifier = flow_id
client_identifier = client.client_id

results = self._DownloadResults(client, flow_id)
if not results:
results_container = containers.OsqueryResult(
name=name,
description=description,
query=query,
hostname=hostname,
data_frame=pd.DataFrame(),
flow_identifier=flow_identifier,
client_identifier=client_identifier)
self.StoreContainer(results_container)
continue
osquery_containers = self.state.GetContainers(containers.OsqueryQuery)

for data_frame in results:
self.logger.info(
f'{str(flow_id)} ({container.hostname}): {len(data_frame)} rows '
'collected')

dataframe_container = containers.OsqueryResult(
name=name,
description=description,
query=query,
hostname=hostname,
data_frame=data_frame,
flow_identifier=flow_identifier,
client_identifier=client_identifier)
host_osquery_futures = []
with ThreadPoolExecutor(self.GetQueryThreadPoolSize()) as executor:
for osquery_container in osquery_containers:
host_osquery_future = executor.submit(
self._ProcessQuery, container.hostname, client, osquery_container)
host_osquery_futures.append(host_osquery_future)

self.StoreContainer(dataframe_container)
for host_osquery_future in host_osquery_futures:
if host_osquery_future.exception():
sydp marked this conversation as resolved.
Show resolved Hide resolved
self.logger.error(
f'Error with osquery flow {str(host_osquery_future.exception())}')

def PreProcess(self) -> None:
"""Not implemented."""
Expand Down Expand Up @@ -1067,7 +1092,7 @@ def PostProcess(self) -> None:
continue
hostname = container.hostname
client_id = container.client_identifier
flow_id = container.client_identifier
flow_id = container.flow_identifier
query = container.query

output_file_path = os.path.join(
Expand All @@ -1085,6 +1110,10 @@ def GetThreadOnContainerType(self) -> Type[interface.AttributeContainer]:
"""This module operates on Host containers."""
return containers.Host

def GetQueryThreadPoolSize(self) -> int:
"""Get the number of osquery threads."""
# Process() passes this value to ThreadPoolExecutor and then submits one
# _ProcessQuery call per OsqueryQuery container, so it caps how many of
# those calls are in flight at once for the host being processed.
return 4 # Arbitrary; NOTE(review): no tuning rationale is visible here - confirm before changing.


class GRRFlowCollector(GRRFlow):
"""Flow collector.
Expand Down