Skip to content

Commit

Permalink
refactor, unify some MySQL and MariaDB interface
Browse files Browse the repository at this point in the history
MySQL GTID failover use position check like MHA
  • Loading branch information
siddontang committed Jan 14, 2015
1 parent 03e1e4e commit 75746a7
Show file tree
Hide file tree
Showing 15 changed files with 652 additions and 685 deletions.
4 changes: 4 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (s *clientTestSuite) SetUpSuite(c *C) {
}

func (s *clientTestSuite) TearDownSuite(c *C) {
if s.c == nil {
return
}

s.testConn_DropTable(c)
s.testStmt_DropTable(c)

Expand Down
19 changes: 12 additions & 7 deletions docker/Makefile
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
IMAGE=siddontang/mysql:latest
MYSQL_IMAGE=siddontang/mysql:latest
MARIADB_IMAGE=siddontang/mariadb:latest

all: build

build:
@docker run -d -p 3306:3306 --name=mysql1 -e "GTID_MODE=on" -e "SERVER_ID=1" ${IMAGE}
@docker run -d -p 3307:3306 --name=mysql2 -e "GTID_MODE=on" -e "SERVER_ID=2" ${IMAGE}
@docker run -d -p 3308:3306 --name=mysql3 -e "GTID_MODE=on" -e "SERVER_ID=3" ${IMAGE}
@docker run -d -p 3306:3306 --name=mysql1 -e "GTID_MODE=on" -e "SERVER_ID=1" ${MYSQL_IMAGE}
@docker run -d -p 3307:3306 --name=mysql2 -e "GTID_MODE=on" -e "SERVER_ID=2" ${MYSQL_IMAGE}
@docker run -d -p 3308:3306 --name=mysql3 -e "GTID_MODE=on" -e "SERVER_ID=3" ${MYSQL_IMAGE}
@docker run -d -p 3316:3306 --name=mariadb1 -e "SERVER_ID=4" ${MARIADB_IMAGE}
@docker run -d -p 3317:3306 --name=mariadb2 -e "SERVER_ID=5" ${MARIADB_IMAGE}
@docker run -d -p 3318:3306 --name=mariadb3 -e "SERVER_ID=6" ${MARIADB_IMAGE}

image:
@docker pull ${IMAGE}
@docker pull ${MYSQL_IMAGE}
@docker pull ${MARIADB_IMAGE}

stop:
@docker stop mysql1 mysql2 mysql3
@docker stop mysql1 mysql2 mysql3 mariadb4 mariadb5 mariadb6

clean:
@docker rm -f mysql1 mysql2 mysql3
@docker rm -f mysql1 mysql2 mysql3 mariadb4 mariadb5 mariadb6
44 changes: 15 additions & 29 deletions failover/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package failover

import (
"fmt"
"github.com/siddontang/go-mysql/mysql"
)

// Failover will do below things after the master down
Expand All @@ -14,19 +15,22 @@ import (
// 2, If the failover error, the whole topology may be wrong, we must handle this error manually
// 3, Slaves must have same replication mode, all use GTID or not
//
func Failover(slaves []*Server) ([]*Server, error) {
// First check slaves use gtid or not
gtidMode, err := CheckGTIDMode(slaves)
if err != nil {
return nil, err
}

func Failover(flavor string, slaves []*Server) ([]*Server, error) {
var h Handler
var err error

if gtidMode == GTIDModeOn {
h = new(GTIDHandler)
} else {
return nil, fmt.Errorf("failover only supports GTID mode")
switch flavor {
case mysql.MySQLFlavor:
h = new(MysqlGTIDHandler)
case mysql.MariaDBFlavor:
return nil, fmt.Errorf("MariaDB failover is not supported now")
default:
return nil, fmt.Errorf("invalid flavor %s", flavor)
}

// First check slaves use gtid or not
if err := h.CheckGTIDMode(slaves); err != nil {
return nil, err
}

// Stop all slave IO_THREAD and wait the relay log done
Expand Down Expand Up @@ -55,21 +59,3 @@ func Failover(slaves []*Server) ([]*Server, error) {

return slaves, nil
}

// Check slaves have same GTID used or not
func CheckGTIDMode(slaves []*Server) (string, error) {
gtidMode, err := slaves[0].GTIDMode()
if err != nil {
return GTIDModeOff, err
}
for i := 1; i < len(slaves); i++ {
mode, err := slaves[i].GTIDMode()
if err != nil {
return GTIDModeOff, err
} else if gtidMode != mode {
return GTIDModeOff, fmt.Errorf("%s use GTID %s, but %s use GTID %s", slaves[0].Addr, gtidMode, slaves[i].Addr, mode)
}
}

return gtidMode, nil
}
22 changes: 6 additions & 16 deletions failover/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type failoverTestSuite struct {
var _ = Suite(&failoverTestSuite{})

func (s *failoverTestSuite) SetUpSuite(c *C) {
ports := []int{3306, 3307, 3308}
ports := []int{3306, 3307, 3308, 3316, 3317, 3318}

s.s = make([]*Server, len(ports))

Expand All @@ -38,6 +38,9 @@ func (s *failoverTestSuite) SetUpSuite(c *C) {
err = s.s[i].ResetSlave()
c.Assert(err, IsNil)

_, err = s.s[i].Execute("CREATE DATABASE IF NOT EXISTS test")
c.Assert(err, IsNil)

_, err = s.s[i].Execute("DROP TABLE IF EXISTS test.go_mysql_test")
c.Assert(err, IsNil)

Expand All @@ -52,8 +55,8 @@ func (s *failoverTestSuite) SetUpSuite(c *C) {
func (s *failoverTestSuite) TearDownSuite(c *C) {
}

func (s *failoverTestSuite) TestGTID(c *C) {
h := new(GTIDHandler)
func (s *failoverTestSuite) TestMysqlFailover(c *C) {
h := new(MysqlGTIDHandler)

m := s.s[0]
s1 := s.s[1]
Expand Down Expand Up @@ -88,9 +91,6 @@ func (s *failoverTestSuite) TestGTID(c *C) {

s.checkSelect(c, s1, id, "c")

s.checkCompare(c, h, s1, s2, 1)
s.checkCompare(c, h, s2, s1, -1)

best, err := h.FindBestSlaves([]*Server{s1, s2})
c.Assert(err, IsNil)
c.Assert(best, DeepEquals, []*Server{s1})
Expand Down Expand Up @@ -121,8 +121,6 @@ func (s *failoverTestSuite) TestGTID(c *C) {
err = h.WaitCatchMaster(s2, m)
c.Assert(err, IsNil)

s.checkCompare(c, h, s1, s2, 0)

best, err = h.FindBestSlaves([]*Server{s1, s2})
c.Assert(err, IsNil)
c.Assert(best, DeepEquals, []*Server{s1, s2})
Expand All @@ -133,8 +131,6 @@ func (s *failoverTestSuite) TestGTID(c *C) {
id = s.checkInsert(c, m, "e")
err = h.WaitCatchMaster(s1, m)

s.checkCompare(c, h, s1, s2, 1)

best, err = h.FindBestSlaves([]*Server{s1, s2})
c.Assert(err, IsNil)
c.Assert(best, DeepEquals, []*Server{s1})
Expand All @@ -153,9 +149,3 @@ func (s *failoverTestSuite) checkInsert(c *C, m *Server, name string) uint64 {

return r.InsertId
}

func (s *failoverTestSuite) checkCompare(c *C, h Handler, s1 *Server, s2 *Server, cv int) {
v, err := h.Compare(s1, s2)
c.Assert(err, IsNil)
c.Assert(v, Equals, cv)
}
11 changes: 3 additions & 8 deletions failover/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,6 @@ type Handler interface {
// Change slave s to master m and replicate from it
ChangeMasterTo(s *Server, m *Server) error

// Compare slave s1, s2 and decide which one has more replicated data from master
// 1, s1 has more
// 0, equal
// -1, s2 has more
// s1 and s2 must have same master
Compare(s1 *Server, s2 *Server) (int, error)

// Ensure all relay log done, it will stop slave IO_THREAD
// You must start slave again if you want to do replication continuatively
WaitRelayLogDone(s *Server) error
Expand All @@ -22,6 +15,8 @@ type Handler interface {
WaitCatchMaster(s *Server, m *Server) error

// Find best slave which has the most up-to-date data from master
// If two or more slave have the same, find the higher weight
FindBestSlaves(slaves []*Server) ([]*Server, error)

// Check all slaves have gtid enabled
CheckGTIDMode(slaves []*Server) error
}
89 changes: 31 additions & 58 deletions failover/gtid_handler.go → failover/mysql_gtid_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ package failover

import (
"fmt"
"github.com/satori/go.uuid"
"github.com/siddontang/go-mysql/mysql"
. "github.com/siddontang/go-mysql/mysql"
"net"
)

type GTIDHandler struct {
type MysqlGTIDHandler struct {
Handler
}

func (h *GTIDHandler) Promote(s *Server) error {
func (h *MysqlGTIDHandler) Promote(s *Server) error {
if err := h.WaitRelayLogDone(s); err != nil {
return err
}
Expand All @@ -23,49 +22,33 @@ func (h *GTIDHandler) Promote(s *Server) error {
return nil
}

func (h *GTIDHandler) Compare(s1 *Server, s2 *Server) (int, error) {
set1, err := h.readExecutedGTIDSet(s1)
if err != nil {
return 0, err
}

set2, err := h.readExecutedGTIDSet(s2)
if err != nil {
return 0, err
}

if !uuid.Equal(set1.SID, set2.SID) {
return 0, fmt.Errorf("%s, %s have different master", s1.Addr, s2.Addr)
}
func (h *MysqlGTIDHandler) FindBestSlaves(slaves []*Server) ([]*Server, error) {
// MHA use Relay_Master_Log_File and Exec_Master_Log_Pos to determind which is the best slave

return set1.Intervals.Compare(set2.Intervals), nil
}

func (h *GTIDHandler) FindBestSlaves(slaves []*Server) ([]*Server, error) {
bestSlaves := []*Server{}

sets := make([]*mysql.UUIDSet, len(slaves))
ps := make([]Position, len(slaves))

lastIndex := -1

var lastSet *mysql.UUIDSet = nil
for i, slave := range slaves {
set, err := h.readExecutedGTIDSet(slave)
pos, err := slave.FetchSlaveExecutePos()

if err != nil {
return nil, err
}

sets[i] = set
ps[i] = pos

if lastSet == nil {
lastSet = set
if lastIndex == -1 {
lastIndex = i
bestSlaves = []*Server{slave}
} else if !uuid.Equal(lastSet.SID, set.SID) {
return nil, fmt.Errorf("%s, %s have different master", slaves[0].Addr, slave.Addr)
} else {
switch lastSet.Intervals.Compare(set.Intervals) {
switch ps[lastIndex].Compare(pos) {
case 1:
//do nothing
case -1:
lastSet = set
lastIndex = i
bestSlaves = []*Server{slave}
case 0:
// these two slaves have same data,
Expand All @@ -82,7 +65,7 @@ const changeMasterToWithAuto = `CHANGE MASTER TO
MASTER_USER = "%s", MASTER_PASSWORD = "%s",
MASTER_AUTO_POSITION = 1`

func (h *GTIDHandler) ChangeMasterTo(s *Server, m *Server) error {
func (h *MysqlGTIDHandler) ChangeMasterTo(s *Server, m *Server) error {
if err := h.WaitRelayLogDone(s); err != nil {
return err
}
Expand All @@ -109,7 +92,7 @@ func (h *GTIDHandler) ChangeMasterTo(s *Server, m *Server) error {
return nil
}

func (h *GTIDHandler) WaitRelayLogDone(s *Server) error {
func (h *MysqlGTIDHandler) WaitRelayLogDone(s *Server) error {
if err := s.StopSlaveIOThread(); err != nil {
return err
}
Expand All @@ -126,7 +109,7 @@ func (h *GTIDHandler) WaitRelayLogDone(s *Server) error {
return h.waitUntilAfterGTIDs(s, retrieved)
}

func (h *GTIDHandler) WaitCatchMaster(s *Server, m *Server) error {
func (h *MysqlGTIDHandler) WaitCatchMaster(s *Server, m *Server) error {
r, err := m.MasterStatus()
if err != nil {
return err
Expand All @@ -137,30 +120,20 @@ func (h *GTIDHandler) WaitCatchMaster(s *Server, m *Server) error {
return h.waitUntilAfterGTIDs(s, masterGTIDSet)
}

func (h *GTIDHandler) waitUntilAfterGTIDs(s *Server, gtids string) error {
_, err := s.Execute(fmt.Sprintf("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')", gtids))
return err
}

func (h *GTIDHandler) readExecutedGTIDSet(s *Server) (*mysql.UUIDSet, error) {
r, err := s.SlaveStatus()
if err != nil {
return nil, err
func (h *MysqlGTIDHandler) CheckGTIDMode(slaves []*Server) error {
for i := 0; i < len(slaves); i++ {
mode, err := slaves[i].MysqlGTIDMode()
if err != nil {
return err
} else if mode != GTIDModeOn {
return fmt.Errorf("%s use not GTID mode", slaves[i].Addr)
}
}

masterUUID, _ := r.GetStringByName(0, "Master_UUID")
executed, _ := r.GetStringByName(0, "Executed_Gtid_Set")

g, err := mysql.ParseGTIDSet(executed)
if err != nil {
return nil, err
}
return nil
}

set, ok := g.Sets[masterUUID]
if ok {
return set, nil
} else {
u, _ := uuid.FromString(masterUUID)
return &mysql.UUIDSet{u, mysql.IntervalSlice{}}, nil
}
func (h *MysqlGTIDHandler) waitUntilAfterGTIDs(s *Server, gtids string) error {
_, err := s.Execute(fmt.Sprintf("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')", gtids))
return err
}
Loading

0 comments on commit 75746a7

Please sign in to comment.