Skip to content
This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

Commit

Permalink
[Fix] fix continuous reconciles and status updates
Browse files Browse the repository at this point in the history
Signed-off-by: Varsha Prasad Narsing <[email protected]>
  • Loading branch information
varshaprasad96 committed Sep 7, 2023
1 parent 863cf15 commit 8479c98
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"sync"

"github.com/google/go-cmp/cmp"
"github.com/operator-framework/rukpak/api/v1alpha2"

v1alpha2deployer "github.com/operator-framework/rukpak/internal/controllers/v1alpha2/deployer"
Expand Down Expand Up @@ -106,6 +107,11 @@ func (b *bundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}

// Skip reconciling if `spec.paused` is set.
if existingBD.Spec.Paused {
return ctrl.Result{}, nil
}

reconciledBD := existingBD.DeepCopy()
res, reconcileErr := b.reconcile(ctx, reconciledBD)
// Update the status subresource before updating the main object. This is
Expand All @@ -114,36 +120,71 @@ func (b *bundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
// complete. Therefore, we need to make the status update prior to the main
// object update to ensure that the status update can be processed before
// a potential deletion.
// The controller is not updating spec, we only update the status. Hence sending
// a status update should be enough.
if !equality.Semantic.DeepEqual(existingBD.Status, reconciledBD.Status) {
fmt.Println("before update", cmp.Diff(existingBD, reconciledBD))
if updateErr := b.Status().Update(ctx, reconciledBD); updateErr != nil {
return res, apimacherrors.NewAggregate([]error{reconcileErr, updateErr})
}
}
existingBD.Status, reconciledBD.Status = v1alpha2.BundleDeploymentStatus{}, v1alpha2.BundleDeploymentStatus{}
if !equality.Semantic.DeepEqual(existingBD, reconciledBD) {
if updateErr := b.Update(ctx, reconciledBD); updateErr != nil {
return res, apimacherrors.NewAggregate([]error{reconcileErr, updateErr})
}
}
return res, reconcileErr
}

