Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Feb 8, 2024
1 parent 9140e3e commit 4e3019e
Showing 1 changed file with 86 additions and 26 deletions.
112 changes: 86 additions & 26 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,35 +340,15 @@ func (r *Task) Reflect(client k8s.Client) (err error) {
}
return
}
mark := time.Now()
status := pod.Status
switch status.Phase {
switch pod.Status.Phase {
case core.PodPending:
r.podPending(pod)
case core.PodRunning:
r.State = Running
r.podRunning(pod, client)
case core.PodSucceeded:
r.State = Succeeded
r.Terminated = &mark
r.podSucceeded(pod)
case core.PodFailed:
r.Error(
"Error",
"Pod failed: %s",
pod.Status.ContainerStatuses[0].State.Terminated.Reason)
switch pod.Status.ContainerStatuses[0].State.Terminated.ExitCode {
case 137: // Killed.
if r.Retries < Settings.Hub.Task.Retries {
_ = client.Delete(context.TODO(), pod)
r.Pod = ""
r.State = Ready
r.Errors = nil
r.Retries++
} else {
r.State = Failed
r.Terminated = &mark
}
default:
r.State = Failed
r.Terminated = &mark
}
r.podFailed(pod, client)
}

return
Expand Down Expand Up @@ -403,6 +383,86 @@ func (r *Task) Delete(client k8s.Client) (err error) {
return
}

// podPending updates the task while its pod is in the pending phase.
// The task is marked Running as soon as any init container reports
// that it has started; otherwise the task is left untouched.
func (r *Task) podPending(pod *core.Pod) {
	initStatuses := pod.Status.InitContainerStatuses
	for i := range initStatuses {
		started := initStatuses[i].Started
		if started != nil && *started {
			r.State = Running
			return
		}
	}
}

// podRunning handles a pod in the running phase.
// Init container statuses are inspected first, followed by the regular
// container statuses. Containers that have not terminated, or that exited
// with code 0, are ignored. The first container found to have terminated
// with a non-zero exit code causes the failure to be delegated to podFailed.
func (r *Task) podRunning(pod *core.Pod, client k8s.Client) {
	statuses := make(
		[]core.ContainerStatus,
		0,
		len(pod.Status.InitContainerStatuses)+len(pod.Status.ContainerStatuses))
	statuses = append(
		statuses,
		pod.Status.InitContainerStatuses...)
	statuses = append(
		statuses,
		pod.Status.ContainerStatuses...)
	for _, status := range statuses {
		terminated := status.State.Terminated
		if terminated == nil || terminated.ExitCode == 0 {
			continue
		}
		// podFailed walks every container status on its own, so it must be
		// invoked only once. Continuing the loop would re-run it for each
		// additional failed container, incrementing Retries (or recording
		// the error) multiple times for a single pod failure.
		r.podFailed(pod, client)
		return
	}
}

// podSucceeded handles a pod in the succeeded phase.
// The task is marked Succeeded and its termination time is set to the
// current time. The pod argument is not inspected.
func (r *Task) podSucceeded(pod *core.Pod) {
	now := time.Now()
	r.Terminated = &now
	r.State = Succeeded
}

// podFailed handles a failed pod.
// Init container statuses are examined first, then the regular container
// statuses. The first terminated container with a non-zero exit code
// decides the outcome:
//   - exit code 137 with retries remaining (Settings.Hub.Task.Retries):
//     the pod is deleted, the task is reset to Ready with its errors
//     cleared, and Retries is incremented.
//   - any other non-zero exit code, or 137 with no retries remaining:
//     the task is marked Failed, Terminated is set, and an error naming
//     the container and its termination reason is recorded.
//
// When no container reports a failing termination, the task is still
// marked Failed using the pod-level status reason, so that it does not
// remain in a non-terminal state.
func (r *Task) podFailed(pod *core.Pod, client k8s.Client) {
	mark := time.Now()
	statuses := make(
		[]core.ContainerStatus,
		0,
		len(pod.Status.InitContainerStatuses)+len(pod.Status.ContainerStatuses))
	statuses = append(
		statuses,
		pod.Status.InitContainerStatuses...)
	statuses = append(
		statuses,
		pod.Status.ContainerStatuses...)
	for _, status := range statuses {
		if status.State.Terminated == nil {
			continue
		}
		switch status.State.Terminated.ExitCode {
		case 0: // Succeeded.
		case 137: // Killed.
			if r.Retries < Settings.Hub.Task.Retries {
				// The delete result is deliberately discarded: this
				// method has no error return and the task is reset to
				// Ready regardless.
				_ = client.Delete(context.TODO(), pod)
				r.Pod = ""
				r.State = Ready
				r.Errors = nil
				r.Retries++
				return
			}
			fallthrough
		default: // Error.
			r.State = Failed
			r.Terminated = &mark
			r.Error(
				"Error",
				"Container (%s) failed: %s",
				status.Name,
				status.State.Terminated.Reason)
			return
		}
	}
	// No container reported a non-zero exit (for example, the container
	// statuses are empty). Fail the task using the pod-level reason rather
	// than leaving it unchanged.
	r.State = Failed
	r.Terminated = &mark
	r.Error(
		"Error",
		"Pod failed: %s",
		pod.Status.Reason)
}

// Cancel the task.
func (r *Task) Cancel(client k8s.Client) (err error) {
err = r.Delete(client)
Expand Down

0 comments on commit 4e3019e

Please sign in to comment.