Skip to content

combined PR: metadata v13, proprietary API keys, limit API versions, deterministic listeners #186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func initFlags() {
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")

Expand Down
7 changes: 6 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ var (
Version = "unknown"
)

type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error)
type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error)

type ListenerConfig struct {
BrokerAddress string
ListenerAddress string
AdvertisedAddress string
}
type IdListenerConfig struct {
BrokerAddress string
Listener net.Listener
}
type DialAddressMapping struct {
SourceAddress string
DestinationAddress string
Expand Down Expand Up @@ -74,6 +78,7 @@ type Config struct {
DefaultListenerIP string
BootstrapServers []ListenerConfig
ExternalServers []ListenerConfig
DeterministicListeners bool
DialAddressMappings []DialAddressMapping
DisableDynamicListeners bool
DynamicAdvertisedListener string
Expand Down
4 changes: 2 additions & 2 deletions proxy/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ const (
apiKeySaslHandshake = int16(17)
apiKeyApiApiVersions = int16(18)

minRequestApiKey = int16(0) // 0 - Produce
maxRequestApiKey = int16(120) // so far 67 is the last (reserve some for the feature)
minRequestApiKey = int16(0) // 0 - Produce
maxRequestApiKey = int16(20000) // so far 67 is the last (reserve some for the feature)
)

var (
Expand Down
1 change: 1 addition & 0 deletions proxy/processor_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ
return false, nil, err
}

// Update proxy/protocol/responses.go apiKeyProduceMaxVersion when adding new Produce version support
case 3, 4, 5, 6, 7, 8, 9, 10, 11:
// CorrelationID + ClientID
if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions proxy/processor_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package proxy
import (
"bytes"
"encoding/hex"
"testing"
"time"

"github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestHandleRequest(t *testing.T) {
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) {
}

func TestHandleResponse(t *testing.T) {
netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" {
switch brokerPort {
case 19092:
Expand Down
5 changes: 3 additions & 2 deletions proxy/protocol/request_key_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ func (r *RequestKeyVersion) ResponseHeaderVersion() int16 {
case 87: // ReadShareGroupStateSummary
return 1
default:
// throw new UnsupportedVersionException("Unsupported API key " + apiKey);
return -1
// previously; throw new UnsupportedVersionException("Unsupported API key " + apiKey);
// now, assume 'unknown' api keys are all flexible
return 1
}
}
163 changes: 160 additions & 3 deletions proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,29 @@ package protocol
import (
"errors"
"fmt"
"math"

"github.com/grepplabs/kafka-proxy/config"
)

const (
apiKeyProduce = 0
apiKeyMetadata = 3
apiKeyFindCoordinator = 10
apiKeyApiVersions = 18

// Update ApiVersions response to prevent requests/responses that can't be parsed by Kafka-Proxy
apiKeyApiVersionsMaxVersion = 4
apiKeyMetadataMaxVersion = 13
apiKeyFindCoordinatorMaxVersion = 6
// produce requests are parsed by proxy/processor_default.go mustReply()
apiKeyProduceMaxVersion = 11

brokersKeyName = "brokers"
hostKeyName = "host"
portKeyName = "port"
nodeKeyName = "node_id"
apiKeysKeyname = "api_keys"

coordinatorKeyName = "coordinator"
coordinatorsKeyName = "coordinators"
Expand All @@ -22,6 +34,7 @@ const (
var (
metadataResponseSchemaVersions = createMetadataResponseSchemaVersions()
findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions()
apiVersionsResponseSchemaVersions = createApiVersionsResponseSchemaVersions()
)

func createMetadataResponseSchemaVersions() []Schema {
Expand Down Expand Up @@ -243,7 +256,33 @@ func createMetadataResponseSchemaVersions() []Schema {
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12}
metadataResponseV13 := NewSchema("metadata_response_v13",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9},
&Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12},
&SchemaTaggedFields{Name: "response_tagged_fields"},
&Mfield{Name: "error_code", Ty: TypeInt16},
)

// Update apiKeyMetadataMaxVersion when adding new versions
return []Schema{
metadataResponseV0,
metadataResponseV1,
metadataResponseV2,
metadataResponseV3,
metadataResponseV4,
metadataResponseV5,
metadataResponseV6,
metadataResponseV7,
metadataResponseV8,
metadataResponseV9,
metadataResponseV10,
metadataResponseV11,
metadataResponseV12,
metadataResponseV13,
}
}

func createFindCoordinatorResponseSchemaVersions() []Schema {
Expand Down Expand Up @@ -296,9 +335,117 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
findCoordinatorResponseV5 := findCoordinatorResponseV4
findCoordinatorResponseV6 := findCoordinatorResponseV5

// Update apiKeyFindCoordinatorMaxVersion when adding new versions
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6}
}

func createApiVersionsResponseSchemaVersions() []Schema {
apiVersionKeyV0 := NewSchema("api_versions_key_v0",
&Mfield{Name: "api_key", Ty: TypeInt16},
&Mfield{Name: "min_version", Ty: TypeInt16},
&Mfield{Name: "max_version", Ty: TypeInt16},
)

apiVersionSchemaV3 := NewSchema("api_versions_key_schema3",
&Mfield{Name: "api_key", Ty: TypeInt16},
&Mfield{Name: "min_version", Ty: TypeInt16},
&Mfield{Name: "max_version", Ty: TypeInt16},
&SchemaTaggedFields{"api_versions_tagged_fields"},
)

apiVersionsResponseV0 := NewSchema("api_versions_response_v0",
&Mfield{Name: "error_code", Ty: TypeInt16},
&Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0},
)

// Version 1 adds throttle time to the response.
apiVersionsResponseV1 := NewSchema("api_versions_response_v1",
&Mfield{Name: "error_code", Ty: TypeInt16},
&Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0},
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
)

// Starting in version 2, on quota violation, brokers send out responses before throttling.
apiVersionsResponseV2 := apiVersionsResponseV1

// Version 3 is the first flexible version. Tagged fields are only supported in the body but
// not in the header. The length of the header must not change in order to guarantee the
// backward compatibility.
//
// Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported
// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
apiVersionsResponseV3 := NewSchema("api_versions_response_v3",
&Mfield{Name: "error_code", Ty: TypeInt16},
&CompactArray{Name: apiKeysKeyname, Ty: apiVersionSchemaV3},
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0.
apiVersionsResponseV4 := apiVersionsResponseV3

// Update apiKeyApiVersionsMaxVersion when adding new versions
return []Schema{
apiVersionsResponseV0,
apiVersionsResponseV1,
apiVersionsResponseV2,
apiVersionsResponseV3,
apiVersionsResponseV4,
}
}

func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
}
if fn == nil {
return errors.New("net address mapper must not be nil")
}
apiVersionsArray, ok := decodedStruct.Get(apiKeysKeyname).([]interface{})
if !ok {
return errors.New("api versions not found")
}
for _, apiVersionElement := range apiVersionsArray {
apiVersion := apiVersionElement.(*Struct)
apiKey, ok := apiVersion.Get("api_key").(int16)
if !ok {
return errors.New("api_keys.api_key not found")
}
maxVersion, ok := apiVersion.Get("max_version").(int16)
if !ok {
return errors.New("api_keys.max_version not found")
}

limitVersion := int16(math.MaxInt16)
switch apiKey {
case apiKeyProduce:
if maxVersion > apiKeyProduceMaxVersion {
limitVersion = apiKeyProduceMaxVersion
}
case apiKeyMetadata:
if maxVersion > apiKeyMetadataMaxVersion {
limitVersion = apiKeyMetadataMaxVersion
}
case apiKeyFindCoordinator:
if maxVersion > apiKeyFindCoordinatorMaxVersion {
limitVersion = apiKeyFindCoordinatorMaxVersion
}
case apiKeyApiVersions:
if maxVersion > apiKeyApiVersionsMaxVersion {
limitVersion = apiKeyApiVersionsMaxVersion
}
}
if maxVersion > limitVersion {
err := apiVersion.Replace("max_version", limitVersion)
if err != nil {
return err
}
}
}

