Skip to content

Commit

Permalink
bug fix: full snapshot lease update retry on failure with 1m interval (
Browse files Browse the repository at this point in the history
  • Loading branch information
anveshreddy18 authored Jul 5, 2024
1 parent dda25ea commit dfcf84e
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 79 deletions.
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
if opts.EnabledLeaseRenewal {
// Update revisions in holder identity of full snapshot lease.
ctx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName, opts.DeltaSnapshotLeaseName); err != nil {
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName); err != nil {
cp.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand Down
12 changes: 5 additions & 7 deletions pkg/health/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (hb *Heartbeat) RenewMemberLease(ctx context.Context) error {
return nil
}

// UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot
// UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot.
func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
if k8sClientset == nil {
return &errors.EtcdError{
Expand Down Expand Up @@ -221,7 +221,6 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap
Message: fmt.Sprintf("Failed to update full snapshot lease: %v", err),
}
}

return nil
}

Expand Down Expand Up @@ -288,18 +287,17 @@ func UpdateDeltaSnapshotLease(ctx context.Context, logger *logrus.Entry, prevDel
return nil
}

// FullSnapshotCaseLeaseUpdate Updates the fullsnapshot lease and the deltasnapshot lease as needed when a full snapshot is taken
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, deltaSnapshotLeaseName string) error {
// FullSnapshotCaseLeaseUpdate Updates the fullsnapshot lease as needed when a full snapshot is taken
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
if err := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName); err != nil {
return &errors.EtcdError{
Message: fmt.Sprintf("Failed to update full snapshot lease: %v", err),
}
}

return nil
}

