Skip to content

Commit

Permalink
Add framework for building sublattices inside new-style executors
Browse files Browse the repository at this point in the history
`LocalDispatcher.prepare_manifest()` will attempt to copy the task
artifacts to `COVALENT_STAGING_URI_PREFIX` if that environment
variable is set.
  • Loading branch information
cjao committed Sep 11, 2024
1 parent 84660e3 commit 40cfdb8
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 25 deletions.
65 changes: 58 additions & 7 deletions covalent/_dispatcher_plugins/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

from furl import furl

from covalent._file_transfer import File, FileTransfer

from .._api.apiclient import CovalentAPIClient as APIClient
from .._results_manager.result import Result
from .._results_manager.results_manager import get_result, get_result_manager
Expand All @@ -36,7 +38,7 @@
from .._shared_files.config import get_config
from .._shared_files.schemas.asset import AssetSchema
from .._shared_files.schemas.result import ResultSchema
from .._shared_files.utils import copy_file_locally, format_server_url
from .._shared_files.utils import format_server_url
from .._workflow.lattice import Lattice
from ..triggers import BaseTrigger
from .base import BaseDispatcher
Expand Down Expand Up @@ -521,7 +523,41 @@ def prepare_manifest(lattice, storage_path) -> ResultSchema:
"""Prepare a built-out lattice for submission"""

result_object = Result(lattice)
return serialize_result(result_object, storage_path)
manifest = serialize_result(result_object, storage_path)
LocalDispatcher.transfer_local_assets_to_remote(manifest, storage_path)
return manifest

@staticmethod
def transfer_local_assets_to_remote(manifest: ResultSchema, storage_path) -> ResultSchema:
"""Transfer assets from temporary staging directory to remote storage.
This will be used when building sublattice graphs in an
executor. The executor will deposit the workflow assets at a
location mutually agreed upon between the orchestrator and the
executor plugin.
"""
remote_uri_prefix = os.environ.get("COVALENT_STAGING_URI_PREFIX", None)
if not remote_uri_prefix:
return manifest

local_prefix = "file://"

assets = extract_assets(manifest)

for asset in assets:
# Don't upload empty files
if asset.size > 0:
local_object_key = asset.uri[len(local_prefix) :]
asset.remote_uri = f"{remote_uri_prefix}{local_object_key}"

LocalDispatcher._upload(assets)

for asset in assets:
asset.uri = asset.remote_uri
asset.remote_uri = ""

return manifest

