diff --git a/canal/canal.go b/canal/canal.go index 987744995..a01a96ecc 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -34,15 +34,16 @@ type Canal struct { syncer *replication.BinlogSyncer eventHandler EventHandler + observer Observer connLock sync.Mutex conn *client.Conn - tableLock sync.RWMutex - tables map[string]*schema.Table - errorTablesGetTime map[string]time.Time + tracker *schema.SchemaTracker + + tableLock sync.RWMutex + tableMatchCache map[string]bool - tableMatchCache map[string]bool includeTableRegex []*regexp.Regexp excludeTableRegex []*regexp.Regexp @@ -55,6 +56,8 @@ var UnknownTableRetryPeriod = time.Second * time.Duration(10) var ErrExcludedTable = errors.New("excluded table meta") func NewCanal(cfg *Config) (*Canal, error) { + var err error + c := new(Canal) c.cfg = cfg @@ -63,13 +66,11 @@ func NewCanal(cfg *Config) (*Canal, error) { c.dumpDoneCh = make(chan struct{}) c.eventHandler = &DummyEventHandler{} - c.tables = make(map[string]*schema.Table) - if c.cfg.DiscardNoMetaRowEvent { - c.errorTablesGetTime = make(map[string]time.Time) - } c.master = &masterInfo{} - var err error + if err = c.prepareTracker(); err != nil { + return nil, errors.Trace(err) + } if err = c.prepareDumper(); err != nil { return nil, errors.Trace(err) @@ -166,6 +167,24 @@ func (c *Canal) prepareDumper() error { return nil } +func (c *Canal) prepareTracker() error { + var err error + trackerCfg := &schema.TrackerConfig{ + CharsetServer: "utf8", + Storage: c.cfg.Tracker.Storage, + Dir: c.cfg.Tracker.Dir, + Addr: c.cfg.Tracker.Addr, + User: c.cfg.Tracker.User, + Password: c.cfg.Tracker.Password, + Database: c.cfg.Tracker.Database, + } + c.tracker, err = schema.NewSchemaTracker(trackerCfg) + if err != nil { + return errors.Trace(err) + } + return nil +} + // Run will first try to dump all data from MySQL master `mysqldump`, // then sync from the binlog position in the dump data. // It will run forever until meeting an error or Canal closed. @@ -285,87 +304,25 @@ func (c *Canal) checkTableMatch(key string) bool { return matchFlag } -func (c *Canal) GetTable(db string, table string) (*schema.Table, error) { +func (c *Canal) GetTable(db string, table string) (*schema.TableDef, error) { key := fmt.Sprintf("%s.%s", db, table) // if table is excluded, return error and skip parsing event or dump if !c.checkTableMatch(key) { return nil, ErrExcludedTable } - c.tableLock.RLock() - t, ok := c.tables[key] - c.tableLock.RUnlock() - - if ok { - return t, nil - } - - if c.cfg.DiscardNoMetaRowEvent { - c.tableLock.RLock() - lastTime, ok := c.errorTablesGetTime[key] - c.tableLock.RUnlock() - if ok && time.Now().Sub(lastTime) < UnknownTableRetryPeriod { - return nil, schema.ErrMissingTableMeta - } - } - - t, err := schema.NewTable(c, db, table) - if err != nil { - // check table not exists - if ok, err1 := schema.IsTableExist(c, db, table); err1 == nil && !ok { - return nil, schema.ErrTableNotExist - } - // work around : RDS HAHeartBeat - // ref : https://github.com/alibaba/canal/blob/master/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L385 - // issue : https://github.com/alibaba/canal/issues/222 - // This is a common error in RDS that canal can't get HAHealthCheckSchema's meta, so we mock a table meta. - // If canal just skip and log error, as RDS HA heartbeat interval is very short, so too many HAHeartBeat errors will be logged. - if key == schema.HAHealthCheckSchema { - // mock ha_health_check meta - ta := &schema.Table{ - Schema: db, - Name: table, - Columns: make([]schema.TableColumn, 0, 2), - Indexes: make([]*schema.Index, 0), - } - ta.AddColumn("id", "bigint(20)", "", "") - ta.AddColumn("type", "char(1)", "", "") - c.tableLock.Lock() - c.tables[key] = ta - c.tableLock.Unlock() - return ta, nil - } - // if DiscardNoMetaRowEvent is true, we just log this error - if c.cfg.DiscardNoMetaRowEvent { - c.tableLock.Lock() - c.errorTablesGetTime[key] = time.Now() - c.tableLock.Unlock() - // log error and return ErrMissingTableMeta - log.Errorf("canal get table meta err: %v", errors.Trace(err)) - return nil, schema.ErrMissingTableMeta - } - return nil, err - } + return c.tracker.GetTableDef(db, table) +} - c.tableLock.Lock() - c.tables[key] = t - if c.cfg.DiscardNoMetaRowEvent { - // if get table info success, delete this key from errorTablesGetTime - delete(c.errorTablesGetTime, key) - } - c.tableLock.Unlock() +func (c *Canal) GetDatabases() []string { + return c.tracker.GetDatabases() +} - return t, nil +func (c *Canal) GetTables(db string) ([]string, error) { + return c.tracker.GetTables(db) } -// ClearTableCache clear table cache -func (c *Canal) ClearTableCache(db []byte, table []byte) { - key := fmt.Sprintf("%s.%s", db, table) - c.tableLock.Lock() - delete(c.tables, key) - if c.cfg.DiscardNoMetaRowEvent { - delete(c.errorTablesGetTime, key) - } - c.tableLock.Unlock() +func (c *Canal) ExecDDL(db string, statement string) error { + return c.tracker.Exec(db, statement) } // Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB diff --git a/canal/canal_test.go b/canal/canal_test.go old mode 100755 new mode 100644 index bd879c466..e2b38ef2b --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -14,6 +14,9 @@ import ( ) var testHost = flag.String("host", "127.0.0.1", "MySQL host") +var testPort = flag.Int("port", 3306, "MySQL port") +var testUser = flag.String("user", "root", "MySQL user") +var testPassword = flag.String("password", "", "MySQL password") func Test(t *testing.T) { TestingT(t) @@ -27,8 +30,9 @@ var _ = Suite(&canalTestSuite{}) func (s *canalTestSuite) SetUpSuite(c *C) { cfg := NewDefaultConfig() - cfg.Addr = fmt.Sprintf("%s:3306", *testHost) - cfg.User = "root" + cfg.Addr = fmt.Sprintf("%s:%d", *testHost, *testPort) + cfg.User = *testUser + cfg.Password = *testPassword cfg.HeartbeatPeriod = 200 * time.Millisecond cfg.ReadTimeout = 300 * time.Millisecond cfg.Dump.ExecutionPath = "mysqldump" @@ -61,8 +65,6 @@ func (s *canalTestSuite) SetUpSuite(c *C) { s.execute(c, "DELETE FROM test.canal_test") s.execute(c, "INSERT INTO test.canal_test (content, name) VALUES (?, ?), (?, ?), (?, ?)", "1", "a", `\0\ndsfasdf`, "b", "", "c") - s.execute(c, "SET GLOBAL binlog_format = 'ROW'") - s.c.SetEventHandler(&testEventHandler{c: c}) go func() { err = s.c.Run() diff --git a/canal/config.go b/canal/config.go index a0f87b54b..8cbc0d3f2 100644 --- a/canal/config.go +++ b/canal/config.go @@ -8,6 +8,7 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/schema" ) type DumpConfig struct { @@ -41,6 +42,24 @@ type DumpConfig struct { Protocol string `toml:"protocol"` } +type TrackerConfig struct { + // The charset_set_server of source mysql, we need + // this charset to handle ddl statement + CharsetServer string `toml:"charset_server"` + + // Storage type to store schema data, may be boltdb or mysql + Storage string `toml:"storage"` + + // Boltdb file path to store data + Dir string `toml:"dir"` + + // MySQL info to connect + Addr string `toml:"addr"` + User string `toml:"user"` + Password string `toml:"password"` + Database string `toml:"database"` +} + type Config struct { Addr string `toml:"addr"` User string `toml:"user"` @@ -60,11 +79,10 @@ type Config struct { IncludeTableRegex []string `toml:"include_table_regex"` ExcludeTableRegex []string `toml:"exclude_table_regex"` - // discard row event without table meta - DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"` - Dump DumpConfig `toml:"dump"` + Tracker TrackerConfig `toml:"schema_tracker"` + UseDecimal bool `toml:"use_decimal"` ParseTime bool `toml:"parse_time"` @@ -112,5 +130,8 @@ func NewDefaultConfig() *Config { c.Dump.DiscardErr = true c.Dump.SkipMasterData = false + c.Tracker.Storage = schema.StorageType_Boltdb + c.Tracker.Dir = "." + return c } diff --git a/canal/dump.go b/canal/dump.go index e69e6410b..ebac8c885 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -27,6 +27,16 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error { return nil } +func (h *dumpParseHandler) DDL(db string, statement string) error { + if err := h.c.ctx.Err(); err != nil { + return err + } + if err := h.c.tracker.Exec(db, statement); err != nil { + return err + } + return nil +} + func (h *dumpParseHandler) Data(db string, table string, values []string) error { if err := h.c.ctx.Err(); err != nil { return err @@ -35,9 +45,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error tableInfo, err := h.c.GetTable(db, table) if err != nil { e := errors.Cause(err) - if e == ErrExcludedTable || - e == schema.ErrTableNotExist || - e == schema.ErrMissingTableMeta { + if e == ErrExcludedTable { return nil } log.Errorf("get %s.%s information err: %v", db, table, err) @@ -52,19 +60,23 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error } else if v == "_binary ''" { vs[i] = []byte{} } else if v[0] != '\'' { - if tableInfo.Columns[i].Type == schema.TYPE_NUMBER { + if tableInfo.Columns[i].InnerType == schema.TypeShort || + tableInfo.Columns[i].InnerType == schema.TypeInt24 || + tableInfo.Columns[i].InnerType == schema.TypeLong || + tableInfo.Columns[i].InnerType == schema.TypeLonglong { n, err := strconv.ParseInt(v, 10, 64) if err != nil { return fmt.Errorf("parse row %v at %d error %v, int expected", values, i, err) } vs[i] = n - } else if tableInfo.Columns[i].Type == schema.TYPE_FLOAT { + } else if tableInfo.Columns[i].InnerType == schema.TypeFloat { f, err := strconv.ParseFloat(v, 64) if err != nil { return fmt.Errorf("parse row %v at %d error %v, float expected", values, i, err) } vs[i] = f - } else if tableInfo.Columns[i].Type == schema.TYPE_DECIMAL { + } else if tableInfo.Columns[i].InnerType == schema.TypeDecimal || + tableInfo.Columns[i].InnerType == schema.TypeNewDecimal { if h.c.cfg.UseDecimal { d, err := decimal.NewFromString(v) if err != nil { @@ -158,9 +170,6 @@ func (c *Canal) dump() error { pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)} c.master.Update(pos) - if err := c.eventHandler.OnPosSynced(pos, true); err != nil { - return errors.Trace(err) - } var startPos fmt.Stringer = pos if h.gset != nil { c.master.UpdateGTIDSet(h.gset) @@ -172,6 +181,8 @@ func (c *Canal) dump() error { } func (c *Canal) tryDump() error { + var err error + pos := c.master.Position() gset := c.master.GTIDSet() if (len(pos.Name) > 0 && pos.Pos > 0) || @@ -186,5 +197,29 @@ func (c *Canal) tryDump() error { return nil } - return c.dump() + // Reset schema info + err = c.tracker.Reset() + if err != nil { + return err + } + + err = c.dump() + if err != nil { + return err + } + + // Tell schema tracker to make a snapshot + pos = c.master.Position() + err = c.tracker.Persist(pos) + if err != nil { + return err + } + + // If data and schema in backup file are persisted, + // we tell event handler to save the pos + if err = c.eventHandler.OnPosSynced(pos, true); err != nil { + return errors.Trace(err) + } + + return nil } diff --git a/canal/hook.go b/canal/hook.go new file mode 100644 index 000000000..4beb74ce1 --- /dev/null +++ b/canal/hook.go @@ -0,0 +1,30 @@ +package canal + +type Observer struct { + BeforeSchemaChange func(string, string) error + OnSchemaChangeFailed func(string, string, error) (bool, error) +} + +// Register a hook that will be called before schema change +func (c *Canal) RegisterBeforeSchemaChangeHook(fn func(string, string) error) { + c.observer.BeforeSchemaChange = fn +} + +// Register a hook that will be called on DDL failed +func (c *Canal) RegisterOnSchemaChangeFailedHook(fn func(string, string, error) (bool, error)) { + c.observer.OnSchemaChangeFailed = fn +} + +func (c *Canal) runBeforeSchemaChangeHook(db string, statement string) error { + if c.observer.BeforeSchemaChange == nil { + return nil + } + return c.observer.BeforeSchemaChange(db, statement) +} + +func (c *Canal) runOnSchemaChangeFailedHook(db string, statement string, err error) (bool, error) { + if c.observer.OnSchemaChangeFailed == nil { + return false, err + } + return c.observer.OnSchemaChangeFailed(db, statement, err) +} diff --git a/canal/rows.go b/canal/rows.go index e246ee5a2..d02433498 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -16,7 +16,7 @@ const ( // RowsEvent is the event for row replication. type RowsEvent struct { - Table *schema.Table + Table *schema.TableDef Action string // changed row list // binlog has three update event version, v0, v1 and v2. @@ -28,7 +28,7 @@ type RowsEvent struct { Header *replication.EventHeader } -func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, header *replication.EventHeader) *RowsEvent { +func newRowsEvent(table *schema.TableDef, action string, rows [][]interface{}, header *replication.EventHeader) *RowsEvent { e := new(RowsEvent) e.Table = table @@ -45,12 +45,19 @@ func (r *RowsEvent) handleUnsigned() { // Handle Unsigned Columns here, for binlog replication, we can't know the integer is unsigned or not, // so we use int type but this may cause overflow outside sometimes, so we must convert to the really . // unsigned type - if len(r.Table.UnsignedColumns) == 0 { + var unsignedColumns []int + for idx, column := range r.Table.Columns { + if column.Unsigned { + unsignedColumns = append(unsignedColumns, idx) + } + } + + if len(unsignedColumns) == 0 { return } for i := 0; i < len(r.Rows); i++ { - for _, index := range r.Table.UnsignedColumns { + for _, index := range unsignedColumns { switch t := r.Rows[i][index].(type) { case int8: r.Rows[i][index] = uint8(t) @@ -71,5 +78,5 @@ func (r *RowsEvent) handleUnsigned() { // String implements fmt.Stringer interface. func (r *RowsEvent) String() string { - return fmt.Sprintf("%s %s %v", r.Action, r.Table, r.Rows) + return fmt.Sprintf("%s %v %v", r.Action, r.Table, r.Rows) } diff --git a/canal/sync.go b/canal/sync.go index 6a62e673c..6ceb4f7c2 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -10,7 +10,6 @@ import ( "github.com/siddontang/go-log/log" "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" - "github.com/siddontang/go-mysql/schema" ) var ( @@ -81,14 +80,8 @@ func (c *Canal) runSyncBinlog() error { // we only focus row based event err = c.handleRowsEvent(ev) if err != nil { - e := errors.Cause(err) - // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal - if e != ErrExcludedTable && - e != schema.ErrTableNotExist && - e != schema.ErrMissingTableMeta { - log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) - return errors.Trace(err) - } + log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) } continue case *replication.XIDEvent: @@ -149,12 +142,16 @@ func (c *Canal) runSyncBinlog() error { savePos = true force = true - c.ClearTableCache(db, table) - log.Infof("table structure changed, clear table cache: %s.%s\n", db, table) - if err = c.eventHandler.OnTableChanged(string(db), string(table)); err != nil && errors.Cause(err) != schema.ErrTableNotExist { + + if err = c.eventHandler.OnTableChanged(string(db), string(table)); err != nil { return errors.Trace(err) } + // Track and replay this ddl + if err = c.trackDDL(string(db), string(e.Query), pos); err != nil { + return err + } + // Now we only handle Table Changed DDL, maybe we will support more later. if err = c.eventHandler.OnDDL(pos, e); err != nil { return errors.Trace(err) @@ -175,6 +172,42 @@ func (c *Canal) runSyncBinlog() error { return nil } +func (c *Canal) trackDDL(db string, query string, pos mysql.Position) error { + // Before track the ddl, we need to ensure all dml events before has synced, + // because once we have change the schmea, we don't know the old schema info + // related with old dml events. + err := c.runBeforeSchemaChangeHook(db, query) + if err != nil { + log.Errorf("run before_schema_change hook error: %s", err) + return errors.Trace(err) + } + + for { + err = c.tracker.ExecAndPersist(db, query, mysql.Position{pos.Name, pos.Pos}) + if err == nil { + return nil + } + if err != nil { + var skip bool + log.Errorf("exec and persist error: %s", err) + skip, err = c.runOnSchemaChangeFailedHook(db, query, err) + if err != nil { + log.Errorf("run on_schema_change_failed error: %s", err) + return errors.Trace(err) + } + + if skip { + log.Warnf("skip a ddl statement: %s", query) + return nil + } + log.Warnf("try to execute ddl again...") + } + } + + return nil + +} + func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { ev := e.Event.(*replication.RowsEvent) @@ -184,7 +217,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { t, err := c.GetTable(schema, table) if err != nil { - return err + if err != ErrExcludedTable { + return err + } + return nil } var action string switch e.Header.EventType { diff --git a/dump/dump.go b/dump/dump.go index 18272620b..d7f1e3139 100644 --- a/dump/dump.go +++ b/dump/dump.go @@ -160,9 +160,6 @@ func (d *Dumper) Dump(w io.Writer) error { args = append(args, "--skip-opt") args = append(args, "--quick") - // We only care about data - args = append(args, "--no-create-info") - // Multi row is easy for us to parse the data args = append(args, "--skip-extended-insert") @@ -197,6 +194,7 @@ func (d *Dumper) Dump(w io.Writer) error { // If we only dump some tables, the dump data will not have database name // which makes us hard to parse, so here we add it manually. + w.Write([]byte(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`;\n", d.TableDB))) w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB))) } diff --git a/dump/dump_test.go b/dump/dump_test.go index eed4c7507..0131fec35 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -16,6 +16,7 @@ import ( // use docker mysql for test var host = flag.String("host", "127.0.0.1", "MySQL host") var port = flag.Int("port", 3306, "MySQL host") +var password = flag.String("password", "", "MySQL password") var execution = flag.String("exec", "mysqldump", "mysqldump execution path") @@ -32,10 +33,10 @@ var _ = Suite(&schemaTestSuite{}) func (s *schemaTestSuite) SetUpSuite(c *C) { var err error - s.conn, err = client.Connect(fmt.Sprintf("%s:%d", *host, *port), "root", "", "") + s.conn, err = client.Connect(fmt.Sprintf("%s:%d", *host, *port), "root", *password, "") c.Assert(err, IsNil) - s.d, err = NewDumper(*execution, fmt.Sprintf("%s:%d", *host, *port), "root", "") + s.d, err = NewDumper(*execution, fmt.Sprintf("%s:%d", *host, *port), "root", *password) c.Assert(err, IsNil) c.Assert(s.d, NotNil) @@ -123,6 +124,10 @@ func (h *testParseHandler) Data(schema string, table string, values []string) er return nil } +func (h *testParseHandler) DDL(schema string, statement string) error { + return nil +} + func (s *parserTestSuite) TestParseFindTable(c *C) { tbl := []struct { sql string diff --git a/dump/parser.go b/dump/parser.go index f0222a9eb..e731dbce3 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -20,16 +20,19 @@ type ParseHandler interface { // Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos; BinLog(name string, pos uint64) error + DDL(schema string, statement string) error Data(schema string, table string, values []string) error } var binlogExp *regexp.Regexp var useExp *regexp.Regexp +var ddlExp *regexp.Regexp var valuesExp *regexp.Regexp func init() { binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") useExp = regexp.MustCompile("^USE `(.+)`;") + ddlExp = regexp.MustCompile("^CREATE\\s.*") valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$") } @@ -40,7 +43,7 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { var db string var binlogParsed bool - + sql := "" for { line, err := rb.ReadString('\n') if err != nil && err != io.EOF { @@ -54,12 +57,17 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { return c == '\r' || c == '\n' }) + sql = sql + line + if line == "" || line[len(line)-1] != ';' { + continue + } + if parseBinlogPos && !binlogParsed { - if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + if m := binlogExp.FindAllStringSubmatch(sql, -1); len(m) == 1 { name := m[0][1] pos, err := strconv.ParseUint(m[0][2], 10, 64) if err != nil { - return errors.Errorf("parse binlog %v err, invalid number", line) + return errors.Errorf("parse binlog %v err, invalid number", sql) } if err = h.BinLog(name, pos); err != nil && err != ErrSkip { @@ -70,29 +78,36 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { } } - if m := useExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + if m := useExp.FindAllStringSubmatch(sql, -1); len(m) == 1 { db = m[0][1] } - if m := valuesExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + if m := ddlExp.FindAllStringSubmatch(sql, -1); len(m) == 1 { + if err = h.DDL(db, sql); err != nil { + return errors.Trace(err) + } + } + + if m := valuesExp.FindAllStringSubmatch(sql, -1); len(m) == 1 { table := m[0][1] values, err := parseValues(m[0][2]) if err != nil { - return errors.Errorf("parse values %v err", line) + return errors.Errorf("parse values %v err", sql) } if err = h.Data(db, table, values); err != nil && err != ErrSkip { return errors.Trace(err) } } + sql = "" } return nil } func parseValues(str string) ([]string, error) { - // values are seperated by comma, but we can not split using comma directly + // values are separated by comma, but we can not split using comma directly // string is enclosed by single quote // a simple implementation, may be more robust later. diff --git a/go.mod b/go.mod index e2a7a652a..364843989 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,9 @@ module github.com/siddontang/go-mysql require ( github.com/BurntSushi/toml v0.3.1 + github.com/boltdb/bolt v1.3.1 + github.com/bytewatch/ddl-executor v0.0.0-20190123135826-00acd083c12d + github.com/go-sql-driver/mysql v1.4.1 github.com/pingcap/errors v0.11.0 github.com/satori/go.uuid v1.2.0 github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 diff --git a/go.sum b/go.sum index 6206e99ff..6d7c71f6f 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,153 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/blacktear23/go-proxyprotocol v0.0.0-20171102103907-62e368e1c470/go.mod h1:VKt7CNAQxpFpSDz3sXyj9hY/GbVsQCr0sB3w59nE7lU= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/bytewatch/ddl-executor v0.0.0-20190123135826-00acd083c12d h1:deAXWiKT05pPwnyC7UxS4Myp90ZoeUzuM+a7ofOHyqc= +github.com/bytewatch/ddl-executor v0.0.0-20190123135826-00acd083c12d/go.mod h1:uEecs69IKjDiKwEczboNG1qOzoFeenUEEKYPjOGgfHI= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= +github.com/coreos/bbolt v1.3.0/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/cznic/mathutil v0.0.0-20181021201202-eba54fb065b7 h1:y+DH9ARrWiiNBV+6waYP2IPcsRbxdU1qsnycPfShF4c= +github.com/cznic/mathutil v0.0.0-20181021201202-eba54fb065b7/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= +github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= +github.com/etcd-io/gofail v0.0.0-20180808172546-51ce9a71510a/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= +github.com/go-sql-driver/mysql v0.0.0-20170715192408-3955978caca4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.5.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/myesui/uuid v1.0.0/go.mod h1:2CDfNgU0LR8mIdO8vdWd8i9gWWxLlcoIGGpSNgafq84= +github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI= +github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= +github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= +github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20181203065228-c14302da291c/go.mod h1:Ja9XPjot9q4/3JyCZodnWDGNXt4pKemhIYCvVJM7P24= +github.com/pingcap/parser v0.0.0-20190108044100-02812c3c22e7/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= +github.com/pingcap/parser v0.0.0-20190123063514-f8c3dff115d5 h1:WBU3Axs+DW+2GxtyDsyNUSd5Sp0C0SVAZ5GNuDsfhGU= +github.com/pingcap/parser v0.0.0-20190123063514-f8c3dff115d5/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= +github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E= +github.com/pingcap/tidb v0.0.0-20190108123336-c68ee7318319 h1:ltRU5YUxYpW29ywVKnFXIRRTnY6r2cYxauB79L5gU2E= +github.com/pingcap/tidb v0.0.0-20190108123336-c68ee7318319/go.mod h1:qXpdYNt83vgSegvc/TNcxKGiAo4Pa4EtIJl0ka7yGXE= +github.com/pingcap/tidb-tools v2.1.3-0.20190104033906-883b07a04a73+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= +github.com/pingcap/tipb v0.0.0-20170310053819-1043caee48da/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tipb v0.0.0-20181012112600-11e33c750323/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tipb v0.0.0-20190107072121-abbec73437b7 h1:wnjdQRhybddDesBVBKyOLUPgDaOFdtqA92pduBgWvVQ= +github.com/pingcap/tipb v0.0.0-20190107072121-abbec73437b7/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 h1:/NRJ5vAYoqz+7sG51ubIDHXeWO8DlTSrToPu6q11ziA= +github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs= +github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= +github.com/shurcooL/vfsgen v0.0.0-20181020040650-a97a25d856ca/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME= +github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/struCoder/pidusage v0.1.2/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= +github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= +github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/twinj/uuid v1.0.0/go.mod h1:mMgcE1RHFUFqe5AfiwlINXisXfDGro23fWdPUfOMjRY= +github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= +github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/ugorji/go/codec v0.0.0-20181127175209-856da096dbdf/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg= +github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181029044818-c44066c5c816/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.0.0-20171214130843-f21a4dfb5e38/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181008205924-a2b3f7f249e9/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= +google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/stretchr/testify.v1 v1.2.2/go.mod h1:QI5V/q6UbPmuhtm10CaFZxED9NreB8PnFYN9JcR6TxU= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= +sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k= diff --git a/schema/config.go b/schema/config.go new file mode 100644 index 000000000..c7fc81d61 --- /dev/null +++ b/schema/config.go @@ -0,0 +1,24 @@ +package schema + +const ( + StorageType_Boltdb string = "boltdb" + StorageType_Mysql string = "mysql" +) + +type TrackerConfig struct { + // The charset_set_server of source mysql, we need + // this charset to handle ddl statement + CharsetServer string + + // Storage type to store schema data, may be boltdb or mysql + Storage string + + // Boltdb file path to store data + Dir string + + // MySQL info to connect + Addr string + User string + Password string + Database string +} diff --git a/schema/definition.go b/schema/definition.go new file mode 100644 index 000000000..f51243443 --- /dev/null +++ b/schema/definition.go @@ -0,0 +1,90 @@ +package schema + +import ( + "github.com/bytewatch/ddl-executor" +) + +// MySQL type information. +const ( + TypeDecimal byte = 0 + TypeTiny byte = 1 + TypeShort byte = 2 + TypeLong byte = 3 + TypeFloat byte = 4 + TypeDouble byte = 5 + TypeNull byte = 6 + TypeTimestamp byte = 7 + TypeLonglong byte = 8 + TypeInt24 byte = 9 + TypeDate byte = 10 + /* Original name was TypeTime, renamed to Duration to resolve the conflict with Go type Time.*/ + TypeDuration byte = 11 + TypeDatetime byte = 12 + TypeYear byte = 13 + TypeNewDate byte = 14 + TypeVarchar byte = 15 + TypeBit byte = 16 + + TypeJSON byte = 0xf5 + TypeNewDecimal byte = 0xf6 + TypeEnum byte = 0xf7 + TypeSet byte = 0xf8 + TypeTinyBlob byte = 0xf9 + TypeMediumBlob byte = 0xfa + TypeLongBlob byte = 0xfb + TypeBlob byte = 0xfc + TypeVarString byte = 0xfd + TypeString byte = 0xfe + TypeGeometry byte = 0xff +) + +type IndexType string + +const ( + IndexType_NONE IndexType = "" + IndexType_PRI = "PRI" + IndexType_UNI = "UNI" + IndexType_MUL = "MUL" +) + +type TableDef struct { + Database string `json:"database"` + Name string `json:"name"` + Columns []*ColumnDef `json:"columns"` + Charset string `json:"charset"` +} + +type ColumnDef struct { + Name string `json:"name"` + Type string `json:"type"` + InnerType byte `json:"inner_type"` + Key IndexType `json:"key"` + Charset string `json:"charset"` + Unsigned bool `json:"unsigned"` + Nullable bool `json:"nullable"` +} + +func makeColumnDef(c *executor.ColumnDef) *ColumnDef { + return &ColumnDef{ + Name: c.Name, + Type: c.Type, + InnerType: c.InnerType, + Key: IndexType(c.Key), + Charset: c.Charset, + Unsigned: c.Unsigned, + Nullable: c.Nullable, + } +} + +func makeTableDef(t *executor.TableDef) *TableDef { + tableDef := &TableDef{ + Database: t.Database, + Name: t.Name, + Charset: t.Charset, + Columns: make([]*ColumnDef, 0, len(t.Columns)), + } + for _, c := range t.Columns { + tableDef.Columns = append(tableDef.Columns, makeColumnDef(c)) + } + return tableDef +} diff --git a/schema/schema.go b/schema/schema.go deleted file mode 100644 index a3ec4c7ee..000000000 --- a/schema/schema.go +++ /dev/null @@ -1,398 +0,0 @@ -// Copyright 2012, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package schema - -import ( - "database/sql" - "fmt" - "strings" - - "github.com/pingcap/errors" - "github.com/siddontang/go-mysql/mysql" -) - -var ErrTableNotExist = errors.New("table is not exist") -var ErrMissingTableMeta = errors.New("missing table meta") -var HAHealthCheckSchema = "mysql.ha_health_check" - -// Different column type -const ( - TYPE_NUMBER = iota + 1 // tinyint, smallint, mediumint, int, bigint, year - TYPE_FLOAT // float, double - TYPE_ENUM // enum - TYPE_SET // set - TYPE_STRING // other - TYPE_DATETIME // datetime - TYPE_TIMESTAMP // timestamp - TYPE_DATE // date - TYPE_TIME // time - TYPE_BIT // bit - TYPE_JSON // json - TYPE_DECIMAL // decimal -) - -type TableColumn struct { - Name string - Type int - Collation string - RawType string - IsAuto bool - IsUnsigned bool - EnumValues []string - SetValues []string -} - -type Index struct { - Name string - Columns []string - Cardinality []uint64 -} - -type Table struct { - Schema string - Name string - - Columns []TableColumn - Indexes []*Index - PKColumns []int - - UnsignedColumns []int -} - -func (ta *Table) String() string { - return fmt.Sprintf("%s.%s", ta.Schema, ta.Name) -} - -func (ta *Table) AddColumn(name string, columnType string, collation string, extra string) { - index := len(ta.Columns) - ta.Columns = append(ta.Columns, TableColumn{Name: name, Collation: collation}) - ta.Columns[index].RawType = columnType - - if strings.HasPrefix(columnType, "float") || - strings.HasPrefix(columnType, "double") { - ta.Columns[index].Type = TYPE_FLOAT - } else if strings.HasPrefix(columnType, "decimal") { - ta.Columns[index].Type = TYPE_DECIMAL - } else if strings.HasPrefix(columnType, "enum") { - ta.Columns[index].Type = TYPE_ENUM - ta.Columns[index].EnumValues = strings.Split(strings.Replace( - strings.TrimSuffix( - strings.TrimPrefix( - columnType, "enum("), - ")"), - "'", "", -1), - ",") - } else if strings.HasPrefix(columnType, "set") { - ta.Columns[index].Type = TYPE_SET - ta.Columns[index].SetValues = strings.Split(strings.Replace( - strings.TrimSuffix( - strings.TrimPrefix( - columnType, "set("), - ")"), - "'", "", -1), - ",") - } else if strings.HasPrefix(columnType, "datetime") { - ta.Columns[index].Type = TYPE_DATETIME - } else if strings.HasPrefix(columnType, "timestamp") { - ta.Columns[index].Type = TYPE_TIMESTAMP - } else if strings.HasPrefix(columnType, "time") { - ta.Columns[index].Type = TYPE_TIME - } else if "date" == columnType { - ta.Columns[index].Type = TYPE_DATE - } else if strings.HasPrefix(columnType, "bit") { - ta.Columns[index].Type = TYPE_BIT - } else if strings.HasPrefix(columnType, "json") { - ta.Columns[index].Type = TYPE_JSON - } else if strings.Contains(columnType, "int") || strings.HasPrefix(columnType, "year") { - ta.Columns[index].Type = TYPE_NUMBER - } else { - ta.Columns[index].Type = TYPE_STRING - } - - if strings.Contains(columnType, "unsigned") || strings.Contains(columnType, "zerofill") { - ta.Columns[index].IsUnsigned = true - ta.UnsignedColumns = append(ta.UnsignedColumns, index) - } - - if extra == "auto_increment" { - ta.Columns[index].IsAuto = true - } -} - -func (ta *Table) FindColumn(name string) int { - for i, col := range ta.Columns { - if col.Name == name { - return i - } - } - return -1 -} - -func (ta *Table) GetPKColumn(index int) *TableColumn { - return &ta.Columns[ta.PKColumns[index]] -} - -func (ta *Table) AddIndex(name string) (index *Index) { - index = NewIndex(name) - ta.Indexes = append(ta.Indexes, index) - return index -} - -func NewIndex(name string) *Index { - return &Index{name, make([]string, 0, 8), make([]uint64, 0, 8)} -} - -func (idx *Index) AddColumn(name string, cardinality uint64) { - idx.Columns = append(idx.Columns, name) - if cardinality == 0 { - cardinality = uint64(len(idx.Cardinality) + 1) - } - idx.Cardinality = append(idx.Cardinality, cardinality) -} - -func (idx *Index) FindColumn(name string) int { - for i, colName := range idx.Columns { - if name == colName { - return i - } - } - return -1 -} - -func IsTableExist(conn mysql.Executer, schema string, name string) (bool, error) { - query := fmt.Sprintf("SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' and TABLE_NAME = '%s' LIMIT 1", schema, name) - r, err := conn.Execute(query) - if err != nil { - return false, errors.Trace(err) - } - - return r.RowNumber() == 1, nil -} - -func NewTableFromSqlDB(conn *sql.DB, schema string, name string) (*Table, error) { - ta := &Table{ - Schema: schema, - Name: name, - Columns: make([]TableColumn, 0, 16), - Indexes: make([]*Index, 0, 8), - } - - if err := ta.fetchColumnsViaSqlDB(conn); err != nil { - return nil, errors.Trace(err) - } - - if err := ta.fetchIndexesViaSqlDB(conn); err != nil { - return nil, errors.Trace(err) - } - - return ta, nil -} - -func NewTable(conn mysql.Executer, schema string, name string) (*Table, error) { - ta := &Table{ - Schema: schema, - Name: name, - Columns: make([]TableColumn, 0, 16), - Indexes: make([]*Index, 0, 8), - } - - if err := ta.fetchColumns(conn); err != nil { - return nil, errors.Trace(err) - } - - if err := ta.fetchIndexes(conn); err != nil { - return nil, errors.Trace(err) - } - - return ta, nil -} - -func (ta *Table) fetchColumns(conn mysql.Executer) error { - r, err := conn.Execute(fmt.Sprintf("show full columns from `%s`.`%s`", ta.Schema, ta.Name)) - if err != nil { - return errors.Trace(err) - } - - for i := 0; i < r.RowNumber(); i++ { - name, _ := r.GetString(i, 0) - colType, _ := r.GetString(i, 1) - collation, _ := r.GetString(i, 2) - extra, _ := r.GetString(i, 6) - - ta.AddColumn(name, colType, collation, extra) - } - - return nil -} - -func (ta *Table) fetchColumnsViaSqlDB(conn *sql.DB) error { - r, err := conn.Query(fmt.Sprintf("show full columns from `%s`.`%s`", ta.Schema, ta.Name)) - if err != nil { - return errors.Trace(err) - } - - defer r.Close() - - var unusedVal interface{} - unused := &unusedVal - - for r.Next() { - var name, colType, extra string - var collation sql.NullString - err := r.Scan(&name, &colType, &collation, &unused, &unused, &unused, &extra, &unused, &unused) - if err != nil { - return errors.Trace(err) - } - ta.AddColumn(name, colType, collation.String, extra) - } - - return r.Err() -} - -func (ta *Table) fetchIndexes(conn mysql.Executer) error { - r, err := conn.Execute(fmt.Sprintf("show index from `%s`.`%s`", ta.Schema, ta.Name)) - if err != nil { - return errors.Trace(err) - } - var currentIndex *Index - currentName := "" - - for i := 0; i < r.RowNumber(); i++ { - indexName, _ := r.GetString(i, 2) - if currentName != indexName { - currentIndex = ta.AddIndex(indexName) - currentName = indexName - } - cardinality, _ := r.GetUint(i, 6) - colName, _ := r.GetString(i, 4) - currentIndex.AddColumn(colName, cardinality) - } - - return ta.fetchPrimaryKeyColumns() - -} - -func (ta *Table) fetchIndexesViaSqlDB(conn *sql.DB) error { - r, err := conn.Query(fmt.Sprintf("show index from `%s`.`%s`", ta.Schema, ta.Name)) - if err != nil { - return errors.Trace(err) - } - - defer r.Close() - - var currentIndex *Index - currentName := "" - - var unusedVal interface{} - unused := &unusedVal - - for r.Next() { - var indexName, colName string - var cardinality interface{} - - err := r.Scan( - &unused, - &unused, - &indexName, - &unused, - &colName, - &unused, - &cardinality, - &unused, - &unused, - &unused, - &unused, - &unused, - &unused, - ) - if err != nil { - return errors.Trace(err) - } - - if currentName != indexName { - currentIndex = ta.AddIndex(indexName) - currentName = indexName - } - - c := toUint64(cardinality) - currentIndex.AddColumn(colName, c) - } - - return ta.fetchPrimaryKeyColumns() -} - -func toUint64(i interface{}) uint64 { - switch i := i.(type) { - case int: - return uint64(i) - case int8: - return uint64(i) - case int16: - return uint64(i) - case int32: - return uint64(i) - case int64: - return uint64(i) - case uint: - return uint64(i) - case uint8: - return uint64(i) - case uint16: - return uint64(i) - case uint32: - return uint64(i) - case uint64: - return uint64(i) - } - - return 0 -} - -func (ta *Table) fetchPrimaryKeyColumns() error { - if len(ta.Indexes) == 0 { - return nil - } - - pkIndex := ta.Indexes[0] - if pkIndex.Name != "PRIMARY" { - return nil - } - - ta.PKColumns = make([]int, len(pkIndex.Columns)) - for i, pkCol := range pkIndex.Columns { - ta.PKColumns[i] = ta.FindColumn(pkCol) - } - - return nil -} - -// Get primary keys in one row for a table, a table may use multi fields as the PK -func (ta *Table) GetPKValues(row []interface{}) ([]interface{}, error) { - indexes := ta.PKColumns - if len(indexes) == 0 { - return nil, errors.Errorf("table %s has no PK", ta) - } else if len(ta.Columns) != len(row) { - return nil, errors.Errorf("table %s has %d columns, but row data %v len is %d", ta, - len(ta.Columns), row, len(row)) - } - - values := make([]interface{}, 0, len(indexes)) - - for _, index := range indexes { - values = append(values, row[index]) - } - - return values, nil -} - -// Get term column's value -func (ta *Table) GetColumnValue(column string, row []interface{}) (interface{}, error) { - index := ta.FindColumn(column) - if index == -1 { - return nil, errors.Errorf("table %s has no column name %s", ta, column) - } - - return row[index], nil -} diff --git a/schema/schema_test.go b/schema/schema_test.go deleted file mode 100644 index c5bafe162..000000000 --- a/schema/schema_test.go +++ /dev/null @@ -1,108 +0,0 @@ -package schema - -import ( - "database/sql" - "flag" - "fmt" - "testing" - - . "github.com/pingcap/check" - "github.com/siddontang/go-mysql/client" - _ "github.com/siddontang/go-mysql/driver" -) - -// use docker mysql for test -var host = flag.String("host", "127.0.0.1", "MySQL host") - -func Test(t *testing.T) { - TestingT(t) -} - -type schemaTestSuite struct { - conn *client.Conn - sqlDB *sql.DB -} - -var _ = Suite(&schemaTestSuite{}) - -func (s *schemaTestSuite) SetUpSuite(c *C) { - var err error - s.conn, err = client.Connect(fmt.Sprintf("%s:%d", *host, 3306), "root", "", "test") - c.Assert(err, IsNil) - - s.sqlDB, err = sql.Open("mysql", fmt.Sprintf("root:@%s:3306", *host)) - c.Assert(err, IsNil) -} - -func (s *schemaTestSuite) TearDownSuite(c *C) { - if s.conn != nil { - s.conn.Close() - } - - if s.sqlDB != nil { - s.sqlDB.Close() - } -} - -func (s *schemaTestSuite) TestSchema(c *C) { - _, err := s.conn.Execute(`DROP TABLE IF EXISTS schema_test`) - c.Assert(err, IsNil) - - str := ` - CREATE TABLE IF NOT EXISTS schema_test ( - id INT, - id1 INT, - id2 INT, - name VARCHAR(256), - status ENUM('appointing','serving','abnormal','stop','noaftermarket','finish','financial_audit'), - se SET('a', 'b', 'c'), - f FLOAT, - d DECIMAL(2, 1), - uint INT UNSIGNED, - zfint INT ZEROFILL, - name_ucs VARCHAR(256) CHARACTER SET ucs2, - name_utf8 VARCHAR(256) CHARACTER SET utf8, - PRIMARY KEY(id2, id), - UNIQUE (id1), - INDEX name_idx (name) - ) ENGINE = INNODB; - ` - - _, err = s.conn.Execute(str) - c.Assert(err, IsNil) - - ta, err := NewTable(s.conn, "test", "schema_test") - c.Assert(err, IsNil) - - c.Assert(ta.Columns, HasLen, 12) - c.Assert(ta.Indexes, HasLen, 3) - c.Assert(ta.PKColumns, DeepEquals, []int{2, 0}) - c.Assert(ta.Indexes[0].Columns, HasLen, 2) - c.Assert(ta.Indexes[0].Name, Equals, "PRIMARY") - c.Assert(ta.Indexes[2].Name, Equals, "name_idx") - c.Assert(ta.Columns[4].EnumValues, DeepEquals, []string{"appointing", "serving", "abnormal", "stop", "noaftermarket", "finish", "financial_audit"}) - c.Assert(ta.Columns[5].SetValues, DeepEquals, []string{"a", "b", "c"}) - c.Assert(ta.Columns[7].Type, Equals, TYPE_DECIMAL) - c.Assert(ta.Columns[0].IsUnsigned, IsFalse) - c.Assert(ta.Columns[8].IsUnsigned, IsTrue) - c.Assert(ta.Columns[9].IsUnsigned, IsTrue) - c.Assert(ta.Columns[10].Collation, Matches, "^ucs2.*") - c.Assert(ta.Columns[11].Collation, Matches, "^utf8.*") - - taSqlDb, err := NewTableFromSqlDB(s.sqlDB, "test", "schema_test") - c.Assert(err, IsNil) - - c.Assert(taSqlDb, DeepEquals, ta) -} - -func (s *schemaTestSuite) TestQuoteSchema(c *C) { - str := "CREATE TABLE IF NOT EXISTS `a-b_test` (`a.b` INT) ENGINE = INNODB" - - _, err := s.conn.Execute(str) - c.Assert(err, IsNil) - - ta, err := NewTable(s.conn, "test", "a-b_test") - c.Assert(err, IsNil) - - c.Assert(ta.Columns[0].Name, Equals, "a.b") -} diff --git a/schema/storage.go b/schema/storage.go new file mode 100644 index 000000000..87f46c345 --- /dev/null +++ b/schema/storage.go @@ -0,0 +1,28 @@ +package schema + +import ( + "github.com/siddontang/go-mysql/mysql" +) + +type SchemaStorage interface { + // SaveSnapshot will be called when schema tracker decides to save a snapshot + SaveSnapshot(data []byte, pos mysql.Position) error + + // SaveSnapshot will be called when schema tracker decides to save a ddl statement + SaveStatement(db string, statement string, pos mysql.Position) error + + // LoadLastSnapshot will be called when schema tracker need to restore snapshot, as base data + LoadLastSnapshot() ([]byte, mysql.Position, error) + + // Reset will be called to get a empty storage + Reset() error + + // StatementIterator return an iterator which can iterate all ddl statements after last snapshot + StatementIterator() Iterator +} + +type Iterator interface { + First() (string, string, mysql.Position, error) + Next() (string, string, mysql.Position, error) + End() bool +} diff --git a/schema/storage_boltdb.go b/schema/storage_boltdb.go new file mode 100644 index 000000000..790a450cc --- /dev/null +++ b/schema/storage_boltdb.go @@ -0,0 +1,281 @@ +package schema + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "github.com/boltdb/bolt" + "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/mysql" + "time" +) + +// The value of blotdb +type value struct { + Name string + Pos uint32 + Snapshot []byte + Database string + Statement string + Time time.Time +} + +type boltdbStorage struct { + path string + curServerID uint32 + + db *bolt.DB +} + +func NewBoltdbStorage(path string) (*boltdbStorage, error) { + db, err := bolt.Open(path, 0600, nil) + if err != nil { + return nil, err + } + + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte("meta")) + if err != nil { + return err + } + _, err = tx.CreateBucketIfNotExists([]byte("data")) + if err != nil { + return err + } + return nil + }) + + if err != nil { + return nil, err + } + + storage := &boltdbStorage{ + path: path, + db: db, + } + + return storage, nil +} + +// Save snapshot data +func (o *boltdbStorage) SaveSnapshot(data []byte, pos mysql.Position) error { + err := o.db.Update(func(tx *bolt.Tx) error { + var err error + bucket := tx.Bucket([]byte("data")) + + id, _ := bucket.NextSequence() + + // Make a sortable key base on id + key, err := makeKey(id) + if err != nil { + return err + } + value, err := makeValue(data, "", "", pos) + if err != nil { + return err + } + + // Save snapshot + err = bucket.Put(key, value) + if err != nil { + return err + } + + // Save the key of last snapshot into meta + meta := tx.Bucket([]byte("meta")) + err = meta.Put([]byte("last_snapshot"), key) + if err != nil { + return err + } + + // Purge the expired data + return o.purge(tx) + }) + + if err != nil { + return err + } + + return nil +} + +func (o *boltdbStorage) LoadLastSnapshot() ([]byte, mysql.Position, error) { + var pos mysql.Position + var value value + var data []byte + + err := o.db.View(func(tx *bolt.Tx) error { + meta := tx.Bucket([]byte("meta")) + key := meta.Get([]byte("last_snapshot")) + + if key == nil { + // Maybe this is in initial startup + return nil + } + + bucket := tx.Bucket([]byte("data")) + + valueBytes := bucket.Get([]byte(key)) + err := json.Unmarshal(valueBytes, &value) + if err != nil { + return err + } + return nil + }) + + if err != nil { + return nil, pos, err + } + + pos.Name = value.Name + pos.Pos = value.Pos + data = value.Snapshot + + return data, pos, nil +} + +func (o *boltdbStorage) SaveStatement(db string, statement string, pos mysql.Position) error { + // TODO + return nil +} + +func (o *boltdbStorage) Reset() error { + err := o.db.Update(func(tx *bolt.Tx) error { + // Delete meta bucket if exists + if tx.Bucket([]byte("meta")) != nil { + err := tx.DeleteBucket([]byte("meta")) + if err != nil { + return err + } + } + // Re-create meta bucket + _, err := tx.CreateBucketIfNotExists([]byte("meta")) + if err != nil { + return err + } + + // Delete data bucket if exists + if tx.Bucket([]byte("data")) != nil { + err := tx.DeleteBucket([]byte("data")) + if err != nil { + return err + } + } + // Re-create data bucket + _, err = tx.CreateBucketIfNotExists([]byte("data")) + if err != nil { + return err + } + + // Reset current server_id + o.curServerID = 0 + return nil + }) + + if err != nil { + return err + } + + return nil +} + +func (o *boltdbStorage) StatementIterator() Iterator { + return &boltdbStorageIterator{} +} + +// Purge the snapshot or statement before the last snapshot +func (o *boltdbStorage) purge(tx *bolt.Tx) error { + meta := tx.Bucket([]byte("meta")) + key := meta.Get([]byte("last_snapshot")) + + if key == nil { + return nil + } + + bucket := tx.Bucket([]byte("data")) + if bucket == nil { + return errors.Errorf("the bucket of server_id: %d is missing", o.curServerID) + } + + c := bucket.Cursor() + k, _ := c.Seek(key) + if k == nil { + return errors.Errorf("the k-v of key: %d is missing", key) + } + + for { + k, v := c.Prev() + if k == nil { + break + } + + var value value + err := json.Unmarshal(v, &value) + if err != nil { + return err + } + if time.Now().Sub(value.Time) >= 7*24*time.Hour { + bucket.Delete(k) + } + + } + + return nil +} + +func makeValue(snapshot []byte, db string, statement string, pos mysql.Position) ([]byte, error) { + var value value + value.Name = pos.Name + value.Pos = pos.Pos + value.Snapshot = snapshot + value.Database = db + value.Statement = statement + value.Time = time.Now() + + buf, err := json.Marshal(value) + if err != nil { + return nil, err + } + return buf, err +} + +// Make a sortable bytes slice, used as key of blotdb key-value +func makeKey(id uint64) ([]byte, error) { + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.BigEndian, id) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func unmarshal(b []byte, v interface{}) error { + buffer := bytes.NewBuffer(b) + err := binary.Read(buffer, binary.BigEndian, v) + if err != nil { + return err + } + return nil +} + +type boltdbStorageIterator struct { + nextKey []byte + end bool +} + +func (o *boltdbStorageIterator) First() (string, string, mysql.Position, error) { + // TODO + var pos mysql.Position + return "", "", pos, nil +} + +func (o *boltdbStorageIterator) Next() (string, string, mysql.Position, error) { + // TODO + var pos mysql.Position + return "", "", pos, nil +} + +func (o *boltdbStorageIterator) End() bool { + // TODO + return o.end +} diff --git a/schema/storage_mysql.go b/schema/storage_mysql.go new file mode 100644 index 000000000..bbea5c3a4 --- /dev/null +++ b/schema/storage_mysql.go @@ -0,0 +1,189 @@ +package schema + +import ( + "database/sql" + "fmt" + _ "github.com/go-sql-driver/mysql" + "github.com/siddontang/go-log/log" + "github.com/siddontang/go-mysql/mysql" +) + +var ( + tableName string = "tb_schema_data" + initQuery string = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( + id INT NOT NULL AUTO_INCREMENT, + name VARCHAR(255) NOT NULL DEFAULT '' COMMENT 'binlog name', + pos INT UNSIGNED NOT NULL DEFAULT 0 COMMENT 'binlog pos', + snapshot LONGBLOB NOT NULL COMMENT 'snapshot of schema', + statement LONGBLOB NOT NULL COMMENT 'ddl statement', + type ENUM('snapshot','statement') NOT NULL DEFAULT 'snapshot' COMMENT 'snapshot or statement', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time of this record', + PRIMARY KEY(id) + )`, tableName) +) + +type mysqlStorage struct { + dsn string +} + +func NewMysqlStorage(addr string, user string, password string, database string) (*mysqlStorage, error) { + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s", user, password, addr, database) + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, err + } + defer db.Close() + + _, err = db.Exec(initQuery) + if err != nil { + log.Errorf("create table error: %s", err) + return nil, err + } + + storage := &mysqlStorage{ + dsn: dsn, + } + return storage, nil +} + +func (o *mysqlStorage) SaveSnapshot(data []byte, pos mysql.Position) error { + var err error + db, err := sql.Open("mysql", o.dsn) + if err != nil { + return err + } + defer db.Close() + + tx, err := db.Begin() + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback() + } + }() + + query := fmt.Sprintf( + "INSERT INTO %s SET name=?, pos=?, snapshot=?, statement='', type = ?", + tableName) + _, err = tx.Exec(query, pos.Name, pos.Pos, data, "snapshot") + if err != nil { + log.Errorf("insert into db error: %s", err) + return err + } + + // Purge expired data + err = o.purge(tx) + if err != nil { + log.Errorf("purge data error: %s", err) + return err + } + + err = tx.Commit() + if err != nil { + log.Errorf("commit transaction error: %s", err) + return err + } + + return nil +} + +func (o *mysqlStorage) LoadLastSnapshot() ([]byte, mysql.Position, error) { + var pos mysql.Position + var data []byte + + db, err := sql.Open("mysql", o.dsn) + if err != nil { + return nil, pos, err + } + defer db.Close() + + query := fmt.Sprintf( + "SELECT name, pos, snapshot FROM %s WHERE type='snapshot' ORDER BY id DESC LIMIT 1 ", + tableName) + row := db.QueryRow(query) + err = row.Scan(&pos.Name, &pos.Pos, &data) + if err != nil && err != sql.ErrNoRows { + log.Errorf("query from db error: %s", err) + return nil, pos, err + } + + return data, pos, nil +} + +func (o *mysqlStorage) SaveStatement(db string, statement string, pos mysql.Position) error { + // TODO + return nil +} + +func (o *mysqlStorage) Reset() error { + db, err := sql.Open("mysql", o.dsn) + if err != nil { + return err + } + defer db.Close() + + sql := fmt.Sprintf("DELETE FROM `%s`", tableName) + _, err = db.Exec(sql) + if err != nil { + log.Errorf("insert into db error: %s", err) + return err + } + + if err != nil { + return err + } + + return nil +} + +func (o *mysqlStorage) StatementIterator() Iterator { + return &mysqlStorageIterator{} +} + +// Purge the snapshot or statement before the last snapshot +func (o *mysqlStorage) purge(tx *sql.Tx) error { + var err error + var lastSnapshotId int + query := fmt.Sprintf( + "SELECT id FROM %s WHERE type='snapshot' ORDER BY id DESC LIMIT 1 ", + tableName) + row := tx.QueryRow(query) + err = row.Scan(&lastSnapshotId) + if err != nil && err != sql.ErrNoRows { + log.Errorf("query from db error: %s", err) + return err + } + query = fmt.Sprintf("DELETE FROM %s WHERE id < ? AND datediff(curdate(),create_time) >= 7", tableName) + _, err = tx.Exec(query, lastSnapshotId) + if err != nil { + log.Errorf("delete from db error: %s", err) + return err + } + + return nil + +} + +type mysqlStorageIterator struct { + nextId int + end bool +} + +func (o *mysqlStorageIterator) First() (string, string, mysql.Position, error) { + // TODO + var pos mysql.Position + return "", "", pos, nil +} + +func (o *mysqlStorageIterator) Next() (string, string, mysql.Position, error) { + // TODO + var pos mysql.Position + return "", "", pos, nil +} + +func (o *mysqlStorageIterator) End() bool { + // TODO + return o.end +} diff --git a/schema/tracker.go b/schema/tracker.go new file mode 100644 index 000000000..c88df6e72 --- /dev/null +++ b/schema/tracker.go @@ -0,0 +1,226 @@ +package schema + +import ( + "fmt" + "github.com/bytewatch/ddl-executor" + "github.com/siddontang/go-log/log" + "github.com/siddontang/go-mysql/mysql" +) + +var HAHealthCheckSchema = "mysql.ha_health_check" + +type SchemaTracker struct { + cfg *TrackerConfig + + curPos mysql.Position + + executor *executor.Executor + + storage SchemaStorage +} + +// New a schema tracker that can track DDL statements, making a schema mirror. +func NewSchemaTracker(cfg *TrackerConfig) (*SchemaTracker, error) { + var err error + var storage SchemaStorage + + switch cfg.Storage { + case StorageType_Boltdb: + storage, err = NewBoltdbStorage(cfg.Dir + "/schema.dat") + case StorageType_Mysql: + storage, err = NewMysqlStorage(cfg.Addr, cfg.User, cfg.Password, cfg.Database) + default: + err = fmt.Errorf("unknown storage type: %s", cfg.Storage) + } + if err != nil { + log.Errorf("new storage error: %s", err) + return nil, err + } + + // Restore schema from storage, into memory + snapshot, pos, err := storage.LoadLastSnapshot() + if err != nil { + log.Errorf("load last snapshot from storage error: %s", err) + return nil, err + } + + executor := executor.NewExecutor(&executor.Config{ + CharsetServer: cfg.CharsetServer, + LowerCaseTableNames: true, + NeedAtomic: true, + }) + err = executor.Restore(snapshot) + if err != nil { + log.Errorf("set snapshot to executor error: %s", err) + return nil, err + } + + // TODO: Replay all statements after last snapshot + + tracker := &SchemaTracker{ + cfg: cfg, + executor: executor, + storage: storage, + curPos: pos, + } + + return tracker, nil +} + +// Check whether the SQL statement is DDL, means not DML/DCL +func (o *SchemaTracker) IsDdl(sql string) (bool, error) { + return o.executor.IsDdl(sql) +} + +// Persistent the schema info into storage. +// Before Persist is called, must ensure the binlog DML events previous is synced. +func (o *SchemaTracker) Persist(pos mysql.Position) error { + snapshot, err := o.executor.Snapshot() + if err != nil { + log.Errorf("get executor snapshot error: %s", err) + return err + } + err = o.storage.SaveSnapshot(snapshot, pos) + if err != nil { + log.Errorf("save snapshot error: %s", err) + return err + } + + log.Infof("save snapshot succeed, pos: %s", pos) + o.curPos = pos + + return nil +} + +// Exec ddl statement, and persistent the schema info into storage +func (o *SchemaTracker) ExecAndPersist(db string, statement string, pos mysql.Position) error { + var err error + + // Check whether this ddl we have executed already. + // The comparison here doesn't care about server_id in order to + // be compatible with go-mysql/mysql.Position struct. + if pos.Compare(o.curPos) == 0 { + log.Warnf("this statement has been executed before: %s", pos) + return nil + } + + err = o.Exec(db, statement) + if err != nil { + return err + } + + if o.needTriggerSnapshot() { + snapshot, err := o.executor.Snapshot() + if err != nil { + log.Errorf("get executor snapshot error: %s", err) + return err + } + err = o.storage.SaveSnapshot(snapshot, pos) + if err != nil { + log.Errorf("save snapshot error: %s", err) + return err + } + log.Infof("save snapshot succeed, pos: %s", pos) + } else { + o.storage.SaveStatement(db, statement, pos) + if err != nil { + log.Errorf("save statement error: %s", err) + return err + } + log.Infof("save statement succeed, pos: %s", pos) + } + + o.curPos = pos + + return nil +} + +// Exec ddl statement, but don't persistent the schema info +func (o *SchemaTracker) Exec(db string, statement string) error { + var err error + if db != "" { + sql := "USE " + db + err = o.executor.Exec(sql) + if err != nil { + log.Errorf("execute sql error: %s, sql: %s", err, sql) + return err + } + } + + log.Infof("executing sql: %s", statement) + err = o.executor.Exec(statement) + if err != nil { + log.Errorf("execute sql error: %s, sql: %s", err, statement) + return err + } + + return nil +} + +func (o *SchemaTracker) GetTableDef(db string, table string) (*TableDef, error) { + t, err := o.executor.GetTableDef(db, table) + if err != nil { + // work around : RDS HAHeartBeat + // ref : https://github.com/alibaba/canal/blob/master/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L385 + // issue : https://github.com/alibaba/canal/issues/222 + // This is a common error in RDS that canal can't get HAHealthCheckSchema's meta, so we mock a table meta. + // If canal just skip and log error, as RDS HA heartbeat interval is very short, so too many HAHeartBeat errors will be logged. + key := fmt.Sprintf("%s.%s", db, table) + if key == HAHealthCheckSchema { + // mock ha_health_check meta + tableDef := &TableDef{ + Database: db, + Name: table, + Columns: make([]*ColumnDef, 0, 2), + } + tableDef.Columns = append( + tableDef.Columns, &ColumnDef{ + Name: "id", + Type: "bigint(20)", + InnerType: TypeLong, + }, &ColumnDef{ + Name: "type", + Type: "char(1)", + InnerType: TypeVarString, + }) + return tableDef, nil + } + return nil, err + } + + return makeTableDef(t), nil +} + +// Get all db names, like 'SHOW DATABASES' +func (o *SchemaTracker) GetDatabases() []string { + return o.executor.GetDatabases() +} + +// Get all table names in specified db, like 'SHOW TABLES' +func (o *SchemaTracker) GetTables(db string) ([]string, error) { + return o.executor.GetTables(db) +} + +// Reset executor's schema info and storage +func (o *SchemaTracker) Reset() error { + o.executor.Reset() + err := o.storage.Reset() + if err != nil { + log.Errorf("reset storage error: %s", err) + return err + } + return nil +} + +func (o *SchemaTracker) needTriggerSnapshot() bool { + // TODO + return true +} + +func (o *SchemaTracker) replayNextStatement() { + // TODO +} + +func (o *SchemaTracker) replayAllStatements() { + // TODO +}