Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/issue_16953' into issue_16953
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenbin1002 committed Jul 2, 2024
2 parents 0cdb6a6 + a3482c8 commit 4314192
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 36 deletions.
20 changes: 6 additions & 14 deletions pkg/vm/engine/tae/blockio/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package blockio
import (
"context"
"math"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -675,21 +675,13 @@ func FindIntervalForBlock(rowids []types.Rowid, id *types.Blockid) (start int, e
return
}

type BlockReadImpl struct{}
func init() {
model.RD = BlockReadImpl{}
}

var (
instance *BlockReadImpl
once sync.Once
)
type BlockReadImpl struct{}

func (blk *BlockReadImpl) LoadTableByBlock(loc objectio.Location, fs fileservice.FileService) (bat *batch.Batch, release func(), err error) {
func (blk BlockReadImpl) LoadTableByBlock(loc objectio.Location, fs fileservice.FileService) (bat *batch.Batch, release func(), err error) {
bat, release, err = LoadTombstoneColumns(context.Background(), []uint16{0}, nil, fs, loc, nil)
return bat, release, err
}

func NewBlockRead() *BlockReadImpl {
once.Do(func() {
instance = &BlockReadImpl{}
})
return instance
}
7 changes: 4 additions & 3 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"errors"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
"math/rand"
"reflect"
"strings"
Expand All @@ -28,6 +27,8 @@ import (
"testing"
"time"

"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"

"github.com/matrixorigin/matrixone/pkg/fileservice"

"sort"
Expand Down Expand Up @@ -9009,7 +9010,7 @@ func TestPersistTransferTable(t *testing.T) {

now := time.Now()
if model.RD == nil {
model.SetBlockRead(blockio.NewBlockRead())
panic("boom")
}
model.FS = tae.Runtime.Fs.Service
page := model.NewTransferHashPage(&id1, now, 10, false,
Expand Down Expand Up @@ -9086,7 +9087,7 @@ func TestClearPersistTransferTable(t *testing.T) {

now := time.Now()
if model.RD == nil {
model.SetBlockRead(blockio.NewBlockRead())
panic("boom")
}
model.FS = tae.Runtime.Fs.Service

Expand Down
19 changes: 6 additions & 13 deletions pkg/vm/engine/tae/model/pages.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
"bytes"
"context"
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"sync"
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
)

Expand All @@ -43,16 +43,9 @@ type BlockRead interface {
var (
RD BlockRead
FS fileservice.FileService
rdOnce sync.Once
fsOnce sync.Once
)

func SetBlockRead(rd BlockRead) {
rdOnce.Do(func() {
RD = rd
})
}

func SetFileService(fs fileservice.FileService) {
fsOnce.Do(func() {
FS = fs
Expand Down
5 changes: 2 additions & 3 deletions pkg/vm/engine/tae/tables/txnentries/flushTableTail.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ import (
"bytes"
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"io"
"time"

"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
Expand Down Expand Up @@ -138,7 +138,6 @@ func (entry *flushTableTailEntry) addTransferPages() {
}
id := entry.ablksHandles[i].Fingerprint()
entry.pageIds = append(entry.pageIds, id)
model.SetBlockRead(blockio.NewBlockRead())
model.SetFileService(entry.rt.Fs.Service)
page := model.NewTransferHashPage(id, time.Now(), len(m), isTransient)
for srcRow, dst := range m {
Expand Down
5 changes: 2 additions & 3 deletions pkg/vm/engine/tae/tables/txnentries/mergeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ package txnentries
import (
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"math"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils"
Expand Down Expand Up @@ -112,7 +112,6 @@ func (entry *mergeObjectsEntry) prepareTransferPage() {
isTransient := !tblEntry.GetLastestSchema().HasPK()
id := obj.AsCommonID()
id.SetBlockOffset(uint16(j))
model.SetBlockRead(blockio.NewBlockRead())
model.SetFileService(entry.rt.Fs.Service)
page := model.NewTransferHashPage(id, time.Now(), len(m), isTransient)
for srcRow, dst := range m {
Expand Down

0 comments on commit 4314192

Please sign in to comment.