return nil
}

func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
Expand All @@ -320,12 +467,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
if !ok {
return errors.New("broker.port not found")
}
nodeId, ok := broker.Get(nodeKeyName).(int32)
if !ok {
return errors.New("broker.node_id not found")
}

if host == "" && port <= 0 {
continue
}

newHost, newPort, err := fn(host, port)
newHost, newPort, err := fn(host, port, nodeId)
if err != nil {
return err
}
Expand Down Expand Up @@ -383,12 +534,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
if !ok {
return errors.New("coordinator.port not found")
}
nodeId, ok := coordinator.Get(nodeKeyName).(int32)
if !ok {
return errors.New("coordinator.node_id not found")
}

if host == "" && port <= 0 {
return nil
}

newHost, newPort, err := fn(host, port)
newHost, newPort, err := fn(host, port, nodeId)
if err != nil {
return err
}
Expand Down Expand Up @@ -437,6 +592,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse)
case apiKeyFindCoordinator:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse)
case apiKeyApiVersions:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, apiVersionsResponseSchemaVersions, modifyApiVersionsResponse)
default:
return nil, nil
}
Expand Down
9 changes: 5 additions & 4 deletions proxy/protocol/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package protocol
import (
"encoding/hex"
"fmt"
"github.com/google/uuid"
"reflect"
"strings"
"testing"

"github.com/google/uuid"

"github.com/grepplabs/kafka-proxy/config"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
Expand All @@ -20,7 +21,7 @@ var (
// topic_metadata
0x00, 0x00, 0x00, 0x00}

testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 51 {
return "myhost1", 34001, nil
} else if brokerHost == "google.com" && brokerPort == 273 {
Expand All @@ -31,7 +32,7 @@ var (
return "", 0, errors.New("unexpected data")
}

testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 19092 {
return "myhost1", 34001, nil
} else if brokerHost == "localhost" && brokerPort == 29092 {
Expand Down Expand Up @@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) {
a.Nil(err)
a.Equal(bytes, resp)

modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 51 {
return "azure.microsoft.com", 34001, nil
} else if brokerHost == "google.com" && brokerPort == 273 {
Expand Down
Loading