Skip to content

Commit

Permalink
roachtest/failure-injection: add failure injection smoke test
Browse files Browse the repository at this point in the history
This adds an integration test for the failure injection library.
The test spins up a cluster and runs every failure scenario in a
random order. For each scenario, it injects the failure and validates
that the failure was correctly injected.
Afterwards, it reverts the failure and validates that the failure
was correctly cleaned up.
  • Loading branch information
DarrylWong committed Feb 19, 2025
1 parent c2ece08 commit 8721ab3
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"context"
gosql "database/sql"
"fmt"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"io"
"net/http"
"os"
"regexp"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
Expand Down Expand Up @@ -215,3 +217,19 @@ func IfLocal(c cluster.Cluster, trueVal, falseVal string) string {
}
return falseVal
}

// CheckPortBlocked reports whether connections from fromNode to the given port
// on toNode are blocked. It runs an nmap scan of that port from fromNode and
// returns true if the scan result contains "filtered". Any other result,
// including an open port or empty scan output, is reported as not blocked
// (false). Requires nmap to be installed on fromNode.
//
// An error is returned only if the scan command itself could not be run
// successfully; in that case the boolean result is meaningless.
func CheckPortBlocked(
	ctx context.Context,
	l *logger.Logger,
	c cluster.Cluster,
	fromNode, toNode option.NodeListOption,
	port string,
) (bool, error) {
	// `nmap -oG` example output:
	// Host: {IP} {HOST_NAME} Status: Up
	// Host: {IP} {HOST_NAME} Ports: 26257/open/tcp//cockroach///
	// We care about the port scan result and whether it is filtered or open,
	// so awk keeps only the fifth field of the "Ports:" line.
	scanCmd := fmt.Sprintf("nmap -p %s {ip%s} -oG - | awk '/Ports:/{print $5}'", port, toNode)
	res, err := c.RunWithDetailsSingleNode(ctx, l, option.WithNodes(fromNode), scanCmd)
	if err != nil {
		return false, err
	}
	return strings.Contains(res.Stdout, "filtered"), nil
}
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_library(
"event_log.go",
"export_parquet.go",
"failover.go",
"failure_injection.go",
"fixtures.go",
"follower_reads.go",
"go_helpers.go",
Expand Down Expand Up @@ -250,6 +251,7 @@ go_library(
"//pkg/roachprod",
"//pkg/roachprod/config",
"//pkg/roachprod/errors",
"//pkg/roachprod/failureinjection/failures",
"//pkg/roachprod/install",
"//pkg/roachprod/logger",
"//pkg/roachprod/prometheus",
Expand Down
306 changes: 306 additions & 0 deletions pkg/cmd/roachtest/tests/failure_injection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package tests

import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/failureinjection/failures"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/errors"
"math/rand"
)

// failureSmokeTest describes a single failure injection scenario: which
// failure mode to run, the arguments to run it with, and the checks used to
// confirm that the failure was injected and later reverted.
type failureSmokeTest struct {
// testName is a human readable name used in log lines and error messages.
testName string
// failureName is the name used to look up the failure mode in the
// failures.FailureRegistry.
failureName string
// args is passed unchanged to every step of the failure mode (Setup, Inject,
// Restore, Cleanup and the wait steps).
args failures.FailureArgs
// Validate that the failure was injected correctly, called after Setup() and Inject().
// Must return a non-nil error if the failure is not in effect.
validateFailure func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error
// Validate that the failure was restored correctly, called after Restore().
// Must return nil if the failure is no longer in effect.
validateRestore func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error
}

// run drives the failure mode named by t.failureName through its full
// lifecycle: Setup, Inject, validateFailure, Restore, validateRestore and
// Cleanup, waiting for the failure to propagate/restore in between. It returns
// the first error encountered.
//
// Once Setup has succeeded, Cleanup is always attempted, even when a later
// step fails, so that a failed injection or validation does not skip the
// cleanup step. A Cleanup error never replaces an earlier error; the two are
// combined with the earlier error taking precedence.
func (t *failureSmokeTest) run(
	ctx context.Context, l *logger.Logger, c cluster.Cluster, fr *failures.FailureRegistry,
) (err error) {
	// TODO(darryl): In the future, roachtests should interact with the failure injection library
	// through helper functions in roachtestutil so they don't have to interface with roachprod
	// directly.
	failureMode, err := fr.GetFailureMode(c.MakeNodes(), t.failureName, l, c.IsSecure())
	if err != nil {
		return err
	}
	if err = failureMode.Setup(ctx, l, t.args); err != nil {
		return err
	}
	// Deferred only after Setup succeeded, so Cleanup is never asked to undo a
	// Setup that did not complete. The named result lets the deferred call
	// surface a Cleanup error on the otherwise successful path.
	defer func() {
		err = errors.CombineErrors(err, failureMode.Cleanup(ctx, l, t.args))
	}()

	if err = failureMode.Inject(ctx, l, t.args); err != nil {
		return err
	}

	// Allow the failure to take effect.
	if err = failureMode.WaitForFailureToPropagate(ctx, l, t.args); err != nil {
		return err
	}

	if err = t.validateFailure(ctx, l, c); err != nil {
		return err
	}
	if err = failureMode.Restore(ctx, l, t.args); err != nil {
		return err
	}

	// Allow the cluster to return to normal.
	if err = failureMode.WaitForFailureToRestore(ctx, l, t.args); err != nil {
		return err
	}

	return t.validateRestore(ctx, l, c)
}

// noopRun is the control counterpart of run: it injects nothing and checks
// that the validation hooks react accordingly. validateFailure is expected to
// return an error (no failure is in effect) and validateRestore is expected to
// succeed. The failure registry argument is unused.
func (t *failureSmokeTest) noopRun(
	ctx context.Context, l *logger.Logger, c cluster.Cluster, _ *failures.FailureRegistry,
) error {
	// Nothing was injected, so a passing failure validation means the
	// validation is not actually detecting anything.
	failureErr := t.validateFailure(ctx, l, c)
	if failureErr == nil {
		return errors.New("no failure was injected but validation still passed")
	}

	// Likewise, with nothing injected there is nothing to restore, so the
	// restore validation has to pass.
	restoreErr := t.validateRestore(ctx, l, c)
	if restoreErr == nil {
		return nil
	}
	return errors.Wrapf(restoreErr, "no failure was injected but post restore validation still failed")
}

// bidirectionalNetworkPartitionTest builds a smoke test that sets up a
// bidirectional network partition between two randomly chosen CRDB nodes. A
// third randomly chosen CRDB node is used as a control that must remain
// reachable while the partition is in place.
var bidirectionalNetworkPartitionTest = func(c cluster.Cluster) failureSmokeTest {
	shuffled := c.CRDBNodes()
	rand.Shuffle(len(shuffled), func(a, b int) {
		shuffled[a], shuffled[b] = shuffled[b], shuffled[a]
	})
	srcNode, destNode, unaffectedNode := shuffled[0], shuffled[1], shuffled[2]

	// portBlocked reports whether node `from` is blocked from reaching the
	// pgport of node `to`.
	portBlocked := func(
		ctx context.Context, l *logger.Logger, c cluster.Cluster, from, to int,
	) (bool, error) {
		return roachtestutil.CheckPortBlocked(
			ctx, l, c, c.Nodes(from), c.Nodes(to), fmt.Sprintf("{pgport:%d}", to),
		)
	}

	partition := failures.NetworkPartition{
		Source:      install.Nodes{install.Node(srcNode)},
		Destination: install.Nodes{install.Node(destNode)},
		Type:        failures.Bidirectional,
	}

	return failureSmokeTest{
		testName:    "bidirectional network partition",
		failureName: failures.IPTablesNetworkPartitionName,
		args: failures.NetworkPartitionArgs{
			Partitions: []failures.NetworkPartition{partition},
		},
		validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
			// The partitioned pair must not be able to connect.
			partitionedBlocked, err := portBlocked(ctx, l, c, srcNode, destNode)
			if err != nil {
				return err
			}
			if !partitionedBlocked {
				return errors.Errorf("expected connections from node %d to node %d to be blocked", srcNode, destNode)
			}

			// The node outside of the partition must stay reachable.
			controlBlocked, err := portBlocked(ctx, l, c, srcNode, unaffectedNode)
			if err != nil {
				return err
			}
			if controlBlocked {
				return errors.Errorf("expected connections from node %d to node %d to not be blocked", srcNode, unaffectedNode)
			}
			return nil
		},
		validateRestore: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
			stillBlocked, err := portBlocked(ctx, l, c, srcNode, destNode)
			if err != nil {
				return err
			}
			if stillBlocked {
				return errors.Errorf("expected connections from node %d to node %d to not be blocked", srcNode, destNode)
			}
			return nil
		},
	}
}

