Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove api mode #8780

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
type serviceType int

const (
apiService serviceType = iota
pdService serviceType = iota
tsoService
)

Expand Down Expand Up @@ -678,7 +678,7 @@
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *serviceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
case pdService:

Check warning on line 681 in client/servicediscovery/service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/service_discovery.go#L681

Added line #L681 was not covered by tests
urls = c.GetServiceURLs()
case tsoService:
leaderURL := c.getLeaderURL()
Expand Down
34 changes: 11 additions & 23 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"os"
"os/signal"
"strings"
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -52,8 +51,8 @@ import (
)

const (
apiMode = "api"
tsoMode = "tso"
// Only serverless use this variable, we can use it to determine whether the PD is running in keyspace mode.
// The name is a little misleading, but it's kept for backward compatibility.
serviceModeEnv = "PD_SERVICE_MODE"
)

Expand Down Expand Up @@ -89,7 +88,7 @@ func NewServiceCommand() *cobra.Command {
// NewTSOServiceCommand returns the tso service command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: tsoMode,
Use: "tso",
Short: "Run the TSO service",
Run: tso.CreateServerWrapper,
}
Expand Down Expand Up @@ -129,11 +128,12 @@ func NewSchedulingServiceCommand() *cobra.Command {
}

// NewPDServiceCommand returns the PD service command.
// We can use pd directly. This command is kept for backward compatibility.
func NewPDServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: apiMode,
Use: "api",
Short: "Run the PD service",
Run: createPDServiceWrapper,
Run: createServerWrapper,
}
addFlags(cmd)
return cmd
Expand All @@ -160,20 +160,12 @@ func addFlags(cmd *cobra.Command) {
cmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster")
}

func createPDServiceWrapper(cmd *cobra.Command, args []string) {
start(cmd, args, cmd.CalledAs())
}

func createServerWrapper(cmd *cobra.Command, args []string) {
mode := os.Getenv(serviceModeEnv)
if len(mode) != 0 && strings.ToLower(mode) == apiMode {
start(cmd, args, apiMode)
} else {
start(cmd, args)
}
isKeyspaceGroupEnabled := os.Getenv(serviceModeEnv) != ""
start(cmd, args, isKeyspaceGroupEnabled)
}

func start(cmd *cobra.Command, args []string, services ...string) {
func start(cmd *cobra.Command, args []string, isKeyspaceGroupEnabled bool) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
Expand Down Expand Up @@ -218,11 +210,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
// Flushing any buffered log entries
defer log.Sync()
memory.InitMemoryHook()
if len(services) != 0 {
versioninfo.Log(server.PDServiceMode)
} else {
versioninfo.Log(server.PDMode)
}
versioninfo.Log(server.PD)

for _, msg := range cfg.WarningMsgs {
log.Warn(msg)
Expand All @@ -245,7 +233,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, isKeyspaceGroupEnabled, serviceBuilders...)
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))

Check warning on line 88 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L88

Added line #L88 was not covered by tests
} else {
log.Info("restful PD service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful service already registered", zap.String("prefix", prefix), zap.String("service-name", name))

Check warning on line 90 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L90

Added line #L90 was not covered by tests
}
continue
}
l := builder(srv)
r.services[serviceName] = l
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))

Check warning on line 97 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L97

Added line #L97 was not covered by tests
} else {
log.Info("restful PD service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ func (c *Cluster) triggerMembershipCheck() {
}
}

