diff --git a/tasks/scheduler.go b/tasks/scheduler.go
index 84b55d59d..61a180705 100644
--- a/tasks/scheduler.go
+++ b/tasks/scheduler.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/cosmos/cosmos-sdk/store/multiversion"
 	store "github.com/cosmos/cosmos-sdk/store/types"
+	"github.com/cosmos/cosmos-sdk/telemetry"
 	sdk "github.com/cosmos/cosmos-sdk/types"
 	"github.com/cosmos/cosmos-sdk/types/occ"
 	"github.com/cosmos/cosmos-sdk/utils/tracing"
@@ -78,6 +79,7 @@ type scheduler struct {
 	allTasks           []*deliverTxTask
 	executeCh          chan func()
 	validateCh         chan func()
+	metrics            *schedulerMetrics
 }
 
 // NewScheduler creates a new scheduler
@@ -86,6 +88,7 @@ func NewScheduler(workers int, tracingInfo *tracing.Info, deliverTxFunc func(ctx
 		workers:     workers,
 		deliverTx:   deliverTxFunc,
 		tracingInfo: tracingInfo,
+		metrics:     &schedulerMetrics{},
 	}
 }
 
@@ -152,11 +155,18 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask {
 	return res
 }
 
-func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
+// collectResponses gathers each task's response in task order and records the
+// highest incarnation seen among the tasks in the scheduler metrics.
+func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
 	res := make([]types.ResponseDeliverTx, 0, len(tasks))
+	var maxIncarnation int
 	for _, t := range tasks {
+		if t.Incarnation > maxIncarnation {
+			maxIncarnation = t.Incarnation
+		}
 		res = append(res, *t.Response)
 	}
+	s.metrics.maxIncarnation = maxIncarnation
 	return res
 }
 
@@ -202,6 +212,21 @@ func (s *scheduler) PrefillEstimates(reqs []*sdk.DeliverTxEntry) {
 	}
 }
 
+// schedulerMetrics contains the metrics gathered during a single ProcessAll run
+type schedulerMetrics struct {
+	// maxIncarnation is the highest incarnation seen in this set
+	maxIncarnation int
+	// retries is the number of tx attempts beyond the first attempt
+	retries int
+}
+
+// emitMetrics reports the gathered metrics through telemetry: retries are added
+// to the "scheduler.retries" counter and maxIncarnation is set as a gauge.
+func (s *scheduler) emitMetrics() {
+	telemetry.IncrCounter(float32(s.metrics.retries), "scheduler", "retries")
+	telemetry.SetGauge(float32(s.metrics.maxIncarnation), "scheduler", "max_incarnation")
+}
+
 func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
 	// initialize mutli-version stores if they haven't been initialized yet
 	s.tryInitMultiVersionStore(ctx)
@@ -211,6 +236,10 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
 	s.allTasks = tasks
 	s.executeCh = make(chan func(), len(tasks))
 	s.validateCh = make(chan func(), len(tasks))
+	// start every run with fresh metrics so a reused scheduler never re-emits
+	// retries already counted by an earlier run (also guards against nil metrics)
+	s.metrics = &schedulerMetrics{}
+	defer s.emitMetrics()
 
 	// default to number of tasks if workers is negative or 0 by this point
 	workers := s.workers
@@ -245,11 +274,13 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
 		if err != nil {
 			return nil, err
 		}
+		// tasks handed back by validation get executed again, so count them as retries
+		s.metrics.retries += len(toExecute)
 	}
 	for _, mv := range s.multiVersionStores {
 		mv.WriteLatestToStore()
 	}
-	return collectResponses(tasks), nil
+	return s.collectResponses(tasks), nil
 }
 
 func (s *scheduler) shouldRerun(task *deliverTxTask) bool {