Use from_cloud for storage type and only allow four types of storage
euclidgame committed Nov 15, 2024
1 parent 5ef413d commit dc71860
Showing 6 changed files with 38 additions and 29 deletions.
8 changes: 6 additions & 2 deletions sky/dag.py
@@ -7,6 +7,7 @@

import networkx as nx

from sky.data.storage import StoreType
from sky.utils import common_utils
from sky.utils import ux_utils

@@ -38,12 +39,15 @@ class TaskEdge:
source: The upstream task.
target: The downstream task.
data: Optional data transfer information between tasks.
If None, only represents task dependency.
If None, only represents task dependency.
best_storage: Optional best storage option for data transfer.
The tuple is (storage_type, storage_region). This simply
records the information without creating a new store.
"""
source: 'task.Task'
target: 'task.Task'
data: Optional[TaskData] = None
best_storage: Optional[Tuple[str, Optional[str]]] = None
best_storage: Optional[Tuple[StoreType, Optional[str]]] = None

def with_data(self, source_path: str, target_path: str,
size_gb: float) -> 'TaskEdge':
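(Aside, not part of the commit.) A minimal usage sketch of the now-typed best_storage field, assuming TaskEdge can be constructed directly and that StoreType exposes an S3 member; the tasks and paths below are made up for illustration:

# Illustrative only: StoreType.S3 and the direct TaskEdge construction are
# assumptions, not taken from this diff.
import sky
from sky.dag import TaskEdge
from sky.data.storage import StoreType

train = sky.Task(name='train', run='python train.py')
evaluate = sky.Task(name='eval', run='python eval.py')

edge = TaskEdge(source=train, target=evaluate)
edge = edge.with_data(source_path='~/outputs', target_path='~/inputs', size_gb=2.0)
# Record the chosen store without creating it: (storage_type, storage_region).
edge.best_storage = (StoreType.S3, 'us-east-1')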
4 changes: 2 additions & 2 deletions sky/jobs/__init__.py
@@ -18,7 +18,7 @@
from sky.jobs.utils import JOB_CONTROLLER_NAME
from sky.jobs.utils import load_managed_job_queue
from sky.jobs.utils import ManagedJobCodeGen
from sky.jobs.utils import sync_storage_mounts_for_data_transfer
from sky.jobs.utils import set_storage_mounts_for_data_transfer

pathlib.Path(JOBS_TASK_YAML_PREFIX).expanduser().parent.mkdir(parents=True,
exist_ok=True)
@@ -43,5 +43,5 @@
'format_job_table',
'dump_managed_job_queue',
'load_managed_job_queue',
'sync_storage_mounts_for_data_transfer'
'set_storage_mounts_for_data_transfer'
]
2 changes: 1 addition & 1 deletion sky/jobs/controller.py
@@ -442,7 +442,7 @@ def run(self) -> None:
# display and its results are not propagated here. Also cluster states
# may change.
sky.optimize(self._dag)
managed_job_utils.sync_storage_mounts_for_data_transfer(self._dag)
managed_job_utils.set_storage_mounts_for_data_transfer(self._dag)

all_tasks_completed = lambda: self._num_tasks == len(self._task_status)
# TODO(andy): Serve has a logic to prevent from too many services
7 changes: 5 additions & 2 deletions sky/jobs/utils.py
@@ -692,7 +692,7 @@ def load_managed_job_queue(payload: str) -> List[Dict[str, Any]]:
return jobs


def sync_storage_mounts_for_data_transfer(dag: 'dag_lib.Dag') -> 'dag_lib.Dag':
def set_storage_mounts_for_data_transfer(dag: 'dag_lib.Dag') -> 'dag_lib.Dag':
"""Syncs up the file mounts for data transfer.
This function is used to sync up the file mounts with the storage
@@ -707,7 +707,10 @@ def sync_storage_mounts_for_data_transfer(dag: 'dag_lib.Dag') -> 'dag_lib.Dag':
if best_storage is not None:
assert data is not None
bucket_name_tmp = (
f'bucket-for-{src.name}-to-{tgt.name}{uuid.uuid4()}')
f'bucket-for-{src.name}-to-{tgt.name}-{uuid.uuid4()}')
# Generate a valid and unique bucket name. For S3 buckets, the max
# length is 63 characters. Since we don't want to distinguish the
# storage type here, we use the same length for all storage types.
bucket_name = common_utils.make_cluster_name_on_cloud(
bucket_name_tmp, max_length=63)
storage_type, region = best_storage
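(Aside, not in the diff.) The new comment above boils down to: make the name unique with a UUID, then sanitize it and cap it at 63 characters, the S3 limit, which is applied uniformly to every store type. A rough standalone sketch of that idea, with make_valid_bucket_name as a hypothetical stand-in for common_utils.make_cluster_name_on_cloud:

import re
import uuid

def make_valid_bucket_name(name: str, max_length: int = 63) -> str:
    # Hypothetical helper: lowercase, replace invalid characters, cap length.
    name = re.sub(r'[^a-z0-9-]', '-', name.lower())
    return name[:max_length].strip('-')

bucket = make_valid_bucket_name(f'bucket-for-train-to-eval-{uuid.uuid4()}')
assert len(bucket) <= 63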
45 changes: 24 additions & 21 deletions sky/optimizer.py
@@ -19,6 +19,7 @@
from sky.adaptors import common as adaptors_common
from sky.dag import TaskData
from sky.dag import TaskEdge
from sky.data import StoreType
from sky.utils import env_options
from sky.utils import log_utils
from sky.utils import resources_utils
@@ -144,12 +145,12 @@ def optimize(dag: 'dag_lib.Dag',
# Make sure to remove the dummy source/sink nodes, even if the
# optimization fails.
Optimizer._remove_dummy_source_sink_nodes(dag)
Optimizer._convert_storage_nodes_to_edge_attributes(
Optimizer._remove_storage_nodes_and_move_to_edge_attributes(
dag, storage_nodes_info)
return dag

@staticmethod
def _convert_storage_nodes_to_edge_attributes(
def _remove_storage_nodes_and_move_to_edge_attributes(
dag: 'dag_lib.Dag',
edges_to_add: List[Tuple[task_lib.Task, task_lib.Task, task_lib.Task,
TaskData]]):
@@ -164,24 +165,14 @@ def _convert_storage_nodes_to_edge_attributes(
task_edge.with_data(source_path=data.source_path,
target_path=data.target_path,
size_gb=data.size_gb)
instance_to_storage = {
'AWS': 'S3',
'GCP': 'GCS',
'Azure': 'Azure',
'IBM': 'IBM'
}

# TODO(wenjie): support r2 storage.
if storage_node.best_resources is not None:
assert storage_node.best_resources.cloud is not None
cloud_name = str(storage_node.best_resources.cloud)
if cloud_name in instance_to_storage:
task_edge.best_storage = (
instance_to_storage[cloud_name],
storage_node.best_resources.region)
else:
task_edge.best_storage = None
else:
task_edge.best_storage = None
storage_type = StoreType.from_cloud(
storage_node.best_resources.cloud)
task_edge.best_storage = (
storage_type, storage_node.best_resources.region)

@staticmethod
def _add_storage_nodes_for_data_transfer(
@@ -197,12 +188,24 @@ def _add_storage_nodes_for_data_transfer(
# storage resources. Also, we should add the logic of selecting
# available bucket resources before optimizing the DAG.
for src, dst, edge_data in graph.edges(data=True):
if isinstance(src, task_lib.Task) and isinstance(
dst, task_lib.Task) and edge_data['edge'].data:
if edge_data['edge'].data is not None:
storage_node = task_lib.Task(
f'{src.name}_to_{dst.name}_storage')
data = edge_data['edge'].data
storage_node.set_resources({resources_lib.Resources()})
storage_node.set_resources({
resources_lib.Resources(clouds.AWS()),
resources_lib.Resources(clouds.GCP()),
resources_lib.Resources(clouds.Azure()),
resources_lib.Resources(clouds.IBM())
})

# The time estimator is set to 0 because we use instances to
# represent storage here, and the price is different. Therefore,
# we don't want to consider the time of storage in the
# optimization. Once we use storage resources to represent
# storage, we should set the time estimator to the actual
# storage time, which is likely the sum of the upstream and
# downstream times.
storage_node.set_time_estimator(lambda _: 0)
edges_to_add.append((src, storage_node, dst, data))
with dag:
@@ -290,7 +293,7 @@ def _get_egress_info(
src_cloud = parent_resources.cloud
assert isinstance(edge_data['edge'], TaskEdge)
task_edge = edge_data['edge']
n_gigabytes = task_edge.data.size_gb if task_edge.data else 0
n_gigabytes = getattr(task_edge.data, 'size_gb', 0)
dst_cloud = resources.cloud
return src_cloud, dst_cloud, n_gigabytes
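(Aside, not in the diff.) With the hard-coded instance_to_storage dict removed, the cloud-to-store mapping is delegated to StoreType.from_cloud, which for the four clouds the storage node is now restricted to is expected to resolve roughly as below; the enum member names are assumptions, and R2 remains a TODO per the comment in the hunk above:

from sky.data.storage import StoreType

# Assumed StoreType members for the four allowed clouds; the real enum may
# spell these differently.
EXPECTED_STORE_FOR_CLOUD = {
    'AWS': StoreType.S3,
    'GCP': StoreType.GCS,
    'Azure': StoreType.AZURE,
    'IBM': StoreType.IBM,
}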

1 change: 0 additions & 1 deletion sky/utils/controller_utils.py
@@ -32,7 +32,6 @@
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import dag as dag_lib
from sky import task as task_lib
from sky.backends import cloud_vm_ray_backend

