Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize : tccfence log table deleted should be optimized #745

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func initRmClient(cfg *Config) {
config.Init(cfg.ClientConfig.RmConfig.LockConfig)
client.RegisterProcessor()
integration.Init()
tcc.InitTCC()
tcc.InitTCC(cfg.TCCConfig.FenceConfig)
at.InitAT(cfg.ClientConfig.UndoConfig, cfg.AsyncWorkerConfig)
at.InitXA(cfg.ClientConfig.XaConfig)
})
Expand Down
6 changes: 5 additions & 1 deletion pkg/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func TestLoadPath(t *testing.T) {

assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)
assert.Equal(t, false, cfg.TCCConfig.FenceConfig.Enable)
assert.Equal(t, "root:12345678@tcp(127.0.0.1:3306)/seata_client1?charset=utf8&parseTime=True", cfg.TCCConfig.FenceConfig.Url)
assert.Equal(t, "tcc_fence_log_test", cfg.TCCConfig.FenceConfig.LogTableName)
assert.Equal(t, time.Second*60, cfg.TCCConfig.FenceConfig.CleanPeriod)

Expand Down Expand Up @@ -131,7 +133,7 @@ func TestLoadPath(t *testing.T) {
}

func TestLoadJson(t *testing.T) {
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":"30s","retry-times":10,"retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":"30s","retry-times":10,"retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"enable":false,"url":"root:12345678@tcp(127.0.0.1:3306)/seata_client1?charset=utf8&parseTime=True","log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)
assert.Equal(t, false, cfg.Enabled)
Expand Down Expand Up @@ -168,6 +170,8 @@ func TestLoadJson(t *testing.T) {

assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)
assert.Equal(t, false, cfg.TCCConfig.FenceConfig.Enable)
assert.Equal(t, "root:12345678@tcp(127.0.0.1:3306)/seata_client1?charset=utf8&parseTime=True", cfg.TCCConfig.FenceConfig.Url)
assert.Equal(t, "tcc_fence_log_test2", cfg.TCCConfig.FenceConfig.LogTableName)
assert.Equal(t, time.Second*80, cfg.TCCConfig.FenceConfig.CleanPeriod)

Expand Down
25 changes: 15 additions & 10 deletions pkg/rm/tcc/fence/config/tcc_fence_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,27 @@
package config

import (
"go.uber.org/atomic"

"database/sql"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/handler"
)

type TccFenceConfig struct {
Initialized atomic.Bool `default:"false"`
LogTableName string `default:"tcc_fence_log"`
}

func InitFence() {
// todo implement

}

func InitCleanTask() {
handler.GetFenceHandler().InitLogCleanChannel()
func InitCleanTask(dsn string) {

db, err := sql.Open("mysql", dsn)
if err != nil {
panic(err)
}

defer db.Close()

handler.GetFenceHandler().InitLogCleanChannel(db)

handler.GetFenceHandler().InitLogCleanTask(db)

}

func Destroy() {
Expand Down
19 changes: 19 additions & 0 deletions pkg/rm/tcc/fence/fence.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,35 @@ package fence

import (
"flag"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/config"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao"
"time"
)

var (
FenceConfig Config
)

func InitFenceConfig(cfg Config) {
FenceConfig = cfg

if FenceConfig.Enable {
dao.GetTccFenceStoreDatabaseMapper().InitLogTableName(FenceConfig.LogTableName)
config.InitCleanTask(FenceConfig.Url)
}
}

type Config struct {
Enable bool `yaml:"enable" json:"enable" koanf:"enable"`
Url string `yaml:"url" json:"url" koanf:"url"`
LogTableName string `yaml:"log-table-name" json:"log-table-name" koanf:"log-table-name"`
CleanPeriod time.Duration `yaml:"clean-period" json:"clean-period" koanf:"clean-period"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enable, prefix+".enable", false, "Whether the fence is initialized.")
f.StringVar(&cfg.Url, prefix+".url", "", "Data source name.")
f.StringVar(&cfg.LogTableName, prefix+".log-table-name", "tcc_fence_log", "Undo log table name.")
f.DurationVar(&cfg.CleanPeriod, prefix+".clean-period", 24*time.Hour, "Undo log retention time.")
}
2 changes: 1 addition & 1 deletion pkg/rm/tcc/fence/fennce_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"reflect"
"testing"

gomonkey "github.com/agiledragon/gomonkey/v2"
"github.com/agiledragon/gomonkey/v2"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
)
Expand Down
115 changes: 87 additions & 28 deletions pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,30 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/go-sql-driver/mysql"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/model"
"sync"
"time"

"github.com/go-sql-driver/mysql"

"seata.apache.org/seata-go/pkg/rm/tcc/fence/enum"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/model"
"seata.apache.org/seata-go/pkg/tm"
"seata.apache.org/seata-go/pkg/util/log"
)

type tccFenceWrapperHandler struct {
tccFenceDao dao.TCCFenceStore
logQueue chan *FenceLogIdentity
logQueue chan *model.FenceLogIdentity
logCache list.List
logQueueOnce sync.Once
logQueueCloseOnce sync.Once
}

type FenceLogIdentity struct {
xid string
branchId int64
logTaskOnce sync.Once
}

const (
maxQueueSize = 500
maxQueueSize = 500
channelDelete = 5
cleanInterval = 5 * time.Minute
)

var (
Expand All @@ -76,7 +73,7 @@ func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql
err := handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusTried)
if err != nil {
if mysqlError, ok := errors.Unwrap(err).(*mysql.MySQLError); ok && mysqlError.Number == 1062 {
// todo add clean command to channel.
log.Warnf("tcc fence record already exists, idempotency rejected. xid: %s, branchId: %d", xid, branchId)
handler.pushCleanChannel(xid, branchId)
}
return fmt.Errorf("insert tcc fence record errors, prepare fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
Expand Down Expand Up @@ -157,33 +154,70 @@ func (handler *tccFenceWrapperHandler) updateFenceStatus(tx *sql.Tx, xid string,
return handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status)
}

func (handler *tccFenceWrapperHandler) InitLogCleanChannel() {
func (handler *tccFenceWrapperHandler) InitLogCleanChannel(db *sql.DB) {

handler.logQueueOnce.Do(func() {
go handler.traversalCleanChannel()
go handler.traversalCleanChannel(db)
})

handler.logTaskOnce.Do(func() {
go handler.InitLogCleanTask(db)
})
}

func (handler *tccFenceWrapperHandler) InitLogCleanTask(db *sql.DB) {

ticker := time.NewTicker(cleanInterval)
defer ticker.Stop()

for range ticker.C {
tx, err := db.Begin()
if err != nil {
log.Errorf("failed to begin transaction: %v", err)
continue
}

expiredTime := time.Now().Add(-cleanInterval)
identityList, err := handler.tccFenceDao.QueryTCCFenceLogIdentityByMdDate(tx, expiredTime)

if err != nil {
log.Errorf("failed to delete expired logs: %v", err)
tx.Rollback()
continue
}

err = tx.Commit()
if err != nil {
log.Errorf("failed to commit transaction: %v", err)
}

// push to clean channel
for _, identity := range identityList {
handler.logQueue <- &identity
}
}

}

func (handler *tccFenceWrapperHandler) DestroyLogCleanChannel() {
handler.logQueueCloseOnce.Do(func() {
close(handler.logQueue)
})
}

func (handler *tccFenceWrapperHandler) deleteFence(xid string, id int64) error {
// todo implement
func (handler *tccFenceWrapperHandler) deleteBatchFence(tx *sql.Tx, batch []model.FenceLogIdentity) error {
err := handler.tccFenceDao.DeleteMultipleTCCFenceLogIdentity(tx, batch)
if err != nil {
return fmt.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err)
}
return nil
}

func (handler *tccFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 {
// todo implement
return 0
}

func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) {
// todo implement
fli := &FenceLogIdentity{
xid: xid,
branchId: branchId,
fli := &model.FenceLogIdentity{
Xid: xid,
BranchId: branchId,
}
select {
case handler.logQueue <- fli:
Expand All @@ -194,11 +228,36 @@ func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int
log.Infof("add one log to clean queue: %v ", fli)
}

func (handler *tccFenceWrapperHandler) traversalCleanChannel() {
handler.logQueue = make(chan *FenceLogIdentity, maxQueueSize)
func (handler *tccFenceWrapperHandler) traversalCleanChannel(db *sql.DB) {
handler.logQueue = make(chan *model.FenceLogIdentity, maxQueueSize)

counter := 0
batch := []model.FenceLogIdentity{}

for li := range handler.logQueue {
if err := handler.deleteFence(li.xid, li.branchId); err != nil {
log.Errorf("delete fence log failed, xid: %s, branchId: &s", li.xid, li.branchId)
counter++
batch = append(batch, *li)

if counter%channelDelete == 0 {
tx, _ := db.Begin()
err := handler.deleteBatchFence(tx, batch)
if err != nil {
log.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err)
} else {
tx.Commit()
}

batch = []model.FenceLogIdentity{}
}
}

if len(batch) > 0 {
tx, _ := db.Begin()
err := handler.deleteBatchFence(tx, batch)
if err != nil {
log.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err)
} else {
tx.Commit()
}
}
}
17 changes: 14 additions & 3 deletions pkg/rm/tcc/fence/store/db/dao/store_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type TCCFenceStore interface {
// return the tcc fence do and error msg
QueryTCCFenceDO(tx *sql.Tx, xid string, branchId int64) (*model.TCCFenceDO, error)

// QueryTCCFenceLogIdentityByMdDate tcc fence do by status.
// param tx the tx will bind with user business method
// param datetime modify time
QueryTCCFenceLogIdentityByMdDate(tx *sql.Tx, datetime time.Time) ([]model.FenceLogIdentity, error)

// InsertTCCFenceDO tcc fence do boolean.
// param tx the tx will bind with user business method
// param tccFenceDO the tcc fence do
Expand All @@ -57,11 +62,17 @@ type TCCFenceStore interface {
// return the error msg
DeleteTCCFenceDO(tx *sql.Tx, xid string, branchId int64) error

// DeleteTCCFenceDOByMdfDate tcc fence by datetime.
// DeleteMultipleTCCFenceLogIdentity tcc fence log identity boolean.
// param tx the tx will bind with user business method
// param datetime modify time
// param identity the tcc fence log identity
// return the error msg
DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error
DeleteMultipleTCCFenceLogIdentity(tx *sql.Tx, identity []model.FenceLogIdentity) error

// DeleteTCCFenceDOByMdfDate tcc fence by datetime.
// param tx the tx will bind with user business method
// param datetime modify time, int32 limit delete
// return the delete number and error msg
DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time, limit int32) (int64, error)

// SetLogTableName LogTable ColumnName
// param logTableName logTableName
Expand Down
Loading
Loading