From 0402af1524472997f6bae9e64c3a26f2f3c535a8 Mon Sep 17 00:00:00 2001 From: closetool Date: Thu, 15 Jun 2023 12:27:17 +0800 Subject: [PATCH 1/2] fix: informer stuck Signed-off-by: closetool --- core/cmd/cmd.go | 2 ++ pkg/jobs/k8sevent/k8sevent.go | 2 -- pkg/regioninformers/regioninformers.go | 30 ++++++++++++++++++++++++-- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/core/cmd/cmd.go b/core/cmd/cmd.go index 61dc7c8e..66533b94 100644 --- a/core/cmd/cmd.go +++ b/core/cmd/cmd.go @@ -28,6 +28,7 @@ import ( "regexp" "sync" "syscall" + "time" "github.com/horizoncd/horizon/core/config" accessctl "github.com/horizoncd/horizon/core/controller/access" @@ -439,6 +440,7 @@ func Init(ctx context.Context, flags *Flags, coreConfig *config.Config) { grafanaService := grafana.NewService(coreConfig.GrafanaConfig, manager, client) regionInformers := regioninformers.NewRegionInformers(manager.RegionMgr, 0) regionInformers.Register(workload.Resources...) + go regionInformers.WatchDB(ctx, 60*time.Second) parameter := ¶m.Param{ Manager: manager, OauthManager: oauthManager, diff --git a/pkg/jobs/k8sevent/k8sevent.go b/pkg/jobs/k8sevent/k8sevent.go index bcc54a3a..bb318303 100644 --- a/pkg/jobs/k8sevent/k8sevent.go +++ b/pkg/jobs/k8sevent/k8sevent.go @@ -62,8 +62,6 @@ func New(config k8sevent.Config, informers *regioninformers.RegionInformers, func (v *SuperVisor) Run(ctx context.Context) { v.informers.Register(regioninformers.Resource{GVR: gvrEvent, MakeHandler: v.newEventHandler}) - - v.informers.WatchDB(ctx, 60*time.Second) } type EventWithTime struct { diff --git a/pkg/regioninformers/regioninformers.go b/pkg/regioninformers/regioninformers.go index 4adba26a..eb956bba 100644 --- a/pkg/regioninformers/regioninformers.go +++ b/pkg/regioninformers/regioninformers.go @@ -6,6 +6,7 @@ import ( "sync" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/metadata" "k8s.io/client-go/rest" @@ -323,10 +324,14 @@ func (f *RegionInformers) ensureGVR(regionID uint, gvr schema.GroupVersionResour return nil } -func (f *RegionInformers) ensureDynamicGVR(regionID uint, gvr schema.GroupVersionResource) { +func (f *RegionInformers) ensureDynamicGVR(regionID uint, gvr schema.GroupVersionResource) error { if !f.whetherGVRExist(regionID, gvr) { + if !f.resourceExistInK8S(gvr, f.clients[regionID]) { + return fmt.Errorf("resource %s not exist in region %d", gvr.String(), regionID) + } f.watchDynamicGvr(regionID, gvr) } + return nil } type InformerOperation func(informer informers.GenericInformer) error @@ -374,7 +379,9 @@ func (f *RegionInformers) GetDynamicInformer(regionID uint, return err } - f.ensureDynamicGVR(regionID, gvr) + if err := f.ensureDynamicGVR(regionID, gvr); err != nil { + return err + } f.mu.RLock() defer f.mu.RUnlock() @@ -422,11 +429,30 @@ func (f *RegionInformers) Register(handlers ...Resource) { } } +func (f *RegionInformers) resourceExistInK8S(gvr schema.GroupVersionResource, client *RegionClient) bool { + if client == nil { + return false + } + _, err := client.dynamicClientset.Resource(gvr).List(context.Background(), metav1.ListOptions{}) + if err != nil { + log.Errorf(context.Background(), + "list %s failed: %v\ngvr %s not exist in region %d", + gvr, err, gvr, client.regionID) + return false + } + return true +} + func (f *RegionInformers) registerHandler(client *RegionClient) { for i, handler := range f.handlers { if _, ok := client.handlers[i]; ok { continue } + + if !f.resourceExistInK8S(handler.GVR, client) { + continue + } + informer := client.dynamicFactory.ForResource(handler.GVR) if handler.MakeHandler != nil { eventHandler, err := handler.MakeHandler(client.regionID, client.stopCh) From 00829d0cc3ad1c1abc948c8bd9aaf3cde11a7c5b Mon Sep 17 00:00:00 2001 From: closetool Date: Thu, 15 Jun 2023 14:32:22 +0800 Subject: [PATCH 2/2] fix: using discovery client Signed-off-by: closetool --- pkg/regioninformers/regioninformers.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/pkg/regioninformers/regioninformers.go b/pkg/regioninformers/regioninformers.go index eb956bba..39367c1c 100644 --- a/pkg/regioninformers/regioninformers.go +++ b/pkg/regioninformers/regioninformers.go @@ -6,7 +6,7 @@ import ( "sync" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/discovery" "k8s.io/client-go/metadata" "k8s.io/client-go/rest" @@ -35,6 +35,7 @@ type RegionClient struct { dynamicFactory dynamicinformer.DynamicSharedInformerFactory clientset kubernetes.Interface dynamicClientset dynamic.Interface + discoveryClient *discovery.DiscoveryClient handlers map[int]struct{} mapper meta.RESTMapper stopCh chan struct{} @@ -111,6 +112,8 @@ func (f *RegionInformers) NewRegionInformers(region *models.Region) error { return err } + discoveryClient := discovery.NewDiscoveryClient(clientSet.RESTClient()) + factory := informers.NewSharedInformerFactory(clientSet, f.defaultResync) dynamicClientSet, err := dynamic.NewForConfig(restConfig) @@ -136,6 +139,7 @@ func (f *RegionInformers) NewRegionInformers(region *models.Region) error { factory: factory, dynamicFactory: dynamicFactory, clientset: clientSet, + discoveryClient: discoveryClient, dynamicClientset: dynamicClientSet, handlers: make(map[int]struct{}), mapper: mapper, @@ -433,14 +437,20 @@ func (f *RegionInformers) resourceExistInK8S(gvr schema.GroupVersionResource, cl if client == nil { return false } - _, err := client.dynamicClientset.Resource(gvr).List(context.Background(), metav1.ListOptions{}) + + apiResourceList, err := client.discoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String()) if err != nil { - log.Errorf(context.Background(), - "list %s failed: %v\ngvr %s not exist in region %d", - gvr, err, gvr, client.regionID) + log.Errorf(context.Background(), "list api resources failed: %v", err) return false } - return true + + for _, apiResource := range apiResourceList.APIResources { + if apiResource.Name == gvr.Resource { + return true + } + } + + return false } func (f *RegionInformers) registerHandler(client *RegionClient) {