Skip to content

Commit

Permalink
Add support for CPA and CES to be in different namespace. (#216)
Browse files Browse the repository at this point in the history
* Add support for CPA and CES to be in different namespace.

- Owner references are valid only within the namespace, hence
owner reference between CPA and CES has been removed.
- As a side-effect of removing owner reference, CES doesn't get
deleted on CPA delete, hence issuing a delete from CPA delete.
- VMs are imported under CES namespace.
- Re-structured vm and vpc snapshot part in azure plugin.
- Make a single API call from inventory to plugin for fetching vm and vpc snapshot.
- Fixes for handling upgrade from previous nephe version(related to account namespace in CES).
- Added UT cases for covering basic cases, integration test
to have CES and CPA in different ns, verify imported vms based on
ces namespace.
- Entity selector integration test is re-written to include multi-ces.
- Fixed poller, azure plugin UTs, added CES webhook level UT.
- Got rid of all filters case and added CES webhook check for mandating either vpcMatch or vmMatch.

Signed-off-by: Archana Holla <[email protected]>
  • Loading branch information
archanapholla authored Jul 3, 2023
1 parent b875052 commit 60b1971
Show file tree
Hide file tree
Showing 46 changed files with 1,509 additions and 1,289 deletions.
4 changes: 2 additions & 2 deletions apis/crd/v1alpha1/cloudentityselector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type VirtualMachineSelector struct {
// CloudEntitySelectorSpec defines the desired state of CloudEntitySelector.
type CloudEntitySelectorSpec struct {
// AccountName specifies the name of CloudProviderAccount.
AccountName string `json:"accountName,omitempty"`
AccountName string `json:"accountName"`
// AccountNamespace specifies the namespace of CloudProviderAccount.
AccountNamespace string `json:"accountNamespace,omitempty"`
AccountNamespace string `json:"accountNamespace"`
// VMSelector selects the VirtualMachines the user has modify privilege.
// VMSelector is mandatory, at least one selector under VMSelector is required.
// It is an array, VirtualMachines satisfying any item on VMSelector are selected(ORed).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ spec:
type: object
type: array
required:
- accountName
- accountNamespace
- vmSelector
type: object
status:
Expand Down
2 changes: 2 additions & 0 deletions config/nephe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ spec:
type: object
type: array
required:
- accountName
- accountNamespace
- vmSelector
type: object
status:
Expand Down
34 changes: 15 additions & 19 deletions pkg/accountmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ type Interface interface {

type AccountManager struct {
client.Client
Log logr.Logger

Log logr.Logger
mutex sync.RWMutex
Inventory inventory.Interface
accPollers map[types.NamespacedName]*accountPoller
Expand Down Expand Up @@ -105,7 +104,7 @@ func (a *AccountManager) AddAccount(namespacedName *types.NamespacedName, accoun
if !exists {
if !util.DoesCesCrExistsForAccount(a.Client, namespacedName) {
a.Log.Info("Starting account poller", "account", namespacedName)
go wait.Until(accPoller.doAccountPolling, time.Duration(accPoller.PollIntvInSeconds)*time.Second, accPoller.ch)
go wait.Until(accPoller.doAccountPolling, time.Duration(accPoller.pollIntvInSeconds)*time.Second, accPoller.ch)
} else {
a.Log.V(1).Info("Ignoring start of account poller", "account", namespacedName)
if ctrlsync.GetControllerSyncStatusInstance().IsControllerSynced(ctrlsync.ControllerTypeCPA) && !config.initialized {
Expand All @@ -131,9 +130,8 @@ func (a *AccountManager) RemoveAccount(namespacedName *types.NamespacedName) err
// Stop and remove the poller.
_ = a.removeAccountPoller(namespacedName)

// Cleanup inventory data for this account.
// Cleanup vpc inventory data for this account, vm inventory is deleted in removeAccountPoller.
_ = a.Inventory.DeleteVpcsFromCache(namespacedName)
_ = a.Inventory.DeleteVmsFromCache(namespacedName)

defer func() {
// Remove account config.
Expand Down Expand Up @@ -204,12 +202,11 @@ func (a *AccountManager) RemoveResourceFiltersFromAccount(accNamespacedName *typ
selectorNamespacedName *types.NamespacedName) error {
cloudProviderType, ok := a.getAccountProviderType(accNamespacedName)
if !ok {
// If we cannot find cloud provider type, that means CPA may not be added or it's already removed.
// If we cannot find cloud provider type, that means CPA may not be added, or it's already removed.
return fmt.Errorf(fmt.Sprintf("failed to delete selector %v, account %v: provider type not found",
selectorNamespacedName, accNamespacedName))
}
cloudInterface, _ := cloud.GetCloudInterface(cloudProviderType)
a.Log.V(1).Info("Removing selectors for account", "name", accNamespacedName)
cloudInterface.RemoveAccountResourcesSelector(accNamespacedName, selectorNamespacedName)
// Delete selector config from the account config.
a.removeSelectorFromAccountConfig(accNamespacedName, selectorNamespacedName)
Expand All @@ -220,7 +217,7 @@ func (a *AccountManager) RemoveResourceFiltersFromAccount(accNamespacedName *typ
return fmt.Errorf(fmt.Sprintf("failed to delete selector %v, account %v: %v",
selectorNamespacedName, accNamespacedName, errorMsgAccountPollerNotFound))
}
accPoller.removeSelector(accNamespacedName)
_ = accPoller.inventory.DeleteVmsFromCache(accNamespacedName, selectorNamespacedName)
accPoller.restartPoller(accNamespacedName)
return nil
}
Expand All @@ -247,20 +244,19 @@ func (a *AccountManager) addAccountPoller(cloudInterface cloud.CloudInterface, n
accPoller, exists := a.getAccountPoller(namespacedName)
if exists {
// Update the polling interval.
accPoller.PollIntvInSeconds = *account.Spec.PollIntervalInSeconds
accPoller.pollIntvInSeconds = *account.Spec.PollIntervalInSeconds
return accPoller, true
}

// Add and init the new poller.
poller := &accountPoller{
Client: a.Client,
log: a.Log.WithName("Poller"),
PollIntvInSeconds: *account.Spec.PollIntervalInSeconds,
cloudInterface: cloudInterface,
namespacedName: namespacedName,
selector: nil,
ch: make(chan struct{}),
inventory: a.Inventory,
Client: a.Client,
log: a.Log.WithName("Poller"),
pollIntvInSeconds: *account.Spec.PollIntervalInSeconds,
cloudInterface: cloudInterface,
accountNamespacedName: namespacedName,
ch: make(chan struct{}),
inventory: a.Inventory,
}
poller.initVmSelectorCache()

Expand All @@ -277,7 +273,7 @@ func (a *AccountManager) removeAccountPoller(namespacedName *types.NamespacedNam
if !exists {
return fmt.Errorf(fmt.Sprintf("%v %v", errorMsgAccountPollerNotFound, namespacedName))
}
accPoller.removeSelector(namespacedName)
_ = accPoller.inventory.DeleteAllVmsFromCache(namespacedName)
accPoller.stopPoller()

a.mutex.Lock()
Expand Down Expand Up @@ -391,7 +387,7 @@ func (a *AccountManager) handleAddProviderAccountError(namespacedName *types.Nam
// Account poller is removed upon any error in the plug-in.
_ = a.removeAccountPoller(namespacedName)
_ = a.Inventory.DeleteVpcsFromCache(namespacedName)
_ = a.Inventory.DeleteVmsFromCache(namespacedName)
_ = a.Inventory.DeleteAllVmsFromCache(namespacedName)
// TODO: require lock to write into account config structure.
config.initialized = false
if strings.Contains(err.Error(), util.ErrorMsgSecretReference) {
Expand Down
87 changes: 33 additions & 54 deletions pkg/accountmanager/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1"
"antrea.io/nephe/pkg/cloudprovider/cloud"
"antrea.io/nephe/pkg/inventory"
nephetypes "antrea.io/nephe/pkg/types"
)

const (
Expand All @@ -42,15 +43,14 @@ type accountPoller struct {
client.Client
log logr.Logger

PollIntvInSeconds uint
PollDone bool
cloudInterface cloud.CloudInterface
namespacedName *types.NamespacedName
selector *crdv1alpha1.CloudEntitySelector
vmSelector cache.Indexer
ch chan struct{}
mutex sync.RWMutex
inventory inventory.Interface
pollIntvInSeconds uint
pollDone bool
cloudInterface cloud.CloudInterface
accountNamespacedName *types.NamespacedName
vmSelector cache.Indexer
ch chan struct{}
mutex sync.RWMutex
inventory inventory.Interface
}

// initVmSelectorCache inits account poller selector cache and its indexers.
Expand Down Expand Up @@ -126,16 +126,6 @@ func (p *accountPoller) addOrUpdateSelector(selector *crdv1alpha1.CloudEntitySel
}
}
}

// Store selector to filter cloud resources based on selector.
p.selector = selector
}

// removeSelector reset selector in account poller.
func (p *accountPoller) removeSelector(accountNamespacedName *types.NamespacedName) {
p.selector = nil
// Remove VMs from the cache when selectors are removed.
_ = p.inventory.DeleteVmsFromCache(accountNamespacedName)
}

// doAccountPolling calls the cloud plugin and fetches the cloud inventory. Once successful poll, updates the cloud
Expand All @@ -145,44 +135,42 @@ func (p *accountPoller) doAccountPolling() {
p.mutex.Lock()
defer p.mutex.Unlock()

p.PollDone = false
p.pollDone = false
// Ignoring error since it is captured in the CloudProviderAccount CR's status field.
_ = p.cloudInterface.DoInventoryPoll(p.namespacedName)
_ = p.cloudInterface.DoInventoryPoll(p.accountNamespacedName)

defer func() {
p.PollDone = true
p.pollDone = true
// Update status on CPA CR after polling.
p.updateAccountStatus(p.cloudInterface)
}()

// TODO: Avoid calling plugin to get VPC inventory from snapshot.
vpcMap, err := p.cloudInterface.GetVpcInventory(p.namespacedName)
cloudInventory, err := p.cloudInterface.GetCloudInventory(p.accountNamespacedName)
if err != nil {
p.log.Error(err, "failed to fetch cloud vpc list from internal snapshot", "account",
p.namespacedName.String())
p.log.Error(err, "failed to fetch cloud inventory from internal snapshot", "account",
p.accountNamespacedName)
return
}
_ = p.inventory.BuildVpcCache(vpcMap, p.namespacedName)

// Perform VM Operations only when CES is added.
vmCount := 0
if p.selector != nil {
// TODO: Avoid calling plugin to get VM inventory from snapshot.
virtualMachines := p.getComputeResources(p.cloudInterface)
// TODO: We are walking thru virtual map twice. Once here and second one in BuildVmCAche.
// May be expose Add, Delete, Update routine in inventory and we do the calculation here.
p.processCloudInventory(cloudInventory)
}

// processCloudInventory fetches vpc and vm inventory from the snapshot and updates respective cache inventory.
func (p *accountPoller) processCloudInventory(cloudInventory *nephetypes.CloudInventory) {
_ = p.inventory.BuildVpcCache(cloudInventory.VpcMap, p.accountNamespacedName)

// VMs are stored per selector in the VmMap.
for selectorNamespacedName, virtualMachines := range cloudInventory.VmMap {
// Maybe expose, Add, Delete, Update routine in inventory, and do the calculation here.
p.updateAgentState(virtualMachines)
p.inventory.BuildVmCache(virtualMachines, p.namespacedName)
vmCount = len(virtualMachines)
p.inventory.BuildVmCache(virtualMachines, p.accountNamespacedName, &selectorNamespacedName)
}
p.log.Info("Discovered compute resources statistics", "Account", p.namespacedName,
"Vpcs", len(vpcMap), "VirtualMachines", vmCount)
}

// updateAccountStatus updates status of a CPA object when it's changed.
func (p *accountPoller) updateAccountStatus(cloudInterface cloud.CloudInterface) {
discoveredStatus := crdv1alpha1.CloudProviderAccountStatus{}
status, err := cloudInterface.GetAccountStatus(p.namespacedName)
status, err := cloudInterface.GetAccountStatus(p.accountNamespacedName)
if err != nil {
discoveredStatus.Error = fmt.Sprintf("failed to get account status, err %v", err)
} else if status != nil {
Expand All @@ -191,22 +179,22 @@ func (p *accountPoller) updateAccountStatus(cloudInterface cloud.CloudInterface)

updateStatusFunc := func() error {
account := &crdv1alpha1.CloudProviderAccount{}
if err := p.Get(context.TODO(), *p.namespacedName, account); err != nil {
if err := p.Get(context.TODO(), *p.accountNamespacedName, account); err != nil {
return nil
}
if account.Status != discoveredStatus {
account.Status.Error = discoveredStatus.Error
p.log.Info("Setting CPA status", "account", p.namespacedName, "message", discoveredStatus.Error)
p.log.Info("Setting CPA status", "account", p.accountNamespacedName, "message", discoveredStatus.Error)
if err = p.Client.Status().Update(context.TODO(), account); err != nil {
p.log.Error(err, "failed to update CPA status, retrying", "account", p.namespacedName)
p.log.Error(err, "failed to update CPA status, retrying", "account", p.accountNamespacedName)
return err
}
}
return nil
}

if err := retry.RetryOnConflict(retry.DefaultRetry, updateStatusFunc); err != nil {
p.log.Error(err, "failed to update CPA status", "account", p.namespacedName)
p.log.Error(err, "failed to update CPA status", "account", p.accountNamespacedName)
}
}

Expand All @@ -217,15 +205,6 @@ func (p *accountPoller) updateAgentState(vms map[string]*runtimev1alpha1.Virtual
}
}

func (p *accountPoller) getComputeResources(cloudInterface cloud.CloudInterface) map[string]*runtimev1alpha1.VirtualMachine {
virtualMachines, e := cloudInterface.InstancesGivenProviderAccount(p.namespacedName)
if e != nil {
p.log.Error(e, "failed to discover compute resources", "account", p.namespacedName)
return map[string]*runtimev1alpha1.VirtualMachine{}
}
return virtualMachines
}

// getVmSelectorMatch returns a VMSelector for a VirtualMachine only if it is agented.
func (p *accountPoller) getVmSelectorMatch(vm *runtimev1alpha1.VirtualMachine) *crdv1alpha1.VirtualMachineSelector {
vmSelectors, _ := p.vmSelector.ByIndex(virtualMachineSelectorMatchIndexerByID, vm.Status.CloudId)
Expand Down Expand Up @@ -278,7 +257,7 @@ func (p *accountPoller) waitForPollDone(accountNamespacedName *types.NamespacedN
if err := wait.PollImmediate(100*time.Millisecond, defaultPollTimeout, func() (done bool, err error) {
p.mutex.RLock()
defer p.mutex.RUnlock()
if p.PollDone {
if p.pollDone {
return true, nil
}
return false, nil
Expand All @@ -300,7 +279,7 @@ func (p *accountPoller) restartPoller(name *types.NamespacedName) {

p.log.Info("Restarting account poller", "account", name)
p.ch = make(chan struct{})
go wait.Until(p.doAccountPolling, time.Duration(p.PollIntvInSeconds)*time.Second, p.ch)
go wait.Until(p.doAccountPolling, time.Duration(p.pollIntvInSeconds)*time.Second, p.ch)
}

// stopPoller stops account poller thread if it's running.
Expand Down
Loading

0 comments on commit 60b1971

Please sign in to comment.