Skip to content

Commit

Permalink
Merge pull request #389 from freehan/cherrypick-1.2
Browse files Browse the repository at this point in the history
Cherrypick #381and #384 into release-1.2
  • Loading branch information
MrHohn authored Jul 9, 2018
2 parents da79350 + 7dbe8da commit bf3b6fd
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 103 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func main() {
}

glog.V(2).Infof("Flags = %+v", flags.F)

defer glog.Flush()
kubeConfig, err := app.NewKubeConfig()
if err != nil {
glog.Fatalf("Failed to create kubernetes client config: %v", err)
Expand Down
20 changes: 13 additions & 7 deletions pkg/backends/backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestBackendPoolAdd(t *testing.T) {
t.Fatalf("Port %v not added to instance group", sp)
}

hc, err := pool.healthChecker.Get(beName, sp.Version())
hc, err := pool.healthChecker.Get(beName, features.VersionFromServicePort(&sp))
if err != nil {
t.Fatalf("Unexpected err when querying fake healthchecker: %v", err)
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestHealthCheckMigration(t *testing.T) {
pool.Ensure([]utils.ServicePort{p}, nil)

// Assert the proper health check was created
hc, _ := pool.healthChecker.Get(beName, p.Version())
hc, _ := pool.healthChecker.Get(beName, features.VersionFromServicePort(&p))
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestBackendPoolUpdateHTTPS(t *testing.T) {
}

// Assert the proper health check was created
hc, _ := pool.healthChecker.Get(beName, p.Version())
hc, _ := pool.healthChecker.Get(beName, features.VersionFromServicePort(&p))
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
Expand All @@ -255,7 +255,7 @@ func TestBackendPoolUpdateHTTPS(t *testing.T) {
}

// Assert the proper health check was created
hc, _ = pool.healthChecker.Get(beName, p.Version())
hc, _ = pool.healthChecker.Get(beName, features.VersionFromServicePort(&p))
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
Expand All @@ -280,7 +280,7 @@ func TestBackendPoolUpdateHTTP2(t *testing.T) {
}

// Assert the proper health check was created
hc, _ := pool.healthChecker.Get(beName, p.Version())
hc, _ := pool.healthChecker.Get(beName, features.VersionFromServicePort(&p))
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
Expand Down Expand Up @@ -599,13 +599,19 @@ func TestBackendPoolSyncQuota(t *testing.T) {
quota := len(tc.newPorts)

// Add hooks to simulate quota changes & errors.
(fakeGCE.Compute().(*cloud.MockGCE)).MockBackendServices.InsertHook = func(ctx context.Context, key *meta.Key, be *compute.BackendService, m *cloud.MockBackendServices) (bool, error) {
insertFunc := func(ctx context.Context, key *meta.Key, beName string) (bool, error) {
if bsCreated+1 > quota {
return true, &googleapi.Error{Code: http.StatusForbidden, Body: be.Name}
return true, &googleapi.Error{Code: http.StatusForbidden, Body: beName}
}
bsCreated += 1
return false, nil
}
(fakeGCE.Compute().(*cloud.MockGCE)).MockBackendServices.InsertHook = func(ctx context.Context, key *meta.Key, be *compute.BackendService, m *cloud.MockBackendServices) (bool, error) {
return insertFunc(ctx, key, be.Name)
}
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaBackendServices.InsertHook = func(ctx context.Context, key *meta.Key, be *computebeta.BackendService, m *cloud.MockBetaBackendServices) (bool, error) {
return insertFunc(ctx, key, be.Name)
}
(fakeGCE.Compute().(*cloud.MockGCE)).MockBackendServices.DeleteHook = func(ctx context.Context, key *meta.Key, m *cloud.MockBackendServices) (bool, error) {
bsCreated -= 1
return false, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/backends/features/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ limitations under the License.

// This package contains the implementations of backend service
// features.
// TODO(mrhohn): Document what needs to happen when a feature is added.
// For features that requrie non-GA compute API, please make sure to
// update `versionToFeatures` and `featuresFromServicePort()` in
// features.go (upon both feature addition and promotion). It will make
// sure the controller interacts with compute service using the proper
// API version.
package features
7 changes: 6 additions & 1 deletion pkg/backends/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ const (
FeatureHTTP2 = "HTTP2"
// FeatureSecurityPolicy defines the feature name of SecurityPolicy.
FeatureSecurityPolicy = "SecurityPolicy"
// FeatureNEG defines the feature name of NEG.
FeatureNEG = "NEG"
)

var (
// versionToFeatures stores the mapping from the required API
// version to feature names.
versionToFeatures = map[meta.Version][]string{
meta.VersionAlpha: []string{FeatureHTTP2},
meta.VersionBeta: []string{FeatureSecurityPolicy},
meta.VersionBeta: []string{FeatureSecurityPolicy, FeatureNEG},
}
)

Expand All @@ -57,6 +59,9 @@ func featuresFromServicePort(sp *utils.ServicePort) []string {
if sp.BackendConfig != nil && sp.BackendConfig.Spec.SecurityPolicy != nil {
features = append(features, FeatureSecurityPolicy)
}
if sp.NEGEnabled {
features = append(features, FeatureNEG)
}
// Keep feature names sorted to be consistent.
sort.Strings(features)
return features
Expand Down
15 changes: 15 additions & 0 deletions pkg/backends/features/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ var (
},
},
}

svcPortWithNEG = utils.ServicePort{
ID: fakeSvcPortID,
NEGEnabled: true,
}
)

func TestFeaturesFromServicePort(t *testing.T) {
Expand All @@ -92,6 +97,11 @@ func TestFeaturesFromServicePort(t *testing.T) {
svcPort: svcPortWithSecurityPolicy,
expectedFeatures: []string{"SecurityPolicy"},
},
{
desc: "NEG",
svcPort: svcPortWithNEG,
expectedFeatures: []string{"NEG"},
},
{
desc: "HTTP2 + SecurityPolicy",
svcPort: svcPortWithHTTP2SecurityPolicy,
Expand Down Expand Up @@ -128,6 +138,11 @@ func TestVersionFromFeatures(t *testing.T) {
features: []string{FeatureSecurityPolicy},
expectedVersion: meta.VersionBeta,
},
{
desc: "NEG",
features: []string{FeatureNEG},
expectedVersion: meta.VersionBeta,
},
{
desc: "HTTP2 + SecurityPolicy",
features: []string{FeatureHTTP2, FeatureSecurityPolicy},
Expand Down
12 changes: 3 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,9 @@ type LoadBalancerController struct {
client kubernetes.Interface
ctx *context.ControllerContext

ingLister StoreToIngressLister
// endpoint lister is needed when translating service target port to real endpoint target ports.
endpointLister StoreToEndpointLister
nodeLister cache.Indexer
nodes *NodeController
ingLister StoreToIngressLister
nodeLister cache.Indexer
nodes *NodeController

// TODO: Watch secrets
CloudClusterManager *ClusterManager
Expand Down Expand Up @@ -112,10 +110,6 @@ func NewLoadBalancerController(
}
lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync)

if ctx.NEGEnabled {
lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer()
}

// Ingress event handlers.
ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down
44 changes: 0 additions & 44 deletions pkg/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"

"github.com/golang/glog"

compute "google.golang.org/api/compute/v1"

api_v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -134,46 +130,6 @@ IngressLoop:
return
}

// StoreToEndpointLister makes a Store that lists Endpoints.
type StoreToEndpointLister struct {
cache.Indexer
}

func (s *StoreToEndpointLister) ListEndpointTargetPorts(namespace, name, targetPort string) []int {
// if targetPort is integer, no need to translate to endpoint ports
if i, err := strconv.Atoi(targetPort); err == nil {
return []int{i}
}

ep, exists, err := s.Indexer.Get(
&api_v1.Endpoints{
ObjectMeta: meta_v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
},
)

if !exists {
glog.Errorf("Endpoint object %v/%v does not exist.", namespace, name)
return []int{}
}
if err != nil {
glog.Errorf("Failed to retrieve endpoint object %v/%v: %v", namespace, name, err)
return []int{}
}

ret := []int{}
for _, subset := range ep.(*api_v1.Endpoints).Subsets {
for _, port := range subset.Ports {
if port.Protocol == api_v1.ProtocolTCP && port.Name == targetPort {
ret = append(ret, int(port.Port))
}
}
}
return ret
}

// setInstanceGroupsAnnotation sets the instance-groups annotation with names of the given instance groups.
func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.InstanceGroup) error {
type Value struct {
Expand Down
25 changes: 6 additions & 19 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ import (
"k8s.io/ingress-gce/pkg/utils"
)

const (
// For each service, only retries 15 times to process it.
// This is a convention in kube-controller-manager.
maxRetries = 15
)

func init() {
// register prometheus metrics
registerMetrics()
Expand Down Expand Up @@ -336,21 +330,14 @@ func (c *Controller) handleErr(err error, key interface{}) {
return
}

glog.Errorf("Error processing service %q: %v", key, err)
if c.serviceQueue.NumRequeues(key) < maxRetries {
c.serviceQueue.AddRateLimited(key)
return
}

defer c.serviceQueue.Forget(key)
service, exists, err := c.serviceLister.GetByKey(key.(string))
if err != nil {
msg := fmt.Sprintf("error processing service %q: %v", key, err)
glog.Errorf(msg)
if service, exists, err := c.serviceLister.GetByKey(key.(string)); err != nil {
glog.Warning("Failed to retrieve service %q from store: %v", key.(string), err)
return
}
if exists {
c.recorder.Eventf(service.(*apiv1.Service), apiv1.EventTypeWarning, "ProcessServiceFailed", "Service %q dropped from queue (requeued %v times)", key, c.serviceQueue.NumRequeues(key))
} else if exists {
c.recorder.Eventf(service.(*apiv1.Service), apiv1.EventTypeWarning, "ProcessServiceFailed", msg)
}
c.serviceQueue.AddRateLimited(key)
}

func (c *Controller) enqueueService(obj interface{}) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/neg/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ import (

const (
MAX_NETWORK_ENDPOINTS_PER_BATCH = 500
minRetryDelay = 5 * time.Second
maxRetryDelay = 300 * time.Second
// For each NEG, only retries 15 times to process it.
// This is a convention in kube-controller-manager.
maxRetries = 15
minRetryDelay = 5 * time.Second
maxRetryDelay = 600 * time.Second
)

// servicePort includes information to uniquely identify a NEG
Expand Down Expand Up @@ -435,13 +438,13 @@ func (s *syncer) toNetworkEndpointBatch(endpoints sets.String) ([]*compute.Netwo

func (s *syncer) attachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) {
wg.Add(1)
glog.V(2).Infof("Attaching %d endpoint(s) for %s into NEG %s in %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
glog.V(2).Infof("Attaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.AttachNetworkEndpoints, "Attach")
}

func (s *syncer) detachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) {
wg.Add(1)
glog.V(2).Infof("Detaching %d endpoint(s) for %s into NEG %s in %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
glog.V(2).Infof("Detaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.DetachNetworkEndpoints, "Detach")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NEGServicePorts(ann annotations.NegAnnotation, knownPorts PortNameMap) (Por
for port, _ := range ann.ExposedPorts {
// TODO: also validate ServicePorts in the exposed NEG annotation via webhook
if _, ok := knownPorts[port]; !ok {
errList = append(errList, fmt.Errorf("ServicePort %v doesn't exist on Service", port))
errList = append(errList, fmt.Errorf("port %v specified in %q doesn't exist in the service", port, annotations.NEGAnnotationKey))
}
portSet[port] = knownPorts[port]
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestNEGServicePorts(t *testing.T) {
desc: "NEG annotation references port that Service does not have",
annotation: `{"exposed_ports":{"3000":{}}}`,
expectedErr: utilerrors.NewAggregate([]error{
fmt.Errorf("ServicePort %v doesn't exist on Service", 3000),
fmt.Errorf("port %v specified in %q doesn't exist in the service", 3000, annotations.NEGAnnotationKey),
}),
knownPortMap: PortNameMap{80: "some_port", 443: "another_port"},
expectedPortMap: PortNameMap{3000: ""},
Expand Down
15 changes: 0 additions & 15 deletions pkg/utils/serviceport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"

"k8s.io/ingress-gce/pkg/annotations"
backendconfigv1beta1 "k8s.io/ingress-gce/pkg/apis/backendconfig/v1beta1"
Expand Down Expand Up @@ -54,20 +53,6 @@ func (sp ServicePort) GetDescription() Description {
}
}

// Version returns the support version of the ServicePort configuration
// Alpha includes HTTP2
// Beta includes NEG
// The rest are v1
func (sp ServicePort) Version() meta.Version {
if sp.Protocol == annotations.ProtocolHTTP2 {
return meta.VersionAlpha
}
if sp.NEGEnabled {
return meta.VersionBeta
}
return meta.VersionGA
}

// BackendName returns the name of the backend which would be used for this ServicePort.
func (sp ServicePort) BackendName(namer *Namer) string {
if !sp.NEGEnabled {
Expand Down

0 comments on commit bf3b6fd

Please sign in to comment.