Skip to content

Commit

Permalink
Add more logging around catalog source sync
Browse files: browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Oct 18, 2024
1 parent efe9779 commit b48b163
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 38 deletions.
65 changes: 47 additions & 18 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,8 +897,15 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
}

func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) {
defer func() {
logger.WithFields(logrus.Fields{
"continueSync": continueSync,
}).Info("source type validated")
}()

out = in
var err error

switch sourceType := out.Spec.SourceType; sourceType {
case v1alpha1.SourceTypeInternal, v1alpha1.SourceTypeConfigmap:
if out.Spec.ConfigMap == "" {
Expand All @@ -912,6 +919,7 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *
err = fmt.Errorf("unknown sourcetype: %s", sourceType)
}
if err != nil {
logger.WithError(err).Error("error validating catalog source type")
out.SetError(v1alpha1.CatalogSourceSpecInvalidError, err)
return
}
Expand All @@ -923,7 +931,6 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *
}
}
continueSync = true

return
}

Expand All @@ -936,17 +943,22 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc

out = in.DeepCopy()

logger = logger.WithField("step", "syncConfigMap")
logger = logger.WithFields(logrus.Fields{
"configmap.namespace": in.Namespace,
"configmap.name": in.Spec.ConfigMap,
})
logger.Info("checking catsrc configmap state")

defer func() {
logger.WithError(syncError).WithField("continueSync", continueSync).Info("config map sync'ed")
}()

var updateLabel bool
// Get the catalog source's config map
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(in.GetNamespace()).Get(in.Spec.ConfigMap)
// Attempt to look up the CM via api call if there is a cache miss
if apierrors.IsNotFound(err) {
// TODO: Don't reach out via live client if it's not found in the cache (https://github.com/operator-framework/operator-lifecycle-manager/issues/3415)
configMap, err = o.opClient.KubernetesInterface().CoreV1().ConfigMaps(in.GetNamespace()).Get(context.TODO(), in.Spec.ConfigMap, metav1.GetOptions{})
// Found cm in the cluster, add managed label to configmap
if err == nil {
Expand All @@ -973,12 +985,9 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
out.SetError(v1alpha1.CatalogSourceConfigMapError, syncError)
return
}

logger.Info("adopted configmap")
}

if in.Status.ConfigMapResource == nil || !in.Status.ConfigMapResource.IsAMatch(&configMap.ObjectMeta) {
logger.Info("updating catsrc configmap state")
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Expand All @@ -998,11 +1007,22 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) {
out = in.DeepCopy()

logger.Info("synchronizing registry server")
// extraLogContext is used when logging before exiting to provide
// additional context in cases where we exit early without an error
extraLogContext := "none"
defer func() {
logger.WithFields(logrus.Fields{
"continueSync": continueSync,
"syncError": syncError,
"extra": extraLogContext,
}).Info("registry server sync'ed")
}()

sourceKey := registry.CatalogKey{Name: in.GetName(), Namespace: in.GetNamespace()}

srcReconciler := o.reconciler.ReconcilerForSource(in)
if srcReconciler == nil {
// TODO: Add failure status on catalogsource and remove from sources
// TODO: Add failure status on catalogsource and remove from sources )
syncError = fmt.Errorf("no reconciler for source type %s", in.Spec.SourceType)
out.SetError(v1alpha1.CatalogSourceRegistryServerError, syncError)
return
Expand All @@ -1015,24 +1035,23 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
return
}

logger.WithField("health", healthy).Infof("checked registry server health")
logger = logger.WithField("health", healthy)
logger.Info("checked registry server health")

if healthy && in.Status.RegistryServiceStatus != nil {
logger.Info("registry state good")
continueSync = true
// return here if catalog does not have polling enabled
if !out.Poll() {
logger.Info("polling not enabled, nothing more to do")
extraLogContext = "catalog does not have polling enabled"
return
}
}

// Registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
logger.Info("ensuring registry server")

err = srcReconciler.EnsureRegistryServer(logger, out)
if err != nil {
if _, ok := err.(reconciler.UpdateNotReadyErr); ok {
extraLogContext = "update pod is not ready yet"
logger.Info("requeueing registry server for catalog update check: update pod not yet ready")
o.catsrcQueueSet.RequeueAfter(out.GetNamespace(), out.GetName(), reconciler.CatalogPollingRequeuePeriod)
return
Expand All @@ -1042,8 +1061,6 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
return
}

logger.Info("ensured registry server")

// requeue the catalog sync based on the polling interval, for accurate syncs of catalogs with polling enabled
if out.Spec.UpdateStrategy != nil && out.Spec.UpdateStrategy.RegistryPoll != nil {
if out.Spec.UpdateStrategy.Interval == nil {
Expand All @@ -1052,22 +1069,30 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
return
}
if out.Spec.UpdateStrategy.RegistryPoll.ParsingError != "" && out.Status.Reason != v1alpha1.CatalogSourceIntervalInvalidError {
out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError))
err := errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError)
logger.WithError(err).Error("registry server sync error: failed to parse registry poll interval")
out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, err)
}
logger.Infof("requeuing registry server sync based on polling interval %s", out.Spec.UpdateStrategy.Interval.Duration.String())
extraLogContext = fmt.Sprintf("requeuing registry server sync based on polling interval %s", out.Spec.UpdateStrategy.Interval.Duration.String())
resyncPeriod := reconciler.SyncRegistryUpdateInterval(out, time.Now())
o.catsrcQueueSet.RequeueAfter(out.GetNamespace(), out.GetName(), queueinformer.ResyncWithJitter(resyncPeriod, 0.1)())
return
}

