Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into optimize_fault_tole…
Browse files Browse the repository at this point in the history
…rance_when_assign_workers

# Conflicts:
#	dlrover/python/tests/test_elastic_training_agent.py
  • Loading branch information
BalaBalaYi committed Jan 27, 2025
2 parents 401371c + 2ce0e02 commit 3dadb9a
Show file tree
Hide file tree
Showing 18 changed files with 996 additions and 59 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Furthermore, DLRover offers extension libraries for PyTorch and TensorFlow to ex

## Latest News

- [2025/01] [EDiT: A Local-SGD-Based Efficient Distributed Training Method for Large Language Models, ICLR'25.](https://arxiv.org/abs/2412.07210)
- [2024/06] [DLRover-RM has been accepted by VLDB'24.](docs/blogs/dlrover_rm.md)
- [2024/04] [Flash Checkpoint Supports HuggingFace transformers.Trainer to Asynchronously persist checkpoints.](docs/blogs/flash_checkpoint.md#huggingface-transformerstrainer)
- [2024/02] [Flash Checkpoint Saves the Megatron-LM Checkpoint in Seconds.](docs/blogs/megatron_flash_checkpoint.md)
Expand Down
13 changes: 11 additions & 2 deletions dlrover/python/common/global_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

import os

from dlrover.python.common.constants import CommunicationType, UserEnv
from dlrover.python.common.constants import (
Accelerators,
CommunicationType,
UserEnv,
)
from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.singleton import Singleton
from dlrover.python.util.common_util import (
Expand Down Expand Up @@ -58,6 +62,8 @@ class DefaultValues(object):
SEC_TO_WAIT_FAILED_PS = 600 # 10min
HANG_CPU_USAGE_RATE = 0.05
HANG_DETECTION = 1
HANG_DOWNTIME = 30
MIN_HANG_DOWNTIME = 3
GPU_NUM_PER_NODE = 8
NPU_NUM_PER_NODE = 16
MAX_METRIC_REC = 30
Expand Down Expand Up @@ -107,9 +113,12 @@ def __init__(self):
# The strategy of 'hang detection':
# 0: log only; 1: notify; 2: with fault tolerance
self.hang_detection = DefaultValues.HANG_DETECTION
# The duration of downtime as training hang, unit is minute
self.hang_downtime = DefaultValues.HANG_DOWNTIME
#
self.xpu_type = Accelerators.NVIDIA_GPU
self.gpu_per_node = DefaultValues.GPU_NUM_PER_NODE
self.npu_per_node = DefaultValues.NPU_NUM_PER_NODE
self.max_metric_records = DefaultValues.MAX_METRIC_REC

def set_params_from_brain(self):
self.train_speed_record_num = self.get_param_value_from_brain(
Expand Down
120 changes: 116 additions & 4 deletions dlrover/python/common/metric/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from collections import OrderedDict
from datetime import datetime
from typing import Dict

from dlrover.python.common.global_context import Context
from dlrover.python.common.global_context import Context, DefaultValues
from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.metric.metric import XpuNodeMetric
from dlrover.python.common.singleton import Singleton

Expand All @@ -29,7 +30,6 @@ class JobMetricContext(Singleton):
"""

def __init__(self):
self._lock = threading.Lock()
"""
job metrics dict is a dict with timestamp as key,
and the value is another dict with worker node id as key,
Expand All @@ -38,7 +38,119 @@ def __init__(self):
self._xpu_job_metrics: OrderedDict[
int, Dict[str, XpuNodeMetric]
] = OrderedDict()
self.max_metric_records = _dlrover_context.max_metric_records
self.max_metric_records = DefaultValues.MAX_METRIC_REC
self._lock = threading.Lock()

def backtrace_avg_metrics(self, metric, depth):
    """Walk the job metrics history, newest first, averaging per timestamp.

    Args:
        metric: metric name, e.g. GPU_TENSOR_UTIL.
        depth: maximum number of timestamps to walk back.

    Returns:
        OrderedDict mapping timestamp -> cluster-wide average of ``metric``
        (rounded to 2 decimals), newest timestamp first, or None on error.
    """
    with self._lock:
        try:
            series = OrderedDict()
            remaining = depth
            # Keys are insertion-ordered by timestamp; walk newest first.
            for ts in reversed(list(self._xpu_job_metrics.keys())):
                node_values = self._xpu_job_metrics[ts].values()
                mean = sum(
                    node.get_avg_metric(metric) for node in node_values
                ) / len(node_values)
                series[ts] = round(mean, 2)
                remaining -= 1
                if remaining <= 0:
                    break
            return series
        except Exception as e:
            logger.error(f"bt_avg_metrics {metric} error: {e}")
            return None

def backtrace_node_metrics(self, metric, depth):
    """Walk the job metrics history, newest first, listing per-node averages.

    Args:
        metric: metric name, e.g. GPU_TENSOR_UTIL.
        depth: maximum number of timestamps to walk back.

    Returns:
        OrderedDict mapping timestamp -> list of each node's average value of
        ``metric``, newest timestamp first, or None on error.
    """
    with self._lock:
        try:
            history = OrderedDict()
            remaining = depth
            for ts in reversed(list(self._xpu_job_metrics.keys())):
                history[ts] = [
                    node.get_avg_metric(metric)
                    for node in self._xpu_job_metrics[ts].values()
                ]
                remaining -= 1
                if remaining <= 0:
                    break
            return history
        except Exception as e:
            logger.error(f"bt_node_metrics {metric} error: {e}")
            return None

def backtrace_rank_metrics(self, metric, depth):
    """Walk the job metrics history, newest first, listing per-rank values.

    Args:
        metric: metric name, e.g. GPU_TENSOR_UTIL.
        depth: maximum number of timestamps to walk back.

    Returns:
        OrderedDict mapping timestamp -> list of each node's per-rank metric
        list for ``metric``, newest timestamp first, or None on error.
    """
    with self._lock:
        try:
            history = OrderedDict()
            remaining = depth
            for ts in reversed(list(self._xpu_job_metrics.keys())):
                history[ts] = [
                    node.get_node_metrics(metric)
                    for node in self._xpu_job_metrics[ts].values()
                ]
                remaining -= 1
                if remaining <= 0:
                    break
            return history
        except Exception as e:
            logger.error(f"bt_rank_metrics {metric} error: {e}")
            return None

def log_job_metric(self, metric):
"""
print job avg metrics of type metric among all nodes
"""
try:
for tm, metrics in self._xpu_job_metrics.items():
dt_obj = datetime.fromtimestamp(tm)
dt_str = "{}-{}-{} {}:{}:{}".format(
dt_obj.year,
dt_obj.month,
dt_obj.day,
dt_obj.hour,
dt_obj.minute,
dt_obj.second,
)
total = 0
for v in metrics.values():
total += v.get_avg_metric(metric)

logger.info(
f"{metric}[{dt_str}]: "
f"{round(total/len(metrics.values()), 2)}"
)

except Exception as e:
logger.error(f"log_job_metric error: {e}")

def add_node_metrics(
self, timestamp: int, metrics: Dict[str, XpuNodeMetric]
Expand Down
26 changes: 26 additions & 0 deletions dlrover/python/common/metric/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ def __init__(self):
def update_avg_metrics(self):
pass

@abstractmethod
def get_avg_metric(self, metric):
    """Return this node's averaged value of ``metric`` (subclass-specific)."""
    pass

@abstractmethod
def get_node_metrics(self, metric):
    """Return a list of per-device values of ``metric`` (subclass-specific)."""
    pass


class GpuNodeMetric(XpuNodeMetric):
"""
Expand All @@ -155,6 +163,15 @@ def __init__(self):
self.node_metrics: Dict[int, GpuMetric] = {}
self.avg_metrics = GpuMetric()

def get_avg_metric(self, metric):
    """Return the node-averaged value of ``metric`` from the GPU averages."""
    return self.avg_metrics.get_metric(metric)

def get_node_metrics(self, metric):
    """Collect ``metric`` from every GPU on this node, as a list.

    Order follows ``self.node_metrics`` iteration order.
    """
    return [dev.get_metric(metric) for dev in self.node_metrics.values()]

def update_avg_metrics(self):
self.avg_metrics.metrics[GpuMetricEnum.GPU_FREE_MEM] = 0
self.avg_metrics.metrics[GpuMetricEnum.GPU_USED_MEM] = 0
Expand Down Expand Up @@ -217,6 +234,15 @@ def __init__(self):
self.node_metrics: Dict[int, NpuMetric] = {}
self.avg_metrics = NpuMetric()

def get_avg_metric(self, metric):
    """Return the node-averaged value of ``metric`` from the NPU averages."""
    return self.avg_metrics.get_metric(metric)

def get_node_metrics(self, metric):
    """Collect ``metric`` from every NPU on this node, as a list.

    Order follows ``self.node_metrics`` iteration order.
    """
    return [card.get_metric(metric) for card in self.node_metrics.values()]

def update_avg_metrics(self):
for _, metric in self.node_metrics.items():
self.avg_metrics.metrics[
Expand Down
Loading

0 comments on commit 3dadb9a

Please sign in to comment.