Skip to content

Commit

Permalink
transfer table delete page matrixorigin#16953
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenbin1002 committed Jul 2, 2024
1 parent 73d6a59 commit 0cdb6a6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/model/pages.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (page *TransferHashPage) Pin() *common.PinnedItem[*TransferHashPage] {
}
}

func (page *TransferHashPage) Clear() {
func (page *TransferHashPage) Clear(clearDisk bool) {
page.ClearTable()
if time.Now().After(page.bornTS.Add(page.params.DiskTTL)) {
if clearDisk {
page.ClearPersistTable()
}
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/vm/engine/tae/model/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ type PageT[T common.IRef] interface {
TTL() uint8 // 0 skip, 1 clear memory, 2 clear disk
ID() *common.ID
Length() int
Clear()
Clear(clearDisk bool)
}

type TransferTable[T PageT[T]] struct {
sync.RWMutex
pages map[common.ID]*common.PinnedItem[T]
pages map[common.ID]*common.PinnedItem[T]
deletedPages []*common.PinnedItem[T]
}

func NewTransferTable[T PageT[T]]() *TransferTable[T] {
Expand Down Expand Up @@ -63,7 +64,6 @@ func (table *TransferTable[T]) Len() int {

func (table *TransferTable[T]) prepareTTL() (mem, disk []*common.PinnedItem[T]) {
table.RLock()
defer table.RUnlock()
for _, page := range table.pages {
st := page.Item().TTL()
if st == 1 {
Expand All @@ -72,12 +72,19 @@ func (table *TransferTable[T]) prepareTTL() (mem, disk []*common.PinnedItem[T])
disk = append(disk, page)
}
}
for _, page := range table.deletedPages {
disk = append(disk, page)
}
table.RUnlock()
table.Lock()
defer table.Unlock()
table.deletedPages = make([]*common.PinnedItem[T], 0)
return
}

func (table *TransferTable[T]) executeTTL(mem, disk []*common.PinnedItem[T]) {
for _, page := range mem {
page.Val.Clear()
page.Val.Clear(false)
}

table.Lock()
Expand All @@ -86,7 +93,7 @@ func (table *TransferTable[T]) executeTTL(mem, disk []*common.PinnedItem[T]) {
}
table.Unlock()
for _, pinned := range disk {
pinned.Val.Clear()
pinned.Val.Clear(true)
pinned.Close()
}
}
Expand Down Expand Up @@ -124,6 +131,7 @@ func (table *TransferTable[T]) DeletePage(id *common.ID) (deleted bool) {
if _, deleted = table.pages[*id]; !deleted {
return
}
table.deletedPages = append(table.deletedPages, table.pages[*id])
delete(table.pages, *id)

// to pass ut
Expand Down

0 comments on commit 0cdb6a6

Please sign in to comment.