diff --git a/pkg/rclone/rcserver/rc.go b/pkg/rclone/rcserver/rc.go index 639d0848e..0e0537d07 100644 --- a/pkg/rclone/rcserver/rc.go +++ b/pkg/rclone/rcserver/rc.go @@ -554,7 +554,9 @@ func rcCopyPaths() func(ctx context.Context, in rc.Params) (rc.Params, error) { if err != nil { return nil, err } - + if len(paths) == 0 { + return nil, nil + } return nil, sync.CopyPaths(ctx, dstFs, dstRemote, srcFs, srcRemote, paths, false) } } diff --git a/pkg/schema/table/table.go b/pkg/schema/table/table.go index 1e5bf6351..5dcbeefcc 100644 --- a/pkg/schema/table/table.go +++ b/pkg/schema/table/table.go @@ -188,14 +188,10 @@ var ( Columns: []string{ "cluster_id", "id", - "keyspace_name", - "location", - "manifest_path", "prev_id", "repair_task_id", "snapshot_tag", "stage", - "table_name", "task_id", "units", "views", diff --git a/pkg/scyllaclient/client_rclone.go b/pkg/scyllaclient/client_rclone.go index 4db3e2782..b4f415353 100644 --- a/pkg/scyllaclient/client_rclone.go +++ b/pkg/scyllaclient/client_rclone.go @@ -280,6 +280,9 @@ func (c *Client) RcloneCopyPaths(ctx context.Context, host, dstRemoteDir, srcRem if err != nil { return 0, err } + if paths == nil { + paths = make([]string, 0) + } p := operations.SyncCopyPathsParams{ Context: forceHost(ctx, host), diff --git a/pkg/service/backup/backupspec/versioning.go b/pkg/service/backup/backupspec/versioning.go index 103502d13..af3320394 100644 --- a/pkg/service/backup/backupspec/versioning.go +++ b/pkg/service/backup/backupspec/versioning.go @@ -21,13 +21,18 @@ import ( // (e.g. older version of 'md-2-big-Data.db' could be 'md-2-big-Data.db.sm_20230114183231UTC') // Note, that the newest version of SSTable does not have snapshot tag extension. type VersionedSSTable struct { - Name string // Original SSTable name (e.g. md-2-big-Data.db) - Version string // Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC) + Name string // Original SSTable name (e.g. md-2-big-Data.db) + // Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC). + // An empty Version denotes the newest, not versioned SSTable, i.e. the file without the snapshot tag suffix. + Version string Size int64 } // FullName returns versioned file name. func (vt VersionedSSTable) FullName() string { + if vt.Version == "" { + return vt.Name + } return vt.Name + "." + vt.Version } @@ -66,6 +71,9 @@ func IsVersionedFileRemovable(oldest time.Time, versioned string) (bool, error) // SplitNameAndVersion splits versioned file name into its original name and its version. 
func SplitNameAndVersion(versioned string) (name, version string) { versionExt := path.Ext(versioned) + if versionExt == "" || !IsSnapshotTag(versionExt[1:]) { + return versioned, "" + } baseName := strings.TrimSuffix(versioned, versionExt) return baseName, versionExt[1:] } @@ -74,11 +82,7 @@ func SplitNameAndVersion(versioned string) (name, version string) { func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapshotTag, host, dir string) (VersionedMap, error) { versionedFiles := make(VersionedMap) allVersions := make(map[string][]VersionedSSTable) - - opts := &scyllaclient.RcloneListDirOpts{ - FilesOnly: true, - VersionedOnly: true, - } + opts := &scyllaclient.RcloneListDirOpts{FilesOnly: true} f := func(item *scyllaclient.RcloneListDirItem) { name, version := SplitNameAndVersion(item.Name) allVersions[name] = append(allVersions[name], VersionedSSTable{ @@ -87,7 +91,6 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh Size: item.Size, }) } - if err := client.RcloneListDirIter(ctx, host, dir, opts, f); err != nil { return nil, errors.Wrapf(err, "host %s: listing versioned files", host) } @@ -97,9 +100,17 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh return nil, err } // Chose correct version with respect to currently restored snapshot tag - for _, versions := range allVersions { - var candidate VersionedSSTable + for name, versions := range allVersions { + var ( + candidate VersionedSSTable + newest VersionedSSTable + ) for _, v := range versions { + if v.Version == "" { + newest = v + continue + } + tagT, err := SnapshotTagTime(v.Version) if err != nil { return nil, err @@ -111,8 +122,10 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh } } - if candidate.Version != "" { - versionedFiles[candidate.Name] = candidate + if candidate.Version == "" { + versionedFiles[name] = newest + } else { + versionedFiles[name] = candidate } } diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go new file mode 100644 index 000000000..b432e653d --- /dev/null +++ b/pkg/service/restore/batch.go @@ -0,0 +1,175 @@ +// Copyright (C) 2024 ScyllaDB + +package restore + +import ( + "slices" + "sync" + + "github.com/pkg/errors" + . 
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" +) + +type batchDispatcher struct { + mu sync.Mutex + workload []LocationWorkload + batchSize int + locationHosts map[Location][]string +} + +func newBatchDispatcher(workload []LocationWorkload, batchSize int, locationHosts map[Location][]string) *batchDispatcher { + return &batchDispatcher{ + mu: sync.Mutex{}, + workload: workload, + batchSize: batchSize, + locationHosts: locationHosts, + } +} + +type batch struct { + TableName + *ManifestInfo + + RemoteSSTableDir string + Size int64 + SSTables []RemoteSSTable +} + +func (b batch) NotVersionedSSTables() []RemoteSSTable { + var ssts []RemoteSSTable + for _, sst := range b.SSTables { + if !sst.Versioned { + ssts = append(ssts, sst) + } + } + return ssts +} + +func (b batch) VersionedSSTables() []RemoteSSTable { + var ssts []RemoteSSTable + for _, sst := range b.SSTables { + if sst.Versioned { + ssts = append(ssts, sst) + } + } + return ssts +} + +func (b batch) VersionedSize() int64 { + var size int64 + for _, sst := range b.SSTables { + if sst.Versioned { + size += sst.Size + } + } + return size +} + +func (b batch) IDs() []string { + var ids []string + for _, sst := range b.SSTables { + ids = append(ids, sst.ID) + } + return ids +} + +// ValidateAllDispatched returns error if not all sstables were dispatched. +func (b *batchDispatcher) ValidateAllDispatched() error { + for _, lw := range b.workload { + if lw.Size != 0 { + for _, tw := range lw.Tables { + if tw.Size != 0 { + for _, dw := range tw.RemoteDirs { + if dw.Size != 0 || len(dw.SSTables) != 0 { + return errors.Errorf("expected all data to be restored, missing sstable ids from location %s table %s.%s: %v (%d bytes)", + dw.Location, dw.Keyspace, dw.Table, dw.SSTables, dw.Size) + } + } + return errors.Errorf("expected all data to be restored, missinng table from location %s: %s.%s (%d bytes)", + tw.Location, tw.Keyspace, tw.Table, tw.Size) + } + } + return errors.Errorf("expected all data to be restored, missinng location: %s (%d bytes)", + lw.Location, lw.Size) + } + } + return nil +} + +// DispatchBatch batch to be restored or false when there is no more work to do. +func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) { + b.mu.Lock() + defer b.mu.Unlock() + + l := b.chooseLocation(host) + if l == nil { + return batch{}, false + } + t := b.chooseTable(l) + if t == nil { + return batch{}, false + } + dir := b.chooseRemoteDir(t) + if dir == nil { + return batch{}, false + } + out := b.createBatch(l, t, dir) + return out, true +} + +// Returns location for which batch should be created. +func (b *batchDispatcher) chooseLocation(host string) *LocationWorkload { + for i := range b.workload { + if b.workload[i].Size == 0 { + continue + } + if slices.Contains(b.locationHosts[b.workload[i].Location], host) { + return &b.workload[i] + } + } + return nil +} + +// Returns table for which batch should be created. +func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload { + for i := range location.Tables { + if location.Tables[i].Size == 0 { + continue + } + return &location.Tables[i] + } + return nil +} + +// Return remote dir for which batch should be created. +func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload { + for i := range table.RemoteDirs { + if table.RemoteDirs[i].Size == 0 { + continue + } + return &table.RemoteDirs[i] + } + return nil +} + +// Returns batch and updates RemoteDirWorkload and its parents. 
+func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload) batch { + i := min(b.batchSize, len(dir.SSTables)) + sstables := dir.SSTables[:i] + dir.SSTables = dir.SSTables[i:] + + var size int64 + for _, sst := range sstables { + size += sst.Size + } + dir.Size -= size + t.Size -= size + l.Size -= size + return batch{ + TableName: dir.TableName, + ManifestInfo: dir.ManifestInfo, + RemoteSSTableDir: dir.RemoteSSTableDir, + Size: size, + SSTables: sstables, + } +} diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go new file mode 100644 index 000000000..dd7b7b72c --- /dev/null +++ b/pkg/service/restore/index.go @@ -0,0 +1,339 @@ +// Copyright (C) 2024 ScyllaDB + +package restore + +import ( + "context" + "slices" + + "github.com/pkg/errors" + "github.com/scylladb/scylla-manager/v3/pkg/metrics" + . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/sstable" +) + +// LocationWorkload represents aggregated restore workload +// in given backup location. +type LocationWorkload struct { + Location + + Size int64 + Tables []TableWorkload +} + +// TableWorkload represents restore workload +// from many manifests for given table in given backup location. +type TableWorkload struct { + Location + TableName + + Size int64 + RemoteDirs []RemoteDirWorkload +} + +// RemoteDirWorkload represents restore workload +// for given table and manifest in given backup location. +type RemoteDirWorkload struct { + TableName + *ManifestInfo + + RemoteSSTableDir string + Size int64 + SSTables []RemoteSSTable +} + +// RemoteSSTable represents SSTable updated with size and version info from remote. +type RemoteSSTable struct { + SSTable // File names might contain versioned snapshot tag extension + Size int64 + Versioned bool +} + +// SSTable represents files creating a single sstable. +type SSTable struct { + ID string + Files []string +} + +// IndexWorkload returns sstables to be restored aggregated by location, table and remote sstable dir. 
+func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) ([]LocationWorkload, error) { + var workload []LocationWorkload + for _, l := range locations { + lw, err := w.indexLocationWorkload(ctx, l) + if err != nil { + return nil, errors.Wrapf(err, "index workload in %s", l) + } + workload = append(workload, lw) + } + return workload, nil +} + +func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location) (LocationWorkload, error) { + rawWorkload, err := w.createRemoteDirWorkloads(ctx, location) + if err != nil { + return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads") + } + if w.target.Continue { + rawWorkload, err = w.filterPreviouslyRestoredSStables(ctx, rawWorkload) + if err != nil { + return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables") + } + } + workload := aggregateLocationWorkload(rawWorkload) + w.logWorkloadInfo(ctx, workload) + return workload, nil +} + +func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { + var rawWorkload []RemoteDirWorkload + err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error { + return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + + sstables, err := filesMetaToSSTables(fm) + if err != nil { + return errors.Wrapf(err, "convert files meta to sstables") + } + sstDir := m.LocationSSTableVersionDir(fm.Keyspace, fm.Table, fm.Version) + remoteSSTables, err := w.adjustSSTablesWithRemote(ctx, w.randomHostFromLocation(location), sstDir, sstables) + if err != nil { + return errors.Wrap(err, "fetch sstables sizes") + } + + var size int64 + for _, sst := range remoteSSTables { + size += sst.Size + } + t := TableName{ + Keyspace: fm.Keyspace, + Table: fm.Table, + } + workload := RemoteDirWorkload{ + TableName: t, + ManifestInfo: m.ManifestInfo, + RemoteSSTableDir: sstDir, + Size: size, + SSTables: remoteSSTables, + } + if size > 0 { + rawWorkload = append(rawWorkload, workload) + } + return nil + }) + }) + if err != nil { + return nil, errors.Wrap(err, "iterate over manifests") + } + return rawWorkload, nil +} + +func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) { + w.logger.Info(ctx, "Filter out previously restored sstables") + + remoteSSTableDirToRestoredIDs := make(map[string][]string) + err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) { + if validateTimeIsSet(pr.RestoreCompletedAt) { + remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir] = append(remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir], pr.SSTableID...) 
+ } + }) + if err != nil { + return nil, errors.Wrap(err, "iterate over prev run progress") + } + if len(remoteSSTableDirToRestoredIDs) == 0 { + return rawWorkload, nil + } + + var ( + filtered []RemoteDirWorkload + skippedCount int + skippedSize int64 + ) + for _, rw := range rawWorkload { + var filteredSSTables []RemoteSSTable + var size int64 + for _, sst := range rw.SSTables { + if !slices.Contains(remoteSSTableDirToRestoredIDs[rw.RemoteSSTableDir], sst.ID) { + filteredSSTables = append(filteredSSTables, sst) + size += sst.Size + } else { + skippedCount++ + skippedSize += sst.Size + } + } + if len(filteredSSTables) > 0 { + filtered = append(filtered, RemoteDirWorkload{ + TableName: rw.TableName, + ManifestInfo: rw.ManifestInfo, + RemoteSSTableDir: rw.RemoteSSTableDir, + Size: size, + SSTables: filteredSSTables, + }) + } else { + w.logger.Info(ctx, "Completely filtered out remote sstable dir", "remote dir", rw.RemoteSSTableDir) + } + } + + w.logger.Info(ctx, "Filtered out sstables info", "count", skippedCount, "size", skippedSize) + return filtered, nil +} + +func (w *tablesWorker) initMetrics(workload []LocationWorkload) { + // For now, the only persistent across task runs metrics are progress and remaining_bytes. + // The rest: state, view_build_status, batch_size are calculated from scratch. + w.metrics.ResetClusterMetrics(w.run.ClusterID) + + // Init remaining bytes + for _, wl := range workload { + for _, twl := range wl.Tables { + for _, rdwl := range twl.RemoteDirs { + w.metrics.SetRemainingBytes(metrics.RestoreBytesLabels{ + ClusterID: rdwl.ClusterID.String(), + SnapshotTag: rdwl.SnapshotTag, + Location: rdwl.Location.String(), + DC: rdwl.DC, + Node: rdwl.NodeID, + Keyspace: rdwl.Keyspace, + Table: rdwl.Table, + }, rdwl.Size) + } + } + } + + // Init progress + var totalSize int64 + for _, u := range w.run.Units { + totalSize += u.Size + } + var workloadSize int64 + for _, wl := range workload { + workloadSize += wl.Size + } + w.metrics.SetProgress(metrics.RestoreProgressLabels{ + ClusterID: w.run.ClusterID.String(), + SnapshotTag: w.run.SnapshotTag, + }, float64(totalSize-workloadSize)/float64(totalSize)*100) +} + +func (w *tablesWorker) logWorkloadInfo(ctx context.Context, workload LocationWorkload) { + if workload.Size == 0 { + return + } + var locMax, locCnt int64 + for _, twl := range workload.Tables { + if twl.Size == 0 { + continue + } + var tabMax, tabCnt int64 + for _, rdwl := range twl.RemoteDirs { + if rdwl.Size == 0 { + continue + } + var dirMax int64 + for _, sst := range rdwl.SSTables { + dirMax = max(dirMax, sst.Size) + } + dirCnt := int64(len(rdwl.SSTables)) + w.logger.Info(ctx, "Remote sstable dir workload info", + "path", rdwl.RemoteSSTableDir, + "max size", dirMax, + "average size", rdwl.Size/dirCnt, + "count", dirCnt) + tabCnt += dirCnt + tabMax = max(tabMax, dirMax) + } + w.logger.Info(ctx, "Table workload info", + "keyspace", twl.Keyspace, + "table", twl.Table, + "max size", tabMax, + "average size", twl.Size/tabCnt, + "count", tabCnt) + locCnt += tabCnt + locMax = max(locMax, tabMax) + } + w.logger.Info(ctx, "Location workload info", + "location", workload.Location.String(), + "max size", locMax, + "average size", workload.Size/locCnt, + "count", locCnt) +} + +func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload { + remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload) + for _, rw := range rawWorkload { + remoteDirWorkloads[rw.TableName] = append(remoteDirWorkloads[rw.TableName], rw) + } + + var tableWorkloads 
[]TableWorkload + for _, tw := range remoteDirWorkloads { + var size int64 + for _, rdw := range tw { + size += rdw.Size + } + tableWorkloads = append(tableWorkloads, TableWorkload{ + Location: tw[0].Location, + TableName: tw[0].TableName, + Size: size, + RemoteDirs: tw, + }) + } + + var size int64 + for _, tw := range tableWorkloads { + size += tw.Size + } + return LocationWorkload{ + Location: tableWorkloads[0].Location, + Size: size, + Tables: tableWorkloads, + } +} + +func (w *tablesWorker) adjustSSTablesWithRemote(ctx context.Context, host, remoteDir string, sstables map[string]SSTable) ([]RemoteSSTable, error) { + versioned, err := ListVersionedFiles(ctx, w.client, w.run.SnapshotTag, host, remoteDir) + if err != nil { + return nil, errors.Wrap(err, "list versioned files") + } + + remoteSSTables := make([]RemoteSSTable, 0, len(sstables)) + for id, sst := range sstables { + rsst := RemoteSSTable{SSTable: SSTable{ID: id}} + for _, f := range sst.Files { + v, ok := versioned[f] + if !ok { + return nil, errors.Errorf("file %s is not present in listed versioned files", f) + } + + rsst.Files = append(rsst.Files, v.FullName()) + rsst.Size += v.Size + rsst.Versioned = rsst.Versioned || v.Version != "" + } + remoteSSTables = append(remoteSSTables, rsst) + } + + return remoteSSTables, nil +} + +func filesMetaToSSTables(fm FilesMeta) (map[string]SSTable, error) { + const expectedSSTableFileCnt = 9 + sstables := make(map[string]SSTable, len(fm.Files)/expectedSSTableFileCnt) + + for _, f := range fm.Files { + id, err := sstable.ExtractID(f) + if err != nil { + return nil, errors.Wrapf(err, "extract sstable component %s generation ID", f) + } + + if sst, ok := sstables[id]; ok { + sst.Files = append(sst.Files, f) + sstables[id] = sst + } else { + sstables[id] = SSTable{ + ID: id, + Files: []string{f}, + } + } + } + return sstables, nil +} diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 39a7d4a28..c4d4a26b8 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -80,16 +80,11 @@ type Run struct { TaskID uuid.UUID ID uuid.UUID - PrevID uuid.UUID - Location string // marks currently processed location - ManifestPath string // marks currently processed manifest - Keyspace string `db:"keyspace_name"` // marks currently processed keyspace - Table string `db:"table_name"` // marks currently processed table - SnapshotTag string - Stage Stage + PrevID uuid.UUID + SnapshotTag string + Stage Stage RepairTaskID uuid.UUID // task ID of the automated post-restore repair - // Cache that's initialized once for entire task Units []Unit Views []View @@ -165,20 +160,21 @@ func (t *View) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error return gocql.Unmarshal(info, data, f.Addr().Interface()) } -// RunProgress describes restore progress (like in RunProgress) of -// already started download of SSTables with specified IDs to host. +// RunProgress describes progress of restoring a single batch. type RunProgress struct { ClusterID uuid.UUID TaskID uuid.UUID RunID uuid.UUID - ManifestPath string - Keyspace string `db:"keyspace_name"` - Table string `db:"table_name"` - Host string // IP of the node to which SSTables are downloaded. 
- AgentJobID int64 + // Different DB name because of historical reasons and because we can't drop/alter clustering column + RemoteSSTableDir string `db:"manifest_path"` + Keyspace string `db:"keyspace_name"` + Table string `db:"table_name"` + SSTableID []string `db:"sstable_id"` + + Host string // IP of the node to which SSTables are downloaded. + AgentJobID int64 - SSTableID []string `db:"sstable_id"` DownloadStartedAt *time.Time DownloadCompletedAt *time.Time RestoreStartedAt *time.Time @@ -206,7 +202,7 @@ func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*Run "cluster_id": pr.ClusterID, "task_id": pr.TaskID, "run_id": pr.RunID, - "manifest_path": pr.ManifestPath, + "manifest_path": pr.RemoteSSTableDir, "keyspace_name": pr.Keyspace, "table_name": pr.Table, }).Iter() @@ -218,10 +214,6 @@ func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*Run return iter.Close() } -func (pr *RunProgress) idCnt() int64 { - return int64(len(pr.SSTableID)) -} - func (pr *RunProgress) setRestoreStartedAt() { t := timeutc.Now() pr.RestoreStartedAt = &t @@ -279,3 +271,9 @@ type ViewProgress struct { Status scyllaclient.ViewBuildStatus `json:"status"` } + +// TableName represents full table name. +type TableName struct { + Keyspace string + Table string +} diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index bf1048f7f..8f7bea05f 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -167,7 +167,6 @@ func (w *schemaWorker) restoreFromSchemaFile(ctx context.Context) error { ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: "DESCRIBE SCHEMA WITH INTERNALS", Keyspace: u.Keyspace, Table: t.Table, DownloadStartedAt: &start, @@ -188,8 +187,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.logger.Info(ctx, "Downloading schema from location", "location", location) defer w.logger.Info(ctx, "Downloading schema from location finished", "location", location) - w.run.Location = location.String() - tableDownloadHandler := func(fm FilesMeta) error { if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { return nil @@ -198,9 +195,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.logger.Info(ctx, "Downloading schema table", "keyspace", fm.Keyspace, "table", fm.Table) defer w.logger.Info(ctx, "Downloading schema table finished", "keyspace", fm.Keyspace, "table", fm.Table) - w.run.Table = fm.Table - w.run.Keyspace = fm.Keyspace - return w.workFunc(ctx, fm) } @@ -209,7 +203,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc defer w.logger.Info(ctx, "Downloading schema from manifest finished", "manifest", miwc.ManifestInfo) w.miwc = miwc - w.run.ManifestPath = miwc.Path() w.insertRun(ctx) return miwc.ForEachIndexIterWithError(nil, tableDownloadHandler) @@ -242,12 +235,6 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { if err != nil { return errors.Wrap(err, "initialize versioned SSTables") } - if len(w.versionedFiles) > 0 { - w.logger.Info(ctx, "Chosen versioned SSTables", - "dir", srcDir, - "versioned_files", w.versionedFiles, - ) - } idMapping := w.getFileNamesMapping(fm.Files, false) uuidMapping := w.getFileNamesMapping(fm.Files, true) @@ -275,10 +262,7 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { // Rename SSTable in the destination in order to avoid name conflicts dstFile := 
renamedSSTables[file] // Take the correct version of restored file - srcFile := file - if v, ok := w.versionedFiles[file]; ok { - srcFile = v.FullName() - } + srcFile := w.versionedFiles[file].FullName() srcPath := path.Join(srcDir, srcFile) dstPath := path.Join(dstDir, dstFile) @@ -308,9 +292,9 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: w.run.ManifestPath, - Keyspace: w.run.Keyspace, - Table: w.run.Table, + RemoteSSTableDir: srcDir, + Keyspace: fm.Keyspace, + Table: fm.Table, Host: host, DownloadStartedAt: &start, DownloadCompletedAt: &end, diff --git a/pkg/service/restore/service.go b/pkg/service/restore/service.go index 523bf01f2..dfe1fbc7a 100644 --- a/pkg/service/restore/service.go +++ b/pkg/service/restore/service.go @@ -115,8 +115,12 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI for _, unit := range w.run.Units { totalBytesToRestore += unit.Size } - tw := newTablesWorker(w, s.repairSvc, totalBytesToRestore) - err = tw.restore(ctx) + tw, workerErr := newTablesWorker(w, s.repairSvc, totalBytesToRestore) + if workerErr != nil { + err = workerErr + } else { + err = tw.restore(ctx) + } } else { sw := &schemaWorker{worker: w} err = sw.restore(ctx) diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index a2e784c94..5ff1d7c86 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -852,15 +852,6 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo t.Fatalf("Expected context error but got: %+v", err) } - pr, err := dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) - if err != nil { - t.Fatal(err) - } - Printf("And: restore progress: %+#v\n", pr) - if pr.Downloaded == 0 { - t.Fatal("Expected partial restore progress") - } - Print("When: resume restore and stop in during repair") dstH.RunID = uuid.MustRandom() err = dstH.service.Restore(ctx2, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)) @@ -872,7 +863,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo t.Fatalf("Expected context error but got: %+v", err) } - pr, err = dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) + pr, err := dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) if err != nil { t.Fatal(err) } diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 774b581de..5d77c52bb 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -9,20 +9,21 @@ import ( "sync" "github.com/pkg/errors" + "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/metrics" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" + "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" + "github.com/scylladb/scylla-manager/v3/pkg/util/query" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) type tablesWorker struct { worker - repairSvc *repair.Service - progress *TotalRestoreProgress - // When set to false, tablesWorker will skip restoration of location/manifest/table - // until it encounters the one present in run. 
- alreadyResumed bool + tableVersion map[TableName]string + repairSvc *repair.Service + progress *TotalRestoreProgress } // TotalRestoreProgress is a struct that holds information about the total progress of the restore job. @@ -64,22 +65,33 @@ func (p *TotalRestoreProgress) Update(bytesRestored int64) { p.restoredBytes += bytesRestored } -func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) *tablesWorker { - return &tablesWorker{ - worker: w, - repairSvc: repairSvc, - alreadyResumed: true, - progress: NewTotalRestoreProgress(totalBytes), +func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*tablesWorker, error) { + versions := make(map[TableName]string) + for _, u := range w.run.Units { + for _, t := range u.Tables { + v, err := query.GetTableVersion(w.clusterSession, u.Keyspace, t.Table) + if err != nil { + return nil, errors.Wrapf(err, "get %s.%s version", u.Keyspace, t.Table) + } + versions[TableName{ + Keyspace: u.Keyspace, + Table: t.Table, + }] = v + } } + + return &tablesWorker{ + worker: w, + tableVersion: versions, + repairSvc: repairSvc, + progress: NewTotalRestoreProgress(totalBytes), + }, nil } // restore files from every location specified in restore target. func (w *tablesWorker) restore(ctx context.Context) error { - if w.target.Continue && w.run.PrevID != uuid.Nil && w.run.Table != "" { - w.alreadyResumed = false - } // Init metrics only on fresh start - if w.alreadyResumed { + if w.run.PrevID == uuid.Nil { w.initRestoreMetrics(ctx) } @@ -156,90 +168,60 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { w.logger.Info(ctx, "Started restoring tables") defer w.logger.Info(ctx, "Restoring tables finished") - // Restore locations in deterministic order - for _, l := range w.target.Location { - if !w.alreadyResumed && w.run.Location != l.String() { - w.logger.Info(ctx, "Skipping location", "location", l) - continue - } - if err := w.restoreLocation(ctx, l); err != nil { - return err - } + workload, err := w.IndexWorkload(ctx, w.target.Location) + if err != nil { + return err } - return nil -} - -func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) error { - w.logger.Info(ctx, "Restoring location", "location", location) - defer w.logger.Info(ctx, "Restoring location finished", "location", location) + w.initMetrics(workload) - restoreManifest := func(miwc ManifestInfoWithContent) error { - if !w.alreadyResumed && w.run.ManifestPath != miwc.Path() { - w.logger.Info(ctx, "Skipping manifest", "manifest", miwc.ManifestInfo) - return nil - } - - w.logger.Info(ctx, "Restoring manifest", "manifest", miwc.ManifestInfo) - defer w.logger.Info(ctx, "Restoring manifest finished", "manifest", miwc.ManifestInfo) - - return miwc.ForEachIndexIterWithError(nil, w.restoreDir(ctx, miwc)) + bd := newBatchDispatcher(workload, w.target.BatchSize, w.target.locationHosts) + hostsS := strset.New() + for _, h := range w.target.locationHosts { + hostsS.Add(h...) 
} - - return w.forEachManifest(ctx, location, restoreManifest) -} - -func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithContent) func(fm FilesMeta) error { - return func(fm FilesMeta) error { - if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { - return nil - } - - if !w.alreadyResumed { - if w.run.Keyspace != fm.Keyspace || w.run.Table != fm.Table { - w.logger.Info(ctx, "Skipping table", "keyspace", fm.Keyspace, "table", fm.Table) + hosts := hostsS.List() + + f := func(n int) (err error) { + h := hosts[n] + for { + // Download and stream in parallel + b, ok := bd.DispatchBatch(h) + if !ok { + w.logger.Info(ctx, "No more batches to restore", "host", h) return nil } - } - - w.logger.Info(ctx, "Restoring table", "keyspace", fm.Keyspace, "table", fm.Table) - defer w.logger.Info(ctx, "Restoring table finished", "keyspace", fm.Keyspace, "table", fm.Table) - - w.run.Location = miwc.Location.String() - w.run.ManifestPath = miwc.Path() - w.run.Table = fm.Table - w.run.Keyspace = fm.Keyspace - w.insertRun(ctx) - - dw, err := newTablesDirWorker(ctx, w.worker, miwc, fm, w.progress) - if err != nil { - return errors.Wrap(err, "create dir worker") - } - if !w.alreadyResumed { - if err := dw.resumePrevProgress(); err != nil { - return errors.Wrap(err, "resume prev run progress") - } - } - w.alreadyResumed = true + w.metrics.IncreaseBatchSize(w.run.ClusterID, h, b.Size) + w.logger.Info(ctx, "Got batch to restore", + "host", h, + "keyspace", b.Keyspace, + "table", b.Table, + "size", b.Size, + "sstable count", len(b.SSTables), + ) - if err := dw.restore(ctx); err != nil { - if ctx.Err() != nil { - return ctx.Err() + pr, err := w.newRunProgress(ctx, h, b) + if err != nil { + return errors.Wrap(err, "create new run progress") } - // In case all SSTables have been restored, restore can proceed even - // with errors from some hosts. - if len(dw.bundleIDPool) > 0 { - return errors.Wrapf(err, "not restored bundles %v", dw.bundleIDPool.drain()) + if err := w.restoreBatch(ctx, b, pr); err != nil { + return errors.Wrap(err, "restore batch") } - - w.logger.Error(ctx, "Restore table failed on some hosts but restore will proceed", - "keyspace", w.run.Keyspace, - "table", w.run.Table, - "error", err, - ) + w.decreaseRemainingBytesMetric(b) } + } + + notify := func(n int, err error) { + w.logger.Error(ctx, "Failed to restore files on host", + "host", hosts[n], + "error", err, + ) + } - return nil + err = parallel.Run(len(hosts), w.target.Parallel, f, notify) + if err == nil { + return bd.ValidateAllDispatched() } + return err } func (w *tablesWorker) stageRepair(ctx context.Context) error { diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index b2942a9eb..990efc9ad 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -8,190 +8,43 @@ import ( "time" "github.com/pkg/errors" - "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/metrics" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" - "github.com/scylladb/scylla-manager/v3/pkg/sstable" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" - "github.com/scylladb/scylla-manager/v3/pkg/util/query" - "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" - "go.uber.org/atomic" ) -// tablesDirWorker is responsible for restoring table files from manifest in parallel. 
-type tablesDirWorker struct { - worker - - bundles map[string]bundle // Maps bundle to it's ID - bundleIDPool idPool // SSTable IDs yet to be restored - - dstDir string // "data:" prefixed path to upload dir (common for every host) - srcDir string // Full path to remote directory with backed-up files - miwc ManifestInfoWithContent // Manifest containing fm - fm FilesMeta // Describes table and it's files located in srcDir - - ongoingPr []*RunProgress // Unfinished RunProgress from previous run of each host - - // Maps original SSTable name to its existing older version - // (with respect to currently restored snapshot tag) - // that should be used during the restore procedure. - versionedFiles VersionedMap - fileSizesCache map[string]int64 - progress *TotalRestoreProgress -} - -func newTablesDirWorker(ctx context.Context, w worker, miwc ManifestInfoWithContent, fm FilesMeta, progress *TotalRestoreProgress) (tablesDirWorker, error) { - bundles := newBundles(fm) - bundleIDPool := newIDPool(bundles) - - version, err := query.GetTableVersion(w.clusterSession, fm.Keyspace, fm.Table) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "get table version") - } - - srcDir := miwc.LocationSSTableVersionDir(fm.Keyspace, fm.Table, fm.Version) - dstDir := UploadTableDir(fm.Keyspace, fm.Table, version) - w.logger.Info(ctx, "Found table's src and dst dirs", - "keyspace", fm.Keyspace, - "table", fm.Table, - "src_dir", srcDir, - "dst_dir", dstDir, - ) - - hosts := w.target.locationHosts[miwc.Location] - versionedFiles, err := ListVersionedFiles(ctx, w.client, w.run.SnapshotTag, hosts[0], srcDir) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "initialize versioned SSTables") - } - if len(versionedFiles) > 0 { - w.logger.Info(ctx, "Chosen versioned SSTables", - "dir", srcDir, - "versioned_files", versionedFiles, - ) - } - - fileSizesCache, err := buildFilesSizesCache(ctx, w.client, hosts[0], srcDir, versionedFiles) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "build files sizes cache") - } - - return tablesDirWorker{ - worker: w, - bundles: bundles, - bundleIDPool: bundleIDPool, - dstDir: dstDir, - srcDir: srcDir, - miwc: miwc, - fm: fm, - ongoingPr: make([]*RunProgress, len(hosts)), - versionedFiles: versionedFiles, - fileSizesCache: fileSizesCache, - progress: progress, - }, nil -} - -// restore SSTables of receivers manifest/table in parallel. 
-func (w *tablesDirWorker) restore(ctx context.Context) error { - // Count of SSTable IDs yet to be successfully restored - ctr := atomic.NewInt64(w.bundleIDPool.size()) - for _, pr := range w.ongoingPr { - if pr != nil { - ctr.Add(pr.idCnt()) - } - } - - if ctr.Load() == 0 { - w.logger.Info(ctx, "Table does not have any more SSTables to restore", - "keyspace", w.fm.Keyspace, - "table", w.fm.Table, - ) - return nil - } - - hosts := w.target.locationHosts[w.miwc.Location] - f := func(n int) (err error) { - h := hosts[n] - ongoingPr := w.ongoingPr[n] - - // First handle ongoing restore - if ongoingPr != nil { - if err := w.reactivateRunProgress(ctx, ongoingPr); err != nil { - return errors.Wrap(err, "reactivate run progress") - } - - if err := w.restoreBatch(ctx, ongoingPr); err != nil { - return errors.Wrap(err, "restore reactivated batch") - } - - if ctr.Sub(ongoingPr.idCnt()) <= 0 { - close(w.bundleIDPool) - } - } - - for { - pr, err := w.newRunProgress(ctx, h) - if err != nil { - return errors.Wrap(err, "create run progress") - } - if pr == nil { - w.logger.Info(ctx, "No more batches to restore", "host", h) - return nil - } - - if err := w.restoreBatch(ctx, pr); err != nil { - return errors.Wrap(err, "restore batch") - } - - if ctr.Sub(pr.idCnt()) <= 0 { - close(w.bundleIDPool) - } - } - } - - notify := func(n int, err error) { - w.logger.Error(ctx, "Failed to restore files on host", - "host", hosts[n], - "error", err, - ) - } - - return parallel.Run(len(hosts), w.target.Parallel, f, notify) -} - -func (w *tablesDirWorker) restoreBatch(ctx context.Context, pr *RunProgress) (err error) { +func (w *tablesWorker) restoreBatch(ctx context.Context, b batch, pr *RunProgress) (err error) { defer func() { // Run cleanup on non-pause error - if err != nil && ctx.Err() == nil { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateError) + if err != nil { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateError) w.cleanupRunProgress(context.Background(), pr) } else { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateIdle) + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateIdle) } }() // Download has already been started on RunProgress creation. // Skip steps already done in the previous run. if !validateTimeIsSet(pr.DownloadCompletedAt) { - if err := w.waitJob(ctx, pr); err != nil { + if err := w.waitJob(ctx, b, pr); err != nil { return errors.Wrap(err, "wait for job") } } if !validateTimeIsSet(pr.RestoreCompletedAt) { - if err := w.restoreSSTables(ctx, pr); err != nil { + if err := w.restoreSSTables(ctx, b, pr); err != nil { return errors.Wrap(err, "call load and stream") } } - - w.decreaseRemainingBytesMetric(pr.Downloaded + pr.Skipped + pr.VersionedProgress) w.logger.Info(ctx, "Restored batch", "host", pr.Host, "sstable_id", pr.SSTableID) return nil } // waitJob waits for rclone job to finish while updating its progress. 
-func (w *tablesDirWorker) waitJob(ctx context.Context, pr *RunProgress) (err error) { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.miwc.SnapshotTag, pr.Host, metrics.RestoreStateDownloading) +func (w *tablesWorker) waitJob(ctx context.Context, b batch, pr *RunProgress) (err error) { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateDownloading) w.logger.Info(ctx, "Waiting for job", "host", pr.Host, "job_id", pr.AgentJobID) defer func() { @@ -229,7 +82,7 @@ func (w *tablesDirWorker) waitJob(ctx context.Context, pr *RunProgress) (err err } } -func (w *tablesDirWorker) updateDownloadProgress(ctx context.Context, pr *RunProgress, job *scyllaclient.RcloneJobProgress) { +func (w *tablesWorker) updateDownloadProgress(ctx context.Context, pr *RunProgress, job *scyllaclient.RcloneJobProgress) { // Set StartedAt and CompletedAt based on Job if t := time.Time(job.StartedAt); !t.IsZero() { pr.DownloadStartedAt = &t @@ -245,8 +98,8 @@ func (w *tablesDirWorker) updateDownloadProgress(ctx context.Context, pr *RunPro w.insertRunProgress(ctx, pr) } -func (w *tablesDirWorker) restoreSSTables(ctx context.Context, pr *RunProgress) error { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading) +func (w *tablesWorker) restoreSSTables(ctx context.Context, b batch, pr *RunProgress) error { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading) if !validateTimeIsSet(pr.RestoreStartedAt) { pr.setRestoreStartedAt() w.insertRunProgress(ctx, pr) @@ -260,110 +113,19 @@ func (w *tablesDirWorker) restoreSSTables(ctx context.Context, pr *RunProgress) return err } -func (w *tablesDirWorker) resumePrevProgress() error { - bind := &RunProgress{ - ClusterID: w.run.ClusterID, - TaskID: w.run.TaskID, - RunID: w.run.ID, - ManifestPath: w.miwc.Path(), - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, - } - - // All bundles IDs started in the previous run - startedID := strset.New() - // All unfinished RunProgress started in the previous run - ongoingPr := make(map[string]*RunProgress) - err := bind.ForEachTableProgress(w.session, func(pr *RunProgress) { - startedID.Add(pr.SSTableID...) - if !validateTimeIsSet(pr.RestoreCompletedAt) { - cp := *pr - ongoingPr[pr.Host] = &cp - } - }) - if err != nil { - return err - } - - // Remove already started ID from the pool - ids := w.bundleIDPool.drain() - for _, id := range ids { - if !startedID.Has(id) { - w.bundleIDPool <- id - } - } - - hosts := w.target.locationHosts[w.miwc.Location] - // Set ongoing RunProgress so that they can be resumed - for i, h := range hosts { - w.ongoingPr[i] = ongoingPr[h] - } - return nil -} - -// reactivateRunProgress preserves batch assembled in the previous run and tries to reuse its unfinished rclone job. 
-func (w *tablesDirWorker) reactivateRunProgress(ctx context.Context, pr *RunProgress) error { - // Nothing to do if download has already finished - if validateTimeIsSet(pr.DownloadCompletedAt) { - return nil - } - - job, err := w.client.RcloneJobProgress(ctx, pr.Host, pr.AgentJobID, w.config.LongPollingTimeoutSeconds) - if err != nil { - return errors.Wrapf(err, "get progress of rclone job %d", pr.AgentJobID) - } - // Nothing to do if rclone job is still running - if scyllaclient.WorthWaitingForJob(job.Status) { - return nil - } - - // Recreate rclone job - batch := w.batchFromIDs(pr.SSTableID) - if err := w.cleanUploadDir(ctx, pr.Host, w.dstDir, batch); err != nil { - return errors.Wrapf(err, "clean upload dir of host %s", pr.Host) - } - - jobID, versionedPr, err := w.startDownload(ctx, pr.Host, batch) - if err != nil { - w.deleteRunProgress(ctx, pr) - w.returnBatchToPool(pr.SSTableID, pr.Host) - return err - } - - pr.AgentJobID = jobID - pr.VersionedProgress = versionedPr - w.insertRunProgress(ctx, pr) - return nil -} - -// newRunProgress creates RunProgress by assembling batch and starting download to host's upload dir. -func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*RunProgress, error) { +// newRunProgress creates RunProgress by starting download to host's upload dir. +func (w *tablesWorker) newRunProgress(ctx context.Context, host string, b batch) (*RunProgress, error) { if err := w.checkAvailableDiskSpace(ctx, host); err != nil { return nil, errors.Wrap(err, "validate free disk space") } - takenIDs := w.chooseIDsForBatch(ctx, w.target.BatchSize, host) - if ctx.Err() != nil { - w.returnBatchToPool(takenIDs, host) - return nil, ctx.Err() - } - if takenIDs == nil { - return nil, nil //nolint: nilnil - } - - w.logger.Info(ctx, "Created new batch", - "host", host, - "sstable_id", takenIDs, - ) - - batch := w.batchFromIDs(takenIDs) - if err := w.cleanUploadDir(ctx, host, w.dstDir, nil); err != nil { + uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName]) + if err := w.cleanUploadDir(ctx, host, uploadDir, nil); err != nil { return nil, errors.Wrapf(err, "clean upload dir of host %s", host) } - jobID, versionedPr, err := w.startDownload(ctx, host, batch) + jobID, versionedPr, err := w.startDownload(ctx, host, b) if err != nil { - w.returnBatchToPool(takenIDs, host) return nil, err } @@ -371,12 +133,12 @@ func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*Run ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: w.miwc.Path(), - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, + RemoteSSTableDir: b.RemoteSSTableDir, + Keyspace: b.Keyspace, + Table: b.Table, Host: host, AgentJobID: jobID, - SSTableID: takenIDs, + SSTableID: b.IDs(), VersionedProgress: versionedPr, } @@ -388,135 +150,86 @@ func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*Run // Downloading of versioned files happens first in a synchronous way. // It returns jobID for asynchronous download of the newest versions of files // alongside with the size of the already downloaded versioned files. 
-func (w *tablesDirWorker) startDownload(ctx context.Context, host string, batch []string) (jobID, versionedPr int64, err error) { - var ( - regularBatch = make([]string, 0) - versionedBatch = make([]VersionedSSTable, 0) - ) - // Decide which files require to be downloaded in their older version - for _, file := range batch { - if v, ok := w.versionedFiles[file]; ok { - versionedBatch = append(versionedBatch, v) - versionedPr += v.Size - } else { - regularBatch = append(regularBatch, file) - } - } - // Downloading versioned files requires us to rename them (strip version extension) - // and function RcloneCopyPaths lacks this option. In order to achieve that, we copy - // all versioned files one by one with RcloneCopyFile (which supports renaming files). - // The assumption is that the existence of versioned files is low and that they - // are rather small, so we can do it in a synchronous way. - // Copying files can be done in full parallel because of rclone ability to limit transfers. - f := func(i int) error { - file := versionedBatch[i] - // Restore file without its version extension - dst := path.Join(w.dstDir, file.Name) - src := path.Join(w.srcDir, file.FullName()) - - if err := w.client.RcloneCopyFile(ctx, host, dst, src); err != nil { - return parallel.Abort(errors.Wrapf(err, "host %s: download versioned file %s into %s", host, src, dst)) +func (w *tablesWorker) startDownload(ctx context.Context, host string, b batch) (jobID, versionedPr int64, err error) { + uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName]) + sstables := b.NotVersionedSSTables() + versioned := b.VersionedSSTables() + versionedSize := b.VersionedSize() + if len(versioned) > 0 { + if err := w.downloadVersioned(ctx, host, b.RemoteSSTableDir, uploadDir, versioned); err != nil { + return 0, 0, errors.Wrapf(err, "download versioned sstables on host %s", host) + } - - w.logger.Info(ctx, "Downloaded versioned file", - "host", host, - "src", src, - "dst", dst, - "size", file.Size, - ) - - return nil - } - - notify := func(i int, err error) { - file := versionedBatch[i] - dst := path.Join(w.dstDir, file.Name) - src := path.Join(w.srcDir, file.FullName()) - w.logger.Error(ctx, "Failed to download versioned SSTable", - "file", file, - "dst", dst, - "src", src, - "error", err, - ) } - if err := parallel.Run(len(versionedBatch), parallel.NoLimit, f, notify); err != nil { - return 0, 0, err - } // Start asynchronous job for downloading the newest versions of remaining files - jobID, err = w.client.RcloneCopyPaths(ctx, host, w.dstDir, w.srcDir, regularBatch) + files := make([]string, 0) + for _, sst := range sstables { + files = append(files, sst.Files...) + } + jobID, err = w.client.RcloneCopyPaths(ctx, host, uploadDir, b.RemoteSSTableDir, files) if err != nil { return 0, 0, errors.Wrap(err, "download batch to upload dir") } - w.logger.Info(ctx, "Started downloading files", "host", host, "job_id", jobID, - "batch", regularBatch, ) - - return jobID, versionedPr, nil + return jobID, versionedSize, nil } -// chooseIDsForBatch returns slice of IDs of SSTables that the batch consists of. -func (w *tablesDirWorker) chooseIDsForBatch(ctx context.Context, size int, host string) (takenIDs []string) { - defer func() { - w.increaseBatchSizeMetric(w.run.ClusterID, w.batchFromIDs(takenIDs), host) - }() - - // All hosts are trying to get IDs for batch from the pool. - // Pool is closed after the whole table has been restored.
- - // Take at most size IDs - for i := 0; i < size; i++ { - select { - case <-ctx.Done(): - return takenIDs - default: - } - - select { - case id, ok := <-w.bundleIDPool: - if !ok { - return takenIDs - } - takenIDs = append(takenIDs, id) - default: - // Don't wait for more IDs if the pool is empty - // and host already has something to restore. - if len(takenIDs) > 0 { - return takenIDs - } - // Here host hasn't taken any IDs and pool is empty, - // so it waits for the whole table to be restored or - // for IDs that might return to the pool in case of error on the other hosts. - select { - case id, ok := <-w.bundleIDPool: - if !ok { - return takenIDs - } - takenIDs = append(takenIDs, id) - case <-ctx.Done(): - return takenIDs +// Downloading versioned files requires us to rename them (strip version extension) +// and function RcloneCopyPaths lacks this option. In order to achieve that, we copy +// all versioned files one by one with RcloneCopyFile (which supports renaming files). +// The assumption is that the existence of versioned files is low and that they +// are rather small, so we can do it in a synchronous way. +// Copying files can be done in full parallel because of rclone ability to limit transfers. +func (w *tablesWorker) downloadVersioned(ctx context.Context, host, srcDir, dstDir string, versioned []RemoteSSTable) error { + f := func(i int) error { + sst := versioned[i] + for _, file := range sst.Files { + name, _ := SplitNameAndVersion(file) + // Restore file without its version extension + dst := path.Join(dstDir, name) + src := path.Join(srcDir, file) + if err := w.client.RcloneCopyFile(ctx, host, dst, src); err != nil { + return parallel.Abort(errors.Wrapf(err, "host %s: download versioned file %s into %s", host, src, dst)) } } + w.logger.Info(ctx, "Downloaded versioned sstable", + "host", host, + "sstable ID", sst.ID, + "src dir", srcDir, + "dst dir", dstDir, + ) + return nil + } + + notify := func(i int, err error) { + sst := versioned[i] + w.logger.Error(ctx, "Failed to download versioned sstable", + "host", host, + "sstable ID", sst.ID, + "src dir", srcDir, + "dst dir", dstDir, + "error", err, + ) } - return takenIDs + return parallel.Run(len(versioned), parallel.NoLimit, f, notify) } -func (w *tablesDirWorker) decreaseRemainingBytesMetric(bytes int64) { +func (w *tablesWorker) decreaseRemainingBytesMetric(b batch) { labels := metrics.RestoreBytesLabels{ ClusterID: w.run.ClusterID.String(), SnapshotTag: w.run.SnapshotTag, - Location: w.miwc.Location.String(), - DC: w.miwc.DC, - Node: w.miwc.NodeID, - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, + Location: b.Location.String(), + DC: b.DC, + Node: b.NodeID, + Keyspace: b.Keyspace, + Table: b.Table, } - w.metrics.DecreaseRemainingBytes(labels, bytes) - w.progress.Update(bytes) + w.metrics.DecreaseRemainingBytes(labels, b.Size) + w.progress.Update(b.Size) progressLabels := metrics.RestoreProgressLabels{ ClusterID: w.run.ClusterID.String(), @@ -525,80 +238,13 @@ func (w *tablesDirWorker) decreaseRemainingBytesMetric(bytes int64) { w.metrics.SetProgress(progressLabels, w.progress.CurrentProgress()) } -func (w *tablesDirWorker) increaseBatchSizeMetric(clusterID uuid.UUID, batch []string, host string) { - w.metrics.IncreaseBatchSize(clusterID, host, w.countBatchSize(batch)) -} - -func (w *tablesDirWorker) decreaseBatchSizeMetric(clusterID uuid.UUID, batch []string, host string) { - w.metrics.DecreaseBatchSize(clusterID, host, w.countBatchSize(batch)) -} - -func (w *tablesDirWorker) cleanupRunProgress(ctx context.Context, pr 
*RunProgress) { +func (w *tablesWorker) cleanupRunProgress(ctx context.Context, pr *RunProgress) { w.deleteRunProgress(ctx, pr) - w.returnBatchToPool(pr.SSTableID, pr.Host) - - if cleanErr := w.cleanUploadDir(ctx, pr.Host, w.dstDir, nil); cleanErr != nil { - w.logger.Error(ctx, "Couldn't clear destination directory", "host", pr.Host, "error", cleanErr) + tn := TableName{ + Keyspace: pr.Keyspace, + Table: pr.Table, } -} - -func (w *tablesDirWorker) returnBatchToPool(ids []string, host string) { - w.decreaseBatchSizeMetric(w.run.ClusterID, w.batchFromIDs(ids), host) - for _, id := range ids { - w.bundleIDPool <- id - } -} - -func (w *tablesDirWorker) batchFromIDs(ids []string) []string { - var batch []string - for _, id := range ids { - batch = append(batch, w.bundles[id]...) - } - return batch -} - -func (w *tablesDirWorker) countBatchSize(batch []string) int64 { - var batchSize int64 - for _, file := range batch { - batchSize += w.fileSizesCache[file] - } - return batchSize -} - -// bundle represents SSTables with the same ID. -type bundle []string - -func newBundles(fm FilesMeta) map[string]bundle { - bundles := make(map[string]bundle) - for _, f := range fm.Files { - id, err := sstable.ExtractID(f) - if err != nil { - panic(err) - } - bundles[id] = append(bundles[id], f) - } - return bundles -} - -// idPool represents pool of SSTableIDs yet to be restored. -type idPool chan string - -func (p idPool) drain() []string { - var out []string - for len(p) > 0 { - out = append(out, <-p) - } - return out -} - -func (p idPool) size() int64 { - return int64(len(p)) -} - -func newIDPool(bundles map[string]bundle) chan string { - bundleIDPool := make(chan string, len(bundles)) - for id := range bundles { - bundleIDPool <- id + if cleanErr := w.cleanUploadDir(ctx, pr.Host, UploadTableDir(pr.Keyspace, pr.Table, w.tableVersion[tn]), nil); cleanErr != nil { + w.logger.Error(ctx, "Couldn't clear destination directory", "host", pr.Host, "error", cleanErr) } - return bundleIDPool } diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 00f96c6dc..8984a934a 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "math/rand" "path" "regexp" "slices" @@ -43,6 +44,14 @@ type worker struct { clusterSession gocqlx.Session } +func (w *worker) randomHostFromLocation(loc Location) string { + hosts, ok := w.target.locationHosts[loc] + if !ok { + panic("no hosts for location: " + loc.String()) + } + return hosts[rand.Intn(len(hosts))] +} + func (w *worker) init(ctx context.Context, properties json.RawMessage) error { if err := w.initTarget(ctx, properties); err != nil { return errors.Wrap(err, "init target") @@ -666,10 +675,6 @@ func (w *worker) decorateWithPrevRun(ctx context.Context) error { if w.target.Continue { w.run.PrevID = prev.ID - w.run.Location = prev.Location - w.run.ManifestPath = prev.ManifestPath - w.run.Keyspace = prev.Keyspace - w.run.Table = prev.Table w.run.Stage = prev.Stage w.run.RepairTaskID = prev.RepairTaskID } @@ -684,6 +689,11 @@ func (w *worker) clonePrevProgress(ctx context.Context) { defer q.Release() err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.PrevID, func(pr *RunProgress) { + // We don't support interrupted run progresses resume, + // so only finished run progresses should be copied. 
+ if !validateTimeIsSet(pr.RestoreCompletedAt) { + return + } pr.RunID = w.run.ID if err := q.BindStruct(pr).Exec(); err != nil { w.logger.Error(ctx, "Couldn't clone run progress", @@ -794,20 +804,3 @@ func (w *worker) stopJob(ctx context.Context, jobID int64, host string) { ) } } - -func buildFilesSizesCache(ctx context.Context, client *scyllaclient.Client, host, dir string, versioned VersionedMap) (map[string]int64, error) { - filesSizesCache := make(map[string]int64) - opts := &scyllaclient.RcloneListDirOpts{ - FilesOnly: true, - } - f := func(item *scyllaclient.RcloneListDirItem) { - filesSizesCache[item.Name] = item.Size - } - if err := client.RcloneListDirIter(ctx, host, dir, opts, f); err != nil { - return nil, errors.Wrapf(err, "host %s: listing all files from %s", host, dir) - } - for k, v := range versioned { - filesSizesCache[k] = v.Size - } - return filesSizesCache, nil -} diff --git a/schema/v3.4.0.cql b/schema/v3.4.0.cql new file mode 100644 index 000000000..4fe13883c --- /dev/null +++ b/schema/v3.4.0.cql @@ -0,0 +1,4 @@ +ALTER TABLE restore_run DROP location; +ALTER TABLE restore_run DROP manifest_path; +ALTER TABLE restore_run DROP keyspace_name; +ALTER TABLE restore_run DROP table_name; \ No newline at end of file
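
For reviewers, a minimal, self-contained sketch of the naming convention that the versioning.go changes above rely on: an empty version now denotes the plain, newest file, and FullName adds no suffix for it. The isSnapshotTag regexp below is a simplified stand-in for backupspec.IsSnapshotTag and the sample file names are made up; only the split/round-trip logic mirrors the patched SplitNameAndVersion and VersionedSSTable.FullName.

package main

import (
	"fmt"
	"path"
	"regexp"
	"strings"
)

// Simplified stand-in for backupspec.IsSnapshotTag (sm_ + 14-digit timestamp + UTC),
// included only to keep the sketch self-contained.
var snapshotTagRe = regexp.MustCompile(`^sm_[0-9]{14}UTC$`)

func isSnapshotTag(s string) bool { return snapshotTagRe.MatchString(s) }

// splitNameAndVersion mirrors the patched SplitNameAndVersion: a file without
// a snapshot tag extension is the newest, not versioned copy (empty version).
func splitNameAndVersion(versioned string) (name, version string) {
	ext := path.Ext(versioned)
	if ext == "" || !isSnapshotTag(ext[1:]) {
		return versioned, ""
	}
	return strings.TrimSuffix(versioned, ext), ext[1:]
}

// fullName mirrors VersionedSSTable.FullName: an empty version adds no suffix.
func fullName(name, version string) string {
	if version == "" {
		return name
	}
	return name + "." + version
}

func main() {
	for _, f := range []string{
		"md-2-big-Data.db",                      // newest copy, no suffix
		"md-2-big-Data.db.sm_20230114183231UTC", // older, versioned copy
	} {
		name, version := splitNameAndVersion(f)
		fmt.Printf("%-40s -> name=%s version=%q roundtrip=%s\n", f, name, version, fullName(name, version))
	}
}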
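
And a rough sketch of the dispatch pattern that stageRestoreData now builds on: every host goroutine keeps asking the mutex-guarded dispatcher for work until nothing is left for it, after which ValidateAllDispatched can confirm completeness. The toyDispatcher and toyBatch types are invented for illustration and skip locations, tables, progress rows and rclone entirely; only the greedy, batch-size-limited take under a lock mirrors the real batchDispatcher.DispatchBatch/createBatch.

package main

import (
	"fmt"
	"sync"
)

type toyBatch struct {
	host string
	ids  []int
}

// toyDispatcher hands out at most batchSize IDs per call, like batchDispatcher.DispatchBatch.
type toyDispatcher struct {
	mu        sync.Mutex
	remaining []int
	batchSize int
}

func (d *toyDispatcher) dispatch(host string) (toyBatch, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(d.remaining) == 0 {
		return toyBatch{}, false
	}
	n := min(d.batchSize, len(d.remaining)) // same greedy slice as createBatch
	b := toyBatch{host: host, ids: d.remaining[:n]}
	d.remaining = d.remaining[n:]
	return b, true
}

// allDispatched plays the role of ValidateAllDispatched in this toy model.
func (d *toyDispatcher) allDispatched() bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	return len(d.remaining) == 0
}

func main() {
	d := &toyDispatcher{remaining: []int{1, 2, 3, 4, 5, 6, 7}, batchSize: 3}
	hosts := []string{"host-a", "host-b"}

	var wg sync.WaitGroup
	for _, h := range hosts {
		wg.Add(1)
		go func(host string) {
			defer wg.Done()
			// Each host loops until the dispatcher has no more work for it,
			// mirroring the per-host loop in stageRestoreData.
			for {
				b, ok := d.dispatch(host)
				if !ok {
					return
				}
				fmt.Printf("%s restores batch %v\n", host, b.ids)
			}
		}(h)
	}
	wg.Wait()
	fmt.Println("all dispatched:", d.allDispatched())
}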