diff --git a/pkg/search/proxy/store/multi_cluster_cache.go b/pkg/search/proxy/store/multi_cluster_cache.go index 81b9c6f62abc..01a6e255d78c 100644 --- a/pkg/search/proxy/store/multi_cluster_cache.go +++ b/pkg/search/proxy/store/multi_cluster_cache.go @@ -334,15 +334,44 @@ func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionRe if cache == nil { continue } - w, err := cache.Watch(ctx, options) - if err != nil { + + // The following logic adds a 30-second timeout to prevent watch requests to member clusters from hanging, + // which could cause the client watch to hang. + watchChan := make(chan watch.Interface, 1) + errChan := make(chan error, 1) + + go func(cluster string) { + w, err := cache.Watch(ctx, options) + if err != nil { + select { + case errChan <- fmt.Errorf("failed to start watch for resource %v in cluster %q: %v", gvr.String(), cluster, err): + case <-ctx.Done(): + } + return + } + + select { + case watchChan <- w: + case <-ctx.Done(): + w.Stop() + } + }(cluster) + + select { + case w := <-watchChan: + mux.AddSource(w, func(e watch.Event) { + setObjectResourceVersionFunc(cluster, e.Object) + addCacheSourceAnnotation(e.Object, cluster) + }) + case err := <-errChan: + // If the watch request fails, return the error, and the client will retry. return nil, err + case <-time.After(30 * time.Second): + // If the watch request times out, return an error, and the client will retry. + return nil, fmt.Errorf("timeout waiting for watch for resource %v in cluster %q", gvr.String(), cluster) + case <-ctx.Done(): + return nil, ctx.Err() } - - mux.AddSource(w, func(e watch.Event) { - setObjectResourceVersionFunc(cluster, e.Object) - addCacheSourceAnnotation(e.Object, cluster) - }) } mux.Start() return mux, nil