Skip to content

Commit

Permalink
various attempts at fixing issue
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Aug 14, 2024
1 parent 1c8688c commit 4093865
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ExclusiveReentrantLockManager {

private final int waitToCollect;

LockMonitoring<ExclusiveReentrantLock> lockMonitoring;
LockMonitoring<Lock> lockMonitoring;

public ExclusiveReentrantLockManager() {
this(false);
Expand All @@ -50,18 +50,18 @@ public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency

if (trackLocks || Properties.lockTrackingEnabled()) {

lockMonitoring = new LockTracking(
lockMonitoring = new LockTracking<>(
true,
"ExclusiveReentrantLockManager",
"ExclusiveReentrantLockManager-Tracking",
LoggerFactory.getLogger(this.getClass()),
waitToCollect,
Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner)
);

} else {
lockMonitoring = new LockCleaner(
lockMonitoring = new LockCleaner<>(
false,
"ExclusiveReentrantLockManager",
"ExclusiveReentrantLockManager-Cleaner",
LoggerFactory.getLogger(this.getClass()),
Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner)
);
Expand All @@ -87,6 +87,8 @@ private Lock tryExclusiveLockInner() {

}

private final AtomicLong ownerIsDead = new AtomicLong();

private Lock getExclusiveLockInner() throws InterruptedException {

synchronized (owner) {
Expand All @@ -100,6 +102,14 @@ private Lock getExclusiveLockInner() throws InterruptedException {
if (lock != null) {
return lock;
} else {
if (!owner.get().isAlive()) {
long l = ownerIsDead.incrementAndGet();
if (l > 10) {
ownerIsDead.set(0);
continue;
}

}
lockMonitoring.runCleanup();
owner.wait(waitToCollect);
}
Expand All @@ -113,6 +123,12 @@ private Lock getExclusiveLockInner() throws InterruptedException {
if (lock != null) {
return lock;
} else {
if (!owner.get().isAlive()) {
System.out.println("Owner thread is dead");
// activeLocks.set(0);
// owner.set(null);
// continue;
}
owner.wait(waitToCollect);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.WeakHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.eclipse.rdf4j.common.transaction.IsolationLevel;
Expand Down Expand Up @@ -119,6 +120,7 @@ protected static boolean debugEnabled() {
* debugging was disable at the time the connection was acquired.
*/
private final Map<SailConnection, Throwable> activeConnections = new IdentityHashMap<>();
// private final Map<SailConnection, Throwable> activeConnections = new WeakHashMap<>();

/*
* constructors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.concurrent.locks.ExclusiveReentrantLockManager;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
Expand Down Expand Up @@ -98,17 +98,16 @@ public abstract class AbstractSailConnection implements SailConnection {
private final AtomicReference<Thread> activeThread = new AtomicReference<>();

@SuppressWarnings("FieldMayBeFinal")
private boolean isOpen = true;
private volatile boolean isOpen = true;
private static final VarHandle IS_OPEN;

private Thread owner;

/**
* Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a
* transaction.
*
*/
private final ReentrantLock updateLock = new ReentrantLock();
private final ExclusiveReentrantLockManager updateLock = new ExclusiveReentrantLockManager();
private final LongAdder iterationsOpened = new LongAdder();
private final LongAdder iterationsClosed = new LongAdder();

Expand Down Expand Up @@ -200,8 +199,7 @@ public void begin(IsolationLevel isolationLevel) throws SailException {
activeThread.setRelease(Thread.currentThread());

verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();
try {
if (isActive()) {
throw new SailException("a transaction is already active on this connection.");
Expand All @@ -210,8 +208,11 @@ public void begin(IsolationLevel isolationLevel) throws SailException {
startTransactionInternal();
txnActive = true;
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand Down Expand Up @@ -505,15 +506,19 @@ public final void prepare() throws SailException {
activeThread.setRelease(Thread.currentThread());
verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();

try {
if (txnActive) {
prepareInternal();
txnPrepared = true;
}
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand All @@ -535,7 +540,8 @@ public final void commit() throws SailException {

verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();

try {
if (txnActive) {
if (!txnPrepared) {
Expand All @@ -546,8 +552,11 @@ public final void commit() throws SailException {
txnPrepared = false;
}
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand All @@ -572,7 +581,8 @@ public final void rollback() throws SailException {

verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();

try {
if (txnActive) {
try {
Expand All @@ -586,8 +596,11 @@ public final void rollback() throws SailException {
debugEnabled ? new Throwable() : null);
}
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand Down Expand Up @@ -694,13 +707,17 @@ public final void endUpdate(UpdateContext op) throws SailException {

verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();

try {
verifyIsActive();
endUpdateInternal(op);
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand Down Expand Up @@ -750,14 +767,18 @@ public final void clear(Resource... contexts) throws SailException {

verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();

try {
verifyIsActive();
clearInternal(contexts);
statementsRemoved = true;
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand Down Expand Up @@ -820,13 +841,17 @@ public final void setNamespace(String prefix, String name) throws SailException

verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();

try {
verifyIsActive();
setNamespaceInternal(prefix, name);
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand All @@ -848,13 +873,17 @@ public final void removeNamespace(String prefix) throws SailException {

verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();

try {
verifyIsActive();
removeNamespaceInternal(prefix);
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand All @@ -873,13 +902,17 @@ public final void clearNamespaces() throws SailException {

verifyIsOpen();

updateLock.lock();
Lock exclusiveLock = updateLock.getExclusiveLock();

try {
verifyIsActive();
clearNamespacesInternal();
} finally {
updateLock.unlock();
exclusiveLock.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
} finally {
try {
activeThread.setRelease(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,56 +734,59 @@ synchronized public void close() throws SailException {
if (closed) {
return;
}

if (getWrappedConnection() instanceof AbstractSailConnection) {
AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection();

abstractSailConnection.waitForOtherOperations(true);
}

try {
if (isActive()) {
rollback();
if (getWrappedConnection() instanceof AbstractSailConnection) {
AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection();

abstractSailConnection.waitForOtherOperations(true);
}
} finally {
try {
shapesRepoConnection.close();

try {
if (isActive()) {
rollback();
}
} finally {
try {
if (previousStateConnection != null) {
previousStateConnection.close();
}
shapesRepoConnection.close();

} finally {
try {
if (serializableConnection != null) {
serializableConnection.close();
if (previousStateConnection != null) {
previousStateConnection.close();
}
} finally {

} finally {
try {
super.close();
if (serializableConnection != null) {
serializableConnection.close();
}
} finally {

try {
sail.closeConnection();
super.close();
} finally {
try {
cleanupShapesReadWriteLock();
sail.closeConnection();
} finally {
try {
cleanupReadWriteLock();
cleanupShapesReadWriteLock();
} finally {
closed = true;
}
try {
cleanupReadWriteLock();
} finally {
closed = true;
}

}
}
}
}
}
}
}
}

}

@Override
Expand Down

0 comments on commit 4093865

Please sign in to comment.