func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bd *v1alpha2.BundleDeployment) (ctrl.Result, error) {

Check failure on line 134 in internal/controllers/v1alpha2/controllers/bundledeployment/bundledeployment.go

View workflow job for this annotation

GitHub Actions / lint

(*bundleDeploymentReconciler).reconcile - result 0 (sigs.k8s.io/controller-runtime/pkg/reconcile.Result) is always nil (unparam)
bundleDepFS, err := b.unpackContents(ctx, bd)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error unpacking contents: %v", err)
// Unpack contents from the bundle deployment for each of the specified source and update
// the status of the object.
bundleDepFS, state, err := b.unpackContents(ctx, bd)
switch state {
case v1alpha2source.StateUnpackPending:
setUnpackStatusPending(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, nil
case v1alpha2source.StateUnpacking:
setUnpackStatusPending(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, nil
case v1alpha2source.StateUnpackFailed:
setUnpackStatusFailing(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, err
case v1alpha2source.StateUnpacked:
setUnpackStatusSuccess(&bd.Status.Conditions, fmt.Sprintf("unpacked %s", bd.GetName()), bd.Generation)
default:
return ctrl.Result{}, fmt.Errorf("unkown unpack state %q for bundle deployment %s: %v", state, bd.GetName(), bd.Generation)

Check failure on line 151 in internal/controllers/v1alpha2/controllers/bundledeployment/bundledeployment.go

View workflow job for this annotation

GitHub Actions / lint

`unkown` is a misspelling of `unknown` (misspell)
}

// Unpacked contents from each source would now be availabe in the fs. Validate

Check failure on line 154 in internal/controllers/v1alpha2/controllers/bundledeployment/bundledeployment.go

View workflow job for this annotation

GitHub Actions / lint

`availabe` is a misspelling of `available` (misspell)
// if the contents together conform to the specified format.
if err = b.validateContents(ctx, bd, bundleDepFS); err != nil {
return ctrl.Result{}, fmt.Errorf("error validating contents for bundle %s with format %s: %v", bd.Name, bd.Spec.Format, err)
validateErr := fmt.Errorf("error validating contents for bundle %s with format %s: %v", bd.Name, bd.Spec.Format, err)
setValidateFailing(&bd.Status.Conditions, validateErr.Error(), bd.Generation)
return ctrl.Result{}, validateErr
}

var deployedObjects []client.Object
if deployedObjects, err = b.deployContents(ctx, bd, bundleDepFS); err != nil {
return ctrl.Result{}, fmt.Errorf("error deploying contents: %v", err)
setValidateSuccess(&bd.Status.Conditions, fmt.Sprintf("validate successful for bundle deployment %s", bd.GetName()), bd.Generation)

// Deploy the validated contents onto the cluster.
// The deployer should return the list of objects which have been deployed, so that
// controller can be configured to set up watches for them.
deployRes, err := b.deployContents(ctx, bd, bundleDepFS)
switch deployRes.State {
case v1alpha2deployer.StateIntallFailed:
setInstallStatusFailed(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, err
case v1alpha2deployer.StateUnpgradeFailed:
setUnpackStatusFailing(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, err
case v1alpha2deployer.StateReconcileFailed:
setReconcileStatusFailed(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, err
case v1alpha2deployer.StateObjectFetchFailed:
setDynamicWatchFailed(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, err
case v1alpha2deployer.StateDeploySuccessful:
setInstallStatusSuccess(&bd.Status.Conditions, fmt.Sprintf("installed %s", bd.GetName()), bd.Generation)
default:
return ctrl.Result{}, fmt.Errorf("unkown deploy state %q for bundle deployment %s: %v", state, bd.GetName(), bd.Generation)

Check failure on line 183 in internal/controllers/v1alpha2/controllers/bundledeployment/bundledeployment.go

View workflow job for this annotation

GitHub Actions / lint

`unkown` is a misspelling of `unknown` (misspell)
}

for _, obj := range deployedObjects {
fmt.Println("deplpoy done")
for _, obj := range deployRes.AppliedObjects {
uMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
setDynamicWatchFailed(&bd.Status.Conditions, err.Error(), bd.Generation)
Expand Down Expand Up @@ -171,61 +212,57 @@ func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bd *v1alpha2
return ctrl.Result{}, err
}
}

fmt.Println("deployed contents successfully")

return ctrl.Result{}, nil
}

// unpackContents unpacks contents from all the sources, and stores under a directory referenced by the bundle deployment name.
func (b *bundleDeploymentReconciler) unpackContents(ctx context.Context, bd *v1alpha2.BundleDeployment) (*afero.Fs, error) {
// add status to mention that the contents are being unpacked.
setUnpackStatusPending(&bd.Status.Conditions, fmt.Sprintf("unpacking bundledeployment %q", bd.GetName()), bd.Generation)

// It returns the consolidated state on whether contents from all the sources have been unpacked.
func (b *bundleDeploymentReconciler) unpackContents(ctx context.Context, bd *v1alpha2.BundleDeployment) (*afero.Fs, v1alpha2source.State, error) {
// set a base filesystem path and unpack contents under the root filepath defined by
// bundledeployment name.
bundleDepFs := afero.NewBasePathFs(afero.NewOsFs(), bd.GetName())

errs := make([]error, 0)
unpackResult := make([]v1alpha2source.Result, len(bd.Spec.Sources))

// Unpack each of the sources individually, and consolidate all their results into one.
for _, source := range bd.Spec.Sources {
res, err := b.unpacker.Unpack(ctx, bd.Name, &source, bundleDepFs)
res, err := b.unpacker.Unpack(ctx, bd.Name, source, bundleDepFs)
if err != nil {
errs = append(errs, fmt.Errorf("error unpacking from %s source: %q: %v", source.Kind, res.Message, err))
}
}

// Even if one source has not unpacked, update Bundle Deployment status accordingly.
for _, res := range unpackResult {
if res.State == v1alpha2source.StateUnpackPending {
return &bundleDepFs, v1alpha2source.StateUnpackPending, nil
} else if res.State == v1alpha2source.StateUnpacking {
return &bundleDepFs, v1alpha2source.StateUnpacking, nil
}
}

if len(errs) != 0 {
setUnpackStatusFailing(&bd.Status.Conditions, fmt.Sprintf("unpacking failure %q", bd.GetName()), bd.Generation)
return &bundleDepFs, v1alpha2source.StateUnpackFailed, apimacherrors.NewAggregate(errs)
}

setUnpackStatusSuccess(&bd.Status.Conditions, fmt.Sprintf("unpacking successful %q", bd.GetName()), bd.Generation)
return &bundleDepFs, apimacherrors.NewAggregate(errs)
return &bundleDepFs, v1alpha2source.StateUnpacked, nil
}

// validateContents validates if the unpacked bundle contents are of the right format.
func (b *bundleDeploymentReconciler) validateContents(ctx context.Context, bd *v1alpha2.BundleDeployment, fs *afero.Fs) error {
setValidatePending(&bd.Status.Conditions, fmt.Sprintf("validating bundledeployment %q", bd.GetName()), bd.Generation)

errs := make([]error, 0)
for _, validator := range b.validators {
if err := validator.Validate(ctx, *fs, bd); err != nil {
if err := validator.Validate(ctx, *fs, *bd); err != nil {
errs = append(errs, err)
}
}
if len(errs) != 0 {
setValidateFailing(&bd.Status.Conditions, fmt.Sprintf("validating failure %q", bd.GetName()), bd.Generation)
}

setValidateSuccess(&bd.Status.Conditions, fmt.Sprintf("validating successful %q", bd.GetName()), bd.Generation)
return apimacherrors.NewAggregate(errs)
}

func (b *bundleDeploymentReconciler) deployContents(ctx context.Context, bd *v1alpha2.BundleDeployment, fs *afero.Fs) ([]client.Object, error) {
deployedObjects, err := b.deployer.Deploy(ctx, *fs, bd)
if err != nil {
return nil, fmt.Errorf("error deploying contents: %v", err)
}
return deployedObjects, nil
// deployContents calls the registered deployer to apply the bundle contents onto the cluster.
func (b *bundleDeploymentReconciler) deployContents(ctx context.Context, bd *v1alpha2.BundleDeployment, fs *afero.Fs) (*v1alpha2deployer.Result, error) {
return b.deployer.Deploy(ctx, *fs, bd)
}

func (b *bundleDeploymentReconciler) validateConfig() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// setUnpackStatusPending sets the resolved status condition to success.
func setUnpackStatusPending(conditions *[]metav1.Condition, message string, generation int64) {
// setUnpackStatusPending sets the unpack status condition to unpacking.
func setUnpackStausPacking(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeUnpacked,
Status: metav1.ConditionFalse,
Expand All @@ -17,7 +17,18 @@ func setUnpackStatusPending(conditions *[]metav1.Condition, message string, gene
})
}

// setUnpackStatusFailing sets the resolved status condition to success.
// setUnpackStatusPending sets the unpack status condition to pending.
func setUnpackStatusPending(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeUnpacked,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonUnpackPending,
Message: message,
ObservedGeneration: generation,
})
}

// setUnpackStatusFailing sets the unpack status condition to failing.
func setUnpackStatusFailing(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeUnpacked,
Expand All @@ -28,7 +39,7 @@ func setUnpackStatusFailing(conditions *[]metav1.Condition, message string, gene
})
}

// setUnpackStatusSuccess sets the resolved status condition to success.
// setUnpackStatusSuccess sets the unpack status condition to success.
func setUnpackStatusSuccess(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeUnpacked,
Expand All @@ -39,7 +50,7 @@ func setUnpackStatusSuccess(conditions *[]metav1.Condition, message string, gene
})
}

// setValidatePending sets the resolved status condition to success.
// setValidatePending sets the unpack status condition to pending.
func setValidatePending(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeValidated,
Expand All @@ -50,7 +61,7 @@ func setValidatePending(conditions *[]metav1.Condition, message string, generati
})
}

// setValidateFailing sets the resolved status condition to success.
// setValidateFailing sets the unpack status condition to failing.
func setValidateFailing(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeValidated,
Expand All @@ -61,7 +72,7 @@ func setValidateFailing(conditions *[]metav1.Condition, message string, generati
})
}

// setValidateSuccess sets the resolved status condition to success.
// setValidateSuccess sets the unpack status condition to success.
func setValidateSuccess(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeValidated,
Expand All @@ -72,6 +83,9 @@ func setValidateSuccess(conditions *[]metav1.Condition, message string, generati
})
}

// setDynamicWatchFailed sets the installed status to failing with the appropriate reason.
// This status appears when there is an error while fetching the applied objects from cluster
// after the deployer has returned so as to set watches on them.
func setDynamicWatchFailed(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Expand All @@ -81,3 +95,49 @@ func setDynamicWatchFailed(conditions *[]metav1.Condition, message string, gener
ObservedGeneration: generation,
})
}

// setInstallFailed sets the installed success to failing.
func setInstallStatusFailed(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonInstallFailed,
Message: message,
ObservedGeneration: generation,
})
}

// setUnpgradeFailed sets the installed success to failing as there is an error while patching
// objects on cluster.
func setUnpgradeStatusFailed(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonUpgradeFailed,
Message: message,
ObservedGeneration: generation,
})
}

// setReconcileStatusFailed sets the installed success to failing as there is an error while reconciling
// existing objects on cluster.
func setReconcileStatusFailed(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonReconcileFailed,
Message: message,
ObservedGeneration: generation,
})
}

// setInstallStatusSuccess sets the installed success to success.
func setInstallStatusSuccess(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionTrue,
Reason: v1alpha2.ReasonInstallationSucceeded,
Message: message,
ObservedGeneration: generation,
})
}
Loading

0 comments on commit 8479c98

Please sign in to comment.