From 50a379aa408e72aebb5d12eb9b07e60d3c5a75bf Mon Sep 17 00:00:00 2001 From: Jeroen Dekkers Date: Thu, 1 Jun 2023 17:34:02 +0200 Subject: [PATCH] Fix scan profile db event issue by adding an explicit reference field (1.9) (#1093) Co-authored-by: Donny Peeters <46660228+Donnype@users.noreply.github.com> Co-authored-by: ammar92 Co-authored-by: Patrick --- octopoes/octopoes/core/service.py | 3 +- octopoes/octopoes/events/events.py | 3 +- octopoes/octopoes/events/manager.py | 94 +++++---- .../repositories/scan_profile_repository.py | 2 + octopoes/tests/conftest.py | 24 ++- octopoes/tests/test_event_manager.py | 173 ++++++++++++++++ octopoes/tests/test_octopoes_service.py | 189 ++++++++++-------- 7 files changed, 355 insertions(+), 133 deletions(-) create mode 100644 octopoes/tests/test_event_manager.py diff --git a/octopoes/octopoes/core/service.py b/octopoes/octopoes/core/service.py index fee87bfa892..323d4784ed2 100644 --- a/octopoes/octopoes/core/service.py +++ b/octopoes/octopoes/core/service.py @@ -130,7 +130,6 @@ def save_origin(self, origin: Origin, oois: List[OOI], valid_time: datetime) -> def _run_inference(self, origin: Origin, valid_time: datetime) -> None: bit_definition = get_bit_definitions()[origin.method] - is_disabled = bit_definition.id in settings.bits_disabled or ( not bit_definition.default_enabled and bit_definition.id not in settings.bits_enabled ) @@ -423,7 +422,7 @@ def _on_delete_origin_parameter(self, event: OriginParameterDBEvent) -> None: return def _run_inferences(self, event: ScanProfileDBEvent) -> None: - inference_origins = self.origin_repository.list_by_source(event.new_data.reference, valid_time=event.valid_time) + inference_origins = self.origin_repository.list_by_source(event.reference, valid_time=event.valid_time) inference_origins = [o for o in inference_origins if o.origin_type == OriginType.INFERENCE] for inference_origin in inference_origins: self._run_inference(inference_origin, event.valid_time) diff --git a/octopoes/octopoes/events/events.py b/octopoes/octopoes/events/events.py index 1d11e9cc9b4..2e08841a8d5 100644 --- a/octopoes/octopoes/events/events.py +++ b/octopoes/octopoes/events/events.py @@ -58,12 +58,13 @@ def primary_key(self) -> str: class ScanProfileDBEvent(DBEvent): entity_type: Literal["scan_profile"] = "scan_profile" + reference: Reference old_data: Optional[ScanProfile] new_data: Optional[ScanProfile] @property def primary_key(self) -> Reference: - return self.new_data.reference if self.new_data else self.old_data.reference + return self.reference EVENT_TYPE = Union[OOIDBEvent, OriginDBEvent, OriginParameterDBEvent, ScanProfileDBEvent] diff --git a/octopoes/octopoes/events/manager.py b/octopoes/octopoes/events/manager.py index 4e9b79dedea..4a48474cdfd 100644 --- a/octopoes/octopoes/events/manager.py +++ b/octopoes/octopoes/events/manager.py @@ -54,60 +54,58 @@ def publish(self, event: DBEvent) -> None: event.client, ) - if isinstance(event, ScanProfileDBEvent): - incremented = (event.operation_type == OperationType.CREATE and event.new_data.level > 0) or ( - event.operation_type == OperationType.UPDATE and event.new_data.level > event.old_data.level - ) - if incremented: - ooi = json.dumps( - { - "primary_key": event.new_data.reference, - "object_type": event.new_data.reference.class_, - "scan_profile": event.new_data.dict(), - } - ) - - self.channel.basic_publish( - "", - f"{event.client}__scan_profile_increments", - ooi.encode(), - properties=pika.BasicProperties( - delivery_mode=pika.DeliveryMode.Persistent, - ), - ) - - logger.info( - "Published scan_profile_increment [primary_key=%s] [level=%s]", - format_id_short(event.primary_key), - event.new_data.level, - ) - - # publish mutations - mutation = ScanProfileMutation( - operation=event.operation_type, - primary_key=event.primary_key, - ) + if not isinstance(event, ScanProfileDBEvent): + return - if event.operation_type != OperationType.DELETE: - mutation.value = AbstractOOI( - primary_key=event.new_data.reference, - object_type=event.new_data.reference.class_, - scan_profile=event.new_data, - ) + incremented = (event.operation_type == OperationType.CREATE and event.new_data.level > 0) or ( + event.operation_type == OperationType.UPDATE + and event.old_data + and event.new_data.level > event.old_data.level + ) + + if incremented: + ooi = json.dumps( + { + "primary_key": event.reference, + "object_type": event.reference.class_, + "scan_profile": event.new_data.dict(), + } + ) self.channel.basic_publish( "", - f"{event.client}__scan_profile_mutations", - mutation.json().encode(), - properties=pika.BasicProperties( - delivery_mode=pika.DeliveryMode.Persistent, - ), + f"{event.client}__scan_profile_increments", + ooi.encode(), + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), ) - level = mutation.value.scan_profile.level if mutation.value != OperationType.DELETE else None logger.info( - "Published scan profile mutation [operation_type=%s] [primary_key=%s] [level=%s]", - mutation.operation, + "Published scan_profile_increment [primary_key=%s] [level=%s]", format_id_short(event.primary_key), - level, + event.new_data.level, + ) + + # publish mutations + mutation = ScanProfileMutation(operation=event.operation_type, primary_key=event.primary_key) + + if event.operation_type != OperationType.DELETE: + mutation.value = AbstractOOI( + primary_key=event.new_data.reference, + object_type=event.new_data.reference.class_, + scan_profile=event.new_data, ) + + self.channel.basic_publish( + "", + f"{event.client}__scan_profile_mutations", + mutation.json().encode(), + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + level = mutation.value.scan_profile.level if mutation.value is not None else None + logger.info( + "Published scan profile mutation [operation_type=%s] [primary_key=%s] [level=%s]", + mutation.operation, + format_id_short(event.primary_key), + level, + ) diff --git a/octopoes/octopoes/repositories/scan_profile_repository.py b/octopoes/octopoes/repositories/scan_profile_repository.py index 7cd9a2dde6a..ff27aa6b46b 100644 --- a/octopoes/octopoes/repositories/scan_profile_repository.py +++ b/octopoes/octopoes/repositories/scan_profile_repository.py @@ -108,6 +108,7 @@ def save( event = ScanProfileDBEvent( operation_type=OperationType.CREATE if old_scan_profile is None else OperationType.UPDATE, valid_time=valid_time, + reference=new_scan_profile.reference, old_data=old_scan_profile, new_data=new_scan_profile, ) @@ -118,6 +119,7 @@ def delete(self, scan_profile: ScanProfileBase, valid_time: datetime) -> None: event = ScanProfileDBEvent( operation_type=OperationType.DELETE, + reference=scan_profile.reference, valid_time=valid_time, old_data=scan_profile, ) diff --git a/octopoes/tests/conftest.py b/octopoes/tests/conftest.py index 196ae933c39..a360fbbb167 100644 --- a/octopoes/tests/conftest.py +++ b/octopoes/tests/conftest.py @@ -4,13 +4,15 @@ from unittest.mock import Mock import pytest +from bits.runner import BitRunner from requests.adapters import HTTPAdapter, Retry from octopoes.api.api import app from octopoes.api.router import settings from octopoes.config.settings import Settings, XTDBType from octopoes.core.app import get_xtdb_client -from octopoes.models import OOI, EmptyScanProfile, Reference, ScanProfileBase +from octopoes.core.service import OctopoesService +from octopoes.models import OOI, DeclaredScanProfile, EmptyScanProfile, Reference, ScanProfileBase from octopoes.models.path import Direction, Path from octopoes.models.types import DNSZone, Hostname, IPAddressV4, Network, ResolvedHostname from octopoes.repositories.ooi_repository import OOIRepository @@ -146,6 +148,16 @@ def resolved_hostname(hostname, ipaddressv4, ooi_repository, scan_profile_reposi ) +@pytest.fixture +def empty_scan_profile(): + return EmptyScanProfile(reference="test_reference") + + +@pytest.fixture +def declared_scan_profile(): + return DeclaredScanProfile(reference="test_reference", level=2) + + @pytest.fixture def xtdbtype_multinode(): def get_settings_override(): @@ -161,6 +173,16 @@ def app_settings(): return Settings(xtdb_type=XTDBType.XTDB_MULTINODE) +@pytest.fixture +def octopoes_service() -> OctopoesService: + return OctopoesService(Mock(), Mock(), Mock(), Mock()) + + +@pytest.fixture +def bit_runner(mocker) -> BitRunner: + return mocker.patch("octopoes.core.service.BitRunner") + + @pytest.fixture def xtdb_http_client(app_settings: Settings) -> XTDBHTTPClient: client = get_xtdb_client(app_settings.xtdb_uri, "test", app_settings.xtdb_type) diff --git a/octopoes/tests/test_event_manager.py b/octopoes/tests/test_event_manager.py new file mode 100644 index 00000000000..43a29594dd3 --- /dev/null +++ b/octopoes/tests/test_event_manager.py @@ -0,0 +1,173 @@ +import uuid +from datetime import datetime + +import pika + +from octopoes.events.events import OOIDBEvent, OperationType, ScanProfileDBEvent +from octopoes.events.manager import EventManager + + +def test_event_manager_create_ooi(mocker, network): + celery_mock = mocker.Mock() + channel_mock = mocker.Mock() + + mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d") + manager = EventManager("test", celery_mock, "queue", channel_mock) + event = OOIDBEvent(operation_type=OperationType.CREATE, valid_time=datetime(2023, 1, 1), new_data=network) + manager.publish(event) + + celery_mock.send_task.assert_called_once_with( + "octopoes.tasks.tasks.handle_event", + ( + { + "entity_type": "ooi", + "operation_type": "create", + "valid_time": "2023-01-01T00:00:00", + "client": "test", + "old_data": None, + "new_data": { + "object_type": "Network", + "scan_profile": None, + "primary_key": "Network|internet", + "name": "internet", + }, + }, + ), + queue="queue", + task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d", + ) + + channel_mock.basic_publish.assert_not_called() + + +def test_event_manager_create_empty_scan_profile(mocker, empty_scan_profile): + celery_mock = mocker.Mock() + channel_mock = mocker.Mock() + + mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d") + manager = EventManager("test", celery_mock, "queue", channel_mock) + event = ScanProfileDBEvent( + operation_type=OperationType.CREATE, + valid_time=datetime(2023, 1, 1), + new_data=empty_scan_profile, + reference="test_reference", + ) + manager.publish(event) + + celery_mock.send_task.assert_called_once_with( + "octopoes.tasks.tasks.handle_event", + ( + { + "entity_type": "scan_profile", + "operation_type": "create", + "valid_time": "2023-01-01T00:00:00", + "client": "test", + "old_data": None, + "new_data": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0}, + "reference": "test_reference", + }, + ), + queue="queue", + task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d", + ) + + channel_mock.basic_publish.assert_called_once_with( + "", + "test__scan_profile_mutations", + b'{"operation": "create", "primary_key": "test_reference", ' + b'"value": {"primary_key": "test_reference", ' + b'"object_type": "test_reference", ' + b'"scan_profile": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0}}}', + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + +def test_event_manager_create_declared_scan_profile(mocker, declared_scan_profile): + celery_mock = mocker.Mock() + channel_mock = mocker.Mock() + + mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d") + manager = EventManager("test", celery_mock, "queue", channel_mock) + event = ScanProfileDBEvent( + operation_type=OperationType.CREATE, + valid_time=datetime(2023, 1, 1), + new_data=declared_scan_profile, + reference="test_reference", + ) + manager.publish(event) + + celery_mock.send_task.assert_called_once_with( + "octopoes.tasks.tasks.handle_event", + ( + { + "entity_type": "scan_profile", + "operation_type": "create", + "valid_time": "2023-01-01T00:00:00", + "client": "test", + "old_data": None, + "new_data": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}, + "reference": "test_reference", + }, + ), + queue="queue", + task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d", + ) + + assert channel_mock.basic_publish.call_count == 2 + channel_mock.basic_publish.asset_has_calls( + mocker.call( + "", + "test__scan_profile_increments", + b'{"primary_key": "test_reference", "object_type": "test_reference",' + b'"scan_profile": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}}', + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ), + mocker.call( + "", + "test__scan_profile_mutations", + b'{"operation": "create", "primary_key": "test_reference", ' + b'"value": {"primary_key": "test_reference", ' + b'"object_type": "test_reference", ' + b'"scan_profile": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}}}', + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ), + ) + + +def test_event_manager_delete_empty_scan_profile(mocker, empty_scan_profile): + celery_mock = mocker.Mock() + channel_mock = mocker.Mock() + + mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d") + manager = EventManager("test", celery_mock, "queue", channel_mock) + event = ScanProfileDBEvent( + operation_type=OperationType.DELETE, + valid_time=datetime(2023, 1, 1), + old_data=empty_scan_profile, + reference="test_reference", + ) + manager.publish(event) + + celery_mock.send_task.assert_called_once_with( + "octopoes.tasks.tasks.handle_event", + ( + { + "entity_type": "scan_profile", + "operation_type": "delete", + "valid_time": "2023-01-01T00:00:00", + "client": "test", + "old_data": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0}, + "new_data": None, + "reference": "test_reference", + }, + ), + queue="queue", + task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d", + ) + + channel_mock.basic_publish.assert_called_once_with( + "", + "test__scan_profile_mutations", + b'{"operation": "delete", "primary_key": "test_reference", "value": null}', + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) diff --git a/octopoes/tests/test_octopoes_service.py b/octopoes/tests/test_octopoes_service.py index 5a7622e3f68..dfab1293601 100644 --- a/octopoes/tests/test_octopoes_service.py +++ b/octopoes/tests/test_octopoes_service.py @@ -1,13 +1,12 @@ -from datetime import datetime, timezone +from datetime import datetime from ipaddress import ip_address -from unittest import TestCase -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch +import pytest from bits.definitions import BitDefinition -from octopoes.core.service import OctopoesService -from octopoes.events.events import OOIDBEvent, OperationType, OriginDBEvent -from octopoes.models import Reference +from octopoes.events.events import OOIDBEvent, OperationType, OriginDBEvent, ScanProfileDBEvent +from octopoes.models import EmptyScanProfile, Reference from octopoes.models.ooi.dns.zone import Hostname from octopoes.models.ooi.network import IPAddress, IPAddressV4, Network from octopoes.models.origin import Origin, OriginType @@ -22,88 +21,116 @@ def mocked_bit_definitions(): } -class OctopoesServiceTest(TestCase): - def setUp(self) -> None: - self.octopoes = OctopoesService(Mock(), Mock(), Mock(), Mock()) - self.valid_time = datetime.now(timezone.utc) - - def tearDown(self) -> None: - pass - - @patch("octopoes.core.service.get_bit_definitions", mocked_bit_definitions) - def test_process_ooi_create_event(self): - # upon creation of a new ooi - ooi = Hostname(network=Network(name="internet").reference, name="example.com") - self.octopoes.process_event( - OOIDBEvent( - operation_type=OperationType.CREATE, - valid_time=self.valid_time, - client="_dev", - old_data=None, - new_data=ooi, - ) +@patch("octopoes.core.service.get_bit_definitions", mocked_bit_definitions) +def test_process_ooi_create_event(octopoes_service, valid_time): + # upon creation of a new ooi + ooi = Hostname(network=Network(name="internet").reference, name="example.com") + octopoes_service.process_event( + OOIDBEvent( + operation_type=OperationType.CREATE, + valid_time=valid_time, + client="_dev", + old_data=None, + new_data=ooi, ) + ) - # octopoes should create a new origin, because there is a matching bit definition - self.octopoes.origin_repository.save.assert_called_once_with( - Origin( - origin_type=OriginType.INFERENCE, - method="fake-hostname-bit", - source=ooi.reference, - ), - self.valid_time, - ) + # octopoes should create a new origin, because there is a matching bit definition + octopoes_service.origin_repository.save.assert_called_once_with( + Origin( + origin_type=OriginType.INFERENCE, + method="fake-hostname-bit", + source=ooi.reference, + ), + valid_time, + ) - @patch("octopoes.core.service.get_bit_definitions", mocked_bit_definitions) - def test_process_event_abstract_bit_consumes(self): - # upon creation of a new ooi - ooi = IPAddressV4(network=Network(name="internet").reference, address=ip_address("1.1.1.1")) - self.octopoes.process_event( - OOIDBEvent( - operation_type=OperationType.CREATE, - valid_time=self.valid_time, - client="_dev", - old_data=None, - new_data=ooi, - ) - ) - # octopoes should create a new origin, because there is a matching bit definition (w/ abstract class) - self.octopoes.origin_repository.save.assert_called_once_with( - Origin( - origin_type=OriginType.INFERENCE, - method="fake-ipaddress-bit", - source=ooi.reference, - ), - self.valid_time, +@patch("octopoes.core.service.get_bit_definitions", mocked_bit_definitions) +def test_process_event_abstract_bit_consumes(octopoes_service, valid_time): + # upon creation of a new ooi + ooi = IPAddressV4(network=Network(name="internet").reference, address=ip_address("1.1.1.1")) + octopoes_service.process_event( + OOIDBEvent( + operation_type=OperationType.CREATE, + valid_time=valid_time, + client="_dev", + old_data=None, + new_data=ooi, ) + ) - def test_on_update_origin(self): - # when the result of an origin changes - old_data = Origin( - origin_type=OriginType.OBSERVATION, - method="test-boefje", - source=Reference.from_str("Hostname|internet|example.com"), - result=[Reference.from_str("IPAddress|internet|1.1.1.1")], - ) - new_data = Origin( - origin_type=OriginType.OBSERVATION, - method="test-boefje", + # octopoes should create a new origin, because there is a matching bit definition (w/ abstract class) + octopoes_service.origin_repository.save.assert_called_once_with( + Origin( + origin_type=OriginType.INFERENCE, + method="fake-ipaddress-bit", + source=ooi.reference, + ), + valid_time, + ) + + +def test_on_update_origin(octopoes_service, valid_time): + # when the result of an origin changes + old_data = Origin( + origin_type=OriginType.OBSERVATION, + method="test-boefje", + source=Reference.from_str("Hostname|internet|example.com"), + result=[Reference.from_str("IPAddress|internet|1.1.1.1")], + ) + new_data = Origin( + origin_type=OriginType.OBSERVATION, + method="test-boefje", + source=Reference.from_str("Hostname|internet|example.com"), + ) + event = OriginDBEvent( + operation_type=OperationType.UPDATE, + valid_time=valid_time, + client="_dev", + old_data=old_data, + new_data=new_data, + ) + + # and the deferenced ooi is no longer referred to by any origins + octopoes_service.origin_repository.list_by_result.return_value = [] + octopoes_service.process_event(event) + + # the ooi should be deleted + octopoes_service.ooi_repository.delete.assert_called_once_with( + Reference.from_str("IPAddress|internet|1.1.1.1"), valid_time + ) + + +@pytest.mark.parametrize("new_data", [EmptyScanProfile(reference="test_reference"), None]) +@pytest.mark.parametrize("old_data", [EmptyScanProfile(reference="test_reference"), None]) +def test_on_create_scan_profile(octopoes_service, new_data, old_data, bit_runner: MagicMock): + octopoes_service.origin_repository.list_by_source.return_value = [ + Origin( + origin_type=OriginType.INFERENCE, + method="check-csp-header", source=Reference.from_str("Hostname|internet|example.com"), ) - event = OriginDBEvent( - operation_type=OperationType.UPDATE, - valid_time=self.valid_time, - client="_dev", - old_data=old_data, - new_data=new_data, - ) + ] + octopoes_service.scan_profile_repository.get.return_value = Mock(level=2) + octopoes_service.ooi_repository.get.return_value = Mock() + octopoes_service.origin_parameter_repository.list_by_origin.return_value = {} + octopoes_service.ooi_repository.get_bulk.return_value = {} - # and the deferenced ooi is no longer referred to by any origins - self.octopoes.origin_repository.list_by_result.return_value = [] - self.octopoes.process_event(event) + mock_oois = [Mock(reference="test1"), Mock(reference="test2")] + bit_runner().run.return_value = mock_oois - # the ooi should be deleted - self.octopoes.ooi_repository.delete.assert_called_once_with( - Reference.from_str("IPAddress|internet|1.1.1.1"), self.valid_time - ) + valid_time = datetime(2023, 1, 1) + event = ScanProfileDBEvent( + operation_type=OperationType.CREATE, + valid_time=valid_time, + old_data=old_data, + new_data=new_data, + reference="test_reference", + ) + + octopoes_service.process_event(event) + + assert octopoes_service.ooi_repository.save.call_count == 2 + octopoes_service.ooi_repository.save.assert_any_call(mock_oois[0], valid_time=valid_time) + octopoes_service.ooi_repository.save.assert_any_call(mock_oois[1], valid_time=valid_time)