diff --git a/dlrover/python/common/constants.py b/dlrover/python/common/constants.py index e1c81e5ba..edbdd3c6f 100644 --- a/dlrover/python/common/constants.py +++ b/dlrover/python/common/constants.py @@ -352,3 +352,6 @@ class ErrorMonitorConstants(object): ACTION_RDZV_TIMEOUT = "rendezvous_timeout" ACTION_TRAINING_START = "training_start" ACTION_RESTART_TRAINING = "restart_training" + ACTION_START_SAVE_SHARD = "start_save_shard" + ACTION_COMPLETE_SAVE_SHARD = "complete_save_shard" + ACTION_SAVE_SHARD_ERROR = "save_shard_error" diff --git a/dlrover/python/common/grpc.py b/dlrover/python/common/grpc.py index 17a7d0540..861af0d92 100644 --- a/dlrover/python/common/grpc.py +++ b/dlrover/python/common/grpc.py @@ -506,3 +506,12 @@ class ElasticRunConfigRequest(Message): @dataclass class ElasticRunConfig(Message): configs: Dict[str, str] = field(default_factory=dict) + + +@dataclass +class InfoEvent(Message): + event_type: str = "" + instance: str = "" + action: str = "" + msg: str = "" + labels: Dict[str, str] = field(default_factory=dict) diff --git a/dlrover/python/elastic_agent/master_client.py b/dlrover/python/elastic_agent/master_client.py index c59c5593f..867358611 100644 --- a/dlrover/python/elastic_agent/master_client.py +++ b/dlrover/python/elastic_agent/master_client.py @@ -443,6 +443,23 @@ def get_elastic_run_config(self) -> Dict[str, str]: response: grpc.ElasticRunConfig = self._get(request) return response.configs + def report_info_event( + self, + event_type: str, + instance: str, + action: str, + msg: str, + labels: Dict[str, str], + ): + message = grpc.InfoEvent( + event_type=event_type, + instance=instance, + action=action, + msg=msg, + labels=labels, + ) + self._report(message) + @classmethod def singleton_instance(cls, *args, **kwargs): if not cls._instance: diff --git a/dlrover/python/elastic_agent/torch/ckpt_saver.py b/dlrover/python/elastic_agent/torch/ckpt_saver.py index 3c5211982..79c1e85d5 100644 --- a/dlrover/python/elastic_agent/torch/ckpt_saver.py +++ b/dlrover/python/elastic_agent/torch/ckpt_saver.py @@ -32,6 +32,7 @@ from dlrover.python.common import env_utils from dlrover.python.common.constants import ( CheckpointConstant, + ErrorMonitorConstants, NodeEnv, TrainingExceptionLevel, ) @@ -567,6 +568,34 @@ def _sync_shm_to_storage(self): ) self._report_failure_to_master(str(e)) + def _report_event_to_master( + self, + event_type: str = "", + instance: str = "", + action: str = "", + msg: str = "", + labels: Optional[Dict[str, str]] = None, + ): + master_client = self.get_master_client() + if not master_client: + logger.warning( + "Skip ckpt saver event reporting for master " + "client hasn't setup." + ) + return + if labels is None: + labels = {} + try: + master_client.report_info_event( + event_type, + instance, + action, + msg, + labels, + ) + except Exception as e: + logger.warning(f"Failed to report event: {e}.") + def _report_failure_to_master(self, error_msg): master_client = self.get_master_client() if not master_client: @@ -610,6 +639,7 @@ def _save_shard( step_done_dir: str, ): """Save the shard of state dict into the storage.""" + try: shm_handler = self._shm_handlers[local_shard_id] shm_lock = self._shm_locks[local_shard_id] @@ -632,7 +662,19 @@ def _save_shard( f"of rank {ckpt_config.rank} from the " f"shared memory into the storage {ckpt_config}." ) + self._report_event_to_master( + event_type=ErrorMonitorConstants.TYPE_INFO, + instance=str(ckpt_config.rank), + action=ErrorMonitorConstants.ACTION_START_SAVE_SHARD, + msg=f"local_id={local_shard_id}, step={step}", + ) self.persist_to_storage(local_shard_id, ckpt_config) + self._report_event_to_master( + event_type=ErrorMonitorConstants.TYPE_INFO, + instance=str(ckpt_config.rank), + action=ErrorMonitorConstants.ACTION_COMPLETE_SAVE_SHARD, + msg=f"local_id={local_shard_id}, step={step}", + ) shm_lock.release() step_done_file = os.path.join(step_done_dir, str(ckpt_config.rank)) self.storage.write("done", step_done_file) @@ -647,6 +689,12 @@ def _save_shard( f"of rank {ckpt_config.rank}, error: {e}", exc_info=True, ) + self._report_event_to_master( + event_type=ErrorMonitorConstants.TYPE_ERROR, + instance=str(ckpt_config.rank), + action=ErrorMonitorConstants.ACTION_SAVE_SHARD_ERROR, + msg=f"local_id={local_shard_id}, step={step}, error={e}", + ) return False finally: shm_lock.release() diff --git a/dlrover/python/master/dist_master.py b/dlrover/python/master/dist_master.py index c68942e2c..118c1fa73 100644 --- a/dlrover/python/master/dist_master.py +++ b/dlrover/python/master/dist_master.py @@ -144,6 +144,7 @@ def __init__( ), } self.diagnosis_manager = DiagnosisManager() + self._error_monitor = error_monitor self.job_metric_collector = self._create_metric_collector_if_needed( args ) @@ -154,7 +155,6 @@ def __init__( self._stop_requested = False self._exit_code = 0 self._exit_reason = None - self._error_monitor = error_monitor def _create_master_grpc_service(self, port, params: JobArgs): return create_master_service( @@ -167,6 +167,7 @@ def _create_master_grpc_service(self, port, params: JobArgs): self.job_metric_collector, self.elastic_ps_service, self.sync_service, + self._error_monitor, ) def _create_metric_collector_if_needed(self, params: JobArgs): diff --git a/dlrover/python/master/servicer.py b/dlrover/python/master/servicer.py index 3b106a0f6..9b1611e30 100644 --- a/dlrover/python/master/servicer.py +++ b/dlrover/python/master/servicer.py @@ -78,6 +78,7 @@ def __init__( job_metric_collector=None, elastic_ps_service=None, sync_service=None, + error_monitor=None, ): self._task_manager: TaskManager = task_manager self._job_manager: JobManager = job_manager @@ -92,6 +93,7 @@ def __init__( self._version = 0 self._start_training_time = 0 self._start_autoscale = False + self._error_monitor = error_monitor # preload module for class reflection self._diagnosis_data_module = importlib.import_module( @@ -359,6 +361,8 @@ def report(self, request, _): success = self._sync_checkpoint(node_type, node_id, message) elif isinstance(message, grpc.DiagnosisReportData): success = self._report_worker_diagnosis_data(message) + elif isinstance(message, grpc.InfoEvent): + success = self._report_info_event(message) response.success = success return response @@ -649,6 +653,17 @@ def _sync_training_ports( port=sync_ports.training_port, newport=sync_ports.next_check_port ) + def _report_info_event(self, message: grpc.InfoEvent): + if self._error_monitor: + self._error_monitor.report_event( + message.event_type, + message.instance, + message.action, + message.msg, + message.labels, + ) + return True + def create_master_service( port, @@ -660,6 +675,7 @@ def create_master_service( job_metric_collector, elastic_ps_service, sync_service, + error_monitor=None, ) -> MasterServicer: """Create GRPC server""" logger.info("Creating master service") @@ -682,6 +698,7 @@ def create_master_service( job_metric_collector=job_metric_collector, elastic_ps_service=elastic_ps_service, sync_service=sync_service, + error_monitor=error_monitor, ) elastic_training_pb2_grpc.add_MasterServicer_to_server( diff --git a/dlrover/trainer/tests/torch/checkpoint_egine_test.py b/dlrover/trainer/tests/torch/checkpoint_egine_test.py index cbbb4ee7d..10a0d6b0f 100644 --- a/dlrover/trainer/tests/torch/checkpoint_egine_test.py +++ b/dlrover/trainer/tests/torch/checkpoint_egine_test.py @@ -29,6 +29,10 @@ from dlrover.python.common.grpc import find_free_port from dlrover.python.common.multi_process import clear_sock_dir from dlrover.python.common.storage import PosixDiskStorage +from dlrover.python.elastic_agent.master_client import ( + MasterClient, + build_master_client, +) from dlrover.python.elastic_agent.torch.ckpt_saver import ( AsyncCheckpointSaver, CheckpointConfig, @@ -36,6 +40,7 @@ MegatronCheckpointSaver, TempDirCheckpointSaver, ) +from dlrover.python.tests.test_utils import start_local_master from dlrover.trainer.torch.flash_checkpoint.deepspeed_engine import ( DeepSpeedCheckpointEngine, ) @@ -122,6 +127,9 @@ def load(self, resume_path=""): class ShardingCheckpointEngineTest(unittest.TestCase): def setUp(self): + self._master, self.addr = start_local_master() + MasterClient._instance = build_master_client(self.addr, 1) + os.environ[NodeEnv.NODE_NUM] = "1" os.environ[NodeEnv.NODE_RANK] = "0" AsyncCheckpointSaver._saver_instance = None diff --git a/dlrover/trainer/tests/torch/deepspeed_ckpt_test.py b/dlrover/trainer/tests/torch/deepspeed_ckpt_test.py index a7f990069..6feabb000 100644 --- a/dlrover/trainer/tests/torch/deepspeed_ckpt_test.py +++ b/dlrover/trainer/tests/torch/deepspeed_ckpt_test.py @@ -23,9 +23,14 @@ from dlrover.python.common.constants import CheckpointConstant from dlrover.python.common.multi_process import clear_sock_dir +from dlrover.python.elastic_agent.master_client import ( + MasterClient, + build_master_client, +) from dlrover.python.elastic_agent.torch.ckpt_saver import ( DeepSpeedCheckpointSaver, ) +from dlrover.python.tests.test_utils import start_local_master from dlrover.trainer.torch.flash_checkpoint.deepspeed import ( DeepSpeedCheckpointer, StorageType, @@ -90,6 +95,9 @@ def forward(self, x): class DeepSpeedCheckpointTest(unittest.TestCase): def setUp(self): + self._master, self.addr = start_local_master() + MasterClient._instance = build_master_client(self.addr, 1) + DeepSpeedCheckpointSaver._saver_instance = None DeepSpeedCheckpointSaver.start_async_saving_ckpt() diff --git a/dlrover/trainer/tests/torch/fsdp_ckpt_test.py b/dlrover/trainer/tests/torch/fsdp_ckpt_test.py index 33aa900b1..22a77ae03 100644 --- a/dlrover/trainer/tests/torch/fsdp_ckpt_test.py +++ b/dlrover/trainer/tests/torch/fsdp_ckpt_test.py @@ -50,10 +50,15 @@ from dlrover.python.common.constants import CheckpointConstant from dlrover.python.common.multi_process import SharedMemory, clear_sock_dir from dlrover.python.common.storage import PosixDiskStorage +from dlrover.python.elastic_agent.master_client import ( + MasterClient, + build_master_client, +) from dlrover.python.elastic_agent.torch.ckpt_saver import ( AsyncCheckpointSaver, SharedMemoryHandler, ) +from dlrover.python.tests.test_utils import start_local_master from dlrover.trainer.torch.flash_checkpoint.fsdp import FsdpShardCheckpointer from dlrover.trainer.torch.flash_checkpoint.fsdp_engine import ( FileReader, @@ -157,6 +162,9 @@ def _write_state_dict_to_shm(shared_memory, files, state_dict): class FsdpCheckpointTest(unittest.TestCase): def setUp(self): + self._master, self.addr = start_local_master() + MasterClient._instance = build_master_client(self.addr, 1) + self.shm = SharedMemory(name="test_write_item", create=True, size=1024) AsyncCheckpointSaver._saver_instance = None AsyncCheckpointSaver.start_async_saving_ckpt() diff --git a/dlrover/trainer/tests/torch/megatron_ckpt_test.py b/dlrover/trainer/tests/torch/megatron_ckpt_test.py index 78b35f81b..f441dac9b 100644 --- a/dlrover/trainer/tests/torch/megatron_ckpt_test.py +++ b/dlrover/trainer/tests/torch/megatron_ckpt_test.py @@ -24,9 +24,14 @@ from dlrover.python.common.constants import CheckpointConstant from dlrover.python.common.multi_process import clear_sock_dir +from dlrover.python.elastic_agent.master_client import ( + MasterClient, + build_master_client, +) from dlrover.python.elastic_agent.torch.ckpt_saver import ( MegatronCheckpointSaver, ) +from dlrover.python.tests.test_utils import start_local_master from dlrover.trainer.torch.flash_checkpoint import megatron from dlrover.trainer.torch.flash_checkpoint.checkpointer import StorageType from dlrover.trainer.torch.flash_checkpoint.megatron import ( @@ -61,6 +66,9 @@ def forward(self, x): class MegatrionCheckpointTest(unittest.TestCase): def setUp(self): + self._master, self.addr = start_local_master() + MasterClient._instance = build_master_client(self.addr, 1) + MegatronCheckpointSaver._saver_instance = None MegatronCheckpointSaver.start_async_saving_ckpt() diff --git a/test.json b/test.json new file mode 100644 index 000000000..4c81f12ff --- /dev/null +++ b/test.json @@ -0,0 +1 @@ +{"distributionStrategy": "ParameterServerStrategy", "dashboardUrl": "localhost:5000", "rayActor": "RayWorker", "command": "python run.py", "spec": {"replicaSpecs": {"worker": {"replica": 3, "resource": {"cpu": 1, "memory": 1024}, "workingDir": "./", "env": [{"key": "key1", "value": "v1"}, {"key": "key2", "value": "v2"}], "requirements": ["pip install numpy", "pip install tensorflow"]}, "ps": {"replica": 3, "resource": {"cpu": 1, "memory": 1024}, "workingDir": "./", "env": [{"key": "key1", "value": "v1"}, {"key": "key2", "value": "v2"}], "requirements": ["pip install numpy", "pip install tensorflow"]}}}} \ No newline at end of file