Skip to content

Commit

Permalink
fix: Used WF exec ID for tool container name to allow better tracing (#666)
Browse files Browse the repository at this point in the history

* Used WF exec ID for tool container name to allow better tracing

* Fixed unstract-runner tests

* Set and pass run_id from backend to unstract-runner for tracing logs

* Fixed failing worker tests and removed unused code

---------

Co-authored-by: Ritwik G <[email protected]>
Co-authored-by: Jaseem Jas <[email protected]>
  • Loading branch information
3 people authored Sep 16, 2024
1 parent a598a43 commit 21068b2
Show file tree
Hide file tree
Showing 15 changed files with 96 additions and 47 deletions.
36 changes: 13 additions & 23 deletions backend/workflow_manager/workflow/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def build(self) -> None:
)
raise WorkflowExecutionError(self.compilation_result["problems"][0])

def execute(self, single_step: bool = False) -> None:
def execute(self, run_id: str, file_name: str, single_step: bool = False) -> None:
execution_type = ExecutionType.COMPLETE
if single_step:
execution_type = ExecutionType.STEP
Expand All @@ -230,7 +230,9 @@ def execute(self, single_step: bool = False) -> None:

start_time = time.time()
try:
self.execute_workflow(execution_type=execution_type)
self.execute_workflow(
run_id=run_id, file_name=file_name, execution_type=execution_type
)
end_time = time.time()
execution_time = end_time - start_time
except StopExecution as exception:
Expand Down Expand Up @@ -302,44 +304,32 @@ def publish_initial_tool_execution_logs(

def execute_input_file(
self,
run_id: str,
file_name: str,
single_step: bool,
) -> tuple[bool, bool]:
) -> None:
"""Executes the input file.
Args:
run_id (str): UUID for a single run of a file
file_name (str): The name of the file to be executed.
single_step (bool): Flag indicating whether to execute in
single step mode.
Returns:
tuple[bool, bool]: Flag indicating whether the file was executed
and skipped.
"""
execution_type = ExecutionType.COMPLETE
if single_step:
execution_type = ExecutionType.STEP
self.execute_uncached_input(file_name=file_name, single_step=single_step)
self.publish_log(f"Tool executed successfully for '{file_name}'")
self._handle_execution_type(execution_type)

def execute_uncached_input(self, file_name: str, single_step: bool) -> None:
"""Executes the uncached input file.
Args:
file_name (str): The name of the file to be executed.
single_step (bool): Flag indicating whether to execute in
single step mode.
Returns:
None
"""
self.publish_log("No entries found in cache, executing the tools")
self.publish_log(
"No entries found in cache, " f"running the tool(s) for {file_name}"
)
self.publish_update_log(
state=LogState.SUCCESS,
message=f"{file_name} Sent for execution",
component=LogComponent.SOURCE,
)
self.execute(single_step)
self.execute(run_id, file_name, single_step)
self.publish_log(f"Tool executed successfully for '{file_name}'")
self._handle_execution_type(execution_type)