// DeltaSnapshotCaseLeaseUpdate Updates the fullsnapshot lease and the deltasnapshot lease as needed when a delta snapshot is taken
// DeltaSnapshotCaseLeaseUpdate Updates the deltasnapshot lease as needed when a delta snapshot is taken
func DeltaSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, k8sClientset client.Client, deltaSnapshotLeaseName string, store brtypes.SnapStore) error {
_, latestDeltaSnapshotList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
if err == nil {
Expand Down Expand Up @@ -353,7 +351,7 @@ func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hco
hb.logger.Info("Stopped member lease renewal timer")
return nil
case <-stopCh:
hb.logger.Info("Stoping the member lease renewal")
hb.logger.Info("Stopping the member lease renewal")
return nil
}
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/health/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,46 @@ var _ = Describe("Heartbeat", func() {
Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod"))
Expect(os.Getenv("POD_NAMESPACE")).Should(Equal("test_namespace"))

snap := &brtypes.Snapshot{
prevFullSnap := &brtypes.Snapshot{
Kind: brtypes.SnapshotKindFull,
CreatedOn: time.Now(),
StartRevision: 0,
LastRevision: 980,
}
prevFullSnap.GenerateSnapshotName()

latestFullSnap := &brtypes.Snapshot{
Kind: brtypes.SnapshotKindFull,
CreatedOn: time.Now(),
StartRevision: 0,
LastRevision: 989,
}
snap.GenerateSnapshotName()
latestFullSnap.GenerateSnapshotName()
err := k8sClientset.Create(context.TODO(), lease)
Expect(err).ShouldNot(HaveOccurred())

err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, snap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
// Update full snapshot lease with the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).ShouldNot(HaveOccurred())

l := &v1.Lease{}
Expect(k8sClientset.Get(context.TODO(), client.ObjectKey{
Namespace: lease.Namespace,
Name: lease.Name,
}, l)).To(Succeed())
Expect(l.Spec.HolderIdentity).To(PointTo(Equal("980")))

Expect(l.Spec.HolderIdentity).To(PointTo(Equal("989")))
// Trigger full snapshot lease update with latest full snapshot which is not the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).ShouldNot(HaveOccurred())

l = &v1.Lease{}
Expect(k8sClientset.Get(context.TODO(), client.ObjectKey{
Namespace: lease.Namespace,
Name: lease.Name,
}, l)).To(Succeed())
Expect(l.Spec.HolderIdentity).To(PointTo(Equal("989")))

err = k8sClientset.Delete(context.TODO(), l)
Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -132,7 +150,7 @@ var _ = Describe("Heartbeat", func() {

Expect(k8sClientset.Create(context.TODO(), lease)).To(Succeed())

err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).Should(HaveOccurred())

err = k8sClientset.Delete(context.TODO(), lease)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
if b.config.HealthConfig.SnapshotLeaseRenewalEnabled {
leaseUpdatectx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName, b.config.HealthConfig.DeltaSnapshotLeaseName); err != nil {
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName); err != nil {
b.logger.Warnf("Snapshot lease update failed : %v", err)
}
}
Expand Down
60 changes: 60 additions & 0 deletions pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

package snapshotter

import (
"context"
"time"

"github.com/gardener/etcd-backup-restore/pkg/health/heartbeat"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/sirupsen/logrus"
)

// RenewFullSnapshotLeasePeriodically runs a timer-driven loop that calls
// heartbeat.FullSnapshotCaseLeaseUpdate with the snapshotter's PrevFullSnapshot
// until the lease update succeeds or a value is received on FullSnapshotLeaseStopCh.
//
// The timer is created here, armed with fullSnapshotLeaseUpdateInterval, and stored
// in ssr.FullSnapshotLeaseUpdateTimer so that the snapshotter can re-arm it after a
// full snapshot is taken. On every tick:
//   - if no full snapshot is known yet, the timer is re-armed with the interval;
//   - if the lease update fails, a warning is logged and the timer is re-armed with
//     the interval so the update is retried;
//   - if the lease update succeeds, the timer is left expired and the loop waits
//     for the next re-arm or for the stop signal.
//
// The function blocks until FullSnapshotLeaseStopCh yields a value; on return it
// cancels the internal context, stops the timer and sets
// ssr.FullSnapshotLeaseUpdateTimer to nil.
//
// NOTE(review): ssr.PrevFullSnapshot and ssr.FullSnapshotLeaseUpdateTimer are
// accessed here without any lock visible in this function — confirm that callers
// tolerate or otherwise guard this concurrent access.
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh chan struct{}, fullSnapshotLeaseUpdateInterval time.Duration) {
	logger := logrus.NewEntry(logrus.New()).WithField("actor", "FullSnapLeaseUpdater")
	ssr.FullSnapshotLeaseUpdateTimer = time.NewTimer(fullSnapshotLeaseUpdateInterval)
	fullSnapshotLeaseUpdateCtx, fullSnapshotLeaseUpdateCancel := context.WithCancel(context.TODO())
	defer func() {
		fullSnapshotLeaseUpdateCancel()
		if ssr.FullSnapshotLeaseUpdateTimer != nil {
			ssr.FullSnapshotLeaseUpdateTimer.Stop()
			ssr.FullSnapshotLeaseUpdateTimer = nil
		}
	}()
	logger.Infof("Starting the FullSnapshot lease renewal with interval %v", fullSnapshotLeaseUpdateInterval)
	for {
		select {
		case <-ssr.FullSnapshotLeaseUpdateTimer.C:
			// Read the field once so the nil check, the update call and the log
			// line below all refer to the same snapshot.
			prevFullSnapshot := ssr.PrevFullSnapshot
			if prevFullSnapshot == nil {
				// Skip the FullSnapshot lease update as no full snapshot has been taken yet. Reset the timer to retry after interval
				ssr.FullSnapshotLeaseUpdateTimer.Stop()
				ssr.FullSnapshotLeaseUpdateTimer.Reset(fullSnapshotLeaseUpdateInterval)
				continue
			}

			// The closure scopes the per-attempt timeout context so that its
			// cancel runs as soon as the attempt finishes, not at function exit.
			err := func() error {
				ctx, cancel := context.WithTimeout(fullSnapshotLeaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
				defer cancel()
				return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, prevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName)
			}()
			if err != nil {
				// FullSnapshot lease update failed. Retry after interval
				logger.Warnf("FullSnapshot lease update failed with error: %v", err)
				logger.Infof("Resetting the FullSnapshot lease to retry updating with revision %d after %v", prevFullSnapshot.LastRevision, fullSnapshotLeaseUpdateInterval)
				ssr.FullSnapshotLeaseUpdateTimer.Stop()
				ssr.FullSnapshotLeaseUpdateTimer.Reset(fullSnapshotLeaseUpdateInterval)
				continue
			}

			// FullSnapshot lease successfully updated. The tick received above has
			// already left the timer expired, so it will not fire again until it is
			// re-armed. Deliberately do NOT call Stop() here: if the timer was
			// re-armed concurrently while the update was in flight (a new full
			// snapshot was taken), Stop() would cancel that pending trigger and the
			// lease would keep the older revision.
			logger.Infof("Stopping the FullSnapshot lease update")

		case <-FullSnapshotLeaseStopCh:
			logger.Info("Closing the full snapshot lease renewal")
			return
		}
	}
}
124 changes: 62 additions & 62 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,32 +82,33 @@ func NewSnapshotterConfig() *brtypes.SnapshotterConfig {

// Snapshotter is a struct for etcd snapshot taker
type Snapshotter struct {
logger *logrus.Entry
etcdConnectionConfig *brtypes.EtcdConnectionConfig
store brtypes.SnapStore
config *brtypes.SnapshotterConfig
compressionConfig *compressor.CompressionConfig
healthConfig *brtypes.HealthConfig
schedule cron.Schedule
PrevSnapshot *brtypes.Snapshot
PrevFullSnapshot *brtypes.Snapshot
PrevDeltaSnapshots brtypes.SnapList
fullSnapshotReqCh chan bool
deltaSnapshotReqCh chan struct{}
fullSnapshotAckCh chan result
deltaSnapshotAckCh chan result
fullSnapshotTimer *time.Timer
deltaSnapshotTimer *time.Timer
events []byte
watchCh clientv3.WatchChan
etcdWatchClient *clientv3.Watcher
cancelWatch context.CancelFunc
SsrStateMutex *sync.Mutex
SsrState brtypes.SnapshotterState
lastEventRevision int64
K8sClientset client.Client
snapstoreConfig *brtypes.SnapstoreConfig
lastSecretModifiedTime time.Time
logger *logrus.Entry
etcdConnectionConfig *brtypes.EtcdConnectionConfig
store brtypes.SnapStore
config *brtypes.SnapshotterConfig
compressionConfig *compressor.CompressionConfig
HealthConfig *brtypes.HealthConfig
schedule cron.Schedule
PrevSnapshot *brtypes.Snapshot
PrevFullSnapshot *brtypes.Snapshot
PrevDeltaSnapshots brtypes.SnapList
fullSnapshotReqCh chan bool
deltaSnapshotReqCh chan struct{}
fullSnapshotAckCh chan result
deltaSnapshotAckCh chan result
FullSnapshotLeaseUpdateTimer *time.Timer
fullSnapshotTimer *time.Timer
deltaSnapshotTimer *time.Timer
events []byte
watchCh clientv3.WatchChan
etcdWatchClient *clientv3.Watcher
cancelWatch context.CancelFunc
SsrStateMutex *sync.Mutex
SsrState brtypes.SnapshotterState
lastEventRevision int64
K8sClientset client.Client
snapstoreConfig *brtypes.SnapstoreConfig
lastSecretModifiedTime time.Time
}

// NewSnapshotter returns the snapshotter object.
Expand Down Expand Up @@ -153,29 +154,29 @@ func NewSnapshotter(logger *logrus.Entry, config *brtypes.SnapshotterConfig, sto
config: config,
etcdConnectionConfig: etcdConnectionConfig,
compressionConfig: compressionConfig,
healthConfig: healthConfig,

schedule: sdl,
PrevSnapshot: prevSnapshot,
PrevFullSnapshot: fullSnap,
PrevDeltaSnapshots: deltaSnapList,
SsrState: brtypes.SnapshotterInactive,
SsrStateMutex: &sync.Mutex{},
fullSnapshotReqCh: make(chan bool),
deltaSnapshotReqCh: make(chan struct{}),
fullSnapshotAckCh: make(chan result),
deltaSnapshotAckCh: make(chan result),
cancelWatch: func() {},
K8sClientset: clientSet,
snapstoreConfig: storeConfig,
HealthConfig: healthConfig,
schedule: sdl,
PrevSnapshot: prevSnapshot,
PrevFullSnapshot: fullSnap,
PrevDeltaSnapshots: deltaSnapList,
SsrState: brtypes.SnapshotterInactive,
SsrStateMutex: &sync.Mutex{},
fullSnapshotReqCh: make(chan bool),
deltaSnapshotReqCh: make(chan struct{}),
fullSnapshotAckCh: make(chan result),
deltaSnapshotAckCh: make(chan result),
cancelWatch: func() {},
K8sClientset: clientSet,
snapstoreConfig: storeConfig,
}, nil
}

// Run process loop for scheduled backup
// Setting startWithFullSnapshot to false will start the snapshotter without
// taking the first full snapshot.
func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool) error {
defer ssr.stop()
FullSnapshotLeaseStopCh := make(chan struct{})
defer ssr.stop(FullSnapshotLeaseStopCh)
if startWithFullSnapshot {
ssr.fullSnapshotTimer = time.NewTimer(0)
} else {
Expand All @@ -196,7 +197,9 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool)
return fmt.Errorf("failed to reset full snapshot timer: %v", err)
}
}

if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, brtypes.FullSnapshotLeaseUpdateInterval)
}
ssr.deltaSnapshotTimer = time.NewTimer(brtypes.DefaultDeltaSnapshotInterval)
if ssr.config.DeltaSnapshotPeriod.Duration >= brtypes.DeltaSnapshotIntervalThreshold {
ssr.deltaSnapshotTimer.Stop()
Expand Down Expand Up @@ -241,7 +244,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) {

// stop stops the snapshotter. Once stopped any subsequent calls will
// not have any effect.
func (ssr *Snapshotter) stop() {
func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) {
ssr.logger.Info("Closing the Snapshotter...")

if ssr.fullSnapshotTimer != nil {
Expand All @@ -252,6 +255,9 @@ func (ssr *Snapshotter) stop() {
ssr.deltaSnapshotTimer.Stop()
ssr.deltaSnapshotTimer = nil
}
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
FullSnapshotLeaseStopCh <- emptyStruct
}
ssr.SetSnapshotterInactive()
ssr.closeEtcdClient()
}
Expand Down Expand Up @@ -644,12 +650,9 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err = heartbeat.FullSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.healthConfig.FullSnapshotLeaseName, ssr.healthConfig.DeltaSnapshotLeaseName); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer.Reset(time.Nanosecond)
}

case <-ssr.deltaSnapshotReqCh:
Expand All @@ -662,9 +665,9 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err = heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err = heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand All @@ -674,22 +677,19 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if _, err := ssr.TakeFullSnapshotAndResetTimer(false); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.healthConfig.FullSnapshotLeaseName, ssr.healthConfig.DeltaSnapshotLeaseName); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer.Reset(time.Nanosecond)
}

case <-ssr.deltaSnapshotTimer.C:
if ssr.config.DeltaSnapshotPeriod.Duration >= time.Second {
if _, err := ssr.takeDeltaSnapshotAndResetTimer(); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand All @@ -704,11 +704,11 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err := ssr.handleDeltaWatchEvents(wr); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
//Call UpdateDeltaSnapshotLease only if new delta snapshot taken
if snapshots < len(ssr.PrevDeltaSnapshots) {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand Down
Loading

0 comments on commit dfcf84e

Please sign in to comment.