diff --git a/generated/crd/tackle.konveyor.io_tasks.yaml b/generated/crd/tackle.konveyor.io_tasks.yaml index 3ac7f79f9..06af53201 100644 --- a/generated/crd/tackle.konveyor.io_tasks.yaml +++ b/generated/crd/tackle.konveyor.io_tasks.yaml @@ -15,14 +15,7 @@ spec: singular: task scope: Namespaced versions: - - additionalPrinterColumns: - - jsonPath: .status.conditions[?(@.type=='Ready')].status - name: READY - type: string - - jsonPath: .metadata.creationTimestamp - name: AGE - type: date - name: v1alpha1 + - name: v1alpha1 schema: openAPIV3Schema: properties: @@ -54,6 +47,14 @@ spec: type: string type: object type: array + dependencies: + description: Dependencies + items: + type: string + type: array + priority: + description: Priority + type: integer type: object status: description: TaskStatus defines the observed state of Task diff --git a/k8s/api/tackle/v1alpha1/task.go b/k8s/api/tackle/v1alpha1/task.go index 025b03c63..02a21c514 100644 --- a/k8s/api/tackle/v1alpha1/task.go +++ b/k8s/api/tackle/v1alpha1/task.go @@ -24,6 +24,10 @@ import ( type TaskSpec struct { // Addon selector. Addon []Selector `json:"addon,omitempty"` + // Priority + Priority int `json:"priority,omitempty"` + // Dependencies + Dependencies []string `json:"dependencies,omitempty"` } // TaskStatus defines the observed state of Task @@ -37,8 +41,6 @@ type TaskStatus struct { // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +k8s:openapi-gen=true // +kubebuilder:subresource:status -// +kubebuilder:printcolumn:name="READY",type=string,JSONPath=".status.conditions[?(@.type=='Ready')].status" -// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp" type Task struct { meta.TypeMeta `json:",inline"` meta.ObjectMeta `json:"metadata,omitempty"` @@ -46,6 +48,18 @@ type Task struct { Status TaskStatus `json:"status,omitempty"` } +// HasDep return true if the task has the dependency. +func (r *Task) HasDep(name string) (found bool) { + for i := range r.Spec.Dependencies { + n := r.Spec.Dependencies[i] + if n == name { + found = true + break + } + } + return +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type TaskList struct { meta.TypeMeta `json:",inline"` diff --git a/k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go b/k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go index e31f329b5..43988637b 100644 --- a/k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go +++ b/k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go @@ -351,6 +351,11 @@ func (in *TaskSpec) DeepCopyInto(out *TaskSpec) { *out = make([]Selector, len(*in)) copy(*out, *in) } + if in.Dependencies != nil { + in, out := &in.Dependencies, &out.Dependencies + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskSpec. diff --git a/settings/hub.go b/settings/hub.go index 26cba3301..083a23317 100644 --- a/settings/hub.go +++ b/settings/hub.go @@ -28,6 +28,7 @@ const ( EnvAppName = "APP_NAME" EnvDisconnected = "DISCONNECTED" EnvAnalysisReportPath = "ANALYSIS_REPORT_PATH" + EnvTaskCapacity = "TASK_CAPACITY" ) type Hub struct { @@ -63,9 +64,10 @@ type Hub struct { } // Task Task struct { - SA string - Retries int - Reaper struct { // minutes. + SA string + Retries int + Capacity int + Reaper struct { // minutes. Created int Succeeded int Failed int @@ -160,6 +162,13 @@ func (r *Hub) Load() (err error) { } else { r.Task.Retries = 1 } + s, found = os.LookupEnv(EnvTaskCapacity) + if found { + n, _ := strconv.Atoi(s) + r.Task.Capacity = n + } else { + r.Task.Capacity = 10 + } s, found = os.LookupEnv(EnvFrequencyTask) if found { n, _ := strconv.Atoi(s) diff --git a/task/manager.go b/task/manager.go index 44e885d54..5b9974e4b 100644 --- a/task/manager.go +++ b/task/manager.go @@ -67,10 +67,13 @@ type Manager struct { Client k8s.Client // Addon token scopes. Scopes []string + // Kinds of tasks. + kinds map[string]crd.Task } // Run the manager. func (m *Manager) Run(ctx context.Context) { + m.kinds = make(map[string]crd.Task) auth.Validators = append( auth.Validators, &Validator{ @@ -115,7 +118,19 @@ func (m *Manager) startReady() { if result.Error != nil { return } + capacity := 0 + if len(list) > 0 { + capacity = m.determineCapacity() + } + err := m.refreshKinds() + if err != nil { + Log.Error(result.Error, "") + return + } for i := range list { + if i >= capacity { + break + } task := &list[i] if Settings.Disconnected { mark := time.Now() @@ -134,11 +149,16 @@ func (m *Manager) startReady() { case Ready, Postponed: ready := task - if m.postpone(ready, list) { + postponed, caused := m.postpone(ready, list) + if postponed { ready.State = Postponed Log.Info("Task postponed.", "id", ready.ID) sErr := m.DB.Save(ready).Error Log.Error(sErr, "") + for i := range caused { + sErr := m.DB.Save(caused[i]).Error + Log.Error(sErr, "") + } continue } if ready.Retries == 0 { @@ -209,12 +229,34 @@ func (m *Manager) updateRunning() { } } +// refreshKinds refresh kind map. +func (m *Manager) refreshKinds() (err error) { + m.kinds = make(map[string]crd.Task) + list := crd.TaskList{} + err = m.Client.List( + context.TODO(), + &list, + &k8s.ListOptions{Namespace: Settings.Namespace}) + if err != nil { + err = liberr.Wrap(err) + return + } + for _, task := range list.Items { + m.kinds[task.Name] = task + } + return +} + // postpone Postpones a task as needed based on rules. -func (m *Manager) postpone(ready *model.Task, list []model.Task) (postponed bool) { +func (m *Manager) postpone(ready *model.Task, list []model.Task) (p bool, caused []*model.Task) { ruleSet := []Rule{ &RuleIsolated{}, &RuleUnique{}, + &RuleDeps{ + kinds: m.kinds, + }, } + for i := range list { other := &list[i] if ready.ID == other.ID { @@ -225,7 +267,9 @@ func (m *Manager) postpone(ready *model.Task, list []model.Task) (postponed bool Pending: for _, rule := range ruleSet { if rule.Match(ready, other) { - postponed = true + m.escalate(ready, other) + caused = append(caused, other) + p = true return } } @@ -235,6 +279,20 @@ func (m *Manager) postpone(ready *model.Task, list []model.Task) (postponed bool return } +// escalate priority. +func (m *Manager) escalate(ready, caused *model.Task) { + if ready.Priority <= caused.Priority { + return + } + caused.Priority = ready.Priority + Log.Info( + "Priority escalated.", + "id", + caused.ID, + "match", + ready.ID) +} + // The task has been canceled. func (m *Manager) canceled(task *model.Task) { rt := Task{task} @@ -396,6 +454,40 @@ func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.Fil return } +// determineCapacity returns the number of tasks that may be started. +func (m *Manager) determineCapacity() (n int) { + n = Settings.Hub.Task.Capacity + list := []model.Task{} + db := m.DB.Order("priority DESC, id") + err := db.Find( + &list, + "state IN ?", + []string{ + Pending, + Running, + }).Error + if err != nil { + Log.Error(err, "") + n = 0 + return + } + for i := range list { + task := &list[i] + switch task.State { + case Pending: // cluster saturated. + n = 0 + return + case Running: + if n > 0 { + n-- + } else { + return + } + } + } + return +} + // Task is an runtime task. type Task struct { // model. diff --git a/task/rule.go b/task/rule.go index f627d9adf..16a760f50 100644 --- a/task/rule.go +++ b/task/rule.go @@ -3,6 +3,7 @@ package task import ( "strings" + crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha1" "github.com/konveyor/tackle2-hub/model" ) @@ -13,7 +14,6 @@ type Rule interface { // RuleUnique running tasks must be unique by: // - application -// - variant // - addon. type RuleUnique struct { } @@ -40,13 +40,40 @@ func (r *RuleUnique) Match(candidate, other *model.Task) (matched bool) { return } +// RuleDeps - Task kind dependencies. +type RuleDeps struct { + kinds map[string]crd.Task +} + +// Match determines the match. +func (r *RuleDeps) Match(candidate, other *model.Task) (matched bool) { + if candidate.Kind == "" || other.Kind == "" { + return + } + if *candidate.ApplicationID != *other.ApplicationID { + return + } + def, found := r.kinds[candidate.Kind] + if !found { + return + } + matched = def.HasDep(other.Kind) + Log.Info( + "Rule:dep matched.", + "candidate", + candidate.ID, + "by", + other.ID) + return +} + // RuleIsolated policy. type RuleIsolated struct { } // Match determines the match. func (r *RuleIsolated) Match(candidate, other *model.Task) (matched bool) { - matched = r.hasPolicy(candidate, Isolated) || r.hasPolicy(other, Isolated) + matched = hasPolicy(candidate, Isolated) || hasPolicy(other, Isolated) if matched { Log.Info( "Rule:Isolated matched.", @@ -59,8 +86,8 @@ func (r *RuleIsolated) Match(candidate, other *model.Task) (matched bool) { return } -// Returns true if the task policy includes: isolated -func (r *RuleIsolated) hasPolicy(task *model.Task, name string) (matched bool) { +// Returns true if the task policy includes the specified rule. +func hasPolicy(task *model.Task, name string) (matched bool) { for _, p := range strings.Split(task.Policy, ";") { p = strings.TrimSpace(p) p = strings.ToLower(p)