Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenbin1002 committed Sep 14, 2024
1 parent af1839a commit 632b895
Showing 1 changed file with 217 additions and 126 deletions.
343 changes: 217 additions & 126 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache"
catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -446,154 +447,244 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
}
txn.hasS3Op.Store(true)

dump := func(typ int) error {
deleteCnt := 0
mp := make(map[tableKey][]*batch.Batch)
lastTxnWritesIndex := offset
write := txn.writes
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
if err := txn.dumpInsertBatchLocked(offset, &size, &pkCount); err != nil {
return err
}

if dumpAll {
if txn.approximateInMemDeleteCnt >= txn.engine.insertEntryMaxCount {
if err := txn.dumpDeleteBatchLocked(offset); err != nil {
return err
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}
txn.approximateInMemDeleteCnt = 0
txn.workspaceSize = 0
txn.pkCount -= pkCount
// modifies txn.writes.
writes := txn.writes[:0]
for i, write := range txn.writes {
if write.bat != nil {
writes = append(writes, txn.writes[i])
}
}
txn.writes = writes
} else {
txn.workspaceSize -= size
txn.pkCount -= pkCount
}
return nil
}

keepElement := true
if txn.writes[i].typ == typ && txn.writes[i].fileName == "" {
tbKey := tableKey{
accountId: txn.writes[i].accountId,
databaseId: txn.writes[i].databaseId,
dbName: txn.writes[i].databaseName,
name: txn.writes[i].tableName,
}
bat := txn.writes[i].bat
if typ == INSERT {
size += uint64(bat.Size())
pkCount += bat.RowCount()
} else {
deleteCnt += bat.RowCount()
}
var newBat *batch.Batch
if typ == INSERT {
// skip rowid
newBat = batch.NewWithSize(len(bat.Vecs) - 1)
newBat.SetAttributes(bat.Attrs[1:])
newBat.Vecs = bat.Vecs[1:]
newBat.SetRowCount(bat.Vecs[0].Length())
} else {
newBat = batch.NewWithSize(len(bat.Vecs))
newBat.SetAttributes(bat.Attrs)
newBat.Vecs = bat.Vecs
newBat.SetRowCount(bat.Vecs[0].Length())
}
mp[tbKey] = append(mp[tbKey], newBat)
txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat)
func (txn *Transaction) dumpInsertBatchLocked(offset int, size *uint64, pkCount *int) error {
mp := make(map[tableKey][]*batch.Batch)
lastTxnWritesIndex := offset
write := txn.writes
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}

keepElement = false
keepElement := true
if txn.writes[i].typ == INSERT && txn.writes[i].fileName == "" {
tbKey := tableKey{
accountId: txn.writes[i].accountId,
databaseId: txn.writes[i].databaseId,
dbName: txn.writes[i].databaseName,
name: txn.writes[i].tableName,
}
bat := txn.writes[i].bat
*size += uint64(bat.Size())
*pkCount += bat.RowCount()
var newBat *batch.Batch

Check failure on line 505 in pkg/vm/engine/disttae/txn.go

View workflow job for this annotation

GitHub Actions / Matrixone CI / SCA Test on Ubuntu/x86

S1021: should merge variable declaration with assignment on next line (gosimple)
// skip rowid
newBat = batch.NewWithSize(len(bat.Vecs) - 1)
newBat.SetAttributes(bat.Attrs[1:])
newBat.Vecs = bat.Vecs[1:]
newBat.SetRowCount(bat.Vecs[0].Length())
mp[tbKey] = append(mp[tbKey], newBat)
txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat)

if keepElement {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
}
keepElement = false
}

if typ == DELETE && deleteCnt < txn.engine.insertEntryMaxCount {
return nil
if keepElement {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
}
}

txn.writes = write[:lastTxnWritesIndex]
txn.writes = write[:lastTxnWritesIndex]

