diff --git a/task/manager.go b/task/manager.go index 9152570b..0050feeb 100644 --- a/task/manager.go +++ b/task/manager.go @@ -105,11 +105,14 @@ type Manager struct { cluster Cluster // queue of actions. queue chan func() + // collector registry. + collector map[string]*LogCollector } // Run the manager. func (m *Manager) Run(ctx context.Context) { m.queue = make(chan func(), 100) + m.collector = make(map[string]*LogCollector) m.cluster.Client = m.Client auth.Validators = append( auth.Validators, @@ -870,7 +873,13 @@ func (m *Manager) updateRunning() { running := task pod, found := running.Reflect(&m.cluster) if found { + err = m.attachCollector(task, pod) + if err != nil { + Log.Error(err, "") + continue + } if task.StateIn(Succeeded, Failed) { + m.detachCollector(task, pod) err = m.podSnapshot(running, pod) if err != nil { Log.Error(err, "") @@ -992,33 +1001,12 @@ func (m *Manager) deleteOrphanPods() { // Includes: // - pod YAML // - pod Events -// - container Logs func (m *Manager) podSnapshot(task *Task, pod *core.Pod) (err error) { - var files []*model.File - d, err := m.podYAML(pod) - if err != nil { - return - } - files = append(files, d) - logs, err := m.podLogs(pod) - if err != nil { - return - } - files = append(files, logs...) - for _, f := range files { - task.attach(f) - } - Log.V(1).Info("Task pod snapshot attached.", "id", task.ID) - return -} - -// podYAML builds pod resource description. -func (m *Manager) podYAML(pod *core.Pod) (file *model.File, err error) { events, err := m.podEvent(pod) if err != nil { return } - file = &model.File{Name: "pod.yaml"} + file := &model.File{Name: "pod.yaml"} err = m.DB.Create(file).Error if err != nil { err = liberr.Wrap(err) @@ -1042,6 +1030,7 @@ func (m *Manager) podYAML(pod *core.Pod) (file *model.File, err error) { } b, _ := yaml.Marshal(d) _, _ = f.Write(b) + task.attach(file) return } @@ -1078,62 +1067,49 @@ func (m *Manager) podEvent(pod *core.Pod) (events []Event, err error) { return } -// podLogs - get and store pod logs as a Files. -func (m *Manager) podLogs(pod *core.Pod) (files []*model.File, err error) { +// attachCollector - ensure each container has a log collector attached. +func (m *Manager) attachCollector(task *Task, pod *core.Pod) (err error) { for _, container := range pod.Status.ContainerStatuses { if container.State.Waiting != nil { continue } - f, nErr := m.containerLog(pod, container.Name) - if nErr == nil { - files = append(files, f) - } else { - err = nErr + key := fmt.Sprintf( + "%d.%s.%s", + task.ID, + pod.Name, + container.Name) + if _, found := m.collector[key]; found { + continue + } + Log.Info("Collector attached.", "key", key) + collector := &LogCollector{ + Registry: m.collector, + DB: m.DB, + Pod: pod, + Container: &container, + } + err = collector.Attach(task) + if err != nil { return } + m.collector[key] = collector } return } -// containerLog - get container log and store in file. -func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.File, err error) { - options := &core.PodLogOptions{ - Container: container, - } - clientSet, err := k8s2.NewClientSet() - if err != nil { - return - } - podClient := clientSet.CoreV1().Pods(Settings.Hub.Namespace) - req := podClient.GetLogs(pod.Name, options) - reader, err := req.Stream(context.TODO()) - if err != nil { - err = liberr.Wrap(err) - return - } - defer func() { - _ = reader.Close() - }() - file = &model.File{Name: container + ".log"} - err = m.DB.Create(file).Error - if err != nil { - err = liberr.Wrap(err) - return - } - f, err := os.Create(file.Path) - if err != nil { - err = liberr.Wrap(err) - return - } - defer func() { - _ = f.Close() - }() - _, err = io.Copy(f, reader) - if err != nil { - err = liberr.Wrap(err) - return +// detachCollector ensures log collectors have been deleted from the registry. +func (m *Manager) detachCollector(task *Task, pod *core.Pod) { + for _, container := range pod.Status.ContainerStatuses { + key := fmt.Sprintf( + "%d.%s.%s", + task.ID, + pod.Name, + container.Name) + if _, found := m.collector[key]; found { + Log.Info("Collector detached.", "key", key) + delete(m.collector, key) + } } - return } // ensureTerminated - Terminate running containers. @@ -2192,3 +2168,84 @@ type Preempt struct { task *Task by *Task } + +// LogCollector collect and report container logs. +type LogCollector struct { + Registry map[string]*LogCollector + DB *gorm.DB + Pod *core.Pod + Container *core.ContainerStatus +} + +// Attach - get container log and store in file. +// - Request logs. +// - Create file resource and attach to the task. +// - Register collector. +// - Write (copy) log. +// - Unregister collector. +func (r *LogCollector) Attach(task *Task) (err error) { + options := &core.PodLogOptions{ + Container: r.Container.Name, + Follow: true, + } + clientSet, err := k8s2.NewClientSet() + if err != nil { + return + } + podClient := clientSet.CoreV1().Pods(Settings.Hub.Namespace) + req := podClient.GetLogs(r.Pod.Name, options) + reader, err := req.Stream(context.TODO()) + if err != nil { + err = liberr.Wrap(err) + return + } + file := &model.File{Name: r.Container.Name + ".log"} + err = r.DB.Create(file).Error + if err != nil { + _ = reader.Close() + err = liberr.Wrap(err) + return + } + f, err := os.Create(file.Path) + if err != nil { + _ = reader.Close() + _ = r.DB.Delete(file) + err = liberr.Wrap(err) + return + } + task.attach(file) + go func() { + defer func() { + _ = reader.Close() + _ = f.Close() + }() + err := r.copy(reader, f) + Log.Error(err, "") + }() + return +} + +// copy data. +func (r *LogCollector) copy(reader io.Reader, f *os.File) (err error) { + buf := make([]byte, 0x8000) + for { + n, rErr := reader.Read(buf) + if n > 0 { + _, err = f.Write(buf[0:n]) + if err != nil { + return + } + err = f.Sync() + if err != nil { + return + } + } + if rErr != nil { + if rErr == io.EOF { + err = rErr + } + break + } + } + return +}