舒友 發表於 2026-3-28 09:18:00

1、SEATA分布式事务——XA模式

<h4 id="一传统分布式xa事务的2pc">一、传统分布式XA事务的2PC</h4>
<p>  2PC 即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2 是指两个阶段,P 是指准备阶段,C 是指提交阶段。常见的关系型数据库如 Oracle、MySQL 都支持两阶段提交协议,如下图:</p>
<ul>
<li>
<p>成功情况<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327225650638-643805447.png"></p>
</li>
<li>
<p>失败情况<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327225705069-1890165567.png"></p>
</li>
</ul>
<p>①、准备阶段(Prepare phase):TM(事务管理器)给每个RM(资源管理器,也就是数据库)发送 Prepare 消息,每个RM在本地执行事务,并写本地的 Undo/Redo (Undo 日志是记录修改前的数据,用于数据库回滚,Redo 日志是记录修改后的数据,用于提交事务后写入数据文件)日志,此时事务没有提交。<br>
②、提交阶段(commit phase):如果TM(事务管理器)收到了RM(资源管理器,也就是数据库)的执行失败或者超时消息时,直接给每个RM发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交(Commit)或者回滚(Rollback)操作,并释放事务处理过程中使用的锁资源。注意:必须在最后阶段释放锁资源。<br>
  传统XA事务的详细细节和实现方式,请查看我的另一篇博客:mysql数据库事务的实现和XA事务</p>
<h4 id="二seata的xa模式">二、Seata的XA模式</h4>
<p>  seata实现分布式事务的样例程序<br>
  在 Seata 定义的分布式事务框架内,XA模式是利用RM(资源管理器,也就是数据库)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种事务模式,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327225833830-1939159956.png"><br>
从上面的图可以看到,seata XA 模式分为以下5个步骤:<br>
①、TM(事务管理器) 开启全局事务;<br>
②、RM 向 TC(事务协调者) 注册分支事务;<br>
③、RM 向 TC 报告分支事务状态;<br>
④、TC 向 RM (资源管理器,也就是数据库)发送commit/rollback 请求;<br>
⑤、TM 结束全局事务Global Commit/Rollback<br>
  负责RM 客户端的类是RmNettyRemotingClient.class,这个类的UML图如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327225910542-280558873.png"></p>
<p>RmNettyRemotingClient.class的父类中的内部类AbstractNettyRemotingClient.class::ClientHandler.class来处理 TC 发来的请求并再次委托给父类AbstractNettyRemoting.class::processMessage()函数来处理TC发来的请求</p>
<ul>
<li>AbstractNettyRemotingClient.class::ClientHandler.class的源码如下:</li>
</ul>
<pre><code>...省略部分导包代码...
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
...省略部分导包代码...
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...省略部分代码...
    @Sharable
    class ClientHandler extends ChannelDuplexHandler {
      ClientHandler() {
      }
   
      public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof RpcMessage) {
                AbstractNettyRemotingClient.this.processMessage(ctx, (RpcMessage)msg);
            } else {
                AbstractNettyRemotingClient.LOGGER.error("rpcMessage type error");
            }
   
      }
    }
    ...省略部分代码...
}
</code></pre>
<ul>
<li>AbstractNettyRemoting.class::processMessage()的源码如下:</li>
</ul>
<pre><code>public abstract class AbstractNettyRemoting implements Disposable {
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
      if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
      }
      Object body = rpcMessage.getBody();
      if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            final Pair&lt;RemotingProcessor, ExecutorService&gt; pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                  try {
                        pair.getSecond().execute(() -&gt; {
                            try {
                              //最终调用的是RmBranchCommitProcessor.class的process()函数
                              pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                              LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                              MDC.clear();
                            }
                        });
                  } catch (RejectedExecutionException e) {
                        LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                            "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                        if (allowDumpStack) {
                            String name = ManagementFactory.getRuntimeMXBean().getName();
                            String pid = name.split("@");
                            long idx = System.currentTimeMillis();
                            try {
                              String jstackFile = idx + ".log";
                              LOGGER.info("jstack command will dump to " + jstackFile);
                              Runtime.getRuntime().exec(String.format("jstack %s &gt; %s", pid, jstackFile));
                            } catch (IOException exx) {
                              LOGGER.error(exx.getMessage());
                            }
                            allowDumpStack = false;
                        }
                  }
                } else {
                  try {
                        pair.getFirst().process(ctx, rpcMessage);
                  } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                  }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
      } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
      }
    }
}
</code></pre>
<ul>
<li>RmBranchCommitProcessor.class的源码如下:</li>
</ul>
<pre><code>package org.apache.seata.core.rpc.processor.client;

