diff --git a/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java b/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java index 51b69359..8787094f 100644 --- a/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java @@ -17,7 +17,10 @@ import liquibase.statement.core.LockDatabaseChangeLogStatement; import liquibase.statement.core.RawSqlStatement; import liquibase.statement.core.UnlockDatabaseChangeLogStatement; +import liquibase.util.NetUtil; +import java.net.SocketException; +import java.net.UnknownHostException; import java.sql.SQLException; import java.sql.Statement; @@ -51,17 +54,16 @@ public boolean acquireLock() throws LockException { super.init(); - boolean locked = executor.queryForInt( - new RawSqlStatement("SELECT COUNT(*) FROM " + getChangeLogLockTableName() + " where " + - "locked = TRUE ALLOW FILTERING") - ) > 0; - - if (locked) { + if (isLocked(executor)) { return false; } else { executor.comment("Lock Database"); int rowsUpdated = executor.update(new LockDatabaseChangeLogStatement()); + if (rowsUpdated == -1 && !isLockedByCurrentInstance(executor)) { + // another node was faster + return false; + } if ((rowsUpdated == -1) && (database instanceof MSSQLDatabase)) { Scope.getCurrentScope().getLog(this.getClass()).info("Database did not return a proper row count (Might have NOCOUNT enabled)"); @@ -178,8 +180,27 @@ public boolean isDatabaseChangeLogLockTableInitialized(final boolean tableJustCr return isDatabaseChangeLogLockTableInitialized; } + private boolean isLocked(Executor executor) throws DatabaseException { + return executor.queryForInt( + new RawSqlStatement("SELECT COUNT(*) FROM " + database.getDefaultCatalogName() + "." + getChangeLogLockTableName() + " where " + + "locked = TRUE ALLOW FILTERING") + ) > 0; + } + + private boolean isLockedByCurrentInstance(Executor executor) throws DatabaseException { + try { + final String lockedBy = NetUtil.getLocalHostName() + " (" + NetUtil.getLocalHostAddress() + ")"; + return executor.queryForInt( + new RawSqlStatement("SELECT COUNT(*) FROM " + database.getDefaultCatalogName() + "." + getChangeLogLockTableName() + " where " + + "LOCKED = TRUE AND LOCKEDBY = '" + lockedBy + "' ALLOW FILTERING") + ) > 0; + } catch (SocketException | UnknownHostException e) { + throw new UnexpectedLiquibaseException(e); + } + } + private String getChangeLogLockTableName() { - if(database.getLiquibaseCatalogName() != null) { + if (database.getLiquibaseCatalogName() != null) { return database.getLiquibaseCatalogName() + "." + database.getDatabaseChangeLogLockTableName(); } else { return database.getDatabaseChangeLogLockTableName(); diff --git a/src/main/java/liquibase/ext/cassandra/sqlgenerator/LockDatabaseChangeLogGeneratorCassandra.java b/src/main/java/liquibase/ext/cassandra/sqlgenerator/LockDatabaseChangeLogGeneratorCassandra.java index 5c4859f2..89d69635 100644 --- a/src/main/java/liquibase/ext/cassandra/sqlgenerator/LockDatabaseChangeLogGeneratorCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/sqlgenerator/LockDatabaseChangeLogGeneratorCassandra.java @@ -25,7 +25,7 @@ public boolean supports(LockDatabaseChangeLogStatement statement, Database datab public Sql[] generateSql(LockDatabaseChangeLogStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) { RawSqlStatement updateStatement = new RawSqlStatement("UPDATE " + database.escapeTableName(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName()) + - " SET LOCKED = TRUE, LOCKEDBY = '" + hostname + " (" + hostaddress + ")" + "', LOCKGRANTED = " + System.currentTimeMillis() + " WHERE ID = 1"); + " SET LOCKED = TRUE, LOCKEDBY = '" + hostname + " (" + hostaddress + ")" + "', LOCKGRANTED = " + System.currentTimeMillis() + " WHERE ID = 1 IF LOCKED = FALSE"); return SqlGeneratorFactory.getInstance().generateSql(updateStatement, database); }