Skip to content

Commit

Permalink
add GTID_SUBTRACT
Browse files Browse the repository at this point in the history
  • Loading branch information
takaidohigasi committed Apr 2, 2021
1 parent 70be79f commit 8e56f26
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 77 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea
bin/gtid-errant-fixer
.my.cnf
31 changes: 19 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,34 @@ Please see also the detail on the Percona Live [slide](https://www.percona.com/l
## usage

```
Usage of ./bin/gtid-errant-fixer:
-c string mysql client config (default ".my.cnf")
-f force execution
$ ./gtid-errant-fixer --help
Usage of ./gtid-errant-fixer:
-c string
mysql client config(need SUPER priv to operate stop / slave) (default ".my.cnf")
-f force execution (skip prompt to confirm Y/N
-p string
mysql client config (need REPLICATION SLAVE)
-u string
mysql client config (need REPLICATION SLAVE)
```

## example

you can reset errant_gtid which you can confirm with server_id and host (report_host is required to enable)

```
% ./bin/gtid-errant-fixer main
errant transaction pre-check:
errant transaction found: 211f4d80-914c-11eb-b1ec-0242ac1f0004:1
$ ./gtid-errant-fixer
errant transaction pre-check:
errant_gtid ea2c5164-930b-11eb-901c-0242c0a81004:1: server_id: 3, host mysql3
stopping replica
original gtid_executed:
0a59fb94-914c-11eb-8644-0242ac1f0002:1-4,
0a66ee4a-914c-11eb-b206-0242ac1f0003:1-7,
211f4d80-914c-11eb-b1ec-0242ac1f0004:1
remove 211f4d80-914c-11eb-b1ec-0242ac1f0004:1 from gtid_executed
original gtid_executed:
e9f7ad21-930b-11eb-8646-0242c0a81002:1-18,
ea052114-930b-11eb-8dd3-0242c0a81003:1-18,
ea2c5164-930b-11eb-901c-0242c0a81004:1
would you continue to reset? (y/n) [n]: y
RESET SLAVE
RESET MASTER
SET GLOBAL gtid_purged='0a59fb94-914c-11eb-8644-0242ac1f0002:1-4,0a66ee4a-914c-11eb-b206-0242ac1f0003:1-7'
SET GLOBAL gtid_purged='e9f7ad21-930b-11eb-8646-0242c0a81002:1-18,ea052114-930b-11eb-8dd3-0242c0a81003:1-18'
resuming replica
completed.
```
1 change: 1 addition & 0 deletions docker/mysql1.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ collation-server = utf8_general_ci

log-bin = /var/log/mysql/mysqlbin
server-id = 1
report_host = mysql1

gtid_mode = ON
enforce_gtid_consistency = ON
1 change: 1 addition & 0 deletions docker/mysql2.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ collation-server = utf8_general_ci

log-bin = /var/log/mysql/mysqlbin
server-id = 2
report_host = mysql2

gtid_mode = ON
enforce_gtid_consistency = ON
1 change: 1 addition & 0 deletions docker/mysql3.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ collation-server = utf8_general_ci
log-bin = /var/log/mysql/mysqlbin
server-id = 3
read_only = ON
report_host = mysql3
# skip_slave_start = ON

# required for MSR
Expand Down
18 changes: 11 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,29 @@ func exitWithError(err error) {
}

func main() {
var conf string
var conf, monitorUser, monitorPass string
var forceOption bool

flag.StringVar(&conf, "c", ".my.cnf", "mysql client config")
flag.BoolVar(&forceOption, "f", false, "force execution")
flag.StringVar(&conf, "c", ".my.cnf", "mysql client config(need SUPER priv to operate stop / slave)")
flag.StringVar(&monitorUser, "u", "root", "mysql client config ()")
flag.StringVar(&monitorPass, "p", "", "mysql client config ()")
flag.BoolVar(&forceOption, "f", false, "force execution (skip prompt to confirm Y/N")
flag.Parse()

_, err := os.Stat(conf)
if err != nil {
if _, err := os.Stat(conf); err != nil {
exitWithError(err)
}

db, err := mysql_defaults_file.OpenUsingDefaultsFile("mysql", conf, "")
if err != nil {
exitWithError(err)
}
defer db.Close()

mysqlDB := replica.MySQLDB{db, nil, ""}
mysqlDB, err := replica.NewMySQLDB(db, monitorUser, monitorPass)
if err != nil {
exitWithError(err)
}

if err := mysqlDB.FixErrantGTID(forceOption); err != nil {
exitWithError(err)
}
Expand Down
173 changes: 115 additions & 58 deletions src/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,90 @@ import (
"database/sql"
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
"strconv"
"strings"

"github.com/Songmu/prompter"
"github.com/jmoiron/sqlx"

_ "github.com/go-sql-driver/mysql"
)

type (
ReplicaStatus struct {
AutoPosition bool `db:"Auto_Position"`
ChannelName string `db:"Channel_Name"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
GtidExecuted string // @@gtid_executed
MasterHost string `db:"Master_Host"`
MasterPort int `db:"Master_Port"`
MasterUUID string `db:"Master_UUID"`
ReplicaIORunning string `db:"Slave_IO_Running"`
ReplicaSQLRunning string `db:"Slave_SQL_Running"`
}

MySQLDB struct {
Dbh *sql.DB
ReplStatus []ReplicaStatus
ServerUuid string
dbh *sql.DB
replStatus []ReplicaStatus
serverUuid string
monitorUser string
monitorPass string
serverHosts map[string]MySQLServerInfo
}

MySQLServerInfo struct {
ServerId int `db:"Server_id"`
Host string `db:"Host"`
ServerUuid string `db:"Slave_UUID"`
}
)

const (
replicaStatusQuery = `SHOW SLAVE STATUS`
stopReplicaQuery = `STOP SLAVE`
startReplicaQuery = `START SLAVE`
resetReplicaQuery = `RESET SLAVE`
resetMasterQuery = `RESET MASTER`
setGtidPurgedQuery = `SET GLOBAL gtid_purged='%s'`
getServerUuidQuery = `SELECT @@server_uuid`
resetMasterQuery = `RESET MASTER`
resetReplicaQuery = `RESET SLAVE`
getErrantGTIDsQuery = `SELECT GTID_SUBTRACT('%s', '%s')`
getGTIDExecutedQuery = `SELECT @@global.gtid_executed`
getServerUuidQuery = `SELECT @@server_uuid`
// see https://dev.mysql.com/doc/refman/5.7/en/gtid-functions.html
setGtidPurgedQuery = `SET GLOBAL gtid_purged='%s'`
replicaStatusQuery = `SHOW SLAVE STATUS`
showReplicaHostQuery = `SHOW SLAVE HOSTS`
startReplicaQuery = `START SLAVE`
stopReplicaQuery = `STOP SLAVE`
)

// gatherReplicaStatuses update ReplStatus of MySQLDB
func NewMySQLDB(db *sql.DB, monitorUser string, monitorPass string) (*MySQLDB, error) {
var serverUuid string
if err := db.QueryRow(getServerUuidQuery).Scan(&serverUuid); err != nil {
return nil, err
}
return &MySQLDB{db, nil, serverUuid, monitorUser, monitorPass, map[string]MySQLServerInfo{}}, nil
}

// gatherReplicaStatuses update replStatus of MySQLDB
func (db *MySQLDB) gatherReplicaStatuses() error {
sqlxDb := sqlx.NewDb(db.Dbh, "mysql")
sqlxDb := sqlx.NewDb(db.dbh, "mysql")

rows, err := sqlxDb.Unsafe().Queryx(replicaStatusQuery)
if err != nil {
return err
}

db.ReplStatus = nil
db.replStatus = nil
status := &ReplicaStatus{}
for rows.Next() {
err = rows.StructScan(status)
if err != nil {
if err = rows.StructScan(status); err != nil {
return err
}
db.ReplStatus = append(db.ReplStatus, *status)
db.replStatus = append(db.replStatus, *status)
}

return nil
}

// function autoPosition check whether auto position is enabled for all the channel
func (db *MySQLDB) autoPosition() bool {
for _, status := range db.ReplStatus {
for _, status := range db.replStatus {
if !status.AutoPosition {
return false
}
Expand All @@ -72,7 +96,7 @@ func (db *MySQLDB) autoPosition() bool {
}

func (db *MySQLDB) replicaStopped() bool {
for _, status := range db.ReplStatus {
for _, status := range db.replStatus {
if !(status.ReplicaIORunning == "No" && status.ReplicaSQLRunning != "No") {
fmt.Printf("SlaveIORunning: %s, SlaveSQLRunning: %s (channel: %s)", status.ReplicaIORunning, status.ReplicaSQLRunning, status.ChannelName)
return false
Expand All @@ -83,7 +107,7 @@ func (db *MySQLDB) replicaStopped() bool {

func (db *MySQLDB) stopReplica() error {
fmt.Println("stopping replica")
if _, err := db.Dbh.Exec(stopReplicaQuery); err != nil {
if _, err := db.dbh.Exec(stopReplicaQuery); err != nil {
return err
}

Expand All @@ -92,32 +116,61 @@ func (db *MySQLDB) stopReplica() error {

func (db *MySQLDB) resumeReplica() error {
fmt.Println("resuming replica")
if _, err := db.Dbh.Exec(startReplicaQuery); err != nil {
if _, err := db.dbh.Exec(startReplicaQuery); err != nil {
return err
}
return nil
}

func (db *MySQLDB) errantTransaction() bool {
func (db *MySQLDB) errantTransaction() (string, error) {
fmt.Println("errant transaction pre-check: ")

// warning: this is not strict check.
for _, gtid := range strings.Split(db.ReplStatus[0].ExecutedGtidSet, ",") {
gtid = strings.Trim(gtid, "\n")
if strings.HasPrefix(gtid, db.ServerUuid) {
fmt.Printf(" errant transaction found: %s\n", gtid)
// TODO: print binlog events
return true
var errantGtidSets string
var executedGtidSets []string
for _, replica := range db.replStatus {
// connect to maser
dsn := db.monitorUser + ":" + db.monitorPass + "@(" + replica.MasterHost + ":" + strconv.Itoa(replica.MasterPort) + ")/"
sqlxdb, err := sqlx.Open("mysql", dsn)
if err != nil {
return errantGtidSets, err
}
defer sqlxdb.Close()

// replication node info
rows, err := sqlxdb.Unsafe().Queryx(showReplicaHostQuery)
if err != nil {
return errantGtidSets, err
}
server := MySQLServerInfo{}
for rows.Next() {
if err = rows.StructScan(&server); err != nil {
return errantGtidSets, err
}
db.serverHosts[server.ServerUuid] = server
}

// gtid_executed
if err := sqlxdb.QueryRowx(getGTIDExecutedQuery).Scan(&replica.GtidExecuted); err != nil {
return errantGtidSets, err
}
executedGtidSets = append(executedGtidSets, replica.GtidExecuted)
}
fmt.Println(" no errant transaction\n")
db.printGTIDSet()
return false
}

func (db *MySQLDB) printGTIDSet() {
fmt.Println("server_uuid: " + db.ServerUuid)
fmt.Println("gtid_executed: \n" + db.ReplStatus[0].ExecutedGtidSet)
replicaGtidSets := strings.Replace(db.replStatus[0].ExecutedGtidSet, "\n", "", -1)
masterGtidSets := strings.Join(executedGtidSets, ",")
sqlxdb := sqlx.NewDb(db.dbh, "mysql")
if err := sqlxdb.QueryRowx(fmt.Sprintf(getErrantGTIDsQuery, replicaGtidSets, masterGtidSets)).Scan(&errantGtidSets); err != nil {
return errantGtidSets, err
}

if errantGtidSets != "" {
for _, errant := range strings.Split(errantGtidSets, ",") {
host := db.serverHosts[strings.Split(errant, ":")[0]]
fmt.Printf(" errant_gtid %s: server_id: %d, host %s\n", errant, host.ServerId, host.Host)
}
}

return errantGtidSets, nil
}

func (db *MySQLDB) FixErrantGTID(forceOption bool) error {
Expand All @@ -129,11 +182,12 @@ func (db *MySQLDB) FixErrantGTID(forceOption bool) error {
return errors.New("auto position must be enabled for all the channel")
}

if err := db.Dbh.QueryRow(getServerUuidQuery).Scan(&db.ServerUuid); err != nil {
errantGtidSets, err := db.errantTransaction()
if err != nil {
return err
}

if !db.errantTransaction() {
if errantGtidSets == "" {
fmt.Println("errant GTID not found")
return nil
}

Expand All @@ -146,18 +200,22 @@ func (db *MySQLDB) FixErrantGTID(forceOption bool) error {
return err
}

executedGtidSet := db.ReplStatus[0].ExecutedGtidSet
// print original state just in case
executedGtidSet := db.replStatus[0].ExecutedGtidSet
fmt.Printf("original gtid_executed: \n%s\n", executedGtidSet)

gtidSet := strings.Split(executedGtidSet, ",")

var gtidPurged []string
for _, gtid := range gtidSet {
gtid = strings.Trim(gtid, "\n")
if !strings.HasPrefix(gtid, db.ServerUuid) {
gtid = strings.Replace(gtid, "\n", "", -1)
errantFound := false
for _, errant := range strings.Split(errantGtidSets, ",") {
if strings.HasPrefix(gtid, strings.Split(errant, ":")[0]) {
errantFound = true
}
}
if !errantFound {
gtidPurged = append(gtidPurged, gtid)
} else {
fmt.Printf("remove %s from gtid_executed\n", gtid)
}
}

Expand All @@ -166,19 +224,18 @@ func (db *MySQLDB) FixErrantGTID(forceOption bool) error {
fmt.Println("do nothing")
return nil
}
fmt.Println(resetReplicaQuery)
if _, err := db.dbh.Exec(resetReplicaQuery); err != nil {
return err
}
fmt.Println(resetMasterQuery)
if _, err := db.dbh.Exec(resetMasterQuery); err != nil {
return err
}
fmt.Println(fmt.Sprintf(setGtidPurgedQuery, strings.Join(gtidPurged, ",")))
if _, err := db.dbh.Exec(fmt.Sprintf(setGtidPurgedQuery, strings.Join(gtidPurged, ","))); err != nil {
return err
}
}
fmt.Println(resetReplicaQuery)
if _, err := db.Dbh.Exec(resetReplicaQuery); err != nil {
return err
}
fmt.Println(resetMasterQuery)
if _, err := db.Dbh.Exec(resetMasterQuery); err != nil {
return err
}
fmt.Println(fmt.Sprintf(setGtidPurgedQuery, strings.Join(gtidPurged, ",")))
if _, err := db.Dbh.Exec(fmt.Sprintf(setGtidPurgedQuery, strings.Join(gtidPurged, ","))); err != nil {
return err
}

return nil
}

0 comments on commit 8e56f26

Please sign in to comment.