diff --git a/Dockerfile b/Dockerfile
index 2863b76dd1..01d9ef37f0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -255,7 +255,8 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
     apt-get update && \
     apt-get install -y \
     ca-certificates \
-    wabt && \
+    wabt \
+    sysstat && \
     /usr/sbin/update-ca-certificates && \
     useradd -s /bin/bash user && \
     mkdir -p /home/user/l1keystore && \
diff --git a/Makefile b/Makefile
index 53b89c8d72..dc9b4e3ddf 100644
--- a/Makefile
+++ b/Makefile
@@ -162,6 +162,7 @@ test-go-deps: \
 	build-replay-env \
 	$(stylus_test_wasms) \
 	$(arbitrator_stylus_lib) \
+	$(arbitrator_generated_header) \
 	$(patsubst %,$(arbitrator_cases)/%.wasm, global-state read-inboxmsg-10 global-state-wrapper const)

 build-prover-header: $(arbitrator_generated_header)
diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go
index 252d7c9b7d..ef4acd038c 100644
--- a/arbnode/inbox_test.go
+++ b/arbnode/inbox_test.go
@@ -37,6 +37,11 @@ type execClientWrapper struct {
 func (w *execClientWrapper) Pause() { w.t.Error("not supported") }
 func (w *execClientWrapper) Activate() { w.t.Error("not supported") }
 func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil }
+func (w *execClientWrapper) Synced() bool { w.t.Error("not supported"); return false }
+func (w *execClientWrapper) FullSyncProgressMap() map[string]interface{} {
+	w.t.Error("not supported")
+	return nil
+}

 func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
 	chainConfig := params.ArbitrumDevTestChainConfig()
diff --git a/arbnode/node.go b/arbnode/node.go
index 1fae09c108..a2ccdd1834 100644
--- a/arbnode/node.go
+++ b/arbnode/node.go
@@ -523,14 +523,15 @@ func createNodeImpl(
 	var daWriter das.DataAvailabilityServiceWriter
 	var daReader das.DataAvailabilityServiceReader
 	var dasLifecycleManager *das.LifecycleManager
+	var dasKeysetFetcher *das.KeysetFetcher
 	if config.DataAvailability.Enable {
 		if config.BatchPoster.Enable {
-			daWriter, daReader, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
+			daWriter, daReader, dasKeysetFetcher, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
 			if err != nil {
 				return nil, err
 			}
 		} else {
-			daReader, dasLifecycleManager, err = das.CreateDAReaderForNode(ctx, &config.DataAvailability, l1Reader, &deployInfo.SequencerInbox)
+			daReader, dasKeysetFetcher, dasLifecycleManager, err = das.CreateDAReaderForNode(ctx, &config.DataAvailability, l1Reader, &deployInfo.SequencerInbox)
 			if err != nil {
 				return nil, err
 			}
@@ -554,7 +555,7 @@ func createNodeImpl(
 	}
 	var dapReaders []daprovider.Reader
 	if daReader != nil {
-		dapReaders = append(dapReaders, daprovider.NewReaderForDAS(daReader))
+		dapReaders = append(dapReaders, daprovider.NewReaderForDAS(daReader, dasKeysetFetcher))
 	}
 	if blobReader != nil {
 		dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go
index ecf38ddf42..cdf1011b11 100644
--- a/arbnode/seq_coordinator.go
+++ b/arbnode/seq_coordinator.go
@@ -39,7 +39,6 @@ type SeqCoordinator struct {
 	redisutil.RedisCoordinator

-	sync             *SyncMonitor
 	streamer         *TransactionStreamer
 	sequencer        execution.ExecutionSequencer
 	delayedSequencer *DelayedSequencer
@@ -150,7 +149,6 @@ func NewSeqCoordinator(
 	}
 	coordinator := &SeqCoordinator{
 		RedisCoordinator: *redisCoordinator,
-		sync:             sync,
 		streamer:         streamer,
 		sequencer:        sequencer,
 		config:           config,
@@ -607,9 +605,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
 		return c.noRedisError()
 	}

-	syncProgress := c.sync.SyncProgressMap()
-	synced := len(syncProgress) == 0
+	// The sequencer should want the lockout if and only if it is synced, is not avoiding the lockout, and execution has processed every message that consensus had one second ago
+	synced := c.sequencer.Synced()
 	if !synced {
+		syncProgress := c.sequencer.FullSyncProgressMap()
 		var detailsList []interface{}
 		for key, value := range syncProgress {
 			detailsList = append(detailsList, key, value)
@@ -849,7 +848,7 @@ func (c *SeqCoordinator) SeekLockout(ctx context.Context) {
 	defer c.wantsLockoutMutex.Unlock()
 	c.avoidLockout--
 	log.Info("seeking lockout", "myUrl", c.config.Url())
-	if c.sync.Synced() {
+	if c.sequencer.Synced() {
 		// Even if this errors we still internally marked ourselves as wanting the lockout
 		err := c.wantsLockoutUpdateWithMutex(ctx)
 		if err != nil {
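The coordinator no longer consults its own SyncMonitor; it trusts the execution client's view of syncedness and only builds the comparatively expensive progress map on the out-of-sync path, for diagnostics. A minimal consumer-side sketch of that pattern, where seq stands in for any implementation of the extended execution.ExecutionSequencer interface:

	if !seq.Synced() {
		// FullSyncProgressMap is only consulted when out of sync, purely for logging.
		var details []interface{}
		for k, v := range seq.FullSyncProgressMap() {
			details = append(details, k, v)
		}
		log.Warn("sequencer is not synced", details...)
	}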
diff --git a/arbos/programs/api.go b/arbos/programs/api.go
index c8241a72b5..787f127ea4 100644
--- a/arbos/programs/api.go
+++ b/arbos/programs/api.go
@@ -266,6 +266,10 @@ func newApiClosures(
 	}
 	captureHostio := func(name string, args, outs []byte, startInk, endInk uint64) {
 		tracingInfo.Tracer.CaptureStylusHostio(name, args, outs, startInk, endInk)
+		if name == "evm_gas_left" || name == "evm_ink_left" {
+			tracingInfo.Tracer.CaptureState(0, vm.GAS, 0, 0, scope, []byte{}, depth, nil)
+			tracingInfo.Tracer.CaptureState(0, vm.POP, 0, 0, scope, []byte{}, depth, nil)
+		}
 	}

 	return func(req RequestType, input []byte) ([]byte, []byte, uint64) {
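Without this, tracers driven by CaptureState would see no opcode-level event when a Stylus program inspects its remaining gas or ink. Emitting GAS immediately followed by POP is stack-neutral, so a tracer that models the EVM stack stays consistent while still observing a gas read at that point. A hypothetical tracer on the receiving end (the gasReadTracer type is illustrative; the hook signature is geth's vm.EVMLogger):

	// Counts gas reads, including the synthetic GAS emitted for the
	// Stylus evm_gas_left / evm_ink_left hostios.
	func (t *gasReadTracer) CaptureState(pc uint64, op vm.OpCode, gas, cost uint64, scope *vm.ScopeContext, rData []byte, depth int, err error) {
		if op == vm.GAS {
			t.gasReads++ // the vm.POP that follows rebalances the traced stack
		}
	}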
diff --git a/arbstate/daprovider/reader.go b/arbstate/daprovider/reader.go
index 560af3af1d..488b156454 100644
--- a/arbstate/daprovider/reader.go
+++ b/arbstate/daprovider/reader.go
@@ -30,12 +30,16 @@ type Reader interface {

 // NewReaderForDAS is generally meant to be only used by nitro.
 // DA Providers should implement methods in the Reader interface independently
-func NewReaderForDAS(dasReader DASReader) *readerForDAS {
-	return &readerForDAS{dasReader: dasReader}
+func NewReaderForDAS(dasReader DASReader, keysetFetcher DASKeysetFetcher) *readerForDAS {
+	return &readerForDAS{
+		dasReader:     dasReader,
+		keysetFetcher: keysetFetcher,
+	}
 }

 type readerForDAS struct {
-	dasReader DASReader
+	dasReader     DASReader
+	keysetFetcher DASKeysetFetcher
 }

 func (d *readerForDAS) IsValidHeaderByte(headerByte byte) bool {
@@ -50,7 +54,7 @@ func (d *readerForDAS) RecoverPayloadFromBatch(
 	preimageRecorder PreimageRecorder,
 	validateSeqMsg bool,
 ) ([]byte, error) {
-	return RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.dasReader, preimageRecorder, validateSeqMsg)
+	return RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.dasReader, d.keysetFetcher, preimageRecorder, validateSeqMsg)
 }

 // NewReaderForBlobReader is generally meant to be only used by nitro.
diff --git a/arbstate/daprovider/util.go b/arbstate/daprovider/util.go
index 7d8f1a404f..8f880b9228 100644
--- a/arbstate/daprovider/util.go
+++ b/arbstate/daprovider/util.go
@@ -34,6 +34,10 @@ type DASWriter interface {
 	fmt.Stringer
 }

+type DASKeysetFetcher interface {
+	GetKeysetByHash(context.Context, common.Hash) ([]byte, error)
+}
+
 type BlobReader interface {
 	GetBlobs(
 		ctx context.Context,
@@ -138,6 +142,7 @@ func RecoverPayloadFromDasBatch(
 	batchNum uint64,
 	sequencerMsg []byte,
 	dasReader DASReader,
+	keysetFetcher DASKeysetFetcher,
 	preimageRecorder PreimageRecorder,
 	validateSeqMsg bool,
 ) ([]byte, error) {
@@ -181,7 +186,7 @@ func RecoverPayloadFromDasBatch(
 		return preimage, nil
 	}

-	keysetPreimage, err := getByHash(ctx, cert.KeysetHash)
+	keysetPreimage, err := keysetFetcher.GetKeysetByHash(ctx, cert.KeysetHash)
 	if err != nil {
 		log.Error("Couldn't get keyset", "err", err)
 		return nil, err
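Keyset retrieval is now a separate capability rather than a side door on DASReader, so any small type can satisfy it. A minimal map-backed sketch, useful for tests (the type and names here are illustrative, not part of this diff):

	// testKeysetFetcher satisfies daprovider.DASKeysetFetcher with a fixed map.
	type testKeysetFetcher struct {
		keysets map[common.Hash][]byte
	}

	func (f *testKeysetFetcher) GetKeysetByHash(_ context.Context, h common.Hash) ([]byte, error) {
		k, ok := f.keysets[h]
		if !ok {
			return nil, fmt.Errorf("no keyset for hash %v", h)
		}
		return k, nil
	}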
diff --git a/cmd/conf/database.go b/cmd/conf/database.go
index 6fde00579f..a75cca77d5 100644
--- a/cmd/conf/database.go
+++ b/cmd/conf/database.go
@@ -32,7 +32,7 @@ var PersistentConfigDefault = PersistentConfig{
 	LogDir:   "",
 	Handles:  512,
 	Ancient:  "",
-	DBEngine: "leveldb",
+	DBEngine: "", // auto-detect database type based on the db dir contents
 	Pebble:   PebbleConfigDefault,
 }

@@ -42,7 +42,7 @@ func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.String(prefix+".log-dir", PersistentConfigDefault.LogDir, "directory to store log file")
 	f.Int(prefix+".handles", PersistentConfigDefault.Handles, "number of file descriptor handles to use for the database")
 	f.String(prefix+".ancient", PersistentConfigDefault.Ancient, "directory of ancient where the chain freezer can be opened")
-	f.String(prefix+".db-engine", PersistentConfigDefault.DBEngine, "backing database implementation to use ('leveldb' or 'pebble')")
+	f.String(prefix+".db-engine", PersistentConfigDefault.DBEngine, "backing database implementation to use ('leveldb', 'pebble' or '' = auto-detect). If set to an empty string, the database type is auto-detected, and if no pre-existing database is found a new pebble database is created")
 	PebbleConfigAddOptions(prefix+".pebble", f)
 }

@@ -97,11 +97,12 @@ func DatabaseInDirectory(path string) bool {
 }

 func (c *PersistentConfig) Validate() error {
-	// we are validating .db-engine here to avoid unintended behaviour as empty string value also has meaning in geth's node.Config.DBEngine
-	if c.DBEngine != "leveldb" && c.DBEngine != "pebble" {
-		return fmt.Errorf(`invalid .db-engine choice: %q, allowed "leveldb" or "pebble"`, c.DBEngine)
+	if c.DBEngine != "leveldb" && c.DBEngine != "pebble" && c.DBEngine != "" {
+		return fmt.Errorf(`invalid .db-engine choice: %q, allowed "leveldb", "pebble" or ""`, c.DBEngine)
 	}
-	if c.DBEngine == "pebble" {
+	// If DBEngine == "" we may end up opening a pebble database, so the Pebble config still needs validating.
+	// If the pre-existing database is leveldb-backed, the user shouldn't have changed the Pebble config defaults, so this check should also succeed.
+	if c.DBEngine == "pebble" || c.DBEngine == "" {
 		if err := c.Pebble.Validate(); err != nil {
 			return err
 		}
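With the default now empty, engine selection happens at database-open time. The decision tree amounts to the following sketch (a hypothetical illustration only; the actual detection of a pre-existing database lives in go-ethereum, not in this diff):

	// chooseDBEngine illustrates the intended auto-detect behavior.
	func chooseDBEngine(configured, preexisting string) string {
		if configured != "" {
			return configured // explicit "leveldb" or "pebble" always wins
		}
		if preexisting != "" {
			return preexisting // keep the engine of the database already on disk
		}
		return "pebble" // fresh data dir: create a new pebble database
	}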
diff --git a/cmd/datool/datool.go b/cmd/datool/datool.go
index 4017457ba9..ba60cbbd4d 100644
--- a/cmd/datool/datool.go
+++ b/cmd/datool/datool.go
@@ -316,6 +316,10 @@ func parseDumpKeyset(args []string) (*DumpKeysetConfig, error) {
 		return nil, err
 	}

+	if err = das.FixKeysetCLIParsing("keyset.backends", k); err != nil {
+		return nil, err
+	}
+
 	var config DumpKeysetConfig
 	if err := confighelpers.EndCommonParse(k, &config); err != nil {
 		return nil, err
@@ -334,7 +338,7 @@ func parseDumpKeyset(args []string) (*DumpKeysetConfig, error) {
 	if config.Keyset.AssumedHonest == 0 {
 		return nil, errors.New("--keyset.assumed-honest must be set")
 	}
-	if config.Keyset.Backends == "" {
+	if config.Keyset.Backends == nil {
 		return nil, errors.New("--keyset.backends must be set")
 	}

diff --git a/cmd/nitro/config_test.go b/cmd/nitro/config_test.go
index d76dd1b7b9..f94f941e0b 100644
--- a/cmd/nitro/config_test.go
+++ b/cmd/nitro/config_test.go
@@ -16,6 +16,7 @@ import (

 	"github.com/offchainlabs/nitro/cmd/genericconf"
 	"github.com/offchainlabs/nitro/cmd/util/confighelpers"
+	"github.com/offchainlabs/nitro/das"
 	"github.com/offchainlabs/nitro/util/colors"
 	"github.com/offchainlabs/nitro/util/testhelpers"

@@ -28,6 +29,8 @@ func TestEmptyCliConfig(t *testing.T) {
 	NodeConfigAddOptions(f)
 	k, err := confighelpers.BeginCommonParse(f, []string{})
 	Require(t, err)
+	err = das.FixKeysetCLIParsing("node.data-availability.rpc-aggregator.backends", k)
+	Require(t, err)
 	var emptyCliNodeConfig NodeConfig
 	err = confighelpers.EndCommonParse(k, &emptyCliNodeConfig)
 	Require(t, err)
@@ -57,7 +60,7 @@ func TestValidatorConfig(t *testing.T) {
 }

 func TestAggregatorConfig(t *testing.T) {
-	args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.data-availability.enable --node.data-availability.rpc-aggregator.backends {[\"url\":\"http://localhost:8547\",\"pubkey\":\"abc==\",\"signerMask\":0x1]}", " ")
+	args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.data-availability.enable --node.data-availability.rpc-aggregator.backends [{\"url\":\"http://localhost:8547\",\"pubkey\":\"abc==\"}]", " ")
 	_, _, err := ParseNode(context.Background(), args)
 	Require(t, err)
 }
diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go
index 49d30fd59c..9c23cdf852 100644
--- a/cmd/nitro/init.go
+++ b/cmd/nitro/init.go
@@ -17,6 +17,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"regexp"
 	"runtime"
 	"strings"
 	"sync"
@@ -326,6 +327,16 @@ func dirExists(path string) bool {
 	return info.IsDir()
 }

+var pebbleNotExistErrorRegex = regexp.MustCompile("pebble: database .* does not exist")
+
+func isPebbleNotExistError(err error) bool {
+	return pebbleNotExistErrorRegex.MatchString(err.Error())
+}
+
+func isLeveldbNotExistError(err error) bool {
+	return os.IsNotExist(err)
+}
+
 func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) {
 	if !config.Init.Force {
 		if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil {
@@ -396,6 +407,9 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
 			return chainDb, l2BlockChain, nil
 		}
 		readOnlyDb.Close()
+	} else if !isLeveldbNotExistError(err) && !isPebbleNotExistError(err) {
+		// we only want to continue if the error is a pebble or leveldb "database does not exist" error
+		return nil, nil, fmt.Errorf("Failed to open database: %w", err)
 	}
 }
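The two backends report a missing database differently: leveldb surfaces an os.ErrNotExist that os.IsNotExist recognizes, while pebble returns an error whose text has to be matched. A test-style sketch of the message shape the regex targets (the path is invented; the real text comes from pebble's Open):

	func TestPebbleNotExistRegexExample(t *testing.T) {
		// Assumed shape of pebble's open error.
		err := errors.New(`pebble: database "/data/l2chaindata" does not exist`)
		if !isPebbleNotExistError(err) {
			t.Fatal("expected the pebble not-exist error to match")
		}
	}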
isLeveldbNotExistError) + }) +} diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index b2eb07f69c..1c4ad80186 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -51,6 +51,7 @@ import ( "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/cmd/util" "github.com/offchainlabs/nitro/cmd/util/confighelpers" + "github.com/offchainlabs/nitro/das" "github.com/offchainlabs/nitro/execution/gethexec" _ "github.com/offchainlabs/nitro/execution/nodeInterface" "github.com/offchainlabs/nitro/solgen/go/bridgegen" @@ -60,6 +61,7 @@ import ( "github.com/offchainlabs/nitro/staker/validatorwallet" "github.com/offchainlabs/nitro/util/colors" "github.com/offchainlabs/nitro/util/headerreader" + "github.com/offchainlabs/nitro/util/iostat" "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/validator/server_common" @@ -404,6 +406,10 @@ func mainImpl() int { return 1 } + if nodeConfig.Metrics { + go iostat.RegisterAndPopulateMetrics(ctx, 1, 5) + } + var deferFuncs []func() defer func() { for i := range deferFuncs { @@ -874,6 +880,10 @@ func ParseNode(ctx context.Context, args []string) (*NodeConfig, *genericconf.Wa return nil, nil, err } + if err = das.FixKeysetCLIParsing("node.data-availability.rpc-aggregator.backends", k); err != nil { + return nil, nil, err + } + var nodeConfig NodeConfig if err := confighelpers.EndCommonParse(k, &nodeConfig); err != nil { return nil, nil, err diff --git a/cmd/replay/main.go b/cmd/replay/main.go index 71ea6760ed..5723b4a030 100644 --- a/cmd/replay/main.go +++ b/cmd/replay/main.go @@ -113,6 +113,10 @@ func (dasReader *PreimageDASReader) GetByHash(ctx context.Context, hash common.H return dastree.Content(hash, oracle) } +func (dasReader *PreimageDASReader) GetKeysetByHash(ctx context.Context, hash common.Hash) ([]byte, error) { + return dasReader.GetByHash(ctx, hash) +} + func (dasReader *PreimageDASReader) HealthCheck(ctx context.Context) error { return nil } @@ -205,8 +209,11 @@ func main() { delayedMessagesRead = lastBlockHeader.Nonce.Uint64() } var dasReader daprovider.DASReader + var dasKeysetFetcher daprovider.DASKeysetFetcher if dasEnabled { + // DAS batch and keysets are all together in the same preimage binary. dasReader = &PreimageDASReader{} + dasKeysetFetcher = &PreimageDASReader{} } backend := WavmInbox{} var keysetValidationMode = daprovider.KeysetPanicIfInvalid @@ -215,7 +222,7 @@ func main() { } var dapReaders []daprovider.Reader if dasReader != nil { - dapReaders = append(dapReaders, daprovider.NewReaderForDAS(dasReader)) + dapReaders = append(dapReaders, daprovider.NewReaderForDAS(dasReader, dasKeysetFetcher)) } dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(&BlobPreimageReader{})) inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, dapReaders, keysetValidationMode) diff --git a/das/aggregator.go b/das/aggregator.go index f82174fb1c..9aa558b92c 100644 --- a/das/aggregator.go +++ b/das/aggregator.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "math/bits" + "sync/atomic" "time" flag "github.com/spf13/pflag" @@ -25,23 +26,35 @@ import ( "github.com/offchainlabs/nitro/util/pretty" ) +const metricBase string = "arb/das/rpc/aggregator/store" + +var ( + // This metric shows 1 if there was any error posting to the backends, until + // there was a Store that had no backend failures. 
diff --git a/das/aggregator.go b/das/aggregator.go
index f82174fb1c..9aa558b92c 100644
--- a/das/aggregator.go
+++ b/das/aggregator.go
@@ -9,6 +9,7 @@ import (
 	"errors"
 	"fmt"
 	"math/bits"
+	"sync/atomic"
 	"time"

 	flag "github.com/spf13/pflag"
@@ -25,23 +26,35 @@ import (
 	"github.com/offchainlabs/nitro/util/pretty"
)

+const metricBase string = "arb/das/rpc/aggregator/store"
+
+var (
+	// This metric is set to 1 if there was any error posting to the backends,
+	// and stays 1 until a Store completes with no backend failures.
+	anyErrorGauge = metrics.GetOrRegisterGauge(metricBase+"/error/gauge", nil)
+
+// Other aggregator metrics are generated dynamically in the Store function.
+)
+
 type AggregatorConfig struct {
-	Enable                bool   `koanf:"enable"`
-	AssumedHonest         int    `koanf:"assumed-honest"`
-	Backends              string `koanf:"backends"`
-	MaxStoreChunkBodySize int    `koanf:"max-store-chunk-body-size"`
+	Enable                bool              `koanf:"enable"`
+	AssumedHonest         int               `koanf:"assumed-honest"`
+	Backends              BackendConfigList `koanf:"backends"`
+	MaxStoreChunkBodySize int               `koanf:"max-store-chunk-body-size"`
 }

 var DefaultAggregatorConfig = AggregatorConfig{
 	AssumedHonest:         0,
-	Backends:              "",
+	Backends:              nil,
 	MaxStoreChunkBodySize: 512 * 1024,
 }

+var parsedBackendsConf BackendConfigList
+
 func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
 	f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider a Store request to be successful.")
-	f.String(prefix+".backends", DefaultAggregatorConfig.Backends, "JSON RPC backend configuration")
+	f.Var(&parsedBackendsConf, prefix+".backends", "JSON RPC backend configuration. This can be specified on the command line as a JSON array, e.g.: [{\"url\": \"...\", \"pubkey\": \"...\"},...], or as a JSON array in the config file.")
 	f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
 }

@@ -154,13 +167,22 @@ type storeResponse struct {
 // (eg via TimeoutWrapper) then it also returns an error.
 func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
 	log.Trace("das.Aggregator.Store", "message", pretty.FirstFewBytes(message), "timeout", time.Unix(int64(timeout), 0))
+
+	allBackendsSucceeded := false
+	defer func() {
+		if allBackendsSucceeded {
+			anyErrorGauge.Update(0)
+		} else {
+			anyErrorGauge.Update(1)
+		}
+	}()
+
 	responses := make(chan storeResponse, len(a.services))

 	expectedHash := dastree.Hash(message)
 	for _, d := range a.services {
 		go func(ctx context.Context, d ServiceDetails) {
 			storeCtx, cancel := context.WithTimeout(ctx, a.requestTimeout)
-			const metricBase string = "arb/das/rpc/aggregator/store"
 			var metricWithServiceName = metricBase + "/" + d.metricName
 			defer cancel()
 			incFailureMetric := func() {
@@ -226,22 +248,22 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64)
 		err     error
 	}

+	var storeFailures atomic.Int64
 	// Collect responses from backends.
 	certDetailsChan := make(chan certDetails)
 	go func() {
 		var pubKeys []blsSignatures.PublicKey
 		var sigs []blsSignatures.Signature
 		var aggSignersMask uint64
-		var storeFailures, successfullyStoredCount int
+		var successfullyStoredCount int
 		var returned bool
 		for i := 0; i < len(a.services); i++ {
-
 			select {
 			case <-ctx.Done():
 				break
 			case r := <-responses:
 				if r.err != nil {
-					storeFailures++
+					_ = storeFailures.Add(1)
 					log.Warn("das.Aggregator: Error from backend", "backend", r.details.service, "signerMask", r.details.signersMask, "err", r.err)
 				} else {
 					pubKeys = append(pubKeys, r.details.pubKey)
@@ -265,10 +287,10 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64)
 				certDetailsChan <- cd
 				returned = true
 				if a.maxAllowedServiceStoreFailures > 0 && // Ignore the case where AssumedHonest = 1, probably a testnet
-					storeFailures+1 > a.maxAllowedServiceStoreFailures {
+					int(storeFailures.Load())+1 > a.maxAllowedServiceStoreFailures {
 					log.Error("das.Aggregator: storing the batch data succeeded with enough DAS committee members to generate the Data Availability Cert, but if one more had failed then the cert could not have been generated. Look for preceding logs with \"Error from backend\"")
 				}
-			} else if storeFailures > a.maxAllowedServiceStoreFailures {
+			} else if int(storeFailures.Load()) > a.maxAllowedServiceStoreFailures {
 				cd := certDetails{}
 				cd.err = fmt.Errorf("aggregator failed to store message to at least %d out of %d DASes (assuming %d are honest). %w", a.requiredServicesForStore, len(a.services), a.config.AssumedHonest, daprovider.ErrBatchToDasFailed)
 				certDetailsChan <- cd
@@ -302,6 +324,11 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64)
 	if !verified {
 		return nil, fmt.Errorf("failed aggregate signature check. %w", daprovider.ErrBatchToDasFailed)
 	}
+
+	if storeFailures.Load() == 0 {
+		allBackendsSucceeded = true
+	}
+
 	return &aggCert, nil
 }
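storeFailures moves to an atomic.Int64 because it is now read on Store's return path (to drive anyErrorGauge via the deferred update) while the collecting goroutine may still be writing it; a plain int there would be a data race. The pattern in miniature (results is an assumed slice of per-backend outcomes):

	var failures atomic.Int64
	go func() {
		for _, ok := range results { // per-backend outcomes, illustrative
			if !ok {
				failures.Add(1)
			}
		}
	}()
	// The deferred gauge update may run while the goroutine above is still
	// counting, so the read must be atomic too:
	if failures.Load() == 0 {
		anyErrorGauge.Update(0)
	}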
%w", daprovider.ErrBatchToDasFailed) } + + if storeFailures.Load() == 0 { + allBackendsSucceeded = true + } + return &aggCert, nil } diff --git a/das/chain_fetch_das.go b/das/chain_fetch_das.go index 99311decaa..465b54f400 100644 --- a/das/chain_fetch_das.go +++ b/das/chain_fetch_das.go @@ -8,7 +8,6 @@ import ( "errors" "sync" - "github.com/offchainlabs/nitro/arbstate/daprovider" "github.com/offchainlabs/nitro/util/pretty" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -37,59 +36,41 @@ func (c *syncedKeysetCache) put(key [32]byte, value []byte) { c.cache[key] = value } -type ChainFetchReader struct { - daprovider.DASReader +type KeysetFetcher struct { seqInboxCaller *bridgegen.SequencerInboxCaller seqInboxFilterer *bridgegen.SequencerInboxFilterer keysetCache syncedKeysetCache } -func NewChainFetchReader(inner daprovider.DASReader, l1client arbutil.L1Interface, seqInboxAddr common.Address) (*ChainFetchReader, error) { +func NewKeysetFetcher(l1client arbutil.L1Interface, seqInboxAddr common.Address) (*KeysetFetcher, error) { seqInbox, err := bridgegen.NewSequencerInbox(seqInboxAddr, l1client) if err != nil { return nil, err } - return NewChainFetchReaderWithSeqInbox(inner, seqInbox) + return NewKeysetFetcherWithSeqInbox(seqInbox) } -func NewChainFetchReaderWithSeqInbox(inner daprovider.DASReader, seqInbox *bridgegen.SequencerInbox) (*ChainFetchReader, error) { - return &ChainFetchReader{ - DASReader: inner, +func NewKeysetFetcherWithSeqInbox(seqInbox *bridgegen.SequencerInbox) (*KeysetFetcher, error) { + return &KeysetFetcher{ seqInboxCaller: &seqInbox.SequencerInboxCaller, seqInboxFilterer: &seqInbox.SequencerInboxFilterer, keysetCache: syncedKeysetCache{cache: make(map[[32]byte][]byte)}, }, nil } -func (c *ChainFetchReader) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) { - log.Trace("das.ChainFetchReader.GetByHash", "hash", pretty.PrettyHash(hash)) - return chainFetchGetByHash(ctx, c.DASReader, &c.keysetCache, c.seqInboxCaller, c.seqInboxFilterer, hash) -} -func (c *ChainFetchReader) String() string { - return "ChainFetchReader" -} +func (c *KeysetFetcher) GetKeysetByHash(ctx context.Context, hash common.Hash) ([]byte, error) { + log.Trace("das.KeysetFetcher.GetKeysetByHash", "hash", pretty.PrettyHash(hash)) + cache := &c.keysetCache + seqInboxCaller := c.seqInboxCaller + seqInboxFilterer := c.seqInboxFilterer -func chainFetchGetByHash( - ctx context.Context, - daReader daprovider.DASReader, - cache *syncedKeysetCache, - seqInboxCaller *bridgegen.SequencerInboxCaller, - seqInboxFilterer *bridgegen.SequencerInboxFilterer, - hash common.Hash, -) ([]byte, error) { // try to fetch from the cache res, ok := cache.get(hash) if ok { return res, nil } - // try to fetch from the inner DAS - innerRes, err := daReader.GetByHash(ctx, hash) - if err == nil && dastree.ValidHash(hash, innerRes) { - return innerRes, nil - } - // try to fetch from the L1 chain blockNumBig, err := seqInboxCaller.GetKeysetCreationBlock(&bind.CallOpts{Context: ctx}, hash) if err != nil { diff --git a/das/factory.go b/das/factory.go index d9eacd0ada..fd6f60abb2 100644 --- a/das/factory.go +++ b/das/factory.go @@ -97,37 +97,37 @@ func CreateBatchPosterDAS( dataSigner signature.DataSignerFunc, l1Reader arbutil.L1Interface, sequencerInboxAddr common.Address, -) (DataAvailabilityServiceWriter, DataAvailabilityServiceReader, *LifecycleManager, error) { +) (DataAvailabilityServiceWriter, DataAvailabilityServiceReader, *KeysetFetcher, *LifecycleManager, error) { if !config.Enable { - return nil, nil, nil, 
diff --git a/das/factory.go b/das/factory.go
index d9eacd0ada..fd6f60abb2 100644
--- a/das/factory.go
+++ b/das/factory.go
@@ -97,37 +97,37 @@ func CreateBatchPosterDAS(
 	dataSigner signature.DataSignerFunc,
 	l1Reader arbutil.L1Interface,
 	sequencerInboxAddr common.Address,
-) (DataAvailabilityServiceWriter, DataAvailabilityServiceReader, *LifecycleManager, error) {
+) (DataAvailabilityServiceWriter, DataAvailabilityServiceReader, *KeysetFetcher, *LifecycleManager, error) {
 	if !config.Enable {
-		return nil, nil, nil, nil
+		return nil, nil, nil, nil, nil
 	}

 	// Check config requirements
 	if !config.RPCAggregator.Enable || !config.RestAggregator.Enable {
-		return nil, nil, nil, errors.New("--node.data-availability.rpc-aggregator.enable and rest-aggregator.enable must be set when running a Batch Poster in AnyTrust mode")
+		return nil, nil, nil, nil, errors.New("--node.data-availability.rpc-aggregator.enable and rest-aggregator.enable must be set when running a Batch Poster in AnyTrust mode")
 	}
 	// Done checking config requirements

 	var daWriter DataAvailabilityServiceWriter
 	daWriter, err := NewRPCAggregator(ctx, *config, dataSigner)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, nil, err
 	}

 	restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, nil, err
 	}
 	restAgg.Start(ctx)

 	var lifecycleManager LifecycleManager
 	lifecycleManager.Register(restAgg)
 	var daReader DataAvailabilityServiceReader = restAgg
-	daReader, err = NewChainFetchReader(daReader, l1Reader, sequencerInboxAddr)
+	keysetFetcher, err := NewKeysetFetcher(l1Reader, sequencerInboxAddr)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, nil, err
 	}

-	return daWriter, daReader, &lifecycleManager, nil
+	return daWriter, daReader, keysetFetcher, &lifecycleManager, nil
 }

 func CreateDAComponentsForDaserver(
@@ -233,13 +233,6 @@
 		}
 	}

-	if seqInboxAddress != nil {
-		daReader, err = NewChainFetchReader(daReader, (*l1Reader).Client(), *seqInboxAddress)
-		if err != nil {
-			return nil, nil, nil, nil, nil, err
-		}
-	}
-
 	return daReader, daWriter, signatureVerifier, daHealthChecker, dasLifecycleManager, nil
 }

@@ -248,18 +241,18 @@ func CreateDAReaderForNode(
 	config *DataAvailabilityConfig,
 	l1Reader *headerreader.HeaderReader,
 	seqInboxAddress *common.Address,
-) (DataAvailabilityServiceReader, *LifecycleManager, error) {
+) (DataAvailabilityServiceReader, *KeysetFetcher, *LifecycleManager, error) {
 	if !config.Enable {
-		return nil, nil, nil
+		return nil, nil, nil, nil
 	}

 	// Check config requirements
 	if config.RPCAggregator.Enable {
-		return nil, nil, errors.New("node.data-availability.rpc-aggregator is only for Batch Poster mode")
+		return nil, nil, nil, errors.New("node.data-availability.rpc-aggregator is only for Batch Poster mode")
 	}

 	if !config.RestAggregator.Enable {
-		return nil, nil, fmt.Errorf("--node.data-availability.enable was set but not --node.data-availability.rest-aggregator. When running a Nitro Anytrust node in non-Batch Poster mode, some way to get the batch data is required.")
+		return nil, nil, nil, fmt.Errorf("--node.data-availability.enable was set but not --node.data-availability.rest-aggregator. When running a Nitro Anytrust node in non-Batch Poster mode, some way to get the batch data is required.")
 	}
 	// Done checking config requirements
@@ -269,23 +262,25 @@ func CreateDAReaderForNode(
 		var restAgg *SimpleDASReaderAggregator
 		restAgg, err := NewRestfulClientAggregator(ctx, &config.RestAggregator)
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, nil, err
 		}
 		restAgg.Start(ctx)
 		lifecycleManager.Register(restAgg)
 		daReader = restAgg
 	}

+	var keysetFetcher *KeysetFetcher
 	if seqInboxAddress != nil {
 		seqInbox, err := bridgegen.NewSequencerInbox(*seqInboxAddress, (*l1Reader).Client())
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, nil, err
 		}
-		daReader, err = NewChainFetchReaderWithSeqInbox(daReader, seqInbox)
+		keysetFetcher, err = NewKeysetFetcherWithSeqInbox(seqInbox)
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, nil, err
 		}
+	}

-	return daReader, &lifecycleManager, nil
+	return daReader, keysetFetcher, &lifecycleManager, nil
 }
diff --git a/das/rpc_aggregator.go b/das/rpc_aggregator.go
index 7e363c6179..24a470be5b 100644
--- a/das/rpc_aggregator.go
+++ b/das/rpc_aggregator.go
@@ -12,6 +12,8 @@ import (
 	"math/bits"
 	"net/url"

+	"github.com/knadh/koanf"
+	"github.com/knadh/koanf/providers/confmap"
 	"github.com/offchainlabs/nitro/arbstate/daprovider"
 	"github.com/offchainlabs/nitro/blsSignatures"
 	"github.com/offchainlabs/nitro/solgen/go/bridgegen"
@@ -23,9 +25,54 @@ import (
 )

 type BackendConfig struct {
-	URL                 string `json:"url"`
-	PubKeyBase64Encoded string `json:"pubkey"`
-	SignerMask          uint64 `json:"signermask"`
+	URL    string `koanf:"url" json:"url"`
+	Pubkey string `koanf:"pubkey" json:"pubkey"`
+}
+
+type BackendConfigList []BackendConfig
+
+func (l *BackendConfigList) String() string {
+	b, _ := json.Marshal(*l)
+	return string(b)
+}
+
+func (l *BackendConfigList) Set(value string) error {
+	return l.UnmarshalJSON([]byte(value))
+}
+
+func (l *BackendConfigList) UnmarshalJSON(data []byte) error {
+	var tmp []BackendConfig
+	if err := json.Unmarshal(data, &tmp); err != nil {
+		return err
+	}
+	*l = tmp
+	return nil
+}
+
+func (l *BackendConfigList) Type() string {
+	return "backendConfigList"
+}
+
+func FixKeysetCLIParsing(path string, k *koanf.Koanf) error {
+	rawBackends := k.Get(path)
+	if bk, ok := rawBackends.(string); ok {
+		err := parsedBackendsConf.UnmarshalJSON([]byte(bk))
+		if err != nil {
+			return err
+		}
+
+		// Create a map with the parsed backend configurations
+		tempMap := map[string]interface{}{
+			path: parsedBackendsConf,
+		}
+
+		// Load the map into koanf
+		if err = k.Load(confmap.Provider(tempMap, "."), nil); err != nil {
+			return err
+		}
+	}
+	return nil
 }

 func NewRPCAggregator(ctx context.Context, config DataAvailabilityConfig, signer signature.DataSignerFunc) (*Aggregator, error) {
@@ -53,15 +100,9 @@ func NewRPCAggregatorWithSeqInboxCaller(config DataAvailabilityConfig, seqInboxC
 }

 func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([]ServiceDetails, error) {
-	var cs []BackendConfig
-	err := json.Unmarshal([]byte(config.Backends), &cs)
-	if err != nil {
-		return nil, err
-	}
-
 	var services []ServiceDetails

-	for _, b := range cs {
+	for i, b := range config.Backends {
 		url, err := url.Parse(b.URL)
 		if err != nil {
 			return nil, err
@@ -73,12 +114,12 @@ func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([]
 			return nil, err
 		}

-		pubKey, err := DecodeBase64BLSPublicKey([]byte(b.PubKeyBase64Encoded))
+		pubKey, err := DecodeBase64BLSPublicKey([]byte(b.Pubkey))
 		if err != nil {
 			return nil, err
 		}

-		d, err := NewServiceDetails(service, *pubKey, b.SignerMask, metricName)
+		d, err := NewServiceDetails(service, *pubKey, 1<<uint64(i), metricName)
 		if err != nil {
 			return nil, err
 		}

 		services = append(services, d)
 	}
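Backends is now structured configuration rather than a JSON string, and the signer mask is no longer user-supplied: ParseServices derives it from list position, so backend i gets mask 1<<i (three backends produce masks 1, 2 and 4, and the certificate's aggregated mask is their bitwise OR). FixKeysetCLIParsing exists because a value arriving via the command line still reaches koanf as one JSON string; the helper re-parses it and re-loads it as a structured list before EndCommonParse decodes the config. A minimal sketch of setting the flag value programmatically (the url and pubkey values are placeholders, not real endpoints or keys):

	// Equivalent of:
	//   --node.data-availability.rpc-aggregator.backends '[{"url":"http://localhost:9876","pubkey":"abc=="}]'
	var backends das.BackendConfigList
	if err := backends.Set(`[{"url":"http://localhost:9876","pubkey":"abc=="}]`); err != nil {
		panic(err)
	}
	// ParseServices will assign backends[0] the signer mask 1<<0 == 1.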
d, err := NewServiceDetails(service, *pubKey, b.SignerMask, metricName) + d, err := NewServiceDetails(service, *pubKey, 1<= consensusSyncTarget { - return make(map[string]interface{}) - } + if s.Synced() { + return make(map[string]interface{}) } return s.FullSyncProgressMap() } @@ -112,7 +108,14 @@ func (s *SyncMonitor) FinalizedBlockNumber(ctx context.Context) (uint64, error) } func (s *SyncMonitor) Synced() bool { - return len(s.SyncProgressMap()) == 0 + if s.consensus.Synced() { + built, err := s.exec.HeadMessageNumber() + consensusSyncTarget := s.consensus.SyncTargetMessageCount() + if err == nil && built+1 >= consensusSyncTarget { + return true + } + } + return false } func (s *SyncMonitor) SetConsensusInfo(consensus execution.ConsensusInfo) { diff --git a/execution/interface.go b/execution/interface.go index 32ec7dd0f7..ddf30b4b2a 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -57,6 +57,8 @@ type ExecutionSequencer interface { SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error NextDelayedMessageNumber() (uint64, error) MarkFeedStart(to arbutil.MessageIndex) + Synced() bool + FullSyncProgressMap() map[string]interface{} } type FullExecutionClient interface { diff --git a/nitro-testnode b/nitro-testnode index c334820b2d..9dc0588c50 160000 --- a/nitro-testnode +++ b/nitro-testnode @@ -1 +1 @@ -Subproject commit c334820b2dba6dfa4078f81ed242afbbccc19c91 +Subproject commit 9dc0588c5066e2efd25c09adf12df7b28ef18cb6 diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 222f39b623..16d6b2f131 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -1157,9 +1157,8 @@ func setupConfigWithDAS( Require(t, err) beConfigA := das.BackendConfig{ - URL: "http://" + rpcLis.Addr().String(), - PubKeyBase64Encoded: blsPubToBase64(dasSignerKey), - SignerMask: 1, + URL: "http://" + rpcLis.Addr().String(), + Pubkey: blsPubToBase64(dasSignerKey), } l1NodeConfigA.DataAvailability.RPCAggregator = aggConfigForBackend(t, beConfigA) l1NodeConfigA.DataAvailability.Enable = true diff --git a/system_tests/das_test.go b/system_tests/das_test.go index 77958ab5aa..2332f4ee9e 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -6,7 +6,6 @@ package arbtest import ( "context" "encoding/base64" - "encoding/json" "io" "math/big" "net" @@ -82,9 +81,8 @@ func startLocalDASServer( restServer, err := das.NewRestfulDasServerOnListener(restLis, genericconf.HTTPServerTimeoutConfigDefault, storageService, storageService) Require(t, err) beConfig := das.BackendConfig{ - URL: "http://" + rpcLis.Addr().String(), - PubKeyBase64Encoded: blsPubToBase64(pubkey), - SignerMask: 1, + URL: "http://" + rpcLis.Addr().String(), + Pubkey: blsPubToBase64(pubkey), } return rpcServer, pubkey, beConfig, restServer, "http://" + restLis.Addr().String() } @@ -97,12 +95,10 @@ func blsPubToBase64(pubkey *blsSignatures.PublicKey) string { } func aggConfigForBackend(t *testing.T, backendConfig das.BackendConfig) das.AggregatorConfig { - backendsJsonByte, err := json.Marshal([]das.BackendConfig{backendConfig}) - Require(t, err) return das.AggregatorConfig{ Enable: true, AssumedHonest: 1, - Backends: string(backendsJsonByte), + Backends: das.BackendConfigList{backendConfig}, MaxStoreChunkBodySize: 512 * 1024, } } @@ -308,9 +304,8 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) { RequestTimeout: 5 * time.Second, } beConfigA := das.BackendConfig{ - URL: "http://" + rpcLis.Addr().String(), - PubKeyBase64Encoded: 
diff --git a/nitro-testnode b/nitro-testnode
index c334820b2d..9dc0588c50 160000
--- a/nitro-testnode
+++ b/nitro-testnode
@@ -1 +1 @@
-Subproject commit c334820b2dba6dfa4078f81ed242afbbccc19c91
+Subproject commit 9dc0588c5066e2efd25c09adf12df7b28ef18cb6
diff --git a/system_tests/common_test.go b/system_tests/common_test.go
index 222f39b623..16d6b2f131 100644
--- a/system_tests/common_test.go
+++ b/system_tests/common_test.go
@@ -1157,9 +1157,8 @@ func setupConfigWithDAS(
 	Require(t, err)

 	beConfigA := das.BackendConfig{
-		URL:                 "http://" + rpcLis.Addr().String(),
-		PubKeyBase64Encoded: blsPubToBase64(dasSignerKey),
-		SignerMask:          1,
+		URL:    "http://" + rpcLis.Addr().String(),
+		Pubkey: blsPubToBase64(dasSignerKey),
 	}
 	l1NodeConfigA.DataAvailability.RPCAggregator = aggConfigForBackend(t, beConfigA)
 	l1NodeConfigA.DataAvailability.Enable = true
diff --git a/system_tests/das_test.go b/system_tests/das_test.go
index 77958ab5aa..2332f4ee9e 100644
--- a/system_tests/das_test.go
+++ b/system_tests/das_test.go
@@ -6,7 +6,6 @@ package arbtest
 import (
 	"context"
 	"encoding/base64"
-	"encoding/json"
 	"io"
 	"math/big"
 	"net"
@@ -82,9 +81,8 @@ func startLocalDASServer(
 	restServer, err := das.NewRestfulDasServerOnListener(restLis, genericconf.HTTPServerTimeoutConfigDefault, storageService, storageService)
 	Require(t, err)
 	beConfig := das.BackendConfig{
-		URL:                 "http://" + rpcLis.Addr().String(),
-		PubKeyBase64Encoded: blsPubToBase64(pubkey),
-		SignerMask:          1,
+		URL:    "http://" + rpcLis.Addr().String(),
+		Pubkey: blsPubToBase64(pubkey),
 	}
 	return rpcServer, pubkey, beConfig, restServer, "http://" + restLis.Addr().String()
 }
@@ -97,12 +95,10 @@ func blsPubToBase64(pubkey *blsSignatures.PublicKey) string {
 }

 func aggConfigForBackend(t *testing.T, backendConfig das.BackendConfig) das.AggregatorConfig {
-	backendsJsonByte, err := json.Marshal([]das.BackendConfig{backendConfig})
-	Require(t, err)
 	return das.AggregatorConfig{
 		Enable:                true,
 		AssumedHonest:         1,
-		Backends:              string(backendsJsonByte),
+		Backends:              das.BackendConfigList{backendConfig},
 		MaxStoreChunkBodySize: 512 * 1024,
 	}
 }
@@ -308,9 +304,8 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) {
 		RequestTimeout: 5 * time.Second,
 	}
 	beConfigA := das.BackendConfig{
-		URL:                 "http://" + rpcLis.Addr().String(),
-		PubKeyBase64Encoded: blsPubToBase64(pubkey),
-		SignerMask:          1,
+		URL:    "http://" + rpcLis.Addr().String(),
+		Pubkey: blsPubToBase64(pubkey),
 	}
 	l1NodeConfigA.DataAvailability.RPCAggregator = aggConfigForBackend(t, beConfigA)
 	l1NodeConfigA.DataAvailability.RestAggregator = das.DefaultRestfulClientAggregatorConfig
diff --git a/util/iostat/iostat.go b/util/iostat/iostat.go
new file mode 100644
index 0000000000..9bc5ff800c
--- /dev/null
+++ b/util/iostat/iostat.go
@@ -0,0 +1,114 @@
+package iostat
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"os/exec"
+	"runtime"
+	"strconv"
+	"strings"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
+)
+
+func RegisterAndPopulateMetrics(ctx context.Context, spawnInterval, maxDeviceCount int) {
+	if runtime.GOOS != "linux" {
+		log.Warn("Iostat command not supported, disabling corresponding metrics")
+		return
+	}
+	deviceMetrics := make(map[string]map[string]metrics.GaugeFloat64)
+	statReceiver := make(chan DeviceStats)
+	go Run(ctx, spawnInterval, statReceiver)
+	for {
+		stat, ok := <-statReceiver
+		if !ok {
+			log.Info("Iostat statReceiver channel was closed due to error or command being completed")
+			return
+		}
+		if _, ok := deviceMetrics[stat.DeviceName]; !ok {
+			// Register metrics for a maximum of maxDeviceCount devices (fail-safe in case iostat returns incorrect device names indefinitely)
+			if len(deviceMetrics) < maxDeviceCount {
+				baseMetricName := fmt.Sprintf("iostat/%s/", stat.DeviceName)
+				deviceMetrics[stat.DeviceName] = make(map[string]metrics.GaugeFloat64)
+				deviceMetrics[stat.DeviceName]["readspersecond"] = metrics.NewRegisteredGaugeFloat64(baseMetricName+"readspersecond", nil)
+				deviceMetrics[stat.DeviceName]["writespersecond"] = metrics.NewRegisteredGaugeFloat64(baseMetricName+"writespersecond", nil)
+				deviceMetrics[stat.DeviceName]["await"] = metrics.NewRegisteredGaugeFloat64(baseMetricName+"await", nil)
+			} else {
+				continue
+			}
+		}
+		deviceMetrics[stat.DeviceName]["readspersecond"].Update(stat.ReadsPerSecond)
+		deviceMetrics[stat.DeviceName]["writespersecond"].Update(stat.WritesPerSecond)
+		deviceMetrics[stat.DeviceName]["await"].Update(stat.Await)
+	}
+}
+
+type DeviceStats struct {
+	DeviceName      string
+	ReadsPerSecond  float64
+	WritesPerSecond float64
+	Await           float64
+}
scanner error", err, scanner.Err()) + } + if err := cmd.Process.Kill(); err != nil { + log.Error("Failed to kill iostat process", "err", err) + } + if err := cmd.Wait(); err != nil { + log.Error("Error waiting for iostat to exit", "err", err) + } + stdout.Close() + log.Info("Iostat command terminated") +}