diff --git a/pkg/service/backup/backupspec/paths.go b/pkg/service/backup/backupspec/paths.go index b618eedac..494787fad 100644 --- a/pkg/service/backup/backupspec/paths.go +++ b/pkg/service/backup/backupspec/paths.go @@ -20,7 +20,7 @@ const ( Schema = "schema_with_internals.json.gz" // UnsafeSchema is the name of the schema file that shouldn't be used for restore // (so for Scylla versions older than 6.0). - UnsafeSchema = "schema.json.gz" + UnsafeSchema = "schema.tar.gz" // TempFileExt is suffix for the temporary files. TempFileExt = ".tmp" diff --git a/pkg/service/backup/worker_schema.go b/pkg/service/backup/worker_schema.go index be884c188..fcecec539 100644 --- a/pkg/service/backup/worker_schema.go +++ b/pkg/service/backup/worker_schema.go @@ -3,10 +3,13 @@ package backup import ( + "archive/tar" "bytes" "compress/gzip" "context" "encoding/json" + "io" + "strings" "sync/atomic" "github.com/pkg/errors" @@ -15,6 +18,8 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" "github.com/scylladb/scylla-manager/v3/pkg/util/query" + "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" + "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -25,37 +30,47 @@ func (w *worker) DumpSchema(ctx context.Context, clusterSession gocqlx.Session, } if safe { - if err := query.RaftReadBarrier(clusterSession); err != nil { - w.Logger.Error(ctx, "Couldn't perform raft read barrier, backup of schema as CQL files will be skipped", "error", err) - return nil - } - } else { - if err := clusterSession.AwaitSchemaAgreement(ctx); err != nil { - w.Logger.Error(ctx, "Couldn't await schema agreement, backup of unsafe schema as CQL files will be skipped", "error", err) - return nil - } + return w.safeDumpSchema(ctx, clusterSession) } + w.unsafeDumpSchema(ctx, clusterSession) + return nil +} - schema, err := query.DescribeSchemaWithInternals(clusterSession) +func (w *worker) safeDumpSchema(ctx context.Context, session gocqlx.Session) error { + w.Logger.Info(ctx, "Schema backup for used ScyllaDB version is done via DESCRIBE SCHEMA WITH INTERNALS CQL query. "+ + "ScyllaDB Manager will snapshot system_schema sstables, but they won't be used during schema restoration") + if err := query.RaftReadBarrier(session); err != nil { + w.Logger.Error(ctx, "Couldn't perform raft read barrier, backup of schema as CQL files will be skipped "+ + "(restoring schema from this backup won't be possible)", "error", err) + return nil + } + schema, err := query.DescribeSchemaWithInternals(session) if err != nil { return err } b, err := marshalAndCompressSchema(schema) if err != nil { - return err + return errors.Wrap(err, "create safe schema file") } + w.SchemaFilePath = backupspec.RemoteSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag) + w.Schema = b + return nil +} - if safe { - w.SchemaFilePath = backupspec.RemoteSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag) - } else { - w.SchemaFilePath = backupspec.RemoteUnsafeSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag) - w.Logger.Info(ctx, "Backing-up and restoring schema from output of "+ - "DESCRIBE SCHEMA WITH INTERNALS is supported starting from ScyllaDB 6.0 or 2024.2. "+ - "Previous versions restore schema via snapshot of system_schema sstables") +func (w *worker) unsafeDumpSchema(ctx context.Context, session gocqlx.Session) { + w.Logger.Info(ctx, "Schema backup for used ScyllaDB version is done via snapshot of system_schema sstables. "+ + "ScyllaDB Manager will try to create unsafe schema CQL archive, but it won't be used during schema restoration") + if err := session.AwaitSchemaAgreement(ctx); err != nil { + w.Logger.Error(ctx, "Couldn't await schema agreement, backup of unsafe schema as CQL files will be skipped", "error", err) + return } - + b, err := createUnsafeSchemaArchive(ctx, w.Units, session) + if err != nil { + w.Logger.Error(ctx, "Couldn't create unsafe schema archive, backup of unsafe schema as CQL files will be skipped", "error", err) + return + } + w.SchemaFilePath = backupspec.RemoteUnsafeSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag) w.Schema = b - return nil } func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError error) { @@ -136,3 +151,45 @@ func isDescribeSchemaSafe(ctx context.Context, client *scyllaclient.Client, host } return out.Load(), nil } + +func createUnsafeSchemaArchive(ctx context.Context, units []Unit, clusterSession gocqlx.Session) (b bytes.Buffer, err error) { + gw := gzip.NewWriter(&b) + tw := tar.NewWriter(gw) + + now := timeutc.Now() + + for _, u := range units { + if err := ctx.Err(); err != nil { + return bytes.Buffer{}, err + } + + km, err := clusterSession.KeyspaceMetadata(u.Keyspace) + if err != nil { + return bytes.Buffer{}, errors.Wrapf(err, "describe keyspace %s schema", u.Keyspace) + } + + cqlSchema, err := km.ToCQL() + if err != nil { + return bytes.Buffer{}, errors.Wrapf(err, "cql keyspace %s metadata", u.Keyspace) + } + + if err := tw.WriteHeader(&tar.Header{ + Name: u.Keyspace + ".cql", + Size: int64(len(cqlSchema)), + Mode: 0o600, + ModTime: now, + }); err != nil { + return bytes.Buffer{}, errors.Wrapf(err, "tar keyspace %s schema", u.Keyspace) + } + + if _, err := io.Copy(tw, strings.NewReader(cqlSchema)); err != nil { + return bytes.Buffer{}, errors.Wrapf(err, "copy %s schema", u.Keyspace) + } + } + + if err := multierr.Combine(tw.Close(), gw.Close()); err != nil { + return bytes.Buffer{}, errors.Wrap(err, "writer close") + } + + return b, nil +}