Skip to content

Commit

Permalink
KAFKA-15771: fix concurrency bug in ProduceRequest#partitionSizes() (a…
Browse files Browse the repository at this point in the history
…pache#14674)

A commit fixes a bug in ProduceRequest#partitionSizes() that may cause this method to incorrectly returning an empty or incomplete response for a thread when another thread is in the process of initialising it. 

Reviewers: Divij Vaidya <[email protected]>, hudeqi <[email protected]>, vamossagar12 <[email protected]>

--------------------------------
Co-authored-by: fangxiaobing <[email protected]>
  • Loading branch information
fxbing and fangxiaobing authored Nov 7, 2023
1 parent febf0fb commit 2d07e57
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,15 @@ Map<TopicPartition, Integer> partitionSizes() {
// this method may be called by different thread (see the comment on data)
synchronized (this) {
if (partitionSizes == null) {
partitionSizes = new HashMap<>();
Map<TopicPartition, Integer> tmpPartitionSizes = new HashMap<>();
data.topicData().forEach(topicData ->
topicData.partitionData().forEach(partitionData ->
partitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()),
tmpPartitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()),
(ignored, previousValue) ->
partitionData.records().sizeInBytes() + (previousValue == null ? 0 : previousValue))
)
);
partitionSizes = tmpPartitionSizes;
}
}
}
Expand Down

0 comments on commit 2d07e57

Please sign in to comment.