diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index f5811c7a..dccfdea7 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -89,6 +89,7 @@ func initFlags() { Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))") Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started") Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment") + Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).") Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.") Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.") diff --git a/config/config.go b/config/config.go index d8f958fa..9a15b6f6 100644 --- a/config/config.go +++ b/config/config.go @@ -22,13 +22,17 @@ var ( Version = "unknown" ) -type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) +type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) type ListenerConfig struct { BrokerAddress string ListenerAddress string AdvertisedAddress string } +type IdListenerConfig struct { + BrokerAddress string + Listener net.Listener +} type DialAddressMapping struct { SourceAddress string DestinationAddress string @@ -74,6 +78,7 @@ type Config struct { DefaultListenerIP string BootstrapServers []ListenerConfig ExternalServers []ListenerConfig + DeterministicListeners bool DialAddressMappings []DialAddressMapping DisableDynamicListeners bool DynamicAdvertisedListener string diff --git a/proxy/processor.go b/proxy/processor.go index 1f61bf77..80ec71df 100644 --- a/proxy/processor.go +++ b/proxy/processor.go @@ -20,8 +20,8 @@ const ( apiKeySaslHandshake = int16(17) apiKeyApiApiVersions = int16(18) - minRequestApiKey = int16(0) // 0 - Produce - maxRequestApiKey = int16(120) // so far 67 is the last (reserve some for the feature) + minRequestApiKey = int16(0) // 0 - Produce + maxRequestApiKey = int16(20000) // so far 67 is the last (reserve some for the feature) ) var ( diff --git a/proxy/processor_default.go b/proxy/processor_default.go index fc243514..db700e13 100644 --- a/proxy/processor_default.go +++ b/proxy/processor_default.go @@ -159,6 +159,7 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ return false, nil, err } + // Update proxy/protocol/responses.go apiKeyProduceMaxVersion when adding new Produce version support case 3, 4, 5, 6, 7, 8, 9, 10, 11: // CorrelationID + ClientID if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil { diff --git a/proxy/processor_default_test.go b/proxy/processor_default_test.go index f2720f4a..cc31e58a 100644 --- a/proxy/processor_default_test.go +++ b/proxy/processor_default_test.go @@ -3,11 +3,12 @@ package proxy import ( "bytes" "encoding/hex" + "testing" + "time" + "github.com/grepplabs/kafka-proxy/proxy/protocol" "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "testing" - "time" ) func TestHandleRequest(t *testing.T) { @@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) { } func TestHandleResponse(t *testing.T) { - netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" { switch brokerPort { case 19092: diff --git a/proxy/protocol/request_key_version.go b/proxy/protocol/request_key_version.go index d03ee888..ac2d2de0 100644 --- a/proxy/protocol/request_key_version.go +++ b/proxy/protocol/request_key_version.go @@ -387,7 +387,8 @@ func (r *RequestKeyVersion) ResponseHeaderVersion() int16 { case 87: // ReadShareGroupStateSummary return 1 default: - // throw new UnsupportedVersionException("Unsupported API key " + apiKey); - return -1 + // previously; throw new UnsupportedVersionException("Unsupported API key " + apiKey); + // now, assume 'unknown' api keys are all flexible + return 1 } } diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index fcd5cb65..95019c20 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -3,17 +3,29 @@ package protocol import ( "errors" "fmt" + "math" "github.com/grepplabs/kafka-proxy/config" ) const ( + apiKeyProduce = 0 apiKeyMetadata = 3 apiKeyFindCoordinator = 10 + apiKeyApiVersions = 18 + + // Update ApiVersions response to prevent requests/responses that can't be parsed by Kafka-Proxy + apiKeyApiVersionsMaxVersion = 4 + apiKeyMetadataMaxVersion = 13 + apiKeyFindCoordinatorMaxVersion = 6 + // produce requests are parsed by proxy/processor_default.go mustReply() + apiKeyProduceMaxVersion = 11 brokersKeyName = "brokers" hostKeyName = "host" portKeyName = "port" + nodeKeyName = "node_id" + apiKeysKeyname = "api_keys" coordinatorKeyName = "coordinator" coordinatorsKeyName = "coordinators" @@ -22,6 +34,7 @@ const ( var ( metadataResponseSchemaVersions = createMetadataResponseSchemaVersions() findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions() + apiVersionsResponseSchemaVersions = createApiVersionsResponseSchemaVersions() ) func createMetadataResponseSchemaVersions() []Schema { @@ -243,7 +256,33 @@ func createMetadataResponseSchemaVersions() []Schema { &SchemaTaggedFields{Name: "response_tagged_fields"}, ) - return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12} + metadataResponseV13 := NewSchema("metadata_response_v13", + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9}, + &Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + ) + + // Update apiKeyMetadataMaxVersion when adding new versions + return []Schema{ + metadataResponseV0, + metadataResponseV1, + metadataResponseV2, + metadataResponseV3, + metadataResponseV4, + metadataResponseV5, + metadataResponseV6, + metadataResponseV7, + metadataResponseV8, + metadataResponseV9, + metadataResponseV10, + metadataResponseV11, + metadataResponseV12, + metadataResponseV13, + } } func createFindCoordinatorResponseSchemaVersions() []Schema { @@ -296,9 +335,117 @@ func createFindCoordinatorResponseSchemaVersions() []Schema { findCoordinatorResponseV5 := findCoordinatorResponseV4 findCoordinatorResponseV6 := findCoordinatorResponseV5 + // Update apiKeyFindCoordinatorMaxVersion when adding new versions return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6} } +func createApiVersionsResponseSchemaVersions() []Schema { + apiVersionKeyV0 := NewSchema("api_versions_key_v0", + &Mfield{Name: "api_key", Ty: TypeInt16}, + &Mfield{Name: "min_version", Ty: TypeInt16}, + &Mfield{Name: "max_version", Ty: TypeInt16}, + ) + + apiVersionSchemaV3 := NewSchema("api_versions_key_schema3", + &Mfield{Name: "api_key", Ty: TypeInt16}, + &Mfield{Name: "min_version", Ty: TypeInt16}, + &Mfield{Name: "max_version", Ty: TypeInt16}, + &SchemaTaggedFields{"api_versions_tagged_fields"}, + ) + + apiVersionsResponseV0 := NewSchema("api_versions_response_v0", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0}, + ) + + // Version 1 adds throttle time to the response. + apiVersionsResponseV1 := NewSchema("api_versions_response_v1", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + ) + + // Starting in version 2, on quota violation, brokers send out responses before throttling. + apiVersionsResponseV2 := apiVersionsResponseV1 + + // Version 3 is the first flexible version. Tagged fields are only supported in the body but + // not in the header. The length of the header must not change in order to guarantee the + // backward compatibility. + // + // Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported + // versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned. + apiVersionsResponseV3 := NewSchema("api_versions_response_v3", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &CompactArray{Name: apiKeysKeyname, Ty: apiVersionSchemaV3}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + + // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0. + apiVersionsResponseV4 := apiVersionsResponseV3 + + // Update apiKeyApiVersionsMaxVersion when adding new versions + return []Schema{ + apiVersionsResponseV0, + apiVersionsResponseV1, + apiVersionsResponseV2, + apiVersionsResponseV3, + apiVersionsResponseV4, + } +} + +func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { + if decodedStruct == nil { + return errors.New("decoded struct must not be nil") + } + if fn == nil { + return errors.New("net address mapper must not be nil") + } + apiVersionsArray, ok := decodedStruct.Get(apiKeysKeyname).([]interface{}) + if !ok { + return errors.New("api versions not found") + } + for _, apiVersionElement := range apiVersionsArray { + apiVersion := apiVersionElement.(*Struct) + apiKey, ok := apiVersion.Get("api_key").(int16) + if !ok { + return errors.New("api_keys.api_key not found") + } + maxVersion, ok := apiVersion.Get("max_version").(int16) + if !ok { + return errors.New("api_keys.max_version not found") + } + + limitVersion := int16(math.MaxInt16) + switch apiKey { + case apiKeyProduce: + if maxVersion > apiKeyProduceMaxVersion { + limitVersion = apiKeyProduceMaxVersion + } + case apiKeyMetadata: + if maxVersion > apiKeyMetadataMaxVersion { + limitVersion = apiKeyMetadataMaxVersion + } + case apiKeyFindCoordinator: + if maxVersion > apiKeyFindCoordinatorMaxVersion { + limitVersion = apiKeyFindCoordinatorMaxVersion + } + case apiKeyApiVersions: + if maxVersion > apiKeyApiVersionsMaxVersion { + limitVersion = apiKeyApiVersionsMaxVersion + } + } + if maxVersion > limitVersion { + err := apiVersion.Replace("max_version", limitVersion) + if err != nil { + return err + } + } + } + + return nil +} + func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { if decodedStruct == nil { return errors.New("decoded struct must not be nil") @@ -320,12 +467,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu if !ok { return errors.New("broker.port not found") } + nodeId, ok := broker.Get(nodeKeyName).(int32) + if !ok { + return errors.New("broker.node_id not found") + } if host == "" && port <= 0 { continue } - newHost, newPort, err := fn(host, port) + newHost, newPort, err := fn(host, port, nodeId) if err != nil { return err } @@ -383,12 +534,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e if !ok { return errors.New("coordinator.port not found") } + nodeId, ok := coordinator.Get(nodeKeyName).(int32) + if !ok { + return errors.New("coordinator.node_id not found") + } if host == "" && port <= 0 { return nil } - newHost, newPort, err := fn(host, port) + newHost, newPort, err := fn(host, port, nodeId) if err != nil { return err } @@ -437,6 +592,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse) case apiKeyFindCoordinator: return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse) + case apiKeyApiVersions: + return newResponseModifier(apiKey, apiVersion, addressMappingFunc, apiVersionsResponseSchemaVersions, modifyApiVersionsResponse) default: return nil, nil } diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index c2ef822d..549e9737 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -3,11 +3,12 @@ package protocol import ( "encoding/hex" "fmt" - "github.com/google/uuid" "reflect" "strings" "testing" + "github.com/google/uuid" + "github.com/grepplabs/kafka-proxy/config" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -20,7 +21,7 @@ var ( // topic_metadata 0x00, 0x00, 0x00, 0x00} - testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 51 { return "myhost1", 34001, nil } else if brokerHost == "google.com" && brokerPort == 273 { @@ -31,7 +32,7 @@ var ( return "", 0, errors.New("unexpected data") } - testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 19092 { return "myhost1", 34001, nil } else if brokerHost == "localhost" && brokerPort == 29092 { @@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) { a.Nil(err) a.Equal(bytes, resp) - modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 51 { return "azure.microsoft.com", 34001, nil } else if brokerHost == "google.com" && brokerPort == 273 { diff --git a/proxy/proxy.go b/proxy/proxy.go index 203635e4..7114a780 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -25,11 +25,13 @@ type Listeners struct { listenFunc ListenFunc + deterministicListeners bool disableDynamicListeners bool dynamicSequentialMinPort int - brokerToListenerConfig map[string]config.ListenerConfig - lock sync.RWMutex + brokerToListenerConfig map[string]config.ListenerConfig + brokerIdToIdListenerConfig map[int32]config.IdListenerConfig + lock sync.RWMutex } func NewListeners(cfg *config.Config) (*Listeners, error) { @@ -64,15 +66,19 @@ func NewListeners(cfg *config.Config) (*Listeners, error) { return nil, err } + brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig) + return &Listeners{ - defaultListenerIP: defaultListenerIP, - dynamicAdvertisedListener: dynamicAdvertisedListener, - connSrc: make(chan Conn, 1), - brokerToListenerConfig: brokerToListenerConfig, - tcpConnOptions: tcpConnOptions, - listenFunc: listenFunc, - disableDynamicListeners: cfg.Proxy.DisableDynamicListeners, - dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort, + defaultListenerIP: defaultListenerIP, + dynamicAdvertisedListener: dynamicAdvertisedListener, + connSrc: make(chan Conn, 1), + brokerToListenerConfig: brokerToListenerConfig, + brokerIdToIdListenerConfig: brokerIdToIdListenerConfig, + tcpConnOptions: tcpConnOptions, + listenFunc: listenFunc, + deterministicListeners: cfg.Proxy.DeterministicListeners, + disableDynamicListeners: cfg.Proxy.DisableDynamicListeners, + dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort, }, nil } @@ -117,7 +123,7 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo return brokerToListenerConfig, nil } -func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { +func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "" || brokerPort <= 0 { return "", 0, fmt.Errorf("broker address '%s:%d' is invalid", brokerHost, brokerPort) } @@ -126,6 +132,7 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l p.lock.RLock() listenerConfig, ok := p.brokerToListenerConfig[brokerAddress] + idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId] p.lock.RUnlock() if ok { @@ -133,13 +140,25 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l return util.SplitHostPort(listenerConfig.AdvertisedAddress) } if !p.disableDynamicListeners { + if brokerIdFound { + logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId) + // Existing broker ID found, but with a different upstream broker + // Close existing listener, remove two mappings: + // * ID to removed upstream broker + // * removed upstream broker + idListenerConfig.Listener.Close() + p.lock.Lock() + delete(p.brokerIdToIdListenerConfig, brokerId) + delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress) + p.lock.Unlock() + } logrus.Infof("Starting dynamic listener for broker %s", brokerAddress) - return p.ListenDynamicInstance(brokerAddress) + return p.ListenDynamicInstance(brokerAddress, brokerId) } return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort) } -func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, error) { +func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) { p.lock.Lock() defer p.lock.Unlock() // double check @@ -147,9 +166,15 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, return util.SplitHostPort(v.AdvertisedAddress) } - defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort)) - if p.dynamicSequentialMinPort != 0 { - p.dynamicSequentialMinPort += 1 + var defaultListenerAddress string + + if p.deterministicListeners { + defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId))) + } else { + defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort)) + if p.dynamicSequentialMinPort != 0 { + p.dynamicSequentialMinPort += 1 + } } cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress} @@ -167,6 +192,7 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port)) p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress} + p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l} logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress) diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 0bcb8515..13cf3568 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -2,9 +2,10 @@ package proxy import ( "fmt" + "testing" + "github.com/grepplabs/kafka-proxy/config" "github.com/stretchr/testify/assert" - "testing" ) func TestGetBrokerToListenerConfig(t *testing.T) { @@ -24,7 +25,11 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, nil, @@ -38,9 +43,21 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32401", "0.0.0.0:32401", "kafka-proxy-0:32401"}, - {"192.168.99.100:32402", "0.0.0.0:32402", "kafka-proxy-0:32402"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32401", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, + { + BrokerAddress: "192.168.99.100:32402", + ListenerAddress: "0.0.0.0:32402", + AdvertisedAddress: "kafka-proxy-0:32402", + }, }, []config.ListenerConfig{}, nil, @@ -64,8 +81,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, nil, @@ -79,8 +104,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32401", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, fmt.Errorf("bootstrap server mapping 192.168.99.100:32400 configured twice: {192.168.99.100:32400 0.0.0.0:32401 0.0.0.0:32400} and {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32400}"), @@ -88,8 +121,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32401", + }, }, []config.ListenerConfig{}, fmt.Errorf("bootstrap server mapping 192.168.99.100:32400 configured twice: {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32401} and {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32400}"), @@ -97,13 +138,31 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32401", "0.0.0.0:32401", "kafka-proxy-0:32401"}, - {"192.168.99.100:32402", "0.0.0.0:32402", "kafka-proxy-0:32402"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32401", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, + { + BrokerAddress: "192.168.99.100:32402", + ListenerAddress: "0.0.0.0:32402", + AdvertisedAddress: "kafka-proxy-0:32402", + }, }, []config.ListenerConfig{ - {"192.168.99.100:32403", "kafka-proxy-0:32403", "kafka-proxy-0:32403"}, - {"192.168.99.100:32404", "kafka-proxy-0:32404", "kafka-proxy-0:32404"}, + { + BrokerAddress: "192.168.99.100:32403", + ListenerAddress: "kafka-proxy-0:32403", + AdvertisedAddress: "kafka-proxy-0:32403"}, + { + BrokerAddress: "192.168.99.100:32404", + ListenerAddress: "kafka-proxy-0:32404", + AdvertisedAddress: "kafka-proxy-0:32404"}, }, nil, map[string]config.ListenerConfig{ @@ -136,10 +195,18 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, }, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, }, nil, map[string]config.ListenerConfig{ @@ -152,10 +219,18 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, }, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-1:32400", "kafka-proxy-1:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-1:32400", + AdvertisedAddress: "kafka-proxy-1:32400", + }, }, fmt.Errorf("bootstrap and external server mappings 192.168.99.100:32400 with different advertised addresses: kafka-proxy-1:32400 and kafka-proxy-0:32400"), nil, @@ -163,7 +238,11 @@ func TestGetBrokerToListenerConfig(t *testing.T) { { []config.ListenerConfig{}, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32401", + }, }, fmt.Errorf("external server mapping has different listener and advertised addresses {192.168.99.100:32400 kafka-proxy-0:32400 kafka-proxy-0:32401}"), nil, @@ -171,8 +250,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { { []config.ListenerConfig{}, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32400", "kafka-proxy-0:32401", "kafka-proxy-0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, }, fmt.Errorf("external server mapping 192.168.99.100:32400 configured twice: kafka-proxy-0:32401 and {192.168.99.100:32400 kafka-proxy-0:32400 kafka-proxy-0:32400}"), nil,