diff --git a/api/v1alpha1/ingressreplica_types.go b/api/v1alpha1/ingressreplica_types.go new file mode 100644 index 00000000..c970a1db --- /dev/null +++ b/api/v1alpha1/ingressreplica_types.go @@ -0,0 +1,55 @@ +package v1alpha1 + +import ( + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func init() { + SchemeBuilder.Register(&IngressReplica{}, &IngressReplicaList{}) +} + +// IngressReplicaList contains a list of [IngressReplica] +// +kubebuilder:object:root=true +type IngressReplicaList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []IngressReplica `json:"items"` +} + +// IngressReplica is the Schema for the console ingress replica +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Namespaced +// +kubebuilder:subresource:status +type IngressReplica struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Spec of the IngressReplica + // +kubebuilder:validation:Required + Spec IngressReplicaSpec `json:"spec"` + + // Status of the IngressReplica + // +kubebuilder:validation:Optional + Status Status `json:"status,omitempty"` +} + +type IngressReplicaSpec struct { + // +kubebuilder:validation:Required + IngressRef corev1.ObjectReference `json:"ingressRef"` + + // +kubebuilder:validation:Optional + IngressClassName *string `json:"ingressClassName,omitempty"` + + // +kubebuilder:validation:Optional + TLS []v1.IngressTLS `json:"tls,omitempty"` + + // +kubebuilder:validation:Required + HostMappings map[string]string `json:"hostMappings"` +} + +func (in *IngressReplica) SetCondition(condition metav1.Condition) { + meta.SetStatusCondition(&in.Status.Conditions, condition) +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 6cc6cc1d..0b99b29e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -24,6 +24,7 @@ import ( "github.com/pluralsh/console/go/client" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -347,6 +348,100 @@ func (in *HelmSpec) DeepCopy() *HelmSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IngressReplica) DeepCopyInto(out *IngressReplica) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IngressReplica. +func (in *IngressReplica) DeepCopy() *IngressReplica { + if in == nil { + return nil + } + out := new(IngressReplica) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *IngressReplica) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *IngressReplicaList) DeepCopyInto(out *IngressReplicaList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]IngressReplica, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IngressReplicaList. +func (in *IngressReplicaList) DeepCopy() *IngressReplicaList { + if in == nil { + return nil + } + out := new(IngressReplicaList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *IngressReplicaList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IngressReplicaSpec) DeepCopyInto(out *IngressReplicaSpec) { + *out = *in + out.IngressRef = in.IngressRef + if in.IngressClassName != nil { + in, out := &in.IngressClassName, &out.IngressClassName + *out = new(string) + **out = **in + } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = make([]networkingv1.IngressTLS, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.HostMappings != nil { + in, out := &in.HostMappings, &out.HostMappings + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IngressReplicaSpec. +func (in *IngressReplicaSpec) DeepCopy() *IngressReplicaSpec { + if in == nil { + return nil + } + out := new(IngressReplicaSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PipelineGate) DeepCopyInto(out *PipelineGate) { *out = *in diff --git a/charts/deployment-operator/Chart.yaml b/charts/deployment-operator/Chart.yaml index 28dd0b53..26b4e0bd 100644 --- a/charts/deployment-operator/Chart.yaml +++ b/charts/deployment-operator/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 name: deployment-operator description: creates a new instance of the plural deployment operator -appVersion: 0.4.45 -version: 0.4.45 +appVersion: 0.4.46 +version: 0.4.46 maintainers: - name: Plural url: https://www.plural.sh diff --git a/charts/deployment-operator/crds/deployments.plural.sh_ingressreplicas.yaml b/charts/deployment-operator/crds/deployments.plural.sh_ingressreplicas.yaml new file mode 100644 index 00000000..ce8969c2 --- /dev/null +++ b/charts/deployment-operator/crds/deployments.plural.sh_ingressreplicas.yaml @@ -0,0 +1,198 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.3 + name: ingressreplicas.deployments.plural.sh +spec: + group: deployments.plural.sh + names: + kind: IngressReplica + listKind: IngressReplicaList + plural: ingressreplicas + singular: ingressreplica + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: IngressReplica is the Schema for the console ingress replica + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. 
+ Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: Spec of the IngressReplica + properties: + hostMappings: + additionalProperties: + type: string + type: object + ingressClassName: + type: string + ingressRef: + description: ObjectReference contains enough information to let you + inspect or modify the referred object. + properties: + apiVersion: + description: API version of the referent. + type: string + fieldPath: + description: |- + If referring to a piece of an object instead of an entire object, this string + should contain a valid JSON/Go field access statement, such as desiredState.manifest.containers[2]. + For example, if the object reference is to a container within a pod, this would take on a value like: + "spec.containers{name}" (where "name" refers to the name of the container that triggered + the event) or if no container name is specified "spec.containers[2]" (container with + index 2 in this pod). This syntax is chosen only to have some well-defined way of + referencing a part of an object. + type: string + kind: + description: |- + Kind of the referent. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + namespace: + description: |- + Namespace of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + type: string + resourceVersion: + description: |- + Specific resourceVersion to which this reference is made, if any. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency + type: string + uid: + description: |- + UID of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids + type: string + type: object + x-kubernetes-map-type: atomic + tls: + items: + description: IngressTLS describes the transport layer security associated + with an ingress. + properties: + hosts: + description: |- + hosts is a list of hosts included in the TLS certificate. The values in + this list must match the name/s used in the tlsSecret. Defaults to the + wildcard host setting for the loadbalancer controller fulfilling this + Ingress, if left unspecified. + items: + type: string + type: array + x-kubernetes-list-type: atomic + secretName: + description: |- + secretName is the name of the secret used to terminate TLS traffic on + port 443. Field is left optional to allow TLS routing based on SNI + hostname alone. If the SNI host in a listener conflicts with the "Host" + header field used by an IngressRule, the SNI host is used for termination + and value of the "Host" header is used for routing. 
+ type: string + type: object + type: array + required: + - hostMappings + - ingressRef + type: object + status: + description: Status of the IngressReplica + properties: + conditions: + description: Represents the observations of a PrAutomation's current + state. + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + id: + description: ID of the resource in the Console API. + type: string + sha: + description: SHA of last applied configuration. 
+ type: string + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/cmd/agent/args/args.go b/cmd/agent/args/args.go index 0e5b9f0e..8b424e99 100644 --- a/cmd/agent/args/args.go +++ b/cmd/agent/args/args.go @@ -29,11 +29,11 @@ const ( defaultRefreshInterval = "2m" defaultRefreshIntervalDuration = 2 * time.Minute - defaultPollInterval = "30s" - defaultPollIntervalDuration = 30 * time.Second + defaultPollInterval = "2m" + defaultPollIntervalDuration = 2 * time.Minute - defaultRefreshJitter = "15s" - defaultRefreshJitterDuration = 15 * time.Second + defaultPollJitter = "15s" + defaultPollJitterDuration = 15 * time.Second defaultResourceCacheTTL = "1h" defaultResourceCacheTTLDuration = time.Hour @@ -41,8 +41,8 @@ const ( defaultManifestCacheTTL = "1h" defaultManifestCacheTTLDuration = time.Hour - defaultControllerCacheTTL = "30s" - defaultControllerCacheTTLDuration = 30 * time.Second + defaultControllerCacheTTL = "2m" + defaultControllerCacheTTLDuration = 2 * time.Minute defaultRestoreNamespace = "velero" @@ -61,15 +61,16 @@ var ( argMaxConcurrentReconciles = flag.Int("max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.") argResyncSeconds = flag.Int("resync-seconds", 300, "Resync duration in seconds.") - argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.") - argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.") - argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.") - argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.") - argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.") - argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.") - argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Time interval to recheck the websocket connection.") - argPollInterval = flag.String("poll-interval", defaultPollInterval, "Time interval to poll resources from the Console API.") - argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.") + argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.") + argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.") + argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.") + argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.") + argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.") + argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.") + argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "DEPRECATED: Time interval to poll resources from the Console API.") + argPollInterval = flag.String("poll-interval", defaultPollInterval, "Time interval to poll resources from the Console API.") + // TODO: ensure this arg can be safely renamed without causing breaking changes. 
+ argPollJitter = flag.String("refresh-jitter", defaultPollJitter, "Randomly selected jitter time up to the provided duration will be added to the poll interval.") argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.") argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.") argControllerCacheTTL = flag.String("controller-cache-ttl", defaultControllerCacheTTL, "The time to live of console controller cache entries.") @@ -184,14 +185,22 @@ func PollInterval() time.Duration { return defaultPollIntervalDuration } + if duration < 10*time.Second { + klog.Fatalf("--poll-interval cannot be lower than 10s") + } + return duration } -func RefreshJitter() time.Duration { - jitter, err := time.ParseDuration(*argRefreshJitter) +func PollJitter() time.Duration { + jitter, err := time.ParseDuration(*argPollJitter) if err != nil { - klog.ErrorS(err, "Could not parse refresh-jitter", "value", *argRefreshJitter, "default", defaultRefreshJitterDuration) - return defaultRefreshJitterDuration + klog.ErrorS(err, "Could not parse refresh-jitter", "value", *argPollJitter, "default", defaultPollJitterDuration) + return defaultPollJitterDuration + } + + if jitter < 10*time.Second { + klog.Fatalf("--refresh-jitter cannot be lower than 10s") } return jitter diff --git a/cmd/agent/args/pprof.go b/cmd/agent/args/pprof.go index 57b060bc..68940d93 100644 --- a/cmd/agent/args/pprof.go +++ b/cmd/agent/args/pprof.go @@ -4,17 +4,17 @@ import ( "net/http" "net/http/pprof" - "github.com/pluralsh/deployment-operator/pkg/log" + "k8s.io/klog/v2" ) func initProfiler() { - log.Logger.Info("initializing profiler") + klog.Info("initializing profiler") mux := http.NewServeMux() mux.HandleFunc(defaultProfilerPath, pprof.Index) go func() { if err := http.ListenAndServe(defaultProfilerAddress, mux); err != nil { - log.Logger.Fatal(err) + klog.Fatal(err) } }() } diff --git a/cmd/agent/console.go b/cmd/agent/console.go index f60a54af..483fc6b5 100644 --- a/cmd/agent/console.go +++ b/cmd/agent/console.go @@ -2,14 +2,14 @@ package main import ( "os" - - "k8s.io/client-go/util/workqueue" + "time" "github.com/pluralsh/deployment-operator/cmd/agent/args" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/client" consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" "github.com/pluralsh/deployment-operator/pkg/controller/stacks" + v1 "github.com/pluralsh/deployment-operator/pkg/controller/v1" "k8s.io/client-go/rest" ctrclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -21,58 +21,64 @@ import ( "github.com/pluralsh/deployment-operator/pkg/controller/service" ) -func initConsoleManagerOrDie() *consolectrl.ControllerManager { +func initConsoleManagerOrDie() *consolectrl.Manager { mgr, err := consolectrl.NewControllerManager( consolectrl.WithMaxConcurrentReconciles(args.MaxConcurrentReconciles()), consolectrl.WithCacheSyncTimeout(args.ProcessingTimeout()), - consolectrl.WithRefresh(args.RefreshInterval()), - consolectrl.WithJitter(args.RefreshJitter()), + consolectrl.WithPollInterval(args.PollInterval()), + consolectrl.WithJitter(args.PollJitter()), consolectrl.WithRecoverPanic(true), consolectrl.WithConsoleClientArgs(args.ConsoleUrl(), args.DeployToken()), consolectrl.WithSocketArgs(args.ClusterId(), args.ConsoleUrl(), args.DeployToken()), ) if err != nil { - setupLog.Errorw("unable to create manager", "error", err) + 
setupLog.Error(err, "unable to create manager") os.Exit(1) } return mgr } +const ( + // Use custom (short) poll intervals for these reconcilers. + pipelineGatesPollInterval = 30 * time.Second + stacksPollInterval = 30 * time.Second +) + func registerConsoleReconcilersOrDie( - mgr *controller.ControllerManager, + mgr *controller.Manager, config *rest.Config, k8sClient ctrclient.Client, consoleClient client.Client, ) { - mgr.AddReconcilerOrDie(service.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + mgr.AddReconcilerOrDie(service.Identifier, func() (v1.Reconciler, error) { r, err := service.NewServiceReconciler(consoleClient, config, args.ControllerCacheTTL(), args.ManifestCacheTTL(), args.RestoreNamespace(), args.ConsoleUrl()) - return r, r.SvcQueue, err + return r, err }) - mgr.AddReconcilerOrDie(pipelinegates.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { - r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, config, args.ControllerCacheTTL(), args.PollInterval(), args.ClusterId()) - return r, r.GateQueue, err + mgr.AddReconcilerOrDie(pipelinegates.Identifier, func() (v1.Reconciler, error) { + r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, config, pipelineGatesPollInterval) + return r, err }) - mgr.AddReconcilerOrDie(restore.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + mgr.AddReconcilerOrDie(restore.Identifier, func() (v1.Reconciler, error) { r := restore.NewRestoreReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.RestoreNamespace()) - return r, r.RestoreQueue, nil + return r, nil }) - mgr.AddReconcilerOrDie(namespaces.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + mgr.AddReconcilerOrDie(namespaces.Identifier, func() (v1.Reconciler, error) { r := namespaces.NewNamespaceReconciler(consoleClient, k8sClient, args.ControllerCacheTTL()) - return r, r.NamespaceQueue, nil + return r, nil }) - mgr.AddReconcilerOrDie(stacks.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + mgr.AddReconcilerOrDie(stacks.Identifier, func() (v1.Reconciler, error) { namespace, err := utils.GetOperatorNamespace() if err != nil { - setupLog.Errorw("unable to get operator namespace", "error", err) + setupLog.Error(err, "unable to get operator namespace") os.Exit(1) } - r := stacks.NewStackReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.PollInterval(), namespace, args.ConsoleUrl(), args.DeployToken()) - return r, r.StackQueue, nil + r := stacks.NewStackReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), stacksPollInterval, namespace, args.ConsoleUrl(), args.DeployToken()) + return r, nil }) } diff --git a/cmd/agent/kubernetes.go b/cmd/agent/kubernetes.go index 34958e48..b653e36b 100644 --- a/cmd/agent/kubernetes.go +++ b/cmd/agent/kubernetes.go @@ -21,15 +21,15 @@ import ( "github.com/pluralsh/deployment-operator/cmd/agent/args" "github.com/pluralsh/deployment-operator/internal/controller" + "github.com/pluralsh/deployment-operator/pkg/cache" consoleclient "github.com/pluralsh/deployment-operator/pkg/client" - "github.com/pluralsh/deployment-operator/pkg/common" consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" - "github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates" "github.com/pluralsh/deployment-operator/pkg/controller/service" ) func 
initKubeManagerOrDie(config *rest.Config) manager.Manager { mgr, err := ctrl.NewManager(config, ctrl.Options{ + Logger: setupLog, Scheme: scheme, LeaderElection: args.EnableLeaderElection(), LeaderElectionID: "dep12loy45.plural.sh", @@ -86,7 +86,7 @@ func initKubeClientsOrDie(config *rest.Config) (rolloutsClient *roclientset.Clie func registerKubeReconcilersOrDie( manager ctrl.Manager, - consoleManager *consolectrl.ControllerManager, + consoleManager *consolectrl.Manager, config *rest.Config, extConsoleClient consoleclient.Client, ) { @@ -116,10 +116,8 @@ func registerKubeReconcilersOrDie( HttpClient: &http.Client{Timeout: httpClientTimout}, ArgoClientSet: rolloutsClient, DynamicClient: dynamicClient, - SvcReconciler: common.ToReconcilerOrDie[*service.ServiceReconciler]( - consoleManager.GetReconciler(service.Identifier), - ), - KubeClient: kubeClient, + SvcReconciler: consoleManager.GetReconcilerOrDie(service.Identifier), + KubeClient: kubeClient, } reconcileGroups := map[schema.GroupVersionKind]controller.SetupWithManager{ @@ -157,9 +155,6 @@ func registerKubeReconcilersOrDie( if err := (&controller.CustomHealthReconciler{ Client: manager.GetClient(), Scheme: manager.GetScheme(), - ServiceReconciler: common.ToReconcilerOrDie[*service.ServiceReconciler]( - consoleManager.GetReconciler(service.Identifier), - ), }).SetupWithManager(manager); err != nil { setupLog.Error(err, "unable to create controller", "controller", "HealthConvert") } @@ -171,6 +166,13 @@ func registerKubeReconcilersOrDie( setupLog.Error(err, "unable to create controller", "controller", "StackRun") } + if err := (&controller.IngressReplicaReconciler{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + }).SetupWithManager(manager); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "IngressReplica") + } + rawConsoleUrl, _ := strings.CutSuffix(args.ConsoleUrl(), "/ext/gql") if err := (&controller.VirtualClusterController{ Client: manager.GetClient(), @@ -198,13 +200,10 @@ func registerKubeReconcilersOrDie( } if err = (&controller.PipelineGateReconciler{ - Client: manager.GetClient(), - GateCache: common.ToReconcilerOrDie[*pipelinegates.GateReconciler]( - consoleManager.GetReconciler(pipelinegates.Identifier), - ).GateCache, + Client: manager.GetClient(), ConsoleClient: consoleclient.New(args.ConsoleUrl(), args.DeployToken()), - Log: ctrl.Log.WithName("controllers").WithName("PipelineGate"), Scheme: manager.GetScheme(), + GateCache: cache.GateCache(), }).SetupWithManager(manager); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Group") os.Exit(1) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 09db47d5..d6d5b608 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -10,13 +10,7 @@ import ( constraintstatusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" "k8s.io/client-go/discovery" "k8s.io/client-go/rest" - - deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1" - "github.com/pluralsh/deployment-operator/cmd/agent/args" - "github.com/pluralsh/deployment-operator/pkg/cache" - "github.com/pluralsh/deployment-operator/pkg/client" - consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" - "github.com/pluralsh/deployment-operator/pkg/log" + "k8s.io/klog/v2" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -24,11 +18,17 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme 
"k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + + deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1" + "github.com/pluralsh/deployment-operator/cmd/agent/args" + "github.com/pluralsh/deployment-operator/pkg/cache" + "github.com/pluralsh/deployment-operator/pkg/client" + consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" ) var ( scheme = runtime.NewScheme() - setupLog = log.Logger + setupLog = klog.NewKlogr() ) func init() { @@ -49,13 +49,16 @@ const ( func main() { args.Init() config := ctrl.GetConfigOrDie() - ctx := ctrl.SetupSignalHandler() + ctx := ctrl.LoggerInto(ctrl.SetupSignalHandler(), setupLog) extConsoleClient := client.New(args.ConsoleUrl(), args.DeployToken()) discoveryClient := initDiscoveryClientOrDie(config) kubeManager := initKubeManagerOrDie(config) consoleManager := initConsoleManagerOrDie() + // Initialize Pipeline Gate Cache + cache.InitGateCache(args.ControllerCacheTTL(), extConsoleClient) + registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), extConsoleClient) registerKubeReconcilersOrDie(kubeManager, consoleManager, config, extConsoleClient) @@ -86,10 +89,10 @@ func initDiscoveryClientOrDie(config *rest.Config) *discovery.DiscoveryClient { return discoveryClient } -func runConsoleManagerInBackgroundOrDie(ctx context.Context, mgr *consolectrl.ControllerManager) { +func runConsoleManagerInBackgroundOrDie(ctx context.Context, mgr *consolectrl.Manager) { setupLog.Info("starting console controller manager") if err := mgr.Start(ctx); err != nil { - setupLog.Errorw("unable to start console controller manager", "error", err) + setupLog.Error(err, "unable to start console controller manager") os.Exit(1) } } @@ -97,7 +100,7 @@ func runConsoleManagerInBackgroundOrDie(ctx context.Context, mgr *consolectrl.Co func runKubeManagerOrDie(ctx context.Context, mgr ctrl.Manager) { setupLog.Info("starting kubernetes controller manager") if err := mgr.Start(ctx); err != nil { - setupLog.Errorw("unable to start kubernetes controller manager", "error", err) + setupLog.Error(err, "unable to start kubernetes controller manager") os.Exit(1) } } diff --git a/config/crd/bases/deployments.plural.sh_ingressreplicas.yaml b/config/crd/bases/deployments.plural.sh_ingressreplicas.yaml new file mode 100644 index 00000000..ce8969c2 --- /dev/null +++ b/config/crd/bases/deployments.plural.sh_ingressreplicas.yaml @@ -0,0 +1,198 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.3 + name: ingressreplicas.deployments.plural.sh +spec: + group: deployments.plural.sh + names: + kind: IngressReplica + listKind: IngressReplicaList + plural: ingressreplicas + singular: ingressreplica + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: IngressReplica is the Schema for the console ingress replica + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. 
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: Spec of the IngressReplica + properties: + hostMappings: + additionalProperties: + type: string + type: object + ingressClassName: + type: string + ingressRef: + description: ObjectReference contains enough information to let you + inspect or modify the referred object. + properties: + apiVersion: + description: API version of the referent. + type: string + fieldPath: + description: |- + If referring to a piece of an object instead of an entire object, this string + should contain a valid JSON/Go field access statement, such as desiredState.manifest.containers[2]. + For example, if the object reference is to a container within a pod, this would take on a value like: + "spec.containers{name}" (where "name" refers to the name of the container that triggered + the event) or if no container name is specified "spec.containers[2]" (container with + index 2 in this pod). This syntax is chosen only to have some well-defined way of + referencing a part of an object. + type: string + kind: + description: |- + Kind of the referent. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + namespace: + description: |- + Namespace of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + type: string + resourceVersion: + description: |- + Specific resourceVersion to which this reference is made, if any. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency + type: string + uid: + description: |- + UID of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids + type: string + type: object + x-kubernetes-map-type: atomic + tls: + items: + description: IngressTLS describes the transport layer security associated + with an ingress. + properties: + hosts: + description: |- + hosts is a list of hosts included in the TLS certificate. The values in + this list must match the name/s used in the tlsSecret. Defaults to the + wildcard host setting for the loadbalancer controller fulfilling this + Ingress, if left unspecified. + items: + type: string + type: array + x-kubernetes-list-type: atomic + secretName: + description: |- + secretName is the name of the secret used to terminate TLS traffic on + port 443. Field is left optional to allow TLS routing based on SNI + hostname alone. If the SNI host in a listener conflicts with the "Host" + header field used by an IngressRule, the SNI host is used for termination + and value of the "Host" header is used for routing. + type: string + type: object + type: array + required: + - hostMappings + - ingressRef + type: object + status: + description: Status of the IngressReplica + properties: + conditions: + description: Represents the observations of a PrAutomation's current + state. + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. 
If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + id: + description: ID of the resource in the Console API. + type: string + sha: + description: SHA of last applied configuration. + type: string + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/internal/controller/argorollout_controller.go b/internal/controller/argorollout_controller.go index ee50edc8..a7ece956 100644 --- a/internal/controller/argorollout_controller.go +++ b/internal/controller/argorollout_controller.go @@ -24,7 +24,7 @@ import ( "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/client" - "github.com/pluralsh/deployment-operator/pkg/controller/service" + v1 "github.com/pluralsh/deployment-operator/pkg/controller/v1" ) const requeueArgoRolloutAfter = time.Second * 5 @@ -39,7 +39,7 @@ type ArgoRolloutReconciler struct { ArgoClientSet roclientset.Interface DynamicClient dynamic.Interface KubeClient kubernetes.Interface - SvcReconciler *service.ServiceReconciler + SvcReconciler v1.Reconciler } // Reconcile Argo Rollout custom resources to ensure that Console stays in sync with Kubernetes cluster. 
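Editor's note: this hunk swaps the concrete *service.ServiceReconciler dependency for the generic v1.Reconciler interface, and the next hunk enqueues work through its queue instead of touching SvcQueue directly. The sketch below is illustrative only — the interface shape is inferred from the Reconcile/Queue calls visible in this diff, not copied from pkg/controller/v1, and requeueService is a hypothetical helper.

package example

import (
	"context"

	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Reconciler is the assumed minimal shape of the v1.Reconciler dependency;
// only the methods exercised by this change set are listed here.
type Reconciler interface {
	Reconcile(ctx context.Context, id string) (reconcile.Result, error)
	Queue() workqueue.TypedRateLimitingInterface[string]
}

// requeueService re-enqueues a service ID for reconciliation, mirroring the
// promote() change in the next hunk.
func requeueService(r Reconciler, svcID string) {
	if r != nil {
		r.Queue().AddRateLimited(svcID)
	}
}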
@@ -107,7 +107,7 @@ func (r *ArgoRolloutReconciler) promote(ctx context.Context, rolloutIf clientset } if r.SvcReconciler != nil { - r.SvcReconciler.SvcQueue.AddRateLimited(svcId) + r.SvcReconciler.Queue().AddRateLimited(svcId) } return nil } diff --git a/internal/controller/customhealth_controller.go b/internal/controller/customhealth_controller.go index 01da7232..111cf5a1 100644 --- a/internal/controller/customhealth_controller.go +++ b/internal/controller/customhealth_controller.go @@ -30,14 +30,12 @@ import ( "github.com/pluralsh/deployment-operator/api/v1alpha1" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/common" - "github.com/pluralsh/deployment-operator/pkg/controller/service" ) // CustomHealthReconciler reconciles a LuaScript object type CustomHealthReconciler struct { client.Client - Scheme *runtime.Scheme - ServiceReconciler *service.ServiceReconciler + Scheme *runtime.Scheme } //+kubebuilder:rbac:groups=deployments.plural.sh,resources=customhealths,verbs=get;list;watch;create;update;patch;delete diff --git a/internal/controller/customhealth_controller_test.go b/internal/controller/customhealth_controller_test.go index 57ca7e8d..3936b49a 100644 --- a/internal/controller/customhealth_controller_test.go +++ b/internal/controller/customhealth_controller_test.go @@ -5,13 +5,13 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/pluralsh/deployment-operator/api/v1alpha1" - "github.com/pluralsh/deployment-operator/pkg/common" - "github.com/pluralsh/deployment-operator/pkg/controller/service" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/pluralsh/deployment-operator/api/v1alpha1" + "github.com/pluralsh/deployment-operator/pkg/common" ) var _ = Describe("Customhealt Controller", Ordered, func() { @@ -66,11 +66,9 @@ var _ = Describe("Customhealt Controller", Ordered, func() { Conditions: []metav1.Condition{}, }, } - sr := &service.ServiceReconciler{} reconciler := &CustomHealthReconciler{ - Client: kClient, - Scheme: kClient.Scheme(), - ServiceReconciler: sr, + Client: kClient, + Scheme: kClient.Scheme(), } _, err := reconciler.Reconcile(ctx, reconcile.Request{ NamespacedName: typeNamespacedName, diff --git a/internal/controller/ingressreplica_controller.go b/internal/controller/ingressreplica_controller.go new file mode 100644 index 00000000..fb803140 --- /dev/null +++ b/internal/controller/ingressreplica_controller.go @@ -0,0 +1,168 @@ +package controller + +import ( + "context" + + "github.com/pluralsh/deployment-operator/api/v1alpha1" + "github.com/pluralsh/deployment-operator/internal/utils" + "github.com/samber/lo" + networkv1 "k8s.io/api/networking/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + k8sClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// IngressReplicaReconciler reconciles a IngressReplica resource. +type IngressReplicaReconciler struct { + k8sClient.Client + Scheme *runtime.Scheme +} + +// Reconcile IngressReplica ensure that stays in sync with Kubernetes cluster. 
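+// The reconcile flow below: fetch the IngressReplica, read the source Ingress
+// referenced by spec.ingressRef, hash the spec, and either create the replicated
+// Ingress (when it does not exist yet) or update it when the stored SHA differs;
+// on success the Ready condition and status.sha are set.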
+func (r *IngressReplicaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ reconcile.Result, reterr error) { + logger := log.FromContext(ctx) + + // Read resource from Kubernetes cluster. + ingressReplica := &v1alpha1.IngressReplica{} + if err := r.Get(ctx, req.NamespacedName, ingressReplica); err != nil { + logger.Error(err, "unable to fetch IngressReplica") + return ctrl.Result{}, k8sClient.IgnoreNotFound(err) + } + + logger.Info("reconciling IngressReplica", "namespace", ingressReplica.Namespace, "name", ingressReplica.Name) + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, metav1.ConditionFalse, v1alpha1.ReadyConditionReason, "") + + scope, err := NewDefaultScope(ctx, r.Client, ingressReplica) + if err != nil { + logger.Error(err, "failed to create scope") + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, metav1.ConditionFalse, v1alpha1.ReadyConditionReason, err.Error()) + return ctrl.Result{}, err + } + + // Always patch object when exiting this function, so we can persist any object changes. + defer func() { + if err := scope.PatchObject(); err != nil && reterr == nil { + reterr = err + } + }() + + if !ingressReplica.DeletionTimestamp.IsZero() { + return ctrl.Result{}, nil + } + + oldIngress := &networkv1.Ingress{} + if err := r.Get(ctx, k8sClient.ObjectKey{Name: ingressReplica.Spec.IngressRef.Name, Namespace: ingressReplica.Spec.IngressRef.Namespace}, oldIngress); err != nil { + logger.Error(err, "failed to get old Ingress") + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, metav1.ConditionFalse, v1alpha1.ReadyConditionReason, err.Error()) + return ctrl.Result{}, err + } + + sha, err := utils.HashObject(ingressReplica.Spec) + if err != nil { + logger.Error(err, "failed to hash IngressReplica.Spec") + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, metav1.ConditionFalse, v1alpha1.ReadyConditionReason, err.Error()) + return ctrl.Result{}, err + } + + newIngress := &networkv1.Ingress{} + if err := r.Get(ctx, k8sClient.ObjectKey{Name: ingressReplica.Name, Namespace: ingressReplica.Namespace}, newIngress); err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "failed to get new Ingress") + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, metav1.ConditionFalse, v1alpha1.ReadyConditionReason, err.Error()) + return ctrl.Result{}, err + } + + newIngress = genIngress(ingressReplica, oldIngress) + if err := r.Client.Create(ctx, newIngress); err != nil { + logger.Error(err, "failed to create new Ingress") + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, metav1.ConditionFalse, v1alpha1.ReadyConditionReason, err.Error()) + return ctrl.Result{}, err + } + ingressReplica.Status.SHA = &sha + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, metav1.ConditionTrue, v1alpha1.ReadyConditionReason, "") + return ctrl.Result{}, nil + } + + // update a new ingress + if !ingressReplica.Status.IsSHAEqual(sha) { + updateIngress(ingressReplica, newIngress, oldIngress) + if err := r.Client.Update(ctx, newIngress); err != nil { + logger.Error(err, "failed to update new Ingress") + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, metav1.ConditionFalse, v1alpha1.ReadyConditionReason, err.Error()) + return ctrl.Result{}, err + } + } + ingressReplica.Status.SHA = &sha + utils.MarkCondition(ingressReplica.SetCondition, v1alpha1.ReadyConditionType, 
metav1.ConditionTrue, v1alpha1.ReadyConditionReason, "") + return ctrl.Result{}, reterr +} + +// SetupWithManager sets up the controller with the Manager. +func (r *IngressReplicaReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.IngressReplica{}). + Complete(r) +} + +func genIngress(ingressReplica *v1alpha1.IngressReplica, oldIngress *networkv1.Ingress) *networkv1.Ingress { + newIngress := &networkv1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: ingressReplica.Name, + Namespace: ingressReplica.Namespace, + }, + Spec: networkv1.IngressSpec{ + IngressClassName: oldIngress.Spec.IngressClassName, + DefaultBackend: oldIngress.Spec.DefaultBackend, + }, + } + updateIngress(ingressReplica, newIngress, oldIngress) + return newIngress +} + +func updateIngress(ingressReplica *v1alpha1.IngressReplica, newIngress *networkv1.Ingress, oldIngress *networkv1.Ingress) { + if newIngress.Labels == nil { + newIngress.Labels = map[string]string{} + } + if oldIngress.Labels == nil { + oldIngress.Labels = map[string]string{} + } + if ingressReplica.Labels == nil { + ingressReplica.Labels = map[string]string{} + } + // merge from left to right + newIngress.Labels = lo.Assign(newIngress.Labels, oldIngress.Labels, ingressReplica.Labels) + + if newIngress.Annotations == nil { + newIngress.Annotations = map[string]string{} + } + if oldIngress.Annotations == nil { + oldIngress.Annotations = map[string]string{} + } + if ingressReplica.Annotations == nil { + ingressReplica.Annotations = map[string]string{} + } + // merge from left to right + newIngress.Annotations = lo.Assign(newIngress.Annotations, oldIngress.Annotations, ingressReplica.Annotations) + + if ingressReplica.Spec.IngressClassName != nil { + newIngress.Spec.IngressClassName = ingressReplica.Spec.IngressClassName + } + if len(ingressReplica.Spec.TLS) > 0 { + newIngress.Spec.TLS = ingressReplica.Spec.TLS + } + for _, rule := range oldIngress.Spec.Rules { + ir := networkv1.IngressRule{ + Host: rule.Host, + IngressRuleValue: rule.IngressRuleValue, + } + if newHost, ok := ingressReplica.Spec.HostMappings[rule.Host]; ok { + ir.Host = newHost + } + + newIngress.Spec.Rules = append(newIngress.Spec.Rules, ir) + } +} diff --git a/internal/controller/ingressreplica_controller_test.go b/internal/controller/ingressreplica_controller_test.go new file mode 100644 index 00000000..48d00f2a --- /dev/null +++ b/internal/controller/ingressreplica_controller_test.go @@ -0,0 +1,161 @@ +package controller + +import ( + "context" + "sort" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/pluralsh/deployment-operator/api/v1alpha1" + "github.com/pluralsh/deployment-operator/pkg/test/common" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + networkv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var _ = Describe("IngressReplica Controller", Ordered, func() { + Context("When reconciling a resource", func() { + const ( + ingressReplicaName = "ingress-replica-name" + ingressName = "old-ingress" + namespace = "default" + ) + + ctx := context.Background() + + namespacedName := types.NamespacedName{Name: ingressReplicaName, Namespace: namespace} + ingressNamespacedName := types.NamespacedName{Name: ingressName, Namespace: namespace} + + ingressReplica := &v1alpha1.IngressReplica{} + oldIngress := &networkv1.Ingress{} + + BeforeAll(func() { + By("Creating IngressReplica") + err := kClient.Get(ctx, namespacedName, ingressReplica) + if err != nil && errors.IsNotFound(err) { + resource := &v1alpha1.IngressReplica{ + ObjectMeta: metav1.ObjectMeta{ + Name: ingressReplicaName, + Namespace: namespace, + }, + Spec: v1alpha1.IngressReplicaSpec{ + IngressRef: corev1.ObjectReference{ + Name: ingressName, + Namespace: namespace, + }, + HostMappings: map[string]string{ + "example.com": "test.example.com", + }, + }, + } + Expect(kClient.Create(ctx, resource)).To(Succeed()) + } + By("Creating Ingress") + err = kClient.Get(ctx, ingressNamespacedName, oldIngress) + if err != nil && errors.IsNotFound(err) { + resource := &networkv1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: ingressName, + Namespace: namespace, + }, + Spec: networkv1.IngressSpec{ + Rules: []networkv1.IngressRule{ + { + Host: "test", + IngressRuleValue: networkv1.IngressRuleValue{}, + }, + }, + }, + } + Expect(kClient.Create(ctx, resource)).To(Succeed()) + } + }) + + AfterAll(func() { + By("Cleanup ingress replica resources") + oldIngress := &networkv1.Ingress{} + Expect(kClient.Get(ctx, ingressNamespacedName, oldIngress)).NotTo(HaveOccurred()) + Expect(kClient.Delete(ctx, oldIngress)).To(Succeed()) + + By("Cleanup ingress replica") + ingressReplica := &v1alpha1.IngressReplica{} + Expect(kClient.Get(ctx, namespacedName, ingressReplica)).NotTo(HaveOccurred()) + Expect(kClient.Delete(ctx, ingressReplica)).To(Succeed()) + }) + + It("create ingress", func() { + reconciler := &IngressReplicaReconciler{ + Client: kClient, + Scheme: kClient.Scheme(), + } + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) + Expect(err).NotTo(HaveOccurred()) + + newIngress := &networkv1.Ingress{} + Expect(kClient.Get(ctx, namespacedName, newIngress)).NotTo(HaveOccurred()) + + err = kClient.Get(ctx, namespacedName, ingressReplica) + Expect(err).NotTo(HaveOccurred()) + Expect(SanitizeStatusConditions(ingressReplica.Status)).To(Equal(SanitizeStatusConditions(v1alpha1.Status{ + SHA: lo.ToPtr("ACBBWIKK74ACGAK5NWAXYTTIYI2GDOSXGCJ65UGOLOPFCB24PKUQ===="), + Conditions: []metav1.Condition{ + { + Type: v1alpha1.ReadyConditionType.String(), + Status: metav1.ConditionTrue, + Reason: v1alpha1.ReadyConditionReason.String(), + Message: "", + }, + }, + }))) + + }) + + It("update ingress", func() { + reconciler := &IngressReplicaReconciler{ + Client: kClient, + Scheme: kClient.Scheme(), + } + + Expect(common.MaybePatch(kClient, &v1alpha1.IngressReplica{ + ObjectMeta: metav1.ObjectMeta{Name: ingressReplicaName, Namespace: namespace}, + 
}, func(p *v1alpha1.IngressReplica) { + p.Status.SHA = lo.ToPtr("diff-sha") + })).To(Succeed()) + + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) + Expect(err).NotTo(HaveOccurred()) + err = kClient.Get(ctx, namespacedName, ingressReplica) + Expect(err).NotTo(HaveOccurred()) + Expect(SanitizeStatusConditions(ingressReplica.Status)).To(Equal(SanitizeStatusConditions(v1alpha1.Status{ + SHA: lo.ToPtr("ACBBWIKK74ACGAK5NWAXYTTIYI2GDOSXGCJ65UGOLOPFCB24PKUQ===="), + Conditions: []metav1.Condition{ + { + Type: v1alpha1.ReadyConditionType.String(), + Status: metav1.ConditionTrue, + Reason: v1alpha1.ReadyConditionReason.String(), + Message: "", + }, + }, + }))) + + }) + + }) +}) + +func SanitizeStatusConditions(status v1alpha1.Status) v1alpha1.Status { + for i := range status.Conditions { + status.Conditions[i].LastTransitionTime = metav1.Time{} + status.Conditions[i].ObservedGeneration = 0 + } + + sort.Slice(status.Conditions, func(i, j int) bool { + return status.Conditions[i].Type < status.Conditions[j].Type + }) + + return status +} diff --git a/internal/controller/pipelinegate_controller.go b/internal/controller/pipelinegate_controller.go index 3d536f42..859de23e 100644 --- a/internal/controller/pipelinegate_controller.go +++ b/internal/controller/pipelinegate_controller.go @@ -22,9 +22,7 @@ import ( "github.com/go-logr/logr" console "github.com/pluralsh/console/go/client" - "github.com/pluralsh/deployment-operator/api/v1alpha1" - "github.com/pluralsh/deployment-operator/internal/utils" - consoleclient "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/samber/lo" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -33,17 +31,20 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/pluralsh/deployment-operator/api/v1alpha1" + "github.com/pluralsh/deployment-operator/internal/utils" + consoleclient "github.com/pluralsh/deployment-operator/pkg/client" ) // PipelineGateReconciler reconciles a PipelineGate object type PipelineGateReconciler struct { - client.Client + runtimeclient.Client ConsoleClient consoleclient.Client - GateCache *consoleclient.Cache[console.PipelineGateFragment] Scheme *runtime.Scheme - Log logr.Logger + GateCache *consoleclient.Cache[console.PipelineGateFragment] } //+kubebuilder:rbac:groups=deployments.plural.sh,resources=pipelinegates,verbs=get;list;watch;create;update;patch;delete @@ -133,7 +134,7 @@ func (r *PipelineGateReconciler) cleanUpGate(ctx context.Context, crGate *v1alph func (r *PipelineGateReconciler) killJob(ctx context.Context, job *batchv1.Job) error { log := log.FromContext(ctx) deletePolicy := metav1.DeletePropagationBackground // kill the job and its pods asap - if err := r.Delete(ctx, job, &client.DeleteOptions{ + if err := r.Delete(ctx, job, &runtimeclient.DeleteOptions{ PropagationPolicy: &deletePolicy, }); err != nil { if !apierrs.IsNotFound(err) { @@ -283,7 +284,7 @@ func hasSucceeded(job *batchv1.Job) bool { } // Job reconciles a k8s job object. 
-func Job(ctx context.Context, r client.Client, job *batchv1.Job, log logr.Logger) (*batchv1.Job, error) { +func Job(ctx context.Context, r runtimeclient.Client, job *batchv1.Job, log logr.Logger) (*batchv1.Job, error) { foundJob := &batchv1.Job{} if err := r.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, foundJob); err != nil { if !apierrs.IsNotFound(err) { diff --git a/internal/controller/pipelinegate_controller_test.go b/internal/controller/pipelinegate_controller_test.go index dadb3a13..b19774f6 100644 --- a/internal/controller/pipelinegate_controller_test.go +++ b/internal/controller/pipelinegate_controller_test.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" console "github.com/pluralsh/console/go/client" @@ -81,7 +80,6 @@ var _ = Describe("PipelineGate Controller", Ordered, func() { } Expect(kClient.Create(ctx, resource)).To(Succeed()) } - }) It("should set state pending", func() { @@ -90,9 +88,8 @@ var _ = Describe("PipelineGate Controller", Ordered, func() { reconciler := &PipelineGateReconciler{ Client: kClient, ConsoleClient: fakeConsoleClient, - GateCache: gateCache, Scheme: kClient.Scheme(), - Log: logr.Logger{}, + GateCache: gateCache, } _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: gateNamespacedName}) Expect(err).NotTo(HaveOccurred()) @@ -109,9 +106,8 @@ var _ = Describe("PipelineGate Controller", Ordered, func() { reconciler := &PipelineGateReconciler{ Client: kClient, ConsoleClient: fakeConsoleClient, - GateCache: gateCache, Scheme: kClient.Scheme(), - Log: logr.Logger{}, + GateCache: gateCache, } _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: gateNamespacedName}) Expect(err).NotTo(HaveOccurred()) @@ -129,9 +125,8 @@ var _ = Describe("PipelineGate Controller", Ordered, func() { reconciler := &PipelineGateReconciler{ Client: kClient, ConsoleClient: fakeConsoleClient, - GateCache: gateCache, Scheme: kClient.Scheme(), - Log: logr.Logger{}, + GateCache: gateCache, } existingJob := &batchv1.Job{} diff --git a/internal/metrics/metrics_prometheus.go b/internal/metrics/metrics_prometheus.go index 1b48069a..8f03303a 100644 --- a/internal/metrics/metrics_prometheus.go +++ b/internal/metrics/metrics_prometheus.go @@ -25,6 +25,8 @@ type prometheusRecorder struct { resourceCacheWatchCounter *prometheus.GaugeVec resourceCacheHitCounter *prometheus.CounterVec resourceCacheMissCounter *prometheus.CounterVec + + controllerRestartCounter *prometheus.CounterVec } func (in *prometheusRecorder) ResourceCacheWatchStart(resourceType string) { @@ -82,6 +84,10 @@ func (in *prometheusRecorder) StackRunJobCreation() { in.stackRunJobsCreatedCounter.Inc() } +func (in *prometheusRecorder) ControllerRestart(name string) { + in.controllerRestartCounter.WithLabelValues(name).Inc() +} + func (in *prometheusRecorder) init() Recorder { in.discoveryAPICacheRefreshCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: DiscoveryAPICacheRefreshMetricName, @@ -123,6 +129,11 @@ func (in *prometheusRecorder) init() Recorder { Help: ResourceCacheMissMetricDescription, }, []string{MetricLabelServiceID}) + in.controllerRestartCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: ControllerRestartsMetricName, + Help: ControllerRestartsMetricDescription, + }, []string{MetricLabelControllerName}) + return in } diff --git a/internal/metrics/metrics_types.go b/internal/metrics/metrics_types.go index 9b81da66..45324c28 100644 --- 
a/internal/metrics/metrics_types.go +++ b/internal/metrics/metrics_types.go @@ -36,6 +36,10 @@ const ( MetricLabelServiceName = "service_name" MetricLabelServiceType = "service_type" MetricLabelServiceReconciliationStage = "service_reconciliation_stage" + MetricLabelControllerName = "controller_name" + + ControllerRestartsMetricName = "agent_controller_restarts_total" + ControllerRestartsMetricDescription = "The total number of controller restarts" ) type ServiceReconciliationStage string @@ -76,4 +80,5 @@ type Recorder interface { ResourceCacheWatchEnd(resourceType string) ResourceCacheHit(serviceID string) ResourceCacheMiss(serviceID string) + ControllerRestart(name string) } diff --git a/pkg/cache/discovery_cache.go b/pkg/cache/discovery_cache.go index f4870406..114be58c 100644 --- a/pkg/cache/discovery_cache.go +++ b/pkg/cache/discovery_cache.go @@ -8,10 +8,10 @@ import ( cmap "github.com/orcaman/concurrent-map/v2" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" + "k8s.io/klog/v2" "github.com/pluralsh/deployment-operator/internal/helpers" "github.com/pluralsh/deployment-operator/internal/metrics" - "github.com/pluralsh/deployment-operator/pkg/log" ) var ( @@ -28,10 +28,10 @@ func DiscoveryCache() cmap.ConcurrentMap[string, bool] { } func RunDiscoveryCacheInBackgroundOrDie(ctx context.Context, discoveryClient *discovery.DiscoveryClient) { - log.Logger.Info("starting discovery cache") + klog.Info("starting discovery cache") err := helpers.BackgroundPollUntilContextCancel(ctx, 5*time.Minute, true, true, func(_ context.Context) (done bool, err error) { if err = updateDiscoveryCache(discoveryClient); err != nil { - log.Logger.Error(err, "can't fetch API versions") + klog.Error(err, "can't fetch API versions") } metrics.Record().DiscoveryAPICacheRefresh(err) diff --git a/pkg/cache/gate_cache.go b/pkg/cache/gate_cache.go new file mode 100644 index 00000000..8007fcb2 --- /dev/null +++ b/pkg/cache/gate_cache.go @@ -0,0 +1,31 @@ +package cache + +import ( + "time" + + console "github.com/pluralsh/console/go/client" + + "github.com/pluralsh/deployment-operator/pkg/client" +) + +var ( + gateCache *client.Cache[console.PipelineGateFragment] +) + +func InitGateCache(expireAfter time.Duration, consoleClient client.Client) { + if gateCache != nil { + return + } + + gateCache = client.NewCache[console.PipelineGateFragment](expireAfter, func(id string) (*console.PipelineGateFragment, error) { + return consoleClient.GetClusterGate(id) + }) +} + +func GateCache() *client.Cache[console.PipelineGateFragment] { + if gateCache == nil { + panic("gate cache is not initialized") + } + + return gateCache +} diff --git a/pkg/cache/resource_cache.go b/pkg/cache/resource_cache.go index e0af327c..5f8f3208 100644 --- a/pkg/cache/resource_cache.go +++ b/pkg/cache/resource_cache.go @@ -28,7 +28,6 @@ import ( "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/common" - "github.com/pluralsh/deployment-operator/pkg/log" ) // ResourceCache is responsible for creating a global resource cache of the @@ -80,20 +79,20 @@ var ( func Init(ctx context.Context, config *rest.Config, ttl time.Duration) { dynamicClient, err := dynamic.NewForConfig(config) if err != nil { - log.Logger.Error(err, "unable to create dynamic client") + klog.Error(err, "unable to create dynamic client") os.Exit(1) } f := utils.NewFactory(config) mapper, err := f.ToRESTMapper() if err != nil { - log.Logger.Error(err, 
"unable to create rest mapper") + klog.Error(err, "unable to create rest mapper") os.Exit(1) } discoveryClient, err := f.ToDiscoveryClient() if err != nil { - log.Logger.Error(err, "unable to create discovery client") + klog.Error(err, "unable to create discovery client") os.Exit(1) } @@ -230,7 +229,7 @@ func (in *ResourceCache) GetCacheStatus(key object.ObjMetadata) (*console.Compon func (in *ResourceCache) saveResourceStatus(resource *unstructured.Unstructured) { e, err := in.toStatusEvent(resource) if err != nil { - log.Logger.Error(err, "unable to convert resource to status event") + klog.Error(err, "unable to convert resource to status event") return } @@ -243,7 +242,7 @@ func (in *ResourceCache) saveResourceStatus(resource *unstructured.Unstructured) func (in *ResourceCache) watch(resourceKeySet containers.Set[ResourceKey]) { if in.resourceKeySet.Intersect(resourceKeySet).Len() == 0 { - log.Logger.Infow("resource keys not found in cache, stopping watch", "resourceKeys", resourceKeySet.List()) + klog.InfoS("resource keys not found in cache, stopping watch", "resourceKeys", resourceKeySet.List()) return } @@ -255,12 +254,12 @@ func (in *ResourceCache) watch(resourceKeySet containers.Set[ResourceKey]) { select { case <-in.ctx.Done(): if in.ctx.Err() != nil { - log.Logger.Errorf("status watcher context error %v", in.ctx.Err()) + klog.Errorf("status watcher context error %v", in.ctx.Err()) } return case e, ok := <-ch: if !ok { - log.Logger.Error("status watcher event channel closed") + klog.Error("status watcher event channel closed") in.watch(resourceKeySet) return } diff --git a/pkg/client/cache.go b/pkg/client/cache.go index d6200928..138b5e22 100644 --- a/pkg/client/cache.go +++ b/pkg/client/cache.go @@ -1,6 +1,7 @@ package client import ( + "sync" "time" cmap "github.com/orcaman/concurrent-map/v2" @@ -12,6 +13,8 @@ type cacheLine[T any] struct { } type Cache[T any] struct { + sync.Mutex + cache cmap.ConcurrentMap[string, *cacheLine[T]] expiry time.Duration clientGet Getter[T] @@ -38,6 +41,9 @@ func (c *Cache[T]) Get(id string) (*T, error) { } func (c *Cache[T]) Set(id string) (*T, error) { + c.Lock() + defer c.Unlock() + resource, err := c.clientGet(id) if err != nil { return nil, err @@ -52,6 +58,9 @@ func (c *Cache[T]) Wipe() { } func (c *Cache[T]) Expire(id string) { + c.Lock() + defer c.Unlock() + c.cache.Remove(id) } diff --git a/pkg/common/common.go b/pkg/common/common.go index 16f64b55..ce2623f5 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -1,14 +1,10 @@ package common import ( - "fmt" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/yaml" - - "github.com/pluralsh/deployment-operator/pkg/controller" ) const ( @@ -37,13 +33,3 @@ func Unmarshal(s string) (map[string]interface{}, error) { return result, nil } - -func ToReconcilerOrDie[R controller.Reconciler](in controller.Reconciler) R { - out, ok := in.(R) - // If cast fails panic. It means that the calling code is bad and has to be changed. 
- if !ok { - panic(fmt.Sprintf("%T is not a R", in)) - } - - return out -} diff --git a/pkg/common/status.go b/pkg/common/status.go index ad9841bc..3bf8867f 100644 --- a/pkg/common/status.go +++ b/pkg/common/status.go @@ -5,11 +5,11 @@ import ( "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/kstatus/status" internalschema "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" - dlog "github.com/pluralsh/deployment-operator/pkg/log" ) func StatusEventToComponentAttributes(e event.StatusEvent, vcache map[internalschema.GroupName]string) *console.ComponentAttributes { @@ -49,7 +49,7 @@ func StatusEventToComponentAttributes(e event.StatusEvent, vcache map[internalsc func ToStatus(obj *unstructured.Unstructured) *console.ComponentState { h, err := GetResourceHealth(obj) if err != nil { - dlog.Logger.Error(err, "Failed to get resource health status", "name", obj.GetName(), "namespace", obj.GetNamespace()) + klog.ErrorS(err, "failed to get resource health status", "name", obj.GetName(), "namespace", obj.GetNamespace()) } if h == nil { return nil diff --git a/pkg/controller/consts.go b/pkg/controller/common/consts.go similarity index 68% rename from pkg/controller/consts.go rename to pkg/controller/common/consts.go index b88a04ba..8c36ce49 100644 --- a/pkg/controller/consts.go +++ b/pkg/controller/common/consts.go @@ -1,4 +1,4 @@ -package controller +package common const ( DefaultPageSize = int64(100) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a67e0e36..291b39d8 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,41 +3,23 @@ package controller import ( "context" "fmt" + "math/rand" "sync" "time" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/client-go/util/workqueue" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/pluralsh/deployment-operator/pkg/websocket" - logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -type Reconciler interface { - // Reconcile Kubernetes resources to reflect state from the Console. - Reconcile(context.Context, string) (reconcile.Result, error) - - // Poll Console for any state changes and put them in the queue that will be consumed by Reconcile. - Poll(context.Context) (bool, error) - - // GetPublisher returns event name, i.e. "event.service", and Publisher that will be registered with this reconciler. - // TODO: Make it optional and/or accept multiple publishers. - GetPublisher() (string, websocket.Publisher) - // WipeCache containing Console resources. - WipeCache() - - // ShutdownQueue containing Console resources. - ShutdownQueue() - - // GetPollInterval returns custom poll interval. If 0 then controller manager use default from the options. - GetPollInterval() time.Duration -} + v1 "github.com/pluralsh/deployment-operator/pkg/controller/v1" + internallog "github.com/pluralsh/deployment-operator/pkg/log" +) type Controller struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. @@ -48,53 +30,37 @@ type Controller struct { // Reconciler is a function that can be called at any time with the ID of an object and // ensures that the state of the system matches the state specified in the object. 
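// Illustrative sketch, not part of this patch: the hunks above swap the internal
// log.Logger wrapper for k8s.io/klog/v2, so this shows the structured klog calls
// (InfoS, ErrorS, verbosity-gated V(n).InfoS) in isolation. The messages and
// key/value pairs are made up for the example.
package main

import (
	"errors"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil) // registers -v and friends on the default flag set
	defer klog.Flush()

	// Structured info log, the rough equivalent of log.Logger.Infow(...).
	klog.InfoS("starting discovery cache", "interval", "5m")

	// Structured error log, the rough equivalent of log.Logger.Error(err, ...).
	klog.ErrorS(errors.New("connection refused"), "can't fetch API versions", "attempt", 1)

	// Verbosity-gated log, matching the klog.V(level).InfoS usage in the diff.
	klog.V(2).InfoS("resource keys not found in cache, stopping watch")
}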
- Do Reconciler - - // Queue is an listeningQueue that listens for events from Informers and adds object keys to - // the Queue for processing - Queue workqueue.TypedRateLimitingInterface[string] + Do v1.Reconciler // mu is used to synchronize Controller setup mu sync.Mutex - // ctx is the context that was passed to Start() and used when starting watches. - // - // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context, - // while we usually always strive to follow best practices, we consider this a legacy case and it should - // undergo a major refactoring and redesign to allow for context to not be stored in a struct. - ctx context.Context - // CacheSyncTimeout refers to the time limit set on waiting for cache to sync // Defaults to 2 minutes if not set. CacheSyncTimeout time.Duration + // PollInterval defines how often controllers should poll for new resources. + PollInterval time.Duration + + // PollJitter defines how much jitter should be applied when polling for new resources. + PollJitter time.Duration + // RecoverPanic indicates whether the panic caused by reconcile should be recovered. RecoverPanic *bool -} -func (c *Controller) Reconcile(ctx context.Context, req string) (_ reconcile.Result, err error) { - defer func() { - if r := recover(); r != nil { - if c.RecoverPanic != nil && *c.RecoverPanic { - for _, fn := range utilruntime.PanicHandlers { - fn(ctx, r) - } - err = fmt.Errorf("panic: %v [recovered]", r) - return - } + // lastPollTime is the last time Reconciler.Poll was called. + lastPollTime time.Time - log := logf.FromContext(ctx) - log.V(1).Info(fmt.Sprintf("Observed a panic in reconciler: %v", r)) - panic(r) - } - }() - return c.Do.Reconcile(ctx, req) + // lastReconcileTime is the last time Reconciler.Reconcile was called. + lastReconcileTime time.Time } -func (c *Controller) SetupWithManager(manager *ControllerManager) { +func (c *Controller) SetupWithManager(manager *Manager) { c.MaxConcurrentReconciles = manager.MaxConcurrentReconciles c.CacheSyncTimeout = manager.CacheSyncTimeout c.RecoverPanic = manager.RecoverPanic + c.PollInterval = manager.PollInterval + c.PollJitter = manager.PollJitter } // Start implements controller.Controller. @@ -103,14 +69,13 @@ func (c *Controller) Start(ctx context.Context) { // but lock outside to get proper handling of the queue shutdown c.mu.Lock() - // Set the internal context. - c.ctx = ctx - wg := &sync.WaitGroup{} func() { defer c.mu.Unlock() defer utilruntime.HandleCrash() + go c.startPoller(ctx) + wg.Add(c.MaxConcurrentReconciles) for i := 0; i < c.MaxConcurrentReconciles; i++ { go func() { @@ -127,10 +92,53 @@ func (c *Controller) Start(ctx context.Context) { wg.Wait() } +func (c *Controller) Restart() { + c.Do.Restart() +} + +// LastPollTime returns the last time controller poll was executed. +// It signals whether the controller is alive and running. +func (c *Controller) LastPollTime() time.Time { + return c.lastPollTime +} + +// LastReconcileTime returns the last time controller reconcile was executed. +// It signals whether the controller is alive and running. +func (c *Controller) LastReconcileTime() time.Time { + return c.lastReconcileTime +} + +func (c *Controller) startPoller(ctx context.Context) { + defer c.Do.Shutdown() + + // Jitter ensures that controllers won't all poll the API at the same time.
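// Illustrative sketch, not part of this patch: it isolates the jittered polling
// that startPoller sets up below. A random offset in [0, PollJitter) is added to
// the base interval so controllers don't all hit the Console API in lockstep, and
// wait.PollUntilContextCancel keeps polling until the context is cancelled.
// The poll body here is a stand-in, not the real Reconciler.Poll.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	base := 30 * time.Second
	maxJitter := 10 * time.Second

	// Same shape as the jitter computation in startPoller.
	interval := base + time.Duration(rand.Int63n(int64(maxJitter)))

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// immediate=true runs the condition once right away; returning false, nil
	// means "not done yet", so the loop only stops when the context ends.
	_ = wait.PollUntilContextCancel(ctx, interval, true, func(context.Context) (bool, error) {
		fmt.Println("poll tick")
		return false, nil
	})
}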
+ jitterInterval := time.Duration(rand.Int63n(int64(c.PollJitter))) + pollInterval := c.PollInterval + if controllerPollInterval := c.Do.GetPollInterval(); controllerPollInterval > 0 { + pollInterval = controllerPollInterval + } + pollInterval += jitterInterval + + klog.V(internallog.LogLevelTrace).InfoS("Starting controller poller", "ctrl", c.Name, "pollInterval", pollInterval) + _ = wait.PollUntilContextCancel(ctx, pollInterval, true, func(_ context.Context) (bool, error) { + defer func() { + c.lastPollTime = time.Now() + }() + + if err := c.Do.Poll(ctx); err != nil { + klog.ErrorS(err, "poller failed") + } + + // never stop + return false, nil + }) + klog.V(internallog.LogLevelDefault).InfoS("Controller poller finished", "ctrl", c.Name) +} + // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. func (c *Controller) processNextWorkItem(ctx context.Context) bool { - id, shutdown := c.Queue.Get() + id, shutdown := c.Do.Queue().Get() if shutdown { // Stop working return false @@ -142,7 +150,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { // not call Forget if a transient error occurs, instead the item is // put back on the workqueue and attempted again after a back-off // period. - defer c.Queue.Done(id) + defer c.Do.Queue().Done(id) c.reconcileHandler(ctx, id) return true } @@ -155,11 +163,10 @@ func (c *Controller) reconcileHandler(ctx context.Context, id string) { // RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the // resource to be synced. log.V(5).Info("Reconciling") - result, err := c.Reconcile(ctx, id) + result, err := c.reconcile(ctx, id) switch { case err != nil: - - c.Queue.AddRateLimited(id) + c.Do.Queue().AddRateLimited(id) if !result.IsZero() { log.V(1).Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler") @@ -171,19 +178,42 @@ func (c *Controller) reconcileHandler(ctx context.Context, id string) { // along with a non-nil error. But this is intended as // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter - c.Queue.Forget(id) - c.Queue.AddAfter(id, result.RequeueAfter) + c.Do.Queue().Forget(id) + c.Do.Queue().AddAfter(id, result.RequeueAfter) case result.Requeue: log.V(5).Info("Reconcile done, requeueing") - c.Queue.AddRateLimited(id) + c.Do.Queue().AddRateLimited(id) default: log.V(5).Info("Reconcile successful") // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. - c.Queue.Forget(id) + c.Do.Queue().Forget(id) } } +func (c *Controller) reconcile(ctx context.Context, req string) (_ reconcile.Result, err error) { + defer func() { + if r := recover(); r != nil { + if c.RecoverPanic != nil && *c.RecoverPanic { + for _, fn := range utilruntime.PanicHandlers { + fn(ctx, r) + } + err = fmt.Errorf("panic: %v [recovered]", r) + return + } + + log := logf.FromContext(ctx) + log.V(1).Info(fmt.Sprintf("Observed a panic in reconciler: %v", r)) + panic(r) + } else { + // Update last reconcile time on successful reconcile + c.lastReconcileTime = time.Now() + } + + }() + return c.Do.Reconcile(ctx, req) +} + // reconcileIDKey is a context.Context Value key. Its associated value should // be a types.UID. 
type reconcileIDKey struct{} diff --git a/pkg/controller/controller_manager.go b/pkg/controller/controller_manager.go index 8b187880..f1fac6b3 100644 --- a/pkg/controller/controller_manager.go +++ b/pkg/controller/controller_manager.go @@ -3,19 +3,25 @@ package controller import ( "context" "errors" - "math/rand" + "fmt" "os" + "sync" "time" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "github.com/pluralsh/deployment-operator/internal/helpers" + "github.com/pluralsh/deployment-operator/internal/metrics" "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/controller/service" + v1 "github.com/pluralsh/deployment-operator/pkg/controller/v1" "github.com/pluralsh/deployment-operator/pkg/log" "github.com/pluralsh/deployment-operator/pkg/websocket" ) -type ControllerManager struct { +type Manager struct { + sync.Mutex + Controllers []*Controller // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. @@ -28,20 +34,22 @@ type ControllerManager struct { // RecoverPanic indicates whether the panic caused by reconcile should be recovered. RecoverPanic *bool - Refresh time.Duration + // PollInterval defines how often controllers should poll for new resources. + PollInterval time.Duration + + // PollJitter defines how much polling jitter should there be when polling for new resources. + PollJitter time.Duration - Jitter time.Duration + Socket *websocket.Socket - // started is true if the ControllerManager has been Started + // started is true if the Manager has been Started started bool client client.Client - - Socket *websocket.Socket } -func NewControllerManager(options ...ControllerManagerOption) (*ControllerManager, error) { - ctrl := &ControllerManager{ +func NewControllerManager(options ...ControllerManagerOption) (*Manager, error) { + ctrl := &Manager{ Controllers: make([]*Controller, 0), started: false, } @@ -55,75 +63,171 @@ func NewControllerManager(options ...ControllerManagerOption) (*ControllerManage return ctrl, nil } -func (cm *ControllerManager) GetClient() client.Client { - return cm.client -} - -func (cm *ControllerManager) AddController(ctrl *Controller) { +func (cm *Manager) AddController(ctrl *Controller) { ctrl.SetupWithManager(cm) cm.Controllers = append(cm.Controllers, ctrl) } -func (cm *ControllerManager) GetReconciler(name string) Reconciler { +func (cm *Manager) GetReconcilerOrDie(name string) v1.Reconciler { for _, ctrl := range cm.Controllers { if ctrl.Name == name { return ctrl.Do } } - return nil + panic(fmt.Sprintf("controller %s not found", name)) } -func (cm *ControllerManager) AddReconcilerOrDie(name string, reconcilerGetter func() (Reconciler, workqueue.TypedRateLimitingInterface[string], error)) { - reconciler, queue, err := reconcilerGetter() +func (cm *Manager) AddReconcilerOrDie(name string, reconcilerGetter func() (v1.Reconciler, error)) { + reconciler, err := reconcilerGetter() if err != nil { - log.Logger.Errorw("unable to create reconciler", "name", name, "error", err) + klog.ErrorS(err, "unable to create reconciler", "name", name) os.Exit(1) } cm.AddController(&Controller{ - Name: name, - Do: reconciler, - Queue: queue, + Name: name, + Do: reconciler, }) } -func (cm *ControllerManager) Start(ctx context.Context) error { +func (cm *Manager) Start(ctx context.Context) error { + cm.Lock() + defer cm.Unlock() + if cm.started { - return errors.New("controller manager was started more than once") + return errors.New("console 
controller manager was started more than once") } - for _, ctrl := range cm.Controllers { - controller := ctrl - jitterValue := time.Duration(rand.Int63n(int64(cm.Jitter))) - cm.Socket.AddPublisher(controller.Do.GetPublisher()) + go cm.startSupervised(ctx) - go func() { - defer controller.Do.ShutdownQueue() - defer controller.Do.WipeCache() + _ = helpers.BackgroundPollUntilContextCancel(ctx, cm.PollInterval, true, false, func(_ context.Context) (bool, error) { + if err := cm.Socket.Join(); err != nil { + klog.ErrorS(err, "unable to connect") + } - pollInterval := cm.Refresh - if controllerPollInterval := controller.Do.GetPollInterval(); controllerPollInterval > 0 { - pollInterval = controllerPollInterval - } - pollInterval += jitterValue - _ = wait.PollUntilContextCancel(context.Background(), pollInterval, true, func(_ context.Context) (done bool, err error) { - return controller.Do.Poll(ctx) - }) - }() + // never stop + return false, nil + }) + + cm.started = true + return nil +} + +func (cm *Manager) startSupervised(ctx context.Context) { + wg := &sync.WaitGroup{} + wg.Add(len(cm.Controllers)) + for _, ctrl := range cm.Controllers { go func() { - controller.Start(ctx) + defer wg.Done() + cm.startControllerSupervised(ctx, ctrl) }() } + <-ctx.Done() + klog.InfoS("Shutdown signal received, waiting for all controllers to finish", "name", "console-manager") + wg.Wait() + klog.InfoS("All controllers finished", "name", "console-manager") +} + +func (cm *Manager) startControllerSupervised(ctx context.Context, ctrl *Controller) { + internalCtx, cancel := context.WithCancel(ctx) + wg := &sync.WaitGroup{} + + // Recheck the controller liveness every 30 seconds. + livenessCheckInterval := 30 * time.Second + // Make last controller action deadline 5 times the time of regular poll. + // It means that the controller hasn't polled/reconciled any resources. + // It could indicate that the controller might have died and should be restarted. 
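// Illustrative sketch, not part of this patch: it reduces the supervision loop
// below to its core idea. A ticker compares the worker's last-activity timestamp
// against a deadline and cancels the worker's private context to force a restart
// when it looks stuck; the outer context shuts everything down. The worker and the
// lastActivity callback are stand-ins for the Controller and its
// LastPollTime/LastReconcileTime accessors.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func supervise(ctx context.Context, deadline time.Duration, lastActivity func() time.Time, run func(context.Context)) {
	inner, cancel := context.WithCancel(ctx)
	var wg sync.WaitGroup
	start := func(c context.Context) {
		wg.Add(1)
		go func() { defer wg.Done(); run(c) }()
	}
	start(inner)

	ticker := time.NewTicker(deadline / 5)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Shutdown: stop the worker and wait for it to finish.
			cancel()
			wg.Wait()
			return
		case <-inner.Done():
			// Worker stopped (or was cancelled as unresponsive): wait, then restart.
			wg.Wait()
			inner, cancel = context.WithCancel(ctx)
			start(inner)
		case <-ticker.C:
			if time.Now().After(lastActivity().Add(deadline)) {
				fmt.Println("worker unresponsive, restarting")
				cancel()
			}
		}
	}
}

func main() {
	stuckSince := time.Now()
	ctx, stop := context.WithTimeout(context.Background(), 3*time.Second)
	defer stop()
	// The fake worker never reports activity, so the supervisor keeps restarting it.
	supervise(ctx, time.Second, func() time.Time { return stuckSince }, func(c context.Context) { <-c.Done() })
}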
+ lastControllerActionDeadline := 5 * (cm.PollInterval + cm.PollJitter) + ticker := time.NewTicker(livenessCheckInterval) + defer ticker.Stop() + + wg.Add(1) go func() { - _ = wait.PollUntilContextCancel(context.Background(), cm.Refresh, true, func(_ context.Context) (done bool, err error) { - return false, cm.Socket.Join() - }) + defer wg.Done() + cm.startController(internalCtx, ctrl) }() - cm.started = true - return nil + for { + select { + case <-ctx.Done(): + klog.V(log.LogLevelDefault).InfoS( + "Shutdown signal received, waiting for controller to finish", + "name", ctrl.Name, + ) + cancel() + wg.Wait() + klog.V(log.LogLevelDefault).InfoS("Controller shutdown finished", "name", ctrl.Name) + return + case <-internalCtx.Done(): + metrics.Record().ControllerRestart(ctrl.Name) + klog.V(log.LogLevelVerbose).InfoS("Restart signal received, waiting for controller to finish", "name", ctrl.Name) + wg.Wait() + klog.V(log.LogLevelVerbose).InfoS("Controller finished", "name", ctrl.Name) + // Reinitialize context + internalCtx, cancel = context.WithCancel(ctx) + // restart + wg.Add(1) + go func() { + defer wg.Done() + cm.restartController(internalCtx, ctrl) + }() + case <-ticker.C: + lastPollTime := ctrl.LastPollTime() + klog.V(log.LogLevelDebug).InfoS( + "Controller last poll time check", + "name", ctrl.Name, + "lastPollTime", lastPollTime.Format(time.RFC3339), + ) + if time.Now().After(lastPollTime.Add(lastControllerActionDeadline)) { + klog.V(log.LogLevelDefault).InfoS( + "Controller unresponsive, restarting", + "ctrl", ctrl.Name, + "lastPollTime", lastPollTime.Format(time.RFC3339), + ) + cancel() + break + } + + // We only want to do an additional last reconcile time check + // for the services controller. There will always be at least one + // service by default that should be reconciled. Other controllers + // do not have any resources to reconcile by default. + if ctrl.Name != service.Identifier { + break + } + + lastReconcileTime := ctrl.LastReconcileTime() + klog.V(log.LogLevelDebug).InfoS( + "Controller last reconcile time check", + "name", ctrl.Name, + "lastReconcileTime", lastReconcileTime.Format(time.RFC3339), + ) + if time.Now().After(lastReconcileTime.Add(lastControllerActionDeadline)) { + klog.V(log.LogLevelDefault).InfoS( + "Controller unresponsive, restarting", + "ctrl", ctrl.Name, + "lastReconcileTime", lastReconcileTime.Format(time.RFC3339), + ) + cancel() + } + } + } +} + +// startController starts the controller and blocks until the controller stops.
+func (cm *Manager) startController(ctx context.Context, ctrl *Controller) { + klog.V(log.LogLevelDefault).InfoS("Starting controller", "name", ctrl.Name) + + // If publisher exists, this is a no-op + cm.Socket.AddPublisher(ctrl.Do.GetPublisher()) + ctrl.Start(ctx) +} + +func (cm *Manager) restartController(ctx context.Context, ctrl *Controller) { + ctrl.Restart() + cm.startController(ctx, ctrl) } diff --git a/pkg/controller/controller_manager_options.go b/pkg/controller/controller_manager_options.go index 2935f64f..d2275765 100644 --- a/pkg/controller/controller_manager_options.go +++ b/pkg/controller/controller_manager_options.go @@ -9,31 +9,31 @@ import ( "github.com/pluralsh/deployment-operator/pkg/websocket" ) -type ControllerManagerOption func(*ControllerManager) error +type ControllerManagerOption func(*Manager) error func WithConsoleClient(client client.Client) ControllerManagerOption { - return func(o *ControllerManager) error { + return func(o *Manager) error { o.client = client return nil } } func WithConsoleClientArgs(url string, deployToken string) ControllerManagerOption { - return func(o *ControllerManager) error { + return func(o *Manager) error { o.client = client.New(url, deployToken) return nil } } func WithSocket(socket *websocket.Socket) ControllerManagerOption { - return func(o *ControllerManager) error { + return func(o *Manager) error { o.Socket = socket return nil } } func WithSocketArgs(clusterID, url, deployToken string) ControllerManagerOption { - return func(o *ControllerManager) (err error) { + return func(o *Manager) (err error) { socket, err := websocket.New(clusterID, url, deployToken) o.Socket = socket @@ -47,35 +47,35 @@ func WithSocketArgs(clusterID, url, deployToken string) ControllerManagerOption } func WithMaxConcurrentReconciles(maxConcurrentReconciles int) ControllerManagerOption { - return func(o *ControllerManager) error { + return func(o *Manager) error { o.MaxConcurrentReconciles = maxConcurrentReconciles return nil } } func WithCacheSyncTimeout(timeout time.Duration) ControllerManagerOption { - return func(o *ControllerManager) error { + return func(o *Manager) error { o.CacheSyncTimeout = timeout return nil } } -func WithRefresh(refresh time.Duration) ControllerManagerOption { - return func(o *ControllerManager) error { - o.Refresh = refresh +func WithPollInterval(interval time.Duration) ControllerManagerOption { + return func(o *Manager) error { + o.PollInterval = interval return nil } } func WithJitter(jitter time.Duration) ControllerManagerOption { - return func(o *ControllerManager) error { - o.Jitter = jitter + return func(o *Manager) error { + o.PollJitter = jitter return nil } } func WithRecoverPanic(recoverPanic bool) ControllerManagerOption { - return func(o *ControllerManager) error { + return func(o *Manager) error { o.RecoverPanic = &recoverPanic return nil } diff --git a/pkg/controller/namespaces/reconciler.go b/pkg/controller/namespaces/reconciler.go index f897b556..e9be7abb 100644 --- a/pkg/controller/namespaces/reconciler.go +++ b/pkg/controller/namespaces/reconciler.go @@ -20,7 +20,7 @@ import ( clienterrors "github.com/pluralsh/deployment-operator/internal/errors" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/client" - "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/deployment-operator/pkg/controller/common" "github.com/pluralsh/deployment-operator/pkg/websocket" ) @@ -29,47 +29,65 @@ const ( ) type NamespaceReconciler struct { - 
ConsoleClient client.Client - K8sClient ctrlclient.Client - NamespaceQueue workqueue.TypedRateLimitingInterface[string] - NamespaceCache *client.Cache[console.ManagedNamespaceFragment] + consoleClient client.Client + k8sClient ctrlclient.Client + namespaceQueue workqueue.TypedRateLimitingInterface[string] + namespaceCache *client.Cache[console.ManagedNamespaceFragment] } func NewNamespaceReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh time.Duration) *NamespaceReconciler { return &NamespaceReconciler{ - ConsoleClient: consoleClient, - K8sClient: k8sClient, - NamespaceQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), - NamespaceCache: client.NewCache[console.ManagedNamespaceFragment](refresh, func(id string) (*console.ManagedNamespaceFragment, error) { + consoleClient: consoleClient, + k8sClient: k8sClient, + namespaceQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), + namespaceCache: client.NewCache[console.ManagedNamespaceFragment](refresh, func(id string) (*console.ManagedNamespaceFragment, error) { return consoleClient.GetNamespace(id) }), } } +func (n *NamespaceReconciler) Queue() workqueue.TypedRateLimitingInterface[string] { + return n.namespaceQueue +} + +func (n *NamespaceReconciler) Restart() { + // Cleanup + n.namespaceQueue.ShutDown() + n.namespaceCache.Wipe() + + // Initialize + n.namespaceQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) +} + +func (n *NamespaceReconciler) Shutdown() { + n.namespaceQueue.ShutDown() + n.namespaceCache.Wipe() +} + func (n *NamespaceReconciler) GetPollInterval() time.Duration { return 0 // use default poll interval } func (n *NamespaceReconciler) GetPublisher() (string, websocket.Publisher) { return "namespace.event", &socketPublisher{ - restoreQueue: n.NamespaceQueue, - restoreCache: n.NamespaceCache, + restoreQueue: n.namespaceQueue, + restoreCache: n.namespaceCache, } } func (n *NamespaceReconciler) WipeCache() { - n.NamespaceCache.Wipe() + n.namespaceCache.Wipe() } func (n *NamespaceReconciler) ShutdownQueue() { - n.NamespaceQueue.ShutDown() + n.namespaceQueue.ShutDown() } func (n *NamespaceReconciler) ListNamespaces(ctx context.Context) *algorithms.Pager[*console.ManagedNamespaceEdgeFragment] { logger := log.FromContext(ctx) logger.Info("create namespace pager") fetch := func(page *string, size int64) ([]*console.ManagedNamespaceEdgeFragment, *algorithms.PageInfo, error) { - resp, err := n.ConsoleClient.ListNamespaces(page, &size) + resp, err := n.consoleClient.ListNamespaces(page, &size) if err != nil { logger.Error(err, "failed to fetch namespaces") return nil, nil, err @@ -81,10 +99,10 @@ func (n *NamespaceReconciler) ListNamespaces(ctx context.Context) *algorithms.Pa } return resp.Edges, pageInfo, nil } - return algorithms.NewPager[*console.ManagedNamespaceEdgeFragment](controller.DefaultPageSize, fetch) + return algorithms.NewPager[*console.ManagedNamespaceEdgeFragment](common.DefaultPageSize, fetch) } -func (n *NamespaceReconciler) Poll(ctx context.Context) (done bool, err error) { +func (n *NamespaceReconciler) Poll(ctx context.Context) error { logger := log.FromContext(ctx) logger.Info("fetching namespaces") pager := n.ListNamespaces(ctx) @@ -93,21 +111,21 @@ func (n *NamespaceReconciler) Poll(ctx context.Context) (done bool, err error) { namespaces, err := pager.NextPage() if err != nil { logger.Error(err, "failed to fetch namespace list") - return false, nil + 
return err } for _, namespace := range namespaces { logger.Info("sending update for", "namespace", namespace.Node.ID) - n.NamespaceQueue.Add(namespace.Node.ID) + n.namespaceQueue.Add(namespace.Node.ID) } } - return false, nil + return nil } func (n *NamespaceReconciler) Reconcile(ctx context.Context, id string) (reconcile.Result, error) { logger := log.FromContext(ctx) logger.Info("attempting to sync namespace", "id", id) - namespace, err := n.NamespaceCache.Get(id) + namespace, err := n.namespaceCache.Get(id) if err != nil { if clienterrors.IsNotFound(err) { logger.Info("namespace already deleted", "id", id) @@ -147,10 +165,10 @@ func (n *NamespaceReconciler) UpsertNamespace(ctx context.Context, fragment *con if createNamespace { existing := &v1.Namespace{} - err := n.K8sClient.Get(ctx, ctrlclient.ObjectKey{Name: fragment.Name}, existing) + err := n.k8sClient.Get(ctx, ctrlclient.ObjectKey{Name: fragment.Name}, existing) if err != nil { if apierrors.IsNotFound(err) { - if err := n.K8sClient.Create(ctx, &v1.Namespace{ + if err := n.k8sClient.Create(ctx, &v1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: fragment.Name, Labels: labels, @@ -168,7 +186,7 @@ func (n *NamespaceReconciler) UpsertNamespace(ctx context.Context, fragment *con if !reflect.DeepEqual(labels, existing.Labels) || !reflect.DeepEqual(annotations, existing.Annotations) { existing.Labels = labels existing.Annotations = annotations - if err := n.K8sClient.Update(ctx, existing); err != nil { + if err := n.k8sClient.Update(ctx, existing); err != nil { return err } } diff --git a/pkg/controller/pipelinegates/reconciler.go b/pkg/controller/pipelinegates/reconciler.go index e37f2365..42505005 100644 --- a/pkg/controller/pipelinegates/reconciler.go +++ b/pkg/controller/pipelinegates/reconciler.go @@ -10,18 +10,17 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/util/workqueue" - "k8s.io/kubectl/pkg/cmd/util" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/pluralsh/deployment-operator/api/v1alpha1" "github.com/pluralsh/deployment-operator/internal/utils" + "github.com/pluralsh/deployment-operator/pkg/cache" "github.com/pluralsh/deployment-operator/pkg/client" - "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/deployment-operator/pkg/controller/common" "github.com/pluralsh/deployment-operator/pkg/ping" "github.com/pluralsh/deployment-operator/pkg/websocket" ) @@ -31,20 +30,15 @@ const ( ) type GateReconciler struct { - K8sClient ctrlclient.Client - ConsoleClient client.Client - Config *rest.Config - Clientset *kubernetes.Clientset - GateCache *client.Cache[console.PipelineGateFragment] - GateQueue workqueue.TypedRateLimitingInterface[string] - UtilFactory util.Factory - discoveryClient *discovery.DiscoveryClient + k8sClient ctrlclient.Client + consoleClient client.Client + gateQueue workqueue.TypedRateLimitingInterface[string] pinger *ping.Pinger operatorNamespace string - PollInterval time.Duration + pollInterval time.Duration } -func NewGateReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, config *rest.Config, refresh, pollInterval time.Duration, clusterId string) (*GateReconciler, error) { +func NewGateReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, config *rest.Config, pollInterval time.Duration) 
(*GateReconciler, error) { utils.DisableClientLimits(config) discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) @@ -52,55 +46,50 @@ func NewGateReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, return nil, err } - gateCache := client.NewCache[console.PipelineGateFragment](refresh, func(id string) (*console.PipelineGateFragment, error) { - return consoleClient.GetClusterGate(id) - }) - - gateQueue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) - f := utils.NewFactory(config) - cs, err := f.KubernetesClientSet() - if err != nil { - return nil, err - } - namespace, err := utils.GetOperatorNamespace() if err != nil { return nil, err } + return &GateReconciler{ - K8sClient: k8sClient, - ConsoleClient: consoleClient, - Config: config, - Clientset: cs, - GateQueue: gateQueue, - GateCache: gateCache, - UtilFactory: f, - discoveryClient: discoveryClient, + k8sClient: k8sClient, + consoleClient: consoleClient, + gateQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), pinger: ping.New(consoleClient, discoveryClient, f), operatorNamespace: namespace, - PollInterval: pollInterval, + pollInterval: pollInterval, }, nil } -func (s *GateReconciler) GetPollInterval() time.Duration { - return s.PollInterval +func (s *GateReconciler) Queue() workqueue.TypedRateLimitingInterface[string] { + return s.gateQueue +} + +func (s *GateReconciler) Restart() { + // Cleanup + s.gateQueue.ShutDown() + cache.GateCache().Wipe() + + // Initialize + s.gateQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) } -func (s *GateReconciler) WipeCache() { - s.GateCache.Wipe() +func (s *GateReconciler) Shutdown() { + s.gateQueue.ShutDown() + cache.GateCache().Wipe() } -func (s *GateReconciler) ShutdownQueue() { - s.GateQueue.ShutDown() +func (s *GateReconciler) GetPollInterval() time.Duration { + return s.pollInterval } func (s *GateReconciler) ListGates(ctx context.Context) *algorithms.Pager[*console.PipelineGateEdgeFragment] { logger := log.FromContext(ctx) logger.Info("create pipeline gate pager") fetch := func(page *string, size int64) ([]*console.PipelineGateEdgeFragment, *algorithms.PageInfo, error) { - resp, err := s.ConsoleClient.GetClusterGates(page, &size) + resp, err := s.consoleClient.GetClusterGates(page, &size) if err != nil { logger.Error(err, "failed to fetch gates") return nil, nil, err @@ -112,10 +101,10 @@ func (s *GateReconciler) ListGates(ctx context.Context) *algorithms.Pager[*conso } return resp.PagedClusterGates.Edges, pageInfo, nil } - return algorithms.NewPager[*console.PipelineGateEdgeFragment](controller.DefaultPageSize, fetch) + return algorithms.NewPager[*console.PipelineGateEdgeFragment](common.DefaultPageSize, fetch) } -func (s *GateReconciler) Poll(ctx context.Context) (done bool, err error) { +func (s *GateReconciler) Poll(ctx context.Context) error { logger := log.FromContext(ctx) logger.V(1).Info("fetching gates for cluster") @@ -125,12 +114,12 @@ func (s *GateReconciler) Poll(ctx context.Context) (done bool, err error) { gates, err := pager.NextPage() if err != nil { logger.Error(err, "failed to fetch gates list") - return false, nil + return err } for _, gate := range gates { logger.V(2).Info("sending update for", "gate", gate.Node.ID) - s.GateQueue.Add(gate.Node.ID) + s.gateQueue.Add(gate.Node.ID) } } @@ -138,7 +127,7 @@ func (s *GateReconciler) Poll(ctx context.Context) (done bool, err error) { logger.Error(err, 
"failed to ping cluster after scheduling syncs") } - return false, nil + return nil } func (s *GateReconciler) Reconcile(ctx context.Context, id string) (reconcile.Result, error) { @@ -146,7 +135,7 @@ func (s *GateReconciler) Reconcile(ctx context.Context, id string) (reconcile.Re logger.V(1).Info("attempting to sync gate", "id", id) var gate *console.PipelineGateFragment - gate, err := s.GateCache.Get(id) + gate, err := cache.GateCache().Get(id) if err != nil { logger.Error(err, "failed to fetch gate: %s, ignoring for now") return reconcile.Result{}, err @@ -159,7 +148,7 @@ func (s *GateReconciler) Reconcile(ctx context.Context, id string) (reconcile.Re return reconcile.Result{}, nil } - gateCR, err := s.ConsoleClient.ParsePipelineGateCR(gate, s.operatorNamespace) + gateCR, err := s.consoleClient.ParsePipelineGateCR(gate, s.operatorNamespace) if err != nil { logger.Error(err, "failed to parse gate CR", "Name", gate.Name, "ID", gate.ID) return reconcile.Result{}, err @@ -167,7 +156,7 @@ func (s *GateReconciler) Reconcile(ctx context.Context, id string) (reconcile.Re // get pipelinegate currentGate := &v1alpha1.PipelineGate{} - if err := s.K8sClient.Get(ctx, types.NamespacedName{Name: gateCR.Name, Namespace: gateCR.Namespace}, currentGate); err != nil { + if err := s.k8sClient.Get(ctx, types.NamespacedName{Name: gateCR.Name, Namespace: gateCR.Namespace}, currentGate); err != nil { if !apierrors.IsNotFound(err) { logger.V(1).Info("Could not get gate.", "Namespace", gateCR.Namespace, "Name", gateCR.Name, "ID", gateCR.Spec.ID) return reconcile.Result{}, err @@ -175,7 +164,7 @@ func (s *GateReconciler) Reconcile(ctx context.Context, id string) (reconcile.Re logger.V(1).Info("This gate doesn't yet have a corresponding CR on this cluster yet.", "Namespace", gateCR.Namespace, "Name", gateCR.Name, "ID", gateCR.Spec.ID) // If the PipelineGate doesn't exist, create it. 
- if err = s.K8sClient.Create(context.Background(), gateCR); err != nil { + if err = s.k8sClient.Create(context.Background(), gateCR); err != nil { logger.Error(err, "failed to create gate", "Namespace", gateCR.Namespace, "Name", gateCR.Name, "ID", gateCR.Spec.ID) return reconcile.Result{}, err } @@ -186,7 +175,6 @@ func (s *GateReconciler) Reconcile(ctx context.Context, id string) (reconcile.Re func (s *GateReconciler) GetPublisher() (string, websocket.Publisher) { return "gate.event", &socketPublisher{ - gateQueue: s.GateQueue, - gateCache: s.GateCache, + gateQueue: s.gateQueue, } } diff --git a/pkg/controller/pipelinegates/socket_publisher.go b/pkg/controller/pipelinegates/socket_publisher.go index c1977742..2094c79b 100644 --- a/pkg/controller/pipelinegates/socket_publisher.go +++ b/pkg/controller/pipelinegates/socket_publisher.go @@ -1,19 +1,16 @@ package pipelinegates import ( - console "github.com/pluralsh/console/go/client" - - "github.com/pluralsh/deployment-operator/pkg/client" - "k8s.io/client-go/util/workqueue" + + "github.com/pluralsh/deployment-operator/pkg/cache" ) type socketPublisher struct { gateQueue workqueue.TypedRateLimitingInterface[string] - gateCache *client.Cache[console.PipelineGateFragment] } func (sp *socketPublisher) Publish(id string) { - sp.gateCache.Expire(id) + cache.GateCache().Expire(id) sp.gateQueue.Add(id) } diff --git a/pkg/controller/restore/reconciler.go b/pkg/controller/restore/reconciler.go index 1d6992f2..04257931 100644 --- a/pkg/controller/restore/reconciler.go +++ b/pkg/controller/restore/reconciler.go @@ -49,67 +49,85 @@ var ( ) type RestoreReconciler struct { - ConsoleClient client.Client - K8sClient ctrlclient.Client - RestoreQueue workqueue.TypedRateLimitingInterface[string] - RestoreCache *client.Cache[console.ClusterRestoreFragment] - Namespace string + consoleClient client.Client + k8sClient ctrlclient.Client + restoreQueue workqueue.TypedRateLimitingInterface[string] + restoreCache *client.Cache[console.ClusterRestoreFragment] + namespace string } func NewRestoreReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh time.Duration, namespace string) *RestoreReconciler { return &RestoreReconciler{ - ConsoleClient: consoleClient, - K8sClient: k8sClient, - RestoreQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), - RestoreCache: client.NewCache[console.ClusterRestoreFragment](refresh, func(id string) (*console.ClusterRestoreFragment, error) { + consoleClient: consoleClient, + k8sClient: k8sClient, + restoreQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), + restoreCache: client.NewCache[console.ClusterRestoreFragment](refresh, func(id string) (*console.ClusterRestoreFragment, error) { return consoleClient.GetClusterRestore(id) }), - Namespace: namespace, + namespace: namespace, } } +func (s *RestoreReconciler) Queue() workqueue.TypedRateLimitingInterface[string] { + return s.restoreQueue +} + +func (s *RestoreReconciler) Restart() { + // Cleanup + s.restoreQueue.ShutDown() + s.restoreCache.Wipe() + + // Initialize + s.restoreQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) +} + +func (s *RestoreReconciler) Shutdown() { + s.restoreQueue.ShutDown() + s.restoreCache.Wipe() +} + func (s *RestoreReconciler) GetPollInterval() time.Duration { return 0 // use default poll interval } func (s *RestoreReconciler) GetPublisher() (string, websocket.Publisher) { return "restore.event", 
&socketPublisher{ - restoreQueue: s.RestoreQueue, - restoreCache: s.RestoreCache, + restoreQueue: s.restoreQueue, + restoreCache: s.restoreCache, } } func (s *RestoreReconciler) WipeCache() { - s.RestoreCache.Wipe() + s.restoreCache.Wipe() } func (s *RestoreReconciler) ShutdownQueue() { - s.RestoreQueue.ShutDown() + s.restoreQueue.ShutDown() } -func (s *RestoreReconciler) Poll(ctx context.Context) (done bool, err error) { +func (s *RestoreReconciler) Poll(ctx context.Context) error { logger := log.FromContext(ctx) logger.Info("fetching restore for cluster") - myCluster, err := s.ConsoleClient.MyCluster() + myCluster, err := s.consoleClient.MyCluster() if err != nil { logger.Error(err, "failed to fetch my cluster") - return false, nil + return err } if myCluster.MyCluster.Restore != nil { logger.Info("sending update for", "restore", myCluster.MyCluster.Restore.ID) - s.RestoreQueue.Add(myCluster.MyCluster.Restore.ID) + s.restoreQueue.Add(myCluster.MyCluster.Restore.ID) } - return false, nil + return nil } func (s *RestoreReconciler) Reconcile(ctx context.Context, id string) (result reconcile.Result, err error) { logger := log.FromContext(ctx) logger.Info("attempting to sync restore", "id", id) - restore, err := s.RestoreCache.Get(id) + restore, err := s.restoreCache.Get(id) if err != nil { logger.Error(err, "failed to fetch restore, ignoring for now") return @@ -127,19 +145,19 @@ func (s *RestoreReconciler) Reconcile(ctx context.Context, id string) (result re logger.Info("syncing restore", "id", restore.ID) veleroRestore := &velerov1.Restore{} - err = s.K8sClient.Get(ctx, ctrlclient.ObjectKey{Name: id, Namespace: s.Namespace}, veleroRestore) + err = s.k8sClient.Get(ctx, ctrlclient.ObjectKey{Name: id, Namespace: s.namespace}, veleroRestore) if err != nil { if !apierrors.IsNotFound(err) { return } - err = s.K8sClient.Create(ctx, s.genVeleroRestore(restore.ID, restore.Backup.Name)) + err = s.k8sClient.Create(ctx, s.genVeleroRestore(restore.ID, restore.Backup.Name)) if err != nil { return } return reconcile.Result{}, nil } - _, err = s.ConsoleClient.UpdateClusterRestore(restore.ID, console.RestoreAttributes{ + _, err = s.consoleClient.UpdateClusterRestore(restore.ID, console.RestoreAttributes{ Status: restoreStatusMap[veleroRestore.Status.Phase], }) @@ -149,7 +167,7 @@ func (s *RestoreReconciler) Reconcile(ctx context.Context, id string) (result re func (s *RestoreReconciler) UpdateErrorStatus(ctx context.Context, id string) { logger := log.FromContext(ctx) - if _, err := s.ConsoleClient.UpdateClusterRestore(id, console.RestoreAttributes{ + if _, err := s.consoleClient.UpdateClusterRestore(id, console.RestoreAttributes{ Status: console.RestoreStatusFailed, }); err != nil { logger.Error(err, "Failed to update service status, ignoring for now") @@ -160,7 +178,7 @@ func (s *RestoreReconciler) genVeleroRestore(id, backupName string) *velerov1.Re return &velerov1.Restore{ ObjectMeta: metav1.ObjectMeta{ Name: id, - Namespace: s.Namespace, + Namespace: s.namespace, }, Spec: velerov1.RestoreSpec{ BackupName: backupName, diff --git a/pkg/controller/service/reconciler.go b/pkg/controller/service/reconciler.go index f13c57d0..f59c002b 100644 --- a/pkg/controller/service/reconciler.go +++ b/pkg/controller/service/reconciler.go @@ -25,15 +25,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/pluralsh/deployment-operator/cmd/agent/args" - "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" - agentcommon "github.com/pluralsh/deployment-operator/pkg/common" - clienterrors 
"github.com/pluralsh/deployment-operator/internal/errors" + "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" "github.com/pluralsh/deployment-operator/internal/metrics" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/applier" "github.com/pluralsh/deployment-operator/pkg/client" - "github.com/pluralsh/deployment-operator/pkg/controller" + agentcommon "github.com/pluralsh/deployment-operator/pkg/common" + common2 "github.com/pluralsh/deployment-operator/pkg/controller/common" plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors" "github.com/pluralsh/deployment-operator/pkg/manifests" manis "github.com/pluralsh/deployment-operator/pkg/manifests" @@ -51,39 +50,28 @@ const ( ) type ServiceReconciler struct { - ConsoleClient client.Client - Config *rest.Config - Clientset *kubernetes.Clientset - Applier *applier.Applier - Destroyer *apply.Destroyer - SvcQueue workqueue.TypedRateLimitingInterface[string] - SvcCache *client.Cache[console.GetServiceDeploymentForAgent_ServiceDeployment] - ManifestCache *manifests.ManifestCache - UtilFactory util.Factory - RestoreNamespace string - - mapper meta.RESTMapper - pinger *ping.Pinger + consoleClient client.Client + clientset *kubernetes.Clientset + applier *applier.Applier + destroyer *apply.Destroyer + svcQueue workqueue.TypedRateLimitingInterface[string] + svcCache *client.Cache[console.GetServiceDeploymentForAgent_ServiceDeployment] + manifestCache *manifests.ManifestCache + utilFactory util.Factory + restoreNamespace string + mapper meta.RESTMapper + pinger *ping.Pinger } func NewServiceReconciler(consoleClient client.Client, config *rest.Config, refresh, manifestTTL time.Duration, restoreNamespace, consoleURL string) (*ServiceReconciler, error) { utils.DisableClientLimits(config) _, deployToken := consoleClient.GetCredentials() - discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { return nil, err } - svcCache := client.NewCache[console.GetServiceDeploymentForAgent_ServiceDeployment](refresh, func(id string) (*console.GetServiceDeploymentForAgent_ServiceDeployment, error) { - return consoleClient.GetService(id) - }) - - svcQueue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) - - manifestCache := manifests.NewCache(manifestTTL, deployToken, consoleURL) - f := utils.NewFactory(config) mapper, err := f.ToRESTMapper() if err != nil { @@ -107,30 +95,51 @@ func NewServiceReconciler(consoleClient client.Client, config *rest.Config, refr } return &ServiceReconciler{ - ConsoleClient: consoleClient, - Config: config, - Clientset: cs, - SvcQueue: svcQueue, - SvcCache: svcCache, - ManifestCache: manifestCache, - UtilFactory: f, - Applier: a, - Destroyer: d, + consoleClient: consoleClient, + clientset: cs, + svcQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), + svcCache: client.NewCache[console.GetServiceDeploymentForAgent_ServiceDeployment](refresh, func(id string) ( + *console.GetServiceDeploymentForAgent_ServiceDeployment, error, + ) { + return consoleClient.GetService(id) + }), + manifestCache: manifests.NewCache(manifestTTL, deployToken, consoleURL), + utilFactory: f, + applier: a, + destroyer: d, pinger: ping.New(consoleClient, discoveryClient, f), - RestoreNamespace: restoreNamespace, + restoreNamespace: restoreNamespace, mapper: mapper, }, nil } +func (s *ServiceReconciler) Queue() workqueue.TypedRateLimitingInterface[string] { + return s.svcQueue 
+} + +func (s *ServiceReconciler) Restart() { + // Cleanup + s.svcQueue.ShutDown() + s.svcCache.Wipe() + + // Initialize + s.svcQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) +} + +func (s *ServiceReconciler) Shutdown() { + s.svcQueue.ShutDown() + s.svcCache.Wipe() +} + func (s *ServiceReconciler) GetPollInterval() time.Duration { return 0 // use default poll interval } func (s *ServiceReconciler) GetPublisher() (string, websocket.Publisher) { return "service.event", &socketPublisher{ - svcQueue: s.SvcQueue, - svcCache: s.SvcCache, - manCache: s.ManifestCache, + svcQueue: s.svcQueue, + svcCache: s.svcCache, + manCache: s.manifestCache, } } @@ -225,19 +234,19 @@ func postProcess(mans []*unstructured.Unstructured) []*unstructured.Unstructured } func (s *ServiceReconciler) WipeCache() { - s.SvcCache.Wipe() - s.ManifestCache.Wipe() + s.svcCache.Wipe() + s.manifestCache.Wipe() } func (s *ServiceReconciler) ShutdownQueue() { - s.SvcQueue.ShutDown() + s.svcQueue.ShutDown() } func (s *ServiceReconciler) ListServices(ctx context.Context) *algorithms.Pager[*console.ServiceDeploymentEdgeFragment] { logger := log.FromContext(ctx) logger.Info("create service pager") fetch := func(page *string, size int64) ([]*console.ServiceDeploymentEdgeFragment, *algorithms.PageInfo, error) { - resp, err := s.ConsoleClient.GetServices(page, &size) + resp, err := s.consoleClient.GetServices(page, &size) if err != nil { logger.Error(err, "failed to fetch service list from deployments service") return nil, nil, err @@ -249,21 +258,21 @@ func (s *ServiceReconciler) ListServices(ctx context.Context) *algorithms.Pager[ } return resp.PagedClusterServices.Edges, pageInfo, nil } - return algorithms.NewPager[*console.ServiceDeploymentEdgeFragment](controller.DefaultPageSize, fetch) + return algorithms.NewPager[*console.ServiceDeploymentEdgeFragment](common2.DefaultPageSize, fetch) } -func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) { +func (s *ServiceReconciler) Poll(ctx context.Context) error { logger := log.FromContext(ctx) logger.Info("fetching services for cluster") restore, err := s.isClusterRestore(ctx) if err != nil { logger.Error(err, "failed to check restore config map") - return false, nil + return err } if restore { logger.Info("restoring cluster from backup") - return false, nil + return nil } pager := s.ListServices(ctx) @@ -272,7 +281,7 @@ func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) { services, err := pager.NextPage() if err != nil { logger.Error(err, "failed to fetch service list from deployments service") - return false, nil + return err } for _, svc := range services { // If services arg is provided, we can skip @@ -282,7 +291,7 @@ func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) { } logger.Info("sending update for", "service", svc.Node.ID) - s.SvcQueue.Add(svc.Node.ID) + s.svcQueue.Add(svc.Node.ID) } } @@ -291,7 +300,7 @@ func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) { } s.ScrapeKube(ctx) - return false, nil + return nil } func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result reconcile.Result, err error) { @@ -301,7 +310,7 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re logger := log.FromContext(ctx) logger.Info("attempting to sync service", "id", id) - svc, err := s.SvcCache.Get(id) + svc, err := s.svcCache.Get(id) if err != nil { if clienterrors.IsNotFound(err) { 
logger.Info("service already deleted", "id", id) @@ -344,7 +353,7 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re if svc.DeletedAt != nil { logger.Info("Deleting service", "name", svc.Name, "namespace", svc.Namespace) - ch := s.Destroyer.Run(ctx, inventory.WrapInventoryInfoObj(s.defaultInventoryObjTemplate(id)), apply.DestroyerOptions{ + ch := s.destroyer.Run(ctx, inventory.WrapInventoryInfoObj(s.defaultInventoryObjTemplate(id)), apply.DestroyerOptions{ InventoryPolicy: inventory.PolicyAdoptIfNoInventory, DryRunStrategy: common.DryRunNone, DeleteTimeout: 20 * time.Second, @@ -359,7 +368,7 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re } logger.Info("Fetching manifests", "service", svc.Name) - manifests, err := s.ManifestCache.Fetch(s.UtilFactory, svc) + manifests, err := s.manifestCache.Fetch(s.utilFactory, svc) if err != nil { logger.Error(err, "failed to parse manifests", "service", svc.Name) return @@ -415,7 +424,7 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re options.DryRunStrategy = common.DryRunServer } - ch := s.Applier.Run(ctx, inv, manifests, options) + ch := s.applier.Run(ctx, inv, manifests, options) err = s.UpdateApplyStatus(ctx, svc, ch, false, vcache) return @@ -436,7 +445,7 @@ func (s *ServiceReconciler) CheckNamespace(namespace string, syncConfig *console } } if createNamespace { - return utils.CheckNamespace(*s.Clientset, namespace, labels, annotations) + return utils.CheckNamespace(*s.clientset, namespace, labels, annotations) } return nil } @@ -481,13 +490,13 @@ func (s *ServiceReconciler) defaultInventoryObjTemplate(id string) *unstructured } func (s *ServiceReconciler) isClusterRestore(ctx context.Context) (bool, error) { - cmr, err := s.Clientset.CoreV1().ConfigMaps(s.RestoreNamespace).Get(ctx, RestoreConfigMapName, metav1.GetOptions{}) + cmr, err := s.clientset.CoreV1().ConfigMaps(s.restoreNamespace).Get(ctx, RestoreConfigMapName, metav1.GetOptions{}) if err != nil { return false, nil } timeout := cmr.CreationTimestamp.Add(time.Hour) if time.Now().After(timeout) { - if err := s.Clientset.CoreV1().ConfigMaps(s.RestoreNamespace).Delete(ctx, RestoreConfigMapName, metav1.DeleteOptions{}); err != nil { + if err := s.clientset.CoreV1().ConfigMaps(s.restoreNamespace).Delete(ctx, RestoreConfigMapName, metav1.DeleteOptions{}); err != nil { return true, err } return false, nil diff --git a/pkg/controller/service/reconciler_scraper.go b/pkg/controller/service/reconciler_scraper.go index 7e2fb489..cacdfa18 100644 --- a/pkg/controller/service/reconciler_scraper.go +++ b/pkg/controller/service/reconciler_scraper.go @@ -13,7 +13,7 @@ func (s *ServiceReconciler) ScrapeKube(ctx context.Context) { logger := log.FromContext(ctx) logger.Info("attempting to collect all runtime services for the cluster") runtimeServices := map[string]string{} - deployments, err := s.Clientset.AppsV1().Deployments("").List(ctx, metav1.ListOptions{}) + deployments, err := s.clientset.AppsV1().Deployments("").List(ctx, metav1.ListOptions{}) if err == nil { logger.Info("aggregating from deployments") for _, deployment := range deployments.Items { @@ -21,7 +21,7 @@ func (s *ServiceReconciler) ScrapeKube(ctx context.Context) { } } - statefulSets, err := s.Clientset.AppsV1().StatefulSets("").List(ctx, metav1.ListOptions{}) + statefulSets, err := s.clientset.AppsV1().StatefulSets("").List(ctx, metav1.ListOptions{}) if err == nil { logger.Info("aggregating from statefulsets") for _, ss := range 
statefulSets.Items { @@ -29,14 +29,14 @@ func (s *ServiceReconciler) ScrapeKube(ctx context.Context) { } } - daemonSets, err := s.Clientset.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{}) + daemonSets, err := s.clientset.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{}) if err == nil { logger.Info("aggregating from daemonsets") for _, ss := range daemonSets.Items { AddRuntimeServiceInfo(ss.GetLabels(), runtimeServices) } } - if err := s.ConsoleClient.RegisterRuntimeServices(runtimeServices, nil); err != nil { + if err := s.consoleClient.RegisterRuntimeServices(runtimeServices, nil); err != nil { logger.Error(err, "failed to register runtime services, this is an ignorable error but could mean your console needs to be upgraded") } } diff --git a/pkg/controller/service/reconciler_status.go b/pkg/controller/service/reconciler_status.go index 426010ac..9b060a22 100644 --- a/pkg/controller/service/reconciler_status.go +++ b/pkg/controller/service/reconciler_status.go @@ -100,21 +100,20 @@ func (s *ServiceReconciler) UpdateApplyStatus( if e.ApplyEvent.Status == event.ApplySuccessful { cache.SaveResourceSHA(e.ApplyEvent.Resource, cache.ApplySHA) } - if e.ApplyEvent.Error != nil { - // e.ApplyEvent.Resource == nil, create the key to get cache entry - key := cache.ResourceKey{ - Namespace: namespace, - Name: name, - GroupKind: gk, - } - sha, exists := cache.GetResourceCache().GetCacheEntry(key.ObjectIdentifier()) - if exists { - // clear SHA when error occurs - sha.Expire() - cache.GetResourceCache().SetCacheEntry(key.ObjectIdentifier(), sha) - } if e.ApplyEvent.Status == event.ApplyFailed { + // e.ApplyEvent.Resource == nil, create the key to get cache entry + key := cache.ResourceKey{ + Namespace: namespace, + Name: name, + GroupKind: gk, + } + sha, exists := cache.GetResourceCache().GetCacheEntry(key.ObjectIdentifier()) + if exists { + // clear SHA when error occurs + sha.Expire() + cache.GetResourceCache().SetCacheEntry(key.ObjectIdentifier(), sha) + } err = fmt.Errorf("%s apply %s: %s\n", resourceIDToString(gk, name), strings.ToLower(e.ApplyEvent.Status.String()), e.ApplyEvent.Error.Error()) logger.Error(err, "apply error") @@ -239,11 +238,11 @@ func (s *ServiceReconciler) UpdateStatus(id, revisionID string, sha *string, com errs = append(errs, err) } - return s.ConsoleClient.UpdateComponents(id, revisionID, sha, components, errs) + return s.consoleClient.UpdateComponents(id, revisionID, sha, components, errs) } func (s *ServiceReconciler) AddErrors(id string, err *console.ServiceErrorAttributes) error { - return s.ConsoleClient.AddServiceErrors(id, []*console.ServiceErrorAttributes{err}) + return s.consoleClient.AddServiceErrors(id, []*console.ServiceErrorAttributes{err}) } func resourceIDToString(gk schema.GroupKind, name string) string { diff --git a/pkg/controller/service/status_collector.go b/pkg/controller/service/status_collector.go index 4967d1bc..912a6d96 100644 --- a/pkg/controller/service/status_collector.go +++ b/pkg/controller/service/status_collector.go @@ -8,13 +8,13 @@ import ( "github.com/samber/lo" "golang.org/x/exp/maps" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog/v2" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/object" "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" "github.com/pluralsh/deployment-operator/pkg/cache" "github.com/pluralsh/deployment-operator/pkg/common" - "github.com/pluralsh/deployment-operator/pkg/log" ) type serviceComponentsStatusCollector struct { @@ -45,12 +45,12 @@ func (sc 
diff --git a/pkg/controller/service/status_collector.go b/pkg/controller/service/status_collector.go
index 4967d1bc..912a6d96 100644
--- a/pkg/controller/service/status_collector.go
+++ b/pkg/controller/service/status_collector.go
@@ -8,13 +8,13 @@ import (
 	"github.com/samber/lo"
 	"golang.org/x/exp/maps"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/klog/v2"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/object"
 
 	"github.com/pluralsh/deployment-operator/internal/kubernetes/schema"
 	"github.com/pluralsh/deployment-operator/pkg/cache"
 	"github.com/pluralsh/deployment-operator/pkg/common"
-	"github.com/pluralsh/deployment-operator/pkg/log"
 )
 
 type serviceComponentsStatusCollector struct {
@@ -45,12 +45,12 @@ func (sc *serviceComponentsStatusCollector) updateApplyStatus(id object.ObjMetad
 }
 
 func (sc *serviceComponentsStatusCollector) refetch(resource *unstructured.Unstructured) *unstructured.Unstructured {
-	if sc.reconciler.Clientset == nil || resource == nil {
+	if sc.reconciler.clientset == nil || resource == nil {
 		return nil
 	}
 
 	response := new(unstructured.Unstructured)
-	err := sc.reconciler.Clientset.RESTClient().Get().AbsPath(toAPIPath(resource)).Do(context.Background()).Into(response)
+	err := sc.reconciler.clientset.RESTClient().Get().AbsPath(toAPIPath(resource)).Do(context.Background()).Into(response)
 	if err != nil {
 		return nil
 	}
@@ -120,7 +120,7 @@ func (sc *serviceComponentsStatusCollector) componentsAttributes(vcache map[sche
 	for key := range diff {
 		e, err := cache.GetResourceCache().GetCacheStatus(key)
 		if err != nil {
-			log.Logger.Error(err, "failed to get cache status")
+			klog.ErrorS(err, "failed to get cache status")
 			continue
 		}
 		gname := schema.GroupName{
diff --git a/pkg/controller/stacks/job.go b/pkg/controller/stacks/job.go
index e13b4193..0e9a772a 100644
--- a/pkg/controller/stacks/job.go
+++ b/pkg/controller/stacks/job.go
@@ -80,22 +80,22 @@ func (r *StackReconciler) reconcileRunJob(ctx context.Context, run *console.Stac
 	logger := log.FromContext(ctx)
 	jobName := GetRunJobName(run)
 	foundJob := &batchv1.Job{}
-	if err := r.K8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: r.Namespace}, foundJob); err != nil {
+	if err := r.k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: r.namespace}, foundJob); err != nil {
 		if !apierrs.IsNotFound(err) {
 			return nil, err
 		}
 
-		logger.V(2).Info("generating job", "namespace", r.Namespace, "name", jobName)
+		logger.V(2).Info("generating job", "namespace", r.namespace, "name", jobName)
 		job := r.GenerateRunJob(run, jobName)
 
 		logger.V(2).Info("creating job", "namespace", job.Namespace, "name", job.Name)
-		if err := r.K8sClient.Create(ctx, job); err != nil {
+		if err := r.k8sClient.Create(ctx, job); err != nil {
 			logger.Error(err, "unable to create job")
 			return nil, err
 		}
 		metrics.Record().StackRunJobCreation()
 
-		if err := r.ConsoleClient.UpdateStackRun(run.ID, console.StackRunAttributes{
+		if err := r.consoleClient.UpdateStackRun(run.ID, console.StackRunAttributes{
 			Status: run.Status,
 			JobRef: &console.NamespacedName{
 				Name:      job.Name,
@@ -137,7 +137,7 @@ func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, name str
 	jobSpec.Template.Annotations[podDefaultContainerAnnotation] = DefaultJobContainer
 
 	if jobSpec.Template.ObjectMeta.Namespace == "" {
-		jobSpec.Template.ObjectMeta.Namespace = r.Namespace
+		jobSpec.Template.ObjectMeta.Namespace = r.namespace
 	}
 
 	jobSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
@@ -154,7 +154,7 @@ func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, name str
 	return &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:        name,
-			Namespace:   r.Namespace,
+			Namespace:   r.namespace,
 			Annotations: map[string]string{jobSelector: name},
 			Labels:      map[string]string{jobSelector: name},
 		},
@@ -295,8 +295,8 @@ func (r *StackReconciler) getTag(run *console.StackRunFragment) string {
 
 func (r *StackReconciler) getDefaultContainerArgs(runID string) []string {
 	return []string{
-		fmt.Sprintf("--console-url=%s", r.ConsoleURL),
-		fmt.Sprintf("--console-token=%s", r.DeployToken),
+		fmt.Sprintf("--console-url=%s", r.consoleURL),
+		fmt.Sprintf("--console-token=%s", r.deployToken),
 		fmt.Sprintf("--stack-run-id=%s", runID),
 	}
 }
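Note: status_collector.go above switches from the package-level zap logger in pkg/log (deleted at the end of this diff) to klog's structured logging. A minimal, self-contained sketch of the klog calls involved; the message and key/value pairs are illustrative:

package main

import (
	"errors"

	"k8s.io/klog/v2"
)

func main() {
	// ErrorS logs an error with structured key/value context, matching the
	// call style now used in componentsAttributes.
	err := errors.New("cache entry not found")
	klog.ErrorS(err, "failed to get cache status", "key", "default/my-deployment")

	// InfoS is the structured counterpart for informational messages.
	klog.InfoS("aggregating from daemonsets", "count", 3)

	// Flush any buffered log entries before exiting.
	klog.Flush()
}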
diff --git a/pkg/controller/stacks/reconciler.go b/pkg/controller/stacks/reconciler.go
index 0f746bdb..8c59df0d 100644
--- a/pkg/controller/stacks/reconciler.go
+++ b/pkg/controller/stacks/reconciler.go
@@ -14,7 +14,7 @@ import (
 	clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
 	"github.com/pluralsh/deployment-operator/pkg/client"
-	"github.com/pluralsh/deployment-operator/pkg/controller"
+	"github.com/pluralsh/deployment-operator/pkg/controller/common"
 	"github.com/pluralsh/deployment-operator/pkg/websocket"
 )
 
@@ -23,55 +23,73 @@ const (
 )
 
 type StackReconciler struct {
-	ConsoleClient client.Client
-	K8sClient     ctrlclient.Client
-	StackQueue    workqueue.TypedRateLimitingInterface[string]
-	StackCache    *client.Cache[console.StackRunFragment]
-	Namespace     string
-	ConsoleURL    string
-	DeployToken   string
-	PollInterval  time.Duration
+	consoleClient client.Client
+	k8sClient     ctrlclient.Client
+	stackQueue    workqueue.TypedRateLimitingInterface[string]
+	stackCache    *client.Cache[console.StackRunFragment]
+	namespace     string
+	consoleURL    string
+	deployToken   string
+	pollInterval  time.Duration
 }
 
 func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh, pollInterval time.Duration, namespace, consoleURL, deployToken string) *StackReconciler {
 	return &StackReconciler{
-		ConsoleClient: consoleClient,
-		K8sClient:     k8sClient,
-		StackQueue:    workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
-		StackCache: client.NewCache[console.StackRunFragment](refresh, func(id string) (*console.StackRunFragment, error) {
+		consoleClient: consoleClient,
+		k8sClient:     k8sClient,
+		stackQueue:    workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
+		stackCache: client.NewCache[console.StackRunFragment](refresh, func(id string) (*console.StackRunFragment, error) {
 			return consoleClient.GetStackRun(id)
 		}),
-		Namespace:     namespace,
-		ConsoleURL:    consoleURL,
-		DeployToken:   deployToken,
-		PollInterval:  pollInterval,
+		consoleURL:   consoleURL,
+		deployToken:  deployToken,
+		pollInterval: pollInterval,
+		namespace:    namespace,
 	}
 }
 
+func (r *StackReconciler) Queue() workqueue.TypedRateLimitingInterface[string] {
+	return r.stackQueue
+}
+
+func (r *StackReconciler) Restart() {
+	// Cleanup
+	r.stackQueue.ShutDown()
+	r.stackCache.Wipe()
+
+	// Initialize
+	r.stackQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
+}
+
+func (r *StackReconciler) Shutdown() {
+	r.stackQueue.ShutDown()
+	r.stackCache.Wipe()
+}
+
 func (r *StackReconciler) GetPollInterval() time.Duration {
-	return r.PollInterval
+	return r.pollInterval
 }
 
 func (r *StackReconciler) GetPublisher() (string, websocket.Publisher) {
 	return "stack.run.event", &socketPublisher{
-		stackRunQueue: r.StackQueue,
-		stackRunCache: r.StackCache,
+		stackRunQueue: r.stackQueue,
+		stackRunCache: r.stackCache,
 	}
 }
 
 func (r *StackReconciler) WipeCache() {
-	r.StackCache.Wipe()
+	r.stackCache.Wipe()
 }
 
 func (r *StackReconciler) ShutdownQueue() {
-	r.StackQueue.ShutDown()
+	r.stackQueue.ShutDown()
 }
 
 func (r *StackReconciler) ListStacks(ctx context.Context) *algorithms.Pager[*console.StackRunEdgeFragment] {
 	logger := log.FromContext(ctx)
 	logger.Info("create stack run pager")
 	fetch := func(page *string, size int64) ([]*console.StackRunEdgeFragment, *algorithms.PageInfo, error) {
-		resp, err := r.ConsoleClient.ListClusterStackRuns(page, &size)
+		resp, err := r.consoleClient.ListClusterStackRuns(page, &size)
 		if err != nil {
 			logger.Error(err, "failed to fetch stack run")
 			return nil, nil, err
@@ -83,10 +101,10 @@ func (r *StackReconciler) ListStacks(ctx context.Context) *algorithms.Pager[*con
 		}
 		return resp.Edges, pageInfo, nil
 	}
-	return algorithms.NewPager[*console.StackRunEdgeFragment](controller.DefaultPageSize, fetch)
+	return algorithms.NewPager[*console.StackRunEdgeFragment](common.DefaultPageSize, fetch)
 }
 
-func (r *StackReconciler) Poll(ctx context.Context) (done bool, err error) {
+func (r *StackReconciler) Poll(ctx context.Context) error {
 	logger := log.FromContext(ctx)
 	logger.Info("fetching stacks")
 	pager := r.ListStacks(ctx)
@@ -95,21 +113,21 @@ func (r *StackReconciler) Poll(ctx context.Context) (done bool, err error) {
 		stacks, err := pager.NextPage()
 		if err != nil {
 			logger.Error(err, "failed to fetch stack run list")
-			return false, nil
+			return err
 		}
 		for _, stack := range stacks {
 			logger.Info("sending update for", "stack run", stack.Node.ID)
-			r.StackQueue.Add(stack.Node.ID)
+			r.stackQueue.Add(stack.Node.ID)
 		}
 	}
 
-	return false, nil
+	return nil
 }
 
 func (r *StackReconciler) Reconcile(ctx context.Context, id string) (reconcile.Result, error) {
 	logger := log.FromContext(ctx)
 	logger.Info("attempting to sync stack run", "id", id)
-	stackRun, err := r.StackCache.Get(id)
+	stackRun, err := r.stackCache.Get(id)
 	if err != nil {
 		if clienterrors.IsNotFound(err) {
 			logger.Info("stack run already deleted", "id", id)
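Note: with the StackReconciler fields unexported, callers now go through NewStackReconciler and the new Queue/Restart/Shutdown methods. A sketch of how a supervising loop might drive that lifecycle; the reconnect channel and function name are assumptions for illustration, not the controller manager's actual wiring:

package supervisor

import (
	"context"

	"github.com/pluralsh/deployment-operator/pkg/controller/stacks"
)

// restartOnReconnect restarts the reconciler whenever the connection to the
// Console is re-established, so its queue and cache start from a clean state,
// and shuts it down when the surrounding context is cancelled.
func restartOnReconnect(ctx context.Context, r *stacks.StackReconciler, reconnected <-chan struct{}) {
	for {
		select {
		case <-ctx.Done():
			r.Shutdown() // shut down the queue and wipe the cache on exit
			return
		case <-reconnected:
			r.Restart() // drop stale queue items and cached stack runs
		}
	}
}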
diff --git a/pkg/controller/v1/types.go b/pkg/controller/v1/types.go
new file mode 100644
index 00000000..6beccc40
--- /dev/null
+++ b/pkg/controller/v1/types.go
@@ -0,0 +1,36 @@
+package v1
+
+import (
+	"context"
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+	"github.com/pluralsh/deployment-operator/pkg/websocket"
+)
+
+type Reconciler interface {
+	// Reconcile Kubernetes resources to reflect state from the Console.
+	Reconcile(context.Context, string) (reconcile.Result, error)
+
+	// Poll Console for any state changes and put them in the queue that will be consumed by Reconcile.
+	Poll(context.Context) error
+
+	// GetPublisher returns event name, i.e. "event.service", and Publisher that will be registered with this reconciler.
+	// TODO: Make it optional and/or accept multiple publishers.
+	GetPublisher() (string, websocket.Publisher)
+
+	// Queue returns a queue.
+	Queue() workqueue.TypedRateLimitingInterface[string]
+
+	// Shutdown shuts down the reconciler cache & queue
+	Shutdown()
+
+	// Restart initiates a reconciler restart. It ensures queue and cache are
+	// safely cleaned up and reinitialized.
+	Restart()
+
+	// GetPollInterval returns the custom poll interval. If 0, the controller manager uses the default from the options.
+	GetPollInterval() time.Duration
+}
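Note: pkg/controller/v1 introduces the Reconciler contract that controllers such as StackReconciler now satisfy. A minimal compile-checked skeleton, assuming only the packages imported by types.go itself; noopReconciler is a placeholder name:

package example

import (
	"context"
	"time"

	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "github.com/pluralsh/deployment-operator/pkg/controller/v1"
	"github.com/pluralsh/deployment-operator/pkg/websocket"
)

// noopReconciler is a stub that satisfies v1.Reconciler; the compile-time
// assertion below is the only real check it provides.
type noopReconciler struct {
	queue workqueue.TypedRateLimitingInterface[string]
}

var _ v1.Reconciler = (*noopReconciler)(nil)

func (n *noopReconciler) Reconcile(ctx context.Context, id string) (reconcile.Result, error) {
	return reconcile.Result{}, nil
}

func (n *noopReconciler) Poll(ctx context.Context) error { return nil }

func (n *noopReconciler) GetPublisher() (string, websocket.Publisher) { return "noop.event", nil }

func (n *noopReconciler) Queue() workqueue.TypedRateLimitingInterface[string] { return n.queue }

func (n *noopReconciler) Shutdown() { n.queue.ShutDown() }

func (n *noopReconciler) Restart() {
	// Drop the old queue and start a fresh one, mirroring the pattern above.
	n.queue.ShutDown()
	n.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
}

// 0 means the controller manager falls back to its default poll interval.
func (n *noopReconciler) GetPollInterval() time.Duration { return 0 }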
diff --git a/pkg/harness/tool/terraform/terraform.go b/pkg/harness/tool/terraform/terraform.go
index 8490c0a9..9f0a2263 100644
--- a/pkg/harness/tool/terraform/terraform.go
+++ b/pkg/harness/tool/terraform/terraform.go
@@ -146,7 +146,7 @@ func (in *Terraform) init() v1.Tool {
 	in.planFileName = "terraform.tfplan"
 	helpers.EnsureFileOrDie(path.Join(in.dir, in.planFileName), nil)
 
-	if in.variables != nil && *in.variables == "" {
+	if in.variables != nil && len(*in.variables) > 0 {
 		in.variablesFileName = "plural.auto.tfvars.json"
 		helpers.EnsureFileOrDie(path.Join(in.dir, in.variablesFileName), in.variables)
 	}
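Note: the old guard wrote plural.auto.tfvars.json only when the dereferenced variables string was empty, which appears to invert the intent; the new check writes the file only when variables are present and non-empty. A standalone sketch of the corrected guard, with writeVarsFile standing in for helpers.EnsureFileOrDie:

package main

import (
	"fmt"
	"os"
	"path"
)

// writeVarsFile stands in for helpers.EnsureFileOrDie in this sketch.
func writeVarsFile(p string, content *string) error {
	return os.WriteFile(p, []byte(*content), 0o644)
}

// maybeWriteTFVars materializes the tfvars file only when variables were
// actually provided and are non-empty; an empty file would add nothing useful.
func maybeWriteTFVars(dir string, variables *string) error {
	if variables != nil && len(*variables) > 0 {
		return writeVarsFile(path.Join(dir, "plural.auto.tfvars.json"), variables)
	}
	return nil
}

func main() {
	vars := `{"cluster": "demo"}`
	if err := maybeWriteTFVars(os.TempDir(), &vars); err != nil {
		fmt.Println("failed to write tfvars:", err)
	}
}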
diff --git a/pkg/log/zap.go b/pkg/log/zap.go
deleted file mode 100644
index ac0ba9a3..00000000
--- a/pkg/log/zap.go
+++ /dev/null
@@ -1,154 +0,0 @@
-package log
-
-import (
-	"flag"
-	"fmt"
-	"os"
-	"strings"
-
-	"github.com/spf13/pflag"
-	"go.uber.org/zap"
-	"go.uber.org/zap/zapcore"
-)
-
-func init() {
-	DefaultOptions = NewDefaultOptions()
-	Logger = NewFromOptions(DefaultOptions).Sugar()
-}
-
-var (
-	Logger         *zap.SugaredLogger
-	DefaultOptions Options
-)
-
-// Options exports options struct to be used by cmd's.
-type Options struct {
-	// Enable debug logs
-	Debug bool
-	// Log format (JSON or plain text)
-	Format Format
-}
-
-func NewDefaultOptions() Options {
-	return Options{
-		Debug:  false,
-		Format: FormatJSON,
-	}
-}
-
-func (o *Options) AddFlags(fs *flag.FlagSet) {
-	fs.BoolVar(&o.Debug, "log-debug", o.Debug, "Enables more verbose logging")
-	fs.Var(&o.Format, "log-format", "Log format, one of "+AvailableFormats.String())
-}
-
-func (o *Options) AddPFlags(fs *pflag.FlagSet) {
-	fs.BoolVar(&o.Debug, "log-debug", o.Debug, "Enables more verbose logging")
-	fs.Var(&o.Format, "log-format", "Log format, one of "+AvailableFormats.String())
-}
-
-func (o *Options) Validate() error {
-	if !AvailableFormats.Contains(o.Format) {
-		return fmt.Errorf("invalid log-format specified %q; available: %s", o.Format, AvailableFormats.String())
-	}
-	return nil
-}
-
-type Format string
-
-// Type implements the pflag.Value interfaces.
-func (f *Format) Type() string {
-	return "string"
-}
-
-// String implements the cli.Value and flag.Value interfaces.
-func (f *Format) String() string {
-	return string(*f)
-}
-
-// Set implements the cli.Value and flag.Value interfaces.
-func (f *Format) Set(s string) error {
-	switch strings.ToLower(s) {
-	case "json":
-		*f = FormatJSON
-		return nil
-	case "console":
-		*f = FormatConsole
-		return nil
-	default:
-		return fmt.Errorf("invalid format '%s'", s)
-	}
-}
-
-type Formats []Format
-
-const (
-	FormatJSON    Format = "JSON"
-	FormatConsole Format = "Console"
-)
-
-var (
-	AvailableFormats = Formats{FormatJSON, FormatConsole}
-)
-
-func (f Formats) String() string {
-	const separator = ", "
-	var s string
-	for _, format := range f {
-		s = s + separator + string(format)
-	}
-	return strings.TrimPrefix(s, separator)
-}
-
-func (f Formats) Contains(s Format) bool {
-	for _, format := range f {
-		if s == format {
-			return true
-		}
-	}
-	return false
-}
-
-func NewFromOptions(o Options) *zap.Logger {
-	return New(o.Debug, o.Format)
-}
-
-func New(debug bool, format Format) *zap.Logger {
-	// this basically mimics NewConfig, but with a custom sink
-	sink := zapcore.AddSync(os.Stderr)
-
-	// Level - We only support setting Info+ or Debug+
-	lvl := zap.NewAtomicLevelAt(zap.InfoLevel)
-	if debug {
-		lvl = zap.NewAtomicLevelAt(zap.DebugLevel)
-	}
-
-	encCfg := zap.NewProductionEncoderConfig()
-	// Having a dateformat makes it easier to look at logs outside of something like Kibana
-	encCfg.TimeKey = "time"
-	encCfg.EncodeTime = zapcore.ISO8601TimeEncoder
-
-	// production config encodes durations as a float of the seconds value, but we want a more
-	// readable, precise representation
-	encCfg.EncodeDuration = zapcore.StringDurationEncoder
-
-	var enc zapcore.Encoder
-	if format == FormatJSON {
-		enc = zapcore.NewJSONEncoder(encCfg)
-	} else {
-		enc = zapcore.NewConsoleEncoder(encCfg)
-	}
-
-	opts := []zap.Option{
-		zap.AddCaller(),
-		zap.ErrorOutput(sink),
-	}
-
-	// coreLog := zapcore.NewCore(&ctrlruntimelzap.KubeAwareEncoder{Encoder: enc}, sink, lvl)
-	coreLog := zapcore.NewCore(enc, sink, lvl)
-	return zap.New(coreLog, opts...)
-}
-
-// NewDefault creates new default logger.
-func NewDefault() *zap.Logger {
-	return New(false, FormatJSON)
-}
diff --git a/pkg/manifests/template/helm_test.go b/pkg/manifests/template/helm_test.go
index 63b97962..699ee1fc 100644
--- a/pkg/manifests/template/helm_test.go
+++ b/pkg/manifests/template/helm_test.go
@@ -64,7 +64,7 @@ var _ = Describe("Helm template", func() {
 		It("should successfully render the helm template", func() {
 			resp, err := NewHelm(dir).Render(svc, utilFactory)
 			Expect(err).NotTo(HaveOccurred())
-			Expect(len(resp)).To(Equal(12))
+			Expect(len(resp)).To(Equal(13))
 		})
 	})