Skip to content

Commit

Permalink
TestRecounstructionOfBtreeIndex passed. But internal state reload on …
Browse files Browse the repository at this point in the history
…SamehadaDB class is not worked yet and reloading table catalog with the class may broken.
  • Loading branch information
ryogrid committed Dec 21, 2024
1 parent 57cce7c commit 10fffde
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 0 deletions.
229 changes: 229 additions & 0 deletions lib/execution/executors/executor_test/btree_index_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ import (
"fmt"
"github.com/ryogrid/SamehadaDB/lib/catalog"
"github.com/ryogrid/SamehadaDB/lib/common"
"github.com/ryogrid/SamehadaDB/lib/execution/executors"
"github.com/ryogrid/SamehadaDB/lib/execution/expression"
"github.com/ryogrid/SamehadaDB/lib/execution/plans"
"github.com/ryogrid/SamehadaDB/lib/recovery/log_recovery"
"github.com/ryogrid/SamehadaDB/lib/samehada"
"github.com/ryogrid/SamehadaDB/lib/storage/index"
"github.com/ryogrid/SamehadaDB/lib/storage/index/index_constants"
"github.com/ryogrid/SamehadaDB/lib/storage/table/column"
"github.com/ryogrid/SamehadaDB/lib/storage/table/schema"
testingpkg "github.com/ryogrid/SamehadaDB/lib/testing/testing_assert"
"github.com/ryogrid/SamehadaDB/lib/testing/testing_pattern_fw"
"github.com/ryogrid/SamehadaDB/lib/types"
"os"
"testing"
Expand Down Expand Up @@ -187,3 +193,226 @@ func TestKeyDuplicateBTreePrallelTxnStrideVarchar(t *testing.T) {
//}
testBTreeParallelTxnStrideRoot[string](t, types.Varchar)
}

func TestRecounstructionOfBtreeIndex(t *testing.T) {
common.TempSuppressOnMemStorageMutex.Lock()
common.TempSuppressOnMemStorage = true

if !common.EnableOnMemStorage || common.TempSuppressOnMemStorage {
os.Remove(t.Name() + ".db")
os.Remove(t.Name() + ".log")
}

//sh := samehada.NewSamehadaDB(t.Name(), 10*1024)
//shi := sh.GetSamehadaInstance()
shi := samehada.NewSamehadaInstance(t.Name(), common.BufferPoolMaxFrameNumForTest)
shi.GetLogManager().ActivateLogging()
testingpkg.Assert(t, shi.GetLogManager().IsEnabledLogging(), "")
fmt.Println("System logging is active.")

txn_mgr := shi.GetTransactionManager()
bpm := shi.GetBufferPoolManager()
txn := txn_mgr.Begin(nil)

c := catalog.BootstrapCatalog(bpm, shi.GetLogManager(), shi.GetLockManager(), txn)
columnA := column.NewColumn("a", types.Integer, true, index_constants.INDEX_KIND_BTREE, types.PageID(-1), nil)
columnB := column.NewColumn("b", types.Integer, true, index_constants.INDEX_KIND_BTREE, types.PageID(-1), nil)
columnC := column.NewColumn("c", types.Varchar, true, index_constants.INDEX_KIND_BTREE, types.PageID(-1), nil)
schema_ := schema.NewSchema([]*column.Column{columnA, columnB, columnC})

tableMetadata := c.CreateTable("test_1", schema_, txn)

row1 := make([]types.Value, 0)
row1 = append(row1, types.NewInteger(20))
row1 = append(row1, types.NewInteger(22))
row1 = append(row1, types.NewVarchar("foo"))

row2 := make([]types.Value, 0)
row2 = append(row2, types.NewInteger(99))
row2 = append(row2, types.NewInteger(55))
row2 = append(row2, types.NewVarchar("bar"))

row3 := make([]types.Value, 0)
row3 = append(row3, types.NewInteger(1225))
row3 = append(row3, types.NewInteger(712))
row3 = append(row3, types.NewVarchar("baz"))

row4 := make([]types.Value, 0)
row4 = append(row4, types.NewInteger(1225))
row4 = append(row4, types.NewInteger(712))
row4 = append(row4, types.NewVarchar("baz"))

rows := make([][]types.Value, 0)
rows = append(rows, row1)
rows = append(rows, row2)
rows = append(rows, row3)
rows = append(rows, row4)

insertPlanNode := plans.NewInsertPlanNode(rows, tableMetadata.OID())

executionEngine := &executors.ExecutionEngine{}
executorContext := executors.NewExecutorContext(c, bpm, txn)
executionEngine.Execute(insertPlanNode, executorContext)

//bpm.FlushAllPages()
txn_mgr.Commit(nil, txn)

txn = shi.GetTransactionManager().Begin(nil)

cases := []testing_pattern_fw.IndexPointScanTestCase{{
"select a ... WHERE b = 55",
executionEngine,
executorContext,
tableMetadata,
[]testing_pattern_fw.Column{{"a", types.Integer}},
testing_pattern_fw.Predicate{"b", expression.Equal, 55},
[]testing_pattern_fw.Assertion{{"a", 99}},
1,
}, {
"select b ... WHERE b = 55",
executionEngine,
executorContext,
tableMetadata,
[]testing_pattern_fw.Column{{"b", types.Integer}},
testing_pattern_fw.Predicate{"b", expression.Equal, 55},
[]testing_pattern_fw.Assertion{{"b", 55}},
1,
}, {
"select a, b ... WHERE a = 20",
executionEngine,
executorContext,
tableMetadata,
[]testing_pattern_fw.Column{{"a", types.Integer}, {"b", types.Integer}},
testing_pattern_fw.Predicate{"a", expression.Equal, 20},
[]testing_pattern_fw.Assertion{{"a", 20}, {"b", 22}},
1,
}, {
"select a, b ... WHERE a = 99",
executionEngine,
executorContext,
tableMetadata,
[]testing_pattern_fw.Column{{"a", types.Integer}, {"b", types.Integer}},
testing_pattern_fw.Predicate{"a", expression.Equal, 99},
[]testing_pattern_fw.Assertion{{"a", 99}, {"b", 55}},
1,
}}

for _, test := range cases {
t.Run(test.Description, func(t *testing.T) {
testing_pattern_fw.ExecuteIndexPointScanTestCase(t, test, index_constants.INDEX_KIND_BTREE)
})
}

shi.GetTransactionManager().Commit(nil, txn)

// serialize internal page mapping entries of B-tree container to pages
allTables := c.GetAllTables()
for _, table := range allTables {
for _, index_ := range table.Indexes() {
if index_ != nil {
switch index_.(type) {
case *index.BTreeIndex:
// serialize internal page mapping entries to pages
index_.(*index.BTreeIndex).WriteOutContainerStateToBPM()
default:
//do nothing
}
}
}
}

shi.Shutdown(samehada.ShutdownPatternCloseFiles)
//sh.Shutdown()

// ----------- check recovery includes index data ----------

// recovery catalog data and tuple datas

//sh = samehada.NewSamehadaDB(t.Name(), 10*1024)
//shi = sh.GetSamehadaInstance()
//c = sh.GetCatalogForTesting()
//tableMetadata = c.GetTableByName("test_1")

shi = samehada.NewSamehadaInstance(t.Name(), common.BufferPoolMaxFrameNumForTest)
shi.GetLogManager().DeactivateLogging()
txn = shi.GetTransactionManager().Begin(nil)
txn.SetIsRecoveryPhase(true)

log_recovery := log_recovery.NewLogRecovery(
shi.GetDiskManager(),
shi.GetBufferPoolManager(),
shi.GetLogManager())
greatestLSN, _, _ := log_recovery.Redo(txn)
log_recovery.Undo(txn)

dman := shi.GetDiskManager()
dman.GCLogFile()
shi.GetLogManager().SetNextLSN(greatestLSN + 1)
c = catalog.RecoveryCatalogFromCatalogPage(shi.GetBufferPoolManager(), shi.GetLogManager(), shi.GetLockManager(), txn, true)

// reconstruct all index data of all column
tableMetadata = c.GetTableByName("test_1")
//samehada.ReconstructAllIndexData(c, dman, txn)
shi.GetTransactionManager().Commit(nil, txn)

// checking reconstruction result of index data by getting tuples using index
txn = shi.GetTransactionManager().Begin(nil)

txn.SetIsRecoveryPhase(false)
shi.GetLogManager().ActivateLogging()
testingpkg.Assert(t, shi.GetLogManager().IsEnabledLogging(), "")
fmt.Println("System logging is active.")

executionEngine = &executors.ExecutionEngine{}
executorContext = executors.NewExecutorContext(c, shi.GetBufferPoolManager(), txn)

cases = []testing_pattern_fw.IndexPointScanTestCase{{
"select a ... WHERE b = 55",
executionEngine,
executorContext,
tableMetadata,
[]testing_pattern_fw.Column{{"a", types.Integer}},
testing_pattern_fw.Predicate{"b", expression.Equal, 55},
[]testing_pattern_fw.Assertion{{"a", 99}},
1,
}, {
"select b ... WHERE b = 55",
executionEngine,
executorContext,
tableMetadata,
[]testing_pattern_fw.Column{{"b", types.Integer}},
testing_pattern_fw.Predicate{"b", expression.Equal, 55},
[]testing_pattern_fw.Assertion{{"b", 55}},
1,
}, {
"select a, b ... WHERE a = 20",
executionEngine,
executorContext,
tableMetadata,
[]testing_pattern_fw.Column{{"a", types.Integer}, {"b", types.Integer}},
testing_pattern_fw.Predicate{"a", expression.Equal, 20},
[]testing_pattern_fw.Assertion{{"a", 20}, {"b", 22}},
1,
}, {
"select a, b ... WHERE a = 99",
executionEngine,
executorContext,
tableMetadata,
[]testing_pattern_fw.Column{{"a", types.Integer}, {"b", types.Integer}},
testing_pattern_fw.Predicate{"a", expression.Equal, 99},
[]testing_pattern_fw.Assertion{{"a", 99}, {"b", 55}},
1,
}}

for _, test := range cases {
t.Run(test.Description, func(t *testing.T) {
testing_pattern_fw.ExecuteIndexPointScanTestCase(t, test, index_constants.INDEX_KIND_BTREE)
})
}

shi.GetTransactionManager().Commit(nil, txn)

common.TempSuppressOnMemStorage = false
shi.Shutdown(samehada.ShutdownPatternCloseFiles)
common.TempSuppressOnMemStorageMutex.Unlock()
}
1 change: 1 addition & 0 deletions lib/planner/simple_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func (pner *SimplePlanner) MakeCreateTablePlan() (error, plans.Plan) {
columns := make([]*column.Column, 0)
for _, cdefExp := range pner.qi.ColDefExpressions_ {
columns = append(columns, column.NewColumn(*cdefExp.ColName_, *cdefExp.ColType_, true, index_constants.INDEX_KIND_SKIP_LIST, types.PageID(-1), nil))
//columns = append(columns, column.NewColumn(*cdefExp.ColName_, *cdefExp.ColType_, true, index_constants.INDEX_KIND_BTREE, types.PageID(-1), nil))
}
schema_ := schema.NewSchema(columns)

Expand Down
11 changes: 11 additions & 0 deletions lib/samehada/samehada.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ type reqResult struct {
callerCh *chan *reqResult
}

// return internal object (for testing)
func (sdb *SamehadaDB) GetSamehadaInstance() *SamehadaInstance {
return sdb.shi_
}

func reconstructIndexDataOfATbl(t *catalog.TableMetadata, c *catalog.Catalog, dman disk.DiskManager, txn *access.Transaction) {
executionEngine := &executors.ExecutionEngine{}
executorContext := executors.NewExecutorContext(c, t.Table().GetBufferPoolManager(), txn)
Expand Down Expand Up @@ -289,10 +294,16 @@ func (sdb *SamehadaDB) ShutdownForTescase() {
sdb.shi_.GetCheckpointManager().StopCheckpointTh()
sdb.statistics_updator.StopStatsUpdateTh()
sdb.request_manager.StopTh()
sdb.finalizeIndexesInternalState()
sdb.shi_.CloseFilesForTesting()
}

func (sdb *SamehadaDB) ForceCheckpointingForTestcase() {
sdb.shi_.GetCheckpointManager().BeginCheckpoint()
sdb.shi_.GetCheckpointManager().EndCheckpoint()
}

// for internal unit testing
func (sdb *SamehadaDB) GetCatalogForTesting() *catalog.Catalog {
return sdb.catalog_
}

0 comments on commit 10fffde

Please sign in to comment.