沈玉彪 發表於 2026-1-31 23:40:00

收入写RAFT算法(一)Leader选举

<h1 id="raft-leader-选举实现文档">Raft Leader 选举实现文档</h1>
<h2 id="目录">目录</h2>
<ul>
<li>1. 概述</li>
<li>2. 核心概念</li>
<li>3. 涉及的类及其职责</li>
<li>4. 实现细节
<ul>
<li>4.1 节点状态与转换</li>
<li>4.2 选举超时机制</li>
<li>4.3 投票请求处理</li>
<li>4.4 选举发起流程</li>
<li>4.5 投票响应处理</li>
<li>4.6 心跳机制</li>
<li>4.7 安全性保证</li>
</ul>
</li>
<li>5. 测试指南</li>
<li>6. 使用示例</li>
<li>7. 常见问题</li>
</ul>
<hr>
<h2 id="1-概述">1. 概述</h2>
<h3 id="11-目的">1.1 目的</h3>
<p>本文档详细说明了 LingRaft-Lite 模块中 Raft Leader 选举功能的实现,包括涉及的类、实现细节、测试方法等,便于开发者理解和复现。</p>
<h3 id="12-功能范围">1.2 功能范围</h3>
<ul>
<li>节点状态管理(Follower、Candidate、Leader)</li>
<li>选举超时检测</li>
<li>投票请求与响应处理</li>
<li>多数派选举机制</li>
<li>心跳维护 Leader 地位</li>
<li>网络分区处理</li>
</ul>
<h3 id="13-raft-算法参考">1.3 Raft 算法参考</h3>
<p>本实现基于 Raft 论文(Diego Ongaro 和 John Ousterhout, 2014)的 Leader 选举部分,具体参考:</p>
<ul>
<li>Section 5.1: Leader Election</li>
<li>Section 5.2: Leader Election - RequestVote RPC</li>
<li>Section 5.4.1: Election Safety Property</li>
</ul>
<hr>
<h2 id="2-核心概念">2. 核心概念</h2>
<h3 id="21-节点状态">2.1 节点状态</h3>
<p>Raft 节点有三种状态:</p>
<table>
<thead>
<tr>
<th>状态</th>
<th>说明</th>
<th>职责</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>FOLLOWER</strong></td>
<td>从节点</td>
<td>响应 Leader 的 RPC 请求(AppendEntries、RequestVote)</td>
</tr>
<tr>
<td><strong>CANDIDATE</strong></td>
<td>候选节点</td>
<td>发起选举,向其他节点请求投票</td>
</tr>
<tr>
<td><strong>LEADER</strong></td>
<td>主节点</td>
<td>处理客户端请求,向 Follower 复制日志,发送心跳</td>
</tr>
</tbody>
</table>
<h3 id="22-任期-term">2.2 任期 (Term)</h3>
<p><strong>定义</strong>:</p>
<ul>
<li>时间被分成多个任期,每个任期以选举开始</li>
<li>任期号是单调递增的整数</li>
<li>每次选举都进入新任期</li>
</ul>
<p><strong>用途</strong>:</p>
<ul>
<li>识别过时的信息(旧任期的投票、心跳等)</li>
<li>防止脑裂(分裂投票)</li>
</ul>
<p><strong>实现</strong>:</p>
<pre><code class="language-java">private volatile long currentTerm = 0;// 当前任期号
</code></pre>
<h3 id="23-选举超时-election-timeout">2.3 选举超时 (Election Timeout)</h3>
<p><strong>定义</strong>:</p>
<ul>
<li>Follower 在收到有效心跳或投票请求之前等待的时间</li>
<li>超时后转为 Candidate 并发起选举</li>
</ul>
<p><strong>随机化</strong>:</p>
<ul>
<li>为了避免多个节点同时超时导致平票选举,超时时间随机化</li>
<li>通常在 150ms ~ 300ms 之间</li>
</ul>
<p><strong>实现</strong>:</p>
<pre><code class="language-java">// 配置随机范围
config.setElectionTimeoutRandomRange(Range.of(150, 300));

// 计算随机超时时间
int randomTimeout = raftConfig.getElectionTimeoutMs();
</code></pre>
<h3 id="24-多数派-majority">2.4 多数派 (Majority)</h3>
<p><strong>定义</strong>:</p>
<ul>
<li>超过半数的节点数:<code>N/2 + 1</code></li>
<li>3 节点集群需要 2 票</li>
<li>5 节点集群需要 3 票</li>
</ul>
<p><strong>重要性</strong>:</p>
<ul>
<li>保证选举结果的唯一性</li>
<li>两个多数派必然有交集,确保只有一个 Leader</li>
</ul>
<p><strong>实现</strong>:</p>
<pre><code class="language-java">public VoteCounter(long term, int totalNodes) {
    this.majorityCount = totalNodes / 2 + 1;
}
</code></pre>
<h3 id="25-投票规则">2.5 投票规则</h3>
<p><strong>节点投票给候选人的条件</strong>:</p>
<ol>
<li>候选人的任期 &gt;= 当前任期</li>
<li>如果任期相同,candidate 的日志至少和当前节点一样新</li>
</ol>
<p><strong>日志比较规则</strong>:</p>
<ul>
<li>如果 <code>candidateLastLogTerm &gt; lastLogTerm</code>,投票</li>
<li>如果 <code>candidateLastLogTerm == lastLogTerm</code> 且 <code>candidateLastLogIndex &gt;= lastLogIndex</code>,投票</li>
<li>否则,拒绝投票</li>
</ul>
<hr>
<h2 id="3-涉及的类及其职责">3. 涉及的类及其职责</h2>
<h3 id="31-核心类">3.1 核心类</h3>
<table>
<thead>
<tr>
<th>类名</th>
<th>路径</th>
<th>职责</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>RaftNodeImpl</strong></td>
<td><code>com.ling.raft.core.RaftNodeImpl</code></td>
<td>节点状态管理、选举发起、投票处理、心跳发送</td>
</tr>
<tr>
<td><strong>ConsensusModuleImpl</strong></td>
<td><code>com.ling.raft.core.ConsensusModuleImpl</code></td>
<td>投票请求和响应的具体实现逻辑</td>
</tr>
<tr>
<td><strong>VoteCounter</strong></td>
<td><code>com.ling.raft.core.VoteCounter</code></td>
<td>投票计数器,统计和判断多数派</td>
</tr>
<tr>
<td><strong>ElectionTask</strong></td>
<td><code>com.ling.raft.core.task.ElectionTask</code></td>
<td>选举超时检测任务</td>
</tr>
<tr>
<td><strong>HeartbeatTask</strong></td>
<td><code>com.ling.raft.core.task.HeartbeatTask</code></td>
<td>Leader 心跳发送任务</td>
</tr>
<tr>
<td><strong>ServerStatusEnum</strong></td>
<td><code>com.ling.raft.enums.ServerStatusEnum</code></td>
<td>节点状态枚举</td>
</tr>
<tr>
<td><strong>VoteRequest</strong></td>
<td><code>com.ling.raft.model.dto.VoteRequest</code></td>
<td>投票请求 RPC</td>
</tr>
<tr>
<td><strong>VoteResponse</strong></td>
<td><code>com.ling.raft.model.dto.VoteResponse</code></td>
<td>投票响应 RPC</td>
</tr>
<tr>
<td><strong>ThreeNodeElectionTest</strong></td>
<td><code>com.ling.raft.example.leader.ThreeNodeElectionTest</code></td>
<td>完整测试程序</td>
</tr>
</tbody>
</table>
<h3 id="32-类关系图">3.2 类关系图</h3>
<pre><code>┌─────────────────────┐
│   RaftNodeImpl      │
│   (节点主类)         │
└──────────┬──────────┘
         │ 持有引用
         ├─────────────────┐
         ▼               ▼
