diff --git a/src/k8s/pkg/k8sd/api/capi_access_handler.go b/src/k8s/pkg/k8sd/api/capi_access_handler.go index c83733ecc..49dc1d48e 100644 --- a/src/k8s/pkg/k8sd/api/capi_access_handler.go +++ b/src/k8s/pkg/k8sd/api/capi_access_handler.go @@ -23,7 +23,7 @@ func ValidateCAPIAuthTokenAccessHandler(tokenHeaderName string) func(s state.Sta var err error tokenIsValid, err = database.ValidateClusterAPIToken(ctx, tx, token) if err != nil { - return fmt.Errorf("failed to check CAPI auth token: %w", err) + return fmt.Errorf("failed to check CAPI auth token: %w", err) } return nil }); err != nil { diff --git a/src/k8s/pkg/k8sd/api/cluster_config.go b/src/k8s/pkg/k8sd/api/cluster_config.go index 7bc01fcae..7354cef4b 100644 --- a/src/k8s/pkg/k8sd/api/cluster_config.go +++ b/src/k8s/pkg/k8sd/api/cluster_config.go @@ -39,7 +39,6 @@ func (e *Endpoints) putClusterConfig(s state.State, r *http.Request) response.Re return response.InternalError(fmt.Errorf("database transaction to update cluster configuration failed: %w", err)) } - e.provider.NotifyUpdateNodeConfigController() e.provider.NotifyFeatureController( !requestedConfig.Network.Empty(), !requestedConfig.Gateway.Empty(), diff --git a/src/k8s/pkg/k8sd/api/provider.go b/src/k8s/pkg/k8sd/api/provider.go index b7f66f249..bb916c500 100644 --- a/src/k8s/pkg/k8sd/api/provider.go +++ b/src/k8s/pkg/k8sd/api/provider.go @@ -9,6 +9,5 @@ import ( type Provider interface { MicroCluster() *microcluster.MicroCluster Snap() snap.Snap - NotifyUpdateNodeConfigController() NotifyFeatureController(network, gateway, ingress, loadBalancer, localStorage, metricsServer, dns bool) } diff --git a/src/k8s/pkg/k8sd/app/app.go b/src/k8s/pkg/k8sd/app/app.go index ecd11188c..db83f8ceb 100644 --- a/src/k8s/pkg/k8sd/app/app.go +++ b/src/k8s/pkg/k8sd/app/app.go @@ -19,6 +19,7 @@ import ( "github.com/canonical/microcluster/v2/client" "github.com/canonical/microcluster/v2/microcluster" "github.com/canonical/microcluster/v2/state" + ctrl "sigs.k8s.io/controller-runtime" ) // Config defines configuration for the k8sd app. 
@@ -63,8 +64,8 @@ type App struct { csrsigningController *csrsigning.Controller // updateNodeConfigController - triggerUpdateNodeConfigControllerCh chan struct{} - updateNodeConfigController *controllers.UpdateNodeConfigurationController + nodeConfigReconciler *controllers.NodeConfigurationReconciler + manager ctrl.Manager // featureController triggerFeatureControllerNetworkCh chan struct{} @@ -121,13 +122,11 @@ func New(cfg Config) (*App, error) { log.L().Info("control-plane-config-controller disabled via config") } - app.triggerUpdateNodeConfigControllerCh = make(chan struct{}, 1) if !cfg.DisableUpdateNodeConfigController { - app.updateNodeConfigController = controllers.NewUpdateNodeConfigurationController( - cfg.Snap, + app.nodeConfigReconciler = controllers.NewNodeConfigurationReconciler( + app.config.Snap, app.readyWg.Wait, - app.triggerUpdateNodeConfigControllerCh, ) } else { log.L().Info("update-node-config-controller disabled via config") diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go index 0f14fcd26..0210e3198 100644 --- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go +++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go @@ -519,6 +519,5 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst cfg.MetricsServer.GetEnabled(), cfg.DNS.GetEnabled(), ) - a.NotifyUpdateNodeConfigController() return nil } diff --git a/src/k8s/pkg/k8sd/app/hooks_start.go b/src/k8s/pkg/k8sd/app/hooks_start.go index 54fd2e692..996a8cc83 100644 --- a/src/k8s/pkg/k8sd/app/hooks_start.go +++ b/src/k8s/pkg/k8sd/app/hooks_start.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + "github.com/canonical/k8s/pkg/k8sd/controllers" "github.com/canonical/k8s/pkg/k8sd/database" databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util" "github.com/canonical/k8s/pkg/k8sd/types" @@ -14,6 +15,11 @@ import ( "github.com/canonical/k8s/pkg/utils" pkiutil "github.com/canonical/k8s/pkg/utils/pki" "github.com/canonical/microcluster/v2/state" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" ) func (a *App) onStart(ctx context.Context, s state.State) error { @@ -48,12 +54,11 @@ func (a *App) onStart(ctx context.Context, s state.State) error { } // start update node config controller - if a.updateNodeConfigController != nil { - go a.updateNodeConfigController.Run(ctx, func(ctx context.Context) (types.ClusterConfig, error) { + if a.nodeConfigReconciler != nil { + a.nodeConfigReconciler.SetConfigGetter(func(ctx context.Context) (types.ClusterConfig, error) { return databaseutil.GetClusterConfig(ctx, s) }) } - // start feature controller if a.featureController != nil { go a.featureController.Run( @@ -91,9 +96,6 @@ func (a *App) onStart(ctx context.Context, s state.State) error { return fmt.Errorf("database transaction to update cluster configuration failed: %w", err) } - // DNS IP has changed, notify node config controller - a.NotifyUpdateNodeConfigController() - return nil }, func(ctx context.Context, name types.FeatureName, featureStatus types.FeatureStatus) error { @@ -124,5 +126,68 @@ func (a *App) onStart(ctx context.Context, s state.State) error { ) } + go func() { + mgr, err := setupManager(ctx, a) + if err != nil { + log.FromContext(ctx).Error(err, "Failed to setup manager") + return + } + + a.manager = mgr + log.FromContext(ctx).Info("Starting controller manager") + if err := 
mgr.Start(ctx); err != nil { + log.FromContext(ctx).Error(err, "Manager failed to start") + } + }() + return nil } + +func setupManager(ctx context.Context, app *App) (manager.Manager, error) { + log.FromContext(ctx).Info("Setting up controller manager, waiting for ready signal") + app.readyWg.Wait() + log.FromContext(ctx).Info("Received ready signal, setting up controller manager") + + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + + k8sClient, err := app.config.Snap.KubernetesClient("kube-system") + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes client: %w", err) + } + log.FromContext(ctx).Info("Created kubernetes client") + + options := ctrl.Options{ + Scheme: scheme, + } + + mgr, err := ctrl.NewManager(k8sClient.RESTConfig(), options) + if err != nil { + return nil, fmt.Errorf("failed to create manager: %w", err) + } + + log.FromContext(ctx).Info("Created controller manager") + + if _, err := setupControllers(ctx, app, mgr); err != nil { + return nil, fmt.Errorf("failed to setup controllers: %w", err) + } + + return mgr, nil +} + +func setupControllers(ctx context.Context, app *App, mgr manager.Manager) (*controllers.NodeConfigurationReconciler, error) { + log.FromContext(ctx).Info("Setting up controllers") + if app.nodeConfigReconciler != nil { + log.FromContext(ctx).Info("Setting up node configuration reconciler") + + app.nodeConfigReconciler.SetClient(mgr.GetClient()) + app.nodeConfigReconciler.SetScheme(mgr.GetScheme()) + + if err := app.nodeConfigReconciler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("failed to setup node configuration reconciler: %w", err) + } + return app.nodeConfigReconciler, nil + } + + return nil, nil +} diff --git a/src/k8s/pkg/k8sd/app/provider.go b/src/k8s/pkg/k8sd/app/provider.go index 2e71b94d8..d06a6adce 100644 --- a/src/k8s/pkg/k8sd/app/provider.go +++ b/src/k8s/pkg/k8sd/app/provider.go @@ -15,10 +15,6 @@ func (a *App) Snap() snap.Snap { return a.snap } -func (a *App) NotifyUpdateNodeConfigController() { - utils.MaybeNotify(a.triggerUpdateNodeConfigControllerCh) -} - func (a *App) NotifyFeatureController(network, gateway, ingress, loadBalancer, localStorage, metricsServer, dns bool) { if network { utils.MaybeNotify(a.triggerFeatureControllerNetworkCh) diff --git a/src/k8s/pkg/k8sd/controllers/node_configuration_reconciler.go b/src/k8s/pkg/k8sd/controllers/node_configuration_reconciler.go new file mode 100644 index 000000000..2306e486c --- /dev/null +++ b/src/k8s/pkg/k8sd/controllers/node_configuration_reconciler.go @@ -0,0 +1,193 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/canonical/k8s/pkg/k8sd/types" + "github.com/canonical/k8s/pkg/log" + "github.com/canonical/k8s/pkg/snap" + snaputil "github.com/canonical/k8s/pkg/snap/util" + pkiutil "github.com/canonical/k8s/pkg/utils/pki" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type NodeConfigurationReconciler struct { + client.Client + snap snap.Snap + scheme *runtime.Scheme + + waitReady func() + + getClusterConfig func(ctx context.Context) (types.ClusterConfig, error) + + reconciledCh chan struct{} +} + +func NewNodeConfigurationReconciler( + snap snap.Snap, + waitReady func(), +) 
*NodeConfigurationReconciler { + return &NodeConfigurationReconciler{ + snap: snap, + waitReady: waitReady, + reconciledCh: make(chan struct{}, 1), + } +} + +func (c *NodeConfigurationReconciler) SetupWithManager(mgr ctrl.Manager) error { + if c.Client == nil { + return fmt.Errorf("client must be set before setting up with manager") + } + if c.scheme == nil { + return fmt.Errorf("scheme must be set before setting up with manager") + } + + if err := mgr.Add(c); err != nil { // This registers the Start method + return err + } + + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.ConfigMap{}).WithEventFilter(predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return isTargetConfigMap(e.Object) + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return isTargetConfigMap(e.ObjectNew) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return isTargetConfigMap(e.Object) + }, + GenericFunc: func(e event.GenericEvent) bool { + return isTargetConfigMap(e.Object) + }, + }).Complete(c) +} + +func isTargetConfigMap(obj client.Object) bool { + return obj.GetName() == "k8sd-config" && obj.GetNamespace() == "kube-system" +} + +func (c *NodeConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithValues( + "controller", "update-node-configuration", + "configmap", req.NamespacedName, + ) + + logger.Info("Reconciling node configuration") + + // Check if we're running on a worker node + if isWorker, err := snaputil.IsWorker(c.snap); err != nil { + logger.Error(err, "Failed to check if running on a worker node") + return reconcile.Result{RequeueAfter: time.Second * 30}, err + } else if isWorker { + logger.Info("Running on worker node, skipping reconciliation") + return reconcile.Result{}, nil + } + + // Get cluster configuration + config, err := c.getClusterConfig(ctx) + if err != nil { + logger.Error(err, "Failed to retrieve cluster configuration") + return reconcile.Result{RequeueAfter: time.Second * 30}, err + } + + logger.Info("Retrieved cluster configuration") + + // Load and process certificates + keyPEM := config.Certificates.GetK8sdPrivateKey() + key, err := pkiutil.LoadRSAPrivateKey(keyPEM) + if err != nil && keyPEM != "" { + return reconcile.Result{}, fmt.Errorf("failed to load cluster RSA key: %w", err) + } + + // Generate ConfigMap data + cmData, err := config.Kubelet.ToConfigMap(key) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to format kubelet configmap data: %w", err) + } + + logger.Info("Generated ConfigMap data") + + // Get existing ConfigMap + cm := &corev1.ConfigMap{} + if err := c.Get(ctx, req.NamespacedName, cm); err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "Failed to get ConfigMap") + return reconcile.Result{}, err + } else { + logger.Info("ConfigMap not found, creating new ConfigMap") + cm = &corev1.ConfigMap{ + ObjectMeta: ctrl.ObjectMeta{ + Name: req.Name, + Namespace: req.Namespace, + }, + Data: cmData, + } + if err := c.Create(ctx, cm); err != nil { + logger.Error(err, "Failed to create ConfigMap") + return reconcile.Result{}, err + } + logger.Info("Created ConfigMap") + return reconcile.Result{}, nil + } + } + + logger.Info("Retrieved existing ConfigMap, will update") + + // Update ConfigMap + cm.Data = cmData + if err := c.Update(ctx, cm); err != nil { + logger.Error(err, "Failed to update ConfigMap") + return reconcile.Result{}, err + } + + logger.Info("Updated ConfigMap, reconcile complete") + + // Notify that reconciliation is complete 
+ select { + case c.reconciledCh <- struct{}{}: + default: + } + + return reconcile.Result{}, nil +} + +// ReconciledCh returns the channel that receives notifications when reconciliation completes +func (c *NodeConfigurationReconciler) ReconciledCh() <-chan struct{} { + return c.reconciledCh +} + +func (r *NodeConfigurationReconciler) SetConfigGetter(getter func(context.Context) (types.ClusterConfig, error)) { + r.getClusterConfig = getter +} + +func (r *NodeConfigurationReconciler) SetScheme(scheme *runtime.Scheme) { + r.scheme = scheme +} + +func (r *NodeConfigurationReconciler) SetClient(client client.Client) { + r.Client = client +} + +func (r *NodeConfigurationReconciler) Start(ctx context.Context) error { + // Trigger initial reconciliation + _, err := r.Reconcile(ctx, ctrl.Request{ + NamespacedName: client.ObjectKey{ + Name: "k8sd-config", + Namespace: "kube-system", + }, + }) + if err != nil { + return err + } + return nil +} diff --git a/src/k8s/pkg/k8sd/controllers/node_configuration_reconciler_test.go b/src/k8s/pkg/k8sd/controllers/node_configuration_reconciler_test.go new file mode 100644 index 000000000..c4edbd83f --- /dev/null +++ b/src/k8s/pkg/k8sd/controllers/node_configuration_reconciler_test.go @@ -0,0 +1,134 @@ +package controllers_test + +import ( + "context" + "path/filepath" + "testing" + + "github.com/canonical/k8s/pkg/k8sd/controllers" + "github.com/canonical/k8s/pkg/k8sd/types" + "github.com/canonical/k8s/pkg/snap/mock" + "github.com/canonical/k8s/pkg/utils" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func TestNodeConfigurationReconciler(t *testing.T) { + testCases := []struct { + name string + existingMap bool + expectedConfig types.ClusterConfig + }{ + { + name: "ControlPlane_NotExist", + existingMap: false, + expectedConfig: types.ClusterConfig{ + Kubelet: types.Kubelet{ + ClusterDomain: utils.Pointer("cluster.local"), + }, + }, + }, + { + name: "ControlPlane_ExistingConfig", + existingMap: true, + expectedConfig: types.ClusterConfig{ + Kubelet: types.Kubelet{ + ClusterDomain: utils.Pointer("cluster.local"), + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + g := NewWithT(t) + ctx := context.Background() + + // Setup scheme + scheme := runtime.NewScheme() + g.Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + // Setup objects for fake client + objects := []client.Object{} + if tc.existingMap { + // Only create initial ConfigMap if test case requires it + initialMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "k8sd-config", + Namespace: "kube-system", + }, + Data: map[string]string{"initial": "data"}, + } + objects = append(objects, initialMap) + } + + // Create fake client + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). 
+ Build() + + // Setup mock snap + s := &mock.Snap{ + Mock: mock.Mock{ + EtcdPKIDir: filepath.Join(t.TempDir(), "etcd-pki"), + ServiceArgumentsDir: filepath.Join(t.TempDir(), "args"), + }, + } + + // Create controller + reconciler := controllers.NewNodeConfigurationReconciler( + s, + func() {}, // Mock ready function + ) + + reconciler.SetClient(k8sClient) + reconciler.SetScheme(scheme) + + // Set the config getter + reconciler.SetConfigGetter(func(context.Context) (types.ClusterConfig, error) { + return tc.expectedConfig, nil + }) + + // Verify ConfigMap doesn't exist if it shouldn't + if !tc.existingMap { + var cm corev1.ConfigMap + err := k8sClient.Get(ctx, client.ObjectKey{ + Name: "k8sd-config", + Namespace: "kube-system", + }, &cm) + g.Expect(err).To(HaveOccurred()) + g.Expect(apierrors.IsNotFound(err)).To(BeTrue(), "Expected ConfigMap not to exist") + } + + // Trigger reconciliation + req := reconcile.Request{ + NamespacedName: client.ObjectKey{ + Name: "k8sd-config", + Namespace: "kube-system", + }, + } + + result, err := reconciler.Reconcile(ctx, req) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(result).To(Equal(reconcile.Result{})) + + // Verify results + var getResult corev1.ConfigMap + g.Expect(k8sClient.Get(ctx, client.ObjectKey{ + Name: "k8sd-config", + Namespace: "kube-system", + }, &getResult)).To(Succeed()) + + expectedConfigMap, err := tc.expectedConfig.Kubelet.ToConfigMap(nil) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(getResult.Data).To(Equal(expectedConfigMap)) + }) + } +}