From 1f9c6daf63b95544b9516125eb2dec5ac10866a2 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Sun, 28 Jul 2024 13:00:25 -0700 Subject: [PATCH 1/9] Terminate pod. Signed-off-by: Jeff Ortel --- go.mod | 1 + go.sum | 5 ++++ task/manager.go | 74 +++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 74 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 75022634b..6ac197c8d 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/go.sum b/go.sum index 88f63ca3c..8478cb168 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/andygrunwald/go-jira v1.16.0 h1:PU7C7Fkk5L96JvPc6vDVIrd99vdPnYudHu4ju2c2ikQ= github.com/andygrunwald/go-jira v1.16.0/go.mod h1:UQH4IBVxIYWbgagc0LF/k9FRs9xjIiQ8hIcC6HfLwFU= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= @@ -29,6 +30,7 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -121,6 +123,7 @@ github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -162,6 +165,8 @@ github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6 github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= +github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= 
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/task/manager.go b/task/manager.go index 914c23999..683d44b2e 100644 --- a/task/manager.go +++ b/task/manager.go @@ -1,6 +1,7 @@ package task import ( + "bytes" "context" "errors" "fmt" @@ -30,7 +31,10 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/remotecommand" k8s "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" ) // States @@ -860,9 +864,6 @@ func (m *Manager) updateRunning() { list = append(list, &Task{task}) } for _, task := range list { - if !task.StateIn(Running, Pending) { - continue - } running := task pod, found := running.Reflect(&m.cluster) if found { @@ -872,10 +873,13 @@ func (m *Manager) updateRunning() { Log.Error(err, "") continue } - err = running.Delete(m.Client) + err = m.ensureTerminated(pod) if err != nil { - Log.Error(err, "") - continue + err = running.Delete(m.Client) + if err != nil { + Log.Error(err, "") + continue + } } } } @@ -1068,6 +1072,64 @@ func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.Fil return } +// ensureTerminated - Terminate running containers. 
+func (m *Manager) ensureTerminated(pod *core.Pod) (err error) { + for _, status := range pod.Status.ContainerStatuses { + if status.State.Terminated != nil { + continue + } + if status.Started == nil || !*status.Started { + continue + } + err = m.terminateContainer(pod, status.Name) + if err != nil { + return + } + } + return +} + +// terminateContainer - +func (m *Manager) terminateContainer(pod *core.Pod, container string) (err error) { + clientSet, err := k8s2.NewClientSet() + if err != nil { + return + } + cmd := []string{ + "sh", + "-c", + "kill 1", + } + req := clientSet.CoreV1().RESTClient().Post() + req = req.Resource("pods") + req = req.Name(pod.Name) + req = req.Namespace(pod.Namespace) + req = req.SubResource("exec") + option := &core.PodExecOptions{ + Command: cmd, + Container: container, + Stdout: true, + Stderr: true, + TTY: true, + } + req.VersionedParams( + option, + scheme.ParameterCodec, + ) + cfg, _ := config.GetConfig() + exec, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL()) + if err != nil { + return + } + stdout := bytes.NewBuffer([]byte{}) + stderr := bytes.NewBuffer([]byte{}) + err = exec.Stream(remotecommand.StreamOptions{ + Stdout: stdout, + Stderr: stderr, + }) + return +} + // Task is an runtime task. type Task struct { // model. From 97a0095679cb284a00927c4e6d539a5cfc391188 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Sun, 28 Jul 2024 13:01:17 -0700 Subject: [PATCH 2/9] checkpoint Signed-off-by: Jeff Ortel --- task/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task/manager.go b/task/manager.go index 683d44b2e..df20faa4c 100644 --- a/task/manager.go +++ b/task/manager.go @@ -1089,7 +1089,7 @@ func (m *Manager) ensureTerminated(pod *core.Pod) (err error) { return } -// terminateContainer - +// terminateContainer - Terminate container as needed. 
func (m *Manager) terminateContainer(pod *core.Pod, container string) (err error) { clientSet, err := k8s2.NewClientSet() if err != nil { From 6f3d54e6b0247f0ebfe7dd99abd9ddae423801e5 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Sun, 28 Jul 2024 13:17:26 -0700 Subject: [PATCH 3/9] checkpoint Signed-off-by: Jeff Ortel --- task/manager.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/task/manager.go b/task/manager.go index df20faa4c..1fe9a3ead 100644 --- a/task/manager.go +++ b/task/manager.go @@ -1091,6 +1091,7 @@ func (m *Manager) ensureTerminated(pod *core.Pod) (err error) { // terminateContainer - Terminate container as needed. func (m *Manager) terminateContainer(pod *core.Pod, container string) (err error) { + Log.V(1).Info("KILL container", "container", container) clientSet, err := k8s2.NewClientSet() if err != nil { return @@ -1127,6 +1128,21 @@ func (m *Manager) terminateContainer(pod *core.Pod, container string) (err error Stdout: stdout, Stderr: stderr, }) + if err != nil { + Log.Info( + "Container KILL failed.", + "name", + container, + "err", + err.Error(), + "stderr", + stderr.String()) + } else { + Log.Info( + "Container KILLED.", + "name", + container) + } return } From 4d895c18e2b289d3759cb86b7efd9f2e11ca5bd9 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Mon, 29 Jul 2024 07:06:10 -0700 Subject: [PATCH 4/9] pod retention. Signed-off-by: Jeff Ortel --- reaper/task.go | 49 +++++++++++++++++++++++++++++++------------------ settings/hub.go | 22 ++++++++++++++++++++++ task/manager.go | 15 +++++++++++++-- 3 files changed, 66 insertions(+), 20 deletions(-) diff --git a/reaper/task.go b/reaper/task.go index 3f73432f1..6f64ec41c 100644 --- a/reaper/task.go +++ b/reaper/task.go @@ -44,7 +44,7 @@ type TaskReaper struct { // - Pod is deleted after the defined period. 
func (r *TaskReaper) Run() { Log.V(1).Info("Reaping tasks.") - list := []model.Task{} + list := []task.Task{} result := r.DB.Find( &list, "state IN ?", @@ -52,6 +52,7 @@ func (r *TaskReaper) Run() { task.Created, task.Succeeded, task.Failed, + task.Canceled, }) Log.Error(result.Error, "") if result.Error != nil { @@ -108,6 +109,10 @@ func (r *TaskReaper) Run() { r.release(m) } } + d := time.Duration(Settings.Hub.Task.Pod.Retention.Succeeded) * Unit + if time.Since(mark) > d { + r.podDelete(m) + } case task.Failed: mark := m.CreateTime if m.Terminated != nil { @@ -124,23 +129,17 @@ func (r *TaskReaper) Run() { r.release(m) } } + d := time.Duration(Settings.Hub.Task.Pod.Retention.Failed) * Unit + if time.Since(mark) > d { + r.podDelete(m) + } } } } -// release resources. -func (r *TaskReaper) release(m *model.Task) { +// release bucket and file resources. +func (r *TaskReaper) release(m *task.Task) { nChanged := 0 - if m.Pod != "" { - rt := Task{Task: m} - err := rt.Delete(r.Client) - if err == nil { - m.Pod = "" - nChanged++ - } else { - Log.Error(err, "") - } - } if m.HasBucket() { Log.Info("Task bucket released.", "id", m.ID) m.SetBucket(nil) @@ -151,8 +150,7 @@ func (r *TaskReaper) release(m *model.Task) { nChanged++ } if nChanged > 0 { - rt := task.Task{Task: m} - rt.Event(task.Released) + m.Event(task.Released) err := r.DB.Save(m).Error if err != nil { Log.Error(err, "") @@ -161,10 +159,25 @@ func (r *TaskReaper) release(m *model.Task) { return } +// podDelete deletes the task pod. +func (r *TaskReaper) podDelete(m *task.Task) { + if m.Pod == "" { + return + } + err := m.Delete(r.Client) + if err != nil { + Log.Error(err, "") + return + } + err = r.DB.Save(m).Error + if err != nil { + Log.Error(err, "") + } +} + // delete task. 
-func (r *TaskReaper) delete(m *model.Task) { - rt := Task{Task: m} - err := rt.Delete(r.Client) +func (r *TaskReaper) delete(m *task.Task) { + err := m.Delete(r.Client) if err != nil { Log.Error(err, "") } diff --git a/settings/hub.go b/settings/hub.go index d21e14813..a215177e0 100644 --- a/settings/hub.go +++ b/settings/hub.go @@ -19,6 +19,8 @@ const ( EnvTaskReapCreated = "TASK_REAP_CREATED" EnvTaskReapSucceeded = "TASK_REAP_SUCCEEDED" EnvTaskReapFailed = "TASK_REAP_FAILED" + EnvTaskPodRetainSucceeded = "TASK_POD_RETAIN_SUCCEEDED" + EnvTaskPodRetainFailed = "TASK_POD_RETAIN_FAILED" EnvTaskSA = "TASK_SA" EnvTaskRetries = "TASK_RETRIES" EnvTaskPreemptEnabled = "TASK_PREEMPT_ENABLED" @@ -84,6 +86,12 @@ type Hub struct { Postponed time.Duration Rate int } + Pod struct { + Retention struct { + Succeeded int + Failed int + } + } } // Frequency Frequency struct { @@ -169,6 +177,20 @@ func (r *Hub) Load() (err error) { } else { r.Task.Reaper.Failed = 43200 // 720 hours (30 days). } + s, found = os.LookupEnv(EnvTaskPodRetainSucceeded) + if found { + n, _ := strconv.Atoi(s) + r.Task.Pod.Retention.Succeeded = n + } else { + r.Task.Pod.Retention.Succeeded = 1 + } + s, found = os.LookupEnv(EnvTaskPodRetainFailed) + if found { + n, _ := strconv.Atoi(s) + r.Task.Pod.Retention.Failed = n + } else { + r.Task.Pod.Retention.Failed = 4320 // 72 hours. 
+ } r.Task.SA, found = os.LookupEnv(EnvTaskSA) if !found { r.Task.SA = "tackle-hub" diff --git a/task/manager.go b/task/manager.go index 1fe9a3ead..d9de411da 100644 --- a/task/manager.go +++ b/task/manager.go @@ -873,8 +873,19 @@ func (m *Manager) updateRunning() { Log.Error(err, "") continue } - err = m.ensureTerminated(pod) - if err != nil { + podRetention := 0 + if running.State == Succeeded { + podRetention = Settings.Hub.Task.Pod.Retention.Succeeded + } else { + podRetention = Settings.Hub.Task.Pod.Retention.Failed + } + if podRetention > 0 { + err = m.ensureTerminated(pod) + if err != nil { + podRetention = 0 + } + } + if podRetention == 0 { err = running.Delete(m.Client) if err != nil { Log.Error(err, "") From e1b55c37472e69177259c05af5a09d928bbeb79a Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Mon, 29 Jul 2024 13:33:17 -0700 Subject: [PATCH 5/9] checkpoint Signed-off-by: Jeff Ortel --- task/error.go | 20 +++++++++++++++ task/manager.go | 66 ++++++++++++++++++++++++++++++++++++------------- 2 files changed, 69 insertions(+), 17 deletions(-) diff --git a/task/error.go b/task/error.go index 5063de5d1..2b840ae3d 100644 --- a/task/error.go +++ b/task/error.go @@ -327,3 +327,23 @@ func (e *QuotaExceeded) Retry() (r bool) { r = true return } + +// NotTerminated report pod/container not terminated. 
+type NotTerminated struct { + Kind string + Name string +} + +func (e *NotTerminated) Error() string { + return fmt.Sprintf("%s: '%s' not terminated as expected.", e.Kind, e.Name) +} + +func (e *NotTerminated) Is(err error) (matched bool) { + var inst *NotTerminated + matched = errors.As(err, &inst) + return +} + +func (e *NotTerminated) Retry() (r bool) { + return +} diff --git a/task/manager.go b/task/manager.go index d9de411da..612d68686 100644 --- a/task/manager.go +++ b/task/manager.go @@ -55,19 +55,20 @@ const ( // Events const ( - AddonSelected = "AddonSelected" - ExtSelected = "ExtensionSelected" - ImageError = "ImageError" - PodNotFound = "PodNotFound" - PodCreated = "PodCreated" - PodPending = "PodPending" - PodRunning = "PodRunning" - Preempted = "Preempted" - PodSucceeded = "PodSucceeded" - PodFailed = "PodFailed" - PodDeleted = "PodDeleted" - Escalated = "Escalated" - Released = "Released" + AddonSelected = "AddonSelected" + ExtSelected = "ExtensionSelected" + ImageError = "ImageError" + PodNotFound = "PodNotFound" + PodCreated = "PodCreated" + PodPending = "PodPending" + PodRunning = "PodRunning" + Preempted = "Preempted" + PodSucceeded = "PodSucceeded" + PodFailed = "PodFailed" + PodDeleted = "PodDeleted" + Escalated = "Escalated" + Released = "Released" + ContainerKilled = "ContainerKilled" ) // k8s labels. @@ -880,7 +881,7 @@ func (m *Manager) updateRunning() { podRetention = Settings.Hub.Task.Pod.Retention.Failed } if podRetention > 0 { - err = m.ensureTerminated(pod) + err = m.ensureTerminated(running, pod) if err != nil { podRetention = 0 } @@ -1084,7 +1085,7 @@ func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.Fil } // ensureTerminated - Terminate running containers. 
-func (m *Manager) ensureTerminated(pod *core.Pod) (err error) { +func (m *Manager) ensureTerminated(task *Task, pod *core.Pod) (err error) { for _, status := range pod.Status.ContainerStatuses { if status.State.Terminated != nil { continue @@ -1092,7 +1093,7 @@ func (m *Manager) ensureTerminated(pod *core.Pod) (err error) { if status.Started == nil || !*status.Started { continue } - err = m.terminateContainer(pod, status.Name) + err = m.terminateContainer(task, pod, status.Name) if err != nil { return } @@ -1101,7 +1102,23 @@ func (m *Manager) ensureTerminated(pod *core.Pod) (err error) { } // terminateContainer - Terminate container as needed. -func (m *Manager) terminateContainer(pod *core.Pod, container string) (err error) { +// The container is killed. +// Should the container continue to run after (1) minute, +// it is reported as an error. +func (m *Manager) terminateContainer(task *Task, pod *core.Pod, container string) (err error) { + matched := task.FindEvent(ContainerKilled) + if len(matched) > 0 { + for _, event := range matched { + if time.Since(event.Last) > time.Minute { + err = &NotTerminated{ + Kind: "Container", + Name: container, + } + break + } + } + return + } Log.V(1).Info("KILL container", "container", container) clientSet, err := k8s2.NewClientSet() if err != nil { @@ -1149,6 +1166,10 @@ func (m *Manager) terminateContainer(pod *core.Pod, container string) (err error "stderr", stderr.String()) } else { + task.Event( + ContainerKilled, + "container: '%s' has not terminated.", + container) Log.Info( "Container KILLED.", "name", @@ -1229,6 +1250,17 @@ func (r *Task) LastEvent(kind string) (event *model.TaskEvent, found bool) { return } +// FindEvent returns the matched events by kind. +func (r *Task) FindEvent(kind string) (matched []*model.TaskEvent) { + for i := 0; i < len(r.Events); i++ { + event := &r.Events[i] + if kind == event.Kind { + matched = append(matched, event) + } + } + return +} + // Run the specified task. 
func (r *Task) Run(cluster *Cluster) (started bool, err error) { mark := time.Now() From a1aa8012141b6153cd057213ecc52c54d2a4c51c Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Mon, 29 Jul 2024 14:02:41 -0700 Subject: [PATCH 6/9] checkpoint Signed-off-by: Jeff Ortel --- task/manager.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/task/manager.go b/task/manager.go index 612d68686..d37673108 100644 --- a/task/manager.go +++ b/task/manager.go @@ -1106,19 +1106,6 @@ func (m *Manager) ensureTerminated(task *Task, pod *core.Pod) (err error) { // Should the container continue to run after (1) minute, // it is reported as an error. func (m *Manager) terminateContainer(task *Task, pod *core.Pod, container string) (err error) { - matched := task.FindEvent(ContainerKilled) - if len(matched) > 0 { - for _, event := range matched { - if time.Since(event.Last) > time.Minute { - err = &NotTerminated{ - Kind: "Container", - Name: container, - } - break - } - } - return - } Log.V(1).Info("KILL container", "container", container) clientSet, err := k8s2.NewClientSet() if err != nil { From 029e936bcbba5bba106939280f88ab4587518a3b Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Mon, 29 Jul 2024 14:03:20 -0700 Subject: [PATCH 7/9] checkpoint Signed-off-by: Jeff Ortel --- task/error.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/task/error.go b/task/error.go index 2b840ae3d..5063de5d1 100644 --- a/task/error.go +++ b/task/error.go @@ -327,23 +327,3 @@ func (e *QuotaExceeded) Retry() (r bool) { r = true return } - -// NotTerminated report pod/container not terminated. 
-type NotTerminated struct { - Kind string - Name string -} - -func (e *NotTerminated) Error() string { - return fmt.Sprintf("%s: '%s' not terminated as expected.", e.Kind, e.Name) -} - -func (e *NotTerminated) Is(err error) (matched bool) { - var inst *NotTerminated - matched = errors.As(err, &inst) - return -} - -func (e *NotTerminated) Retry() (r bool) { - return -} From b379a5d2d8d048c5fae472e0f5f2ac75e80aa301 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Sat, 3 Aug 2024 05:52:45 -0700 Subject: [PATCH 8/9] Add zombie killer. Signed-off-by: Jeff Ortel --- task/manager.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/task/manager.go b/task/manager.go index d37673108..f7e3f1178 100644 --- a/task/manager.go +++ b/task/manager.go @@ -128,6 +128,7 @@ func (m *Manager) Run(ctx context.Context) { m.deleteOrphanPods() m.runActions() m.updateRunning() + m.deleteZombies() m.startReady() m.pause() } else { @@ -904,6 +905,56 @@ func (m *Manager) updateRunning() { } } +// deleteZombies - detect and delete zombie pods. +// A zombie is a (succeeded|failed) task with a running pod that +// the manager has previously tried to kill. 
+func (m *Manager) deleteZombies() { + var err error + defer func() { + Log.Error(err, "") + }() + var pods []string + for _, pod := range m.cluster.Pods() { + if pod.Status.Phase == core.PodRunning { + ref := path.Join(pod.Namespace, pod.Name) + pods = append( + pods, + ref) + } + } + fetched := []*Task{} + db := m.DB.Select("Events") + db = db.Where("Pod", pods) + db = db.Where("state IN ?", + []string{ + Succeeded, + Failed, + }) + err = db.Find(&fetched).Error + if err != nil { + err = liberr.Wrap(err) + return + } + for _, task := range fetched { + event, found := task.LastEvent(ContainerKilled) + if !found { + continue + } + if time.Since(event.Last) > time.Minute { + Log.Info( + "Zombie detected.", + "task", + task.ID, + "pod", + task.Pod) + err = task.Delete(m.Client) + if err != nil { + Log.Error(err, "") + } + } + } +} + // deleteOrphanPods finds and deletes task pods not referenced by a task. func (m *Manager) deleteOrphanPods() { var err error From 90fba5e3f6a22df10a489e813a254f1cada6ec84 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Thu, 15 Aug 2024 06:37:45 -0700 Subject: [PATCH 9/9] checkpoint Signed-off-by: Jeff Ortel --- task/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task/manager.go b/task/manager.go index f7e3f1178..5bd34fcf4 100644 --- a/task/manager.go +++ b/task/manager.go @@ -1206,7 +1206,7 @@ func (m *Manager) terminateContainer(task *Task, pod *core.Pod, container string } else { task.Event( ContainerKilled, - "container: '%s' has not terminated.", + "container: '%s' had not terminated.", container) Log.Info( "Container KILLED.",