From 409386588cd3ebb64a1d1171700a2d7ba060f93e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Wed, 14 Aug 2024 10:43:44 +0200 Subject: [PATCH] various attempts at fixing issue --- .../locks/ExclusiveReentrantLockManager.java | 26 ++++-- .../rdf4j/sail/helpers/AbstractSail.java | 2 + .../sail/helpers/AbstractSailConnection.java | 79 +++++++++++++------ .../rdf4j/sail/shacl/ShaclSailConnection.java | 49 ++++++------ 4 files changed, 105 insertions(+), 51 deletions(-) diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java index 6ac7fa4df12..f9896126df1 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java @@ -34,7 +34,7 @@ public class ExclusiveReentrantLockManager { private final int waitToCollect; - LockMonitoring lockMonitoring; + LockMonitoring lockMonitoring; public ExclusiveReentrantLockManager() { this(false); @@ -50,18 +50,18 @@ public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency if (trackLocks || Properties.lockTrackingEnabled()) { - lockMonitoring = new LockTracking( + lockMonitoring = new LockTracking<>( true, - "ExclusiveReentrantLockManager", + "ExclusiveReentrantLockManager-Tracking", LoggerFactory.getLogger(this.getClass()), waitToCollect, Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner) ); } else { - lockMonitoring = new LockCleaner( + lockMonitoring = new LockCleaner<>( false, - "ExclusiveReentrantLockManager", + "ExclusiveReentrantLockManager-Cleaner", LoggerFactory.getLogger(this.getClass()), Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner) ); @@ -87,6 +87,8 @@ private Lock tryExclusiveLockInner() { } + private final AtomicLong ownerIsDead = new AtomicLong(); + private Lock getExclusiveLockInner() throws InterruptedException { synchronized (owner) { @@ -100,6 +102,14 @@ private Lock getExclusiveLockInner() throws InterruptedException { if (lock != null) { return lock; } else { + if (!owner.get().isAlive()) { + long l = ownerIsDead.incrementAndGet(); + if (l > 10) { + ownerIsDead.set(0); + continue; + } + + } lockMonitoring.runCleanup(); owner.wait(waitToCollect); } @@ -113,6 +123,12 @@ private Lock getExclusiveLockInner() throws InterruptedException { if (lock != null) { return lock; } else { + if (!owner.get().isAlive()) { + System.out.println("Owner thread is dead"); +// activeLocks.set(0); +// owner.set(null); +// continue; + } owner.wait(waitToCollect); } } diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java index 55e58fa3fab..c9b382e76e1 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.WeakHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.eclipse.rdf4j.common.transaction.IsolationLevel; @@ -119,6 +120,7 @@ protected static boolean debugEnabled() { * debugging was disable at the time the connection was acquired. */ private final Map activeConnections = new IdentityHashMap<>(); +// private final Map activeConnections = new WeakHashMap<>(); /* * constructors diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java index ca9bb7b60fb..34fac8cab80 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java @@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; import org.eclipse.rdf4j.common.annotation.InternalUseOnly; +import org.eclipse.rdf4j.common.concurrent.locks.ExclusiveReentrantLockManager; import org.eclipse.rdf4j.common.concurrent.locks.Lock; import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner; import org.eclipse.rdf4j.common.iteration.CloseableIteration; @@ -98,7 +98,7 @@ public abstract class AbstractSailConnection implements SailConnection { private final AtomicReference activeThread = new AtomicReference<>(); @SuppressWarnings("FieldMayBeFinal") - private boolean isOpen = true; + private volatile boolean isOpen = true; private static final VarHandle IS_OPEN; private Thread owner; @@ -106,9 +106,8 @@ public abstract class AbstractSailConnection implements SailConnection { /** * Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a * transaction. - * */ - private final ReentrantLock updateLock = new ReentrantLock(); + private final ExclusiveReentrantLockManager updateLock = new ExclusiveReentrantLockManager(); private final LongAdder iterationsOpened = new LongAdder(); private final LongAdder iterationsClosed = new LongAdder(); @@ -200,8 +199,7 @@ public void begin(IsolationLevel isolationLevel) throws SailException { activeThread.setRelease(Thread.currentThread()); verifyIsOpen(); - - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); try { if (isActive()) { throw new SailException("a transaction is already active on this connection."); @@ -210,8 +208,11 @@ public void begin(IsolationLevel isolationLevel) throws SailException { startTransactionInternal(); txnActive = true; } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -505,15 +506,19 @@ public final void prepare() throws SailException { activeThread.setRelease(Thread.currentThread()); verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { prepareInternal(); txnPrepared = true; } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -535,7 +540,8 @@ public final void commit() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { if (!txnPrepared) { @@ -546,8 +552,11 @@ public final void commit() throws SailException { txnPrepared = false; } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -572,7 +581,8 @@ public final void rollback() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { try { @@ -586,8 +596,11 @@ public final void rollback() throws SailException { debugEnabled ? new Throwable() : null); } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -694,13 +707,17 @@ public final void endUpdate(UpdateContext op) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); endUpdateInternal(op); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -750,14 +767,18 @@ public final void clear(Resource... contexts) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); clearInternal(contexts); statementsRemoved = true; } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -820,13 +841,17 @@ public final void setNamespace(String prefix, String name) throws SailException verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); setNamespaceInternal(prefix, name); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -848,13 +873,17 @@ public final void removeNamespace(String prefix) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); removeNamespaceInternal(prefix); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -873,13 +902,17 @@ public final void clearNamespaces() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); clearNamespacesInternal(); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java index 1e0416e76ad..9a9da41e8d5 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java @@ -734,49 +734,51 @@ synchronized public void close() throws SailException { if (closed) { return; } - - if (getWrappedConnection() instanceof AbstractSailConnection) { - AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection(); - - abstractSailConnection.waitForOtherOperations(true); - } - try { - if (isActive()) { - rollback(); + if (getWrappedConnection() instanceof AbstractSailConnection) { + AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection(); + + abstractSailConnection.waitForOtherOperations(true); } } finally { - try { - shapesRepoConnection.close(); + try { + if (isActive()) { + rollback(); + } } finally { try { - if (previousStateConnection != null) { - previousStateConnection.close(); - } + shapesRepoConnection.close(); } finally { try { - if (serializableConnection != null) { - serializableConnection.close(); + if (previousStateConnection != null) { + previousStateConnection.close(); } - } finally { + } finally { try { - super.close(); + if (serializableConnection != null) { + serializableConnection.close(); + } } finally { + try { - sail.closeConnection(); + super.close(); } finally { try { - cleanupShapesReadWriteLock(); + sail.closeConnection(); } finally { try { - cleanupReadWriteLock(); + cleanupShapesReadWriteLock(); } finally { - closed = true; - } + try { + cleanupReadWriteLock(); + } finally { + closed = true; + } + } } } } @@ -784,6 +786,7 @@ synchronized public void close() throws SailException { } } } + } @Override