Skip to content

Commit

Permalink
Skip nil block if current round has increased (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
jait91 authored Jan 31, 2025
1 parent 85e68fc commit 5434e04
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 34 deletions.
29 changes: 20 additions & 9 deletions block_store/mongodb/mongodb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mongodb
import (
"context"
"fmt"
"time"

"github.com/alphabill-org/alphabill-go-base/types"
"go.mongodb.org/mongo-driver/bson"
Expand All @@ -23,23 +24,33 @@ const (
txHashesKey = "txhashes"
targetUnitsKey = "transaction.servermetadata.targetunits"
latestBlockNumberKey = "latestblocknumber"

connectTimeout = time.Minute
connectionRetries = 5
connectionRetryDelay = 5 * time.Second
)

type MongoBlockStore struct {
db *mongo.Database
}

func NewMongoBlockStore(ctx context.Context, uri string) (*MongoBlockStore, error) {
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
return nil, fmt.Errorf("failed to connect to mongo: %w", err)
}

store := &MongoBlockStore{db: client.Database(databaseName)}
if err = store.createCollections(ctx); err != nil {
return nil, err
for i := 0; ; i++ {
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri), options.Client().SetConnectTimeout(connectTimeout))
if err != nil {
if i == connectionRetries {
return nil, fmt.Errorf("failed to connect to mongo: %w", err)
}
fmt.Printf("Failed to connect to mongo, retrying after %v... err = %s\n", connectionRetryDelay, err)
time.Sleep(connectionRetryDelay)
continue
}
store := &MongoBlockStore{db: client.Database(databaseName)}
if err = store.createCollections(ctx); err != nil {
return nil, err
}
return store, nil
}
return store, nil
}

// ensureCollectionExists creates the collection if it doesn't exist
Expand Down
20 changes: 11 additions & 9 deletions blocks/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import (
"github.com/alphabill-org/alphabill-go-base/types"
)

type Store interface {
GetBlockNumber(ctx context.Context, partitionID types.PartitionID) (uint64, error)
SetBlockNumber(ctx context.Context, partitionID types.PartitionID, blockNumber uint64) error
SetTxInfo(ctx context.Context, txInfo *domain.TxInfo) error
SetBlockInfo(ctx context.Context, blockInfo *domain.BlockInfo) error
}
type (
Store interface {
GetBlockNumber(ctx context.Context, partitionID types.PartitionID) (uint64, error)
SetBlockNumber(ctx context.Context, partitionID types.PartitionID, blockNumber uint64) error
SetTxInfo(ctx context.Context, txInfo *domain.TxInfo) error
SetBlockInfo(ctx context.Context, blockInfo *domain.BlockInfo) error
}

type BlockProcessor struct {
store Store
}
BlockProcessor struct {
store Store
}
)

