From 4bd99b7a2011bcb5b13123088a793d886002da66 Mon Sep 17 00:00:00 2001 From: Mikhail Scherba <41360396+miklezzzz@users.noreply.github.com> Date: Tue, 5 Mar 2024 15:35:20 +0400 Subject: [PATCH] HA mode (#454) Signed-off-by: Mikhail Scherba --- Dockerfile | 1 + cmd/addon-operator/main.go | 160 +++++++++++++++--- .../001-startup-global/addon-operator-cm.yaml | 7 + .../addon-operator-rbac.yaml | 6 +- .../Dockerfile | 3 + .../README.md | 43 +++++ .../addon-operator-cm.yaml | 7 + .../addon-operator-deployment.yaml | 71 ++++++++ .../addon-operator-rbac.yaml | 34 ++++ .../global-hooks/hook.sh | 7 + .../modules/README.md | 1 + pkg/addon-operator/http_server.go | 65 +++++-- pkg/addon-operator/operator.go | 23 ++- 13 files changed, 384 insertions(+), 44 deletions(-) create mode 100644 examples/001-startup-global/addon-operator-cm.yaml create mode 100644 examples/002-startup-global-high-availability/Dockerfile create mode 100644 examples/002-startup-global-high-availability/README.md create mode 100644 examples/002-startup-global-high-availability/addon-operator-cm.yaml create mode 100644 examples/002-startup-global-high-availability/addon-operator-deployment.yaml create mode 100644 examples/002-startup-global-high-availability/addon-operator-rbac.yaml create mode 100755 examples/002-startup-global-high-availability/global-hooks/hook.sh create mode 100644 examples/002-startup-global-high-availability/modules/README.md diff --git a/Dockerfile b/Dockerfile index 89dc402b..e9ab9d09 100644 --- a/Dockerfile +++ b/Dockerfile @@ -51,6 +51,7 @@ COPY --from=builder /app/shell-operator-clone/frameworks/shell/ /framework/shell COPY --from=builder /app/shell-operator-clone/shell_lib.sh / WORKDIR / +RUN mkdir /global-hooks /modules ENV MODULES_DIR /modules ENV GLOBAL_HOOKS_DIR /global-hooks ENTRYPOINT ["/sbin/tini", "--", "/addon-operator"] diff --git a/cmd/addon-operator/main.go b/cmd/addon-operator/main.go index d42690cb..71eb20c4 100644 --- a/cmd/addon-operator/main.go +++ 
b/cmd/addon-operator/main.go @@ -5,10 +5,15 @@ import ( "fmt" "math/rand" "os" + "strings" + "syscall" "time" log "github.com/sirupsen/logrus" "gopkg.in/alecthomas/kingpin.v2" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" addon_operator "github.com/flant/addon-operator/pkg/addon-operator" "github.com/flant/addon-operator/pkg/app" @@ -20,6 +25,13 @@ import ( utils_signal "github.com/flant/shell-operator/pkg/utils/signal" ) +const ( + leaseName = "addon-operator-leader-election" + leaseDuration = 35 + renewalDeadline = 30 + retryPeriod = 10 +) + func main() { kpApp := kingpin.New(app.AppName, fmt.Sprintf("%s %s: %s", app.AppName, app.Version, app.AppDescription)) @@ -41,41 +53,133 @@ func main() { // start main loop startCmd := kpApp.Command("start", "Start events processing."). Default(). - Action(func(c *kingpin.ParseContext) error { - sh_app.AppStartMessage = fmt.Sprintf("%s %s, shell-operator %s", app.AppName, app.Version, sh_app.Version) + Action(start) - // Init rand generator. - rand.Seed(time.Now().UnixNano()) + app.DefineStartCommandFlags(kpApp, startCmd) - operator := addon_operator.NewAddonOperator(context.Background()) + debug.DefineDebugCommands(kpApp) + app.DefineDebugCommands(kpApp) - bk := configmap.New(log.StandardLogger(), operator.KubeClient(), app.Namespace, app.ConfigMapName) - operator.SetupKubeConfigManager(bk) + kingpin.MustParse(kpApp.Parse(os.Args[1:])) +} - err := operator.Setup() - if err != nil { - fmt.Printf("Setup is failed: %s\n", err) - os.Exit(1) - } +func start(_ *kingpin.ParseContext) error { + sh_app.AppStartMessage = fmt.Sprintf("%s %s, shell-operator %s", app.AppName, app.Version, sh_app.Version) - err = operator.Start() - if err != nil { - fmt.Printf("Start is failed: %s\n", err) - os.Exit(1) - } + // Init rand generator. + rand.Seed(time.Now().UnixNano()) - // Block action by waiting signals from OS. 
- utils_signal.WaitForProcessInterruption(func() { - operator.Stop() - os.Exit(1) - }) + ctx := context.Background() - return nil - }) - app.DefineStartCommandFlags(kpApp, startCmd) + operator := addon_operator.NewAddonOperator(ctx) - debug.DefineDebugCommands(kpApp) - app.DefineDebugCommands(kpApp) + operator.StartAPIServer() - kingpin.MustParse(kpApp.Parse(os.Args[1:])) + if os.Getenv("ADDON_OPERATOR_HA") == "true" { + log.Info("Addon-operator is starting in HA mode") + runHAMode(ctx, operator) + return nil + } + + err := run(ctx, operator) + if err != nil { + log.Error(err) + os.Exit(1) + } + + return nil +} + +func run(_ context.Context, operator *addon_operator.AddonOperator) error { + bk := configmap.New(log.StandardLogger(), operator.KubeClient(), app.Namespace, app.ConfigMapName) + operator.SetupKubeConfigManager(bk) + + err := operator.Setup() + if err != nil { + fmt.Printf("Setup is failed: %s\n", err) + os.Exit(1) + } + + err = operator.Start() + if err != nil { + fmt.Printf("Start is failed: %s\n", err) + os.Exit(1) + } + + // Block action by waiting signals from OS. 
+ utils_signal.WaitForProcessInterruption(func() { + operator.Stop() + os.Exit(1) + }) + + return nil +} + +func runHAMode(ctx context.Context, operator *addon_operator.AddonOperator) { + podName := os.Getenv("ADDON_OPERATOR_POD") + if len(podName) == 0 { + log.Info("ADDON_OPERATOR_POD env not set or empty") + os.Exit(1) + } + + podIP := os.Getenv("ADDON_OPERATOR_LISTEN_ADDRESS") + if len(podIP) == 0 { + log.Info("ADDON_OPERATOR_LISTEN_ADDRESS env not set or empty") + os.Exit(1) + } + + podNs := os.Getenv("ADDON_OPERATOR_NAMESPACE") + if len(podNs) == 0 { + log.Info("ADDON_OPERATOR_NAMESPACE env not set or empty") + os.Exit(1) + } + + identity := fmt.Sprintf("%s.%s.%s.pod", podName, strings.ReplaceAll(podIP, ".", "-"), podNs) + + err := operator.WithLeaderElector(&leaderelection.LeaderElectionConfig{ + // Create a leaderElectionConfig for leader election + Lock: &resourcelock.LeaseLock{ + LeaseMeta: v1.ObjectMeta{ + Name: leaseName, + Namespace: podNs, + }, + Client: operator.KubeClient().CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: identity, + }, + }, + LeaseDuration: time.Duration(leaseDuration) * time.Second, + RenewDeadline: time.Duration(renewalDeadline) * time.Second, + RetryPeriod: time.Duration(retryPeriod) * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + err := run(ctx, operator) + if err != nil { + log.Info(err) + os.Exit(1) + } + }, + OnStoppedLeading: func() { + log.Info("Restarting because the leadership was handed over") + operator.Stop() + os.Exit(1) + }, + }, + ReleaseOnCancel: true, + }) + if err != nil { + log.Fatal(err) + } + + go func() { + <-ctx.Done() + log.Info("Context canceled received") + err := syscall.Kill(1, syscall.SIGUSR2) + if err != nil { + log.Infof("Couldn't shutdown addon-operator: %s\n", err) + os.Exit(1) + } + }() + + operator.LeaderElector.Run(ctx) } diff --git a/examples/001-startup-global/addon-operator-cm.yaml 
b/examples/001-startup-global/addon-operator-cm.yaml new file mode 100644 index 00000000..ce7d011e --- /dev/null +++ b/examples/001-startup-global/addon-operator-cm.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: addon-operator +data: + global: "" diff --git a/examples/001-startup-global/addon-operator-rbac.yaml b/examples/001-startup-global/addon-operator-rbac.yaml index 42c8077c..19ff5126 100644 --- a/examples/001-startup-global/addon-operator-rbac.yaml +++ b/examples/001-startup-global/addon-operator-rbac.yaml @@ -4,7 +4,7 @@ kind: ServiceAccount metadata: name: addon-operator-acc --- -apiVersion: rbac.authorization.k8s.io/v1beta1 +apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: addon-operator @@ -20,7 +20,7 @@ rules: verbs: - "*" --- -apiVersion: rbac.authorization.k8s.io/v1beta1 +apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: addon-operator @@ -31,4 +31,4 @@ roleRef: subjects: - kind: ServiceAccount name: addon-operator-acc - namespace: example-startup-global + namespace: default diff --git a/examples/002-startup-global-high-availability/Dockerfile b/examples/002-startup-global-high-availability/Dockerfile new file mode 100644 index 00000000..3ef97cc0 --- /dev/null +++ b/examples/002-startup-global-high-availability/Dockerfile @@ -0,0 +1,3 @@ +FROM flant/addon-operator:latest +ADD modules /modules +ADD global-hooks /global-hooks diff --git a/examples/002-startup-global-high-availability/README.md b/examples/002-startup-global-high-availability/README.md new file mode 100644 index 00000000..70c5a3bd --- /dev/null +++ b/examples/002-startup-global-high-availability/README.md @@ -0,0 +1,43 @@ +## onStartup global hooks example + +Example of a global hook written as bash script. + +### run + +Build addon-operator image with custom scripts: + +``` +docker build -t "registry.mycompany.com/addon-operator:startup-global" . 
+docker push registry.mycompany.com/addon-operator:startup-global +``` + +Edit image in addon-operator-deployment.yaml and apply manifests: + +``` +kubectl create ns example-startup-global +kubectl -n example-startup-global apply -f addon-operator-rbac.yaml +kubectl -n example-startup-global apply -f addon-operator-deployment.yaml +``` + +See in the logs that hook.sh was run at startup by the leader instance: + +``` +kubectl -n example-startup-global logs -l app=addon-operator -f +... +INFO : Initializing global hooks ... +INFO : INIT: global hook 'hook.sh' ... +... +INFO : TASK_RUN GlobalHookRun@ON_STARTUP hook.sh +INFO : Running global hook 'hook.sh' binding 'ON_STARTUP' ... +OnStartup global shell hook +... +``` + +### cleanup + +``` +kubectl delete clusterrolebinding/addon-operator +kubectl delete clusterrole/addon-operator +kubectl delete ns/example-startup-global +docker rmi registry.mycompany.com/addon-operator:startup-global +``` diff --git a/examples/002-startup-global-high-availability/addon-operator-cm.yaml b/examples/002-startup-global-high-availability/addon-operator-cm.yaml new file mode 100644 index 00000000..ce7d011e --- /dev/null +++ b/examples/002-startup-global-high-availability/addon-operator-cm.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: addon-operator +data: + global: "" diff --git a/examples/002-startup-global-high-availability/addon-operator-deployment.yaml b/examples/002-startup-global-high-availability/addon-operator-deployment.yaml new file mode 100644 index 00000000..a48bfeaa --- /dev/null +++ b/examples/002-startup-global-high-availability/addon-operator-deployment.yaml @@ -0,0 +1,71 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + annotations: + name: addon-operator +spec: + replicas: 2 + selector: + matchLabels: + app: addon-operator + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 1 + type: RollingUpdate + template: + metadata: + labels: + app: addon-operator + spec: + affinity: + podAntiAffinity: + 
preferredDuringSchedulingIgnoredDuringExecution: + - podAffinityTerm: + labelSelector: + matchExpressions: + - key: app + operator: In + values: + - addon-operator + topologyKey: kubernetes.io/hostname + weight: 100 + containers: + - env: + - name: ADDON_OPERATOR_POD + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: ADDON_OPERATOR_HA + value: "true" + - name: ADDON_OPERATOR_NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: ADDON_OPERATOR_LISTEN_ADDRESS + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + image: registry.mycompany.com/addon-operator:ha + imagePullPolicy: IfNotPresent + name: addon-operator + readinessProbe: + httpGet: + path: /readyz + port: 9650 + scheme: HTTP + periodSeconds: 10 + successThreshold: 1 + timeoutSeconds: 1 + resources: {} + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + dnsPolicy: ClusterFirst + restartPolicy: Always + schedulerName: default-scheduler + serviceAccount: addon-operator-acc + serviceAccountName: addon-operator-acc + terminationGracePeriodSeconds: 30 diff --git a/examples/002-startup-global-high-availability/addon-operator-rbac.yaml b/examples/002-startup-global-high-availability/addon-operator-rbac.yaml new file mode 100644 index 00000000..19ff5126 --- /dev/null +++ b/examples/002-startup-global-high-availability/addon-operator-rbac.yaml @@ -0,0 +1,34 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: addon-operator-acc +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: addon-operator +rules: +- apiGroups: + - "*" + resources: + - "*" + verbs: + - "*" +- nonResourceURLs: + - "*" + verbs: + - "*" +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: addon-operator +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: addon-operator +subjects: + - kind: ServiceAccount + name: addon-operator-acc 
+ namespace: default diff --git a/examples/002-startup-global-high-availability/global-hooks/hook.sh b/examples/002-startup-global-high-availability/global-hooks/hook.sh new file mode 100755 index 00000000..87f3eb9a --- /dev/null +++ b/examples/002-startup-global-high-availability/global-hooks/hook.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +if [[ $1 == "--config" ]] ; then + echo '{"configVersion":"v1", "onStartup": 1}' +else + echo "OnStartup global shell hook" +fi diff --git a/examples/002-startup-global-high-availability/modules/README.md b/examples/002-startup-global-high-availability/modules/README.md new file mode 100644 index 00000000..5106fe69 --- /dev/null +++ b/examples/002-startup-global-high-availability/modules/README.md @@ -0,0 +1 @@ +> FIXME: `modules` directory is required even if only global-hooks are used diff --git a/pkg/addon-operator/http_server.go b/pkg/addon-operator/http_server.go index 2df9f91e..f0aec37f 100644 --- a/pkg/addon-operator/http_server.go +++ b/pkg/addon-operator/http_server.go @@ -1,14 +1,69 @@ package addon_operator import ( + "context" "fmt" "net/http" "strings" + "time" "github.com/flant/addon-operator/pkg/addon-operator/converge" "github.com/flant/addon-operator/pkg/app" ) +func (op *AddonOperator) registerReadyzRoute() { + op.engine.APIServer.RegisterRoute(http.MethodGet, "/readyz", func(w http.ResponseWriter, request *http.Request) { + // check if ha mode is enabled and current instance isn't the leader - return ok so as not to spam with failed readiness probes + if op.LeaderElector != nil { + if op.LeaderElector.IsLeader() { + if op.IsStartupConvergeDone() { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("Startup converge done.\n")) + } else { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("Startup converge in progress\n")) + } + } else if leader := op.LeaderElector.GetLeader(); len(leader) > 0 { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + 
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/readyz", leader, app.ListenPort), nil) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("HA mode is enabled but couldn't craft a request to the leader\n")) + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("HA mode is enabled but couldn't send a request to the leader\n")) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("HA mode is enabled but the leader's status response code isn't OK\n")) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("HA mode is enabled and waiting for acquiring the lock.\n")) + } else { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("HA mode is enabled but something went wrong\n")) + } + } else { + if op.IsStartupConvergeDone() { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("Startup converge done.\n")) + } else { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("Startup converge in progress\n")) + } + } + }) +} + func (op *AddonOperator) registerDefaultRoutes() { op.engine.APIServer.RegisterRoute(http.MethodGet, "/", func(writer http.ResponseWriter, request *http.Request) { _, _ = writer.Write([]byte(fmt.Sprintf(` @@ -31,16 +86,6 @@ func (op *AddonOperator) registerDefaultRoutes() { writer.WriteHeader(http.StatusOK) }) - op.engine.APIServer.RegisterRoute(http.MethodGet, "/readyz", func(w http.ResponseWriter, request *http.Request) { - if op.IsStartupConvergeDone() { - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte("Startup converge done.\n")) - } else { - w.WriteHeader(http.StatusInternalServerError) - _, _ = w.Write([]byte("Startup converge in progress\n")) - } - }) - op.engine.APIServer.RegisterRoute(http.MethodGet, "/status/converge", 
func(writer http.ResponseWriter, request *http.Request) { convergeTasks := ConvergeTasksInQueue(op.engine.TaskQueues.GetMain()) diff --git a/pkg/addon-operator/operator.go b/pkg/addon-operator/operator.go index f3c12e72..78c69751 100644 --- a/pkg/addon-operator/operator.go +++ b/pkg/addon-operator/operator.go @@ -10,6 +10,7 @@ import ( "github.com/gofrs/uuid/v5" log "github.com/sirupsen/logrus" + "k8s.io/client-go/tools/leaderelection" "github.com/flant/addon-operator/pkg/addon-operator/converge" "github.com/flant/addon-operator/pkg/app" @@ -69,6 +70,9 @@ type AddonOperator struct { // AdmissionServer handles validation and mutation admission webhooks AdmissionServer *AdmissionServer + + // LeaderElector represents leaderelection client for HA mode + LeaderElector *leaderelection.LeaderElector } func NewAddonOperator(ctx context.Context) *AddonOperator { @@ -107,6 +111,16 @@ func NewAddonOperator(ctx context.Context) *AddonOperator { return ao } +func (op *AddonOperator) WithLeaderElector(config *leaderelection.LeaderElectionConfig) error { + var err error + op.LeaderElector, err = leaderelection.NewLeaderElector(*config) + if err != nil { + return err + } + + return nil +} + func (op *AddonOperator) Setup() error { // Helm client factory. helmClient, err := helm.InitHelmClientFactory(op.engine.KubeClient) @@ -150,9 +164,6 @@ func (op *AddonOperator) Start() error { return err } - // start http server with metrics - op.engine.APIServer.Start(op.ctx) - log.Info("Start first converge for modules") // Loading the onStartup hooks into the queue and running all modules. // Turning tracking changes on only after startup ends. 
@@ -188,6 +199,12 @@ func (op *AddonOperator) Stop() { } } +func (op *AddonOperator) StartAPIServer() { + // start http server with metrics + op.engine.APIServer.Start(op.ctx) + op.registerReadyzRoute() +} + // KubeClient returns default common kubernetes client initialized by shell-operator func (op *AddonOperator) KubeClient() *client.Client { return op.engine.KubeClient