┌─────────────────────┐┌─────────────────────┐
│ ConsensusModuleImpl ││   VoteCounter       │
│ (投票逻辑)          ││   (投票计数)         │
└─────────────────────┘└─────────────────────┘
         │                     │
         ├─────────────────────┤
         ▼                     ▼
┌─────────────────────┐┌─────────────────────┐
│ElectionTask       ││HeartbeatTask      │
│(选举超时检测)      ││(心跳任务)         │
└─────────────────────┘└─────────────────────┘
</code></pre>
<h3 id="33-关键字段说明">3.3 关键字段说明</h3>
<h4 id="raftnodeimpl">RaftNodeImpl</h4>
<pre><code class="language-java">// 节点状态
private volatile ServerStatusEnum nodeStatus = ServerStatusEnum.FOLLOWER;

// 持久化状态
private volatile long currentTerm = 0;         // 当前任期
private volatile String votedFor = null;         // 本轮任期投票给的候选人

// 选举相关
private ScheduledExecutorService electionExecutor;
private ScheduledFuture&lt;?&gt; electionFuture;
private VoteCounter currentVoteCounter;
private final Random random = new Random();

// 心跳相关
private ScheduledExecutorService heartbeatExecutor;
private ScheduledFuture&lt;?&gt; heartbeatFuture;

// 时间记录
private volatile long prevElectionTime = 0;      // 上次选举时间
private volatile long preHeartBeatTime = 0;      // 上次收到心跳时间
</code></pre>
<h4 id="consensusmoduleimpl">ConsensusModuleImpl</h4>
<pre><code class="language-java">public final RaftNodeImpl node;// 持有 RaftNodeImpl 的引用
public final ReentrantLock voteLock = new ReentrantLock();// 投票锁
public final ReentrantLock appendEntriesLock = new ReentrantLock();// 追加条目锁
</code></pre>
<h4 id="votecounter">VoteCounter</h4>
<pre><code class="language-java">private final long term;                      // 当前选举任期
private final Set&lt;String&gt; votesReceived;      // 已投票的节点ID集合
private final int majorityCount;            // 需要获得的多数派票数
private volatile boolean votedForSelf;      // 是否已投票给自己
</code></pre>
<hr>
<h2 id="4-实现细节">4. 实现细节</h2>
<h3 id="41-节点状态与转换">4.1 节点状态与转换</h3>
<h4 id="411-状态枚举">4.1.1 状态枚举</h4>
<p><strong>类名</strong>:<code>ServerStatusEnum</code></p>
<p><strong>定义</strong>:</p>
<pre><code class="language-java">public enum ServerStatusEnum {
    LEADER("LEADER", "主节点"),
    CANDIDATE("CANDIDATE", "候选节点"),
    FOLLOWER("FOLLOWER", "从节点");
}
</code></pre>
<h4 id="412-状态转换图">4.1.2 状态转换图</h4>
<pre><code>         +-------------------------+
         |         初始化          |
         +-------------------------+
                  |
                  ▼
         +-------------------------+
         |      FOLLOWER          | &lt;------------+
         |(等待心跳或投票)      |            |
         +-------------------------+            |
                  |                           |
                  | 选举超时                     | 收到更高任期的
                  |                           | AppendEntries 或
                  ▼                           | RequestVote
         +-------------------------+            |
         |   CANDIDATE          |            |
         |(发起选举)             |            |
         +-------------------------+            |
                  |                           |
                  | 获得多数派                  |
                  |                           |
                  ▼                           |
         +-------------------------+            |
         |      LEADER            | --------------+
         |(处理客户端请求)      |发现更高任期
         +-------------------------+