for tbKey := range mp {
// scenario 2 for cn write s3, more info in the comment of S3Writer
tbl, err := txn.getTable(tbKey.accountId, tbKey.dbName, tbKey.name)
if err != nil {
return err
}
for tbKey := range mp {
// scenario 2 for cn write s3, more info in the comment of S3Writer
tbl, err := txn.getTable(tbKey.accountId, tbKey.dbName, tbKey.name)
if err != nil {
return err
}

tableDef := tbl.GetTableDef(txn.proc.Ctx)
tableDef := tbl.GetTableDef(txn.proc.Ctx)

s3Writer, err := colexec.NewS3Writer(tableDef, 0)
if err != nil {
return err
}
defer s3Writer.Free(txn.proc)
for i := 0; i < len(mp[tbKey]); i++ {
s3Writer.StashBatch(txn.proc, mp[tbKey][i])
}
blockInfos, stats, err := s3Writer.SortAndSync(txn.proc)
if err != nil {
return err
}
err = s3Writer.FillBlockInfoBat(blockInfos, stats, txn.proc.GetMPool())
if err != nil {
return err
}
blockInfo := s3Writer.GetBlockInfoBat()
s3Writer, err := colexec.NewS3Writer(tableDef, 0)
if err != nil {
return err
}
defer s3Writer.Free(txn.proc)
for i := 0; i < len(mp[tbKey]); i++ {
s3Writer.StashBatch(txn.proc, mp[tbKey][i])
}
blockInfos, stats, err := s3Writer.SortAndSync(txn.proc)
if err != nil {
return err
}
err = s3Writer.FillBlockInfoBat(blockInfos, stats, txn.proc.GetMPool())
if err != nil {
return err
}
blockInfo := s3Writer.GetBlockInfoBat()

lenVecs := len(blockInfo.Attrs)
// only remain the metaLoc col and object stats
for i := 0; i < lenVecs-2; i++ {
blockInfo.Vecs[i].Free(txn.proc.GetMPool())
}
blockInfo.Vecs = blockInfo.Vecs[lenVecs-2:]
blockInfo.Attrs = blockInfo.Attrs[lenVecs-2:]
blockInfo.SetRowCount(blockInfo.Vecs[0].Length())
lenVecs := len(blockInfo.Attrs)
// only remain the metaLoc col and object stats
for i := 0; i < lenVecs-2; i++ {
blockInfo.Vecs[i].Free(txn.proc.GetMPool())
}
blockInfo.Vecs = blockInfo.Vecs[lenVecs-2:]
blockInfo.Attrs = blockInfo.Attrs[lenVecs-2:]
blockInfo.SetRowCount(blockInfo.Vecs[0].Length())

var table *txnTable
if v, ok := tbl.(*txnTableDelegate); ok {
table = v.origin
} else {
table = tbl.(*txnTable)
}
fileName := objectio.DecodeBlockInfo(blockInfo.Vecs[0].GetBytesAt(0)).MetaLocation().Name().String()
err = table.getTxn().WriteFileLocked(
typ,
table.accountId,
table.db.databaseId,
table.tableId,
table.db.databaseName,
table.tableName,
fileName,
blockInfo,
table.getTxn().tnStores[0],
)
if err != nil {
return err
var table *txnTable
if v, ok := tbl.(*txnTableDelegate); ok {
table = v.origin
} else {
table = tbl.(*txnTable)
}
fileName := objectio.DecodeBlockInfo(blockInfo.Vecs[0].GetBytesAt(0)).MetaLocation().Name().String()
err = table.getTxn().WriteFileLocked(
INSERT,
table.accountId,
table.db.databaseId,
table.tableId,
table.db.databaseName,
table.tableName,
fileName,
blockInfo,
table.getTxn().tnStores[0],
)
if err != nil {
return err
}
}
return nil
}

func (txn *Transaction) dumpDeleteBatchLocked(offset int) error {
deleteCnt := 0
mp := make(map[tableKey][]*batch.Batch)
lastTxnWritesIndex := offset
write := txn.writes
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}

keepElement := true
if txn.writes[i].typ == INSERT && txn.writes[i].fileName == "" {
tbKey := tableKey{
accountId: txn.writes[i].accountId,
databaseId: txn.writes[i].databaseId,
dbName: txn.writes[i].databaseName,
name: txn.writes[i].tableName,
}
bat := txn.writes[i].bat
deleteCnt += bat.RowCount()

var newBat *batch.Batch

Check failure on line 614 in pkg/vm/engine/disttae/txn.go

View workflow job for this annotation

GitHub Actions / Matrixone CI / SCA Test on Ubuntu/x86

S1021: should merge variable declaration with assignment on next line (gosimple)
newBat = batch.NewWithSize(len(bat.Vecs))
newBat.SetAttributes(bat.Attrs)
newBat.Vecs = bat.Vecs
newBat.SetRowCount(bat.Vecs[0].Length())

mp[tbKey] = append(mp[tbKey], newBat)
txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat)

keepElement = false
}

