Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Dec 17, 2024
1 parent fd98244 commit 23dbcbd
Showing 1 changed file with 124 additions and 67 deletions.
191 changes: 124 additions & 67 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,14 @@ type Manager struct {
cluster Cluster
// queue of actions.
queue chan func()
// collector registry.
collector map[string]*LogCollector
}

// Run the manager.
func (m *Manager) Run(ctx context.Context) {
m.queue = make(chan func(), 100)
m.collector = make(map[string]*LogCollector)
m.cluster.Client = m.Client
auth.Validators = append(
auth.Validators,
Expand Down Expand Up @@ -870,7 +873,13 @@ func (m *Manager) updateRunning() {
running := task
pod, found := running.Reflect(&m.cluster)
if found {
err = m.attachCollector(task, pod)
if err != nil {
Log.Error(err, "")
continue
}
if task.StateIn(Succeeded, Failed) {
m.detachCollector(task, pod)
err = m.podSnapshot(running, pod)
if err != nil {
Log.Error(err, "")
Expand Down Expand Up @@ -992,33 +1001,12 @@ func (m *Manager) deleteOrphanPods() {
// Includes:
// - pod YAML
// - pod Events
// - container Logs
func (m *Manager) podSnapshot(task *Task, pod *core.Pod) (err error) {
var files []*model.File
d, err := m.podYAML(pod)
if err != nil {
return
}
files = append(files, d)
logs, err := m.podLogs(pod)
if err != nil {
return
}
files = append(files, logs...)
for _, f := range files {
task.attach(f)
}
Log.V(1).Info("Task pod snapshot attached.", "id", task.ID)
return
}

// podYAML builds pod resource description.
func (m *Manager) podYAML(pod *core.Pod) (file *model.File, err error) {
events, err := m.podEvent(pod)
if err != nil {
return
}
file = &model.File{Name: "pod.yaml"}
file := &model.File{Name: "pod.yaml"}
err = m.DB.Create(file).Error
if err != nil {
err = liberr.Wrap(err)
Expand All @@ -1042,6 +1030,7 @@ func (m *Manager) podYAML(pod *core.Pod) (file *model.File, err error) {
}
b, _ := yaml.Marshal(d)
_, _ = f.Write(b)
task.attach(file)
return
}

Expand Down Expand Up @@ -1078,62 +1067,49 @@ func (m *Manager) podEvent(pod *core.Pod) (events []Event, err error) {
return
}

// podLogs - get and store pod logs as a Files.
func (m *Manager) podLogs(pod *core.Pod) (files []*model.File, err error) {
// attachCollector - ensure each container has a log collector attached.
func (m *Manager) attachCollector(task *Task, pod *core.Pod) (err error) {
for _, container := range pod.Status.ContainerStatuses {
if container.State.Waiting != nil {
continue
}
f, nErr := m.containerLog(pod, container.Name)
if nErr == nil {
files = append(files, f)
} else {
err = nErr
key := fmt.Sprintf(
"%d.%s.%s",
task.ID,
pod.Name,
container.Name)
if _, found := m.collector[key]; found {
continue
}
Log.Info("Collector attached.", "key", key)
collector := &LogCollector{
Registry: m.collector,
DB: m.DB,
Pod: pod,
Container: &container,
}
err = collector.Attach(task)
if err != nil {
return
}
m.collector[key] = collector
}
return
}

// containerLog - get container log and store in file.
func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.File, err error) {
options := &core.PodLogOptions{
Container: container,
}
clientSet, err := k8s2.NewClientSet()
if err != nil {
return
}
podClient := clientSet.CoreV1().Pods(Settings.Hub.Namespace)
req := podClient.GetLogs(pod.Name, options)
reader, err := req.Stream(context.TODO())
if err != nil {
err = liberr.Wrap(err)
return
}
defer func() {
_ = reader.Close()
}()
file = &model.File{Name: container + ".log"}
err = m.DB.Create(file).Error
if err != nil {
err = liberr.Wrap(err)
return
}
f, err := os.Create(file.Path)
if err != nil {
err = liberr.Wrap(err)
return
}
defer func() {
_ = f.Close()
}()
_, err = io.Copy(f, reader)
if err != nil {
err = liberr.Wrap(err)
return
// detachCollector removes the registry entry of every container
// reported in the pod status for the given task.
// Entries are keyed by "<task id>.<pod name>.<container name>";
// containers without a registered collector are skipped.
func (m *Manager) detachCollector(task *Task, pod *core.Pod) {
	statuses := pod.Status.ContainerStatuses
	for i := range statuses {
		key := fmt.Sprintf("%d.%s.%s", task.ID, pod.Name, statuses[i].Name)
		if _, registered := m.collector[key]; !registered {
			continue
		}
		Log.Info("Collector detached.", "key", key)
		delete(m.collector, key)
	}
}

// ensureTerminated - Terminate running containers.
Expand Down Expand Up @@ -2192,3 +2168,84 @@ type Preempt struct {
task *Task
by *Task
}

// LogCollector collects the log of one pod container and stores it
// in a file attached to a task.
type LogCollector struct {
// Registry is the collector registry supplied by the creator.
// NOTE(review): not read by the methods visible here - confirm usage.
Registry map[string]*LogCollector
// DB is used to create (and, on failure, delete) the log file record.
DB *gorm.DB
// Pod is the pod whose name is used for the log request.
Pod *core.Pod
// Container is the container status; its name selects the container
// log to request and names the log file ("<name>.log").
Container *core.ContainerStatus
}

// Attach starts collecting the container log into a file attached to the task.
//   - Request the container log stream (Follow=true).
//   - Create the file resource and attach it to the task.
//   - Copy the stream to the file in a background goroutine.
//
// The returned error covers setup only: building the client, opening the
// stream and creating the file. On a setup failure the stream is closed and,
// when the file could not be created on disk, the file record is deleted.
//
// Errors raised while copying are logged by the goroutine. Reaching the end
// of the stream (io.EOF) is the normal way for collection to finish, so it
// is not logged as an error.
func (r *LogCollector) Attach(task *Task) (err error) {
	options := &core.PodLogOptions{
		Container: r.Container.Name,
		Follow:    true,
	}
	clientSet, err := k8s2.NewClientSet()
	if err != nil {
		return
	}
	podClient := clientSet.CoreV1().Pods(Settings.Hub.Namespace)
	req := podClient.GetLogs(r.Pod.Name, options)
	reader, err := req.Stream(context.TODO())
	if err != nil {
		err = liberr.Wrap(err)
		return
	}
	file := &model.File{Name: r.Container.Name + ".log"}
	err = r.DB.Create(file).Error
	if err != nil {
		_ = reader.Close()
		err = liberr.Wrap(err)
		return
	}
	f, err := os.Create(file.Path)
	if err != nil {
		_ = reader.Close()
		_ = r.DB.Delete(file)
		err = liberr.Wrap(err)
		return
	}
	task.attach(file)
	go func() {
		// The goroutine owns the stream and the file from here on.
		defer func() {
			_ = reader.Close()
			_ = f.Close()
		}()
		// Use a local error: the named result belongs to the caller,
		// which has already returned by the time the copy finishes.
		cErr := r.copy(reader, f)
		if cErr != nil && cErr != io.EOF {
			Log.Error(cErr, "")
		}
	}()
	return
}

// copy streams everything read from reader into f.
// Each chunk is flushed to stable storage (Sync) right after it is written.
//
// Returns nil when the reader is exhausted (io.EOF); otherwise the first
// read, write or sync error encountered.
func (r *LogCollector) copy(reader io.Reader, f *os.File) (err error) {
	buf := make([]byte, 0x8000)
	for {
		n, rErr := reader.Read(buf)
		// Per the io.Reader contract, consume the n bytes read
		// before considering the read error.
		if n > 0 {
			if _, err = f.Write(buf[:n]); err != nil {
				return
			}
			if err = f.Sync(); err != nil {
				return
			}
		}
		if rErr != nil {
			// io.EOF marks a clean end of stream, not a failure.
			if rErr != io.EOF {
				err = rErr
			}
			return
		}
	}
}

0 comments on commit 23dbcbd

Please sign in to comment.