Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing: use UUID based SSTables for testing #4197

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,9 @@ func TestServiceRepairOneJobPerHostIntegration(t *testing.T) {
return nil, nil
}))

h.Hrt.SetRespNotifier(func(resp *http.Response, err error) {
h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) {
if resp == nil {
return
return nil, nil
}

var copiedBody bytes.Buffer
Expand Down Expand Up @@ -656,6 +656,7 @@ func TestServiceRepairOneJobPerHostIntegration(t *testing.T) {
}
}
}
return nil, nil
})

Print("When: run repair")
Expand Down Expand Up @@ -779,9 +780,9 @@ func TestServiceRepairOrderIntegration(t *testing.T) {
return nil, nil
}))

h.Hrt.SetRespNotifier(func(resp *http.Response, err error) {
h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) {
if resp == nil {
return
return nil, nil
}

var copiedBody bytes.Buffer
Expand Down Expand Up @@ -814,7 +815,7 @@ func TestServiceRepairOrderIntegration(t *testing.T) {

if fullTable == "" {
t.Logf("This is strange %s", jobID)
return
return nil, nil
}

// Update actual repair order on both repair start and end
Expand All @@ -825,6 +826,7 @@ func TestServiceRepairOrderIntegration(t *testing.T) {
muARO.Unlock()
}
}
return nil, nil
})

Print("When: run repair")
Expand Down Expand Up @@ -1004,9 +1006,9 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) {
return nil, nil
}))

h.Hrt.SetRespNotifier(func(resp *http.Response, err error) {
h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) {
if resp == nil {
return
return nil, nil
}

var copiedBody bytes.Buffer
Expand Down Expand Up @@ -1036,7 +1038,7 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) {
// This helps to test repair error resilience.
if !stopErrInject.Load() && rspCnt.Add(1)%20 == 0 {
resp.Body = io.NopCloser(bytes.NewBufferString(fmt.Sprintf("%q", scyllaclient.CommandFailed)))
return
return nil, nil
}

status := string(body)
Expand All @@ -1057,6 +1059,7 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) {
}
}
}
return nil, nil
})

validate := func(tab string, tr []scyllaclient.TokenRange) (redundant int, err error) {
Expand Down
142 changes: 131 additions & 11 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,34 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/gocql/gocql"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
"github.com/scylladb/go-log"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
. "github.com/scylladb/scylla-manager/v3/pkg/service/restore"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
"github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/testhelper"
"github.com/scylladb/scylla-manager/v3/pkg/util/jsonutil"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -184,6 +190,100 @@ func testLocation(bucket, dc string) Location {
}
}

// newRenameSnapshotSSTablesRespInterceptor renames SSTables in the snapshot directory right
// after the snapshot has been taken. It uses the name mapping provided by the idGen function.
func newRenameSnapshotSSTablesRespInterceptor(client *scyllaclient.Client, s gocqlx.Session, idGen func(id string) string) func(*http.Response, error) (*http.Response, error) {
return func(r *http.Response, err error) (*http.Response, error) {
// Look for successful response to snapshot call
if err != nil || !strings.HasPrefix(r.Request.URL.Path, "/storage_service/snapshots") || r.Request.Method != http.MethodPost {
return nil, nil
}
host, _, err := net.SplitHostPort(r.Request.Host)
if err != nil {
return nil, errors.New("snapshot response notifier error: get response host: " + err.Error())
}
q := r.Request.URL.Query()
ks := q.Get("kn")
rawTabs := q.Get("cf")
tag := q.Get("tag")
tabs := strings.Split(rawTabs, ",")
if len(tabs) == 0 || slices.Equal(tabs, []string{""}) {
tabs, err = client.Tables(context.Background(), ks)
if err != nil {
return nil, errors.New("snapshot response notifier error: get keyspace tables: " + err.Error())
}
}

for _, tab := range tabs {
version, err := query.GetTableVersion(s, ks, tab)
if err != nil {
return nil, errors.New("snapshot response interceptor error: get table version: " + err.Error())
}
snapshotDir := path.Join(KeyspaceDir(ks), tab+"-"+version, "snapshots", tag)
// Get snapshot files
files := make([]string, 0)
err = client.RcloneListDirIter(context.Background(), host, snapshotDir, nil, func(item *scyllaclient.RcloneListDirItem) {
// Watch out for the non-sstable files (e.g. manifest.json)
if _, err := sstable.ExtractID(item.Name); err != nil {
return
}
files = append(files, item.Name)
})
if err != nil {
return nil, errors.New("snapshot response interceptor error: list snapshot files: " + err.Error())
}
// Rename snapshot files
mapping := sstable.RenameSStables(files, idGen)
for initial, renamed := range mapping {
if initial != renamed {
src := path.Join(snapshotDir, initial)
dst := path.Join(snapshotDir, renamed)
if err := client.RcloneMoveFile(context.Background(), host, dst, src); err != nil {
return nil, errors.New("snapshot response interceptor error: rename SSTable ID: " + err.Error())
}
}
}
}
return nil, nil
}
}

