Skip to content

Commit

Permalink
handled the context and goroutine internally
Browse files Browse the repository at this point in the history
  • Loading branch information
ShwethaKumbla committed May 30, 2022
1 parent 8339ef1 commit 1f5c2a6
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 28 deletions.
2 changes: 1 addition & 1 deletion examples/watch_resources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dep := appsv1.Deployment{
}

// watch for the deployment and triger action based on the event recieved.
go cl.Resources().Watch(&appsv1.DeploymentList{}, &client.ListOptions{
cl.Resources().Watch(&appsv1.DeploymentList{}, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", dep.Name),
Namespace: dep.Namespace}, cl.RESTConfig()).WithAddFunc(onAdd).WithDeleteFunc(onDelete).Start(ctx)
...
Expand Down
12 changes: 5 additions & 7 deletions examples/watch_resources/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestWatchForResources(t *testing.T) {
}

// watch for the deployment and triger action based on the event recieved.
go cl.Resources().Watch(&appsv1.DeploymentList{}, resources.WithFieldSelector(labels.FormatLabels(map[string]string{"metadata.name": dep.Name}))).
cl.Resources().Watch(&appsv1.DeploymentList{}, resources.WithFieldSelector(labels.FormatLabels(map[string]string{"metadata.name": dep.Name}))).
WithAddFunc(onAdd).WithDeleteFunc(onDelete).Start(ctx)

return ctx
Expand Down Expand Up @@ -96,12 +96,10 @@ func TestWatchForResourcesWithStop(t *testing.T) {
w = cl.Resources().Watch(&appsv1.DeploymentList{}, resources.WithFieldSelector(labels.FormatLabels(map[string]string{"metadata.name": dep.Name}))).
WithAddFunc(onAdd).WithDeleteFunc(onDelete)

go func() {
err := w.Start(ctx)
if err != nil {
t.Error(err)
}
}()
err = w.Start(ctx)
if err != nil {
t.Error(err)
}

return ctx
}).
Expand Down
54 changes: 34 additions & 20 deletions klient/k8s/watcher/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type EventHandler interface {
// Start triggers the registered methods based on the event received for
// particular k8s resources
func (e *EventHandlerFuncs) Start(ctx context.Context) error {
// check if context is valid and that it has not been cancelled.
if ctx.Err() != nil {
return ctx.Err()
}

cl, err := cr.NewWithWatch(e.Cfg, cr.Options{})
if err != nil {
return err
Expand All @@ -62,28 +67,37 @@ func (e *EventHandlerFuncs) Start(ctx context.Context) error {
// set watcher object
e.watcher = w

for event := range e.watcher.ResultChan() {
// retrieve the event type
eventType := event.Type

switch eventType {
case watch.Added:
// calls AddFunc if it's not nil.
if e.addFunc != nil {
e.addFunc(event.Object)
}
case watch.Modified:
// calls UpdateFunc if it's not nil.
if e.updateFunc != nil {
e.updateFunc(event.Object)
}
case watch.Deleted:
// calls DeleteFunc if it's not nil.
if e.deleteFunc != nil {
e.deleteFunc(event.Object)
go func() {
for {
select {
case <-ctx.Done():
if ctx.Err() != nil {
return
}
case event := <-e.watcher.ResultChan():
// retrieve the event type
eventType := event.Type

switch eventType {
case watch.Added:
// calls AddFunc if it's not nil.
if e.addFunc != nil {
e.addFunc(event.Object)
}
case watch.Modified:
// calls UpdateFunc if it's not nil.
if e.updateFunc != nil {
e.updateFunc(event.Object)
}
case watch.Deleted:
// calls DeleteFunc if it's not nil.
if e.deleteFunc != nil {
e.deleteFunc(event.Object)
}
}
}
}
}
}()

return nil
}
Expand Down

0 comments on commit 1f5c2a6

Please sign in to comment.