diff --git a/cmd/cartesi-rollups-claimer/root/root.go b/cmd/cartesi-rollups-claimer/root/root.go index af357f090..4e3472b1d 100644 --- a/cmd/cartesi-rollups-claimer/root/root.go +++ b/cmd/cartesi-rollups-claimer/root/root.go @@ -54,7 +54,7 @@ func init() { } func run(cmd *cobra.Command, args []string) { - cobra.CheckErr(claimer.Create(createInfo, &claimerService)) + cobra.CheckErr(claimer.Create(&createInfo, &claimerService)) claimerService.CreateDefaultHandlers("/" + claimerService.Name) cobra.CheckErr(claimerService.Serve()) } diff --git a/cmd/cartesi-rollups-evm-reader/root/root.go b/cmd/cartesi-rollups-evm-reader/root/root.go index 7d7f43e28..97f5e9f9f 100644 --- a/cmd/cartesi-rollups-evm-reader/root/root.go +++ b/cmd/cartesi-rollups-evm-reader/root/root.go @@ -4,7 +4,9 @@ package root import ( + "github.com/cartesi/rollups-node/internal/config" "github.com/cartesi/rollups-node/internal/evmreader" + "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/service" "github.com/spf13/cobra" @@ -24,8 +26,11 @@ var ( TelemetryAddress: ":10000", Impl: &readerService, }, - DefaultBlockString: "safe", + EvmReaderPersistentConfig: model.EvmReaderPersistentConfig{ + DefaultBlock: model.DefaultBlockStatusSafe, + }, } + DefaultBlockString = "safe" ) var Cmd = &cobra.Command{ @@ -38,8 +43,8 @@ var Cmd = &cobra.Command{ func init() { createInfo.LoadEnv() - Cmd.Flags().StringVarP(&createInfo.DefaultBlockString, - "default-block", "d", createInfo.DefaultBlockString, + Cmd.Flags().StringVarP(&DefaultBlockString, + "default-block", "d", DefaultBlockString, `Default block to be used when fetching new blocks. One of 'latest', 'safe', 'pending', 'finalized'`) @@ -78,8 +83,13 @@ func init() { } func run(cmd *cobra.Command, args []string) { - ready := make(chan struct{}, 1) + if cmd.Flags().Changed("default-block") { + var err error + createInfo.DefaultBlock, err = config.ToDefaultBlockFromString(DefaultBlockString) + cobra.CheckErr(err) + } + cobra.CheckErr(evmreader.Create(&createInfo, &readerService)) readerService.CreateDefaultHandlers("/" + readerService.Name) - cobra.CheckErr(readerService.Start(nil, ready)) + cobra.CheckErr(readerService.Serve()) } diff --git a/cmd/cartesi-rollups-node/root/root.go b/cmd/cartesi-rollups-node/root/root.go index 3191fb93e..6ede74edc 100644 --- a/cmd/cartesi-rollups-node/root/root.go +++ b/cmd/cartesi-rollups-node/root/root.go @@ -4,84 +4,44 @@ package root import ( - "context" - "log/slog" - "os" - "os/signal" - "syscall" - "time" - - "github.com/cartesi/rollups-node/internal/config" "github.com/cartesi/rollups-node/internal/node" - "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/pkg/service" "github.com/spf13/cobra" ) -const CMD_NAME = "node" - var ( // Should be overridden during the final release build with ldflags // to contain the actual version number buildVersion = "devel" - Cmd = &cobra.Command{ - Use: CMD_NAME, - Short: "Runs the Cartesi Rollups Node", - Long: "Runs the Cartesi Rollups Node as a single process", - RunE: run, + nodeService = node.Service{} + createInfo = node.CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "supervisor", + ProcOwner: true, + EnableSignalHandling: true, + TelemetryCreate: true, + TelemetryAddress: ":10001", + Impl: &nodeService, + }, } - enableClaimSubmission bool ) +var Cmd = &cobra.Command{ + Use: createInfo.Name, + Short: "Runs " + createInfo.Name, + Long: "Runs " + createInfo.Name + " as a single process", + Run: run, +} + func init() { - Cmd.Flags().BoolVar(&enableClaimSubmission, - "claim-submission", true, + createInfo.LoadEnv() + Cmd.Flags().BoolVar(&createInfo.EnableClaimSubmission, + "claim-submission", createInfo.EnableClaimSubmission, "enable or disable claim submission (reader mode)") } -func run(cmd *cobra.Command, args []string) error { - startTime := time.Now() - - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stop() - - cfg := config.FromEnv() - if cmd.Flags().Lookup("claim-submission").Changed { - cfg.FeatureClaimSubmissionEnabled = enableClaimSubmission - if enableClaimSubmission && cfg.Auth == nil { - cfg.Auth = config.AuthFromEnv() - } - } - - database, err := repository.Connect(ctx, cfg.PostgresEndpoint.Value) - if err != nil { - slog.Error("Node couldn't connect to the database", "error", err) - os.Exit(1) - } - defer database.Close() - - // create the node supervisor - supervisor, err := node.Setup(ctx, cfg, database) - if err != nil { - slog.Error("Node exited with an error", "error", err) - os.Exit(1) - } - - // logs startup time - ready := make(chan struct{}, 1) - go func() { - select { - case <-ready: - duration := time.Since(startTime) - slog.Info("Node is ready", "after", duration) - case <-ctx.Done(): - } - }() - - // start supervisor - if err := supervisor.Start(ctx, ready); err != nil { - slog.Error("Node exited with an error", "error", err) - os.Exit(1) - } - - return err +func run(cmd *cobra.Command, args []string) { + cobra.CheckErr(node.Create(&createInfo, &nodeService)) + nodeService.CreateDefaultHandlers("") + cobra.CheckErr(nodeService.Serve()) } diff --git a/internal/advancer/advancer.go b/internal/advancer/advancer.go index e205ba28a..743f810d3 100644 --- a/internal/advancer/advancer.go +++ b/internal/advancer/advancer.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "log/slog" "net/http" "time" @@ -43,25 +42,11 @@ type IAdvancerMachines interface { Apps() []Address } -type Advancer struct { - repository IAdvancerRepository - machines IAdvancerMachines -} - type Service struct { service.Service - Advancer - inspector *inspect.Inspector -} - -func New(machines IAdvancerMachines, repository IAdvancerRepository) (*Advancer, error) { - if machines == nil { - return nil, ErrInvalidMachines - } - if repository == nil { - return nil, ErrInvalidRepository - } - return &Advancer{machines: machines, repository: repository}, nil + repository IAdvancerRepository + machines IAdvancerMachines + inspector inspect.Inspector } type CreateInfo struct { @@ -92,29 +77,41 @@ func Create(c *CreateInfo, s *Service) error { return err } - if c.Repository == nil { - c.Repository, err = repository.Connect(s.Context, c.PostgresEndpoint.Value) - if err != nil { - return err + if s.repository == nil { + if c.Repository == nil { + c.Repository, err = repository.Connect(s.Context, c.PostgresEndpoint.Value) + if err != nil { + return err + } } + s.repository = c.Repository } - s.repository = c.Repository - if c.Machines == nil { - c.Machines, err = machines.Load(s.Context, c.Repository, c.MachineServerVerbosity.Value) - if err != nil { - return err + if s.machines == nil { + if c.Machines == nil { + c.Machines, err = machines.Load(s.Context, + c.Repository, c.MachineServerVerbosity.Value, s.Logger) + if err != nil { + return err + } } + s.machines = c.Machines } - s.machines = c.Machines - if s.Service.ServeMux == nil { - if c.CreateInfo.ServeMux == nil { - c.ServeMux = http.NewServeMux() + // allow partial construction for testing + if c.Machines != nil { + s.inspector = inspect.Inspector{ + IInspectMachines: c.Machines, + } + if s.Service.ServeMux == nil { + if c.CreateInfo.ServeMux == nil { + c.ServeMux = http.NewServeMux() + } + s.ServeMux = c.ServeMux } + s.ServeMux.Handle("/inspect/{dapp}", http.Handler(&s.inspector)) + s.ServeMux.Handle("/inspect/{dapp}/{payload}", http.Handler(&s.inspector)) } - s.Service.ServeMux.Handle("/inspect/{dapp}", http.Handler(s.inspector)) - s.Service.ServeMux.Handle("/inspect/{dapp}/{payload}", http.Handler(s.inspector)) return nil } @@ -144,7 +141,7 @@ func (v *Service) String() string { // It gets unprocessed inputs from the repository, // runs them through the cartesi machine, // and updates the repository with the outputs. -func (advancer *Advancer) Step(ctx context.Context) error { +func (advancer *Service) Step(ctx context.Context) error { // Dynamically updates the list of machines err := advancer.machines.UpdateMachines(ctx) if err != nil { @@ -154,7 +151,7 @@ func (advancer *Advancer) Step(ctx context.Context) error { apps := advancer.machines.Apps() // Gets the unprocessed inputs (of all apps) from the repository. - slog.Debug("advancer: querying for unprocessed inputs") + advancer.Logger.Debug("querying for unprocessed inputs") inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps) if err != nil { return err @@ -162,7 +159,7 @@ func (advancer *Advancer) Step(ctx context.Context) error { // Processes each set of inputs. for app, inputs := range inputs { - slog.Debug(fmt.Sprintf("advancer: processing %d input(s) from %v", len(inputs), app)) + advancer.Logger.Debug(fmt.Sprintf("processing %d input(s) from %v", len(inputs), app)) err := advancer.process(ctx, app, inputs) if err != nil { return err @@ -181,7 +178,7 @@ func (advancer *Advancer) Step(ctx context.Context) error { } // process sequentially processes inputs from the the application. -func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error { +func (advancer *Service) process(ctx context.Context, app Address, inputs []*Input) error { // Asserts that the app has an associated machine. machine, exists := advancer.machines.GetAdvanceMachine(app) if !exists { @@ -195,7 +192,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In // FIXME if theres a change in epoch id call update epochs for _, input := range inputs { - slog.Info("advancer: Processing input", "app", app, "id", input.Id, "index", input.Index) + advancer.Logger.Info("Processing input", "app", app, "id", input.Id, "index", input.Index) // Sends the input to the cartesi machine. res, err := machine.Advance(ctx, input.RawData, input.Index) diff --git a/internal/advancer/advancer_test.go b/internal/advancer/advancer_test.go index acc27b539..98c393d8d 100644 --- a/internal/advancer/advancer_test.go +++ b/internal/advancer/advancer_test.go @@ -15,6 +15,7 @@ import ( "github.com/cartesi/rollups-node/internal/advancer/machines" . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/internal/nodemachine" + "github.com/cartesi/rollups-node/pkg/service" "github.com/stretchr/testify/suite" ) @@ -25,41 +26,16 @@ func TestAdvancer(t *testing.T) { type AdvancerSuite struct{ suite.Suite } -func (s *AdvancerSuite) TestNew() { - s.Run("Ok", func() { - require := s.Require() - machines := newMockMachines() - machines.Map[randomAddress()] = &MockMachine{} - var repository IAdvancerRepository = &MockRepository{} - advancer, err := New(machines, repository) - require.NotNil(advancer) - require.Nil(err) - }) - - s.Run("InvalidMachines", func() { - require := s.Require() - var machines IAdvancerMachines = nil - var repository IAdvancerRepository = &MockRepository{} - advancer, err := New(machines, repository) - require.Nil(advancer) - require.Error(err) - require.Equal(ErrInvalidMachines, err) - }) - - s.Run("InvalidRepository", func() { - require := s.Require() - machines := newMockMachines() - machines.Map[randomAddress()] = &MockMachine{} - var repository IAdvancerRepository = nil - advancer, err := New(machines, repository) - require.Nil(advancer) - require.Error(err) - require.Equal(ErrInvalidRepository, err) - }) -} - -func (s *AdvancerSuite) TestPoller() { - s.T().Skip("TODO") +func New(m IAdvancerMachines, r IAdvancerRepository) (*Service, error) { + s := &Service{ + machines: m, + repository: r, + } + return s, Create(&CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "advancer", + }, + }, s) } func (s *AdvancerSuite) TestRun() { @@ -105,15 +81,15 @@ func (s *AdvancerSuite) TestRun() { } func (s *AdvancerSuite) TestProcess() { - setup := func() (IAdvancerMachines, *MockRepository, *Advancer, Address) { + setup := func() (IAdvancerMachines, *MockRepository, *Service, Address) { + require := s.Require() + app := randomAddress() machines := newMockMachines() machines.Map[app] = &MockMachine{} repository := &MockRepository{} - advancer := &Advancer{ - machines: machines, - repository: repository, - } + advancer, err := New(machines, repository) + require.Nil(err) return machines, repository, advancer, app } diff --git a/internal/advancer/machines/machines.go b/internal/advancer/machines/machines.go index 97eafedb5..14e05f649 100644 --- a/internal/advancer/machines/machines.go +++ b/internal/advancer/machines/machines.go @@ -45,6 +45,7 @@ type Machines struct { machines map[Address]*nm.NodeMachine repository Repository verbosity cm.ServerVerbosity + Logger *slog.Logger } // Load initializes the cartesi machines. @@ -52,7 +53,12 @@ type Machines struct { // // Load does not fail when one of those machines fail to initialize. // It stores the error to be returned later and continues to initialize the other machines. -func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*Machines, error) { +func Load( + ctx context.Context, + repo Repository, + verbosity cm.ServerVerbosity, + logger *slog.Logger, +) (*Machines, error) { configs, err := repo.GetMachineConfigurations(ctx) if err != nil { return nil, err @@ -63,7 +69,7 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (* for _, config := range configs { // Creates the machine. - machine, err := createMachine(ctx, verbosity, config) + machine, err := createMachine(ctx, verbosity, config, logger) if err != nil { err = fmt.Errorf("failed to create machine from snapshot (%v): %w", config, err) errs = errors.Join(errs, err) @@ -71,7 +77,7 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (* } // Advances the machine until it catches up with the state of the database (if necessary). - err = catchUp(ctx, repo, config.AppAddress, machine, config.ProcessedInputs) + err = catchUp(ctx, repo, config.AppAddress, machine, config.ProcessedInputs, logger) if err != nil { err = fmt.Errorf("failed to advance cartesi machine (%v): %w", config, err) errs = errors.Join(errs, err, machine.Close()) @@ -81,7 +87,12 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (* machines[config.AppAddress] = machine } - return &Machines{machines: machines, repository: repo, verbosity: verbosity}, errs + return &Machines{ + machines: machines, + repository: repo, + verbosity: verbosity, + Logger: logger, + }, errs } func (m *Machines) UpdateMachines(ctx context.Context) error { @@ -95,15 +106,15 @@ func (m *Machines) UpdateMachines(ctx context.Context) error { continue } - machine, err := createMachine(ctx, m.verbosity, config) + machine, err := createMachine(ctx, m.verbosity, config, m.Logger) if err != nil { - slog.Error("advancer: Failed to create machine", "app", config.AppAddress, "error", err) + m.Logger.Error("Failed to create machine", "app", config.AppAddress, "error", err) continue } - err = catchUp(ctx, m.repository, config.AppAddress, machine, config.ProcessedInputs) + err = catchUp(ctx, m.repository, config.AppAddress, machine, config.ProcessedInputs, m.Logger) if err != nil { - slog.Error("Failed to sync the machine", "app", config.AppAddress, "error", err) + m.Logger.Error("Failed to sync the machine", "app", config.AppAddress, "error", err) machine.Close() continue } @@ -158,7 +169,7 @@ func (m *Machines) RemoveAbsent(configs []*MachineConfig) { } for address, machine := range m.machines { if !configMap[address] { - slog.Info("advancer: Application was disabled, shutting down machine", "application", address) + m.Logger.Info("Application was disabled, shutting down machine", "application", address) machine.Close() delete(m.machines, address) } @@ -200,7 +211,7 @@ func (m *Machines) Close() error { err := closeMachines(m.machines) if err != nil { - slog.Error(fmt.Sprintf("failed to close some machines: %v", err)) + m.Logger.Error(fmt.Sprintf("failed to close some machines: %v", err)) } return err } @@ -227,17 +238,18 @@ func closeMachines(machines map[Address]*nm.NodeMachine) (err error) { func createMachine(ctx context.Context, verbosity cm.ServerVerbosity, config *MachineConfig, + logger *slog.Logger, ) (*nm.NodeMachine, error) { - slog.Info("advancer: creating machine", "application", config.AppAddress, + logger.Info("creating machine", "application", config.AppAddress, "template-path", config.SnapshotPath) - slog.Debug("advancer: instantiating remote machine server", "application", config.AppAddress) + logger.Debug("instantiating remote machine server", "application", config.AppAddress) // Starts the server. address, err := cm.StartServer(verbosity, 0, os.Stdout, os.Stderr) if err != nil { return nil, err } - slog.Info("advancer: loading machine on server", "application", config.AppAddress, + logger.Info("loading machine on server", "application", config.AppAddress, "remote-machine", address, "template-path", config.SnapshotPath) // Creates a CartesiMachine from the snapshot. runtimeConfig := &emulator.MachineRuntimeConfig{} @@ -246,7 +258,7 @@ func createMachine(ctx context.Context, return nil, errors.Join(err, cm.StopServer(address)) } - slog.Debug("advancer: machine loaded on server", "application", config.AppAddress, + logger.Debug("machine loaded on server", "application", config.AppAddress, "remote-machine", address, "template-path", config.SnapshotPath) // Creates a RollupsMachine from the CartesiMachine. @@ -276,9 +288,10 @@ func catchUp(ctx context.Context, app Address, machine *nm.NodeMachine, processedInputs uint64, + logger *slog.Logger, ) error { - slog.Info("advancer: catching up unprocessed inputs", "app", app) + logger.Info("catching up unprocessed inputs", "app", app) inputs, err := repo.GetProcessedInputs(ctx, app, processedInputs) if err != nil { @@ -287,7 +300,7 @@ func catchUp(ctx context.Context, for _, input := range inputs { // FIXME epoch id to epoch index - slog.Info("advancer: advancing", "app", app, "epochId", input.EpochId, + logger.Info("advancing", "app", app, "epochId", input.EpochId, "input_index", input.Index) _, err := machine.Advance(ctx, input.RawData, input.Index) if err != nil { diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index af5bf45a9..a556d178a 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -71,7 +71,7 @@ type CreateInfo struct { EthConn *ethclient.Client PostgresEndpoint config.Redacted[string] - DBConn *repository.Database + Repository *repository.Database EnableSubmission bool } @@ -80,7 +80,7 @@ type Service struct { service.Service submissionEnabled bool - DBConn *repository.Database + Repository *repository.Database EthConn *ethclient.Client TxOpts *bind.TransactOpts claimsInFlight map[address]hash // -> txHash @@ -93,35 +93,37 @@ func (c *CreateInfo) LoadEnv() { } c.BlockchainHttpEndpoint.Value = config.GetBlockchainHttpEndpoint() c.PostgresEndpoint.Value = config.GetPostgresEndpoint() + c.PollInterval = config.GetClaimerPollingInterval() + c.LogLevel = service.LogLevel(config.GetLogLevel()) } -func Create(ci CreateInfo, s *Service) error { +func Create(c *CreateInfo, s *Service) error { var err error - err = service.Create(&ci.CreateInfo, &s.Service) + err = service.Create(&c.CreateInfo, &s.Service) if err != nil { return err } - s.submissionEnabled = ci.EnableSubmission + s.submissionEnabled = c.EnableSubmission if s.EthConn == nil { - if ci.EthConn == nil { - ci.EthConn, err = ethclient.Dial(ci.BlockchainHttpEndpoint.Value) + if c.EthConn == nil { + c.EthConn, err = ethclient.Dial(c.BlockchainHttpEndpoint.Value) if err != nil { return err } } - s.EthConn = ci.EthConn + s.EthConn = c.EthConn } - if s.DBConn == nil { - if ci.DBConn == nil { - ci.DBConn, err = repository.Connect(s.Context, ci.PostgresEndpoint.Value) + if s.Repository == nil { + if c.Repository == nil { + c.Repository, err = repository.Connect(s.Context, c.PostgresEndpoint.Value) if err != nil { return err } } - s.DBConn = ci.DBConn + s.Repository = c.Repository } if s.claimsInFlight == nil { @@ -129,7 +131,7 @@ func Create(ci CreateInfo, s *Service) error { } if s.submissionEnabled && s.TxOpts == nil { - s.TxOpts, err = CreateTxOptsFromAuth(ci.Auth, s.Context, s.EthConn) + s.TxOpts, err = CreateTxOptsFromAuth(c.Auth, s.Context, s.EthConn) if err != nil { return err } @@ -182,14 +184,14 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { errs = append(errs, err) return errs } - s.Logger.Info("claimer: Claim submitted", + s.Logger.Info("Claim submitted", "app", claim.AppContractAddress, "claim", claim.EpochHash, "last_block", claim.EpochLastBlock, "tx", txHash) delete(currClaims, key) } else { - s.Logger.Warn("claimer: expected claim in flight to be in currClaims.", + s.Logger.Warn("expected claim in flight to be in currClaims.", "tx", receipt.TxHash) } delete(s.claimsInFlight, key) @@ -208,7 +210,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { if prevClaimRow, ok := prevClaims[key]; ok { err := checkClaimsConstraint(&prevClaimRow, &currClaimRow) if err != nil { - s.Logger.Error("claimer: database mismatch", + s.Logger.Error("database mismatch", "prevClaim", prevClaimRow, "currClaim", currClaimRow, "err", err, @@ -227,7 +229,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { goto nextApp } if prevEvent == nil { - s.Logger.Error("claimer: missing event", + s.Logger.Error("missing event", "claim", prevClaimRow, "err", ErrMissingEvent, ) @@ -236,7 +238,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { goto nextApp } if !claimMatchesEvent(&prevClaimRow, prevEvent) { - s.Logger.Error("claimer: event mismatch", + s.Logger.Error("event mismatch", "claim", prevClaimRow, "event", prevEvent, "err", ErrEventMismatch, @@ -258,7 +260,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { if currEvent != nil { if !claimMatchesEvent(&currClaimRow, currEvent) { - s.Logger.Error("claimer: event mismatch", + s.Logger.Error("event mismatch", "claim", currClaimRow, "event", currEvent, "err", ErrEventMismatch, @@ -282,7 +284,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { errs = append(errs, err) goto nextApp } - s.Logger.Info("claimer: Submitting claim to blockchain", + s.Logger.Info("Submitting claim to blockchain", "app", currClaimRow.AppContractAddress, "claim", currClaimRow.EpochHash, "last_block", currClaimRow.EpochLastBlock, diff --git a/internal/claimer/side-effects.go b/internal/claimer/side-effects.go index 2b243072b..9ff7853f8 100644 --- a/internal/claimer/side-effects.go +++ b/internal/claimer/side-effects.go @@ -52,7 +52,7 @@ func (s *Service) selectClaimPairsPerApp() ( map[address]claimRow, error, ) { - computed, accepted, err := s.DBConn.SelectClaimPairsPerApp(s.Context) + computed, accepted, err := s.Repository.SelectClaimPairsPerApp(s.Context) if err != nil { s.Logger.Error("selectClaimPairsPerApp:failed", "service", s.Name, @@ -71,7 +71,7 @@ func (s *Service) updateEpochWithSubmittedClaim( claim *claimRow, txHash hash, ) error { - err := s.DBConn.UpdateEpochWithSubmittedClaim(s.Context, claim.EpochID, txHash) + err := s.Repository.UpdateEpochWithSubmittedClaim(s.Context, claim.EpochID, txHash) if err != nil { s.Logger.Error("updateEpochWithSubmittedClaim:failed", "service", s.Name, diff --git a/internal/config/config.go b/internal/config/config.go index aba2abfff..9ad9200fe 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -8,39 +8,8 @@ package config import ( "fmt" "os" - - "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine" ) -// NodeConfig contains all the Node variables. -// See the corresponding environment variable for the variable documentation. -type NodeConfig struct { - LogLevel LogLevel - LogPrettyEnabled bool - BlockchainID uint64 - BlockchainHttpEndpoint Redacted[string] - BlockchainWsEndpoint Redacted[string] - LegacyBlockchainEnabled bool - EvmReaderDefaultBlock DefaultBlock - EvmReaderRetryPolicyMaxRetries uint64 - EvmReaderRetryPolicyMaxDelay Duration - BlockchainBlockTimeout int - ContractsInputBoxAddress string - ContractsInputBoxDeploymentBlockNumber int64 - SnapshotDir string - PostgresEndpoint Redacted[string] - HttpAddress string - HttpPort int - FeatureClaimSubmissionEnabled bool - FeatureMachineHashCheckEnabled bool - Auth Auth - AdvancerPollingInterval Duration - ValidatorPollingInterval Duration - ClaimerPollingInterval Duration - // Temporary - MachineServerVerbosity cartesimachine.ServerVerbosity -} - // Auth is used to sign transactions. type Auth any @@ -70,38 +39,6 @@ func (r Redacted[T]) String() string { return "[REDACTED]" } -// FromEnv loads the config from environment variables. -func FromEnv() NodeConfig { - var config NodeConfig - config.LogLevel = GetLogLevel() - config.LogPrettyEnabled = GetLogPrettyEnabled() - config.BlockchainID = GetBlockchainId() - config.BlockchainHttpEndpoint = Redacted[string]{GetBlockchainHttpEndpoint()} - config.BlockchainWsEndpoint = Redacted[string]{GetBlockchainWsEndpoint()} - config.LegacyBlockchainEnabled = GetLegacyBlockchainEnabled() - config.EvmReaderDefaultBlock = GetEvmReaderDefaultBlock() - config.EvmReaderRetryPolicyMaxRetries = GetEvmReaderRetryPolicyMaxRetries() - config.EvmReaderRetryPolicyMaxDelay = GetEvmReaderRetryPolicyMaxDelay() - config.BlockchainBlockTimeout = GetBlockchainBlockTimeout() - config.ContractsInputBoxAddress = GetContractsInputBoxAddress() - config.ContractsInputBoxDeploymentBlockNumber = GetContractsInputBoxDeploymentBlockNumber() - config.SnapshotDir = GetSnapshotDir() - config.PostgresEndpoint = Redacted[string]{GetPostgresEndpoint()} - config.HttpAddress = GetHttpAddress() - config.HttpPort = GetHttpPort() - config.FeatureClaimSubmissionEnabled = GetFeatureClaimSubmissionEnabled() - config.FeatureMachineHashCheckEnabled = GetFeatureMachineHashCheckEnabled() - if config.FeatureClaimSubmissionEnabled { - config.Auth = AuthFromEnv() - } - config.AdvancerPollingInterval = GetAdvancerPollingInterval() - config.ValidatorPollingInterval = GetValidatorPollingInterval() - config.ClaimerPollingInterval = GetClaimerPollingInterval() - // Temporary. - config.MachineServerVerbosity = cartesimachine.ServerVerbosity(GetMachineServerVerbosity()) - return config -} - func AuthFromEnv() Auth { switch GetAuthKind() { case AuthKindPrivateKeyVar: diff --git a/internal/evmreader/claim.go b/internal/evmreader/claim.go index a82688426..c0d257dcd 100644 --- a/internal/evmreader/claim.go +++ b/internal/evmreader/claim.go @@ -6,7 +6,6 @@ package evmreader import ( "cmp" "context" - "log/slog" . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" @@ -14,13 +13,13 @@ import ( "github.com/ethereum/go-ethereum/common" ) -func (r *EvmReader) checkForClaimStatus( +func (r *Service) checkForClaimStatus( ctx context.Context, apps []application, mostRecentBlockNumber uint64, ) { - slog.Debug("evmreader: Checking for new Claim Acceptance Events") + r.Logger.Debug("Checking for new Claim Acceptance Events") // Classify them by lastClaimCheck block appsIndexedByLastCheck := indexApps(keyByLastClaimCheck, apps) @@ -37,7 +36,7 @@ func (r *EvmReader) checkForClaimStatus( if mostRecentBlockNumber > lastClaimCheck { - slog.Debug("evmreader: Checking claim acceptance for applications", + r.Logger.Debug("Checking claim acceptance for applications", "apps", appAddresses, "last claim check block", lastClaimCheck, "most recent block", mostRecentBlockNumber) @@ -45,14 +44,14 @@ func (r *EvmReader) checkForClaimStatus( r.readAndUpdateClaims(ctx, apps, lastClaimCheck, mostRecentBlockNumber) } else if mostRecentBlockNumber < lastClaimCheck { - slog.Warn( - "evmreader: Not reading claim acceptance: most recent block is lower than the last processed one", //nolint:lll + r.Logger.Warn( + "Not reading claim acceptance: most recent block is lower than the last processed one", //nolint:lll "apps", appAddresses, "last claim check block", lastClaimCheck, "most recent block", mostRecentBlockNumber, ) } else { - slog.Warn("evmreader: Not reading claim acceptance: already checked the most recent blocks", + r.Logger.Warn("Not reading claim acceptance: already checked the most recent blocks", "apps", appAddresses, "last claim check block", lastClaimCheck, "most recent block", mostRecentBlockNumber, @@ -62,7 +61,7 @@ func (r *EvmReader) checkForClaimStatus( } } -func (r *EvmReader) readAndUpdateClaims( +func (r *Service) readAndUpdateClaims( ctx context.Context, apps []application, lastClaimCheck, mostRecentBlockNumber uint64, @@ -90,7 +89,7 @@ func (r *EvmReader) readAndUpdateClaims( appClaimAcceptanceEventMap, err := r.readClaimsAcceptance( ctx, consensusContract, appAddresses, lastClaimCheck+1, mostRecentBlockNumber) if err != nil { - slog.Error("evmreader: Error reading claim acceptance status", + r.Logger.Error("Error reading claim acceptance status", "apps", apps, "IConsensus", iConsensusAddress, "start", lastClaimCheck, @@ -110,14 +109,14 @@ func (r *EvmReader) readAndUpdateClaims( previousEpochs, err := r.repository.GetPreviousEpochsWithOpenClaims( ctx, app, claimAcceptance.LastProcessedBlockNumber.Uint64()) if err != nil { - slog.Error("evmreader: Error retrieving previous submitted claims", + r.Logger.Error("Error retrieving previous submitted claims", "app", app, "block", claimAcceptance.LastProcessedBlockNumber.Uint64(), "error", err) continue APP_LOOP } if len(previousEpochs) > 0 { - slog.Error("evmreader: Application got 'not accepted' claims. It is in an invalid state", + r.Logger.Error("Application got 'not accepted' claims. It is in an invalid state", "claim last block", claimAcceptance.LastProcessedBlockNumber, "app", app) continue APP_LOOP @@ -130,7 +129,7 @@ func (r *EvmReader) readAndUpdateClaims( claimAcceptance.LastProcessedBlockNumber.Uint64()), app) if err != nil { - slog.Error("evmreader: Error retrieving Epoch", + r.Logger.Error("Error retrieving Epoch", "app", app, "block", claimAcceptance.LastProcessedBlockNumber.Uint64(), "error", err) @@ -139,16 +138,16 @@ func (r *EvmReader) readAndUpdateClaims( // Check Epoch if epoch == nil { - slog.Error( - "evmreader: Found claim acceptance event for an unknown epoch. Application is in an invalid state", //nolint:lll + r.Logger.Error( + "Found claim acceptance event for an unknown epoch. Application is in an invalid state", //nolint:lll "app", app, "claim last block", claimAcceptance.LastProcessedBlockNumber, "hash", claimAcceptance.Claim) continue APP_LOOP } if epoch.ClaimHash == nil { - slog.Warn( - "evmreader: Found claim acceptance event, but claim hasn't been calculated yet", + r.Logger.Warn( + "Found claim acceptance event, but claim hasn't been calculated yet", "app", app, "lastBlock", claimAcceptance.LastProcessedBlockNumber, ) @@ -156,7 +155,7 @@ func (r *EvmReader) readAndUpdateClaims( } if claimAcceptance.Claim != *epoch.ClaimHash || claimAcceptance.LastProcessedBlockNumber.Uint64() != epoch.LastBlock { - slog.Error("evmreader: Accepted Claim does not match actual Claim. Application is in an invalid state", //nolint:lll + r.Logger.Error("Accepted Claim does not match actual Claim. Application is in an invalid state", //nolint:lll "app", app, "lastBlock", epoch.LastBlock, "hash", epoch.ClaimHash) @@ -164,7 +163,7 @@ func (r *EvmReader) readAndUpdateClaims( continue APP_LOOP } if epoch.Status == EpochStatusClaimAccepted { - slog.Debug("evmreader: Claim already accepted. Skipping", + r.Logger.Debug("Claim already accepted. Skipping", "app", app, "block", claimAcceptance.LastProcessedBlockNumber.Uint64(), "claimStatus", epoch.Status, @@ -174,7 +173,7 @@ func (r *EvmReader) readAndUpdateClaims( if epoch.Status != EpochStatusClaimSubmitted { // this happens when running on latest. EvmReader can see the event before // the claim is marked as submitted by the claimer. - slog.Debug("evmreader: Claim status is not submitted. Skipping for now", + r.Logger.Debug("Claim status is not submitted. Skipping for now", "app", app, "block", claimAcceptance.LastProcessedBlockNumber.Uint64(), "claimStatus", epoch.Status, @@ -183,7 +182,7 @@ func (r *EvmReader) readAndUpdateClaims( } // Update Epoch claim status - slog.Info("evmreader: Claim Accepted", + r.Logger.Info("Claim Accepted", "app", app, "lastBlock", epoch.LastBlock, "hash", epoch.ClaimHash, @@ -195,7 +194,7 @@ func (r *EvmReader) readAndUpdateClaims( err = r.repository.UpdateEpochs( ctx, app, []*Epoch{epoch}, claimAcceptance.Raw.BlockNumber) if err != nil { - slog.Error("evmreader: Error storing claims", "app", app, "error", err) + r.Logger.Error("Error storing claims", "app", app, "error", err) continue } } @@ -204,7 +203,7 @@ func (r *EvmReader) readAndUpdateClaims( } } -func (r *EvmReader) readClaimsAcceptance( +func (r *Service) readClaimsAcceptance( ctx context.Context, consensusContract ConsensusContract, appAddresses []common.Address, diff --git a/internal/evmreader/claim_test.go b/internal/evmreader/claim_test.go index ef9311e3f..30f1fc668 100644 --- a/internal/evmreader/claim_test.go +++ b/internal/evmreader/claim_test.go @@ -12,6 +12,7 @@ import ( . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" + "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/mock" ) @@ -21,15 +22,17 @@ func (s *EvmReaderSuite) TestNoClaimsAcceptance() { wsClient := FakeWSEhtClient{} //New EVM Reader - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + evmReader := &Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -154,15 +157,17 @@ func (s *EvmReaderSuite) TestReadClaimAcceptance() { //New EVM Reader wsClient := FakeWSEhtClient{} - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Claims Acceptance Events @@ -311,15 +316,17 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Claims Acceptance Events @@ -458,15 +465,17 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Claims Acceptance Events @@ -605,15 +614,17 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Claims Acceptance Events diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index 1c4257b96..7f2b09070 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "log/slog" "math/big" "time" @@ -30,10 +29,8 @@ import ( type CreateInfo struct { service.CreateInfo - model.EvmReaderPersistentConfig - DefaultBlockString string PostgresEndpoint config.Redacted[string] BlockchainHttpEndpoint config.Redacted[string] BlockchainWsEndpoint config.Redacted[string] @@ -44,7 +41,16 @@ type CreateInfo struct { type Service struct { service.Service - reader EvmReader + + client EthClient + wsClient EthWsClient + inputSource InputSource + repository EvmReaderRepository + contractFactory ContractFactory + inputBoxDeploymentBlock uint64 + defaultBlock DefaultBlock + epochLengthCache map[Address]uint64 + hasEnabledApps bool } func (c *CreateInfo) LoadEnv() { @@ -53,6 +59,7 @@ func (c *CreateInfo) LoadEnv() { c.MaxDelay = config.GetEvmReaderRetryPolicyMaxDelay() c.MaxRetries = config.GetEvmReaderRetryPolicyMaxRetries() c.PostgresEndpoint.Value = config.GetPostgresEndpoint() + c.LogLevel = service.LogLevel(config.GetLogLevel()) // persistent c.DefaultBlock = config.GetEvmReaderDefaultBlock() @@ -69,11 +76,6 @@ func Create(c *CreateInfo, s *Service) error { return err } - c.DefaultBlock, err = config.ToDefaultBlockFromString(c.DefaultBlockString) - if err != nil { - return err - } - client, err := ethclient.DialContext(s.Context, c.BlockchainHttpEndpoint.Value) if err != nil { return err @@ -103,15 +105,14 @@ func Create(c *CreateInfo, s *Service) error { contractFactory := NewEvmReaderContractFactory(client, c.MaxRetries, c.MaxDelay) - s.reader = NewEvmReader( - NewEhtClientWithRetryPolicy(client, c.MaxRetries, c.MaxDelay), - NewEthWsClientWithRetryPolicy(wsClient, c.MaxRetries, c.MaxDelay), - NewInputSourceWithRetryPolicy(inputSource, c.MaxRetries, c.MaxDelay), - c.Database, - c.InputBoxDeploymentBlock, - c.DefaultBlock, - contractFactory, - ) + s.client = NewEhtClientWithRetryPolicy(client, c.MaxRetries, c.MaxDelay) + s.wsClient = NewEthWsClientWithRetryPolicy(wsClient, c.MaxRetries, c.MaxDelay) + s.inputSource = NewInputSourceWithRetryPolicy(inputSource, c.MaxRetries, c.MaxDelay) + s.repository = c.Database + s.inputBoxDeploymentBlock = c.InputBoxDeploymentBlock + s.defaultBlock = c.DefaultBlock + s.contractFactory = contractFactory + s.hasEnabledApps = true return nil } @@ -135,10 +136,11 @@ func (s *Service) Tick() []error { return []error{} } -func (s *Service) Start(context context.Context, ready chan<- struct{}) error { - go s.reader.Run(s.Context, ready) - return s.Serve() +func (s *Service) Serve() error { + go s.Run(s.Context) + return s.Service.Serve() } + func (s *Service) String() string { return s.Name } @@ -245,47 +247,8 @@ type application struct { consensusContract ConsensusContract } -// EvmReader reads Input Added, Claim Submitted and -// Output Executed events from the blockchain -type EvmReader struct { - client EthClient - wsClient EthWsClient - inputSource InputSource - repository EvmReaderRepository - contractFactory ContractFactory - inputBoxDeploymentBlock uint64 - defaultBlock DefaultBlock - epochLengthCache map[Address]uint64 - hasEnabledApps bool -} - -func (r *EvmReader) String() string { - return "evmreader" -} - -// Creates a new EvmReader -func NewEvmReader( - client EthClient, - wsClient EthWsClient, - inputSource InputSource, - repository EvmReaderRepository, - inputBoxDeploymentBlock uint64, - defaultBlock DefaultBlock, - contractFactory ContractFactory, -) EvmReader { - return EvmReader{ - client: client, - wsClient: wsClient, - inputSource: inputSource, - repository: repository, - inputBoxDeploymentBlock: inputBoxDeploymentBlock, - defaultBlock: defaultBlock, - contractFactory: contractFactory, - hasEnabledApps: true, - } -} - -func (r *EvmReader) Run(ctx context.Context, ready chan<- struct{}) error { +func (r *Service) Run(ctx context.Context) error { + ready := make(chan struct{}, 1) // Initialize epochLength cache r.epochLengthCache = make(map[Address]uint64) @@ -297,20 +260,20 @@ func (r *EvmReader) Run(ctx context.Context, ready chan<- struct{}) error { if _, ok := err.(*SubscriptionError); !ok { return err } - slog.Error(err.Error()) - slog.Info("evmreader: Restarting subscription") + r.Logger.Error(err.Error()) + r.Logger.Info("Restarting subscription") } } // watchForNewBlocks watches for new blocks and reads new inputs based on the // default block configuration, which have not been processed yet. -func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{}) error { +func (r *Service) watchForNewBlocks(ctx context.Context, ready chan<- struct{}) error { headers := make(chan *types.Header) sub, err := r.wsClient.SubscribeNewHead(ctx, headers) if err != nil { return fmt.Errorf("could not start subscription: %v", err) } - slog.Info("evmreader: Subscribed to new block events") + r.Logger.Info("Subscribed to new block events") ready <- struct{}{} defer sub.Unsubscribe() @@ -323,13 +286,13 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} case header := <-headers: // Every time a new block arrives - slog.Debug("evmreader: New block header received", "blockNumber", header.Number, "blockHash", header.Hash()) + r.Logger.Debug("New block header received", "blockNumber", header.Number, "blockHash", header.Hash()) - slog.Debug("evmreader: Retrieving enabled applications") + r.Logger.Debug("Retrieving enabled applications") // Get All Applications runningApps, err := r.repository.GetAllRunningApplications(ctx) if err != nil { - slog.Error("evmreader: Error retrieving running applications", + r.Logger.Error("Error retrieving running applications", "error", err, ) @@ -338,13 +301,13 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} if len(runningApps) == 0 { if r.hasEnabledApps { - slog.Info("evmreader: No registered applications enabled") + r.Logger.Info("No registered applications enabled") } r.hasEnabledApps = false continue } if !r.hasEnabledApps { - slog.Info("evmreader: Found enabled applications") + r.Logger.Info("Found enabled applications") } r.hasEnabledApps = true @@ -353,7 +316,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} for _, app := range runningApps { applicationContract, consensusContract, err := r.getAppContracts(app) if err != nil { - slog.Error("evmreader: Error retrieving application contracts", "app", app, "error", err) + r.Logger.Error("Error retrieving application contracts", "app", app, "error", err) continue } apps = append(apps, application{Application: app, @@ -362,7 +325,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} } if len(apps) == 0 { - slog.Info("evmreader: No correctly configured applications running") + r.Logger.Info("No correctly configured applications running") continue } @@ -373,14 +336,14 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} r.defaultBlock, ) if err != nil { - slog.Error("evmreader: Error fetching most recent block", + r.Logger.Error("Error fetching most recent block", "default block", r.defaultBlock, "error", err) continue } blockNumber = mostRecentHeader.Number.Uint64() - slog.Debug(fmt.Sprintf("evmreader: Using block %d and not %d because of commitment policy: %s", + r.Logger.Debug(fmt.Sprintf("Using block %d and not %d because of commitment policy: %s", mostRecentHeader.Number.Uint64(), header.Number.Uint64(), r.defaultBlock)) } @@ -396,7 +359,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} // fetchMostRecentHeader fetches the most recent header up till the // given default block -func (r *EvmReader) fetchMostRecentHeader( +func (r *Service) fetchMostRecentHeader( ctx context.Context, defaultBlock DefaultBlock, ) (*types.Header, error) { @@ -431,7 +394,7 @@ func (r *EvmReader) fetchMostRecentHeader( // getAppContracts retrieves the ApplicationContract and ConsensusContract for a given Application. // Also validates if IConsensus configuration matches the blockchain registered one -func (r *EvmReader) getAppContracts(app Application, +func (r *Service) getAppContracts(app Application, ) (ApplicationContract, ConsensusContract, error) { applicationContract, err := r.contractFactory.NewApplication(app.ContractAddress) if err != nil { diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go index 7a4a80753..b12dc0c22 100644 --- a/internal/evmreader/evmreader_test.go +++ b/internal/evmreader/evmreader_test.go @@ -17,6 +17,7 @@ import ( appcontract "github.com/cartesi/rollups-node/pkg/contracts/iapplication" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" + "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -73,7 +74,7 @@ type EvmReaderSuite struct { wsClient *MockEthClient inputBox *MockInputBox repository *MockRepository - evmReader *EvmReader + evmReader Service contractFactory *MockEvmReaderContractFactory } @@ -108,22 +109,24 @@ func (s *EvmReaderSuite) TearDownSuite() { } func (s *EvmReaderSuite) SetupTest() { - s.client = newMockEthClient() s.wsClient = s.client s.inputBox = newMockInputBox() s.repository = newMockRepository() s.contractFactory = newEmvReaderContractFactory() - inputReader := NewEvmReader( - s.client, - s.wsClient, - s.inputBox, - s.repository, - 0, - DefaultBlockStatusLatest, - s.contractFactory, - ) - s.evmReader = &inputReader + s.evmReader = Service{ + client: s.client, + wsClient: s.wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{ + Name: "evm-reader", + }, &s.evmReader.Service) } // Service tests @@ -170,27 +173,25 @@ func (s *EvmReaderSuite) TestItFailsToSubscribeForNewInputsOnStart() { } func (s *EvmReaderSuite) TestItWrongIConsensus() { - consensusContract := &MockIConsensusContract{} - contractFactory := newEmvReaderContractFactory() - contractFactory.Unset("NewIConsensus") contractFactory.On("NewIConsensus", mock.Anything, ).Return(consensusContract, nil) wsClient := FakeWSEhtClient{} - - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare consensus claimEvent0 := &iconsensus.IConsensusClaimAcceptance{ diff --git a/internal/evmreader/input.go b/internal/evmreader/input.go index 9f169aa03..1a7bdda46 100644 --- a/internal/evmreader/input.go +++ b/internal/evmreader/input.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "log/slog" . "github.com/cartesi/rollups-node/internal/model" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -15,13 +14,13 @@ import ( ) // checkForNewInputs checks if is there new Inputs for all running Applications -func (r *EvmReader) checkForNewInputs( +func (r *Service) checkForNewInputs( ctx context.Context, apps []application, mostRecentBlockNumber uint64, ) { - slog.Debug("evmreader: Checking for new inputs") + r.Logger.Debug("Checking for new inputs") groupedApps := indexApps(byLastProcessedBlock, apps) @@ -37,7 +36,7 @@ func (r *EvmReader) checkForNewInputs( if mostRecentBlockNumber > lastProcessedBlock { - slog.Debug("evmreader: Checking inputs for applications", + r.Logger.Debug("Checking inputs for applications", "apps", appAddresses, "last processed block", lastProcessedBlock, "most recent block", mostRecentBlockNumber, @@ -49,7 +48,7 @@ func (r *EvmReader) checkForNewInputs( apps, ) if err != nil { - slog.Error("Error reading inputs", + r.Logger.Error("Error reading inputs", "apps", appAddresses, "last processed block", lastProcessedBlock, "most recent block", mostRecentBlockNumber, @@ -58,14 +57,14 @@ func (r *EvmReader) checkForNewInputs( continue } } else if mostRecentBlockNumber < lastProcessedBlock { - slog.Warn( - "evmreader: Not reading inputs: most recent block is lower than the last processed one", + r.Logger.Warn( + "Not reading inputs: most recent block is lower than the last processed one", "apps", appAddresses, "last processed block", lastProcessedBlock, "most recent block", mostRecentBlockNumber, ) } else { - slog.Info("evmreader: Not reading inputs: already checked the most recent blocks", + r.Logger.Info("Not reading inputs: already checked the most recent blocks", "apps", appAddresses, "last processed block", lastProcessedBlock, "most recent block", mostRecentBlockNumber, @@ -76,7 +75,7 @@ func (r *EvmReader) checkForNewInputs( // readAndStoreInputs reads, inputs from the InputSource given specific filter options, indexes // them into epochs and store the indexed inputs and epochs -func (r *EvmReader) readAndStoreInputs( +func (r *Service) readAndStoreInputs( ctx context.Context, startBlock uint64, endBlock uint64, @@ -89,7 +88,7 @@ func (r *EvmReader) readAndStoreInputs( // Get App EpochLength err := r.addAppEpochLengthIntoCache(app) if err != nil { - slog.Error("evmreader: Error adding epoch length into cache", + r.Logger.Error("Error adding epoch length into cache", "app", app.ContractAddress, "error", err) continue @@ -100,7 +99,7 @@ func (r *EvmReader) readAndStoreInputs( } if len(appsToProcess) == 0 { - slog.Warn("evmreader: No valid running applications") + r.Logger.Warn("No valid running applications") return nil } @@ -122,7 +121,7 @@ func (r *EvmReader) readAndStoreInputs( currentEpoch, err := r.repository.GetEpoch(ctx, calculateEpochIndex(epochLength, startBlock), address) if err != nil { - slog.Error("evmreader: Error retrieving existing current epoch", + r.Logger.Error("Error retrieving existing current epoch", "app", address, "error", err, ) @@ -131,7 +130,7 @@ func (r *EvmReader) readAndStoreInputs( // Check current epoch status if currentEpoch != nil && currentEpoch.Status != EpochStatusOpen { - slog.Error("evmreader: Current epoch is not open", + r.Logger.Error("Current epoch is not open", "app", address, "epoch_index", currentEpoch.Index, "status", currentEpoch.Status, @@ -150,7 +149,7 @@ func (r *EvmReader) readAndStoreInputs( // If input belongs into a new epoch, close the previous known one if currentEpoch != nil && currentEpoch.Index != inputEpochIndex { currentEpoch.Status = EpochStatusClosed - slog.Info("evmreader: Closing epoch", + r.Logger.Info("Closing epoch", "app", currentEpoch.AppAddress, "epoch_index", currentEpoch.Index, "start", currentEpoch.FirstBlock, @@ -168,7 +167,7 @@ func (r *EvmReader) readAndStoreInputs( epochInputMap[currentEpoch] = []Input{} } - slog.Info("evmreader: Found new Input", + r.Logger.Info("Found new Input", "app", address, "index", input.Index, "block", input.BlockNumber, @@ -185,7 +184,7 @@ func (r *EvmReader) readAndStoreInputs( // Indexed all inputs. Check if it is time to close this epoch if currentEpoch != nil && endBlock >= currentEpoch.LastBlock { currentEpoch.Status = EpochStatusClosed - slog.Info("evmreader: Closing epoch", + r.Logger.Info("Closing epoch", "app", currentEpoch.AppAddress, "epoch_index", currentEpoch.Index, "start", currentEpoch.FirstBlock, @@ -204,7 +203,7 @@ func (r *EvmReader) readAndStoreInputs( address, ) if err != nil { - slog.Error("evmreader: Error storing inputs and epochs", + r.Logger.Error("Error storing inputs and epochs", "app", address, "error", err, ) @@ -214,7 +213,7 @@ func (r *EvmReader) readAndStoreInputs( // Store everything if len(epochInputMap) > 0 { - slog.Debug("evmreader: Inputs and epochs stored successfully", + r.Logger.Debug("Inputs and epochs stored successfully", "app", address, "start-block", startBlock, "end-block", endBlock, @@ -222,7 +221,7 @@ func (r *EvmReader) readAndStoreInputs( "total inputs", len(inputs), ) } else { - slog.Debug("evmreader: No inputs or epochs to store") + r.Logger.Debug("No inputs or epochs to store") } } @@ -232,7 +231,7 @@ func (r *EvmReader) readAndStoreInputs( // addAppEpochLengthIntoCache checks the epoch length cache and read epoch length from IConsensus // contract and add it to the cache if needed -func (r *EvmReader) addAppEpochLengthIntoCache(app application) error { +func (r *Service) addAppEpochLengthIntoCache(app application) error { epochLength, ok := r.epochLengthCache[app.ContractAddress] if !ok { @@ -245,11 +244,11 @@ func (r *EvmReader) addAppEpochLengthIntoCache(app application) error { err) } r.epochLengthCache[app.ContractAddress] = epochLength - slog.Info("evmreader: Got epoch length from IConsensus", + r.Logger.Info("Got epoch length from IConsensus", "app", app.ContractAddress, "epoch length", epochLength) } else { - slog.Debug("evmreader: Got epoch length from cache", + r.Logger.Debug("Got epoch length from cache", "app", app.ContractAddress, "epoch length", epochLength) } @@ -258,7 +257,7 @@ func (r *EvmReader) addAppEpochLengthIntoCache(app application) error { } // readInputsFromBlockchain read the inputs from the blockchain ordered by Input index -func (r *EvmReader) readInputsFromBlockchain( +func (r *Service) readInputsFromBlockchain( ctx context.Context, appsAddresses []Address, startBlock, endBlock uint64, @@ -282,7 +281,7 @@ func (r *EvmReader) readInputsFromBlockchain( // Order inputs as order is not enforced by RetrieveInputs method nor the APIs for _, event := range inputsEvents { - slog.Debug("evmreader: Received input", + r.Logger.Debug("Received input", "app", event.AppContract, "index", event.Index, "block", event.Raw.BlockNumber) diff --git a/internal/evmreader/input_test.go b/internal/evmreader/input_test.go index 918a21f65..1b6c9ccad 100644 --- a/internal/evmreader/input_test.go +++ b/internal/evmreader/input_test.go @@ -8,6 +8,7 @@ import ( . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" + "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/mock" @@ -17,15 +18,17 @@ func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocks() { wsClient := FakeWSEhtClient{} - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -125,15 +128,17 @@ func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { wsClient := FakeWSEhtClient{} - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -233,15 +238,17 @@ func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { wsClient := FakeWSEhtClient{} - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + inputReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &inputReader.Service) // Prepare Client s.client.Unset("HeaderByNumber") @@ -327,15 +334,17 @@ func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { func (s *EvmReaderSuite) TestItStartsWhenLasProcessedBlockIsTheMostRecentBlock() { wsClient := FakeWSEhtClient{} - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + inputReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &inputReader.Service) // Prepare Client s.client.Unset("HeaderByNumber") diff --git a/internal/evmreader/output.go b/internal/evmreader/output.go index 5723a26c2..334717a22 100644 --- a/internal/evmreader/output.go +++ b/internal/evmreader/output.go @@ -6,13 +6,12 @@ package evmreader import ( "bytes" "context" - "log/slog" . "github.com/cartesi/rollups-node/internal/model" "github.com/ethereum/go-ethereum/accounts/abi/bind" ) -func (r *EvmReader) checkForOutputExecution( +func (r *Service) checkForOutputExecution( ctx context.Context, apps []application, mostRecentBlockNumber uint64, @@ -20,7 +19,7 @@ func (r *EvmReader) checkForOutputExecution( appAddresses := appsToAddresses(apps) - slog.Debug("evmreader: Checking for new Output Executed Events", "apps", appAddresses) + r.Logger.Debug("Checking for new Output Executed Events", "apps", appAddresses) for _, app := range apps { @@ -34,7 +33,7 @@ func (r *EvmReader) checkForOutputExecution( if mostRecentBlockNumber > LastOutputCheck { - slog.Debug("evmreader: Checking output execution for application", + r.Logger.Debug("Checking output execution for application", "app", app.ContractAddress, "last output check block", LastOutputCheck, "most recent block", mostRecentBlockNumber) @@ -42,14 +41,14 @@ func (r *EvmReader) checkForOutputExecution( r.readAndUpdateOutputs(ctx, app, LastOutputCheck, mostRecentBlockNumber) } else if mostRecentBlockNumber < LastOutputCheck { - slog.Warn( - "evmreader: Not reading output execution: most recent block is lower than the last processed one", //nolint:lll + r.Logger.Warn( + "Not reading output execution: most recent block is lower than the last processed one", //nolint:lll "app", app.ContractAddress, "last output check block", LastOutputCheck, "most recent block", mostRecentBlockNumber, ) } else { - slog.Warn("evmreader: Not reading output execution: already checked the most recent blocks", + r.Logger.Warn("Not reading output execution: already checked the most recent blocks", "app", app.ContractAddress, "last output check block", LastOutputCheck, "most recent block", mostRecentBlockNumber, @@ -59,7 +58,7 @@ func (r *EvmReader) checkForOutputExecution( } -func (r *EvmReader) readAndUpdateOutputs( +func (r *Service) readAndUpdateOutputs( ctx context.Context, app application, lastOutputCheck, mostRecentBlockNumber uint64) { contract := app.applicationContract @@ -71,7 +70,7 @@ func (r *EvmReader) readAndUpdateOutputs( outputExecutedEvents, err := contract.RetrieveOutputExecutionEvents(opts) if err != nil { - slog.Error("evmreader: Error reading output events", "app", app.ContractAddress, "error", err) + r.Logger.Error("Error reading output events", "app", app.ContractAddress, "error", err) return } @@ -82,30 +81,30 @@ func (r *EvmReader) readAndUpdateOutputs( // Compare output to check it is the correct one output, err := r.repository.GetOutput(ctx, app.ContractAddress, event.OutputIndex) if err != nil { - slog.Error("evmreader: Error retrieving output", + r.Logger.Error("Error retrieving output", "app", app.ContractAddress, "index", event.OutputIndex, "error", err) return } if output == nil { - slog.Warn("evmreader: Found OutputExecuted event but output does not exist in the database yet", + r.Logger.Warn("Found OutputExecuted event but output does not exist in the database yet", "app", app.ContractAddress, "index", event.OutputIndex) return } if !bytes.Equal(output.RawData, event.Output) { - slog.Debug("evmreader: Output mismatch", + r.Logger.Debug("Output mismatch", "app", app.ContractAddress, "index", event.OutputIndex, "actual", output.RawData, "event's", event.Output) - slog.Error("evmreader: Output mismatch. Application is in an invalid state", + r.Logger.Error("Output mismatch. Application is in an invalid state", "app", app.ContractAddress, "index", event.OutputIndex) return } - slog.Info("evmreader: Output executed", "app", app.ContractAddress, "index", event.OutputIndex) + r.Logger.Info("Output executed", "app", app.ContractAddress, "index", event.OutputIndex) output.TransactionHash = &event.Raw.TxHash executedOutputs = append(executedOutputs, output) } @@ -113,7 +112,7 @@ func (r *EvmReader) readAndUpdateOutputs( err = r.repository.UpdateOutputExecutionTransaction( ctx, app.ContractAddress, executedOutputs, mostRecentBlockNumber) if err != nil { - slog.Error("evmreader: Error storing output execution statuses", "app", app, "error", err) + r.Logger.Error("Error storing output execution statuses", "app", app, "error", err) } } diff --git a/internal/evmreader/output_test.go b/internal/evmreader/output_test.go index c05a6c585..5c8b2083c 100644 --- a/internal/evmreader/output_test.go +++ b/internal/evmreader/output_test.go @@ -11,6 +11,7 @@ import ( . "github.com/cartesi/rollups-node/internal/model" appcontract "github.com/cartesi/rollups-node/pkg/contracts/iapplication" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" + "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/mock" @@ -21,15 +22,17 @@ func (s *EvmReaderSuite) TestOutputExecution() { wsClient := FakeWSEhtClient{} //New EVM Reader - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -151,15 +154,17 @@ func (s *EvmReaderSuite) TestReadOutputExecution() { //New EVM Reader wsClient := FakeWSEhtClient{} - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Output Executed Events outputExecution0 := &appcontract.IApplicationOutputExecuted{ @@ -284,15 +289,17 @@ func (s *EvmReaderSuite) TestCheckOutputFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) applicationContract.On("RetrieveOutputExecutionEvents", mock.Anything, @@ -396,15 +403,17 @@ func (s *EvmReaderSuite) TestCheckOutputFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Output Executed Events outputExecution0 := &appcontract.IApplicationOutputExecuted{ @@ -513,15 +522,17 @@ func (s *EvmReaderSuite) TestCheckOutputFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Output Executed Events outputExecution0 := &appcontract.IApplicationOutputExecuted{ diff --git a/internal/node/chainid.go b/internal/node/chainid.go deleted file mode 100644 index 12e2a46c0..000000000 --- a/internal/node/chainid.go +++ /dev/null @@ -1,45 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package node - -import ( - "context" - "fmt" - "time" - - "github.com/ethereum/go-ethereum/ethclient" -) - -const defaultTimeout = 45 * time.Second - -// Checks if the chain id from the configuration matches the chain id reported -// by the Ethereum node. If they don't, it returns an error. -func validateChainId(ctx context.Context, chainId uint64, ethereumNodeAddr string) error { - remoteChainId, err := getChainId(ctx, ethereumNodeAddr) - if err != nil { - return err - } else if chainId != remoteChainId { - return fmt.Errorf( - "chainId mismatch; expected %v but Ethereum node returned %v", - chainId, - remoteChainId, - ) - } - return nil -} - -func getChainId(ctx context.Context, ethereumNodeAddr string) (uint64, error) { - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - - client, err := ethclient.Dial(ethereumNodeAddr) - if err != nil { - return 0, fmt.Errorf("create RPC client: %w", err) - } - chainId, err := client.ChainID(ctx) - if err != nil { - return 0, fmt.Errorf("get chain id: %w", err) - } - return chainId.Uint64(), nil -} diff --git a/internal/node/chainid_test.go b/internal/node/chainid_test.go deleted file mode 100644 index 9e4fb9d0b..000000000 --- a/internal/node/chainid_test.go +++ /dev/null @@ -1,48 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package node - -import ( - "context" - "fmt" - "net/http" - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/suite" -) - -type ValidateChainIdSuite struct { - suite.Suite -} - -func TestValidateChainId(t *testing.T) { - suite.Run(t, new(ValidateChainIdSuite)) -} - -func (s *ValidateChainIdSuite) TestItFailsIfChainIdsDoNotMatch() { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, `{"jsonrpc":"2.0","id":67,"result":"0x7a69"}`) - })) - defer ts.Close() - localChainId := uint64(11111) - - err := validateChainId(context.Background(), localChainId, ts.URL) - - s.NotNil(err) -} - -func (s *ValidateChainIdSuite) TestItReturnsNilIfChainIdsMatch() { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, `{"jsonrpc":"2.0","id":67,"result":"0x7a69"}`) - })) - defer ts.Close() - localChainId := uint64(31337) - - err := validateChainId(context.Background(), localChainId, ts.URL) - - s.Nil(err) -} diff --git a/internal/node/handlers.go b/internal/node/handlers.go deleted file mode 100644 index 7e197b70b..000000000 --- a/internal/node/handlers.go +++ /dev/null @@ -1,14 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package node - -import ( - "log/slog" - "net/http" -) - -func healthcheckHandler(w http.ResponseWriter, r *http.Request) { - slog.Debug("Node received a healthcheck request") - w.WriteHeader(http.StatusOK) -} diff --git a/internal/node/node.go b/internal/node/node.go index d5f41c8ad..c4c1fd377 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -4,25 +4,271 @@ package node import ( - "context" + "fmt" + "log/slog" + "net/http" + "time" + "github.com/cartesi/rollups-node/pkg/service" + + "github.com/cartesi/rollups-node/internal/advancer" + "github.com/cartesi/rollups-node/internal/claimer" "github.com/cartesi/rollups-node/internal/config" + "github.com/cartesi/rollups-node/internal/evmreader" + "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/internal/repository" - "github.com/cartesi/rollups-node/internal/services" + "github.com/cartesi/rollups-node/internal/validator" + + "github.com/ethereum/go-ethereum/ethclient" ) -// Setup creates the Node top-level supervisor. -func Setup( - ctx context.Context, - c config.NodeConfig, +type CreateInfo struct { + service.CreateInfo + + BlockchainHttpEndpoint config.Redacted[string] + BlockchainID uint64 + HTTPEndpoint config.Redacted[string] + PostgresEndpoint config.Redacted[string] + EnableClaimSubmission bool +} + +type Service struct { + service.Service + + Children []service.IService + Client *ethclient.Client + Repository *repository.Database +} + +func (c *CreateInfo) LoadEnv() { + c.BlockchainHttpEndpoint = config.Redacted[string]{config.GetBlockchainHttpEndpoint()} + c.BlockchainID = config.GetBlockchainId() + c.EnableClaimSubmission = config.GetFeatureClaimSubmissionEnabled() + c.PostgresEndpoint = config.Redacted[string]{config.GetPostgresEndpoint()} + + httpAddress := config.GetHttpAddress() + httpPort := config.GetHttpPort() + c.HTTPEndpoint.Value = fmt.Sprintf("%v:%v", httpAddress, httpPort) +} + +func Create(c *CreateInfo, s *Service) error { + var err error + + err = service.Create(&c.CreateInfo, &s.Service) + if err != nil { + return err + } + + err = withTimeout(1 * time.Second, func() error { + // database connection + s.Repository, err = repository.Connect(s.Context, c.PostgresEndpoint.Value) + if err != nil { + return err + } + + // blockchain connection + chainID check + s.Client, err = ethclient.Dial(c.BlockchainHttpEndpoint.Value) + if err != nil { + return err + } + chainID, err := s.Client.ChainID(s.Context) + if err != nil { + return err + } + if c.BlockchainID != chainID.Uint64() { + return fmt.Errorf( + "chainId mismatch; got: %v, expected: %v", + chainID, + c.BlockchainID, + ) + } + return nil + }) + + if err != nil { + s.Logger.Error(fmt.Sprint(err)) + return err + } + return createServices(c, s) +} + +func createServices(c *CreateInfo, s *Service) error { + ch := make(chan string) + limit := time.Duration(1 * time.Second) + deadline := time.After(limit) + + go func() { + child := newEVMReader(s.Logger, s.Repository) + s.Children = append(s.Children, child) + ch <- child.String() + }() + + go func() { + child := newAdvancer(s.Logger, s.Repository, s.ServeMux) + s.Children = append(s.Children, child) + ch <- child.String() + }() + + go func() { + child := newValidator(s.Logger, s.Repository) + s.Children = append(s.Children, child) + ch <- child.String() + }() + + go func() { + child := newClaimer(s.Logger, s.Repository, c.EnableClaimSubmission) + s.Children = append(s.Children, child) + ch <- child.String() + }() + + for range s.Children { + select { + case <-ch: + case <-deadline: + s.Logger.Error("Failed to create services. Time limit exceded", + "limit", limit) + return fmt.Errorf("Failed to create services. Time limit exceded") + } + } + return nil +} + +func (me *Service) Alive() bool { + allAlive := true + for _, s := range me.Children { + allAlive = allAlive && s.Alive() + } + return allAlive +} + +func (me *Service) Ready() bool { + allReady := true + for _, s := range me.Children { + allReady = allReady && s.Ready() + } + return allReady +} + +func (s *Service) Reload() []error { return nil } +func (s *Service) Tick() []error { return nil } +func (s *Service) Stop(bool) []error { return nil } + +// services creation + +func withTimeout(limit time.Duration, fn func() error) error { + ch := make(chan error) + deadline := time.After(limit) + go func() { + ch <- fn() + }() + + select { + case err := <-ch: + return err + case <-deadline: + return fmt.Errorf("Time limit exceded") + } +} + +func newEVMReader( + logger *slog.Logger, database *repository.Database, -) (services.Service, error) { - // checks - err := validateChainId(ctx, c.BlockchainID, c.BlockchainHttpEndpoint.Value) +) service.IService { + s := evmreader.Service{} + c := evmreader.CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "evm-reader", + Impl: &s, + ProcOwner: true, // TODO: Remove this after updating supervisor + }, + EvmReaderPersistentConfig: model.EvmReaderPersistentConfig{ + DefaultBlock: model.DefaultBlockStatusSafe, + }, + Database: database, + } + c.LoadEnv() + + err := evmreader.Create(&c, &s) if err != nil { - return nil, err + logger.Error("Fatal", + "error", err) + panic(err) + } + return &s +} + +func newAdvancer( + logger *slog.Logger, + database *repository.Database, + serveMux *http.ServeMux, +) service.IService { + s := advancer.Service{} + c := advancer.CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "advancer", + Impl: &s, + ProcOwner: true, + ServeMux: serveMux, + }, + Repository: database, } + c.LoadEnv() - // create service - return newSupervisorService(c, database), nil + err := advancer.Create(&c, &s) + if err != nil { + logger.Error("Fatal", + "error", err) + panic(err) + } + return &s +} + +func newValidator( + logger *slog.Logger, + database *repository.Database, +) service.IService { + s := validator.Service{} + c := validator.CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "validator", + Impl: &s, + ProcOwner: true, + }, + Repository: database, + } + c.LoadEnv() + + err := validator.Create(c, &s) + if err != nil { + slog.Error("Fatal", + "error", err) + panic(err) + } + return &s +} + +func newClaimer( + logger *slog.Logger, + database *repository.Database, + enableSubmissionOverride bool, +) service.IService { + s := claimer.Service{} + c := claimer.CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "claimer", + Impl: &s, + ProcOwner: true, + }, + Repository: database, + } + c.LoadEnv() + c.EnableSubmission = enableSubmissionOverride // cmdline overrides env + + err := claimer.Create(&c, &s) + if err != nil { + logger.Error("Fatal", + "error", err) + panic(err) + } + return &s } diff --git a/internal/node/services.go b/internal/node/services.go deleted file mode 100644 index 1ab25480d..000000000 --- a/internal/node/services.go +++ /dev/null @@ -1,153 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package node - -import ( - "fmt" - "net/http" - - advancerservice "github.com/cartesi/rollups-node/internal/advancer" - claimerservice "github.com/cartesi/rollups-node/internal/claimer" - "github.com/cartesi/rollups-node/internal/config" - readerservice "github.com/cartesi/rollups-node/internal/evmreader" - "github.com/cartesi/rollups-node/internal/repository" - "github.com/cartesi/rollups-node/internal/services" - validatorservice "github.com/cartesi/rollups-node/internal/validator" - "github.com/cartesi/rollups-node/pkg/service" -) - -// We use an enum to define the ports of each service and avoid conflicts. -type portOffset = int - -const ( - portOffsetProxy = iota -) - -// Get the port of the given service. -func getPort(c config.NodeConfig, offset portOffset) int { - return c.HttpPort + int(offset) -} - -func newSupervisorService( - c config.NodeConfig, - database *repository.Database, -) *services.SupervisorService { - var s []services.Service - - serveMux := http.NewServeMux() - serveMux.Handle("/healthz", http.HandlerFunc(healthcheckHandler)) - - s = append(s, newClaimerService(c, database)) - s = append(s, newEvmReaderService(c, database)) - s = append(s, newAdvancerService(c, database, serveMux)) - s = append(s, newValidatorService(c, database)) - s = append(s, newHttpService(c, serveMux)) - - supervisor := services.SupervisorService{ - Name: "rollups-node", - Services: s, - } - return &supervisor -} - -func newHttpService(c config.NodeConfig, serveMux *http.ServeMux) services.Service { - addr := fmt.Sprintf("%v:%v", c.HttpAddress, getPort(c, portOffsetProxy)) - return &services.HttpService{ - Name: "http", - Address: addr, - Handler: serveMux, - } -} - -func newEvmReaderService(c config.NodeConfig, database *repository.Database) services.Service { - readerService := readerservice.Service{} - createInfo := readerservice.CreateInfo{ - CreateInfo: service.CreateInfo{ - Name: "reader", - Impl: &readerService, - ProcOwner: true, // TODO: Remove this after updating supervisor - LogLevel: service.LogLevel(c.LogLevel), - }, - Database: database, - } - - err := readerservice.Create(&createInfo, &readerService) - if err != nil { - readerService.Logger.Error("Fatal", - "service", readerService.Name, - "error", err) - } - return &readerService -} - -func newAdvancerService(c config.NodeConfig, database *repository.Database, serveMux *http.ServeMux) services.Service { - advancerService := advancerservice.Service{} - createInfo := advancerservice.CreateInfo{ - CreateInfo: service.CreateInfo{ - Name: "advancer", - PollInterval: c.AdvancerPollingInterval, - Impl: &advancerService, - ProcOwner: true, // TODO: Remove this after updating supervisor - LogLevel: service.LogLevel(c.LogLevel), - ServeMux: serveMux, - }, - Repository: database, - } - - err := advancerservice.Create(&createInfo, &advancerService) - if err != nil { - advancerService.Logger.Error("Fatal", - "service", advancerService.Name, - "error", err) - } - return &advancerService -} - -func newValidatorService(c config.NodeConfig, database *repository.Database) services.Service { - validatorService := validatorservice.Service{} - createInfo := validatorservice.CreateInfo{ - CreateInfo: service.CreateInfo{ - Name: "validator", - PollInterval: c.ValidatorPollingInterval, - Impl: &validatorService, - ProcOwner: true, // TODO: Remove this after updating supervisor - LogLevel: service.LogLevel(c.LogLevel), - }, - Repository: database, - } - - err := validatorservice.Create(createInfo, &validatorService) - if err != nil { - validatorService.Logger.Error("Fatal", - "service", validatorService.Name, - "error", err) - } - return &validatorService -} - -func newClaimerService(c config.NodeConfig, database *repository.Database) services.Service { - claimerService := claimerservice.Service{} - createInfo := claimerservice.CreateInfo{ - Auth: c.Auth, - DBConn: database, - PostgresEndpoint: c.PostgresEndpoint, - BlockchainHttpEndpoint: c.BlockchainHttpEndpoint, - EnableSubmission: c.FeatureClaimSubmissionEnabled, - CreateInfo: service.CreateInfo{ - Name: "claimer", - PollInterval: c.ClaimerPollingInterval, - Impl: &claimerService, - ProcOwner: true, // TODO: Remove this after updating supervisor - LogLevel: service.LogLevel(c.LogLevel), - }, - } - - err := claimerservice.Create(createInfo, &claimerService) - if err != nil { - claimerService.Logger.Error("Fatal", - "service", claimerService.Name, - "error", err) - } - return &claimerService -} diff --git a/internal/services/poller/poller.go b/internal/services/poller/poller.go deleted file mode 100644 index 174f63fb1..000000000 --- a/internal/services/poller/poller.go +++ /dev/null @@ -1,53 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package poller - -import ( - "context" - "errors" - "fmt" - "log/slog" - "time" -) - -type Service interface { - Step(context.Context) error -} - -type Poller struct { - name string - service Service - ticker *time.Ticker -} - -var ErrInvalidPollingInterval = errors.New("polling interval must be greater than zero") - -func New(name string, service Service, pollingInterval time.Duration) (*Poller, error) { - if pollingInterval <= 0 { - return nil, ErrInvalidPollingInterval - } - ticker := time.NewTicker(pollingInterval) - return &Poller{name: name, service: service, ticker: ticker}, nil -} - -func (poller *Poller) Start(ctx context.Context) error { - slog.Debug(fmt.Sprintf("%s: poller started", poller.name)) - - for { - // Runs the service's inner routine. - err := poller.service.Step(ctx) - if err != nil { - return err - } - - // Waits for the polling interval to elapse (or for the context to be canceled). - select { - case <-poller.ticker.C: - continue - case <-ctx.Done(): - poller.ticker.Stop() - return nil - } - } -} diff --git a/internal/services/service.go b/internal/services/service.go deleted file mode 100644 index caf9083eb..000000000 --- a/internal/services/service.go +++ /dev/null @@ -1,18 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -// Package services provides mechanisms to start multiple services in the -// background -package services - -import ( - "context" - "fmt" -) - -type Service interface { - fmt.Stringer - - // Starts a service and sends a message to the channel when ready - Start(ctx context.Context, ready chan<- struct{}) error -} diff --git a/internal/validator/validator.go b/internal/validator/validator.go index 613db042d..6f513d461 100644 --- a/internal/validator/validator.go +++ b/internal/validator/validator.go @@ -8,7 +8,6 @@ package validator import ( "context" "fmt" - "log/slog" "time" "github.com/cartesi/rollups-node/internal/config" @@ -139,7 +138,7 @@ func (v *Service) Run(ctx context.Context) error { // validateApplication calculates, validates and stores the claim and/or proofs // for each processed epoch of the application. func (v *Service) validateApplication(ctx context.Context, app Application) error { - slog.Debug("validator: starting validation", "application", app.ContractAddress) + v.Logger.Debug("starting validation", "application", app.ContractAddress) processedEpochs, err := v.repository.GetProcessedEpochs(ctx, app.ContractAddress) if err != nil { return fmt.Errorf( @@ -149,12 +148,12 @@ func (v *Service) validateApplication(ctx context.Context, app Application) erro } for _, epoch := range processedEpochs { - slog.Debug("validator: started calculating claim", + v.Logger.Debug("started calculating claim", "app", app.ContractAddress, "epoch_index", epoch.Index, ) claim, outputs, err := v.createClaimAndProofs(ctx, epoch) - slog.Info("validator: claim calculated", + v.Logger.Info("claim calculated", "app", app.ContractAddress, "epoch_index", epoch.Index, ) @@ -209,7 +208,7 @@ func (v *Service) validateApplication(ctx context.Context, app Application) erro } if len(processedEpochs) == 0 { - slog.Debug("validator: no processed epochs to validate", + v.Logger.Debug("no processed epochs to validate", "app", app.ContractAddress, ) } diff --git a/internal/validator/validator_test.go b/internal/validator/validator_test.go index db58109e1..33353b730 100644 --- a/internal/validator/validator_test.go +++ b/internal/validator/validator_test.go @@ -10,6 +10,7 @@ import ( "github.com/cartesi/rollups-node/internal/merkle" . "github.com/cartesi/rollups-node/internal/model" + "github.com/cartesi/rollups-node/pkg/service" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -33,6 +34,7 @@ func (s *ValidatorSuite) SetupSubTest() { validator = &Service{ repository: repo, } + s.Require().Nil(service.Create(&service.CreateInfo{}, &validator.Service)) dummyEpochs = []Epoch{ {Index: 0, FirstBlock: 0, LastBlock: 9}, {Index: 1, FirstBlock: 10, LastBlock: 19}, diff --git a/pkg/service/service.go b/pkg/service/service.go index afb1db1c0..8544b05fe 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -63,6 +63,8 @@ import ( "sync/atomic" "syscall" "time" + + "github.com/lmittmann/tint" ) var ( @@ -77,6 +79,16 @@ type ServiceImpl interface { Stop(bool) []error } +type IService interface { + Alive() bool + Ready() bool + Reload() []error + Tick() []error + Stop(bool) []error + Serve() error + String() string +} + // CreateInfo stores initialization data for the Create function type CreateInfo struct { Name string @@ -110,61 +122,49 @@ type Service struct { // Create a service by: // - using values from s if non zero, -// - using values from ci, +// - using values from c, // - using default values when applicable -func Create(ci *CreateInfo, s *Service) error { - if ci == nil { - return ErrInvalid - } - - if s == nil { +func Create(c *CreateInfo, s *Service) error { + if c == nil || c.Impl == nil || c.Impl == s || s == nil { return ErrInvalid } - // running s.Running.Store(false) - - // name - if s.Name == "" { - s.Name = ci.Name - } - - // impl - if s.Impl == nil { - s.Impl = ci.Impl - } + s.Name = c.Name + s.Impl = c.Impl // log if s.Logger == nil { - // opts := &tint.Options{ - // Level: LogLevel, - // AddSource: LogLevel == slog.LevelDebug, - // // RFC3339 with milliseconds and without timezone - // TimeFormat: "2006-01-02T15:04:05.000", - // } - // handler := tint.NewHandler(os.Stdout, opts) - // s.Logger = slog.New(handler) - s.Logger = slog.Default() + opts := &tint.Options{ + Level: slog.Level(c.LogLevel), + AddSource: slog.Level(c.LogLevel) == slog.LevelDebug, + // RFC3339 with milliseconds and without timezone + TimeFormat: "2006-01-02T15:04:05.000", + } + handler := tint.NewHandler(os.Stdout, opts) + s.Logger = slog.New(handler) + //s.Logger = slog.Default() + s.Logger = s.Logger.With("service", s.Name) } // context and cancelation if s.Context == nil { - if ci.Context == nil { - ci.Context = context.Background() + if c.Context == nil { + c.Context = context.Background() } - s.Context = ci.Context + s.Context = c.Context } if s.Cancel == nil { - s.Context, s.Cancel = context.WithCancel(ci.Context) + s.Context, s.Cancel = context.WithCancel(c.Context) } - if ci.ProcOwner { + if c.ProcOwner { // ticker if s.Ticker == nil { - if ci.PollInterval == 0 { - ci.PollInterval = 1000 * time.Millisecond + if c.PollInterval == 0 { + c.PollInterval = 1000 * time.Millisecond } - s.PollInterval = ci.PollInterval + s.PollInterval = c.PollInterval s.Ticker = time.NewTicker(s.PollInterval) } @@ -180,38 +180,35 @@ func Create(ci *CreateInfo, s *Service) error { } // telemetry - if ci.TelemetryCreate { + if c.TelemetryCreate { if s.ServeMux == nil { - if ci.ServeMux == nil { - if !ci.ProcOwner { + if c.ServeMux == nil { + if !c.ProcOwner { s.Logger.Warn("Create:Created a new ServeMux", - "service", s.Name, - "ProcOwner", ci.ProcOwner, - "LogLevel", ci.LogLevel) + "ProcOwner", c.ProcOwner, + "LogLevel", c.LogLevel) } - ci.ServeMux = http.NewServeMux() + c.ServeMux = http.NewServeMux() } - s.ServeMux = ci.ServeMux + s.ServeMux = c.ServeMux } - if ci.TelemetryAddress == "" { - ci.TelemetryAddress = ":8080" + if c.TelemetryAddress == "" { + c.TelemetryAddress = ":8080" } s.HTTPServer, s.HTTPServerFunc = s.CreateDefaultTelemetry( - ci.TelemetryAddress, 3, 5*time.Second, s.ServeMux) + c.TelemetryAddress, 3, 5*time.Second, s.ServeMux) go s.HTTPServerFunc() } // ProcOwner will be ready on the call to Serve - if ci.ProcOwner { + if c.ProcOwner { s.Logger.Info("Create", - "service", s.Name, - "LogLevel", ci.LogLevel, + "LogLevel", c.LogLevel, "pid", os.Getpid()) } else { s.Running.Store(true) s.Logger.Info("Create", - "service", s.Name, - "LogLevel", ci.LogLevel) + "LogLevel", c.LogLevel) } return nil } @@ -231,12 +228,10 @@ func (s *Service) Reload() []error { if len(errs) > 0 { s.Logger.Error("Reload", - "service", s.Name, "duration", elapsed, "error", errs) } else { s.Logger.Info("Reload", - "service", s.Name, "duration", elapsed) } return errs @@ -249,12 +244,10 @@ func (s *Service) Tick() []error { if len(errs) > 0 { s.Logger.Error("Tick", - "service", s.Name, "duration", elapsed, "error", errs) } else { s.Logger.Debug("Tick", - "service", s.Name, "duration", elapsed) } return errs @@ -271,13 +264,11 @@ func (s *Service) Stop(force bool) []error { s.Running.Store(false) if len(errs) > 0 { s.Logger.Error("Stop", - "service", s.Name, "force", force, "duration", elapsed, "error", errs) } else { s.Logger.Info("Stop", - "service", s.Name, "force", force, "duration", elapsed) } @@ -302,6 +293,10 @@ func (s *Service) Serve() error { return nil } +func (s *Service) String() string { + return s.Name +} + // Telemetry func (s *Service) CreateDefaultHandlers(prefix string) { s.ServeMux.Handle(prefix+"/readyz", http.HandlerFunc(s.ReadyHandler)) @@ -327,7 +322,6 @@ func (s *Service) CreateDefaultTelemetry( return nil default: s.Logger.Error("http", - "service", s.Name, "error", err, "try", retry+1, "maxRetries", maxRetries,