</code></pre>
<h4 id="413-转为-follower">4.1.3 转为 Follower</h4>
<p><strong>方法</strong>:<code>becomeFollower(newTerm)</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:175-196</code></p>
<pre><code class="language-java">public void becomeFollower(long newTerm) {
    // 检查任期
    if (newTerm &lt; currentTerm) {
      log.warn("Cannot become Follower with smaller term: {} &lt; {}",
               newTerm, currentTerm);
      return;
    }

    ServerStatusEnum oldStatus = nodeStatus;

    // 更新状态
    nodeStatus = ServerStatusEnum.FOLLOWER;
    currentTerm = newTerm;
    votedFor = null;// 重置投票记录
    currentVoteCounter = null;// 清空投票计数器

    // 停止心跳(如果之前是 Leader)
    cancelHeartbeatTimer();

    // 重置选举定时器
    resetElectionTimer();

    log.info("State changed: {} -&gt; FOLLOWER, term: {}", oldStatus, currentTerm);
}
</code></pre>
<p><strong>调用场景</strong>:</p>
<ol>
<li>节点初始化</li>
<li>收到更高任期的 AppendEntries</li>
<li>收到更高任期的 RequestVote</li>
<li>Candidate 收到有效 AppendEntries</li>
</ol>
<h4 id="414-转为-candidate">4.1.4 转为 Candidate</h4>
<p><strong>方法</strong>:<code>becomeCandidate()</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:201-216</code></p>
<pre><code class="language-java">public void becomeCandidate() {
    ServerStatusEnum oldStatus = nodeStatus;

    // 增加任期号(重要!)
    currentTerm++;
    nodeStatus = ServerStatusEnum.CANDIDATE;
    votedFor = currentNodeConfig.getServerId();// 投票给自己

    log.info("State changed: {} -&gt; CANDIDATE, new term: {}", oldStatus, currentTerm);

    // 重置选举定时器
    resetElectionTimer();

    // 发起投票请求
    startElection();
}
</code></pre>
<p><strong>调用场景</strong>:</p>
<ol>
<li>选举超时</li>
<li>作为 Candidate 重新发起选举(平票后)</li>
</ol>
<h4 id="415-转为-leader">4.1.5 转为 Leader</h4>
<p><strong>方法</strong>:<code>becomeLeader()</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:221-243</code></p>
<pre><code class="language-java">public void becomeLeader() {
    // 只有 Candidate 才能成为 Leader
    if (nodeStatus != ServerStatusEnum.CANDIDATE) {
      log.warn("Only CANDIDATE can become LEADER, current: {}", nodeStatus);
      return;
    }

    ServerStatusEnum oldStatus = nodeStatus;
    nodeStatus = ServerStatusEnum.LEADER;

    // 初始化 Leader 状态(nextIndex、matchIndex)
    initializeLeaderState();

    // 取消选举定时器(Leader 不需要选举)
    cancelElectionTimer();

    log.info("========================================");
    log.info("State changed: {} -&gt; LEADER, term: {}", oldStatus, currentTerm);
    log.info("========================================");

    // 立即发送心跳并开始心跳定时器
    sendHeartbeats();
    startHeartbeatTimer();
}
</code></pre>
<p><strong>调用场景</strong>:</p>
<ol>
<li>Candidate 获得多数派投票</li>
<li>单机模式直接成为 Leader</li>
</ol>
<hr>
<h3 id="42-选举超时机制">4.2 选举超时机制</h3>
<h4 id="421-选举超时检测">4.2.1 选举超时检测</h4>
<p><strong>类名</strong>:<code>ElectionTask</code></p>
<p><strong>实现位置</strong>:<code>com.ling.raft.core.task.ElectionTask.java</code></p>
<p><strong>核心逻辑</strong>:</p>
<pre><code class="language-java">@Override
public void run() {
    try {
      // Leader 不需要选举
      if (node.getNodeStatus() == ServerStatusEnum.LEADER) {
            log.debug("Current node is LEADER, skip election");
            return;
      }

      // 检查是否超时
      long currentTime = System.currentTimeMillis();
      int electionTimeoutMs = node.getRaftConfig().getElectionTimeoutMs();
      long timeElapsed = currentTime - node.getPrevElectionTime();

      if (timeElapsed &lt; electionTimeoutMs) {
            // 未超时,重新设置定时器
            node.resetElectionTimer();
            return;
      }

      // 选举超时,开始新一轮选举
      log.info("========================================");
      log.info("ELECTION TIMEOUT DETECTED!");
      log.info("Time elapsed: {}ms, Timeout: {}ms", timeElapsed, electionTimeoutMs);
      log.info("Current term: {}, Status: {}", node.getCurrentTerm(), node.getNodeStatus());
      log.info("Converting to CANDIDATE and starting new election...");
      log.info("========================================");

      node.becomeCandidate();

    } catch (Exception e) {
      log.error("Error in election task", e);
      if (node.getIsRunning().get()) {
            node.resetElectionTimer();
      }
    }
}
</code></pre>
<p><strong>特点</strong>:</p>
<ol>
<li><strong>跳过 Leader</strong>:Leader 不需要选举</li>
<li><strong>严格超时检查</strong>:确保真的超时才发起选举</li>
<li><strong>日志详细</strong>:记录选举超时的关键信息</li>
</ol>
<h4 id="422-选举定时器管理">4.2.2 选举定时器管理</h4>
<p><strong>方法</strong>:<code>resetElectionTimer()</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:455-475</code></p>
<pre><code class="language-java">public void resetElectionTimer() {
    if (!isRunning.get()) {
      return;
    }

    // 取消旧的定时任务
    cancelElectionTimer();

    // 计算随机超时时间
    int randomTimeout = raftConfig.getElectionTimeoutMs();

    // 更新超时时间戳
    prevElectionTime = System.currentTimeMillis();

    // 设置新的定时任务
    electionFuture = electionExecutor.schedule(
      new ElectionTask(this),
      randomTimeout,
      TimeUnit.MILLISECONDS
    );

    log.debug("Election timer reset, timeout: {}ms", randomTimeout);
}
</code></pre>
<p><strong>调用时机</strong>:</p>
<ol>
<li>节点初始化为 Follower</li>
<li>收到有效心跳</li>
<li>收到投票请求(即使拒绝)</li>
<li>转为 Follower(从任何状态)</li>
<li>转为 Candidate(重新开始计时)</li>
</ol>
<h4 id="423-超时时间随机化">4.2.3 超时时间随机化</h4>
<p><strong>配置方式</strong>:</p>
<pre><code class="language-java">RaftConfig config = new RaftConfig(currentNode, allNodes);
config.setElectionTimeout(2);// 基础倍数
config.setElectionTimeoutRandomRange(Range.of(150, 300));// 随机范围
</code></pre>
<p><strong>实现原理</strong>:</p>
<pre><code class="language-java">// RaftConfig 内部实现
public int getElectionTimeoutMs() {
    if (electionTimeoutRandomRange == null) {
      return electionTimeout * 1000;
    }

    // 在随机范围内选择一个值
    int min = electionTimeoutRandomRange.getMin();
    int max = electionTimeoutRandomRange.getMax();
    Random random = new Random();
    return min + random.nextInt(max - min + 1);
}
</code></pre>
<p><strong>避免平票的原理</strong>:</p>
<ul>
<li>3 个节点超时时间分别为:170ms、220ms、280ms</li>
<li>node1 先超时发起选举</li>
<li>node2 和 node3 收到投票请求后重置超时时间</li>
<li>node1 获得多数派(自己的票),成为 Leader</li>
</ul>
<hr>
<h3 id="43-投票请求处理">4.3 投票请求处理</h3>
<h4 id="431-requestvote-rpc">4.3.1 RequestVote RPC</h4>
<p><strong>请求格式</strong>:<code>VoteRequest</code></p>
<p><strong>字段说明</strong>:</p>
<pre><code class="language-java">public class VoteRequest {
    private long term;          // candidate 的任期号
    private String candidateId; // candidate 的节点 ID
    private long lastLogIndex;// candidate 最后一条日志的索引
    private long lastLogTerm;   // candidate 最后一条日志的任期号
}
</code></pre>
<p><strong>响应格式</strong>:<code>VoteResponse</code></p>
<p><strong>字段说明</strong>:</p>
<pre><code class="language-java">public class VoteResponse {
    private long term;          // 当前任期(用于更新 candidate 的任期)
    private boolean voteGranted; // 是否投票
}
</code></pre>
<h4 id="432-投票逻辑">4.3.2 投票逻辑</h4>
<p><strong>方法</strong>:<code>requestVote(VoteRequest voteRequest)</code></p>
<p><strong>实现位置</strong>:<code>ConsensusModuleImpl.java:45-90</code></p>
<pre><code class="language-java">@Override
public VoteResponse requestVote(VoteRequest voteRequest) {
    voteLock.lock();
    try {
      long currentTerm = node.getCurrentTerm();
      String votedFor = node.getVotedFor();
      String candidateId = voteRequest.getCandidateId();

      log.info("Received vote request from candidate: {}, Term: {}, CurrentTerm: {}, VotedFor: {}",
                candidateId, voteRequest.getTerm(), currentTerm, votedFor);

      // 1. 任期检查
      if (voteRequest.getTerm() &lt; currentTerm) {
            log.info("Rejected: candidate term {} &lt; current term {}",
                  voteRequest.getTerm(), currentTerm);
            return new VoteResponse(currentTerm, false);
      }

      // 2. 任期更大,更新并转为 Follower
      if (voteRequest.getTerm() &gt; currentTerm) {
            log.info("Higher term received: {} -&gt; {}, becoming FOLLOWER",
                  currentTerm, voteRequest.getTerm());
            node.becomeFollower(voteRequest.getTerm());
            currentTerm = node.getCurrentTerm();
            votedFor = node.getVotedFor();
      }

      // 3. 检查是否已投票给其他人
      if (votedFor != null &amp;&amp; !votedFor.equals(candidateId)) {
            log.info("Already voted for {}, rejecting {}", votedFor, candidateId);
            return new VoteResponse(currentTerm, false);
      }

      // 4. 检查日志是否至少一样新
      if (isLogUpToDate(voteRequest.getLastLogIndex(), voteRequest.getLastLogTerm())) {
            log.info("Voting for candidate: {}", candidateId);
            node.setVotedFor(candidateId);
            node.setPrevElectionTime(System.currentTimeMillis());// 重置超时
            return new VoteResponse(currentTerm, true);
      } else {
            log.info("Candidate log not up to date");
            return new VoteResponse(currentTerm, false);
      }
    } finally {
      voteLock.unlock();
    }
}
</code></pre>
<p><strong>投票规则详解</strong>:</p>
<ol>
<li>
<p><strong>任期检查</strong></p>
<ul>
<li>candidate 的任期 &lt; 当前任期 → 拒绝</li>
</ul>
</li>
<li>
<p><strong>任期更新</strong></p>
<ul>
<li>candidate 的任期 &gt; 当前任期 → 更新任期,转为 Follower</li>
</ul>
</li>
<li>
<p><strong>唯一投票</strong></p>
<ul>
<li>本轮任期已投票给其他人 → 拒绝</li>
<li>已投票给该 candidate → 接受(幂等性)</li>
</ul>
</li>
<li>
<p><strong>日志完整性</strong></p>
<ul>
<li>candidate 的日志 &gt;= 自己的日志 → 接受</li>
<li>否则 → 拒绝</li>
</ul>
</li>
</ol>
<h4 id="433-日志比较逻辑">4.3.3 日志比较逻辑</h4>
<p><strong>方法</strong>:<code>isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)</code></p>
<p><strong>实现位置</strong>:<code>ConsensusModuleImpl.java:337-350</code></p>
<pre><code class="language-java">private boolean isLogUpToDate(long candidateLastLogIndex, long candidateLastLogTerm) {
    long lastLogTerm = getLastLogTerm();
    long lastLogIndex = getLastLogIndex();

    // 优先比较任期:candidate 的任期更大 → 更新
    if (candidateLastLogTerm &gt; lastLogTerm) {
      return true;
    }

    // 任期相同,比较索引:candidate 的索引 &gt;= 自己的索引 → 更新
    if (candidateLastLogTerm == lastLogTerm &amp;&amp; candidateLastLogIndex &gt;= lastLogIndex) {
      return true;
    }

    // 其他情况 → 不更新
    return false;
}
</code></pre>
<p><strong>示例</strong>:</p>
<pre><code>情况 1: candidate 任期更大
candidate: term=3, index=5
current:term=2, index=5
→ 投票 (任期更大)