// halfUUIDToIntIDGen is a possible idGen that can be used in newRenameSnapshotSSTablesRespInterceptor.
// It maps around half of encountered UUID SSTables into integer SSTables.
// It only works if the snapshot dir has less than 10000000 SSTables.
func halfUUIDToIntIDGen() func(string) string {
var mu sync.Mutex
mapping := make(map[string]string)
renameUUID := true
cnt := 10000000
return func(id string) string {
mu.Lock()
defer mu.Unlock()
// Handle integer SSTable.
// We want to leave them as they are.
// We hope that because cnt is set to a huge
// number, we won't encounter name collisions with
// the renamed UUID SSTables.
if _, err := strconv.Atoi(id); err == nil {
return id
}

if newID, ok := mapping[id]; ok {
return newID
}
cnt++
// Handle UUID SSTable.
// We want to rename only half of them.
if renameUUID {
mapping[id] = fmt.Sprint(cnt)
} else {
mapping[id] = id
}
renameUUID = !renameUUID
return mapping[id]
}
}

func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -828,9 +928,9 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
}))

b := atomic.NewInt64(0)
dstH.Hrt.SetRespNotifier(func(resp *http.Response, err error) {
dstH.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) {
if resp == nil {
return
return nil, nil
}

var copiedBody bytes.Buffer
Expand All @@ -846,6 +946,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
cancel2()
}
}
return nil, nil
})

Print("When: run restore and stop it during load and stream")
Expand Down Expand Up @@ -892,10 +993,10 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
func TestRestoreTablesVersionedIntegration(t *testing.T) {
testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t)
const (
testLoadCnt = 2
testLoadCnt = 5
testLoadSize = 1
testBatchSize = 1
testParallel = 3
testParallel = 0
corruptCnt = 3
)

Expand All @@ -919,10 +1020,10 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) {
func TestRestoreSchemaVersionedIntegration(t *testing.T) {
testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t)
const (
testLoadCnt = 2
testLoadCnt = 5
testLoadSize = 1
testBatchSize = 1
testParallel = 3
testParallel = 0
corruptCnt = 3
)

Expand Down Expand Up @@ -972,10 +1073,15 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
corruptedTable = "keyspaces"
}

// Force creation of integer SSTables in the snapshot dir,
// as only integer SSTables can be versioned.
// This also allows us to test scenario with mixed ID type SSTables.
srcH.Hrt.SetRespInterceptor(newRenameSnapshotSSTablesRespInterceptor(srcH.Client, srcSession, halfUUIDToIntIDGen()))

// Restore should be performed on user with limited permissions
dropNonSuperUsers(t, dstSession)
createUser(t, dstSession, user, "pass")
dstH = newRestoreTestHelper(t, mgrSession, cfg, target.Location[0], nil, user, "pass")
//dropNonSuperUsers(t, dstSession)
//createUser(t, dstSession, user, "pass")
//dstH = newRestoreTestHelper(t, mgrSession, cfg, target.Location[0], nil, user, "pass")