import io.netty.channel.ChannelHandlerContext;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
import org.apache.seata.core.protocol.transaction.BranchCommitResponse;
import org.apache.seata.core.rpc.RemotingClient;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.TransactionMessageHandler;
import org.apache.seata.core.rpc.processor.RemotingProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RmBranchCommitProcessor implements RemotingProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchCommitProcessor.class);
    private TransactionMessageHandler handler;
    private RemotingClient remotingClient;

    public RmBranchCommitProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
      this.handler = handler;
      this.remotingClient = remotingClient;
    }

    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
      String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
      Object msg = rpcMessage.getBody();
      if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
      }

      this.handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest)msg);
    }

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
      BranchCommitResponse resultMessage = (BranchCommitResponse)this.handler.onRequest(branchCommitRequest, (RpcContext)null);
      if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
      }

      try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
      } catch (Throwable var6) {
            LOGGER.error("branch commit error: {}", var6.getMessage(), var6);
      }

    }
}
</code></pre>
<p>seata 的 xa 模式是两阶段提交:<br>
①、第一阶段先执行 XA Start、执行SQL、XA End三个步骤,之后直接执行XA Prepare。<br>
②、第二阶段执行 XA commit/rollback。<br>
但是oracle数据库不支持,因为 oracle 实现的是标准的 xa 协议,即 xa end 后,TC(事务协调者)向RM (资源管理器,也就是数据库)统一发送 prepare,最后再发送 commit/rollback。这也导致了 seata 的 xa 模式对 oracle 数据库的支持不太好。</p>
<h5 id="21xa模式数据源代理与at模式数据源代理的区别">2.1、xa模式数据源代理与at模式数据源代理的区别</h5>
<p>  seata 中的 XA 模式是使用数据源代理来实现的,需要手动配置数据源代理,代码如下:</p>
<pre><code>import org.apache.seata.rm.datasource.xa.DataSourceProxyXA;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import javax.sql.DataSource;

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
    //也可以根据普通 DataSource 来创建 XAConnection,但是这种方式有兼容性问题(比如 oracle数据库不支持)
    return new DruidDataSource();
}

@Bean("dataSourceProxy")
public DataSource dataSource(DruidDataSource druidDataSource) {
    //所以 seata 使用了开发者自己配置 XADataSource
    //seata 提供的 XA 数据源代理,要求代码框架中必须使用 druid 连接池
    return new DataSourceProxyXA(druidDataSource);
}
</code></pre>
<p>①、在数据源代理根据普通数据源(DataSource)中获取的普通 JDBC 连接创建出相应的 Connection的数据源代理方式中,XA模式与 AT 模式的数据源代理机制的区别,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327230147490-1576941064.png"></p>
<p>DataSourceProxyXA、ConnectionProxyXA、StatementProxyXA的UML关系图,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327230205854-578171871.png"></p>
<ul>
<li>DataSourceProxyXA.class的部分源码</li>
</ul>
<pre><code>...省略部分导包代码...
import org.apache.seata.rm.datasource.util.XAUtils;
import javax.sql.DataSource;
import javax.sql.XAConnection;
...省略部分导包代码...
public class DataSourceProxyXA extends AbstractDataSourceProxyXA {
    ...省略部分代码...
    protected Connection getConnectionProxy(Connection connection) throws SQLException {
      return !RootContext.inGlobalTransaction() ? connection : this.getConnectionProxyXA(connection);
    }
   