if err := o.sources.Remove(sourceKey); err != nil {
o.logger.WithError(err).Debug("error closing client connection")
o.logger.WithError(err).Error("registry server sync error: error closing client connection")
}

return
}

func (o *Operator) syncConnection(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) {
defer func() {
logger.WithFields(logrus.Fields{
"continueSync": continueSync,
"syncError": syncError,
}).Info("registry server connection sync'ed")
}()
out = in.DeepCopy()

sourceKey := registry.CatalogKey{Name: in.GetName(), Namespace: in.GetNamespace()}
Expand Down Expand Up @@ -1152,7 +1177,11 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
"catalogsource.name": catsrc.Name,
"id": queueinformer.NewLoopID(),
})
logger.Info("syncing catalog source")
defer func() {
logger.WithFields(logrus.Fields{
"syncError": syncError,
}).Info("catalog source sync'ed")
}()

syncFunc := func(in *v1alpha1.CatalogSource, chain []CatalogSourceSyncFunc) (out *v1alpha1.CatalogSource, syncErr error) {
out = in
Expand Down
50 changes: 30 additions & 20 deletions pkg/controller/registry/reconciler/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"time"

"github.com/google/go-cmp/cmp"
"github.com/operator-framework/api/pkg/operators/v1alpha1"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
Expand Down Expand Up @@ -201,10 +200,9 @@ func (c *GrpcRegistryReconciler) currentUpdatePods(logger *logrus.Entry, source
}

func (c *GrpcRegistryReconciler) currentPodsWithCorrectImageAndSpec(logger *logrus.Entry, source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, defaultPodSecurityConfig v1alpha1.SecurityConfig) ([]*corev1.Pod, error) {
logger.Info("searching for current pods")
pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels()))
if err != nil {
logger.WithError(err).Warn("couldn't find pod in cache")
logger.WithError(err).Warn("error searching for catalog source pods: couldn't find pod in cache")
return nil, nil
}
found := []*corev1.Pod{}
Expand All @@ -213,20 +211,10 @@ func (c *GrpcRegistryReconciler) currentPodsWithCorrectImageAndSpec(logger *logr
return nil, err
}
for _, p := range pods {
images, hash := correctImages(source, p), podHashMatch(p, newPod)
logger = logger.WithFields(logrus.Fields{
"current-pod.namespace": p.Namespace, "current-pod.name": p.Name,
"correctImages": images, "correctHash": hash,
})
logger.Info("evaluating current pod")
if !hash {
logger.Infof("pod spec diff: %s", cmp.Diff(p.Spec, newPod.Spec))
}
if correctImages(source, p) && podHashMatch(p, newPod) {
found = append(found, p)
}
}
logger.Infof("of %d pods matching label selector, %d have the correct images and matching hash", len(pods), len(found))
return found, nil
}

