diff --git a/pkg/beacon/beacon.go b/pkg/beacon/beacon.go index 545195c..1ccab71 100644 --- a/pkg/beacon/beacon.go +++ b/pkg/beacon/beacon.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" eth2client "github.com/attestantio/go-eth2-client" @@ -142,6 +143,8 @@ type Node interface { OnHealthCheckSucceeded(ctx context.Context, handler func(ctx context.Context, event *HealthCheckSucceededEvent) error) // OnFinalityCheckpointUpdated is called when a the head finality checkpoint is updated. OnFinalityCheckpointUpdated(ctx context.Context, handler func(ctx context.Context, event *FinalityCheckpointUpdated) error) + // OnFirstTimeHealthy is called when the node is healthy for the first time. + OnFirstTimeHealthy(ctx context.Context, handler func(ctx context.Context, event *FirstTimeHealthyEvent) error) } // Node represents an Ethereum beacon node. It computes values based on the spec. @@ -175,6 +178,11 @@ type node struct { metrics *Metrics + Ready bool + + hasEmittedFirstTimeHealthy bool + firstHealthyMutex sync.Mutex + crons *gocron.Scheduler } @@ -189,6 +197,8 @@ func NewNode(log logrus.FieldLogger, config *Config, namespace string, options O broker: emission.NewEmitter(), stat: NewStatus(options.HealthCheck.SuccessfulResponses, options.HealthCheck.FailedResponses), + + firstHealthyMutex: sync.Mutex{}, } if options.PrometheusMetrics { @@ -203,6 +213,8 @@ func NewNode(log logrus.FieldLogger, config *Config, namespace string, options O } func (n *node) Start(ctx context.Context) error { + n.log.Info("Starting beacon...") + ctx, cancel := context.WithCancel(ctx) n.ctx = ctx n.cancel = cancel @@ -263,6 +275,8 @@ func (n *node) Start(ctx context.Context) error { s.StartAsync() + n.log.Info("Beacon started!") + return nil } @@ -351,11 +365,13 @@ func (n *node) bootstrap(ctx context.Context) error { return err } - n.publishReady(ctx) - //nolint:errcheck // we dont care if this errors out since it runs indefinitely in a goroutine go n.ensureBeaconSubscription(ctx) + n.Ready = true + + go n.publishReady(ctx) + return nil } @@ -428,12 +444,19 @@ func (n *node) runHealthcheck(ctx context.Context) { n.stat.Health().RecordSuccess() + n.firstHealthyMutex.Lock() + defer n.firstHealthyMutex.Unlock() + + if !n.hasEmittedFirstTimeHealthy { + n.hasEmittedFirstTimeHealthy = true + + go n.publishFirstTimeHealthy(ctx) + } + n.publishHealthCheckSucceeded(ctx, time.Since(start)) } func (n *node) initializeState(ctx context.Context) error { - n.log.Info("Initializing beacon state") - spec, err := n.FetchSpec(ctx) if err != nil { return err @@ -446,8 +469,6 @@ func (n *node) initializeState(ctx context.Context) error { n.wallclock = ethwallclock.NewEthereumBeaconChain(genesis.GenesisTime, spec.SecondsPerSlot.AsDuration(), uint64(spec.SlotsPerEpoch)) - n.log.Info("Beacon state initialized! Ready to serve requests...") - return nil } diff --git a/pkg/beacon/event.go b/pkg/beacon/event.go index 2a42808..a813538 100644 --- a/pkg/beacon/event.go +++ b/pkg/beacon/event.go @@ -37,6 +37,7 @@ const ( topicHealthCheckSucceeded = "health_check_suceeded" topicHealthCheckFailed = "health_check_failed" topicFinalityCheckpointUpdated = "finality_checkpoint_updated" + topicFirstTimeHealthy = "first_time_healthy" // Official beacon events that are proxied topicAttestation = "attestation" @@ -92,3 +93,7 @@ type HealthCheckFailedEvent struct { type FinalityCheckpointUpdated struct { Finality *v1.Finality } + +// FirstTimeHealthyEvent is emitted when the node is first considered healthy. +type FirstTimeHealthyEvent struct { +} diff --git a/pkg/beacon/options.go b/pkg/beacon/options.go index b4cde81..c6e3414 100644 --- a/pkg/beacon/options.go +++ b/pkg/beacon/options.go @@ -92,6 +92,7 @@ func DefaultEnabledBeaconSubscriptionOptions() BeaconSubscriptionOptions { topicHead, topicVoluntaryExit, topicContributionAndProof, + topicBlobSidecar, }, } } diff --git a/pkg/beacon/publisher.go b/pkg/beacon/publisher.go index cbbecc6..041cb5f 100644 --- a/pkg/beacon/publisher.go +++ b/pkg/beacon/publisher.go @@ -100,3 +100,7 @@ func (n *node) publishFinalityCheckpointUpdated(ctx context.Context, finality *v Finality: finality, }) } + +func (n *node) publishFirstTimeHealthy(ctx context.Context) { + n.broker.Emit(topicFirstTimeHealthy, &FirstTimeHealthyEvent{}) +} diff --git a/pkg/beacon/subscriber.go b/pkg/beacon/subscriber.go index 93701d1..5147a4d 100644 --- a/pkg/beacon/subscriber.go +++ b/pkg/beacon/subscriber.go @@ -123,3 +123,9 @@ func (n *node) OnFinalityCheckpointUpdated(ctx context.Context, handler func(ctx n.handleSubscriberError(handler(ctx, event), topicFinalityCheckpointUpdated) }) } + +func (n *node) OnFirstTimeHealthy(ctx context.Context, handler func(ctx context.Context, event *FirstTimeHealthyEvent) error) { + n.broker.On(topicFirstTimeHealthy, func(event *FirstTimeHealthyEvent) { + n.handleSubscriberError(handler(ctx, event), topicFirstTimeHealthy) + }) +} diff --git a/pkg/beacon/subscriptions.go b/pkg/beacon/subscriptions.go index 01cf415..746a2a2 100644 --- a/pkg/beacon/subscriptions.go +++ b/pkg/beacon/subscriptions.go @@ -47,19 +47,7 @@ func (n *node) subscribeToBeaconEvents(ctx context.Context) error { return errors.New("client does not implement eth2client.Subscriptions") } - topics := []string{} - - for key, supported := range v1.SupportedEventTopics { - if !supported { - continue - } - - if !n.options.BeaconSubscription.Topics.Exists(key) { - continue - } - - topics = append(topics, key) - } + topics := n.options.BeaconSubscription.Topics n.log.WithField("topics", topics).Info("Subscribing to events upstream")