Skip to content

Commit

Permalink
fix(backup): use raft read barrier instead of awaiting schema agreement
Browse files: browse the repository at this point in the history
Fixes #3887
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed Jun 21, 2024
1 parent f46c173 commit 24999e1
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 61 deletions.
19 changes: 9 additions & 10 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,23 +817,22 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
return w.Snapshot(ctx, hi, target.SnapshotParallel)
},
StageAwaitSchema: func() error {
clusterSession, err := s.clusterSession(ctx, clusterID)
descSchemaHost := liveNodes[0].Addr
session, err := s.clusterSession(ctx, clusterID, cluster.SingleHostSessionConfigOption(descSchemaHost))
if err != nil {
w.Logger.Info(ctx, "No CQL cluster session, backup of schema as CQL files will be skipped", "error", err)
return nil
}
defer clusterSession.Close()

if err := w.AwaitSchemaAgreement(ctx, clusterSession); err != nil {
return errors.Wrap(err, "await schema agreement")
if errors.Is(err, cluster.ErrNoCQLCredentials) {
w.Logger.Error(ctx, "No CQL cluster credentials, backup of schema as CQL files will be skipped", "error", err)
return nil
}
return errors.Wrapf(err, "create single host (%s) session to", descSchemaHost)
}
defer session.Close()

var hosts []string
for _, h := range hi {
hosts = append(hosts, h.IP)
}

if err := w.DumpSchema(ctx, clusterSession, hosts); err != nil {
if err := w.DumpSchema(ctx, session, hosts); err != nil {
return errors.Wrap(err, "dump schema")
}

Expand Down
64 changes: 13 additions & 51 deletions pkg/service/backup/worker_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,81 +8,43 @@ import (
"context"
"encoding/json"
"sync/atomic"
"time"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/retry"
"golang.org/x/sync/errgroup"
)

func (w *workerTools) AwaitSchemaAgreement(ctx context.Context, clusterSession gocqlx.Session) error {
const (
waitMin = 15 * time.Second // nolint: revive
waitMax = 1 * time.Minute
maxElapsedTime = 15 * time.Minute
multiplier = 2
jitter = 0.2
)

backoff := retry.NewExponentialBackoff(
waitMin,
maxElapsedTime,
waitMax,
multiplier,
jitter,
)

notify := func(err error, wait time.Duration) {
w.Logger.Info(ctx, "Schema agreement not reached, retrying...", "error", err, "wait", wait)
func (w *worker) DumpSchema(ctx context.Context, clusterSession gocqlx.Session, hosts []string) error {
safe, err := isDescribeSchemaSafe(ctx, w.Client, hosts)
if err != nil {
return errors.Wrap(err, "check describe schema support")
}

const (
peerSchemasStmt = "SELECT schema_version FROM system.peers"
localSchemaStmt = "SELECT schema_version FROM system.local WHERE key='local'"
)

return retry.WithNotify(ctx, func() error {
var v []string
if err := clusterSession.Query(peerSchemasStmt, nil).SelectRelease(&v); err != nil {
return retry.Permanent(err)
}
var lv string
if err := clusterSession.Query(localSchemaStmt, nil).GetRelease(&lv); err != nil {
return retry.Permanent(err)
if safe {
if err := query.RaftReadBarrier(clusterSession); err != nil {
w.Logger.Error(ctx, "Couldn't perform raft read barrier, backup of schema as CQL files will be skipped", "error", err)
return nil
}

// Join all versions
m := strset.New(v...)
m.Add(lv)
if m.Size() > 1 {
return errors.Errorf("cluster schema versions not consistent: %s", m.List())
} else {
if err := clusterSession.AwaitSchemaAgreement(ctx); err != nil {
w.Logger.Error(ctx, "Couldn't await schema agreement, backup of unsafe schema as CQL files will be skipped", "error", err)
return nil
}
}

return nil
}, backoff, notify)
}

func (w *worker) DumpSchema(ctx context.Context, clusterSession gocqlx.Session, hosts []string) error {
schema, err := query.DescribeSchemaWithInternals(clusterSession)
if err != nil {
return err
}

b, err := marshalAndCompressSchema(schema)
if err != nil {
return err
}

safe, err := isDescribeSchemaSafe(ctx, w.Client, hosts)
if err != nil {
return errors.Wrap(err, "check describe schema support")
}
if safe {
w.SchemaFilePath = backupspec.RemoteSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag)
} else {
Expand Down

0 comments on commit 24999e1

Please sign in to comment.