    protected Connection getConnectionProxyXA() throws SQLException {
      Connection connection = this.dataSource.getConnection();
      return this.getConnectionProxyXA(connection);
    }
    //创建ConnectionProxyXA和XAConnection
    private Connection getConnectionProxyXA(Connection connection) throws SQLException {
      Connection physicalConn = (Connection)connection.unwrap(Connection.class);
      XAConnection xaConnection = XAUtils.createXAConnection(physicalConn, this);
      ConnectionProxyXA connectionProxyXA = new ConnectionProxyXA(connection, xaConnection, this, RootContext.getXID());
      connectionProxyXA.init();
      return connectionProxyXA;
    }
    ...省略部分代码...
}
</code></pre>
<ul>
<li>ConnectionProxyXA .class的部分源码和AbstractConnectionProxyXA.class的部分源码</li>
</ul>
<pre><code>...省略部分导包代码...
public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class);
    private static final int BRANCH_EXECUTION_TIMEOUT = ConfigurationFactory.getInstance().getInt("client.rm.branchExecutionTimeoutXA", 60000);
    private volatile boolean currentAutoCommitStatus = true;
    private volatile XAXid xaBranchXid;
    private volatile boolean xaActive = false;
    private volatile boolean xaEnded = false;
    private volatile boolean kept = false;
    private volatile boolean rollBacked = false;
    private volatile Long branchRegisterTime = null;
    private volatile Long prepareTime = null;
    private static final Integer TIMEOUT;
    private boolean shouldBeHeld = false;

    public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
      super(originalConnection, xaConnection, resource, xid);
      this.shouldBeHeld = resource.isShouldBeHeld();
    }
   
    ...省略部分代码...
}
</code></pre>
<pre><code>package org.apache.seata.rm.datasource.xa;
...省略部分导包代码...
import javax.sql.XAConnection;
import java.sql.Connection;
...省略部分导包代码...
public abstract class AbstractConnectionProxyXA implements Connection {
   ...省略部分代码...
    protected Connection originalConnection;
    public AbstractConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
      this.originalConnection = originalConnection;
      this.xaConnection = xaConnection;
      this.resource = resource;
      this.xid = xid;
    }
    //用重载的方式创建不同的StatementProxyXA
    @Override
    public Statement createStatement() throws SQLException {
      Statement targetStatement = originalConnection.createStatement();
      return new StatementProxyXA(this, targetStatement);
    }
   
    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
      Statement statement = originalConnection.createStatement(resultSetType, resultSetConcurrency);
      return new StatementProxyXA(this, statement);
    }
   
    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
            throws SQLException {
      Statement statement = originalConnection.createStatement(resultSetType, resultSetConcurrency,
                resultSetHoldability);
      return new StatementProxyXA(this, statement);
    }
   ...省略部分代码...
}
</code></pre>
<ul>
<li>StatementProxyXA.class的部分源码</li>
</ul>
<pre><code>import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
/**
* Statement proxy for XA mode.
*
*/
public class StatementProxyXA implements Statement {

    protected AbstractConnectionProxyXA connectionProxyXA;

    protected Statement targetStatement;

