Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve filter rules reloading #605

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vttablet/customrule/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
DatabaseCustomRuleDbName = sidecardb.SidecarDBName
DatabaseCustomRuleTableName = "wescale_plugin"
DatabaseCustomRuleReloadInterval = 60 * time.Second
DatabaseCustomRuleNotifierDelayTime = 100 * time.Millisecond
DatabaseCustomRuleNotifierDelayTime = 100 * time.Millisecond // time to wait for primary inserting filters and the filters are synced to replicas
)

func registerFlags(fs *pflag.FlagSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package databasecustomrule

import (
"context"
"sync"

"fmt"
"reflect"
Expand All @@ -27,6 +28,8 @@ const databaseCustomRuleSource string = "DATABASE_CUSTOM_RULE"

// databaseCustomRule is the database backed implementation.
type databaseCustomRule struct {
mu sync.Mutex

// controller is set at construction time.
controller tabletserver.Controller

Expand All @@ -45,13 +48,18 @@ func newDatabaseCustomRule(qsc tabletserver.Controller) (*databaseCustomRule, er

func (cr *databaseCustomRule) start() {
go func() {
// reload rules that already in the database once start
if err := cr.reloadRulesFromDatabase(); err != nil {
log.Warningf("Background watch of database custom rule failed: %v", err)
}

intervalTicker := time.NewTicker(customrule.DatabaseCustomRuleReloadInterval)
defer intervalTicker.Stop()

for {
select {
case <-intervalTicker.C:
case <-customrule.Watch():
//case <-customrule.Watch():
}

if err := cr.reloadRulesFromDatabase(); err != nil {
Expand Down Expand Up @@ -84,6 +92,9 @@ func (cr *databaseCustomRule) applyRules(qr *sqltypes.Result) error {
qrs.Add(rule)
}

cr.mu.Lock()
defer cr.mu.Unlock()

if !reflect.DeepEqual(cr.qrs, qrs) {
cr.qrs = qrs.Copy()
cr.controller.SetQueryRules(databaseCustomRuleSource, qrs)
Expand Down Expand Up @@ -122,6 +133,7 @@ func activateTopoCustomRules(qsc tabletserver.Controller) {
if err != nil {
log.Fatalf("cannot start DatabaseCustomRule: %v", err)
}
customrule.WaitForFilter = cr.WaitForFilter
cr.start()

servenv.OnTerm(cr.stop)
Expand All @@ -131,3 +143,39 @@ func activateTopoCustomRules(qsc tabletserver.Controller) {
func init() {
tabletserver.RegisterFunctions = append(tabletserver.RegisterFunctions, activateTopoCustomRules)
}

func (cr *databaseCustomRule) WaitForFilter(name string, shouldExists bool) error {
timeoutDuration := 5 * time.Second
interval := 100 * time.Millisecond

timeout := time.After(timeoutDuration)
ticker := time.NewTicker(interval)
defer ticker.Stop()

err := cr.reloadRulesFromDatabase()
if err != nil {
return fmt.Errorf("failed to reload rules from database: %v", err)
}

for {
select {
case <-timeout:
return fmt.Errorf("wait for filter reload timeout")
case <-ticker.C:
filter := cr.FindFilter(name)
if shouldExists && filter != nil || !shouldExists && filter == nil {
return nil
}
err := cr.reloadRulesFromDatabase()
if err != nil {
return fmt.Errorf("failed to reload rules from database: %v", err)
}
}
}
}

func (cr *databaseCustomRule) FindFilter(name string) *rules.Rule {
cr.mu.Lock()
defer cr.mu.Unlock()
return cr.qrs.Find(name)
}
2 changes: 2 additions & 0 deletions go/vt/vttablet/customrule/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ func NotifyReload() {
func Watch() <-chan struct{} {
return customRuleChanged
}

var WaitForFilter func(name string, shouldExists bool) error
54 changes: 41 additions & 13 deletions go/vt/vttablet/tabletserver/common_query_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"strconv"
"time"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"

"vitess.io/vitess/go/vt/vttablet/customrule"

Expand Down Expand Up @@ -82,34 +83,61 @@ func (qe *QueryEngine) TabletsPlans(alias *topodatapb.TabletAlias) (*sqltypes.Re
}

func (qe *QueryEngine) HandleWescaleFilterRequest(sql string, isPrimary bool) (*sqltypes.Result, error) {

stmt, _, err := sqlparser.Parse2(sql)
if err != nil {
return nil, err
}

switch s := stmt.(type) {
case *sqlparser.CreateWescaleFilter:
defer customrule.NotifyReload()
if !isPrimary {
rst := &sqltypes.Result{}
if isPrimary {
rst, err = qe.HandleCreateFilter(s)
if err != nil {
return nil, err
}
} else {
// wait for primary inserting filters and the filters are synced to replicas
time.Sleep(customrule.DatabaseCustomRuleNotifierDelayTime)
return nil, nil
}
return qe.HandleCreateFilter(s)

customrule.WaitForFilter(s.Name, true)
return rst, nil

case *sqlparser.AlterWescaleFilter:
defer customrule.NotifyReload()
if !isPrimary {
rst := &sqltypes.Result{}
if isPrimary {
rst, err = qe.HandleAlterFilter(s)
if err != nil {
return nil, err
}
} else {
// wait for primary inserting filters and the filters are synced to replicas
time.Sleep(customrule.DatabaseCustomRuleNotifierDelayTime)
return nil, nil
}
return qe.HandleAlterFilter(s)

nameToWait := s.AlterInfo.Name
if nameToWait == rules.UnsetValueOfStmt {
nameToWait = s.OriginName
}
customrule.WaitForFilter(nameToWait, true)
return rst, nil

case *sqlparser.DropWescaleFilter:
defer customrule.NotifyReload()
if !isPrimary {
rst := &sqltypes.Result{}
if isPrimary {
rst, err = qe.HandleDropFilter(s)
if err != nil {
return nil, err
}
} else {
// wait for primary inserting filters and the filters are synced to replicas
time.Sleep(customrule.DatabaseCustomRuleNotifierDelayTime)
return nil, nil
}
return qe.HandleDropFilter(s)

customrule.WaitForFilter(s.Name, false)
return rst, nil

case *sqlparser.ShowWescaleFilter:
return qe.HandleShowFilter(s)
}
Expand Down
Loading