Skip to content

Commit

Permalink
Merge pull request #2476 from yma96/master
Browse files Browse the repository at this point in the history
Reconnect Cassandra client and reinit session when NoHostAvailableException
  • Loading branch information
yma96 authored Oct 25, 2024
2 parents c958f0f + 9f4d960 commit 400160a
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.google.common.collect.Lists;
import org.commonjava.indy.action.IndyLifecycleException;
import org.commonjava.indy.action.StartupAction;
Expand Down Expand Up @@ -302,7 +304,7 @@ private void update( String ga, Set<String> set )
BoundStatement bound = preparedStoresIncrement.bind();
bound.setSet( 0, set );
bound.setString( 1, ga );
session.execute( bound );
executeSession( bound );
inMemoryCache.remove( ga ); // clear to force reloading
}

Expand All @@ -313,11 +315,11 @@ public void reduce( String ga, Set<String> set, boolean isAsync )
bound.setString( 1, ga );
if ( isAsync )
{
session.executeAsync( bound );
executeSession ( bound, true, ResultSetFuture.class );
}
else
{
session.execute( bound );
executeSession( bound );
}
inMemoryCache.remove( ga ); // clear to force reloading
}
Expand Down Expand Up @@ -393,7 +395,7 @@ public Set<String> getStoresContaining( String gaPath )
}
// query db
BoundStatement bound = preparedQueryByGA.bind( gaPath );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
Row row = result.one();
if ( row != null )
{
Expand Down Expand Up @@ -426,4 +428,41 @@ public boolean matchAny( List<ArtifactStore> concreteStores )
}
return false;
}

private ResultSet executeSession ( BoundStatement bind )
{
return executeSession ( bind, false, ResultSet.class );
}

private <T> T executeSession ( BoundStatement bind, boolean isAsync, Class<T> type )
{
boolean exception = false;
T trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
cassandraClient.close();
cassandraClient.init();
this.init();
}
trackingRecord = type.cast( isAsync ? session.executeAsync( bind ) : session.execute( bind ) );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
cassandraClient.close();
cassandraClient.init();
this.init();
trackingRecord = type.cast( isAsync ? session.executeAsync( bind ) : session.execute( bind ) );
}
}
return trackingRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import org.commonjava.indy.conf.IndyConfiguration;
Expand Down Expand Up @@ -224,7 +225,7 @@ private Date calculateExpirationTime( Date scheduleTime, Long timeout)
public DtxSchedule querySchedule( String storeKey, String jobName )
{
BoundStatement bound = preparedSingleScheduleQuery.bind( storeKey, jobName );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );

Row row = resultSet.one();

Expand All @@ -240,7 +241,7 @@ public Collection<DtxExpiration> queryExpirations( Date date )
Collection<DtxExpiration> expirations = new ArrayList<>( );

