From 56a848533581063ea2fe840df67210d596490c3c Mon Sep 17 00:00:00 2001 From: Utkarsh Bhatt Date: Mon, 7 Oct 2024 18:55:03 +0530 Subject: [PATCH] Added support to perform cluster promotion/demotion Signed-off-by: Utkarsh Bhatt --- microceph/api/types/replication.go | 103 ++---------------- microceph/api/types/replication_rbd.go | 81 ++++++++++++++ microceph/ceph/rbd_mirror.go | 32 ++++++ microceph/ceph/replication.go | 22 +++- microceph/ceph/replication_rbd.go | 84 +++++++++++++- microceph/client/remote_replication.go | 6 +- .../cmd/microceph/remote_replication_rbd.go | 8 ++ .../remote_replication_rbd_demote.go | 61 +++++++++++ .../remote_replication_rbd_promote.go | 61 +++++++++++ microceph/constants/constants.go | 2 + 10 files changed, 358 insertions(+), 102 deletions(-) create mode 100644 microceph/cmd/microceph/remote_replication_rbd_demote.go create mode 100644 microceph/cmd/microceph/remote_replication_rbd_promote.go diff --git a/microceph/api/types/replication.go b/microceph/api/types/replication.go index e856eef5..ee0f175a 100644 --- a/microceph/api/types/replication.go +++ b/microceph/api/types/replication.go @@ -1,11 +1,6 @@ package types import ( - "fmt" - "net/url" - "strings" - - "github.com/canonical/lxd/shared/logger" "github.com/canonical/microceph/microceph/constants" ) @@ -14,11 +9,16 @@ type ReplicationRequestType string // This value is split till '-' to get the API request value. const ( + // Put Requests EnableReplicationRequest ReplicationRequestType = "PUT-" + constants.EventEnableReplication ConfigureReplicationRequest ReplicationRequestType = "PUT-" + constants.EventConfigureReplication - DisableReplicationRequest ReplicationRequestType = "DELETE-" + constants.EventDisableReplication - StatusReplicationRequest ReplicationRequestType = "GET-" + constants.EventStatusReplication - ListReplicationRequest ReplicationRequestType = "GET-" + constants.EventListReplication + PromoteReplicationRequest ReplicationRequestType = "PUT-" + constants.EventPromoteReplication + DemoteReplicationRequest ReplicationRequestType = "PUT-" + constants.EventDemoteReplication + // Delete Requests + DisableReplicationRequest ReplicationRequestType = "DELETE-" + constants.EventDisableReplication + // Get Requests + StatusReplicationRequest ReplicationRequestType = "GET-" + constants.EventStatusReplication + ListReplicationRequest ReplicationRequestType = "GET-" + constants.EventListReplication ) type CephWorkloadType string @@ -35,90 +35,3 @@ type ReplicationRequest interface { GetAPIRequestType() string GetWorkloadRequestType() string } - -// Slices -type MirrorPool struct { - Name string - Mode RbdResourceType -} - -type MirrorImage struct { - Name string - Mode RbdReplicationType -} - -type MirrorPools []MirrorPool -type MirrorImages []MirrorImage - -// ################################## RBD Replication Request ################################## -type RbdResourceType string -type RbdReplicationDirection string - -const ( - RbdReplicationDirectionRXOnly RbdReplicationDirection = "rx-only" - RbdReplicationDirectionRXTX RbdReplicationDirection = "rx-tx" -) - -const ( - RbdResourceDisabled RbdResourceType = "disabled" - RbdResourcePool RbdResourceType = "pool" - RbdResourceImage RbdResourceType = "image" -) - -type RbdReplicationType string - -const ( - RbdReplicationDisabled RbdReplicationType = "disable" - RbdReplicationJournaling RbdReplicationType = "journal" - RbdReplicationSnapshot RbdReplicationType = "snapshot" -) - -type RbdReplicationRequest struct { - SourcePool string `json:"source_pool" yaml:"source_pool"` - SourceImage string `json:"source_image" yaml:"source_image"` - RemoteName string `json:"remote" yaml:"remote"` - // snapshot in d,h,m format - Schedule string `json:"schedule" yaml:"schedule"` - ReplicationType RbdReplicationType `json:"replication_type" yaml:"replication_type"` - ResourceType RbdResourceType `json:"resource_type" yaml:"resource_type"` - RequestType ReplicationRequestType `json:"request_type" yaml:"request_type"` - IsForceOp bool `json:"force" yaml:"force"` -} - -func (req RbdReplicationRequest) GetWorkloadType() CephWorkloadType { - return RbdWorkload -} - -func (req RbdReplicationRequest) GetAPIObjectId() string { - // If both Pool and Image values are present encode for query. - if len(req.SourceImage) != 0 && len(req.SourcePool) != 0 { - resource := url.QueryEscape(fmt.Sprintf("%s/%s", req.SourcePool, req.SourceImage)) - // TODO: Make this a debug print. - logger.Infof("REPAPI: Resource: %s", resource) - return resource - } - - return req.SourcePool -} - -func (req RbdReplicationRequest) GetAPIRequestType() string { - frags := strings.Split(string(req.RequestType), "-") - // TODO: Make this a debug print. - logger.Infof("REPAPI: API frags: %v", frags) - if len(frags) == 0 { - return "" - } - - return frags[0] -} - -func (req RbdReplicationRequest) GetWorkloadRequestType() string { - frags := strings.Split(string(req.RequestType), "-") - // TODO: Make this a debug print. - logger.Infof("REPAPI: Workload frags: %v", frags) - if len(frags) < 2 { - return "" - } - - return frags[1] -} diff --git a/microceph/api/types/replication_rbd.go b/microceph/api/types/replication_rbd.go index 2ef6b395..dc9a5148 100644 --- a/microceph/api/types/replication_rbd.go +++ b/microceph/api/types/replication_rbd.go @@ -1,5 +1,13 @@ package types +import ( + "fmt" + "net/url" + "strings" + + "github.com/canonical/lxd/shared/logger" +) + // Types for RBD Pool status table. type RbdPoolStatusImageBrief struct { Name string `json:"name" yaml:"name"` @@ -56,3 +64,76 @@ type RbdPoolBrief struct { } type RbdPoolList []RbdPoolBrief + +// ################################## RBD Replication Request ################################## +type RbdResourceType string +type RbdReplicationDirection string + +const ( + RbdReplicationDirectionRXOnly RbdReplicationDirection = "rx-only" + RbdReplicationDirectionRXTX RbdReplicationDirection = "rx-tx" +) + +const ( + RbdResourceDisabled RbdResourceType = "disabled" + RbdResourcePool RbdResourceType = "pool" + RbdResourceImage RbdResourceType = "image" +) + +type RbdReplicationType string + +const ( + RbdReplicationDisabled RbdReplicationType = "disable" + RbdReplicationJournaling RbdReplicationType = "journal" + RbdReplicationSnapshot RbdReplicationType = "snapshot" +) + +type RbdReplicationRequest struct { + SourcePool string `json:"source_pool" yaml:"source_pool"` + SourceImage string `json:"source_image" yaml:"source_image"` + RemoteName string `json:"remote" yaml:"remote"` + // snapshot in d,h,m format + Schedule string `json:"schedule" yaml:"schedule"` + ReplicationType RbdReplicationType `json:"replication_type" yaml:"replication_type"` + ResourceType RbdResourceType `json:"resource_type" yaml:"resource_type"` + RequestType ReplicationRequestType `json:"request_type" yaml:"request_type"` + IsForceOp bool `json:"force" yaml:"force"` +} + +func (req RbdReplicationRequest) GetWorkloadType() CephWorkloadType { + return RbdWorkload +} + +func (req RbdReplicationRequest) GetAPIObjectId() string { + // If both Pool and Image values are present encode for query. + if len(req.SourceImage) != 0 && len(req.SourcePool) != 0 { + resource := url.QueryEscape(fmt.Sprintf("%s/%s", req.SourcePool, req.SourceImage)) + // TODO: Make this a debug print. + logger.Infof("REPAPI: Resource: %s", resource) + return resource + } + + return req.SourcePool +} + +func (req RbdReplicationRequest) GetAPIRequestType() string { + frags := strings.Split(string(req.RequestType), "-") + // TODO: Make this a debug print. + logger.Infof("REPAPI: API frags: %v", frags) + if len(frags) == 0 { + return "" + } + + return frags[0] +} + +func (req RbdReplicationRequest) GetWorkloadRequestType() string { + frags := strings.Split(string(req.RequestType), "-") + // TODO: Make this a debug print. + logger.Infof("REPAPI: Workload frags: %v", frags) + if len(frags) < 2 { + return "" + } + + return frags[1] +} diff --git a/microceph/ceph/rbd_mirror.go b/microceph/ceph/rbd_mirror.go index c723465a..c3cbea16 100644 --- a/microceph/ceph/rbd_mirror.go +++ b/microceph/ceph/rbd_mirror.go @@ -466,6 +466,38 @@ func peerRemove(pool string, peerId string, localName string, remoteName string) return nil } +func promotePool(poolName string, remoteName string, localName string) error { + args := []string{ + "mirror", "pool", "promote", poolName, + } + + // add --cluster and --id args + args = appendRemoteClusterArgs(args, remoteName, localName) + + _, err := processExec.RunCommand("rbd", args...) + if err != nil { + return fmt.Errorf("failed to promote pool(%s): %v", poolName, err) + } + + return nil +} + +func demotePool(poolName string, remoteName string, localName string) error { + args := []string{ + "mirror", "pool", "demote", poolName, + } + + // add --cluster and --id args + args = appendRemoteClusterArgs(args, remoteName, localName) + + _, err := processExec.RunCommand("rbd", args...) + if err != nil { + return fmt.Errorf("failed to promote pool(%s): %v", poolName, err) + } + + return nil +} + // ########################### HELPERS ########################### // appendRemoteClusterArgs appends the cluster and client arguments to ceph commands diff --git a/microceph/ceph/replication.go b/microceph/ceph/replication.go index 7195ac00..5ad01f4f 100644 --- a/microceph/ceph/replication.go +++ b/microceph/ceph/replication.go @@ -32,8 +32,11 @@ type ReplicationHandlerInterface interface { EnableHandler(ctx context.Context, args ...any) error DisableHandler(ctx context.Context, args ...any) error ConfigureHandler(ctx context.Context, args ...any) error - ListHandler(ctx context.Context, args ...any) error StatusHandler(ctx context.Context, args ...any) error + // Cluster wide Operations (don't require any pool/image info.) + ListHandler(ctx context.Context, args ...any) error + PromoteHandler(ctx context.Context, args ...any) error + DemoteHandler(ctx context.Context, args ...any) error } func GetReplicationHandler(name string) ReplicationHandlerInterface { @@ -57,7 +60,8 @@ func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateM Permit(constants.EventEnableReplication, StateEnabledReplication). OnEntryFrom(constants.EventDisableReplication, disableHandler). InternalTransition(constants.EventListReplication, listHandler). - InternalTransition(constants.EventDisableReplication, disableHandler) + InternalTransition(constants.EventDisableReplication, disableHandler). + InternalTransition(constants.EventPromoteReplication, statusHandler) // Configure transitions for enabled state. newFsm.Configure(StateEnabledReplication). @@ -66,7 +70,8 @@ func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateM InternalTransition(constants.EventEnableReplication, enableHandler). InternalTransition(constants.EventConfigureReplication, configureHandler). InternalTransition(constants.EventListReplication, listHandler). - InternalTransition(constants.EventStatusReplication, statusHandler) + InternalTransition(constants.EventStatusReplication, statusHandler). + InternalTransition(constants.EventPromoteReplication, statusHandler) // Check Event params type. var output *string @@ -80,6 +85,7 @@ func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateM newFsm.SetTriggerParameters(constants.EventConfigureReplication, inputType, outputType, stateType) newFsm.SetTriggerParameters(constants.EventListReplication, inputType, outputType, stateType) newFsm.SetTriggerParameters(constants.EventStatusReplication, inputType, outputType, stateType) + newFsm.SetTriggerParameters(constants.EventPromoteReplication, inputType, outputType, stateType) // Add logger callback for all transitions newFsm.OnTransitioning(logTransitionHandler) @@ -125,3 +131,13 @@ func statusHandler(ctx context.Context, args ...any) error { logger.Infof("REPFSM: Entered Status Handler") return rh.StatusHandler(ctx, args...) } +func PromoteHandler(ctx context.Context, args ...any) error { + rh := args[repArgHandler].(ReplicationHandlerInterface) + logger.Infof("REPFSM: Entered Status Handler") + return rh.PromoteHandler(ctx, args...) +} +func DemoteHandler(ctx context.Context, args ...any) error { + rh := args[repArgHandler].(ReplicationHandlerInterface) + logger.Infof("REPFSM: Entered Status Handler") + return rh.DemoteHandler(ctx, args...) +} diff --git a/microceph/ceph/replication_rbd.go b/microceph/ceph/replication_rbd.go index d622fb78..0e7125d4 100644 --- a/microceph/ceph/replication_rbd.go +++ b/microceph/ceph/replication_rbd.go @@ -245,7 +245,6 @@ func (rh *RbdReplicationHandler) StatusHandler(ctx context.Context, args ...any) } // Also add image info - resp = types.RbdPoolStatus{ Name: rh.Request.SourcePool, Type: string(rh.PoolInfo.Mode), @@ -299,6 +298,34 @@ func (rh *RbdReplicationHandler) StatusHandler(ctx context.Context, args ...any) return nil } +// PromoteHandler promotes sequentially promote all secondary cluster pools to primary. +func (rh *RbdReplicationHandler) PromoteHandler(ctx context.Context, args ...any) error { + // fetch all ceph pools initialised with rbd application. + pools := ListPools("rbd") + + // TODO: make this print debug + logger.Infof("REPRBD: Scan active pools %v", pools) + // fetch verbose pool status for each pool + successPoolList := []string{} + for _, pool := range pools { + err := handlePoolDePro(rh, pool.Name) + if err != nil { + logger.Errorf("Failed to perform (%s) operation on (%s) pool, quitting.", rh.Request.RequestType, pool.Name) + return err + } + + successPoolList = append(successPoolList, pool.Name) + } + + // TODO: Make this print debug. + logger.Infof("REPRBD: Operated %v pools for %s request.", successPoolList, rh.Request.RequestType) + return nil +} + +func (rh *RbdReplicationHandler) DemoteHandler(ctx context.Context, args ...any) error { + return nil +} + // ################### Helper Functions ################### // Enable handler for pool resource. func handlePoolEnablement(rh *RbdReplicationHandler, localSite string, remoteSite string) error { @@ -382,3 +409,58 @@ func handleImageDisablement(rh *RbdReplicationHandler) error { rh.Request.ReplicationType = types.RbdReplicationDisabled return configureImageMirroring(rh.Request) } + +// handlePoolDePro verifies the pool state/ peers and +func handlePoolDePro(rh *RbdReplicationHandler, poolName string) error { + poolStatus, err := GetRbdMirrorPoolStatus(poolName, "", "") + if err != nil { + logger.Warnf("failed to fetch status for %s pool: %v", poolName, err) + return err + } + + poolInfo, err := GetRbdMirrorPoolInfo(poolName, "", "") + if err != nil { + logger.Warnf("failed to fetch status for %s pool: %v", poolName, err) + return err + } + + if poolStatus.State != StateEnabledReplication { + logger.Infof("pool(%s) is not an rbd mirror pool.", poolName) + return nil + } + + for _, peer := range poolInfo.Peers { + // Perform operation only if the remote cluster is a known peer. + if peer.RemoteName == rh.Request.RemoteName { + if rh.Request.RequestType == constants.EventPromoteReplication { + // Promote the pool to primary. + err := handlePoolPromotion(rh, poolName) + if err != nil { + // Stop if failed. + logger.Errorf("failed to promote local site to primary: %v", err) + return err + } + } else if rh.Request.RequestType == constants.EventDemoteReplication { + // Demote the pool to secondary. + err := handlePoolDemotion(rh, poolName) + if err != nil { + // Stop if failed. + logger.Errorf("failed to promote local site to primary: %v", err) + return err + } + } + } + } + + return nil +} + +// Promote local pool to primary. +func handlePoolPromotion(_ *RbdReplicationHandler, poolName string) error { + return promotePool(poolName, "", "") +} + +// Demote local pool to secondary. +func handlePoolDemotion(_ *RbdReplicationHandler, poolName string) error { + return demotePool(poolName, "", "") +} diff --git a/microceph/client/remote_replication.go b/microceph/client/remote_replication.go index cd15f807..80929849 100644 --- a/microceph/client/remote_replication.go +++ b/microceph/client/remote_replication.go @@ -7,7 +7,6 @@ import ( "github.com/canonical/lxd/shared/api" "github.com/canonical/microceph/microceph/api/types" - "github.com/canonical/microceph/microceph/constants" microCli "github.com/canonical/microcluster/v2/client" ) @@ -18,8 +17,9 @@ func SendRemoteReplicationRequest(ctx context.Context, c *microCli.Client, data queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) defer cancel() - if data.GetWorkloadRequestType() == constants.EventListReplication { - // list request uses replication/$workload endpoint + // If no API object provided, create API request to the root endpoint. + if len(data.GetAPIObjectId()) == 0 { + // uses replication/$workload endpoint err = c.Query( queryCtx, data.GetAPIRequestType(), types.ExtendedPathPrefix, api.NewURL().Path("ops", "replication", string(data.GetWorkloadType())), diff --git a/microceph/cmd/microceph/remote_replication_rbd.go b/microceph/cmd/microceph/remote_replication_rbd.go index 4a595c0a..cadfe3c4 100644 --- a/microceph/cmd/microceph/remote_replication_rbd.go +++ b/microceph/cmd/microceph/remote_replication_rbd.go @@ -34,5 +34,13 @@ func (c *cmdRemoteReplicationRbd) Command() *cobra.Command { remoteReplicationRbdConfigureCmd := cmdRemoteReplicationConfigureRbd{common: c.common} cmd.AddCommand(remoteReplicationRbdConfigureCmd.Command()) + // Replication promote command + remoteReplicationRbdPromoteCmd := cmdRemoteReplicationPromoteRbd{common: c.common} + cmd.AddCommand(remoteReplicationRbdPromoteCmd.Command()) + + // Replication demote command + remoteReplicationRbdDemoteCmd := cmdRemoteReplicationDemoteRbd{common: c.common} + cmd.AddCommand(remoteReplicationRbdDemoteCmd.Command()) + return cmd } diff --git a/microceph/cmd/microceph/remote_replication_rbd_demote.go b/microceph/cmd/microceph/remote_replication_rbd_demote.go new file mode 100644 index 00000000..58f08c2a --- /dev/null +++ b/microceph/cmd/microceph/remote_replication_rbd_demote.go @@ -0,0 +1,61 @@ +package main + +import ( + "context" + + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microcluster/v2/microcluster" + "github.com/spf13/cobra" +) + +type cmdRemoteReplicationDemoteRbd struct { + common *CmdControl +} + +func (c *cmdRemoteReplicationDemoteRbd) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "demote ", + Short: "demote local cluster to secondary", + RunE: c.Run, + } + + return cmd +} + +func (c *cmdRemoteReplicationDemoteRbd) Run(cmd *cobra.Command, args []string) error { + if len(args) != 0 { + return cmd.Help() + } + + m, err := microcluster.App(microcluster.Args{StateDir: c.common.FlagStateDir}) + if err != nil { + return err + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + + payload, err := c.prepareRbdPayload(types.DemoteReplicationRequest, args[0]) + if err != nil { + return err + } + + _, err = client.SendRemoteReplicationRequest(context.Background(), cli, payload) + if err != nil { + return err + } + + return nil +} + +func (c *cmdRemoteReplicationDemoteRbd) prepareRbdPayload(requestType types.ReplicationRequestType, remoteName string) (types.RbdReplicationRequest, error) { + retReq := types.RbdReplicationRequest{ + RemoteName: remoteName, + RequestType: requestType, + } + + return retReq, nil +} diff --git a/microceph/cmd/microceph/remote_replication_rbd_promote.go b/microceph/cmd/microceph/remote_replication_rbd_promote.go new file mode 100644 index 00000000..61d1abd5 --- /dev/null +++ b/microceph/cmd/microceph/remote_replication_rbd_promote.go @@ -0,0 +1,61 @@ +package main + +import ( + "context" + + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microcluster/v2/microcluster" + "github.com/spf13/cobra" +) + +type cmdRemoteReplicationPromoteRbd struct { + common *CmdControl +} + +func (c *cmdRemoteReplicationPromoteRbd) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "promote ", + Short: "promote a secondary remote cluster to primary", + RunE: c.Run, + } + + return cmd +} + +func (c *cmdRemoteReplicationPromoteRbd) Run(cmd *cobra.Command, args []string) error { + if len(args) != 0 { + return cmd.Help() + } + + m, err := microcluster.App(microcluster.Args{StateDir: c.common.FlagStateDir}) + if err != nil { + return err + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + + payload, err := c.prepareRbdPayload(types.PromoteReplicationRequest, args[0]) + if err != nil { + return err + } + + _, err = client.SendRemoteReplicationRequest(context.Background(), cli, payload) + if err != nil { + return err + } + + return nil +} + +func (c *cmdRemoteReplicationPromoteRbd) prepareRbdPayload(requestType types.ReplicationRequestType, remoteName string) (types.RbdReplicationRequest, error) { + retReq := types.RbdReplicationRequest{ + RemoteName: remoteName, + RequestType: requestType, + } + + return retReq, nil +} diff --git a/microceph/constants/constants.go b/microceph/constants/constants.go index 975b1701..ef51ed32 100644 --- a/microceph/constants/constants.go +++ b/microceph/constants/constants.go @@ -79,3 +79,5 @@ const EventDisableReplication = "disable_replication" const EventListReplication = "list_replication" const EventStatusReplication = "status_replication" const EventConfigureReplication = "configure_replication" +const EventPromoteReplication = "promote_replication" +const EventDemoteReplication = "demote_replication"