
Commit 077e5d4
PR fixes:
- clean up config test
- clarify comment on batch
- rm none
- rm naked error
- clarify producerList logic
FrancoisPoinsot committed Mar 30, 2019
1 parent 6b86562 commit 077e5d4
Showing 4 changed files with 45 additions and 25 deletions.
30 changes: 21 additions & 9 deletions config_test.go
@@ -272,24 +272,36 @@ func TestProducerConfigValidates(t *testing.T) {
 				cfg.Producer.RequiredAcks = WaitForAll
 			},
 			"Idempotent producer requires Net.MaxOpenRequests to be 1"},
+	}
+
+	for i, test := range tests {
+		c := NewConfig()
+		test.cfg(c)
+		if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
+			t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
+		}
+	}
+}
+func TestConsumerConfigValidates(t *testing.T) {
+	tests := []struct {
+		name string
+		cfg  func(*Config)
+		err  string
+	}{
 		{"ReadCommitted Version",
 			func(cfg *Config) {
 				cfg.Version = V0_10_0_0
 				cfg.Consumer.IsolationLevel = ReadCommitted
 			},
-			"ReadCommitted requires Version >= V0_11_0_0"},
-		{"ReadCommitted Version",
-			func(cfg *Config) {
-				cfg.Version = V0_10_0_0
-				cfg.Consumer.IsolationLevel = ReadCommitted
-			},
-			"ReadCommitted requires Version >= V0_11_0_0"},
+			"ReadCommitted requires Version >= V0_11_0_0",
+		},
 		{"Incorrect isolation level",
 			func(cfg *Config) {
 				cfg.Version = V0_11_0_0
 				cfg.Consumer.IsolationLevel = IsolationLevel(42)
 			},
-			"Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted"},
+			"Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted",
+		},
 	}

 	for i, test := range tests {
24 changes: 10 additions & 14 deletions consumer.go
@@ -3,7 +3,6 @@ package sarama
 import (
 	"errors"
 	"fmt"
-	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -596,15 +595,11 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 		child.fetchSize = child.conf.Consumer.Fetch.Default
 	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

-	abortedProducerIDs := make(map[int64]none, len(block.AbortedTransactions))
-
-	// Load aborted transaction in separate var because we are going to depile this one
-	abortedTransactions := make([]*AbortedTransaction, len(block.AbortedTransactions))
-	copy(abortedTransactions, block.AbortedTransactions)
-	sort.Slice(
-		abortedTransactions,
-		func(i, j int) bool { return abortedTransactions[i].FirstOffset < abortedTransactions[j].FirstOffset },
-	)
+	// abortedProducerIDs contains the producer IDs whose messages should be ignored as uncommitted
+	// - a producer ID is added when the partitionConsumer reaches the offset at which one of its aborted transactions begins (abortedTransaction.FirstOffset)
+	// - a producer ID is removed when the partitionConsumer reaches an abort control record, meaning the aborted transaction for that producer is over
+	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
+	abortedTransactions := block.getAbortedTransactions()

 	messages := []*ConsumerMessage{}
 	for _, records := range block.RecordsSet {
@@ -617,12 +612,13 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

 			messages = append(messages, messageSetMessages...)
 		case defaultRecords:
-			for _, abortedTransaction := range abortedTransactions {
-				if abortedTransaction.FirstOffset > records.RecordBatch.LastOffset() {
+			// Consume remaining abortedTransactions up to the last offset of the current batch
+			for _, txn := range abortedTransactions {
+				if txn.FirstOffset > records.RecordBatch.LastOffset() {
 					break
 				}
-				// add aborted transaction to abortedProducer list and depile abortedTransactions
-				abortedProducerIDs[abortedTransaction.ProducerID] = none{}
+				abortedProducerIDs[txn.ProducerID] = struct{}{}
+				// Pop abortedTransactions so that we never add the same one again
 				abortedTransactions = abortedTransactions[1:]
 			}

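For readers following the transaction logic, here is a minimal, self-contained sketch (not Sarama's code; every type and name below is a simplified stand-in) of the bookkeeping the new comments describe: aborted transactions are consumed in FirstOffset order, a producer ID sits in a set while one of its aborted transactions is open, and the abort control record removes it again. The empty struct{} value gives set semantics at zero bytes per entry, which is what the "rm none" bullet in the commit message refers to.

package main

import (
	"fmt"
	"sort"
)

type abortedTxn struct {
	producerID  int64
	firstOffset int64
}

type record struct {
	offset         int64
	producerID     int64
	isAbortControl bool // stand-in for an abort ControlRecord
	value          string
}

// committedOnly mirrors the idea: open aborted transactions as their
// FirstOffset is reached, drop records from aborted producers, and close
// the transaction when the abort control record comes by.
func committedOnly(recs []record, aborted []abortedTxn) []string {
	// order by FirstOffset, as getAbortedTransactions does for the real response
	sort.Slice(aborted, func(i, j int) bool { return aborted[i].firstOffset < aborted[j].firstOffset })

	abortedProducers := make(map[int64]struct{}) // struct{} values: a set, zero bytes per entry
	var out []string
	for _, r := range recs {
		// add producer IDs whose aborted transaction starts at or before this offset
		for len(aborted) > 0 && aborted[0].firstOffset <= r.offset {
			abortedProducers[aborted[0].producerID] = struct{}{}
			aborted = aborted[1:] // pop so the same transaction is never added twice
		}
		if r.isAbortControl {
			delete(abortedProducers, r.producerID) // this producer's aborted transaction is over
			continue
		}
		if _, skip := abortedProducers[r.producerID]; !skip {
			out = append(out, r.value)
		}
	}
	return out
}

func main() {
	recs := []record{
		{offset: 0, producerID: 1, value: "committed-a"},
		{offset: 1, producerID: 2, value: "aborted-b"},
		{offset: 2, producerID: 2, isAbortControl: true},
		{offset: 3, producerID: 2, value: "committed-c"},
	}
	aborted := []abortedTxn{{producerID: 2, firstOffset: 1}}
	fmt.Println(committedOnly(recs, aborted)) // [committed-a committed-c]
}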
2 changes: 1 addition & 1 deletion control_record.go
@@ -17,7 +17,7 @@ type ControlRecord struct {
 	Type ControlRecordType
 }

-func (cr *ControlRecord) decode(key, value packetDecoder) (err error) {
+func (cr *ControlRecord) decode(key, value packetDecoder) error {
 	{
 		var err error
 		cr.Version, err = value.getInt16()
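This signature change is the "rm naked error" bullet from the commit message. A short illustration of the pitfall it avoids (my sketch, not the commit's code, assuming the decode body keeps declaring its own err inside a block as shown above): with a named return, an inner declaration shadows it, so a naked return silently yields nil.

package main

import (
	"errors"
	"fmt"
)

// With a named return and an inner declaration, the inner err shadows the
// named one, so the naked return yields the zero value nil.
func shadowed() (err error) {
	{
		err := errors.New("decode failed") // := shadows the named return value
		_ = err
	}
	return // returns the outer err, still nil
}

// A plain error return forces every exit to name the value it returns.
func explicit() error {
	{
		var err error
		if err = errors.New("decode failed"); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(shadowed()) // prints <nil>: the silent-nil pitfall
	fmt.Println(explicit()) // prints: decode failed
}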
14 changes: 13 additions & 1 deletion fetch_response.go
@@ -1,6 +1,7 @@
 package sarama

 import (
+	"sort"
 	"time"
 )

@@ -185,6 +186,17 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
 	return pe.pop()
 }

+func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
+	// I can't find any doc that guarantees the field `fetchResponse.AbortedTransactions` is ordered,
+	// plus the Java implementation uses a PriorityQueue based on `FirstOffset`. I guess we have to order it ourselves
+	at := b.AbortedTransactions
+	sort.Slice(
+		at,
+		func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
+	)
+	return at
+}
+
 type FetchResponse struct {
 	Blocks       map[string]map[int32]*FetchResponseBlock
 	ThrottleTime time.Duration
@@ -386,7 +398,7 @@ func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, ke
 }

 // AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
-// But instead of append a record a batch of record it append a new batch of record of size 1 to a set of batch
+// But instead of appending 1 record to a batch, it appends a new batch containing 1 record to the fetchResponse
 // Since transactions are handled at the batch level (the whole batch is either committed or aborted), use this to test transactions
 func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
 	frb := r.getOrCreateBlock(topic, partition)
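To make the batch-per-record comment concrete, here is a hedged usage sketch of the helper. The signature comes from the hunk above; the builder function itself is hypothetical test scaffolding, and StringEncoder is assumed from Sarama's public API. It would live in a _test.go file in package sarama.

package sarama

import "time"

// buildTransactionalFetchResponse is a hypothetical helper showing the intended
// use of AddRecordBatchWithTimestamp: one batch per record, so that batch-level
// commit/abort semantics apply to each record individually.
func buildTransactionalFetchResponse() *FetchResponse {
	fr := &FetchResponse{}
	ts := time.Now()

	// a non-transactional record from producer 1 (isTransactional = false)
	fr.AddRecordBatchWithTimestamp("my-topic", 0, nil, StringEncoder("committed"), 0, 1, false, ts)
	// a transactional record from producer 2 in its own batch, so the whole
	// batch can later be treated as aborted for that producer
	fr.AddRecordBatchWithTimestamp("my-topic", 0, nil, StringEncoder("in-txn"), 1, 2, true, ts)
	return fr
}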