if keepElement {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
}
return nil
}

if err := dump(INSERT); err != nil {
return err
if deleteCnt < txn.engine.insertEntryMaxCount {
return nil
}

if dumpAll {
if txn.approximateInMemDeleteCnt >= txn.engine.insertEntryMaxCount {
if err := dump(DELETE); err != nil {
return err
}
txn.writes = write[:lastTxnWritesIndex]

for tbKey := range mp {
// scenario 2 for cn write s3, more info in the comment of S3Writer
tbl, err := txn.getTable(tbKey.accountId, tbKey.dbName, tbKey.name)
if err != nil {
return err
}
txn.approximateInMemDeleteCnt = 0
txn.workspaceSize = 0
txn.pkCount -= pkCount
// modifies txn.writes.
writes := txn.writes[:0]
for i, write := range txn.writes {
if write.bat != nil {
writes = append(writes, txn.writes[i])
}

s3Writer, err := colexec.NewS3TombstoneWriter()
if err != nil {
return err
}
defer s3Writer.Free(txn.proc)
for i := 0; i < len(mp[tbKey]); i++ {
s3Writer.StashBatch(txn.proc, mp[tbKey][i])
}
_, stats, err := s3Writer.SortAndSync(txn.proc)
if err != nil {
return err
}
bat := batch.NewWithSize(2)
bat.Attrs = []string{catalog2.ObjectAttr_ObjectStats, catalog2.AttrPKVal}
bat.SetVector(0, vector.NewVec(types.T_text.ToType()))
if err = vector.AppendBytes(
bat.GetVector(0), stats.Marshal(), false, txn.proc.GetMPool()); err != nil {
return err
}

bat.SetRowCount(bat.Vecs[0].Length())

var table *txnTable
if v, ok := tbl.(*txnTableDelegate); ok {
table = v.origin
} else {
table = tbl.(*txnTable)
}
fileName := stats.ObjectLocation().String()
err = table.getTxn().WriteFileLocked(
DELETE,
table.accountId,
table.db.databaseId,
table.tableId,
table.db.databaseName,
table.tableName,
fileName,
bat,
table.getTxn().tnStores[0],
)
if err != nil {
return err
}
txn.writes = writes
} else {
txn.workspaceSize -= size
txn.pkCount -= pkCount
}
return nil
}
Expand Down Expand Up @@ -653,7 +744,7 @@ func (txn *Transaction) WriteFileLocked(
tnStore DNStore) error {
txn.hasS3Op.Store(true)
newBat := bat
if bat.Attrs[0] != catalog.BlockMeta_MetaLoc && bat.Attrs[0] != catalog.ObjectMeta_ObjectStats {
if typ == INSERT {
newBat = batch.NewWithSize(len(bat.Vecs))
newBat.SetAttributes([]string{catalog.BlockMeta_MetaLoc, catalog.ObjectMeta_ObjectStats})

Expand Down

0 comments on commit 632b895

Please sign in to comment.