Skip to content

Commit

Permalink
[OCC] add metrics for scheduler (#431)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
- **retries** represents number of tx attempts beyond the first attempt
- **max_incarnation** is the highest incarnation seen in a given block

## Testing performed to validate your change
- lower environment
  • Loading branch information
stevenlanders authored and udpatil committed Feb 27, 2024
1 parent 5f9aa92 commit 308262f
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/cosmos/cosmos-sdk/store/multiversion"
store "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/occ"
"github.com/cosmos/cosmos-sdk/utils/tracing"
Expand Down Expand Up @@ -78,6 +79,7 @@ type scheduler struct {
allTasks []*deliverTxTask
executeCh chan func()
validateCh chan func()
metrics *schedulerMetrics
}

// NewScheduler creates a new scheduler
Expand All @@ -86,6 +88,7 @@ func NewScheduler(workers int, tracingInfo *tracing.Info, deliverTxFunc func(ctx
workers: workers,
deliverTx: deliverTxFunc,
tracingInfo: tracingInfo,
metrics: &schedulerMetrics{},
}
}

Expand Down Expand Up @@ -152,11 +155,16 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask {
return res
}

func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
res := make([]types.ResponseDeliverTx, 0, len(tasks))
var maxIncarnation int
for _, t := range tasks {
if t.Incarnation > maxIncarnation {
maxIncarnation = t.Incarnation
}
res = append(res, *t.Response)
}
s.metrics.maxIncarnation = maxIncarnation
return res
}

Expand Down Expand Up @@ -202,6 +210,19 @@ func (s *scheduler) PrefillEstimates(reqs []*sdk.DeliverTxEntry) {
}
}

// schedulerMetrics contains metrics for the scheduler
type schedulerMetrics struct {
// maxIncarnation is the highest incarnation seen in this set
maxIncarnation int
// retries is the number of tx attempts beyond the first attempt
retries int
}

func (s *scheduler) emitMetrics() {
telemetry.IncrCounter(float32(s.metrics.retries), "scheduler", "retries")
telemetry.SetGauge(float32(s.metrics.maxIncarnation), "scheduler", "max_incarnation")
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
// initialize mutli-version stores if they haven't been initialized yet
s.tryInitMultiVersionStore(ctx)
Expand All @@ -211,6 +232,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
s.allTasks = tasks
s.executeCh = make(chan func(), len(tasks))
s.validateCh = make(chan func(), len(tasks))
defer s.emitMetrics()

// default to number of tasks if workers is negative or 0 by this point
workers := s.workers
Expand Down Expand Up @@ -245,11 +267,13 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
if err != nil {
return nil, err
}
// these are retries which apply to metrics
s.metrics.retries += len(toExecute)
}
for _, mv := range s.multiVersionStores {
mv.WriteLatestToStore()
}
return collectResponses(tasks), nil
return s.collectResponses(tasks), nil
}

func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
Expand Down

0 comments on commit 308262f

Please sign in to comment.