Skip to content

Commit

Permalink
Detect hang by tensor util (#1448)
Browse files Browse the repository at this point in the history
* add _collector thread to update metrics into global JobMetricContext

* add update_node_xpu_info api

The agent updates the master with the worker's local XPU type (GPU or NPU); the master then
starts the metric collector in the diagnosis manager.

* add tensor hang support in diagnosis manager

* use hang_downtime and hang_detection in job args for hang detection

* remove collect_xpu_info

* fix log format

* add --xpu_type into job args and update docs for new job args

---------

Co-authored-by: Ma Jie Yue <[email protected]>
  • Loading branch information
majieyue and Ma Jie Yue authored Jan 27, 2025
1 parent 03c965f commit 2ce0e02
Show file tree
Hide file tree
Showing 17 changed files with 995 additions and 57 deletions.
13 changes: 11 additions & 2 deletions dlrover/python/common/global_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

import os

from dlrover.python.common.constants import CommunicationType, UserEnv
from dlrover.python.common.constants import (
Accelerators,
CommunicationType,
UserEnv,
)
from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.singleton import Singleton
from dlrover.python.util.common_util import (
Expand Down Expand Up @@ -58,6 +62,8 @@ class DefaultValues(object):
SEC_TO_WAIT_FAILED_PS = 600 # 10min
HANG_CPU_USAGE_RATE = 0.05
HANG_DETECTION = 1
HANG_DOWNTIME = 30
MIN_HANG_DOWNTIME = 3
GPU_NUM_PER_NODE = 8
NPU_NUM_PER_NODE = 16
MAX_METRIC_REC = 30
Expand Down Expand Up @@ -107,9 +113,12 @@ def __init__(self):
# The strategy of 'hang detection':
# 0: log only; 1: notify; 2: with fault tolerance
self.hang_detection = DefaultValues.HANG_DETECTION
# Downtime duration (in minutes) after which training is considered hung
self.hang_downtime = DefaultValues.HANG_DOWNTIME
#
self.xpu_type = Accelerators.NVIDIA_GPU
self.gpu_per_node = DefaultValues.GPU_NUM_PER_NODE
self.npu_per_node = DefaultValues.NPU_NUM_PER_NODE
self.max_metric_records = DefaultValues.MAX_METRIC_REC

def set_params_from_brain(self):
self.train_speed_record_num = self.get_param_value_from_brain(
Expand Down
120 changes: 116 additions & 4 deletions dlrover/python/common/metric/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from collections import OrderedDict
from datetime import datetime
from typing import Dict

from dlrover.python.common.global_context import Context
from dlrover.python.common.global_context import Context, DefaultValues
from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.metric.metric import XpuNodeMetric
from dlrover.python.common.singleton import Singleton

Expand All @@ -29,7 +30,6 @@ class JobMetricContext(Singleton):
"""

def __init__(self):
self._lock = threading.Lock()
"""
job metrics dict is a dict with timestamp as key,
and the value is another dict with worker node id as key,
Expand All @@ -38,7 +38,119 @@ def __init__(self):
self._xpu_job_metrics: OrderedDict[
int, Dict[str, XpuNodeMetric]
] = OrderedDict()
self.max_metric_records = _dlrover_context.max_metric_records
self.max_metric_records = DefaultValues.MAX_METRIC_REC
self._lock = threading.Lock()

def backtrace_avg_metrics(self, metric, depth):
    """
    Backtrace the job-wide average of a metric, newest timestamp first.

    Args:
        metric: metric name, e.g. GPU_TENSOR_UTIL
        depth: maximum number of timestamps to walk back. Note: at
            least one timestamp is emitted even if depth <= 0, to stay
            backward compatible with the original decrement-then-check
            loop.

    Returns:
        OrderedDict keyed by timestamp (descending) whose values are the
        metric averaged over all nodes, rounded to 2 decimals; None on
        any error (e.g. a timestamp with an empty node dict).
    """
    with self._lock:
        try:
            od = OrderedDict()
            # Keys are inserted in time order, so reversing gives
            # newest-first traversal.
            for tm in list(self._xpu_job_metrics.keys())[::-1]:
                node_metrics = self._xpu_job_metrics[tm]
                total = sum(
                    v.get_avg_metric(metric)
                    for v in node_metrics.values()
                )
                od[tm] = round(total / len(node_metrics), 2)
                depth = depth - 1
                if depth <= 0:
                    break
            return od
        except Exception as e:
            logger.error(f"bt_avg_metrics {metric} error: {e}")
            return None

def backtrace_node_metrics(self, metric, depth):
    """
    Backtrace, per timestamp (newest first), the list of per-node
    averages of a metric.

    Args:
        metric: metric name, e.g. GPU_TENSOR_UTIL
        depth: maximum number of timestamps to walk back; at least one
            timestamp is emitted even if depth <= 0 (original loop
            semantics preserved).

    Returns:
        OrderedDict keyed by timestamp (descending) whose values are
        lists of each node's average for the metric; None on any error.
    """
    with self._lock:
        try:
            od = OrderedDict()
            for tm in list(self._xpu_job_metrics.keys())[::-1]:
                # One averaged value per node, in node-dict order.
                od[tm] = [
                    v.get_avg_metric(metric)
                    for v in self._xpu_job_metrics[tm].values()
                ]
                depth = depth - 1
                if depth <= 0:
                    break
            return od
        except Exception as e:
            logger.error(f"bt_node_metrics {metric} error: {e}")
            return None

def backtrace_rank_metrics(self, metric, depth):
    """
    Backtrace, per timestamp (newest first), the per-rank metric lists
    of every node.

    Args:
        metric: metric name, e.g. GPU_TENSOR_UTIL
        depth: maximum number of timestamps to walk back; at least one
            timestamp is emitted even if depth <= 0 (original loop
            semantics preserved).

    Returns:
        OrderedDict keyed by timestamp (descending) whose values are
        lists of each node's per-rank metric list; None on any error.
    """
    with self._lock:
        try:
            od = OrderedDict()
            for tm in list(self._xpu_job_metrics.keys())[::-1]:
                # Unlike backtrace_node_metrics, this keeps every
                # rank's value instead of the node average.
                od[tm] = [
                    v.get_node_metrics(metric)
                    for v in self._xpu_job_metrics[tm].values()
                ]
                depth = depth - 1
                if depth <= 0:
                    break
            return od
        except Exception as e:
            logger.error(f"bt_rank_metrics {metric} error: {e}")
            return None

def log_job_metric(self, metric):
    """
    Log the job-wide average of the given metric for every recorded
    timestamp.

    Args:
        metric: metric name, e.g. GPU_TENSOR_UTIL

    NOTE(review): unlike the backtrace_* methods, this reads
    self._xpu_job_metrics without taking self._lock — confirm callers
    serialize access before relying on this under concurrency (taking
    the non-reentrant lock here could deadlock an already-locked
    caller, so it is left as-is).
    """
    try:
        for tm, metrics in self._xpu_job_metrics.items():
            # strftime zero-pads every field; the previous hand-rolled
            # "{}-{}-{} {}:{}:{}" format produced unpadded output such
            # as "2025-1-7 9:5:3".
            dt_str = datetime.fromtimestamp(tm).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
            total = sum(
                v.get_avg_metric(metric) for v in metrics.values()
            )
            logger.info(
                f"{metric}[{dt_str}]: "
                f"{round(total/len(metrics), 2)}"
            )

    except Exception as e:
        logger.error(f"log_job_metric error: {e}")

def add_node_metrics(
self, timestamp: int, metrics: Dict[str, XpuNodeMetric]
Expand Down
26 changes: 26 additions & 0 deletions dlrover/python/common/metric/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ def __init__(self):
def update_avg_metrics(self):
pass

@abstractmethod
def get_avg_metric(self, metric):
    """Return the node-averaged value of the given metric name."""
    pass

@abstractmethod
def get_node_metrics(self, metric):
    """Return a list of per-device values for the given metric name."""
    pass


class GpuNodeMetric(XpuNodeMetric):
"""
Expand All @@ -155,6 +163,15 @@ def __init__(self):
self.node_metrics: Dict[int, GpuMetric] = {}
self.avg_metrics = GpuMetric()

def get_avg_metric(self, metric):
    """Return *metric* from this node's averaged GPU metrics."""
    averaged = self.avg_metrics
    return averaged.get_metric(metric)

def get_node_metrics(self, metric):
    """
    Collect *metric* from every device on this node.

    Returns:
        list of per-device values, in self.node_metrics iteration
        order.
    """
    # List comprehension replaces the manual append loop (PERF401).
    return [m.get_metric(metric) for m in self.node_metrics.values()]

def update_avg_metrics(self):
self.avg_metrics.metrics[GpuMetricEnum.GPU_FREE_MEM] = 0
self.avg_metrics.metrics[GpuMetricEnum.GPU_USED_MEM] = 0
Expand Down Expand Up @@ -217,6 +234,15 @@ def __init__(self):
self.node_metrics: Dict[int, NpuMetric] = {}
self.avg_metrics = NpuMetric()

def get_avg_metric(self, metric):
    """Return *metric* from this node's averaged NPU metrics."""
    averaged = self.avg_metrics
    return averaged.get_metric(metric)

def get_node_metrics(self, metric):
    """
    Collect *metric* from every device on this node.

    Returns:
        list of per-device values, in self.node_metrics iteration
        order.
    """
    # List comprehension replaces the manual append loop (PERF401).
    return [m.get_metric(metric) for m in self.node_metrics.values()]

def update_avg_metrics(self):
for _, metric in self.node_metrics.items():
self.avg_metrics.metrics[
Expand Down
Loading

0 comments on commit 2ce0e02

Please sign in to comment.