From 976049bd37d581887d16f3f1ea3a1df3ce4d6f75 Mon Sep 17 00:00:00 2001 From: shukai Date: Tue, 21 Jan 2025 18:46:20 +0800 Subject: [PATCH] change close end and prepare --- .../rm/datasource/xa/ConnectionProxyXA.java | 139 ++++++++---------- 1 file changed, 63 insertions(+), 76 deletions(-) diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java index 1cc482f6b38..8ceaa80696d 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java @@ -41,13 +41,14 @@ /** * Connection proxy for XA mode. + * */ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable { private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class); - private static final int BRANCH_EXECUTION_TIMEOUT = ConfigurationFactory.getInstance() - .getInt(XA_BRANCH_EXECUTION_TIMEOUT, DefaultValues.DEFAULT_XA_BRANCH_EXECUTION_TIMEOUT); + private static final int BRANCH_EXECUTION_TIMEOUT = ConfigurationFactory.getInstance().getInt(XA_BRANCH_EXECUTION_TIMEOUT, + DefaultValues.DEFAULT_XA_BRANCH_EXECUTION_TIMEOUT); private volatile boolean currentAutoCommitStatus = true; @@ -65,8 +66,7 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold private volatile Long prepareTime = null; - private static final Integer TIMEOUT = Math.max(BRANCH_EXECUTION_TIMEOUT, - DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT); + private static final Integer TIMEOUT = Math.max(BRANCH_EXECUTION_TIMEOUT, DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT); private boolean shouldBeHeld = false; @@ -76,14 +76,12 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold * Constructor of Connection Proxy for XA mode. * * @param originalConnection Normal Connection from the original DataSource. - * @param xaConnection XA Connection based on physical connection of the - * normal Connection above. - * @param resource The corresponding Resource(DataSource proxy) from - * which the connections was created. - * @param xid Seata global transaction xid. + * @param xaConnection XA Connection based on physical connection of the normal Connection above. + * @param resource The corresponding Resource(DataSource proxy) from which the connections was created. + * @param xid Seata global transaction xid. */ public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, - String xid) { + String xid) { super(originalConnection, xaConnection, resource, xid); this.shouldBeHeld = resource.isShouldBeHeld(); } @@ -120,18 +118,15 @@ private void releaseIfNecessary() { private void xaEnd(XAXid xaXid, int flags) throws XAException { if (!xaEnded) { - xaEnded = true; - } - if (xaActive) { xaResource.end(xaXid, flags); + xaEnded = true; } } /** * XA commit - * - * @param xid global transaction xid - * @param branchId transaction branch id + * @param xid global transaction xid + * @param branchId transaction branch id * @param applicationData application data * @throws SQLException SQLException */ @@ -145,9 +140,8 @@ public void xaCommit(String xid, long branchId, String applicationData) throws X /** * XA rollback - * - * @param xid global transaction xid - * @param branchId transaction branch id + * @param xid global transaction xid + * @param branchId transaction branch id * @param applicationData application data */ public void xaRollback(String xid, long branchId, String applicationData) throws XAException { @@ -163,7 +157,6 @@ public void xaRollback(String xid, long branchId, String applicationData) throws /** * XA rollback - * * @param xaXid xaXid * @throws XAException XAException */ @@ -178,7 +171,7 @@ public void setAutoCommit(boolean autoCommit) throws SQLException { return; } if (isReadOnly()) { - // If it is a read-only transaction, do nothing + //If it is a read-only transaction, do nothing currentAutoCommitStatus = autoCommit; return; } @@ -186,43 +179,33 @@ public void setAutoCommit(boolean autoCommit) throws SQLException { // According to JDBC spec: // If this method is called during a transaction and the // auto-commit mode is changed, the transaction is committed. - if (xaActive && !xaEnded) { + if (xaActive) { commit(); } } else { - int flags; if (this.xaBranchXid != null && currentAutoCommitStatus) { - flags = XAResource.TMJOIN; - } else { - if (xaActive) { - throw new SQLException( - "should NEVER happen: setAutoCommit from true to false while xa branch is active"); - } - if (JdbcConstants.ORACLE.equals(resource.getDbType())) { - flags = SeataXAResource.ORATRANSLOOSE; - } else { - flags = XAResource.TMNOFLAGS; - } - // Start a XA branch - long branchId; - try { - // 1. register branch to TC then get the branch message - branchRegisterTime = System.currentTimeMillis(); - branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), - null, xid, null, null); - } catch (TransactionException te) { - cleanXABranchContext(); - throw new SQLException( - "failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), - te); - } - // 2. build XA-Xid with xid and branchId - this.xaBranchXid = XAXidBuilder.build(xid, branchId); + return; } + if (xaActive) { + throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active"); + } + // Start a XA branch + long branchId; + try { + // 1. register branch to TC then get the branch message + branchRegisterTime = System.currentTimeMillis(); + branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, + null); + } catch (TransactionException te) { + cleanXABranchContext(); + throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te); + } + // 2. build XA-Xid with xid and branchId + this.xaBranchXid = XAXidBuilder.build(xid, branchId); // Keep the Connection if necessary keepIfNecessary(); try { - start(flags); + start(); } catch (XAException e) { cleanXABranchContext(); throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e); @@ -250,12 +233,6 @@ public void commit() throws SQLException { if (!xaActive || this.xaBranchXid == null) { throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); } - try { - end(XAResource.TMSUCCESS); - } catch (XAException e) { - throw new SQLException("Failed to end(TMSUCCESS) xa branch on " + xid + "-" + xaBranchXid.getBranchId() - + " since " + e.getMessage(), e); - } } } @@ -279,22 +256,25 @@ public void rollback() throws SQLException { LOGGER.info("{} was rollbacked", xaBranchXid); } catch (XAException xe) { throw new SQLException("Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() - + " since " + xe.getMessage(), xe); + + " since " + xe.getMessage(), xe); } finally { cleanXABranchContext(); } } - private void start(int flags) throws XAException, SQLException { + private void start() throws XAException, SQLException { try (ResourceLock ignored = resourceLock.obtain()) { // 3. XA Start - xaResource.start(this.xaBranchXid, flags); + if (JdbcConstants.ORACLE.equals(resource.getDbType())) { + xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE); + } else { + xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); + } try { termination(); } catch (SQLException e) { - // the framework layer does not actively call ROLLBACK when setAutoCommit throws - // an SQL exception + // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception xaResource.end(this.xaBranchXid, XAResource.TMFAIL); xaRollback(xaBranchXid); // Branch Report to TC: Failed @@ -330,33 +310,40 @@ private void checkTimeout(Long now) throws XAException { public void close() throws SQLException { try (ResourceLock ignored = resourceLock.obtain()) { try { - if (xaEnded) { + if (xaActive && this.xaBranchXid != null) { + // XA End: Success + try { + end(XAResource.TMSUCCESS); + } catch (SQLException sqle) { + // Rollback immediately before the XA Branch Context is deleted. + String xaBranchXid = this.xaBranchXid.toString(); + rollback(); + throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle); + } long now = System.currentTimeMillis(); checkTimeout(now); setPrepareTime(now); int prepare = xaResource.prepare(xaBranchXid); - // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and - // MSSQL Server (2022), - // only Oracle has read-only optimization; the others do not provide read-only - // feedback. + // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022), + // only Oracle has read-only optimization; the others do not provide read-only feedback. // Therefore, the database type check can be eliminated here. if (prepare == XAResource.XA_RDONLY) { // Branch Report to TC: RDONLY reportStatusToTC(BranchStatus.PhaseOne_RDONLY); } - xaEnded = false; } } catch (XAException xe) { // Branch Report to TC: Failed reportStatusToTC(BranchStatus.PhaseOne_Failed); - throw new SQLException("Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" - + xaBranchXid.getBranchId() + " since " + xe.getMessage(), xe); + throw new SQLException( + "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe + .getMessage(), xe); } finally { + cleanXABranchContext(); rollBacked = false; if (isHeld() && shouldBeHeld()) { // if kept by a keeper, just hold the connection. } else { - cleanXABranchContext(); originalConnection.close(); } } @@ -410,8 +397,8 @@ private void termination(String xaBranchXid) throws SQLException { BranchStatus branchStatus = BaseDataSourceResource.getBranchStatus(xaBranchXid); if (branchStatus != null) { releaseIfNecessary(); - throw new SQLException("failed xa branch " + xid + " the global transaction has finish, branch status: " - + branchStatus.getCode()); + throw new SQLException("failed xa branch " + xid + + " the global transaction has finish, branch status: " + branchStatus.getCode()); } } @@ -422,16 +409,16 @@ private void termination(String xaBranchXid) throws SQLException { */ private void reportStatusToTC(BranchStatus status) { try { - DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), status, null); + DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), + status, null); } catch (TransactionException te) { - LOGGER.warn("Failed to report XA branch {} on {}-{} since {}:{}", status, xid, xaBranchXid.getBranchId(), - te.getCode(), te.getMessage()); + LOGGER.warn("Failed to report XA branch {} on {}-{} since {}:{}", + status, xid, xaBranchXid.getBranchId(), te.getCode(), te.getMessage()); } } /** * Get the lock of the current connection - * * @return the RESOURCE_LOCK */ public ResourceLock getResourceLock() {