diff --git a/go.mod b/go.mod index 4e3314bcfa..74474598c3 100644 --- a/go.mod +++ b/go.mod @@ -61,6 +61,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.3 github.com/jinzhu/copier v0.4.0 + github.com/jrhouston/k8slock v0.2.0 github.com/kubernetes-csi/csi-lib-utils v0.19.0 github.com/longhorn/backing-image-manager v1.8.0-dev-20241215 github.com/longhorn/backupstore v0.0.0-20241208150358-e44f7d35fe1f @@ -68,7 +69,7 @@ require ( github.com/longhorn/go-iscsi-helper v0.0.0-20241208120713-c4ac270fbe7e github.com/longhorn/go-spdk-helper v0.0.0-20241210055426-92898a883955 github.com/longhorn/longhorn-engine v1.8.0-dev-20241215 - github.com/longhorn/longhorn-instance-manager v1.8.0-dev-20241215 + github.com/longhorn/longhorn-instance-manager v1.8.0-dev-20241215.0.20241215041917-483e7a8c95d3 github.com/longhorn/longhorn-share-manager v1.8.0-dev-20241215 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 diff --git a/go.sum b/go.sum index 2ed8b8577b..d32aecd393 100644 --- a/go.sum +++ b/go.sum @@ -130,6 +130,8 @@ github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jrhouston/k8slock v0.2.0 h1:/303qjpwpTeNh0pCKj+yA1qUdYDoGxZ+kL/Ecu5p0AY= +github.com/jrhouston/k8slock v0.2.0/go.mod h1:olQjuryLy6Q32mFUlBJrVyJKm4S8JFKFUFPXWrsO6x8= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -163,8 +165,8 @@ github.com/longhorn/go-spdk-helper v0.0.0-20241210055426-92898a883955 h1:QcnR9b2 github.com/longhorn/go-spdk-helper 
v0.0.0-20241210055426-92898a883955/go.mod h1:isAM1U36SWOh7XWfktlbveHWSLXV3HfEF7p/tyNqAUQ= github.com/longhorn/longhorn-engine v1.8.0-dev-20241215 h1:sTQn2kHZqgwycM9Y3n3fuiSaURxeIS4h98ACJbbJDl8= github.com/longhorn/longhorn-engine v1.8.0-dev-20241215/go.mod h1:fcQI1dyu1KcQDvxrnHJ3rsSSauCKV1+IzaCLTbtdVY0= -github.com/longhorn/longhorn-instance-manager v1.8.0-dev-20241215 h1:Q4rOIUWqEgwYDxVPLtrFrRBwxh6RTbMPuflwMxxa1qc= -github.com/longhorn/longhorn-instance-manager v1.8.0-dev-20241215/go.mod h1:rZXKK0fs2avpeDONPfyR46N8xqnLv0Hnp/m0C8sYbWw= +github.com/longhorn/longhorn-instance-manager v1.8.0-dev-20241215.0.20241215041917-483e7a8c95d3 h1:VNDjeT733DsSKvSv37B3IPmVVLI56MHf8/ZCseTsRBs= +github.com/longhorn/longhorn-instance-manager v1.8.0-dev-20241215.0.20241215041917-483e7a8c95d3/go.mod h1:8CEoeY03OrSg/gbXlg7aC6MmVgI3T7TUEvAqZtMZIVo= github.com/longhorn/longhorn-share-manager v1.8.0-dev-20241215 h1:UBdkcAFfE1ipvtQZCyOIGZNdfw6rmMayyQKC+qHl5/M= github.com/longhorn/longhorn-share-manager v1.8.0-dev-20241215/go.mod h1:/EfupmQMayDFanwroXFLVqRbWI46M6e1iJypf1Oa7p8= github.com/longhorn/types v0.0.0-20241214151852-cf9e27d3d0a9 h1:mgIwf1do39qeJWwZDK+MhGMZmeNIbEluQITZ2TsDUGg= diff --git a/vendor/github.com/jrhouston/k8slock/LICENSE b/vendor/github.com/jrhouston/k8slock/LICENSE new file mode 100644 index 0000000000..ea9b7b90db --- /dev/null +++ b/vendor/github.com/jrhouston/k8slock/LICENSE @@ -0,0 +1,21 @@ +The MIT License + +Copyright (c) 2020 John Houston + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or 
substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/jrhouston/k8slock/README.md b/vendor/github.com/jrhouston/k8slock/README.md new file mode 100644 index 0000000000..b3b31f2aad --- /dev/null +++ b/vendor/github.com/jrhouston/k8slock/README.md @@ -0,0 +1,78 @@ +# k8slock [![Godoc](https://godoc.org/github.com/jrhouston/k8slock?status.svg)](https://godoc.org/github.com/jrhouston/k8slock) [![Go Report Card](https://goreportcard.com/badge/github.com/jrhouston/k8slock)](https://goreportcard.com/report/github.com/jrhouston/k8slock) + +k8slock is a Go module that makes it easy to do distributed locking using the [Lease](https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/lease-v1/) resource from the Kubernetes coordination API. + +If you want to use Kubernetes to create a simple distributed lock, this module is for you. + +This module implements the [sync.Locker](https://golang.org/pkg/sync/#Locker) interface using the `Lock()` and `Unlock()` functions. + +This module also supports using contexts via the `LockContext()` and `UnlockContext()` functions. 
+
+
+## Basic Usage
+
+```go
+package main
+
+import "github.com/jrhouston/k8slock"
+
+func main() {
+	locker, err := k8slock.NewLocker("example-lock")
+	if err != nil {
+		panic(err)
+	}
+
+	locker.Lock()
+	// do some work
+	locker.Unlock()
+}
+```
+
+## Basic Usage – Context
+
+```go
+package main
+
+import (
+	"context"
+	"github.com/jrhouston/k8slock"
+)
+
+func main() {
+	locker, err := k8slock.NewLocker("example-lock")
+	if err != nil {
+		panic(err)
+	}
+
+	ctx := context.Background()
+	if err := locker.LockContext(ctx); err != nil {
+		fmt.Println("Error trying to lock", err)
+	}
+
+	// do some work
+
+	if err := locker.UnlockContext(ctx); err != nil {
+		fmt.Println("Error trying to unlock", err)
+	}
+}
+```
+
+# Locker Options
+
+The locker can be configured using the following [functional options](https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis):
+
+| Option | Details |
+|---|---|
+| `TTL(duration)` | The duration until the lock expires and can be forcibly claimed. By default the lock can be held infinitely. |
+| `RetryWaitDuration(duration)` | The duration to wait before retrying after failing to acquire the lock. Default: 1 second. |
+| `InClusterConfig()` | Get the kubernetes client config from inside a pod. Defaults to a clientset using the local kubeconfig. |
+| `Clientset(kubernetes.Interface)` | Configure a custom Kubernetes Clientset. Defaults to a clientset using the local kubeconfig. |
+| `Namespace(string)` | The kubernetes namespace to store the Lease resource. Defaults to "default". |
+| `ClientID(string)` | A unique ID for the client that is trying to obtain the lock. Defaults to a random UUID. |
+| `CreateLease(bool)` | Create a Lease resource if it does not already exist. Defaults to `true`. 
| + +e.g: + +```go +locker, err := k8slock.NewLocker("example-lock", k8slock.Namespace("locks"), k8slock.ClientID("client-0")) +``` diff --git a/vendor/github.com/jrhouston/k8slock/locker.go b/vendor/github.com/jrhouston/k8slock/locker.go new file mode 100644 index 0000000000..ec05b41a6f --- /dev/null +++ b/vendor/github.com/jrhouston/k8slock/locker.go @@ -0,0 +1,297 @@ +package k8slock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/utils/pointer" + + coordinationv1 "k8s.io/api/coordination/v1" + coordinationclientv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" +) + +// Locker implements the Locker interface using the kubernetes Lease resource +type Locker struct { + clientset kubernetes.Interface + leaseClient coordinationclientv1.LeaseInterface + namespace string + name string + clientID string + retryWait time.Duration + ttl time.Duration + skipLeaseCreation bool +} + +type lockerOption func(*Locker) error + +// Namespace is the namespace used to store the Lease +func Namespace(ns string) lockerOption { + return func(l *Locker) error { + l.namespace = ns + return nil + } +} + +// InClusterConfig configures the Kubernetes client assuming it is running inside a pod +func InClusterConfig() lockerOption { + return func(l *Locker) error { + c, err := inClusterClientset() + if err != nil { + return err + } + l.clientset = c + return nil + } +} + +// Clientset configures a custom Kubernetes Clientset +func Clientset(c kubernetes.Interface) lockerOption { + return func(l *Locker) error { + l.clientset = c + return nil + } +} + +// RetryWaitDuration is the duration the Lock function will wait before retrying +// after failing to acquire the lock +func RetryWaitDuration(d time.Duration) lockerOption { + return func(l *Locker) error { + 
l.retryWait = d + return nil + } +} + +// ClientID is a unique ID for the client acquiring the lock +func ClientID(id string) lockerOption { + return func(l *Locker) error { + l.clientID = id + return nil + } +} + +// TTL is the duration a lock can exist before it can be forcibly acquired +// by another client +func TTL(ttl time.Duration) lockerOption { + return func(l *Locker) error { + l.ttl = ttl + return nil + } +} + +// CreateLease specifies whether to create lease when it's absent. +func CreateLease(create bool) lockerOption { + return func(l *Locker) error { + l.skipLeaseCreation = !create + return nil + } +} + +// NewLocker creates a Locker +func NewLocker(name string, options ...lockerOption) (*Locker, error) { + locker := &Locker{ + name: name, + } + + for _, opt := range options { + err := opt(locker) + if err != nil { + return nil, fmt.Errorf("locker options: %v", err) + } + } + + if locker.namespace == "" { + locker.namespace = "default" + } + + if locker.clientID == "" { + locker.clientID = uuid.NewString() + } + + if locker.retryWait == 0 { + locker.retryWait = time.Duration(1) * time.Second + } + + if locker.clientset == nil { + c, err := localClientset() + if err != nil { + return nil, err + } + locker.clientset = c + } + + leaseClient := locker.clientset.CoordinationV1().Leases(locker.namespace) + + if !locker.skipLeaseCreation { + // create the Lease if it doesn't exist + _, err := leaseClient.Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, err + } + + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: coordinationv1.LeaseSpec{ + LeaseTransitions: pointer.Int32(0), + }, + } + + _, err := leaseClient.Create(context.TODO(), lease, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + } + } + + locker.leaseClient = leaseClient + return locker, nil +} + +func (l *Locker) lock(ctx context.Context) error { + // block until we get a lock 
+	for {
+		// get the Lease
+		lease, err := l.leaseClient.Get(ctx, l.name, metav1.GetOptions{})
+		if err != nil {
+			return fmt.Errorf("could not get Lease resource for lock: %w", err)
+		}
+
+		if lease.Spec.HolderIdentity != nil {
+			if lease.Spec.LeaseDurationSeconds == nil {
+				// The lock is already held and has no expiry
+				time.Sleep(l.retryWait)
+				continue
+			}
+
+			acquireTime := lease.Spec.AcquireTime.Time
+			leaseDuration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second
+
+			if acquireTime.Add(leaseDuration).After(time.Now()) {
+				// The lock is already held and hasn't expired yet
+				time.Sleep(l.retryWait)
+				continue
+			}
+		}
+
+		// nobody holds the lock, try and lock it
+		lease.Spec.HolderIdentity = pointer.String(l.clientID)
+		if lease.Spec.LeaseTransitions != nil {
+			lease.Spec.LeaseTransitions = pointer.Int32((*lease.Spec.LeaseTransitions) + 1)
+		} else {
+			lease.Spec.LeaseTransitions = pointer.Int32(1)
+		}
+		lease.Spec.AcquireTime = &metav1.MicroTime{
+			Time: time.Now(),
+		}
+		if l.ttl.Seconds() > 0 {
+			lease.Spec.LeaseDurationSeconds = pointer.Int32(int32(l.ttl.Seconds()))
+		}
+		_, err = l.leaseClient.Update(ctx, lease, metav1.UpdateOptions{})
+		if err == nil {
+			// we got the lock, break the loop
+			break
+		}
+
+		if !k8serrors.IsConflict(err) {
+			// if the error isn't a conflict then something went horribly wrong
+			return fmt.Errorf("lock: error when trying to update Lease: %w", err)
+		}
+
+		// Another client beat us to the lock
+		time.Sleep(l.retryWait)
+	}
+
+	return nil
+}
+
+func (l *Locker) unlock(ctx context.Context) error {
+	lease, err := l.leaseClient.Get(ctx, l.name, metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("could not get Lease resource for lock: %w", err)
+	}
+
+	// the holder has to have a value and has to be our ID for us to be able to unlock
+	if lease.Spec.HolderIdentity == nil {
+		return fmt.Errorf("unlock: no lock holder value")
+	}
+
+	if *lease.Spec.HolderIdentity != 
l.clientID { + return fmt.Errorf("unlock: not the lock holder (%v != %v)", *lease.Spec.HolderIdentity, l.clientID) + } + + lease.Spec.HolderIdentity = nil + lease.Spec.AcquireTime = nil + lease.Spec.LeaseDurationSeconds = nil + _, err = l.leaseClient.Update(ctx, lease, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("unlock: error when trying to update Lease: %w", err) + } + + return nil +} + +// Lock will block until the client is the holder of the Lease resource +func (l *Locker) Lock() { + err := l.lock(context.Background()) + if err != nil { + panic(err) + } +} + +// Unlock will remove the client as the holder of the Lease resource +func (l *Locker) Unlock() { + err := l.unlock(context.Background()) + if err != nil { + panic(err) + } +} + +// LockContext will block until the client is the holder of the Lease resource +func (l *Locker) LockContext(ctx context.Context) error { + return l.lock(ctx) +} + +// UnlockContext will remove the client as the holder of the Lease resource +func (l *Locker) UnlockContext(ctx context.Context) error { + return l.unlock(ctx) +} + +func localClientset() (kubernetes.Interface, error) { + rules := clientcmd.NewDefaultClientConfigLoadingRules() + overrides := &clientcmd.ConfigOverrides{} + config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() + if err != nil { + return nil, err + } + + if config == nil { + config = &rest.Config{} + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + return clientset, nil +} + +func inClusterClientset() (kubernetes.Interface, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + return clientset, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 8eb297b87c..45659871c2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -200,6 +200,9 @@ 
github.com/jinzhu/copier # github.com/josharian/intern v1.0.0 ## explicit; go 1.5 github.com/josharian/intern +# github.com/jrhouston/k8slock v0.2.0 +## explicit; go 1.20 +github.com/jrhouston/k8slock # github.com/json-iterator/go v1.1.12 ## explicit; go 1.12 github.com/json-iterator/go @@ -274,7 +277,7 @@ github.com/longhorn/longhorn-engine/pkg/meta github.com/longhorn/longhorn-engine/pkg/replica/client github.com/longhorn/longhorn-engine/pkg/types github.com/longhorn/longhorn-engine/pkg/util -# github.com/longhorn/longhorn-instance-manager v1.8.0-dev-20241215 +# github.com/longhorn/longhorn-instance-manager v1.8.0-dev-20241215.0.20241215041917-483e7a8c95d3 ## explicit; go 1.23.0 github.com/longhorn/longhorn-instance-manager/pkg/api github.com/longhorn/longhorn-instance-manager/pkg/client