diff --git a/adapters/incluster/v1/adapter.go b/adapters/incluster/v1/adapter.go index 8312dd7..455b35a 100644 --- a/adapters/incluster/v1/adapter.go +++ b/adapters/incluster/v1/adapter.go @@ -120,7 +120,7 @@ func (a *Adapter) Start(ctx context.Context) error { go func() { if err := backoff.RetryNotify(func() error { return client.Start(ctx) - }, utils.NewBackOff(), func(err error, d time.Duration) { + }, utils.NewBackOff(true), func(err error, d time.Duration) { logger.L().Ctx(ctx).Warning("start client", helpers.Error(err), helpers.String("resource", client.res.Resource), helpers.String("retry in", d.String())) diff --git a/adapters/incluster/v1/client.go b/adapters/incluster/v1/client.go index 0f5aaa8..b66906a 100644 --- a/adapters/incluster/v1/client.go +++ b/adapters/incluster/v1/client.go @@ -119,7 +119,7 @@ func (c *Client) Start(ctx context.Context) error { var err error watchOpts.ResourceVersion, err = c.getExistingStorageObjects(ctx) return err - }, utils.NewBackOff(), func(err error, d time.Duration) { + }, utils.NewBackOff(true), func(err error, d time.Duration) { logger.L().Ctx(ctx).Warning("get existing storage objects", helpers.Error(err), helpers.String("resource", c.res.Resource), helpers.String("retry in", d.String())) @@ -227,7 +227,7 @@ func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, e } eventQueue.Enqueue(event) } - }, utils.NewBackOff(), func(err error, d time.Duration) { + }, utils.NewBackOff(true), func(err error, d time.Duration) { if !errors.Is(err, errWatchClosed) { logger.L().Ctx(ctx).Warning("watch", helpers.Error(err), helpers.String("resource", c.res.Resource), diff --git a/cmd/client/main.go b/cmd/client/main.go index ed6de94..1de9da5 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -130,7 +130,7 @@ func main() { return backoff.Permanent(fmt.Errorf("server rejected our client version <%s>, please update", version)) } return err - }, utils.NewBackOff(), func(err error, d time.Duration) { + }, utils.NewBackOff(false), func(err error, d time.Duration) { logger.L().Ctx(ctx).Warning("connection error", helpers.Error(err), helpers.String("retry in", d.String())) }); err != nil { diff --git a/core/synchronizer.go b/core/synchronizer.go index ad3a695..e8725b0 100644 --- a/core/synchronizer.go +++ b/core/synchronizer.go @@ -97,7 +97,7 @@ func (s *Synchronizer) sendData(ctx context.Context, data []byte) { } } return nil - }, utils.NewBackOff(), func(err error, d time.Duration) { + }, utils.NewBackOff(true), func(err error, d time.Duration) { logger.L().Ctx(ctx).Warning("send data", helpers.Error(err), helpers.String("retry in", d.String())) }); err != nil { @@ -463,7 +463,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { return fmt.Errorf("invoke inPool: %w", err) } return nil - }, utils.NewBackOff(), func(err error, d time.Duration) { + }, utils.NewBackOff(true), func(err error, d time.Duration) { logger.L().Ctx(ctx).Warning("process incoming messages", helpers.Error(err), helpers.String("retry in", d.String())) }); err != nil { return fmt.Errorf("giving up process incoming messages: %w", err) diff --git a/utils/utils.go b/utils/utils.go index ac1a9a0..3897003 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -241,10 +241,12 @@ func StringValueBigger(s1, s2 string) bool { return i1 > i2 } -func NewBackOff() backoff.BackOff { +func NewBackOff(neverStop bool) backoff.BackOff { b := backoff.NewExponentialBackOff() // never stop retrying (unless PermanentError is returned) - b.MaxElapsedTime = 0 + if neverStop { + b.MaxElapsedTime = 0 + } return b }