From f744b3dd26596148fa88cab9e15db8214d833e4f Mon Sep 17 00:00:00 2001 From: Dennis Mercuriali Date: Mon, 8 Jan 2024 14:35:28 +0100 Subject: [PATCH] Invalidating client is not removed from the list of clients holding the invalidated key (#192) * simplified client-for-fetch choosing and added some concurrency tests * remove invalidating client from clients with key --------- Co-authored-by: dennis.mercuriali --- .../java/blazingcache/client/CacheClient.java | 15 ++ .../client/impl/InternalClientListener.java | 9 + .../java/blazingcache/server/CacheServer.java | 78 +++--- .../client/FetchPriorityTest.java | 96 ++++++- .../blazingcache/client/InvalidateTest.java | 99 ++++++++ .../client/LoadConcurrencyTest.java | 236 ++++++++++++++++++ ...ckOnLostFetchMessageAndSlowClientTest.java | 204 ++++++++++++++- 7 files changed, 684 insertions(+), 53 deletions(-) create mode 100644 blazingcache-core/src/test/java/blazingcache/client/InvalidateTest.java create mode 100644 blazingcache-core/src/test/java/blazingcache/client/LoadConcurrencyTest.java diff --git a/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java b/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java index 2d80382..472fe66 100644 --- a/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java +++ b/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java @@ -1179,6 +1179,11 @@ private void invalidate(RawString _key, KeyLock lock) throws InterruptedExceptio request.setParameter("lockId", lock.getLockId()); } Message response = _channel.sendMessageWithReply(request, invalidateTimeout); + + if (internalClientListener != null) { + internalClientListener.onInvalidateResponse(_key.toString(), response); + } + if (LOGGER.isLoggable(Level.FINEST)) { LOGGER.log(Level.FINEST, "invalidate {0}, -> {1}", new Object[]{_key, response}); } @@ -1426,6 +1431,11 @@ private boolean load(String key, byte[] data, Object reference, long expireTime, request.setParameter("lockId", lock.getLockId()); } Message response = _chanel.sendMessageWithReply(request, invalidateTimeout); + + if (internalClientListener != null) { + internalClientListener.onLoadResponse(key, response); + } + if (response.type != Message.TYPE_ACK) { throw new CacheException("error while loading key " + key + " (" + response + ")"); } @@ -1473,6 +1483,11 @@ private boolean put(RawString _key, byte[] data, Object reference, long expireTi request.setParameter("lockId", lock.getLockId()); } Message response = _chanel.sendMessageWithReply(request, invalidateTimeout); + + if (internalClientListener != null) { + internalClientListener.onPutResponse(_key.toString(), response); + } + if (response.type != Message.TYPE_ACK) { throw new CacheException("error while putting key " + _key + " (" + response + ")"); } diff --git a/blazingcache-core/src/main/java/blazingcache/client/impl/InternalClientListener.java b/blazingcache-core/src/main/java/blazingcache/client/impl/InternalClientListener.java index 461f7b2..8120a92 100644 --- a/blazingcache-core/src/main/java/blazingcache/client/impl/InternalClientListener.java +++ b/blazingcache-core/src/main/java/blazingcache/client/impl/InternalClientListener.java @@ -39,6 +39,15 @@ public default boolean messageReceived(Message message, Channel channel) { public default void onFetchResponse(String key, Message message) { } + public default void onLoadResponse(String key, Message response) { + } + + public default void onInvalidateResponse(String key, Message response) { + } + + public default void onPutResponse(String key, Message response) { + } + public default void onRequestSent(Message message) { } diff --git a/blazingcache-core/src/main/java/blazingcache/server/CacheServer.java b/blazingcache-core/src/main/java/blazingcache/server/CacheServer.java index 1b1d565..ce69cee 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/CacheServer.java +++ b/blazingcache-core/src/main/java/blazingcache/server/CacheServer.java @@ -19,8 +19,18 @@ */ package blazingcache.server; +import blazingcache.management.JMXUtils; +import blazingcache.network.Message; +import blazingcache.network.ServerHostData; +import blazingcache.network.netty.NettyChannelAcceptor; +import blazingcache.server.management.BlazingCacheServerStatusMXBean; +import blazingcache.server.management.CacheServerStatusMXBean; +import blazingcache.utils.RawString; +import blazingcache.zookeeper.LeaderShipChangeListener; +import blazingcache.zookeeper.ZKClusterManager; import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,24 +39,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Level; import java.util.logging.Logger; - import org.apache.zookeeper.ZooKeeper; -import blazingcache.management.JMXUtils; -import blazingcache.network.Message; -import blazingcache.network.ServerHostData; -import blazingcache.network.netty.NettyChannelAcceptor; -import blazingcache.server.management.BlazingCacheServerStatusMXBean; -import blazingcache.server.management.CacheServerStatusMXBean; -import blazingcache.utils.RawString; -import blazingcache.zookeeper.LeaderShipChangeListener; -import blazingcache.zookeeper.ZKClusterManager; - /** * The CacheServer core. * @@ -359,6 +358,7 @@ public void invalidateKey(RawString key, String sourceClientId, String clientPro SimpleCallback finishAndReleaseLock = new SimpleCallback() { @Override public void onResult(RawString result, Throwable error) { + cacheStatus.removeKeyForClient(key, sourceClientId); locksManager.releaseWriteLockForKey(key, sourceClientId, lockID); onFinish.onResult(result, error); } @@ -446,45 +446,47 @@ public void onResult(Message result, Throwable error) { return; } - List candidates = new ArrayList<>(); + int maxPriority = 0; + List maxPriorityCandidates = new ArrayList<>(); for (String remoteClientId : clientsForKey) { CacheServerSideConnection connection = acceptor.getActualConnectionFromClient(remoteClientId); - if (connection != null && connection.getFetchPriority() > 0) { - candidates.add(connection); + if (connection != null) { + int fetchPriority = connection.getFetchPriority(); + if (fetchPriority == 0 || fetchPriority < maxPriority) { + continue; + } + + if (fetchPriority > maxPriority) { + maxPriorityCandidates.clear(); + maxPriority = fetchPriority; + } + maxPriorityCandidates.add(connection); } } - candidates.sort((a, b) -> { - int priorityComparison = b.getFetchPriority() - a.getFetchPriority(); - if (priorityComparison == 0) { - return ThreadLocalRandom.current().nextInt(); - } - return priorityComparison; - }); boolean foundOneGoodClientConnected = false; - for (CacheServerSideConnection connection : candidates) { - String remoteClientId = connection.getClientId(); + if (!maxPriorityCandidates.isEmpty()) { + CacheServerSideConnection connection = maxPriorityCandidates.get(ThreadLocalRandom.current().nextInt(maxPriorityCandidates.size())); + String remoteClientId = connection.getClientId(); UnicastRequestStatus unicastRequestStatus = new UnicastRequestStatus(clientId, remoteClientId, "fetch " + key); networkRequestsStatusMonitor.register(unicastRequestStatus); - connection.sendFetchKeyMessage(remoteClientId, key, new SimpleCallback() { - - @Override - public void onResult(Message result, Throwable error) { - networkRequestsStatusMonitor.unregister(unicastRequestStatus); - LOGGER.log(Level.FINE, "client " + remoteClientId + " answer to fetch :" + result, error); - if (result != null && result.type == Message.TYPE_ACK) { - // da questo momento consideriamo che il client abbia la entry in memoria - // anche se di fatto potrebbe succedere che il messaggio di risposta non arrivi mai - long expiretime = (long) result.parameters.get("expiretime"); - cacheStatus.registerKeyForClient(key, clientId, expiretime); - } - finishAndReleaseLock.onResult(result, error); + + connection.sendFetchKeyMessage(remoteClientId, key, (result, error) -> { + networkRequestsStatusMonitor.unregister(unicastRequestStatus); + LOGGER.log(Level.FINE, "client " + remoteClientId + " answer to fetch :" + result, error); + if (result != null && result.type == Message.TYPE_ACK) { + // da questo momento consideriamo che il client abbia la entry in memoria + // anche se di fatto potrebbe succedere che il messaggio di risposta non arrivi mai + long expiretime = (long) result.parameters.get("expiretime"); + cacheStatus.registerKeyForClient(key, clientId, expiretime); } + finishAndReleaseLock.onResult(result, error); }); + foundOneGoodClientConnected = true; - break; } + if (!foundOneGoodClientConnected) { finishAndReleaseLock.onResult(Message.ERROR(clientId, new Exception("no connected client for key " + key)), null); } diff --git a/blazingcache-core/src/test/java/blazingcache/client/FetchPriorityTest.java b/blazingcache-core/src/test/java/blazingcache/client/FetchPriorityTest.java index 4e8509e..21779c0 100644 --- a/blazingcache-core/src/test/java/blazingcache/client/FetchPriorityTest.java +++ b/blazingcache-core/src/test/java/blazingcache/client/FetchPriorityTest.java @@ -19,26 +19,20 @@ */ package blazingcache.client; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import blazingcache.client.impl.InternalClientListener; import blazingcache.network.Channel; import blazingcache.network.Message; import blazingcache.network.ServerHostData; import blazingcache.network.netty.NettyCacheServerLocator; import blazingcache.server.CacheServer; - import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - import org.junit.Test; -import static org.junit.Assert.assertTrue; - /** * Test for slow cclients an fetches * @@ -274,4 +268,86 @@ public boolean messageReceived(Message message, Channel channel) { } } + @Test + public void testTryOnlyFirstClient() throws Exception { + byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + cacheServer.setClientFetchTimeout(1000); + cacheServer.start(); + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client3 = new CacheClient("theClient3", "ciao", new NettyCacheServerLocator(serverHostData));) { + client1.setFetchPriority(1); + client2.setFetchPriority(10); + client3.setFetchPriority(5); + client1.start(); + client2.start(); + client3.start(); + + assertTrue(client1.waitForConnection(10000)); + assertTrue(client2.waitForConnection(10000)); + assertTrue(client3.waitForConnection(10000)); + + AtomicInteger fetchesServedByClient1 = new AtomicInteger(); + AtomicInteger fetchesServedByClient2 = new AtomicInteger(); + AtomicInteger fetchesServedByClient3 = new AtomicInteger(); + + client1.setInternalClientListener(new InternalClientListener() { + + @Override + public boolean messageReceived(Message message, Channel channel) { + if (message.type == Message.TYPE_FETCH_ENTRY) { + fetchesServedByClient1.incrementAndGet(); + } + return true; + } + }); + + client2.setInternalClientListener(new InternalClientListener() { + @Override + public boolean messageReceived(Message message, Channel channel) { + if (message.type == Message.TYPE_FETCH_ENTRY) { + fetchesServedByClient2.incrementAndGet(); + channel.sendReplyMessage(message, + Message.ERROR(client2.getClientId(), new Exception("entry no more here")) + ); + return false; + } + return true; + } + }); + + client3.setInternalClientListener(new InternalClientListener() { + + @Override + public boolean messageReceived(Message message, Channel channel) { + if (message.type == Message.TYPE_FETCH_ENTRY) { + fetchesServedByClient3.incrementAndGet(); + } + return true; + } + }); + + assertNull(client1.get("foo")); + assertNull(client2.get("foo")); + assertNull(client3.get("foo")); + + client2.put("foo", data, 0); + client3.put("foo", data, 0); + assertNull(client1.get("foo")); + assertNotNull(client2.get("foo")); + assertNotNull(client3.get("foo")); + + EntryHandle entry = client1.fetch("foo"); + assertNull(entry); + + assertEquals(0, fetchesServedByClient1.get()); + assertEquals(1, fetchesServedByClient2.get()); + assertEquals(0, fetchesServedByClient3.get()); + } + } + } + } diff --git a/blazingcache-core/src/test/java/blazingcache/client/InvalidateTest.java b/blazingcache-core/src/test/java/blazingcache/client/InvalidateTest.java new file mode 100644 index 0000000..a45766c --- /dev/null +++ b/blazingcache-core/src/test/java/blazingcache/client/InvalidateTest.java @@ -0,0 +1,99 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package blazingcache.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import blazingcache.client.impl.InternalClientListener; +import blazingcache.network.Message; +import blazingcache.network.ServerHostData; +import blazingcache.network.netty.NettyCacheServerLocator; +import blazingcache.server.CacheServer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author dennis.mercuriali + */ +public class InvalidateTest { + + @Test + public void simpleInvalidate() throws Exception { + byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + cacheServer.setClientFetchTimeout(120000); + cacheServer.start(); + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData))) { + client1.start(); + client2.start(); + + assertTrue(client1.waitForConnection(10000)); + assertTrue(client2.waitForConnection(10000)); + + CountDownLatch invalidateDone_latch = new CountDownLatch(1); + + client2.setInternalClientListener(new InternalClientListener() { + @Override + public void onInvalidateResponse(String key, Message response) { + if (response.type == Message.TYPE_ACK) { + invalidateDone_latch.countDown(); + } + } + }); + + client1.load("entry", data, -1); + client2.load("entry", data, -1); + assertArrayEquals(client1.get("entry").getSerializedData(), data); + assertArrayEquals(client2.get("entry").getSerializedData(), data); + + Thread invalidate = new Thread(() -> { + try { + client2.invalidate("entry"); + } catch (InterruptedException exc) { + } + }); + + invalidate.start(); + invalidate.join(); + + assertEquals(0L, invalidateDone_latch.getCount()); + + System.out.println("key_client1:" + cacheServer.getCacheStatus().getKeysForClient(client1.getClientId())); + System.out.println("key_client2:" + cacheServer.getCacheStatus().getKeysForClient(client2.getClientId())); + + assertTrue(cacheServer.getCacheStatus().getKeysForClient(client1.getClientId()).isEmpty()); + assertTrue(cacheServer.getCacheStatus().getKeysForClient(client2.getClientId()).isEmpty()); + + assertNull(client1.get("entry")); + assertNull(client2.get("entry")); + } + } + } +} diff --git a/blazingcache-core/src/test/java/blazingcache/client/LoadConcurrencyTest.java b/blazingcache-core/src/test/java/blazingcache/client/LoadConcurrencyTest.java new file mode 100644 index 0000000..c9fdfb9 --- /dev/null +++ b/blazingcache-core/src/test/java/blazingcache/client/LoadConcurrencyTest.java @@ -0,0 +1,236 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package blazingcache.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import blazingcache.client.impl.InternalClientListener; +import blazingcache.network.Message; +import blazingcache.network.ServerHostData; +import blazingcache.network.netty.NettyCacheServerLocator; +import blazingcache.server.CacheServer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author dennis.mercuriali + */ +public class LoadConcurrencyTest { + + @Test + public void loadInvalidateConflict() throws Exception { + byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + cacheServer.setClientFetchTimeout(120000); + cacheServer.start(); + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData))) { + client1.start(); + client2.start(); + + assertTrue(client1.waitForConnection(10000)); + assertTrue(client2.waitForConnection(10000)); + + CountDownLatch loadDoneForServer_latch = new CountDownLatch(1); + CountDownLatch invalidateFinished_latch = new CountDownLatch(1); + AtomicInteger invalidateCount = new AtomicInteger(0); + + client1.setInternalClientListener(new InternalClientListener() { + @Override + public void onLoadResponse(String key, Message response) { + if (response.type == Message.TYPE_ACK) { + loadDoneForServer_latch.countDown(); + try { + assertTrue(invalidateFinished_latch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException exc) { + throw new RuntimeException(exc); + } + } + } + + @Override + public void onRequestSent(Message message) { + if (message.type == Message.TYPE_INVALIDATE) { + invalidateCount.incrementAndGet(); + } + } + }); + + client2.setInternalClientListener(new InternalClientListener() { + + @Override + public void onRequestSent(Message message) { + if (message.type == Message.TYPE_INVALIDATE) { + try { + assertTrue(loadDoneForServer_latch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException exc) { + throw new RuntimeException(exc); + } + } + } + + @Override + public void onInvalidateResponse(String key, Message response) { + if (response.type == Message.TYPE_ACK) { + invalidateFinished_latch.countDown(); + } + } + }); + + client2.load("entry", "test".getBytes(StandardCharsets.UTF_8), -1); + + Thread load = new Thread(() -> { + try { + client1.load("entry", "test".getBytes(StandardCharsets.UTF_8), -1); + } catch (InterruptedException | CacheException err) { + } + }); + + Thread invalidation = new Thread(() -> { + try { + client2.invalidate("entry"); + } catch (InterruptedException err) { + } + }); + + load.start(); + invalidation.start(); + + invalidation.join(); + load.join(); + + assertEquals(0L, loadDoneForServer_latch.getCount()); + assertEquals(0L, invalidateFinished_latch.getCount()); + + System.out.println("key_client1:" + cacheServer.getCacheStatus().getKeysForClient(client1.getClientId())); + System.out.println("key_client2:" + cacheServer.getCacheStatus().getKeysForClient(client2.getClientId())); + + assertNull(client1.get("entry")); + assertNull(client2.get("entry")); + Assert.assertEquals(0, invalidateCount.get()); + } + } + + } + + @Test + public void loadPutConflict() throws Exception { + byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + cacheServer.setClientFetchTimeout(120000); + cacheServer.start(); + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData))) { + client1.start(); + client2.start(); + + assertTrue(client1.waitForConnection(10000)); + assertTrue(client2.waitForConnection(10000)); + + CountDownLatch loadDoneForServer_latch = new CountDownLatch(1); + CountDownLatch putFinished_latch = new CountDownLatch(1); + AtomicInteger invalidateCount = new AtomicInteger(0); + + client1.setInternalClientListener(new InternalClientListener() { + @Override + public void onLoadResponse(String key, Message response) { + if (response.type == Message.TYPE_ACK) { + loadDoneForServer_latch.countDown(); + try { + assertTrue(putFinished_latch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException exc) { + throw new RuntimeException(exc); + } + } + } + + @Override + public void onRequestSent(Message message) { + if (message.type == Message.TYPE_INVALIDATE) { + invalidateCount.incrementAndGet(); + } + } + }); + + client2.setInternalClientListener(new InternalClientListener() { + + @Override + public void onRequestSent(Message message) { + if (message.type == Message.TYPE_PUT_ENTRY) { + try { + assertTrue(loadDoneForServer_latch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException exc) { + throw new RuntimeException(exc); + } + } + } + + @Override + public void onPutResponse(String key, Message response) { + if (response.type == Message.TYPE_ACK) { + putFinished_latch.countDown(); + } + } + }); + + client2.load("entry", "test".getBytes(StandardCharsets.UTF_8), -1); + + Thread load = new Thread(() -> { + try { + client1.load("entry", "test".getBytes(StandardCharsets.UTF_8), -1); + } catch (InterruptedException | CacheException err) { + } + }); + + Thread put = new Thread(() -> { + try { + client2.put("entry", "test2".getBytes(StandardCharsets.UTF_8), -1); + } catch (InterruptedException | CacheException err) { + } + }); + + load.start(); + put.start(); + + put.join(); + load.join(); + + assertEquals(0L, loadDoneForServer_latch.getCount()); + assertEquals(0L, putFinished_latch.getCount()); + + System.out.println("key_client1:" + cacheServer.getCacheStatus().getKeysForClient(client1.getClientId())); + System.out.println("key_client2:" + cacheServer.getCacheStatus().getKeysForClient(client2.getClientId())); + + assertNull(client1.get("entry")); + assertNull(client2.get("entry")); + Assert.assertEquals(1, invalidateCount.get()); + } + } + } +} diff --git a/blazingcache-core/src/test/java/blazingcache/client/LockOnLostFetchMessageAndSlowClientTest.java b/blazingcache-core/src/test/java/blazingcache/client/LockOnLostFetchMessageAndSlowClientTest.java index 8118209..2e2813c 100644 --- a/blazingcache-core/src/test/java/blazingcache/client/LockOnLostFetchMessageAndSlowClientTest.java +++ b/blazingcache-core/src/test/java/blazingcache/client/LockOnLostFetchMessageAndSlowClientTest.java @@ -19,6 +19,10 @@ */ package blazingcache.client; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import blazingcache.client.impl.InternalClientListener; import blazingcache.network.Channel; import blazingcache.network.Message; @@ -29,10 +33,7 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; import org.junit.Test; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Test for slow cclients an fetches @@ -50,8 +51,8 @@ public void basicTest() throws Exception { cacheServer.setClientFetchTimeout(1000); cacheServer.start(); try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); - CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData)); - CacheClient client3 = new CacheClient("theClient3", "ciao", new NettyCacheServerLocator(serverHostData));) { + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client3 = new CacheClient("theClient3", "ciao", new NettyCacheServerLocator(serverHostData));) { client1.start(); client2.start(); client3.start(); @@ -131,6 +132,199 @@ public void run() { } } + } + + @Test + public void slowClientCausesOtherRequestsToWait() throws Exception { + byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + cacheServer.setClientFetchTimeout(20000); + cacheServer.start(); + try (CacheClient slowClient = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client3 = new CacheClient("theClient3", "ciao", new NettyCacheServerLocator(serverHostData));) { + slowClient.start(); + client2.start(); + client3.start(); + assertTrue(slowClient.waitForConnection(10000)); + assertTrue(client2.waitForConnection(10000)); + assertTrue(client3.waitForConnection(10000)); + + // client1 will respond after some time + slowClient.setInternalClientListener(new InternalClientListener() { + @Override + public boolean messageReceived(Message message, Channel channel) { + if (message.type == Message.TYPE_FETCH_ENTRY) { + RawString key = RawString.of(message.parameters.get("key")); + if (key.toString().equals("slow-fetch")) { + try { + Thread.sleep(10_000); + } catch (InterruptedException exc) { + throw new RuntimeException(exc); + } + } + } + return true; + } + }); + + slowClient.put("slow-fetch", data, 0); + + CountDownLatch latch_before_2 = new CountDownLatch(1); + CountDownLatch latch_2 = new CountDownLatch(1); + Thread thread_2 = new Thread(new Runnable() { + @Override + public void run() { + try { + latch_before_2.countDown(); + EntryHandle remoteLoad = client2.fetch("slow-fetch"); + assertNotNull(remoteLoad); + latch_2.countDown(); + } catch (Throwable t) { + t.printStackTrace(); + fail(t + ""); + } + } + }); + thread_2.start(); + + // wait to enter the wait + assertTrue(latch_before_2.await(2, TimeUnit.SECONDS)); + + // a new client issues the fetch, it MUST wait on the lock on the 'slow-fetch' key + CountDownLatch latch_before_3 = new CountDownLatch(1); + CountDownLatch latch_3 = new CountDownLatch(1); + Thread thread_3 = new Thread(new Runnable() { + @Override + public void run() { + try { + latch_before_3.countDown(); + EntryHandle remoteLoad = client3.fetch("slow-fetch"); + assertNotNull(remoteLoad); + latch_3.countDown(); + } catch (Throwable t) { + t.printStackTrace(); + fail(t + ""); + } + } + }); + thread_3.start(); + + // wait to enter the wait + assertTrue(latch_before_3.await(10, TimeUnit.SECONDS)); + + assertFalse(latch_2.await(2, TimeUnit.SECONDS)); + + // wait to exit the wait + assertTrue(latch_2.await(10, TimeUnit.SECONDS)); + assertTrue(latch_3.await(20, TimeUnit.SECONDS)); + + assertTrue(cacheServer.getLocksManager().getLockedKeys().isEmpty()); + + // clean up test + thread_2.join(); + thread_3.join(); + } + } + } + + @Test + public void testInvalidateAwaitsLoad() throws Exception { + byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + cacheServer.setClientFetchTimeout(20000); + cacheServer.start(); + try (CacheClient slowClient = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client3 = new CacheClient("theClient3", "ciao", new NettyCacheServerLocator(serverHostData));) { + slowClient.start(); + client2.start(); + client3.start(); + assertTrue(slowClient.waitForConnection(10000)); + assertTrue(client2.waitForConnection(10000)); + assertTrue(client3.waitForConnection(10000)); + + // client1 will respond after some time + slowClient.setInternalClientListener(new InternalClientListener() { + @Override + public boolean messageReceived(Message message, Channel channel) { + if (message.type == Message.TYPE_FETCH_ENTRY) { + RawString key = RawString.of(message.parameters.get("key")); + if (key.toString().equals("slow-fetch")) { + try { + Thread.sleep(10_000); + } catch (InterruptedException exc) { + throw new RuntimeException(exc); + } + } + } + return true; + } + }); + + slowClient.put("slow-fetch", data, 0); + + CountDownLatch latch_before_2 = new CountDownLatch(1); + CountDownLatch latch_2 = new CountDownLatch(1); + Thread thread_2 = new Thread(new Runnable() { + @Override + public void run() { + try { + latch_before_2.countDown(); + EntryHandle remoteLoad = client2.fetch("slow-fetch"); + assertNotNull(remoteLoad); + latch_2.countDown(); + } catch (Throwable t) { + t.printStackTrace(); + fail(t + ""); + } + } + }); + thread_2.start(); + + // wait to enter the wait + assertTrue(latch_before_2.await(2, TimeUnit.SECONDS)); + + // a new client issues the fetch, it MUST wait on the lock on the 'slow-fetch' key + CountDownLatch latch_before_3 = new CountDownLatch(1); + CountDownLatch latch_3 = new CountDownLatch(1); + Thread thread_3 = new Thread(new Runnable() { + @Override + public void run() { + try { + latch_before_3.countDown(); + EntryHandle remoteLoad = client3.fetch("slow-fetch"); + assertNotNull(remoteLoad); + latch_3.countDown(); + } catch (Throwable t) { + t.printStackTrace(); + fail(t + ""); + } + } + }); + thread_3.start(); + + // wait to enter the wait + assertTrue(latch_before_3.await(10, TimeUnit.SECONDS)); + + assertFalse(latch_2.await(2, TimeUnit.SECONDS)); + + // wait to exit the wait + assertTrue(latch_2.await(10, TimeUnit.SECONDS)); + assertTrue(latch_3.await(20, TimeUnit.SECONDS)); + + assertTrue(cacheServer.getLocksManager().getLockedKeys().isEmpty()); + + // clean up test + thread_2.join(); + thread_3.join(); + } + + } } }