diff --git a/pkg/rm/tcc/fence/config/tcc_fence_config.go b/pkg/rm/tcc/fence/config/tcc_fence_config.go index 1cc516a71..96a6b836b 100644 --- a/pkg/rm/tcc/fence/config/tcc_fence_config.go +++ b/pkg/rm/tcc/fence/config/tcc_fence_config.go @@ -18,8 +18,9 @@ package config import ( + "database/sql" "go.uber.org/atomic" - + "seata.apache.org/seata-go/pkg/rm/tcc/fence" "seata.apache.org/seata-go/pkg/rm/tcc/fence/handler" ) @@ -32,8 +33,19 @@ func InitFence() { // todo implement } -func InitCleanTask() { - handler.GetFenceHandler().InitLogCleanChannel() +func InitCleanTask(dsn string) { + + db, err := sql.Open(fence.SeataFenceMySQLDriver, dsn) + if err != nil { + panic(err) + } + + defer db.Close() + + handler.GetFenceHandler().InitLogCleanChannel(db) + + handler.GetFenceHandler() + } func Destroy() { diff --git a/pkg/rm/tcc/fence/fennce_driver_test.go b/pkg/rm/tcc/fence/fennce_driver_test.go index c0e804e8b..6213d9d13 100644 --- a/pkg/rm/tcc/fence/fennce_driver_test.go +++ b/pkg/rm/tcc/fence/fennce_driver_test.go @@ -24,7 +24,7 @@ import ( "reflect" "testing" - gomonkey "github.com/agiledragon/gomonkey/v2" + "github.com/agiledragon/gomonkey/v2" "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" ) diff --git a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go index d719f01e1..8bf188f59 100644 --- a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go +++ b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go @@ -23,33 +23,30 @@ import ( "database/sql" "errors" "fmt" + "github.com/go-sql-driver/mysql" + "seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao" + "seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/model" "sync" "time" - "github.com/go-sql-driver/mysql" - "seata.apache.org/seata-go/pkg/rm/tcc/fence/enum" - "seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao" - "seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/model" "seata.apache.org/seata-go/pkg/tm" "seata.apache.org/seata-go/pkg/util/log" ) type tccFenceWrapperHandler struct { tccFenceDao dao.TCCFenceStore - logQueue chan *FenceLogIdentity + logQueue chan *model.FenceLogIdentity logCache list.List logQueueOnce sync.Once logQueueCloseOnce sync.Once -} - -type FenceLogIdentity struct { - xid string - branchId int64 + logTaskOnce sync.Once } const ( - maxQueueSize = 500 + maxQueueSize = 500 + channelDelete = 5 + cleanInterval = 5 * time.Minute ) var ( @@ -76,7 +73,7 @@ func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql err := handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusTried) if err != nil { if mysqlError, ok := errors.Unwrap(err).(*mysql.MySQLError); ok && mysqlError.Number == 1062 { - // todo add clean command to channel. + log.Warnf("tcc fence record already exists, idempotency rejected. xid: %s, branchId: %d", xid, branchId) handler.pushCleanChannel(xid, branchId) } return fmt.Errorf("insert tcc fence record errors, prepare fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) @@ -157,33 +154,70 @@ func (handler *tccFenceWrapperHandler) updateFenceStatus(tx *sql.Tx, xid string, return handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status) } -func (handler *tccFenceWrapperHandler) InitLogCleanChannel() { +func (handler *tccFenceWrapperHandler) InitLogCleanChannel(db *sql.DB) { + handler.logQueueOnce.Do(func() { - go handler.traversalCleanChannel() + go handler.traversalCleanChannel(db) + }) + + handler.logTaskOnce.Do(func() { + go handler.InitLogCleanTask(db) }) } +func (handler *tccFenceWrapperHandler) InitLogCleanTask(db *sql.DB) { + + ticker := time.NewTicker(cleanInterval) + defer ticker.Stop() + + for range ticker.C { + tx, err := db.Begin() + if err != nil { + log.Errorf("failed to begin transaction: %v", err) + continue + } + + expiredTime := time.Now().Add(-cleanInterval) + identityList, err := handler.tccFenceDao.QueryTCCFenceLogIdentityByMdDate(tx, expiredTime) + + if err != nil { + log.Errorf("failed to delete expired logs: %v", err) + tx.Rollback() + continue + } + + err = tx.Commit() + if err != nil { + log.Errorf("failed to commit transaction: %v", err) + } + + // push to clean channel + for _, identity := range identityList { + handler.logQueue <- &identity + } + } + +} + func (handler *tccFenceWrapperHandler) DestroyLogCleanChannel() { handler.logQueueCloseOnce.Do(func() { close(handler.logQueue) }) } -func (handler *tccFenceWrapperHandler) deleteFence(xid string, id int64) error { - // todo implement +func (handler *tccFenceWrapperHandler) deleteBatchFence(tx *sql.Tx, batch []model.FenceLogIdentity) error { + err := handler.tccFenceDao.DeleteMultipleTCCFenceLogIdentity(tx, batch) + if err != nil { + return fmt.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err) + } return nil } -func (handler *tccFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 { - // todo implement - return 0 -} - func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) { // todo implement - fli := &FenceLogIdentity{ - xid: xid, - branchId: branchId, + fli := &model.FenceLogIdentity{ + Xid: xid, + BranchId: branchId, } select { case handler.logQueue <- fli: @@ -194,11 +228,36 @@ func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int log.Infof("add one log to clean queue: %v ", fli) } -func (handler *tccFenceWrapperHandler) traversalCleanChannel() { - handler.logQueue = make(chan *FenceLogIdentity, maxQueueSize) +func (handler *tccFenceWrapperHandler) traversalCleanChannel(db *sql.DB) { + handler.logQueue = make(chan *model.FenceLogIdentity, maxQueueSize) + + counter := 0 + batch := []model.FenceLogIdentity{} + for li := range handler.logQueue { - if err := handler.deleteFence(li.xid, li.branchId); err != nil { - log.Errorf("delete fence log failed, xid: %s, branchId: &s", li.xid, li.branchId) + counter++ + batch = append(batch, *li) + + if counter%channelDelete == 0 { + tx, _ := db.Begin() + err := handler.deleteBatchFence(tx, batch) + if err != nil { + log.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err) + } else { + tx.Commit() + } + + batch = []model.FenceLogIdentity{} + } + } + + if len(batch) > 0 { + tx, _ := db.Begin() + err := handler.deleteBatchFence(tx, batch) + if err != nil { + log.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err) + } else { + tx.Commit() } } } diff --git a/pkg/rm/tcc/fence/store/db/dao/store_api.go b/pkg/rm/tcc/fence/store/db/dao/store_api.go index 7029daf04..47305b91a 100644 --- a/pkg/rm/tcc/fence/store/db/dao/store_api.go +++ b/pkg/rm/tcc/fence/store/db/dao/store_api.go @@ -36,6 +36,11 @@ type TCCFenceStore interface { // return the tcc fence do and error msg QueryTCCFenceDO(tx *sql.Tx, xid string, branchId int64) (*model.TCCFenceDO, error) + // QueryTCCFenceLogIdentityByMdDate tcc fence do by status. + // param tx the tx will bind with user business method + // param datetime modify time + QueryTCCFenceLogIdentityByMdDate(tx *sql.Tx, datetime time.Time) ([]model.FenceLogIdentity, error) + // InsertTCCFenceDO tcc fence do boolean. // param tx the tx will bind with user business method // param tccFenceDO the tcc fence do @@ -57,11 +62,17 @@ type TCCFenceStore interface { // return the error msg DeleteTCCFenceDO(tx *sql.Tx, xid string, branchId int64) error - // DeleteTCCFenceDOByMdfDate tcc fence by datetime. + // DeleteMultipleTCCFenceLogIdentity tcc fence log identity boolean. // param tx the tx will bind with user business method - // param datetime modify time + // param identity the tcc fence log identity // return the error msg - DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error + DeleteMultipleTCCFenceLogIdentity(tx *sql.Tx, identity []model.FenceLogIdentity) error + + // DeleteTCCFenceDOByMdfDate tcc fence by datetime. + // param tx the tx will bind with user business method + // param datetime modify time, int32 limit delete + // return the delete number and error msg + DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time, limit int32) (int64, error) // SetLogTableName LogTable ColumnName // param logTableName logTableName diff --git a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go index 5da7ad7a6..377bc8a6e 100644 --- a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go +++ b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go @@ -21,6 +21,7 @@ import ( "context" "database/sql" "fmt" + "strings" "sync" "time" @@ -92,6 +93,35 @@ func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, br return tccFenceDo, nil } +func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceLogIdentityByMdDate(tx *sql.Tx, datetime time.Time) ([]model.FenceLogIdentity, error) { + prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetQuerySQLByMdDate(t.logTableName)) + if err != nil { + return nil, fmt.Errorf("query tcc fence prepare sql failed, [%w]", err) + } + defer prepareStmt.Close() + + rows, err := prepareStmt.Query(datetime) + if err != nil { + return nil, fmt.Errorf("query tcc fence exec sql failed, [%w]", err) + } + defer rows.Close() + + var fenceLogIdentities []model.FenceLogIdentity + for rows.Next() { + var xid string + var branchId int64 + err := rows.Scan(&xid, &branchId) + if err != nil { + return nil, fmt.Errorf("query tcc fence get scan row failed, [%w]", err) + } + fenceLogIdentities = append(fenceLogIdentities, model.FenceLogIdentity{ + Xid: xid, + BranchId: branchId, + }) + } + return fenceLogIdentities, nil +} + func (t *TccFenceStoreDatabaseMapper) InsertTCCFenceDO(tx *sql.Tx, tccFenceDo *model.TCCFenceDO) error { prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetInsertLocalTCCLogSQL(t.logTableName)) if err != nil { @@ -157,26 +187,55 @@ func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDO(tx *sql.Tx, xid string, b return nil } -func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error { - prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByMdfDateAndStatus(t.logTableName)) +func (t *TccFenceStoreDatabaseMapper) DeleteMultipleTCCFenceLogIdentity(tx *sql.Tx, identities []model.FenceLogIdentity) error { + + placeholders := strings.Repeat("(?,?),", len(identities)-1) + "(?,?)" + prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GertDeleteSQLByBranchIdsAndXids(t.logTableName, placeholders)) if err != nil { return fmt.Errorf("delete tcc fence prepare sql failed, [%w]", err) } defer prepareStmt.Close() - result, err := prepareStmt.Exec(datetime) + // prepare args + args := make([]interface{}, 0, len(identities)*2) + for _, identity := range identities { + args = append(args, identity.Xid, identity.BranchId) + } + + result, err := prepareStmt.Exec(args...) + if err != nil { - return fmt.Errorf("delete tcc fence exec sql failed, [%w]", err) + return fmt.Errorf("delete tcc fences exec sql failed, [%w]", err) } affected, err := result.RowsAffected() if err != nil || affected == 0 { - return fmt.Errorf("delete tcc fence get affected rows failed, [%w]", err) + return fmt.Errorf("delete tcc fences get affected rows failed, [%w]", err) } return nil } +func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time, limit int32) (int64, error) { + prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByMdfDateAndStatus(t.logTableName)) + if err != nil { + return -1, fmt.Errorf("delete tcc fence prepare sql failed, [%w]", err) + } + defer prepareStmt.Close() + + result, err := prepareStmt.Exec(datetime, limit) + if err != nil { + return -1, fmt.Errorf("delete tcc fence exec sql failed, [%w]", err) + } + + affected, err := result.RowsAffected() + if err != nil || affected == 0 { + return 0, fmt.Errorf("delete tcc fence get affected rows failed, [%w]", err) + } + + return affected, nil +} + func (t *TccFenceStoreDatabaseMapper) SetLogTableName(logTable string) { t.logTableName = logTable } diff --git a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go index 3cd9056aa..9717f1ee9 100644 --- a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go +++ b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go @@ -23,6 +23,7 @@ import ( "database/sql/driver" "math" "reflect" + "strings" "testing" "time" @@ -111,6 +112,40 @@ func TestTccFenceStoreDatabaseMapper_QueryTCCFenceDO(t *testing.T) { assert.Nil(t, err) } +func TestTccFenceStoreDatabaseMapper_QueryTCCFenceLogIdentityByMdDate(t *testing.T) { + now := time.Now() + tccFenceDo := &model.TCCFenceDO{ + Xid: "123123124124", + BranchId: 12312312312, + ActionName: "fence_test", + Status: enum.StatusTried, + GmtCreate: now, + GmtModified: now, + } + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("open db failed msg: %v", err) + } + defer db.Close() + mock.ExpectBegin() + mock.ExpectPrepare(sql2.GetQuerySQLByMdDate("tcc_fence_log")). + ExpectQuery(). + WithArgs(driver.Value(tccFenceDo.GmtModified)). + WillReturnRows(sqlmock.NewRows([]string{"xid", "branch_id"}). + AddRow(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId))) + mock.ExpectCommit() + tx, err := db.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + t.Fatalf("open conn failed msg :%v", err) + } + + actualFenceDo, err := GetTccFenceStoreDatabaseMapper().QueryTCCFenceLogIdentityByMdDate(tx, tccFenceDo.GmtModified) + tx.Commit() + assert.Equal(t, tccFenceDo.Xid, actualFenceDo[0].Xid) + assert.Equal(t, tccFenceDo.BranchId, actualFenceDo[0].BranchId) + assert.Nil(t, err) +} + func TestTccFenceStoreDatabaseMapper_UpdateTCCFenceDO(t *testing.T) { now := time.Now() tccFenceDo := &model.TCCFenceDO{ @@ -177,6 +212,35 @@ func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDO(t *testing.T) { assert.Equal(t, nil, err) } +func TestTccFenceStoreDatabaseMapper_DeleteMultipleTCCFenceDO(t *testing.T) { + identities := []model.FenceLogIdentity{ + {Xid: "123123124124", BranchId: 12312312312}, + {Xid: "123123124125", BranchId: 12312312313}, + } + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("open db failed msg: %v", err) + } + defer db.Close() + + placeholders := strings.Repeat("(?,?),", len(identities)-1) + "(?,?)" + mock.ExpectBegin() + mock.ExpectPrepare(sql2.GertDeleteSQLByBranchIdsAndXids("tcc_fence_log", placeholders)). + ExpectExec(). + WithArgs(driver.Value(identities[0].Xid), driver.Value(identities[0].BranchId), driver.Value(identities[1].Xid), driver.Value(identities[1].BranchId)). + WillReturnResult(sqlmock.NewResult(1, 2)) + mock.ExpectCommit() + + tx, err := db.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + t.Fatalf("open conn failed msg :%v", err) + } + + err = GetTccFenceStoreDatabaseMapper().DeleteMultipleTCCFenceLogIdentity(tx, identities) + tx.Commit() + assert.Equal(t, nil, err) +} + func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) { now := time.Now() tccFenceDo := &model.TCCFenceDO{ @@ -190,7 +254,7 @@ func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) { mock.ExpectBegin() mock.ExpectPrepare(sql2.GetDeleteSQLByMdfDateAndStatus("tcc_fence_log")). ExpectExec(). - WithArgs(driver.Value(tccFenceDo.GmtModified.Add(math.MaxInt32))). + WithArgs(driver.Value(tccFenceDo.GmtModified.Add(math.MaxInt32)), 1000). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() @@ -198,7 +262,12 @@ func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) { if err != nil { t.Fatalf("open conn failed msg :%v", err) } - err = GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDOByMdfDate(tx, tccFenceDo.GmtModified.Add(math.MaxInt32)) + affect, err := GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDOByMdfDate(tx, tccFenceDo.GmtModified.Add(math.MaxInt32), 1000) tx.Commit() assert.Equal(t, nil, err) + assert.Equal(t, int64(1), affect) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %v", err) + } } diff --git a/pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go b/pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go index 68a02a2ea..0acc014b2 100644 --- a/pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go +++ b/pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go @@ -44,3 +44,8 @@ type TCCFenceDO struct { // GmtModified update time GmtModified time.Time } + +type FenceLogIdentity struct { + Xid string + BranchId int64 +} diff --git a/pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go b/pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go index e4889497a..a2b510ee1 100644 --- a/pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go +++ b/pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go @@ -28,20 +28,32 @@ var ( // localTccLogPlaced The enum LocalTccLogPlaced localTccLogPlaced = " %s " + pramPlaceHolder = "%s " + // insertLocalTccLog The enum InsertLocalTccLog insertLocalTccLog = "insert into " + localTccLogPlaced + " (xid, branch_id, action_name, status, gmt_create, gmt_modified) values ( ?,?,?,?,?,?)" // queryByBranchIdAndXid The enum QueryByBranchIdAndXid queryByBranchIdAndXid = "select xid, branch_id, action_name, status, gmt_create, gmt_modified from " + localTccLogPlaced + " where xid = ? and branch_id = ? for update" + // queryByMdDate The enum QueryByMdDate + queryByMdDate = "select xid, branch_id from " + localTccLogPlaced + " where gmt_modified < ? " + + " and status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")" + // updateStatusByBranchIdAndXid The enum UpdateStatusByBranchIdAndXid updateStatusByBranchIdAndXid = "update " + localTccLogPlaced + " set status = ?, gmt_modified = ? where xid = ? and branch_id = ? and status = ? " // deleteByBranchIdAndXid The enum DeleteByBranchIdAndXid deleteByBranchIdAndXid = "delete from " + localTccLogPlaced + " where xid = ? and branch_id = ? " + // deleteByBranchIdsAndXids The enum DeleteByBranchIdsAndXids + deleteByBranchIdsAndXids = "delete from " + localTccLogPlaced + " where (xid,branch_id) in " + pramPlaceHolder + // deleteByDateAndStatus The enum DeleteByDateAndStatus - deleteByDateAndStatus = "delete from " + localTccLogPlaced + " where gmt_modified < ? and status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")" + deleteByDateAndStatus = "delete from " + localTccLogPlaced + + " where gmt_modified < ? and" + + " status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")" + + " limit ?" ) func GetInsertLocalTCCLogSQL(localTccTable string) string { @@ -60,6 +72,14 @@ func GetDeleteSQLByBranchIdAndXid(localTccTable string) string { return fmt.Sprintf(deleteByBranchIdAndXid, localTccTable) } +func GertDeleteSQLByBranchIdsAndXids(localTccTable string, paramsPlaceHolder string) string { + return fmt.Sprintf(deleteByBranchIdsAndXids, localTccTable, paramsPlaceHolder) +} + func GetDeleteSQLByMdfDateAndStatus(localTccTable string) string { return fmt.Sprintf(deleteByDateAndStatus, localTccTable) } + +func GetQuerySQLByMdDate(localTccTable string) string { + return fmt.Sprintf(queryByMdDate, localTccTable) +}