diff --git a/.coveragerc b/.coveragerc index f3791c30c..f669f1698 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,11 +1,12 @@ # .coveragerc to control coverage.py [run] omit = - */test_templates.py + pytest_splunk_addon/standard_lib/*/test_templates.py pytest_splunk_addon/standard_lib/cim_compliance/base_report.py pytest_splunk_addon/standard_lib/cim_compliance/base_table.py pytest_splunk_addon/standard_lib/cim_tests/base_schema.py pytest_splunk_addon/standard_lib/event_ingestors/base_event_ingestor.py + pytest_splunk_addon/standard_lib/addon_basic.py [report] exclude_lines = diff --git a/pytest_splunk_addon/standard_lib/app_test_generator.py b/pytest_splunk_addon/standard_lib/app_test_generator.py index a674d8a1f..5c3a0a1cc 100644 --- a/pytest_splunk_addon/standard_lib/app_test_generator.py +++ b/pytest_splunk_addon/standard_lib/app_test_generator.py @@ -120,11 +120,8 @@ def generate_tests(self, fixture): test_type="line_breaker" ) ) - if isinstance(pytest_params, str): - LOGGER.warning(pytest_params) - elif pytest_params: - yield from sorted(pytest_params, key=lambda param: param.id) + yield from sorted(pytest_params, key=lambda param: param.id) def dedup_tests(self, test_list, fixture): """ diff --git a/tests/unit/tests_standard_lib/conftest.py b/tests/unit/tests_standard_lib/conftest.py index 1fd012219..758551cf8 100644 --- a/tests/unit/tests_standard_lib/conftest.py +++ b/tests/unit/tests_standard_lib/conftest.py @@ -5,8 +5,8 @@ @pytest.fixture() def mock_object(monkeypatch): - def create_mock_object(object_path): - mo = MagicMock() + def create_mock_object(object_path, **kwargs): + mo = MagicMock(**kwargs) monkeypatch.setattr(object_path, mo) return mo @@ -20,6 +20,13 @@ def open_mock(monkeypatch): return open_mock +@pytest.fixture() +def os_path_join_file_mock(mock_object): + os = mock_object("os.path.join") + os.side_effect = lambda *x: "/".join(x) + return os + + @pytest.fixture() def json_load_mock(mock_object): return mock_object("json.load") diff --git a/tests/unit/tests_standard_lib/test_addon_parser/test_pytest_addon_init.py b/tests/unit/tests_standard_lib/test_addon_parser/test_pytest_addon_init.py index c674cd332..1bbef5fc5 100644 --- a/tests/unit/tests_standard_lib/test_addon_parser/test_pytest_addon_init.py +++ b/tests/unit/tests_standard_lib/test_addon_parser/test_pytest_addon_init.py @@ -7,6 +7,7 @@ PROPS_RETURN_VALUE = "Props_return_value" TAGS_RETURN_VALUE = "Tags_return_value" EVENTTYPE_RETURN_VALUE = "Eventtype_return_value" +SAVEDSEARCH_RETURN_VALUE = "Savedsearch_return_value" TEST_VALUE = "Test_value" ADDON_PARSER_PATH = "pytest_splunk_addon.standard_lib.addon_parser" @@ -19,11 +20,14 @@ def addonparser(): f"{ADDON_PARSER_PATH}.tags_parser.TagsParser" ) as tags_mock, patch( f"{ADDON_PARSER_PATH}.eventtype_parser.EventTypeParser" - ) as eventtype_mock: + ) as eventtype_mock, patch( + f"{ADDON_PARSER_PATH}.savedsearches_parser.SavedSearchParser" + ) as savedsearch_mock: app_mock.return_value = APP_RETURN_VALUE props_mock.return_value = PROPS_RETURN_VALUE tags_mock.return_value = TAGS_RETURN_VALUE eventtype_mock.return_value = EVENTTYPE_RETURN_VALUE + savedsearch_mock.return_value = SAVEDSEARCH_RETURN_VALUE import pytest_splunk_addon.standard_lib.addon_parser importlib.reload(pytest_splunk_addon.standard_lib.addon_parser) @@ -37,6 +41,7 @@ def test_addonparser_init(addonparser): assert ap.props_parser == PROPS_RETURN_VALUE assert ap.tags_parser == TAGS_RETURN_VALUE assert ap.eventtype_parser == EVENTTYPE_RETURN_VALUE + assert ap.savedsearch_parser == SAVEDSEARCH_RETURN_VALUE @pytest.mark.parametrize( @@ -45,6 +50,7 @@ def test_addonparser_init(addonparser): ("get_tags", "tags_parser"), ("get_props_fields", "props_parser"), ("get_eventtypes", "eventtype_parser"), + ("get_savedsearches", "savedsearch_parser"), ], ) def test_get_methods(addonparser, monkeypatch, function, obj_to_mock): diff --git a/tests/unit/tests_standard_lib/test_addon_parser/test_savedsearches_parser.py b/tests/unit/tests_standard_lib/test_addon_parser/test_savedsearches_parser.py new file mode 100644 index 000000000..601d72afb --- /dev/null +++ b/tests/unit/tests_standard_lib/test_addon_parser/test_savedsearches_parser.py @@ -0,0 +1,77 @@ +import pytest +from unittest.mock import patch, PropertyMock +from pytest_splunk_addon.standard_lib.addon_parser.savedsearches_parser import ( + SavedSearchParser, +) + +output_to_build = { + "basic_search": { + "search": "_internal | stats count by sourcetype", + }, + "search_earliest_time": { + "search": "index = _internal | stats count by sourcetype | outputlookup saved_search_data.csv", + "dispatch.earliest_time": "-4d", + }, + "empty_search_latest_time": { + "search": "", + "dispatch.latest_time": "-1s", + }, +} + + +@pytest.fixture(scope="module") +def parsed_output(build_parsed_output): + return build_parsed_output(output_to_build) + + +@pytest.fixture() +def parser_instance(parsed_output, parser): + return parser(SavedSearchParser, "get_config", parsed_output) + + +def test_savedsearches(parser_instance): + assert list(parser_instance.savedsearches.sects.keys()) == [ + "basic_search", + "search_earliest_time", + "empty_search_latest_time", + ] + parser_instance.app.get_config.assert_called_once_with("savedsearches.conf") + + +def test_no_savedsearches_config_file(parser_instance): + parser_instance.app.get_config.side_effect = OSError + assert parser_instance.savedsearches is None + + +def test_get_savedsearches(parser_instance): + out = list(parser_instance.get_savedsearches()) + assert out == [ + { + "stanza": "basic_search", + "search": "_internal | stats count by sourcetype", + "dispatch.earliest_time": "0", + "dispatch.latest_time": "now", + }, + { + "stanza": "search_earliest_time", + "search": "index = _internal | stats count by sourcetype | outputlookup saved_search_data.csv", + "dispatch.earliest_time": "-4d", + "dispatch.latest_time": "now", + }, + { + "stanza": "empty_search_latest_time", + "search": 'index = "main"', + "dispatch.earliest_time": "0", + "dispatch.latest_time": "-1s", + }, + ] + + +def test_get_savedsearches_without_config_file(parser): + with patch.object( + SavedSearchParser, "savedsearches", new_callable=PropertyMock + ) as savedsearches_mock: + savedsearches_mock.return_value = None + parser_instance = parser(SavedSearchParser, "get_config", {}) + output = [search for search in parser_instance.get_savedsearches() if search] + assert output == [], "savedsearches returned when no config file exists" diff --git a/tests/unit/tests_standard_lib/test_addon_parser/test_tags_parser.py b/tests/unit/tests_standard_lib/test_addon_parser/test_tags_parser.py index 643f96f4b..105a6d6c5 100644 --- a/tests/unit/tests_standard_lib/test_addon_parser/test_tags_parser.py +++ b/tests/unit/tests_standard_lib/test_addon_parser/test_tags_parser.py @@ -56,8 +56,7 @@ def test_tags_can_be_parsed_and_returned(parser_instance): def test_get_tags_calls_app_get_config(parser_instance): for _ in parser_instance.get_tags(): pass - parser_instance.app.get_config.assert_called_once() - parser_instance.app.get_config.assert_called_with("tags.conf") + parser_instance.app.get_config.assert_called_once_with("tags.conf") def test_no_tags_config_file(parser_instance): diff --git a/tests/unit/tests_standard_lib/test_app_test_generator.py b/tests/unit/tests_standard_lib/test_app_test_generator.py new file mode 100644 index 000000000..d2321fb5e --- /dev/null +++ b/tests/unit/tests_standard_lib/test_app_test_generator.py @@ -0,0 +1,188 @@ +import pytest +from unittest.mock import patch +from collections import namedtuple +from pytest_splunk_addon.standard_lib.app_test_generator import AppTestGenerator + +module = "pytest_splunk_addon.standard_lib.app_test_generator" +config = { + "splunk_app": "fake_app", + "field_bank": "fake_field_bank", + "splunk_dm_path": "fake_path", + "store_events": True, + "splunk_data_generator": "psa.conf", +} +pytest_config = namedtuple("Config", ["getoption"]) +test_config = pytest_config(getoption=lambda x, *y: config[x]) +test_config_without_dm_path = pytest_config( + getoption=lambda x, *y: config[x] if x != "splunk_dm_path" else None +) +params = namedtuple("ParameterSet", ["values", "id"]) + + +@pytest.fixture() +def app_test_generator(mock_object): + fieldtest_generator = mock_object(f"{module}.FieldTestGenerator") + cim_test_generator = mock_object(f"{module}.CIMTestGenerator") + indextime_test_generator = mock_object(f"{module}.IndexTimeTestGenerator") + requirement_test_generator = mock_object(f"{module}.ReqsTestGenerator") + for mock_element in [ + fieldtest_generator, + cim_test_generator, + indextime_test_generator, + requirement_test_generator, + ]: + setattr(mock_element, "return_value", mock_element) + + +@pytest.mark.parametrize( + "simple_config, path", + [ + (test_config, "fake_path"), + (test_config_without_dm_path, "/fake_dir/data_models"), + ], +) +def test_app_test_generator_instantiation( + mock_object, os_path_join_file_mock, app_test_generator, simple_config, path +): + os_path_dirname_mock = mock_object("os.path.dirname") + os_path_dirname_mock.return_value = "/fake_dir" + atg = AppTestGenerator(simple_config) + atg.fieldtest_generator.assert_called_once_with( + config["splunk_app"], field_bank=config["field_bank"] + ) + atg.cim_test_generator.assert_called_once_with(config["splunk_app"], path) + atg.requirement_test_generator.assert_called_once_with(config["splunk_app"]) + atg.indextime_test_generator.assert_called_once_with() + + +@pytest.mark.parametrize( + "fixture, called_function, test_generator, generator_args, generator_kwargs, expected_tests, dedup_call_count", + [ + ( + "splunk_searchtime_fields", + "fieldtest_generator", + lambda fixture: (f"{fixture}_test_{i + 1}" for i in range(3)), + ["splunk_searchtime_fields"], + {}, + [ + "splunk_searchtime_fields_test_1", + "splunk_searchtime_fields_test_2", + "splunk_searchtime_fields_test_3", + ], + 1, + ), + ( + "splunk_searchtime_cim", + "cim_test_generator", + lambda fixture: (f"{fixture}_test_{i + 1}" for i in range(3)), + ["splunk_searchtime_cim"], + {}, + [ + "splunk_searchtime_cim_test_1", + "splunk_searchtime_cim_test_2", + "splunk_searchtime_cim_test_3", + ], + 1, + ), + ( + "splunk_searchtime_requirement", + "requirement_test_generator", + lambda fixture: (f"{fixture}_test_{i + 1}" for i in range(3)), + ["splunk_searchtime_requirement"], + {}, + [ + "splunk_searchtime_requirement_test_1", + "splunk_searchtime_requirement_test_2", + "splunk_searchtime_requirement_test_3", + ], + 1, + ), + ( + "splunk_indextime_key_fields", + "indextime_test_generator", + lambda x, app_path, config_path, test_type: ( + params(values=f"splunk_indextime_{test_type}_test_{3 - i}", id=3 - i) + for i in range(3) + ), + [True], + { + "app_path": "fake_app", + "config_path": "psa.conf", + "test_type": "key_fields", + }, + [ + params(values=f"splunk_indextime_key_fields_test_1", id=1), + params(values=f"splunk_indextime_key_fields_test_2", id=2), + params(values=f"splunk_indextime_key_fields_test_3", id=3), + ], + 0, + ), + ( + "splunk_indextime_time", + "indextime_test_generator", + lambda x, app_path, config_path, test_type: ( + params(values=f"splunk_indextime_{test_type}_test_{3 - i}", id=3 - i) + for i in range(3) + ), + [True], + {"app_path": "fake_app", "config_path": "psa.conf", "test_type": "_time"}, + [ + params(values=f"splunk_indextime__time_test_1", id=1), + params(values=f"splunk_indextime__time_test_2", id=2), + params(values=f"splunk_indextime__time_test_3", id=3), + ], + 0, + ), + ( + "splunk_indextime_line_breaker", + "indextime_test_generator", + lambda x, app_path, config_path, test_type: ( + params(values=f"splunk_indextime_{test_type}_test_{3 - i}", id=3 - i) + for i in range(3) + ), + [True], + { + "app_path": "fake_app", + "config_path": "psa.conf", + "test_type": "line_breaker", + }, + [ + params(values=f"splunk_indextime_line_breaker_test_1", id=1), + params(values=f"splunk_indextime_line_breaker_test_2", id=2), + params(values=f"splunk_indextime_line_breaker_test_3", id=3), + ], + 0, + ), + ], +) +def test_generate_tests( + app_test_generator, + fixture, + called_function, + test_generator, + generator_args, + generator_kwargs, + expected_tests, + dedup_call_count, +): + atg = AppTestGenerator(test_config) + setattr(getattr(atg, called_function).generate_tests, "side_effect", test_generator) + with patch.object( + AppTestGenerator, "dedup_tests", side_effect=lambda x, y: x + ) as dedup_mock: + out = list(atg.generate_tests(fixture)) + assert out == expected_tests + getattr(atg, called_function).generate_tests.assert_called_once_with( + *generator_args, **generator_kwargs + ) + assert dedup_mock.call_count == dedup_call_count + + +def test_dedup_tests(app_test_generator): + parameter_list = [params(values=f"val{x}", id=x) for x in range(7)] + atg = AppTestGenerator(test_config) + out = [] + for parameters in [parameter_list[:3], parameter_list[2:5]]: + out.extend(atg.dedup_tests(parameters, "splunk_searchtime")) + assert out == parameter_list[:5] + assert atg.seen_tests == {("splunk_searchtime", x) for x in range(5)} diff --git a/tests/unit/tests_standard_lib/test_event_ingestors/test_file_monitor_ingestor.py b/tests/unit/tests_standard_lib/test_event_ingestors/test_file_monitor_ingestor.py new file mode 100644 index 000000000..293e662a1 --- /dev/null +++ b/tests/unit/tests_standard_lib/test_event_ingestors/test_file_monitor_ingestor.py @@ -0,0 +1,199 @@ +import pytest +from unittest.mock import patch, call +from urllib.parse import unquote +from requests.exceptions import ConnectionError +from collections import namedtuple +from os import sep as os_sep +from pytest_splunk_addon.standard_lib.event_ingestors.file_monitor_ingestor import ( + FileMonitorEventIngestor, +) + +file_name = "pytest_splunk_addon.standard_lib.event_ingestors.file_monitor_ingestor" +required_config = { + "uf_host": "localhost", + "uf_port": "8888", + "uf_username": "admin", + "uf_password": "secret", +} +sample_event = namedtuple("SampleEvent", ["event", "metadata"], defaults=[None, None]) + + +def test_ingest(mock_object): + mock_object(f"{file_name}.sleep") + events = ["event_1", "event_2", "event_3"] + with patch.object( + FileMonitorEventIngestor, "create_output_conf" + ) as create_output_conf_mock, patch.object( + FileMonitorEventIngestor, "create_event_file" + ) as create_event_file_mock, patch.object( + FileMonitorEventIngestor, "create_inputs_stanza" + ) as create_inputs_stanza_mock: + fmei = FileMonitorEventIngestor(required_config) + fmei.ingest(events, 20) + create_output_conf_mock.assert_called_once_with() + create_event_file_mock.assert_has_calls([call(event) for event in events]) + create_inputs_stanza_mock.assert_has_calls([call(event) for event in events]) + assert ( + create_event_file_mock.call_count + == create_inputs_stanza_mock.call_count + == len(events) + ) + + +def test_create_output_conf(requests_mock): + requests_mock.post("https://localhost:8888/services/data/outputs/tcp/group") + fmei = FileMonitorEventIngestor(required_config) + fmei.create_output_conf() + sent_requests = [ + (unquote(str(req)), unquote(req.text)) for req in requests_mock.request_history + ] + assert sent_requests == [ + ( + "POST https://localhost:8888/services/data/outputs/tcp/group", + "name=uf_monitor&servers=splunk:9997", + ) + ] + + +@pytest.mark.parametrize( + "response, message", + [ + ( + {"text": "Not Found", "status_code": 404}, + "Unable to create stanza in outputs.conf\nStatus code: 404 \nReason: None \ntext:Not Found", + ), + ( + {"exc": ConnectionError("test connection error")}, + "Unable to connect to Universal forwarder, test connection error", + ), + ], +) +def test_create_output_conf_bad_request(requests_mock, caplog, response, message): + requests_mock.post( + "https://localhost:8888/services/data/outputs/tcp/group", **response + ) + fmei = FileMonitorEventIngestor(required_config) + fmei.create_output_conf() + assert message in caplog.messages + + +def test_create_event_file(open_mock): + with patch.object( + FileMonitorEventIngestor, "get_file_path", return_value="fake_file" + ): + fmei = FileMonitorEventIngestor(required_config) + fmei.create_event_file( + sample_event( + event="CREATE EVENT", + metadata={"host": "localhost", "source": "sys.log"}, + ) + ) + open_mock.assert_has_calls( + [ + call("fake_file", "w+"), + call().__enter__(), + call().write("CREATE EVENT"), + call().__exit__(None, None, None), + ] + ) + + +def test_create_event_file_gets_an_exception(open_mock, caplog): + open_mock().write.side_effect = Exception("test error") + with patch.object( + FileMonitorEventIngestor, "get_file_path", return_value="fake_file" + ): + fmei = FileMonitorEventIngestor(required_config) + fmei.create_event_file( + sample_event( + event="CREATE EVENT", + metadata={"host": "localhost", "source": "sys.log"}, + ) + ) + assert ( + "Unable to create event file for host : localhost, Reason : test error" + in caplog.messages + ) + + +@pytest.mark.parametrize( + "event, post_args", + [ + ( + sample_event( + metadata={"sourcetype": "splunkd", "index": "ut", "host_type": "plugin"} + ), + f"name={os_sep}home{os_sep}uf_files{os_sep}host_name{os_sep}sample_name&" + f"sourcetype=splunkd&index=ut&disabled=False&crc-salt=&host_segment=3", + ), + ( + sample_event(metadata={"host_type": "modinput"}), + f"name={os_sep}home{os_sep}uf_files{os_sep}host_name{os_sep}sample_name&" + f"sourcetype=pytest_splunk_addon&index=main&disabled=False&crc-salt=", + ), + ], +) +def test_create_inputs_stanza(requests_mock, event, post_args): + requests_mock.post( + "https://localhost:8888/servicesNS/nobody/search/data/inputs/monitor" + ) + with patch.object( + FileMonitorEventIngestor, + "get_file_path", + return_value=f"{os_sep}home{os_sep}uf_files{os_sep}host_name{os_sep}sample_name", + ): + fmei = FileMonitorEventIngestor(required_config) + fmei.create_inputs_stanza(event) + sent_requests = [ + (unquote(str(req)), unquote(req.text)) + for req in requests_mock.request_history + ] + assert sent_requests == [ + ( + "POST https://localhost:8888/servicesNS/nobody/search/data/inputs/monitor", + post_args, + ) + ] + + +@pytest.mark.parametrize( + "response, message", + [ + ( + {"text": "Not Found", "status_code": 404}, + f"Unable to add stanza in inputs.conf for Path : {os_sep}home{os_sep}uf_files{os_sep}host_name{os_sep}" + f"sample_name \nStatus code: 404 \nReason: None \ntext:Not Found", + ), + ( + {"exc": ConnectionError("test connection error")}, + "Unable to connect to Universal forwarder, test connection error", + ), + ], +) +def test_create_inputs_stanza_bad_request(requests_mock, caplog, response, message): + requests_mock.post( + "https://localhost:8888/servicesNS/nobody/search/data/inputs/monitor", + **response, + ) + with patch.object( + FileMonitorEventIngestor, + "get_file_path", + return_value=f"{os_sep}home{os_sep}uf_files{os_sep}host_name{os_sep}sample_name", + ): + fmei = FileMonitorEventIngestor(required_config) + fmei.create_inputs_stanza(sample_event(metadata={"host_type": "modinput"})) + assert message in caplog.messages + + +def test_get_file_path(open_mock, mock_object, os_path_join_file_mock): + os_getcwd_mock = mock_object("os.getcwd") + os_getcwd_mock.return_value = "/fake_path" + os_path_exists = mock_object("os.path.exists") + os_path_exists.return_value = False + os_mkdir = mock_object("os.mkdir") + fmei = FileMonitorEventIngestor(required_config) + out = fmei.get_file_path( + sample_event(metadata={"host": "localhost", "source": "sys.log"}) + ) + assert out == "/fake_path/uf_files/localhost/sys.log" + os_mkdir.assert_called_once_with("/fake_path/uf_files/localhost") diff --git a/tests/unit/tests_standard_lib/test_event_ingestors/test_requirement_event_ingestor.py b/tests/unit/tests_standard_lib/test_event_ingestors/test_requirement_event_ingestor.py index ee17b79b6..bf5351784 100644 --- a/tests/unit/tests_standard_lib/test_event_ingestors/test_requirement_event_ingestor.py +++ b/tests/unit/tests_standard_lib/test_event_ingestors/test_requirement_event_ingestor.py @@ -1,5 +1,5 @@ import pytest -from unittest.mock import MagicMock, mock_open, call, patch +from unittest.mock import MagicMock, call, patch from recordtype import recordtype from pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester import ( RequirementEventIngestor, @@ -8,6 +8,7 @@ ) +module = "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester" src_regex = recordtype("SrcRegex", [("regex_src", None), ("source_type", None)]) sample_event = recordtype( "SampleEvent", @@ -15,31 +16,14 @@ ) -@pytest.fixture() -def open_mock(monkeypatch): - monkeypatch.setattr("builtins.open", mock_open()) - - -@pytest.fixture() -def os_mock(monkeypatch): - monkeypatch.setattr("os.path.isdir", MagicMock(return_value=True)) - monkeypatch.setattr("os.listdir", MagicMock(return_value=["sample.log"])) - - @pytest.fixture() def sample_event_mock(monkeypatch): - monkeypatch.setattr( - "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester.SampleEvent", - sample_event, - ) + monkeypatch.setattr(f"{module}.SampleEvent", sample_event) @pytest.fixture() def src_regex_mock(monkeypatch): - monkeypatch.setattr( - "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester.SrcRegex", - src_regex, - ) + monkeypatch.setattr(f"{module}.SrcRegex", src_regex) @pytest.fixture() @@ -47,10 +31,7 @@ def get_root_mocked(monkeypatch): tree_mock = MagicMock() tree_mock.return_value = tree_mock tree_mock.getroot.return_value = "root" - monkeypatch.setattr( - "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester.ET.parse", - tree_mock, - ) + monkeypatch.setattr(f"{module}.ET.parse", tree_mock) @pytest.fixture() @@ -92,36 +73,24 @@ def configparser_mock(monkeypatch): @pytest.fixture() -def requirement_ingestor_mocked(monkeypatch): - monkeypatch.setattr( - "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester." - "RequirementEventIngestor.extract_regex_transforms", - MagicMock(return_value=[src_regex("event", "host::$1")]), +def requirement_ingestor_mocked(monkeypatch, mock_object): + mock_object( + f"{module}.RequirementEventIngestor.extract_regex_transforms", + return_value=[src_regex("event", "host::$1")], ) - monkeypatch.setattr( - "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester." - "RequirementEventIngestor.check_xml_format", - MagicMock(return_value=True), + mock_object( + f"{module}.RequirementEventIngestor.check_xml_format", return_value=True ) - root_mock = MagicMock() + root_mock = mock_object(f"{module}.RequirementEventIngestor.get_root") root_mock.return_value = root_mock root_mock.iter.return_value = ["session created", "session closed"] - monkeypatch.setattr( - "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester." - "RequirementEventIngestor.get_root", - root_mock, - ) - ere_mock = MagicMock(side_effect=lambda x: f"event: {x}") - monkeypatch.setattr( - "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester." - "RequirementEventIngestor.extract_raw_events", - ere_mock, + ere_mock = mock_object( + f"{module}.RequirementEventIngestor.extract_raw_events", + side_effect=lambda x: f"event: {x}", ) - es_mock = MagicMock(side_effect=("host$1", "host$2")) - monkeypatch.setattr( - "pytest_splunk_addon.standard_lib.event_ingestors.requirement_event_ingester." - "RequirementEventIngestor.extract_sourcetype", - es_mock, + es_mock = mock_object( + f"{module}.RequirementEventIngestor.extract_sourcetype", + side_effect=("host$1", "host$2"), ) return {"root_mock": root_mock, "ere_mock": ere_mock, "es_mock": es_mock} @@ -174,8 +143,10 @@ def test_sourcetype_can_be_extracted(): def test_events_can_be_obtained( - requirement_ingestor_mocked, os_mock, sample_event_mock + mock_object, requirement_ingestor_mocked, sample_event_mock ): + mock_object("os.path.isdir", return_value=True) + mock_object("os.listdir", return_value=["sample.log"]) req = RequirementEventIngestor("fake_path") assert req.get_events() == [ sample_event( diff --git a/tests/unit/tests_standard_lib/test_requirement_tests/__init__.py b/tests/unit/tests_standard_lib/test_requirement_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/tests_standard_lib/test_requirement_tests/test_test_generator.py b/tests/unit/tests_standard_lib/test_requirement_tests/test_test_generator.py new file mode 100644 index 000000000..0e167917e --- /dev/null +++ b/tests/unit/tests_standard_lib/test_requirement_tests/test_test_generator.py @@ -0,0 +1,338 @@ +import pytest +from unittest.mock import MagicMock, call, patch +from collections import namedtuple +from recordtype import recordtype +from pytest_splunk_addon.standard_lib.requirement_tests.test_generator import ( + ReqsTestGenerator, + SrcRegex, +) + +root_content = namedtuple("Element", ["text"]) +src_regex = recordtype("SrcRegex", [("regex_src", None), ("source_type", None)]) + + +@pytest.fixture() +def reqs_test_generator(os_path_join_file_mock): + return ReqsTestGenerator("fake_path") + + +@pytest.fixture() +def root_mock(): + root_mock = MagicMock() + root_mock.tags = { + "model": ["Network Traffic", "Authentication"], + "raw": ["RT_FLOW_SESSION_CREATE"], + } + root_mock.iter.side_effect = lambda x: ( + root_content(text=item) for item in root_mock.tags[x] + ) + return root_mock + + +@pytest.fixture() +def et_parse_mock(monkeypatch): + tree_mock = MagicMock() + monkeypatch.setattr( + "pytest_splunk_addon.standard_lib.requirement_tests.test_generator.ET.parse", + tree_mock, + ) + return tree_mock + + +@pytest.fixture() +def src_regex_mock(monkeypatch): + monkeypatch.setattr( + "pytest_splunk_addon.standard_lib.requirement_tests.test_generator.SrcRegex", + src_regex, + ) + + +@pytest.fixture() +def configparser_mock(monkeypatch): + config_mock = MagicMock() + config_mock.sections.return_value = [ + "ta_fiction_lookup", + "fiction-rsc-delim-fields", + "fiction-tsc-regex", + ] + items = { + "ta_fiction_lookup": {"a": 1, "b": 3}, + "fiction-rsc-delim-fields": { + "dest_key": "MetaData:Sourcetype", + "fields": "day_id, event_id, end_time, start_time", + "format": 'comp::"$1"', + }, + "fiction-tsc-regex": { + "dest_key": "MetaData:Sourcetype", + "regex": "group=(?[^,]+)", + }, + } + config_mock.__getitem__.side_effect = lambda key: items[key] + config_mock.return_value = config_mock + monkeypatch.setattr( + "configparser.ConfigParser", + config_mock, + ) + return config_mock + + +def test_src_regex_can_be_instantiated(): + srcregex = SrcRegex() + assert hasattr(srcregex, "regex_src") + assert hasattr(srcregex, "source_type") + + +def test_generate_tests(): + with patch.object( + ReqsTestGenerator, + "generate_cim_req_params", + side_effect=lambda: (tc for tc in ["test_1", "test_2", "test_3"]), + ): + rtg = ReqsTestGenerator("fake_path") + out = list(rtg.generate_tests("splunk_searchtime_requirement_param")) + assert out == ["test_1", "test_2", "test_3"] + + +def test_extract_key_value_xml(): + event = MagicMock() + event.iter.side_effect = lambda x: ( + d + for d in [ + {"name": "field1", "value": "value1"}, + {"name": "field2", "value": "value2"}, + ] + ) + rtg = ReqsTestGenerator("fake_path") + out = rtg.extract_key_value_xml(event) + assert out == {"field1": "value1", "field2": "value2"} + event.iter.assert_called_once_with("field") + + +@pytest.mark.parametrize( + "listdir_return_value, " + "check_xml_format_return_value, " + "root_events, " + "extractSourcetype_return_value, " + "get_models_return_value, " + "extract_key_value_xml_return_value, " + "expected_output", + [ + ( + ["requirement.log"], + [True], + {"event": ["event_1", "event_2"]}, + ["splunkd", "sc4s"], + [["model_1", "model_2"], ["model_3"]], + [{"field1": "value1", "field2": "value2"}, {"field3": "value3"}], + [ + ( + { + "model": "model_1", + "escaped_event": "event_1", + "filename": "fake_path/requirement_files/requirement.log", + "sourcetype": "splunkd", + "Key_value_dict": {"field1": "value1", "field2": "value2"}, + }, + "model_1::fake_path/requirement_files/requirement.log::req_test_id::1", + ), + ( + { + "model": "model_2", + "escaped_event": "event_1", + "filename": "fake_path/requirement_files/requirement.log", + "sourcetype": "splunkd", + "Key_value_dict": {"field1": "value1", "field2": "value2"}, + }, + "model_2::fake_path/requirement_files/requirement.log::req_test_id::2", + ), + ( + { + "model": "model_3", + "escaped_event": "event_2", + "filename": "fake_path/requirement_files/requirement.log", + "sourcetype": "sc4s", + "Key_value_dict": {"field3": "value3"}, + }, + "model_3::fake_path/requirement_files/requirement.log::req_test_id::3", + ), + ], + ), + ( + ["requirement.xml"], + [True], + {"event": ["event_1", "event_2"]}, + ["splunkd", "sc4s"], + [["model_1", "model_2"], ["model_3"]], + [{"field1": "value1", "field2": "value2"}, {"field3": "value3"}], + [], + ), + ( + ["not_requirement.log"], + Exception, + {"event": ["event_1", "event_2"]}, + ["splunkd", "sc4s"], + [["model_1", "model_2"], ["model_3"]], + [{"field1": "value1", "field2": "value2"}, {"field3": "value3"}], + [ + ( + { + "model": None, + "escaped_event": None, + "filename": "fake_path/requirement_files/not_requirement.log", + "sourcetype": None, + }, + "None::fake_path/requirement_files/not_requirement.log::req_test_id::1", + ), + ], + ), + ( + ["req.log"], + [True], + {"event": ["event_1"]}, + ["splunkd", "sc4s"], + [[]], + [{"field1": "value1", "field2": "value2"}, {"field3": "value3"}], + [], + ), + ], +) +def test_generate_cim_req_params( + mock_object, + root_mock, + listdir_return_value, + check_xml_format_return_value, + root_events, + extractSourcetype_return_value, + get_models_return_value, + extract_key_value_xml_return_value, + expected_output, +): + os_path_is_dir_mock = mock_object("os.path.isdir") + os_path_is_dir_mock.return_value = True + os_listdir_mock = mock_object("os.listdir") + os_listdir_mock.return_value = listdir_return_value + root_mock.tags.update(root_events) + with patch.object( + ReqsTestGenerator, + "extractRegexTransforms", + retrun_value=[ + src_regex(source_type='comp::"$1"', regex_src="group=(?[^,]+)") + ], + ), patch.object( + ReqsTestGenerator, "check_xml_format", side_effect=check_xml_format_return_value + ), patch.object( + ReqsTestGenerator, "get_root", side_effect=[root_mock] + ), patch.object( + ReqsTestGenerator, "get_event", side_effect=lambda x: x.text + ), patch.object( + ReqsTestGenerator, + "extractSourcetype", + side_effect=extractSourcetype_return_value, + ), patch.object( + ReqsTestGenerator, "escape_char_event", side_effect=lambda x: x + ), patch.object( + ReqsTestGenerator, "get_models", side_effect=get_models_return_value + ), patch.object( + ReqsTestGenerator, + "extract_key_value_xml", + side_effect=extract_key_value_xml_return_value, + ), patch.object( + pytest, "param", side_effect=lambda x, id: (x, id) + ) as param_mock: + rtg = ReqsTestGenerator("fake_path") + out = list(rtg.generate_cim_req_params()) + assert out == expected_output + param_mock.assert_has_calls( + [call(param[0], id=param[1]) for param in expected_output] + ) + + +def test_get_models(root_mock): + rtg = ReqsTestGenerator("fake_path") + assert rtg.get_models(root_mock) == ["Network Traffic", "Authentication"] + root_mock.iter.assert_called_once_with("model") + + +def test_get_event(root_mock): + rtg = ReqsTestGenerator("fake_path") + assert rtg.get_event(root_mock) == "RT_FLOW_SESSION_CREATE" + root_mock.iter.assert_called_once_with("raw") + + +def test_get_root(et_parse_mock): + et_parse_mock.return_value = et_parse_mock + et_parse_mock.getroot.return_value = "root" + rtg = ReqsTestGenerator("fake_path") + assert rtg.get_root("requirement.log") == "root" + et_parse_mock.assert_has_calls([call("requirement.log"), call.getroot()]) + + +@pytest.mark.parametrize( + "is_xml_valid, expected_output", + [ + (True, True), + (False, None), + ], +) +def test_check_xml_format(et_parse_mock, is_xml_valid, expected_output): + et_parse_mock.return_value = is_xml_valid + rtg = ReqsTestGenerator("fake_path") + assert rtg.check_xml_format("requirement.log") is expected_output + + +@pytest.mark.parametrize( + "escape_char, expected_output", + [ + ("\\", "SESSION\\\\CREATED"), + ("`", "SESSION\\`CREATED"), + ("~", "SESSION\\~CREATED"), + ("!", "SESSION\\!CREATED"), + ("@", "SESSION\\@CREATED"), + ("#", "SESSION\\#CREATED"), + ("$", "SESSION\\$CREATED"), + ("%", "SESSION\\%CREATED"), + ("^", "SESSION\\^CREATED"), + ("&", "SESSION\\&CREATED"), + ("*", "SESSION\\*CREATED"), + ("(", "SESSION\\(CREATED"), + (")", "SESSION\\)CREATED"), + ("-", "SESSION\\-CREATED"), + ("=", "SESSION\\=CREATED"), + ("+", "SESSION\\+CREATED"), + ("[", "SESSION\\[CREATED"), + ("]", "SESSION\\]CREATED"), + ("}", "SESSION\\}CREATED"), + ("{", "SESSION\\{CREATED"), + ("|", "SESSION\\|CREATED"), + (";", "SESSION\\;CREATED"), + (":", "SESSION\\:CREATED"), + ("'", "SESSION\\'CREATED"), + ('"', 'SESSION\\"CREATED'), + ("\,", "SESSION\\\\\,CREATED"), + ("<", "SESSION\\", "SESSION\\>CREATED"), + ("\/", "SESSION\\\\\/CREATED"), + ("?", "SESSION\\?CREATED"), + ], +) +def test_escape_char_event(escape_char, expected_output): + rtg = ReqsTestGenerator("fake_path") + assert rtg.escape_char_event(f"SESSION{escape_char}CREATED") == expected_output + + +def test_extrect_regex_transforms(open_mock, configparser_mock, src_regex_mock): + rtg = ReqsTestGenerator("fake_path") + out = rtg.extractRegexTransforms() + assert out == [ + src_regex(source_type='comp::"$1"'), + src_regex(regex_src="group=(?[^,]+)"), + ] + + +def test_extract_sourcetype(): + rtg = ReqsTestGenerator("fake_path") + out = rtg.extractSourcetype( + [src_regex(source_type='comp::"$1"', regex_src="group=(\\w+),")], + "event start group=alert, event end", + ) + assert out == '"$1"'