机敏的香菇 發表於 2025-5-19 23:14:00

RocketMQ客户端是如何感知Broker节点的?

<p>本文已收录在Github,<strong>关注我,紧跟本系列专栏文章,咱们下篇再续!</strong></p>
<ul>
<li>🚀 魔都架构师 | 全网30W技术追随者</li>
<li>🔧 大厂分布式系统/数据中台实战专家</li>
<li>🏆 主导交易系统百万级流量调优 &amp; 车联网平台架构</li>
<li>🧠 AIGC应用开发先行者 | 区块链落地实践者</li>
<li>🌍 以技术驱动创新,我们的征途是改变世界!</li>
<li>👉 实战干货:编程严选网</li>
</ul>
<h2 id="0-前言">0 前言</h2>
<p>RocketMQ的Pro只需配置一个接入地址,即可访问整个集群,而无需client配置每个Broker的地址。<br>
即RocketMQ会自动根据要访问的topic名称、queue序号,找到对应Broker地址。</p>
<p>Q:咋实现的?</p>
<p>A:由NameServer协调Broker和client共同实现,可伸缩的分布式集群,都需类似服务。</p>
<h2 id="1-namingservice作用">1 NamingService作用</h2>
<p>为client提供路由信息,帮助client找到对应的Broker,NamingService本身也是集群。分布式集群架构的通用设计。</p>
<p>负责维护集群内所有节点的路由信息:client需访问的某特定服务在啥节点。</p>
<p>集群的节点会主动连接NamingService服务,注册自身路由信息。给client提供路由寻址服务的方式:</p>
<ol>
<li>client直连NamingService,查询路由信息</li>
<li>client连接集群内任一节点查询路由信息,节点再从自身缓存或查询NamingService</li>
</ol>
<h2 id="2-nameserver咋提供服务">2 NameServer咋提供服务?</h2>
<p>NameServer独立进程,为Broker、Pro、Con提供服务。支持单点、多节点集群部署。</p>
<p>NameServer最主要为client提供寻址服务,协助 client 找到topic对应的Broker地址。也负责监控每个Broker的存活状态。</p>
<p>每个NameServer节点都保存了集群所有Broker的路由信息,因此可独立提供服务。</p>
<p><img alt="" loading="lazy" src="https://img2024.cnblogs.com/other/1097393/202505/1097393-20250519231447276-290332778.png" class="lazyload"></p>
<p>每个Broker都需和【所有NameServer节点】通信。当 Broker 保存的 Topic 信息变化时,要主动通知所有 NameServer 更新路由信息。</p>
<p>为保证数据一致性,Broker会与所有NameServer节点建立长连接,定期上报Broker的路由信息。该上报路由信息的RPC请求,同时还起到 Broker 与 NameServer 间心跳作用,NameServer 依靠该心跳确定Broker健康状态。</p>
<p>client会选择连接某一NameServer节点,定期获取订阅topic的路由信息,用于Broker寻址。</p>
<p>每个 NameServer 节点都可独立提供服务,因此对于client(Pro和Con),只需选择任一 NameServer 节点来查询路由信息即可。</p>
<p>client在生产或消费某topic的消息前:</p>
<ul>
<li>先从NameServer查询这topic的路由信息</li>
<li>然后根据路由信息获取到当前topic和队列对应的Broker物理地址</li>
<li>再连接到Broker节点上生产或消费</li>
</ul>
<p>如果NameServer检测到与Broker的连接中断,NameServer会认为这个Broker不能再提供服务。会立即把这Broker从路由信息中移除,避免client连接到一个不可用Broker。</p>
<p>client在与Broker通信失败后,会重新去NameServer拉取路由信息,然后连接到其他Broker继续生产或消费消息,实现自动切换失效Broker。</p>
<h2 id="3-组成">3 组成</h2>
<h3 id="uml">UML</h3>
<p><img alt="" loading="lazy" src="https://img2024.cnblogs.com/other/1097393/202505/1097393-20250519231448016-1569832644.png" class="lazyload"></p>
<ul>
<li>NamesrvStartup:程序入口</li>
<li>NamesrvController:NameServer的总控制器,负责所有服务的生命周期管理</li>
<li>BrokerHousekeepingService:监控Broker连接状态的代理类</li>
<li>DefaultRequestProcessor:负责处理 client、Broker 发送过来的RPC请求的处理器</li>
<li>ClusterTestRequestProcessor:用于测试的请求处理器</li>
<li>RouteInfoManager:NameServer最核心的实现类,负责维护所有集群路由信息,这些路由信息都保存在内存,无持久化</li>
</ul>
<h3 id="routeinfomanager">RouteInfoManager</h3>
<p>如下5个Map保存了集群所有的Broker和topic的路由信息</p>
<pre><code class="language-java">public class RouteInfoManager {

    /**
   * 120s,broker 上一次心跳时间超过该值便会被剔除
   */
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
   * 维护topic和queue信息
   * topic消息队列的路由信息,消息发送时会根据路由表进行负载均衡
   * K=topic名称
   * V=Map:
   *      K=brokerName
   *      V=队列数据,如读/写队列的数量、权重
   */
    private final HashMap&lt;String/* topic */, Map&lt;String /* brokerName */ , QueueData&gt;&gt; topicQueueTable;

    /**
   * 维护集群中每个brokerName对应的Broker信息
   * broker的基础信息
   * K=brokerName,V=brokerName
   * broker所在的集群信息,主备broker的地址。
   */
    private final HashMap&lt;String/* brokerName */, BrokerData&gt; brokerAddrTable;

    /**
   * 维护clusterName与BrokerName的对应关系
   * broker集群信息
   * K=集群名称
   * V=集群中所有broker的名称
   */
    private final HashMap&lt;String/* clusterName */, Set&lt;String/* brokerName */&gt;&gt; clusterAddrTable;

    /**
   * 维护每个Broker(brokerAddr)当前的动态信息,包括心跳更新时间,路由数据版本等
   * Broker状态信息,NameServer每次收到心跳包时会替换该信息。这也是NameServer每10s要扫描的信息。
   */
    private final HashMap&lt;String/* brokerAddr */, BrokerLiveInfo&gt; brokerLiveTable;

    /**
   * 维护每个Broker (brokerAddr)对应的消息过滤服务的地址(Filter Server),用于服务端消息过滤
   * Broker上的FilterServer列表,用于类模式消息过滤。类模式过滤机制在4.4及以后版本被废弃
   */
    private final HashMap&lt;String/* brokerAddr */, List&lt;String&gt;/* Filter Server */&gt; filterServerTable;
</code></pre>
<h3 id="broker信息">Broker信息</h3>
<pre><code class="language-java">public class BrokerData implements Comparable&lt;BrokerData&gt; {
    // 集群名称
    private String cluster;
    private String brokerName;
    // 保存Broker物理地址 &lt;brokerId,broker address&gt;
    private HashMap&lt;Long/* brokerId */, String/* broker address */&gt; brokerAddrs;
</code></pre>
<h2 id="4-nameserver处理broker注册的路由信息">4 NameServer处理Broker注册的路由信息</h2>
<p>NameServer处理Broker和client所有RPC请求的入口方法:</p>
<h3 id="41-defaultrequestprocessor">4.1 DefaultRequestProcessor</h3>
<h4 id="411-processrequest">4.1.1 processRequest</h4>
<p>处理Broker注册请求</p>
<pre><code class="language-java">public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
                        // 典型的Request请求分发器,根据request.getCode()来分发请求到对应处理器
      switch (request.getCode()) {
          ...
      }
      return null;
}
</code></pre>
<h4 id="412-register_broker">4.1.2 REGISTER_BROKER</h4>
<p>Broker发给NameServer注册请求的Code为REGISTER_BROKER,根据Broker版本号不同,分别有两个不同处理方法</p>
<pre><code class="language-java">case RequestCode.REGISTER_BROKER:
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() &gt;= MQVersion.Version.V3_0_11.ordinal()) {
      return this.registerBrokerWithFilterServer(ctx, request);
    } else {
      return this.registerBroker(ctx, request);
    }