@staticmethod
def register_manifest(
Expand Down Expand Up @@ -602,17 +638,17 @@ def _upload(assets: List[AssetSchema]):
if not asset.remote_uri or not asset.uri:
app_log.debug(f"Skipping asset {i + 1} out of {total}")
continue
if asset.remote_uri.startswith(local_scheme_prefix):
copy_file_locally(asset.uri, asset.remote_uri)
number_uploaded += 1
else:
_upload_asset(asset.uri, asset.remote_uri)
_transfer_asset(asset.uri, asset.remote_uri)
number_uploaded += 1
app_log.debug(f"Uploaded asset {i + 1} out of {total}.")
app_log.debug(f"uploaded {number_uploaded} assets.")


def _upload_asset(local_uri, remote_uri):
# Future improvement: we can probably fold this functionality
# into the HTTP file transfer strategy
def _put_asset(local_uri, remote_uri):
"""Upload asset to an http PUT endpoint."""
scheme_prefix = "file://"
if local_uri.startswith(scheme_prefix):
local_path = local_uri[len(scheme_prefix) :]
Expand All @@ -637,3 +673,18 @@ def _upload_asset(local_uri, remote_uri):

r = api_client.put(endpoint, headers={"Content-Length": str(filesize)}, data=data)
r.raise_for_status()


def _transfer_asset(local_uri, remote_uri):
"""Attempt to upload asset using a generalized file transfer."""

http_prefix = "http://"
https_prefix = "https://"
if remote_uri.startswith(http_prefix) or remote_uri.startswith(https_prefix):
_put_asset(local_uri, remote_uri)

else:
local_obj = File(local_uri)
remote_obj = File(remote_uri)
_, transfer_callable = FileTransfer(local_obj, remote_obj).cp()
transfer_callable()
2 changes: 2 additions & 0 deletions covalent/_file_transfer/strategies/shutil_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil

from .. import File
Expand Down Expand Up @@ -46,6 +47,7 @@ def cp(self, from_file: File, to_file: File = File()) -> None:
"""

def callable():
os.makedirs(os.path.dirname(to_file.filepath), exist_ok=True)
shutil.copyfile(from_file.filepath, to_file.filepath)

return callable
Expand Down
34 changes: 19 additions & 15 deletions covalent/_workflow/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,25 +865,29 @@ def _build_sublattice_graph(sub: Lattice, json_parent_metadata: str, *args, **kw
DISABLE_LEGACY_SUBLATTICES = os.environ.get("COVALENT_DISABLE_LEGACY_SUBLATTICES") == "1"

try:
# Attempt multistage sublattice dispatch. For now we require
# the executor to reach the Covalent server
parent_dispatch_id = os.environ["COVALENT_DISPATCH_ID"]
dispatcher_url = os.environ["COVALENT_DISPATCHER_URL"]
# Attempt multistage sublattice dispatch.

with tempfile.TemporaryDirectory(prefix="covalent-") as staging_path:
# Try depositing the assets in a location readable by Covalent and
# request Covalent to pull those assets.
staging_uri_prefix = os.environ.get("COVALENT_STAGING_URI_PREFIX", None)
manifest = LocalDispatcher.prepare_manifest(sub, staging_path)
recv_manifest = manifest

# If the executor can reach the Covalent server directly,
# submit the sublattice dispatch to Covalent but don't start it.
if not staging_uri_prefix:
parent_dispatch_id = os.environ["COVALENT_DISPATCH_ID"]
dispatcher_url = os.environ["COVALENT_DISPATCHER_URL"]
recv_manifest = LocalDispatcher.register_manifest(
manifest,
dispatcher_addr=dispatcher_url,
parent_dispatch_id=parent_dispatch_id,
push_assets=True,
)
LocalDispatcher.upload_assets(recv_manifest)

# Omit these two steps to return the manifest to Covalent and
# request the assets be pulled
recv_manifest = LocalDispatcher.register_manifest(
manifest,
dispatcher_addr=dispatcher_url,
parent_dispatch_id=parent_dispatch_id,
push_assets=True,
)
LocalDispatcher.upload_assets(recv_manifest)

return recv_manifest.model_dump_json()
return recv_manifest.model_dump_json()

except Exception as ex:
# Fall back to legacy sublattice handling
Expand Down
2 changes: 1 addition & 1 deletion covalent/executor/utils/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ def run_task_group_alt(
task_ids = task_group_metadata["node_ids"]
gid = task_group_metadata["task_group_id"]

os.environ["COVALENT_STAGING_URI_PREFIX"] = f"file://{results_dir}/staging"
os.environ["COVALENT_DISPATCH_ID"] = dispatch_id
os.environ["COVALENT_DISPATCHER_URL"] = server_url

for i, task in enumerate(task_specs):
result_uri, stdout_uri, stderr_uri, qelectron_db_uri = output_uris[i]
Expand Down
6 changes: 4 additions & 2 deletions covalent_dispatcher/_core/data_modules/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,18 @@ def _get_all_assets(dispatch_id: str):
def _pull_assets(manifest: ResultSchema) -> None:
dispatch_id = manifest.metadata.dispatch_id
assets = _get_all_assets(dispatch_id)
futs = []
download_count = 0
for asset in assets["lattice"]:
if asset.remote_uri:
download_count += 1
asset.download(asset.remote_uri)

for asset in assets["nodes"]:
if asset.remote_uri:
download_count += 1
asset.download(asset.remote_uri)

app_log.debug(f"imported {len(futs)} assets for dispatch {dispatch_id}")
app_log.debug(f"imported {download_count} assets for dispatch {dispatch_id}")


async def import_manifest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class TestShutilStrategy:
MOCK_TO_FILEPATH = "/home/user/data.csv.bak"

def test_cp(self, mocker):
mocker.patch("os.makedirs")
mock_copyfile = mocker.patch("shutil.copyfile")
from_file = File(TestShutilStrategy.MOCK_FROM_FILEPATH)
to_file = File(TestShutilStrategy.MOCK_TO_FILEPATH)
Expand Down
48 changes: 48 additions & 0 deletions tests/covalent_tests/workflow/electron_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Unit tests for electron"""

import json
import tempfile
from unittest.mock import ANY, MagicMock

import flake8
Expand Down Expand Up @@ -142,6 +143,53 @@ def mock_register(manifest, *args, **kwargs):
assert lat.metadata.workflow_executor_data == parent_metadata["workflow_executor_data"]


def test_build_sublattice_graph_staging_uri(mocker):
"""Test that building a sublattice graph with staging uri."""

dispatch_id = "test_build_sublattice_graph_staging_uri"

@ct.electron
def task(x):
return x

@ct.lattice
def workflow(x):
return task(x)

parent_metadata = {
"executor": "parent_executor",
"executor_data": {},
"workflow_executor": "my_postprocessor",
"workflow_executor_data": {},
"hooks": {
"deps": {"bash": None, "pip": None},
"call_before": [],
"call_after": [],
},
"triggers": "mock-trigger",
"qelectron_data_exists": False,
"results_dir": None,
}

with tempfile.TemporaryDirectory() as tmp_dir:
mock_environ = {
"COVALENT_DISPATCH_ID": dispatch_id,
"COVALENT_STAGING_URI_PREFIX": f"file://{tmp_dir}",
}
mocker.patch("os.environ", mock_environ)
json_manifest = _build_sublattice_graph(workflow, json.dumps(parent_metadata), 1)

# Check that asset uris start with the staging prefix
manifest = ResultSchema.model_validate_json(json_manifest)
for key, asset in manifest.assets:
if asset.size > 0:
assert asset.uri.startswith(mock_environ["COVALENT_STAGING_URI_PREFIX"])

for key, asset in manifest.lattice.assets:
if asset.size > 0:
assert asset.uri.startswith(mock_environ["COVALENT_STAGING_URI_PREFIX"])


def test_build_sublattice_graph_fallback(mocker):
"""
Test falling back to monolithic sublattice dispatch
Expand Down

0 comments on commit 40cfdb8

Please sign in to comment.