    public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
      this.connectionProxyXA = connectionProxyXA;
      this.targetStatement = targetStatement;
    }
    ...省略部分代码...
}
</code></pre>
<p>②、在数据源代理是指定的XA 数据源(XADataSource)进行代理方式中获取的普通 JDBC 连接创建出相应的 Connection的数据源代理方式中,XA模式与 AT模式的数据源代理机制的区别,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327230322002-1316496604.png"></p>
<hr>
<blockquote>
<p><em>作者在SEATA的2.3.0版本中没有找到XADataSourceProxy.class、XAConnectionProxy.class、StatementProxyXA.class</em></p>
</blockquote>
<h5 id="22xa-第一阶段的部分源码">2.2、XA 第一阶段的部分源码</h5>
<p>  XA第一阶段是指XA Start、执行SQL、XA End、XA Prepare,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327230718330-246759307.png"></p>
<p>当 RM 收到 DML 请求后,seata 会使用 ExecuteTemplateXA.class中的静态函数execute()来执行,execute()中有一个地方很关键,就是把 autocommit 属性改为了 false,而 mysql 默认 autocommit 是 true。事务提交之后,还要把 autocommit 改回默认。如下所示:</p>
<pre><code>import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
/**
* Statement proxy for XA mode.
*
*/
public class StatementProxyXA implements Statement {
    protected AbstractConnectionProxyXA connectionProxyXA;
   
    protected Statement targetStatement;
   
    public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
      this.connectionProxyXA = connectionProxyXA;
      this.targetStatement = targetStatement;
    }
   
    @Override
    public int executeUpdate(String sql) throws SQLException {
      return ExecuteTemplateXA.execute(connectionProxyXA, (statement, args) -&gt; statement.executeUpdate(
            (String)args), targetStatement, sql);
    }
    ...省略部分代码...
}
</code></pre>
<h6 id="221xa-start环节">2.2.1、XA Start环节</h6>
<ul>
<li>ExecuteTemplateXA .class的的部分源码</li>
</ul>
<pre><code>package org.apache.seata.rm.datasource.xa;
import org.apache.seata.rm.datasource.exec.StatementCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Statement;
public class ExecuteTemplateXA {
   
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);

    public static &lt;T, S extends Statement&gt; T execute(AbstractConnectionProxyXA connectionProxyXA,
                                                   StatementCallback&lt;T, S&gt; statementCallback,
                                                   S targetStatement,
                                                   Object... args) throws SQLException {
      boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
      if (autoCommitStatus) {
            // XA Start环节
            connectionProxyXA.setAutoCommit(false);
      }
      ...省略部分代码...
    }
   ...省略部分代码...
}
</code></pre>
<ul>
<li>ConnectionProxyXA .class的部分源码——真正开启XA Start环节</li>
</ul>
<pre><code>package org.apache.seata.rm.datasource.xa;

import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.PooledConnection;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import org.apache.seata.common.DefaultValues;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.rm.BaseDataSourceResource;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.datasource.util.SeataXAResource;
import org.apache.seata.sqlparser.util.JdbcConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
   ...省略部分代码...
    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
      if (currentAutoCommitStatus == autoCommit) {
            return;
      }
      if (isReadOnly()) {
            //If it is a read-only transaction, do nothing
            currentAutoCommitStatus = autoCommit;
            return;
      }
      if (autoCommit) {
            // According to JDBC spec:
            // If this method is called during a transaction and the
            // auto-commit mode is changed, the transaction is committed.
            if (xaActive) {
                commit();
            }
      } else {
            if (xaActive) {
                throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
            }
            // Start a XA branch
            long branchId;
            try {
                // 1. register branch to TC then get the branch message
                branchRegisterTime = System.currentTimeMillis();
                branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
                        null);
            } catch (TransactionException te) {
                cleanXABranchContext();
                throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
            }
            // 2. build XA-Xid with xid and branchId
            this.xaBranchXid = XAXidBuilder.build(xid, branchId);
            // Keep the Connection if necessary
            keepIfNecessary();
            try {
                start();//开启XA事务
            } catch (XAException e) {
                cleanXABranchContext();
                throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
            }
            // 4. XA is active
            this.xaActive = true;
   
      }
   
      currentAutoCommitStatus = autoCommit;
    }
    //最终调用了start()函数才真正开启了XA Start环节
    private synchronized void start() throws XAException, SQLException {
      // 3. XA Start
      if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
            xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE);
      } else {
            xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
      }
   
      try {
            termination();
      } catch (SQLException e) {
            // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception
            xaResource.end(this.xaBranchXid, XAResource.TMFAIL);
            xaRollback(xaBranchXid);
            // Branch Report to TC: Failed
            reportStatusToTC(BranchStatus.PhaseOne_Failed);
            throwe;
      }
    }
   ...省略部分代码...
}
</code></pre>
<h6 id="222执行sql环节">2.2.2、执行SQL环节</h6>
<ul>
<li>ExecuteTemplateXA .class的部分源码</li>
</ul>
<pre><code>package org.apache.seata.rm.datasource.xa;
import org.apache.seata.rm.datasource.exec.StatementCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Statement;
/**
* The type Execute template.
*
*/
public class ExecuteTemplateXA {

