Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Dec 18, 2024
1 parent fab9700 commit 27201ef
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 25 deletions.
137 changes: 112 additions & 25 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2161,10 +2161,13 @@ type Preempt struct {

// LogCollector collect and report container logs.
type LogCollector struct {
nBuf int
Registry map[string]*LogCollector
DB *gorm.DB
Pod *core.Pod
Container *core.ContainerStatus
//
nSkip int64
}

// Begin - get container log and store in file.
Expand All @@ -2174,6 +2177,27 @@ type LogCollector struct {
// - Write (copy) log.
// - Unregister collector.
func (r *LogCollector) Begin(task *Task) (err error) {
reader, err := r.request()
if err != nil {
return
}
f, err := r.file(task)
if err != nil {
return
}
go func() {
defer func() {
_ = reader.Close()
_ = f.Close()
}()
err := r.copy(reader, f)
Log.Error(err, "")
}()
return
}

// request
func (r *LogCollector) request() (reader io.ReadCloser, err error) {
options := &core.PodLogOptions{
Container: r.Container.Name,
Follow: true,
Expand All @@ -2184,58 +2208,121 @@ func (r *LogCollector) Begin(task *Task) (err error) {
}
podClient := clientSet.CoreV1().Pods(Settings.Hub.Namespace)
req := podClient.GetLogs(r.Pod.Name, options)
reader, err := req.Stream(context.TODO())
reader, err = req.Stream(context.TODO())
if err != nil {
err = liberr.Wrap(err)
return
}
file := &model.File{Name: r.Container.Name + ".log"}
return
}

// name returns the canonical name for the container log.
func (r *LogCollector) name() (s string) {
s = r.Container.Name + ".log"
return
}

// file returns an attached log file for writing.
func (r *LogCollector) file(task *Task) (f *os.File, err error) {
f, found, err := r.find(task)
if found || err != nil {
return
}
f, err = r.create(task)
return
}

// find finds and opens an attached log file by name.
func (r *LogCollector) find(task *Task) (f *os.File, found bool, err error) {
var file model.File
name := r.name()
for _, attached := range task.Attached {
if attached.Name == name {
found = true
err = r.DB.First(&file, attached.ID).Error
if err != nil {
err = liberr.Wrap(err)
return
}
}
}
if !found {
return
}
f, err = os.OpenFile(file.Path, os.O_RDONLY|os.O_APPEND, 0666)
if err != nil {
err = liberr.Wrap(err)
return
}
st, err := f.Stat()
if err != nil {
err = liberr.Wrap(err)
return
}
r.nSkip = st.Size()
return
}

// create creates and attaches the log file.
func (r *LogCollector) create(task *Task) (f *os.File, err error) {
file := &model.File{Name: r.name()}
err = r.DB.Create(file).Error
if err != nil {
_ = reader.Close()
err = liberr.Wrap(err)
return
}
f, err := os.Create(file.Path)
f, err = os.Create(file.Path)
if err != nil {
_ = reader.Close()
_ = r.DB.Delete(file)
err = liberr.Wrap(err)
return
}
task.attach(file)
go func() {
defer func() {
_ = reader.Close()
_ = f.Close()
}()
err := r.copy(reader, f)
Log.Error(err, "")
}()
return
}

// copy data.
func (r *LogCollector) copy(reader io.Reader, f *os.File) (err error) {
buf := make([]byte, 0x8000)
// The read bytes are discarded when smaller than nSkip.
// The offset is adjusted when to account for the buffer
// containing bytes to be skipped and written.
func (r *LogCollector) copy(reader io.Reader, writer io.Writer) (err error) {
if r.nBuf < 1 {
r.nBuf = 0x8000
}
buf := make([]byte, r.nBuf)
for {
n, rErr := reader.Read(buf)
if n > 0 {
_, err = f.Write(buf[0:n])
if err != nil {
return
}
err = f.Sync()
if err != nil {
return
}
}
if rErr != nil {
if rErr != io.EOF {
err = rErr
}
break
}
nRead := int64(n)
if nRead == 0 {
continue
}
offset := int64(0)
if r.nSkip > 0 {
if nRead > r.nSkip {
offset = r.nSkip
r.nSkip = 0
} else {
r.nSkip -= nRead
continue
}
}
b := buf[offset:nRead]
_, err = writer.Write(b)
if err != nil {
return
}
if f, cast := writer.(*os.File); cast {
err = f.Sync()
if err != nil {
return
}
}
}
return
}
52 changes: 52 additions & 0 deletions task/task_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package task

import (
"bytes"
"testing"

crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha1"
Expand Down Expand Up @@ -135,3 +136,54 @@ func TestPriorityGraph(t *testing.T) {
deps := pE.graph(ready[0], ready)
g.Expect(len(deps)).To(gomega.Equal(2))
}

func TestLogCollectorCopy(t *testing.T) {
g := gomega.NewGomegaWithT(t)

// no skipped bytes.
collector := LogCollector{}
content := "ABCDEFGHIJ"
reader := bytes.NewBufferString(content)
writer := bytes.NewBufferString("")
err := collector.copy(reader, writer)
g.Expect(err).To(gomega.BeNil())
g.Expect(content).To(gomega.Equal(writer.String()))

// number of skipped bytes smaller than buffer.
existing := "ABC"
collector = LogCollector{
nSkip: int64(len(existing)),
}
content = "ABCDEFGHIJ"
reader = bytes.NewBufferString(content)
writer = bytes.NewBufferString(existing)
err = collector.copy(reader, writer)
g.Expect(err).To(gomega.BeNil())
g.Expect(content).To(gomega.Equal(writer.String()))

// number of skipped bytes larger than buffer.
existing = "ABCD"
collector = LogCollector{
nBuf: 3,
nSkip: int64(len(existing)),
}
content = "ABCDEFGHIJ"
reader = bytes.NewBufferString(content)
writer = bytes.NewBufferString(existing)
err = collector.copy(reader, writer)
g.Expect(err).To(gomega.BeNil())
g.Expect(content).To(gomega.Equal(writer.String()))

// number of skipped bytes equals buffer.
existing = "ABCD"
collector = LogCollector{
nBuf: len(existing),
nSkip: int64(len(existing)),
}
content = "ABCDEFGHIJ"
reader = bytes.NewBufferString(content)
writer = bytes.NewBufferString(existing)
err = collector.copy(reader, writer)
g.Expect(err).To(gomega.BeNil())
g.Expect(content).To(gomega.Equal(writer.String()))
}

0 comments on commit 27201ef

Please sign in to comment.