diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java index bca154f..c686c61 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java @@ -57,7 +57,6 @@ public class LiKafkaInstrumentedProducerImpl implements DelegatingProducer private final long initialConnectionTimeoutMs; private final ReentrantReadWriteLock delegateLock = new ReentrantReadWriteLock(); - private final Object closeLock = new Object(); private final Properties baseConfig; private final Map libraryVersions; private final ProducerFactory producerFactory; @@ -460,36 +459,39 @@ private boolean proceedClosing() { if (isClosed()) { return false; } - synchronized (closeLock) { - if (isClosed()) { - return false; //someone beat us to it - } - int holds = delegateLock.getReadHoldCount(); //this is for our thread - ReentrantReadWriteLock.ReadLock readLock = delegateLock.readLock(); - ReentrantReadWriteLock.WriteLock writeLock = delegateLock.writeLock(); - if (holds > 0) { //do we own a read lock ? - for (int i = 0; i < holds; i++) { - readLock.unlock(); - } - //at this point we no longer hold a read lock, but any number of other - //readers/writers may slip past us + + // release read lock held by current thread if any + // There are use cases of closing the producer in send callback if send fails, we need to ensure + // the same ordering of acquiring/releasing locks, in this case: + // 1. send will get the Read lock + // 2. close will first release all Read locks; then grab the Write lock + final int holds = delegateLock.getReadHoldCount(); + ReentrantReadWriteLock.ReadLock readLock = delegateLock.readLock(); + ReentrantReadWriteLock.WriteLock writeLock = delegateLock.writeLock(); + if (holds > 0) { //do we own a read lock ? + for (int i = 0; i < holds; i++) { + readLock.unlock(); } + //at this point we no longer hold a read lock, but any number of other + //readers/writers may slip past us. That's fine in dual close case since they will + //always first try to release the read locks they own. + } + + try { + writeLock.lock(); //wait for a write lock try { - writeLock.lock(); //wait for a write lock - try { - if (isClosed()) { - return false; //some other writer may have beaten us again - } - closedAt = System.currentTimeMillis(); - return true; - } finally { - writeLock.unlock(); + if (isClosed()) { + return false; //some other writer may have beaten us again } + closedAt = System.currentTimeMillis(); + return true; } finally { - if (holds > 0) { //restore our read lock holds (if we had any) - for (int i = 0; i < holds; i++) { - readLock.lock(); - } + writeLock.unlock(); + } + } finally { + if (holds > 0) { //restore our read lock holds (if we had any) + for (int i = 0; i < holds; i++) { + readLock.lock(); } } }