Expand All @@ -252,64 +240,71 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata
// if service status is nil, we force create every object to ensure they're created the first time
valid, err := isRegistryServiceStatusValid(&source)
if err != nil {
logger.WithError(err).Error("error ensuring registry server: could not validate registry service status")
return err
}
overwrite := !valid
if overwrite {
logger.Info("registry service status invalid, need to overwrite")
}

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
sa, err := c.ensureSA(source)
if err != nil && !apierrors.IsAlreadyExists(err) {
logger.WithError(err).Error("error ensuring registry server: could not ensure registry service account")
return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.GetName())
}

sa, err = c.OpClient.GetServiceAccount(sa.GetNamespace(), sa.GetName())
if err != nil {
logger.WithError(err).Error("error ensuring registry server: could not get registry service account")
return err
}

defaultPodSecurityConfig, err := getDefaultPodContextConfig(c.OpClient, catalogSource.GetNamespace())
if err != nil {
logger.WithError(err).Error("error ensuring registry server: could not get default pod security config")
return err
}

// recreate the pod if no existing pod is serving the latest image or correct spec
current, err := c.currentPodsWithCorrectImageAndSpec(logger, source, sa, defaultPodSecurityConfig)
if err != nil {
logger.WithError(err).Error("error ensuring registry server: could not get current pods with correct image and spec")
return err
}
overwritePod := overwrite || len(current) == 0
if overwritePod {
logger.Info("registry pods invalid, need to overwrite")
}

pod, err := source.Pod(sa, defaultPodSecurityConfig)
if err != nil {
logger.WithError(err).Error("error ensuring registry server: could not create registry pod")
return err
}
if err := c.ensurePod(logger, source, sa, defaultPodSecurityConfig, overwritePod); err != nil {
logger.WithError(err).Error("error ensuring registry server: could not ensure registry pod")
return pkgerrors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
}
if err := c.ensureUpdatePod(logger, sa, defaultPodSecurityConfig, source); err != nil {
logger.WithError(err).Error("error ensuring registry server: could not ensure update pod")
if _, ok := err.(UpdateNotReadyErr); ok {
logger.WithError(err).Error("error ensuring registry server: ensure update pod error is not of type UpdateNotReadyErr")
return err
}
return pkgerrors.Wrapf(err, "error ensuring updated catalog source pod: %s", pod.GetName())
}

service, err := source.Service()
if err != nil {
logger.WithError(err).Error("couldn't get service")
return err
}
if err := c.ensureService(source, overwrite); err != nil {
logger.WithError(err).Error("error ensuring registry server: could not ensure service")
return pkgerrors.Wrapf(err, "error ensuring service: %s", service.GetName())
}

if overwritePod {
now := c.now()
service, err := source.Service()
if err != nil {
logger.WithError(err).Error("error ensuring registry server: could not get service")
return err
}
catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{
Expand Down Expand Up @@ -589,6 +584,7 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal
serviceAccount := source.ServiceAccount()
serviceAccount, err := c.OpClient.GetServiceAccount(serviceAccount.GetNamespace(), serviceAccount.GetName())
if err != nil {
logger.WithError(err).Error("registry service not healthy: could not get service account")
if !apierrors.IsNotFound(err) {
return false, err
}
Expand All @@ -597,27 +593,41 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal

registryPodSecurityConfig, err := getDefaultPodContextConfig(c.OpClient, catalogSource.GetNamespace())
if err != nil {
logger.WithError(err).Error("registry service not healthy: could not get registry pod security config")
return false, err
}

// Check on registry resources
// TODO: add gRPC health check
service, err := c.currentService(source)
if err != nil {
logger.WithError(err).Error("registry service not healthy: could not get current service")
return false, err
}

currentPods, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig)
if err != nil {
logger.WithError(err).Error("registry service not healthy: could not get current pods")
return false, err
}

currentServiceAccount := c.currentServiceAccount(source)
if len(currentPods) < 1 ||
service == nil || c.currentServiceAccount(source) == nil {
service == nil || currentServiceAccount == nil {
logger.WithFields(logrus.Fields{
"numCurrentPods": len(currentPods),
"isServiceNil": service == nil,
"isCurrentServiceAccountNil": currentServiceAccount == nil,
}).Error("registry service not healthy: one or more required resources are missing")
return false, nil
}

podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, currentPods, source.GetNamespace())
if e != nil {
logger.WithError(e).Error("registry service not healthy: could not detect and delete dead pods")
return false, fmt.Errorf("error deleting dead pods: %v", e)
}

return podsAreLive, nil
}

Expand Down

0 comments on commit b48b163

Please sign in to comment.