This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

refactor: rename deployMode to enableSchedule (#273)
## Rationale
`DeployMode` is not a good name; it is hard to understand at a glance. I suggest renaming it to `enableSchedule`.

## Detailed Changes
Rename `DeployMode` to `EnableSchedule` across the scheduler manager, the scheduler implementations, the HTTP debug API, and the storage types; the old cluster-level `EnableSchedule` option and its plumbing are removed.

## Test Plan
CI.
chunshao90 authored Nov 14, 2023 · 1 parent da81e53 · commit 8a070ad
Showing 16 changed files with 53 additions and 86 deletions.
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -54,7 +54,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client *
dispatch := eventdispatch.NewDispatchImpl()
procedureFactory := coordinator.NewFactory(logger, id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)

-schedulerManager := manager.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize())
+schedulerManager := manager.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize())

return &Cluster{
logger: logger,
3 changes: 0 additions & 3 deletions server/cluster/manager.go
@@ -135,7 +135,6 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, opt
Name: clusterName,
MinNodeCount: opts.NodeCount,
ShardTotal: opts.ShardTotal,
-EnableSchedule: opts.EnableSchedule,
TopologyType: opts.TopologyType,
ProcedureExecutingBatchSize: opts.ProcedureExecutingBatchSize,
CreatedAt: uint64(createTime),
@@ -188,7 +187,6 @@ func (m *managerImpl) UpdateCluster(ctx context.Context, clusterName string, opt
Name: c.GetMetadata().Name(),
MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
ShardTotal: c.GetMetadata().GetTotalShardNum(),
-EnableSchedule: opt.EnableSchedule,
TopologyType: opt.TopologyType,
ProcedureExecutingBatchSize: opt.ProcedureExecutingBatchSize,
CreatedAt: c.GetMetadata().GetCreateTime(),
@@ -436,7 +434,6 @@ func (m *managerImpl) Start(ctx context.Context) error {
Name: metadataStorage.Name,
MinNodeCount: metadataStorage.MinNodeCount,
ShardTotal: metadataStorage.ShardTotal,
-EnableSchedule: metadataStorage.EnableSchedule,
TopologyType: m.topologyType,
ProcedureExecutingBatchSize: metadataStorage.ProcedureExecutingBatchSize,
CreatedAt: metadataStorage.CreatedAt,
7 changes: 0 additions & 7 deletions server/cluster/metadata/cluster_metadata.go
@@ -601,13 +601,6 @@ func (c *ClusterMetadata) GetTotalShardNum() uint32 {
return c.metaData.ShardTotal
}

-func (c *ClusterMetadata) GetEnableSchedule() bool {
-c.lock.RLock()
-defer c.lock.RUnlock()
-
-return c.metaData.EnableSchedule
-}
-
func (c *ClusterMetadata) GetTopologyType() storage.TopologyType {
c.lock.RLock()
defer c.lock.RUnlock()
1 change: 0 additions & 1 deletion server/cluster/metadata/types.go
@@ -71,7 +71,6 @@ type CreateClusterOpts struct {
}

type UpdateClusterOpts struct {
-EnableSchedule bool
TopologyType storage.TopologyType
ProcedureExecutingBatchSize uint32
}
2 changes: 0 additions & 2 deletions server/coordinator/procedure/test/common.go
@@ -121,7 +121,6 @@ func InitEmptyCluster(ctx context.Context, t *testing.T) *cluster.Cluster {
Name: ClusterName,
MinNodeCount: DefaultNodeCount,
ShardTotal: DefaultShardTotal,
-EnableSchedule: DefaultSchedulerOperator,
TopologyType: DefaultTopologyType,
ProcedureExecutingBatchSize: DefaultProcedureExecutingBatchSize,
CreatedAt: 0,
@@ -173,7 +172,6 @@ func InitEmptyClusterWithConfig(ctx context.Context, t *testing.T, shardNumber i
Name: ClusterName,
MinNodeCount: uint32(nodeNumber),
ShardTotal: uint32(shardNumber),
-EnableSchedule: DefaultSchedulerOperator,
TopologyType: DefaultTopologyType,
ProcedureExecutingBatchSize: DefaultProcedureExecutingBatchSize,
CreatedAt: 0,
43 changes: 13 additions & 30 deletions server/coordinator/scheduler/manager/scheduler_manager.go
@@ -54,14 +54,12 @@ type SchedulerManager interface {

Stop(ctx context.Context) error

-UpdateEnableSchedule(ctx context.Context, enableSchedule bool)
+// UpdateEnableSchedule can only be used in dynamic mode, it will throw error when topology type is static.
+// when enableSchedule is true, shard topology will not be updated, it is usually used in scenarios such as cluster deploy.
+UpdateEnableSchedule(ctx context.Context, enable bool) error

-// UpdateDeployMode can only be used in dynamic mode, it will throw error when topology type is static.
-// when deploy mode is true, shard topology will not be updated, it is usually used in scenarios such as cluster deploy.
-UpdateDeployMode(ctx context.Context, enable bool) error

-// GetDeployMode can only be used in dynamic mode, it will throw error when topology type is static.
-GetDeployMode(ctx context.Context) (bool, error)
+// GetEnableSchedule can only be used in dynamic mode, it will throw error when topology type is static.
+GetEnableSchedule(ctx context.Context) (bool, error)

// AddShardAffinityRule adds a shard affinity rule to the manager, and then apply it to the underlying schedulers.
AddShardAffinityRule(ctx context.Context, rule scheduler.ShardAffinityRule) error
@@ -91,14 +89,13 @@ type schedulerManagerImpl struct {
registerSchedulers []scheduler.Scheduler
shardWatch watch.ShardWatch
isRunning atomic.Bool
-enableSchedule bool
topologyType storage.TopologyType
procedureExecutingBatchSize uint32
-deployMode bool
+enableSchedule bool
shardAffinities map[storage.ShardID]scheduler.ShardAffinityRule
}

-func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) SchedulerManager {
+func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) SchedulerManager {
var shardWatch watch.ShardWatch
switch topologyType {
case storage.TopologyTypeDynamic:
@@ -120,10 +117,9 @@ func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory
registerSchedulers: []scheduler.Scheduler{},
shardWatch: shardWatch,
isRunning: atomic.Bool{},
-enableSchedule: enableSchedule,
topologyType: topologyType,
procedureExecutingBatchSize: procedureExecutingBatchSize,
-deployMode: false,
+enableSchedule: false,
shardAffinities: make(map[storage.ShardID]scheduler.ShardAffinityRule),
}
}
@@ -170,11 +166,6 @@ func (m *schedulerManagerImpl) Start(ctx context.Context) error {
clusterSnapshot := m.clusterMetadata.GetClusterSnapshot()
m.logger.Debug("scheduler manager invoke", zap.String("clusterSnapshot", fmt.Sprintf("%v", clusterSnapshot)))

-// TODO: Perhaps these codes related to schedulerOperator need to be refactored.
-// If schedulerOperator is turned on, the scheduler will only be scheduled in the non-stable state.
-if !m.enableSchedule && clusterSnapshot.Topology.ClusterView.State == storage.ClusterStateStable {
-continue
-}
if clusterSnapshot.Topology.IsPrepareFinished() {
m.logger.Info("try to update cluster state to stable")
if err := m.clusterMetadata.UpdateClusterView(ctx, storage.ClusterStateStable, clusterSnapshot.Topology.ClusterView.ShardNodes); err != nil {
@@ -270,39 +261,31 @@ func (m *schedulerManagerImpl) Scheduler(ctx context.Context, clusterSnapshot me
return results
}

-func (m *schedulerManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) {
-m.lock.Lock()
-m.enableSchedule = enableSchedule
-m.lock.Unlock()
-
-m.logger.Info("scheduler manager update enableSchedule", zap.Bool("enableSchedule", enableSchedule))
-}
-
-func (m *schedulerManagerImpl) UpdateDeployMode(ctx context.Context, enable bool) error {
+func (m *schedulerManagerImpl) UpdateEnableSchedule(ctx context.Context, enable bool) error {
m.lock.Lock()
defer m.lock.Unlock()

if m.topologyType != storage.TopologyTypeDynamic {
return ErrInvalidTopologyType.WithCausef("deploy mode could only update when topology type is dynamic")
}

-m.deployMode = enable
+m.enableSchedule = enable
for _, scheduler := range m.registerSchedulers {
-scheduler.UpdateDeployMode(ctx, enable)
+scheduler.UpdateEnableSchedule(ctx, enable)
}

return nil
}

-func (m *schedulerManagerImpl) GetDeployMode(_ context.Context) (bool, error) {
+func (m *schedulerManagerImpl) GetEnableSchedule(_ context.Context) (bool, error) {
m.lock.RLock()
defer m.lock.RUnlock()

if m.topologyType != storage.TopologyTypeDynamic {
return false, ErrInvalidTopologyType.WithCausef("deploy mode could only get when topology type is dynamic")
}

-return m.deployMode, nil
+return m.enableSchedule, nil
}

func (m *schedulerManagerImpl) AddShardAffinityRule(ctx context.Context, rule scheduler.ShardAffinityRule) error {
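For readers tracking the API change: a minimal sketch of how a caller drives the renamed methods. The two signatures come from the diff above; the mirrored interface, the wrapper function, and its name are illustrative assumptions, not code from this repository.

```go
package sketch

import (
	"context"
	"fmt"
)

// SchedulerManager mirrors just the two renamed methods from the
// interface in scheduler_manager.go so this sketch compiles on its
// own; the real interface has more methods.
type SchedulerManager interface {
	UpdateEnableSchedule(ctx context.Context, enable bool) error
	GetEnableSchedule(ctx context.Context) (bool, error)
}

// lockTopology is a hypothetical caller: setting enableSchedule to
// true freezes the shard topology (e.g. during a cluster deploy).
// Per the doc comments above, both calls return an error when the
// topology type is static.
func lockTopology(ctx context.Context, mgr SchedulerManager) error {
	if err := mgr.UpdateEnableSchedule(ctx, true); err != nil {
		return fmt.Errorf("update enableSchedule: %w", err)
	}
	enabled, err := mgr.GetEnableSchedule(ctx)
	if err != nil {
		return fmt.Errorf("get enableSchedule: %w", err)
	}
	fmt.Printf("enableSchedule=%v, shard topology locked\n", enabled)
	return nil
}
```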
server/coordinator/scheduler/manager/scheduler_manager_test.go
@@ -45,14 +45,14 @@ func TestSchedulerManager(t *testing.T) {
_, client, _ := etcdutil.PrepareEtcdServerAndClient(t)

// Create scheduler manager with enableScheduler equal to false.
-schedulerManager := manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic, 1)
+schedulerManager := manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", storage.TopologyTypeStatic, 1)
err = schedulerManager.Start(ctx)
re.NoError(err)
err = schedulerManager.Stop(ctx)
re.NoError(err)

// Create scheduler manager with static topology.
-schedulerManager = manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic, 1)
+schedulerManager = manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", storage.TopologyTypeStatic, 1)
err = schedulerManager.Start(ctx)
re.NoError(err)
schedulers := schedulerManager.ListScheduler()
Expand All @@ -61,7 +61,7 @@ func TestSchedulerManager(t *testing.T) {
re.NoError(err)

// Create scheduler manager with dynamic topology.
-schedulerManager = manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic, 1)
+schedulerManager = manager.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", storage.TopologyTypeDynamic, 1)
err = schedulerManager.Start(ctx)
re.NoError(err)
schedulers = schedulerManager.ListScheduler()
20 changes: 10 additions & 10 deletions server/coordinator/scheduler/rebalanced/scheduler.go
@@ -42,10 +42,10 @@ type schedulerImpl struct {
// The lock is used to protect following fields.
lock sync.Mutex
// latestShardNodeMapping is used to record last stable shard topology,
-// when deployMode is true, rebalancedShardScheduler will recover cluster according to the topology.
+// when enableSchedule is true, rebalancedShardScheduler will recover cluster according to the topology.
latestShardNodeMapping map[storage.ShardID]metadata.RegisteredNode
-// The `latestShardNodeMapping` will be used directly, if deployMode is set.
-deployMode bool
+// The `latestShardNodeMapping` will be used directly, if enableSchedule is set.
+enableSchedule bool
// shardAffinityRule is used to control the shard distribution.
shardAffinityRule map[storage.ShardID]scheduler.ShardAffinity
}
@@ -58,7 +58,7 @@ func NewShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePic
procedureExecutingBatchSize: procedureExecutingBatchSize,
lock: sync.Mutex{},
latestShardNodeMapping: map[storage.ShardID]metadata.RegisteredNode{},
-deployMode: false,
+enableSchedule: false,
shardAffinityRule: map[storage.ShardID]scheduler.ShardAffinity{},
}
}
@@ -67,8 +67,8 @@ func (r *schedulerImpl) Name() string {
return "rebalanced_scheduler"
}

-func (r *schedulerImpl) UpdateDeployMode(_ context.Context, enable bool) {
-r.updateDeployMode(enable)
+func (r *schedulerImpl) UpdateEnableSchedule(_ context.Context, enable bool) {
+r.updateEnableSchedule(enable)
}

func (r *schedulerImpl) AddShardAffinityRule(_ context.Context, rule scheduler.ShardAffinityRule) error {
@@ -113,7 +113,7 @@ func (r *schedulerImpl) Schedule(ctx context.Context, clusterSnapshot metadata.S
var procedures []procedure.Procedure
var reasons strings.Builder

-// ShardNodeMapping only update when deployMode is false.
+// ShardNodeMapping only update when enableSchedule is false.
shardNodeMapping, err := r.generateLatestShardNodeMapping(ctx, clusterSnapshot)
if err != nil {
return emptySchedulerRes, nil
@@ -204,7 +204,7 @@ func (r *schedulerImpl) generateLatestShardNodeMapping(ctx context.Context, snap
defer r.lock.Unlock()
var err error
shardNodeMapping := r.latestShardNodeMapping
-if !r.deployMode {
+if !r.enableSchedule {
pickConfig := nodepicker.Config{
NumTotalShards: numShards,
ShardAffinityRule: maps.Clone(r.shardAffinityRule),
@@ -219,9 +219,9 @@
return shardNodeMapping, nil
}

-func (r *schedulerImpl) updateDeployMode(deployMode bool) {
+func (r *schedulerImpl) updateEnableSchedule(enableSchedule bool) {
r.lock.Lock()
defer r.lock.Unlock()

-r.deployMode = deployMode
+r.enableSchedule = enableSchedule
}
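The behavioral core of this file's change is the guard in `generateLatestShardNodeMapping`: while `enableSchedule` is set, the last stable mapping is returned unchanged. Below is a condensed, self-contained restatement of that guard, not the real code; field names follow the diff, but the types are simplified (the real code keys by `storage.ShardID` and maps to `metadata.RegisteredNode`).

```go
package sketch

import "sync"

// rebalancedSketch condenses the fields of schedulerImpl that the
// guard touches.
type rebalancedSketch struct {
	lock                   sync.Mutex
	enableSchedule         bool
	latestShardNodeMapping map[uint32]string // shard ID -> node name
}

// mapping recomputes the shard-to-node mapping via the supplied
// node-picker callback unless enableSchedule is set, in which case
// the last stable mapping is reused so the topology stays frozen
// (e.g. across a cluster deploy).
func (r *rebalancedSketch) mapping(pick func() map[uint32]string) map[uint32]string {
	r.lock.Lock()
	defer r.lock.Unlock()
	if !r.enableSchedule {
		r.latestShardNodeMapping = pick()
	}
	return r.latestShardNodeMapping
}
```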
4 changes: 2 additions & 2 deletions server/coordinator/scheduler/reopen/scheduler.go
@@ -46,8 +46,8 @@ func (r schedulerImpl) Name() string {
return "reopen_scheduler"
}

-func (r schedulerImpl) UpdateDeployMode(_ context.Context, _ bool) {
-// ReopenShardScheduler do not need deployMode.
+func (r schedulerImpl) UpdateEnableSchedule(_ context.Context, _ bool) {
+// ReopenShardScheduler do not need enableSchedule.
}

func (r schedulerImpl) AddShardAffinityRule(_ context.Context, _ scheduler.ShardAffinityRule) error {
6 changes: 3 additions & 3 deletions server/coordinator/scheduler/scheduler.go
@@ -43,9 +43,9 @@ type Scheduler interface {
Name() string
// Schedule will generate procedure based on current cluster snapshot, which will be submitted to ProcedureManager, and whether it is actually executed depends on the current state of ProcedureManager.
Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error)
-// UpdateDeployMode is used to update deployMode for scheduler,
-// DeployMode means that the cluster topology is locked and the mapping between shards and nodes cannot be changed.
-UpdateDeployMode(ctx context.Context, enable bool)
+// UpdateEnableSchedule is used to update enableSchedule for scheduler,
+// EnableSchedule means that the cluster topology is locked and the mapping between shards and nodes cannot be changed.
+UpdateEnableSchedule(ctx context.Context, enable bool)
AddShardAffinityRule(ctx context.Context, rule ShardAffinityRule) error
RemoveShardAffinityRule(ctx context.Context, shardID storage.ShardID) error
ListShardAffinityRule(ctx context.Context) (ShardAffinityRule, error)
4 changes: 2 additions & 2 deletions server/coordinator/scheduler/static/scheduler.go
@@ -47,8 +47,8 @@ func (s schedulerImpl) Name() string {
return "static_scheduler"
}

-func (s schedulerImpl) UpdateDeployMode(_ context.Context, _ bool) {
-// StaticTopologyShardScheduler do not need deployMode.
+func (s schedulerImpl) UpdateEnableSchedule(_ context.Context, _ bool) {
+// StaticTopologyShardScheduler do not need EnableSchedule.
}

func (s schedulerImpl) AddShardAffinityRule(_ context.Context, _ scheduler.ShardAffinityRule) error {
21 changes: 10 additions & 11 deletions server/service/http/api.go
@@ -86,8 +86,8 @@ func (a *API) NewAPIRouter() *Router {
router.DebugGet("/pprof/threadCreate", a.pprofThreadCreate)
router.DebugGet(fmt.Sprintf("/diagnose/:%s/shards", clusterNameParam), wrap(a.diagnoseShards, true, a.forwardClient))
router.DebugGet("/leader", wrap(a.getLeader, false, a.forwardClient))
-router.DebugGet(fmt.Sprintf("/clusters/:%s/deployMode", clusterNameParam), wrap(a.getDeployMode, true, a.forwardClient))
-router.DebugPut(fmt.Sprintf("/clusters/:%s/deployMode", clusterNameParam), wrap(a.updateDeployMode, true, a.forwardClient))
+router.DebugGet(fmt.Sprintf("/clusters/:%s/enableSchedule", clusterNameParam), wrap(a.getEnableSchedule, true, a.forwardClient))
+router.DebugPut(fmt.Sprintf("/clusters/:%s/enableSchedule", clusterNameParam), wrap(a.updateEnableSchedule, true, a.forwardClient))

// Register ETCD API.
router.Post("/etcd/promoteLearner", wrap(a.etcdAPI.promoteLearner, false, a.forwardClient))
@@ -344,7 +344,6 @@ func (a *API) updateCluster(req *http.Request) apiFuncResult {
}

if err := a.clusterManager.UpdateCluster(req.Context(), clusterName, metadata.UpdateClusterOpts{
-EnableSchedule: updateClusterRequest.EnableSchedule,
TopologyType: topologyType,
ProcedureExecutingBatchSize: updateClusterRequest.ProcedureExecutingBatchSize,
}); err != nil {
@@ -515,7 +514,7 @@ func (a *API) queryTable(r *http.Request) apiFuncResult {
return okResult(tables)
}

-func (a *API) getDeployMode(r *http.Request) apiFuncResult {
+func (a *API) getEnableSchedule(r *http.Request) apiFuncResult {
ctx := r.Context()
clusterName := Param(ctx, clusterNameParam)
if len(clusterName) == 0 {
@@ -527,15 +526,15 @@ func (a *API) getDeployMode(r *http.Request) apiFuncResult {
return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error()))
}

-deployMode, err := c.GetSchedulerManager().GetDeployMode(r.Context())
+enableSchedule, err := c.GetSchedulerManager().GetEnableSchedule(r.Context())
if err != nil {
-return errResult(ErrGetDeployMode, err.Error())
+return errResult(ErrGetEnableSchedule, err.Error())
}

-return okResult(deployMode)
+return okResult(enableSchedule)
}

-func (a *API) updateDeployMode(r *http.Request) apiFuncResult {
+func (a *API) updateEnableSchedule(r *http.Request) apiFuncResult {
ctx := r.Context()
clusterName := Param(ctx, clusterNameParam)
if len(clusterName) == 0 {
@@ -547,15 +546,15 @@ func (a *API) updateDeployMode(r *http.Request) apiFuncResult {
return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error()))
}

-var req UpdateDeployModeRequest
+var req UpdateEnableScheduleRequest
err = json.NewDecoder(r.Body).Decode(&req)
if err != nil {
return errResult(ErrParseRequest, err.Error())
}

-err = c.GetSchedulerManager().UpdateDeployMode(r.Context(), req.Enable)
+err = c.GetSchedulerManager().UpdateEnableSchedule(r.Context(), req.Enable)
if err != nil {
-return errResult(ErrUpdateDeployMode, err.Error())
+return errResult(ErrUpdateEnableSchedule, err.Error())
}

return okResult(req.Enable)
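A sketch of exercising the renamed debug endpoints end to end. Only the `/clusters/:clusterName/enableSchedule` route suffix, the GET/PUT verbs, and the `{"enable": ...}` body shape (from `UpdateEnableScheduleRequest`) come from this diff; the host, port, debug-route prefix, and cluster name are assumptions.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Host, port, "/debug" prefix, and cluster name are assumed, not
	// taken from this diff; only the enableSchedule suffix is.
	url := "http://127.0.0.1:8080/debug/clusters/defaultCluster/enableSchedule"

	// PUT toggles the flag; the body matches UpdateEnableScheduleRequest.
	req, err := http.NewRequest(http.MethodPut, url, bytes.NewBufferString(`{"enable": true}`))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	putResp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	putResp.Body.Close()
	fmt.Println("PUT status:", putResp.Status)

	// GET reads the current value back.
	getResp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer getResp.Body.Close()
	body, _ := io.ReadAll(getResp.Body)
	fmt.Println("GET body:", string(body))
}
```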
4 changes: 2 additions & 2 deletions server/service/http/error.go
@@ -33,8 +33,8 @@ var (
ErrHealthCheck = coderr.NewCodeError(coderr.Internal, "server health check")
ErrParseTopology = coderr.NewCodeError(coderr.Internal, "parse topology type")
ErrUpdateFlowLimiter = coderr.NewCodeError(coderr.Internal, "update flow limiter")
-ErrGetDeployMode = coderr.NewCodeError(coderr.Internal, "get deploy mode")
-ErrUpdateDeployMode = coderr.NewCodeError(coderr.Internal, "update deploy mode")
+ErrGetEnableSchedule = coderr.NewCodeError(coderr.Internal, "get enableSchedule")
+ErrUpdateEnableSchedule = coderr.NewCodeError(coderr.Internal, "update enableSchedule")
ErrAddLearner = coderr.NewCodeError(coderr.Internal, "add member as learner")
ErrListMembers = coderr.NewCodeError(coderr.Internal, "get member list")
ErrRemoveMembers = coderr.NewCodeError(coderr.Internal, "remove member")
2 changes: 1 addition & 1 deletion server/service/http/types.go
@@ -153,7 +153,7 @@ type UpdateFlowLimiterRequest struct {
Enable bool `json:"enable"`
}

-type UpdateDeployModeRequest struct {
+type UpdateEnableScheduleRequest struct {
Enable bool `json:"enable"`
}

1 change: 0 additions & 1 deletion server/storage/storage_test.go
@@ -53,7 +53,6 @@ func TestStorage_CreateAndListCluster(t *testing.T) {
Name: fmt.Sprintf(nameFormat, i),
MinNodeCount: uint32(i),
ShardTotal: uint32(i),
-EnableSchedule: false,
TopologyType: TopologyTypeStatic,
ProcedureExecutingBatchSize: 100,
CreatedAt: uint64(time.Now().UnixMilli()),
