Skip to content

Commit

Permalink
Feature/role listener (#394)
Browse files Browse the repository at this point in the history
* feat: impl role listener

* feat: impl role listener

* feat: impl role listener

* feat: impl role listener

* feat: impl role listener

* feat: remove useless config

* feat: impl syncVTTabletType

* fix: error message

* feat: impl listener

* feat: disable vtconsensus by default

* feat: fix log

* feat: add rule transition

* feat: add rule transition with case insensitive

* feat: fix vtconsensus loop

* feat: support inject host for listener
  • Loading branch information
earayu authored Dec 29, 2023
1 parent 29ee135 commit 4b63f2b
Show file tree
Hide file tree
Showing 6 changed files with 471 additions and 1 deletion.
1 change: 0 additions & 1 deletion go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"context"
"os"
"time"

"vitess.io/vitess/go/internal/global"
"vitess.io/vitess/go/vt/tableacl/mysqlbasedacl"

Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtconsensus/vtconsensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
scanInterval = 3 * time.Second
scanAndRepairTimeout = 3 * time.Second
vtconsensusConfigFile string
enableVtconsensus bool

localDbPort int
)
Expand All @@ -59,6 +60,7 @@ func init() {
fs.DurationVar(&scanAndRepairTimeout, "scan_repair_timeout", 3*time.Second, "Time to wait for a Diagnose and repair operation.")
fs.StringVar(&vtconsensusConfigFile, "vtconsensus_config", "", "Config file for vtconsensus.")
fs.IntVar(&localDbPort, "db_port", 0, "Local mysql port, set this to enable local fast check.")
fs.BoolVar(&enableVtconsensus, "enable_vtconsensus", false, "enable vtconsensus")
})
}

