Skip to content

Commit

Permalink
feat(v2 upgrade/webhook): introduce global lock for data engine upgra…
Browse files Browse the repository at this point in the history
…de resources

Only allow one active dataEngineUpgradeManager and one active nodeDataEngineUppgrade
at the same time.

Longhorn 9104

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Dec 15, 2024
1 parent 9e926af commit 83cfcce
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 16 deletions.
4 changes: 2 additions & 2 deletions app/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func startManager(c *cli.Context) error {
if err != nil {
return err
}
if err := webhook.StartWebhook(ctx, types.WebhookTypeConversion, clientsWithoutDatastore); err != nil {
if err := webhook.StartWebhook(ctx, currentNodeID, types.WebhookTypeConversion, clientsWithoutDatastore); err != nil {
return err
}

Expand All @@ -195,7 +195,7 @@ func startManager(c *cli.Context) error {
return err
}

if err := webhook.StartWebhook(ctx, types.WebhookTypeAdmission, clients); err != nil {
if err := webhook.StartWebhook(ctx, currentNodeID, types.WebhookTypeAdmission, clients); err != nil {
return err
}
if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, types.GetAdmissionWebhookLabel()); err != nil {
Expand Down
53 changes: 50 additions & 3 deletions webhook/resources/dataengineupgrademanager/validator.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package dataengineupgrademanager

import (
"context"
"fmt"
"reflect"
"time"

"github.com/jrhouston/k8slock"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"

admissionregv1 "k8s.io/api/admissionregistration/v1"
Expand All @@ -17,11 +21,15 @@ import (

type dataEngineUpgradeManagerValidator struct {
admission.DefaultValidator
ds *datastore.DataStore
ds *datastore.DataStore
locker *k8slock.Locker
}

func NewValidator(ds *datastore.DataStore) admission.Validator {
return &dataEngineUpgradeManagerValidator{ds: ds}
func NewValidator(ds *datastore.DataStore, locker *k8slock.Locker) admission.Validator {
return &dataEngineUpgradeManagerValidator{
ds: ds,
locker: locker,
}
}

func (u *dataEngineUpgradeManagerValidator) Resource() admission.Resource {
Expand All @@ -38,12 +46,51 @@ func (u *dataEngineUpgradeManagerValidator) Resource() admission.Resource {
}
}

func (u *dataEngineUpgradeManagerValidator) areAllDataEngineUpgradeManagerStopped() (bool, error) {
upgradeManagers, err := u.ds.ListDataEngineUpgradeManagers()
if err != nil {
return false, err
}
for _, upgradeManager := range upgradeManagers {
if upgradeManager.Status.State != longhorn.UpgradeStateCompleted &&
upgradeManager.Status.State != longhorn.UpgradeStateError {
return false, nil
}
}
return true, nil
}

func (u *dataEngineUpgradeManagerValidator) Create(request *admission.Request, newObj runtime.Object) error {
upgradeManager, ok := newObj.(*longhorn.DataEngineUpgradeManager)
if !ok {
return werror.NewInvalidError(fmt.Sprintf("%v is not a *longhorn.DataEngineUpgradeManager", newObj), "")
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := u.locker.LockContext(ctx)
if err != nil {
err = errors.Wrapf(err, "failed to lock for dataEngineUpgradeManager %v", upgradeManager.Name)
return werror.NewInternalError(err.Error())
}
defer func() {
err = u.locker.UnlockContext(ctx)
if err != nil {
err = errors.Wrapf(err, "failed to unlock for dataEngineUpgradeManager %v", upgradeManager.Name)
}
}()

allStopped, err := u.areAllDataEngineUpgradeManagerStopped()
if err != nil {
err = errors.Wrapf(err, "failed to check if all dataEngineUpgradeManager are stopped")
return werror.NewInternalError(err.Error())
}
if !allStopped {
err = fmt.Errorf("another dataEngineUpgradeManager is in progress")
return werror.NewBadRequest(err.Error())
}

nodes, err := u.ds.ListNodes()
if err != nil {
return werror.NewInternalError(err.Error())
Expand Down
29 changes: 26 additions & 3 deletions webhook/resources/nodedataengineupgrade/validator.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package nodedataengineupgrade

import (
"context"
"fmt"
"time"

"github.com/jrhouston/k8slock"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"

admissionregv1 "k8s.io/api/admissionregistration/v1"
Expand All @@ -16,11 +20,15 @@ import (

type nodeDataEngineUpgradeValidator struct {
admission.DefaultValidator
ds *datastore.DataStore
ds *datastore.DataStore
locker *k8slock.Locker
}

func NewValidator(ds *datastore.DataStore) admission.Validator {
return &nodeDataEngineUpgradeValidator{ds: ds}
func NewValidator(ds *datastore.DataStore, locker *k8slock.Locker) admission.Validator {
return &nodeDataEngineUpgradeValidator{
ds: ds,
locker: locker,
}
}

func (u *nodeDataEngineUpgradeValidator) Resource() admission.Resource {
Expand All @@ -43,6 +51,21 @@ func (u *nodeDataEngineUpgradeValidator) Create(request *admission.Request, newO
return werror.NewInvalidError("object is not a *longhorn.NodeDataEngineUpgrade", "")
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := u.locker.LockContext(ctx)
if err != nil {
err = errors.Wrapf(err, "failed to lock for nodeDataEngineUpgrade %v", nodeUpgrade.Name)
return werror.NewInternalError(err.Error())
}
defer func() {
err = u.locker.UnlockContext(ctx)
if err != nil {
err = errors.Wrapf(err, "failed to unlock for nodeDataEngineUpgrade %v", nodeUpgrade.Name)
}
}()

if nodeUpgrade.Spec.NodeID == "" {
return werror.NewInvalidError("nodeID is required", "spec.nodeID")
}
Expand Down
37 changes: 34 additions & 3 deletions webhook/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"fmt"
"net/http"
"reflect"
"time"

"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/jrhouston/k8slock"
"github.com/rancher/dynamiclistener"
"github.com/rancher/dynamiclistener/server"
"github.com/sirupsen/logrus"
Expand All @@ -19,6 +22,8 @@ import (
"github.com/longhorn/longhorn-manager/types"
"github.com/longhorn/longhorn-manager/util/client"
"github.com/longhorn/longhorn-manager/webhook/admission"

webhooktypes "github.com/longhorn/longhorn-manager/webhook/types"
)

var (
Expand All @@ -38,19 +43,45 @@ type WebhookServer struct {
namespace string
webhookType string
clients *client.Clients
lockers map[string]*k8slock.Locker
}

func New(ctx context.Context, namespace, webhookType string, clients *client.Clients) *WebhookServer {
func New(ctx context.Context, namespace, nodeName, webhookType string, clients *client.Clients) (*WebhookServer, error) {
lockerNames := []string{
webhooktypes.PascalToKebab(types.LonghornKindDataEngineUpgradeManager),
webhooktypes.PascalToKebab(types.LonghornKindNodeDataEngineUpgrade),
}
lockers := map[string]*k8slock.Locker{}

for _, name := range lockerNames {
clientID := nodeName + "_" + uuid.New().String()

logrus.Infof("Creating locker for resource %v with clientID %v", name, clientID)
locker, err := k8slock.NewLocker(
name,
k8slock.Namespace(namespace),
k8slock.ClientID(clientID),
k8slock.InClusterConfig(),
k8slock.TTL(180*time.Second),
)

if err != nil {
return nil, err
}
lockers[name] = locker
}

return &WebhookServer{
context: ctx,
namespace: namespace,
webhookType: webhookType,
clients: clients,
}
lockers: lockers,
}, nil
}

func (s *WebhookServer) admissionWebhookListenAndServe() error {
validationHandler, validationResources, err := Validation(s.clients.Datastore)
validationHandler, validationResources, err := Validation(s.clients.Datastore, s.lockers)
if err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions webhook/server/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"net/http"

"github.com/jrhouston/k8slock"
"github.com/rancher/wrangler/v3/pkg/webhook"

"github.com/longhorn/longhorn-manager/datastore"
Expand All @@ -26,9 +27,11 @@ import (
"github.com/longhorn/longhorn-manager/webhook/resources/systemrestore"
"github.com/longhorn/longhorn-manager/webhook/resources/volume"
"github.com/longhorn/longhorn-manager/webhook/resources/volumeattachment"

webhooktypes "github.com/longhorn/longhorn-manager/webhook/types"
)

func Validation(ds *datastore.DataStore) (http.Handler, []admission.Resource, error) {
func Validation(ds *datastore.DataStore, lockers map[string]*k8slock.Locker) (http.Handler, []admission.Resource, error) {
currentNodeID, err := util.GetRequiredEnv(types.EnvNodeName)
if err != nil {
return nil, nil, err
Expand All @@ -51,8 +54,8 @@ func Validation(ds *datastore.DataStore) (http.Handler, []admission.Resource, er
replica.NewValidator(ds),
instancemanager.NewValidator(ds),
persistentvolumeclaim.NewValidator(ds),
dataengineupgrademanager.NewValidator(ds),
nodedataengineupgrade.NewValidator(ds),
dataengineupgrademanager.NewValidator(ds, lockers[webhooktypes.PascalToKebab(types.LonghornKindDataEngineUpgradeManager)]),
nodedataengineupgrade.NewValidator(ds, lockers[webhooktypes.PascalToKebab(types.LonghornKindNodeDataEngineUpgrade)]),
}

router := webhook.NewRouter()
Expand Down
15 changes: 15 additions & 0 deletions webhook/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package types

import (
"regexp"
"strings"
)

func PascalToKebab(input string) string {
// Use a regex to insert a hyphen before each uppercase letter, except the first one.
re := regexp.MustCompile("([a-z])([A-Z])")
hyphenated := re.ReplaceAllString(input, "$1-$2")

// Convert the result to lowercase.
return strings.ToLower(hyphenated)
}
9 changes: 7 additions & 2 deletions webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/longhorn/longhorn-manager/types"
Expand All @@ -20,7 +21,7 @@ var (
defaultStartTimeout = 60 * time.Second
)

func StartWebhook(ctx context.Context, webhookType string, clients *client.Clients) error {
func StartWebhook(ctx context.Context, nodeName, webhookType string, clients *client.Clients) error {
logrus.Infof("Starting longhorn %s webhook server", webhookType)

var webhookLocalEndpoint string
Expand All @@ -33,7 +34,11 @@ func StartWebhook(ctx context.Context, webhookType string, clients *client.Clien
return fmt.Errorf("unexpected webhook server type %v", webhookType)
}

s := server.New(ctx, clients.Namespace, webhookType, clients)
s, err := server.New(ctx, clients.Namespace, nodeName, webhookType, clients)
if err != nil {
return errors.Wrapf(err, "failed to create %v webhook server", webhookType)
}

go func() {
if err := s.ListenAndServe(); err != nil {
logrus.Fatalf("Error %v webhook server failed: %v", webhookType, err)
Expand Down

0 comments on commit 83cfcce

Please sign in to comment.