def initiate_tool_execution(
self,
Expand Down
5 changes: 5 additions & 0 deletions backend/workflow_manager/workflow/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import traceback
from typing import Any, Optional
from uuid import uuid4

from account.constants import Common
from account.models import Organization
Expand Down Expand Up @@ -194,7 +195,11 @@ def process_file(
current_file_idx, total_files, file_name, single_step
)
if not file_hash.is_executed:
# Multiple run_ids are linked to an execution_id
# Each run_id corresponds to workflow runs for a single file
run_id = str(uuid4())
execution_service.execute_input_file(
run_id=run_id,
file_name=file_name,
single_step=single_step,
)
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
build:
dockerfile: docker/dockerfiles/backend.Dockerfile
context: ..
worker:
runner:
image: unstract/worker:${VERSION}
build:
dockerfile: docker/dockerfiles/worker.Dockerfile
Expand Down
6 changes: 6 additions & 0 deletions unstract/core/src/unstract/core/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ def get_env(env_key: str, default: Optional[str] = None, raise_err=False) -> str
if env_value is None or env_value == "" and raise_err:
raise RuntimeError(f"Env variable {env_key} is required")
return env_value

@staticmethod
def build_tool_container_name(
tool_image: str, tool_version: str, run_id: str
) -> str:
return f"{tool_image.split('/')[-1]}-{tool_version}-{run_id}"
4 changes: 4 additions & 0 deletions unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def make_get_request(

def call_tool_handler(
self,
run_id: str,
image_name: str,
image_tag: str,
settings: dict[str, Any],
Expand All @@ -86,6 +87,7 @@ def call_tool_handler(
"""
url = f"{self.base_url}{UnstractWorker.RUN_API_ENDPOINT}"
data = self.create_tool_request_data(
run_id,
image_name,
image_tag,
settings,
Expand All @@ -108,6 +110,7 @@ def call_tool_handler(

def create_tool_request_data(
self,
run_id: str,
image_name: str,
image_tag: str,
settings: dict[str, Any],
Expand All @@ -118,6 +121,7 @@ def create_tool_request_data(
"organization_id": self.organization_id,
"workflow_id": self.workflow_id,
"execution_id": self.execution_id,
"run_id": run_id,
"settings": settings,
"envs": self.envs,
"messaging_channel": self.messaging_channel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ def get_variables(self) -> Optional[dict[str, Any]]:
)
return result

def run_tool(self) -> Optional[dict[str, Any]]:
def run_tool(self, run_id: str) -> Optional[dict[str, Any]]:
return self.helper.call_tool_handler( # type: ignore
run_id,
self.image_name,
self.image_tag,
self.settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,21 @@ def check_tools_are_available(self, tool_ids: list[str]) -> bool:

def run_tool(
self,
run_id: str,
tool_sandbox: ToolSandbox,
) -> Any:
return self.run_tool_with_retry(tool_sandbox)
return self.run_tool_with_retry(run_id, tool_sandbox)

def run_tool_with_retry(
self,
run_id: str,
tool_sandbox: ToolSandbox,
max_retries: int = ToolExecution.MAXIMUM_RETRY,
) -> Any:
error: Optional[dict[str, Any]] = None
for retry_count in range(max_retries):
try:
response = tool_sandbox.run_tool()
response = tool_sandbox.run_tool(run_id)
if response:
return response
logger.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from unstract.workflow_execution.tools_utils import ToolsUtils

from unstract.core.pubsub_helper import LogPublisher
from unstract.core.utilities import UnstractUtils

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -137,11 +138,15 @@ def build_workflow(self) -> None:

logger.info(f"Execution {self.execution_id}: Build completed")

def execute_workflow(self, execution_type: ExecutionType) -> None:
def execute_workflow(
self, run_id: str, file_name: str, execution_type: ExecutionType
) -> None:
"""Executes the complete workflow by running each tools one by one.
Returns the result from final tool in a dictionary.
Args:
run_id (str): UUID for a single run of a file
file_name (str): Name of the file to process
execution_type (ExecutionType): STEP or COMPLETE
Raises:
Expand All @@ -157,21 +162,37 @@ def execute_workflow(self, execution_type: ExecutionType) -> None:
self._initialize_execution()
total_steps = len(self.tool_sandboxes)
self.total_steps = total_steps
# Currently, tools are run serially for each file and a workflow contains only
# one tool. When support for more tools in a workflow is added, revise the tool
# container name to avoid conflicts.
for step, sandbox in enumerate(self.tool_sandboxes):
container_name = UnstractUtils.build_tool_container_name(
tool_image=sandbox.image_name,
tool_version=sandbox.image_tag,
run_id=run_id,
)
logger.info(
f"Running execution: '{self.execution_id}', "
f"tool: '{sandbox.image_name}:{sandbox.image_tag}', "
f"file '{file_name}', container: '{container_name}'"
)
self._execute_step(
run_id=run_id,
step=step,
sandbox=sandbox,
)
self._finalize_execution(execution_type)

def _execute_step(
self,
run_id: str,
step: int,
sandbox: ToolSandbox,
) -> None:
"""Execution of workflow step.
Args:
run_id (str): UUID for a single run of a file
step (int): workflow step
sandbox (ToolSandbox): instance of tool sandbox
execution_type (ExecutionType): step or complete
Expand All @@ -186,7 +207,8 @@ def _execute_step(
tool_uid = sandbox.get_tool_uid()
tool_instance_id = sandbox.get_tool_instance_id()
log_message = f"Executing step {actual_step} with tool {tool_uid}"
logger.info(f"Execution {self.execution_id}: {log_message}")
logger.info(f"Execution {self.execution_id}, Run {run_id}: {log_message}")
# TODO: Mention run_id in the FE logs / components
self.publish_log(
log_message,
step=actual_step,
Expand All @@ -199,13 +221,14 @@ def _execute_step(
message="Ready for execution",
component=tool_instance_id,
)
result = self.tool_utils.run_tool(tool_sandbox=sandbox)
result = self.tool_utils.run_tool(run_id=run_id, tool_sandbox=sandbox)
if result and result.get("error"):
raise ToolOutputNotFoundException(result.get("error"))
if not self.validate_execution_result(step + 1):
raise ToolOutputNotFoundException(
f"Tool exception raised for tool {tool_uid}, "
"check logs for more information"
f"Error running tool '{tool_uid}' for run "
f"'{run_id}' of execution '{self.execution_id}'. "
"Check logs for more information"
)
log_message = f"Step {actual_step} executed successfully"
self.publish_update_log(
Expand All @@ -219,7 +242,6 @@ def _execute_step(

# TODO: Catch specific workflow execution error to avoid showing pythonic error
except Exception as error:
self.publish_log(str(error), LogLevel.ERROR, step=actual_step)
self.publish_update_log(
state=LogState.ERROR,
message="Failed to execute",
Expand Down
5 changes: 5 additions & 0 deletions worker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ dev = [
]

[tool.pdm.scripts]
# Runs the flask service
runner.cmd = "flask --app src/unstract/worker/main.py run --port 5002"
runner.env_file = ".env"
runner.help = "Runs the Unstract tool runner"
# Run tests for this service
test.cmd = "pytest -s -v"
test.env_file = "tests/.env"
test.help = "Runs pytests for the Unstract tool runner"
7 changes: 5 additions & 2 deletions worker/src/unstract/worker/clients/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from docker.errors import APIError, ImageNotFound
from docker.models.containers import Container
from unstract.worker.clients.helper import ContainerClientHelper
from unstract.worker.clients.interface import (
ContainerClientInterface,
ContainerInterface,
Expand All @@ -14,6 +13,7 @@
from unstract.worker.utils import Utils

from docker import DockerClient
from unstract.core.utilities import UnstractUtils


class DockerContainer(ContainerInterface):
Expand Down Expand Up @@ -161,6 +161,7 @@ def get_container_run_config(
organization_id: str,
workflow_id: str,
execution_id: str,
run_id: str,
envs: Optional[dict[str, Any]] = None,
auto_remove: bool = False,
) -> dict[str, Any]:
Expand Down Expand Up @@ -188,7 +189,9 @@ def get_container_run_config(
execution_id,
)
return {
"name": ContainerClientHelper.normalize_container_name(self.image_name),
"name": UnstractUtils.build_tool_container_name(
tool_image=self.image_name, tool_version=self.image_tag, run_id=run_id
),
"image": self.get_image(),
"command": command,
"detach": True,
Expand Down
5 changes: 0 additions & 5 deletions worker/src/unstract/worker/clients/helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import os
import uuid
from importlib import import_module

from .interface import ContainerClientInterface
Expand All @@ -16,7 +15,3 @@ def get_container_client() -> ContainerClientInterface:
)
logger.info("Loading the container client from path:", client_path)
return import_module(client_path).Client

@staticmethod
def normalize_container_name(name: str) -> str:
return name.replace("/", "-") + "-" + str(uuid.uuid4())
1 change: 1 addition & 0 deletions worker/src/unstract/worker/clients/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def get_container_run_config(
organization_id: str,
workflow_id: str,
execution_id: str,
run_id: str,
envs: Optional[dict[str, Any]] = None,
auto_remove: bool = False,
) -> dict[str, Any]:
Expand Down
Loading

0 comments on commit 21068b2

Please sign in to comment.