diff --git a/backend/infrahub/message_bus/operations/transform/jinja.py b/backend/infrahub/message_bus/operations/transform/jinja.py index f3ca53ab4a..d3166cf3e6 100644 --- a/backend/infrahub/message_bus/operations/transform/jinja.py +++ b/backend/infrahub/message_bus/operations/transform/jinja.py @@ -33,7 +33,7 @@ async def template(message: TransformJinjaTemplate, service: InfrahubServices) - await service.reply(message=response, initiator=message) -@flow +@flow(persist_result=True) async def transform_render_jinja2_template(message: TransformJinjaTemplateData) -> str: service = services.service repo = await get_initialized_repo( diff --git a/backend/infrahub/services/adapters/workflow/worker.py b/backend/infrahub/services/adapters/workflow/worker.py index 668b9e1700..e51c17e3d7 100644 --- a/backend/infrahub/services/adapters/workflow/worker.py +++ b/backend/infrahub/services/adapters/workflow/worker.py @@ -1,11 +1,7 @@ from __future__ import annotations -import base64 -import json -from pathlib import Path from typing import TYPE_CHECKING, Any, Awaitable, Callable -import cloudpickle from prefect.client.orchestration import get_client from prefect.client.schemas.actions import WorkPoolCreate from prefect.deployments import run_deployment @@ -54,16 +50,7 @@ async def execute( response: FlowRun = await run_deployment(name=workflow.full_name, parameters=kwargs or {}) # type: ignore[return-value, misc] if not response.state: raise RuntimeError("Unable to read state from the response") - result = response.state.result() - - with Path(result.storage_key).open(encoding="utf-8") as f: - result_data = json.load(f) - encoded_data = result_data["data"] - decoded_data = base64.b64decode(encoded_data) - - if result_data["serializer"]["type"] == "pickle": - return cloudpickle.loads(decoded_data) - raise ValueError("Unsupported serializer type") + return await response.state.result(fetch=True, raise_on_failure=True) # type: ignore[call-overload] if function: return await function(**kwargs) diff --git a/backend/infrahub/workers/infrahub_async.py b/backend/infrahub/workers/infrahub_async.py index 63d14efcb4..de08b11197 100644 --- a/backend/infrahub/workers/infrahub_async.py +++ b/backend/infrahub/workers/infrahub_async.py @@ -1,5 +1,6 @@ import importlib import logging +import os from typing import Any, Optional import typer @@ -57,7 +58,8 @@ async def setup(self, **kwargs: dict[str, Any]) -> None: logging.getLogger("aiormq").setLevel(logging.ERROR) logging.getLogger("git").setLevel(logging.ERROR) - config.load_and_exit() + config_file = os.environ.get("INFRAHUB_CONFIG", "infrahub.toml") + config.load_and_exit(config_file_name=config_file) self._logger.debug(f"Using Infrahub API at {config.SETTINGS.main.internal_address}") client = InfrahubClient(