Skip to content

Commit

Permalink
VReplication: Add throttler stats (#15221)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Feb 19, 2024
1 parent 4ddb705 commit af38099
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 20 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/fk_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ insert into t2 values(1, 1, 't21'), (2, 1, 't22'), (3, 2, 't23');
}
]
},
"t1": {
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"t2": {
"t2": {
"column_vindexes": [
{
"column": "t1id",
Expand Down
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result {
// constraints, where the parent table is lexicographically sorted before the child table and
// thus may be dropped first, can be successfully cancelled.
func testFKCancel(t *testing.T, vc *VitessCluster) {
var targetKeyspace = "fktarget"
var sourceKeyspace = "fksource"
var workflowName = "wf2"
var ksWorkflow = fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
targetKeyspace := "fktarget"
sourceKeyspace := "fksource"
workflowName := "wf2"
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
mt := newMoveTables(vc, &moveTablesWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
Expand Down
38 changes: 30 additions & 8 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -1155,7 +1156,7 @@ func materialize(t *testing.T, spec string, useVtctldClient bool) {

func materializeProduct(t *testing.T, useVtctldClient bool) {
t.Run("materializeProduct", func(t *testing.T) {
// materializing from "product" keyspace to "customer" keyspace
// Materializing from "product" keyspace to "customer" keyspace.
workflow := "cproduct"
keyspace := "customer"
defaultCell := vc.Cells[vc.CellNames[0]]
Expand All @@ -1169,7 +1170,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {

productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "primary")
t.Run("throttle-app-product", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
// Now, throttle the source side component (vstreamer), and insert some rows.
for _, tab := range productTablets {
body, err := throttleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
Expand All @@ -1180,19 +1181,33 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusNotThrottled)
}
insertMoreProductsForSourceThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
// we expect the additional rows to **not appear** in the materialized view
// To be fair to the test, we give the target time to apply the new changes. We
// expect it to NOT get them in the first place, we expect the additional rows
// to **not appear** in the materialized view.
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
require.Greater(t, vstreamerThrottledCount, int64(0))
// We only need to do this stat check once.
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
throttledCount, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
}
})
t.Run("unthrottle-app-product", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
// Unthrottle the vstreamer component, and expect the rows to show up.
for _, tab := range productTablets {
body, err := unthrottleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
// give time for unthrottling to take effect and for target to fetch data
// Give time for unthrottling to take effect and for targets to fetch data.
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
}
for _, tab := range customerTablets {
Expand All @@ -1201,8 +1216,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
})

t.Run("throttle-app-customer", func(t *testing.T) {
// Now, throttle vreplication (vcopier/vapplier) on target tablets, and
// insert some more rows.
// Now, throttle vreplication on the target side (vplayer), and insert some
// more rows.
for _, tab := range customerTablets {
body, err := throttleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
Expand All @@ -1217,6 +1232,13 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
// rows to **not appear** in the materialized view.
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
require.Greater(t, vplayerThrottledCount, int64(0))
}
})
t.Run("unthrottle-app-customer", func(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type Stats struct {

PartialQueryCount *stats.CountersWithMultiLabels
PartialQueryCacheSize *stats.CountersWithMultiLabels

ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component
}

// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
Expand Down Expand Up @@ -174,6 +176,7 @@ func NewStats() *Stats {
bps.TableCopyTimings = stats.NewTimings("", "", "Table")
bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})
return bps
}

Expand Down Expand Up @@ -369,13 +372,14 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
if backoff == throttler.NotThrottled {
break
}
blp.blplStats.ThrottledCounts.Add([]string{"trx", "binlogplayer"}, 1)
// We don't bother checking for context cancellation here because the
// sleep will block only up to 1 second. (Usually, backoff is 1s / rate
// e.g. a rate of 1000 TPS results into a backoff of 1 ms.)
time.Sleep(backoff)
}

// get the response
// Get the response.
response, err := stream.Recv()
// Check context before checking error, because canceled
// contexts could be wrapped as regular errors.
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba
}

sct.topoServer.UpdateSrvVSchema(ctx, cell, srvVSchema)
current, updateChan, _ := sct.topoServer.WatchSrvVSchema(ctx, cell)
current, updateChan, err := sct.topoServer.WatchSrvVSchema(ctx, cell)
if err != nil {
panic(fmt.Sprintf("sandboxTopo WatchSrvVSchema returned an error: %v", err))
}
if !callback(current.Value, nil) {
panic("sandboxTopo callback returned false")
}
Expand Down
27 changes: 26 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ type vrStats struct {
mu sync.Mutex
isOpen bool
controllers map[int32]*controller

ThrottledCount *stats.Counter
}

func (st *vrStats) register() {

st.ThrottledCount = stats.NewCounter("", "")
stats.NewGaugeFunc("VReplicationStreamCount", "Number of vreplication streams", st.numControllers)
stats.NewGaugeFunc("VReplicationLagSecondsMax", "Max vreplication seconds behind primary", st.maxReplicationLagSeconds)
stats.NewStringMapFuncWithMultiLabels(
Expand Down Expand Up @@ -502,6 +504,29 @@ func (st *vrStats) register() {
return result
})

stats.NewCounterFunc(
"VReplicationThrottledCountTotal",
"The total number of times that vreplication has been throttled",
func() int64 {
st.mu.Lock()
defer st.mu.Unlock()
return st.ThrottledCount.Get()
})
stats.NewCountersFuncWithMultiLabels(
"VReplicationThrottledCounts",
"The number of times vreplication was throttled by workflow, id, throttler (trx or tablet), and the sub-component that was throttled",
[]string{"workflow", "id", "throttler", "component"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64)
for _, ct := range st.controllers {
for key, val := range ct.blpStats.ThrottledCounts.Counts() {
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val
}
}
return result
})
}

func (st *vrStats) numControllers() int64 {
Expand Down
13 changes: 12 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -132,7 +133,9 @@ func TestStatusHtml(t *testing.T) {
func TestVReplicationStats(t *testing.T) {
blpStats := binlogplayer.NewStats()
defer blpStats.Stop()
testStats := &vrStats{}
testStats := &vrStats{
ThrottledCount: stats.NewCounter("", ""),
}
testStats.isOpen = true
testStats.controllers = map[int32]*controller{
1: {
Expand Down Expand Up @@ -184,6 +187,14 @@ func TestVReplicationStats(t *testing.T) {
require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount)
require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount)

testStats.ThrottledCount.Add(99)
require.Equal(t, int64(99), testStats.ThrottledCount.Get())

blpStats.ThrottledCounts.Add([]string{"tablet", "vcopier"}, 10)
blpStats.ThrottledCounts.Add([]string{"tablet", "vplayer"}, 80)
require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"])
require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"])

var tm int64 = 1234567890
blpStats.RecordHeartbeat(tm)
require.Equal(t, tm, blpStats.Heartbeat())
Expand Down
13 changes: 12 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,21 @@ func (vr *vreplicator) throttlerAppName() string {
return throttlerapp.Concatenate(names...)
}

// updateTimeThrottled updates the time_throttled field in the _vt.vreplication record
// with a rate limit so that it's only saved in the database at most once per
// throttleUpdatesRateLimiter.tickerTime.
// It also increments the throttled count in the stats to keep track of how many
// times a VReplication workflow, and the specific sub-component, is throttled by the
// tablet throttler over time. It also increments the global throttled count to keep
// track of how many times in total vreplication has been throttled across all workflows
// (both ones that currently exist and ones that no longer do).
func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error {
appName := appThrottled.String()
vr.stats.ThrottledCounts.Add([]string{"tablet", appName}, 1)
globalStats.ThrottledCount.Add(1)
err := vr.throttleUpdatesRateLimiter.Do(func() error {
tm := time.Now().Unix()
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String())
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appName)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const (
)

func getDefaultCollationID() int64 {
return 45
return 45 // utf8mb4_general_ci
}

var (
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func TestNoBlob(t *testing.T) {
env = nil
newEngine(t, ctx, "noblob")
defer func() {
if engine != nil {
engine.Close()
}
if env != nil {
env.Close()
}
engine = oldEngine
env = oldEnv
}()
Expand Down Expand Up @@ -1854,6 +1860,12 @@ func TestMinimalMode(t *testing.T) {
env = nil
newEngine(t, ctx, "minimal")
defer func() {
if engine != nil {
engine.Close()
}
if env != nil {
env.Close()
}
engine = oldEngine
env = oldEnv
}()
Expand Down

0 comments on commit af38099

Please sign in to comment.