</code></pre>
<h5 id="-registerbroker">① registerBroker</h5>
<pre><code class="language-java">public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) {

        // registerBroker
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
      requestHeader.getClusterName(),
}
</code></pre>
<h5 id="-registerbrokerwithfilterserver">② registerBrokerWithFilterServer</h5>
<pre><code class="language-java">public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) {
    ...

    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
      requestHeader.getClusterName(),
</code></pre>
<p>两个方法流程差不多,都是调用</p>
<h3 id="42-routeinfomanager">4.2 RouteInfoManager</h3>
<h4 id="registerbroker">registerBroker</h4>
<p>注册Broker路由信息。</p>
<pre><code class="language-java">public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List&lt;String&gt; filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
      try {
            // 加写锁,防止并发修改数据
            this.lock.writeLock().lockInterruptibly();
            // 更新clusterAddrTable
            Set&lt;String&gt; brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet&lt;&gt;();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);
            // 更新brokerAddrTable
            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true; // 标识需要先注册
                brokerData = new BrokerData(clusterName, brokerName, new HashMap&lt;&gt;());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map&lt;Long, String&gt; brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove &lt;1, IP:PORT&gt; in namesrv, then add &lt;0, IP:PORT&gt;
            //The same IP:PORT must only have one record in brokerAddrTable
            // 更新brokerAddrTable中的brokerData
            Iterator&lt;Entry&lt;Long, String&gt;&gt; it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry&lt;Long, String&gt; item = it.next();
                if (null != brokerAddr &amp;&amp; brokerAddr.equals(item.getValue()) &amp;&amp; brokerId != item.getKey()) {
                  it.remove();
                }
            }
            // 如果是新注册的Master Broker,或者Broker中的路由信息变了,需要更新topicQueueTable
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper
                &amp;&amp; MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                  || registerFirst) {
                  ConcurrentMap&lt;String, TopicConfig&gt; tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                  if (tcTable != null) {
                        for (Map.Entry&lt;String, TopicConfig&gt; entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                  }
                }
            }
            // 更新brokerLiveTable
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                  System.currentTimeMillis(),
                  topicConfigWrapper.getDataVersion(),
                  channel,
                  haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
            // 更新filterServerTable
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                  this.filterServerTable.remove(brokerAddr);
                } else {
                  this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
            // 若是Slave Broker,需要在返回的信息中带上master的相关信息
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                  BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                  if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                  }
                }
            }
      } finally {
            // 释放写锁
            this.lock.writeLock().unlock();
      }
    } catch (Exception e) {
      log.error("registerBroker Exception", e);
    }

    return result;
}
</code></pre>
<h2 id="5-client定位broker">5 client定位Broker</h2>
<p>对于client,无论Pro、Con,通过topic寻找Broker的流程一致,都是同一实现。<br>
client启动后,会启动定时器,定期从NameServer拉取相关topic的路由信息,缓存在本地内存,需要时使用。</p>
<p>每个topic的路由信息由TopicRouteData对象表示:</p>
<pre><code class="language-java">public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List&lt;QueueData&gt; queueDatas;
    private List&lt;BrokerData&gt; brokerDatas;
    private HashMap&lt;String/* brokerAddr */, List&lt;String&gt;/* Filter Server */&gt; filterServerTable;

