Skip to content

Commit

Permalink
Merge branch 'master' into mach-fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
eljobe committed Jul 2, 2024
2 parents a58b014 + dc84707 commit f2e43be
Show file tree
Hide file tree
Showing 28 changed files with 390 additions and 135 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
apt-get update && \
apt-get install -y \
ca-certificates \
wabt && \
wabt \
sysstat && \
/usr/sbin/update-ca-certificates && \
useradd -s /bin/bash user && \
mkdir -p /home/user/l1keystore && \
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ test-go-deps: \
build-replay-env \
$(stylus_test_wasms) \
$(arbitrator_stylus_lib) \
$(arbitrator_generated_header) \
$(patsubst %,$(arbitrator_cases)/%.wasm, global-state read-inboxmsg-10 global-state-wrapper const)

build-prover-header: $(arbitrator_generated_header)
Expand Down
5 changes: 5 additions & 0 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type execClientWrapper struct {
func (w *execClientWrapper) Pause() { w.t.Error("not supported") }
func (w *execClientWrapper) Activate() { w.t.Error("not supported") }
func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil }
func (w *execClientWrapper) Synced() bool { w.t.Error("not supported"); return false }
func (w *execClientWrapper) FullSyncProgressMap() map[string]interface{} {
w.t.Error("not supported")
return nil
}

