Skip to content

Commit

Permalink
feat(restore_test): test view progress after cluster is unavailable
Browse files Browse the repository at this point in the history
This test checks for #4191.
  • Loading branch information
Michal-Leszczynski committed Jan 10, 2025
1 parent 801d654 commit e458320
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 3 deletions.
4 changes: 4 additions & 0 deletions pkg/restapi/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type HealthCheckService interface {
// RepairService service interface for the REST API handlers.
type RepairService interface {
GetRun(ctx context.Context, clusterID, taskID, runID uuid.UUID) (*repair.Run, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (repair.Progress, error)
GetTarget(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (repair.Target, error)
SetIntensity(ctx context.Context, runID uuid.UUID, intensity float64) error
Expand All @@ -59,15 +60,18 @@ type BackupService interface {
ExtractLocations(ctx context.Context, properties []json.RawMessage) []backupspec.Location
List(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, filter backup.ListFilter) ([]backup.ListItem, error)
ListFiles(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, filter backup.ListFilter) ([]backupspec.FilesInfo, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (backup.Progress, error)
DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, snapshotTags []string) error
GetValidationTarget(_ context.Context, clusterID uuid.UUID, properties json.RawMessage) (backup.ValidationTarget, error)
// GetValidationProgress must work even when the cluster is no longer available.
GetValidationProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) ([]backup.ValidationHostProgress, error)
}

// RestoreService service interface for the REST API handlers.
type RestoreService interface {
GetTargetUnitsViews(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (restore.Target, []restore.Unit, []restore.View, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (restore.Progress, error)
}

Expand Down
82 changes: 82 additions & 0 deletions pkg/service/restore/restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,3 +989,85 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) {
t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM)
}
}

func TestRestoreTablesProgressIntegration(t *testing.T) {
// It verifies that:
// - view status progress is correct
// - progress is available even when cluster is not

if IsIPV6Network() {
t.Skip("nodes don't have ip6tables and related modules to properly simulate unavailable cluster")
}

h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts())

Print("Keyspace setup")
ks := randomizedName("progress_")
ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}"
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1))

Print("Table setup")
tab := randomizedName("tab_")
tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)"
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))

Print("View setup")
mv := randomizedName("mv_")
CreateMaterializedView(t, h.srcCluster.rootSession, ks, tab, mv)
CreateMaterializedView(t, h.dstCluster.rootSession, ks, tab, mv)

Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 1, ks, tab)

Print("Run backup")
loc := []Location{testLocation("progress", "")}
S3InitBucket(t, loc[0].Path)
tag := h.runBackup(t, map[string]any{
"location": loc,
})

Print("Run restore")
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, nil, h.dstUser)
h.runRestore(t, map[string]any{
"location": loc,
"snapshot_tag": tag,
"restore_tables": true,
})

Print("Validate success")
validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab, "id", "data")
validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, mv, "id", "data")

Print("Validate view progress")
pr, err := h.dstRestoreSvc.GetProgress(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID)
if err != nil {
t.Fatal(errors.Wrap(err, "get progress"))
}
for _, v := range pr.Views {
if v.BuildStatus != scyllaclient.StatusSuccess {
t.Fatalf("Expected status: %s, got: %s", scyllaclient.StatusSuccess, v.BuildStatus)
}
}

BlockREST(t, ManagedClusterHosts()...)
defer func() {
TryUnblockREST(t, ManagedClusterHosts())
if err := EnsureNodesAreUP(t, ManagedClusterHosts(), time.Minute); err != nil {
t.Fatal(err)
}
}()
time.Sleep(100 * time.Millisecond)

Print("Validate view progress when cluster is unavailable")
pr, err = h.dstRestoreSvc.GetProgress(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID)
if err != nil {
t.Fatal(errors.Wrap(err, "get progress"))
}
for _, v := range pr.Views {
if v.BuildStatus != scyllaclient.StatusSuccess {
t.Fatalf("Expected status: %s, got: %s", scyllaclient.StatusSuccess, v.BuildStatus)
}
}
}
8 changes: 5 additions & 3 deletions pkg/testutils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ func WaitForNodeUPOrTimeout(h string, timeout time.Duration) error {
}

// BlockREST blocks the Scylla API ports on h machine by dropping TCP packets.
func BlockREST(t *testing.T, h string) {
func BlockREST(t *testing.T, hosts ...string) {
t.Helper()
if err := RunIptablesCommand(t, h, CmdBlockScyllaREST); err != nil {
t.Error(err)
for _, host := range hosts {
if err := RunIptablesCommand(t, host, CmdBlockScyllaREST); err != nil {
t.Error(err)
}
}
}

Expand Down

0 comments on commit e458320

Please sign in to comment.