// asymmetricIncomingNetworkPartitionTest builds a smoke test that sets up an
// asymmetric (failures.Incoming) network partition between two randomly chosen
// CRDB nodes. While the partition is in place, connections from the source to
// the destination are expected to stay open and connections from the
// destination to the source are expected to be blocked.
var asymmetricIncomingNetworkPartitionTest = func(c cluster.Cluster) failureSmokeTest {
	shuffled := c.CRDBNodes()
	rand.Shuffle(len(shuffled), func(a, b int) {
		shuffled[a], shuffled[b] = shuffled[b], shuffled[a]
	})
	srcNode, destNode := shuffled[0], shuffled[1]

	// portBlocked reports whether node `from` is blocked from reaching the
	// pgport of node `to`.
	portBlocked := func(
		ctx context.Context, l *logger.Logger, c cluster.Cluster, from, to int,
	) (bool, error) {
		return roachtestutil.CheckPortBlocked(
			ctx, l, c, c.Nodes(from), c.Nodes(to), fmt.Sprintf("{pgport:%d}", to),
		)
	}

	partition := failures.NetworkPartition{
		Source:      install.Nodes{install.Node(srcNode)},
		Destination: install.Nodes{install.Node(destNode)},
		Type:        failures.Incoming,
	}

	return failureSmokeTest{
		testName:    "asymmetric network partition - incoming traffic dropped",
		failureName: failures.IPTablesNetworkPartitionName,
		args: failures.NetworkPartitionArgs{
			Partitions: []failures.NetworkPartition{partition},
		},
		validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
			// Source -> destination must be unaffected.
			outboundBlocked, err := portBlocked(ctx, l, c, srcNode, destNode)
			if err != nil {
				return err
			}
			if outboundBlocked {
				return errors.Errorf("expected connections from node %d to node %d to be open", srcNode, destNode)
			}

			// Destination -> source is the direction that must be dropped.
			inboundBlocked, err := portBlocked(ctx, l, c, destNode, srcNode)
			if err != nil {
				return err
			}
			if !inboundBlocked {
				return errors.Errorf("expected connections from node %d to node %d to be blocked", destNode, srcNode)
			}
			return nil
		},
		validateRestore: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
			stillBlocked, err := portBlocked(ctx, l, c, srcNode, destNode)
			if err != nil {
				return err
			}
			if stillBlocked {
				return errors.Errorf("expected connections from node %d to node %d to be open", srcNode, destNode)
			}
			return nil
		},
	}
}

