diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index 4ff74afcbe7e..e949049a058a 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -17,7 +17,6 @@ package blockio import ( "context" "math" - "sync" "time" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -31,6 +30,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/timestamp" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model" "go.uber.org/zap" ) @@ -675,21 +675,13 @@ func FindIntervalForBlock(rowids []types.Rowid, id *types.Blockid) (start int, e return } -type BlockReadImpl struct{} +func init() { + model.RD = BlockReadImpl{} +} -var ( - instance *BlockReadImpl - once sync.Once -) +type BlockReadImpl struct{} -func (blk *BlockReadImpl) LoadTableByBlock(loc objectio.Location, fs fileservice.FileService) (bat *batch.Batch, release func(), err error) { +func (blk BlockReadImpl) LoadTableByBlock(loc objectio.Location, fs fileservice.FileService) (bat *batch.Batch, release func(), err error) { bat, release, err = LoadTombstoneColumns(context.Background(), []uint16{0}, nil, fs, loc, nil) return bat, release, err } - -func NewBlockRead() *BlockReadImpl { - once.Do(func() { - instance = &BlockReadImpl{} - }) - return instance -} diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 34760333f1c3..7554691b6d42 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -19,7 +19,6 @@ import ( "context" "errors" "fmt" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model" "math/rand" "reflect" "strings" @@ -28,6 +27,8 @@ import ( "testing" "time" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model" + "github.com/matrixorigin/matrixone/pkg/fileservice" "sort" @@ -9009,7 +9010,7 @@ func TestPersistTransferTable(t *testing.T) { now := time.Now() if model.RD == nil { - model.SetBlockRead(blockio.NewBlockRead()) + panic("boom") } model.FS = tae.Runtime.Fs.Service page := model.NewTransferHashPage(&id1, now, 10, false, @@ -9086,7 +9087,7 @@ func TestClearPersistTransferTable(t *testing.T) { now := time.Now() if model.RD == nil { - model.SetBlockRead(blockio.NewBlockRead()) + panic("boom") } model.FS = tae.Runtime.Fs.Service diff --git a/pkg/vm/engine/tae/model/pages.go b/pkg/vm/engine/tae/model/pages.go index e2e0e6d59d48..2303b927839a 100644 --- a/pkg/vm/engine/tae/model/pages.go +++ b/pkg/vm/engine/tae/model/pages.go @@ -18,18 +18,18 @@ import ( "bytes" "context" "fmt" - "github.com/gogo/protobuf/proto" - "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/fileservice" - "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/pb/api" - v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "sync" "sync/atomic" "time" + "github.com/gogo/protobuf/proto" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/pb/api" + v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" ) @@ -43,16 +43,9 @@ type BlockRead interface { var ( RD BlockRead FS fileservice.FileService - rdOnce sync.Once fsOnce sync.Once ) -func SetBlockRead(rd BlockRead) { - rdOnce.Do(func() { - RD = rd - }) -} - func SetFileService(fs fileservice.FileService) { fsOnce.Do(func() { FS = fs diff --git a/pkg/vm/engine/tae/tables/txnentries/flushTableTail.go b/pkg/vm/engine/tae/tables/txnentries/flushTableTail.go index 12d455b97d5f..6b9adf71c3fd 100644 --- a/pkg/vm/engine/tae/tables/txnentries/flushTableTail.go +++ b/pkg/vm/engine/tae/tables/txnentries/flushTableTail.go @@ -18,17 +18,17 @@ import ( "bytes" "context" "fmt" - "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "io" "time" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" @@ -138,7 +138,6 @@ func (entry *flushTableTailEntry) addTransferPages() { } id := entry.ablksHandles[i].Fingerprint() entry.pageIds = append(entry.pageIds, id) - model.SetBlockRead(blockio.NewBlockRead()) model.SetFileService(entry.rt.Fs.Service) page := model.NewTransferHashPage(id, time.Now(), len(m), isTransient) for srcRow, dst := range m { diff --git a/pkg/vm/engine/tae/tables/txnentries/mergeobjects.go b/pkg/vm/engine/tae/tables/txnentries/mergeobjects.go index bc113d1c5b72..93f62eca0ed4 100644 --- a/pkg/vm/engine/tae/tables/txnentries/mergeobjects.go +++ b/pkg/vm/engine/tae/tables/txnentries/mergeobjects.go @@ -17,18 +17,18 @@ package txnentries import ( "context" "fmt" - "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "math" "sync" "time" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" @@ -112,7 +112,6 @@ func (entry *mergeObjectsEntry) prepareTransferPage() { isTransient := !tblEntry.GetLastestSchema().HasPK() id := obj.AsCommonID() id.SetBlockOffset(uint16(j)) - model.SetBlockRead(blockio.NewBlockRead()) model.SetFileService(entry.rt.Fs.Service) page := model.NewTransferHashPage(id, time.Now(), len(m), isTransient) for srcRow, dst := range m {