diff --git a/go/osutil/loadavg.go b/go/osutil/loadavg.go new file mode 100644 index 00000000000..0c71f9e18b1 --- /dev/null +++ b/go/osutil/loadavg.go @@ -0,0 +1,33 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package osutil + +import ( + "fmt" + "strconv" + "strings" +) + +// parseLoadAvg parses the load average from the content of /proc/loadavg or sysctl output. +// Input such as "1.00 0.99 0.98 1/1 1", "2.83 3.01 3.36" +func parseLoadAvg(content string) (float64, error) { + fields := strings.Fields(content) + if len(fields) == 0 { + return 0, fmt.Errorf("unexpected loadavg content: %s", content) + } + return strconv.ParseFloat(fields[0], 64) +} diff --git a/go/osutil/loadavg_darwin.go b/go/osutil/loadavg_darwin.go new file mode 100644 index 00000000000..f7b5b6e492a --- /dev/null +++ b/go/osutil/loadavg_darwin.go @@ -0,0 +1,40 @@ +//go:build darwin + +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package osutil + +import ( + "fmt" + "os/exec" +) + +// LoadAvg returns the past 1 minute system load average. This works on linux and darwin systems. +// On other systems, it returns 0 with no error. +func LoadAvg() (float64, error) { + cmd := exec.Command("sysctl", "-n", "vm.loadavg") + // Sample output: `{ 2.83 3.01 3.36 }` + output, err := cmd.CombinedOutput() + if err != nil { + return 0, err + } + if len(output) < 1 { + return 0, fmt.Errorf("unexpected sysctl output: %q", output) + } + output = output[1:] // Remove the leading `{ ` + return parseLoadAvg(string(output)) +} diff --git a/go/osutil/loadavg_linux.go b/go/osutil/loadavg_linux.go new file mode 100644 index 00000000000..7663bf60ab7 --- /dev/null +++ b/go/osutil/loadavg_linux.go @@ -0,0 +1,33 @@ +//go:build linux + +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package osutil + +import ( + "os" +) + +// LoadAvg returns the past 1 minute system load average. This works on linux and darwin systems. +// On other systems, it returns 0 with no error. +func LoadAvg() (float64, error) { + content, err := os.ReadFile("/proc/loadavg") + if err != nil { + return 0, err + } + return parseLoadAvg(string(content)) +} diff --git a/go/osutil/loadavg_other.go b/go/osutil/loadavg_other.go new file mode 100644 index 00000000000..a516cb46a66 --- /dev/null +++ b/go/osutil/loadavg_other.go @@ -0,0 +1,25 @@ +//go:build !linux && !darwin + +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package osutil + +// LoadAvg returns the past 1 minute system load average. This works on linux and darwin systems. +// On other systems, it returns 0 with no error. +func LoadAvg() (float64, error) { + return 0, nil +} diff --git a/go/osutil/loadavg_test.go b/go/osutil/loadavg_test.go new file mode 100644 index 00000000000..5f3831648c6 --- /dev/null +++ b/go/osutil/loadavg_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package osutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLoadAvgValue(t *testing.T) { + tcases := []struct { + input string + loadavg float64 + isError bool + }{ + { + input: "", + isError: true, + }, + { + input: "{}", + isError: true, + }, + { + input: "{ x y z }", + isError: true, + }, + { + input: "1", + loadavg: 1.0, + }, + { + input: "0.00 0.00 0.00 1/1 1", + loadavg: 0.0, + }, + { + input: "2.72 2.89 3.17", + loadavg: 2.72, + }, + { + input: " 2.72 2.89 3.17", + loadavg: 2.72, + }, + } + for _, tcase := range tcases { + t.Run(tcase.input, func(t *testing.T) { + loadavg, err := parseLoadAvg(tcase.input) + if tcase.isError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tcase.loadavg, loadavg) + } + }) + } +} + +func TestLoadAvg(t *testing.T) { + loadavg, err := LoadAvg() + assert.NoError(t, err) + assert.GreaterOrEqual(t, loadavg, 0.0) +} diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 08cea643940..f96069c81b8 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -215,7 +215,7 @@ func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCod } // waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check -func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode) bool { +func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode) (*tabletmanagerdatapb.CheckThrottlerResponse, bool) { _ = warmUpHeartbeat(t) ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration*4) defer cancel() @@ -229,11 +229,11 @@ func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode if wantCode == resp.Check.ResponseCode { // Wait for any cached check values to be cleared and the new // status value to be in effect everywhere before returning. - return true + return resp.Check, true } select { case <-ctx.Done(): - return assert.EqualValues(t, wantCode, resp.Check.StatusCode, "response: %+v", resp) + return resp.Check, false case <-ticker.C: } } @@ -779,16 +779,16 @@ func TestUpdateAppCheckedMetrics(t *testing.T) { } waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED) }) - t.Run("assigning 'loadavg' metrics to 'test' app", func(t *testing.T) { + t.Run("assigning 'threads_running' metrics to 'test' app", func(t *testing.T) { { - req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: "loadavg", Threshold: 7777} + req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.ThreadsRunningMetricName.String(), Threshold: 7777} _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil) assert.NoError(t, err) } { req := &vtctldatapb.UpdateThrottlerConfigRequest{} appCheckedMetrics := map[string]*topodatapb.ThrottlerConfig_MetricNames{ - testAppName.String(): {Names: []string{"loadavg"}}, + testAppName.String(): {Names: []string{base.ThreadsRunningMetricName.String()}}, } _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, appCheckedMetrics) assert.NoError(t, err) @@ -802,18 +802,18 @@ func TestUpdateAppCheckedMetrics(t *testing.T) { for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} { throttler.WaitForThrottlerStatusEnabled(t, &clusterInstance.VtctldClientProcess, &tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: unreasonablyLowThreshold.Seconds()}, throttlerEnabledTimeout) } - t.Run("validating OK response from throttler since it's checking loadavg", func(t *testing.T) { - if !waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK) { + t.Run("validating OK response from throttler since it's checking threads_running", func(t *testing.T) { + if _, ok := waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK); !ok { t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) } }) }) - t.Run("assigning 'loadavg,lag' metrics to 'test' app", func(t *testing.T) { + t.Run("assigning 'threads_running,lag' metrics to 'test' app", func(t *testing.T) { { req := &vtctldatapb.UpdateThrottlerConfigRequest{} appCheckedMetrics := map[string]*topodatapb.ThrottlerConfig_MetricNames{ - testAppName.String(): {Names: []string{"loadavg,lag"}}, + testAppName.String(): {Names: []string{base.ThreadsRunningMetricName.String(), base.LagMetricName.String()}}, } _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, appCheckedMetrics) assert.NoError(t, err) @@ -831,9 +831,51 @@ func TestUpdateAppCheckedMetrics(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED) }) }) + t.Run("assigning 'mysqld-loadavg,mysqld-datadir-used-ratio' metrics to 'test' app", func(t *testing.T) { + { + req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.MysqldDatadirUsedRatioMetricName.String(), Threshold: 0.9999} + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil) + assert.NoError(t, err) + } + { + req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.MysqldLoadAvgMetricName.String(), Threshold: 5555} + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil) + assert.NoError(t, err) + } + { + req := &vtctldatapb.UpdateThrottlerConfigRequest{} + appCheckedMetrics := map[string]*topodatapb.ThrottlerConfig_MetricNames{ + testAppName.String(): {Names: []string{base.MysqldDatadirUsedRatioMetricName.String(), base.MysqldLoadAvgMetricName.String()}}, + } + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, appCheckedMetrics) + assert.NoError(t, err) + } + { + req := &vtctldatapb.UpdateThrottlerConfigRequest{Threshold: extremelyHighThreshold.Seconds()} + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil) + assert.NoError(t, err) + } + // Wait for the throttler to be enabled everywhere with new config. + for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} { + throttler.WaitForThrottlerStatusEnabled(t, &clusterInstance.VtctldClientProcess, &tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: extremelyHighThreshold.Seconds()}, throttlerEnabledTimeout) + } + t.Run("validating OK response from throttler since it's checking mysqld-loadavg,mysqld-datadir-used-ratio", func(t *testing.T) { + resp, ok := waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK) + if !ok { + t.Logf("response: %+v", resp) + t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) + t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) + } + require.Contains(t, resp.Metrics, base.MysqldDatadirUsedRatioMetricName.String()) + require.Contains(t, resp.Metrics, base.MysqldLoadAvgMetricName.String()) + assert.NotContains(t, resp.Metrics, base.ThreadsRunningMetricName.String()) + + assert.NotZero(t, resp.Metrics[base.MysqldDatadirUsedRatioMetricName.String()].Value) + }) + }) t.Run("removing assignment from 'test' app and restoring defaults", func(t *testing.T) { { - req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: "loadavg", Threshold: 0} + req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.ThreadsRunningMetricName.String(), Threshold: 0} _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil) assert.NoError(t, err) } diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index fccad19c324..a426355e01c 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -529,7 +529,7 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat selfCheckURL := fmt.Sprintf("http://localhost:%d/throttler/check-self", tablet.HTTPPort) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - ticker := time.NewTicker(500 * time.Millisecond) + ticker := time.NewTicker(time.Second) defer ticker.Stop() for { @@ -548,8 +548,10 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat } select { case <-ctx.Done(): - t.Errorf("timed out waiting for %s tablet's throttler to return a valid result after %v; last seen value: %+v", - tablet.Alias, timeout, checkResp) + respByte, _ := io.ReadAll(checkResp.Body) + body := string(respByte) + require.Failf(t, "time out", "waiting for %s tablet's throttler to return a valid result after %v; last seen result: %+v", + tablet.Alias, timeout, body) return case <-ticker.C: } diff --git a/go/textutil/strings.go b/go/textutil/strings.go index 2a923cd3259..a17daf8d382 100644 --- a/go/textutil/strings.go +++ b/go/textutil/strings.go @@ -90,6 +90,19 @@ func SingleWordCamel(w string) string { return strings.ToUpper(w[0:1]) + strings.ToLower(w[1:]) } +var multiWordSplitterRegexp = regexp.MustCompile(`[-_.\s]+`) + +// PascalCase turns a string into PascalCase by splitting it into words and +// capitalizing the first letter of each word. +func PascalCase(w string) string { + var b strings.Builder + words := multiWordSplitterRegexp.Split(w, -1) + for _, word := range words { + b.WriteString(SingleWordCamel(word)) + } + return b.String() +} + // ValueIsSimulatedNull returns true if the slice value represents // a NULL or unknown/unspecified value. This is used to distinguish // between a zero value empty slice and a user provided value of an diff --git a/go/textutil/strings_test.go b/go/textutil/strings_test.go index d65c187c4cb..828f39847cf 100644 --- a/go/textutil/strings_test.go +++ b/go/textutil/strings_test.go @@ -122,6 +122,64 @@ func TestSingleWordCamel(t *testing.T) { } } +func TestPascalCase(t *testing.T) { + tt := []struct { + word string + expect string + }{ + { + word: "", + expect: "", + }, + { + word: "_", + expect: "", + }, + { + word: "_a", + expect: "A", + }, + { + word: "A", + expect: "A", + }, + { + word: "mysql", + expect: "Mysql", + }, + { + word: "mySQL", + expect: "Mysql", + }, + { + word: "foo-bar", + expect: "FooBar", + }, + { + word: "mysql-server", + expect: "MysqlServer", + }, + { + word: "io_util", + expect: "IoUtil", + }, + { + word: "there and back again", + expect: "ThereAndBackAgain", + }, + { + word: "combine_all_OF the\tabove", + expect: "CombineAllOfTheAbove", + }, + } + for _, tc := range tt { + t.Run(tc.word, func(t *testing.T) { + pascal := PascalCase(tc.word) + assert.Equal(t, tc.expect, pascal) + }) + } +} + func TestValueIsSimulatedNull(t *testing.T) { tt := []struct { name string diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 72c1d8f6658..d7435705a8a 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -47,6 +47,7 @@ import ( "vitess.io/vitess/config" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/osutil" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" @@ -1332,18 +1333,7 @@ func hostMetrics(ctx context.Context, cnf *Mycnf) (*mysqlctlpb.HostMetricsRespon _ = func() error { metric := newMetric("loadavg") - if runtime.GOOS != "linux" { - return withError(metric, fmt.Errorf("loadavg metric is only available on Linux")) - } - content, err := os.ReadFile("/proc/loadavg") - if err != nil { - return withError(metric, err) - } - fields := strings.Fields(string(content)) - if len(fields) == 0 { - return withError(metric, fmt.Errorf("unexpected /proc/loadavg content")) - } - loadAvg, err := strconv.ParseFloat(fields[0], 64) + loadAvg, err := osutil.LoadAvg() if err != nil { return withError(metric, err) } diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_cache.go b/go/vt/vttablet/tabletserver/throttle/base/metric_cache.go index 8695cb83229..faad65ca79e 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/metric_cache.go +++ b/go/vt/vttablet/tabletserver/throttle/base/metric_cache.go @@ -49,6 +49,7 @@ import ( "github.com/patrickmn/go-cache" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/vttablet/tmclient" ) // MetricsQueryType indicates the type of metrics query on MySQL backend. See following. @@ -142,13 +143,13 @@ func (metric *ThrottleMetric) WithError(err error) *ThrottleMetric { // ReadThrottleMetrics returns a metric for the given probe. Either by explicit query // or via SHOW REPLICA STATUS -func ReadThrottleMetrics(ctx context.Context, probe *Probe, metricsFunc func(context.Context) ThrottleMetrics) ThrottleMetrics { +func ReadThrottleMetrics(ctx context.Context, probe *Probe, tmClient tmclient.TabletManagerClient, metricsFunc func(context.Context, tmclient.TabletManagerClient) ThrottleMetrics) ThrottleMetrics { if metrics := getCachedThrottleMetrics(probe); metrics != nil { return metrics } started := time.Now() - throttleMetrics := metricsFunc(ctx) + throttleMetrics := metricsFunc(ctx, tmClient) go func(metrics ThrottleMetrics, started time.Time) { stats.GetOrNewGauge("ThrottlerProbesLatency", "probes latency").Set(time.Since(started).Nanoseconds()) diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_name.go b/go/vt/vttablet/tabletserver/throttle/base/metric_name.go index 98e1288fb23..607192b9c0c 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/metric_name.go +++ b/go/vt/vttablet/tabletserver/throttle/base/metric_name.go @@ -60,11 +60,13 @@ func (names MetricNames) Unique() MetricNames { } const ( - DefaultMetricName MetricName = "default" - LagMetricName MetricName = "lag" - ThreadsRunningMetricName MetricName = "threads_running" - CustomMetricName MetricName = "custom" - LoadAvgMetricName MetricName = "loadavg" + DefaultMetricName MetricName = "default" + LagMetricName MetricName = "lag" + ThreadsRunningMetricName MetricName = "threads_running" + CustomMetricName MetricName = "custom" + LoadAvgMetricName MetricName = "loadavg" + MysqldLoadAvgMetricName MetricName = "mysqld-loadavg" + MysqldDatadirUsedRatioMetricName MetricName = "mysqld-datadir-used-ratio" ) func (metric MetricName) DefaultScope() Scope { diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_name_test.go b/go/vt/vttablet/tabletserver/throttle/base/metric_name_test.go index 9867ca18db3..ffd7f674cc2 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/metric_name_test.go +++ b/go/vt/vttablet/tabletserver/throttle/base/metric_name_test.go @@ -21,6 +21,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/textutil" ) func TestAggregateName(t *testing.T) { @@ -238,4 +241,25 @@ func TestKnownMetricNames(t *testing.T) { assert.Contains(t, KnownMetricNames, LoadAvgMetricName) assert.Contains(t, KnownMetricNames, CustomMetricName) assert.Contains(t, KnownMetricNames, DefaultMetricName) + assert.Contains(t, KnownMetricNames, MysqldLoadAvgMetricName) + assert.Contains(t, KnownMetricNames, MysqldDatadirUsedRatioMetricName) +} + +func TestSingleWordCamelKnownMetricNames(t *testing.T) { + expectCases := map[MetricName]string{ + LagMetricName: "Lag", + ThreadsRunningMetricName: "ThreadsRunning", + LoadAvgMetricName: "Loadavg", + CustomMetricName: "Custom", + DefaultMetricName: "Default", + MysqldLoadAvgMetricName: "MysqldLoadavg", + MysqldDatadirUsedRatioMetricName: "MysqldDatadirUsedRatio", + } + for _, metricName := range KnownMetricNames { + t.Run(metricName.String(), func(t *testing.T) { + expect, ok := expectCases[metricName] + require.True(t, ok) + assert.Equal(t, expect, textutil.PascalCase(metricName.String())) + }) + } } diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric.go index 220dfa6bf60..88fbe2bdd13 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric.go @@ -21,15 +21,24 @@ import ( "fmt" "strconv" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/vt/vttablet/tmclient" ) +type SelfMetricReadParams struct { + Throttler metricsPublisher + Conn *connpool.Conn + TmClient tmclient.TabletManagerClient + TabletInfo *topo.TabletInfo +} + type SelfMetric interface { Name() MetricName DefaultScope() Scope DefaultThreshold() float64 RequiresConn() bool - Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric + Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric } var ( diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go index 585e63ea285..88f789e5dcd 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go @@ -18,8 +18,6 @@ package base import ( "context" - - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" ) var _ SelfMetric = registerSelfMetric(&CustomQuerySelfMetric{}) @@ -43,6 +41,6 @@ func (m *CustomQuerySelfMetric) RequiresConn() bool { return true } -func (m *CustomQuerySelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { - return ReadSelfMySQLThrottleMetric(ctx, conn, throttler.GetCustomMetricsQuery()) +func (m *CustomQuerySelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric { + return ReadSelfMySQLThrottleMetric(ctx, params.Conn, params.Throttler.GetCustomMetricsQuery()) } diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go index 8bce295da7c..97309fa6ea9 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go @@ -19,8 +19,6 @@ package base import ( "context" "fmt" - - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" ) var _ SelfMetric = registerSelfMetric(&DefaultSelfMetric{}) @@ -44,7 +42,7 @@ func (m *DefaultSelfMetric) RequiresConn() bool { return false } -func (m *DefaultSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { +func (m *DefaultSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric { return &ThrottleMetric{ Err: fmt.Errorf("unexpected direct call to DefaultSelfMetric.Read"), } diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go index dc25ee5622a..3d0e4beebe1 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go @@ -23,7 +23,6 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" ) var ( @@ -65,6 +64,6 @@ func (m *LagSelfMetric) RequiresConn() bool { return true } -func (m *LagSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { - return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery()) +func (m *LagSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric { + return ReadSelfMySQLThrottleMetric(ctx, params.Conn, m.GetQuery()) } diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_loadavg.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_loadavg.go index 40a2878421a..2d880169020 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_loadavg.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_loadavg.go @@ -18,20 +18,16 @@ package base import ( "context" - "fmt" - "os" "runtime" - "strconv" - "strings" + "sync/atomic" + "time" - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/osutil" ) var ( - loadavgOnlyAvailableOnLinuxMetric = &ThrottleMetric{ - Scope: SelfScope, - Err: fmt.Errorf("loadavg metric is only available on Linux"), - } + cachedLoadAvgMetric atomic.Pointer[ThrottleMetric] + loadAvgCacheDuration = 1 * time.Second ) var _ SelfMetric = registerSelfMetric(&LoadAvgSelfMetric{}) @@ -55,27 +51,26 @@ func (m *LoadAvgSelfMetric) RequiresConn() bool { return false } -func (m *LoadAvgSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { - if runtime.GOOS != "linux" { - return loadavgOnlyAvailableOnLinuxMetric +func (m *LoadAvgSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric { + // This function will be called sequentially, and therefore does not need strong mutex protection. Still, we use atomics + // to ensure correctness in case an external goroutine tries to read the metric concurrently. + metric := cachedLoadAvgMetric.Load() + if metric != nil { + return metric } - metric := &ThrottleMetric{ + metric = &ThrottleMetric{ Scope: SelfScope, } - { - content, err := os.ReadFile("/proc/loadavg") - if err != nil { - return metric.WithError(err) - } - fields := strings.Fields(string(content)) - if len(fields) == 0 { - return metric.WithError(fmt.Errorf("unexpected /proc/loadavg content")) - } - loadAvg, err := strconv.ParseFloat(fields[0], 64) - if err != nil { - return metric.WithError(err) - } - metric.Value = loadAvg / float64(runtime.NumCPU()) + val, err := osutil.LoadAvg() + if err != nil { + return metric.WithError(err) } + metric.Value = val / float64(runtime.NumCPU()) + + cachedLoadAvgMetric.Store(metric) + time.AfterFunc(loadAvgCacheDuration, func() { + cachedLoadAvgMetric.Store(nil) + }) + return metric } diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_mysqld.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_mysqld.go new file mode 100644 index 00000000000..321837d86b4 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_mysqld.go @@ -0,0 +1,156 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + "vitess.io/vitess/go/timer" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" +) + +var ( + mysqlHostMetricsRpcTimeout = 5 * time.Second + mysqlHostMetricsRateLimit = 10 * time.Second + mysqlHostMetricsRateLimiter atomic.Pointer[timer.RateLimiter] + lastMySQLHostMetricsResponse atomic.Pointer[tabletmanagerdatapb.MysqlHostMetricsResponse] +) + +// getMysqlMetricsRateLimiter returns a rate limiter that is active until the given context is cancelled. +// This function will be called sequentially, but nonetheless it offers _some_ concurrent safety. Namely, +// that a created rate limiter is guaranteed to be cleaned up +func getMysqlMetricsRateLimiter(ctx context.Context, rateLimit time.Duration) *timer.RateLimiter { + rateLimiter := mysqlHostMetricsRateLimiter.Load() + if rateLimiter == nil { + rateLimiter = timer.NewRateLimiter(rateLimit) + go func() { + defer mysqlHostMetricsRateLimiter.Store(nil) + defer rateLimiter.Stop() + <-ctx.Done() + }() + mysqlHostMetricsRateLimiter.Store(rateLimiter) + } + return rateLimiter +} + +// readMysqlHostMetrics reads MySQL host metrics sporadically from the tablet manager (which in turn reads +// them from mysql deamon). The metrics are then cached, whether successful or not. +// This idea is that is is very wasteful to read these metrics for every single query. E.g. right now the throttler +// can issue 4 reads per second, which is wasteful to go through two RPCs to get the disk space usage for example. Even the load +// average on the MySQL server is not that susceptible to change. +func readMysqlHostMetrics(ctx context.Context, params *SelfMetricReadParams) error { + if params.TmClient == nil { + return fmt.Errorf("tmClient is nil") + } + if params.TabletInfo == nil { + return fmt.Errorf("tabletInfo is nil") + } + rateLimiter := getMysqlMetricsRateLimiter(ctx, mysqlHostMetricsRateLimit) + err := rateLimiter.Do(func() error { + ctx, cancel := context.WithTimeout(ctx, mysqlHostMetricsRpcTimeout) + defer cancel() + + resp, err := params.TmClient.MysqlHostMetrics(ctx, params.TabletInfo.Tablet, &tabletmanagerdatapb.MysqlHostMetricsRequest{}) + if err != nil { + return err + } + lastMySQLHostMetricsResponse.Store(resp) + return nil + }) + return err +} + +// getMysqlHostMetric gets a metric from the last read MySQL host metrics. The metric will either be directly read from +// tablet manager (which then reads it from the mysql deamon), or from the cache. +func getMysqlHostMetric(ctx context.Context, params *SelfMetricReadParams, mysqlHostMetricName string) *ThrottleMetric { + metric := &ThrottleMetric{ + Scope: SelfScope, + } + if err := readMysqlHostMetrics(ctx, params); err != nil { + return metric.WithError(err) + } + resp := lastMySQLHostMetricsResponse.Load() + if resp == nil { + return metric.WithError(ErrNoResultYet) + } + mysqlMetric := resp.HostMetrics.Metrics[mysqlHostMetricName] + if mysqlMetric == nil { + return metric.WithError(ErrNoSuchMetric) + } + metric.Value = mysqlMetric.Value + if mysqlMetric.Error != nil { + metric.Err = errors.New(mysqlMetric.Error.Message) + } + return metric +} + +var _ SelfMetric = registerSelfMetric(&MysqldLoadAvgSelfMetric{}) +var _ SelfMetric = registerSelfMetric(&MysqldDatadirUsedRatioSelfMetric{}) + +// MysqldLoadAvgSelfMetric stands for the load average per cpu, on the MySQL host. +type MysqldLoadAvgSelfMetric struct { +} + +func (m *MysqldLoadAvgSelfMetric) Name() MetricName { + return MysqldLoadAvgMetricName +} + +func (m *MysqldLoadAvgSelfMetric) DefaultScope() Scope { + return SelfScope +} + +func (m *MysqldLoadAvgSelfMetric) DefaultThreshold() float64 { + return 1.0 +} + +func (m *MysqldLoadAvgSelfMetric) RequiresConn() bool { + return false +} + +func (m *MysqldLoadAvgSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric { + return getMysqlHostMetric(ctx, params, "loadavg") +} + +// MysqldDatadirUsedRatioSelfMetric stands for the disk space usage of the mount where MySQL's datadir is located. +// Range: 0.0 (empty) - 1.0 (full) +type MysqldDatadirUsedRatioSelfMetric struct { +} + +func (m *MysqldDatadirUsedRatioSelfMetric) Name() MetricName { + return MysqldDatadirUsedRatioMetricName +} + +func (m *MysqldDatadirUsedRatioSelfMetric) DefaultScope() Scope { + return SelfScope +} + +func (m *MysqldDatadirUsedRatioSelfMetric) DefaultThreshold() float64 { + return 0.98 +} + +func (m *MysqldDatadirUsedRatioSelfMetric) RequiresConn() bool { + return false +} + +func (m *MysqldDatadirUsedRatioSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric { + return getMysqlHostMetric(ctx, params, "datadir-used-ratio") +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_mysqld_test.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_mysqld_test.go new file mode 100644 index 00000000000..39d3f3f5ec2 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_mysqld_test.go @@ -0,0 +1,72 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestGetMysqlMetricsRateLimiter(t *testing.T) { + rateLimit := 10 * time.Millisecond + for i := range 3 { + testName := fmt.Sprintf("iteration %d", i) + t.Run(testName, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + { + rateLimiter := mysqlHostMetricsRateLimiter.Load() + assert.Nil(t, rateLimiter) + } + rateLimiter := getMysqlMetricsRateLimiter(ctx, rateLimit) + assert.NotNil(t, rateLimiter) + for range 5 { + r := getMysqlMetricsRateLimiter(ctx, rateLimit) + // Returning the same rate limiter + assert.Equal(t, rateLimiter, r) + } + val := 0 + incr := func() error { + val++ + return nil + } + for range 10 { + rateLimiter.Do(incr) + time.Sleep(2 * rateLimit) + } + assert.EqualValues(t, 10, val) + cancel() + // There can be a race condition where the rate limiter still emits one final tick after the context is cancelled. + // So we wait enough time to ensure that tick is "wasted". + time.Sleep(2 * rateLimit) + // Now that the rate limited was stopped (we invoked `cancel()`), its `Do()` should not invoke the function anymore. + for range 7 { + rateLimiter.Do(incr) + time.Sleep(time.Millisecond) + } + assert.EqualValues(t, 10, val) // Same "10" value as before. + { + rateLimiter := mysqlHostMetricsRateLimiter.Load() + assert.Nil(t, rateLimiter) + } + }) + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go index 08f7d408d1c..cb59547a768 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go @@ -18,8 +18,6 @@ package base import ( "context" - - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" ) var ( @@ -47,6 +45,6 @@ func (m *ThreadsRunningSelfMetric) RequiresConn() bool { return true } -func (m *ThreadsRunningSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { - return ReadSelfMySQLThrottleMetric(ctx, conn, threadsRunningMetricQuery) +func (m *ThreadsRunningSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric { + return ReadSelfMySQLThrottleMetric(ctx, params.Conn, threadsRunningMetricQuery) } diff --git a/go/vt/vttablet/tabletserver/throttle/base/throttler_metrics_publisher.go b/go/vt/vttablet/tabletserver/throttle/base/throttler_metrics_publisher.go index 1d2d4d0652c..10020af27e6 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/throttler_metrics_publisher.go +++ b/go/vt/vttablet/tabletserver/throttle/base/throttler_metrics_publisher.go @@ -16,8 +16,8 @@ limitations under the License. package base -// ThrottlerMetricsPublisher is implemented by throttler.Throttler and is used by SelfMetric +// metricsPublisher is implemented by throttler.Throttler and is used by SelfMetric // implementations to query the throttler. -type ThrottlerMetricsPublisher interface { +type metricsPublisher interface { GetCustomMetricsQuery() string } diff --git a/go/vt/vttablet/tabletserver/throttle/check.go b/go/vt/vttablet/tabletserver/throttle/check.go index ccdfcb2ce23..d7f43d85e9d 100644 --- a/go/vt/vttablet/tabletserver/throttle/check.go +++ b/go/vt/vttablet/tabletserver/throttle/check.go @@ -188,9 +188,9 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, scope ba // Out of abundance of caution, we will protect against such a scenario. return } - stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheck%s%sTotal", textutil.SingleWordCamel(metricScope.String()), textutil.SingleWordCamel(metricName.String())), "").Add(1) + stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheck%s%sTotal", textutil.PascalCase(metricScope.String()), textutil.PascalCase(metricName.String())), "").Add(1) if !metricCheckResult.IsOK() { - stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheck%s%sError", textutil.SingleWordCamel(metricScope.String()), textutil.SingleWordCamel(metricName.String())), "").Add(1) + stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheck%s%sError", textutil.PascalCase(metricScope.String()), textutil.PascalCase(metricName.String())), "").Add(1) } }(metricCheckResult) } @@ -249,7 +249,7 @@ func (check *ThrottlerCheck) localCheck(ctx context.Context, aggregatedMetricNam check.throttler.markMetricHealthy(aggregatedMetricName) } if timeSinceHealthy, found := check.throttler.timeSinceMetricHealthy(aggregatedMetricName); found { - go stats.GetOrNewGauge(fmt.Sprintf("ThrottlerCheck%sSecondsSinceHealthy", textutil.SingleWordCamel(scope.String())), fmt.Sprintf("seconds since last healthy check for %v", scope)).Set(int64(timeSinceHealthy.Seconds())) + go stats.GetOrNewGauge(fmt.Sprintf("ThrottlerCheck%sSecondsSinceHealthy", textutil.PascalCase(scope.String())), fmt.Sprintf("seconds since last healthy check for %v", scope)).Set(int64(timeSinceHealthy.Seconds())) } return checkResult @@ -261,7 +261,7 @@ func (check *ThrottlerCheck) reportAggregated(aggregatedMetricName string, metri return } if value, err := metricResult.Get(); err == nil { - stats.GetOrNewGaugeFloat64(fmt.Sprintf("ThrottlerAggregated%s%s", textutil.SingleWordCamel(scope.String()), textutil.SingleWordCamel(metricName.String())), fmt.Sprintf("aggregated value for %v", scope)).Set(value) + stats.GetOrNewGaugeFloat64(fmt.Sprintf("ThrottlerAggregated%s%s", textutil.PascalCase(scope.String()), textutil.PascalCase(metricName.String())), fmt.Sprintf("aggregated value for %v", scope)).Set(value) } } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index af7f59abb7e..839ba9d43b8 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -95,7 +95,6 @@ const ( DefaultThrottleRatio = 1.0 defaultReplicationLagQuery = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from %s.heartbeat" - threadsRunningQuery = "show global status like 'threads_running'" inventoryPrefix = "inventory/" throttlerConfigPrefix = "config/" @@ -137,6 +136,7 @@ type Throttler struct { keyspace string shard string tabletAlias *topodatapb.TabletAlias + tabletInfo atomic.Pointer[topo.TabletInfo] check *ThrottlerCheck isEnabled atomic.Bool @@ -190,7 +190,7 @@ type Throttler struct { cancelEnableContext context.CancelFunc throttledAppsMutex sync.Mutex - readSelfThrottleMetrics func(context.Context) base.ThrottleMetrics // overwritten by unit test + readSelfThrottleMetrics func(context.Context, tmclient.TabletManagerClient) base.ThrottleMetrics // overwritten by unit test } // ThrottlerStatus published some status values from the throttler @@ -262,8 +262,8 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv } throttler.StoreMetricsThreshold(base.RegisteredSelfMetrics[base.LagMetricName].DefaultThreshold()) - throttler.readSelfThrottleMetrics = func(ctx context.Context) base.ThrottleMetrics { - return throttler.readSelfThrottleMetricsInternal(ctx) + throttler.readSelfThrottleMetrics = func(ctx context.Context, tmClient tmclient.TabletManagerClient) base.ThrottleMetrics { + return throttler.readSelfThrottleMetricsInternal(ctx, tmClient) } return throttler } @@ -338,6 +338,15 @@ func (throttler *Throttler) initConfig() { // readThrottlerConfig proactively reads the throttler's config from SrvKeyspace in local topo func (throttler *Throttler) readThrottlerConfig(ctx context.Context) (*topodatapb.ThrottlerConfig, error) { + // since we're reading from topo, let's seize this opportunity to read table info as well + if throttler.tabletInfo.Load() == nil { + if ti, err := throttler.ts.GetTablet(ctx, throttler.tabletAlias); err == nil { + throttler.tabletInfo.Store(ti) + } else { + log.Errorf("Throttler: error reading tablet info: %v", err) + } + } + srvks, err := throttler.ts.GetSrvKeyspace(ctx, throttler.tabletAlias.Cell, throttler.keyspace) if err != nil { return nil, err @@ -804,7 +813,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { if throttler.IsOpen() { // frequent // Always collect self metrics: - throttler.collectSelfMetrics(ctx) + throttler.collectSelfMetrics(ctx, tmClient) if !throttler.isDormant() { throttler.collectShardMetrics(ctx, tmClient) } @@ -869,7 +878,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { }() } -func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, tmClient tmclient.TabletManagerClient, probe *base.Probe) (probeFunc func(context.Context) base.ThrottleMetrics) { +func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, probe *base.Probe) (probeFunc func(context.Context, tmclient.TabletManagerClient) base.ThrottleMetrics) { metricsWithError := func(err error) base.ThrottleMetrics { metrics := base.ThrottleMetrics{} for _, metricName := range base.KnownMetricNames { @@ -882,7 +891,7 @@ func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, tmClie } return metrics } - return func(ctx context.Context) base.ThrottleMetrics { + return func(ctx context.Context, tmClient tmclient.TabletManagerClient) base.ThrottleMetrics { // Some reasonable timeout, to ensure we release connections even if they're hanging (otherwise grpc-go keeps polling those connections forever) ctx, cancel := context.WithTimeout(ctx, 4*activeCollectInterval) defer cancel() @@ -940,7 +949,7 @@ func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, tmClie // readSelfThrottleMetricsInternal rreads all registsred self metrics on this tablet (or backend MySQL server). // This is the actual place where metrics are read, to be later aggregated and/or propagated to other tablets. -func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context) base.ThrottleMetrics { +func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context, tmClient tmclient.TabletManagerClient) base.ThrottleMetrics { result := make(base.ThrottleMetrics, len(base.RegisteredSelfMetrics)) writeMetric := func(metric *base.ThrottleMetric) { select { @@ -950,15 +959,20 @@ func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context) } } readMetric := func(selfMetric base.SelfMetric) *base.ThrottleMetric { - if !selfMetric.RequiresConn() { - return selfMetric.Read(ctx, throttler, nil) + params := &base.SelfMetricReadParams{ + Throttler: throttler, + TmClient: tmClient, + TabletInfo: throttler.tabletInfo.Load(), } - conn, err := throttler.pool.Get(ctx, nil) - if err != nil { - return &base.ThrottleMetric{Err: err} + if selfMetric.RequiresConn() { + conn, err := throttler.pool.Get(ctx, nil) + if err != nil { + return &base.ThrottleMetric{Err: err} + } + defer conn.Recycle() + params.Conn = conn.Conn } - defer conn.Recycle() - return selfMetric.Read(ctx, throttler, conn.Conn) + return selfMetric.Read(ctx, params) } for metricName, selfMetric := range base.RegisteredSelfMetrics { if metricName == base.DefaultMetricName { @@ -975,7 +989,7 @@ func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context) return result } -func (throttler *Throttler) collectSelfMetrics(ctx context.Context) { +func (throttler *Throttler) collectSelfMetrics(ctx context.Context, tmClient tmclient.TabletManagerClient) { probe := throttler.inventory.ClustersProbes[throttler.tabletAliasString()] if probe == nil { // probe not created yet @@ -990,7 +1004,7 @@ func (throttler *Throttler) collectSelfMetrics(ctx context.Context) { defer atomic.StoreInt64(&probe.QueryInProgress, 0) // Throttler is probing its own tablet's metrics: - _ = base.ReadThrottleMetrics(ctx, probe, throttler.readSelfThrottleMetrics) + _ = base.ReadThrottleMetrics(ctx, probe, tmClient, throttler.readSelfThrottleMetrics) }() } @@ -1011,9 +1025,9 @@ func (throttler *Throttler) collectShardMetrics(ctx context.Context, tmClient tm defer atomic.StoreInt64(&probe.QueryInProgress, 0) // Throttler probing other tablets: - throttleMetricFunc := throttler.generateTabletProbeFunction(base.ShardScope, tmClient, probe) + throttleMetricFunc := throttler.generateTabletProbeFunction(base.ShardScope, probe) - throttleMetrics := base.ReadThrottleMetrics(ctx, probe, throttleMetricFunc) + throttleMetrics := base.ReadThrottleMetrics(ctx, probe, tmClient, throttleMetricFunc) for _, metric := range throttleMetrics { select { case <-ctx.Done(): diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index fd7921899da..0a1162b02d3 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -71,6 +71,18 @@ var ( Value: 2.718, Err: nil, }, + base.MysqldLoadAvgMetricName: &base.ThrottleMetric{ + Scope: base.SelfScope, + Alias: "", + Value: 0.3311, + Err: nil, + }, + base.MysqldDatadirUsedRatioMetricName: &base.ThrottleMetric{ + Scope: base.SelfScope, + Alias: "", + Value: 0.85, + Err: nil, + }, } replicaMetrics = map[string]*MetricResult{ base.LagMetricName.String(): { @@ -93,6 +105,16 @@ var ( ResponseCode: tabletmanagerdatapb.CheckThrottlerResponseCode_OK, Value: 5.1, }, + base.MysqldLoadAvgMetricName.String(): { + StatusCode: http.StatusOK, + ResponseCode: tabletmanagerdatapb.CheckThrottlerResponseCode_OK, + Value: 0.2211, + }, + base.MysqldDatadirUsedRatioMetricName.String(): { + StatusCode: http.StatusOK, + ResponseCode: tabletmanagerdatapb.CheckThrottlerResponseCode_OK, + Value: 0.87, + }, } nonPrimaryTabletType atomic.Int32 ) @@ -283,7 +305,7 @@ func newTestThrottler() *Throttler { throttler.recentCheckDormantDiff = int64(throttler.dormantPeriod / recentCheckRateLimiterInterval) throttler.recentCheckDiff = int64(3 * time.Second / recentCheckRateLimiterInterval) - throttler.readSelfThrottleMetrics = func(ctx context.Context) base.ThrottleMetrics { + throttler.readSelfThrottleMetrics = func(ctx context.Context, tmClient tmclient.TabletManagerClient) base.ThrottleMetrics { for _, metric := range selfMetrics { go func() { select { @@ -1827,10 +1849,12 @@ func TestChecks(t *testing.T) { assert.Equal(t, testAppName.String(), checkResult.AppName) assert.Equal(t, len(base.KnownMetricNames), len(checkResult.Metrics)) - assert.EqualValues(t, 0.3, checkResult.Metrics[base.LagMetricName.String()].Value) // self lag value, because flags.Scope is set - assert.EqualValues(t, 26, checkResult.Metrics[base.ThreadsRunningMetricName.String()].Value) // self value, because flags.Scope is set - assert.EqualValues(t, 17, checkResult.Metrics[base.CustomMetricName.String()].Value) // self value, because flags.Scope is set - assert.EqualValues(t, 2.718, checkResult.Metrics[base.LoadAvgMetricName.String()].Value) // self value, because flags.Scope is set + assert.EqualValues(t, 0.3, checkResult.Metrics[base.LagMetricName.String()].Value) // self lag value, because flags.Scope is set + assert.EqualValues(t, 26, checkResult.Metrics[base.ThreadsRunningMetricName.String()].Value) // self value, because flags.Scope is set + assert.EqualValues(t, 17, checkResult.Metrics[base.CustomMetricName.String()].Value) // self value, because flags.Scope is set + assert.EqualValues(t, 2.718, checkResult.Metrics[base.LoadAvgMetricName.String()].Value) // self value, because flags.Scope is set + assert.EqualValues(t, 0.3311, checkResult.Metrics[base.MysqldLoadAvgMetricName.String()].Value) // self value, because flags.Scope is set + assert.EqualValues(t, 0.85, checkResult.Metrics[base.MysqldDatadirUsedRatioMetricName.String()].Value) // self value, because flags.Scope is set for _, metric := range checkResult.Metrics { assert.EqualValues(t, base.SelfScope.String(), metric.Scope) } @@ -1886,10 +1910,12 @@ func TestChecks(t *testing.T) { assert.Equal(t, testAppName.String(), checkResult.AppName) assert.Equal(t, len(base.KnownMetricNames), len(checkResult.Metrics)) - assert.EqualValues(t, 0.9, checkResult.Metrics[base.LagMetricName.String()].Value) // shard lag value, because flags.Scope is set - assert.EqualValues(t, 26, checkResult.Metrics[base.ThreadsRunningMetricName.String()].Value) // shard value, because flags.Scope is set - assert.EqualValues(t, 17, checkResult.Metrics[base.CustomMetricName.String()].Value) // shard value, because flags.Scope is set - assert.EqualValues(t, 5.1, checkResult.Metrics[base.LoadAvgMetricName.String()].Value) // shard value, because flags.Scope is set + assert.EqualValues(t, 0.9, checkResult.Metrics[base.LagMetricName.String()].Value) // shard lag value, because flags.Scope is set + assert.EqualValues(t, 26, checkResult.Metrics[base.ThreadsRunningMetricName.String()].Value) // shard value, because flags.Scope is set + assert.EqualValues(t, 17, checkResult.Metrics[base.CustomMetricName.String()].Value) // shard value, because flags.Scope is set + assert.EqualValues(t, 5.1, checkResult.Metrics[base.LoadAvgMetricName.String()].Value) // shard value, because flags.Scope is set + assert.EqualValues(t, 0.3311, checkResult.Metrics[base.MysqldLoadAvgMetricName.String()].Value) // shard value, because flags.Scope is set + assert.EqualValues(t, 0.87, checkResult.Metrics[base.MysqldDatadirUsedRatioMetricName.String()].Value) // shard value, because flags.Scope is set for _, metric := range checkResult.Metrics { assert.EqualValues(t, base.ShardScope.String(), metric.Scope) } @@ -1918,14 +1944,18 @@ func TestChecks(t *testing.T) { assert.ErrorIs(t, checkResult.Error, base.ErrThresholdExceeded) assert.Equal(t, len(base.KnownMetricNames), len(checkResult.Metrics)) - assert.EqualValues(t, 0.9, checkResult.Metrics[base.LagMetricName.String()].Value) // shard lag value, because "shard" is the default scope for lag - assert.EqualValues(t, 26, checkResult.Metrics[base.ThreadsRunningMetricName.String()].Value) // self value, because "self" is the default scope for threads_running - assert.EqualValues(t, 17, checkResult.Metrics[base.CustomMetricName.String()].Value) // self value, because "self" is the default scope for custom - assert.EqualValues(t, 2.718, checkResult.Metrics[base.LoadAvgMetricName.String()].Value) // self value, because "self" is the default scope for loadavg + assert.EqualValues(t, 0.9, checkResult.Metrics[base.LagMetricName.String()].Value) // shard lag value, because "shard" is the default scope for lag + assert.EqualValues(t, 26, checkResult.Metrics[base.ThreadsRunningMetricName.String()].Value) // self value, because "self" is the default scope for threads_running + assert.EqualValues(t, 17, checkResult.Metrics[base.CustomMetricName.String()].Value) // self value, because "self" is the default scope for custom + assert.EqualValues(t, 2.718, checkResult.Metrics[base.LoadAvgMetricName.String()].Value) // self value, because "self" is the default scope for loadavg + assert.EqualValues(t, 0.3311, checkResult.Metrics[base.MysqldLoadAvgMetricName.String()].Value) // self value, because "self" is the default scope for loadavg + assert.EqualValues(t, 0.85, checkResult.Metrics[base.MysqldDatadirUsedRatioMetricName.String()].Value) // self value, because "self" is the default scope for loadavg assert.EqualValues(t, base.ShardScope.String(), checkResult.Metrics[base.LagMetricName.String()].Scope) assert.EqualValues(t, base.SelfScope.String(), checkResult.Metrics[base.ThreadsRunningMetricName.String()].Scope) assert.EqualValues(t, base.SelfScope.String(), checkResult.Metrics[base.CustomMetricName.String()].Scope) assert.EqualValues(t, base.SelfScope.String(), checkResult.Metrics[base.LoadAvgMetricName.String()].Scope) + assert.EqualValues(t, base.SelfScope.String(), checkResult.Metrics[base.MysqldLoadAvgMetricName.String()].Scope) + assert.EqualValues(t, base.SelfScope.String(), checkResult.Metrics[base.MysqldDatadirUsedRatioMetricName.String()].Scope) }) }) t.Run("checks, defined scope masks explicit scope metrics", func(t *testing.T) { @@ -1939,6 +1969,8 @@ func TestChecks(t *testing.T) { base.MetricName("self/threads_running"), base.MetricName("custom"), base.MetricName("shard/loadavg"), + base.MetricName("shard/mysqld-loadavg"), + base.MetricName("self/mysqld-datadir-used-ratio"), base.MetricName("default"), } checkResult := throttler.Check(ctx, testAppName.String(), metricNames, flags) @@ -1950,10 +1982,12 @@ func TestChecks(t *testing.T) { assert.ErrorIs(t, checkResult.Error, base.ErrThresholdExceeded) assert.Equal(t, len(metricNames), len(checkResult.Metrics)) - assert.EqualValues(t, 0.9, checkResult.Metrics[base.LagMetricName.String()].Value) // shard lag value, even though scope name is in metric name - assert.EqualValues(t, 26, checkResult.Metrics[base.ThreadsRunningMetricName.String()].Value) // shard value, even though scope name is in metric name - assert.EqualValues(t, 17, checkResult.Metrics[base.CustomMetricName.String()].Value) // shard value because flags.Scope is set - assert.EqualValues(t, 5.1, checkResult.Metrics[base.LoadAvgMetricName.String()].Value) // shard value, not because scope name is in metric name but because flags.Scope is set + assert.EqualValues(t, 0.9, checkResult.Metrics[base.LagMetricName.String()].Value) // shard lag value, even though scope name is in metric name + assert.EqualValues(t, 26, checkResult.Metrics[base.ThreadsRunningMetricName.String()].Value) // shard value, even though scope name is in metric name + assert.EqualValues(t, 17, checkResult.Metrics[base.CustomMetricName.String()].Value) // shard value because flags.Scope is set + assert.EqualValues(t, 5.1, checkResult.Metrics[base.LoadAvgMetricName.String()].Value) // shard value, not because scope name is in metric name but because flags.Scope is set + assert.EqualValues(t, 0.3311, checkResult.Metrics[base.MysqldLoadAvgMetricName.String()].Value) // shard value, not because scope name is in metric name but because flags.Scope is set + assert.EqualValues(t, 0.87, checkResult.Metrics[base.MysqldDatadirUsedRatioMetricName.String()].Value) // shard value, even though scope name is in metric name for _, metric := range checkResult.Metrics { assert.EqualValues(t, base.ShardScope.String(), metric.Scope) } @@ -2222,8 +2256,12 @@ func TestReplica(t *testing.T) { base.DefaultMetricName: assert.Error(t, metricResult.Error, "metricName=%v, value=%v, threshold=%v", metricName, metricResult.Value, metricResult.Threshold) assert.ErrorIs(t, metricResult.Error, base.ErrThresholdExceeded) - case base.ThreadsRunningMetricName: + case base.ThreadsRunningMetricName, + base.MysqldLoadAvgMetricName, + base.MysqldDatadirUsedRatioMetricName: assert.NoError(t, metricResult.Error, "metricName=%v, value=%v, threshold=%v", metricName, metricResult.Value, metricResult.Threshold) + default: + assert.Fail(t, "unexpected metric", "name=%v", metricName) } } })