diff --git a/tron/kubernetes.py b/tron/kubernetes.py index 857839554..a43c525d1 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -1,4 +1,5 @@ import logging +import json from logging import Logger from typing import cast from typing import Collection @@ -41,8 +42,23 @@ log = logging.getLogger(__name__) +try: + import clog # type: ignore -def combine_volumes(defaults: Collection[ConfigVolume], overrides: Collection[ConfigVolume],) -> List[ConfigVolume]: + clog.config.configure( + scribe_host="169.254.255.254", + scribe_port=1463, + monk_disable=False, + scribe_disable=False, + ) +except ImportError: + clog = None + + +def combine_volumes( + defaults: Collection[ConfigVolume], + overrides: Collection[ConfigVolume], +) -> List[ConfigVolume]: """Helper to reconcile lists of volume mounts. If any volumes have the same container path, the one in overrides wins. @@ -122,136 +138,150 @@ def handle_event(self, event: Event) -> None: """ Transitions Tron's state machine for this task based on events from task_processing. """ - event_id = getattr(event, "task_id", None) - if event_id != self.get_kubernetes_id(): - self.log.warning( - f"Event task id (id={event_id}) does not match current task id (id={self.get_kubernetes_id()}), ignoring.", - ) - return - - k8s_type = getattr(event, "platform_type", None) - self.log.info(f"Got event for task={event_id} (Kubernetes type={k8s_type}).") - try: - self.log_event_info(event=event) - except Exception: - self.log.exception(f"Unable to log event info for id={event_id}.") - - if k8s_type == "running": - self.started() - elif k8s_type in KUBERNETES_TERMINAL_TYPES: - raw_object = getattr(event, "raw", {}) or {} - pod_status = raw_object.get("status", {}) or {} - container_statuses = pod_status.get("containerStatuses", []) or [] - exit_code = 0 if k8s_type == "finished" else exitcode.EXIT_KUBERNETES_ABNORMAL - - if len(container_statuses) > 1 or len(container_statuses) == 0: - # shouldn't happen right now, but who knows what future us will do :p - self.log.error( - "Got an event for a Pod with zero or multiple containers - not inspecting payload to verify success." + # Might be that somewhere in this function, something is throwing an exception and we're not handling it + # wrap this whole function in a big block of try/except block + # if handle_event errors out for any reason then we will never see this event at all + event_id = getattr(event, "task_id", None) + if event_id != self.get_kubernetes_id(): + self.log.warning( + f"Event task id (id={event_id}) does not match current task id (id={self.get_kubernetes_id()}), ignoring.", ) - self.log.error(f"Event with >1 || 0 containers: {raw_object}") - else: - main_container_statuses = container_statuses[0] - main_container_state = main_container_statuses.get("state", {}) or {} - main_container_last_state = main_container_statuses.get("lastState", {}) or {} + return - event_missing_state = not main_container_state - event_missing_previous_state = not main_container_last_state + k8s_type = getattr(event, "platform_type", None) + self.log.info(f"Got event for task={event_id} (Kubernetes type={k8s_type}).") - # We are expecting this code to never be hit as we are expecting both state and last_state have values - # The else statement should handle the situation gracefully when either current/last state are missing - if event_missing_state and event_missing_previous_state: + try: + self.log_event_info(event=event) + except Exception: + self.log.exception(f"Unable to log event info for id={event_id}.") + + if k8s_type == "running": + self.started() + elif k8s_type in KUBERNETES_TERMINAL_TYPES: + raw_object = getattr(event, "raw", {}) or {} + pod_status = raw_object.get("status", {}) or {} + container_statuses = pod_status.get("containerStatuses", []) or [] + exit_code = 0 if k8s_type == "finished" else exitcode.EXIT_KUBERNETES_ABNORMAL + + if len(container_statuses) > 1 or len(container_statuses) == 0: + # shouldn't happen right now, but who knows what future us will do :p self.log.error( - f"Got an event with missing state - assuming {'success' if exit_code==0 else 'failure'}." + "Got an event for a Pod with zero or multiple containers - not inspecting payload to verify success." ) - self.log.error(f"Event with missing state: {raw_object}") + self.log.error(f"Event with >1 || 0 containers: {raw_object}") else: - state_termination_metadata = main_container_state.get("terminated", {}) or {} - last_state_termination_metadata = main_container_last_state.get("terminated", {}) or {} - if k8s_type == "finished": - # this is kinda wild: we're seeing that a kubelet will sometimes fail to start a container (usually - # due to what appear to be race conditons like those mentioned in - # https://github.com/kubernetes/kubernetes/issues/100047#issuecomment-797624208) and then decide that - # these Pods should be phase=Succeeded with an exit code of 0 - even though the container never actually - # started. So far, we've noticed that when this happens, the finished_at and reason fields will be None - # and thus we'll check for at least one of these conditions to detect an abnormal exit and actually "fail" - # the affected action - # NOTE: hopefully this won't change too drastically in future k8s upgrades without the actual problem (incorrect - # success) being fixed :p - if state_termination_metadata.get("exitCode") == 0 and ( - state_termination_metadata.get("finishedAt") is None - and state_termination_metadata.get("reason") is None - ): - exit_code = exitcode.EXIT_KUBERNETES_ABNORMAL - self.log.warning("Container never started due to a Kubernetes/infra flake!") - self.log.warning( - f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry." - ) - elif k8s_type in KUBERNETES_FAILURE_TYPES: - # pod killed before it reached terminal state, assume node scaledown - if not (state_termination_metadata or last_state_termination_metadata): - self.log.warning("Container did not complete, likely due to scaling down a node.") - exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN - - # Handling spot terminations - elif ( - last_state_termination_metadata.get("exitCode") == 137 - and last_state_termination_metadata.get("reason") == "ContainerStatusUnknown" - ): - exit_code = exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION - self.log.warning("Tronjob failed due to spot interruption.") - # Handling K8s scaling down a node - elif state_termination_metadata.get("exitCode") == 143 and ( - state_termination_metadata.get("reason") == "Error" - ): - exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN - self.log.warning("Tronjob failed due to Kubernetes scaling down a node.") - else: - # Capture the real exit code - state_exit_code = state_termination_metadata.get("exitCode") - last_state_exit_code = last_state_termination_metadata.get("exitCode") - if state_exit_code: - exit_code = state_exit_code - elif last_state_exit_code: - exit_code = last_state_exit_code - - if exit_code in KUBERNETES_LOST_NODE_EXIT_CODES: - self.log.warning( - f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry." - ) - self.log.warning( - "If this action is idempotent, then please consider enabling automatic retries for your action. If your action is not idempotent, then please configure this action to run on the stable pool rather than the default." - ) - self.exited(exit_code) - elif k8s_type == "lost": - # Using 'lost' instead of 'unknown' for now until we are sure that before reconcile() is called, - # the tasks inside task_metadata map are all UNKNOWN - self.log.warning("Kubernetes does not know anything about this task, it is LOST") - self.log.warning( - "This can happen for any number of reasons, and Tron can't know if the task ran or not at all!" - ) - self.log.warning("If you want Tron to RUN it (again) anyway, retry it with:") - self.log.warning(f" tronctl retry {self.id}") - self.log.warning("If you want Tron to NOT run it and consider it as a success, skip it with:") - self.log.warning(f" tronctl skip {self.id}") - self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:") - self.log.warning(f" tronctl fail {self.id}") - self.exited(None) - else: - self.log.info(f"Did not handle unknown kubernetes event type: {event}",) + main_container_statuses = container_statuses[0] + main_container_state = main_container_statuses.get("state", {}) or {} + main_container_last_state = main_container_statuses.get("lastState", {}) or {} + + event_missing_state = not main_container_state + event_missing_previous_state = not main_container_last_state + + # We are expecting this code to never be hit as we are expecting both state and last_state have values + # The else statement should handle the situation gracefully when either current/last state are missing + if event_missing_state and event_missing_previous_state: + self.log.error( + f"Got an event with missing state - assuming {'success' if exit_code==0 else 'failure'}." + ) + self.log.error(f"Event with missing state: {raw_object}") + else: + state_termination_metadata = main_container_state.get("terminated", {}) or {} + last_state_termination_metadata = main_container_last_state.get("terminated", {}) or {} + if k8s_type == "finished": + # this is kinda wild: we're seeing that a kubelet will sometimes fail to start a container (usually + # due to what appear to be race conditons like those mentioned in + # https://github.com/kubernetes/kubernetes/issues/100047#issuecomment-797624208) and then decide that + # these Pods should be phase=Succeeded with an exit code of 0 - even though the container never actually + # started. So far, we've noticed that when this happens, the finished_at and reason fields will be None + # and thus we'll check for at least one of these conditions to detect an abnormal exit and actually "fail" + # the affected action + # NOTE: hopefully this won't change too drastically in future k8s upgrades without the actual problem (incorrect + # success) being fixed :p + if state_termination_metadata.get("exitCode") == 0 and ( + state_termination_metadata.get("finishedAt") is None + and state_termination_metadata.get("reason") is None + ): + exit_code = exitcode.EXIT_KUBERNETES_ABNORMAL + self.log.warning("Container never started due to a Kubernetes/infra flake!") + self.log.warning( + f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry." + ) + elif k8s_type in KUBERNETES_FAILURE_TYPES: + # pod killed before it reached terminal state, assume node scaledown + if not (state_termination_metadata or last_state_termination_metadata): + self.log.warning("Container did not complete, likely due to scaling down a node.") + exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN + + # Handling spot terminations + elif ( + last_state_termination_metadata.get("exitCode") == 137 + and last_state_termination_metadata.get("reason") == "ContainerStatusUnknown" + ): + exit_code = exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION + self.log.warning("Tronjob failed due to spot interruption.") + # Handling K8s scaling down a node + elif state_termination_metadata.get("exitCode") == 143 and ( + state_termination_metadata.get("reason") == "Error" + ): + exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN + self.log.warning("Tronjob failed due to Kubernetes scaling down a node.") + else: + # Capture the real exit code + state_exit_code = state_termination_metadata.get("exitCode") + last_state_exit_code = last_state_termination_metadata.get("exitCode") + if state_exit_code: + exit_code = state_exit_code + elif last_state_exit_code: + exit_code = last_state_exit_code + + if exit_code in KUBERNETES_LOST_NODE_EXIT_CODES: + self.log.warning( + f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry." + ) + self.log.warning( + "If this action is idempotent, then please consider enabling automatic retries for your action. If your action is not idempotent, then please configure this action to run on the stable pool rather than the default." + ) + self.exited(exit_code) + + elif k8s_type == "lost": + # Using 'lost' instead of 'unknown' for now until we are sure that before reconcile() is called, + # the tasks inside task_metadata map are all UNKNOWN + self.log.warning("Kubernetes does not know anything about this task, it is LOST") + self.log.warning( + "This can happen for any number of reasons, and Tron can't know if the task ran or not at all!" + ) + self.log.warning("If you want Tron to RUN it (again) anyway, retry it with:") + self.log.warning(f" tronctl retry {self.id}") + self.log.warning("If you want Tron to NOT run it and consider it as a success, skip it with:") + self.log.warning(f" tronctl skip {self.id}") + self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:") + self.log.warning(f" tronctl fail {self.id}") + self.exited(None) + else: + self.log.info( + f"Did not handle unknown kubernetes event type: {event}", + ) - if event.terminal: - self.log.info("This Kubernetes event was terminal, ending this action") - self.report_resources(decrement=True) + if event.terminal: + self.log.info("This Kubernetes event was terminal, ending this action") + self.report_resources(decrement=True) - exit_code = int(not getattr(event, "success", False)) - # Returns False if we've already exited normally above - unexpected_error = self.exited(exit_code) - if unexpected_error: - self.log.error("Unexpected failure, exiting") + exit_code = int(not getattr(event, "success", False)) + # Returns False if we've already exited normally above + unexpected_error = self.exited(exit_code) + if unexpected_error: + self.log.error("Unexpected failure, exiting") - self.done() + self.done() + except Exception: + self.log.exception(f"unable to handle an event for id={event_id} for event={str(event)}") + #clog here and make sure the message is a string + if clog is None: + log.debug("Clog logger unavailable. Unable to log event") + else: + clog.log_line("tmp_missed_tronevents", json.dumps(event)) # Capture event class KubernetesCluster: @@ -338,12 +368,14 @@ def handle_next_event(self, _=None) -> None: * control: events regarding how the task_proc plugin is running - handled directly * task: events regarding how the actual tasks/Pods we're running our doing - forwarded to KubernetesTask """ - if self.deferred and not self.deferred.called: + if self.deferred is not None and not self.deferred.called: log.warning("Already have handlers waiting for next event in queue, not adding more") return self.deferred = self.queue.get() - + if self.deferred is None: + log.warning("Unable to get a handler for next event in queue - this should never happen!") + return # we want to process the event we just popped off the queue, but we also want # to form a sort of event loop, so we add two callbacks: # * one to actually deal with the event @@ -507,7 +539,11 @@ def create_task( log.error(f"Invalid {task_id} for {action_run_id}") return None - return KubernetesTask(action_run_id=action_run_id, task_config=task_config, serializer=serializer,) + return KubernetesTask( + action_run_id=action_run_id, + task_config=task_config, + serializer=serializer, + ) def _check_connection(self) -> None: """