Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up "object has already been modified" warnings #700 #717

Merged
merged 6 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion operator/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified
.PHONY: deploy
deploy: manifests kustomize ## Deploy controller manager to the K8s cluster specified in ~/.kube/config.
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | $(KUBECTL) apply --server-side=true -oyaml -f -
$(KUSTOMIZE) build config/default | $(KUBECTL) apply --server-side=true -f -

.PHONY: undeploy
undeploy: ## Undeploy controller manager from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion.
Expand Down
71 changes: 65 additions & 6 deletions operator/internal/controller/auto/update_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -41,6 +42,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -77,13 +79,15 @@ type UpdateReconciler struct {
// Reconcile manages the Update CRD and initiates Pulumi operations.
func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := log.FromContext(ctx)
l.Info("Reconciling Update")

obj := &autov1alpha1.Update{}
err := r.Get(ctx, req.NamespacedName, obj)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
l = l.WithValues("revision", obj.ResourceVersion)
ctx = log.IntoContext(ctx, l)
l.Info("Reconciling Update")

rs := newReconcileSession(r.Client, obj)

Expand Down Expand Up @@ -116,12 +120,32 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, fmt.Errorf("failed to update the status: %w", err)
}

// TODO check the w status before proceeding
// Get the workspace and check that it is ready
w := &autov1alpha1.Workspace{}
err = r.Client.Get(ctx, client.ObjectKey{Namespace: obj.Namespace, Name: obj.Spec.WorkspaceName}, w)
if err != nil {
if apierrors.IsNotFound(err) {
l.Info("Workspace not found", "workspace", obj.Spec.WorkspaceName)
rs.progressing.Status = metav1.ConditionFalse
rs.progressing.Reason = "WorkspaceNotFound"
rs.failed.Status = metav1.ConditionFalse
rs.failed.Reason = UpdateConditionReasonProgressing
rs.complete.Status = metav1.ConditionFalse
rs.complete.Reason = UpdateConditionReasonProgressing
return ctrl.Result{}, rs.updateStatus(ctx, obj)
}
return ctrl.Result{}, fmt.Errorf("failed to get workspace: %w", err)
}
if !isWorkspaceReady(w) {
l.Info("Workspace not ready", "workspace", w.Name)
rs.progressing.Status = metav1.ConditionFalse
rs.progressing.Reason = "WorkspaceNotReady"
rs.failed.Status = metav1.ConditionFalse
rs.failed.Reason = UpdateConditionReasonProgressing
rs.complete.Status = metav1.ConditionFalse
rs.complete.Reason = UpdateConditionReasonProgressing
return ctrl.Result{}, rs.updateStatus(ctx, obj)
}

// Connect to the workspace's GRPC server
addr := fmt.Sprintf("%s:%d", fqdnForService(w), WorkspaceGrpcPort)
Expand Down Expand Up @@ -168,6 +192,36 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
}

func isWorkspaceReady(ws *autov1alpha1.Workspace) bool {
if ws == nil || ws.Generation != ws.Status.ObservedGeneration {
return false
}
return meta.IsStatusConditionTrue(ws.Status.Conditions, autov1alpha1.WorkspaceReady)
}

type workspaceReadyPredicate struct{}

var _ predicate.Predicate = &workspaceReadyPredicate{}

func (workspaceReadyPredicate) Create(e event.CreateEvent) bool {
return isWorkspaceReady(e.Object.(*autov1alpha1.Workspace))
}
EronWright marked this conversation as resolved.
Show resolved Hide resolved

func (workspaceReadyPredicate) Delete(_ event.DeleteEvent) bool {
return false
}

func (workspaceReadyPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}
return !isWorkspaceReady(e.ObjectOld.(*autov1alpha1.Workspace)) && isWorkspaceReady(e.ObjectNew.(*autov1alpha1.Workspace))
EronWright marked this conversation as resolved.
Show resolved Hide resolved
}

func (workspaceReadyPredicate) Generic(_ event.GenericEvent) bool {
return false
}

type reconcileSession struct {
progressing *metav1.Condition
complete *metav1.Condition
Expand Down Expand Up @@ -375,13 +429,18 @@ func (r *UpdateReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&autov1alpha1.Update{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&autov1alpha1.Workspace{},
handler.EnqueueRequestsFromMapFunc(r.mapWorkspaceToUpdate),
builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{})).
builder.WithPredicates(&workspaceReadyPredicate{})).
Complete(r)
}

