diff --git a/go.mod b/go.mod
index 9c4ca902..153fd71b 100644
--- a/go.mod
+++ b/go.mod
@@ -32,6 +32,7 @@ require (
 	k8s.io/apiserver v0.29.11
 	k8s.io/client-go v0.29.11
 	k8s.io/component-base v0.29.11
+	k8s.io/component-helpers v0.29.11
 	k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
 	sigs.k8s.io/controller-runtime v0.17.5
 )
diff --git a/go.sum b/go.sum
index 9e3d9a25..c8a54ccd 100644
--- a/go.sum
+++ b/go.sum
@@ -2019,6 +2019,8 @@ k8s.io/client-go v0.29.11 h1:mBX7Ub0uqpLMwWz3J/AGS/xKOZsjr349qZ1vxVoL1l8=
 k8s.io/client-go v0.29.11/go.mod h1:WOEoi/eLg2YEg3/yEd7YK3CNScYkM8AEScQadxUnaTE=
 k8s.io/component-base v0.29.11 h1:H3GJIyDNPrscvXGP6wx+9gApcwwmrUd0YtCGp5BcHBA=
 k8s.io/component-base v0.29.11/go.mod h1:0qu1WStER4wu5o8RMRndZUWPVcPH1XBy/QQiDcD6lew=
+k8s.io/component-helpers v0.29.11 h1:GdZaSLBLlCa+EzjAnpZ4fGB75rA3qqPLLZKk+CsqNyo=
+k8s.io/component-helpers v0.29.11/go.mod h1:gloyih9IiE4Qy/7iLUXqAmxYSUduuIpMCiNYuHfYvD4=
 k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
 k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
 k8s.io/kms v0.29.11 h1:pylaiDJhgfqczvcjMDPI89+VH0OVoGQhscPH1VbBzQE=
diff --git a/k3k-kubelet/controller/persistentvolumeclaims.go b/k3k-kubelet/controller/persistentvolumeclaims.go
index 6573c90c..f7de5ef0 100644
--- a/k3k-kubelet/controller/persistentvolumeclaims.go
+++ b/k3k-kubelet/controller/persistentvolumeclaims.go
@@ -63,7 +63,6 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
 	log := r.logger.With("Cluster", r.clusterName, "PersistentVolumeClaim", req.NamespacedName)
 	var (
 		virtPVC v1.PersistentVolumeClaim
-		hostPVC v1.PersistentVolumeClaim
 		cluster v1alpha1.Cluster
 	)
 	if err := r.hostClient.Get(ctx, types.NamespacedName{Name: r.clusterName, Namespace: r.clusterNamespace}, &cluster); err != nil {
@@ -93,24 +92,17 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
 		return reconcile.Result{}, nil
 	}
 
-	// getting the cluster for setting the controller reference
-
 	// Add finalizer if it does not exist
 	if controllerutil.AddFinalizer(&virtPVC, pvcFinalizerName) {
 		if err := r.virtualClient.Update(ctx, &virtPVC); err != nil {
 			return reconcile.Result{}, err
 		}
 	}
-	// create or update the pvc on host
-	if err := r.hostClient.Get(ctx, types.NamespacedName{Name: syncedPVC.Name, Namespace: r.clusterNamespace}, &hostPVC); err != nil {
-		if apierrors.IsNotFound(err) {
-			log.Info("creating the persistent volume for the first time on the host cluster")
-			return reconcile.Result{}, r.hostClient.Create(ctx, syncedPVC)
-		}
-		return reconcile.Result{}, err
-	}
-	log.Info("updating pvc on the host cluster")
-	return reconcile.Result{}, r.hostClient.Update(ctx, syncedPVC)
+	// create the pvc on host
+	log.Info("creating the persistent volume for the first time on the host cluster")
+	// note that we don't need to update the PVC on the host cluster; we only sync it so that it can be
+	// handled by the host cluster.
+	return reconcile.Result{}, ctrlruntimeclient.IgnoreAlreadyExists(r.hostClient.Create(ctx, syncedPVC))
 }
