Skip to content

Commit

Permalink
Invalidating client is not removed from the list of clients holding t…
Browse files Browse the repository at this point in the history
…he invalidated key (#192)

* simplified client-for-fetch choosing and added some concurrency tests

* remove invalidating client from clients with key

---------

Co-authored-by: dennis.mercuriali <[email protected]>
  • Loading branch information
dmercuriali and dennis.mercuriali authored Jan 8, 2024
1 parent 3f4adcf commit f744b3d
Show file tree
Hide file tree
Showing 7 changed files with 684 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,11 @@ private void invalidate(RawString _key, KeyLock lock) throws InterruptedExceptio
request.setParameter("lockId", lock.getLockId());
}
Message response = _channel.sendMessageWithReply(request, invalidateTimeout);

if (internalClientListener != null) {
internalClientListener.onInvalidateResponse(_key.toString(), response);
}

if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.log(Level.FINEST, "invalidate {0}, -> {1}", new Object[]{_key, response});
}
Expand Down Expand Up @@ -1426,6 +1431,11 @@ private boolean load(String key, byte[] data, Object reference, long expireTime,
request.setParameter("lockId", lock.getLockId());
}
Message response = _chanel.sendMessageWithReply(request, invalidateTimeout);

if (internalClientListener != null) {
internalClientListener.onLoadResponse(key, response);
}

if (response.type != Message.TYPE_ACK) {
throw new CacheException("error while loading key " + key + " (" + response + ")");
}
Expand Down Expand Up @@ -1473,6 +1483,11 @@ private boolean put(RawString _key, byte[] data, Object reference, long expireTi
request.setParameter("lockId", lock.getLockId());
}
Message response = _chanel.sendMessageWithReply(request, invalidateTimeout);

if (internalClientListener != null) {
internalClientListener.onPutResponse(_key.toString(), response);
}

