Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add service for event and integration with Prefect event #4493

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/infrahub/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .models import EventMeta, InfrahubEvent
from .node_action import NodeMutatedEvent

__all__ = ["EventMeta", "InfrahubEvent", "NodeMutatedEvent"]
1 change: 1 addition & 0 deletions backend/infrahub/events/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
EVENT_NAMESPACE = "infrahub"
93 changes: 93 additions & 0 deletions backend/infrahub/events/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import Any

from pydantic import BaseModel, Field

from infrahub.message_bus import InfrahubMessage, Meta

from .constants import EVENT_NAMESPACE


class EventMeta(BaseModel):
request_id: str = ""
account_id: str = ""
initiator_id: str | None = Field(
default=None, description="The worker identity of the initial sender of this message"
)


class InfrahubEvent(BaseModel):
meta: EventMeta | None = None

def get_event_namespace(self) -> str:
return EVENT_NAMESPACE

def get_name(self) -> str:
return f"{self.get_event_namespace()}.unknown"

def get_resource(self) -> dict[str, str]:
raise NotImplementedError

def get_message(self) -> InfrahubMessage:
raise NotImplementedError

def get_related(self) -> list[dict[str, str]]:
related: list[dict[str, str]] = []

if not self.meta:
return related

if self.meta.account_id:
related.append(
{
"prefect.resource.id": f"infrahub.account.{self.meta.account_id}",
"prefect.resource.role": "account",
}
)

if self.meta.request_id:
related.append(
{
"prefect.resource.id": f"infrahub.request.{self.meta.request_id}",
"prefect.resource.role": "request",
}
)

if self.meta.initiator_id:
related.append(
{
"prefect.resource.id": f"infrahub.source.{self.meta.initiator_id}",
"prefect.resource.role": "event_source",
}
)

return related

def get_payload(self) -> dict[str, Any]:
return {}

def get_message_meta(self) -> Meta:
meta = Meta()
if not self.meta:
return meta

if self.meta.initiator_id:
meta.initiator_id = self.meta.initiator_id
if self.meta.request_id:
meta.initiator_id = self.meta.request_id

return meta


class InfrahubBranchEvent(InfrahubEvent): # pylint: disable=abstract-method
branch: str = Field(..., description="The branch on which the event happend")

def get_related(self) -> list[dict[str, str]]:
related = super().get_related()
related.append(
{
"prefect.resource.id": "infrahub.branch",
"prefect.resource.name": self.branch,
"prefect.resource.role": "branch",
}
)
return related
40 changes: 40 additions & 0 deletions backend/infrahub/events/node_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import Any

from pydantic import Field

from infrahub.core.constants import MutationAction
from infrahub.message_bus.messages.event_node_mutated import EventNodeMutated

from .models import InfrahubBranchEvent


class NodeMutatedEvent(InfrahubBranchEvent):
"""Event generated when a node has been mutated"""

kind: str = Field(..., description="The type of object modified")
node_id: str = Field(..., description="The ID of the mutated node")
action: MutationAction = Field(..., description="The action taken on the node")
data: dict[str, Any] = Field(..., description="Data on modified object")

def get_name(self) -> str:
return f"{self.get_event_namespace()}.node.{self.action.value}"

def get_resource(self) -> dict[str, str]:
return {
"prefect.resource.id": f"infrahub.node.{self.node_id}",
"infrahub.node.kind": self.kind,
"infrahub.node.action": self.action.value,
}

def get_payload(self) -> dict[str, Any]:
return self.data

def get_message(self) -> EventNodeMutated:
return EventNodeMutated(
branch=self.branch,
kind=self.kind,
node_id=self.node_id,
action=self.action.value,
data=self.data,
meta=self.get_message_meta(),
)
12 changes: 6 additions & 6 deletions backend/infrahub/graphql/mutations/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@
from infrahub.core.timestamp import Timestamp
from infrahub.database import retry_db_transaction
from infrahub.dependencies.registry import get_component_registry
from infrahub.events import EventMeta, NodeMutatedEvent
from infrahub.exceptions import ValidationError
from infrahub.log import get_log_data, get_logger
from infrahub.message_bus import Meta, messages
from infrahub.services import services
from infrahub.worker import WORKER_IDENTITY