    private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);

    public static &lt;T, S extends Statement&gt; T execute(AbstractConnectionProxyXA connectionProxyXA,
                                                   StatementCallback&lt;T, S&gt; statementCallback,
                                                   S targetStatement,
                                                   Object... args) throws SQLException {
      boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
      if (autoCommitStatus) {
            // XA Start
            connectionProxyXA.setAutoCommit(false);
      }
      try {
            T res = null;
            try {
                //执行SQL环节,最终调用的是StatementProxyXA.class中的匿名内部类执行了SQL
                // execute SQL
                res = statementCallback.execute(targetStatement, args);
                        ...省略部分代码...
    }
    ...省略部分代码...
}
</code></pre>
<ul>
<li>StatementProxyXA .class的部分源代码,最终调用的是StatementProxyXA.class中的匿名内部类执行了SQL</li>
</ul>
<pre><code>public class StatementProxyXA implements Statement {
    protected AbstractConnectionProxyXA connectionProxyXA;
   
    protected Statement targetStatement;
   
    public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
      this.connectionProxyXA = connectionProxyXA;
      this.targetStatement = targetStatement;
    }
   
    @Override
    public int executeUpdate(String sql) throws SQLException {
      //最终调用了StatementProxyXA.class中的匿名内部类执行了SQL
      return ExecuteTemplateXA.execute(connectionProxyXA, (statement, args) -&gt; statement.executeUpdate(
            (String)args), targetStatement, sql);
    }
    ...省略部分代码...
}
</code></pre>
<h6 id="223xa-end环节和xa-prepare环节">2.2.3、XA End环节和XA Prepare环节</h6>
<ul>
<li>ExecuteTemplateXA .class的的部分源码</li>
</ul>
<pre><code>package org.apache.seata.rm.datasource.xa;
import org.apache.seata.rm.datasource.exec.StatementCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Statement;
/**
* The type Execute template.
*
*/
public class ExecuteTemplateXA {

