Restore improvement: batch retry #4071

Merged
merged 8 commits into from
Oct 22, 2024
243 changes: 141 additions & 102 deletions pkg/service/restore/batch.go
@@ -11,20 +11,20 @@ import (
)

type batchDispatcher struct {
mu sync.Mutex
workload []LocationWorkload
mu sync.Mutex
wait chan struct{}

remainingBytes int64
workload Workload
batchSize int
expectedShardWorkload int64
hostShardCnt map[string]uint
locationHosts map[Location][]string
hostToFailedDC map[string][]string
}

func newBatchDispatcher(workload []LocationWorkload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
sortWorkloadBySizeDesc(workload)
var size int64
for _, t := range workload {
size += t.Size
}
func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
sortWorkload(workload)
var shards uint
for _, sh := range hostShardCnt {
shards += sh
@@ -34,11 +34,14 @@ func newBatchDispatcher(workload []LocationWorkload, batchSize int, hostShardCnt
}
return &batchDispatcher{
mu: sync.Mutex{},
wait: make(chan struct{}),
remainingBytes: workload.TotalSize,
workload: workload,
batchSize: batchSize,
expectedShardWorkload: size / int64(shards),
expectedShardWorkload: workload.TotalSize / int64(shards),
hostShardCnt: hostShardCnt,
locationHosts: locationHosts,
hostToFailedDC: make(map[string][]string),
}
}

@@ -90,106 +93,97 @@ func (b batch) IDs() []string {
}

// ValidateAllDispatched returns an error if not all sstables were dispatched.
func (b *batchDispatcher) ValidateAllDispatched() error {
for _, lw := range b.workload {
if lw.Size != 0 {
for _, tw := range lw.Tables {
if tw.Size != 0 {
for _, dw := range tw.RemoteDirs {
if dw.Size != 0 || len(dw.SSTables) != 0 {
return errors.Errorf("expected all data to be restored, missing sstable ids from location %s table %s.%s: %v (%d bytes)",
dw.Location, dw.Keyspace, dw.Table, dw.SSTables, dw.Size)
}
}
return errors.Errorf("expected all data to be restored, missinng table from location %s: %s.%s (%d bytes)",
tw.Location, tw.Keyspace, tw.Table, tw.Size)
}
}
return errors.Errorf("expected all data to be restored, missinng location: %s (%d bytes)",
lw.Location, lw.Size)
func (bd *batchDispatcher) ValidateAllDispatched() error {
bd.mu.Lock()
defer bd.mu.Unlock()

for _, rdw := range bd.workload.RemoteDir {
if rdw.Size != 0 || len(rdw.SSTables) != 0 {
return errors.Errorf("expected all data to be restored, missing sstables from location %s table %s.%s: %v (%d bytes)",
rdw.Location, rdw.Keyspace, rdw.Table, rdw.SSTables, rdw.Size)
}
}
if !bd.done() {
return errors.Errorf("expected all data to be restored, internal progress calculation error")
}
return nil
}

// DispatchBatch batch to be restored or false when there is no more work to do.
func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) {
b.mu.Lock()
defer b.mu.Unlock()
// DispatchBatch returns the next batch to be restored, or false when there is no more work to do.
// This method might block and wait for sstables coming from batches that
// failed to be restored. Because of that, it's important to call ReportSuccess
// or ReportFailure after every attempt to restore a dispatched batch.
func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) {
for {
bd.mu.Lock()

l := b.chooseLocation(host)
if l == nil {
return batch{}, false
}
t := b.chooseTable(l)
if t == nil {
return batch{}, false
}
dir := b.chooseRemoteDir(t)
if dir == nil {
return batch{}, false
}
return b.createBatch(l, t, dir, host)
}

// Returns location for which batch should be created.
func (b *batchDispatcher) chooseLocation(host string) *LocationWorkload {
for i := range b.workload {
if b.workload[i].Size == 0 {
continue
if bd.done() {
bd.mu.Unlock()
return batch{}, false
}
if slices.Contains(b.locationHosts[b.workload[i].Location], host) {
return &b.workload[i]
b, ok := bd.dispatchBatch(host)
wait := bd.wait

bd.mu.Unlock()

if ok {
return b, true
}
<-wait
}
return nil
}
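// Illustrative usage sketch (not part of this diff): every DispatchBatch call should be
// followed by ReportSuccess or ReportFailure, otherwise workers blocked on the wait
// channel are never woken up. restoreBatch below is a hypothetical helper standing in
// for the actual restore of a batch on the given host.
//
//	func worker(bd *batchDispatcher, host string) error {
//		for {
//			b, ok := bd.DispatchBatch(host)
//			if !ok {
//				return nil // all data has been restored
//			}
//			if err := restoreBatch(host, b); err != nil { // restoreBatch: hypothetical helper
//				if rerr := bd.ReportFailure(host, b); rerr != nil {
//					return rerr
//				}
//				continue // failed sstables are returned to the pool for other hosts
//			}
//			bd.ReportSuccess(b)
//		}
//	}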

// Returns table for which batch should be created.
func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload {
for i := range location.Tables {
if location.Tables[i].Size == 0 {
continue
}
return &location.Tables[i]
}
return nil
func (bd *batchDispatcher) done() bool {
return bd.remainingBytes == 0
}

// Return remote dir for which batch should be created.
func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload {
for i := range table.RemoteDirs {
if table.RemoteDirs[i].Size == 0 {
func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) {
var rdw *RemoteDirWorkload
for i, w := range bd.workload.RemoteDir {
// Skip empty dir
if w.Size == 0 {
continue
}
// Skip dir from already failed dc
if slices.Contains(bd.hostToFailedDC[host], w.DC) {
continue
}
return &table.RemoteDirs[i]
// Skip dir from location without access
if !slices.Contains(bd.locationHosts[w.Location], host) {
continue
}
rdw = &bd.workload.RemoteDir[i]
break
}
return nil
if rdw == nil {
return batch{}, false
}
return bd.createBatch(rdw, host)
}

// Returns batch and updates RemoteDirWorkload and its parents.
func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload, host string) (batch, bool) {
shardCnt := b.hostShardCnt[host]
func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (batch, bool) {
shardCnt := bd.hostShardCnt[host]
if shardCnt == 0 {
shardCnt = 1
}

var i int
var size int64
if b.batchSize == maxBatchSize {
if bd.batchSize == maxBatchSize {
// Create batch containing multiple of node shard count sstables
// and size up to 5% of expected node workload.
expectedNodeWorkload := b.expectedShardWorkload * int64(shardCnt)
expectedNodeWorkload := bd.expectedShardWorkload * int64(shardCnt)
sizeLimit := expectedNodeWorkload / 20
for {
for j := 0; j < int(shardCnt); j++ {
if i >= len(dir.SSTables) {
if i >= len(rdw.SSTables) {
break
}
size += dir.SSTables[i].Size
size += rdw.SSTables[i].Size
i++
}
if i >= len(dir.SSTables) {
if i >= len(rdw.SSTables) {
break
}
if size > sizeLimit {
@@ -198,9 +192,9 @@ }
}
} else {
// Create batch containing node_shard_count*batch_size sstables.
i = min(b.batchSize*int(shardCnt), len(dir.SSTables))
i = min(bd.batchSize*int(shardCnt), len(rdw.SSTables))
for j := 0; j < i; j++ {
size += dir.SSTables[j].Size
size += rdw.SSTables[j].Size
}
}

@@ -209,44 +203,89 @@ }
}
// Extend batch if it would leave less than
// 1 sstable per shard for the next one.
if len(dir.SSTables)-i < int(shardCnt) {
for ; i < len(dir.SSTables); i++ {
size += dir.SSTables[i].Size
if len(rdw.SSTables)-i < int(shardCnt) {
for ; i < len(rdw.SSTables); i++ {
size += rdw.SSTables[i].Size
}
}

sstables := dir.SSTables[:i]
dir.SSTables = dir.SSTables[i:]
sstables := rdw.SSTables[:i]
rdw.SSTables = rdw.SSTables[i:]

dir.Size -= size
t.Size -= size
l.Size -= size
rdw.Size -= size
bd.workload.TableSize[rdw.TableName] -= size
bd.workload.LocationSize[rdw.Location] -= size
bd.workload.TotalSize -= size
return batch{
TableName: dir.TableName,
ManifestInfo: dir.ManifestInfo,
RemoteSSTableDir: dir.RemoteSSTableDir,
TableName: rdw.TableName,
ManifestInfo: rdw.ManifestInfo,
RemoteSSTableDir: rdw.RemoteSSTableDir,
Size: size,
SSTables: sstables,
}, true
}
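// Worked example for the maxBatchSize path above (illustrative numbers, not taken
// from this change): a node's batch size limit is 5% of its expected share of the
// total restore workload.
//
//	totalSize := int64(1000 << 30)                    // 1000 GiB to restore in total
//	shards := int64(40)                               // shard count summed over all hosts
//	expectedShardWorkload := totalSize / shards       // 25 GiB per shard
//	expectedNodeWorkload := expectedShardWorkload * 8 // host with 8 shards -> 200 GiB
//	sizeLimit := expectedNodeWorkload / 20            // 10 GiB, i.e. 5% of node workload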

func sortWorkloadBySizeDesc(workload []LocationWorkload) {
slices.SortFunc(workload, func(a, b LocationWorkload) int {
// ReportSuccess notifies batchDispatcher that given batch was restored successfully.
func (bd *batchDispatcher) ReportSuccess(b batch) {
bd.mu.Lock()
defer bd.mu.Unlock()

bd.remainingBytes -= b.Size
if bd.done() {
bd.wakeUpWaiting()
}
}

// ReportFailure notifies batchDispatcher that given batch failed to be restored.
func (bd *batchDispatcher) ReportFailure(host string, b batch) error {
bd.mu.Lock()
defer bd.mu.Unlock()

// Mark failed DC for host
bd.hostToFailedDC[host] = append(bd.hostToFailedDC[host], b.DC)

var rdw *RemoteDirWorkload
for i := range bd.workload.RemoteDir {
if bd.workload.RemoteDir[i].RemoteSSTableDir == b.RemoteSSTableDir {
rdw = &bd.workload.RemoteDir[i]
}
}
if rdw == nil {
return errors.Errorf("unknown remote sstable dir %s", b.RemoteSSTableDir)
}

var newSST []RemoteSSTable
newSST = append(newSST, b.SSTables...)
newSST = append(newSST, rdw.SSTables...)

rdw.SSTables = newSST
rdw.Size += b.Size
bd.workload.TableSize[b.TableName] += b.Size
bd.workload.LocationSize[b.Location] += b.Size

bd.wakeUpWaiting()
return nil
}

func (bd *batchDispatcher) wakeUpWaiting() {
close(bd.wait)
bd.wait = make(chan struct{})
}
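// A note on the wait channel used by wakeUpWaiting (illustrative): closing a channel
// releases every goroutine blocked receiving from it, so close acts as a broadcast,
// and replacing bd.wait with a fresh channel arms the next round of waiting.
// Waiters snapshot the channel under the lock before blocking, as DispatchBatch does:
//
//	bd.mu.Lock()
//	wait := bd.wait // snapshot the current round's channel
//	bd.mu.Unlock()
//	<-wait          // released by the next close in wakeUpWaiting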

func sortWorkload(workload Workload) {
// Order remote sstable dirs by table size, then by their size (decreasing).
slices.SortFunc(workload.RemoteDir, func(a, b RemoteDirWorkload) int {
ats := workload.TableSize[a.TableName]
bts := workload.TableSize[b.TableName]
if ats != bts {
return int(bts - ats)
}
return int(b.Size - a.Size)
})
for _, loc := range workload {
slices.SortFunc(loc.Tables, func(a, b TableWorkload) int {
// Order sstables by their size (decreasing)
for _, rdw := range workload.RemoteDir {
slices.SortFunc(rdw.SSTables, func(a, b RemoteSSTable) int {
return int(b.Size - a.Size)
})
for _, tab := range loc.Tables {
slices.SortFunc(tab.RemoteDirs, func(a, b RemoteDirWorkload) int {
return int(b.Size - a.Size)
})
for _, dir := range tab.RemoteDirs {
slices.SortFunc(dir.SSTables, func(a, b RemoteSSTable) int {
return int(b.Size - a.Size)
})
}
}
}
}
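// Illustrative ordering example for sortWorkload (hypothetical sizes): given table A
// with TableSize 300 and table B with TableSize 100, all of A's remote dirs come
// before B's regardless of their individual dir sizes; within A, a 200-byte dir
// precedes a 100-byte one, and sstables inside each dir are likewise sorted from
// largest to smallest, so the biggest remaining work is dispatched first.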