diff --git a/README.md b/README.md
index 2a7da4ee9..657ff8455 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@ Furthermore, DLRover offers extension libraries for PyTorch and TensorFlow to ex
## Latest News
-- [2025/01] [EDiT: A Local-SGD-Based Efficient Distributed Training Method for Large Language Models, ICLR'24.](https://arxiv.org/abs/2412.07210)
+- [2025/01] [EDiT: A Local-SGD-Based Efficient Distributed Training Method for Large Language Models, ICLR'25.](https://arxiv.org/abs/2412.07210)
- [2024/06] [DLRover-RM has been accepted by VLDB'24.](docs/blogs/dlrover_rm.md)
- [2024/04] [Flash Checkpoint Supports HuggingFace transformers.Trainer to Asynchronously persist checkpoints.](docs/blogs/flash_checkpoint.md#huggingface-transformerstrainer)
- [2024/02] [Flash Checkpoint Saves the Megatron-LM Checkpoint in Seconds.](docs/blogs/megatron_flash_checkpoint.md)
diff --git a/dlrover/python/common/global_context.py b/dlrover/python/common/global_context.py
index 1f29c81c0..40027255e 100644
--- a/dlrover/python/common/global_context.py
+++ b/dlrover/python/common/global_context.py
@@ -13,7 +13,11 @@
import os
-from dlrover.python.common.constants import CommunicationType, UserEnv
+from dlrover.python.common.constants import (
+ Accelerators,
+ CommunicationType,
+ UserEnv,
+)
from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.singleton import Singleton
from dlrover.python.util.common_util import (
@@ -58,6 +62,8 @@ class DefaultValues(object):
SEC_TO_WAIT_FAILED_PS = 600 # 10min
HANG_CPU_USAGE_RATE = 0.05
HANG_DETECTION = 1
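+ # hang-detection downtime thresholds in minutes; configured values below
+ # MIN_HANG_DOWNTIME are clamped up by the diagnosis manager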
+ HANG_DOWNTIME = 30
+ MIN_HANG_DOWNTIME = 3
GPU_NUM_PER_NODE = 8
NPU_NUM_PER_NODE = 16
MAX_METRIC_REC = 30
@@ -107,9 +113,12 @@ def __init__(self):
# The strategy of 'hang detection':
# 0: log only; 1: notify; 2: with fault tolerance
self.hang_detection = DefaultValues.HANG_DETECTION
+ # Downtime (in minutes) before the training job is regarded as hung
+ self.hang_downtime = DefaultValues.HANG_DOWNTIME
+ # The accelerator type of the job, default is Nvidia GPU
+ self.xpu_type = Accelerators.NVIDIA_GPU
self.gpu_per_node = DefaultValues.GPU_NUM_PER_NODE
self.npu_per_node = DefaultValues.NPU_NUM_PER_NODE
- self.max_metric_records = DefaultValues.MAX_METRIC_REC
def set_params_from_brain(self):
self.train_speed_record_num = self.get_param_value_from_brain(
diff --git a/dlrover/python/common/metric/context.py b/dlrover/python/common/metric/context.py
index 5a2eecac5..86e1509ea 100644
--- a/dlrover/python/common/metric/context.py
+++ b/dlrover/python/common/metric/context.py
@@ -10,12 +10,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import threading
from collections import OrderedDict
+from datetime import datetime
from typing import Dict
-from dlrover.python.common.global_context import Context
+from dlrover.python.common.global_context import Context, DefaultValues
+from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.metric.metric import XpuNodeMetric
from dlrover.python.common.singleton import Singleton
@@ -29,7 +30,6 @@ class JobMetricContext(Singleton):
"""
def __init__(self):
- self._lock = threading.Lock()
"""
job metrics dict is a dict with timestamp as key,
and the value is another dict with worker node id as key,
@@ -38,7 +38,119 @@ def __init__(self):
self._xpu_job_metrics: OrderedDict[
int, Dict[str, XpuNodeMetric]
] = OrderedDict()
- self.max_metric_records = _dlrover_context.max_metric_records
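+ # upper bound on the number of timestamped metric records retained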
+ self.max_metric_records = DefaultValues.MAX_METRIC_REC
+ self._lock = threading.Lock()
+
+ def backtrace_avg_metrics(self, metric, depth):
+ """
+ Backtrace the job-level average of a metric across all nodes
+
+ Args:
+ metric: name, e.g. GPU_TENSOR_UTIL
+ depth: maximum depth of backtrace
+
+ Returns:
+ OrderedDict with key as timestamp, value as avg metric
+
+ """
+ with self._lock:
+ try:
+ od = OrderedDict()
+ for tm in list(self._xpu_job_metrics.keys())[::-1]:
+ total = 0
+ for v in self._xpu_job_metrics[tm].values():
+ total += v.get_avg_metric(metric)
+ od[tm] = round(
+ total / len(self._xpu_job_metrics[tm].values()), 2
+ )
+ depth = depth - 1
+ if depth <= 0:
+ break
+ return od
+ except Exception as e:
+ logger.error(f"bt_avg_metrics {metric} error: {e}")
+ return None
+
+ def backtrace_node_metrics(self, metric, depth):
+ """
+ Backtrace the per-node average metrics, one list per timestamp
+
+ Args:
+ metric: name, e.g. GPU_TENSOR_UTIL
+ depth: maximum backtrace depth
+
+ Returns:
+ OrderedDict with key as timestamp, value as node metric list
+
+ """
+ with self._lock:
+ try:
+ od = OrderedDict()
+ for tm in list(self._xpu_job_metrics.keys())[::-1]:
+ od[tm] = []
+ for v in self._xpu_job_metrics[tm].values():
+ od[tm].append(v.get_avg_metric(metric))
+ depth = depth - 1
+ if depth <= 0:
+ break
+ return od
+ except Exception as e:
+ logger.error(f"bt_node_metrics {metric} error: {e}")
+ return None
+
+ def backtrace_rank_metrics(self, metric, depth):
+ """
+ Backtrace the per-rank metrics of each node, one nested list per timestamp
+
+ Args:
+ metric: name, e.g. GPU_TENSOR_UTIL
+ depth: maximum backtrace depth
+
+ Returns:
+ OrderedDict with key as timestamp, value as rank metric list
+
+ """
+ with self._lock:
+ try:
+ od = OrderedDict()
+ for tm in list(self._xpu_job_metrics.keys())[::-1]:
+ od[tm] = []
+ for v in self._xpu_job_metrics[tm].values():
+ od[tm].append(v.get_node_metrics(metric))
+ depth = depth - 1
+ if depth <= 0:
+ break
+ return od
+ except Exception as e:
+ logger.error(f"bt_rank_metrics {metric} error: {e}")
+ return None
+
+ def log_job_metric(self, metric):
+ """
+ Log the job-level average of the given metric for each recorded timestamp
+ """
+ try:
+ for tm, metrics in self._xpu_job_metrics.items():
+ dt_str = datetime.fromtimestamp(tm).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+ total = 0
+ for v in metrics.values():
+ total += v.get_avg_metric(metric)
+
+ logger.info(
+ f"{metric}[{dt_str}]: "
+ f"{round(total/len(metrics.values()), 2)}"
+ )
+
+ except Exception as e:
+ logger.error(f"log_job_metric error: {e}")
def add_node_metrics(
self, timestamp: int, metrics: Dict[str, XpuNodeMetric]
diff --git a/dlrover/python/common/metric/metric.py b/dlrover/python/common/metric/metric.py
index bb18cb075..adb94a4d2 100644
--- a/dlrover/python/common/metric/metric.py
+++ b/dlrover/python/common/metric/metric.py
@@ -143,6 +143,14 @@ def __init__(self):
def update_avg_metrics(self):
pass
+ @abstractmethod
+ def get_avg_metric(self, metric):
+ pass
+
+ @abstractmethod
+ def get_node_metrics(self, metric):
+ pass
+
class GpuNodeMetric(XpuNodeMetric):
"""
@@ -155,6 +163,15 @@ def __init__(self):
self.node_metrics: Dict[int, GpuMetric] = {}
self.avg_metrics = GpuMetric()
+ def get_avg_metric(self, metric):
+ return self.avg_metrics.get_metric(metric)
+
+ def get_node_metrics(self, metric):
+ metrics = []
+ for v in self.node_metrics.values():
+ metrics.append(v.get_metric(metric))
+ return metrics
+
def update_avg_metrics(self):
self.avg_metrics.metrics[GpuMetricEnum.GPU_FREE_MEM] = 0
self.avg_metrics.metrics[GpuMetricEnum.GPU_USED_MEM] = 0
@@ -217,6 +234,15 @@ def __init__(self):
self.node_metrics: Dict[int, NpuMetric] = {}
self.avg_metrics = NpuMetric()
+ def get_avg_metric(self, metric):
+ return self.avg_metrics.get_metric(metric)
+
+ def get_node_metrics(self, metric):
+ metrics = []
+ for v in self.node_metrics.values():
+ metrics.append(v.get_metric(metric))
+ return metrics
+
def update_avg_metrics(self):
for _, metric in self.node_metrics.items():
self.avg_metrics.metrics[
diff --git a/dlrover/python/common/metric/monitor.py b/dlrover/python/common/metric/monitor.py
index 1f202509e..6375469bf 100644
--- a/dlrover/python/common/metric/monitor.py
+++ b/dlrover/python/common/metric/monitor.py
@@ -12,6 +12,8 @@
# limitations under the License.
import os
+import threading
+import time
import traceback
from abc import ABCMeta, abstractmethod
from datetime import datetime
@@ -75,8 +77,12 @@ class SimpleMetricMonitor(MetricMonitor):
implementation of metric monitor that uses http REST api to query metrics
"""
- def __init__(self):
+ def __init__(self, job_name, metrics):
super().__init__()
+ self._stopped = True
+ self._job_name = job_name
+ self._collect_metrics = metrics
+ self._thread = None
@staticmethod
def build_request_headers(token):
@@ -148,6 +154,31 @@ def build_gpu_pod_request_data(metric, pod, start, end):
}
]
+ @staticmethod
+ def align_ts_on_minute(tm):
+ """
+ align timestamp on 0 sec of a minute
+
+ Args:
+ tm: timestamp
+
+ Returns:
+ aligned timestamp with unit of second
+
+ """
+ dt_obj = datetime.fromtimestamp(tm)
+ dt_str = "{}-{}-{} {}:{}:{}".format(
+ dt_obj.year,
+ dt_obj.month,
+ dt_obj.day,
+ dt_obj.hour,
+ dt_obj.minute,
+ "00",
+ )
+ tm_obj = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
+
+ return int(tm_obj.timestamp())
+
@staticmethod
def adjust_timestamp(start, end):
"""
@@ -249,21 +280,95 @@ def query_job_metrics(
return None
def collect_job_metrics(
- self, job_name, metric_type, start, end, pod_name=None, metrics=None
+ self,
+ job_name,
+ metric_type,
+ start,
+ end,
+ pod_name=None,
+ metrics=None,
):
pass
+ def _collector(self, interval):
+ """
+ Thread loop: collect the configured metrics every `interval` seconds
+ and store them in the global JobMetricContext; on failure, dump and
+ clear the context and retry after 5 minutes.
+ """
+ logger.info("Metric monitor collector is running...")
+ self._stopped = False
+ while True:
+ if self._stopped:
+ logger.info("Metric monitor collector is stopping...")
+ break
+
+ try:
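+ # collect metrics for the previous full-minute window [tm, tm + 60)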
+ tm = self.align_ts_on_minute(datetime.now().timestamp()) - 60
+ job_metrics = {}
+
+ for metric in self._collect_metrics:
+ if not self.collect_job_metrics(
+ self._job_name,
+ metric,
+ tm,
+ tm + 60,
+ metrics=job_metrics,
+ ):
+ raise Exception("collect_job_metrics return None")
+ _metric_context.add_node_metrics(tm, job_metrics)
+ time.sleep(interval)
+
+ except Exception as e:
+ logger.warning(
+ f"Collect metrics failed, reset after 5min: {e}"
+ )
+ logger.info(
+ f"Dump metric context {_metric_context.size()} entries:"
+ )
+ for metric in self._collect_metrics:
+ _metric_context.log_job_metric(metric)
+ _metric_context.clear_node_metrics()
+ time.sleep(300)
+
+ def start(self, interval=60):
+ try:
+ self._thread = threading.Thread(
+ target=self._collector,
+ name="_metric_collector",
+ args=(interval,),
+ daemon=True,
+ )
+ self._thread.start()
+ except Exception as e:
+ logger.error(f"Failed to start metric collector: {e}")
+
+ def stop(self):
+ self._stopped = True
+
+ def join(self, timeout=0):
+ if not self._thread:
+ return
+ if timeout == 0:
+ self._thread.join()
+ else:
+ self._thread.join(timeout)
+
class GpuMetricMonitor(SimpleMetricMonitor):
"""
metric monitor of nvidia GPU metrics
"""
- def __init__(self):
- super().__init__()
+ def __init__(self, job_name, metrics):
+ super().__init__(job_name, metrics)
def collect_job_metrics(
- self, job_name, metric_type, start, end, pod_name=None, metrics=None
+ self,
+ job_name,
+ metric_type,
+ start,
+ end,
+ pod_name=None,
+ metrics=None,
):
rsp = self.query_job_metrics(
job_name, metric_type, start, end, is_gpu=True, pod_name=pod_name
@@ -304,18 +409,21 @@ def collect_job_metrics(
metric_type, data
)
+ for pod, node_metric in job_metrics.items():
+ node_metric.update_avg_metrics()
+
return job_metrics
except KeyError as e:
- logger.warning(f"Key error: {e}")
+ logger.warning(f"collect_job_metrics key error: {e}")
traceback.print_exc()
return None
except ValueError as e:
- logger.warning(f"Value error: {e}")
+ logger.warning(f"collect_job_metrics value error: {e}")
traceback.print_exc()
return None
except Exception as e:
- logger.warning(f"Unexpected error: {e}")
+ logger.warning(f"collect_job_metrics unexpected error: {e}")
traceback.print_exc()
return None
@@ -326,11 +434,17 @@ class NpuMetricMonitor(SimpleMetricMonitor):
"""
- def __init__(self):
- super().__init__()
+ def __init__(self, job_name, metrics):
+ super().__init__(job_name, metrics)
def collect_job_metrics(
- self, job_name, metric_enum, start, end, pod_name=None, metrics=None
+ self,
+ job_name,
+ metric_enum,
+ start,
+ end,
+ pod_name=None,
+ metrics=None,
):
rsp = self.query_job_metrics(
job_name, metric_enum, start, end, is_gpu=False, pod_name=pod_name
@@ -375,17 +489,20 @@ def collect_job_metrics(
metric_enum, data
)
+ for pod, node_metric in job_metrics.items():
+ node_metric.update_avg_metrics()
+
return job_metrics
except KeyError as e:
- logger.warning(f"Key error: {e}")
+ logger.warning(f"collect_job_metrics key error: {e}")
traceback.print_exc()
return None
except ValueError as e:
- logger.warning(f"Value error: {e}")
+ logger.warning(f"collect_job_metrics value error: {e}")
traceback.print_exc()
return None
except Exception as e:
- logger.warning(f"Unexpected error: {e}")
+ logger.warning(f"collect_job_metrics unexpected error: {e}")
traceback.print_exc()
return None
diff --git a/dlrover/python/diagnosis/common/constants.py b/dlrover/python/diagnosis/common/constants.py
index a55a2a864..2a75af812 100644
--- a/dlrover/python/diagnosis/common/constants.py
+++ b/dlrover/python/diagnosis/common/constants.py
@@ -33,6 +33,8 @@ class InferenceConfigKey(object):
class DiagnosisConstant(object):
MASTER_DIAGNOSIS_OBSERVING_INTERVAL_SECS = 180
+ METRIC_COLLECT_INTERVAL_SECS = 60
+ CHECK_TENSOR_DEFAULT_RECORDS = 30
# the minimum diagnosis interval is 5 seconds
AGENT_PERIODICALLY_DIAGNOSIS_INTERVAL_SECS = 5
AGENT_PERIODICALLY_REPORT_INTERVAL_SECS = 15
@@ -66,3 +68,43 @@ class DiagnosisActionType(object):
# node operation
RESTART_WORKER = "restart_worker"
RELAUNCH_WORKER = "relaunch_worker"
+
+
+class DiagnosisResult(object):
+ # diag error
+ DIAG_ERROR = "error"
+ # waiting for more data to finish diag
+ DIAG_WAITING = "waiting"
+ # continue to next diagnosis phase
+ DIAG_CONTINUE = "continue"
+ # diag finished, job is healthy
+ DIAG_HEALTHY = "succeeded"
+ # diag finished, job is hang
+ DIAG_HANG = "hang"
+ # diag finished, job has straggler
+ DIAG_STRAGGLE = "straggle"
+ # diag finished, job has failure node
+ DIAG_FAILURE = "failure"
+
+
+class JobHangPolicy(object):
+ XPU_TIMER = "xpu_timer"
+ STEP_HANG = "step_hang"
+ CKPT_HANG = "ckpt_hang"
+ TENSOR_ZERO = "tensor_zero"
+ NPU_ZERO = "npu_zero"
+ NVLINK_DROP = "nvlink_drop"
+ RDMA_DROP = "rdma_drop"
+ PROCESS_HANG = "process_hang"
+
+
+class JobHangWatermark(object):
+ # TENSOR_UTIL is [0, 1]
+ TENSOR_UTIL_LOW_WM = 0.001
+ TENSOR_UTIL_HIGH_WM = 0.8
+ # GPU_UTIL is [0, 100]
+ GPU_UTIL_LOW_WM = 0.5
+ GPU_UTIL_HIGH_WM = 98
+ # NPU_UTIL is [0, 100]
+ NPU_UTIL_LOW_WM = 0.5
+ NPU_UTIL_HIGH_WM = 98
diff --git a/dlrover/python/elastic_agent/torch/training.py b/dlrover/python/elastic_agent/torch/training.py
index c5a41b99f..7c41236b7 100644
--- a/dlrover/python/elastic_agent/torch/training.py
+++ b/dlrover/python/elastic_agent/torch/training.py
@@ -1075,9 +1075,17 @@ def _process_diagnosis_action(self, action: DiagnosisAction):
if isinstance(action, NodeAction):
action.__class__ = NodeAction
if action.action_type == DiagnosisActionType.RESTART_WORKER:
+ logger.info(
+ f"exec diagnosis action: "
+ f"{action.action_type} {action.instance}"
+ )
self._remaining_failovers -= 1
self._restart_workers(self._worker_group)
elif action.action_type == DiagnosisActionType.RELAUNCH_WORKER:
+ logger.info(
+ f"exec diagnosis action: "
+ f"{action.action_type} {action.instance}"
+ )
self._stop_workers(self._worker_group)
self._worker_group.state = WorkerState.FAILED
elif isinstance(action, EventAction):
@@ -1279,10 +1287,10 @@ def launch_agent(
f" training_log : {config.training_log_file}\n"
f" failure_errors : {config.failure_node_errors}\n"
f" numa_affinity : {config.numa_affinity}\n"
+ f" accelerator : {config.accelerator}\n"
)
_set_paral_config()
-
monitor = TorchTrainingMonitor(ConfigPath.RUNTIME_METRICS)
monitor.start()
@@ -1307,6 +1315,7 @@ def launch_agent(
shutdown_rdzv = True
is_node_check_failed = False
result = None
+
try:
metrics.initialize_metrics(metrics.MetricsConfig(config.metrics_cfg))
diff --git a/dlrover/python/master/args.py b/dlrover/python/master/args.py
index a72e3cd35..a3bcf7478 100644
--- a/dlrover/python/master/args.py
+++ b/dlrover/python/master/args.py
@@ -116,6 +116,18 @@ def _build_master_args_parser():
help="The strategy of 'hang detection', "
"0: log only; 1: notify; 2: with fault tolerance",
)
+ parser.add_argument(
+ "--hang_downtime",
+ default=30,
+ type=pos_int,
+ help="Training downtime to detect job hang, unit is minute",
+ )
+ parser.add_argument(
+ "--xpu_type",
+ default="nvidia",
+ type=str,
+ help="The type of XPU, should be 'nvidia' or 'ascend'",
+ )
add_params(parser)
return parser
diff --git a/dlrover/python/master/diagnosis/diagnosis_manager.py b/dlrover/python/master/diagnosis/diagnosis_manager.py
index 6df80f6c0..f999635e0 100644
--- a/dlrover/python/master/diagnosis/diagnosis_manager.py
+++ b/dlrover/python/master/diagnosis/diagnosis_manager.py
@@ -11,11 +11,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
import threading
import time
+from datetime import datetime
+from dlrover.python.common.constants import (
+ Accelerators,
+ GpuMetricEnum,
+ NpuMetricEnum,
+)
+from dlrover.python.common.global_context import Context, DefaultValues
from dlrover.python.common.log import default_logger as logger
-from dlrover.python.diagnosis.common.constants import DiagnosisConstant
+from dlrover.python.common.metric.context import JobMetricContext
+from dlrover.python.common.metric.monitor import (
+ GpuMetricMonitor,
+ NpuMetricMonitor,
+)
+from dlrover.python.diagnosis.common.constants import (
+ DiagnosisActionType,
+ DiagnosisConstant,
+ DiagnosisResult,
+ JobHangWatermark,
+)
+from dlrover.python.diagnosis.common.diagnosis_action import NodeAction
from dlrover.python.diagnosis.common.diagnosis_data import DiagnosisData
from dlrover.python.diagnosis.common.inference_chain import (
InferenceAttribute,
@@ -35,17 +54,24 @@
)
from dlrover.python.master.node.job_context import get_job_context
+_metric_context = JobMetricContext.singleton_instance()
+_dlrover_context = Context.singleton_instance()
+
class DiagnosisManager:
"""
DiagnosisManager is to manage all diagnosis issues in a training job
+
"""
- def __init__(self):
+ def __init__(self, job_name=None):
self._is_observing_started = False
self._data_manager: DiagnosisDataManager = DiagnosisDataManager(600)
self._diagnostician: Diagnostician = Diagnostician(self._data_manager)
self._job_context = get_job_context()
+ self._job_name = job_name
+ self._metric_monitor = None
+ self._lock = threading.Lock()
def collect_diagnosis_data(self, data: DiagnosisData):
self._data_manager.store_data(data)
@@ -54,8 +80,54 @@ def pre_check(self):
logger.info("Start Diagnosis Manager to pre-check training...")
pass
+ def start_metric_collect(self):
+ """
+ Create a GPU/NPU metric monitor based on the job's XPU type and
+ start a thread that collects metrics and stores them in the
+ global JobMetricContext.
+
+ """
+ logger.info(f"start {_dlrover_context.xpu_type} metric collector...")
+ if not os.getenv("DLROVER_METRIC_URL", ""):
+ logger.warning("no GPU metrics url defined, stop metric collector")
+ return
+ if not os.getenv("DLROVER_METRIC_TOKEN", ""):
+ logger.warning(
+ "no GPU metrics token defined, stop metric collector"
+ )
+ return
+
+ if _dlrover_context.xpu_type is Accelerators.ASCEND_NPU:
+ self._metric_monitor = NpuMetricMonitor(
+ job_name=self._job_name,
+ metrics=[
+ NpuMetricEnum.NPU_UTIL,
+ ],
+ )
+ else:
+ self._metric_monitor = GpuMetricMonitor(
+ job_name=self._job_name,
+ metrics=[
+ GpuMetricEnum.GPU_UTIL,
+ GpuMetricEnum.GPU_TENSOR_UTIL,
+ ],
+ )
+
+ if self._metric_monitor:
+ self._metric_monitor.start()
+
+ def stop_metric_collect(self):
+ logger.info("Stop metric collector...")
+ if self._metric_monitor:
+ self._metric_monitor.stop()
+
+ def join_metric_collect(self):
+ logger.info("Join metric collector...")
+ if self._metric_monitor:
+ self._metric_monitor.join()
+
def start_observing(self):
- logger.info("Start Diagnosis Manager to observing training...")
+ logger.info("Start diagnosis manager training observation...")
self._is_observing_started = True
self._diagnostician.register_training_problems(
@@ -75,28 +147,121 @@ def start_observing(self):
)
try:
- thread = threading.Thread(
+ diag = threading.Thread(
target=self._diagnose,
name="diagnose_failures",
daemon=True,
)
- thread.start()
- if thread.is_alive():
- logger.info("Diagnosis Manager is started")
+ diag.start()
+ if diag.is_alive():
+ logger.info("_diagnose thread has started")
+
+ diag_metric = threading.Thread(
+ target=self._diagnose_metrics,
+ name="diagnose_metrics",
+ daemon=True,
+ )
+ diag_metric.start()
+ if diag_metric.is_alive():
+ logger.info("_diagnose_metrics thread has started")
+
except Exception as e:
logger.error(
f"Failed to start the diagnosis manager thread. Error: {e}"
)
def stop_observing(self):
- logger.info("Stop Diagnosis Manager to observing training.")
+ logger.info("Stop diagnosis manager training observation...")
self._is_observing_started = False
+ @staticmethod
+ def check_tensor_drop_zero(duration):
+ if duration > _metric_context.max_metric_records:
+ duration = _metric_context.max_metric_records
+
+ if _dlrover_context.xpu_type is Accelerators.ASCEND_NPU:
+ metrics = _metric_context.backtrace_avg_metrics(
+ NpuMetricEnum.NPU_UTIL, duration
+ )
+ else:
+ metrics = _metric_context.backtrace_avg_metrics(
+ GpuMetricEnum.GPU_TENSOR_UTIL, duration
+ )
+
+ if metrics is None:
+ logger.warning(f"invalid metrics: {metrics}")
+ return DiagnosisResult.DIAG_ERROR, 0, 0
+
+ if len(metrics) < duration:
+ logger.warning(
+ f"Waiting for tensor metrics: {len(metrics)}/{duration}"
+ )
+ return DiagnosisResult.DIAG_WAITING, 0, 0
+
+ key_list = list(metrics.keys())
+ key_list.sort(reverse=True)
+
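+ # scan the newest `duration` records backwards: any average above the
+ # low watermark means the job is healthy, otherwise it is treated as hung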
+ start_ts = key_list[0]
+ end_ts = key_list[0]
+ for key in key_list:
+ end_ts = key
+ if metrics[key] > JobHangWatermark.TENSOR_UTIL_LOW_WM:
+ return DiagnosisResult.DIAG_HEALTHY, start_ts, end_ts
+
+ duration = duration - 1
+ if duration <= 0:
+ break
+
+ return DiagnosisResult.DIAG_HANG, start_ts, end_ts
+
+ def _diagnose_metrics(self):
+ logger.info("_diagnose_metrics thread is running...")
+ while True:
+ if not self._is_observing_started:
+ logger.info(
+ "Stop _diagnose_metrics thread: observation stopped"
+ )
+ break
+
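+ # never use a detection window shorter than MIN_HANG_DOWNTIME minutes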
+ if (
+ _dlrover_context.hang_downtime
+ < DefaultValues.MIN_HANG_DOWNTIME
+ ):
+ hang_downtime = DefaultValues.MIN_HANG_DOWNTIME
+ else:
+ hang_downtime = _dlrover_context.hang_downtime
+ result, start, end = self.check_tensor_drop_zero(hang_downtime)
+ if result is DiagnosisResult.DIAG_HANG:
+ start_dt = datetime.fromtimestamp(start).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+ end_dt = datetime.fromtimestamp(end).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+ logger.warning(
+ f"Detect job hang by tensor drop zero: "
+ f"{start_dt}-{end_dt}"
+ )
+
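+ # hang_detection == 2 enables fault tolerance: ask the master to
+ # restart workers on any instance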
+ if _dlrover_context.hang_detection == 2:
+ self._job_context.enqueue_action(
+ NodeAction(
+ action_type=DiagnosisActionType.RESTART_WORKER,
+ instance=DiagnosisConstant.ANY_INSTANCE,
+ )
+ )
+
+ time.sleep(DiagnosisConstant.METRIC_COLLECT_INTERVAL_SECS)
+
def _diagnose(self):
- logger.info("Start to diagnose failures for observing.")
+ logger.info("_diagnose thread is running...")
while True:
if not self._is_observing_started:
- logger.info("Stop to diagnose failures for observing.")
+ logger.info(
+ "Stop _diagnose thread: observation stopped"
+ )
break
observed_problems = self._diagnostician.observe_training()
diff --git a/dlrover/python/master/dist_master.py b/dlrover/python/master/dist_master.py
index c10ec853e..b4aaf558c 100644
--- a/dlrover/python/master/dist_master.py
+++ b/dlrover/python/master/dist_master.py
@@ -143,7 +143,7 @@ def __init__(
error_monitor
),
}
- self.diagnosis_manager = DiagnosisManager()
+ self.diagnosis_manager = DiagnosisManager(job_name=args.job_name)
self._error_monitor = error_monitor
self.job_metric_collector = self._create_metric_collector_if_needed(
args
@@ -231,6 +231,11 @@ def run(self):
"""
# start training runtime diagnosis
+ try:
+ self.diagnosis_manager.start_metric_collect()
+ except Exception as e:
+ logger.warning(f"Failed to start metric collecting: {str(e)}")
+
try:
self.diagnosis_manager.start_observing()
except Exception as e:
diff --git a/dlrover/python/master/local_master.py b/dlrover/python/master/local_master.py
index aa03ed865..36f8d4182 100644
--- a/dlrover/python/master/local_master.py
+++ b/dlrover/python/master/local_master.py
@@ -21,6 +21,7 @@
ReporterType,
)
from dlrover.python.common.log import default_logger as logger
+from dlrover.python.master.diagnosis.diagnosis_manager import DiagnosisManager
from dlrover.python.master.elastic_training.rdzv_manager import (
ElasticTrainingRendezvousManager,
NetworkCheckRendezvousManager,
@@ -40,6 +41,7 @@ def __init__(self, port, args: JobArgs):
self.speed_monitor = SpeedMonitor()
self.task_manager = TaskManager(0, self.speed_monitor)
self.job_manager = create_job_manager(args, self.speed_monitor)
+ self.diagnosis_manager = DiagnosisManager(job_name=args.job_name)
elastic_training = RendezvousName.ELASTIC_TRAINING
self.rdzv_managers: Dict[str, RendezvousManager] = {
elastic_training: ElasticTrainingRendezvousManager(),
@@ -61,7 +63,7 @@ def _create_master_service(self, port, params: JobArgs):
self.job_manager,
self.speed_monitor,
self.rdzv_managers,
- None,
+ self.diagnosis_manager,
self.job_metric_collector,
None,
None,
diff --git a/dlrover/python/master/main.py b/dlrover/python/master/main.py
index de4bda4c2..281ca1834 100644
--- a/dlrover/python/master/main.py
+++ b/dlrover/python/master/main.py
@@ -14,6 +14,7 @@
import os
from dlrover.python.common.constants import (
+ Accelerators,
DistributionStrategy,
NodeType,
PlatformType,
@@ -45,6 +46,18 @@ def run(args):
job_args.initilize()
logger.info("Job args : %s", job_args.to_json(indent=4))
_dlrover_context.config_master_port(port=args.port)
+ _dlrover_context.hang_detection = args.hang_detection
+ _dlrover_context.hang_downtime = args.hang_downtime
+ if args.xpu_type.lower() == "ascend":
+ _dlrover_context.xpu_type = Accelerators.ASCEND_NPU
+ elif args.xpu_type.lower() == "nvidia":
+ _dlrover_context.xpu_type = Accelerators.NVIDIA_GPU
+ else:
+ logger.info(
+ f"Invalid XPU type: {args.xpu_type}, use Nvidia as default"
+ )
+ _dlrover_context.xpu_type = Accelerators.NVIDIA_GPU
+
if job_args.platform == PlatformType.LOCAL:
from dlrover.python.master.local_master import LocalJobMaster
diff --git a/dlrover/python/master/node/dist_job_manager.py b/dlrover/python/master/node/dist_job_manager.py
index 39f9b8625..6e0b7eede 100644
--- a/dlrover/python/master/node/dist_job_manager.py
+++ b/dlrover/python/master/node/dist_job_manager.py
@@ -1242,7 +1242,13 @@ def collect_node_heart_beat(
logger.info(f"Start receiving heartbeat from node {node_id}")
node.heartbeat_time = timestamp
self._job_context.update_job_node(node)
- return self._job_context.next_action(instance=node_id)
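+ # fall back to a job-level (ANY_INSTANCE) action when no node-specific
+ # action is queued for this node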
+ action = self._job_context.next_action(instance=node_id)
+ if not action or isinstance(action, NoAction):
+ return self._job_context.next_action(
+ instance=DiagnosisConstant.ANY_INSTANCE
+ )
+ else:
+ return action
def update_node_required_info_callback(self):
self._worker_manager.update_node_required_info(self._nodes_required)
diff --git a/dlrover/python/tests/test_args.py b/dlrover/python/tests/test_args.py
index eb49386db..69fbce034 100644
--- a/dlrover/python/tests/test_args.py
+++ b/dlrover/python/tests/test_args.py
@@ -47,3 +47,28 @@ def test_parse_master_args(self):
self.assertEqual(parsed_args.pending_timeout, 600)
self.assertEqual(parsed_args.pending_fail_strategy, 2)
self.assertTrue(parsed_args.service_type, "http")
+
+ original_args = [
+ "--job_name",
+ "test",
+ "--hang_detection",
+ "1",
+ "--hang_downtime",
+ "15",
+ "--xpu_type",
+ "ascend",
+ ]
+ parsed_args = parse_master_args(original_args)
+ self.assertEqual(parsed_args.job_name, "test")
+ self.assertEqual(parsed_args.hang_detection, 1)
+ self.assertEqual(parsed_args.hang_downtime, 15)
+ self.assertEqual(parsed_args.xpu_type, "ascend")
+
+ original_args = [
+ "--job_name",
+ "test",
+ "--xpu_type",
+ "nvidia",
+ ]
+ parsed_args = parse_master_args(original_args)
+ self.assertEqual(parsed_args.xpu_type, "nvidia")
diff --git a/dlrover/python/tests/test_diagnosis_manager.py b/dlrover/python/tests/test_diagnosis_manager.py
index 4a721b9e1..db9816a33 100644
--- a/dlrover/python/tests/test_diagnosis_manager.py
+++ b/dlrover/python/tests/test_diagnosis_manager.py
@@ -11,14 +11,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import copy
import time
import unittest
+from datetime import datetime
from typing import List
from unittest import mock
+from dlrover.python.common.constants import Accelerators, GpuMetricEnum
+from dlrover.python.common.global_context import Context
+from dlrover.python.common.metric.context import JobMetricContext
+from dlrover.python.common.metric.metric import GpuMetric, GpuNodeMetric
from dlrover.python.diagnosis.common.constants import (
DiagnosisActionType,
DiagnosisDataType,
+ DiagnosisResult,
)
from dlrover.python.diagnosis.common.diagnosis_data import (
DiagnosisData,
@@ -39,6 +46,9 @@
)
from dlrover.python.master.diagnosis.diagnosis_manager import DiagnosisManager
+_metric_context = JobMetricContext.singleton_instance()
+_dlrover_context = Context.singleton_instance()
+
class DiagnosisManagerTest(unittest.TestCase):
def setUp(self):
@@ -105,3 +115,63 @@ def test_diagnosis_manager(self):
# explore solutions to observed problems
action = mgr._diagnostician.resolve_problems(observed_problems)
self.assertEqual(action.action_type, DiagnosisActionType.NONE)
+
+ def test_gpu_tensor_drop_zero(self):
+ mgr = DiagnosisManager()
+ _metric_context.clear_node_metrics()
+
+ _dlrover_context.xpu_type = Accelerators.NVIDIA_GPU
+ job_metrics = {}
+ metric = GpuNodeMetric()
+ for i in range(8):
+ metric.node_metrics[i] = GpuMetric()
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_FREE_MEM, 0)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_USED_MEM, 80)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_UTIL, 99.5)
+ metric.node_metrics[i].set_metric(
+ GpuMetricEnum.GPU_TENSOR_UTIL, 0.307
+ )
+ metric.update_avg_metrics()
+ job_metrics["worker-1"] = copy.deepcopy(metric)
+ job_metrics["worker-2"] = copy.deepcopy(metric)
+ job_metrics["worker-3"] = copy.deepcopy(metric)
+ job_metrics["worker-4"] = copy.deepcopy(metric)
+
+ ts = int(datetime.now().timestamp())
+ _metric_context.add_node_metrics(ts, job_metrics)
+ self.assertEqual(
+ mgr.check_tensor_drop_zero(10)[0], DiagnosisResult.DIAG_WAITING
+ )
+
+ for _ in range(10):
+ ts = ts + 60
+ _metric_context.add_node_metrics(ts, job_metrics)
+ self.assertEqual(
+ mgr.check_tensor_drop_zero(10)[0], DiagnosisResult.DIAG_HEALTHY
+ )
+
+ job_metrics = {}
+ metric = GpuNodeMetric()
+ for i in range(8):
+ metric.node_metrics[i] = GpuMetric()
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_FREE_MEM, 0)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_USED_MEM, 80)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_UTIL, 99.5)
+ metric.node_metrics[i].set_metric(
+ GpuMetricEnum.GPU_TENSOR_UTIL, 0.0002
+ )
+ metric.update_avg_metrics()
+ job_metrics["worker-1"] = copy.deepcopy(metric)
+ job_metrics["worker-2"] = copy.deepcopy(metric)
+ job_metrics["worker-3"] = copy.deepcopy(metric)
+ job_metrics["worker-4"] = copy.deepcopy(metric)
+
+ for _ in range(30):
+ ts = ts + 60
+ _metric_context.add_node_metrics(ts, job_metrics)
+ self.assertEqual(
+ mgr.check_tensor_drop_zero(10)[0], DiagnosisResult.DIAG_HANG
+ )
+ self.assertEqual(
+ mgr.check_tensor_drop_zero(30)[0], DiagnosisResult.DIAG_HANG
+ )
diff --git a/dlrover/python/tests/test_elastic_training_agent.py b/dlrover/python/tests/test_elastic_training_agent.py
index 19880d846..6b2660c8c 100644
--- a/dlrover/python/tests/test_elastic_training_agent.py
+++ b/dlrover/python/tests/test_elastic_training_agent.py
@@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import copy
import json
import os
import shutil
@@ -21,7 +22,7 @@
import threading
import time
import unittest
-from datetime import timedelta
+from datetime import datetime, timedelta
from unittest import mock
from unittest.mock import MagicMock, patch
@@ -42,10 +43,21 @@
Accelerators,
AscendConstants,
ConfigPath,
+ GpuMetricEnum,
JobConstant,
NodeEnv,
+ NpuMetricEnum,
RendezvousName,
)
+from dlrover.python.common.global_context import Context
+from dlrover.python.common.log import default_logger as logger
+from dlrover.python.common.metric.context import JobMetricContext
+from dlrover.python.common.metric.metric import (
+ GpuMetric,
+ GpuNodeMetric,
+ NpuMetric,
+ NpuNodeMetric,
+)
from dlrover.python.common.storage import PosixDiskStorage
from dlrover.python.diagnosis.common.constants import DiagnosisConstant
from dlrover.python.diagnosis.common.diagnosis_action import EventAction
@@ -77,6 +89,9 @@
)
from dlrover.python.tests.test_utils import start_local_master
+_metric_context = JobMetricContext.singleton_instance()
+_dlrover_context = Context.singleton_instance()
+
class ElasticTrainingAgentTest(unittest.TestCase):
def setUp(self) -> None:
@@ -356,10 +371,50 @@ def recoverable_multi_get(*args):
self.assertEqual(len(workers), 2)
+def mock_gpu_metric_collect(*args, **kwargs):
+ logger.info("mock gpu metric collector is running...")
+ job_metrics = {}
+ metric = GpuNodeMetric()
+ for i in range(8):
+ metric.node_metrics[i] = GpuMetric()
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_FREE_MEM, 0)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_USED_MEM, 80)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_UTIL, 99.5)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_TENSOR_UTIL, 30.5)
+ metric.update_avg_metrics()
+ job_metrics["worker-1"] = copy.deepcopy(metric)
+ job_metrics["worker-2"] = copy.deepcopy(metric)
+ job_metrics["worker-3"] = copy.deepcopy(metric)
+ job_metrics["worker-4"] = copy.deepcopy(metric)
+ _metric_context.add_node_metrics(
+ int(datetime.now().timestamp()), job_metrics
+ )
+
+
+def mock_npu_metric_collect(*args, **kwargs):
+ logger.info("mock npu metric collector is running...")
+ job_metrics = {}
+ metric = NpuNodeMetric()
+ for i in range(16):
+ metric.node_metrics[i] = NpuMetric()
+ metric.node_metrics[i].set_metric(NpuMetricEnum.NPU_USED_MEM, 78)
+ metric.node_metrics[i].set_metric(NpuMetricEnum.NPU_TOTAL_MEM, 80)
+ metric.node_metrics[i].set_metric(NpuMetricEnum.NPU_UTIL, 99.5)
+ metric.update_avg_metrics()
+ job_metrics["worker-1"] = copy.deepcopy(metric)
+ job_metrics["worker-2"] = copy.deepcopy(metric)
+ job_metrics["worker-3"] = copy.deepcopy(metric)
+ job_metrics["worker-4"] = copy.deepcopy(metric)
+ _metric_context.add_node_metrics(
+ int(datetime.now().timestamp()), job_metrics
+ )
+
+
class ElasticTrainingAgentRunTest(unittest.TestCase):
def setUp(self) -> None:
self._master, addr = start_local_master()
MasterClient._instance = build_master_client(addr, 1)
+ self._client = MasterClient.singleton_instance()
launch_config = LaunchConfig(
min_nodes=1,
max_nodes=1,
@@ -423,6 +478,39 @@ def test_monitor_workers(self):
self.assertDictEqual(run_result.failures, {})
self.assertEqual(run_result.state, WorkerState.SUCCEEDED)
+ def test_metric_collect(self):
+ with patch(
+ "dlrover.python.common.metric.monitor.SimpleMetricMonitor._collector", # noqa
+ side_effect=mock_gpu_metric_collect,
+ ):
+ os.environ[
+ "DLROVER_METRIC_URL"
+ ] = "https://metric.mock.dlrover.org"
+ os.environ["DLROVER_METRIC_TOKEN"] = "0123456789"
+ self.assertIsNot(os.getenv("DLROVER_METRIC_URL", ""), "")
+ self.assertIsNot(os.getenv("DLROVER_METRIC_TOKEN", ""), "")
+
+ _metric_context.clear_node_metrics()
+ _dlrover_context.xpu_type = Accelerators.NVIDIA_GPU
+
+ self._master.diagnosis_manager.stop_metric_collect()
+
+ with patch(
+ "dlrover.python.common.metric.monitor.SimpleMetricMonitor._collector", # noqa
+ side_effect=mock_npu_metric_collect,
+ ):
+ os.environ[
+ "DLROVER_METRIC_URL"
+ ] = "https://metric.mock.dlrover.org"
+ os.environ["DLROVER_METRIC_TOKEN"] = "0123456789"
+ self.assertIsNot(os.getenv("DLROVER_METRIC_URL", ""), "")
+ self.assertIsNot(os.getenv("DLROVER_METRIC_TOKEN", ""), "")
+
+ _metric_context.clear_node_metrics()
+ _dlrover_context.xpu_type = Accelerators.ASCEND_NPU
+
+ self._master.diagnosis_manager.stop_metric_collect()
+
def test_failure_ending_after_training(self):
agent = ElasticTrainingAgent(
node_rank=0,
diff --git a/dlrover/python/tests/test_metric_monitor.py b/dlrover/python/tests/test_metric_monitor.py
index 7d3f2d59a..ef18fb27e 100644
--- a/dlrover/python/tests/test_metric_monitor.py
+++ b/dlrover/python/tests/test_metric_monitor.py
@@ -22,8 +22,15 @@
import requests
-from dlrover.python.common.constants import GpuMetricEnum, NpuMetricEnum
-from dlrover.python.common.metric.context import get_job_metric_context
+from dlrover.python.common.constants import (
+ Accelerators,
+ GpuMetricEnum,
+ NpuMetricEnum,
+)
+from dlrover.python.common.metric.context import (
+ JobMetricContext,
+ get_job_metric_context,
+)
from dlrover.python.common.metric.metric import (
GpuMetric,
GpuNodeMetric,
@@ -36,6 +43,8 @@
SimpleMetricMonitor,
)
+_metric_context = JobMetricContext.singleton_instance()
+
class MetricContextTests(unittest.TestCase):
def test_gpu_metric(self):
@@ -561,10 +570,11 @@ def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
def test_query_exception(self):
- mon = SimpleMetricMonitor()
-
job_name = "dlrover-testjob"
metric_type = NpuMetricEnum.NPU_UTIL
+
+ mon = SimpleMetricMonitor(job_name, [metric_type])
+
start = datetime.strptime(
"2024-11-22 4:55:00", "%Y-%m-%d %H:%M:%S"
).timestamp()
@@ -618,10 +628,11 @@ def test_query_exception(self):
def test_query_npu_job_metrics(self):
with patch("requests.post", side_effect=mock_npu_job_metric_request):
- mon = SimpleMetricMonitor()
-
job_name = "dlrover-testjob"
metric_type = NpuMetricEnum.NPU_UTIL
+
+ mon = SimpleMetricMonitor(job_name, [metric_type])
+
start = datetime.strptime(
"2024-11-22 4:55:00", "%Y-%m-%d %H:%M:%S"
).timestamp()
@@ -640,11 +651,12 @@ def test_query_npu_job_metrics(self):
def test_query_npu_pod_metrics(self):
with patch("requests.post", side_effect=mock_npu_pod_metric_request):
- mon = SimpleMetricMonitor()
-
job_name = "dlrover-testjob"
pod_name = "dlrover-testjob-worker-100"
metric_type = NpuMetricEnum.NPU_UTIL
+
+ mon = SimpleMetricMonitor(job_name, [metric_type])
+
start = int(
datetime.strptime(
"2024-11-22 4:55:00", "%Y-%m-%d %H:%M:%S"
@@ -668,10 +680,11 @@ def test_query_npu_pod_metrics(self):
def test_query_gpu_job_metrics(self):
with patch("requests.post", side_effect=mock_gpu_job_metric_request):
- mon = SimpleMetricMonitor()
-
job_name = "dlrover-testjob"
metric_type = GpuMetricEnum.GPU_UTIL
+
+ mon = SimpleMetricMonitor(job_name, [metric_type])
+
start = datetime.strptime(
"2024-11-24 10:00:00", "%Y-%m-%d %H:%M:%S"
).timestamp()
@@ -690,11 +703,12 @@ def test_query_gpu_job_metrics(self):
def test_query_gpu_pod_metrics(self):
with patch("requests.post", side_effect=mock_gpu_pod_metric_request):
- mon = SimpleMetricMonitor()
-
job_name = "dlrover-testjob"
pod_name = "dlrover-testjob-worker-100"
metric_type = GpuMetricEnum.GPU_UTIL
+
+ mon = SimpleMetricMonitor(job_name, [metric_type])
+
start = int(
datetime.strptime(
"2024-11-24 10:00:00", "%Y-%m-%d %H:%M:%S"
@@ -718,10 +732,11 @@ def test_query_gpu_pod_metrics(self):
def test_collect_npu_job_metrics(self):
with patch("requests.post", side_effect=mock_npu_job_metric_request):
- mon = NpuMetricMonitor()
-
job_name = "dlrover-testjob"
metric_type = NpuMetricEnum.NPU_UTIL
+
+ mon = NpuMetricMonitor(job_name, [metric_type])
+
start = datetime.strptime(
"2024-11-22 4:55:00", "%Y-%m-%d %H:%M:%S"
).timestamp()
@@ -740,7 +755,7 @@ def test_collect_npu_job_metrics(self):
for pod, node_metric in job_metric.items():
if pod == "dlrover-testjob-worker-100":
- node_metric.update_avg_metrics()
+ # avg metrics are already updated inside collect_job_metrics
self.assertEqual(
node_metric.avg_metrics.get_metric(
NpuMetricEnum.NPU_UTIL
@@ -748,7 +763,7 @@ def test_collect_npu_job_metrics(self):
96.5,
)
elif pod == "dlrover-testjob-worker-101":
- node_metric.update_avg_metrics()
+ # avg metrics are already updated inside collect_job_metrics
self.assertEqual(
node_metric.avg_metrics.get_metric(
NpuMetricEnum.NPU_UTIL
@@ -758,11 +773,12 @@ def test_collect_npu_job_metrics(self):
def test_collect_npu_pod_metrics(self):
with patch("requests.post", side_effect=mock_npu_pod_metric_request):
- mon = NpuMetricMonitor()
-
job_name = "dlrover-testjob"
pod_name = "dlrover-testjob-worker-100"
metric_type = NpuMetricEnum.NPU_UTIL
+
+ mon = NpuMetricMonitor(job_name, [metric_type])
+
start = datetime.strptime(
"2024-11-22 4:55:00", "%Y-%m-%d %H:%M:%S"
).timestamp()
@@ -781,7 +797,7 @@ def test_collect_npu_pod_metrics(self):
self.assertIsNotNone(pod_metric)
for pod, node_metric in pod_metric.items():
- node_metric.update_avg_metrics()
+ # avg metrics are already updated inside collect_job_metrics
self.assertEqual(
node_metric.avg_metrics.get_metric(NpuMetricEnum.NPU_UTIL),
97.75,
@@ -789,10 +805,11 @@ def test_collect_npu_pod_metrics(self):
def test_collect_gpu_job_metrics(self):
with patch("requests.post", side_effect=mock_gpu_job_metric_request):
- mon = GpuMetricMonitor()
-
job_name = "dlrover-testjob"
metric_type = GpuMetricEnum.GPU_UTIL
+
+ mon = GpuMetricMonitor(job_name, [metric_type])
+
start = datetime.strptime(
"2024-11-24 10:00:00", "%Y-%m-%d %H:%M:%S"
).timestamp()
@@ -829,11 +846,12 @@ def test_collect_gpu_job_metrics(self):
def test_collect_gpu_pod_metrics(self):
with patch("requests.post", side_effect=mock_gpu_pod_metric_request):
- mon = GpuMetricMonitor()
-
job_name = "dlrover-testjob"
pod_name = "dlrover-testjob-worker-100"
metric_type = GpuMetricEnum.GPU_UTIL
+
+ mon = GpuMetricMonitor(job_name, [metric_type])
+
start = datetime.strptime(
"2024-11-24 10:00:00", "%Y-%m-%d %H:%M:%S"
).timestamp()
@@ -853,3 +871,218 @@ def test_collect_gpu_pod_metrics(self):
node_metric.avg_metrics.get_metric(GpuMetricEnum.GPU_UTIL),
97.75,
)
+
+
+def mock_gpu_monitor_collect(*args, **kwargs):
+ if "metrics" in kwargs:
+ job_metrics = kwargs["metrics"]
+ metric = GpuNodeMetric()
+ for i in range(8):
+ metric.node_metrics[i] = GpuMetric()
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_FREE_MEM, 0)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_USED_MEM, 80)
+ metric.node_metrics[i].set_metric(GpuMetricEnum.GPU_UTIL, 99.5)
+ metric.node_metrics[i].set_metric(
+ GpuMetricEnum.GPU_TENSOR_UTIL, 30.5
+ )
+ metric.update_avg_metrics()
+ job_metrics["worker-1"] = copy.deepcopy(metric)
+ job_metrics["worker-2"] = copy.deepcopy(metric)
+ job_metrics["worker-3"] = copy.deepcopy(metric)
+ job_metrics["worker-4"] = copy.deepcopy(metric)
+
+ return job_metrics
+
+
+def mock_npu_monitor_collect(*args, **kwargs):
+ if "metrics" in kwargs:
+ job_metrics = kwargs["metrics"]
+ metric = NpuNodeMetric()
+ for i in range(8):
+ metric.node_metrics[i] = NpuMetric()
+ metric.node_metrics[i].set_metric(NpuMetricEnum.NPU_UTIL, 99.5)
+ metric.update_avg_metrics()
+ job_metrics["worker-1"] = copy.deepcopy(metric)
+ job_metrics["worker-2"] = copy.deepcopy(metric)
+ job_metrics["worker-3"] = copy.deepcopy(metric)
+ job_metrics["worker-4"] = copy.deepcopy(metric)
+
+ return job_metrics
+
+
+class GpuMetricMonitorTest(unittest.TestCase):
+ def setUp(self):
+ self.url = os.getenv("DLROVER_METRIC_URL", "")
+ self.token = os.getenv("DLROVER_METRIC_TOKEN", "")
+ self.job_name = os.getenv("DLROVER_JOB_NAME", "")
+ self.xpu = Accelerators.NVIDIA_GPU
+ http_client.HTTPConnection.debuglevel = 1
+ logging.basicConfig()
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ def tearDown(self):
+ pass
+
+ def test_gpu_collector(self):
+ with patch(
+ "dlrover.python.common.metric.monitor.GpuMetricMonitor.collect_job_metrics", # noqa
+ side_effect=mock_gpu_monitor_collect,
+ ):
+ mon = GpuMetricMonitor(
+ job_name=self.job_name,
+ metrics=[
+ GpuMetricEnum.GPU_TENSOR_UTIL,
+ GpuMetricEnum.GPU_UTIL,
+ ],
+ )
+ mon.start(interval=1)
+ time.sleep(2)
+ mon.stop()
+
+ _metric_context.log_job_metric(GpuMetricEnum.GPU_TENSOR_UTIL)
+ _metric_context.log_job_metric(GpuMetricEnum.GPU_UTIL)
+
+ self.assertEqual(_metric_context.size(), 1)
+
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_avg_metrics(
+ GpuMetricEnum.GPU_TENSOR_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [30.5],
+ )
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_node_metrics(
+ GpuMetricEnum.GPU_TENSOR_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [[30.5, 30.5, 30.5, 30.5]],
+ )
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_rank_metrics(
+ GpuMetricEnum.GPU_TENSOR_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [
+ [
+ [30.5, 30.5, 30.5, 30.5, 30.5, 30.5, 30.5, 30.5],
+ [30.5, 30.5, 30.5, 30.5, 30.5, 30.5, 30.5, 30.5],
+ [30.5, 30.5, 30.5, 30.5, 30.5, 30.5, 30.5, 30.5],
+ [30.5, 30.5, 30.5, 30.5, 30.5, 30.5, 30.5, 30.5],
+ ]
+ ],
+ )
+
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_avg_metrics(
+ GpuMetricEnum.GPU_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [99.5],
+ )
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_node_metrics(
+ GpuMetricEnum.GPU_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [[99.5, 99.5, 99.5, 99.5]],
+ )
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_rank_metrics(
+ GpuMetricEnum.GPU_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [
+ [
+ [99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5],
+ [99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5],
+ [99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5],
+ [99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5],
+ ]
+ ],
+ )
+
+ _metric_context.clear_node_metrics()
+ self.assertEqual(_metric_context.size(), 0)
+
+
+class NpuMetricMonitorTest(unittest.TestCase):
+ def setUp(self):
+ self.url = os.getenv("DLROVER_METRIC_URL", "")
+ self.token = os.getenv("DLROVER_METRIC_TOKEN", "")
+ self.job_name = os.getenv("DLROVER_JOB_NAME", "")
+ self.xpu = Accelerators.ASCEND_NPU
+ http_client.HTTPConnection.debuglevel = 1
+ logging.basicConfig()
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ def tearDown(self):
+ pass
+
+ def test_npu_collector(self):
+ with patch(
+ "dlrover.python.common.metric.monitor.NpuMetricMonitor.collect_job_metrics", # noqa
+ side_effect=mock_npu_monitor_collect,
+ ):
+ mon = NpuMetricMonitor(
+ job_name=self.job_name,
+ metrics=[
+ NpuMetricEnum.NPU_UTIL,
+ ],
+ )
+ mon.start(interval=1)
+ time.sleep(2)
+ mon.stop()
+
+ _metric_context.log_job_metric(NpuMetricEnum.NPU_UTIL)
+
+ self.assertEqual(_metric_context.size(), 1)
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_avg_metrics(
+ NpuMetricEnum.NPU_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [99.5],
+ )
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_node_metrics(
+ NpuMetricEnum.NPU_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [[99.5, 99.5, 99.5, 99.5]],
+ )
+ self.assertEqual(
+ list(
+ _metric_context.backtrace_rank_metrics(
+ NpuMetricEnum.NPU_UTIL,
+ _metric_context.max_metric_records,
+ ).values()
+ ),
+ [
+ [
+ [99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5],
+ [99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5],
+ [99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5],
+ [99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5, 99.5],
+ ]
+ ],
+ )
+
+ _metric_context.clear_node_metrics()
+ self.assertEqual(_metric_context.size(), 0)
diff --git a/docs/deployment/argument.md b/docs/deployment/argument.md
index e728fdbae..e15c251c4 100644
--- a/docs/deployment/argument.md
+++ b/docs/deployment/argument.md
@@ -14,6 +14,9 @@ when training with DLRover.
| pending_timeout | The timeout value of pending. | No | integer(unit: second) | 900 | \>=0 |
| pending_fail_strategy | The fail strategy for pending case. | No | integer | 1 | -1: disabled
0: skip
1: verify necessary parts
2: verify all parts |
| service_type | The type of master service. | No | string | grpc | grpc,http |
+| xpu_type | The type of accelerator (XPU). | No | string | nvidia | nvidia,ascend |
+| hang_detection | The strategy of 'hang detection'. | No | integer | 1 | 0: log only
1: notify
2: with fault tolerance
|
+| hang_downtime | The downtime (in minutes) before a training job is regarded as hung. | No | integer | 30 | \>=0 |
## 2. Training Arguments