// SwitchPDServiceLeader switches the PD service leader.
func (c *Cluster) SwitchPDServiceLeader(new pdpb.PDClient) bool {
// SwitchPDLeader switches the PD leader.
func (c *Cluster) SwitchPDLeader(new pdpb.PDClient) bool {
old := c.pdLeader.Load()
return c.pdLeader.CompareAndSwap(old, new)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func NewPersistConfig(cfg *Config, ttl *cache.TTLString) *PersistConfig {
o.SetClusterVersion(&cfg.ClusterVersion)
o.schedule.Store(&cfg.Schedule)
o.replication.Store(&cfg.Replication)
// storeConfig will be fetched from TiKV by PD service,
// storeConfig will be fetched from TiKV by PD,
// so we just set an empty value here first.
o.storeConfig.Store(&sc.StoreConfig{})
o.ttl = ttl
Expand Down Expand Up @@ -748,11 +748,11 @@ func (o *PersistConfig) IsRaftKV2() bool {
// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD now.
func (*PersistConfig) AddSchedulerCfg(types.CheckerSchedulerType, []string) {}

// RemoveSchedulerCfg removes the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD now.
func (*PersistConfig) RemoveSchedulerCfg(types.CheckerSchedulerType) {}

// CheckLabelProperty checks if the label property is satisfied.
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any configuration changes.
// Watcher is used to watch the PD for any configuration changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
Expand Down Expand Up @@ -76,7 +76,7 @@ type persistedConfig struct {
Store sc.StoreConfig `json:"store"`
}

// NewWatcher creates a new watcher to watch the config meta change from PD service.
// NewWatcher creates a new watcher to watch the config meta change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
region := core.RegionFromHeartbeat(request, 0)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to PD service.
// TODO: if we need to send the error back to PD.

Check warning on line 162 in pkg/mcs/scheduling/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/grpc_service.go#L162

Added line #L162 was not covered by tests
log.Error("failed handle region heartbeat", zap.Error(err))
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any meta changes.
// Watcher is used to watch the PD for any meta changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
Expand All @@ -48,7 +48,7 @@ type Watcher struct {
storeWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the meta change from PD service.
// NewWatcher creates a new watcher to watch the meta change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any Placement Rule changes.
// Watcher is used to watch the PD for any Placement Rule changes.
type Watcher struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -74,7 +74,7 @@ type Watcher struct {
patch *placement.RuleConfigPatch
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD service.
// NewWatcher creates a new watcher to watch the Placement Rule change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type Server struct {
hbStreams *hbstream.HeartbeatStreams
storage *endpoint.StorageEndpoint

// for watching the PD service meta info updates that are related to the scheduling.
// for watching the PD meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
metaWatcher *meta.Watcher
Expand Down Expand Up @@ -169,10 +169,10 @@ func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
s.serverLoopWg.Add(2)
go s.primaryElectionLoop()
go s.updatePDServiceMemberLoop()
go s.updatePDMemberLoop()
}

func (s *Server) updatePDServiceMemberLoop() {
func (s *Server) updatePDMemberLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *Server) updatePDServiceMemberLoop() {
// double check
break
}
if s.cluster.SwitchPDServiceLeader(pdpb.NewPDClient(cc)) {
if s.cluster.SwitchPDLeader(pdpb.NewPDClient(cc)) {
if status.Leader != curLeader {
log.Info("switch leader", zap.String("leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/member/election_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// ElectionLeader defines the common interface of the leader, which is the pdpb.Member
// for in PD/PD service or the tsopb.Participant in the microservices.
// for in PD or the tsopb.Participant in the microservices.
type ElectionLeader interface {
// GetListenUrls returns the listen urls
GetListenUrls() []string
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
// TODO: handle the plugin in PD service mode.
// TODO: handle the plugin in microservice env.

Check warning on line 392 in pkg/schedule/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/coordinator.go#L392

Added line #L392 was not covered by tests
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ type Controller struct {
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
// schedulers are used to manage all schedulers, which will only be initialized
// and used in the PD leader service mode now.
// and used in the non-microservice env now.
schedulers map[string]*ScheduleController
// schedulerHandlers is used to manage the HTTP handlers of schedulers,
// which will only be initialized and used in the PD service mode now.
// which will only be initialized and used in the microservice env now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ type KeyspaceGroupManager struct {
// Value: discover.ServiceRegistryEntry
tsoServiceKey string
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
// the PD/PD service. It's in the format of "/pd/{cluster_id}".
// the PD. It's in the format of "/pd/{cluster_id}".
// The main paths for different usages include:
// 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the
// storage endpoint.
Expand Down
7 changes: 4 additions & 3 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsPDServiceMode() {
if !h.s.IsServiceIndependent(constant.TSOServiceName) && !h.s.IsServiceIndependent(constant.SchedulingServiceName) {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down Expand Up @@ -145,7 +145,8 @@ func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, stri
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
zap.String("path", r.URL.Path))
zap.String("path", r.URL.Path), zap.String("addr", addr),
zap.String("target", rule.targetServiceName))
return true, ""
}
// If the URL contains escaped characters, use RawPath instead of Path
Expand Down Expand Up @@ -223,7 +224,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
clientUrls = leader.GetClientUrls()
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/PD service.
// Prevent more than one redirection among PD.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/keypath/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func svcRootPath(svcName string) string {
return path.Join(constant.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// LegacyRootPath returns the root path of legacy PD.
// Path: /pd/{cluster_id}
func LegacyRootPath() string {
return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10))
Expand Down
2 changes: 1 addition & 1 deletion server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,5 +254,5 @@
}

func buildMsg(err error) string {
return fmt.Sprintf("This operation was executed in PD service but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
return fmt.Sprintf("This operation was executed in PD but needs to be re-executed on scheduling server due to the following error: %s", err.Error())

Check warning on line 257 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L257

Added line #L257 was not covered by tests
}
1 change: 0 additions & 1 deletion server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
if members.GetHeader().GetError() != nil {
return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
}

for _, m := range members.GetMembers() {
var e error
m.BinaryVersion, e = svr.GetMember().GetMemberBinaryVersion(m.GetMemberId())
Expand Down
4 changes: 2 additions & 2 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// Following requests are **not** redirected:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
// Because the writing of all the config of the scheduling service is in the PD service,
// Because the writing of all the config of the scheduling service is in the PD,
// we should not post and delete the scheduler directly in the scheduling service.
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
Expand Down Expand Up @@ -153,7 +153,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/config/placement-rule",
constant.SchedulingServiceName,
[]string{http.MethodGet}),
// because the writing of all the meta information of the scheduling service is in the PD service,
// because the writing of all the meta information of the scheduling service is in the PD,
// we should not post and delete the scheduler directly in the scheduling service.
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
Expand Down
2 changes: 1 addition & 1 deletion server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
for _, opt := range opts {
opt(cfg)
}
s, err := server.CreateServer(ctx, cfg, nil, NewHandler)
s, err := server.CreateServer(ctx, cfg, false, NewHandler)
re.NoError(err)
err = s.Run()
re.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion server/api/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestGetVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *server.Server)
go func(cfg *config.Config) {
s, err := server.CreateServer(ctx, cfg, nil, NewHandler)
s, err := server.CreateServer(ctx, cfg, false, NewHandler)
re.NoError(err)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/memberNil", `return(true)`))
reqCh <- struct{}{}
Expand Down
Loading
Loading