情况 2: 任期相同,索引更大或相等
candidate: term=2, index=5
current:term=2, index=4
→ 投票 (索引更大)

情况 3: 任期相同,索引更小
candidate: term=2, index=4
current:term=2, index=5
→ 不投票 (日志落后)

情况 4: 任期更小
candidate: term=1, index=10
current:term=2, index=5
→ 不投票 (任期更小)
</code></pre>
<h4 id="434-投票锁">4.3.4 投票锁</h4>
<p><strong>目的</strong>:防止并发投票请求导致状态不一致</p>
<p><strong>实现</strong>:</p>
<pre><code class="language-java">public final ReentrantLock voteLock = new ReentrantLock();

@Override
public VoteResponse requestVote(VoteRequest voteRequest) {
    voteLock.lock();
    try {
      // 投票逻辑
      ...
    } finally {
      voteLock.unlock();
    }
}
</code></pre>
<p><strong>保护的资源</strong>:</p>
<ul>
<li><code>currentTerm</code></li>
<li><code>votedFor</code></li>
<li><code>nodeStatus</code></li>
</ul>
<hr>
<h3 id="44-选举发起流程">4.4 选举发起流程</h3>
<h4 id="441-开始选举">4.4.1 开始选举</h4>
<p><strong>方法</strong>:<code>startElection()</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:266-289</code></p>
<pre><code class="language-java">private void startElection() {
    int totalNodes = raftConfig.getRaftNodeConfigList().size();
    currentVoteCounter = new VoteCounter(currentTerm, totalNodes);

    // 投票给自己
    currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());

    log.info("Starting election for term: {}, voted for self, votes: {}/{}",
            currentTerm, currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());

    // 单机模式直接成为 Leader
    if (totalNodes == 1) {
      log.info("Single node mode, becoming leader immediately");
      becomeLeader();
      return;
    }

    // 发送投票请求给所有其他节点
    List&lt;RaftNodeConfig&gt; otherNodes = getOtherNodes();
    for (RaftNodeConfig nodeConfig : otherNodes) {
      electionExecutor.execute(() -&gt; sendVoteRequest(nodeConfig));
    }

    // 检查是否已获得多数派(可能只有自己一票的情况)
    checkElectionResult();
}
</code></pre>
<p><strong>流程</strong>:</p>
<ol>
<li>创建投票计数器</li>
<li>投票给自己</li>
<li>单机模式直接成为 Leader</li>
<li>多机模式并发发送投票请求</li>
<li>检查选举结果</li>
</ol>
<h4 id="442-发送投票请求">4.4.2 发送投票请求</h4>
<p><strong>方法</strong>:<code>sendVoteRequest(targetNode)</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:294-316</code></p>
<pre><code class="language-java">private void sendVoteRequest(RaftNodeConfig targetNode) {
    try {
      // 构建 VoteRequest
      VoteRequest request = VoteRequest.builder()
                .term(currentTerm)
                .candidateId(currentNodeConfig.getServerId())
                .lastLogIndex(getLastLogIndex())
                .lastLogTerm(getLastLogTerm())
                .build();
      request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
      request.setCmd(Request.REQUEST_VOTE);

      log.debug("Sending VoteRequest to {} for term {}", targetNode.getServerId(), currentTerm);

      // 发送 RPC 请求
      VoteResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);

      // 处理响应
      if (response != null) {
            handleVoteResponse(response, targetNode.getServerId());
      }
    } catch (Exception e) {
      log.debug("Failed to send vote request to {}: {}", targetNode.getServerId(), e.getMessage());
    }
}
</code></pre>
<p><strong>特点</strong>:</p>
<ol>
<li>并发发送到所有其他节点</li>
<li>使用线程池异步发送</li>
<li>超时设置为 3000ms</li>
<li>失败不重试(等待下一次选举)</li>
</ol>
<h4 id="443-投票计数器">4.4.3 投票计数器</h4>
<p><strong>类名</strong>:<code>VoteCounter</code></p>
<p><strong>实现位置</strong>:<code>com.ling.raft.core.VoteCounter.java</code></p>
<p><strong>核心方法</strong>:</p>
<pre><code class="language-java">// 记录投票
public synchronized boolean recordVote(String nodeId) {
    return votesReceived.add(nodeId);
}

// 投票给自己
public synchronized void voteForSelf(String selfId) {
    if (!votedForSelf) {
      votesReceived.add(selfId);
      votedForSelf = true;
    }
}

// 检查是否获得多数派
public boolean hasMajority() {
    return votesReceived.size() &gt;= majorityCount;
}

