Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Ipam reconciliation task to prefect #4640

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions backend/infrahub/core/ipam/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import ipaddress
from typing import TYPE_CHECKING

from prefect import flow

from infrahub.core import registry
from infrahub.core.ipam.reconciler import IpamReconciler
from infrahub.services import services

from .model import IpamNodeDetails

if TYPE_CHECKING:
from infrahub.core.ipam.constants import AllIPTypes


@flow(name="ipam-reconciliation")
async def ipam_reconciliation(branch: str, ipam_node_details: list[IpamNodeDetails]) -> None:
service = services.service
branch_obj = await registry.get_branch(db=service.database, branch=branch)
ipam_reconciler = IpamReconciler(db=service.database, branch=branch_obj)

for ipam_node_detail_item in ipam_node_details:
if ipam_node_detail_item.is_address:
ip_value: AllIPTypes = ipaddress.ip_interface(ipam_node_detail_item.ip_value)
else:
ip_value = ipaddress.ip_network(ipam_node_detail_item.ip_value)
await ipam_reconciler.reconcile(
ip_value=ip_value,
namespace=ipam_node_detail_item.namespace_id,
node_uuid=ipam_node_detail_item.node_uuid,
is_delete=ipam_node_detail_item.is_delete,
)
2 changes: 0 additions & 2 deletions backend/infrahub/message_bus/messages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
from .transform_python_data import TransformPythonData, TransformPythonDataResponse
from .trigger_artifact_definition_generate import TriggerArtifactDefinitionGenerate
from .trigger_generatordefinition_run import TriggerGeneratorDefinitionRun
from .trigger_ipam_reconciliation import TriggerIpamReconciliation
from .trigger_proposed_change_cancel import TriggerProposedChangeCancel
from .trigger_webhook_actions import TriggerWebhookActions

Expand Down Expand Up @@ -115,7 +114,6 @@
"transform.python.data": TransformPythonData,
"trigger.artifact_definition.generate": TriggerArtifactDefinitionGenerate,
"trigger.generator_definition.run": TriggerGeneratorDefinitionRun,
"trigger.ipam.reconciliation": TriggerIpamReconciliation,
"trigger.proposed_change.cancel": TriggerProposedChangeCancel,
"trigger.webhook.actions": TriggerWebhookActions,
}
Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion backend/infrahub/message_bus/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
"transform.python.data": transform.python.data,
"trigger.artifact_definition.generate": trigger.artifact_definition.generate,
"trigger.generator_definition.run": trigger.generator_definition.run,
"trigger.ipam.reconciliation": trigger.ipam.reconciliation,
"trigger.proposed_change.cancel": trigger.proposed_change.cancel,
"trigger.webhook.actions": trigger.webhook.actions,
}
Expand Down
12 changes: 9 additions & 3 deletions backend/infrahub/message_bus/operations/event/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from infrahub.log import get_logger
from infrahub.message_bus import InfrahubMessage, messages
from infrahub.services import InfrahubServices
from infrahub.workflows.catalogue import IPAM_RECONCILIATION

log = get_logger()

Expand Down Expand Up @@ -46,7 +47,6 @@ async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -

events: List[InfrahubMessage] = [
messages.RefreshRegistryBranches(),
messages.TriggerIpamReconciliation(branch=message.target_branch, ipam_node_details=message.ipam_node_details),
messages.TriggerArtifactDefinitionGenerate(branch=message.target_branch),
messages.TriggerGeneratorDefinitionRun(branch=message.target_branch),
]
Expand All @@ -56,6 +56,11 @@ async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -
# send diff update requests for every branch-tracking diff
branch_diff_roots = await diff_repository.get_empty_roots(base_branch_names=[message.target_branch])

await service.workflow.submit_workflow(
workflow=IPAM_RECONCILIATION,
parameters={"branch": message.target_branch, "ipam_node_details": message.ipam_node_details},
)

