kvserver: add avgProposalToLocalApplicationLatency
This patch adds a new field `avgProposalToLocalApplicationLatency` to Replica. It
tracks latency similar to `raft.replication.latency`, with the following
differences:
1. It is tracked at the per-replica level.
2. It only includes successful write commands, excluding failures such as
`AmbiguousResultError`, rejected proposals, and request evaluations that did
not lead to raft proposals.

This field tracks the average time between proposal evaluation and local
application of a command on successful writes. An exponentially weighted moving
average is used with an effective window size of 30, where recent data points
have a higher influence, and older data gradually decays. The metric is
thread-safe.
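For readers unfamiliar with the smoothing described above, here is a minimal sketch of how an EWMA with an effective window of roughly 30 samples behaves, assuming the VividCortex `ewma` package that `pkg/rpc` wraps in the diff below; the sample values are made up for illustration.

```go
package main

import (
	"fmt"

	"github.com/VividCortex/ewma"
)

func main() {
	// NewMovingAverage() defaults to a SimpleEWMA with an effective window
	// of about 30 samples: recent latency samples carry more weight and
	// older ones decay gradually.
	avg := ewma.NewMovingAverage()
	for _, latencyNanos := range []float64{1e6, 2e6, 10e6} {
		avg.Add(latencyNanos) // feed proposal-to-application latencies in nanoseconds
	}
	fmt.Printf("smoothed latency: %.0f ns\n", avg.Value())
}
```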

Notes:
1. It does not include full application on follower replicas.
2. No measurements are recorded for read-only commands or read-write commands
that do not result in writes.
3. No measurements are recorded for proposal failures (e.g. due to
AmbiguousResultError, rejected proposals, or for request
evaluation that did not lead to raft proposals).

Note that avgProposalToLocalApplicationLatency is left unused for now. Future
commits will use it to compute the closed timestamp target for global tables.

Informs: #72393
Release note: none
wenyihu6 committed Feb 22, 2025
1 parent c471a72 commit acd8ebf
Showing 5 changed files with 45 additions and 1 deletion.
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -943,6 +943,20 @@ type Replica struct {
// computePostTrunc* methods.
pendingLogTruncations pendingLogTruncations

// avgProposalToLocalApplicationLatency tracks the average time between
// proposal evaluation and local application of a command on successful
// writes. An exponentially weighted moving average is used with an effective
// window size of 30, where recent data points have a higher influence, and
// older data gradually decays. Thread-safe.
//
// Notes:
// 1. It does not include full application on follower replicas.
// 2. No measurements are recorded for read-only commands or read-write
// commands that do not result in writes.
// 3. No measurements are recorded for proposal failures (e.g. due to
// AmbiguousResultError or rejected proposals).
avgProposalToLocalApplicationLatency *rpc.ThreadSafeMovingAverage

rangefeedMu struct {
syncutil.RWMutex
// proc is an instance of a rangefeed Processor that is capable of
@@ -1045,6 +1059,12 @@ func (r *Replica) LogStorageRaftMuLocked() *logstore.LogStore {
return r.raftMu.logStorage
}

// recordProposalToLocalApplicationLatency records the duration it took to
// propose and apply the last command locally on success.
func (r *Replica) recordProposalToLocalApplicationLatency(timeToProposeAndApply time.Duration) {
r.avgProposalToLocalApplicationLatency.Add(float64(timeToProposeAndApply.Nanoseconds()))
}

// cleanupFailedProposal cleans up after a proposal that has failed. It
// clears any references to the proposal and releases associated quota.
// It requires that Replica.mu is exclusively held.
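Since the field is left unused by this commit, a follow-up would need a way to read the smoothed value back out. A hypothetical accessor sketch, not part of this diff; it assumes ThreadSafeMovingAverage exposes a Value method like the underlying ewma.MovingAverage:

```go
// AvgProposalToLocalApplicationLatency returns the smoothed
// proposal-to-local-application latency. Hypothetical sketch only: a future
// commit could use something like this when computing the closed timestamp
// target for global tables.
func (r *Replica) AvgProposalToLocalApplicationLatency() time.Duration {
	return time.Duration(r.avgProposalToLocalApplicationLatency.Value())
}
```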
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_application_cmd.go
@@ -148,6 +148,9 @@ func (c *replicatedCmd) AckSuccess(ctx context.Context) error {
// AckOutcomeAndFinish implements the apply.AppliedCommand.
func (c *replicatedCmd) AckOutcomeAndFinish(ctx context.Context) error {
if c.IsLocal() {
c.proposal.ec.recordProposalToLocalApplicationLatency()
// finishApplication clears proposal's endCmds (c.proposal.ec), so we need
// to record the latency right before.
c.proposal.finishApplication(ctx, c.response)
}
c.finishTracingSpan()
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -136,7 +136,8 @@ func newUninitializedReplicaWithoutRaftGroup(
DisableTxnPushing: store.TestingKnobs().DontPushOnLockConflictError,
TxnWaitKnobs: store.TestingKnobs().TxnWaitKnobs,
}),
allocatorToken: &plan.AllocatorToken{},
allocatorToken: &plan.AllocatorToken{},
avgProposalToLocalApplicationLatency: rpc.NewThreadMovingAverage(),
}
r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, rangeID)

12 changes: 12 additions & 0 deletions pkg/kv/kvserver/replica_send.go
@@ -1315,6 +1315,18 @@ func (ec *endCmds) poison() {
ec.repl.concMgr.PoisonReq(ec.g)
}

// recordProposalToLocalApplicationLatency records the duration of the last
// local application on successful writes.
func (ec *endCmds) recordProposalToLocalApplicationLatency() {
if ec.repl == nil {
return
}
if ts := ec.replicatingSince; !ts.IsZero() {
// Read-only commands have a zero replicatingSince timestamp.
ec.repl.recordProposalToLocalApplicationLatency(timeutil.Since(ts))
}
}

// done releases the latches acquired by the command and updates the timestamp
// cache using the final timestamp of each command. If `br` is nil, it is
// assumed that `done` is being called by a request that's dropping its latches
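The zero-timestamp guard above is what keeps read-only traffic out of the average. A standalone illustration of that pattern, using hypothetical helper names rather than the repo's types:

```go
package main

import (
	"fmt"
	"time"
)

// recordIfReplicated mirrors the guard in endCmds.recordProposalToLocalApplicationLatency:
// read-only requests never start replication, so their zero replicatingSince
// timestamp causes the measurement to be skipped entirely.
func recordIfReplicated(replicatingSince time.Time, record func(time.Duration)) {
	if replicatingSince.IsZero() {
		return // read-only or non-replicated command; nothing to record
	}
	record(time.Since(replicatingSince))
}

func main() {
	recordIfReplicated(time.Time{}, func(time.Duration) {
		fmt.Println("never reached for read-only commands")
	})
	recordIfReplicated(time.Now().Add(-5*time.Millisecond), func(d time.Duration) {
		fmt.Printf("recorded ~%v of proposal-to-application latency\n", d)
	})
}
```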
8 changes: 8 additions & 0 deletions pkg/rpc/metrics.go
@@ -195,6 +195,14 @@ type ThreadSafeMovingAverage struct {
ma ewma.MovingAverage
}

// NewThreadMovingAverage creates a new ThreadSafeMovingAverage that uses a
// SimpleEWMA. It is an exponentially weighted moving average with an effective
// window size of 30, where recent data points have a higher influence and
// older data gradually decays.
func NewThreadMovingAverage() *ThreadSafeMovingAverage {
return &ThreadSafeMovingAverage{ma: ewma.NewMovingAverage()}
}

func (t *ThreadSafeMovingAverage) Set(v float64) {
t.Lock()
defer t.Unlock()
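A brief usage sketch of the new constructor. It uses only the Add method that replica.go above already calls; the wrapper's mutex (elided from this excerpt, but visible in Set) is what makes concurrent callers safe.

```go
package main

import (
	"sync"
	"time"

	"github.com/cockroachdb/cockroach/pkg/rpc"
)

func main() {
	avg := rpc.NewThreadMovingAverage()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent Adds are safe: each method locks the wrapper's
			// mutex before touching the underlying ewma.MovingAverage.
			avg.Add(float64(2 * time.Millisecond.Nanoseconds()))
		}()
	}
	wg.Wait()
}
```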
