Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: batch processing in BTC Subscription Poller #31

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/staking-expiry-checker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
defer cancel()

// Create DB client
dbClient, err := db.New(ctx, cfg.Db)
dbClient, err := db.New(ctx, &cfg.Db)
if err != nil {
log.Fatal().Err(err).Msg("error while creating db client")
}
Expand Down
9 changes: 5 additions & 4 deletions config/config-docker.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
pollers:
log-level: debug
expiry-checker:
interval: 5s
timeout: 10s
interval: 10s
timeout: 100s
btc-subscriber:
interval: 5s
timeout: 10s
interval: 10s
timeout: 100s
db:
username: root
password: example
address: "mongodb://localhost:27017"
db-name: staking-api-service
max-pagination-limit: 1000
btc:
rpchost: 127.0.0.1:38332
rpcuser: rpcuser
Expand Down
9 changes: 5 additions & 4 deletions config/config-local.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
pollers:
log-level: debug
expiry-checker:
interval: 5s
timeout: 1000s
interval: 10s
timeout: 100s
btc-subscriber:
interval: 5s
timeout: 1000s
interval: 10s
timeout: 100s
db:
username: root
password: example
address: "mongodb://localhost:27017"
db-name: staking-api-service
max-pagination-limit: 1000
btc:
rpchost: 127.0.0.1:38332
rpcuser: rpcuser
Expand Down
13 changes: 9 additions & 4 deletions internal/config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
)

