Checks to see if handle_event errors out for any event
EmanElsaban committed Dec 4, 2023
1 parent 425b32f commit 15cb1dd
Showing 1 changed file with 160 additions and 124 deletions.
284 changes: 160 additions & 124 deletions tron/kubernetes.py
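In outline, the change wraps KubernetesTask.handle_event in one big try/except and, when handling fails, logs the traceback and ships the raw event to a scratch clog stream. A minimal sketch of that pattern, for orientation only (process() and handle_event_safely() are illustrative stand-ins, not Tron code; the real handler follows in the diff):

import json
import logging

log = logging.getLogger(__name__)

try:
    import clog  # type: ignore  # optional log shipper, configured as in the diff below
except ImportError:
    clog = None


def process(event: dict) -> None:
    """Stand-in for the real event-handling logic."""
    if event.get("platform_type") is None:
        raise ValueError("event is missing a platform_type")


def handle_event_safely(event: dict) -> None:
    # Wrap the whole handler so that a failure is recorded instead of silently
    # swallowing the event: log the traceback, then capture the event itself.
    try:
        process(event)
    except Exception:
        log.exception(f"unable to handle an event: {event}")
        if clog is None:
            log.debug("Clog logger unavailable. Unable to log event")
        else:
            # clog.log_line takes a stream name and a string payload
            clog.log_line("tmp_missed_tronevents", json.dumps(event))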
@@ -1,4 +1,5 @@
import logging
import json
from logging import Logger
from typing import cast
from typing import Collection
@@ -41,8 +42,23 @@

log = logging.getLogger(__name__)
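# clog is treated as an optional dependency below: when it imports, it is configured to ship
# log lines via Scribe; otherwise it is left as None so call sites can check for availability.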

try:
    import clog  # type: ignore

    clog.config.configure(
        scribe_host="169.254.255.254",
        scribe_port=1463,
        monk_disable=False,
        scribe_disable=False,
    )
except ImportError:
    clog = None


def combine_volumes(defaults: Collection[ConfigVolume], overrides: Collection[ConfigVolume],) -> List[ConfigVolume]:
def combine_volumes(
    defaults: Collection[ConfigVolume],
    overrides: Collection[ConfigVolume],
) -> List[ConfigVolume]:
"""Helper to reconcile lists of volume mounts.
If any volumes have the same container path, the one in overrides wins.
@@ -122,136 +138,150 @@ def handle_event(self, event: Event) -> None:
"""
Transitions Tron's state machine for this task based on events from task_processing.
"""
event_id = getattr(event, "task_id", None)
if event_id != self.get_kubernetes_id():
self.log.warning(
f"Event task id (id={event_id}) does not match current task id (id={self.get_kubernetes_id()}), ignoring.",
)
return

k8s_type = getattr(event, "platform_type", None)
self.log.info(f"Got event for task={event_id} (Kubernetes type={k8s_type}).")

try:
self.log_event_info(event=event)
except Exception:
self.log.exception(f"Unable to log event info for id={event_id}.")

if k8s_type == "running":
self.started()
elif k8s_type in KUBERNETES_TERMINAL_TYPES:
raw_object = getattr(event, "raw", {}) or {}
pod_status = raw_object.get("status", {}) or {}
container_statuses = pod_status.get("containerStatuses", []) or []
exit_code = 0 if k8s_type == "finished" else exitcode.EXIT_KUBERNETES_ABNORMAL

if len(container_statuses) > 1 or len(container_statuses) == 0:
# shouldn't happen right now, but who knows what future us will do :p
self.log.error(
"Got an event for a Pod with zero or multiple containers - not inspecting payload to verify success."
# Might be that somewhere in this function, something is throwing an exception and we're not handling it
# wrap this whole function in a big block of try/except block
# if handle_event errors out for any reason then we will never see this event at all
event_id = getattr(event, "task_id", None)
if event_id != self.get_kubernetes_id():
self.log.warning(
f"Event task id (id={event_id}) does not match current task id (id={self.get_kubernetes_id()}), ignoring.",
)
self.log.error(f"Event with >1 || 0 containers: {raw_object}")
else:
main_container_statuses = container_statuses[0]
main_container_state = main_container_statuses.get("state", {}) or {}
main_container_last_state = main_container_statuses.get("lastState", {}) or {}
return

event_missing_state = not main_container_state
event_missing_previous_state = not main_container_last_state
k8s_type = getattr(event, "platform_type", None)
self.log.info(f"Got event for task={event_id} (Kubernetes type={k8s_type}).")

# We are expecting this code to never be hit as we are expecting both state and last_state have values
# The else statement should handle the situation gracefully when either current/last state are missing
if event_missing_state and event_missing_previous_state:
try:
self.log_event_info(event=event)
except Exception:
self.log.exception(f"Unable to log event info for id={event_id}.")

if k8s_type == "running":
self.started()
elif k8s_type in KUBERNETES_TERMINAL_TYPES:
raw_object = getattr(event, "raw", {}) or {}
pod_status = raw_object.get("status", {}) or {}
container_statuses = pod_status.get("containerStatuses", []) or []
exit_code = 0 if k8s_type == "finished" else exitcode.EXIT_KUBERNETES_ABNORMAL

if len(container_statuses) > 1 or len(container_statuses) == 0:
# shouldn't happen right now, but who knows what future us will do :p
self.log.error(
f"Got an event with missing state - assuming {'success' if exit_code==0 else 'failure'}."
"Got an event for a Pod with zero or multiple containers - not inspecting payload to verify success."
)
self.log.error(f"Event with missing state: {raw_object}")
self.log.error(f"Event with >1 || 0 containers: {raw_object}")
else:
state_termination_metadata = main_container_state.get("terminated", {}) or {}
last_state_termination_metadata = main_container_last_state.get("terminated", {}) or {}
if k8s_type == "finished":
# this is kinda wild: we're seeing that a kubelet will sometimes fail to start a container (usually
# due to what appear to be race conditons like those mentioned in
# https://github.com/kubernetes/kubernetes/issues/100047#issuecomment-797624208) and then decide that
# these Pods should be phase=Succeeded with an exit code of 0 - even though the container never actually
# started. So far, we've noticed that when this happens, the finished_at and reason fields will be None
# and thus we'll check for at least one of these conditions to detect an abnormal exit and actually "fail"
# the affected action
# NOTE: hopefully this won't change too drastically in future k8s upgrades without the actual problem (incorrect
# success) being fixed :p
if state_termination_metadata.get("exitCode") == 0 and (
state_termination_metadata.get("finishedAt") is None
and state_termination_metadata.get("reason") is None
):
exit_code = exitcode.EXIT_KUBERNETES_ABNORMAL
self.log.warning("Container never started due to a Kubernetes/infra flake!")
self.log.warning(
f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry."
)
elif k8s_type in KUBERNETES_FAILURE_TYPES:
# pod killed before it reached terminal state, assume node scaledown
if not (state_termination_metadata or last_state_termination_metadata):
self.log.warning("Container did not complete, likely due to scaling down a node.")
exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN

# Handling spot terminations
elif (
last_state_termination_metadata.get("exitCode") == 137
and last_state_termination_metadata.get("reason") == "ContainerStatusUnknown"
):
exit_code = exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION
self.log.warning("Tronjob failed due to spot interruption.")
# Handling K8s scaling down a node
elif state_termination_metadata.get("exitCode") == 143 and (
state_termination_metadata.get("reason") == "Error"
):
exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN
self.log.warning("Tronjob failed due to Kubernetes scaling down a node.")
else:
# Capture the real exit code
state_exit_code = state_termination_metadata.get("exitCode")
last_state_exit_code = last_state_termination_metadata.get("exitCode")
if state_exit_code:
exit_code = state_exit_code
elif last_state_exit_code:
exit_code = last_state_exit_code

if exit_code in KUBERNETES_LOST_NODE_EXIT_CODES:
self.log.warning(
f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry."
)
self.log.warning(
"If this action is idempotent, then please consider enabling automatic retries for your action. If your action is not idempotent, then please configure this action to run on the stable pool rather than the default."
)
self.exited(exit_code)
elif k8s_type == "lost":
# Using 'lost' instead of 'unknown' for now until we are sure that before reconcile() is called,
# the tasks inside task_metadata map are all UNKNOWN
self.log.warning("Kubernetes does not know anything about this task, it is LOST")
self.log.warning(
"This can happen for any number of reasons, and Tron can't know if the task ran or not at all!"
)
self.log.warning("If you want Tron to RUN it (again) anyway, retry it with:")
self.log.warning(f" tronctl retry {self.id}")
self.log.warning("If you want Tron to NOT run it and consider it as a success, skip it with:")
self.log.warning(f" tronctl skip {self.id}")
self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:")
self.log.warning(f" tronctl fail {self.id}")
self.exited(None)
else:
self.log.info(f"Did not handle unknown kubernetes event type: {event}",)
main_container_statuses = container_statuses[0]
main_container_state = main_container_statuses.get("state", {}) or {}
main_container_last_state = main_container_statuses.get("lastState", {}) or {}

event_missing_state = not main_container_state
event_missing_previous_state = not main_container_last_state

# We are expecting this code to never be hit as we are expecting both state and last_state have values
# The else statement should handle the situation gracefully when either current/last state are missing
if event_missing_state and event_missing_previous_state:
self.log.error(
f"Got an event with missing state - assuming {'success' if exit_code==0 else 'failure'}."
)
self.log.error(f"Event with missing state: {raw_object}")
else:
state_termination_metadata = main_container_state.get("terminated", {}) or {}
last_state_termination_metadata = main_container_last_state.get("terminated", {}) or {}
if k8s_type == "finished":
# this is kinda wild: we're seeing that a kubelet will sometimes fail to start a container (usually
# due to what appear to be race conditons like those mentioned in
# https://github.com/kubernetes/kubernetes/issues/100047#issuecomment-797624208) and then decide that
# these Pods should be phase=Succeeded with an exit code of 0 - even though the container never actually
# started. So far, we've noticed that when this happens, the finished_at and reason fields will be None
# and thus we'll check for at least one of these conditions to detect an abnormal exit and actually "fail"
# the affected action
# NOTE: hopefully this won't change too drastically in future k8s upgrades without the actual problem (incorrect
# success) being fixed :p
if state_termination_metadata.get("exitCode") == 0 and (
state_termination_metadata.get("finishedAt") is None
and state_termination_metadata.get("reason") is None
):
exit_code = exitcode.EXIT_KUBERNETES_ABNORMAL
self.log.warning("Container never started due to a Kubernetes/infra flake!")
self.log.warning(
f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry."
)
elif k8s_type in KUBERNETES_FAILURE_TYPES:
# pod killed before it reached terminal state, assume node scaledown
if not (state_termination_metadata or last_state_termination_metadata):
self.log.warning("Container did not complete, likely due to scaling down a node.")
exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN

# Handling spot terminations
elif (
last_state_termination_metadata.get("exitCode") == 137
and last_state_termination_metadata.get("reason") == "ContainerStatusUnknown"
):
exit_code = exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION
self.log.warning("Tronjob failed due to spot interruption.")
# Handling K8s scaling down a node
elif state_termination_metadata.get("exitCode") == 143 and (
state_termination_metadata.get("reason") == "Error"
):
exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN
self.log.warning("Tronjob failed due to Kubernetes scaling down a node.")
else:
# Capture the real exit code
state_exit_code = state_termination_metadata.get("exitCode")
last_state_exit_code = last_state_termination_metadata.get("exitCode")
if state_exit_code:
exit_code = state_exit_code
elif last_state_exit_code:
exit_code = last_state_exit_code

if exit_code in KUBERNETES_LOST_NODE_EXIT_CODES:
self.log.warning(
f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry."
)
self.log.warning(
"If this action is idempotent, then please consider enabling automatic retries for your action. If your action is not idempotent, then please configure this action to run on the stable pool rather than the default."
)
self.exited(exit_code)

elif k8s_type == "lost":
# Using 'lost' instead of 'unknown' for now until we are sure that before reconcile() is called,
# the tasks inside task_metadata map are all UNKNOWN
self.log.warning("Kubernetes does not know anything about this task, it is LOST")
self.log.warning(
"This can happen for any number of reasons, and Tron can't know if the task ran or not at all!"
)
self.log.warning("If you want Tron to RUN it (again) anyway, retry it with:")
self.log.warning(f" tronctl retry {self.id}")
self.log.warning("If you want Tron to NOT run it and consider it as a success, skip it with:")
self.log.warning(f" tronctl skip {self.id}")
self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:")
self.log.warning(f" tronctl fail {self.id}")
self.exited(None)
else:
self.log.info(
f"Did not handle unknown kubernetes event type: {event}",
)

if event.terminal:
self.log.info("This Kubernetes event was terminal, ending this action")
self.report_resources(decrement=True)
if event.terminal:
self.log.info("This Kubernetes event was terminal, ending this action")
self.report_resources(decrement=True)

exit_code = int(not getattr(event, "success", False))
# Returns False if we've already exited normally above
unexpected_error = self.exited(exit_code)
if unexpected_error:
self.log.error("Unexpected failure, exiting")
exit_code = int(not getattr(event, "success", False))
# Returns False if we've already exited normally above
unexpected_error = self.exited(exit_code)
if unexpected_error:
self.log.error("Unexpected failure, exiting")

self.done()
self.done()
except Exception:
self.log.exception(f"unable to handle an event for id={event_id} for event={str(event)}")
#clog here and make sure the message is a string
if clog is None:
log.debug("Clog logger unavailable. Unable to log event")
else:
clog.log_line("tmp_missed_tronevents", json.dumps(event)) # Capture event
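                # NOTE: the json.dumps above assumes the event is JSON-serializable; if it is not,
                # that call raises inside this except handler and the event is not captured.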


class KubernetesCluster:
@@ -338,12 +368,14 @@ def handle_next_event(self, _=None) -> None:
* control: events regarding how the task_proc plugin is running - handled directly
* task: events regarding how the actual tasks/Pods we're running are doing - forwarded to KubernetesTask
"""
if self.deferred and not self.deferred.called:
if self.deferred is not None and not self.deferred.called:
log.warning("Already have handlers waiting for next event in queue, not adding more")
return

self.deferred = self.queue.get()

if self.deferred is None:
log.warning("Unable to get a handler for next event in queue - this should never happen!")
return
# we want to process the event we just popped off the queue, but we also want
# to form a sort of event loop, so we add two callbacks:
# * one to actually deal with the event
@@ -507,7 +539,11 @@ def create_task(
log.error(f"Invalid {task_id} for {action_run_id}")
return None

return KubernetesTask(action_run_id=action_run_id, task_config=task_config, serializer=serializer,)
return KubernetesTask(
action_run_id=action_run_id,
task_config=task_config,
serializer=serializer,
)

def _check_connection(self) -> None:
"""
