Skip to content

Commit

Permalink
Merge pull request #25 from crassirostris/fix-event-expoter
Browse files Browse the repository at this point in the history
[event-exporter] Treat update with empty previous object as non-error
  • Loading branch information
Mik Vyatskov authored Aug 11, 2017
2 parents ed189f5 + 73a29f0 commit 2a7cf30
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 95 deletions.
2 changes: 1 addition & 1 deletion event-exporter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ BINARY_NAME = event-exporter

PREFIX = gcr.io/google-containers
IMAGE_NAME = event-exporter
TAG = v0.1.4
TAG = v0.1.5

build:
${ENVVAR} godep go build -a -o ${BINARY_NAME}
Expand Down
12 changes: 9 additions & 3 deletions event-exporter/sinks/stackdriver/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,19 @@ func (s *sdSink) OnAdd(event *api_v1.Event) {
}

func (s *sdSink) OnUpdate(oldEvent *api_v1.Event, newEvent *api_v1.Event) {
if newEvent.Count != oldEvent.Count+1 {
var oldCount int32
if oldEvent != nil {
oldCount = oldEvent.Count
}

if newEvent.Count != oldCount+1 {
// Sink doesn't send a LogEntry to Stackdriver, b/c event compression might
// indicate that part of the watch history was lost, which may result in
// multiple events being compressed. This may create an unecessary
// flood in Stackdriver.
// flood in Stackdriver. Also this is a perfectly valid behavior for the
// configuration with empty backing storage.
glog.V(2).Infof("Event count has increased by %d != 1.\n"+
"\tOld event: %+v\n\tNew event: %+v", newEvent.Count-oldEvent.Count, oldEvent, newEvent)
"\tOld event: %+v\n\tNew event: %+v", newEvent.Count-oldCount, oldEvent, newEvent)
}

receivedEntryCount.WithLabelValues(newEvent.Source.Component, newEvent.Source.Host).Inc()
Expand Down
214 changes: 125 additions & 89 deletions event-exporter/sinks/stackdriver/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,135 +38,171 @@ func (w *fakeSdWriter) Write(entries []*sd.LogEntry, logName string, resource *s
return 0
}

const (
defaultTestFlushDelay = 10 * time.Millisecond
defaultTestMaxConcurrency = 10
defaultTestMaxBufferSize = 10

bufferSizeParamName = "buffersize"
flushDelayParamName = "flushdelay"
blockingParamName = "blocking"
)

func TestMaxConcurrency(t *testing.T) {
done := make(chan struct{})
c, s, q, done := createSinkWith(map[string]interface{}{
blockingParamName: true,
})
defer close(done)
config := &sdSinkConfig{
Resource: nil,
FlushDelay: 100 * time.Millisecond,
LogName: "logname",
MaxConcurrency: 10,
MaxBufferSize: 10,
}
q := make(chan struct{}, config.MaxConcurrency+1)
w := &fakeSdWriter{
writeFunc: func([]*sd.LogEntry, string, *sd.MonitoredResource) int {
q <- struct{}{}
<-done
return 0
},
}
s := newSdSink(w, clock.NewFakeClock(time.Time{}), config)
go s.Run(done)

for i := 0; i < config.MaxConcurrency*(config.MaxBufferSize+2); i++ {
for i := 0; i < c.MaxConcurrency*(c.MaxBufferSize+2); i++ {
s.OnAdd(&api_v1.Event{})
}

wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
return len(q) == config.MaxConcurrency, nil
})
if len(q) != config.MaxConcurrency {
t.Fatalf("Write called %d times, expected %d", len(q), config.MaxConcurrency)
want := c.MaxConcurrency
got := waitWritesCount(q, want)
if got != want {
t.Fatalf("Write called %d times, want %d", got, want)
}
}

func TestBatchTimeout(t *testing.T) {
done := make(chan struct{})
_, s, q, done := createSink()
defer close(done)
config := &sdSinkConfig{
Resource: nil,
FlushDelay: 100 * time.Millisecond,
LogName: "logname",
MaxConcurrency: 10,
MaxBufferSize: 10,
}
q := make(chan struct{}, config.MaxConcurrency+1)
w := &fakeSdWriter{
writeFunc: func([]*sd.LogEntry, string, *sd.MonitoredResource) int {
q <- struct{}{}
return 0
},
}
s := newSdSink(w, clock.NewFakeClock(time.Time{}), config)
go s.Run(done)

s.OnAdd(&api_v1.Event{})
wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
return len(q) == 1, nil
})

if len(q) != 1 {
t.Fatalf("Write called %d times, expected 1", len(q))
want := 1
got := waitWritesCount(q, want)
if got != want {
t.Fatalf("Write called %d times, want %d", got, want)
}
}

