diff --git a/importer_client/python/timesketch_import_client/importer.py b/importer_client/python/timesketch_import_client/importer.py index b944a7dfe8..20731f8e9c 100644 --- a/importer_client/python/timesketch_import_client/importer.py +++ b/importer_client/python/timesketch_import_client/importer.py @@ -28,11 +28,41 @@ from timesketch_api_client import timeline from timesketch_api_client import definitions +from timesketch_api_client.error import UnableToRunAnalyzer from timesketch_import_client import utils logger = logging.getLogger("timesketch_importer.importer") +def run_analyzers(analyzer_names=None, timeline_obj=None): + """Run the analyzers on the uploaded timeline.""" + + if not timeline_obj: + logger.error("Unable to run analyzers: Timeline object not found.") + raise ValueError("Timeline object not found.") + + if timeline_obj.status not in ("ready", "success"): + logger.error("The provided timeline '%s' is not ready yet!", timeline_obj.name) + return None + + if not analyzer_names: + logger.info("No analyzer names provided, skipping analysis.") + return None + + try: + analyzer_results = timeline_obj.run_analyzers(analyzer_names) + except UnableToRunAnalyzer as e: + logger.error( + "Failed to run requested analyzers '%s'! Error: %s", + str(analyzer_names), + str(e), + ) + return None + + logger.debug("Analyzer results: %s", analyzer_results) + return analyzer_results + + class ImportStreamer(object): """Upload object used to stream results to Timesketch.""" @@ -708,8 +738,18 @@ def celery_task_id(self): """Return the celery task identification for the upload.""" return self._celery_task_id + def _trigger_analyzers(self, analyzer_names=None): + """Run the analyzers on the uploaded timeline.""" + + self._ready() + + if self._data_lines: + self.flush(end_stream=True) + + return run_analyzers(analyzer_names=analyzer_names, timeline_obj=self.timeline) + def close(self): - """Close the streamer.""" + """Close the streamer""" try: self._ready() except ValueError: @@ -718,13 +758,6 @@ def close(self): if self._data_lines: self.flush(end_stream=True) - # Trigger auto analyzer pipeline to kick in. - pipe_resource = "{0:s}/sketches/{1:d}/analyzer/".format( - self._sketch.api.api_root, self._sketch.id - ) - data = {"index_name": self._index} - _ = self._sketch.api.session.post(pipe_resource, json=data) - def flush(self, end_stream=True): """Flushes the buffer and uploads to timesketch. @@ -736,6 +769,7 @@ def flush(self, end_stream=True): ValueError: if the stream object is not fully configured. RuntimeError: if the stream was not uploaded. """ + if not self._data_lines: return diff --git a/importer_client/python/timesketch_import_client/importer_test.py b/importer_client/python/timesketch_import_client/importer_test.py index 2776d7682a..15da7a7f5c 100644 --- a/importer_client/python/timesketch_import_client/importer_test.py +++ b/importer_client/python/timesketch_import_client/importer_test.py @@ -17,9 +17,10 @@ import json import unittest import mock - import pandas +from timesketch_api_client.error import UnableToRunAnalyzer + from . import importer @@ -270,3 +271,67 @@ def _run_all_tests(self, columns, lines): ] ) self.assertSetEqual(set(messages), message_correct) + + +class RunAnalyzersTest(unittest.TestCase): + """Tests for the run_analyzers function.""" + + @mock.patch("timesketch_import_client.importer.logger") + def test_run_analyzers_without_timeline(self, mock_logger): + """Test calling run_analyzers without a timeline object.""" + with self.assertRaises(ValueError): + importer.run_analyzers(analyzer_names=["test_analyzer"]) + mock_logger.error.assert_called_with( + "Unable to run analyzers: Timeline object not found." + ) + + @mock.patch("timesketch_import_client.importer.logger") + def test_run_analyzers_timeline_not_ready(self, mock_logger): + """Test calling run_analyzers with a timeline that is not ready.""" + timeline_obj = mock.Mock(status="pending", name="test") + importer.run_analyzers( + analyzer_names=["test_analyzer"], timeline_obj=timeline_obj + ) + mock_logger.error.assert_called_with( + "The provided timeline '%s' is not ready yet!", timeline_obj.name + ) + + @mock.patch("timesketch_import_client.importer.logger") + def test_run_analyzers_without_analyzers(self, mock_logger): + """Test calling run_analyzers without analyzers.""" + timeline_obj = mock.Mock(status="ready") + importer.run_analyzers(timeline_obj=timeline_obj) + mock_logger.info.assert_called_with( + "No analyzer names provided, skipping analysis." + ) + + @mock.patch("timesketch_import_client.importer.logger") + def test_run_analyzers_success(self, mock_logger): + """Test calling run_analyzers successfully.""" + timeline_obj = mock.Mock( + status="ready", run_analyzers=mock.Mock(return_value="Success") + ) + return_value = importer.run_analyzers( + analyzer_names=["test_analyzer"], timeline_obj=timeline_obj + ) + self.assertEqual(return_value, "Success") + mock_logger.debug.assert_called_with("Analyzer results: %s", "Success") + + @mock.patch("timesketch_import_client.importer.logger") + def test_run_analyzers_failed(self, mock_logger): + """Test calling run_analyzers with an exception.""" + timeline_obj = mock.Mock( + status="ready", + run_analyzers=mock.Mock( + side_effect=UnableToRunAnalyzer("Analyzer failed.") + ), + ) + return_value = importer.run_analyzers( + analyzer_names=["test_analyzer"], timeline_obj=timeline_obj + ) + self.assertIsNone(return_value) + mock_logger.error.assert_called_with( + "Failed to run requested analyzers '%s'! Error: %s", + "['test_analyzer']", + "Analyzer failed.", + ) diff --git a/importer_client/python/tools/timesketch_importer.py b/importer_client/python/tools/timesketch_importer.py index f7d51e9c89..eb6645b008 100644 --- a/importer_client/python/tools/timesketch_importer.py +++ b/importer_client/python/tools/timesketch_importer.py @@ -143,6 +143,8 @@ def upload_file( timeline = streamer.timeline task_id = streamer.celery_task_id + streamer.close() + logger.info("File upload completed.") return timeline, task_id @@ -462,6 +464,20 @@ def main(args=None): help=("Path to the file that is to be imported."), ) + config_group.add_argument( + "--analyzer_names", + "--analyzer-names", + nargs="*", + action="store", + dest="analyzer_names", + default=[], + help=( + "Set of analyzers that we will automatically run right after the " + "timelines are uploaded. The input needs to be the analyzers names." + "Provided as strings separated by space" + ), + ) + options = argument_parser.parse_args(args) if options.show_version: @@ -616,6 +632,7 @@ def main(args=None): "log_config_file": options.log_config_file, "data_label": options.data_label, "context": options.context, + "analyzer_names": options.analyzer_names, } logger.info("Uploading file.") @@ -627,6 +644,11 @@ def main(args=None): logger.info( "File got successfully uploaded to sketch: {0:d}".format(my_sketch.id) ) + if options.analyzer_names: + logger.warning( + "Argument 'analyzer_names' only works with 'wait_timeline = " + "True'! Skipping execution of analyzers: {analyzer_names}" + ) return if not timeline: @@ -664,6 +686,16 @@ def main(args=None): print(f"Status of the index is: {task_state}") break + if options.analyzer_names: + logger.info( + "Trigger analyzers: %s on Timeline '%s'", + str(options.analyzer_names), + str(timeline.name), + ) + _ = importer.run_analyzers( + analyzer_names=options.analyzer_names, timeline_obj=timeline + ) + if __name__ == "__main__": main()