This repository has been archived by the owner on Jan 16, 2024. It is now read-only.

Commit 3ee5f3e: revert sync by events
tclemos committed Oct 6, 2021 (1 parent: 9ce4afe)
Showing 7 changed files with 1,110 additions and 184 deletions.
18 changes: 17 additions & 1 deletion coordinator/coordinator_test.go
@@ -18,6 +18,7 @@ import (
	"github.com/hermeznetwork/hermez-node/db/historydb"
	"github.com/hermeznetwork/hermez-node/db/l2db"
	"github.com/hermeznetwork/hermez-node/db/statedb"
	"github.com/hermeznetwork/hermez-node/eth"
	"github.com/hermeznetwork/hermez-node/etherscan"
	"github.com/hermeznetwork/hermez-node/log"
	"github.com/hermeznetwork/hermez-node/synchronizer"
@@ -508,6 +509,21 @@ func TestCoordHandleMsgSyncBlock(t *testing.T) {
	closeTestModules(t, modules)
}

// ethAddTokens adds the tokens from the blocks to the blockchain
func ethAddTokens(blocks []common.BlockData, client *test.Client) {
	for _, block := range blocks {
		for _, token := range block.Rollup.AddedTokens {
			consts := eth.ERC20Consts{
				Name:     fmt.Sprintf("Token %d", token.TokenID),
				Symbol:   fmt.Sprintf("TK%d", token.TokenID),
				Decimals: 18,
			}
			// tokenConsts[token.TokenID] = consts
			client.CtlAddERC20(token.EthAddr, consts)
		}
	}
}

func TestCoordinatorStress(t *testing.T) {
	if os.Getenv("TEST_COORD_STRESS") == "" {
		return
@@ -530,7 +546,7 @@ func TestCoordinatorStress(t *testing.T) {
	wg.Add(1)
	go func() {
		for {
			blockData, _, err := syn.Sync(ctx)
			blockData, _, err := syn.Sync(ctx, nil)
			if ctx.Err() != nil {
				wg.Done()
				return
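Note: this commit only updates the Sync call sites; the Synchronizer.Sync signature itself does not appear in the diff. The sketch below is an assumption inferred from the call sites above and in node/node.go further down, not code taken from the repository; the interface name is hypothetical, and the context and common packages are the ones already imported in the files shown.

// Assumed sketch: the shape of Synchronizer.Sync implied by its call sites in
// this commit. A nil lastSavedBlock makes the synchronizer look up its own
// last saved block; discarded is non-nil only when a reorg was detected.
type syncSketch interface {
	Sync(ctx context.Context, lastSavedBlock *common.Block) (
		blockData *common.BlockData, discarded *int64, err error)
}
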
150 changes: 150 additions & 0 deletions coordinator/pipeline_test.go
@@ -2,6 +2,7 @@ package coordinator

import (
	"context"
	"fmt"
	"io/ioutil"
	"math/big"
	"os"
@@ -13,15 +14,26 @@ import (
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/hermeznetwork/hermez-node/common"
	"github.com/hermeznetwork/hermez-node/coordinator/prover"
	"github.com/hermeznetwork/hermez-node/db/historydb"
	"github.com/hermeznetwork/hermez-node/db/statedb"
	"github.com/hermeznetwork/hermez-node/eth"
	"github.com/hermeznetwork/hermez-node/etherscan"
	"github.com/hermeznetwork/hermez-node/synchronizer"
	"github.com/hermeznetwork/hermez-node/test"
	"github.com/hermeznetwork/hermez-node/test/til"
	"github.com/iden3/go-merkletree"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func newBigInt(s string) *big.Int {
	v, ok := new(big.Int).SetString(s, 10)
	if !ok {
		panic(fmt.Errorf("Can't set big.Int from %s", s))
	}
	return v
}

func TestPipelineShouldL1L2Batch(t *testing.T) {
	ethClientSetup := test.NewClientSetupExample()
	ethClientSetup.ChainID = big.NewInt(int64(chainID))
@@ -93,6 +105,144 @@ func TestPipelineShouldL1L2Batch(t *testing.T) {
	closeTestModules(t, modules)
}

const (
	testTokensLen = 3
	testUsersLen  = 4
)

func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchronizer,
	historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *til.Context {
	// Create a set with `testTokensLen` tokens and for each token
	// `testUsersLen` accounts.
	var set []til.Instruction
	// set = append(set, til.Instruction{Typ: "Blockchain"})
	for tokenID := 1; tokenID < testTokensLen; tokenID++ {
		set = append(set, til.Instruction{
			Typ:     til.TypeAddToken,
			TokenID: common.TokenID(tokenID),
		})
	}
	depositAmount, ok := new(big.Int).SetString("10225000000000000000000000000000000", 10)
	require.True(t, ok)
	for tokenID := 0; tokenID < testTokensLen; tokenID++ {
		for user := 0; user < testUsersLen; user++ {
			set = append(set, til.Instruction{
				Typ:           common.TxTypeCreateAccountDeposit,
				TokenID:       common.TokenID(tokenID),
				DepositAmount: depositAmount,
				From:          fmt.Sprintf("User%d", user),
			})
		}
	}
	set = append(set, til.Instruction{Typ: til.TypeNewBatchL1})
	set = append(set, til.Instruction{Typ: til.TypeNewBatchL1})
	set = append(set, til.Instruction{Typ: til.TypeNewBlock})

	tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx)
	blocks, err := tc.GenerateBlocksFromInstructions(set)
	require.NoError(t, err)
	require.NotNil(t, blocks)
	// Set StateRoots for batches manually (til doesn't set it)
	blocks[0].Rollup.Batches[0].Batch.StateRoot =
		newBigInt("0")
	blocks[0].Rollup.Batches[1].Batch.StateRoot =
		newBigInt("6860514559199319426609623120853503165917774887908204288119245630904770452486")

	ethAddTokens(blocks, ethClient)
	err = ethClient.CtlAddBlocks(blocks)
	require.NoError(t, err)

	ctx := context.Background()
	for {
		syncBlock, discards, err := sync.Sync(ctx, nil)
		require.NoError(t, err)
		require.Nil(t, discards)
		if syncBlock == nil {
			break
		}
	}
	dbTokens, err := historyDB.GetAllTokens()
	require.Nil(t, err)
	require.Equal(t, testTokensLen, len(dbTokens))

	dbAccounts, err := historyDB.GetAllAccounts()
	require.Nil(t, err)
	require.Equal(t, testTokensLen*testUsersLen, len(dbAccounts))

	sdbAccounts, err := stateDB.TestGetAccounts()
	require.Nil(t, err)
	require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))

	return tc
}

func TestPipelineForgeBatchWithTxs(t *testing.T) {
	ethClientSetup := test.NewClientSetupExample()
	ethClientSetup.ChainID = big.NewInt(int64(chainID))

	var timer timer
	ctx := context.Background()
	ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
	etherScanService, _ := etherscan.NewEtherscanService("", "")
	modules := newTestModules(t)
	coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules, etherScanService)
	sync := newTestSynchronizer(t, ethClient, ethClientSetup, modules)

	// preload the synchronizer (via the test ethClient) with some tokens and
	// users with positive balances
	tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB)
	syncStats := sync.Stats()
	batchNum := syncStats.Sync.LastBatch.BatchNum
	syncSCVars := sync.SCVars()

	pipeline, err := coord.newPipeline(ctx)
	require.NoError(t, err)

	// Insert some l2txs in the Pool
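	// Assumed til syntax: PoolTransfer(tokenID) Sender-Receiver: amount (feeSelector).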
	setPool := `
Type: PoolL2
PoolTransfer(0) User0-User1: 100 (126)
PoolTransfer(0) User1-User2: 200 (126)
PoolTransfer(0) User2-User3: 300 (126)
`
	l2txs, err := tilCtx.GeneratePoolL2Txs(setPool)
	require.NoError(t, err)
	for _, tx := range l2txs {
		err := modules.l2DB.AddTxTest(&tx) //nolint:gosec
		require.NoError(t, err)
	}

	err = pipeline.reset(batchNum, syncStats, syncSCVars)
	require.NoError(t, err)
	// Sanity check
	sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().TestGetAccounts()
	require.Nil(t, err)
	require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))

	// Sanity check
	sdbAccounts, err = pipeline.batchBuilder.LocalStateDB().TestGetAccounts()
	require.Nil(t, err)
	require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))

	// Sanity check
	require.Equal(t, modules.stateDB.MT.Root(),
		pipeline.batchBuilder.LocalStateDB().MT.Root())

	batchNum++

	batchInfo, _, err := pipeline.forgeBatch(batchNum)
	require.NoError(t, err)
	assert.Equal(t, 3, len(batchInfo.L2Txs))

	batchNum++
	batchInfo, _, err = pipeline.forgeBatch(batchNum)
	require.NoError(t, err)
	assert.Equal(t, 0, len(batchInfo.L2Txs))

	closeTestModules(t, modules)
}

func TestEthRollupForgeBatch(t *testing.T) {
	if os.Getenv("TEST_ROLLUP_FORGE_BATCH") == "" {
		return
67 changes: 0 additions & 67 deletions eth/ethereum.go
@@ -4,7 +4,6 @@ import (
	"context"
	"fmt"
	"math/big"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum"
@@ -43,8 +42,6 @@ type EthereumInterface interface {
	EthSuggestGasPrice(ctx context.Context) (*big.Int, error)
	EthKeyStore() *ethKeystore.KeyStore
	EthCall(ctx context.Context, tx *types.Transaction, blockNum *big.Int) ([]byte, error)

	EthNextBlockWithSCEvents(ctx context.Context, fromBlock int64, addresses []ethCommon.Address) (int64, error)
}

var (
@@ -76,8 +73,6 @@ type EthereumClient struct {
	ks     *ethKeystore.KeyStore
	config *EthereumConfig
	opts   *bind.CallOpts

	events map[int64][]types.Log
}

// NewEthereumClient creates a EthereumClient instance. The account is not mandatory (it can
@@ -102,7 +97,6 @@ func NewEthereumClient(client *ethclient.Client, account *accounts.Account,
		return nil, tracerr.Wrap(err)
	}
	c.chainID = chainID
	c.events = make(map[int64][]types.Log)
	return c, nil
}

@@ -355,64 +349,3 @@ func (c *EthereumClient) EthCall(ctx context.Context, tx *types.Transaction,
	result, err := c.client.CallContract(ctx, msg, blockNum)
	return result, tracerr.Wrap(err)
}

// EthNextBlockWithSCEvents returns the next block with events in the provided SC addresses
func (c *EthereumClient) EthNextBlockWithSCEvents(ctx context.Context, fromBlock int64, addresses []ethCommon.Address) (int64, error) {
	const blocksPerCycle int64 = 10000

	lastBlock, err := c.EthLastBlock()
	if err != nil {
		return 0, err
	}

	from := fromBlock
	to := from + blocksPerCycle

	for bn := from; bn <= to; bn++ {
		if _, ok := c.events[bn]; ok {
			delete(c.events, bn)
			return bn, nil
		}
	}

	for {
		q := ethereum.FilterQuery{
			FromBlock: big.NewInt(from),
			ToBlock:   big.NewInt(to),
			Addresses: addresses,
		}

		// query logs with filter
		logs, err := c.client.FilterLogs(ctx, q)
		if err != nil {
			return 0, err
		}

		if len(logs) > 0 {
			for _, log := range logs {
				c.events[int64(log.BlockNumber)] = append(c.events[int64(log.BlockNumber)], log)
			}

			// when we have logs, we sort the logs by block ascending and get the first one
			sort.Slice(logs, func(i, j int) bool {
				return logs[i].BlockNumber < logs[j].BlockNumber
			})

			return int64(logs[0].BlockNumber), nil
		}

		// move to the next range until the end of the chain;
		// if "to" equals lastBlock then stop searching
		if to == lastBlock {
			return lastBlock, nil
		}

		from = to
		to += blocksPerCycle
		// if "to" is greater than lastBlock, set "to" to lastBlock in order
		// to execute one last try to find a block with events
		if to > lastBlock {
			to = lastBlock
		}
	}
}
21 changes: 12 additions & 9 deletions node/node.go
@@ -834,20 +834,21 @@ func (n *Node) handleReorg(ctx context.Context, stats *synchronizer.Stats,
	return nil
}

func (n *Node) syncLoopFn(ctx context.Context) (time.Duration, error) {
	blockData, discarded, err := n.sync.Sync(ctx)
func (n *Node) syncLoopFn(ctx context.Context, lastBlock *common.Block) (*common.Block,
	time.Duration, error) {
	blockData, discarded, err := n.sync.Sync(ctx, lastBlock)
	stats := n.sync.Stats()
	if err != nil {
		// case: error
		return n.cfg.Synchronizer.SyncLoopInterval.Duration, tracerr.Wrap(err)
		return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration, tracerr.Wrap(err)
	} else if discarded != nil {
		// case: reorg
		log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
		vars := n.sync.SCVars()
		if err := n.handleReorg(ctx, stats, vars); err != nil {
			return time.Duration(0), tracerr.Wrap(err)
			return nil, time.Duration(0), tracerr.Wrap(err)
		}
		return time.Duration(0), nil
		return nil, time.Duration(0), nil
	} else if blockData != nil {
		// case: new block
		vars := common.SCVariablesPtr{
@@ -856,12 +857,12 @@ func (n *Node) syncLoopFn(ctx context.Context) (time.Duration, error) {
			WDelayer: blockData.WDelayer.Vars,
		}
		if err := n.handleNewBlock(ctx, stats, &vars, blockData.Rollup.Batches); err != nil {
			return time.Duration(0), tracerr.Wrap(err)
			return nil, time.Duration(0), tracerr.Wrap(err)
		}
		return time.Duration(0), nil
		return &blockData.Block, time.Duration(0), nil
	} else {
		// case: no block
		return n.cfg.Synchronizer.SyncLoopInterval.Duration, nil
		return lastBlock, n.cfg.Synchronizer.SyncLoopInterval.Duration, nil
	}
}

@@ -883,6 +884,7 @@ func (n *Node) StartSynchronizer() {
	n.wg.Add(1)
	go func() {
		var err error
		var lastBlock *common.Block
		waitDuration := time.Duration(0)
		for {
			select {
@@ -891,7 +893,8 @@ func (n *Node) StartSynchronizer() {
				n.wg.Done()
				return
			case <-time.After(waitDuration):
				if waitDuration, err = n.syncLoopFn(n.ctx); err != nil {
				if lastBlock, waitDuration, err = n.syncLoopFn(n.ctx,
					lastBlock); err != nil {
					if n.ctx.Err() != nil {
						continue
					}