diff --git a/.golangci.yml b/.golangci.yml
index 235651d9..4590817f 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -10,3 +10,4 @@ linters:
     # extra
     - misspell
+    - wsl
diff --git a/cli/cmds/cluster/create.go b/cli/cmds/cluster/create.go
index ecebf4b1..d3ec03d8 100644
--- a/cli/cmds/cluster/create.go
+++ b/cli/cmds/cluster/create.go
@@ -97,6 +97,7 @@ func createAction(config *CreateConfig) cli.ActionFunc {
 		if config.token != "" {
 			logrus.Infof("Creating cluster token secret")
+
 			obj := k3kcluster.TokenSecretObj(config.token, name, cmds.Namespace())
 			if err := ctrlClient.Create(ctx, &obj); err != nil {
 				return err
 			}
@@ -116,10 +117,12 @@ func createAction(config *CreateConfig) cli.ActionFunc {
 		if err != nil {
 			return err
 		}
+
 		host := strings.Split(url.Host, ":")
 		if config.kubeconfigServerHost != "" {
 			host = []string{config.kubeconfigServerHost}
 		}
+
 		cluster.Spec.TLSSANs = []string{host[0]}
 
 		if err := ctrlClient.Create(ctx, cluster); err != nil {
@@ -144,6 +147,7 @@ func createAction(config *CreateConfig) cli.ActionFunc {
 		cfg := kubeconfig.New()
 
 		var kubeconfig *clientcmdapi.Config
+
 		if err := retry.OnError(availableBackoff, apierrors.IsNotFound, func() error {
 			kubeconfig, err = cfg.Extract(ctx, ctrlClient, cluster, host[0])
 			return err
@@ -199,6 +203,7 @@ func newCluster(name, namespace string, config *CreateConfig) *v1alpha1.Cluster
 	if config.storageClassName == "" {
 		cluster.Spec.Persistence.StorageClassName = nil
 	}
+
 	if config.token != "" {
 		cluster.Spec.TokenSecretRef = &v1.SecretReference{
 			Name: k3kcluster.TokenSecretName(name),
diff --git a/cli/cmds/cluster/delete.go b/cli/cmds/cluster/delete.go
index d99f38b8..ec56b6e6 100644
--- a/cli/cmds/cluster/delete.go
+++ b/cli/cmds/cluster/delete.go
@@ -57,5 +57,6 @@ func delete(clx *cli.Context) error {
 			Namespace: cmds.Namespace(),
 		},
 	}
+
 	return ctrlClient.Delete(ctx, &cluster)
 }
diff --git a/cli/cmds/kubeconfig/kubeconfig.go b/cli/cmds/kubeconfig/kubeconfig.go
index 39d05cc3..8e080d37 100644
--- a/cli/cmds/kubeconfig/kubeconfig.go
+++ b/cli/cmds/kubeconfig/kubeconfig.go
@@ -102,6 +102,7 @@ func NewCommand() *cli.Command {
 func generate(clx *cli.Context) error {
 	var cluster v1alpha1.Cluster
+
 	ctx := context.Background()
 
 	restConfig, err := clientcmd.BuildConfigFromFlags("", cmds.Kubeconfig)
@@ -115,6 +116,7 @@ func generate(clx *cli.Context) error {
 	if err != nil {
 		return err
 	}
+
 	clusterKey := types.NamespacedName{
 		Name:      name,
 		Namespace: cmds.Namespace(),
 	}
@@ -128,11 +130,12 @@ func generate(clx *cli.Context) error {
 	if err != nil {
 		return err
 	}
+
 	host := strings.Split(url.Host, ":")
 	if kubeconfigServerHost != "" {
 		host = []string{kubeconfigServerHost}
-		err := altNames.Set(kubeconfigServerHost)
-		if err != nil {
+
+		if err := altNames.Set(kubeconfigServerHost); err != nil {
 			return err
 		}
 	}
@@ -154,6 +157,7 @@ func generate(clx *cli.Context) error {
 	logrus.Infof("waiting for cluster to be available..")
 
 	var kubeconfig *clientcmdapi.Config
+
 	if err := retry.OnError(controller.Backoff, apierrors.IsNotFound, func() error {
 		kubeconfig, err = cfg.Extract(ctx, ctrlClient, &cluster, host[0])
 		return err
diff --git a/cli/cmds/root.go b/cli/cmds/root.go
index ab36b2c2..189e2fbf 100644
--- a/cli/cmds/root.go
+++ b/cli/cmds/root.go
@@ -48,6 +48,7 @@ func NewApp() *cli.App {
 		if debug {
 			logrus.SetLevel(logrus.DebugLevel)
 		}
+
 		return nil
 	}
@@ -58,5 +59,6 @@ func Namespace() string {
 	if namespace == "" {
 		return defaultNamespace
 	}
+
 	return namespace
 }
diff --git a/k3k-kubelet/config.go b/k3k-kubelet/config.go
index 7b36c20c..258aa289 100644
--- a/k3k-kubelet/config.go
+++ b/k3k-kubelet/config.go
@@ -31,33 +31,43 @@ func (c *config) unmarshalYAML(data []byte) error {
 	if c.ClusterName == "" {
 		c.ClusterName = conf.ClusterName
 	}
+
 	if c.ClusterNamespace == "" {
 		c.ClusterNamespace = conf.ClusterNamespace
 	}
+
 	if c.HostConfigPath == "" {
 		c.HostConfigPath = conf.HostConfigPath
 	}
+
 	if c.VirtualConfigPath == "" {
 		c.VirtualConfigPath = conf.VirtualConfigPath
 	}
+
 	if c.KubeletPort == "" {
 		c.KubeletPort = conf.KubeletPort
 	}
+
 	if c.AgentHostname == "" {
 		c.AgentHostname = conf.AgentHostname
 	}
+
 	if c.ServiceName == "" {
 		c.ServiceName = conf.ServiceName
 	}
+
 	if c.Token == "" {
 		c.Token = conf.Token
 	}
+
 	if c.ServerIP == "" {
 		c.ServerIP = conf.ServerIP
 	}
+
 	if c.Version == "" {
 		c.Version = conf.Version
 	}
+
 	return nil
 }
@@ -65,12 +75,15 @@ func (c *config) validate() error {
 	if c.ClusterName == "" {
 		return errors.New("cluster name is not provided")
 	}
+
 	if c.ClusterNamespace == "" {
 		return errors.New("cluster namespace is not provided")
 	}
+
 	if c.AgentHostname == "" {
 		return errors.New("agent Hostname is not provided")
 	}
+
 	return nil
 }
@@ -83,5 +96,6 @@ func (c *config) parse(path string) error {
 	if err != nil {
 		return err
 	}
+
 	return c.unmarshalYAML(b)
 }
diff --git a/k3k-kubelet/controller/configmap.go b/k3k-kubelet/controller/configmap.go
index cffea360..0c9d23f2 100644
--- a/k3k-kubelet/controller/configmap.go
+++ b/k3k-kubelet/controller/configmap.go
@@ -38,6 +38,7 @@ func (c *ConfigMapSyncer) Reconcile(ctx context.Context, req reconcile.Request)
 		// return immediately without re-enqueueing. We aren't watching this resource
 		return reconcile.Result{}, nil
 	}
+
 	var virtual corev1.ConfigMap
 
 	if err := c.VirtualClient.Get(ctx, req.NamespacedName, &virtual); err != nil {
@@ -45,16 +46,19 @@ func (c *ConfigMapSyncer) Reconcile(ctx context.Context, req reconcile.Request)
 			Requeue: true,
 		}, fmt.Errorf("unable to get configmap %s/%s from virtual cluster: %w", req.Namespace, req.Name, err)
 	}
+
 	translated, err := c.TranslateFunc(&virtual)
 	if err != nil {
 		return reconcile.Result{
 			Requeue: true,
 		}, fmt.Errorf("unable to translate configmap %s/%s from virtual cluster: %w", req.Namespace, req.Name, err)
 	}
+
 	translatedKey := types.NamespacedName{
 		Namespace: translated.Namespace,
 		Name:      translated.Name,
 	}
+
 	var host corev1.ConfigMap
 	if err = c.HostClient.Get(ctx, translatedKey, &host); err != nil {
 		if apierrors.IsNotFound(err) {
@@ -66,6 +70,7 @@ func (c *ConfigMapSyncer) Reconcile(ctx context.Context, req reconcile.Request)
 			}, fmt.Errorf("unable to create host configmap %s/%s for virtual configmap %s/%s: %w", translated.Namespace, translated.Name, req.Namespace, req.Name, err)
 		}
+
 		return reconcile.Result{Requeue: true}, fmt.Errorf("unable to get host configmap %s/%s: %w", translated.Namespace, translated.Name, err)
 	}
 	// we are going to use the host in order to avoid conflicts on update
@@ -79,13 +84,14 @@ func (c *ConfigMapSyncer) Reconcile(ctx context.Context, req reconcile.Request)
 	for key, value := range translated.Labels {
 		host.Labels[key] = value
 	}
+
 	if err = c.HostClient.Update(ctx, &host); err != nil {
 		return reconcile.Result{
 			Requeue: true,
 		}, fmt.Errorf("unable to update host configmap %s/%s for virtual configmap %s/%s: %w", translated.Namespace, translated.Name, req.Namespace, req.Name, err)
-
 	}
+
 	return reconcile.Result{}, nil
 }
@@ -94,6 +100,7 @@ func (c *ConfigMapSyncer) Reconcile(ctx context.Context, req reconcile.Request)
 func (c *ConfigMapSyncer) isWatching(key types.NamespacedName) bool {
 	c.mutex.RLock()
 	defer c.mutex.RUnlock()
+
 	return c.objs.Has(key)
 }
@@ -104,23 +111,29 @@ func (c
*ConfigMapSyncer) AddResource(ctx context.Context, namespace, name strin Namespace: namespace, Name: name, } + // if we already sync this object, no need to writelock/add it if c.isWatching(objKey) { return nil } + // lock in write mode since we are now adding the key c.mutex.Lock() if c.objs == nil { c.objs = sets.Set[types.NamespacedName]{} } + c.objs = c.objs.Insert(objKey) c.mutex.Unlock() + _, err := c.Reconcile(ctx, reconcile.Request{ NamespacedName: objKey, }) + if err != nil { return fmt.Errorf("unable to reconcile new object %s/%s: %w", objKey.Namespace, objKey.Name, err) } + return nil } @@ -143,24 +156,34 @@ func (c *ConfigMapSyncer) RemoveResource(ctx context.Context, namespace, name st }); err != nil { return fmt.Errorf("unable to remove configmap: %w", err) } + c.mutex.Lock() if c.objs == nil { c.objs = sets.Set[types.NamespacedName]{} } + c.objs = c.objs.Delete(objKey) c.mutex.Unlock() + return nil } func (c *ConfigMapSyncer) removeHostConfigMap(ctx context.Context, virtualNamespace, virtualName string) error { var vConfigMap corev1.ConfigMap - err := c.VirtualClient.Get(ctx, types.NamespacedName{Namespace: virtualNamespace, Name: virtualName}, &vConfigMap) - if err != nil { + + key := types.NamespacedName{ + Namespace: virtualNamespace, + Name: virtualName, + } + + if err := c.VirtualClient.Get(ctx, key, &vConfigMap); err != nil { return fmt.Errorf("unable to get virtual configmap %s/%s: %w", virtualNamespace, virtualName, err) } + translated, err := c.TranslateFunc(&vConfigMap) if err != nil { return fmt.Errorf("unable to translate virtual secret: %s/%s: %w", virtualNamespace, virtualName, err) } + return c.HostClient.Delete(ctx, translated) } diff --git a/k3k-kubelet/controller/handler.go b/k3k-kubelet/controller/handler.go index c129a648..b4a9fb61 100644 --- a/k3k-kubelet/controller/handler.go +++ b/k3k-kubelet/controller/handler.go @@ -45,17 +45,22 @@ type updateableReconciler interface { func (c *ControllerHandler) AddResource(ctx context.Context, obj client.Object) error { c.RLock() + controllers := c.controllers if controllers != nil { if r, ok := c.controllers[obj.GetObjectKind().GroupVersionKind()]; ok { err := r.AddResource(ctx, obj.GetNamespace(), obj.GetName()) c.RUnlock() + return err } } + // we need to manually lock/unlock since we intned on write locking to add a new controller c.RUnlock() + var r updateableReconciler + switch obj.(type) { case *v1.Secret: r = &SecretSyncer{ @@ -89,19 +94,23 @@ func (c *ControllerHandler) AddResource(ctx context.Context, obj client.Object) // TODO: Technically, the configmap/secret syncers are relatively generic, and this // logic could be used for other types. return fmt.Errorf("unrecognized type: %T", obj) - } + err := ctrl.NewControllerManagedBy(c.Mgr). For(&v1.ConfigMap{}). 
Complete(r) + if err != nil { return fmt.Errorf("unable to start configmap controller: %w", err) } + c.Lock() if c.controllers == nil { c.controllers = map[schema.GroupVersionKind]updateableReconciler{} } + c.controllers[obj.GetObjectKind().GroupVersionKind()] = r + c.Unlock() return r.AddResource(ctx, obj.GetNamespace(), obj.GetName()) @@ -112,8 +121,10 @@ func (c *ControllerHandler) RemoveResource(ctx context.Context, obj client.Objec c.RLock() ctrl, ok := c.controllers[obj.GetObjectKind().GroupVersionKind()] c.RUnlock() + if !ok { return fmt.Errorf("no controller found for gvk %s", obj.GetObjectKind().GroupVersionKind()) } + return ctrl.RemoveResource(ctx, obj.GetNamespace(), obj.GetName()) } diff --git a/k3k-kubelet/controller/persistentvolumeclaims.go b/k3k-kubelet/controller/persistentvolumeclaims.go index f7de5ef0..039abdaf 100644 --- a/k3k-kubelet/controller/persistentvolumeclaims.go +++ b/k3k-kubelet/controller/persistentvolumeclaims.go @@ -51,6 +51,7 @@ func AddPVCSyncer(ctx context.Context, virtMgr, hostMgr manager.Manager, cluster clusterName: clusterName, clusterNamespace: clusterNamespace, } + return ctrl.NewControllerManagedBy(virtMgr). For(&v1.PersistentVolumeClaim{}). WithOptions(controller.Options{ @@ -61,10 +62,12 @@ func AddPVCSyncer(ctx context.Context, virtMgr, hostMgr manager.Manager, cluster func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := r.logger.With("Cluster", r.clusterName, "PersistentVolumeClaim", req.NamespacedName) + var ( virtPVC v1.PersistentVolumeClaim cluster v1alpha1.Cluster ) + if err := r.hostClient.Get(ctx, types.NamespacedName{Name: r.clusterName, Namespace: r.clusterNamespace}, &cluster); err != nil { return reconcile.Result{}, err } @@ -73,10 +76,12 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r if err := r.virtualClient.Get(ctx, req.NamespacedName, &virtPVC); err != nil { return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } + syncedPVC := r.pvc(&virtPVC) if err := controllerutil.SetControllerReference(&cluster, syncedPVC, r.HostScheme); err != nil { return reconcile.Result{}, err } + // handle deletion if !virtPVC.DeletionTimestamp.IsZero() { // deleting the synced service if exists @@ -89,6 +94,7 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r return reconcile.Result{}, err } } + return reconcile.Result{}, nil } @@ -98,16 +104,18 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r return reconcile.Result{}, err } } + // create the pvc on host log.Info("creating the persistent volume for the first time on the host cluster") + // note that we dont need to update the PVC on the host cluster, only syncing the PVC to allow being // handled by the host cluster. return reconcile.Result{}, ctrlruntimeclient.IgnoreAlreadyExists(r.hostClient.Create(ctx, syncedPVC)) - } func (r *PVCReconciler) pvc(obj *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim { hostPVC := obj.DeepCopy() r.Translator.TranslateTo(hostPVC) + return hostPVC } diff --git a/k3k-kubelet/controller/pod.go b/k3k-kubelet/controller/pod.go index 84e86b39..717c0241 100644 --- a/k3k-kubelet/controller/pod.go +++ b/k3k-kubelet/controller/pod.go @@ -51,6 +51,7 @@ func AddPodPVCController(ctx context.Context, virtMgr, hostMgr manager.Manager, clusterName: clusterName, clusterNamespace: clusterNamespace, } + return ctrl.NewControllerManagedBy(virtMgr). For(&v1.Pod{}). 
WithOptions(controller.Options{ @@ -61,10 +62,12 @@ func AddPodPVCController(ctx context.Context, virtMgr, hostMgr manager.Manager, func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := ctrl.LoggerFrom(ctx).WithValues("cluster", r.clusterName, "clusterNamespace", r.clusterNamespace) + var ( virtPod v1.Pod cluster v1alpha1.Cluster ) + if err := r.hostClient.Get(ctx, types.NamespacedName{Name: r.clusterName, Namespace: r.clusterNamespace}, &cluster); err != nil { return reconcile.Result{}, err } @@ -73,15 +76,18 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r if err := r.virtualClient.Get(ctx, req.NamespacedName, &virtPod); err != nil { return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } + // reconcile pods with pvcs for _, vol := range virtPod.Spec.Volumes { if vol.PersistentVolumeClaim != nil { log.Info("Handling pod with pvc") + if err := r.reconcilePodWithPVC(ctx, &virtPod, vol.PersistentVolumeClaim); err != nil { return reconcile.Result{}, err } } } + return reconcile.Result{}, nil } @@ -89,30 +95,41 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r // and then created on the host, the PV is not synced to the host cluster. func (r *PodReconciler) reconcilePodWithPVC(ctx context.Context, pod *v1.Pod, pvcSource *v1.PersistentVolumeClaimVolumeSource) error { log := ctrl.LoggerFrom(ctx).WithValues("PersistentVolumeClaim", pvcSource.ClaimName) - var ( - pvc v1.PersistentVolumeClaim - ) - if err := r.virtualClient.Get(ctx, types.NamespacedName{Name: pvcSource.ClaimName, Namespace: pod.Namespace}, &pvc); err != nil { + + var pvc v1.PersistentVolumeClaim + + key := types.NamespacedName{ + Name: pvcSource.ClaimName, + Namespace: pod.Namespace, + } + + if err := r.virtualClient.Get(ctx, key, &pvc); err != nil { return ctrlruntimeclient.IgnoreNotFound(err) } + log.Info("Creating pseudo Persistent Volume") + pv := r.pseudoPV(&pvc) if err := r.virtualClient.Create(ctx, pv); err != nil { return ctrlruntimeclient.IgnoreAlreadyExists(err) } + orig := pv.DeepCopy() pv.Status = v1.PersistentVolumeStatus{ Phase: v1.VolumeBound, } + if err := r.virtualClient.Status().Patch(ctx, pv, ctrlruntimeclient.MergeFrom(orig)); err != nil { return err } log.Info("Patch the status of PersistentVolumeClaim to Bound") + pvcPatch := pvc.DeepCopy() if pvcPatch.Annotations == nil { pvcPatch.Annotations = make(map[string]string) } + pvcPatch.Annotations[volume.AnnBoundByController] = "yes" pvcPatch.Annotations[volume.AnnBindCompleted] = "yes" pvcPatch.Status.Phase = v1.ClaimBound @@ -122,10 +139,12 @@ func (r *PodReconciler) reconcilePodWithPVC(ctx context.Context, pod *v1.Pod, pv } func (r *PodReconciler) pseudoPV(obj *v1.PersistentVolumeClaim) *v1.PersistentVolume { - storageClass := "" + var storageClass string + if obj.Spec.StorageClassName != nil { storageClass = *obj.Spec.StorageClassName } + return &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ Name: obj.Name, diff --git a/k3k-kubelet/controller/secret.go b/k3k-kubelet/controller/secret.go index e2dab374..85b1156d 100644 --- a/k3k-kubelet/controller/secret.go +++ b/k3k-kubelet/controller/secret.go @@ -38,6 +38,7 @@ func (s *SecretSyncer) Reconcile(ctx context.Context, req reconcile.Request) (re // return immediately without re-enqueueing. 
We aren't watching this resource return reconcile.Result{}, nil } + var virtual corev1.Secret if err := s.VirtualClient.Get(ctx, req.NamespacedName, &virtual); err != nil { @@ -45,16 +46,19 @@ func (s *SecretSyncer) Reconcile(ctx context.Context, req reconcile.Request) (re Requeue: true, }, fmt.Errorf("unable to get secret %s/%s from virtual cluster: %w", req.Namespace, req.Name, err) } + translated, err := s.TranslateFunc(&virtual) if err != nil { return reconcile.Result{ Requeue: true, }, fmt.Errorf("unable to translate secret %s/%s from virtual cluster: %w", req.Namespace, req.Name, err) } + translatedKey := types.NamespacedName{ Namespace: translated.Namespace, Name: translated.Name, } + var host corev1.Secret if err = s.HostClient.Get(ctx, translatedKey, &host); err != nil { if apierrors.IsNotFound(err) { @@ -66,6 +70,7 @@ func (s *SecretSyncer) Reconcile(ctx context.Context, req reconcile.Request) (re }, fmt.Errorf("unable to create host secret %s/%s for virtual secret %s/%s: %w", translated.Namespace, translated.Name, req.Namespace, req.Name, err) } + return reconcile.Result{Requeue: true}, fmt.Errorf("unable to get host secret %s/%s: %w", translated.Namespace, translated.Name, err) } // we are going to use the host in order to avoid conflicts on update @@ -79,13 +84,14 @@ func (s *SecretSyncer) Reconcile(ctx context.Context, req reconcile.Request) (re for key, value := range translated.Labels { host.Labels[key] = value } + if err = s.HostClient.Update(ctx, &host); err != nil { return reconcile.Result{ Requeue: true, }, fmt.Errorf("unable to update host secret %s/%s for virtual secret %s/%s: %w", translated.Namespace, translated.Name, req.Namespace, req.Name, err) - } + return reconcile.Result{}, nil } @@ -94,6 +100,7 @@ func (s *SecretSyncer) Reconcile(ctx context.Context, req reconcile.Request) (re func (s *SecretSyncer) isWatching(key types.NamespacedName) bool { s.mutex.RLock() defer s.mutex.RUnlock() + return s.objs.Has(key) } @@ -113,14 +120,18 @@ func (s *SecretSyncer) AddResource(ctx context.Context, namespace, name string) if s.objs == nil { s.objs = sets.Set[types.NamespacedName]{} } + s.objs = s.objs.Insert(objKey) s.mutex.Unlock() + _, err := s.Reconcile(ctx, reconcile.Request{ NamespacedName: objKey, }) + if err != nil { return fmt.Errorf("unable to reconcile new object %s/%s: %w", objKey.Namespace, objKey.Name, err) } + return nil } @@ -148,8 +159,10 @@ func (s *SecretSyncer) RemoveResource(ctx context.Context, namespace, name strin if s.objs == nil { s.objs = sets.Set[types.NamespacedName]{} } + s.objs = s.objs.Delete(objKey) s.mutex.Unlock() + return nil } @@ -159,12 +172,15 @@ func (s *SecretSyncer) removeHostSecret(ctx context.Context, virtualNamespace, v Namespace: virtualNamespace, Name: virtualName, }, &vSecret) + if err != nil { return fmt.Errorf("unable to get virtual secret %s/%s: %w", virtualNamespace, virtualName, err) } + translated, err := s.TranslateFunc(&vSecret) if err != nil { return fmt.Errorf("unable to translate virtual secret: %s/%s: %w", virtualNamespace, virtualName, err) } + return s.HostClient.Delete(ctx, translated) } diff --git a/k3k-kubelet/controller/service.go b/k3k-kubelet/controller/service.go index a490d666..b97b7c72 100644 --- a/k3k-kubelet/controller/service.go +++ b/k3k-kubelet/controller/service.go @@ -53,6 +53,7 @@ func AddServiceSyncer(ctx context.Context, virtMgr, hostMgr manager.Manager, clu clusterName: clusterName, clusterNamespace: clusterNamespace, } + return ctrl.NewControllerManagedBy(virtMgr). For(&v1.Service{}). 
WithOptions(controller.Options{ @@ -63,9 +64,11 @@ func AddServiceSyncer(ctx context.Context, virtMgr, hostMgr manager.Manager, clu func (s *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := s.logger.With("Cluster", s.clusterName, "Service", req.NamespacedName) + if req.Name == "kubernetes" || req.Name == "kube-dns" { return reconcile.Result{}, nil } + var ( virtService v1.Service hostService v1.Service @@ -75,9 +78,11 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request if err := s.hostClient.Get(ctx, types.NamespacedName{Name: s.clusterName, Namespace: s.clusterNamespace}, &cluster); err != nil { return reconcile.Result{}, err } + if err := s.virtualClient.Get(ctx, req.NamespacedName, &virtService); err != nil { return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } + syncedService := s.service(&virtService) if err := controllerutil.SetControllerReference(&cluster, syncedService, s.HostScheme); err != nil { return reconcile.Result{}, err @@ -89,19 +94,23 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request if err := s.hostClient.Delete(ctx, syncedService); err != nil { return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } + // remove the finalizer after cleaning up the synced service if controllerutil.ContainsFinalizer(&virtService, serviceFinalizerName) { controllerutil.RemoveFinalizer(&virtService, serviceFinalizerName) + if err := s.virtualClient.Update(ctx, &virtService); err != nil { return reconcile.Result{}, err } } + return reconcile.Result{}, nil } // Add finalizer if it does not exist if !controllerutil.ContainsFinalizer(&virtService, serviceFinalizerName) { controllerutil.AddFinalizer(&virtService, serviceFinalizerName) + if err := s.virtualClient.Update(ctx, &virtService); err != nil { return reconcile.Result{}, err } @@ -112,9 +121,12 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request log.Info("creating the service for the first time on the host cluster") return reconcile.Result{}, s.hostClient.Create(ctx, syncedService) } + return reconcile.Result{}, err } + log.Info("updating service on the host cluster") + return reconcile.Result{}, s.hostClient.Update(ctx, syncedService) } diff --git a/k3k-kubelet/controller/webhook/pod.go b/k3k-kubelet/controller/webhook/pod.go index dd041071..b6464e9e 100644 --- a/k3k-kubelet/controller/webhook/pod.go +++ b/k3k-kubelet/controller/webhook/pod.go @@ -56,6 +56,7 @@ func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient c if err != nil { return err } + if err := handler.client.Create(ctx, config); err != nil { if !apierrors.IsAlreadyExists(err) { return err @@ -70,11 +71,13 @@ func (w *webhookHandler) Default(ctx context.Context, obj runtime.Object) error if !ok { return fmt.Errorf("invalid request: object was type %t not cluster", obj) } + w.logger.Infow("mutator webhook request", "Pod", pod.Name, "Namespace", pod.Namespace) // look for status.* fields in the env if pod.Annotations == nil { pod.Annotations = make(map[string]string) } + for i, container := range pod.Spec.Containers { for j, env := range container.Env { if env.ValueFrom == nil || env.ValueFrom.FieldRef == nil { @@ -89,22 +92,28 @@ func (w *webhookHandler) Default(ctx context.Context, obj runtime.Object) error } } } + return nil } func (w *webhookHandler) configuration(ctx context.Context, hostClient ctrlruntimeclient.Client) 
(*admissionregistrationv1.MutatingWebhookConfiguration, error) { w.logger.Infow("extracting webhook tls from host cluster") + var ( webhookTLSSecret v1.Secret ) + if err := hostClient.Get(ctx, types.NamespacedName{Name: agent.WebhookSecretName(w.clusterName), Namespace: w.clusterNamespace}, &webhookTLSSecret); err != nil { return nil, err } + caBundle, ok := webhookTLSSecret.Data["ca.crt"] if !ok { return nil, errors.New("webhook CABundle does not exist in secret") } + webhookURL := "https://" + w.serviceName + ":" + webhookPort + webhookPath + return &admissionregistrationv1.MutatingWebhookConfiguration{ TypeMeta: metav1.TypeMeta{ APIVersion: "admissionregistration.k8s.io/v1", @@ -151,10 +160,12 @@ func ParseFieldPathAnnotationKey(annotationKey string) (int, string, error) { if len(s) != 3 { return -1, "", errors.New("fieldpath annotation is not set correctly") } + containerIndex, err := strconv.Atoi(s[1]) if err != nil { return -1, "", err } + envName := s[2] return containerIndex, envName, nil diff --git a/k3k-kubelet/kubelet.go b/k3k-kubelet/kubelet.go index 7770b22f..29ea45ac 100644 --- a/k3k-kubelet/kubelet.go +++ b/k3k-kubelet/kubelet.go @@ -82,6 +82,7 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet if err != nil { return nil, err } + virtConfig, err := virtRestConfig(ctx, c.VirtualConfigPath, hostClient, c.ClusterName, c.ClusterNamespace, c.Token, logger) if err != nil { return nil, err @@ -110,15 +111,16 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet return nil, errors.New("unable to create controller-runtime mgr for host cluster: " + err.Error()) } - virtualScheme := runtime.NewScheme() // virtual client will only use core types (for now), no need to add anything other than the basics - err = clientgoscheme.AddToScheme(virtualScheme) - if err != nil { + virtualScheme := runtime.NewScheme() + if err := clientgoscheme.AddToScheme(virtualScheme); err != nil { return nil, errors.New("unable to add client go types to virtual cluster scheme: " + err.Error()) } + webhookServer := webhook.NewServer(webhook.Options{ CertDir: "/opt/rancher/k3k-webhook", }) + virtualMgr, err := ctrl.NewManager(virtConfig, manager.Options{ Scheme: virtualScheme, WebhookServer: webhookServer, @@ -129,25 +131,31 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet BindAddress: ":8084", }, }) + if err != nil { return nil, errors.New("unable to create controller-runtime mgr for virtual cluster: " + err.Error()) } + logger.Info("adding pod mutator webhook") + if err := k3kwebhook.AddPodMutatorWebhook(ctx, virtualMgr, hostClient, c.ClusterName, c.ClusterNamespace, c.ServiceName, logger); err != nil { return nil, errors.New("unable to add pod mutator webhook for virtual cluster: " + err.Error()) } logger.Info("adding service syncer controller") + if err := k3kkubeletcontroller.AddServiceSyncer(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace, k3klog.New(false)); err != nil { return nil, errors.New("failed to add service syncer controller: " + err.Error()) } logger.Info("adding pvc syncer controller") + if err := k3kkubeletcontroller.AddPVCSyncer(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace, k3klog.New(false)); err != nil { return nil, errors.New("failed to add pvc syncer controller: " + err.Error()) } logger.Info("adding pod pvc controller") + if err := k3kkubeletcontroller.AddPodPVCController(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace, k3klog.New(false)); err != nil { 
return nil, errors.New("failed to add pod pvc controller: " + err.Error()) } @@ -159,6 +167,7 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet // get the cluster's DNS IP to be injected to pods var dnsService v1.Service + dnsName := controller.SafeConcatNameWithPrefix(c.ClusterName, "kube-dns") if err := hostClient.Get(ctx, types.NamespacedName{Name: dnsName, Namespace: c.ClusterNamespace}, &dnsService); err != nil { return nil, errors.New("failed to get the DNS service for the cluster: " + err.Error()) @@ -188,10 +197,16 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet func clusterIP(ctx context.Context, serviceName, clusterNamespace string, hostClient ctrlruntimeclient.Client) (string, error) { var service v1.Service - serviceKey := types.NamespacedName{Namespace: clusterNamespace, Name: serviceName} + + serviceKey := types.NamespacedName{ + Namespace: clusterNamespace, + Name: serviceName, + } + if err := hostClient.Get(ctx, serviceKey, &service); err != nil { return "", err } + return service.Spec.ClusterIP, nil } @@ -200,10 +215,12 @@ func (k *kubelet) registerNode(ctx context.Context, agentIP, srvPort, namespace, nodeOpts := k.nodeOpts(ctx, srvPort, namespace, name, hostname, agentIP) var err error + k.node, err = nodeutil.NewNode(k.name, providerFunc, nodeutil.WithClient(k.virtClient), nodeOpts) if err != nil { return errors.New("unable to start kubelet: " + err.Error()) } + return nil } @@ -236,10 +253,13 @@ func (k *kubelet) start(ctx context.Context) { if err := k.node.WaitReady(context.Background(), time.Minute*1); err != nil { k.logger.Fatalw("node was not ready within timeout of 1 minute", zap.Error(err)) } + <-k.node.Done() + if err := k.node.Err(); err != nil { k.logger.Fatalw("node stopped with an error", zap.Error(err)) } + k.logger.Info("node exited successfully") } @@ -264,13 +284,16 @@ func (k *kubelet) nodeOpts(ctx context.Context, srvPort, namespace, name, hostna if err := nodeutil.AttachProviderRoutes(mux)(c); err != nil { return errors.New("unable to attach routes: " + err.Error()) } + c.Handler = mux tlsConfig, err := loadTLSConfig(ctx, k.hostClient, name, namespace, k.name, hostname, k.token, agentIP) if err != nil { return errors.New("unable to get tls config: " + err.Error()) } + c.TLSConfig = tlsConfig + return nil } } @@ -284,8 +307,11 @@ func virtRestConfig(ctx context.Context, virtualConfigPath string, hostClient ct if err := hostClient.Get(ctx, types.NamespacedName{Namespace: clusterNamespace, Name: clusterName}, &cluster); err != nil { return nil, err } + endpoint := server.ServiceName(cluster.Name) + "." 
+ cluster.Namespace + var b *bootstrap.ControlRuntimeBootstrap + if err := retry.OnError(controller.Backoff, func(err error) bool { return err != nil }, func() error { @@ -296,20 +322,27 @@ func virtRestConfig(ctx context.Context, virtualConfigPath string, hostClient ct }); err != nil { return nil, errors.New("unable to decode bootstrap: " + err.Error()) } + adminCert, adminKey, err := certs.CreateClientCertKey( - controller.AdminCommonName, []string{user.SystemPrivilegedGroup}, - nil, []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, time.Hour*24*time.Duration(356), + controller.AdminCommonName, + []string{user.SystemPrivilegedGroup}, + nil, []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + time.Hour*24*time.Duration(356), b.ClientCA.Content, - b.ClientCAKey.Content) + b.ClientCAKey.Content, + ) + if err != nil { return nil, err } url := fmt.Sprintf("https://%s:%d", server.ServiceName(cluster.Name), server.ServerPort) + kubeconfigData, err := kubeconfigBytes(url, []byte(b.ServerCA.Content), adminCert, adminKey) if err != nil { return nil, err } + return clientcmd.RESTConfigFromKubeConfig(kubeconfigData) } @@ -341,10 +374,13 @@ func loadTLSConfig(ctx context.Context, hostClient ctrlruntimeclient.Client, clu cluster v1alpha1.Cluster b *bootstrap.ControlRuntimeBootstrap ) + if err := hostClient.Get(ctx, types.NamespacedName{Name: clusterName, Namespace: clusterNamespace}, &cluster); err != nil { return nil, err } + endpoint := fmt.Sprintf("%s.%s", server.ServiceName(cluster.Name), cluster.Namespace) + if err := retry.OnError(controller.Backoff, func(err error) bool { return err != nil }, func() error { @@ -354,27 +390,34 @@ func loadTLSConfig(ctx context.Context, hostClient ctrlruntimeclient.Client, clu }); err != nil { return nil, errors.New("unable to decode bootstrap: " + err.Error()) } + ip := net.ParseIP(agentIP) + altNames := certutil.AltNames{ DNSNames: []string{hostname}, IPs: []net.IP{ip}, } + cert, key, err := certs.CreateClientCertKey(nodeName, nil, &altNames, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, 0, b.ServerCA.Content, b.ServerCAKey.Content) if err != nil { return nil, errors.New("unable to get cert and key: " + err.Error()) } + clientCert, err := tls.X509KeyPair(cert, key) if err != nil { return nil, errors.New("unable to get key pair: " + err.Error()) } + // create rootCA CertPool certs, err := certutil.ParseCertsPEM([]byte(b.ServerCA.Content)) if err != nil { return nil, errors.New("unable to create ca certs: " + err.Error()) } + if len(certs) < 1 { return nil, errors.New("ca cert is not parsed correctly") } + pool := x509.NewCertPool() pool.AddCert(certs[0]) diff --git a/k3k-kubelet/main.go b/k3k-kubelet/main.go index 8e17c6f8..4d85af26 100644 --- a/k3k-kubelet/main.go +++ b/k3k-kubelet/main.go @@ -102,9 +102,11 @@ func main() { app.Before = func(clx *cli.Context) error { logger = log.New(debug) ctrlruntimelog.SetLogger(zapr.NewLogger(logger.Desugar().WithOptions(zap.AddCallerSkip(1)))) + return nil } app.Action = run + if err := app.Run(os.Args); err != nil { logrus.Fatal(err) } @@ -112,6 +114,7 @@ func main() { func run(clx *cli.Context) error { ctx := context.Background() + if err := cfg.parse(configFile); err != nil { logger.Fatalw("failed to parse config file", "path", configFile, zap.Error(err)) } @@ -119,6 +122,7 @@ func run(clx *cli.Context) error { if err := cfg.validate(); err != nil { logger.Fatalw("failed to validate config", zap.Error(err)) } + k, err := newKubelet(ctx, &cfg, logger) if err != nil { logger.Fatalw("failed to create new virtual kubelet 
instance", zap.Error(err)) diff --git a/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go b/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go index e67822ff..a6861f71 100644 --- a/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go +++ b/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go @@ -107,6 +107,7 @@ func (rc *resourceMetricsCollector) DescribeWithStability(ch chan<- *compbasemet // custom collector in a way that only collects metrics for active containers. func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) { var errorCount float64 + defer func() { ch <- compbasemetrics.NewLazyConstMetric(resourceScrapeResultDesc, compbasemetrics.GaugeValue, errorCount) }() @@ -121,6 +122,7 @@ func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- compbasemetri rc.collectContainerCPUMetrics(ch, pod, container) rc.collectContainerMemoryMetrics(ch, pod, container) } + rc.collectPodCPUMetrics(ch, pod) rc.collectPodMemoryMetrics(ch, pod) } diff --git a/k3k-kubelet/provider/configure.go b/k3k-kubelet/provider/configure.go index 85ac5295..643f61d1 100644 --- a/k3k-kubelet/provider/configure.go +++ b/k3k-kubelet/provider/configure.go @@ -119,6 +119,7 @@ func updateNodeCapacity(coreClient typedv1.CoreV1Interface, virtualClient client // If some node labels are specified only the matching nodes will be considered. func getResourcesFromNodes(ctx context.Context, coreClient typedv1.CoreV1Interface, nodeLabels map[string]string) (v1.ResourceList, v1.ResourceList, error) { listOpts := metav1.ListOptions{} + if nodeLabels != nil { labelSelector := metav1.LabelSelector{MatchLabels: nodeLabels} listOpts.LabelSelector = labels.Set(labelSelector.MatchLabels).String() @@ -134,7 +135,6 @@ func getResourcesFromNodes(ctx context.Context, coreClient typedv1.CoreV1Interfa virtualAvailableResources := corev1.ResourceList{} for _, node := range nodeList.Items { - // check if the node is Ready for _, condition := range node.Status.Conditions { if condition.Type != corev1.NodeReady { diff --git a/k3k-kubelet/provider/provider.go b/k3k-kubelet/provider/provider.go index 2be518f7..732d619c 100644 --- a/k3k-kubelet/provider/provider.go +++ b/k3k-kubelet/provider/provider.go @@ -110,24 +110,30 @@ func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, con Follow: opts.Follow, Previous: opts.Previous, } + if opts.Tail != 0 { tailLines := int64(opts.Tail) options.TailLines = &tailLines } + if opts.LimitBytes != 0 { limitBytes := int64(opts.LimitBytes) options.LimitBytes = &limitBytes } + if opts.SinceSeconds != 0 { sinceSeconds := int64(opts.SinceSeconds) options.SinceSeconds = &sinceSeconds } + if !opts.SinceTime.IsZero() { sinceTime := metav1.NewTime(opts.SinceTime) options.SinceTime = &sinceTime } + closer, err := p.CoreClient.Pods(p.ClusterNamespace).GetLogs(hostPodName, &options).Stream(ctx) p.logger.Infof("got error %s when getting logs for %s in %s", err, hostPodName, p.ClusterNamespace) + return closer, err } @@ -148,10 +154,12 @@ func (p *Provider) RunInContainer(ctx context.Context, namespace, podName, conta Stdout: attach.Stdout() != nil, Stderr: attach.Stderr() != nil, }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&p.ClientConfig, http.MethodPost, req.URL()) if err != nil { return err } + return exec.StreamWithContext(ctx, remotecommand.StreamOptions{ Stdin: attach.Stdin(), Stdout: attach.Stdout(), @@ -179,10 +187,12 @@ func (p *Provider) AttachToContainer(ctx context.Context, namespace, 
podName, co Stdout: attach.Stdout() != nil, Stderr: attach.Stderr() != nil, }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&p.ClientConfig, http.MethodPost, req.URL()) if err != nil { return err } + return exec.StreamWithContext(ctx, remotecommand.StreamOptions{ Stdin: attach.Stdin(), Stdout: attach.Stdout(), @@ -204,8 +214,10 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary, } // fetch the stats from all the nodes - var nodeStats statsv1alpha1.NodeStats - var allPodsStats []statsv1alpha1.PodStats + var ( + nodeStats statsv1alpha1.NodeStats + allPodsStats []statsv1alpha1.PodStats + ) for _, n := range nodeList.Items { res, err := p.CoreClient.RESTClient(). @@ -240,6 +252,7 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary, } podsNameMap := make(map[string]*v1.Pod) + for _, pod := range pods { hostPodName := p.Translator.TranslateName(pod.Namespace, pod.Name) podsNameMap[hostPodName] = pod @@ -284,6 +297,7 @@ func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, if err != nil { return nil, errors.Join(err, errors.New("error gathering metrics from collector")) } + return metricFamily, nil } @@ -300,9 +314,9 @@ func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port if err != nil { return err } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req.URL()) portAsString := strconv.Itoa(int(port)) - readyChannel := make(chan struct{}) stopChannel := make(chan struct{}, 1) @@ -333,7 +347,9 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error { Namespace: p.ClusterNamespace, Name: p.ClusterName, } + var cluster v1alpha1.Cluster + if err := p.HostClient.Get(ctx, clusterKey, &cluster); err != nil { return fmt.Errorf("unable to get cluster %s in namespace %s: %w", p.ClusterName, p.ClusterNamespace, err) } @@ -391,7 +407,9 @@ func (p *Provider) withRetry(ctx context.Context, f func(context.Context, *v1.Po interval = 2 * time.Second timeout = 10 * time.Second ) + var allErrors error + // retryFn will retry until the operation succeed, or the timeout occurs retryFn := func(ctx context.Context) (bool, error) { if lastErr := f(ctx, pod); lastErr != nil { @@ -399,11 +417,14 @@ func (p *Provider) withRetry(ctx context.Context, f func(context.Context, *v1.Po allErrors = errors.Join(allErrors, lastErr) return false, nil } + return true, nil } + if err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, retryFn); err != nil { return errors.Join(allErrors, ErrRetryTimeout) } + return nil } @@ -412,6 +433,7 @@ func (p *Provider) withRetry(ctx context.Context, f func(context.Context, *v1.Po func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, volumes []corev1.Volume) error { for _, volume := range volumes { var optional bool + if strings.HasPrefix(volume.Name, kubeAPIAccessPrefix) { continue } @@ -420,17 +442,21 @@ func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, vo if volume.ConfigMap.Optional != nil { optional = *volume.ConfigMap.Optional } + if err := p.syncConfigmap(ctx, podNamespace, volume.ConfigMap.Name, optional); err != nil { return fmt.Errorf("unable to sync configmap volume %s: %w", volume.Name, err) } + volume.ConfigMap.Name = p.Translator.TranslateName(podNamespace, volume.ConfigMap.Name) } else if volume.Secret != nil { if volume.Secret.Optional != nil { optional = *volume.Secret.Optional } + if err := p.syncSecret(ctx, podNamespace, 
volume.Secret.SecretName, optional); err != nil { return fmt.Errorf("unable to sync secret volume %s: %w", volume.Name, err) } + volume.Secret.SecretName = p.Translator.TranslateName(podNamespace, volume.Secret.SecretName) } else if volume.Projected != nil { for _, source := range volume.Projected.Sources { @@ -438,15 +464,18 @@ func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, vo if source.ConfigMap.Optional != nil { optional = *source.ConfigMap.Optional } + configMapName := source.ConfigMap.Name if err := p.syncConfigmap(ctx, podNamespace, configMapName, optional); err != nil { return fmt.Errorf("unable to sync projected configmap %s: %w", configMapName, err) } + source.ConfigMap.Name = p.Translator.TranslateName(podNamespace, configMapName) } else if source.Secret != nil { if source.Secret.Optional != nil { optional = *source.Secret.Optional } + secretName := source.Secret.Name if err := p.syncSecret(ctx, podNamespace, secretName, optional); err != nil { return fmt.Errorf("unable to sync projected secret %s: %w", secretName, err) @@ -460,56 +489,65 @@ func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, vo if downwardAPI.FieldRef.FieldPath == translate.MetadataNameField { downwardAPI.FieldRef.FieldPath = fmt.Sprintf("metadata.annotations['%s']", translate.ResourceNameAnnotation) } + if downwardAPI.FieldRef.FieldPath == translate.MetadataNamespaceField { downwardAPI.FieldRef.FieldPath = fmt.Sprintf("metadata.annotations['%s']", translate.ResourceNamespaceAnnotation) } } } } + return nil } // syncConfigmap will add the configmap object to the queue of the syncer controller to be synced to the host cluster func (p *Provider) syncConfigmap(ctx context.Context, podNamespace string, configMapName string, optional bool) error { var configMap corev1.ConfigMap + nsName := types.NamespacedName{ Namespace: podNamespace, Name: configMapName, } - err := p.VirtualClient.Get(ctx, nsName, &configMap) - if err != nil { + + if err := p.VirtualClient.Get(ctx, nsName, &configMap); err != nil { // check if its optional configmap if apierrors.IsNotFound(err) && optional { return nil } + return fmt.Errorf("unable to get configmap to sync %s/%s: %w", nsName.Namespace, nsName.Name, err) } - err = p.Handler.AddResource(ctx, &configMap) - if err != nil { + + if err := p.Handler.AddResource(ctx, &configMap); err != nil { return fmt.Errorf("unable to add configmap to sync %s/%s: %w", nsName.Namespace, nsName.Name, err) } + return nil } // syncSecret will add the secret object to the queue of the syncer controller to be synced to the host cluster func (p *Provider) syncSecret(ctx context.Context, podNamespace string, secretName string, optional bool) error { p.logger.Infow("Syncing secret", "Name", secretName, "Namespace", podNamespace, "optional", optional) + var secret corev1.Secret + nsName := types.NamespacedName{ Namespace: podNamespace, Name: secretName, } - err := p.VirtualClient.Get(ctx, nsName, &secret) - if err != nil { + + if err := p.VirtualClient.Get(ctx, nsName, &secret); err != nil { if apierrors.IsNotFound(err) && optional { return nil } + return fmt.Errorf("unable to get secret to sync %s/%s: %w", nsName.Namespace, nsName.Name, err) } - err = p.Handler.AddResource(ctx, &secret) - if err != nil { + + if err := p.Handler.AddResource(ctx, &secret); err != nil { return fmt.Errorf("unable to add secret to sync %s/%s: %w", nsName.Namespace, nsName.Name, err) } + return nil } @@ -599,16 +637,20 @@ func (p *Provider) DeletePod(ctx context.Context, pod 
*corev1.Pod) error { func (p *Provider) deletePod(ctx context.Context, pod *corev1.Pod) error { p.logger.Infof("Got request to delete pod %s", pod.Name) hostName := p.Translator.TranslateName(pod.Namespace, pod.Name) + err := p.CoreClient.Pods(p.ClusterNamespace).Delete(ctx, hostName, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("unable to delete pod %s/%s: %w", pod.Namespace, pod.Name, err) } + if err = p.pruneUnusedVolumes(ctx, pod); err != nil { // note that we don't return an error here. The pod was successfully deleted, another process // should clean this without affecting the user p.logger.Errorf("failed to prune leftover volumes for %s/%s: %w, resources may be left", pod.Namespace, pod.Name, err) } + p.logger.Infof("Deleted pod %s", pod.Name) + return nil } @@ -619,6 +661,7 @@ func (p *Provider) pruneUnusedVolumes(ctx context.Context, pod *corev1.Pod) erro // for pruning pruneSecrets := sets.Set[string]{}.Insert(rawSecrets...) pruneConfigMap := sets.Set[string]{}.Insert(rawConfigMaps...) + var pods corev1.PodList // only pods in the same namespace could be using secrets/configmaps that this pod is using err := p.VirtualClient.List(ctx, &pods, &client.ListOptions{ @@ -627,35 +670,43 @@ func (p *Provider) pruneUnusedVolumes(ctx context.Context, pod *corev1.Pod) erro if err != nil { return fmt.Errorf("unable to list pods: %w", err) } + for _, vPod := range pods.Items { if vPod.Name == pod.Name { continue } + secrets, configMaps := getSecretsAndConfigmaps(&vPod) pruneSecrets.Delete(secrets...) pruneConfigMap.Delete(configMaps...) } + for _, secretName := range pruneSecrets.UnsortedList() { var secret corev1.Secret - err := p.VirtualClient.Get(ctx, types.NamespacedName{ + + key := types.NamespacedName{ Name: secretName, Namespace: pod.Namespace, - }, &secret) - if err != nil { + } + + if err := p.VirtualClient.Get(ctx, key, &secret); err != nil { return fmt.Errorf("unable to get secret %s/%s for pod volume: %w", pod.Namespace, secretName, err) } - err = p.Handler.RemoveResource(ctx, &secret) - if err != nil { + + if err = p.Handler.RemoveResource(ctx, &secret); err != nil { return fmt.Errorf("unable to remove secret %s/%s for pod volume: %w", pod.Namespace, secretName, err) } } + for _, configMapName := range pruneConfigMap.UnsortedList() { var configMap corev1.ConfigMap - err := p.VirtualClient.Get(ctx, types.NamespacedName{ + + key := types.NamespacedName{ Name: configMapName, Namespace: pod.Namespace, - }, &configMap) - if err != nil { + } + + if err := p.VirtualClient.Get(ctx, key, &configMap); err != nil { return fmt.Errorf("unable to get configMap %s/%s for pod volume: %w", pod.Namespace, configMapName, err) } @@ -663,6 +714,7 @@ func (p *Provider) pruneUnusedVolumes(ctx context.Context, pod *corev1.Pod) erro return fmt.Errorf("unable to remove configMap %s/%s for pod volume: %w", pod.Namespace, configMapName, err) } } + return nil } @@ -676,12 +728,15 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*corev1. Namespace: p.ClusterNamespace, Name: p.Translator.TranslateName(namespace, name), } + var pod corev1.Pod - err := p.HostClient.Get(ctx, hostNamespaceName, &pod) - if err != nil { + + if err := p.HostClient.Get(ctx, hostNamespaceName, &pod); err != nil { return nil, fmt.Errorf("error when retrieving pod: %w", err) } + p.Translator.TranslateFrom(&pod) + return &pod, nil } @@ -691,11 +746,14 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*corev1. // to return a version after DeepCopy. 
func (p *Provider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) { p.logger.Debugw("got a request for pod status", "Namespace", namespace, "Name", name) + pod, err := p.GetPod(ctx, namespace, name) if err != nil { return nil, fmt.Errorf("unable to get pod for status: %w", err) } + p.logger.Debugw("got pod status", "Namespace", namespace, "Name", name, "Status", pod.Status) + return pod.Status.DeepCopy(), nil } @@ -705,21 +763,28 @@ func (p *Provider) GetPodStatus(ctx context.Context, namespace, name string) (*c // to return a version after DeepCopy. func (p *Provider) GetPods(ctx context.Context) ([]*corev1.Pod, error) { selector := labels.NewSelector() + requirement, err := labels.NewRequirement(translate.ClusterNameLabel, selection.Equals, []string{p.ClusterName}) if err != nil { return nil, fmt.Errorf("unable to create label selector: %w", err) } + selector = selector.Add(*requirement) + var podList corev1.PodList err = p.HostClient.List(ctx, &podList, &client.ListOptions{LabelSelector: selector}) + if err != nil { return nil, fmt.Errorf("unable to list pods: %w", err) } + retPods := []*corev1.Pod{} + for _, pod := range podList.DeepCopy().Items { p.Translator.TranslateFrom(&pod) retPods = append(retPods, &pod) } + return retPods, nil } @@ -773,7 +838,6 @@ func configureNetworking(pod *corev1.Pod, podName, podNamespace, serverIP, dnsIP for i := range pod.Spec.InitContainers { pod.Spec.InitContainers[i].Env = overrideEnvVars(pod.Spec.InitContainers[i].Env, updatedEnvVars) } - } // overrideEnvVars will override the orig environment variables if found in the updated list @@ -800,8 +864,11 @@ func overrideEnvVars(orig, updated []corev1.EnvVar) []corev1.EnvVar { // getSecretsAndConfigmaps retrieves a list of all secrets/configmaps that are in use by a given pod. Useful // for removing/seeing which virtual cluster resources need to be in the host cluster. 
func getSecretsAndConfigmaps(pod *corev1.Pod) ([]string, []string) { - var secrets []string - var configMaps []string + var ( + secrets []string + configMaps []string + ) + for _, volume := range pod.Spec.Volumes { if volume.Secret != nil { secrets = append(secrets, volume.Secret.SecretName) @@ -817,6 +884,7 @@ func getSecretsAndConfigmaps(pod *corev1.Pod) ([]string, []string) { } } } + return secrets, configMaps } @@ -837,28 +905,33 @@ func (p *Provider) configureFieldPathEnv(pod, tPod *v1.Pod) error { envVar.ValueFrom.FieldRef.FieldPath = fmt.Sprintf("metadata.annotations['%s']", translate.ResourceNameAnnotation) pod.Spec.InitContainers[i].Env[j] = envVar } + if fieldPath == translate.MetadataNamespaceField { envVar.ValueFrom.FieldRef.FieldPath = fmt.Sprintf("metadata.annotations['%s']", translate.MetadataNamespaceField) pod.Spec.InitContainers[i].Env[j] = envVar } } } + for i, container := range pod.Spec.Containers { for j, envVar := range container.Env { if envVar.ValueFrom == nil || envVar.ValueFrom.FieldRef == nil { continue } + fieldPath := envVar.ValueFrom.FieldRef.FieldPath if fieldPath == translate.MetadataNameField { envVar.ValueFrom.FieldRef.FieldPath = fmt.Sprintf("metadata.annotations['%s']", translate.ResourceNameAnnotation) pod.Spec.Containers[i].Env[j] = envVar } + if fieldPath == translate.MetadataNamespaceField { envVar.ValueFrom.FieldRef.FieldPath = fmt.Sprintf("metadata.annotations['%s']", translate.ResourceNameAnnotation) pod.Spec.Containers[i].Env[j] = envVar } } } + for name, value := range pod.Annotations { if strings.Contains(name, webhook.FieldpathField) { containerIndex, envName, err := webhook.ParseFieldPathAnnotationKey(name) @@ -878,5 +951,6 @@ func (p *Provider) configureFieldPathEnv(pod, tPod *v1.Pod) error { delete(tPod.Annotations, name) } } + return nil } diff --git a/k3k-kubelet/provider/provider_test.go b/k3k-kubelet/provider/provider_test.go index 1fc63679..051b64ab 100644 --- a/k3k-kubelet/provider/provider_test.go +++ b/k3k-kubelet/provider/provider_test.go @@ -13,6 +13,7 @@ func Test_overrideEnvVars(t *testing.T) { orig []corev1.EnvVar new []corev1.EnvVar } + tests := []struct { name string args args diff --git a/k3k-kubelet/provider/token.go b/k3k-kubelet/provider/token.go index 8d1e1ed2..4fd17c8c 100644 --- a/k3k-kubelet/provider/token.go +++ b/k3k-kubelet/provider/token.go @@ -30,12 +30,14 @@ func (p *Provider) transformTokens(ctx context.Context, pod, tPod *corev1.Pod) e } virtualSecretName := k3kcontroller.SafeConcatNameWithPrefix(pod.Spec.ServiceAccountName, "token") + virtualSecret := virtualSecret(virtualSecretName, pod.Namespace, pod.Spec.ServiceAccountName) if err := p.VirtualClient.Create(ctx, virtualSecret); err != nil { if !apierrors.IsAlreadyExists(err) { return err } } + // extracting the tokens data from the secret we just created virtualSecretKey := types.NamespacedName{ Name: virtualSecret.Name, @@ -49,9 +51,11 @@ func (p *Provider) transformTokens(ctx context.Context, pod, tPod *corev1.Pod) e if len(virtualSecret.Data) < 3 { return fmt.Errorf("token secret %s/%s data is empty", virtualSecret.Namespace, virtualSecret.Name) } + hostSecret := virtualSecret.DeepCopy() hostSecret.Type = "" hostSecret.Annotations = make(map[string]string) + p.Translator.TranslateTo(hostSecret) if err := p.HostClient.Create(ctx, hostSecret); err != nil { @@ -59,7 +63,9 @@ func (p *Provider) transformTokens(ctx context.Context, pod, tPod *corev1.Pod) e return err } } + p.translateToken(tPod, hostSecret.Name) + return nil } @@ -96,6 +102,7 @@ func 
isKubeAccessVolumeFound(pod *corev1.Pod) bool { return true } } + return false } diff --git a/k3k-kubelet/provider/util.go b/k3k-kubelet/provider/util.go index 473b1306..8fe53021 100644 --- a/k3k-kubelet/provider/util.go +++ b/k3k-kubelet/provider/util.go @@ -17,9 +17,9 @@ func (t *translatorSizeQueue) Next() *remotecommand.TerminalSize { if !ok { return nil } - newSize := remotecommand.TerminalSize{ + + return &remotecommand.TerminalSize{ Width: size.Width, Height: size.Height, } - return &newSize } diff --git a/k3k-kubelet/translate/host.go b/k3k-kubelet/translate/host.go index 3ec372f4..31bcb262 100644 --- a/k3k-kubelet/translate/host.go +++ b/k3k-kubelet/translate/host.go @@ -45,14 +45,17 @@ func (t *ToHostTranslator) TranslateTo(obj client.Object) { if annotations == nil { annotations = map[string]string{} } + annotations[ResourceNameAnnotation] = obj.GetName() annotations[ResourceNamespaceAnnotation] = obj.GetNamespace() obj.SetAnnotations(annotations) + // add a label to quickly identify objects owned by a given virtual cluster labels := obj.GetLabels() if labels == nil { labels = map[string]string{} } + labels[ClusterNameLabel] = t.ClusterName obj.SetLabels(labels) @@ -77,6 +80,7 @@ func (t *ToHostTranslator) TranslateFrom(obj client.Object) { // In this case, we need to have some sort of fallback or error return name := annotations[ResourceNameAnnotation] namespace := annotations[ResourceNamespaceAnnotation] + obj.SetName(name) obj.SetNamespace(namespace) delete(annotations, ResourceNameAnnotation) @@ -91,7 +95,6 @@ func (t *ToHostTranslator) TranslateFrom(obj client.Object) { // resource version/UID won't match what's in the virtual cluster. obj.SetResourceVersion("") obj.SetUID("") - } // TranslateName returns the name of the resource in the host cluster. Will not update the object with this name. 
@@ -106,5 +109,6 @@ func (t *ToHostTranslator) TranslateName(namespace string, name string) string { nameKey := fmt.Sprintf("%s+%s+%s", name, namespace, t.ClusterName) // it's possible that the suffix will be in the name, so we use hex to make it valid for k8s nameSuffix := hex.EncodeToString([]byte(nameKey)) + return controller.SafeConcatName(namePrefix, nameSuffix) } diff --git a/main.go b/main.go index 1458d7be..8c88741c 100644 --- a/main.go +++ b/main.go @@ -82,9 +82,12 @@ func main() { if err := validate(); err != nil { return err } + logger = log.New(debug) + return nil } + if err := app.Run(os.Args); err != nil { logger.Fatalw("failed to run k3k controller", zap.Error(err)) } @@ -111,22 +114,26 @@ func run(clx *cli.Context) error { ctrlruntimelog.SetLogger(zapr.NewLogger(logger.Desugar().WithOptions(zap.AddCallerSkip(1)))) logger.Info("adding cluster controller") + if err := cluster.Add(ctx, mgr, sharedAgentImage, sharedAgentImagePullPolicy); err != nil { return fmt.Errorf("failed to add the new cluster controller: %v", err) } logger.Info("adding etcd pod controller") + if err := cluster.AddPodController(ctx, mgr); err != nil { return fmt.Errorf("failed to add the new cluster controller: %v", err) } logger.Info("adding clusterset controller") + if err := clusterset.Add(ctx, mgr, clusterCIDR); err != nil { return fmt.Errorf("failed to add the clusterset controller: %v", err) } if clusterCIDR == "" { logger.Info("adding networkpolicy node controller") + if err := clusterset.AddNodeController(ctx, mgr); err != nil { return fmt.Errorf("failed to add the clusterset node controller: %v", err) } @@ -147,5 +154,6 @@ func validate() error { return errors.New("invalid value for shared agent image policy") } } + return nil } diff --git a/pkg/apis/k3k.io/v1alpha1/register.go b/pkg/apis/k3k.io/v1alpha1/register.go index 6fb0e830..6f531dd5 100644 --- a/pkg/apis/k3k.io/v1alpha1/register.go +++ b/pkg/apis/k3k.io/v1alpha1/register.go @@ -25,5 +25,6 @@ func addKnownTypes(s *runtime.Scheme) error { &ClusterSetList{}, ) metav1.AddToGroupVersion(s, SchemeGroupVersion) + return nil } diff --git a/pkg/controller/certs/certs.go b/pkg/controller/certs/certs.go index 0e3cdc09..9ca8f206 100644 --- a/pkg/controller/certs/certs.go +++ b/pkg/controller/certs/certs.go @@ -40,6 +40,7 @@ func CreateClientCertKey(commonName string, organization []string, altNames *cer if altNames != nil { cfg.AltNames = *altNames } + cert, err := certutil.NewSignedCert(cfg, key.(crypto.Signer), caCertPEM[0], caKeyPEM.(crypto.Signer)) if err != nil { return nil, nil, err @@ -59,6 +60,7 @@ func generateKey() (data []byte, err error) { func AddSANs(sans []string) certutil.AltNames { var altNames certutil.AltNames + for _, san := range sans { ip := net.ParseIP(san) if ip == nil { @@ -67,5 +69,6 @@ func AddSANs(sans []string) certutil.AltNames { altNames.IPs = append(altNames.IPs, ip) } } + return altNames } diff --git a/pkg/controller/cluster/agent/shared.go b/pkg/controller/cluster/agent/shared.go index 978d25e1..c3f3dcf2 100644 --- a/pkg/controller/cluster/agent/shared.go +++ b/pkg/controller/cluster/agent/shared.go @@ -96,6 +96,7 @@ func sharedAgentData(cluster *v1alpha1.Cluster, serviceName, token, ip string) s if cluster.Spec.Version == "" { version = cluster.Status.HostVersion } + return fmt.Sprintf(`clusterName: %s clusterNamespace: %s serverIP: %s @@ -139,7 +140,6 @@ func (s *SharedAgent) daemonset(ctx context.Context) error { } func (s *SharedAgent) podSpec() v1.PodSpec { - var limit v1.ResourceList return v1.PodSpec{ 
ServiceAccountName: s.Name(), NodeSelector: s.cluster.Spec.NodeSelector, @@ -187,7 +187,7 @@ func (s *SharedAgent) podSpec() v1.PodSpec { Image: s.image, ImagePullPolicy: v1.PullPolicy(s.imagePullPolicy), Resources: v1.ResourceRequirements{ - Limits: limit, + Limits: v1.ResourceList{}, }, Args: []string{ "--config", @@ -406,6 +406,7 @@ func (s *SharedAgent) webhookTLS(ctx context.Context) error { } altNames := []string{s.Name(), s.cluster.Name} + webhookCert, webhookKey, err := newWebhookCerts(s.Name(), altNames, caPrivateKeyPEM, caCertPEM) if err != nil { return err diff --git a/pkg/controller/cluster/agent/shared_test.go b/pkg/controller/cluster/agent/shared_test.go index f3bf9ac2..6484ab67 100644 --- a/pkg/controller/cluster/agent/shared_test.go +++ b/pkg/controller/cluster/agent/shared_test.go @@ -16,6 +16,7 @@ func Test_sharedAgentData(t *testing.T) { ip string token string } + tests := []struct { name string args args @@ -100,6 +101,7 @@ func Test_sharedAgentData(t *testing.T) { }, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { config := sharedAgentData(tt.args.cluster, tt.args.serviceName, tt.args.token, tt.args.ip) diff --git a/pkg/controller/cluster/agent/virtual.go b/pkg/controller/cluster/agent/virtual.go index 2294ed1c..3f1fb394 100644 --- a/pkg/controller/cluster/agent/virtual.go +++ b/pkg/controller/cluster/agent/virtual.go @@ -81,6 +81,7 @@ func (v *VirtualAgent) deployment(ctx context.Context) error { image := controller.K3SImage(v.cluster) const name = "k3k-agent" + selector := metav1.LabelSelector{ MatchLabels: map[string]string{ "cluster": v.cluster.Name, @@ -116,7 +117,9 @@ func (v *VirtualAgent) deployment(ctx context.Context) error { func (v *VirtualAgent) podSpec(image, name string, args []string, affinitySelector *metav1.LabelSelector) v1.PodSpec { var limit v1.ResourceList + args = append([]string{"agent", "--config", "/opt/rancher/k3s/config.yaml"}, args...) 
+ podSpec := v1.PodSpec{ Volumes: []v1.Volume{ { diff --git a/pkg/controller/cluster/agent/virtual_test.go b/pkg/controller/cluster/agent/virtual_test.go index 350ccd52..886349d7 100644 --- a/pkg/controller/cluster/agent/virtual_test.go +++ b/pkg/controller/cluster/agent/virtual_test.go @@ -12,6 +12,7 @@ func Test_virtualAgentData(t *testing.T) { serviceIP string token string } + tests := []struct { name string args args @@ -30,6 +31,7 @@ func Test_virtualAgentData(t *testing.T) { }, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { config := virtualAgentData(tt.args.serviceIP, tt.args.token) diff --git a/pkg/controller/cluster/cluster.go b/pkg/controller/cluster/cluster.go index 86d376d8..a2beb15d 100644 --- a/pkg/controller/cluster/cluster.go +++ b/pkg/controller/cluster/cluster.go @@ -184,9 +184,12 @@ func (c *ClusterReconciler) reconcileCluster(ctx context.Context, cluster *v1alp // in shared mode try to lookup the serviceCIDR if cluster.Spec.Mode == v1alpha1.SharedClusterMode { log.Info("looking up Service CIDR for shared mode") + cluster.Status.ServiceCIDR, err = c.lookupServiceCIDR(ctx) + if err != nil { log.Error(err, "error while looking up Cluster Service CIDR") + cluster.Status.ServiceCIDR = defaultSharedServiceCIDR } } @@ -194,6 +197,7 @@ func (c *ClusterReconciler) reconcileCluster(ctx context.Context, cluster *v1alp // in virtual mode assign a default serviceCIDR if cluster.Spec.Mode == v1alpha1.VirtualClusterMode { log.Info("assign default service CIDR for virtual mode") + cluster.Status.ServiceCIDR = defaultVirtualServiceCIDR } } @@ -202,6 +206,7 @@ func (c *ClusterReconciler) reconcileCluster(ctx context.Context, cluster *v1alp if err != nil { return err } + serviceIP := service.Spec.ClusterIP if err := c.createClusterConfigs(ctx, cluster, s, serviceIP); err != nil { @@ -252,8 +257,10 @@ func (c *ClusterReconciler) ensureBootstrapSecret(ctx context.Context, cluster * bootstrapSecret.Data = map[string][]byte{ "bootstrap": bootstrapData, } + return nil }) + return err } @@ -279,9 +286,11 @@ func (c *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v if err != nil { return err } + if err := controllerutil.SetControllerReference(cluster, serverConfig, c.Scheme); err != nil { return err } + if err := c.Client.Create(ctx, serverConfig); err != nil { if !apierrors.IsAlreadyExists(err) { return err @@ -304,8 +313,10 @@ func (c *ClusterReconciler) ensureClusterService(ctx context.Context, cluster *v } currentService.Spec = expectedService.Spec + return nil }) + if err != nil { return nil, err } @@ -341,6 +352,7 @@ func (c *ClusterReconciler) ensureIngress(ctx context.Context, cluster *v1alpha1 return nil }) + if err != nil { return err } @@ -361,6 +373,7 @@ func (c *ClusterReconciler) server(ctx context.Context, cluster *v1alpha1.Cluste if err := controllerutil.SetControllerReference(cluster, serverStatefulService, c.Scheme); err != nil { return err } + if err := c.Client.Create(ctx, serverStatefulService); err != nil { if !apierrors.IsAlreadyExists(err) { return err @@ -371,12 +384,15 @@ func (c *ClusterReconciler) server(ctx context.Context, cluster *v1alpha1.Cluste if err != nil { return err } + currentServerStatefulSet := expectedServerStatefulSet.DeepCopy() result, err := controllerutil.CreateOrUpdate(ctx, c.Client, currentServerStatefulSet, func() error { if err := controllerutil.SetControllerReference(cluster, currentServerStatefulSet, c.Scheme); err != nil { return err } + currentServerStatefulSet.Spec = 
expectedServerStatefulSet.Spec + return nil }) @@ -397,6 +413,7 @@ func (c *ClusterReconciler) bindNodeProxyClusterRole(ctx context.Context, cluste subjectName := controller.SafeConcatNameWithPrefix(cluster.Name, agent.SharedNodeAgentName) found := false + for _, subject := range clusterRoleBinding.Subjects { if subject.Name == subjectName && subject.Namespace == cluster.Namespace { found = true @@ -431,6 +448,7 @@ func (c *ClusterReconciler) validate(cluster *v1alpha1.Cluster) error { if cluster.Name == ClusterInvalidName { return errors.New("invalid cluster name " + cluster.Name + " no action will be taken") } + return nil } @@ -462,6 +480,7 @@ func (c *ClusterReconciler) lookupServiceCIDR(ctx context.Context) (string, erro if err != nil { return "", err } + return serviceCIDRAddr.String(), nil } } @@ -499,11 +518,13 @@ func (c *ClusterReconciler) lookupServiceCIDR(ctx context.Context) (string, erro log.Error(err, "serviceCIDR is not valid") break } + return serviceCIDRAddr.String(), nil } } } log.Info("cannot find serviceCIDR from lookup") + return "", nil } diff --git a/pkg/controller/cluster/cluster_finalize.go b/pkg/controller/cluster/cluster_finalize.go index 99c1c651..8b39a14b 100644 --- a/pkg/controller/cluster/cluster_finalize.go +++ b/pkg/controller/cluster/cluster_finalize.go @@ -34,6 +34,7 @@ func (c *ClusterReconciler) finalizeCluster(ctx context.Context, cluster v1alpha for _, pod := range podList.Items { if controllerutil.ContainsFinalizer(&pod, etcdPodFinalizerName) { controllerutil.RemoveFinalizer(&pod, etcdPodFinalizerName) + if err := c.Client.Update(ctx, &pod); err != nil { return reconcile.Result{}, err } @@ -47,10 +48,12 @@ func (c *ClusterReconciler) finalizeCluster(ctx context.Context, cluster v1alpha if controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) { // remove finalizer from the cluster and update it. 
controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName) + if err := c.Client.Update(ctx, &cluster); err != nil { return reconcile.Result{}, err } } + return reconcile.Result{}, nil } @@ -63,6 +66,7 @@ func (c *ClusterReconciler) unbindNodeProxyClusterRole(ctx context.Context, clus subjectName := controller.SafeConcatNameWithPrefix(cluster.Name, agent.SharedNodeAgentName) var cleanedSubjects []rbacv1.Subject + for _, subject := range clusterRoleBinding.Subjects { if subject.Name != subjectName || subject.Namespace != cluster.Namespace { cleanedSubjects = append(cleanedSubjects, subject) @@ -75,5 +79,6 @@ func (c *ClusterReconciler) unbindNodeProxyClusterRole(ctx context.Context, clus } clusterRoleBinding.Subjects = cleanedSubjects + return c.Client.Update(ctx, clusterRoleBinding) } diff --git a/pkg/controller/cluster/pod.go b/pkg/controller/cluster/pod.go index 820b7626..7cd996bf 100644 --- a/pkg/controller/cluster/pod.go +++ b/pkg/controller/cluster/pod.go @@ -66,16 +66,20 @@ func (p *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r if len(s) < 1 { return reconcile.Result{}, nil } + if s[0] != "k3k" { return reconcile.Result{}, nil } + clusterName := s[1] + var cluster v1alpha1.Cluster if err := p.Client.Get(ctx, types.NamespacedName{Name: clusterName, Namespace: req.Namespace}, &cluster); err != nil { if !apierrors.IsNotFound(err) { return reconcile.Result{}, err } } + matchingLabels := ctrlruntimeclient.MatchingLabels(map[string]string{"role": "server"}) listOpts := &ctrlruntimeclient.ListOptions{Namespace: req.Namespace} matchingLabels.ApplyToList(listOpts) @@ -84,14 +88,17 @@ func (p *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r if err := p.Client.List(ctx, &podList, listOpts); err != nil { return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } + if len(podList.Items) == 1 { return reconcile.Result{}, nil } + for _, pod := range podList.Items { if err := p.handleServerPod(ctx, cluster, &pod); err != nil { return reconcile.Result{}, err } } + return reconcile.Result{}, nil } @@ -115,16 +122,20 @@ func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cl if cluster.Name == "" { if controllerutil.ContainsFinalizer(pod, etcdPodFinalizerName) { controllerutil.RemoveFinalizer(pod, etcdPodFinalizerName) + if err := p.Client.Update(ctx, pod); err != nil { return err } } + return nil } + tlsConfig, err := p.getETCDTLS(ctx, &cluster) if err != nil { return err } + // remove server from etcd client, err := clientv3.New(clientv3.Config{ Endpoints: []string{ @@ -143,11 +154,13 @@ func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cl // remove our finalizer from the list and update it. if controllerutil.ContainsFinalizer(pod, etcdPodFinalizerName) { controllerutil.RemoveFinalizer(pod, etcdPodFinalizerName) + if err := p.Client.Update(ctx, pod); err != nil { return err } } } + if !controllerutil.ContainsFinalizer(pod, etcdPodFinalizerName) { controllerutil.AddFinalizer(pod, etcdPodFinalizerName) return p.Client.Update(ctx, pod) @@ -164,9 +177,11 @@ func (p *PodReconciler) getETCDTLS(ctx context.Context, cluster *v1alpha1.Cluste if err != nil { return nil, err } + endpoint := server.ServiceName(cluster.Name) + "." 
+ cluster.Namespace var b *bootstrap.ControlRuntimeBootstrap + if err := retry.OnError(k3kcontroller.Backoff, func(err error) bool { return true }, func() error { @@ -181,6 +196,7 @@ func (p *PodReconciler) getETCDTLS(ctx context.Context, cluster *v1alpha1.Cluste if err != nil { return nil, err } + clientCert, err := tls.X509KeyPair(etcdCert, etcdKey) if err != nil { return nil, err @@ -190,6 +206,7 @@ func (p *PodReconciler) getETCDTLS(ctx context.Context, cluster *v1alpha1.Cluste if err != nil { return nil, err } + pool := x509.NewCertPool() pool.AddCert(cert[0]) @@ -206,6 +223,7 @@ func removePeer(ctx context.Context, client *clientv3.Client, name, address stri ctx, cancel := context.WithTimeout(ctx, memberRemovalTimeout) defer cancel() + members, err := client.MemberList(ctx) if err != nil { return err @@ -215,6 +233,7 @@ func removePeer(ctx context.Context, client *clientv3.Client, name, address stri if !strings.Contains(member.Name, name) { continue } + for _, peerURL := range member.PeerURLs { u, err := url.Parse(peerURL) if err != nil { @@ -224,9 +243,11 @@ func removePeer(ctx context.Context, client *clientv3.Client, name, address stri if u.Hostname() == address { log.Info("removing member from etcd", "name", member.Name, "id", member.ID, "address", address) _, err := client.MemberRemove(ctx, member.ID) + if errors.Is(err, rpctypes.ErrGRPCMemberNotFound) { return nil } + return err } } @@ -237,18 +258,23 @@ func removePeer(ctx context.Context, client *clientv3.Client, name, address stri func (p *PodReconciler) clusterToken(ctx context.Context, cluster *v1alpha1.Cluster) (string, error) { var tokenSecret v1.Secret + nn := types.NamespacedName{ Name: TokenSecretName(cluster.Name), Namespace: cluster.Namespace, } + if cluster.Spec.TokenSecretRef != nil { nn.Name = TokenSecretName(cluster.Name) } + if err := p.Client.Get(ctx, nn, &tokenSecret); err != nil { return "", err } + if _, ok := tokenSecret.Data["token"]; !ok { return "", fmt.Errorf("no token field in secret %s/%s", nn.Namespace, nn.Name) } + return string(tokenSecret.Data["token"]), nil } diff --git a/pkg/controller/cluster/server/bootstrap/bootstrap.go b/pkg/controller/cluster/server/bootstrap/bootstrap.go index 0c7defe3..6b1c48a3 100644 --- a/pkg/controller/cluster/server/bootstrap/bootstrap.go +++ b/pkg/controller/cluster/server/bootstrap/bootstrap.go @@ -45,7 +45,6 @@ func GenerateBootstrapData(ctx context.Context, cluster *v1alpha1.Cluster, ip, t } return json.Marshal(bootstrap) - } func requestBootstrap(token, serverIP string) (*ControlRuntimeBootstrap, error) { @@ -64,6 +63,7 @@ func requestBootstrap(token, serverIP string) (*ControlRuntimeBootstrap, error) if err != nil { return nil, err } + req.Header.Add("Authorization", "Basic "+basicAuth("server", token)) resp, err := client.Do(req) @@ -91,6 +91,7 @@ func decodeBootstrap(bootstrap *ControlRuntimeBootstrap) error { if err != nil { return err } + bootstrap.ClientCA.Content = string(decoded) //client-ca-key @@ -98,6 +99,7 @@ func decodeBootstrap(bootstrap *ControlRuntimeBootstrap) error { if err != nil { return err } + bootstrap.ClientCAKey.Content = string(decoded) //server-ca @@ -105,6 +107,7 @@ func decodeBootstrap(bootstrap *ControlRuntimeBootstrap) error { if err != nil { return err } + bootstrap.ServerCA.Content = string(decoded) //server-ca-key @@ -112,6 +115,7 @@ func decodeBootstrap(bootstrap *ControlRuntimeBootstrap) error { if err != nil { return err } + bootstrap.ServerCAKey.Content = string(decoded) //etcd-ca @@ -119,6 +123,7 @@ func 
decodeBootstrap(bootstrap *ControlRuntimeBootstrap) error { if err != nil { return err } + bootstrap.ETCDServerCA.Content = string(decoded) //etcd-ca-key @@ -126,6 +131,7 @@ func decodeBootstrap(bootstrap *ControlRuntimeBootstrap) error { if err != nil { return err } + bootstrap.ETCDServerCAKey.Content = string(decoded) return nil @@ -162,5 +168,6 @@ func GetFromSecret(ctx context.Context, client client.Client, cluster *v1alpha1. var bootstrap ControlRuntimeBootstrap err := json.Unmarshal(bootstrapData, &bootstrap) + return &bootstrap, err } diff --git a/pkg/controller/cluster/server/config.go b/pkg/controller/cluster/server/config.go index a80f0bbb..190ead74 100644 --- a/pkg/controller/cluster/server/config.go +++ b/pkg/controller/cluster/server/config.go @@ -22,6 +22,7 @@ func (s *Server) Config(init bool, serviceIP string) (*v1.Secret, error) { if init { config = initConfigData(s.cluster, s.token) } + return &v1.Secret{ TypeMeta: metav1.TypeMeta{ Kind: "Secret", @@ -52,21 +53,26 @@ func serverOptions(cluster *v1alpha1.Cluster, token string) string { if token != "" { opts = "token: " + token + "\n" } + if cluster.Status.ClusterCIDR != "" { opts = opts + "cluster-cidr: " + cluster.Status.ClusterCIDR + "\n" } + if cluster.Status.ServiceCIDR != "" { opts = opts + "service-cidr: " + cluster.Status.ServiceCIDR + "\n" } + if cluster.Spec.ClusterDNS != "" { opts = opts + "cluster-dns: " + cluster.Spec.ClusterDNS + "\n" } + if len(cluster.Status.TLSSANs) > 0 { opts = opts + "tls-san:\n" for _, addr := range cluster.Status.TLSSANs { opts = opts + "- " + addr + "\n" } } + if cluster.Spec.Mode != agent.VirtualNodeMode { opts = opts + "disable-agent: true\negress-selector-mode: disabled\ndisable:\n- servicelb\n- traefik\n- metrics-server\n- local-storage" } @@ -79,5 +85,6 @@ func configSecretName(clusterName string, init bool) string { if !init { return controller.SafeConcatNameWithPrefix(clusterName, configName) } + return controller.SafeConcatNameWithPrefix(clusterName, initConfigName) } diff --git a/pkg/controller/cluster/server/server.go b/pkg/controller/cluster/server/server.go index a3763aff..72ea8948 100644 --- a/pkg/controller/cluster/server/server.go +++ b/pkg/controller/cluster/server/server.go @@ -50,6 +50,7 @@ func (s *Server) podSpec(image, name string, persistent bool, startupCmd string) if s.cluster.Spec.Limit != nil && s.cluster.Spec.Limit.ServerLimit != nil { limit = s.cluster.Spec.Limit.ServerLimit } + podSpec := v1.PodSpec{ NodeSelector: s.cluster.Spec.NodeSelector, PriorityClassName: s.cluster.Spec.PriorityClass, @@ -218,6 +219,7 @@ func (s *Server) podSpec(image, name string, persistent bool, startupCmd string) Privileged: ptr.To(true), } } + return podSpec } @@ -228,6 +230,7 @@ func (s *Server) StatefulServer(ctx context.Context) (*apps.StatefulSet, error) err error persistent bool ) + image := controller.K3SImage(s.cluster) name := controller.SafeConcatNameWithPrefix(s.cluster.Name, serverName) @@ -238,8 +241,11 @@ func (s *Server) StatefulServer(ctx context.Context) (*apps.StatefulSet, error) pvClaim = s.setupDynamicPersistence() } - var volumes []v1.Volume - var volumeMounts []v1.VolumeMount + var ( + volumes []v1.Volume + volumeMounts []v1.VolumeMount + ) + for _, addon := range s.cluster.Spec.Addons { namespace := k3kSystemNamespace if addon.SecretNamespace != "" { @@ -306,6 +312,7 @@ func (s *Server) StatefulServer(ctx context.Context) (*apps.StatefulSet, error) if err != nil { return nil, err } + podSpec := s.podSpec(image, name, persistent, startupCommand) 
podSpec.Volumes = append(podSpec.Volumes, volumes...) podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, volumeMounts...) @@ -359,7 +366,6 @@ func (s *Server) setupDynamicPersistence() v1.PersistentVolumeClaim { }, }, } - } func (s *Server) setupStartCommand() (string, error) { @@ -369,10 +375,12 @@ func (s *Server) setupStartCommand() (string, error) { if *s.cluster.Spec.Servers > 1 { tmpl = HAServerTemplate } + tmplCmd, err := template.New("").Parse(tmpl) if err != nil { return "", err } + if err := tmplCmd.Execute(&output, map[string]string{ "ETCD_DIR": "/var/lib/rancher/k3s/server/db/etcd", "INIT_CONFIG": "/opt/rancher/k3s/init/config.yaml", @@ -381,5 +389,6 @@ func (s *Server) setupStartCommand() (string, error) { }); err != nil { return "", err } + return output.String(), nil } diff --git a/pkg/controller/cluster/server/service.go b/pkg/controller/cluster/server/service.go index d9bd307a..6effdb64 100644 --- a/pkg/controller/cluster/server/service.go +++ b/pkg/controller/cluster/server/service.go @@ -54,9 +54,11 @@ func Service(cluster *v1alpha1.Cluster) *v1.Service { if nodePortConfig.ServerPort != nil { k3sServerPort.NodePort = *nodePortConfig.ServerPort } + if nodePortConfig.ServicePort != nil { k3sServicePort.NodePort = *nodePortConfig.ServicePort } + if nodePortConfig.ETCDPort != nil { etcdPort.NodePort = *nodePortConfig.ETCDPort } diff --git a/pkg/controller/cluster/token.go b/pkg/controller/cluster/token.go index e2ace64d..a8650b41 100644 --- a/pkg/controller/cluster/token.go +++ b/pkg/controller/cluster/token.go @@ -26,13 +26,17 @@ func (c *ClusterReconciler) token(ctx context.Context, cluster *v1alpha1.Cluster Name: cluster.Spec.TokenSecretRef.Name, Namespace: cluster.Spec.TokenSecretRef.Namespace, } + var tokenSecret v1.Secret + if err := c.Client.Get(ctx, nn, &tokenSecret); err != nil { return "", err } + if _, ok := tokenSecret.Data["token"]; !ok { return "", fmt.Errorf("no token field in secret %s/%s", nn.Namespace, nn.Name) } + return string(tokenSecret.Data["token"]), nil } @@ -75,15 +79,16 @@ func (c *ClusterReconciler) ensureTokenSecret(ctx context.Context, cluster *v1al } return token, err - } func random(size int) (string, error) { token := make([]byte, size) + _, err := rand.Read(token) if err != nil { return "", err } + return hex.EncodeToString(token), err } diff --git a/pkg/controller/clusterset/clusterset.go b/pkg/controller/clusterset/clusterset.go index 047349b2..5316d56a 100644 --- a/pkg/controller/clusterset/clusterset.go +++ b/pkg/controller/clusterset/clusterset.go @@ -67,10 +67,13 @@ func Add(ctx context.Context, mgr manager.Manager, clusterCIDR string) error { // namespaceEventHandler will enqueue reconciling requests for all the ClusterSets in the changed namespace func namespaceEventHandler(reconciler ClusterSetReconciler) handler.MapFunc { return func(ctx context.Context, obj client.Object) []reconcile.Request { - var requests []reconcile.Request - var set v1alpha1.ClusterSetList + var ( + requests []reconcile.Request + set v1alpha1.ClusterSetList + ) _ = reconciler.Client.List(ctx, &set, client.InNamespace(obj.GetName())) + for _, clusterSet := range set.Items { requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ @@ -87,10 +90,13 @@ func namespaceEventHandler(reconciler ClusterSetReconciler) handler.MapFunc { // sameNamespaceEventHandler will enqueue reconciling requests for all the ClusterSets in the changed namespace func sameNamespaceEventHandler(reconciler ClusterSetReconciler) 
handler.MapFunc { return func(ctx context.Context, obj client.Object) []reconcile.Request { - var requests []reconcile.Request - var set v1alpha1.ClusterSetList + var ( + requests []reconcile.Request + set v1alpha1.ClusterSetList + ) _ = reconciler.Client.List(ctx, &set, client.InNamespace(obj.GetNamespace())) + for _, clusterSet := range set.Items { requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ @@ -199,6 +205,7 @@ func netpol(ctx context.Context, clusterCIDR string, clusterSet *v1alpha1.Cluste if err := client.List(ctx, &nodeList); err != nil { return nil, err } + for _, node := range nodeList.Items { cidrList = append(cidrList, node.Spec.PodCIDRs...) } @@ -261,6 +268,7 @@ func (c *ClusterSetReconciler) reconcileNamespacePodSecurityLabels(ctx context.C log.Info("reconciling Namespace") var ns v1.Namespace + key := types.NamespacedName{Name: clusterSet.Namespace} if err := c.Client.Get(ctx, key, &ns); err != nil { return err @@ -295,8 +303,10 @@ func (c *ClusterSetReconciler) reconcileNamespacePodSecurityLabels(ctx context.C log.V(1).Info("labels changed, updating namespace") ns.Labels = newLabels + return c.Client.Update(ctx, &ns) } + return nil } diff --git a/pkg/controller/clusterset/node.go b/pkg/controller/clusterset/node.go index 05c4a232..6c78a102 100644 --- a/pkg/controller/clusterset/node.go +++ b/pkg/controller/clusterset/node.go @@ -68,6 +68,7 @@ func (n *NodeReconciler) ensureNetworkPolicies(ctx context.Context, clusterSetLi log.Info("ensuring network policies") var setNetworkPolicy *networkingv1.NetworkPolicy + for _, cs := range clusterSetList.Items { if cs.Spec.DisableNetworkPolicy { continue @@ -78,14 +79,17 @@ func (n *NodeReconciler) ensureNetworkPolicies(ctx context.Context, clusterSetLi var err error setNetworkPolicy, err = netpol(ctx, "", &cs, n.Client) + if err != nil { return err } log.Info("new NetworkPolicy for clusterset") + if err := n.Client.Update(ctx, setNetworkPolicy); err != nil { return err } } + return nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5bee65a9..6eccfdb2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -51,7 +51,9 @@ func SafeConcatName(name ...string) string { if len(fullPath) < 64 { return fullPath } + digest := sha256.Sum256([]byte(fullPath)) + // since we cut the string in the middle, the last char may not be compatible with what is expected in k8s // we are checking and if necessary removing the last char c := fullPath[56] diff --git a/pkg/controller/kubeconfig/kubeconfig.go b/pkg/controller/kubeconfig/kubeconfig.go index baad8bbe..aa977c33 100644 --- a/pkg/controller/kubeconfig/kubeconfig.go +++ b/pkg/controller/kubeconfig/kubeconfig.go @@ -40,6 +40,7 @@ func (k *KubeConfig) Extract(ctx context.Context, client client.Client, cluster if err != nil { return nil, err } + serverCACert := []byte(bootstrapData.ServerCA.Content) adminCert, adminKey, err := certs.CreateClientCertKey( @@ -110,6 +111,7 @@ func getURLFromService(ctx context.Context, client client.Client, cluster *v1alp expose := cluster.Spec.Expose if expose != nil && expose.Ingress != nil { var k3kIngress networkingv1.Ingress + ingressKey := types.NamespacedName{ Name: server.IngressName(cluster.Name), Namespace: cluster.Namespace, @@ -118,6 +120,7 @@ func getURLFromService(ctx context.Context, client client.Client, cluster *v1alp if err := client.Get(ctx, ingressKey, &k3kIngress); err != nil { return "", err } + url = fmt.Sprintf("https://%s", k3kIngress.Spec.Rules[0].Host) 
} diff --git a/tests/common_test.go b/tests/common_test.go index 2095a32c..f30abde5 100644 --- a/tests/common_test.go +++ b/tests/common_test.go @@ -55,17 +55,19 @@ func NewVirtualCluster(cluster v1alpha1.Cluster) { WithTimeout(time.Minute * 2). WithPolling(time.Second * 5). Should(BeTrue()) - } // NewVirtualK8sClient returns a Kubernetes ClientSet for the virtual cluster func NewVirtualK8sClient(cluster v1alpha1.Cluster) *kubernetes.Clientset { GinkgoHelper() - var err error + var ( + err error + config *clientcmdapi.Config + ) + ctx := context.Background() - var config *clientcmdapi.Config Eventually(func() error { vKubeconfig := kubeconfig.New() kubeletAltName := fmt.Sprintf("k3k-%s-kubelet", cluster.Name) diff --git a/tests/tests_suite_test.go b/tests/tests_suite_test.go index 4f49f536..9e2eec7f 100644 --- a/tests/tests_suite_test.go +++ b/tests/tests_suite_test.go @@ -161,8 +161,10 @@ func buildScheme() *runtime.Scheme { } func writeK3kLogs() { - var err error - var podList v1.PodList + var ( + err error + podList v1.PodList + ) ctx := context.Background() err = k8sClient.List(ctx, &podList, &client.ListOptions{Namespace: "k3k-system"}) @@ -176,11 +178,14 @@ func writeK3kLogs() { } func writeLogs(filename string, logs io.ReadCloser) { + defer logs.Close() + logsStr, err := io.ReadAll(logs) Expect(err).To(Not(HaveOccurred())) - defer logs.Close() + tempfile := path.Join(os.TempDir(), filename) err = os.WriteFile(tempfile, []byte(logsStr), 0644) Expect(err).To(Not(HaveOccurred())) + fmt.Fprintln(GinkgoWriter, "logs written to: "+filename) }
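The translate/host.go and controller/controller.go hunks depend on a deterministic naming scheme: the virtual object's name, namespace, and cluster are hex-encoded into a suffix, and long results are clamped with a digest so they stay valid Kubernetes names. A minimal standalone sketch of that idea — the helper below is illustrative, not the project's SafeConcatName, and the prefix and cut lengths are assumptions:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// clampName joins the parts with "-" and, once the result reaches the
// 64-character threshold seen in controller.go, truncates it and appends a
// short sha256 digest so long names remain unique.
func clampName(parts ...string) string {
	full := strings.Join(parts, "-")
	if len(full) < 64 {
		return full
	}

	digest := sha256.Sum256([]byte(full))

	return full[:52] + "-" + hex.EncodeToString(digest[:])[:8]
}

func main() {
	// "configmap-a" in namespace "default" of virtual cluster "tenant-1"
	key := fmt.Sprintf("%s+%s+%s", "configmap-a", "default", "tenant-1")
	fmt.Println(clampName("k3k", hex.EncodeToString([]byte(key))))
}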
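cluster.go leans on controllerutil.CreateOrUpdate for the bootstrap secret, the cluster service, and the server StatefulSet: the desired state is written inside the mutate closure, and controller-runtime decides whether a Create or an Update call is needed. A small sketch under assumed names — the service spec and owner wiring here are illustrative, not the reconciler's code:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// ensureService creates the service if it is missing and patches it back to the
// desired spec if it drifted; the owner reference ties its lifetime to the owner.
func ensureService(ctx context.Context, c ctrlclient.Client, owner ctrlclient.Object, scheme *runtime.Scheme) error {
	svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "k3k-server", Namespace: "tenant-1"}}

	_, err := controllerutil.CreateOrUpdate(ctx, c, svc, func() error {
		svc.Spec.Ports = []corev1.ServicePort{{Name: "https", Port: 443}}

		return controllerutil.SetControllerReference(owner, svc, scheme)
	})

	return err
}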
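The pod.go and cluster_finalize.go hunks repeat the same finalizer dance: keep the finalizer while cleanup is pending, then remove it and update the object so the API server can finish deletion. A hedged sketch of that pattern with controller-runtime, using an invented finalizer name:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// exampleFinalizer is a placeholder; the controllers use their own finalizer names.
const exampleFinalizer = "example.k3k.io/etcd-pod"

// releasePod drops the finalizer once cleanup is done; the Update call is what
// actually lets the pod be garbage collected.
func releasePod(ctx context.Context, c ctrlclient.Client, pod *corev1.Pod) error {
	if !controllerutil.ContainsFinalizer(pod, exampleFinalizer) {
		return nil
	}

	controllerutil.RemoveFinalizer(pod, exampleFinalizer)

	return c.Update(ctx, pod)
}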
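The removePeer hunk in pod.go shows how a server pod's etcd member is evicted before the pod goes away. A compressed sketch of the same clientv3 flow, with placeholder endpoint and timeout values and without the peer-URL matching the controller performs:

package example

import (
	"context"
	"crypto/tls"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// removeMemberByName connects to etcd over TLS and drops the first member whose
// name matches. Endpoint and dial timeout are made up for the example.
func removeMemberByName(ctx context.Context, tlsConfig *tls.Config, name string) error {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"https://k3k-server.tenant-1:2379"},
		TLS:         tlsConfig,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}
	defer cli.Close()

	members, err := cli.MemberList(ctx)
	if err != nil {
		return err
	}

	for _, m := range members.Members {
		if m.Name == name {
			_, err := cli.MemberRemove(ctx, m.ID)
			return err
		}
	}

	return nil
}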
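Finally, getETCDTLS in pod.go wraps its bootstrap fetch in retry.OnError so a resource that does not exist yet is simply retried rather than treated as a failure. A sketch of that wait loop with made-up backoff values:

package example

import (
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
)

// waitForObject keeps calling fetch until it succeeds or returns a non-NotFound
// error; NotFound is treated as "not ready yet" and retried with backoff.
func waitForObject(fetch func() error) error {
	backoff := wait.Backoff{Steps: 10, Duration: 2 * time.Second, Factor: 1.5}

	return retry.OnError(backoff, apierrors.IsNotFound, fetch)
}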