diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index dde583f7af40..4abcec1c1256 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -2156,6 +2156,7 @@ GO_TARGETS = [ "//pkg/sql/sqlitelogictest/tests/local-vec-off:local-vec-off_test", "//pkg/sql/sqlitelogictest/tests/local:local_test", "//pkg/sql/sqlitelogictest:sqlitelogictest", + "//pkg/sql/sqlliveness/slbase:slbase", "//pkg/sql/sqlliveness/slinstance:slinstance", "//pkg/sql/sqlliveness/slinstance:slinstance_test", "//pkg/sql/sqlliveness/slprovider:slprovider", diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 78db59d2f59d..d91702b18268 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -291,7 +291,7 @@ go_test( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlclustersettings", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/stats", "//pkg/storage", "//pkg/testutils", diff --git a/pkg/ccl/backupccl/backup_tenant_test.go b/pkg/ccl/backupccl/backup_tenant_test.go index d65c45ea8827..54bab8eb219c 100644 --- a/pkg/ccl/backupccl/backup_tenant_test.go +++ b/pkg/ccl/backupccl/backup_tenant_test.go @@ -24,7 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" _ "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -101,7 +101,7 @@ func TestBackupSharedProcessTenantNodeDown(t *testing.T) { // for instance-based planning to recognize the downed node. sv := &tenantApp.ClusterSettings().SV padding := 10 * time.Second - timeout := slinstance.DefaultTTL.Get(sv) + slinstance.DefaultHeartBeat.Get(sv) + padding + timeout := slbase.DefaultTTL.Get(sv) + slbase.DefaultHeartBeat.Get(sv) + padding testutils.SucceedsWithin(t, func() error { _, err := tenantDB.Exec("BACKUP INTO 'nodelocal://1/worker-failure'") return err diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel index 558e1f699cee..60114ef4b944 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/BUILD.bazel @@ -25,7 +25,7 @@ go_test( "//pkg/spanconfig", "//pkg/sql/sem/eval", "//pkg/sql/sqlinstance/instancestorage", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go index 6e38590146c5..cddd30b0479c 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeccl/tenant_upgrade_test.go @@ -24,7 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -448,8 +448,8 @@ func TestTenantUpgradeFailure(t *testing.T) { // Shorten the reclaim loop so that terminated SQL servers don't block // the upgrade from succeeding. instancestorage.ReclaimLoopInterval.Override(ctx, &settings.SV, 250*time.Millisecond) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 15*time.Second) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 500*time.Millisecond) + slbase.DefaultTTL.Override(ctx, &settings.SV, 15*time.Second) + slbase.DefaultHeartBeat.Override(ctx, &settings.SV, 500*time.Millisecond) tenantStopper := stop.NewStopper() // Initialize the version to the minimum it could be. require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV)) diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/BUILD.bazel index af38c14e5023..89e5a41a018e 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/BUILD.bazel @@ -30,7 +30,7 @@ go_test( "//pkg/sql/catalog/lease", "//pkg/sql/sessiondatapb", "//pkg/sql/sqlinstance/instancestorage", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/stats", "//pkg/testutils/serverutils", "//pkg/testutils/skip", # keep diff --git a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go index 6feb7b946f63..85780de45b03 100644 --- a/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/local_test_util_test.go @@ -27,7 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -114,8 +114,8 @@ func runTest(t *testing.T, variant sharedtestutil.TestVariant, test sharedtestut ttlOverride *= 10 } heartbeatOverride := ttlOverride / 10 - slinstance.DefaultTTL.Override(ctx, &s.SV, ttlOverride) - slinstance.DefaultHeartBeat.Override(ctx, &s.SV, heartbeatOverride) + slbase.DefaultTTL.Override(ctx, &s.SV, ttlOverride) + slbase.DefaultHeartBeat.Override(ctx, &s.SV, heartbeatOverride) } // Initialize the version to the MinSupportedVersion so that we can perform diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index f564bc5a0d94..d70c91932e7d 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -86,7 +86,7 @@ go_test( "//pkg/sql/sem/tree", "//pkg/sql/sqlinstance/instancestorage", "//pkg/sql/sqlliveness", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/sqlliveness/slstorage", "//pkg/sql/sqltestutils", "//pkg/testutils", diff --git a/pkg/ccl/multiregionccl/regionliveness_test.go b/pkg/ccl/multiregionccl/regionliveness_test.go index 106f0c9f48c4..f7593051a6c3 100644 --- a/pkg/ccl/multiregionccl/regionliveness_test.go +++ b/pkg/ccl/multiregionccl/regionliveness_test.go @@ -32,7 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/regionliveness" "github.com/cockroachdb/cockroach/pkg/sql/regions" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -65,7 +65,7 @@ func TestRegionLivenessProber(t *testing.T) { makeSettings := func() *cluster.Settings { cs := cluster.MakeTestingClusterSettings() instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond) - slinstance.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second) + slbase.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second) regionliveness.RegionLivenessEnabled.Override(ctx, &cs.SV, true) return cs } @@ -88,23 +88,19 @@ func TestRegionLivenessProber(t *testing.T) { var tenants []serverutils.ApplicationLayerInterface var tenantSQL []*gosql.DB blockProbeQuery := atomic.Bool{} + defer regionliveness.TestingSetProbeLivenessTimeout(500*time.Millisecond, + func() { + // Timeout attempts to probe intentionally. + if blockProbeQuery.Swap(false) { + time.Sleep(2 * time.Second) + } + })() for _, s := range testCluster.Servers { tenantArgs := base.TestTenantArgs{ Settings: makeSettings(), TenantID: id, Locality: s.Locality(), - TestingKnobs: base.TestingKnobs{ - SQLExecutor: &sql.ExecutorTestingKnobs{ - BeforeExecute: func(ctx context.Context, stmt string, descriptors *descs.Collection) { - const probeQuery = "SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region" - if strings.Contains(stmt, probeQuery) && blockProbeQuery.Swap(false) { - // Timeout this query intentionally. - time.Sleep(testingRegionLivenessProbeTimeout + time.Second) - } - }, - }, - }, } ts, tenantDB := serverutils.StartTenant(t, s, tenantArgs) tenants = append(tenants, ts) @@ -126,7 +122,7 @@ func TestRegionLivenessProber(t *testing.T) { cachedRegionProvider, err = regions.NewCachedDatabaseRegions(ctx, tenants[0].DB(), tenants[0].LeaseManager().(*lease.Manager)) require.NoError(t, err) idb := tenants[0].InternalDB().(isql.DB) - regionProber := regionliveness.NewLivenessProber(idb, cachedRegionProvider, tenants[0].ClusterSettings()) + regionProber := regionliveness.NewLivenessProber(idb.KV(), tenants[0].Codec(), cachedRegionProvider, tenants[0].ClusterSettings()) // Validates the expected regions versus the region liveness set. checkExpectedRegions := func(expectedRegions []string, regions regionliveness.LiveRegions) { require.Equalf(t, len(regions), len(expectedRegions), @@ -188,6 +184,24 @@ func TestRegionLivenessProber(t *testing.T) { if _, ok := regions[expectedRegions[1]]; ok { return errors.AssertionFailedf("removed region detected %s", expectedRegions[1]) } + // Similarly query the unavailable physcial regions + unavailablePhysicalRegions, err := regionProber.QueryUnavailablePhysicalRegions(ctx, txn) + if err != nil { + return err + } + if len(unavailablePhysicalRegions) != 1 { + return errors.AssertionFailedf("physical region was not marked as unavailable") + } + // Validate the physical region marked as unavailable is correct + regionTypeDesc := cachedRegionProvider.GetRegionEnumTypeDesc() + for i := 0; i < regionTypeDesc.NumEnumMembers(); i++ { + if regionTypeDesc.GetMemberLogicalRepresentation(i) != expectedRegions[1] { + continue + } + if !unavailablePhysicalRegions.ContainsPhysicalRepresentation(string(regionTypeDesc.GetMemberPhysicalRepresentation(i))) { + return errors.AssertionFailedf("incorrect physical region was marked as unavailable %v", unavailablePhysicalRegions) + } + } return nil }) }) @@ -211,7 +225,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { makeSettings := func() *cluster.Settings { cs := cluster.MakeTestingClusterSettings() instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond) - slinstance.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second) + slbase.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second) regionliveness.RegionLivenessEnabled.Override(ctx, &cs.SV, true) return cs } @@ -222,6 +236,18 @@ func TestRegionLivenessProberForLeases(t *testing.T) { "us-west", } detectLeaseWait := atomic.Bool{} + targetCount := atomic.Int64{} + var tenants []serverutils.ApplicationLayerInterface + var tenantSQL []*gosql.DB + defer regionliveness.TestingSetProbeLivenessTimeout(1*time.Second, func() { + if !detectLeaseWait.Load() { + return + } + time.Sleep(time.Second * 2) + targetCount.Swap(0) + detectLeaseWait.Swap(false) + })() + testCluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionClusterWithRegionList(t, expectedRegions, 1, @@ -231,12 +257,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { id, err := roachpb.MakeTenantID(11) require.NoError(t, err) - - var tenants []serverutils.ApplicationLayerInterface - var tenantSQL []*gosql.DB - targetCount := atomic.Int64{} - - for _, s := range testCluster.Servers { + for i, s := range testCluster.Servers { tenantArgs := base.TestTenantArgs{ Settings: makeSettings(), TenantID: id, @@ -247,18 +268,13 @@ func TestRegionLivenessProberForLeases(t *testing.T) { if !detectLeaseWait.Load() { return } - const probeQuery = "SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.public.crdb_internal_region" const leaseQuery = "SELECT count(1) FROM system.public.lease AS OF SYSTEM TIME" // Fail intentionally, when we go to probe the first region. if strings.Contains(stmt, leaseQuery) { if targetCount.Add(1) != 1 { return } - time.Sleep(testingRegionLivenessProbeTimeout + time.Second) - } else if strings.Contains(stmt, probeQuery) { - time.Sleep(testingRegionLivenessProbeTimeout + time.Second) - targetCount.Swap(0) - detectLeaseWait.Swap(false) + time.Sleep(time.Second * 2) } }, }, @@ -267,16 +283,21 @@ func TestRegionLivenessProberForLeases(t *testing.T) { tenant, tenantDB := serverutils.StartTenant(t, s, tenantArgs) tenantSQL = append(tenantSQL, tenantDB) tenants = append(tenants, tenant) + // Before the other tenants are added we need to configure the system database, + // otherwise they will come up in a non multi-region mode and not all subsystems + // will be aware (i.e. session ID and SQL instance will not be MR aware). + if i == 0 { + // Convert into a multi-region DB. + _, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system SET PRIMARY REGION '%s'", testCluster.Servers[0].Locality().Tiers[0].Value)) + require.NoError(t, err) + for i := 1; i < len(expectedRegions); i++ { + _, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system ADD REGION '%s'", expectedRegions[i])) + require.NoError(t, err) + } + _, err = tenantSQL[0].Exec("ALTER DATABASE system SURVIVE ZONE FAILURE") + require.NoError(t, err) + } } - // Convert into a multi-region DB. - _, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system SET PRIMARY REGION '%s'", testCluster.Servers[0].Locality().Tiers[0].Value)) - require.NoError(t, err) - for i := 1; i < len(expectedRegions); i++ { - _, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system ADD REGION '%s'", expectedRegions[i])) - require.NoError(t, err) - } - _, err = tenantSQL[0].Exec("ALTER DATABASE system SURVIVE ZONE FAILURE") - require.NoError(t, err) // Override the table timeout probe for testing. for _, ts := range tenants { regionliveness.RegionLivenessProbeTimeout.Override(ctx, &ts.ClusterSettings().SV, testingRegionLivenessProbeTimeout) @@ -296,7 +317,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { detectLeaseWait.Swap(true) _, err = tenantSQL[1].Exec("ALTER TABLE t1 ADD COLUMN i INT") require.ErrorContainsf(t, err, "count-lease timed out reading from a region", "failed to timeout") - // Keep an active lease on node 1, but it will be seen as ignored eventually + // Keep an active lease on node 0, but it will be seen as ignored eventually // because the region will start to get quarantined. tx, err := tenantSQL[0].Begin() require.NoError(t, err) @@ -316,7 +337,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { require.NoError(t, tx.Rollback()) - // Validate we can have a "droped" region and the query won't fail. + // Validate we can have a "dropped" region and the query won't fail. lm := tenants[0].LeaseManager().(*lease.Manager) cachedDatabaseRegions, err := regions.NewCachedDatabaseRegions(ctx, tenants[0].DB(), lm) require.NoError(t, err) @@ -327,7 +348,7 @@ func TestRegionLivenessProberForLeases(t *testing.T) { PhysicalRepresentation: nil, LogicalRepresentation: "FakeRegion", }) - return mut + return builder.BuildExistingMutableType() }) require.NoError(t, lm.WaitForNoVersion(ctx, descpb.ID(tableID), cachedDatabaseRegions, retry.Options{})) } diff --git a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel index a72ea80394ab..de5fa8eb0ace 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel +++ b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel @@ -86,7 +86,7 @@ go_test( "//pkg/sql/execinfra", "//pkg/sql/sem/eval", "//pkg/sql/sqlliveness", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/stats", "//pkg/testutils", "//pkg/testutils/datapathutils", diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index f66ff1be82d6..bd32b22431df 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -48,7 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" @@ -1024,7 +1024,7 @@ func TestSQLLivenessExemption(t *testing.T) { st := cluster.MakeTestingClusterSettings() // Make the tenant heartbeat like crazy. ctx := context.Background() - slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond) + slbase.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond) _, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{ TenantID: tenantID, diff --git a/pkg/ccl/testccl/sqlccl/BUILD.bazel b/pkg/ccl/testccl/sqlccl/BUILD.bazel index 5b6affecd614..837ff875ced1 100644 --- a/pkg/ccl/testccl/sqlccl/BUILD.bazel +++ b/pkg/ccl/testccl/sqlccl/BUILD.bazel @@ -44,7 +44,7 @@ go_test( "//pkg/sql/isql", "//pkg/sql/lexbase", "//pkg/sql/sessiondatapb", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/sqltestutils", "//pkg/sql/tests", "//pkg/testutils", diff --git a/pkg/ccl/testccl/sqlccl/temp_table_clean_test.go b/pkg/ccl/testccl/sqlccl/temp_table_clean_test.go index 6ee627eedeca..f385a61ccefc 100644 --- a/pkg/ccl/testccl/sqlccl/temp_table_clean_test.go +++ b/pkg/ccl/testccl/sqlccl/temp_table_clean_test.go @@ -16,7 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -104,8 +104,8 @@ func TestTenantTempTableCleanup(t *testing.T) { sql.TempObjectWaitInterval.Override(ctx, &st.SV, time.Second*0) // Set up sessions to expire within 5 seconds of a // nodes death. - slinstance.DefaultTTL.Override(ctx, &st.SV, 5*time.Second) - slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Second) + slbase.DefaultTTL.Override(ctx, &st.SV, 5*time.Second) + slbase.DefaultHeartBeat.Override(ctx, &st.SV, time.Second) return st } diff --git a/pkg/kv/kvserver/gc/BUILD.bazel b/pkg/kv/kvserver/gc/BUILD.bazel index 85164225f186..0d5df75f935b 100644 --- a/pkg/kv/kvserver/gc/BUILD.bazel +++ b/pkg/kv/kvserver/gc/BUILD.bazel @@ -64,7 +64,7 @@ go_test( "//pkg/server", "//pkg/server/serverpb", "//pkg/settings/cluster", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/testutils", diff --git a/pkg/kv/kvserver/gc/gc_int_test.go b/pkg/kv/kvserver/gc/gc_int_test.go index b460fb16b729..deb67b76986d 100644 --- a/pkg/kv/kvserver/gc/gc_int_test.go +++ b/pkg/kv/kvserver/gc/gc_int_test.go @@ -27,7 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -86,7 +86,7 @@ func TestEndToEndGC(t *testing.T) { settings := cluster.MakeTestingClusterSettings() // Push the TTL up to 60 hours since we emulate a 50 hours // clock jump below. - slinstance.DefaultTTL.Override(ctx, &settings.SV, 60*time.Hour) + slbase.DefaultTTL.Override(ctx, &settings.SV, 60*time.Hour) manualClock := hlc.NewHybridManualClock() s, appSqlDb, appKvDb := serverutils.StartServer(t, base.TestServerArgs{ diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index d0c69f24c222..dec85e8ac5bd 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -56,6 +57,7 @@ func CheckTwoVersionInvariant( ctx context.Context, clock *hlc.Clock, db isql.DB, + codec keys.SQLCodec, descsCol *Collection, regions regionliveness.CachedDatabaseRegions, settings *clustersettings.Settings, @@ -92,7 +94,7 @@ func CheckTwoVersionInvariant( // transaction ends up committing then there won't have been any created // in the meantime. count, err := lease.CountLeases( - ctx, db, regions, settings, withNewVersion, txn.ProvisionalCommitTimestamp(), false, /*forAnyVersion*/ + ctx, db, codec, regions, settings, withNewVersion, txn.ProvisionalCommitTimestamp(), false, /*forAnyVersion*/ ) if err != nil { return err @@ -119,7 +121,7 @@ func CheckTwoVersionInvariant( for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { // Use the current clock time. now := clock.Now() - count, err := lease.CountLeases(ctx, db, regions, settings, withNewVersion, now, false /*forAnyVersion*/) + count, err := lease.CountLeases(ctx, db, codec, regions, settings, withNewVersion, now, false /*forAnyVersion*/) if err != nil { return err } diff --git a/pkg/sql/catalog/lease/count.go b/pkg/sql/catalog/lease/count.go index dfa7c3f9f329..ca562c85080c 100644 --- a/pkg/sql/catalog/lease/count.go +++ b/pkg/sql/catalog/lease/count.go @@ -15,6 +15,7 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/pkg/keys" clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/regionliveness" @@ -31,6 +32,7 @@ import ( func CountLeases( ctx context.Context, db isql.DB, + codec keys.SQLCodec, cachedDatabaseRegions regionliveness.CachedDatabaseRegions, settings *clustersettings.Settings, versions []IDVersion, @@ -55,7 +57,7 @@ func CountLeases( if err := txn.KV().SetFixedTimestamp(ctx, at); err != nil { return err } - prober := regionliveness.NewLivenessProber(db, cachedDatabaseRegions, settings) + prober := regionliveness.NewLivenessProber(db.KV(), codec, cachedDatabaseRegions, settings) regionMap, err := prober.QueryLiveness(ctx, txn.KV()) if err != nil { return err diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 85e967ab3c8c..7aada4eaf741 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -134,7 +134,7 @@ func (m *Manager) WaitForNoVersion( } for lastCount, r := 0, retry.Start(retryOpts); r.Next(); { now := m.storage.clock.Now() - count, err := CountLeases(ctx, m.storage.db, cachedDatabaseRegions, m.settings, versions, now, true /*forAnyVersion*/) + count, err := CountLeases(ctx, m.storage.db, m.Codec(), cachedDatabaseRegions, m.settings, versions, now, true /*forAnyVersion*/) if err != nil { return err } @@ -198,7 +198,7 @@ func (m *Manager) WaitForOneVersion( // version of the descriptor. now := m.storage.clock.Now() descs := []IDVersion{NewIDVersionPrev(desc.GetName(), desc.GetID(), desc.GetVersion())} - count, err := CountLeases(ctx, m.storage.db, regions, m.settings, descs, now, false /*forAnyVersion*/) + count, err := CountLeases(ctx, m.storage.db, m.Codec(), regions, m.settings, descs, now, false /*forAnyVersion*/) if err != nil { return nil, err } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 532bcd9002a0..9e37f1064d25 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1340,6 +1340,7 @@ func (ex *connExecutor) checkDescriptorTwoVersionInvariant(ctx context.Context) ctx, ex.server.cfg.Clock, ex.server.cfg.InternalDB, + ex.server.cfg.Codec, ex.extraTxnState.descCollection, regionCache, ex.server.cfg.Settings, diff --git a/pkg/sql/regionliveness/BUILD.bazel b/pkg/sql/regionliveness/BUILD.bazel index 58ce56f70c6b..a6e60123a8ae 100644 --- a/pkg/sql/regionliveness/BUILD.bazel +++ b/pkg/sql/regionliveness/BUILD.bazel @@ -6,19 +6,25 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/regionliveness", visibility = ["//visibility:public"], deps = [ + "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvpb", + "//pkg/roachpb", "//pkg/server/serverpb", "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog", + "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catpb", - "//pkg/sql/isql", + "//pkg/sql/catalog/systemschema", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", + "//pkg/sql/rowenc/keyside", + "//pkg/sql/rowenc/valueside", "//pkg/sql/sem/tree", - "//pkg/sql/sessiondata", - "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/types", + "//pkg/util/encoding", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/sql/regionliveness/prober.go b/pkg/sql/regionliveness/prober.go index 7c8d33dc7dac..5b519805445a 100644 --- a/pkg/sql/regionliveness/prober.go +++ b/pkg/sql/regionliveness/prober.go @@ -15,19 +15,25 @@ import ( "sort" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings" clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" - "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -66,13 +72,31 @@ func (l LiveRegions) ForEach(fn func(region string) error) error { return nil } +// UnavailableAtPhysicalRegions is map of regions (in physical representation). +type UnavailableAtPhysicalRegions map[string]struct{} + +// ContainsPhysicalRepresentation contains the physical representation of a region +// as stored inside the KV. +func (u UnavailableAtPhysicalRegions) ContainsPhysicalRepresentation( + physicalRepresentationForRegion string, +) bool { + _, ok := u[physicalRepresentationForRegion] + return ok +} + // Prober used to determine the set of regions which are still alive. type Prober interface { // ProbeLiveness can be used after a timeout to label a regions as unavailable. ProbeLiveness(ctx context.Context, region string) error + // ProbeLivenessWithPhysicalRegion can be used after a timeout to label a regions as unavailable, + // with this version only the physical representation is required. + ProbeLivenessWithPhysicalRegion(ctx context.Context, regionBytes []byte) error // QueryLiveness can be used to get the list of regions which are currently // accessible. QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRegions, error) + // QueryUnavailablePhysicalRegions returns a list of regions that are unavailable at + // right now as physical representations. + QueryUnavailablePhysicalRegions(ctx context.Context, txn *kv.Txn) (UnavailableAtPhysicalRegions, error) // GetProbeTimeout gets maximum timeout waiting on a table before issuing // liveness queries. GetProbeTimeout() (bool, time.Duration) @@ -91,17 +115,37 @@ type CachedDatabaseRegions interface { } type livenessProber struct { - db isql.DB + db *kv.DB + codec keys.SQLCodec + kvWriter bootstrap.KVWriter cachedDBRegions CachedDatabaseRegions settings *clustersettings.Settings } +var probeLivenessTimeout = 15 * time.Second +var testingProbeQueryCallbackFunc func() + +func TestingSetProbeLivenessTimeout(newTimeout time.Duration, probeCallbackFn func()) func() { + oldTimeout := probeLivenessTimeout + probeLivenessTimeout = newTimeout + testingProbeQueryCallbackFunc = probeCallbackFn + return func() { + probeLivenessTimeout = oldTimeout + probeCallbackFn = nil + } +} + // NewLivenessProber creates a new region liveness prober. func NewLivenessProber( - db isql.DB, cachedDBRegions CachedDatabaseRegions, settings *clustersettings.Settings, + db *kv.DB, + codec keys.SQLCodec, + cachedDBRegions CachedDatabaseRegions, + settings *clustersettings.Settings, ) Prober { return &livenessProber{ db: db, + codec: codec, + kvWriter: bootstrap.MakeKVWriter(codec, systemschema.RegionLivenessTable), cachedDBRegions: cachedDBRegions, settings: settings, } @@ -109,25 +153,53 @@ func NewLivenessProber( // ProbeLiveness implements Prober. func (l *livenessProber) ProbeLiveness(ctx context.Context, region string) error { + // If region liveness is disabled then nothing to do. + regionLivenessEnabled, _ := l.GetProbeTimeout() + if !regionLivenessEnabled { + return nil + } + // Resolve the physical value for this region. + regionEnum := l.cachedDBRegions.GetRegionEnumTypeDesc() + foundIdx := -1 + for i := 0; i < regionEnum.NumEnumMembers(); i++ { + if regionEnum.GetMemberLogicalRepresentation(i) == region { + foundIdx = i + break + } + } + if foundIdx == -1 { + return errors.AssertionFailedf("unable to find region %s in region enum", region) + } + return l.ProbeLivenessWithPhysicalRegion(ctx, regionEnum.GetMemberPhysicalRepresentation(foundIdx)) +} + +func (l *livenessProber) ProbeLivenessWithPhysicalRegion( + ctx context.Context, regionBytes []byte, +) error { // If region liveness is disabled then nothing to do. regionLivenessEnabled, tableTimeout := l.GetProbeTimeout() if !regionLivenessEnabled { return nil } - const probeQuery = ` -SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region -` + regionEnumValue := tree.NewDBytes(tree.DBytes(regionBytes)) + // Probe from the SQL instances table to confirm if the region + // is live. err := timeutil.RunWithTimeout(ctx, "probe-liveness", tableTimeout, func(ctx context.Context) error { - return l.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - _, err := txn.QueryRowEx( - ctx, "probe-sql-instance", txn.KV(), sessiondata.NodeUserSessionDataOverride, - probeQuery, region, - ) + return l.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if testingProbeQueryCallbackFunc != nil { + testingProbeQueryCallbackFunc() + } + instancesTable := systemschema.SQLInstancesTable() + indexPrefix := l.codec.IndexPrefix(uint32(instancesTable.GetID()), uint32(instancesTable.GetPrimaryIndexID())) + regionPrefixBytes, err := keyside.Encode(indexPrefix, regionEnumValue, encoding.Ascending) if err != nil { return err } - return nil + regionPrefix := roachpb.Key(regionPrefixBytes) + regionPrefixEnd := regionPrefix.PrefixEnd() + _, err = txn.Scan(ctx, regionPrefix, regionPrefixEnd, 0) + return err }) }) @@ -137,34 +209,35 @@ SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_in } // Region has gone down, set the unavailable_at time on it - return l.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - defaultTTL := slinstance.DefaultTTL.Get(&l.settings.SV) - defaultHeartbeat := slinstance.DefaultHeartBeat.Get(&l.settings.SV) + return l.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + defaultTTL := slbase.DefaultTTL.Get(&l.settings.SV) + defaultHeartbeat := slbase.DefaultHeartBeat.Get(&l.settings.SV) // Get the read timestamp and pick a commit deadline. - readTS := txn.KV().ReadTimestamp().AddDuration(defaultHeartbeat) - txnTS := readTS.AddDuration(defaultTTL) - // Insert a new unavailable_at time if one doesn't exist already. - _, err = txn.Exec(ctx, "mark-region-unavailable", txn.KV(), - "INSERT INTO system.region_liveness(crdb_region, unavailable_at) VALUES ($1, $2) "+ - "ON CONFLICT (crdb_region) DO NOTHING", - region, - txnTS.GoTime()) + commitDeadline := txn.ReadTimestamp().AddDuration(defaultHeartbeat) + txnTS := commitDeadline.AddDuration(defaultTTL) + if err := txn.UpdateDeadline(ctx, commitDeadline); err != nil { + return err + } + ba := txn.NewBatch() + // Insert a new unavailable_at time. + err := l.kvWriter.Insert(ctx, ba, false, regionEnumValue, tree.MustMakeDTimestamp(txnTS.GoTime(), time.Microsecond)) if err != nil { return err } - // Transaction has moved the read timestamp forward, - // so force a retry. - if txn.KV().ReadTimestamp().After(readTS) { - return txn.KV().GenerateForcedRetryableErr(ctx, "read timestamp has moved unable to set deadline.") + if err := txn.Run(ctx, ba); err != nil { + // Conditional put failing is fine, since it means someone else + // has marked the region as dead. + if errors.HasType(err, &kvpb.ConditionFailedError{}) { + return nil + } + return err } - return txn.KV().UpdateDeadline(ctx, readTS) + return nil }) - } // QueryLiveness implements Prober. func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRegions, error) { - executor := l.db.Executor() // Database is not multi-region so report a single region. if l.cachedDBRegions == nil || !l.cachedDBRegions.IsMultiRegion() { @@ -182,23 +255,54 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe return regionStatus, nil } // Detect and down regions and remove them. - rows, err := executor.QueryBufferedEx( - ctx, "query-region-liveness", txn, sessiondata.NodeUserSessionDataOverride, - "SELECT crdb_region, unavailable_at FROM system.region_liveness", - ) + unavailableAtRegions, err := l.QueryUnavailablePhysicalRegions(ctx, txn) if err != nil { return nil, err } - for _, row := range rows { - enum, _ := tree.AsDEnum(row[0]) - unavailableAt := tree.MustBeDTimestamp(row[1]) + regionEnum := l.cachedDBRegions.GetRegionEnumTypeDesc() + for i := 0; i < regionEnum.NumEnumMembers(); i++ { + if unavailableAtRegions.ContainsPhysicalRepresentation(string(regionEnum.GetMemberPhysicalRepresentation(i))) { + delete(regionStatus, regionEnum.GetMemberLogicalRepresentation(i)) + } + } + return regionStatus, nil +} + +// QueryUnavailablePhysicalRegions implements Prober. +func (l *livenessProber) QueryUnavailablePhysicalRegions( + ctx context.Context, txn *kv.Txn, +) (UnavailableAtPhysicalRegions, error) { + // Scan the entire region liveness table. + regionLivenessIndex := l.codec.IndexPrefix(uint32(systemschema.RegionLivenessTable.GetID()), uint32(systemschema.RegionLivenessTable.GetPrimaryIndexID())) + keyValues, err := txn.Scan(ctx, regionLivenessIndex, regionLivenessIndex.PrefixEnd(), 0) + if err != nil { + return nil, err + } + // Detect any down regions and remove them. + unavailableAtRegions := make(UnavailableAtPhysicalRegions) + datumAlloc := &tree.DatumAlloc{} + for _, keyValue := range keyValues { + tuple, err := keyValue.Value.GetTuple() + if err != nil { + return nil, err + } + enumDatum, _, err := keyside.Decode(datumAlloc, types.Bytes, keyValue.Key[len(regionLivenessIndex):], encoding.Ascending) + if err != nil { + return nil, err + } + enumBytes := enumDatum.(*tree.DBytes) + ts, _, err := valueside.Decode(datumAlloc, types.Timestamp, tuple) + if err != nil { + return nil, err + } + unavailableAt := ts.(*tree.DTimestamp) // Region is now officially unavailable, so lets remove // it. if txn.ReadTimestamp().GoTime().After(unavailableAt.Time) { - delete(regionStatus, enum.LogicalRep) + unavailableAtRegions[string(*enumBytes)] = struct{}{} } } - return regionStatus, nil + return unavailableAtRegions, nil } // GetProbeTimeout gets maximum timeout waiting on a table before issuing diff --git a/pkg/sql/sqlliveness/slbase/BUILD.bazel b/pkg/sql/sqlliveness/slbase/BUILD.bazel new file mode 100644 index 000000000000..0cdb203b6169 --- /dev/null +++ b/pkg/sql/sqlliveness/slbase/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "slbase", + srcs = ["slbase.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase", + visibility = ["//visibility:public"], + deps = ["//pkg/settings"], +) diff --git a/pkg/sql/sqlliveness/slbase/slbase.go b/pkg/sql/sqlliveness/slbase/slbase.go new file mode 100644 index 000000000000..e454c408beb4 --- /dev/null +++ b/pkg/sql/sqlliveness/slbase/slbase.go @@ -0,0 +1,36 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package slbase + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/settings" +) + +var ( + // DefaultTTL specifies the time to expiration when a session is created. + DefaultTTL = settings.RegisterDurationSetting( + settings.ApplicationLevel, + "server.sqlliveness.ttl", + "default sqlliveness session ttl", + 40*time.Second, + settings.NonNegativeDuration, + ) + // DefaultHeartBeat specifies the period between attempts to extend a session. + DefaultHeartBeat = settings.RegisterDurationSetting( + settings.ApplicationLevel, + "server.sqlliveness.heartbeat", + "duration heart beats to push session expiration further out in time", + 5*time.Second, + settings.NonNegativeDuration, + ) +) diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 54a1c7f41f58..e95ab9cc4784 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -6,10 +6,10 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance", visibility = ["//visibility:public"], deps = [ - "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/enum", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/sqlliveness/slstorage", "//pkg/util/grpcutil", "//pkg/util/hlc", @@ -38,6 +38,7 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/enum", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/slbase", "//pkg/sql/sqlliveness/slstorage", "//pkg/testutils", "//pkg/util/hlc", diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 324b47148216..56f010474013 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -19,10 +19,10 @@ import ( "sync" "time" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -36,25 +36,6 @@ import ( "github.com/cockroachdb/logtags" ) -var ( - // DefaultTTL specifies the time to expiration when a session is created. - DefaultTTL = settings.RegisterDurationSetting( - settings.ApplicationLevel, - "server.sqlliveness.ttl", - "default sqlliveness session ttl", - 40*time.Second, - settings.NonNegativeDuration, - ) - // DefaultHeartBeat specifies the period between attempts to extend a session. - DefaultHeartBeat = settings.RegisterDurationSetting( - settings.ApplicationLevel, - "server.sqlliveness.heartbeat", - "duration heart beats to push session expiration further out in time", - 5*time.Second, - settings.NonNegativeDuration, - ) -) - // Writer provides interactions with the storage of session records. type Writer interface { // Insert stores the input Session. @@ -413,10 +394,10 @@ func NewSQLInstance( stopper: stopper, sessionEvents: sessionEvents, ttl: func() time.Duration { - return DefaultTTL.Get(&settings.SV) + return slbase.DefaultTTL.Get(&settings.SV) }, hb: func() time.Duration { - return DefaultHeartBeat.Get(&settings.SV) + return slbase.DefaultHeartBeat.Get(&settings.SV) }, drain: make(chan struct{}), } diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index 9e0ee16ffa17..c85e65a414c2 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -44,8 +45,8 @@ func TestSQLInstance(t *testing.T) { clusterversion.Latest.Version(), clusterversion.MinSupported.Version(), true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + slbase.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slbase.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(ambientCtx, stopper, clock, fakeStorage, settings, nil, nil) @@ -113,8 +114,8 @@ func TestSQLInstanceRelease(t *testing.T) { clusterversion.Latest.Version(), clusterversion.MinSupported.Version(), true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + slbase.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slbase.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() var ambientCtx log.AmbientContext diff --git a/pkg/sql/sqlliveness/slstorage/BUILD.bazel b/pkg/sql/sqlliveness/slstorage/BUILD.bazel index 68ba572471f2..335b93c5849b 100644 --- a/pkg/sql/sqlliveness/slstorage/BUILD.bazel +++ b/pkg/sql/sqlliveness/slstorage/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/settings/cluster", "//pkg/sql/catalog", "//pkg/sql/catalog/systemschema", + "//pkg/sql/regionliveness", "//pkg/sql/sem/eval", "//pkg/sql/sqlliveness", "//pkg/util/admission/admissionpb", diff --git a/pkg/sql/sqlliveness/slstorage/key_encoder.go b/pkg/sql/sqlliveness/slstorage/key_encoder.go index 3e866a33f3f7..9981d1fa4747 100644 --- a/pkg/sql/sqlliveness/slstorage/key_encoder.go +++ b/pkg/sql/sqlliveness/slstorage/key_encoder.go @@ -22,7 +22,7 @@ import ( // keyCodec manages the SessionID <-> roachpb.Key mapping. type keyCodec interface { - encode(sid sqlliveness.SessionID) (roachpb.Key, error) + encode(sid sqlliveness.SessionID) (roachpb.Key, string, error) decode(key roachpb.Key) (sqlliveness.SessionID, error) // indexPrefix returns the prefix for an encoded key. encode() will return @@ -37,21 +37,21 @@ type rbrEncoder struct { rbrIndex roachpb.Key } -func (e *rbrEncoder) encode(session sqlliveness.SessionID) (roachpb.Key, error) { - region, _, err := UnsafeDecodeSessionID(session) +func (e *rbrEncoder) encode(session sqlliveness.SessionID) (roachpb.Key, string, error) { + region, _, err := SafeDecodeSessionID(session) if err != nil { - return nil, err + return nil, "", err } if len(region) == 0 { - return nil, errors.Newf("legacy session passed to rbr table: '%s'", session.String()) + return nil, "", errors.Newf("legacy session passed to rbr table: '%s'", session.String()) } const columnFamilyID = 0 key := e.indexPrefix() - key = encoding.EncodeBytesAscending(key, region) + key = encoding.EncodeBytesAscending(key, encoding.UnsafeConvertStringToBytes(region)) key = encoding.EncodeBytesAscending(key, session.UnsafeBytes()) - return keys.MakeFamilyKey(key, columnFamilyID), nil + return keys.MakeFamilyKey(key, columnFamilyID), region, nil } func (e *rbrEncoder) decode(key roachpb.Key) (sqlliveness.SessionID, error) { diff --git a/pkg/sql/sqlliveness/slstorage/key_encoder_test.go b/pkg/sql/sqlliveness/slstorage/key_encoder_test.go index b5ccfc61eb90..3bb21675a7ba 100644 --- a/pkg/sql/sqlliveness/slstorage/key_encoder_test.go +++ b/pkg/sql/sqlliveness/slstorage/key_encoder_test.go @@ -49,9 +49,10 @@ func TestKeyEncoder(t *testing.T) { id, err := MakeSessionID(enum.One, uuid.MakeV4()) require.NoError(t, err) - key, err := codec.encode(id) + key, region, err := codec.encode(id) require.NoError(t, err) require.True(t, bytes.HasPrefix(key, codec.indexPrefix())) + require.Equal(t, region, string(enum.One)) decodedID, err := codec.decode(key) require.NoError(t, err) diff --git a/pkg/sql/sqlliveness/slstorage/sessionid.go b/pkg/sql/sqlliveness/slstorage/sessionid.go index 060290ad5e03..b21f9d2fa841 100644 --- a/pkg/sql/sqlliveness/slstorage/sessionid.go +++ b/pkg/sql/sqlliveness/slstorage/sessionid.go @@ -90,3 +90,29 @@ func UnsafeDecodeSessionID(session sqlliveness.SessionID) (region, id []byte, er return rest[:regionLen], rest[regionLen:], nil } + +// SafeDecodeSessionID decodes the region and id from the SessionID. +func SafeDecodeSessionID(session sqlliveness.SessionID) (region, id string, err error) { + if len(session) == legacyLen { + return "", "", errors.Newf("unexpected legacy SessionID format") + } + if len(session) < minimumNonLegacyLen { + // The smallest valid v1 session id is a [version, 1, single_byte_region, uuid...], + // which is three bytes larger than a uuid. + return "", "", errors.New("session id is too short") + } + + // Decode the version. + if session[0] != sessionIDVersion { + return "", "", errors.Newf("invalid session id version: %d", session[0]) + } + regionLen := int(session[1]) + rest := session[2:] + + // Decode and validate the length of the region. + if len(rest) != regionLen+uuid.Size { + return "", "", errors.Newf("session id with length %d is the wrong size to include a region with length %d", len(session), regionLen) + } + + return string(rest[:regionLen]), string(rest[regionLen:]), nil +} diff --git a/pkg/sql/sqlliveness/slstorage/slstorage.go b/pkg/sql/sqlliveness/slstorage/slstorage.go index 7827bad06de1..6e99ac3f4239 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/regionliveness" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" @@ -309,15 +310,35 @@ func (s *Storage) deleteOrFetchSession( var deleted bool var prevExpiration hlc.Timestamp ctx = multitenant.WithTenantCostControlExemption(ctx) + livenessProber := regionliveness.NewLivenessProber(s.db, s.codec, nil, s.settings) + k, regionPhysicalRep, err := s.keyCodec.encode(sid) if err := s.txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Reset captured variable in case of retry. deleted, expiration, prevExpiration = false, hlc.Timestamp{}, hlc.Timestamp{} - - k, err := s.keyCodec.encode(sid) if err != nil { return err } - kv, err := txn.Get(ctx, k) + if unavailableAtRegions, err := livenessProber.QueryUnavailablePhysicalRegions(ctx, txn); err != nil || + unavailableAtRegions.ContainsPhysicalRepresentation(regionPhysicalRep) { + return err + } + execWithTimeout, timeout := livenessProber.GetProbeTimeout() + var kv kv.KeyValue + if execWithTimeout { + // Detect if we fail to a region and force a probe in that + // case. + err = timeutil.RunWithTimeout(ctx, "fetch-session", timeout, func(ctx context.Context) error { + kvInner, err := txn.Get(ctx, k) + kv = kvInner + return err + }) + + if err != nil { + return err + } + } else { + kv, err = txn.Get(ctx, k) + } if err != nil { return err } @@ -343,6 +364,12 @@ func (s *Storage) deleteOrFetchSession( return txn.CommitInBatch(ctx, ba) }); err != nil { + if regionliveness.IsQueryTimeoutErr(err) { + probeErr := livenessProber.ProbeLivenessWithPhysicalRegion(ctx, encoding.UnsafeConvertStringToBytes(regionPhysicalRep)) + if probeErr != nil { + err = errors.WithSecondaryError(err, probeErr) + } + } return false, hlc.Timestamp{}, errors.Wrapf(err, "could not query session id: %s", sid) } @@ -468,7 +495,7 @@ func (s *Storage) Insert( if err := s.txn(ctx, func(ctx context.Context, txn *kv.Txn) error { batch := txn.NewBatch() - k, err := s.keyCodec.encode(sid) + k, _, err := s.keyCodec.encode(sid) if err != nil { return err } @@ -492,7 +519,7 @@ func (s *Storage) Update( ) (sessionExists bool, err error) { ctx = multitenant.WithTenantCostControlExemption(ctx) err = s.txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - k, err := s.keyCodec.encode(sid) + k, _, err := s.keyCodec.encode(sid) if err != nil { return err } @@ -525,7 +552,7 @@ func (s *Storage) Delete(ctx context.Context, session sqlliveness.SessionID) err return s.txn(ctx, func(ctx context.Context, txn *kv.Txn) error { batch := txn.NewBatch() - key, err := s.keyCodec.encode(session) + key, _, err := s.keyCodec.encode(session) if err != nil { return err }