func NewBlockProcessor(store Store) (*BlockProcessor, error) {
return &BlockProcessor{store: store}, nil
Expand Down
37 changes: 33 additions & 4 deletions blocksync/block_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ import (
"golang.org/x/sync/errgroup"
)

type BlockLoaderFunc func(ctx context.Context, rn uint64) (*types.Block, error)
type BlockProcessorFunc func(context.Context, *types.Block, types.PartitionTypeID) error
type (
BlockLoaderFunc func(ctx context.Context, rn uint64) (*types.Block, error)
BlockProcessorFunc func(context.Context, *types.Block, types.PartitionTypeID) error
GetRoundNumberFunc func(context.Context) (uint64, error)
)

const fetchBlockRetryCount = 5

/*
Run loads blocks using "getBlocks" and processes them using "processor" until:
Expand All @@ -32,6 +37,7 @@ loaded and processed successfully.
func Run(
ctx context.Context,
getBlock BlockLoaderFunc,
getRoundNumber GetRoundNumberFunc,
startingBlockNumber,
maxBlockNumber uint64,
batchSize int,
Expand All @@ -56,7 +62,7 @@ func Run(

g.Go(func() error {
defer close(blocks)
err := fetchBlocks(ctx, getBlock, startingBlockNumber, blocks)
err := fetchBlocks(ctx, getBlock, getRoundNumber, startingBlockNumber, blocks)
if err != nil && errors.Is(err, errMaxBlockReached) {
return nil
}
Expand All @@ -70,7 +76,14 @@ func Run(
return g.Wait()
}

func fetchBlocks(ctx context.Context, getBlock BlockLoaderFunc, blockNumber uint64, out chan<- *types.Block) error {
func fetchBlocks(
ctx context.Context,
getBlock BlockLoaderFunc,
getRoundNumber GetRoundNumberFunc,
blockNumber uint64,
out chan<- *types.Block,
) error {
retries := 0
for {
if err := ctx.Err(); err != nil {
return err
Expand All @@ -87,8 +100,24 @@ func fetchBlocks(ctx context.Context, getBlock BlockLoaderFunc, blockNumber uint

out <- block
blockNumber = round + 1
retries = 0
continue
}

// if after retries the block is still nil, check if round number has increased and move on to next block
if retries >= fetchBlockRetryCount {
roundNumber, err := getRoundNumber(ctx)
if err != nil {
fmt.Println("Failed to get latest round number: ", err)
} else if roundNumber > blockNumber {
fmt.Printf("Could not get block %d after %d retries, skipping (current round = %d)\n", blockNumber, retries, roundNumber)
blockNumber++
}
retries = 0
continue
}
retries++

// we have reached to the last block the source currently has - wait a bit before asking for more
select {
case <-ctx.Done():
Expand Down
2 changes: 0 additions & 2 deletions cmd/config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
nodes:
- url: "dev-ab-money-archive.abdev1.guardtime.com/rpc"
block_number: 100
- url: "dev-ab-tokens-archive.abdev1.guardtime.com/rpc"
block_number: 100

db:
url: "mongodb://localhost:27017"
Expand Down
16 changes: 13 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {
}

func Run(ctx context.Context, config *Config) error {
println("creating bill store...")
println("creating block store...")
store, err := mongodb.NewMongoBlockStore(ctx, config.DB.URL)
if err != nil {
return fmt.Errorf("failed to get storage: %w", err)
Expand Down Expand Up @@ -115,11 +115,20 @@ func Run(ctx context.Context, config *Config) error {
}
return storedBN, nil
}

getRoundNumber := func(ctx context.Context) (uint64, error) {
info, err := stateClient.GetRoundInfo(ctx)
if err != nil {
return 0, err
}
return info.RoundNumber, nil
}

// we act as if all errors returned by block sync are recoverable ie we
// just retry in a loop until ctx is cancelled
for {
println("starting block sync")
err := runBlockSync(ctx, stateClient.GetBlock, getBlockNumber, 100,
err := runBlockSync(ctx, stateClient.GetBlock, getRoundNumber, getBlockNumber, 100,
blockProcessor.ProcessBlock, nodeInfo.PartitionID, nodeInfo.PartitionTypeID)
if err != nil {
println(fmt.Errorf("synchronizing blocks returned error: %w", err).Error())
Expand All @@ -139,6 +148,7 @@ func Run(ctx context.Context, config *Config) error {
func runBlockSync(
ctx context.Context,
getBlocks blocksync.BlockLoaderFunc,
getRoundNumber blocksync.GetRoundNumberFunc,
getBlockNumber func(ctx context.Context, partitionID types.PartitionID) (uint64, error),
batchSize int,
processor blocksync.BlockProcessorFunc,
Expand All @@ -151,5 +161,5 @@ func runBlockSync(
}
// on bootstrap storage returns 0 as current block and as block numbering
// starts from 1 by adding 1 to it we start with the first block
return blocksync.Run(ctx, getBlocks, blockNumber+1, 0, batchSize, processor, partitionTypeID)
return blocksync.Run(ctx, getBlocks, getRoundNumber, blockNumber+1, 0, batchSize, processor, partitionTypeID)
}
6 changes: 2 additions & 4 deletions restapi/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,10 @@ func (api *RestAPI) getBlocksInRange(w http.ResponseWriter, r *http.Request) {
http.Error(w, "unable to get last blocks", http.StatusBadRequest)
return
}
var response []BlockInfo
var response = []BlockInfo{}
for _, block := range lastBlocks[partitionID] {
response = append(response, blockInfoResponse(block))
}

api.rw.WriteResponse(w, response)
return
}
Expand All @@ -162,11 +161,10 @@ func (api *RestAPI) getBlocksInRange(w http.ResponseWriter, r *http.Request) {
prevBlockNumberStr := strconv.FormatUint(prevBlockNumber, 10)
setLinkHeader(r.URL, w, prevBlockNumberStr)

var response []BlockInfo
var response = []BlockInfo{}
for _, block := range blocks {
response = append(response, blockInfoResponse(block))
}

api.rw.WriteResponse(w, response)
}

Expand Down
6 changes: 3 additions & 3 deletions restapi/txs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (api *RestAPI) getTxs(w http.ResponseWriter, r *http.Request) {
return
}

var response []TxInfo
var response = []TxInfo{}
for _, txInfo := range txs {
response = append(response, TxInfo{
TxRecordHash: txInfo.TxRecordHash,
Expand Down Expand Up @@ -151,7 +151,7 @@ func (api *RestAPI) getBlockTxsByBlockNumber(w http.ResponseWriter, r *http.Requ
return
}

var response []TxInfo
var response = []TxInfo{}
for _, txInfo := range txs {
response = append(response, TxInfo{
TxRecordHash: txInfo.TxRecordHash,
Expand Down Expand Up @@ -199,7 +199,7 @@ func (api *RestAPI) getTxsByUnitID(w http.ResponseWriter, r *http.Request) {
return
}

var response []TxInfo
var response = []TxInfo{}
for _, txInfo := range txs {
response = append(response, TxInfo{
TxRecordHash: txInfo.TxRecordHash,
Expand Down

0 comments on commit 5434e04

Please sign in to comment.