Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TRON-2208: Add toggle in tron config to disable retries on LOST k8s jobs #988

Merged
merged 15 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ tron.egg-info
docs/_build/
.idea
.vscode
.fleet
tron.iml
docs/images/
*.dot
Expand Down
1 change: 1 addition & 0 deletions requirements-dev-minimal.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
asynctest
debugpy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious: what is this for?

Copy link
Member Author

@cuza cuza Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VSCode Python Debugger :)

https://github.com/microsoft/debugpy

flake8
mock
mypy
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
astroid==2.13.3
asynctest==0.12.0
cfgv==2.0.1
debugpy==1.8.1
dill==0.3.6
distlib==0.3.6
filelock==3.4.1
Expand Down
2 changes: 1 addition & 1 deletion tests/config/config_parse_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def make_mesos_options():


def make_k8s_options():
return schema.ConfigKubernetes(enabled=False, default_volumes=())
return schema.ConfigKubernetes(enabled=False, non_retryable_exit_codes=(), default_volumes=())


def make_action(**kwargs):
Expand Down
2 changes: 1 addition & 1 deletion tests/kubernetes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ def test_handle_event_lost(mock_kubernetes_task):
)
)

assert mock_kubernetes_task.is_unknown
assert mock_kubernetes_task.exit_status == exitcode.EXIT_KUBERNETES_TASK_LOST
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to keep the old assert as well, since we do still want to verify 'lost' k8s events lead to marking the task as unknown.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sadly the way we're checking unknow is by checking exit code

  def is_unknown(self):
      return self.exit_status is None

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh interesting; i didn't realize assigning the LOST's an exit code changed this behavior, thanks.

Did you happen to glance through all the other places where we use is_unknown/unknown to check this change won't change any expected behavior there?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not be a problem since we're only marking lost jobs with the special lost exit code

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_broken() uses the is_unknown property and it looks like there's some jobrun code that uses is_broken to gate starting new action runs - but i'm not sure if this change impacts that in a meaningful way yet



def test_create_task_disabled():
Expand Down
2 changes: 2 additions & 0 deletions tron/config/config_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,12 +867,14 @@ class ValidateKubernetes(Validator):
defaults = {
"kubeconfig_path": None,
"enabled": False,
"non_retryable_exit_codes": (),
"default_volumes": (),
}

validators = {
"kubeconfig_path": valid_string,
"enabled": valid_bool,
"non_retryable_exit_codes": build_list_of_type_validator(valid_int, allow_empty=True),
"default_volumes": build_list_of_type_validator(valid_volume, allow_empty=True),
"watcher_kubeconfig_paths": build_list_of_type_validator(valid_string, allow_empty=True),
}
Expand Down
1 change: 1 addition & 0 deletions tron/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def config_object_factory(name, required=None, optional=None):
optional=[
"kubeconfig_path",
"enabled",
"non_retryable_exit_codes",
"default_volumes",
"watcher_kubeconfig_paths",
],
Expand Down
33 changes: 26 additions & 7 deletions tron/core/actionrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ def fail(self, exit_status=None):

return self._done("fail", exit_status)

