From 728eeeb86d31f70b9e4e6fcc5e6fdb7664284f00 Mon Sep 17 00:00:00 2001 From: Jeff Swenson Date: Mon, 27 Jan 2025 10:35:48 -0500 Subject: [PATCH] roachtest: update backup fixtures Previously, backup fixture creation was a manual process. Now, fixture creation will run as a roachtest and use the fixture registry utility added by #140368. Fixture generation was reworked to use import instead of old backup fixtures. This makes it trivial to bootstrap fixtures in new clouds and in new fixture buckets. Release note: none Part of: #139159 --- pkg/cmd/roachtest/tests/BUILD.bazel | 1 + pkg/cmd/roachtest/tests/backup.go | 62 ++- pkg/cmd/roachtest/tests/backup_fixtures.go | 528 ++++++++++----------- pkg/cmd/roachtest/tests/online_restore.go | 4 +- pkg/cmd/roachtest/tests/restore.go | 13 +- pkg/roachprod/blobfixture/registry.go | 7 +- 6 files changed, 324 insertions(+), 291 deletions(-) diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index c04723d76b6a..8ec2527cbeed 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -244,6 +244,7 @@ go_library( "//pkg/multitenant/mtinfopb", "//pkg/roachpb", "//pkg/roachprod", + "//pkg/roachprod/blobfixture", "//pkg/roachprod/config", "//pkg/roachprod/errors", "//pkg/roachprod/install", diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go index a4e967439479..af57069bd908 100644 --- a/pkg/cmd/roachtest/tests/backup.go +++ b/pkg/cmd/roachtest/tests/backup.go @@ -928,6 +928,45 @@ func getAWSKMSAssumeRoleURI() (string, error) { return correctURI, nil } +func withAuthParameters(uri url.URL) (url.URL, error) { + q := uri.Query() + switch uri.Scheme { + case "gs": + expect := map[string]string{ + AssumeRoleGCSCredentials: gcp.CredentialsParam, + AssumeRoleGCSServiceAccount: gcp.AssumeRoleParam, + } + for env, param := range expect { + v := os.Getenv(env) + if v == "" { + return url.URL{}, errors.Newf("env variable %s must be present to run the KMS test", env) + } + // Nightly env uses JSON credentials, which have to be base64 encoded. + if param == gcp.CredentialsParam { + v = base64.StdEncoding.EncodeToString([]byte(v)) + } + q.Add(param, v) + } + + // Get GCP Key name from env variable. + keyName := os.Getenv(KMSKeyNameAEnvVar) + if keyName == "" { + return url.URL{}, errors.Newf("env variable %s must be present to run the KMS test", KMSKeyNameAEnvVar) + } + + // Set AUTH to specified + q.Add(cloudstorage.AuthParam, cloudstorage.AuthParamSpecified) + case "s3": + + default: + return url.URL{}, errors.Newf("unsupported scheme %s", uri.Scheme) + } + + uri.RawQuery = q.Encode() + + return uri, nil +} + func getGCSKMSURI(keyIDEnvVariable string) (string, error) { q := make(url.Values) expect := map[string]string{ @@ -985,7 +1024,7 @@ func getGCSKMSAssumeRoleURI() (string, error) { return correctURI, nil } -func getGCSBackupPath(dest string) (string, error) { +func getGCSAuth() (url.Values, error) { q := make(url.Values) expect := map[string]string{ AssumeRoleGCSCredentials: gcp.CredentialsParam, @@ -994,7 +1033,7 @@ func getGCSBackupPath(dest string) (string, error) { for env, param := range expect { v := os.Getenv(env) if v == "" { - return "", errors.Errorf("env variable %s must be present to run the assume role test", env) + return nil, errors.Errorf("env variable %s must be present to run the assume role test", env) } // Nightly env uses JSON credentials, which have to be base64 encoded. 
@@ -1003,6 +1042,14 @@ func getGCSBackupPath(dest string) (string, error) { } q.Add(param, v) } + return q, nil +} + +func getGCSBackupPath(dest string) (string, error) { + q, err := getGCSAuth() + if err != nil { + return "", err + } // Set AUTH to specified q.Add(cloudstorage.AuthParam, cloudstorage.AuthParamSpecified) @@ -1011,7 +1058,7 @@ func getGCSBackupPath(dest string) (string, error) { return uri, nil } -func getAWSBackupPath(dest string) (string, error) { +func getS3Auth() (url.Values, error) { q := make(url.Values) expect := map[string]string{ AssumeRoleAWSKeyIDEnvVar: amazon.AWSAccessKeyParam, @@ -1022,11 +1069,18 @@ func getAWSBackupPath(dest string) (string, error) { for env, param := range expect { v := os.Getenv(env) if v == "" { - return "", errors.Errorf("env variable %s must be present to run the KMS test", env) + return nil, errors.Errorf("env variable %s must be present to run S3 tests", env) } q.Add(param, v) } q.Add(cloudstorage.AuthParam, cloudstorage.AuthParamSpecified) + return q, nil +} +func getAWSBackupPath(dest string) (string, error) { + q, err := getS3Auth() + if err != nil { + return "", err + } return fmt.Sprintf("s3://"+backupTestingBucket+"/%s?%s", dest, q.Encode()), nil } diff --git a/pkg/cmd/roachtest/tests/backup_fixtures.go b/pkg/cmd/roachtest/tests/backup_fixtures.go index f526b2903f9b..9757d43c907b 100644 --- a/pkg/cmd/roachtest/tests/backup_fixtures.go +++ b/pkg/cmd/roachtest/tests/backup_fixtures.go @@ -8,141 +8,139 @@ package tests import ( "context" "fmt" + "net/url" + "path" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod" + "github.com/cockroachdb/cockroach/pkg/roachprod/blobfixture" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) -func makeBackupFixtureSpecs(override scheduledBackupSpecs) scheduledBackupSpecs { - backupSpecs := makeBackupSpecs(override.backupSpecs, defaultBackupFixtureSpecs.backupSpecs) - specs := scheduledBackupSpecs{ - backupSpecs: backupSpecs, - incrementalBackupCrontab: defaultBackupFixtureSpecs.incrementalBackupCrontab, - } - if override.incrementalBackupCrontab != "" { - specs.incrementalBackupCrontab = override.incrementalBackupCrontab - } - specs.ignoreExistingBackups = override.ignoreExistingBackups - return specs +type TpccFixture struct { + Name string + ImportWarehouses int + WorkloadWarehouses int + MinutesPerIncremental int + IncrementalChainLength int + RestoredSizeEstimate string } -// defaultBackupFixtureSpecs defines the default scheduled backup used to create a fixture. -var defaultBackupFixtureSpecs = scheduledBackupSpecs{ - // Runs an incremental backup every 5 minutes. - incrementalBackupCrontab: "*/5 * * * *", - - // The default option of false prevents roachtest users from overriding the - // latest backup in a collection, which may be used in restore roachtests. 
- ignoreExistingBackups: false, - - backupSpecs: backupSpecs{ - version: "v23.1.11", - cloud: spec.AWS, - fullBackupDir: "LATEST", - numBackupsInChain: 48, - workload: tpceRestore{ - customers: 25000, - }, - }, +// TinyFixture is a TPCC fixture that is intended for smoke tests, local +// testing, and continuous testing of the fixture generation logic. +var TinyFixture = TpccFixture{ + Name: "tpcc-10", + ImportWarehouses: 10, + WorkloadWarehouses: 10, + IncrementalChainLength: 4, + RestoredSizeEstimate: "700MiB", } -const scheduleLabel = "schedule_cluster" - -// fixtureFromMasterVersion should be used in the backupSpecs version field to -// create a fixture using the bleeding edge of master. In the backup fixture -// path on external storage, the {version} subdirectory will be equal to this -// value. -const fixtureFromMasterVersion = "latest" +// SmallFixture is a TPCC fixture that is intended to be quick to restore and +// cheap to generate for continuous testing of the fixture generation logic. +var SmallFixture = TpccFixture{ + Name: "tpcc-5k", + ImportWarehouses: 5000, + WorkloadWarehouses: 1000, + IncrementalChainLength: 48, + RestoredSizeEstimate: "350GiB", +} -type scheduledBackupSpecs struct { - backupSpecs - // ignoreExistingBackups if set to true, will allow a new backup chain - // to get written to an already existing backup collection. - ignoreExistingBackups bool - incrementalBackupCrontab string +// MediumFixture is a TPCC fixture sized so that it is a tight fit in 3 nodes +// with the smallest supported node size of 4 vCPUs per node. +var MediumFixture = TpccFixture{ + Name: "tpcc-30k", + ImportWarehouses: 30000, + WorkloadWarehouses: 10000, + IncrementalChainLength: 400, + RestoredSizeEstimate: "2TiB", } -func (sbs scheduledBackupSpecs) scheduledBackupCmd() string { - // This backup schedule will first run a full backup immediately and then the - // incremental backups at the given incrementalBackupCrontab cadence until the user cancels the - // backup schedules. To ensure that only one full backup chain gets created, - // begin the backup schedule at the beginning of the week, as a new full - // backup will get created on Sunday at Midnight ;) - options := "" - if !sbs.nonRevisionHistory { - options = "WITH revision_history" - } - backupCmd := fmt.Sprintf(`BACKUP INTO %s %s`, sbs.backupCollection(), options) - cmd := fmt.Sprintf(`CREATE SCHEDULE %s FOR %s RECURRING '%s' FULL BACKUP '@weekly' WITH SCHEDULE OPTIONS first_run = 'now'`, - scheduleLabel, backupCmd, sbs.incrementalBackupCrontab) - if sbs.ignoreExistingBackups { - cmd = cmd + ",ignore_existing_backups" - } - return cmd +// LargeFixture is a TPCC fixture sized so that it is a tight fit in 3 nodes +// with the maximum supported node density of 8 TiB storage per node. If the +// node storage density increases, then the size of this fixture should be +// increased. +var LargeFixture = TpccFixture{ + Name: "tpcc-300k", + ImportWarehouses: 300000, + WorkloadWarehouses: 100, + IncrementalChainLength: 400, + RestoredSizeEstimate: "20TiB", } type backupFixtureSpecs struct { // hardware specifies the roachprod specs to create the scheduledBackupSpecs fixture on. hardware hardwareSpecs - // scheduledBackupSpecs specifies the scheduled scheduledBackupSpecs fixture which will be created. - scheduledBackupSpecs scheduledBackupSpecs - - // initWorkloadViaRestore, if specified, initializes the cluster via restore - // of an older fixture.
The fields specified here will override any fields - // specified in the scheduledBackupSpecs field above. - initWorkloadViaRestore *restoreSpecs + fixture TpccFixture timeout time.Duration // A no-op, used only to set larger timeouts due roachtests limiting timeouts based on the suite suites registry.SuiteSet - testName string + clouds []spec.Cloud // If non-empty, the test will be skipped with the supplied reason. skip string } -func (bf *backupFixtureSpecs) initTestName() { - bf.testName = fmt.Sprintf("backupFixture/%s/revision-history=%t/%s", bf.scheduledBackupSpecs.workload.String(), !bf.scheduledBackupSpecs.nonRevisionHistory, bf.scheduledBackupSpecs.cloud) +const scheduleLabel = "tpcc_backup" + +// fixtureFromMasterVersion should be used in the backupSpecs version field to +// create a fixture using the bleeding edge of master. In the backup fixture +// path on external storage, the {version} subdirectory will be equal to this +// value. +const fixtureFromMasterVersion = "latest" + +type scheduledBackupSpecs struct { + backupSpecs + // ignoreExistingBackups if set to true, will allow a new backup chain + // to get written to an already existing backup collection. + ignoreExistingBackups bool + incrementalBackupCrontab string } -func makeBackupDriver(t test.Test, c cluster.Cluster, sp backupFixtureSpecs) backupDriver { - return backupDriver{ - t: t, - c: c, - sp: sp, - } +func CreateScheduleStatement(uri url.URL) string { + // This backup schedule runs a full backup immediately and then incremental + // backups every minute until the schedule is paused. To ensure that only one + // full backup chain gets created, the full backup recurrence is '@weekly', + // so a new full backup would only start on Sunday at midnight ;) + statement := fmt.Sprintf( + `CREATE SCHEDULE IF NOT EXISTS "%s" +FOR BACKUP DATABASE tpcc +INTO '%s' +RECURRING '* * * * *' +FULL BACKUP '@weekly' +WITH SCHEDULE OPTIONS first_run = 'now', ignore_existing_backups; +`, scheduleLabel, uri.String()) + return statement } type backupDriver struct { - sp backupFixtureSpecs - - t test.Test - c cluster.Cluster + sp backupFixtureSpecs + t test.Test + c cluster.Cluster + version *clusterupgrade.Version + fixture blobfixture.FixtureMetadata + registry *blobfixture.Registry } func (bd *backupDriver) prepareCluster(ctx context.Context) { - if err := bd.sp.scheduledBackupSpecs.CloudIsCompatible(bd.c.Cloud()); err != nil { - bd.t.Skip(err.Error()) - } - version := clusterupgrade.CurrentVersion() - if bd.sp.scheduledBackupSpecs.version != fixtureFromMasterVersion { - version = clusterupgrade.MustParseVersion(bd.sp.scheduledBackupSpecs.version) - } - bd.t.L().Printf("Creating cluster with version %s", version) + bd.t.L().Printf("Creating cluster with version %s", bd.version) binaryPath, err := clusterupgrade.UploadCockroach(ctx, bd.t, bd.t.L(), bd.c, - bd.sp.hardware.getCRDBNodes(), version) + bd.sp.hardware.getCRDBNodes(), bd.version) require.NoError(bd.t, err) require.NoError(bd.t, clusterupgrade.StartWithSettings(ctx, bd.t.L(), bd.c, @@ -150,75 +148,78 @@ func (bd *backupDriver) prepareCluster(ctx context.Context) { option.NewStartOpts(option.NoBackupSchedule, option.DisableWALFailover), install.BinaryOption(binaryPath))) - bd.assertCorrectCockroachBinary(ctx) - if !bd.sp.scheduledBackupSpecs.ignoreExistingBackups { - // This check allows the roachtest to fail fast, instead of when the - // scheduled backup cmd is issued.
- require.False(bd.t, bd.checkForExistingBackupCollection(ctx), fmt.Sprintf("existing backup in collection %s", bd.sp.scheduledBackupSpecs.backupCollection())) - } -} - -// checkForExistingBackupCollection returns true if there exists a backup in the collection path. -func (bd *backupDriver) checkForExistingBackupCollection(ctx context.Context) bool { - collectionQuery := fmt.Sprintf(`SELECT count(*) FROM [SHOW BACKUPS IN %s]`, - bd.sp.scheduledBackupSpecs.backupCollection()) conn := bd.c.Conn(ctx, bd.t.L(), 1) - sql := sqlutils.MakeSQLRunner(conn) - var collectionCount int - sql.QueryRow(bd.t, collectionQuery).Scan(&collectionCount) - return collectionCount > 0 -} -func (bd *backupDriver) assertCorrectCockroachBinary(ctx context.Context) { - binaryQuery := "SELECT value FROM crdb_internal.node_build_info WHERE field = 'Version'" - conn := bd.c.Conn(ctx, bd.t.L(), 1) - sql := sqlutils.MakeSQLRunner(conn) - var binaryVersion string - sql.QueryRow(bd.t, binaryQuery).Scan(&binaryVersion) - if bd.sp.scheduledBackupSpecs.version != fixtureFromMasterVersion { - require.Equal(bd.t, bd.sp.scheduledBackupSpecs.version, binaryVersion, "cluster not running on expected binary") - } else { - require.Contains(bd.t, binaryVersion, "dev") - } + // Work around an issue with import where large imports can bottleneck on + // snapshots. + _, err = conn.Exec("set cluster setting kv.snapshot_rebalance.max_rate = '256 MiB'") + require.NoError(bd.t, err) } func (bd *backupDriver) initWorkload(ctx context.Context) { - if bd.sp.initWorkloadViaRestore == nil { - bd.t.L().Printf(`Initializing workload`) - bd.sp.scheduledBackupSpecs.workload.init(ctx, bd.t, bd.c, bd.sp.hardware) - return - } - computedRestoreSpecs := restoreSpecs{ - hardware: bd.sp.hardware, - backup: makeBackupSpecs(bd.sp.initWorkloadViaRestore.backup, bd.sp.scheduledBackupSpecs.backupSpecs), - restoreUptoIncremental: bd.sp.initWorkloadViaRestore.restoreUptoIncremental, - } - restoreDriver := makeRestoreDriver(bd.t, bd.c, computedRestoreSpecs) - bd.t.L().Printf(`Initializing workload via restore`) - restoreDriver.getAOST(ctx) - // Only restore the database because a cluster restore will also restore the - // scheduled_jobs system table, which will automatically begin any backed up - // backup schedules, which complicates fixture generation. - target := fmt.Sprintf("DATABASE %s", restoreDriver.sp.backup.workload.DatabaseName()) - require.NoError(bd.t, restoreDriver.run(ctx, target)) + bd.t.L().Printf("importing tpcc with %d warehouses", bd.sp.fixture.ImportWarehouses) + + urls, err := bd.c.InternalPGUrl(ctx, bd.t.L(), bd.c.Node(1), roachprod.PGURLOptions{}) + require.NoError(bd.t, err) + + cmd := roachtestutil.NewCommand("./cockroach workload fixtures import tpcc"). + Arg("%q", urls[0]). + Option("checks=false"). + Flag("warehouses", bd.sp.fixture.ImportWarehouses). + String() + + bd.c.Run(ctx, option.WithNodes(bd.c.WorkloadNode()), cmd) } -func (bd *backupDriver) runWorkload(ctx context.Context) error { - return bd.sp.scheduledBackupSpecs.workload.run(ctx, bd.t, bd.c, bd.sp.hardware) +func (bd *backupDriver) runWorkload(ctx context.Context) (func(), error) { + bd.t.L().Printf("starting tpcc workload with %d warehouses", bd.sp.fixture.WorkloadWarehouses) + + workloadCtx, workloadCancel := context.WithCancel(ctx) + m := bd.c.NewMonitor(workloadCtx) + m.Go(func(ctx context.Context) error { + cmd := roachtestutil.NewCommand("./cockroach workload run tpcc"). + Arg("{pgurl%s}", bd.c.CRDBNodes()). + Option("tolerate-errors=true"). + Flag("ramp", "1m").
+ Flag("warehouses", bd.sp.fixture.WorkloadWarehouses). + String() + err := bd.c.RunE(ctx, option.WithNodes(bd.c.WorkloadNode()), cmd) + if err != nil && ctx.Err() == nil { + return err + } + // We expect the workload to return a context cancelled error because + // the roachtest driver cancels the monitor's context after the backup + // schedule completes. + if err != nil && ctx.Err() == nil { + // Implies the workload context was not cancelled and the workload cmd returned a + // different error. + return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`) + } + bd.t.L().Printf("workload successfully finished") + return nil + }) + + return func() { + workloadCancel() + m.Wait() + }, nil } // scheduleBackups begins the backup schedule. func (bd *backupDriver) scheduleBackups(ctx context.Context) { + bd.t.L().Printf("creating backup schedule", bd.sp.fixture.WorkloadWarehouses) + + createScheduleStatement := CreateScheduleStatement(bd.registry.URI(bd.fixture.DataPath)) conn := bd.c.Conn(ctx, bd.t.L(), 1) - sql := sqlutils.MakeSQLRunner(conn) - sql.Exec(bd.t, bd.sp.scheduledBackupSpecs.scheduledBackupCmd()) + _, err := conn.Exec(createScheduleStatement) + require.NoError(bd.t, err) } // monitorBackups pauses the schedule once the target number of backups in the // chain have been taken. func (bd *backupDriver) monitorBackups(ctx context.Context) { - conn := bd.c.Conn(ctx, bd.t.L(), 1) - sql := sqlutils.MakeSQLRunner(conn) + sql := sqlutils.MakeSQLRunner(bd.c.Conn(ctx, bd.t.L(), 1)) + fixtureURI := bd.registry.URI(bd.fixture.DataPath) for { time.Sleep(1 * time.Minute) var activeScheduleCount int @@ -229,10 +230,10 @@ func (bd *backupDriver) monitorBackups(ctx context.Context) { continue } var backupCount int - backupCountQuery := fmt.Sprintf(`SELECT count(DISTINCT end_time) FROM [SHOW BACKUP FROM LATEST IN %s]`, bd.sp.scheduledBackupSpecs.backupCollection()) + backupCountQuery := fmt.Sprintf(`SELECT count(DISTINCT end_time) FROM [SHOW BACKUP FROM LATEST IN '%s']`, fixtureURI.String()) sql.QueryRow(bd.t, backupCountQuery).Scan(&backupCount) bd.t.L().Printf(`%d scheduled backups taken`, backupCount) - if backupCount >= bd.sp.scheduledBackupSpecs.numBackupsInChain { + if backupCount >= bd.sp.fixture.IncrementalChainLength { pauseSchedulesQuery := fmt.Sprintf(`PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = '%s'`, scheduleLabel) sql.QueryRow(bd.t, pauseSchedulesQuery) break @@ -240,168 +241,141 @@ func (bd *backupDriver) monitorBackups(ctx context.Context) { } } +func newFixtureRegistry(ctx context.Context, t test.Test, c cluster.Cluster) *blobfixture.Registry { + // TODO(jeffswenson): use the assume role support from `tests/backup.go` + // TODO(jeffswenson): update permissions on AWS to grant the backup testing + // role access to fixtures. + // TODO(jeffswenson): make sure the fixture registry removes explicit + // credentials from the URI when logging. 
var uri url.URL + switch c.Cloud() { + case spec.AWS: + uri = url.URL{ + Scheme: "s3", + Host: "cockroach-fixtures-us-east-2", + RawQuery: "AUTH=implicit", + } + case spec.GCE: + auth, err := getGCSAuth() + require.NoError(t, err) + uri = url.URL{ + Scheme: "gs", + Host: "cockroach-fixtures-us-east1", + RawQuery: auth.Encode(), + } + case spec.Local: + uri = url.URL{ + Scheme: "gs", + Host: "cockroach-fixtures-us-east1", + RawQuery: url.Values{ + "AUTH": []string{"implicit"}, + "ASSUME_ROLE": []string{"backup-testing@cockroach-ephemeral.iam.gserviceaccount.com"}, + }.Encode(), + } + default: + t.Fatalf("fixtures not supported on %s", c.Cloud()) + } + + uri.Path = path.Join(uri.Path, "roachtest/v25.1") + + registry, err := blobfixture.NewRegistry(ctx, uri) + require.NoError(t, err) + + return registry +} + func registerBackupFixtures(r registry.Registry) { - for _, bf := range []backupFixtureSpecs{ + specs := []backupFixtureSpecs{ { - // 400GB backup fixture with 48 incremental layers. This is used by - // - restore/tpce/400GB/aws/inc-count=48/nodes=4/cpus=8 - // - restore/tpce/400GB/aws/nodes=4/cpus=16 - // - restore/tpce/400GB/aws/nodes=4/cpus=8 - // - restore/tpce/400GB/aws/nodes=8/cpus=8 - hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}), - scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{ - backupSpecs: backupSpecs{ - version: fixtureFromMasterVersion}, + fixture: TinyFixture, + hardware: makeHardwareSpecs(hardwareSpecs{ + workloadNode: true, }), - timeout: 5 * time.Hour, - initWorkloadViaRestore: &restoreSpecs{ - backup: backupSpecs{version: "v22.2.0", numBackupsInChain: 48}, - restoreUptoIncremental: 48, - }, - skip: "only for fixture generation", - suites: registry.Suites(registry.Nightly), - }, - { - // 400GB backup fixture, no revision history, with 48 incremental layers. - // This will used by the online restore roachtests. During 24.2 - // development, we can use it to enable OR of incremental backups. - hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}), - scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{ - backupSpecs: backupSpecs{ - version: fixtureFromMasterVersion, - nonRevisionHistory: true, - }, - }), - timeout: 5 * time.Hour, - initWorkloadViaRestore: &restoreSpecs{ - backup: backupSpecs{ - version: fixtureFromMasterVersion, - numBackupsInChain: 48, - }, - restoreUptoIncremental: 12, - }, - skip: "only for fixture generation", - suites: registry.Suites(registry.Nightly), + timeout: 2 * time.Hour, + suites: registry.Suites(registry.Nightly), + clouds: []spec.Cloud{spec.AWS, spec.GCE, spec.Local}, }, { - // 8TB backup fixture, no revision history, with 48 incremental layers. - // This will used by the online restore roachtests. During 24.2 - // development, we can use it to enable OR of incremental backups.
- hardware: makeHardwareSpecs(hardwareSpecs{nodes: 10, volumeSize: 1500, workloadNode: true}), - scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{ - backupSpecs: backupSpecs{ - version: fixtureFromMasterVersion, - nonRevisionHistory: true, - workload: tpceRestore{customers: 500000}, - }, + fixture: SmallFixture, + hardware: makeHardwareSpecs(hardwareSpecs{ + workloadNode: true, }), - timeout: 23 * time.Hour, - initWorkloadViaRestore: &restoreSpecs{ - backup: backupSpecs{ - version: "v23.1.11", - numBackupsInChain: 48, - nonRevisionHistory: true, - }, - restoreUptoIncremental: 12, - }, - skip: "only for fixture generation", - suites: registry.Suites(registry.Weekly), - }, - { - // 8TB Backup Fixture. - hardware: makeHardwareSpecs(hardwareSpecs{nodes: 10, volumeSize: 2000, workloadNode: true}), - scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{ - backupSpecs: backupSpecs{ - version: fixtureFromMasterVersion, - workload: tpceRestore{customers: 500000}}}), - timeout: 25 * time.Hour, - initWorkloadViaRestore: &restoreSpecs{ - backup: backupSpecs{version: "v22.2.1", numBackupsInChain: 48}, - restoreUptoIncremental: 48, - }, - // Use weekly to allow an over 24 hour timeout. - suites: registry.Suites(registry.Weekly), - skip: "only for fixture generation", - }, - { - // Default Fixture, Run on GCE. Initiated by the tpce --init. - hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}), - scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{ - backupSpecs: backupSpecs{cloud: spec.GCE}}), - // TODO(radu): this should be OnlyGCE. + timeout: 2 * time.Hour, suites: registry.Suites(registry.Nightly), - timeout: 5 * time.Hour, - skip: "only for fixture generation", + clouds: []spec.Cloud{spec.AWS, spec.GCE}, }, { - // 32TB Backup Fixture. - hardware: makeHardwareSpecs(hardwareSpecs{nodes: 15, cpus: 16, volumeSize: 5000, workloadNode: true}), - scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{ - backupSpecs: backupSpecs{workload: tpceRestore{customers: 2000000}}}), - initWorkloadViaRestore: &restoreSpecs{ - backup: backupSpecs{version: "v22.2.1", numBackupsInChain: 48}, - restoreUptoIncremental: 48, - }, - // Use weekly to allow an over 24 hour timeout. + fixture: MediumFixture, + hardware: makeHardwareSpecs(hardwareSpecs{ + workloadNode: true, + nodes: 9, + cpus: 16, + }), + timeout: 24 * time.Hour, suites: registry.Suites(registry.Weekly), - timeout: 48 * time.Hour, - skip: "only for fixture generation", + clouds: []spec.Cloud{spec.AWS, spec.GCE}, }, { - hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}), - scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{ - backupSpecs: backupSpecs{ - workload: tpccRestore{opts: tpccRestoreOptions{warehouses: 5000}}, - nonRevisionHistory: true, - }, + fixture: LargeFixture, + hardware: makeHardwareSpecs(hardwareSpecs{ + workloadNode: true, + nodes: 9, + cpus: 32, + volumeSize: 4000, }), - initWorkloadViaRestore: &restoreSpecs{ - backup: backupSpecs{version: "v23.1.1", numBackupsInChain: 48}, - restoreUptoIncremental: 48, - }, - timeout: 1 * time.Hour, - suites: registry.Suites(registry.Nightly), - skip: "only for fixture generation", + timeout: 24 * time.Hour, + suites: registry.Suites(registry.Weekly), + // The large fixture is only generated on GCE to reduce the cost of + // storing the fixtures. 
+ clouds: []spec.Cloud{spec.GCE}, }, - } { + } + for _, bf := range specs { bf := bf - bf.initTestName() + clusterSpec := bf.hardware.makeClusterSpecs(r) r.Add(registry.TestSpec{ - Name: bf.testName, + Name: fmt.Sprintf( + "backupFixture/tpcc/warehouses=%d/incrementals=%d", + bf.fixture.ImportWarehouses, bf.fixture.IncrementalChainLength, + ), Owner: registry.OwnerDisasterRecovery, - Cluster: bf.hardware.makeClusterSpecs(r, bf.scheduledBackupSpecs.cloud), + Cluster: clusterSpec, Timeout: bf.timeout, EncryptionSupport: registry.EncryptionMetamorphic, - CompatibleClouds: bf.scheduledBackupSpecs.CompatibleClouds(), + CompatibleClouds: registry.Clouds(bf.clouds...), Suites: bf.suites, Skip: bf.skip, Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - - bd := makeBackupDriver(t, c, bf) + registry := newFixtureRegistry(ctx, t, c) + + // Piggy back on fixture generation to run the GC. Run the GC first to + // bound the number of fixtures leaked if fixture creation is broken. + require.NoError(t, registry.GC(ctx, t.L())) + + handle, err := registry.Create(ctx, bf.fixture.Name, t.L()) + require.NoError(t, err) + + bd := backupDriver{ + t: t, + c: c, + sp: bf, + version: clusterupgrade.CurrentVersion(), + fixture: handle.Metadata(), + registry: registry, + } bd.prepareCluster(ctx) bd.initWorkload(ctx) - workloadCtx, workloadCancel := context.WithCancel(ctx) - m := c.NewMonitor(workloadCtx) - defer func() { - workloadCancel() - m.Wait() - }() - m.Go(func(ctx context.Context) error { - err := bd.runWorkload(ctx) - // We expect the workload to return a context cancelled error because - // the roachtest driver cancels the monitor's context after the backup - // schedule completes. - if err != nil && ctx.Err() == nil { - // Implies the workload context was not cancelled and the workload cmd returned a - // different error. - return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`) - } - bd.t.L().Printf("workload successfully finished") - return nil - }) + stopWorkload, err := bd.runWorkload(ctx) + require.NoError(t, err) + bd.scheduleBackups(ctx) bd.monitorBackups(ctx) + + stopWorkload() + + require.NoError(t, handle.SetReadyAt(ctx)) }, }) } diff --git a/pkg/cmd/roachtest/tests/online_restore.go b/pkg/cmd/roachtest/tests/online_restore.go index a3316088cb3c..082b3ca6f2d4 100644 --- a/pkg/cmd/roachtest/tests/online_restore.go +++ b/pkg/cmd/roachtest/tests/online_restore.go @@ -138,7 +138,7 @@ func registerOnlineRestorePerf(r registry.Registry) { Name: sp.testName, Owner: registry.OwnerDisasterRecovery, Benchmark: true, - Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud), + Cluster: sp.hardware.makeClusterSpecs(r), Timeout: sp.timeout, // These tests measure performance. To ensure consistent perf, // disable metamorphic encryption. 
@@ -219,7 +219,7 @@ func registerOnlineRestoreCorrectness(r registry.Registry) { registry.TestSpec{ Name: sp.testName, Owner: registry.OwnerDisasterRecovery, - Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud), + Cluster: sp.hardware.makeClusterSpecs(r), Timeout: sp.timeout, CompatibleClouds: sp.backup.CompatibleClouds(), Suites: sp.suites, diff --git a/pkg/cmd/roachtest/tests/restore.go b/pkg/cmd/roachtest/tests/restore.go index fbb80f13a9fe..62a111c54f3b 100644 --- a/pkg/cmd/roachtest/tests/restore.go +++ b/pkg/cmd/roachtest/tests/restore.go @@ -64,7 +64,7 @@ func registerRestoreNodeShutdown(r registry.Registry) { r.Add(registry.TestSpec{ Name: "restore/nodeShutdown/worker", Owner: registry.OwnerDisasterRecovery, - Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud), + Cluster: sp.hardware.makeClusterSpecs(r), CompatibleClouds: sp.backup.CompatibleClouds(), Suites: registry.Suites(registry.Nightly), TestSelectionOptOutSuites: registry.Suites(registry.Nightly), @@ -88,7 +88,7 @@ func registerRestoreNodeShutdown(r registry.Registry) { r.Add(registry.TestSpec{ Name: "restore/nodeShutdown/coordinator", Owner: registry.OwnerDisasterRecovery, - Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud), + Cluster: sp.hardware.makeClusterSpecs(r), CompatibleClouds: sp.backup.CompatibleClouds(), Suites: registry.Suites(registry.Nightly), TestSelectionOptOutSuites: registry.Suites(registry.Nightly), @@ -131,7 +131,7 @@ func registerRestore(r registry.Registry) { Name: withPauseSpecs.testName, Owner: registry.OwnerDisasterRecovery, Benchmark: true, - Cluster: withPauseSpecs.hardware.makeClusterSpecs(r, withPauseSpecs.backup.cloud), + Cluster: withPauseSpecs.hardware.makeClusterSpecs(r), Timeout: withPauseSpecs.timeout, CompatibleClouds: withPauseSpecs.backup.CompatibleClouds(), Suites: registry.Suites(registry.Nightly), @@ -424,7 +424,7 @@ func registerRestore(r registry.Registry) { Name: sp.testName, Owner: registry.OwnerDisasterRecovery, Benchmark: true, - Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud), + Cluster: sp.hardware.makeClusterSpecs(r), Timeout: sp.timeout, // These tests measure performance. To ensure consistent perf, // disable metamorphic encryption. @@ -514,9 +514,7 @@ type hardwareSpecs struct { zones []string } -func (hw hardwareSpecs) makeClusterSpecs( - r registry.Registry, backupCloud spec.Cloud, -) spec.ClusterSpec { +func (hw hardwareSpecs) makeClusterSpecs(r registry.Registry) spec.ClusterSpec { clusterOpts := make([]spec.Option, 0) clusterOpts = append(clusterOpts, spec.CPU(hw.cpus)) if hw.volumeSize != 0 { @@ -528,6 +526,7 @@ func (hw hardwareSpecs) makeClusterSpecs( addWorkloadNode := 0 if hw.workloadNode { addWorkloadNode++ + clusterOpts = append(clusterOpts, spec.WorkloadNodeCount(1)) } if len(hw.zones) > 0 { // Each test is set up to run on one specific cloud, so it's ok that the diff --git a/pkg/roachprod/blobfixture/registry.go b/pkg/roachprod/blobfixture/registry.go index dd3179f1d203..2c7d35e4934e 100644 --- a/pkg/roachprod/blobfixture/registry.go +++ b/pkg/roachprod/blobfixture/registry.go @@ -37,7 +37,8 @@ type Registry struct { // it can be replaced in tests. clock func() time.Time - // uri is the prefix for all fixture data and metadata. + // uri is the prefix for all fixture data and metadata. Do not log the uri. + // It may contain authentication information. 
// // Fixtures are stored in the following layout: // <uri>/metadata/<fixture name>/<timestamp> contains metadata for a fixture instance // <uri>/<fixture name>/<timestamp> contains the fixture data @@ -47,6 +48,7 @@ // expected to be of the form "scheme://<bucket>/roachprod/<version>". So a // full metadata path looks like: // gs://cockroach-fixtures/roachprod/v25.1/metadata/backup-tpcc-30k/20220101-1504 + // uri url.URL } @@ -170,6 +172,9 @@ func (r *Registry) Close() { // URI returns a new URL with the given path appended to the registry's URI. // +// Be careful when logging the URI. The query parameters may contain +// authentication information. +// // Example: // fixture = r.GetLatest(ctx, "backup-tpcc-30k") // r.URI(fixture.DataPath) returns 'gs://cockroach-fixtures/roachprod/v25.1/backup-tpcc-30k/20220101-1504'
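For context, a restore roachtest could consume one of these fixtures through the same registry, rather than hard-coding a backup collection path. The sketch below is only an illustration under stated assumptions: it reuses identifiers that appear in the diff above (newFixtureRegistry, Registry.URI, Registry.Close, FixtureMetadata.DataPath, and the GetLatest lookup mentioned in the registry.go doc comment), but restoreLatestTpccFixture itself is a hypothetical helper, the GetLatest return signature is assumed, and the RESTORE statement is merely an example of how the resolved URI could be used.

package tests

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
)

// restoreLatestTpccFixture is a hypothetical consumer of the fixture registry:
// it resolves the newest fixture of the given kind (e.g. "tpcc-5k", the names
// registered by the fixture generation test above) and restores it into the
// cluster under test.
func restoreLatestTpccFixture(
	ctx context.Context, t test.Test, c cluster.Cluster, fixtureKind string,
) error {
	// Build the registry the same way fixture generation does, so the lookup
	// happens in the bucket and auth mode that match the cluster's cloud.
	registry := newFixtureRegistry(ctx, t, c)
	defer registry.Close()

	// Assumed signature: GetLatest returns metadata for the most recent
	// fixture of this kind that was marked ready via SetReadyAt.
	fixture, err := registry.GetLatest(ctx, fixtureKind)
	if err != nil {
		return err
	}

	// URI re-attaches the registry prefix, including any auth query
	// parameters, so avoid logging it verbatim.
	uri := registry.URI(fixture.DataPath)

	conn := c.Conn(ctx, t.L(), 1)
	defer conn.Close()
	// Illustrative only: the fixtures above back up DATABASE tpcc, so a plain
	// database restore from the latest backup in the chain is the natural way
	// to consume them.
	_, err = conn.Exec(fmt.Sprintf(`RESTORE DATABASE tpcc FROM LATEST IN '%s'`, uri.String()))
	return err
}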