Skip to content

Commit

Permalink
transfer page gc matrixorigin#16953
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenbin1002 committed Jul 1, 2024
1 parent 0dd20e5 commit 6f3ddbf
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 125 deletions.
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e
CNMergeSched: merge.NewTaskServiceGetter(opts.TaskServiceGetter),
}
fs := objectio.NewObjectFS(opts.Fs, serviceDir)
transferTable := model.NewTransferTable[*model.TransferHashPage](db.Opts.TransferTableTTL)
transferTable := model.NewTransferTable[*model.TransferHashPage]()

switch opts.LogStoreT {
case options.LogstoreBatchStore:
Expand Down
27 changes: 5 additions & 22 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9002,11 +9002,7 @@ func TestPersistTransferTable(t *testing.T) {
tae.BindSchema(schema)
testutil.CreateRelation(t, tae.DB, "db", schema, true)

ttl := time.Minute
table := model.NewTransferTable[*model.TransferHashPage](ttl)
defer table.Close()
sid := objectio.NewSegmentid()

id1 := common.ID{BlockID: *objectio.NewBlockid(sid, 1, 0)}
id2 := common.ID{BlockID: *objectio.NewBlockid(sid, 2, 0)}

Expand All @@ -9015,10 +9011,6 @@ func TestPersistTransferTable(t *testing.T) {
model.SetBlockRead(blockio.NewBlockRead())
}
model.FS = tae.Runtime.Fs.Service
model.Cleaner = &model.TransferPageCleaner{
Pages: make(chan *model.TransferPage, 1000000),
}
go model.Cleaner.Handler()
page := model.NewTransferHashPage(&id1, now, 10, false,
model.WithTTL(time.Second),
)
Expand All @@ -9028,6 +9020,7 @@ func TestPersistTransferTable(t *testing.T) {
page.Train(uint32(i), rowID)
ids[i] = rowID
}
tae.Runtime.TransferTable.AddPage(page)

name := objectio.BuildObjectName(objectio.NewSegmentid(), 0)
var writer *blockio.BlockWriter
Expand Down Expand Up @@ -9065,6 +9058,7 @@ func TestPersistTransferTable(t *testing.T) {
page.SetLocation(location)

time.Sleep(2 * time.Second)
tae.Runtime.TransferTable.RunTTL(time.Now())
assert.True(t, page.IsPersist() == 1)
for i := 0; i < 10; i++ {
id, ok := page.Transfer(uint32(i))
Expand All @@ -9074,8 +9068,6 @@ func TestPersistTransferTable(t *testing.T) {
}

func TestClearPersistTransferTable(t *testing.T) {
duration := time.Second
model.TestDuration.Store(&duration)
ctx := context.Background()
opts := config.WithQuickScanAndCKPOpts(nil)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)
Expand All @@ -9086,9 +9078,6 @@ func TestClearPersistTransferTable(t *testing.T) {
tae.BindSchema(schema)
testutil.CreateRelation(t, tae.DB, "db", schema, true)

ttl := time.Minute
table := model.NewTransferTable[*model.TransferHashPage](ttl)
defer table.Close()
sid := objectio.NewSegmentid()

id1 := common.ID{BlockID: *objectio.NewBlockid(sid, 1, 0)}
Expand All @@ -9099,10 +9088,6 @@ func TestClearPersistTransferTable(t *testing.T) {
model.SetBlockRead(blockio.NewBlockRead())
}
model.FS = tae.Runtime.Fs.Service
model.Cleaner = &model.TransferPageCleaner{
Pages: make(chan *model.TransferPage, 1000000),
}
go model.Cleaner.Handler()

page := model.NewTransferHashPage(&id1, now, 10, false,
model.WithTTL(time.Second),
Expand Down Expand Up @@ -9151,10 +9136,8 @@ func TestClearPersistTransferTable(t *testing.T) {

page.SetLocation(location)

tae.Runtime.TransferTable.RunTTL(time.Now())
time.Sleep(2 * time.Second)
assert.True(t, page.IsPersist() == 0)
for i := 0; i < 10; i++ {
_, ok := page.Transfer(uint32(i))
assert.False(t, ok)
}
_, err = tae.Runtime.TransferTable.Pin(*page.ID())
assert.Equal(t, err, moerr.GetOkExpectedEOB())
}
74 changes: 14 additions & 60 deletions pkg/vm/engine/tae/model/pages.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,20 @@ func NewTransferHashPage(id *common.ID, ts time.Time, pageSize int, isTransient
}
page.OnZeroCB = page.Close

cl := getCleaner()
cl.addPage(page)

return page
}

// ID returns the identifier held in the page's id field.
func (page *TransferHashPage) ID() *common.ID {
	return page.id
}
// BornTS returns the timestamp held in the page's bornTS field.
func (page *TransferHashPage) BornTS() time.Time {
	return page.bornTS
}

func (page *TransferHashPage) TTL(now time.Time, ttl time.Duration) bool {
if page.isTransient {
ttl /= 2
func (page *TransferHashPage) TTL() uint8 {
if time.Now().After(page.bornTS.Add(page.params.DiskTTL)) {
return 2
}
if time.Now().After(page.bornTS.Add(page.params.TTL)) {
return 1
}
return now.After(page.bornTS.Add(ttl))
return 0
}

func (page *TransferHashPage) Close() {
Expand Down Expand Up @@ -152,13 +152,11 @@ func (page *TransferHashPage) Pin() *common.PinnedItem[*TransferHashPage] {
}
}

func (page *TransferHashPage) Clean() bool {
if time.Since(page.bornTS) > page.params.DiskTTL {
return false
func (page *TransferHashPage) Clear() {
page.ClearTable()
if time.Now().After(page.bornTS.Add(page.params.DiskTTL)) {
page.ClearPersistTable()
}

page.clearPersistTable()
return true
}

func (page *TransferHashPage) Train(from uint32, to types.Rowid) {
Expand Down Expand Up @@ -212,7 +210,7 @@ func (page *TransferHashPage) SetLocation(location objectio.Location) {
page.loc.Store(&location)
}

func (page *TransferHashPage) clearTable() {
func (page *TransferHashPage) ClearTable() {
if atomic.LoadInt32(&page.isPersisted) == 1 {
return
}
Expand Down Expand Up @@ -244,59 +242,15 @@ func (page *TransferHashPage) loadTable() {
v2.TaskMergeTransferPageSizeGauge.Add(float64(page.Length()))

atomic.StoreInt32(&page.isPersisted, 0)

cl := getCleaner()
cl.addPage(page)
}

func (page *TransferHashPage) clearPersistTable() {
if atomic.LoadInt32(&page.isPersisted) == 0 {
func (page *TransferHashPage) ClearPersistTable() {
if page.loc.Load() == nil {
return
}
atomic.StoreInt32(&page.isPersisted, 0)
FS.Delete(context.Background(), page.loc.Load().Name().String())
}

// IsPersist atomically reads the page's isPersisted flag and returns it.
// Elsewhere in this file the flag is compared against 0 and 1.
func (page *TransferHashPage) IsPersist() int32 {
	flag := atomic.LoadInt32(&page.isPersisted)
	return flag
}

// TransferPageCleaner owns the queue of pages waiting for their in-memory
// hash table to be cleared by Handler.
type TransferPageCleaner struct {
// Pages carries the entries sent by addPage and received by Handler.
Pages chan *TransferPage
}

// TransferPage is the queue entry used by TransferPageCleaner: a page together
// with the time at which it was enqueued.
type TransferPage struct {
// page is the hash page whose clearTable method Handler calls.
page *TransferHashPage
// ts is set to time.Now() by addPage; Handler measures the page's TTL from it.
ts time.Time
}

var (
// Cleaner is the package-level cleaner; getCleaner assigns it on first use.
Cleaner *TransferPageCleaner
// once guards the one-time initialization performed inside getCleaner.
once sync.Once
)

// getCleaner returns the package-level Cleaner. On the first call it creates
// the cleaner with a 1000000-entry channel and starts its Handler goroutine;
// later calls return the current value of Cleaner unchanged.
func getCleaner() *TransferPageCleaner {
	setup := func() {
		queue := make(chan *TransferPage, 1000000)
		Cleaner = &TransferPageCleaner{Pages: queue}
		go Cleaner.Handler()
	}
	once.Do(setup)
	return Cleaner
}

// addPage stamps page with the current time and sends the resulting entry on
// c.Pages. The send waits until the channel can accept the entry.
func (c *TransferPageCleaner) addPage(page *TransferHashPage) {
	entry := &TransferPage{
		page: page,
		ts:   time.Now(),
	}
	c.Pages <- entry
}

// Handler cleans up the in-memory hash table of every page sent on c.Pages.
//
// For each entry it records the current channel backlog in
// v2.TransferPagesInChannelHistogram, waits until the page's params.TTL has
// elapsed since the entry was enqueued, and then calls clearTable on the page.
// Entries are handled one at a time in arrival order, so a wait on one entry
// delays every entry queued behind it.
//
// Handler returns when c.Pages is closed and drained. A bare receive loop
// would instead keep receiving nil from a closed channel and panic on the
// first field access.
func (c *TransferPageCleaner) Handler() {
	for entry := range c.Pages {
		v2.TransferPagesInChannelHistogram.Observe(float64(len(c.Pages)))

		// Read the clock once so the decision to sleep and the sleep length
		// are derived from the same elapsed time.
		if remaining := entry.page.params.TTL - time.Since(entry.ts); remaining > 0 {
			time.Sleep(remaining)
		}
		entry.page.clearTable()
	}
}
61 changes: 19 additions & 42 deletions pkg/vm/engine/tae/model/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package model
import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"sync"
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand All @@ -27,24 +26,21 @@ import (
type PageT[T common.IRef] interface {
common.IRef
Pin() *common.PinnedItem[T]
TTL(time.Time, time.Duration) bool
TTL() uint8 // 0 skip, 1 clear memory, 2 clear disk
ID() *common.ID
Length() int
Clean() bool
Clear()
}

type TransferTable[T PageT[T]] struct {
sync.RWMutex
ttl time.Duration
pages map[common.ID]*common.PinnedItem[T]
}

func NewTransferTable[T PageT[T]](ttl time.Duration) *TransferTable[T] {
func NewTransferTable[T PageT[T]]() *TransferTable[T] {
table := &TransferTable[T]{
ttl: ttl,
pages: make(map[common.ID]*common.PinnedItem[T]),
}
go table.Clean()
return table
}

Expand All @@ -59,44 +55,46 @@ func (table *TransferTable[T]) Pin(id common.ID) (pinned *common.PinnedItem[T],
}
return
}

// Len reports how many pages the table currently holds, read under the
// table's read lock.
func (table *TransferTable[T]) Len() int {
	table.RLock()
	count := len(table.pages)
	table.RUnlock()
	return count
}
func (table *TransferTable[T]) prepareTTL(now time.Time) (items []*common.PinnedItem[T]) {

func (table *TransferTable[T]) prepareTTL() (mem, disk []*common.PinnedItem[T]) {
table.RLock()
defer table.RUnlock()
for _, page := range table.pages {
if page.Item().TTL(now, table.ttl) {
items = append(items, page)
st := page.Item().TTL()
if st == 1 {
mem = append(mem, page)
} else if st == 2 {
disk = append(disk, page)
}
}
return
}

func (table *TransferTable[T]) executeTTL(items []*common.PinnedItem[T]) {
if len(items) == 0 {
return
func (table *TransferTable[T]) executeTTL(mem, disk []*common.PinnedItem[T]) {
for _, page := range mem {
page.Val.Clear()
}

cnt := 0

table.Lock()
for _, pinned := range items {
cnt += pinned.Val.Length()
for _, pinned := range disk {
delete(table.pages, *pinned.Item().ID())
}
table.Unlock()
for _, pinned := range items {
for _, pinned := range disk {
pinned.Val.Clear()
pinned.Close()
}

}

func (table *TransferTable[T]) RunTTL(now time.Time) {
items := table.prepareTTL(now)
table.executeTTL(items)
mem, disk := table.prepareTTL()
table.executeTTL(mem, disk)
}

func (table *TransferTable[T]) AddPage(page T) (dup bool) {
Expand Down Expand Up @@ -143,24 +141,3 @@ func (table *TransferTable[T]) Close() {
}
table.pages = make(map[common.ID]*common.PinnedItem[T])
}

var (
// duration is the pause between passes of Clean when TestDuration is unset.
duration = 10 * time.Minute
// TestDuration, when it holds a non-nil pointer, replaces duration as the
// pause between passes of Clean.
TestDuration atomic.Pointer[time.Duration]
)

// Clean regularly clears the hash tables persisted on disk.
//
// It loops forever: each pass takes the table's read lock, calls Clean on
// every page in table.pages, releases the lock, and then sleeps. The sleep
// lasts *TestDuration when TestDuration holds a non-nil pointer and duration
// otherwise. Clean never returns, so it is meant to run in its own goroutine.
func (table *TransferTable[T]) Clean() {
	for {
		table.RLock()
		for _, item := range table.pages {
			item.Item().Clean()
		}
		table.RUnlock()

		// Load the override exactly once: loading it a second time for the
		// dereference could observe a nil stored in between and panic.
		interval := duration
		if override := TestDuration.Load(); override != nil {
			interval = *override
		}
		time.Sleep(interval)
	}
}
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/options/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type StorageCfg struct {

type CheckpointCfg struct {
FlushInterval time.Duration `toml:"flush-inerterval"`
TransferInterval time.Duration `toml:"transfer-interval"`
MetadataCheckInterval time.Duration `toml:"metadata-check-inerterval"`
MinCount int64 `toml:"checkpoint-min-count"`
ScanInterval time.Duration `toml:"scan-interval"`
Expand Down
3 changes: 3 additions & 0 deletions pkg/vm/engine/tae/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func (o *Options) FillDefaults(dirname string) *Options {
if o.CheckpointCfg.FlushInterval <= 0 {
o.CheckpointCfg.FlushInterval = DefaultCheckpointFlushInterval
}
if o.CheckpointCfg.TransferInterval <= 0 {
o.CheckpointCfg.TransferInterval = DefaultCheckpointTransferInterval
}
if o.CheckpointCfg.IncrementalInterval <= 0 {
o.CheckpointCfg.IncrementalInterval = DefaultCheckpointIncremetalInterval
}
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/options/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (

DefaultScannerInterval = time.Second * 5
DefaultCheckpointFlushInterval = time.Minute
DefaultCheckpointTransferInterval = time.Second * 5
DefaultCheckpointMinCount = int64(100)
DefaultCheckpointIncremetalInterval = time.Minute
DefaultCheckpointGlobalMinCount = 10
Expand Down

0 comments on commit 6f3ddbf

Please sign in to comment.