Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【WIP】add pod diagnosis feature #1219

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dlrover/python/common/diagnosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

from abc import ABCMeta, abstractmethod
from datetime import datetime
from typing import List

from kubernetes.client import V1Pod

from dlrover.python.util.file_util import read_last_n_lines

Expand All @@ -21,6 +24,7 @@
CUDALOG = "cuda_log"
TRAININGLOG = "training_log"
CHIPMETRICES = "chip_metrics"
K8SPODDATA = "k8s_pod_data"


class DiagnosisData(metaclass=ABCMeta):
Expand Down Expand Up @@ -78,6 +82,22 @@
def get_type(self) -> str:
return DiagnosisDataType.CHIPMETRICES

class K8sPodData(DiagnosisData):
    """Diagnosis data that carries a list of Kubernetes pods.

    NOTE(review): a collaborator on this change advised against
    referencing ``V1Pod`` from the common package, because it requires
    users to have the ``kubernetes`` dependency in their environment.

    Args:
        timestamp: Time associated with the data. Passing ``0`` stamps
            the object with the current time in whole seconds.
        pods: The pod objects to store.
    """

    def __init__(self, timestamp: int, pods: List[V1Pod]):
        super().__init__()
        # 0 is the "not provided" marker: fall back to the current time.
        self.timestamp = (
            int(round(datetime.now().timestamp()))
            if timestamp == 0
            else timestamp
        )
        self.pods = pods

    def get_timestamp(self) -> float:
        """Return the timestamp stored at construction."""
        return self.timestamp

    def get_type(self) -> str:
        """Return ``DiagnosisDataType.K8SPODDATA``."""
        return DiagnosisDataType.K8SPODDATA

Check warning on line 99 in dlrover/python/common/diagnosis.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/diagnosis.py#L99

Added line #L99 was not covered by tests


def node_failed(log_file: str) -> bool:
if len(log_file) == 0:
Expand Down
26 changes: 18 additions & 8 deletions dlrover/python/master/diagnosis/diagnosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time

from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.singleton import Singleton
from dlrover.python.master.diagnosis.diagnosis_data import (
DataManager,
DiagnosisData,
Expand All @@ -28,7 +29,7 @@
)


class DiagnosisManager:
class DiagnosisManager(Singleton):
def __init__(self):
self.data_manager: DataManager = DataManager(600)
self.diagnostician: Diagnostician = Diagnostician(self.data_manager)
Expand All @@ -43,13 +44,18 @@
InferenceName.TRAINING,
InferenceAttribute.ISORNOT,
InferenceDescription.HANG,
),
Inference(
InferenceName.POD,
InferenceAttribute.ISORNOT,
InferenceDescription.PENDING,
)
]
self.diagnostician.register_problems(problems)

try:
thread = threading.Thread(
target=self._diagnose_failures(),
target=self._diagnose_failures,
name="diagnose_failures",
daemon=True,
)
Expand All @@ -67,10 +73,14 @@
def _diagnose_failures(self):
    """Periodically observe training problems and log their root causes.

    Runs forever. Each round asks the diagnostician for observed
    problems, diagnoses each one and logs the results, then sleeps 180
    seconds. Any exception raised during a round is logged as a warning
    so that the loop keeps running.
    """
    logger.info("Start to diagnose failures")
    while True:
        # Only the guarded variant is kept: running the same diagnosis a
        # second time outside the try block would duplicate the work and
        # let a single exception terminate the loop.
        try:
            observed_problems = self.diagnostician.observe_training()
            for problem in observed_problems:
                logger.info(f"observed problems: {problem}")
                root_causes = self.diagnostician.diagnose_failure(problem)
                # diagnose_failure may return None; nothing to log then.
                if root_causes is not None:
                    for root_cause in root_causes:
                        logger.info(f"identify root cause: {root_cause}")
        except Exception as e:
            logger.warning(e)
        time.sleep(180)
2 changes: 2 additions & 0 deletions dlrover/python/master/diagnosis/inferencechain/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
class InferenceName:
END = "end"
TRAINING = "training"
POD = "pod"


class InferenceAttribute:
Expand All @@ -27,6 +28,7 @@ class InferenceAttribute:

class InferenceDescription:
HANG = "hang"
PENDING = "pending"


@dataclass
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2024 The DLRover Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

import datetime

from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.diagnosis import K8sPodData, DiagnosisDataType
from dlrover.python.master.diagnosis.diagnosis_data import DataManager
from dlrover.python.master.diagnosis.inferencechain.common import (
Inference,
InferenceAttribute,
InferenceDescription,
InferenceName,
InferenceOperator,
)