from .node_getter.by_default_filter import MutationNodeGetterByDefaultFilter
Expand Down Expand Up @@ -101,15 +100,16 @@ async def mutate(cls, root: dict, info: GraphQLResolveInfo, *args: Any, **kwargs

data = await obj.to_graphql(db=context.db, filter_sensitive=True)

message = messages.EventNodeMutated(
event = NodeMutatedEvent(
branch=context.branch.name,
kind=obj._schema.kind,
node_id=obj.id,
data=data,
action=action.value,
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=request_id),
action=action,
meta=EventMeta(initiator_id=WORKER_IDENTITY, request_id=request_id),
)
context.background.add_task(services.send, message)

context.background.add_task(context.service.event.send, event)

return mutation

Expand Down
4 changes: 4 additions & 0 deletions backend/infrahub/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from infrahub.message_bus.types import MessageTTL

from .adapters.cache import InfrahubCache
from .adapters.event import InfrahubEventService
from .adapters.http import InfrahubHTTP
from .adapters.http.httpx import HttpxAdapter
from .adapters.message_bus import InfrahubMessageBus
Expand All @@ -31,6 +32,7 @@ def __init__(
message_bus: Optional[InfrahubMessageBus] = None,
http: InfrahubHTTP | None = None,
workflow: Optional[InfrahubWorkflow] = None,
event: InfrahubEventService | None = None,
log: Optional[InfrahubLogger] = None,
component_type: Optional[ComponentType] = None,
):
Expand All @@ -39,6 +41,7 @@ def __init__(
self._database = database
self.message_bus = message_bus or InfrahubMessageBus()
self.workflow = workflow or InfrahubWorkflow()
self.event = event or InfrahubEventService()
self.log = log or get_logger()
self.component_type = component_type or ComponentType.NONE
self.http = http or HttpxAdapter()
Expand Down Expand Up @@ -103,6 +106,7 @@ async def initialize(self) -> None:
await self.cache.initialize(service=self)
await self.scheduler.initialize(service=self)
await self.workflow.initialize(service=self)
await self.event.initialize(service=self)

async def shutdown(self) -> None:
"""Initialize the Services"""
Expand Down
46 changes: 46 additions & 0 deletions backend/infrahub/services/adapters/event/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

from prefect.events import emit_event

from infrahub.exceptions import InitializationError

if TYPE_CHECKING:
from infrahub.events import InfrahubEvent
from infrahub.services import InfrahubServices


class InfrahubEventService:
"""Base class for infrahub event service"""

def __init__(self) -> None:
self._service: InfrahubServices | None = None

@property
def service(self) -> InfrahubServices:
if not self._service:
raise InitializationError("Event is not initialized with a service")

return self._service

async def initialize(self, service: InfrahubServices) -> None:
"""Initialize the event service"""
self._service = service

async def send(self, event: InfrahubEvent) -> None:
tasks = [self._send_bus(event=event), self._send_prefect(event=event)]
await asyncio.gather(*tasks)

async def _send_bus(self, event: InfrahubEvent) -> None:
message = event.get_message()
await self.service.send(message=message)

async def _send_prefect(self, event: InfrahubEvent) -> None:
emit_event(
event=event.get_name(),
resource=event.get_resource(),
related=event.get_related(),
payload=event.get_payload(),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a bit problematic, is there an async version of emit_event?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that I could find quickly but as far as I understand they are sending the event in a non blocking way. I'm planning to dive deeper into it soon.
We may have to create our own function to send the event but for now since we are sending the events in a background task I don't think it will make a big different

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right the background tasks are run in a separate thread regardless. But we'll have to keep it in mind if used elsewhere, though if Prefects emit_event does the same it should be fine.

Loading