Skip to content

Commit

Permalink
Fix race condition in LiKafkaInstrumentedProducerImpl (#189)
Browse files Browse the repository at this point in the history
* Fix race condition in LiKafkaInstrumentedProducerImpl when two threads calling close in send callback

Co-authored-by: Ke Hu <[email protected]>
  • Loading branch information
kehuum and Ke Hu authored Sep 3, 2020
1 parent da22836 commit d0471d7
Showing 1 changed file with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class LiKafkaInstrumentedProducerImpl<K, V> implements DelegatingProducer

private final long initialConnectionTimeoutMs;
private final ReentrantReadWriteLock delegateLock = new ReentrantReadWriteLock();
private final Object closeLock = new Object();
private final Properties baseConfig;
private final Map<String, String> libraryVersions;
private final ProducerFactory<K, V> producerFactory;
Expand Down Expand Up @@ -460,36 +459,39 @@ private boolean proceedClosing() {
if (isClosed()) {
return false;
}
synchronized (closeLock) {
if (isClosed()) {
return false; //someone beat us to it
}
int holds = delegateLock.getReadHoldCount(); //this is for our thread
ReentrantReadWriteLock.ReadLock readLock = delegateLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = delegateLock.writeLock();
if (holds > 0) { //do we own a read lock ?
for (int i = 0; i < holds; i++) {
readLock.unlock();
}
//at this point we no longer hold a read lock, but any number of other
//readers/writers may slip past us

// release read lock held by current thread if any
// There are use cases of closing the producer in send callback if send fails, we need to ensure
// the same ordering of acquiring/releasing locks, in this case:
// 1. send will get the Read lock
// 2. close will first release all Read locks; then grab the Write lock
final int holds = delegateLock.getReadHoldCount();
ReentrantReadWriteLock.ReadLock readLock = delegateLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = delegateLock.writeLock();
if (holds > 0) { //do we own a read lock ?
for (int i = 0; i < holds; i++) {
readLock.unlock();
}
//at this point we no longer hold a read lock, but any number of other
//readers/writers may slip past us. That's fine in dual close case since they will
//always first try to release the read locks they own.
}

try {
writeLock.lock(); //wait for a write lock
try {
writeLock.lock(); //wait for a write lock
try {
if (isClosed()) {
return false; //some other writer may have beaten us again
}
closedAt = System.currentTimeMillis();
return true;
} finally {
writeLock.unlock();
if (isClosed()) {
return false; //some other writer may have beaten us again
}
closedAt = System.currentTimeMillis();
return true;
} finally {
if (holds > 0) { //restore our read lock holds (if we had any)
for (int i = 0; i < holds; i++) {
readLock.lock();
}
writeLock.unlock();
}
} finally {
if (holds > 0) { //restore our read lock holds (if we had any)
for (int i = 0; i < holds; i++) {
readLock.lock();
}
}
}
Expand Down

0 comments on commit d0471d7

Please sign in to comment.