Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

L1 migration #2259

Open
wants to merge 2 commits into
base: rianhughes/rpc8-getMessageStatus2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (b *Blockchain) Store(block *core.Block, blockCommitments *core.BlockCommit
return err
}

if err := StoreL1HandlerMsgHashes(txn, block.Transactions); err != nil {
if err := storeL1HandlerMsgHashes(txn, block.Transactions); err != nil {
return err
}

Expand Down Expand Up @@ -605,10 +605,10 @@ func blockByHash(txn db.Transaction, hash *felt.Felt) (*core.Block, error) {
})
}

func StoreL1HandlerMsgHashes(dbTxn db.Transaction, blockTxns []core.Transaction) error {
func storeL1HandlerMsgHashes(dbTxn db.Transaction, blockTxns []core.Transaction) error {
for _, txn := range blockTxns {
if l1Handler, ok := (txn).(*core.L1HandlerTransaction); ok {
err := dbTxn.Set(db.L1HandlerTxnHashByMsgHash.Key(l1Handler.MessageHash()), txn.Hash().Marshal())
err := StoreL1(dbTxn, txn.Hash(), l1Handler.MessageHash())
if err != nil {
return err
}
Expand All @@ -617,6 +617,10 @@ func StoreL1HandlerMsgHashes(dbTxn db.Transaction, blockTxns []core.Transaction)
return nil
}

func StoreL1(dbTxn db.Transaction, txHash *felt.Felt, messageHash []byte) error {
return dbTxn.Set(db.L1HandlerTxnHashByMsgHash.Key(messageHash), txHash.Marshal())
}

func storeStateUpdate(txn db.Transaction, blockNumber uint64, update *core.StateUpdate) error {
numBytes := core.MarshalBlockNumber(blockNumber)

Expand Down
81 changes: 70 additions & 11 deletions migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,19 +491,78 @@
return processBlocks(txn, processBlockFunc)
}

func calculateL1MsgHashes(txn db.Transaction, n *utils.Network) error {
processBlockFunc := func(blockNumber uint64, txnLock *sync.Mutex) error {
txnLock.Lock()
txns, err := blockchain.TransactionsByBlockNumber(txn, blockNumber)
txnLock.Unlock()
if err != nil {
return err
func calculateL1MsgHashes(dbTx db.Transaction, n *utils.Network) error {
numOfWorkers := runtime.GOMAXPROCS(0)
// add one more worker to the pool to handle writes to db
workerPool := pool.New().WithErrors().WithMaxGoroutines(numOfWorkers + 1)

chainHeight, err := blockchain.ChainHeight(dbTx)
if err != nil {
if errors.Is(err, db.ErrKeyNotFound) {
return nil
}
txnLock.Lock()
defer txnLock.Unlock()
return blockchain.StoreL1HandlerMsgHashes(txn, txns)
return err

Check warning on line 504 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L504

Added line #L504 was not covered by tests
}
return processBlocks(txn, processBlockFunc)

blockNumbers := make(chan uint64, 1024) //nolint:mnd
go func() {
for bNumber := range chainHeight + 1 {
blockNumbers <- bNumber
}
close(blockNumbers)
}()

var dbMx sync.Mutex

type hashPair struct {
txHash *felt.Felt
messageHash []byte
}
computedPairs := make(chan hashPair, 1024)

Check failure on line 521 in migration/migration.go

View workflow job for this annotation

GitHub Actions / lint

Magic number: 1024, in <argument> detected (mnd)
workerPool.Go(func() error {
for pair := range computedPairs {
dbMx.Lock()
if err := blockchain.StoreL1(dbTx, pair.txHash, pair.messageHash); err != nil {
return err
}

Check warning on line 527 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L526-L527

Added lines #L526 - L527 were not covered by tests
dbMx.Unlock()
}

return nil
})

var wg sync.WaitGroup
wg.Add(numOfWorkers)
for range numOfWorkers {
workerPool.Go(func() error {
defer wg.Done()
for bNumber := range blockNumbers {
dbMx.Lock()
txns, err := blockchain.TransactionsByBlockNumber(dbTx, bNumber)
if err != nil {
return err
}

Check warning on line 544 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L543-L544

Added lines #L543 - L544 were not covered by tests
dbMx.Unlock()

for _, txn := range txns {
if l1Handler, ok := (txn).(*core.L1HandlerTransaction); ok {
computedPairs <- hashPair{
txHash: txn.Hash(),
messageHash: l1Handler.MessageHash(),
}
}
}

}

Check failure on line 556 in migration/migration.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
return nil
})
}
go func() {
wg.Wait()
close(computedPairs)
}()

return workerPool.Wait()
}

func bitset2Key(bs *bitset.BitSet) *trie.Key {
Expand Down
2 changes: 2 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,10 @@
n.log.Errorw("Error while migrating the DB", "err", err)
return
}
fmt.Println("Migration completed")
return

Check warning on line 372 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L371-L372

Added lines #L371 - L372 were not covered by tests

for _, s := range n.services {

Check failure on line 374 in node/node.go

View workflow job for this annotation

GitHub Actions / lint

unreachable: unreachable code (govet)
wg.Go(func() {
// Immediately acknowledge panicing services by shutting down the node
// Without the deffered cancel(), we would have to wait for user to hit Ctrl+C
Expand Down
Loading