diff --git a/pkg/cmd/kubernikus/operator.go b/pkg/cmd/kubernikus/operator.go index 3f1b5eb9c3..bacff3a1a4 100644 --- a/pkg/cmd/kubernikus/operator.go +++ b/pkg/cmd/kubernikus/operator.go @@ -8,6 +8,7 @@ import ( "sync" "syscall" + "github.com/go-kit/kit/log" "github.com/golang/glog" "github.com/sapcc/kubernikus/pkg/cmd" "github.com/sapcc/kubernikus/pkg/controller" @@ -86,12 +87,16 @@ func (o *Options) Complete(args []string) error { } func (o *Options) Run(c *cobra.Command) error { + var logger log.Logger + logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + sigs := make(chan os.Signal, 1) stop := make(chan struct{}) signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) // Push signals into channel wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on - go controller.NewKubernikusOperator(&o.KubernikusOperatorOptions).Run(stop, wg) + go controller.NewKubernikusOperator(&o.KubernikusOperatorOptions, logger).Run(stop, wg) go controller.ExposeMetrics(o.MetricPort, stop, wg) <-sigs // Wait for signals (this hangs until a signal arrives) diff --git a/pkg/controller/base.go b/pkg/controller/base.go deleted file mode 100644 index 38f88ca1c5..0000000000 --- a/pkg/controller/base.go +++ /dev/null @@ -1,143 +0,0 @@ -package controller - -import ( - "fmt" - "reflect" - "sync" - "time" - - "github.com/golang/glog" - "github.com/sapcc/kubernikus/pkg/controller/config" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" -) - -type BaseController interface { - config.Controller - reconcile(key string) error -} - -type Base struct { - Clients - queue workqueue.RateLimitingInterface - informer cache.SharedIndexInformer - Controller BaseController -} - -func NewBaseController(clients Clients, informer cache.SharedIndexInformer) Base { - base := Base{ - Clients: clients, - queue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)), - informer: informer, - } - - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err == nil { - base.queue.Add(key) - } - }, - UpdateFunc: func(old interface{}, new interface{}) { - key, err := cache.MetaNamespaceKeyFunc(new) - if err == nil { - base.queue.Add(key) - } - }, - DeleteFunc: func(obj interface{}) { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err == nil { - base.queue.Add(key) - } - }, - }) - - return base -} - -func (base *Base) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer base.queue.ShutDown() - defer wg.Done() - wg.Add(1) - glog.Infof("Starting %v with %d workers", base.getName(), threadiness) - - for i := 0; i < threadiness; i++ { - go wait.Until(base.runWorker, time.Second, stopCh) - } - - ticker := time.NewTicker(KLUSTER_RECHECK_INTERVAL) - go func() { - for { - select { - case <-ticker.C: - for _, key := range base.informer.GetStore().ListKeys() { - base.queue.Add(key) - } - case <-stopCh: - ticker.Stop() - return - } - } - }() - - <-stopCh -} - -func (base *Base) runWorker() { - for base.processNextWorkItem() { - } -} - -func (base *Base) processNextWorkItem() bool { - key, quit := base.queue.Get() - if quit { - return false - } - defer base.queue.Done(key) - - // Invoke the method containing the business logic - err := base.reconciliation(key.(string)) - base.handleErr(err, key) - 
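// Illustrative sketch, not part of this diff: the cmd/kubernikus/operator.go hunk above
// constructs a go-kit logger and threads it into the operator. This minimal, self-contained
// example shows that construction and how per-controller loggers are derived with log.With;
// only the go-kit/kit/log API already imported by the diff is assumed.
package main

import (
	"os"

	"github.com/go-kit/kit/log"
)

func main() {
	var logger log.Logger
	logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))                            // logfmt output, safe for concurrent writers
	logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) // base context added to every entry

	// Controllers derive child loggers so every log line carries a "controller" key.
	launchLogger := log.With(logger, "controller", "launch")
	launchLogger.Log("msg", "starting run loop", "threadiness", 1)
}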
return true -} - -func (base *Base) reconcile(key string) error { - return fmt.Errorf("NotImplemented") -} - -func (base *Base) handleErr(err error, key interface{}) { - if err == nil { - // Forget about the #AddRateLimited history of the key on every successful synchronization. - // This ensures that future processing of updates for this key is not delayed because of - // an outdated error history. - base.queue.Forget(key) - return - } - - glog.Infof("[%v] Error while processing %v: %v", base.getName(), key, err) - // This controller retries 5 times if something goes wrong. After that, it stops trying. - if base.queue.NumRequeues(key) < 5 { - // Re-enqueue the key rate limited. Based on the rate limiter on the - // queue and the re-enqueue history, the key will be processed later again. - base.queue.AddRateLimited(key) - return - } - - glog.V(5).Infof("[%v] Dropping %v from queue because of too many errors...", base.getName(), key) - base.queue.Forget(key) -} - -func getControllerName(c config.Controller) string { - return reflect.TypeOf(c).Elem().Name() - -} - -func (base *Base) getName() string { - return getControllerName(base.Controller) -} - -func (base *Base) reconciliation(key string) error { - glog.V(5).Infof("[%v] Reconciling %v", base.getName(), key) - return base.Controller.reconcile(key) -} diff --git a/pkg/controller/base/controller.go b/pkg/controller/base/controller.go new file mode 100644 index 0000000000..cd0066f714 --- /dev/null +++ b/pkg/controller/base/controller.go @@ -0,0 +1,192 @@ +package base + +import ( + "errors" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/sapcc/kubernikus/pkg/apis/kubernikus/v1" + "github.com/sapcc/kubernikus/pkg/controller/config" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +const ( + BASE_DELAY = 5 * time.Second + MAX_DELAY = 300 * time.Second + KLUSTER_RECHECK_INTERVAL = 5 * time.Minute +) + +var ErrUnkownKluster = errors.New("unkown kluster") + +type Controller interface { + Run(int, <-chan struct{}, *sync.WaitGroup) +} + +type Reconciler interface { + Reconcile(kluster *v1.Kluster) (bool, error) +} + +type controller struct { + config.Factories + config.Clients + + queue workqueue.RateLimitingInterface + reconciler Reconciler + + logger log.Logger +} + +func NewController(factories config.Factories, clients config.Clients, reconciler Reconciler, logger log.Logger) Controller { + c := &controller{ + Factories: factories, + Clients: clients, + queue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(BASE_DELAY, MAX_DELAY)), + reconciler: reconciler, + logger: logger, + } + + c.Factories.Kubernikus.Kubernikus().V1().Klusters().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + UpdateFunc: func(old interface{}, new interface{}) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + c.queue.Add(key) + } + }, + DeleteFunc: func(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + }) + + return c +} + +func (c *controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) { + c.logger.Log( + "msg", "starting run loop", + "threadiness", threadiness, + ) + + defer c.queue.ShutDown() + defer wg.Done() + wg.Add(1) + + for i := 0; i < threadiness; i++ { + go 
wait.Until(c.runWorker, time.Second, stopCh) + } + + ticker := time.NewTicker(KLUSTER_RECHECK_INTERVAL) + go func() { + for { + select { + case <-ticker.C: + c.requeueAllKlusters() + case <-stopCh: + ticker.Stop() + return + } + } + }() + + <-stopCh +} + +func (c *controller) requeueAllKlusters() (err error) { + defer func() { + c.logger.Log( + "msg", "requeued all", + "err", err, + ) + }() + + klusters, err := c.Factories.Kubernikus.Kubernikus().V1().Klusters().Lister().List(labels.Everything()) + if err != nil { + return err + } + + for _, kluster := range klusters { + c.requeueKluster(kluster) + } + + return nil +} + +func (c *controller) requeueKluster(kluster *v1.Kluster) { + c.logger.Log( + "msg", "queuing", + "kluster", kluster.Spec.Name, + "project", kluster.Account(), + ) + + key, err := cache.MetaNamespaceKeyFunc(kluster) + if err == nil { + c.queue.Add(key) + } +} + +func (c *controller) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *controller) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + var err error + var kluster *v1.Kluster + var requeue bool + + obj, exists, _ := c.Factories.Kubernikus.Kubernikus().V1().Klusters().Informer().GetIndexer().GetByKey(key.(string)) + if !exists { + err = ErrUnkownKluster + } else { + kluster = obj.(*v1.Kluster) + } + + if err == nil { + // Invoke the method containing the business logic + requeue, err = c.reconciler.Reconcile(kluster) + } + + if err == nil { + // Forget about the #AddRateLimited history of the key on every successful synchronization. + // This ensures that future processing of updates for this key is not delayed because of + // an outdated error history. + + if requeue == false { + c.queue.Forget(key) + } else { + // Requeue requested + c.queue.AddAfter(key, BASE_DELAY) + } + + return true + } + + if c.queue.NumRequeues(key) < 5 { + // Re-enqueue the key rate limited. Based on the rate limiter on the + // queue and the re-enqueue history, the key will be processed later again. + c.queue.AddRateLimited(key) + return true + } + + // Retries exceeded. 
Forgetting for this reconciliation loop + c.queue.Forget(key) + return true +} diff --git a/pkg/controller/base/logging.go b/pkg/controller/base/logging.go new file mode 100644 index 0000000000..117c2e1c17 --- /dev/null +++ b/pkg/controller/base/logging.go @@ -0,0 +1,49 @@ +package base + +import ( + "fmt" + "time" + + "github.com/go-kit/kit/log" + "github.com/sapcc/kubernikus/pkg/apis/kubernikus/v1" +) + +var RECONCILLIATION_COUNTER = 0 + +type LoggingReconciler struct { + Reconciler Reconciler + Logger log.Logger +} + +type EventingReconciler struct { + Reconciler +} + +type InstrumentedReconciler struct { + Reconciler +} + +func (r *InstrumentedReconciler) Reconcile(kluster *v1.Kluster) (requeue bool, err error) { + defer func() { + RECONCILLIATION_COUNTER = RECONCILLIATION_COUNTER + 1 + fmt.Printf("Metrics: Reconciled %v kluster\n", RECONCILLIATION_COUNTER) + }() + return r.Reconciler.Reconcile(kluster) +} + +func (r *EventingReconciler) Reconcile(kluster *v1.Kluster) (requeue bool, err error) { + fmt.Printf("EVENT: Reconciled %v\n", kluster.Name) + return r.Reconciler.Reconcile(kluster) +} + +func (r *LoggingReconciler) Reconcile(kluster *v1.Kluster) (requeue bool, err error) { + defer func(begin time.Time) { + r.Logger.Log( + "msg", "reconciled kluster", + "kluster", kluster.Name, + "reqeue", requeue, + "took", time.Since(begin), + "err", err) + }(time.Now()) + return r.Reconciler.Reconcile(kluster) +} diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index a0c2259b5f..963849c8c4 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -1,6 +1,17 @@ package config -import "sync" +import ( + "sync" + + kube "github.com/sapcc/kubernikus/pkg/client/kubernetes" + kubernikus_clientset "github.com/sapcc/kubernikus/pkg/generated/clientset" + kubernikus_informers "github.com/sapcc/kubernikus/pkg/generated/informers/externalversions" + kubernetes_informers "k8s.io/client-go/informers" + kubernetes_clientset "k8s.io/client-go/kubernetes" + + "github.com/sapcc/kubernikus/pkg/client/openstack" + "k8s.io/helm/pkg/helm" +) type Controller interface { Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) @@ -32,3 +43,16 @@ type Config struct { Kubernikus KubernikusConfig Helm HelmConfig } + +type Clients struct { + Kubernikus kubernikus_clientset.Interface + Kubernetes kubernetes_clientset.Interface + Satellites *kube.SharedClientFactory + Openstack openstack.Client + Helm *helm.Client +} + +type Factories struct { + Kubernikus kubernikus_informers.SharedInformerFactory + Kubernetes kubernetes_informers.SharedInformerFactory +} diff --git a/pkg/controller/events/event.go b/pkg/controller/events/event.go new file mode 100644 index 0000000000..8e96b12945 --- /dev/null +++ b/pkg/controller/events/event.go @@ -0,0 +1,8 @@ +package events + +const ( + FailedCreateNode = "FailedCreateNode" + FailedDeleteNode = "FailedDeleteNode" + SuccessfullCreateNode = "SuccessfullCreateNode" + SuccessfullDeleteNode = "SuccessfullDeleteNode" +) diff --git a/pkg/controller/ground.go b/pkg/controller/ground.go index f1077daa57..61caede878 100644 --- a/pkg/controller/ground.go +++ b/pkg/controller/ground.go @@ -39,8 +39,8 @@ const ( ) type GroundControl struct { - Clients - Factories + config.Clients + config.Factories config.Config Recorder record.EventRecorder @@ -49,7 +49,7 @@ type GroundControl struct { podInformer cache.SharedIndexInformer } -func NewGroundController(factories Factories, clients Clients, recorder record.EventRecorder, config 
config.Config) *GroundControl { +func NewGroundController(factories config.Factories, clients config.Clients, recorder record.EventRecorder, config config.Config) *GroundControl { operator := &GroundControl{ Clients: clients, Factories: factories, diff --git a/pkg/controller/launch.go b/pkg/controller/launch.go deleted file mode 100644 index f0b704b3e5..0000000000 --- a/pkg/controller/launch.go +++ /dev/null @@ -1,333 +0,0 @@ -package controller - -import ( - "fmt" - "reflect" - "sync" - "time" - - "github.com/golang/glog" - "github.com/sapcc/kubernikus/pkg/api/models" - "github.com/sapcc/kubernikus/pkg/apis/kubernikus/v1" - "github.com/sapcc/kubernikus/pkg/client/openstack" - "github.com/sapcc/kubernikus/pkg/templates" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" -) - -type LaunchControl struct { - Factories - Clients - queue workqueue.RateLimitingInterface -} - -func NewLaunchController(factories Factories, clients Clients) *LaunchControl { - launchctl := &LaunchControl{ - Factories: factories, - Clients: clients, - queue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)), - } - - launchctl.Factories.Kubernikus.Kubernikus().V1().Klusters().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err == nil { - launchctl.queue.Add(key) - } - }, - UpdateFunc: func(old interface{}, new interface{}) { - key, err := cache.MetaNamespaceKeyFunc(new) - if err == nil { - launchctl.queue.Add(key) - } - }, - DeleteFunc: func(obj interface{}) { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err == nil { - launchctl.queue.Add(key) - } - }, - }) - - return launchctl -} - -func (launchctl *LaunchControl) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer launchctl.queue.ShutDown() - defer wg.Done() - wg.Add(1) - glog.Infof(`Starting LaunchControl with %d "threads"`, threadiness) - - for i := 0; i < threadiness; i++ { - go wait.Until(launchctl.runWorker, time.Second, stopCh) - } - - ticker := time.NewTicker(KLUSTER_RECHECK_INTERVAL) - go func() { - for { - select { - case <-ticker.C: - glog.V(2).Infof("Running periodic recheck. Queuing all Klusters...") - - klusters, err := launchctl.Factories.Kubernikus.Kubernikus().V1().Klusters().Lister().List(labels.Everything()) - if err != nil { - glog.Errorf("Couldn't run periodic recheck. 
Listing klusters failed: %v", err) - } - - for _, kluster := range klusters { - key, err := cache.MetaNamespaceKeyFunc(kluster) - if err == nil { - launchctl.queue.Add(key) - } - } - case <-stopCh: - ticker.Stop() - return - } - } - }() - - <-stopCh -} - -func (launchctl *LaunchControl) runWorker() { - for launchctl.processNextWorkItem() { - } -} - -func (launchctl *LaunchControl) processNextWorkItem() bool { - key, quit := launchctl.queue.Get() - if quit { - return false - } - defer launchctl.queue.Done(key) - - // Invoke the method containing the business logic - err := launchctl.reconcile(key.(string)) - launchctl.handleErr(err, key) - return true -} - -func (launchctl *LaunchControl) requeue(kluster *v1.Kluster) { - key, err := cache.MetaNamespaceKeyFunc(kluster) - if err == nil { - launchctl.queue.AddAfter(key, 5*time.Second) - } -} - -func (launchctl *LaunchControl) reconcile(key string) error { - obj, exists, err := launchctl.Factories.Kubernikus.Kubernikus().V1().Klusters().Informer().GetIndexer().GetByKey(key) - if err != nil { - return fmt.Errorf("Failed to fetch key %s from cache: %s", key, err) - } - if !exists { - glog.Infof("[%v] Kluster deleted in the meantime", key) - return nil - } - - kluster := obj.(*v1.Kluster) - glog.V(5).Infof("[%v] Reconciling", kluster.Name) - - if !(kluster.Status.Phase == models.KlusterPhaseRunning || kluster.Status.Phase == models.KlusterPhaseTerminating) { - //Kluster not ready yet, do nothing - return nil - } - - for _, pool := range kluster.Spec.NodePools { - err := launchctl.syncPool(kluster, &pool) - if err != nil { - return err - } - } - - return nil -} - -func (launchctl *LaunchControl) syncPool(kluster *v1.Kluster, pool *models.NodePool) error { - nodes, err := launchctl.Clients.Openstack.GetNodes(kluster, pool) - if err != nil { - return fmt.Errorf("[%v] Couldn't list nodes for pool %v: %v", kluster.Name, pool.Name, err) - } - - if kluster.Status.Phase == models.KlusterPhaseTerminating { - if toBeTerminated(nodes) > 0 { - glog.V(3).Infof("[%v] Kluster is terminating. Terminating Nodes for Pool %v.", kluster.Name, pool.Name) - for _, node := range nodes { - err := launchctl.terminateNode(kluster, pool, node.ID) - if err != nil { - return err - } - } - } - - return nil - } - - running := running(nodes) - starting := starting(nodes) - ready := running + starting - - setMetricNodePoolSize(kluster.GetName(), pool.Name, pool.Image, pool.Flavor, pool.Size) - setMetricNodePoolStatus(kluster.GetName(), pool.Name, map[string]int64{"running": running, "starting": starting, "ready": ready}) - - switch { - case ready < pool.Size: - glog.V(3).Infof("[%v] Pool %v: Starting/Running/Total: %v/%v/%v. Too few nodes. Need to spawn more.", kluster.Name, pool.Name, starting, running, pool.Size) - return launchctl.createNode(kluster, pool) - case ready > pool.Size: - glog.V(3).Infof("[%v] Pool %v: Starting/Running/Total: %v/%v/%v. Too many nodes. Need to delete some.", kluster.Name, pool.Name, starting, running, pool.Size) - return launchctl.terminateNode(kluster, pool, nodes[0].ID) - case ready == pool.Size: - glog.V(3).Infof("[%v] Pool %v: Starting/Running/Total: %v/%v/%v. All good. 
Doing nothing.", kluster.Name, pool.Name, starting, running, pool.Size) - } - - if err = launchctl.updateNodePoolStatus(kluster, pool); err != nil { - return err - } - - return nil -} - -func (launchctl *LaunchControl) createNode(kluster *v1.Kluster, pool *models.NodePool) error { - glog.V(2).Infof("[%v] Pool %v: Creating new node", kluster.Name, pool.Name) - secret, err := launchctl.Clients.Kubernetes.CoreV1().Secrets(kluster.Namespace).Get(kluster.GetName(), metav1.GetOptions{}) - if err != nil { - return err - } - - userdata, err := templates.Ignition.GenerateNode(kluster, secret) - if err != nil { - glog.Errorf("Ignition userdata couldn't be generated: %v", err) - } - - id, err := launchctl.Clients.Openstack.CreateNode(kluster, pool, userdata) - if err != nil { - return err - } - - glog.V(2).Infof("[%v] Pool %v: Created node %v.", kluster.Name, pool.Name, id) - if err = launchctl.updateNodePoolStatus(kluster, pool); err != nil { - return err - } - - launchctl.requeue(kluster) - return nil -} - -func (launchctl *LaunchControl) terminateNode(kluster *v1.Kluster, pool *models.NodePool, id string) error { - err := launchctl.Clients.Openstack.DeleteNode(kluster, id) - if err != nil { - return err - } - - glog.V(2).Infof("[%v] Pool %v: Deleted node %v.", kluster.Name, pool.Name, id) - if err = launchctl.updateNodePoolStatus(kluster, pool); err != nil { - return err - } - - launchctl.requeue(kluster) - return nil -} - -func (launchctl *LaunchControl) updateNodePoolStatus(kluster *v1.Kluster, pool *models.NodePool) error { - nodes, err := launchctl.Clients.Openstack.GetNodes(kluster, pool) - if err != nil { - return fmt.Errorf("[%v] Couldn't list nodes for pool %v: %v", kluster.Name, pool.Name, err) - } - - running := running(nodes) - starting := starting(nodes) - - newInfo := models.NodePoolInfo{ - Name: pool.Name, - Size: pool.Size, - Running: running + starting, // Should be running only - Healthy: running, - Schedulable: running, - } - - copy, err := launchctl.Clients.Kubernikus.Kubernikus().Klusters(kluster.Namespace).Get(kluster.Name, metav1.GetOptions{}) - if err != nil { - return err - } - - for i, curInfo := range copy.Status.NodePools { - if curInfo.Name == newInfo.Name { - if reflect.DeepEqual(curInfo, newInfo) { - return nil - } - - copy.Status.NodePools[i] = newInfo - _, err = launchctl.Clients.Kubernikus.Kubernikus().Klusters(copy.Namespace).Update(copy) - return err - } - } - //status for nodepool not there yet. add it. - copy.Status.NodePools = append(copy.Status.NodePools, newInfo) - _, err = launchctl.Clients.Kubernikus.Kubernikus().Klusters(copy.Namespace).Update(copy) - return err -} - -func (launchctl *LaunchControl) handleErr(err error, key interface{}) { - if err == nil { - // Forget about the #AddRateLimited history of the key on every successful synchronization. - // This ensures that future processing of updates for this key is not delayed because of - // an outdated error history. - launchctl.queue.Forget(key) - return - } - - glog.Errorf("[%v] An error occured: %v", key, err) - - // This controller retries 5 times if something goes wrong. After that, it stops trying. - if launchctl.queue.NumRequeues(key) < 5 { - glog.V(6).Infof("Error while managing nodes for kluster %q: %v", key, err) - - // Re-enqueue the key rate limited. Based on the rate limiter on the - // queue and the re-enqueue history, the key will be processed later again. - launchctl.queue.AddRateLimited(key) - return - } - - launchctl.queue.Forget(key) - glog.V(5).Infof("[%v] Dropping out of the queue. 
Too many retries...", key) -} - -func starting(nodes []openstack.Node) int64 { - var count int64 = 0 - for _, n := range nodes { - if n.Starting() { - count = count + 1 - } - } - - return count -} - -func running(nodes []openstack.Node) int64 { - var count int64 = 0 - for _, n := range nodes { - if n.Running() { - count = count + 1 - } - } - - return count -} - -func toBeTerminated(nodes []openstack.Node) int64 { - var toBeTerminated int64 = 0 - - for _, n := range nodes { - if n.Stopping() { - continue - } - - toBeTerminated = toBeTerminated + 1 - } - - return toBeTerminated -} diff --git a/pkg/controller/launch/controller.go b/pkg/controller/launch/controller.go new file mode 100644 index 0000000000..a1c086a53e --- /dev/null +++ b/pkg/controller/launch/controller.go @@ -0,0 +1,262 @@ +package launch + +import ( + "github.com/sapcc/kubernikus/pkg/api/models" + "github.com/sapcc/kubernikus/pkg/apis/kubernikus/v1" + "github.com/sapcc/kubernikus/pkg/client/openstack" + "github.com/sapcc/kubernikus/pkg/controller/base" + "github.com/sapcc/kubernikus/pkg/controller/config" + "github.com/sapcc/kubernikus/pkg/templates" + + "github.com/go-kit/kit/log" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" +) + +type PoolManager interface { + GetStatus() (*PoolStatus, error) + SetStatus(*PoolStatus) error + CreateNode() (string, error) + DeleteNode(string) error +} + +type LaunchReconciler struct { + config.Clients + + Recorder record.EventRecorder + Logger log.Logger +} + +type PoolStatus struct { + Nodes []string + Running int + Starting int + Stopping int + Needed int + UnNeeded int +} + +type ConcretePoolManager struct { + config.Clients + + Kluster *v1.Kluster + Pool *models.NodePool + Logger log.Logger +} + +func NewController(factories config.Factories, clients config.Clients, recorder record.EventRecorder, logger log.Logger) base.Controller { + logger = log.With(logger, + "controller", "launch") + + var reconciler base.Reconciler + reconciler = &LaunchReconciler{clients, recorder, logger} + reconciler = &base.LoggingReconciler{reconciler, logger} + reconciler = &base.EventingReconciler{reconciler} + reconciler = &base.InstrumentedReconciler{reconciler} + + return base.NewController(factories, clients, reconciler, logger) +} + +func (lr *LaunchReconciler) Reconcile(kluster *v1.Kluster) (requeueRequested bool, err error) { + if !(kluster.Status.Phase == models.KlusterPhaseRunning || kluster.Status.Phase == models.KlusterPhaseTerminating) { + return false, nil + } + + for _, pool := range kluster.Spec.NodePools { + _, requeue, err := lr.reconcilePool(kluster, &pool) + if err != nil { + return false, err + } + + if requeue { + requeueRequested = true + } + } + + return requeueRequested, nil +} + +func (lr *LaunchReconciler) reconcilePool(kluster *v1.Kluster, pool *models.NodePool) (status *PoolStatus, requeue bool, err error) { + + pm := lr.newNodePoolManager(kluster, pool) + status, err = pm.GetStatus() + if err != nil { + return + } + + switch { + case kluster.Status.Phase == models.KlusterPhaseTerminating: + for _, node := range status.Nodes { + requeue = true + if err = pm.DeleteNode(node); err != nil { + return + } + } + return + case status.Needed > 0: + for i := 0; i < int(status.Needed); i++ { + requeue = true + if _, err = pm.CreateNode(); err != nil { + return + } + } + return + case status.UnNeeded > 0: + for i := 0; i < int(status.UnNeeded); i++ { + requeue = true + if err = pm.DeleteNode(status.Nodes[i]); err != nil { + return + } + } + return + case 
status.Starting > 0: + requeue = true + case status.Stopping > 0: + requeue = true + default: + return + } + + err = pm.SetStatus(status) + return +} + +func (lr *LaunchReconciler) newNodePoolManager(kluster *v1.Kluster, pool *models.NodePool) PoolManager { + logger := log.With(lr.Logger, + "kluster", kluster.Spec.Name, + "project", kluster.Account(), + "pool", pool.Name) + + var pm PoolManager + pm = &ConcretePoolManager{lr.Clients, kluster, pool, logger} + pm = &EventingPoolManager{pm, kluster, lr.Recorder} + pm = &LoggingPoolManager{pm, logger} + + return pm +} + +func (cpm *ConcretePoolManager) GetStatus() (status *PoolStatus, err error) { + status = &PoolStatus{} + nodes, err := cpm.Clients.Openstack.GetNodes(cpm.Kluster, cpm.Pool) + if err != nil { + return status, err + } + + return &PoolStatus{ + Nodes: cpm.nodeIDs(nodes), + Running: cpm.running(nodes), + Starting: cpm.starting(nodes), + Stopping: cpm.stopping(nodes), + Needed: cpm.needed(nodes), + UnNeeded: cpm.unNeeded(nodes), + }, nil +} + +func (cpm *ConcretePoolManager) SetStatus(status *PoolStatus) error { + newInfo := models.NodePoolInfo{ + Name: cpm.Pool.Name, + Size: cpm.Pool.Size, + Running: int64(status.Running + status.Starting), + Healthy: int64(status.Running), + Schedulable: int64(status.Running), + } + + copy, err := cpm.Clients.Kubernikus.Kubernikus().Klusters(cpm.Kluster.Namespace).Get(cpm.Kluster.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + for i, curInfo := range copy.Status.NodePools { + if curInfo.Name == newInfo.Name { + if curInfo == newInfo { + return nil + } + + copy.Status.NodePools[i] = newInfo + _, err = cpm.Clients.Kubernikus.Kubernikus().Klusters(copy.Namespace).Update(copy) + return err + } + } + + return nil +} + +func (cpm *ConcretePoolManager) CreateNode() (id string, err error) { + secret, err := cpm.Clients.Kubernetes.CoreV1().Secrets(cpm.Kluster.Namespace).Get(cpm.Kluster.GetName(), metav1.GetOptions{}) + if err != nil { + return "", err + } + + userdata, err := templates.Ignition.GenerateNode(cpm.Kluster, secret) + if err != nil { + return "", err + } + + id, err = cpm.Clients.Openstack.CreateNode(cpm.Kluster, cpm.Pool, userdata) + if err != nil { + return "", err + } + + return id, nil +} + +func (cpm *ConcretePoolManager) DeleteNode(id string) (err error) { + if err = cpm.Clients.Openstack.DeleteNode(cpm.Kluster, id); err != nil { + return err + } + return nil +} + +func (cpm *ConcretePoolManager) nodeIDs(nodes []openstack.Node) []string { + result := []string{} + for _, n := range nodes { + result = append(result, n.ID) + } + return result +} + +func (cpm *ConcretePoolManager) starting(nodes []openstack.Node) int { + var count int = 0 + for _, n := range nodes { + if n.Starting() { + count = count + 1 + } + } + return count +} + +func (cpm *ConcretePoolManager) stopping(nodes []openstack.Node) int { + var count int = 0 + for _, n := range nodes { + if n.Stopping() { + count = count + 1 + } + } + return count +} + +func (cpm *ConcretePoolManager) running(nodes []openstack.Node) int { + var count int = 0 + for _, n := range nodes { + if n.Running() { + count = count + 1 + } + } + return count +} + +func (cpm *ConcretePoolManager) needed(nodes []openstack.Node) int { + needed := int(cpm.Pool.Size) - cpm.running(nodes) - cpm.starting(nodes) + if needed < 0 { + return 0 + } + return needed +} + +func (cpm ConcretePoolManager) unNeeded(nodes []openstack.Node) int { + unneeded := cpm.running(nodes) + cpm.starting(nodes) - int(cpm.Pool.Size) + if unneeded < 0 { + return 0 + } + 
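// Illustrative sketch, not part of this diff: the sizing arithmetic implemented by
// ConcretePoolManager.needed and unNeeded nearby, shown as standalone functions with a
// tiny worked example. The function names here are hypothetical.
package main

import "fmt"

// nodesNeeded: how many nodes to create so that running+starting reaches the desired pool size.
func nodesNeeded(size, running, starting int) int {
	if n := size - running - starting; n > 0 {
		return n
	}
	return 0
}

// nodesUnneeded: how many surplus nodes to delete when running+starting exceeds the pool size.
func nodesUnneeded(size, running, starting int) int {
	if n := running + starting - size; n > 0 {
		return n
	}
	return 0
}

func main() {
	fmt.Println(nodesNeeded(3, 1, 1))   // 1: one more node must be created
	fmt.Println(nodesUnneeded(2, 2, 1)) // 1: one node is surplus and will be deleted
}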
return unneeded +} diff --git a/pkg/controller/launch/eventing.go b/pkg/controller/launch/eventing.go new file mode 100644 index 0000000000..4284044167 --- /dev/null +++ b/pkg/controller/launch/eventing.go @@ -0,0 +1,47 @@ +package launch + +import ( + api_v1 "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/record" + + "github.com/sapcc/kubernikus/pkg/apis/kubernikus/v1" + "github.com/sapcc/kubernikus/pkg/controller/events" +) + +type EventingPoolManager struct { + PoolManager PoolManager + Kluster *v1.Kluster + Recorder record.EventRecorder +} + +func (epm *EventingPoolManager) GetStatus() (status *PoolStatus, err error) { + return epm.PoolManager.GetStatus() +} + +func (epm *EventingPoolManager) SetStatus(status *PoolStatus) (err error) { + return epm.PoolManager.SetStatus(status) +} + +func (epm *EventingPoolManager) CreateNode() (id string, err error) { + id, err = epm.PoolManager.CreateNode() + + if err != nil { + epm.Recorder.Eventf(epm.Kluster, api_v1.EventTypeNormal, events.SuccessfullCreateNode, "Successfully created node %v", id) + } else { + epm.Recorder.Eventf(epm.Kluster, api_v1.EventTypeWarning, events.FailedCreateNode, "Failed to created node: ", err) + } + + return id, err +} + +func (epm *EventingPoolManager) DeleteNode(id string) (err error) { + err = epm.PoolManager.DeleteNode(id) + + if err != nil { + epm.Recorder.Eventf(epm.Kluster, api_v1.EventTypeNormal, events.SuccessfullDeleteNode, "Successfully deleted node %v", id) + } else { + epm.Recorder.Eventf(epm.Kluster, api_v1.EventTypeWarning, events.FailedCreateNode, "Failed to delete node: ", err) + } + + return +} diff --git a/pkg/controller/launch/logging.go b/pkg/controller/launch/logging.go new file mode 100644 index 0000000000..5721531142 --- /dev/null +++ b/pkg/controller/launch/logging.go @@ -0,0 +1,68 @@ +package launch + +import ( + "time" + + "github.com/go-kit/kit/log" +) + +type LoggingPoolManager struct { + PoolManager PoolManager + Logger log.Logger +} + +func (npm *LoggingPoolManager) GetStatus() (status *PoolStatus, err error) { + defer func(begin time.Time) { + npm.Logger.Log( + "msg", "read status", + "running", status.Running, + "starting", status.Starting, + "stopping", status.Stopping, + "needed", status.Needed, + "unneeded", status.UnNeeded, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return npm.PoolManager.GetStatus() +} + +func (npm *LoggingPoolManager) SetStatus(status *PoolStatus) (err error) { + defer func(begin time.Time) { + npm.Logger.Log( + "msg", "wrote node pool status", + "running", status.Running, + "starting", status.Starting, + "stopping", status.Stopping, + "needed", status.Needed, + "unneeded", status.UnNeeded, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return npm.PoolManager.SetStatus(status) +} + +func (npm *LoggingPoolManager) CreateNode() (id string, err error) { + defer func(begin time.Time) { + npm.Logger.Log( + "msg", "created node", + "node", id, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return npm.PoolManager.CreateNode() +} + +func (npm *LoggingPoolManager) DeleteNode(id string) (err error) { + defer func(begin time.Time) { + npm.Logger.Log( + "msg", "deleted node", + "node", id, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return npm.PoolManager.DeleteNode(id) +} diff --git a/pkg/controller/operator.go b/pkg/controller/operator.go index aa1f0a41fd..26bc023e3d 100644 --- a/pkg/controller/operator.go +++ b/pkg/controller/operator.go @@ -6,6 +6,7 @@ import ( "sync" "time" + 
"github.com/go-kit/kit/log" "github.com/golang/glog" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/api/meta" @@ -19,7 +20,6 @@ import ( api_v1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/helm/pkg/helm" "github.com/sapcc/kubernikus/pkg/apis/kubernikus/v1" helmutil "github.com/sapcc/kubernikus/pkg/client/helm" @@ -27,6 +27,7 @@ import ( "github.com/sapcc/kubernikus/pkg/client/kubernikus" "github.com/sapcc/kubernikus/pkg/client/openstack" "github.com/sapcc/kubernikus/pkg/controller/config" + "github.com/sapcc/kubernikus/pkg/controller/launch" kubernikus_clientset "github.com/sapcc/kubernikus/pkg/generated/clientset" kubernikus_informers "github.com/sapcc/kubernikus/pkg/generated/informers/externalversions" kubernikus_informers_v1 "github.com/sapcc/kubernikus/pkg/generated/informers/externalversions/kubernikus/v1" @@ -52,25 +53,15 @@ type KubernikusOperatorOptions struct { Namespace string Controllers []string MetricPort int -} - -type Clients struct { - Kubernikus kubernikus_clientset.Interface - Kubernetes kubernetes_clientset.Interface - Satellites *kube.SharedClientFactory - Openstack openstack.Client - Helm *helm.Client -} -type Factories struct { - Kubernikus kubernikus_informers.SharedInformerFactory - Kubernetes kubernetes_informers.SharedInformerFactory + logger log.Logger } type KubernikusOperator struct { - Clients config.Config - Factories + config.Factories + config.Clients + logger log.Logger } const ( @@ -86,7 +77,7 @@ var ( } ) -func NewKubernikusOperator(options *KubernikusOperatorOptions) *KubernikusOperator { +func NewKubernikusOperator(options *KubernikusOperatorOptions, logger log.Logger) *KubernikusOperator { var err error o := &KubernikusOperator{ @@ -109,6 +100,7 @@ func NewKubernikusOperator(options *KubernikusOperatorOptions) *KubernikusOperat Controllers: make(map[string]config.Controller), }, }, + logger: logger, } o.Clients.Kubernetes, err = kube.NewClient(options.KubeConfig, options.Context) @@ -171,7 +163,7 @@ func NewKubernikusOperator(options *KubernikusOperatorOptions) *KubernikusOperat case "groundctl": o.Config.Kubernikus.Controllers["groundctl"] = NewGroundController(o.Factories, o.Clients, recorder, o.Config) case "launchctl": - o.Config.Kubernikus.Controllers["launchctl"] = NewLaunchController(o.Factories, o.Clients) + o.Config.Kubernikus.Controllers["launchctl"] = launch.NewController(o.Factories, o.Clients, recorder, logger) } }