Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize : tccfence log table deleted should be optimized #745

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ require (
)

require (
github.com/agiledragon/gomonkey/v2 v2.9.0
github.com/agiledragon/gomonkey/v2 v2.11.0
github.com/golang/protobuf v1.5.3
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
google.golang.org/protobuf v1.30.0
)

require (
Expand All @@ -56,7 +58,6 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/jinzhu/copier v0.3.5 // indirect
Expand Down Expand Up @@ -90,7 +91,6 @@ require (
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/
github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/agiledragon/gomonkey/v2 v2.9.0 h1:PDiKKybR596O6FHW+RVSG0Z7uGCBNbmbUXh3uCNQ7Hc=
github.com/agiledragon/gomonkey/v2 v2.9.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/agiledragon/gomonkey/v2 v2.11.0 h1:5oxSgA+tC1xuGsrIorR+sYiziYltmJyEZ9qA25b6l5U=
github.com/agiledragon/gomonkey/v2 v2.11.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
2 changes: 1 addition & 1 deletion pkg/rm/tcc/fence/fennce_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"reflect"
"testing"

gomonkey "github.com/agiledragon/gomonkey/v2"
"github.com/agiledragon/gomonkey/v2"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
)
Expand Down
53 changes: 48 additions & 5 deletions pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type tccFenceWrapperHandler struct {
logCache list.List
logQueueOnce sync.Once
logQueueCloseOnce sync.Once
logTaskOnce sync.Once
}

type FenceLogIdentity struct {
Expand All @@ -50,6 +51,7 @@ type FenceLogIdentity struct {

const (
maxQueueSize = 500
limitDelete = 1000
)

var (
Expand Down Expand Up @@ -163,6 +165,40 @@ func (handler *tccFenceWrapperHandler) InitLogCleanChannel() {
})
}

func (handler *tccFenceWrapperHandler) InitLogCleanTask(tx *sql.Tx) {
handler.logTaskOnce.Do(func() {
// Create a ticker that will fire initially based on the time remaining until the next midnight (00:00:00).
go func() {
ticker := time.NewTicker(getDurationUntilNextZero())
defer ticker.Stop()

for {
select {
case <-ticker.C:
now := time.Now()
// Get the start time of the day before the current day (00:00:00) and assign it to timeBefore with the same location as now.
timeBefore := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())

for {
deletedRowCount, err := handler.tccFenceDao.DeleteTCCFenceDOByMdfDate(tx, timeBefore, limitDelete)
if err != nil {
fmt.Printf("Error occurred during TCC fence clean task: %v\n", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

日志统一使用 log 来输出

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种方式还有个问题,如果网络抖动一次,当前的删除工作就终止的

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果采用评论区说的,使用channel,见听到一定条数删除一次,看看会有啥啥问题不

} else {
fmt.Printf("TCC fence clean task executed success, timeBefore: %v, deleted row count: %d\n", timeBefore, deletedRowCount)
}
if deletedRowCount <= 0 {
break
}
}

// Reset the ticker's interval to the duration until the next midnight to ensure it fires again close to midnight in the future.
ticker.Reset(getDurationUntilNextZero())
}
}
}()
})
}

func (handler *tccFenceWrapperHandler) DestroyLogCleanChannel() {
handler.logQueueCloseOnce.Do(func() {
close(handler.logQueue)
Expand All @@ -174,11 +210,6 @@ func (handler *tccFenceWrapperHandler) deleteFence(xid string, id int64) error {
return nil
}

func (handler *tccFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 {
// todo implement
return 0
}

func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) {
// todo implement
fli := &FenceLogIdentity{
Expand All @@ -202,3 +233,15 @@ func (handler *tccFenceWrapperHandler) traversalCleanChannel() {
}
}
}

func getDurationUntilNextZero() time.Duration {
now := time.Now()
// Calculate the time for the next midnight (00:00:00) by getting the current date and setting the time to midnight.
nextZero := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour)
if now.Hour() == 0 && now.Minute() == 0 && now.Second() == 0 {
// If it's currently midnight, directly add 24 hours to get the time for the next midnight.
nextZero = now.Add(24 * time.Hour)
}
// Return the duration between the current time and the next midnight.
return nextZero.Sub(now)
}
6 changes: 3 additions & 3 deletions pkg/rm/tcc/fence/store/db/dao/store_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ type TCCFenceStore interface {

// DeleteTCCFenceDOByMdfDate tcc fence by datetime.
// param tx the tx will bind with user business method
// param datetime modify time
// return the error msg
DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error
// param datetime modify time, int32 limit delete
// return the delete number and error msg
DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time, limit int32) (int64, error)

// SetLogTableName LogTable ColumnName
// param logTableName logTableName
Expand Down
12 changes: 6 additions & 6 deletions pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,24 @@ func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDO(tx *sql.Tx, xid string, b
return nil
}

func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error {
func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time, limit int32) (int64, error) {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByMdfDateAndStatus(t.logTableName))
if err != nil {
return fmt.Errorf("delete tcc fence prepare sql failed, [%w]", err)
return -1, fmt.Errorf("delete tcc fence prepare sql failed, [%w]", err)
}
defer prepareStmt.Close()

result, err := prepareStmt.Exec(datetime)
result, err := prepareStmt.Exec(datetime, limit)
if err != nil {
return fmt.Errorf("delete tcc fence exec sql failed, [%w]", err)
return -1, fmt.Errorf("delete tcc fence exec sql failed, [%w]", err)
}

affected, err := result.RowsAffected()
if err != nil || affected == 0 {
return fmt.Errorf("delete tcc fence get affected rows failed, [%w]", err)
return 0, fmt.Errorf("delete tcc fence get affected rows failed, [%w]", err)
}

return nil
return affected, nil
}

func (t *TccFenceStoreDatabaseMapper) SetLogTableName(logTable string) {
Expand Down
9 changes: 7 additions & 2 deletions pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,20 @@ func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) {
mock.ExpectBegin()
mock.ExpectPrepare(sql2.GetDeleteSQLByMdfDateAndStatus("tcc_fence_log")).
ExpectExec().
WithArgs(driver.Value(tccFenceDo.GmtModified.Add(math.MaxInt32))).
WithArgs(driver.Value(tccFenceDo.GmtModified.Add(math.MaxInt32)), 1000).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}
err = GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDOByMdfDate(tx, tccFenceDo.GmtModified.Add(math.MaxInt32))
affect, err := GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDOByMdfDate(tx, tccFenceDo.GmtModified.Add(math.MaxInt32), 1000)
tx.Commit()
assert.Equal(t, nil, err)
assert.Equal(t, int64(1), affect)

if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %v", err)
}
}
5 changes: 4 additions & 1 deletion pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ var (
deleteByBranchIdAndXid = "delete from " + localTccLogPlaced + " where xid = ? and branch_id = ? "

// deleteByDateAndStatus The enum DeleteByDateAndStatus
deleteByDateAndStatus = "delete from " + localTccLogPlaced + " where gmt_modified < ? and status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")"
deleteByDateAndStatus = "delete from " + localTccLogPlaced +
" where gmt_modified < ? and" +
" status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")" +
" limit ?"
)

func GetInsertLocalTCCLogSQL(localTccTable string) string {
Expand Down
Loading