Skip to content

Commit

Permalink
binlog: do not decode rows for block table (#7622)
Browse files Browse the repository at this point in the history
ref #4287
  • Loading branch information
GMHDBJD committed Nov 24, 2022
1 parent 8346e97 commit 9db6370
Show file tree
Hide file tree
Showing 19 changed files with 234 additions and 75 deletions.
8 changes: 5 additions & 3 deletions dm/relay/local_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may

// BinlogReaderConfig is the configuration for BinlogReader.
type BinlogReaderConfig struct {
RelayDir string
Timezone *time.Location
Flavor string
RelayDir string
Timezone *time.Location
Flavor string
RowsEventDecodeFunc func(*replication.RowsEvent, []byte) error
}

// BinlogReader is a binlog reader.
Expand Down Expand Up @@ -82,6 +83,7 @@ func newBinlogReader(logger log.Logger, cfg *BinlogReaderConfig, relay Process)
parser.SetVerifyChecksum(true)
// use string representation of decimal, to replicate the exact value
parser.SetUseDecimal(false)
parser.SetRowsEventDecodeFunc(cfg.RowsEventDecodeFunc)
if cfg.Timezone != nil {
parser.SetTimestampStringLocation(cfg.Timezone)
}
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/binlogstream/streamer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca
if c.currentBinlogType == RemoteBinlog {
c.streamProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID}
} else {
c.streamProducer = &localBinlogReader{c.relay.NewReader(tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID}
c.streamProducer = &localBinlogReader{c.relay.NewReader(tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor, RowsEventDecodeFunc: c.syncCfg.RowsEventDecodeFunc}), c.enableGTID}
}

c.upstream, err = newLocationStream(c.streamProducer, location)
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (v *DataValidator) initialize() error {
return err
}

v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone)
v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList)
if err != nil {
return err
}
Expand Down
39 changes: 6 additions & 33 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,12 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
return
}

s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone)
s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList)
if err != nil {
return terror.ErrSyncerUnitGenBAList.Delegate(err)
}

s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList)
if err != nil {
return err
}
Expand Down Expand Up @@ -392,11 +397,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
s.tctx.L(),
)

s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList)
if err != nil {
return terror.ErrSyncerUnitGenBAList.Delegate(err)
}

s.binlogFilter, err = bf.NewBinlogEvent(s.cfg.CaseSensitive, s.cfg.FilterRules)
if err != nil {
return terror.ErrSyncerUnitGenBinlogEventFilter.Delegate(err)
Expand Down Expand Up @@ -3330,33 +3330,6 @@ func (s *Syncer) checkpointID() string {
return strconv.FormatUint(uint64(s.cfg.ServerID), 10)
}

// UpdateFromConfig updates config for `From`.
func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error {
s.Lock()
defer s.Unlock()
s.fromDB.BaseDB.Close()

s.cfg.From = cfg.From

var err error
s.cfg.From.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDMLConnectionTimeout)
s.fromDB, err = dbconn.NewUpStreamConn(&s.cfg.From)
if err != nil {
s.tctx.L().Error("fail to create baseConn connection", log.ShortError(err))
return err
}

s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone)
if err != nil {
return err
}

if s.streamerController != nil {
s.streamerController.UpdateSyncCfg(s.syncCfg, s.fromDB)
}
return nil
}

