From 71b03162c568dcc055ef1c2d9c860a0173b6f581 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 23 Sep 2024 14:29:37 +0200 Subject: [PATCH] [proxima-direct-io-cassandra] reinitialize cluster after failed session creation --- .../io/cassandra/CassandraDBAccessor.java | 44 +++++++++--- .../io/cassandra/CassandraDBAccessorTest.java | 69 ++++++++++++++----- 2 files changed, 88 insertions(+), 25 deletions(-) diff --git a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java index cbc9aff00..721027ea4 100644 --- a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java +++ b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java @@ -31,6 +31,7 @@ import cz.o2.proxima.core.storage.AbstractStorage.SerializableAbstractStorage; import cz.o2.proxima.core.storage.Partition; import cz.o2.proxima.core.util.Classpath; +import cz.o2.proxima.core.util.ExceptionUtils; import cz.o2.proxima.direct.core.AttributeWriterBase; import cz.o2.proxima.direct.core.Context; import cz.o2.proxima.direct.core.DataAccessor; @@ -49,6 +50,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -232,18 +234,26 @@ private Cluster getCluster(URI uri) { } private Cluster getCluster(String authority, @Nullable String username) { - final String clusterCachedKey = (username != null ? username + "@" : "") + authority; + final String clusterCachedKey = computeClusterKey(authority, username); synchronized (CLUSTER_MAP) { - Cluster cluster = CLUSTER_MAP.get(clusterCachedKey); - if (cluster == null) { - cluster = createCluster(authority); - CLUSTER_MAP.put(clusterCachedKey, cluster); - } + Cluster cluster = + CLUSTER_MAP.computeIfAbsent(clusterCachedKey, k -> createCluster(authority)); return Objects.requireNonNull(cluster); } } + private void removeCluster(URI uri) { + synchronized (CLUSTER_MAP) { + Optional.ofNullable(CLUSTER_MAP.remove(computeClusterKey(uri.getAuthority(), this.username))) + .ifPresent(Cluster::close); + } + } + + private static String computeClusterKey(String authority, @Nullable String username) { + return (username != null ? username + "@" : "") + authority; + } + @VisibleForTesting Cluster createCluster(String authority) { log.info("Creating cluster for authority {} in accessor {}", authority, this); @@ -252,7 +262,6 @@ Cluster createCluster(String authority) { @VisibleForTesting Builder configureClusterBuilder(Builder builder, String authority) { - builder.addContactPointsWithPorts( Arrays.stream(authority.split(",")) .map(CassandraDBAccessor::getAddress) @@ -273,10 +282,27 @@ static InetSocketAddress getAddress(String p) { } Session ensureSession() { + return ensureSessionInternal(0); + } + + private Session ensureSessionInternal(int retry) { Cluster cluster = getCluster(getUri()); Preconditions.checkState(cluster != null); - /** Session we are connected to. */ - Session session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect); + /* Session we are connected to. */ + Session session; + try { + session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect); + } catch (Exception ex) { + if (retry < 3) { + ExceptionUtils.ignoringInterrupted( + () -> TimeUnit.MILLISECONDS.sleep((int) (Math.pow(2, retry) * 100))); + } else { + throw ex; + } + log.warn("Exception while creating session from cluster. Retry {}.", retry, ex); + removeCluster(getUri()); + return ensureSessionInternal(retry + 1); + } if (session.isClosed()) { synchronized (this) { session = CLUSTER_SESSIONS.get(cluster); diff --git a/direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessorTest.java b/direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessorTest.java index 016ca0b04..018d34f9b 100644 --- a/direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessorTest.java +++ b/direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessorTest.java @@ -72,7 +72,7 @@ /** Test suite for {@link CassandraDBAccessor}. */ public class CassandraDBAccessorTest { - static final class TestDBAccessor extends CassandraDBAccessor { + static class TestDBAccessor extends CassandraDBAccessor { @Setter ResultSet res = new EmptyResultSet(); @@ -92,23 +92,23 @@ ResultSet execute(Statement statement) { Cluster createCluster(String authority) { Cluster ret = mock(Cluster.class); AtomicBoolean closed = new AtomicBoolean(false); - when(ret.connect()) - .thenAnswer( - ign -> { - Session session = mock(Session.class); - when(session.isClosed()).thenAnswer(invocationOnMock -> closed.get()); - doAnswer( - invocationOnMock -> { - closed.set(true); - return null; - }) - .when(session) - .close(); - closed.set(false); - return session; - }); + when(ret.connect()).thenAnswer(ign -> newSession(closed)); return ret; } + + Session newSession(AtomicBoolean closed) { + Session session = mock(Session.class); + when(session.isClosed()).thenAnswer(invocationOnMock -> closed.get()); + doAnswer( + invocationOnMock -> { + closed.set(true); + return null; + }) + .when(session) + .close(); + closed.set(false); + return session; + } } public static final class TestCqlFactory extends DefaultCqlFactory { @@ -302,6 +302,43 @@ public void testWriteSuccess() { assertTrue(CassandraDBAccessor.getCLUSTER_MAP().isEmpty()); } + @Test + public void testClusterReconnect() { + entity = EntityDescriptor.newBuilder().setName("dummy").build(); + + CassandraDBAccessor accessor = + new TestDBAccessor( + entity, + URI.create("cassandra://host:9042/table/?primary=data"), + getCfg(TestCqlFactory.class)) { + + int retries = 0; + + @Override + Session newSession(AtomicBoolean closed) { + if (retries++ < 3) { + throw new IllegalStateException("Invalid cluster."); + } + return super.newSession(closed); + } + }; + try (CassandraWriter writer = accessor.newWriter()) { + AtomicBoolean success = new AtomicBoolean(false); + writer.write( + StreamElement.upsert( + entity, + attr, + UUID.randomUUID().toString(), + "key", + attr.getName(), + System.currentTimeMillis(), + new byte[0]), + (status, exc) -> success.set(status)); + assertTrue(success.get()); + } + assertTrue(CassandraDBAccessor.getCLUSTER_MAP().isEmpty()); + } + /** Test failed write. */ @Test public void testWriteFailed() {