Skip to content

Commit

Permalink
Merge #140150
Browse files Browse the repository at this point in the history
140150: roachprod: add ip address expander func r=srosenberg,herkolategan a=DarrylWong

This change adds `ip` to the list of supported roachprod expander funcs. For example, `{ip:1-3}` now expands to the private IP addresses of nodes 1 to 3. Appending `:public`, e.g. `{ip:1-3:public}`, selects the public IP addresses instead.

This change also logs the expanded command for roachtests when it differs from the original command. This should ease debugging, since we can see which command was actually run on the node.

Release note: none
Epic: none
Fixes: none

----

For additional context, this PR was motivated by my playing around with iptables on a roachprod cluster: I found running `roachprod ip` repeatedly and copy-pasting the results tedious.

Co-authored-by: DarrylWong <[email protected]>
  • Loading branch information
craig[bot] and DarrylWong committed Feb 5, 2025
2 parents 2e7d5a6 + 59f61f0 commit bbacfa6
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 26 deletions.
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2508,7 +2508,7 @@ func (c *clusterImpl) RunE(ctx context.Context, options install.RunOptions, args
}
if err := roachprod.Run(
ctx, l, c.MakeNodes(nodes), "", "", c.IsSecure(),
l.Stdout, l.Stderr, args, options.WithExpanderConfig(expanderCfg),
l.Stdout, l.Stderr, args, options.WithExpanderConfig(expanderCfg).WithLogExpandedCommand(),
); err != nil {
if err := ctx.Err(); err != nil {
l.Printf("(note: incoming context was canceled: %s)", err)
Expand Down Expand Up @@ -2578,7 +2578,7 @@ func (c *clusterImpl) RunWithDetails(
}
results, err := roachprod.RunWithDetails(
ctx, l, c.MakeNodes(nodes), "" /* SSHOptions */, "", /* processTag */
c.IsSecure(), args, options.WithExpanderConfig(expanderCfg),
c.IsSecure(), args, options.WithExpanderConfig(expanderCfg).WithLogExpandedCommand(),
)

var logFileFull string
Expand Down
6 changes: 2 additions & 4 deletions pkg/cmd/roachtest/tests/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,11 +1341,9 @@ func (f *blackholeFailer) Fail(ctx context.Context, nodeID int) {
// FailPartial creates a partial blackhole failure between the given node and
// peers.
func (f *blackholeFailer) FailPartial(ctx context.Context, nodeID int, peerIDs []int) {
peerIPs, err := f.c.InternalIP(ctx, f.t.L(), peerIDs)
require.NoError(f.t, err)

for _, peerIP := range peerIPs {
for _, peerID := range peerIDs {
pgport := fmt.Sprintf("{pgport:%d}", nodeID)
peerIP := fmt.Sprintf("{ip:%d}", peerID)

// When dropping both input and output, make sure we drop packets in both
// directions for both the inbound and outbound TCP connections, such that
Expand Down
16 changes: 4 additions & 12 deletions pkg/cmd/roachtest/tests/jepsen.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,8 @@ func initJepsen(ctx context.Context, t test.Test, c cluster.Cluster, j jepsenCon
c.Run(ctx, option.WithNodes(workers), "sh", "-c", `"cat controller_id_rsa.pub >> .ssh/authorized_keys"`)
// Prime the known hosts file, and use the unhashed format to
// work around JSCH auth error: https://github.com/jepsen-io/jepsen/blob/master/README.md
ips, err := c.InternalIP(ctx, t.L(), workers)
if err != nil {
t.Fatal(err)
}
for _, ip := range ips {
c.Run(ctx, option.WithNodes(controller), "sh", "-c", fmt.Sprintf(`"ssh-keyscan -t rsa %s >> .ssh/known_hosts"`, ip))
for _, worker := range workers {
c.Run(ctx, option.WithNodes(controller), "sh", "-c", fmt.Sprintf(`"ssh-keyscan -t rsa {ip:%d} >> .ssh/known_hosts"`, worker))
}

t.L().Printf("cluster initialization complete\n")
Expand Down Expand Up @@ -313,12 +309,8 @@ func runJepsen(ctx context.Context, t test.Test, c cluster.Cluster, testName, ne

// Get the IP addresses for all our workers.
var nodeFlags []string
ips, err := c.InternalIP(ctx, t.L(), c.Range(1, c.Spec().NodeCount-1))
if err != nil {
t.Fatal(err)
}
for _, ip := range ips {
nodeFlags = append(nodeFlags, "-n "+ip)
for _, node := range c.Range(1, c.Spec().NodeCount-1) {
nodeFlags = append(nodeFlags, fmt.Sprintf("-n {ip:%d}", node))
}
nodesStr := strings.Join(nodeFlags, " ")

Expand Down
1 change: 1 addition & 0 deletions pkg/roachprod/install/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_test(
srcs = [
"cluster_synced_test.go",
"cockroach_test.go",
"expander_test.go",
"monitor_test.go",
"services_test.go",
"session_test.go",
Expand Down
15 changes: 11 additions & 4 deletions pkg/roachprod/install/cluster_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,12 @@ func (r *RunResultDetails) Output(decorate bool) string {
type RunCmdOptions struct {
combinedOut bool
includeRoachprodEnvVars bool
stdin io.Reader
stdout, stderr io.Writer
remoteOptions []remoteSessionOption
expanderConfig ExpanderConfig
// If true, logs the expanded result if it differs from the original command.
logExpandedCmd bool
stdin io.Reader
stdout, stderr io.Writer
remoteOptions []remoteSessionOption
expanderConfig ExpanderConfig
}

// Default RunCmdOptions enable combining output (stdout and stderr) and capturing ssh (verbose) debug output.
Expand Down Expand Up @@ -826,6 +828,9 @@ func (c *SyncedCluster) runCmdOnSingleNode(
if err != nil {
return &noResult, errors.WithDetailf(err, "error expanding command: %s", cmd)
}
if opts.logExpandedCmd && expandedCmd != cmd {
l.Printf("Node %d expanded cmd: %s", e.node, expandedCmd)
}

nodeCmd := expandedCmd
// Be careful about changing these command strings. In particular, we need
Expand Down Expand Up @@ -931,6 +936,7 @@ func (c *SyncedCluster) Run(
stdout: stdout,
stderr: stderr,
expanderConfig: options.ExpanderConfig,
logExpandedCmd: options.LogExpandedCmd,
}
result, err := c.runCmdOnSingleNode(ctx, l, node, cmd, opts)
return result, err
Expand Down Expand Up @@ -1015,6 +1021,7 @@ func (c *SyncedCluster) RunWithDetails(
stdout: l.Stdout,
stderr: l.Stderr,
expanderConfig: options.ExpanderConfig,
logExpandedCmd: options.LogExpandedCmd,
}
result, err := c.runCmdOnSingleNode(ctx, l, node, cmd, opts)
return result, err
Expand Down
49 changes: 45 additions & 4 deletions pkg/roachprod/install/expander.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var pgURLRe = regexp.MustCompile(`{pgurl(:[-,0-9]+|:(?i)lb)?(:[a-z0-9\-]+)?(:[0-
var pgHostRe = regexp.MustCompile(`{pghost(:[-,0-9]+|:(?i)lb)?(:[a-z0-9\-]+)?(:[0-9]+)?}`)
var pgPortRe = regexp.MustCompile(`{pgport(:[-,0-9]+)?(:[a-z0-9\-]+)?(:[0-9]+)?}`)
var uiPortRe = regexp.MustCompile(`{uiport(:[-,0-9]+)}`)
// ipAddressRe matches {ip:<nodeSpec>[:public|:private]}. The node spec is
// mandatory and is a list of node numbers and/or ranges joined by ',' or '-',
// e.g. "1", "1-3", "2,4" or "1-3,5". Capture group 1 is the node spec
// (including its leading ':') and capture group 3 is the optional
// ":public"/":private" suffix; group 2 is only a repetition helper.
var ipAddressRe = regexp.MustCompile(`{ip(:\d+([-,]\d+)*)(:public|:private)?}`)
var storeDirRe = regexp.MustCompile(`{store-dir(:[0-9]+)?}`)
var logDirRe = regexp.MustCompile(`{log-dir(:[a-z0-9\-]+)?(:[0-9]+)?}`)
var certsDirRe = regexp.MustCompile(`{certs-dir}`)
Expand All @@ -51,10 +52,12 @@ type ExpanderConfig struct {
type expander struct {
node Node

pgURLs map[string]map[Node]string
pgHosts map[Node]string
pgPorts map[Node]string
uiPorts map[Node]string
pgURLs map[string]map[Node]string
pgHosts map[Node]string
pgPorts map[Node]string
uiPorts map[Node]string
publicIPs map[Node]string
privateIPs map[Node]string
}

// expanderFunc is a function which may expand a string with a templated value.
Expand All @@ -77,6 +80,7 @@ func (e *expander) expand(
e.maybeExpandStoreDir,
e.maybeExpandLogDir,
e.maybeExpandCertsDir,
e.maybeExpandIPAddress,
}
for _, f := range expanders {
v, expanded, fErr := f(ctx, l, c, cfg, s)
Expand Down Expand Up @@ -329,3 +333,40 @@ func (e *expander) maybeExpandCertsDir(
}
return c.CertsDir(e.node), true, nil
}

// maybeExpandIPAddress is an expanderFunc for {ip:<nodeSpec>[:public|:private]}.
//
// If s does not contain an ip template it is returned unchanged with
// expanded=false. Otherwise the selected address map is handed to
// maybeExpandMap together with the node spec captured from the template.
// The ":public" suffix selects the addresses returned by c.Host; ":private"
// or no suffix selects those returned by c.GetInternalIP.
//
// Both address maps are built lazily for every node in c.VMs and cached on
// the expander so later expansions reuse them.
func (e *expander) maybeExpandIPAddress(
	ctx context.Context, l *logger.Logger, c *SyncedCluster, cfg ExpanderConfig, s string,
) (string, bool, error) {
	m := ipAddressRe.FindStringSubmatch(s)
	if m == nil {
		return s, false, nil
	}

	// m[1] is the node spec (with its leading ':'), m[3] the optional
	// ":public"/":private" suffix.
	var ips map[Node]string
	switch m[3] {
	case ":public":
		if e.publicIPs == nil {
			e.publicIPs = make(map[Node]string, len(c.VMs))
			for _, node := range allNodes(len(c.VMs)) {
				e.publicIPs[node] = c.Host(node)
			}
		}
		ips = e.publicIPs
	default:
		if e.privateIPs == nil {
			// Build into a local map and only publish it once every lookup has
			// succeeded, so a failure part-way through does not leave a
			// partially filled cache behind for later calls.
			privateIPs := make(map[Node]string, len(c.VMs))
			for _, node := range allNodes(len(c.VMs)) {
				ip, err := c.GetInternalIP(node)
				if err != nil {
					return "", false, err
				}
				privateIPs[node] = ip
			}
			e.privateIPs = privateIPs
		}
		ips = e.privateIPs
	}

	expanded, err := e.maybeExpandMap(c, ips, m[1])
	return expanded, err == nil, err
}
84 changes: 84 additions & 0 deletions pkg/roachprod/install/expander_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package install

import (
"context"
"encoding/json"
"github.com/cockroachdb/cockroach/pkg/roachprod/cloud"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"os"
"testing"

"github.com/stretchr/testify/require"
)

// newTestCluster builds a SyncedCluster from the JSON cluster metadata stored
// in the test data file test_cluster.txt, selecting "all" nodes and using the
// settings returned by MakeClusterSettings. Any failure aborts the calling
// test.
func newTestCluster(t *testing.T) *SyncedCluster {
	// Mark this function as a helper so failures are reported at the caller's
	// line rather than inside this function.
	t.Helper()

	testClusterFile := datapathutils.TestDataPath(t, "test_cluster.txt")
	data, err := os.ReadFile(testClusterFile)
	if err != nil {
		t.Fatalf("could not read test cluster file: %s", err)
	}

	metadata := &cloud.Cluster{}
	if err := json.Unmarshal(data, metadata); err != nil {
		t.Fatalf("could not unmarshal test cluster file: %s", err)
	}

	c, err := NewSyncedCluster(metadata, "all", MakeClusterSettings())
	if err != nil {
		t.Fatalf("could not create synced cluster: %s", err)
	}
	return c
}

// TestIPExpander runs a table of {ip:...} templates through a single expander
// bound to the first node of the test cluster and checks the expanded string
// for each: single nodes, ranges, comma-separated nodes, and the explicit
// :public / :private variants.
func TestIPExpander(t *testing.T) {
	type ipCase struct {
		name     string
		command  string
		expected string
	}
	cases := []ipCase{
		{name: "same node", command: "{ip:1}", expected: "10.142.1.1"},
		{name: "different node", command: "{ip:2}", expected: "10.142.1.2"},
		{name: "range of nodes", command: "{ip:1-3}", expected: "10.142.1.1 10.142.1.2 10.142.1.3"},
		{name: "non consecutive nodes", command: "{ip:2,4}", expected: "10.142.1.2 10.142.1.4"},
		{name: "public ip", command: "{ip:1-4:public}", expected: "35.196.120.1 35.227.25.2 34.75.95.3 34.139.54.4"},
		{name: "private ip", command: "{ip:1-4:private}", expected: "10.142.1.1 10.142.1.2 10.142.1.3 10.142.1.4"},
	}

	ctx := context.Background()
	testLogger := &logger.Logger{}
	cluster := newTestCluster(t)
	// One expander is shared by all subtests.
	exp := &expander{node: cluster.Nodes[0]}
	var cfg ExpanderConfig

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := exp.expand(ctx, testLogger, cluster, cfg, tc.command)
			require.NoError(t, err)
			require.Equal(t, tc.expected, got)
		})
	}
}
7 changes: 7 additions & 0 deletions pkg/roachprod/install/run_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type RunOptions struct {
// ExpanderConfig configures the behaviour of the roachprod expander
// during a run.
ExpanderConfig ExpanderConfig
// LogExpandedCmd will log the expanded command if it differs from the original.
LogExpandedCmd bool

// These are private to roachprod
Nodes Nodes
Expand Down Expand Up @@ -106,3 +108,8 @@ func (r RunOptions) WithExpanderConfig(cfg ExpanderConfig) RunOptions {
r.ExpanderConfig = cfg
return r
}

// WithLogExpandedCommand returns a copy of r with LogExpandedCmd set to true,
// which requests that the expanded command be logged when it differs from the
// original.
func (r RunOptions) WithLogExpandedCommand() RunOptions {
r.LogExpandedCmd = true
return r
}
Loading

0 comments on commit bbacfa6

Please sign in to comment.