我们坐在高高的谷堆旁边 發表於 2018-8-27 02:16:00

RabbitMQ一个简单可靠的方案(.Net Core实现)

<h1>前言</h1>
<p>  最近需要使用到消息队列相关技术,于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题,归纳了一下,大概有以下几种:</p>
<p>  1. 临时异常,如数据库网络闪断、http请求临时失效等;</p>
<p>  2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行;</p>
<p>  3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理;</p>
<p>  4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等;</p>
<p>  5. 非法异常,一些伪造、攻击类型的消息。</p>
<p>&nbsp;</p>
<p>  针对这些异常,我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。</p>
<p>&nbsp;</p>
<h1>方案</h1>
<p>&nbsp;<img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180822000500168-1613678738.png" alt=""></p>
<p>&nbsp;</p>
<p>  1. 消息均使用<strong><span style="color: rgba(255, 0, 0, 1)">Exchange</span></strong>进行通讯,方式可以是direct或topic,不建议fanout。</p>
<p>  2. 根据业务在Exchange下分配一个或多个Queue,同时设置一个<strong><span style="color: rgba(255, 0, 0, 1)">审计线程(Audit)</span></strong>监听所有Queue,用于记录消息到<strong><span style="color: rgba(255, 0, 0, 1)">MongoDB</span></strong><span style="color: rgba(255, 0, 0, 1)"><span style="color: rgba(0, 0, 0, 1)">,同时又不阻塞正常业务处理</span></span>。</p>
<p>  3. <strong><span style="color: rgba(255, 0, 0, 1)">生产者(Publisher)</span></strong>在发布消息时,基于AMQP协议,生成消息标识MessageId和时间戳Timestamp,根据消息业务添加头信息Headers便于跟踪。</p>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180826210807044-312537621.png" alt=""></p>
<p>  4. <strong><span style="color: rgba(255, 0, 0, 1)">消费者(Comsumer)</span></strong>消息处理失败时,则把消息发送到<strong><span style="color: rgba(255, 0, 0, 1)">重试交换机(Retry Exchange)</span></strong>,并设置过期(重试)时间及更新重试次数;如果超过重试次数则删除消息。</p>
<p><span style="color: rgba(0, 0, 0, 1)">  5. 重试交换机Exchange设置<strong><span style="color: rgba(255, 0, 0, 1)">死信交换机(Dead Letter Exchange)</span></strong>,消息过期后自动转发到业务交换机(Exchange)。</span></p>
<p><span style="color: rgba(0, 0, 0, 1)">  6. <strong><span style="color: rgba(255, 0, 0, 1)">WebApi</span></strong>可以根据消息标识MessageId、时间戳Timestamp以及头信息Headers在MongoDB中对消息进行检索或重试。</span></p>
<p>&nbsp;  </p>
<p>  <em><span style="color: rgba(255, 0, 0, 1)">注:选择MongoDB作为存储介质的主要<em>原因</em>是其对头信息(headers)的动态查询支持较好,同等的替代产品还可以是Elastic Search这些。</span></em></p>
<h1>&nbsp;</h1>
<h1><span style="color: rgba(0, 0, 0, 1)">生产者(Publisher)</span></h1>
<p>  1. 设置断线自动恢复</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory
  {
      Uri </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Uri(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">amqp://guest:guest@192.168.132.137:5672</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">),
      AutomaticRecoveryEnabled </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
  };</span></pre>
</div>
<p>&nbsp;</p>
<p>  2. 定义Exchange,模式为direct</p>
<div class="cnblogs_code">
<pre>  channel.ExchangeDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p>&nbsp;</p>
<p>  3. 根据业务定义QueueA和QueueB</p>
<div class="cnblogs_code">
<pre>  channel.QueueDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
  channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);

  channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
  channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p>&nbsp;</p>
<p>  4. 启动消息发送确认机制,即需要收到RabbitMQ服务端的确认消息</p>
<div class="cnblogs_code">
<pre>  channel.ConfirmSelect();</pre>
</div>
<p>&nbsp;</p>
<p>  5. 设置消息持久化</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> channel.CreateBasicProperties();
  properties.Persistent </span>= <span style="color: rgba(0, 0, 255, 1)">true</span>;</pre>
