Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
139148: kv/kvnemesis: switch between lease types r=miraradeva a=miraradeva

Fixes #125260.
The main commit from #135044.

This commit adds a new `ChangeSettingOperation` operation class to kvnemesis. It then adds the first variant of the operation, `SetLeaseType`. This operation changes the default range lease type to either expiration, epoch, or leader. This allows us to exercise lease type changes in kvnemesis.

For #133891, we'll want to add a DRT operator which does something similar.

Release note: None

139888: roachtest: deflake follower-reads/mixed-version/survival=region/locality=global/reads=strong r=miraradeva a=miraradeva

Previously, this test failed due to replicas from the same region being considered unhealthy and re-ordered after replicas from other regions (in `transport.splitHealthy()`), resulting in elevated latency of the (cross-region) follower read. The issue is not present in non-mixed-version tests. Unlike the regular tests, the mixed-version tests invoke the test initialization, which includes ensuring upreplication and correct replica placement, at node startup but not before each follower-reads run. So, a node may consider a recently-restarted node unhealthy, and only the latter would have run the initialization steps on startup.

This commit factors out the logic that ensures upreplicasion and correct replica placement, and invokes this logic not just on node startup but before each follower-reads run.

Fixes: #139335
Fixes: #138076
Fixes: #136099
Fixes: #133520

Release note: None

Co-authored-by: Mira Radeva <[email protected]>
  • Loading branch information
craig[bot] and miraradeva committed Jan 27, 2025
3 parents 35a415a + 0910606 + 3b82faf commit cf9bf57
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 35 deletions.
55 changes: 31 additions & 24 deletions pkg/cmd/roachtest/tests/follower_reads.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,36 @@ func initFollowerReadsDB(
require.NoError(t, err)
}

ensureUpreplicationAndPlacement(ctx, t, l, topology, db)

const rows = 100
const concurrency = 32
sem := make(chan struct{}, concurrency)
data = make(map[int]int64)
insert := func(k int) task.Func {
v := rng.Int63()
data[k] = v
return func(ctx context.Context, _ *logger.Logger) error {
sem <- struct{}{}
defer func() { <-sem }()
_, err := db.ExecContext(ctx, "INSERT INTO mr_db.test VALUES ( $1, $2 )", k, v)
return errors.Wrap(err, "failed to insert data")
}
}

// Insert the data.
g := t.NewGroup(task.WithContext(ctx))
for i := 0; i < rows; i++ {
g.Go(insert(i))
}
g.Wait()

return data
}

func ensureUpreplicationAndPlacement(
ctx context.Context, t test.Test, l *logger.Logger, topology topologySpec, db *gosql.DB,
) {
// Wait until the table has completed up-replication.
l.Printf("waiting for up-replication...")
retryOpts := retry.Options{MaxBackoff: 15 * time.Second}
Expand Down Expand Up @@ -654,30 +684,6 @@ func initFollowerReadsDB(
}
}
}

const rows = 100
const concurrency = 32
sem := make(chan struct{}, concurrency)
data = make(map[int]int64)
insert := func(k int) task.Func {
v := rng.Int63()
data[k] = v
return func(ctx context.Context, _ *logger.Logger) error {
sem <- struct{}{}
defer func() { <-sem }()
_, err := db.ExecContext(ctx, "INSERT INTO mr_db.test VALUES ( $1, $2 )", k, v)
return errors.Wrap(err, "failed to insert data")
}
}

// Insert the data.
g := t.NewGroup(task.WithContext(ctx))
for i := 0; i < rows; i++ {
g.Go(insert(i))
}
g.Wait()

return data
}

func computeFollowerReadDuration(ctx context.Context, db *gosql.DB) (time.Duration, error) {
Expand Down Expand Up @@ -1052,6 +1058,7 @@ func runFollowerReadsMixedVersionTest(
}

runFollowerReads := func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
ensureUpreplicationAndPlacement(ctx, t, l, topology, h.Connect(1))
runFollowerReadsTest(ctx, t, l, c, r, topology, rc, data)
return nil
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *TransferLeaseOperation:
err := db.AdminTransferLease(ctx, o.Key, o.Target)
o.Result = resultInit(ctx, err)
case *ChangeSettingOperation:
err := changeClusterSettingInEnv(ctx, env, o)
o.Result = resultInit(ctx, err)
case *ChangeZoneOperation:
err := updateZoneConfigInEnv(ctx, env, o.Type)
o.Result = resultInit(ctx, err)
Expand Down Expand Up @@ -683,6 +686,39 @@ func newGetReplicasFn(dbs ...*kv.DB) GetReplicasFn {
}
}