// 获取当前票数
public int getVoteCount() {
    return votesReceived.size();
}
</code></pre>
<p><strong>数据结构</strong>:</p>
<ul>
<li>使用 <code>ConcurrentHashMap.newKeySet()</code> 存储投票节点 ID</li>
<li>保证线程安全</li>
<li>自动去重(不会重复计票)</li>
</ul>
<hr>
<h3 id="45-投票响应处理">4.5 投票响应处理</h3>
<h4 id="451-处理投票响应">4.5.1 处理投票响应</h4>
<p><strong>方法</strong>:<code>handleVoteResponse(response, voterId)</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:322-361</code></p>
<pre><code class="language-java">private void handleVoteResponse(VoteResponse response, String voterId) {
    // 使用同步块确保原子性
    synchronized (this) {
      // 如果不是 Candidate,忽略
      if (nodeStatus != ServerStatusEnum.CANDIDATE) {
            log.debug("Not a candidate anymore (status: {}), ignoring vote from {}",
                  nodeStatus, voterId);
            return;
      }

      // 如果收到更高任期,转为 Follower
      if (response.getTerm() &gt; currentTerm) {
            log.info("Received higher term {} from {}, stepping down",
                  response.getTerm(), voterId);
            becomeFollower(response.getTerm());
            return;
      }

      // 忽略旧任期的响应
      if (response.getTerm() &lt; currentTerm) {
            log.debug("Received stale vote response from {} for old term {}",
                  voterId, response.getTerm());
            return;
      }

      // 统计投票
      if (response.isVoteGranted()) {
            boolean isNewVote = currentVoteCounter.recordVote(voterId);
            if (isNewVote) {
                log.info("Received vote from {} for term {}, total votes: {}/{}",
                        voterId, currentTerm, currentVoteCounter.getVoteCount(),
                        currentVoteCounter.getMajorityCount());

                // 检查选举结果
                checkElectionResult();
            }
      } else {
            log.debug("Vote denied by {} for term {}", voterId, currentTerm);
      }
    }
}
</code></pre>
<p><strong>处理逻辑</strong>:</p>
<ol>
<li>
<p><strong>状态检查</strong></p>
<ul>
<li>不再是 Candidate → 忽略</li>
</ul>
</li>
<li>
<p><strong>任期检查</strong></p>
<ul>
<li>响应任期 &gt; 当前任期 → 发现更高任期,转为 Follower</li>
<li>响应任期 &lt; 当前任期 → 忽略旧响应</li>
</ul>
</li>
<li>
<p><strong>投票统计</strong></p>
<ul>
<li>投票成功 → 记录投票,检查是否获得多数派</li>
<li>投票失败 → 记录日志</li>
</ul>
</li>
</ol>
<h4 id="452-检查选举结果">4.5.2 检查选举结果</h4>
<p><strong>方法</strong>:<code>checkElectionResult()</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:367-373</code></p>
<pre><code class="language-java">private void checkElectionResult() {
    if (currentVoteCounter != null &amp;&amp; currentVoteCounter.hasMajority()) {
      log.info("Majority votes received ({}/{}), becoming LEADER",
                currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
      becomeLeader();
    }
}
</code></pre>
<p><strong>调用时机</strong>:</p>
<ol>
<li>投票给自己后(单机模式)</li>
<li>收到每个投票响应后</li>
<li>所有投票请求发送后(初始检查)</li>
</ol>
<hr>
<h3 id="46-心跳机制">4.6 心跳机制</h3>
<h4 id="461-心跳任务">4.6.1 心跳任务</h4>
<p><strong>类名</strong>:<code>HeartbeatTask</code></p>
<p><strong>实现位置</strong>:<code>com.ling.raft.core.task.HeartbeatTask.java</code></p>
<pre><code class="language-java">@Override
public void run() {
    try {
      // 只有 Leader 才发送心跳
      if (node.getNodeStatus() != ServerStatusEnum.LEADER) {
            log.debug("Current node is not LEADER, skip heartbeat");
            return;
      }

      log.debug("Sending heartbeats to all nodes, term: {}", node.getCurrentTerm());

      // 发送心跳给所有节点
      node.sendHeartbeats();

    } catch (Exception e) {
      log.error("Error in heartbeat task", e);
    }
}
</code></pre>
<h4 id="462-发送心跳">4.6.2 发送心跳</h4>
<p><strong>方法</strong>:<code>sendHeartbeats()</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:407-413</code></p>
<pre><code class="language-java">public void sendHeartbeats() {
    List&lt;RaftNodeConfig&gt; otherNodes = getOtherNodes();

    for (RaftNodeConfig nodeConfig : otherNodes) {
      heartbeatExecutor.execute(() -&gt; sendHeartbeat(nodeConfig));
    }
}
</code></pre>
<h4 id="463-单次心跳发送">4.6.3 单次心跳发送</h4>
<p><strong>方法</strong>:<code>sendHeartbeat(targetNode)</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:418-436</code></p>
<pre><code class="language-java">private void sendHeartbeat(RaftNodeConfig targetNode) {
    try {
      // 构建心跳请求(entries 为空)
      AppendEntriesRequest request = AppendEntriesRequest.builder()
                .term(currentTerm)
                .leaderId(currentNodeConfig.getServerId())
                .entries(new ArrayList&lt;&gt;())// 空列表表示心跳
                .build();
      request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
      request.setCmd(Request.APPEND_ENTRIES);

      // 发送请求
      AppendEntriesResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);

      // 处理响应
      if (response != null) {
            handleHeartbeatResponse(response, targetNode.getServerId());
      }
    } catch (Exception e) {
      log.debug("Failed to send heartbeat to {}: {}", targetNode.getServerId(), e.getMessage());
    }
}
</code></pre>
<p><strong>心跳特点</strong>:</p>
<ul>
<li><code>entries</code> 为空列表</li>
<li>只包含 <code>term</code>、<code>leaderId</code> 等元数据</li>
<li>用于维护 Leader 地位,防止 Follower 发起新选举</li>
</ul>
<h4 id="464-心跳定时器">4.6.4 心跳定时器</h4>
<p><strong>方法</strong>:<code>startHeartbeatTimer()</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:380-391</code></p>
<pre><code class="language-java">private void startHeartbeatTimer() {
    int heartbeatInterval = raftConfig.getHeartbeatIntervalMs();

    heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(
            new HeartbeatTask(this),
            0,// 立即开始
            heartbeatInterval,// 间隔
            TimeUnit.MILLISECONDS
    );

    log.debug("Heartbeat timer started, interval: {}ms", heartbeatInterval);
}
</code></pre>
<p><strong>配置示例</strong>:</p>
<pre><code class="language-java">config.setHeartbeatInterval(1);// 每 1 秒发送一次心跳
</code></pre>
<h4 id="465-心跳响应处理">4.6.5 心跳响应处理</h4>
<p><strong>方法</strong>:<code>handleHeartbeatResponse(response, nodeId)</code></p>
<p><strong>实现位置</strong>:<code>RaftNodeImpl.java:441-448</code></p>
<pre><code class="language-java">private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
    // 如果响应的任期更大,转为 Follower
    if (response.getTerm() &gt; currentTerm) {
      log.info("Received higher term {} from {} in heartbeat response, stepping down",
                response.getTerm(), nodeId);
      becomeFollower(response.getTerm());
    }
}
</code></pre>
<p><strong>处理逻辑</strong>:</p>
<ul>
<li>检查响应中的任期</li>
<li>发现更高任期 → 立即转为 Follower</li>
<li>避免网络分区导致的脑裂</li>
</ul>
<hr>
<h3 id="47-安全性保证">4.7 安全性保证</h3>
<h4 id="471-选举安全性">4.7.1 选举安全性</h4>
<p><strong>目标</strong>:任期内最多一个 Leader</p>
<p><strong>实现</strong>:</p>
<ol>
<li>
<p><strong>任期单调递增</strong></p>
<pre><code class="language-java">public void becomeCandidate() {
    currentTerm++;// 每次选举增加任期
}
</code></pre>
</li>
<li>
<p><strong>只投一次票</strong></p>
<pre><code class="language-java">// ConsensusModuleImpl.requestVote()
if (votedFor != null &amp;&amp; !votedFor.equals(candidateId)) {
    return new VoteResponse(currentTerm, false);
}
</code></pre>
</li>
<li>
<p><strong>多数派约束</strong></p>
<pre><code class="language-java">// VoteCounter
public boolean hasMajority() {
    return votesReceived.size() &gt;= majorityCount;// N/2 + 1
}
</code></pre>
</li>
</ol>
<h4 id="472-任期更新规则">4.7.2 任期更新规则</h4>
<p><strong>规则</strong>:发现更高任期 → 更新任期,转为 Follower</p>
<p><strong>实现位置</strong>:</p>
<ul>
<li><code>ConsensusModuleImpl.requestVote()</code> 第 63-68 行</li>
<li><code>ConsensusModuleImpl.appendEntries()</code> 第 128-134 行</li>
<li><code>RaftNodeImpl.handleVoteResponse()</code> 第 333-337 行</li>
<li><code>RaftNodeImpl.handleHeartbeatResponse()</code> 第 443-447 行</li>
</ul>
<p><strong>示例</strong>:</p>
<pre><code class="language-java">// 在 requestVote 中
if (voteRequest.getTerm() &gt; currentTerm) {
    node.becomeFollower(voteRequest.getTerm());
    currentTerm = node.getCurrentTerm();
}
</code></pre>
<h4 id="473-日志完整性检查">4.7.3 日志完整性检查</h4>
<p><strong>目的</strong>:只投票给日志至少和自己一样新的候选人</p>
<p><strong>实现</strong>:<code>isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)</code></p>
<p><strong>规则</strong>:</p>
<ol>
<li>candidate 任期 &gt; 自己任期 → 投票</li>
<li>任期相同,candidate 索引 &gt;= 自己索引 → 投票</li>
<li>否则 → 拒绝</li>
</ol>
<p><strong>重要性</strong>:</p>
<ul>
<li>保证新 Leader 包含所有已提交的日志</li>
<li>防止日志丢失或覆盖</li>
</ul>
<h4 id="474-脑裂预防">4.7.4 脑裂预防</h4>
<p><strong>场景</strong>:网络分区,两个 Leader 同时存在</p>
<p><strong>预防机制</strong>:</p>
<ol>
<li>
<p><strong>多数派约束</strong></p>
<ul>
<li>Leader 需要多数派支持</li>
<li>分区后的少数派无法获得足够票数</li>
</ul>
</li>
<li>
<p><strong>心跳超时</strong></p>
<ul>
<li>少数派 Follower 收不到心跳</li>
<li>选举超时后发起选举</li>
<li>多数派选出新 Leader</li>
</ul>
</li>
<li>
<p><strong>任期递增</strong></p>
<ul>
<li>新 Leader 使用更高任期</li>
<li>旧 Leader 的心跳被拒绝</li>
</ul>
</li>
</ol>
<p><strong>示例</strong>:</p>
<pre><code>初始状态:5 节点(node1-5),Leader=node1

