Skip to content

Commit

Permalink
pendingblocks mod + log
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Aug 13, 2024
1 parent 07f9ef7 commit b512039
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 14 deletions.
5 changes: 3 additions & 2 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,13 @@ func (m *Manager) Start(ctx context.Context) error {

// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
nBytes := m.GetUnsubmittedBytes()
bytesProducedC := make(chan int)
err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
nBytes := m.GetUnsubmittedBytes()
bytesProducedC := make(chan int)

uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
Expand Down
15 changes: 8 additions & 7 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,14 @@ func SubmitLoopInner(

pendingBytes := atomic.Uint64{}

pendingBlocks := atomic.Uint64{}
pendingBlocks.Store(pendingSubmittedBlocks())
trigger := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on trigger thread
submitter := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on submitter thread

eg.Go(func() error {
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBlockSkew <= pendingBlocks.Load() {
if maxBlockSkew <= pendingSubmittedBlocks() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
Expand All @@ -74,12 +72,12 @@ func SubmitLoopInner(
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
pendingBlocks.Add(uint64(1))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
}
}

types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewNumBlocks.Set(float64(pendingBlocks.Load()))
types.RollappPendingSubmissionsSkewNumBlocks.Set(float64(pendingSubmittedBlocks()))
submitter.Nudge()
}
})
Expand All @@ -97,7 +95,7 @@ func SubmitLoopInner(
}
pending := pendingBytes.Load()
types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewNumBlocks.Set(float64(pendingBlocks.Load()))
types.RollappPendingSubmissionsSkewNumBlocks.Set(float64(pendingSubmittedBlocks()))

// while there are accumulated blocks, create and submit batches!!
for {
Expand All @@ -108,7 +106,10 @@ func SubmitLoopInner(
if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
}

nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes))
logger.Debug("Create and submit batch", "consumed", nConsumed)

if err != nil {
err = fmt.Errorf("create and submit batch: %w", err)
if errors.Is(err, gerrc.ErrInternal) {
Expand All @@ -121,7 +122,6 @@ func SubmitLoopInner(
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
pendingBlocks.Store(pendingSubmittedBlocks())
}
trigger.Nudge()
}
Expand Down Expand Up @@ -235,6 +235,7 @@ func (m *Manager) GetUnsubmittedBytes() int {
On node start we want to include the count of any blocks which were produced and not submitted in a previous instance
*/
currH := m.State.Height()

for h := m.NextHeightToSubmit(); h <= currH; h++ {
block, err := m.Store.LoadBlock(h)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func testSubmitLoop(
submitBatch := func(maxSize uint64) (uint64, error) { // mock the batch submission
time.Sleep(approx(args.submitTime))
if rand.Float64() < args.submissionHaltProbability {
t.Log("stopped")
time.Sleep(args.submissionHaltTime)
timeLastProgress.Store(time.Now().Unix()) // we have now recovered
}
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c BlockManagerConfig) Validate() error {
return fmt.Errorf("block_batch_size_bytes must be positive")
}

// 345 is the min block+commit size. therefore block skew should not be smaller than max num blocks in a batch, otherwise block production will be stopped before
// 345 is the min block+commit size. therefore block skew should not be smaller than max num blocks in a batch, otherwise block production will be stopped before having the chance to submit
if c.BatchSkewBlocks <= c.BatchSubmitBytes/345 {
return fmt.Errorf("batch_skew_blocks must greater than %d", c.BatchSubmitBytes/345)
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ replace (
github.com/evmos/evmos/v12 => github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.2-alpha.regen.4
github.com/gorilla/rpc => github.com/dymensionxyz/rpc v1.3.1
github.com/libp2p/go-libp2p-pubsub => github.com/dymensionxyz/go-libp2p-pubsub v0.0.0-20240513081713-3ecd83c19ea2
github.com/tendermint/tendermint => github.com/dymensionxyz/cometbft v0.34.29-0.20240806124126-f84b87caf3a4
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,6 @@ github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3 h1:vmAdUGUc4rTIiO3Phezr
github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3/go.mod h1:LfPv2O1HXMgETpka81Pg3nXy+U/7urq8dn85ZnSXK5Y=
github.com/dymensionxyz/gerr-cosmos v1.0.0 h1:oi91rgOkpJWr41oX9JOyjvvBnhGY54tj513x8VlDAEc=
github.com/dymensionxyz/gerr-cosmos v1.0.0/go.mod h1:n+0olxPogzWqFKba45mCpvrHLGmeS8W9UZjggHnWk6c=
github.com/dymensionxyz/go-libp2p-pubsub v0.0.0-20240513081713-3ecd83c19ea2 h1:5FMEOpX5OuoRfwwjjA+LxRJXoDT0fFvg8/rlat7z8bE=
github.com/dymensionxyz/go-libp2p-pubsub v0.0.0-20240513081713-3ecd83c19ea2/go.mod h1:1OxbaT/pFRO5h+Dpze8hdHQ63R0ke55XTs6b6NwLLkw=
github.com/dymensionxyz/rpc v1.3.1 h1:7EXWIobaBes5zldRvTIg7TmNsEKjicrWA/OjCc0NaGs=
github.com/dymensionxyz/rpc v1.3.1/go.mod h1:f+WpX8ysy8wt95iGc6auYlHcnHj2bUkhiRVkkKNys8c=
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
Expand Down Expand Up @@ -693,6 +691,8 @@ github.com/libp2p/go-libp2p-kad-dht v0.25.2 h1:FOIk9gHoe4YRWXTu8SY9Z1d0RILol0Trt
github.com/libp2p/go-libp2p-kad-dht v0.25.2/go.mod h1:6za56ncRHYXX4Nc2vn8z7CZK0P4QiMcrn77acKLM2Oo=
github.com/libp2p/go-libp2p-kbucket v0.6.3 h1:p507271wWzpy2f1XxPzCQG9NiN6R6lHL9GiSErbQQo0=
github.com/libp2p/go-libp2p-kbucket v0.6.3/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0=
github.com/libp2p/go-libp2p-pubsub v0.10.1 h1:/RqOZpEtAolsr8/9CC8KqROJSOZeu7lK7fPftn4MwNg=
github.com/libp2p/go-libp2p-pubsub v0.10.1/go.mod h1:1OxbaT/pFRO5h+Dpze8hdHQ63R0ke55XTs6b6NwLLkw=
github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0=
github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk=
github.com/libp2p/go-libp2p-routing-helpers v0.7.3 h1:u1LGzAMVRK9Nqq5aYDVOiq/HaB93U9WWczBzGyAC5ZY=
Expand Down

0 comments on commit b512039

Please sign in to comment.