diff --git a/native/src/main/java/io/ballerina/stdlib/sql/datasource/SQLDatasource.java b/native/src/main/java/io/ballerina/stdlib/sql/datasource/SQLDatasource.java index 6abd4d11..bb855660 100644 --- a/native/src/main/java/io/ballerina/stdlib/sql/datasource/SQLDatasource.java +++ b/native/src/main/java/io/ballerina/stdlib/sql/datasource/SQLDatasource.java @@ -158,8 +158,9 @@ public static SQLDatasource retrieveDatasource(SQLDatasource.SQLDatasourceParams return sqlDatasourceToBeReturned; } - public static Connection getConnection(boolean isInTrx, TransactionResourceManager trxResourceManager, - BObject client, SQLDatasource datasource) throws SQLException { + public static Connection getConnection(boolean isInTrx, BObject client, SQLDatasource datasource, + TransactionLocalContext transactionLocalContext, + boolean trxManagerEnabled) throws SQLException { Connection conn; try { if (!isInTrx) { @@ -167,12 +168,11 @@ public static Connection getConnection(boolean isInTrx, TransactionResourceManag } String connectorId = (String) client.getNativeData(Constants.SQL_CONNECTOR_TRANSACTION_ID); boolean isXAConnection = datasource.isXADataSource(); - TransactionLocalContext transactionLocalContext = trxResourceManager.getCurrentTransactionContext(); String globalTxId = transactionLocalContext.getGlobalTransactionId(); String currentTxBlockId = transactionLocalContext.getCurrentTransactionBlockId(); BallerinaTransactionContext txContext = transactionLocalContext.getTransactionContext(connectorId); if (txContext == null) { - if (isXAConnection && !trxResourceManager.getTransactionManagerEnabled()) { + if (isXAConnection && !trxManagerEnabled) { XAConnection xaConn = datasource.getXAConnection(); XAResource xaResource = xaConn.getXAResource(); TransactionResourceManager.getInstance() diff --git a/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/CallProcessor.java b/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/CallProcessor.java index a539fc9b..e7a49a88 100644 --- a/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/CallProcessor.java +++ b/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/CallProcessor.java @@ -30,6 +30,7 @@ import io.ballerina.runtime.api.values.BStream; import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; +import io.ballerina.runtime.transactions.TransactionLocalContext; import io.ballerina.runtime.transactions.TransactionResourceManager; import io.ballerina.stdlib.sql.Constants; import io.ballerina.stdlib.sql.datasource.SQLDatasource; @@ -89,27 +90,24 @@ public static Object nativeCall(Environment env, BObject client, BObject paramSQ AbstractStatementParameterProcessor statementParameterProcessor, AbstractResultParameterProcessor resultParameterProcessor) { TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance(); - if (!Utils.isWithinTrxBlock(trxResourceManager)) { - Future balFuture = env.markAsync(); - SQL_EXECUTOR_SERVICE.execute(() -> { - Object resultStream = - nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor, - resultParameterProcessor, false, null); - balFuture.complete(resultStream); - }); - } else { - return nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor, - resultParameterProcessor, true, trxResourceManager); - } + boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager); + boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled(); + TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext(); + Future balFuture = env.markAsync(); + SQL_EXECUTOR_SERVICE.execute(() -> { + Object resultStream = + nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor, + resultParameterProcessor, withinTrxBlock, currentTrxContext, trxManagerEnabled); + balFuture.complete(resultStream); + }); return null; - } private static Object nativeCallExecutable(BObject client, BObject paramSQLString, BArray recordTypes, AbstractStatementParameterProcessor statementParameterProcessor, AbstractResultParameterProcessor resultParameterProcessor, - boolean isWithinTrxBlock, - TransactionResourceManager trxResourceManager) { + boolean isWithinTrxBlock, TransactionLocalContext currentTrxContext, + boolean trxManagerEnabled) { Object dbClient = client.getNativeData(DATABASE_CLIENT); if (dbClient != null) { SQLDatasource sqlDatasource = (SQLDatasource) dbClient; @@ -123,7 +121,8 @@ private static Object nativeCallExecutable(BObject client, BObject paramSQLStrin String sqlQuery = null; try { sqlQuery = getSqlQuery(paramSQLString); - connection = SQLDatasource.getConnection(isWithinTrxBlock, trxResourceManager, client, sqlDatasource); + connection = SQLDatasource.getConnection(isWithinTrxBlock, client, sqlDatasource, currentTrxContext, + trxManagerEnabled); statement = connection.prepareCall(sqlQuery); HashMap outputParamTypes = new HashMap<>(); diff --git a/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/ExecuteProcessor.java b/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/ExecuteProcessor.java index a09aacc6..212b47eb 100644 --- a/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/ExecuteProcessor.java +++ b/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/ExecuteProcessor.java @@ -26,6 +26,7 @@ import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; +import io.ballerina.runtime.transactions.TransactionLocalContext; import io.ballerina.runtime.transactions.TransactionResourceManager; import io.ballerina.stdlib.sql.Constants; import io.ballerina.stdlib.sql.datasource.SQLDatasource; @@ -74,24 +75,24 @@ private ExecuteProcessor() { public static Object nativeExecute(Environment env, BObject client, BObject paramSQLString, AbstractStatementParameterProcessor statementParameterProcessor) { TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance(); - if (!Utils.isWithinTrxBlock(trxResourceManager)) { - Future balFuture = env.markAsync(); - SQL_EXECUTOR_SERVICE.execute(() -> { - Object resultStream = - nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, false, null); - balFuture.complete(resultStream); - }); - } else { - return nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, true, - trxResourceManager); - } + boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager); + boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled(); + TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext(); + Future balFuture = env.markAsync(); + SQL_EXECUTOR_SERVICE.execute(() -> { + Object resultStream = + nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, withinTrxBlock, + currentTrxContext, trxManagerEnabled); + balFuture.complete(resultStream); + }); + return null; } private static Object nativeExecuteExecutable(BObject client, BObject paramSQLString, AbstractStatementParameterProcessor statementParameterProcessor, - boolean isWithInTrxBlock, - TransactionResourceManager trxResourceManager) { + boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext, + boolean trxManagerEnabled) { Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT); if (dbClient != null) { SQLDatasource sqlDatasource = (SQLDatasource) dbClient; @@ -105,7 +106,8 @@ private static Object nativeExecuteExecutable(BObject client, BObject paramSQLSt String sqlQuery = null; try { sqlQuery = getSqlQuery(paramSQLString); - connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource); + connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource, + currentTrxContext, trxManagerEnabled); if (sqlDatasource.getExecuteGKFlag()) { statement = connection.prepareStatement(sqlQuery, Statement.RETURN_GENERATED_KEYS); @@ -154,25 +156,25 @@ private static Object nativeExecuteExecutable(BObject client, BObject paramSQLSt public static Object nativeBatchExecute(Environment env, BObject client, BArray paramSQLStrings, AbstractStatementParameterProcessor statementParameterProcessor) { TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance(); - if (!Utils.isWithinTrxBlock(trxResourceManager)) { - Future balFuture = env.markAsync(); - SQL_EXECUTOR_SERVICE.execute(() -> { - Object resultStream = - nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor, - false, null); - balFuture.complete(resultStream); - }); - } else { - return nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor, - true, trxResourceManager); - } + boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager); + boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled(); + TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext(); + Future balFuture = env.markAsync(); + SQL_EXECUTOR_SERVICE.execute(() -> { + Object resultStream = + nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor, + withinTrxBlock, currentTrxContext, trxManagerEnabled); + balFuture.complete(resultStream); + }); + return null; } private static Object nativeBatchExecuteExecutable(BObject client, BArray paramSQLStrings, AbstractStatementParameterProcessor statementParameterProcessor, boolean isWithinTrxBlock, - TransactionResourceManager trxResourceManager) { + TransactionLocalContext currentTrxContext, + boolean trxManagerEnabled) { Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT); if (dbClient != null) { SQLDatasource sqlDatasource = (SQLDatasource) dbClient; @@ -202,7 +204,8 @@ private static Object nativeBatchExecuteExecutable(BObject client, BArray paramS "commands. These has to be executed in different function calls"); } } - connection = SQLDatasource.getConnection(isWithinTrxBlock, trxResourceManager, client, sqlDatasource); + connection = SQLDatasource.getConnection(isWithinTrxBlock, client, sqlDatasource, + currentTrxContext, trxManagerEnabled); if (sqlDatasource.getBatchExecuteGKFlag()) { statement = connection.prepareStatement(sqlQuery, Statement.RETURN_GENERATED_KEYS); diff --git a/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/QueryProcessor.java b/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/QueryProcessor.java index 441c943d..ff4524fd 100644 --- a/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/QueryProcessor.java +++ b/native/src/main/java/io/ballerina/stdlib/sql/nativeimpl/QueryProcessor.java @@ -34,6 +34,7 @@ import io.ballerina.runtime.api.values.BStream; import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; +import io.ballerina.runtime.transactions.TransactionLocalContext; import io.ballerina.runtime.transactions.TransactionResourceManager; import io.ballerina.stdlib.sql.Constants; import io.ballerina.stdlib.sql.datasource.SQLDatasource; @@ -76,31 +77,28 @@ private QueryProcessor() { * @param resultParameterProcessor post-processor of the result * @return result stream or error */ - public static BStream nativeQuery( - Environment env, BObject client, BObject paramSQLString, Object recordType, - AbstractStatementParameterProcessor statementParameterProcessor, - AbstractResultParameterProcessor resultParameterProcessor) { + public static BStream nativeQuery(Environment env, BObject client, BObject paramSQLString, Object recordType, + AbstractStatementParameterProcessor statementParameterProcessor, + AbstractResultParameterProcessor resultParameterProcessor) { TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance(); - if (!Utils.isWithinTrxBlock(trxResourceManager)) { - Future balFuture = env.markAsync(); - SQL_EXECUTOR_SERVICE.execute(() -> { - BStream resultStream = - nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor, - resultParameterProcessor, false, null); - balFuture.complete(resultStream); - }); - } else { - return nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor, - resultParameterProcessor, true, trxResourceManager); - } + boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager); + boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled(); + TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext(); + Future balFuture = env.markAsync(); + SQL_EXECUTOR_SERVICE.execute(() -> { + BStream resultStream = + nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor, + resultParameterProcessor, withinTrxBlock, currentTrxContext, trxManagerEnabled); + balFuture.complete(resultStream); + }); return null; } - private static BStream nativeQueryExecutable( - BObject client, BObject paramSQLString, Object recordType, - AbstractStatementParameterProcessor statementParameterProcessor, - AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock, - TransactionResourceManager trxResourceManager) { + private static BStream nativeQueryExecutable(BObject client, BObject paramSQLString, Object recordType, + AbstractStatementParameterProcessor statementParameterProcessor, + AbstractResultParameterProcessor resultParameterProcessor, + boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext, + boolean trxManagerEnabled) { Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT); if (dbClient != null) { SQLDatasource sqlDatasource = (SQLDatasource) dbClient; @@ -115,7 +113,8 @@ private static BStream nativeQueryExecutable( String sqlQuery = null; try { sqlQuery = Utils.getSqlQuery(paramSQLString); - connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource); + connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource, + currentTrxContext, trxManagerEnabled); statement = connection.prepareStatement(sqlQuery); statementParameterProcessor.setParams(connection, statement, paramSQLString); resultSet = statement.executeQuery(); @@ -154,18 +153,18 @@ public static Object nativeQueryRow(Environment env, BObject client, BObject par AbstractStatementParameterProcessor statementParameterProcessor, AbstractResultParameterProcessor resultParameterProcessor) { TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance(); - if (!Utils.isWithinTrxBlock(trxResourceManager)) { - Future balFuture = env.markAsync(); - SQL_EXECUTOR_SERVICE.execute(() -> { - Object resultStream = - nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor, - resultParameterProcessor, false, null); - balFuture.complete(resultStream); - }); - } else { - return nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor, - resultParameterProcessor, true, trxResourceManager); - } + boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager); + boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled(); + TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext(); + Future balFuture = env.markAsync(); + SQL_EXECUTOR_SERVICE.execute(() -> { + Object resultStream = + nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor, + resultParameterProcessor, withinTrxBlock, currentTrxContext, + trxManagerEnabled); + balFuture.complete(resultStream); + }); + return null; } @@ -174,7 +173,7 @@ private static Object nativeQueryRowExecutable( BTypedesc ballerinaType, AbstractStatementParameterProcessor statementParameterProcessor, AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock, - TransactionResourceManager trxResourceManager) { + TransactionLocalContext currentTrxContext, boolean trxManagerEnabled) { Type describingType = TypeUtils.getReferredType(ballerinaType.getDescribingType()); Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT); if (dbClient != null) { @@ -189,7 +188,8 @@ private static Object nativeQueryRowExecutable( String sqlQuery = null; try { sqlQuery = Utils.getSqlQuery(paramSQLString); - connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource); + connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource, + currentTrxContext, trxManagerEnabled); statement = connection.prepareStatement(sqlQuery); statementParameterProcessor.setParams(connection, statement, paramSQLString); resultSet = statement.executeQuery();