From cae9cfebd8785e060b8a4b62b294bbae3c4d067c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 24 Sep 2024 14:11:33 +0200 Subject: [PATCH 01/10] feat(schema): drop sequential restore run tracking Right now SM restores location by location, manifest by manifest, table by table. That's why it tracks restore progress by keeping location/manifest/table in the DB. We are moving away from sequential restore approach in favor of restoring from all locations/manifests/tables at the same time. --- pkg/schema/table/table.go | 4 ---- schema/v3.4.0.cql | 4 ++++ 2 files changed, 4 insertions(+), 4 deletions(-) create mode 100644 schema/v3.4.0.cql diff --git a/pkg/schema/table/table.go b/pkg/schema/table/table.go index 1e5bf6351..5dcbeefcc 100644 --- a/pkg/schema/table/table.go +++ b/pkg/schema/table/table.go @@ -188,14 +188,10 @@ var ( Columns: []string{ "cluster_id", "id", - "keyspace_name", - "location", - "manifest_path", "prev_id", "repair_task_id", "snapshot_tag", "stage", - "table_name", "task_id", "units", "views", diff --git a/schema/v3.4.0.cql b/schema/v3.4.0.cql new file mode 100644 index 000000000..4fe13883c --- /dev/null +++ b/schema/v3.4.0.cql @@ -0,0 +1,4 @@ +ALTER TABLE restore_run DROP location; +ALTER TABLE restore_run DROP manifest_path; +ALTER TABLE restore_run DROP keyspace_name; +ALTER TABLE restore_run DROP table_name; \ No newline at end of file From 899edfed292daaf6ee814102a542a2c53b7639ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 24 Sep 2024 14:30:02 +0200 Subject: [PATCH 02/10] feat(restore): adjust model to dropped sequential restore run tracking --- pkg/service/restore/model.go | 30 +++++++--------- pkg/service/restore/schema_worker.go | 13 ++----- pkg/service/restore/tables_worker.go | 47 +++++-------------------- pkg/service/restore/tablesdir_worker.go | 12 +++---- pkg/service/restore/worker.go | 4 --- 5 files changed, 29 insertions(+), 77 deletions(-) diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 39a7d4a28..baedc9550 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -80,16 +80,11 @@ type Run struct { TaskID uuid.UUID ID uuid.UUID - PrevID uuid.UUID - Location string // marks currently processed location - ManifestPath string // marks currently processed manifest - Keyspace string `db:"keyspace_name"` // marks currently processed keyspace - Table string `db:"table_name"` // marks currently processed table - SnapshotTag string - Stage Stage + PrevID uuid.UUID + SnapshotTag string + Stage Stage RepairTaskID uuid.UUID // task ID of the automated post-restore repair - // Cache that's initialized once for entire task Units []Unit Views []View @@ -165,20 +160,21 @@ func (t *View) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error return gocql.Unmarshal(info, data, f.Addr().Interface()) } -// RunProgress describes restore progress (like in RunProgress) of -// already started download of SSTables with specified IDs to host. +// RunProgress describes progress of restoring a single batch. type RunProgress struct { ClusterID uuid.UUID TaskID uuid.UUID RunID uuid.UUID - ManifestPath string - Keyspace string `db:"keyspace_name"` - Table string `db:"table_name"` - Host string // IP of the node to which SSTables are downloaded. - AgentJobID int64 + // Different DB name because of historical reasons and because we can't drop/alter clustering column + RemoteSSTableDir string `db:"manifest_path"` + Keyspace string `db:"keyspace_name"` + Table string `db:"table_name"` + SSTableID []string `db:"sstable_id"` + + Host string // IP of the node to which SSTables are downloaded. + AgentJobID int64 - SSTableID []string `db:"sstable_id"` DownloadStartedAt *time.Time DownloadCompletedAt *time.Time RestoreStartedAt *time.Time @@ -206,7 +202,7 @@ func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*Run "cluster_id": pr.ClusterID, "task_id": pr.TaskID, "run_id": pr.RunID, - "manifest_path": pr.ManifestPath, + "manifest_path": pr.RemoteSSTableDir, "keyspace_name": pr.Keyspace, "table_name": pr.Table, }).Iter() diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index bf1048f7f..e5c547ea5 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -167,7 +167,6 @@ func (w *schemaWorker) restoreFromSchemaFile(ctx context.Context) error { ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: "DESCRIBE SCHEMA WITH INTERNALS", Keyspace: u.Keyspace, Table: t.Table, DownloadStartedAt: &start, @@ -188,8 +187,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.logger.Info(ctx, "Downloading schema from location", "location", location) defer w.logger.Info(ctx, "Downloading schema from location finished", "location", location) - w.run.Location = location.String() - tableDownloadHandler := func(fm FilesMeta) error { if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { return nil @@ -198,9 +195,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.logger.Info(ctx, "Downloading schema table", "keyspace", fm.Keyspace, "table", fm.Table) defer w.logger.Info(ctx, "Downloading schema table finished", "keyspace", fm.Keyspace, "table", fm.Table) - w.run.Table = fm.Table - w.run.Keyspace = fm.Keyspace - return w.workFunc(ctx, fm) } @@ -209,7 +203,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc defer w.logger.Info(ctx, "Downloading schema from manifest finished", "manifest", miwc.ManifestInfo) w.miwc = miwc - w.run.ManifestPath = miwc.Path() w.insertRun(ctx) return miwc.ForEachIndexIterWithError(nil, tableDownloadHandler) @@ -308,9 +301,9 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: w.run.ManifestPath, - Keyspace: w.run.Keyspace, - Table: w.run.Table, + RemoteSSTableDir: srcDir, + Keyspace: fm.Keyspace, + Table: fm.Table, Host: host, DownloadStartedAt: &start, DownloadCompletedAt: &end, diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 774b581de..fbdf3d188 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -20,9 +20,6 @@ type tablesWorker struct { repairSvc *repair.Service progress *TotalRestoreProgress - // When set to false, tablesWorker will skip restoration of location/manifest/table - // until it encounters the one present in run. - alreadyResumed bool } // TotalRestoreProgress is a struct that holds information about the total progress of the restore job. @@ -66,20 +63,16 @@ func (p *TotalRestoreProgress) Update(bytesRestored int64) { func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) *tablesWorker { return &tablesWorker{ - worker: w, - repairSvc: repairSvc, - alreadyResumed: true, - progress: NewTotalRestoreProgress(totalBytes), + worker: w, + repairSvc: repairSvc, + progress: NewTotalRestoreProgress(totalBytes), } } // restore files from every location specified in restore target. func (w *tablesWorker) restore(ctx context.Context) error { - if w.target.Continue && w.run.PrevID != uuid.Nil && w.run.Table != "" { - w.alreadyResumed = false - } // Init metrics only on fresh start - if w.alreadyResumed { + if w.run.PrevID == uuid.Nil { w.initRestoreMetrics(ctx) } @@ -158,10 +151,6 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { // Restore locations in deterministic order for _, l := range w.target.Location { - if !w.alreadyResumed && w.run.Location != l.String() { - w.logger.Info(ctx, "Skipping location", "location", l) - continue - } if err := w.restoreLocation(ctx, l); err != nil { return err } @@ -174,11 +163,6 @@ func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) e defer w.logger.Info(ctx, "Restoring location finished", "location", location) restoreManifest := func(miwc ManifestInfoWithContent) error { - if !w.alreadyResumed && w.run.ManifestPath != miwc.Path() { - w.logger.Info(ctx, "Skipping manifest", "manifest", miwc.ManifestInfo) - return nil - } - w.logger.Info(ctx, "Restoring manifest", "manifest", miwc.ManifestInfo) defer w.logger.Info(ctx, "Restoring manifest finished", "manifest", miwc.ManifestInfo) @@ -194,32 +178,17 @@ func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithCont return nil } - if !w.alreadyResumed { - if w.run.Keyspace != fm.Keyspace || w.run.Table != fm.Table { - w.logger.Info(ctx, "Skipping table", "keyspace", fm.Keyspace, "table", fm.Table) - return nil - } - } - w.logger.Info(ctx, "Restoring table", "keyspace", fm.Keyspace, "table", fm.Table) defer w.logger.Info(ctx, "Restoring table finished", "keyspace", fm.Keyspace, "table", fm.Table) - - w.run.Location = miwc.Location.String() - w.run.ManifestPath = miwc.Path() - w.run.Table = fm.Table - w.run.Keyspace = fm.Keyspace w.insertRun(ctx) dw, err := newTablesDirWorker(ctx, w.worker, miwc, fm, w.progress) if err != nil { return errors.Wrap(err, "create dir worker") } - if !w.alreadyResumed { - if err := dw.resumePrevProgress(); err != nil { - return errors.Wrap(err, "resume prev run progress") - } + if err := dw.resumePrevProgress(); err != nil { + return errors.Wrap(err, "resume prev run progress") } - w.alreadyResumed = true if err := dw.restore(ctx); err != nil { if ctx.Err() != nil { @@ -232,8 +201,8 @@ func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithCont } w.logger.Error(ctx, "Restore table failed on some hosts but restore will proceed", - "keyspace", w.run.Keyspace, - "table", w.run.Table, + "keyspace", fm.Keyspace, + "table", fm.Table, "error", err, ) } diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index b2942a9eb..b5964c18e 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -262,12 +262,11 @@ func (w *tablesDirWorker) restoreSSTables(ctx context.Context, pr *RunProgress) func (w *tablesDirWorker) resumePrevProgress() error { bind := &RunProgress{ - ClusterID: w.run.ClusterID, - TaskID: w.run.TaskID, - RunID: w.run.ID, - ManifestPath: w.miwc.Path(), - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, + ClusterID: w.run.ClusterID, + TaskID: w.run.TaskID, + RunID: w.run.ID, + Keyspace: w.fm.Keyspace, + Table: w.fm.Table, } // All bundles IDs started in the previous run @@ -371,7 +370,6 @@ func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*Run ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: w.miwc.Path(), Keyspace: w.fm.Keyspace, Table: w.fm.Table, Host: host, diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 00f96c6dc..674d29693 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -666,10 +666,6 @@ func (w *worker) decorateWithPrevRun(ctx context.Context) error { if w.target.Continue { w.run.PrevID = prev.ID - w.run.Location = prev.Location - w.run.ManifestPath = prev.ManifestPath - w.run.Keyspace = prev.Keyspace - w.run.Table = prev.Table w.run.Stage = prev.Stage w.run.RepairTaskID = prev.RepairTaskID } From b7e97c6d18e0ca4931d8315c9243aa460d8b931c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Fri, 27 Sep 2024 10:05:05 +0200 Subject: [PATCH 03/10] refactor(backupspec): include the newest file version in ListVersionedFiles There is no need to iterate versioned files (ListVersionedFiles) and not versioned files (buildFilesSizesCache) separately. Doing it in a single iteration is faster, and it allows to store all size information in a single place. --- pkg/service/backup/backupspec/versioning.go | 37 ++++++++++++++------- pkg/service/restore/schema_worker.go | 11 +----- pkg/service/restore/tablesdir_worker.go | 17 ++-------- pkg/service/restore/worker.go | 17 ---------- 4 files changed, 28 insertions(+), 54 deletions(-) diff --git a/pkg/service/backup/backupspec/versioning.go b/pkg/service/backup/backupspec/versioning.go index 103502d13..af3320394 100644 --- a/pkg/service/backup/backupspec/versioning.go +++ b/pkg/service/backup/backupspec/versioning.go @@ -21,13 +21,18 @@ import ( // (e.g. older version of 'md-2-big-Data.db' could be 'md-2-big-Data.db.sm_20230114183231UTC') // Note, that the newest version of SSTable does not have snapshot tag extension. type VersionedSSTable struct { - Name string // Original SSTable name (e.g. md-2-big-Data.db) - Version string // Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC) + Name string // Original SSTable name (e.g. md-2-big-Data.db) + // Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC). + // Empty version describes not versioned (newest version) of sstable without the snapshot tag suffix. + Version string Size int64 } // FullName returns versioned file name. func (vt VersionedSSTable) FullName() string { + if vt.Version == "" { + return vt.Name + } return vt.Name + "." + vt.Version } @@ -66,6 +71,9 @@ func IsVersionedFileRemovable(oldest time.Time, versioned string) (bool, error) // SplitNameAndVersion splits versioned file name into its original name and its version. func SplitNameAndVersion(versioned string) (name, version string) { versionExt := path.Ext(versioned) + if versionExt == "" || !IsSnapshotTag(versionExt[1:]) { + return versioned, "" + } baseName := strings.TrimSuffix(versioned, versionExt) return baseName, versionExt[1:] } @@ -74,11 +82,7 @@ func SplitNameAndVersion(versioned string) (name, version string) { func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapshotTag, host, dir string) (VersionedMap, error) { versionedFiles := make(VersionedMap) allVersions := make(map[string][]VersionedSSTable) - - opts := &scyllaclient.RcloneListDirOpts{ - FilesOnly: true, - VersionedOnly: true, - } + opts := &scyllaclient.RcloneListDirOpts{FilesOnly: true} f := func(item *scyllaclient.RcloneListDirItem) { name, version := SplitNameAndVersion(item.Name) allVersions[name] = append(allVersions[name], VersionedSSTable{ @@ -87,7 +91,6 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh Size: item.Size, }) } - if err := client.RcloneListDirIter(ctx, host, dir, opts, f); err != nil { return nil, errors.Wrapf(err, "host %s: listing versioned files", host) } @@ -97,9 +100,17 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh return nil, err } // Chose correct version with respect to currently restored snapshot tag - for _, versions := range allVersions { - var candidate VersionedSSTable + for name, versions := range allVersions { + var ( + candidate VersionedSSTable + newest VersionedSSTable + ) for _, v := range versions { + if v.Version == "" { + newest = v + continue + } + tagT, err := SnapshotTagTime(v.Version) if err != nil { return nil, err @@ -111,8 +122,10 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh } } - if candidate.Version != "" { - versionedFiles[candidate.Name] = candidate + if candidate.Version == "" { + versionedFiles[name] = newest + } else { + versionedFiles[name] = candidate } } diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index e5c547ea5..8f7bea05f 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -235,12 +235,6 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { if err != nil { return errors.Wrap(err, "initialize versioned SSTables") } - if len(w.versionedFiles) > 0 { - w.logger.Info(ctx, "Chosen versioned SSTables", - "dir", srcDir, - "versioned_files", w.versionedFiles, - ) - } idMapping := w.getFileNamesMapping(fm.Files, false) uuidMapping := w.getFileNamesMapping(fm.Files, true) @@ -268,10 +262,7 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { // Rename SSTable in the destination in order to avoid name conflicts dstFile := renamedSSTables[file] // Take the correct version of restored file - srcFile := file - if v, ok := w.versionedFiles[file]; ok { - srcFile = v.FullName() - } + srcFile := w.versionedFiles[file].FullName() srcPath := path.Join(srcDir, srcFile) dstPath := path.Join(dstDir, dstFile) diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index b5964c18e..5e5bab453 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -37,7 +37,6 @@ type tablesDirWorker struct { // (with respect to currently restored snapshot tag) // that should be used during the restore procedure. versionedFiles VersionedMap - fileSizesCache map[string]int64 progress *TotalRestoreProgress } @@ -64,17 +63,6 @@ func newTablesDirWorker(ctx context.Context, w worker, miwc ManifestInfoWithCont if err != nil { return tablesDirWorker{}, errors.Wrap(err, "initialize versioned SSTables") } - if len(versionedFiles) > 0 { - w.logger.Info(ctx, "Chosen versioned SSTables", - "dir", srcDir, - "versioned_files", versionedFiles, - ) - } - - fileSizesCache, err := buildFilesSizesCache(ctx, w.client, hosts[0], srcDir, versionedFiles) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "build files sizes cache") - } return tablesDirWorker{ worker: w, @@ -86,7 +74,6 @@ func newTablesDirWorker(ctx context.Context, w worker, miwc ManifestInfoWithCont fm: fm, ongoingPr: make([]*RunProgress, len(hosts)), versionedFiles: versionedFiles, - fileSizesCache: fileSizesCache, progress: progress, }, nil } @@ -393,7 +380,7 @@ func (w *tablesDirWorker) startDownload(ctx context.Context, host string, batch ) // Decide which files require to be downloaded in their older version for _, file := range batch { - if v, ok := w.versionedFiles[file]; ok { + if v := w.versionedFiles[file]; v.Version != "" { versionedBatch = append(versionedBatch, v) versionedPr += v.Size } else { @@ -558,7 +545,7 @@ func (w *tablesDirWorker) batchFromIDs(ids []string) []string { func (w *tablesDirWorker) countBatchSize(batch []string) int64 { var batchSize int64 for _, file := range batch { - batchSize += w.fileSizesCache[file] + batchSize += w.versionedFiles[file].Size } return batchSize } diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 674d29693..7269d2a76 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -790,20 +790,3 @@ func (w *worker) stopJob(ctx context.Context, jobID int64, host string) { ) } } - -func buildFilesSizesCache(ctx context.Context, client *scyllaclient.Client, host, dir string, versioned VersionedMap) (map[string]int64, error) { - filesSizesCache := make(map[string]int64) - opts := &scyllaclient.RcloneListDirOpts{ - FilesOnly: true, - } - f := func(item *scyllaclient.RcloneListDirItem) { - filesSizesCache[item.Name] = item.Size - } - if err := client.RcloneListDirIter(ctx, host, dir, opts, f); err != nil { - return nil, errors.Wrapf(err, "host %s: listing all files from %s", host, dir) - } - for k, v := range versioned { - filesSizesCache[k] = v.Size - } - return filesSizesCache, nil -} From 13ba030c6d160ff875e7034a184387a245e440c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Fri, 27 Sep 2024 10:06:16 +0200 Subject: [PATCH 04/10] feat(restore): add workload indexing This commit introduces the structure of restore workload. Workload is divided per location->table->remote sstable directory. This changes the hierarchy established by manifests (location->node->table->remote sstable dir). It also aggregates files into actual sstables, extracts their IDs, and aggregates their sizes, and keeps track of sstable versioning. --- pkg/service/restore/index.go | 197 ++++++++++++++++++++++++++++++++++ pkg/service/restore/model.go | 6 ++ pkg/service/restore/worker.go | 9 ++ 3 files changed, 212 insertions(+) create mode 100644 pkg/service/restore/index.go diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go new file mode 100644 index 000000000..61c90e78c --- /dev/null +++ b/pkg/service/restore/index.go @@ -0,0 +1,197 @@ +// Copyright (C) 2024 ScyllaDB + +package restore + +import ( + "context" + + "github.com/pkg/errors" + . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/sstable" +) + +// LocationWorkload represents aggregated restore workload +// in given backup location. +type LocationWorkload struct { + Location + + Size int64 + Tables []TableWorkload +} + +// TableWorkload represents restore workload +// from many manifests for given table in given backup location. +type TableWorkload struct { + Location + TableName + + Size int64 + RemoteDirs []RemoteDirWorkload +} + +// RemoteDirWorkload represents restore workload +// for given table and manifest in given backup location. +type RemoteDirWorkload struct { + TableName + *ManifestInfo + + RemoteSSTableDir string + Size int64 + SSTables []RemoteSSTable +} + +// RemoteSSTable represents SSTable updated with size and version info from remote. +type RemoteSSTable struct { + SSTable // File names might contain versioned snapshot tag extension + Size int64 + Versioned bool +} + +// SSTable represents files creating a single sstable. +type SSTable struct { + ID string + Files []string +} + +// IndexWorkload returns sstables to be restored aggregated by location, table and remote sstable dir. +func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) ([]LocationWorkload, error) { + var workload []LocationWorkload + for _, l := range locations { + lw, err := w.indexLocationWorkload(ctx, l) + if err != nil { + return nil, errors.Wrapf(err, "index workload in %s", l) + } + workload = append(workload, lw) + } + return workload, nil +} + +func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location) (LocationWorkload, error) { + rawWorkload, err := w.createRemoteDirWorkloads(ctx, location) + if err != nil { + return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads") + } + return aggregateLocationWorkload(rawWorkload), nil +} + +func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { + var rawWorkload []RemoteDirWorkload + err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error { + return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + + sstables, err := filesMetaToSSTables(fm) + if err != nil { + return errors.Wrapf(err, "convert files meta to sstables") + } + sstDir := m.LocationSSTableVersionDir(fm.Keyspace, fm.Table, fm.Version) + remoteSSTables, err := w.adjustSSTablesWithRemote(ctx, w.randomHostFromLocation(location), sstDir, sstables) + if err != nil { + return errors.Wrap(err, "fetch sstables sizes") + } + + var size int64 + for _, sst := range remoteSSTables { + size += sst.Size + } + t := TableName{ + Keyspace: fm.Keyspace, + Table: fm.Table, + } + workload := RemoteDirWorkload{ + TableName: t, + ManifestInfo: m.ManifestInfo, + RemoteSSTableDir: sstDir, + Size: size, + SSTables: remoteSSTables, + } + rawWorkload = append(rawWorkload, workload) + return nil + }) + }) + if err != nil { + return nil, errors.Wrap(err, "iterate over manifests") + } + return rawWorkload, nil +} + +func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload { + remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload) + for _, rw := range rawWorkload { + remoteDirWorkloads[rw.TableName] = append(remoteDirWorkloads[rw.TableName], rw) + } + + var tableWorkloads []TableWorkload + for _, tw := range remoteDirWorkloads { + var size int64 + for _, rdw := range tw { + size += rdw.Size + } + tableWorkloads = append(tableWorkloads, TableWorkload{ + Location: tw[0].Location, + TableName: tw[0].TableName, + Size: size, + RemoteDirs: tw, + }) + } + + var size int64 + for _, tw := range tableWorkloads { + size += tw.Size + } + return LocationWorkload{ + Location: tableWorkloads[0].Location, + Size: size, + Tables: tableWorkloads, + } +} + +func (w *tablesWorker) adjustSSTablesWithRemote(ctx context.Context, host, remoteDir string, sstables map[string]SSTable) ([]RemoteSSTable, error) { + versioned, err := ListVersionedFiles(ctx, w.client, w.run.SnapshotTag, host, remoteDir) + if err != nil { + return nil, errors.Wrap(err, "list versioned files") + } + + remoteSSTables := make([]RemoteSSTable, 0, len(sstables)) + for id, sst := range sstables { + rsst := RemoteSSTable{SSTable: SSTable{ID: id}} + for _, f := range sst.Files { + v, ok := versioned[f] + if !ok { + return nil, errors.Errorf("file %s is not present in listed versioned files", f) + } + + rsst.Files = append(rsst.Files, v.FullName()) + rsst.Size += v.Size + rsst.Versioned = rsst.Versioned || v.Version != "" + } + remoteSSTables = append(remoteSSTables, rsst) + } + + return remoteSSTables, nil +} + +func filesMetaToSSTables(fm FilesMeta) (map[string]SSTable, error) { + const expectedSSTableFileCnt = 9 + sstables := make(map[string]SSTable, len(fm.Files)/expectedSSTableFileCnt) + + for _, f := range fm.Files { + id, err := sstable.ExtractID(f) + if err != nil { + return nil, errors.Wrapf(err, "extract sstable component %s generation ID", f) + } + + if sst, ok := sstables[id]; ok { + sst.Files = append(sst.Files, f) + sstables[id] = sst + } else { + sstables[id] = SSTable{ + ID: id, + Files: []string{f}, + } + } + } + return sstables, nil +} diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index baedc9550..55a84b736 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -275,3 +275,9 @@ type ViewProgress struct { Status scyllaclient.ViewBuildStatus `json:"status"` } + +// TableName represents full table name. +type TableName struct { + Keyspace string + Table string +} diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 7269d2a76..8082bf7ff 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "math/rand" "path" "regexp" "slices" @@ -43,6 +44,14 @@ type worker struct { clusterSession gocqlx.Session } +func (w *worker) randomHostFromLocation(loc Location) string { + hosts, ok := w.target.locationHosts[loc] + if !ok { + panic("no hosts for location: " + loc.String()) + } + return hosts[rand.Intn(len(hosts))] +} + func (w *worker) init(ctx context.Context, properties json.RawMessage) error { if err := w.initTarget(ctx, properties); err != nil { return errors.Wrap(err, "init target") From f6b5c0a1c4b62769faa526140c3cd6d6d90aa79b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 24 Sep 2024 14:52:46 +0200 Subject: [PATCH 05/10] feat(restore): index, support resume Indexed workload won't contain sstables that were already restored during previous restore run. --- pkg/service/restore/index.go | 45 ++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index 61c90e78c..470fb0e1f 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -4,6 +4,7 @@ package restore import ( "context" + "slices" "github.com/pkg/errors" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" @@ -71,6 +72,12 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat if err != nil { return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads") } + if w.target.Continue { + rawWorkload, err = w.filterPreviouslyRestoredSStables(rawWorkload) + if err != nil { + return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables") + } + } return aggregateLocationWorkload(rawWorkload), nil } @@ -117,6 +124,44 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo return rawWorkload, nil } +func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) { + remoteSSTableDirToRestoredIDs := make(map[string][]string) + err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) { + if validateTimeIsSet(pr.RestoreCompletedAt) { + remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir] = append(remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir], pr.SSTableID...) + } + }) + if err != nil { + return nil, errors.Wrap(err, "iterate over prev run progress") + } + if len(remoteSSTableDirToRestoredIDs) == 0 { + return rawWorkload, nil + } + + var filtered []RemoteDirWorkload + for _, rw := range rawWorkload { + var filteredSSTables []RemoteSSTable + var size int64 + for _, sst := range rw.SSTables { + if !slices.Contains(remoteSSTableDirToRestoredIDs[rw.RemoteSSTableDir], sst.ID) { + filteredSSTables = append(filteredSSTables, sst) + size += sst.Size + } + } + if len(filteredSSTables) > 0 { + filtered = append(filtered, RemoteDirWorkload{ + TableName: rw.TableName, + ManifestInfo: rw.ManifestInfo, + RemoteSSTableDir: rw.RemoteSSTableDir, + Size: size, + SSTables: filteredSSTables, + }) + } + } + + return filtered, nil +} + func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload { remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload) for _, rw := range rawWorkload { From 737cccbc2bff88b28ef1bc5216f7cb113b074794 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 24 Sep 2024 15:31:32 +0200 Subject: [PATCH 06/10] feat(restore): index, support metrics init --- pkg/service/restore/index.go | 38 ++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index 470fb0e1f..140cfdf80 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -7,6 +7,7 @@ import ( "slices" "github.com/pkg/errors" + "github.com/scylladb/scylla-manager/v3/pkg/metrics" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/sstable" ) @@ -162,6 +163,43 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW return filtered, nil } +func (w *tablesWorker) initMetrics(workload []LocationWorkload) { + // For now, the only persistent across task runs metrics are progress and remaining_bytes. + // The rest: state, view_build_status, batch_size are calculated from scratch. + w.metrics.ResetClusterMetrics(w.run.ClusterID) + + // Init remaining bytes + for _, wl := range workload { + for _, twl := range wl.Tables { + for _, rdwl := range twl.RemoteDirs { + w.metrics.SetRemainingBytes(metrics.RestoreBytesLabels{ + ClusterID: rdwl.ClusterID.String(), + SnapshotTag: rdwl.SnapshotTag, + Location: rdwl.Location.String(), + DC: rdwl.DC, + Node: rdwl.NodeID, + Keyspace: rdwl.Keyspace, + Table: rdwl.Table, + }, rdwl.Size) + } + } + } + + // Init progress + var totalSize int64 + for _, u := range w.run.Units { + totalSize += u.Size + } + var workloadSize int64 + for _, wl := range workload { + workloadSize += wl.Size + } + w.metrics.SetProgress(metrics.RestoreProgressLabels{ + ClusterID: w.run.ClusterID.String(), + SnapshotTag: w.run.SnapshotTag, + }, float64(totalSize-workloadSize)/float64(totalSize)*100) +} + func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload { remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload) for _, rw := range rawWorkload { From 72f9c6fd28c949f09e23d4993bc0fffc15f07cb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 24 Sep 2024 16:18:55 +0200 Subject: [PATCH 07/10] feat(restore): add primitive batching using indexed workload This is a temporary implementation used for integrating workload indexing with the rest of the code. It will be improved as a part of the #3979. --- pkg/service/restore/batch.go | 175 +++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 pkg/service/restore/batch.go diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go new file mode 100644 index 000000000..b432e653d --- /dev/null +++ b/pkg/service/restore/batch.go @@ -0,0 +1,175 @@ +// Copyright (C) 2024 ScyllaDB + +package restore + +import ( + "slices" + "sync" + + "github.com/pkg/errors" + . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" +) + +type batchDispatcher struct { + mu sync.Mutex + workload []LocationWorkload + batchSize int + locationHosts map[Location][]string +} + +func newBatchDispatcher(workload []LocationWorkload, batchSize int, locationHosts map[Location][]string) *batchDispatcher { + return &batchDispatcher{ + mu: sync.Mutex{}, + workload: workload, + batchSize: batchSize, + locationHosts: locationHosts, + } +} + +type batch struct { + TableName + *ManifestInfo + + RemoteSSTableDir string + Size int64 + SSTables []RemoteSSTable +} + +func (b batch) NotVersionedSSTables() []RemoteSSTable { + var ssts []RemoteSSTable + for _, sst := range b.SSTables { + if !sst.Versioned { + ssts = append(ssts, sst) + } + } + return ssts +} + +func (b batch) VersionedSSTables() []RemoteSSTable { + var ssts []RemoteSSTable + for _, sst := range b.SSTables { + if sst.Versioned { + ssts = append(ssts, sst) + } + } + return ssts +} + +func (b batch) VersionedSize() int64 { + var size int64 + for _, sst := range b.SSTables { + if sst.Versioned { + size += sst.Size + } + } + return size +} + +func (b batch) IDs() []string { + var ids []string + for _, sst := range b.SSTables { + ids = append(ids, sst.ID) + } + return ids +} + +// ValidateAllDispatched returns error if not all sstables were dispatched. +func (b *batchDispatcher) ValidateAllDispatched() error { + for _, lw := range b.workload { + if lw.Size != 0 { + for _, tw := range lw.Tables { + if tw.Size != 0 { + for _, dw := range tw.RemoteDirs { + if dw.Size != 0 || len(dw.SSTables) != 0 { + return errors.Errorf("expected all data to be restored, missing sstable ids from location %s table %s.%s: %v (%d bytes)", + dw.Location, dw.Keyspace, dw.Table, dw.SSTables, dw.Size) + } + } + return errors.Errorf("expected all data to be restored, missinng table from location %s: %s.%s (%d bytes)", + tw.Location, tw.Keyspace, tw.Table, tw.Size) + } + } + return errors.Errorf("expected all data to be restored, missinng location: %s (%d bytes)", + lw.Location, lw.Size) + } + } + return nil +} + +// DispatchBatch batch to be restored or false when there is no more work to do. +func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) { + b.mu.Lock() + defer b.mu.Unlock() + + l := b.chooseLocation(host) + if l == nil { + return batch{}, false + } + t := b.chooseTable(l) + if t == nil { + return batch{}, false + } + dir := b.chooseRemoteDir(t) + if dir == nil { + return batch{}, false + } + out := b.createBatch(l, t, dir) + return out, true +} + +// Returns location for which batch should be created. +func (b *batchDispatcher) chooseLocation(host string) *LocationWorkload { + for i := range b.workload { + if b.workload[i].Size == 0 { + continue + } + if slices.Contains(b.locationHosts[b.workload[i].Location], host) { + return &b.workload[i] + } + } + return nil +} + +// Returns table for which batch should be created. +func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload { + for i := range location.Tables { + if location.Tables[i].Size == 0 { + continue + } + return &location.Tables[i] + } + return nil +} + +// Return remote dir for which batch should be created. +func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload { + for i := range table.RemoteDirs { + if table.RemoteDirs[i].Size == 0 { + continue + } + return &table.RemoteDirs[i] + } + return nil +} + +// Returns batch and updates RemoteDirWorkload and its parents. +func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload) batch { + i := min(b.batchSize, len(dir.SSTables)) + sstables := dir.SSTables[:i] + dir.SSTables = dir.SSTables[i:] + + var size int64 + for _, sst := range sstables { + size += sst.Size + } + dir.Size -= size + t.Size -= size + l.Size -= size + return batch{ + TableName: dir.TableName, + ManifestInfo: dir.ManifestInfo, + RemoteSSTableDir: dir.RemoteSSTableDir, + Size: size, + SSTables: sstables, + } +} From c0edb79c83d9315ada64977046d83e3424594751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 25 Sep 2024 13:02:53 +0200 Subject: [PATCH 08/10] feat(restore): integrate new indexing and batching with codebase This commit makes use of the new indexing and batching approaches and uses them in the restore tables codebase. --- pkg/service/restore/model.go | 4 - pkg/service/restore/service.go | 8 +- .../service_restore_integration_test.go | 11 +- pkg/service/restore/tables_worker.go | 129 +++-- pkg/service/restore/tablesdir_worker.go | 505 +++--------------- pkg/service/restore/worker.go | 5 + 6 files changed, 166 insertions(+), 496 deletions(-) diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 55a84b736..c4d4a26b8 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -214,10 +214,6 @@ func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*Run return iter.Close() } -func (pr *RunProgress) idCnt() int64 { - return int64(len(pr.SSTableID)) -} - func (pr *RunProgress) setRestoreStartedAt() { t := timeutc.Now() pr.RestoreStartedAt = &t diff --git a/pkg/service/restore/service.go b/pkg/service/restore/service.go index 523bf01f2..dfe1fbc7a 100644 --- a/pkg/service/restore/service.go +++ b/pkg/service/restore/service.go @@ -115,8 +115,12 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI for _, unit := range w.run.Units { totalBytesToRestore += unit.Size } - tw := newTablesWorker(w, s.repairSvc, totalBytesToRestore) - err = tw.restore(ctx) + tw, workerErr := newTablesWorker(w, s.repairSvc, totalBytesToRestore) + if workerErr != nil { + err = workerErr + } else { + err = tw.restore(ctx) + } } else { sw := &schemaWorker{worker: w} err = sw.restore(ctx) diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index a2e784c94..5ff1d7c86 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -852,15 +852,6 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo t.Fatalf("Expected context error but got: %+v", err) } - pr, err := dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) - if err != nil { - t.Fatal(err) - } - Printf("And: restore progress: %+#v\n", pr) - if pr.Downloaded == 0 { - t.Fatal("Expected partial restore progress") - } - Print("When: resume restore and stop in during repair") dstH.RunID = uuid.MustRandom() err = dstH.service.Restore(ctx2, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)) @@ -872,7 +863,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo t.Fatalf("Expected context error but got: %+v", err) } - pr, err = dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) + pr, err := dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) if err != nil { t.Fatal(err) } diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index fbdf3d188..5d77c52bb 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -9,17 +9,21 @@ import ( "sync" "github.com/pkg/errors" + "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/metrics" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" + "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" + "github.com/scylladb/scylla-manager/v3/pkg/util/query" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) type tablesWorker struct { worker - repairSvc *repair.Service - progress *TotalRestoreProgress + tableVersion map[TableName]string + repairSvc *repair.Service + progress *TotalRestoreProgress } // TotalRestoreProgress is a struct that holds information about the total progress of the restore job. @@ -61,12 +65,27 @@ func (p *TotalRestoreProgress) Update(bytesRestored int64) { p.restoredBytes += bytesRestored } -func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) *tablesWorker { - return &tablesWorker{ - worker: w, - repairSvc: repairSvc, - progress: NewTotalRestoreProgress(totalBytes), +func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*tablesWorker, error) { + versions := make(map[TableName]string) + for _, u := range w.run.Units { + for _, t := range u.Tables { + v, err := query.GetTableVersion(w.clusterSession, u.Keyspace, t.Table) + if err != nil { + return nil, errors.Wrapf(err, "get %s.%s version", u.Keyspace, t.Table) + } + versions[TableName{ + Keyspace: u.Keyspace, + Table: t.Table, + }] = v + } } + + return &tablesWorker{ + worker: w, + tableVersion: versions, + repairSvc: repairSvc, + progress: NewTotalRestoreProgress(totalBytes), + }, nil } // restore files from every location specified in restore target. @@ -149,66 +168,60 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { w.logger.Info(ctx, "Started restoring tables") defer w.logger.Info(ctx, "Restoring tables finished") - // Restore locations in deterministic order - for _, l := range w.target.Location { - if err := w.restoreLocation(ctx, l); err != nil { - return err - } + workload, err := w.IndexWorkload(ctx, w.target.Location) + if err != nil { + return err } - return nil -} - -func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) error { - w.logger.Info(ctx, "Restoring location", "location", location) - defer w.logger.Info(ctx, "Restoring location finished", "location", location) + w.initMetrics(workload) - restoreManifest := func(miwc ManifestInfoWithContent) error { - w.logger.Info(ctx, "Restoring manifest", "manifest", miwc.ManifestInfo) - defer w.logger.Info(ctx, "Restoring manifest finished", "manifest", miwc.ManifestInfo) - - return miwc.ForEachIndexIterWithError(nil, w.restoreDir(ctx, miwc)) + bd := newBatchDispatcher(workload, w.target.BatchSize, w.target.locationHosts) + hostsS := strset.New() + for _, h := range w.target.locationHosts { + hostsS.Add(h...) } + hosts := hostsS.List() + + f := func(n int) (err error) { + h := hosts[n] + for { + // Download and stream in parallel + b, ok := bd.DispatchBatch(h) + if !ok { + w.logger.Info(ctx, "No more batches to restore", "host", h) + return nil + } + w.metrics.IncreaseBatchSize(w.run.ClusterID, h, b.Size) + w.logger.Info(ctx, "Got batch to restore", + "host", h, + "keyspace", b.Keyspace, + "table", b.Table, + "size", b.Size, + "sstable count", len(b.SSTables), + ) - return w.forEachManifest(ctx, location, restoreManifest) -} - -func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithContent) func(fm FilesMeta) error { - return func(fm FilesMeta) error { - if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { - return nil - } - - w.logger.Info(ctx, "Restoring table", "keyspace", fm.Keyspace, "table", fm.Table) - defer w.logger.Info(ctx, "Restoring table finished", "keyspace", fm.Keyspace, "table", fm.Table) - w.insertRun(ctx) - - dw, err := newTablesDirWorker(ctx, w.worker, miwc, fm, w.progress) - if err != nil { - return errors.Wrap(err, "create dir worker") - } - if err := dw.resumePrevProgress(); err != nil { - return errors.Wrap(err, "resume prev run progress") - } - - if err := dw.restore(ctx); err != nil { - if ctx.Err() != nil { - return ctx.Err() + pr, err := w.newRunProgress(ctx, h, b) + if err != nil { + return errors.Wrap(err, "create new run progress") } - // In case all SSTables have been restored, restore can proceed even - // with errors from some hosts. - if len(dw.bundleIDPool) > 0 { - return errors.Wrapf(err, "not restored bundles %v", dw.bundleIDPool.drain()) + if err := w.restoreBatch(ctx, b, pr); err != nil { + return errors.Wrap(err, "restore batch") } - - w.logger.Error(ctx, "Restore table failed on some hosts but restore will proceed", - "keyspace", fm.Keyspace, - "table", fm.Table, - "error", err, - ) + w.decreaseRemainingBytesMetric(b) } + } + + notify := func(n int, err error) { + w.logger.Error(ctx, "Failed to restore files on host", + "host", hosts[n], + "error", err, + ) + } - return nil + err = parallel.Run(len(hosts), w.target.Parallel, f, notify) + if err == nil { + return bd.ValidateAllDispatched() } + return err } func (w *tablesWorker) stageRepair(ctx context.Context) error { diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index 5e5bab453..990efc9ad 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -8,177 +8,43 @@ import ( "time" "github.com/pkg/errors" - "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/metrics" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" - "github.com/scylladb/scylla-manager/v3/pkg/sstable" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" - "github.com/scylladb/scylla-manager/v3/pkg/util/query" - "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" - "go.uber.org/atomic" ) -// tablesDirWorker is responsible for restoring table files from manifest in parallel. -type tablesDirWorker struct { - worker - - bundles map[string]bundle // Maps bundle to it's ID - bundleIDPool idPool // SSTable IDs yet to be restored - - dstDir string // "data:" prefixed path to upload dir (common for every host) - srcDir string // Full path to remote directory with backed-up files - miwc ManifestInfoWithContent // Manifest containing fm - fm FilesMeta // Describes table and it's files located in srcDir - - ongoingPr []*RunProgress // Unfinished RunProgress from previous run of each host - - // Maps original SSTable name to its existing older version - // (with respect to currently restored snapshot tag) - // that should be used during the restore procedure. - versionedFiles VersionedMap - progress *TotalRestoreProgress -} - -func newTablesDirWorker(ctx context.Context, w worker, miwc ManifestInfoWithContent, fm FilesMeta, progress *TotalRestoreProgress) (tablesDirWorker, error) { - bundles := newBundles(fm) - bundleIDPool := newIDPool(bundles) - - version, err := query.GetTableVersion(w.clusterSession, fm.Keyspace, fm.Table) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "get table version") - } - - srcDir := miwc.LocationSSTableVersionDir(fm.Keyspace, fm.Table, fm.Version) - dstDir := UploadTableDir(fm.Keyspace, fm.Table, version) - w.logger.Info(ctx, "Found table's src and dst dirs", - "keyspace", fm.Keyspace, - "table", fm.Table, - "src_dir", srcDir, - "dst_dir", dstDir, - ) - - hosts := w.target.locationHosts[miwc.Location] - versionedFiles, err := ListVersionedFiles(ctx, w.client, w.run.SnapshotTag, hosts[0], srcDir) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "initialize versioned SSTables") - } - - return tablesDirWorker{ - worker: w, - bundles: bundles, - bundleIDPool: bundleIDPool, - dstDir: dstDir, - srcDir: srcDir, - miwc: miwc, - fm: fm, - ongoingPr: make([]*RunProgress, len(hosts)), - versionedFiles: versionedFiles, - progress: progress, - }, nil -} - -// restore SSTables of receivers manifest/table in parallel. -func (w *tablesDirWorker) restore(ctx context.Context) error { - // Count of SSTable IDs yet to be successfully restored - ctr := atomic.NewInt64(w.bundleIDPool.size()) - for _, pr := range w.ongoingPr { - if pr != nil { - ctr.Add(pr.idCnt()) - } - } - - if ctr.Load() == 0 { - w.logger.Info(ctx, "Table does not have any more SSTables to restore", - "keyspace", w.fm.Keyspace, - "table", w.fm.Table, - ) - return nil - } - - hosts := w.target.locationHosts[w.miwc.Location] - f := func(n int) (err error) { - h := hosts[n] - ongoingPr := w.ongoingPr[n] - - // First handle ongoing restore - if ongoingPr != nil { - if err := w.reactivateRunProgress(ctx, ongoingPr); err != nil { - return errors.Wrap(err, "reactivate run progress") - } - - if err := w.restoreBatch(ctx, ongoingPr); err != nil { - return errors.Wrap(err, "restore reactivated batch") - } - - if ctr.Sub(ongoingPr.idCnt()) <= 0 { - close(w.bundleIDPool) - } - } - - for { - pr, err := w.newRunProgress(ctx, h) - if err != nil { - return errors.Wrap(err, "create run progress") - } - if pr == nil { - w.logger.Info(ctx, "No more batches to restore", "host", h) - return nil - } - - if err := w.restoreBatch(ctx, pr); err != nil { - return errors.Wrap(err, "restore batch") - } - - if ctr.Sub(pr.idCnt()) <= 0 { - close(w.bundleIDPool) - } - } - } - - notify := func(n int, err error) { - w.logger.Error(ctx, "Failed to restore files on host", - "host", hosts[n], - "error", err, - ) - } - - return parallel.Run(len(hosts), w.target.Parallel, f, notify) -} - -func (w *tablesDirWorker) restoreBatch(ctx context.Context, pr *RunProgress) (err error) { +func (w *tablesWorker) restoreBatch(ctx context.Context, b batch, pr *RunProgress) (err error) { defer func() { // Run cleanup on non-pause error - if err != nil && ctx.Err() == nil { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateError) + if err != nil { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateError) w.cleanupRunProgress(context.Background(), pr) } else { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateIdle) + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateIdle) } }() // Download has already been started on RunProgress creation. // Skip steps already done in the previous run. if !validateTimeIsSet(pr.DownloadCompletedAt) { - if err := w.waitJob(ctx, pr); err != nil { + if err := w.waitJob(ctx, b, pr); err != nil { return errors.Wrap(err, "wait for job") } } if !validateTimeIsSet(pr.RestoreCompletedAt) { - if err := w.restoreSSTables(ctx, pr); err != nil { + if err := w.restoreSSTables(ctx, b, pr); err != nil { return errors.Wrap(err, "call load and stream") } } - - w.decreaseRemainingBytesMetric(pr.Downloaded + pr.Skipped + pr.VersionedProgress) w.logger.Info(ctx, "Restored batch", "host", pr.Host, "sstable_id", pr.SSTableID) return nil } // waitJob waits for rclone job to finish while updating its progress. -func (w *tablesDirWorker) waitJob(ctx context.Context, pr *RunProgress) (err error) { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.miwc.SnapshotTag, pr.Host, metrics.RestoreStateDownloading) +func (w *tablesWorker) waitJob(ctx context.Context, b batch, pr *RunProgress) (err error) { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateDownloading) w.logger.Info(ctx, "Waiting for job", "host", pr.Host, "job_id", pr.AgentJobID) defer func() { @@ -216,7 +82,7 @@ func (w *tablesDirWorker) waitJob(ctx context.Context, pr *RunProgress) (err err } } -func (w *tablesDirWorker) updateDownloadProgress(ctx context.Context, pr *RunProgress, job *scyllaclient.RcloneJobProgress) { +func (w *tablesWorker) updateDownloadProgress(ctx context.Context, pr *RunProgress, job *scyllaclient.RcloneJobProgress) { // Set StartedAt and CompletedAt based on Job if t := time.Time(job.StartedAt); !t.IsZero() { pr.DownloadStartedAt = &t @@ -232,8 +98,8 @@ func (w *tablesDirWorker) updateDownloadProgress(ctx context.Context, pr *RunPro w.insertRunProgress(ctx, pr) } -func (w *tablesDirWorker) restoreSSTables(ctx context.Context, pr *RunProgress) error { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading) +func (w *tablesWorker) restoreSSTables(ctx context.Context, b batch, pr *RunProgress) error { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading) if !validateTimeIsSet(pr.RestoreStartedAt) { pr.setRestoreStartedAt() w.insertRunProgress(ctx, pr) @@ -247,109 +113,19 @@ func (w *tablesDirWorker) restoreSSTables(ctx context.Context, pr *RunProgress) return err } -func (w *tablesDirWorker) resumePrevProgress() error { - bind := &RunProgress{ - ClusterID: w.run.ClusterID, - TaskID: w.run.TaskID, - RunID: w.run.ID, - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, - } - - // All bundles IDs started in the previous run - startedID := strset.New() - // All unfinished RunProgress started in the previous run - ongoingPr := make(map[string]*RunProgress) - err := bind.ForEachTableProgress(w.session, func(pr *RunProgress) { - startedID.Add(pr.SSTableID...) - if !validateTimeIsSet(pr.RestoreCompletedAt) { - cp := *pr - ongoingPr[pr.Host] = &cp - } - }) - if err != nil { - return err - } - - // Remove already started ID from the pool - ids := w.bundleIDPool.drain() - for _, id := range ids { - if !startedID.Has(id) { - w.bundleIDPool <- id - } - } - - hosts := w.target.locationHosts[w.miwc.Location] - // Set ongoing RunProgress so that they can be resumed - for i, h := range hosts { - w.ongoingPr[i] = ongoingPr[h] - } - return nil -} - -// reactivateRunProgress preserves batch assembled in the previous run and tries to reuse its unfinished rclone job. -func (w *tablesDirWorker) reactivateRunProgress(ctx context.Context, pr *RunProgress) error { - // Nothing to do if download has already finished - if validateTimeIsSet(pr.DownloadCompletedAt) { - return nil - } - - job, err := w.client.RcloneJobProgress(ctx, pr.Host, pr.AgentJobID, w.config.LongPollingTimeoutSeconds) - if err != nil { - return errors.Wrapf(err, "get progress of rclone job %d", pr.AgentJobID) - } - // Nothing to do if rclone job is still running - if scyllaclient.WorthWaitingForJob(job.Status) { - return nil - } - - // Recreate rclone job - batch := w.batchFromIDs(pr.SSTableID) - if err := w.cleanUploadDir(ctx, pr.Host, w.dstDir, batch); err != nil { - return errors.Wrapf(err, "clean upload dir of host %s", pr.Host) - } - - jobID, versionedPr, err := w.startDownload(ctx, pr.Host, batch) - if err != nil { - w.deleteRunProgress(ctx, pr) - w.returnBatchToPool(pr.SSTableID, pr.Host) - return err - } - - pr.AgentJobID = jobID - pr.VersionedProgress = versionedPr - w.insertRunProgress(ctx, pr) - return nil -} - -// newRunProgress creates RunProgress by assembling batch and starting download to host's upload dir. -func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*RunProgress, error) { +// newRunProgress creates RunProgress by starting download to host's upload dir. +func (w *tablesWorker) newRunProgress(ctx context.Context, host string, b batch) (*RunProgress, error) { if err := w.checkAvailableDiskSpace(ctx, host); err != nil { return nil, errors.Wrap(err, "validate free disk space") } - takenIDs := w.chooseIDsForBatch(ctx, w.target.BatchSize, host) - if ctx.Err() != nil { - w.returnBatchToPool(takenIDs, host) - return nil, ctx.Err() - } - if takenIDs == nil { - return nil, nil //nolint: nilnil - } - - w.logger.Info(ctx, "Created new batch", - "host", host, - "sstable_id", takenIDs, - ) - - batch := w.batchFromIDs(takenIDs) - if err := w.cleanUploadDir(ctx, host, w.dstDir, nil); err != nil { + uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName]) + if err := w.cleanUploadDir(ctx, host, uploadDir, nil); err != nil { return nil, errors.Wrapf(err, "clean upload dir of host %s", host) } - jobID, versionedPr, err := w.startDownload(ctx, host, batch) + jobID, versionedPr, err := w.startDownload(ctx, host, b) if err != nil { - w.returnBatchToPool(takenIDs, host) return nil, err } @@ -357,11 +133,12 @@ func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*Run ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, + RemoteSSTableDir: b.RemoteSSTableDir, + Keyspace: b.Keyspace, + Table: b.Table, Host: host, AgentJobID: jobID, - SSTableID: takenIDs, + SSTableID: b.IDs(), VersionedProgress: versionedPr, } @@ -373,135 +150,86 @@ func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*Run // Downloading of versioned files happens first in a synchronous way. // It returns jobID for asynchronous download of the newest versions of files // alongside with the size of the already downloaded versioned files. -func (w *tablesDirWorker) startDownload(ctx context.Context, host string, batch []string) (jobID, versionedPr int64, err error) { - var ( - regularBatch = make([]string, 0) - versionedBatch = make([]VersionedSSTable, 0) - ) - // Decide which files require to be downloaded in their older version - for _, file := range batch { - if v := w.versionedFiles[file]; v.Version != "" { - versionedBatch = append(versionedBatch, v) - versionedPr += v.Size - } else { - regularBatch = append(regularBatch, file) +func (w *tablesWorker) startDownload(ctx context.Context, host string, b batch) (jobID, versionedPr int64, err error) { + uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName]) + sstables := b.NotVersionedSSTables() + versioned := b.VersionedSSTables() + versionedSize := b.VersionedSize() + if len(versioned) > 0 { + if err := w.downloadVersioned(ctx, host, b.RemoteSSTableDir, uploadDir, versioned); err != nil { + return 0, 0, errors.Wrapf(err, "download versioned sstabled on host %s", host) } } - // Downloading versioned files requires us to rename them (strip version extension) - // and function RcloneCopyPaths lacks this option. In order to achieve that, we copy - // all versioned files one by one with RcloneCopyFile (which supports renaming files). - // The assumption is that the existence of versioned files is low and that they - // are rather small, so we can do it in a synchronous way. - // Copying files can be done in full parallel because of rclone ability to limit transfers. - f := func(i int) error { - file := versionedBatch[i] - // Restore file without its version extension - dst := path.Join(w.dstDir, file.Name) - src := path.Join(w.srcDir, file.FullName()) - - if err := w.client.RcloneCopyFile(ctx, host, dst, src); err != nil { - return parallel.Abort(errors.Wrapf(err, "host %s: download versioned file %s into %s", host, src, dst)) - } - - w.logger.Info(ctx, "Downloaded versioned file", - "host", host, - "src", src, - "dst", dst, - "size", file.Size, - ) - - return nil - } - notify := func(i int, err error) { - file := versionedBatch[i] - dst := path.Join(w.dstDir, file.Name) - src := path.Join(w.srcDir, file.FullName()) - w.logger.Error(ctx, "Failed to download versioned SSTable", - "file", file, - "dst", dst, - "src", src, - "error", err, - ) - } - - if err := parallel.Run(len(versionedBatch), parallel.NoLimit, f, notify); err != nil { - return 0, 0, err - } // Start asynchronous job for downloading the newest versions of remaining files - jobID, err = w.client.RcloneCopyPaths(ctx, host, w.dstDir, w.srcDir, regularBatch) + files := make([]string, 0) + for _, sst := range sstables { + files = append(files, sst.Files...) + } + jobID, err = w.client.RcloneCopyPaths(ctx, host, uploadDir, b.RemoteSSTableDir, files) if err != nil { return 0, 0, errors.Wrap(err, "download batch to upload dir") } - w.logger.Info(ctx, "Started downloading files", "host", host, "job_id", jobID, - "batch", regularBatch, ) - - return jobID, versionedPr, nil + return jobID, versionedSize, nil } -// chooseIDsForBatch returns slice of IDs of SSTables that the batch consists of. -func (w *tablesDirWorker) chooseIDsForBatch(ctx context.Context, size int, host string) (takenIDs []string) { - defer func() { - w.increaseBatchSizeMetric(w.run.ClusterID, w.batchFromIDs(takenIDs), host) - }() - - // All hosts are trying to get IDs for batch from the pool. - // Pool is closed after the whole table has been restored. - - // Take at most size IDs - for i := 0; i < size; i++ { - select { - case <-ctx.Done(): - return takenIDs - default: - } - - select { - case id, ok := <-w.bundleIDPool: - if !ok { - return takenIDs - } - takenIDs = append(takenIDs, id) - default: - // Don't wait for more IDs if the pool is empty - // and host already has something to restore. - if len(takenIDs) > 0 { - return takenIDs - } - // Here host hasn't taken any IDs and pool is empty, - // so it waits for the whole table to be restored or - // for IDs that might return to the pool in case of error on the other hosts. - select { - case id, ok := <-w.bundleIDPool: - if !ok { - return takenIDs - } - takenIDs = append(takenIDs, id) - case <-ctx.Done(): - return takenIDs +// Downloading versioned files requires us to rename them (strip version extension) +// and function RcloneCopyPaths lacks this option. In order to achieve that, we copy +// all versioned files one by one with RcloneCopyFile (which supports renaming files). +// The assumption is that the existence of versioned files is low and that they +// are rather small, so we can do it in a synchronous way. +// Copying files can be done in full parallel because of rclone ability to limit transfers. +func (w *tablesWorker) downloadVersioned(ctx context.Context, host, srcDir, dstDir string, versioned []RemoteSSTable) error { + f := func(i int) error { + sst := versioned[i] + for _, file := range sst.Files { + name, _ := SplitNameAndVersion(file) + // Restore file without its version extension + dst := path.Join(dstDir, name) + src := path.Join(srcDir, file) + if err := w.client.RcloneCopyFile(ctx, host, dst, src); err != nil { + return parallel.Abort(errors.Wrapf(err, "host %s: download versioned file %s into %s", host, src, dst)) } } + w.logger.Info(ctx, "Downloaded versioned sstable", + "host", host, + "sstable ID", sst.ID, + "src dir", srcDir, + "dst dir", dstDir, + ) + return nil + } + + notify := func(i int, err error) { + sst := versioned[i] + w.logger.Error(ctx, "Failed to download versioned sstable", + "host", host, + "sstable ID", sst.ID, + "src dir", srcDir, + "dst dir", dstDir, + "error", err, + ) } - return takenIDs + return parallel.Run(len(versioned), parallel.NoLimit, f, notify) } -func (w *tablesDirWorker) decreaseRemainingBytesMetric(bytes int64) { +func (w *tablesWorker) decreaseRemainingBytesMetric(b batch) { labels := metrics.RestoreBytesLabels{ ClusterID: w.run.ClusterID.String(), SnapshotTag: w.run.SnapshotTag, - Location: w.miwc.Location.String(), - DC: w.miwc.DC, - Node: w.miwc.NodeID, - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, + Location: b.Location.String(), + DC: b.DC, + Node: b.NodeID, + Keyspace: b.Keyspace, + Table: b.Table, } - w.metrics.DecreaseRemainingBytes(labels, bytes) - w.progress.Update(bytes) + w.metrics.DecreaseRemainingBytes(labels, b.Size) + w.progress.Update(b.Size) progressLabels := metrics.RestoreProgressLabels{ ClusterID: w.run.ClusterID.String(), @@ -510,80 +238,13 @@ func (w *tablesDirWorker) decreaseRemainingBytesMetric(bytes int64) { w.metrics.SetProgress(progressLabels, w.progress.CurrentProgress()) } -func (w *tablesDirWorker) increaseBatchSizeMetric(clusterID uuid.UUID, batch []string, host string) { - w.metrics.IncreaseBatchSize(clusterID, host, w.countBatchSize(batch)) -} - -func (w *tablesDirWorker) decreaseBatchSizeMetric(clusterID uuid.UUID, batch []string, host string) { - w.metrics.DecreaseBatchSize(clusterID, host, w.countBatchSize(batch)) -} - -func (w *tablesDirWorker) cleanupRunProgress(ctx context.Context, pr *RunProgress) { +func (w *tablesWorker) cleanupRunProgress(ctx context.Context, pr *RunProgress) { w.deleteRunProgress(ctx, pr) - w.returnBatchToPool(pr.SSTableID, pr.Host) - - if cleanErr := w.cleanUploadDir(ctx, pr.Host, w.dstDir, nil); cleanErr != nil { - w.logger.Error(ctx, "Couldn't clear destination directory", "host", pr.Host, "error", cleanErr) - } -} - -func (w *tablesDirWorker) returnBatchToPool(ids []string, host string) { - w.decreaseBatchSizeMetric(w.run.ClusterID, w.batchFromIDs(ids), host) - for _, id := range ids { - w.bundleIDPool <- id - } -} - -func (w *tablesDirWorker) batchFromIDs(ids []string) []string { - var batch []string - for _, id := range ids { - batch = append(batch, w.bundles[id]...) - } - return batch -} - -func (w *tablesDirWorker) countBatchSize(batch []string) int64 { - var batchSize int64 - for _, file := range batch { - batchSize += w.versionedFiles[file].Size - } - return batchSize -} - -// bundle represents SSTables with the same ID. -type bundle []string - -func newBundles(fm FilesMeta) map[string]bundle { - bundles := make(map[string]bundle) - for _, f := range fm.Files { - id, err := sstable.ExtractID(f) - if err != nil { - panic(err) - } - bundles[id] = append(bundles[id], f) + tn := TableName{ + Keyspace: pr.Keyspace, + Table: pr.Table, } - return bundles -} - -// idPool represents pool of SSTableIDs yet to be restored. -type idPool chan string - -func (p idPool) drain() []string { - var out []string - for len(p) > 0 { - out = append(out, <-p) - } - return out -} - -func (p idPool) size() int64 { - return int64(len(p)) -} - -func newIDPool(bundles map[string]bundle) chan string { - bundleIDPool := make(chan string, len(bundles)) - for id := range bundles { - bundleIDPool <- id + if cleanErr := w.cleanUploadDir(ctx, pr.Host, UploadTableDir(pr.Keyspace, pr.Table, w.tableVersion[tn]), nil); cleanErr != nil { + w.logger.Error(ctx, "Couldn't clear destination directory", "host", pr.Host, "error", cleanErr) } - return bundleIDPool } diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 8082bf7ff..8984a934a 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -689,6 +689,11 @@ func (w *worker) clonePrevProgress(ctx context.Context) { defer q.Release() err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.PrevID, func(pr *RunProgress) { + // We don't support interrupted run progresses resume, + // so only finished run progresses should be copied. + if !validateTimeIsSet(pr.RestoreCompletedAt) { + return + } pr.RunID = w.run.ID if err := q.BindStruct(pr).Exec(); err != nil { w.logger.Error(ctx, "Couldn't clone run progress", From ad8a33c4ba2c105c5fa2cc9df708bcfb0bc0fb44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Thu, 26 Sep 2024 14:37:37 +0200 Subject: [PATCH 09/10] fix(restore): handle download of fully versioned batch Recent commits changed versioned batch download so that if any sstable component is versioned, then all sstable components are downloaded as versioned files. It was done in that way to allow easier versioned progress calculation (we don't store per file size, only the whole sstable size). This brought to light a bug (that existed before, but was more difficult to hit), in which restoring batch failed when the whole batch was versioned, as calling RcloneSyncCopyPaths on empty paths parameter resulted in broken download. We could just skip the RcloneSyncCopyPaths call when the whole batch is versioned, but this would leave us without the agentJobID which is a part of sort key in RestoreRunProgress. Without it, we could potentially overwrite one restore run progress with another - if both of them happened on the same RemoteSSTableDir, by the same Host, and were fully versioned. It would also introduce a different path for restoring regular batch and fully versioned batch, which is not desirable. That's why I decided to modify rclone server to allow empty path parameter, so that it still generates agentJobID, but it doesn't do anything except for that. --- pkg/rclone/rcserver/rc.go | 4 +++- pkg/scyllaclient/client_rclone.go | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/rclone/rcserver/rc.go b/pkg/rclone/rcserver/rc.go index 639d0848e..0e0537d07 100644 --- a/pkg/rclone/rcserver/rc.go +++ b/pkg/rclone/rcserver/rc.go @@ -554,7 +554,9 @@ func rcCopyPaths() func(ctx context.Context, in rc.Params) (rc.Params, error) { if err != nil { return nil, err } - + if len(paths) == 0 { + return nil, nil + } return nil, sync.CopyPaths(ctx, dstFs, dstRemote, srcFs, srcRemote, paths, false) } } diff --git a/pkg/scyllaclient/client_rclone.go b/pkg/scyllaclient/client_rclone.go index 4db3e2782..b4f415353 100644 --- a/pkg/scyllaclient/client_rclone.go +++ b/pkg/scyllaclient/client_rclone.go @@ -280,6 +280,9 @@ func (c *Client) RcloneCopyPaths(ctx context.Context, host, dstRemoteDir, srcRem if err != nil { return 0, err } + if paths == nil { + paths = make([]string, 0) + } p := operations.SyncCopyPathsParams{ Context: forceHost(ctx, host), From 503bd5c5c57dd99701fae3e71aa90ba0f47d5f31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 2 Oct 2024 20:24:36 +0200 Subject: [PATCH 10/10] feat(restore): index, log workload info Workload info contains location/table/remote sstable dir sstable count, total size, max and average sstable size. --- pkg/service/restore/index.go | 69 +++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index 140cfdf80..dd7b7b72c 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -74,12 +74,14 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads") } if w.target.Continue { - rawWorkload, err = w.filterPreviouslyRestoredSStables(rawWorkload) + rawWorkload, err = w.filterPreviouslyRestoredSStables(ctx, rawWorkload) if err != nil { return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables") } } - return aggregateLocationWorkload(rawWorkload), nil + workload := aggregateLocationWorkload(rawWorkload) + w.logWorkloadInfo(ctx, workload) + return workload, nil } func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { @@ -115,7 +117,9 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo Size: size, SSTables: remoteSSTables, } - rawWorkload = append(rawWorkload, workload) + if size > 0 { + rawWorkload = append(rawWorkload, workload) + } return nil }) }) @@ -125,7 +129,9 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo return rawWorkload, nil } -func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) { +func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) { + w.logger.Info(ctx, "Filter out previously restored sstables") + remoteSSTableDirToRestoredIDs := make(map[string][]string) err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) { if validateTimeIsSet(pr.RestoreCompletedAt) { @@ -139,7 +145,11 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW return rawWorkload, nil } - var filtered []RemoteDirWorkload + var ( + filtered []RemoteDirWorkload + skippedCount int + skippedSize int64 + ) for _, rw := range rawWorkload { var filteredSSTables []RemoteSSTable var size int64 @@ -147,6 +157,9 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW if !slices.Contains(remoteSSTableDirToRestoredIDs[rw.RemoteSSTableDir], sst.ID) { filteredSSTables = append(filteredSSTables, sst) size += sst.Size + } else { + skippedCount++ + skippedSize += sst.Size } } if len(filteredSSTables) > 0 { @@ -157,9 +170,12 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW Size: size, SSTables: filteredSSTables, }) + } else { + w.logger.Info(ctx, "Completely filtered out remote sstable dir", "remote dir", rw.RemoteSSTableDir) } } + w.logger.Info(ctx, "Filtered out sstables info", "count", skippedCount, "size", skippedSize) return filtered, nil } @@ -200,6 +216,49 @@ func (w *tablesWorker) initMetrics(workload []LocationWorkload) { }, float64(totalSize-workloadSize)/float64(totalSize)*100) } +func (w *tablesWorker) logWorkloadInfo(ctx context.Context, workload LocationWorkload) { + if workload.Size == 0 { + return + } + var locMax, locCnt int64 + for _, twl := range workload.Tables { + if twl.Size == 0 { + continue + } + var tabMax, tabCnt int64 + for _, rdwl := range twl.RemoteDirs { + if rdwl.Size == 0 { + continue + } + var dirMax int64 + for _, sst := range rdwl.SSTables { + dirMax = max(dirMax, sst.Size) + } + dirCnt := int64(len(rdwl.SSTables)) + w.logger.Info(ctx, "Remote sstable dir workload info", + "path", rdwl.RemoteSSTableDir, + "max size", dirMax, + "average size", rdwl.Size/dirCnt, + "count", dirCnt) + tabCnt += dirCnt + tabMax = max(tabMax, dirMax) + } + w.logger.Info(ctx, "Table workload info", + "keyspace", twl.Keyspace, + "table", twl.Table, + "max size", tabMax, + "average size", twl.Size/tabCnt, + "count", tabCnt) + locCnt += tabCnt + locMax = max(locMax, tabMax) + } + w.logger.Info(ctx, "Location workload info", + "location", workload.Location.String(), + "max size", locMax, + "average size", workload.Size/locCnt, + "count", locCnt) +} + func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload { remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload) for _, rw := range rawWorkload {