From 01710f959c2fbb8244e7f713a7e1d29602a52bfe Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Wed, 16 Oct 2024 12:35:39 +0545
Subject: [PATCH] feat: use client-go leader election (#1128)

* feat: use client-go leader election

* chore: allow passing in the namespace for leader election

* feat: set leader label and remove from other replicas on lead

* fix: select other leader replicas by the label not by parent replica set

* fix: run elector again after failure

* feat: stop registered crons

* feat: manage all crons from elector
---
 leader/election.go | 262 +++++++++++++++++++++++++++++----------------
 start.go           |   3 +
 2 files changed, 170 insertions(+), 95 deletions(-)

diff --git a/leader/election.go b/leader/election.go
index 962328ef..2524da9b 100644
--- a/leader/election.go
+++ b/leader/election.go
@@ -1,139 +1,211 @@
 package leader
 
 import (
+	gocontext "context"
+	"errors"
 	"fmt"
 	"log"
-	"math/rand"
 	"os"
-	"sync/atomic"
+	"strings"
 	"time"
 
-	"github.com/flanksource/duty/context"
 	"github.com/samber/lo"
-
-	v1 "k8s.io/api/coordination/v1"
+	"github.com/sethvargo/go-retry"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	types "k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
+
+	"github.com/flanksource/commons/logger"
+	"github.com/flanksource/duty/context"
+	"github.com/flanksource/duty/echo"
+	"github.com/flanksource/duty/shutdown"
 )
 
 var (
-	identity = getHostname()
+	hostname     string
+	podNamespace string
 )
 
-var isLeader *atomic.Bool
+const namespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+
+func getPodNamespace() (string, error) {
+	// passed in via the K8s downward API
+	if ns, ok := os.LookupEnv("POD_NAMESPACE"); ok {
+		return ns, nil
+	}
+
+	data, err := os.ReadFile(namespaceFilePath)
+	if err != nil {
+		return "", fmt.Errorf("failed to read namespace file: %w", err)
+	}
+
+	ns := strings.TrimSpace(string(data))
+	if ns == "" {
+		return "", errors.New("namespace was neither found in the env nor in the service account path")
+	}
+
+	return ns, nil
+}
 
-func getHostname() string {
-	hostname, err := os.Hostname()
+func init() {
+	var err error
+	hostname, err = os.Hostname()
 	if err != nil {
-		log.Fatalf("Failed to get hostname: %v", err)
+		log.Fatalf("failed to get hostname: %v", err)
+	}
+
+	// To test locally
+	if v, ok := os.LookupEnv("MC_HOSTNAME_OVERRIDE"); ok {
+		logger.Infof("hostname overridden by MC_HOSTNAME_OVERRIDE: %s", v)
+		hostname = v
+	}
+
+	if n, err := getPodNamespace(); err == nil {
+		podNamespace = n
 	}
-	return hostname
 }
 
-var watchers = []func(isLeader bool){}
+func Register(
+	ctx context.Context,
+	app string,
+	namespace string,
+	onLead func(ctx gocontext.Context),
+	onStoppedLead func(),
+	onNewLeader func(identity string),
+) error {
+	if namespace == "" {
+		namespace = podNamespace
+	}
 
-func OnElection(ctx context.Context, leaseName string, fn func(isLeader bool)) {
-	watchers = append(watchers, fn)
+	ctx = ctx.WithNamespace(namespace)
+
+	lock := &resourcelock.LeaseLock{
+		LeaseMeta: metav1.ObjectMeta{
+			Name:      app,
+			Namespace: namespace,
+		},
+		Client: ctx.Kubernetes().CoordinationV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity: hostname,
+		},
+	}
 
-	if isLeader == nil {
-		leaseDuration := ctx.Properties().Duration("leader.lease.duration", 10*time.Minute)
-		isLeader = new(atomic.Bool)
-		go func() {
-			for {
+	electionConfig := leaderelection.LeaderElectionConfig{
+		Lock:            lock,
+		ReleaseOnCancel: true,
+		LeaseDuration:   ctx.Properties().Duration("leader.lease.duration", 30*time.Second),
+		RenewDeadline:   15 * time.Second,
+		RetryPeriod:     5 * time.Second,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(leadCtx gocontext.Context) {
+				ctx.Infof("started leading election")
 
-				leader, _, err := createOrUpdateLease(ctx, leaseName, 0)
+				updateLeaderLabel(ctx, app)
 
-				if err != nil {
-					ctx.Warnf("failed to create/update lease: %v", err)
-					time.Sleep(time.Duration(rand.Intn(60)) * time.Second)
-					continue
+				for entry := range echo.Crons.IterBuffered() {
+					entry.Val.Start()
 				}
 
-				if isLeader.Load() != leader {
-					isLeader.Store(leader)
-					notify(leader)
+				if onLead != nil {
+					onLead(leadCtx)
 				}
+			},
+			OnStoppedLeading: func() {
+				ctx.Infof("stopped leading election")
 
-				// sleep for just under half the lease duration before trying to renew
-				time.Sleep(leaseDuration/2 - time.Second*10)
-			}
-		}()
-	}
+				for entry := range echo.Crons.IterBuffered() {
+					entry.Val.Stop()
 				}
 
-}
+				if onStoppedLead != nil {
+					onStoppedLead()
+				}
+			},
+			OnNewLeader: func(identity string) {
+				if identity == hostname {
+					return
+				}
 
-func notify(isLeader bool) {
-	for _, fn := range watchers {
-		fn(isLeader)
+				if onNewLeader != nil {
+					onNewLeader(identity)
+				}
+			},
+		},
 	}
-}
-
-func IsLeader(ctx context.Context, leaseName string) (bool, error) {
-	leases := ctx.Kubernetes().CoordinationV1().Leases(ctx.GetNamespace())
-
-	lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
+
+	elector, err := leaderelection.NewLeaderElector(electionConfig)
 	if err != nil {
-		return false, err
+		return err
 	}
 
-	if *lease.Spec.HolderIdentity == identity {
-		return true, nil
-	}
+	leaderContext, cancel := gocontext.WithCancel(ctx)
+	shutdown.AddHook(func() {
+		cancel()
+
+		// give the elector some time to release the lease
+		time.Sleep(time.Second * 2)
+	})
+
+	go func() {
+		// when a network failure occurs for a considerable amount of time (>30s)
+		// elector.Run terminates and never retries acquiring the lease.
+		//
+		// that's why it's run in a never ending loop
+		for {
+			select {
+			case <-leaderContext.Done():
+				return
+			default:
+				elector.Run(leaderContext)
+			}
+		}
+	}()
+
+	<-ctx.Done()
-	return false, nil
+	return nil
 }
 
-func createOrUpdateLease(ctx context.Context, leaseName string, attempt int) (bool, string, error) {
-	if attempt > 0 {
-		time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
-	}
-	if attempt >= ctx.Properties().Int("leader.lease.attempts", 3) {
-		return false, "", fmt.Errorf("failed to acquire lease %s after %d attempts", leaseName, attempt)
-	}
-	now := metav1.MicroTime{Time: time.Now()}
-	leases := ctx.Kubernetes().CoordinationV1().Leases(ctx.GetNamespace())
-	leaseDuration := ctx.Properties().Duration("leader.lease.duration", 10*time.Minute)
-	lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
-	if err != nil {
-		return false, "", err
-	}
-	if lease == nil {
-		ctx.Infof("Acquiring lease %s", leaseName)
-		lease = &v1.Lease{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      leaseName,
-				Namespace: ctx.GetNamespace(),
-			},
-			Spec: v1.LeaseSpec{
-				HolderIdentity:       &identity,
-				LeaseDurationSeconds: lo.ToPtr(int32(leaseDuration.Seconds())),
-				AcquireTime:          &now,
-				RenewTime:            &now,
-			},
-		}
-		_, err = leases.Create(ctx, lease, metav1.CreateOptions{})
+// updateLeaderLabel sets leader:true label on the current pod
+// and also removes that label from all other replicas.
+func updateLeaderLabel(ctx context.Context, app string) {
+	backoff := retry.WithMaxRetries(3, retry.NewExponential(time.Second))
+	err := retry.Do(ctx, backoff, func(_ctx gocontext.Context) error {
+		labelSelector := fmt.Sprintf("%s/leader=true", app)
+		podList, err := ctx.Kubernetes().CoreV1().Pods(ctx.GetNamespace()).List(ctx, metav1.ListOptions{
+			LabelSelector: labelSelector,
+		})
 		if err != nil {
-			return false, "", err
+			return retry.RetryableError(fmt.Errorf("failed to list pods with labelSelector(%s): %w", labelSelector, err))
 		}
-		return true, identity, nil
-	}
-	if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == identity {
-		lease.Spec.RenewTime = &now
-		ctx.Debugf("Renewing lease %s : %s", leaseName, now.String())
-		_, err = leases.Update(ctx, lease, metav1.UpdateOptions{})
-		if err != nil {
-			return false, "", err
-		}
-	}
-	renewTime := lease.Spec.RenewTime.Time
-	if time.Since(renewTime) > leaseDuration {
-		ctx.Infof("Lease %s held by %s expired", leaseName, *lease.Spec.HolderIdentity)
-		if err := leases.Delete(ctx, leaseName, metav1.DeleteOptions{}); err != nil {
-			ctx.Infof("failed to delete leases %s: %v", leaseName, err)
+		pods := lo.Map(podList.Items, func(p corev1.Pod, _ int) string { return p.Name })
+		pods = append(pods, hostname)
+
+		for _, podName := range lo.Uniq(pods) {
+			var payload string
+			if podName == hostname {
+				ctx.Infof("adding leader metadata to pod: %s", podName)
+				payload = fmt.Sprintf(`{"metadata":{"labels":{"%s/leader":"true"}}}`, app)
+			} else {
+				ctx.Infof("removing leader metadata from pod: %s", podName)
+				payload = fmt.Sprintf(`{"metadata":{"labels":{"%s/leader": null}}}`, app)
+			}
+
+			_, err := ctx.Kubernetes().CoreV1().Pods(ctx.GetNamespace()).Patch(ctx,
+				podName,
+				types.MergePatchType,
+				[]byte(payload),
+				metav1.PatchOptions{})
+			if err != nil {
+				return retry.RetryableError(err)
+			}
 		}
-		return createOrUpdateLease(ctx, leaseName, attempt+1)
+
+		return nil
+	})
+	if err != nil {
+		ctx.Errorf("failed to set label: %v", err)
 	}
-	ctx.Debugf("Lease %s already held by %s, expires in %s", leaseName, *lease.Spec.HolderIdentity, time.Until(renewTime.Add(leaseDuration)).String())
-	return false, *lease.Spec.HolderIdentity, nil
 }
diff --git a/start.go b/start.go
index 6c522e18..284d252a 100644
--- a/start.go
+++ b/start.go
@@ -156,6 +156,9 @@ func Start(name string, opts ...StartOption) (context.Context, func(), error) {
 		return context.Context{}, stop, err
 	} else {
 		ctx = *c
+		stop = func() {
+			c.Pool().Close()
+		}
 	}
 }
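
For reference, a minimal usage sketch of the new Register API, assuming the package is imported as github.com/flanksource/duty/leader; the package name "app", the lease name "my-app", and the callback bodies are illustrative placeholders, not taken from this patch.

package app

import (
	gocontext "context"

	"github.com/flanksource/duty/context"
	"github.com/flanksource/duty/leader"
)

// startLeaderElection wires an application into the client-go based leader
// election introduced by this patch. Register blocks until ctx is done,
// so it is typically run from the service's startup goroutine.
func startLeaderElection(ctx context.Context) error {
	onLead := func(leadCtx gocontext.Context) {
		// this replica became the leader: start leader-only work here
	}
	onStoppedLead := func() {
		// leadership was lost: stop leader-only work here
	}
	onNewLeader := func(identity string) {
		// another replica (identity) became the leader
	}

	// "my-app" is a hypothetical lease name / label prefix; an empty
	// namespace falls back to the pod's own namespace (POD_NAMESPACE env
	// or the service account namespace file).
	return leader.Register(ctx, "my-app", "", onLead, onStoppedLead, onNewLeader)
}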