Skip to content

Commit

Permalink
resub to missed notifs
Browse files Browse the repository at this point in the history
  • Loading branch information
gusin13 committed Dec 2, 2024
1 parent c6c14a3 commit e6fb430
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 2 deletions.
35 changes: 35 additions & 0 deletions internal/db/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,38 @@ func (db *Database) SaveBTCDelegationUnbondingSlashingTxHex(

return nil
}

func (db *Database) GetBTCDelegationsByStates(
ctx context.Context,
states []types.DelegationState,
) ([]*model.BTCDelegationDetails, error) {
// Convert states to a slice of strings
stateStrings := make([]string, len(states))
for i, state := range states {
stateStrings[i] = state.String()
}

filter := bson.M{"state": bson.M{"$in": stateStrings}}

cursor, err := db.client.Database(db.dbName).
Collection(model.BTCDelegationDetailsCollection).
Find(ctx, filter)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)

var delegations []*model.BTCDelegationDetails
if err := cursor.All(ctx, &delegations); err != nil {
return nil, err
}

if len(delegations) == 0 {
return nil, &NotFoundError{
Key: "specified states",
Message: "No BTC delegations found for the specified states",
}
}

return delegations, nil
}
7 changes: 7 additions & 0 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,11 @@ type DbInterface interface {
* @return An error if the operation failed
*/
SaveBTCDelegationUnbondingSlashingTxHex(ctx context.Context, stakingTxHashHex string, unbondingSlashingTxHex string) error
/**
* GetBTCDelegationsByStates retrieves the BTC delegations by the states.
* @param ctx The context
* @param states The states
* @return The BTC delegations or an error
*/
GetBTCDelegationsByStates(ctx context.Context, states []types.DelegationState) ([]*model.BTCDelegationDetails, error)
}
2 changes: 2 additions & 0 deletions internal/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (s *Service) StartIndexerSync(ctx context.Context) {

// Sync global parameters
s.SyncGlobalParams(ctx)
// Resubscribe to missed BTC notifications
s.ResubscribeToMissedBtcNotifications(ctx)
// Start the expiry checker
s.StartExpiryChecker(ctx)
// Start the websocket event subscription process
Expand Down
23 changes: 21 additions & 2 deletions internal/services/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package services
import (
"context"

"github.com/cometbft/cometbft/types"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
ctypes "github.com/cometbft/cometbft/types"
"github.com/rs/zerolog/log"
)

Expand All @@ -26,7 +27,7 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
for {
select {
case event := <-eventChan:
newBlockEvent, ok := event.Data.(types.EventDataNewBlock)
newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
if !ok {
log.Fatal().Msg("Event is not a NewBlock event")
}
Expand All @@ -49,3 +50,21 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
}
}()
}

// Resubscribe to missed BTC notifications
func (s *Service) ResubscribeToMissedBtcNotifications(ctx context.Context) {
go func() {
defer s.wg.Done()
delegations, err := s.db.GetBTCDelegationsByStates(ctx, []types.DelegationState{types.StateUnbonding, types.StateSlashed})
if err != nil {
log.Fatal().Msgf("Failed to get BTC delegations: %v", err)
}

for _, delegation := range delegations {
// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
log.Fatal().Msgf("Failed to register spend notification: %v", err)
}
}
}()
}
30 changes: 30 additions & 0 deletions tests/mocks/mock_db_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit e6fb430

Please sign in to comment.