if (response.type != Message.TYPE_ACK) {
throw new CacheException("error while putting key " + _key + " (" + response + ")");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public default boolean messageReceived(Message message, Channel channel) {
public default void onFetchResponse(String key, Message message) {
}

public default void onLoadResponse(String key, Message response) {
}

public default void onInvalidateResponse(String key, Message response) {
}

public default void onPutResponse(String key, Message response) {
}

public default void onRequestSent(Message message) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@
*/
package blazingcache.server;

import blazingcache.management.JMXUtils;
import blazingcache.network.Message;
import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyChannelAcceptor;
import blazingcache.server.management.BlazingCacheServerStatusMXBean;
import blazingcache.server.management.CacheServerStatusMXBean;
import blazingcache.utils.RawString;
import blazingcache.zookeeper.LeaderShipChangeListener;
import blazingcache.zookeeper.ZKClusterManager;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -29,24 +39,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.zookeeper.ZooKeeper;

import blazingcache.management.JMXUtils;
import blazingcache.network.Message;
import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyChannelAcceptor;
import blazingcache.server.management.BlazingCacheServerStatusMXBean;
import blazingcache.server.management.CacheServerStatusMXBean;
import blazingcache.utils.RawString;
import blazingcache.zookeeper.LeaderShipChangeListener;
import blazingcache.zookeeper.ZKClusterManager;

/**
* The CacheServer core.
*
Expand Down Expand Up @@ -359,6 +358,7 @@ public void invalidateKey(RawString key, String sourceClientId, String clientPro
SimpleCallback<RawString> finishAndReleaseLock = new SimpleCallback<RawString>() {
@Override
public void onResult(RawString result, Throwable error) {
cacheStatus.removeKeyForClient(key, sourceClientId);
locksManager.releaseWriteLockForKey(key, sourceClientId, lockID);
onFinish.onResult(result, error);
}
Expand Down Expand Up @@ -446,45 +446,47 @@ public void onResult(Message result, Throwable error) {
return;
}

List<CacheServerSideConnection> candidates = new ArrayList<>();
int maxPriority = 0;
List<CacheServerSideConnection> maxPriorityCandidates = new ArrayList<>();
for (String remoteClientId : clientsForKey) {
CacheServerSideConnection connection = acceptor.getActualConnectionFromClient(remoteClientId);
if (connection != null && connection.getFetchPriority() > 0) {
candidates.add(connection);
if (connection != null) {
int fetchPriority = connection.getFetchPriority();
if (fetchPriority == 0 || fetchPriority < maxPriority) {
continue;
}

if (fetchPriority > maxPriority) {
maxPriorityCandidates.clear();
maxPriority = fetchPriority;
}
maxPriorityCandidates.add(connection);
}
}
candidates.sort((a, b) -> {
int priorityComparison = b.getFetchPriority() - a.getFetchPriority();
if (priorityComparison == 0) {
return ThreadLocalRandom.current().nextInt();
}
return priorityComparison;
});

boolean foundOneGoodClientConnected = false;
for (CacheServerSideConnection connection : candidates) {
String remoteClientId = connection.getClientId();
if (!maxPriorityCandidates.isEmpty()) {
CacheServerSideConnection connection = maxPriorityCandidates.get(ThreadLocalRandom.current().nextInt(maxPriorityCandidates.size()));

String remoteClientId = connection.getClientId();
UnicastRequestStatus unicastRequestStatus = new UnicastRequestStatus(clientId, remoteClientId, "fetch " + key);
networkRequestsStatusMonitor.register(unicastRequestStatus);
connection.sendFetchKeyMessage(remoteClientId, key, new SimpleCallback<Message>() {

@Override
public void onResult(Message result, Throwable error) {
networkRequestsStatusMonitor.unregister(unicastRequestStatus);
LOGGER.log(Level.FINE, "client " + remoteClientId + " answer to fetch :" + result, error);
if (result != null && result.type == Message.TYPE_ACK) {
// da questo momento consideriamo che il client abbia la entry in memoria
// anche se di fatto potrebbe succedere che il messaggio di risposta non arrivi mai
long expiretime = (long) result.parameters.get("expiretime");
cacheStatus.registerKeyForClient(key, clientId, expiretime);
}
finishAndReleaseLock.onResult(result, error);

connection.sendFetchKeyMessage(remoteClientId, key, (result, error) -> {
networkRequestsStatusMonitor.unregister(unicastRequestStatus);
LOGGER.log(Level.FINE, "client " + remoteClientId + " answer to fetch :" + result, error);
if (result != null && result.type == Message.TYPE_ACK) {
// da questo momento consideriamo che il client abbia la entry in memoria
// anche se di fatto potrebbe succedere che il messaggio di risposta non arrivi mai
long expiretime = (long) result.parameters.get("expiretime");
cacheStatus.registerKeyForClient(key, clientId, expiretime);
}
finishAndReleaseLock.onResult(result, error);
});

foundOneGoodClientConnected = true;
break;
}

if (!foundOneGoodClientConnected) {
finishAndReleaseLock.onResult(Message.ERROR(clientId, new Exception("no connected client for key " + key)), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,20 @@
*/
package blazingcache.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import blazingcache.client.impl.InternalClientListener;
import blazingcache.network.Channel;
import blazingcache.network.Message;
import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyCacheServerLocator;
import blazingcache.server.CacheServer;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import org.junit.Test;

import static org.junit.Assert.assertTrue;

/**
* Test for slow cclients an fetches
*
Expand Down Expand Up @@ -274,4 +268,86 @@ public boolean messageReceived(Message message, Channel channel) {
}
}

@Test
public void testTryOnlyFirstClient() throws Exception {
byte[] data = "testdata".getBytes(StandardCharsets.UTF_8);

ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null);
try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) {
cacheServer.setClientFetchTimeout(1000);
cacheServer.start();
try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData));
CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData));
CacheClient client3 = new CacheClient("theClient3", "ciao", new NettyCacheServerLocator(serverHostData));) {
client1.setFetchPriority(1);
client2.setFetchPriority(10);
client3.setFetchPriority(5);
client1.start();
client2.start();
client3.start();

assertTrue(client1.waitForConnection(10000));
assertTrue(client2.waitForConnection(10000));
assertTrue(client3.waitForConnection(10000));

AtomicInteger fetchesServedByClient1 = new AtomicInteger();
AtomicInteger fetchesServedByClient2 = new AtomicInteger();
AtomicInteger fetchesServedByClient3 = new AtomicInteger();

client1.setInternalClientListener(new InternalClientListener() {

@Override
public boolean messageReceived(Message message, Channel channel) {
if (message.type == Message.TYPE_FETCH_ENTRY) {
fetchesServedByClient1.incrementAndGet();
}
return true;
}
});

client2.setInternalClientListener(new InternalClientListener() {
@Override
public boolean messageReceived(Message message, Channel channel) {
if (message.type == Message.TYPE_FETCH_ENTRY) {
fetchesServedByClient2.incrementAndGet();
channel.sendReplyMessage(message,
Message.ERROR(client2.getClientId(), new Exception("entry no more here"))
);
return false;
}
return true;
}
});

client3.setInternalClientListener(new InternalClientListener() {

@Override
public boolean messageReceived(Message message, Channel channel) {
if (message.type == Message.TYPE_FETCH_ENTRY) {
fetchesServedByClient3.incrementAndGet();
}
return true;
}
});

assertNull(client1.get("foo"));
assertNull(client2.get("foo"));
assertNull(client3.get("foo"));

client2.put("foo", data, 0);
client3.put("foo", data, 0);
assertNull(client1.get("foo"));
assertNotNull(client2.get("foo"));
assertNotNull(client3.get("foo"));

EntryHandle entry = client1.fetch("foo");
assertNull(entry);

assertEquals(0, fetchesServedByClient1.get());
assertEquals(1, fetchesServedByClient2.get());
assertEquals(0, fetchesServedByClient3.get());
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package blazingcache.client;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import blazingcache.client.impl.InternalClientListener;
import blazingcache.network.Message;
import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyCacheServerLocator;
import blazingcache.server.CacheServer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;

/**
* @author dennis.mercuriali
*/
public class InvalidateTest {

@Test
public void simpleInvalidate() throws Exception {
byte[] data = "testdata".getBytes(StandardCharsets.UTF_8);

ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null);
try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) {
cacheServer.setClientFetchTimeout(120000);
cacheServer.start();
try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData));
CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData))) {
client1.start();
client2.start();

assertTrue(client1.waitForConnection(10000));
assertTrue(client2.waitForConnection(10000));

CountDownLatch invalidateDone_latch = new CountDownLatch(1);

client2.setInternalClientListener(new InternalClientListener() {
@Override
public void onInvalidateResponse(String key, Message response) {
if (response.type == Message.TYPE_ACK) {
invalidateDone_latch.countDown();
}
}
});

client1.load("entry", data, -1);
client2.load("entry", data, -1);
assertArrayEquals(client1.get("entry").getSerializedData(), data);
assertArrayEquals(client2.get("entry").getSerializedData(), data);

Thread invalidate = new Thread(() -> {
try {
client2.invalidate("entry");
} catch (InterruptedException exc) {
}
});

invalidate.start();
invalidate.join();

assertEquals(0L, invalidateDone_latch.getCount());

System.out.println("key_client1:" + cacheServer.getCacheStatus().getKeysForClient(client1.getClientId()));
System.out.println("key_client2:" + cacheServer.getCacheStatus().getKeysForClient(client2.getClientId()));

assertTrue(cacheServer.getCacheStatus().getKeysForClient(client1.getClientId()).isEmpty());
assertTrue(cacheServer.getCacheStatus().getKeysForClient(client2.getClientId()).isEmpty());

assertNull(client1.get("entry"));
assertNull(client2.get("entry"));
}
}
}
}
Loading

0 comments on commit f744b3d

Please sign in to comment.