From 90b4bac7776c240a845771ac9c308aa069ca01f3 Mon Sep 17 00:00:00 2001
From: Michał Leszczyński <2000michal@wp.pl>
Date: Thu, 17 Oct 2024 15:44:28 +0200
Subject: [PATCH] feat(restore_test): extend
 TestRestoreTablesPreparationIntegration with cpu pinning

This way the test also checks CPU pinning before and after backup, as
well as before, in the middle of, when paused, when resumed, and after
restore.
---
 .../restore/restore_integration_test.go      | 69 ++++++++++++++-----
 .../scylla-manager-agent-ipv6.yaml           |  2 +
 .../scylla-manager-agent.yaml                |  2 +
 3 files changed, 54 insertions(+), 19 deletions(-)

diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go
index 318377cd58..300dc20a2d 100644
--- a/pkg/service/restore/restore_integration_test.go
+++ b/pkg/service/restore/restore_integration_test.go
@@ -10,6 +10,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"runtime"
+	"slices"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -498,7 +500,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	Print("Fill setup")
 	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
 
-	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int) {
+	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int, cpus []int64) {
 		// Validate tombstone_gc mode
 		if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
 			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
@@ -537,6 +539,24 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 				t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host)
 			}
 		}
+		// Validate cpu pinning
+		for _, host := range ch.Client.Config().Hosts {
+			got, err := ch.Client.GetPinnedCPU(context.Background(), host)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check pinned cpus on host %s", host))
+			}
+			slices.Sort(cpus)
+			slices.Sort(got)
+			if !slices.Equal(cpus, got) {
+				t.Errorf("expected cpus=%v, got=%v on host %s", cpus, got, host)
+			}
+		}
+	}
+
+	pinnedCPU := []int64{0} // Taken from scylla-manager-agent.yaml used for testing
+	unpinnedCPU := make([]int64, runtime.NumCPU())
+	for i := range unpinnedCPU {
+		unpinnedCPU[i] = int64(i)
 	}
 
 	shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
@@ -545,7 +565,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 	transfers0 := 2 * int(shardCnt)
 
-	setTransfersAndRateLimit := func(ch clusterHelper, transfers int, rateLimit int) {
+	setTransfersAndRateLimitAndPinnedCPU := func(ch clusterHelper, transfers int, rateLimit int, pin bool) {
 		for _, host := range ch.Client.Config().Hosts {
 			err := ch.Client.RcloneSetTransfers(context.Background(), host, transfers)
 			if err != nil {
@@ -555,15 +575,26 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 			if err != nil {
 				t.Fatal(errors.Wrapf(err, "set rate limit on host %s", host))
 			}
+			if pin {
+				err = ch.Client.PinCPU(context.Background(), host)
+				if err != nil {
+					t.Fatal(errors.Wrapf(err, "pin CPUs on host %s", host))
+				}
+			} else {
+				err = ch.Client.UnpinFromCPU(context.Background(), host)
+				if err != nil {
+					t.Fatal(errors.Wrapf(err, "unpin CPUs on host %s", host))
+				}
+			}
 		}
 	}
 
 	Print("Set initial transfers and rate limit")
-	setTransfersAndRateLimit(h.srcCluster, 10, 99)
-	setTransfersAndRateLimit(h.dstCluster, 10, 99)
+	setTransfersAndRateLimitAndPinnedCPU(h.srcCluster, 10, 99, true)
+	setTransfersAndRateLimitAndPinnedCPU(h.dstCluster, 10, 99, true)
 
 	Print("Validate state before backup")
-	validateState(h.srcCluster, "repair", true, 10, 99)
+	validateState(h.srcCluster, "repair", true, 10, 99, pinnedCPU)
 
 	Print("Run backup")
 	loc := []Location{testLocation("preparation", "")}
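
The pin/unpin toggle added above reduces to a small, reusable pattern. A
minimal sketch, assuming a hypothetical cpuPinner interface: the three
method names and signatures are taken from the calls this diff makes on
ch.Client, but the interface and the setPinned helper are invented here
purely for illustration.

    package sketch

    import (
        "context"
        "fmt"
    )

    // cpuPinner is a hypothetical interface for this sketch; in the test,
    // these methods live on ch.Client.
    type cpuPinner interface {
        PinCPU(ctx context.Context, host string) error
        UnpinFromCPU(ctx context.Context, host string) error
        GetPinnedCPU(ctx context.Context, host string) ([]int64, error)
    }

    // setPinned mirrors the branch added to
    // setTransfersAndRateLimitAndPinnedCPU: pin the agent on every host
    // when pin is true, unpin it otherwise, failing fast on the first error.
    func setPinned(ctx context.Context, c cpuPinner, hosts []string, pin bool) error {
        for _, host := range hosts {
            var err error
            if pin {
                err = c.PinCPU(ctx, host)
            } else {
                err = c.UnpinFromCPU(ctx, host)
            }
            if err != nil {
                return fmt.Errorf("set cpu pinning on host %s: %w", host, err)
            }
        }
        return nil
    }
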
@@ -577,18 +608,19 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	})
 
 	Print("Validate state after backup")
-	validateState(h.srcCluster, "repair", true, 3, 88)
+	validateState(h.srcCluster, "repair", true, 3, 88, pinnedCPU)
 
 	runRestore := func(ctx context.Context, finishedRestore chan error) {
 		grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
 		h.dstCluster.RunID = uuid.NewTime()
 		rawProps, err := json.Marshal(map[string]any{
-			"location":       loc,
-			"keyspace":       ksFilter,
-			"snapshot_tag":   tag,
-			"transfers":      0,
-			"rate_limit":     []string{"0"},
-			"restore_tables": true,
+			"location":        loc,
+			"keyspace":        ksFilter,
+			"snapshot_tag":    tag,
+			"transfers":       0,
+			"rate_limit":      []string{"0"},
+			"unpin_agent_cpu": true,
+			"restore_tables":  true,
 		})
 		if err != nil {
 			finishedRestore <- err
@@ -621,7 +653,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	makeLASHang(reachedDataStageChan, hangLAS)
 
 	Print("Validate state before restore")
-	validateState(h.dstCluster, "repair", true, 10, 99)
+	validateState(h.dstCluster, "repair", true, 10, 99, pinnedCPU)
 
 	Print("Run restore")
 	finishedRestore := make(chan error)
@@ -636,7 +668,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during restore data")
-	validateState(h.dstCluster, "disabled", false, transfers0, 0)
+	validateState(h.dstCluster, "disabled", false, transfers0, 0, unpinnedCPU)
 
 	Print("Pause restore")
 	restoreCancel()
@@ -651,11 +683,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during pause")
-	validateState(h.dstCluster, "disabled", true, transfers0, 0)
+	validateState(h.dstCluster, "disabled", true, transfers0, 0, pinnedCPU)
 
 	Print("Change transfers and rate limit during pause")
-	setTransfersAndRateLimit(h.srcCluster, 9, 55)
-	setTransfersAndRateLimit(h.dstCluster, 9, 55)
+	setTransfersAndRateLimitAndPinnedCPU(h.dstCluster, 9, 55, false)
 
 	reachedDataStageChan = make(chan struct{})
 	hangLAS = make(chan struct{})
@@ -674,7 +705,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during restore data after pause")
-	validateState(h.dstCluster, "disabled", false, transfers0, 0)
+	validateState(h.dstCluster, "disabled", false, transfers0, 0, unpinnedCPU)
 
 	Print("Release LAS")
 	close(hangLAS)
@@ -686,7 +717,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state after restore success")
-	validateState(h.dstCluster, "repair", true, transfers0, 0)
+	validateState(h.dstCluster, "repair", true, transfers0, 0, pinnedCPU)
 
 	Print("Validate table contents")
 	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
diff --git a/testing/scylla-manager-agent/scylla-manager-agent-ipv6.yaml b/testing/scylla-manager-agent/scylla-manager-agent-ipv6.yaml
index d7ee3d4f06..2f4398284d 100644
--- a/testing/scylla-manager-agent/scylla-manager-agent-ipv6.yaml
+++ b/testing/scylla-manager-agent/scylla-manager-agent-ipv6.yaml
@@ -1,5 +1,7 @@
 auth_token: token
 
+cpu: 0
+
 debug: :5112
 
 logger:
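
validateState compares CPU sets order-insensitively by sorting both sides
before slices.Equal. A standalone version of that check, as a minimal
sketch: sameCPUSet is a name invented here, since the patch inlines this
logic and sorts in place (harmless there, because pinnedCPU and
unpinnedCPU are constructed already sorted).

    package sketch

    import "slices"

    // sameCPUSet reports whether want and got contain the same CPU indexes,
    // ignoring order. Sorting clones leaves the callers' slices untouched.
    func sameCPUSet(want, got []int64) bool {
        w, g := slices.Clone(want), slices.Clone(got)
        slices.Sort(w)
        slices.Sort(g)
        return slices.Equal(w, g)
    }

For example, the pinned expectation is sameCPUSet([]int64{0}, got), and
the unpinned one uses every index from 0 to runtime.NumCPU()-1.
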
diff --git a/testing/scylla-manager-agent/scylla-manager-agent.yaml b/testing/scylla-manager-agent/scylla-manager-agent.yaml
index 9075bbc833..83431b07dd 100644
--- a/testing/scylla-manager-agent/scylla-manager-agent.yaml
+++ b/testing/scylla-manager-agent/scylla-manager-agent.yaml
@@ -1,5 +1,7 @@
 auth_token: token
 
+cpu: 0
+
 debug: :5112
 
 logger:
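
cpu: 0 in both agent configs pins the agent process to CPU 0, which is why
the test's pinned expectation is []int64{0}. For intuition, a standalone
Linux sketch of what such pinning amounts to, using the scheduling-affinity
calls from golang.org/x/sys/unix directly; the agent's actual pinning
implementation is not part of this patch and may differ.

    package main

    import (
        "fmt"

        "golang.org/x/sys/unix"
    )

    func main() {
        // Restrict the calling thread to CPU 0, the effect cpu: 0 asks for.
        var set unix.CPUSet
        set.Zero()
        set.Set(0)
        if err := unix.SchedSetaffinity(0, &set); err != nil { // pid 0 = calling thread
            panic(err)
        }

        // Read the affinity back, the same shape of check that
        // GetPinnedCPU enables remotely in the test.
        var got unix.CPUSet
        if err := unix.SchedGetaffinity(0, &got); err != nil {
            panic(err)
        }
        fmt.Println("pinned cpus:", got.Count()) // prints: pinned cpus: 1
    }
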