Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of SQL database calls inside transaction contexts #719

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,21 @@ public static SQLDatasource retrieveDatasource(SQLDatasource.SQLDatasourceParams
return sqlDatasourceToBeReturned;
}

public static Connection getConnection(boolean isInTrx, TransactionResourceManager trxResourceManager,
BObject client, SQLDatasource datasource) throws SQLException {
public static Connection getConnection(boolean isInTrx, BObject client, SQLDatasource datasource,
TransactionLocalContext transactionLocalContext,
boolean trxManagerEnabled) throws SQLException {
Connection conn;
try {
if (!isInTrx) {
return datasource.getConnection();
}
String connectorId = (String) client.getNativeData(Constants.SQL_CONNECTOR_TRANSACTION_ID);
boolean isXAConnection = datasource.isXADataSource();
TransactionLocalContext transactionLocalContext = trxResourceManager.getCurrentTransactionContext();
String globalTxId = transactionLocalContext.getGlobalTransactionId();
String currentTxBlockId = transactionLocalContext.getCurrentTransactionBlockId();
BallerinaTransactionContext txContext = transactionLocalContext.getTransactionContext(connectorId);
if (txContext == null) {
if (isXAConnection && !trxResourceManager.getTransactionManagerEnabled()) {
if (isXAConnection && !trxManagerEnabled) {
XAConnection xaConn = datasource.getXAConnection();
XAResource xaResource = xaConn.getXAResource();
TransactionResourceManager.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -89,27 +90,24 @@ public static Object nativeCall(Environment env, BObject client, BObject paramSQ
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});
return null;

}

private static Object nativeCallExecutable(BObject client, BObject paramSQLString, BArray recordTypes,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor,
boolean isWithinTrxBlock,
TransactionResourceManager trxResourceManager) {
boolean isWithinTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -123,7 +121,8 @@ private static Object nativeCallExecutable(BObject client, BObject paramSQLStrin
String sqlQuery = null;
try {
sqlQuery = getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithinTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithinTrxBlock, client, sqlDatasource, currentTrxContext,
trxManagerEnabled);
statement = connection.prepareCall(sqlQuery);

HashMap<Integer, Integer> outputParamTypes = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -74,24 +75,24 @@ private ExecuteProcessor() {
public static Object nativeExecute(Environment env, BObject client, BObject paramSQLString,
AbstractStatementParameterProcessor statementParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, true,
trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, withinTrxBlock,
currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

private static Object nativeExecuteExecutable(BObject client, BObject paramSQLString,
AbstractStatementParameterProcessor statementParameterProcessor,
boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -105,7 +106,8 @@ private static Object nativeExecuteExecutable(BObject client, BObject paramSQLSt
String sqlQuery = null;
try {
sqlQuery = getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);

if (sqlDatasource.getExecuteGKFlag()) {
statement = connection.prepareStatement(sqlQuery, Statement.RETURN_GENERATED_KEYS);
Expand Down Expand Up @@ -154,25 +156,25 @@ private static Object nativeExecuteExecutable(BObject client, BObject paramSQLSt
public static Object nativeBatchExecute(Environment env, BObject client, BArray paramSQLStrings,
AbstractStatementParameterProcessor statementParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
false, null);
balFuture.complete(resultStream);
});
} else {
return nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

private static Object nativeBatchExecuteExecutable(BObject client, BArray paramSQLStrings,
AbstractStatementParameterProcessor statementParameterProcessor,
boolean isWithinTrxBlock,
TransactionResourceManager trxResourceManager) {
TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand Down Expand Up @@ -202,7 +204,8 @@ private static Object nativeBatchExecuteExecutable(BObject client, BArray paramS
"commands. These has to be executed in different function calls");
}
}
connection = SQLDatasource.getConnection(isWithinTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithinTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);

if (sqlDatasource.getBatchExecuteGKFlag()) {
statement = connection.prepareStatement(sqlQuery, Statement.RETURN_GENERATED_KEYS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -76,31 +77,28 @@ private QueryProcessor() {
* @param resultParameterProcessor post-processor of the result
* @return result stream or error
*/
public static BStream nativeQuery(
Environment env, BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
public static BStream nativeQuery(Environment env, BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
BStream resultStream =
nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
BStream resultStream =
nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});
return null;
}

private static BStream nativeQueryExecutable(
BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
private static BStream nativeQueryExecutable(BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor,
boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -115,7 +113,8 @@ private static BStream nativeQueryExecutable(
String sqlQuery = null;
try {
sqlQuery = Utils.getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);
statement = connection.prepareStatement(sqlQuery);
statementParameterProcessor.setParams(connection, statement, paramSQLString);
resultSet = statement.executeQuery();
Expand Down Expand Up @@ -154,18 +153,18 @@ public static Object nativeQueryRow(Environment env, BObject client, BObject par
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext,
trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

Expand All @@ -174,7 +173,7 @@ private static Object nativeQueryRowExecutable(
BTypedesc ballerinaType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
TransactionLocalContext currentTrxContext, boolean trxManagerEnabled) {
Type describingType = TypeUtils.getReferredType(ballerinaType.getDescribingType());
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
Expand All @@ -189,7 +188,8 @@ private static Object nativeQueryRowExecutable(
String sqlQuery = null;
try {
sqlQuery = Utils.getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);
statement = connection.prepareStatement(sqlQuery);
statementParameterProcessor.setParams(connection, statement, paramSQLString);
resultSet = statement.executeQuery();
Expand Down
Loading