diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 4ce4fb195..39a7d4a28 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -4,6 +4,7 @@ package restore import ( "reflect" + "slices" "sort" "time" @@ -111,6 +112,14 @@ func (u *Unit) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error return gocql.Unmarshal(info, data, f.Addr().Interface()) } +func unitsContainTable(units []Unit, ks, tab string) bool { + idx := slices.IndexFunc(units, func(u Unit) bool { return u.Keyspace == ks }) + if idx < 0 { + return false + } + return slices.ContainsFunc(units[idx].Tables, func(t Table) bool { return t.Table == tab }) +} + // Table represents restored table, its size and original tombstone_gc mode. type Table struct { Table string `json:"table" db:"table_name"` diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index 831578f40..bf1048f7f 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -191,6 +191,10 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.run.Location = location.String() tableDownloadHandler := func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + w.logger.Info(ctx, "Downloading schema table", "keyspace", fm.Keyspace, "table", fm.Table) defer w.logger.Info(ctx, "Downloading schema table finished", "keyspace", fm.Keyspace, "table", fm.Table) @@ -208,7 +212,7 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.run.ManifestPath = miwc.Path() w.insertRun(ctx) - return miwc.ForEachIndexIterWithError(w.target.Keyspace, tableDownloadHandler) + return miwc.ForEachIndexIterWithError(nil, tableDownloadHandler) } return w.forEachManifest(ctx, location, manifestDownloadHandler) diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 25bd2c07b..774b581de 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -182,7 +182,7 @@ func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) e w.logger.Info(ctx, "Restoring manifest", "manifest", miwc.ManifestInfo) defer w.logger.Info(ctx, "Restoring manifest finished", "manifest", miwc.ManifestInfo) - return miwc.ForEachIndexIterWithError(w.target.Keyspace, w.restoreDir(ctx, miwc)) + return miwc.ForEachIndexIterWithError(nil, w.restoreDir(ctx, miwc)) } return w.forEachManifest(ctx, location, restoreManifest) @@ -190,6 +190,10 @@ func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) e func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithContent) func(fm FilesMeta) error { return func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + if !w.alreadyResumed { if w.run.Keyspace != fm.Keyspace || w.run.Table != fm.Table { w.logger.Info(ctx, "Skipping table", "keyspace", fm.Keyspace, "table", fm.Table) @@ -279,8 +283,12 @@ func (w *tablesWorker) initRestoreMetrics(ctx context.Context) { func(miwc ManifestInfoWithContent) error { sizePerTableAndKeyspace := make(map[string]map[string]int64) err := miwc.ForEachIndexIterWithError( - w.target.Keyspace, + nil, func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + if sizePerTableAndKeyspace[fm.Keyspace] == nil { sizePerTableAndKeyspace[fm.Keyspace] = make(map[string]int64) }