Skip to content

Commit

Permalink
Merge pull request #1578 from orbs-network/feature/sync_desc_order_sup
Browse files Browse the repository at this point in the history
refactoring fixes and tests
  • Loading branch information
gadcl authored Jul 5, 2020
2 parents 2bd2505 + 5ea6b90 commit 132a85b
Show file tree
Hide file tree
Showing 19 changed files with 539 additions and 207 deletions.
1 change: 0 additions & 1 deletion bootstrap/inmemory/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func (n *Node) Destroy() {
n.nodeLogic = nil
}

// TODO: Gad change to accommodate out of order
func (n *Node) ExtractBlocks() ([]*protocol.BlockPairContainer, error) {

lastBlock, err := n.blockPersistence.GetLastBlock()
Expand Down
8 changes: 4 additions & 4 deletions config/system_factory_presets.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ func ForE2E(

cfg.SetUint32(VIRTUAL_CHAIN_ID, uint32(virtualChainId))

// 2*slow_network_latency + avg_network_latency + 2*execution_time = 700ms
cfg.SetDuration(BENCHMARK_CONSENSUS_RETRY_INTERVAL, 700*time.Millisecond)
cfg.SetDuration(LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, 700*time.Millisecond)
// should be longer than tx_empty_block_time
cfg.SetDuration(LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, 2000*time.Millisecond)
cfg.SetBool(LEAN_HELIX_SHOW_DEBUG, true)
cfg.SetActiveConsensusAlgo(activeConsensusAlgo)
cfg.SetBenchmarkConsensusConstantLeader(constantConsensusLeader)

// 4*LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, if below TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS we'll constantly have syncs
// longer than tx_empty_block_time and consensus round time
cfg.SetDuration(BLOCK_SYNC_NO_COMMIT_INTERVAL, 3*time.Second)

// 1MB blocks, 1KB per tx
Expand All @@ -166,7 +166,7 @@ func ForE2E(
cfg.SetDuration(BLOCK_TRACKER_GRACE_TIMEOUT, 1*time.Second)

// if above round time, we'll have leader changes when no traffic
cfg.SetDuration(TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS, 3*time.Second) // this is the time between empty blocks when no transactions, need to be large so we don't close infinite blocks on idle
cfg.SetDuration(TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS, 1*time.Second) // this is the time between empty blocks when no transactions, need to be large so we don't close infinite blocks on idle

// makes sync slower, 4*slow_network_latency
cfg.SetDuration(BLOCK_SYNC_COLLECT_RESPONSE_TIMEOUT, 500*time.Millisecond)
Expand Down
23 changes: 11 additions & 12 deletions services/blockstorage/adapter/filesystem/file_system_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,19 @@ func (f *BlockPersistence) WriteNextBlock(blockPair *protocol.BlockPairContainer

bh := getBlockHeight(blockPair)

syncState := f.bhIndex.getSyncState()
if err := f.bhIndex.validateCandidateBlockHeight(bh); err != nil {
return false, syncState.InOrderHeight, nil
return false, f.bhIndex.getLastBlockHeight(), nil
}

n, err := f.blockWriter.writeBlock(blockPair)
if err != nil {
return false, syncState.InOrderHeight, err
return false, f.bhIndex.getLastBlockHeight(), err
}

startPos := f.bhIndex.fetchNextOffset()
err = f.bhIndex.appendBlock(startPos+int64(n), blockPair, f.blockTracker)
if err != nil {
return false, syncState.InOrderHeight, errors.Wrap(err, "failed to update index after writing block")
return false, f.bhIndex.getLastBlockHeight(), errors.Wrap(err, "failed to update index after writing block")
}

f.metrics.sizeOnDisk.Add(int64(n))
Expand All @@ -275,9 +274,9 @@ func (f *BlockPersistence) WriteNextBlock(blockPair *protocol.BlockPairContainer

func (f *BlockPersistence) ScanBlocks(from primitives.BlockHeight, pageSize uint8, cursor adapter.CursorFunc) error {

inOrderHeight := f.bhIndex.getLastBlockHeight()
if (inOrderHeight < from) || from == 0 {
return fmt.Errorf("requested unsupported block height %d. Supported range for scan is determined by inOrder(%d)", from, inOrderHeight)
sequentialHeight := f.bhIndex.getLastBlockHeight()
if (sequentialHeight < from) || from == 0 {
return fmt.Errorf("requested unsupported block height %d. Supported range for scan is determined by sequence top height (%d)", from, sequentialHeight)
}

file, err := os.Open(f.blockFileName())
Expand All @@ -289,13 +288,13 @@ func (f *BlockPersistence) ScanBlocks(from primitives.BlockHeight, pageSize uint
fromHeight := from
wantsMore := true
eof := false
for fromHeight <= inOrderHeight && wantsMore && !eof {
for fromHeight <= sequentialHeight && wantsMore && !eof {
toHeight := fromHeight + primitives.BlockHeight(pageSize) - 1
if toHeight > inOrderHeight {
toHeight = inOrderHeight
if toHeight > sequentialHeight {
toHeight = sequentialHeight
}
page := make([]*protocol.BlockPairContainer, 0, pageSize)
// TODO: Gad allow update of inOrder inside page
// TODO: Gad allow update of sequence height inside page
for height := fromHeight; height <= toHeight; height++ {
aBlock, err := f.fetchBlockFromFile(height, file)
if err != nil {
Expand All @@ -310,7 +309,7 @@ func (f *BlockPersistence) ScanBlocks(from primitives.BlockHeight, pageSize uint
if len(page) > 0 {
wantsMore = cursor(page[0].ResultsBlock.Header.BlockHeight(), page)
}
inOrderHeight = f.bhIndex.getLastBlockHeight()
sequentialHeight = f.bhIndex.getLastBlockHeight()
fromHeight = toHeight + 1
}

Expand Down
50 changes: 25 additions & 25 deletions services/blockstorage/adapter/filesystem/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ type blockHeightIndex struct {
heightOffset map[primitives.BlockHeight]int64
firstBlockInTsBucket map[uint32]primitives.BlockHeight
nextOffset int64
inOrderBlock *protocol.BlockPairContainer
sequentialTopBlock *protocol.BlockPairContainer
topBlock *protocol.BlockPairContainer
lastSyncedHeight primitives.BlockHeight
lastWrittenHeight primitives.BlockHeight
logger log.Logger
}

Expand All @@ -33,9 +33,9 @@ func newBlockHeightIndex(logger log.Logger, firstBlockOffset int64) *blockHeight
heightOffset: map[primitives.BlockHeight]int64{},
firstBlockInTsBucket: map[uint32]primitives.BlockHeight{},
nextOffset: firstBlockOffset,
inOrderBlock: nil,
sequentialTopBlock: nil,
topBlock: nil,
lastSyncedHeight: 0,
lastWrittenHeight: 0,
}
}

Expand All @@ -44,16 +44,16 @@ func (i *blockHeightIndex) getSyncState() internodesync.SyncState {
defer i.RUnlock()
return internodesync.SyncState{
TopHeight: getBlockHeight(i.topBlock),
InOrderHeight: getBlockHeight(i.inOrderBlock),
LastSyncedHeight: i.lastSyncedHeight,
InOrderHeight: getBlockHeight(i.sequentialTopBlock),
LastSyncedHeight: i.lastWrittenHeight,
}
}

func (i *blockHeightIndex) getLastSyncedHeight() primitives.BlockHeight {
func (i *blockHeightIndex) getLastWrittenHeight() primitives.BlockHeight {
i.RLock()
defer i.RUnlock()

return i.lastSyncedHeight
return i.lastWrittenHeight
}

func (i *blockHeightIndex) fetchNextOffset() int64 {
Expand All @@ -78,10 +78,10 @@ func (i *blockHeightIndex) getEarliestTxBlockInBucketForTsRange(rangeStart primi

fromBucket := blockTsBucketKey(rangeStart)
toBucket := blockTsBucketKey(rangeEnd)
inOrderHeight := getBlockHeight(i.inOrderBlock)
sequentialHeight := getBlockHeight(i.sequentialTopBlock)
for b := fromBucket; b <= toBucket; b++ {
blockHeight, exists := i.firstBlockInTsBucket[b]
if blockHeight > inOrderHeight {
if blockHeight > sequentialHeight {
return 0, false
} else if exists {
return blockHeight, true
Expand All @@ -96,13 +96,13 @@ func (i *blockHeightIndex) validateCandidateBlockHeight(candidateBlockHeight pri
defer i.RUnlock()

topHeight := getBlockHeight(i.topBlock)
inOrderHeight := getBlockHeight(i.inOrderBlock)
sequentialHeight := getBlockHeight(i.sequentialTopBlock)

if i.lastSyncedHeight > inOrderHeight && candidateBlockHeight != i.lastSyncedHeight-1 {
err = fmt.Errorf("sync session in progress, expected block height %d", i.lastSyncedHeight-1)
if i.lastWrittenHeight > sequentialHeight && candidateBlockHeight != i.lastWrittenHeight-1 {
err = fmt.Errorf("sync session in progress, expected block height %d", i.lastWrittenHeight-1)

} else if inOrderHeight == topHeight && candidateBlockHeight <= inOrderHeight {
err = fmt.Errorf("expected block height higher than current top %d", inOrderHeight)
} else if sequentialHeight == topHeight && candidateBlockHeight <= sequentialHeight {
err = fmt.Errorf("expected block height higher than current top %d", sequentialHeight)
}

if err != nil {
Expand All @@ -120,30 +120,30 @@ func (i *blockHeightIndex) appendBlock(newOffset int64, newBlock *protocol.Block
i.Lock()
defer i.Unlock()
topHeight := getBlockHeight(i.topBlock)
inOrderHeight := getBlockHeight(i.inOrderBlock)
sequentialHeight := getBlockHeight(i.sequentialTopBlock)
numTxReceipts := newBlock.ResultsBlock.Header.NumTransactionReceipts()
blockTs := newBlock.ResultsBlock.Header.Timestamp()

i.heightOffset[newBlockHeight] = i.nextOffset
i.nextOffset = newOffset
// update indices
i.lastSyncedHeight = newBlockHeight
i.lastWrittenHeight = newBlockHeight
if newBlockHeight > topHeight {
i.topBlock = newBlock
topHeight = newBlockHeight
}
if i.lastSyncedHeight == inOrderHeight+1 {
for height := inOrderHeight + 1; height <= topHeight; height++ {
if i.lastWrittenHeight == sequentialHeight+1 {
for height := sequentialHeight + 1; height <= topHeight; height++ {
if _, ok := i.heightOffset[height]; !ok { // block does not exists
i.lastSyncedHeight = topHeight
return fmt.Errorf("offset missing for blockHeight (%d), in range (%d - %d) assumed to exist in file storage", uint64(height), uint64(inOrderHeight+1), uint64(topHeight))
i.lastWrittenHeight = topHeight
return fmt.Errorf("offset missing for blockHeight (%d), in range (%d - %d) assumed to exist in file storage", uint64(height), uint64(sequentialHeight+1), uint64(topHeight))
}
if blockTracker != nil {
blockTracker.IncrementTo(height)
}
}
i.lastSyncedHeight = topHeight
i.inOrderBlock = i.topBlock
i.lastWrittenHeight = topHeight
i.sequentialTopBlock = i.topBlock
}

if numTxReceipts > 0 {
Expand All @@ -160,13 +160,13 @@ func (i *blockHeightIndex) appendBlock(newOffset int64, newBlock *protocol.Block
func (i *blockHeightIndex) getLastBlock() *protocol.BlockPairContainer {
i.RLock()
defer i.RUnlock()
return i.inOrderBlock
return i.sequentialTopBlock
}

func (i *blockHeightIndex) getLastBlockHeight() primitives.BlockHeight {
i.RLock()
defer i.RUnlock()
return getBlockHeight(i.inOrderBlock)
return getBlockHeight(i.sequentialTopBlock)
}

const minuteToNanoRatio = 60 * 1000 * 1000 * 1000
Expand Down
Loading

0 comments on commit 132a85b

Please sign in to comment.