Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Dec 19, 2024
1 parent aa541e7 commit 6ef2f59
Showing 1 changed file with 51 additions and 51 deletions.
102 changes: 51 additions & 51 deletions task/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,57 @@ import (
core "k8s.io/api/core/v1"
)

// LogManager manages log collectors.
type LogManager struct {
// collector registry.
collector map[string]*LogCollector
// mutex
mutex sync.Mutex
// DB
DB *gorm.DB
}

// EnsureCollection - ensure each container has a log collector.
func (m *LogManager) EnsureCollection(task *Task, pod *core.Pod, ctx context.Context) (err error) {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, container := range pod.Status.ContainerStatuses {
if container.State.Waiting != nil {
continue
}
collector := &LogCollector{
Owner: m,
Registry: m.collector,
DB: m.DB,
Pod: pod,
Container: &container,
}
key := collector.key()
if _, found := m.collector[key]; found {
continue
}
err = collector.Begin(task, ctx)
if err != nil {
return
}
m.collector[key] = collector
}
return
}

// terminated provides notification that a collector has terminated.
func (m *LogManager) terminated(collector *LogCollector) {
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.collector {
if collector == m.collector[i] {
key := collector.key()
delete(m.collector, key)
break
}
}
}

// LogCollector collect and report container logs.
type LogCollector struct {
Owner *LogManager
Expand Down Expand Up @@ -220,54 +271,3 @@ func (r *LogCollector) copy(reader io.ReadCloser, writer io.Writer, ctx context.
}
return
}

// LogManager manages log collectors.
type LogManager struct { // LogCenter
// collector registry.
collector map[string]*LogCollector
// mutex
mutex sync.Mutex
// DB
DB *gorm.DB
}

// EnsureCollection - ensure each container has a log collector.
func (m *LogManager) EnsureCollection(task *Task, pod *core.Pod, ctx context.Context) (err error) {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, container := range pod.Status.ContainerStatuses {
if container.State.Waiting != nil {
continue
}
collector := &LogCollector{
Owner: m,
Registry: m.collector,
DB: m.DB,
Pod: pod,
Container: &container,
}
key := collector.key()
if _, found := m.collector[key]; found {
continue
}
err = collector.Begin(task, ctx)
if err != nil {
return
}
m.collector[key] = collector
}
return
}

// terminated provides notification that a collector has terminated.
func (m *LogManager) terminated(collector *LogCollector) {
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.collector {
if collector == m.collector[i] {
key := collector.key()
delete(m.collector, key)
break
}
}
}

0 comments on commit 6ef2f59

Please sign in to comment.