网络分区:
- 分区 A: node1, node2 (2 节点)
- 分区 B: node3, node4, node5 (3 节点)

分区 A:
- node1 仍是 Leader
- node2 收不到心跳,超时后转为 Candidate
- 只有 1 票(自己),无法获得多数派(需要 3 票)
- 无法选出新 Leader

分区 B:
- node3 超时后发起选举
- 获得自己 + node4 + node5 的票(3 票)
- 成为新 Leader(term=2)

网络恢复后:
- node1 发送心跳(term=1)
- 其他节点拒绝(term=2 &gt; term=1)
- node1 收到更高任期,转为 Follower
</code></pre>
<hr>
<h2 id="5-测试指南">5. 测试指南</h2>
<h3 id="51-测试程序">5.1 测试程序</h3>
<p><strong>文件位置</strong>:</p>
<pre><code>LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader/ThreeNodeElectionTest.java
</code></pre>
<p><strong>运行方式</strong>:</p>
<pre><code class="language-bash"># 直接运行 main 方法
java -cp &lt;classpath&gt; com.ling.raft.example.leader.ThreeNodeElectionTest
</code></pre>
<p><strong>脚本运行</strong>:</p>
<pre><code class="language-bash">cd LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader
start-cluster.bat
</code></pre>
<h3 id="52-测试功能">5.2 测试功能</h3>
<h4 id="521-基本测试场景">5.2.1 基本测试场景</h4>
<p><strong>场景 1:正常选举</strong></p>
<pre><code> Starting 3 nodes...
✓ node1 started on port 8081
✓ node2 started on port 8082
✓ node3 started on port 8083
✓ All nodes started!

Waiting for leader election...
node1:C(t1) node2:F(t1) node3:F(t1)
node1:L(t1) node2:F(t1) node3:F(t1)
----------------------------------------
✓ Leader elected!
</code></pre>
<p><strong>场景 2:Leader 故障</strong></p>
<pre><code>raft&gt; kill node1
✓ node1 stopped
! Leader killed, waiting for new election...
node1:F(t1) node2:C(t2) node3:F(t2)
node1:F(t2) node2:L(t2) node3:F(t2)
</code></pre>
<p><strong>场景 3:节点恢复</strong></p>
<pre><code>raft&gt; revive node1
✓ node1 revived
✓ Status: FOLLOWER
✓ Election timer: active
node1:F(t2) node2:L(t2) node3:F(t2)
</code></pre>
<h4 id="522-交互式命令">5.2.2 交互式命令</h4>
<table>
<thead>
<tr>
<th>命令</th>
<th>说明</th>
<th>示例</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>status</code></td>
<td>查看所有节点状态</td>
<td><code>status</code></td>
</tr>
<tr>
<td><code>leader</code></td>
<td>显示当前 Leader 信息</td>
<td><code>leader</code></td>
</tr>
<tr>
<td><code>kill &lt;node&gt;</code></td>
<td>模拟节点故障</td>
<td><code>kill node1</code></td>
</tr>
<tr>
<td><code>revive &lt;node&gt;</code></td>
<td>恢复节点</td>
<td><code>revive node1</code></td>
</tr>
<tr>
<td><code>log &lt;level&gt;</code></td>
<td>控制日志级别</td>
<td><code>log debug</code></td>
</tr>
<tr>
<td><code>stop</code></td>
<td>停止所有节点并退出</td>
<td><code>stop</code></td>
</tr>
</tbody>
</table>
<h4 id="523-日志级别控制">5.2.3 日志级别控制</h4>
<p><strong>控制方式</strong>:</p>
<pre><code>raft&gt; log silent
✓ Log level set to ERROR (silent mode)

raft&gt; log info
✓ Log level set to INFO

raft&gt; log debug
✓ Log level set to DEBUG (verbose mode)

raft&gt; log election
✓ Showing election logs only

raft&gt; log heartbeat
✓ Showing heartbeat logs only
</code></pre>
<p><strong>日志级别说明</strong>:</p>
<ul>
<li><code>silent/error</code> - 仅错误信息</li>
<li><code>warn</code> - 警告及以上</li>
<li><code>info</code> - 信息及以上(默认)</li>
<li><code>debug</code> - 调试信息(全部日志)</li>
<li><code>election</code> - 仅选举相关日志</li>
<li><code>heartbeat</code> - 仅心跳相关日志</li>
</ul>
<h3 id="53-预期输出">5.3 预期输出</h3>
<h4 id="正常选举">正常选举</h4>
<pre><code>╔════════════════════════════════════════════════════════════╗
║          Raft Leader Election Test - 3 Nodes               ║
╚════════════════════════════════════════════════════════════╝

Starting 3 nodes...
✓ node1 started on port 8081
✓ node2 started on port 8082
✓ node3 started on port 8083
✓ All nodes started!

Waiting for leader election...
node1:F(t1) node2:F(t1) node3:F(t1)
node1:C(t1) node2:F(t1) node3:F(t1)
node1:L(t1) node2:F(t1) node3:F(t1)
----------------------------------------
✓ Leader elected!

┌────────────────────────────────────────────────────────────┐
│                     Cluster Status                         │
├────────────┬──────────────┬─────────┬─────────┬────────────┤
│ Node       │ Status       │ Term    │ Log   │ Voted For│
├────────────┼──────────────┼─────────┼─────────┼────────────┤
│ node1      │ LEADER       │ 1       │ 0       │ -          │
│ node2      │ FOLLOWER   │ 1       │ 0       │ node1      │
│ node3      │ FOLLOWER   │ 1       │ 0       │ node1      │
└────────────┴──────────────┴─────────┴─────────┴────────────┘
</code></pre>
<h4 id="leader-故障恢复">Leader 故障恢复</h4>
<pre><code>raft&gt; kill node1
Killing node1...
✓ node1 stopped
! Leader killed, waiting for new election...
node1:F(t1) node2:C(t2) node3:F(t2)
node1:F(t2) node2:L(t2) node3:F(t2)

raft&gt; leader
┌────────────────────────────────────────────────────────────┐
│                      Leader Info                           │
├────────────────────────────────────────────────────────────┤
│Node ID:   node2                                       │
│Address:   127.0.0.1:8082                                 │
│Term:      2                                              │
└────────────────────────────────────────────────────────────┘

raft&gt; revive node1
Reviving node1...
✓ node1 revived
✓ Status: FOLLOWER
✓ Election timer: active
node1:F(t2) node2:L(t2) node3:F(t2)
</code></pre>
<h3 id="54-完整测试流程">5.4 完整测试流程</h3>
<h4 id="步骤-1启动并验证选举">步骤 1:启动并验证选举</h4>
<pre><code class="language-bash"># 运行测试程序
java com.ling.raft.example.leader.ThreeNodeElectionTest

# 观察选举过程
node1:F(t1) node2:F(t1) node3:F(t1)
node1:C(t1) node2:F(t1) node3:F(t1)
node1:L(t1) node2:F(t1) node3:F(t1)

# 查看当前状态
raft&gt; status
</code></pre>
<h4 id="步骤-2验证心跳">步骤 2:验证心跳</h4>
<pre><code class="language-bash"># 等待几秒,观察心跳
node1:L(t1) node2:F(t1) node3:F(t1)
node1:L(t1) node2:F(t1) node3:F(t1)
node1:L(t1) node2:F(t1) node3:F(t1)

# 启用心跳日志观察
raft&gt; log heartbeat
✓ Showing heartbeat logs only
</code></pre>
<h4 id="步骤-3模拟-leader-故障">步骤 3:模拟 Leader 故障</h4>
<pre><code class="language-bash"># 杀死 Leader
raft&gt; kill node1
✓ node1 stopped
! Leader killed, waiting for new election...

# 观察新选举
node1:F(t1) node2:C(t2) node3:F(t2)
node1:F(t2) node2:L(t2) node3:F(t2)
</code></pre>
<h4 id="步骤-4恢复旧-leader">步骤 4:恢复旧 Leader</h4>
<pre><code class="language-bash"># 恢复节点
raft&gt; revive node1
✓ node1 revived

