Skip to content

Commit

Permalink
change close end and prepare
Browse files Browse the repository at this point in the history
  • Loading branch information
shukai committed Jan 21, 2025
1 parent 690b73c commit 976049b
Showing 1 changed file with 63 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@

/**
* Connection proxy for XA mode.
*
*/
public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class);

private static final int BRANCH_EXECUTION_TIMEOUT = ConfigurationFactory.getInstance()
.getInt(XA_BRANCH_EXECUTION_TIMEOUT, DefaultValues.DEFAULT_XA_BRANCH_EXECUTION_TIMEOUT);
private static final int BRANCH_EXECUTION_TIMEOUT = ConfigurationFactory.getInstance().getInt(XA_BRANCH_EXECUTION_TIMEOUT,
DefaultValues.DEFAULT_XA_BRANCH_EXECUTION_TIMEOUT);

private volatile boolean currentAutoCommitStatus = true;

Expand All @@ -65,8 +66,7 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold

private volatile Long prepareTime = null;

private static final Integer TIMEOUT = Math.max(BRANCH_EXECUTION_TIMEOUT,
DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
private static final Integer TIMEOUT = Math.max(BRANCH_EXECUTION_TIMEOUT, DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);

private boolean shouldBeHeld = false;

Expand All @@ -76,14 +76,12 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold
* Constructor of Connection Proxy for XA mode.
*
* @param originalConnection Normal Connection from the original DataSource.
* @param xaConnection XA Connection based on physical connection of the
* normal Connection above.
* @param resource The corresponding Resource(DataSource proxy) from
* which the connections was created.
* @param xid Seata global transaction xid.
* @param xaConnection XA Connection based on physical connection of the normal Connection above.
* @param resource The corresponding Resource(DataSource proxy) from which the connections was created.
* @param xid Seata global transaction xid.
*/
public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource,
String xid) {
String xid) {
super(originalConnection, xaConnection, resource, xid);
this.shouldBeHeld = resource.isShouldBeHeld();
}
Expand Down Expand Up @@ -120,18 +118,15 @@ private void releaseIfNecessary() {

private void xaEnd(XAXid xaXid, int flags) throws XAException {
if (!xaEnded) {
xaEnded = true;
}
if (xaActive) {
xaResource.end(xaXid, flags);
xaEnded = true;
}
}

/**
* XA commit
*
* @param xid global transaction xid
* @param branchId transaction branch id
* @param xid global transaction xid
* @param branchId transaction branch id
* @param applicationData application data
* @throws SQLException SQLException
*/
Expand All @@ -145,9 +140,8 @@ public void xaCommit(String xid, long branchId, String applicationData) throws X

/**
* XA rollback
*
* @param xid global transaction xid
* @param branchId transaction branch id
* @param xid global transaction xid
* @param branchId transaction branch id
* @param applicationData application data
*/
public void xaRollback(String xid, long branchId, String applicationData) throws XAException {
Expand All @@ -163,7 +157,6 @@ public void xaRollback(String xid, long branchId, String applicationData) throws

/**
* XA rollback
*
* @param xaXid xaXid
* @throws XAException XAException
*/
Expand All @@ -178,51 +171,41 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
return;
}
if (isReadOnly()) {
// If it is a read-only transaction, do nothing
//If it is a read-only transaction, do nothing
currentAutoCommitStatus = autoCommit;
return;
}
if (autoCommit) {
// According to JDBC spec:
// If this method is called during a transaction and the
// auto-commit mode is changed, the transaction is committed.
if (xaActive && !xaEnded) {
if (xaActive) {
commit();
}
} else {
int flags;
if (this.xaBranchXid != null && currentAutoCommitStatus) {
flags = XAResource.TMJOIN;
} else {
if (xaActive) {
throw new SQLException(
"should NEVER happen: setAutoCommit from true to false while xa branch is active");
}
if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
flags = SeataXAResource.ORATRANSLOOSE;
} else {
flags = XAResource.TMNOFLAGS;
}
// Start a XA branch
long branchId;
try {
// 1. register branch to TC then get the branch message
branchRegisterTime = System.currentTimeMillis();
branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(),
null, xid, null, null);
} catch (TransactionException te) {
cleanXABranchContext();
throw new SQLException(
"failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(),
te);
}
// 2. build XA-Xid with xid and branchId
this.xaBranchXid = XAXidBuilder.build(xid, branchId);
return;
}
if (xaActive) {
throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
}
// Start a XA branch
long branchId;
try {
// 1. register branch to TC then get the branch message
branchRegisterTime = System.currentTimeMillis();
branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
null);
} catch (TransactionException te) {
cleanXABranchContext();
throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
}
// 2. build XA-Xid with xid and branchId
this.xaBranchXid = XAXidBuilder.build(xid, branchId);
// Keep the Connection if necessary
keepIfNecessary();
try {
start(flags);
start();
} catch (XAException e) {
cleanXABranchContext();
throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
Expand Down Expand Up @@ -250,12 +233,6 @@ public void commit() throws SQLException {
if (!xaActive || this.xaBranchXid == null) {
throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
}
try {
end(XAResource.TMSUCCESS);
} catch (XAException e) {
throw new SQLException("Failed to end(TMSUCCESS) xa branch on " + xid + "-" + xaBranchXid.getBranchId()
+ " since " + e.getMessage(), e);
}
}
}

