Skip to content

Commit

Permalink
fix(backup): use old driver side describe for older Scylla versions
Browse files Browse the repository at this point in the history
It is safer to keep on using the driver implementation of DESCRIBE SCHEMA for older Scylla versions, because:
- older Scylla versions (e.g. 2022) don't support server side describe
- it won't be used for restore purposes anyway

Fixes #3903
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed Jun 26, 2024
1 parent cee8877 commit 4688ae0
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pkg/service/backup/backupspec/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
Schema = "schema_with_internals.json.gz"
// UnsafeSchema is the name of the schema file that shouldn't be used for restore
// (so for Scylla versions older than 6.0).
UnsafeSchema = "schema.json.gz"
UnsafeSchema = "schema.tar.gz"
// TempFileExt is suffix for the temporary files.
TempFileExt = ".tmp"

Expand Down
97 changes: 77 additions & 20 deletions pkg/service/backup/worker_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
package backup

import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
"strings"
"sync/atomic"

"github.com/pkg/errors"
Expand All @@ -15,6 +18,8 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)

Expand All @@ -25,37 +30,47 @@ func (w *worker) DumpSchema(ctx context.Context, clusterSession gocqlx.Session,
}

if safe {
if err := query.RaftReadBarrier(clusterSession); err != nil {
w.Logger.Error(ctx, "Couldn't perform raft read barrier, backup of schema as CQL files will be skipped", "error", err)
return nil
}
} else {
if err := clusterSession.AwaitSchemaAgreement(ctx); err != nil {
w.Logger.Error(ctx, "Couldn't await schema agreement, backup of unsafe schema as CQL files will be skipped", "error", err)
return nil
}
return w.safeDumpSchema(ctx, clusterSession)
}
w.unsafeDumpSchema(ctx, clusterSession)
return nil
}

schema, err := query.DescribeSchemaWithInternals(clusterSession)
// safeDumpSchema describes the schema with query.DescribeSchemaWithInternals,
// passes it through marshalAndCompressSchema and stores the result in w.Schema,
// with w.SchemaFilePath set to the remote schema file location.
//
// A failed raft read barrier is only logged: the function then returns nil and
// leaves w.SchemaFilePath and w.Schema untouched. Describe and marshal failures
// are returned to the caller.
func (w *worker) safeDumpSchema(ctx context.Context, session gocqlx.Session) error {
w.Logger.Info(ctx, "Schema backup for used ScyllaDB version is done via DESCRIBE SCHEMA WITH INTERNALS CQL query. "+
"ScyllaDB Manager will snapshot system_schema sstables, but they won't be used during schema restoration")
// The barrier failure is deliberately not propagated; the schema file is skipped instead.
if err := query.RaftReadBarrier(session); err != nil {
w.Logger.Error(ctx, "Couldn't perform raft read barrier, backup of schema as CQL files will be skipped "+
"(restoring schema from this backup won't be possible)", "error", err)
return nil
}
schema, err := query.DescribeSchemaWithInternals(session)
if err != nil {
return err
}
b, err := marshalAndCompressSchema(schema)
if err != nil {
// NOTE(review): two consecutive returns. The bare `return err` looks like a
// removed line left over from the diff view; as written, the wrapped return
// below it is unreachable. Confirm against the real file.
return err
return errors.Wrap(err, "create safe schema file")
}
w.SchemaFilePath = backupspec.RemoteSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag)
w.Schema = b
return nil
}

if safe {
w.SchemaFilePath = backupspec.RemoteSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag)
} else {
w.SchemaFilePath = backupspec.RemoteUnsafeSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag)
w.Logger.Info(ctx, "Backing-up and restoring schema from output of "+
"DESCRIBE SCHEMA WITH INTERNALS is supported starting from ScyllaDB 6.0 or 2024.2. "+
"Previous versions restore schema via snapshot of system_schema sstables")
// unsafeDumpSchema makes a best-effort attempt to build the unsafe schema
// archive with createUnsafeSchemaArchive for the worker's units.
//
// Failures (schema agreement or archive creation) are only logged and the
// function returns without reporting them. w.SchemaFilePath and w.Schema are
// set only after the archive has been created successfully.
func (w *worker) unsafeDumpSchema(ctx context.Context, session gocqlx.Session) {
w.Logger.Info(ctx, "Schema backup for used ScyllaDB version is done via snapshot of system_schema sstables. "+
"ScyllaDB Manager will try to create unsafe schema CQL archive, but it won't be used during schema restoration")
if err := session.AwaitSchemaAgreement(ctx); err != nil {
w.Logger.Error(ctx, "Couldn't await schema agreement, backup of unsafe schema as CQL files will be skipped", "error", err)
return
}

b, err := createUnsafeSchemaArchive(ctx, w.Units, session)
if err != nil {
w.Logger.Error(ctx, "Couldn't create unsafe schema archive, backup of unsafe schema as CQL files will be skipped", "error", err)
return
}
w.SchemaFilePath = backupspec.RemoteUnsafeSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag)
w.Schema = b
// NOTE(review): this function declares no result, so the `return nil` below
// looks like a removed line left over from the diff view. Confirm against the
// real file.
return nil
}

func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError error) {
Expand Down Expand Up @@ -136,3 +151,45 @@ func isDescribeSchemaSafe(ctx context.Context, client *scyllaclient.Client, host
}
return out.Load(), nil
}

// createUnsafeSchemaArchive returns a gzip-compressed tar archive with one
// "<keyspace>.cql" entry per unit. Each entry holds the CQL text produced by
// ToCQL on the keyspace metadata fetched through clusterSession.
//
// All entries share the same modification time (taken once, before the loop)
// and are written with mode 0600. The context is checked before every unit.
// On any failure an empty buffer is returned together with the error.
func createUnsafeSchemaArchive(ctx context.Context, units []Unit, clusterSession gocqlx.Session) (b bytes.Buffer, err error) {
	var (
		gzipW   = gzip.NewWriter(&b)
		tarW    = tar.NewWriter(gzipW)
		modTime = timeutc.Now()
	)

	for i := range units {
		// Stop early when the caller has cancelled the operation.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return bytes.Buffer{}, ctxErr
		}

		keyspace := units[i].Keyspace

		meta, metaErr := clusterSession.KeyspaceMetadata(keyspace)
		if metaErr != nil {
			return bytes.Buffer{}, errors.Wrapf(metaErr, "describe keyspace %s schema", keyspace)
		}

		cql, cqlErr := meta.ToCQL()
		if cqlErr != nil {
			return bytes.Buffer{}, errors.Wrapf(cqlErr, "cql keyspace %s metadata", keyspace)
		}

		// The header must announce the exact payload size before the body is written.
		header := &tar.Header{
			Name:    keyspace + ".cql",
			Size:    int64(len(cql)),
			Mode:    0o600,
			ModTime: modTime,
		}
		if hdrErr := tarW.WriteHeader(header); hdrErr != nil {
			return bytes.Buffer{}, errors.Wrapf(hdrErr, "tar keyspace %s schema", keyspace)
		}

		if _, copyErr := io.Copy(tarW, strings.NewReader(cql)); copyErr != nil {
			return bytes.Buffer{}, errors.Wrapf(copyErr, "copy %s schema", keyspace)
		}
	}

	// Close the tar writer first, then the gzip writer beneath it, so both
	// flush their trailers into b; errors from either close are combined.
	closeErr := multierr.Combine(tarW.Close(), gzipW.Close())
	if closeErr != nil {
		return bytes.Buffer{}, errors.Wrap(closeErr, "writer close")
	}

	return b, nil
}

0 comments on commit 4688ae0

Please sign in to comment.