for diff_root in branch_diff_roots:
if (
diff_root.base_branch_name != diff_root.diff_branch_name
Expand All @@ -77,8 +82,9 @@ async def rebased(message: messages.EventBranchRebased, service: InfrahubService
messages.RefreshRegistryRebasedBranch(branch=message.branch),
]
if message.ipam_node_details:
events.append(
messages.TriggerIpamReconciliation(branch=message.branch, ipam_node_details=message.ipam_node_details),
await service.workflow.submit_workflow(
workflow=IPAM_RECONCILIATION,
parameters={"branch": message.branch, "ipam_node_details": message.ipam_node_details},
)

# for every diff that touches the rebased branch, recalculate it
Expand Down
4 changes: 2 additions & 2 deletions backend/infrahub/message_bus/operations/trigger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from . import artifact_definition, generator_definition, ipam, proposed_change, webhook
from . import artifact_definition, generator_definition, proposed_change, webhook

__all__ = ["artifact_definition", "generator_definition", "ipam", "proposed_change", "webhook"]
__all__ = ["artifact_definition", "generator_definition", "proposed_change", "webhook"]
33 changes: 0 additions & 33 deletions backend/infrahub/message_bus/operations/trigger/ipam.py

This file was deleted.

2 changes: 1 addition & 1 deletion backend/infrahub/services/adapters/workflow/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ async def submit_workflow(
workflow: WorkflowDefinition,
parameters: dict[str, Any] | None = None,
) -> WorkflowInfo:
workflow.get_function()
await self.execute_workflow(workflow=workflow, parameters=parameters)
return WorkflowInfo(id=uuid.uuid4())
8 changes: 8 additions & 0 deletions backend/infrahub/workflows/catalogue.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
function="schema_validate_migrations",
)

IPAM_RECONCILIATION = WorkflowDefinition(
name="ipam_reconciliation",
type=WorkflowType.INTERNAL,
module="infrahub.core.ipam.tasks",
function="ipam_reconciliation",
)

worker_pools = [INFRAHUB_WORKER_POOL]

workflows = [
Expand All @@ -49,4 +56,5 @@
ANONYMOUS_TELEMETRY_SEND,
SCHEMA_APPLY_MIGRATION,
SCHEMA_VALIDATE_MIGRATION,
IPAM_RECONCILIATION,
]
5 changes: 3 additions & 2 deletions backend/tests/adapters/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from infrahub.message_bus.types import MessageTTL
from infrahub.services import InfrahubServices
from infrahub.services.adapters.message_bus import InfrahubMessageBus
from infrahub.services.adapters.workflow import InfrahubWorkflow

ResponseClass = TypeVar("ResponseClass")

Expand All @@ -36,10 +37,10 @@ def seen_routing_keys(self) -> list[str]:


class BusSimulator(InfrahubMessageBus):
def __init__(self, database: Optional[InfrahubDatabase] = None) -> None:
def __init__(self, database: InfrahubDatabase | None = None, workflow: InfrahubWorkflow | None = None) -> None:
self.messages: list[InfrahubMessage] = []
self.messages_per_routing_key: dict[str, list[InfrahubMessage]] = {}
self.service: InfrahubServices = InfrahubServices(database=database, message_bus=self)
self.service: InfrahubServices = InfrahubServices(database=database, message_bus=self, workflow=workflow)
self.replies: dict[str, list[InfrahubMessage]] = defaultdict(list)
build_component_registry()

Expand Down
2 changes: 1 addition & 1 deletion backend/tests/helpers/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def api_token(self) -> str:

@pytest.fixture(scope="class")
def bus_simulator(self, db: InfrahubDatabase) -> Generator[BusSimulator, None, None]:
bus = BusSimulator(database=db)
bus = BusSimulator(database=db, workflow=WorkflowLocalExecution())
original = config.OVERRIDE.message_bus
config.OVERRIDE.message_bus = bus
yield bus
Expand Down
21 changes: 11 additions & 10 deletions backend/tests/unit/message_bus/operations/event/test_branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from infrahub.message_bus import messages
from infrahub.message_bus.operations.event.branch import delete, merge, rebased
from infrahub.services import InfrahubServices
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
from tests.adapters.message_bus import BusRecorder


async def test_delete():
async def test_delete(prefect_test_fixture):
"""Validate that a deleted branch triggers a registry refresh and cancels open proposed changes"""

message = messages.EventBranchDelete(
Expand All @@ -31,7 +32,7 @@ async def test_delete():
assert trigger_cancel.branch == "cr1234"


async def test_merged(default_branch: Branch):
async def test_merged(default_branch: Branch, prefect_test_fixture):
source_branch_name = "cr1234"
target_branch_name = "main"
right_now = Timestamp()
Expand All @@ -41,7 +42,8 @@ async def test_merged(default_branch: Branch):

recorder = BusRecorder()
database = MagicMock()
service = InfrahubServices(message_bus=recorder, database=database)
workflow = WorkflowLocalExecution()
service = InfrahubServices(message_bus=recorder, database=database, workflow=workflow)
tracked_diff_roots = [
EnrichedDiffRoot(
base_branch_name=target_branch_name,
Expand Down Expand Up @@ -76,16 +78,15 @@ async def test_merged(default_branch: Branch):

mock_component_registry.get_component.assert_awaited_once_with(DiffRepository, db=database, branch=default_branch)
diff_repo.get_empty_roots.assert_awaited_once_with(base_branch_names=[target_branch_name])
assert len(recorder.messages) == 6
assert len(recorder.messages) == 5
assert recorder.messages[0] == messages.RefreshRegistryBranches()
assert recorder.messages[1] == messages.TriggerIpamReconciliation(branch=target_branch_name, ipam_node_details=[])
assert recorder.messages[2] == messages.TriggerArtifactDefinitionGenerate(branch=target_branch_name)
assert recorder.messages[3] == messages.TriggerGeneratorDefinitionRun(branch=target_branch_name)
assert recorder.messages[4] == messages.RequestDiffUpdate(branch_name=tracked_diff_roots[0].diff_branch_name)
assert recorder.messages[5] == messages.RequestDiffUpdate(branch_name=tracked_diff_roots[1].diff_branch_name)
assert recorder.messages[1] == messages.TriggerArtifactDefinitionGenerate(branch=target_branch_name)
assert recorder.messages[2] == messages.TriggerGeneratorDefinitionRun(branch=target_branch_name)
assert recorder.messages[3] == messages.RequestDiffUpdate(branch_name=tracked_diff_roots[0].diff_branch_name)
assert recorder.messages[4] == messages.RequestDiffUpdate(branch_name=tracked_diff_roots[1].diff_branch_name)


async def test_rebased(default_branch: Branch):
async def test_rebased(default_branch: Branch, prefect_test_fixture):
"""Validate that a rebased branch triggers a registry refresh and cancels open proposed changes"""
branch_name = "cr1234"
right_now = Timestamp()
Expand Down
41 changes: 0 additions & 41 deletions docs/docs/reference/message-bus-events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -1162,26 +1162,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
| **branch** | The branch to run the Generators in | string | None |
<!-- vale on -->

<!-- vale off -->
### Trigger Ipam
<!-- vale on -->

<!-- vale off -->
#### Event trigger.ipam.reconciliation
<!-- vale on -->

**Description**: Sent after a branch has been merged/rebased to reconcile changed IP Prefix and Address nodes

**Priority**: 3

<!-- vale off -->
| Key | Description | Type | Default Value |
|-----|-------------|------|---------------|
| **meta** | Meta properties for the message | N/A | None |
| **branch** | The updated branch | string | None |
| **ipam_node_details** | Details for changed IP nodes | array | None |
<!-- vale on -->

<!-- vale off -->
### Trigger Proposed Change
<!-- vale on -->
Expand Down Expand Up @@ -2429,27 +2409,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
| **branch** | The branch to run the Generators in | string | None |
<!-- vale on -->

<!-- vale off -->
### Trigger Ipam
<!-- vale on -->

<!-- vale off -->
#### Event trigger.ipam.reconciliation
<!-- vale on -->

**Description**: Sent after a branch has been merged/rebased to reconcile changed IP Prefix and Address nodes

**Priority**: 3


<!-- vale off -->
| Key | Description | Type | Default Value |
|-----|-------------|------|---------------|
| **meta** | Meta properties for the message | N/A | None |
| **branch** | The updated branch | string | None |
| **ipam_node_details** | Details for changed IP nodes | array | None |
<!-- vale on -->

<!-- vale off -->
### Trigger Proposed Change
<!-- vale on -->
Expand Down
Loading