update #17

Closed
wants to merge 14 commits
175 changes: 149 additions & 26 deletions pkg/vm/engine/disttae/txn.go
@@ -39,6 +39,7 @@ import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache"
catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"go.uber.org/zap"
)
@@ -132,6 +133,11 @@ func (txn *Transaction) WriteBatch(
}
}

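// Track an approximate count of in-memory deleted rows. Deletes against
// the catalog tables (mo_database/mo_tables/mo_columns) are deliberately
// left out of the counter.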
if typ == DELETE && tableId != catalog.MO_DATABASE_ID &&
tableId != catalog.MO_TABLES_ID && tableId != catalog.MO_COLUMNS_ID {
txn.approximateInMemDeleteCnt += bat.RowCount()
}

e := Entry{
typ: typ,
accountId: accountId,
@@ -405,7 +411,8 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {

//offset < 0 indicates commit.
if offset < 0 {
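// At commit, skip the dump only while all three stay under their
// limits: workspace bytes, insert entry rows, and (new here) the
// approximate in-memory delete rows.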
if txn.workspaceSize < txn.engine.workspaceThreshold && txn.insertCount < txn.engine.insertEntryMaxCount {
if txn.workspaceSize < txn.engine.workspaceThreshold && txn.insertCount < txn.engine.insertEntryMaxCount &&
txn.approximateInMemDeleteCnt < txn.engine.insertEntryMaxCount {
return nil
}
} else {
@@ -439,19 +446,47 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
size = 0
}
txn.hasS3Op.Store(true)

if err := txn.dumpInsertBatchLocked(offset, &size, &pkCount); err != nil {
return err
}

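// On a full dump (commit), deletes are spilled to S3 only when the
// approximate counter has itself crossed the entry threshold; smaller
// delete sets remain in the workspace as in-memory tombstones.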
if dumpAll {
if txn.approximateInMemDeleteCnt >= txn.engine.insertEntryMaxCount {
if err := txn.dumpDeleteBatchLocked(offset); err != nil {
return err
}
}
txn.approximateInMemDeleteCnt = 0
txn.workspaceSize = 0
txn.pkCount -= pkCount
// modifies txn.writes.
writes := txn.writes[:0]
for i, write := range txn.writes {
if write.bat != nil {
writes = append(writes, txn.writes[i])
}
}
txn.writes = writes
} else {
txn.workspaceSize -= size
txn.pkCount -= pkCount
}
return nil
}

func (txn *Transaction) dumpInsertBatchLocked(offset int, size *uint64, pkCount *int) error {
mp := make(map[tableKey][]*batch.Batch)
lastTxnWritesIndex := offset
write := txn.writes
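// Compact txn.writes in place: catalog entries and empty batches are
// kept where they are, while insert batches staged for S3 are removed
// from the slice after being collected into mp.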
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}
@@ -465,8 +500,8 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
name: txn.writes[i].tableName,
}
bat := txn.writes[i].bat
*size += uint64(bat.Size())
*pkCount += bat.RowCount()
// skip rowid
newBat := batch.NewWithSize(len(bat.Vecs) - 1)
newBat.SetAttributes(bat.Attrs[1:])
@@ -479,11 +514,12 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
}

if keepElement {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
}
}

txn.writes = write[:lastTxnWritesIndex]

for tbKey := range mp {
// scenario 2 for cn write s3, more info in the comment of S3Writer
@@ -527,9 +563,7 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
} else {
table = tbl.(*txnTable)
}
fileName := objectio.DecodeBlockInfo(blockInfo.Vecs[0].GetBytesAt(0)).MetaLocation().Name().String()
err = table.getTxn().WriteFileLocked(
INSERT,
table.accountId,
@@ -545,21 +579,110 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
return err
}
}
return nil
}

func (txn *Transaction) dumpDeleteBatchLocked(offset int) error {
deleteCnt := 0
mp := make(map[tableKey][]*batch.Batch)
lastTxnWritesIndex := offset
write := txn.writes
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
continue
}

keepElement := true
if txn.writes[i].typ == DELETE && txn.writes[i].fileName == "" {
tbKey := tableKey{
accountId: txn.writes[i].accountId,
databaseId: txn.writes[i].databaseId,
dbName: txn.writes[i].databaseName,
name: txn.writes[i].tableName,
}
bat := txn.writes[i].bat
deleteCnt += bat.RowCount()

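// Re-wrap the delete batch without copying: the new header shares the
// underlying vectors, and the original batch is parked in toFreeBatches
// so it can be freed later by the workspace.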
newBat := batch.NewWithSize(len(bat.Vecs))
newBat.SetAttributes(bat.Attrs)
newBat.Vecs = bat.Vecs
newBat.SetRowCount(bat.Vecs[0].Length())

mp[tbKey] = append(mp[tbKey], newBat)
txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat)

keepElement = false
}

if keepElement {
write[lastTxnWritesIndex] = write[i]
lastTxnWritesIndex++
}
}

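// Second guard: proceed with the S3 spill only if enough delete rows
// were actually gathered; otherwise leave txn.writes untruncated.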
if deleteCnt < txn.engine.insertEntryMaxCount {
return nil
}

txn.writes = write[:lastTxnWritesIndex]

for tbKey := range mp {
// scenario 2 for cn write s3, more info in the comment of S3Writer
tbl, err := txn.getTable(tbKey.accountId, tbKey.dbName, tbKey.name)
if err != nil {
return err
}

s3Writer, err := colexec.NewS3TombstoneWriter()
if err != nil {
return err
}
defer s3Writer.Free(txn.proc)
for i := 0; i < len(mp[tbKey]); i++ {
s3Writer.StashBatch(txn.proc, mp[tbKey][i])
}
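// Sort the stashed tombstones and persist them as one S3 object; the
// returned stats describe the object that the new entry will reference.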
_, stats, err := s3Writer.SortAndSync(txn.proc)
if err != nil {
return err
}
bat := batch.NewWithSize(2)
bat.Attrs = []string{catalog2.ObjectAttr_ObjectStats, catalog2.AttrPKVal}
bat.SetVector(0, vector.NewVec(types.T_text.ToType()))
if err = vector.AppendBytes(
bat.GetVector(0), stats.Marshal(), false, txn.proc.GetMPool()); err != nil {
return err
}

bat.SetRowCount(bat.Vecs[0].Length())

var table *txnTable
if v, ok := tbl.(*txnTableDelegate); ok {
table = v.origin
} else {
table = tbl.(*txnTable)
}
fileName := stats.ObjectLocation().String()
err = table.getTxn().WriteFileLocked(
DELETE,
table.accountId,
table.db.databaseId,
table.tableId,
table.db.databaseName,
table.tableName,
fileName,
bat,
table.getTxn().tnStores[0],
)
if err != nil {
return err
}
}
return nil
}
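Taken together, the commit path now reduces to one predicate over three counters. Below is a minimal, self-contained sketch of that check, assuming the thresholds shown in this diff; the helper name and the example numbers are illustrative, not part of the PR:

package main

import "fmt"

// shouldDumpOnCommit mirrors the check at the top of dumpBatchLocked
// after this change: a commit keeps the workspace in memory only while
// workspace size, insert rows, and approximate delete rows are all
// under their limits. Hypothetical helper; not part of the diff.
func shouldDumpOnCommit(size, threshold uint64, insertCnt, deleteCnt, maxEntries int) bool {
	return size >= threshold || insertCnt >= maxEntries || deleteCnt >= maxEntries
}

func main() {
	// 5000 buffered delete rows against a 4096-entry limit force a dump
	// even though the byte size and insert count are still small.
	fmt.Println(shouldDumpOnCommit(1<<20, 1<<30, 100, 5000, 4096)) // true
}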
2 changes: 2 additions & 0 deletions pkg/vm/engine/disttae/types.go
@@ -249,6 +249,8 @@ type Transaction struct {
workspaceSize uint64
// the total row count for insert entries when txn commits.
insertCount int
// the approximate total row count for delete entries when txn commits.
approximateInMemDeleteCnt int
// the last snapshot write offset
snapshotWriteOffset int

127 changes: 127 additions & 0 deletions pkg/vm/engine/test/workspace_test.go
@@ -0,0 +1,127 @@
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
"context"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func Test_BigDeleteWriteS3(t *testing.T) {
var (
//err error
mp *mpool.MPool
accountId = catalog.System_Account
tableName = "test_reader_table"
databaseName = "test_reader_database"

primaryKeyIdx = 3

taeEngine *testutil.TestTxnStorage
rpcAgent *testutil.MockRPCAgent
disttaeEngine *testutil.TestDisttaeEngine
)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId)

// mock a schema with 4 columns and the 4th column as primary key
// the first column is the 9th column in the predefined columns in
// the mock function. Here we expect the type of the primary key
// to be types.T_char or types.T_varchar
schema := catalog2.MockSchemaEnhanced(4, primaryKeyIdx, 9)
schema.Name = tableName

{
opt, err := testutil.GetS3SharedFileServiceOption(ctx, testutil.GetDefaultTestPath("test", t))
require.NoError(t, err)

disttaeEngine, taeEngine, rpcAgent, mp = testutil.CreateEngines(
ctx,
testutil.TestOptions{TaeEngineOptions: opt},
t,
testutil.WithDisttaeEngineInsertEntryMaxCount(1),
testutil.WithDisttaeEngineWorkspaceThreshold(1),
)
defer func() {
disttaeEngine.Close(ctx)
taeEngine.Close(true)
rpcAgent.Close()
}()

_, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema)
require.NoError(t, err)
}

insertCnt := 150
deleteCnt := 100
var bat2 *batch.Batch

{
// insert 150 rows
_, table, txn, err := disttaeEngine.GetTable(ctx, databaseName, tableName)
require.NoError(t, err)

bat := catalog2.MockBatch(schema, insertCnt)
err = table.Write(ctx, containers.ToCNBatch(bat))
require.NoError(t, err)

txn.GetWorkspace().(*disttae.Transaction).ForEachTableWrites(
table.GetDBID(ctx), table.GetTableID(ctx), 1, func(entry disttae.Entry) {
waitedDeletes := vector.MustFixedColWithTypeCheck[types.Rowid](entry.Bat().GetVector(0))
waitedDeletes = waitedDeletes[:deleteCnt]
bat2 = batch.NewWithSize(1)
bat2.Vecs[0] = vector.NewVec(types.T_Rowid.ToType())
bat2.SetRowCount(len(waitedDeletes))
require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes, nil, mp))
})

//delete 100 rows
require.NoError(t, table.Delete(ctx, bat2, catalog.Row_ID))
require.NoError(t, txn.Commit(ctx))
}

{
_, _, reader, err := testutil.GetTableTxnReader(
ctx,
disttaeEngine,
databaseName,
tableName,
nil,
mp,
t,
)
require.NoError(t, err)

ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx)
_, err = reader.Read(ctx, ret.Attrs, nil, mp, ret)
require.NoError(t, err)
require.Equal(t, insertCnt-deleteCnt, ret.RowCount())
}
}