Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: refactor to maintain policy state in-memory #33

Closed
wants to merge 12 commits into from
2 changes: 1 addition & 1 deletion .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
e2e-test:
name: "E2E Test"
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 15
steps:
- name: Set up Go 1.17
uses: actions/setup-go@v2
Expand Down
140 changes: 92 additions & 48 deletions pkg/plugins/placementpolicy/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,34 @@ import (
"sort"

"github.com/Azure/placement-policy-scheduler-plugins/apis/v1alpha1"
ppclientset "github.com/Azure/placement-policy-scheduler-plugins/pkg/client/clientset/versioned"
ppinformers "github.com/Azure/placement-policy-scheduler-plugins/pkg/client/informers/externalversions/apis/v1alpha1"
pplisters "github.com/Azure/placement-policy-scheduler-plugins/pkg/client/listers/apis/v1alpha1"
"github.com/Azure/placement-policy-scheduler-plugins/pkg/utils"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

// Manager defines the interfaces for PlacementPolicy management.
type Manager interface {
GetPlacementPolicyForPod(context.Context, *corev1.Pod) (*v1alpha1.PlacementPolicy, error)
GetPodsWithLabels(context.Context, map[string]string) ([]*corev1.Pod, error)
AnnotatePod(context.Context, *corev1.Pod, *v1alpha1.PlacementPolicy, bool) (*corev1.Pod, error)
GetPlacementPolicy(context.Context, string, string) (*v1alpha1.PlacementPolicy, error)
RemovePodFromPolicy(*corev1.Pod) error
AddPodToPolicy(context.Context, *corev1.Pod, *v1alpha1.PlacementPolicy) (*PolicyInfo, error)
}

type PlacementPolicyManager struct {
// client is a clientset for the kube API server.
client kubernetes.Interface
// client is a placementPolicy client
ppClient ppclientset.Interface
// podLister is pod lister
podLister corelisters.PodLister
// snapshotSharedLister is pod shared list
snapshotSharedLister framework.SharedLister
// ppLister is placementPolicy lister
ppLister pplisters.PlacementPolicyLister
// available policies by namespace
policies PolicyInfos
}

func NewPlacementPolicyManager(
client kubernetes.Interface,
ppClient ppclientset.Interface,
snapshotSharedLister framework.SharedLister,
ppInformer ppinformers.PlacementPolicyInformer,
podLister corelisters.PodLister) *PlacementPolicyManager {
ppInformer ppinformers.PlacementPolicyInformer) *PlacementPolicyManager {
return &PlacementPolicyManager{
client: client,
ppClient: ppClient,
snapshotSharedLister: snapshotSharedLister,
ppLister: ppInformer.Lister(),
podLister: podLister,
ppLister: ppInformer.Lister(),
policies: NewPolicyInfos(),
}
}

Expand All @@ -73,38 +55,100 @@ func (m *PlacementPolicyManager) GetPlacementPolicyForPod(ctx context.Context, p
return ppList[0], nil
}

func (m *PlacementPolicyManager) GetPodsWithLabels(ctx context.Context, podLabels map[string]string) ([]*corev1.Pod, error) {
return m.podLister.List(labels.Set(podLabels).AsSelector())
func (m *PlacementPolicyManager) filterPlacementPolicyList(ppList []*v1alpha1.PlacementPolicy, pod *corev1.Pod) []*v1alpha1.PlacementPolicy {
var filteredPPList []*v1alpha1.PlacementPolicy
for _, pp := range ppList {
labels := pp.Spec.PodSelector.MatchLabels
if utils.HasMatchingLabels(pod.Labels, labels) {
filteredPPList = append(filteredPPList, pp)
}
}
return filteredPPList
}

// AnnotatePod annotates the pod with the placement policy.
func (m *PlacementPolicyManager) AnnotatePod(ctx context.Context, pod *corev1.Pod, pp *v1alpha1.PlacementPolicy, preferredNodeWithMatchingLabels bool) (*corev1.Pod, error) {
annotations := map[string]string{}
if pod.Annotations != nil {
annotations = pod.Annotations
func (m *PlacementPolicyManager) RemovePodFromPolicy(pod *corev1.Pod) error {
key, keyError := framework.GetPodKey(pod)
if keyError != nil {
return keyError
}

preference := "false"
if preferredNodeWithMatchingLabels {
preference = "true"
podNamespace := pod.Namespace
ppList := m.policies[podNamespace]

if ppList != nil {
var matchingPolicy *PolicyInfo

for _, pp := range ppList {
if pp.allQualifyingPods.Has(key) {
matchingPolicy = pp
break
}
}

if matchingPolicy != nil {
removeError := matchingPolicy.removePodIfPresent(pod)
if removeError != nil {
return removeError
}

m.updatePolicies(matchingPolicy)
}
}
annotations[v1alpha1.PlacementPolicyAnnotationKey] = pp.Name
annotations[v1alpha1.PlacementPolicyPreferenceAnnotationKey] = preference
pod.Annotations = annotations
return m.client.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
return nil
}

func (m *PlacementPolicyManager) GetPlacementPolicy(ctx context.Context, namespace, name string) (*v1alpha1.PlacementPolicy, error) {
return m.ppLister.PlacementPolicies(namespace).Get(name)
func (m *PlacementPolicyManager) getPolicyInfoForPlacementPolicy(pp *v1alpha1.PlacementPolicy) *PolicyInfo {
ppNamespace := pp.Namespace
ppName := pp.Name

namespace, exists := m.policies[ppNamespace]

if exists {
policy, policyExists := namespace[ppName]
if policyExists {
return policy
}
}

info := newPolicyInfo(ppNamespace, ppName, pp.Spec.Policy.Action, pp.Spec.Policy.TargetSize)
return info
}

func (m *PlacementPolicyManager) filterPlacementPolicyList(ppList []*v1alpha1.PlacementPolicy, pod *corev1.Pod) []*v1alpha1.PlacementPolicy {
var filteredPPList []*v1alpha1.PlacementPolicy
for _, pp := range ppList {
labels := pp.Spec.PodSelector.MatchLabels
if utils.HasMatchingLabels(pod.Labels, labels) {
filteredPPList = append(filteredPPList, pp)
func (m *PlacementPolicyManager) AddPodToPolicy(ctx context.Context, pod *corev1.Pod, pp *v1alpha1.PlacementPolicy) (*PolicyInfo, error) {
policy := m.getPolicyInfoForPlacementPolicy(pp)

addError := policy.addPodIfNotPresent(pod)
if addError != nil {
return policy, addError
}

m.updatePolicies(policy)
return policy, nil
}

func (m *PlacementPolicyManager) updatePolicies(policy *PolicyInfo) {
namespace := policy.Namespace
name := policy.Name

namespacePolicies, namespaceExists := m.policies[namespace]

if namespaceExists {
existing, exists := namespacePolicies[name]
if exists {
qualifyingPodCount := len(policy.allQualifyingPods)
if qualifyingPodCount > 0 {
m.policies[namespace][name] = policy.merge(existing)
return
}

// to ensure the in-memory collection of policies is kept reasonably sized, if a policy has 0 associated pods, it should be removed from the collection
// since the lister for policies is always used to match to a pod, there is no opportunity for a pod to be matched with a deleted policy
// on the flip side, this also means that changes to or deletion of a policy are handled here when a pod is added or removed versus attaching an event handler to the policy informer
delete(m.policies[namespace], name)
return
}
} else {
m.policies[namespace] = make(map[string]*PolicyInfo)
}
return filteredPPList
m.policies[namespace][name] = policy
}
148 changes: 148 additions & 0 deletions pkg/plugins/placementpolicy/core/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package core

import (
"github.com/Azure/placement-policy-scheduler-plugins/apis/v1alpha1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

type PolicyInfo struct {
Namespace string
Name string
Action v1alpha1.Action
TargetSize *intstr.IntOrString
allQualifyingPods sets.String
podsManagedByPolicy sets.String
targetMet bool
}

func newPolicyInfo(namespace string, name string, action v1alpha1.Action, targetSize *intstr.IntOrString) *PolicyInfo {
policy := &PolicyInfo{
Namespace: namespace,
Name: name,
Action: action,
TargetSize: targetSize,
allQualifyingPods: sets.NewString(),
podsManagedByPolicy: sets.NewString(),
targetMet: false,
}
return policy
}

type PolicyInfos map[string]map[string]*PolicyInfo

func NewPolicyInfos() PolicyInfos {
return make(PolicyInfos)
}

func (p *PolicyInfo) merge(existing *PolicyInfo) *PolicyInfo {
existing.Action = p.Action
existing.TargetSize = p.TargetSize
existing.targetMet = p.targetMet
existing.allQualifyingPods = sets.NewString()
existing.podsManagedByPolicy = sets.NewString()

if len(p.allQualifyingPods) > 0 {
pods := p.allQualifyingPods.List()
for _, pod := range pods {
existing.allQualifyingPods.Insert(pod)
}
}

if len(p.podsManagedByPolicy) > 0 {
pods := p.podsManagedByPolicy.List()
for _, pod := range pods {
existing.podsManagedByPolicy.Insert(pod)
}
}

return existing
}

func (p *PolicyInfo) removePodIfPresent(pod *corev1.Pod) error {
key, keyError := framework.GetPodKey(pod)
if keyError != nil {
return keyError
}

if !p.PodQualifiesForPolicy(key) {
return nil
}

p.allQualifyingPods = p.allQualifyingPods.Delete(key)

if p.PodIsManagedByPolicy(key) {
p.podsManagedByPolicy = p.podsManagedByPolicy.Delete(key)
}

err := p.setTargetMet()
return err
}

func (p *PolicyInfo) addPodIfNotPresent(pod *corev1.Pod) error {
key, keyError := framework.GetPodKey(pod)
if keyError != nil {
return keyError
}

//if pod is already in the list, do nothing
if p.PodQualifiesForPolicy(key) {
return nil
}

p.allQualifyingPods = p.allQualifyingPods.Insert(key)

targetErr := p.setTargetMet()
if targetErr != nil {
return targetErr
}

//if target was met without also adding the pod to the "managed" list, then nothing else to do
if p.targetMet {
return nil
}

p.podsManagedByPolicy = p.podsManagedByPolicy.Insert(key)

err := p.setTargetMet()
return err
}

func (p *PolicyInfo) calculateTrueTargetSize() (int, error) {
specTarget := p.TargetSize
lenAllPods := len(p.allQualifyingPods)

target, err := intstr.GetScaledValueFromIntOrPercent(specTarget, lenAllPods, false)

if err != nil {
return 0, err
}

if p.Action == v1alpha1.ActionMustNot {
target = lenAllPods - target
}

return target, nil
}

func (p *PolicyInfo) setTargetMet() error {
target, calcError := p.calculateTrueTargetSize()
if calcError != nil {
return calcError
}

managedCount := len(p.podsManagedByPolicy)
p.targetMet = managedCount >= target //since the TargetSize is rounded down, the expectation that it will only meet/equal and never exceed
return nil
}

func (p *PolicyInfo) PodQualifiesForPolicy(podKey string) bool {
return p.allQualifyingPods.Has(podKey)
}

func (p *PolicyInfo) PodIsManagedByPolicy(podKey string) bool {
return p.podsManagedByPolicy.Has(podKey)
}
Loading