Skip to content

Commit

Permalink
Testing: use UUID based SSTables for testing (#4197)
Browse files Browse the repository at this point in the history
* fix(testing): use UUID based SSTables for testing

UUID SSTables are the default in newer Scylla version,
so we should focus on testing them and mock integer
based SSTables when needed.

Ref #4182

* feat(testutils): allow modifying hrt response

It's useful for scenarios where we want to
fail the test in case of response interceptor
error, which is difficult to achieve without
returning error response.

* fix(restore_test): versioned restore with mixed SSTable ID types

This way we are able to cover mixed scenarios (integer + UUID SSTables),
which was missing from our test coverage. It also allows us to move
to testing the default UUID SSTables, which should be our priority.

Fixes #4182
  • Loading branch information
Michal-Leszczynski authored Jan 21, 2025
1 parent 6a7b777 commit 1072a47
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 35 deletions.
19 changes: 11 additions & 8 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,9 @@ func TestServiceRepairOneJobPerHostIntegration(t *testing.T) {
return nil, nil
}))

h.Hrt.SetRespNotifier(func(resp *http.Response, err error) {
h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) {
if resp == nil {
return
return nil, nil
}

var copiedBody bytes.Buffer
Expand Down Expand Up @@ -656,6 +656,7 @@ func TestServiceRepairOneJobPerHostIntegration(t *testing.T) {
}
}
}
return nil, nil
})

Print("When: run repair")
Expand Down Expand Up @@ -779,9 +780,9 @@ func TestServiceRepairOrderIntegration(t *testing.T) {
return nil, nil
}))

h.Hrt.SetRespNotifier(func(resp *http.Response, err error) {
h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) {
if resp == nil {
return
return nil, nil
}

var copiedBody bytes.Buffer
Expand Down Expand Up @@ -814,7 +815,7 @@ func TestServiceRepairOrderIntegration(t *testing.T) {

if fullTable == "" {
t.Logf("This is strange %s", jobID)
return
return nil, nil
}

// Update actual repair order on both repair start and end
Expand All @@ -825,6 +826,7 @@ func TestServiceRepairOrderIntegration(t *testing.T) {
muARO.Unlock()
}
}
return nil, nil
})

Print("When: run repair")
Expand Down Expand Up @@ -1004,9 +1006,9 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) {
return nil, nil
}))

h.Hrt.SetRespNotifier(func(resp *http.Response, err error) {
h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) {
if resp == nil {
return
return nil, nil
}

var copiedBody bytes.Buffer
Expand Down Expand Up @@ -1036,7 +1038,7 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) {
// This helps to test repair error resilience.
if !stopErrInject.Load() && rspCnt.Add(1)%20 == 0 {
resp.Body = io.NopCloser(bytes.NewBufferString(fmt.Sprintf("%q", scyllaclient.CommandFailed)))
return
return nil, nil
}

status := string(body)
Expand All @@ -1057,6 +1059,7 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) {
}
}
}
return nil, nil
})

validate := func(tab string, tr []scyllaclient.TokenRange) (redundant int, err error) {
Expand Down
142 changes: 131 additions & 11 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,34 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/gocql/gocql"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
"github.com/scylladb/go-log"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
. "github.com/scylladb/scylla-manager/v3/pkg/service/restore"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
"github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/testhelper"
"github.com/scylladb/scylla-manager/v3/pkg/util/jsonutil"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -184,6 +190,100 @@ func testLocation(bucket, dc string) Location {
}
}

