diff --git a/pkg/restapi/services.go b/pkg/restapi/services.go index 40f1182d0..8d314ae88 100644 --- a/pkg/restapi/services.go +++ b/pkg/restapi/services.go @@ -46,6 +46,7 @@ type HealthCheckService interface { // RepairService service interface for the REST API handlers. type RepairService interface { GetRun(ctx context.Context, clusterID, taskID, runID uuid.UUID) (*repair.Run, error) + // GetProgress must work even when the cluster is no longer available. GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (repair.Progress, error) GetTarget(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (repair.Target, error) SetIntensity(ctx context.Context, runID uuid.UUID, intensity float64) error @@ -59,15 +60,18 @@ type BackupService interface { ExtractLocations(ctx context.Context, properties []json.RawMessage) []backupspec.Location List(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, filter backup.ListFilter) ([]backup.ListItem, error) ListFiles(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, filter backup.ListFilter) ([]backupspec.FilesInfo, error) + // GetProgress must work even when the cluster is no longer available. GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (backup.Progress, error) DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, snapshotTags []string) error GetValidationTarget(_ context.Context, clusterID uuid.UUID, properties json.RawMessage) (backup.ValidationTarget, error) + // GetValidationProgress must work even when the cluster is no longer available. GetValidationProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) ([]backup.ValidationHostProgress, error) } // RestoreService service interface for the REST API handlers. type RestoreService interface { GetTargetUnitsViews(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (restore.Target, []restore.Unit, []restore.View, error) + // GetProgress must work even when the cluster is no longer available. GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (restore.Progress, error) } diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index 90e584c4b..6a54dd43d 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -989,3 +989,85 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) { t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM) } } + +func TestRestoreTablesProgressIntegration(t *testing.T) { + // It verifies that: + // - view status progress is correct + // - progress is available even when cluster is not + + if IsIPV6Network() { + t.Skip("nodes don't have ip6tables and related modules to properly simulate unavailable cluster") + } + + h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts()) + + Print("Keyspace setup") + ks := randomizedName("progress_") + ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}" + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1)) + + Print("Table setup") + tab := randomizedName("tab_") + tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)" + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + + Print("View setup") + mv := randomizedName("mv_") + CreateMaterializedView(t, h.srcCluster.rootSession, ks, tab, mv) + CreateMaterializedView(t, h.dstCluster.rootSession, ks, tab, mv) + + Print("Fill setup") + fillTable(t, h.srcCluster.rootSession, 1, ks, tab) + + Print("Run backup") + loc := []Location{testLocation("progress", "")} + S3InitBucket(t, loc[0].Path) + tag := h.runBackup(t, map[string]any{ + "location": loc, + }) + + Print("Run restore") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, nil, h.dstUser) + h.runRestore(t, map[string]any{ + "location": loc, + "snapshot_tag": tag, + "restore_tables": true, + }) + + Print("Validate success") + validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab, "id", "data") + validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, mv, "id", "data") + + Print("Validate view progress") + pr, err := h.dstRestoreSvc.GetProgress(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID) + if err != nil { + t.Fatal(errors.Wrap(err, "get progress")) + } + for _, v := range pr.Views { + if v.BuildStatus != scyllaclient.StatusSuccess { + t.Fatalf("Expected status: %s, got: %s", scyllaclient.StatusSuccess, v.BuildStatus) + } + } + + BlockREST(t, ManagedClusterHosts()...) + defer func() { + TryUnblockREST(t, ManagedClusterHosts()) + if err := EnsureNodesAreUP(t, ManagedClusterHosts(), time.Minute); err != nil { + t.Fatal(err) + } + }() + time.Sleep(100 * time.Millisecond) + + Print("Validate view progress when cluster is unavailable") + pr, err = h.dstRestoreSvc.GetProgress(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID) + if err != nil { + t.Fatal(errors.Wrap(err, "get progress")) + } + for _, v := range pr.Views { + if v.BuildStatus != scyllaclient.StatusSuccess { + t.Fatalf("Expected status: %s, got: %s", scyllaclient.StatusSuccess, v.BuildStatus) + } + } +} diff --git a/pkg/testutils/exec.go b/pkg/testutils/exec.go index 556ca8b1c..44400769d 100644 --- a/pkg/testutils/exec.go +++ b/pkg/testutils/exec.go @@ -159,10 +159,12 @@ func WaitForNodeUPOrTimeout(h string, timeout time.Duration) error { } // BlockREST blocks the Scylla API ports on h machine by dropping TCP packets. -func BlockREST(t *testing.T, h string) { +func BlockREST(t *testing.T, hosts ...string) { t.Helper() - if err := RunIptablesCommand(t, h, CmdBlockScyllaREST); err != nil { - t.Error(err) + for _, host := range hosts { + if err := RunIptablesCommand(t, host, CmdBlockScyllaREST); err != nil { + t.Error(err) + } } }