Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore improvement: index #4044

Merged
merged 10 commits into from
Oct 3, 2024
4 changes: 3 additions & 1 deletion pkg/rclone/rcserver/rc.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,9 @@ func rcCopyPaths() func(ctx context.Context, in rc.Params) (rc.Params, error) {
if err != nil {
return nil, err
}

if len(paths) == 0 {
return nil, nil
}
return nil, sync.CopyPaths(ctx, dstFs, dstRemote, srcFs, srcRemote, paths, false)
}
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/schema/table/table.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/scyllaclient/client_rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func (c *Client) RcloneCopyPaths(ctx context.Context, host, dstRemoteDir, srcRem
if err != nil {
return 0, err
}
if paths == nil {
paths = make([]string, 0)
}

p := operations.SyncCopyPathsParams{
Context: forceHost(ctx, host),
Expand Down
37 changes: 25 additions & 12 deletions pkg/service/backup/backupspec/versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import (
// (e.g. older version of 'md-2-big-Data.db' could be 'md-2-big-Data.db.sm_20230114183231UTC')
// Note, that the newest version of SSTable does not have snapshot tag extension.
type VersionedSSTable struct {
Name string // Original SSTable name (e.g. md-2-big-Data.db)
Version string // Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC)
Name string // Original SSTable name (e.g. md-2-big-Data.db)
// Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC).
// An empty version denotes the newest, not versioned sstable, whose name has no snapshot tag suffix.
Version string
Size int64
}

// FullName returns the file name together with its version extension.
// When Version is empty the plain Name is returned; otherwise the result
// is Name and Version joined with a single dot.
func (vt VersionedSSTable) FullName() string {
	if vt.Version != "" {
		return vt.Name + "." + vt.Version
	}
	return vt.Name
}

Expand Down Expand Up @@ -66,6 +71,9 @@ func IsVersionedFileRemovable(oldest time.Time, versioned string) (bool, error)
// SplitNameAndVersion splits a possibly versioned file name into its original
// name and its version.
// The version is the part after the final dot, and it is only split off when
// IsSnapshotTag accepts it. Otherwise the input is returned unchanged as name
// together with an empty version.
func SplitNameAndVersion(versioned string) (name, version string) {
	ext := path.Ext(versioned)
	if ext != "" {
		// ext starts with the dot, the tag candidate is everything after it.
		if tag := ext[1:]; IsSnapshotTag(tag) {
			return strings.TrimSuffix(versioned, ext), tag
		}
	}
	return versioned, ""
}
Expand All @@ -74,11 +82,7 @@ func SplitNameAndVersion(versioned string) (name, version string) {
func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapshotTag, host, dir string) (VersionedMap, error) {
versionedFiles := make(VersionedMap)
allVersions := make(map[string][]VersionedSSTable)

opts := &scyllaclient.RcloneListDirOpts{
FilesOnly: true,
VersionedOnly: true,
}
opts := &scyllaclient.RcloneListDirOpts{FilesOnly: true}
f := func(item *scyllaclient.RcloneListDirItem) {
name, version := SplitNameAndVersion(item.Name)
allVersions[name] = append(allVersions[name], VersionedSSTable{
Expand All @@ -87,7 +91,6 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh
Size: item.Size,
})
}

if err := client.RcloneListDirIter(ctx, host, dir, opts, f); err != nil {
return nil, errors.Wrapf(err, "host %s: listing versioned files", host)
}
Expand All @@ -97,9 +100,17 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh
return nil, err
}
// Choose the correct version with respect to the currently restored snapshot tag
for _, versions := range allVersions {
var candidate VersionedSSTable
for name, versions := range allVersions {
var (
candidate VersionedSSTable
newest VersionedSSTable
)
for _, v := range versions {
if v.Version == "" {
newest = v
continue
}

tagT, err := SnapshotTagTime(v.Version)
if err != nil {
return nil, err
Expand All @@ -111,8 +122,10 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh
}
}

if candidate.Version != "" {
versionedFiles[candidate.Name] = candidate
if candidate.Version == "" {
versionedFiles[name] = newest
} else {
versionedFiles[name] = candidate
}
}

Expand Down
175 changes: 175 additions & 0 deletions pkg/service/restore/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright (C) 2024 ScyllaDB

package restore

import (
"slices"
"sync"

"github.com/pkg/errors"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

// batchDispatcher hands out batches of sstables from workload to hosts.
type batchDispatcher struct {
	// mu is held by DispatchBatch while it reads and updates workload.
	mu sync.Mutex
	// workload is the remaining work; dispatching a batch shrinks its sizes and sstable lists.
	workload []LocationWorkload
	// batchSize is the upper bound on the number of sstables put into a single batch.
	batchSize int
	// locationHosts lists, per location, the hosts for which batches from that location can be created.
	locationHosts map[Location][]string
}

// newBatchDispatcher returns a dispatcher over the given workload.
// The workload slice and the locationHosts map are stored as passed (not copied).
func newBatchDispatcher(workload []LocationWorkload, batchSize int, locationHosts map[Location][]string) *batchDispatcher {
	// The zero value of sync.Mutex is an unlocked mutex, so mu needs no explicit initialization.
	d := new(batchDispatcher)
	d.workload = workload
	d.batchSize = batchSize
	d.locationHosts = locationHosts
	return d
}

// batch is a group of sstables taken from a single remote sstable dir.
// It is the unit of work returned by batchDispatcher.DispatchBatch.
type batch struct {
	TableName
	*ManifestInfo

	// RemoteSSTableDir is the remote dir that SSTables were taken from.
	RemoteSSTableDir string
	// Size is the sum of sizes of SSTables.
	Size int64
	// SSTables are the sstables that make up this batch.
	SSTables []RemoteSSTable
}

// NotVersionedSSTables returns the sstables of the batch whose Versioned flag
// is not set, in their original order. The result is nil when there are none.
func (b batch) NotVersionedSSTables() []RemoteSSTable {
	var out []RemoteSSTable
	for i := range b.SSTables {
		if b.SSTables[i].Versioned {
			continue
		}
		out = append(out, b.SSTables[i])
	}
	return out
}

// VersionedSSTables returns the sstables of the batch whose Versioned flag
// is set, in their original order. The result is nil when there are none.
func (b batch) VersionedSSTables() []RemoteSSTable {
	var out []RemoteSSTable
	for i := range b.SSTables {
		if !b.SSTables[i].Versioned {
			continue
		}
		out = append(out, b.SSTables[i])
	}
	return out
}

// VersionedSize returns the summed Size of all sstables in the batch
// whose Versioned flag is set.
func (b batch) VersionedSize() int64 {
	var total int64
	for i := range b.SSTables {
		if sst := &b.SSTables[i]; sst.Versioned {
			total += sst.Size
		}
	}
	return total
}

// IDs returns the ID of every sstable in the batch, in batch order.
// The result is nil for a batch without sstables.
func (b batch) IDs() []string {
	var out []string
	for i := range b.SSTables {
		out = append(out, b.SSTables[i].ID)
	}
	return out
}

// ValidateAllDispatched returns error if not all sstables were dispatched.
//
// For the first location with non-zero remaining Size it reports the most
// specific leftover it can find: a remote dir of the first table with non-zero
// Size (listing its remaining sstables), else that table, else the location
// itself. Locations and tables with zero Size are not inspected any further.
// It returns nil when every location has zero remaining Size.
func (b *batchDispatcher) ValidateAllDispatched() error {
	// DispatchBatch updates workload under b.mu, so read it under the same lock.
	b.mu.Lock()
	defer b.mu.Unlock()

	for _, lw := range b.workload {
		if lw.Size == 0 {
			continue
		}
		for _, tw := range lw.Tables {
			if tw.Size == 0 {
				continue
			}
			for _, dw := range tw.RemoteDirs {
				if dw.Size != 0 || len(dw.SSTables) != 0 {
					return errors.Errorf("expected all data to be restored, missing sstable ids from location %s table %s.%s: %v (%d bytes)",
						dw.Location, dw.Keyspace, dw.Table, dw.SSTables, dw.Size)
				}
			}
			// Table reports leftover bytes, but none of its remote dirs does.
			return errors.Errorf("expected all data to be restored, missing table from location %s: %s.%s (%d bytes)",
				tw.Location, tw.Keyspace, tw.Table, tw.Size)
		}
		// Location reports leftover bytes, but none of its tables does.
		return errors.Errorf("expected all data to be restored, missing location: %s (%d bytes)",
			lw.Location, lw.Size)
	}
	return nil
}

// DispatchBatch returns the next batch to be restored by host,
// or false when there is no more work for it to do.
// The whole selection and the resulting workload update happen under b.mu.
func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Narrow down step by step: location -> table -> remote dir.
	location := b.chooseLocation(host)
	if location == nil {
		return batch{}, false
	}

	table := b.chooseTable(location)
	if table == nil {
		return batch{}, false
	}

	remoteDir := b.chooseRemoteDir(table)
	if remoteDir == nil {
		return batch{}, false
	}

	return b.createBatch(location, table, remoteDir), true
}

// chooseLocation returns the location for which a batch should be created:
// the first one in workload order that has non-zero remaining Size and lists
// host in locationHosts. It returns nil when there is no such location.
func (b *batchDispatcher) chooseLocation(host string) *LocationWorkload {
	for i := range b.workload {
		lw := &b.workload[i]
		if lw.Size != 0 && slices.Contains(b.locationHosts[lw.Location], host) {
			return lw
		}
	}
	return nil
}

// chooseTable returns the table for which a batch should be created:
// the first table of location with non-zero remaining Size,
// or nil when every table has zero Size.
func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload {
	for i := range location.Tables {
		if tw := &location.Tables[i]; tw.Size != 0 {
			return tw
		}
	}
	return nil
}

// chooseRemoteDir returns the remote dir for which a batch should be created:
// the first remote dir of table with non-zero remaining Size,
// or nil when every remote dir has zero Size.
func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload {
	for i := range table.RemoteDirs {
		if dw := &table.RemoteDirs[i]; dw.Size != 0 {
			return dw
		}
	}
	return nil
}

// createBatch returns a batch and updates RemoteDirWorkload and its parents.
//
// The batch consists of up to b.batchSize sstables taken from the front of
// dir.SSTables. Those sstables are removed from dir, and their summed size is
// subtracted from the Size of dir, t and l.
// DispatchBatch calls it with b.mu held.
func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload) batch {
	i := min(b.batchSize, len(dir.SSTables))
	// Cap the batch slice at its length: it shares the backing array with the
	// sstables left in dir, so without the cap an append to the returned
	// batch's SSTables would overwrite sstables that were not dispatched yet.
	sstables := dir.SSTables[:i:i]
	dir.SSTables = dir.SSTables[i:]

	var size int64
	for _, sst := range sstables {
		size += sst.Size
	}
	dir.Size -= size
	t.Size -= size
	l.Size -= size
	return batch{
		TableName:        dir.TableName,
		ManifestInfo:     dir.ManifestInfo,
		RemoteSSTableDir: dir.RemoteSSTableDir,
		Size:             size,
		SSTables:         sstables,
	}
}
Loading
Loading