chore(test): adds dc-mapping integration tests
Fixes: #3829
VAveryanov8 committed Jan 16, 2025
1 parent 4622a83 commit f0229b4
Showing 2 changed files with 218 additions and 1 deletion.
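The tests in this commit set the new DC mapping in two forms: the first file passes a "dc-mapping" entry inside the untyped properties map given to h.runRestore, while the second file sets a typed DCMappings field on the restore Target. As a rough sketch only, inferred from that usage (the real definitions live in pkg/service/restore and may carry extra fields, JSON tags, or validation), the mapping type could look like this:

// Hypothetical sketch inferred from the diff below; not the actual definition.
// DCMapping maps one or more source DCs onto the target DCs that should
// receive their data during restore. A "!dc" entry in Source (see the last
// test in the first file) appears to exclude that DC from the restore.
type DCMapping struct {
	Source []string `json:"source"`
	Target []string `json:"target"`
}

// DCMappings is the slice form used on the restore Target in the second file.
type DCMappings []DCMapping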
190 changes: 190 additions & 0 deletions pkg/service/restore/restore_integration_test.go
@@ -59,6 +59,10 @@ func TestRestoreTablesUserIntegration(t *testing.T) {
"location": loc,
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1"}, "target": {"dc1", "dc2"}},
},
})

Print("Log in via restored user and check permissions")
@@ -106,6 +110,10 @@ func TestRestoreTablesNoReplicationIntegration(t *testing.T) {
"keyspace": ksFilter,
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1"}, "target": {"dc1", "dc2"}},
},
})

h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
@@ -296,6 +304,10 @@ func TestRestoreSchemaDropAddColumnIntegration(t *testing.T) {
"keyspace": ksFilter,
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1"}, "target": {"dc1", "dc2"}},
},
})

h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
@@ -354,6 +366,10 @@ func TestRestoreTablesVnodeToTabletsIntegration(t *testing.T) {
"keyspace": ksFilter,
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1"}, "target": {"dc1", "dc2"}},
},
})

validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab, c1, c2)
@@ -455,6 +471,10 @@ func TestRestoreTablesPausedIntegration(t *testing.T) {
"keyspace": []string{ks1, ks2},
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1"}, "target": {"dc1", "dc2"}},
},
}
err = runPausedRestore(t, func(ctx context.Context) error {
h.dstCluster.RunID = uuid.NewTime()
@@ -625,6 +645,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
"rate_limit": []string{"0"},
"unpin_agent_cpu": true,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1", "dc2"}, "target": {"dc1"}},
},
})
if err != nil {
finishedRestore <- err
@@ -772,6 +796,10 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) {
"keyspace": ksFilter,
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1"}, "target": {"dc1", "dc2"}},
},
}

t.Run("batch retry finished with success", func(t *testing.T) {
@@ -971,6 +999,10 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) {
"parallel": 1,
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required, because the DCs are reversed between source and target
"dc-mapping": []map[string][]string{
{"source": {"dc1", "dc2"}, "target": {"dc2", "dc1"}},
},
})
close(res)
}()
@@ -989,3 +1021,161 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) {
t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM)
}
}

func TestRestoreTablesIntoClusterWithAnotherDCNameIntegration(t *testing.T) {
h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedThirdClusterHosts())

Print("Keyspace setup")
// Source cluster
ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}"
ks := randomizedName("multi_location_")
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks))

// Target cluster
ksStmtDst := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc3': 1}"
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ks))

Print("Table setup")
tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)"
tab := randomizedName("tab_")
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))

Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 100, ks, tab)

Print("Save filled table into map")
srcM := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ks, tab, "id", "data")

Print("Run backup")
loc := []Location{
testLocation("one-location-1", ""),
}
S3InitBucket(t, loc[0].Path)
ksFilter := []string{ks}
tag := h.runBackup(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
"batch_size": 100,
})

Print("Run restore")
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
res := make(chan struct{})
go func() {
h.runRestore(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
// Test if batching does not hang with
// limited parallel and location access.
"parallel": 1,
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1"}, "target": {"dc3"}},
},
})
close(res)
}()

select {
case <-res:
case <-time.NewTimer(2 * time.Minute).C:
t.Fatal("Restore hanged")
}

Print("Save restored table into map")
dstM := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ks, tab, "id", "data")

Print("Validate success")
if !maps.Equal(srcM, dstM) {
t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM)
}
}

func TestRestoreOnlyOneDCFromLocation(t *testing.T) {
h := newTestHelper(t, ManagedClusterHosts(), ManagedThirdClusterHosts())

Print("Keyspace setup")
// Source cluster
ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}"
ksTwoDC := randomizedName("two_dc_")
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ksTwoDC))

// Keyspace that's only available in dc2
ksStmtOneDC := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1':0, 'dc2': 1}"
ksOneDC := randomizedName("one_dc_")
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmtOneDC, ksOneDC))

// Target cluster
ksStmtDst := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc3': 1}"
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ksTwoDC))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ksOneDC))

Print("Table setup")
tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)"
tab := randomizedName("tab_")
for _, ks := range []string{ksTwoDC, ksOneDC} {
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
}

Print("Fill setup")
for _, ks := range []string{ksTwoDC, ksOneDC} {
fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
}

Print("Save filled table into map")
srcMTwoDC := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ksTwoDC, tab, "id", "data")

Print("Run backup")
loc := []Location{
testLocation("one-location-1", ""),
}
S3InitBucket(t, loc[0].Path)
ksFilter := []string{ksTwoDC, ksOneDC}
tag := h.runBackup(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
"batch_size": 100,
})

Print("Run restore")
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
res := make(chan struct{})
go func() {
h.runRestore(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
// Test if batching does not hang with
// limited parallel and location access.
"parallel": 1,
"snapshot_tag": tag,
"restore_tables": true,
// DC Mapping is required
"dc-mapping": []map[string][]string{
{"source": {"dc1", "!dc2"}, "target": {"dc3"}},
},
})
close(res)
}()

select {
case <-res:
case <-time.NewTimer(2 * time.Minute).C:
t.Fatal("Restore hanged")
}

Print("Save restored table into map")
dstMTwoDC := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ksTwoDC, tab, "id", "data")
dstMOneDC := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ksOneDC, tab, "id", "data")

Print("Validate success")
if !maps.Equal(srcMTwoDC, dstMTwoDC) {
t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcMTwoDC, dstMTwoDC)
}
if len(dstMOneDC) != 0 {
t.Fatalf("dc2 shouldn't be restored")
}
}
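
Before moving to the second file, a small standalone snippet shows what the untyped "dc-mapping" property from the last test above looks like once the properties map is marshalled with encoding/json. This assumes the test helper submits the map[string]any properties as plain JSON, which is an assumption about the harness rather than something shown in this diff:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Same shape as the "dc-mapping" property used in TestRestoreOnlyOneDCFromLocation.
	props := map[string]any{
		"restore_tables": true,
		"dc-mapping": []map[string][]string{
			{"source": {"dc1", "!dc2"}, "target": {"dc3"}},
		},
	}
	out, err := json.Marshal(props)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// Output (keys sorted by encoding/json):
	// {"dc-mapping":[{"source":["dc1","!dc2"],"target":["dc3"]}],"restore_tables":true}
}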
29 changes: 28 additions & 1 deletion pkg/service/restore/service_restore_integration_test.go
@@ -553,6 +553,9 @@ func TestRestoreTablesSmokeIntegration(t *testing.T) {
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

smokeRestore(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser, "{'class': 'NetworkTopologyStrategy', 'dc1': 2}")
@@ -618,7 +621,7 @@ func smokeRestore(t *testing.T, target Target, keyspace string, loadCnt, loadSiz

Print("When: restore backup on different cluster = (dc1: 3 nodes, dc2: 3 nodes)")
if err := dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)); err != nil {
t.Fatal(err)
t.Fatal("Restore:", err)
}

dstH.validateRestoreSuccess(dstSession, srcSession, target, []table{{ks: keyspace, tab: BigTableName}})
@@ -645,6 +648,9 @@ func TestRestoreTablesRestartAgentsIntegration(t *testing.T) {
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

restoreWithAgentRestart(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -722,6 +728,9 @@ func TestRestoreTablesResumeIntegration(t *testing.T) {
Parallel: testParallel,
RestoreTables: true,
Continue: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

restoreWithResume(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -748,6 +757,9 @@ func TestRestoreTablesResumeContinueFalseIntegration(t *testing.T) {
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

restoreWithResume(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -911,6 +923,9 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) {
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

restoreWithVersions(t, target, testKeyspace, testLoadCnt, testLoadSize, corruptCnt, testUser)
@@ -1185,6 +1200,9 @@ func TestRestoreTablesViewCQLSchemaIntegration(t *testing.T) {
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

restoreViewCQLSchema(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -1269,6 +1287,9 @@ func TestRestoreFullViewSSTableSchemaIntegration(t *testing.T) {
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

restoreViewSSTableSchema(t, schemaTarget, tablesTarget, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -1356,6 +1377,9 @@ func TestRestoreFullIntegration(t *testing.T) {
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

restoreAllTables(t, schemaTarget, tablesTarget, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -1462,6 +1486,9 @@ func TestRestoreFullAlternatorIntegration(t *testing.T) {
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
DCMappings: DCMappings{
{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
},
}

restoreAlternator(t, schemaTarget, tablesTarget, testKeyspace, testTable, testUser, testAlternatorPort)