Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect hang by tensor util #1448

Merged
5 changes: 5 additions & 0 deletions dlrover/python/common/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ class GlobalStep(Message):
elapsed_time_per_step: float = 0.0


@dataclass
class NodeXpuInfo(Message):
    """Message carrying the accelerator (xpu) type string of a node."""

    # Empty string by default.
    # NOTE(review): presumably a tag such as the GPU/NPU kind — confirm
    # the accepted values against the sender.
    xpu_type: str = ""


@dataclass
class HeartBeat(Message):
timestamp: int = 0
Expand Down
116 changes: 114 additions & 2 deletions dlrover/python/common/metric/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from collections import OrderedDict
from datetime import datetime
from typing import Dict

from dlrover.python.common.global_context import Context
from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.metric.metric import XpuNodeMetric
from dlrover.python.common.singleton import Singleton

Expand All @@ -29,7 +30,6 @@
"""

def __init__(self):
self._lock = threading.Lock()
"""
job metrics dict is a dict with timestamp as key,
and the value is another dict with worker node id as key,
Expand All @@ -39,6 +39,118 @@
int, Dict[str, XpuNodeMetric]
] = OrderedDict()
self.max_metric_records = _dlrover_context.max_metric_records
self._lock = threading.Lock()

def backtrace_avg_metrics(self, metric, depth):
    """
    backtrace the job-level average of a metric over the latest records

    Walks the recorded timestamps from the most recently added one
    backwards and, for each timestamp, averages
    ``get_avg_metric(metric)`` over every node stored under it.

    Args:
        metric: name, e.g. GPU_TENSOR_UTIL
        depth: maximum number of timestamps to backtrace; a value
            <= 0 yields an empty result

    Returns:
        OrderedDict with key as timestamp (most recent first) and value
        as the avg metric rounded to 2 decimals, or None if any error
        occurs while collecting (e.g. a timestamp that holds no node
        metrics); the error is logged.

    """
    with self._lock:
        try:
            od = OrderedDict()
            # Nothing requested: do not emit even the latest record.
            if depth <= 0:
                return od
            for tm in reversed(self._xpu_job_metrics):
                node_metrics = self._xpu_job_metrics[tm].values()
                total = sum(v.get_avg_metric(metric) for v in node_metrics)
                od[tm] = round(total / len(node_metrics), 2)
                # One entry is added per timestamp, so the size of the
                # result is the depth reached so far.
                if len(od) >= depth:
                    break
            return od
        except Exception as e:
            logger.error(f"bt_avg_metrics {metric} error: {e}")
            return None

Check warning on line 72 in dlrover/python/common/metric/context.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/metric/context.py#L70-L72

Added lines #L70 - L72 were not covered by tests

def backtrace_node_metrics(self, metric, depth):
    """
    backtrace the per-node avg metric lists of the latest records

    Walks the recorded timestamps from the most recently added one
    backwards and collects ``get_avg_metric(metric)`` of every node
    stored under each timestamp.

    Args:
        metric: name, e.g. GPU_TENSOR_UTIL
        depth: maximum number of timestamps to backtrace; a value
            <= 0 yields an empty result

    Returns:
        OrderedDict with key as timestamp (most recent first) and value
        as the list of node avg metrics, or None if any error occurs
        while collecting; the error is logged.

    """
    with self._lock:
        try:
            od = OrderedDict()
            # Nothing requested: do not emit even the latest record.
            if depth <= 0:
                return od
            for tm in reversed(self._xpu_job_metrics):
                od[tm] = [
                    v.get_avg_metric(metric)
                    for v in self._xpu_job_metrics[tm].values()
                ]
                # One entry is added per timestamp, so the size of the
                # result is the depth reached so far.
                if len(od) >= depth:
                    break
            return od
        except Exception as e:
            logger.error(f"bt_node_metrics {metric} error: {e}")
            return None

Check warning on line 99 in dlrover/python/common/metric/context.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/metric/context.py#L97-L99

Added lines #L97 - L99 were not covered by tests

