From 73a29f06a24d8a3ffb3663a9d7fdf8116e93c1e6 Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Fri, 11 Aug 2017 09:36:51 +0200 Subject: [PATCH] Treat update with empty previous object as non-error --- event-exporter/Makefile | 2 +- event-exporter/sinks/stackdriver/sink.go | 12 +- event-exporter/sinks/stackdriver/sink_test.go | 214 ++++++++++-------- event-exporter/watchers/events/handler.go | 2 +- .../watchers/events/handler_test.go | 2 +- 5 files changed, 137 insertions(+), 95 deletions(-) diff --git a/event-exporter/Makefile b/event-exporter/Makefile index cb136b263..df8ddd0ce 100644 --- a/event-exporter/Makefile +++ b/event-exporter/Makefile @@ -19,7 +19,7 @@ BINARY_NAME = event-exporter PREFIX = gcr.io/google-containers IMAGE_NAME = event-exporter -TAG = v0.1.4 +TAG = v0.1.5 build: ${ENVVAR} godep go build -a -o ${BINARY_NAME} diff --git a/event-exporter/sinks/stackdriver/sink.go b/event-exporter/sinks/stackdriver/sink.go index 61272eed8..179069c75 100644 --- a/event-exporter/sinks/stackdriver/sink.go +++ b/event-exporter/sinks/stackdriver/sink.go @@ -90,13 +90,19 @@ func (s *sdSink) OnAdd(event *api_v1.Event) { } func (s *sdSink) OnUpdate(oldEvent *api_v1.Event, newEvent *api_v1.Event) { - if newEvent.Count != oldEvent.Count+1 { + var oldCount int32 + if oldEvent != nil { + oldCount = oldEvent.Count + } + + if newEvent.Count != oldCount+1 { // Sink doesn't send a LogEntry to Stackdriver, b/c event compression might // indicate that part of the watch history was lost, which may result in // multiple events being compressed. This may create an unecessary - // flood in Stackdriver. + // flood in Stackdriver. Also this is a perfectly valid behavior for the + // configuration with empty backing storage. glog.V(2).Infof("Event count has increased by %d != 1.\n"+ - "\tOld event: %+v\n\tNew event: %+v", newEvent.Count-oldEvent.Count, oldEvent, newEvent) + "\tOld event: %+v\n\tNew event: %+v", newEvent.Count-oldCount, oldEvent, newEvent) } receivedEntryCount.WithLabelValues(newEvent.Source.Component, newEvent.Source.Host).Inc() diff --git a/event-exporter/sinks/stackdriver/sink_test.go b/event-exporter/sinks/stackdriver/sink_test.go index 48dfe6493..6345242dc 100644 --- a/event-exporter/sinks/stackdriver/sink_test.go +++ b/event-exporter/sinks/stackdriver/sink_test.go @@ -38,135 +38,171 @@ func (w *fakeSdWriter) Write(entries []*sd.LogEntry, logName string, resource *s return 0 } +const ( + defaultTestFlushDelay = 10 * time.Millisecond + defaultTestMaxConcurrency = 10 + defaultTestMaxBufferSize = 10 + + bufferSizeParamName = "buffersize" + flushDelayParamName = "flushdelay" + blockingParamName = "blocking" +) + func TestMaxConcurrency(t *testing.T) { - done := make(chan struct{}) + c, s, q, done := createSinkWith(map[string]interface{}{ + blockingParamName: true, + }) defer close(done) - config := &sdSinkConfig{ - Resource: nil, - FlushDelay: 100 * time.Millisecond, - LogName: "logname", - MaxConcurrency: 10, - MaxBufferSize: 10, - } - q := make(chan struct{}, config.MaxConcurrency+1) - w := &fakeSdWriter{ - writeFunc: func([]*sd.LogEntry, string, *sd.MonitoredResource) int { - q <- struct{}{} - <-done - return 0 - }, - } - s := newSdSink(w, clock.NewFakeClock(time.Time{}), config) - go s.Run(done) - for i := 0; i < config.MaxConcurrency*(config.MaxBufferSize+2); i++ { + for i := 0; i < c.MaxConcurrency*(c.MaxBufferSize+2); i++ { s.OnAdd(&api_v1.Event{}) } - wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) { - return len(q) == config.MaxConcurrency, nil - }) - if len(q) != config.MaxConcurrency { - t.Fatalf("Write called %d times, expected %d", len(q), config.MaxConcurrency) + want := c.MaxConcurrency + got := waitWritesCount(q, want) + if got != want { + t.Fatalf("Write called %d times, want %d", got, want) } } func TestBatchTimeout(t *testing.T) { - done := make(chan struct{}) + _, s, q, done := createSink() defer close(done) - config := &sdSinkConfig{ - Resource: nil, - FlushDelay: 100 * time.Millisecond, - LogName: "logname", - MaxConcurrency: 10, - MaxBufferSize: 10, - } - q := make(chan struct{}, config.MaxConcurrency+1) - w := &fakeSdWriter{ - writeFunc: func([]*sd.LogEntry, string, *sd.MonitoredResource) int { - q <- struct{}{} - return 0 - }, - } - s := newSdSink(w, clock.NewFakeClock(time.Time{}), config) - go s.Run(done) s.OnAdd(&api_v1.Event{}) - wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) { - return len(q) == 1, nil - }) - if len(q) != 1 { - t.Fatalf("Write called %d times, expected 1", len(q)) + want := 1 + got := waitWritesCount(q, want) + if got != want { + t.Fatalf("Write called %d times, want %d", got, want) } } func TestBatchSizeLimit(t *testing.T) { - done := make(chan struct{}) + _, s, q, done := createSinkWith(map[string]interface{}{ + flushDelayParamName: 1 * time.Hour, + }) defer close(done) - config := &sdSinkConfig{ - Resource: nil, - FlushDelay: 1 * time.Minute, - LogName: "logname", - MaxConcurrency: 10, - MaxBufferSize: 10, - } - q := make(chan struct{}, config.MaxConcurrency+1) - w := &fakeSdWriter{ - writeFunc: func([]*sd.LogEntry, string, *sd.MonitoredResource) int { - q <- struct{}{} - return 0 - }, - } - s := newSdSink(w, clock.NewFakeClock(time.Time{}), config) - go s.Run(done) for i := 0; i < 15; i++ { s.OnAdd(&api_v1.Event{}) } - wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) { - return len(q) == 1, nil - }) - - if len(q) != 1 { - t.Fatalf("Write called %d times, expected 1", len(q)) + want := 1 + got := waitWritesCount(q, want) + if got != want { + t.Fatalf("Write called %d times, want %d", got, want) } } func TestInitialList(t *testing.T) { - done := make(chan struct{}) + _, s, q, done := createSinkWith(map[string]interface{}{ + bufferSizeParamName: 1, + }) defer close(done) - config := &sdSinkConfig{ - Resource: nil, - FlushDelay: 100 * time.Millisecond, - LogName: "logname", - MaxConcurrency: 10, - MaxBufferSize: 1, + + s.OnList(&api_v1.EventList{}) + + want := 1 + got := waitWritesCount(q, want) + if got != want { + t.Fatalf("Write called %d times, want %d", got, want) + } + + s.OnList(&api_v1.EventList{}) + + got = waitWritesCount(q, want) + if got != want { + t.Fatalf("Write called %d times, want %d", got, want) + } +} + +func TestOnUpdate(t *testing.T) { + tcs := []struct { + desc string + old *api_v1.Event + new *api_v1.Event + wantEntry bool + }{ + { + "old=nil,new=event", + nil, + &api_v1.Event{}, + true, + }, + { + "old=event,new=event", + &api_v1.Event{}, + &api_v1.Event{}, + true, + }, + } + for _, tc := range tcs { + t.Run(tc.desc, func(t *testing.T) { + _, s, q, done := createSink() + defer close(done) + + s.OnUpdate(tc.old, tc.new) + + want := 1 + if !tc.wantEntry { + want = 0 + } + got := waitWritesCount(q, want) + if got != want { + t.Fatalf("Write called %d times, want %d", got, want) + } + }) } - q := make(chan struct{}, config.MaxConcurrency+1) +} + +func createSink() (*sdSinkConfig, *sdSink, chan struct{}, chan struct{}) { + return createSinkWith(make(map[string]interface{})) +} + +func createSinkWith(params map[string]interface{}) (c *sdSinkConfig, s *sdSink, q chan struct{}, done chan struct{}) { + done = make(chan struct{}) + c = createConfig(params) + q = make(chan struct{}, 2*c.MaxConcurrency) w := &fakeSdWriter{ writeFunc: func([]*sd.LogEntry, string, *sd.MonitoredResource) int { q <- struct{}{} + if v, ok := params[blockingParamName]; ok && v.(bool) { + <-done + } return 0 }, } - s := newSdSink(w, clock.NewFakeClock(time.Time{}), config) + s = newSdSink(w, clock.NewFakeClock(time.Time{}), c) go s.Run(done) + return +} - s.OnList(&api_v1.EventList{}) - - wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) { - return len(q) == 1, nil - }) - if len(q) != 1 { - t.Fatalf("Write called %d times, expected 1", len(q)) +func createConfig(params map[string]interface{}) *sdSinkConfig { + c := &sdSinkConfig{ + Resource: nil, + FlushDelay: defaultTestFlushDelay, + LogName: "logname", + MaxConcurrency: defaultTestMaxConcurrency, + MaxBufferSize: defaultTestMaxBufferSize, } - s.OnList(&api_v1.EventList{}) + if v, ok := params[bufferSizeParamName]; ok { + c.MaxBufferSize = v.(int) + } - time.Sleep(2 * config.FlushDelay) - if len(q) != 1 { - t.Fatalf("Write called %d times, expected 1", len(q)) + if v, ok := params[flushDelayParamName]; ok { + c.FlushDelay = v.(time.Duration) } + + return c +} + +func waitWritesCount(q chan struct{}, want int) int { + wait.Poll(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) { + return len(q) == want, nil + }) + // Wait for some more time to ensure that the number is not greater. + time.Sleep(100 * time.Millisecond) + return len(q) } diff --git a/event-exporter/watchers/events/handler.go b/event-exporter/watchers/events/handler.go index 34b44277c..700e1cab8 100644 --- a/event-exporter/watchers/events/handler.go +++ b/event-exporter/watchers/events/handler.go @@ -50,7 +50,7 @@ func (c *eventHandlerWrapper) OnAdd(obj interface{}) { func (c *eventHandlerWrapper) OnUpdate(oldObj interface{}, newObj interface{}) { oldEvent, oldOk := c.convert(oldObj) newEvent, newOk := c.convert(newObj) - if oldOk && newOk { + if newOk && (oldObj == nil || oldOk) { c.handler.OnUpdate(oldEvent, newEvent) } } diff --git a/event-exporter/watchers/events/handler_test.go b/event-exporter/watchers/events/handler_test.go index 6edb597b6..ed71d6edc 100644 --- a/event-exporter/watchers/events/handler_test.go +++ b/event-exporter/watchers/events/handler_test.go @@ -97,7 +97,7 @@ func TestEventWatchHandlerUpdate(t *testing.T) { "oldObj=nil,newObj=event", nil, &api_v1.Event{}, - false, + true, }, { "oldObj=non-event,newObj=event",