Skip to content

Commit

Permalink
fix: the scheduler is now fair
Browse files Browse the repository at this point in the history
Generators are now ordered by rank in the priority queue.

The rank computation formula is:
- 100/(1+generated_task_count) for high priority tasks
- 10/(1+generated_task_count) for medium priority tasks
- 1/(1+generated_task_count) for low priority tasks

Note that ranks are used both when comparing generators with the same priority and when comparing generators with different priorities.
So now we are:
- giving an opportunity to all generators with the same priority to take turns generating tasks
- giving roughly 1 low priority and 10 medium priority tasks the opportunity to run for every 100 high priority tasks running.

After a generator generates a task, the generators are reordered in the priority queue based on rank.

Signed-off-by: Andrei Aaron <[email protected]>
  • Loading branch information
andaaron committed Jan 11, 2024
1 parent 449b9ca commit 3ea716b
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 17 deletions.
21 changes: 20 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler
import (
"container/heap"
"context"
"math"
"runtime"
"sync"
"sync/atomic"
Expand All @@ -26,7 +27,7 @@ func (pq generatorsPriorityQueue) Len() int {
}

// Less reports whether the generator at index i must be ordered before the
// generator at index j in the priority queue.
//
// NOTE: this is a rendered diff without +/- markers, so both versions of the
// return statement appear in the body below.
func (pq generatorsPriorityQueue) Less(i, j int) bool {
// pre-change line (the single deletion in this file): ordering by static priority only
return pq[i].priority > pq[j].priority
// post-change line: ordering by rank, so generators with a higher
// getRanking() value come first
return pq[i].getRanking() > pq[j].getRanking()
}

func (pq generatorsPriorityQueue) Swap(i, j int) {
Expand Down Expand Up @@ -331,7 +332,13 @@ func (scheduler *Scheduler) generateTasks() {

// check if the generator with highest priority is ready to run
if scheduler.generators[0].getState() == Ready {
// we are not popping it as we will generate multiple tasks until it is done
// we are going to pop after all tasks are generated
gen = scheduler.generators[0]

// trigger a generator reorder, as generating a task may impact the order
// equivalent of pop/remove followed by push, but more efficient
heap.Fix(&scheduler.generators, 0)
} else {
gen, _ = heap.Pop(&scheduler.generators).(*generator)
if gen.getState() == Waiting {
Expand Down Expand Up @@ -439,6 +446,7 @@ type generator struct {
taskGenerator TaskGenerator
remainingTask Task
index int
taskCount int64
}

func (gen *generator) generate(sch *Scheduler) {
Expand All @@ -460,6 +468,7 @@ func (gen *generator) generate(sch *Scheduler) {
if gen.taskGenerator.IsDone() {
gen.done = true
gen.lastRun = time.Now()
gen.taskCount = 0
gen.taskGenerator.Reset()

return
Expand All @@ -468,6 +477,9 @@ func (gen *generator) generate(sch *Scheduler) {
task = nextTask
}

// keep track of generated task count to use it for generator ordering
gen.taskCount++

// check if it's possible to add a new task to the channel
// if not, keep the generated task and retry to add it next time
select {
Expand Down Expand Up @@ -502,12 +514,19 @@ func (gen *generator) getState() State {
return Ready
}

// getRanking returns the value used to order generators in the priority queue.
//
// The rank is 10^priority / (1 + taskCount): the priority sets the base weight,
// and every task generated in the current generator run lowers the rank,
// which lets other generators take a turn.
func (gen *generator) getRanking() float64 {
	// base weight grows by a factor of ten per priority level
	weight := math.Pow(10, float64(gen.priority)) //nolint:gomnd

	// the +1 keeps the divisor non-zero before any task has been generated
	divisor := 1 + float64(gen.taskCount)

	return weight / divisor
}

func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) {
newGenerator := &generator{
interval: interval,
done: false,
priority: priority,
taskGenerator: taskGenerator,
taskCount: 0,
remainingTask: nil,
}

Expand Down
142 changes: 126 additions & 16 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"runtime"
"strings"
"testing"
"time"

Expand All @@ -18,9 +19,10 @@ import (
)

// task is the test task handed out by the test generator's Next method.
//
// NOTE: this is a rendered diff without +/- markers: the first three fields
// are the pre-change lines, the last four are their realigned replacements
// plus the newly added delay field.
type task struct {
log log.Logger
msg string
err bool
log log.Logger // logger that DoWork writes msg to
msg string // message logged by DoWork
err bool // presumably makes DoWork fail when set; that branch is outside this view
delay time.Duration // how long DoWork sleeps, replacing the former fixed 100ms
}

var errInternal = errors.New("task: internal error")
Expand All @@ -35,7 +37,7 @@ func (t *task) DoWork(ctx context.Context) error {
return ctx.Err()
}

time.Sleep(100 * time.Millisecond)
time.Sleep(t.delay)
}

t.log.Info().Msg(t.msg)
Expand All @@ -52,15 +54,17 @@ func (t *task) Name() string {
}

// generator is the test task generator submitted to the scheduler in these tests.
//
// NOTE: this is a rendered diff without +/- markers: the first five fields
// are the pre-change lines, the last seven are their realigned replacements
// plus the newly added limit and taskDelay fields.
type generator struct {
log log.Logger
priority string
done bool
index int
step int
log log.Logger // logger passed on to every generated task
priority string // label embedded in each task's log message
done bool // set by Next once step exceeds limit
index int // task index embedded in each task's log message
step int // incremented on every call to Next
limit int // step threshold after which Next marks the generator done
taskDelay time.Duration // delay given to every generated task
}

func (g *generator) Next() (scheduler.Task, error) {
if g.step > 100 {
if g.step > g.limit {
g.done = true
}
g.step++
Expand All @@ -74,7 +78,12 @@ func (g *generator) Next() (scheduler.Task, error) {
return nil, errInternal
}

return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil
return &task{
log: g.log,
msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index),
err: false,
delay: g.taskDelay,
}, nil
}

func (g *generator) IsDone() bool {
Expand Down Expand Up @@ -154,13 +163,13 @@ func TestScheduler(t *testing.T) {
sch := scheduler.NewScheduler(cfg, metrics, logger)
sch.RateLimit = 5 * time.Second

genL := &generator{log: logger, priority: "low priority"}
genL := &generator{log: logger, priority: "low priority", limit: 100, taskDelay: 100 * time.Millisecond}
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)

genM := &generator{log: logger, priority: "medium priority"}
genM := &generator{log: logger, priority: "medium priority", limit: 100, taskDelay: 100 * time.Millisecond}
sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)

genH := &generator{log: logger, priority: "high priority"}
genH := &generator{log: logger, priority: "high priority", limit: 100, taskDelay: 100 * time.Millisecond}
sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority)

sch.RunScheduler()
Expand All @@ -176,6 +185,107 @@ func TestScheduler(t *testing.T) {
So(string(data), ShouldNotContainSubstring, "failed to execute task")
})

Convey("Test reordering of generators in queue", t, func() {
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)

defer os.Remove(logFile.Name()) // clean up

logger := log.NewLogger("debug", logFile.Name())
cfg := config.New()
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(cfg, metrics, logger)
sch.RateLimit = 1 * time.Nanosecond

// Testing reordering of generators using the same medium priority, as well as reordering with
// a low priority generator

genL := &generator{log: logger, priority: "low priority", limit: 110, taskDelay: time.Nanosecond}
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)

genM := &generator{log: logger, priority: "medium 1 priority", limit: 110, taskDelay: time.Nanosecond}
sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)

genH := &generator{log: logger, priority: "medium 2 priority", limit: 110, taskDelay: time.Nanosecond}
sch.SubmitGenerator(genH, time.Duration(0), scheduler.MediumPriority)

sch.RunScheduler()
time.Sleep(1 * time.Second)
sch.Shutdown()

data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)

// Check all tasks show up in the logs
for i := 1; i < 110; i++ {
if i%11 == 0 || i%13 == 0 {
continue
}

So(string(data), ShouldContainSubstring, fmt.Sprintf("executing medium 1 priority task; index: %d", i))
So(string(data), ShouldContainSubstring, fmt.Sprintf("executing medium 2 priority task; index: %d", i))
So(string(data), ShouldContainSubstring, fmt.Sprintf("executing low priority task; index: %d", i))
}

taskCounter := 0
priorityFlippedCounter := 0
samePriorityFlippedCounter := 0
lastPriority := "medium"
lastMediumGenerator := "1"

for _, line := range strings.Split(strings.TrimSuffix(string(data), "\n"), "\n") {
if !strings.Contains(line, "priority task; index: ") {
continue
}

taskCounter++

// low priority tasks start executing later
// medium priority generators are prioritized until the rank 10/9 (8 generated tasks)

Check failure on line 245 in pkg/scheduler/scheduler_test.go

View workflow job for this annotation

GitHub Actions / lint

Duplicate words (generated) found (dupword)
// starting with rank 10/10 (9 generated tasks), a low priority generator could potentially be prioritized instead
// there will be at least 8 * 2 medium priority tasks executed before low priority tasks are pushed
if taskCounter < 17 {
So(line, ShouldContainSubstring, "executing medium")
}

// by this point 2*110 medium priority tasks should have been generated,
// medium priority generators should be done
// add around 10 low priority tasks to the counter
// and an additional margin of 5 to make sure the test is stable
if taskCounter > 225 {
So(line, ShouldContainSubstring, "executing low priority")
}

if strings.Contains(line, "executing medium") {
if !strings.Contains(line, fmt.Sprintf("executing medium %s", lastMediumGenerator)) {
samePriorityFlippedCounter++
if lastMediumGenerator == "1" {
lastMediumGenerator = "2"
} else {
lastMediumGenerator = "1"
}
}
}

if !strings.Contains(line, fmt.Sprintf("executing %s", lastPriority)) {
priorityFlippedCounter++
if lastPriority == "low" {
lastPriority = "medium"
} else {
lastPriority = "low"
}
}
}

// fairness: make sure none of the medium priority generators is favored by the algorithm
So(samePriorityFlippedCounter, ShouldBeGreaterThanOrEqualTo, 60)
t.Logf("Switched between medium priority generators %d times", samePriorityFlippedCounter)
// fairness: make sure the algorithm alternates between generator priorities
So(priorityFlippedCounter, ShouldBeGreaterThanOrEqualTo, 10)
t.Logf("Switched between generator priorities %d times", priorityFlippedCounter)
})

Convey("Test task returning an error", t, func() {
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
Expand Down Expand Up @@ -209,7 +319,7 @@ func TestScheduler(t *testing.T) {
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(config.New(), metrics, logger)

genL := &generator{log: logger, priority: "low priority"}
genL := &generator{log: logger, priority: "low priority", limit: 100, taskDelay: 100 * time.Millisecond}
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority)

sch.RunScheduler()
Expand Down Expand Up @@ -272,7 +382,7 @@ func TestScheduler(t *testing.T) {
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(config.New(), metrics, logger)

genL := &generator{log: logger, priority: "medium priority"}
genL := &generator{log: logger, priority: "medium priority", limit: 100, taskDelay: 100 * time.Millisecond}
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority)

sch.RunScheduler()
Expand Down

0 comments on commit 3ea716b

Please sign in to comment.