Local fs #5

Closed
wants to merge 7 commits
Binary file added 018f236b-5193-7ffd-8d48-6f11b26cfe51_00000
Binary file not shown.
248 changes: 248 additions & 0 deletions cmd/object-read/main.go
@@ -0,0 +1,248 @@
package main

import (
"context"
"fmt"
"strings"
"time"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/util/toml"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
)

func TestDecode3() (err error) {
    ctx := context.Background()
    name := "018f236b-5193-7ffd-8d48-6f11b26cfe51_00000"
    fsDir := "/data2/hanfeng/matrixone"
    mp, _ := mpool.NewMPool("test", 0, mpool.NoFixed)
    memCache := toml.ByteSize(1 << 20)
    c := fileservice.Config{
        Name:    defines.LocalFileServiceName,
        Backend: "DISK",
        DataDir: fsDir,
        Cache: fileservice.CacheConfig{
            MemoryCapacity: &memCache,
        },
    }

    service, err := fileservice.NewFileService(ctx, c, nil)
    if err != nil {
        return
    }

    uid, _ := types.ParseUuid("018f236b-5193-7ffd-8d48-6f11b26cfe51")
    sid := objectio.Segmentid(uid)
    objectName := objectio.BuildObjectName(&sid, 0)

    reader, err := blockio.NewFileReader(service, name)
    if err != nil {
        return
    }

    objmeta, err := reader.GetObjectReader().ReadAllMeta(ctx, mp)
    if err != nil {
        return
    }

    meta := objmeta.MustDataMeta()
    stat := fmt.Sprintf("%v blks, %v rows", meta.BlockCount(), meta.BlockHeader().Rows())
    extend := reader.GetObjectReader().GetMetaExtent()
    rows := int(meta.BlockHeader().Rows())
    infos := make([]objectio.BlockInfo, 0, meta.BlockCount())
    for i := 0; i < int(meta.BlockCount()); i++ {
        blockid := objectio.BuildObjectBlockid(objectName, uint16(i))
        brows := 8192
        if rows < 8192 {
            brows = rows
        }
        rows -= 8192
        loc := objectio.BuildLocation(objectName, *extend, uint32(brows), uint16(i))
        info := objectio.BlockInfo{
            BlockID:    *blockid,
            SegmentID:  objectName.SegmentId(),
            EntryState: false,
            Sorted:     true,
            MetaLoc:    objectio.ObjectLocation(loc),
        }
        infos = append(infos, info)
    }
    typs := []types.Type{
        types.T_uuid.ToType(),
        types.T_uuid.ToType(),
        types.T_uuid.ToType(),
        types.T_varchar.ToType(),
        types.T_int64.ToType(),
        types.T_varchar.ToType(),            // user
        types.T_varchar.ToType(),            // host
        types.T_varchar.ToType(),            // database
        types.T_text.ToType(),               // stmt
        types.T_text.ToType(),               // stmt tag
        types.T_text.ToType(),               // stmt fg
        types.T_uuid.ToType(),
        types.T_varchar.ToType(),            // nodetype
        types.T_datetime.ToTypeWithScale(6), // request-at
        types.T_datetime.ToTypeWithScale(6), //
        types.T_uint64.ToType(),
        types.T_varchar.ToType(),
        types.T_text.ToType(),               // error
        types.T_text.ToType(),               // exec-plan
        types.T_int64.ToType(),
        types.T_int64.ToType(),
        types.T_text.ToType(),
        types.T_varchar.ToType(),
        types.T_varchar.ToType(),
        types.T_text.ToType(),               // source type
        types.T_int64.ToType(),
        types.T_int64.ToType(),              // result-cnt
    }
    cols := []uint16{}
    for i := 0; i < 27; i++ {
        // if i == 18 {
        //     continue
        // }
        cols = append(cols, uint16(i))
    }
    // logutil.Infof("cols %v, len(typs) %v, len(cols) %v", cols, len(typs), len(cols))

    read := func() {
        batches := []*batch.Batch{}
        inst := time.Now()
        for _, info := range infos {
            ins := time.Now()
            var bat *batch.Batch
            bat, err = blockio.BlockRead(
                ctx,
                &info,
                nil,
                cols,
                typs,
                types.BuildTS(1714286188916223690, 0).ToTimestamp(),
                nil,
                nil,
                nil,
                service,
                mp,
                nil,
                fileservice.Policy(0))
            if err != nil {
                return
            }
            fmt.Printf(" read blk batch cost %v\n----\n", time.Since(ins))

            [CI check failure on line 138, Matrixone CI / SCA Test on Ubuntu/x86: unsafe log fmt.Printf in github.com/matrixorigin/matrixone/cmd/object-read.TestDecode3()]

            batches = append(batches, bat)
        }
        logutil.Infof("%s: read %v batch cost %v", stat, len(batches), time.Since(inst))
    }

    read()

    return
}

func TestNewObjectReader2() {
    ctx := context.Background()
    name := "018e0838-be27-72c5-a166-aa6f120859ad_00000"

    fsDir := "/Users/aptend/code/matrixone"
    c := fileservice.Config{
        Name:    defines.LocalFileServiceName,
        Backend: "DISK",
        DataDir: fsDir,
    }
    service, err := fileservice.NewFileService(ctx, c, nil)
    if err != nil {
        return
    }
    reader, err := blockio.NewFileReader(service, name)
    if err != nil {
        return
    }
    // load the meta here to inspect the concrete contents
    bats, clear, err := reader.LoadAllColumns(ctx, []uint16{0, 1, 13}, common.DefaultAllocator)
    if err != nil {
        logutil.Infof("load all columns failed: %v", err)
        return
    }
    defer clear()
    /*name1, err := EncodeNameFromString(reader.GetName())
    assert.Nil(t, err)
    location := objectio.BuildLocation(name1, *reader.GetObjectReader().GetMetaExtent(), 51, 1)
    _, err = blockio.LoadTombstoneColumns(context.Background(), []uint16{0}, nil, service, location, nil)*/
    // applyDelete(bats[0], bb)
    // zm, err := reader.LoadZoneMaps(ctx, []uint16{0, 1, objectio.SEQNUM_COMMITTS}, 0, nil)
    // logutil.Infof("zm is %v-%v", zm[0].GetMax(), zm[0].GetMin())
    // bf, w, _ := reader.LoadOneBF(ctx, 0)
    // logutil.Infof("bf is %v, w is %v, err is %v", bf.String(), w, err)
    ts := types.TS{}
    for _, bat := range bats {
        for _, vec := range bat.Vecs {
            logutil.Infof("vec is %v", vec.GetType())
        }
        for i := 2600; i < bat.Vecs[0].Length(); i++ {
            // ts.Unmarshal(bats[0].Vecs[1].GetRawBytesAt(i))
            num := types.DecodeInt32(bat.Vecs[0].GetRawBytesAt(i))
            num1 := types.DecodeInt32(bat.Vecs[1].GetRawBytesAt(i))
            ts.Unmarshal(bat.Vecs[2].GetRawBytesAt(i))
            logutil.Infof("line %v: num is %d-%d, commit is %v", i, num, num1, ts.ToString())
        }
        // logutil.Infof("bats[0].Vecs[1].String() is %v", bat.Vecs[2].String())
    }
}

func TestNewObjectReader1() {
    ctx := context.Background()
    name := "018e3c0c-6ead-775a-8e65-faef3d690efa_00000"

    fsDir := "/Users/aptend/code/matrixone"
    c := fileservice.Config{
        Name:    defines.LocalFileServiceName,
        Backend: "DISK",
        DataDir: fsDir,
    }
    service, err := fileservice.NewFileService(ctx, c, nil)
    if err != nil {
        return
    }
    reader, err := blockio.NewFileReader(service, name)
    if err != nil {
        return
    }
    // dedicated deltaloc: read rowid, ts and pk
    bats, clear, err := reader.LoadDeleteAllColumns(ctx, []uint16{0, 1, 2}, common.DefaultAllocator)
    if err != nil {
        logutil.Infof("load all columns failed: %v", err)
        return
    }
    defer clear()

    for _, bat := range bats {
        for _, vec := range bat.Vecs {
            logutil.Infof("vec is %v, len %v", vec.GetType(), vec.Length())
        }
        for i := 0; i < bat.Vecs[0].Length(); i++ {
            // for i := 0; i < 10; i++ {
            rowid := types.Rowid(bat.Vecs[0].GetRawBytesAt(i))
            committs := types.TS(bat.Vecs[1].GetRawBytesAt(i))
            t, _, _, _ := types.DecodeTuple(bat.Vecs[2].GetRawBytesAt(i))
            pkstr := t.ErrString(nil)
            if strings.HasPrefix(rowid.String(), "018e3c0c-3ed7-737b-befe-dbe524aac512-0-2") {
                fmt.Printf("line %v: rowid %v, ts %v, pkstr is %v\n", i, rowid.String(), committs.ToString(), pkstr)

                [CI check failure on line 233, Matrixone CI / SCA Test on Ubuntu/x86: unsafe log fmt.Printf in github.com/matrixorigin/matrixone/cmd/object-read.TestNewObjectReader1()]
            }
            // logutil.Infof("line %v: num is %v-%v, pkstr is %v", i, rowid.String(), committs.ToString(), pkstr)
        }
    }
}

func main() {
    // TestNewObjectReader2()
    // TestNewObjectReader1()
    err := TestDecode3()
    if err != nil {
        fmt.Println(err)

        [CI check failure on line 246, Matrixone CI / SCA Test on Ubuntu/x86: unsafe log fmt.Println in github.com/matrixorigin/matrixone/cmd/object-read.main()]
    }
}
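
Aside: the only non-obvious arithmetic in TestDecode3 above is how the object's total row count is split across its blocks. Below is a self-contained sketch of just that rule (plain Go, no MatrixOne imports; splitRows is a hypothetical name used for illustration only):

package main

import "fmt"

// splitRows mirrors the loop in TestDecode3: every block is assigned at
// most 8192 rows, and the final block receives whatever remainder is left.
func splitRows(totalRows, blockCount int) []int {
    perBlock := make([]int, 0, blockCount)
    rows := totalRows
    for i := 0; i < blockCount; i++ {
        brows := 8192
        if rows < 8192 {
            brows = rows
        }
        rows -= 8192
        perBlock = append(perBlock, brows)
    }
    return perBlock
}

func main() {
    fmt.Println(splitRows(20000, 3)) // prints [8192 8192 3616]
}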
20 changes: 20 additions & 0 deletions cmd/object-read/z.py
@@ -0,0 +1,20 @@
def sort_lines_by_ts(filename):
    # Assumed input: lines printed by TestNewObjectReader1 above, e.g.
    #   line 3: rowid <uuid>-0-2-42, ts 1714286188916223690-0, pkstr is (...)
    # i.e. the second ", "-separated field looks like "ts <physical>-<logical>".
    with open(filename, 'r') as file:
        lines = file.read().splitlines()

    # Split each line into components and convert the ts value to an integer
    lines = [line.split(", ") for line in lines]
    for line in lines:
        line[1] = line[1].split(" ")
        line[1][1] = int(line[1][1].split("-")[0])

    # Sort the lines by the ts value
    lines = sorted(lines, key=lambda line: line[1][1])

    # Join the components back into a single string.
    # Note: the "-<logical>" suffix of the ts field is dropped on rewrite.
    lines = [", ".join(part[0] + " " + str(part[1]) if isinstance(part, list) else part
                       for part in line) for line in lines]

    with open(filename, 'w') as file:
        file.write("\n".join(lines))


sort_lines_by_ts('lines')
11 changes: 8 additions & 3 deletions pkg/fileservice/file_with_checksum.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ func NewFileWithChecksum[T FileLike](
     return &FileWithChecksum[T]{
         ctx:              ctx,
         underlying:       underlying,
-        blockSize:        blockContentSize + _ChecksumSize,
+        blockSize:        blockContentSize,
         blockContentSize: blockContentSize,
         perfCounterSets:  perfCounterSets,
     }
@@ -93,6 +93,11 @@ var emptyFileWithChecksumOSFile FileWithChecksum[*os.File]
 var _ FileLike = new(FileWithChecksum[*os.File])
 
 func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error) {
+    n, err = f.underlying.ReadAt(buf, offset)
+    if err != nil && err != io.EOF {
+        return 0, err
+    }
+    return
     defer func() {
         perfcounter.Update(f.ctx, func(c *perfcounter.CounterSet) {
             c.FileService.FileWithChecksum.Read.Add(int64(n))
@@ -209,8 +214,8 @@ func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error) {
         return 0, err
     }
 
-    nBlock := ceilingDiv(fileSize, int64(f.blockSize))
-    contentSize := fileSize - _ChecksumSize*nBlock
+    // nBlock := ceilingDiv(fileSize, int64(f.blockSize))
+    contentSize := fileSize // - _ChecksumSize*nBlock
 
     switch whence {
     case io.SeekStart:
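
Context for this file's hunks: with checksums enabled, every on-disk block carries a _ChecksumSize prefix, so a file's logical content size is smaller than its physical size; the changes above bypass that accounting and treat the file as raw content. A minimal sketch of the two size calculations (the constants here are assumed stand-ins for illustration, not MatrixOne's actual values):

package main

import "fmt"

const (
    checksumSize     = 8    // assumption: stands in for _ChecksumSize
    blockContentSize = 2048 // assumption: stands in for the block content size
)

func ceilingDiv(a, b int64) int64 { return (a + b - 1) / b }

// contentSizeWithChecksum is the original accounting: each on-disk block
// is prefixed by a checksum, so logical content is smaller than the file.
func contentSizeWithChecksum(fileSize int64) int64 {
    blockSize := int64(blockContentSize + checksumSize)
    nBlock := ceilingDiv(fileSize, blockSize)
    return fileSize - checksumSize*nBlock
}

// contentSizeRaw is what this PR switches to: no checksum blocks, so the
// logical size equals the physical file size.
func contentSizeRaw(fileSize int64) int64 {
    return fileSize
}

func main() {
    fmt.Println(contentSizeWithChecksum(4112)) // 4112 - 8*2 = 4096
    fmt.Println(contentSizeRaw(4096))          // 4096
}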
32 changes: 26 additions & 6 deletions pkg/fileservice/local_fs.go
@@ -18,6 +18,7 @@
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
@@ -426,6 +427,18 @@
 }
 
 func (l *LocalFS) read(ctx context.Context, vector *IOVector, bytesCounter *atomic.Int64) (err error) {
+    inst1 := time.Now()
+    var cost, exec_plan_cost time.Duration
+    defer func() {
+        total := time.Since(inst1)
+        fmt.Printf(
[CI check failure on line 434, Matrixone CI / SCA Test on Ubuntu/x86: unsafe log fmt.Printf in github.com/matrixorigin/matrixone/pkg/fileservice.LocalFS.read()]
+            "underlying read cost %v, decompress %.2f%%(%v where exec_plan took %.2f%%)\n",
+            total,
+            float64(cost)/float64(total)*100,
+            cost,
+            float64(exec_plan_cost)/float64(cost)*100,
+        )
+    }()
     if vector.allDone() {
         // all cache hit
         return nil
@@ -596,9 +609,16 @@
             }
         }
 
+        inst := time.Now()
         if err = entry.setCachedData(); err != nil {
             return err
         }
+        setCachedCost := time.Since(inst)
+        if i == 18 {
+            exec_plan_cost = setCachedCost
+        }
+        cost += setCachedCost
+        // logutil.Infof("tocachedata %d cost %v", i, setCachedCost)
 
         vector.Entries[i] = entry
 
@@ -650,8 +670,8 @@
         return nil, err
     }
     fileSize := info.Size()
-    nBlock := ceilingDiv(fileSize, _BlockSize)
-    contentSize := fileSize - _ChecksumSize*nBlock
+    // nBlock := ceilingDiv(fileSize, _BlockSize)
+    // contentSize := fileSize - _ChecksumSize*nBlock
 
     isDir, err := entryIsDir(nativePath, name, info)
     if err != nil {
@@ -660,7 +680,7 @@
     ret = append(ret, DirEntry{
         Name:  name,
         IsDir: isDir,
-        Size:  contentSize,
+        Size:  fileSize,
     })
 }

@@ -702,13 +722,13 @@
     }
 
     fileSize := stat.Size()
-    nBlock := ceilingDiv(fileSize, _BlockSize)
-    contentSize := fileSize - _ChecksumSize*nBlock
+    // nBlock := ceilingDiv(fileSize, _BlockSize)
+    // contentSize := fileSize - _ChecksumSize*nBlock
 
     return &DirEntry{
         Name:  pathpkg.Base(filePath),
         IsDir: false,
-        Size:  contentSize,
+        Size:  fileSize,
     }, nil
 }

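
The instrumentation added to LocalFS.read above reduces to a single ratio report: decompression (setCachedData) as a share of the whole read, and the exec-plan column (entry 18) as a share of decompression. A standalone sketch of just that reporting (reportReadCost is a hypothetical helper mirroring the deferred fmt.Printf):

package main

import (
    "fmt"
    "time"
)

// reportReadCost formats the same breakdown as the deferred Printf in
// LocalFS.read: decompress as a percentage of the total read, and the
// exec-plan entry as a percentage of the decompress cost.
func reportReadCost(total, decompress, execPlan time.Duration) string {
    return fmt.Sprintf(
        "underlying read cost %v, decompress %.2f%%(%v where exec_plan took %.2f%%)",
        total,
        float64(decompress)/float64(total)*100,
        decompress,
        float64(execPlan)/float64(decompress)*100,
    )
}

func main() {
    // 40ms of a 100ms read spent decompressing; 10ms of that on the exec plan.
    fmt.Println(reportReadCost(100*time.Millisecond, 40*time.Millisecond, 10*time.Millisecond))
    // underlying read cost 100ms, decompress 40.00%(40ms where exec_plan took 25.00%)
}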
3 changes: 2 additions & 1 deletion pkg/objectio/funcs.go
@@ -291,8 +291,9 @@ func ReadAllBlocksWithMeta(
     for blk := uint32(0); blk < meta.BlockCount(); blk++ {
         for _, seqnum := range cols {
             blkmeta := meta.GetBlockMeta(blk)
-            if seqnum > blkmeta.GetMaxSeqnum() || blkmeta.ColumnMeta(seqnum).DataType() == 0 {
+            if blkmeta.ColumnMeta(seqnum).DataType() == 0 {
                 // prefetch, do not generate
+                logutil.Infof("inputseq:%v maxnormalseq:%v colcnt:%v", seqnum, blkmeta.GetMaxSeqnum(), blkmeta.GetColumnCount())
                 panic("ReadAllBlocksWithMeta expect no schema changes")
             }
             col := blkmeta.ColumnMeta(seqnum)