func indexUpdateByWorkspace(obj client.Object) []string {
w := obj.(*autov1alpha1.Update)
return []string{w.Spec.WorkspaceName}
u := obj.(*autov1alpha1.Update)
complete := meta.IsStatusConditionTrue(u.Status.Conditions, UpdateConditionTypeComplete)
if complete {
// don't index the completed updates, to avoid unnecessary reconciles when their workspace is updated
return []string{}
}
return []string{u.Spec.WorkspaceName}
}

func (r *UpdateReconciler) mapWorkspaceToUpdate(ctx context.Context, obj client.Object) []reconcile.Request {
Expand Down Expand Up @@ -494,7 +553,7 @@ func (s streamReader[T]) Result() (result, error) {
continue // No result yet.
}

s.l.Info("Result received", "result", res)
s.l.Info("Update complete", "result", res)

s.obj.Status.StartTime = metav1.NewTime(res.GetSummary().StartTime.AsTime())
s.obj.Status.EndTime = metav1.NewTime(res.GetSummary().EndTime.AsTime())
Expand Down
31 changes: 31 additions & 0 deletions operator/internal/controller/auto/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

func connect(ctx context.Context, addr string) (*grpc.ClientConn, error) {
Expand Down Expand Up @@ -84,3 +87,31 @@ func marshalConfigValue(item autov1alpha1.ConfigItem) *agentpb.ConfigValue {
}
return v
}

var l = log.Log.WithName("predicate").WithName("debug")

type DebugPredicate struct {
Controller string
}

var _ predicate.Predicate = &DebugPredicate{}

func (p *DebugPredicate) Create(e event.CreateEvent) bool {
l.V(1).Info("Create", "controller", p.Controller, "type", fmt.Sprintf("%T", e.Object), "name", e.Object.GetName(), "revision", e.Object.GetResourceVersion())
return true
}

func (p *DebugPredicate) Delete(e event.DeleteEvent) bool {
l.V(1).Info("Delete", "controller", p.Controller, "type", fmt.Sprintf("%T", e.Object), "name", e.Object.GetName(), "revision", e.Object.GetResourceVersion())
return true
}

func (p *DebugPredicate) Update(e event.UpdateEvent) bool {
l.V(1).Info("Update", "controller", p.Controller, "type", fmt.Sprintf("%T", e.ObjectOld), "name", e.ObjectOld.GetName(), "old-revision", e.ObjectOld.GetResourceVersion(), "new-revision", e.ObjectNew.GetResourceVersion())
return true
}

func (p *DebugPredicate) Generic(e event.GenericEvent) bool {
l.V(1).Info("Generic", "controller", p.Controller, "type", fmt.Sprintf("%T", e.Object), "name", e.Object.GetName(), "revision", e.Object.GetResourceVersion())
return true
}
48 changes: 43 additions & 5 deletions operator/internal/controller/auto/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
Expand Down Expand Up @@ -78,13 +79,14 @@ type WorkspaceReconciler struct {

func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := log.FromContext(ctx)
l.Info("Reconciling Workspace")

w := &autov1alpha1.Workspace{}
err := r.Get(ctx, req.NamespacedName, w)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
l = l.WithValues("revision", w.ResourceVersion)
l.Info("Reconciling Workspace")

// apply defaults to the workspace spec
// future: use a mutating webhook to apply defaults
Expand All @@ -108,7 +110,11 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err := r.Status().Update(ctx, w)
if err != nil {
l.Error(err, "updating status")
} else {
l = log.FromContext(ctx).WithValues("revision", w.ResourceVersion)
l.V(1).Info("Status updated")
}

return err
}

Expand Down Expand Up @@ -319,15 +325,47 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// SetupWithManager sets up the controller with the Manager.
func (r *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Named("workspace-controller").
For(&autov1alpha1.Workspace{},
builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&corev1.Service{},
builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{})).
builder.WithPredicates(predicate.GenerationChangedPredicate{}, &DebugPredicate{Controller: "workspace-controller"})).
Owns(&appsv1.StatefulSet{},
builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{})).
builder.WithPredicates(&statefulSetReadyPredicate{}, &DebugPredicate{Controller: "workspace-controller"})).
Complete(r)
}

type statefulSetReadyPredicate struct{}

var _ predicate.Predicate = &statefulSetReadyPredicate{}

func isStatefulSetReady(ss *appsv1.StatefulSet) bool {
if ss.Status.ObservedGeneration != ss.Generation || ss.Status.UpdateRevision != ss.Status.CurrentRevision {
return false
}
if ss.Status.AvailableReplicas < 1 {
return false
}
return true
}

