Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Dec 17, 2024
1 parent 23dbcbd commit 9ad66c6
Showing 1 changed file with 9 additions and 14 deletions.
23 changes: 9 additions & 14 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (m *Manager) Run(ctx context.Context) {
default:
err := m.cluster.Refresh()
if err == nil {
m.deleteOrphanCollectors()
m.deleteOrphanPods()
m.runActions()
m.updateRunning()
Expand Down Expand Up @@ -873,13 +874,12 @@ func (m *Manager) updateRunning() {
running := task
pod, found := running.Reflect(&m.cluster)
if found {
err = m.attachCollector(task, pod)
err = m.ensureCollector(task, pod)
if err != nil {
Log.Error(err, "")
continue
}
if task.StateIn(Succeeded, Failed) {
m.detachCollector(task, pod)
err = m.podSnapshot(running, pod)
if err != nil {
Log.Error(err, "")
Expand Down Expand Up @@ -1067,8 +1067,8 @@ func (m *Manager) podEvent(pod *core.Pod) (events []Event, err error) {
return
}

// attachCollector - ensure each container has a log collector attached.
func (m *Manager) attachCollector(task *Task, pod *core.Pod) (err error) {
// ensureCollector - ensure each container has a log collector attached.
func (m *Manager) ensureCollector(task *Task, pod *core.Pod) (err error) {
for _, container := range pod.Status.ContainerStatuses {
if container.State.Waiting != nil {
continue
Expand Down Expand Up @@ -1097,16 +1097,11 @@ func (m *Manager) attachCollector(task *Task, pod *core.Pod) (err error) {
return
}

// detachCollector ensures log collectors have been deleted from the registry.
func (m *Manager) detachCollector(task *Task, pod *core.Pod) {
for _, container := range pod.Status.ContainerStatuses {
key := fmt.Sprintf(
"%d.%s.%s",
task.ID,
pod.Name,
container.Name)
if _, found := m.collector[key]; found {
Log.Info("Collector detached.", "key", key)
// deleteOrphanCollectors delete orphaned collectors.
func (m *Manager) deleteOrphanCollectors() {
for key, collector := range m.collector {
_, found := m.cluster.Pod(collector.Pod.Name)
if !found {
delete(m.collector, key)
}
}
Expand Down

0 comments on commit 9ad66c6

Please sign in to comment.