Skip to content

Commit

Permalink
Merge pull request #470 from sophieliu15/mem_1
Browse files Browse the repository at this point in the history
Improve memory usage of event-exporter
  • Loading branch information
osalau authored May 13, 2022
2 parents 0738e8d + f55d24f commit f1e933a
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 37 deletions.
2 changes: 1 addition & 1 deletion event-exporter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ALL_ARCH=amd64 arm64
IMAGE_NAME = event-exporter

PREFIX ?= staging-k8s.gcr.io
TAG ?= v0.3.10
TAG ?= v0.4.0

IMAGE=$(PREFIX)/$(IMAGE_NAME)

Expand Down
20 changes: 2 additions & 18 deletions event-exporter/sinks/stackdriver/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,7 @@ func (s *sdSink) OnAdd(event *corev1.Event) {
s.logEntryChannel <- logEntry
}

func (s *sdSink) OnUpdate(oldEvent *corev1.Event, newEvent *corev1.Event) {
var oldCount int32
if oldEvent != nil {
oldCount = oldEvent.Count
}

if newEvent.Count != oldCount+1 {
// Sink doesn't send a LogEntry to Stackdriver, b/c event compression might
// indicate that part of the watch history was lost, which may result in
// multiple events being compressed. This may create an unecessary
// flood in Stackdriver. Also this is a perfectly valid behavior for the
// configuration with empty backing storage.
glog.V(2).Infof("Event count has increased by %d != 1.\n"+
"\tOld event: %+v\n\tNew event: %+v", newEvent.Count-oldCount, oldEvent, newEvent)
}

func (s *sdSink) OnUpdate(_ *corev1.Event, newEvent *corev1.Event) {
receivedEntryCount.Inc()

logEntry := s.logEntryFactory.FromEvent(newEvent)
Expand All @@ -101,10 +86,9 @@ func (s *sdSink) OnDelete(*corev1.Event) {

// OnList logs a message indicating that the Event Exporter starts upon
// receiving the first list of events.
func (s *sdSink) OnList(list *corev1.EventList) {
func (s *sdSink) OnList(*corev1.EventList) {
if s.beforeFirstList {
receivedEntryCount.Inc()

entry := s.logEntryFactory.FromMessage("Event exporter started watching. " +
"Some events may have been lost up to this point.")
s.writer.Write([]*sd.LogEntry{entry}, s.logName, s.sdResourceFactory.defaultResource)
Expand Down
3 changes: 3 additions & 0 deletions event-exporter/watchers/events/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func NewEventWatcher(client kubernetes.Interface, config *EventWatcherConfig) wa
// List and watch events in all namespaces.
ListerWatcher: &cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
// Only return 1 item to help Reflector retrieve ResourceVersion to reestablish
// Watch.
options.Limit = 1
list, err := client.CoreV1().Events(meta_v1.NamespaceAll).List(context.TODO(), options)
if err == nil {
config.OnList(list)
Expand Down
19 changes: 1 addition & 18 deletions event-exporter/watchers/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,16 @@ type watcherStore struct {
}

func (s *watcherStore) Add(obj interface{}) error {
if err := s.Store.Add(obj); err != nil {
return err
}
s.handler.OnAdd(obj)
return nil
}

func (s *watcherStore) Update(obj interface{}) error {
oldObj, ok, err := s.Store.Get(obj)
if err != nil {
return err
}
if !ok {
oldObj = nil
}

if err = s.Store.Update(obj); err != nil {
return err
}
s.handler.OnUpdate(oldObj, obj)
s.handler.OnUpdate(nil, obj)
return nil
}

func (s *watcherStore) Delete(obj interface{}) error {
if err := s.Store.Delete(obj); err != nil {
return err
}
s.handler.OnDelete(obj)
return nil
}
Expand Down

0 comments on commit f1e933a

Please sign in to comment.