From 6b332acadab1335364d8a2f66d9295d8c3a9ca46 Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Mon, 2 Oct 2023 11:11:13 +0200 Subject: [PATCH] dataclients/kubernetes: improve logging (#2637) * log number of received routegroups * log distinct messages once per ingress/routegroup * log number or all routes loaded and mapped Signed-off-by: Alexander Yastrebov --- dataclients/kubernetes/clusterclient.go | 19 +-- dataclients/kubernetes/clusterclient_test.go | 63 ++++++++++ dataclients/kubernetes/ingressv1.go | 8 +- dataclients/kubernetes/kube.go | 4 +- dataclients/kubernetes/logger.go | 40 +++++- dataclients/kubernetes/logger_test.go | 124 ++++++++++--------- dataclients/kubernetes/routegroup.go | 6 +- 7 files changed, 182 insertions(+), 82 deletions(-) diff --git a/dataclients/kubernetes/clusterclient.go b/dataclients/kubernetes/clusterclient.go index d85fa9977c..e15103e8d6 100644 --- a/dataclients/kubernetes/clusterclient.go +++ b/dataclients/kubernetes/clusterclient.go @@ -362,11 +362,13 @@ func (c *clusterClient) loadIngressesV1() ([]*definitions.IngressV1Item, error) log.Debugf("requesting all ingresses failed: %v", err) return nil, err } - log.Debugf("all ingresses received: %d", len(il.Items)) + fItems := c.filterIngressesV1ByClass(il.Items) log.Debugf("filtered ingresses by ingress class: %d", len(fItems)) + sortByMetadata(fItems, func(i int) *definitions.Metadata { return fItems[i].Metadata }) + return fItems, nil } @@ -375,6 +377,7 @@ func (c *clusterClient) LoadRouteGroups() ([]*definitions.RouteGroupItem, error) if err := c.getJSON(c.routeGroupsURI+c.routeGroupsLabelSelectors, &rgl); err != nil { return nil, err } + log.Debugf("all routegroups received: %d", len(rgl.Items)) rgs := make([]*definitions.RouteGroupItem, 0, len(rgl.Items)) for _, i := range rgl.Items { @@ -396,19 +399,21 @@ func (c *clusterClient) LoadRouteGroups() ([]*definitions.RouteGroupItem, error) rgs = append(rgs, i) } + log.Debugf("filtered valid routegroups by routegroups class: %d", len(rgs)) + sortByMetadata(rgs, func(i int) *definitions.Metadata { return rgs[i].Metadata }) + return rgs, nil } func (c *clusterClient) loadServices() (map[definitions.ResourceID]*service, error) { var services serviceList - if err := c.getJSON(c.servicesURI+c.servicesLabelSelectors, &services); err != nil { log.Debugf("requesting all services failed: %v", err) return nil, err } - log.Debugf("all services received: %d", len(services.Items)) + result := make(map[definitions.ResourceID]*service) var hasInvalidService bool for _, service := range services.Items { @@ -429,13 +434,12 @@ func (c *clusterClient) loadServices() (map[definitions.ResourceID]*service, err func (c *clusterClient) loadSecrets() (map[definitions.ResourceID]*secret, error) { var secrets secretList - if err := c.getJSON(c.secretsURI+c.secretsLabelSelectors, &secrets); err != nil { log.Debugf("requesting all secrets failed: %v", err) return nil, err } - log.Debugf("all secrets received: %d", len(secrets.Items)) + result := make(map[definitions.ResourceID]*secret) for _, secret := range secrets.Items { if secret == nil || secret.Metadata == nil { @@ -454,8 +458,8 @@ func (c *clusterClient) loadEndpoints() (map[definitions.ResourceID]*endpoint, e log.Debugf("requesting all endpoints failed: %v", err) return nil, err } - log.Debugf("all endpoints received: %d", len(endpoints.Items)) + result := make(map[definitions.ResourceID]*endpoint) for _, endpoint := range endpoints.Items { resID := endpoint.Meta.ToResourceID() @@ -476,13 +480,12 @@ func (c *clusterClient) loadEndpoints() (map[definitions.ResourceID]*endpoint, e // given service, check endpointSlice.ToResourceID(). func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skipperEndpointSlice, error) { var endpointSlices endpointSliceList - if err := c.getJSON(c.endpointSlicesURI+c.endpointSlicesLabelSelectors, &endpointSlices); err != nil { log.Debugf("requesting all endpointslices failed: %v", err) return nil, err } - log.Debugf("all endpointslices received: %d", len(endpointSlices.Items)) + mapSlices := make(map[definitions.ResourceID][]*endpointSlice) for _, endpointSlice := range endpointSlices.Items { resID := endpointSlice.ToResourceID() // service resource ID diff --git a/dataclients/kubernetes/clusterclient_test.go b/dataclients/kubernetes/clusterclient_test.go index af63f7a236..5d626b7412 100644 --- a/dataclients/kubernetes/clusterclient_test.go +++ b/dataclients/kubernetes/clusterclient_test.go @@ -6,8 +6,11 @@ import ( "os" "strings" "testing" + "time" log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/zalando/skipper/dataclients/kubernetes" "github.com/zalando/skipper/dataclients/kubernetes/kubernetestest" @@ -251,3 +254,63 @@ spec: }) } } + +func TestLoggingInterval(t *testing.T) { + // TODO: with validation changes we need to update/refactor this test + manifest, err := os.Open("testdata/routegroups/convert/missing-service.yaml") + require.NoError(t, err) + defer manifest.Close() + + var out bytes.Buffer + log.SetOutput(&out) + defer log.SetOutput(os.Stderr) + + countMessages := func() int { + return strings.Count(out.String(), "Error transforming external hosts") + } + + a, err := kubernetestest.NewAPI(kubernetestest.TestAPIOptions{}, manifest) + require.NoError(t, err) + + s := httptest.NewServer(a) + defer s.Close() + + c, err := kubernetes.New(kubernetes.Options{KubernetesURL: s.URL}) + require.NoError(t, err) + defer c.Close() + + const loggingInterval = 100 * time.Millisecond + c.SetLoggingInterval(loggingInterval) + + _, err = c.LoadAll() + require.NoError(t, err) + + assert.Equal(t, 1, countMessages(), "one message expected after initial load") + + const ( + n = 2 + updateDuration = time.Duration(n)*loggingInterval + loggingInterval/2 + ) + + start := time.Now() + for time.Since(start) < updateDuration { + _, _, err := c.LoadUpdate() + require.NoError(t, err) + + time.Sleep(loggingInterval / 10) + } + + assert.Equal(t, 1+n, countMessages(), "%d additional messages expected", n) + + oldLevel := log.GetLevel() + defer log.SetLevel(oldLevel) + + log.SetLevel(log.DebugLevel) + + for i := 1; i <= 10; i++ { + _, _, err := c.LoadUpdate() + require.NoError(t, err) + + assert.Equal(t, 1+n+i, countMessages(), "a new message expected for each subsequent update when log level is debug") + } +} diff --git a/dataclients/kubernetes/ingressv1.go b/dataclients/kubernetes/ingressv1.go index 4bfab6c9e1..4e76f0163c 100644 --- a/dataclients/kubernetes/ingressv1.go +++ b/dataclients/kubernetes/ingressv1.go @@ -122,8 +122,8 @@ func convertPathRuleV1( ic.logger.Debugf("Found %d endpoints for %s, %s", len(eps), svcName, servicePort) } if len(eps) == 0 { - // add shunt route https://github.com/zalando/skipper/issues/1525 - ic.logger.Debugf("Adding shuntroute to return 502 for service %s with %d endpoints", svcName, len(eps)) + ic.logger.Debugf("Target endpoints not found, shuntroute for %s:%s", svcName, svcPort) + r := &eskip.Route{ Id: routeID(ns, name, host, prule.Path, svcName), HostRegexps: hostRegexp, @@ -364,8 +364,8 @@ func (ing *ingress) convertDefaultBackendV1( } if len(eps) == 0 { - // add shunt route https://github.com/zalando/skipper/issues/1525 - ic.logger.Debugf("Adding shuntroute to return 502 for service %s with %d endpoints", svcName, len(eps)) + ic.logger.Debugf("Target endpoints not found, shuntroute for %s:%s", svcName, svcPort) + r := &eskip.Route{ Id: routeID(ns, name, "", "", ""), } diff --git a/dataclients/kubernetes/kube.go b/dataclients/kubernetes/kube.go index 4c4f8ab9b5..536838b5a0 100644 --- a/dataclients/kubernetes/kube.go +++ b/dataclients/kubernetes/kube.go @@ -437,6 +437,8 @@ func (c *Client) loadAndConvert() ([]*eskip.Route, error) { return r, nil } +// shuntRoute creates a route that returns a 502 status code when there are no endpoints found, +// see https://github.com/zalando/skipper/issues/1525 func shuntRoute(r *eskip.Route) { r.Filters = []*eskip.Filter{ { @@ -499,7 +501,7 @@ func (c *Client) LoadAll() ([]*eskip.Route, error) { c.current, r = mapRoutes(r) - log.Debugf("all routes loaded and mapped") + log.Debugf("all routes loaded and mapped: %d", len(r)) return r, nil } diff --git a/dataclients/kubernetes/logger.go b/dataclients/kubernetes/logger.go index 1441ca0e21..920354772f 100644 --- a/dataclients/kubernetes/logger.go +++ b/dataclients/kubernetes/logger.go @@ -1,32 +1,62 @@ package kubernetes -import log "github.com/sirupsen/logrus" +import ( + "fmt" + "sync" + + log "github.com/sirupsen/logrus" +) type logger struct { logger *log.Entry + + mu sync.Mutex + history map[string]struct{} } +// newLogger creates a logger that logs each unique message once +// for the resource identified by kind, namespace and name. +// It logs nothing when disabled. func newLogger(kind, namespace, name string, enabled bool) *logger { if !enabled { return nil } - return &logger{log.WithFields(log.Fields{"kind": kind, "ns": namespace, "name": name})} + return &logger{logger: log.WithFields(log.Fields{"kind": kind, "ns": namespace, "name": name})} } func (l *logger) Debugf(format string, args ...any) { if l != nil { - l.logger.Debugf(format, args...) + l.once(log.DebugLevel, format, args...) } } func (l *logger) Infof(format string, args ...any) { if l != nil { - l.logger.Infof(format, args...) + l.once(log.InfoLevel, format, args...) } } func (l *logger) Errorf(format string, args ...any) { if l != nil { - l.logger.Errorf(format, args...) + l.once(log.ErrorLevel, format, args...) + } +} + +func (l *logger) once(level log.Level, format string, args ...any) { + if !l.logger.Logger.IsLevelEnabled(level) { + return + } + l.mu.Lock() + defer l.mu.Unlock() + + if l.history == nil { + l.history = make(map[string]struct{}) + } + + msg := fmt.Sprintf(format, args...) + key := fmt.Sprintf("%s %s", level, msg) + if _, ok := l.history[key]; !ok { + l.logger.Log(level, msg) + l.history[key] = struct{}{} } } diff --git a/dataclients/kubernetes/logger_test.go b/dataclients/kubernetes/logger_test.go index c67b5fb25b..3849e8711b 100644 --- a/dataclients/kubernetes/logger_test.go +++ b/dataclients/kubernetes/logger_test.go @@ -1,76 +1,82 @@ -package kubernetes_test +package kubernetes import ( "bytes" - "net/http/httptest" - "os" "strings" "testing" - "time" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/zalando/skipper/dataclients/kubernetes" - "github.com/zalando/skipper/dataclients/kubernetes/kubernetestest" ) -func TestLogger(t *testing.T) { - // TODO: with validation changes we need to update/refactor this test - manifest, err := os.Open("testdata/routegroups/convert/missing-service.yaml") - require.NoError(t, err) - defer manifest.Close() - +func captureLog(t *testing.T, level log.Level) *bytes.Buffer { + oldOut := log.StandardLogger().Out var out bytes.Buffer log.SetOutput(&out) - defer log.SetOutput(os.Stderr) - - countMessages := func() int { - return strings.Count(out.String(), "Error transforming external hosts") - } - - a, err := kubernetestest.NewAPI(kubernetestest.TestAPIOptions{}, manifest) - require.NoError(t, err) - - s := httptest.NewServer(a) - defer s.Close() - - c, err := kubernetes.New(kubernetes.Options{KubernetesURL: s.URL}) - require.NoError(t, err) - defer c.Close() - - const loggingInterval = 100 * time.Millisecond - c.SetLoggingInterval(loggingInterval) - - _, err = c.LoadAll() - require.NoError(t, err) - - assert.Equal(t, 1, countMessages(), "one message expected after initial load") - - const ( - n = 2 - updateDuration = time.Duration(n)*loggingInterval + loggingInterval/2 - ) - - start := time.Now() - for time.Since(start) < updateDuration { - _, _, err := c.LoadUpdate() - require.NoError(t, err) - - time.Sleep(loggingInterval / 10) - } - - assert.Equal(t, 1+n, countMessages(), "%d additional messages expected", n) oldLevel := log.GetLevel() - defer log.SetLevel(oldLevel) - - log.SetLevel(log.DebugLevel) + log.SetLevel(level) - for i := 1; i <= 10; i++ { - _, _, err := c.LoadUpdate() - require.NoError(t, err) + t.Cleanup(func() { + log.SetOutput(oldOut) + log.SetLevel(oldLevel) + }) + return &out +} - assert.Equal(t, 1+n+i, countMessages(), "a new message expected for each subsequent update when log level is debug") - } +func TestLogger(t *testing.T) { + t.Run("test disabled", func(t *testing.T) { + out := captureLog(t, log.DebugLevel) + + l := newLogger("ingress", "foo", "bar", false) + l.Debugf("test message") + l.Infof("test message") + l.Errorf("test message") + + assert.Equal(t, 0, strings.Count(out.String(), "test message")) + }) + + t.Run("test level", func(t *testing.T) { + out := captureLog(t, log.ErrorLevel) + + l := newLogger("ingress", "foo", "bar", true) + l.Debugf("test message") + l.Infof("test message") + l.Errorf("test message") + + assert.Equal(t, 0, strings.Count(out.String(), `level=debug`)) + assert.Equal(t, 0, strings.Count(out.String(), `level=info`)) + assert.Equal(t, 1, strings.Count(out.String(), `level=error`)) + }) + + t.Run("test logs once per level", func(t *testing.T) { + out := captureLog(t, log.DebugLevel) + + l := newLogger("ingress", "foo", "bar", true) + + msg1 := "test message1 %d %s" + args1 := []any{1, "qux"} + for i := 0; i < 3; i++ { + l.Debugf(msg1, args1...) + l.Infof(msg1, args1...) + l.Errorf(msg1, args1...) + } + + msg2 := "test message2 %d %s" + args2 := []any{2, "quux"} + for i := 0; i < 3; i++ { + l.Debugf(msg2, args2...) + l.Infof(msg2, args2...) + l.Errorf(msg2, args2...) + } + + t.Logf("log output: %s", out.String()) + + assert.Equal(t, 1, strings.Count(out.String(), `level=debug msg="test message1 1 qux" kind=ingress name=bar ns=foo`)) + assert.Equal(t, 1, strings.Count(out.String(), `level=info msg="test message1 1 qux" kind=ingress name=bar ns=foo`)) + assert.Equal(t, 1, strings.Count(out.String(), `level=error msg="test message1 1 qux" kind=ingress name=bar ns=foo`)) + assert.Equal(t, 1, strings.Count(out.String(), `level=debug msg="test message2 2 quux" kind=ingress name=bar ns=foo`)) + assert.Equal(t, 1, strings.Count(out.String(), `level=info msg="test message2 2 quux" kind=ingress name=bar ns=foo`)) + assert.Equal(t, 1, strings.Count(out.String(), `level=error msg="test message2 2 quux" kind=ingress name=bar ns=foo`)) + }) } diff --git a/dataclients/kubernetes/routegroup.go b/dataclients/kubernetes/routegroup.go index eb8150745c..d4df8e306d 100644 --- a/dataclients/kubernetes/routegroup.go +++ b/dataclients/kubernetes/routegroup.go @@ -193,11 +193,7 @@ func applyServiceBackend(ctx *routeGroupContext, backend *definitions.SkipperBac ) if len(eps) == 0 { - ctx.logger.Debugf( - "Target endpoints not found, shuntroute for %s:%d", - backend.ServiceName, - backend.ServicePort, - ) + ctx.logger.Debugf("Target endpoints not found, shuntroute for %s:%d", backend.ServiceName, backend.ServicePort) shuntRoute(r) return nil