Skip to content

Commit

Permalink
Added support to perform cluster promotion/demotion
Browse files Browse the repository at this point in the history
Signed-off-by: Utkarsh Bhatt <[email protected]>
  • Loading branch information
UtkarshBhatthere committed Oct 7, 2024
1 parent f12fbcf commit 56a8485
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 102 deletions.
103 changes: 8 additions & 95 deletions microceph/api/types/replication.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package types

import (
"fmt"
"net/url"
"strings"

"github.com/canonical/lxd/shared/logger"
"github.com/canonical/microceph/microceph/constants"
)

Expand All @@ -14,11 +9,16 @@ type ReplicationRequestType string

// This value is split till '-' to get the API request value.
const (
// Put Requests
EnableReplicationRequest ReplicationRequestType = "PUT-" + constants.EventEnableReplication
ConfigureReplicationRequest ReplicationRequestType = "PUT-" + constants.EventConfigureReplication
DisableReplicationRequest ReplicationRequestType = "DELETE-" + constants.EventDisableReplication
StatusReplicationRequest ReplicationRequestType = "GET-" + constants.EventStatusReplication
ListReplicationRequest ReplicationRequestType = "GET-" + constants.EventListReplication
PromoteReplicationRequest ReplicationRequestType = "PUT-" + constants.EventPromoteReplication
DemoteReplicationRequest ReplicationRequestType = "PUT-" + constants.EventDemoteReplication
// Delete Requests
DisableReplicationRequest ReplicationRequestType = "DELETE-" + constants.EventDisableReplication
// Get Requests
StatusReplicationRequest ReplicationRequestType = "GET-" + constants.EventStatusReplication
ListReplicationRequest ReplicationRequestType = "GET-" + constants.EventListReplication
)

type CephWorkloadType string
Expand All @@ -35,90 +35,3 @@ type ReplicationRequest interface {
GetAPIRequestType() string
GetWorkloadRequestType() string
}

// Slices
type MirrorPool struct {
Name string
Mode RbdResourceType
}

type MirrorImage struct {
Name string
Mode RbdReplicationType
}

type MirrorPools []MirrorPool
type MirrorImages []MirrorImage

// ################################## RBD Replication Request ##################################
type RbdResourceType string
type RbdReplicationDirection string

const (
RbdReplicationDirectionRXOnly RbdReplicationDirection = "rx-only"
RbdReplicationDirectionRXTX RbdReplicationDirection = "rx-tx"
)

const (
RbdResourceDisabled RbdResourceType = "disabled"
RbdResourcePool RbdResourceType = "pool"
RbdResourceImage RbdResourceType = "image"
)

type RbdReplicationType string

const (
RbdReplicationDisabled RbdReplicationType = "disable"
RbdReplicationJournaling RbdReplicationType = "journal"
RbdReplicationSnapshot RbdReplicationType = "snapshot"
)

type RbdReplicationRequest struct {
SourcePool string `json:"source_pool" yaml:"source_pool"`
SourceImage string `json:"source_image" yaml:"source_image"`
RemoteName string `json:"remote" yaml:"remote"`
// snapshot in d,h,m format
Schedule string `json:"schedule" yaml:"schedule"`
ReplicationType RbdReplicationType `json:"replication_type" yaml:"replication_type"`
ResourceType RbdResourceType `json:"resource_type" yaml:"resource_type"`
RequestType ReplicationRequestType `json:"request_type" yaml:"request_type"`
IsForceOp bool `json:"force" yaml:"force"`
}

func (req RbdReplicationRequest) GetWorkloadType() CephWorkloadType {
return RbdWorkload
}

func (req RbdReplicationRequest) GetAPIObjectId() string {
// If both Pool and Image values are present encode for query.
if len(req.SourceImage) != 0 && len(req.SourcePool) != 0 {
resource := url.QueryEscape(fmt.Sprintf("%s/%s", req.SourcePool, req.SourceImage))
// TODO: Make this a debug print.
logger.Infof("REPAPI: Resource: %s", resource)
return resource
}

return req.SourcePool
}

