Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor to maintain policy state in-memory #49

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
75b3f7d
maintain state of a placement policy in-memory rather than use pod an…
cmaclaughlin Jan 19, 2022
67eb90e
attempt to resolve issues
cmaclaughlin Jan 21, 2022
f457f74
initial PR changes
cmaclaughlin Jan 24, 2022
6d0aea4
fix compilation error
cmaclaughlin Jan 24, 2022
cba7a0c
Update pkg/plugins/placementpolicy/core/core.go
cmaclaughlin Feb 8, 2022
8059236
Update pkg/plugins/placementpolicy/core/policy.go
cmaclaughlin Feb 8, 2022
138af22
Merge branch 'Azure:main' into inmemory-policy-state
cmaclaughlin Feb 8, 2022
1f17889
changes per initial PR comments
cmaclaughlin Feb 8, 2022
744c441
Merge branch 'Azure:main' into inmemory-policy-state
cmaclaughlin Feb 22, 2022
f104ce5
adding comments for clarity
cmaclaughlin Feb 22, 2022
da2c4e6
Update pkg/plugins/placementpolicy/placementpolicy.go
cmaclaughlin Feb 25, 2022
8f418f9
Update pkg/plugins/placementpolicy/placementpolicy.go
cmaclaughlin Feb 25, 2022
db96728
Update pkg/plugins/placementpolicy/core/policy.go
cmaclaughlin Feb 25, 2022
cedd819
Update pkg/plugins/placementpolicy/core/policy.go
cmaclaughlin Feb 25, 2022
3adaed7
Update pkg/plugins/placementpolicy/core/policy.go
cmaclaughlin Feb 25, 2022
cd95269
Update pkg/plugins/placementpolicy/core/policy.go
cmaclaughlin Feb 25, 2022
0d482b2
Update pkg/plugins/placementpolicy/core/policy.go
cmaclaughlin Feb 25, 2022
34b0d77
Update pkg/plugins/placementpolicy/core/policy.go
cmaclaughlin Feb 25, 2022
c2fb51e
Update pkg/plugins/placementpolicy/core/policy.go
cmaclaughlin Feb 25, 2022
5f93723
Update pkg/plugins/placementpolicy/placementpolicy.go
cmaclaughlin Feb 25, 2022
e346776
initial changes for next round of feedback
cmaclaughlin Feb 25, 2022
ef85acc
Additional changes per PR feedback
cmaclaughlin Feb 25, 2022
934623e
remove stage dependency
cmaclaughlin Feb 25, 2022
d2fc5ef
final cleanup/fixes
cmaclaughlin Feb 25, 2022
0d7a9f3
Merge branch 'Azure:main' into inmemory-policy-state
cmaclaughlin May 5, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
e2e-test:
name: "E2E Test"
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 15
steps:
- name: Set up Go 1.17
uses: actions/setup-go@v2
Expand Down
159 changes: 111 additions & 48 deletions pkg/plugins/placementpolicy/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,36 @@ import (
"sort"

"github.com/Azure/placement-policy-scheduler-plugins/apis/v1alpha1"
ppclientset "github.com/Azure/placement-policy-scheduler-plugins/pkg/client/clientset/versioned"
ppinformers "github.com/Azure/placement-policy-scheduler-plugins/pkg/client/informers/externalversions/apis/v1alpha1"
pplisters "github.com/Azure/placement-policy-scheduler-plugins/pkg/client/listers/apis/v1alpha1"
"github.com/Azure/placement-policy-scheduler-plugins/pkg/utils"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

// Manager defines the interfaces for PlacementPolicy management.
type Manager interface {
GetPlacementPolicyForPod(context.Context, *corev1.Pod) (*v1alpha1.PlacementPolicy, error)
GetPodsWithLabels(context.Context, map[string]string) ([]*corev1.Pod, error)
AnnotatePod(context.Context, *corev1.Pod, *v1alpha1.PlacementPolicy, bool) (*corev1.Pod, error)
GetPlacementPolicy(context.Context, string, string) (*v1alpha1.PlacementPolicy, error)
GetPolicyInfo(*v1alpha1.PlacementPolicy) *PolicyInfo
MatchPodToPolicy(context.Context, *corev1.Pod, *PolicyInfo) (*PolicyInfo, error)
AddPodToPolicy(context.Context, *corev1.Pod, *PolicyInfo) (*PolicyInfo, error)
RemovePodFromPolicy(*corev1.Pod) error
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
}

type PlacementPolicyManager struct {
// client is a clientset for the kube API server.
client kubernetes.Interface
// client is a placementPolicy client
ppClient ppclientset.Interface
// podLister is pod lister
podLister corelisters.PodLister
// snapshotSharedLister is pod shared list
snapshotSharedLister framework.SharedLister
// ppLister is placementPolicy lister
ppLister pplisters.PlacementPolicyLister
// available policies by namespace
policies PolicyInfos
}

func NewPlacementPolicyManager(
client kubernetes.Interface,
ppClient ppclientset.Interface,
snapshotSharedLister framework.SharedLister,
ppInformer ppinformers.PlacementPolicyInformer,
podLister corelisters.PodLister) *PlacementPolicyManager {
ppInformer ppinformers.PlacementPolicyInformer) *PlacementPolicyManager {
return &PlacementPolicyManager{
client: client,
ppClient: ppClient,
snapshotSharedLister: snapshotSharedLister,
ppLister: ppInformer.Lister(),
podLister: podLister,
ppLister: ppInformer.Lister(),
policies: NewPolicyInfos(),
}
}

Expand All @@ -73,38 +57,117 @@ func (m *PlacementPolicyManager) GetPlacementPolicyForPod(ctx context.Context, p
return ppList[0], nil
}

func (m *PlacementPolicyManager) GetPodsWithLabels(ctx context.Context, podLabels map[string]string) ([]*corev1.Pod, error) {
return m.podLister.List(labels.Set(podLabels).AsSelector())
func (m *PlacementPolicyManager) filterPlacementPolicyList(ppList []*v1alpha1.PlacementPolicy, pod *corev1.Pod) []*v1alpha1.PlacementPolicy {
var filteredPPList []*v1alpha1.PlacementPolicy
for _, pp := range ppList {
labels := pp.Spec.PodSelector.MatchLabels
if utils.HasMatchingLabels(pod.Labels, labels) {
filteredPPList = append(filteredPPList, pp)
}
}
return filteredPPList
}

func (m *PlacementPolicyManager) GetPolicyInfo(pp *v1alpha1.PlacementPolicy) *PolicyInfo {
policyNamespace := pp.Namespace
policyName := pp.Name
corePolicy := pp.Spec.Policy

namespacePolicies, namespaceExists := m.policies[policyNamespace]
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved

if namespaceExists {
policy, policyExists := namespacePolicies[policyName]

if policyExists {
return policy
}

created := newPolicyInfo(policyNamespace, policyName, corePolicy.Action, corePolicy.TargetSize)
m.policies[policyNamespace][policyName] = created
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
return created
}

m.policies[policyNamespace] = make(map[string]*PolicyInfo)
created := newPolicyInfo(policyNamespace, policyName, corePolicy.Action, corePolicy.TargetSize)
m.policies[policyNamespace][policyName] = created
return created
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
}

// AnnotatePod annotates the pod with the placement policy.
func (m *PlacementPolicyManager) AnnotatePod(ctx context.Context, pod *corev1.Pod, pp *v1alpha1.PlacementPolicy, preferredNodeWithMatchingLabels bool) (*corev1.Pod, error) {
annotations := map[string]string{}
if pod.Annotations != nil {
annotations = pod.Annotations
type PodAction int16

const (
Match PodAction = iota
Add
Remove
)

func (m *PlacementPolicyManager) MatchPodToPolicy(ctx context.Context, pod *corev1.Pod, policy *PolicyInfo) (*PolicyInfo, error) {
matchError := policy.addMatch(pod)
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if matchError != nil {
return nil, matchError
}

preference := "false"
if preferredNodeWithMatchingLabels {
preference = "true"
m.updatePolicies(policy, Match)
return policy, nil
}

func (m *PlacementPolicyManager) AddPodToPolicy(ctx context.Context, pod *corev1.Pod, policy *PolicyInfo) (*PolicyInfo, error) {
addError := policy.addPodIfNotPresent(pod)
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if addError != nil {
return nil, addError
}
annotations[v1alpha1.PlacementPolicyAnnotationKey] = pp.Name
annotations[v1alpha1.PlacementPolicyPreferenceAnnotationKey] = preference
pod.Annotations = annotations
return m.client.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})

m.updatePolicies(policy, Add)
return policy, nil
}

func (m *PlacementPolicyManager) GetPlacementPolicy(ctx context.Context, namespace, name string) (*v1alpha1.PlacementPolicy, error) {
return m.ppLister.PlacementPolicies(namespace).Get(name)
func (m *PlacementPolicyManager) RemovePodFromPolicy(pod *corev1.Pod) error {
key, keyError := framework.GetPodKey(pod)
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if keyError != nil {
return keyError
}

podNamespace := pod.Namespace
ppList := m.policies[podNamespace]

cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if ppList != nil {
var matchingPolicy *PolicyInfo

for _, pp := range ppList {
if pp.PodQualifiesForPolicy(key) {
matchingPolicy = pp
break
}
}

if matchingPolicy != nil {
removeError := matchingPolicy.removePodIfPresent(pod)
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if removeError != nil {
return removeError
}

m.updatePolicies(matchingPolicy, Remove)
}
}
return nil
}

func (m *PlacementPolicyManager) filterPlacementPolicyList(ppList []*v1alpha1.PlacementPolicy, pod *corev1.Pod) []*v1alpha1.PlacementPolicy {
var filteredPPList []*v1alpha1.PlacementPolicy
for _, pp := range ppList {
labels := pp.Spec.PodSelector.MatchLabels
if utils.HasMatchingLabels(pod.Labels, labels) {
filteredPPList = append(filteredPPList, pp)
func (m *PlacementPolicyManager) updatePolicies(policy *PolicyInfo, act PodAction) {
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
namespace := policy.Namespace
name := policy.Name

if act == Remove {
qualifyingCount := len(policy.qualifiedPods)

if qualifyingCount == 0 {
delete(m.policies[namespace], name)
return
}

m.policies[namespace][name] = policy
return
}
return filteredPPList

existing := m.policies[namespace][name]
m.policies[namespace][name] = policy.merge(existing)
}
167 changes: 167 additions & 0 deletions pkg/plugins/placementpolicy/core/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package core

import (
"github.com/Azure/placement-policy-scheduler-plugins/apis/v1alpha1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

type PolicyInfo struct {
Namespace string
Name string
Action v1alpha1.Action
TargetSize *intstr.IntOrString
matchedPods sets.String
qualifiedPods sets.String
managedPods sets.String
targetMet bool
}

func newPolicyInfo(namespace string, name string, action v1alpha1.Action, targetSize *intstr.IntOrString) *PolicyInfo {
policy := &PolicyInfo{
Namespace: namespace,
Name: name,
Action: action,
TargetSize: targetSize,
matchedPods: sets.NewString(),
qualifiedPods: sets.NewString(),
managedPods: sets.NewString(),
targetMet: false,
}
return policy
}

type PolicyInfos map[string]map[string]*PolicyInfo
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved

func NewPolicyInfos() PolicyInfos {
return make(PolicyInfos)
}

func (p *PolicyInfo) merge(existing *PolicyInfo) *PolicyInfo {
existing.Action = p.Action
existing.TargetSize = p.TargetSize
existing.targetMet = p.targetMet

tempMatched := sets.NewString()
if len(p.matchedPods) > 0 {
pods := p.matchedPods.List()
for _, pod := range pods {
tempMatched = tempMatched.Insert(pod)
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
}
}
existing.matchedPods = tempMatched

tempQualified := sets.NewString()
if len(p.qualifiedPods) > 0 {
pods := p.qualifiedPods.List()
for _, pod := range pods {
tempQualified = tempQualified.Insert(pod)
}
}
existing.qualifiedPods = tempQualified

tempManaged := sets.NewString()
if len(p.managedPods) > 0 {
pods := p.managedPods.List()
for _, pod := range pods {
tempManaged = tempManaged.Insert(pod)
}
}
existing.managedPods = tempManaged

return existing
}

func (p *PolicyInfo) addMatch(pod *corev1.Pod) error {
key, keyError := framework.GetPodKey(pod)
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if keyError != nil {
return keyError
}

p.matchedPods = p.managedPods.Insert(key)
return nil
}

func (p *PolicyInfo) removePodIfPresent(pod *corev1.Pod) error {
key, keyError := framework.GetPodKey(pod)
if keyError != nil {
return keyError
}

if !p.PodQualifiesForPolicy(key) {
return nil
}

p.qualifiedPods = p.qualifiedPods.Delete(key)

if p.PodIsManagedByPolicy(key) {
p.managedPods = p.managedPods.Delete(key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if Delete from the set is thread safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is since it is implemented via map[string]struct{} and delete(map, key) is not thread safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like a thread safe implementation or leave it as-is?

}

err := p.setTargetMet()
return err
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *PolicyInfo) addPodIfNotPresent(pod *corev1.Pod) error {
key, keyError := framework.GetPodKey(pod)
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if keyError != nil {
return keyError
}

//if pod is already in the list, do nothing
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if p.PodQualifiesForPolicy(key) {
return nil
}

p.matchedPods = p.matchedPods.Delete(key) //once added, don't need to worry about matched anymore

p.qualifiedPods = p.qualifiedPods.Insert(key)

targetError := p.setTargetMet()
cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if targetError != nil {
return targetError
}

//if target met, pod doesn't need to be managed
if p.targetMet {
return nil
}

p.managedPods = p.managedPods.Insert(key)
err := p.setTargetMet()
return err
}

func (p *PolicyInfo) setTargetMet() error {
specTarget := p.TargetSize
lenAllPods := len(p.qualifiedPods)

target, calcError := intstr.GetScaledValueFromIntOrPercent(specTarget, lenAllPods, false)

cmaclaughlin marked this conversation as resolved.
Show resolved Hide resolved
if calcError != nil {
return calcError
}

if p.Action == v1alpha1.ActionMustNot {
target = lenAllPods - target
}

managedCount := len(p.managedPods)
p.targetMet = managedCount >= target //since the TargetSize is rounded down, the expectation that it will only meet/equal and never exceed
return nil
}

func (p *PolicyInfo) PodMatchesPolicy(podKey string) bool {
return p.matchedPods.Has(podKey)
}

func (p *PolicyInfo) PodQualifiesForPolicy(podKey string) bool {
return p.qualifiedPods.Has(podKey)
}

func (p *PolicyInfo) PodIsManagedByPolicy(podKey string) bool {
return p.managedPods.Has(podKey)
}
Loading