diff --git a/task/manager.go b/task/manager.go index 4f6398fcc..9412572d9 100644 --- a/task/manager.go +++ b/task/manager.go @@ -9,6 +9,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/golang-jwt/jwt/v4" @@ -253,7 +254,7 @@ func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) { return default: } - pod, found := m.cluster.pods[path.Base(task.Pod)] + pod, found := m.cluster.Pod(path.Base(task.Pod)) if found { snErr := m.podSnapshot(task, pod) Log.Error( @@ -400,14 +401,14 @@ func (m *Manager) findRefs(task *Task) (err error) { return } if task.Addon != "" { - _, found := m.cluster.addons[task.Addon] + _, found := m.cluster.Addon(task.Addon) if !found { err = &AddonNotFound{Name: task.Addon} return } } for _, name := range task.Extensions { - _, found := m.cluster.extensions[name] + _, found := m.cluster.Extension(name) if !found { err = &ExtensionNotFound{Name: name} return @@ -416,7 +417,7 @@ func (m *Manager) findRefs(task *Task) (err error) { if task.Kind == "" { return } - kind, found := m.cluster.tasks[task.Kind] + kind, found := m.cluster.Task(task.Kind) if !found { err = &KindNotFound{Name: task.Kind} return @@ -469,13 +470,13 @@ func (m *Manager) selectAddons(list []*Task) (kept []*Task, err error) { func (m *Manager) selectAddon(task *Task) (addon *crd.Addon, err error) { if task.Addon != "" { found := false - addon, found = m.cluster.addons[task.Addon] + addon, found = m.cluster.Addon(task.Addon) if !found { err = &AddonNotFound{task.Addon} } return } - kind, found := m.cluster.tasks[task.Kind] + kind, found := m.cluster.Task(task.Kind) if !found { err = &KindNotFound{task.Kind} return @@ -483,7 +484,7 @@ func (m *Manager) selectAddon(task *Task) (addon *crd.Addon, err error) { matched := false var selected *crd.Addon selector := NewSelector(m.DB, task) - for _, addon = range m.cluster.addons { + for _, addon = range m.cluster.Addons() { if addon.Spec.Task != kind.Name { continue } @@ -512,7 +513,7 @@ func (m *Manager) selectExtensions(task *Task, addon *crd.Addon) (err error) { } matched := false selector := NewSelector(m.DB, task) - for name, extension := range m.cluster.extensions { + for _, extension := range m.cluster.Extensions() { if extension.Spec.Addon != addon.Name { continue } @@ -521,8 +522,8 @@ func (m *Manager) selectExtensions(task *Task, addon *crd.Addon) (err error) { return } if matched { - task.Extensions = append(task.Extensions, name) - task.Event(ExtSelected, name) + task.Extensions = append(task.Extensions, extension.Name) + task.Event(ExtSelected, extension.Name) } } return @@ -554,7 +555,7 @@ func (m *Manager) postpone(list []*Task) (err error) { matched: make(map[uint]uint), }, &RuleDeps{ - cluster: m.cluster, + cluster: &m.cluster, }, } for _, task := range list { @@ -624,7 +625,7 @@ func (m *Manager) adjustPriority(list []*Task) (err error) { if len(list) == 0 { return } - pE := Priority{cluster: m.cluster} + pE := Priority{cluster: &m.cluster} escalated := pE.Escalate(list) for _, task := range escalated { if task.State != Pending { @@ -662,7 +663,7 @@ func (m *Manager) createPod(list []*Task) (err error) { } ready := task started := false - started, err = ready.Run(m.cluster) + started, err = ready.Run(&m.cluster) if err != nil { Log.Error(err, "") return @@ -829,7 +830,7 @@ func (m *Manager) updateRunning() { continue } running := task - pod, found := running.Reflect(m.cluster) + pod, found := running.Reflect(&m.cluster) if found { if task.StateIn(Succeeded, Failed) { err = m.podSnapshot(running, pod) @@ -1071,7 +1072,7 @@ func (r *Task) LastEvent(kind string) (event *model.TaskEvent, found bool) { } // Run the specified task. -func (r *Task) Run(cluster Cluster) (started bool, err error) { +func (r *Task) Run(cluster *Cluster) (started bool, err error) { mark := time.Now() client := cluster.Client defer func() { @@ -1088,7 +1089,7 @@ func (r *Task) Run(cluster Cluster) (started bool, err error) { err = nil } }() - addon, found := cluster.addons[r.Addon] + addon, found := cluster.Addon(r.Addon) if !found { err = &AddonNotFound{Name: r.Addon} return @@ -1168,8 +1169,8 @@ func (r *Task) Run(cluster Cluster) (started bool, err error) { } // Reflect finds the associated pod and updates the task state. -func (r *Task) Reflect(cluster Cluster) (pod *core.Pod, found bool) { - pod, found = cluster.pods[path.Base(r.Pod)] +func (r *Task) Reflect(cluster *Cluster) (pod *core.Pod, found bool) { + pod, found = cluster.Pod(path.Base(r.Pod)) if !found { r.State = Ready r.Event(PodNotFound, r.Pod) @@ -1559,7 +1560,7 @@ type Event struct { // Priority escalator. type Priority struct { - cluster Cluster + cluster *Cluster } // Escalate task dependencies as needed. @@ -1601,7 +1602,7 @@ func (p *Priority) Escalate(ready []*Task) (escalated []*Task) { // graph builds a dependency graph. func (p *Priority) graph(task *Task, ready []*Task) (deps []*Task) { - kind, found := p.cluster.tasks[task.Kind] + kind, found := p.cluster.Task(task.Kind) if !found { return } @@ -1638,8 +1639,11 @@ func (p *Priority) unique(in []*Task) (out []*Task) { return } +// Cluster provides cached cluster resources. +// Maps must NOT be accessed directly. type Cluster struct { k8s.Client + mutex sync.RWMutex tackle *crd.Tackle addons map[string]*crd.Addon extensions map[string]*crd.Extension @@ -1647,7 +1651,10 @@ type Cluster struct { pods map[string]*core.Pod } +// Refresh the cache. func (k *Cluster) Refresh() (err error) { + k.mutex.Lock() + defer k.mutex.Unlock() if Settings.Hub.Disconnected { k.tackle = &crd.Tackle{} k.addons = make(map[string]*crd.Addon) @@ -1679,6 +1686,66 @@ func (k *Cluster) Refresh() (err error) { return } +// Tackle returns the tackle resource. +func (k *Cluster) Tackle() (r *crd.Tackle) { + k.mutex.RLock() + defer k.mutex.RUnlock() + r = k.tackle + return +} + +// Addon returns an addon my name. +func (k *Cluster) Addon(name string) (r *crd.Addon, found bool) { + k.mutex.RLock() + defer k.mutex.RUnlock() + r, found = k.addons[name] + return +} + +// Addons returns an addon my name. +func (k *Cluster) Addons() (list []*crd.Addon) { + k.mutex.RLock() + defer k.mutex.RUnlock() + for _, r := range k.addons { + list = append(list, r) + } + return +} + +// Extension returns an extension by name. +func (k *Cluster) Extension(name string) (r *crd.Extension, found bool) { + k.mutex.RLock() + defer k.mutex.RUnlock() + r, found = k.extensions[name] + return +} + +// Extensions returns an addon my name. +func (k *Cluster) Extensions() (list []*crd.Extension) { + k.mutex.RLock() + defer k.mutex.RUnlock() + for _, r := range k.extensions { + list = append(list, r) + } + return +} + +// Task returns a task by name. +func (k *Cluster) Task(name string) (r *crd.Task, found bool) { + k.mutex.RLock() + defer k.mutex.RUnlock() + r, found = k.tasks[name] + return +} + +// Pod returns a pod by name. +func (k *Cluster) Pod(name string) (r *core.Pod, found bool) { + k.mutex.RLock() + defer k.mutex.RUnlock() + r, found = k.pods[name] + return +} + // getTackle func (k *Cluster) getTackle() (err error) { options := &k8s.ListOptions{Namespace: Settings.Namespace} diff --git a/task/rule.go b/task/rule.go index 4efe071f8..1ccd7161b 100644 --- a/task/rule.go +++ b/task/rule.go @@ -43,7 +43,7 @@ func (r *RuleUnique) Match(ready, other *Task) (matched bool, reason string) { // RuleDeps - Task kind dependencies. type RuleDeps struct { - cluster Cluster + cluster *Cluster } // Match determines the match. diff --git a/task/task_test.go b/task/task_test.go index fb0eff386..80fe29bb3 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -67,7 +67,7 @@ func TestPriorityEscalate(t *testing.T) { ready = append(ready, task) pE := Priority{ - cluster: Cluster{ + cluster: &Cluster{ tasks: kinds, }} @@ -129,7 +129,7 @@ func TestPriorityGraph(t *testing.T) { ready = append(ready, task) pE := Priority{ - cluster: Cluster{ + cluster: &Cluster{ tasks: kinds, }} deps := pE.graph(ready[0], ready)