From 83cfcce228418aff638b2a19f07bc996bdeed66f Mon Sep 17 00:00:00 2001 From: Derek Su Date: Sun, 15 Dec 2024 14:16:02 +0800 Subject: [PATCH] feat(v2 upgrade/webhook): introduce global lock for data engine upgrade resources Only allow one active dataEngineUpgradeManager and one active nodeDataEngineUpgrade at the same time. Longhorn 9104 Signed-off-by: Derek Su --- app/daemon.go | 4 +- .../dataengineupgrademanager/validator.go | 53 +++++++++++++++++-- .../nodedataengineupgrade/validator.go | 29 ++++++++-- webhook/server/server.go | 37 +++++++++++-- webhook/server/validation.go | 9 ++-- webhook/types/types.go | 15 ++++++ webhook/webhook.go | 9 +++- 7 files changed, 140 insertions(+), 16 deletions(-) create mode 100644 webhook/types/types.go diff --git a/app/daemon.go b/app/daemon.go index 1daf10081d..3afba955e7 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -169,7 +169,7 @@ func startManager(c *cli.Context) error { if err != nil { return err } - if err := webhook.StartWebhook(ctx, types.WebhookTypeConversion, clientsWithoutDatastore); err != nil { + if err := webhook.StartWebhook(ctx, currentNodeID, types.WebhookTypeConversion, clientsWithoutDatastore); err != nil { return err } @@ -195,7 +195,7 @@ func startManager(c *cli.Context) error { return err } - if err := webhook.StartWebhook(ctx, types.WebhookTypeAdmission, clients); err != nil { + if err := webhook.StartWebhook(ctx, currentNodeID, types.WebhookTypeAdmission, clients); err != nil { return err } if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, types.GetAdmissionWebhookLabel()); err != nil { diff --git a/webhook/resources/dataengineupgrademanager/validator.go b/webhook/resources/dataengineupgrademanager/validator.go index fdfedc73fd..f76ce423b8 100644 --- a/webhook/resources/dataengineupgrademanager/validator.go +++ b/webhook/resources/dataengineupgrademanager/validator.go @@ -1,9 +1,13 @@ package dataengineupgrademanager import ( + "context" "fmt" "reflect" + "time" +
"github.com/jrhouston/k8slock" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/runtime" admissionregv1 "k8s.io/api/admissionregistration/v1" @@ -17,11 +21,15 @@ import ( type dataEngineUpgradeManagerValidator struct { admission.DefaultValidator - ds *datastore.DataStore + ds *datastore.DataStore + locker *k8slock.Locker } -func NewValidator(ds *datastore.DataStore) admission.Validator { - return &dataEngineUpgradeManagerValidator{ds: ds} +func NewValidator(ds *datastore.DataStore, locker *k8slock.Locker) admission.Validator { + return &dataEngineUpgradeManagerValidator{ + ds: ds, + locker: locker, + } } func (u *dataEngineUpgradeManagerValidator) Resource() admission.Resource { @@ -38,12 +46,51 @@ func (u *dataEngineUpgradeManagerValidator) Resource() admission.Resource { } } +func (u *dataEngineUpgradeManagerValidator) areAllDataEngineUpgradeManagerStopped() (bool, error) { + upgradeManagers, err := u.ds.ListDataEngineUpgradeManagers() + if err != nil { + return false, err + } + for _, upgradeManager := range upgradeManagers { + if upgradeManager.Status.State != longhorn.UpgradeStateCompleted && + upgradeManager.Status.State != longhorn.UpgradeStateError { + return false, nil + } + } + return true, nil +} + func (u *dataEngineUpgradeManagerValidator) Create(request *admission.Request, newObj runtime.Object) error { upgradeManager, ok := newObj.(*longhorn.DataEngineUpgradeManager) if !ok { return werror.NewInvalidError(fmt.Sprintf("%v is not a *longhorn.DataEngineUpgradeManager", newObj), "") } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := u.locker.LockContext(ctx) + if err != nil { + err = errors.Wrapf(err, "failed to lock for dataEngineUpgradeManager %v", upgradeManager.Name) + return werror.NewInternalError(err.Error()) + } + defer func() { + err = u.locker.UnlockContext(ctx) + if err != nil { + err = errors.Wrapf(err, "failed to unlock for dataEngineUpgradeManager %v", upgradeManager.Name) + } + }() + + 
allStopped, err := u.areAllDataEngineUpgradeManagerStopped() + if err != nil { + err = errors.Wrapf(err, "failed to check if all dataEngineUpgradeManager are stopped") + return werror.NewInternalError(err.Error()) + } + if !allStopped { + err = fmt.Errorf("another dataEngineUpgradeManager is in progress") + return werror.NewBadRequest(err.Error()) + } + nodes, err := u.ds.ListNodes() if err != nil { return werror.NewInternalError(err.Error()) diff --git a/webhook/resources/nodedataengineupgrade/validator.go b/webhook/resources/nodedataengineupgrade/validator.go index 1449864fd6..c0b0a68e8f 100644 --- a/webhook/resources/nodedataengineupgrade/validator.go +++ b/webhook/resources/nodedataengineupgrade/validator.go @@ -1,8 +1,12 @@ package nodedataengineupgrade import ( + "context" "fmt" + "time" + "github.com/jrhouston/k8slock" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/runtime" admissionregv1 "k8s.io/api/admissionregistration/v1" @@ -16,11 +20,15 @@ import ( type nodeDataEngineUpgradeValidator struct { admission.DefaultValidator - ds *datastore.DataStore + ds *datastore.DataStore + locker *k8slock.Locker } -func NewValidator(ds *datastore.DataStore) admission.Validator { - return &nodeDataEngineUpgradeValidator{ds: ds} +func NewValidator(ds *datastore.DataStore, locker *k8slock.Locker) admission.Validator { + return &nodeDataEngineUpgradeValidator{ + ds: ds, + locker: locker, + } } func (u *nodeDataEngineUpgradeValidator) Resource() admission.Resource { @@ -43,6 +51,21 @@ func (u *nodeDataEngineUpgradeValidator) Create(request *admission.Request, newO return werror.NewInvalidError("object is not a *longhorn.NodeDataEngineUpgrade", "") } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := u.locker.LockContext(ctx) + if err != nil { + err = errors.Wrapf(err, "failed to lock for nodeDataEngineUpgrade %v", nodeUpgrade.Name) + return werror.NewInternalError(err.Error()) + } + defer func() { + err = 
u.locker.UnlockContext(ctx) + if err != nil { + err = errors.Wrapf(err, "failed to unlock for nodeDataEngineUpgrade %v", nodeUpgrade.Name) + } + }() + if nodeUpgrade.Spec.NodeID == "" { return werror.NewInvalidError("nodeID is required", "spec.nodeID") } diff --git a/webhook/server/server.go b/webhook/server/server.go index 45525c3fdb..3113124833 100644 --- a/webhook/server/server.go +++ b/webhook/server/server.go @@ -5,8 +5,11 @@ import ( "fmt" "net/http" "reflect" + "time" + "github.com/google/uuid" "github.com/gorilla/mux" + "github.com/jrhouston/k8slock" "github.com/rancher/dynamiclistener" "github.com/rancher/dynamiclistener/server" "github.com/sirupsen/logrus" @@ -19,6 +22,8 @@ import ( "github.com/longhorn/longhorn-manager/types" "github.com/longhorn/longhorn-manager/util/client" "github.com/longhorn/longhorn-manager/webhook/admission" + + webhooktypes "github.com/longhorn/longhorn-manager/webhook/types" ) var ( @@ -38,19 +43,45 @@ type WebhookServer struct { namespace string webhookType string clients *client.Clients + lockers map[string]*k8slock.Locker } -func New(ctx context.Context, namespace, webhookType string, clients *client.Clients) *WebhookServer { +func New(ctx context.Context, namespace, nodeName, webhookType string, clients *client.Clients) (*WebhookServer, error) { + lockerNames := []string{ + webhooktypes.PascalToKebab(types.LonghornKindDataEngineUpgradeManager), + webhooktypes.PascalToKebab(types.LonghornKindNodeDataEngineUpgrade), + } + lockers := map[string]*k8slock.Locker{} + + for _, name := range lockerNames { + clientID := nodeName + "_" + uuid.New().String() + + logrus.Infof("Creating locker for resource %v with clientID %v", name, clientID) + locker, err := k8slock.NewLocker( + name, + k8slock.Namespace(namespace), + k8slock.ClientID(clientID), + k8slock.InClusterConfig(), + k8slock.TTL(180*time.Second), + ) + + if err != nil { + return nil, err + } + lockers[name] = locker + } + return &WebhookServer{ context: ctx, namespace: 
namespace, webhookType: webhookType, clients: clients, - } + lockers: lockers, + }, nil } func (s *WebhookServer) admissionWebhookListenAndServe() error { - validationHandler, validationResources, err := Validation(s.clients.Datastore) + validationHandler, validationResources, err := Validation(s.clients.Datastore, s.lockers) if err != nil { return err } diff --git a/webhook/server/validation.go b/webhook/server/validation.go index 74ea0e87cc..3e3fea3406 100644 --- a/webhook/server/validation.go +++ b/webhook/server/validation.go @@ -3,6 +3,7 @@ package server import ( "net/http" + "github.com/jrhouston/k8slock" "github.com/rancher/wrangler/v3/pkg/webhook" "github.com/longhorn/longhorn-manager/datastore" @@ -26,9 +27,11 @@ import ( "github.com/longhorn/longhorn-manager/webhook/resources/systemrestore" "github.com/longhorn/longhorn-manager/webhook/resources/volume" "github.com/longhorn/longhorn-manager/webhook/resources/volumeattachment" + + webhooktypes "github.com/longhorn/longhorn-manager/webhook/types" ) -func Validation(ds *datastore.DataStore) (http.Handler, []admission.Resource, error) { +func Validation(ds *datastore.DataStore, lockers map[string]*k8slock.Locker) (http.Handler, []admission.Resource, error) { currentNodeID, err := util.GetRequiredEnv(types.EnvNodeName) if err != nil { return nil, nil, err @@ -51,8 +54,8 @@ func Validation(ds *datastore.DataStore) (http.Handler, []admission.Resource, er replica.NewValidator(ds), instancemanager.NewValidator(ds), persistentvolumeclaim.NewValidator(ds), - dataengineupgrademanager.NewValidator(ds), - nodedataengineupgrade.NewValidator(ds), + dataengineupgrademanager.NewValidator(ds, lockers[webhooktypes.PascalToKebab(types.LonghornKindDataEngineUpgradeManager)]), + nodedataengineupgrade.NewValidator(ds, lockers[webhooktypes.PascalToKebab(types.LonghornKindNodeDataEngineUpgrade)]), } router := webhook.NewRouter() diff --git a/webhook/types/types.go b/webhook/types/types.go new file mode 100644 index 
0000000000..60eb053709 --- /dev/null +++ b/webhook/types/types.go @@ -0,0 +1,15 @@ +package types + +import ( + "regexp" + "strings" +) + +func PascalToKebab(input string) string { + // Use a regex to insert a hyphen before each uppercase letter, except the first one. + re := regexp.MustCompile("([a-z])([A-Z])") + hyphenated := re.ReplaceAllString(input, "$1-$2") + + // Convert the result to lowercase. + return strings.ToLower(hyphenated) +} diff --git a/webhook/webhook.go b/webhook/webhook.go index 138b1a3543..ba71bcc06f 100644 --- a/webhook/webhook.go +++ b/webhook/webhook.go @@ -8,6 +8,7 @@ import ( "net/http" "time" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/longhorn/longhorn-manager/types" @@ -20,7 +21,7 @@ var ( defaultStartTimeout = 60 * time.Second ) -func StartWebhook(ctx context.Context, webhookType string, clients *client.Clients) error { +func StartWebhook(ctx context.Context, nodeName, webhookType string, clients *client.Clients) error { logrus.Infof("Starting longhorn %s webhook server", webhookType) var webhookLocalEndpoint string @@ -33,7 +34,11 @@ func StartWebhook(ctx context.Context, webhookType string, clients *client.Clien return fmt.Errorf("unexpected webhook server type %v", webhookType) } - s := server.New(ctx, clients.Namespace, webhookType, clients) + s, err := server.New(ctx, clients.Namespace, nodeName, webhookType, clients) + if err != nil { + return errors.Wrapf(err, "failed to create %v webhook server", webhookType) + } + go func() { if err := s.ListenAndServe(); err != nil { logrus.Fatalf("Error %v webhook server failed: %v", webhookType, err)