func (req RbdReplicationRequest) GetAPIRequestType() string {
frags := strings.Split(string(req.RequestType), "-")
// TODO: Make this a debug print.
logger.Infof("REPAPI: API frags: %v", frags)
if len(frags) == 0 {
return ""
}

return frags[0]
}

func (req RbdReplicationRequest) GetWorkloadRequestType() string {
frags := strings.Split(string(req.RequestType), "-")
// TODO: Make this a debug print.
logger.Infof("REPAPI: Workload frags: %v", frags)
if len(frags) < 2 {
return ""
}

return frags[1]
}
81 changes: 81 additions & 0 deletions microceph/api/types/replication_rbd.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package types

import (
"fmt"
"net/url"
"strings"

"github.com/canonical/lxd/shared/logger"
)

// Types for RBD Pool status table.
type RbdPoolStatusImageBrief struct {
Name string `json:"name" yaml:"name"`
Expand Down Expand Up @@ -56,3 +64,76 @@ type RbdPoolBrief struct {
}

type RbdPoolList []RbdPoolBrief

// ################################## RBD Replication Request ##################################
type RbdResourceType string
type RbdReplicationDirection string

const (
RbdReplicationDirectionRXOnly RbdReplicationDirection = "rx-only"
RbdReplicationDirectionRXTX RbdReplicationDirection = "rx-tx"
)

const (
RbdResourceDisabled RbdResourceType = "disabled"
RbdResourcePool RbdResourceType = "pool"
RbdResourceImage RbdResourceType = "image"
)

type RbdReplicationType string

const (
RbdReplicationDisabled RbdReplicationType = "disable"
RbdReplicationJournaling RbdReplicationType = "journal"
RbdReplicationSnapshot RbdReplicationType = "snapshot"
)

type RbdReplicationRequest struct {
SourcePool string `json:"source_pool" yaml:"source_pool"`
SourceImage string `json:"source_image" yaml:"source_image"`
RemoteName string `json:"remote" yaml:"remote"`
// snapshot in d,h,m format
Schedule string `json:"schedule" yaml:"schedule"`
ReplicationType RbdReplicationType `json:"replication_type" yaml:"replication_type"`
ResourceType RbdResourceType `json:"resource_type" yaml:"resource_type"`
RequestType ReplicationRequestType `json:"request_type" yaml:"request_type"`
IsForceOp bool `json:"force" yaml:"force"`
}

func (req RbdReplicationRequest) GetWorkloadType() CephWorkloadType {
return RbdWorkload
}

func (req RbdReplicationRequest) GetAPIObjectId() string {
// If both Pool and Image values are present encode for query.
if len(req.SourceImage) != 0 && len(req.SourcePool) != 0 {
resource := url.QueryEscape(fmt.Sprintf("%s/%s", req.SourcePool, req.SourceImage))
// TODO: Make this a debug print.
logger.Infof("REPAPI: Resource: %s", resource)
return resource
}

return req.SourcePool
}

func (req RbdReplicationRequest) GetAPIRequestType() string {
frags := strings.Split(string(req.RequestType), "-")
// TODO: Make this a debug print.
logger.Infof("REPAPI: API frags: %v", frags)
if len(frags) == 0 {
return ""
}

return frags[0]
}

func (req RbdReplicationRequest) GetWorkloadRequestType() string {
frags := strings.Split(string(req.RequestType), "-")
// TODO: Make this a debug print.
logger.Infof("REPAPI: Workload frags: %v", frags)
if len(frags) < 2 {
return ""
}

return frags[1]
}
32 changes: 32 additions & 0 deletions microceph/ceph/rbd_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,38 @@ func peerRemove(pool string, peerId string, localName string, remoteName string)
return nil
}

