Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(restore): adds --dc-mapping flag to restore command #4213

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ include testing/.env
INTEGRATION_TEST_ARGS := -cluster $(PUBLIC_NET)100 \
-managed-cluster $(PUBLIC_NET)11,$(PUBLIC_NET)12,$(PUBLIC_NET)13,$(PUBLIC_NET)21,$(PUBLIC_NET)22,$(PUBLIC_NET)23 \
-test-network $(PUBLIC_NET) \
-managed-second-cluster $(PUBLIC_NET)31,$(PUBLIC_NET)32 \
-managed-second-cluster $(PUBLIC_NET)31,$(PUBLIC_NET)32,$(PUBLIC_NET)41,$(PUBLIC_NET)42 \
-user cassandra -password cassandra \
-agent-auth-token token \
-s3-data-dir ./testing/minio/data -s3-provider Minio -s3-endpoint $(MINIO_ENDPOINT) -s3-access-key-id $(MINIO_USER_ACCESS_KEY) -s3-secret-access-key $(MINIO_USER_SECRET_KEY)
Expand Down
46 changes: 32 additions & 14 deletions pkg/command/restore/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ type command struct {
flag.TaskBase
client *managerclient.Client

cluster string
location []string
keyspace []string
snapshotTag string
batchSize int
parallel int
transfers int
rateLimit []string
allowCompaction bool
unpinAgentCPU bool
restoreSchema bool
restoreTables bool
dryRun bool
showTables bool
cluster string
location []string
keyspace []string
snapshotTag string
batchSize int
parallel int
transfers int
rateLimit []string
allowCompaction bool
unpinAgentCPU bool
restoreSchema bool
restoreTables bool
dryRun bool
showTables bool
dcMapping dcMappings
skipDCMappingValidation bool
}

func NewCommand(client *managerclient.Client) *cobra.Command {
Expand Down Expand Up @@ -90,6 +92,8 @@ func (cmd *command) init() {
w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "")
w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "")
w.Unwrap().BoolVar(&cmd.showTables, "show-tables", false, "")
w.Unwrap().Var(&cmd.dcMapping, "dc-mapping", "")
w.Unwrap().BoolVar(&cmd.skipDCMappingValidation, "skip-dc-mapping-validation", false, "")
}

func (cmd *command) run(args []string) error {
Expand Down Expand Up @@ -182,6 +186,20 @@ func (cmd *command) run(args []string) error {
props["restore_tables"] = cmd.restoreTables
ok = true
}
if cmd.Flag("dc-mapping").Changed {
if cmd.Update() {
return wrapper("dc-mapping")
}
props["dc_mapping"] = cmd.dcMapping
ok = true
}
if cmd.Flag("skip-dc-mapping-validation").Changed {
if cmd.Update() {
return wrapper("skip-dc-mapping-validation")
}
props["skip_dc_mapping_validation"] = cmd.skipDCMappingValidation
ok = true
}

if cmd.dryRun {
res, err := cmd.client.GetRestoreTarget(cmd.Context(), cmd.cluster, task)
Expand Down
70 changes: 70 additions & 0 deletions pkg/command/restore/dcmappings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (C) 2025 ScyllaDB

package restore

import (
"strings"

"github.com/pkg/errors"
)

// dcMappings is the parsed value of the --dc-mapping flag.
// It implements the pflag.Value interface (Set/String/Type).
type dcMappings []dcMapping

// dcMapping maps a set of source (backup) DCs to a set of target
// (restore destination) DCs.
type dcMapping struct {
	Source []string `json:"source"`
	Target []string `json:"target"`
}

// Set parses --dc-mapping flag, where the syntax is following:
// ; - used to split different mappings
// => - used to split source => target DCs
// , - used to separate DCs.
func (dcm *dcMappings) Set(v string) error {
	mappingParts := strings.Split(v, ";")
	for _, dcMapPart := range mappingParts {
		sourceTargetParts := strings.Split(dcMapPart, "=>")
		if len(sourceTargetParts) != 2 {
			return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart)
		}

		source := parseDCList(sourceTargetParts[0])
		target := parseDCList(sourceTargetParts[1])
		// Reject empty DC names on either side, which covers both a
		// fully missing side ("=>dc1", "dc1=>") and stray separators or
		// whitespace-only names ("dc1,,dc2=>dc3", "  =>dc1"). The
		// previous check compared the raw untrimmed string to "" and so
		// let whitespace-only sides through.
		if hasEmptyDC(source) || hasEmptyDC(target) {
			return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart)
		}

		*dcm = append(*dcm, dcMapping{Source: source, Target: target})
	}
	return nil
}