# 观察恢复过程
node1:F(t2) node2:L(t2) node3:F(t2)
</code></pre>
<h4 id="步骤-5多次故障测试">步骤 5:多次故障测试</h4>
<pre><code class="language-bash"># 持续故障恢复
raft&gt; kill node2
raft&gt; kill node3
raft&gt; revive node2
raft&gt; revive node3
</code></pre>
<hr>
<h2 id="6-使用示例">6. 使用示例</h2>
<h3 id="61-基本使用">6.1 基本使用</h3>
<pre><code class="language-java">// 1. 创建节点配置
RaftNodeConfig node1 = new RaftNodeConfig("node1", "127.0.0.1", 8081);
RaftNodeConfig node2 = new RaftNodeConfig("node2", "127.0.0.1", 8082);
RaftNodeConfig node3 = new RaftNodeConfig("node3", "127.0.0.1", 8083);
List&lt;RaftNodeConfig&gt; allNodes = Arrays.asList(node1, node2, node3);

// 2. 创建 Raft 配置
RaftConfig config1 = new RaftConfig(node1, allNodes);
config1.setElectionTimeout(2);// 基础超时倍数
config1.setElectionTimeoutRandomRange(Range.of(150, 300));// 随机范围
config1.setHeartbeatInterval(1);// 心跳间隔 1 秒

// 3. 创建 RPC 组件
DefaultRpcServer rpcServer1 = new DefaultRpcServer(node1.getPort(), null);
DefaultRpcClient rpcClient1 = new DefaultRpcClient();

// 4. 创建并初始化 Raft 节点
RaftNodeImpl raftNode1 = new RaftNodeImpl(config1, rpcServer1, rpcClient1);
rpcServer1.setRaftNode(raftNode1);
raftNode1.init();

// 5. 等待选举
Thread.sleep(2000);

// 6. 检查节点状态
if (raftNode1.getNodeStatus() == ServerStatusEnum.LEADER) {
    System.out.println("Node1 is Leader, term: " + raftNode1.getCurrentTerm());
}
</code></pre>
<h3 id="62-监控选举状态">6.2 监控选举状态</h3>
<pre><code class="language-java">// 创建监控线程
Thread monitor = new Thread(() -&gt; {
    while (true) {
      System.out.printf("Node1: %s(t%d) ",
            raftNode1.getNodeStatus(),
            raftNode1.getCurrentTerm());

      System.out.printf("Node2: %s(t%d) ",
            raftNode2.getNodeStatus(),
            raftNode2.getCurrentTerm());

      System.out.printf("Node3: %s(t%d)\n",
            raftNode3.getNodeStatus(),
            raftNode3.getCurrentTerm());

      Thread.sleep(3000);
    }
});
monitor.setDaemon(true);
monitor.start();
</code></pre>
<h3 id="63-手动触发选举">6.3 手动触发选举</h3>
<pre><code class="language-java">// 停止 Leader 的心跳定时器
raftNode1.cancelHeartbeatTimer();

// 模拟 Follower 超时
raftNode2.resetElectionTimer();// 重置超时
// 等待超时后,node2 会自动发起选举
</code></pre>
<h3 id="64-查询投票信息">6.4 查询投票信息</h3>
<pre><code class="language-java">// 获取当前投票信息
String votedFor = raftNode1.getVotedFor();
long currentTerm = raftNode1.getCurrentTerm();
ServerStatusEnum status = raftNode1.getNodeStatus();

System.out.println("Node1 - Status: " + status + ", Term: " + currentTerm + ", VotedFor: " + votedFor);

// 如果是 Candidate,查看投票计数器
if (status == ServerStatusEnum.CANDIDATE) {
    VoteCounter counter = raftNode1.getCurrentVoteCounter();
    System.out.println("Votes: " + counter.getVoteCount() + "/" + counter.getMajorityCount());
}
</code></pre>
<hr>
<h2 id="7-常见问题">7. 常见问题</h2>
<h3 id="71-为什么选举超时需要随机化">7.1 为什么选举超时需要随机化?</h3>
<p><strong>原因</strong>:</p>
<ul>
<li>如果所有节点使用固定的超时时间,可能同时超时</li>
<li>同时超时的节点会同时发起选举</li>
<li>导致平票(split vote),需要重新选举</li>
<li>随机化可以避免多个节点同时超时</li>
</ul>
<p><strong>示例</strong>:</p>
<pre><code>不随机化(3 个节点都使用 200ms):
t=0ms: 所有节点启动
t=200ms: 3 个节点同时超时,都转为 Candidate
t=201ms: 3 个节点都发送投票请求
t=210ms: 每个节点只收到自己的票(1 票)
t=220ms: 选举超时,重新选举(平票)

随机化(3 个节点使用 150-300ms 随机):
t=0ms: 所有节点启动
t=170ms: node1 超时,发起选举
t=171ms: node2 和 node3 收到投票请求,重置超时
t=220ms: node2 超时(新时间)
t=221ms: node1 已经是 Leader,node2 收到心跳,重置超时
t=280ms: node3 超时
t=281ms: node3 收到心跳,重置超时
t=1000ms: 心跳继续,node1 保持 Leader
</code></pre>
<p><strong>代码实现</strong>:</p>
<pre><code class="language-java">// RaftConfig.java
public int getElectionTimeoutMs() {
    if (electionTimeoutRandomRange == null) {
      return electionTimeout * 1000;
    }

    int min = electionTimeoutRandomRange.getMin();
    int max = electionTimeoutRandomRange.getMax();
    Random random = new Random();
    return min + random.nextInt(max - min + 1);
}
</code></pre>
<h3 id="72-为什么收到投票请求后要重置超时">7.2 为什么收到投票请求后要重置超时?</h3>
<p><strong>原因</strong>:</p>
<ul>
<li>收到投票请求表示至少有一个其他节点是活跃的</li>
<li>重置超时可以减少不必要的选举</li>
<li>避免频繁切换状态</li>
</ul>
<p><strong>代码实现</strong>:</p>
<pre><code class="language-java">// RaftNodeImpl.java:498-500
@Override
public VoteResponse handleVoteRequest(VoteRequest voteRequest) {
    VoteResponse response = consensus.requestVote(voteRequest);

    // 如果投票给了对方,重置选举定时器
    if (response.isVoteGranted()) {
      resetElectionTimer();
    }

    return response;
}
</code></pre>
<h3 id="73-为什么-candidate-要增加任期">7.3 为什么 Candidate 要增加任期?</h3>
<p><strong>原因</strong>:</p>
<ul>
<li>避免使用旧任期发起新的选举</li>
<li>区分不同轮的选举</li>
<li>保证任期单调递增</li>
</ul>
<p><strong>代码实现</strong>:</p>
<pre><code class="language-java">// RaftNodeImpl.java:201-216
public void becomeCandidate() {
    ServerStatusEnum oldStatus = nodeStatus;

    // 增加任期号(重要!)
    currentTerm++;
    nodeStatus = ServerStatusEnum.CANDIDATE;
    votedFor = currentNodeConfig.getServerId();

    log.info("State changed: {} -&gt; CANDIDATE, new term: {}", oldStatus, currentTerm);

    resetElectionTimer();
    startElection();
}
</code></pre>
<h3 id="74-如何处理网络分区">7.4 如何处理网络分区?</h3>
<p><strong>Raft 的保证</strong>:</p>
<ul>
<li>旧 Leader 无法获得多数派,无法提交新日志</li>
<li>新 Leader 会在多数派分区选举产生</li>
<li>网络恢复后,旧 Leader 会转为 Follower</li>
</ul>
<p><strong>代码体现</strong>:</p>
<pre><code class="language-java">// 旧 Leader 的心跳被拒绝
private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
    if (response.getTerm() &gt; currentTerm) {
      log.info("Received higher term {}, stepping down", response.getTerm());
      becomeFollower(response.getTerm());
    }
}