func changeClusterSettingInEnv(ctx context.Context, env *Env, op *ChangeSettingOperation) error {
var settings map[string]string
switch op.Type {
case ChangeSettingType_SetLeaseType:
switch op.LeaseType {
case roachpb.LeaseExpiration:
settings = map[string]string{
"kv.lease.expiration_leases_only.enabled": "true",
}
case roachpb.LeaseEpoch:
settings = map[string]string{
"kv.lease.expiration_leases_only.enabled": "false",
"kv.raft.leader_fortification.fraction_enabled": "0.0",
}
case roachpb.LeaseLeader:
settings = map[string]string{
"kv.lease.expiration_leases_only.enabled": "false",
"kv.raft.leader_fortification.fraction_enabled": "1.0",
}
default:
panic(errors.AssertionFailedf(`unknown LeaseType: %v`, op.LeaseType))
}
default:
panic(errors.AssertionFailedf(`unknown ChangeSettingType: %v`, op.Type))
}
for name, val := range settings {
if err := env.SetClusterSetting(ctx, name, val); err != nil {
return err
}
}
return nil
}

func updateZoneConfig(zone *zonepb.ZoneConfig, change ChangeZoneType) {
switch change {
case ChangeZoneType_ToggleGlobalReads:
Expand Down
20 changes: 16 additions & 4 deletions pkg/kv/kvnemesis/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,27 @@ func (e *Env) CheckConsistency(ctx context.Context, span roachpb.Span) []error {
// SetClosedTimestampInterval sets the kv.closed_timestamp.target_duration
// cluster setting to the provided duration.
func (e *Env) SetClosedTimestampInterval(ctx context.Context, d time.Duration) error {
q := fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s'`, d)
_, err := e.anyNode().ExecContext(ctx, q)
return err
return e.SetClusterSetting(ctx, "kv.closed_timestamp.target_duration", d.String())
}

// ResetClosedTimestampInterval resets the kv.closed_timestamp.target_duration
// cluster setting to its default value.
func (e *Env) ResetClosedTimestampInterval(ctx context.Context) error {
const q = `SET CLUSTER SETTING kv.closed_timestamp.target_duration TO DEFAULT`
return e.SetClusterSettingToDefault(ctx, "kv.closed_timestamp.target_duration")
}

// SetClusterSetting sets the cluster setting with the provided name to the
// provided value.
func (e *Env) SetClusterSetting(ctx context.Context, name, val string) error {
q := fmt.Sprintf(`SET CLUSTER SETTING %s = '%s'`, name, val)
_, err := e.anyNode().ExecContext(ctx, q)
return err
}

// SetClusterSettingToDefault resets the cluster setting with the provided name
// to its default value.
func (e *Env) SetClusterSettingToDefault(ctx context.Context, name string) error {
q := fmt.Sprintf(`SET CLUSTER SETTING %s TO DEFAULT`, name)
_, err := e.anyNode().ExecContext(ctx, q)
return err
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type OperationConfig struct {
Merge MergeConfig
ChangeReplicas ChangeReplicasConfig
ChangeLease ChangeLeaseConfig
ChangeSetting ChangeSettingConfig
ChangeZone ChangeZoneConfig
}

Expand Down Expand Up @@ -324,6 +325,13 @@ type ChangeLeaseConfig struct {
TransferLease int
}

// ChangeSettingConfig configures the relative probability of generating a
// cluster setting change operation.
type ChangeSettingConfig struct {
// SetLeaseType changes the default range lease type.
SetLeaseType int
}

// ChangeZoneConfig configures the relative probability of generating a zone
// configuration change operation.
type ChangeZoneConfig struct {
Expand Down Expand Up @@ -443,6 +451,9 @@ func newAllOperationsConfig() GeneratorConfig {
ChangeLease: ChangeLeaseConfig{
TransferLease: 1,
},
ChangeSetting: ChangeSettingConfig{
SetLeaseType: 1,
},
ChangeZone: ChangeZoneConfig{
ToggleGlobalReads: 1,
},
Expand Down Expand Up @@ -683,6 +694,7 @@ func (g *generator) RandStep(rng *rand.Rand) Step {
transferLeaseFn := makeTransferLeaseFn(key, append(voters, nonVoters...))
addOpGen(&allowed, transferLeaseFn, g.Config.Ops.ChangeLease.TransferLease)

addOpGen(&allowed, setLeaseType, g.Config.Ops.ChangeSetting.SetLeaseType)
addOpGen(&allowed, toggleGlobalReads, g.Config.Ops.ChangeZone.ToggleGlobalReads)

return step(g.selectOp(rng, allowed))
Expand Down Expand Up @@ -1452,6 +1464,14 @@ func makeTransferLeaseFn(key string, current []roachpb.ReplicationTarget) opGenF
}
}

func setLeaseType(_ *generator, rng *rand.Rand) Operation {
leaseTypes := roachpb.TestingAllLeaseTypes()
leaseType := leaseTypes[rng.Intn(len(leaseTypes))]
op := changeSetting(ChangeSettingType_SetLeaseType)
op.ChangeSetting.LeaseType = leaseType
return op
}

func toggleGlobalReads(_ *generator, _ *rand.Rand) Operation {
return changeZone(ChangeZoneType_ToggleGlobalReads)
}
Expand Down Expand Up @@ -1928,6 +1948,10 @@ func transferLease(key string, target roachpb.StoreID) Operation {
return Operation{TransferLease: &TransferLeaseOperation{Key: []byte(key), Target: target}}
}

func changeSetting(changeType ChangeSettingType) Operation {
return Operation{ChangeSetting: &ChangeSettingOperation{Type: changeType}}
}

func changeZone(changeType ChangeZoneType) Operation {
return Operation{ChangeZone: &ChangeZoneOperation{Type: changeType}}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ func TestRandStep(t *testing.T) {
}
case *TransferLeaseOperation:
counts.ChangeLease.TransferLease++
case *ChangeSettingOperation:
switch o.Type {
case ChangeSettingType_SetLeaseType:
counts.ChangeSetting.SetLeaseType++
}
case *ChangeZoneOperation:
switch o.Type {
case ChangeZoneType_ToggleGlobalReads:
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func (op Operation) Result() *Result {
return &o.Result
case *TransferLeaseOperation:
return &o.Result
case *ChangeSettingOperation:
return &o.Result
case *ChangeZoneOperation:
return &o.Result
case *BatchOperation:
Expand Down Expand Up @@ -144,6 +146,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
o.format(w, fctx)
case *TransferLeaseOperation:
o.format(w, fctx)
case *ChangeSettingOperation:
o.format(w, fctx)
case *ChangeZoneOperation:
o.format(w, fctx)
case *BatchOperation:
Expand Down Expand Up @@ -393,6 +397,16 @@ func (op TransferLeaseOperation) format(w *strings.Builder, fctx formatCtx) {
op.Result.format(w)
}

func (op ChangeSettingOperation) format(w *strings.Builder, fctx formatCtx) {
switch op.Type {
case ChangeSettingType_SetLeaseType:
fmt.Fprintf(w, `env.SetClusterSetting(ctx, %s, %s)`, op.Type, op.LeaseType)
default:
panic(errors.AssertionFailedf(`unknown ChangeSettingType: %v`, op.Type))
}
op.Result.format(w)
}

func (op ChangeZoneOperation) format(w *strings.Builder, fctx formatCtx) {
fmt.Fprintf(w, `env.UpdateZoneConfig(ctx, %s)`, op.Type)
op.Result.format(w)
Expand Down
23 changes: 17 additions & 6 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ message TransferLeaseOperation {
Result result = 3 [(gogoproto.nullable) = false];
}

enum ChangeSettingType {
SetLeaseType = 0;
}

message ChangeSettingOperation {
ChangeSettingType type = 1;
int32 lease_type = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.LeaseType"];
Result result = 3 [(gogoproto.nullable) = false];
}

enum ChangeZoneType {
ToggleGlobalReads = 0;
}
Expand Down Expand Up @@ -165,12 +175,13 @@ message Operation {
MergeOperation merge = 14;
ChangeReplicasOperation change_replicas = 15;
TransferLeaseOperation transfer_lease = 16;
ChangeZoneOperation change_zone = 17;
AddSSTableOperation add_sstable = 18 [(gogoproto.customname) = "AddSSTable"];
SavepointCreateOperation savepoint_create = 19;
SavepointReleaseOperation savepoint_release = 20;
SavepointRollbackOperation savepoint_rollback = 21;
BarrierOperation barrier = 22;
ChangeSettingOperation change_setting = 17;
ChangeZoneOperation change_zone = 18;
AddSSTableOperation add_sstable = 19 [(gogoproto.customname) = "AddSSTable"];
SavepointCreateOperation savepoint_create = 20;
SavepointReleaseOperation savepoint_release = 21;
SavepointRollbackOperation savepoint_rollback = 22;
BarrierOperation barrier = 23;
}

enum ResultType {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,14 @@ func (v *validator) processOp(op Operation) {
if !transferLeaseResultIsIgnorable(t.Result) {
v.failIfError(op, t.Result) // fail on all other errors
}
case *ChangeSettingOperation:
execTimestampStrictlyOptional = true
// It's possible that reading the modified setting times out. Ignore these
// errors for now, at least until we do some validation that depends on the
// cluster settings being fully propagated.
if !resultIsErrorStr(t.Result, `setting updated but timed out waiting to read new value`) {
v.failIfError(op, t.Result)
}
case *ChangeZoneOperation:
execTimestampStrictlyOptional = true
v.failIfError(op, t.Result) // fail on all errors
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,7 @@ func (l Lease) OwnedBy(storeID StoreID) bool {
// LeaseType describes the type of lease.
//
//go:generate stringer -type=LeaseType
type LeaseType int
type LeaseType int32

const (
// LeaseNone specifies no lease, to be used as a default value.
Expand Down

0 comments on commit cf9bf57

Please sign in to comment.