Expand Down Expand Up @@ -162,6 +164,9 @@ func (vtconsensus *VTConsensus) RefreshCluster() {
shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard, refreshInterval.Seconds())
ticker := time.Tick(refreshInterval)
for range ticker {
if !enableVtconsensus {
continue
}
ctx, cancel := context.WithTimeout(vtconsensus.ctx, refreshInterval)
shard.RefreshTabletsInShardWithLock(ctx)
cancel()
Expand All @@ -177,6 +182,9 @@ func (vtconsensus *VTConsensus) ScanAndRepair() {
scanInterval.Seconds(), scanAndRepairTimeout.Seconds())
ticker := time.Tick(scanInterval)
for range ticker {
if !enableVtconsensus {
continue
}
func() {
ctx, cancel := context.WithTimeout(vtconsensus.ctx, scanAndRepairTimeout)
defer cancel()
Expand Down
24 changes: 24 additions & 0 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/vttablet/tabletserver/role"

"vitess.io/vitess/go/internal/global"

"github.com/spf13/pflag"
Expand Down Expand Up @@ -156,6 +158,8 @@ type TabletManager struct {

// tmState manages the TabletManager state.
tmState *tmState
// roleListener probes mysql role, then call tmState to change vttablet's type
roleListener *role.Listener

// tabletAlias is saved away from tablet for read-only access
tabletAlias *topodatapb.TabletAlias
Expand Down Expand Up @@ -382,6 +386,10 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, healthCheckInterval ti
servenv.OnTerm(tm.VDiffEngine.Close)
}

tm.roleListener = role.NewListener(tm.syncVTTabletType)
servenv.OnRun(tm.roleListener.Open)
servenv.OnTerm(tm.roleListener.Close)

// The following initializations don't need to be done
// in any specific order.
tm.startShardSync()
Expand Down Expand Up @@ -881,3 +889,19 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t

return currentPrimary, nil
}

func (tm *TabletManager) syncVTTabletType(ctx context.Context, lastUpdate time.Time, targetTabletType topodatapb.TabletType) (bool, error) {
within30Seconds := func() bool {
return time.Since(lastUpdate) <= 30*time.Second
}

if tm.Tablet().Type == targetTabletType && within30Seconds() {
return false, nil
}
log.Infof("start to change vttablet role to %s", targetTabletType.String())
err := tm.ChangeType(ctx, targetTabletType, false)
if err != nil {
return false, err
}
return true, nil
}
222 changes: 222 additions & 0 deletions go/vt/vttablet/tabletserver/role/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
Copyright ApeCloud, Inc.
Licensed under the Apache v2(found in the LICENSE file in the root directory).
*/

package role

import (
"context"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"

"github.com/spf13/pflag"

"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
)

var (
mysqlRoleProbeInterval = 1 * time.Second
mysqlRoleProbeTimeout = 1 * time.Second
)

var (
mysqlProbeServicePort int64 = 3501
mysqlProbeServiceHost = "localhost"
)

const LORRY_HTTP_PORT_ENV_NAME = "LORRY_HTTP_PORT"
const LORRY_HTTP_HOST_ENV_NAME = "LORRY_HTTP_HOST"

const (
PRIMARY = "primary"
SECONDARY = "secondary"
MASTER = "master"
SLAVE = "slave"
LEADER = "Leader"
FOLLOWER = "Follower"
LEARNER = "Learner"
CANDIDATE = "Candidate"
LOGGER = "Logger"
)

func transitionRoleType(role string) topodatapb.TabletType {
// Convert the role to lower case for case-insensitive comparison
role = strings.ToLower(role)

switch role {
case strings.ToLower(LEADER), strings.ToLower(PRIMARY), strings.ToLower(MASTER):
return topodatapb.TabletType_PRIMARY
case strings.ToLower(FOLLOWER), strings.ToLower(SECONDARY), strings.ToLower(SLAVE):
return topodatapb.TabletType_REPLICA
case strings.ToLower(CANDIDATE):
return topodatapb.TabletType_REPLICA
case strings.ToLower(LEARNER):
return topodatapb.TabletType_RDONLY
case strings.ToLower(LOGGER):
return topodatapb.TabletType_SPARE
default:
return topodatapb.TabletType_UNKNOWN
}
}

func init() {
servenv.OnParseFor("vttablet", registerGCFlags)
}

func setUpMysqlProbeServicePort() {
portStr, ok := os.LookupEnv(LORRY_HTTP_PORT_ENV_NAME)
if !ok {
return
}
// parse portStr to int
portFromEnv, err := strconv.ParseInt(portStr, 10, 64)
if err != nil {
return
}
mysqlProbeServicePort = portFromEnv
}

func setUpMysqlProbeServiceHost() {
host, ok := os.LookupEnv(LORRY_HTTP_HOST_ENV_NAME)
if !ok {
return
}
mysqlProbeServiceHost = host
}

func registerGCFlags(fs *pflag.FlagSet) {
fs.DurationVar(&mysqlRoleProbeInterval, "mysql_role_probe_interval", mysqlRoleProbeInterval, "Interval between garbage collection checks")
fs.DurationVar(&mysqlRoleProbeTimeout, "mysql_role_probe_timeout", mysqlRoleProbeTimeout, "Interval between garbage collection checks")
}

type Listener struct {
isOpen int64
cancelOperation context.CancelFunc

stateMutex sync.Mutex
reconcileMutex sync.Mutex

lastUpdate time.Time

changeTypeFunc func(ctx context.Context, lastUpdate time.Time, targetTabletType topodatapb.TabletType) (bool, error)
}

func NewListener(changeTypeFunc func(ctx context.Context, lastUpdate time.Time, tabletType topodatapb.TabletType) (bool, error)) *Listener {
l := &Listener{
isOpen: 0,
changeTypeFunc: changeTypeFunc,
}
return l
}

// Open opens database pool and initializes the schema
func (collector *Listener) Open() {
collector.stateMutex.Lock()
defer collector.stateMutex.Unlock()
if collector.isOpen > 0 {
// already open
return
}

log.Info("Listener: opening")
atomic.StoreInt64(&collector.isOpen, 1)
ctx := context.Background()
ctx, collector.cancelOperation = context.WithCancel(ctx)
go collector.probeLoop(ctx)
}

func (collector *Listener) Close() {
log.Infof("Listener - started execution of Close. Acquiring initMutex lock")
collector.stateMutex.Lock()
defer collector.stateMutex.Unlock()
log.Infof("Listener - acquired lock")
if collector.isOpen == 0 {
log.Infof("Listener - no collector is open")
// not open
return
}

log.Info("Listener: closing")
if collector.cancelOperation != nil {
collector.cancelOperation()
}
log.Infof("Listener - closing pool")
atomic.StoreInt64(&collector.isOpen, 0)
log.Infof("Listener - finished execution of Close")
}

func (collector *Listener) probeLoop(ctx context.Context) {
ticker := timer.NewSuspendableTicker(mysqlRoleProbeInterval, false)
defer ticker.Stop()
go ticker.TickNow()

log.Info("Listener: enter probeLoop")
for {
select {
case <-ctx.Done():
log.Info("Listener: exit probeLoop")
return
case <-ticker.C:
{
collector.reconcileLeadership(ctx)
}
}
}
}

func (collector *Listener) reconcileLeadership(ctx context.Context) {
collector.reconcileMutex.Lock()
defer collector.reconcileMutex.Unlock()

setUpMysqlProbeServicePort()
setUpMysqlProbeServiceHost()
getRoleUrl := fmt.Sprintf("http://%s:%d/v1.0/bindings/mysql?operation=getRole", mysqlProbeServiceHost, mysqlProbeServicePort)

kvResp, err := probe(ctx, mysqlRoleProbeTimeout, http.MethodGet, getRoleUrl, nil)
if err != nil {
log.Errorf("try to probe mysql role, but error happened: %v", err)
return
}
role, ok := kvResp["role"]
if !ok {
log.Errorf("unable to get mysql role from probe response, response content: %v", kvResp)
return
}

// Safely assert the type of role to string.
roleStr, ok := role.(string)
if !ok {
log.Error("role value is not a string, role:%v", role)
return
}

tabletType := transitionRoleType(roleStr)

switch tabletType {
case topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY:
changeTypeCtx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
changed, err := collector.changeTypeFunc(changeTypeCtx, collector.lastUpdate, tabletType)
if err != nil {
log.Errorf("change vttablet role to %s, error:%w", tabletType.String(), err)
}
if changed {
collector.lastUpdate = time.Now()
log.Infof("change vttablet role to %s successfully", tabletType.String())
}
default:
log.Errorf("role value is not a string, role:%v", role)
}
}
Loading

0 comments on commit 4b63f2b

Please sign in to comment.