diff --git a/task/manager.go b/task/manager.go
index bb6c3a46..7efca2d7 100644
--- a/task/manager.go
+++ b/task/manager.go
@@ -107,15 +107,18 @@ type Manager struct {
     cluster Cluster
     // queue of actions.
     queue chan func()
-    // collector registry.
-    collector map[string]*LogCollector
+    // logManager provides pod log collection.
+    logManager LogManager
 }
 
 // Run the manager.
 func (m *Manager) Run(ctx context.Context) {
     m.queue = make(chan func(), 100)
-    m.collector = make(map[string]*LogCollector)
     m.cluster.Client = m.Client
+    m.logManager = LogManager{
+        collector: make(map[string]*LogCollector),
+        DB:        m.DB,
+    }
     auth.Validators = append(
         auth.Validators,
         &Validator{
@@ -131,10 +134,9 @@ func (m *Manager) Run(ctx context.Context) {
         default:
             err := m.cluster.Refresh()
             if err == nil {
-                m.deleteOrphanCollector()
                 m.deleteOrphanPods()
                 m.runActions()
-                m.updateRunning()
+                m.updateRunning(ctx)
                 m.deleteZombies()
                 m.startReady()
                 m.pause()
@@ -847,7 +849,7 @@ func (m *Manager) preempt(list []*Task) (err error) {
 }
 
 // updateRunning tasks to reflect pod state.
-func (m *Manager) updateRunning() {
+func (m *Manager) updateRunning(ctx context.Context) {
     var err error
     defer func() {
         Log.Error(err, "")
@@ -876,7 +878,7 @@ func (m *Manager) updateRunning() {
         running := task
         pod, found := running.Reflect(&m.cluster)
         if found {
-            err = m.ensureCollector(task, pod)
+            err = m.logManager.EnsureCollection(task, pod, ctx)
             if err != nil {
                 Log.Error(err, "")
                 continue
@@ -1101,41 +1103,6 @@ func (m *Manager) podEvent(pod *core.Pod) (events []Event, err error) {
     return
 }
 
-// ensureCollector - ensure each container has a log collector attached.
-func (m *Manager) ensureCollector(task *Task, pod *core.Pod) (err error) {
-    for _, container := range pod.Status.ContainerStatuses {
-        if container.State.Waiting != nil {
-            continue
-        }
-        key := pod.Name + "." + container.Name
-        if _, found := m.collector[key]; found {
-            continue
-        }
-        collector := &LogCollector{
-            Registry:  m.collector,
-            DB:        m.DB,
-            Pod:       pod,
-            Container: &container,
-        }
-        err = collector.Begin(task)
-        if err != nil {
-            return
-        }
-        m.collector[key] = collector
-    }
-    return
-}
-
-// deleteOrphanCollector delete orphaned collectors.
-func (m *Manager) deleteOrphanCollector() {
-    for key, collector := range m.collector {
-        _, found := m.cluster.Pod(collector.Pod.Name)
-        if !found {
-            delete(m.collector, key)
-        }
-    }
-}
-
 // ensureTerminated - Terminate running containers.
 func (m *Manager) ensureTerminated(task *Task, pod *core.Pod) (err error) {
     for _, status := range pod.Status.ContainerStatuses {
@@ -2195,12 +2162,13 @@ type Preempt struct {
 
 // LogCollector collect and report container logs.
 type LogCollector struct {
-    nBuf      int
+    Owner     *LogManager
     Registry  map[string]*LogCollector
     DB        *gorm.DB
     Pod       *core.Pod
     Container *core.ContainerStatus
     //
+    nBuf  int
     nSkip int64
 }
 
@@ -2210,7 +2178,7 @@ type LogCollector struct {
 // - Register collector.
 // - Write (copy) log.
 // - Unregister collector.
-func (r *LogCollector) Begin(task *Task) (err error) {
+func (r *LogCollector) Begin(task *Task, ctx context.Context) (err error) {
     reader, err := r.request()
     if err != nil {
         return
@@ -2223,13 +2191,20 @@
         defer func() {
             _ = reader.Close()
             _ = f.Close()
+            r.Owner.terminated(r)
         }()
-        err := r.copy(reader, f)
+        err := r.copy(reader, f, ctx)
         Log.Error(err, "")
     }()
     return
 }
 
+// key returns the collector key.
+func (r *LogCollector) key() (key string) {
+    key = r.Pod.Name + "." + r.Container.Name
+    return
+}
+
 // request
 func (r *LogCollector) request() (reader io.ReadCloser, err error) {
     options := &core.PodLogOptions{
@@ -2319,21 +2294,52 @@ func (r *LogCollector) create(task *Task) (f *os.File, err error) {
 // The read bytes are discarded when smaller than nSkip.
 // The offset is adjusted when to account for the buffer
 // containing bytes to be skipped and written.
-func (r *LogCollector) copy(reader io.Reader, writer io.Writer) (err error) {
+func (r *LogCollector) copy(reader io.ReadCloser, writer io.Writer, ctx context.Context) (err error) {
+    timer := time.NewTimer(time.Second)
+    canceled := false
+    readCh := make(chan []byte)
     if r.nBuf < 1 {
         r.nBuf = 0x8000
     }
-    buf := make([]byte, r.nBuf)
+    go func() {
+        defer func() {
+            close(readCh)
+        }()
+        for {
+            buf := make([]byte, r.nBuf)
+            n, err := reader.Read(buf)
+            if err != nil {
+                if canceled {
+                    return
+                }
+                if err != io.EOF {
+                    Log.Error(err, "")
+                }
+                break
+            }
+            readCh <- buf[:n]
+        }
+    }()
     for {
-        n, rErr := reader.Read(buf)
-        if rErr != nil {
-            if rErr != io.EOF {
-                err = rErr
+        var buf []byte
+        nRead := int64(-1)
+        select {
+        case <-ctx.Done():
+            canceled = true
+            _ = reader.Close()
+            return
+        case b, read := <-readCh:
+            if read {
+                nRead = int64(len(b))
+                buf = b
             }
+        case <-timer.C:
+            nRead = 0
+        }
+        if nRead == -1 { // EOF.
             break
         }
-        nRead := int64(n)
-        if nRead == 0 {
+        if nRead == 0 { // Timeout.
             continue
         }
         offset := int64(0)
@@ -2346,7 +2352,7 @@ func (r *LogCollector) copy(reader io.Reader, writer io.Writer) (err error) {
             continue
         }
     }
-        b := buf[offset:nRead]
+        b := buf[offset:]
         _, err = writer.Write(b)
         if err != nil {
             return
@@ -2360,3 +2366,54 @@
     }
     return
 }
+
+// LogManager manages log collectors.
+type LogManager struct {
+    // collector registry.
+    collector map[string]*LogCollector
+    // mutex
+    mutex sync.Mutex
+    // DB
+    DB *gorm.DB
+}
+
+// EnsureCollection - ensure each container has a log collector.
+func (m *LogManager) EnsureCollection(task *Task, pod *core.Pod, ctx context.Context) (err error) {
+    m.mutex.Lock()
+    defer m.mutex.Unlock()
+    for _, container := range pod.Status.ContainerStatuses {
+        if container.State.Waiting != nil {
+            continue
+        }
+        collector := &LogCollector{
+            Owner:     m,
+            Registry:  m.collector,
+            DB:        m.DB,
+            Pod:       pod,
+            Container: &container,
+        }
+        key := collector.key()
+        if _, found := m.collector[key]; found {
+            continue
+        }
+        err = collector.Begin(task, ctx)
+        if err != nil {
+            return
+        }
+        m.collector[key] = collector
+    }
+    return
+}
+
+// terminated provides notification that a collector has terminated.
+func (m *LogManager) terminated(collector *LogCollector) {
+    m.mutex.Lock()
+    defer m.mutex.Unlock()
+    for i := range m.collector {
+        if collector == m.collector[i] {
+            key := collector.key()
+            delete(m.collector, key)
+            break
+        }
+    }
+}
diff --git a/task/task_test.go b/task/task_test.go
index e03a2209..04136167 100644
--- a/task/task_test.go
+++ b/task/task_test.go
@@ -2,6 +2,8 @@ package task
 
 import (
     "bytes"
+    "context"
+    "io"
     "testing"
 
     crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha1"
@@ -140,12 +142,14 @@ func TestPriorityGraph(t *testing.T) {
 
 func TestLogCollectorCopy(t *testing.T) {
     g := gomega.NewGomegaWithT(t)
+    ctx := context.Background()
+
+    // no skipped bytes.
     collector := LogCollector{}
     content := "ABCDEFGHIJ"
-    reader := bytes.NewBufferString(content)
+    reader := io.NopCloser(bytes.NewBufferString(content))
     writer := bytes.NewBufferString("")
-    err := collector.copy(reader, writer)
+    err := collector.copy(reader, writer, ctx)
     g.Expect(err).To(gomega.BeNil())
     g.Expect(content).To(gomega.Equal(writer.String()))
 
@@ -155,9 +159,9 @@
         nSkip: int64(len(existing)),
     }
     content = "ABCDEFGHIJ"
-    reader = bytes.NewBufferString(content)
+    reader = io.NopCloser(bytes.NewBufferString(content))
     writer = bytes.NewBufferString(existing)
-    err = collector.copy(reader, writer)
+    err = collector.copy(reader, writer, ctx)
     g.Expect(err).To(gomega.BeNil())
     g.Expect(content).To(gomega.Equal(writer.String()))
 
@@ -168,9 +172,9 @@
         nSkip: int64(len(existing)),
     }
     content = "ABCDEFGHIJ"
-    reader = bytes.NewBufferString(content)
+    reader = io.NopCloser(bytes.NewBufferString(content))
     writer = bytes.NewBufferString(existing)
-    err = collector.copy(reader, writer)
+    err = collector.copy(reader, writer, ctx)
     g.Expect(err).To(gomega.BeNil())
     g.Expect(content).To(gomega.Equal(writer.String()))
 
@@ -181,9 +185,9 @@
         nSkip: int64(len(existing)),
     }
     content = "ABCDEFGHIJ"
-    reader = bytes.NewBufferString(content)
+    reader = io.NopCloser(bytes.NewBufferString(content))
     writer = bytes.NewBufferString(existing)
-    err = collector.copy(reader, writer)
+    err = collector.copy(reader, writer, ctx)
     g.Expect(err).To(gomega.BeNil())
     g.Expect(content).To(gomega.Equal(writer.String()))
 }