// asymmetricOutgoingNetworkPartitionTest builds a smoke test that sets up an
// asymmetric (failures.Outgoing) network partition between two randomly chosen
// CRDB nodes. While the partition is in place, connections from the source to
// the destination are expected to be blocked and connections from the
// destination to the source are expected to stay open.
var asymmetricOutgoingNetworkPartitionTest = func(c cluster.Cluster) failureSmokeTest {
	shuffled := c.CRDBNodes()
	rand.Shuffle(len(shuffled), func(a, b int) {
		shuffled[a], shuffled[b] = shuffled[b], shuffled[a]
	})
	srcNode, destNode := shuffled[0], shuffled[1]

	// portBlocked reports whether node `from` is blocked from reaching the
	// pgport of node `to`.
	portBlocked := func(
		ctx context.Context, l *logger.Logger, c cluster.Cluster, from, to int,
	) (bool, error) {
		return roachtestutil.CheckPortBlocked(
			ctx, l, c, c.Nodes(from), c.Nodes(to), fmt.Sprintf("{pgport:%d}", to),
		)
	}

	partition := failures.NetworkPartition{
		Source:      install.Nodes{install.Node(srcNode)},
		Destination: install.Nodes{install.Node(destNode)},
		Type:        failures.Outgoing,
	}

	return failureSmokeTest{
		testName:    "asymmetric network partition - outgoing traffic dropped",
		failureName: failures.IPTablesNetworkPartitionName,
		args: failures.NetworkPartitionArgs{
			Partitions: []failures.NetworkPartition{partition},
		},
		validateFailure: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
			// Source -> destination is the direction that must be dropped.
			outboundBlocked, err := portBlocked(ctx, l, c, srcNode, destNode)
			if err != nil {
				return err
			}
			if !outboundBlocked {
				return errors.Errorf("expected connections from node %d to node %d to be blocked", srcNode, destNode)
			}

			// Destination -> source must be unaffected.
			inboundBlocked, err := portBlocked(ctx, l, c, destNode, srcNode)
			if err != nil {
				return err
			}
			if inboundBlocked {
				return errors.Errorf("expected connections from node %d to node %d to be open", destNode, srcNode)
			}
			return nil
		},
		validateRestore: func(ctx context.Context, l *logger.Logger, c cluster.Cluster) error {
			stillBlocked, err := portBlocked(ctx, l, c, srcNode, destNode)
			if err != nil {
				return err
			}
			if stillBlocked {
				return errors.Errorf("expected connections from node %d to node %d to be open", srcNode, destNode)
			}
			return nil
		},
	}
}

