Restore improvement: batch retry #4071

Merged
8 commits merged on Oct 22, 2024
195 changes: 136 additions & 59 deletions pkg/service/restore/batch.go
@@ -10,17 +10,58 @@ import (
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

// batchDispatcher is a tool for batching SSTables from
// Workload across different hosts during restore.
// It follows a few rules:
//
// - it dispatches batches from the RemoteDirWorkload with the biggest
// initial size first
//
// - it aims to optimize batch size according to batchSize param
//
// - it selects the biggest SSTables from RemoteDirWorkload first,
// so that each batch contains SSTables of similar size (improved shard utilization)
//
// - it supports batch retry - a failed batch can be retried by other
// hosts (see wait description for more information)
//
// - it supports host retry - a host that failed to restore a batch can still
// restore other batches (see hostToFailedDC description for more information).
type batchDispatcher struct {
mu sync.Mutex
// Guards all exported methods
mu sync.Mutex
// When there are no more batches to be restored,
// but some already dispatched batches are still
// being processed, idle hosts wait on the wait chan.
// They should wait because, if a currently processed
// batch fails to be restored, they can be woken up
// by the batchDispatcher and retry the returned
// batch on their own.
wait chan struct{}

remainingBytes int64
workload Workload
batchSize int
// Const workload defined during indexing
workload Workload
// Mutable workloadProgress updated as batches are dispatched
workloadProgress workloadProgress
// For batchSize X, batches contain X*node_shard_cnt SSTables.
// We always multiply batchSize by node_shard_cnt in order to
// utilize all shards more equally.
// For batchSize 0, batches contain N*node_shard_cnt SSTables
// of total size up to 5% of node expected workload
// (expectedShardWorkload*node_shard_cnt).
batchSize int
// Equals total_backup_size/($\sum_{node} shard_cnt(node)$)
expectedShardWorkload int64
hostShardCnt map[string]uint
locationHosts map[Location][]string
hostToFailedDC map[string][]string

// Stores host shard count
hostShardCnt map[string]uint
// Stores which hosts have access to which locations
locationHosts map[Location][]string
// Marks which host failed to restore batches from which DCs.
// When a host fails to restore a batch from one backed-up DC,
// it can still restore other batches coming from different
// DCs. This is a host retry mechanism aiming to help with #3871.
hostToFailedDC map[string][]string
}
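
To make the batch sizing rules above concrete, here is a small, self-contained arithmetic sketch. The cluster numbers (3 nodes, 8 shards per node, a 1.2 TB backup) are hypothetical and only illustrate how expectedShardWorkload and the 5% cap relate to each other; they are not taken from this PR.

package main

import "fmt"

func main() {
	const (
		totalBackupSize = int64(1_200_000_000_000) // hypothetical 1.2 TB backup
		nodes           = 3
		shardsPerNode   = 8
	)
	// expectedShardWorkload = total_backup_size / sum of all shard counts.
	expectedShardWorkload := totalBackupSize / (nodes * shardsPerNode) // 50 GB
	// With a positive batchSize (e.g. 2), a batch for this node simply holds
	// batchSize*shardsPerNode = 16 of the biggest remaining SSTables.
	// In the unlimited (maxBatchSize) mode, the batch instead grows in steps
	// of shardsPerNode SSTables until it exceeds 5% of the node's expected
	// workload (expectedShardWorkload * shardsPerNode).
	expectedNodeWorkload := expectedShardWorkload * shardsPerNode // 400 GB
	sizeLimit := expectedNodeWorkload / 20                        // 20 GB
	fmt.Println(expectedShardWorkload, expectedNodeWorkload, sizeLimit)
}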

func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
@@ -35,8 +76,8 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin
return &batchDispatcher{
mu: sync.Mutex{},
wait: make(chan struct{}),
remainingBytes: workload.TotalSize,
workload: workload,
workloadProgress: newWorkloadProgress(workload),
batchSize: batchSize,
expectedShardWorkload: workload.TotalSize / int64(shards),
hostShardCnt: hostShardCnt,
@@ -45,6 +86,44 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin
}
}

// Describes current state of SSTables that are yet to be batched.
type workloadProgress struct {
// Bytes that are yet to be restored.
// They are decreased after a successful batch restoration.
bytesToBeRestored int64
// SSTables grouped by RemoteSSTableDir that are yet to
// be batched. They are removed on batch dispatch, but can
// be re-added when a batch fails to be restored.
// workloadProgress.remoteDir and Workload.RemoteDir have
// corresponding indexes.
remoteDir []remoteSSTableDirProgress
}

// Describes current state of SSTables from given RemoteSSTableDir
// that are yet to be batched.
type remoteSSTableDirProgress struct {
RemainingSize int64
RemainingSSTables []RemoteSSTable
}

func newWorkloadProgress(workload Workload) workloadProgress {
p := make([]remoteSSTableDirProgress, len(workload.RemoteDir))
for i := range workload.RemoteDir {
p[i] = remoteSSTableDirProgress{
RemainingSize: workload.RemoteDir[i].Size,
RemainingSSTables: workload.RemoteDir[i].SSTables,
}
}
return workloadProgress{
bytesToBeRestored: workload.TotalSize,
remoteDir: p,
}
}

func (wp workloadProgress) done() bool {
return wp.bytesToBeRestored == 0
}

type batch struct {
TableName
*ManifestInfo
@@ -92,32 +171,34 @@ func (b batch) IDs() []string {
return ids
}

// ValidateAllDispatched returns error if not all sstables were dispatched.
// ValidateAllDispatched returns error if not all SSTables were dispatched.
func (bd *batchDispatcher) ValidateAllDispatched() error {
bd.mu.Lock()
defer bd.mu.Unlock()

for _, rdw := range bd.workload.RemoteDir {
if rdw.Size != 0 || len(rdw.SSTables) != 0 {
for i, rdp := range bd.workloadProgress.remoteDir {
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 {
rdw := bd.workload.RemoteDir[i]
return errors.Errorf("expected all data to be restored, missing sstables from location %s table %s.%s: %v (%d bytes)",
rdw.Location, rdw.Keyspace, rdw.Table, rdw.SSTables, rdw.Size)
}
}
if !bd.done() {
if !bd.workloadProgress.done() {
return errors.Errorf("expected all data to be restored, internal progress calculation error")
}
return nil
}

// DispatchBatch returns batch restored or false when there is no more work to do.
// DispatchBatch returns batch to be restored or false when there is no more work to do.
// This method might hang and wait for sstables that might come from batches that
// failed to be restored. Because of that, it's important to call ReportSuccess
// or ReportFailure after each dispatched batch was attempted to be restored.
// failed to be restored (see batchDispatcher.wait description for more information).
// Because of that, it's important to call ReportSuccess or ReportFailure after
// each dispatched batch was attempted to be restored.
func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) {
for {
bd.mu.Lock()

if bd.done() {
if bd.workloadProgress.done() {
bd.mu.Unlock()
return batch{}, false
}
@@ -133,41 +214,43 @@ func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) {
}
}
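
The contract described in the DispatchBatch comment (dispatch a batch, attempt to restore it, then always report the outcome) implies a per-host worker loop roughly like the sketch below. This is an illustrative, in-package sketch rather than code from this PR; restoreBatch is a hypothetical callback standing in for the actual restore logic.

// Hypothetical per-host worker loop (sketch only, not part of this PR).
func runHost(bd *batchDispatcher, host string, restoreBatch func(batch) error) error {
	for {
		b, ok := bd.DispatchBatch(host)
		if !ok {
			// No more work: everything was restored, or this host
			// cannot help with what remains.
			return nil
		}
		if err := restoreBatch(b); err != nil {
			// Return the batch to the pool and mark its DC as failed
			// for this host; the host itself keeps asking for work.
			if rerr := bd.ReportFailure(host, b); rerr != nil {
				return rerr
			}
			continue
		}
		bd.ReportSuccess(b)
	}
}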

func (bd *batchDispatcher) done() bool {
return bd.remainingBytes == 0
func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) {
dirIdx := bd.chooseDir(host)
if dirIdx < 0 {
return batch{}, false
}
return bd.createBatch(dirIdx, host)
}

func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) {
var rdw *RemoteDirWorkload
for i, w := range bd.workload.RemoteDir {
func (bd *batchDispatcher) chooseDir(host string) int {
dirIdx := -1
for i := range bd.workloadProgress.remoteDir {
rdw := bd.workload.RemoteDir[i]
// Skip empty dir
if w.Size == 0 {
if bd.workloadProgress.remoteDir[i].RemainingSize == 0 {
continue
}
// Skip dir from already failed dc
if slices.Contains(bd.hostToFailedDC[host], w.DC) {
if slices.Contains(bd.hostToFailedDC[host], rdw.DC) {
continue
}
// Skip dir from location without access
if !slices.Contains(bd.locationHosts[w.Location], host) {
if !slices.Contains(bd.locationHosts[rdw.Location], host) {
continue
}
rdw = &bd.workload.RemoteDir[i]
dirIdx = i
break
}
if rdw == nil {
return batch{}, false
}
return bd.createBatch(rdw, host)
return dirIdx
}

// Returns batch and updates RemoteDirWorkload and its parents.
func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (batch, bool) {
// Returns batch from given RemoteSSTableDir and updates workloadProgress.
func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
rdp := &bd.workloadProgress.remoteDir[dirIdx]
shardCnt := bd.hostShardCnt[host]
if shardCnt == 0 {
shardCnt = 1
}

var i int
var size int64
if bd.batchSize == maxBatchSize {
@@ -177,13 +260,13 @@ func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (bat
sizeLimit := expectedNodeWorkload / 20
for {
for j := 0; j < int(shardCnt); j++ {
if i >= len(rdw.SSTables) {
if i >= len(rdp.RemainingSSTables) {
break
}
size += rdw.SSTables[i].Size
size += rdp.RemainingSSTables[i].Size
i++
}
if i >= len(rdw.SSTables) {
if i >= len(rdp.RemainingSSTables) {
break
}
if size > sizeLimit {
@@ -192,9 +275,9 @@ func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (bat
}
} else {
// Create batch containing node_shard_count*batch_size sstables.
i = min(bd.batchSize*int(shardCnt), len(rdw.SSTables))
i = min(bd.batchSize*int(shardCnt), len(rdp.RemainingSSTables))
for j := 0; j < i; j++ {
size += rdw.SSTables[j].Size
size += rdp.RemainingSSTables[j].Size
}
}

@@ -203,19 +286,17 @@ func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (bat
}
// Extend the batch if it would otherwise leave less than
// 1 sstable per shard for the next one.
if len(rdw.SSTables)-i < int(shardCnt) {
for ; i < len(rdw.SSTables); i++ {
size += rdw.SSTables[i].Size
if len(rdp.RemainingSSTables)-i < int(shardCnt) {
for ; i < len(rdp.RemainingSSTables); i++ {
size += rdp.RemainingSSTables[i].Size
}
}

sstables := rdw.SSTables[:i]
rdw.SSTables = rdw.SSTables[i:]
sstables := rdp.RemainingSSTables[:i]
rdp.RemainingSSTables = rdp.RemainingSSTables[i:]
rdw := bd.workload.RemoteDir[dirIdx]

rdw.Size -= size
bd.workload.TableSize[rdw.TableName] -= size
bd.workload.LocationSize[rdw.Location] -= size
bd.workload.TotalSize -= size
rdp.RemainingSize -= size
return batch{
TableName: rdw.TableName,
ManifestInfo: rdw.ManifestInfo,
@@ -230,8 +311,8 @@ func (bd *batchDispatcher) ReportSuccess(b batch) {
bd.mu.Lock()
defer bd.mu.Unlock()

bd.remainingBytes -= b.Size
if bd.done() {
bd.workloadProgress.bytesToBeRestored -= b.Size
if bd.workloadProgress.done() {
bd.wakeUpWaiting()
}
}
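
Taken together with ReportFailure below, this enables the batch retry described at the top of the file: a batch that one host fails to restore can be handed to another host with access to the same location, while the failing host is only excluded from that batch's DC. A hypothetical two-host hand-off, assuming hostB has access to the same location and no failure recorded for that DC, could look like this sketch (not part of this PR):

// Hypothetical retry hand-off between two hosts (sketch only).
func exampleRetryHandOff(bd *batchDispatcher) {
	b, ok := bd.DispatchBatch("hostA")
	if !ok {
		return
	}
	// Suppose hostA fails to restore b: its SSTables go back to the front
	// of their dir, and hostA is excluded from b's DC from now on.
	_ = bd.ReportFailure("hostA", b)
	// hostB can now be dispatched a batch containing the returned SSTables.
	if b2, ok := bd.DispatchBatch("hostB"); ok {
		bd.ReportSuccess(b2)
	}
}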
@@ -244,24 +325,20 @@ func (bd *batchDispatcher) ReportFailure(host string, b batch) error {
// Mark failed DC for host
bd.hostToFailedDC[host] = append(bd.hostToFailedDC[host], b.DC)

var rdw *RemoteDirWorkload
dirIdx := -1
for i := range bd.workload.RemoteDir {
if bd.workload.RemoteDir[i].RemoteSSTableDir == b.RemoteSSTableDir {
rdw = &bd.workload.RemoteDir[i]
dirIdx = i
break
}
}
if rdw == nil {
if dirIdx < 0 {
return errors.Errorf("unknown remote sstable dir %s", b.RemoteSSTableDir)
}

var newSST []RemoteSSTable
newSST = append(newSST, b.SSTables...)
newSST = append(newSST, rdw.SSTables...)

rdw.SSTables = newSST
rdw.Size += b.Size
bd.workload.TableSize[b.TableName] += b.Size
bd.workload.LocationSize[b.Location] += b.Size
rdp := &bd.workloadProgress.remoteDir[dirIdx]
rdp.RemainingSSTables = append(b.SSTables, rdp.RemainingSSTables...)
rdp.RemainingSize += b.Size

bd.wakeUpWaiting()
return nil