Beacon sync revisiting fc automove base2 #2995

Draft · wants to merge 5 commits into master
2 changes: 1 addition & 1 deletion nimbus/beacon/api_handler/api_forkchoice.nim
@@ -114,7 +114,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,

# Update sync header (if any)
com.syncReqNewHead(header)
com.reqBeaconSyncTargetCB(header, update.finalizedBlockHash)
com.reqBeaconSyncTargetCB(header)

return simpleFCU(PayloadExecutionStatus.syncing)

8 changes: 4 additions & 4 deletions nimbus/common/common.nim
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2022-2024 Status Research & Development GmbH
# Copyright (c) 2022-2025 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -43,7 +43,7 @@ type
SyncReqNewHeadCB* = proc(header: Header) {.gcsafe, raises: [].}
## Update head for syncing

ReqBeaconSyncTargetCB* = proc(header: Header; finHash: Hash32) {.gcsafe, raises: [].}
ReqBeaconSyncTargetCB* = proc(header: Header) {.gcsafe, raises: [].}
## Ditto (for beacon sync)

NotifyBadBlockCB* = proc(invalid, origin: Header) {.gcsafe, raises: [].}
@@ -344,10 +344,10 @@ proc syncReqNewHead*(com: CommonRef; header: Header)
if not com.syncReqNewHead.isNil:
com.syncReqNewHead(header)

proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header; finHash: Hash32) =
proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header) =
## Used by RPC updater
if not com.reqBeaconSyncTargetCB.isNil:
com.reqBeaconSyncTargetCB(header, finHash)
com.reqBeaconSyncTargetCB(header)

proc notifyBadBlock*(com: CommonRef; invalid, origin: Header)
{.gcsafe, raises: [].} =
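After this change the callback carries only the consensus head header. A minimal, stand-alone sketch of a handler matching the new `ReqBeaconSyncTargetCB` shape (the `Header` type is stubbed to the single field used here, so this is illustrative only, not the real eth/common type):

```nim
type
  Header = object               # stub: only the field needed below
    number: uint64

  # Mirrors the simplified callback type from common.nim in this PR.
  ReqBeaconSyncTargetCB = proc(header: Header) {.gcsafe, raises: [].}

var latestTarget: uint64        # would live inside the sync worker state

let onNewTarget: ReqBeaconSyncTargetCB = proc(header: Header) {.gcsafe, raises: [].} =
  # Only the consensus head header arrives; no finalised hash any more.
  if header.number > latestTarget:
    latestTarget = header.number

when isMainModule:
  onNewTarget(Header(number: 21_000_000))
  echo "sync target is now block ", latestTarget
```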
62 changes: 59 additions & 3 deletions nimbus/core/chain/forked_chain.nim
@@ -462,6 +462,50 @@ proc updateHeadIfNecessary(c: ForkedChainRef, pvarc: PivotArc) =

c.setHead(pvarc)

proc autoUpdateBase(c: ForkedChainRef): Result[void, string] =
Member comment:

btw, this function is a step in a good direction: the responsibility for updating the base should indeed rest firmly with FC.

However, there is nothing "special" about this flow at all, contrary to what this documentation makes it seem; instead, it should be considered a normal part of both the importBlock flow and the fCU flow.

There are two things that can cause the base to be updated:

  • we received new blocks (either from RPC or the syncer)
  • we move the finalization point (via fCU)

Both of these should potentially update the base, and both flows should call the same function to do so, so that the hysteresis computation below applies equally to both "sources" of base updates. This means that this same function should be called for all base updates that happen in FC.

Long-range syncing, i.e. syncing blocks that are older than finality, is a special case of the above logic because FC already knows that they form a linear history, but for the updateBase logic it does not matter: the execution head is what determines the base in this case, and during long-range syncing the execution head is simply a block on the DAG branch that leads to finality and, eventually, the consensus head.
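A rough sketch of the shape being suggested, with both triggers funnelling into one shared decision helper so the hysteresis applies identically to either source (all names and the reduced state are hypothetical, not the FC API):

```nim
type MiniFC = object            # hypothetical: reduced to the numbers that matter
  base, latest, baseDistance: uint64

proc maybeAdvanceBase(c: var MiniFC) =
  ## Single entry point for every base update, shared by both flows.
  let hysteresis = max(1'u64, min(c.baseDistance div 4'u64, 32'u64))
  if c.latest - c.base >= c.baseDistance + hysteresis:
    c.base = c.latest - c.baseDistance   # pruning/persisting would happen here

proc onNewBlocks(c: var MiniFC, newLatest: uint64) =
  c.latest = newLatest                   # blocks arrived from RPC or the syncer ...
  c.maybeAdvanceBase()                   # ... same helper

proc onForkChoiceUpdate(c: var MiniFC, newHead: uint64) =
  c.latest = max(c.latest, newHead)      # head/finality moved via fCU ...
  c.maybeAdvanceBase()                   # ... same helper

when isMainModule:
  var c = MiniFC(base: 1_000, latest: 1_000, baseDistance: 128)
  c.onNewBlocks(1_159)                   # 159 < 128 + 32: base stays at 1_000
  c.onForkChoiceUpdate(1_160)            # threshold reached: base moves to 1_032
  echo c.base
```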

## To be called after `importBlock()` for an implied `base` update so that
## there is no need to know about a finalised block. Here the `base` is
## kept at a certain distance from the current `latest` cursor head.
##
# This function code is a tweaked version of `importBlockBlindly()`
# from draft PR #2845.
#
let
distanceFromBase = c.cursorHeader.number - c.baseHeader.number
hysteresis = max(1'u64, min(c.baseDistance div 4'u64, 32'u64))
# Finalizer threshold is baseDistance + 25% of baseDistance, capped at 32.
if distanceFromBase < c.baseDistance + hysteresis:
return ok()

# Move the base forward, keeping `baseDistance` blocks away from
Contributor comment:

Why is this necessarily valid? For example, a chain might never have finalized.

Contributor Author reply:

Needs to be discussed/re-visited, probably. Was ported from an earlier PR.

# the top block.
let
target = c.cursorHeader.number - c.baseDistance
pvarc = ?c.findCursorArc(c.cursorHash)
newBase = c.calculateNewBase(target, pvarc)

doAssert newBase.pvHash != c.baseHash

# Write segment from base+1 to newBase into database
c.stagingTx.rollback()
c.stagingTx = c.db.ctx.txFrameBegin()
c.replaySegment(newBase.pvHash)
c.writeBaggage(newBase.pvHash)
c.stagingTx.commit()
c.stagingTx = nil

# Update base forward to newBase
c.updateBase(newBase)
c.db.persistent(newBase.pvNumber).isOkOr:
return err("Failed to save state: " & $$error)

# Move chain state forward to current head
c.stagingTx = c.db.ctx.txFrameBegin()
c.replaySegment(pvarc.pvHash)
c.setHead(pvarc)

ok()
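As a quick sanity check of the rule above, the threshold at which the base actually moves, for a few `baseDistance` values (stand-alone arithmetic, not FC code):

```nim
# hysteresis = max(1, min(baseDistance div 4, 32)), so for example:
#   baseDistance = 128  ->  hysteresis = 32           -> base moves once latest-base >= 160
#   baseDistance = 256  ->  hysteresis = 32 (capped)  -> threshold 288
#   baseDistance =   2  ->  hysteresis =  1 (floor)   -> threshold 3
for baseDistance in [2'u64, 128, 256]:
  let hysteresis = max(1'u64, min(baseDistance div 4'u64, 32'u64))
  echo baseDistance, " -> threshold ", baseDistance + hysteresis
```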

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@@ -523,7 +567,11 @@ proc newForkedChain*(com: CommonRef,
com.syncStart = baseHeader.number
chain

proc importBlock*(c: ForkedChainRef, blk: Block): Result[void, string] =
proc importBlock*(
c: ForkedChainRef;
blk: Block;
autoRebase = false;
): Result[void, string] =
# Try to import block to canonical or side chain.
# return error if the block is invalid
if c.stagingTx.isNil:
@@ -533,7 +581,10 @@ proc importBlock*(c: ForkedChainRef, blk: Block): Result[void, string] =
blk.header

if header.parentHash == c.cursorHash:
return c.validateBlock(c.cursorHeader, blk)
?c.validateBlock(c.cursorHeader, blk)
if autoRebase:
return c.autoUpdateBase()
return ok()

if header.parentHash == c.baseHash:
c.stagingTx.rollback()
@@ -555,7 +606,12 @@ proc importBlock*(c: ForkedChainRef, blk: Block): Result[void, string] =
# `base` is the point of no return, we only update it on finality.

c.replaySegment(header.parentHash)
c.validateBlock(c.cursorHeader, blk)
?c.validateBlock(c.cursorHeader, blk)
if autoRebase:
return c.autoUpdateBase()

ok()


proc forkChoice*(c: ForkedChainRef,
headHash: Hash32,
4 changes: 2 additions & 2 deletions nimbus/sync/beacon/README.md
@@ -219,7 +219,7 @@ Running the sync process for *MainNet*
--------------------------------------

For syncing, a beacon node is needed that regularly informs via *RPC* of a
recently finalised block header.
recent target block header.

The beacon node program used here is the *nimbus_beacon_node* binary from the
*nimbus-eth2* project (any other, e.g. the *light client* will do.)
@@ -230,7 +230,7 @@ The beacon node program used here is the *nimbus_beacon_node* binary from the
--jwt-secret=/tmp/jwtsecret

where *http://127.0.0.1:8551* is the URL of the sync process that receives the
finalised block header (here on the same physical machine) and `/tmp/jwtsecret`
target block headers (here on the same physical machine) and `/tmp/jwtsecret`
is the shared secret file needed for mutual communication authentication.

It will take a while for *nimbus_beacon_node* to catch up (see the
6 changes: 1 addition & 5 deletions nimbus/sync/beacon/worker.nim
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
@@ -170,10 +170,6 @@ proc runPeer*(
buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun
buddy.only.nMultiLoop.inc # statistics/debugging

# Update consensus header target when needed. It comes with a finalised
# header hash where we need to complete the block number.
await buddy.headerStagedUpdateTarget info

if not await buddy.napUnlessSomethingToFetch():
#
# Layout of a triple of linked header chains (see `README.md`)
31 changes: 3 additions & 28 deletions nimbus/sync/beacon/worker/blocks_staged.nim
@@ -291,8 +291,7 @@ proc blocksStagedImport*(
iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1)

info "Importing blocks", iv, nBlocks,
base=ctx.chain.baseNumber.bnStr, head=ctx.chain.latestNumber.bnStr,
target=ctx.layout.final.bnStr
base=ctx.chain.baseNumber.bnStr, head=ctx.chain.latestNumber.bnStr

var maxImport = iv.maxPt
block importLoop:
Expand All @@ -312,7 +311,7 @@ proc blocksStagedImport*(
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n).short
continue
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
ctx.pool.chain.importBlock(qItem.data.blocks[n], autoRebase=true).isOkOr:
warn info & ": import block error", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n).short, `error`=error
Expand All @@ -331,30 +330,6 @@ proc blocksStagedImport*(
maxImport = ctx.chain.latestNumber()
break importLoop

# Occasionally mark the chain finalized
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
nthHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nthHash
else: ctx.layout.finalHash

doAssert nBn == ctx.chain.latestNumber()
ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr:
warn info & ": fork choice error", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash=nthHash.short,
finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break importLoop

# Allow pseudo/async thread switch.
try: await sleepAsync asyncThreadSwitchTimeSlot
except CancelledError: discard
if not ctx.daemon:
maxImport = ctx.chain.latestNumber()
break importLoop

# Import probably incomplete, so a partial roll back may be needed
if maxImport < iv.maxPt:
ctx.blocksUnprocCommit(0, maxImport+1, qItem.data.blocks[^1].header.number)
@@ -367,7 +342,7 @@ proc blocksStagedImport*(
ctx.updateMetrics()

info "Import done", iv, nBlocks, base=ctx.chain.baseNumber.bnStr,
head=ctx.chain.latestNumber.bnStr, target=ctx.layout.final.bnStr
head=ctx.chain.latestNumber.bnStr
return true

# ------------------------------------------------------------------------------
6 changes: 1 addition & 5 deletions nimbus/sync/beacon/worker/db.nim
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
@@ -92,10 +92,6 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
# If there was a manual import after a previous sync, then saved state
# might be outdated.
if rc.isOk and
# The base number is the least record of the FCU chains/tree. So the
# finalised entry must not be smaller.
ctx.chain.baseNumber() <= rc.value.final and

# If the latest FCU number is not larger than the head, there is nothing
# to do (might also happen after a manual import.)
latest < rc.value.head and
31 changes: 1 addition & 30 deletions nimbus/sync/beacon/worker/headers_staged.nim
@@ -19,7 +19,7 @@ import
../worker_desc,
./update/metrics,
./headers_staged/[headers, linked_hchain],
"."/[headers_unproc, update]
./headers_unproc

# ------------------------------------------------------------------------------
# Private functions
@@ -53,35 +53,6 @@ proc fetchAndCheck(
# Public functions
# ------------------------------------------------------------------------------

proc headerStagedUpdateTarget*(
buddy: BeaconBuddyRef;
info: static[string];
) {.async: (raises: []).} =
## Fetch finalised beacon header if there is an update available
let
ctx = buddy.ctx
peer = buddy.peer
if ctx.layout.lastState == idleSyncState and
ctx.target.final == 0 and
ctx.target.finalHash != zeroHash32 and
not ctx.target.locked:
const iv = BnRange.new(1u,1u) # dummy interval

ctx.target.locked = true
let rc = await buddy.headersFetchReversed(iv, ctx.target.finalHash, info)
ctx.target.locked = false

if rc.isOk:
let hash = rlp.encode(rc.value[0]).keccak256
if hash != ctx.target.finalHash:
# Oops
buddy.ctrl.zombie = true
debug info & ": finalised header hash mismatch", peer, hash,
expected=ctx.target.finalHash
else:
ctx.updateFinalBlockHeader(rc.value[0], ctx.target.finalHash, info)


proc headersStagedCollect*(
buddy: BeaconBuddyRef;
info: static[string];
30 changes: 6 additions & 24 deletions nimbus/sync/beacon/worker/start_stop.nim
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
@@ -38,7 +38,7 @@ when enableTicker:
head: ctx.layout.head,
headOk: ctx.layout.lastState != idleSyncState,
target: ctx.target.consHead.number,
targetOk: ctx.target.final != 0,
targetOk: ctx.target.changed,

nHdrStaged: ctx.headersStagedQueueLen(),
hdrStagedTop: ctx.headersStagedQueueTopKey(),
@@ -62,31 +62,13 @@ proc updateBeaconHeaderCB(
): ReqBeaconSyncTargetCB =
## Update beacon header. This function is intended as a call back function
## for the RPC module.
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} =

# Check whether there is an update running (otherwise take next update)
if not ctx.target.locked and # ignore if currently updating
ctx.target.final == 0 and # ignore if complete already
f != zeroHash32 and # finalised hash is set
return proc(h: Header) {.gcsafe, raises: [].} =
if ctx.chain.baseNumber() < h.number and # sanity check
ctx.layout.head < h.number and # update is advancing
ctx.target.consHead.number < h.number: # .. ditto

ctx.target.consHead = h
ctx.target.finalHash = f
ctx.target.changed = true

# Check whether `FC` knows about the finalised block already.
#
# On a full node, all blocks before the current state are stored in the
# database which is also accessed by `FC`. So one can already deduce here
# whether `FC` is capable of handling that finalised block (its number
# must be at least the `base` from `FC`.)
#
# Otherwise the block header will need to be fetched from a peer when
# available and checked there (see `headerStagedUpdateTarget()`.)
#
let finHdr = ctx.chain.headerByHash(f).valueOr: return
ctx.updateFinalBlockHeader(finHdr, f, info)
ctx.target.changed = true # enable this dataset
ctx.updateFromHibernating info # wake up if sleeping

# ------------------------------------------------------------------------------
# Public functions
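With the finalised-hash plumbing gone, the callback above boils down to three monotonicity checks before a new target is adopted. A stand-alone restatement of that predicate, with plain numbers in place of the FC/layout state (illustrative only):

```nim
proc acceptNewTarget(base, layoutHead, currentTarget, candidate: uint64): bool =
  ## Mirrors the sanity checks in updateBeaconHeaderCB: the candidate header
  ## must be ahead of the FC base, of the header-chain head, and of the
  ## target already on record.
  base < candidate and
    layoutHead < candidate and
    currentTarget < candidate

when isMainModule:
  doAssert acceptNewTarget(base = 100, layoutHead = 150, currentTarget = 150, candidate = 200)
  doAssert not acceptNewTarget(base = 100, layoutHead = 150, currentTarget = 150, candidate = 140)
```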