// parseDCList splits a comma-separated DC list and trims whitespace
// around each name.
func parseDCList(raw string) []string {
	dcs := strings.Split(raw, ",")
	for i, dc := range dcs {
		dcs[i] = strings.TrimSpace(dc)
	}
	return dcs
}

// hasEmptyDC reports whether any DC name in the list is empty.
func hasEmptyDC(dcs []string) bool {
	for _, dc := range dcs {
		if dc == "" {
			return true
		}
	}
	return false
}

// String builds --dc-mapping flag back from struct.
func (dcm *dcMappings) String() string {
	if dcm == nil {
		return ""
	}
	parts := make([]string, 0, len(*dcm))
	for _, mapping := range *dcm {
		parts = append(parts, strings.Join(mapping.Source, ",")+"=>"+strings.Join(mapping.Target, ","))
	}
	return strings.Join(parts, ";")
}

// Type implements pflag.Value interface.
func (dcm *dcMappings) Type() string {
	return "dc-mapping"
}
120 changes: 120 additions & 0 deletions pkg/command/restore/dcmappings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (C) 2025 ScyllaDB
package restore

import (
"fmt"
"slices"
"testing"
)

// TestSetDCMapping verifies parsing of the --dc-mapping flag value,
// covering valid single/multi mappings, whitespace handling, and
// malformed inputs that must return an error.
func TestSetDCMapping(t *testing.T) {
	testCases := []struct {
		input            string
		expectedErr      bool
		expectedMappings dcMappings
	}{
		{
			input: "dc1=>dc2",
			expectedMappings: dcMappings{
				{Source: []string{"dc1"}, Target: []string{"dc2"}},
			},
		},
		{
			input: " dc1, dc2 => dc1, dc2",
			expectedMappings: dcMappings{
				{Source: []string{"dc1", "dc2"}, Target: []string{"dc1", "dc2"}},
			},
		},
		{
			input: "dc1=>dc3;dc2=>dc4",
			expectedMappings: dcMappings{
				{Source: []string{"dc1"}, Target: []string{"dc3"}},
				{Source: []string{"dc2"}, Target: []string{"dc4"}},
			},
		},
		{
			input: "dc1,dc2=>dc3",
			expectedMappings: dcMappings{
				{Source: []string{"dc1", "dc2"}, Target: []string{"dc3"}},
			},
		},
		{
			input:       "dc1=>dc3=>dc2=>dc4",
			expectedErr: true,
		},
		{
			input:       ";",
			expectedErr: true,
		},
		{
			input:       "=>",
			expectedErr: true,
		},
		{
			input:       "dc1=>",
			expectedErr: true,
		},
		{
			input:       "dc1=>;",
			expectedErr: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.input, func(t *testing.T) {
			var mappings dcMappings

			err := mappings.Set(tc.input)
			if tc.expectedErr {
				if err == nil {
					t.Fatal("Expected err, but got nil")
				}
				// The content of mappings after a failed Set is
				// unspecified, so there is nothing more to compare.
				return
			}
			if err != nil {
				t.Fatalf("Unexpected err: %v", err)
			}
			equal := slices.EqualFunc(tc.expectedMappings, mappings, func(a, b dcMapping) bool {
				return slices.Equal(a.Source, b.Source) &&
					slices.Equal(a.Target, b.Target)
			})
			if !equal {
				t.Fatalf("Expected %v, but got %v", tc.expectedMappings, mappings)
			}
		})
	}
}

