Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Dec 19, 2024
1 parent 46ebf82 commit fa44493
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 62 deletions.
165 changes: 111 additions & 54 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,18 @@ type Manager struct {
cluster Cluster
// queue of actions.
queue chan func()
// collector registry.
collector map[string]*LogCollector
// logManager provides pod log collection.
logManager LogManager
}

// Run the manager.
func (m *Manager) Run(ctx context.Context) {
m.queue = make(chan func(), 100)
m.collector = make(map[string]*LogCollector)
m.cluster.Client = m.Client
m.logManager = LogManager{
collector: make(map[string]*LogCollector),
DB: m.DB,
}
auth.Validators = append(
auth.Validators,
&Validator{
Expand All @@ -131,10 +134,9 @@ func (m *Manager) Run(ctx context.Context) {
default:
err := m.cluster.Refresh()
if err == nil {
m.deleteOrphanCollector()
m.deleteOrphanPods()
m.runActions()
m.updateRunning()
m.updateRunning(ctx)
m.deleteZombies()
m.startReady()
m.pause()
Expand Down Expand Up @@ -847,7 +849,7 @@ func (m *Manager) preempt(list []*Task) (err error) {
}

// updateRunning tasks to reflect pod state.
func (m *Manager) updateRunning() {
func (m *Manager) updateRunning(ctx context.Context) {
var err error
defer func() {
Log.Error(err, "")
Expand Down Expand Up @@ -876,7 +878,7 @@ func (m *Manager) updateRunning() {
running := task
pod, found := running.Reflect(&m.cluster)
if found {
err = m.ensureCollector(task, pod)
err = m.logManager.EnsureCollection(task, pod, ctx)
if err != nil {
Log.Error(err, "")
continue
Expand Down Expand Up @@ -1101,41 +1103,6 @@ func (m *Manager) podEvent(pod *core.Pod) (events []Event, err error) {
return
}

// ensureCollector ensures each started container of the pod has a log
// collector attached.
//
// Containers still in the Waiting state are skipped, as are containers that
// already have a collector registered under "<pod name>.<container name>".
// For every other container a LogCollector is created, started with Begin
// and stored in m.collector. The first error returned by Begin stops the
// loop and is returned; collectors registered before it stay registered.
func (m *Manager) ensureCollector(task *Task, pod *core.Pod) (err error) {
	for _, container := range pod.Status.ContainerStatuses {
		// Give each collector its own copy of the status. Before Go 1.22 the
		// range variable is reused across iterations, so storing &container
		// directly would leave every collector pointing at the same value.
		container := container
		if container.State.Waiting != nil {
			continue
		}
		key := pod.Name + "." + container.Name
		if _, found := m.collector[key]; found {
			continue
		}
		collector := &LogCollector{
			Registry:  m.collector,
			DB:        m.DB,
			Pod:       pod,
			Container: &container,
		}
		err = collector.Begin(task)
		if err != nil {
			return
		}
		m.collector[key] = collector
	}
	return
}

// deleteOrphanCollector removes from the registry every collector whose
// pod is not found by m.cluster.Pod.
func (m *Manager) deleteOrphanCollector() {
	for name, c := range m.collector {
		if _, exists := m.cluster.Pod(c.Pod.Name); exists {
			continue
		}
		// Deleting the current entry while ranging over a map is safe in Go.
		delete(m.collector, name)
	}
}

// ensureTerminated - Terminate running containers.
func (m *Manager) ensureTerminated(task *Task, pod *core.Pod) (err error) {
for _, status := range pod.Status.ContainerStatuses {
Expand Down Expand Up @@ -2195,12 +2162,13 @@ type Preempt struct {

// LogCollector collects and reports the log of a single pod container.
type LogCollector struct {
	// Owner is the manager notified (via terminated) when collection ends.
	Owner *LogManager
	// Registry is the collector registry this collector was created for.
	Registry map[string]*LogCollector
	// DB is the database handle.
	DB *gorm.DB
	// Pod is the pod that owns the container.
	Pod *core.Pod
	// Container is the status of the container whose log is collected.
	Container *core.ContainerStatus
	//
	// nBuf is the read buffer size in bytes; copy substitutes 0x8000
	// when it is less than 1.
	nBuf int
	// nSkip is the number of leading log bytes to discard instead of
	// writing — presumably bytes already present in the output; confirm
	// against create().
	nSkip int64
}

Expand All @@ -2210,7 +2178,7 @@ type LogCollector struct {
// - Register collector.
// - Write (copy) log.
// - Unregister collector.
func (r *LogCollector) Begin(task *Task) (err error) {
func (r *LogCollector) Begin(task *Task, ctx context.Context) (err error) {
reader, err := r.request()
if err != nil {
return
Expand All @@ -2223,13 +2191,20 @@ func (r *LogCollector) Begin(task *Task) (err error) {
defer func() {
_ = reader.Close()
_ = f.Close()
r.Owner.terminated(r)
}()
err := r.copy(reader, f)
err := r.copy(reader, f, ctx)
Log.Error(err, "")
}()
return
}

// key builds the registry key for the collector: the pod name and the
// container name joined by a dot.
func (r *LogCollector) key() (key string) {
	return r.Pod.Name + "." + r.Container.Name
}

// request
func (r *LogCollector) request() (reader io.ReadCloser, err error) {
options := &core.PodLogOptions{
Expand Down Expand Up @@ -2319,21 +2294,52 @@ func (r *LogCollector) create(task *Task) (f *os.File, err error) {
// The read bytes are discarded when smaller than nSkip.
// The offset is adjusted to account for the buffer
// containing bytes to be skipped and written.
func (r *LogCollector) copy(reader io.Reader, writer io.Writer) (err error) {
func (r *LogCollector) copy(reader io.ReadCloser, writer io.Writer, ctx context.Context) (err error) {
timer := time.NewTimer(time.Second)
canceled := false
readCh := make(chan []byte)
if r.nBuf < 1 {
r.nBuf = 0x8000
}
buf := make([]byte, r.nBuf)
go func() {
defer func() {
close(readCh)
}()
for {
buf := make([]byte, r.nBuf)
n, err := reader.Read(buf)
if err != nil {
if canceled {
return
}
if err != io.EOF {
Log.Error(err, "")
}
break
}
readCh <- buf[:n]
}
}()
for {
n, rErr := reader.Read(buf)
if rErr != nil {
if rErr != io.EOF {
err = rErr
var buf []byte
nRead := int64(-1)
select {
case <-ctx.Done():
canceled = true
_ = reader.Close()
return
case b, read := <-readCh:
if read {
nRead = int64(len(b))
buf = b
}
case <-timer.C:
nRead = 0
}
if nRead == -1 { // EOF.
break
}
nRead := int64(n)
if nRead == 0 {
if nRead == 0 { // Timeout.
continue
}
offset := int64(0)
Expand All @@ -2346,7 +2352,7 @@ func (r *LogCollector) copy(reader io.Reader, writer io.Writer) (err error) {
continue
}
}
b := buf[offset:nRead]
b := buf[offset:]
_, err = writer.Write(b)
if err != nil {
return
Expand All @@ -2360,3 +2366,54 @@ func (r *LogCollector) copy(reader io.Reader, writer io.Writer) (err error) {
}
return
}

// LogManager owns the set of active log collectors.
type LogManager struct {
	// mutex serializes access to the collector registry.
	mutex sync.Mutex
	// collector is the registry of collectors, keyed by LogCollector.key().
	collector map[string]*LogCollector
	// DB is the database handle handed to each collector.
	DB *gorm.DB
}

// EnsureCollection ensures each started container of the pod has a log
// collector.
//
// The registry is locked for the duration of the call. Containers in the
// Waiting state and containers that already have a registered collector are
// skipped. For each remaining container a LogCollector is created, started
// with Begin(task, ctx) and registered under its key. The first error
// returned by Begin stops the loop and is returned; collectors registered
// before it stay registered.
func (m *LogManager) EnsureCollection(task *Task, pod *core.Pod, ctx context.Context) (err error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	for _, container := range pod.Status.ContainerStatuses {
		// Give each collector its own copy of the status. Before Go 1.22 the
		// range variable is reused across iterations, so storing &container
		// directly would make every collector (and its key) track whichever
		// container the loop visited last.
		container := container
		if container.State.Waiting != nil {
			continue
		}
		collector := &LogCollector{
			Owner:     m,
			Registry:  m.collector,
			DB:        m.DB,
			Pod:       pod,
			Container: &container,
		}
		key := collector.key()
		if _, found := m.collector[key]; found {
			continue
		}
		err = collector.Begin(task, ctx)
		if err != nil {
			return
		}
		m.collector[key] = collector
	}
	return
}

// terminated provides notification that a collector has terminated.
//
// The collector is located in the registry by pointer identity and removed
// under the key it is actually stored at. Recomputing the key from the
// collector could name a different entry if its pod or container name no
// longer matches the key used at registration. A collector that is not
// registered is ignored.
func (m *LogManager) terminated(collector *LogCollector) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	for key, registered := range m.collector {
		if registered == collector {
			delete(m.collector, key)
			break
		}
	}
}
20 changes: 12 additions & 8 deletions task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package task

import (
"bytes"
"context"
"io"
"testing"

crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha1"
Expand Down Expand Up @@ -140,12 +142,14 @@ func TestPriorityGraph(t *testing.T) {
func TestLogCollectorCopy(t *testing.T) {
g := gomega.NewGomegaWithT(t)

ctx := context.Background()

// no skipped bytes.
collector := LogCollector{}
content := "ABCDEFGHIJ"
reader := bytes.NewBufferString(content)
reader := io.NopCloser(bytes.NewBufferString(content))
writer := bytes.NewBufferString("")
err := collector.copy(reader, writer)
err := collector.copy(reader, writer, ctx)
g.Expect(err).To(gomega.BeNil())
g.Expect(content).To(gomega.Equal(writer.String()))

Expand All @@ -155,9 +159,9 @@ func TestLogCollectorCopy(t *testing.T) {
nSkip: int64(len(existing)),
}
content = "ABCDEFGHIJ"
reader = bytes.NewBufferString(content)
reader = io.NopCloser(bytes.NewBufferString(content))
writer = bytes.NewBufferString(existing)
err = collector.copy(reader, writer)
err = collector.copy(reader, writer, ctx)
g.Expect(err).To(gomega.BeNil())
g.Expect(content).To(gomega.Equal(writer.String()))

Expand All @@ -168,9 +172,9 @@ func TestLogCollectorCopy(t *testing.T) {
nSkip: int64(len(existing)),
}
content = "ABCDEFGHIJ"
reader = bytes.NewBufferString(content)
reader = io.NopCloser(bytes.NewBufferString(content))
writer = bytes.NewBufferString(existing)
err = collector.copy(reader, writer)
err = collector.copy(reader, writer, ctx)
g.Expect(err).To(gomega.BeNil())
g.Expect(content).To(gomega.Equal(writer.String()))

Expand All @@ -181,9 +185,9 @@ func TestLogCollectorCopy(t *testing.T) {
nSkip: int64(len(existing)),
}
content = "ABCDEFGHIJ"
reader = bytes.NewBufferString(content)
reader = io.NopCloser(bytes.NewBufferString(content))
writer = bytes.NewBufferString(existing)
err = collector.copy(reader, writer)
err = collector.copy(reader, writer, ctx)
g.Expect(err).To(gomega.BeNil())
g.Expect(content).To(gomega.Equal(writer.String()))
}

0 comments on commit fa44493

Please sign in to comment.