Skip to content

Commit

Permalink
Merge pull request #260 from cybozu-go/watch-pod
Browse files Browse the repository at this point in the history
Watch Pod deletion and notify the cluster manager
  • Loading branch information
ymmt2005 authored Jun 3, 2021
2 parents a3c2b0f + 86ca69d commit 0cddb66
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 41 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [0.9.4] - 2021-06-03

### Fixed
- Automatic switchover did not take place immediately (#260)

## [0.9.3] - 2021-06-02

### Fixed
Expand Down Expand Up @@ -185,7 +190,8 @@ The `MySQLCluster` created by MOCO `< v0.5.0` has no compatibility with `>= v0.5

- Bootstrap a vanilla MySQL cluster with no replicas (#2).

[Unreleased]: https://github.com/cybozu-go/moco/compare/v0.9.3...HEAD
[Unreleased]: https://github.com/cybozu-go/moco/compare/v0.9.4...HEAD
[0.9.4]: https://github.com/cybozu-go/moco/compare/v0.9.3...v0.9.4
[0.9.3]: https://github.com/cybozu-go/moco/compare/v0.9.2...v0.9.3
[0.9.2]: https://github.com/cybozu-go/moco/compare/v0.9.1...v0.9.2
[0.9.1]: https://github.com/cybozu-go/moco/compare/v0.9.0...v0.9.1
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CTRL_RUNTIME_VERSION := $(shell awk '/sigs.k8s.io\/controller-runtime/ {print su
KUSTOMIZE_VERSION = 4.1.3
CRD_TO_MARKDOWN_VERSION = 0.0.3
MYSQLSH_VERSION = 8.0.25-1
MDBOOK_VERSION = 0.4.8
MDBOOK_VERSION = 0.4.9
OS_VERSION := $(shell . /etc/os-release; echo $$VERSION_ID)

# Test tools
Expand Down
12 changes: 12 additions & 0 deletions clustering/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// This interface is meant to be used by MySQLClusterReconciler.
type ClusterManager interface {
Update(context.Context, types.NamespacedName)
UpdateNoStart(context.Context, types.NamespacedName)
Stop(types.NamespacedName)
StopAll()
}
Expand Down Expand Up @@ -59,6 +60,14 @@ type clusterManager struct {
}

func (m *clusterManager) Update(ctx context.Context, name types.NamespacedName) {
m.update(ctx, name, false)
}

func (m *clusterManager) UpdateNoStart(ctx context.Context, name types.NamespacedName) {
m.update(ctx, name, true)
}

func (m *clusterManager) update(ctx context.Context, name types.NamespacedName, noStart bool) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -68,6 +77,9 @@ func (m *clusterManager) Update(ctx context.Context, name types.NamespacedName)
p.Update()
return
}
if noStart {
return
}

ctx, cancel := context.WithCancel(ctx)

Expand Down
8 changes: 8 additions & 0 deletions cmd/moco-controller/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ func subMain(ns, addr string, port int) error {
return err
}

if err = (&controllers.PodWatcher{
Client: mgr.GetClient(),
ClusterManager: clusterMgr,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PodWatcher")
return err
}

if err = (&mocov1beta1.MySQLCluster{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to setup webhook", "webhook", "MySQLCluster")
return err
Expand Down
63 changes: 63 additions & 0 deletions controllers/mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package controllers

import (
"context"
"sync"

"github.com/cybozu-go/moco/clustering"
"k8s.io/apimachinery/pkg/types"
)

type mockManager struct {
mu sync.Mutex
clusters map[string]struct{}
updated []types.NamespacedName
}

var _ clustering.ClusterManager = &mockManager{}

func (m *mockManager) Update(ctx context.Context, key types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

m.clusters[key.String()] = struct{}{}
}

func (m *mockManager) UpdateNoStart(ctx context.Context, key types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

m.updated = append(m.updated, key)
}

func (m *mockManager) Stop(key types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

delete(m.clusters, key.String())
}

func (m *mockManager) StopAll() {}

func (m *mockManager) getKeys() map[string]bool {
m.mu.Lock()
defer m.mu.Unlock()

keys := make(map[string]bool)
for k := range m.clusters {
keys[k] = true
}
return keys
}

func (m *mockManager) isUpdated(key types.NamespacedName) bool {
m.mu.Lock()
defer m.mu.Unlock()

for _, k := range m.updated {
if k.Namespace == key.Namespace && k.Name == key.Name {
return true
}
}
return false
}
37 changes: 0 additions & 37 deletions controllers/mysqlcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

mocov1beta1 "github.com/cybozu-go/moco/api/v1beta1"
"github.com/cybozu-go/moco/clustering"
"github.com/cybozu-go/moco/pkg/constants"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -21,7 +19,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -30,40 +27,6 @@ import (
. "github.com/onsi/gomega"
)

type mockManager struct {
mu sync.Mutex
clusters map[string]struct{}
}

var _ clustering.ClusterManager = &mockManager{}

func (m *mockManager) Update(ctx context.Context, key types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

m.clusters[key.String()] = struct{}{}
}

func (m *mockManager) Stop(key types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

delete(m.clusters, key.String())
}

func (m *mockManager) StopAll() {}

func (m *mockManager) getKeys() map[string]bool {
m.mu.Lock()
defer m.mu.Unlock()

keys := make(map[string]bool)
for k := range m.clusters {
keys[k] = true
}
return keys
}

const (
testMocoSystemNamespace = "moco-system"
testAgentImage = "foobar:123"
Expand Down
86 changes: 86 additions & 0 deletions controllers/pod_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package controllers

import (
"context"

mocov1beta1 "github.com/cybozu-go/moco/api/v1beta1"
"github.com/cybozu-go/moco/clustering"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
crlog "sigs.k8s.io/controller-runtime/pkg/log"
)

// PodWatcher Watched MySQL pods and informs the cluster manager of the event.
type PodWatcher struct {
client.Client
ClusterManager clustering.ClusterManager
}

//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch

// Reconcile implements Reconciler interface.
// See https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile#Reconciler
func (r *PodWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := crlog.FromContext(ctx)

pod := &corev1.Pod{}
if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if pod.DeletionTimestamp == nil {
return ctrl.Result{}, nil
}

ref := metav1.GetControllerOfNoCopy(pod)
if ref == nil {
return ctrl.Result{}, nil
}
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
//lint:ignore nilerr intentional
return ctrl.Result{}, nil
}
if ref.Kind != "StatefulSet" || refGV.Group != appsv1.GroupName {
return ctrl.Result{}, nil
}

sts := &appsv1.StatefulSet{}
if err := r.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: ref.Name}, sts); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

ref = metav1.GetControllerOfNoCopy(sts)
if ref == nil {
return ctrl.Result{}, nil
}
refGV, err = schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
//lint:ignore nilerr intentional
return ctrl.Result{}, nil
}
if ref.Kind != "MySQLCluster" || refGV.Group != mocov1beta1.GroupVersion.Group {
return ctrl.Result{}, nil
}

log.Info("detected mysql pod deletion", "name", pod.Name)
r.ClusterManager.UpdateNoStart(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: ref.Name})
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *PodWatcher) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
WithOptions(
controller.Options{MaxConcurrentReconciles: 8},
).
Complete(r)
}
Loading

0 comments on commit 0cddb66

Please sign in to comment.