diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java index 6ac7fa4df1..698b086d46 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java @@ -34,7 +34,7 @@ public class ExclusiveReentrantLockManager { private final int waitToCollect; - LockMonitoring lockMonitoring; + LockMonitoring lockMonitoring; public ExclusiveReentrantLockManager() { this(false); @@ -50,18 +50,18 @@ public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency if (trackLocks || Properties.lockTrackingEnabled()) { - lockMonitoring = new LockTracking( + lockMonitoring = new LockTracking<>( true, - "ExclusiveReentrantLockManager", + "ExclusiveReentrantLockManager(w/tracking)", LoggerFactory.getLogger(this.getClass()), waitToCollect, Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner) ); } else { - lockMonitoring = new LockCleaner( + lockMonitoring = new LockCleaner<>( false, - "ExclusiveReentrantLockManager", + "ExclusiveReentrantLockManager(w/cleaner)", LoggerFactory.getLogger(this.getClass()), Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner) ); @@ -87,6 +87,8 @@ private Lock tryExclusiveLockInner() { } + private final AtomicLong ownerIsDead = new AtomicLong(); + private Lock getExclusiveLockInner() throws InterruptedException { synchronized (owner) { @@ -100,11 +102,20 @@ private Lock getExclusiveLockInner() throws InterruptedException { if (lock != null) { return lock; } else { + if (!owner.get().isAlive()) { + long l = ownerIsDead.incrementAndGet(); + if (l > 10) { + ownerIsDead.set(0); + continue; + } + + } lockMonitoring.runCleanup(); owner.wait(waitToCollect); } } while (true); } else { + int deadCount = 0; while (true) { if (Thread.interrupted()) { throw new InterruptedException(); @@ -113,6 +124,12 @@ private Lock getExclusiveLockInner() throws InterruptedException { if (lock != null) { return lock; } else { + if (!owner.get().isAlive()) { + System.out.println("Owner thread is dead"); +// activeLocks.set(0); +// owner.set(null); +// continue; + } owner.wait(waitToCollect); } } diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java index 55e58fa3fa..c9b382e76e 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.WeakHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.eclipse.rdf4j.common.transaction.IsolationLevel; @@ -119,6 +120,7 @@ protected static boolean debugEnabled() { * debugging was disable at the time the connection was acquired. */ private final Map activeConnections = new IdentityHashMap<>(); +// private final Map activeConnections = new WeakHashMap<>(); /* * constructors diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java index ca9bb7b60f..976300af88 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java @@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; import org.eclipse.rdf4j.common.annotation.InternalUseOnly; +import org.eclipse.rdf4j.common.concurrent.locks.ExclusiveReentrantLockManager; import org.eclipse.rdf4j.common.concurrent.locks.Lock; import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner; import org.eclipse.rdf4j.common.iteration.CloseableIteration; @@ -98,7 +98,7 @@ public abstract class AbstractSailConnection implements SailConnection { private final AtomicReference activeThread = new AtomicReference<>(); @SuppressWarnings("FieldMayBeFinal") - private boolean isOpen = true; + private volatile boolean isOpen = true; private static final VarHandle IS_OPEN; private Thread owner; @@ -106,9 +106,8 @@ public abstract class AbstractSailConnection implements SailConnection { /** * Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a * transaction. - * */ - private final ReentrantLock updateLock = new ReentrantLock(); + private final ExclusiveReentrantLockManager updateLock = new ExclusiveReentrantLockManager(); private final LongAdder iterationsOpened = new LongAdder(); private final LongAdder iterationsClosed = new LongAdder(); @@ -200,8 +199,7 @@ public void begin(IsolationLevel isolationLevel) throws SailException { activeThread.setRelease(Thread.currentThread()); verifyIsOpen(); - - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); try { if (isActive()) { throw new SailException("a transaction is already active on this connection."); @@ -210,8 +208,11 @@ public void begin(IsolationLevel isolationLevel) throws SailException { startTransactionInternal(); txnActive = true; } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -505,15 +506,19 @@ public final void prepare() throws SailException { activeThread.setRelease(Thread.currentThread()); verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { prepareInternal(); txnPrepared = true; } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -535,7 +540,8 @@ public final void commit() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { if (!txnPrepared) { @@ -546,8 +552,11 @@ public final void commit() throws SailException { txnPrepared = false; } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -572,7 +581,8 @@ public final void rollback() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { if (txnActive) { try { @@ -586,8 +596,11 @@ public final void rollback() throws SailException { debugEnabled ? new Throwable() : null); } } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -694,13 +707,17 @@ public final void endUpdate(UpdateContext op) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); endUpdateInternal(op); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -750,14 +767,18 @@ public final void clear(Resource... contexts) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); clearInternal(contexts); statementsRemoved = true; } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -820,13 +841,17 @@ public final void setNamespace(String prefix, String name) throws SailException verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); setNamespaceInternal(prefix, name); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -848,13 +873,17 @@ public final void removeNamespace(String prefix) throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); removeNamespaceInternal(prefix); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -873,13 +902,17 @@ public final void clearNamespaces() throws SailException { verifyIsOpen(); - updateLock.lock(); + Lock exclusiveLock = updateLock.getExclusiveLock(); + try { verifyIsActive(); clearNamespacesInternal(); } finally { - updateLock.unlock(); + exclusiveLock.release(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { try { activeThread.setRelease(null); @@ -968,7 +1001,7 @@ protected void prepareInternal() throws SailException { protected abstract void commitInternal() throws SailException; - protected abstract void rollbackInternal() throws SailException; + public abstract void rollbackInternal() throws SailException; protected abstract void addStatementInternal(Resource subj, IRI pred, Value obj, Resource... contexts) throws SailException; @@ -999,7 +1032,7 @@ protected AbstractSail getSailBase() { return sailBase; } - private void forceCloseActiveOperations() throws SailException { + public void forceCloseActiveOperations() throws SailException { for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) { System.gc(); try { diff --git a/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManagerTest.java b/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManagerTest.java index 83fe59d93e..66660995d6 100644 --- a/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManagerTest.java +++ b/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManagerTest.java @@ -128,7 +128,8 @@ void stalledTest() throws InterruptedException { assertThat(memoryAppender.countEventsForLogger(ExclusiveReentrantLockManager.class.getName())) .isGreaterThanOrEqualTo(1); - memoryAppender.assertContains("is waiting on a possibly stalled lock \"ExclusiveReentrantLockManager\" with id", + memoryAppender.assertContains( + "is waiting on a possibly stalled lock \"ExclusiveReentrantLockManager(w/tracking)\" with id", Level.INFO); memoryAppender.assertContains( "at org.eclipse.rdf4j.common.concurrent.locks.ExclusiveReentrantLockManagerTest.lambda$stalledTest$0(ExclusiveReentrantLockManagerTest.java:", diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java index 7942984593..8f54ca839d 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java @@ -513,7 +513,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { synchronized (datasets) { SailDataset toCloseDataset = null; SailSink toCloseExplicitSink = null; diff --git a/core/sail/extensible-store/src/main/java/org/eclipse/rdf4j/sail/extensiblestore/ExtensibleStoreConnection.java b/core/sail/extensible-store/src/main/java/org/eclipse/rdf4j/sail/extensiblestore/ExtensibleStoreConnection.java index b599d07a97..cddc2971a9 100755 --- a/core/sail/extensible-store/src/main/java/org/eclipse/rdf4j/sail/extensiblestore/ExtensibleStoreConnection.java +++ b/core/sail/extensible-store/src/main/java/org/eclipse/rdf4j/sail/extensiblestore/ExtensibleStoreConnection.java @@ -54,7 +54,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { super.rollbackInternal(); // create a fresh event object. sailChangedEvent = new DefaultSailChangedEvent(sail); diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java index 8dc3c7019e..1ec127f414 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java @@ -102,7 +102,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { try { super.rollbackInternal(); } finally { diff --git a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemoryStoreConnection.java b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemoryStoreConnection.java index c4123966a4..f51a92d29b 100644 --- a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemoryStoreConnection.java +++ b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemoryStoreConnection.java @@ -72,7 +72,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { super.rollbackInternal(); // create a fresh event object. sailChangedEvent = new DefaultSailChangedEvent(sail); diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConnection.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConnection.java index eb2a39fd8b..64e6f9ec1e 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConnection.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConnection.java @@ -87,7 +87,7 @@ protected void commitInternal() throws SailException { } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { try { super.rollbackInternal(); } finally { diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java index 829fdd9176..511f587e4a 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java @@ -11,20 +11,6 @@ package org.eclipse.rdf4j.sail.shacl; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import org.eclipse.rdf4j.common.concurrent.locks.Lock; import org.eclipse.rdf4j.common.concurrent.locks.StampedLockManager; import org.eclipse.rdf4j.common.iteration.CloseableIteration; @@ -58,6 +44,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.stream.Collectors; +import java.util.stream.Stream; + /** * @author Heshan Jayasinghe * @author HÃ¥vard Ottestad @@ -107,7 +107,7 @@ public class ShaclSailConnection extends NotifyingSailConnectionWrapper implemen private volatile boolean closed; ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, SailConnection previousStateConnection, - SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) { + SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) { super(connection); this.previousStateConnection = previousStateConnection; this.shapesRepoConnection = shapesRepoConnection; @@ -118,7 +118,7 @@ public class ShaclSailConnection extends NotifyingSailConnectionWrapper implemen } ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, SailConnection previousStateConnection, - SailRepositoryConnection shapesRepoConnection) { + SailRepositoryConnection shapesRepoConnection) { super(connection); this.previousStateConnection = previousStateConnection; this.shapesRepoConnection = shapesRepoConnection; @@ -129,7 +129,7 @@ public class ShaclSailConnection extends NotifyingSailConnectionWrapper implemen } ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, - SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) { + SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) { super(connection); this.previousStateConnection = null; this.shapesRepoConnection = shapesRepoConnection; @@ -140,7 +140,7 @@ public class ShaclSailConnection extends NotifyingSailConnectionWrapper implemen } ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, - SailRepositoryConnection shapesRepoConnection) { + SailRepositoryConnection shapesRepoConnection) { super(connection); this.previousStateConnection = null; this.serializableConnection = null; @@ -230,7 +230,7 @@ public void begin(IsolationLevel level) throws SailException { /** * @return the transaction settings that are based purely on the settings based down through the begin(...) method - * without considering any sail level settings for things like caching or parallel validation. + * without considering any sail level settings for things like caching or parallel validation. */ private Settings getLocalTransactionSettings() { return new Settings(this); @@ -503,7 +503,7 @@ ConnectionsGroup getConnectionsGroup() { } private ValidationReport performValidation(List shapes, boolean validateEntireBaseSail, - ConnectionsGroup connectionsGroup) throws InterruptedException { + ConnectionsGroup connectionsGroup) throws InterruptedException { long beforeValidation = 0; if (sail.isPerformanceLogging()) { @@ -735,49 +735,54 @@ synchronized public void close() throws SailException { if (closed) { return; } - - if (getWrappedConnection() instanceof AbstractSailConnection) { - AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection(); - - abstractSailConnection.waitForOtherOperations(true); - } - try { - if (isActive()) { - rollback(); + if (getWrappedConnection() instanceof AbstractSailConnection) { + AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection(); + abstractSailConnection.waitForOtherOperations(true); + abstractSailConnection.forceCloseActiveOperations(); + if(abstractSailConnection.isActive()) { + abstractSailConnection.rollbackInternal(); + } } } finally { - try { - shapesRepoConnection.close(); + try { + if (isActive()) { + super.close(); + } } finally { try { - if (previousStateConnection != null) { - previousStateConnection.close(); - } + shapesRepoConnection.close(); } finally { try { - if (serializableConnection != null) { - serializableConnection.close(); + if (previousStateConnection != null) { + previousStateConnection.close(); } - } finally { + } finally { try { - super.close(); + if (serializableConnection != null) { + serializableConnection.close(); + } } finally { + try { - sail.closeConnection(); + super.close(); } finally { try { - cleanupShapesReadWriteLock(); + sail.closeConnection(); } finally { try { - cleanupReadWriteLock(); + cleanupShapesReadWriteLock(); } finally { - closed = true; - } + try { + cleanupReadWriteLock(); + } finally { + closed = true; + } + } } } } @@ -785,6 +790,7 @@ synchronized public void close() throws SailException { } } } + } @Override @@ -1040,7 +1046,7 @@ public RdfsSubClassOfReasoner getRdfsSubClassOfReasoner() { @Override public CloseableIteration getStatements(Resource subj, IRI pred, Value obj, - boolean includeInferred, Resource... contexts) throws SailException { + boolean includeInferred, Resource... contexts) throws SailException { if (useDefaultShapesGraph && contexts.length == 1 && RDF4J.SHACL_SHAPE_GRAPH.equals(contexts[0])) { return ConnectionHelper .getCloseableIteration(shapesRepoConnection.getStatements(subj, pred, obj, includeInferred)); @@ -1094,7 +1100,7 @@ public static class Settings { transient private Settings previous = null; public Settings(boolean cacheSelectNodes, boolean validationEnabled, boolean parallelValidation, - IsolationLevel isolationLevel) { + IsolationLevel isolationLevel) { this.cacheSelectedNodes = cacheSelectNodes; if (!validationEnabled) { validationApproach = ValidationApproach.Disabled; @@ -1119,18 +1125,18 @@ public Settings(ShaclSailConnection connection) { validationApproach = (ValidationApproach) transactionSetting; } else if (transactionSetting instanceof ShaclSail.TransactionSettings.PerformanceHint) { switch (((ShaclSail.TransactionSettings.PerformanceHint) transactionSetting)) { - case ParallelValidation: - parallelValidation = true; - break; - case SerialValidation: - parallelValidation = false; - break; - case CacheDisabled: - cacheSelectedNodes = false; - break; - case CacheEnabled: - cacheSelectedNodes = true; - break; + case ParallelValidation: + parallelValidation = true; + break; + case SerialValidation: + parallelValidation = false; + break; + case CacheDisabled: + cacheSelectedNodes = false; + break; + case CacheEnabled: + cacheSelectedNodes = true; + break; } } diff --git a/testsuites/benchmark/pom.xml b/testsuites/benchmark/pom.xml index e90aa3e94e..b71a8c3c45 100644 --- a/testsuites/benchmark/pom.xml +++ b/testsuites/benchmark/pom.xml @@ -115,6 +115,13 @@ + + com.github.siom79.japicmp + japicmp-maven-plugin + + true + + diff --git a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java index 4ee406b0e7..530ba87390 100644 --- a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java +++ b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.rdf4j.common.concurrent.locks.Properties; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.transaction.IsolationLevels; import org.eclipse.rdf4j.model.IRI; @@ -36,6 +37,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -77,6 +79,7 @@ public abstract class SailConcurrencyTest { @BeforeEach public void setUp() { +// Properties.setLockTrackingEnabled(true); store = createSail(); store.init(); vf = store.getValueFactory(); @@ -155,8 +158,9 @@ public int getSize() { * * @see https://github.com/eclipse/rdf4j/issues/693 */ - @Test +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) + @RepeatedTest(100) public void testConcurrentAddLargeTxn() throws Exception { logger.info("executing two large concurrent transactions"); final CountDownLatch runnersDone = new CountDownLatch(2); @@ -197,8 +201,9 @@ public void testConcurrentAddLargeTxn() throws Exception { * Verifies that two large concurrent transactions in separate contexts do not cause inconsistencies or errors when * one of the transactions rolls back at the end. */ - @Test +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) + @RepeatedTest(100) public void testConcurrentAddLargeTxnRollback() throws Exception { logger.info("executing two large concurrent transactions"); final CountDownLatch runnersDone = new CountDownLatch(2); @@ -317,7 +322,8 @@ public void testGetContextIDs() throws Exception { } } - @Test + @RepeatedTest(100) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdown() throws InterruptedException { if (store instanceof AbstractSail) { @@ -362,7 +368,8 @@ public void testConcurrentConnectionsShutdown() throws InterruptedException { } // @Disabled - @Test + @RepeatedTest(100) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testSerialThreads() throws InterruptedException { if (store instanceof AbstractSail) { @@ -443,7 +450,8 @@ public void testSerialThreads() throws InterruptedException { } - @Test + @RepeatedTest(100) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedException { if (store instanceof AbstractSail) { @@ -499,8 +507,9 @@ public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedE } - @Test - @Timeout(value = 30, unit = TimeUnit.MINUTES) + @RepeatedTest(100) +// @Test +// @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException { if (store instanceof AbstractSail) { ((AbstractSail) store).setConnectionTimeOut(200); @@ -545,13 +554,25 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept try { if (thread2.isAlive()) { +// try { connection2.get().close(); + +// }finally { connection1.get().close(); + +// } } else { +// try { connection1.get().close(); + +// }finally { connection2.get().close(); + +// } } - } catch (SailException ignored) { + } catch (Throwable logged) { + logger.error("Error closing connection", logged); + throw logged; } try (SailConnection connection = store.getConnection()) { @@ -575,7 +596,8 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept store.shutDown(); } - @Test + @RepeatedTest(1000) +// @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) public void testConcurrentConnectionsShutdownAndCloseRollback() throws InterruptedException { if (store instanceof AbstractSail) { diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java index e1df6b6ca8..e9c98f8c67 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java @@ -362,7 +362,7 @@ protected void removeStatementsInternal(Resource subj, IRI pred, Value obj, Reso } @Override - protected void rollbackInternal() throws SailException { + public void rollbackInternal() throws SailException { try { getWriteStrategyInternal().rollback(); } catch (RepositoryException e) {