From 6a1e16b55fc01b439e9ae675c75bb54110c48244 Mon Sep 17 00:00:00 2001 From: sydp Date: Thu, 22 Dec 2022 22:09:29 +1100 Subject: [PATCH 1/4] Redone to use threadpoolexecutor --- dftimewolf/lib/collectors/grr_hosts.py | 122 +++++++++++++++---------- 1 file changed, 72 insertions(+), 50 deletions(-) diff --git a/dftimewolf/lib/collectors/grr_hosts.py b/dftimewolf/lib/collectors/grr_hosts.py index 4ae6ef97..349ad1a5 100644 --- a/dftimewolf/lib/collectors/grr_hosts.py +++ b/dftimewolf/lib/collectors/grr_hosts.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- """Definition of modules for collecting data from GRR hosts.""" +from concurrent.futures import ThreadPoolExecutor, Future import datetime import os import re @@ -980,64 +981,85 @@ def _DownloadResults(self, results.append(data_frame) return results + def _ProcessQuery( + self, + hostname: str, + osquery_container: containers.OsqueryQuery + ) -> None: + """Processes an osquery flow on a GRR client. + + Args: + hostname (str): the GRR client hostname. + osquery_container (containers.OsqueryQuery): the osquery. + """ + client = self._GetClientBySelector(hostname) + + hunt_args = osquery_flows.OsqueryFlowArgs( + query=osquery_container.query, + timeout_millis=self.timeout_millis, + ignore_stderr_errors=self.ignore_stderr_errors) + + try: + flow_id = self._LaunchFlow(client, 'OsqueryFlow', hunt_args) + self._AwaitFlow(client, flow_id) + except DFTimewolfError as error: + self.ModuleError( + f'Error raised while launching/awaiting flow: {error.message}') + return + + name = osquery_container.name + description = osquery_container.description + query = osquery_container.query + flow_identifier = flow_id + client_identifier = client.client_id + + results = self._DownloadResults(client, flow_id) + if not results: + results_container = containers.OsqueryResult( + name=name, + description=description, + query=query, + hostname=hostname, + data_frame=pd.DataFrame(), + flow_identifier=flow_identifier, + client_identifier=client_identifier) + self.state.StoreContainer(results_container) + return + + for data_frame in results: + self.logger.info( + f'{str(flow_id)} ({hostname}): {len(data_frame)} rows collected') + + dataframe_container = containers.OsqueryResult( + name=name, + description=description, + query=query, + hostname=hostname, + data_frame=data_frame, + flow_identifier=flow_identifier, + client_identifier=client_identifier) + + self.state.StoreContainer(dataframe_container) + def Process(self, container: containers.Host) -> None: """Collect osquery results from a host with GRR. Raises: DFTimewolfError: if no artifacts specified nor resolved by platform. """ - for client in self._FindClients([container.hostname]): - osquery_containers = self.state.GetContainers(containers.OsqueryQuery) + osquery_containers = self.state.GetContainers(containers.OsqueryQuery) + host_osquery_futures = [] + with ThreadPoolExecutor() as executor: for osquery_container in osquery_containers: - hunt_args = osquery_flows.OsqueryFlowArgs( - query=osquery_container.query, - timeout_millis=self.timeout_millis, - ignore_stderr_errors=self.ignore_stderr_errors) - - try: - flow_id = self._LaunchFlow(client, 'OsqueryFlow', hunt_args) - self._AwaitFlow(client, flow_id) - except DFTimewolfError as error: - self.ModuleError( - f'Error raised while launching/awaiting flow: {error.message}') - continue - - name = osquery_container.name - description = osquery_container.description - query = osquery_container.query - hostname = container.hostname - flow_identifier = flow_id - client_identifier = client.client_id - - results = self._DownloadResults(client, flow_id) - if not results: - results_container = containers.OsqueryResult( - name=name, - description=description, - query=query, - hostname=hostname, - data_frame=pd.DataFrame(), - flow_identifier=flow_identifier, - client_identifier=client_identifier) - self.state.StoreContainer(results_container) - continue - - for data_frame in results: - self.logger.info( - f'{str(flow_id)} ({container.hostname}): {len(data_frame)} rows ' - 'collected') - - dataframe_container = containers.OsqueryResult( - name=name, - description=description, - query=query, - hostname=hostname, - data_frame=data_frame, - flow_identifier=flow_identifier, - client_identifier=client_identifier) - - self.state.StoreContainer(dataframe_container) + host_osquery_future = executor.submit( + self._ProcessQuery, container.hostname, osquery_container) + host_osquery_futures.append(host_osquery_future) + + for host_osquery_future in host_osquery_futures: + if host_osquery_future.exception(): + raise DFTimewolfError('Error with osquery future') \ + from host_osquery_future.exception() def PreProcess(self) -> None: """Not implemented.""" From 15138b100d1c1096e15a95a62fb16953aa6fc0fc Mon Sep 17 00:00:00 2001 From: sydp Date: Thu, 22 Dec 2022 22:16:53 +1100 Subject: [PATCH 2/4] Unused import --- dftimewolf/lib/collectors/grr_hosts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dftimewolf/lib/collectors/grr_hosts.py b/dftimewolf/lib/collectors/grr_hosts.py index 349ad1a5..b8bf5615 100644 --- a/dftimewolf/lib/collectors/grr_hosts.py +++ b/dftimewolf/lib/collectors/grr_hosts.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """Definition of modules for collecting data from GRR hosts.""" -from concurrent.futures import ThreadPoolExecutor, Future +from concurrent.futures import ThreadPoolExecutor import datetime import os import re From 69fed9372232b7fa89d48fb51a362ab24f44838c Mon Sep 17 00:00:00 2001 From: sydp Date: Wed, 28 Dec 2022 07:05:48 +1100 Subject: [PATCH 3/4] Updates per review --- dftimewolf/lib/collectors/grr_hosts.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/dftimewolf/lib/collectors/grr_hosts.py b/dftimewolf/lib/collectors/grr_hosts.py index b8bf5615..1e2b3c16 100644 --- a/dftimewolf/lib/collectors/grr_hosts.py +++ b/dftimewolf/lib/collectors/grr_hosts.py @@ -377,6 +377,7 @@ def GetThreadPoolSize(self) -> int: """Thread pool size.""" return GRR_THREAD_POOL_SIZE + class GRRYaraScanner(GRRFlow): """GRR Yara scanner. @@ -984,16 +985,16 @@ def _DownloadResults(self, def _ProcessQuery( self, hostname: str, + client: Client, osquery_container: containers.OsqueryQuery ) -> None: """Processes an osquery flow on a GRR client. Args: hostname (str): the GRR client hostname. + client (Client): the Grr Client. osquery_container (containers.OsqueryQuery): the osquery. """ - client = self._GetClientBySelector(hostname) - hunt_args = osquery_flows.OsqueryFlowArgs( query=osquery_container.query, timeout_millis=self.timeout_millis, @@ -1047,19 +1048,21 @@ def Process(self, container: containers.Host) -> None: Raises: DFTimewolfError: if no artifacts specified nor resolved by platform. """ + client = self._GetClientBySelector(container.hostname) + osquery_containers = self.state.GetContainers(containers.OsqueryQuery) host_osquery_futures = [] - with ThreadPoolExecutor() as executor: + with ThreadPoolExecutor(self.GetQueryThreadPoolSize()) as executor: for osquery_container in osquery_containers: host_osquery_future = executor.submit( - self._ProcessQuery, container.hostname, osquery_container) + self._ProcessQuery, container.hostname, client, osquery_container) host_osquery_futures.append(host_osquery_future) for host_osquery_future in host_osquery_futures: if host_osquery_future.exception(): - raise DFTimewolfError('Error with osquery future') \ - from host_osquery_future.exception() + self.logger.error( + f'Error with osquery flow {str(host_osquery_future.exception())}') def PreProcess(self) -> None: """Not implemented.""" @@ -1089,7 +1092,7 @@ def PostProcess(self) -> None: continue hostname = container.hostname client_id = container.client_identifier - flow_id = container.client_identifier + flow_id = container.flow_identifier query = container.query output_file_path = os.path.join( @@ -1107,6 +1110,10 @@ def GetThreadOnContainerType(self) -> Type[interface.AttributeContainer]: """This module operates on Host containers.""" return containers.Host + def GetQueryThreadPoolSize(self) -> int: + """Get the number of osquery threads.""" + return 4 # Arbitrary + class GRRFlowCollector(GRRFlow): """Flow collector. From 92b3d52e097784c949e0eac8305738965d4c34ef Mon Sep 17 00:00:00 2001 From: sydp Date: Thu, 29 Dec 2022 11:52:40 +1100 Subject: [PATCH 4/4] Update dftimewolf/lib/collectors/grr_hosts.py Co-authored-by: Thomas Chopitea --- dftimewolf/lib/collectors/grr_hosts.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dftimewolf/lib/collectors/grr_hosts.py b/dftimewolf/lib/collectors/grr_hosts.py index 1e2b3c16..0696b6eb 100644 --- a/dftimewolf/lib/collectors/grr_hosts.py +++ b/dftimewolf/lib/collectors/grr_hosts.py @@ -991,9 +991,9 @@ def _ProcessQuery( """Processes an osquery flow on a GRR client. Args: - hostname (str): the GRR client hostname. - client (Client): the Grr Client. - osquery_container (containers.OsqueryQuery): the osquery. + hostname: the GRR client hostname. + client: the GRR Client. + osquery_container: the OSQuery. """ hunt_args = osquery_flows.OsqueryFlowArgs( query=osquery_container.query,