Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Dec 17, 2024
1 parent 9ad66c6 commit fab9700
Showing 1 changed file with 8 additions and 13 deletions.
21 changes: 8 additions & 13 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *Manager) Run(ctx context.Context) {
default:
err := m.cluster.Refresh()
if err == nil {
m.deleteOrphanCollectors()
m.deleteOrphanCollector()
m.deleteOrphanPods()
m.runActions()
m.updateRunning()
Expand Down Expand Up @@ -1073,22 +1073,17 @@ func (m *Manager) ensureCollector(task *Task, pod *core.Pod) (err error) {
if container.State.Waiting != nil {
continue
}
key := fmt.Sprintf(
"%d.%s.%s",
task.ID,
pod.Name,
container.Name)
key := pod.Name + "." + container.Name
if _, found := m.collector[key]; found {
continue
}
Log.Info("Collector attached.", "key", key)
collector := &LogCollector{
Registry: m.collector,
DB: m.DB,
Pod: pod,
Container: &container,
}
err = collector.Attach(task)
err = collector.Begin(task)
if err != nil {
return
}
Expand All @@ -1097,8 +1092,8 @@ func (m *Manager) ensureCollector(task *Task, pod *core.Pod) (err error) {
return
}

// deleteOrphanCollectors delete orphaned collectors.
func (m *Manager) deleteOrphanCollectors() {
// deleteOrphanCollector delete orphaned collectors.
func (m *Manager) deleteOrphanCollector() {
for key, collector := range m.collector {
_, found := m.cluster.Pod(collector.Pod.Name)
if !found {
Expand Down Expand Up @@ -2172,13 +2167,13 @@ type LogCollector struct {
Container *core.ContainerStatus
}

// Attach - get container log and store in file.
// Begin - get container log and store in file.
// - Request logs.
// - Create file resource and attach to the task.
// - Register collector.
// - Write (copy) log.
// - Unregister collector.
func (r *LogCollector) Attach(task *Task) (err error) {
func (r *LogCollector) Begin(task *Task) (err error) {
options := &core.PodLogOptions{
Container: r.Container.Name,
Follow: true,
Expand Down Expand Up @@ -2236,7 +2231,7 @@ func (r *LogCollector) copy(reader io.Reader, f *os.File) (err error) {
}
}
if rErr != nil {
if rErr == io.EOF {
if rErr != io.EOF {
err = rErr
}
break
Expand Down

0 comments on commit fab9700

Please sign in to comment.