diff --git a/api/v1alpha1/pipeline_types.go b/api/v1alpha1/pipeline_types.go index b2773d0..c7ce5f7 100644 --- a/api/v1alpha1/pipeline_types.go +++ b/api/v1alpha1/pipeline_types.go @@ -185,7 +185,9 @@ func (p *PipelineStatus) setWaitingApproval(env string, waitingApproval WaitingA type EnvironmentStatus struct { WaitingApproval WaitingApproval `json:"waitingApproval,omitempty"` - Targets []TargetStatus `json:"targets,omitempty"` + // +optional + Promotion *PromotionStatus `json:"promotion,omitempty"` + Targets []TargetStatus `json:"targets,omitempty"` } // WaitingApproval holds the environment revision that's currently waiting approval. @@ -194,6 +196,31 @@ type WaitingApproval struct { Revision string `json:"revision"` } +// PromotionStatus represents the state of an attempted promotion to +// the enclosing Environment. +type PromotionStatus struct { + Revision string `json:"revision"` + LastAttemptedTime metav1.Time `json:"lastAttemptedTime"` + Succeeded bool `json:"succeeded"` + + // +optional + PullRequest *PullRequestDetails `json:"pullRequest,omitempty"` +} + +// Pull request states +const ( + PullRequestMerged = "merged" // Merged into the target branch, yay. + PullRequestAbandoned = "abandoned" // Closed without being merged, meaning it (very likely) won't proceed. + PullRequestMergeable = "mergeable" // Approved and passed checks, so ready to be merged. + PullRequestOpen = "open" // Open but not yet ready to be merged. +) + +// PullRequestStatus records the status of an attempted pull request promotion. +type PullRequestDetails struct { + URL string `json:"url"` + State string `json:"state"` +} + // ClusterAppReference is a fully-qualified target reference. It holds // the namespaced target name and its type, and the cluster reference // if the target is in a remote cluster. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 3670135..3ce30c7 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -78,6 +78,11 @@ func (in *Environment) DeepCopy() *Environment { func (in *EnvironmentStatus) DeepCopyInto(out *EnvironmentStatus) { *out = *in out.WaitingApproval = in.WaitingApproval + if in.Promotion != nil { + in, out := &in.Promotion, &out.Promotion + *out = new(PromotionStatus) + (*in).DeepCopyInto(*out) + } if in.Targets != nil { in, out := &in.Targets, &out.Targets *out = make([]TargetStatus, len(*in)) @@ -267,6 +272,42 @@ func (in *Promotion) DeepCopy() *Promotion { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PromotionStatus) DeepCopyInto(out *PromotionStatus) { + *out = *in + in.LastAttemptedTime.DeepCopyInto(&out.LastAttemptedTime) + if in.PullRequest != nil { + in, out := &in.PullRequest, &out.PullRequest + *out = new(PullRequestDetails) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PromotionStatus. +func (in *PromotionStatus) DeepCopy() *PromotionStatus { + if in == nil { + return nil + } + out := new(PromotionStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PullRequestDetails) DeepCopyInto(out *PullRequestDetails) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PullRequestDetails. +func (in *PullRequestDetails) DeepCopy() *PullRequestDetails { + if in == nil { + return nil + } + out := new(PullRequestDetails) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PullRequestPromotion) DeepCopyInto(out *PullRequestPromotion) { *out = *in diff --git a/charts/pipeline-controller/crds/pipelines.weave.works_pipelines.yaml b/charts/pipeline-controller/crds/pipelines.weave.works_pipelines.yaml index 96a1b44..4065c3b 100644 --- a/charts/pipeline-controller/crds/pipelines.weave.works_pipelines.yaml +++ b/charts/pipeline-controller/crds/pipelines.weave.works_pipelines.yaml @@ -353,6 +353,34 @@ spec: environments: additionalProperties: properties: + promotion: + description: PromotionStatus represents the state of an attempted + promotion to the enclosing Environment. + properties: + lastAttemptedTime: + format: date-time + type: string + pullRequest: + description: PullRequestStatus records the status of an + attempted pull request promotion. + properties: + state: + type: string + url: + type: string + required: + - state + - url + type: object + revision: + type: string + succeeded: + type: boolean + required: + - lastAttemptedTime + - revision + - succeeded + type: object targets: items: description: TargetStatus represents the status of an application diff --git a/config/crd/bases/pipelines.weave.works_pipelines.yaml b/config/crd/bases/pipelines.weave.works_pipelines.yaml index 96a1b44..4065c3b 100644 --- a/config/crd/bases/pipelines.weave.works_pipelines.yaml +++ b/config/crd/bases/pipelines.weave.works_pipelines.yaml @@ -353,6 +353,34 @@ spec: environments: additionalProperties: properties: + promotion: + description: PromotionStatus represents the state of an attempted + promotion to the enclosing Environment. + properties: + lastAttemptedTime: + format: date-time + type: string + pullRequest: + description: PullRequestStatus records the status of an + attempted pull request promotion. + properties: + state: + type: string + url: + type: string + required: + - state + - url + type: object + revision: + type: string + succeeded: + type: boolean + required: + - lastAttemptedTime + - revision + - succeeded + type: object targets: items: description: TargetStatus represents the status of an application diff --git a/controllers/leveltriggered/controller.go b/controllers/leveltriggered/controller.go index f4051ce..9c1b7cc 100644 --- a/controllers/leveltriggered/controller.go +++ b/controllers/leveltriggered/controller.go @@ -15,10 +15,12 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" "github.com/weaveworks/pipeline-controller/api/v1alpha1" @@ -86,9 +88,12 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c var unready bool for _, env := range pipeline.Spec.Environments { - var envStatus v1alpha1.EnvironmentStatus + envStatus, ok := pipeline.Status.Environments[env.Name] + if !ok { + envStatus = &v1alpha1.EnvironmentStatus{} + } envStatus.Targets = make([]v1alpha1.TargetStatus, len(env.Targets)) - envStatuses[env.Name] = &envStatus + envStatuses[env.Name] = envStatus for i, target := range env.Targets { targetStatus := &envStatus.Targets[i] @@ -210,9 +215,18 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c pipeline.GetNamespace(), pipeline.GetName(), ) + // If it's not ready, we can't make any promotion decisions. Requeue, presuming backoff. + if unready { + return ctrl.Result{Requeue: true}, nil + } + firstEnv := pipeline.Spec.Environments[0] - latestRevision := checkAllTargetsHaveSameRevision(pipeline.Status.Environments[firstEnv.Name]) + firstEnvStatus, ok := pipeline.Status.Environments[firstEnv.Name] + if !ok { + return ctrl.Result{}, fmt.Errorf("did not find status for environment listed first %q", firstEnv.Name) + } + latestRevision := checkAllTargetsHaveSameRevision(firstEnvStatus) if latestRevision == "" { // not all targets have the same revision, or have no revision set, so we can't proceed setPendingCondition(&pipeline, v1alpha1.EnvironmentNotReadyReason, "Waiting for all targets to have the same revision") @@ -223,7 +237,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, nil } - if !checkAllTargetsAreReady(pipeline.Status.Environments[firstEnv.Name]) { + if !checkAllTargetsAreReady(firstEnvStatus) { // not all targets are ready, so we can't proceed setPendingCondition(&pipeline, v1alpha1.EnvironmentNotReadyReason, "Waiting for all targets to be ready") if err := patcher.Patch(ctx, &pipeline, withFieldOwner); err != nil { @@ -233,32 +247,60 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, nil } - removePendingCondition(&pipeline) - if err := patcher.Patch(ctx, &pipeline, withFieldOwner); err != nil { - return ctrl.Result{}, fmt.Errorf("error removing pending condition: %w", err) + if removePendingCondition(&pipeline) { + if err := patcher.Patch(ctx, &pipeline, withFieldOwner); err != nil { + return ctrl.Result{}, fmt.Errorf("error removing pending condition: %w", err) + } } for _, env := range pipeline.Spec.Environments[1:] { + envStatus, ok := pipeline.Status.Environments[env.Name] + if !ok { + return ctrl.Result{}, fmt.Errorf("environment in spec %q does not have a calculated status", env.Name) + } + // if all targets run the latest revision and are ready, we can skip this environment - if checkAllTargetsRunRevision(pipeline.Status.Environments[env.Name], latestRevision) && checkAllTargetsAreReady(pipeline.Status.Environments[env.Name]) { + if checkAllTargetsRunRevision(envStatus, latestRevision) && checkAllTargetsAreReady(pipeline.Status.Environments[env.Name]) { continue } - if checkAnyTargetHasRevision(pipeline.Status.Environments[env.Name], latestRevision) { - return ctrl.Result{}, nil + // otherwise: if there's a promotion recorded, we can stop here. + if envStatus.Promotion != nil && envStatus.Promotion.Revision == latestRevision { + logger.Info("promotion already recorded", "env", env.Name, "revision", latestRevision) + break } - err := r.promoteLatestRevision(ctx, pipeline, env, latestRevision) + // other-otherwise: attempt a promotion + promoteErr := r.promoteLatestRevision(ctx, pipeline, env, latestRevision) + logger.Info("promoting env", "env", env.Name, "revision", latestRevision) + err := setPromotionStatus(&pipeline, env.Name, latestRevision, promoteErr) if err != nil { - return ctrl.Result{}, fmt.Errorf("error promoting new version: %w", err) + return ctrl.Result{}, fmt.Errorf("error recording promotion status: %w", err) } break } + if err := patcher.Patch(ctx, &pipeline, withFieldOwner); err != nil { + return ctrl.Result{}, err + } return ctrl.Result{}, nil } +func setPromotionStatus(pipeline *v1alpha1.Pipeline, env, revision string, promErr error) error { + envStatus, ok := pipeline.Status.Environments[env] + if !ok { + return fmt.Errorf("environment %q not found in status", env) + } + var prom v1alpha1.PromotionStatus + prom.Revision = revision + prom.Succeeded = (promErr == nil) + prom.LastAttemptedTime = metav1.Now() + envStatus.Promotion = &prom + pipeline.Status.Environments[env] = envStatus + return nil +} + func setPendingCondition(pipeline *v1alpha1.Pipeline, reason, message string) { condition := metav1.Condition{ Type: conditions.PromotionPendingCondition, @@ -269,8 +311,12 @@ func setPendingCondition(pipeline *v1alpha1.Pipeline, reason, message string) { apimeta.SetStatusCondition(&pipeline.Status.Conditions, condition) } -func removePendingCondition(pipeline *v1alpha1.Pipeline) { - apimeta.RemoveStatusCondition(&pipeline.Status.Conditions, conditions.PromotionPendingCondition) +func removePendingCondition(pipeline *v1alpha1.Pipeline) bool { + ok := apimeta.FindStatusCondition(pipeline.Status.Conditions, conditions.PromotionPendingCondition) != nil + if ok { + apimeta.RemoveStatusCondition(&pipeline.Status.Conditions, conditions.PromotionPendingCondition) + } + return ok } func (r *PipelineReconciler) promoteLatestRevision(ctx context.Context, pipeline v1alpha1.Pipeline, env v1alpha1.Environment, revision string) error { @@ -299,16 +345,6 @@ func (r *PipelineReconciler) promoteLatestRevision(ctx context.Context, pipeline return err } -func checkAnyTargetHasRevision(env *v1alpha1.EnvironmentStatus, revision string) bool { - for _, target := range env.Targets { - if target.Revision == revision { - return true - } - } - - return false -} - func checkAllTargetsRunRevision(env *v1alpha1.EnvironmentStatus, revision string) bool { for _, target := range env.Targets { if target.Revision != revision { @@ -439,7 +475,9 @@ func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1.Pipeline{}). + For(&v1alpha1.Pipeline{}, + builder.WithPredicates(predicate.GenerationChangedPredicate{}), + ). Watches( &clusterctrlv1alpha1.GitopsCluster{}, handler.EnqueueRequestsFromMapFunc(r.requestsForCluster(gitopsClusterIndexKey)), diff --git a/controllers/leveltriggered/controller_remote_test.go b/controllers/leveltriggered/controller_remote_test.go index 26b939f..e7cf368 100644 --- a/controllers/leveltriggered/controller_remote_test.go +++ b/controllers/leveltriggered/controller_remote_test.go @@ -99,6 +99,7 @@ func TestRemoteTargets(t *testing.T) { g.Expect(getTargetStatus(g, p, "test", 0).Ready).NotTo(BeTrue()) // we can see "target cluster client not synced" before "not found" g.Eventually(func() string { + p = getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline)) return getTargetStatus(g, p, "test", 0).Error }).Should(ContainSubstring("not found")) diff --git a/controllers/leveltriggered/controller_test.go b/controllers/leveltriggered/controller_test.go index 71bf55c..c529172 100644 --- a/controllers/leveltriggered/controller_test.go +++ b/controllers/leveltriggered/controller_test.go @@ -139,7 +139,10 @@ func TestReconcile(t *testing.T) { // the application hasn't been created, so we expect "not found" p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline)) g.Expect(getTargetStatus(g, p, "test", 0).Ready).NotTo(BeTrue()) - g.Expect(getTargetStatus(g, p, "test", 0).Error).To(ContainSubstring("not found")) + g.Eventually(func() string { + p = getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline)) + return getTargetStatus(g, p, "test", 0).Error + }, "2s").Should(ContainSubstring("not found")) // FIXME create the app app := kustomv1.Kustomization{ diff --git a/controllers/leveltriggered/promote_test.go b/controllers/leveltriggered/promote_test.go index f1b1d75..1fc143c 100644 --- a/controllers/leveltriggered/promote_test.go +++ b/controllers/leveltriggered/promote_test.go @@ -2,6 +2,7 @@ package leveltriggered import ( "context" + "fmt" "testing" helmv2 "github.com/fluxcd/helm-controller/api/v2beta1" @@ -40,20 +41,62 @@ func TestPromotionAlgorithm(t *testing.T) { mockStrategy := installMockStrategy(t, pipelineReconciler) mockStrategy.EXPECT().Handles(gomock.Any()).Return(true).AnyTimes() + // These operations act to assert the property that _a promotion + // at a particular revision will not be retried_. This property + // holds only if the test cases do not move the pipeline back to a + // prior revision. This is something that could happen in normal + // operation (i.e., that property would not hold); but avoided in + // these test cases, without loss of generality. + + type promotionAt struct { + env, version string + } + + promoted := map[promotionAt]bool{} // environment+version -> absent=not attempted|false=waiting|true=done + completePromotion := func(env, version string) { + done, ok := promoted[promotionAt{env, version}] + if !ok { + panic("completePromotion called for unstarted promotion. This suggests a faulty test case.") + } + if done { + panic("completePromotion called for completed promotion. This suggests a faulty test case") + } + switch env { + case "staging": + setAppRevision(ctx, g, stagingApp, version) + case "prod": + setAppRevision(ctx, g, prodApp, version) + default: + panic("Unexpected environment. Make sure to setup the pipeline properly in the test.") + } + fmt.Printf("[DEBUG] complete promotion %s->%s\n", env, version) + promoted[promotionAt{env, version}] = true + } + startPromotion := func(env, version string) { + if _, ok := promoted[promotionAt{env, version}]; ok { + panic("attempting to replay a promotion " + env + "->" + version + "; this indicates bad code") + } + fmt.Printf("[DEBUG] start promotion %s->%s\n", env, version) + promoted[promotionAt{env, version}] = false + } + isPromotionStarted := func(env, version string) bool { + done, ok := promoted[promotionAt{env, version}] + return ok && !done + } mockStrategy.EXPECT(). Promote(gomock.Any(), gomock.Any(), gomock.Any()). AnyTimes(). Do(func(ctx context.Context, p v1alpha1.Promotion, prom strategy.Promotion) { - switch prom.Environment.Name { - case "staging": - setAppRevision(ctx, g, stagingApp, prom.Version) - case "prod": - setAppRevision(ctx, g, prodApp, prom.Version) - default: - panic("Unexpected environment. Make sure to setup the pipeline properly in the test.") - } + startPromotion(prom.Environment.Name, prom.Version) }) + checkAndCompletePromotion := func(g Gomega, env, version string) { + g.Eventually(func() bool { + return isPromotionStarted(env, version) + }).Should(BeTrue()) + completePromotion(env, version) + } + t.Run("promotes revision to all environments", func(t *testing.T) { g := testingutils.NewGomegaWithT(t) name := "pipeline-" + rand.String(5) @@ -105,10 +148,13 @@ func TestPromotionAlgorithm(t *testing.T) { // Bumping dev revision to trigger the promotion setAppRevisionAndReadyStatus(ctx, g, devApp, versionToPromote) + checkAndCompletePromotion(g, "staging", versionToPromote) + checkAndCompletePromotion(g, "prod", versionToPromote) // checks if the revision of all target status is v1.0.1 + var p *v1alpha1.Pipeline g.Eventually(func() bool { - p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline)) + p = getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline)) for _, env := range p.Spec.Environments { if !checkAllTargetsRunRevision(p.Status.Environments[env.Name], versionToPromote) { @@ -123,25 +169,33 @@ func TestPromotionAlgorithm(t *testing.T) { return true }, "5s", "0.2s").Should(BeTrue()) + // success through to prod + p = getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline)) + checkPromotionSuccess(g, p, versionToPromote, "prod") + t.Run("triggers another promotion if the app is updated again", func(t *testing.T) { + const newVersion = "v1.0.2" g := testingutils.NewGomegaWithT(t) // Bumping dev revision to trigger the promotion - setAppRevisionAndReadyStatus(ctx, g, devApp, "v1.0.2") + setAppRevisionAndReadyStatus(ctx, g, devApp, newVersion) + checkAndCompletePromotion(g, "staging", newVersion) + checkAndCompletePromotion(g, "prod", newVersion) - // checks if the revision of all target status is v1.0.2 + // checks if the revision of all target status is the new version g.Eventually(func() bool { p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline)) for _, env := range p.Spec.Environments { - if !checkAllTargetsRunRevision(p.Status.Environments[env.Name], "v1.0.2") { + if !checkAllTargetsRunRevision(p.Status.Environments[env.Name], newVersion) { return false } } - return true }, "5s", "0.2s").Should(BeTrue()) - }) + p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline)) + checkPromotionSuccess(g, p, newVersion, "prod") + }) }) t.Run("sets PipelinePending condition", func(t *testing.T) { @@ -211,6 +265,19 @@ func TestPromotionAlgorithm(t *testing.T) { }) } +func checkPromotionSuccess(g Gomega, pipeline *v1alpha1.Pipeline, revision, lastEnv string) { + for _, env := range pipeline.Spec.Environments[1:] { // no promotion in the first env + prom := pipeline.Status.Environments[env.Name].Promotion + g.Expect(prom).NotTo(BeNil()) + g.Expect(prom.LastAttemptedTime).NotTo(BeZero()) + g.Expect(prom.Succeeded).To(BeTrue()) + g.Expect(prom.Revision).To(Equal(revision)) + if env.Name == lastEnv { + return + } + } +} + func setAppRevisionAndReadyStatus(ctx context.Context, g Gomega, hr *helmv2.HelmRelease, revision string) { setAppRevision(ctx, g, hr, revision) setAppStatusReadyCondition(ctx, g, hr)