Skip to content

Commit

Permalink
rpk: fix incorrect parsing of ipv6 addresses
Browse files Browse the repository at this point in the history
This fixes a bug where we were adding double
brackets to IPV6 addresses.

Fixes #23363

(cherry picked from commit 3607a3b)
  • Loading branch information
r-vasquez authored and vbotbuildovich committed Feb 4, 2025
1 parent d5bfdfc commit 32bc792
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 29 deletions.
93 changes: 64 additions & 29 deletions src/go/rpk/pkg/config/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,13 +1591,15 @@ func (c *Config) fixSchemePorts() error {
if port == "" {
port = strconv.Itoa(DefaultKafkaPort)
}
host = normalizeHost(host)
c.redpandaYaml.Rpk.KafkaAPI.Brokers[i] = net.JoinHostPort(host, port)
}
for i, a := range c.redpandaYaml.Rpk.AdminAPI.Addresses {
scheme, host, port, err := rpknet.SplitSchemeHostPort(a)
if err != nil {
return fmt.Errorf("unable to fix admin address %v: %w", a, err)
}
host = normalizeHost(host)
switch scheme {
case "":
if port == "" {
Expand All @@ -1610,54 +1612,87 @@ func (c *Config) fixSchemePorts() error {
return fmt.Errorf("unable to fix admin address %v: unsupported scheme %q", a, scheme)
}
}
p := c.rpkYaml.Profile(c.rpkYaml.CurrentProfile)
for i, k := range p.KafkaAPI.Brokers {
_, host, port, err := rpknet.SplitSchemeHostPort(k)
if err != nil {
return fmt.Errorf("unable to fix broker address %v: %w", k, err)
}
if port == "" {
port = strconv.Itoa(DefaultKafkaPort)
}
p.KafkaAPI.Brokers[i] = net.JoinHostPort(host, port)
}
for i, a := range p.AdminAPI.Addresses {
for i, a := range c.redpandaYaml.Rpk.SR.Addresses {
scheme, host, port, err := rpknet.SplitSchemeHostPort(a)
if err != nil {
return fmt.Errorf("unable to fix admin address %v: %w", a, err)
return fmt.Errorf("unable to fix schema registry address %v: %w", a, err)
}
host = normalizeHost(host)
switch scheme {
case "":
if port == "" {
port = strconv.Itoa(DefaultAdminPort)
port = strconv.Itoa(DefaultSchemaRegPort)
}
p.AdminAPI.Addresses[i] = net.JoinHostPort(host, port)
c.redpandaYaml.Rpk.SR.Addresses[i] = net.JoinHostPort(host, port)
case "http", "https":
continue // keep whatever port exists; empty ports will default to 80 or 443
default:
return fmt.Errorf("unable to fix admin address %v: unsupported scheme %q", a, scheme)
return fmt.Errorf("unable to fix schema registry address %v: unsupported scheme %q", a, scheme)
}
}
for i, a := range p.SR.Addresses {
scheme, host, port, err := rpknet.SplitSchemeHostPort(a)
if err != nil {
return fmt.Errorf("unable to fix schema registry address %v: %w", a, err)
}
switch scheme {
case "":

p := c.rpkYaml.Profile(c.rpkYaml.CurrentProfile)
if p != nil {
for i, k := range p.KafkaAPI.Brokers {
_, host, port, err := rpknet.SplitSchemeHostPort(k)
if err != nil {
return fmt.Errorf("unable to fix broker address %v: %w", k, err)
}
host = normalizeHost(host)
if port == "" {
port = strconv.Itoa(DefaultSchemaRegPort)
port = strconv.Itoa(DefaultKafkaPort)
}
p.KafkaAPI.Brokers[i] = net.JoinHostPort(host, port)
}
for i, a := range p.AdminAPI.Addresses {
scheme, host, port, err := rpknet.SplitSchemeHostPort(a)
if err != nil {
return fmt.Errorf("unable to fix admin address %v: %w", a, err)
}
host = normalizeHost(host)
switch scheme {
case "":
if port == "" {
port = strconv.Itoa(DefaultAdminPort)
}
p.AdminAPI.Addresses[i] = net.JoinHostPort(host, port)
case "http", "https":
continue // keep whatever port exists; empty ports will default to 80 or 443
default:
return fmt.Errorf("unable to fix admin address %v: unsupported scheme %q", a, scheme)
}
}
for i, a := range p.SR.Addresses {
scheme, host, port, err := rpknet.SplitSchemeHostPort(a)
if err != nil {
return fmt.Errorf("unable to fix schema registry address %v: %w", a, err)
}
host = normalizeHost(host)
switch scheme {
case "":
if port == "" {
port = strconv.Itoa(DefaultSchemaRegPort)
}
p.SR.Addresses[i] = net.JoinHostPort(host, port)
case "http", "https":
continue // keep whatever port exists; empty ports will default to 80 or 443
default:
return fmt.Errorf("unable to fix schema registry address %v: unsupported scheme %q", a, scheme)
}
p.SR.Addresses[i] = net.JoinHostPort(host, port)
case "http", "https":
continue // keep whatever port exists; empty ports will default to 80 or 443
default:
return fmt.Errorf("unable to fix schema registry address %v: unsupported scheme %q", a, scheme)
}
}
return nil
}

// normalizeHost remove surrounding brackets if present for ipv6,
// net.JoinHostPort will add them back.
func normalizeHost(host string) string {
if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
return host[1 : len(host)-1]
}
return host
}

func (c *Config) addConfigToProfiles() {
for i := range c.rpkYaml.Profiles {
c.rpkYaml.Profiles[i].c = c
Expand Down
177 changes: 177 additions & 0 deletions src/go/rpk/pkg/config/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1204,3 +1204,180 @@ func TestXSetDefaultsPaths(t *testing.T) {
}
}
}

func TestConfig_fixSchemePorts(t *testing.T) {
tests := []struct {
name string
config Config
expect Config
errMsg string
}{
{
name: "fix missing ports in redpanda.yaml addresses",
config: Config{
redpandaYaml: RedpandaYaml{
Rpk: RpkNodeConfig{
KafkaAPI: RpkKafkaAPI{
Brokers: []string{"broker1", "broker2:9093"},
},
AdminAPI: RpkAdminAPI{
Addresses: []string{"address1", "address2:2222"},
},
SR: RpkSchemaRegistryAPI{
Addresses: []string{"address1", "address2", "address3:8088"},
},
},
},
},
expect: Config{
redpandaYaml: RedpandaYaml{
Rpk: RpkNodeConfig{
KafkaAPI: RpkKafkaAPI{
Brokers: []string{"broker1:9092", "broker2:9093"},
},
AdminAPI: RpkAdminAPI{
Addresses: []string{"address1:9644", "address2:2222"},
},
SR: RpkSchemaRegistryAPI{
Addresses: []string{"address1:8081", "address2:8081", "address3:8088"},
},
},
},
},
},
{
name: "fix missing ports in profile brokers",
config: Config{
rpkYaml: RpkYaml{
CurrentProfile: "default",
Profiles: []RpkProfile{
{
Name: "default",
KafkaAPI: RpkKafkaAPI{
Brokers: []string{"profile-broker1", "profile-broker2:9094"},
},
AdminAPI: RpkAdminAPI{
Addresses: []string{"address1", "address2:2222"},
},
SR: RpkSchemaRegistryAPI{
Addresses: []string{"address1", "address2", "address3:8088"},
},
},
},
},
},
expect: Config{
rpkYaml: RpkYaml{
CurrentProfile: "default",
Profiles: []RpkProfile{
{
Name: "default",
KafkaAPI: RpkKafkaAPI{
Brokers: []string{"profile-broker1:9092", "profile-broker2:9094"},
},
AdminAPI: RpkAdminAPI{
Addresses: []string{"address1:9644", "address2:2222"},
},
SR: RpkSchemaRegistryAPI{
Addresses: []string{"address1:8081", "address2:8081", "address3:8088"},
},
},
},
},
},
},
{
name: "fix missing ports in redpanda.yaml addresses",
config: Config{
redpandaYaml: RedpandaYaml{
Rpk: RpkNodeConfig{
KafkaAPI: RpkKafkaAPI{
Brokers: []string{"broker1", "broker2:9093"},
},
AdminAPI: RpkAdminAPI{
Addresses: []string{"address1", "address2:2222"},
},
SR: RpkSchemaRegistryAPI{
Addresses: []string{"address1", "address2", "address3:8088"},
},
},
},
},
expect: Config{
redpandaYaml: RedpandaYaml{
Rpk: RpkNodeConfig{
KafkaAPI: RpkKafkaAPI{
Brokers: []string{"broker1:9092", "broker2:9093"},
},
AdminAPI: RpkAdminAPI{
Addresses: []string{"address1:9644", "address2:2222"},
},
SR: RpkSchemaRegistryAPI{
Addresses: []string{"address1:8081", "address2:8081", "address3:8088"},
},
},
},
},
},
{
name: "fix missing ports in redpanda.yaml addresses with IPv6",
config: Config{
redpandaYaml: RedpandaYaml{
Rpk: RpkNodeConfig{
KafkaAPI: RpkKafkaAPI{
Brokers: []string{"[2001:db8::1]", "[2001:db8::2]:9093"},
},
AdminAPI: RpkAdminAPI{
Addresses: []string{"[2001:db8::3]", "[2001:db8::4]:2222"},
},
SR: RpkSchemaRegistryAPI{
Addresses: []string{"[2001:db8::5]", "[2001:db8::6]", "[2001:db8::7]:8088"},
},
},
},
},
expect: Config{
redpandaYaml: RedpandaYaml{
Rpk: RpkNodeConfig{
KafkaAPI: RpkKafkaAPI{
Brokers: []string{"[2001:db8::1]:9092", "[2001:db8::2]:9093"},
},
AdminAPI: RpkAdminAPI{
Addresses: []string{"[2001:db8::3]:9644", "[2001:db8::4]:2222"},
},
SR: RpkSchemaRegistryAPI{
Addresses: []string{"[2001:db8::5]:8081", "[2001:db8::6]:8081", "[2001:db8::7]:8088"},
},
},
},
},
},
{
name: "invalid broker address",
config: Config{
redpandaYaml: RedpandaYaml{
Rpk: RpkNodeConfig{
KafkaAPI: RpkKafkaAPI{
Brokers: []string{":invalid"},
},
},
},
},
expect: Config{},
errMsg: "unable to fix broker address",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.config.fixSchemePorts()
if tt.errMsg != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tt.errMsg)
} else {
require.NoError(t, err)
require.Equal(t, tt.expect, tt.config)
}
})
}
}

0 comments on commit 32bc792

Please sign in to comment.