
Commit

remove TopologicalSort and add sorting during BFS
Signed-off-by: Mikhail Scherba <[email protected]>
miklezzzz committed Dec 11, 2024
1 parent 71208fc commit 77ca666
Showing 3 changed files with 53 additions and 49 deletions.
14 changes: 0 additions & 14 deletions pkg/module_manager/scheduler/node/node.go
@@ -32,20 +32,6 @@ type Node struct {
module ModuleInterface
}

type NodeWeightRange []NodeWeight

func (r NodeWeightRange) Len() int {
return len(r)
}

func (r NodeWeightRange) Less(i, j int) bool {
return r[i] < r[j]
}

func (r NodeWeightRange) Swap(i, j int) {
r[i], r[j] = r[j], r[i]
}

const (
ModuleType NodeType = "module"
WeightType NodeType = "weight"
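With the weight-sorting boilerplate above removed, deterministic ordering in the scheduler is handled by the standard library's generic slices.Sort (Go 1.21+) instead of a sort.Interface implementation. A minimal sketch of that pattern, using invented values rather than the scheduler's own types:

package main

import (
	"fmt"
	"slices"
)

func main() {
	// Instead of a dedicated type with Len/Less/Swap (sort.Interface),
	// any slice of an ordered type can be sorted directly.
	adjacencies := []string{"weight:900", "cert-manager", "admission-policy-engine"}
	slices.Sort(adjacencies)
	fmt.Println(adjacencies) // [admission-policy-engine cert-manager weight:900]
}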
72 changes: 45 additions & 27 deletions pkg/module_manager/scheduler/scheduler.go
@@ -392,14 +392,16 @@ func (s *Scheduler) AddExtender(ext extenders.Extender) error {
return nil
}

// getModuleNodes traverses the graph in BFS-way and returns all connected Module-type vertices
func (s *Scheduler) getModuleNodes() ([]*node.Node, error) {
// getModuleVertices traverses the graph in BFS-way and returns all connected Module-type vertices
func (s *Scheduler) getModuleVertices() ([]*node.Node, error) {
if s.root == nil {
return nil, fmt.Errorf("graph is empty")
}

var nodes []*node.Node
var bfsErr error
var (
nodes []*node.Node
bfsErr error
)

err := s.customBFS(s.root.GetName(), func(name string, _ int) bool {
vertex, props, err := s.dag.VertexWithProperties(name)
@@ -422,7 +424,7 @@ func (s *Scheduler) getModuleNodes() ([]*node.Node, error) {
return nodes, err
}

// customBFS implements BFS but allows visiting vertices multiple times
// customBFS reimplements BFS, allowing vertices to be visited multiple times; vertices at the same level are traversed in sorted order
func (s *Scheduler) customBFS(root string, visit func(node string, depth int) bool) error {
adjacencyMap, err := s.dag.AdjacencyMap()
if err != nil {
@@ -450,10 +452,13 @@ func (s *Scheduler) customBFS(root string, visit func(node string, depth int) bo
break
}

sliceOfAdjacencies := make([]string, 0)
for adjacency := range adjacencyMap[currentHash] {
nextLvl += 1

Check failure on line 457 in pkg/module_manager/scheduler/scheduler.go (GitHub Actions / Run Go linters): increment-decrement: should replace nextLvl += 1 with nextLvl++ (revive)
queue = append(queue, adjacency)
sliceOfAdjacencies = append(sliceOfAdjacencies, adjacency)
}
slices.Sort(sliceOfAdjacencies)
queue = append(queue, sliceOfAdjacencies...)

if currLvl == 0 {
currLvl = nextLvl
@@ -466,33 +471,46 @@ func (s *Scheduler) customBFS(root string, visit func(node string, depth int) bo
return nil
}
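The sketch below illustrates the idea behind customBFS: enqueueing each vertex's adjacencies in sorted order makes the otherwise map-ordered traversal deterministic, which is the property the removed StableTopologicalSort call used to provide. This is a simplified, self-contained version with a hypothetical adjacency map and invented vertex names, not the scheduler's actual graph types:

package main

import (
	"fmt"
	"slices"
)

// bfsSorted walks every vertex reachable from root in breadth-first order.
// Neighbours of each vertex are enqueued in sorted order, so the traversal is
// deterministic; vertices with several parents may be visited more than once
// (the graph is assumed to be a DAG, so the walk still terminates).
func bfsSorted(adjacency map[string][]string, root string, visit func(name string, depth int)) {
	queue := []string{root}
	depth := 0
	currLvl, nextLvl := 1, 0

	for len(queue) > 0 {
		current := queue[0]
		queue = queue[1:]

		visit(current, depth)

		neighbours := slices.Clone(adjacency[current])
		slices.Sort(neighbours)
		queue = append(queue, neighbours...)
		nextLvl += len(neighbours)

		currLvl--
		if currLvl == 0 {
			currLvl, nextLvl = nextLvl, 0
			depth++
		}
	}
}

func main() {
	// Hypothetical graph: a root vertex, two weight vertices, three modules.
	adjacency := map[string][]string{
		"root":       {"weight:900", "weight:5"},
		"weight:5":   {"cert-manager", "admission-policy-engine"},
		"weight:900": {"prometheus"},
	}
	bfsSorted(adjacency, "root", func(name string, depth int) {
		fmt.Printf("depth %d: %s\n", depth, name)
	})
}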

// PrintSummary returns resulting consisting of all module-type vertices, their states and last applied extenders
func (s *Scheduler) PrintSummary() (map[string]bool, error) {
// printSummary returns a summary consisting of all module-type vertices, their states, and the last applied extenders
// intended for testing purposes only
func (s *Scheduler) printSummary() (map[string]bool, error) {
result := make(map[string]bool, 0)

s.l.Lock()
names, err := graph.StableTopologicalSort(s.dag, moduleSortFunc)
vertices, err := s.getModuleVertices()
if err != nil {
return result, err
return nil, err
}

for _, name := range names {
vertex, err := s.dag.Vertex(name)
if err != nil {
return nil, err
}
for _, vertex := range vertices {
if vertex.GetType() == node.ModuleType {
result[fmt.Sprintf("%s/%s", vertex.GetName(), vertex.GetUpdatedBy())] = vertex.GetState()
}
}

for extenderName, parents := range s.topologicalHints {
for _, children := range parents {
for _, child := range children {
found := false
for k := range result {
if strings.HasPrefix(k, child) {
found = true
break
}
}

if !found {
result[fmt.Sprintf("%s/%s", child, extenderName)] = false
}
}
}
}

s.l.Unlock()

return result, nil
}
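For reference, the keys of the returned map follow the "<moduleName>/<lastAppliedExtender>" format built by the fmt.Sprintf call above, and the values are the modules' enabled states. A tiny self-contained illustration with invented module and extender names:

package main

import "fmt"

func main() {
	// Keys: "<moduleName>/<lastAppliedExtender>"; values: enabled state.
	// The names below are invented for illustration only.
	summary := map[string]bool{
		fmt.Sprintf("%s/%s", "cert-manager", "Static"): true,
		fmt.Sprintf("%s/%s", "foo-bar", ""):            false,
	}
	fmt.Println(summary) // map[cert-manager/Static:true foo-bar/:false]
}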

func moduleSortFunc(m1, m2 string) bool {
return m1 < m2
}

func (s *Scheduler) GetUpdatedByExtender(moduleName string) (string, error) {
s.l.Lock()
defer s.l.Unlock()
@@ -665,20 +683,20 @@ func (s *Scheduler) recalculateGraphState(logLabels map[string]string) ( /* Grap
errList := make([]string, 0)
logEntry := utils.EnrichLoggerWithLabels(s.logger, logLabels)

nodes, err := s.getModuleNodes()
vertices, err := s.getModuleVertices()
if err != nil {
errList = append(errList, fmt.Sprintf("couldn't get module nodes: %s", err.Error()))
errList = append(errList, fmt.Sprintf("couldn't get module vertices: %s", err.Error()))
s.errList = errList
return true, nil
}

// create a buffer to store all updates during the upcoming run; the updates are applied only if there are no errors during the run
s.vertexStateBuffer.state = make(map[string]*vertexState, len(nodes))
s.vertexStateBuffer.enabledModules = make([]string, 0, len(nodes))
s.vertexStateBuffer.state = make(map[string]*vertexState, len(vertices))
s.vertexStateBuffer.enabledModules = make([]string, 0, len(vertices))

outerCycle:
for _, node := range nodes {
moduleName := node.GetName()
for _, vertex := range vertices {
moduleName := vertex.GetName()
s.vertexStateBuffer.state[moduleName] = &vertexState{}

for _, e := range s.extenders {
@@ -716,13 +734,13 @@ outerCycle:
}
}

if s.vertexStateBuffer.state[moduleName].enabled != node.GetState() {
if s.vertexStateBuffer.state[moduleName].enabled != vertex.GetState() {
diff[moduleName] = s.vertexStateBuffer.state[moduleName].enabled
} else {
delete(diff, moduleName)
}

if s.vertexStateBuffer.state[moduleName].updatedBy != node.GetUpdatedBy() {
if s.vertexStateBuffer.state[moduleName].updatedBy != vertex.GetUpdatedBy() {
updByDiff[moduleName] = struct{}{}
} else {
delete(updByDiff, moduleName)
16 changes: 8 additions & 8 deletions pkg/module_manager/scheduler/scheduler_test.go
@@ -636,7 +636,7 @@ kubeDnsEnabled: false
"kube-dns",
}

summary, err := s.PrintSummary()
summary, err := s.printSummary()
assert.NoError(t, err)

assert.Equal(t, expectedSummary, summary)
@@ -663,7 +663,7 @@ kubeDnsEnabled: false
_, _, diff, err = s.GetGraphState(logLabels)
assert.NoError(t, err)

summary, err = s.PrintSummary()
summary, err = s.printSummary()
assert.NoError(t, err)

assert.Equal(t, expectedSummary, summary)
@@ -697,7 +697,7 @@ kubeDnsEnabled: false
_, _, diff, err = s.GetGraphState(logLabels)
assert.NoError(t, err)

summary, err = s.PrintSummary()
summary, err = s.printSummary()
assert.NoError(t, err)

assert.Equal(t, expectedSummary, summary)
@@ -896,7 +896,7 @@ l2LoadBalancerEnabled: false
"node-local-dns",
}

summary, err := s.PrintSummary()
summary, err := s.printSummary()
assert.NoError(t, err)

assert.Equal(t, expected, summary)
@@ -952,7 +952,7 @@ l2LoadBalancerEnabled: false
"openstack-cloud-provider",
}

summary, err = s.PrintSummary()
summary, err = s.printSummary()
assert.NoError(t, err)
assert.Equal(t, expected, summary)
assert.Equal(t, expectedDiff, diff)
@@ -1018,7 +1018,7 @@ l2LoadBalancerEnabled: false
"prometheus-crd",
}

summary, err = s.PrintSummary()
summary, err = s.printSummary()
assert.NoError(t, err)
assert.Equal(t, expected, summary)
assert.Equal(t, expectedDiff, diff)
@@ -1071,7 +1071,7 @@ l2LoadBalancerEnabled: false
"monitoring-applications",
}

summary, err = s.PrintSummary()
summary, err = s.printSummary()
assert.NoError(t, err)
assert.Equal(t, expected, summary)
assert.Equal(t, expectedDiff, diff)
@@ -1115,7 +1115,7 @@ l2LoadBalancerEnabled: false

expectedVerticesToUpdate = []string{}

summary, err = s.PrintSummary()
summary, err = s.printSummary()
assert.NoError(t, err)

assert.Equal(t, expected, summary)
