Skip to content

Commit

Permalink
Merge pull request #932: [proxima-direct-cassandra] reinitialize clus…
Browse files Browse the repository at this point in the history
…ter after failed session creation
  • Loading branch information
je-ik authored Sep 23, 2024
2 parents 2b7a371 + 71b0316 commit ae45afa
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import cz.o2.proxima.core.storage.AbstractStorage.SerializableAbstractStorage;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.util.Classpath;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessor;
Expand All @@ -49,6 +50,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -232,18 +234,26 @@ private Cluster getCluster(URI uri) {
}

private Cluster getCluster(String authority, @Nullable String username) {
final String clusterCachedKey = (username != null ? username + "@" : "") + authority;
final String clusterCachedKey = computeClusterKey(authority, username);

synchronized (CLUSTER_MAP) {
Cluster cluster = CLUSTER_MAP.get(clusterCachedKey);
if (cluster == null) {
cluster = createCluster(authority);
CLUSTER_MAP.put(clusterCachedKey, cluster);
}
Cluster cluster =
CLUSTER_MAP.computeIfAbsent(clusterCachedKey, k -> createCluster(authority));
return Objects.requireNonNull(cluster);
}
}

private void removeCluster(URI uri) {
synchronized (CLUSTER_MAP) {
Optional.ofNullable(CLUSTER_MAP.remove(computeClusterKey(uri.getAuthority(), this.username)))
.ifPresent(Cluster::close);
}
}

private static String computeClusterKey(String authority, @Nullable String username) {
return (username != null ? username + "@" : "") + authority;
}

@VisibleForTesting
Cluster createCluster(String authority) {
log.info("Creating cluster for authority {} in accessor {}", authority, this);
Expand All @@ -252,7 +262,6 @@ Cluster createCluster(String authority) {

@VisibleForTesting
Builder configureClusterBuilder(Builder builder, String authority) {

builder.addContactPointsWithPorts(
Arrays.stream(authority.split(","))
.map(CassandraDBAccessor::getAddress)
Expand All @@ -273,10 +282,27 @@ static InetSocketAddress getAddress(String p) {
}

Session ensureSession() {
return ensureSessionInternal(0);
}

private Session ensureSessionInternal(int retry) {
Cluster cluster = getCluster(getUri());
Preconditions.checkState(cluster != null);
/** Session we are connected to. */
Session session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect);
/* Session we are connected to. */
Session session;
try {
session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect);
} catch (Exception ex) {
if (retry < 3) {
ExceptionUtils.ignoringInterrupted(
() -> TimeUnit.MILLISECONDS.sleep((int) (Math.pow(2, retry) * 100)));
} else {
throw ex;
}
log.warn("Exception while creating session from cluster. Retry {}.", retry, ex);
removeCluster(getUri());
return ensureSessionInternal(retry + 1);
}
if (session.isClosed()) {
synchronized (this) {
session = CLUSTER_SESSIONS.get(cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
/** Test suite for {@link CassandraDBAccessor}. */
public class CassandraDBAccessorTest {

static final class TestDBAccessor extends CassandraDBAccessor {
static class TestDBAccessor extends CassandraDBAccessor {

@Setter ResultSet res = new EmptyResultSet();

Expand All @@ -92,23 +92,23 @@ ResultSet execute(Statement statement) {
Cluster createCluster(String authority) {
Cluster ret = mock(Cluster.class);
AtomicBoolean closed = new AtomicBoolean(false);
when(ret.connect())
.thenAnswer(
ign -> {
Session session = mock(Session.class);
when(session.isClosed()).thenAnswer(invocationOnMock -> closed.get());
doAnswer(
invocationOnMock -> {
closed.set(true);
return null;
})
.when(session)
.close();
closed.set(false);
return session;
});
when(ret.connect()).thenAnswer(ign -> newSession(closed));
return ret;
}

Session newSession(AtomicBoolean closed) {
Session session = mock(Session.class);
when(session.isClosed()).thenAnswer(invocationOnMock -> closed.get());
doAnswer(
invocationOnMock -> {
closed.set(true);
return null;
})
.when(session)
.close();
closed.set(false);
return session;
}
}

public static final class TestCqlFactory extends DefaultCqlFactory {
Expand Down Expand Up @@ -302,6 +302,43 @@ public void testWriteSuccess() {
assertTrue(CassandraDBAccessor.getCLUSTER_MAP().isEmpty());
}

@Test
public void testClusterReconnect() {
entity = EntityDescriptor.newBuilder().setName("dummy").build();

CassandraDBAccessor accessor =
new TestDBAccessor(
entity,
URI.create("cassandra://host:9042/table/?primary=data"),
getCfg(TestCqlFactory.class)) {

int retries = 0;

@Override
Session newSession(AtomicBoolean closed) {
if (retries++ < 3) {
throw new IllegalStateException("Invalid cluster.");
}
return super.newSession(closed);
}
};
try (CassandraWriter writer = accessor.newWriter()) {
AtomicBoolean success = new AtomicBoolean(false);
writer.write(
StreamElement.upsert(
entity,
attr,
UUID.randomUUID().toString(),
"key",
attr.getName(),
System.currentTimeMillis(),
new byte[0]),
(status, exc) -> success.set(status));
assertTrue(success.get());
}
assertTrue(CassandraDBAccessor.getCLUSTER_MAP().isEmpty());
}

/** Test failed write. */
@Test
public void testWriteFailed() {
Expand Down

0 comments on commit ae45afa

Please sign in to comment.