Skip to content

Commit

Permalink
Merge branch 'master' into fix_flaky_test
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Dec 26, 2024
2 parents 062e9ed + c2f72ac commit b4a4b4f
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 150 deletions.
20 changes: 13 additions & 7 deletions client/clients/tso/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,22 @@ func (td *tsoDispatcher) connectionCtxsUpdater() {
)

log.Info("[tso] start tso connection contexts updater")
setNewUpdateTicker := func(ticker *time.Ticker) {
setNewUpdateTicker := func(interval time.Duration) {
if updateTicker.C != nil {
updateTicker.Stop()
}
updateTicker = ticker
if interval == 0 {
updateTicker = &time.Ticker{}
} else {
updateTicker = time.NewTicker(interval)
}
}
// If the TSO Follower Proxy is enabled, set the update interval to the member update interval.
if option.GetEnableTSOFollowerProxy() {
setNewUpdateTicker(sd.MemberUpdateInterval)
}
// Set to nil before returning to ensure that the existing ticker can be GC.
defer setNewUpdateTicker(nil)
defer setNewUpdateTicker(0)

for {
provider.updateConnectionCtxs(ctx, connectionCtxs)
Expand All @@ -499,13 +507,11 @@ func (td *tsoDispatcher) connectionCtxsUpdater() {
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(time.NewTicker(sd.MemberUpdateInterval))
setNewUpdateTicker(sd.MemberUpdateInterval)
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
setNewUpdateTicker(&time.Ticker{})
} else {
continue
setNewUpdateTicker(0)
}
case <-updateTicker.C:
// Triggered periodically when the TSO Follower Proxy is enabled.
Expand Down
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,9 @@ error = '''
init file log error, %s
'''

["PD:mcs:ErrNotFoundSchedulingAddr"]
["PD:mcs:ErrNotFoundSchedulingPrimary"]
error = '''
cannot find scheduling address
cannot find scheduling primary
'''

["PD:mcs:ErrSchedulingServer"]
Expand Down
67 changes: 64 additions & 3 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

package errs

import "github.com/pingcap/errors"
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
)

const (
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
Expand All @@ -31,6 +36,62 @@ const (
NotServedErr = "is not served"
)

// gRPC errors
var (
// Canceled indicates the operation was canceled (typically by the caller).
ErrStreamClosed = status.Error(codes.Canceled, "stream is closed")

// Unknown error. An example of where this error may be returned is
// if a Status value received from another address space belongs to
// an error-space that is not known in this address space. Also
// errors raised by APIs that do not return enough error information
// may be converted to this error.
ErrUnknown = func(err error) error {
return status.Error(codes.Unknown, err.Error())
}

// DeadlineExceeded means operation expired before completion.
// For operations that change the state of the system, this error may be
// returned even if the operation has completed successfully. For
// example, a successful response from a server could have been delayed
// long enough for the deadline to expire.
ErrForwardTSOTimeout = status.Error(codes.DeadlineExceeded, "forward tso request timeout")
ErrTSOProxyRecvFromClientTimeout = status.Error(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
ErrSendHeartbeatTimeout = status.Error(codes.DeadlineExceeded, "send heartbeat timeout")

// NotFound means some requested entity (e.g., file or directory) was
// not found.
ErrNotFoundTSOAddr = status.Error(codes.NotFound, "not found tso address")
ErrNotFoundSchedulingAddr = status.Error(codes.NotFound, "not found scheduling address")
ErrNotFoundService = status.Error(codes.NotFound, "not found service")

// ResourceExhausted indicates some resource has been exhausted, perhaps
// a per-user quota, or perhaps the entire file system is out of space.
ErrMaxCountTSOProxyRoutinesExceeded = status.Error(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrGRPCRateLimitExceeded = func(err error) error {
return status.Error(codes.ResourceExhausted, err.Error())
}

// FailedPrecondition indicates operation was rejected because the
// system is not in a state required for the operation's execution.
// For example, directory to be deleted may be non-empty, an rmdir
// operation is applied to a non-directory, etc.
ErrMismatchClusterID = func(clusterID, requestClusterID uint64) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", clusterID, requestClusterID)
}

// Unavailable indicates the service is currently unavailable.
// This is a most likely a transient condition and may be corrected
// by retrying with a backoff. Note that it is not always safe to retry
// non-idempotent operations.
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
ErrNotLeader = status.Error(codes.Unavailable, "not leader")
ErrNotStarted = status.Error(codes.Unavailable, "server not started")
ErrEtcdNotStarted = status.Error(codes.Unavailable, "server is started, but etcd not started")
ErrFollowerHandlingNotAllowed = status.Error(codes.Unavailable, "not leader and follower handling not allowed")
)

// common error in multiple packages
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:common:ErrGetSourceStore"))
Expand Down Expand Up @@ -484,6 +545,6 @@ var (

// Micro service errors
var (
ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
ErrNotFoundSchedulingPrimary = errors.Normalize("cannot find scheduling primary", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingPrimary"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)
10 changes: 2 additions & 8 deletions pkg/mcs/metastorage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,17 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
)

var (
// errNotLeader is returned when current server is not the leader.
errNotLeader = status.Errorf(codes.Unavailable, "not leader")
)

var _ meta_storagepb.MetaStorageServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -81,7 +75,7 @@ func (*Service) RegisterRESTHandler(_ map[string]http.Handler) error {

func (s *Service) checkServing() error {
if s.manager == nil || s.manager.srv == nil || !s.manager.srv.IsServing() {
return errNotLeader
return errs.ErrNotLeader
}
return nil
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,18 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
)

var (
// errNotLeader is returned when current server is not the leader.
errNotLeader = status.Errorf(codes.Unavailable, "not leader")
)

var _ rmpb.ResourceManagerServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -89,7 +83,7 @@ func (s *Service) GetManager() *Manager {

func (s *Service) checkServing() error {
if s.manager == nil || s.manager.srv == nil || !s.manager.srv.IsServing() {
return errNotLeader
return errs.ErrNotLeader
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,10 @@ func getRegionByID(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
if regionID == 0 {
c.String(http.StatusBadRequest, errs.ErrRegionInvalidID.FastGenByArgs().Error())
return
}
regionInfo := svr.GetBasicCluster().GetRegion(regionID)
if regionInfo == nil {
c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs(regionID).Error())
Expand Down
10 changes: 1 addition & 9 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand All @@ -41,12 +39,6 @@ import (
"github.com/tikv/pd/pkg/versioninfo"
)

// gRPC errors
var (
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
)

// SetUpRestHandler is a hook to sets up the REST service.
var SetUpRestHandler = func(*Service) (http.Handler, apiutil.APIServiceGroup) {
return dummyRestService{}, apiutil.APIServiceGroup{}
Expand Down Expand Up @@ -107,7 +99,7 @@ func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error {
return errors.WithStack(err)
case <-timer.C:
atomic.StoreInt32(&s.closed, 1)
return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
return errs.ErrSendHeartbeatTimeout
}
}

Expand Down
21 changes: 6 additions & 15 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,18 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
)

// gRPC errors
var (
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
)

var _ tsopb.TSOServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -102,14 +95,12 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
return errs.ErrNotStarted
}
header := request.GetHeader()
clusterID := header.GetClusterId()
if clusterID != keypath.ClusterID() {
return status.Errorf(
codes.FailedPrecondition, "mismatch cluster id, need %d but got %d",
keypath.ClusterID(), clusterID)
return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID)
}
keyspaceID := header.GetKeyspaceId()
keyspaceGroupID := header.GetKeyspaceGroupId()
Expand All @@ -119,7 +110,7 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
keyspaceID, keyspaceGroupID,
count)
if err != nil {
return status.Error(codes.Unknown, err.Error())
return errs.ErrUnknown(err)
}
keyspaceGroupIDStr := strconv.FormatUint(uint64(keyspaceGroupID), 10)
tsoHandleDuration.WithLabelValues(keyspaceGroupIDStr).Observe(time.Since(start).Seconds())
Expand Down Expand Up @@ -220,10 +211,10 @@ func (s *Service) GetMinTS(

func (s *Service) validRequest(header *tsopb.RequestHeader) (tsopb.ErrorType, error) {
if s.IsClosed() || s.keyspaceGroupManager == nil {
return tsopb.ErrorType_NOT_BOOTSTRAPPED, ErrNotStarted
return tsopb.ErrorType_NOT_BOOTSTRAPPED, errs.ErrNotStarted
}
if header == nil || header.GetClusterId() != keypath.ClusterID() {
return tsopb.ErrorType_CLUSTER_MISMATCHED, ErrClusterMismatched
return tsopb.ErrorType_CLUSTER_MISMATCHED, errs.ErrMismatchClusterID(keypath.ClusterID(), header.GetClusterId())
}
return tsopb.ErrorType_OK, nil
}
Expand Down
9 changes: 3 additions & 6 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
Expand Down Expand Up @@ -279,7 +277,7 @@ func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorM
// TODO: Check if the sender is from the global TSO allocator
func (s *Server) ValidateInternalRequest(_ *tsopb.RequestHeader, _ bool) error {
if s.IsClosed() {
return ErrNotStarted
return errs.ErrNotStarted
}
return nil
}
Expand All @@ -288,11 +286,10 @@ func (s *Server) ValidateInternalRequest(_ *tsopb.RequestHeader, _ bool) error {
// TODO: Check if the keyspace replica is the primary
func (s *Server) ValidateRequest(header *tsopb.RequestHeader) error {
if s.IsClosed() {
return ErrNotStarted
return errs.ErrNotStarted
}
if header.GetClusterId() != keypath.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d",
keypath.ClusterID(), header.GetClusterId())
return errs.ErrMismatchClusterID(keypath.ClusterID(), header.GetClusterId())
}
return nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"github.com/docker/go-units"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -208,7 +206,7 @@ func (s *RegionSyncer) Sync(ctx context.Context, stream pdpb.PD_SyncRegionsServe
}
clusterID := request.GetHeader().GetClusterId()
if clusterID != keypath.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", keypath.ClusterID(), clusterID)
return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID)
}
log.Info("establish sync region stream",
zap.String("requested-server", request.GetMember().GetName()),
Expand Down
2 changes: 1 addition & 1 deletion server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (h *adminHandler) recoverAllocID(w http.ResponseWriter, r *http.Request) {
func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), constant.SchedulingServiceName)
if !ok {
return errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
return errs.ErrNotFoundSchedulingPrimary.FastGenByArgs()
}
var idStr string
if len(id) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func (h *confHandler) GetPDServerConfig(w http.ResponseWriter, _ *http.Request)
func (h *confHandler) getSchedulingServerConfig() (*config.Config, error) {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), constant.SchedulingServiceName)
if !ok {
return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
return nil, errs.ErrNotFoundSchedulingPrimary.FastGenByArgs()
}
url := fmt.Sprintf("%s/scheduling/api/v1/config", addr)
req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
Expand Down
Loading

0 comments on commit b4a4b4f

Please sign in to comment.