    private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);

    public static &lt;T, S extends Statement&gt; T execute(AbstractConnectionProxyXA connectionProxyXA,
                                                   StatementCallback&lt;T, S&gt; statementCallback,
                                                   S targetStatement,
                                                   Object... args) throws SQLException {
      boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
      if (autoCommitStatus) {
            // XA Start
            connectionProxyXA.setAutoCommit(false);
      }
      try {
            T res = null;
            try {
                // execute SQL
                res = statementCallback.execute(targetStatement, args);

            } catch (Throwable ex) {
                if (autoCommitStatus) {
                  // XA End &amp; Rollback
                  try {
                        connectionProxyXA.rollback();
                  } catch (SQLException sqle) {
                        // log and ignore the rollback failure.
                        LOGGER.warn(
                            "Failed to rollback xa branch of " + connectionProxyXA.xid +
                              "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
                            sqle);
                  }
                }

                if (ex instanceof SQLException) {
                  throw ex;
                } else {
                  throw new SQLException(ex);
                }

            }
            if (autoCommitStatus) {
                try {
                  // XA End &amp; Prepare
                  //XA End环节和XA Prepare环节
                  connectionProxyXA.commit();
                  ...省略部分代码...
    }
    ...省略部分代码...
}
</code></pre>
<ul>
<li>ConnectionProxyXA .class的部分源码——真正执行XA End环节和XA Prepare环节</li>
</ul>
<pre><code>package org.apache.seata.rm.datasource.xa;

import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.PooledConnection;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import org.apache.seata.common.DefaultValues;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.rm.BaseDataSourceResource;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.datasource.util.SeataXAResource;
import org.apache.seata.sqlparser.util.JdbcConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
   ...省略部分代码...
    @Override
    public synchronized void commit() throws SQLException {
      if (currentAutoCommitStatus || isReadOnly()) {
            // Ignore the committing on an autocommit session and read-only transaction.
            return;
      }
      if (!xaActive || this.xaBranchXid == null) {
            throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
      }
      try {
            // XA End: Success
            try {
                end(XAResource.TMSUCCESS);
            } catch (SQLException sqle) {
                // Rollback immediately before the XA Branch Context is deleted.
                String xaBranchXid = this.xaBranchXid.toString();
                rollback();
                throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle);
            }
            long now = System.currentTimeMillis();
            checkTimeout(now);
            setPrepareTime(now);
            int prepare = xaResource.prepare(xaBranchXid);
            // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022),
            // only Oracle has read-only optimization; the others do not provide read-only feedback.
            // Therefore, the database type check can be eliminated here.
            if (prepare == XAResource.XA_RDONLY) {
                // Branch Report to TC: RDONLY
                reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
            }
      } catch (XAException xe) {
            // Branch Report to TC: Failed
            reportStatusToTC(BranchStatus.PhaseOne_Failed);
            throw new SQLException(
                "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                  .getMessage(), xe);
      } finally {
            cleanXABranchContext();
      }
    }
    private void xaEnd(XAXid xaXid, int flags) throws XAException {
      if (!xaEnded) {
            xaResource.end(xaXid, flags);
            xaEnded = true;
      }
    }
   
    private synchronized void end(int flags) throws XAException, SQLException {
      xaEnd(xaBranchXid, flags);
      termination();
    }
   
    private void termination() throws SQLException {
      termination(this.xaBranchXid.toString());
    }
   
    private void termination(String xaBranchXid) throws SQLException {
      // if it is not empty, the resource will hang and need to be terminated early
      BranchStatus branchStatus = BaseDataSourceResource.getBranchStatus(xaBranchXid);
      if (branchStatus != null) {
            releaseIfNecessary();
            throw new SQLException("failed xa branch " + xid
                  + " the global transaction has finish, branch status: " + branchStatus.getCode());
      }
    }
    //给TC汇报XA Commit状态和XA Rollback状态
    private void reportStatusToTC(BranchStatus status) {
      try {
            DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
                  status, null);
      } catch (TransactionException te) {
            LOGGER.warn("Failed to report XA branch {} on {}-{} since {}:{}",
                  status, xid, xaBranchXid.getBranchId(), te.getCode(), te.getMessage());
      }
    }
   ...省略部分代码...
}
</code></pre>
<h5 id="23xa-第二阶段的部分源码">2.3、XA 第二阶段的部分源码</h5>
<p>  XA第二阶段是指XA Commit或者XA Rollback,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327231023094-1268330348.png"></p>
<p>当XA模式的第一阶段执行完成后,便会根据第一阶段的执行结果来执行XA模式的第二阶段,如下图所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327231039251-1503310811.png"></p>
<ul>
<li>RmBranchCommitProcessor.class的部分源码</li>
</ul>
<pre><code>...省略导包代码...
public class RmBranchCommitProcessor implements RemotingProcessor {
   ...省略部分代码...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
      String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
      Object msg = rpcMessage.getBody();
      if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
      }
      handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }
   
    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
      BranchCommitResponse resultMessage;
      resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
      if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
      }
      try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
      } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
      }
    }
   ...省略部分代码...
}
</code></pre>
<ul>
<li>AbstractRMHandler.class的部分源码</li>
</ul>
<pre><code>...省略导包代码...
public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {
   ...省略部分代码...
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
      BranchCommitResponse response = new BranchCommitResponse();
      exceptionHandleTemplate(new AbstractCallback&lt;BranchCommitRequest, BranchCommitResponse&gt;() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
      }, request, response);
      return response;
    }
   
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
      BranchRollbackResponse response = new BranchRollbackResponse();
      exceptionHandleTemplate(new AbstractCallback&lt;BranchRollbackRequest, BranchRollbackResponse&gt;() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
      }, request, response);
      return response;
    }
   
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
      throws TransactionException {
      String xid = request.getXid();
      long branchId = request.getBranchId();
      String resourceId = request.getResourceId();
      String applicationData = request.getApplicationData();
      if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
      }
      BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
      response.setXid(xid);
      response.setBranchId(branchId);
      response.setBranchStatus(status);
      if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
      }
   
    }
   
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
      throws TransactionException {
      String xid = request.getXid();
      long branchId = request.getBranchId();
      String resourceId = request.getResourceId();
      String applicationData = request.getApplicationData();
      if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
      }
      BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
      response.setXid(xid);
      response.setBranchId(branchId);
      response.setBranchStatus(status);
      if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
      }
    }
   
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
      if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
      }
      AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
      transactionRequest.setRMInboundMessageHandler(this);
   
      return transactionRequest.handle(context);
    }
   ...省略部分代码...
}
</code></pre>
<ul>
<li>BranchCommitRequest.class的部分源码</li>
</ul>
<pre><code>...省略导包代码...
public class BranchCommitRequest extends AbstractBranchEndRequest {