if target.RestoreTables {
Print("Recreate schema on destination cluster")
Expand Down Expand Up @@ -1011,6 +1117,16 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
if _, err = VersionedFileCreationTime(item.Name); err == nil {
t.Fatalf("Versioned file %s present after first backup", path.Join(remoteDir, item.Path))
}

// Corrupt only integer SSTables
id, err := sstable.ExtractID(item.Name)
if err != nil {
t.Fatal(err)
}
if _, err := strconv.Atoi(id); err != nil {
return
}

if strings.HasSuffix(item.Name, ".db") {
switch {
case len(firstCorrupt) < corruptCnt:
Expand All @@ -1025,6 +1141,10 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
if err != nil {
t.Fatal(err)
}
if len(firstCorrupt) == 0 || len(bothCorrupt) == 0 || len(secondCorrupt) == 0 {
t.Fatalf("No files to be corrupted, firstCorrupt: %d, bothCorrupt: %d, secondCorrupt: %d",
len(firstCorrupt), len(bothCorrupt), len(secondCorrupt))
}

crc32FileNameFromGivenSSTableFile := func(sstable string) string {
// Split the filename by dashes
Expand Down Expand Up @@ -1145,9 +1265,9 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt,
target.SnapshotTag = tag3

if target.RestoreTables {
grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user)
// grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user)
} else {
grantRestoreSchemaPermissions(t, dstSession, user)
// grantRestoreSchemaPermissions(t, dstSession, user)
}

if err = dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)); err != nil {
Expand Down
29 changes: 16 additions & 13 deletions pkg/testutils/hrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

// HackableRoundTripper is a round tripper that allows for interceptor injection.
type HackableRoundTripper struct {
inner http.RoundTripper
interceptor http.RoundTripper
respNotifier func(resp *http.Response, err error)
mu sync.Mutex
inner http.RoundTripper
interceptor http.RoundTripper
respInterceptor func(*http.Response, error) (*http.Response, error)
mu sync.Mutex
}

func NewHackableRoundTripper(inner http.RoundTripper) *HackableRoundTripper {
Expand All @@ -30,12 +30,13 @@ func (h *HackableRoundTripper) SetInterceptor(rt http.RoundTripper) {
h.interceptor = rt
}

// SetRespNotifier sets a respNotifier which is called on responses returned by both
// interceptor and inner round tripper.
func (h *HackableRoundTripper) SetRespNotifier(rn func(*http.Response, error)) {
// SetRespInterceptor sets a response interceptor which is called on responses returned by both
// interceptor and inner round tripper. If response interceptor returns nil for
// both response and error the process falls back to the original response and error.
func (h *HackableRoundTripper) SetRespInterceptor(ri func(*http.Response, error) (*http.Response, error)) {
h.mu.Lock()
defer h.mu.Unlock()
h.respNotifier = rn
h.respInterceptor = ri
}

// Interceptor returns the current interceptor.
Expand All @@ -45,11 +46,11 @@ func (h *HackableRoundTripper) Interceptor() http.RoundTripper {
return h.interceptor
}

// RespNotifier returns the current respNotifier.
func (h *HackableRoundTripper) RespNotifier() func(*http.Response, error) {
// RespInterceptor returns the current respInterceptor.
func (h *HackableRoundTripper) RespInterceptor() func(*http.Response, error) (*http.Response, error) {
h.mu.Lock()
defer h.mu.Unlock()
return h.respNotifier
return h.respInterceptor
}

// RoundTrip implements http.RoundTripper.
Expand All @@ -60,8 +61,10 @@ func (h *HackableRoundTripper) RoundTrip(req *http.Request) (resp *http.Response
if resp == nil && err == nil {
resp, err = h.inner.RoundTrip(req)
}
if rn := h.RespNotifier(); rn != nil {
rn(resp, err)
if rn := h.RespInterceptor(); rn != nil {
if respI, errI := rn(resp, err); respI != nil || errI != nil {
resp, err = respI, errI
}
}
return
}
4 changes: 1 addition & 3 deletions testing/scylla/config/scylla.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,4 @@ api_doc_dir: /usr/lib/scylla/api/api-doc/
alternator_port: 8000
alternator_write_isolation: only_rmw_uses_lwt
alternator_enforce_authorization: true
enable_ipv6_dns_lookup: true

uuid_sstable_identifiers_enabled: false
enable_ipv6_dns_lookup: true
Loading