func TestBatchSizeLimit(t *testing.T) {
done := make(chan struct{})
_, s, q, done := createSinkWith(map[string]interface{}{
flushDelayParamName: 1 * time.Hour,
})
defer close(done)
config := &sdSinkConfig{
Resource: nil,
FlushDelay: 1 * time.Minute,
LogName: "logname",
MaxConcurrency: 10,
MaxBufferSize: 10,
}
q := make(chan struct{}, config.MaxConcurrency+1)
w := &fakeSdWriter{
writeFunc: func([]*sd.LogEntry, string, *sd.MonitoredResource) int {
q <- struct{}{}
return 0
},
}
s := newSdSink(w, clock.NewFakeClock(time.Time{}), config)
go s.Run(done)

for i := 0; i < 15; i++ {
s.OnAdd(&api_v1.Event{})
}

wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
return len(q) == 1, nil
})

if len(q) != 1 {
t.Fatalf("Write called %d times, expected 1", len(q))
want := 1
got := waitWritesCount(q, want)
if got != want {
t.Fatalf("Write called %d times, want %d", got, want)
}
}

func TestInitialList(t *testing.T) {
done := make(chan struct{})
_, s, q, done := createSinkWith(map[string]interface{}{
bufferSizeParamName: 1,
})
defer close(done)
config := &sdSinkConfig{
Resource: nil,
FlushDelay: 100 * time.Millisecond,
LogName: "logname",
MaxConcurrency: 10,
MaxBufferSize: 1,

s.OnList(&api_v1.EventList{})

want := 1
got := waitWritesCount(q, want)
if got != want {
t.Fatalf("Write called %d times, want %d", got, want)
}

s.OnList(&api_v1.EventList{})

got = waitWritesCount(q, want)
if got != want {
t.Fatalf("Write called %d times, want %d", got, want)
}
}

func TestOnUpdate(t *testing.T) {
tcs := []struct {
desc string
old *api_v1.Event
new *api_v1.Event
wantEntry bool
}{
{
"old=nil,new=event",
nil,
&api_v1.Event{},
true,
},
{
"old=event,new=event",
&api_v1.Event{},
&api_v1.Event{},
true,
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
_, s, q, done := createSink()
defer close(done)

s.OnUpdate(tc.old, tc.new)

want := 1
if !tc.wantEntry {
want = 0
}
got := waitWritesCount(q, want)
if got != want {
t.Fatalf("Write called %d times, want %d", got, want)
}
})
}
q := make(chan struct{}, config.MaxConcurrency+1)
}

func createSink() (*sdSinkConfig, *sdSink, chan struct{}, chan struct{}) {
return createSinkWith(make(map[string]interface{}))
}

func createSinkWith(params map[string]interface{}) (c *sdSinkConfig, s *sdSink, q chan struct{}, done chan struct{}) {
done = make(chan struct{})
c = createConfig(params)
q = make(chan struct{}, 2*c.MaxConcurrency)
w := &fakeSdWriter{
writeFunc: func([]*sd.LogEntry, string, *sd.MonitoredResource) int {
q <- struct{}{}
if v, ok := params[blockingParamName]; ok && v.(bool) {
<-done
}
return 0
},
}
s := newSdSink(w, clock.NewFakeClock(time.Time{}), config)
s = newSdSink(w, clock.NewFakeClock(time.Time{}), c)
go s.Run(done)
return
}

s.OnList(&api_v1.EventList{})

wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
return len(q) == 1, nil
})
if len(q) != 1 {
t.Fatalf("Write called %d times, expected 1", len(q))
func createConfig(params map[string]interface{}) *sdSinkConfig {
c := &sdSinkConfig{
Resource: nil,
FlushDelay: defaultTestFlushDelay,
LogName: "logname",
MaxConcurrency: defaultTestMaxConcurrency,
MaxBufferSize: defaultTestMaxBufferSize,
}

s.OnList(&api_v1.EventList{})
if v, ok := params[bufferSizeParamName]; ok {
c.MaxBufferSize = v.(int)
}

time.Sleep(2 * config.FlushDelay)
if len(q) != 1 {
t.Fatalf("Write called %d times, expected 1", len(q))
if v, ok := params[flushDelayParamName]; ok {
c.FlushDelay = v.(time.Duration)
}

return c
}

func waitWritesCount(q chan struct{}, want int) int {
wait.Poll(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
return len(q) == want, nil
})
// Wait for some more time to ensure that the number is not greater.
time.Sleep(100 * time.Millisecond)
return len(q)
}
2 changes: 1 addition & 1 deletion event-exporter/watchers/events/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *eventHandlerWrapper) OnAdd(obj interface{}) {
func (c *eventHandlerWrapper) OnUpdate(oldObj interface{}, newObj interface{}) {
oldEvent, oldOk := c.convert(oldObj)
newEvent, newOk := c.convert(newObj)
if oldOk && newOk {
if newOk && (oldObj == nil || oldOk) {
c.handler.OnUpdate(oldEvent, newEvent)
}
}
Expand Down
2 changes: 1 addition & 1 deletion event-exporter/watchers/events/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestEventWatchHandlerUpdate(t *testing.T) {
"oldObj=nil,newObj=event",
nil,
&api_v1.Event{},
false,
true,
},
{
"oldObj=non-event,newObj=event",
Expand Down

0 comments on commit 2a7cf30

Please sign in to comment.