
Commit: refine chain search (#13)
* fix a bug in early-stopping the chain search

* keep searching the chains until a placement is found within suggested nodes (sketched below)

* refine logging

* refactor h.Schedule() which was too long

* fix virtual cell's healthiness

* refine suggested nodes related logic

* resolve comments

* update README
zhypku authored Apr 7, 2020
1 parent be47ddb commit fad9012
Showing 7 changed files with 241 additions and 171 deletions.
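The headline change, keeping the chain search going until a placement lands within the K8s suggested nodes (see the bullet above and the new "pod36" test case below), can be illustrated with a minimal sketch. This is a simplified illustration, not HiveD's actual code: `searchChains`, `tryPlacement`, `placement`, and `nodesWithin` are hypothetical names, and the real logic lives in pkg/algorithm/hived_algorithm.go, whose diff is not rendered on this page.

```go
// Illustrative sketch only: searchChains, tryPlacement, placement and
// nodesWithin are hypothetical simplifications, not HiveD's actual APIs.
package main

import "fmt"

type placement struct {
	nodes []string
}

// searchChains keeps trying candidate cell chains (e.g., GPU types) until it
// finds a placement whose nodes are all within the K8s suggested nodes,
// instead of stopping at the first chain that merely has enough capacity.
func searchChains(
	chains []string,
	tryPlacement func(chain string) *placement,
	suggested map[string]bool,
) *placement {
	var fallback *placement
	for _, chain := range chains {
		p := tryPlacement(chain)
		if p == nil {
			continue // no capacity in this chain
		}
		if nodesWithin(p.nodes, suggested) {
			return p // preferred: placement fully inside suggested nodes
		}
		if fallback == nil {
			fallback = p // remember the first feasible placement anyway
		}
	}
	return fallback
}

func nodesWithin(nodes []string, suggested map[string]bool) bool {
	for _, n := range nodes {
		if !suggested[n] {
			return false
		}
	}
	return true
}

func main() {
	suggested := map[string]bool{"0.0.1.0": true}
	try := func(chain string) *placement {
		if chain == "DGX2-V100" {
			// feasible, but on a node K8s did not suggest
			return &placement{nodes: []string{"0.0.3.1"}}
		}
		return &placement{nodes: []string{"0.0.1.0"}}
	}
	p := searchChains([]string{"DGX2-V100", "DGX1-P100"}, try, suggested)
	fmt.Println(p.nodes) // [0.0.1.0]
}
```

In this sketch a placement outside the suggested nodes is only kept as a fallback; what HiveD actually does with such a placement (wait or keep the previous decision) depends on the affinity group's state, as the utils.go diff below shows.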
README.md (2 changes: 1 addition & 1 deletion)
@@ -38,7 +38,7 @@ HiveD supports multiple job **priorities**. Higher-priority jobs can **[preempt]
4. Optimized Resource Fragmentation and Less Starvation
5. [Priorities](example/feature/README.md#Guaranteed-Job), [Overuse with Low Priority](example/feature/README.md#Opportunistic-Job), and [Inter-](example/feature/README.md#Inter-VC-Preemption)/[Intra-VC Preemption](example/feature/README.md#Intra-VC-Preemption)
6. [Job (Full/Partial) Gang Scheduling/Preemption](example/feature/README.md#Gang-Scheduling)
7. Fault-Tolerance, [Hardware Failure-Awareness](example/feature/README.md#Bad-Hardware-Awareness), [Work-Preserving Reconfiguration](example/feature/README.md#Work-Preserving-Reconfiguration)
7. Fault-Tolerance, [Bad Hardware Awareness](example/feature/README.md#Bad-Hardware-Awareness), [Work-Preserving Reconfiguration](example/feature/README.md#Work-Preserving-Reconfiguration)
8. [Leverage K8S Default Scheduler](example/feature/README.md#Leverage-K8S-Default-Scheduler)

## Prerequisite
pkg/algorithm/cell.go (1 change: 1 addition & 0 deletions)
@@ -394,6 +394,7 @@ func (c *VirtualCell) SetPhysicalCell(cell *PhysicalCell) {
if cell == nil {
c.apiStatus.PhysicalCell = nil
c.apiStatus.CellHealthiness = api.CellHealthy
c.apiStatus.CellState = api.CellState(cellFree)
} else {
pcs := &api.PhysicalCellStatus{}
// shallow copy the status, clear the pointers to avoid reference
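The single added line above resets the cell state alongside the healthiness when a virtual cell loses its physical cell, per the "fix virtual cell's healthiness" bullet. Below is a minimal sketch of the invariant the added line restores; `cellStatus` and `clearBinding` are simplified stand-ins, not the real HiveD types.

```go
// Simplified stand-in types; only the shape of the nil branch of
// SetPhysicalCell is mirrored here, not the real HiveD structures.
package main

import "fmt"

type cellStatus struct {
	physicalCell *string
	healthiness  string // "Healthy" or "Bad"
	state        string // "Free" or "Used"
}

// clearBinding mirrors the nil branch of SetPhysicalCell: when a virtual
// cell is unbound from its physical cell, both healthiness and state are
// reset, so a released cell does not keep reporting itself as used.
func clearBinding(s *cellStatus) {
	s.physicalCell = nil
	s.healthiness = "Healthy"
	s.state = "Free" // the line this commit adds
}

func main() {
	node := "0.0.3.0"
	s := &cellStatus{physicalCell: &node, healthiness: "Bad", state: "Used"}
	clearBinding(s)
	fmt.Printf("%+v\n", *s)
}
```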
pkg/algorithm/hived_algorithm.go (338 changes: 208 additions & 130 deletions)

Large diffs are not rendered by default.

pkg/algorithm/hived_algorithm_test.go (22 changes: 18 additions & 4 deletions)
@@ -424,6 +424,14 @@ var pss = map[types.UID]api.PodSchedulingSpec{
GpuType: "DGX2-V100",
GpuNumber: 16,
AffinityGroup: group26,
}, "pod36": { // will iterate the GPU types until find a placement within suggested nodes
VirtualCluster: "VC1",
Priority: -1,
LazyPreemptionEnable: true,
ReservationId: "",
GpuType: "",
GpuNumber: 1,
AffinityGroup: group1,
},
}

@@ -468,6 +476,7 @@ var expectedBindInfos = map[string]result{
"pod25": {node: "0.0.0.1", gpuIsolation: []int32{0, 1}},
"pod28": {node: "0.0.3.0", gpuIsolation: []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}},
"pod34": {node: "0.0.3.0", gpuIsolation: []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}},
"pod36": {node: "0.0.1.0", gpuIsolation: []int32{0}},
}

var expectedPreemptInfos = map[string]common.Set{
@@ -616,17 +625,21 @@ func testDeletePods(t *testing.T, h *HivedAlgorithm) {
}

func testSuggestedNodes(t *testing.T, h *HivedAlgorithm) {
pod := allPods["pod36"]
pod.Annotations[api.AnnotationKeyPodSchedulingSpec] = common.ToYaml(pss[pod.UID])
psr := h.Schedule(pod, []string{"0.0.1.0"}, internal.PreemptingPhase)
compareSchedulingResult(t, pod, psr)

var nodes []string
for _, node := range allNodes {
if node != "0.0.3.1" {
nodes = append(nodes, node)
}
}
pod := allPods["pod27"]
pod = allPods["pod27"]
pod.Annotations[api.AnnotationKeyPodSchedulingSpec] = common.ToYaml(pss[pod.UID])
psr := h.Schedule(pod, nodes, internal.PreemptingPhase)
psr = h.Schedule(pod, nodes, internal.PreemptingPhase)
compareSchedulingResult(t, pod, psr)

nodes = append(nodes, "0.0.3.1")
pod.Annotations[api.AnnotationKeyPodSchedulingSpec] = common.ToYaml(pss[pod.UID])
// this time scheduling will succeed
@@ -647,7 +660,8 @@ func testSuggestedNodes(t *testing.T, h *HivedAlgorithm) {
// this time group will be preempting
psr = h.Schedule(pod, nodes, internal.PreemptingPhase)
if g := h.affinityGroups[pss[pod.UID].AffinityGroup.Name]; g == nil {
t.Errorf("Group %v should be preempting but does not exist", g.name)
t.Errorf("Group %v should be preempting but does not exist",
pss[pod.UID].AffinityGroup.Name)
} else if g.state != groupPreempting {
t.Errorf("Group %v should be in Preempting state but not", g.name)
}
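The replaced t.Errorf call above read g.name inside the branch where g is known to be nil, which panics instead of reporting the test failure; the fix takes the name from the pod's scheduling spec instead. A minimal reproduction of the pitfall, with a hypothetical stand-in for the group type:

```go
// Minimal reproduction of the nil-pointer pitfall fixed above; group is a
// hypothetical stand-in, not HiveD's AlgoAffinityGroup.
package main

import "fmt"

type group struct{ name string }

func main() {
	var g *group // nil, as in the "group does not exist" branch
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // reading g.name on a nil pointer panics
		}
	}()
	fmt.Printf("Group %v should be preempting but does not exist\n", g.name)
}
```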
pkg/algorithm/intra_vc_scheduler.go (19 changes: 8 additions & 11 deletions)
@@ -88,26 +88,23 @@ func (s *defaultIntraVCScheduler) getReservedCellList() map[api.ReservationId]Ch
return s.reservedCellList
}

func (s *defaultIntraVCScheduler) schedule(sr schedulingRequest) groupVirtualPlacement {
var scheduler *topologyAwareScheduler
var str string
func (s *defaultIntraVCScheduler) schedule(sr schedulingRequest) (placement groupVirtualPlacement) {
scheduler := s.nonReservedSchedulers[sr.chain]
str := fmt.Sprintf("chain %v", sr.chain)
if sr.reservationId != "" {
scheduler = s.reservedSchedulers[sr.reservationId]
str = fmt.Sprintf("reservation %v", sr.reservationId)
} else {
scheduler = s.nonReservedSchedulers[sr.chain]
str = fmt.Sprintf("chain %v", sr.chain)
}
var placement groupVirtualPlacement
klog.Infof("Processing scheduling request in VC %v: %v, GPU numbers %v, priority %v",
sr.vc, str, common.ToJson(sr.affinityGroupPodNums), sr.priority)
if scheduler != nil {
placement = scheduler.Schedule(sr.affinityGroupPodNums, sr.priority, common.NewSet())
}
if placement == nil {
klog.Infof("Insufficient capacity in VC %v for scheduling request: %v, GPU numbers %v, priority %v",
sr.vc, str, sr.affinityGroupPodNums, sr.priority)
klog.Infof("Cannot find placement in VC %v", sr.vc)
} else {
klog.Infof("Succeeded in scheduling in VC %v for scheduling request: %v, GPU numbers %v, priority %v",
sr.vc, str, sr.affinityGroupPodNums, sr.priority)
klog.Infof("Found placement in VC %v: %v",
sr.vc, placement)
}
return placement
}
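The refactor above drops the var declarations and the else branch in favor of a default assignment that a reservation can override, plus a named return value. A sketch of the same selection pattern; the `scheduler` type and `pickScheduler` helper below are simplified stand-ins, not the real topologyAwareScheduler maps.

```go
// Simplified stand-ins for the scheduler maps; this only illustrates the
// default-then-override selection pattern used by the refactored schedule().
package main

import "fmt"

type scheduler struct{ name string }

func pickScheduler(
	chain, reservationID string,
	nonReserved map[string]*scheduler,
	reserved map[string]*scheduler,
) (s *scheduler, desc string) {
	// Default: the per-chain (non-reserved) scheduler.
	s = nonReserved[chain]
	desc = fmt.Sprintf("chain %v", chain)
	// Override: a reservation takes precedence over the plain chain.
	if reservationID != "" {
		s = reserved[reservationID]
		desc = fmt.Sprintf("reservation %v", reservationID)
	}
	return s, desc
}

func main() {
	nonReserved := map[string]*scheduler{"DGX2-V100": {name: "chain scheduler"}}
	reserved := map[string]*scheduler{"rid-1": {name: "reserved scheduler"}}

	s, desc := pickScheduler("DGX2-V100", "", nonReserved, reserved)
	fmt.Println(desc, "->", s.name)

	s, desc = pickScheduler("DGX2-V100", "rid-1", nonReserved, reserved)
	fmt.Println(desc, "->", s.name)
}
```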
pkg/algorithm/utils.go (26 changes: 3 additions & 23 deletions)
@@ -37,16 +37,14 @@ import (
func generatePodScheduleResult(
groupPhysicalPlacement groupPhysicalPlacement,
groupVirtualPlacement groupVirtualPlacement,
priority CellPriority,
preemptionVictims map[string]common.Set,
nodesNotInSuggested common.Set,
waitReason string,
cellLevelToType map[CellChain]map[CellLevel]api.CellType,
currentGpuNum int32,
currentPodIndex int32,
group *AlgoAffinityGroup,
groupName string,
suggestedNodes common.Set,
vc api.VirtualClusterName,
pod *core.Pod) internal.PodScheduleResult {

klog.V(4).Infof("[%v]: Got K8s suggested nodes: %v", internal.Key(pod), suggestedNodes)
@@ -59,25 +57,6 @@
PodPreemptInfo: generatePodPreemptInfo(preemptionVictims, pod),
}
}
var waitReason string
if groupPhysicalPlacement == nil {
waitReason = "insufficient capacity in physical cluster"
if priority >= minGuaranteedPriority {
waitReason = fmt.Sprintf("insufficient capacity in VC %v", vc)
}
} else if !nodesNotInSuggested.IsEmpty() {
if group == nil || group.state == groupPreempting {
// for an unallocated group, we will keep it waiting if not all of its pods are scheduled to suggested nodes
waitReason = fmt.Sprintf(
"affinity group is decided to be scheduled to some nodes not within K8s suggested nodes: %v",
nodesNotInSuggested)
} else {
// for an existing group, we always insist the previous scheduling decision
// even if some pods are now not within suggested nodes
klog.Warningf("Some nodes used by affinity group %v are no longer within K8s suggested nodes: %v",
group.name, nodesNotInSuggested)
}
}
if waitReason != "" {
klog.Infof("[%v]: need to wait because %v", internal.Key(pod), waitReason)
return internal.PodScheduleResult{PodWaitInfo: &internal.PodWaitInfo{Reason: waitReason}}
@@ -100,7 +79,8 @@

// generatePodPreemptInfo writes the preemption victims into a PodPreemptInfo.
func generatePodPreemptInfo(preemptionVictims map[string]common.Set, pod *core.Pod) *internal.PodPreemptInfo {
klog.Infof("[%v]: Preemption victim candidates: %v", internal.Key(pod), victimsToString(preemptionVictims))
klog.Infof("[%v]: Preemption victim candidates: %v",
internal.Key(pod), victimsToString(preemptionVictims))
var (
nodesHavingVictims []string
victimPods []*core.Pod
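The deleted block above used to compute waitReason inside generatePodScheduleResult; after this commit waitReason arrives as a parameter, presumably computed by the caller in hived_algorithm.go (whose diff is not rendered on this page). The same decision logic, rewritten below as a standalone sketch with the placement, priority, and group state flattened into plain parameters for illustration:

```go
// Standalone sketch of the wait-reason decision removed from utils.go; the
// flattened parameters are illustrative, not the real HiveD signatures.
package main

import "fmt"

func waitReason(
	hasPhysicalPlacement bool, // groupPhysicalPlacement != nil
	guaranteedPriority bool, // priority >= minGuaranteedPriority
	vc string,
	nodesNotInSuggested []string,
	groupAlreadyAllocated bool, // an existing, non-preempting group
) string {
	if !hasPhysicalPlacement {
		if guaranteedPriority {
			return fmt.Sprintf("insufficient capacity in VC %v", vc)
		}
		return "insufficient capacity in physical cluster"
	}
	if len(nodesNotInSuggested) > 0 && !groupAlreadyAllocated {
		// An unallocated (or preempting) group keeps waiting if any of its
		// pods would land outside the K8s suggested nodes.
		return fmt.Sprintf(
			"affinity group is decided to be scheduled to some nodes not within K8s suggested nodes: %v",
			nodesNotInSuggested)
	}
	// An already-allocated group sticks to its previous placement even if
	// some of its nodes are no longer suggested; only a warning is logged.
	return ""
}

func main() {
	fmt.Println(waitReason(true, true, "VC1", []string{"0.0.3.1"}, false))
}
```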
pkg/api/types.go (4 changes: 2 additions & 2 deletions)
@@ -192,8 +192,8 @@ type CellStatus struct {
// (e.g., VC1/0/0 may represent VC1, preassigned cell 0, index 0 among its children)
CellAddress CellAddress `json:"cellAddress"`
// CellState and CellHealthiness are two orthogonal fields.
// That means, there are four possible combinations of them: a cell may be in
// (1) used and healthy, (2) used and bad, (3) free and healthy, and (4) free and bad.
// CellState represents whether the cell is being used (or acquired) by an affinity group.
// CellHealthiness represents whether the physical hardware is working normally.
CellState CellState `json:"cellState"`
CellHealthiness CellHealthiness `json:"cellHealthiness"`
CellPriority int32 `json:"cellPriority"`
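A small sketch of how the two orthogonal fields combine; the string values below are illustrative, not necessarily the exact constants defined in pkg/api.

```go
// Illustrative only: the state and healthiness values are stand-ins, not
// necessarily the exact constants exported by pkg/api.
package main

import "fmt"

type cellStatus struct {
	state       string // whether an affinity group is using (or acquiring) the cell
	healthiness string // whether the underlying hardware is working normally
}

func main() {
	// Because the two fields are orthogonal, all four combinations can occur.
	for _, state := range []string{"Used", "Free"} {
		for _, healthiness := range []string{"Healthy", "Bad"} {
			fmt.Printf("%+v\n", cellStatus{state: state, healthiness: healthiness})
		}
	}
}
```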
