Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sharding of snatglobalinfo #1011

Open
wants to merge 1 commit into
base: kmr2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 62 additions & 25 deletions pkg/controller/snatglobalinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func (cont *AciController) handleSnatNodeInfo(nodeinfo *nodeinfo.NodeInfo) bool

func (cont *AciController) syncSnatGlobalInfo() bool {
env := cont.env.(*K8sEnvironment)
requeue := false
globalcl := env.snatGlobalClient
if globalcl == nil {
return false
Expand All @@ -344,36 +345,72 @@ func (cont *AciController) syncSnatGlobalInfo() bool {
}
}
cont.indexMutex.Unlock()
snatglobalInfo, err := util.GetGlobalInfoCR(*globalcl)
if errors.IsNotFound(err) {
snatglinfos, err := util.ListGlobalInfoCRs(*globalcl)
if err != nil {
cont.log.Error("Failed to list SnatGlobalInfoCRs", err)
requeue = true
} else if snatglinfos != nil && len(snatglinfos) > 0 {
//delete CR if it's not present in
for _, glinfocr := range snatglinfos {
namecr := glinfocr.Spec.NodeName
delete := true
for namecache := range glInfoCache {
if namecr == namecache {
delete = false
break
}
}
if delete {
err := util.DeleteGlobalInfoCR(*globalcl, namecr)
if err != nil {
requeue = true
cont.log.Error(namecr, " SnatGlobalInfoCR deletion failed :", err)
continue
}
cont.log.Info("Deleted SnatGlobalInfoCR :", namecr)
}
}

}
for name, glinfo := range glInfoCache {
spec := snatglobalinfo.SnatGlobalInfoSpec{
GlobalInfos: glInfoCache,
NodeName: name,
GlobalInfos: map[string]snatglobalinfo.GlobalInfoList{
name: glinfo,
},
}
if globalcl != nil {
err := util.CreateSnatGlobalInfoCR(*globalcl, spec)
if err != nil {
cont.log.Error("SnatGlobalInfoCR Create failed requeue the request", err)
return true
snatglobalInfo, err := util.GetGlobalInfoCR(*globalcl, name)
if errors.IsNotFound(err) {
if globalcl != nil {
err := util.CreateSnatGlobalInfoCR(*globalcl, spec)
if err != nil {
cont.log.Error(name, " SnatGlobalInfoCR Create failed requeue the request", err)
requeue = true
continue
}
}
continue
} else if err != nil {
cont.log.Error(name, " SnatGlobalInfoCR Create failed requeue the request-1: ", err)
requeue = true
continue
}
return false
} else if err != nil {
cont.log.Error("SnatGlobalInfoCR Create failed requeue the request-1: ", err)
return true
}
if reflect.DeepEqual(snatglobalInfo.Spec.GlobalInfos, glInfoCache) {
return false
}
snatglobalInfo.Spec.GlobalInfos = glInfoCache
cont.log.Debug("Update GlobalInfo cache: ", glInfoCache)
cont.log.Debug("Updating GlobalInfo CR")
err = util.UpdateGlobalInfoCR(*globalcl, snatglobalInfo)
if err != nil {
cont.log.Error("GlobalInfo CR Update Failed: ", err)
return true
if reflect.DeepEqual(snatglobalInfo.Spec, spec) {
continue
}
snatglobalInfo.Spec = spec
cont.log.Debug("Update GlobalInfo cache: ", spec)
cont.log.Debug("Updating GlobalInfo CR: ", name)
err = util.UpdateGlobalInfoCR(*globalcl, snatglobalInfo)
if err != nil {
cont.log.Error(name, " GlobalInfo CR Update Failed: ", err)
requeue = true
continue
}
cont.log.Debug(name, " GlobalInfo CR successfully updated")

}
cont.log.Debug("GlobalInfo CR successfully updated")
return false
return requeue
}

func (cont *AciController) updateGlobalInfoforPolicy(portrange snatglobalinfo.PortRange,
Expand Down
29 changes: 16 additions & 13 deletions pkg/controller/snats.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,29 +291,32 @@ func (cont *AciController) createGlobalInfoCache(unittestmode bool) bool {
}
cont.log.Fatalf("snatglobalinfo client not found")
}
snatglobalInfo, err := util.GetGlobalInfoCR(*globalcl)
//snatglobalInfo, err := util.GetGlobalInfoCR(*globalcl)
snatglobalInfos, err := util.ListGlobalInfoCRs(*globalcl)

if err != nil {
cont.log.Info("No existing snatglobalinfo CR found in controller bootstrap")
if err != nil || len(snatglobalInfos) < 1 {
cont.log.Error("No existing snatglobalinfo CR found in controller bootstrap")
} else {
cont.log.Info("Syncing snatglobalinfo cache with existing CR")
cont.log.Info("Syncing snatglobalinfo cache with existing CRs")
macAddressToNodeName := make(map[string]string)
for _, value := range nodeInfos {
nodeName := value.ObjectMeta.Name
macAddress := value.Spec.Macaddress
macAddressToNodeName[macAddress] = nodeName
}

for _, glinfos := range snatglobalInfo.Spec.GlobalInfos {
for _, v := range glinfos {
nodeName := macAddressToNodeName[v.MacAddress]
snatIP := v.SnatIp
if _, ok := cont.snatGlobalInfoCache[snatIP]; !ok {
cont.snatGlobalInfoCache[snatIP] = make(map[string]*snatglobalinfo.GlobalInfo)
for _, snatglinfos := range snatglobalInfos {
for _, ginfos := range snatglinfos.Spec.GlobalInfos {
for _, v := range ginfos {
nodeName := macAddressToNodeName[v.MacAddress]
snatIP := v.SnatIp
if _, ok := cont.snatGlobalInfoCache[snatIP]; !ok {
cont.snatGlobalInfoCache[snatIP] = make(map[string]*snatglobalinfo.GlobalInfo)
}
copiedValue := v
cont.snatGlobalInfoCache[snatIP][nodeName] = &copiedValue
cont.log.Info("Adding globalinfo entry for snatIP ", snatIP, " and node name ", nodeName, ": ", cont.snatGlobalInfoCache[snatIP][nodeName])
}
copiedValue := v
cont.snatGlobalInfoCache[snatIP][nodeName] = &copiedValue
cont.log.Info("Adding globalinfo entry for snatIP ", snatIP, " and node name ", nodeName, ": ", cont.snatGlobalInfoCache[snatIP][nodeName])
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/hostagent/integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ func mkSnatGlobalObj() *snatglobal.SnatGlobalInfo {
}
}
}
return snatglobaldata("123456", "snatglobalinfo", "test-node", "aci", newglobal)
return snatglobaldata("123456", "test-node", "aci", newglobal)
}
func (it *integ) checkEpSnatUids(id int, uids []string, sg string) {
epid := fmt.Sprintf("%d%s_%d%s_", id, testPodID, id, testPodID)
Expand Down
18 changes: 4 additions & 14 deletions pkg/hostagent/snats.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,15 +607,6 @@ func (agent *HostAgent) snaGlobalInfoChanged(snatobj interface{}, logger *logrus
}
logger.Debug("Snat Global info Changed...")
globalInfo := snat.Spec.GlobalInfos
// This case is possible when all the pods will be deleted from that node
if len(globalInfo) < len(agent.opflexSnatGlobalInfos) {
for nodename := range agent.opflexSnatGlobalInfos {
if _, ok := globalInfo[nodename]; !ok {
delete(agent.opflexSnatGlobalInfos, nodename)
syncSnat = true
}
}
}
for nodename, val := range globalInfo {
var newglobalinfos []*opflexSnatGlobalInfo
for _, v := range val {
Expand Down Expand Up @@ -701,12 +692,11 @@ func (agent *HostAgent) snaGlobalInfoChanged(snatobj interface{}, logger *logrus
func (agent *HostAgent) snatGlobalInfoDelete(obj interface{}) {
agent.log.Debug("Snat Global Info Obj Delete")
snat := obj.(*snatglobal.SnatGlobalInfo)
globalInfo := snat.Spec.GlobalInfos
nodename := snat.Spec.NodeName
agent.indexMutex.Lock()
for nodename := range globalInfo {
if _, ok := agent.opflexSnatGlobalInfos[nodename]; ok {
delete(agent.opflexSnatGlobalInfos, nodename)
}

if _, ok := agent.opflexSnatGlobalInfos[nodename]; ok {
delete(agent.opflexSnatGlobalInfos, nodename)
}
agent.indexMutex.Unlock()
}
Expand Down
26 changes: 12 additions & 14 deletions pkg/hostagent/snats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@ type portRange struct {
end int
}

func snatglobaldata(uuid string, name string, nodename string, namespace string, globalinfo snatglobal.GlobalInfoList) *snatglobal.SnatGlobalInfo {
func snatglobaldata(uuid string, nodename string, namespace string, globalinfo snatglobal.GlobalInfoList) *snatglobal.SnatGlobalInfo {
GlobalInfos := make(map[string]snatglobal.GlobalInfoList, 10)
GlobalInfos[nodename] = globalinfo
return &snatglobal.SnatGlobalInfo{
Spec: snatglobal.SnatGlobalInfoSpec{
GlobalInfos: GlobalInfos,
NodeName: nodename,
},
ObjectMeta: metav1.ObjectMeta{
UID: apitypes.UID(uuid),
Namespace: namespace,
Name: name,
Name: nodename,
Labels: map[string]string{},
},
}
Expand Down Expand Up @@ -74,7 +75,6 @@ func snatpolicydata(name string, namespace string,
}

type snatGlobal struct {
name string
ip string
mac string
port_range portRange
Expand All @@ -93,7 +93,6 @@ type policy struct {

var snatGlobals = []snatGlobal{
{
"snatglobalinfo",
"10.1.1.8",
"00:0c:29:92:fe:d0",
portRange{4000, 5000},
Expand All @@ -104,7 +103,6 @@ var snatGlobals = []snatGlobal{
},

{
"snatglobalinfo",
"10.1.1.9",
"00:0c:29:92:fe:d1",
portRange{7000, 8000},
Expand Down Expand Up @@ -145,28 +143,28 @@ func (agent *testHostAgent) doTestSnat(t *testing.T, tempdir string,
var raw []byte
snat := &OpflexSnatIp{}

tu.WaitFor(t, pt.name, 2000*time.Millisecond,
tu.WaitFor(t, pt.nodename, 2000*time.Millisecond,
func(last bool) (bool, error) {
var err error
snatfile := filepath.Join(tempdir,
pt.uuid+".snat")
raw, err = ioutil.ReadFile(snatfile)
if !tu.WaitNil(t, last, err, desc, pt.name, "read snat") {
if !tu.WaitNil(t, last, err, desc, pt.nodename, "read snat") {
return false, nil
}
err = json.Unmarshal(raw, snat)
agent.log.Info("Snat file added ", snatfile)
return tu.WaitNil(t, last, err, desc, pt.name, "unmarshal snat"), nil
return tu.WaitNil(t, last, err, desc, pt.nodename, "unmarshal snat"), nil
})
agent.log.Info("Snat Object added ", snat)
snatdstr := pt.uuid
assert.Equal(t, snatdstr, snat.Uuid, desc, pt.name, "uuid")
assert.Equal(t, pt.ip, snat.SnatIp, desc, pt.name, "ip")
assert.Equal(t, snatdstr, snat.Uuid, desc, pt.nodename, "uuid")
assert.Equal(t, pt.ip, snat.SnatIp, desc, pt.nodename, "ip")
switch {
case pt.policyname == "policy1":
assert.Equal(t, []string{"10.10.10.0/31", "10.10.10.0/26", "10.10.10.0/24"}, snat.DestIpAddress, desc, pt.name, "destip")
assert.Equal(t, []string{"10.10.10.0/31", "10.10.10.0/26", "10.10.10.0/24"}, snat.DestIpAddress, desc, pt.nodename, "destip")
case pt.policyname == "policy2":
assert.Equal(t, []string{"10.10.0.0/16"}, snat.DestIpAddress, desc, pt.name, "destip")
assert.Equal(t, []string{"10.10.0.0/16"}, snat.DestIpAddress, desc, pt.nodename, "destip")
}
//assert.Equal(t, pt.port_range.start, snat.PortRange[0].Start, desc, pt.name, "port start")
//assert.Equal(t, pt.port_range.end, snat.PortRange[0].End, desc, pt.name, "port end")
Expand Down Expand Up @@ -236,7 +234,7 @@ func TestSnatSync(t *testing.T) {
agent.log.Info("Global added##### ", newglobal)
}
}
snatglobalinfo = snatglobaldata(pt.uuid, pt.name, pt.nodename, pt.namespace, newglobal)
snatglobalinfo = snatglobaldata(pt.uuid, pt.nodename, pt.namespace, newglobal)
agent.fakeSnatGlobalSource.Add(snatglobalinfo)
agent.log.Info("Complete Globale Info #### ", snatglobalinfo)
agent.doTestSnat(t, tempdir, &pt, "create")
Expand Down Expand Up @@ -302,7 +300,7 @@ func TestSnatPortExhausted(t *testing.T) {
globalinfo.PortRanges = portrange
globalinfo.SnatPolicyName = "policy1"
newglobal = append(newglobal, globalinfo)
snatglobalinfo = snatglobaldata("policy1-uid", "snatglobalinfo", "test-node-1", "testns", newglobal)
snatglobalinfo = snatglobaldata("policy1-uid", "test-node-1", "testns", newglobal)
agent.fakeSnatGlobalSource.Add(snatglobalinfo)
time.Sleep(1000 * time.Millisecond)
// modify the policy with port exhaused
Expand Down
1 change: 1 addition & 0 deletions pkg/snatglobalinfo/apis/aci.snat/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type SnatGlobalInfoSpec struct {
// Add custom validation using kubebuilder tags: https://book.kubebuilder.io/beyond_basics/generating_crd.html
// +kubebuilder:validation:Enum=selector, node
GlobalInfos map[string]GlobalInfoList `json:"globalInfos"`
NodeName string `json:"nodeName"`
}

// SnatGlobalInfoStatus defines the observed state of SnatGlobalInfo
Expand Down
33 changes: 27 additions & 6 deletions pkg/util/snat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ package util

import (
"context"
"os"
"sort"
"strconv"

nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
nodeinfoclset "github.com/noironetworks/aci-containers/pkg/nodeinfo/clientset/versioned"
snatglobal "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
Expand All @@ -24,9 +28,6 @@ import (
snatpolicyclset "github.com/noironetworks/aci-containers/pkg/snatpolicy/clientset/versioned"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"os"
"sort"
"strconv"
)

type StartSorter []snatglobal.PortRange
Expand Down Expand Up @@ -64,9 +65,10 @@ func ExpandPortRanges(currPortRange []snatglobal.PortRange, step int) []snatglob
func CreateSnatGlobalInfoCR(c snatglobalclset.Clientset,
globalInfoSpec snatglobal.SnatGlobalInfoSpec) error {
ns := os.Getenv("ACI_SNAT_NAMESPACE")
name := globalInfoSpec.NodeName
obj := &snatglobal.SnatGlobalInfo{
ObjectMeta: metav1.ObjectMeta{
Name: os.Getenv("ACI_SNAGLOBALINFO_NAME"),
Name: name,
Namespace: ns,
},
Spec: globalInfoSpec,
Expand All @@ -88,15 +90,34 @@ func UpdateGlobalInfoCR(c snatglobalclset.Clientset, globalInfo snatglobal.SnatG
return nil
}

func GetGlobalInfoCR(c snatglobalclset.Clientset) (snatglobal.SnatGlobalInfo, error) {
func GetGlobalInfoCR(c snatglobalclset.Clientset, name string) (snatglobal.SnatGlobalInfo, error) {
ns := os.Getenv("ACI_SNAT_NAMESPACE")
globalinfo, err := c.AciV1().SnatGlobalInfos(ns).Get(context.TODO(), os.Getenv("ACI_SNAGLOBALINFO_NAME"), metav1.GetOptions{})
globalinfo, err := c.AciV1().SnatGlobalInfos(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return snatglobal.SnatGlobalInfo{}, err
}
return *globalinfo, nil
}

func DeleteGlobalInfoCR(c snatglobalclset.Clientset, name string) error {
ns := os.Getenv("ACI_SNAT_NAMESPACE")
err := c.AciV1().SnatGlobalInfos(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
return err
}

func ListGlobalInfoCRs(c snatglobalclset.Clientset) ([]snatglobal.SnatGlobalInfo, error) {
var globalinfos []snatglobal.SnatGlobalInfo
ns := os.Getenv("ACI_SNAT_NAMESPACE")
globalinfolist, err := c.AciV1().SnatGlobalInfos(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return globalinfos, err
}
if globalinfolist != nil && len(globalinfolist.Items) > 0 {
globalinfos = globalinfolist.Items
}
return globalinfos, nil
}

// CreateNodeInfoCR Creates a NodeInfo CR
func CreateNodeInfoCR(c nodeinfoclset.Clientset,
nodeInfoSpec nodeinfo.NodeInfoSpec, nodename string) error {
Expand Down