func (statefulSetReadyPredicate) Create(e event.CreateEvent) bool {
return isStatefulSetReady(e.Object.(*appsv1.StatefulSet))
}

func (statefulSetReadyPredicate) Delete(_ event.DeleteEvent) bool {
return false
}

func (statefulSetReadyPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}
return !isStatefulSetReady(e.ObjectOld.(*appsv1.StatefulSet)) && isStatefulSetReady(e.ObjectNew.(*appsv1.StatefulSet))
}

func (statefulSetReadyPredicate) Generic(_ event.GenericEvent) bool {
return false
}

const (
FieldManager = "pulumi-kubernetes-operator"
WorkspacePulumiContainerName = "pulumi"
Expand Down
70 changes: 43 additions & 27 deletions operator/internal/controller/pulumi/flux.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/pulumi/pulumi-kubernetes-operator/v2/operator/api/pulumi/shared"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

func (sess *stackReconcilerSession) SetupWorkspaceFromFluxSource(ctx context.Context, source unstructured.Unstructured, fluxSource *shared.FluxSource) (string, error) {
Expand Down Expand Up @@ -70,37 +72,28 @@ func checksumOrDigest(source unstructured.Unstructured) (string, error) {
return checksum, nil
}

// checkFluxSourceReady looks for the conventional "Ready" condition to see if the supplied object
// can be considered _not_ ready. It returns an error if it can determine that the object is not
// ready, and nil if it cannot determine so.
func checkFluxSourceReady(obj unstructured.Unstructured) error {
func checkFluxSourceReady(obj *unstructured.Unstructured) bool {
EronWright marked this conversation as resolved.
Show resolved Hide resolved
observedGeneration, ok, err := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
if !ok || err != nil || observedGeneration != obj.GetGeneration() {
return false
}
conditions, ok, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
if ok && err == nil {
// didn't find a []Condition, so there's nothing to indicate that it's not ready there
for _, c0 := range conditions {
var c map[string]interface{}
if c, ok = c0.(map[string]interface{}); !ok {
// condition isn't the right shape, try the next one
continue
}
if t, ok, err := unstructured.NestedString(c, "type"); ok && err == nil && t == "Ready" {
if v, ok, err := unstructured.NestedString(c, "status"); ok && err == nil && v == "True" {
// found the Ready condition and it is actually ready; proceed to next check
break
}
// found the Ready condition and it's something other than ready
return fmt.Errorf("source Ready condition does not have status True %#v", c)
if !ok || err != nil {
return false
}
for _, c0 := range conditions {
var c map[string]interface{}
if c, ok = c0.(map[string]interface{}); !ok {
// condition isn't the right shape, try the next one
continue
}
if t, ok, err := unstructured.NestedString(c, "type"); ok && err == nil && t == "Ready" {
if v, ok, err := unstructured.NestedString(c, "status"); ok && err == nil && v == "True" {
return true
}
}
// Ready=true, or no ready condition to tell us either way
}

_, ok, err = unstructured.NestedMap(obj.Object, "status", "artifact")
if !ok || err != nil {
return fmt.Errorf(".status.artifact does not have an Artifact object")
}

return nil
return false
}

func getSourceGVK(src shared.FluxSourceReference) (schema.GroupVersionKind, error) {
Expand All @@ -111,3 +104,26 @@ func getSourceGVK(src shared.FluxSourceReference) (schema.GroupVersionKind, erro
func fluxSourceKey(gvk schema.GroupVersionKind, name string) string {
return fmt.Sprintf("%s:%s", gvk, name)
}

type fluxSourceReadyPredicate struct{}

var _ predicate.Predicate = &fluxSourceReadyPredicate{}

func (fluxSourceReadyPredicate) Create(e event.CreateEvent) bool {
return checkFluxSourceReady(e.Object.(*unstructured.Unstructured))
}
EronWright marked this conversation as resolved.
Show resolved Hide resolved

func (fluxSourceReadyPredicate) Delete(_ event.DeleteEvent) bool {
return false
}

func (fluxSourceReadyPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}
return !checkFluxSourceReady(e.ObjectOld.(*unstructured.Unstructured)) && checkFluxSourceReady(e.ObjectNew.(*unstructured.Unstructured))
}

func (fluxSourceReadyPredicate) Generic(_ event.GenericEvent) bool {
return false
}
Loading