add provider spec, refactor sleep state reconcil, add tests (#91)
* add provider spec

* refactor sleep state reconciliation

* add sleep state reconciliation tests
waveywaves authored Feb 27, 2024
1 parent 67ec5e9 commit d5e15a8
Showing 15 changed files with 476 additions and 293 deletions.
7 changes: 7 additions & 0 deletions config/crd/bases/uffizzi.com_uffizziclusters.yaml
@@ -154,6 +154,13 @@ spec:
type: object
manifests:
type: string
provider:
default: vanila
enum:
- vanila
- gke
- eks
type: string
resourceQuota:
description: UffizziClusterResourceQuota defines the resource quota
which defines the quota of resources a namespace has access to
1 change: 0 additions & 1 deletion controllers/constants/constants.go

This file was deleted.

7 changes: 2 additions & 5 deletions main.go
@@ -63,7 +63,6 @@ func main() {
probeAddr string
enableLeaderElection bool
concurrentReconciliations int
k8sProvider string
)

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
@@ -72,7 +71,6 @@
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.IntVar(&concurrentReconciliations, "concurrent", 5, "The number of concurrent reconciles per controller.")
flag.StringVar(&k8sProvider, "k8s-provider", "", "The k8s provider to use for the UffizziCluster")
opts := zap.Options{
Development: true,
}
@@ -101,9 +99,8 @@ func main() {
}
// Setup UffizziClusterReconciler
if err = (&uffizzicluster.UffizziClusterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
KubernetesProvider: k8sProvider,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "UffizziCluster")
os.Exit(1)
5 changes: 4 additions & 1 deletion src/api/v1alpha1/uffizzicluster_types.go
@@ -143,7 +143,10 @@ type UffizziClusterResourceCount struct {
type UffizziClusterSpec struct {
//+kubebuilder:default:="k3s"
//+kubebuilder:validation:Enum=k3s;k8s
Distro string `json:"distro,omitempty"`
Distro string `json:"distro,omitempty"`
//+kubebuilder:default:="vanila"
//+kubebuilder:validation:Enum=vanila;gke;eks
Provider string `json:"provider,omitempty"`
APIServer UffizziClusterAPIServer `json:"apiServer,omitempty"`
Ingress UffizziClusterIngress `json:"ingress,omitempty"`
TTL string `json:"ttl,omitempty"`
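The provider enum in the generated CRD shown earlier comes from these kubebuilder markers. Below is a minimal sketch, not part of this commit, of constructing a spec that exercises the new field; it assumes the standard kubebuilder scaffolding for the UffizziCluster type, the object name is hypothetical, and "vanila" is deliberately left misspelled because that is the value the enum actually accepts.

package main

import (
	"fmt"

	"github.com/UffizziCloud/uffizzi-cluster-operator/src/api/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	uc := &v1alpha1.UffizziCluster{
		// hypothetical object metadata
		ObjectMeta: metav1.ObjectMeta{Name: "uc-example", Namespace: "default"},
		Spec: v1alpha1.UffizziClusterSpec{
			Distro:   "k3s", // default per the marker above
			Provider: "gke", // accepted values: vanila (default), gke, eks
		},
	}
	fmt.Println(uc.Spec.Distro, uc.Spec.Provider)
}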
197 changes: 78 additions & 119 deletions src/controllers/uffizzicluster/uffizzicluster_controller.go
@@ -19,6 +19,7 @@ package uffizzicluster
import (
"context"
"encoding/json"
"fmt"
"github.com/UffizziCloud/uffizzi-cluster-operator/src/api/v1alpha1"
"github.com/UffizziCloud/uffizzi-cluster-operator/src/pkg/constants"
"github.com/UffizziCloud/uffizzi-cluster-operator/src/pkg/helm/build/vcluster"
@@ -42,8 +43,7 @@ import (
// UffizziClusterReconciler reconciles a UffizziCluster object
type UffizziClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
KubernetesProvider string
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=uffizzi.com,resources=uffizziclusters,verbs=get;list;watch;create;update;patch;delete
@@ -250,15 +250,15 @@ func (r *UffizziClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// ----------------------
// UCLUSTER SLEEP
// ----------------------
//if err := r.reconcileSleepState(ctx, uCluster); err != nil {
// if k8serrors.IsNotFound(err) {
// // logger.Info("vcluster statefulset not found, requeueing")
// return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
// }
// // cluster did not sleep
// logger.Info("Failed to reconcile sleep state, reconciling again", "Error", err.Error())
// return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
//}
if err := r.reconcileSleepState(ctx, uCluster); err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("vcluster statefulset not found, will check again in the next round")
return ctrl.Result{}, nil
}
// cluster did not sleep
logger.Info("Failed to reconcile sleep state, reconciling again", "Error", err.Error())
return ctrl.Result{}, nil
}

return ctrl.Result{}, nil
}
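This hunk re-enables the sleep-state check that was previously commented out, and drops the explicit five-second requeue in favour of plain returns. A short sketch of the contrast, under the assumption (not visible on this page) that the controller watches the workloads it owns, so the next reconcile arrives via watch events rather than a timer:

package sketch

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// requeueStyles contrasts the old and new behaviour of the sleep-state check.
func requeueStyles(oldStyle bool, sleepErr error) (ctrl.Result, error) {
	if sleepErr != nil && oldStyle {
		// old (commented-out) style: poll again in five seconds
		return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, nil
	}
	// new style: return an empty result and let the next watch event,
	// if any, trigger another pass
	return ctrl.Result{}, nil
}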
@@ -299,128 +299,87 @@ func (r *UffizziClusterReconciler) createEgressPolicy(ctx context.Context, uClus
// reconcileSleepState reconciles the sleep state of the vcluster
// it also makes sure that the vcluster is up and running before setting the sleep state
func (r *UffizziClusterReconciler) reconcileSleepState(ctx context.Context, uCluster *v1alpha1.UffizziCluster) error {
// get the patch copy of the uCluster so that we can have a good diff between the uCluster and patch object
//
patch := client.MergeFrom(uCluster.DeepCopy())
var (
// get the patch copy of the uCluster so that we can have a good diff between the uCluster and patch object
patch = client.MergeFrom(uCluster.DeepCopy())
ucWorkload runtime.Object
etcdStatefulSet *appsv1.StatefulSet
err error
)
// get the stateful set or deployment created by the helm chart
ucWorkload, err := r.getUffizziClusterWorkload(ctx, uCluster)
if err != nil {
return err
// k3s is a statefulset, k8s is a deployment
if ucWorkload, err = r.getUffizziClusterWorkload(ctx, uCluster); err != nil {
return fmt.Errorf("failed to get uffizzicluster workload: %w", err)
}
// get the etcd stateful set created by the helm chart
etcdStatefulSet, err := r.getEtcdStatefulSet(ctx, uCluster)
if uCluster.Spec.ExternalDatastore == constants.ETCD {
if etcdStatefulSet, err = r.getEtcdStatefulSet(ctx, uCluster); err != nil {
return fmt.Errorf("failed to get etcd statefulset: %w", err)
}
}
// execute sleep reconciliation based on the type of workload
// TODO: Abstract the actual sleep reconciliation logic into a separate function so that it can be reused
// for different types of workloads, i.e. statefulset, deployment, daemonset
switch ucWorkload.(type) {
case *appsv1.StatefulSet:
ucStatefulSet := ucWorkload.(*appsv1.StatefulSet)
currentReplicas := ucStatefulSet.Spec.Replicas
// scale the vcluster instance to 0 if the sleep flag is true
if uCluster.Spec.Sleep && *currentReplicas > 0 {
var err error
if err = r.waitForStatefulSetToScale(ctx, 0, ucStatefulSet); err == nil {
setCondition(uCluster, APINotReady())
}
if uCluster.Spec.ExternalDatastore == constants.ETCD {
if err = r.waitForStatefulSetToScale(ctx, 0, etcdStatefulSet); err == nil {
setCondition(uCluster, DataStoreNotReady())
}
} else {
setCondition(uCluster, DataStoreNotReady())
}
if err != nil {
return err
}
err = r.deleteWorkloads(ctx, uCluster)
if err != nil {
return err
}
sleepingTime := metav1.Now().Rfc3339Copy()
setCondition(uCluster, Sleeping(sleepingTime))
// if the current replicas is 0, then do nothing
} else if !uCluster.Spec.Sleep && *currentReplicas == 0 {
statefulSets := []*appsv1.StatefulSet{}
statefulSets = append(statefulSets, ucStatefulSet)
if uCluster.Spec.ExternalDatastore == constants.ETCD {
statefulSets = append(statefulSets, etcdStatefulSet)
}
if err := r.scaleStatefulSets(ctx, 1, statefulSets...); err != nil {
return err
}
currentReplicas := r.getReplicasForWorkload(ucWorkload)
// scale the vcluster instance to 0 if the sleep flag is true
if uCluster.Spec.Sleep && currentReplicas > 0 {
if err = r.scaleWorkloads(ctx, 0, ucWorkload); err != nil {
return err
}
// ensure that the statefulset is up if the cluster is not sleeping
if !uCluster.Spec.Sleep {
// set status for vcluster waking up
lastAwakeTime := metav1.Now().Rfc3339Copy()
uCluster.Status.LastAwakeTime = lastAwakeTime
// if the above runs successfully, then set the status to awake
setCondition(uCluster, Awoken(lastAwakeTime))
var err error
if uCluster.Spec.ExternalDatastore == constants.ETCD {
if err = r.waitForStatefulSetToScale(ctx, 1, etcdStatefulSet); err == nil {
setCondition(uCluster, DataStoreReady())
}
} else {
setCondition(uCluster, DataStoreReady())
}
if err = r.waitForStatefulSetToScale(ctx, 1, ucStatefulSet); err == nil {
setCondition(uCluster, APIReady())
}
if err != nil {
return err
}
if err = r.waitForWorkloadToScale(ctx, 0, ucWorkload); err == nil {
setCondition(uCluster, APINotReady())
}
case *appsv1.Deployment:
ucDeployment := ucWorkload.(*appsv1.Deployment)
currentReplicas := ucDeployment.Spec.Replicas
// scale the vcluster instance to 0 if the sleep flag is true
if uCluster.Spec.Sleep && *currentReplicas > 0 {
var err error
if err = r.waitForDeploymentToScale(ctx, 0, ucDeployment); err == nil {
setCondition(uCluster, APINotReady())
}
if err = r.waitForStatefulSetToScale(ctx, 0, etcdStatefulSet); err == nil {
setCondition(uCluster, DataStoreNotReady())
}
if err != nil {
return err
}

err = r.deleteWorkloads(ctx, uCluster)
if err != nil {
if uCluster.Spec.ExternalDatastore == constants.ETCD {
if err = r.scaleWorkloads(ctx, 0, etcdStatefulSet); err != nil {
return err
}
sleepingTime := metav1.Now().Rfc3339Copy()
setCondition(uCluster, Sleeping(sleepingTime))
// if the current replicas is 0, then do nothing
} else if !uCluster.Spec.Sleep && *currentReplicas == 0 {
if err := r.scaleDeployments(ctx, 1, ucDeployment); err != nil {
return err
if err = r.waitForWorkloadToScale(ctx, 0, etcdStatefulSet); err == nil {
setCondition(uCluster, DataStoreNotReady())
}
} else {
setCondition(uCluster, DataStoreNotReady())
}
// ensure that the deployment is up if the cluster is not sleeping
if !uCluster.Spec.Sleep {
// set status for vcluster waking up
lastAwakeTime := metav1.Now().Rfc3339Copy()
uCluster.Status.LastAwakeTime = lastAwakeTime
// if the above runs successfully, then set the status to awake
setCondition(uCluster, Awoken(lastAwakeTime))
var err error
if err = r.waitForStatefulSetToScale(ctx, 1, etcdStatefulSet); err == nil {
setCondition(uCluster, APIReady())
}
if err = r.waitForDeploymentToScale(ctx, 1, ucDeployment); err == nil {
if err != nil {
return err
}
err = r.deleteWorkloads(ctx, uCluster)
if err != nil {
return err
}
sleepingTime := metav1.Now().Rfc3339Copy()
setCondition(uCluster, Sleeping(sleepingTime))
// if the current replicas is 0, then do nothing
} else if !uCluster.Spec.Sleep && currentReplicas == 0 {
workloads := []runtime.Object{}
workloads = append(workloads, ucWorkload)
if uCluster.Spec.ExternalDatastore == constants.ETCD {
workloads = append(workloads, etcdStatefulSet)
}
if err := r.scaleWorkloads(ctx, 1, workloads...); err != nil {
return err
}
}
// ensure that the statefulset is up if the cluster is not sleeping
if !uCluster.Spec.Sleep {
// set status for vcluster waking up
lastAwakeTime := metav1.Now().Rfc3339Copy()
uCluster.Status.LastAwakeTime = lastAwakeTime
// if the above runs successfully, then set the status to awake
setCondition(uCluster, Awoken(lastAwakeTime))
var err error
if uCluster.Spec.ExternalDatastore == constants.ETCD {
if err = r.waitForWorkloadToScale(ctx, 1, etcdStatefulSet); err == nil {
setCondition(uCluster, DataStoreReady())
}
if err != nil {
return err
}
} else {
setCondition(uCluster, DataStoreReady())
}
if err = r.waitForWorkloadToScale(ctx, 1, ucWorkload); err == nil {
setCondition(uCluster, APIReady())
}
if err != nil {
return err
}

default:
return errors.New("unknown workload type for vcluster")
}

if err := r.Status().Patch(ctx, uCluster, patch); err != nil {
return err
}
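reconcileSleepState is driven entirely by spec.sleep and the observed replica count. A hedged sketch of the client side (not code from this repository) showing how a caller might flip the flag, reusing the same MergeFrom patch style the controller itself uses:

package sketch

import (
	"context"

	"github.com/UffizziCloud/uffizzi-cluster-operator/src/api/v1alpha1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// setSleep flips spec.sleep on an existing UffizziCluster. On the next
// reconcile the operator scales the vcluster workload (and etcd, when it is
// the external datastore) down to zero and records the Sleeping condition,
// or scales everything back up and records Awoken.
func setSleep(ctx context.Context, c client.Client, key types.NamespacedName, sleep bool) error {
	uc := &v1alpha1.UffizziCluster{}
	if err := c.Get(ctx, key, uc); err != nil {
		return err
	}
	patch := client.MergeFrom(uc.DeepCopy())
	uc.Spec.Sleep = sleep
	return c.Patch(ctx, uc, patch)
}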
62 changes: 62 additions & 0 deletions src/controllers/uffizzicluster/workload.go
@@ -2,9 +2,11 @@ package uffizzicluster

import (
context "context"
"fmt"
"github.com/UffizziCloud/uffizzi-cluster-operator/src/api/v1alpha1"
"github.com/UffizziCloud/uffizzi-cluster-operator/src/pkg/constants"
"github.com/UffizziCloud/uffizzi-cluster-operator/src/pkg/helm/build/vcluster"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,3 +35,63 @@ func (r *UffizziClusterReconciler) deleteWorkloads(ctx context.Context, uc *v1al
}
return nil
}

func (r *UffizziClusterReconciler) scaleWorkloads(ctx context.Context, scale int, workloads ...runtime.Object) error {
sss := []*appsv1.StatefulSet{}
ds := []*appsv1.Deployment{}
for _, w := range workloads {
if w != nil {
switch w.(type) {
case *appsv1.StatefulSet:
ss := w.(*appsv1.StatefulSet)
sss = append(sss, ss)
case *appsv1.Deployment:
d := w.(*appsv1.Deployment)
ds = append(ds, d)
}
}
}
if len(sss) > 0 {
if err := r.scaleStatefulSets(ctx, scale, sss...); err != nil {
return fmt.Errorf("failed to scale stateful sets: %w", err)
}
}
if len(ds) > 0 {
if err := r.scaleDeployments(ctx, scale, ds...); err != nil {
return fmt.Errorf("failed to scale deployments: %w", err)
}
}
return nil
}

func (r *UffizziClusterReconciler) waitForWorkloadToScale(ctx context.Context, scale int, w runtime.Object) error {
if w != nil {
switch w.(type) {
case *appsv1.StatefulSet:
ss := w.(*appsv1.StatefulSet)
if err := r.waitForStatefulSetToScale(ctx, scale, ss); err != nil {
return fmt.Errorf("failed to wait for stateful sets to scale: %w", err)
}
case *appsv1.Deployment:
d := w.(*appsv1.Deployment)
if err := r.waitForDeploymentToScale(ctx, scale, d); err != nil {
return fmt.Errorf("failed to wait for deployments to scale: %w", err)
}
}
}
return nil
}

func (r *UffizziClusterReconciler) getReplicasForWorkload(w runtime.Object) int {
if w != nil {
switch w.(type) {
case *appsv1.StatefulSet:
ss := w.(*appsv1.StatefulSet)
return int(*ss.Spec.Replicas)
case *appsv1.Deployment:
d := w.(*appsv1.Deployment)
return int(*d.Spec.Replicas)
}
}
return 0
}
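The commit message mentions new sleep reconciliation tests, but those files are not shown on this page. For the helpers above, a hypothetical table-driven test might look like the following; the cases and names are illustrative only. Note that getReplicasForWorkload dereferences Spec.Replicas, so fixtures must set it explicitly.

package uffizzicluster

import (
	"testing"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

func int32Ptr(i int32) *int32 { return &i }

// TestGetReplicasForWorkload checks the replica lookup for both workload
// kinds plus the nil case, which should fall through to zero.
func TestGetReplicasForWorkload(t *testing.T) {
	r := &UffizziClusterReconciler{}
	cases := []struct {
		name string
		w    runtime.Object
		want int
	}{
		{"statefulset", &appsv1.StatefulSet{Spec: appsv1.StatefulSetSpec{Replicas: int32Ptr(1)}}, 1},
		{"deployment", &appsv1.Deployment{Spec: appsv1.DeploymentSpec{Replicas: int32Ptr(0)}}, 0},
		{"nil workload", nil, 0},
	}
	for _, c := range cases {
		if got := r.getReplicasForWorkload(c.w); got != c.want {
			t.Errorf("%s: got %d, want %d", c.name, got, c.want)
		}
	}
}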
13 changes: 6 additions & 7 deletions src/pkg/constants/constants.go
@@ -17,6 +17,9 @@ const (
LOFT_CHART_REPO_URL = "https://charts.loft.sh"
VCLUSTER_K3S_DISTRO = "k3s"
VCLUSTER_K8S_DISTRO = "k8s"
PROVIDER_NOTHING = "vanila"
PROVIDER_GKE = "gke"
PROVIDER_EKS = "eks"
K3S_DATASTORE_ENDPOINT = "K3S_DATASTORE_ENDPOINT"
VCLUSTER_INGRESS_HOSTNAME = "VCLUSTER_INGRESS_HOST"
DEFAULT_K3S_VERSION = "rancher/k3s:v1.27.3-k3s1"
@@ -31,13 +34,9 @@
)

type LIFECYCLE_OP_TYPE string
type KUBERNETES_PROVIDER string

const (
LIFECYCLE_OP_TYPE_CREATE LIFECYCLE_OP_TYPE = "create"
LIFECYCLE_OP_TYPE_UPDATE LIFECYCLE_OP_TYPE = "update"
LIFECYCLE_OP_TYPE_DELETE LIFECYCLE_OP_TYPE = "delete"
NO_KUBE_PROVIDER KUBERNETES_PROVIDER = ""
GKE_KUBE_PROVIDER KUBERNETES_PROVIDER = "gke"
EKS_KUBE_PROVIDER KUBERNETES_PROVIDER = "eks"
LIFECYCLE_OP_TYPE_CREATE LIFECYCLE_OP_TYPE = "create"
LIFECYCLE_OP_TYPE_UPDATE LIFECYCLE_OP_TYPE = "update"
LIFECYCLE_OP_TYPE_DELETE LIFECYCLE_OP_TYPE = "delete"
)
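No call sites for the new provider constants appear in this diff, so the following is only a sketch of the kind of provider-specific branch the new spec field enables; the storage-class mapping is a hypothetical example, not behaviour of this operator.

package sketch

import (
	"github.com/UffizziCloud/uffizzi-cluster-operator/src/api/v1alpha1"
	"github.com/UffizziCloud/uffizzi-cluster-operator/src/pkg/constants"
)

// storageClassFor is a hypothetical helper mapping the cluster's provider to
// a storage class name, with no class for the default "vanila" provider.
func storageClassFor(uc *v1alpha1.UffizziCluster) string {
	switch uc.Spec.Provider {
	case constants.PROVIDER_GKE:
		return "standard-rwo" // GKE's default CSI storage class
	case constants.PROVIDER_EKS:
		return "gp2" // a common EBS storage class on EKS
	default: // constants.PROVIDER_NOTHING, i.e. "vanila"
		return ""
	}
}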