// TestDCMappingString verifies that dcMappings serializes back into the
// flag syntax, including the empty (zero value) case.
func TestDCMappingString(t *testing.T) {
	tests := []struct {
		mappings dcMappings
		expected string
	}{
		{
			mappings: dcMappings{
				{Source: []string{"dc1"}, Target: []string{"dc2"}},
			},
			expected: "dc1=>dc2",
		},
		{
			mappings: dcMappings{
				{Source: []string{"dc1"}, Target: []string{"dc2"}},
				{Source: []string{"dc3"}, Target: []string{"dc4"}},
			},
			expected: "dc1=>dc2;dc3=>dc4",
		},
		{},
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
			if got := tt.mappings.String(); got != tt.expected {
				t.Fatalf("Expected %q, but got %q", tt.expected, got)
			}
		})
	}
}
16 changes: 16 additions & 0 deletions pkg/command/restore/res.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,19 @@ dry-run: |

show-tables: |
Prints table names together with keyspace, used in combination with --dry-run.

dc-mapping: |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sctool command docs are actually also a part of the regular SM docs.
In order to include those changes in the docs, you should run `make docs` from the repo root dir. In order to see how the changes were rendered, you can run `make -C docs preview` (or just `make preview` from the `docs` dir).

Specifies the mapping between DCs from the backup and DCs in the restored (target) cluster.
All DCs from the source cluster should be explicitly mapped to all DCs in the target cluster. The only exception is when
the source and target clusters match exactly: source DCs == target DCs.
If only some DCs need to be restored, partial DC mappings can be used together with the --skip-dc-mapping-validation flag.
Only works with tables restoration (--restore-tables=true).
Syntax:
"source_dc1,source_dc2=>target_dc1,target_dc2"
Multiple mappings are separated by semicolons (;)
Examples:
"dc1,dc2=>dc3" - data from dc1 and dc2 DCs should be restored to dc3 DC.

skip-dc-mapping-validation: |
Allows proceeding with the restore command when the DC mapping doesn't describe all DCs from the source and target clusters.
DCs that are not in --dc-mapping will be ignored and won't be restored.
24 changes: 12 additions & 12 deletions pkg/service/restore/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type batchDispatcher struct {
hostShardCnt map[string]uint
}

func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationInfo []LocationInfo) *batchDispatcher {
sortWorkload(workload)
var shards uint
for _, sh := range hostShardCnt {
Expand All @@ -70,7 +70,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin
mu: sync.Mutex{},
wait: make(chan struct{}),
workload: workload,
workloadProgress: newWorkloadProgress(workload, locationHosts),
workloadProgress: newWorkloadProgress(workload, locationInfo),
batchSize: batchSize,
expectedShardWorkload: workload.TotalSize / int64(shards),
hostShardCnt: hostShardCnt,
Expand Down Expand Up @@ -106,22 +106,22 @@ type remoteSSTableDirProgress struct {
RemainingSSTables []RemoteSSTable
}

func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress {
func newWorkloadProgress(workload Workload, locationInfo []LocationInfo) workloadProgress {
dcBytes := make(map[string]int64)
locationDC := make(map[string][]string)
p := make([]remoteSSTableDirProgress, len(workload.RemoteDir))
for i, rdw := range workload.RemoteDir {
dcBytes[rdw.DC] += rdw.Size
locationDC[rdw.Location.StringWithoutDC()] = append(locationDC[rdw.Location.StringWithoutDC()], rdw.DC)
p[i] = remoteSSTableDirProgress{
RemainingSize: rdw.Size,
RemainingSSTables: rdw.SSTables,
}
}
hostDCAccess := make(map[string][]string)
for loc, hosts := range locationHosts {
for _, h := range hosts {
hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...)
hostDCAccess := map[string][]string{}
for _, l := range locationInfo {
for dc, hosts := range l.DCHosts {
for _, h := range hosts {
hostDCAccess[h] = append(hostDCAccess[h], dc)
}
}
}
return workloadProgress{
Expand Down Expand Up @@ -201,8 +201,8 @@ func (bd *batchDispatcher) ValidateAllDispatched() error {
for i, rdp := range bd.workloadProgress.remoteDir {
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 {
rdw := bd.workload.RemoteDir[i]
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size)
return errors.Errorf("failed to restore sstables from location %s dc %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.DC, rdw.Keyspace, rdw.Table, rdw.Size)
}
}
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) {
if slices.Contains(bd.workloadProgress.hostFailedDC[host], rdw.DC) {
continue
}
// Sip dir from location without access
// Skip dir from location without access
if !slices.Contains(bd.workloadProgress.hostDCAccess[host], rdw.DC) {
continue
}
Expand Down
Loading