From 8e56f26873b8ce9f8bdcd56cbac25bf50defcee8 Mon Sep 17 00:00:00 2001 From: takaidohigasi Date: Thu, 1 Apr 2021 23:41:03 +0900 Subject: [PATCH] add GTID_SUBTRACT --- .gitignore | 1 + README.md | 31 +++++--- docker/mysql1.cnf | 1 + docker/mysql2.cnf | 1 + docker/mysql3.cnf | 1 + main.go | 18 +++-- src/replica/replica.go | 173 +++++++++++++++++++++++++++-------------- 7 files changed, 149 insertions(+), 77 deletions(-) diff --git a/.gitignore b/.gitignore index 66d5652..7144329 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea bin/gtid-errant-fixer +.my.cnf diff --git a/README.md b/README.md index 1560623..1e42962 100644 --- a/README.md +++ b/README.md @@ -8,27 +8,34 @@ Please see also the detail on the Percona Live [slide](https://www.percona.com/l ## usage ``` -Usage of ./bin/gtid-errant-fixer: - -c string mysql client config (default ".my.cnf") - -f force execution +$ ./gtid-errant-fixer --help +Usage of ./gtid-errant-fixer: + -c string + mysql client config(need SUPER priv to operate stop / slave) (default ".my.cnf") + -f force execution (skip prompt to confirm Y/N + -p string + mysql client config (need REPLICATION SLAVE) + -u string + mysql client config (need REPLICATION SLAVE) ``` ## example +you can reset errant_gtid which you can confirm with server_id and host (report_host is required to enable) + ``` -% ./bin/gtid-errant-fixer main -errant transaction pre-check: - errant transaction found: 211f4d80-914c-11eb-b1ec-0242ac1f0004:1 +$ ./gtid-errant-fixer +errant transaction pre-check: + errant_gtid ea2c5164-930b-11eb-901c-0242c0a81004:1: server_id: 3, host mysql3 stopping replica -original gtid_executed: -0a59fb94-914c-11eb-8644-0242ac1f0002:1-4, -0a66ee4a-914c-11eb-b206-0242ac1f0003:1-7, -211f4d80-914c-11eb-b1ec-0242ac1f0004:1 -remove 211f4d80-914c-11eb-b1ec-0242ac1f0004:1 from gtid_executed +original gtid_executed: +e9f7ad21-930b-11eb-8646-0242c0a81002:1-18, +ea052114-930b-11eb-8dd3-0242c0a81003:1-18, +ea2c5164-930b-11eb-901c-0242c0a81004:1 would you continue to reset? (y/n) [n]: y RESET SLAVE RESET MASTER -SET GLOBAL gtid_purged='0a59fb94-914c-11eb-8644-0242ac1f0002:1-4,0a66ee4a-914c-11eb-b206-0242ac1f0003:1-7' +SET GLOBAL gtid_purged='e9f7ad21-930b-11eb-8646-0242c0a81002:1-18,ea052114-930b-11eb-8dd3-0242c0a81003:1-18' resuming replica completed. ``` diff --git a/docker/mysql1.cnf b/docker/mysql1.cnf index cbea80d..44bb36d 100644 --- a/docker/mysql1.cnf +++ b/docker/mysql1.cnf @@ -4,6 +4,7 @@ collation-server = utf8_general_ci log-bin = /var/log/mysql/mysqlbin server-id = 1 +report_host = mysql1 gtid_mode = ON enforce_gtid_consistency = ON diff --git a/docker/mysql2.cnf b/docker/mysql2.cnf index 7bb124e..dec3765 100644 --- a/docker/mysql2.cnf +++ b/docker/mysql2.cnf @@ -4,6 +4,7 @@ collation-server = utf8_general_ci log-bin = /var/log/mysql/mysqlbin server-id = 2 +report_host = mysql2 gtid_mode = ON enforce_gtid_consistency = ON diff --git a/docker/mysql3.cnf b/docker/mysql3.cnf index c0abb39..1aae1f2 100644 --- a/docker/mysql3.cnf +++ b/docker/mysql3.cnf @@ -5,6 +5,7 @@ collation-server = utf8_general_ci log-bin = /var/log/mysql/mysqlbin server-id = 3 read_only = ON +report_host = mysql3 # skip_slave_start = ON # required for MSR diff --git a/main.go b/main.go index c7dbd82..75967f4 100644 --- a/main.go +++ b/main.go @@ -16,25 +16,29 @@ func exitWithError(err error) { } func main() { - var conf string + var conf, monitorUser, monitorPass string var forceOption bool - flag.StringVar(&conf, "c", ".my.cnf", "mysql client config") - flag.BoolVar(&forceOption, "f", false, "force execution") + flag.StringVar(&conf, "c", ".my.cnf", "mysql client config(need SUPER priv to operate stop / slave)") + flag.StringVar(&monitorUser, "u", "root", "mysql client config ()") + flag.StringVar(&monitorPass, "p", "", "mysql client config ()") + flag.BoolVar(&forceOption, "f", false, "force execution (skip prompt to confirm Y/N") flag.Parse() - _, err := os.Stat(conf) - if err != nil { + if _, err := os.Stat(conf); err != nil { exitWithError(err) } - db, err := mysql_defaults_file.OpenUsingDefaultsFile("mysql", conf, "") if err != nil { exitWithError(err) } defer db.Close() - mysqlDB := replica.MySQLDB{db, nil, ""} + mysqlDB, err := replica.NewMySQLDB(db, monitorUser, monitorPass) + if err != nil { + exitWithError(err) + } + if err := mysqlDB.FixErrantGTID(forceOption); err != nil { exitWithError(err) } diff --git a/src/replica/replica.go b/src/replica/replica.go index 88c7725..afdcd77 100644 --- a/src/replica/replica.go +++ b/src/replica/replica.go @@ -4,11 +4,13 @@ import ( "database/sql" "errors" "fmt" - _ "github.com/go-sql-driver/mysql" + "strconv" "strings" "github.com/Songmu/prompter" "github.com/jmoiron/sqlx" + + _ "github.com/go-sql-driver/mysql" ) type ( @@ -16,46 +18,68 @@ type ( AutoPosition bool `db:"Auto_Position"` ChannelName string `db:"Channel_Name"` ExecutedGtidSet string `db:"Executed_Gtid_Set"` + GtidExecuted string // @@gtid_executed MasterHost string `db:"Master_Host"` + MasterPort int `db:"Master_Port"` MasterUUID string `db:"Master_UUID"` ReplicaIORunning string `db:"Slave_IO_Running"` ReplicaSQLRunning string `db:"Slave_SQL_Running"` } MySQLDB struct { - Dbh *sql.DB - ReplStatus []ReplicaStatus - ServerUuid string + dbh *sql.DB + replStatus []ReplicaStatus + serverUuid string + monitorUser string + monitorPass string + serverHosts map[string]MySQLServerInfo + } + + MySQLServerInfo struct { + ServerId int `db:"Server_id"` + Host string `db:"Host"` + ServerUuid string `db:"Slave_UUID"` } ) const ( - replicaStatusQuery = `SHOW SLAVE STATUS` - stopReplicaQuery = `STOP SLAVE` - startReplicaQuery = `START SLAVE` - resetReplicaQuery = `RESET SLAVE` - resetMasterQuery = `RESET MASTER` - setGtidPurgedQuery = `SET GLOBAL gtid_purged='%s'` - getServerUuidQuery = `SELECT @@server_uuid` + resetMasterQuery = `RESET MASTER` + resetReplicaQuery = `RESET SLAVE` + getErrantGTIDsQuery = `SELECT GTID_SUBTRACT('%s', '%s')` + getGTIDExecutedQuery = `SELECT @@global.gtid_executed` + getServerUuidQuery = `SELECT @@server_uuid` + // see https://dev.mysql.com/doc/refman/5.7/en/gtid-functions.html + setGtidPurgedQuery = `SET GLOBAL gtid_purged='%s'` + replicaStatusQuery = `SHOW SLAVE STATUS` + showReplicaHostQuery = `SHOW SLAVE HOSTS` + startReplicaQuery = `START SLAVE` + stopReplicaQuery = `STOP SLAVE` ) -// gatherReplicaStatuses update ReplStatus of MySQLDB +func NewMySQLDB(db *sql.DB, monitorUser string, monitorPass string) (*MySQLDB, error) { + var serverUuid string + if err := db.QueryRow(getServerUuidQuery).Scan(&serverUuid); err != nil { + return nil, err + } + return &MySQLDB{db, nil, serverUuid, monitorUser, monitorPass, map[string]MySQLServerInfo{}}, nil +} + +// gatherReplicaStatuses update replStatus of MySQLDB func (db *MySQLDB) gatherReplicaStatuses() error { - sqlxDb := sqlx.NewDb(db.Dbh, "mysql") + sqlxDb := sqlx.NewDb(db.dbh, "mysql") rows, err := sqlxDb.Unsafe().Queryx(replicaStatusQuery) if err != nil { return err } - db.ReplStatus = nil + db.replStatus = nil status := &ReplicaStatus{} for rows.Next() { - err = rows.StructScan(status) - if err != nil { + if err = rows.StructScan(status); err != nil { return err } - db.ReplStatus = append(db.ReplStatus, *status) + db.replStatus = append(db.replStatus, *status) } return nil @@ -63,7 +87,7 @@ func (db *MySQLDB) gatherReplicaStatuses() error { // function autoPosition check whether auto position is enabled for all the channel func (db *MySQLDB) autoPosition() bool { - for _, status := range db.ReplStatus { + for _, status := range db.replStatus { if !status.AutoPosition { return false } @@ -72,7 +96,7 @@ func (db *MySQLDB) autoPosition() bool { } func (db *MySQLDB) replicaStopped() bool { - for _, status := range db.ReplStatus { + for _, status := range db.replStatus { if !(status.ReplicaIORunning == "No" && status.ReplicaSQLRunning != "No") { fmt.Printf("SlaveIORunning: %s, SlaveSQLRunning: %s (channel: %s)", status.ReplicaIORunning, status.ReplicaSQLRunning, status.ChannelName) return false @@ -83,7 +107,7 @@ func (db *MySQLDB) replicaStopped() bool { func (db *MySQLDB) stopReplica() error { fmt.Println("stopping replica") - if _, err := db.Dbh.Exec(stopReplicaQuery); err != nil { + if _, err := db.dbh.Exec(stopReplicaQuery); err != nil { return err } @@ -92,32 +116,61 @@ func (db *MySQLDB) stopReplica() error { func (db *MySQLDB) resumeReplica() error { fmt.Println("resuming replica") - if _, err := db.Dbh.Exec(startReplicaQuery); err != nil { + if _, err := db.dbh.Exec(startReplicaQuery); err != nil { return err } return nil } -func (db *MySQLDB) errantTransaction() bool { +func (db *MySQLDB) errantTransaction() (string, error) { fmt.Println("errant transaction pre-check: ") - // warning: this is not strict check. - for _, gtid := range strings.Split(db.ReplStatus[0].ExecutedGtidSet, ",") { - gtid = strings.Trim(gtid, "\n") - if strings.HasPrefix(gtid, db.ServerUuid) { - fmt.Printf(" errant transaction found: %s\n", gtid) - // TODO: print binlog events - return true + var errantGtidSets string + var executedGtidSets []string + for _, replica := range db.replStatus { + // connect to maser + dsn := db.monitorUser + ":" + db.monitorPass + "@(" + replica.MasterHost + ":" + strconv.Itoa(replica.MasterPort) + ")/" + sqlxdb, err := sqlx.Open("mysql", dsn) + if err != nil { + return errantGtidSets, err + } + defer sqlxdb.Close() + + // replication node info + rows, err := sqlxdb.Unsafe().Queryx(showReplicaHostQuery) + if err != nil { + return errantGtidSets, err + } + server := MySQLServerInfo{} + for rows.Next() { + if err = rows.StructScan(&server); err != nil { + return errantGtidSets, err + } + db.serverHosts[server.ServerUuid] = server + } + + // gtid_executed + if err := sqlxdb.QueryRowx(getGTIDExecutedQuery).Scan(&replica.GtidExecuted); err != nil { + return errantGtidSets, err } + executedGtidSets = append(executedGtidSets, replica.GtidExecuted) } - fmt.Println(" no errant transaction\n") - db.printGTIDSet() - return false -} -func (db *MySQLDB) printGTIDSet() { - fmt.Println("server_uuid: " + db.ServerUuid) - fmt.Println("gtid_executed: \n" + db.ReplStatus[0].ExecutedGtidSet) + replicaGtidSets := strings.Replace(db.replStatus[0].ExecutedGtidSet, "\n", "", -1) + masterGtidSets := strings.Join(executedGtidSets, ",") + sqlxdb := sqlx.NewDb(db.dbh, "mysql") + if err := sqlxdb.QueryRowx(fmt.Sprintf(getErrantGTIDsQuery, replicaGtidSets, masterGtidSets)).Scan(&errantGtidSets); err != nil { + return errantGtidSets, err + } + + if errantGtidSets != "" { + for _, errant := range strings.Split(errantGtidSets, ",") { + host := db.serverHosts[strings.Split(errant, ":")[0]] + fmt.Printf(" errant_gtid %s: server_id: %d, host %s\n", errant, host.ServerId, host.Host) + } + } + + return errantGtidSets, nil } func (db *MySQLDB) FixErrantGTID(forceOption bool) error { @@ -129,11 +182,12 @@ func (db *MySQLDB) FixErrantGTID(forceOption bool) error { return errors.New("auto position must be enabled for all the channel") } - if err := db.Dbh.QueryRow(getServerUuidQuery).Scan(&db.ServerUuid); err != nil { + errantGtidSets, err := db.errantTransaction() + if err != nil { return err } - - if !db.errantTransaction() { + if errantGtidSets == "" { + fmt.Println("errant GTID not found") return nil } @@ -146,18 +200,22 @@ func (db *MySQLDB) FixErrantGTID(forceOption bool) error { return err } - executedGtidSet := db.ReplStatus[0].ExecutedGtidSet + // print original state just in case + executedGtidSet := db.replStatus[0].ExecutedGtidSet fmt.Printf("original gtid_executed: \n%s\n", executedGtidSet) - gtidSet := strings.Split(executedGtidSet, ",") var gtidPurged []string for _, gtid := range gtidSet { - gtid = strings.Trim(gtid, "\n") - if !strings.HasPrefix(gtid, db.ServerUuid) { + gtid = strings.Replace(gtid, "\n", "", -1) + errantFound := false + for _, errant := range strings.Split(errantGtidSets, ",") { + if strings.HasPrefix(gtid, strings.Split(errant, ":")[0]) { + errantFound = true + } + } + if !errantFound { gtidPurged = append(gtidPurged, gtid) - } else { - fmt.Printf("remove %s from gtid_executed\n", gtid) } } @@ -166,19 +224,18 @@ func (db *MySQLDB) FixErrantGTID(forceOption bool) error { fmt.Println("do nothing") return nil } + fmt.Println(resetReplicaQuery) + if _, err := db.dbh.Exec(resetReplicaQuery); err != nil { + return err + } + fmt.Println(resetMasterQuery) + if _, err := db.dbh.Exec(resetMasterQuery); err != nil { + return err + } + fmt.Println(fmt.Sprintf(setGtidPurgedQuery, strings.Join(gtidPurged, ","))) + if _, err := db.dbh.Exec(fmt.Sprintf(setGtidPurgedQuery, strings.Join(gtidPurged, ","))); err != nil { + return err + } } - fmt.Println(resetReplicaQuery) - if _, err := db.Dbh.Exec(resetReplicaQuery); err != nil { - return err - } - fmt.Println(resetMasterQuery) - if _, err := db.Dbh.Exec(resetMasterQuery); err != nil { - return err - } - fmt.Println(fmt.Sprintf(setGtidPurgedQuery, strings.Join(gtidPurged, ","))) - if _, err := db.Dbh.Exec(fmt.Sprintf(setGtidPurgedQuery, strings.Join(gtidPurged, ","))); err != nil { - return err - } - return nil }