From 1f5c2a64e418a870f58e922e836cc7ae76372cc4 Mon Sep 17 00:00:00 2001 From: Shwetha K Date: Mon, 30 May 2022 20:23:03 +0530 Subject: [PATCH] handled the context and goroutine internally --- examples/watch_resources/README.md | 2 +- examples/watch_resources/watch_test.go | 12 +++--- klient/k8s/watcher/watch.go | 54 ++++++++++++++++---------- 3 files changed, 40 insertions(+), 28 deletions(-) diff --git a/examples/watch_resources/README.md b/examples/watch_resources/README.md index 311b8843..a664a250 100644 --- a/examples/watch_resources/README.md +++ b/examples/watch_resources/README.md @@ -14,7 +14,7 @@ dep := appsv1.Deployment{ } // watch for the deployment and triger action based on the event recieved. - go cl.Resources().Watch(&appsv1.DeploymentList{}, &client.ListOptions{ + cl.Resources().Watch(&appsv1.DeploymentList{}, &client.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", dep.Name), Namespace: dep.Namespace}, cl.RESTConfig()).WithAddFunc(onAdd).WithDeleteFunc(onDelete).Start(ctx) ... diff --git a/examples/watch_resources/watch_test.go b/examples/watch_resources/watch_test.go index beb26f36..c868887c 100644 --- a/examples/watch_resources/watch_test.go +++ b/examples/watch_resources/watch_test.go @@ -44,7 +44,7 @@ func TestWatchForResources(t *testing.T) { } // watch for the deployment and triger action based on the event recieved. - go cl.Resources().Watch(&appsv1.DeploymentList{}, resources.WithFieldSelector(labels.FormatLabels(map[string]string{"metadata.name": dep.Name}))). + cl.Resources().Watch(&appsv1.DeploymentList{}, resources.WithFieldSelector(labels.FormatLabels(map[string]string{"metadata.name": dep.Name}))). WithAddFunc(onAdd).WithDeleteFunc(onDelete).Start(ctx) return ctx @@ -96,12 +96,10 @@ func TestWatchForResourcesWithStop(t *testing.T) { w = cl.Resources().Watch(&appsv1.DeploymentList{}, resources.WithFieldSelector(labels.FormatLabels(map[string]string{"metadata.name": dep.Name}))). WithAddFunc(onAdd).WithDeleteFunc(onDelete) - go func() { - err := w.Start(ctx) - if err != nil { - t.Error(err) - } - }() + err = w.Start(ctx) + if err != nil { + t.Error(err) + } return ctx }). diff --git a/klient/k8s/watcher/watch.go b/klient/k8s/watcher/watch.go index c498b047..96af993e 100644 --- a/klient/k8s/watcher/watch.go +++ b/klient/k8s/watcher/watch.go @@ -49,6 +49,11 @@ type EventHandler interface { // Start triggers the registered methods based on the event received for // particular k8s resources func (e *EventHandlerFuncs) Start(ctx context.Context) error { + // check if context is valid and that it has not been cancelled. + if ctx.Err() != nil { + return ctx.Err() + } + cl, err := cr.NewWithWatch(e.Cfg, cr.Options{}) if err != nil { return err @@ -62,28 +67,37 @@ func (e *EventHandlerFuncs) Start(ctx context.Context) error { // set watcher object e.watcher = w - for event := range e.watcher.ResultChan() { - // retrieve the event type - eventType := event.Type - - switch eventType { - case watch.Added: - // calls AddFunc if it's not nil. - if e.addFunc != nil { - e.addFunc(event.Object) - } - case watch.Modified: - // calls UpdateFunc if it's not nil. - if e.updateFunc != nil { - e.updateFunc(event.Object) - } - case watch.Deleted: - // calls DeleteFunc if it's not nil. - if e.deleteFunc != nil { - e.deleteFunc(event.Object) + go func() { + for { + select { + case <-ctx.Done(): + if ctx.Err() != nil { + return + } + case event := <-e.watcher.ResultChan(): + // retrieve the event type + eventType := event.Type + + switch eventType { + case watch.Added: + // calls AddFunc if it's not nil. + if e.addFunc != nil { + e.addFunc(event.Object) + } + case watch.Modified: + // calls UpdateFunc if it's not nil. + if e.updateFunc != nil { + e.updateFunc(event.Object) + } + case watch.Deleted: + // calls DeleteFunc if it's not nil. + if e.deleteFunc != nil { + e.deleteFunc(event.Object) + } + } } } - } + }() return nil }