Expose ckpt events #1321

Open · wants to merge 4 commits into master
3 changes: 3 additions & 0 deletions dlrover/python/common/constants.py
@@ -352,3 +352,6 @@ class ErrorMonitorConstants(object):
ACTION_RDZV_TIMEOUT = "rendezvous_timeout"
ACTION_TRAINING_START = "training_start"
ACTION_RESTART_TRAINING = "restart_training"
ACTION_START_SAVE_SHARD = "start_save_shard"
ACTION_COMPLETE_SAVE_SHARD = "complete_save_shard"
ACTION_SAVE_SHARD_ERROR = "save_shard_error"
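
For orientation, these three actions trace the shard-save lifecycle that the checkpoint saver reports later in this diff (start, complete, error). A minimal sketch of referencing them, assuming only the class shown above:

    from dlrover.python.common.constants import ErrorMonitorConstants

    # Lifecycle actions introduced by this PR, emitted around each shard save.
    print(ErrorMonitorConstants.ACTION_START_SAVE_SHARD)     # start_save_shard
    print(ErrorMonitorConstants.ACTION_COMPLETE_SAVE_SHARD)  # complete_save_shard
    print(ErrorMonitorConstants.ACTION_SAVE_SHARD_ERROR)     # save_shard_error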
9 changes: 9 additions & 0 deletions dlrover/python/common/grpc.py
@@ -506,3 +506,12 @@ class ElasticRunConfigRequest(Message):
@dataclass
class ElasticRunConfig(Message):
configs: Dict[str, str] = field(default_factory=dict)


@dataclass
class InfoEvent(Message):
event_type: str = ""
instance: str = ""
action: str = ""
msg: str = ""
labels: Dict[str, str] = field(default_factory=dict)
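
Since InfoEvent is a plain Message dataclass, it rides the existing report plumbing without any new RPC definitions. A minimal sketch of constructing one; the field values here are illustrative, not taken from this PR:

    from dlrover.python.common import grpc

    event = grpc.InfoEvent(
        event_type="info",         # illustrative value
        instance="0",              # e.g. a rank, as the saver uses below
        action="start_save_shard",
        msg="local_id=0, step=100",
        labels={},                 # optional key/value metadata
    )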
17 changes: 17 additions & 0 deletions dlrover/python/elastic_agent/master_client.py
@@ -443,6 +443,23 @@ def get_elastic_run_config(self) -> Dict[str, str]:
response: grpc.ElasticRunConfig = self._get(request)
return response.configs

def report_info_event(
self,
event_type: str,
instance: str,
action: str,
msg: str,
labels: Dict[str, str],
):
message = grpc.InfoEvent(
event_type=event_type,
instance=instance,
action=action,
msg=msg,
labels=labels,
)
self._report(message)

@classmethod
def singleton_instance(cls, *args, **kwargs):
if not cls._instance:
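
Agent-side code reaches this method through the client singleton. A sketch of the calling pattern, assuming a client has already been built for the job (argument values are illustrative):

    from dlrover.python.elastic_agent.master_client import MasterClient

    client = MasterClient.singleton_instance()
    if client:
        client.report_info_event(
            event_type="info",
            instance="0",
            action="start_save_shard",
            msg="local_id=0, step=100",
            labels={},
        )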
48 changes: 48 additions & 0 deletions dlrover/python/elastic_agent/torch/ckpt_saver.py
@@ -32,6 +32,7 @@
from dlrover.python.common import env_utils
from dlrover.python.common.constants import (
CheckpointConstant,
ErrorMonitorConstants,
NodeEnv,
TrainingExceptionLevel,
)
@@ -567,6 +568,34 @@ def _sync_shm_to_storage(self):
)
self._report_failure_to_master(str(e))

def _report_event_to_master(
self,
event_type: str = "",
instance: str = "",
action: str = "",
msg: str = "",
labels: Optional[Dict[str, str]] = None,
):
master_client = self.get_master_client()
if not master_client:
logger.warning(
"Skip ckpt saver event reporting for master "
"client hasn't setup."
)
return
if labels is None:
labels = {}
try:
master_client.report_info_event(
event_type,
instance,
action,
msg,
labels,
)
except Exception as e:
logger.warning(f"Failed to report event: {e}.")

def _report_failure_to_master(self, error_msg):
master_client = self.get_master_client()
if not master_client:
@@ -610,6 +639,7 @@ def _save_shard(
step_done_dir: str,
):
"""Save the shard of state dict into the storage."""

try:
shm_handler = self._shm_handlers[local_shard_id]
shm_lock = self._shm_locks[local_shard_id]
@@ -632,7 +662,19 @@ def _save_shard(
f"of rank {ckpt_config.rank} from the "
f"shared memory into the storage {ckpt_config}."
)
self._report_event_to_master(
event_type=ErrorMonitorConstants.TYPE_INFO,
instance=str(ckpt_config.rank),
action=ErrorMonitorConstants.ACTION_START_SAVE_SHARD,
msg=f"local_id={local_shard_id}, step={step}",
)
self.persist_to_storage(local_shard_id, ckpt_config)
self._report_event_to_master(
event_type=ErrorMonitorConstants.TYPE_INFO,
instance=str(ckpt_config.rank),
action=ErrorMonitorConstants.ACTION_COMPLETE_SAVE_SHARD,
msg=f"local_id={local_shard_id}, step={step}",
)
shm_lock.release()
step_done_file = os.path.join(step_done_dir, str(ckpt_config.rank))
self.storage.write("done", step_done_file)
@@ -647,6 +689,12 @@ def _save_shard(
f"of rank {ckpt_config.rank}, error: {e}",
exc_info=True,
)
self._report_event_to_master(
event_type=ErrorMonitorConstants.TYPE_ERROR,
instance=str(ckpt_config.rank),
action=ErrorMonitorConstants.ACTION_SAVE_SHARD_ERROR,
msg=f"local_id={local_shard_id}, step={step}, error={e}",
)
return False
finally:
shm_lock.release()
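
The net effect in _save_shard is a start/complete/error envelope around persist_to_storage. A self-contained sketch of that pattern, with a generic report() callback standing in for _report_event_to_master:

    def save_with_events(report, persist, rank, local_shard_id, step):
        # Illustrative only: mirrors the event envelope added above.
        detail = f"local_id={local_shard_id}, step={step}"
        try:
            report("info", str(rank), "start_save_shard", detail)
            persist()  # stands in for persist_to_storage(local_shard_id, ...)
            report("info", str(rank), "complete_save_shard", detail)
            return True
        except Exception as e:
            report("error", str(rank), "save_shard_error", f"{detail}, error={e}")
            return False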
3 changes: 2 additions & 1 deletion dlrover/python/master/dist_master.py
@@ -144,6 +144,7 @@ def __init__(
),
}
self.diagnosis_manager = DiagnosisManager()
self._error_monitor = error_monitor
self.job_metric_collector = self._create_metric_collector_if_needed(
args
)
@@ -154,7 +155,6 @@ def __init__(
self._stop_requested = False
self._exit_code = 0
self._exit_reason = None
self._error_monitor = error_monitor

def _create_master_grpc_service(self, port, params: JobArgs):
return create_master_service(
@@ -167,6 +167,7 @@ def _create_master_grpc_service(self, port, params: JobArgs):
self.job_metric_collector,
self.elastic_ps_service,
self.sync_service,
self._error_monitor,
)

def _create_metric_collector_if_needed(self, params: JobArgs):
17 changes: 17 additions & 0 deletions dlrover/python/master/servicer.py
@@ -78,6 +78,7 @@ def __init__(
job_metric_collector=None,
elastic_ps_service=None,
sync_service=None,
error_monitor=None,
):
self._task_manager: TaskManager = task_manager
self._job_manager: JobManager = job_manager
@@ -92,6 +93,7 @@
self._version = 0
self._start_training_time = 0
self._start_autoscale = False
self._error_monitor = error_monitor

# preload module for class reflection
self._diagnosis_data_module = importlib.import_module(
@@ -359,6 +361,8 @@ def report(self, request, _):
success = self._sync_checkpoint(node_type, node_id, message)
elif isinstance(message, grpc.DiagnosisReportData):
success = self._report_worker_diagnosis_data(message)
elif isinstance(message, grpc.InfoEvent):
success = self._report_info_event(message)

response.success = success
return response
@@ -649,6 +653,17 @@ def _sync_training_ports(
port=sync_ports.training_port, newport=sync_ports.next_check_port
)

def _report_info_event(self, message: grpc.InfoEvent):
if self._error_monitor:
self._error_monitor.report_event(
message.event_type,
message.instance,
message.action,
message.msg,
message.labels,
)
return True


def create_master_service(
port,
@@ -660,6 +675,7 @@
job_metric_collector,
elastic_ps_service,
sync_service,
error_monitor=None,
) -> MasterServicer:
"""Create GRPC server"""
logger.info("Creating master service")
@@ -682,6 +698,7 @@
job_metric_collector=job_metric_collector,
elastic_ps_service=elastic_ps_service,
sync_service=sync_service,
error_monitor=error_monitor,
)

elastic_training_pb2_grpc.add_MasterServicer_to_server(
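
On the master side, the servicer forwards each InfoEvent to whatever error monitor it was constructed with; the report_event signature is inferred from the call site above. A hypothetical stand-in monitor for local testing:

    class LoggingErrorMonitor:
        # Hypothetical; the real monitor is wired in by dist_master.py.
        def report_event(self, event_type, instance, action, msg, labels):
            print(f"[{event_type}] instance={instance} action={action} "
                  f"msg={msg} labels={labels}")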
8 changes: 8 additions & 0 deletions dlrover/trainer/tests/torch/checkpoint_egine_test.py
@@ -29,13 +29,18 @@
from dlrover.python.common.grpc import find_free_port
from dlrover.python.common.multi_process import clear_sock_dir
from dlrover.python.common.storage import PosixDiskStorage
from dlrover.python.elastic_agent.master_client import (
MasterClient,
build_master_client,
)
from dlrover.python.elastic_agent.torch.ckpt_saver import (
AsyncCheckpointSaver,
CheckpointConfig,
DeepSpeedCheckpointSaver,
MegatronCheckpointSaver,
TempDirCheckpointSaver,
)
from dlrover.python.tests.test_utils import start_local_master
from dlrover.trainer.torch.flash_checkpoint.deepspeed_engine import (
DeepSpeedCheckpointEngine,
)
@@ -122,6 +127,9 @@ def load(self, resume_path=""):

class ShardingCheckpointEngineTest(unittest.TestCase):
def setUp(self):
self._master, self.addr = start_local_master()
MasterClient._instance = build_master_client(self.addr, 1)

os.environ[NodeEnv.NODE_NUM] = "1"
os.environ[NodeEnv.NODE_RANK] = "0"
AsyncCheckpointSaver._saver_instance = None
8 changes: 8 additions & 0 deletions dlrover/trainer/tests/torch/deepspeed_ckpt_test.py
@@ -23,9 +23,14 @@

from dlrover.python.common.constants import CheckpointConstant
from dlrover.python.common.multi_process import clear_sock_dir
from dlrover.python.elastic_agent.master_client import (
MasterClient,
build_master_client,
)
from dlrover.python.elastic_agent.torch.ckpt_saver import (
DeepSpeedCheckpointSaver,
)
from dlrover.python.tests.test_utils import start_local_master
from dlrover.trainer.torch.flash_checkpoint.deepspeed import (
DeepSpeedCheckpointer,
StorageType,
@@ -90,6 +95,9 @@ def forward(self, x):

class DeepSpeedCheckpointTest(unittest.TestCase):
def setUp(self):
self._master, self.addr = start_local_master()
MasterClient._instance = build_master_client(self.addr, 1)

DeepSpeedCheckpointSaver._saver_instance = None
DeepSpeedCheckpointSaver.start_async_saving_ckpt()

8 changes: 8 additions & 0 deletions dlrover/trainer/tests/torch/fsdp_ckpt_test.py
@@ -50,10 +50,15 @@
from dlrover.python.common.constants import CheckpointConstant
from dlrover.python.common.multi_process import SharedMemory, clear_sock_dir
from dlrover.python.common.storage import PosixDiskStorage
from dlrover.python.elastic_agent.master_client import (
MasterClient,
build_master_client,
)
from dlrover.python.elastic_agent.torch.ckpt_saver import (
AsyncCheckpointSaver,
SharedMemoryHandler,
)
from dlrover.python.tests.test_utils import start_local_master
from dlrover.trainer.torch.flash_checkpoint.fsdp import FsdpShardCheckpointer
from dlrover.trainer.torch.flash_checkpoint.fsdp_engine import (
FileReader,
@@ -157,6 +162,9 @@ def _write_state_dict_to_shm(shared_memory, files, state_dict):

class FsdpCheckpointTest(unittest.TestCase):
def setUp(self):
self._master, self.addr = start_local_master()
MasterClient._instance = build_master_client(self.addr, 1)

self.shm = SharedMemory(name="test_write_item", create=True, size=1024)
AsyncCheckpointSaver._saver_instance = None
AsyncCheckpointSaver.start_async_saving_ckpt()
8 changes: 8 additions & 0 deletions dlrover/trainer/tests/torch/megatron_ckpt_test.py
@@ -24,9 +24,14 @@

from dlrover.python.common.constants import CheckpointConstant
from dlrover.python.common.multi_process import clear_sock_dir
from dlrover.python.elastic_agent.master_client import (
MasterClient,
build_master_client,
)
from dlrover.python.elastic_agent.torch.ckpt_saver import (
MegatronCheckpointSaver,
)
from dlrover.python.tests.test_utils import start_local_master
from dlrover.trainer.torch.flash_checkpoint import megatron
from dlrover.trainer.torch.flash_checkpoint.checkpointer import StorageType
from dlrover.trainer.torch.flash_checkpoint.megatron import (
@@ -61,6 +66,9 @@ def forward(self, x):

class MegatrionCheckpointTest(unittest.TestCase):
def setUp(self):
self._master, self.addr = start_local_master()
MasterClient._instance = build_master_client(self.addr, 1)

MegatronCheckpointSaver._saver_instance = None
MegatronCheckpointSaver.start_async_saving_ckpt()

1 change: 1 addition & 0 deletions test.json
@@ -0,0 +1 @@
{"distributionStrategy": "ParameterServerStrategy", "dashboardUrl": "localhost:5000", "rayActor": "RayWorker", "command": "python run.py", "spec": {"replicaSpecs": {"worker": {"replica": 3, "resource": {"cpu": 1, "memory": 1024}, "workingDir": "./", "env": [{"key": "key1", "value": "v1"}, {"key": "key2", "value": "v2"}], "requirements": ["pip install numpy", "pip install tensorflow"]}, "ps": {"replica": 3, "resource": {"cpu": 1, "memory": 1024}, "workingDir": "./", "env": [{"key": "key1", "value": "v1"}, {"key": "key2", "value": "v2"}], "requirements": ["pip install numpy", "pip install tensorflow"]}}}}