Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/role listener #394

Merged
merged 15 commits into from
Dec 29, 2023
1 change: 0 additions & 1 deletion go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"context"
"os"
"time"

"vitess.io/vitess/go/internal/global"
"vitess.io/vitess/go/vt/tableacl/mysqlbasedacl"

Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtconsensus/vtconsensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
scanInterval = 3 * time.Second
scanAndRepairTimeout = 3 * time.Second
vtconsensusConfigFile string
enableVtconsensus bool

localDbPort int
)
Expand All @@ -59,6 +60,7 @@ func init() {
fs.DurationVar(&scanAndRepairTimeout, "scan_repair_timeout", 3*time.Second, "Time to wait for a Diagnose and repair operation.")
fs.StringVar(&vtconsensusConfigFile, "vtconsensus_config", "", "Config file for vtconsensus.")
fs.IntVar(&localDbPort, "db_port", 0, "Local mysql port, set this to enable local fast check.")
fs.BoolVar(&enableVtconsensus, "enable_vtconsensus", false, "enable vtconsensus")
})
}

Expand Down Expand Up @@ -162,6 +164,9 @@ func (vtconsensus *VTConsensus) RefreshCluster() {
shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard, refreshInterval.Seconds())
ticker := time.Tick(refreshInterval)
for range ticker {
if !enableVtconsensus {
continue
}
ctx, cancel := context.WithTimeout(vtconsensus.ctx, refreshInterval)
shard.RefreshTabletsInShardWithLock(ctx)
cancel()
Expand All @@ -177,6 +182,9 @@ func (vtconsensus *VTConsensus) ScanAndRepair() {
scanInterval.Seconds(), scanAndRepairTimeout.Seconds())
ticker := time.Tick(scanInterval)
for range ticker {
if !enableVtconsensus {
continue
}
func() {
ctx, cancel := context.WithTimeout(vtconsensus.ctx, scanAndRepairTimeout)
defer cancel()
Expand Down
24 changes: 24 additions & 0 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/vttablet/tabletserver/role"

"vitess.io/vitess/go/internal/global"

"github.com/spf13/pflag"
Expand Down Expand Up @@ -156,6 +158,8 @@ type TabletManager struct {

// tmState manages the TabletManager state.
tmState *tmState
// roleListener probes mysql role, then call tmState to change vttablet's type
roleListener *role.Listener

// tabletAlias is saved away from tablet for read-only access
tabletAlias *topodatapb.TabletAlias
Expand Down Expand Up @@ -382,6 +386,10 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, healthCheckInterval ti
servenv.OnTerm(tm.VDiffEngine.Close)
}

tm.roleListener = role.NewListener(tm.syncVTTabletType)
servenv.OnRun(tm.roleListener.Open)
servenv.OnTerm(tm.roleListener.Close)

// The following initializations don't need to be done
// in any specific order.
tm.startShardSync()
Expand Down Expand Up @@ -881,3 +889,19 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t

return currentPrimary, nil
}

func (tm *TabletManager) syncVTTabletType(ctx context.Context, lastUpdate time.Time, targetTabletType topodatapb.TabletType) (bool, error) {
within30Seconds := func() bool {
return time.Since(lastUpdate) <= 30*time.Second
}

if tm.Tablet().Type == targetTabletType && within30Seconds() {
return false, nil
}
log.Infof("start to change vttablet role to %s", targetTabletType.String())
err := tm.ChangeType(ctx, targetTabletType, false)
if err != nil {
return false, err
}
return true, nil
}
212 changes: 212 additions & 0 deletions go/vt/vttablet/tabletserver/role/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
Copyright ApeCloud, Inc.
Licensed under the Apache v2(found in the LICENSE file in the root directory).
*/

package role

import (
"context"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"

"github.com/spf13/pflag"

"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
)

var (
mysqlRoleProbeInterval = 1 * time.Second
mysqlRoleProbeTimeout = 1 * time.Second
)

var (
mysqlProbeServicePort int64 = 3501
mysqlProbeServiceHost = "localhost"
)

const LORRY_HTTP_PORT_ENV_NAME = "LORRY_HTTP_PORT"

const (
PRIMARY = "primary"
SECONDARY = "secondary"
MASTER = "master"
SLAVE = "slave"
LEADER = "Leader"
FOLLOWER = "Follower"
LEARNER = "Learner"
CANDIDATE = "Candidate"
LOGGER = "Logger"
)

func transitionRoleType(role string) topodatapb.TabletType {
// Convert the role to lower case for case-insensitive comparison
role = strings.ToLower(role)

switch role {
case strings.ToLower(LEADER), strings.ToLower(PRIMARY), strings.ToLower(MASTER):
return topodatapb.TabletType_PRIMARY
case strings.ToLower(FOLLOWER), strings.ToLower(SECONDARY), strings.ToLower(SLAVE):
return topodatapb.TabletType_REPLICA
case strings.ToLower(CANDIDATE):
return topodatapb.TabletType_REPLICA
case strings.ToLower(LEARNER):
return topodatapb.TabletType_RDONLY
case strings.ToLower(LOGGER):
return topodatapb.TabletType_SPARE
default:
return topodatapb.TabletType_UNKNOWN
}
}

func init() {
servenv.OnParseFor("vttablet", registerGCFlags)
}

func setUpMysqlProbeServicePort() {
portStr, ok := os.LookupEnv(LORRY_HTTP_PORT_ENV_NAME)
if !ok {
return
}
// parse portStr to int
portFromEnv, err := strconv.ParseInt(portStr, 10, 64)
if err != nil {
return
}
mysqlProbeServicePort = portFromEnv
}

func registerGCFlags(fs *pflag.FlagSet) {
fs.DurationVar(&mysqlRoleProbeInterval, "mysql_role_probe_interval", mysqlRoleProbeInterval, "Interval between garbage collection checks")
fs.DurationVar(&mysqlRoleProbeTimeout, "mysql_role_probe_timeout", mysqlRoleProbeTimeout, "Interval between garbage collection checks")
}

type Listener struct {
isOpen int64
cancelOperation context.CancelFunc

stateMutex sync.Mutex
reconcileMutex sync.Mutex

lastUpdate time.Time

changeTypeFunc func(ctx context.Context, lastUpdate time.Time, targetTabletType topodatapb.TabletType) (bool, error)
}

func NewListener(changeTypeFunc func(ctx context.Context, lastUpdate time.Time, tabletType topodatapb.TabletType) (bool, error)) *Listener {
l := &Listener{
isOpen: 0,
changeTypeFunc: changeTypeFunc,
}
return l
}

// Open opens database pool and initializes the schema
func (collector *Listener) Open() {
collector.stateMutex.Lock()
defer collector.stateMutex.Unlock()
if collector.isOpen > 0 {
// already open
return
}

log.Info("Listener: opening")
atomic.StoreInt64(&collector.isOpen, 1)
ctx := context.Background()
ctx, collector.cancelOperation = context.WithCancel(ctx)
go collector.probeLoop(ctx)
}

func (collector *Listener) Close() {
log.Infof("Listener - started execution of Close. Acquiring initMutex lock")
collector.stateMutex.Lock()
defer collector.stateMutex.Unlock()
log.Infof("Listener - acquired lock")
if collector.isOpen == 0 {
log.Infof("Listener - no collector is open")
// not open
return
}

log.Info("Listener: closing")
if collector.cancelOperation != nil {
collector.cancelOperation()
}
log.Infof("Listener - closing pool")
atomic.StoreInt64(&collector.isOpen, 0)
log.Infof("Listener - finished execution of Close")
}

func (collector *Listener) probeLoop(ctx context.Context) {
ticker := timer.NewSuspendableTicker(mysqlRoleProbeInterval, false)
defer ticker.Stop()
go ticker.TickNow()

log.Info("Listener: enter probeLoop")
for {
select {
case <-ctx.Done():
log.Info("Listener: exit probeLoop")
return
case <-ticker.C:
{
collector.reconcileLeadership(ctx)
}
}
}
}

func (collector *Listener) reconcileLeadership(ctx context.Context) {
collector.reconcileMutex.Lock()
defer collector.reconcileMutex.Unlock()

setUpMysqlProbeServicePort()
getRoleUrl := fmt.Sprintf("http://%s:%d/v1.0/bindings/mysql?operation=getRole", mysqlProbeServiceHost, mysqlProbeServicePort)

kvResp, err := probe(ctx, mysqlRoleProbeTimeout, http.MethodGet, getRoleUrl, nil)
if err != nil {
log.Errorf("try to probe mysql role, but error happened: %v", err)
return
}
role, ok := kvResp["role"]
if !ok {
log.Errorf("unable to get mysql role from probe response, response content: %v", kvResp)
return
}

// Safely assert the type of role to string.
roleStr, ok := role.(string)
if !ok {
log.Error("role value is not a string, role:%v", role)
return
}

tabletType := transitionRoleType(roleStr)

switch tabletType {
case topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY:
changeTypeCtx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
changed, err := collector.changeTypeFunc(changeTypeCtx, collector.lastUpdate, tabletType)
if err != nil {
log.Errorf("change vttablet role to %s, error:%w", tabletType.String(), err)
}
if changed {
collector.lastUpdate = time.Now()
log.Infof("change vttablet role to %s successfully", tabletType.String())
}
default:
log.Errorf("role value is not a string, role:%v", role)
}
}
Loading
Loading