
Commit

Merge branch 'master' into mach-fetcher
eljobe committed Jun 27, 2024
2 parents 4cfb80b + 3ecd01e commit a58b014
Showing 21 changed files with 179 additions and 258 deletions.
14 changes: 1 addition & 13 deletions arbnode/batch_poster.go
@@ -58,8 +58,6 @@ var (
baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil)
blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil)
l1GasPriceGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice", nil)
l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice/estimate", nil)
latestBatchSurplusGauge = metrics.NewRegisteredGauge("arb/batchposter/latestbatchsurplus", nil)
blockGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/used", nil)
blockGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/limit", nil)
blobGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/used", nil)
@@ -568,9 +566,7 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) {
} else {
suggestedTipCapGauge.Update(suggestedTipCap.Int64())
}
l1GasPriceEstimate := b.streamer.CurrentEstimateOfL1GasPrice()
l1GasPriceGauge.Update(int64(l1GasPrice))
l1GasPriceEstimateGauge.Update(int64(l1GasPriceEstimate))
case <-ctx.Done():
return
}
@@ -1276,7 +1272,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
batchPosterDAFailureCounter.Inc(1)
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}
sequencerMsg, err = b.dapWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}, config.DisableDapFallbackStoreDataOnChain)
sequencerMsg, err = b.dapWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), config.DisableDapFallbackStoreDataOnChain)
if err != nil {
batchPosterDAFailureCounter.Inc(1)
return false, err
@@ -1369,14 +1365,6 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
"numBlobs", len(kzgBlobs),
)

surplus := arbmath.SaturatingMul(
arbmath.SaturatingSub(
l1GasPriceGauge.Snapshot().Value(),
l1GasPriceEstimateGauge.Snapshot().Value()),
int64(len(sequencerMsg)*16),
)
latestBatchSurplusGauge.Update(surplus)

recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3
postedMessages := b.building.msgCount - batchPosition.MessageCount
b.messagesPerBatch.Update(uint64(postedMessages))
11 changes: 0 additions & 11 deletions arbnode/node.go
@@ -792,17 +792,6 @@ func CreateNode(
return currentNode, nil
}

func (n *Node) CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) {
n.TxStreamer.CacheL1PriceDataOfMsg(pos, callDataUnits, l1GasCharged)
}

func (n *Node) BacklogL1GasCharged() uint64 {
return n.TxStreamer.BacklogL1GasCharged()
}
func (n *Node) BacklogCallDataUnits() uint64 {
return n.TxStreamer.BacklogCallDataUnits()
}

func (n *Node) Start(ctx context.Context) error {
execClient, ok := n.Execution.(*gethexec.ExecutionNode)
if !ok {
122 changes: 1 addition & 121 deletions arbnode/transaction_streamer.go
@@ -69,9 +69,6 @@ type TransactionStreamer struct {
broadcastServer *broadcaster.Broadcaster
inboxReader *InboxReader
delayedBridge *DelayedBridge

cachedL1PriceDataMutex sync.RWMutex
cachedL1PriceData *L1PriceData
}

type TransactionStreamerConfig struct {
@@ -118,9 +115,6 @@ func NewTransactionStreamer(
fatalErrChan: fatalErrChan,
config: config,
snapSyncConfig: snapSyncConfig,
cachedL1PriceData: &L1PriceData{
msgToL1PriceData: []L1PriceDataOfMsg{},
},
}
err := streamer.cleanupInconsistentState()
if err != nil {
@@ -129,20 +123,6 @@
return streamer, nil
}

type L1PriceDataOfMsg struct {
callDataUnits uint64
cummulativeCallDataUnits uint64
l1GasCharged uint64
cummulativeL1GasCharged uint64
}

type L1PriceData struct {
startOfL1PriceDataCache arbutil.MessageIndex
endOfL1PriceDataCache arbutil.MessageIndex
msgToL1PriceData []L1PriceDataOfMsg
currentEstimateOfL1GasPrice uint64
}

// Represents a block's hash in the database.
// Necessary because RLP decoder doesn't produce nil values by default.
type blockHashDBValue struct {
@@ -153,106 +133,6 @@ const (
BlockHashMismatchLogMsg = "BlockHash from feed doesn't match locally computed hash. Check feed source."
)

func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

currentEstimate, err := s.exec.GetL1GasPriceEstimate()
if err != nil {
log.Error("error fetching current L2 estimate of L1 gas price hence reusing cached estimate", "err", err)
} else {
s.cachedL1PriceData.currentEstimateOfL1GasPrice = currentEstimate
}
return s.cachedL1PriceData.currentEstimateOfL1GasPrice
}

func (s *TransactionStreamer) BacklogCallDataUnits() uint64 {
s.cachedL1PriceDataMutex.RLock()
defer s.cachedL1PriceDataMutex.RUnlock()

size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 {
return 0
}
return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits -
s.cachedL1PriceData.msgToL1PriceData[0].cummulativeCallDataUnits +
s.cachedL1PriceData.msgToL1PriceData[0].callDataUnits)
}

func (s *TransactionStreamer) BacklogL1GasCharged() uint64 {
s.cachedL1PriceDataMutex.RLock()
defer s.cachedL1PriceDataMutex.RUnlock()

size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 {
return 0
}
return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged -
s.cachedL1PriceData.msgToL1PriceData[0].cummulativeL1GasCharged +
s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged)
}

func (s *TransactionStreamer) TrimCache(to arbutil.MessageIndex) {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

if to < s.cachedL1PriceData.startOfL1PriceDataCache {
log.Info("trying to trim older cache which doesnt exist anymore")
} else if to >= s.cachedL1PriceData.endOfL1PriceDataCache {
s.cachedL1PriceData.startOfL1PriceDataCache = 0
s.cachedL1PriceData.endOfL1PriceDataCache = 0
s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{}
} else {
newStart := to - s.cachedL1PriceData.startOfL1PriceDataCache + 1
s.cachedL1PriceData.msgToL1PriceData = s.cachedL1PriceData.msgToL1PriceData[newStart:]
s.cachedL1PriceData.startOfL1PriceDataCache = to + 1
}
}

func (s *TransactionStreamer) CacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

resetCache := func() {
s.cachedL1PriceData.startOfL1PriceDataCache = seqNum
s.cachedL1PriceData.endOfL1PriceDataCache = seqNum
s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{{
callDataUnits: callDataUnits,
cummulativeCallDataUnits: callDataUnits,
l1GasCharged: l1GasCharged,
cummulativeL1GasCharged: l1GasCharged,
}}
}
size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 ||
s.cachedL1PriceData.startOfL1PriceDataCache == 0 ||
s.cachedL1PriceData.endOfL1PriceDataCache == 0 ||
arbutil.MessageIndex(size) != s.cachedL1PriceData.endOfL1PriceDataCache-s.cachedL1PriceData.startOfL1PriceDataCache+1 {
resetCache()
return
}
if seqNum != s.cachedL1PriceData.endOfL1PriceDataCache+1 {
if seqNum > s.cachedL1PriceData.endOfL1PriceDataCache+1 {
log.Info("message position higher then current end of l1 price data cache, resetting cache to this message")
resetCache()
} else if seqNum < s.cachedL1PriceData.startOfL1PriceDataCache {
log.Info("message position lower than start of l1 price data cache, ignoring")
} else {
log.Info("message position already seen in l1 price data cache, ignoring")
}
} else {
cummulativeCallDataUnits := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits
cummulativeL1GasCharged := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged
s.cachedL1PriceData.msgToL1PriceData = append(s.cachedL1PriceData.msgToL1PriceData, L1PriceDataOfMsg{
callDataUnits: callDataUnits,
cummulativeCallDataUnits: cummulativeCallDataUnits + callDataUnits,
l1GasCharged: l1GasCharged,
cummulativeL1GasCharged: cummulativeL1GasCharged + l1GasCharged,
})
s.cachedL1PriceData.endOfL1PriceDataCache = seqNum
}
}

// Encodes a uint64 as bytes in a lexically sortable manner for database iteration.
// Generally this is only used for database keys, which need sorted.
// A shorter RLP encoding is usually used for database values.
@@ -773,7 +653,7 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m

if messagesAreConfirmed {
// Trim confirmed messages from l1pricedataCache
s.TrimCache(pos + arbutil.MessageIndex(len(messages)))
s.exec.MarkFeedStart(pos + arbutil.MessageIndex(len(messages)))
s.reorgMutex.RLock()
dups, _, _, err := s.countDuplicateMessages(pos, messagesWithBlockHash, nil)
s.reorgMutex.RUnlock()
2 changes: 1 addition & 1 deletion arbstate/daprovider/util.go
@@ -30,7 +30,7 @@ type DASReader interface {

type DASWriter interface {
// Store requests that the message be stored until timeout (UTC time in unix epoch seconds).
Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*DataAvailabilityCertificate, error)
Store(ctx context.Context, message []byte, timeout uint64) (*DataAvailabilityCertificate, error)
fmt.Stringer
}

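As a quick illustration of the trimmed DASWriter interface above, here is a minimal, hypothetical caller sketch (not part of this commit; the helper name, retention-period parameter, and logging are assumptions):

package example

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/log"

	"github.com/offchainlabs/nitro/arbstate/daprovider"
)

// storeWithRetention shows how a caller would use the new three-argument Store:
// the timeout is an absolute UTC unix timestamp derived from a retention period,
// and no separate signature argument is passed anymore.
func storeWithRetention(ctx context.Context, w daprovider.DASWriter, msg []byte, retention time.Duration) (*daprovider.DataAvailabilityCertificate, error) {
	timeout := uint64(time.Now().Add(retention).Unix())
	cert, err := w.Store(ctx, msg, timeout)
	if err != nil {
		return nil, err
	}
	log.Info("stored message via DAS writer", "writer", w.String(), "timeout", timeout)
	return cert, nil
}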
5 changes: 2 additions & 3 deletions arbstate/daprovider/writer.go
@@ -17,7 +17,6 @@ type Writer interface {
ctx context.Context,
message []byte,
timeout uint64,
sig []byte,
disableFallbackStoreDataOnChain bool,
) ([]byte, error)
}
@@ -32,8 +31,8 @@ type writerForDAS struct {
dasWriter DASWriter
}

func (d *writerForDAS) Store(ctx context.Context, message []byte, timeout uint64, sig []byte, disableFallbackStoreDataOnChain bool) ([]byte, error) {
cert, err := d.dasWriter.Store(ctx, message, timeout, []byte{})
func (d *writerForDAS) Store(ctx context.Context, message []byte, timeout uint64, disableFallbackStoreDataOnChain bool) ([]byte, error) {
cert, err := d.dasWriter.Store(ctx, message, timeout)
if errors.Is(err, ErrBatchToDasFailed) {
if disableFallbackStoreDataOnChain {
return nil, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
4 changes: 2 additions & 2 deletions cmd/datool/datool.go
@@ -166,9 +166,9 @@ func startClientStore(args []string) error {
if err != nil {
return err
}
cert, err = client.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
cert, err = client.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()))
} else if len(config.Message) > 0 {
cert, err = client.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{})
cert, err = client.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()))
} else {
return errors.New("--message or --random-message-size must be specified")
}
33 changes: 3 additions & 30 deletions das/aggregator.go
@@ -22,7 +22,6 @@ import (
"github.com/offchainlabs/nitro/blsSignatures"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/pretty"
)

@@ -56,7 +55,6 @@ type Aggregator struct {
maxAllowedServiceStoreFailures int
keysetHash [32]byte
keysetBytes []byte
addrVerifier *contracts.AddressVerifier
}

type ServiceDetails struct {
@@ -124,11 +122,6 @@ func NewAggregatorWithSeqInboxCaller(
return nil, err
}

var addrVerifier *contracts.AddressVerifier
if seqInboxCaller != nil {
addrVerifier = contracts.NewAddressVerifier(seqInboxCaller)
}

return &Aggregator{
config: config.RPCAggregator,
services: services,
Expand All @@ -137,7 +130,6 @@ func NewAggregatorWithSeqInboxCaller(
maxAllowedServiceStoreFailures: config.RPCAggregator.AssumedHonest - 1,
keysetHash: keysetHash,
keysetBytes: keysetBytes,
addrVerifier: addrVerifier,
}, nil
}

@@ -160,27 +152,8 @@ type storeResponse struct {
//
// If Store gets not enough successful responses by the time its context is canceled
// (eg via TimeoutWrapper) then it also returns an error.
//
// If Sequencer Inbox contract details are provided when a das.Aggregator is
// constructed, calls to Store(...) will try to verify the passed-in data's signature
// is from the batch poster. If the contract details are not provided, then the
// signature is not checked, which is useful for testing.
func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig))
if a.addrVerifier != nil {
actualSigner, err := DasRecoverSigner(message, sig, timeout)
if err != nil {
return nil, err
}
isBatchPosterOrSequencer, err := a.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner)
if err != nil {
return nil, err
}
if !isBatchPosterOrSequencer {
return nil, errors.New("store request not properly signed")
}
}

func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0))
responses := make(chan storeResponse, len(a.services))

expectedHash := dastree.Hash(message)
@@ -195,7 +168,7 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64,
metrics.GetOrRegisterCounter(metricBase+"/error/all/total", nil).Inc(1)
}

cert, err := d.service.Store(storeCtx, message, timeout, sig)
cert, err := d.service.Store(storeCtx, message, timeout)
if err != nil {
incFailureMetric()
if errors.Is(err, context.DeadlineExceeded) {
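The doc comment on Store above describes its quorum behaviour: the call fails once too many backends error out or the context expires. Below is a simplified, hypothetical sketch of that fan-out / failure-threshold pattern, written against the generic DASWriter interface rather than the Aggregator's actual internals (which also aggregate BLS signatures into a certificate):

package example

import (
	"context"
	"fmt"

	"github.com/offchainlabs/nitro/arbstate/daprovider"
)

// fanOutStore sends the same Store request to every backend writer concurrently and
// fails once more than maxAllowedFailures of them have returned an error, loosely
// mirroring the Aggregator's maxAllowedServiceStoreFailures threshold.
func fanOutStore(ctx context.Context, writers []daprovider.DASWriter, msg []byte, timeout uint64, maxAllowedFailures int) error {
	errCh := make(chan error, len(writers))
	for _, w := range writers {
		w := w // capture loop variable for the goroutine
		go func() {
			_, err := w.Store(ctx, msg, timeout)
			errCh <- err
		}()
	}
	failures := 0
	for range writers {
		if err := <-errCh; err != nil {
			failures++
			if failures > maxAllowedFailures {
				return fmt.Errorf("too many DAS store failures (%d)", failures)
			}
		}
	}
	return nil
}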
10 changes: 5 additions & 5 deletions das/aggregator_test.go
@@ -54,7 +54,7 @@ func TestDAS_BasicAggregationLocal(t *testing.T) {
Require(t, err)

rawMsg := []byte("It's time for you to see the fnords.")
cert, err := aggregator.Store(ctx, rawMsg, 0, []byte{})
cert, err := aggregator.Store(ctx, rawMsg, 0)
Require(t, err, "Error storing message")

for _, storageService := range storageServices {
@@ -123,17 +123,17 @@ type WrapStore struct {
DataAvailabilityServiceWriter
}

func (w *WrapStore) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) {
func (w *WrapStore) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
switch w.injector.shouldFail() {
case success:
return w.DataAvailabilityServiceWriter.Store(ctx, message, timeout, sig)
return w.DataAvailabilityServiceWriter.Store(ctx, message, timeout)
case immediateError:
return nil, errors.New("expected Store failure")
case tooSlow:
<-ctx.Done()
return nil, ctx.Err()
case dataCorruption:
cert, err := w.DataAvailabilityServiceWriter.Store(ctx, message, timeout, sig)
cert, err := w.DataAvailabilityServiceWriter.Store(ctx, message, timeout)
if err != nil {
return nil, err
}
@@ -214,7 +214,7 @@ func testConfigurableStorageFailures(t *testing.T, shouldFailAggregation bool) {
Require(t, err)

rawMsg := []byte("It's time for you to see the fnords.")
cert, err := aggregator.Store(ctx, rawMsg, 0, []byte{})
cert, err := aggregator.Store(ctx, rawMsg, 0)
if !shouldFailAggregation {
Require(t, err, "Error storing message")
} else {
2 changes: 1 addition & 1 deletion das/das.go
@@ -20,7 +20,7 @@ import (

type DataAvailabilityServiceWriter interface {
// Store requests that the message be stored until timeout (UTC time in unix epoch seconds).
Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error)
Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error)
fmt.Stringer
}

2 changes: 1 addition & 1 deletion das/dasRpcClient.go
@@ -57,7 +57,7 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChu
}, nil
}

func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, _ []byte) (*daprovider.DataAvailabilityCertificate, error) {
func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
timestamp := uint64(time.Now().Unix())
nChunks := uint64(len(message)) / c.chunkSize
lastChunkSize := uint64(len(message)) % c.chunkSize
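The nChunks / lastChunkSize arithmetic above implies the message is sent as fixed-size chunks plus a smaller final remainder. A stand-alone, illustrative sketch of that split follows (the helper name and exact boundary handling are assumptions, not the client's actual chunked-store protocol):

package example

// splitIntoChunks divides message into pieces of at most chunkSize bytes; the final
// piece carries the remainder. For len(message) == 10 and chunkSize == 4 this yields
// chunks of 4, 4 and 2 bytes.
func splitIntoChunks(message []byte, chunkSize uint64) [][]byte {
	if chunkSize == 0 {
		return [][]byte{message}
	}
	var chunks [][]byte
	for start := uint64(0); start < uint64(len(message)); start += chunkSize {
		end := start + chunkSize
		if end > uint64(len(message)) {
			end = uint64(len(message))
		}
		chunks = append(chunks, message[start:end])
	}
	return chunks
}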