diff --git a/k3k-kubelet/controller/pod.go b/k3k-kubelet/controller/pod.go
new file mode 100644
index 00000000..84e86b39
--- /dev/null
+++ b/k3k-kubelet/controller/pod.go
@@ -0,0 +1,165 @@
+package controller
+
+import (
+	"context"
+
+	"github.com/rancher/k3k/k3k-kubelet/translate"
+	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
+	"github.com/rancher/k3k/pkg/log"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/component-helpers/storage/volume"
+	ctrl "sigs.k8s.io/controller-runtime"
+	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+)
+
+const (
+	podController = "pod-pvc-controller"
+	pseudoPVLabel = "pod.k3k.io/pseudoPV"
+)
+
+type PodReconciler struct {
+	virtualClient    ctrlruntimeclient.Client
+	hostClient       ctrlruntimeclient.Client
+	clusterName      string
+	clusterNamespace string
+	Scheme           *runtime.Scheme
+	HostScheme       *runtime.Scheme
+	logger           *log.Logger
+	Translator       translate.ToHostTranslator
+}
+
+// AddPodPVCController adds the pod PVC controller to the k3k-kubelet
+func AddPodPVCController(ctx context.Context, virtMgr, hostMgr manager.Manager, clusterName, clusterNamespace string, logger *log.Logger) error {
+	translator := translate.ToHostTranslator{
+		ClusterName:      clusterName,
+		ClusterNamespace: clusterNamespace,
+	}
+	// initialize a new Reconciler
+	reconciler := PodReconciler{
+		virtualClient:    virtMgr.GetClient(),
+		hostClient:       hostMgr.GetClient(),
+		Scheme:           virtMgr.GetScheme(),
+		HostScheme:       hostMgr.GetScheme(),
+		logger:           logger.Named(podController),
+		Translator:       translator,
+		clusterName:      clusterName,
+		clusterNamespace: clusterNamespace,
+	}
+	return ctrl.NewControllerManagedBy(virtMgr).
+		For(&v1.Pod{}).
+		WithOptions(controller.Options{
+			MaxConcurrentReconciles: maxConcurrentReconciles,
+		}).
+		Complete(&reconciler)
+}
+
+func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
+	log := ctrl.LoggerFrom(ctx).WithValues("cluster", r.clusterName, "clusterNamespace", r.clusterNamespace)
+	var (
+		virtPod v1.Pod
+		cluster v1alpha1.Cluster
+	)
+	if err := r.hostClient.Get(ctx, types.NamespacedName{Name: r.clusterName, Namespace: r.clusterNamespace}, &cluster); err != nil {
+		return reconcile.Result{}, err
+	}
+
+	// handling pod
+	if err := r.virtualClient.Get(ctx, req.NamespacedName, &virtPod); err != nil {
+		return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
+	}
+	// reconcile pods with pvcs
+	for _, vol := range virtPod.Spec.Volumes {
+		if vol.PersistentVolumeClaim != nil {
+			log.Info("Handling pod with pvc")
+			if err := r.reconcilePodWithPVC(ctx, &virtPod, vol.PersistentVolumeClaim); err != nil {
+				return reconcile.Result{}, err
+			}
+		}
+	}
+	return reconcile.Result{}, nil
+}
+
+// reconcilePodWithPVC creates a pseudo PV for each PVC referenced by the pod so that the pod can be scheduled on the
+// virtual kubelet and then created on the host; the pseudo PV is not synced to the host cluster.
+func (r *PodReconciler) reconcilePodWithPVC(ctx context.Context, pod *v1.Pod, pvcSource *v1.PersistentVolumeClaimVolumeSource) error {
+	log := ctrl.LoggerFrom(ctx).WithValues("PersistentVolumeClaim", pvcSource.ClaimName)
+	var (
+		pvc v1.PersistentVolumeClaim
+	)
+	if err := r.virtualClient.Get(ctx, types.NamespacedName{Name: pvcSource.ClaimName, Namespace: pod.Namespace}, &pvc); err != nil {
+		return ctrlruntimeclient.IgnoreNotFound(err)
+	}
+	log.Info("Creating pseudo Persistent Volume")
+	pv := r.pseudoPV(&pvc)
+	if err := r.virtualClient.Create(ctx, pv); err != nil {
+		return ctrlruntimeclient.IgnoreAlreadyExists(err)
+	}
+	orig := pv.DeepCopy()
+	pv.Status = v1.PersistentVolumeStatus{
+		Phase: v1.VolumeBound,
+	}
+	if err := r.virtualClient.Status().Patch(ctx, pv, ctrlruntimeclient.MergeFrom(orig)); err != nil {
+		return err
+	}
+
+	log.Info("Patch the status of PersistentVolumeClaim to Bound")
+	pvcPatch := pvc.DeepCopy()
+	if pvcPatch.Annotations == nil {
+		pvcPatch.Annotations = make(map[string]string)
+	}
+	pvcPatch.Annotations[volume.AnnBoundByController] = "yes"
+	pvcPatch.Annotations[volume.AnnBindCompleted] = "yes"
+	pvcPatch.Status.Phase = v1.ClaimBound
+	pvcPatch.Status.AccessModes = pvcPatch.Spec.AccessModes
+
+	return r.virtualClient.Status().Update(ctx, pvcPatch)
+}
+
+func (r *PodReconciler) pseudoPV(obj *v1.PersistentVolumeClaim) *v1.PersistentVolume {
+	storageClass := ""
+	if obj.Spec.StorageClassName != nil {
+		storageClass = *obj.Spec.StorageClassName
+	}
+	return &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: obj.Name,
+			Labels: map[string]string{
+				pseudoPVLabel: "true",
+			},
+			Annotations: map[string]string{
+				volume.AnnBoundByController:      "true",
+				volume.AnnDynamicallyProvisioned: "k3k-kubelet",
+			},
+		},
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "PersistentVolume",
+			APIVersion: "v1",
+		},
+		Spec: v1.PersistentVolumeSpec{
+			PersistentVolumeSource: v1.PersistentVolumeSource{
+				FlexVolume: &v1.FlexPersistentVolumeSource{
+					Driver: "pseudopv",
+				},
+			},
+			StorageClassName:              storageClass,
+			VolumeMode:                    obj.Spec.VolumeMode,
+			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
+			AccessModes:                   obj.Spec.AccessModes,
+			Capacity:                      obj.Spec.Resources.Requests,
+			ClaimRef: &v1.ObjectReference{
+				APIVersion:      obj.APIVersion,
+				UID:             obj.UID,
+				ResourceVersion: obj.ResourceVersion,
+				Kind:            obj.Kind,
+				Namespace:       obj.Namespace,
+				Name:            obj.Name,
+			},
+		},
+	}
+}
diff --git a/k3k-kubelet/controller/webhook/pod.go b/k3k-kubelet/controller/webhook/pod.go
index 909c65a8..dd041071 100644
--- a/k3k-kubelet/controller/webhook/pod.go
+++ b/k3k-kubelet/controller/webhook/pod.go
@@ -32,7 +32,6 @@ const (
 type webhookHandler struct {
 	client           ctrlruntimeclient.Client
 	scheme           *runtime.Scheme
-	nodeName         string
 	serviceName      string
 	clusterName      string
 	clusterNamespace string
@@ -42,7 +41,7 @@ type webhookHandler struct {
 // AddPodMutatorWebhook will add a mutator webhook to the virtual cluster to
 // modify the nodeName of the created pods with the name of the virtual kubelet node name
 // as well as remove any status fields of the downward apis env fields
-func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, nodeName, serviceName string, logger *log.Logger) error {
+func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, serviceName string, logger *log.Logger) error {
 	handler := webhookHandler{
 		client:           mgr.GetClient(),
 		scheme:           mgr.GetScheme(),
@@ -50,7 +49,6 @@ func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient c
 		serviceName:      serviceName,
 		clusterName:      clusterName,
 		clusterNamespace: clusterNamespace,
-		nodeName:         nodeName,
 	}
 
 	// create mutator webhook configuration to the cluster
@@ -73,9 +71,6 @@ func (w *webhookHandler) Default(ctx context.Context, obj runtime.Object) error
 		return fmt.Errorf("invalid request: object was type %t not cluster", obj)
 	}
 	w.logger.Infow("mutator webhook request", "Pod", pod.Name, "Namespace", pod.Namespace)
-	if pod.Spec.NodeName == "" {
-		pod.Spec.NodeName = w.nodeName
-	}
 	// look for status.* fields in the env
 	if pod.Annotations == nil {
 		pod.Annotations = make(map[string]string)
diff --git a/k3k-kubelet/kubelet.go b/k3k-kubelet/kubelet.go
index fdb3cce1..7770b22f 100644
--- a/k3k-kubelet/kubelet.go
+++ b/k3k-kubelet/kubelet.go
@@ -93,7 +93,10 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
 	}
 
 	hostMgr, err := ctrl.NewManager(hostConfig, manager.Options{
-		Scheme: baseScheme,
+		Scheme:                  baseScheme,
+		LeaderElection:          true,
+		LeaderElectionNamespace: c.ClusterNamespace,
+		LeaderElectionID:        c.ClusterName,
 		Metrics: ctrlserver.Options{
 			BindAddress: ":8083",
 		},
@@ -117,8 +120,11 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
 		CertDir: "/opt/rancher/k3k-webhook",
 	})
 	virtualMgr, err := ctrl.NewManager(virtConfig, manager.Options{
-		Scheme:        virtualScheme,
-		WebhookServer: webhookServer,
+		Scheme:                  virtualScheme,
+		WebhookServer:           webhookServer,
+		LeaderElection:          true,
+		LeaderElectionNamespace: "kube-system",
+		LeaderElectionID:        c.ClusterName,
 		Metrics: ctrlserver.Options{
 			BindAddress: ":8084",
 		},
@@ -127,7 +133,7 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
 		return nil, errors.New("unable to create controller-runtime mgr for virtual cluster: " + err.Error())
 	}
 	logger.Info("adding pod mutator webhook")
-	if err := k3kwebhook.AddPodMutatorWebhook(ctx, virtualMgr, hostClient, c.ClusterName, c.ClusterNamespace, c.AgentHostname, c.ServiceName, logger); err != nil {
+	if err := k3kwebhook.AddPodMutatorWebhook(ctx, virtualMgr, hostClient, c.ClusterName, c.ClusterNamespace, c.ServiceName, logger); err != nil {
 		return nil, errors.New("unable to add pod mutator webhook for virtual cluster: " + err.Error())
 	}
 
@@ -141,6 +147,11 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
 		return nil, errors.New("failed to add pvc syncer controller: " + err.Error())
 	}
 
+	logger.Info("adding pod pvc controller")
+	if err := k3kkubeletcontroller.AddPodPVCController(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace, k3klog.New(false)); err != nil {
+		return nil, errors.New("failed to add pod pvc controller: " + err.Error())
+	}
+
 	clusterIP, err := clusterIP(ctx, c.ServiceName, c.ClusterNamespace, hostClient)
 	if err != nil {
 		return nil, errors.New("failed to extract the clusterIP for the server service: " + err.Error())
diff --git a/pkg/controller/cluster/agent/agent.go b/pkg/controller/cluster/agent/agent.go
index 142cbaad..75f51afa 100644
--- a/pkg/controller/cluster/agent/agent.go
+++ b/pkg/controller/cluster/agent/agent.go
@@ -8,7 +8,6 @@ import (
 	"github.com/rancher/k3k/pkg/controller"
 	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) @@ -47,7 +46,7 @@ func ensureObject(ctx context.Context, cfg *Config, obj ctrlruntimeclient.Object }) if result != controllerutil.OperationResultNone { - key := client.ObjectKeyFromObject(obj) + key := ctrlruntimeclient.ObjectKeyFromObject(obj) log.Info(fmt.Sprintf("ensuring %T", obj), "key", key, "result", result) } diff --git a/pkg/controller/cluster/agent/shared.go b/pkg/controller/cluster/agent/shared.go index 2076e14f..978d25e1 100644 --- a/pkg/controller/cluster/agent/shared.go +++ b/pkg/controller/cluster/agent/shared.go @@ -19,7 +19,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - "sigs.k8s.io/controller-runtime/pkg/client" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -58,11 +57,11 @@ func (s *SharedAgent) EnsureResources(ctx context.Context) error { s.role(ctx), s.roleBinding(ctx), s.service(ctx), - s.deployment(ctx), + s.daemonset(ctx), s.dnsService(ctx), s.webhookTLS(ctx), ); err != nil { - return fmt.Errorf("failed to ensure some resources: %w\n", err) + return fmt.Errorf("failed to ensure some resources: %w", err) } return nil @@ -106,16 +105,16 @@ version: %s`, cluster.Name, cluster.Namespace, ip, serviceName, token, version) } -func (s *SharedAgent) deployment(ctx context.Context) error { +func (s *SharedAgent) daemonset(ctx context.Context) error { labels := map[string]string{ "cluster": s.cluster.Name, "type": "agent", "mode": "shared", } - deploy := &apps.Deployment{ + deploy := &apps.DaemonSet{ TypeMeta: metav1.TypeMeta{ - Kind: "Deployment", + Kind: "DaemonSet", APIVersion: "apps/v1", }, ObjectMeta: metav1.ObjectMeta{ @@ -123,7 +122,7 @@ func (s *SharedAgent) deployment(ctx context.Context) error { Namespace: s.cluster.Namespace, Labels: labels, }, - Spec: apps.DeploymentSpec{ + Spec: apps.DaemonSetSpec{ Selector: &metav1.LabelSelector{ MatchLabels: labels, }, @@ -141,9 +140,9 @@ func (s *SharedAgent) deployment(ctx context.Context) error { func (s *SharedAgent) podSpec() v1.PodSpec { var limit v1.ResourceList - return v1.PodSpec{ ServiceAccountName: s.Name(), + NodeSelector: s.cluster.Spec.NodeSelector, Volumes: []v1.Volume{ { Name: "config", @@ -345,6 +344,11 @@ func (s *SharedAgent) role(ctx context.Context) error { Resources: []string{"clusters"}, Verbs: []string{"get", "watch", "list"}, }, + { + APIGroups: []string{"coordination.k8s.io"}, + Resources: []string{"leases"}, + Verbs: []string{"*"}, + }, }, } @@ -390,7 +394,7 @@ func (s *SharedAgent) webhookTLS(ctx context.Context) error { }, } - key := client.ObjectKeyFromObject(webhookSecret) + key := ctrlruntimeclient.ObjectKeyFromObject(webhookSecret) if err := s.client.Get(ctx, key, webhookSecret); err != nil { if !apierrors.IsNotFound(err) { return err diff --git a/pkg/controller/cluster/agent/virtual.go b/pkg/controller/cluster/agent/virtual.go index 2bed7400..2294ed1c 100644 --- a/pkg/controller/cluster/agent/virtual.go +++ b/pkg/controller/cluster/agent/virtual.go @@ -41,7 +41,7 @@ func (v *VirtualAgent) EnsureResources(ctx context.Context) error { v.config(ctx), v.deployment(ctx), ); err != nil { - return fmt.Errorf("failed to ensure some resources: %w\n", err) + return fmt.Errorf("failed to ensure some resources: %w", err) } return nil