</div>
<p>&nbsp;</p>
<p>  6. 生成消息标识MessageId、时间戳Timestamp以及头信息Headers</p>
<div class="cnblogs_code">
<pre>  properties.MessageId = Guid.NewGuid().ToString(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">N</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
  properties.Timestamp </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
  properties.Headers </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary&lt;<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>&gt;<span style="color: rgba(0, 0, 0, 1)">
  {
      { </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">value</span><span style="color: rgba(128, 0, 0, 1)">"</span> +<span style="color: rgba(0, 0, 0, 1)"> i}
  };</span></pre>
</div>
<p>&nbsp;</p>
<p>  7. 发送消息,偶数序列发送到QueueA(RouteA),奇数序列发送到QueueB(RouteB)</p>
<div class="cnblogs_code">
<pre>  channel.BasicPublish(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, i % <span style="color: rgba(128, 0, 128, 1)">2</span> == <span style="color: rgba(128, 0, 128, 1)">0</span> ? <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span> : <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, properties, body);</pre>
</div>
<p>&nbsp;</p>
<p>  8. 确定收到RabbitMQ服务端的确认消息</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  var</span> isOk =<span style="color: rgba(0, 0, 0, 1)"> channel.WaitForConfirms();
  </span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 0, 1)">isOk)
  {
      </span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Exception(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">The message is not reached to the server!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
  }</span></pre>
</div>
<p>&nbsp;</p>
<p>  完整代码</p>
<div class="cnblogs_code"><img id="code_img_closed_b7434f8b-cbce-45ae-8637-19c60e0c34c5" class="code_img_closed" src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" alt=""><img id="code_img_opened_b7434f8b-cbce-45ae-8637-19c60e0c34c5" class="code_img_opened" style="display: none" src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" alt="">
<div id="cnblogs_code_open_b7434f8b-cbce-45ae-8637-19c60e0c34c5" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory
{
    Uri </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Uri(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">amqp://guest:guest@localhost:5672</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">),
    AutomaticRecoveryEnabled </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
};

</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> connection =<span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection())
{
    </span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> connection.CreateModel())
    {
      channel.ExchangeDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);

      channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
      channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);

      channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
      channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);

      channel.ConfirmSelect();

      </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> i = <span style="color: rgba(128, 0, 128, 1)">0</span>; i &lt; <span style="color: rgba(128, 0, 128, 1)">2</span>; i++<span style="color: rgba(0, 0, 0, 1)">)
      {
            </span><span style="color: rgba(0, 0, 255, 1)">var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> channel.CreateBasicProperties();
            properties.Persistent </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
            properties.MessageId </span>= Guid.NewGuid().ToString(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">N</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
            properties.Timestamp </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            properties.Headers </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary&lt;<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>&gt;<span style="color: rgba(0, 0, 0, 1)">
            {
                { </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">value</span><span style="color: rgba(128, 0, 0, 1)">"</span> +<span style="color: rgba(0, 0, 0, 1)"> i}
            };

            </span><span style="color: rgba(0, 0, 255, 1)">var</span> message = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Hello </span><span style="color: rgba(128, 0, 0, 1)">"</span> +<span style="color: rgba(0, 0, 0, 1)"> i;
            </span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, i % <span style="color: rgba(128, 0, 128, 1)">2</span> == <span style="color: rgba(128, 0, 128, 1)">0</span> ? <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span> : <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, properties, body);
            </span><span style="color: rgba(0, 0, 255, 1)">var</span> isOk =<span style="color: rgba(0, 0, 0, 1)"> channel.WaitForConfirms();
            </span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 0, 1)">isOk)
            {
                </span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Exception(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">The message is not reached to the server!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
            }
      }
    }
}</span></pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
<p>&nbsp;</p>
<p>  效果:QueueA和QueueB各一条消息,QueueAudit两条消息</p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180826223250884-1619209740.png" alt=""></p>
<p>&nbsp;  <span style="color: rgba(255, 0, 0, 1)"><em>注:Exchange下必须先声明Queue才能接收到消息,上述代码并没有QueueAudit的声明;需要手动声明,或者先执行下面的消费者程序进行声明。</em></span></p>
<p>&nbsp;</p>
<h1>正常消费者(ComsumerA)</h1>
<p>  1. 设置预取消息,避免<strong>公平轮训</strong>问题,可以根据需要设置预取消息数,这里是1</p>
<div class="cnblogs_code">
<pre>  _channel.BasicQos(<span style="color: rgba(128, 0, 128, 1)">0</span>, <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>);</pre>
</div>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827001105974-36895420.png" alt=""></p>
<p>&nbsp;</p>
<p>  2. 声明Exchange和Queue</p>
<div class="cnblogs_code">
<pre>  _channel.ExchangeDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
  _channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
  _channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p>&nbsp;</p>
