Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Withdraw site resource when vm creation is failed #300

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
74f1776
changed klog.Infof to klog.V(4).Infof
kimeunju108 May 11, 2021
207f229
updated code - removed else and added continue
kimeunju108 May 11, 2021
86cde93
Merge branch 'master' of https://github.com/CentaurusInfra/global-res…
kimeunju108 May 12, 2021
c75e438
implemented withdraw reserved resource for pod
kimeunju108 May 18, 2021
0c14cbe
implemented resource revokation when vm creation failed
kimeunju108 May 18, 2021
bba5311
implemented resource revokation when vm creation failed
kimeunju108 May 18, 2021
7773eeb
implemented resource revokation when vm creation failed
kimeunju108 May 18, 2021
3b5a1c3
applied review
kimeunju108 May 18, 2021
f1e1542
updated according to review
kimeunju108 May 18, 2021
fe2ba14
updated resource data structure
kimeunju108 May 19, 2021
ec4735b
applied review
kimeunju108 May 19, 2021
4a49283
pulled from pr 289
kimeunju108 May 25, 2021
80033e6
applied reviews
kimeunju108 May 27, 2021
b6e04d9
performed CICD test
kimeunju108 May 27, 2021
37f0e04
add unit testcase
kimeunju108 Jun 1, 2021
d05b4f9
add unit testcase
kimeunju108 Jun 1, 2021
414d752
added unit testcase
kimeunju108 Jun 1, 2021
0416df5
added unit testcase
kimeunju108 Jun 1, 2021
504fe0b
updated scheduler according to review
kimeunju108 Jun 3, 2021
baa3a96
updated scheduler according to review
kimeunju108 Jun 3, 2021
3d3664a
updated eventhandlers_test.go
kimeunju108 Jun 3, 2021
ed4c2b1
updated eventhandlers_test.go
kimeunju108 Jun 3, 2021
0dfb890
updated eventhandlers_test.go
kimeunju108 Jun 3, 2021
ea63fd4
updated eventhandlers_test.go
kimeunju108 Jun 3, 2021
1f6026d
updated eventhandlers_test.go
kimeunju108 Jun 4, 2021
c174688
updated selector
kimeunju108 Jun 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions globalscheduler/controllers/dispatcher/dispatcher_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func NewProcess(config *rest.Config, namespace string, name string, quit chan st
if err != nil {
klog.Fatal(err)
}

return Process{
namespace: namespace,
name: name,
Expand All @@ -97,12 +96,11 @@ func (p *Process) Run(quit chan struct{}) {

dispatcherSelector := fields.ParseSelectorOrDie("metadata.name=" + p.name)
dispatcherLW := cache.NewListWatchFromClient(p.dispatcherClientset.GlobalschedulerV1(), "dispatchers", p.namespace, dispatcherSelector)

dispatcherInformer := cache.NewSharedIndexInformer(dispatcherLW, &dispatcherv1.Dispatcher{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

dispatcherInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
klog.Infof("The dispatcher %s process is going to be killed...", p.name)
klog.V(3).Infof("The dispatcher %s process is going to be killed...", p.name)
os.Exit(0)
},
UpdateFunc: func(old, new interface{}) {
Expand Down Expand Up @@ -234,7 +232,6 @@ func (p *Process) SendPodToCluster(pod *v1.Pod) {
klog.Warningf("The pod %v failed to update its apiserver dtatbase status to failed with the error %v", pod.ObjectMeta.Name, err)
}
}
// util.CheckTime(pod.Name, "dispatcher", "CreatePod-End", 2)
}()
}
}
Expand Down
3 changes: 3 additions & 0 deletions globalscheduler/pkg/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ go_test(
srcs = ["eventhandlers_test.go"],
embed = [":go_default_library"],
deps = [
"//globalscheduler/pkg/scheduler/client/typed:go_default_library",
"//globalscheduler/pkg/scheduler/internal/cache:go_default_library",
"//globalscheduler/pkg/scheduler/internal/cache/fake:go_default_library",
"//globalscheduler/pkg/scheduler/sitecacheinfo:go_default_library",
"//globalscheduler/pkg/scheduler/types:go_default_library",
],
)
Expand Down
167 changes: 144 additions & 23 deletions globalscheduler/pkg/scheduler/eventhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
//"k8s.io/client-go/kubernetes"
pdgetrf marked this conversation as resolved.
Show resolved Hide resolved
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
clusterv1 "k8s.io/kubernetes/globalscheduler/pkg/apis/cluster/v1"
"k8s.io/kubernetes/globalscheduler/pkg/scheduler/common/constants"
"k8s.io/kubernetes/globalscheduler/pkg/scheduler/types"
"k8s.io/kubernetes/globalscheduler/pkg/scheduler/utils"
"k8s.io/kubernetes/pkg/controller"
statusutil "k8s.io/kubernetes/pkg/util/pod"
)
Expand Down Expand Up @@ -113,6 +115,31 @@ func AddAllEventHandlers(sched *Scheduler) {
},
},
)
// failed pod queue
sched.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return failedToSchedule(t) && responsibleForPod(t, sched.SchedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return failedToSchedule(pod) && responsibleForPod(pod, sched.SchedulerName)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodWithdrawResource,
UpdateFunc: sched.updatePodWithdrawResource,
DeleteFunc: sched.deletePodWithdrawResource,
},
},
)
sched.ClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sched.addCluster,
UpdateFunc: sched.updateCluster,
Expand All @@ -135,10 +162,14 @@ func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
return schedulerName == pod.Status.AssignedScheduler.Name
}