class CheckPodPendingOperator(InferenceOperator):
    """Inference operator that reports pods stuck in the ``Pending`` phase.

    The operator reads the most recent ``K8sPodData`` entry from the data
    manager and logs how long each pending pod has been in its latest
    condition. It does not produce follow-up inferences yet, so ``infer``
    always returns an empty list.
    """

    # Minutes a pod may stay pending before it is reported for restart.
    PENDING_TIMEOUT_MINUTES = 15

    def __init__(self, data_manager: DataManager):
        self.data_manager = data_manager

    def is_compatible(self, inference: Inference) -> bool:
        """Return True only for the "pod / is-or-not / pending" inference."""
        return (
            inference.name == InferenceName.POD
            and inference.attribution == InferenceAttribute.ISORNOT
            and inference.description == InferenceDescription.PENDING
        )

    def infer(self, inferences: List[Inference]) -> List[Inference]:
        """Log pending pods found in the latest collected pod data.

        Args:
            inferences: The inferences being processed (not read here).

        Returns:
            Always an empty list; restarting pending pods is still a TODO.
        """
        pod_data = self.data_manager.get_data(DiagnosisDataType.K8SPODDATA)
        if not pod_data:
            logger.info('[PodPendingChecker] No pod data collected yet.')
            return []

        # Only the last (most recently appended) entry is inspected.
        k8s_pod_data = pod_data[-1]
        if not isinstance(k8s_pod_data, K8sPodData):
            logger.info(
                '[PodPendingChecker] data is not instance of K8sPodData.'
            )
            return []

        pods = k8s_pod_data.pods
        logger.info(
            f'[PodPendingChecker] {len(pods)} pods collected at '
            f'{k8s_pod_data.timestamp}'
        )

        for pod in pods:
            if pod.status.phase != 'Pending':
                continue
            conditions = pod.status.conditions
            if not conditions:
                logger.info(
                    f'[PodPendingChecker] Pod {pod.metadata.name} '
                    f'has no conditions.'
                )
                continue
            start_time = conditions[-1].last_transition_time
            if start_time is None:
                logger.info(
                    f'[PodPendingChecker] Pod {pod.metadata.name} '
                    f'has no transition time.'
                )
                continue
            # This module does ``import datetime``, so the class must be
            # qualified. Using the tzinfo of start_time keeps the
            # subtraction valid for both naive and timezone-aware values.
            now = datetime.datetime.now(tz=start_time.tzinfo)
            time_difference = (now - start_time).total_seconds() / 60
            logger.info(
                f'[PodPendingChecker] Pod {pod.metadata.name} is pending '
                f'for {time_difference} minutes'
            )
            if time_difference > self.PENDING_TIMEOUT_MINUTES:
                # TODO: add inference and do restart for pod
                logger.info(
                    f'[PodPendingChecker] TODO: restart Pod '
                    f'{pod.metadata.name}'
                )
        return []

Check warning on line 70 in dlrover/python/master/diagnosis/operator/check_pod_pending_operator.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/master/diagnosis/operator/check_pod_pending_operator.py#L69-L70

Added lines #L69 - L70 were not covered by tests
7 changes: 5 additions & 2 deletions dlrover/python/master/diagnosis/operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
from dlrover.python.master.diagnosis.inferencechain.common import (
InferenceOperator,
)
from dlrover.python.master.diagnosis.operator.check_pod_pending_operator import CheckPodPendingOperator
from dlrover.python.master.diagnosis.operator.check_training_hang_operator import ( # noqa: E501
CheckTrainingHangOperator,
)


def register_operators(data_manager: DataManager) -> List[InferenceOperator]:
    """Build the inference operators, all sharing ``data_manager``.

    Returns:
        The training-hang checker followed by the pod-pending checker.
    """
    # A leftover single-operator return placed before this one would make
    # the pod-pending checker unreachable, so only the full list is kept.
    return [
        CheckTrainingHangOperator(data_manager),
        CheckPodPendingOperator(data_manager),
    ]
44 changes: 44 additions & 0 deletions dlrover/python/master/monitor/pod_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import threading
import time

from dlrover.python.common.diagnosis import K8sPodData, DiagnosisDataType
from dlrover.python.common.log import default_logger as logger
from dlrover.python.master.diagnosis.diagnosis import DiagnosisManager
from dlrover.python.master.watcher.k8s_watcher import K8sPodWatcher
from dlrover.python.scheduler.job import JobArgs


