diff --git a/docs/book/component-guide/orchestrators/daytona.md b/docs/book/component-guide/orchestrators/daytona.md new file mode 100644 index 0000000000..2aee63587a --- /dev/null +++ b/docs/book/component-guide/orchestrators/daytona.md @@ -0,0 +1,111 @@ +--- +description: Orchestrating your pipelines to run on Daytona. +--- + +# Daytona Orchestrator + +The Daytona orchestrator is an integration provided by ZenML that allows you to run your pipelines on Daytona's infrastructure, leveraging its scalable compute resources and managed environment. + +{% hint style="warning" %} +This component is only meant to be used within the context of a [remote ZenML deployment scenario](../../getting-started/deploying-zenml/README.md). Usage with a local ZenML deployment may lead to unexpected behavior! +{% endhint %} + +## When to use it + +* You are looking for a fast and easy way to run your pipelines on Daytona's infrastructure. +* You're already using Daytona for your machine learning projects. +* You want to leverage Daytona's managed infrastructure for running your pipelines. +* You're looking for a solution that simplifies the deployment and scaling of your ML workflows. + +## How to deploy it + +To use the Daytona orchestrator, you need to have a Daytona account and the necessary credentials. You don't need to deploy any additional infrastructure, as the orchestrator will use Daytona's managed resources. + +## How it works + +The Daytona orchestrator is a ZenML orchestrator that runs your pipelines on Daytona's infrastructure. When you run a pipeline with the Daytona orchestrator, ZenML creates a new workspace in Daytona, packages your code as a wheel, and uploads the wheel to that workspace. ZenML then installs the wheel and runs any custom commands you configured to prepare for the pipeline run (e.g., installing dependencies, setting up the environment). Finally, ZenML runs the pipeline steps in the workspace.
+ +* The orchestrator supports both CPU and GPU workloads. You can request resources with the `cpu`, `memory`, `disk`, and `gpu` fields of `DaytonaOrchestratorSettings`. + +## How to use it + +To use the Daytona orchestrator, you need: + +* The ZenML `daytona` integration installed. If you haven't done so, run + +```shell +zenml integration install daytona +``` + +* A [remote artifact store](../artifact-stores/artifact-stores.md) as part of your stack. + +* Daytona credentials + +### Daytona credentials + +You will need the following credentials to use the Daytona orchestrator: + +* `DAYTONA_API_KEY`: Your Daytona API key +* `DAYTONA_SERVER_URL`: Your Daytona server URL + +You can set these credentials as environment variables or you can set them when registering the orchestrator: + +```shell +zenml orchestrator register daytona_orchestrator \ + --flavor=daytona \ + --api_key= \ + --server_url= +``` + +You can then add the orchestrator to a stack and activate that stack: + +```bash +# Register and activate a stack with the new orchestrator +zenml stack register daytona_stack -o daytona_orchestrator ... --set +``` + +You can configure the orchestrator at pipeline level, using the `settings` parameter of the `@pipeline` decorator. + +```python +from zenml import pipeline +from zenml.integrations.daytona.flavors.daytona_orchestrator_flavor import ( + DaytonaOrchestratorSettings, +) + + +daytona_settings = DaytonaOrchestratorSettings( + cpu=2, + custom_commands=["pip install -r requirements.txt", "do something else"] +) + +@pipeline( + settings={ + "orchestrator": daytona_settings + } +) +def my_pipeline(): + ... +``` + +## Settings + +The `DaytonaOrchestratorSettings` allows you to configure various aspects of the orchestrator: + +* `api_key`: The API key for authenticating with Daytona. +* `server_url`: The server URL for connecting to Daytona. +* `target`: The target environment for Daytona (defaults to "us"). +* `custom_commands`: A list of custom commands to run before executing the pipeline. +* `synchronous`: Whether to run the pipeline synchronously or asynchronously. +* `image`: The Docker image to use for running the pipeline.
+* `cpu`: The number of CPU cores to allocate. +* `memory`: The amount of memory to allocate (in MB). +* `disk`: The amount of disk space to allocate (in GB). +* `gpu`: The number of GPUs to allocate. +* `timeout`: The maximum time to wait for the pipeline to complete (in seconds). +* `auto_stop_interval`: The time interval after which the environment will be automatically stopped if idle (in seconds). + +```bash +zenml pipeline run +``` + +## Monitoring and Logs + +- Logs are available in the Daytona workspace. +- For asynchronous execution, check the `pipeline.log` file in the workspace. + +## Advanced Configuration + +- Customize resource allocation (CPU, memory, etc.) based on your pipeline needs. +- Use custom commands to set up the environment in the workspace. + +For more details, refer to the [Daytona SDK documentation](https://daytona.work/docs). \ No newline at end of file diff --git a/docs/book/toc.md b/docs/book/toc.md index fc3ad05b1e..8581303df2 100644 --- a/docs/book/toc.md +++ b/docs/book/toc.md @@ -250,6 +250,7 @@ * [HyperAI Orchestrator](component-guide/orchestrators/hyperai.md) * [Lightning AI Orchestrator](component-guide/orchestrators/lightning.md) * [Develop a custom orchestrator](component-guide/orchestrators/custom.md) + * [Daytona Orchestrator](component-guide/orchestrators/daytona.md) * [Artifact Stores](component-guide/artifact-stores/artifact-stores.md) * [Local Artifact Store](component-guide/artifact-stores/local.md) * [Amazon Simple Cloud Storage (S3)](component-guide/artifact-stores/s3.md) diff --git a/pyproject.toml b/pyproject.toml index 273e40ed0c..77d27570de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -485,5 +485,6 @@ module = [ "vllm.*", "numba.*", "uvloop.*", + "daytona_sdk.*", ] ignore_missing_imports = true diff --git a/scripts/install-zenml-dev.sh b/scripts/install-zenml-dev.sh index 45bfd39121..87e15d93b2 100755 --- a/scripts/install-zenml-dev.sh +++ b/scripts/install-zenml-dev.sh @@ -36,7 +36,7 @@ 
install_integrations() { # figure out the python version python_version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:2])))") - ignore_integrations="feast label_studio bentoml seldon pycaret skypilot_aws skypilot_gcp skypilot_azure pigeon prodigy argilla" + ignore_integrations="feast label_studio bentoml seldon pycaret skypilot_aws skypilot_gcp skypilot_azure pigeon prodigy argilla daytona" # Ignore tensorflow and deepchecks only on Python 3.12 if [ "$python_version" = "3.12" ]; then diff --git a/src/zenml/integrations/__init__.py b/src/zenml/integrations/__init__.py index 19a55d1346..a0a168c6ae 100644 --- a/src/zenml/integrations/__init__.py +++ b/src/zenml/integrations/__init__.py @@ -24,6 +24,7 @@ from zenml.integrations.bentoml import BentoMLIntegration # noqa from zenml.integrations.bitbucket import BitbucketIntegration # noqa from zenml.integrations.databricks import DatabricksIntegration # noqa +from zenml.integrations.daytona import DaytonaIntegration # noqa from zenml.integrations.comet import CometIntegration # noqa from zenml.integrations.deepchecks import DeepchecksIntegration # noqa from zenml.integrations.discord import DiscordIntegration # noqa diff --git a/src/zenml/integrations/constants.py b/src/zenml/integrations/constants.py index baea84329d..4e87bb8ddc 100644 --- a/src/zenml/integrations/constants.py +++ b/src/zenml/integrations/constants.py @@ -23,6 +23,7 @@ COMET = "comet" DASH = "dash" DATABRICKS = "databricks" +DAYTONA = "daytona" DEEPCHECKS = "deepchecks" DISCORD = "discord" EVIDENTLY = "evidently" diff --git a/src/zenml/integrations/daytona/__init__.py b/src/zenml/integrations/daytona/__init__.py new file mode 100644 index 0000000000..1bd8822d77 --- /dev/null +++ b/src/zenml/integrations/daytona/__init__.py @@ -0,0 +1,47 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Initialization of the Daytona integration for ZenML.""" + +from typing import List, Type + +from zenml.integrations.constants import DAYTONA +from zenml.integrations.integration import Integration +from zenml.stack import Flavor + +DAYTONA_ORCHESTRATOR_FLAVOR = "daytona" + + +class DaytonaIntegration(Integration): + """Definition of Daytona Integration for ZenML.""" + + NAME = DAYTONA_ORCHESTRATOR_FLAVOR + REQUIREMENTS = ["daytona-sdk>=0.9.0"] + + @classmethod + def flavors(cls) -> List[Type[Flavor]]: + """Declare the stack component flavors for the Daytona integration. + + Returns: + List of stack component flavors for this integration. + """ + from zenml.integrations.daytona.flavors import ( + DaytonaOrchestratorFlavor, + ) + + return [ + DaytonaOrchestratorFlavor, + ] + + +DaytonaIntegration.check_installation() \ No newline at end of file diff --git a/src/zenml/integrations/daytona/flavors/__init__.py b/src/zenml/integrations/daytona/flavors/__init__.py new file mode 100644 index 0000000000..6ff1f55071 --- /dev/null +++ b/src/zenml/integrations/daytona/flavors/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Initialization of the Daytona flavors package.""" + +from zenml.integrations.daytona.flavors.daytona_orchestrator_flavor import ( + DaytonaOrchestratorFlavor, +) + +__all__ = ["DaytonaOrchestratorFlavor"] \ No newline at end of file diff --git a/src/zenml/integrations/daytona/flavors/daytona_orchestrator_flavor.py b/src/zenml/integrations/daytona/flavors/daytona_orchestrator_flavor.py new file mode 100644 index 0000000000..bfbaaf64fb --- /dev/null +++ b/src/zenml/integrations/daytona/flavors/daytona_orchestrator_flavor.py @@ -0,0 +1,179 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Daytona orchestrator flavor.""" + +from typing import TYPE_CHECKING, Dict, List, Optional, Type + +from zenml.config.base_settings import BaseSettings +from zenml.integrations.daytona import DAYTONA_ORCHESTRATOR_FLAVOR +from zenml.orchestrators import BaseOrchestratorConfig +from zenml.orchestrators.base_orchestrator import BaseOrchestratorFlavor +from zenml.utils.secret_utils import SecretField + +if TYPE_CHECKING: + from zenml.integrations.daytona.orchestrators.daytona_orchestrator import ( + DaytonaOrchestrator, + ) + + +class DaytonaOrchestratorSettings(BaseSettings): + """Settings for the Daytona orchestrator. + + Attributes: + api_key: The API key for authenticating with Daytona. + server_url: The URL of the Daytona server. + target: The target environment for Daytona. + custom_commands: Custom commands to run in the workspace. + synchronous: If True, wait for the pipeline to complete. If False, + return immediately after starting the pipeline. Defaults to False. + image: Custom Docker image to use for the workspace. + os_user: Operating system user for the workspace. + env_vars: Additional environment variables to set in the workspace. + labels: Labels to attach to the workspace. + public: Whether the workspace should be public. + cpu: Number of CPUs to allocate to the workspace. + memory: Memory in MB to allocate to the workspace. + disk: Disk size in GB to allocate to the workspace. + gpu: Number of GPUs to allocate to the workspace. + timeout: Timeout in seconds for workspace operations. + auto_stop_interval: Interval in seconds after which to automatically stop the workspace. 
+ """ + + # Authentication and connection settings + api_key: Optional[str] = SecretField(default=None) + server_url: Optional[str] = "https://daytona.work/api" + target: Optional[str] = "us" + custom_commands: Optional[List[str]] = None + synchronous: bool = False + + # Workspace configuration + image: Optional[str] = None + os_user: Optional[str] = None + env_vars: Optional[Dict[str, str]] = None + labels: Optional[Dict[str, str]] = None + public: Optional[bool] = None + + # Resource configuration + cpu: Optional[int] = None + memory: Optional[int] = None # in MB + disk: Optional[int] = None # in GB + gpu: Optional[int] = None + + # Operational settings + timeout: Optional[float] = None + auto_stop_interval: Optional[int] = None + + +class DaytonaOrchestratorConfig( + BaseOrchestratorConfig, DaytonaOrchestratorSettings +): + """Configuration for the Daytona orchestrator.""" + + @property + def is_local(self) -> bool: + """Checks if this stack component is running locally. + + Returns: + True if this config is for a local component, False otherwise. + """ + return False + + @property + def is_synchronous(self) -> bool: + """Whether the orchestrator runs synchronous or not. + + Returns: + Whether the orchestrator runs synchronous or not. + """ + return self.synchronous + + @property + def is_schedulable(self) -> bool: + """Whether the orchestrator is schedulable or not. + + Returns: + Whether the orchestrator is schedulable or not. + """ + return False + + @property + def supports_client_side_caching(self) -> bool: + """Whether the orchestrator supports client side caching. + + Returns: + Whether the orchestrator supports client side caching. + """ + return False + + +class DaytonaOrchestratorFlavor(BaseOrchestratorFlavor): + """Flavor for the Daytona orchestrator.""" + + @property + def name(self) -> str: + """Name of the flavor. + + Returns: + The name of the flavor. 
+ """ + return DAYTONA_ORCHESTRATOR_FLAVOR + + @property + def docs_url(self) -> Optional[str]: + """A url to point at docs explaining this flavor. + + Returns: + A flavor docs url. + """ + return self.generate_default_docs_url() + + @property + def sdk_docs_url(self) -> Optional[str]: + """A url to point at SDK docs explaining this flavor. + + Returns: + A flavor SDK docs url. + """ + return self.generate_default_sdk_docs_url() + + @property + def logo_url(self) -> str: + """A url to represent the flavor in the dashboard. + + Returns: + The flavor logo. + """ + return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/daytona.png" + + @property + def config_class(self) -> Type[DaytonaOrchestratorConfig]: + """Returns DaytonaOrchestratorConfig config class. + + Returns: + The config class. + """ + return DaytonaOrchestratorConfig + + @property + def implementation_class(self) -> Type["DaytonaOrchestrator"]: + """Implementation class for this flavor. + + Returns: + The implementation class. + """ + from zenml.integrations.daytona.orchestrators.daytona_orchestrator import ( + DaytonaOrchestrator, + ) + + return DaytonaOrchestrator diff --git a/src/zenml/integrations/daytona/orchestrators/__init__.py b/src/zenml/integrations/daytona/orchestrators/__init__.py new file mode 100644 index 0000000000..8859f384be --- /dev/null +++ b/src/zenml/integrations/daytona/orchestrators/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. 
See the License for the specific language governing +# permissions and limitations under the License. +"""Initialization of the Daytona orchestrators package.""" + +from .daytona_orchestrator import DaytonaOrchestrator \ No newline at end of file diff --git a/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator.py b/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator.py new file mode 100644 index 0000000000..bcaace6026 --- /dev/null +++ b/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator.py @@ -0,0 +1,300 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Implementation of the Daytona orchestrator.""" + +import os +import tempfile +from typing import TYPE_CHECKING, Dict, Optional, Type, cast + +from daytona_sdk import ( # type: ignore + CreateWorkspaceParams, + Daytona, + DaytonaConfig, + WorkspaceResources, +) + +from zenml.constants import ( + ENV_ZENML_CUSTOM_SOURCE_ROOT, + ENV_ZENML_WHEEL_PACKAGE_NAME, +) +from zenml.entrypoints.step_entrypoint_configuration import ( + StepEntrypointConfiguration, +) +from zenml.integrations.daytona.flavors.daytona_orchestrator_flavor import ( + DaytonaOrchestratorConfig, + DaytonaOrchestratorSettings, +) +from zenml.integrations.daytona.orchestrators.daytona_orchestrator_entrypoint_configuration import ( + DaytonaOrchestratorEntrypointConfiguration, +) +from zenml.logger import get_logger +from zenml.orchestrators import WheeledOrchestrator +from zenml.orchestrators.dag_runner import ThreadedDagRunner +from zenml.orchestrators.utils import get_orchestrator_run_name +from zenml.stack import Stack, StackValidator +from zenml.utils import io_utils + +if TYPE_CHECKING: + from zenml.config.base_settings import BaseSettings + from zenml.models import PipelineDeploymentResponse + +logger = get_logger(__name__) + +ENV_ZENML_DAYTONA_RUN_ID = "ZENML_DAYTONA_RUN_ID" +DAYTONA_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH = "/home/daytona/zenml" + + +class DaytonaOrchestrator(WheeledOrchestrator): + """Orchestrator responsible for running pipelines using the Daytona SDK.""" + + @property + def config(self) -> DaytonaOrchestratorConfig: + """Returns the `DaytonaOrchestratorConfig` config. + + Returns: + The configuration. + """ + return cast(DaytonaOrchestratorConfig, self._config) + + @property + def settings_class(self) -> Optional[Type["BaseSettings"]]: + """Settings class for the Daytona orchestrator. + + Returns: + The settings class. + """ + return DaytonaOrchestratorSettings + + @property + def validator(self) -> Optional[StackValidator]: + """Validates the stack. 
+ + Returns: + A StackValidator instance. + """ + return StackValidator( + required_components=set(), + custom_validation_function=None, + ) + + @property + def root_directory(self) -> str: + """Path to the root directory for all files concerning this orchestrator. + + Returns: + Path to the root directory. + """ + return os.path.join( + io_utils.get_global_config_directory(), + "daytona", + str(self.id), + ) + + def prepare_or_run_pipeline( + self, + deployment: "PipelineDeploymentResponse", + stack: "Stack", + environment: Dict[str, str], + ) -> None: + """Prepare and run the pipeline using Daytona SDK. + + Args: + deployment: The pipeline deployment to prepare or run. + stack: The stack the pipeline will run on. + environment: Environment variables to set in the orchestration + environment. + + Raises: + ValueError: If the Daytona API key is not set in the settings. + """ + settings = cast( + DaytonaOrchestratorSettings, self.get_settings(deployment) + ) + if not settings.api_key: + raise ValueError( + "Daytona orchestrator requires `api_key` to be set in the settings." 
+ ) + + # Initialize Daytona client + daytona_config = DaytonaConfig( + api_key=settings.api_key or "", + server_url=settings.server_url or "https://daytona.work/api", + target=settings.target or "us", + ) + daytona = Daytona(daytona_config) + + # Create a workspace for the pipeline + orchestrator_run_name = get_orchestrator_run_name( + pipeline_name=deployment.pipeline_configuration.name + ) + + logger.info("Creating Daytona workspace...") + workspace_params = CreateWorkspaceParams( + language="python", + id=orchestrator_run_name, + name=orchestrator_run_name, + image=settings.image, + os_user=settings.os_user, + env_vars=settings.env_vars, + labels=settings.labels, + public=settings.public, + target=settings.target or "us", + timeout=settings.timeout, + auto_stop_interval=settings.auto_stop_interval, + ) + + # Add resource configuration if any resource settings are specified + if any( + x is not None + for x in [ + settings.cpu, + settings.memory, + settings.disk, + settings.gpu, + ] + ): + workspace_params.resources = WorkspaceResources( + cpu=settings.cpu, + memory=settings.memory, + disk=settings.disk, + gpu=settings.gpu, + ) + + workspace = daytona.create(workspace_params) + + # Create a wheel for the package in a temporary directory + with tempfile.TemporaryDirectory() as temp_dir: + wheel_path = self.create_wheel(temp_dir=temp_dir) + + # Set up environment variables + env_vars = environment.copy() + env_vars[ENV_ZENML_DAYTONA_RUN_ID] = orchestrator_run_name + env_vars[ENV_ZENML_CUSTOM_SOURCE_ROOT] = ( + DAYTONA_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH + ) + env_vars[ENV_ZENML_WHEEL_PACKAGE_NAME] = self.package_name + + # Set environment variables in the workspace + for key, value in env_vars.items(): + workspace.process.exec( + f"export {key}='{value}'", cwd="/home/daytona" + ) + + # Create the repository directory and set up the environment + self._setup_workspace(workspace, wheel_path) + + if settings.synchronous: + # In synchronous mode, run steps directly 
in the main workspace + logger.info("Running pipeline synchronously...") + pipeline_dag = { + step_name: step.spec.upstream_steps + for step_name, step in deployment.step_configurations.items() + } + + def run_step(step_name: str) -> None: + """Run a pipeline step. + + Args: + step_name: Name of the step to run. + """ + logger.info(f"Running step: {step_name}") + entrypoint_command = " ".join( + StepEntrypointConfiguration.get_entrypoint_command() + ) + entrypoint_args = " ".join( + StepEntrypointConfiguration.get_entrypoint_arguments( + step_name=step_name, + deployment_id=deployment.id, + ) + ) + command = f"{entrypoint_command} {entrypoint_args}" + response = workspace.process.exec( + command, + cwd=DAYTONA_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH, + ) + logger.info(f"Step {step_name} result: {response.result}") + + ThreadedDagRunner(dag=pipeline_dag, run_fn=run_step).run() + logger.info("Pipeline execution completed.") + else: + # In asynchronous mode, use the orchestrator entrypoint + logger.info("Running pipeline asynchronously...") + entrypoint_command = " ".join( + DaytonaOrchestratorEntrypointConfiguration.get_entrypoint_command() + ) + entrypoint_args = " ".join( + DaytonaOrchestratorEntrypointConfiguration.get_entrypoint_arguments( + run_name=orchestrator_run_name, + deployment_id=deployment.id, + ) + ) + command = f"{entrypoint_command} {entrypoint_args}" + response = workspace.process.exec( + f"nohup {command} > pipeline.log 2>&1 &", + cwd=DAYTONA_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH, + ) + logger.info( + "Pipeline started asynchronously. You can check the logs at " + f"{DAYTONA_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH}/pipeline.log" + ) + + def _setup_workspace(self, workspace, wheel_path: str) -> None: + """Set up the workspace with required dependencies and code. + + Args: + workspace: The Daytona workspace to set up. + wheel_path: Path to the wheel package to install. 
+ """ + # Create the repository directory + workspace.process.exec( + f"mkdir -p {DAYTONA_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH}", + cwd="/home/daytona", + ) + + # Upload the wheel package, closing the local file once it is read + with open(wheel_path, "rb") as wheel_file: + wheel_content = wheel_file.read() + workspace.fs.upload_file( + f"/home/daytona/{os.path.basename(wheel_path)}", + wheel_content, + ) + + # Install uv, then install the uploaded wheel with it + workspace.process.exec("pip install uv", cwd="/home/daytona") + workspace.process.exec( + f"uv pip install {os.path.basename(wheel_path)}", + cwd="/home/daytona", + ) + + # Execute custom commands if any + settings = cast(DaytonaOrchestratorSettings, self.config) + for command in settings.custom_commands or []: + logger.info(f"Executing custom command: {command}") + response = workspace.process.exec(command, cwd="/home/daytona") + logger.info(f"Custom command result: {response.result}") + + def get_orchestrator_run_id(self) -> str: + """Returns the active orchestrator run id. + + Raises: + RuntimeError: If no run id exists. + + Returns: + The orchestrator run id. + """ + try: + return os.environ[ENV_ZENML_DAYTONA_RUN_ID] + except KeyError: + raise RuntimeError( + "Unable to read run id from environment variable " + f"{ENV_ZENML_DAYTONA_RUN_ID}." + ) diff --git a/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator_entrypoint.py b/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator_entrypoint.py new file mode 100644 index 0000000000..f864bf90c9 --- /dev/null +++ b/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator_entrypoint.py @@ -0,0 +1,91 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Entrypoint of the Daytona orchestrator.""" + +import argparse +import os + +from zenml.client import Client +from zenml.entrypoints.step_entrypoint_configuration import ( + StepEntrypointConfiguration, +) +from zenml.logger import get_logger +from zenml.orchestrators.dag_runner import ThreadedDagRunner + +logger = get_logger(__name__) + + +def parse_args() -> argparse.Namespace: + """Parse entrypoint arguments. + + Returns: + Parsed args. + """ + parser = argparse.ArgumentParser() + parser.add_argument("--run_name", type=str, required=True) + parser.add_argument("--deployment_id", type=str, required=True) + return parser.parse_args() + + +def main() -> None: + """Entrypoint of the Daytona orchestrator. + + This is responsible for running the pipeline steps in the Daytona workspace. + """ + logger.info("Daytona orchestrator started.") + + # Parse arguments + args = parse_args() + + # Get deployment + deployment = Client().get_deployment(args.deployment_id) + + # Create DAG of steps + pipeline_dag = { + step_name: step.spec.upstream_steps + for step_name, step in deployment.step_configurations.items() + } + + def run_step(step_name: str) -> None: + """Run a pipeline step. + + Args: + step_name: Name of the step to run. 
+ + Raises: + RuntimeError: If the step command exits with a non-zero status. + """ + logger.info(f"Running step: {step_name}") + + # Get the entrypoint command and arguments + entrypoint_command = " ".join( + StepEntrypointConfiguration.get_entrypoint_command() + ) + entrypoint_args = " ".join( + StepEntrypointConfiguration.get_entrypoint_arguments( + step_name=step_name, + deployment_id=args.deployment_id, + ) + ) + + # Run the step and surface a non-zero exit status as a failure + command = f"{entrypoint_command} {entrypoint_args}" + logger.info(f"Executing command: {command}") + exit_status = os.system(command) + if exit_status != 0: + raise RuntimeError( + f"Step {step_name} failed with exit status {exit_status}." + ) + + # Run the DAG + ThreadedDagRunner(dag=pipeline_dag, run_fn=run_step).run() + + logger.info("Pipeline execution completed.") + + +if __name__ == "__main__": + main() diff --git a/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator_entrypoint_configuration.py b/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator_entrypoint_configuration.py new file mode 100644 index 0000000000..b88c1d4323 --- /dev/null +++ b/src/zenml/integrations/daytona/orchestrators/daytona_orchestrator_entrypoint_configuration.py @@ -0,0 +1,77 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License.
+"""Entrypoint configuration for the Daytona orchestrator.""" + +from typing import TYPE_CHECKING, List, Set + +if TYPE_CHECKING: + from uuid import UUID + +RUN_NAME_OPTION = "run_name" +DEPLOYMENT_ID_OPTION = "deployment_id" + + +class DaytonaOrchestratorEntrypointConfiguration: + """Entrypoint configuration for the Daytona orchestrator.""" + + @classmethod + def get_entrypoint_options(cls) -> Set[str]: + """Gets all the options required for running this entrypoint. + + Returns: + Entrypoint options. + """ + options = { + RUN_NAME_OPTION, + DEPLOYMENT_ID_OPTION, + } + return options + + @classmethod + def get_entrypoint_command(cls) -> List[str]: + """Returns a command that runs the entrypoint module. + + Returns: + Entrypoint command. + """ + command = [ + "python", + "-m", + "zenml.integrations.daytona.orchestrators.daytona_orchestrator_entrypoint", + ] + return command + + @classmethod + def get_entrypoint_arguments( + cls, + run_name: str, + deployment_id: "UUID", + ) -> List[str]: + """Gets all arguments that the entrypoint command should be called with. + + Args: + run_name: Name of the ZenML run. + deployment_id: ID of the deployment. + + Returns: + List of entrypoint arguments. + """ + args = [ + f"--{RUN_NAME_OPTION}", + run_name, + f"--{DEPLOYMENT_ID_OPTION}", + str(deployment_id), + ] + + return args
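
The entrypoint configuration above builds the argument list (`--run_name`, `--deployment_id`) that `daytona_orchestrator_entrypoint.py` later parses with `argparse`. The following is a minimal sketch of that round trip using stand-in functions rather than the ZenML classes. The names `build_entrypoint_arguments` and `parse_entrypoint_arguments`, the module path `some.entrypoint.module`, and the sample run name and UUID are hypothetical and exist only for illustration. It also suggests why `shlex.join` may be a safer way than `" ".join` to turn the list into a shell command string when a value contains spaces.

```python
import argparse
import shlex
from typing import List
from uuid import UUID

RUN_NAME_OPTION = "run_name"
DEPLOYMENT_ID_OPTION = "deployment_id"


def build_entrypoint_arguments(run_name: str, deployment_id: UUID) -> List[str]:
    """Hypothetical stand-in that mirrors get_entrypoint_arguments()."""
    return [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
    ]


def parse_entrypoint_arguments(argv: List[str]) -> argparse.Namespace:
    """Hypothetical stand-in that mirrors parse_args() in the entrypoint."""
    parser = argparse.ArgumentParser()
    parser.add_argument(f"--{RUN_NAME_OPTION}", type=str, required=True)
    parser.add_argument(f"--{DEPLOYMENT_ID_OPTION}", type=str, required=True)
    return parser.parse_args(argv)


# Sample values chosen for illustration only.
deployment_id = UUID("12345678-1234-5678-1234-567812345678")
args = build_entrypoint_arguments("my pipeline run", deployment_id)

# shlex.join quotes values that contain spaces; a plain " ".join does not.
command = shlex.join(["python", "-m", "some.entrypoint.module", *args])

# shlex.split recovers the original argument list from the quoted command;
# the first three tokens are the interpreter, "-m", and the module path.
parsed = parse_entrypoint_arguments(shlex.split(command)[3:])
print(parsed.run_name, parsed.deployment_id)
```

With a plain `" ".join`, a run name containing a space would be split into several shell words before `argparse` sees it, so quoting each argument is worth considering wherever the command is passed to a shell.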