// failedToSchedule selects pods that scheduled but failed to create vm
func failedToSchedule(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed
}

// addPodToCache add pod to the stack cache of the scheduler
func (sched *Scheduler) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod)
klog.Infof("Add a pod: %v", pod)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", obj)
return
Expand All @@ -160,7 +191,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
return
}
newPod, ok := newObj.(*v1.Pod)
klog.Infof("Update a pod: %v", newPod)
klog.V(4).Infof("Update a pod: %v", newPod)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
return
Expand All @@ -178,7 +209,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
switch t := obj.(type) {
case *v1.Pod:
pod = t
klog.Infof("Delete a pod: %v", pod)
klog.V(4).Infof("Delete a pod: %v", pod.Name)
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
Expand Down Expand Up @@ -301,15 +332,13 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
return
}
newPod, ok := newObj.(*v1.Pod)
klog.Infof("updatePodToSchedulingQueue : %v", newPod)
klog.V(4).Infof("updatePodToSchedulingQueue : %v", newPod)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
return
}

oldStack := getStackFromPod(oldPod)
newStack := getStackFromPod(newPod)

if sched.skipStackUpdate(newStack) {
return
}
Expand All @@ -323,7 +352,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
switch t := obj.(type) {
case *v1.Pod:
pod = obj.(*v1.Pod)
klog.Infof("deletePodToSchedulingQueue : %v", pod)
klog.V(4).Infof("deletePodToSchedulingQueue : %v", pod.Name)
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
Expand Down Expand Up @@ -372,14 +401,14 @@ func (sched *Scheduler) skipStackUpdate(stack *types.Stack) bool {
if !reflect.DeepEqual(assumedStackCopy, stackCopy) {
return false
}
klog.V(3).Infof("Skipping stack %s/%s/%s update", stack.Tenant, stack.PodNamespace, stack.PodName)
klog.V(4).Infof("Skipping stack %s/%s/%s update", stack.Tenant, stack.PodNamespace, stack.PodName)
return true
}

func (sched *Scheduler) bindStacks(assumedStacks []types.Stack) {
klog.Infof("assumedStacks: %v", assumedStacks)
klog.V(4).Infof("assumedStacks: %v", assumedStacks)
for _, newStack := range assumedStacks {
klog.Infof("newStack: %v", newStack)
klog.V(4).Infof("newStack: %v", newStack)
clusterName := newStack.Selected.ClusterName
sched.bindToSite(clusterName, &newStack)
}
Expand All @@ -398,15 +427,15 @@ func (sched *Scheduler) setPodScheduleErr(reqStack *types.Stack) error {
newStatus := v1.PodStatus{
Phase: v1.PodNoSchedule,
}
klog.Infof("Attempting to update pod status from %v to %v", pod.Status, newStatus)
klog.V(4).Infof("Attempting to update pod status from %v to %v", pod.Status, newStatus)
_, _, err = statusutil.PatchPodStatus(sched.Client, reqStack.Tenant, reqStack.PodNamespace, reqStack.PodName, pod.Status, newStatus)
if err != nil {
klog.Warningf("PatchPodStatus for pod %q: %v", reqStack.PodName+"/"+reqStack.PodNamespace+"/"+
reqStack.Tenant+"/"+reqStack.UID, err)
return err
}

klog.Infof("Update pod status from %v to %v success", pod.Status, newStatus)
klog.V(4).Infof("Update pod status from %v to %v success", pod.Status, newStatus)
return nil
}

Expand All @@ -423,36 +452,35 @@ func (sched *Scheduler) bindToSite(clusterName string, assumedStack *types.Stack
Name: clusterName,
},
}

klog.V(3).Infof("binding: %v", binding)
klog.V(4).Infof("binding: %v", binding)
// do api server update here
klog.Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
klog.V(4).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
err := sched.Client.CoreV1().PodsWithMultiTenancy(binding.Namespace, binding.Tenant).Bind(binding)
if err != nil {
klog.Errorf("Failed to bind stack: %v/%v/%v", assumedStack.Tenant, assumedStack.PodNamespace,
assumedStack.PodName)
if err := sched.SchedulerCache.ForgetStack(assumedStack); err != nil {
klog.Errorf("scheduler cache ForgetStack failed: %v", err)
}

return err
}
//
jshaofuturewei marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

func (sched *Scheduler) addCluster(object interface{}) {
resource := object.(*clusterv1.Cluster)
clusterCopy := resource.DeepCopy()
if sched.verifyClusterInfo(clusterCopy) == false {
klog.Infof(" Cluster data is not correct: %v", clusterCopy)
klog.V(4).Infof(" Cluster data is not correct: %v", clusterCopy)
}
key, err := controller.KeyFunc(object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object: %v, error: %v", object, err))
return
}
sched.Enqueue(key, EventType_Create)
klog.Infof("Enqueue Create cluster: %v", key)
klog.V(4).Infof("Enqueue Create cluster: %v", key)
}

func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
Expand All @@ -461,7 +489,7 @@ func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
oldClusterCopy := oldResource.DeepCopy()
newClusterCopy := newResource.DeepCopy()
if sched.verifyClusterInfo(newClusterCopy) {
klog.Infof(" Cluster data is not correct: %v", newResource)
klog.V(4).Infof(" Cluster data is not correct: %v", newResource)
}
key1, err1 := controller.KeyFunc(oldObject)
key2, err2 := controller.KeyFunc(newObject)
Expand All @@ -478,13 +506,13 @@ func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
switch eventType {
case ClusterUpdateNo:
{
klog.Infof("No actual change in clusters, discarding: %v", newClusterCopy.Name)
klog.V(4).Infof("No actual change in clusters, discarding: %v", newClusterCopy.Name)
break
}
case ClusterUpdateYes:
{
sched.Enqueue(key2, EventType_Update)
klog.Infof("Enqueue Update Cluster: %v", key2)
klog.V(4).Infof("Enqueue Update Cluster: %v", key2)
break
}
default:
Expand All @@ -499,7 +527,7 @@ func (sched *Scheduler) deleteCluster(object interface{}) {
resource := object.(*clusterv1.Cluster)
clusterCopy := resource.DeepCopy()
if sched.verifyClusterInfo(clusterCopy) == false {
klog.Infof(" Cluster data is not correct: %v", clusterCopy)
klog.V(4).Infof(" Cluster data is not correct: %v", clusterCopy)
return
}
key, err := controller.KeyFunc(object)
Expand All @@ -510,7 +538,7 @@ func (sched *Scheduler) deleteCluster(object interface{}) {
sched.Enqueue(key, EventType_Delete)
siteID := clusterCopy.Spec.Region.Region + constants.SiteDelimiter + clusterCopy.Spec.Region.AvailabilityZone
sched.deletedClusters[key] = siteID
klog.Infof("Enqueue Delete Cluster: %v", key)
klog.V(4).Infof("Enqueue Delete Cluster: %v", key)
}

// Enqueue puts key of the cluster object in the work queue
Expand All @@ -532,3 +560,96 @@ func (sched *Scheduler) verifyClusterInfo(cluster *clusterv1.Cluster) (verified
verified = true
return verified
}

func (sched *Scheduler) verifyPodInfo(pod *v1.Pod) (verified bool) {
verified = false
name := pod.Name
if pod.Name == "" {
klog.Errorf("pod name:%s is empty", name)
return verified
}
verified = true
return verified
}

func (sched *Scheduler) addPodWithdrawResource(object interface{}) {
pod, ok := object.(*v1.Pod)
klog.V(4).Infof("Add a pod to withdraw resource: %v", pod.Name)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", object)
return
}
podCopy := pod.DeepCopy()
if sched.verifyPodInfo(podCopy) == false {
klog.V(4).Infof(" Pod data is not correct: %v", podCopy)
}
err := sched.withdrawResource(pod.Name)
if err != nil {
klog.Errorf("withdraw resource of pod %s failed", pod.Name)
}
}

func (sched *Scheduler) updatePodWithdrawResource(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
return
}
newPod, ok := newObj.(*v1.Pod)
klog.V(4).Infof("Update a pod: %v", newPod)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
return
}
if oldPod.Name != newPod.Name {
klog.Errorf("old pod name and new pod name should be equal: %s, %s", oldPod.Name, newPod.Name)
return
}
err := sched.withdrawResource(newPod.Name)
if err != nil {
klog.Errorf("withdraw resource of pod %s failed", oldPod.Name)
}
}

