fix(restore): don't make hosts hang during batching when they are done
Consider a scenario with parallel=1 and a multi-dc, multi-location backup,
where each backed up DC is stored in its own location and each host has
access to only one of those locations. Note that SM is using 'parallel.Run'
for restoring in parallel, so with parallel=1 only a single host asks the
batch dispatcher for batches at any given time.

Previously, a host that had already restored everything it had access to
would block on the dispatcher's wait channel: batching was marked as done
only when all of the data was restored, or when all hosts were waiting at
once, and with parallel=1 neither condition could be met, so the restore
hung. Now the dispatcher tracks the remaining bytes per backed up DC
together with the DCs each host has access to, so a host returns as soon
as there is nothing left that it could restore.
Michal-Leszczynski committed Oct 21, 2024
1 parent 43491e9 commit 1d3db02
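To illustrate the scenario: a minimal, self-contained Go sketch (not SM code; host names, sizes and the 2-second timer are invented stand-ins) of how the old waiting logic could leave a lone host stuck once it had restored everything it had access to:

package main

import (
	"fmt"
	"time"
)

func main() {
	const hostCnt = 2               // hosts known to the dispatcher
	bytesToBeRestored := int64(100) // dc2 data remains, but only the other host can reach its location
	waitCnt := 0                    // hosts currently waiting on the wait channel
	wait := make(chan struct{})     // signalled only on full restore or on returned SSTables

	dispatch := func(host string) {
		if bytesToBeRestored == 0 {
			return // batching finished
		}
		// No batch is available for this host (its DC is already restored),
		// and waitCnt+1 == hostCnt never holds with parallel=1 because the
		// other host is not running yet.
		if waitCnt+1 == hostCnt {
			return
		}
		waitCnt++
		select {
		case <-wait: // nobody will ever signal this
		case <-time.After(2 * time.Second): // stand-in for "hangs forever"
			fmt.Println(host, "is stuck waiting")
		}
		waitCnt--
	}

	dispatch("host-from-dc1")
}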
Showing 3 changed files with 145 additions and 57 deletions.
8 changes: 7 additions & 1 deletion pkg/service/backup/backupspec/location.go
@@ -102,13 +102,19 @@ func NewLocation(location string) (l Location, err error) {
}

func (l Location) String() string {
p := l.Provider.String() + ":" + l.Path
p := l.StringWithoutDC()
if l.DC != "" {
p = l.DC + ":" + p
}
return p
}

// StringWithoutDC returns Location string representation
// that lacks DC information.
func (l Location) StringWithoutDC() string {
return l.Provider.String() + ":" + l.Path
}

// Datacenter returns location's datacenter.
func (l Location) Datacenter() string {
return l.DC
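A quick usage sketch of the two string forms, using a stub Location type with plain string fields (the real backupspec type has a Provider type with its own String method):

package main

import "fmt"

// Stub standing in for backupspec.Location, just to show the intended outputs.
type Location struct {
	DC, Provider, Path string
}

func (l Location) StringWithoutDC() string { return l.Provider + ":" + l.Path }

func (l Location) String() string {
	p := l.StringWithoutDC()
	if l.DC != "" {
		p = l.DC + ":" + p
	}
	return p
}

func main() {
	l := Location{DC: "dc1", Provider: "s3", Path: "my-bucket"}
	fmt.Println(l.String())          // dc1:s3:my-bucket
	fmt.Println(l.StringWithoutDC()) // s3:my-bucket
}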
119 changes: 63 additions & 56 deletions pkg/service/restore/batch.go
@@ -27,7 +27,7 @@ import (
// hosts (see wait description for more information)
//
// - it supports host retry - host that failed to restore batch can still
// restore other batches (see hostToFailedDC description for more information).
// restore other batches (see hostFailedDC description for more information).
type batchDispatcher struct {
// Guards all exported methods
mu sync.Mutex
@@ -39,12 +39,6 @@ type batchDispatcher struct {
// by batchDispatcher, and re-try to restore returned
// batch on their own.
wait chan struct{}
// How many hosts are waiting on wait
waitCnt int
// Is batching over either because all the data
// has been restored, or because it can't be restored
// due to re-tried failures.
done bool

// Const workload defined during indexing
workload Workload
@@ -63,13 +57,6 @@ type batchDispatcher struct {
hostCnt int
// Stores host shard count
hostShardCnt map[string]uint
// Stores which hosts have access to which locations
locationHosts map[Location][]string
// Marks which host failed to restore batches from which DCs.
// When host failed to restore a batch from one backed up DC,
// it can still restore other batches coming from different
// DCs. This is a host re-try mechanism aiming to help with #3871.
hostToFailedDC map[string][]string
}

func newBatchDispatcher(workload Workload, batchSize, hostCnt int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
@@ -85,21 +72,28 @@ func newBatchDispatcher(workload Workload, batchSize, hostCnt int, hostShardCnt
mu: sync.Mutex{},
wait: make(chan struct{}),
workload: workload,
workloadProgress: newWorkloadProgress(workload),
workloadProgress: newWorkloadProgress(workload, locationHosts),
batchSize: batchSize,
expectedShardWorkload: workload.TotalSize / int64(shards),
hostCnt: hostCnt,
hostShardCnt: hostShardCnt,
locationHosts: locationHosts,
hostToFailedDC: make(map[string][]string),
}
}

// Describes current state of SSTables that are yet to be batched.
type workloadProgress struct {
// Bytes that are yet to be restored.
// Bytes that are yet to be restored from given backed up DC.
// They are decreased after a successful batch restoration.
bytesToBeRestored int64
dcBytesToBeRestored map[string]int64
// Marks which host failed to restore batches from which DCs.
// When host failed to restore a batch from one backed up DC,
// it can still restore other batches coming from different
// DCs. This is a host re-try mechanism aiming to help with #3871.
hostFailedDC map[string][]string
// Stores which hosts have access to restore which DCs.
// It assumes that the whole DC is backed up to a single
// backup location.
hostDCAccess map[string][]string
// SSTables grouped by RemoteSSTableDir that are yet to
// be batched. They are removed on batch dispatch, but can
// be re-added when batch failed to be restored.
@@ -115,18 +109,44 @@ type remoteSSTableDirProgress struct {
RemainingSSTables []RemoteSSTable
}

func newWorkloadProgress(workload Workload) workloadProgress {
func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress {
dcBytes := make(map[string]int64)
locationDC := make(map[string][]string)
p := make([]remoteSSTableDirProgress, len(workload.RemoteDir))
for i := range workload.RemoteDir {
for i, rdw := range workload.RemoteDir {
dcBytes[rdw.DC] += rdw.Size
locationDC[rdw.Location.StringWithoutDC()] = append(locationDC[rdw.Location.StringWithoutDC()], rdw.DC)
p[i] = remoteSSTableDirProgress{
RemainingSize: workload.RemoteDir[i].Size,
RemainingSSTables: workload.RemoteDir[i].SSTables,
RemainingSize: rdw.Size,
RemainingSSTables: rdw.SSTables,
}
}
hostDCAccess := make(map[string][]string)
for loc, hosts := range locationHosts {
for _, h := range hosts {
hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...)
}
}
return workloadProgress{
bytesToBeRestored: workload.TotalSize,
remoteDir: p,
dcBytesToBeRestored: dcBytes,
hostFailedDC: make(map[string][]string),
hostDCAccess: hostDCAccess,
remoteDir: p,
}
}
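A worked example of the hostDCAccess construction above, with assumed data that mirrors the integration test added below; for simplicity both maps are keyed by the StringWithoutDC form of the location, which is what makes the lookup independent of the DC label on the location (the test deliberately swaps those labels):

package main

import "fmt"

func main() {
	// Assumed: dc1 was backed up to "s3:loc1", dc2 to "s3:loc2".
	locationDC := map[string][]string{
		"s3:loc1": {"dc1"},
		"s3:loc2": {"dc2"},
	}
	// Hosts that can access each location.
	locationHosts := map[string][]string{
		"s3:loc1": {"h1", "h2"},
		"s3:loc2": {"h3"},
	}

	hostDCAccess := make(map[string][]string)
	for loc, hosts := range locationHosts {
		for _, h := range hosts {
			hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc]...)
		}
	}

	fmt.Println(hostDCAccess) // map[h1:[dc1] h2:[dc1] h3:[dc2]]
}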

// Checks if given host finished restoring all that it could.
func (wp workloadProgress) isDone(host string) bool {
failed := wp.hostFailedDC[host]
for _, dc := range wp.hostDCAccess[host] {
// Host isn't done when there is still some data to be restored
// from a DC that it has access to, and it didn't previously fail
// to restore data from this DC.
if !slices.Contains(failed, dc) && wp.dcBytesToBeRestored[dc] != 0 {
return false
}
}
return true
}
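For illustration, the same check restated as a stand-alone function over plain maps, with assumed example inputs:

package main

import (
	"fmt"
	"slices"
)

func isDone(host string, hostDCAccess, hostFailedDC map[string][]string, dcBytes map[string]int64) bool {
	failed := hostFailedDC[host]
	for _, dc := range hostDCAccess[host] {
		if !slices.Contains(failed, dc) && dcBytes[dc] != 0 {
			return false
		}
	}
	return true
}

func main() {
	hostDCAccess := map[string][]string{"h1": {"dc1"}, "h3": {"dc2"}}
	hostFailedDC := map[string][]string{}
	dcBytes := map[string]int64{"dc1": 0, "dc2": 100} // dc1 fully restored, dc2 still pending

	fmt.Println(isDone("h1", hostDCAccess, hostFailedDC, dcBytes)) // true: h1 can only reach dc1 data
	fmt.Println(isDone("h3", hostDCAccess, hostFailedDC, dcBytes)) // false: dc2 data remains for h3
}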

type batch struct {
@@ -188,8 +208,11 @@ func (bd *batchDispatcher) ValidateAllDispatched() error {
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size)
}
}
if bd.workloadProgress.bytesToBeRestored != 0 {
return errors.Errorf("expected all data to be restored, internal progress calculation error")
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored {
if bytes != 0 {
return errors.Errorf("expected all data from DC %q to be restored (missing %d bytes): "+
"internal progress calculation error", dc, bytes)
}
}
return nil
}
@@ -206,8 +229,8 @@ func (bd *batchDispatcher) DispatchBatch(ctx context.Context, host string) (batc
}

bd.mu.Lock()
// Check if there is anything to batch
if bd.done {
// Check if there is anything to do for this host
if bd.workloadProgress.isDone(host) {
bd.mu.Unlock()
return batch{}, false
}
@@ -217,38 +240,19 @@ func (bd *batchDispatcher) DispatchBatch(ctx context.Context, host string) (batc
bd.mu.Unlock()
return b, true
}
// In case all hosts would be waiting for batches,
// mark batching as finished due to re-tried failures.
if bd.waitCnt+1 == bd.hostCnt {
bd.done = true
bd.mu.Unlock()
return batch{}, false
}

// Wait for SSTables that might return after failure
wait := bd.wait
bd.waitCnt++
bd.mu.Unlock()

select {
case <-ctx.Done():
case <-wait:
}

bd.mu.Lock()
bd.waitCnt--
bd.mu.Unlock()
}
}

func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) {
dirIdx := bd.chooseDir(host)
if dirIdx < 0 {
return batch{}, false
}
return bd.createBatch(dirIdx, host)
}

func (bd *batchDispatcher) chooseDir(host string) int {
dirIdx := -1
for i := range bd.workloadProgress.remoteDir {
rdw := bd.workload.RemoteDir[i]
@@ -257,17 +261,20 @@ func (bd *batchDispatcher) chooseDir(host string) int {
continue
}
// Skip dir from already failed dc
if slices.Contains(bd.hostToFailedDC[host], rdw.DC) {
if slices.Contains(bd.workloadProgress.hostFailedDC[host], rdw.DC) {
continue
}
// Skip dir from location without access
if !slices.Contains(bd.locationHosts[rdw.Location], host) {
if !slices.Contains(bd.workloadProgress.hostDCAccess[host], rdw.DC) {
continue
}
dirIdx = i
break
}
return dirIdx
if dirIdx < 0 {
return batch{}, false
}
return bd.createBatch(dirIdx, host)
}

// Returns batch from given RemoteSSTableDir and updates workloadProgress.
@@ -337,10 +344,10 @@ func (bd *batchDispatcher) ReportSuccess(b batch) {
bd.mu.Lock()
defer bd.mu.Unlock()

bd.workloadProgress.bytesToBeRestored -= b.Size
dcBytes := bd.workloadProgress.dcBytesToBeRestored
dcBytes[b.DC] -= b.Size
// Mark batching as finished due to successful restore
if bd.workloadProgress.bytesToBeRestored == 0 {
bd.done = true
if dcBytes[b.DC] == 0 {
bd.wakeUpWaiting()
}
}
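A small assumed-numbers sketch of the per-DC accounting that replaces the single global counter; in the dispatcher, the zero check below is what triggers wakeUpWaiting so blocked hosts re-check whether they are done:

package main

import "fmt"

func main() {
	dcBytes := map[string]int64{"dc1": 40, "dc2": 100}

	// A 40-byte batch from dc1 is reported as restored.
	dc, size := "dc1", int64(40)
	dcBytes[dc] -= size
	if dcBytes[dc] == 0 {
		fmt.Println(dc, "fully restored, waking waiting hosts")
	}

	fmt.Println(dcBytes) // map[dc1:0 dc2:100]
}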
@@ -351,7 +358,7 @@ func (bd *batchDispatcher) ReportFailure(host string, b batch) error {
defer bd.mu.Unlock()

// Mark failed DC for host
bd.hostToFailedDC[host] = append(bd.hostToFailedDC[host], b.DC)
bd.workloadProgress.hostFailedDC[host] = append(bd.workloadProgress.hostFailedDC[host], b.DC)

dirIdx := -1
for i := range bd.workload.RemoteDir {
75 changes: 75 additions & 0 deletions pkg/service/restore/restore_integration_test.go
@@ -9,6 +9,7 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"net/http"
"strings"
"sync/atomic"
@@ -883,3 +884,77 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) {
}
})
}

func TestRestoreTablesMultiLocationIntegration(t *testing.T) {
// Since we need multi-dc clusters for multi-dc backup/restore
// we will use the same cluster as both src and dst.
h := newTestHelper(t, ManagedClusterHosts(), ManagedClusterHosts())

Print("Keyspace setup")
ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}"
ks := randomizedName("multi_location_")
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks))

Print("Table setup")
tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)"
tab := randomizedName("tab_")
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))

Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 100, ks, tab)

Print("Save filled table into map")
srcM := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ks, tab, "id", "data")

Print("Run backup")
loc := []Location{
testLocation("multi-location-1", "dc1"),
testLocation("multi-location-2", "dc2"),
}
S3InitBucket(t, loc[0].Path)
S3InitBucket(t, loc[1].Path)
ksFilter := []string{ks}
tag := h.runBackup(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
"batch_size": 100,
})

Print("Truncate backed up table")
truncateStmt := "TRUNCATE TABLE %q.%q"
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(truncateStmt, ks, tab))

// Reverse dcs - just for fun
loc[0].DC = "dc2"
loc[1].DC = "dc1"

Print("Run restore")
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
res := make(chan struct{})
go func() {
h.runRestore(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
// Test if batching does not hang with
// limited parallel and location access.
"parallel": 1,
"snapshot_tag": tag,
"restore_tables": true,
})
close(res)
}()

select {
case <-res:
case <-time.NewTimer(2 * time.Minute).C:
t.Fatal("Restore hanged")
}

Print("Save restored table into map")
dstM := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ks, tab, "id", "data")

Print("Validate success")
if !maps.Equal(srcM, dstM) {
t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM)
}
}