def _exit_unsuccessful(self, exit_status=None, retry_original_command=True) -> Optional[Union[bool, ActionCommand]]:
def _exit_unsuccessful(
self, exit_status=None, retry_original_command=True, non_retryable_exit_codes=[]
) -> Optional[Union[bool, ActionCommand]]:
if self.is_done:
log.info(
f"{self} got exit code {exit_status} but already in terminal " f'state "{self.state}", not retrying',
Expand All @@ -599,13 +601,17 @@ def _exit_unsuccessful(self, exit_status=None, retry_original_command=True) -> O
if self.last_attempt is not None:
self.last_attempt.exit(exit_status)
if self.retries_remaining is not None:
if self.retries_remaining > 0:
self.retries_remaining -= 1
return self.restart(original_command=retry_original_command)
if exit_status in non_retryable_exit_codes:
self.retries_remaining = 0
log.info(f"{self} skipping auto-retries, received non-retryable exit code ({exit_status}).")
cuza marked this conversation as resolved.
Show resolved Hide resolved
else:
log.info(
f"Reached maximum number of retries: {len(self.attempts)}",
)
if self.retries_remaining > 0:
self.retries_remaining -= 1
return self.restart(original_command=retry_original_command)
else:
log.info(
f"Reached maximum number of retries: {len(self.attempts)}",
)
if exit_status is None:
return self._done("fail_unknown", exit_status)
else:
Expand Down Expand Up @@ -1314,6 +1320,19 @@ def kill(self, final: bool = True) -> Optional[str]:

return "\n".join(msgs)

def _exit_unsuccessful(
self, exit_status=None, retry_original_command=True, non_retryable_exit_codes=[]
) -> Optional[Union[bool, ActionCommand]]:

k8s_cluster = KubernetesClusterRepository.get_cluster()
non_retryable_exit_codes = [] if not k8s_cluster else k8s_cluster.non_retryable_exit_codes

return super()._exit_unsuccessful(
exit_status=exit_status,
retry_original_command=retry_original_command,
non_retryable_exit_codes=non_retryable_exit_codes,
)

def handle_action_command_state_change(
self, action_command: ActionCommand, event: str, event_data=None
) -> Optional[Union[bool, ActionCommand]]:
Expand Down
6 changes: 5 additions & 1 deletion tron/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def handle_event(self, event: Event) -> None:
self.log.warning(f" tronctl skip {self.id}")
self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:")
self.log.warning(f" tronctl fail {self.id}")
self.exited(None)
self.exited(exitcode.EXIT_KUBERNETES_TASK_LOST)
else:
self.log.info(
f"Did not handle unknown kubernetes event type: {event}",
Expand Down Expand Up @@ -281,10 +281,12 @@ def __init__(
default_volumes: Optional[List[ConfigVolume]] = None,
pod_launch_timeout: Optional[int] = None,
watcher_kubeconfig_paths: Optional[List[str]] = None,
non_retryable_exit_codes: Optional[List[int]] = [],
):
# general k8s config
self.kubeconfig_path = kubeconfig_path
self.enabled = enabled
self.non_retryable_exit_codes = non_retryable_exit_codes
self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or []
self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S
self.watcher_kubeconfig_paths = watcher_kubeconfig_paths or []
Expand Down Expand Up @@ -621,6 +623,7 @@ def recover(self, task: KubernetesTask) -> None:
class KubernetesClusterRepository:
# Kubernetes config
kubernetes_enabled: bool = False
kubernetes_non_retryable_exit_codes: Optional[List[int]] = []
kubeconfig_path: Optional[str] = None
pod_launch_timeout: Optional[int] = None
default_volumes: Optional[List[ConfigVolume]] = None
Expand Down Expand Up @@ -665,6 +668,7 @@ def shutdown(cls) -> None:
def configure(cls, kubernetes_options: ConfigKubernetes) -> None:
cls.kubeconfig_path = kubernetes_options.kubeconfig_path
cls.kubernetes_enabled = kubernetes_options.enabled
cls.kubernetes_non_retryable_exit_codes = kubernetes_options.non_retryable_exit_codes
cls.default_volumes = kubernetes_options.default_volumes
cls.watcher_kubeconfig_paths = kubernetes_options.watcher_kubeconfig_paths

Expand Down
2 changes: 2 additions & 0 deletions tron/utils/exitcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
EXIT_KUBERNETES_ABNORMAL = -9
EXIT_KUBERNETES_SPOT_INTERRUPTION = -10
EXIT_KUBERNETES_NODE_SCALEDOWN = -11
EXIT_KUBERNETES_TASK_LOST = -12

EXIT_REASONS = {
EXIT_INVALID_COMMAND: "Invalid command",
Expand All @@ -23,4 +24,5 @@
EXIT_KUBERNETES_ABNORMAL: "Kubernetes task failed in an unexpected manner",
EXIT_KUBERNETES_SPOT_INTERRUPTION: "Kubernetes task failed due to spot interruption",
EXIT_KUBERNETES_NODE_SCALEDOWN: "Kubernetes task failed due to the autoscaler scaling down a node",
EXIT_KUBERNETES_TASK_LOST: "Tron lost track of a pod it already thought it had started for a job.",
}
Loading