From 9d04b4704b144bb1095ca29a23c6ef4bb3576233 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Mon, 4 Dec 2023 15:12:09 -0500 Subject: [PATCH 1/4] sqlliveness: refactor out settings into slbase For the sqlliveness package to adopt the regionliveness package, we need to eliminate a circular dependency between the two. Currently, the region liveness requires to access the default TTL and heartbeat settings inside sqlliveness. To address this, this patch will eliminate this cycle by moving these settings into a subpackage called slbase. Release note: None --- pkg/BUILD.bazel | 1 + pkg/ccl/backupccl/BUILD.bazel | 2 +- pkg/ccl/backupccl/backup_tenant_test.go | 4 +-- .../kvccl/kvtenantccl/upgradeccl/BUILD.bazel | 2 +- .../upgradeccl/tenant_upgrade_test.go | 6 ++-- .../upgradeinterlockccl/BUILD.bazel | 2 +- .../local_test_util_test.go | 6 ++-- .../tenantcostclient/BUILD.bazel | 2 +- .../tenantcostclient/tenant_side_test.go | 4 +-- pkg/ccl/testccl/sqlccl/BUILD.bazel | 2 +- .../testccl/sqlccl/temp_table_clean_test.go | 6 ++-- pkg/kv/kvserver/gc/BUILD.bazel | 2 +- pkg/kv/kvserver/gc/gc_int_test.go | 4 +-- pkg/sql/regionliveness/BUILD.bazel | 2 +- pkg/sql/regionliveness/prober.go | 6 ++-- pkg/sql/sqlliveness/slbase/BUILD.bazel | 9 +++++ pkg/sql/sqlliveness/slbase/slbase.go | 36 +++++++++++++++++++ pkg/sql/sqlliveness/slinstance/BUILD.bazel | 3 +- pkg/sql/sqlliveness/slinstance/slinstance.go | 25 ++----------- .../sqlliveness/slinstance/slinstance_test.go | 9 ++--- 20 files changed, 81 insertions(+), 52 deletions(-) create mode 100644 pkg/sql/sqlliveness/slbase/BUILD.bazel create mode 100644 pkg/sql/sqlliveness/slbase/slbase.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 44c0c7ee9f9c..16f170731dd5 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -2154,6 +2154,7 @@ GO_TARGETS = [ "//pkg/sql/sqlitelogictest/tests/local-vec-off:local-vec-off_test", "//pkg/sql/sqlitelogictest/tests/local:local_test", "//pkg/sql/sqlitelogictest:sqlitelogictest", + 
"//pkg/sql/sqlliveness/slbase:slbase", "//pkg/sql/sqlliveness/slinstance:slinstance", "//pkg/sql/sqlliveness/slinstance:slinstance_test", "//pkg/sql/sqlliveness/slprovider:slprovider", diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 78db59d2f59d..d91702b18268 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -291,7 +291,7 @@ go_test( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlclustersettings", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/stats", "//pkg/storage", "//pkg/testutils", diff --git a/pkg/ccl/backupccl/backup_tenant_test.go b/pkg/ccl/backupccl/backup_tenant_test.go index d65c45ea8827..54bab8eb219c 100644 --- a/pkg/ccl/backupccl/backup_tenant_test.go +++ b/pkg/ccl/backupccl/backup_tenant_test.go @@ -24,7 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" _ "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -101,7 +101,7 @@ func TestBackupSharedProcessTenantNodeDown(t *testing.T) { // for instance-based planning to recognize the downed node. 
sv := &tenantApp.ClusterSettings().SV padding := 10 * time.Second - timeout := slinstance.DefaultTTL.Get(sv) + slinstance.DefaultHeartBeat.Get(sv) + padding + timeout := slbase.DefaultTTL.Get(sv) + slbase.DefaultHeartBeat.Get(sv) + padding testutils.SucceedsWithin(t, func() error { _, err := tenantDB.Exec("BACKUP INTO 'nodelocal://1/worker-failure'") return err diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel index 558e1f699cee..60114ef4b944 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel @@ -25,7 +25,7 @@ go_test( "//pkg/spanconfig", "//pkg/sql/sem/eval", "//pkg/sql/sqlinstance/instancestorage", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go index 6e38590146c5..cddd30b0479c 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go @@ -24,7 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -448,8 +448,8 @@ func TestTenantUpgradeFailure(t *testing.T) { // Shorten the reclaim loop so that terminated SQL servers don't block // the upgrade from succeeding. 
instancestorage.ReclaimLoopInterval.Override(ctx, &settings.SV, 250*time.Millisecond) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 15*time.Second) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 500*time.Millisecond) + slbase.DefaultTTL.Override(ctx, &settings.SV, 15*time.Second) + slbase.DefaultHeartBeat.Override(ctx, &settings.SV, 500*time.Millisecond) tenantStopper := stop.NewStopper() // Initialize the version to the minimum it could be. require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV)) diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/BUILD.bazel index af38c14e5023..89e5a41a018e 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/BUILD.bazel @@ -30,7 +30,7 @@ go_test( "//pkg/sql/catalog/lease", "//pkg/sql/sessiondatapb", "//pkg/sql/sqlinstance/instancestorage", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/stats", "//pkg/testutils/serverutils", "//pkg/testutils/skip", # keep diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go index 6feb7b946f63..85780de45b03 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go @@ -27,7 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -114,8 +114,8 @@ func runTest(t 
*testing.T, variant sharedtestutil.TestVariant, test sharedtestut ttlOverride *= 10 } heartbeatOverride := ttlOverride / 10 - slinstance.DefaultTTL.Override(ctx, &s.SV, ttlOverride) - slinstance.DefaultHeartBeat.Override(ctx, &s.SV, heartbeatOverride) + slbase.DefaultTTL.Override(ctx, &s.SV, ttlOverride) + slbase.DefaultHeartBeat.Override(ctx, &s.SV, heartbeatOverride) } // Initialize the version to the MinSupportedVersion so that we can perform diff --git a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel index a72ea80394ab..de5fa8eb0ace 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel +++ b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel @@ -86,7 +86,7 @@ go_test( "//pkg/sql/execinfra", "//pkg/sql/sem/eval", "//pkg/sql/sqlliveness", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/stats", "//pkg/testutils", "//pkg/testutils/datapathutils", diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index f66ff1be82d6..bd32b22431df 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -48,7 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" @@ -1024,7 +1024,7 @@ func TestSQLLivenessExemption(t *testing.T) { st := cluster.MakeTestingClusterSettings() // Make the tenant heartbeat like crazy. 
ctx := context.Background() - slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond) + slbase.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond) _, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{ TenantID: tenantID, diff --git a/pkg/ccl/testccl/sqlccl/BUILD.bazel b/pkg/ccl/testccl/sqlccl/BUILD.bazel index 5b6affecd614..837ff875ced1 100644 --- a/pkg/ccl/testccl/sqlccl/BUILD.bazel +++ b/pkg/ccl/testccl/sqlccl/BUILD.bazel @@ -44,7 +44,7 @@ go_test( "//pkg/sql/isql", "//pkg/sql/lexbase", "//pkg/sql/sessiondatapb", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/sqltestutils", "//pkg/sql/tests", "//pkg/testutils", diff --git a/pkg/ccl/testccl/sqlccl/temp_table_clean_test.go b/pkg/ccl/testccl/sqlccl/temp_table_clean_test.go index 6ee627eedeca..f385a61ccefc 100644 --- a/pkg/ccl/testccl/sqlccl/temp_table_clean_test.go +++ b/pkg/ccl/testccl/sqlccl/temp_table_clean_test.go @@ -16,7 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -104,8 +104,8 @@ func TestTenantTempTableCleanup(t *testing.T) { sql.TempObjectWaitInterval.Override(ctx, &st.SV, time.Second*0) // Set up sessions to expire within 5 seconds of a // nodes death. 
- slinstance.DefaultTTL.Override(ctx, &st.SV, 5*time.Second) - slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Second) + slbase.DefaultTTL.Override(ctx, &st.SV, 5*time.Second) + slbase.DefaultHeartBeat.Override(ctx, &st.SV, time.Second) return st } diff --git a/pkg/kv/kvserver/gc/BUILD.bazel b/pkg/kv/kvserver/gc/BUILD.bazel index 85164225f186..0d5df75f935b 100644 --- a/pkg/kv/kvserver/gc/BUILD.bazel +++ b/pkg/kv/kvserver/gc/BUILD.bazel @@ -64,7 +64,7 @@ go_test( "//pkg/server", "//pkg/server/serverpb", "//pkg/settings/cluster", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/testutils", diff --git a/pkg/kv/kvserver/gc/gc_int_test.go b/pkg/kv/kvserver/gc/gc_int_test.go index b460fb16b729..deb67b76986d 100644 --- a/pkg/kv/kvserver/gc/gc_int_test.go +++ b/pkg/kv/kvserver/gc/gc_int_test.go @@ -27,7 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -86,7 +86,7 @@ func TestEndToEndGC(t *testing.T) { settings := cluster.MakeTestingClusterSettings() // Push the TTL up to 60 hours since we emulate a 50 hours // clock jump below. 
- slinstance.DefaultTTL.Override(ctx, &settings.SV, 60*time.Hour) + slbase.DefaultTTL.Override(ctx, &settings.SV, 60*time.Hour) manualClock := hlc.NewHybridManualClock() s, appSqlDb, appKvDb := serverutils.StartServer(t, base.TestServerArgs{ diff --git a/pkg/sql/regionliveness/BUILD.bazel b/pkg/sql/regionliveness/BUILD.bazel index 58ce56f70c6b..91463cfdafe7 100644 --- a/pkg/sql/regionliveness/BUILD.bazel +++ b/pkg/sql/regionliveness/BUILD.bazel @@ -17,7 +17,7 @@ go_library( "//pkg/sql/pgwire/pgerror", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/types", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/regionliveness/prober.go b/pkg/sql/regionliveness/prober.go index 7c8d33dc7dac..406938c57c09 100644 --- a/pkg/sql/regionliveness/prober.go +++ b/pkg/sql/regionliveness/prober.go @@ -26,7 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -138,8 +138,8 @@ SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_in // Region has gone down, set the unavailable_at time on it return l.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - defaultTTL := slinstance.DefaultTTL.Get(&l.settings.SV) - defaultHeartbeat := slinstance.DefaultHeartBeat.Get(&l.settings.SV) + defaultTTL := slbase.DefaultTTL.Get(&l.settings.SV) + defaultHeartbeat := slbase.DefaultHeartBeat.Get(&l.settings.SV) // Get the read timestamp and pick a commit deadline. 
readTS := txn.KV().ReadTimestamp().AddDuration(defaultHeartbeat) txnTS := readTS.AddDuration(defaultTTL) diff --git a/pkg/sql/sqlliveness/slbase/BUILD.bazel b/pkg/sql/sqlliveness/slbase/BUILD.bazel new file mode 100644 index 000000000000..0cdb203b6169 --- /dev/null +++ b/pkg/sql/sqlliveness/slbase/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "slbase", + srcs = ["slbase.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase", + visibility = ["//visibility:public"], + deps = ["//pkg/settings"], +) diff --git a/pkg/sql/sqlliveness/slbase/slbase.go b/pkg/sql/sqlliveness/slbase/slbase.go new file mode 100644 index 000000000000..e454c408beb4 --- /dev/null +++ b/pkg/sql/sqlliveness/slbase/slbase.go @@ -0,0 +1,36 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package slbase + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/settings" +) + +var ( + // DefaultTTL specifies the time to expiration when a session is created. + DefaultTTL = settings.RegisterDurationSetting( + settings.ApplicationLevel, + "server.sqlliveness.ttl", + "default sqlliveness session ttl", + 40*time.Second, + settings.NonNegativeDuration, + ) + // DefaultHeartBeat specifies the period between attempts to extend a session. 
+ DefaultHeartBeat = settings.RegisterDurationSetting( + settings.ApplicationLevel, + "server.sqlliveness.heartbeat", + "duration heart beats to push session expiration further out in time", + 5*time.Second, + settings.NonNegativeDuration, + ) +) diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 54a1c7f41f58..e95ab9cc4784 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -6,10 +6,10 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance", visibility = ["//visibility:public"], deps = [ - "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/enum", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/sqlliveness/slstorage", "//pkg/util/grpcutil", "//pkg/util/hlc", @@ -38,6 +38,7 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/enum", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/sqlliveness/slstorage", "//pkg/testutils", "//pkg/util/hlc", diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 324b47148216..56f010474013 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -19,10 +19,10 @@ import ( "sync" "time" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -36,25 +36,6 @@ import ( "github.com/cockroachdb/logtags" ) -var ( - // DefaultTTL specifies the time to expiration when a session is created. 
- DefaultTTL = settings.RegisterDurationSetting( - settings.ApplicationLevel, - "server.sqlliveness.ttl", - "default sqlliveness session ttl", - 40*time.Second, - settings.NonNegativeDuration, - ) - // DefaultHeartBeat specifies the period between attempts to extend a session. - DefaultHeartBeat = settings.RegisterDurationSetting( - settings.ApplicationLevel, - "server.sqlliveness.heartbeat", - "duration heart beats to push session expiration further out in time", - 5*time.Second, - settings.NonNegativeDuration, - ) -) - // Writer provides interactions with the storage of session records. type Writer interface { // Insert stores the input Session. @@ -413,10 +394,10 @@ func NewSQLInstance( stopper: stopper, sessionEvents: sessionEvents, ttl: func() time.Duration { - return DefaultTTL.Get(&settings.SV) + return slbase.DefaultTTL.Get(&settings.SV) }, hb: func() time.Duration { - return DefaultHeartBeat.Get(&settings.SV) + return slbase.DefaultHeartBeat.Get(&settings.SV) }, drain: make(chan struct{}), } diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index 9e0ee16ffa17..c85e65a414c2 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -44,8 +45,8 @@ func TestSQLInstance(t *testing.T) { clusterversion.Latest.Version(), clusterversion.MinSupported.Version(), true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, 
&settings.SV, 10*time.Millisecond) + slbase.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slbase.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(ambientCtx, stopper, clock, fakeStorage, settings, nil, nil) @@ -113,8 +114,8 @@ func TestSQLInstanceRelease(t *testing.T) { clusterversion.Latest.Version(), clusterversion.MinSupported.Version(), true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + slbase.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slbase.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() var ambientCtx log.AmbientContext From e5f7c6a3ae6f9715ef21b2c6c18b24b635eff2f8 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Mon, 4 Dec 2023 15:12:37 -0500 Subject: [PATCH 2/4] regionliveness: adopt using KV apis for reading liveness Previously, the region liveness interfaces used internal executors for reading and writing to the region_livness table. This was effective in cases where we had access to internal executors like leasing but broke down for lower level code that needs to adopt these concepts like sqlliveness. This patch, moves the region liveness logic to use KV API to encode / decode rows as required. 
Release note: None --- pkg/ccl/multiregionccl/BUILD.bazel | 2 +- pkg/ccl/multiregionccl/regionliveness_test.go | 44 +++--- pkg/sql/catalog/descs/txn.go | 6 +- pkg/sql/catalog/lease/count.go | 4 +- pkg/sql/catalog/lease/lease.go | 4 +- pkg/sql/conn_executor_exec.go | 1 + pkg/sql/regionliveness/BUILD.bazel | 10 +- pkg/sql/regionliveness/prober.go | 146 +++++++++++++----- 8 files changed, 149 insertions(+), 68 deletions(-) diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index f564bc5a0d94..d70c91932e7d 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -86,7 +86,7 @@ go_test( "//pkg/sql/sem/tree", "//pkg/sql/sqlinstance/instancestorage", "//pkg/sql/sqlliveness", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/sqlliveness/slstorage", "//pkg/sql/sqltestutils", "//pkg/testutils", diff --git a/pkg/ccl/multiregionccl/regionliveness_test.go b/pkg/ccl/multiregionccl/regionliveness_test.go index 106f0c9f48c4..c0445a6e46a9 100644 --- a/pkg/ccl/multiregionccl/regionliveness_test.go +++ b/pkg/ccl/multiregionccl/regionliveness_test.go @@ -32,7 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/regionliveness" "github.com/cockroachdb/cockroach/pkg/sql/regions" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -65,7 +65,7 @@ func TestRegionLivenessProber(t *testing.T) { makeSettings := func() *cluster.Settings { cs := cluster.MakeTestingClusterSettings() instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond) - slinstance.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second) + slbase.DefaultTTL.Override(ctx, &cs.SV, 
10*time.Second) regionliveness.RegionLivenessEnabled.Override(ctx, &cs.SV, true) return cs } @@ -88,23 +88,19 @@ func TestRegionLivenessProber(t *testing.T) { var tenants []serverutils.ApplicationLayerInterface var tenantSQL []*gosql.DB blockProbeQuery := atomic.Bool{} + defer regionliveness.TestingSetProbeLivenessTimeout(500*time.Millisecond, + func() { + // Timeout attempts to probe intentionally. + if blockProbeQuery.Swap(false) { + time.Sleep(2 * time.Second) + } + })() for _, s := range testCluster.Servers { tenantArgs := base.TestTenantArgs{ Settings: makeSettings(), TenantID: id, Locality: s.Locality(), - TestingKnobs: base.TestingKnobs{ - SQLExecutor: &sql.ExecutorTestingKnobs{ - BeforeExecute: func(ctx context.Context, stmt string, descriptors *descs.Collection) { - const probeQuery = "SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region" - if strings.Contains(stmt, probeQuery) && blockProbeQuery.Swap(false) { - // Timeout this query intentionally. - time.Sleep(testingRegionLivenessProbeTimeout + time.Second) - } - }, - }, - }, } ts, tenantDB := serverutils.StartTenant(t, s, tenantArgs) tenants = append(tenants, ts) @@ -126,7 +122,7 @@ func TestRegionLivenessProber(t *testing.T) { cachedRegionProvider, err = regions.NewCachedDatabaseRegions(ctx, tenants[0].DB(), tenants[0].LeaseManager().(*lease.Manager)) require.NoError(t, err) idb := tenants[0].InternalDB().(isql.DB) - regionProber := regionliveness.NewLivenessProber(idb, cachedRegionProvider, tenants[0].ClusterSettings()) + regionProber := regionliveness.NewLivenessProber(idb.KV(), tenants[0].Codec(), cachedRegionProvider, tenants[0].ClusterSettings()) // Validates the expected regions versus the region liveness set. 
checkExpectedRegions := func(expectedRegions []string, regions regionliveness.LiveRegions) { require.Equalf(t, len(regions), len(expectedRegions), @@ -211,7 +207,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { makeSettings := func() *cluster.Settings { cs := cluster.MakeTestingClusterSettings() instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond) - slinstance.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second) + slbase.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second) regionliveness.RegionLivenessEnabled.Override(ctx, &cs.SV, true) return cs } @@ -222,6 +218,16 @@ func TestRegionLivenessProberForLeases(t *testing.T) { "us-west", } detectLeaseWait := atomic.Bool{} + targetCount := atomic.Int64{} + defer regionliveness.TestingSetProbeLivenessTimeout(1*time.Second, func() { + if !detectLeaseWait.Load() { + return + } + time.Sleep(time.Second * 2) + targetCount.Swap(0) + detectLeaseWait.Swap(false) + })() + testCluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionClusterWithRegionList(t, expectedRegions, 1, @@ -234,7 +240,6 @@ func TestRegionLivenessProberForLeases(t *testing.T) { var tenants []serverutils.ApplicationLayerInterface var tenantSQL []*gosql.DB - targetCount := atomic.Int64{} for _, s := range testCluster.Servers { tenantArgs := base.TestTenantArgs{ @@ -247,18 +252,13 @@ func TestRegionLivenessProberForLeases(t *testing.T) { if !detectLeaseWait.Load() { return } - const probeQuery = "SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.public.crdb_internal_region" const leaseQuery = "SELECT count(1) FROM system.public.lease AS OF SYSTEM TIME" // Fail intentionally, when we go to probe the first region. 
if strings.Contains(stmt, leaseQuery) { if targetCount.Add(1) != 1 { return } - time.Sleep(testingRegionLivenessProbeTimeout + time.Second) - } else if strings.Contains(stmt, probeQuery) { - time.Sleep(testingRegionLivenessProbeTimeout + time.Second) - targetCount.Swap(0) - detectLeaseWait.Swap(false) + time.Sleep(time.Second * 2) } }, }, diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index d0c69f24c222..dec85e8ac5bd 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -56,6 +57,7 @@ func CheckTwoVersionInvariant( ctx context.Context, clock *hlc.Clock, db isql.DB, + codec keys.SQLCodec, descsCol *Collection, regions regionliveness.CachedDatabaseRegions, settings *clustersettings.Settings, @@ -92,7 +94,7 @@ func CheckTwoVersionInvariant( // transaction ends up committing then there won't have been any created // in the meantime. count, err := lease.CountLeases( - ctx, db, regions, settings, withNewVersion, txn.ProvisionalCommitTimestamp(), false, /*forAnyVersion*/ + ctx, db, codec, regions, settings, withNewVersion, txn.ProvisionalCommitTimestamp(), false, /*forAnyVersion*/ ) if err != nil { return err @@ -119,7 +121,7 @@ func CheckTwoVersionInvariant( for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { // Use the current clock time. 
now := clock.Now() - count, err := lease.CountLeases(ctx, db, regions, settings, withNewVersion, now, false /*forAnyVersion*/) + count, err := lease.CountLeases(ctx, db, codec, regions, settings, withNewVersion, now, false /*forAnyVersion*/) if err != nil { return err } diff --git a/pkg/sql/catalog/lease/count.go b/pkg/sql/catalog/lease/count.go index dfa7c3f9f329..ca562c85080c 100644 --- a/pkg/sql/catalog/lease/count.go +++ b/pkg/sql/catalog/lease/count.go @@ -15,6 +15,7 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/pkg/keys" clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/regionliveness" @@ -31,6 +32,7 @@ import ( func CountLeases( ctx context.Context, db isql.DB, + codec keys.SQLCodec, cachedDatabaseRegions regionliveness.CachedDatabaseRegions, settings *clustersettings.Settings, versions []IDVersion, @@ -55,7 +57,7 @@ func CountLeases( if err := txn.KV().SetFixedTimestamp(ctx, at); err != nil { return err } - prober := regionliveness.NewLivenessProber(db, cachedDatabaseRegions, settings) + prober := regionliveness.NewLivenessProber(db.KV(), codec, cachedDatabaseRegions, settings) regionMap, err := prober.QueryLiveness(ctx, txn.KV()) if err != nil { return err diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 85e967ab3c8c..7aada4eaf741 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -134,7 +134,7 @@ func (m *Manager) WaitForNoVersion( } for lastCount, r := 0, retry.Start(retryOpts); r.Next(); { now := m.storage.clock.Now() - count, err := CountLeases(ctx, m.storage.db, cachedDatabaseRegions, m.settings, versions, now, true /*forAnyVersion*/) + count, err := CountLeases(ctx, m.storage.db, m.Codec(), cachedDatabaseRegions, m.settings, versions, now, true /*forAnyVersion*/) if err != nil { return err } @@ -198,7 +198,7 @@ func (m *Manager) WaitForOneVersion( // 
version of the descriptor. now := m.storage.clock.Now() descs := []IDVersion{NewIDVersionPrev(desc.GetName(), desc.GetID(), desc.GetVersion())} - count, err := CountLeases(ctx, m.storage.db, regions, m.settings, descs, now, false /*forAnyVersion*/) + count, err := CountLeases(ctx, m.storage.db, m.Codec(), regions, m.settings, descs, now, false /*forAnyVersion*/) if err != nil { return nil, err } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 4085db8a8fdb..5bff8e2bc73b 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1340,6 +1340,7 @@ func (ex *connExecutor) checkDescriptorTwoVersionInvariant(ctx context.Context) ctx, ex.server.cfg.Clock, ex.server.cfg.InternalDB, + ex.server.cfg.Codec, ex.extraTxnState.descCollection, regionCache, ex.server.cfg.Settings, diff --git a/pkg/sql/regionliveness/BUILD.bazel b/pkg/sql/regionliveness/BUILD.bazel index 91463cfdafe7..a6e60123a8ae 100644 --- a/pkg/sql/regionliveness/BUILD.bazel +++ b/pkg/sql/regionliveness/BUILD.bazel @@ -6,19 +6,25 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/regionliveness", visibility = ["//visibility:public"], deps = [ + "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvpb", + "//pkg/roachpb", "//pkg/server/serverpb", "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog", + "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catpb", - "//pkg/sql/isql", + "//pkg/sql/catalog/systemschema", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", + "//pkg/sql/rowenc/keyside", + "//pkg/sql/rowenc/valueside", "//pkg/sql/sem/tree", - "//pkg/sql/sessiondata", "//pkg/sql/sqlliveness/slbase", "//pkg/sql/types", + "//pkg/util/encoding", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/sql/regionliveness/prober.go b/pkg/sql/regionliveness/prober.go index 406938c57c09..1fb6b1809d53 100644 --- a/pkg/sql/regionliveness/prober.go +++ b/pkg/sql/regionliveness/prober.go @@ -11,23 +11,30 @@ package 
regionliveness import ( + "bytes" "context" "sort" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings" clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" - "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -91,17 +98,37 @@ type CachedDatabaseRegions interface { } type livenessProber struct { - db isql.DB + db *kv.DB + codec keys.SQLCodec + kvWriter bootstrap.KVWriter cachedDBRegions CachedDatabaseRegions settings *clustersettings.Settings } +var probeLivenessTimeout = 15 * time.Second +var testingProbeQueryCallbackFunc func() + +func TestingSetProbeLivenessTimeout(newTimeout time.Duration, probeCallbackFn func()) func() { + oldTimeout := probeLivenessTimeout + probeLivenessTimeout = newTimeout + testingProbeQueryCallbackFunc = probeCallbackFn + return func() { + probeLivenessTimeout = oldTimeout + probeCallbackFn = nil + } +} + // NewLivenessProber creates a new region liveness 
prober. func NewLivenessProber( - db isql.DB, cachedDBRegions CachedDatabaseRegions, settings *clustersettings.Settings, + db *kv.DB, + codec keys.SQLCodec, + cachedDBRegions CachedDatabaseRegions, + settings *clustersettings.Settings, ) Prober { return &livenessProber{ db: db, + codec: codec, + kvWriter: bootstrap.MakeKVWriter(codec, systemschema.RegionLivenessTable), cachedDBRegions: cachedDBRegions, settings: settings, } @@ -110,24 +137,41 @@ func NewLivenessProber( // ProbeLiveness implements Prober. func (l *livenessProber) ProbeLiveness(ctx context.Context, region string) error { // If region liveness is disabled then nothing to do. - regionLivenessEnabled, tableTimeout := l.GetProbeTimeout() + regionLivenessEnabled, _ := l.GetProbeTimeout() if !regionLivenessEnabled { return nil } - const probeQuery = ` -SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region -` + // Resolve the physical value for this region. + regionEnum := l.cachedDBRegions.GetRegionEnumTypeDesc() + foundIdx := -1 + for i := 0; i < regionEnum.NumEnumMembers(); i++ { + if regionEnum.GetMemberLogicalRepresentation(i) == region { + foundIdx = i + break + } + } + if foundIdx == -1 { + return errors.AssertionFailedf("unable to find region %s in region enum", region) + } + regionEnumValue := tree.NewDBytes(tree.DBytes(regionEnum.GetMemberPhysicalRepresentation(foundIdx))) + // Probe from the SQL instances table to confirm if the region + // is live. 
err := timeutil.RunWithTimeout(ctx, "probe-liveness", tableTimeout, func(ctx context.Context) error { - return l.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - _, err := txn.QueryRowEx( - ctx, "probe-sql-instance", txn.KV(), sessiondata.NodeUserSessionDataOverride, - probeQuery, region, - ) + return l.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if testingProbeQueryCallbackFunc != nil { + testingProbeQueryCallbackFunc() + } + instancesTable := systemschema.SQLInstancesTable() + indexPrefix := l.codec.IndexPrefix(uint32(instancesTable.GetID()), uint32(instancesTable.GetPrimaryIndexID())) + regionPrefixBytes, err := keyside.Encode(indexPrefix, regionEnumValue, encoding.Ascending) if err != nil { return err } - return nil + regionPrefix := roachpb.Key(regionPrefixBytes) + regionPrefixEnd := regionPrefix.PrefixEnd() + _, err = txn.Scan(ctx, regionPrefix, regionPrefixEnd, 0) + return err }) }) @@ -137,34 +181,36 @@ SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_in } // Region has gone down, set the unavailable_at time on it - return l.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return l.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { defaultTTL := slbase.DefaultTTL.Get(&l.settings.SV) defaultHeartbeat := slbase.DefaultHeartBeat.Get(&l.settings.SV) // Get the read timestamp and pick a commit deadline. - readTS := txn.KV().ReadTimestamp().AddDuration(defaultHeartbeat) - txnTS := readTS.AddDuration(defaultTTL) - // Insert a new unavailable_at time if one doesn't exist already. 
- _, err = txn.Exec(ctx, "mark-region-unavailable", txn.KV(), - "INSERT INTO system.region_liveness(crdb_region, unavailable_at) VALUES ($1, $2) "+ - "ON CONFLICT (crdb_region) DO NOTHING", - region, - txnTS.GoTime()) + commitDeadline := txn.ReadTimestamp().AddDuration(defaultHeartbeat) + txnTS := commitDeadline.AddDuration(defaultTTL) + if err := txn.UpdateDeadline(ctx, commitDeadline); err != nil { + return err + } + ba := txn.NewBatch() + // Insert a new unavailable_at time. + err := l.kvWriter.Insert(ctx, ba, false, regionEnumValue, tree.MustMakeDTimestamp(txnTS.GoTime(), time.Microsecond)) if err != nil { return err } - // Transaction has moved the read timestamp forward, - // so force a retry. - if txn.KV().ReadTimestamp().After(readTS) { - return txn.KV().GenerateForcedRetryableErr(ctx, "read timestamp has moved unable to set deadline.") + if err := txn.Run(ctx, ba); err != nil { + // Conditional put failing is fine, since it means someone else + // has marked the region as dead. + if errors.HasType(err, &kvpb.ConditionFailedError{}) { + return nil + } + return err } - return txn.KV().UpdateDeadline(ctx, readTS) + return nil }) } // QueryLiveness implements Prober. func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRegions, error) { - executor := l.db.Executor() // Database is not multi-region so report a single region. if l.cachedDBRegions == nil || !l.cachedDBRegions.IsMultiRegion() { @@ -181,21 +227,45 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe if !RegionLivenessEnabled.Get(&l.settings.SV) { return regionStatus, nil } - // Detect and down regions and remove them. 
- rows, err := executor.QueryBufferedEx( - ctx, "query-region-liveness", txn, sessiondata.NodeUserSessionDataOverride, - "SELECT crdb_region, unavailable_at FROM system.region_liveness", - ) + // Scan the entire regin + regionLivenessIndex := l.codec.IndexPrefix(uint32(systemschema.RegionLivenessTable.GetID()), uint32(systemschema.RegionLivenessTable.GetPrimaryIndexID())) + keyValues, err := txn.Scan(ctx, regionLivenessIndex, regionLivenessIndex.PrefixEnd(), 0) if err != nil { return nil, err } - for _, row := range rows { - enum, _ := tree.AsDEnum(row[0]) - unavailableAt := tree.MustBeDTimestamp(row[1]) + // Detect and down regions and remove them. + datumAlloc := &tree.DatumAlloc{} + for _, keyValue := range keyValues { + tuple, err := keyValue.Value.GetTuple() + if err != nil { + return nil, err + } + enumDatum, _, err := keyside.Decode(datumAlloc, types.Bytes, keyValue.Key[len(regionLivenessIndex):], encoding.Ascending) + if err != nil { + return nil, err + } + enumBytes := enumDatum.(*tree.DBytes) + regionEnum := l.cachedDBRegions.GetRegionEnumTypeDesc() + regionEnumIdx := -1 + for i := 0; i < regionEnum.NumEnumMembers(); i++ { + if bytes.Equal(regionEnum.GetMemberPhysicalRepresentation(i), []byte(*enumBytes)) { + regionEnumIdx = i + break + } + } + // Newly added enum we don't know abut. + if regionEnumIdx == -1 { + continue + } + ts, _, err := valueside.Decode(datumAlloc, types.Timestamp, tuple) + if err != nil { + return nil, err + } + unavailableAt := ts.(*tree.DTimestamp) // Region is now officially unavailable, so lets remove // it. 
if txn.ReadTimestamp().GoTime().After(unavailableAt.Time) { - delete(regionStatus, enum.LogicalRep) + delete(regionStatus, regionEnum.GetMemberLogicalRepresentation(regionEnumIdx)) } } return regionStatus, nil From ea69f68b416db91b3c5c4f5eb5d0ff73c9e39618 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Thu, 30 Nov 2023 11:18:08 -0500 Subject: [PATCH 3/4] sqlliveness: adopt regionliveness for IsAlive Previously, sqlliveness would probe the system.sqlliveness table, but had no logic to check for region liveness, which meant it ran the risk of potentially getting stuck on dead regions if the system database is moved to SURVIVE REGION. This patch adopts the regionliveness subsystem so that these queries have timeouts and probe for dead regions. This allows subsystems that rely on IsAlive like jobs to take advantage of region liveness. Fixes: #115563 Release note: None --- pkg/ccl/multiregionccl/regionliveness_test.go | 22 +++++- pkg/sql/regionliveness/prober.go | 72 ++++++++++++++----- pkg/sql/sqlliveness/slstorage/BUILD.bazel | 1 + pkg/sql/sqlliveness/slstorage/key_encoder.go | 14 ++-- .../sqlliveness/slstorage/key_encoder_test.go | 3 +- pkg/sql/sqlliveness/slstorage/sessionid.go | 26 +++++++ pkg/sql/sqlliveness/slstorage/slstorage.go | 39 ++++++++-- 7 files changed, 142 insertions(+), 35 deletions(-) diff --git a/pkg/ccl/multiregionccl/regionliveness_test.go b/pkg/ccl/multiregionccl/regionliveness_test.go index c0445a6e46a9..d4b16ae24b1b 100644 --- a/pkg/ccl/multiregionccl/regionliveness_test.go +++ b/pkg/ccl/multiregionccl/regionliveness_test.go @@ -184,6 +184,24 @@ func TestRegionLivenessProber(t *testing.T) { if _, ok := regions[expectedRegions[1]]; ok { return errors.AssertionFailedf("removed region detected %s", expectedRegions[1]) } + // Similarly query the unavailable physical regions + unavailablePhysicalRegions, err := regionProber.QueryUnavailablePhysicalRegions(ctx, txn) + if err != nil { + return err + } + if len(unavailablePhysicalRegions) != 1 {
return errors.AssertionFailedf("physical region was not marked as unavailable") + } + // Validate the physical region marked as unavailable is correct + regionTypeDesc := cachedRegionProvider.GetRegionEnumTypeDesc() + for i := 0; i < regionTypeDesc.NumEnumMembers(); i++ { + if regionTypeDesc.GetMemberLogicalRepresentation(i) != expectedRegions[1] { + continue + } + if !unavailablePhysicalRegions.ContainsPhysicalRepresentation(string(regionTypeDesc.GetMemberPhysicalRepresentation(i))) { + return errors.AssertionFailedf("incorrect physical region was marked as unavailable %v", unavailablePhysicalRegions) + } + } return nil }) }) @@ -316,7 +334,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { require.NoError(t, tx.Rollback()) - // Validate we can have a "droped" region and the query won't fail. + // Validate we can have a "dropped" region and the query won't fail. lm := tenants[0].LeaseManager().(*lease.Manager) cachedDatabaseRegions, err := regions.NewCachedDatabaseRegions(ctx, tenants[0].DB(), lm) require.NoError(t, err) @@ -327,7 +345,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { PhysicalRepresentation: nil, LogicalRepresentation: "FakeRegion", }) - return mut + return builder.BuildExistingMutableType() }) require.NoError(t, lm.WaitForNoVersion(ctx, descpb.ID(tableID), cachedDatabaseRegions, retry.Options{})) } diff --git a/pkg/sql/regionliveness/prober.go b/pkg/sql/regionliveness/prober.go index 1fb6b1809d53..5b519805445a 100644 --- a/pkg/sql/regionliveness/prober.go +++ b/pkg/sql/regionliveness/prober.go @@ -11,7 +11,6 @@ package regionliveness import ( - "bytes" "context" "sort" "time" @@ -73,13 +72,31 @@ func (l LiveRegions) ForEach(fn func(region string) error) error { return nil } +// UnavailableAtPhysicalRegions is a map of regions (in physical representation). +type UnavailableAtPhysicalRegions map[string]struct{} + +// ContainsPhysicalRepresentation returns true if the physical representation of a region +// (as stored inside the KV) is present in this set.
+func (u UnavailableAtPhysicalRegions) ContainsPhysicalRepresentation( + physicalRepresentationForRegion string, +) bool { + _, ok := u[physicalRepresentationForRegion] + return ok +} + // Prober used to determine the set of regions which are still alive. type Prober interface { // ProbeLiveness can be used after a timeout to label a regions as unavailable. ProbeLiveness(ctx context.Context, region string) error + // ProbeLivenessWithPhysicalRegion can be used after a timeout to label a region as unavailable, + // with this version only the physical representation is required. + ProbeLivenessWithPhysicalRegion(ctx context.Context, regionBytes []byte) error // QueryLiveness can be used to get the list of regions which are currently // accessible. QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRegions, error) + // QueryUnavailablePhysicalRegions returns a list of regions that are unavailable + // right now, as physical representations. + QueryUnavailablePhysicalRegions(ctx context.Context, txn *kv.Txn) (UnavailableAtPhysicalRegions, error) // GetProbeTimeout gets maximum timeout waiting on a table before issuing // liveness queries. GetProbeTimeout() (bool, time.Duration) @@ -153,7 +170,18 @@ func (l *livenessProber) ProbeLiveness(ctx context.Context, region string) error if foundIdx == -1 { return errors.AssertionFailedf("unable to find region %s in region enum", region) } - regionEnumValue := tree.NewDBytes(tree.DBytes(regionEnum.GetMemberPhysicalRepresentation(foundIdx))) + return l.ProbeLivenessWithPhysicalRegion(ctx, regionEnum.GetMemberPhysicalRepresentation(foundIdx)) +} + +func (l *livenessProber) ProbeLivenessWithPhysicalRegion( + ctx context.Context, regionBytes []byte, +) error { + // If region liveness is disabled then nothing to do.
+ regionLivenessEnabled, tableTimeout := l.GetProbeTimeout() + if !regionLivenessEnabled { + return nil + } + regionEnumValue := tree.NewDBytes(tree.DBytes(regionBytes)) // Probe from the SQL instances table to confirm if the region // is live. err := timeutil.RunWithTimeout(ctx, "probe-liveness", tableTimeout, @@ -206,7 +234,6 @@ func (l *livenessProber) ProbeLiveness(ctx context.Context, region string) error } return nil }) - } // QueryLiveness implements Prober. @@ -227,13 +254,32 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe if !RegionLivenessEnabled.Get(&l.settings.SV) { return regionStatus, nil } - // Scan the entire regin + // Detect any down regions and remove them. + unavailableAtRegions, err := l.QueryUnavailablePhysicalRegions(ctx, txn) + if err != nil { + return nil, err + } + regionEnum := l.cachedDBRegions.GetRegionEnumTypeDesc() + for i := 0; i < regionEnum.NumEnumMembers(); i++ { + if unavailableAtRegions.ContainsPhysicalRepresentation(string(regionEnum.GetMemberPhysicalRepresentation(i))) { + delete(regionStatus, regionEnum.GetMemberLogicalRepresentation(i)) + } + } + return regionStatus, nil +} + +// QueryUnavailablePhysicalRegions implements Prober. +func (l *livenessProber) QueryUnavailablePhysicalRegions( + ctx context.Context, txn *kv.Txn, +) (UnavailableAtPhysicalRegions, error) { + // Scan the entire region liveness table. regionLivenessIndex := l.codec.IndexPrefix(uint32(systemschema.RegionLivenessTable.GetID()), uint32(systemschema.RegionLivenessTable.GetPrimaryIndexID())) keyValues, err := txn.Scan(ctx, regionLivenessIndex, regionLivenessIndex.PrefixEnd(), 0) if err != nil { return nil, err } - // Detect and down regions and remove them. + // Detect any down regions and remove them.
+ unavailableAtRegions := make(UnavailableAtPhysicalRegions) datumAlloc := &tree.DatumAlloc{} for _, keyValue := range keyValues { tuple, err := keyValue.Value.GetTuple() @@ -245,18 +291,6 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe return nil, err } enumBytes := enumDatum.(*tree.DBytes) - regionEnum := l.cachedDBRegions.GetRegionEnumTypeDesc() - regionEnumIdx := -1 - for i := 0; i < regionEnum.NumEnumMembers(); i++ { - if bytes.Equal(regionEnum.GetMemberPhysicalRepresentation(i), []byte(*enumBytes)) { - regionEnumIdx = i - break - } - } - // Newly added enum we don't know abut. - if regionEnumIdx == -1 { - continue - } ts, _, err := valueside.Decode(datumAlloc, types.Timestamp, tuple) if err != nil { return nil, err @@ -265,10 +299,10 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe // Region is now officially unavailable, so lets remove // it. if txn.ReadTimestamp().GoTime().After(unavailableAt.Time) { - delete(regionStatus, regionEnum.GetMemberLogicalRepresentation(regionEnumIdx)) + unavailableAtRegions[string(*enumBytes)] = struct{}{} } } - return regionStatus, nil + return unavailableAtRegions, nil } // GetProbeTimeout gets maximum timeout waiting on a table before issuing diff --git a/pkg/sql/sqlliveness/slstorage/BUILD.bazel b/pkg/sql/sqlliveness/slstorage/BUILD.bazel index 68ba572471f2..335b93c5849b 100644 --- a/pkg/sql/sqlliveness/slstorage/BUILD.bazel +++ b/pkg/sql/sqlliveness/slstorage/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/settings/cluster", "//pkg/sql/catalog", "//pkg/sql/catalog/systemschema", + "//pkg/sql/regionliveness", "//pkg/sql/sem/eval", "//pkg/sql/sqlliveness", "//pkg/util/admission/admissionpb", diff --git a/pkg/sql/sqlliveness/slstorage/key_encoder.go b/pkg/sql/sqlliveness/slstorage/key_encoder.go index 3e866a33f3f7..9981d1fa4747 100644 --- a/pkg/sql/sqlliveness/slstorage/key_encoder.go +++ b/pkg/sql/sqlliveness/slstorage/key_encoder.go @@ -22,7 +22,7 @@ 
import ( // keyCodec manages the SessionID <-> roachpb.Key mapping. type keyCodec interface { - encode(sid sqlliveness.SessionID) (roachpb.Key, error) + encode(sid sqlliveness.SessionID) (roachpb.Key, string, error) decode(key roachpb.Key) (sqlliveness.SessionID, error) // indexPrefix returns the prefix for an encoded key. encode() will return @@ -37,21 +37,21 @@ type rbrEncoder struct { rbrIndex roachpb.Key } -func (e *rbrEncoder) encode(session sqlliveness.SessionID) (roachpb.Key, error) { - region, _, err := UnsafeDecodeSessionID(session) +func (e *rbrEncoder) encode(session sqlliveness.SessionID) (roachpb.Key, string, error) { + region, _, err := SafeDecodeSessionID(session) if err != nil { - return nil, err + return nil, "", err } if len(region) == 0 { - return nil, errors.Newf("legacy session passed to rbr table: '%s'", session.String()) + return nil, "", errors.Newf("legacy session passed to rbr table: '%s'", session.String()) } const columnFamilyID = 0 key := e.indexPrefix() - key = encoding.EncodeBytesAscending(key, region) + key = encoding.EncodeBytesAscending(key, encoding.UnsafeConvertStringToBytes(region)) key = encoding.EncodeBytesAscending(key, session.UnsafeBytes()) - return keys.MakeFamilyKey(key, columnFamilyID), nil + return keys.MakeFamilyKey(key, columnFamilyID), region, nil } func (e *rbrEncoder) decode(key roachpb.Key) (sqlliveness.SessionID, error) { diff --git a/pkg/sql/sqlliveness/slstorage/key_encoder_test.go b/pkg/sql/sqlliveness/slstorage/key_encoder_test.go index b5ccfc61eb90..3bb21675a7ba 100644 --- a/pkg/sql/sqlliveness/slstorage/key_encoder_test.go +++ b/pkg/sql/sqlliveness/slstorage/key_encoder_test.go @@ -49,9 +49,10 @@ func TestKeyEncoder(t *testing.T) { id, err := MakeSessionID(enum.One, uuid.MakeV4()) require.NoError(t, err) - key, err := codec.encode(id) + key, region, err := codec.encode(id) require.NoError(t, err) require.True(t, bytes.HasPrefix(key, codec.indexPrefix())) + require.Equal(t, region, string(enum.One)) 
decodedID, err := codec.decode(key) require.NoError(t, err) diff --git a/pkg/sql/sqlliveness/slstorage/sessionid.go b/pkg/sql/sqlliveness/slstorage/sessionid.go index 060290ad5e03..b21f9d2fa841 100644 --- a/pkg/sql/sqlliveness/slstorage/sessionid.go +++ b/pkg/sql/sqlliveness/slstorage/sessionid.go @@ -90,3 +90,29 @@ func UnsafeDecodeSessionID(session sqlliveness.SessionID) (region, id []byte, er return rest[:regionLen], rest[regionLen:], nil } + +// SafeDecodeSessionID decodes the region and id from the SessionID. +func SafeDecodeSessionID(session sqlliveness.SessionID) (region, id string, err error) { + if len(session) == legacyLen { + return "", "", errors.Newf("unexpected legacy SessionID format") + } + if len(session) < minimumNonLegacyLen { + // The smallest valid v1 session id is a [version, 1, single_byte_region, uuid...], + // which is three bytes larger than a uuid. + return "", "", errors.New("session id is too short") + } + + // Decode the version. + if session[0] != sessionIDVersion { + return "", "", errors.Newf("invalid session id version: %d", session[0]) + } + regionLen := int(session[1]) + rest := session[2:] + + // Decode and validate the length of the region. 
+ if len(rest) != regionLen+uuid.Size { + return "", "", errors.Newf("session id with length %d is the wrong size to include a region with length %d", len(session), regionLen) + } + + return string(rest[:regionLen]), string(rest[regionLen:]), nil +} diff --git a/pkg/sql/sqlliveness/slstorage/slstorage.go b/pkg/sql/sqlliveness/slstorage/slstorage.go index 7827bad06de1..6e99ac3f4239 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/regionliveness" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" @@ -309,15 +310,35 @@ func (s *Storage) deleteOrFetchSession( var deleted bool var prevExpiration hlc.Timestamp ctx = multitenant.WithTenantCostControlExemption(ctx) + livenessProber := regionliveness.NewLivenessProber(s.db, s.codec, nil, s.settings) + k, regionPhysicalRep, err := s.keyCodec.encode(sid) if err := s.txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Reset captured variable in case of retry. deleted, expiration, prevExpiration = false, hlc.Timestamp{}, hlc.Timestamp{} - - k, err := s.keyCodec.encode(sid) if err != nil { return err } - kv, err := txn.Get(ctx, k) + if unavailableAtRegions, err := livenessProber.QueryUnavailablePhysicalRegions(ctx, txn); err != nil || + unavailableAtRegions.ContainsPhysicalRepresentation(regionPhysicalRep) { + return err + } + execWithTimeout, timeout := livenessProber.GetProbeTimeout() + var kv kv.KeyValue + if execWithTimeout { + // Detect if we fail to a region and force a probe in that + // case. 
+ err = timeutil.RunWithTimeout(ctx, "fetch-session", timeout, func(ctx context.Context) error { + kvInner, err := txn.Get(ctx, k) + kv = kvInner + return err + }) + + if err != nil { + return err + } + } else { + kv, err = txn.Get(ctx, k) + } if err != nil { return err } @@ -343,6 +364,12 @@ func (s *Storage) deleteOrFetchSession( return txn.CommitInBatch(ctx, ba) }); err != nil { + if regionliveness.IsQueryTimeoutErr(err) { + probeErr := livenessProber.ProbeLivenessWithPhysicalRegion(ctx, encoding.UnsafeConvertStringToBytes(regionPhysicalRep)) + if probeErr != nil { + err = errors.WithSecondaryError(err, probeErr) + } + } return false, hlc.Timestamp{}, errors.Wrapf(err, "could not query session id: %s", sid) } @@ -468,7 +495,7 @@ func (s *Storage) Insert( if err := s.txn(ctx, func(ctx context.Context, txn *kv.Txn) error { batch := txn.NewBatch() - k, err := s.keyCodec.encode(sid) + k, _, err := s.keyCodec.encode(sid) if err != nil { return err } @@ -492,7 +519,7 @@ func (s *Storage) Update( ) (sessionExists bool, err error) { ctx = multitenant.WithTenantCostControlExemption(ctx) err = s.txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - k, err := s.keyCodec.encode(sid) + k, _, err := s.keyCodec.encode(sid) if err != nil { return err } @@ -525,7 +552,7 @@ func (s *Storage) Delete(ctx context.Context, session sqlliveness.SessionID) err return s.txn(ctx, func(ctx context.Context, txn *kv.Txn) error { batch := txn.NewBatch() - key, err := s.keyCodec.encode(session) + key, _, err := s.keyCodec.encode(session) if err != nil { return err } From e53bda28f4b5d2cb9fbb7294e4ded270e8a37df5 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Thu, 30 Nov 2023 22:21:11 -0500 Subject: [PATCH 4/4] multiregionccl: setup MR system db properly for regionliveness_test Previously, the regionliveness_test set up all the tenants and then added the regions on the system database. 
Unfortunately, specific subsystems like sqlliveness need the region enum to be configured *before* a new tenant is added. This patch changes the order of operations so that the enum is updated first. Release note: None --- pkg/ccl/multiregionccl/regionliveness_test.go | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/pkg/ccl/multiregionccl/regionliveness_test.go b/pkg/ccl/multiregionccl/regionliveness_test.go index d4b16ae24b1b..f7593051a6c3 100644 --- a/pkg/ccl/multiregionccl/regionliveness_test.go +++ b/pkg/ccl/multiregionccl/regionliveness_test.go @@ -237,6 +237,8 @@ func TestRegionLivenessProberForLeases(t *testing.T) { } detectLeaseWait := atomic.Bool{} targetCount := atomic.Int64{} + var tenants []serverutils.ApplicationLayerInterface + var tenantSQL []*gosql.DB defer regionliveness.TestingSetProbeLivenessTimeout(1*time.Second, func() { if !detectLeaseWait.Load() { return @@ -255,11 +257,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { id, err := roachpb.MakeTenantID(11) require.NoError(t, err) - - var tenants []serverutils.ApplicationLayerInterface - var tenantSQL []*gosql.DB - - for _, s := range testCluster.Servers { + for i, s := range testCluster.Servers { tenantArgs := base.TestTenantArgs{ Settings: makeSettings(), TenantID: id, @@ -285,16 +283,21 @@ func TestRegionLivenessProberForLeases(t *testing.T) { tenant, tenantDB := serverutils.StartTenant(t, s, tenantArgs) tenantSQL = append(tenantSQL, tenantDB) tenants = append(tenants, tenant) + // Before the other tenants are added we need to configure the system database, + // otherwise they will come up in a non multi-region mode and not all subsystems + // will be aware (i.e. session ID and SQL instance will not be MR aware). + if i == 0 { + // Convert into a multi-region DB.
+ _, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system SET PRIMARY REGION '%s'", testCluster.Servers[0].Locality().Tiers[0].Value)) + require.NoError(t, err) + for i := 1; i < len(expectedRegions); i++ { + _, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system ADD REGION '%s'", expectedRegions[i])) + require.NoError(t, err) + } + _, err = tenantSQL[0].Exec("ALTER DATABASE system SURVIVE ZONE FAILURE") + require.NoError(t, err) + } } - // Convert into a multi-region DB. - _, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system SET PRIMARY REGION '%s'", testCluster.Servers[0].Locality().Tiers[0].Value)) - require.NoError(t, err) - for i := 1; i < len(expectedRegions); i++ { - _, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system ADD REGION '%s'", expectedRegions[i])) - require.NoError(t, err) - } - _, err = tenantSQL[0].Exec("ALTER DATABASE system SURVIVE ZONE FAILURE") - require.NoError(t, err) // Override the table timeout probe for testing. for _, ts := range tenants { regionliveness.RegionLivenessProbeTimeout.Override(ctx, &ts.ClusterSettings().SV, testingRegionLivenessProbeTimeout) @@ -314,7 +317,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { detectLeaseWait.Swap(true) _, err = tenantSQL[1].Exec("ALTER TABLE t1 ADD COLUMN i INT") require.ErrorContainsf(t, err, "count-lease timed out reading from a region", "failed to timeout") - // Keep an active lease on node 1, but it will be seen as ignored eventually + // Keep an active lease on node 0, but it will be seen as ignored eventually // because the region will start to get quarantined. tx, err := tenantSQL[0].Begin() require.NoError(t, err)