Skip to content

Commit

Permalink
🐛 add task manager cluster mutex. (#703)
Browse files Browse the repository at this point in the history
Fixes a potential concurrency when Manager.findRefs() using the
`Cluster` while the Manager is refreshing it.

---------

Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel authored Jul 5, 2024
1 parent 6851316 commit 24bb742
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 25 deletions.
109 changes: 88 additions & 21 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/golang-jwt/jwt/v4"
Expand Down Expand Up @@ -253,7 +254,7 @@ func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) {
return
default:
}
pod, found := m.cluster.pods[path.Base(task.Pod)]
pod, found := m.cluster.Pod(path.Base(task.Pod))
if found {
snErr := m.podSnapshot(task, pod)
Log.Error(
Expand Down Expand Up @@ -400,14 +401,14 @@ func (m *Manager) findRefs(task *Task) (err error) {
return
}
if task.Addon != "" {
_, found := m.cluster.addons[task.Addon]
_, found := m.cluster.Addon(task.Addon)
if !found {
err = &AddonNotFound{Name: task.Addon}
return
}
}
for _, name := range task.Extensions {
_, found := m.cluster.extensions[name]
_, found := m.cluster.Extension(name)
if !found {
err = &ExtensionNotFound{Name: name}
return
Expand All @@ -416,7 +417,7 @@ func (m *Manager) findRefs(task *Task) (err error) {
if task.Kind == "" {
return
}
kind, found := m.cluster.tasks[task.Kind]
kind, found := m.cluster.Task(task.Kind)
if !found {
err = &KindNotFound{Name: task.Kind}
return
Expand Down Expand Up @@ -469,21 +470,21 @@ func (m *Manager) selectAddons(list []*Task) (kept []*Task, err error) {
func (m *Manager) selectAddon(task *Task) (addon *crd.Addon, err error) {
if task.Addon != "" {
found := false
addon, found = m.cluster.addons[task.Addon]
addon, found = m.cluster.Addon(task.Addon)
if !found {
err = &AddonNotFound{task.Addon}
}
return
}
kind, found := m.cluster.tasks[task.Kind]
kind, found := m.cluster.Task(task.Kind)
if !found {
err = &KindNotFound{task.Kind}
return
}
matched := false
var selected *crd.Addon
selector := NewSelector(m.DB, task)
for _, addon = range m.cluster.addons {
for _, addon = range m.cluster.Addons() {
if addon.Spec.Task != kind.Name {
continue
}
Expand Down Expand Up @@ -512,7 +513,7 @@ func (m *Manager) selectExtensions(task *Task, addon *crd.Addon) (err error) {
}
matched := false
selector := NewSelector(m.DB, task)
for name, extension := range m.cluster.extensions {
for _, extension := range m.cluster.Extensions() {
if extension.Spec.Addon != addon.Name {
continue
}
Expand All @@ -521,8 +522,8 @@ func (m *Manager) selectExtensions(task *Task, addon *crd.Addon) (err error) {
return
}
if matched {
task.Extensions = append(task.Extensions, name)
task.Event(ExtSelected, name)
task.Extensions = append(task.Extensions, extension.Name)
task.Event(ExtSelected, extension.Name)
}
}
return
Expand Down Expand Up @@ -554,7 +555,7 @@ func (m *Manager) postpone(list []*Task) (err error) {
matched: make(map[uint]uint),
},
&RuleDeps{
cluster: m.cluster,
cluster: &m.cluster,
},
}
for _, task := range list {
Expand Down Expand Up @@ -624,7 +625,7 @@ func (m *Manager) adjustPriority(list []*Task) (err error) {
if len(list) == 0 {
return
}
pE := Priority{cluster: m.cluster}
pE := Priority{cluster: &m.cluster}
escalated := pE.Escalate(list)
for _, task := range escalated {
if task.State != Pending {
Expand Down Expand Up @@ -662,7 +663,7 @@ func (m *Manager) createPod(list []*Task) (err error) {
}
ready := task
started := false
started, err = ready.Run(m.cluster)
started, err = ready.Run(&m.cluster)
if err != nil {
Log.Error(err, "")
return
Expand Down Expand Up @@ -829,7 +830,7 @@ func (m *Manager) updateRunning() {
continue
}
running := task
pod, found := running.Reflect(m.cluster)
pod, found := running.Reflect(&m.cluster)
if found {
if task.StateIn(Succeeded, Failed) {
err = m.podSnapshot(running, pod)
Expand Down Expand Up @@ -1071,7 +1072,7 @@ func (r *Task) LastEvent(kind string) (event *model.TaskEvent, found bool) {
}

// Run the specified task.
func (r *Task) Run(cluster Cluster) (started bool, err error) {
func (r *Task) Run(cluster *Cluster) (started bool, err error) {
mark := time.Now()
client := cluster.Client
defer func() {
Expand All @@ -1088,7 +1089,7 @@ func (r *Task) Run(cluster Cluster) (started bool, err error) {
err = nil
}
}()
addon, found := cluster.addons[r.Addon]
addon, found := cluster.Addon(r.Addon)
if !found {
err = &AddonNotFound{Name: r.Addon}
return
Expand Down Expand Up @@ -1120,7 +1121,7 @@ func (r *Task) Run(cluster Cluster) (started bool, err error) {
pod := r.pod(
addon,
extensions,
cluster.tackle,
cluster.Tackle(),
&secret)
err = client.Create(context.TODO(), &pod)
if err != nil {
Expand Down Expand Up @@ -1168,8 +1169,8 @@ func (r *Task) Run(cluster Cluster) (started bool, err error) {
}

// Reflect finds the associated pod and updates the task state.
func (r *Task) Reflect(cluster Cluster) (pod *core.Pod, found bool) {
pod, found = cluster.pods[path.Base(r.Pod)]
func (r *Task) Reflect(cluster *Cluster) (pod *core.Pod, found bool) {
pod, found = cluster.Pod(path.Base(r.Pod))
if !found {
r.State = Ready
r.Event(PodNotFound, r.Pod)
Expand Down Expand Up @@ -1559,7 +1560,7 @@ type Event struct {

// Priority escalator.
type Priority struct {
cluster Cluster
cluster *Cluster
}

// Escalate task dependencies as needed.
Expand Down Expand Up @@ -1601,7 +1602,7 @@ func (p *Priority) Escalate(ready []*Task) (escalated []*Task) {

// graph builds a dependency graph.
func (p *Priority) graph(task *Task, ready []*Task) (deps []*Task) {
kind, found := p.cluster.tasks[task.Kind]
kind, found := p.cluster.Task(task.Kind)
if !found {
return
}
Expand Down Expand Up @@ -1638,16 +1639,22 @@ func (p *Priority) unique(in []*Task) (out []*Task) {
return
}

// Cluster provides cached cluster resources.
// Maps must NOT be accessed directly.
type Cluster struct {
k8s.Client
mutex sync.RWMutex
tackle *crd.Tackle
addons map[string]*crd.Addon
extensions map[string]*crd.Extension
tasks map[string]*crd.Task
pods map[string]*core.Pod
}

// Refresh the cache.
func (k *Cluster) Refresh() (err error) {
k.mutex.Lock()
defer k.mutex.Unlock()
if Settings.Hub.Disconnected {
k.tackle = &crd.Tackle{}
k.addons = make(map[string]*crd.Addon)
Expand Down Expand Up @@ -1679,6 +1686,66 @@ func (k *Cluster) Refresh() (err error) {
return
}

// Tackle returns the tackle resource.
func (k *Cluster) Tackle() (r *crd.Tackle) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r = k.tackle
return
}

// Addon returns an addon my name.
func (k *Cluster) Addon(name string) (r *crd.Addon, found bool) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r, found = k.addons[name]
return
}

// Addons returns an addon my name.
func (k *Cluster) Addons() (list []*crd.Addon) {
k.mutex.RLock()
defer k.mutex.RUnlock()
for _, r := range k.addons {
list = append(list, r)
}
return
}

// Extension returns an extension by name.
func (k *Cluster) Extension(name string) (r *crd.Extension, found bool) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r, found = k.extensions[name]
return
}

// Extensions returns an addon my name.
func (k *Cluster) Extensions() (list []*crd.Extension) {
k.mutex.RLock()
defer k.mutex.RUnlock()
for _, r := range k.extensions {
list = append(list, r)
}
return
}

// Task returns a task by name.
func (k *Cluster) Task(name string) (r *crd.Task, found bool) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r, found = k.tasks[name]
return
}

// Pod returns a pod by name.
func (k *Cluster) Pod(name string) (r *core.Pod, found bool) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r, found = k.pods[name]
return
}

// getTackle
func (k *Cluster) getTackle() (err error) {
options := &k8s.ListOptions{Namespace: Settings.Namespace}
Expand Down
4 changes: 2 additions & 2 deletions task/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (r *RuleUnique) Match(ready, other *Task) (matched bool, reason string) {

// RuleDeps - Task kind dependencies.
type RuleDeps struct {
cluster Cluster
cluster *Cluster
}

// Match determines the match.
Expand All @@ -54,7 +54,7 @@ func (r *RuleDeps) Match(ready, other *Task) (matched bool, reason string) {
if *ready.ApplicationID != *other.ApplicationID {
return
}
def, found := r.cluster.tasks[ready.Kind]
def, found := r.cluster.Task(ready.Kind)
if !found {
return
}
Expand Down
4 changes: 2 additions & 2 deletions task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestPriorityEscalate(t *testing.T) {
ready = append(ready, task)

pE := Priority{
cluster: Cluster{
cluster: &Cluster{
tasks: kinds,
}}

Expand Down Expand Up @@ -129,7 +129,7 @@ func TestPriorityGraph(t *testing.T) {
ready = append(ready, task)

pE := Priority{
cluster: Cluster{
cluster: &Cluster{
tasks: kinds,
}}
deps := pE.graph(ready[0], ready)
Expand Down

0 comments on commit 24bb742

Please sign in to comment.