BoundStatement bound = preparedExpiredQuery.bind( pid );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
resultSet.forEach( row -> {
expirations.add( toDtxExpiration( row ) );
} );
Expand All @@ -260,7 +261,7 @@ public void queryAndSetExpiredSchedule( Date date )
.equals( expiration.getScheduleUID() ) )
{
BoundStatement boundU = preparedExpiredUpdate.bind( schedule.getStoreKey(), schedule.getJobName() );
session.execute( boundU );
executeSession( boundU );

logger.debug( "Expired entry: {}", schedule );
eventDispatcher.fire( new ScheduleTriggerEvent( schedule.getJobType(), schedule.getPayload() ) );
Expand All @@ -273,7 +274,7 @@ public Collection<DtxSchedule> querySchedulesByJobType( String jobType )
{
Collection<DtxSchedule> schedules = new ArrayList<>( );
BoundStatement bound = preparedScheduleByTypeQuery.bind( jobType );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
resultSet.forEach( row -> {
schedules.add(toDtxSchedule(row));
} );
Expand All @@ -284,7 +285,7 @@ public Collection<DtxSchedule> querySchedulesByStoreKey( String storeKey )
{
Collection<DtxSchedule> schedules = new ArrayList<>( );
BoundStatement bound = preparedScheduleByStoreKeyQuery.bind( storeKey );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
resultSet.forEach( row -> {
schedules.add(toDtxSchedule(row));
} );
Expand All @@ -295,7 +296,7 @@ public Collection<DtxSchedule> querySchedules( String storeKey, String jobType,
{
Collection<DtxSchedule> schedules = new ArrayList<>( );
BoundStatement bound = preparedScheduleByStoreKeyAndTypeQuery.bind( storeKey, jobType );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
resultSet.forEach( row -> {
DtxSchedule schedule = toDtxSchedule( row );
if ( !expired && !schedule.getExpired() )
Expand Down Expand Up @@ -342,4 +343,35 @@ private DtxExpiration toDtxExpiration( Row row )
return null;
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
client.close();
client.init();
this.init();
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
client.close();
client.init();
this.init();
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import org.commonjava.indy.conf.IndyConfiguration;
import org.commonjava.o11yphant.metrics.annotation.Measure;
import org.commonjava.indy.model.core.StoreKey;
Expand Down Expand Up @@ -177,7 +178,7 @@ public void addMissing( final ConcreteResource resource )

BoundStatement bound = preparedInsert.bind( key.toString(), resource.getPath(), curDate, timeoutDate,
timeoutInSeconds );
session.execute( bound );
executeSession( bound );
inMemoryCache.put( resource, DUMB_CACHE_VALUE, timeoutInSeconds, TimeUnit.SECONDS );
}

Expand All @@ -191,7 +192,7 @@ public boolean isMissing( final ConcreteResource resource )
}
StoreKey key = getResourceKey( resource );
BoundStatement bound = preparedExistQuery.bind( key.toString(), resource.getPath() );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
Row row = result.one();
if ( row == null )
{
Expand All @@ -217,7 +218,7 @@ public void clearMissing( final Location location )
{
StoreKey key = ( (KeyedLocation) location ).getKey();
BoundStatement bound = preparedDeleteByStore.bind( key.toString() );
session.execute( bound );
executeSession( bound );
clearInMemoryCache( location );
}

Expand Down Expand Up @@ -246,7 +247,7 @@ public void clearMissing( final ConcreteResource resource )
{
StoreKey key = getResourceKey( resource );
BoundStatement bound = preparedDelete.bind( key.toString(), resource.getPath() );
session.execute( bound );
executeSession( bound );
inMemoryCache.remove( resource );
}

Expand All @@ -272,7 +273,7 @@ public Set<String> getMissing( final Location location )
logger.debug( "[NFC] getMissing for {}", location );
StoreKey key = ( (KeyedLocation) location ).getKey();
BoundStatement bound = preparedQueryByStore.bind( key.toString() );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
int count = 0;
Set<String> matches = new HashSet<>();
for ( Row row : result )
Expand Down Expand Up @@ -315,7 +316,7 @@ public Set<String> getMissing( Location location, int pageIndex, int pageSize )
public long getSize( StoreKey storeKey )
{
BoundStatement bound = preparedCountByStore.bind( storeKey.toString() );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return result.one().get( 0, Long.class );
}

Expand All @@ -331,4 +332,36 @@ private StoreKey getResourceKey( ConcreteResource resource )
KeyedLocation location = (KeyedLocation) resource.getLocation();
return location.getKey();
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
cassandraClient.close();
cassandraClient.init();
this.start();
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
cassandraClient.close();
cassandraClient.init();
this.start();
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public CassandraClient( CassandraConfig config )
}

@PostConstruct
private void init()
public void init()
{
if ( !config.isEnabled() )
{
Expand Down Expand Up @@ -116,18 +116,15 @@ public Session getSession( String keyspace )
} );
}

private volatile boolean closed;

public void close()
{
if ( !closed && cluster != null && sessions != null )
if ( cluster != null && sessions != null )
{
logger.info( "Close cassandra client" );
sessions.entrySet().forEach( e -> e.getValue().close() );
sessions.clear();
cluster.close();
cluster = null;
closed = true;
}
}

Expand Down

0 comments on commit 400160a

Please sign in to comment.