Skip to content

Commit

Permalink
sqlstats: add new MaybeFlushWithDrainer method to persistedSqlStats
Browse files Browse the repository at this point in the history
Adds a new SSDrainer interface that PersistedSQLStats uses to flush
sql stats.

Instead of calling ConsumeStats on sslocal.SQLStats, persistedsqlstats
will use a provided SSDrainer to either DrainStats or Reset.

The ConsumeStats method on sslocal.SQLStats has been removed and its
logic has been redistributed to persistedsqlstats, as that was the
only consumer of the method. Since all the logic now lives in
persistedsqlstats, the flush related functions can be called
directly instead of relying on nested callbacks.

In addition to the new interface, this commit updates the flush
logic to upsert multiple rows in a single upsert statement,
determined by `sql.stats.flush.batch_size`.

Epic: CRDB-45771
Release note: None
  • Loading branch information
kyle-a-wong committed Feb 21, 2025
1 parent a610010 commit afee562
Show file tree
Hide file tree
Showing 17 changed files with 421 additions and 385 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"//pkg/server/serverpb",
"//pkg/sql",
"//pkg/sql/contention",
"//pkg/sql/sqlstats/persistedsqlstats",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/httputil",
Expand Down
12 changes: 10 additions & 2 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,16 @@ func testResetSQLStatsRPCForTenant(
}

if flushed {
testTenant.TenantSQLStats().MaybeFlush(ctx, testTenant.GetTenant().AppStopper())
controlCluster.TenantSQLStats(serverccl.RandomServer).MaybeFlush(ctx, controlCluster.Tenant(0).GetTenant().AppStopper())
testTenantServer := testTenant.TenantSQLServer()
testTenantServer.GetSQLStatsProvider().MaybeFlush(
ctx,
testTenant.GetTenant().AppStopper(),
)
randomTenantServer := controlCluster.TenantSQLServer(serverccl.RandomServer)
randomTenantServer.GetSQLStatsProvider().MaybeFlush(
ctx,
controlCluster.Tenant(0).GetTenant().AppStopper(),
)
}

status := testTenant.TenantStatusSrv()
Expand Down
19 changes: 9 additions & 10 deletions pkg/ccl/serverccl/tenant_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
Expand All @@ -41,7 +40,7 @@ type testTenant struct {
tenantConn *gosql.DB
tenantDB *sqlutils.SQLRunner
tenantStatus serverpb.SQLStatusServer
tenantSQLStats *persistedsqlstats.PersistedSQLStats
tenantSQLServer *sql.Server
tenantContentionRegistry *contention.Registry
}

Expand All @@ -53,8 +52,8 @@ func (h *testTenant) GetTenantConn() *sqlutils.SQLRunner {
return h.tenantDB
}