<p>  3. 编写回调函数</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> EventingBasicConsumer(_channel);
  consumer.Received </span>+= (model, ea) =&gt;<span style="color: rgba(0, 0, 0, 1)">
  {
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">The QueueA is always successful.</span>
      <span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
      {
        _channel.BasicAck(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
      }
      </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (AlreadyClosedException ex)
      {
        _logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ is closed!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
      }
  };

  _channel.BasicConsume(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, consumer);</pre>
</div>
<p>  <span style="color: rgba(255, 0, 0, 1)"><em>注:设置了RabbitMQ的断线恢复机制,当RabbitMQ连接不可用时,与MQ通讯的操作会抛出AlreadyClosedException的异常,导致主线程退出,哪怕连接恢复了,程序也无法恢复,因此,需要捕获处理该异常。</em></span></p>
<p>&nbsp;</p>
<h1>异常消费者(ComsumerB)</h1>
<p>  1. 设置预取消息</p>
<div class="cnblogs_code">
<pre>  _channel.BasicQos(0, 1, false);</pre>
</div>
<p>&nbsp;</p>
<p>  2. 声明Exchange和Queue</p>
<div class="cnblogs_code">
<pre>  _channel.ExchangeDeclare("Exchange", "direct"<span>);
  _channel.QueueDeclare("QueueB", true, false, false<span>);
  _channel.QueueBind("QueueB", "Exchange", "RouteB");</span></span></pre>
</div>
<p>&nbsp;</p>
<p>  3.&nbsp; 设置死信交换机(Dead Letter Exchange)</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  var</span> retryDic = <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary&lt;<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>&gt;<span style="color: rgba(0, 0, 0, 1)">
  {
      {</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">},
      {</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-routing-key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">}
  };

  _channel.ExchangeDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
  _channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">, retryDic);
  _channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p>&nbsp;</p>
<p>  4. 重试设置,3次重试;第一次1秒,第二次10秒,第三次30秒</p>
<div class="cnblogs_code">
<pre>  _retryTime = <span style="color: rgba(0, 0, 255, 1)">new</span> List&lt;<span style="color: rgba(0, 0, 255, 1)">int</span>&gt;<span style="color: rgba(0, 0, 0, 1)">
  {
      </span><span style="color: rgba(128, 0, 128, 1)">1</span> * <span style="color: rgba(128, 0, 128, 1)">1000</span><span style="color: rgba(0, 0, 0, 1)">,
      </span><span style="color: rgba(128, 0, 128, 1)">10</span> * <span style="color: rgba(128, 0, 128, 1)">1000</span><span style="color: rgba(0, 0, 0, 1)">,
      </span><span style="color: rgba(128, 0, 128, 1)">30</span> * <span style="color: rgba(128, 0, 128, 1)">1000</span><span style="color: rgba(0, 0, 0, 1)">
  };</span></pre>
</div>
<p>&nbsp;</p>
<p>  5. 获取当前重试次数</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  var</span> retryCount = <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">;
  </span><span style="color: rgba(0, 0, 255, 1)">if</span> (ea.BasicProperties.Headers != <span style="color: rgba(0, 0, 255, 1)">null</span> &amp;&amp; ea.BasicProperties.Headers.ContainsKey(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">))
  {
      retryCount </span>= (<span style="color: rgba(0, 0, 255, 1)">int</span>)ea.BasicProperties.Headers[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">];
      _logger.LogWarning($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
  }</span></pre>
</div>
<p>&nbsp;</p>
<p>  6. 发生异常,判断是否可以重试</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  private</span> <span style="color: rgba(0, 0, 255, 1)">bool</span> CanRetry(<span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> retryCount)
  {
      </span><span style="color: rgba(0, 0, 255, 1)">return</span> retryCount &lt;= _retryTime.Count - <span style="color: rgba(128, 0, 128, 1)">1</span><span style="color: rgba(0, 0, 0, 1)">;
  }</span></pre>
</div>
<p>&nbsp;</p>
<p>  7. 可以重试,则启动重试机制</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  private</span> <span style="color: rgba(0, 0, 255, 1)">void</span> SetupRetry(<span style="color: rgba(0, 0, 255, 1)">int</span> retryCount, <span style="color: rgba(0, 0, 255, 1)">string</span> retryExchange, <span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> retryRoute, BasicDeliverEventArgs ea)
  {
      </span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> ea.Body;
      </span><span style="color: rgba(0, 0, 255, 1)">var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> ea.BasicProperties;
      properties.Headers </span>= properties.Headers ?? <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary&lt;<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>&gt;<span style="color: rgba(0, 0, 0, 1)">();
      properties.Headers[</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span>] =<span style="color: rgba(0, 0, 0, 1)"> retryCount;
      properties.Expiration </span>=<span style="color: rgba(0, 0, 0, 1)"> _retryTime.ToString();

      </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
      {
        _channel.BasicPublish(retryExchange, retryRoute, properties, body);
      }
      </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (AlreadyClosedException ex)
      {
        _logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ is closed!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
      }
  }</span></pre>
</div>
<p>&nbsp;</p>
<p>  完整代码</p>
<div class="cnblogs_code"><img id="code_img_closed_7ad2d26f-9ba3-4674-9669-6a310b5640c0" class="code_img_closed" src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" alt=""><img id="code_img_opened_7ad2d26f-9ba3-4674-9669-6a310b5640c0" class="code_img_opened" style="display: none" src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" alt="">
<div id="cnblogs_code_open_7ad2d26f-9ba3-4674-9669-6a310b5640c0" class="cnblogs_code_hide">
<pre>    _channel.BasicQos(<span style="color: rgba(128, 0, 128, 1)">0</span>, <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
   
    _channel.ExchangeDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
    _channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
    _channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
   
    </span><span style="color: rgba(0, 0, 255, 1)">var</span> retryDic = <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary&lt;<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>&gt;<span style="color: rgba(0, 0, 0, 1)">
    {
      {</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">},
      {</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-routing-key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">}
    };
   
    _channel.ExchangeDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
    _channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">, retryDic);
    _channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
   
    </span><span style="color: rgba(0, 0, 255, 1)">var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> EventingBasicConsumer(_channel);
    consumer.Received </span>+= (model, ea) =&gt;<span style="color: rgba(0, 0, 0, 1)">
    {
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">The QueueB is always failed.</span>
      <span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)"> canAck;
      </span><span style="color: rgba(0, 0, 255, 1)">var</span> retryCount = <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">;
      </span><span style="color: rgba(0, 0, 255, 1)">if</span> (ea.BasicProperties.Headers != <span style="color: rgba(0, 0, 255, 1)">null</span> &amp;&amp; ea.BasicProperties.Headers.ContainsKey(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">))
      {
            retryCount </span>= (<span style="color: rgba(0, 0, 255, 1)">int</span>)ea.BasicProperties.Headers[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">];
            _logger.LogWarning($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
      }
   
      </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
      {
            Handle();
            canAck </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
      }
      </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex)
      {
            _logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
            </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (CanRetry(retryCount))
            {
                SetupRetry(retryCount, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, ea);
                canAck </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
            {
                canAck </span>= <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
            }
      }
   
      </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
      {
            </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (canAck)
            {
                _channel.BasicAck(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
            }
            </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
            {
                _channel.BasicNack(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
            }
      }
      </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (AlreadyClosedException ex)
      {
            _logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ is closed!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
      }
    };
   
    _channel.BasicConsume(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, consumer);</pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
<p>&nbsp;</p>
<h1>审计消费者(Audit Comsumer)</h1>
<p>  1. 声明Exchange和Queue</p>
<div class="cnblogs_code">
<pre>  _channel.ExchangeDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);

  _channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueAudit</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
  _channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueAudit</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
  _channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueAudit</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p>&nbsp;</p>
<p>  2. 排除死信Exchange转发过来的重复消息</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  if</span> (ea.BasicProperties.Headers == <span style="color: rgba(0, 0, 255, 1)">null</span> || !ea.BasicProperties.Headers.ContainsKey(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-death</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">))
  {
      ...
  }</span></pre>
</div>
<p>&nbsp;</p>
<p>  3. 生成消息实体</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  var</span> message = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Message
  {
      MessageId </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.BasicProperties.MessageId,
      Body </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.Body,
      Exchange </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.Exchange,
      Route </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.RoutingKey
  };</span></pre>
</div>
<p>&nbsp;</p>
<p>  4. RabbitMQ会用bytes来存储字符串,因此,要把头中bytes转回字符串</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  if</span> (ea.BasicProperties.Headers != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">)
  {
      </span><span style="color: rgba(0, 0, 255, 1)">var</span> headers = <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary&lt;<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>&gt;<span style="color: rgba(0, 0, 0, 1)">();

      </span><span style="color: rgba(0, 0, 255, 1)">foreach</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> header <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> ea.BasicProperties.Headers)
      {
        </span><span style="color: rgba(0, 0, 255, 1)">if</span> (header.Value <span style="color: rgba(0, 0, 255, 1)">is</span> <span style="color: rgba(0, 0, 255, 1)">byte</span><span style="color: rgba(0, 0, 0, 1)">[] bytes)
        {
              headers </span>=<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetString(bytes);
        }
        </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
        {
              headers </span>=<span style="color: rgba(0, 0, 0, 1)"> header.Value;
        }
      }

      message.Headers </span>=<span style="color: rgba(0, 0, 0, 1)"> headers;
  }</span></pre>
</div>
<p>&nbsp;</p>
<p>  5. 把Unix格式的Timestamp转成UTC时间</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">  if</span> (ea.BasicProperties.Timestamp.UnixTime &gt; <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">)
  {
      message.TimestampUnix </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.BasicProperties.Timestamp.UnixTime;
      </span><span style="color: rgba(0, 0, 255, 1)">var</span> offset =<span style="color: rgba(0, 0, 0, 1)"> DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);
      message.Timestamp </span>=<span style="color: rgba(0, 0, 0, 1)"> offset.UtcDateTime;
  }</span></pre>
</div>
<p>&nbsp;</p>
<p>  6. 消息存入MongoDB</p>
<div class="cnblogs_code">
<pre>  _mongoDbContext.Collection&lt;Message&gt;().InsertOne(message, cancellationToken: cancellationToken);</pre>
</div>
<p>&nbsp;</p>
<p>  MongoDB记录:</p>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827011033856-665362313.png" alt=""></p>
<p>&nbsp;</p>
<p>  重试记录:</p>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827010935896-192368779.png" alt=""></p>
<p>&nbsp;</p>
<h1>消息检索及重发(WebApi)</h1>
<p>  1. 通过消息Id检索消息</p>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827012913904-907874552.png" alt=""></p>
<p>&nbsp;</p>
<p>  2. 通过头消息检索消息</p>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827012941233-1813930052.png" alt=""></p>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827012955815-1690669672.png" alt=""></p>
<p>&nbsp;</p>
<p>  3. 消息重发,会重新生成MessageId</p>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827013342888-2100215329.png" alt=""></p>
<p>  <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827013444065-79331532.png" alt=""></p>
<p>&nbsp;</p>
<h1>Ack,Nack,Reject的关系</h1>
<p>  1. 消息处理成功,执行Ack,RabbitMQ会把消息从队列中删除。</p>
<p>  2. 消息处理失败,执行Nack或者Reject:</p>
<p>  a) 当requeue=true时,消息会重新回到队列,然后当前消费者会马上再取回这条消息;</p>
<p>  b) 当requeue=false时,如果Exchange有设置Dead Letter Exchange,则消息会去到Dead Letter Exchange;</p>
<p>  c) 当requeue=false时,如果Exchange没设置Dead Letter Exchange,则消息从队列中删除,效果与Ack相同。</p>
<p>&nbsp;</p>
<p>  3. Nack与Reject的区别在于:Nack可以批量操作,Reject只能单条操作。</p>
<p>  </p>
<h1>RabbitMQ自动恢复</h1>
<h2>连接(Connection)恢复</h2>
<p>  1. 重连(Reconnect)</p>
<p>  2. 恢复连接监听(Listeners)</p>
<p>  3. 重新打开通道(Channels)</p>
<p>  4. 恢复通道监听(Listeners)</p>
<p>  5. 恢复basic.qos,publisher confirms以及transaction设置</p>
<p>  &nbsp;</p>
<h2>拓扑(Topology)恢复</h2>
<p>  1. 重新声明交换机(Exchanges)</p>
<p>  2. 重新声明队列(Queues)</p>
<p>  3. 恢复所有绑定(Bindings)</p>
<p>  4. 恢复所有消费者(Consumers)</p>
<p>&nbsp;</p>
<h1>异常处理机制</h1>
<p>  1. 临时异常,如数据库网络闪断、http请求临时失效等</p>
<p>  通过短时间重试(如1秒后)的方式处理,也可以考虑Nack/Reject来实现重试(时效性更高)。</p>
<p>&nbsp;</p>
<p>  2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行</p>
<p>  通过长时间重试(如1分钟、30分钟、1小时、1天等),等待B任务先执行完的方式处理。</p>
<p>  </p>
<p>  3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理</p>
<p>  等系统修正后,通过消息重发的方式处理。</p>
<p>&nbsp;</p>
<p>  4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等</p>
<p>  等系统恢复后,通过消息重发的方式处理。</p>
<p>&nbsp;</p>
<p>  5. 非法异常,一些伪造、攻击类型的消息</p>
<p>  多次重试失败后,消息从队列中被删除,也可以针对此业务做进一步处理。</p>
<p>&nbsp;</p>
<h1>源码地址</h1>
<p>https://github.com/ErikXu/RabbitMesage</p><br><br>
来源:https://www.cnblogs.com/Erik_Xu/p/9515208.html
頁: [1]
查看完整版本: RabbitMQ一个简单可靠的方案(.Net Core实现)