Skip to content

Commit

Permalink
Add support for decoding versioned responses
Browse files Browse the repository at this point in the history
This will be needed to properly parse kafka's 0.10.0 messages where the version
does not just affect the request format but also the response.
  • Loading branch information
eapache committed Jun 9, 2016
1 parent 50ae3cc commit 37654da
Show file tree
Hide file tree
Showing 48 changed files with 229 additions and 77 deletions.
2 changes: 1 addition & 1 deletion api_versions_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func (r *ApiVersionsRequest) encode(pe packetEncoder) error {
return nil
}

func (r *ApiVersionsRequest) decode(pd packetDecoder) (err error) {
func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion api_versions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
return nil
}

func (r *ApiVersionsResponse) decode(pd packetDecoder) error {
func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
Expand All @@ -72,3 +72,11 @@ func (r *ApiVersionsResponse) decode(pd packetDecoder) error {

return nil
}

func (r *ApiVersionsResponse) key() int16 {
return 18
}

func (r *ApiVersionsResponse) version() int16 {
return 0
}
2 changes: 1 addition & 1 deletion api_versions_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestApiVersionsResponse(t *testing.T) {
var response *ApiVersionsResponse

response = new(ApiVersionsResponse)
testDecodable(t, "no error", response, apiVersionResponse)
testVersionDecodable(t, "no error", response, apiVersionResponse, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
Expand Down
6 changes: 3 additions & 3 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups
return response, nil
}

func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()

Expand Down Expand Up @@ -355,7 +355,7 @@ func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, e
return &promise, nil
}

func (b *Broker) sendAndReceive(req requestBody, res decoder) error {
func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
promise, err := b.send(req, res != nil)

if err != nil {
Expand All @@ -368,7 +368,7 @@ func (b *Broker) sendAndReceive(req requestBody, res decoder) error {

select {
case buf := <-promise.packets:
return decode(buf, res)
return versionedDecode(buf, res, req.version())
case err = <-promise.errors:
return err
}
Expand Down
2 changes: 1 addition & 1 deletion consumer_metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
return pe.putString(r.ConsumerGroup)
}

func (r *ConsumerMetadataRequest) decode(pd packetDecoder) (err error) {
func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) {
r.ConsumerGroup, err = pd.getString()
return err
}
Expand Down
10 changes: 9 additions & 1 deletion consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type ConsumerMetadataResponse struct {
CoordinatorPort int32 // deprecated: use Coordinator.Addr()
}

func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
Expand Down Expand Up @@ -71,3 +71,11 @@ func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
pe.putInt32(r.CoordinatorPort)
return nil
}

func (r *ConsumerMetadataResponse) key() int16 {
return 10
}

func (r *ConsumerMetadataResponse) version() int16 {
return 0
}
2 changes: 1 addition & 1 deletion describe_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func (r *DescribeGroupsRequest) encode(pe packetEncoder) error {
return pe.putStringArray(r.Groups)
}

func (r *DescribeGroupsRequest) decode(pd packetDecoder) (err error) {
func (r *DescribeGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Groups, err = pd.getStringArray()
return
}
Expand Down
10 changes: 9 additions & 1 deletion describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
return nil
}

func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) {
n, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -35,6 +35,14 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
return nil
}

func (r *DescribeGroupsResponse) key() int16 {
return 15
}

func (r *DescribeGroupsResponse) version() int16 {
return 0
}

type GroupDescription struct {
Err KError
GroupId string
Expand Down
4 changes: 2 additions & 2 deletions describe_groups_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func TestDescribeGroupsResponse(t *testing.T) {
var response *DescribeGroupsResponse

response = new(DescribeGroupsResponse)
testDecodable(t, "empty", response, describeGroupsResponseEmpty)
testVersionDecodable(t, "empty", response, describeGroupsResponseEmpty, 0)
if len(response.Groups) != 0 {
t.Error("Expected no groups")
}

response = new(DescribeGroupsResponse)
testDecodable(t, "populated", response, describeGroupsResponsePopulated)
testVersionDecodable(t, "populated", response, describeGroupsResponsePopulated, 0)
if len(response.Groups) != 2 {
t.Error("Expected two groups")
}
Expand Down
22 changes: 22 additions & 0 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type decoder interface {
decode(pd packetDecoder) error
}

type versionedDecoder interface {
decode(pd packetDecoder, version int16) error
}

// Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
// interpreted using Kafka's encoding rules.
func decode(buf []byte, in decoder) error {
Expand All @@ -60,3 +64,21 @@ func decode(buf []byte, in decoder) error {

return nil
}

func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
if buf == nil {
return nil
}

helper := realDecoder{raw: buf}
err := in.decode(&helper, version)
if err != nil {
return err
}

if helper.off != len(buf) {
return PacketDecodingError{"invalid length"}
}

return nil
}
2 changes: 1 addition & 1 deletion fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
return nil
}

func (f *FetchRequest) decode(pd packetDecoder) (err error) {
func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
if _, err = pd.getInt32(); err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
return pe.pop()
}

func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -116,6 +116,14 @@ func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
return nil
}

func (r *FetchResponse) key() int16 {
return 1
}

func (r *FetchResponse) version() int16 {
return 0
}

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
if fr.Blocks == nil {
return nil
Expand Down
4 changes: 2 additions & 2 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (

func TestEmptyFetchResponse(t *testing.T) {
response := FetchResponse{}
testDecodable(t, "empty", &response, emptyFetchResponse)
testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)

if len(response.Blocks) != 0 {
t.Error("Decoding produced topic blocks where there were none.")
Expand All @@ -40,7 +40,7 @@ func TestEmptyFetchResponse(t *testing.T) {

func TestOneMessageFetchResponse(t *testing.T) {
response := FetchResponse{}
testDecodable(t, "one message", &response, oneMessageFetchResponse)
testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
Expand Down
2 changes: 1 addition & 1 deletion heartbeat_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (r *HeartbeatRequest) encode(pe packetEncoder) error {
return nil
}

func (r *HeartbeatRequest) decode(pd packetDecoder) (err error) {
func (r *HeartbeatRequest) decode(pd packetDecoder, version int16) (err error) {
if r.GroupId, err = pd.getString(); err != nil {
return
}
Expand Down
10 changes: 9 additions & 1 deletion heartbeat_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ func (r *HeartbeatResponse) encode(pe packetEncoder) error {
return nil
}

func (r *HeartbeatResponse) decode(pd packetDecoder) error {
func (r *HeartbeatResponse) decode(pd packetDecoder, version int16) error {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
Expand All @@ -18,3 +18,11 @@ func (r *HeartbeatResponse) decode(pd packetDecoder) error {

return nil
}

func (r *HeartbeatResponse) key() int16 {
return 12
}

func (r *HeartbeatResponse) version() int16 {
return 0
}
2 changes: 1 addition & 1 deletion heartbeat_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestHeartbeatResponse(t *testing.T) {
var response *HeartbeatResponse

response = new(HeartbeatResponse)
testDecodable(t, "no error", response, heartbeatResponseNoError)
testVersionDecodable(t, "no error", response, heartbeatResponseNoError, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
Expand Down
2 changes: 1 addition & 1 deletion join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error {
return nil
}

func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
if r.GroupId, err = pd.getString(); err != nil {
return
}
Expand Down
10 changes: 9 additions & 1 deletion join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *JoinGroupResponse) encode(pe packetEncoder) error {
return nil
}

func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {
func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
Expand Down Expand Up @@ -100,3 +100,11 @@ func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {

return nil
}

func (r *JoinGroupResponse) key() int16 {
return 11
}

func (r *JoinGroupResponse) version() int16 {
return 0
}
6 changes: 3 additions & 3 deletions join_group_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestJoinGroupResponse(t *testing.T) {
var response *JoinGroupResponse

response = new(JoinGroupResponse)
testDecodable(t, "no error", response, joinGroupResponseNoError)
testVersionDecodable(t, "no error", response, joinGroupResponseNoError, 0)
if response.Err != ErrNoError {
t.Error("Decoding Err failed: no error expected but found", response.Err)
}
Expand All @@ -58,7 +58,7 @@ func TestJoinGroupResponse(t *testing.T) {
}

response = new(JoinGroupResponse)
testDecodable(t, "with error", response, joinGroupResponseWithError)
testVersionDecodable(t, "with error", response, joinGroupResponseWithError, 0)
if response.Err != ErrInconsistentGroupProtocol {
t.Error("Decoding Err failed: ErrInconsistentGroupProtocol expected but found", response.Err)
}
Expand All @@ -76,7 +76,7 @@ func TestJoinGroupResponse(t *testing.T) {
}

response = new(JoinGroupResponse)
testDecodable(t, "with error", response, joinGroupResponseLeader)
testVersionDecodable(t, "with error", response, joinGroupResponseLeader, 0)
if response.Err != ErrNoError {
t.Error("Decoding Err failed: ErrNoError expected but found", response.Err)
}
Expand Down
2 changes: 1 addition & 1 deletion leave_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (r *LeaveGroupRequest) encode(pe packetEncoder) error {
return nil
}

func (r *LeaveGroupRequest) decode(pd packetDecoder) (err error) {
func (r *LeaveGroupRequest) decode(pd packetDecoder, version int16) (err error) {
if r.GroupId, err = pd.getString(); err != nil {
return
}
Expand Down
10 changes: 9 additions & 1 deletion leave_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ func (r *LeaveGroupResponse) encode(pe packetEncoder) error {
return nil
}

func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) {
func (r *LeaveGroupResponse) decode(pd packetDecoder, version int16) (err error) {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
Expand All @@ -18,3 +18,11 @@ func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) {

return nil
}

func (r *LeaveGroupResponse) key() int16 {
return 13
}

func (r *LeaveGroupResponse) version() int16 {
return 0
}
4 changes: 2 additions & 2 deletions leave_group_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ func TestLeaveGroupResponse(t *testing.T) {
var response *LeaveGroupResponse

response = new(LeaveGroupResponse)
testDecodable(t, "no error", response, leaveGroupResponseNoError)
testVersionDecodable(t, "no error", response, leaveGroupResponseNoError, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}

response = new(LeaveGroupResponse)
testDecodable(t, "with error", response, leaveGroupResponseWithError)
testVersionDecodable(t, "with error", response, leaveGroupResponseWithError, 0)
if response.Err != ErrUnknownMemberId {
t.Error("Decoding error failed: ErrUnknownMemberId expected but found", response.Err)
}
Expand Down
2 changes: 1 addition & 1 deletion list_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func (r *ListGroupsRequest) encode(pe packetEncoder) error {
return nil
}

func (r *ListGroupsRequest) decode(pd packetDecoder) (err error) {
func (r *ListGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
return nil
}

Expand Down
Loading

0 comments on commit 37654da

Please sign in to comment.