// 旧 Leader 无法获得多数派
public boolean hasMajority() {
    return votesReceived.size() &gt;= majorityCount;// N/2 + 1
}
</code></pre>
<h3 id="75-为什么心跳间隔通常远小于选举超时">7.5 为什么心跳间隔通常远小于选举超时?</h3>
<p><strong>原因</strong>:</p>
<ul>
<li>心跳间隔短(如 100ms),选举超时长(如 200-300ms)</li>
<li>确保 Follower 在超时前收到心跳</li>
<li>避免不必要的选举</li>
</ul>
<p><strong>配置示例</strong>:</p>
<pre><code class="language-java">config.setHeartbeatInterval(1);// 1 秒(1000ms)
config.setElectionTimeoutRandomRange(Range.of(150, 300));// 150-300ms

// 注意:这里心跳间隔是秒,超时是毫秒
// 实际使用时,心跳间隔应该 &lt; 选举超时
</code></pre>
<p><strong>建议配置</strong>:</p>
<pre><code>心跳间隔:50ms - 100ms
选举超时:150ms - 300ms
</code></pre>
<h3 id="76-如何避免平票split-vote">7.6 如何避免平票(split vote)?</h3>
<p><strong>平票场景</strong>:</p>
<pre><code>3 个节点:
- node1: term=2, votes=
- node2: term=2, votes=
- node3: term=2, votes=

每个节点只有 1 票,无法获得多数派(需要 2 票)
选举超时后重新选举
</code></pre>
<p><strong>避免方法</strong>:</p>
<ol>
<li>
<p><strong>随机化超时</strong>(已实现)</p>
<ul>
<li>减少多个节点同时超时的概率</li>
</ul>
</li>
<li>
<p><strong>预投票(Pre-vote)</strong>(未实现)</p>
<ul>
<li>先询问其他节点是否愿意投票</li>
<li>如果多数派同意,再真正发起选举</li>
</ul>
</li>
<li>
<p><strong>快速重试</strong>(未实现)</p>
<ul>
<li>平票后快速重新选举</li>
<li>立即开始,不等超时</li>
</ul>
</li>
</ol>
<p><strong>当前实现</strong>:</p>
<ul>
<li>仅依赖超时随机化</li>
<li>平票后等待超时重试</li>
</ul>
<h3 id="77-为什么单机模式直接成为-leader">7.7 为什么单机模式直接成为 Leader?</h3>
<p><strong>原因</strong>:</p>
<ul>
<li>单机集群不需要选举</li>
<li>只有一个节点,自己就是多数派</li>
<li>提高启动速度</li>
</ul>
<p><strong>代码实现</strong>:</p>
<pre><code class="language-java">// RaftNodeImpl.java:274-279
private void startElection() {
    int totalNodes = raftConfig.getRaftNodeConfigList().size();
    currentVoteCounter = new VoteCounter(currentTerm, totalNodes);
    currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());

    // 单机模式直接成为 Leader
    if (totalNodes == 1) {
      log.info("Single node mode, becoming leader immediately");
      becomeLeader();
      return;
    }

    // 多机模式发送投票请求
    ...
}
</code></pre>
<h3 id="78-如何调优选举参数">7.8 如何调优选举参数?</h3>
<p><strong>参数建议</strong>:</p>
<table>
<thead>
<tr>
<th>参数</th>
<th>推荐值</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>心跳间隔</strong></td>
<td>50ms - 100ms</td>
<td>越短越快,但网络开销大</td>
</tr>
<tr>
<td><strong>选举超时最小</strong></td>
<td>150ms - 200ms</td>
<td>应该 &gt; 心跳间隔</td>
</tr>
<tr>
<td><strong>选举超时最大</strong></td>
<td>300ms - 400ms</td>
<td>应该是心跳间隔的 3-5 倍</td>
</tr>
<tr>
<td><strong>RPC 超时</strong></td>
<td>2000ms - 3000ms</td>
<td>应该 &gt; 选举超时</td>
</tr>
</tbody>
</table>
<p><strong>调优示例</strong>:</p>
<pre><code class="language-java">// 低延迟场景(数据中心内)
config.setHeartbeatInterval(1);      // 1ms
config.setElectionTimeoutRandomRange(Range.of(10, 20));// 10-20ms

// 高稳定性场景(广域网)
config.setHeartbeatInterval(100);    // 100ms
config.setElectionTimeoutRandomRange(Range.of(500, 1000));// 500-1000ms

// 开发调试场景
config.setHeartbeatInterval(1);      // 1秒
config.setElectionTimeoutRandomRange(Range.of(2000, 4000));// 2-4秒
</code></pre>
<hr>
<h2 id="附录">附录</h2>
<h3 id="a-术语表">A. 术语表</h3>
<table>
<thead>
<tr>
<th>术语</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Term</strong></td>
<td>任期号,单调递增,用于识别 Leader</td>
</tr>
<tr>
<td><strong>Election Timeout</strong></td>
<td>选举超时时间,随机化避免平票</td>
</tr>
<tr>
<td><strong>Heartbeat</strong></td>
<td>心跳,Leader 定期发送维持地位</td>
</tr>
<tr>
<td><strong>Majority</strong></td>
<td>多数派,超过半数的节点(N/2 + 1)</td>
</tr>
<tr>
<td><strong>Split Vote</strong></td>
<td>平票选举,没有节点获得多数派</td>
</tr>
<tr>
<td><strong>Candidate</strong></td>
<td>候选节点,发起选举的节点</td>
</tr>
<tr>
<td><strong>Leader</strong></td>
<td>主节点,处理客户端请求</td>
</tr>
<tr>
<td><strong>Follower</strong></td>
<td>从节点,响应 Leader 的请求</td>
</tr>
</tbody>
</table>
<h3 id="b-参考资料">B. 参考资料</h3>
<ol>
<li><strong>Raft 论文</strong>:Diego Ongaro, John Ousterhout. "In Search of an Understandable Consensus Algorithm." 2014</li>
<li><strong>Raft GitHub</strong>:https://github.com/ongardie/raft.github.io</li>
<li><strong>可视化 Raft</strong>:http://thesecretlivesofdata.com/raft/</li>
<li><strong>Raft Scope</strong>:https://raft.github.io/raftscope/index.html</li>
</ol>
<h3 id="c-相关文件">C. 相关文件</h3>
<table>
<thead>
<tr>
<th>文件</th>
<th>路径</th>
</tr>
</thead>
<tbody>
<tr>
<td>RaftNodeImpl</td>
<td><code>com.ling.raft.core.RaftNodeImpl</code></td>
</tr>
<tr>
<td>ConsensusModuleImpl</td>
<td><code>com.ling.raft.core.ConsensusModuleImpl</code></td>
</tr>
<tr>
<td>VoteCounter</td>
<td><code>com.ling.raft.core.VoteCounter</code></td>
</tr>
<tr>
<td>ElectionTask</td>
<td><code>com.ling.raft.core.task.ElectionTask</code></td>
</tr>
<tr>
<td>HeartbeatTask</td>
<td><code>com.ling.raft.core.task.HeartbeatTask</code></td>
</tr>
<tr>
<td>ServerStatusEnum</td>
<td><code>com.ling.raft.enums.ServerStatusEnum</code></td>
</tr>
<tr>
<td>VoteRequest</td>
<td><code>com.ling.raft.model.dto.VoteRequest</code></td>
</tr>
<tr>
<td>VoteResponse</td>
<td><code>com.ling.raft.model.dto.VoteResponse</code></td>
</tr>
<tr>
<td>ThreeNodeElectionTest</td>
<td><code>com.ling.raft.example.leader.ThreeNodeElectionTest</code></td>
</tr>
</tbody>
</table>
<hr><br><br>
来源:https://www.cnblogs.com/CoffeeToCode/p/19559476
頁: [1]
查看完整版本: 收入写RAFT算法(一)Leader选举