diff --git a/.gitignore b/.gitignore index 2354cd11e..372c00c9d 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ tron.egg-info docs/_build/ .idea .vscode +.fleet tron.iml docs/images/ *.dot diff --git a/requirements-dev-minimal.txt b/requirements-dev-minimal.txt index eb9d38808..eddbadb2b 100644 --- a/requirements-dev-minimal.txt +++ b/requirements-dev-minimal.txt @@ -1,4 +1,5 @@ asynctest +debugpy flake8 mock mypy diff --git a/requirements-dev.txt b/requirements-dev.txt index 41f7131fc..2860357cd 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,6 +1,7 @@ astroid==2.13.3 asynctest==0.12.0 cfgv==2.0.1 +debugpy==1.8.1 dill==0.3.6 distlib==0.3.6 filelock==3.4.1 diff --git a/tests/config/config_parse_test.py b/tests/config/config_parse_test.py index 03fc69122..4a9d2bff5 100644 --- a/tests/config/config_parse_test.py +++ b/tests/config/config_parse_test.py @@ -116,7 +116,7 @@ def make_mesos_options(): def make_k8s_options(): - return schema.ConfigKubernetes(enabled=False, default_volumes=()) + return schema.ConfigKubernetes(enabled=False, non_retryable_exit_codes=(), default_volumes=()) def make_action(**kwargs): diff --git a/tests/kubernetes_test.py b/tests/kubernetes_test.py index 62be2b254..ad2089b01 100644 --- a/tests/kubernetes_test.py +++ b/tests/kubernetes_test.py @@ -447,7 +447,7 @@ def test_handle_event_lost(mock_kubernetes_task): ) ) - assert mock_kubernetes_task.is_unknown + assert mock_kubernetes_task.exit_status == exitcode.EXIT_KUBERNETES_TASK_LOST def test_create_task_disabled(): diff --git a/tron/config/config_parse.py b/tron/config/config_parse.py index 491db7e9c..bc2129ed0 100644 --- a/tron/config/config_parse.py +++ b/tron/config/config_parse.py @@ -867,12 +867,14 @@ class ValidateKubernetes(Validator): defaults = { "kubeconfig_path": None, "enabled": False, + "non_retryable_exit_codes": (), "default_volumes": (), } validators = { "kubeconfig_path": valid_string, "enabled": valid_bool, + "non_retryable_exit_codes": build_list_of_type_validator(valid_int, allow_empty=True), "default_volumes": build_list_of_type_validator(valid_volume, allow_empty=True), "watcher_kubeconfig_paths": build_list_of_type_validator(valid_string, allow_empty=True), } diff --git a/tron/config/schema.py b/tron/config/schema.py index 5f6d36229..948a65110 100644 --- a/tron/config/schema.py +++ b/tron/config/schema.py @@ -119,6 +119,7 @@ def config_object_factory(name, required=None, optional=None): optional=[ "kubeconfig_path", "enabled", + "non_retryable_exit_codes", "default_volumes", "watcher_kubeconfig_paths", ], diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 580cf263c..98eddaabd 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -590,7 +590,9 @@ def fail(self, exit_status=None): return self._done("fail", exit_status) - def _exit_unsuccessful(self, exit_status=None, retry_original_command=True) -> Optional[Union[bool, ActionCommand]]: + def _exit_unsuccessful( + self, exit_status=None, retry_original_command=True, non_retryable_exit_codes=[] + ) -> Optional[Union[bool, ActionCommand]]: if self.is_done: log.info( f"{self} got exit code {exit_status} but already in terminal " f'state "{self.state}", not retrying', @@ -599,13 +601,17 @@ def _exit_unsuccessful(self, exit_status=None, retry_original_command=True) -> O if self.last_attempt is not None: self.last_attempt.exit(exit_status) if self.retries_remaining is not None: - if self.retries_remaining > 0: - self.retries_remaining -= 1 - return self.restart(original_command=retry_original_command) + if exit_status in non_retryable_exit_codes: + self.retries_remaining = 0 + log.info(f"{self} skipping auto-retries, received non-retryable exit code ({exit_status}).") else: - log.info( - f"Reached maximum number of retries: {len(self.attempts)}", - ) + if self.retries_remaining > 0: + self.retries_remaining -= 1 + return self.restart(original_command=retry_original_command) + else: + log.info( + f"Reached maximum number of retries: {len(self.attempts)}", + ) if exit_status is None: return self._done("fail_unknown", exit_status) else: @@ -1314,6 +1320,19 @@ def kill(self, final: bool = True) -> Optional[str]: return "\n".join(msgs) + def _exit_unsuccessful( + self, exit_status=None, retry_original_command=True, non_retryable_exit_codes=[] + ) -> Optional[Union[bool, ActionCommand]]: + + k8s_cluster = KubernetesClusterRepository.get_cluster() + non_retryable_exit_codes = [] if not k8s_cluster else k8s_cluster.non_retryable_exit_codes + + return super()._exit_unsuccessful( + exit_status=exit_status, + retry_original_command=retry_original_command, + non_retryable_exit_codes=non_retryable_exit_codes, + ) + def handle_action_command_state_change( self, action_command: ActionCommand, event: str, event_data=None ) -> Optional[Union[bool, ActionCommand]]: diff --git a/tron/kubernetes.py b/tron/kubernetes.py index 960ec4e7f..e9316064e 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -252,7 +252,7 @@ def handle_event(self, event: Event) -> None: self.log.warning(f" tronctl skip {self.id}") self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:") self.log.warning(f" tronctl fail {self.id}") - self.exited(None) + self.exited(exitcode.EXIT_KUBERNETES_TASK_LOST) else: self.log.info( f"Did not handle unknown kubernetes event type: {event}", @@ -281,10 +281,12 @@ def __init__( default_volumes: Optional[List[ConfigVolume]] = None, pod_launch_timeout: Optional[int] = None, watcher_kubeconfig_paths: Optional[List[str]] = None, + non_retryable_exit_codes: Optional[List[int]] = [], ): # general k8s config self.kubeconfig_path = kubeconfig_path self.enabled = enabled + self.non_retryable_exit_codes = non_retryable_exit_codes self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or [] self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S self.watcher_kubeconfig_paths = watcher_kubeconfig_paths or [] @@ -621,6 +623,7 @@ def recover(self, task: KubernetesTask) -> None: class KubernetesClusterRepository: # Kubernetes config kubernetes_enabled: bool = False + kubernetes_non_retryable_exit_codes: Optional[List[int]] = [] kubeconfig_path: Optional[str] = None pod_launch_timeout: Optional[int] = None default_volumes: Optional[List[ConfigVolume]] = None @@ -665,6 +668,7 @@ def shutdown(cls) -> None: def configure(cls, kubernetes_options: ConfigKubernetes) -> None: cls.kubeconfig_path = kubernetes_options.kubeconfig_path cls.kubernetes_enabled = kubernetes_options.enabled + cls.kubernetes_non_retryable_exit_codes = kubernetes_options.non_retryable_exit_codes cls.default_volumes = kubernetes_options.default_volumes cls.watcher_kubeconfig_paths = kubernetes_options.watcher_kubeconfig_paths diff --git a/tron/utils/exitcode.py b/tron/utils/exitcode.py index df7819385..b5a95d509 100644 --- a/tron/utils/exitcode.py +++ b/tron/utils/exitcode.py @@ -10,6 +10,7 @@ EXIT_KUBERNETES_ABNORMAL = -9 EXIT_KUBERNETES_SPOT_INTERRUPTION = -10 EXIT_KUBERNETES_NODE_SCALEDOWN = -11 +EXIT_KUBERNETES_TASK_LOST = -12 EXIT_REASONS = { EXIT_INVALID_COMMAND: "Invalid command", @@ -23,4 +24,5 @@ EXIT_KUBERNETES_ABNORMAL: "Kubernetes task failed in an unexpected manner", EXIT_KUBERNETES_SPOT_INTERRUPTION: "Kubernetes task failed due to spot interruption", EXIT_KUBERNETES_NODE_SCALEDOWN: "Kubernetes task failed due to the autoscaler scaling down a node", + EXIT_KUBERNETES_TASK_LOST: "Tron lost track of a pod it already thought it had started for a job.", }