diff --git a/dftimewolf/lib/collectors/grr_hosts.py b/dftimewolf/lib/collectors/grr_hosts.py index 6bb09ba9..3ac64f85 100644 --- a/dftimewolf/lib/collectors/grr_hosts.py +++ b/dftimewolf/lib/collectors/grr_hosts.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- """Definition of modules for collecting data from GRR hosts.""" +from concurrent.futures import ThreadPoolExecutor import datetime import os import re @@ -376,6 +377,7 @@ def GetThreadPoolSize(self) -> int: """Thread pool size.""" return GRR_THREAD_POOL_SIZE + class GRRYaraScanner(GRRFlow): """GRR Yara scanner. @@ -980,64 +982,87 @@ def _DownloadResults(self, results.append(data_frame) return results + def _ProcessQuery( + self, + hostname: str, + client: Client, + osquery_container: containers.OsqueryQuery + ) -> None: + """Processes an osquery flow on a GRR client. + + Args: + hostname: the GRR client hostname. + client: the GRR Client. + osquery_container: the OSQuery. + """ + hunt_args = osquery_flows.OsqueryFlowArgs( + query=osquery_container.query, + timeout_millis=self.timeout_millis, + ignore_stderr_errors=self.ignore_stderr_errors) + + try: + flow_id = self._LaunchFlow(client, 'OsqueryFlow', hunt_args) + self._AwaitFlow(client, flow_id) + except DFTimewolfError as error: + self.ModuleError( + f'Error raised while launching/awaiting flow: {error.message}') + return + + name = osquery_container.name + description = osquery_container.description + query = osquery_container.query + flow_identifier = flow_id + client_identifier = client.client_id + + results = self._DownloadResults(client, flow_id) + if not results: + results_container = containers.OsqueryResult( + name=name, + description=description, + query=query, + hostname=hostname, + data_frame=pd.DataFrame(), + flow_identifier=flow_identifier, + client_identifier=client_identifier) + self.state.StoreContainer(results_container) + return + + for data_frame in results: + self.logger.info( + f'{str(flow_id)} ({hostname}): {len(data_frame)} rows collected') + + dataframe_container = containers.OsqueryResult( + name=name, + description=description, + query=query, + hostname=hostname, + data_frame=data_frame, + flow_identifier=flow_identifier, + client_identifier=client_identifier) + + self.state.StoreContainer(dataframe_container) + def Process(self, container: containers.Host) -> None: """Collect osquery results from a host with GRR. Raises: DFTimewolfError: if no artifacts specified nor resolved by platform. """ - for client in self._FindClients([container.hostname]): - osquery_containers = self.GetContainers(containers.OsqueryQuery) - - for osquery_container in osquery_containers: - hunt_args = osquery_flows.OsqueryFlowArgs( - query=osquery_container.query, - timeout_millis=self.timeout_millis, - ignore_stderr_errors=self.ignore_stderr_errors) - - try: - flow_id = self._LaunchFlow(client, 'OsqueryFlow', hunt_args) - self._AwaitFlow(client, flow_id) - except DFTimewolfError as error: - self.ModuleError( - f'Error raised while launching/awaiting flow: {error.message}') - continue + client = self._GetClientBySelector(container.hostname) - name = osquery_container.name - description = osquery_container.description - query = osquery_container.query - hostname = container.hostname - flow_identifier = flow_id - client_identifier = client.client_id - - results = self._DownloadResults(client, flow_id) - if not results: - results_container = containers.OsqueryResult( - name=name, - description=description, - query=query, - hostname=hostname, - data_frame=pd.DataFrame(), - flow_identifier=flow_identifier, - client_identifier=client_identifier) - self.StoreContainer(results_container) - continue + osquery_containers = self.state.GetContainers(containers.OsqueryQuery) - for data_frame in results: - self.logger.info( - f'{str(flow_id)} ({container.hostname}): {len(data_frame)} rows ' - 'collected') - - dataframe_container = containers.OsqueryResult( - name=name, - description=description, - query=query, - hostname=hostname, - data_frame=data_frame, - flow_identifier=flow_identifier, - client_identifier=client_identifier) + host_osquery_futures = [] + with ThreadPoolExecutor(self.GetQueryThreadPoolSize()) as executor: + for osquery_container in osquery_containers: + host_osquery_future = executor.submit( + self._ProcessQuery, container.hostname, client, osquery_container) + host_osquery_futures.append(host_osquery_future) - self.StoreContainer(dataframe_container) + for host_osquery_future in host_osquery_futures: + if host_osquery_future.exception(): + self.logger.error( + f'Error with osquery flow {str(host_osquery_future.exception())}') def PreProcess(self) -> None: """Not implemented.""" @@ -1067,7 +1092,7 @@ def PostProcess(self) -> None: continue hostname = container.hostname client_id = container.client_identifier - flow_id = container.client_identifier + flow_id = container.flow_identifier query = container.query output_file_path = os.path.join( @@ -1085,6 +1110,10 @@ def GetThreadOnContainerType(self) -> Type[interface.AttributeContainer]: """This module operates on Host containers.""" return containers.Host + def GetQueryThreadPoolSize(self) -> int: + """Get the number of osquery threads.""" + return 4 # Arbitrary + class GRRFlowCollector(GRRFlow): """Flow collector.