diff --git a/task/manager.go b/task/manager.go index 483ed542..c5943b50 100644 --- a/task/manager.go +++ b/task/manager.go @@ -2161,10 +2161,13 @@ type Preempt struct { // LogCollector collect and report container logs. type LogCollector struct { + nBuf int Registry map[string]*LogCollector DB *gorm.DB Pod *core.Pod Container *core.ContainerStatus + // + nSkip int64 } // Begin - get container log and store in file. @@ -2174,6 +2177,27 @@ type LogCollector struct { // - Write (copy) log. // - Unregister collector. func (r *LogCollector) Begin(task *Task) (err error) { + reader, err := r.request() + if err != nil { + return + } + f, err := r.file(task) + if err != nil { + return + } + go func() { + defer func() { + _ = reader.Close() + _ = f.Close() + }() + err := r.copy(reader, f) + Log.Error(err, "") + }() + return +} + +// request +func (r *LogCollector) request() (reader io.ReadCloser, err error) { options := &core.PodLogOptions{ Container: r.Container.Name, Follow: true, @@ -2184,58 +2208,121 @@ func (r *LogCollector) Begin(task *Task) (err error) { } podClient := clientSet.CoreV1().Pods(Settings.Hub.Namespace) req := podClient.GetLogs(r.Pod.Name, options) - reader, err := req.Stream(context.TODO()) + reader, err = req.Stream(context.TODO()) if err != nil { err = liberr.Wrap(err) return } - file := &model.File{Name: r.Container.Name + ".log"} + return +} + +// name returns the canonical name for the container log. +func (r *LogCollector) name() (s string) { + s = r.Container.Name + ".log" + return +} + +// file returns an attached log file for writing. +func (r *LogCollector) file(task *Task) (f *os.File, err error) { + f, found, err := r.find(task) + if found || err != nil { + return + } + f, err = r.create(task) + return +} + +// find finds and opens an attached log file by name. +func (r *LogCollector) find(task *Task) (f *os.File, found bool, err error) { + var file model.File + name := r.name() + for _, attached := range task.Attached { + if attached.Name == name { + found = true + err = r.DB.First(&file, attached.ID).Error + if err != nil { + err = liberr.Wrap(err) + return + } + } + } + if !found { + return + } + f, err = os.OpenFile(file.Path, os.O_RDONLY|os.O_APPEND, 0666) + if err != nil { + err = liberr.Wrap(err) + return + } + st, err := f.Stat() + if err != nil { + err = liberr.Wrap(err) + return + } + r.nSkip = st.Size() + return +} + +// create creates and attaches the log file. +func (r *LogCollector) create(task *Task) (f *os.File, err error) { + file := &model.File{Name: r.name()} err = r.DB.Create(file).Error if err != nil { - _ = reader.Close() err = liberr.Wrap(err) return } - f, err := os.Create(file.Path) + f, err = os.Create(file.Path) if err != nil { - _ = reader.Close() _ = r.DB.Delete(file) err = liberr.Wrap(err) return } task.attach(file) - go func() { - defer func() { - _ = reader.Close() - _ = f.Close() - }() - err := r.copy(reader, f) - Log.Error(err, "") - }() return } // copy data. -func (r *LogCollector) copy(reader io.Reader, f *os.File) (err error) { - buf := make([]byte, 0x8000) +// The read bytes are discarded when smaller than nSkip. +// The offset is adjusted when to account for the buffer +// containing bytes to be skipped and written. +func (r *LogCollector) copy(reader io.Reader, writer io.Writer) (err error) { + if r.nBuf < 1 { + r.nBuf = 0x8000 + } + buf := make([]byte, r.nBuf) for { n, rErr := reader.Read(buf) - if n > 0 { - _, err = f.Write(buf[0:n]) - if err != nil { - return - } - err = f.Sync() - if err != nil { - return - } - } if rErr != nil { if rErr != io.EOF { err = rErr } break } + nRead := int64(n) + if nRead == 0 { + continue + } + offset := int64(0) + if r.nSkip > 0 { + if nRead > r.nSkip { + offset = r.nSkip + r.nSkip = 0 + } else { + r.nSkip -= nRead + continue + } + } + b := buf[offset:nRead] + _, err = writer.Write(b) + if err != nil { + return + } + if f, cast := writer.(*os.File); cast { + err = f.Sync() + if err != nil { + return + } + } } return } diff --git a/task/task_test.go b/task/task_test.go index e088769e..e03a2209 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -1,6 +1,7 @@ package task import ( + "bytes" "testing" crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha1" @@ -135,3 +136,54 @@ func TestPriorityGraph(t *testing.T) { deps := pE.graph(ready[0], ready) g.Expect(len(deps)).To(gomega.Equal(2)) } + +func TestLogCollectorCopy(t *testing.T) { + g := gomega.NewGomegaWithT(t) + + // no skipped bytes. + collector := LogCollector{} + content := "ABCDEFGHIJ" + reader := bytes.NewBufferString(content) + writer := bytes.NewBufferString("") + err := collector.copy(reader, writer) + g.Expect(err).To(gomega.BeNil()) + g.Expect(content).To(gomega.Equal(writer.String())) + + // number of skipped bytes smaller than buffer. + existing := "ABC" + collector = LogCollector{ + nSkip: int64(len(existing)), + } + content = "ABCDEFGHIJ" + reader = bytes.NewBufferString(content) + writer = bytes.NewBufferString(existing) + err = collector.copy(reader, writer) + g.Expect(err).To(gomega.BeNil()) + g.Expect(content).To(gomega.Equal(writer.String())) + + // number of skipped bytes larger than buffer. + existing = "ABCD" + collector = LogCollector{ + nBuf: 3, + nSkip: int64(len(existing)), + } + content = "ABCDEFGHIJ" + reader = bytes.NewBufferString(content) + writer = bytes.NewBufferString(existing) + err = collector.copy(reader, writer) + g.Expect(err).To(gomega.BeNil()) + g.Expect(content).To(gomega.Equal(writer.String())) + + // number of skipped bytes equals buffer. + existing = "ABCD" + collector = LogCollector{ + nBuf: len(existing), + nSkip: int64(len(existing)), + } + content = "ABCDEFGHIJ" + reader = bytes.NewBufferString(content) + writer = bytes.NewBufferString(existing) + err = collector.copy(reader, writer) + g.Expect(err).To(gomega.BeNil()) + g.Expect(content).To(gomega.Equal(writer.String())) +}