// newRenameSnapshotSSTablesRespInterceptor renames SSTables in the snapshot directory right
// after the snapshot has been taken. It uses the name mapping provided by the idGen function.
func newRenameSnapshotSSTablesRespInterceptor(client *scyllaclient.Client, s gocqlx.Session, idGen func(id string) string) func(*http.Response, error) (*http.Response, error) {
return func(r *http.Response, err error) (*http.Response, error) {
// Look for successful response to snapshot call
if err != nil || !strings.HasPrefix(r.Request.URL.Path, "/storage_service/snapshots") || r.Request.Method != http.MethodPost {
return nil, nil
}
host, _, err := net.SplitHostPort(r.Request.Host)
if err != nil {
return nil, errors.New("snapshot response notifier error: get response host: " + err.Error())
}
q := r.Request.URL.Query()
ks := q.Get("kn")
rawTabs := q.Get("cf")
tag := q.Get("tag")
tabs := strings.Split(rawTabs, ",")
if len(tabs) == 0 || slices.Equal(tabs, []string{""}) {
tabs, err = client.Tables(context.Background(), ks)
if err != nil {
return nil, errors.New("snapshot response notifier error: get keyspace tables: " + err.Error())
}
}

for _, tab := range tabs {
version, err := query.GetTableVersion(s, ks, tab)
if err != nil {
return nil, errors.New("snapshot response interceptor error: get table version: " + err.Error())
}
snapshotDir := path.Join(KeyspaceDir(ks), tab+"-"+version, "snapshots", tag)
// Get snapshot files
files := make([]string, 0)
err = client.RcloneListDirIter(context.Background(), host, snapshotDir, nil, func(item *scyllaclient.RcloneListDirItem) {
// Watch out for the non-sstable files (e.g. manifest.json)
if _, err := sstable.ExtractID(item.Name); err != nil {
return
}
files = append(files, item.Name)
})
if err != nil {
return nil, errors.New("snapshot response interceptor error: list snapshot files: " + err.Error())
}
// Rename snapshot files
mapping := sstable.RenameSStables(files, idGen)
for initial, renamed := range mapping {
if initial != renamed {
src := path.Join(snapshotDir, initial)
dst := path.Join(snapshotDir, renamed)
if err := client.RcloneMoveFile(context.Background(), host, dst, src); err != nil {
return nil, errors.New("snapshot response interceptor error: rename SSTable ID: " + err.Error())
}
}
}
}
return nil, nil
}
}

// halfUUIDToIntIDGen is a possible idGen that can be used in newRenameSnapshotSSTablesRespInterceptor.
// It maps around half of encountered UUID SSTables into integer SSTables.
// It only works if the snapshot dir has less than 10000000 SSTables.
func halfUUIDToIntIDGen() func(string) string {
var mu sync.Mutex
mapping := make(map[string]string)
renameUUID := true
cnt := 10000000
return func(id string) string {
mu.Lock()
defer mu.Unlock()
// Handle integer SSTable.
// We want to leave them as they are.
// We hope that because cnt is set to a huge
// number, we won't encounter name collisions with
// the renamed UUID SSTables.
if _, err := strconv.Atoi(id); err == nil {
return id
}

if newID, ok := mapping[id]; ok {
return newID
}
cnt++
// Handle UUID SSTable.
// We want to rename only half of them.
if renameUUID {
mapping[id] = fmt.Sprint(cnt)
} else {
mapping[id] = id
}
renameUUID = !renameUUID
return mapping[id]
}
}

func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -828,9 +928,9 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
}))

b := atomic.NewInt64(0)
dstH.Hrt.SetRespNotifier(func(resp *http.Response, err error) {
dstH.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) {
if resp == nil {
return
return nil, nil
}

var copiedBody bytes.Buffer
Expand All @@ -846,6 +946,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
cancel2()
}
}
return nil, nil
})

Print("When: run restore and stop it during load and stream")
Expand Down Expand Up @@ -892,10 +993,10 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
func TestRestoreTablesVersionedIntegration(t *testing.T) {
testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t)
const (
testLoadCnt = 2
testLoadCnt = 5
testLoadSize = 1
testBatchSize = 1
testParallel = 3
testParallel = 0
corruptCnt = 3
)

Expand All @@ -919,10 +1020,10 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) {
func TestRestoreSchemaVersionedIntegration(t *testing.T) {
testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t)
const (
testLoadCnt = 2
testLoadCnt = 5
testLoadSize = 1
testBatchSize = 1
testParallel = 3
testParallel = 0
corruptCnt = 3
)

Expand Down Expand Up @@ -972,10 +1073,15 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
corruptedTable = "keyspaces"
}

// Force creation of integer SSTables in the snapshot dir,
// as only integer SSTables can be versioned.
// This also allows us to test scenario with mixed ID type SSTables.
srcH.Hrt.SetRespInterceptor(newRenameSnapshotSSTablesRespInterceptor(srcH.Client, srcSession, halfUUIDToIntIDGen()))

// Restore should be performed on user with limited permissions
dropNonSuperUsers(t, dstSession)
createUser(t, dstSession, user, "pass")
dstH = newRestoreTestHelper(t, mgrSession, cfg, target.Location[0], nil, user, "pass")
//dropNonSuperUsers(t, dstSession)
//createUser(t, dstSession, user, "pass")
//dstH = newRestoreTestHelper(t, mgrSession, cfg, target.Location[0], nil, user, "pass")

