diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 51db0b2019..1829ae29f5 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -58,8 +58,6 @@ var ( baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil) blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil) l1GasPriceGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice", nil) - l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice/estimate", nil) - latestBatchSurplusGauge = metrics.NewRegisteredGauge("arb/batchposter/latestbatchsurplus", nil) blockGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/used", nil) blockGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/limit", nil) blobGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/used", nil) @@ -568,9 +566,7 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { } else { suggestedTipCapGauge.Update(suggestedTipCap.Int64()) } - l1GasPriceEstimate := b.streamer.CurrentEstimateOfL1GasPrice() l1GasPriceGauge.Update(int64(l1GasPrice)) - l1GasPriceEstimateGauge.Update(int64(l1GasPriceEstimate)) case <-ctx.Done(): return } @@ -1276,7 +1272,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) batchPosterDAFailureCounter.Inc(1) return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce) } - sequencerMsg, err = b.dapWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}, config.DisableDapFallbackStoreDataOnChain) + sequencerMsg, err = b.dapWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), config.DisableDapFallbackStoreDataOnChain) if err != nil { batchPosterDAFailureCounter.Inc(1) return false, err @@ -1369,14 +1365,6 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) "numBlobs", len(kzgBlobs), ) - surplus := arbmath.SaturatingMul( - arbmath.SaturatingSub( - l1GasPriceGauge.Snapshot().Value(), - l1GasPriceEstimateGauge.Snapshot().Value()), - int64(len(sequencerMsg)*16), - ) - latestBatchSurplusGauge.Update(surplus) - recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3 postedMessages := b.building.msgCount - batchPosition.MessageCount b.messagesPerBatch.Update(uint64(postedMessages)) diff --git a/arbnode/node.go b/arbnode/node.go index 5592574823..1fae09c108 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -792,17 +792,6 @@ func CreateNode( return currentNode, nil } -func (n *Node) CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) { - n.TxStreamer.CacheL1PriceDataOfMsg(pos, callDataUnits, l1GasCharged) -} - -func (n *Node) BacklogL1GasCharged() uint64 { - return n.TxStreamer.BacklogL1GasCharged() -} -func (n *Node) BacklogCallDataUnits() uint64 { - return n.TxStreamer.BacklogCallDataUnits() -} - func (n *Node) Start(ctx context.Context) error { execClient, ok := n.Execution.(*gethexec.ExecutionNode) if !ok { diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index c948bd8169..5c02129ee6 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -69,9 +69,6 @@ type TransactionStreamer struct { broadcastServer *broadcaster.Broadcaster inboxReader *InboxReader delayedBridge *DelayedBridge - - cachedL1PriceDataMutex sync.RWMutex - cachedL1PriceData *L1PriceData } type TransactionStreamerConfig struct { @@ -118,9 +115,6 @@ func NewTransactionStreamer( fatalErrChan: fatalErrChan, config: config, snapSyncConfig: snapSyncConfig, - cachedL1PriceData: &L1PriceData{ - msgToL1PriceData: []L1PriceDataOfMsg{}, - }, } err := streamer.cleanupInconsistentState() if err != nil { @@ -129,20 +123,6 @@ func NewTransactionStreamer( return streamer, nil } -type L1PriceDataOfMsg struct { - callDataUnits uint64 - cummulativeCallDataUnits uint64 - l1GasCharged uint64 - cummulativeL1GasCharged uint64 -} - -type L1PriceData struct { - startOfL1PriceDataCache arbutil.MessageIndex - endOfL1PriceDataCache arbutil.MessageIndex - msgToL1PriceData []L1PriceDataOfMsg - currentEstimateOfL1GasPrice uint64 -} - // Represents a block's hash in the database. // Necessary because RLP decoder doesn't produce nil values by default. type blockHashDBValue struct { @@ -153,106 +133,6 @@ const ( BlockHashMismatchLogMsg = "BlockHash from feed doesn't match locally computed hash. Check feed source." ) -func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 { - s.cachedL1PriceDataMutex.Lock() - defer s.cachedL1PriceDataMutex.Unlock() - - currentEstimate, err := s.exec.GetL1GasPriceEstimate() - if err != nil { - log.Error("error fetching current L2 estimate of L1 gas price hence reusing cached estimate", "err", err) - } else { - s.cachedL1PriceData.currentEstimateOfL1GasPrice = currentEstimate - } - return s.cachedL1PriceData.currentEstimateOfL1GasPrice -} - -func (s *TransactionStreamer) BacklogCallDataUnits() uint64 { - s.cachedL1PriceDataMutex.RLock() - defer s.cachedL1PriceDataMutex.RUnlock() - - size := len(s.cachedL1PriceData.msgToL1PriceData) - if size == 0 { - return 0 - } - return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits - - s.cachedL1PriceData.msgToL1PriceData[0].cummulativeCallDataUnits + - s.cachedL1PriceData.msgToL1PriceData[0].callDataUnits) -} - -func (s *TransactionStreamer) BacklogL1GasCharged() uint64 { - s.cachedL1PriceDataMutex.RLock() - defer s.cachedL1PriceDataMutex.RUnlock() - - size := len(s.cachedL1PriceData.msgToL1PriceData) - if size == 0 { - return 0 - } - return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged - - s.cachedL1PriceData.msgToL1PriceData[0].cummulativeL1GasCharged + - s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged) -} - -func (s *TransactionStreamer) TrimCache(to arbutil.MessageIndex) { - s.cachedL1PriceDataMutex.Lock() - defer s.cachedL1PriceDataMutex.Unlock() - - if to < s.cachedL1PriceData.startOfL1PriceDataCache { - log.Info("trying to trim older cache which doesnt exist anymore") - } else if to >= s.cachedL1PriceData.endOfL1PriceDataCache { - s.cachedL1PriceData.startOfL1PriceDataCache = 0 - s.cachedL1PriceData.endOfL1PriceDataCache = 0 - s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{} - } else { - newStart := to - s.cachedL1PriceData.startOfL1PriceDataCache + 1 - s.cachedL1PriceData.msgToL1PriceData = s.cachedL1PriceData.msgToL1PriceData[newStart:] - s.cachedL1PriceData.startOfL1PriceDataCache = to + 1 - } -} - -func (s *TransactionStreamer) CacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) { - s.cachedL1PriceDataMutex.Lock() - defer s.cachedL1PriceDataMutex.Unlock() - - resetCache := func() { - s.cachedL1PriceData.startOfL1PriceDataCache = seqNum - s.cachedL1PriceData.endOfL1PriceDataCache = seqNum - s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{{ - callDataUnits: callDataUnits, - cummulativeCallDataUnits: callDataUnits, - l1GasCharged: l1GasCharged, - cummulativeL1GasCharged: l1GasCharged, - }} - } - size := len(s.cachedL1PriceData.msgToL1PriceData) - if size == 0 || - s.cachedL1PriceData.startOfL1PriceDataCache == 0 || - s.cachedL1PriceData.endOfL1PriceDataCache == 0 || - arbutil.MessageIndex(size) != s.cachedL1PriceData.endOfL1PriceDataCache-s.cachedL1PriceData.startOfL1PriceDataCache+1 { - resetCache() - return - } - if seqNum != s.cachedL1PriceData.endOfL1PriceDataCache+1 { - if seqNum > s.cachedL1PriceData.endOfL1PriceDataCache+1 { - log.Info("message position higher then current end of l1 price data cache, resetting cache to this message") - resetCache() - } else if seqNum < s.cachedL1PriceData.startOfL1PriceDataCache { - log.Info("message position lower than start of l1 price data cache, ignoring") - } else { - log.Info("message position already seen in l1 price data cache, ignoring") - } - } else { - cummulativeCallDataUnits := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits - cummulativeL1GasCharged := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged - s.cachedL1PriceData.msgToL1PriceData = append(s.cachedL1PriceData.msgToL1PriceData, L1PriceDataOfMsg{ - callDataUnits: callDataUnits, - cummulativeCallDataUnits: cummulativeCallDataUnits + callDataUnits, - l1GasCharged: l1GasCharged, - cummulativeL1GasCharged: cummulativeL1GasCharged + l1GasCharged, - }) - s.cachedL1PriceData.endOfL1PriceDataCache = seqNum - } -} - // Encodes a uint64 as bytes in a lexically sortable manner for database iteration. // Generally this is only used for database keys, which need sorted. // A shorter RLP encoding is usually used for database values. @@ -773,7 +653,7 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m if messagesAreConfirmed { // Trim confirmed messages from l1pricedataCache - s.TrimCache(pos + arbutil.MessageIndex(len(messages))) + s.exec.MarkFeedStart(pos + arbutil.MessageIndex(len(messages))) s.reorgMutex.RLock() dups, _, _, err := s.countDuplicateMessages(pos, messagesWithBlockHash, nil) s.reorgMutex.RUnlock() diff --git a/arbstate/daprovider/util.go b/arbstate/daprovider/util.go index 054bde5503..7d8f1a404f 100644 --- a/arbstate/daprovider/util.go +++ b/arbstate/daprovider/util.go @@ -30,7 +30,7 @@ type DASReader interface { type DASWriter interface { // Store requests that the message be stored until timeout (UTC time in unix epoch seconds). - Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*DataAvailabilityCertificate, error) + Store(ctx context.Context, message []byte, timeout uint64) (*DataAvailabilityCertificate, error) fmt.Stringer } diff --git a/arbstate/daprovider/writer.go b/arbstate/daprovider/writer.go index 75b356c4b8..a26e53c94d 100644 --- a/arbstate/daprovider/writer.go +++ b/arbstate/daprovider/writer.go @@ -17,7 +17,6 @@ type Writer interface { ctx context.Context, message []byte, timeout uint64, - sig []byte, disableFallbackStoreDataOnChain bool, ) ([]byte, error) } @@ -32,8 +31,8 @@ type writerForDAS struct { dasWriter DASWriter } -func (d *writerForDAS) Store(ctx context.Context, message []byte, timeout uint64, sig []byte, disableFallbackStoreDataOnChain bool) ([]byte, error) { - cert, err := d.dasWriter.Store(ctx, message, timeout, []byte{}) +func (d *writerForDAS) Store(ctx context.Context, message []byte, timeout uint64, disableFallbackStoreDataOnChain bool) ([]byte, error) { + cert, err := d.dasWriter.Store(ctx, message, timeout) if errors.Is(err, ErrBatchToDasFailed) { if disableFallbackStoreDataOnChain { return nil, errors.New("unable to batch to DAS and fallback storing data on chain is disabled") diff --git a/cmd/datool/datool.go b/cmd/datool/datool.go index cdea134bcb..4017457ba9 100644 --- a/cmd/datool/datool.go +++ b/cmd/datool/datool.go @@ -166,9 +166,9 @@ func startClientStore(args []string) error { if err != nil { return err } - cert, err = client.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) + cert, err = client.Store(ctx, message, uint64(time.Now().Add(config.DASRetentionPeriod).Unix())) } else if len(config.Message) > 0 { - cert, err = client.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) + cert, err = client.Store(ctx, []byte(config.Message), uint64(time.Now().Add(config.DASRetentionPeriod).Unix())) } else { return errors.New("--message or --random-message-size must be specified") } diff --git a/das/aggregator.go b/das/aggregator.go index 25db73a76e..f82174fb1c 100644 --- a/das/aggregator.go +++ b/das/aggregator.go @@ -22,7 +22,6 @@ import ( "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/das/dastree" "github.com/offchainlabs/nitro/solgen/go/bridgegen" - "github.com/offchainlabs/nitro/util/contracts" "github.com/offchainlabs/nitro/util/pretty" ) @@ -56,7 +55,6 @@ type Aggregator struct { maxAllowedServiceStoreFailures int keysetHash [32]byte keysetBytes []byte - addrVerifier *contracts.AddressVerifier } type ServiceDetails struct { @@ -124,11 +122,6 @@ func NewAggregatorWithSeqInboxCaller( return nil, err } - var addrVerifier *contracts.AddressVerifier - if seqInboxCaller != nil { - addrVerifier = contracts.NewAddressVerifier(seqInboxCaller) - } - return &Aggregator{ config: config.RPCAggregator, services: services, @@ -137,7 +130,6 @@ func NewAggregatorWithSeqInboxCaller( maxAllowedServiceStoreFailures: config.RPCAggregator.AssumedHonest - 1, keysetHash: keysetHash, keysetBytes: keysetBytes, - addrVerifier: addrVerifier, }, nil } @@ -160,27 +152,8 @@ type storeResponse struct { // // If Store gets not enough successful responses by the time its context is canceled // (eg via TimeoutWrapper) then it also returns an error. -// -// If Sequencer Inbox contract details are provided when a das.Aggregator is -// constructed, calls to Store(...) will try to verify the passed-in data's signature -// is from the batch poster. If the contract details are not provided, then the -// signature is not checked, which is useful for testing. -func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) { - log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig)) - if a.addrVerifier != nil { - actualSigner, err := DasRecoverSigner(message, sig, timeout) - if err != nil { - return nil, err - } - isBatchPosterOrSequencer, err := a.addrVerifier.IsBatchPosterOrSequencer(ctx, actualSigner) - if err != nil { - return nil, err - } - if !isBatchPosterOrSequencer { - return nil, errors.New("store request not properly signed") - } - } - +func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) { + log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0)) responses := make(chan storeResponse, len(a.services)) expectedHash := dastree.Hash(message) @@ -195,7 +168,7 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64, metrics.GetOrRegisterCounter(metricBase+"/error/all/total", nil).Inc(1) } - cert, err := d.service.Store(storeCtx, message, timeout, sig) + cert, err := d.service.Store(storeCtx, message, timeout) if err != nil { incFailureMetric() if errors.Is(err, context.DeadlineExceeded) { diff --git a/das/aggregator_test.go b/das/aggregator_test.go index 728db6cf50..4bc209513e 100644 --- a/das/aggregator_test.go +++ b/das/aggregator_test.go @@ -54,7 +54,7 @@ func TestDAS_BasicAggregationLocal(t *testing.T) { Require(t, err) rawMsg := []byte("It's time for you to see the fnords.") - cert, err := aggregator.Store(ctx, rawMsg, 0, []byte{}) + cert, err := aggregator.Store(ctx, rawMsg, 0) Require(t, err, "Error storing message") for _, storageService := range storageServices { @@ -123,17 +123,17 @@ type WrapStore struct { DataAvailabilityServiceWriter } -func (w *WrapStore) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) { +func (w *WrapStore) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) { switch w.injector.shouldFail() { case success: - return w.DataAvailabilityServiceWriter.Store(ctx, message, timeout, sig) + return w.DataAvailabilityServiceWriter.Store(ctx, message, timeout) case immediateError: return nil, errors.New("expected Store failure") case tooSlow: <-ctx.Done() return nil, ctx.Err() case dataCorruption: - cert, err := w.DataAvailabilityServiceWriter.Store(ctx, message, timeout, sig) + cert, err := w.DataAvailabilityServiceWriter.Store(ctx, message, timeout) if err != nil { return nil, err } @@ -214,7 +214,7 @@ func testConfigurableStorageFailures(t *testing.T, shouldFailAggregation bool) { Require(t, err) rawMsg := []byte("It's time for you to see the fnords.") - cert, err := aggregator.Store(ctx, rawMsg, 0, []byte{}) + cert, err := aggregator.Store(ctx, rawMsg, 0) if !shouldFailAggregation { Require(t, err, "Error storing message") } else { diff --git a/das/das.go b/das/das.go index fea1e6c6a2..5528323a9c 100644 --- a/das/das.go +++ b/das/das.go @@ -20,7 +20,7 @@ import ( type DataAvailabilityServiceWriter interface { // Store requests that the message be stored until timeout (UTC time in unix epoch seconds). - Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) + Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) fmt.Stringer } diff --git a/das/dasRpcClient.go b/das/dasRpcClient.go index 8d8db02ff4..ca2ee8e7d4 100644 --- a/das/dasRpcClient.go +++ b/das/dasRpcClient.go @@ -57,7 +57,7 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChu }, nil } -func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, _ []byte) (*daprovider.DataAvailabilityCertificate, error) { +func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) { timestamp := uint64(time.Now().Unix()) nChunks := uint64(len(message)) / c.chunkSize lastChunkSize := uint64(len(message)) % c.chunkSize diff --git a/das/dasRpcServer.go b/das/dasRpcServer.go index 1e5c95089f..9e6228ca5d 100644 --- a/das/dasRpcServer.go +++ b/das/dasRpcServer.go @@ -125,7 +125,7 @@ func (s *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, timeout return nil, err } - cert, err := s.daWriter.Store(ctx, message, uint64(timeout), nil) + cert, err := s.daWriter.Store(ctx, message, uint64(timeout)) if err != nil { return nil, err } @@ -325,7 +325,7 @@ func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, batchId hexutil.U return nil, err } - cert, err := s.daWriter.Store(ctx, message, timeout, nil) + cert, err := s.daWriter.Store(ctx, message, timeout) success := false defer func() { if success { diff --git a/das/das_test.go b/das/das_test.go index 950b63d9d9..c52616fe20 100644 --- a/das/das_test.go +++ b/das/das_test.go @@ -56,7 +56,7 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) { timeout := uint64(time.Now().Add(time.Hour * 24).Unix()) messageSaved := []byte("hello world") - cert, err := daWriter.Store(firstCtx, messageSaved, timeout, []byte{}) + cert, err := daWriter.Store(firstCtx, messageSaved, timeout) Require(t, err, "Error storing message") if cert.Timeout != timeout { Fail(t, fmt.Sprintf("Expected timeout of %d in cert, was %d", timeout, cert.Timeout)) @@ -145,7 +145,7 @@ func testDASMissingMessage(t *testing.T, storageType string) { messageSaved := []byte("hello world") timeout := uint64(time.Now().Add(time.Hour * 24).Unix()) - cert, err := daWriter.Store(ctx, messageSaved, timeout, []byte{}) + cert, err := daWriter.Store(ctx, messageSaved, timeout) Require(t, err, "Error storing message") if cert.Timeout != timeout { Fail(t, fmt.Sprintf("Expected timeout of %d in cert, was %d", timeout, cert.Timeout)) diff --git a/das/extra_signature_checker_test.go b/das/extra_signature_checker_test.go index 9fdf86cdf5..11c218ae03 100644 --- a/das/extra_signature_checker_test.go +++ b/das/extra_signature_checker_test.go @@ -5,25 +5,19 @@ package das import ( "bytes" - "context" "encoding/hex" "errors" "io/ioutil" "testing" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/offchainlabs/nitro/arbstate/daprovider" "github.com/offchainlabs/nitro/util/signature" ) -type StubSignatureCheckDAS struct { - keyDir string -} - -func (s *StubSignatureCheckDAS) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) { - pubkeyEncoded, err := ioutil.ReadFile(s.keyDir + "/ecdsa.pub") +func checkSig(keyDir string, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) { + pubkeyEncoded, err := ioutil.ReadFile(keyDir + "/ecdsa.pub") if err != nil { return nil, err } @@ -39,22 +33,6 @@ func (s *StubSignatureCheckDAS) Store(ctx context.Context, message []byte, timeo return nil, nil } -func (s *StubSignatureCheckDAS) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { - return daprovider.KeepForever, nil -} - -func (s *StubSignatureCheckDAS) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) { - return []byte{}, nil -} - -func (s *StubSignatureCheckDAS) HealthCheck(ctx context.Context) error { - return nil -} - -func (s *StubSignatureCheckDAS) String() string { - return "StubSignatureCheckDAS" -} - func TestExtraSignatureCheck(t *testing.T) { keyDir := t.TempDir() err := GenerateAndStoreECDSAKeys(keyDir) @@ -64,13 +42,11 @@ func TestExtraSignatureCheck(t *testing.T) { Require(t, err) signer := signature.DataSignerFromPrivateKey(privateKey) - var da DataAvailabilityServiceWriter = &StubSignatureCheckDAS{keyDir} - msg := []byte("Hello world") timeout := uint64(1234) sig, err := applyDasSigner(signer, msg, timeout) Require(t, err) - _, err = da.Store(context.Background(), msg, timeout, sig) + _, err = checkSig(keyDir, msg, timeout, sig) Require(t, err) } diff --git a/das/panic_wrapper.go b/das/panic_wrapper.go index dbb61cba96..3530cb651d 100644 --- a/das/panic_wrapper.go +++ b/das/panic_wrapper.go @@ -26,8 +26,8 @@ func (w *WriterPanicWrapper) String() string { return fmt.Sprintf("WriterPanicWrapper{%v}", w.DataAvailabilityServiceWriter) } -func (w *WriterPanicWrapper) Store(ctx context.Context, message []byte, timeout uint64, sig []byte) (*daprovider.DataAvailabilityCertificate, error) { - cert, err := w.DataAvailabilityServiceWriter.Store(ctx, message, timeout, sig) +func (w *WriterPanicWrapper) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) { + cert, err := w.DataAvailabilityServiceWriter.Store(ctx, message, timeout) if err != nil { panic(fmt.Sprintf("panic wrapper Store: %v", err)) } diff --git a/das/rpc_test.go b/das/rpc_test.go index 5f97ef8828..d3c99e6367 100644 --- a/das/rpc_test.go +++ b/das/rpc_test.go @@ -96,7 +96,7 @@ func testRpcImpl(t *testing.T, size, times int, concurrent bool) { runStore := func() { defer wg.Done() msg := testhelpers.RandomizeSlice(make([]byte, size)) - cert, err := rpcAgg.Store(ctx, msg, 0, nil) + cert, err := rpcAgg.Store(ctx, msg, 0) testhelpers.RequireImpl(t, err) retrievedMessage, err := storageService.GetByHash(ctx, cert.DataHash) diff --git a/das/sign_after_store_das_writer.go b/das/sign_after_store_das_writer.go index ab6ac91cef..0e31d30ae9 100644 --- a/das/sign_after_store_das_writer.go +++ b/das/sign_after_store_das_writer.go @@ -104,10 +104,8 @@ func NewSignAfterStoreDASWriter(ctx context.Context, config DataAvailabilityConf }, nil } -func (d *SignAfterStoreDASWriter) Store( - ctx context.Context, message []byte, timeout uint64, sig []byte, -) (c *daprovider.DataAvailabilityCertificate, err error) { - log.Trace("das.SignAfterStoreDASWriter.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "sig", pretty.FirstFewBytes(sig), "this", d) +func (d *SignAfterStoreDASWriter) Store(ctx context.Context, message []byte, timeout uint64) (c *daprovider.DataAvailabilityCertificate, err error) { + log.Trace("das.SignAfterStoreDASWriter.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0), "this", d) c = &daprovider.DataAvailabilityCertificate{ Timeout: timeout, DataHash: dastree.Hash(message), diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 083f59dfab..95b865df5a 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -46,6 +46,7 @@ import ( ) var ( + l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/l1gasprice/estimate", nil) baseFeeGauge = metrics.NewRegisteredGauge("arb/block/basefee", nil) blockGasUsedHistogram = metrics.NewRegisteredHistogram("arb/block/gasused", nil, metrics.NewBoundedHistogramSample()) txCountHistogram = metrics.NewRegisteredHistogram("arb/block/transactions/count", nil, metrics.NewBoundedHistogramSample()) @@ -53,6 +54,20 @@ var ( gasUsedSinceStartupCounter = metrics.NewRegisteredCounter("arb/gas_used", nil) ) +type L1PriceDataOfMsg struct { + callDataUnits uint64 + cummulativeCallDataUnits uint64 + l1GasCharged uint64 + cummulativeL1GasCharged uint64 +} + +type L1PriceData struct { + mutex sync.RWMutex + startOfL1PriceDataCache arbutil.MessageIndex + endOfL1PriceDataCache arbutil.MessageIndex + msgToL1PriceData []L1PriceDataOfMsg +} + type ExecutionEngine struct { stopwaiter.StopWaiter @@ -72,17 +87,69 @@ type ExecutionEngine struct { reorgSequencing bool prefetchBlock bool + + cachedL1PriceData *L1PriceData +} + +func NewL1PriceData() *L1PriceData { + return &L1PriceData{ + msgToL1PriceData: []L1PriceDataOfMsg{}, + } } func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) { return &ExecutionEngine{ - bc: bc, - resequenceChan: make(chan []*arbostypes.MessageWithMetadata), - newBlockNotifier: make(chan struct{}, 1), + bc: bc, + resequenceChan: make(chan []*arbostypes.MessageWithMetadata), + newBlockNotifier: make(chan struct{}, 1), + cachedL1PriceData: NewL1PriceData(), }, nil } -func (n *ExecutionEngine) Initialize(rustCacheSize uint32) { +func (s *ExecutionEngine) backlogCallDataUnits() uint64 { + s.cachedL1PriceData.mutex.RLock() + defer s.cachedL1PriceData.mutex.RUnlock() + + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 { + return 0 + } + return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits - + s.cachedL1PriceData.msgToL1PriceData[0].cummulativeCallDataUnits + + s.cachedL1PriceData.msgToL1PriceData[0].callDataUnits) +} + +func (s *ExecutionEngine) backlogL1GasCharged() uint64 { + s.cachedL1PriceData.mutex.RLock() + defer s.cachedL1PriceData.mutex.RUnlock() + + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 { + return 0 + } + return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged - + s.cachedL1PriceData.msgToL1PriceData[0].cummulativeL1GasCharged + + s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged) +} + +func (s *ExecutionEngine) MarkFeedStart(to arbutil.MessageIndex) { + s.cachedL1PriceData.mutex.Lock() + defer s.cachedL1PriceData.mutex.Unlock() + + if to < s.cachedL1PriceData.startOfL1PriceDataCache { + log.Info("trying to trim older cache which doesnt exist anymore") + } else if to >= s.cachedL1PriceData.endOfL1PriceDataCache { + s.cachedL1PriceData.startOfL1PriceDataCache = 0 + s.cachedL1PriceData.endOfL1PriceDataCache = 0 + s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{} + } else { + newStart := to - s.cachedL1PriceData.startOfL1PriceDataCache + 1 + s.cachedL1PriceData.msgToL1PriceData = s.cachedL1PriceData.msgToL1PriceData[newStart:] + s.cachedL1PriceData.startOfL1PriceDataCache = to + 1 + } +} + +func (s *ExecutionEngine) Initialize(rustCacheSize uint32) { if rustCacheSize != 0 { programs.ResizeWasmLruCache(rustCacheSize) } @@ -456,8 +523,7 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes. if err != nil { return nil, err } - - s.cacheL1PriceDataOfMsg(pos, receipts, block) + s.cacheL1PriceDataOfMsg(pos, receipts, block, false) return block, nil } @@ -477,7 +543,7 @@ func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostyp expectedDelayed := currentHeader.Nonce.Uint64() - lastMsg, err := s.BlockNumberToMessageIndex(currentHeader.Number.Uint64()) + pos, err := s.BlockNumberToMessageIndex(currentHeader.Number.Uint64() + 1) if err != nil { return nil, err } @@ -503,7 +569,7 @@ func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostyp return nil, err } - err = s.consensus.WriteMessageFromSequencer(lastMsg+1, messageWithMeta, *msgResult) + err = s.consensus.WriteMessageFromSequencer(pos, messageWithMeta, *msgResult) if err != nil { return nil, err } @@ -512,8 +578,9 @@ func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostyp if err != nil { return nil, err } + s.cacheL1PriceDataOfMsg(pos, receipts, block, true) - log.Info("ExecutionEngine: Added DelayedMessages", "pos", lastMsg+1, "delayed", delayedSeqNum, "block-header", block.Header()) + log.Info("ExecutionEngine: Added DelayedMessages", "pos", pos, "delayed", delayedSeqNum, "block-header", block.Header()) return block, nil } @@ -600,6 +667,7 @@ func (s *ExecutionEngine) appendBlock(block *types.Block, statedb *state.StateDB } blockGasUsedHistogram.Update(int64(blockGasused)) gasUsedSinceStartupCounter.Inc(int64(blockGasused)) + s.updateL1GasPriceEstimateMetric() return nil } @@ -618,22 +686,25 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos))) } -func (s *ExecutionEngine) GetL1GasPriceEstimate() (uint64, error) { +func (s *ExecutionEngine) updateL1GasPriceEstimateMetric() { bc := s.bc latestHeader := bc.CurrentBlock() latestState, err := bc.StateAt(latestHeader.Root) if err != nil { - return 0, errors.New("error getting latest statedb while fetching l2 Estimate of L1 GasPrice") + log.Error("error getting latest statedb while fetching l2 Estimate of L1 GasPrice") + return } arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true) if err != nil { - return 0, errors.New("error opening system arbos state while fetching l2 Estimate of L1 GasPrice") + log.Error("error opening system arbos state while fetching l2 Estimate of L1 GasPrice") + return } l2EstimateL1GasPrice, err := arbState.L1PricingState().PricePerUnit() if err != nil { - return 0, errors.New("error fetching l2 Estimate of L1 GasPrice") + log.Error("error fetching l2 Estimate of L1 GasPrice") + return } - return l2EstimateL1GasPrice.Uint64(), nil + l1GasPriceEstimateGauge.Update(l2EstimateL1GasPrice.Int64()) } func (s *ExecutionEngine) getL1PricingSurplus() (int64, error) { @@ -654,17 +725,65 @@ func (s *ExecutionEngine) getL1PricingSurplus() (int64, error) { return surplus.Int64(), nil } -func (s *ExecutionEngine) cacheL1PriceDataOfMsg(num arbutil.MessageIndex, receipts types.Receipts, block *types.Block) { +func (s *ExecutionEngine) cacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, receipts types.Receipts, block *types.Block, blockBuiltUsingDelayedMessage bool) { var gasUsedForL1 uint64 - for i := 1; i < len(receipts); i++ { - gasUsedForL1 += receipts[i].GasUsedForL1 - } - gasChargedForL1 := gasUsedForL1 * block.BaseFee().Uint64() var callDataUnits uint64 - for _, tx := range block.Transactions() { - callDataUnits += tx.CalldataUnits + if !blockBuiltUsingDelayedMessage { + // s.cachedL1PriceData tracks L1 price data for messages posted by Nitro, + // so delayed messages should not update cummulative values kept on it. + + // First transaction in every block is an Arbitrum internal transaction, + // so we skip it here. + for i := 1; i < len(receipts); i++ { + gasUsedForL1 += receipts[i].GasUsedForL1 + } + for _, tx := range block.Transactions() { + callDataUnits += tx.CalldataUnits + } + } + l1GasCharged := gasUsedForL1 * block.BaseFee().Uint64() + + s.cachedL1PriceData.mutex.Lock() + defer s.cachedL1PriceData.mutex.Unlock() + + resetCache := func() { + s.cachedL1PriceData.startOfL1PriceDataCache = seqNum + s.cachedL1PriceData.endOfL1PriceDataCache = seqNum + s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{{ + callDataUnits: callDataUnits, + cummulativeCallDataUnits: callDataUnits, + l1GasCharged: l1GasCharged, + cummulativeL1GasCharged: l1GasCharged, + }} + } + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 || + s.cachedL1PriceData.startOfL1PriceDataCache == 0 || + s.cachedL1PriceData.endOfL1PriceDataCache == 0 || + arbutil.MessageIndex(size) != s.cachedL1PriceData.endOfL1PriceDataCache-s.cachedL1PriceData.startOfL1PriceDataCache+1 { + resetCache() + return + } + if seqNum != s.cachedL1PriceData.endOfL1PriceDataCache+1 { + if seqNum > s.cachedL1PriceData.endOfL1PriceDataCache+1 { + log.Info("message position higher then current end of l1 price data cache, resetting cache to this message") + resetCache() + } else if seqNum < s.cachedL1PriceData.startOfL1PriceDataCache { + log.Info("message position lower than start of l1 price data cache, ignoring") + } else { + log.Info("message position already seen in l1 price data cache, ignoring") + } + } else { + cummulativeCallDataUnits := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits + cummulativeL1GasCharged := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged + s.cachedL1PriceData.msgToL1PriceData = append(s.cachedL1PriceData.msgToL1PriceData, L1PriceDataOfMsg{ + callDataUnits: callDataUnits, + cummulativeCallDataUnits: cummulativeCallDataUnits + callDataUnits, + l1GasCharged: l1GasCharged, + cummulativeL1GasCharged: cummulativeL1GasCharged + l1GasCharged, + }) + s.cachedL1PriceData.endOfL1PriceDataCache = seqNum } - s.consensus.CacheL1PriceDataOfMsg(num, callDataUnits, gasChargedForL1) } // DigestMessage is used to create a block by executing msg against the latest state and storing it. @@ -712,6 +831,7 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, if err != nil { return nil, err } + s.cacheL1PriceDataOfMsg(num, receipts, block, false) if time.Now().After(s.nextScheduledVersionCheck) { s.nextScheduledVersionCheck = time.Now().Add(time.Minute) diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index eb0d39d253..cb2bfe12e8 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -277,8 +277,8 @@ func CreateExecutionNode( } -func (n *ExecutionNode) GetL1GasPriceEstimate() (uint64, error) { - return n.ExecEngine.GetL1GasPriceEstimate() +func (n *ExecutionNode) MarkFeedStart(to arbutil.MessageIndex) { + n.ExecEngine.MarkFeedStart(to) } func (n *ExecutionNode) Initialize(ctx context.Context) error { diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 189261b95b..2bace9b677 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -1058,8 +1058,8 @@ func (s *Sequencer) updateExpectedSurplus(ctx context.Context) (int64, error) { if err != nil { return 0, fmt.Errorf("error encountered getting l1 pricing surplus while updating expectedSurplus: %w", err) } - backlogL1GasCharged := int64(s.execEngine.consensus.BacklogL1GasCharged()) - backlogCallDataUnits := int64(s.execEngine.consensus.BacklogCallDataUnits()) + backlogL1GasCharged := int64(s.execEngine.backlogL1GasCharged()) + backlogCallDataUnits := int64(s.execEngine.backlogCallDataUnits()) expectedSurplus := int64(surplus) + backlogL1GasCharged - backlogCallDataUnits*int64(l1GasPrice) // update metrics l1GasPriceGauge.Update(int64(l1GasPrice)) diff --git a/execution/interface.go b/execution/interface.go index 66aefe9a5e..32ec7dd0f7 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -56,7 +56,7 @@ type ExecutionSequencer interface { ForwardTo(url string) error SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error NextDelayedMessageNumber() (uint64, error) - GetL1GasPriceEstimate() (uint64, error) + MarkFeedStart(to arbutil.MessageIndex) } type FullExecutionClient interface { @@ -94,9 +94,6 @@ type ConsensusInfo interface { type ConsensusSequencer interface { WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult MessageResult) error ExpectChosenSequencer() error - CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) - BacklogL1GasCharged() uint64 - BacklogCallDataUnits() uint64 } type FullConsensusClient interface { diff --git a/util/headerreader/blob_client.go b/util/headerreader/blob_client.go index 73849d0d3a..2b47a940c3 100644 --- a/util/headerreader/blob_client.go +++ b/util/headerreader/blob_client.go @@ -229,10 +229,11 @@ func (b *BlobClient) blobSidecars(ctx context.Context, slot uint64, versionedHas var found bool for outputIdx = range versionedHashes { if versionedHashes[outputIdx] == versionedHash { - found = true if outputsFound[outputIdx] { - return nil, fmt.Errorf("found blob with versioned hash %v twice", versionedHash) + // Duplicate, skip this one + break } + found = true outputsFound[outputIdx] = true break }