Skip to content

Commit

Permalink
Update controller name generation logic for data distribution efficie…
Browse files Browse the repository at this point in the history
…ncy (CentaurusInfra#581)

* Update controller name generation logic for data distribution efficiency

* Fix integration test
  • Loading branch information
Sindica authored Aug 14, 2020
1 parent 22d7f2d commit 42dba08
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 123 deletions.
38 changes: 26 additions & 12 deletions pkg/cloudfabric-controller/controllerframework/cim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"testing"
)

func newControllerInstance(controllerType string, controllerKey int64, workloadNum int32, isLocked bool) *v1.ControllerInstance {
func newControllerInstance(cim *ControllerInstanceManager, controllerType string, controllerKey int64, workloadNum int32) *v1.ControllerInstance {
controllerInstance := &v1.ControllerInstance{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "100",
Expand All @@ -33,15 +33,19 @@ func newControllerInstance(controllerType string, controllerKey int64, workloadN
WorkloadNum: workloadNum,
}

controllerInstance.Name = generateControllerName(nil)
GetInstanceHandler = func() *ControllerInstanceManager {
return cim
}
controllerInstance.Name = generateControllerName(controllerType, nil)
GetInstanceHandler = getControllerInstanceManager

return controllerInstance
}

func testAddEvent(t *testing.T, cim *ControllerInstanceManager, notifyTimes int) (*v1.ControllerInstance, string, map[string]v1.ControllerInstance) {
// add event
controllerType := "foo"
controllerInstance1 := newControllerInstance(controllerType, 10000, 999, false)
controllerInstance1 := newControllerInstance(cim, controllerType, 10000, 999)
cim.addControllerInstance(controllerInstance1)

controllerInstanceMap, err := cim.ListControllerInstances(controllerType)
Expand Down Expand Up @@ -153,7 +157,7 @@ func TestDeleteControllerInstanceDoesNotExist(t *testing.T) {
cim, _ := CreateTestControllerInstanceManager(stopCh)
notifyTimes = 0

controllerInstance1 := newControllerInstance("bar", 10000, 999, false)
controllerInstance1 := newControllerInstance(cim, "bar", 10000, 999)
cim.deleteControllerInstance(controllerInstance1)

controllerInstanceMap, err := cim.ListControllerInstances(controllerInstance1.ControllerType)
Expand All @@ -166,16 +170,20 @@ func TestAddMultipleControllerInstancesForSameControllerType(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

cim, _ := CreateTestControllerInstanceManager(stopCh)
cim1, _ := CreateTestControllerInstanceManager(stopCh)
cim2, _ := CreateTestControllerInstanceManager(stopCh)
notifyTimes = 0

// add event
controllerInstance1, controllerType1, _ := testAddEvent(t, cim, 1)
controllerInstance2, controllerType2, _ := testAddEvent(t, cim, 2)
controllerInstance1, controllerType1, _ := testAddEvent(t, cim1, 1)
controllerInstance2, controllerType2, _ := testAddEvent(t, cim2, 2)
assert.Equal(t, controllerType1, controllerType2)
assert.NotEqual(t, controllerInstance1.Name, controllerInstance2.Name)

controllerInstanceMap, err := cim.ListControllerInstances(controllerType1)
// cim 1 got controller 2 creation event
cim1.addControllerInstance(controllerInstance2)

controllerInstanceMap, err := cim1.ListControllerInstances(controllerType1)
assert.Nil(t, err)
assert.NotNil(t, controllerInstanceMap)
controllerInstanceRead1, isOK1 := controllerInstanceMap[controllerInstance1.Name]
Expand Down Expand Up @@ -227,21 +235,27 @@ func TestErrorHandlingInListControllerInstances(t *testing.T) {
cim, _ := CreateTestControllerInstanceManager(stopCh)
notifyTimes = 0

_, controllerType, _ := testAddEvent(t, cim, 1)
controllerInstance1, controllerType, _ := testAddEvent(t, cim, 1)
testAddEvent(t, cim, 2)

controllerInstance3 := newControllerInstance("foo2", 10000, 999, false)
cim.addControllerInstance(controllerInstance3)
controllerInstance2 := newControllerInstance(cim, "foo2", 10000, 999)
cim.addControllerInstance(controllerInstance2)

cim.isControllerListInitialized = false

controllerInstanceMap1, err := cim.ListControllerInstances(controllerType)
assert.Nil(t, err)
assert.NotNil(t, controllerInstanceMap1)
assert.Equal(t, 2, len(controllerInstanceMap1))
assert.Equal(t, 1, len(controllerInstanceMap1))
instanceRead, isOK := controllerInstanceMap1[controllerInstance1.Name]
assert.True(t, isOK)
assert.Equal(t, controllerInstance1.ControllerKey, instanceRead.ControllerKey)

controllerInstanceMap2, err := cim.ListControllerInstances("foo2")
assert.Nil(t, err)
assert.NotNil(t, controllerInstanceMap2)
assert.Equal(t, 1, len(controllerInstanceMap2))
instanceRead, isOK = controllerInstanceMap2[controllerInstance2.Name]
assert.True(t, isOK)
assert.Equal(t, controllerInstance2.ControllerKey, instanceRead.ControllerKey)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package controllerframework

import (
"fmt"
"strings"
"sync"

"github.com/grafov/bcast"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -32,10 +32,6 @@ import (
"k8s.io/kubernetes/pkg/util/metrics"
)

const (
controllerInstanceNamePrefix string = "ci"
)

var ResetFilterHandler = resetFilter

type controllerInstanceLocal struct {
Expand Down Expand Up @@ -92,7 +88,7 @@ func NewControllerBase(controllerType string, client clientset.Interface, cimUpd
controller := &ControllerBase{
client: client,
controllerType: controllerType,
controllerName: generateControllerName(controllerInstanceMap),
controllerName: generateControllerName(controllerType, controllerInstanceMap),
controllerInstanceMap: controllerInstanceMap,
curPos: -1,
controllerInstanceUpdateCh: cimUpdateCh,
Expand Down Expand Up @@ -441,26 +437,27 @@ func (c *ControllerBase) ReportHealth(client clientset.Interface) {
}
}

func generateControllerName(existedInstanceMap map[string]v1.ControllerInstance) string {
for {
uid := uuid.NewUUID()
name := fmt.Sprintf("%s-%v", controllerInstanceNamePrefix, uid)
_, isExist := existedInstanceMap[name]
if !isExist {
return name
}
func generateControllerName(controllerType string, existedInstanceMap map[string]v1.ControllerInstance) string {
cimInstanceId := GetInstanceId()
if cimInstanceId == "" {
klog.Fatalf("Controller Instance Manager not available.")
}

// Error
klog.Infof("Controller name %s conflict. Get a new one ", name)
name := fmt.Sprintf("%s-%v", strings.ToLower(controllerType), cimInstanceId)
_, isExist := existedInstanceMap[name]
if isExist {
klog.Fatalf("Controller instance name %s conflict. Need to restart process to get a new one ", name)
}

return name
}

// Get controller instances by controller type
// Return sorted controller instance list & error if any
func listControllerInstancesByType(controllerType string) (map[string]v1.ControllerInstance, error) {
controllerInstanceMap := make(map[string]v1.ControllerInstance)

cim := GetControllerInstanceManager()
cim := GetInstanceHandler()
if cim == nil {
klog.Fatalf("Unexpected reference to uninitialized controller instance manager")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func convertControllerBaseToControllerInstance(controllerBase *ControllerBase) *

func TestGetControllerInstanceManager(t *testing.T) {
instance = nil
cim := GetControllerInstanceManager()
cim := GetInstanceHandler()
assert.Nil(t, cim)

client := fake.NewSimpleClientset()
Expand Down Expand Up @@ -103,6 +103,67 @@ func TestCreateControllerInstanceBase(t *testing.T) {
assert.Equal(t, int64(math.MaxInt64), controllerInstanceBase2.sortedControllerInstancesLocal[0].controllerKey)
}

func TestDeleteControllerInstance(t *testing.T) {
client := fake.NewSimpleClientset()
stopCh := make(chan struct{})
defer close(stopCh)

controllerType := "foo"
controllerInstanceBase, cim1 := createControllerInstanceBaseAndCIM(t, client, nil, controllerType, stopCh)
controllerInstance1 := convertControllerBaseToControllerInstance(controllerInstanceBase)

// 1st controller instance for a type needs to cover all workload
assert.Equal(t, 0, controllerInstanceBase.curPos)
assert.Equal(t, 1, len(controllerInstanceBase.sortedControllerInstancesLocal))
assert.Equal(t, int64(0), controllerInstanceBase.sortedControllerInstancesLocal[0].lowerboundKey)
assert.Equal(t, int64(math.MaxInt64), controllerInstanceBase.sortedControllerInstancesLocal[0].controllerKey)

// 2nd controller instance will split workload space with 1st one
stopCh2 := make(chan struct{})
controllerInstanceBase2, cim2 := createControllerInstanceBaseAndCIM(t, client, nil, controllerType, stopCh2)
controllerInstance2 := convertControllerBaseToControllerInstance(controllerInstanceBase2)

// notify controller creation events
cim1.addControllerInstance(controllerInstance2)
cim2.addControllerInstance(controllerInstance1)

controllerInstances, err := listControllerInstancesByType(controllerType)
assert.Nil(t, err)
assert.NotNil(t, controllerInstances)
controllerInstanceBase.updateCachedControllerInstances(controllerInstances)

expectedPos := getPosFromControllerInstances(controllerInstance1, controllerInstance1, controllerInstance2)
assert.Equal(t, expectedPos, controllerInstanceBase.curPos)

hashKey1 := int64(4611686018427387904) // mid point
assert.Equal(t, 2, len(controllerInstanceBase.sortedControllerInstancesLocal))
assert.Equal(t, int64(0), controllerInstanceBase.sortedControllerInstancesLocal[0].lowerboundKey)
assert.Equal(t, hashKey1, controllerInstanceBase.sortedControllerInstancesLocal[0].controllerKey)
assert.Equal(t, hashKey1, controllerInstanceBase.sortedControllerInstancesLocal[1].lowerboundKey)
assert.Equal(t, int64(math.MaxInt64), controllerInstanceBase.sortedControllerInstancesLocal[1].controllerKey)

// controller that takes the second half workload died, the left controller needs to take all workload
instanceNameToDel := controllerInstanceBase.sortedControllerInstancesLocal[1].instanceName
instanceBaseToCheck := controllerInstanceBase
if instanceNameToDel == controllerInstance1.Name {
cim1.deleteControllerInstance(controllerInstance1)
cim2.deleteControllerInstance(controllerInstance1)
instanceBaseToCheck = controllerInstanceBase2
} else {
cim1.deleteControllerInstance(controllerInstance2)
cim2.deleteControllerInstance(controllerInstance2)
}

controllerInstances, err = listControllerInstancesByType(controllerType)
assert.Nil(t, err)
assert.NotNil(t, controllerInstances)
instanceBaseToCheck.updateCachedControllerInstances(controllerInstances)
assert.Equal(t, 0, instanceBaseToCheck.curPos)
assert.Equal(t, 1, len(instanceBaseToCheck.sortedControllerInstancesLocal))
assert.Equal(t, int64(0), instanceBaseToCheck.sortedControllerInstancesLocal[0].lowerboundKey)
assert.Equal(t, int64(math.MaxInt64), instanceBaseToCheck.sortedControllerInstancesLocal[0].controllerKey)
}

func TestCreateControllerInstanceBaseInRaceCondition_2(t *testing.T) {
controllerType := "foo"

Expand Down Expand Up @@ -170,14 +231,19 @@ func TestConsolidateControllerInstances_Sort(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

// 2nd controller instance will share same workload space with 1st one
// Test case : 2nd controller instance will split workload space with 1st one
// 1. create 1st controller
controllerType := "foo"
controllerInstanceBase, cim := createControllerInstanceBaseAndCIM(t, client, nil, controllerType, stopCh)
controllerInstance1 := convertControllerBaseToControllerInstance(controllerInstanceBase)

// 2. create 2nd controller
cim2, _ := CreateTestControllerInstanceManager(stopCh)
hashKey1 := int64(4611686018427387904) // mid point
controllerInstance1_2 := newControllerInstance(controllerType, int64(10000), int32(100), true)
controllerInstance1_2 := newControllerInstance(cim2, controllerType, int64(10000), int32(100))
cim.addControllerInstance(controllerInstance1_2)
cim2.addControllerInstance(controllerInstance1_2)
cim2.addControllerInstance(controllerInstance1)

controllerInstances, err := listControllerInstancesByType(controllerType)
assert.Nil(t, err)
Expand All @@ -196,8 +262,13 @@ func TestConsolidateControllerInstances_Sort(t *testing.T) {
// 3nd controller instance will share same workload space with the first 2 - each take 1/3
hashKey1 = int64(3074457345618258603)
hashKey2 := int64(6148914691236517205)
controllerInstance1_3 := newControllerInstance("foo", int64(2000), 100, true)
cim3, _ := CreateTestControllerInstanceManager(stopCh)
controllerInstance1_3 := newControllerInstance(cim3, "foo", int64(2000), 100)
cim.addControllerInstance(controllerInstance1_3)
cim2.addControllerInstance(controllerInstance1_3)
cim3.addControllerInstance(controllerInstance1_3)
cim3.addControllerInstance(controllerInstance1)
cim3.addControllerInstance(controllerInstance1_2)
controllerInstances, err = listControllerInstancesByType(controllerType)
assert.Nil(t, err)
assert.NotNil(t, controllerInstances)
Expand Down Expand Up @@ -232,6 +303,7 @@ func TestIsInRange(t *testing.T) {

controllerType := "foo"
controllerInstanceBase, cim := createControllerInstanceBaseAndCIM(t, client, nil, controllerType, stopCh)
controllerInstance1 := convertControllerBaseToControllerInstance(controllerInstanceBase)

// check range
assert.True(t, controllerInstanceBase.IsInRange(int64(0)))
Expand All @@ -244,8 +316,11 @@ func TestIsInRange(t *testing.T) {
controllerInstanceBase.sortedControllerInstancesLocal[0].workloadNum = workloadNum1

hashKey1 := int64(100000)
controllerInstance2 := newControllerInstance(controllerType, hashKey1, workloadNum1, true)
cim2, _ := CreateTestControllerInstanceManager(stopCh)
controllerInstance2 := newControllerInstance(cim2, controllerType, hashKey1, workloadNum1)
cim.addControllerInstance(controllerInstance2)
cim2.addControllerInstance(controllerInstance2)
cim2.addControllerInstance(controllerInstance1)
controllerInstanceBase.instanceUpdateProcess(controllerType)

// check range
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ func CreateTestControllerInstanceManager(stopCh chan struct{}) (*ControllerInsta
cim.controllerListerSynced = alwaysReady
cim.notifyHandler = mockNotifyHander
checkInstanceHandler = mockCheckInstanceHander
return GetControllerInstanceManager(), informers
return GetInstanceHandler(), informers
}

func MockCreateControllerInstanceAndResetChs(stopCh chan struct{}) (*bcast.Member, *bcast.Group) {
cimUpdateChGrp := bcast.NewGroup()
cimUpdateCh := cimUpdateChGrp.Join()
informersResetChGrp := bcast.NewGroup()

cim := GetControllerInstanceManager()
cim := GetInstanceHandler()
if cim == nil {
cim, _ = CreateTestControllerInstanceManager(stopCh)
go cim.Run(stopCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type ControllerInstanceManager struct {

var instance *ControllerInstanceManager
var checkInstanceHandler = checkInstanceExistence
var GetInstanceHandler = getControllerInstanceManager

func GetControllerInstanceManager() *ControllerInstanceManager {
func getControllerInstanceManager() *ControllerInstanceManager {
return instance
}

Expand Down Expand Up @@ -110,8 +111,13 @@ func NewControllerInstanceManager(coInformer coreinformers.ControllerInstanceInf
return instance
}

func (cim *ControllerInstanceManager) GetInstanceId() types.UID {
return cim.instanceId
func GetInstanceId() types.UID {
cim := GetInstanceHandler()
if cim != nil {
return cim.instanceId
} else {
return ""
}
}

func (cim *ControllerInstanceManager) addControllerInstance(obj interface{}) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudfabric-controller/replicaset/replica_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh ch
cimUpdateChGrp := bcast.NewGroup()
cimUpdateCh := cimUpdateChGrp.Join()

cim := controllerframework.GetControllerInstanceManager()
cim := controllerframework.GetInstanceHandler()
if cim == nil {
cim, _ = controllerframework.CreateTestControllerInstanceManager(stopCh)
go cim.Run(stopCh)
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestWatchControllers(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

cim := controllerframework.GetControllerInstanceManager()
cim := controllerframework.GetInstanceHandler()
if cim == nil {
cim, _ = controllerframework.CreateTestControllerInstanceManager(stopCh)
go cim.Run(stopCh)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/cloudfabriccontrollers/deployment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func dcSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *replicaset.R
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(configs, "deployment-informers")), resyncPeriod)

// controller instance manager set up
cim := controller.GetControllerInstanceManager()
cim := controller.GetInstanceHandler()
if cim == nil {
cimUpdateChGrp := bcast.NewGroup()
go cimUpdateChGrp.Broadcast(0)
Expand Down
Loading

0 comments on commit 42dba08

Please sign in to comment.