Skip to content

Commit

Permalink
Merge pull request #678 from onflow/janez/close-clients
Browse files Browse the repository at this point in the history
Close the AN clients
  • Loading branch information
janezpodhostnik authored Nov 27, 2024
2 parents c98427b + 5b77d7b commit 528bd94
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 161 deletions.
61 changes: 37 additions & 24 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@ import (
"math"
"time"

"github.com/onflow/flow-go/module/component"

pebbleDB "github.com/cockroachdb/pebble"

"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-go-sdk/access"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
"github.com/onflow/flow-go/fvm/environment"
"github.com/onflow/flow-go/fvm/evm"
flowGo "github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/component"
flowMetrics "github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/util"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter/memorystore"
grpcOpts "google.golang.org/grpc"

"github.com/onflow/flow-evm-gateway/api"
"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-evm-gateway/services/ingestion"
Expand Down Expand Up @@ -57,9 +57,10 @@ type Bootstrap struct {
publishers *Publishers
collector metrics.Collector
server *api.Server
metrics *metrics.Server
metrics *flowMetrics.Server
events *ingestion.Engine
profiler *api.ProfileServer
db *pebbleDB.DB
}

func New(config *config.Config) (*Bootstrap, error) {
Expand All @@ -72,7 +73,7 @@ func New(config *config.Config) (*Bootstrap, error) {
return nil, err
}

storages, err := setupStorage(config, client, logger)
db, storages, err := setupStorage(config, client, logger)
if err != nil {
return nil, err
}
Expand All @@ -83,6 +84,7 @@ func New(config *config.Config) (*Bootstrap, error) {
Transaction: models.NewPublisher[*gethTypes.Transaction](),
Logs: models.NewPublisher[[]*gethTypes.Log](),
},
db: db,
storages: storages,
logger: logger,
config: config,
Expand Down Expand Up @@ -334,15 +336,14 @@ func (b *Bootstrap) StopAPIServer() {
b.server.Stop()
}

func (b *Bootstrap) StartMetricsServer(_ context.Context) error {
func (b *Bootstrap) StartMetricsServer(ctx context.Context) error {
b.logger.Info().Msg("bootstrap starting metrics server")

b.metrics = metrics.NewServer(b.logger, b.config.MetricsPort)
started, err := b.metrics.Start()
b.metrics = flowMetrics.NewServer(b.logger, uint(b.config.MetricsPort))
err := util.WaitClosed(ctx, b.metrics.Ready())
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
<-started

return nil
}
Expand All @@ -352,7 +353,7 @@ func (b *Bootstrap) StopMetricsServer() {
return
}
b.logger.Warn().Msg("shutting down metrics server")
b.metrics.Stop()
<-b.metrics.Done()
}

func (b *Bootstrap) StartProfilerServer(_ context.Context) error {
Expand Down Expand Up @@ -388,15 +389,25 @@ func (b *Bootstrap) StopProfilerServer() {
}

func (b *Bootstrap) StopDB() {
if b.storages == nil || b.storages.Storage == nil {
if b.db == nil {
return
}
err := b.storages.Storage.Close()
err := b.db.Close()
if err != nil {
b.logger.Err(err).Msg("PebbleDB graceful shutdown failed")
}
}

func (b *Bootstrap) StopClient() {
if b.client == nil {
return
}
err := b.client.Close()
if err != nil {
b.logger.Err(err).Msg("CrossSporkClient graceful shutdown failed")
}
}

// StartEngine starts provided engine and panics if there are startup errors.
func StartEngine(
ctx context.Context,
Expand Down Expand Up @@ -466,12 +477,13 @@ func setupStorage(
config *config.Config,
client *requester.CrossSporkClient,
logger zerolog.Logger,
) (*Storages, error) {
) (*pebbleDB.DB, *Storages, error) {
// create pebble storage from the provided database root directory
store, err := pebble.New(config.DatabaseDir, logger)
db, err := pebble.OpenDB(config.DatabaseDir)
if err != nil {
return nil, err
return nil, nil, err
}
store := pebble.New(db, logger)

blocks := pebble.NewBlocks(store, config.FlowNetworkID)
storageAddress := evm.StorageAccountAddress(config.FlowNetworkID)
Expand All @@ -481,7 +493,7 @@ func setupStorage(
if config.ForceStartCadenceHeight != 0 {
logger.Warn().Uint64("height", config.ForceStartCadenceHeight).Msg("force setting starting Cadence height!!!")
if err := blocks.SetLatestCadenceHeight(config.ForceStartCadenceHeight, nil); err != nil {
return nil, err
return nil, nil, err
}
}

Expand All @@ -500,12 +512,12 @@ func setupStorage(
evmBlokcHeight := uint64(0)
cadenceBlock, err := client.GetBlockHeaderByHeight(context.Background(), cadenceHeight)
if err != nil {
return nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
return nil, nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
}

snapshot, err := registerStore.GetSnapshotAt(evmBlokcHeight)
if err != nil {
return nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
return nil, nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
}

delta := storage.NewRegisterDelta(snapshot)
Expand All @@ -516,16 +528,16 @@ func setupStorage(
accountStatus.ToBytes(),
)
if err != nil {
return nil, fmt.Errorf("could not set account status: %w", err)
return nil, nil, fmt.Errorf("could not set account status: %w", err)
}

err = registerStore.Store(delta.GetUpdates(), evmBlokcHeight, batch)
if err != nil {
return nil, fmt.Errorf("could not store register updates: %w", err)
return nil, nil, fmt.Errorf("could not store register updates: %w", err)
}

if err := blocks.InitHeights(cadenceHeight, cadenceBlock.ID, batch); err != nil {
return nil, fmt.Errorf(
return nil, nil, fmt.Errorf(
"failed to init the database for block height: %d and ID: %s, with : %w",
cadenceHeight,
cadenceBlock.ID,
Expand All @@ -535,7 +547,7 @@ func setupStorage(

err = batch.Commit(pebbleDB.Sync)
if err != nil {
return nil, fmt.Errorf("could not commit register updates: %w", err)
return nil, nil, fmt.Errorf("could not commit register updates: %w", err)
}

logger.Info().
Expand All @@ -546,7 +558,7 @@ func setupStorage(
// // TODO(JanezP): verify storage account owner is correct
//}

return &Storages{
return db, &Storages{
Storage: store,
Blocks: blocks,
Registers: registerStore,
Expand Down Expand Up @@ -591,6 +603,7 @@ func Run(ctx context.Context, cfg *config.Config, ready component.ReadyFunc) err
boot.StopEventIngestion()
boot.StopMetricsServer()
boot.StopAPIServer()
boot.StopClient()
boot.StopDB()

return nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22
require (
github.com/cockroachdb/pebble v1.1.1
github.com/goccy/go-json v0.10.2
github.com/hashicorp/go-multierror v1.1.1
github.com/onflow/atree v0.8.0
github.com/onflow/cadence v1.2.2
github.com/onflow/flow-go v0.38.0-preview.0.0.20241125190444-25a8af57bea1
Expand Down Expand Up @@ -91,7 +92,6 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down
68 changes: 0 additions & 68 deletions metrics/server.go

This file was deleted.

3 changes: 2 additions & 1 deletion services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,9 @@ func defaultReplayerConfig() replayer.Config {
}

func setupStore(t *testing.T) (*pebble.Storage, *pebble.RegisterStorage) {
store, err := pebble.New(t.TempDir(), zerolog.Nop())
db, err := pebble.OpenDB(t.TempDir())
require.NoError(t, err)
store := pebble.New(db, zerolog.Nop())

storageAddress := evm.StorageAccountAddress(flowGo.Emulator)
registerStore := pebble.NewRegisterStorage(store, storageAddress)
Expand Down
3 changes: 2 additions & 1 deletion services/replayer/blocks_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,9 @@ func TestGetSnapshotAt(t *testing.T) {

func setupBlocksDB(t *testing.T) (*pebble.Storage, storage.BlockIndexer) {
dir := t.TempDir()
db, err := pebble.New(dir, zerolog.Nop())
pebbleDB, err := pebble.OpenDB(dir)
require.NoError(t, err)
db := pebble.New(pebbleDB, zerolog.Nop())
batch := db.NewBatch()

chainID := flowGo.Emulator
Expand Down
25 changes: 23 additions & 2 deletions services/requester/cross-spork_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/hashicorp/go-multierror"
"github.com/onflow/cadence"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-go-sdk"
Expand Down Expand Up @@ -34,6 +35,10 @@ func (s *sporkClient) GetEventsForHeightRange(
return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
}

func (s *sporkClient) Close() error {
return s.client.Close()
}

type sporkClients []*sporkClient

// addSpork will add a new spork host defined by the first and last height boundary in that spork.
Expand Down Expand Up @@ -102,7 +107,7 @@ func (s *sporkClients) continuous() bool {
// that shadows the original access Client function.
type CrossSporkClient struct {
logger zerolog.Logger
sporkClients *sporkClients
sporkClients sporkClients
currentSporkFirstHeight uint64
access.Client
}
Expand All @@ -127,7 +132,7 @@ func NewCrossSporkClient(
nodeRootBlockHeight = info.NodeRootBlockHeight
}

clients := &sporkClients{}
clients := sporkClients{}
for _, c := range pastSporks {
if err := clients.add(logger, c); err != nil {
return nil, err
Expand Down Expand Up @@ -243,3 +248,19 @@ func (c *CrossSporkClient) GetEventsForHeightRange(
}
return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
}

func (c *CrossSporkClient) Close() error {
var merr *multierror.Error

for _, client := range c.sporkClients {
if err := client.Close(); err != nil {
merr = multierror.Append(merr, err)
}
}
err := c.Client.Close()
if err != nil {
merr = multierror.Append(merr, err)
}

return merr.ErrorOrNil()
}
Loading

0 comments on commit 528bd94

Please sign in to comment.