Skip to content

Commit

Permalink
Add Task.dependencies and priority escalation.
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Apr 8, 2024
1 parent e50d7b9 commit 7bce37c
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 20 deletions.
17 changes: 9 additions & 8 deletions generated/crd/tackle.konveyor.io_tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,7 @@ spec:
singular: task
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type=='Ready')].status
name: READY
type: string
- jsonPath: .metadata.creationTimestamp
name: AGE
type: date
name: v1alpha1
- name: v1alpha1
schema:
openAPIV3Schema:
properties:
Expand Down Expand Up @@ -54,6 +47,14 @@ spec:
type: string
type: object
type: array
dependencies:
description: Dependencies
items:
type: string
type: array
priority:
description: Priority
type: integer
type: object
status:
description: TaskStatus defines the observed state of Task
Expand Down
18 changes: 16 additions & 2 deletions k8s/api/tackle/v1alpha1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
type TaskSpec struct {
// Addon selector.
Addon []Selector `json:"addon,omitempty"`
// Priority
Priority int `json:"priority,omitempty"`
// Dependencies
Dependencies []string `json:"dependencies,omitempty"`
}

// TaskStatus defines the observed state of Task
Expand All @@ -37,15 +41,25 @@ type TaskStatus struct {
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:openapi-gen=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="READY",type=string,JSONPath=".status.conditions[?(@.type=='Ready')].status"
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp"
type Task struct {
meta.TypeMeta `json:",inline"`
meta.ObjectMeta `json:"metadata,omitempty"`
Spec TaskSpec `json:"spec,omitempty"`
Status TaskStatus `json:"status,omitempty"`
}

// HasDep return true if the task has the dependency.
func (r *Task) HasDep(name string) (found bool) {
for i := range r.Spec.Dependencies {
n := r.Spec.Dependencies[i]
if n == name {
found = true
break
}
}
return
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type TaskList struct {
meta.TypeMeta `json:",inline"`
Expand Down
5 changes: 5 additions & 0 deletions k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions settings/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
EnvAppName = "APP_NAME"
EnvDisconnected = "DISCONNECTED"
EnvAnalysisReportPath = "ANALYSIS_REPORT_PATH"
EnvTaskCapacity = "TASK_CAPACITY"
)

type Hub struct {
Expand Down Expand Up @@ -63,9 +64,10 @@ type Hub struct {
}
// Task
Task struct {
SA string
Retries int
Reaper struct { // minutes.
SA string
Retries int
Capacity int
Reaper struct { // minutes.
Created int
Succeeded int
Failed int
Expand Down Expand Up @@ -160,6 +162,13 @@ func (r *Hub) Load() (err error) {
} else {
r.Task.Retries = 1
}
s, found = os.LookupEnv(EnvTaskCapacity)
if found {
n, _ := strconv.Atoi(s)
r.Task.Capacity = n
} else {
r.Task.Capacity = 10
}
s, found = os.LookupEnv(EnvFrequencyTask)
if found {
n, _ := strconv.Atoi(s)
Expand Down
98 changes: 95 additions & 3 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ type Manager struct {
Client k8s.Client
// Addon token scopes.
Scopes []string
// Kinds of tasks.
kinds map[string]crd.Task
}

// Run the manager.
func (m *Manager) Run(ctx context.Context) {
m.kinds = make(map[string]crd.Task)
auth.Validators = append(
auth.Validators,
&Validator{
Expand Down Expand Up @@ -115,7 +118,19 @@ func (m *Manager) startReady() {
if result.Error != nil {
return
}
capacity := 0
if len(list) > 0 {
capacity = m.determineCapacity()
}
err := m.refreshKinds()
if err != nil {
Log.Error(result.Error, "")
return
}
for i := range list {
if i >= capacity {
break
}
task := &list[i]
if Settings.Disconnected {
mark := time.Now()
Expand All @@ -134,11 +149,16 @@ func (m *Manager) startReady() {
case Ready,
Postponed:
ready := task
if m.postpone(ready, list) {
postponed, caused := m.postpone(ready, list)
if postponed {
ready.State = Postponed
Log.Info("Task postponed.", "id", ready.ID)
sErr := m.DB.Save(ready).Error
Log.Error(sErr, "")
for i := range caused {
sErr := m.DB.Save(caused[i]).Error
Log.Error(sErr, "")
}
continue
}
if ready.Retries == 0 {
Expand Down Expand Up @@ -209,12 +229,34 @@ func (m *Manager) updateRunning() {
}
}

// refreshKinds refresh kind map.
func (m *Manager) refreshKinds() (err error) {
m.kinds = make(map[string]crd.Task)
list := crd.TaskList{}
err = m.Client.List(
context.TODO(),
&list,
&k8s.ListOptions{Namespace: Settings.Namespace})
if err != nil {
err = liberr.Wrap(err)
return
}
for _, task := range list.Items {
m.kinds[task.Name] = task
}
return
}

// postpone Postpones a task as needed based on rules.
func (m *Manager) postpone(ready *model.Task, list []model.Task) (postponed bool) {
func (m *Manager) postpone(ready *model.Task, list []model.Task) (p bool, caused []*model.Task) {
ruleSet := []Rule{
&RuleIsolated{},
&RuleUnique{},
&RuleDeps{
kinds: m.kinds,
},
}

for i := range list {
other := &list[i]
if ready.ID == other.ID {
Expand All @@ -225,7 +267,9 @@ func (m *Manager) postpone(ready *model.Task, list []model.Task) (postponed bool
Pending:
for _, rule := range ruleSet {
if rule.Match(ready, other) {
postponed = true
m.escalate(ready, other)
caused = append(caused, other)
p = true
return
}
}
Expand All @@ -235,6 +279,20 @@ func (m *Manager) postpone(ready *model.Task, list []model.Task) (postponed bool
return
}

// escalate priority.
func (m *Manager) escalate(ready, caused *model.Task) {
if ready.Priority <= caused.Priority {
return
}
caused.Priority = ready.Priority
Log.Info(
"Priority escalated.",
"id",
caused.ID,
"match",
ready.ID)
}

// The task has been canceled.
func (m *Manager) canceled(task *model.Task) {
rt := Task{task}
Expand Down Expand Up @@ -396,6 +454,40 @@ func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.Fil
return
}

// determineCapacity returns the number of tasks that may be started.
func (m *Manager) determineCapacity() (n int) {
n = Settings.Hub.Task.Capacity
list := []model.Task{}
db := m.DB.Order("priority DESC, id")
err := db.Find(
&list,
"state IN ?",
[]string{
Pending,
Running,
}).Error
if err != nil {
Log.Error(err, "")
n = 0
return
}
for i := range list {
task := &list[i]
switch task.State {
case Pending: // cluster saturated.
n = 0
return
case Running:
if n > 0 {
n--
} else {
return
}
}
}
return
}

// Task is an runtime task.
type Task struct {
// model.
Expand Down
35 changes: 31 additions & 4 deletions task/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package task
import (
"strings"

crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha1"
"github.com/konveyor/tackle2-hub/model"
)

Expand All @@ -13,7 +14,6 @@ type Rule interface {

// RuleUnique running tasks must be unique by:
// - application
// - variant
// - addon.
type RuleUnique struct {
}
Expand All @@ -40,13 +40,40 @@ func (r *RuleUnique) Match(candidate, other *model.Task) (matched bool) {
return
}

// RuleDeps - Task kind dependencies.
type RuleDeps struct {
kinds map[string]crd.Task
}

// Match determines the match.
func (r *RuleDeps) Match(candidate, other *model.Task) (matched bool) {
if candidate.Kind == "" || other.Kind == "" {
return
}
if *candidate.ApplicationID != *other.ApplicationID {
return
}
def, found := r.kinds[candidate.Kind]
if !found {
return
}
matched = def.HasDep(other.Kind)
Log.Info(
"Rule:dep matched.",
"candidate",
candidate.ID,
"by",
other.ID)
return
}

// RuleIsolated policy.
type RuleIsolated struct {
}

// Match determines the match.
func (r *RuleIsolated) Match(candidate, other *model.Task) (matched bool) {
matched = r.hasPolicy(candidate, Isolated) || r.hasPolicy(other, Isolated)
matched = hasPolicy(candidate, Isolated) || hasPolicy(other, Isolated)
if matched {
Log.Info(
"Rule:Isolated matched.",
Expand All @@ -59,8 +86,8 @@ func (r *RuleIsolated) Match(candidate, other *model.Task) (matched bool) {
return
}

// Returns true if the task policy includes: isolated
func (r *RuleIsolated) hasPolicy(task *model.Task, name string) (matched bool) {
// Returns true if the task policy includes the specified rule.
func hasPolicy(task *model.Task, name string) (matched bool) {
for _, p := range strings.Split(task.Policy, ";") {
p = strings.TrimSpace(p)
p = strings.ToLower(p)
Expand Down

0 comments on commit 7bce37c

Please sign in to comment.