From 5434e0439bd77ef42f1be9d7bf3e300abcee1adf Mon Sep 17 00:00:00 2001 From: Johannes Ait <32078609+jait91@users.noreply.github.com> Date: Fri, 31 Jan 2025 11:55:03 +0200 Subject: [PATCH] Skip nil block if current round has increased (#12) --- block_store/mongodb/mongodb_store.go | 29 +++++++++++++++------- blocks/block_processor.go | 20 ++++++++------- blocksync/block_sync.go | 37 +++++++++++++++++++++++++--- cmd/config.yaml | 2 -- cmd/main.go | 16 +++++++++--- restapi/blocks.go | 6 ++--- restapi/txs.go | 6 ++--- 7 files changed, 82 insertions(+), 34 deletions(-) diff --git a/block_store/mongodb/mongodb_store.go b/block_store/mongodb/mongodb_store.go index df98023..307288e 100644 --- a/block_store/mongodb/mongodb_store.go +++ b/block_store/mongodb/mongodb_store.go @@ -3,6 +3,7 @@ package mongodb import ( "context" "fmt" + "time" "github.com/alphabill-org/alphabill-go-base/types" "go.mongodb.org/mongo-driver/bson" @@ -23,6 +24,10 @@ const ( txHashesKey = "txhashes" targetUnitsKey = "transaction.servermetadata.targetunits" latestBlockNumberKey = "latestblocknumber" + + connectTimeout = time.Minute + connectionRetries = 5 + connectionRetryDelay = 5 * time.Second ) type MongoBlockStore struct { @@ -30,16 +35,22 @@ type MongoBlockStore struct { } func NewMongoBlockStore(ctx context.Context, uri string) (*MongoBlockStore, error) { - client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, fmt.Errorf("failed to connect to mongo: %w", err) - } - - store := &MongoBlockStore{db: client.Database(databaseName)} - if err = store.createCollections(ctx); err != nil { - return nil, err + for i := 0; ; i++ { + client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri), options.Client().SetConnectTimeout(connectTimeout)) + if err != nil { + if i == connectionRetries { + return nil, fmt.Errorf("failed to connect to mongo: %w", err) + } + fmt.Printf("Failed to connect to mongo, retrying after %v... err = %s\n", connectionRetryDelay, err) + time.Sleep(connectionRetryDelay) + continue + } + store := &MongoBlockStore{db: client.Database(databaseName)} + if err = store.createCollections(ctx); err != nil { + return nil, err + } + return store, nil } - return store, nil } // ensureCollectionExists creates the collection if it doesn't exist diff --git a/blocks/block_processor.go b/blocks/block_processor.go index f0eacc1..92d5925 100644 --- a/blocks/block_processor.go +++ b/blocks/block_processor.go @@ -8,16 +8,18 @@ import ( "github.com/alphabill-org/alphabill-go-base/types" ) -type Store interface { - GetBlockNumber(ctx context.Context, partitionID types.PartitionID) (uint64, error) - SetBlockNumber(ctx context.Context, partitionID types.PartitionID, blockNumber uint64) error - SetTxInfo(ctx context.Context, txInfo *domain.TxInfo) error - SetBlockInfo(ctx context.Context, blockInfo *domain.BlockInfo) error -} +type ( + Store interface { + GetBlockNumber(ctx context.Context, partitionID types.PartitionID) (uint64, error) + SetBlockNumber(ctx context.Context, partitionID types.PartitionID, blockNumber uint64) error + SetTxInfo(ctx context.Context, txInfo *domain.TxInfo) error + SetBlockInfo(ctx context.Context, blockInfo *domain.BlockInfo) error + } -type BlockProcessor struct { - store Store -} + BlockProcessor struct { + store Store + } +) func NewBlockProcessor(store Store) (*BlockProcessor, error) { return &BlockProcessor{store: store}, nil diff --git a/blocksync/block_sync.go b/blocksync/block_sync.go index 5ae72fc..9c3e7a0 100644 --- a/blocksync/block_sync.go +++ b/blocksync/block_sync.go @@ -11,8 +11,13 @@ import ( "golang.org/x/sync/errgroup" ) -type BlockLoaderFunc func(ctx context.Context, rn uint64) (*types.Block, error) -type BlockProcessorFunc func(context.Context, *types.Block, types.PartitionTypeID) error +type ( + BlockLoaderFunc func(ctx context.Context, rn uint64) (*types.Block, error) + BlockProcessorFunc func(context.Context, *types.Block, types.PartitionTypeID) error + GetRoundNumberFunc func(context.Context) (uint64, error) +) + +const fetchBlockRetryCount = 5 /* Run loads blocks using "getBlocks" and processes them using "processor" until: @@ -32,6 +37,7 @@ loaded and processed successfully. func Run( ctx context.Context, getBlock BlockLoaderFunc, + getRoundNumber GetRoundNumberFunc, startingBlockNumber, maxBlockNumber uint64, batchSize int, @@ -56,7 +62,7 @@ func Run( g.Go(func() error { defer close(blocks) - err := fetchBlocks(ctx, getBlock, startingBlockNumber, blocks) + err := fetchBlocks(ctx, getBlock, getRoundNumber, startingBlockNumber, blocks) if err != nil && errors.Is(err, errMaxBlockReached) { return nil } @@ -70,7 +76,14 @@ func Run( return g.Wait() } -func fetchBlocks(ctx context.Context, getBlock BlockLoaderFunc, blockNumber uint64, out chan<- *types.Block) error { +func fetchBlocks( + ctx context.Context, + getBlock BlockLoaderFunc, + getRoundNumber GetRoundNumberFunc, + blockNumber uint64, + out chan<- *types.Block, +) error { + retries := 0 for { if err := ctx.Err(); err != nil { return err @@ -87,8 +100,24 @@ func fetchBlocks(ctx context.Context, getBlock BlockLoaderFunc, blockNumber uint out <- block blockNumber = round + 1 + retries = 0 continue } + + // if after retries the block is still nil, check if round number has increased and move on to next block + if retries >= fetchBlockRetryCount { + roundNumber, err := getRoundNumber(ctx) + if err != nil { + fmt.Println("Failed to get latest round number: ", err) + } else if roundNumber > blockNumber { + fmt.Printf("Could not get block %d after %d retries, skipping (current round = %d)\n", blockNumber, retries, roundNumber) + blockNumber++ + } + retries = 0 + continue + } + retries++ + // we have reached to the last block the source currently has - wait a bit before asking for more select { case <-ctx.Done(): diff --git a/cmd/config.yaml b/cmd/config.yaml index f5b290c..e7413a8 100644 --- a/cmd/config.yaml +++ b/cmd/config.yaml @@ -1,8 +1,6 @@ nodes: - url: "dev-ab-money-archive.abdev1.guardtime.com/rpc" block_number: 100 - - url: "dev-ab-tokens-archive.abdev1.guardtime.com/rpc" - block_number: 100 db: url: "mongodb://localhost:27017" diff --git a/cmd/main.go b/cmd/main.go index a1d1479..194eb6a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -42,7 +42,7 @@ func main() { } func Run(ctx context.Context, config *Config) error { - println("creating bill store...") + println("creating block store...") store, err := mongodb.NewMongoBlockStore(ctx, config.DB.URL) if err != nil { return fmt.Errorf("failed to get storage: %w", err) @@ -115,11 +115,20 @@ func Run(ctx context.Context, config *Config) error { } return storedBN, nil } + + getRoundNumber := func(ctx context.Context) (uint64, error) { + info, err := stateClient.GetRoundInfo(ctx) + if err != nil { + return 0, err + } + return info.RoundNumber, nil + } + // we act as if all errors returned by block sync are recoverable ie we // just retry in a loop until ctx is cancelled for { println("starting block sync") - err := runBlockSync(ctx, stateClient.GetBlock, getBlockNumber, 100, + err := runBlockSync(ctx, stateClient.GetBlock, getRoundNumber, getBlockNumber, 100, blockProcessor.ProcessBlock, nodeInfo.PartitionID, nodeInfo.PartitionTypeID) if err != nil { println(fmt.Errorf("synchronizing blocks returned error: %w", err).Error()) @@ -139,6 +148,7 @@ func Run(ctx context.Context, config *Config) error { func runBlockSync( ctx context.Context, getBlocks blocksync.BlockLoaderFunc, + getRoundNumber blocksync.GetRoundNumberFunc, getBlockNumber func(ctx context.Context, partitionID types.PartitionID) (uint64, error), batchSize int, processor blocksync.BlockProcessorFunc, @@ -151,5 +161,5 @@ func runBlockSync( } // on bootstrap storage returns 0 as current block and as block numbering // starts from 1 by adding 1 to it we start with the first block - return blocksync.Run(ctx, getBlocks, blockNumber+1, 0, batchSize, processor, partitionTypeID) + return blocksync.Run(ctx, getBlocks, getRoundNumber, blockNumber+1, 0, batchSize, processor, partitionTypeID) } diff --git a/restapi/blocks.go b/restapi/blocks.go index e8a5ce7..0ace71e 100644 --- a/restapi/blocks.go +++ b/restapi/blocks.go @@ -144,11 +144,10 @@ func (api *RestAPI) getBlocksInRange(w http.ResponseWriter, r *http.Request) { http.Error(w, "unable to get last blocks", http.StatusBadRequest) return } - var response []BlockInfo + var response = []BlockInfo{} for _, block := range lastBlocks[partitionID] { response = append(response, blockInfoResponse(block)) } - api.rw.WriteResponse(w, response) return } @@ -162,11 +161,10 @@ func (api *RestAPI) getBlocksInRange(w http.ResponseWriter, r *http.Request) { prevBlockNumberStr := strconv.FormatUint(prevBlockNumber, 10) setLinkHeader(r.URL, w, prevBlockNumberStr) - var response []BlockInfo + var response = []BlockInfo{} for _, block := range blocks { response = append(response, blockInfoResponse(block)) } - api.rw.WriteResponse(w, response) } diff --git a/restapi/txs.go b/restapi/txs.go index 0cda856..0d41d48 100644 --- a/restapi/txs.go +++ b/restapi/txs.go @@ -91,7 +91,7 @@ func (api *RestAPI) getTxs(w http.ResponseWriter, r *http.Request) { return } - var response []TxInfo + var response = []TxInfo{} for _, txInfo := range txs { response = append(response, TxInfo{ TxRecordHash: txInfo.TxRecordHash, @@ -151,7 +151,7 @@ func (api *RestAPI) getBlockTxsByBlockNumber(w http.ResponseWriter, r *http.Requ return } - var response []TxInfo + var response = []TxInfo{} for _, txInfo := range txs { response = append(response, TxInfo{ TxRecordHash: txInfo.TxRecordHash, @@ -199,7 +199,7 @@ func (api *RestAPI) getTxsByUnitID(w http.ResponseWriter, r *http.Request) { return } - var response []TxInfo + var response = []TxInfo{} for _, txInfo := range txs { response = append(response, TxInfo{ TxRecordHash: txInfo.TxRecordHash,