diff --git a/backend/workflow_manager/workflow/execution.py b/backend/workflow_manager/workflow/execution.py index 3de3bec0a..764f3ed4f 100644 --- a/backend/workflow_manager/workflow/execution.py +++ b/backend/workflow_manager/workflow/execution.py @@ -209,7 +209,7 @@ def build(self) -> None: ) raise WorkflowExecutionError(self.compilation_result["problems"][0]) - def execute(self, single_step: bool = False) -> None: + def execute(self, run_id: str, file_name: str, single_step: bool = False) -> None: execution_type = ExecutionType.COMPLETE if single_step: execution_type = ExecutionType.STEP @@ -230,7 +230,9 @@ def execute(self, single_step: bool = False) -> None: start_time = time.time() try: - self.execute_workflow(execution_type=execution_type) + self.execute_workflow( + run_id=run_id, file_name=file_name, execution_type=execution_type + ) end_time = time.time() execution_time = end_time - start_time except StopExecution as exception: @@ -302,44 +304,32 @@ def publish_initial_tool_execution_logs( def execute_input_file( self, + run_id: str, file_name: str, single_step: bool, - ) -> tuple[bool, bool]: + ) -> None: """Executes the input file. Args: + run_id (str): UUID for a single run of a file file_name (str): The name of the file to be executed. single_step (bool): Flag indicating whether to execute in single step mode. - Returns: - tuple[bool, bool]: Flag indicating whether the file was executed - and skipped. """ execution_type = ExecutionType.COMPLETE if single_step: execution_type = ExecutionType.STEP - self.execute_uncached_input(file_name=file_name, single_step=single_step) - self.publish_log(f"Tool executed successfully for '{file_name}'") - self._handle_execution_type(execution_type) - - def execute_uncached_input(self, file_name: str, single_step: bool) -> None: - """Executes the uncached input file. - - Args: - file_name (str): The name of the file to be executed. - single_step (bool): Flag indicating whether to execute in - single step mode. - - Returns: - None - """ - self.publish_log("No entries found in cache, executing the tools") + self.publish_log( + "No entries found in cache, " f"running the tool(s) for {file_name}" + ) self.publish_update_log( state=LogState.SUCCESS, message=f"{file_name} Sent for execution", component=LogComponent.SOURCE, ) - self.execute(single_step) + self.execute(run_id, file_name, single_step) + self.publish_log(f"Tool executed successfully for '{file_name}'") + self._handle_execution_type(execution_type) def initiate_tool_execution( self, diff --git a/backend/workflow_manager/workflow/workflow_helper.py b/backend/workflow_manager/workflow/workflow_helper.py index b934cc8dd..fbc4a560a 100644 --- a/backend/workflow_manager/workflow/workflow_helper.py +++ b/backend/workflow_manager/workflow/workflow_helper.py @@ -3,6 +3,7 @@ import os import traceback from typing import Any, Optional +from uuid import uuid4 from account.constants import Common from account.models import Organization @@ -194,7 +195,11 @@ def process_file( current_file_idx, total_files, file_name, single_step ) if not file_hash.is_executed: + # Multiple run_ids are linked to an execution_id + # Each run_id corresponds to workflow runs for a single file + run_id = str(uuid4()) execution_service.execute_input_file( + run_id=run_id, file_name=file_name, single_step=single_step, ) diff --git a/docker/docker-compose.build.yaml b/docker/docker-compose.build.yaml index 14673e0de..a0f6343fa 100644 --- a/docker/docker-compose.build.yaml +++ b/docker/docker-compose.build.yaml @@ -9,7 +9,7 @@ services: build: dockerfile: docker/dockerfiles/backend.Dockerfile context: .. - worker: + runner: image: unstract/worker:${VERSION} build: dockerfile: docker/dockerfiles/worker.Dockerfile diff --git a/unstract/core/src/unstract/core/utilities.py b/unstract/core/src/unstract/core/utilities.py index 0364d28d9..ec3c81d14 100644 --- a/unstract/core/src/unstract/core/utilities.py +++ b/unstract/core/src/unstract/core/utilities.py @@ -23,3 +23,9 @@ def get_env(env_key: str, default: Optional[str] = None, raise_err=False) -> str if env_value is None or env_value == "" and raise_err: raise RuntimeError(f"Env variable {env_key} is required") return env_value + + @staticmethod + def build_tool_container_name( + tool_image: str, tool_version: str, run_id: str + ) -> str: + return f"{tool_image.split('/')[-1]}-{tool_version}-{run_id}" diff --git a/unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py b/unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py index 480fd4841..dbc1cb6a4 100644 --- a/unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py +++ b/unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py @@ -69,6 +69,7 @@ def make_get_request( def call_tool_handler( self, + run_id: str, image_name: str, image_tag: str, settings: dict[str, Any], @@ -86,6 +87,7 @@ def call_tool_handler( """ url = f"{self.base_url}{UnstractWorker.RUN_API_ENDPOINT}" data = self.create_tool_request_data( + run_id, image_name, image_tag, settings, @@ -108,6 +110,7 @@ def call_tool_handler( def create_tool_request_data( self, + run_id: str, image_name: str, image_tag: str, settings: dict[str, Any], @@ -118,6 +121,7 @@ def create_tool_request_data( "organization_id": self.organization_id, "workflow_id": self.workflow_id, "execution_id": self.execution_id, + "run_id": run_id, "settings": settings, "envs": self.envs, "messaging_channel": self.messaging_channel, diff --git a/unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py b/unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py index d878ce090..193c385fd 100644 --- a/unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py +++ b/unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py @@ -93,8 +93,9 @@ def get_variables(self) -> Optional[dict[str, Any]]: ) return result - def run_tool(self) -> Optional[dict[str, Any]]: + def run_tool(self, run_id: str) -> Optional[dict[str, Any]]: return self.helper.call_tool_handler( # type: ignore + run_id, self.image_name, self.image_tag, self.settings, diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py b/unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py index 705225b5e..ba7287f19 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py @@ -168,19 +168,21 @@ def check_tools_are_available(self, tool_ids: list[str]) -> bool: def run_tool( self, + run_id: str, tool_sandbox: ToolSandbox, ) -> Any: - return self.run_tool_with_retry(tool_sandbox) + return self.run_tool_with_retry(run_id, tool_sandbox) def run_tool_with_retry( self, + run_id: str, tool_sandbox: ToolSandbox, max_retries: int = ToolExecution.MAXIMUM_RETRY, ) -> Any: error: Optional[dict[str, Any]] = None for retry_count in range(max_retries): try: - response = tool_sandbox.run_tool() + response = tool_sandbox.run_tool(run_id) if response: return response logger.warning( diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py b/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py index 1e9f67cff..77075ad1e 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py @@ -24,6 +24,7 @@ from unstract.workflow_execution.tools_utils import ToolsUtils from unstract.core.pubsub_helper import LogPublisher +from unstract.core.utilities import UnstractUtils logger = logging.getLogger(__name__) @@ -137,11 +138,15 @@ def build_workflow(self) -> None: logger.info(f"Execution {self.execution_id}: Build completed") - def execute_workflow(self, execution_type: ExecutionType) -> None: + def execute_workflow( + self, run_id: str, file_name: str, execution_type: ExecutionType + ) -> None: """Executes the complete workflow by running each tools one by one. Returns the result from final tool in a dictionary. Args: + run_id (str): UUID for a single run of a file + file_name (str): Name of the file to process execution_type (ExecutionType): STEP or COMPLETE Raises: @@ -157,8 +162,22 @@ def execute_workflow(self, execution_type: ExecutionType) -> None: self._initialize_execution() total_steps = len(self.tool_sandboxes) self.total_steps = total_steps + # Currently each tool is run serially for files and workflows contain 1 tool + # only. While supporting more tools in a workflow, correct the tool container + # name to avoid conflicts. for step, sandbox in enumerate(self.tool_sandboxes): + container_name = UnstractUtils.build_tool_container_name( + tool_image=sandbox.image_name, + tool_version=sandbox.image_tag, + run_id=run_id, + ) + logger.info( + f"Running execution: '{self.execution_id}', " + f"tool: '{sandbox.image_name}:{sandbox.image_tag}', " + f"file '{file_name}', container: '{container_name}'" + ) self._execute_step( + run_id=run_id, step=step, sandbox=sandbox, ) @@ -166,12 +185,14 @@ def execute_workflow(self, execution_type: ExecutionType) -> None: def _execute_step( self, + run_id: str, step: int, sandbox: ToolSandbox, ) -> None: """Execution of workflow step. Args: + run_id (str): UUID for a single run of a file step (int): workflow step sandbox (ToolSandbox): instance of tool sandbox execution_type (ExecutionType): step or complete @@ -186,7 +207,8 @@ def _execute_step( tool_uid = sandbox.get_tool_uid() tool_instance_id = sandbox.get_tool_instance_id() log_message = f"Executing step {actual_step} with tool {tool_uid}" - logger.info(f"Execution {self.execution_id}: {log_message}") + logger.info(f"Execution {self.execution_id}, Run {run_id}: {log_message}") + # TODO: Mention run_id in the FE logs / components self.publish_log( log_message, step=actual_step, @@ -199,13 +221,14 @@ def _execute_step( message="Ready for execution", component=tool_instance_id, ) - result = self.tool_utils.run_tool(tool_sandbox=sandbox) + result = self.tool_utils.run_tool(run_id=run_id, tool_sandbox=sandbox) if result and result.get("error"): raise ToolOutputNotFoundException(result.get("error")) if not self.validate_execution_result(step + 1): raise ToolOutputNotFoundException( - f"Tool exception raised for tool {tool_uid}, " - "check logs for more information" + f"Error running tool '{tool_uid}' for run " + f"'{run_id}' of execution '{self.execution_id}'. " + "Check logs for more information" ) log_message = f"Step {actual_step} executed successfully" self.publish_update_log( @@ -219,7 +242,6 @@ def _execute_step( # TODO: Catch specific workflow execution error to avoid showing pythonic error except Exception as error: - self.publish_log(str(error), LogLevel.ERROR, step=actual_step) self.publish_update_log( state=LogState.ERROR, message="Failed to execute", diff --git a/worker/pyproject.toml b/worker/pyproject.toml index 2ac93a7a3..4656d1dd8 100644 --- a/worker/pyproject.toml +++ b/worker/pyproject.toml @@ -37,6 +37,11 @@ dev = [ ] [tool.pdm.scripts] +# Runs the flask service runner.cmd = "flask --app src/unstract/worker/main.py run --port 5002" runner.env_file = ".env" runner.help = "Runs the Unstract tool runner" +# Run tests for this service +test.cmd = "pytest -s -v" +test.env_file = "tests/.env" +test.help = "Runs pytests for the Unstract tool runner" diff --git a/worker/src/unstract/worker/clients/docker.py b/worker/src/unstract/worker/clients/docker.py index 8e8817bb7..537023b17 100644 --- a/worker/src/unstract/worker/clients/docker.py +++ b/worker/src/unstract/worker/clients/docker.py @@ -5,7 +5,6 @@ from docker.errors import APIError, ImageNotFound from docker.models.containers import Container -from unstract.worker.clients.helper import ContainerClientHelper from unstract.worker.clients.interface import ( ContainerClientInterface, ContainerInterface, @@ -14,6 +13,7 @@ from unstract.worker.utils import Utils from docker import DockerClient +from unstract.core.utilities import UnstractUtils class DockerContainer(ContainerInterface): @@ -161,6 +161,7 @@ def get_container_run_config( organization_id: str, workflow_id: str, execution_id: str, + run_id: str, envs: Optional[dict[str, Any]] = None, auto_remove: bool = False, ) -> dict[str, Any]: @@ -188,7 +189,9 @@ def get_container_run_config( execution_id, ) return { - "name": ContainerClientHelper.normalize_container_name(self.image_name), + "name": UnstractUtils.build_tool_container_name( + tool_image=self.image_name, tool_version=self.image_tag, run_id=run_id + ), "image": self.get_image(), "command": command, "detach": True, diff --git a/worker/src/unstract/worker/clients/helper.py b/worker/src/unstract/worker/clients/helper.py index f796987ad..993106a76 100644 --- a/worker/src/unstract/worker/clients/helper.py +++ b/worker/src/unstract/worker/clients/helper.py @@ -1,6 +1,5 @@ import logging import os -import uuid from importlib import import_module from .interface import ContainerClientInterface @@ -16,7 +15,3 @@ def get_container_client() -> ContainerClientInterface: ) logger.info("Loading the container client from path:", client_path) return import_module(client_path).Client - - @staticmethod - def normalize_container_name(name: str) -> str: - return name.replace("/", "-") + "-" + str(uuid.uuid4()) diff --git a/worker/src/unstract/worker/clients/interface.py b/worker/src/unstract/worker/clients/interface.py index fa731e247..231a573d8 100644 --- a/worker/src/unstract/worker/clients/interface.py +++ b/worker/src/unstract/worker/clients/interface.py @@ -65,6 +65,7 @@ def get_container_run_config( organization_id: str, workflow_id: str, execution_id: str, + run_id: str, envs: Optional[dict[str, Any]] = None, auto_remove: bool = False, ) -> dict[str, Any]: diff --git a/worker/src/unstract/worker/clients/test_docker.py b/worker/src/unstract/worker/clients/test_docker.py index 3f6fe2290..6c587a78d 100644 --- a/worker/src/unstract/worker/clients/test_docker.py +++ b/worker/src/unstract/worker/clients/test_docker.py @@ -114,10 +114,11 @@ def test_get_container_run_config(docker_client, mocker): organization_id = "org123" workflow_id = "wf123" execution_id = "ex123" + run_id = "run123" mocker.patch.object(docker_client, "_Client__image_exists", return_value=True) mocker_normalize = mocker.patch( - f"{DOCKER_MODULE}.ContainerClientHelper.normalize_container_name", + "unstract.core.utilities.UnstractUtils.build_tool_container_name", return_value="test-image", ) config = docker_client.get_container_run_config( @@ -125,11 +126,14 @@ def test_get_container_run_config(docker_client, mocker): organization_id, workflow_id, execution_id, + run_id, envs={"KEY": "VALUE"}, auto_remove=True, ) - mocker_normalize.assert_called_once_with("test-image") + mocker_normalize.assert_called_once_with( + tool_image="test-image", tool_version="latest", run_id=run_id + ) assert config["name"] == "test-image" assert config["image"] == "test-image:latest" assert config["command"] == ["echo", "hello"] @@ -150,21 +154,26 @@ def test_get_container_run_config_without_mount(docker_client, mocker): """Test the get_container_run_config method.""" os.environ[Env.WORKFLOW_DATA_DIR] = "/source" command = ["echo", "hello"] + execution_id = "ex123" + run_id = "run123" mocker.patch.object(docker_client, "_Client__image_exists", return_value=True) mocker_normalize = mocker.patch( - f"{DOCKER_MODULE}.ContainerClientHelper.normalize_container_name", + "unstract.core.utilities.UnstractUtils.build_tool_container_name", return_value="test-image", ) config = docker_client.get_container_run_config( command, None, None, - None, + execution_id, + run_id, auto_remove=True, ) - mocker_normalize.assert_called_once_with("test-image") + mocker_normalize.assert_called_once_with( + tool_image="test-image", tool_version="latest", run_id=run_id + ) assert config["name"] == "test-image" assert config["image"] == "test-image:latest" assert config["command"] == ["echo", "hello"] diff --git a/worker/src/unstract/worker/main.py b/worker/src/unstract/worker/main.py index f2e2915d4..496512d42 100644 --- a/worker/src/unstract/worker/main.py +++ b/worker/src/unstract/worker/main.py @@ -28,6 +28,7 @@ def run_container() -> Optional[Any]: organization_id = data["organization_id"] workflow_id = data["workflow_id"] execution_id = data["execution_id"] + run_id = data["run_id"] settings = data["settings"] envs = data["envs"] messaging_channel = data["messaging_channel"] @@ -37,6 +38,7 @@ def run_container() -> Optional[Any]: organization_id=organization_id, workflow_id=workflow_id, execution_id=execution_id, + run_id=run_id, settings=settings, envs=envs, messaging_channel=messaging_channel, diff --git a/worker/src/unstract/worker/worker.py b/worker/src/unstract/worker/worker.py index b8caff4bb..bf197de01 100644 --- a/worker/src/unstract/worker/worker.py +++ b/worker/src/unstract/worker/worker.py @@ -139,6 +139,7 @@ def run_command(self, command: str) -> Optional[Any]: organization_id="", workflow_id="", execution_id="", + run_id="", auto_remove=True, ) container = None @@ -163,6 +164,7 @@ def run_container( organization_id: str, workflow_id: str, execution_id: str, + run_id: str, settings: dict[str, Any], envs: dict[str, Any], messaging_channel: Optional[str] = None, @@ -193,6 +195,7 @@ def run_container( organization_id=organization_id, workflow_id=workflow_id, execution_id=execution_id, + run_id=run_id, envs=envs, ) # Add labels to container for logging with Loki. @@ -207,10 +210,11 @@ def run_container( container = None result = {"type": "RESULT", "result": None} try: - container: ContainerInterface = self.client.run_container(container_config) self.logger.info( - f"Running Docker container: {container_config.get('name')}" + f"Execution ID: {execution_id}, running docker " + f"container: {container_config.get('name')}" ) + container: ContainerInterface = self.client.run_container(container_config) tool_instance_id = str(settings.get(ToolKey.TOOL_INSTANCE_ID)) # Stream logs self.stream_logs(