Skip to content

Commit

Permalink
remove api mode
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 9, 2025
1 parent 90407fa commit fc2e1bd
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 51 deletions.
14 changes: 5 additions & 9 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ func addFlags(cmd *cobra.Command) {
}

func createServerWrapper(cmd *cobra.Command, args []string) {
isKeyspaceEnabled := os.Getenv(serviceModeEnv) != ""
start(cmd, args, isKeyspaceEnabled)
isKeyspaceGroupEnabled := os.Getenv(serviceModeEnv) != ""
start(cmd, args, isKeyspaceGroupEnabled)
}

func start(cmd *cobra.Command, args []string, isKeyspaceEnabled bool) {
func start(cmd *cobra.Command, args []string, isKeyspaceGroupEnabled bool) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
Expand Down Expand Up @@ -210,11 +210,7 @@ func start(cmd *cobra.Command, args []string, isKeyspaceEnabled bool) {
// Flushing any buffered log entries
defer log.Sync()
memory.InitMemoryHook()
if isKeyspaceEnabled {
versioninfo.Log(server.PDKeyspaceMode)
} else {
versioninfo.Log(server.PDMode)
}
versioninfo.Log(server.PD)

for _, msg := range cfg.WarningMsgs {
log.Warn(msg)
Expand All @@ -237,7 +233,7 @@ func start(cmd *cobra.Command, args []string, isKeyspaceEnabled bool) {
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, isKeyspaceEnabled, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, isKeyspaceGroupEnabled, serviceBuilders...)
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
Expand Down
29 changes: 20 additions & 9 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type Server interface {
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
IsKeyspaceEnabled() bool
IsKeyspaceGroupEnabled() bool
GetSafePointV2Manager() *gc.SafePointV2Manager
}

Expand All @@ -156,12 +156,12 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running bool
isKeyspaceEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64
running bool
isKeyspaceGroupEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
Expand Down Expand Up @@ -325,7 +325,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
log.Warn("raft cluster has already been started")
return nil
}
c.isKeyspaceEnabled = s.IsKeyspaceEnabled()
c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
Expand Down Expand Up @@ -376,7 +376,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
c.loadExternalTS()
c.loadMinResolvedTS()

if c.isKeyspaceEnabled {
if c.isKeyspaceGroupEnabled {
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
Expand Down Expand Up @@ -419,7 +419,18 @@ func (c *RaftCluster) checkSchedulingService() {
}

// checkTSOService checks the TSO service.
// In non-serverless env:
// 1. If the dynamic switching is disabled, it will switch to the internal TSO service.
// 2. If the dynamic switching is enabled, it will check the external TSO service.
// If the external TSO service is available, it will switch to the external TSO service.
// If the external TSO service is unavailable, it will switch to the internal TSO service.
//
// In serverless env, we don't allow dynamic switching.
// Whether we use the internal TSO service or the external TSO service is determined by the `isKeyspaceGroupEnabled``.
func (c *RaftCluster) checkTSOService() {
if c.isKeyspaceGroupEnabled {
return
}
if !c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
if err := c.switchToInternalTSO(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
Expand Down
5 changes: 3 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@ const (
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond

defaultEnableSchedulingFallback = true
defaultEnableTSODynamicSwitching = true
defaultEnableSchedulingFallback = true
// In serverless environment, the default value of `enable-scheduling` is always false.
defaultEnableTSODynamicSwitching = false
)

var (
Expand Down
40 changes: 19 additions & 21 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ const (

recoveringMarkPath = "cluster/markers/snapshot-recovering"

// PDMode represents that server is in PD mode.
PDMode = "PD"
// PDKeyspaceMode represents that server is in PD Keyspace mode.
PDKeyspaceMode = "PD Keyspace"
// PD is name of member.
PD = "PD"

// maxRetryTimesGetServicePrimary is the max retry times for getting primary addr.
// Note: it need to be less than client.defaultPDTimeout
Expand Down Expand Up @@ -227,7 +225,7 @@ type Server struct {
auditBackends []audit.Backend

registry *registry.ServiceRegistry
mode string
isKeyspaceGroupEnabled bool
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
Expand All @@ -240,14 +238,8 @@ type Server struct {
type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APIServiceGroup, error)

// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, isKeyspaceEnabled bool, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
var mode string
if isKeyspaceEnabled {
mode = PDKeyspaceMode
} else {
mode = PDMode
}
log.Info("PD config", zap.Bool("enable-keyspace", isKeyspaceEnabled), zap.Reflect("config", cfg))
func CreateServer(ctx context.Context, cfg *config.Config, isKeyspaceGroupEnabled bool, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
log.Info("PD config", zap.Bool("enable-keyspace-group", isKeyspaceGroupEnabled), zap.Reflect("config", cfg))
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()

s := &Server{
Expand All @@ -259,7 +251,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, isKeyspaceEnabled boo
ctx: ctx,
startTimestamp: time.Now().Unix(),
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
mode: mode,
isKeyspaceGroupEnabled: isKeyspaceGroupEnabled,
tsoClientPool: struct {
syncutil.RWMutex
clients map[string]tsopb.TSO_TsoClient
Expand Down Expand Up @@ -478,7 +470,7 @@ func (s *Server) startServer(ctx context.Context) error {
Member: s.member.MemberValue(),
Step: keyspace.AllocStep,
})
if s.IsKeyspaceEnabled() {
if s.IsKeyspaceGroupEnabled() {
s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client)
}
s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager)
Expand Down Expand Up @@ -530,7 +522,7 @@ func (s *Server) Close() {
s.cgMonitor.StopMonitor()

s.stopServerLoop()
if s.IsKeyspaceEnabled() {
if s.IsKeyspaceGroupEnabled() {
s.keyspaceGroupManager.Close()
}

Expand Down Expand Up @@ -786,9 +778,9 @@ func (s *Server) stopRaftCluster() {
s.cluster.Stop()
}

// IsKeyspaceEnabled returns whether the server is in PD Keyspace mode.
func (s *Server) IsKeyspaceEnabled() bool {
return s.mode == PDKeyspaceMode
// IsKeyspaceGroupEnabled returns whether the keyspace group is enabled.
func (s *Server) IsKeyspaceGroupEnabled() bool {
return s.isKeyspaceGroupEnabled
}

// GetAddr returns the server urls for clients.
Expand Down Expand Up @@ -1388,7 +1380,13 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {

// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if name == constant.TSOServiceName && s.isKeyspaceGroupEnabled {
return true
}
if !s.IsClosed() {
if name == constant.TSOServiceName && !s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
return true
}
return s.cluster.IsServiceIndependent(name)
}
return false
Expand Down Expand Up @@ -1722,13 +1720,13 @@ func (s *Server) campaignLeader() {
}
// EnableLeader to accept the remaining service, such as GetStore, GetRegion.
s.member.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(1)
member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
defer resetLeaderOnce.Do(func() {
// as soon as cancel the leadership keepalive, then other member have chance
// to be new leader.
cancel()
s.member.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(0)
member.ServiceMemberGauge.WithLabelValues(PD).Set(0)
})

CheckPDVersionWithClusterVersion(s.persistOptions)
Expand Down
2 changes: 1 addition & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestMode(t *testing.T) {
err = svr.Run()
re.NoError(err)
MustWaitLeader(re, []*Server{svr})
re.True(svr.IsKeyspaceEnabled())
re.True(svr.IsKeyspaceGroupEnabled())
}

func TestIsPathInDirectory(t *testing.T) {
Expand Down
18 changes: 9 additions & 9 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type TestServer struct {
var zapLogOnce sync.Once

// NewTestServer creates a new TestServer.
func NewTestServer(ctx context.Context, cfg *config.Config, isKeyspaceEnabled ...bool) (*TestServer, error) {
func NewTestServer(ctx context.Context, cfg *config.Config, isKeyspaceGroupEnabled ...bool) (*TestServer, error) {
// disable the heartbeat async runner in test
cfg.Schedule.EnableHeartbeatConcurrentRunner = false
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
Expand All @@ -97,11 +97,11 @@ func NewTestServer(ctx context.Context, cfg *config.Config, isKeyspaceEnabled ..
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
var enableKeyspace bool
if len(isKeyspaceEnabled) > 0 {
enableKeyspace = isKeyspaceEnabled[0]
var enableKeyspaceGroup bool
if len(isKeyspaceGroupEnabled) > 0 {
enableKeyspaceGroup = isKeyspaceGroupEnabled[0]
}
svr, err := server.CreateServer(ctx, cfg, enableKeyspace, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, enableKeyspaceGroup, serviceBuilders...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -437,7 +437,7 @@ func NewTestClusterWithKeyspace(ctx context.Context, initialServerCount int, opt
return createTestCluster(ctx, initialServerCount, true, opts...)
}

func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceEnabled bool, opts ...ConfigOption) (*TestCluster, error) {
func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceGroupEnabled bool, opts ...ConfigOption) (*TestCluster, error) {
schedulers.Register()
config := newClusterConfig(initialServerCount)
servers := make(map[string]*TestServer)
Expand All @@ -446,7 +446,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceEn
if err != nil {
return nil, err
}
s, err := NewTestServer(ctx, serverConf, isKeyspaceEnabled)
s, err := NewTestServer(ctx, serverConf, isKeyspaceGroupEnabled)
if err != nil {
return nil, err
}
Expand All @@ -470,7 +470,7 @@ func RestartTestPDCluster(ctx context.Context, cluster *TestCluster) (*TestClust
}

func restartTestCluster(
ctx context.Context, cluster *TestCluster, isKeyspaceEnabled bool,
ctx context.Context, cluster *TestCluster, isKeyspaceGroupEnabled bool,
) (newTestCluster *TestCluster, err error) {
schedulers.Register()
newTestCluster = &TestCluster{
Expand All @@ -497,7 +497,7 @@ func restartTestCluster(
newServer *TestServer
serverErr error
)
newServer, serverErr = NewTestServer(ctx, serverCfg, isKeyspaceEnabled)
newServer, serverErr = NewTestServer(ctx, serverCfg, isKeyspaceGroupEnabled)
serverMap.Store(serverName, newServer)
errorMap.Store(serverName, serverErr)
}(serverName, server)
Expand Down

0 comments on commit fc2e1bd

Please sign in to comment.