Skip to content

Commit

Permalink
Multi-metrics throttler: post v21 deprecations and changes (#16915)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Feb 6, 2025
1 parent b50894f commit 1c56ca1
Show file tree
Hide file tree
Showing 22 changed files with 196 additions and 804 deletions.
4 changes: 0 additions & 4 deletions go/cmd/vtctldclient/command/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ func init() {
UpdateThrottlerConfig.Flags().StringVar(&updateThrottlerConfigOptions.MetricName, "metric-name", "", "name of the metric for which we apply a new threshold (requires --threshold). If empty, the default (either 'lag' or 'custom') metric is used.")
UpdateThrottlerConfig.Flags().Float64Var(&updateThrottlerConfigOptions.Threshold, "threshold", 0, "threshold for the either default check (replication lag seconds) or custom check")
UpdateThrottlerConfig.Flags().StringVar(&updateThrottlerConfigOptions.CustomQuery, "custom-query", "", "custom throttler check query")
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.CheckAsCheckSelf, "check-as-check-self", false, "/throttler/check requests behave as is /throttler/check-self was called")
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.CheckAsCheckShard, "check-as-check-shard", false, "use standard behavior for /throttler/check requests")
UpdateThrottlerConfig.Flags().MarkDeprecated("check-as-check-self", "specify metric with scope in --app-metrics to apply to all checks, or use --scope in CheckThrottler for a specific check")
UpdateThrottlerConfig.Flags().MarkDeprecated("check-as-check-shard", "specify metric with scope in --app-metrics to apply to all checks, or use --scope in CheckThrottler for a specific check")

UpdateThrottlerConfig.Flags().StringVar(&unthrottledAppRule.Name, "unthrottle-app", "", "an app name to unthrottle")
UpdateThrottlerConfig.Flags().StringVar(&throttledAppRule.Name, "throttle-app", "", "an app name to throttle")
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"flag"
"fmt"
"math/rand/v2"
"net/http"
"os"
"path"
"strings"
Expand All @@ -33,6 +32,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/vt/log"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

Expand Down Expand Up @@ -207,7 +207,7 @@ func TestRevertSchemaChanges(t *testing.T) {
require.Equal(t, 1, len(shards))

throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance)
throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, http.StatusOK, time.Minute)
throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, time.Minute)

t.Run("revertible", testRevertible)
t.Run("revert", testRevert)
Expand Down
37 changes: 3 additions & 34 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func vitessThrottleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (
flags := &throttle.CheckFlags{
Scope: base.ShardScope,
SkipRequestHeartbeats: skipRequestHeartbeats,
MultiMetricsEnabled: true,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, throttlerapp.VitessName, flags)
return resp, err
Expand All @@ -188,16 +187,14 @@ func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*vtctl
flags := &throttle.CheckFlags{
Scope: base.ShardScope,
SkipRequestHeartbeats: skipRequestHeartbeats,
MultiMetricsEnabled: true,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags)
return resp, err
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*vtctldatapb.CheckThrottlerResponse, error) {
flags := &throttle.CheckFlags{
Scope: base.SelfScope,
MultiMetricsEnabled: true,
Scope: base.SelfScope,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags)
return resp, err
Expand All @@ -220,7 +217,7 @@ func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCod
require.NoError(t, err)

time.Sleep(time.Second)
return throttle.ResponseCodeFromStatus(resp.Check.ResponseCode, int(resp.Check.StatusCode))
return resp.Check.ResponseCode
}

// waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check
Expand All @@ -242,7 +239,7 @@ func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode
}
select {
case <-ctx.Done():
return resp.Check, false
return resp.Check, assert.EqualValues(t, wantCode, resp.Check.ResponseCode, "response: %+v", resp)
case <-ticker.C:
}
}
Expand Down Expand Up @@ -376,13 +373,6 @@ func TestInitialThrottler(t *testing.T) {
assert.Equal(t, base.ShardScope.String(), metrics.Scope)
}