func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
chainConfig := params.ArbitrumDevTestChainConfig()
Expand Down
7 changes: 4 additions & 3 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,14 +523,15 @@ func createNodeImpl(
var daWriter das.DataAvailabilityServiceWriter
var daReader das.DataAvailabilityServiceReader
var dasLifecycleManager *das.LifecycleManager
var dasKeysetFetcher *das.KeysetFetcher
if config.DataAvailability.Enable {
if config.BatchPoster.Enable {
daWriter, daReader, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
daWriter, daReader, dasKeysetFetcher, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
if err != nil {
return nil, err
}
} else {
daReader, dasLifecycleManager, err = das.CreateDAReaderForNode(ctx, &config.DataAvailability, l1Reader, &deployInfo.SequencerInbox)
daReader, dasKeysetFetcher, dasLifecycleManager, err = das.CreateDAReaderForNode(ctx, &config.DataAvailability, l1Reader, &deployInfo.SequencerInbox)
if err != nil {
return nil, err
}
Expand All @@ -554,7 +555,7 @@ func createNodeImpl(
}
var dapReaders []daprovider.Reader
if daReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForDAS(daReader))
dapReaders = append(dapReaders, daprovider.NewReaderForDAS(daReader, dasKeysetFetcher))
}
if blobReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
Expand Down
9 changes: 4 additions & 5 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type SeqCoordinator struct {

redisutil.RedisCoordinator

sync *SyncMonitor
streamer *TransactionStreamer
sequencer execution.ExecutionSequencer
delayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -150,7 +149,6 @@ func NewSeqCoordinator(
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
config: config,
Expand Down Expand Up @@ -607,9 +605,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
return c.noRedisError()
}

syncProgress := c.sync.SyncProgressMap()
synced := len(syncProgress) == 0
// Sequencer should want lockout if and only if- its synced, not avoiding lockout and execution processed every message that consensus had 1 second ago
synced := c.sequencer.Synced()
if !synced {
syncProgress := c.sequencer.FullSyncProgressMap()
var detailsList []interface{}
for key, value := range syncProgress {
detailsList = append(detailsList, key, value)
Expand Down Expand Up @@ -849,7 +848,7 @@ func (c *SeqCoordinator) SeekLockout(ctx context.Context) {
defer c.wantsLockoutMutex.Unlock()
c.avoidLockout--
log.Info("seeking lockout", "myUrl", c.config.Url())
if c.sync.Synced() {
if c.sequencer.Synced() {
// Even if this errors we still internally marked ourselves as wanting the lockout
err := c.wantsLockoutUpdateWithMutex(ctx)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions arbos/programs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func newApiClosures(
}
captureHostio := func(name string, args, outs []byte, startInk, endInk uint64) {
tracingInfo.Tracer.CaptureStylusHostio(name, args, outs, startInk, endInk)
if name == "evm_gas_left" || name == "evm_ink_left" {
tracingInfo.Tracer.CaptureState(0, vm.GAS, 0, 0, scope, []byte{}, depth, nil)
tracingInfo.Tracer.CaptureState(0, vm.POP, 0, 0, scope, []byte{}, depth, nil)
}
}

return func(req RequestType, input []byte) ([]byte, []byte, uint64) {
Expand Down
12 changes: 8 additions & 4 deletions arbstate/daprovider/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ type Reader interface {

// NewReaderForDAS is generally meant to be only used by nitro.
// DA Providers should implement methods in the Reader interface independently
func NewReaderForDAS(dasReader DASReader) *readerForDAS {
return &readerForDAS{dasReader: dasReader}
func NewReaderForDAS(dasReader DASReader, keysetFetcher DASKeysetFetcher) *readerForDAS {
return &readerForDAS{
dasReader: dasReader,
keysetFetcher: keysetFetcher,
}
}

type readerForDAS struct {
dasReader DASReader
dasReader DASReader
keysetFetcher DASKeysetFetcher
}

func (d *readerForDAS) IsValidHeaderByte(headerByte byte) bool {
Expand All @@ -50,7 +54,7 @@ func (d *readerForDAS) RecoverPayloadFromBatch(
preimageRecorder PreimageRecorder,
validateSeqMsg bool,
) ([]byte, error) {
return RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.dasReader, preimageRecorder, validateSeqMsg)
return RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.dasReader, d.keysetFetcher, preimageRecorder, validateSeqMsg)
}

// NewReaderForBlobReader is generally meant to be only used by nitro.
Expand Down
7 changes: 6 additions & 1 deletion arbstate/daprovider/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type DASWriter interface {
fmt.Stringer
}

type DASKeysetFetcher interface {
GetKeysetByHash(context.Context, common.Hash) ([]byte, error)
}

type BlobReader interface {
GetBlobs(
ctx context.Context,
Expand Down Expand Up @@ -138,6 +142,7 @@ func RecoverPayloadFromDasBatch(
batchNum uint64,
sequencerMsg []byte,
dasReader DASReader,
keysetFetcher DASKeysetFetcher,
preimageRecorder PreimageRecorder,
validateSeqMsg bool,
) ([]byte, error) {
Expand Down Expand Up @@ -181,7 +186,7 @@ func RecoverPayloadFromDasBatch(
return preimage, nil
}

keysetPreimage, err := getByHash(ctx, cert.KeysetHash)
keysetPreimage, err := keysetFetcher.GetKeysetByHash(ctx, cert.KeysetHash)
if err != nil {
log.Error("Couldn't get keyset", "err", err)
return nil, err
Expand Down
13 changes: 7 additions & 6 deletions cmd/conf/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var PersistentConfigDefault = PersistentConfig{
LogDir: "",
Handles: 512,
Ancient: "",
DBEngine: "leveldb",
DBEngine: "", // auto-detect database type based on the db dir contents
Pebble: PebbleConfigDefault,
}

Expand All @@ -42,7 +42,7 @@ func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) {
f.String(prefix+".log-dir", PersistentConfigDefault.LogDir, "directory to store log file")
f.Int(prefix+".handles", PersistentConfigDefault.Handles, "number of file descriptor handles to use for the database")
f.String(prefix+".ancient", PersistentConfigDefault.Ancient, "directory of ancient where the chain freezer can be opened")
f.String(prefix+".db-engine", PersistentConfigDefault.DBEngine, "backing database implementation to use ('leveldb' or 'pebble')")
f.String(prefix+".db-engine", PersistentConfigDefault.DBEngine, "backing database implementation to use. If set to empty string the database type will be autodetected and if no pre-existing database is found it will default to creating new pebble database ('leveldb', 'pebble' or '' = auto-detect)")
PebbleConfigAddOptions(prefix+".pebble", f)
}

Expand Down Expand Up @@ -97,11 +97,12 @@ func DatabaseInDirectory(path string) bool {
}

func (c *PersistentConfig) Validate() error {
// we are validating .db-engine here to avoid unintended behaviour as empty string value also has meaning in geth's node.Config.DBEngine
if c.DBEngine != "leveldb" && c.DBEngine != "pebble" {
return fmt.Errorf(`invalid .db-engine choice: %q, allowed "leveldb" or "pebble"`, c.DBEngine)
if c.DBEngine != "leveldb" && c.DBEngine != "pebble" && c.DBEngine != "" {
return fmt.Errorf(`invalid .db-engine choice: %q, allowed "leveldb", "pebble" or ""`, c.DBEngine)
}
if c.DBEngine == "pebble" {
// if DBEngine == "" then we may end up opening pebble database, so we want to validate the Pebble config
// if pre-existing database is leveldb backed, then user shouldn't change the Pebble config defaults => this check should also succeed
if c.DBEngine == "pebble" || c.DBEngine == "" {
if err := c.Pebble.Validate(); err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/datool/datool.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ func parseDumpKeyset(args []string) (*DumpKeysetConfig, error) {
return nil, err
}

if err = das.FixKeysetCLIParsing("keyset.backends", k); err != nil {
return nil, err
}

var config DumpKeysetConfig
if err := confighelpers.EndCommonParse(k, &config); err != nil {
return nil, err
Expand All @@ -334,7 +338,7 @@ func parseDumpKeyset(args []string) (*DumpKeysetConfig, error) {
if config.Keyset.AssumedHonest == 0 {
return nil, errors.New("--keyset.assumed-honest must be set")
}
if config.Keyset.Backends == "" {
if config.Keyset.Backends == nil {
return nil, errors.New("--keyset.backends must be set")
}

Expand Down
5 changes: 4 additions & 1 deletion cmd/nitro/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/cmd/util/confighelpers"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/util/colors"
"github.com/offchainlabs/nitro/util/testhelpers"

Expand All @@ -28,6 +29,8 @@ func TestEmptyCliConfig(t *testing.T) {
NodeConfigAddOptions(f)
k, err := confighelpers.BeginCommonParse(f, []string{})
Require(t, err)
err = das.FixKeysetCLIParsing("node.data-availability.rpc-aggregator.backends", k)
Require(t, err)
var emptyCliNodeConfig NodeConfig
err = confighelpers.EndCommonParse(k, &emptyCliNodeConfig)
Require(t, err)
Expand Down Expand Up @@ -57,7 +60,7 @@ func TestValidatorConfig(t *testing.T) {
}

func TestAggregatorConfig(t *testing.T) {
args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.data-availability.enable --node.data-availability.rpc-aggregator.backends {[\"url\":\"http://localhost:8547\",\"pubkey\":\"abc==\",\"signerMask\":0x1]}", " ")
args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.data-availability.enable --node.data-availability.rpc-aggregator.backends [{\"url\":\"http://localhost:8547\",\"pubkey\":\"abc==\"}]", " ")
_, _, err := ParseNode(context.Background(), args)
Require(t, err)
}
Expand Down
14 changes: 14 additions & 0 deletions cmd/nitro/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"os"
"path"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -326,6 +327,16 @@ func dirExists(path string) bool {
return info.IsDir()
}

var pebbleNotExistErrorRegex = regexp.MustCompile("pebble: database .* does not exist")

func isPebbleNotExistError(err error) bool {
return pebbleNotExistErrorRegex.MatchString(err.Error())
}

func isLeveldbNotExistError(err error) bool {
return os.IsNotExist(err)
}

func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) {
if !config.Init.Force {
if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil {
Expand Down Expand Up @@ -396,6 +407,9 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
return chainDb, l2BlockChain, nil
}
readOnlyDb.Close()
} else if !isLeveldbNotExistError(err) && !isPebbleNotExistError(err) {
// we only want to continue if the error is pebble or leveldb not exist error
return nil, nil, fmt.Errorf("Failed to open database: %w", err)
}
}

Expand Down
33 changes: 33 additions & 0 deletions cmd/nitro/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/node"
"github.com/offchainlabs/nitro/cmd/conf"
"github.com/offchainlabs/nitro/util/testhelpers"
)
Expand Down Expand Up @@ -177,3 +178,35 @@ func startFileServer(t *testing.T, ctx context.Context, dir string) string {
}()
return addr
}

func testIsNotExistError(t *testing.T, dbEngine string, isNotExist func(error) bool) {
stackConf := node.DefaultConfig
stackConf.DataDir = t.TempDir()
stackConf.DBEngine = dbEngine
stack, err := node.New(&stackConf)
if err != nil {
t.Fatalf("Failed to created test stack: %v", err)
}
defer stack.Close()
readonly := true
_, err = stack.OpenDatabaseWithExtraOptions("test", 16, 16, "", readonly, nil)
if err == nil {
t.Fatal("Opening non-existent database did not fail")
}
if !isNotExist(err) {
t.Fatalf("Failed to classify error as not exist error - internal implementation of OpenDatabaseWithExtraOptions might have changed, err: %v", err)
}
err = errors.New("some other error")
if isNotExist(err) {
t.Fatalf("Classified other error as not exist, err: %v", err)
}
}

func TestIsNotExistError(t *testing.T) {
t.Run("TestIsPebbleNotExistError", func(t *testing.T) {
testIsNotExistError(t, "pebble", isPebbleNotExistError)
})
t.Run("TestIsLeveldbNotExistError", func(t *testing.T) {
testIsNotExistError(t, "leveldb", isLeveldbNotExistError)
})
}
10 changes: 10 additions & 0 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/cmd/util"
"github.com/offchainlabs/nitro/cmd/util/confighelpers"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/execution/gethexec"
_ "github.com/offchainlabs/nitro/execution/nodeInterface"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
Expand All @@ -60,6 +61,7 @@ import (
"github.com/offchainlabs/nitro/staker/validatorwallet"
"github.com/offchainlabs/nitro/util/colors"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/iostat"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/validator/server_common"
Expand Down Expand Up @@ -404,6 +406,10 @@ func mainImpl() int {
return 1
}

if nodeConfig.Metrics {
go iostat.RegisterAndPopulateMetrics(ctx, 1, 5)
}

var deferFuncs []func()
defer func() {
for i := range deferFuncs {
Expand Down Expand Up @@ -874,6 +880,10 @@ func ParseNode(ctx context.Context, args []string) (*NodeConfig, *genericconf.Wa
return nil, nil, err
}

if err = das.FixKeysetCLIParsing("node.data-availability.rpc-aggregator.backends", k); err != nil {
return nil, nil, err
}

var nodeConfig NodeConfig
if err := confighelpers.EndCommonParse(k, &nodeConfig); err != nil {
return nil, nil, err
Expand Down
9 changes: 8 additions & 1 deletion cmd/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (dasReader *PreimageDASReader) GetByHash(ctx context.Context, hash common.H
return dastree.Content(hash, oracle)
}

func (dasReader *PreimageDASReader) GetKeysetByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
return dasReader.GetByHash(ctx, hash)
}

func (dasReader *PreimageDASReader) HealthCheck(ctx context.Context) error {
return nil
}
Expand Down Expand Up @@ -205,8 +209,11 @@ func main() {
delayedMessagesRead = lastBlockHeader.Nonce.Uint64()
}
var dasReader daprovider.DASReader
var dasKeysetFetcher daprovider.DASKeysetFetcher
if dasEnabled {
// DAS batch and keysets are all together in the same preimage binary.
dasReader = &PreimageDASReader{}
dasKeysetFetcher = &PreimageDASReader{}
}
backend := WavmInbox{}
var keysetValidationMode = daprovider.KeysetPanicIfInvalid
Expand All @@ -215,7 +222,7 @@ func main() {
}
var dapReaders []daprovider.Reader
if dasReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForDAS(dasReader))
dapReaders = append(dapReaders, daprovider.NewReaderForDAS(dasReader, dasKeysetFetcher))
}
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(&BlobPreimageReader{}))
inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, dapReaders, keysetValidationMode)
Expand Down
Loading

0 comments on commit f2e43be

Please sign in to comment.