Skip to content

Commit

Permalink
Support txn insert when db sync (#290)
Browse files Browse the repository at this point in the history
Co-authored-by: walter <[email protected]>
  • Loading branch information
lsy3993 and w41ter authored Jan 13, 2025
1 parent 9161da4 commit 9b103b1
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 19 deletions.
70 changes: 52 additions & 18 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,40 @@ func (j *Job) getDbSyncTableRecords(upsert *record.Upsert) []*record.TableRecord
return tableRecords
}

func (j *Job) getStidsByDestTableId(destTableId int64, tableRecords []*record.TableRecord, stidMaps map[int64]int64) ([]int64, error) {
destStids := make([]int64, 0, 1)
uniqStids := make(map[int64]int64)

// first, get the source table id from j.progress.TableMapping
for sourceId, destId := range j.progress.TableMapping {
if destId != destTableId {
continue
}

// second, get the source stids from tableRecords
for _, tableRecord := range tableRecords {
if tableRecord.Id != sourceId {
continue
}

// third, get dest stids from partition
for _, partition := range tableRecord.PartitionRecords {
destStid := stidMaps[partition.Stid]
if destStid != 0 {
uniqStids[destStid] = 1
}
}
}
}

// dest stids may be repeated, get the unique stids
for key := range uniqStids {
destStid := key
destStids = append(destStids, destStid)
}
return destStids, nil
}

