Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
karol-kokoszka committed Oct 24, 2024
1 parent 986c9f4 commit c92d05c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 23 deletions.
19 changes: 12 additions & 7 deletions pkg/service/cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,13 @@ func (s *Service) discoverAndSetClusterHosts(ctx context.Context, c *Cluster) er
return errors.Wrap(s.setKnownHosts(c, knownHosts), "update known_hosts in SM DB")
}

const (
discoverClusterHostsTimeout = 5 * time.Second
)

func (s *Service) discoverClusterHosts(ctx context.Context, c *Cluster) (knownHosts, liveHosts []string, err error) {
if c.Host != "" {
knownHosts, liveHosts, err := s.discoverClusterHostUsingCoordinator(ctx, c, 5*time.Second, c.Host)
knownHosts, liveHosts, err := s.discoverClusterHostUsingCoordinator(ctx, c, discoverClusterHostsTimeout, c.Host)
if err != nil {
s.logger.Error(ctx, "Couldn't discover hosts using stored coordinator host, proceeding with other known ones",
"coordinator-host", c.Host, "error", err)
Expand Down Expand Up @@ -205,18 +209,18 @@ func (s *Service) discoverClusterHosts(ctx context.Context, c *Cluster) (knownHo
go func(host string) {
defer wg.Done()

knownHosts, liveHosts, err := s.discoverClusterHostUsingCoordinator(discoverContext, c, 5*time.Second, host)
if err != nil && !errors.Is(err, context.Canceled) {
s.logger.Error(ctx, "Couldn't discover hosts", "host", host, "error", err)
knownHosts, liveHosts, err := s.discoverClusterHostUsingCoordinator(discoverContext, c, discoverClusterHostsTimeout, host)
if err != nil {
if !errors.Is(err, context.Canceled) {
s.logger.Error(ctx, "Couldn't discover hosts", "host", host, "error", err)
}
return
}
select {

Check failure on line 219 in pkg/service/cluster/service.go

View workflow job for this annotation

GitHub Actions / Various checks

S1000: should use a simple channel send/receive instead of `select` with a single case (gosimple)
case result <- hostsTuple{
live: liveHosts,
known: knownHosts,
}:
case <-discoverContext.Done():
return
}
}(cp)
}
Expand Down Expand Up @@ -254,8 +258,9 @@ func (s *Service) discoverClusterHostUsingCoordinator(ctx context.Context, c *Cl
if err != nil {
return nil, nil, err
}
defer logutil.LogOnError(ctx, s.logger, client.Close, "Couldn't close scylla client")

knownHosts, err = s.discoverHosts(ctx, client)
logutil.LogOnError(ctx, s.logger, client.Close, "Couldn't close scylla client")
if err != nil {
return nil, nil, err
}
Expand Down
31 changes: 17 additions & 14 deletions pkg/service/cluster/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ import (
)

func TestValidateHostConnectivityIntegration(t *testing.T) {
// given
Print("given: the fresh cluster")
var (
ctx = context.Background()
session = CreateScyllaManagerDBSession(t)
secretsStore = store.NewTableStore(session, table.Secrets)
timeout = 5 * time.Second
timeout = 15 * time.Second
c = &cluster.Cluster{
AuthToken: "token",
Host: ManagedClusterHost(),
Expand Down Expand Up @@ -83,39 +83,42 @@ func TestValidateHostConnectivityIntegration(t *testing.T) {
result: nil,
},
} {
t.Run(tc.name, func(innerT *testing.T) {
t.Run(tc.name, func(t *testing.T) {
defer func() {
for _, host := range tc.hostsDown {
if err := StartService(host, "scylla"); err != nil {
innerT.Logf("error on starting stopped scylla service on host={%s}, err={%s}", host, err)
t.Logf("error on starting stopped scylla service on host={%s}, err={%s}", host, err)
}
if err := RunIptablesCommand(host, CmdUnblockScyllaREST); err != nil {
innerT.Logf("error trying to unblock REST API on host = {%s}, err={%s}", host, err)
t.Logf("error trying to unblock REST API on host = {%s}, err={%s}", host, err)
}
}
}()

// then: validate that call takes less than 5 seconds
Printf("then: validate that call to validate host connectivity takes less than %v seconds", timeout.Seconds())
testCluster, err := s.GetClusterByID(context.Background(), c.ID)
if err != nil {
innerT.Fatal(err)
t.Fatal(err)
}
if err := callValidateHostConnectivityWithTimeout(ctx, s, timeout, testCluster); err != nil {
innerT.Fatal(err)
t.Fatal(err)
}
// when: the scylla service is stopped and the scylla API is timing out some hosts
Printf("when: the scylla service is stopped and the scylla API is timing out on some hosts")
// It's needed to block Scylla REST API, so that the clients are just hanging when they call the API.
// Scylla service must be stopped to make the node to report DOWN status. Blocking REST API is not
// enough.
for _, host := range tc.hostsDown {
if err := StopService(host, "scylla"); err != nil {
innerT.Fatal(err)
t.Fatal(err)
}
if err := RunIptablesCommand(host, CmdBlockScyllaREST); err != nil {
innerT.Error(err)
t.Error(err)
}
}

// then: validate that call still takes less than 5 seconds
if err := callValidateHostConnectivityWithTimeout(ctx, s, 30*time.Second, testCluster); !errors.Is(err, tc.result) {
innerT.Fatal(err)
Printf("then: validate that call still takes less than %v seconds", timeout.Seconds())
if err := callValidateHostConnectivityWithTimeout(ctx, s, timeout, testCluster); !errors.Is(err, tc.result) {
t.Fatal(err)
}
})
}
Expand Down
1 change: 0 additions & 1 deletion pkg/service/healthcheck/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func TestStatus_Ping_Independent_From_REST_Integration(t *testing.T) {
}

// When #1 -> default scenario where everything works fine
// Retry 5 times, as the service could be stopped by other
status, err := healthSvc.Status(context.Background(), testCluster.ID)
if err != nil {
t.Fatal(err)
Expand Down
7 changes: 6 additions & 1 deletion pkg/testutils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,19 @@ func WaitForNodeUP(h string, timeout time.Duration) error {
select {
case <-done:
return
case <-time.After(time.Second):
default:
stdout, _, err := ExecOnHost(h, "nodetool status | grep "+h)
if err != nil {
continue
}
if strings.HasPrefix(stdout, "UN") {
return
}
select {
case <-done:
return
case <-time.After(time.Second):
}
}
}
}()
Expand Down

0 comments on commit c92d05c

Please sign in to comment.