From 0c0c5e063d119526b58a242f2e22d0904c3dcc5a Mon Sep 17 00:00:00 2001 From: ffffwh Date: Wed, 23 Nov 2022 16:27:33 +0800 Subject: [PATCH] use UTC timestamp #957 --- driver/mysql/binlog/binlog_reader.go | 6 +++--- driver/mysql/extractor.go | 2 +- driver/mysql/mysqlconfig/connection.go | 17 ++++++++--------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/driver/mysql/binlog/binlog_reader.go b/driver/mysql/binlog/binlog_reader.go index 2d671f00f..901f1f8c5 100644 --- a/driver/mysql/binlog/binlog_reader.go +++ b/driver/mysql/binlog/binlog_reader.go @@ -253,7 +253,8 @@ func NewBinlogReader( HeartbeatPeriod: 3 * time.Second, ReadTimeout: 12 * time.Second, - ParseTime: false, // must be false, or gencode will complain. + ParseTime: false, // must be false, or gencode will complain. + TimestampStringLocation: time.UTC, MemLimitSize: int64(g.MemAvailable / 5), MemLimitSeconds: 2, @@ -322,10 +323,9 @@ func (b *BinlogReader) ConnectBinlogStreamer(coordinates common.MySQLCoordinates ctx, b.relayCancelF = context.WithCancel(b.ctx) { - loc, _ := time.LoadLocation("Local") // TODO brConfig := &dmstreamer.BinlogReaderConfig{ RelayDir: b.getBinlogDir(), - Timezone: loc, + Timezone: time.UTC, } b.binlogReader = dmstreamer.NewBinlogReader(dmlog.L(), brConfig) } diff --git a/driver/mysql/extractor.go b/driver/mysql/extractor.go index fc990b9db..ca06c3450 100644 --- a/driver/mysql/extractor.go +++ b/driver/mysql/extractor.go @@ -737,7 +737,7 @@ func (e *Extractor) initDBConnections() (err error) { } } // https://github.com/go-sql-driver/mysql#system-variables - dumpUri := fmt.Sprintf("%s&%s='REPEATABLE-READ'", e.mysqlContext.SrcConnectionConfig.GetSingletonDBUri(), + dumpUri := fmt.Sprintf("%s&%s='REPEATABLE-READ'", e.mysqlContext.SrcConnectionConfig.GetDBUri(), getTxIsolationVarName(e.mysqlVersionDigit)) if e.singletonDB, err = sql.CreateDB(dumpUri); err != nil { return err diff --git a/driver/mysql/mysqlconfig/connection.go b/driver/mysql/mysqlconfig/connection.go index 01034e5e5..e44175951 100644 --- a/driver/mysql/mysqlconfig/connection.go +++ b/driver/mysql/mysqlconfig/connection.go @@ -8,8 +8,12 @@ package mysqlconfig import ( "fmt" + "net/url" ) +// insert TIMESTAMP in UTC +var utcTimeZoneQueryStr = fmt.Sprintf("time_zone=%v", url.QueryEscape("'+00:00'")) + // ConnectionConfig is the minimal configuration required to connect to a MySQL server type ConnectionConfig struct { Host string @@ -19,19 +23,14 @@ type ConnectionConfig struct { Charset string } -func (c *ConnectionConfig) GetDBUriByDbName(databaseName string) string { - return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=%v&maxAllowedPacket=0", c.User, c.Password, c.Host, c.Port, databaseName, c.Charset) -} - func (c *ConnectionConfig) GetDBUri() string { - if "" == c.Charset { + if c.Charset == "" { c.Charset = "utf8mb4" } - return fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=5s&tls=false&autocommit=true&charset=%v&multiStatements=true&maxAllowedPacket=0", c.User, c.Password, c.Host, c.Port, c.Charset) -} -func (c *ConnectionConfig) GetSingletonDBUri() string { - return fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=5s&tls=false&autocommit=false&charset=%v&multiStatements=true&maxAllowedPacket=0", c.User, c.Password, c.Host, c.Port, c.Charset) + return fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=5s&tls=false&autocommit=true&charset=%v&%v&%v&%v", + c.User, c.Password, c.Host, c.Port, c.Charset, utcTimeZoneQueryStr, + "multiStatements=true", "maxAllowedPacket=0") } func (c *ConnectionConfig) GetAddr() string {