From bcf0169a02c58167b1665fa1d35569863f308b2d Mon Sep 17 00:00:00 2001 From: Wenbin1002 Date: Wed, 26 Jun 2024 11:54:26 +0800 Subject: [PATCH] make isPersisted and loc atomic #16953 --- pkg/vm/engine/tae/db/test/db_test.go | 4 +- pkg/vm/engine/tae/model/pages.go | 57 ++++++++++------------------ 2 files changed, 23 insertions(+), 38 deletions(-) diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index f63f3cfaf119..6ca01ebd67ab 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -9063,7 +9063,7 @@ func TestPersistTransferTable(t *testing.T) { page.SetLocation(location) time.Sleep(3 * time.Second) - assert.True(t, page.IsPersist()) + assert.True(t, page.IsPersist() == 1) for i := 0; i < 10; i++ { id, ok := page.Transfer(uint32(i)) assert.True(t, ok) @@ -9144,7 +9144,7 @@ func TestClearPersistTransferTable(t *testing.T) { page.SetLocation(location) time.Sleep(3 * time.Second) - assert.True(t, page.IsPersist()) + assert.True(t, page.IsPersist() == 0) for i := 0; i < 10; i++ { _, ok := page.Transfer(uint32(i)) assert.False(t, ok) diff --git a/pkg/vm/engine/tae/model/pages.go b/pkg/vm/engine/tae/model/pages.go index 38f25a750e59..a08cfef84586 100644 --- a/pkg/vm/engine/tae/model/pages.go +++ b/pkg/vm/engine/tae/model/pages.go @@ -26,6 +26,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/api" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "sync" + "sync/atomic" "time" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -80,15 +81,14 @@ func WithDiskTTL(diskTTL time.Duration) Option { } type TransferHashPage struct { - latch sync.Mutex common.RefHelper bornTS time.Time id *common.ID // not include blk offset hashmap api.HashPageMap - loc objectio.Location + loc atomic.Pointer[objectio.Location] params TransferHashPageParams isTransient bool - isPersisted bool + isPersisted int32 } func NewTransferHashPage(id *common.ID, ts time.Time, isTransient bool, opts ...Option) *TransferHashPage { @@ -106,7 +106,7 @@ func NewTransferHashPage(id *common.ID, ts time.Time, isTransient bool, opts ... hashmap: api.HashPageMap{M: make(map[uint32][]byte)}, params: params, isTransient: isTransient, - isPersisted: false, + isPersisted: 0, } page.OnZeroCB = page.Close @@ -152,20 +152,12 @@ func (page *TransferHashPage) Pin() *common.PinnedItem[*TransferHashPage] { } func (page *TransferHashPage) Train(from uint32, to types.Rowid) { - page.latch.Lock() - defer page.latch.Unlock() page.hashmap.M[from] = to[:] v2.TransferRowTotalCounter.Inc() } func (page *TransferHashPage) Transfer(from uint32) (dest types.Rowid, ok bool) { - page.latch.Lock() - if page.isPersisted && page.loc == nil { - logutil.Infof("[TransferHashPage] persist table is cleared") - page.latch.Unlock() - return types.Rowid{}, false - } - if page.isPersisted { + if atomic.LoadInt32(&page.isPersisted) == 1 { diskStart := time.Now() page.loadTable() v2.TransferDiskHitCounter.Inc() @@ -175,12 +167,13 @@ func (page *TransferHashPage) Transfer(from uint32) (dest types.Rowid, ok bool) v2.TransferMemoryHitCounter.Inc() } v2.TransferTotalHitCounter.Inc() - page.latch.Unlock() memstart := time.Now() var m []byte m, ok = page.hashmap.M[from] - dest = types.Rowid(m) + if ok { + dest = types.Rowid(m) + } memduration := time.Since(memstart) v2.TransferDurationMemoryGauge.Set(1000 * memduration.Seconds()) @@ -205,30 +198,26 @@ func (page *TransferHashPage) Unmarshal(data []byte) error { } func (page *TransferHashPage) SetLocation(location objectio.Location) { - page.latch.Lock() - defer page.latch.Unlock() - page.loc = location + page.loc.Store(&location) } func (page *TransferHashPage) clearTable() { - page.latch.Lock() - defer page.latch.Unlock() logutil.Infof("[TransferHashPage] clear hash table") clear(page.hashmap.M) - page.isPersisted = true + atomic.StoreInt32(&page.isPersisted, 1) } func (page *TransferHashPage) loadTable() { - logutil.Infof("[TransferHashPage] load persist table, objectname: %v", page.loc.Name().String()) - if page.loc == nil { + logutil.Infof("[TransferHashPage] load persist table, objectname: %v", page.loc.Load().Name().String()) + if page.loc.Load() == nil { return } - logutil.Infof("[TransferHashPage] loc %v, rd %v, fs %v", page.loc.Name().String(), RD, FS) + logutil.Infof("[TransferHashPage] loc %v, rd %v, fs %v", page.loc.Load().Name().String(), RD, FS) var bat *batch.Batch var release func() - bat, release, err := RD.LoadTableByBlock(page.loc, FS) + bat, release, err := RD.LoadTableByBlock(*page.loc.Load(), FS) defer release() if err != nil { logutil.Errorf("[TransferHashPage] load table failed, %v", err) @@ -241,27 +230,23 @@ func (page *TransferHashPage) loadTable() { v2.TransferRowHitCounter.Add(float64(len(page.hashmap.M))) - page.isPersisted = false + atomic.StoreInt32(&page.isPersisted, 0) cl := getCleaner() cl.addPage(page) } func (page *TransferHashPage) clearPersistTable() { - page.latch.Lock() - defer page.latch.Unlock() - if page.loc == nil { + if atomic.LoadInt32(&page.isPersisted) == 0 { return } - logutil.Infof("[TransferHashPage] clear persist table, objectname: %v", page.loc.Name().String()) - FS.Delete(context.Background(), page.loc.Name().String()) - page.loc = nil + atomic.StoreInt32(&page.isPersisted, 0) + logutil.Infof("[TransferHashPage] clear persist table, objectname: %v", page.loc.Load().Name().String()) + FS.Delete(context.Background(), page.loc.Load().Name().String()) } -func (page *TransferHashPage) IsPersist() bool { - page.latch.Lock() - defer page.latch.Unlock() - return page.isPersisted +func (page *TransferHashPage) IsPersist() int32 { + return atomic.LoadInt32(&page.isPersisted) } type TransferPageCleaner struct {