func (j *Job) getRelatedTableRecords(upsert *record.Upsert) ([]*record.TableRecord, error) {
var tableRecords []*record.TableRecord //, 0, len(upsert.TableRecords))

Expand Down Expand Up @@ -1478,11 +1512,13 @@ func (j *Job) ingestBinlogForTxnInsert(txnId int64, tableRecords []*record.Table

stidToCommitInfos := ingestBinlogJob.SubTxnToCommitInfos()
subTxnInfos := make([]*festruct.TSubTxnInfo, 0, len(stidMap))
for sourceStid, destStid := range stidMap {
destStid := destStid // if no this line, every element in subTxnInfos is the last tSubTxnInfo
destStids, err := j.getStidsByDestTableId(destTableId, tableRecords, stidMap)

for _, destStid := range destStids {
destStid := destStid
commitInfos := stidToCommitInfos[destStid]
if commitInfos == nil {
log.Warnf("no commit infos from source stid: %d; dest stid %d, just skip", sourceStid, destStid)
log.Warnf("no commit infos from dest stid %d, just skip", destStid)
continue
}

Expand Down Expand Up @@ -1594,10 +1630,6 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error {
log.Warnf("The txn insert is not supported yet")
return xerror.Errorf(xerror.Normal, "The txn insert is not supported yet")
}
if j.SyncType == DBSync {
log.Warnf("Txn insert is NOT supported when DBSync")
return xerror.Errorf(xerror.Normal, "Txn insert is NOT supported when DBSync")
}
isTxnInsert = true
}

Expand Down Expand Up @@ -1726,18 +1758,20 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error {

// Step 3: ingest binlog
if isTxnInsert {
// When txn insert, only one table can be inserted, so use the first DestTableId
destTableId := inMemoryData.DestTableIds[0]

// When txn insert, use subTxnInfos to commit rather than commitInfos.
subTxnInfos, err := j.ingestBinlogForTxnInsert(txnId, tableRecords, stidMap, destTableId)
if err != nil {
rollback(err, inMemoryData)
return err
} else {
inMemoryData.SubTxnInfos = subTxnInfos
j.progress.NextSubCheckpoint(CommitTransaction, inMemoryData)
var allSubTxnInfos = make([]*festruct.TSubTxnInfo, 0, len(stidMap))
for _, destTableId := range inMemoryData.DestTableIds {
// When txn insert, use subTxnInfos to commit rather than commitInfos.
subTxnInfos, err := j.ingestBinlogForTxnInsert(txnId, tableRecords, stidMap, destTableId)
if err != nil {
rollback(err, inMemoryData)
return err
} else {
subTxnInfos := subTxnInfos
allSubTxnInfos = append(allSubTxnInfos, subTxnInfos...)
j.progress.NextSubCheckpoint(CommitTransaction, inMemoryData)
}
}
inMemoryData.SubTxnInfos = allSubTxnInfos
} else {
commitInfos, err := j.ingestBinlog(txnId, tableRecords)
if err != nil {
Expand Down
186 changes: 186 additions & 0 deletions regression-test/suites/db_sync/txn_insert/test_ds_txn_insert.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_txn_insert_db") {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

if (!helper.has_feature("feature_txn_insert")) {
logger.info("Skip the test because the feature is not supported.")
return
}

def tableName1 = "t1_" + helper.randomSuffix()
def tableName2 = "t2_" + helper.randomSuffix()
def tableName3 = "t3_" + helper.randomSuffix()
def tableName4 = "t4_" + helper.randomSuffix()
def test_num = 0
def insert_num = 10

def exist = { res -> Boolean
return res.size() != 0
}
def notExist = { res -> Boolean
return res.size() == 0
}

def hasRollupFull = { res -> Boolean
for (List<Object> row : res) {
if ((row[0] as String) == "${new_rollup_name}") {
return true
}
}
return false
}

helper.enableDbBinlog()

sql """
CREATE TABLE IF NOT EXISTS ${tableName1}
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`city` VARCHAR(20) COMMENT "用户所在城市"
) ENGINE = olap
unique KEY(`user_id`, `date`)
PARTITION BY RANGE (`date`)
(
PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES ("replication_num" = "1", "binlog.enable" = "true","enable_unique_key_merge_on_write" = "false");
"""

sql """
CREATE TABLE IF NOT EXISTS ${tableName2} (`id` int)
ENGINE = olap unique KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES
("replication_allocation" = "tag.location.default: 1", "binlog.enable" = "true", "enable_unique_key_merge_on_write" = "false");
"""

sql """
CREATE TABLE IF NOT EXISTS ${tableName3}
(
id int,
name varchar(20)
) ENGINE = olap
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"binlog.enable" = "true", "enable_unique_key_merge_on_write" = "false","replication_allocation" = "tag.location.default: 1");
"""

sql """
CREATE TABLE IF NOT EXISTS ${tableName4}
(
id int,
name varchar(20)
) ENGINE = olap
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"binlog.enable" = "true", "enable_unique_key_merge_on_write" = "false","replication_allocation" = "tag.location.default: 1");
"""

sql """ insert into ${tableName1} values (1, '2017-03-31', 'a'), (2, '2017-02-28', 'b'), (3, '2017-02-28', 'c'); """
sql """ insert into ${tableName2} values (3),(4),(5); """
sql """ insert into ${tableName3} values (111, 'aa'),(222, 'bb'),(333, 'cc'); """
sql """ insert into ${tableName4} values (12, 'xx'),(23, 'yy'),(34, 'aa'), (45, 'bb'), (56, 'cc'), (67, 'cc') """

helper.ccrJobDelete()
helper.ccrJobCreate()

assertTrue(helper.checkRestoreFinishTimesOf("${tableName1}", 60))
assertTrue(helper.checkRestoreFinishTimesOf("${tableName2}", 60))
assertTrue(helper.checkRestoreFinishTimesOf("${tableName3}", 60))
assertTrue(helper.checkRestoreFinishTimesOf("${tableName4}", 60))


logger.info("=== Test 0: Db sync ===")
sql "sync"
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1}", 3, 30))
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2}", 3, 30))
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName3} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName3}", 3, 30))
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName4} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName4}", 6, 30))

logger.info("=== Test 1: insert only ===")
sql """
begin;
insert into ${tableName1} select id, '2017-02-28', 'y1' from ${tableName4} where id = 23;
insert into ${tableName2} select id from ${tableName4} where id = 12;
commit;
"""
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1} ", 4, 30))
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2} ", 4, 30))


logger.info("=== Test 2: insert A + delete B ===")
sql """
set delete_without_partition = true;
begin;
insert into ${tableName2} select id from ${tableName4} where id = 23;
delete from ${tableName1} where user_id = 1;
commit;
"""
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1} ", 3, 30))
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2} ", 5, 30))


logger.info("=== Test 3: insert A + delete B + update B ===")
sql """
set delete_without_partition = true;
begin;
insert into ${tableName2} select id from ${tableName4} where id = 34;
delete from ${tableName1} where user_id = 2;
update ${tableName1} set city = 'new' where user_id = 3;
commit;
"""
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1} where city = 'new'", 1, 30))
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2} where id = 34", 1, 30))



logger.info("=== Test 4: insert A + update B + delete C ===")
sql """
begin;
insert into ${tableName1} select id,'2017-03-01','xyz' from ${tableName4} where id = 45;
delete from ${tableName2} where id = 34;
update ${tableName3} set name = 'new' where id = 111;
commit;
"""
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1} where city = 'xyz' and date = '2017-03-01'", 1, 30))
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2}", 5, 30))
assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName3} ", exist, 60, "target"))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName3} where name = 'new'", 1, 30))
}


Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_txn_insert") {
suite("test_txn_insert_table") {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

Expand Down

0 comments on commit 9b103b1

Please sign in to comment.