// ShardDDLOperation returns the current pending to handle shard DDL lock operation.
func (s *Syncer) ShardDDLOperation() *pessimism.Operation {
return s.pessimist.PendingOperation()
Expand Down
54 changes: 48 additions & 6 deletions dm/syncer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ import (
"go.uber.org/zap"
)

// the time layout for TiDB SHOW DDL statements.
const timeLayout = "2006-01-02 15:04:05"

// everytime retrieve 10 new rows from TiDB history jobs.
const linesOfRows = 10
const (
// the time layout for TiDB SHOW DDL statements.
timeLayout = "2006-01-02 15:04:05"
// everytime retrieve 10 new rows from TiDB history jobs.
linesOfRows = 10
// max capacity of the block/allow list.
maxCapacity = 100000
)

// getTableByDML gets table from INSERT/UPDATE/DELETE statement.
func getTableByDML(dml ast.DMLNode) (*filter.Table, error) {
Expand Down Expand Up @@ -136,7 +139,7 @@ func str2TimezoneOrFromDB(tctx *tcontext.Context, tzStr string, dbCfg *config.DB
return loc, tzStr, nil
}

func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location) (replication.BinlogSyncerConfig, error) {
func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter) (replication.BinlogSyncerConfig, error) {
var tlsConfig *tls.Config
var err error
if cfg.From.Security != nil {
Expand All @@ -153,6 +156,44 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati
}
}

var rowsEventDecodeFunc func(*replication.RowsEvent, []byte) error
if baList != nil {
// we don't track delete table events, so simply reset the cache if it's full
// TODO: use LRU or CLOCK cache if needed.
allowListCache := make(map[uint64]struct{}, maxCapacity)
blockListCache := make(map[uint64]struct{}, maxCapacity)

rowsEventDecodeFunc = func(re *replication.RowsEvent, data []byte) error {
pos, err := re.DecodeHeader(data)
if err != nil {
return err
}
if _, ok := blockListCache[re.TableID]; ok {
return nil
} else if _, ok := allowListCache[re.TableID]; ok {
return re.DecodeData(pos, data)
}

tb := &filter.Table{
Schema: string(re.Table.Schema),
Name: string(re.Table.Table),
}
if skipByTable(baList, tb) {
if len(blockListCache) >= maxCapacity {
blockListCache = make(map[uint64]struct{}, maxCapacity)
}
blockListCache[re.TableID] = struct{}{}
return nil
}

if len(allowListCache) >= maxCapacity {
allowListCache = make(map[uint64]struct{}, maxCapacity)
}
allowListCache[re.TableID] = struct{}{}
return re.DecodeData(pos, data)
}
}

syncCfg := replication.BinlogSyncerConfig{
ServerID: cfg.ServerID,
Flavor: cfg.Flavor,
Expand All @@ -162,6 +203,7 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati
Password: cfg.From.Password,
TimestampStringLocation: timezone,
TLSConfig: tlsConfig,
RowsEventDecodeFunc: rowsEventDecodeFunc,
}
// when retry count > 1, go-mysql will retry sync from the previous GTID set in GTID mode,
// which may get duplicate binlog event after retry success. so just set retry count = 1, and task
Expand Down
29 changes: 29 additions & 0 deletions dm/tests/binlog_parse/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/ticdc_dm_test/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["binlog_parse.t1"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
4 changes: 4 additions & 0 deletions dm/tests/binlog_parse/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
auto-compaction-retention = "3s"
42 changes: 42 additions & 0 deletions dm/tests/binlog_parse/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["binlog_parse"]
do-tables:
- db-name: "binlog_parse"
tbl-name: "t1"

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
2 changes: 2 additions & 0 deletions dm/tests/binlog_parse/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
12 changes: 12 additions & 0 deletions dm/tests/binlog_parse/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
source-id: mysql-replica-01
enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
enable-relay: false
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
checker:
check-enable: false
3 changes: 3 additions & 0 deletions dm/tests/binlog_parse/data/db1.increment.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use binlog_parse;
insert into t1 (id, created_time) values (3, '2022-01-03 00:00:01'), (4, '2022-01-04 00:00:01');
insert into t2 (id, created_time) values (3, '2022-01-03 00:00:01'), (4, '2022-01-04 00:00:01');
3 changes: 3 additions & 0 deletions dm/tests/binlog_parse/data/db1.increment1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use binlog_parse;
insert into t1 (id, created_time) values (5, '2022-01-05 00:00:01'), (6, '2022-01-06 00:00:01');
insert into t2 (id, created_time) values (5, '2022-01-05 00:00:01'), (6, '2022-01-06 00:00:01');
7 changes: 7 additions & 0 deletions dm/tests/binlog_parse/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
drop database if exists `binlog_parse`;
create database `binlog_parse` character set utf8;
use `binlog_parse`;
create table t1 (id int, created_time timestamp, primary key(`id`)) character set utf8;
create table t2 (id int, created_time timestamp(3), primary key(`id`)) character set utf8;
insert into t1 (id, created_time) values (1, '2022-01-01 00:00:01'), (2, '2022-01-02 00:00:01');
insert into t2 (id, created_time) values (1, '2022-01-01 00:00:01'), (2, '2022-01-02 00:00:01');
67 changes: 67 additions & 0 deletions dm/tests/binlog_parse/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/bin/bash

set -eu

cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME

# skip one tale, sync another table
# mariadb10.0 timestamp(3) will panic before dm v6.4.0
function run() {
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

echo "prepare data"
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

echo "start task"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task.yaml --remove-meta"

echo "check full phase"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"validation start test" \
"\"result\": true" 1

echo "prepare incremental data"
run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

echo "check incremental phase"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30

run_sql_tidb_with_retry "select count(1) from binlog_parse.t1;" "count(1): 4"

# relay error in mariadb:10.0, success in mysql
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1" \
"\"result\": true" 2 \
"\"source\": \"$SOURCE_ID1\"" 1 \
"\"worker\": \"worker1\"" 1
# "TCPReader get relay event with error" 1

echo "prepare incremental data 2"
run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"validation start test" \
"\"result\": true" 1

echo "check incremental phase 2"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
}

cleanup_data $TEST_NAME
# also cleanup dm processes in case of last run failed
cleanup_process $*
run $*
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
1 change: 1 addition & 0 deletions dm/tests/others_integration_2.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ sql_mode
http_proxies
openapi
duplicate_event
binlog_parse
17 changes: 0 additions & 17 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,23 +678,6 @@ func (st *SubTask) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchem
return syncUnit.OperateSchema(ctx, req)
}

// UpdateFromConfig updates config for `From`.
func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error {
st.Lock()
defer st.Unlock()

if sync, ok := st.currUnit.(*syncer.Syncer); ok {
err := sync.UpdateFromConfig(cfg)
if err != nil {
return err
}
}

st.cfg.From = cfg.From

return nil
}

// CheckUnit checks whether current unit is sync unit.
func (st *SubTask) CheckUnit() bool {
st.RLock()
Expand Down
Loading

0 comments on commit 9db6370

Please sign in to comment.