Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use controller-runtime for update-node-configuration-controller #901

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/api/capi_access_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ValidateCAPIAuthTokenAccessHandler(tokenHeaderName string) func(s state.Sta
var err error
tokenIsValid, err = database.ValidateClusterAPIToken(ctx, tx, token)
if err != nil {
return fmt.Errorf("failed to check CAPI auth token: %w", err)
return fmt.Errorf("ffailed to check CAPI auth token: %w", err)
}
return nil
}); err != nil {
Expand Down
1 change: 0 additions & 1 deletion src/k8s/pkg/k8sd/api/cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (e *Endpoints) putClusterConfig(s state.State, r *http.Request) response.Re
return response.InternalError(fmt.Errorf("database transaction to update cluster configuration failed: %w", err))
}

e.provider.NotifyUpdateNodeConfigController()
e.provider.NotifyFeatureController(
!requestedConfig.Network.Empty(),
!requestedConfig.Gateway.Empty(),
Expand Down
1 change: 0 additions & 1 deletion src/k8s/pkg/k8sd/api/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ import (
type Provider interface {
MicroCluster() *microcluster.MicroCluster
Snap() snap.Snap
NotifyUpdateNodeConfigController()
NotifyFeatureController(network, gateway, ingress, loadBalancer, localStorage, metricsServer, dns bool)
}
12 changes: 5 additions & 7 deletions src/k8s/pkg/k8sd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/canonical/microcluster/v2/client"
"github.com/canonical/microcluster/v2/microcluster"
"github.com/canonical/microcluster/v2/state"
ctrl "sigs.k8s.io/controller-runtime"
)

// Config defines configuration for the k8sd app.
Expand Down Expand Up @@ -63,8 +64,8 @@ type App struct {
csrsigningController *csrsigning.Controller

// updateNodeConfigController
triggerUpdateNodeConfigControllerCh chan struct{}
updateNodeConfigController *controllers.UpdateNodeConfigurationController
nodeConfigReconciler *controllers.NodeConfigurationReconciler
manager ctrl.Manager

// featureController
triggerFeatureControllerNetworkCh chan struct{}
Expand Down Expand Up @@ -121,13 +122,10 @@ func New(cfg Config) (*App, error) {
log.L().Info("control-plane-config-controller disabled via config")
}

app.triggerUpdateNodeConfigControllerCh = make(chan struct{}, 1)

if !cfg.DisableUpdateNodeConfigController {
app.updateNodeConfigController = controllers.NewUpdateNodeConfigurationController(
cfg.Snap,
app.nodeConfigReconciler = controllers.NewNodeConfigurationReconciler(
app.config.Snap,
app.readyWg.Wait,
app.triggerUpdateNodeConfigControllerCh,
)
} else {
log.L().Info("update-node-config-controller disabled via config")
Expand Down
1 change: 0 additions & 1 deletion src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,5 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
cfg.MetricsServer.GetEnabled(),
cfg.DNS.GetEnabled(),
)
a.NotifyUpdateNodeConfigController()
return nil
}
77 changes: 71 additions & 6 deletions src/k8s/pkg/k8sd/app/hooks_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ import (
"fmt"
"time"

"github.com/canonical/k8s/pkg/k8sd/controllers"
"github.com/canonical/k8s/pkg/k8sd/database"
databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/utils"
pkiutil "github.com/canonical/k8s/pkg/utils/pki"
"github.com/canonical/microcluster/v2/state"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

func (a *App) onStart(ctx context.Context, s state.State) error {
Expand Down Expand Up @@ -48,12 +54,11 @@ func (a *App) onStart(ctx context.Context, s state.State) error {
}

// start update node config controller
if a.updateNodeConfigController != nil {
go a.updateNodeConfigController.Run(ctx, func(ctx context.Context) (types.ClusterConfig, error) {
if a.nodeConfigReconciler != nil {
a.nodeConfigReconciler.SetConfigGetter(func(ctx context.Context) (types.ClusterConfig, error) {
return databaseutil.GetClusterConfig(ctx, s)
})
}

// start feature controller
if a.featureController != nil {
go a.featureController.Run(
Expand Down Expand Up @@ -91,9 +96,6 @@ func (a *App) onStart(ctx context.Context, s state.State) error {
return fmt.Errorf("database transaction to update cluster configuration failed: %w", err)
}

// DNS IP has changed, notify node config controller
a.NotifyUpdateNodeConfigController()

return nil
},
func(ctx context.Context, name types.FeatureName, featureStatus types.FeatureStatus) error {
Expand Down Expand Up @@ -124,5 +126,68 @@ func (a *App) onStart(ctx context.Context, s state.State) error {
)
}

go func() {
mgr, err := setupManager(ctx, a)
if err != nil {
log.FromContext(ctx).Error(err, "Failed to setup manager")
return
}

a.manager = mgr
log.FromContext(ctx).Info("Starting controller manager")
if err := mgr.Start(ctx); err != nil {
log.FromContext(ctx).Error(err, "Manager failed to start")
}
}()

return nil
}

func setupManager(ctx context.Context, app *App) (manager.Manager, error) {
log.FromContext(ctx).Info("Setting up controller manager, waiting for ready signal")
app.readyWg.Wait()
log.FromContext(ctx).Info("Received ready signal, setting up controller manager")

scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

k8sClient, err := app.config.Snap.KubernetesClient("kube-system")
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
log.FromContext(ctx).Info("Created kubernetes client")

options := ctrl.Options{
Scheme: scheme,
}

mgr, err := ctrl.NewManager(k8sClient.RESTConfig(), options)
if err != nil {
return nil, fmt.Errorf("failed to create manager: %w", err)
}

log.FromContext(ctx).Info("Created controller manager")

if _, err := setupControllers(ctx, app, mgr); err != nil {
return nil, fmt.Errorf("failed to setup controllers: %w", err)
}

return mgr, nil
}

func setupControllers(ctx context.Context, app *App, mgr manager.Manager) (*controllers.NodeConfigurationReconciler, error) {
log.FromContext(ctx).Info("Setting up controllers")
if app.nodeConfigReconciler != nil {
log.FromContext(ctx).Info("Setting up node configuration reconciler")

app.nodeConfigReconciler.SetClient(mgr.GetClient())
app.nodeConfigReconciler.SetScheme(mgr.GetScheme())

if err := app.nodeConfigReconciler.SetupWithManager(mgr); err != nil {
return nil, fmt.Errorf("failed to setup node configuration reconciler: %w", err)
}
return app.nodeConfigReconciler, nil
}

return nil, nil
}
4 changes: 0 additions & 4 deletions src/k8s/pkg/k8sd/app/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ func (a *App) Snap() snap.Snap {
return a.snap
}

func (a *App) NotifyUpdateNodeConfigController() {
utils.MaybeNotify(a.triggerUpdateNodeConfigControllerCh)
}

func (a *App) NotifyFeatureController(network, gateway, ingress, loadBalancer, localStorage, metricsServer, dns bool) {
if network {
utils.MaybeNotify(a.triggerFeatureControllerNetworkCh)
Expand Down
193 changes: 193 additions & 0 deletions src/k8s/pkg/k8sd/controllers/node_configuration_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package controllers

import (
"context"
"fmt"
"time"

"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/snap"
snaputil "github.com/canonical/k8s/pkg/snap/util"
pkiutil "github.com/canonical/k8s/pkg/utils/pki"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type NodeConfigurationReconciler struct {
client.Client
snap snap.Snap
scheme *runtime.Scheme

waitReady func()

getClusterConfig func(ctx context.Context) (types.ClusterConfig, error)

reconciledCh chan struct{}
}

func NewNodeConfigurationReconciler(
snap snap.Snap,
waitReady func(),
) *NodeConfigurationReconciler {
return &NodeConfigurationReconciler{
snap: snap,
waitReady: waitReady,
reconciledCh: make(chan struct{}, 1),
}
}

func (c *NodeConfigurationReconciler) SetupWithManager(mgr ctrl.Manager) error {
if c.Client == nil {
return fmt.Errorf("client must be set before setting up with manager")
}
if c.scheme == nil {
return fmt.Errorf("scheme must be set before setting up with manager")
}

if err := mgr.Add(c); err != nil { // This registers the Start method
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&corev1.ConfigMap{}).WithEventFilter(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return isTargetConfigMap(e.Object)
},
UpdateFunc: func(e event.UpdateEvent) bool {
return isTargetConfigMap(e.ObjectNew)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return isTargetConfigMap(e.Object)
},
GenericFunc: func(e event.GenericEvent) bool {
return isTargetConfigMap(e.Object)
},
}).Complete(c)
}

func isTargetConfigMap(obj client.Object) bool {
return obj.GetName() == "k8sd-config" && obj.GetNamespace() == "kube-system"
}

func (c *NodeConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues(
"controller", "update-node-configuration",
"configmap", req.NamespacedName,
)

logger.Info("Reconciling node configuration")

// Check if we're running on a worker node
if isWorker, err := snaputil.IsWorker(c.snap); err != nil {
logger.Error(err, "Failed to check if running on a worker node")
return reconcile.Result{RequeueAfter: time.Second * 30}, err
} else if isWorker {
logger.Info("Running on worker node, skipping reconciliation")
return reconcile.Result{}, nil
}

// Get cluster configuration
config, err := c.getClusterConfig(ctx)
if err != nil {
logger.Error(err, "Failed to retrieve cluster configuration")
return reconcile.Result{RequeueAfter: time.Second * 30}, err
}

logger.Info("Retrieved cluster configuration")

// Load and process certificates
keyPEM := config.Certificates.GetK8sdPrivateKey()
key, err := pkiutil.LoadRSAPrivateKey(keyPEM)
if err != nil && keyPEM != "" {
return reconcile.Result{}, fmt.Errorf("failed to load cluster RSA key: %w", err)
}

// Generate ConfigMap data
cmData, err := config.Kubelet.ToConfigMap(key)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to format kubelet configmap data: %w", err)
}

logger.Info("Generated ConfigMap data")

// Get existing ConfigMap
cm := &corev1.ConfigMap{}
if err := c.Get(ctx, req.NamespacedName, cm); err != nil {
if !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to get ConfigMap")
return reconcile.Result{}, err
} else {
logger.Info("ConfigMap not found, creating new ConfigMap")
cm = &corev1.ConfigMap{
ObjectMeta: ctrl.ObjectMeta{
Name: req.Name,
Namespace: req.Namespace,
},
Data: cmData,
}
if err := c.Create(ctx, cm); err != nil {
logger.Error(err, "Failed to create ConfigMap")
return reconcile.Result{}, err
}
logger.Info("Created ConfigMap")
return reconcile.Result{}, nil
}
}

logger.Info("Retrieved existing ConfigMap, will update")

// Update ConfigMap
cm.Data = cmData
if err := c.Update(ctx, cm); err != nil {
logger.Error(err, "Failed to update ConfigMap")
return reconcile.Result{}, err
}

logger.Info("Updated ConfigMap, reconcile complete")

// Notify that reconciliation is complete
select {
case c.reconciledCh <- struct{}{}:
default:
}

return reconcile.Result{}, nil
}

// ReconciledCh returns the channel that receives notifications when reconciliation completes
func (c *NodeConfigurationReconciler) ReconciledCh() <-chan struct{} {
return c.reconciledCh
}

func (r *NodeConfigurationReconciler) SetConfigGetter(getter func(context.Context) (types.ClusterConfig, error)) {
r.getClusterConfig = getter
}

func (r *NodeConfigurationReconciler) SetScheme(scheme *runtime.Scheme) {
r.scheme = scheme
}

func (r *NodeConfigurationReconciler) SetClient(client client.Client) {
r.Client = client
}

func (r *NodeConfigurationReconciler) Start(ctx context.Context) error {
// Trigger initial reconciliation
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: client.ObjectKey{
Name: "k8sd-config",
Namespace: "kube-system",
},
})
if err != nil {
return err
}
return nil
}
Loading
Loading