From 857bc350339045fe8419f2ce43de93dc8856e118 Mon Sep 17 00:00:00 2001 From: Ramo Date: Thu, 4 Jul 2024 13:02:00 +1000 Subject: [PATCH] Refactor GRR file downloads (#891) * GRR module updates * More updates * Linter appeasement * linter appeasement * Linter appeasement * Linter appeasement * Unittest fix --- dftimewolf/lib/collectors/grr_hosts.py | 175 ++++---------- poetry.lock | 15 +- tests/lib/collectors/grr_hosts.py | 219 ++++++++---------- .../collectors/test_data/mock_grr_hosts.py | 46 +++- 4 files changed, 192 insertions(+), 263 deletions(-) diff --git a/dftimewolf/lib/collectors/grr_hosts.py b/dftimewolf/lib/collectors/grr_hosts.py index dd0622fb4..fb403670b 100644 --- a/dftimewolf/lib/collectors/grr_hosts.py +++ b/dftimewolf/lib/collectors/grr_hosts.py @@ -3,10 +3,9 @@ import datetime import os +import pathlib import re -import tempfile -import time -import zipfile +import stat from concurrent.futures import ThreadPoolExecutor from typing import List, Optional, Tuple, Type @@ -245,61 +244,6 @@ def _LaunchFlow(self, client: Client, name: str, args: str) -> str: return flow_id - # TODO: change object to more specific GRR type information. - def _AwaitFlow(self, client: Client, flow_id: str) -> None: - """Waits for a specific GRR flow to complete. - - Args: - client (object): GRR Client object in which to await the flow. - flow_id (str): GRR identifier of the flow to await. - - Raises: - DFTimewolfError: If a Flow error was encountered. - """ - self.logger.info(f'{flow_id:s}: Waiting to finish') - if self.skip_offline_clients: - self.logger.debug("Client will be skipped if offline.") - - while True: - try: - status = client.Flow(flow_id).Get().data - except grr_errors.UnknownError: - msg = (f'Unknown error retrieving flow {flow_id} for host ' - f'{client.data.os_info.fqdn.lower()}') - self.ModuleError(msg, critical=True) - - if status.state == flows_pb2.FlowContext.ERROR: - # TODO(jbn): If one artifact fails, what happens? Test. - message = status.context.backtrace - if 'ArtifactNotRegisteredError' in status.context.backtrace: - message = status.context.backtrace.split('\n')[-2] - self.ModuleError( - f'{flow_id:s}: FAILED! Message from GRR:\n{message:s}', - critical=True) - - if status.state == 4: # Flow crashed, no enum in flows_pb2 - self.ModuleError(f'{flow_id:s}: Crashed', critical=False) - break - - if status.state == flows_pb2.FlowContext.TERMINATED: - self.logger.info(f'{flow_id:s}: Complete') - break - - time.sleep(self._CHECK_FLOW_INTERVAL_SEC) - if not self.skip_offline_clients: - continue - - client_last_seen = datetime.datetime.fromtimestamp( - client.data.last_seen_at / 1000000, datetime.timezone.utc) - now = datetime.datetime.now(datetime.timezone.utc) - if (now - client_last_seen).total_seconds() > self._MAX_OFFLINE_TIME_SEC: - self.logger.warning( - 'Client {0:s} has been offline for more than {1:.1f} minutes' - ', skipping...'.format( - client.client_id, self._MAX_OFFLINE_TIME_SEC / 60)) - self._skipped_flows.append((client.client_id, flow_id)) - break - def _CheckSkippedFlows(self) -> None: if not self._skipped_flows: return @@ -317,42 +261,35 @@ def _CheckSkippedFlows(self) -> None: client_id, flow_id, self.reason )) - def _DownloadArchive( - self, grr_flow: Client.Flow, flow_output_dir: str - ) -> None: - """Request an archive of files from GRR, download and extract it. - - This does not work on larger files, use _DownloadBlobs instead. - - Args: - grr_flow: GRR Flow object to download files from. - flow_output_dir: Directory to extract the downloaded files. 
- """ - output_file_path = os.path.join(self.output_path, f"{grr_flow.flow_id}.zip") - file_archive = grr_flow.GetFilesArchive() - file_archive.WriteToFile(output_file_path) - with zipfile.ZipFile(output_file_path) as archive: - archive.extractall(path=flow_output_dir) - os.remove(output_file_path) - def _DownloadBlobs( self, client: Client, - pathspecs: List["jobs_pb2.PathSpec"], + payloads: List["jobs_pb2.PathSpec"], flow_output_dir: str, ) -> None: - """Download an individual collected file from GRR to the local filesystem. + """Download individual collected files from GRR to the local filesystem. Args: client: GRR Client object to download blobs from. - pathspecs: List of pathspecs to download blobs from. + payloads: List of pathspecs to download blobs from. flow_output_dir: Directory to store the downloaded files. """ - for pathspec in pathspecs: - if pathspec.nested_path.pathtype == pathspec.nested_path.NTFS: - vfspath = f"fs/ntfs{pathspec.path}{pathspec.nested_path.path}" + for payload in payloads: + if hasattr(payload, 'stat'): + stats = payload.stat + elif hasattr(payload, 'stat_entry'): + stats = payload.stat_entry + else: + raise RuntimeError('Unsupported file collection attempted') + if stat.S_ISDIR(stats.st_mode): + continue + if (stats.pathspec.nested_path.pathtype == + jobs_pb2.PathSpec.NTFS): + vfspath = ( + f"fs/ntfs{stats.pathspec.path}{stats.pathspec.nested_path.path}") else: - vfspath = re.sub("^([a-zA-Z]:)?/(.*)$", "fs/os/\\1/\\2", pathspec.path) + vfspath = re.sub("^([a-zA-Z]:)?/(.*)$", "fs/os/\\1/\\2", + stats.pathspec.path) filename = os.path.basename(vfspath) base_dir = os.path.join(flow_output_dir, os.path.dirname(vfspath)) os.makedirs(base_dir, exist_ok=True) @@ -360,8 +297,13 @@ def _DownloadBlobs( f = client.File(vfspath) self.logger.debug(f"Downloading blob {filename} from {vfspath}") try: - with open(os.path.join(base_dir, filename), "wb") as out: - f.GetBlobWithOffset(0).WriteToStream(out) + path = os.path.join(base_dir, filename) + if stats.st_size: + with open(path, "wb") as out: + self.logger.debug(f'File: {filename}') + f.GetBlob().WriteToStream(out) + else: + pathlib.Path(path).touch() except grr_errors.ResourceNotFoundError as e: self.logger.warning( f"Failed to download blob {filename} from {vfspath}: {e}" @@ -405,51 +347,22 @@ def _DownloadFiles(self, client: Client, flow_id: str) -> Optional[str]: Returns: str: path containing the downloaded files. 
""" - grr_flow = client.Flow(flow_id) + flow_handle = client.Flow(flow_id).Get() fqdn = client.data.os_info.fqdn.lower() flow_output_dir = os.path.join(self.output_path, fqdn, flow_id) os.makedirs(flow_output_dir, exist_ok=True) - flow_name = grr_flow.Get().data.name + flow_name = flow_handle.data.name if flow_name == "TimelineFlow": self.logger.debug("Downloading timeline from GRR") - self._DownloadTimeline(client, grr_flow, flow_output_dir) + self._DownloadTimeline(client, flow_handle, flow_output_dir) return flow_output_dir - results = grr_flow.ListResults() - pathspecs = [] - large_files = [] - collect_browser_flow = False - for result in results: - stat_entry = result.payload - if flow_name == "CollectBrowserHistory": - stat_entry = result.payload.stat_entry - collect_browser_flow = True - if stat_entry.st_size > self._LARGE_FILE_SIZE_THRESHOLD: - size_gb = stat_entry.st_size / 1024 / 1024 / 1024 - self.logger.warning( - "Large file detected:" - f" {stat_entry.pathspec.path} ({size_gb:.2f} GB)" - ) - large_files.append(size_gb) - pathspecs.append(stat_entry.pathspec) - - if large_files: - self.logger.warning( - f'Large files detected ({", ".join(large_files)} GB), downloading' - ' blobs instead of archive.' - ) - self._DownloadBlobs(client, pathspecs, flow_output_dir) - elif collect_browser_flow: - self.logger.debug( - "CollectBrowserHistory flow detected, downloading blobs instead of" - " archive..." - ) - self._DownloadBlobs(client, pathspecs, flow_output_dir) - else: - self.logger.debug("Downloading file archive from GRR") - self._DownloadArchive(grr_flow, flow_output_dir) + payloads = [] + for r in flow_handle.ListResults(): + payloads.append(r.payload) + self._DownloadBlobs(client, payloads, flow_output_dir) return flow_output_dir @@ -617,10 +530,10 @@ def Process(self, container: containers.Host self.logger.info( f'Launched flow {flow_id} on {client.client_id} ({grr_hostname})') - self._AwaitFlow(client, flow_id) + grr_flow = client.Flow(flow_id) + grr_flow.WaitUntilDone() - # Get latest flow data from GRR server. - grr_flow = client.Flow(flow_id).Get() + grr_flow = grr_flow.Get() results = list(grr_flow.ListResults()) yara_hits_df = self._YaraHitsToDataFrame(client, results) @@ -870,7 +783,8 @@ def Process(self, container: containers.Host msg = f'Flow could not be launched on {client.client_id:s}.' msg += f'\nArtifactCollectorFlow args: {flow_args!s}' self.ModuleError(msg, critical=True) - self._AwaitFlow(client, flow_id) + client.Flow(flow_id).WaitUntilDone() + collected_flow_data = self._DownloadFiles(client, flow_id) if collected_flow_data: @@ -1001,7 +915,7 @@ def Process(self, container: containers.Host pathtype=path_type, action=flow_action) flow_id = self._LaunchFlow(client, 'FileFinder', flow_args) - self._AwaitFlow(client, flow_id) + client.Flow(flow_id).WaitUntilDone() collected_flow_data = self._DownloadFiles(client, flow_id) if collected_flow_data: self.PublishMessage(f'{flow_id}: Downloaded: {collected_flow_data}') @@ -1165,7 +1079,7 @@ def _ProcessQuery( try: flow_id = self._LaunchFlow(client, 'OsqueryFlow', flow_args) - self._AwaitFlow(client, flow_id) + client.Flow(flow_id).WaitUntilDone() except DFTimewolfError as error: self.ModuleError( f'Error raised while launching/awaiting flow: {error.message}') @@ -1356,7 +1270,7 @@ def Process(self, container: containers.GrrFlow # We don't need clients to be online to grab the flows. 
client = self._GetClientBySelector( container.hostname, discard_inactive=False) - self._AwaitFlow(client, container.flow_id) + client.Flow(container.flow_id).WaitUntilDone() self._CheckSkippedFlows() collected_flow_data = self._DownloadFiles(client, container.flow_id) if collected_flow_data: @@ -1471,10 +1385,9 @@ def Process(self, container: containers.Host timeline_args = timeline_pb2.TimelineArgs(root=root_path,) flow_id = self._LaunchFlow(client, 'TimelineFlow', timeline_args) - self._AwaitFlow(client, flow_id) - temp_directory = tempfile.mkdtemp() + client.Flow(flow_id).WaitUntilDone() collected_timeline = self._DownloadTimeline( - client, client.Flow(flow_id), temp_directory + client, client.Flow(flow_id), self.output_path ) self.PublishMessage(f"{flow_id}: Downloaded: {collected_timeline}") cont = containers.File( diff --git a/poetry.lock b/poetry.lock index 968791237..8e6bd6d8d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -164,13 +164,13 @@ doc = ["docutils", "jinja2", "myst-parser", "numpydoc", "pillow (>=9,<10)", "pyd [[package]] name = "astroid" -version = "3.2.0" +version = "3.2.2" description = "An abstract syntax tree for Python with inference support." optional = false python-versions = ">=3.8.0" files = [ - {file = "astroid-3.2.0-py3-none-any.whl", hash = "sha256:16ee8ca5c75ac828783028cc1f967777f0e507c6886a295ad143e0f405b975a2"}, - {file = "astroid-3.2.0.tar.gz", hash = "sha256:f7f829f8506ade59f1b3c6c93d8fac5b1ebc721685fa9af23e9794daf1d450a3"}, + {file = "astroid-3.2.2-py3-none-any.whl", hash = "sha256:e8a0083b4bb28fcffb6207a3bfc9e5d0a68be951dd7e336d5dcf639c682388c0"}, + {file = "astroid-3.2.2.tar.gz", hash = "sha256:8ead48e31b92b2e217b6c9733a21afafe479d52d6e164dd25fb1a770c7c3cf94"}, ] [[package]] @@ -2602,17 +2602,17 @@ tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"] [[package]] name = "pylint" -version = "3.2.0" +version = "3.2.5" description = "python code static checker" optional = false python-versions = ">=3.8.0" files = [ - {file = "pylint-3.2.0-py3-none-any.whl", hash = "sha256:9f20c05398520474dac03d7abb21ab93181f91d4c110e1e0b32bc0d016c34fa4"}, - {file = "pylint-3.2.0.tar.gz", hash = "sha256:ad8baf17c8ea5502f23ae38d7c1b7ec78bd865ce34af9a0b986282e2611a8ff2"}, + {file = "pylint-3.2.5-py3-none-any.whl", hash = "sha256:32cd6c042b5004b8e857d727708720c54a676d1e22917cf1a2df9b4d4868abd6"}, + {file = "pylint-3.2.5.tar.gz", hash = "sha256:e9b7171e242dcc6ebd0aaa7540481d1a72860748a0a7816b8fe6cf6c80a6fe7e"}, ] [package.dependencies] -astroid = ">=3.2.0,<=3.3.0-dev0" +astroid = ">=3.2.2,<=3.3.0-dev0" colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""} dill = [ {version = ">=0.3.7", markers = "python_version >= \"3.12\""}, @@ -2817,7 +2817,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = 
"PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, diff --git a/tests/lib/collectors/grr_hosts.py b/tests/lib/collectors/grr_hosts.py index 0dd8c19fa..c6e4ea52c 100644 --- a/tests/lib/collectors/grr_hosts.py +++ b/tests/lib/collectors/grr_hosts.py @@ -4,14 +4,21 @@ import unittest +import os +import tempfile +from typing import IO + import mock -import six import pandas as pd from grr_api_client import errors as grr_errors +from grr_api_client import client +from grr_response_proto.api import client_pb2 from grr_response_proto import flows_pb2 from grr_response_proto import jobs_pb2 from grr_response_proto import osquery_pb2 +from grr_response_proto import timeline_pb2 +from google.protobuf import text_format from tests.lib.collectors.test_data import mock_grr_hosts from dftimewolf import config @@ -19,7 +26,17 @@ from dftimewolf.lib import errors from dftimewolf.lib.collectors import grr_hosts from dftimewolf.lib.containers import containers -from dftimewolf.lib.errors import DFTimewolfError + + +def _MOCK_WRITE_TO_STREAM(out: IO[bytes]): + for _ in range(1024): + out.write(b'\0') + + +def _MOCK_WRITE_TO_FILE(path: str): + with open(path, 'wb') as fp: + for _ in range(1024): + fp.write(b'\0') # Extensive access to protected members for testing, and mocking of classes. @@ -59,17 +76,19 @@ def testGetClientBySelector(self): """Tests that GetClientBySelector fetches the most recent GRR client.""" self.mock_grr_api.SearchClients.return_value = \ mock_grr_hosts.MOCK_CLIENT_LIST - client = self.grr_flow_module._GetClientBySelector('C.0000000000000001') + client_handle = self.grr_flow_module._GetClientBySelector( + 'C.0000000000000001') self.mock_grr_api.SearchClients.assert_called_with('C.0000000000000001') - self.assertEqual(client.data.client_id, 'C.0000000000000001') + self.assertEqual(client_handle.data.client_id, 'C.0000000000000001') def testGetClientByUsername(self): """Tests that GetClientBySelector fetches the correct GRR client.""" self.mock_grr_api.SearchClients.return_value = \ mock_grr_hosts.MOCK_CLIENT_LIST - client = self.grr_flow_module._GetClientBySelector('tomchop_username2') + client_handle = self.grr_flow_module._GetClientBySelector( + 'tomchop_username2') self.mock_grr_api.SearchClients.assert_called_with('tomchop_username2') - self.assertEqual(client.data.client_id, 'C.0000000000000001') + self.assertEqual(client_handle.data.client_id, 'C.0000000000000001') def testGetClientBySelectorError(self): """Tests that GetClientBySelector fetches the most recent GRR client.""" @@ -89,85 +108,39 @@ def testLaunchFlow(self, mock_CreateFlow): self.assertEqual(flow_id, 'F:12345') mock_CreateFlow.assert_called_once_with(name="FlowName", args="FlowArgs") - @mock.patch('grr_api_client.flow.FlowRef.Get') - def testAwaitFlow(self, mock_FlowGet): - """Test that no errors are generated when GRR flow succeeds.""" - mock_FlowGet.return_value = mock_grr_hosts.MOCK_FLOW - self.grr_flow_module._AwaitFlow(mock_grr_hosts.MOCK_CLIENT, "F:12345") - mock_FlowGet.assert_called_once() - self.assertEqual(self.test_state.errors, []) - - @mock.patch('grr_api_client.flow.FlowRef.Get') - def testAwaitFlowOffline(self, mock_FlowGet): - """Test that flows on offline hosts will be abandoned.""" - mock_FlowGet.return_value = mock_grr_hosts.MOCK_FLOW_RUNNING - mock_FlowGet.return_value.state = 0 - 
self.grr_flow_module.skip_offline_clients = True - self.grr_flow_module._AwaitFlow(mock_grr_hosts.MOCK_CLIENT, "F:12345") - mock_FlowGet.assert_called_once() - self.assertEqual(self.test_state.errors, []) - self.assertEqual( - self.grr_flow_module._skipped_flows, - [('C.0000000000000000', 'F:12345')]) - self.grr_flow_module.skip_offline_clients = False - - @mock.patch('grr_api_client.flow.FlowRef.Get') - def testAwaitFlowError(self, mock_FlowGet): - """Test that an exception is raised when flow has an ERROR status.""" - mock_FlowGet.return_value = mock_grr_hosts.MOCK_FLOW_ERROR - error_msg = 'F:12345: FAILED! Message from GRR:' - with six.assertRaisesRegex(self, DFTimewolfError, error_msg): - self.grr_flow_module._AwaitFlow(mock_grr_hosts.MOCK_CLIENT, "F:12345") - - @mock.patch('grr_api_client.flow.FlowRef.Get') - def testAwaitFlowGRRError(self, mock_FlowGet): - """"Test that an exception is raised if the GRR API raises an error.""" - mock_FlowGet.side_effect = grr_errors.UnknownError - error_msg = 'Unknown error retrieving flow F:12345 for host' - with six.assertRaisesRegex(self, DFTimewolfError, error_msg): - self.grr_flow_module._AwaitFlow(mock_grr_hosts.MOCK_CLIENT, "F:12345") - - @mock.patch("os.remove") - @mock.patch('os.makedirs') - @mock.patch('zipfile.ZipFile') - @mock.patch('grr_api_client.flow.FlowBase.GetFilesArchive') @mock.patch('grr_api_client.flow.FlowBase.Get') - @mock.patch('grr_api_client.flow.FlowBase.ListResults') - def testDownloadArtifactFilesForFlow( - self, - unused_mock_ListResults, - mock_Get, - mock_GetFilesArchive, - mock_ZipFile, - mock_makedirs, - mock_remove, - ): - """Test if results are downloaded & unzipped in the correct directories.""" - # Change output_path to something constant so we can easily assert - # if calls were done correctly. 
- self.grr_flow_module.output_path = "/tmp/random" - mock_Get.return_value.data.name = 'ArtifactFlow' - - return_value = self.grr_flow_module._DownloadFiles( - mock_grr_hosts.MOCK_CLIENT, 'F:12345') - self.assertEqual(return_value, '/tmp/random/tomchop/F:12345') - mock_GetFilesArchive.assert_called_once() - mock_ZipFile.assert_called_once_with("/tmp/random/F:12345.zip") - mock_makedirs.assert_called_once_with( - "/tmp/random/tomchop/F:12345", exist_ok=True - ) - mock_remove.assert_called_once_with('/tmp/random/F:12345.zip') + def testDownloadFilesForFlow(self, mock_Get): + """Test if results are downloaded to the correct directories.""" + mock_Get.return_value.data.name = 'ClientFileFinder' + mock_Get.return_value.ListResults.return_value = ( + mock_grr_hosts.MOCK_CFF_RESULTS) + + mock_client = client.Client( + data=text_format.Parse(mock_grr_hosts.client_proto1, + client_pb2.ApiClient()), + context=True) + mock_client.File = mock.MagicMock() + mock_client.File.return_value.GetBlob.return_value.WriteToStream = ( + _MOCK_WRITE_TO_STREAM) + + with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as local_path: + self.grr_flow_module.output_path = local_path + + self.grr_flow_module._DownloadFiles(mock_client, 'F:12345') + + self.assertTrue( + os.path.exists( + os.path.join(local_path, mock_client.data.os_info.fqdn.lower(), + 'F:12345', 'fs', 'os', 'directory', 'file'))) @mock.patch("os.remove") @mock.patch('os.makedirs') @mock.patch("zipfile.ZipFile") @mock.patch('grr_api_client.flow.FlowBase.GetFilesArchive') - @mock.patch('grr_api_client.flow.FlowBase.GetCollectedTimelineBody') @mock.patch('grr_api_client.flow.FlowBase.Get') def testDownloadTimelineBodyForFlow( self, mock_Get, - mock_GetCollectedTimelineBody, mock_GetFilesArchive, mock_ZipFile, mock_makedirs, @@ -178,11 +151,12 @@ def testDownloadTimelineBodyForFlow( # if calls were done correctly. 
self.grr_flow_module.output_path = "/tmp/random" mock_Get.return_value.data.name = 'TimelineFlow' + mock_Get.return_value.GetCollectedTimelineBody = mock.MagicMock() return_value = self.grr_flow_module._DownloadFiles( mock_grr_hosts.MOCK_CLIENT, 'F:12345') self.assertEqual(return_value, '/tmp/random/tomchop/F:12345') - mock_GetCollectedTimelineBody.assert_called_once() + mock_Get.return_value.GetCollectedTimelineBody.assert_called_once() mock_GetFilesArchive.assert_not_called() mock_ZipFile.assert_not_called() mock_makedirs.assert_called_once_with( @@ -408,7 +382,7 @@ def testProcess(self, mock_CreateFlow, mock_Get, mock_DownloadFiles): self.assertEqual(result.path, '/tmp/tmpRandom/tomchop') @mock.patch('grr_api_client.api.InitHttp') - @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow') + @mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._DownloadFiles') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._FindClients') @@ -417,7 +391,7 @@ def testProcessFromHostContainers( mock_FindClients, unused_LaunchFlow, unused_DownloadFiles, - unused_AwaitFlow, + unused_WaitUntilDone, mock_InitHttp, ): """Tests that processing works when only containers are passed.""" @@ -451,7 +425,7 @@ def testProcessFromHostContainers( mock_FindClients.assert_called_with(['container.host']) @mock.patch("grr_api_client.api.InitHttp") - @mock.patch("dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow") + @mock.patch("grr_api_client.flow.FlowBase.WaitUntilDone") @mock.patch("dftimewolf.lib.collectors.grr_hosts.GRRFlow._DownloadFiles") @mock.patch("dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow") @mock.patch("dftimewolf.lib.collectors.grr_hosts.GRRFlow._FindClients") @@ -460,7 +434,7 @@ def testProcessFromArtifactContainers( mock_FindClients, mock_LaunchFlow, mock_DownloadFiles, - unused_AwaitFlow, + unused_WaitUntilDone, mock_InitHttp, ): """Tests that processing works when only containers are passed.""" @@ -558,7 +532,7 @@ def testPreProcess(self): self.assertEqual( self.grr_file_collector.files, ['/etc/passwd', '/etc/hosts']) - @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow') + @mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._DownloadFiles') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow') def testProcess(self, mock_LaunchFlow, mock_DownloadFiles, _): @@ -592,7 +566,7 @@ def testProcess(self, mock_LaunchFlow, mock_DownloadFiles, _): self.assertEqual(results[0].name, 'tomchop') self.assertEqual(results[0].path, '/tmp/something') - @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow') + @mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone') @mock.patch('grr_api_client.api.InitHttp') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._DownloadFiles') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow') @@ -600,7 +574,7 @@ def testWindowsProcess(self, mock_LaunchFlow, mock_DownloadFiles, mock_InitHttp, - unused_await): + unused_waituntildone): """Tests that processing launches appropriate flows.""" self.mock_grr_api = mock.Mock() mock_InitHttp.return_value = self.mock_grr_api @@ -652,7 +626,7 @@ def testWindowsProcess(self, self.assertEqual(results[0].name, 'tomchop') self.assertEqual(results[0].path, '/tmp/something') - @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow') + 
@mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone')
  @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._DownloadFiles')
  @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow')
  @mock.patch('grr_api_client.api.InitHttp')
@@ -662,7 +636,7 @@ def testProcessFromContainers(self,
                               mock_InitHttp,
                               unused_LaunchFlow,
                               unused_DownloadFiles,
-                              unused_AwaitFlow
+                              unused_WaitUntilDone
                               ):
     """Tests that processing works when only containers are passed."""
     mock_InitHttp.return_value = self.mock_grr_api
@@ -725,7 +699,7 @@ def setUp(self, mock_InitHttp, mock_list_flows, unused_mock_flow_get):
     )

   @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._DownloadFiles')
-  @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow')
+  @mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone')
   def testProcess(self, _, mock_DownloadFiles):
     """Tests that the collector can be initialized."""
     self.mock_grr_api.SearchClients.return_value = \
         mock_grr_hosts.MOCK_CLIENT_LIST
@@ -783,7 +757,7 @@ def testPreProcessNoFlows(
   @mock.patch('grr_api_client.client.Client.ListFlows')
   @mock.patch('grr_api_client.api.InitHttp')
   @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._DownloadFiles')
-  @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow')
+  @mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone')
   def testProcessNoFlowData(self,
                             _,
                             mock_DLFiles,
@@ -864,40 +838,41 @@ def testInitialization(self):
     self.assertEqual(self.grr_timeline_collector.root_path, b'/')
     self.assertEqual(self.grr_timeline_collector._timeline_format, 1)

-  @mock.patch('dftimewolf.lib.collectors.grr_hosts.'
-              'GRRTimelineCollector._DownloadTimeline')
-  # mock grr_api_client.flow.FlowBase.GetCollectedTimeline instead once when it
-  # becomes available in pypi
-  @mock.patch('grr_api_client.flow.FlowBase.Get')
-  @mock.patch('grr_api_client.client.ClientBase.CreateFlow')
-  def testProcess(self, mock_CreateFlow, mock_Get, mock_DownloadTimeline):
-    """Tests that the collector can be initialized."""
-    mock_CreateFlow.return_value = mock_grr_hosts.MOCK_FLOW
-    self.mock_grr_api.SearchClients.return_value = \
-        mock_grr_hosts.MOCK_CLIENT_LIST
-    mock_DownloadTimeline.return_value = '/tmp/something'
-    mock_Get.return_value = mock_grr_hosts.MOCK_FLOW
+  def testProcess(self):
+    """Tests the Process method of GRRTimelineCollector."""
+    self.grr_timeline_collector._GetClientBySelector = mock.MagicMock()
+    self.grr_timeline_collector._GetClientBySelector.return_value = (
+        mock_grr_hosts.MOCK_CLIENT)
+
+    with (mock.patch('grr_api_client.client.ClientBase.CreateFlow'
+          ) as mock_createflow,
+          mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone'
+          ) as mock_waituntildone,
+          mock.patch('grr_api_client.flow.FlowBase.GetCollectedTimelineBody'
+          ) as mock_getcollectedtimeline):
+      mock_createflow.return_value.flow_id = 'F:12345'
+      mock_waituntildone.return_value = None
+      mock_getcollectedtimeline.return_value.WriteToFile = _MOCK_WRITE_TO_FILE
+
+      self.grr_timeline_collector.PreProcess()
+      in_containers = self.grr_timeline_collector.GetContainers(
+          self.grr_timeline_collector.GetThreadOnContainerType())
+      for c in in_containers:
+        self.grr_timeline_collector.Process(c)  # pytype: disable=wrong-arg-types
+      self.grr_timeline_collector.PostProcess()

-    self.grr_timeline_collector.PreProcess()
-    in_containers = self.grr_timeline_collector.GetContainers(
-        self.grr_timeline_collector.GetThreadOnContainerType())
-    for c in in_containers:
-      self.grr_timeline_collector.Process(c)
-    self.grr_timeline_collector.PostProcess()
+
mock_createflow.assert_called_once_with( + name='TimelineFlow', args=timeline_pb2.TimelineArgs(root=b'/')) + mock_waituntildone.assert_called_once() - args, _ = mock_DownloadTimeline.call_args - self.assertEqual(args[0], mock_grr_hosts.MOCK_CLIENT_RECENT) - self.assertEqual(args[1].flow_id, "F:12345") - self.assertTrue(args[2].startswith("/tmp/")) + expected_output_path = os.path.join( + self.grr_timeline_collector.output_path, + f'{mock_createflow.return_value.flow_id}_timeline.body') - results = self.grr_timeline_collector.GetContainers(containers.File) - self.assertEqual(len(results), 1) - result = results[0] - self.assertEqual(result.name, 'tomchop') - self.assertEqual(result.path, '/tmp/something') + self.assertTrue(os.path.exists(expected_output_path)) @mock.patch('grr_api_client.api.InitHttp') - @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow') + @mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._DownloadFiles') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._FindClients') @@ -905,7 +880,7 @@ def testProcessFromContainers(self, mock_FindClients, unused_LaunchFlow, unused_DownloadFiles, - unused_AwaitFlow, + unused_WaitUntilDone, mock_InitHttp): """Tests that processing works when only containers are passed.""" mock_InitHttp.return_value = self.mock_grr_api @@ -973,7 +948,7 @@ def testInitialization(self): """Tests that the collector can be initialized.""" self.assertIsNotNone(self.grr_osquery_collector) - @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow') + @mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRROsqueryCollector.' '_DownloadResults') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow') @@ -1072,7 +1047,7 @@ def testInitializeBadRegex(self, unused_mock_InitHttp): 'subpattern at position 15', error.exception.message) - @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow') + @mock.patch('grr_api_client.flow.FlowBase.WaitUntilDone') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRROsqueryCollector.' '_DownloadResults') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow') @@ -1084,7 +1059,7 @@ def testProcess( mock_Get, unused_mock_LaunchFlow, unused_mock_DownloadResults, - unused_mock_AwaitFlow): + unused_mock_WaitUntilDone): """Tests that the module launches appropriate flows.""" mock_InitHttp.return_value.SearchClients.return_value = \ mock_grr_hosts.MOCK_CLIENT_LIST @@ -1145,7 +1120,6 @@ def testProcess( ] ) - @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._AwaitFlow') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRROsqueryCollector.' 
'_DownloadResults') @mock.patch('dftimewolf.lib.collectors.grr_hosts.GRRFlow._LaunchFlow') @@ -1156,8 +1130,7 @@ def testPreProcess( unused_mock_InitHttp, unused_mock_Get, unused_mock_LaunchFlow, - unused_mock_DownloadResults, - unused_mock_AwaitFlow): + unused_mock_DownloadResults): """Tests that the prexes are appended to a Yara rule that uses modules.""" self.grr_yara_scanner.SetUp( reason='Random reason', diff --git a/tests/lib/collectors/test_data/mock_grr_hosts.py b/tests/lib/collectors/test_data/mock_grr_hosts.py index a655093a1..f4b739cb0 100644 --- a/tests/lib/collectors/test_data/mock_grr_hosts.py +++ b/tests/lib/collectors/test_data/mock_grr_hosts.py @@ -1,6 +1,7 @@ """Mocks objects and protos for the GRR Host module tests.""" import datetime +import mock from grr_api_client import client from grr_api_client import flow @@ -91,7 +92,8 @@ ) MOCK_CLIENT = client.Client( - data=text_format.Parse(client_proto1, client_pb2.ApiClient()), context=True) + data=text_format.Parse(client_proto1, client_pb2.ApiClient()), + context=mock.MagicMock()) MOCK_CLIENT_RECENT = client.Client( data=text_format.Parse(client_proto2, client_pb2.ApiClient()), context=True) MOCK_WINDOWS_CLIENT = client.Client( @@ -160,3 +162,45 @@ ) ] ) + +MOCK_CFF_PAYLOAD_DIRECTORY_TEXT = """ +payload: { + [type.googleapis.com/grr.FileFinderResult] { + stat_entry { + st_mode: 17901 + pathspec { + pathtype: OS + path: "/directory" + path_options: CASE_LITERAL + } + } + } +} +""" +MOCK_CFF_PAYLOAD_DIRECTORY = flow.FlowResult( + data=text_format.Parse( + MOCK_CFF_PAYLOAD_DIRECTORY_TEXT, flow_pb2.ApiFlowResult())) + +MOCK_CFF_PAYLOAD_FILE_TEXT = """ +payload: { + [type.googleapis.com/grr.FileFinderResult] { + stat_entry { + st_mode: 33184 + st_size: 1024 + pathspec { + pathtype: OS + path: "/directory/file" + path_options: CASE_LITERAL + } + } + } +} +""" +MOCK_CFF_PAYLOAD_FILE = flow.FlowResult( + data=text_format.Parse( + MOCK_CFF_PAYLOAD_FILE_TEXT, flow_pb2.ApiFlowResult())) + +MOCK_CFF_RESULTS = [ + MOCK_CFF_PAYLOAD_DIRECTORY, + MOCK_CFF_PAYLOAD_FILE +]
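
Reviewer's sketch (not part of the patch): the crux of this refactor is that _DownloadBlobs now receives whole flow-result payloads and decides, per payload, whether to skip it (directory), touch it (empty file), or stream its blobs, deriving the GRR VFS path from the payload's stat/stat_entry. The Python below mirrors that decision logic so it can be run without a GRR deployment; FakePathSpec, FakeStatEntry, plan_download, and the pathtype constants are hypothetical stand-ins, not names from this patch or from grr_api_client.

import re
import stat
from dataclasses import dataclass, field
from typing import Optional, Tuple

# Stand-ins for jobs_pb2.PathSpec pathtype enum values (illustrative only).
OS_PATHTYPE, NTFS_PATHTYPE = 0, 5


@dataclass
class FakePathSpec:
  """Stand-in for jobs_pb2.PathSpec: a path plus an optional nested pathspec."""
  pathtype: int = OS_PATHTYPE
  path: str = ''
  nested_path: Optional['FakePathSpec'] = None


@dataclass
class FakeStatEntry:
  """Stand-in for the stat/stat_entry message carried by a flow result payload."""
  st_mode: int = 0o100644  # regular file by default
  st_size: int = 0
  pathspec: FakePathSpec = field(default_factory=FakePathSpec)


def plan_download(stats: FakeStatEntry) -> Optional[Tuple[str, str]]:
  """Mirrors the per-payload logic in the patched _DownloadBlobs.

  Returns None for directories (skipped), otherwise the GRR VFS path and
  whether the local file should be streamed blob-by-blob or just touch()ed.
  """
  if stat.S_ISDIR(stats.st_mode):
    return None  # directories carry no blob content
  nested = stats.pathspec.nested_path
  if nested is not None and nested.pathtype == NTFS_PATHTYPE:
    # NTFS collections nest the in-volume path under the volume pathspec.
    vfspath = f'fs/ntfs{stats.pathspec.path}{nested.path}'
  else:
    vfspath = re.sub('^([a-zA-Z]:)?/(.*)$', 'fs/os/\\1/\\2',
                     stats.pathspec.path)
  # Zero-byte files have no blobs to fetch; the patch touch()es them locally.
  return vfspath, 'fetch' if stats.st_size else 'touch'


# Usage: a directory (skipped), an empty file, and an NTFS-collected file.
assert plan_download(FakeStatEntry(st_mode=0o040755)) is None
assert plan_download(
    FakeStatEntry(pathspec=FakePathSpec(path='/etc/hosts'))
) == ('fs/os//etc/hosts', 'touch')
assert plan_download(
    FakeStatEntry(st_size=4096, pathspec=FakePathSpec(
        path='C:', nested_path=FakePathSpec(
            pathtype=NTFS_PATHTYPE, path='/Users/foo/NTUSER.DAT')))
) == ('fs/ntfsC:/Users/foo/NTUSER.DAT', 'fetch')

Folding the directory and empty-file checks into this single loop is what lets the patch drop _DownloadArchive entirely: every result is now fetched blob-by-blob, so the large-file special-casing and the zip download/extract round-trip both disappear.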