Expand All @@ -279,22 +256,25 @@ public void rollback() throws SQLException {
LOGGER.info("{} was rollbacked", xaBranchXid);
} catch (XAException xe) {
throw new SQLException("Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId()
+ " since " + xe.getMessage(), xe);
+ " since " + xe.getMessage(), xe);
} finally {
cleanXABranchContext();
}
}

private void start(int flags) throws XAException, SQLException {
private void start() throws XAException, SQLException {
try (ResourceLock ignored = resourceLock.obtain()) {
// 3. XA Start
xaResource.start(this.xaBranchXid, flags);
if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE);
} else {
xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
}

try {
termination();
} catch (SQLException e) {
// the framework layer does not actively call ROLLBACK when setAutoCommit throws
// an SQL exception
// the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception
xaResource.end(this.xaBranchXid, XAResource.TMFAIL);
xaRollback(xaBranchXid);
// Branch Report to TC: Failed
Expand Down Expand Up @@ -330,33 +310,40 @@ private void checkTimeout(Long now) throws XAException {
public void close() throws SQLException {
try (ResourceLock ignored = resourceLock.obtain()) {
try {
if (xaEnded) {
if (xaActive && this.xaBranchXid != null) {
// XA End: Success
try {
end(XAResource.TMSUCCESS);
} catch (SQLException sqle) {
// Rollback immediately before the XA Branch Context is deleted.
String xaBranchXid = this.xaBranchXid.toString();
rollback();
throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle);
}
long now = System.currentTimeMillis();
checkTimeout(now);
setPrepareTime(now);
int prepare = xaResource.prepare(xaBranchXid);
// Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and
// MSSQL Server (2022),
// only Oracle has read-only optimization; the others do not provide read-only
// feedback.
// Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022),
// only Oracle has read-only optimization; the others do not provide read-only feedback.
// Therefore, the database type check can be eliminated here.
if (prepare == XAResource.XA_RDONLY) {
// Branch Report to TC: RDONLY
reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
}
xaEnded = false;
}
} catch (XAException xe) {
// Branch Report to TC: Failed
reportStatusToTC(BranchStatus.PhaseOne_Failed);
throw new SQLException("Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-"
+ xaBranchXid.getBranchId() + " since " + xe.getMessage(), xe);
throw new SQLException(
"Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
.getMessage(), xe);
} finally {
cleanXABranchContext();
rollBacked = false;
if (isHeld() && shouldBeHeld()) {
// if kept by a keeper, just hold the connection.
} else {
cleanXABranchContext();
originalConnection.close();
}
}
Expand Down Expand Up @@ -410,8 +397,8 @@ private void termination(String xaBranchXid) throws SQLException {
BranchStatus branchStatus = BaseDataSourceResource.getBranchStatus(xaBranchXid);
if (branchStatus != null) {
releaseIfNecessary();
throw new SQLException("failed xa branch " + xid + " the global transaction has finish, branch status: "
+ branchStatus.getCode());
throw new SQLException("failed xa branch " + xid
+ " the global transaction has finish, branch status: " + branchStatus.getCode());
}
}

Expand All @@ -422,16 +409,16 @@ private void termination(String xaBranchXid) throws SQLException {
*/
private void reportStatusToTC(BranchStatus status) {
try {
DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), status, null);
DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
status, null);
} catch (TransactionException te) {
LOGGER.warn("Failed to report XA branch {} on {}-{} since {}:{}", status, xid, xaBranchXid.getBranchId(),
te.getCode(), te.getMessage());
LOGGER.warn("Failed to report XA branch {} on {}-{} since {}:{}",
status, xid, xaBranchXid.getBranchId(), te.getCode(), te.getMessage());
}
}

/**
* Get the lock of the current connection
*
* @return the RESOURCE_LOCK
*/
public ResourceLock getResourceLock() {
Expand Down

0 comments on commit 976049b

Please sign in to comment.