Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore improvement: index #4044

Merged
merged 10 commits into from
Oct 3, 2024
Prev Previous commit
feat(restore): index, log workload info
Workload info contains location/table/remote sstable dir sstable count,
total size, max and average sstable size.
Michal-Leszczynski committed Oct 3, 2024
commit 503bd5c5c57dd99701fae3e71aa90ba0f47d5f31
69 changes: 64 additions & 5 deletions pkg/service/restore/index.go
Original file line number Diff line number Diff line change
@@ -74,12 +74,14 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat
return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads")
}
if w.target.Continue {
rawWorkload, err = w.filterPreviouslyRestoredSStables(rawWorkload)
rawWorkload, err = w.filterPreviouslyRestoredSStables(ctx, rawWorkload)
if err != nil {
return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables")
}
}
return aggregateLocationWorkload(rawWorkload), nil
workload := aggregateLocationWorkload(rawWorkload)
w.logWorkloadInfo(ctx, workload)
return workload, nil
}

func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) {
@@ -115,7 +117,9 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo
Size: size,
SSTables: remoteSSTables,
}
rawWorkload = append(rawWorkload, workload)
if size > 0 {
rawWorkload = append(rawWorkload, workload)
}
return nil
})
})
@@ -125,7 +129,9 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo
return rawWorkload, nil
}

func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) {
func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) {
w.logger.Info(ctx, "Filter out previously restored sstables")

remoteSSTableDirToRestoredIDs := make(map[string][]string)
err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) {
if validateTimeIsSet(pr.RestoreCompletedAt) {
@@ -139,14 +145,21 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW
return rawWorkload, nil
}

var filtered []RemoteDirWorkload
var (
filtered []RemoteDirWorkload
skippedCount int
skippedSize int64
)
for _, rw := range rawWorkload {
var filteredSSTables []RemoteSSTable
var size int64
for _, sst := range rw.SSTables {
if !slices.Contains(remoteSSTableDirToRestoredIDs[rw.RemoteSSTableDir], sst.ID) {
filteredSSTables = append(filteredSSTables, sst)
size += sst.Size
} else {
skippedCount++
skippedSize += sst.Size
}
}
if len(filteredSSTables) > 0 {
@@ -157,9 +170,12 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(rawWorkload []RemoteDirW
Size: size,
SSTables: filteredSSTables,
})
} else {
w.logger.Info(ctx, "Completely filtered out remote sstable dir", "remote dir", rw.RemoteSSTableDir)
}
}

w.logger.Info(ctx, "Filtered out sstables info", "count", skippedCount, "size", skippedSize)
return filtered, nil
}

@@ -200,6 +216,49 @@ func (w *tablesWorker) initMetrics(workload []LocationWorkload) {
}, float64(totalSize-workloadSize)/float64(totalSize)*100)
}

func (w *tablesWorker) logWorkloadInfo(ctx context.Context, workload LocationWorkload) {
if workload.Size == 0 {
return
}
var locMax, locCnt int64
for _, twl := range workload.Tables {
if twl.Size == 0 {
continue
}
var tabMax, tabCnt int64
for _, rdwl := range twl.RemoteDirs {
if rdwl.Size == 0 {
continue
}
var dirMax int64
for _, sst := range rdwl.SSTables {
dirMax = max(dirMax, sst.Size)
}
dirCnt := int64(len(rdwl.SSTables))
w.logger.Info(ctx, "Remote sstable dir workload info",
"path", rdwl.RemoteSSTableDir,
"max size", dirMax,
"average size", rdwl.Size/dirCnt,
"count", dirCnt)
tabCnt += dirCnt
tabMax = max(tabMax, dirMax)
}
w.logger.Info(ctx, "Table workload info",
"keyspace", twl.Keyspace,
"table", twl.Table,
"max size", tabMax,
"average size", twl.Size/tabCnt,
"count", tabCnt)
locCnt += tabCnt
locMax = max(locMax, tabMax)
}
w.logger.Info(ctx, "Location workload info",
"location", workload.Location.String(),
"max size", locMax,
"average size", workload.Size/locCnt,
"count", locCnt)
}

func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload {
remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload)
for _, rw := range rawWorkload {