feat: use client-go leader election (#1128)
* feat: use client-go leader election

* chore: allow passing in the namespace for leader election

* feat: set leader label and remove from other replicas on lead

* fix: select other leader replicas by the label not by parent replica set

* fix: run elector again after failure

* feat: stop registered crons

* feat: manage all crons from elector
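
For reference, a minimal sketch of how a caller might use the new Register entry point introduced by this commit (the app name "example-app" and the helper name runWithLeaderElection are illustrative, not part of the diff):

package main

import (
    gocontext "context"

    "github.com/flanksource/duty/context"
    "github.com/flanksource/duty/leader"
)

// Sketch only: "example-app" is a placeholder lease/app name. An empty
// namespace lets Register fall back to the pod's own namespace
// (POD_NAMESPACE or the service-account namespace file).
func runWithLeaderElection(ctx context.Context) error {
    return leader.Register(ctx,
        "example-app",
        "",
        func(leadCtx gocontext.Context) {
            // onLead: this replica acquired the lease; start leader-only work here.
        },
        func() {
            // onStoppedLead: the lease was lost; stop leader-only work.
        },
        func(identity string) {
            // onNewLeader: another replica (identified by its hostname) became the leader.
        },
    )
}

Note that Register blocks until the passed context is cancelled, so a caller would typically run it in its own goroutine.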
adityathebe authored Oct 16, 2024
1 parent 48bc164 commit 01710f9
Showing 2 changed files with 170 additions and 95 deletions.
262 changes: 167 additions & 95 deletions leader/election.go
@@ -1,139 +1,211 @@
 package leader
 
 import (
+	gocontext "context"
+	"errors"
 	"fmt"
 	"log"
-	"math/rand"
 	"os"
-	"sync/atomic"
+	"strings"
 	"time"
 
-	"github.com/flanksource/duty/context"
 	"github.com/samber/lo"
 
-	v1 "k8s.io/api/coordination/v1"
+	"github.com/sethvargo/go-retry"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	types "k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
+
+	"github.com/flanksource/commons/logger"
+	"github.com/flanksource/duty/context"
+	"github.com/flanksource/duty/echo"
+	"github.com/flanksource/duty/shutdown"
 )
 
 var (
-	identity = getHostname()
+	hostname     string
+	podNamespace string
 )
 
-var isLeader *atomic.Bool
+const namespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+
+func getPodNamespace() (string, error) {
+	// passed using K8s downwards API
+	if ns, ok := os.LookupEnv("POD_NAMESPACE"); ok {
+		return ns, nil
+	}
+
+	data, err := os.ReadFile(namespaceFilePath)
+	if err != nil {
+		return "", fmt.Errorf("failed to read namespace file: %w", err)
+	}
+
+	ns := strings.TrimSpace(string(data))
+	if ns == "" {
+		return "", errors.New("namespace was neither found in the env nor in the service account path")
+	}
+
+	return ns, nil
+}
 
-func getHostname() string {
-	hostname, err := os.Hostname()
+func init() {
+	var err error
+	hostname, err = os.Hostname()
 	if err != nil {
-		log.Fatalf("Failed to get hostname: %v", err)
+		log.Fatalf("failed to get hostname: %v", err)
 	}
-	return hostname
-}
 
-var watchers = []func(isLeader bool){}
+	// To test locally
+	if v, ok := os.LookupEnv("MC_HOSTNAME_OVERRIDE"); ok {
+		logger.Infof("hostname overriden by MC_HOSTNAME_OVERRIDE: %s", v)
+		hostname = v
+	}
+
+	if n, err := getPodNamespace(); err == nil {
+		podNamespace = n
+	}
+}
 
-func OnElection(ctx context.Context, leaseName string, fn func(isLeader bool)) {
-	watchers = append(watchers, fn)
-
-	if isLeader == nil {
-		leaseDuration := ctx.Properties().Duration("leader.lease.duration", 10*time.Minute)
-		isLeader = new(atomic.Bool)
-		go func() {
-			for {
-				leader, _, err := createOrUpdateLease(ctx, leaseName, 0)
-				if err != nil {
-					ctx.Warnf("failed to create/update lease: %v", err)
-					time.Sleep(time.Duration(rand.Intn(60)) * time.Second)
-					continue
-				}
-
-				if isLeader.Load() != leader {
-					isLeader.Store(leader)
-					notify(leader)
-				}
-
-				// sleep for just under half the lease duration before trying to renew
-				time.Sleep(leaseDuration/2 - time.Second*10)
-			}
-		}()
-	}
-}
-
-func notify(isLeader bool) {
-	for _, fn := range watchers {
-		fn(isLeader)
-	}
-}
-
-func IsLeader(ctx context.Context, leaseName string) (bool, error) {
-	leases := ctx.Kubernetes().CoordinationV1().Leases(ctx.GetNamespace())
-
-	lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
-	if err != nil {
-		return false, err
-	}
-
-	if *lease.Spec.HolderIdentity == identity {
-		return true, nil
-	}
-
-	return false, nil
-}
-
-func createOrUpdateLease(ctx context.Context, leaseName string, attempt int) (bool, string, error) {
-	if attempt > 0 {
-		time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
-	}
-	if attempt >= ctx.Properties().Int("leader.lease.attempts", 3) {
-		return false, "", fmt.Errorf("failed to acquire lease %s after %d attempts", leaseName, attempt)
-	}
-	now := metav1.MicroTime{Time: time.Now()}
-	leases := ctx.Kubernetes().CoordinationV1().Leases(ctx.GetNamespace())
-	leaseDuration := ctx.Properties().Duration("leader.lease.duration", 10*time.Minute)
-	lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
-	if err != nil {
-		return false, "", err
-	}
-	if lease == nil {
-		ctx.Infof("Acquiring lease %s", leaseName)
-		lease = &v1.Lease{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      leaseName,
-				Namespace: ctx.GetNamespace(),
-			},
-			Spec: v1.LeaseSpec{
-				HolderIdentity:       &identity,
-				LeaseDurationSeconds: lo.ToPtr(int32(leaseDuration.Seconds())),
-				AcquireTime:          &now,
-				RenewTime:            &now,
-			},
-		}
-		_, err = leases.Create(ctx, lease, metav1.CreateOptions{})
-		if err != nil {
-			return false, "", err
-		}
-		return true, identity, nil
-	}
-
-	if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == identity {
-		lease.Spec.RenewTime = &now
-		ctx.Debugf("Renewing lease %s : %s", leaseName, now.String())
-		_, err = leases.Update(ctx, lease, metav1.UpdateOptions{})
-		if err != nil {
-			return false, "", err
-		}
-	}
-	renewTime := lease.Spec.RenewTime.Time
-	if time.Since(renewTime) > leaseDuration {
-		ctx.Infof("Lease %s held by %s expired", leaseName, *lease.Spec.HolderIdentity)
-		if err := leases.Delete(ctx, leaseName, metav1.DeleteOptions{}); err != nil {
-			ctx.Infof("failed to delete leases %s: %v", leaseName, err)
-		}
-		return createOrUpdateLease(ctx, leaseName, attempt+1)
-	}
-	ctx.Debugf("Lease %s already held by %s, expires in %s", leaseName, *lease.Spec.HolderIdentity, time.Until(renewTime.Add(leaseDuration)).String())
-	return false, *lease.Spec.HolderIdentity, nil
-}
+func Register(
+	ctx context.Context,
+	app string,
+	namespace string,
+	onLead func(ctx gocontext.Context),
+	onStoppedLead func(),
+	onNewLeader func(identity string),
+) error {
+	if namespace == "" {
+		namespace = podNamespace
+	}
+
+	ctx = ctx.WithNamespace(namespace)
+
+	lock := &resourcelock.LeaseLock{
+		LeaseMeta: metav1.ObjectMeta{
+			Name:      app,
+			Namespace: namespace,
+		},
+		Client: ctx.Kubernetes().CoordinationV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity: hostname,
+		},
+	}
+
+	electionConfig := leaderelection.LeaderElectionConfig{
+		Lock:            lock,
+		ReleaseOnCancel: true,
+		LeaseDuration:   ctx.Properties().Duration("leader.lease.duration", 30*time.Second),
+		RenewDeadline:   15 * time.Second,
+		RetryPeriod:     5 * time.Second,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(leadCtx gocontext.Context) {
+				ctx.Infof("started leading election")
+
+				updateLeaderLabel(ctx, app)
+
+				for entry := range echo.Crons.IterBuffered() {
+					entry.Val.Start()
+				}
+
+				if onLead != nil {
+					onLead(leadCtx)
+				}
+			},
+			OnStoppedLeading: func() {
+				ctx.Infof("stopped leading election")
+
+				for entry := range echo.Crons.IterBuffered() {
+					entry.Val.Stop()
+				}
+
+				if onStoppedLead != nil {
+					onStoppedLead()
+				}
+			},
+			OnNewLeader: func(identity string) {
+				if identity == hostname {
+					return
+				}
+
+				if onNewLeader != nil {
+					onNewLeader(identity)
+				}
+			},
+		},
+	}
+
+	elector, err := leaderelection.NewLeaderElector(electionConfig)
+	if err != nil {
+		return err
+	}
+
+	leaderContext, cancel := gocontext.WithCancel(ctx)
+	shutdown.AddHook(func() {
+		cancel()
+
+		// give the elector some time to release the lease
+		time.Sleep(time.Second * 2)
+	})
+
+	go func() {
+		// when a network failure occurs for a considerable amount of time (>30s)
+		// elector.Run terminates and never retries acquiring the lease.
+		//
+		// that's why it's run in a never ending loop
+		for {
+			select {
+			case <-leaderContext.Done():
+				return
+			default:
+				elector.Run(leaderContext)
+			}
+		}
+	}()
+	<-ctx.Done()
+
+	return nil
+}
+
+// updateLeaderLabel sets leader:true label on the current pod
+// and also removes that label from all other replicas.
+func updateLeaderLabel(ctx context.Context, app string) {
+	backoff := retry.WithMaxRetries(3, retry.NewExponential(time.Second))
+	err := retry.Do(ctx, backoff, func(_ctx gocontext.Context) error {
+		labelSelector := fmt.Sprintf("%s/leader=true", app)
+		podList, err := ctx.Kubernetes().CoreV1().Pods(ctx.GetNamespace()).List(ctx, metav1.ListOptions{
+			LabelSelector: labelSelector,
+		})
+		if err != nil {
+			return retry.RetryableError(fmt.Errorf("failed to list pods with labelSelector(%s): %w", labelSelector, err))
+		}
+
+		pods := lo.Map(podList.Items, func(p corev1.Pod, _ int) string { return p.Name })
+		pods = append(pods, hostname)
+
+		for _, podName := range lo.Uniq(pods) {
+			var payload string
+			if podName == hostname {
+				ctx.Infof("adding leader metadata to pod: %s", podName)
+				payload = fmt.Sprintf(`{"metadata":{"labels":{"%s/leader":"true"}}}`, app)
+			} else {
+				ctx.Infof("removing leader metadata to pod: %s", podName)
+				payload = fmt.Sprintf(`{"metadata":{"labels":{"%s/leader": null}}}`, app)
+			}
+
+			_, err := ctx.Kubernetes().CoreV1().Pods(ctx.GetNamespace()).Patch(ctx,
+				podName,
+				types.MergePatchType,
+				[]byte(payload),
+				metav1.PatchOptions{})
+			if err != nil {
+				return retry.RetryableError(err)
+			}
+		}
+
+		return nil
+	})
+	if err != nil {
+		ctx.Errorf("failed to set label: %v", err)
+	}
+}
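
Because the elected replica carries the <app>/leader=true label maintained by updateLeaderLabel, other tooling can locate the leader with an ordinary pod list. A rough client-go sketch, assuming an already-configured clientset (the helper name currentLeaderPod and its arguments are illustrative, not part of this commit):

package main

import (
    gocontext "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// Hypothetical helper: returns the name of the pod currently labelled
// "<app>/leader=true", i.e. the replica that last won the election.
func currentLeaderPod(ctx gocontext.Context, client kubernetes.Interface, namespace, app string) (string, error) {
    pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
        LabelSelector: fmt.Sprintf("%s/leader=true", app),
    })
    if err != nil {
        return "", err
    }
    if len(pods.Items) == 0 {
        return "", fmt.Errorf("no leader pod found for %s in %s", app, namespace)
    }
    return pods.Items[0].Name, nil
}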
3 changes: 3 additions & 0 deletions start.go
@@ -156,6 +156,9 @@ func Start(name string, opts ...StartOption) (context.Context, func(), error) {
 			return context.Context{}, stop, err
 		} else {
 			ctx = *c
+			stop = func() {
+				c.Pool().Close()
+			}
 		}
 	}
 
