
Commit

Update tests
DylanTinianov committed Feb 21, 2025
1 parent 8dcca1f commit 2448a1e
Showing 2 changed files with 47 additions and 76 deletions.
48 changes: 22 additions & 26 deletions pkg/solana/logpoller/observability.go
@@ -47,50 +47,46 @@ var (
Name: "log_poller_query_duration",
Help: "Measures duration of Log Poller's queries fetching logs",
Buckets: sqlLatencyBuckets,
}, []string{"solanaChainID", "query", "type"})
}, []string{"chainID", "query", "type"})
lpQueryDataSets = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "log_poller_query_dataset_size",
Help: "Measures size of the datasets returned by Log Poller's queries",
}, []string{"solanaChainID", "query", "type"})
}, []string{"chainID", "query", "type"})
lpLogsInserted = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "log_poller_logs_inserted",
Help: "Counter to track number of logs inserted by Log Poller",
}, []string{"solanaChainID"})
lpBlockInserted = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "log_poller_blocks_inserted",
Help: "Counter to track number of blocks inserted by Log Poller",
}, []string{"solanaChainID"})
}, []string{"chainID"})
)

// ObservedORM is a decorator layer for the ORM used by LogPoller, responsible for pushing Prometheus metrics reporting the duration and result-set size of its queries.
// It doesn't change internal logic; all calls are delegated to the underlying ORM
type ObservedORM struct {
ORM
queryDuration *prometheus.HistogramVec
datasetSize *prometheus.GaugeVec
logsInserted *prometheus.CounterVec
blocksInserted *prometheus.CounterVec
chainId string
queryDuration *prometheus.HistogramVec
datasetSize *prometheus.GaugeVec
logsInserted *prometheus.CounterVec
chainId string
}

var _ ORM = &ObservedORM{}

// NewObservedORM creates an observed version of log poller's ORM created by NewORM
// Please see ObservedLogPoller for more details on how latencies are measured
func NewObservedORM(chainID string, ds sqlutil.DataSource, lggr logger.Logger) *ObservedORM {
return &ObservedORM{
ORM: NewORM(chainID, ds, lggr),
queryDuration: lpQueryDuration,
datasetSize: lpQueryDataSets,
logsInserted: lpLogsInserted,
blocksInserted: lpBlockInserted,
chainId: chainID,
ORM: NewORM(chainID, ds, lggr),
queryDuration: lpQueryDuration,
datasetSize: lpQueryDataSets,
logsInserted: lpLogsInserted,
chainId: chainID,
}
}

func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error {
err := withObservedExec(o, "InsertLogs", create, func() error {
return o.ORM.InsertLogs(ctx, logs)
})
trackInsertedLogsAndBlock(o, logs, nil, err)
trackInsertedLogs(o, logs, err)
return err
}

@@ -136,6 +132,12 @@ func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expressio
})
}

func (o *ObservedORM) GetLatestBlock(ctx context.Context) (int64, error) {
return withObservedQuery(o, "GetLatestBlack", func() (int64, error) {
return o.ORM.GetLatestBlock(ctx)
})
}

func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) {
results, err := withObservedQuery(o, queryName, query)
if err == nil {
@@ -182,17 +184,11 @@ func withObservedExec(o *ObservedORM, query string, queryType queryType, exec fu
return exec()
}

func trackInsertedLogsAndBlock(o *ObservedORM, logs []Log, block *any, err error) {
func trackInsertedLogs(o *ObservedORM, logs []Log, err error) {
if err != nil {
return
}
o.logsInserted.
WithLabelValues(o.chainId).
Add(float64(len(logs)))

if block != nil {
o.blocksInserted.
WithLabelValues(o.chainId).
Inc()
}
}
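
For orientation, the helpers referenced in this diff (withObservedQuery, withObservedQueryAndResults, withObservedExec) follow a standard wrap-and-measure pattern: time the delegated call, record the duration under the chainID/query/type labels, and, for successful reads, record the result-set size. Below is a minimal sketch of that pattern, assuming a queryType string type with read and create constants and the time import; the helper bodies elided from this hunk may differ in detail.

// Sketch only: assumes read/create are queryType constants and that
// o.queryDuration is the HistogramVec wired in by NewObservedORM above.
func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T, error)) (T, error) {
	queryStarted := time.Now()
	defer func() {
		// Duration is recorded even when the query returns an error.
		o.queryDuration.
			WithLabelValues(o.chainId, queryName, string(read)).
			Observe(float64(time.Since(queryStarted)))
	}()
	return query()
}

func withObservedExec(o *ObservedORM, query string, queryType queryType, exec func() error) error {
	queryStarted := time.Now()
	defer func() {
		o.queryDuration.
			WithLabelValues(o.chainId, query, string(queryType)).
			Observe(float64(time.Since(queryStarted)))
	}()
	return exec()
}

This is why TestShouldPublishDurationInCaseOfError in the test file below can assert that queryDuration grows even when FilteredLogs fails: the deferred observation fires regardless of the error.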
75 changes: 25 additions & 50 deletions pkg/solana/logpoller/observability_test.go
@@ -1,8 +1,8 @@
package logpoller

import (
"context"
"fmt"
"github.com/google/uuid"
"testing"

"github.com/prometheus/client_golang/prometheus"
@@ -17,48 +17,22 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

func TestMultipleMetricsArePublished(t *testing.T) {
ctx := tests.Context(t)
chainID := uuid.NewString()
orm := createObservedORM(t, chainID)
t.Cleanup(func() { resetMetrics(*orm) })
require.Equal(t, 0, testutil.CollectAndCount(orm.queryDuration))

filter := newRandomFilter(t)
filterID, err := orm.InsertFilter(ctx, filter)
require.NoError(t, err)

filter = newRandomFilter(t)
_, err = orm.InsertFilter(ctx, filter)
require.NoError(t, err)

for i := 0; i < 20; i++ {
log := newRandomLog(t, filterID, chainID, "My Event")
err = orm.InsertLogs(ctx, []Log{log})
require.NoError(t, err)
}
_, _ = orm.SelectSeqNums(ctx)

require.Equal(t, 3, testutil.CollectAndCount(orm.queryDuration))
require.Equal(t, 21, testutil.CollectAndCount(orm.datasetSize))
}

func TestShouldPublishDurationInCaseOfError(t *testing.T) {
ctx := tests.Context(t)
orm := createObservedORM(t, "testChainID")
t.Cleanup(func() { resetMetrics(*orm) })
require.Equal(t, 0, testutil.CollectAndCount(orm.queryDuration))

// TODO: how can I make this test useful?
_, err := orm.FilteredLogs(ctx, []query.Expression{}, query.LimitAndSort{}, "")
// Cancel ctx to force error
ctx, cancel := context.WithCancel(ctx)
cancel()
_, err := orm.FilteredLogs(ctx, nil, query.LimitAndSort{}, "")
require.Error(t, err)

require.Equal(t, 1, testutil.CollectAndCount(orm.queryDuration))
require.Equal(t, 1, counterFromHistogramByLabels(t, orm.queryDuration, "testChainID", "FilteredLogs", "read"))
}

func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) {
orm := createObservedORM(t, "420")
orm := createObservedORM(t, chainID)
t.Cleanup(func() { resetMetrics(*orm) })
expectedCount := 9
expectedSize := 2
@@ -68,52 +42,54 @@ func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) {
require.NoError(t, err)
}

require.Equal(t, expectedCount, counterFromHistogramByLabels(t, orm.queryDuration, "420", "query", "read"))
require.Equal(t, expectedSize, counterFromGaugeByLabels(orm.datasetSize, "420", "query", "read"))
require.Equal(t, expectedCount, counterFromHistogramByLabels(t, orm.queryDuration, chainID, "query", "read"))
require.Equal(t, expectedSize, counterFromGaugeByLabels(orm.datasetSize, chainID, "query", "read"))

require.Equal(t, 0, counterFromHistogramByLabels(t, orm.queryDuration, "420", "other_query", "read"))
require.Equal(t, 0, counterFromHistogramByLabels(t, orm.queryDuration, chainID, "other_query", "read"))
require.Equal(t, 0, counterFromHistogramByLabels(t, orm.queryDuration, "5", "query", "read"))

require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "other_query", "read"))
require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, chainID, "other_query", "read"))
require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "5", "query", "read"))
}

func TestNotPublishingDatasetSizeInCaseOfError(t *testing.T) {
orm := createObservedORM(t, "420")
orm := createObservedORM(t, chainID)

_, err := withObservedQueryAndResults(orm, "errorQuery", func() ([]string, error) { return nil, fmt.Errorf("error") })
require.Error(t, err)

require.Equal(t, 1, counterFromHistogramByLabels(t, orm.queryDuration, "420", "errorQuery", "read"))
require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "errorQuery", "read"))
require.Equal(t, 1, counterFromHistogramByLabels(t, orm.queryDuration, chainID, "errorQuery", "read"))
require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, chainID, "errorQuery", "read"))
}

func TestMetricsAreProperlyPopulatedForWrites(t *testing.T) {
orm := createObservedORM(t, "420")
orm := createObservedORM(t, chainID)
require.NoError(t, withObservedExec(orm, "execQuery", create, func() error { return nil }))
require.Error(t, withObservedExec(orm, "execQuery", create, func() error { return fmt.Errorf("error") }))

require.Equal(t, 2, counterFromHistogramByLabels(t, orm.queryDuration, "420", "execQuery", "create"))
require.Equal(t, 2, counterFromHistogramByLabels(t, orm.queryDuration, chainID, "execQuery", "create"))
}

func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
ctx := tests.Context(t)
orm := createObservedORM(t, "420")
logs := generateRandomLogs(t, 100, 20)
orm := createObservedORM(t, chainID)

filter := newRandomFilter(t)
filterID, err := orm.InsertFilter(tests.Context(t), filter)
require.NoError(t, err)

logs := generateRandomLogs(t, filterID, 20)

// First insert 10 logs
require.NoError(t, orm.InsertLogs(ctx, logs[:10]))
assert.Equal(t, float64(10), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
assert.Equal(t, float64(10), testutil.ToFloat64(orm.logsInserted.WithLabelValues(chainID)))

// Insert 5 more logs
require.NoError(t, orm.InsertLogs(ctx, logs[10:15]))
assert.Equal(t, float64(15), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
assert.Equal(t, float64(1), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420")))
assert.Equal(t, float64(15), testutil.ToFloat64(orm.logsInserted.WithLabelValues(chainID)))

// Insert 5 more logs
require.NoError(t, orm.InsertLogs(ctx, logs[15:]))
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420")))
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues(chainID)))
}

func generateRandomLogs(t *testing.T, filterID int64, count int) []Log {
Expand All @@ -134,7 +110,6 @@ func resetMetrics(lp ObservedORM) {
lp.queryDuration.Reset()
lp.datasetSize.Reset()
lp.logsInserted.Reset()
lp.blocksInserted.Reset()
}

func counterFromGaugeByLabels(gaugeVec *prometheus.GaugeVec, labels ...string) int {
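The bodies of counterFromGaugeByLabels and counterFromHistogramByLabels are not shown in this diff. As a rough sketch of how such helpers are commonly implemented with prometheus/testutil and the client_model protobufs (dto here is an assumed extra import, github.com/prometheus/client_model/go; the repo's actual code may differ):

func counterFromGaugeByLabels(gaugeVec *prometheus.GaugeVec, labels ...string) int {
	// Gauges can be read directly through testutil.
	return int(testutil.ToFloat64(gaugeVec.WithLabelValues(labels...)))
}

func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int {
	observer, err := histogramVec.GetMetricWithLabelValues(labels...)
	require.NoError(t, err)

	// There is no testutil shortcut for a histogram's sample count, so collect
	// the metric and read SampleCount from the dto protobuf.
	metricCh := make(chan prometheus.Metric, 1)
	observer.(prometheus.Histogram).Collect(metricCh)
	close(metricCh)

	metric := <-metricCh
	pb := &dto.Metric{}
	require.NoError(t, metric.Write(pb))

	return int(pb.GetHistogram().GetSampleCount())
}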
