Skip to content

Commit

Permalink
feat(job_monitor): log pod errors and disruptions as warnings (#468)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlemesh authored and tiborsimko committed Oct 21, 2024
1 parent 88b0acb commit db9c258
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 4 deletions.
30 changes: 26 additions & 4 deletions reana_job_controller/job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def get_job_status(self, job_pod) -> Optional[str]:
f"Kubernetes job {backend_job_id}, assuming successful."
)
elif reason != "Completed":
logging.info(
logging.warn(
f"Kubernetes job id: {backend_job_id} failed, phase 'Succeeded' but "
f"container '{container.name}' was terminated because of '{reason}'."
)
Expand Down Expand Up @@ -199,19 +199,19 @@ def get_job_status(self, job_pod) -> Optional[str]:
continue

if "ErrImagePull" in reason:
logging.info(
logging.warn(
f"Container {container.name} in Kubernetes job {backend_job_id} "
"failed to fetch image."
)
status = JobStatus.failed.name
elif "InvalidImageName" in reason:
logging.info(
logging.warn(
f"Container {container.name} in Kubernetes job {backend_job_id} "
"failed due to invalid image name."
)
status = JobStatus.failed.name
elif "CreateContainerConfigError" in reason:
logging.info(
logging.warn(

Check warning on line 214 in reana_job_controller/job_monitor.py

View check run for this annotation

Codecov / codecov/patch

reana_job_controller/job_monitor.py#L214

Added line #L214 was not covered by tests
f"Container {container.name} in Kubernetes job {backend_job_id} "
f"failed due to container configuration error: {message}"
)
Expand Down Expand Up @@ -247,6 +247,11 @@ def watch_jobs(self, job_db, app=None):
backend_job_id, job_pod=job_pod
)

if job_status == JobStatus.failed.name:
self.log_disruption(

Check warning on line 251 in reana_job_controller/job_monitor.py

View check run for this annotation

Codecov / codecov/patch

reana_job_controller/job_monitor.py#L250-L251

Added lines #L250 - L251 were not covered by tests
event["object"].status.conditions, backend_job_id
)

store_job_logs(reana_job_id, logs)
update_job_status(reana_job_id, job_status)

Expand All @@ -260,6 +265,23 @@ def watch_jobs(self, job_db, app=None):
logging.error(traceback.format_exc())
logging.error("Unexpected error: {}".format(e))

def log_disruption(self, conditions, backend_job_id):
    """Log disruption message from Kubernetes event conditions.

    Usually it is pod eviction but can be any of
    https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions.

    Only the first condition whose type is ``DisruptionTarget`` is reported.
    Nothing is logged when no such condition is present.

    :param conditions: List of Kubernetes event conditions. ``None`` or an
        empty list is treated as "no conditions".
    :param backend_job_id: Backend job ID.
    """
    # The condition list may be absent (``None``) rather than empty. This
    # method is invoked before the job logs and status are stored, so it must
    # not raise on a missing list.
    disruption_target = next(
        (item for item in (conditions or []) if item.type == "DisruptionTarget"),
        None,
    )
    if disruption_target:
        # NOTE: ``logging.warn`` is the deprecated spelling of
        # ``logging.warning``; it is kept here because the module's other
        # warnings and its unit test use this exact name.
        logging.warn(
            f"{disruption_target.reason}: Job {backend_job_id} was disrupted: {disruption_target.message}"
        )


condorJobStatus = {
"Unexpanded": 0,
Expand Down
59 changes: 59 additions & 0 deletions tests/test_job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import mock
import pytest
from kubernetes.client.models import V1PodCondition

from reana_job_controller.job_monitor import (
JobMonitorHTCondorCERN,
Expand Down Expand Up @@ -107,3 +108,61 @@ def test_kubernetes_should_process_job(
)

assert bool(job_monitor_k8s.should_process_job(job_pod_event)) == should_process


@pytest.mark.parametrize(
    "conditions,is_call_expected,expected_message",
    [
        # A DisruptionTarget condition among others: a warning is expected.
        (
            [
                V1PodCondition(type="PodScheduled", status="True"),
                V1PodCondition(
                    type="DisruptionTarget",
                    status="True",
                    reason="EvictionByEvictionAPI",
                    message="Eviction API: evicting",
                ),
                V1PodCondition(type="Initialized", status="True"),
            ],
            True,
            "EvictionByEvictionAPI: Job backend_job_id was disrupted: Eviction API: evicting",
        ),
        # Conditions present, but none is a DisruptionTarget: no warning.
        (
            [
                V1PodCondition(type="PodScheduled", status="True"),
                V1PodCondition(type="Initialized", status="True"),
            ],
            False,
            "",
        ),
        # No conditions at all: no warning.
        ([], False, ""),
    ],
)
def test_log_disruption_evicted(conditions, is_call_expected, expected_message):
    """Check that ``log_disruption`` warns only for a DisruptionTarget condition.

    :param conditions: Pod conditions passed to ``log_disruption``.
    :param is_call_expected: Whether a warning is expected to be logged.
    :param expected_message: Exact warning text expected when a call is made.
    """
    # Patching ``threading`` presumably keeps the monitor constructor from
    # starting a real thread -- confirm against JobMonitorKubernetes.__init__.
    threading_patch = mock.patch("reana_job_controller.job_monitor.threading")
    warn_patch = mock.patch("reana_job_controller.job_monitor.logging.warn")

    with threading_patch, warn_patch as warn_mock:
        monitor = JobMonitorKubernetes(app=None)
        monitor.log_disruption(conditions, "backend_job_id")

        if not is_call_expected:
            warn_mock.assert_not_called()
        else:
            warn_mock.assert_called_with(expected_message)

0 comments on commit db9c258

Please sign in to comment.