type DbConfig struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
DbName string `mapstructure:"db-name"`
Address string `mapstructure:"address"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
DbName string `mapstructure:"db-name"`
Address string `mapstructure:"address"`
MaxPaginationLimit int64 `mapstructure:"max-pagination-limit"`
}

func (cfg *DbConfig) Validate() error {
Expand Down Expand Up @@ -57,5 +58,9 @@ func (cfg *DbConfig) Validate() error {
return fmt.Errorf("port number must be between 1024 and 65535 (inclusive)")
}

if cfg.MaxPaginationLimit < 2 {
return fmt.Errorf("max pagination limit must be greater than 1")
}

return nil
}
61 changes: 60 additions & 1 deletion internal/db/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package db
import (
"context"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

Expand All @@ -12,9 +13,15 @@ import (
type Database struct {
dbName string
client *mongo.Client
cfg *config.DbConfig
}

func New(ctx context.Context, cfg config.DbConfig) (*Database, error) {
type DbResultMap[T any] struct {
Data []T `json:"data"`
PaginationToken string `json:"paginationToken"`
}

func New(ctx context.Context, cfg *config.DbConfig) (*Database, error) {
credential := options.Credential{
Username: cfg.Username,
Password: cfg.Password,
Expand All @@ -28,6 +35,7 @@ func New(ctx context.Context, cfg config.DbConfig) (*Database, error) {
return &Database{
dbName: cfg.DbName,
client: client,
cfg: cfg,
}, nil
}

Expand All @@ -42,3 +50,54 @@ func (db *Database) Ping(ctx context.Context) error {
func (db *Database) Shutdown(ctx context.Context) error {
return db.client.Disconnect(ctx)
}

/*
Builds the result map with a pagination token.
If the result length exceeds the maximum limit, it returns the map with a token.
Otherwise, it returns the map with an empty token. Note that the pagination
limit is the maximum number of results to return.
For example, if the limit is 10, it fetches 11 but returns only 10.
The last result is used to generate the pagination token.
*/
func toResultMapWithPaginationToken[T any](paginationLimit int64, result []T, paginationKeyBuilder func(T) (string, error)) (*DbResultMap[T], error) {
if len(result) > int(paginationLimit) {
result = result[:paginationLimit]
paginationToken, err := paginationKeyBuilder(result[len(result)-1])
if err != nil {
return nil, err
}
return &DbResultMap[T]{
Data: result,
PaginationToken: paginationToken,
}, nil
}

return &DbResultMap[T]{
Data: result,
PaginationToken: "",
}, nil
}

// Finds documents in the collection with pagination in returned results.
func findWithPagination[T any](
ctx context.Context, client *mongo.Collection, filter bson.M,
options *options.FindOptions, limit int64,
paginationKeyBuilder func(T) (string, error),
) (*DbResultMap[T], error) {
// Always fetch one more than the limit to check if there are more results
// this is used to generate the pagination token
options.SetLimit(limit + 1)

cursor, err := client.Find(ctx, filter, options)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)

var result []T
if err = cursor.All(ctx, &result); err != nil {
return nil, err
}

return toResultMapWithPaginationToken(limit, result, paginationKeyBuilder)
}
42 changes: 27 additions & 15 deletions internal/db/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,30 +107,42 @@ func (db *Database) GetBTCDelegationByStakingTxHash(
func (db *Database) GetBTCDelegationsByStates(
ctx context.Context,
states []types.DelegationState,
) ([]*model.DelegationDocument, error) {
// Convert states to a slice of strings
paginationToken string,
) (*DbResultMap[model.DelegationDocument], error) {
// Convert states to strings
stateStrings := make([]string, len(states))
for i, state := range states {
stateStrings[i] = state.ToString()
}

filter := bson.M{"state": bson.M{"$in": stateStrings}}
opts := options.Find().SetLimit(200) // to prevent large result sets

cursor, err := db.client.Database(db.dbName).
Collection(model.DelegationsCollection).
Find(ctx, filter, opts)
if err != nil {
return nil, err
// Build filter
filter := bson.M{
"state": bson.M{"$in": stateStrings},
}
defer cursor.Close(ctx)

var delegations []*model.DelegationDocument
if err := cursor.All(ctx, &delegations); err != nil {
return nil, err
// Setup options
options := options.Find()
options.SetSort(bson.M{"_id": 1})

// Decode pagination token if it exists
if paginationToken != "" {
decodedToken, err := model.DecodePaginationToken[model.DelegationScanPagination](paginationToken)
if err != nil {
return nil, &InvalidPaginationTokenError{
Message: "Invalid pagination token",
}
}
filter["_id"] = bson.M{"$gt": decodedToken.StakingTxHashHex}
}

return delegations, nil
return findWithPagination(
ctx,
db.client.Database(db.dbName).Collection(model.DelegationsCollection),
filter,
options,
db.cfg.MaxPaginationLimit,
model.BuildDelegationScanPaginationToken,
)
}

func (db *Database) GetBTCDelegationState(
Expand Down
6 changes: 5 additions & 1 deletion internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type DbInterface interface {
GetBTCDelegationByStakingTxHash(
ctx context.Context, stakingTxHash string,
) (*model.DelegationDocument, error)
GetBTCDelegationsByStates(ctx context.Context, states []types.DelegationState) ([]*model.DelegationDocument, error)
GetBTCDelegationsByStates(
ctx context.Context,
states []types.DelegationState,
paginationToken string,
) (*DbResultMap[model.DelegationDocument], error)
GetBTCDelegationState(ctx context.Context, stakingTxHash string) (*types.DelegationState, error)
}
15 changes: 15 additions & 0 deletions internal/db/model/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,18 @@ type DelegationDocument struct {
StakingTx *TimelockTransaction `bson:"staking_tx"` // Always exist
UnbondingTx *TimelockTransaction `bson:"unbonding_tx,omitempty"`
}

type DelegationScanPagination struct {
StakingTxHashHex string `json:"staking_tx_hash_hex"`
}

func BuildDelegationScanPaginationToken(d DelegationDocument) (string, error) {
page := &DelegationScanPagination{
StakingTxHashHex: d.StakingTxHashHex,
}
token, err := GetPaginationToken(page)
if err != nil {
return "", err
}
return token, nil
}
27 changes: 27 additions & 0 deletions internal/db/model/pagination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package model

import (
"encoding/base64"
"encoding/json"
)

func DecodePaginationToken[T any](token string) (*T, error) {
tokenBytes, err := base64.URLEncoding.DecodeString(token)
if err != nil {
return nil, err
}
var d T
err = json.Unmarshal(tokenBytes, &d)
if err != nil {
return nil, err
}
return &d, nil
}

func GetPaginationToken[PaginationType any](d PaginationType) (string, error) {
tokenBytes, err := json.Marshal(d)
if err != nil {
return "", err
}
return base64.URLEncoding.EncodeToString(tokenBytes), nil
}
16 changes: 12 additions & 4 deletions internal/observability/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (O Outcome) String() string {
var (
once sync.Once
metricsRouter *chi.Mux
pollDurationHistogram *prometheus.HistogramVec
pollerDurationHistogram *prometheus.HistogramVec
btcClientDurationHistogram *prometheus.HistogramVec
invalidTransactionsCounter *prometheus.CounterVec
failedVerifyingUnbondingTxsCounter prometheus.Counter
Expand Down Expand Up @@ -74,13 +74,13 @@ func initMetricsRouter(metricsPort int) {
// registerMetrics initializes and register the Prometheus metrics.
func registerMetrics() {
defaultHistogramBucketsSeconds := []float64{0.1, 0.5, 1, 2.5, 5, 10, 30}
pollDurationHistogram = prometheus.NewHistogramVec(
pollerDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "poll_duration_seconds",
Help: "Histogram of poll durations in seconds.",
Buckets: defaultHistogramBucketsSeconds,
},
[]string{"status"},
[]string{"poller_name", "status"},
)

btcClientDurationHistogram = prometheus.NewHistogramVec(
Expand Down Expand Up @@ -124,7 +124,7 @@ func registerMetrics() {
)

prometheus.MustRegister(
pollDurationHistogram,
pollerDurationHistogram,
btcClientDurationHistogram,
invalidTransactionsCounter,
failedVerifyingUnbondingTxsCounter,
Expand Down Expand Up @@ -179,3 +179,11 @@ func IncrementFailedVerifyingStakingWithdrawalTxCounter() {
func IncrementFailedVerifyingUnbondingWithdrawalTxCounter() {
failedVerifyingUnbondingWithdrawalTxsCounter.Inc()
}

func ObservePollerDuration(pollerName string, duration time.Duration, err error) {
status := "success"
if err != nil {
status = "failure"
}
pollerDurationHistogram.WithLabelValues(pollerName, status).Observe(duration.Seconds())
}
Loading
Loading