From 077e5d4f7b8baa608d513569a62f1728dc86706b Mon Sep 17 00:00:00 2001 From: FrancoisPoinsot Date: Sat, 30 Mar 2019 14:48:24 +0100 Subject: [PATCH] PR fixes: - clean up config test - clarify comment on batch - rm none - rm naked error - clarify producerList logic --- config_test.go | 30 +++++++++++++++++++++--------- consumer.go | 24 ++++++++++-------------- control_record.go | 2 +- fetch_response.go | 14 +++++++++++++- 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/config_test.go b/config_test.go index 53bbe36d8..887bfeb68 100644 --- a/config_test.go +++ b/config_test.go @@ -272,24 +272,36 @@ func TestProducerConfigValidates(t *testing.T) { cfg.Producer.RequiredAcks = WaitForAll }, "Idempotent producer requires Net.MaxOpenRequests to be 1"}, + } + + for i, test := range tests { + c := NewConfig() + test.cfg(c) + if err := c.Validate(); string(err.(ConfigurationError)) != test.err { + t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) + } + } +} +func TestConsumerConfigValidates(t *testing.T) { + tests := []struct { + name string + cfg func(*Config) + err string + }{ {"ReadCommitted Version", func(cfg *Config) { cfg.Version = V0_10_0_0 cfg.Consumer.IsolationLevel = ReadCommitted }, - "ReadCommitted requires Version >= V0_11_0_0"}, - {"ReadCommitted Version", - func(cfg *Config) { - cfg.Version = V0_10_0_0 - cfg.Consumer.IsolationLevel = ReadCommitted - }, - "ReadCommitted requires Version >= V0_11_0_0"}, - {"ReadCommitted Version", + "ReadCommitted requires Version >= V0_11_0_0", + }, + {"Incorrect isolation level", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Consumer.IsolationLevel = IsolationLevel(42) }, - "Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted"}, + "Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted", + }, } for i, test := range tests { diff --git a/consumer.go b/consumer.go index 8640115ae..e8d62df09 100644 --- a/consumer.go +++ b/consumer.go @@ -3,7 +3,6 @@ package sarama 
import ( "errors" "fmt" - "sort" "sync" "sync/atomic" "time" @@ -596,15 +595,11 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu child.fetchSize = child.conf.Consumer.Fetch.Default atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) - abortedProducerIDs := make(map[int64]none, len(block.AbortedTransactions)) - - // Load aborted transaction in separate var because we are going to depile this one - abortedTransactions := make([]*AbortedTransaction, len(block.AbortedTransactions)) - copy(abortedTransactions, block.AbortedTransactions) - sort.Slice( - abortedTransactions, - func(i, j int) bool { return abortedTransactions[i].FirstOffset < abortedTransactions[j].FirstOffset }, - ) + // abortedProducerIDs contains producerID which message should be ignored as uncommitted + // - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset) + // - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over + abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions)) + abortedTransactions := block.getAbortedTransactions() messages := []*ConsumerMessage{} for _, records := range block.RecordsSet { @@ -617,12 +612,13 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu messages = append(messages, messageSetMessages...) 
case defaultRecords: - for _, abortedTransaction := range abortedTransactions { - if abortedTransaction.FirstOffset > records.RecordBatch.LastOffset() { + // Consume remaining abortedTransaction up to last offset of current batch + for _, txn := range abortedTransactions { + if txn.FirstOffset > records.RecordBatch.LastOffset() { break } - // add aborted transaction to abortedProducer list and depile abortedTransactions - abortedProducerIDs[abortedTransaction.ProducerID] = none{} + abortedProducerIDs[txn.ProducerID] = struct{}{} + // Pop abortedTransactions so that this transaction is never considered again abortedTransactions = abortedTransactions[1:] } diff --git a/control_record.go b/control_record.go index d36f51789..b5d79a116 100644 --- a/control_record.go +++ b/control_record.go @@ -17,7 +17,7 @@ type ControlRecord struct { Type ControlRecordType } -func (cr *ControlRecord) decode(key, value packetDecoder) (err error) { +func (cr *ControlRecord) decode(key, value packetDecoder) error { { var err error cr.Version, err = value.getInt16() diff --git a/fetch_response.go b/fetch_response.go index ad4a7b734..3afc18778 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -1,6 +1,7 @@ package sarama import ( + "sort" "time" ) @@ -185,6 +186,17 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) return pe.pop() } +func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction { + // I can't find any doc that guarantees the field `fetchResponse.AbortedTransactions` is ordered + // plus the Java implementation uses a PriorityQueue based on `FirstOffset`. 
I guess we have to order it ourselves + at := b.AbortedTransactions + sort.Slice( + at, + func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset }, + ) + return at +} + type FetchResponse struct { Blocks map[string]map[int32]*FetchResponseBlock ThrottleTime time.Duration @@ -386,7 +398,7 @@ func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, ke } // AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp -// But instead of append a record a batch of record it append a new batch of record of size 1 to a set of batch +// But instead of appending 1 record to a batch, it appends a new batch containing 1 record to the fetchResponse // Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) { frb := r.getOrCreateBlock(topic, partition)