class PodMonitor(object):
    """Background poller that feeds pod listings to the diagnosis manager.

    NOTE(review): a collaborator on this change pointed out that this
    implementation duplicates ``_monitor_nodes`` in dist_job_manager.py.
    """

    def __init__(self, job_args: JobArgs):
        self._stopped = False
        self._k8s_pod_watcher = K8sPodWatcher(
            job_args.job_name, job_args.namespace
        )

    def start(self):
        """Start Detecting. The method should be called only once."""
        worker = threading.Thread(
            target=self._monitor_pod,
            name="pod-monitor",
            daemon=True,
        )
        worker.start()

    def stop(self):
        """Ask the monitoring loop to exit at its next stop check."""
        self._stopped = True

    def _monitor_pod(self):
        """Poll pods until stopped, pausing 5 seconds after each round."""
        logger.info("Start monitoring pod events.")
        while True:
            logger.info("PodMonitor: monitoring pods")
            if self._stopped:
                logger.info("Stop monitoring pods.")
                return
            self._collect_pod_data()
            time.sleep(5)

    def _collect_pod_data(self):
        """List the pods once and hand them to the diagnosis manager.

        Any failure is logged as a warning and followed by a 30 second
        back-off before returning to the polling loop.
        """
        try:
            pods = self._k8s_pod_watcher.list()
            logger.info(f"PodMonitor: get pods {len(pods)}")
            # A timestamp of 0 is passed through to K8sPodData unchanged.
            data = K8sPodData(0, pods)
            manager = DiagnosisManager.singleton_instance()
            manager.collect_diagnosis_data(DiagnosisDataType.K8SPODDATA, data)
        except Exception as e:
            logger.warning(e)
            time.sleep(30)

4 changes: 4 additions & 0 deletions dlrover/python/master/node/dist_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.node import Node, NodeGroupResource
from dlrover.python.master.monitor.error_monitor import K8sJobErrorMonitor
from dlrover.python.master.monitor.pod_monitor import PodMonitor
from dlrover.python.master.node.event_callback import (
ClusterContext,
NodeEventCallback,
Expand Down Expand Up @@ -163,6 +164,8 @@ def __init__(
self._scaler: Scaler = job_scaler
self._init_training_node_manager()

self._pod_monitor = PodMonitor(job_args)

def start(self):
self._scaler.start()
self._job_optimizer.update_job_uuid(self._job_args.job_uuid)
Expand Down Expand Up @@ -203,6 +206,7 @@ def start(self):
name="scaleplan_monitor",
daemon=True,
).start()
self._pod_monitor.start()

def _has_running_workers(self):
nodes = self._node_watcher.list()
Expand Down
2 changes: 1 addition & 1 deletion dlrover/python/master/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def __init__(
self._job_manager: JobManager = job_manager
self._speed_monitor = speed_monitor
self._rdzv_managers = rdzv_managers
self._diagnosis_manager: DiagnosisManager = DiagnosisManager()
self._diagnosis_manager: DiagnosisManager = DiagnosisManager.singleton_instance()
self._kv_store = KVStoreService()
self._job_metric_collector: JobMetricCollector = job_metric_collector
self._elastic_ps_service: ElasticPsService = elastic_ps_service
Expand Down
23 changes: 23 additions & 0 deletions dlrover/python/master/watcher/k8s_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import List

from kubernetes import client, watch
from kubernetes.client import V1Pod

from dlrover.python.common.constants import (
ElasticJobApi,
Expand Down Expand Up @@ -370,3 +371,25 @@
name=scale_crd["metadata"]["name"],
body=scale_crd,
)


class K8sPodWatcher:
    """Lists the pods of a job on the cluster by label selector.

    Unlike PodWatcher, the pods are returned as they are and are not
    converted to nodes.
    """

    def __init__(self, job_name, namespace):
        self._job_name = job_name
        self._namespace = namespace
        self._k8s_client = k8sClient.singleton_instance(namespace)
        # Label selector of the form "<job key>=<job name>".
        self._job_selector = ElasticJobLabel.JOB_KEY + "=" + self._job_name

    def list(self) -> List[V1Pod]:
        """Return the job's pods, or an empty list when there are none."""
        pod_list = self._k8s_client.list_namespaced_pod(self._job_selector)
        # Both a falsy response and a falsy ``items`` yield an empty list.
        pods = pod_list.items if pod_list else None
        return pods if pods else []

Loading