if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
Expand All @@ -401,13 +391,6 @@ func TestInitialThrottler(t *testing.T) {
for _, metrics := range resp.Check.Metrics {
assert.Equal(t, base.ShardScope.String(), metrics.Scope)
}
if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -481,13 +464,11 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) {
t.Run("validating primary check self", func(t *testing.T) {
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("validating replica check self", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
}
Expand All @@ -514,7 +495,6 @@ func TestLag(t *testing.T) {
t.Run("expecting throttler push back", func(t *testing.T) {
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusTooManyRequests, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("primary self-check should still be fine", func(t *testing.T) {
Expand All @@ -525,10 +505,6 @@ func TestLag(t *testing.T) {
assert.Equal(t, base.SelfScope.String(), metrics.Scope)
}
// self (on primary) is unaffected by replication lag
if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
Expand All @@ -541,7 +517,6 @@ func TestLag(t *testing.T) {
for _, metrics := range resp.Check.Metrics {
assert.Equal(t, base.SelfScope.String(), metrics.Scope)
}
assert.EqualValues(t, http.StatusTooManyRequests, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("exempting test app", func(t *testing.T) {
Expand Down Expand Up @@ -619,13 +594,11 @@ func TestLag(t *testing.T) {
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
// self (on primary) is unaffected by replication lag
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("replica self-check should be fine", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
}
Expand Down Expand Up @@ -668,7 +641,6 @@ func TestCustomQuery(t *testing.T) {
throttler.WaitForValidData(t, primaryTablet, throttlerEnabledTimeout)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("test threads running", func(t *testing.T) {
Expand All @@ -695,7 +667,6 @@ func TestCustomQuery(t *testing.T) {
{
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusTooManyRequests, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
}
})
Expand All @@ -707,7 +678,6 @@ func TestCustomQuery(t *testing.T) {
{
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
}
})
Expand All @@ -730,7 +700,6 @@ func TestRestoreDefaultQuery(t *testing.T) {
t.Run("validating OK response from throttler with default threshold, heartbeats running", func(t *testing.T) {
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("validating pushback response from throttler on default threshold once heartbeats go stale", func(t *testing.T) {
Expand Down
16 changes: 5 additions & 11 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ func CheckThrottlerRaw(vtctldProcess *cluster.VtctldClientProcess, tablet *clust
args = append(args, "CheckThrottler")
if flags == nil {
flags = &throttle.CheckFlags{
Scope: base.SelfScope,
MultiMetricsEnabled: true,
Scope: base.SelfScope,
}
}
if appName != "" {
Expand Down Expand Up @@ -124,11 +123,6 @@ func UpdateThrottlerTopoConfigRaw(
args = append(args, "--threshold", fmt.Sprintf("%f", opts.Threshold))
}
args = append(args, "--custom-query", opts.CustomQuery)
if opts.CustomQuery != "" {
args = append(args, "--check-as-check-self")
} else {
args = append(args, "--check-as-check-shard")
}
if appRule != nil {
args = append(args, "--throttle-app", appRule.Name)
args = append(args, "--throttle-app-duration", time.Until(protoutil.TimeFromProto(appRule.ExpiresAt).UTC()).String())
Expand Down Expand Up @@ -485,15 +479,15 @@ func EnableLagThrottlerAndWaitForStatus(t *testing.T, clusterInstance *cluster.L
}
}

func WaitForCheckThrottlerResult(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, appName throttlerapp.Name, flags *throttle.CheckFlags, expect int32, timeout time.Duration) (*vtctldatapb.CheckThrottlerResponse, error) {
func WaitForCheckThrottlerResult(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, appName throttlerapp.Name, flags *throttle.CheckFlags, expect tabletmanagerdatapb.CheckThrottlerResponseCode, timeout time.Duration) (*vtctldatapb.CheckThrottlerResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
resp, err := CheckThrottler(clusterInstance, tablet, appName, flags)
require.NoError(t, err)
if resp.Check.StatusCode == expect {
if resp.Check.ResponseCode == expect {
return resp, nil
}
select {
Expand Down Expand Up @@ -534,11 +528,11 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat

for {
checkResp, checkErr := http.Get(checkURL)
if checkErr != nil {
if checkErr == nil {
defer checkResp.Body.Close()
}
selfCheckResp, selfCheckErr := http.Get(selfCheckURL)
if selfCheckErr != nil {
if selfCheckErr == nil {
defer selfCheckResp.Body.Close()
}
if checkErr == nil && selfCheckErr == nil &&
Expand Down
25 changes: 21 additions & 4 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -178,16 +179,32 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s

// waitForTabletThrottlingStatus waits for the tablet to return the provided HTTP code for
// the provided app name in its self check.
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int64) {
var gotCode int64
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int) {
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
output, err := throttlerCheckSelf(tablet, throttlerApp)
require.NoError(t, err)

gotCode, err = jsonparser.GetInt([]byte(output), "StatusCode")
require.NoError(t, err)
responseCode, err := jsonparser.GetInt([]byte(output), "ResponseCode")
require.NoError(t, err, "waitForTabletThrottlingStatus output: %v", output)

var gotCode int
switch tabletmanagerdatapb.CheckThrottlerResponseCode(responseCode) {
case tabletmanagerdatapb.CheckThrottlerResponseCode_OK:
gotCode = http.StatusOK
case tabletmanagerdatapb.CheckThrottlerResponseCode_APP_DENIED:
gotCode = http.StatusExpectationFailed
case tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED:
gotCode = http.StatusTooManyRequests
case tabletmanagerdatapb.CheckThrottlerResponseCode_UNKNOWN_METRIC:
gotCode = http.StatusNotFound
case tabletmanagerdatapb.CheckThrottlerResponseCode_INTERNAL_ERROR:
gotCode = http.StatusInternalServerError
default:
gotCode = http.StatusInternalServerError
}

if wantCode == gotCode {
// Wait for any cached check values to be cleared and the new
// status value to be in effect everywhere before returning.
Expand Down
Loading

0 comments on commit 1c56ca1

Please sign in to comment.