func promotePool(poolName string, remoteName string, localName string) error {
args := []string{
"mirror", "pool", "promote", poolName,
}

// add --cluster and --id args
args = appendRemoteClusterArgs(args, remoteName, localName)

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
return fmt.Errorf("failed to promote pool(%s): %v", poolName, err)
}

return nil
}

func demotePool(poolName string, remoteName string, localName string) error {
args := []string{
"mirror", "pool", "demote", poolName,
}

// add --cluster and --id args
args = appendRemoteClusterArgs(args, remoteName, localName)

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
return fmt.Errorf("failed to promote pool(%s): %v", poolName, err)
}

return nil
}

// ########################### HELPERS ###########################

// appendRemoteClusterArgs appends the cluster and client arguments to ceph commands
Expand Down
22 changes: 19 additions & 3 deletions microceph/ceph/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ type ReplicationHandlerInterface interface {
EnableHandler(ctx context.Context, args ...any) error
DisableHandler(ctx context.Context, args ...any) error
ConfigureHandler(ctx context.Context, args ...any) error
ListHandler(ctx context.Context, args ...any) error
StatusHandler(ctx context.Context, args ...any) error
// Cluster wide Operations (don't require any pool/image info.)
ListHandler(ctx context.Context, args ...any) error
PromoteHandler(ctx context.Context, args ...any) error
DemoteHandler(ctx context.Context, args ...any) error
}

func GetReplicationHandler(name string) ReplicationHandlerInterface {
Expand All @@ -57,7 +60,8 @@ func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateM
Permit(constants.EventEnableReplication, StateEnabledReplication).
OnEntryFrom(constants.EventDisableReplication, disableHandler).
InternalTransition(constants.EventListReplication, listHandler).
InternalTransition(constants.EventDisableReplication, disableHandler)
InternalTransition(constants.EventDisableReplication, disableHandler).
InternalTransition(constants.EventPromoteReplication, statusHandler)

// Configure transitions for enabled state.
newFsm.Configure(StateEnabledReplication).
Expand All @@ -66,7 +70,8 @@ func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateM
InternalTransition(constants.EventEnableReplication, enableHandler).
InternalTransition(constants.EventConfigureReplication, configureHandler).
InternalTransition(constants.EventListReplication, listHandler).
InternalTransition(constants.EventStatusReplication, statusHandler)
InternalTransition(constants.EventStatusReplication, statusHandler).
InternalTransition(constants.EventPromoteReplication, statusHandler)

// Check Event params type.
var output *string
Expand All @@ -80,6 +85,7 @@ func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateM
newFsm.SetTriggerParameters(constants.EventConfigureReplication, inputType, outputType, stateType)
newFsm.SetTriggerParameters(constants.EventListReplication, inputType, outputType, stateType)
newFsm.SetTriggerParameters(constants.EventStatusReplication, inputType, outputType, stateType)
newFsm.SetTriggerParameters(constants.EventPromoteReplication, inputType, outputType, stateType)

// Add logger callback for all transitions
newFsm.OnTransitioning(logTransitionHandler)
Expand Down Expand Up @@ -125,3 +131,13 @@ func statusHandler(ctx context.Context, args ...any) error {
logger.Infof("REPFSM: Entered Status Handler")
return rh.StatusHandler(ctx, args...)
}
func PromoteHandler(ctx context.Context, args ...any) error {
rh := args[repArgHandler].(ReplicationHandlerInterface)
logger.Infof("REPFSM: Entered Status Handler")
return rh.PromoteHandler(ctx, args...)
}
func DemoteHandler(ctx context.Context, args ...any) error {
rh := args[repArgHandler].(ReplicationHandlerInterface)
logger.Infof("REPFSM: Entered Status Handler")
return rh.DemoteHandler(ctx, args...)
}
Loading

0 comments on commit 56a8485

Please sign in to comment.