Skip to content

Commit

Permalink
pod weight
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Apr 9, 2024
1 parent e9ad32a commit 91bad4e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 52 deletions.
16 changes: 8 additions & 8 deletions settings/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
EnvAppName = "APP_NAME"
EnvDisconnected = "DISCONNECTED"
EnvAnalysisReportPath = "ANALYSIS_REPORT_PATH"
EnvTaskCapacity = "TASK_CAPACITY"
EnvTaskWeightLimit = "TASK_WEIGHT_LIMIT"
)

type Hub struct {
Expand Down Expand Up @@ -64,10 +64,10 @@ type Hub struct {
}
// Task
Task struct {
SA string
Retries int
Capacity int
Reaper struct { // minutes.
SA string
Retries int
WeightLimit int
Reaper struct { // minutes.
Created int
Succeeded int
Failed int
Expand Down Expand Up @@ -162,12 +162,12 @@ func (r *Hub) Load() (err error) {
} else {
r.Task.Retries = 1
}
s, found = os.LookupEnv(EnvTaskCapacity)
s, found = os.LookupEnv(EnvTaskWeightLimit)
if found {
n, _ := strconv.Atoi(s)
r.Task.Capacity = n
r.Task.WeightLimit = n
} else {
r.Task.Capacity = 10
r.Task.WeightLimit = 50
}
s, found = os.LookupEnv(EnvFrequencyTask)
if found {
Expand Down
76 changes: 32 additions & 44 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ func (m *Manager) startReady() {
}
}
}
batch := 0
if len(list) > 0 {
batch = m.nextBatch()
}
sort.Slice(
list,
func(i, j int) bool {
Expand All @@ -175,27 +171,29 @@ func (m *Manager) startReady() {
return it.Priority > jt.Priority ||
it.ID < jt.ID
})
started := 0
weight := 0
weightLimit := Settings.Hub.Task.WeightLimit
for i := range list {
task := &list[i]
if task.State != Ready {
continue
}
if started >= batch {
if weight >= weightLimit {
Log.V(1).Info("Weight Limit - reached.")
break
}
ready := task
if ready.Retries == 0 {
metrics.TasksInitiated.Inc()
}
rt := Task{ready}
err := rt.Run(m.DB, m.Client)
w, err := rt.Run(m.DB, m.Client)
if err != nil {
ready.State = Failed
Log.Error(err, "")
} else {
Log.Info("Task started.", "id", ready.ID)
started++
weight += w
}
err = m.DB.Save(ready).Error
Log.Error(err, "")
Expand Down Expand Up @@ -475,48 +473,14 @@ func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.Fil
return
}

// nextBatch returns the number of tasks that may be started.
// It begins with the configured task capacity and walks the tasks that
// are Pending or Running (ordered by priority, then id):
//   - any Pending task means the cluster is saturated, so nothing may start;
//   - each Running task consumes one unit of the remaining capacity.
//
// When the query fails the error is logged and zero is returned.
func (m *Manager) nextBatch() (n int) {
	n = Settings.Hub.Task.Capacity
	states := []string{
		Pending,
		Running,
	}
	tasks := []model.Task{}
	query := m.DB.Order("priority DESC, id")
	if err := query.Find(&tasks, "state IN ?", states).Error; err != nil {
		Log.Error(err, "")
		return 0
	}
	for i := range tasks {
		state := tasks[i].State
		if state == Pending {
			// cluster saturated.
			return 0
		}
		if state != Running {
			continue
		}
		if n <= 0 {
			// capacity exhausted; no need to look further.
			return n
		}
		n--
	}
	return n
}

// Task is a runtime task.
// It embeds the task model so the model's fields and methods are
// accessible directly on the runtime task.
type Task struct {
// Embedded task model.
*model.Task
}

// Run the specified task.
func (r *Task) Run(db *gorm.DB, client k8s.Client) (err error) {
func (r *Task) Run(db *gorm.DB, client k8s.Client) (weight int, err error) {
mark := time.Now()
defer func() {
if err != nil {
Expand Down Expand Up @@ -571,6 +535,7 @@ func (r *Task) Run(db *gorm.DB, client k8s.Client) (err error) {
err = liberr.Wrap(err)
return
}
weight = r.weight(&pod)
defer func() {
if err != nil {
_ = client.Delete(context.TODO(), &pod)
Expand Down Expand Up @@ -609,7 +574,7 @@ func (r *Task) Reflect(db *gorm.DB, client k8s.Client) (pod *core.Pod, err error
pod)
if err != nil {
if k8serr.IsNotFound(err) {
err = r.Run(db, client)
_, err = r.Run(db, client)
} else {
err = liberr.Wrap(err)
}
Expand Down Expand Up @@ -1132,6 +1097,29 @@ func (r *Task) attach(file *model.File) {
r.Attached, _ = json.Marshal(attached)
}

// weight estimates the pod weight.
// Each container is weighed from its resource requests as:
//
//	(1 + memory/100MB) * cpu
//
// and the pod weight is the largest container weight, never less than 1.
//
// A container without a CPU request is weighed on memory alone.
// ResourceList.Cpu() returns a non-nil zero quantity when the request
// is absent (per the k8s.io/api core/v1 documentation), so multiplying
// by it unconditionally would zero the weight and silently discard the
// memory component.
func (r *Task) weight(pod *core.Pod) (weight int) {
	weight = 1
	containers := pod.Spec.Containers
	for i := range containers {
		request := containers[i].Resources.Requests
		w := int64(1)
		if memory := request.Memory(); memory != nil {
			w += memory.Value() / int64(100000000) // 100MB
		}
		if cpu := request.Cpu(); cpu != nil {
			// Only scale by CPU when one is actually requested.
			if n := cpu.Value(); n > 0 {
				w *= n
			}
		}
		if int(w) > weight {
			weight = int(w)
		}
	}
	return
}

// Event represents a pod event.
type Event struct {
Type string
Expand Down

0 comments on commit 91bad4e

Please sign in to comment.