diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 7f0c6c0bb..ce468ef9f 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -7,16 +7,17 @@ import ( "math" "time" - "github.com/onflow/flow-go/module/component" - pebbleDB "github.com/cockroachdb/pebble" - + "github.com/onflow/flow-evm-gateway/metrics" "github.com/onflow/flow-go-sdk/access" "github.com/onflow/flow-go-sdk/access/grpc" "github.com/onflow/flow-go-sdk/crypto" "github.com/onflow/flow-go/fvm/environment" "github.com/onflow/flow-go/fvm/evm" flowGo "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/component" + flowMetrics "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/module/util" gethTypes "github.com/onflow/go-ethereum/core/types" "github.com/rs/zerolog" "github.com/sethvargo/go-limiter/memorystore" @@ -24,7 +25,6 @@ import ( "github.com/onflow/flow-evm-gateway/api" "github.com/onflow/flow-evm-gateway/config" - "github.com/onflow/flow-evm-gateway/metrics" "github.com/onflow/flow-evm-gateway/models" errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/ingestion" @@ -57,9 +57,10 @@ type Bootstrap struct { publishers *Publishers collector metrics.Collector server *api.Server - metrics *metrics.Server + metrics *flowMetrics.Server events *ingestion.Engine profiler *api.ProfileServer + db *pebbleDB.DB } func New(config *config.Config) (*Bootstrap, error) { @@ -72,7 +73,7 @@ func New(config *config.Config) (*Bootstrap, error) { return nil, err } - storages, err := setupStorage(config, client, logger) + db, storages, err := setupStorage(config, client, logger) if err != nil { return nil, err } @@ -83,6 +84,7 @@ func New(config *config.Config) (*Bootstrap, error) { Transaction: models.NewPublisher[*gethTypes.Transaction](), Logs: models.NewPublisher[[]*gethTypes.Log](), }, + db: db, storages: storages, logger: logger, config: config, @@ -334,15 +336,14 @@ func (b *Bootstrap) StopAPIServer() { b.server.Stop() } -func (b *Bootstrap) StartMetricsServer(_ context.Context) error { +func (b *Bootstrap) StartMetricsServer(ctx context.Context) error { b.logger.Info().Msg("bootstrap starting metrics server") - b.metrics = metrics.NewServer(b.logger, b.config.MetricsPort) - started, err := b.metrics.Start() + b.metrics = flowMetrics.NewServer(b.logger, uint(b.config.MetricsPort)) + err := util.WaitClosed(ctx, b.metrics.Ready()) if err != nil { return fmt.Errorf("failed to start metrics server: %w", err) } - <-started return nil } @@ -352,7 +353,7 @@ func (b *Bootstrap) StopMetricsServer() { return } b.logger.Warn().Msg("shutting down metrics server") - b.metrics.Stop() + <-b.metrics.Done() } func (b *Bootstrap) StartProfilerServer(_ context.Context) error { @@ -388,15 +389,25 @@ func (b *Bootstrap) StopProfilerServer() { } func (b *Bootstrap) StopDB() { - if b.storages == nil || b.storages.Storage == nil { + if b.db == nil { return } - err := b.storages.Storage.Close() + err := b.db.Close() if err != nil { b.logger.Err(err).Msg("PebbleDB graceful shutdown failed") } } +func (b *Bootstrap) StopClient() { + if b.client == nil { + return + } + err := b.client.Close() + if err != nil { + b.logger.Err(err).Msg("CrossSporkClient graceful shutdown failed") + } +} + // StartEngine starts provided engine and panics if there are startup errors. func StartEngine( ctx context.Context, @@ -466,12 +477,13 @@ func setupStorage( config *config.Config, client *requester.CrossSporkClient, logger zerolog.Logger, -) (*Storages, error) { +) (*pebbleDB.DB, *Storages, error) { // create pebble storage from the provided database root directory - store, err := pebble.New(config.DatabaseDir, logger) + db, err := pebble.OpenDB(config.DatabaseDir) if err != nil { - return nil, err + return nil, nil, err } + store := pebble.New(db, logger) blocks := pebble.NewBlocks(store, config.FlowNetworkID) storageAddress := evm.StorageAccountAddress(config.FlowNetworkID) @@ -481,7 +493,7 @@ func setupStorage( if config.ForceStartCadenceHeight != 0 { logger.Warn().Uint64("height", config.ForceStartCadenceHeight).Msg("force setting starting Cadence height!!!") if err := blocks.SetLatestCadenceHeight(config.ForceStartCadenceHeight, nil); err != nil { - return nil, err + return nil, nil, err } } @@ -500,12 +512,12 @@ func setupStorage( evmBlokcHeight := uint64(0) cadenceBlock, err := client.GetBlockHeaderByHeight(context.Background(), cadenceHeight) if err != nil { - return nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err) + return nil, nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err) } snapshot, err := registerStore.GetSnapshotAt(evmBlokcHeight) if err != nil { - return nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err) + return nil, nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err) } delta := storage.NewRegisterDelta(snapshot) @@ -516,16 +528,16 @@ func setupStorage( accountStatus.ToBytes(), ) if err != nil { - return nil, fmt.Errorf("could not set account status: %w", err) + return nil, nil, fmt.Errorf("could not set account status: %w", err) } err = registerStore.Store(delta.GetUpdates(), evmBlokcHeight, batch) if err != nil { - return nil, fmt.Errorf("could not store register updates: %w", err) + return nil, nil, fmt.Errorf("could not store register updates: %w", err) } if err := blocks.InitHeights(cadenceHeight, cadenceBlock.ID, batch); err != nil { - return nil, fmt.Errorf( + return nil, nil, fmt.Errorf( "failed to init the database for block height: %d and ID: %s, with : %w", cadenceHeight, cadenceBlock.ID, @@ -535,7 +547,7 @@ func setupStorage( err = batch.Commit(pebbleDB.Sync) if err != nil { - return nil, fmt.Errorf("could not commit register updates: %w", err) + return nil, nil, fmt.Errorf("could not commit register updates: %w", err) } logger.Info(). @@ -546,7 +558,7 @@ func setupStorage( // // TODO(JanezP): verify storage account owner is correct //} - return &Storages{ + return db, &Storages{ Storage: store, Blocks: blocks, Registers: registerStore, @@ -591,6 +603,7 @@ func Run(ctx context.Context, cfg *config.Config, ready component.ReadyFunc) err boot.StopEventIngestion() boot.StopMetricsServer() boot.StopAPIServer() + boot.StopClient() boot.StopDB() return nil diff --git a/go.mod b/go.mod index d03175466..42016a609 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22 require ( github.com/cockroachdb/pebble v1.1.1 github.com/goccy/go-json v0.10.2 + github.com/hashicorp/go-multierror v1.1.1 github.com/onflow/atree v0.8.0 github.com/onflow/cadence v1.2.2 github.com/onflow/flow-go v0.38.0-preview.0.0.20241125190444-25a8af57bea1 @@ -91,7 +92,6 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/metrics/server.go b/metrics/server.go deleted file mode 100644 index d2042f8d9..000000000 --- a/metrics/server.go +++ /dev/null @@ -1,68 +0,0 @@ -package metrics - -import ( - "context" - "errors" - "fmt" - "net" - "net/http" - "time" - - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/rs/zerolog" -) - -// endpoint where metrics are available for scraping -const endpoint = "/metrics" - -// Server is the http server that will be serving metrics requests -type Server struct { - server *http.Server - log zerolog.Logger -} - -// NewServer creates a new server that will start on the specified port, -// and responds to only the `/metrics` endpoint -func NewServer(log zerolog.Logger, port int) *Server { - log = log.With().Str("component", "metrics-server").Logger() - addr := fmt.Sprintf(":%d", port) - - mux := http.NewServeMux() - mux.Handle(endpoint, promhttp.Handler()) - - return &Server{ - server: &http.Server{Addr: addr, Handler: mux}, - log: log, - } -} - -// Start starts the server and returns a channel which is closed -// when the server is ready to serve requests. -func (s *Server) Start() (<-chan struct{}, error) { - ready := make(chan struct{}) - defer close(ready) - - listener, err := net.Listen("tcp", s.server.Addr) - if err != nil { - s.log.Err(err).Msg("error listening on address") - return nil, err - } - - go func() { - err := s.server.Serve(listener) - if err != nil && !errors.Is(err, http.ErrServerClosed) { - s.log.Err(err).Msg("error serving metrics server") - } - }() - - return ready, nil -} - -func (s *Server) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - if err := s.server.Shutdown(ctx); err != nil { - s.log.Err(err).Msg("error shutting down metrics server") - } -} diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index bb00e5484..384f0da8b 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -636,8 +636,9 @@ func defaultReplayerConfig() replayer.Config { } func setupStore(t *testing.T) (*pebble.Storage, *pebble.RegisterStorage) { - store, err := pebble.New(t.TempDir(), zerolog.Nop()) + db, err := pebble.OpenDB(t.TempDir()) require.NoError(t, err) + store := pebble.New(db, zerolog.Nop()) storageAddress := evm.StorageAccountAddress(flowGo.Emulator) registerStore := pebble.NewRegisterStorage(store, storageAddress) diff --git a/services/replayer/blocks_provider_test.go b/services/replayer/blocks_provider_test.go index d318ae3d6..2fe091356 100644 --- a/services/replayer/blocks_provider_test.go +++ b/services/replayer/blocks_provider_test.go @@ -279,8 +279,9 @@ func TestGetSnapshotAt(t *testing.T) { func setupBlocksDB(t *testing.T) (*pebble.Storage, storage.BlockIndexer) { dir := t.TempDir() - db, err := pebble.New(dir, zerolog.Nop()) + pebbleDB, err := pebble.OpenDB(dir) require.NoError(t, err) + db := pebble.New(pebbleDB, zerolog.Nop()) batch := db.NewBatch() chainID := flowGo.Emulator diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 4025aa3c0..cddfc9297 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/hashicorp/go-multierror" "github.com/onflow/cadence" errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-go-sdk" @@ -34,6 +35,10 @@ func (s *sporkClient) GetEventsForHeightRange( return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } +func (s *sporkClient) Close() error { + return s.client.Close() +} + type sporkClients []*sporkClient // addSpork will add a new spork host defined by the first and last height boundary in that spork. @@ -102,7 +107,7 @@ func (s *sporkClients) continuous() bool { // that shadows the original access Client function. type CrossSporkClient struct { logger zerolog.Logger - sporkClients *sporkClients + sporkClients sporkClients currentSporkFirstHeight uint64 access.Client } @@ -127,7 +132,7 @@ func NewCrossSporkClient( nodeRootBlockHeight = info.NodeRootBlockHeight } - clients := &sporkClients{} + clients := sporkClients{} for _, c := range pastSporks { if err := clients.add(logger, c); err != nil { return nil, err @@ -243,3 +248,19 @@ func (c *CrossSporkClient) GetEventsForHeightRange( } return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } + +func (c *CrossSporkClient) Close() error { + var merr *multierror.Error + + for _, client := range c.sporkClients { + if err := client.Close(); err != nil { + merr = multierror.Append(merr, err) + } + } + err := c.Client.Close() + if err != nil { + merr = multierror.Append(merr, err) + } + + return merr.ErrorOrNil() +} diff --git a/storage/pebble/db.go b/storage/pebble/db.go new file mode 100644 index 000000000..5e3c0f89c --- /dev/null +++ b/storage/pebble/db.go @@ -0,0 +1,62 @@ +package pebble + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/bloom" +) + +// OpenDB opens a new pebble database at the provided directory. +func OpenDB(dir string) (*pebble.DB, error) { + cache := pebble.NewCache(1 << 20) + defer cache.Unref() + + // currently pebble is only used for registers + opts := &pebble.Options{ + Cache: cache, + Comparer: NewMVCCComparer(), + FormatMajorVersion: pebble.FormatNewest, + L0CompactionThreshold: 2, + L0StopWritesThreshold: 1000, + // When the maximum number of bytes for a level is exceeded, compaction is requested. + LBaseMaxBytes: 64 << 20, // 64 MB + Levels: make([]pebble.LevelOptions, 7), + MaxOpenFiles: 16384, + // Writes are stopped when the sum of the queued memtable sizes exceeds MemTableStopWritesThreshold*MemTableSize. + MemTableSize: 64 << 20, + MemTableStopWritesThreshold: 4, + // The default is 1. + MaxConcurrentCompactions: func() int { return 4 }, + } + + for i := 0; i < len(opts.Levels); i++ { + l := &opts.Levels[i] + // The default is 4KiB (uncompressed), which is too small + // for good performance (esp. on stripped storage). + l.BlockSize = 32 << 10 // 32 KB + l.IndexBlockSize = 256 << 10 // 256 KB + + // The bloom filter speedsup our SeekPrefixGE by skipping + // sstables that do not contain the prefix + l.FilterPolicy = bloom.FilterPolicy(MinLookupKeyLen) + l.FilterType = pebble.TableFilter + + if i > 0 { + // L0 starts at 2MiB, each level is 2x the previous. + l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2 + } + l.EnsureDefaults() + } + + // Splitting sstables during flush allows increased compaction flexibility and concurrency when those + // tables are compacted to lower levels. + opts.FlushSplitBytes = opts.Levels[0].TargetFileSize + opts.EnsureDefaults() + + db, err := pebble.Open(dir, opts) + if err != nil { + return nil, fmt.Errorf("failed to open db for dir: %s, with: %w", dir, err) + } + return db, nil +} diff --git a/storage/pebble/storage.go b/storage/pebble/storage.go index eb33ad97e..94c420cf1 100644 --- a/storage/pebble/storage.go +++ b/storage/pebble/storage.go @@ -2,11 +2,8 @@ package pebble import ( "errors" - "fmt" "io" - "github.com/cockroachdb/pebble/bloom" - "github.com/cockroachdb/pebble" "github.com/rs/zerolog" @@ -18,64 +15,12 @@ type Storage struct { log zerolog.Logger } -// New creates a new storage instance using the provided dir location as the storage directory. -func New(dir string, log zerolog.Logger) (*Storage, error) { - cache := pebble.NewCache(1 << 20) - defer cache.Unref() - - log = log.With().Str("component", "storage").Logger() - - // currently pebble is only used for registers - opts := &pebble.Options{ - Cache: cache, - Comparer: NewMVCCComparer(), - FormatMajorVersion: pebble.FormatNewest, - L0CompactionThreshold: 2, - L0StopWritesThreshold: 1000, - // When the maximum number of bytes for a level is exceeded, compaction is requested. - LBaseMaxBytes: 64 << 20, // 64 MB - Levels: make([]pebble.LevelOptions, 7), - MaxOpenFiles: 16384, - // Writes are stopped when the sum of the queued memtable sizes exceeds MemTableStopWritesThreshold*MemTableSize. - MemTableSize: 64 << 20, - MemTableStopWritesThreshold: 4, - // The default is 1. - MaxConcurrentCompactions: func() int { return 4 }, - } - - for i := 0; i < len(opts.Levels); i++ { - l := &opts.Levels[i] - // The default is 4KiB (uncompressed), which is too small - // for good performance (esp. on stripped storage). - l.BlockSize = 32 << 10 // 32 KB - l.IndexBlockSize = 256 << 10 // 256 KB - - // The bloom filter speedsup our SeekPrefixGE by skipping - // sstables that do not contain the prefix - l.FilterPolicy = bloom.FilterPolicy(MinLookupKeyLen) - l.FilterType = pebble.TableFilter - - if i > 0 { - // L0 starts at 2MiB, each level is 2x the previous. - l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2 - } - l.EnsureDefaults() - } - - // Splitting sstables during flush allows increased compaction flexibility and concurrency when those - // tables are compacted to lower levels. - opts.FlushSplitBytes = opts.Levels[0].TargetFileSize - opts.EnsureDefaults() - - db, err := pebble.Open(dir, opts) - if err != nil { - return nil, fmt.Errorf("failed to open db for dir: %s, with: %w", dir, err) - } - +// New creates a new storage instance using the provided db. +func New(db *pebble.DB, log zerolog.Logger) *Storage { return &Storage{ db: db, log: log, - }, nil + } } // set key-value pair identified by key code (which act as an entity identifier). @@ -116,7 +61,3 @@ func (s *Storage) get(keyCode byte, key ...[]byte) ([]byte, error) { func (s *Storage) NewBatch() *pebble.Batch { return s.db.NewBatch() } - -func (s *Storage) Close() error { - return s.db.Close() -} diff --git a/storage/pebble/storage_test.go b/storage/pebble/storage_test.go index 5e80dd90c..a5aada0c5 100644 --- a/storage/pebble/storage_test.go +++ b/storage/pebble/storage_test.go @@ -172,8 +172,9 @@ func TestBatch(t *testing.T) { func runDB(name string, t *testing.T, f func(t *testing.T, db *Storage)) { dir := t.TempDir() - db, err := New(dir, zerolog.New(zerolog.NewTestWriter(t))) + pebbleDB, err := OpenDB(dir) require.NoError(t, err) + db := New(pebbleDB, zerolog.New(zerolog.NewTestWriter(t))) t.Run(name, func(t *testing.T) { f(t, db) diff --git a/storage/register_delta_test.go b/storage/register_delta_test.go index 10c2e501e..165d7410c 100644 --- a/storage/register_delta_test.go +++ b/storage/register_delta_test.go @@ -215,8 +215,9 @@ func Test_RegisterDeltaWithStorage(t *testing.T) { func runDB(name string, t *testing.T, f func(t *testing.T, db *pebbleStorage.Storage)) { dir := t.TempDir() - db, err := pebbleStorage.New(dir, zerolog.New(zerolog.NewTestWriter(t))) + pebbleDB, err := pebbleStorage.OpenDB(dir) require.NoError(t, err) + db := pebbleStorage.New(pebbleDB, zerolog.New(zerolog.NewTestWriter(t))) t.Run(name, func(t *testing.T) { f(t, db)