diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 9f95e75787c2..82e5edef78c4 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -305,6 +305,7 @@ ALL_TESTS = [ "//pkg/revert:revert_test", "//pkg/roachpb:roachpb_disallowed_imports_test", "//pkg/roachpb:roachpb_test", + "//pkg/roachprod/blobfixture:blobfixture_test", "//pkg/roachprod/cloud:cloud_test", "//pkg/roachprod/config:config_test", "//pkg/roachprod/install:install_test", @@ -1612,6 +1613,8 @@ GO_TARGETS = [ "//pkg/revert:revert_test", "//pkg/roachpb:roachpb", "//pkg/roachpb:roachpb_test", + "//pkg/roachprod/blobfixture:blobfixture", + "//pkg/roachprod/blobfixture:blobfixture_test", "//pkg/roachprod/cloud:cloud", "//pkg/roachprod/cloud:cloud_test", "//pkg/roachprod/config:config", diff --git a/pkg/roachprod/blobfixture/BUILD.bazel b/pkg/roachprod/blobfixture/BUILD.bazel new file mode 100644 index 000000000000..2dcdcb95284c --- /dev/null +++ b/pkg/roachprod/blobfixture/BUILD.bazel @@ -0,0 +1,37 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "blobfixture", + srcs = [ + "metadata.go", + "registry.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/roachprod/blobfixture", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/cloud", + "//pkg/roachprod/logger", + "//pkg/settings/cluster", + "//pkg/util/ioctx", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "blobfixture_test", + srcs = ["registry_test.go"], + embed = [":blobfixture"], + deps = [ + "//pkg/cloud", + "//pkg/cloud/cloudpb", + "//pkg/cloud/nodelocal", + "//pkg/roachprod/logger", + "//pkg/settings/cluster", + "//pkg/testutils", + "//pkg/util/leaktest", + "//pkg/util/timeutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/roachprod/blobfixture/metadata.go b/pkg/roachprod/blobfixture/metadata.go new file mode 100644 index 000000000000..a4ab73ecb1c8 --- /dev/null +++ b/pkg/roachprod/blobfixture/metadata.go @@ -0,0 +1,147 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package blobfixture + +import ( + "encoding/json" + "fmt" + "slices" + "time" + + "github.com/cockroachdb/errors" +) + +// FixtureMetadata is the metadata stored as in object storage for each +// fixture. Its serialized to object storage as json. So be mindful of +// backwards compatability when changing or removing fields. +type FixtureMetadata struct { + // Kind is a user defined string that is used to group fixtures together. + Kind string `json:"kind"` + + // DataPath is the path to the fixture data in object storage. + DataPath string `json:"path"` + + // MetadataPath is the path to the metadata for this fixture in object storage. + MetadataPath string `json:"metadata_path"` + + // CreatedAt is the time the fixture was created. + CreatedAt time.Time `json:"created_at"` + + // ReadyAt is the time the fixture was made ready for use. + ReadyAt *time.Time `json:"ready_at,omitempty"` +} + +func (f *FixtureMetadata) MarshalJson() ([]byte, error) { + // Indent the metadata json so the blobs are easier to read when viewing + // objects in the cloud consoles. + return json.MarshalIndent(f, "", " ") +} + +func (f *FixtureMetadata) UnmarshalJson(data []byte) error { + if err := json.Unmarshal(data, f); err != nil { + return errors.Wrap(err, "unmarshal fixture metadata") + } + if f.Kind == "" { + return errors.New("missing kind") + } + if f.DataPath == "" { + return errors.New("missing data path") + } + if f.MetadataPath == "" { + return errors.New("missing metadata path") + } + if f.CreatedAt.IsZero() { + return errors.New("missing created at") + } + return nil +} + +func mostRecent(fixture []FixtureMetadata) *FixtureMetadata { + var mostRecent *FixtureMetadata + for _, f := range fixture { + if f.ReadyAt == nil || f.ReadyAt.IsZero() { + continue + } + if mostRecent == nil || f.ReadyAt.After(*mostRecent.ReadyAt) { + mostRecent = &f + } + } + return mostRecent +} + +type fixtureToDelete struct { + metadata FixtureMetadata + reason string +} + +// fixturesToGc returns a list of fixtures to delete. The policy is as follows: +// +// If a fixture is not ready within 48 hours, assume creation failed and it +// was leaked. +// +// A fixture has a successor if there is another fixture of the same kind +// that was made ready after it. A fixture is eligible for gc if it has a +// successor and the successor was made ready more than 24 hours ago. The 24 +// hour wait is to ensure no tests are in the middle of using the fixture. +// +// GC decisions are made soly based on the metadata. There is no attempt to +// examine actual live data in object storage. This ensures the GC will only +// delete data that is managed by the fixture registry, so its safe to mix +// manually managed and non-managed fixtures. This decision may be worth +// revisiting if the registry is given its own bucket and is guaranteed to own +// all data in it. +func fixturesToGc(gcAt time.Time, allFixtures []FixtureMetadata) []fixtureToDelete { + // If a fixtures is not ready within 48 hours, assume creation failed and it + // was leaked. + leakedAtThreshold := gcAt.Add(-48 * time.Hour) + + // A fixture is eligible for gc if it has a successor and the successor was + // made ready more than 24 hours ago. + obsoleteThreshold := gcAt.Add(-24 * time.Hour) + + toDelete := []fixtureToDelete{} + + byKind := make(map[string][]FixtureMetadata) + for _, f := range allFixtures { + if f.ReadyAt == nil || f.ReadyAt.IsZero() { + if f.CreatedAt.Before(leakedAtThreshold) { + toDelete = append(toDelete, fixtureToDelete{ + metadata: f, + reason: "fixture was not made ready within 48 hours", + }) + continue + } else { + // fixtures is being created and is not eligible for gc + continue + } + } + byKind[f.Kind] = append(byKind[f.Kind], f) + } + + for kind := range byKind { + // Sort by ReadyAt in descending order so that index 0 is the most recent + // fixture. + slices.SortFunc(byKind[kind], func(a, b FixtureMetadata) int { + return -a.ReadyAt.Compare(*b.ReadyAt) + }) + } + + for _, fixtures := range byKind { + // NOTE: starting at 1 because index 0 is the most recent fixture and is + // not eligible for garbage collection. + for i := 1; i < len(fixtures); i++ { + successor := fixtures[i-1] + if successor.ReadyAt.Before(obsoleteThreshold) { + toDelete = append(toDelete, fixtureToDelete{ + metadata: fixtures[i], + reason: fmt.Sprintf("fixture '%s' is was mode obsolete by '%s' at '%s'", fixtures[i].DataPath, successor.DataPath, successor.ReadyAt), + }) + } + } + } + + return toDelete +} diff --git a/pkg/roachprod/blobfixture/registry.go b/pkg/roachprod/blobfixture/registry.go new file mode 100644 index 000000000000..dd3179f1d203 --- /dev/null +++ b/pkg/roachprod/blobfixture/registry.go @@ -0,0 +1,269 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package blobfixture + +import ( + "context" + "net/url" + "path" + "regexp" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/ioctx" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +// Registry is used to manage fixtures stored in object storage. It tracks +// which fixtures are ready for use and uses garbage collection to delete +// leaked clusters. +type Registry struct { + // storage is used to read and write fixture data and metadata. storage is + // configured using the root of the bucket. For example, if the uri is + // gs://cockroach-fixtures/roachprod/v25.1, the 'roachprod/v25.1' part is + // removed when creating the storage instance. This improves the legibility + // of paths in fixture metadata objects. + storage cloud.ExternalStorage + + // clock is used to generate timestamps for fixtures. It is a variable so that + // it can be replaced in tests. + clock func() time.Time + + // uri is the prefix for all fixture data and metadata. + // + // Fixtures are stored in the following layout: + // /metadata// contains metadata for a fixture instance + // // contains the actual fixture data + // + // The uri ispassed to the registry at construction time. The baseURI is + // expected to be of the form "scheme:///roachprod/". So a + // full metadata path looks like: + // gs://cockroach-fixtures/roachprod/v25.1/metadata/backup-tpcc-30k/20220101-1504 + uri url.URL +} + +// NewRegistry creates a new Registry instance. The uri parameter is the prefix +// for all fixture data and metadata. See the comment on the uri field for the +// structure of a fixture directory. +func NewRegistry(ctx context.Context, uri url.URL) (*Registry, error) { + supportedSchemes := map[string]bool{"gs": true, "s3": true, "azure": true} + if !supportedSchemes[uri.Scheme] { + return nil, errors.Errorf("unsupported scheme %q", uri.Scheme) + } + + storage, err := cloud.EarlyBootExternalStorageFromURI( + ctx, + getBaseURI(uri), + base.ExternalIODirConfig{}, + cluster.MakeTestingClusterSettings(), + nil, /*limiters */ + cloud.NilMetrics, + ) + if err != nil { + return nil, err + } + + return &Registry{ + storage: storage, + uri: uri, + clock: timeutil.Now, + }, nil +} + +func getBaseURI(uri url.URL) string { + baseUri := uri + baseUri.Path = "" + return baseUri.String() +} + +// GetLatest returns the most recent ready fixture of the given kind. +func (r *Registry) GetLatest(ctx context.Context, kind string) (FixtureMetadata, error) { + metadataPrefix := path.Join(r.uri.Path, "metadata", kind) + + fixtures, err := r.listFixtures(ctx, metadataPrefix, nil) + if err != nil { + return FixtureMetadata{}, errors.Wrapf(err, "failed to fetch metadata in %q", metadataPrefix) + } + + found := mostRecent(fixtures) + if found == nil { + return FixtureMetadata{}, errors.Errorf("no fixtures found for kind %q in %q", kind, metadataPrefix) + } + + return *found, nil +} + +// kindRegex is used to validate the kind name when creating fixtures. Fixture +// names are allowed to be lower-snake-case-with-1337-numbers. For example: +// backup-tpcc-30k is an allowed kind name. +var kindRegex = regexp.MustCompile(`^[a-z0-9\-]+$`) + +// Create creates a new fixture of the given kind. Once the fixture generator +// is finished creating the fixture, it should call SetReadyAt to mark the +// fixture as ready for use. +func (r *Registry) Create( + ctx context.Context, kind string, l *logger.Logger, +) (ScratchHandle, error) { + if !kindRegex.MatchString(kind) { + return ScratchHandle{}, errors.Errorf("invalid kind name %q", kind) + } + + if strings.Contains(kind, "metadata") { + return ScratchHandle{}, errors.Errorf("kind name %q contains reserved word 'metadata'", kind) + } + + now := r.clock().UTC() + basename := now.Format("20060102-1504") + + metadata := FixtureMetadata{ + CreatedAt: now, + Kind: kind, + DataPath: path.Join(r.uri.Path, kind, basename), + MetadataPath: path.Join(r.uri.Path, "metadata", kind, basename), + } + + if err := r.upsertMetadata(metadata); err != nil { + return ScratchHandle{}, err + } + + l.Printf("creating fixture %q", metadata.DataPath) + + return ScratchHandle{registry: r, logger: l, metadata: metadata}, nil +} + +// GC deletes fixtures that were leaked or obsolete. See the comment on the +// fixturesToGc for details about the GC policy. +func (r *Registry) GC(ctx context.Context, l *logger.Logger) error { + fixtures, err := r.listFixtures(ctx, path.Join(r.uri.Path, "metadata"), l) + if err != nil { + return errors.Wrap(err, "gc failed to list metadata") + } + + toDelete := fixturesToGc(r.clock(), fixtures) + + l.Printf("running gc: deleting %d out of %d fixtures", len(toDelete), len(fixtures)) + + for _, f := range toDelete { + l.Printf("deleting fixture %q: %s", f.metadata.DataPath, f.reason) + if err := r.deleteBlobsMatchingPrefix(f.metadata.DataPath); err != nil { + return err + } + if err := r.deleteMetadata(f.metadata); err != nil { + return err + } + } + + return nil +} + +func (r *Registry) Close() { + _ = r.storage.Close() +} + +// URI returns a new URL with the given path appended to the registry's URI. +// +// Example: +// fixture = r.GetLatest(ctx, "backup-tpcc-30k") +// r.URI(fixture.DataPath) returns 'gs://cockroach-fixtures/roachprod/v25.1/backup-tpcc-30k/20220101-1504' +func (r *Registry) URI(path string) url.URL { + copy := r.uri + copy.Path = path + return copy +} + +func (r *Registry) listFixtures( + ctx context.Context, kindPrefix string, l *logger.Logger, +) ([]FixtureMetadata, error) { + if l != nil { + l.Printf("listing fixtures: %s", kindPrefix) + } + result := []FixtureMetadata{} + err := r.storage.List(ctx, kindPrefix /*delimiter*/, "", func(found string) error { + file, _, err := r.storage.ReadFile(ctx, path.Join(kindPrefix, found), cloud.ReadOptions{}) + if err != nil { + return err + } + defer func() { _ = file.Close(ctx) }() + + json, err := ioctx.ReadAll(ctx, file) + if err != nil { + return err + } + + metadata := FixtureMetadata{} + if err := metadata.UnmarshalJson(json); err != nil { + return err + } + + result = append(result, metadata) + + return nil + }) + if err != nil { + return nil, err + } + + return result, nil +} + +func (r *Registry) upsertMetadata(metadata FixtureMetadata) error { + json, err := metadata.MarshalJson() + if err != nil { + return err + } + + writer, err := r.storage.Writer(context.Background(), metadata.MetadataPath) + if err != nil { + return err + } + + if _, err := writer.Write(json); err != nil { + _ = writer.Close() + return err + } + + return writer.Close() +} + +func (r *Registry) deleteMetadata(metadata FixtureMetadata) error { + return errors.Wrap(r.storage.Delete(context.Background(), metadata.MetadataPath), "unable to delete fixture metadata") +} + +func (r *Registry) deleteBlobsMatchingPrefix(prefix string) error { + return r.storage.List(context.Background(), prefix, "", func(path string) error { + return r.storage.Delete(context.Background(), prefix+path) + }) +} + +// ScratchHandle is returned by Registry.Create and is used to mark a fixture +// as ready for use. +type ScratchHandle struct { + registry *Registry + logger *logger.Logger + metadata FixtureMetadata +} + +func (s *ScratchHandle) Metadata() FixtureMetadata { + return s.metadata +} + +// SetReadyAt marks the fixture as ready for use. This should be called after +// the fixture generator has finished creating the fixture and is no longer +// writing to the data path. +func (s *ScratchHandle) SetReadyAt(ctx context.Context) error { + ready := s.registry.clock() + s.metadata.ReadyAt = &ready + if err := s.registry.upsertMetadata(s.metadata); err != nil { + return err + } + s.logger.Printf("fixture '%s' ready at '%s'", s.metadata.DataPath, s.metadata.ReadyAt) + return nil +} diff --git a/pkg/roachprod/blobfixture/registry_test.go b/pkg/roachprod/blobfixture/registry_test.go new file mode 100644 index 000000000000..3dee6662fbe8 --- /dev/null +++ b/pkg/roachprod/blobfixture/registry_test.go @@ -0,0 +1,228 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package blobfixture + +import ( + "context" + "io" + "net/url" + "path" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func newLogger(t *testing.T) *logger.Logger { + cfg := logger.Config{ + Stdout: io.Discard, + Stderr: io.Discard, + } + l, err := cfg.NewLogger("") + require.NoError(t, err) + return l +} + +func newTestRegistry(t *testing.T, uri string) *Registry { + dir, cleanup := testutils.TempDir(t) + t.Cleanup(cleanup) + + url, err := url.Parse(uri) + require.NoError(t, err) + + cfg := cloudpb.ExternalStorage{} + cfg.LocalFileConfig.Path = dir + + return &Registry{ + storage: nodelocal.TestingMakeNodelocalStorage(dir, cluster.MakeTestingClusterSettings(), cfg), + uri: *url, + clock: func() time.Time { return timeutil.Now() }, + } +} + +func TestFixtureRegistry(t *testing.T) { + defer leaktest.AfterTest(t)() + + type fixture struct { + kind string + createdAt time.Time + readyAt time.Time + isLatestOfKind bool + survivesGC bool + } + + start := timeutil.Now() + makeTime := func(days float32) time.Time { + return start.Add(time.Duration(days*24) * time.Hour) + } + + fixtures := []fixture{ + { + // This fixture was created 3 days ago, but is not ready yet, so it will + // be garbage collected. + kind: "kind-leaked", + createdAt: makeTime(-3), + survivesGC: false, + isLatestOfKind: false, + }, + { + // This fixture is not ready yet, but it was created less than 2 days + // ago, so it will not be garbage collected. + kind: "kind-creating", + createdAt: makeTime(-1), + survivesGC: true, + isLatestOfKind: false, + }, + { + // This fixture is older than 2 days, but it is the latest of its kind, + // so it will not be garbage collected. + kind: "kind-singleton", + createdAt: makeTime(-10), + readyAt: makeTime(-5), + survivesGC: true, + isLatestOfKind: true, + }, + { + // This fixture was obsolete for more than two days, so it will be + // deleted. + kind: "kind-multiple", + createdAt: makeTime(-10), + readyAt: makeTime(-9), + survivesGC: false, + isLatestOfKind: false, + }, + { + // This fixture was not obsolete for more than one day, so it will not be + // deleted. + kind: "kind-multiple", + createdAt: makeTime(-5), + readyAt: makeTime(-4), + survivesGC: true, + isLatestOfKind: false, + }, + { + // This fixture was recently created, so its predecessor will not be deleted. + kind: "kind-multiple", + createdAt: makeTime(-2), + readyAt: makeTime(-0.5), + survivesGC: true, + isLatestOfKind: true, + }, + { + // This is the most recent fixture of its kind, but its not ready yet, so + // its not the latest and its not eligible for GC. + kind: "kind-multiple", + createdAt: makeTime(-1), + survivesGC: true, + isLatestOfKind: false, + }, + } + + type fixturesCreated struct { + fixture fixture + metadata FixtureMetadata + } + + var created []fixturesCreated + + var now time.Time + dir, cleanup := testutils.TempDir(t) + defer cleanup() + cfg := cloudpb.ExternalStorage{} + cfg.LocalFileConfig.Path = dir + + registry := newTestRegistry(t, "nodelocal://1/roachtest/v25.1") + registry.clock = func() time.Time { return now } + + l := newLogger(t) + + ctx := context.Background() + for _, f := range fixtures { + now = f.createdAt + + handle, err := registry.Create(ctx, f.kind, l) + require.NoError(t, err) + + metadata := handle.Metadata() + + writer, err := registry.storage.Writer(ctx, path.Join(metadata.DataPath, "sentinel")) + require.NoError(t, err) + + _, err = writer.Write([]byte(metadata.CreatedAt.String())) + require.NoError(t, err) + + require.NoError(t, writer.Close()) + + if !f.readyAt.IsZero() { + now = f.readyAt + require.NoError(t, handle.SetReadyAt(ctx)) + } + + created = append(created, fixturesCreated{ + fixture: f, + metadata: metadata, + }) + } + + now = makeTime(0) + + require.NoError(t, registry.GC(ctx, l)) + + for _, c := range created { + metadata, err := registry.GetLatest(ctx, c.fixture.kind) + if c.fixture.isLatestOfKind { + require.NoError(t, err) + require.Equal(t, c.metadata.DataPath, metadata.DataPath) + } else { + if err != nil { + require.ErrorContains(t, err, "no fixtures found for kind") + } + require.NotEqual(t, c.metadata.DataPath, metadata.DataPath) + } + + reader, _, err := registry.storage.ReadFile(ctx, path.Join(c.metadata.DataPath, "sentinel"), cloud.ReadOptions{}) + if err == nil { + require.NoError(t, reader.Close(ctx)) + } + + if c.fixture.survivesGC { + require.NoError(t, err) + } else { + require.ErrorIs(t, err, cloud.ErrFileDoesNotExist, "fixture %s", c.metadata.DataPath) + } + } +} + +func TestFixtureRegistryURI(t *testing.T) { + defer leaktest.AfterTest(t)() + + registry := newTestRegistry(t, "nodelocal://1/roachprod/v25.1") + // The time is always 2024-06-01 12:23 + registry.clock = func() time.Time { return time.Date(2024, 6, 1, 12, 23, 0, 0, time.UTC) } + + handle, err := registry.Create(context.Background(), "test-kind", newLogger(t)) + require.NoError(t, err) + + meta := handle.Metadata() + + dataUri := registry.URI(meta.DataPath) + require.Equal(t, + "nodelocal://1/roachprod/v25.1/test-kind/20240601-1223", + dataUri.String()) + + metaUri := registry.URI(meta.MetadataPath) + require.Equal(t, + "nodelocal://1/roachprod/v25.1/metadata/test-kind/20240601-1223", + metaUri.String()) +}