Skip to content

Commit

Permalink
[air] Fix behavior of multi-node checkpointing without an external `s…
Browse files Browse the repository at this point in the history
…torage_path` to hard-fail (#37543) (#37567)

Signed-off-by: Justin Yu <[email protected]>
  • Loading branch information
justinvyu authored Jul 19, 2023
1 parent d2299ee commit 0db82e3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
11 changes: 7 additions & 4 deletions python/ray/tune/execution/trial_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from ray.tune.schedulers import FIFOScheduler, TrialScheduler
from ray.tune.stopper import NoopStopper, Stopper
from ray.tune.search import BasicVariantGenerator, SearchAlgorithm
from ray.tune.syncer import SyncConfig
from ray.tune.syncer import _HeadNodeSyncDeprecationWarning, SyncConfig
from ray.tune.experiment import Trial
from ray.tune.utils import warn_if_slow, flatten_dict
from ray.tune.utils.log import Verbosity, has_verbosity
Expand Down Expand Up @@ -953,12 +953,15 @@ def _process_trial_save(
self._checkpoint_manager.on_trial_checkpoint(trial)
if trial.checkpoint.storage_mode != CheckpointStorage.MEMORY:
self._mark_trial_to_checkpoint(trial)
except Exception:
except Exception as e:
if (
isinstance(e, _HeadNodeSyncDeprecationWarning)
or self._fail_fast == TrialRunner.RAISE
):
raise e
logger.exception(
"Trial %s: Error handling checkpoint %s", trial, checkpoint_value
)
if self._fail_fast == TrialRunner.RAISE:
raise

trial.saving_to = None
decision = self._cached_trial_decisions.pop(trial.trial_id, None)
Expand Down
13 changes: 11 additions & 2 deletions python/ray/tune/syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@
f"./{LAZY_CHECKPOINT_MARKER_FILE}",
]


class _HeadNodeSyncDeprecationWarning(DeprecationWarning):
"""Error raised when trying to rely on deprecated head node syncing when
checkpointing across multiple nodes."""

pass


_SYNC_TO_HEAD_DEPRECATION_MESSAGE = (
"Ray AIR no longer supports the synchronization of checkpoints and other "
"artifacts from worker nodes to the head node. This means that the "
Expand All @@ -92,7 +100,8 @@
"`RunConfig(storage_path='/mnt/path/to/nfs_storage')`\n"
"See this Github issue for more details on transitioning to cloud storage/NFS "
"as well as an explanation on why this functionality is "
"being removed: https://github.com/ray-project/ray/issues/37177\n\n"
"being removed: https://github.com/ray-project/ray/issues/37177\n"
"If you are already using NFS, you can ignore this warning message.\n\n"
"Other temporary workarounds:\n"
"- If you want to avoid errors/warnings and continue running with "
"syncing explicitly turned off, set `RunConfig(SyncConfig(syncer=None))`\n"
Expand Down Expand Up @@ -926,7 +935,7 @@ def on_checkpoint(
# that means that it lives on some other node and would be synced to head
# prior to Ray 2.6.
if not os.path.exists(checkpoint.dir_or_data):
raise DeprecationWarning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE)
raise _HeadNodeSyncDeprecationWarning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE)
# else:
# No need to raise an error about syncing, since the driver can find
# the checkpoint, because either:
Expand Down
19 changes: 9 additions & 10 deletions release/tune_tests/cloud_tests/workloads/run_cloud_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import ray.cloudpickle as pickle
from ray import air, tune
from ray.air import Checkpoint, session
from ray.tune import TuneError
from ray.tune.execution.trial_runner import _find_newest_experiment_checkpoint
from ray.tune.utils.serialization import TuneFunctionDecoder

Expand Down Expand Up @@ -1035,25 +1036,25 @@ def test_head_node_syncing_disabled_error():

# Raise an error for checkpointing + no storage path
def train_fn(config):
time.sleep(1)
session.report({"score": 1}, checkpoint=Checkpoint.from_dict({"dummy": 1}))

tuner = tune.Tuner(
tune.with_resources(train_fn, {"CPU": 2.0}),
run_config=air.RunConfig(
storage_path=None, failure_config=air.FailureConfig(fail_fast="raise")
),
tune_config=tune.TuneConfig(num_samples=4),
run_config=air.RunConfig(storage_path=None),
tune_config=tune.TuneConfig(num_samples=6),
)
with pytest.raises(DeprecationWarning):
with pytest.raises(TuneError) as error_context:
tuner.fit()
# The original `_HeadNodeSyncDeprecationWarning` gets wrapped in 2 TuneError's
assert "_HeadNodeSyncDeprecationWarning" in str(error_context.value.__cause__)
print("Success: checkpointing without a storage path raises an error")

# Workaround: continue running, with syncing explicitly disabled
tuner = tune.Tuner(
tune.with_resources(train_fn, {"CPU": 2.0}),
run_config=air.RunConfig(
storage_path=None,
failure_config=air.FailureConfig(fail_fast="raise"),
sync_config=tune.SyncConfig(syncer=None),
),
tune_config=tune.TuneConfig(num_samples=4),
Expand All @@ -1067,13 +1068,11 @@ def train_fn_no_checkpoint(config):

tuner = tune.Tuner(
tune.with_resources(train_fn_no_checkpoint, {"CPU": 2.0}),
run_config=air.RunConfig(
storage_path=None, failure_config=air.FailureConfig(fail_fast="raise")
),
run_config=air.RunConfig(storage_path=None),
tune_config=tune.TuneConfig(num_samples=4),
)
tuner.fit()
print("Success: a multi-node experiment without checkpoint still runs")
print("Success: a multi-node experiment without checkpointing still runs")


# TODO(ml-team): [Deprecation - head node syncing]
Expand Down

0 comments on commit 0db82e3

Please sign in to comment.