func (sched *Scheduler) deletePodWithdrawResource(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
klog.V(4).Infof("Delete a pod: %v", pod.Name)
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *v1.Pod: %v", t)
return
}
err := sched.withdrawResource(pod.Name)
if err != nil {
klog.Errorf("withdraw resource of pod %s failed", pod.Name)
}
}

//withdraw reserved resources to a pod & add it to cache to other pods
func (sched *Scheduler) withdrawResource(podName string) error {
resource := sched.PodSiteResourceMap[podName]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map is not thread safe. When multiple events of one pod are triggered, how can we prevent synchronization issues here?

Copy link
Collaborator Author

@kimeunju108 kimeunju108 May 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map is not thread safe. When multiple events of one pod are triggered, how can we prevent synchronization issues here?
This function is only related to pod creation failure. And pod creation is scheduled by schduling Q which is synchonized.

Copy link
Collaborator

@jshaofuturewei jshaofuturewei Jun 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that a scheduled job will sync resources(cpu & mem) from openstack. Before informers get failed pods from apiserver, the scheduled job has already synchronized from openstack, which means the resource has already claimed back. In that case, is it possible that the failed pod adds resources back twice?
=> No it won't happen because PodSiteResourceMap is only for 60 seconds time gap. So whenever sync resources(cpu & mem) from openstack, this PodSiteResourceMap will be empty at the moment already.

Copy link
Collaborator

@jshaofuturewei jshaofuturewei Jun 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reply to my comments instead of editing them directly.

The assumption is based on informers take fewer than 60 seconds to trigger the events. However, if at that time the resource map is not empty but the scheduled resource synchronization job happens to fetch resource information from openstack, the resources might be wrong.

=> resource map is empty, so the conflict won't happen. If this function creates too much issue, we should think again if we remove this function. It will be better to remove this 60 seconds gap. or resource collector should collect information in advance before waiting 60 seconds. 60 secnds is resource collector's issue mainly. Resource collection has to keep collecting information to remove this gap.

Copy link
Collaborator

@jshaofuturewei jshaofuturewei Jun 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reply to my comments instead of editing them directly.

For example, when you get failed pod event and want to withdraw resources, we assume to update resource based on resource collector does not update openstack resource information at that time. But if it does, we withdraw resource information twice.

We can happen to get a failed pod event just after resource collector updates the most recent openstack resource information, right?

Please correct me if I am wrong.
=> Regardless right or not, collecting resource information is resource collector's work. I tried to cooperate 60 seconds issue of resource collctor. But if there are so many issues, it will be better resource collector take it and solve it.

Copy link
Collaborator

@jshaofuturewei jshaofuturewei Jun 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reply to my comments instead of editing them directly.

Resource collector used to be the only source we know the resource information (mem & cpu). If you introduce the current codes to update resources, please make them work with each other. Thanks

=> It will be better for you to check resource collector's requirement.

Copy link
Collaborator

@jshaofuturewei jshaofuturewei Jun 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reply to my comments instead of editing them directly.

Can you tell us besides resource collector, are the pull request to reduce/withdraw resources to update the openstack resources? Thanks
=> No, this PR for "withdraw", which resolves the issue caused by resource collector issue. It will be better to reconsider if this function should be implemented by scheduler or not.

if resource == nil {
klog.V(4).Infof("there is no preserved resource for pod: %s", podName)
return nil
}
allResInfo := resource.Resource
regionName := utils.GetRegionName(resource.SiteID)
regionFlavor, err := sched.siteCacheInfoSnapshot.GetRegionFlavors(regionName)
if err != nil {
klog.Errorf("there is no valid flavor for region: %s", regionName)
return err
}
siteCacheInfo := sched.siteCacheInfoSnapshot.SiteCacheInfoMap[resource.SiteID]
siteCacheInfo.UpdateSiteResInfo(allResInfo, regionFlavor, false)
delete(sched.PodSiteResourceMap, podName)
return nil
}
Loading