From 27ab98ec3e91890fff87b493b3ba0b611d63b7c2 Mon Sep 17 00:00:00 2001 From: Kirill Date: Fri, 8 Nov 2024 16:41:32 +0400 Subject: [PATCH 1/2] Improve l1 migration --- blockchain/blockchain.go | 10 +++-- migration/migration.go | 84 ++++++++++++++++++++++++++++++++++------ node/node.go | 2 + 3 files changed, 82 insertions(+), 14 deletions(-) diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 4f73076af..853969419 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -375,7 +375,7 @@ func (b *Blockchain) Store(block *core.Block, blockCommitments *core.BlockCommit return err } - if err := StoreL1HandlerMsgHashes(txn, block.Transactions); err != nil { + if err := storeL1HandlerMsgHashes(txn, block.Transactions); err != nil { return err } @@ -605,10 +605,10 @@ func blockByHash(txn db.Transaction, hash *felt.Felt) (*core.Block, error) { }) } -func StoreL1HandlerMsgHashes(dbTxn db.Transaction, blockTxns []core.Transaction) error { +func storeL1HandlerMsgHashes(dbTxn db.Transaction, blockTxns []core.Transaction) error { for _, txn := range blockTxns { if l1Handler, ok := (txn).(*core.L1HandlerTransaction); ok { - err := dbTxn.Set(db.L1HandlerTxnHashByMsgHash.Key(l1Handler.MessageHash()), txn.Hash().Marshal()) + err := StoreL1(dbTxn, txn.Hash(), l1Handler.MessageHash()) if err != nil { return err } @@ -617,6 +617,10 @@ func StoreL1HandlerMsgHashes(dbTxn db.Transaction, blockTxns []core.Transaction) return nil } +func StoreL1(dbTxn db.Transaction, txHash *felt.Felt, messageHash []byte) error { + return dbTxn.Set(db.L1HandlerTxnHashByMsgHash.Key(messageHash), txHash.Marshal()) +} + func storeStateUpdate(txn db.Transaction, blockNumber uint64, update *core.StateUpdate) error { numBytes := core.MarshalBlockNumber(blockNumber) diff --git a/migration/migration.go b/migration/migration.go index 1b829eac6..7c8e7ee39 100644 --- a/migration/migration.go +++ b/migration/migration.go @@ -491,19 +491,81 @@ func calculateBlockCommitments(txn db.Transaction, network *utils.Network) error return processBlocks(txn, processBlockFunc) } -func calculateL1MsgHashes(txn db.Transaction, n *utils.Network) error { - processBlockFunc := func(blockNumber uint64, txnLock *sync.Mutex) error { - txnLock.Lock() - txns, err := blockchain.TransactionsByBlockNumber(txn, blockNumber) - txnLock.Unlock() - if err != nil { - return err +func calculateL1MsgHashes(dbTx db.Transaction, n *utils.Network) error { + numOfWorkers := runtime.GOMAXPROCS(0) + // add one more worker to the pool to handle writes to db + workerPool := pool.New().WithErrors().WithMaxGoroutines(numOfWorkers + 1) + + chainHeight, err := blockchain.ChainHeight(dbTx) + if err != nil { + if errors.Is(err, db.ErrKeyNotFound) { + return nil } - txnLock.Lock() - defer txnLock.Unlock() - return blockchain.StoreL1HandlerMsgHashes(txn, txns) + return err } - return processBlocks(txn, processBlockFunc) + + blockNumbers := make(chan uint64, 1024) //nolint:mnd + go func() { + for bNumber := range chainHeight + 1 { + blockNumbers <- bNumber + } + close(blockNumbers) + }() + + var dbMx sync.Mutex + + type hashPair struct { + txHash *felt.Felt + messageHash []byte + } + computedPairs := make(chan hashPair, 1024) + workerPool.Go(func() error { + for pair := range computedPairs { + dbMx.Lock() + if err := blockchain.StoreL1(dbTx, pair.txHash, pair.messageHash); err != nil { + return err + } + dbMx.Unlock() + } + + return nil + }) + + var wg sync.WaitGroup + wg.Add(numOfWorkers) + fmt.Println("Num of workers") + for range numOfWorkers { + fmt.Println("setup worker") + workerPool.Go(func() error { + defer wg.Done() + for bNumber := range blockNumbers { + fmt.Println("Processing block", bNumber) + dbMx.Lock() + txns, err := blockchain.TransactionsByBlockNumber(dbTx, bNumber) + if err != nil { + return err + } + dbMx.Unlock() + + for _, txn := range txns { + if l1Handler, ok := (txn).(*core.L1HandlerTransaction); ok { + computedPairs <- hashPair{ + txHash: txn.Hash(), + messageHash: l1Handler.MessageHash(), + } + } + } + + } + return nil + }) + } + go func() { + wg.Wait() + close(computedPairs) + }() + + return workerPool.Wait() } func bitset2Key(bs *bitset.BitSet) *trie.Key { diff --git a/node/node.go b/node/node.go index 30ca5b642..7f01f5773 100644 --- a/node/node.go +++ b/node/node.go @@ -368,6 +368,8 @@ func (n *Node) Run(ctx context.Context) { n.log.Errorw("Error while migrating the DB", "err", err) return } + fmt.Println("Migration completed") + return for _, s := range n.services { wg.Go(func() { From 3eed12f8184f91d1b03c9da758a5051aab23fd2e Mon Sep 17 00:00:00 2001 From: Kirill Date: Fri, 8 Nov 2024 16:43:58 +0400 Subject: [PATCH 2/2] Remove debugging lines --- migration/migration.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/migration/migration.go b/migration/migration.go index 7c8e7ee39..8b9104483 100644 --- a/migration/migration.go +++ b/migration/migration.go @@ -533,13 +533,10 @@ func calculateL1MsgHashes(dbTx db.Transaction, n *utils.Network) error { var wg sync.WaitGroup wg.Add(numOfWorkers) - fmt.Println("Num of workers") for range numOfWorkers { - fmt.Println("setup worker") workerPool.Go(func() error { defer wg.Done() for bNumber := range blockNumbers { - fmt.Println("Processing block", bNumber) dbMx.Lock() txns, err := blockchain.TransactionsByBlockNumber(dbTx, bNumber) if err != nil {