Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Jul 29, 2024
1 parent 2dc0a35 commit 0cbc431
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 17 deletions.
20 changes: 20 additions & 0 deletions task/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,23 @@ func (e *QuotaExceeded) Retry() (r bool) {
r = true
return
}

// NotTerminated reports that a pod/container has not terminated.
type NotTerminated struct {
	// Kind of resource (e.g. "Container").
	Kind string
	// Name of the resource.
	Name string
}

// Error returns the error description built from Kind and Name.
func (e *NotTerminated) Error() string {
	const format = "%s: '%s' not terminated as expected."
	description := fmt.Sprintf(format, e.Kind, e.Name)
	return description
}

// Is returns true when err is, or wraps, a *NotTerminated.
func (e *NotTerminated) Is(err error) (matched bool) {
	target := new(*NotTerminated)
	return errors.As(err, target)
}

// Retry reports whether the operation should be retried.
// Always false for this error.
func (e *NotTerminated) Retry() (r bool) {
	r = false
	return
}
66 changes: 49 additions & 17 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,20 @@ const (

// Events - kinds of events reported on a task.
// Each constant is the string stored as the event kind.
const (
AddonSelected = "AddonSelected"
ExtSelected = "ExtensionSelected"
ImageError = "ImageError"
PodNotFound = "PodNotFound"
PodCreated = "PodCreated"
PodPending = "PodPending"
PodRunning = "PodRunning"
Preempted = "Preempted"
PodSucceeded = "PodSucceeded"
PodFailed = "PodFailed"
PodDeleted = "PodDeleted"
Escalated = "Escalated"
Released = "Released"
AddonSelected = "AddonSelected"
ExtSelected = "ExtensionSelected"
ImageError = "ImageError"
PodNotFound = "PodNotFound"
PodCreated = "PodCreated"
PodPending = "PodPending"
PodRunning = "PodRunning"
Preempted = "Preempted"
PodSucceeded = "PodSucceeded"
PodFailed = "PodFailed"
PodDeleted = "PodDeleted"
Escalated = "Escalated"
Released = "Released"
// ContainerKilled is reported on the task by terminateContainer
// when a container is killed.
ContainerKilled = "ContainerKilled"
)

// k8s labels.
Expand Down Expand Up @@ -880,7 +881,7 @@ func (m *Manager) updateRunning() {
podRetention = Settings.Hub.Task.Pod.Retention.Failed
}
if podRetention > 0 {
err = m.ensureTerminated(pod)
err = m.ensureTerminated(running, pod)
if err != nil {
podRetention = 0
}
Expand Down Expand Up @@ -1084,15 +1085,15 @@ func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.Fil
}

// ensureTerminated - Terminate running containers.
func (m *Manager) ensureTerminated(pod *core.Pod) (err error) {
func (m *Manager) ensureTerminated(task *Task, pod *core.Pod) (err error) {
for _, status := range pod.Status.ContainerStatuses {
if status.State.Terminated != nil {
continue
}
if status.Started == nil || !*status.Started {
continue
}
err = m.terminateContainer(pod, status.Name)
err = m.terminateContainer(task, pod, status.Name)
if err != nil {
return
}
Expand All @@ -1101,7 +1102,23 @@ func (m *Manager) ensureTerminated(pod *core.Pod) (err error) {
}

// terminateContainer - Terminate container as needed.
func (m *Manager) terminateContainer(pod *core.Pod, container string) (err error) {
// The container is killed.
// Should the container continue to run after (1) minute,
// it is reported as an error.
func (m *Manager) terminateContainer(task *Task, pod *core.Pod, container string) (err error) {
matched := task.FindEvent(ContainerKilled)
if len(matched) > 0 {
for _, event := range matched {
if time.Since(event.Last) > time.Minute {
err = &NotTerminated{
Kind: "Container",
Name: container,
}
break
}
}
return
}
Log.V(1).Info("KILL container", "container", container)
clientSet, err := k8s2.NewClientSet()
if err != nil {
Expand Down Expand Up @@ -1149,6 +1166,10 @@ func (m *Manager) terminateContainer(pod *core.Pod, container string) (err error
"stderr",
stderr.String())
} else {
task.Event(
ContainerKilled,
"container: '%s' has not terminated.",
container)
Log.Info(
"Container KILLED.",
"name",
Expand Down Expand Up @@ -1229,6 +1250,17 @@ func (r *Task) LastEvent(kind string) (event *model.TaskEvent, found bool) {
return
}

// FindEvent returns pointers to every event on the task whose
// kind equals the specified kind, in the order they are stored.
// The result is nil when nothing matches.
func (r *Task) FindEvent(kind string) (matched []*model.TaskEvent) {
	for n := range r.Events {
		if r.Events[n].Kind != kind {
			continue
		}
		// Point at the stored element (not a copy).
		matched = append(matched, &r.Events[n])
	}
	return
}

// Run the specified task.
func (r *Task) Run(cluster *Cluster) (started bool, err error) {
mark := time.Now()
Expand Down

0 comments on commit 0cbc431

Please sign in to comment.