diff --git a/task/manager.go b/task/manager.go index 0050feeb..3cc1336d 100644 --- a/task/manager.go +++ b/task/manager.go @@ -129,6 +129,7 @@ func (m *Manager) Run(ctx context.Context) { default: err := m.cluster.Refresh() if err == nil { + m.deleteOrphanCollectors() m.deleteOrphanPods() m.runActions() m.updateRunning() @@ -873,13 +874,12 @@ func (m *Manager) updateRunning() { running := task pod, found := running.Reflect(&m.cluster) if found { - err = m.attachCollector(task, pod) + err = m.ensureCollector(task, pod) if err != nil { Log.Error(err, "") continue } if task.StateIn(Succeeded, Failed) { - m.detachCollector(task, pod) err = m.podSnapshot(running, pod) if err != nil { Log.Error(err, "") @@ -1067,8 +1067,8 @@ func (m *Manager) podEvent(pod *core.Pod) (events []Event, err error) { return } -// attachCollector - ensure each container has a log collector attached. -func (m *Manager) attachCollector(task *Task, pod *core.Pod) (err error) { +// ensureCollector - ensure each container has a log collector attached. +func (m *Manager) ensureCollector(task *Task, pod *core.Pod) (err error) { for _, container := range pod.Status.ContainerStatuses { if container.State.Waiting != nil { continue @@ -1097,16 +1097,11 @@ func (m *Manager) attachCollector(task *Task, pod *core.Pod) (err error) { return } -// detachCollector ensures log collectors have been deleted from the registry. -func (m *Manager) detachCollector(task *Task, pod *core.Pod) { - for _, container := range pod.Status.ContainerStatuses { - key := fmt.Sprintf( - "%d.%s.%s", - task.ID, - pod.Name, - container.Name) - if _, found := m.collector[key]; found { - Log.Info("Collector detached.", "key", key) +// deleteOrphanCollectors deletes collectors whose pod is no longer found in the cluster. +func (m *Manager) deleteOrphanCollectors() { + for key, collector := range m.collector { + _, found := m.cluster.Pod(collector.Pod.Name) + if !found { delete(m.collector, key) } }