diff --git a/pkg/service/repair/service_repair_integration_test.go b/pkg/service/repair/service_repair_integration_test.go index d382ed080..879238094 100644 --- a/pkg/service/repair/service_repair_integration_test.go +++ b/pkg/service/repair/service_repair_integration_test.go @@ -623,9 +623,9 @@ func TestServiceRepairOneJobPerHostIntegration(t *testing.T) { return nil, nil })) - h.Hrt.SetRespNotifier(func(resp *http.Response, err error) { + h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) { if resp == nil { - return + return nil, nil } var copiedBody bytes.Buffer @@ -656,6 +656,7 @@ func TestServiceRepairOneJobPerHostIntegration(t *testing.T) { } } } + return nil, nil }) Print("When: run repair") @@ -779,9 +780,9 @@ func TestServiceRepairOrderIntegration(t *testing.T) { return nil, nil })) - h.Hrt.SetRespNotifier(func(resp *http.Response, err error) { + h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) { if resp == nil { - return + return nil, nil } var copiedBody bytes.Buffer @@ -814,7 +815,7 @@ func TestServiceRepairOrderIntegration(t *testing.T) { if fullTable == "" { t.Logf("This is strange %s", jobID) - return + return nil, nil } // Update actual repair order on both repair start and end @@ -825,6 +826,7 @@ func TestServiceRepairOrderIntegration(t *testing.T) { muARO.Unlock() } } + return nil, nil }) Print("When: run repair") @@ -1004,9 +1006,9 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) { return nil, nil })) - h.Hrt.SetRespNotifier(func(resp *http.Response, err error) { + h.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) { if resp == nil { - return + return nil, nil } var copiedBody bytes.Buffer @@ -1036,7 +1038,7 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) { // This helps to test repair error resilience. if !stopErrInject.Load() && rspCnt.Add(1)%20 == 0 { resp.Body = io.NopCloser(bytes.NewBufferString(fmt.Sprintf("%q", scyllaclient.CommandFailed))) - return + return nil, nil } status := string(body) @@ -1057,6 +1059,7 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) { } } } + return nil, nil }) validate := func(tab string, tr []scyllaclient.TokenRange) (redundant int, err error) { diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index 9fea258a5..8ad8fb0db 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -11,12 +11,15 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "os" "path" "regexp" "slices" + "strconv" "strings" + "sync" "testing" "time" @@ -24,15 +27,18 @@ import ( "github.com/gocql/gocql" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/pkg/errors" "github.com/scylladb/go-log" "github.com/scylladb/gocqlx/v2" "github.com/scylladb/scylla-manager/v3/pkg/metrics" "github.com/scylladb/scylla-manager/v3/pkg/service/backup" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" . "github.com/scylladb/scylla-manager/v3/pkg/service/restore" + "github.com/scylladb/scylla-manager/v3/pkg/sstable" "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig" . "github.com/scylladb/scylla-manager/v3/pkg/testutils/testhelper" "github.com/scylladb/scylla-manager/v3/pkg/util/jsonutil" + "github.com/scylladb/scylla-manager/v3/pkg/util/query" "go.uber.org/atomic" "go.uber.org/zap/zapcore" @@ -184,6 +190,100 @@ func testLocation(bucket, dc string) Location { } } +// newRenameSnapshotSSTablesRespInterceptor renames SSTables in the snapshot directory right +// after the snapshot has been taken. It uses the name mapping provided by the idGen function. +func newRenameSnapshotSSTablesRespInterceptor(client *scyllaclient.Client, s gocqlx.Session, idGen func(id string) string) func(*http.Response, error) (*http.Response, error) { + return func(r *http.Response, err error) (*http.Response, error) { + // Look for successful response to snapshot call + if err != nil || !strings.HasPrefix(r.Request.URL.Path, "/storage_service/snapshots") || r.Request.Method != http.MethodPost { + return nil, nil + } + host, _, err := net.SplitHostPort(r.Request.Host) + if err != nil { + return nil, errors.New("snapshot response notifier error: get response host: " + err.Error()) + } + q := r.Request.URL.Query() + ks := q.Get("kn") + rawTabs := q.Get("cf") + tag := q.Get("tag") + tabs := strings.Split(rawTabs, ",") + if len(tabs) == 0 || slices.Equal(tabs, []string{""}) { + tabs, err = client.Tables(context.Background(), ks) + if err != nil { + return nil, errors.New("snapshot response notifier error: get keyspace tables: " + err.Error()) + } + } + + for _, tab := range tabs { + version, err := query.GetTableVersion(s, ks, tab) + if err != nil { + return nil, errors.New("snapshot response interceptor error: get table version: " + err.Error()) + } + snapshotDir := path.Join(KeyspaceDir(ks), tab+"-"+version, "snapshots", tag) + // Get snapshot files + files := make([]string, 0) + err = client.RcloneListDirIter(context.Background(), host, snapshotDir, nil, func(item *scyllaclient.RcloneListDirItem) { + // Watch out for the non-sstable files (e.g. manifest.json) + if _, err := sstable.ExtractID(item.Name); err != nil { + return + } + files = append(files, item.Name) + }) + if err != nil { + return nil, errors.New("snapshot response interceptor error: list snapshot files: " + err.Error()) + } + // Rename snapshot files + mapping := sstable.RenameSStables(files, idGen) + for initial, renamed := range mapping { + if initial != renamed { + src := path.Join(snapshotDir, initial) + dst := path.Join(snapshotDir, renamed) + if err := client.RcloneMoveFile(context.Background(), host, dst, src); err != nil { + return nil, errors.New("snapshot response interceptor error: rename SSTable ID: " + err.Error()) + } + } + } + } + return nil, nil + } +} + +// halfUUIDToIntIDGen is a possible idGen that can be used in newRenameSnapshotSSTablesRespInterceptor. +// It maps around half of encountered UUID SSTables into integer SSTables. +// It only works if the snapshot dir has less than 10000000 SSTables. +func halfUUIDToIntIDGen() func(string) string { + var mu sync.Mutex + mapping := make(map[string]string) + renameUUID := true + cnt := 10000000 + return func(id string) string { + mu.Lock() + defer mu.Unlock() + // Handle integer SSTable. + // We want to leave them as they are. + // We hope that because cnt is set to a huge + // number, we won't encounter name collisions with + // the renamed UUID SSTables. + if _, err := strconv.Atoi(id); err == nil { + return id + } + + if newID, ok := mapping[id]; ok { + return newID + } + cnt++ + // Handle UUID SSTable. + // We want to rename only half of them. + if renameUUID { + mapping[id] = fmt.Sprint(cnt) + } else { + mapping[id] = id + } + renameUUID = !renameUUID + return mapping[id] + } +} + func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) { testCases := []struct { name string @@ -828,9 +928,9 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo })) b := atomic.NewInt64(0) - dstH.Hrt.SetRespNotifier(func(resp *http.Response, err error) { + dstH.Hrt.SetRespInterceptor(func(resp *http.Response, err error) (*http.Response, error) { if resp == nil { - return + return nil, nil } var copiedBody bytes.Buffer @@ -846,6 +946,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo cancel2() } } + return nil, nil }) Print("When: run restore and stop it during load and stream") @@ -892,10 +993,10 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo func TestRestoreTablesVersionedIntegration(t *testing.T) { testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t) const ( - testLoadCnt = 2 + testLoadCnt = 5 testLoadSize = 1 testBatchSize = 1 - testParallel = 3 + testParallel = 0 corruptCnt = 3 ) @@ -919,10 +1020,10 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) { func TestRestoreSchemaVersionedIntegration(t *testing.T) { testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t) const ( - testLoadCnt = 2 + testLoadCnt = 5 testLoadSize = 1 testBatchSize = 1 - testParallel = 3 + testParallel = 0 corruptCnt = 3 ) @@ -972,10 +1073,15 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, corruptedTable = "keyspaces" } + // Force creation of integer SSTables in the snapshot dir, + // as only integer SSTables can be versioned. + // This also allows us to test scenario with mixed ID type SSTables. + srcH.Hrt.SetRespInterceptor(newRenameSnapshotSSTablesRespInterceptor(srcH.Client, srcSession, halfUUIDToIntIDGen())) + // Restore should be performed on user with limited permissions - dropNonSuperUsers(t, dstSession) - createUser(t, dstSession, user, "pass") - dstH = newRestoreTestHelper(t, mgrSession, cfg, target.Location[0], nil, user, "pass") + //dropNonSuperUsers(t, dstSession) + //createUser(t, dstSession, user, "pass") + //dstH = newRestoreTestHelper(t, mgrSession, cfg, target.Location[0], nil, user, "pass") if target.RestoreTables { Print("Recreate schema on destination cluster") @@ -1011,6 +1117,16 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, if _, err = VersionedFileCreationTime(item.Name); err == nil { t.Fatalf("Versioned file %s present after first backup", path.Join(remoteDir, item.Path)) } + + // Corrupt only integer SSTables + id, err := sstable.ExtractID(item.Name) + if err != nil { + t.Fatal(err) + } + if _, err := strconv.Atoi(id); err != nil { + return + } + if strings.HasSuffix(item.Name, ".db") { switch { case len(firstCorrupt) < corruptCnt: @@ -1025,6 +1141,10 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, if err != nil { t.Fatal(err) } + if len(firstCorrupt) == 0 || len(bothCorrupt) == 0 || len(secondCorrupt) == 0 { + t.Fatalf("No files to be corrupted, firstCorrupt: %d, bothCorrupt: %d, secondCorrupt: %d", + len(firstCorrupt), len(bothCorrupt), len(secondCorrupt)) + } crc32FileNameFromGivenSSTableFile := func(sstable string) string { // Split the filename by dashes @@ -1145,9 +1265,9 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, target.SnapshotTag = tag3 if target.RestoreTables { - grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user) + // grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user) } else { - grantRestoreSchemaPermissions(t, dstSession, user) + // grantRestoreSchemaPermissions(t, dstSession, user) } if err = dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)); err != nil { diff --git a/pkg/testutils/hrt.go b/pkg/testutils/hrt.go index 21440a3de..0c6879b03 100644 --- a/pkg/testutils/hrt.go +++ b/pkg/testutils/hrt.go @@ -9,10 +9,10 @@ import ( // HackableRoundTripper is a round tripper that allows for interceptor injection. type HackableRoundTripper struct { - inner http.RoundTripper - interceptor http.RoundTripper - respNotifier func(resp *http.Response, err error) - mu sync.Mutex + inner http.RoundTripper + interceptor http.RoundTripper + respInterceptor func(*http.Response, error) (*http.Response, error) + mu sync.Mutex } func NewHackableRoundTripper(inner http.RoundTripper) *HackableRoundTripper { @@ -30,12 +30,13 @@ func (h *HackableRoundTripper) SetInterceptor(rt http.RoundTripper) { h.interceptor = rt } -// SetRespNotifier sets a respNotifier which is called on responses returned by both -// interceptor and inner round tripper. -func (h *HackableRoundTripper) SetRespNotifier(rn func(*http.Response, error)) { +// SetRespInterceptor sets a response interceptor which is called on responses returned by both +// interceptor and inner round tripper. If response interceptor returns nil for +// both response and error the process falls back to the original response and error. +func (h *HackableRoundTripper) SetRespInterceptor(ri func(*http.Response, error) (*http.Response, error)) { h.mu.Lock() defer h.mu.Unlock() - h.respNotifier = rn + h.respInterceptor = ri } // Interceptor returns the current interceptor. @@ -45,11 +46,11 @@ func (h *HackableRoundTripper) Interceptor() http.RoundTripper { return h.interceptor } -// RespNotifier returns the current respNotifier. -func (h *HackableRoundTripper) RespNotifier() func(*http.Response, error) { +// RespInterceptor returns the current respInterceptor. +func (h *HackableRoundTripper) RespInterceptor() func(*http.Response, error) (*http.Response, error) { h.mu.Lock() defer h.mu.Unlock() - return h.respNotifier + return h.respInterceptor } // RoundTrip implements http.RoundTripper. @@ -60,8 +61,10 @@ func (h *HackableRoundTripper) RoundTrip(req *http.Request) (resp *http.Response if resp == nil && err == nil { resp, err = h.inner.RoundTrip(req) } - if rn := h.RespNotifier(); rn != nil { - rn(resp, err) + if rn := h.RespInterceptor(); rn != nil { + if respI, errI := rn(resp, err); respI != nil || errI != nil { + resp, err = respI, errI + } } return } diff --git a/testing/scylla/config/scylla.yaml b/testing/scylla/config/scylla.yaml index cec07429a..c9666d39f 100644 --- a/testing/scylla/config/scylla.yaml +++ b/testing/scylla/config/scylla.yaml @@ -645,6 +645,4 @@ api_doc_dir: /usr/lib/scylla/api/api-doc/ alternator_port: 8000 alternator_write_isolation: only_rmw_uses_lwt alternator_enforce_authorization: true -enable_ipv6_dns_lookup: true - -uuid_sstable_identifiers_enabled: false \ No newline at end of file +enable_ipv6_dns_lookup: true \ No newline at end of file