diff --git a/backends/backends.go b/backends/backends.go index e05e07e891..d1971e3c43 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -223,7 +223,7 @@ func (b *Backends) Ensure(svcPorts []ServicePort, igs []*compute.InstanceGroup) ports = append(ports, p.Port) } var err error - igs, _, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, ports) + igs, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, ports) if err != nil { return err } diff --git a/controller/cluster_manager.go b/controller/cluster_manager.go index ff058debb4..cac0fbbec8 100644 --- a/controller/cluster_manager.go +++ b/controller/cluster_manager.go @@ -172,7 +172,7 @@ func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.Se for _, p := range servicePorts { ports = append(ports, p.Port) } - igs, _, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports) + igs, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports) return igs, err } diff --git a/controller/utils_test.go b/controller/utils_test.go index 62e6141bb9..5ef2acb8d2 100644 --- a/controller/utils_test.go +++ b/controller/utils_test.go @@ -71,7 +71,7 @@ func TestInstancesAddedToZones(t *testing.T) { // Create 2 igs, one per zone. testIG := "test-ig" - lbc.CloudClusterManager.instancePool.AddInstanceGroup(testIG, []int64{int64(3001)}) + lbc.CloudClusterManager.instancePool.EnsureInstanceGroupsAndPorts(testIG, []int64{int64(3001)}) // node pool syncs kube-nodes, this will add them to both igs. lbc.CloudClusterManager.instancePool.Sync([]string{"n1", "n2", "n3"}) diff --git a/instances/instances.go b/instances/instances.go index 1619f987a6..1e78db270d 100644 --- a/instances/instances.go +++ b/instances/instances.go @@ -48,7 +48,10 @@ type Instances struct { // - cloud: implements InstanceGroups, used to sync Kubernetes nodes with // members of the cloud InstanceGroup. func NewNodePool(cloud InstanceGroups) NodePool { - return &Instances{cloud, storage.NewInMemoryPool(), nil} + return &Instances{ + cloud: cloud, + snapshotter: storage.NewInMemoryPool(), + } } // Init initializes the instance pool. The given zoneLister is used to list @@ -58,72 +61,85 @@ func (i *Instances) Init(zl zoneLister) { i.zoneLister = zl } -// AddInstanceGroup creates or gets an instance group if it doesn't exist +// EnsureInstanceGroupsAndPorts creates or gets an instance group if it doesn't exist // and adds the given ports to it. Returns a list of one instance group per zone, // all of which have the exact same named ports. 
-func (i *Instances) AddInstanceGroup(name string, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error) {
-	igs := []*compute.InstanceGroup{}
-
-	var namedPorts []*compute.NamedPort
-	for _, port := range ports {
-		namedPorts = append(namedPorts, utils.GetNamedPort(port))
-	}
-
+func (i *Instances) EnsureInstanceGroupsAndPorts(name string, ports []int64) (igs []*compute.InstanceGroup, err error) {
 	zones, err := i.ListZones()
 	if err != nil {
-		return igs, namedPorts, err
+		return nil, err
 	}
 
 	defer i.snapshotter.Add(name, struct{}{})
 	for _, zone := range zones {
-		ig, err := i.Get(name, zone)
-		if err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
-			glog.Errorf("Failed to get instance group %v/%v, err: %v", zone, name, err)
-			return nil, nil, err
+		ig, err := i.ensureInstanceGroupAndPorts(name, zone, ports)
+		if err != nil {
+			return nil, err
 		}
 
-		if ig == nil {
-			glog.Infof("Creating instance group %v in zone %v", name, zone)
-			if err = i.cloud.CreateInstanceGroup(&compute.InstanceGroup{Name: name}, zone); err != nil {
-				// Error may come back with StatusConflict meaning the instance group was created by another controller
-				// possibly the Service Controller for internal load balancers.
-				if utils.IsHTTPErrorCode(err, http.StatusConflict) {
-					glog.Warningf("Failed to create instance group %v/%v due to conflict status, but continuing sync. err: %v", zone, name, err)
-				} else {
-					glog.Errorf("Failed to create instance group %v/%v, err: %v", zone, name, err)
-					return nil, nil, err
-				}
-			}
-			ig, err = i.cloud.GetInstanceGroup(name, zone)
-			if err != nil {
-				glog.Errorf("Failed to get instance group %v/%v after ensuring existence, err: %v", zone, name, err)
-				return nil, nil, err
+		igs = append(igs, ig)
+	}
+	return igs, nil
+}
+
+func (i *Instances) ensureInstanceGroupAndPorts(name, zone string, ports []int64) (*compute.InstanceGroup, error) {
+	ig, err := i.Get(name, zone)
+	if err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
+		glog.Errorf("Failed to get instance group %v/%v, err: %v", zone, name, err)
+		return nil, err
+	}
+
+	if ig == nil {
+		glog.V(3).Infof("Creating instance group %v/%v.", zone, name)
+		if err = i.cloud.CreateInstanceGroup(&compute.InstanceGroup{Name: name}, zone); err != nil {
+			// Error may come back with StatusConflict meaning the instance group was created by another controller
+			// possibly the Service Controller for internal load balancers.
+			if utils.IsHTTPErrorCode(err, http.StatusConflict) {
+				glog.Warningf("Failed to create instance group %v/%v due to conflict status, but continuing sync. err: %v", zone, name, err)
+			} else {
+				glog.Errorf("Failed to create instance group %v/%v, err: %v", zone, name, err)
+				return nil, err
 			}
-		} else {
-			glog.V(3).Infof("Instance group %v already exists in zone %v", name, zone)
 		}
-
-		existingPorts := map[int64]bool{}
-		for _, np := range ig.NamedPorts {
-			existingPorts[np.Port] = true
+		ig, err = i.cloud.GetInstanceGroup(name, zone)
+		if err != nil {
+			glog.Errorf("Failed to get instance group %v/%v after ensuring existence, err: %v", zone, name, err)
+			return nil, err
 		}
-		var newPorts []*compute.NamedPort
-		for _, np := range namedPorts {
-			if existingPorts[np.Port] {
-				glog.V(5).Infof("Instance group %v already has named port %+v", ig.Name, np)
-				continue
-			}
-			newPorts = append(newPorts, np)
+	} else {
+		glog.V(5).Infof("Instance group %v/%v already exists.", zone, name)
+	}
+
+	// Build a map of the ports already present on the instance group.
+	existingPorts := map[int64]bool{}
+	for _, np := range ig.NamedPorts {
+		existingPorts[np.Port] = true
+	}
+
+	// Determine which of the requested ports still need to be added.
+	var newPorts []int64
+	for _, p := range ports {
+		if existingPorts[p] {
+			glog.V(5).Infof("Instance group %v/%v already has named port %v", zone, ig.Name, p)
+			continue
 		}
-		if len(newPorts) > 0 {
-			glog.V(3).Infof("Instance group %v/%v does not have ports %+v, adding them now.", zone, name, newPorts)
-			if err := i.cloud.SetNamedPortsOfInstanceGroup(ig.Name, zone, append(ig.NamedPorts, newPorts...)); err != nil {
-				return nil, nil, err
-			}
+		newPorts = append(newPorts, p)
+	}
+
+	// Convert the missing ports into NamedPorts for the API call.
+	var newNamedPorts []*compute.NamedPort
+	for _, v := range newPorts {
+		newNamedPorts = append(newNamedPorts, utils.GetNamedPort(v))
+	}
+
+	if len(newNamedPorts) > 0 {
+		glog.V(3).Infof("Instance group %v/%v does not have ports %+v, adding them now.", zone, name, newPorts)
+		if err := i.cloud.SetNamedPortsOfInstanceGroup(ig.Name, zone, append(ig.NamedPorts, newNamedPorts...)); err != nil {
+			return nil, err
 		}
-		igs = append(igs, ig)
 	}
-	return igs, namedPorts, nil
+
+	return ig, nil
 }
 
 // DeleteInstanceGroup deletes the given IG by name, from all zones.
diff --git a/instances/instances_test.go b/instances/instances_test.go
index f0010d2fd8..76ff446b8e 100644
--- a/instances/instances_test.go
+++ b/instances/instances_test.go
@@ -34,7 +34,7 @@ func TestNodePoolSync(t *testing.T) {
 	f := NewFakeInstanceGroups(sets.NewString(
 		[]string{"n1", "n2"}...))
 	pool := newNodePool(f, defaultZone)
-	pool.AddInstanceGroup("test", []int64{80})
+	pool.EnsureInstanceGroupsAndPorts("test", []int64{80})
 
 	// KubeNodes: n1
 	// GCENodes: n1, n2
@@ -53,7 +53,7 @@ func TestNodePoolSync(t *testing.T) {
 
 	f = NewFakeInstanceGroups(sets.NewString([]string{"n1"}...))
 	pool = newNodePool(f, defaultZone)
-	pool.AddInstanceGroup("test", []int64{80})
+	pool.EnsureInstanceGroupsAndPorts("test", []int64{80})
 
 	f.calls = []int{}
 	kubeNodes = sets.NewString([]string{"n1", "n2"}...)
@@ -69,7 +69,7 @@ func TestNodePoolSync(t *testing.T) {
 
 	f = NewFakeInstanceGroups(sets.NewString([]string{"n1", "n2"}...))
 	pool = newNodePool(f, defaultZone)
-	pool.AddInstanceGroup("test", []int64{80})
+	pool.EnsureInstanceGroupsAndPorts("test", []int64{80})
 
 	f.calls = []int{}
 	kubeNodes = sets.NewString([]string{"n1", "n2"}...)
@@ -112,7 +112,7 @@ func TestSetNamedPorts(t *testing.T) {
 		// TODO: Add tests to remove named ports when we support that.
} for _, test := range testCases { - igs, _, err := pool.AddInstanceGroup("ig", test.activePorts) + igs, err := pool.EnsureInstanceGroupsAndPorts("ig", test.activePorts) if err != nil { t.Fatalf("unexpected error in setting ports %v to instance group: %s", test.activePorts, err) } diff --git a/instances/interfaces.go b/instances/interfaces.go index f0d4f0096e..70575b89be 100644 --- a/instances/interfaces.go +++ b/instances/interfaces.go @@ -32,7 +32,7 @@ type NodePool interface { Init(zl zoneLister) // The following 2 methods operate on instance groups. - AddInstanceGroup(name string, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error) + EnsureInstanceGroupsAndPorts(name string, ports []int64) ([]*compute.InstanceGroup, error) DeleteInstanceGroup(name string) error // TODO: Refactor for modularity diff --git a/instances/utils.go b/instances/utils.go index 0b1176c0c7..1843fed212 100644 --- a/instances/utils.go +++ b/instances/utils.go @@ -8,6 +8,6 @@ import ( // Helper method to create instance groups. // This method exists to ensure that we are using the same logic at all places. -func EnsureInstanceGroupsAndPorts(nodePool NodePool, namer *utils.Namer, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error) { - return nodePool.AddInstanceGroup(namer.IGName(), ports) +func EnsureInstanceGroupsAndPorts(nodePool NodePool, namer *utils.Namer, ports []int64) ([]*compute.InstanceGroup, error) { + return nodePool.EnsureInstanceGroupsAndPorts(namer.IGName(), ports) }
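
Reviewer note, not part of the patch: the rename also narrows the NodePool surface from three return values to two, so every caller loses the blank identifier for the dropped []*compute.NamedPort result. Below is a minimal, self-contained sketch of the new call shape. The NodePool interface is copied from the instances/interfaces.go hunk above; the wrapper function, the fake pool, and all other names are illustrative assumptions, not code from this change.

package main

import (
	"fmt"

	compute "google.golang.org/api/compute/v1"
)

// NodePool reproduces only the renamed method from instances/interfaces.go
// so that the sketch compiles on its own.
type NodePool interface {
	EnsureInstanceGroupsAndPorts(name string, ports []int64) ([]*compute.InstanceGroup, error)
}

// syncInstanceGroups shows the post-rename call shape: one result, one error.
// Before this change the call would have read:
//   igs, _, err := pool.AddInstanceGroup(igName, nodePorts)
func syncInstanceGroups(pool NodePool, igName string, nodePorts []int64) error {
	igs, err := pool.EnsureInstanceGroupsAndPorts(igName, nodePorts)
	if err != nil {
		return fmt.Errorf("ensuring instance groups %q: %v", igName, err)
	}
	for _, ig := range igs {
		fmt.Printf("ensured instance group %s in zone %s\n", ig.Name, ig.Zone)
	}
	return nil
}

// fakePool is a stand-in for *instances.Instances, present only to make the
// sketch runnable.
type fakePool struct{}

func (fakePool) EnsureInstanceGroupsAndPorts(name string, ports []int64) ([]*compute.InstanceGroup, error) {
	return []*compute.InstanceGroup{{Name: name, Zone: "us-central1-b"}}, nil
}

func main() {
	if err := syncInstanceGroups(fakePool{}, "test-ig", []int64{30001, 30002}); err != nil {
		fmt.Println(err)
	}
}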
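A second note on the extracted per-zone helper: ensureInstanceGroupAndPorts keeps the SetNamedPortsOfInstanceGroup write idempotent by diffing the requested ports against the group's existing NamedPorts and appending only what is missing, so a re-sync with no new ports issues no write at all. A stand-alone sketch of that reconciliation step follows; the "port%d" name format mimics what utils.GetNamedPort produces and should be treated as an assumption, as should every identifier in the sketch.

package main

import (
	"fmt"

	compute "google.golang.org/api/compute/v1"
)

// missingNamedPorts mirrors the reconciliation in ensureInstanceGroupAndPorts:
// record the ports already on the group, then keep only the requested ports
// that are absent. An empty result means the caller can skip the
// SetNamedPortsOfInstanceGroup round-trip entirely.
func missingNamedPorts(existing []*compute.NamedPort, wanted []int64) []*compute.NamedPort {
	seen := make(map[int64]bool, len(existing))
	for _, np := range existing {
		seen[np.Port] = true
	}
	var missing []*compute.NamedPort
	for _, p := range wanted {
		if !seen[p] {
			// "port%d" mimics utils.GetNamedPort's naming (assumption).
			missing = append(missing, &compute.NamedPort{Name: fmt.Sprintf("port%d", p), Port: p})
		}
	}
	return missing
}

func main() {
	existing := []*compute.NamedPort{{Name: "port80", Port: 80}}
	// Port 80 is already present, so only 443 comes back.
	for _, np := range missingNamedPorts(existing, []int64{80, 443}) {
		fmt.Printf("would add %s (%d)\n", np.Name, np.Port)
	}
}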