</code></pre>
<p><img alt="" loading="lazy" src="https://img2024.cnblogs.com/other/1097393/202505/1097393-20250519231448515-1238787326.png" class="lazyload"></p>
<p>client选定队列后,可在对应QueueData找到对应BrokerName,然后找到对应BrokerData对象,最终找到对应Master Broker的地址。</p>
<h2 id="6-根据topic查询broker的路由信息">6 根据topic,查询Broker的路由信息</h2>
<h3 id="routeinfomanagerpickuptopicroutedata">RouteInfoManager#pickupTopicRouteData</h3>
<p>NameServer处理client请求和处理Broker请求流程一致,都是通过路由分发器将请求分发的对应的处理方法中:</p>
<pre><code class="language-java">    public TopicRouteData pickupTopicRouteData(final String topic) {
      // 1.初始化返回值 topicRouteData
      TopicRouteData topicRouteData = new TopicRouteData();
      boolean foundQueueData = false;
      boolean foundBrokerData = false;
      Set&lt;String&gt; brokerNameSet = new HashSet&lt;&gt;();
      List&lt;BrokerData&gt; brokerDataList = new LinkedList&lt;&gt;();
      topicRouteData.setBrokerDatas(brokerDataList);

      HashMap&lt;String, List&lt;String&gt;&gt; filterServerMap = new HashMap&lt;&gt;();
      topicRouteData.setFilterServerTable(filterServerMap);

      try {
            try {
                // 2.加读锁
                this.lock.readLock().lockInterruptibly();
                // 3.先获取topic对应的queue信息
                List&lt;QueueData&gt; queueDataList = this.topicQueueTable.get(topic);
                if (queueDataList != null) {
                  // 4.将queue信息存入返回值
                  topicRouteData.setQueueDatas(queueDataList);
                  foundQueueData = true;
                  // 5.遍历队列,找出相关的所有BrokerName
                  for (QueueData qd : queueDataList) {
                        brokerNameSet.add(qd.getBrokerName());
                  }
                  // 6.遍历BrokerName,找到对应BrokerData,并写入返回结果中
                  for (String brokerName : brokerNameSet) {
                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null != brokerData) {
                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap&lt;Long, String&gt;) brokerData
                              .getBrokerAddrs().clone());
                            brokerDataList.add(brokerDataClone);
                            foundBrokerData = true;
                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                              List&lt;String&gt; filterServerList = this.filterServerTable.get(brokerAddr);
                              filterServerMap.put(brokerAddr, filterServerList);
                            }
                        }
                  }
                }
            } finally {
                // 7.释放读锁
                this.lock.readLock().unlock();
            }
      } catch (Exception e) {
            log.error("pickupTopicRouteData Exception", e);
      }

      log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

      if (foundBrokerData &amp;&amp; foundQueueData) {
              // 8.返回结果
            return topicRouteData;
      }

      return null;
    }
</code></pre>
<blockquote>
<p>本文由博客一文多发平台 OpenWrite 发布!</p>
</blockquote><br><br>
来源:https://www.cnblogs.com/JavaEdge/p/18885051
頁: [1]
查看完整版本: RocketMQ客户端是如何感知Broker节点的?