Skip to content

Commit

Permalink
make isPersisted and loc atomic matrixorigin#16953
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenbin1002 committed Jun 26, 2024
1 parent c30304c commit bcf0169
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 38 deletions.
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9063,7 +9063,7 @@ func TestPersistTransferTable(t *testing.T) {
page.SetLocation(location)

time.Sleep(3 * time.Second)
assert.True(t, page.IsPersist())
assert.True(t, page.IsPersist() == 1)
for i := 0; i < 10; i++ {
id, ok := page.Transfer(uint32(i))
assert.True(t, ok)
Expand Down Expand Up @@ -9144,7 +9144,7 @@ func TestClearPersistTransferTable(t *testing.T) {
page.SetLocation(location)

time.Sleep(3 * time.Second)
assert.True(t, page.IsPersist())
assert.True(t, page.IsPersist() == 0)
for i := 0; i < 10; i++ {
_, ok := page.Transfer(uint32(i))
assert.False(t, ok)
Expand Down
57 changes: 21 additions & 36 deletions pkg/vm/engine/tae/model/pages.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/api"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"sync"
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -80,15 +81,14 @@ func WithDiskTTL(diskTTL time.Duration) Option {
}

type TransferHashPage struct {
latch sync.Mutex
common.RefHelper
bornTS time.Time
id *common.ID // not include blk offset
hashmap api.HashPageMap
loc objectio.Location
loc atomic.Pointer[objectio.Location]
params TransferHashPageParams
isTransient bool
isPersisted bool
isPersisted int32
}

func NewTransferHashPage(id *common.ID, ts time.Time, isTransient bool, opts ...Option) *TransferHashPage {
Expand All @@ -106,7 +106,7 @@ func NewTransferHashPage(id *common.ID, ts time.Time, isTransient bool, opts ...
hashmap: api.HashPageMap{M: make(map[uint32][]byte)},
params: params,
isTransient: isTransient,
isPersisted: false,
isPersisted: 0,
}
page.OnZeroCB = page.Close

Expand Down Expand Up @@ -152,20 +152,12 @@ func (page *TransferHashPage) Pin() *common.PinnedItem[*TransferHashPage] {
}

func (page *TransferHashPage) Train(from uint32, to types.Rowid) {
page.latch.Lock()
defer page.latch.Unlock()
page.hashmap.M[from] = to[:]
v2.TransferRowTotalCounter.Inc()
}

func (page *TransferHashPage) Transfer(from uint32) (dest types.Rowid, ok bool) {
page.latch.Lock()
if page.isPersisted && page.loc == nil {
logutil.Infof("[TransferHashPage] persist table is cleared")
page.latch.Unlock()
return types.Rowid{}, false
}
if page.isPersisted {
if atomic.LoadInt32(&page.isPersisted) == 1 {
diskStart := time.Now()
page.loadTable()
v2.TransferDiskHitCounter.Inc()
Expand All @@ -175,12 +167,13 @@ func (page *TransferHashPage) Transfer(from uint32) (dest types.Rowid, ok bool)
v2.TransferMemoryHitCounter.Inc()
}
v2.TransferTotalHitCounter.Inc()
page.latch.Unlock()

memstart := time.Now()
var m []byte
m, ok = page.hashmap.M[from]
dest = types.Rowid(m)
if ok {
dest = types.Rowid(m)
}
memduration := time.Since(memstart)
v2.TransferDurationMemoryGauge.Set(1000 * memduration.Seconds())

Expand All @@ -205,30 +198,26 @@ func (page *TransferHashPage) Unmarshal(data []byte) error {
}

func (page *TransferHashPage) SetLocation(location objectio.Location) {
page.latch.Lock()
defer page.latch.Unlock()
page.loc = location
page.loc.Store(&location)
}

func (page *TransferHashPage) clearTable() {
page.latch.Lock()
defer page.latch.Unlock()
logutil.Infof("[TransferHashPage] clear hash table")
clear(page.hashmap.M)
page.isPersisted = true
atomic.StoreInt32(&page.isPersisted, 1)
}

func (page *TransferHashPage) loadTable() {
logutil.Infof("[TransferHashPage] load persist table, objectname: %v", page.loc.Name().String())
if page.loc == nil {
logutil.Infof("[TransferHashPage] load persist table, objectname: %v", page.loc.Load().Name().String())
if page.loc.Load() == nil {
return
}

logutil.Infof("[TransferHashPage] loc %v, rd %v, fs %v", page.loc.Name().String(), RD, FS)
logutil.Infof("[TransferHashPage] loc %v, rd %v, fs %v", page.loc.Load().Name().String(), RD, FS)

var bat *batch.Batch
var release func()
bat, release, err := RD.LoadTableByBlock(page.loc, FS)
bat, release, err := RD.LoadTableByBlock(*page.loc.Load(), FS)
defer release()
if err != nil {
logutil.Errorf("[TransferHashPage] load table failed, %v", err)
Expand All @@ -241,27 +230,23 @@ func (page *TransferHashPage) loadTable() {

v2.TransferRowHitCounter.Add(float64(len(page.hashmap.M)))

page.isPersisted = false
atomic.StoreInt32(&page.isPersisted, 0)

cl := getCleaner()
cl.addPage(page)
}

func (page *TransferHashPage) clearPersistTable() {
page.latch.Lock()
defer page.latch.Unlock()
if page.loc == nil {
if atomic.LoadInt32(&page.isPersisted) == 0 {
return
}
logutil.Infof("[TransferHashPage] clear persist table, objectname: %v", page.loc.Name().String())
FS.Delete(context.Background(), page.loc.Name().String())
page.loc = nil
atomic.StoreInt32(&page.isPersisted, 0)
logutil.Infof("[TransferHashPage] clear persist table, objectname: %v", page.loc.Load().Name().String())
FS.Delete(context.Background(), page.loc.Load().Name().String())
}

func (page *TransferHashPage) IsPersist() bool {
page.latch.Lock()
defer page.latch.Unlock()
return page.isPersisted
func (page *TransferHashPage) IsPersist() int32 {
return atomic.LoadInt32(&page.isPersisted)
}

type TransferPageCleaner struct {
Expand Down

0 comments on commit bcf0169

Please sign in to comment.