    @Override
    public short getTypeCode() {
      return MessageType.TYPE_BRANCH_COMMIT;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
      return handler.handle(this);
    }


}
</code></pre>
<ul>
<li>ResourceManagerXA.class的部分源码</li>
</ul>
<pre><code>...省略导包代码...
public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {
   ...省略部分代码...
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
      return finishBranch(true, branchType, xid, branchId, resourceId, applicationData);
    }
   
    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
      return finishBranch(false, branchType, xid, branchId, resourceId, applicationData);
    }
   ...省略部分代码...
}
</code></pre>
<p>ResourceManagerXA.class的UML关系图,如下所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327231359785-2018562747.png"></p>
<p>上述类的调用关系,如下时序图所示:<br>
<img src="https://img2024.cnblogs.com/blog/2485827/202603/2485827-20260327231402019-659605308.png"></p>
<p>ResourceManagerXA.class::finishBranch()函数最终调用了ConnectionProxyXA.class::xaCommit()函数或者ConnectionProxyXA.class::xaRollback()函数,如下所示:</p>
<pre><code>...省略导包代码...
public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
   ...省略部分代码...
    public synchronized void xaCommit(String xid, long branchId, String applicationData) throws XAException {
      XAXid xaXid = XAXidBuilder.build(xid, branchId);
      xaResource.commit(xaXid, false);
      releaseIfNecessary();
    }
   
   
    public synchronized void xaRollback(String xid, long branchId, String applicationData) throws XAException {
      if (this.xaBranchXid != null) {
            xaRollback(xaBranchXid);
      } else {
            XAXid xaXid = XAXidBuilder.build(xid, branchId);
            xaRollback(xaXid);
      }
    }
   ...省略部分代码...
}
</code></pre><br><br>
来源:https://www.cnblogs.com/Carey-ccl/p/19784564
頁: [1]
查看完整版本: 1、SEATA分布式事务——XA模式