// setupFailureSmokeTests prepares the cluster for the smoke tests: it installs
// nmap on the CRDB nodes, starts cockroach on them, initializes a tpcc
// workload and leaves that workload running in a background task. Only the
// nmap installation error is returned to the caller.
func setupFailureSmokeTests(ctx context.Context, t test.Test, c cluster.Cluster) error {
	const (
		tpccInitCmd = "./cockroach workload init tpcc --warehouses=100 {pgurl:1}"
		tpccRunCmd  = "./cockroach workload run tpcc --tolerate-errors --warehouses=100 {pgurl:1-3}"
	)

	// Download any dependencies needed.
	installErr := c.Install(ctx, t.L(), c.CRDBNodes(), "nmap")
	if installErr != nil {
		return installErr
	}
	c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.CRDBNodes())

	// Run a light workload in the background so we have some traffic in the database.
	c.Run(ctx, option.WithNodes(c.WorkloadNode()), tpccInitCmd)
	backgroundWorkload := func(taskCtx context.Context, _ *logger.Logger) error {
		return c.RunE(taskCtx, option.WithNodes(c.WorkloadNode()), tpccRunCmd)
	}
	t.Go(backgroundWorkload, task.WithContext(ctx))
	return nil
}

// runFailureSmokeTest sets up the cluster and then runs every failure smoke
// test in a random order, failing the roachtest on the first error.
//
// If noopFailer is true, no failure is actually injected; instead each test's
// validation hooks are exercised through noopRun to check that they do not
// pass vacuously.
func runFailureSmokeTest(ctx context.Context, t test.Test, c cluster.Cluster, noopFailer bool) {
	if err := setupFailureSmokeTests(ctx, t, c); err != nil {
		t.Fatal(err)
	}
	fr := failures.NewFailureRegistry()
	fr.Register()

	failureSmokeTests := []failureSmokeTest{
		bidirectionalNetworkPartitionTest(c),
		asymmetricIncomingNetworkPartitionTest(c),
		asymmetricOutgoingNetworkPartitionTest(c),
	}

	// Randomize the order of the tests in case any of the failures have unexpected side
	// effects that may mask failures, e.g. a cgroups disk stall isn't properly restored
	// which causes a dmsetup disk stall to appear to work even if it doesn't.
	rand.Shuffle(len(failureSmokeTests), func(i, j int) {
		failureSmokeTests[i], failureSmokeTests[j] = failureSmokeTests[j], failureSmokeTests[i]
	})

	// NB: the loop variable is deliberately not called `test`, which would
	// shadow the imported test package for the rest of the loop body.
	for _, smokeTest := range failureSmokeTests {
		t.L().Printf("running %s test", smokeTest.testName)
		runFn := smokeTest.run
		if noopFailer {
			runFn = smokeTest.noopRun
		}
		// Errors from both variants are annotated with the test name so a
		// failure can be attributed to a specific scenario.
		if err := runFn(ctx, t.L(), c, fr); err != nil {
			t.Fatal(errors.Wrapf(err, "%s failed", smokeTest.testName))
		}
		t.L().Printf("%s test complete", smokeTest.testName)
	}
}

// registerFISmokeTest registers the failure injection smoke test together with
// its noop variant. Both use the same cluster spec, cloud and suite, and
// differ only in their name and in the noopFailer flag passed to
// runFailureSmokeTest.
func registerFISmokeTest(r registry.Registry) {
	variants := []struct {
		name       string
		noopFailer bool
	}{
		{name: "failure-injection-smoke-test", noopFailer: false},
		{name: "failure-injection-noop-smoke-test", noopFailer: true},
	}

	for _, variant := range variants {
		// Copy the flag so each Run closure captures its own value.
		noopFailer := variant.noopFailer
		r.Add(registry.TestSpec{
			Name:             variant.name,
			Owner:            registry.OwnerTestEng,
			Cluster:          r.MakeClusterSpec(4, spec.WorkloadNode(), spec.CPU(2), spec.WorkloadNodeCPU(2), spec.ReuseNone()),
			CompatibleClouds: registry.OnlyGCE,
			// TODO(darryl): When the FI library starts seeing more use through roachtests, CLI, etc. switch this to Nightly.
			Suites: registry.ManualOnly,
			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
				runFailureSmokeTest(ctx, t, c, noopFailer)
			},
		})
	}
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func RegisterTests(r registry.Registry) {
registerHibernate(r, hibernateSpatialOpts)
registerHotSpotSplits(r)
registerHTTPRestart(r)
registerFISmokeTest(r)
registerImportCancellation(r)
registerImportDecommissioned(r)
registerImportMixedVersions(r)
Expand Down
5 changes: 5 additions & 0 deletions pkg/roachprod/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ rm /tmp/otelcol-contrib.deb;
"bzip2": `
sudo apt-get update;
sudo apt-get install -y bzip2;
`,

"nmap": `
sudo apt-get update;
sudo apt-get install -y nmap;
`,
}

Expand Down

0 comments on commit 8721ab3

Please sign in to comment.