def backtrace_rank_metrics(self, metric, depth):
    """
    backtrace the rank metric lists of the latest records

    Walks the recorded timestamps from the most recently added one
    backwards and collects ``get_node_metrics(metric)`` of every node
    stored under each timestamp.

    Args:
        metric: name, e.g. GPU_TENSOR_UTIL
        depth: maximum number of timestamps to backtrace; a value
            <= 0 yields an empty result

    Returns:
        OrderedDict with key as timestamp (most recent first) and value
        as a list holding, per node, whatever that node's
        ``get_node_metrics(metric)`` returned, or None if any error
        occurs while collecting; the error is logged.

    """
    with self._lock:
        try:
            od = OrderedDict()
            # Nothing requested: do not emit even the latest record.
            if depth <= 0:
                return od
            for tm in reversed(self._xpu_job_metrics):
                od[tm] = [
                    v.get_node_metrics(metric)
                    for v in self._xpu_job_metrics[tm].values()
                ]
                # One entry is added per timestamp, so the size of the
                # result is the depth reached so far.
                if len(od) >= depth:
                    break
            return od
        except Exception as e:
            logger.error(f"bt_rank_metrics {metric} error: {e}")
            return None

Check warning on line 126 in dlrover/python/common/metric/context.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/metric/context.py#L124-L126

Added lines #L124 - L126 were not covered by tests

def log_job_metric(self, metric):
    """
    Log the job-wide average of `metric` for every recorded timestamp.

    For each timestamp in the job metrics dict, the avg metric of all
    nodes is averaged, rounded to 2 decimals and written to the info
    log together with the local date/time of the timestamp (fields are
    not zero-padded). Any exception is caught and logged as an error.

    Note: this method reads the records without acquiring self._lock.

    Args:
        metric: name, e.g. GPU_TENSOR_UTIL
    """
    try:
        for ts, node_metrics in self._xpu_job_metrics.items():
            # timetuple()[:6] is (year, month, day, hour, minute, second)
            stamp = "{}-{}-{} {}:{}:{}".format(
                *datetime.fromtimestamp(ts).timetuple()[:6]
            )
            metric_sum = sum(
                node.get_avg_metric(metric) for node in node_metrics.values()
            )
            job_avg = round(metric_sum / len(node_metrics.values()), 2)
            logger.info(f"{metric}[{stamp}]: {job_avg}")
    except Exception as e:
        logger.error(f"log_job_metric error: {e}")

Check warning on line 153 in dlrover/python/common/metric/context.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/metric/context.py#L152-L153

Added lines #L152 - L153 were not covered by tests

def add_node_metrics(
self, timestamp: int, metrics: Dict[str, XpuNodeMetric]
Expand Down
26 changes: 26 additions & 0 deletions dlrover/python/common/metric/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@
def update_avg_metrics(self):
pass

@abstractmethod
def get_avg_metric(self, metric):
    """Return this node's average value of `metric` (subclass hook)."""

Check warning on line 148 in dlrover/python/common/metric/metric.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/metric/metric.py#L148

Added line #L148 was not covered by tests

@abstractmethod
def get_node_metrics(self, metric):
    """Return the values of `metric` for this node's devices (subclass hook)."""

Check warning on line 152 in dlrover/python/common/metric/metric.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/metric/metric.py#L152

Added line #L152 was not covered by tests


class GpuNodeMetric(XpuNodeMetric):
"""
Expand All @@ -155,6 +163,15 @@
self.node_metrics: Dict[int, GpuMetric] = {}
self.avg_metrics = GpuMetric()

def get_avg_metric(self, metric):
    """Look up `metric` in this node's averaged GPU metrics."""
    averaged = self.avg_metrics
    return averaged.get_metric(metric)

def get_node_metrics(self, metric):
    """Return `metric` of every entry in node_metrics, in dict order."""
    return [gpu.get_metric(metric) for gpu in self.node_metrics.values()]

def update_avg_metrics(self):
self.avg_metrics.metrics[GpuMetricEnum.GPU_FREE_MEM] = 0
self.avg_metrics.metrics[GpuMetricEnum.GPU_USED_MEM] = 0
Expand Down Expand Up @@ -217,6 +234,15 @@
self.node_metrics: Dict[int, NpuMetric] = {}
self.avg_metrics = NpuMetric()

def get_avg_metric(self, metric):
    """Look up `metric` in this node's averaged NPU metrics."""
    averaged = self.avg_metrics
    return averaged.get_metric(metric)

def get_node_metrics(self, metric):
    """Return `metric` of every entry in node_metrics, in dict order."""
    return [npu.get_metric(metric) for npu in self.node_metrics.values()]

def update_avg_metrics(self):
for _, metric in self.node_metrics.items():
self.avg_metrics.metrics[
Expand Down
Loading
Loading