Skip to content

Commit

Permalink
shutter: implement provide txns (#13865)
Browse files Browse the repository at this point in the history
closes #13385
  • Loading branch information
taratorio authored Feb 19, 2025
1 parent b59245b commit 54548ac
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 7 deletions.
1 change: 1 addition & 0 deletions eth/stagedsync/stage_mining_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func getNextTransactions(
provideOpts := []txnprovider.ProvideOption{
txnprovider.WithAmount(amount),
txnprovider.WithParentBlockNum(executionAt),
txnprovider.WithBlockTime(header.Time),
txnprovider.WithGasTarget(remainingGas),
txnprovider.WithBlobGasTarget(remainingBlobGas),
txnprovider.WithTxnIdsFilter(alreadyYielded),
Expand Down
7 changes: 7 additions & 0 deletions txnprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func WithParentBlockNum(blockNum uint64) ProvideOption {
}
}

func WithBlockTime(blockTime uint64) ProvideOption {
return func(opt *ProvideOptions) {
opt.BlockTime = blockTime
}
}

func WithAmount(amount int) ProvideOption {
return func(opt *ProvideOptions) {
opt.Amount = amount
Expand All @@ -68,6 +74,7 @@ func WithTxnIdsFilter(txnIdsFilter mapset.Set[[32]byte]) ProvideOption {
}

type ProvideOptions struct {
BlockTime uint64
ParentBlockNum uint64
Amount int
GasTarget uint64
Expand Down
109 changes: 109 additions & 0 deletions txnprovider/shutter/block_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2025 The Erigon Authors
// This file is part of Erigon.
//
// Erigon is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Erigon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

package shutter

import (
"context"
"errors"
"sync"

"github.com/erigontech/erigon-lib/log/v3"
)

type BlockTracker struct {
logger log.Logger
blockListener BlockListener
blockChangeMu *sync.Mutex
blockChangeCond *sync.Cond
currentBlockNum uint64
stopped bool
}

func NewBlockTracker(logger log.Logger, blockListener BlockListener) BlockTracker {
blockChangeMu := sync.Mutex{}
return BlockTracker{
logger: logger,
blockListener: blockListener,
blockChangeMu: &blockChangeMu,
blockChangeCond: sync.NewCond(&blockChangeMu),
}
}

func (bt BlockTracker) Run(ctx context.Context) error {
defer bt.logger.Info("block tracker stopped")
bt.logger.Info("running block tracker")

defer func() {
// make sure we wake up all waiters upon getting stopped
bt.blockChangeMu.Lock()
bt.stopped = true
bt.blockChangeCond.Broadcast()
bt.blockChangeMu.Unlock()
}()

blockEventC := make(chan BlockEvent)
unregisterBlockEventObserver := bt.blockListener.RegisterObserver(func(blockEvent BlockEvent) {
select {
case <-ctx.Done(): // no-op
case blockEventC <- blockEvent:
}
})

defer unregisterBlockEventObserver()
for {
select {
case <-ctx.Done():
return ctx.Err()
case blockEvent := <-blockEventC:
bt.blockChangeMu.Lock()
bt.currentBlockNum = blockEvent.LatestBlockNum
bt.blockChangeCond.Broadcast()
bt.blockChangeMu.Unlock()
}
}
}

func (bt BlockTracker) Wait(ctx context.Context, blockNum uint64) error {
done := make(chan struct{})
go func() {
defer close(done)

bt.blockChangeMu.Lock()
defer bt.blockChangeMu.Unlock()

for bt.currentBlockNum < blockNum && !bt.stopped && ctx.Err() == nil {
bt.blockChangeCond.Wait()
}
}()

select {
case <-ctx.Done():
// note the below will wake up all waiters prematurely, but thanks to the for loop condition
// in the waiting goroutine the ones that still need to wait will go back to sleep
bt.blockChangeCond.Broadcast()
return ctx.Err()
case <-done:
bt.blockChangeMu.Lock()
defer bt.blockChangeMu.Unlock()

if bt.currentBlockNum < blockNum && bt.stopped {
return errors.New("block tracker stopped")
}

return nil
}
}
5 changes: 5 additions & 0 deletions txnprovider/shutter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package shutter

import (
"crypto/ecdsa"
"time"

"github.com/holiman/uint256"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -44,6 +45,7 @@ type Config struct {
MaxPooledEncryptedTxns int
EncryptedGasLimit uint64
EncryptedTxnsLookBackDistance uint64
MaxDecryptionKeysDelay time.Duration
}

type P2pConfig struct {
Expand Down Expand Up @@ -98,6 +100,7 @@ var (
MaxPooledEncryptedTxns: defaultMaxPooledEncryptedTxns,
EncryptedGasLimit: defaultEncryptedGasLimit,
EncryptedTxnsLookBackDistance: defaultEncryptedTxnsLookBackDistance,
MaxDecryptionKeysDelay: defaultMaxDecryptionKeysDelay,
P2pConfig: P2pConfig{
ListenPort: defaultP2PListenPort,
BootstrapNodes: []string{
Expand All @@ -122,6 +125,7 @@ var (
MaxPooledEncryptedTxns: defaultMaxPooledEncryptedTxns,
EncryptedGasLimit: defaultEncryptedGasLimit,
EncryptedTxnsLookBackDistance: defaultEncryptedTxnsLookBackDistance,
MaxDecryptionKeysDelay: defaultMaxDecryptionKeysDelay,
P2pConfig: P2pConfig{
ListenPort: defaultP2PListenPort,
BootstrapNodes: []string{
Expand All @@ -139,4 +143,5 @@ const (
defaultMaxPooledEncryptedTxns = 10_000
defaultEncryptedGasLimit = 10_000_000
defaultEncryptedTxnsLookBackDistance = 128
defaultMaxDecryptionKeysDelay = 1_666 * time.Millisecond
)
2 changes: 2 additions & 0 deletions txnprovider/shutter/decryption_keys_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

//go:build !abigen

package shutter

import (
Expand Down
38 changes: 38 additions & 0 deletions txnprovider/shutter/internal/testhelpers/slot_calculator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 93 additions & 7 deletions txnprovider/shutter/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

//go:build !abigen

package shutter

import (
"context"
"errors"
"fmt"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -35,11 +40,13 @@ type Pool struct {
config Config
secondaryTxnProvider txnprovider.TxnProvider
blockListener BlockListener
blockTracker BlockTracker
eonTracker EonTracker
decryptionKeysListener DecryptionKeysListener
decryptionKeysProcessor DecryptionKeysProcessor
encryptedTxnsPool *EncryptedTxnsPool
decryptedTxnsPool *DecryptedTxnsPool
slotCalculator SlotCalculator
}

func NewPool(
Expand All @@ -52,6 +59,7 @@ func NewPool(
logger = logger.New("component", "shutter")
slotCalculator := NewBeaconChainSlotCalculator(config.BeaconChainGenesisTimestamp, config.SecondsPerSlot)
blockListener := NewBlockListener(logger, stateChangesClient)
blockTracker := NewBlockTracker(logger, blockListener)
eonTracker := NewKsmEonTracker(logger, config, blockListener, contractBackend)
decryptionKeysValidator := NewDecryptionKeysExtendedValidator(logger, config, slotCalculator, eonTracker)
decryptionKeysListener := NewDecryptionKeysListener(logger, config, decryptionKeysValidator)
Expand All @@ -69,12 +77,14 @@ func NewPool(
logger: logger,
config: config,
blockListener: blockListener,
blockTracker: blockTracker,
eonTracker: eonTracker,
secondaryTxnProvider: secondaryTxnProvider,
decryptionKeysListener: decryptionKeysListener,
decryptionKeysProcessor: decryptionKeysProcessor,
encryptedTxnsPool: encryptedTxnsPool,
decryptedTxnsPool: decryptedTxnsPool,
slotCalculator: slotCalculator,
}
}

Expand All @@ -89,6 +99,7 @@ func (p Pool) Run(ctx context.Context) error {

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error { return p.blockListener.Run(ctx) })
eg.Go(func() error { return p.blockTracker.Run(ctx) })
eg.Go(func() error { return p.eonTracker.Run(ctx) })
eg.Go(func() error { return p.decryptionKeysListener.Run(ctx) })
eg.Go(func() error { return p.decryptionKeysProcessor.Run(ctx) })
Expand All @@ -97,11 +108,86 @@ func (p Pool) Run(ctx context.Context) error {
}

func (p Pool) ProvideTxns(ctx context.Context, opts ...txnprovider.ProvideOption) ([]types.Transaction, error) {
//
// TODO - implement shutter spec
// 1) fetch corresponding txns for current slot and fill the remaining gas
// with the secondary txn provider (devp2p)
// 2) if no decryption keys arrive for current slot then return empty transactions
//
return p.secondaryTxnProvider.ProvideTxns(ctx, opts...)
provideOpts := txnprovider.ApplyProvideOptions(opts...)
blockTime := provideOpts.BlockTime
if blockTime == 0 {
return nil, errors.New("block time option is required by the shutter provider")
}

parentBlockNum := provideOpts.ParentBlockNum
parentBlockWaitTime := time.Second * time.Duration(p.slotCalculator.SecondsPerSlot())
parentBlockWaitCtx, parentBlockWaitCtxCancel := context.WithTimeout(ctx, parentBlockWaitTime)
defer parentBlockWaitCtxCancel()
err := p.blockTracker.Wait(parentBlockWaitCtx, parentBlockNum)
if err != nil {
return nil, fmt.Errorf("issue while waiting for parent block %d: %w", parentBlockNum, err)
}

eon, ok := p.eonTracker.EonByBlockNum(parentBlockNum)
if !ok {
return nil, fmt.Errorf("unknown eon for block num %d", parentBlockNum)
}

slot, err := p.slotCalculator.CalcSlot(blockTime)
if err != nil {
return nil, err
}

decryptionMark := DecryptionMark{Slot: slot, Eon: eon.Index}
slotAge := p.slotCalculator.CalcSlotAge(slot)
keysWaitTime := p.config.MaxDecryptionKeysDelay - slotAge
decryptionWaitCtx, decryptionWaitCtxCancel := context.WithTimeout(ctx, keysWaitTime)
defer decryptionWaitCtxCancel()
err = p.decryptedTxnsPool.Wait(decryptionWaitCtx, decryptionMark)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn(
"decryption keys wait timeout, falling back to secondary txn provider",
"slot", slot,
"age", slotAge,
)

// Note: specs say to produce empty block in case decryption keys do not arrive on time.
// However, upon discussion with Shutter and Nethermind it was agreed that this is not
// practical at this point in time as it can hurt validator rewards across the network,
// and also it doesn't in any way prevent any cheating from happening.
// To properly address cheating, we need a mechanism for slashing which is a future
// work stream item for the Shutter team. For now, we follow what Nethermind does
// and fallback to the public devp2p mempool - any changes to this should be
// co-ordinated with them.
return p.secondaryTxnProvider.ProvideTxns(ctx, opts...)
}

return nil, err
}

return p.provide(ctx, decryptionMark, opts...)
}

func (p Pool) provide(ctx context.Context, mark DecryptionMark, opts ...txnprovider.ProvideOption) ([]types.Transaction, error) {
decryptedTxns, ok := p.decryptedTxnsPool.DecryptedTxns(mark)
if !ok {
return nil, fmt.Errorf("unexpected missing decrypted txns for mark: slot=%d, eon=%d", mark.Slot, mark.Eon)
}

decryptedTxnsGas := decryptedTxns.TotalGasLimit
provideOpts := txnprovider.ApplyProvideOptions(opts...)
totalGasTarget := provideOpts.GasTarget
if decryptedTxnsGas > totalGasTarget {
// note this should never happen because EncryptedGasLimit must always be <= gasLimit for a block
return nil, fmt.Errorf("decrypted txns gas gt target: %d > %d", decryptedTxnsGas, totalGasTarget)
}

if decryptedTxnsGas == totalGasTarget {
return decryptedTxns.Transactions, nil
}

remGasTarget := totalGasTarget - decryptedTxnsGas
opts = append(opts, txnprovider.WithGasTarget(remGasTarget)) // overrides option
additionalTxns, err := p.secondaryTxnProvider.ProvideTxns(ctx, opts...)
if err != nil {
return nil, err
}

return append(decryptedTxns.Transactions, additionalTxns...), nil
}
5 changes: 5 additions & 0 deletions txnprovider/shutter/slot_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type SlotCalculator interface {
CalcSlot(timestamp uint64) (uint64, error)
CalcSlotAge(slot uint64) time.Duration
CalcCurrentSlot() uint64
SecondsPerSlot() uint64
}

type BeaconChainSlotCalculator struct {
Expand Down Expand Up @@ -63,3 +64,7 @@ func (sc BeaconChainSlotCalculator) CalcCurrentSlot() uint64 {

return slot
}

func (sc BeaconChainSlotCalculator) SecondsPerSlot() uint64 {
return sc.secondsPerSlot
}

0 comments on commit 54548ac

Please sign in to comment.