From 1bded6f12338aeac3f8def124ff7b211f335c36c Mon Sep 17 00:00:00 2001 From: Werner Altewischer Date: Fri, 3 May 2024 13:27:57 +0200 Subject: [PATCH 1/2] Added read-committed transaction isolation support to TransactionAwareCacheDecorator. - Added complete isolation and consistency support for concurrent transactions. - Added accompanying Javadoc documentation. - Added unit test testing the isolation properties introduced. --- .../TransactionAwareCacheDecorator.java | 268 ++++++- ...eDecoratorReadCommittedIsolationTests.java | 658 ++++++++++++++++++ 2 files changed, 892 insertions(+), 34 deletions(-) create mode 100644 spring-context-support/src/test/java/org/springframework/cache/transaction/TransactionAwareCacheDecoratorReadCommittedIsolationTests.java diff --git a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java index 45b5870dfecc..3cab9d940510 100644 --- a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java +++ b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java @@ -16,22 +16,28 @@ package org.springframework.cache.transaction; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - import org.springframework.cache.Cache; +import org.springframework.cache.support.SimpleValueWrapper; import org.springframework.lang.Nullable; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + /** * Cache decorator which synchronizes its {@link #put}, {@link #evict} and * {@link #clear} operations with Spring-managed transactions (through Spring's * {@link TransactionSynchronizationManager}), performing the actual cache * put/evict/clear operation only in the after-commit phase of a successful - * transaction. If no transaction is active, {@link #put}, {@link #evict} and + * transaction. Within a transaction this decorator provides consistency for all operations + * performed in order and read-committed isolation from other transactions. + * If no transaction is active, {@link #put}, {@link #evict} and * {@link #clear} operations will be performed immediately, as usual. * *

Note: Use of immediate operations such as {@link #putIfAbsent} and @@ -46,16 +52,42 @@ */ public class TransactionAwareCacheDecorator implements Cache { + // Special value which marks a value as being evicted within the transaction state + private static final Object EVICTED = new Object(); + private final Cache targetCache; + // Thread local for storing the changes made within a transaction + private final ThreadLocal transactionState = new ThreadLocal<>(); + + // Whether commits should be synchronized or not + private final boolean synchronizeCommits; /** * Create a new TransactionAwareCache for the given target Cache. + *

+ * Commits are not synchronized for efficiency, meaning concurrent clear/evict/put events in concurrent transactions could yield different results on the size of the cache + * after all transactions are done. + * * @param targetCache the target Cache to decorate */ public TransactionAwareCacheDecorator(Cache targetCache) { + this(targetCache, false); + } + + /** + * Create a new TransactionAwareCache for the given target Cache, specifying the consistency needed for transaction commits. + *

+ * If synchronizedCommits is set to true, all commits of concurrent transactions are performed in a synchronized fashion, thereby yielding predictable results + * for the size of the cache after all operations are finished. + * + * @param targetCache the target Cache to decorate + * @param synchronizeCommits whether full consistency should be maintained for concurrent transaction commits, i.e. all commits are synchronized to the cache. + */ + public TransactionAwareCacheDecorator(Cache targetCache, boolean synchronizeCommits) { Assert.notNull(targetCache, "Target Cache must not be null"); this.targetCache = targetCache; + this.synchronizeCommits = synchronizeCommits; } @@ -79,18 +111,50 @@ public Object getNativeCache() { @Override @Nullable public ValueWrapper get(Object key) { - return this.targetCache.get(key); + final var transactionState = getTransactionState(); + final var current = transactionState == null ? null : transactionState.get(key); + if (current != null) { + return convert(current); + } else { + return this.targetCache.get(key); + } } @Override @Nullable public T get(Object key, @Nullable Class type) { - return this.targetCache.get(key, type); + final var transactionState = getTransactionState(); + final var wrapper = transactionState == null ? null : transactionState.get(key); + if (wrapper != null) { + // Unwrap + final var value = convertValue(wrapper); + return cast(value, type); + } else { + return this.targetCache.get(key, type); + } } @Override @Nullable public T get(Object key, Callable valueLoader) { + final var transactionState = getTransactionState(); + if (transactionState != null) { + final var wrapper = transactionState.get(key); + final boolean isEvicted = wrapper != null && wrapper.get() == EVICTED; + if (isEvicted || (wrapper == null && this.targetCache.get(key) == null)) { + // Compute value within transaction state + final T value; + try { + value = valueLoader.call(); + } catch (Exception e) { + throw new ValueRetrievalException(key, valueLoader, e); + } + transactionState.put(key, value); + return value; + } else if (wrapper != null) { + return convertValue(wrapper); + } + } return this.targetCache.get(key, valueLoader); } @@ -107,15 +171,10 @@ public CompletableFuture retrieve(Object key, Supplier putValues = new HashMap<>(); + + boolean clearCalled = false; + + @Nullable + ValueWrapper get(Object key) { + var result = putValues.get(key); + if (result == null && clearCalled) { + result = new SimpleValueWrapper(EVICTED); + } + return result; + } + + + void clear() { + clearCalled = true; + putValues.clear(); + } + + void reset() { + clearCalled = false; + putValues.clear(); + } + + void revert(Object key) { + putValues.remove(key); + } + + void evict(Object key) { + putValues.put(key, new SimpleValueWrapper(EVICTED)); + } + + void put(Object key, @Nullable Object value) { + putValues.put(key, new SimpleValueWrapper(value)); + } + + void commitTo(Cache cache) { + if (clearCalled) { + cache.clear(); + } + putValues.forEach((key, valueWrapper) -> { + final var value = valueWrapper.get(); + if (value == EVICTED) { + cache.evict(key); + } else { + cache.put(key, value); + } + }); + } + } + + /** + * Converts the wrapper to effective wrapper, i.e. returns null if the wrapper contains the special EVICTED value and the wrapper otherwise. + * + * @param wrapper The wrapper to convert + * @return The converted wrapper + */ + @Nullable + private static ValueWrapper convert(@Nullable ValueWrapper wrapper) { + if (wrapper != null && wrapper.get() == EVICTED) { + return null; + } + return wrapper; + } + + /** + * Converts the value of the specified wrapper, i.e. returns null if the wrapper contains the special EVICTED value or its value otherwise. + * + * @param wrapper The wrapper to extract the value from + * @return The converted value + */ + @SuppressWarnings("unchecked") + @Nullable + private static T convertValue(@Nullable ValueWrapper wrapper) { + final var effectiveWrapper = convert(wrapper); + return effectiveWrapper == null ? null : (T)effectiveWrapper.get(); + } + + /** + * Requires the specified value to be null or be an instance of the specified type. + * Throws an IllegalStateException otherwise. + * + * @param value The value + * @param type The type + * @return The cast value + */ + @SuppressWarnings("unchecked") + @Nullable + public static T cast(@Nullable Object value, @Nullable Class type) { + if (value != null && type != null && !type.isInstance(value)) { + throw new IllegalStateException( + "Cached value is not of required type [" + type.getName() + "]: " + value); + } + return (T)value; + } + + /** + * Gets the current transaction state or null if no transaction is active. + * When first invoked within a transaction, a transaction synchronization is registered which will apply any changes in the current transaction on commit. + * + * @return The current transaction state + */ + @Nullable + private TransactionState getTransactionState() { + if (TransactionSynchronizationManager.isSynchronizationActive()) { + var state = transactionState.get(); + if (state == null) { + state = new TransactionState(); + transactionState.set(state); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCompletion(int status) { + final var currentState = Objects.requireNonNull(transactionState.get()); + // Transfer any modifications to the underlying cache if the transaction committed + if (status == STATUS_COMMITTED) { + if (synchronizeCommits) { + synchronized (targetCache) { + currentState.commitTo(targetCache); + } + } else { + currentState.commitTo(targetCache); + } + } + transactionState.remove(); + } + }); + } + return state; + } else { + return null; + } + } } diff --git a/spring-context-support/src/test/java/org/springframework/cache/transaction/TransactionAwareCacheDecoratorReadCommittedIsolationTests.java b/spring-context-support/src/test/java/org/springframework/cache/transaction/TransactionAwareCacheDecoratorReadCommittedIsolationTests.java new file mode 100644 index 000000000000..22f1c8898608 --- /dev/null +++ b/spring-context-support/src/test/java/org/springframework/cache/transaction/TransactionAwareCacheDecoratorReadCommittedIsolationTests.java @@ -0,0 +1,658 @@ +package org.springframework.cache.transaction; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.cache.Cache; +import org.springframework.cache.concurrent.ConcurrentMapCache; +import org.springframework.lang.Nullable; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.springframework.transaction.support.TransactionSynchronization.STATUS_COMMITTED; +import static org.springframework.transaction.support.TransactionSynchronization.STATUS_ROLLED_BACK; + +public class TransactionAwareCacheDecoratorReadCommittedIsolationTests { + + private Cache cache; + private ConcurrentHashMap map; + + @BeforeEach + void setUp() { + map = new ConcurrentHashMap<>(); + cache = new TransactionAwareCacheDecorator(new ConcurrentMapCache("cacheTest", map, true), true); + } + + + @Test + void getName() { + assertEquals("cacheTest", cache.getName()); + } + + @Test + void getNativeCache() { + assertSame(map, cache.getNativeCache()); + } + + @Test + void putAndGetObject() { + assertNull(cache.get("foo")); + cache.put("foo", "bar"); + assertEquals("bar", cache.get("foo", String.class)); + assertEqualWrapperContents("bar", cache.get("foo")); + cache.put("foo", "baz"); + assertEquals("baz", cache.get("foo", String.class)); + assertEqualWrapperContents("baz", cache.get("foo")); + } + + @Test + void putAndGetObjectWithSuccessfulTransaction() { + // First put a value there outside of the transaction + cache.put("foo", "foo"); + withSimulatedTransaction(() -> { + assertEquals("foo", cache.get("foo", String.class)); + cache.put("foo", "bar"); + + // Should not be committed + assertEquals("foo", map.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + + assertEquals("bar", cache.get("foo", String.class)); + assertEqualWrapperContents("bar", cache.get("foo")); + cache.put("foo", "baz"); + + // Should not be committed + assertEquals("foo", map.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + + assertEquals("baz", cache.get("foo", String.class)); + assertEqualWrapperContents("baz", cache.get("foo")); + return STATUS_COMMITTED; + }); + + // Now the value should be committed + assertEquals("baz", map.get("foo")); + assertEqualWrapperContents("baz", cache.get("foo")); + assertEquals("baz", withOtherTransaction(() -> cache.get("foo", String.class))); + } + + @Test + void putAndGetObjectWithUnsuccessfulTransaction() { + cache.put("foo", "foo"); + withSimulatedTransaction(() -> { + assertEquals("foo", cache.get("foo", String.class)); + cache.put("foo", "bar"); + + // Should not be committed + assertEquals("foo", map.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + + assertEquals("bar", cache.get("foo", String.class)); + assertEqualWrapperContents("bar", cache.get("foo")); + cache.put("foo", "baz"); + + // Should not be committed + assertEquals("foo", map.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + + assertEquals("baz", cache.get("foo", String.class)); + assertEqualWrapperContents("baz", cache.get("foo")); + return STATUS_ROLLED_BACK; + }); + + // Now the value should not be committed + assertEquals("foo", map.get("foo")); + assertEqualWrapperContents("foo", cache.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + } + + @Test + void computeObjectWithSuccessfulTransaction() { + // First put a value there outside of the transaction + cache.put("foo", "foo"); + withSimulatedTransaction(() -> { + assertEquals("foo", cache.get("foo", String.class)); + cache.evict("foo"); + cache.get("foo", () -> "bar"); + + // Should not be committed + assertEquals("foo", map.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + + assertEquals("bar", cache.get("foo", String.class)); + assertEqualWrapperContents("bar", cache.get("foo")); + cache.evict("foo"); + cache.get("foo", () -> "baz"); + + // Should not be committed + assertEquals("foo", map.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + + assertEquals("baz", cache.get("foo", String.class)); + assertEqualWrapperContents("baz", cache.get("foo")); + return STATUS_COMMITTED; + }); + + // Now the value should be committed + assertEquals("baz", map.get("foo")); + assertEqualWrapperContents("baz", cache.get("foo")); + assertEquals("baz", withOtherTransaction(() -> cache.get("foo", String.class))); + } + + @Test + void computeObjectWithUnsuccessfulTransaction() { + cache.put("foo", "foo"); + withSimulatedTransaction(() -> { + assertEquals("foo", cache.get("foo", String.class)); + cache.evict("foo"); + cache.get("foo", () -> "bar"); + + // Should not be committed + assertEquals("foo", map.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + + assertEquals("bar", cache.get("foo", String.class)); + assertEqualWrapperContents("bar", cache.get("foo")); + cache.evict("foo"); + cache.get("foo", () -> "baz"); + + // Should not be committed + assertEquals("foo", map.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + + assertEquals("baz", cache.get("foo", String.class)); + assertEqualWrapperContents("baz", cache.get("foo")); + return STATUS_ROLLED_BACK; + }); + + // Now the value should not be committed + assertEquals("foo", map.get("foo")); + assertEqualWrapperContents("foo", cache.get("foo")); + assertEquals("foo", withOtherTransaction(() -> cache.get("foo", String.class))); + } + + @Test + void evictUnsuccessfulTransaction() { + final var keys = Set.of("foo1", "foo2", "foo3", "foo4"); + for (var key: keys) { + cache.put(key, key); + } + withSimulatedTransaction(() -> { + for (var key: keys) { + assertEquals(key, cache.get(key, String.class)); + } + cache.evict("foo2"); + cache.evict("foo4"); + assertNull(cache.get("foo2")); + assertNull(cache.get("foo4")); + return STATUS_ROLLED_BACK; + }); + + assertEquals(4, map.size()); + for (var key: keys) { + assertTrue(map.contains(key)); + assertEquals(key, cache.get(key, String.class)); + } + } + + @Test + void evictSuccessfulTransaction() { + final var keys = Set.of("foo1", "foo2", "foo3", "foo4"); + for (var key: keys) { + cache.put(key, key); + } + withSimulatedTransaction(() -> { + for (var key: keys) { + assertEquals(key, cache.get(key, String.class)); + } + cache.evict("foo2"); + cache.evict("foo4"); + assertNull(cache.get("foo2")); + assertNull(cache.get("foo4")); + return STATUS_COMMITTED; + }); + + assertEquals(2, map.size()); + for (var key: Set.of("foo1", "foo3")) { + assertTrue(map.contains(key)); + assertEquals(key, cache.get(key, String.class)); + } + } + + @Test + void clearUnsuccessfulTransaction() { + final var keys = Set.of("foo1", "foo2", "foo3", "foo4"); + for (var key: keys) { + cache.put(key, key); + } + withSimulatedTransaction(() -> { + for (var key: keys) { + assertEquals(key, cache.get(key, String.class)); + } + cache.clear(); + cache.put("foo1", "bar1"); + assertNull(cache.get("foo2")); + assertEquals("bar1", cache.get("foo1", String.class)); + return STATUS_ROLLED_BACK; + }); + + assertEquals(4, map.size()); + for (var key: keys) { + assertTrue(map.contains(key)); + assertEquals(key, cache.get(key, String.class)); + } + } + + @Test + void clearSuccessfulTransaction() { + final var keys = Set.of("foo1", "foo2", "foo3", "foo4"); + for (var key: keys) { + cache.put(key, key); + } + withSimulatedTransaction(() -> { + for (var key: keys) { + assertEquals(key, cache.get(key, String.class)); + } + cache.clear(); + cache.put("foo1", "bar1"); + assertNull(cache.get("foo2")); + assertEquals("bar1", cache.get("foo1", String.class)); + return STATUS_COMMITTED; + }); + + assertEquals(1, map.size()); + assertEquals("bar1", cache.get("foo1", String.class)); + } + + @Test + void invalidateSuccessfulTransaction() { + final var keys = Set.of("foo1", "foo2", "foo3", "foo4"); + for (var key: keys) { + cache.put(key, key); + } + withSimulatedTransaction(() -> { + for (var key: keys) { + assertEquals(key, cache.get(key, String.class)); + } + assertTrue(cache.invalidate()); + + // Should have committed + assertEquals(Boolean.FALSE, withOtherTransaction(() -> cache.invalidate())); + assertNull(withOtherTransaction(() -> cache.get("foo1"))); + assertNull(cache.get("foo1")); + assertFalse(cache.invalidate()); + + cache.put("foo1", "bar1"); + assertEquals("bar1", cache.get("foo1", String.class)); + return STATUS_COMMITTED; + }); + + assertEquals(1, map.size()); + assertEquals("bar1", cache.get("foo1", String.class)); + } + + @Test + void invalidateUnsuccessfulTransaction() { + final var keys = Set.of("foo1", "foo2", "foo3", "foo4"); + for (var key: keys) { + cache.put(key, key); + } + withSimulatedTransaction(() -> { + for (var key: keys) { + assertEquals(key, cache.get(key, String.class)); + } + assertTrue(cache.invalidate()); + + // Should have committed + assertFalse(withOtherTransaction(() -> cache.invalidate())); + assertNull(withOtherTransaction(() -> cache.get("foo1"))); + assertNull(cache.get("foo1")); + assertFalse(cache.invalidate()); + + cache.put("foo1", "bar1"); + assertEquals("bar1", cache.get("foo1", String.class)); + return STATUS_ROLLED_BACK; + }); + + assertEquals(0, map.size()); + assertNull(cache.get("foo1")); + } + + @Test + void evictIfPresentSuccessfulTransaction() { + final var keys = Set.of("foo1", "foo2", "foo3", "foo4"); + for (var key: keys) { + cache.put(key, key); + } + withSimulatedTransaction(() -> { + for (var key: keys) { + assertEquals(key, cache.get(key, String.class)); + } + assertTrue(cache.evictIfPresent("foo2")); + assertTrue(cache.evictIfPresent("foo4")); + assertNull(cache.get("foo2")); + assertNull(cache.get("foo4")); + // Should be immediately committed + assertNull(map.get("foo2")); + assertNull(map.get("foo4")); + return STATUS_COMMITTED; + }); + + assertEquals(2, map.size()); + for (var key: Set.of("foo1", "foo3")) { + assertTrue(map.contains(key)); + assertEquals(key, cache.get(key, String.class)); + } + } + + @Test + void evictIfPresentUnsuccessfulTransaction() { + final var keys = Set.of("foo1", "foo2", "foo3", "foo4"); + for (var key: keys) { + cache.put(key, key); + } + withSimulatedTransaction(() -> { + for (var key: keys) { + assertEquals(key, cache.get(key, String.class)); + } + assertTrue(cache.evictIfPresent("foo2")); + assertTrue(cache.evictIfPresent("foo4")); + assertNull(cache.get("foo2")); + assertNull(cache.get("foo4")); + // Should be immediately committed + assertNull(map.get("foo2")); + assertNull(map.get("foo4")); + return STATUS_ROLLED_BACK; + }); + + // Evict should commit even if rolled back + assertEquals(2, map.size()); + for (var key: Set.of("foo1", "foo3")) { + assertTrue(map.contains(key)); + assertEquals(key, cache.get(key, String.class)); + } + } + + @Test + void putIfAbsentWithSuccessfulTransaction() { + // First put a value there outside of the transaction + cache.put("foo", "foo"); + withSimulatedTransaction(() -> { + assertEquals("foo", cache.get("foo", String.class)); + + cache.evict("foo"); + + // From the perspective of put if absent the cache should still contain "foo" + assertEqualWrapperContents("foo", cache.putIfAbsent("foo", "bar")); + + // Put in transaction state + cache.put("foo", "baz"); + + assertEquals("baz", cache.get("foo", String.class)); + + // Now remove from the committed cache, should also remove from the transaction cache + cache.evictIfPresent("foo"); + + assertNull(withOtherTransaction(() -> cache.get("foo"))); + + assertNull(cache.get("foo")); + + // Now this should put the new value + assertNull(cache.putIfAbsent("foo", "bar")); + + // Should also be visible in the transaction + assertEquals("bar", cache.get("foo", String.class)); + + // Should also be visible in other transaction + assertEquals("bar", withOtherTransaction(() -> cache.get("foo", String.class))); + + return STATUS_COMMITTED; + }); + + // Now the value should be committed + assertEquals("bar", map.get("foo")); + assertEqualWrapperContents("bar", cache.get("foo")); + assertEquals("bar", withOtherTransaction(() -> cache.get("foo", String.class))); + } + + @Test + void putIfAbsentWithUnsuccessfulTransaction() { + // First put a value there outside of the transaction + cache.put("foo", "foo"); + withSimulatedTransaction(() -> { + assertEquals("foo", cache.get("foo", String.class)); + + cache.evict("foo"); + + // From the perspective of put if absent the cache should still contain "foo" + assertEqualWrapperContents("foo", cache.putIfAbsent("foo", "bar")); + + // Put in transaction state + cache.put("foo", "baz"); + + assertEquals("baz", cache.get("foo", String.class)); + + // Now remove from the committed cache, should also remove from the transaction cache + cache.evictIfPresent("foo"); + + assertNull(withOtherTransaction(() -> cache.get("foo"))); + + assertNull(cache.get("foo")); + + // Now this should put the new value + assertNull(cache.putIfAbsent("foo", "bar")); + + // Should also be visible in the transaction + assertEquals("bar", cache.get("foo", String.class)); + + // Should also be visible in other transaction + assertEquals("bar", withOtherTransaction(() -> cache.get("foo", String.class))); + + // Put another value (this should not commit) + cache.put("foo", "bla"); + + return STATUS_ROLLED_BACK; + }); + + // Now the value should be committed even though the transaction rolled back + assertEquals("bar", map.get("foo")); + assertEqualWrapperContents("bar", cache.get("foo")); + assertEquals("bar", withOtherTransaction(() -> cache.get("foo", String.class))); + } + + @Test + void putNull() { + assertNull(cache.get("foo", String.class)); + assertNull(cache.get("foo")); + cache.put("foo", null); + assertNull(cache.get("foo", String.class)); + assertEqualWrapperContents(null, cache.get("foo")); + } + + @Test + void putAndGetWithValueLoader() { + assertNull(cache.get("foo")); + cache.get("foo", () -> "bar"); + assertEquals("bar", cache.get("foo", String.class)); + assertEqualWrapperContents("bar", cache.get("foo")); + cache.get("foo", () -> "baz"); + assertEquals("bar", cache.get("foo", String.class)); + assertEqualWrapperContents("bar", cache.get("foo")); + } + + + @Test + void putAndGetWithValueLoaderConcurrently() throws Exception { + assertNull(cache.get("foo")); + final AtomicReference generatedValue = new AtomicReference<>(); + final AtomicInteger executeCount = new AtomicInteger(0); + runConcurrently((i) -> { + cache.get("foo", () -> { + final var value = UUID.randomUUID().toString(); + generatedValue.set(value); + executeCount.incrementAndGet(); + return value; + }); + }); + assertEquals(1, executeCount.get()); + final var value = generatedValue.get(); + assertNotNull(value); + assertEquals(value, cache.get("foo", String.class)); + } + + @Test + void commitConcurrentTransactionsConsistency() { + runConcurrently((threadIndex) -> { + // Make random modifications. The cache value count should be consistent at the end. + withSimulatedTransaction(() -> { + int operationCount = 500; + for (int i = 0; i < operationCount; ++i) { + if (i % 100 == 0) { + // Even operations are clear + cache.clear(); + } else { + // Odd operations add a key + cache.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + } + return STATUS_COMMITTED; + }); + }); + // There should be exactly 99 puts after the last clear, transaction commits need to be synchronized. + assertEquals(99, map.size()); + } + + @Test + void clear() { + cache.put("foo1", "bar"); + cache.put("foo2", "baz"); + cache.clear(); + assertNull(cache.get("foo1")); + assertNull(cache.get("foo2")); + } + + @Test + void invalidate() { + assertFalse(cache.invalidate()); + cache.put("foo1", "bar"); + cache.put("foo2", "baz"); + assertTrue(cache.invalidate()); + assertNull(cache.get("foo1")); + assertNull(cache.get("foo2")); + } + + @Test + void evict() { + cache.evict("foo"); + assertNull(cache.get("foo")); + cache.put("foo", "bar"); + assertEquals("bar", cache.get("foo", String.class)); + cache.evict("foo"); + assertNull(cache.get("foo")); + } + + @Test + void evictIfPresent() { + assertFalse(cache.evictIfPresent("foo")); + assertNull(cache.get("foo")); + cache.put("foo", "bar"); + assertEquals("bar", cache.get("foo", String.class)); + assertTrue(cache.evictIfPresent("foo")); + assertNull(cache.get("foo")); + } + + @Test + void putIfAbsent() { + assertNull(cache.putIfAbsent("foo", "bar")); + assertEqualWrapperContents("bar", cache.putIfAbsent("foo", "baz")); + assertEqualWrapperContents("bar", cache.get("foo")); + } + + @Test + void putIfAbsentConcurrently() throws Exception { + assertNull(cache.get("foo")); + final AtomicReference putValue = new AtomicReference<>(); + final AtomicInteger executeCount = new AtomicInteger(0); + runConcurrently((i) -> { + final var value = UUID.randomUUID().toString(); + final var result = cache.putIfAbsent("foo", value); + if (result == null) { + executeCount.incrementAndGet(); + putValue.set(value); + } + }); + assertEquals(1, executeCount.get()); + final var value = putValue.get(); + assertNotNull(value); + assertEquals(value, cache.get("foo", String.class)); + } + + private void withSimulatedTransaction(Supplier supplier) { + try { + TransactionSynchronizationManager.initSynchronization(); + final var transactionStatus = supplier.get(); + for (var sync: TransactionSynchronizationManager.getSynchronizations()) { + sync.afterCompletion(transactionStatus); + } + } finally { + TransactionSynchronizationManager.clearSynchronization(); + } + } + + @Nullable + private T withOtherTransaction(Supplier supplier) { + final var result = new AtomicReference(); + final var t = new Thread(() -> { + result.set(supplier.get()); + }); + t.start(); + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException("Thread was interrupted", e); + } + return result.get(); + } + + private void assertEqualWrapperContents(@Nullable Object expectedContents, @Nullable Cache.ValueWrapper valueWrapper) { + assertNotNull(valueWrapper); + assertEquals(expectedContents, valueWrapper.get()); + } + + private void runConcurrently(Consumer runnable) { + final int threadCount = 10; + final var executor = new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.HOURS, new LinkedBlockingQueue<>()); + final AtomicInteger callCount = new AtomicInteger(0); + + for (int i = 0; i < threadCount; ++i) { + final int threadIndex = i; + executor.execute(() -> { + callCount.incrementAndGet(); + runnable.accept(threadIndex); + }); + } + executor.shutdown(); + try { + assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + fail("Got interrupted", e); + } + assertEquals(threadCount, callCount.get()); + } +} From 125a167e83c2a0c452f2eabc2f4f7a87384968c2 Mon Sep 17 00:00:00 2001 From: Werner Altewischer Date: Fri, 3 May 2024 13:43:01 +0200 Subject: [PATCH 2/2] Renamed 'putValues' to 'modifications' and retained order of operations --- .../TransactionAwareCacheDecorator.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java index 3cab9d940510..accc9479f449 100644 --- a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java +++ b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java @@ -23,7 +23,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; @@ -234,13 +234,13 @@ public boolean invalidate() { * Keeps track of the state changes which happened in the current transaction. */ private static class TransactionState { - private final Map putValues = new HashMap<>(); + private final Map modifications = new LinkedHashMap<>(); boolean clearCalled = false; @Nullable ValueWrapper get(Object key) { - var result = putValues.get(key); + var result = modifications.get(key); if (result == null && clearCalled) { result = new SimpleValueWrapper(EVICTED); } @@ -250,31 +250,31 @@ ValueWrapper get(Object key) { void clear() { clearCalled = true; - putValues.clear(); + modifications.clear(); } void reset() { clearCalled = false; - putValues.clear(); + modifications.clear(); } void revert(Object key) { - putValues.remove(key); + modifications.remove(key); } void evict(Object key) { - putValues.put(key, new SimpleValueWrapper(EVICTED)); + modifications.put(key, new SimpleValueWrapper(EVICTED)); } void put(Object key, @Nullable Object value) { - putValues.put(key, new SimpleValueWrapper(value)); + modifications.put(key, new SimpleValueWrapper(value)); } void commitTo(Cache cache) { if (clearCalled) { cache.clear(); } - putValues.forEach((key, valueWrapper) -> { + modifications.forEach((key, valueWrapper) -> { final var value = valueWrapper.get(); if (value == EVICTED) { cache.evict(key);