func (h *testTenant) TenantSQLStats() *persistedsqlstats.PersistedSQLStats {
return h.tenantSQLStats
func (h *testTenant) TenantSQLServer() *sql.Server {
return h.tenantSQLServer
}

func (h *testTenant) TenantStatusSrv() serverpb.SQLStatusServer {
Expand All @@ -78,7 +77,7 @@ type TestTenant interface {
GetTenant() serverutils.ApplicationLayerInterface
GetTenantDB() *gosql.DB
GetTenantConn() *sqlutils.SQLRunner
TenantSQLStats() *persistedsqlstats.PersistedSQLStats
TenantSQLServer() *sql.Server
TenantStatusSrv() serverpb.SQLStatusServer
TenantContentionRegistry() *contention.Registry
GetRPCContext() *rpc.Context
Expand All @@ -95,15 +94,15 @@ func newTestTenant(
tenant, tenantConn := serverutils.StartTenant(t, server, args)
sqlDB := sqlutils.MakeSQLRunner(tenantConn)
status := tenant.StatusServer().(serverpb.SQLStatusServer)
sqlStats := tenant.SQLServer().(*sql.Server).GetSQLStatsProvider()
sqlServer := tenant.SQLServer().(*sql.Server)
contentionRegistry := tenant.ExecutorConfig().(sql.ExecutorConfig).ContentionRegistry

return &testTenant{
tenant: tenant,
tenantConn: tenantConn,
tenantDB: sqlDB,
tenantStatus: status,
tenantSQLStats: sqlStats,
tenantSQLServer: sqlServer,
tenantContentionRegistry: contentionRegistry,
}
}
Expand Down Expand Up @@ -197,7 +196,7 @@ type TenantClusterHelper interface {
TenantDB(idx serverIdx) *gosql.DB
TenantHTTPClient(t *testing.T, idx serverIdx, isAdmin bool) *httpClient
TenantAdminHTTPClient(t *testing.T, idx serverIdx) *httpClient
TenantSQLStats(idx serverIdx) *persistedsqlstats.PersistedSQLStats
TenantSQLServer(idx serverIdx) *sql.Server
TenantStatusSrv(idx serverIdx) serverpb.SQLStatusServer
TenantContentionRegistry(idx serverIdx) *contention.Registry
Cleanup(t *testing.T)
Expand Down Expand Up @@ -249,8 +248,8 @@ func (c tenantCluster) TenantAdminHTTPClient(t *testing.T, idx serverIdx) *httpC
return c.TenantHTTPClient(t, idx, true /* isAdmin */)
}

func (c tenantCluster) TenantSQLStats(idx serverIdx) *persistedsqlstats.PersistedSQLStats {
return c.Tenant(idx).TenantSQLStats()
func (c tenantCluster) TenantSQLServer(idx serverIdx) *sql.Server {
return c.Tenant(idx).TenantSQLServer()
}

func (c tenantCluster) TenantStatusSrv(idx serverIdx) serverpb.SQLStatusServer {
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ go_library(
"//pkg/sql/appstatspb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/parser/statements",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats",
Expand Down Expand Up @@ -94,6 +92,7 @@ go_test(
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatstestutil",
"//pkg/sql/sqlstats/sslocal",
"//pkg/sql/sqlstats/ssmemstorage",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/sqlstats/persistedsqlstats/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,15 @@ func BenchmarkSqlStatsMaxFlushTime(b *testing.B) {
defer s.Stopper().Stop(ctx)
sqlConn := sqlutils.MakeSQLRunner(conn)

sqlStats := s.SQLServer().(*sql.Server).GetSQLStatsProvider()
sqlStats := s.SQLServer().(*sql.Server).GetLocalSQLStatsProvider()
pss := s.SQLServer().(*sql.Server).GetSQLStatsProvider()
controller := s.SQLServer().(*sql.Server).GetSQLStatsController()
stmtFingerprintLimit := sqlstats.MaxMemSQLStatsStmtFingerprints.Get(&s.ClusterSettings().SV)
txnFingerprintLimit := sqlstats.MaxMemSQLStatsTxnFingerprints.Get(&s.ClusterSettings().SV)

// Fills the in-memory stats for the 'bench' application until the fingerprint limit is reached.
fillBenchAppMemStats := func() {
appContainer := sqlStats.SQLStats.GetApplicationStats("bench")
appContainer := sqlStats.GetApplicationStats("bench")
mockStmtValue := sqlstats.RecordedStmtStats{}
for i := int64(1); i <= stmtFingerprintLimit; i++ {
stmtKey := appstatspb.StatementStatisticsKey{
Expand Down Expand Up @@ -436,7 +437,7 @@ func BenchmarkSqlStatsMaxFlushTime(b *testing.B) {
require.NoError(b, controller.ResetClusterSQLStats(ctx))
fillBenchAppMemStats()
b.StartTimer()
require.True(b, sqlStats.MaybeFlush(ctx, s.AppStopper()))
require.True(b, pss.MaybeFlush(ctx, s.AppStopper()))
b.StopTimer()
}
})
Expand All @@ -448,7 +449,7 @@ func BenchmarkSqlStatsMaxFlushTime(b *testing.B) {
for i := 0; i < b.N; i++ {
fillBenchAppMemStats()
b.StartTimer()
require.True(b, sqlStats.MaybeFlush(ctx, s.AppStopper()))
require.True(b, pss.MaybeFlush(ctx, s.AppStopper()))
b.StopTimer()
}
})
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting(
settings.NonNegativeDurationWithMaximum(time.Hour*24),
settings.WithPublic)

// SQLStatsFlushBatchSize is the cluster setting that controls how many
// rows are inserted per upsert during a sql stats flush.
var SQLStatsFlushBatchSize = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.stats.flush.batch_size",
"the number of rows to flush per upsert.",
50,
settings.NonNegativeInt)

// MinimumInterval is the cluster setting that controls the minimum interval
// between each flush operation. If flush operations get triggered faster
// than what is allowed by this setting, (e.g. when too many fingerprints are
Expand Down
Loading

0 comments on commit afee562

Please sign in to comment.