RabbitMQ一个简单可靠的方案(.Net Core实现)
<h1>前言</h1><p> 最近需要使用到消息队列相关技术,于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题,归纳了一下,大概有以下几种:</p>
<p> 1. 临时异常,如数据库网络闪断、http请求临时失效等;</p>
<p> 2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行;</p>
<p> 3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理;</p>
<p> 4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等;</p>
<p> 5. 非法异常,一些伪造、攻击类型的消息。</p>
<p> </p>
<p> 针对这些异常,我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。</p>
<p> </p>
<h1>方案</h1>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180822000500168-1613678738.png" alt=""></p>
<p> </p>
<p> 1. 消息均使用<strong><span style="color: rgba(255, 0, 0, 1)">Exchange</span></strong>进行通讯,方式可以是direct或topic,不建议fanout。</p>
<p> 2. 根据业务在Exchange下分配一个或多个Queue,同时设置一个<strong><span style="color: rgba(255, 0, 0, 1)">审计线程(Audit)</span></strong>监听所有Queue,用于记录消息到<strong><span style="color: rgba(255, 0, 0, 1)">MongoDB</span></strong><span style="color: rgba(255, 0, 0, 1)"><span style="color: rgba(0, 0, 0, 1)">,同时又不阻塞正常业务处理</span></span>。</p>
<p> 3. <strong><span style="color: rgba(255, 0, 0, 1)">生产者(Publisher)</span></strong>在发布消息时,基于AMQP协议,生成消息标识MessageId和时间戳Timestamp,根据消息业务添加头信息Headers便于跟踪。</p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180826210807044-312537621.png" alt=""></p>
<p> 4. <strong><span style="color: rgba(255, 0, 0, 1)">消费者(Comsumer)</span></strong>消息处理失败时,则把消息发送到<strong><span style="color: rgba(255, 0, 0, 1)">重试交换机(Retry Exchange)</span></strong>,并设置过期(重试)时间及更新重试次数;如果超过重试次数则删除消息。</p>
<p><span style="color: rgba(0, 0, 0, 1)"> 5. 重试交换机Exchange设置<strong><span style="color: rgba(255, 0, 0, 1)">死信交换机(Dead Letter Exchange)</span></strong>,消息过期后自动转发到业务交换机(Exchange)。</span></p>
<p><span style="color: rgba(0, 0, 0, 1)"> 6. <strong><span style="color: rgba(255, 0, 0, 1)">WebApi</span></strong>可以根据消息标识MessageId、时间戳Timestamp以及头信息Headers在MongoDB中对消息进行检索或重试。</span></p>
<p> </p>
<p> <em><span style="color: rgba(255, 0, 0, 1)">注:选择MongoDB作为存储介质的主要<em>原因</em>是其对头信息(headers)的动态查询支持较好,同等的替代产品还可以是Elastic Search这些。</span></em></p>
<h1> </h1>
<h1><span style="color: rgba(0, 0, 0, 1)">生产者(Publisher)</span></h1>
<p> 1. 设置断线自动恢复</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory
{
Uri </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Uri(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">amqp://guest:guest@192.168.132.137:5672</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">),
AutomaticRecoveryEnabled </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
};</span></pre>
</div>
<p> </p>
<p> 2. 定义Exchange,模式为direct</p>
<div class="cnblogs_code">
<pre> channel.ExchangeDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p> </p>
<p> 3. 根据业务定义QueueA和QueueB</p>
<div class="cnblogs_code">
<pre> channel.QueueDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p> </p>
<p> 4. 启动消息发送确认机制,即需要收到RabbitMQ服务端的确认消息</p>
<div class="cnblogs_code">
<pre> channel.ConfirmSelect();</pre>
</div>
<p> </p>
<p> 5. 设置消息持久化</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> channel.CreateBasicProperties();
properties.Persistent </span>= <span style="color: rgba(0, 0, 255, 1)">true</span>;</pre>
</div>
<p> </p>
<p> 6. 生成消息标识MessageId、时间戳Timestamp以及头信息Headers</p>
<div class="cnblogs_code">
<pre> properties.MessageId = Guid.NewGuid().ToString(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">N</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
properties.Timestamp </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
properties.Headers </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">
{
{ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">value</span><span style="color: rgba(128, 0, 0, 1)">"</span> +<span style="color: rgba(0, 0, 0, 1)"> i}
};</span></pre>
</div>
<p> </p>
<p> 7. 发送消息,偶数序列发送到QueueA(RouteA),奇数序列发送到QueueB(RouteB)</p>
<div class="cnblogs_code">
<pre> channel.BasicPublish(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, i % <span style="color: rgba(128, 0, 128, 1)">2</span> == <span style="color: rgba(128, 0, 128, 1)">0</span> ? <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span> : <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, properties, body);</pre>
</div>
<p> </p>
<p> 8. 确定收到RabbitMQ服务端的确认消息</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> var</span> isOk =<span style="color: rgba(0, 0, 0, 1)"> channel.WaitForConfirms();
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 0, 1)">isOk)
{
</span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Exception(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">The message is not reached to the server!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<p> </p>
<p> 完整代码</p>
<div class="cnblogs_code"><img id="code_img_closed_b7434f8b-cbce-45ae-8637-19c60e0c34c5" class="code_img_closed" src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" alt=""><img id="code_img_opened_b7434f8b-cbce-45ae-8637-19c60e0c34c5" class="code_img_opened" style="display: none" src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" alt="">
<div id="cnblogs_code_open_b7434f8b-cbce-45ae-8637-19c60e0c34c5" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory
{
Uri </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Uri(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">amqp://guest:guest@localhost:5672</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">),
AutomaticRecoveryEnabled </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
};
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> connection =<span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection())
{
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> connection.CreateModel())
{
channel.ExchangeDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
channel.ConfirmSelect();
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> i = <span style="color: rgba(128, 0, 128, 1)">0</span>; i < <span style="color: rgba(128, 0, 128, 1)">2</span>; i++<span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> channel.CreateBasicProperties();
properties.Persistent </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
properties.MessageId </span>= Guid.NewGuid().ToString(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">N</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
properties.Timestamp </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
properties.Headers </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">
{
{ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">value</span><span style="color: rgba(128, 0, 0, 1)">"</span> +<span style="color: rgba(0, 0, 0, 1)"> i}
};
</span><span style="color: rgba(0, 0, 255, 1)">var</span> message = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Hello </span><span style="color: rgba(128, 0, 0, 1)">"</span> +<span style="color: rgba(0, 0, 0, 1)"> i;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetBytes(message);
channel.BasicPublish(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, i % <span style="color: rgba(128, 0, 128, 1)">2</span> == <span style="color: rgba(128, 0, 128, 1)">0</span> ? <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span> : <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, properties, body);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> isOk =<span style="color: rgba(0, 0, 0, 1)"> channel.WaitForConfirms();
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 0, 1)">isOk)
{
</span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Exception(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">The message is not reached to the server!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
}
}
}</span></pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
<p> </p>
<p> 效果:QueueA和QueueB各一条消息,QueueAudit两条消息</p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180826223250884-1619209740.png" alt=""></p>
<p> <span style="color: rgba(255, 0, 0, 1)"><em>注:Exchange下必须先声明Queue才能接收到消息,上述代码并没有QueueAudit的声明;需要手动声明,或者先执行下面的消费者程序进行声明。</em></span></p>
<p> </p>
<h1>正常消费者(ComsumerA)</h1>
<p> 1. 设置预取消息,避免<strong>公平轮训</strong>问题,可以根据需要设置预取消息数,这里是1</p>
<div class="cnblogs_code">
<pre> _channel.BasicQos(<span style="color: rgba(128, 0, 128, 1)">0</span>, <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>);</pre>
</div>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827001105974-36895420.png" alt=""></p>
<p> </p>
<p> 2. 声明Exchange和Queue</p>
<div class="cnblogs_code">
<pre> _channel.ExchangeDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p> </p>
<p> 3. 编写回调函数</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> EventingBasicConsumer(_channel);
consumer.Received </span>+= (model, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">The QueueA is always successful.</span>
<span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
_channel.BasicAck(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (AlreadyClosedException ex)
{
_logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ is closed!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
};
_channel.BasicConsume(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueA</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, consumer);</pre>
</div>
<p> <span style="color: rgba(255, 0, 0, 1)"><em>注:设置了RabbitMQ的断线恢复机制,当RabbitMQ连接不可用时,与MQ通讯的操作会抛出AlreadyClosedException的异常,导致主线程退出,哪怕连接恢复了,程序也无法恢复,因此,需要捕获处理该异常。</em></span></p>
<p> </p>
<h1>异常消费者(ComsumerB)</h1>
<p> 1. 设置预取消息</p>
<div class="cnblogs_code">
<pre> _channel.BasicQos(0, 1, false);</pre>
</div>
<p> </p>
<p> 2. 声明Exchange和Queue</p>
<div class="cnblogs_code">
<pre> _channel.ExchangeDeclare("Exchange", "direct"<span>);
_channel.QueueDeclare("QueueB", true, false, false<span>);
_channel.QueueBind("QueueB", "Exchange", "RouteB");</span></span></pre>
</div>
<p> </p>
<p> 3. 设置死信交换机(Dead Letter Exchange)</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> var</span> retryDic = <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">
{
{</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">},
{</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-routing-key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">}
};
_channel.ExchangeDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">, retryDic);
_channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p> </p>
<p> 4. 重试设置,3次重试;第一次1秒,第二次10秒,第三次30秒</p>
<div class="cnblogs_code">
<pre> _retryTime = <span style="color: rgba(0, 0, 255, 1)">new</span> List<<span style="color: rgba(0, 0, 255, 1)">int</span>><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(128, 0, 128, 1)">1</span> * <span style="color: rgba(128, 0, 128, 1)">1000</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 128, 1)">10</span> * <span style="color: rgba(128, 0, 128, 1)">1000</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 128, 1)">30</span> * <span style="color: rgba(128, 0, 128, 1)">1000</span><span style="color: rgba(0, 0, 0, 1)">
};</span></pre>
</div>
<p> </p>
<p> 5. 获取当前重试次数</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> var</span> retryCount = <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (ea.BasicProperties.Headers != <span style="color: rgba(0, 0, 255, 1)">null</span> && ea.BasicProperties.Headers.ContainsKey(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">))
{
retryCount </span>= (<span style="color: rgba(0, 0, 255, 1)">int</span>)ea.BasicProperties.Headers[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">];
_logger.LogWarning($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<p> </p>
<p> 6. 发生异常,判断是否可以重试</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> private</span> <span style="color: rgba(0, 0, 255, 1)">bool</span> CanRetry(<span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> retryCount)
{
</span><span style="color: rgba(0, 0, 255, 1)">return</span> retryCount <= _retryTime.Count - <span style="color: rgba(128, 0, 128, 1)">1</span><span style="color: rgba(0, 0, 0, 1)">;
}</span></pre>
</div>
<p> </p>
<p> 7. 可以重试,则启动重试机制</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> private</span> <span style="color: rgba(0, 0, 255, 1)">void</span> SetupRetry(<span style="color: rgba(0, 0, 255, 1)">int</span> retryCount, <span style="color: rgba(0, 0, 255, 1)">string</span> retryExchange, <span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> retryRoute, BasicDeliverEventArgs ea)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> ea.Body;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> ea.BasicProperties;
properties.Headers </span>= properties.Headers ?? <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">();
properties.Headers[</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span>] =<span style="color: rgba(0, 0, 0, 1)"> retryCount;
properties.Expiration </span>=<span style="color: rgba(0, 0, 0, 1)"> _retryTime.ToString();
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
_channel.BasicPublish(retryExchange, retryRoute, properties, body);
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (AlreadyClosedException ex)
{
_logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ is closed!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
}</span></pre>
</div>
<p> </p>
<p> 完整代码</p>
<div class="cnblogs_code"><img id="code_img_closed_7ad2d26f-9ba3-4674-9669-6a310b5640c0" class="code_img_closed" src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" alt=""><img id="code_img_opened_7ad2d26f-9ba3-4674-9669-6a310b5640c0" class="code_img_opened" style="display: none" src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" alt="">
<div id="cnblogs_code_open_7ad2d26f-9ba3-4674-9669-6a310b5640c0" class="cnblogs_code_hide">
<pre> _channel.BasicQos(<span style="color: rgba(128, 0, 128, 1)">0</span>, <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.ExchangeDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> retryDic = <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">
{
{</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">},
{</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-routing-key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">}
};
_channel.ExchangeDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">, retryDic);
_channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> EventingBasicConsumer(_channel);
consumer.Received </span>+= (model, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">The QueueB is always failed.</span>
<span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)"> canAck;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> retryCount = <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (ea.BasicProperties.Headers != <span style="color: rgba(0, 0, 255, 1)">null</span> && ea.BasicProperties.Headers.ContainsKey(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">))
{
retryCount </span>= (<span style="color: rgba(0, 0, 255, 1)">int</span>)ea.BasicProperties.Headers[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retryCount</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">];
_logger.LogWarning($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
Handle();
canAck </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex)
{
_logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (CanRetry(retryCount))
{
SetupRetry(retryCount, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB_Retry</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, ea);
canAck </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
}
</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
{
canAck </span>= <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
}
}
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (canAck)
{
_channel.BasicAck(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
{
_channel.BasicNack(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
}
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (AlreadyClosedException ex)
{
_logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ is closed!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
};
_channel.BasicConsume(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueB</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, consumer);</pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
<p> </p>
<h1>审计消费者(Audit Comsumer)</h1>
<p> 1. 声明Exchange和Queue</p>
<div class="cnblogs_code">
<pre> _channel.ExchangeDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">direct</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueAudit</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueAudit</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteA</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">QueueAudit</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RouteB</span><span style="color: rgba(128, 0, 0, 1)">"</span>);</pre>
</div>
<p> </p>
<p> 2. 排除死信Exchange转发过来的重复消息</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> if</span> (ea.BasicProperties.Headers == <span style="color: rgba(0, 0, 255, 1)">null</span> || !ea.BasicProperties.Headers.ContainsKey(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-death</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">))
{
...
}</span></pre>
</div>
<p> </p>
<p> 3. 生成消息实体</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> var</span> message = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Message
{
MessageId </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.BasicProperties.MessageId,
Body </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.Body,
Exchange </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.Exchange,
Route </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.RoutingKey
};</span></pre>
</div>
<p> </p>
<p> 4. RabbitMQ会用bytes来存储字符串,因此,要把头中bytes转回字符串</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> if</span> (ea.BasicProperties.Headers != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> headers = <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">foreach</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> header <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> ea.BasicProperties.Headers)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (header.Value <span style="color: rgba(0, 0, 255, 1)">is</span> <span style="color: rgba(0, 0, 255, 1)">byte</span><span style="color: rgba(0, 0, 0, 1)">[] bytes)
{
headers </span>=<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetString(bytes);
}
</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
{
headers </span>=<span style="color: rgba(0, 0, 0, 1)"> header.Value;
}
}
message.Headers </span>=<span style="color: rgba(0, 0, 0, 1)"> headers;
}</span></pre>
</div>
<p> </p>
<p> 5. 把Unix格式的Timestamp转成UTC时间</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"> if</span> (ea.BasicProperties.Timestamp.UnixTime > <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">)
{
message.TimestampUnix </span>=<span style="color: rgba(0, 0, 0, 1)"> ea.BasicProperties.Timestamp.UnixTime;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> offset =<span style="color: rgba(0, 0, 0, 1)"> DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);
message.Timestamp </span>=<span style="color: rgba(0, 0, 0, 1)"> offset.UtcDateTime;
}</span></pre>
</div>
<p> </p>
<p> 6. 消息存入MongoDB</p>
<div class="cnblogs_code">
<pre> _mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);</pre>
</div>
<p> </p>
<p> MongoDB记录:</p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827011033856-665362313.png" alt=""></p>
<p> </p>
<p> 重试记录:</p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827010935896-192368779.png" alt=""></p>
<p> </p>
<h1>消息检索及重发(WebApi)</h1>
<p> 1. 通过消息Id检索消息</p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827012913904-907874552.png" alt=""></p>
<p> </p>
<p> 2. 通过头消息检索消息</p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827012941233-1813930052.png" alt=""></p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827012955815-1690669672.png" alt=""></p>
<p> </p>
<p> 3. 消息重发,会重新生成MessageId</p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827013342888-2100215329.png" alt=""></p>
<p> <img src="https://images2018.cnblogs.com/blog/182190/201808/182190-20180827013444065-79331532.png" alt=""></p>
<p> </p>
<h1>Ack,Nack,Reject的关系</h1>
<p> 1. 消息处理成功,执行Ack,RabbitMQ会把消息从队列中删除。</p>
<p> 2. 消息处理失败,执行Nack或者Reject:</p>
<p> a) 当requeue=true时,消息会重新回到队列,然后当前消费者会马上再取回这条消息;</p>
<p> b) 当requeue=false时,如果Exchange有设置Dead Letter Exchange,则消息会去到Dead Letter Exchange;</p>
<p> c) 当requeue=false时,如果Exchange没设置Dead Letter Exchange,则消息从队列中删除,效果与Ack相同。</p>
<p> </p>
<p> 3. Nack与Reject的区别在于:Nack可以批量操作,Reject只能单条操作。</p>
<p> </p>
<h1>RabbitMQ自动恢复</h1>
<h2>连接(Connection)恢复</h2>
<p> 1. 重连(Reconnect)</p>
<p> 2. 恢复连接监听(Listeners)</p>
<p> 3. 重新打开通道(Channels)</p>
<p> 4. 恢复通道监听(Listeners)</p>
<p> 5. 恢复basic.qos,publisher confirms以及transaction设置</p>
<p> </p>
<h2>拓扑(Topology)恢复</h2>
<p> 1. 重新声明交换机(Exchanges)</p>
<p> 2. 重新声明队列(Queues)</p>
<p> 3. 恢复所有绑定(Bindings)</p>
<p> 4. 恢复所有消费者(Consumers)</p>
<p> </p>
<h1>异常处理机制</h1>
<p> 1. 临时异常,如数据库网络闪断、http请求临时失效等</p>
<p> 通过短时间重试(如1秒后)的方式处理,也可以考虑Nack/Reject来实现重试(时效性更高)。</p>
<p> </p>
<p> 2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行</p>
<p> 通过长时间重试(如1分钟、30分钟、1小时、1天等),等待B任务先执行完的方式处理。</p>
<p> </p>
<p> 3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理</p>
<p> 等系统修正后,通过消息重发的方式处理。</p>
<p> </p>
<p> 4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等</p>
<p> 等系统恢复后,通过消息重发的方式处理。</p>
<p> </p>
<p> 5. 非法异常,一些伪造、攻击类型的消息</p>
<p> 多次重试失败后,消息从队列中被删除,也可以针对此业务做进一步处理。</p>
<p> </p>
<h1>源码地址</h1>
<p>https://github.com/ErikXu/RabbitMesage</p><br><br>
来源:https://www.cnblogs.com/Erik_Xu/p/9515208.html
頁:
[1]