From 5cb146df3ecfdaf337a1fbc5ba969b9e1ea1c337 Mon Sep 17 00:00:00 2001 From: Petr Muller Date: Fri, 25 Apr 2025 15:47:50 +0200 Subject: [PATCH 1/2] refactor(start): process always enable caps in constructor --- pkg/start/start.go | 36 ++++++++++++++++------------- pkg/start/start_integration_test.go | 24 ++++++++----------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/pkg/start/start.go b/pkg/start/start.go index daee3c77f..d49f9ac65 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -86,9 +86,11 @@ type Options struct { HyperShift bool - // AlwaysEnableCapabilities is a list of cluster version capabilities - // which will always be implicitly enabled. + // AlwaysEnableCapabilities is user-provided list of cluster version capabilities to be always be implicitly enabled AlwaysEnableCapabilities []string + // alwaysEnableCapabilities is the parsed list of cluster version capabilities to be always be implicitly enabled, + // guaranteed to contain only known capabilities. + alwaysEnableCapabilities []configv1.ClusterVersionCapability // for testing only Name string @@ -158,9 +160,8 @@ func (o *Options) Run(ctx context.Context) error { if len(o.Exclude) > 0 { klog.Infof("Excluding manifests for %q", o.Exclude) } - alwaysEnableCaps, unknownCaps := parseAlwaysEnableCapabilities(o.AlwaysEnableCapabilities) - if len(unknownCaps) > 0 { - return fmt.Errorf("--always-enable-capabilities was set with unknown capabilities: %v", unknownCaps) + if err := o.parseAlwaysEnableCapabilities(); err != nil { + return fmt.Errorf("--always-enable-capability: %w", err) } // Inject the cluster ID into PromQL queries in HyperShift @@ -185,7 +186,7 @@ func (o *Options) Run(ctx context.Context) error { } // initialize the controllers and attempt to load the payload information - controllerCtx, err := o.NewControllerContext(cb, alwaysEnableCaps) + controllerCtx, err := o.NewControllerContext(cb) if err != nil { return err } @@ -470,7 +471,7 @@ type Context struct { // NewControllerContext initializes the default Context for the current Options. It does // not start any background processes. -func (o *Options) NewControllerContext(cb *ClientBuilder, alwaysEnableCapabilities []configv1.ClusterVersionCapability) (*Context, error) { +func (o *Options) NewControllerContext(cb *ClientBuilder) (*Context, error) { client := cb.ClientOrDie("shared-informer") kubeClient := cb.KubeClientOrDie(internal.ConfigNamespace, useProtobuf) operatorClient := cb.OperatorClientOrDie("operator-client") @@ -511,7 +512,7 @@ func (o *Options) NewControllerContext(cb *ClientBuilder, alwaysEnableCapabiliti o.PromQLTarget, o.InjectClusterIdIntoPromQL, o.UpdateService, - alwaysEnableCapabilities, + o.alwaysEnableCapabilities, ) if err != nil { return nil, err @@ -620,16 +621,14 @@ func (c *Context) InitializeFromPayload(ctx context.Context, restConfig *rest.Co // parseAlwaysEnableCapabilities parses the string list of capabilities // into two lists of configv1.ClusterVersionCapability: known and unknown. -func parseAlwaysEnableCapabilities(caps []string) ([]configv1.ClusterVersionCapability, []configv1.ClusterVersionCapability) { - var ( - knownCaps []configv1.ClusterVersionCapability - unknownCaps []configv1.ClusterVersionCapability - ) - for _, c := range caps { +func (o *Options) parseAlwaysEnableCapabilities() error { + var unknownCaps []configv1.ClusterVersionCapability + + for _, c := range o.AlwaysEnableCapabilities { known := false for _, kc := range configv1.KnownClusterVersionCapabilities { if configv1.ClusterVersionCapability(c) == kc { - knownCaps = append(knownCaps, kc) + o.alwaysEnableCapabilities = append(o.alwaysEnableCapabilities, kc) known = true break } @@ -638,5 +637,10 @@ func parseAlwaysEnableCapabilities(caps []string) ([]configv1.ClusterVersionCapa unknownCaps = append(unknownCaps, configv1.ClusterVersionCapability(c)) } } - return knownCaps, unknownCaps + + if len(unknownCaps) > 0 { + return fmt.Errorf("unknown capabilities: %v", unknownCaps) + } + + return nil } diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index 86704f1a8..4e2162f48 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -184,15 +184,13 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) { options.ReleaseImage = payloadImage1 options.PayloadOverride = filepath.Join(dir, "0.0.1") options.leaderElection = getLeaderElectionConfig(ctx, cfg) - alwaysEnableCapabilities := []configv1.ClusterVersionCapability{ - configv1.ClusterVersionCapabilityIngress, - } - controllers, err := options.NewControllerContext(cb, alwaysEnableCapabilities) + options.alwaysEnableCapabilities = []configv1.ClusterVersionCapability{configv1.ClusterVersionCapabilityIngress} + controllers, err := options.NewControllerContext(cb) if err != nil { t.Fatal(err) } - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", "", record.NewFakeRecorder(100), payload.DefaultClusterProfile, alwaysEnableCapabilities) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", "", record.NewFakeRecorder(100), payload.DefaultClusterProfile, options.alwaysEnableCapabilities) controllers.CVO.SetSyncWorkerForTesting(worker) lock, err := createResourceLock(cb, options.Namespace, options.Name) @@ -318,15 +316,13 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) { options.ReleaseImage = payloadImage1 options.PayloadOverride = filepath.Join(dir, "0.0.1") options.leaderElection = getLeaderElectionConfig(ctx, cfg) - alwaysEnableCapabilities := []configv1.ClusterVersionCapability{ - configv1.ClusterVersionCapabilityIngress, - } - controllers, err := options.NewControllerContext(cb, alwaysEnableCapabilities) + options.alwaysEnableCapabilities = []configv1.ClusterVersionCapability{configv1.ClusterVersionCapabilityIngress} + controllers, err := options.NewControllerContext(cb) if err != nil { t.Fatal(err) } - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", "", record.NewFakeRecorder(100), payload.DefaultClusterProfile, alwaysEnableCapabilities) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", "", record.NewFakeRecorder(100), payload.DefaultClusterProfile, options.alwaysEnableCapabilities) controllers.CVO.SetSyncWorkerForTesting(worker) lock, err := createResourceLock(cb, options.Namespace, options.Name) @@ -514,15 +510,13 @@ metadata: options.ReleaseImage = payloadImage1 options.PayloadOverride = payloadDir options.leaderElection = getLeaderElectionConfig(ctx, cfg) - alwaysEnableCapabilities := []configv1.ClusterVersionCapability{ - configv1.ClusterVersionCapabilityIngress, - } - controllers, err := options.NewControllerContext(cb, alwaysEnableCapabilities) + options.alwaysEnableCapabilities = []configv1.ClusterVersionCapability{configv1.ClusterVersionCapabilityIngress} + controllers, err := options.NewControllerContext(cb) if err != nil { t.Fatal(err) } - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", "", record.NewFakeRecorder(100), payload.DefaultClusterProfile, alwaysEnableCapabilities) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", "", record.NewFakeRecorder(100), payload.DefaultClusterProfile, options.alwaysEnableCapabilities) controllers.CVO.SetSyncWorkerForTesting(worker) lock, err := createResourceLock(cb, options.Namespace, options.Name) From 0b85c403f1d2b792c135012448c25daf1c0fa7ad Mon Sep 17 00:00:00 2001 From: Petr Muller Date: Fri, 25 Apr 2025 18:18:45 +0200 Subject: [PATCH 2/2] poc(ocpbugs-30080): read OCP version from payload metadada --- pkg/cvo/cvo.go | 30 ++--- pkg/featuregates/featuregates.go | 30 ----- pkg/payload/payload.go | 39 +++--- pkg/start/start.go | 199 +++++++++++++++------------- pkg/start/start_integration_test.go | 11 +- 5 files changed, 151 insertions(+), 158 deletions(-) diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index a043ce972..4b4674e5c 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -175,6 +175,7 @@ type Operator struct { exclude string enabledFeatureGates featuregates.CvoGateChecker + requiredFeatureSet configv1.FeatureSet clusterProfile string uid types.UID @@ -210,6 +211,8 @@ func New( injectClusterIdIntoPromQL bool, updateService string, alwaysEnableCapabilities []configv1.ClusterVersionCapability, + startingFeatureSet configv1.FeatureSet, + cvoGates featuregates.CvoGateChecker, ) (*Operator, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) @@ -246,7 +249,9 @@ func New( // Because of OCPBUGS-30080, we can only detect the enabled feature gates after Operator loads the initial payload // from disk via LoadInitialPayload. We must not have any gate-checking code until that happens, so we initialize // this field with a checker that panics when used. - enabledFeatureGates: featuregates.PanicOnUsageBeforeInitialization, + enabledFeatureGates: cvoGates, + requiredFeatureSet: startingFeatureSet, + alwaysEnableCapabilities: alwaysEnableCapabilities, } @@ -283,7 +288,7 @@ func New( // LoadInitialPayload waits until a ClusterVersion object exists. It then retrieves the payload contents, verifies the // initial state and returns it. If the payload is invalid, an error is returned. -func (optr *Operator) LoadInitialPayload(ctx context.Context, startingRequiredFeatureSet configv1.FeatureSet, restConfig *rest.Config) (*payload.Update, error) { +func (optr *Operator) LoadInitialPayload(ctx context.Context, restConfig, burstRestConfig *rest.Config) error { // wait until cluster version object exists if err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(ctx context.Context) (bool, error) { @@ -300,19 +305,19 @@ func (optr *Operator) LoadInitialPayload(ctx context.Context, startingRequiredFe } return true, nil }); err != nil { - return nil, fmt.Errorf("Error when attempting to get cluster version object: %w", err) + return fmt.Errorf("Error when attempting to get cluster version object: %w", err) } - update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.release.Image, optr.exclude, string(startingRequiredFeatureSet), + update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.release.Image, optr.exclude, string(optr.requiredFeatureSet), optr.clusterProfile, configv1.KnownClusterVersionCapabilities) if err != nil { - return nil, fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err) + return fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err) } httpClientConstructor := sigstore.NewCachedHTTPClientConstructor(optr.HTTPClient, nil) configClient, err := coreclientsetv1.NewForConfig(restConfig) if err != nil { - return nil, fmt.Errorf("unable to create a configuration client: %v", err) + return fmt.Errorf("unable to create a configuration client: %v", err) } customSignatureStore := &customsignaturestore.Store{ @@ -324,7 +329,7 @@ func (optr *Operator) LoadInitialPayload(ctx context.Context, startingRequiredFe // attempt to load a verifier as defined in the payload verifier, signatureStore, err := loadConfigMapVerifierDataFromUpdate(update, httpClientConstructor.HTTPClient, configClient, customSignatureStore) if err != nil { - return nil, err + return err } if verifier != nil { klog.Infof("Verifying release authenticity: %v", verifier) @@ -334,13 +339,6 @@ func (optr *Operator) LoadInitialPayload(ctx context.Context, startingRequiredFe } optr.verifier = verifier optr.signatureStore = signatureStore - return update, nil -} - -// InitializeFromPayload configures the controller that loads and applies content to the cluster given an initial payload -// and feature gate data. -func (optr *Operator) InitializeFromPayload(update *payload.Update, requiredFeatureSet configv1.FeatureSet, cvoFlags featuregates.CvoGateChecker, restConfig *rest.Config, burstRestConfig *rest.Config) { - optr.enabledFeatureGates = cvoFlags optr.release = update.Release optr.releaseCreated = update.ImageRef.CreationTimestamp.Time @@ -358,11 +356,13 @@ func (optr *Operator) InitializeFromPayload(update *payload.Update, requiredFeat Cap: time.Second * 15, }, optr.exclude, - requiredFeatureSet, + optr.requiredFeatureSet, optr.eventRecorder, optr.clusterProfile, optr.alwaysEnableCapabilities, ) + + return nil } // ownerReferenceModifier sets the owner reference to the current CV resource if no other reference exists. It also resets diff --git a/pkg/featuregates/featuregates.go b/pkg/featuregates/featuregates.go index b91bcb9b4..9a17a1b36 100644 --- a/pkg/featuregates/featuregates.go +++ b/pkg/featuregates/featuregates.go @@ -35,36 +35,6 @@ type CvoGateChecker interface { CVOConfiguration() bool } -type panicOnUsageBeforeInitializationFunc func() - -func panicOnUsageBeforeInitialization() { - panic("CVO feature flags were used before they were initialized") -} - -// PanicOnUsageBeforeInitialization is a CvoGateChecker that panics if any of its methods are called. This checker should -// be used before CVO feature gates are actually known and some code tries to check them. -var PanicOnUsageBeforeInitialization = panicOnUsageBeforeInitializationFunc(panicOnUsageBeforeInitialization) - -func (p panicOnUsageBeforeInitializationFunc) ReconciliationIssuesCondition() bool { - p() - return false -} - -func (p panicOnUsageBeforeInitializationFunc) StatusReleaseArchitecture() bool { - p() - return false -} - -func (p panicOnUsageBeforeInitializationFunc) UnknownVersion() bool { - p() - return false -} - -func (p panicOnUsageBeforeInitializationFunc) CVOConfiguration() bool { - p() - return false -} - // CvoGates contains flags that control CVO functionality gated by product feature gates. The // names do not correspond to product feature gates, the booleans here are "smaller" (product-level // gate will enable multiple CVO behaviors). diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index a0730dc04..e09398007 100644 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -141,11 +141,23 @@ type metadata struct { func LoadUpdate(dir, releaseImage, excludeIdentifier string, requiredFeatureSet string, profile string, knownCapabilities []configv1.ClusterVersionCapability) (*Update, error) { - payload, tasks, err := loadUpdatePayloadMetadata(dir, releaseImage, profile) + klog.V(2).Infof("Loading updatepayload from %q", dir) + if err := ValidateDirectory(dir); err != nil { + return nil, err + } + + var ( + releaseDir = filepath.Join(dir, ReleaseManifestDir) + cvoDir = filepath.Join(dir, CVOManifestDir) + ) + + payload, err := loadMetadata(releaseDir, releaseImage) if err != nil { return nil, err } + tasks := getPayloadTasks(releaseDir, cvoDir, releaseImage, profile) + var onlyKnownCaps *configv1.ClusterVersionCapabilitiesStatus if knownCapabilities != nil { @@ -299,38 +311,27 @@ type payloadTasks struct { skipFiles sets.Set[string] } -func loadUpdatePayloadMetadata(dir, releaseImage, clusterProfile string) (*Update, []payloadTasks, error) { - klog.V(2).Infof("Loading updatepayload from %q", dir) - if err := ValidateDirectory(dir); err != nil { - return nil, nil, err - } - var ( - cvoDir = filepath.Join(dir, CVOManifestDir) - releaseDir = filepath.Join(dir, ReleaseManifestDir) - ) - - release, arch, err := loadReleaseFromMetadata(releaseDir) +func loadMetadata(releaseDir, releaseImage string) (*Update, error) { + release, arch, err := LoadReleaseFromMetadata(releaseDir) if err != nil { - return nil, nil, err + return nil, err } release.Image = releaseImage imageRef, err := loadImageReferences(releaseDir) if err != nil { - return nil, nil, err + return nil, err } if imageRef.Name != release.Version { - return nil, nil, fmt.Errorf("Version from %s (%s) differs from %s (%s)", imageReferencesFile, imageRef.Name, cincinnatiJSONFile, release.Version) + return nil, fmt.Errorf("Version from %s (%s) differs from %s (%s)", imageReferencesFile, imageRef.Name, cincinnatiJSONFile, release.Version) } - tasks := getPayloadTasks(releaseDir, cvoDir, releaseImage, clusterProfile) - return &Update{ Release: release, ImageRef: imageRef, Architecture: arch, - }, tasks, nil + }, nil } func getPayloadTasks(releaseDir, cvoDir, releaseImage, clusterProfile string) []payloadTasks { @@ -353,7 +354,7 @@ func getPayloadTasks(releaseDir, cvoDir, releaseImage, clusterProfile string) [] }} } -func loadReleaseFromMetadata(releaseDir string) (configv1.Release, string, error) { +func LoadReleaseFromMetadata(releaseDir string) (configv1.Release, string, error) { var release configv1.Release path := filepath.Join(releaseDir, cincinnatiJSONFile) data, err := os.ReadFile(path) diff --git a/pkg/start/start.go b/pkg/start/start.go index d49f9ac65..f54d2b75a 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -9,6 +9,7 @@ import ( "net/url" "os" "os/signal" + "path" "syscall" "time" @@ -34,7 +35,7 @@ import ( configv1 "github.com/openshift/api/config/v1" clientset "github.com/openshift/client-go/config/clientset/versioned" - "github.com/openshift/client-go/config/informers/externalversions" + configexternalversions "github.com/openshift/client-go/config/informers/externalversions" operatorclientset "github.com/openshift/client-go/operator/clientset/versioned" operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions" "github.com/openshift/library-go/pkg/config/clusterstatus" @@ -185,8 +186,11 @@ func (o *Options) Run(ctx context.Context) error { return err } + cvConfigInformer, configInformer := o.prepareConfigInformers(cb) + startingFeatureSet, cvoGates, err := o.initializeFeatureGates(ctx, configInformer) + // initialize the controllers and attempt to load the payload information - controllerCtx, err := o.NewControllerContext(cb) + controllerCtx, err := o.NewControllerContext(cb, startingFeatureSet, cvoGates, cvConfigInformer, configInformer) if err != nil { return err } @@ -195,6 +199,83 @@ func (o *Options) Run(ctx context.Context) error { return nil } +func (o *Options) prepareConfigInformers(cb *ClientBuilder) (configexternalversions.SharedInformerFactory, configexternalversions.SharedInformerFactory) { + client := cb.ClientOrDie("shared-informer") + cvInformer := configexternalversions.NewFilteredSharedInformerFactory(client, resyncPeriod(o.ResyncInterval), "", func(opts *metav1.ListOptions) { + opts.FieldSelector = fmt.Sprintf("metadata.name=%s", o.Name) + }) + sharedInformers := configexternalversions.NewSharedInformerFactory(client, resyncPeriod(o.ResyncInterval)) + + return cvInformer, sharedInformers +} + +func (o *Options) initializeFeatureGates(ctx context.Context, configInformer configexternalversions.SharedInformerFactory) (configv1.FeatureSet, *featuregates.CvoGates, error) { + featureGates := configInformer.Config().V1().FeatureGates().Lister() + configInformer.Start(ctx.Done()) + configInformer.WaitForCacheSync(ctx.Done()) + + var startingFeatureSet configv1.FeatureSet + var clusterFeatureGate *configv1.FeatureGate + + var releaseRoot = "/" + if o.PayloadOverride != "" { + releaseRoot = o.PayloadOverride + } + + releaseMetadata, _, err := payload.LoadReleaseFromMetadata(path.Join(releaseRoot, payload.ReleaseManifestDir)) + if err != nil { + return "", nil, fmt.Errorf("error loading release version: %v", err) + } + + // client-go automatically retries some network blip errors on GETs for 30s by default, and we want to + // retry the remaining ones ourselves. If we fail longer than that, the operator won't be able to do work + // anyway. Return the error and crashloop. + // + // We implement the timeout with a context because the timeout in PollImmediateWithContext does not behave + // well when ConditionFunc takes longer time to execute, like here where the GET can be retried by client-go + var lastError error + if err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 25*time.Second, true, func(ctx context.Context) (bool, error) { + gate, fgErr := featureGates.Get("cluster") + switch { + case apierrors.IsNotFound(fgErr): + // if we have no featuregates, then the cluster is using the default featureset, which is "". + // This excludes everything that could possibly depend on a different feature set. + startingFeatureSet = "" + klog.Infof("FeatureGate not found in cluster, using default feature set %q at startup", startingFeatureSet) + return true, nil + case fgErr != nil: + lastError = fgErr + klog.Warningf("Failed to get FeatureGate from cluster: %v", fgErr) + return false, nil + default: + clusterFeatureGate = gate + startingFeatureSet = gate.Spec.FeatureSet + klog.Infof("FeatureGate found in cluster, using its feature set %q at startup", startingFeatureSet) + return true, nil + } + }); err != nil { + if lastError != nil { + return "", nil, lastError + } + return "", nil, err + } + + var cvoGates featuregates.CvoGates + if clusterFeatureGate != nil { + cvoGates = featuregates.CvoGatesFromFeatureGate(clusterFeatureGate, releaseMetadata.Version) + } else { + cvoGates = featuregates.DefaultCvoGates(releaseMetadata.Version) + } + + if cvoGates.UnknownVersion() { + klog.Infof("CVO features for version %s could not be detected from FeatureGate; will use defaults plus special UnknownVersion feature gate", releaseMetadata.Version) + } + klog.Infof("CVO features for version %s enabled at startup: %+v", releaseMetadata.Version, cvoGates) + + return startingFeatureSet, &cvoGates, nil + +} + // run launches a number of goroutines to handle manifest application, // metrics serving, etc. It continues operating until ctx.Done(), // and then attempts a clean shutdown limited by an internal context @@ -239,6 +320,13 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock resource } } + resultChannelCount++ + go func() { + defer utilruntime.HandleCrash() + err := controllerCtx.StopOnFeatureGateChange.Run(runContext, runCancel) + resultChannel <- asyncResult{name: "stop-on-techpreview-change controller", error: err} + }() + resultChannelCount++ go func() { defer utilruntime.HandleCrash() @@ -260,7 +348,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock resource resultChannel <- asyncResult{name: "metrics server", error: err} }() } - if err := controllerCtx.InitializeFromPayload(runContext, restConfig, burstRestConfig); err != nil { + if err := controllerCtx.CVO.LoadInitialPayload(ctx, restConfig, burstRestConfig); err != nil { if firstError == nil { firstError = err } @@ -276,13 +364,6 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock resource resultChannel <- asyncResult{name: "main operator", error: err} }() - resultChannelCount++ - go func() { - defer utilruntime.HandleCrash() - err := controllerCtx.StopOnFeatureGateChange.Run(runContext, runCancel) - resultChannel <- asyncResult{name: "stop-on-techpreview-change controller", error: err} - }() - if controllerCtx.AutoUpdate != nil { resultChannelCount++ go func() { @@ -460,10 +541,10 @@ type Context struct { AutoUpdate *autoupdate.Controller StopOnFeatureGateChange *featuregates.ChangeStopper - CVInformerFactory externalversions.SharedInformerFactory + CVInformerFactory configexternalversions.SharedInformerFactory OpenshiftConfigInformerFactory informers.SharedInformerFactory OpenshiftConfigManagedInformerFactory informers.SharedInformerFactory - InformerFactory externalversions.SharedInformerFactory + InformerFactory configexternalversions.SharedInformerFactory OperatorInformerFactory operatorexternalversions.SharedInformerFactory fgLister configlistersv1.FeatureGateLister @@ -471,23 +552,20 @@ type Context struct { // NewControllerContext initializes the default Context for the current Options. It does // not start any background processes. -func (o *Options) NewControllerContext(cb *ClientBuilder) (*Context, error) { - client := cb.ClientOrDie("shared-informer") +func (o *Options) NewControllerContext(cb *ClientBuilder, startingFeatureSet configv1.FeatureSet, cvoGates *featuregates.CvoGates, cvConfigInformer, configInformer configexternalversions.SharedInformerFactory) (*Context, error) { + kubeClient := cb.KubeClientOrDie(internal.ConfigNamespace, useProtobuf) operatorClient := cb.OperatorClientOrDie("operator-client") - cvInformer := externalversions.NewFilteredSharedInformerFactory(client, resyncPeriod(o.ResyncInterval), "", func(opts *metav1.ListOptions) { - opts.FieldSelector = fmt.Sprintf("metadata.name=%s", o.Name) - }) openshiftConfigInformer := informers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod(o.ResyncInterval), informers.WithNamespace(internal.ConfigNamespace)) openshiftConfigManagedInformer := informers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod(o.ResyncInterval), informers.WithNamespace(internal.ConfigManagedNamespace)) - sharedInformers := externalversions.NewSharedInformerFactory(client, resyncPeriod(o.ResyncInterval)) + operatorInformerFactory := operatorexternalversions.NewSharedInformerFactoryWithOptions(operatorClient, o.ResyncInterval, operatorexternalversions.WithTweakListOptions(func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", configuration.ClusterVersionOperatorConfigurationName).String() })) - coInformer := sharedInformers.Config().V1().ClusterOperators() + coInformer := configInformer.Config().V1().ClusterOperators() cvoKubeClient := cb.KubeClientOrDie(o.Namespace, useProtobuf) o.PromQLTarget.KubeClient = cvoKubeClient @@ -497,11 +575,11 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) (*Context, error) { o.ReleaseImage, o.PayloadOverride, resyncPeriod(o.ResyncInterval), - cvInformer.Config().V1().ClusterVersions(), + cvConfigInformer.Config().V1().ClusterVersions(), coInformer, openshiftConfigInformer.Core().V1().ConfigMaps(), openshiftConfigManagedInformer.Core().V1().ConfigMaps(), - sharedInformers.Config().V1().Proxies(), + configInformer.Config().V1().Proxies(), operatorInformerFactory, cb.ClientOrDie(o.Namespace), cvoKubeClient, @@ -513,33 +591,36 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) (*Context, error) { o.InjectClusterIdIntoPromQL, o.UpdateService, o.alwaysEnableCapabilities, + startingFeatureSet, + cvoGates, ) if err != nil { return nil, err } - featureChangeStopper, err := featuregates.NewChangeStopper(sharedInformers.Config().V1().FeatureGates()) + featureChangeStopper, err := featuregates.NewChangeStopper(configInformer.Config().V1().FeatureGates()) if err != nil { return nil, err } + featureChangeStopper.SetStartingFeatures(startingFeatureSet, *cvoGates) ctx := &Context{ - CVInformerFactory: cvInformer, + CVInformerFactory: cvConfigInformer, OpenshiftConfigInformerFactory: openshiftConfigInformer, OpenshiftConfigManagedInformerFactory: openshiftConfigManagedInformer, - InformerFactory: sharedInformers, + InformerFactory: configInformer, OperatorInformerFactory: operatorInformerFactory, CVO: cvo, StopOnFeatureGateChange: featureChangeStopper, - fgLister: sharedInformers.Config().V1().FeatureGates().Lister(), + fgLister: configInformer.Config().V1().FeatureGates().Lister(), } if o.EnableAutoUpdate { ctx.AutoUpdate, err = autoupdate.New( o.Namespace, o.Name, - cvInformer.Config().V1().ClusterVersions(), - sharedInformers.Config().V1().ClusterOperators(), + cvConfigInformer.Config().V1().ClusterVersions(), + configInformer.Config().V1().ClusterOperators(), cb.ClientOrDie(o.Namespace), cb.KubeClientOrDie(o.Namespace), ) @@ -555,70 +636,6 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) (*Context, error) { return ctx, nil } -// InitializeFromPayload initializes the CVO and FeatureGate ChangeStoppers controllers from the payload. It extracts the -// current CVO version from the initial payload and uses it to determine the initial the required featureset and enabled -// feature gates. Both the payload and determined feature information are used to initialize CVO and feature gate -// ChangeStopper controllers. -func (c *Context) InitializeFromPayload(ctx context.Context, restConfig *rest.Config, burstRestConfig *rest.Config) error { - var startingFeatureSet configv1.FeatureSet - var clusterFeatureGate *configv1.FeatureGate - - // client-go automatically retries some network blip errors on GETs for 30s by default, and we want to - // retry the remaining ones ourselves. If we fail longer than that, the operator won't be able to do work - // anyway. Return the error and crashloop. - // - // We implement the timeout with a context because the timeout in PollImmediateWithContext does not behave - // well when ConditionFunc takes longer time to execute, like here where the GET can be retried by client-go - var lastError error - if err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 25*time.Second, true, func(ctx context.Context) (bool, error) { - gate, fgErr := c.fgLister.Get("cluster") - switch { - case apierrors.IsNotFound(fgErr): - // if we have no featuregates, then the cluster is using the default featureset, which is "". - // This excludes everything that could possibly depend on a different feature set. - startingFeatureSet = "" - klog.Infof("FeatureGate not found in cluster, using default feature set %q at startup", startingFeatureSet) - return true, nil - case fgErr != nil: - lastError = fgErr - klog.Warningf("Failed to get FeatureGate from cluster: %v", fgErr) - return false, nil - default: - clusterFeatureGate = gate - startingFeatureSet = gate.Spec.FeatureSet - klog.Infof("FeatureGate found in cluster, using its feature set %q at startup", startingFeatureSet) - return true, nil - } - }); err != nil { - if lastError != nil { - return lastError - } - return err - } - - payload, err := c.CVO.LoadInitialPayload(ctx, startingFeatureSet, restConfig) - if err != nil { - return err - } - - var cvoGates featuregates.CvoGates - if clusterFeatureGate != nil { - cvoGates = featuregates.CvoGatesFromFeatureGate(clusterFeatureGate, payload.Release.Version) - } else { - cvoGates = featuregates.DefaultCvoGates(payload.Release.Version) - } - - if cvoGates.UnknownVersion() { - klog.Infof("CVO features for version %s could not be detected from FeatureGate; will use defaults plus special UnknownVersion feature gate", payload.Release.Version) - } - klog.Infof("CVO features for version %s enabled at startup: %+v", payload.Release.Version, cvoGates) - - c.StopOnFeatureGateChange.SetStartingFeatures(startingFeatureSet, cvoGates) - c.CVO.InitializeFromPayload(payload, startingFeatureSet, cvoGates, restConfig, burstRestConfig) - - return nil -} - // parseAlwaysEnableCapabilities parses the string list of capabilities // into two lists of configv1.ClusterVersionCapability: known and unknown. func (o *Options) parseAlwaysEnableCapabilities() error { diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index 4e2162f48..2a6855b9f 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -17,6 +17,8 @@ import ( "time" "github.com/google/uuid" + "github.com/openshift/cluster-version-operator/pkg/featuregates" + "k8s.io/utils/ptr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -185,7 +187,8 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) { options.PayloadOverride = filepath.Join(dir, "0.0.1") options.leaderElection = getLeaderElectionConfig(ctx, cfg) options.alwaysEnableCapabilities = []configv1.ClusterVersionCapability{configv1.ClusterVersionCapabilityIngress} - controllers, err := options.NewControllerContext(cb) + cvConfigInformer, configInformer := options.prepareConfigInformers(cb) + controllers, err := options.NewControllerContext(cb, configv1.Default, ptr.To(featuregates.DefaultCvoGates("0.0.1-snapshot")), cvConfigInformer, configInformer) if err != nil { t.Fatal(err) } @@ -317,7 +320,8 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) { options.PayloadOverride = filepath.Join(dir, "0.0.1") options.leaderElection = getLeaderElectionConfig(ctx, cfg) options.alwaysEnableCapabilities = []configv1.ClusterVersionCapability{configv1.ClusterVersionCapabilityIngress} - controllers, err := options.NewControllerContext(cb) + cvConfigInformer, configInformer := options.prepareConfigInformers(cb) + controllers, err := options.NewControllerContext(cb, configv1.Default, ptr.To(featuregates.DefaultCvoGates("0.0.1-snapshot")), cvConfigInformer, configInformer) if err != nil { t.Fatal(err) } @@ -511,7 +515,8 @@ metadata: options.PayloadOverride = payloadDir options.leaderElection = getLeaderElectionConfig(ctx, cfg) options.alwaysEnableCapabilities = []configv1.ClusterVersionCapability{configv1.ClusterVersionCapabilityIngress} - controllers, err := options.NewControllerContext(cb) + cvConfigInformer, configInformer := options.prepareConfigInformers(cb) + controllers, err := options.NewControllerContext(cb, configv1.Default, ptr.To(featuregates.DefaultCvoGates("0.0.1-snapshot")), cvConfigInformer, configInformer) if err != nil { t.Fatal(err) }