if target.RestoreTables {
Print("Recreate schema on destination cluster")
Expand Down Expand Up @@ -1011,6 +1117,16 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
if _, err = VersionedFileCreationTime(item.Name); err == nil {
t.Fatalf("Versioned file %s present after first backup", path.Join(remoteDir, item.Path))
}

// Corrupt only integer SSTables
id, err := sstable.ExtractID(item.Name)
if err != nil {
t.Fatal(err)
}
if _, err := strconv.Atoi(id); err != nil {
return
}

if strings.HasSuffix(item.Name, ".db") {
switch {
case len(firstCorrupt) < corruptCnt:
Expand All @@ -1025,6 +1141,10 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
if err != nil {
t.Fatal(err)
}
if len(firstCorrupt) == 0 || len(bothCorrupt) == 0 || len(secondCorrupt) == 0 {
t.Fatalf("No files to be corrupted, firstCorrupt: %d, bothCorrupt: %d, secondCorrupt: %d",
len(firstCorrupt), len(bothCorrupt), len(secondCorrupt))
}

crc32FileNameFromGivenSSTableFile := func(sstable string) string {
// Split the filename by dashes
Expand Down Expand Up @@ -1145,9 +1265,9 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
target.SnapshotTag = tag3

if target.RestoreTables {
grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user)
// grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user)
} else {
grantRestoreSchemaPermissions(t, dstSession, user)
// grantRestoreSchemaPermissions(t, dstSession, user)
}

if err = dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)); err != nil {
Expand Down
29 changes: 16 additions & 13 deletions pkg/testutils/hrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

// HackableRoundTripper is a round tripper that allows for interceptor injection.
type HackableRoundTripper struct {
inner http.RoundTripper
interceptor http.RoundTripper
respNotifier func(resp *http.Response, err error)
mu sync.Mutex
inner http.RoundTripper
interceptor http.RoundTripper
respInterceptor func(*http.Response, error) (*http.Response, error)
mu sync.Mutex
}

func NewHackableRoundTripper(inner http.RoundTripper) *HackableRoundTripper {
Expand All @@ -30,12 +30,13 @@ func (h *HackableRoundTripper) SetInterceptor(rt http.RoundTripper) {
h.interceptor = rt
}

// SetRespNotifier sets a respNotifier which is called on responses returned by both
// interceptor and inner round tripper.
func (h *HackableRoundTripper) SetRespNotifier(rn func(*http.Response, error)) {
// SetRespInterceptor sets a response interceptor which is called on responses returned by both
// interceptor and inner round tripper. If response interceptor returns nil for
// both response and error the process falls back to the original response and error.
func (h *HackableRoundTripper) SetRespInterceptor(ri func(*http.Response, error) (*http.Response, error)) {
h.mu.Lock()
defer h.mu.Unlock()
h.respNotifier = rn
h.respInterceptor = ri
}

// Interceptor returns the current interceptor.
Expand All @@ -45,11 +46,11 @@ func (h *HackableRoundTripper) Interceptor() http.RoundTripper {
return h.interceptor
}

// RespNotifier returns the current respNotifier.
func (h *HackableRoundTripper) RespNotifier() func(*http.Response, error) {
// RespInterceptor returns the current respInterceptor.
func (h *HackableRoundTripper) RespInterceptor() func(*http.Response, error) (*http.Response, error) {
h.mu.Lock()
defer h.mu.Unlock()
return h.respNotifier
return h.respInterceptor
}

// RoundTrip implements http.RoundTripper.
Expand All @@ -60,8 +61,10 @@ func (h *HackableRoundTripper) RoundTrip(req *http.Request) (resp *http.Response
if resp == nil && err == nil {
resp, err = h.inner.RoundTrip(req)
}
if rn := h.RespNotifier(); rn != nil {
rn(resp, err)
if rn := h.RespInterceptor(); rn != nil {
if respI, errI := rn(resp, err); respI != nil || errI != nil {
resp, err = respI, errI
}
}
return
}
4 changes: 1 addition & 3 deletions testing/scylla/config/scylla.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,4 @@ api_doc_dir: /usr/lib/scylla/api/api-doc/
alternator_port: 8000
alternator_write_isolation: only_rmw_uses_lwt
alternator_enforce_authorization: true
enable_ipv6_dns_lookup: true

uuid_sstable_identifiers_enabled: false
enable_ipv6_dns_lookup: true

0 comments on commit 1072a47

Please sign in to comment.