【RabbitMQ】实现完整的消息可靠性保障体系
<div class="dad65929"><div class="_4f9bf79 d7dc56a8 _43c05b5">
<div class="ds-message _63c77b1">
<div class="ds-markdown">
<h2>本章目标</h2>
<ul>
<li>
<p class="ds-markdown-paragraph">掌握生产者确认(Publisher Confirms)机制,确保消息到达Broker。</p>
</li>
<li>
<p class="ds-markdown-paragraph">深入理解消费者确认(Consumer Acknowledgments)的最佳实践。</p>
</li>
<li>
<p class="ds-markdown-paragraph">学习死信队列(Dead Letter Exchange, DLX)处理失败消息。</p>
</li>
<li>
<p class="ds-markdown-paragraph">实现完整的消息可靠性保障体系。</p>
</li>
</ul>
<hr>
<h2>一、理论部分</h2>
<h3>1. 消息传递的生命周期与可靠性挑战</h3>
<p class="ds-markdown-paragraph">在分布式系统中,消息可能在任何环节丢失:</p>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">生产者 -> Broker:网络故障、Broker崩溃</p>
</li>
<li>
<p class="ds-markdown-paragraph">Broker内部:服务器宕机、队列未持久化</p>
</li>
<li>
<p class="ds-markdown-paragraph">Broker -> 消费者:消费者处理失败、连接中断</p>
</li>
</ol>
<h3>2. 生产者确认(Publisher Confirms)</h3>
<p class="ds-markdown-paragraph">这是RabbitMQ提供的一种生产者端的可靠性机制。当生产者启用确认模式后,Broker会异步通知生产者消息是否已经成功处理。</p>
<ul>
<li>
<p class="ds-markdown-paragraph">事务(Transactions):AMQP协议支持事务,但性能较差(同步,吞吐量降低约200-300倍)。</p>
</li>
<li>
<p class="ds-markdown-paragraph">发布者确认(Publisher Confirms):性能更好的异步替代方案,是生产环境推荐的方式。</p>
</li>
</ul>
<p class="ds-markdown-paragraph">确认的两种结果:</p>
<ul>
<li>
<p class="ds-markdown-paragraph">ACK:消息已被Broker成功接收和处理(持久化到磁盘)。</p>
</li>
<li>
<p class="ds-markdown-paragraph">NACK:消息未被Broker处理(通常由于内部错误)。</p>
</li>
</ul>
<h3>3. 消费者确认(Consumer Acknowledgments)</h3>
<p class="ds-markdown-paragraph">我们在前面的章节已经接触过,本章将深入探讨:</p>
<ul>
<li>
<p class="ds-markdown-paragraph">自动确认(autoAck: true):消息一送达就确认,风险高。</p>
</li>
<li>
<p class="ds-markdown-paragraph">手动确认(autoAck: false):</p>
<ul>
<li>
<p class="ds-markdown-paragraph"><code>BasicAck</code>:成功处理,消息从队列删除。</p>
</li>
<li>
<p class="ds-markdown-paragraph"><code>BasicNack</code>:处理失败,可以要求重新入队或丢弃。</p>
</li>
<li>
<p class="ds-markdown-paragraph"><code>BasicReject</code>:同<code>BasicNack</code>,但不支持批量操作。</p>
</li>
</ul>
</li>
</ul>
<h3>4. 死信队列(Dead Letter Exchange, DLX)</h3>
<p class="ds-markdown-paragraph">当消息遇到以下情况时,会成为"死信":</p>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">消息被消费者<code>basic.reject</code>或<code>basic.nack</code>且<code>requeue = false</code></p>
</li>
<li>
<p class="ds-markdown-paragraph">消息因TTL(Time-To-Live)过期</p>
</li>
<li>
<p class="ds-markdown-paragraph">队列达到最大长度限制</p>
</li>
</ol>
<p class="ds-markdown-paragraph">死信消息会被重新发布到配置的DLX,然后根据DLX的类型路由到死信队列。</p>
<h3>5. 完整的可靠性保障体系</h3>
<p class="ds-markdown-paragraph">生产级应用需要多层次的保障:</p>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">生产者确认:确保消息到达Broker</p>
</li>
<li>
<p class="ds-markdown-paragraph">消息持久化:队列持久化 + 消息持久化</p>
</li>
<li>
<p class="ds-markdown-paragraph">消费者确认:确保消息被成功处理</p>
</li>
<li>
<p class="ds-markdown-paragraph">死信队列:处理无法正常消费的消息</p>
</li>
<li>
<p class="ds-markdown-paragraph">监控与告警:及时发现和处理问题</p>
</li>
</ol><hr>
<h2>二、实操部分:构建完整的可靠消息系统</h2>
<p class="ds-markdown-paragraph">我们将构建一个包含完整可靠性保障的订单处理系统。</p>
<h3>第1步:创建项目结构</h3>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">创建新解决方案,包含以下项目:</p>
<ul>
<li>
<p class="ds-markdown-paragraph"><code>ReliableProducer</code> - 支持确认的生产者</p>
</li>
<li>
<p class="ds-markdown-paragraph"><code>ReliableConsumer</code> - 支持手动确认和死信处理的消费者</p>
</li>
<li>
<p class="ds-markdown-paragraph"><code>DeadLetterProcessor</code> - 死信消息处理器</p>
</li>
</ul>
</li>
<li>
<p class="ds-markdown-paragraph">为所有项目添加<code>RabbitMQ.Client</code> NuGet包。</p>
</li>
</ol>
<h3>第2步:实现可靠生产者(ReliableProducer.cs)</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Events;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory()
{
HostName </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">localhost</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
UserName </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">myuser</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
Password </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">mypassword</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
};
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> connection =<span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection())
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> connection.CreateModel())
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 启用发布者确认模式</span>
<span style="color: rgba(0, 0, 0, 1)"> channel.ConfirmSelect();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 声明持久化队列</span>
channel.QueueDeclare(queue: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reliable_orders</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
durable: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">,
exclusive: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
autoDelete: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
arguments: </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 设置确认事件处理器</span>
channel.BasicAcks += (sender, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✓] Message {ea.DeliveryTag} confirmed by broker</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
};
channel.BasicNacks </span>+= (sender, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✗] Message {ea.DeliveryTag} not confirmed by broker</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 在实际应用中,这里应该实现重试逻辑</span>
<span style="color: rgba(0, 0, 0, 1)"> };
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = <span style="color: rgba(128, 0, 128, 1)">1</span>; i <= <span style="color: rgba(128, 0, 128, 1)">10</span>; i++<span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> message = $<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Order #{i} - Product XYZ</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetBytes(message);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 设置消息为持久化</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> channel.CreateBasicProperties();
properties.Persistent </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
properties.MessageId </span>=<span style="color: rgba(0, 0, 0, 1)"> Guid.NewGuid().ToString();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 发布消息</span>
channel.BasicPublish(exchange: <span style="color: rgba(128, 0, 0, 1)">""</span><span style="color: rgba(0, 0, 0, 1)">,
routingKey: </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reliable_orders</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
basicProperties: properties,
body: body);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Sent {message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 等待确认(在实际应用中可能使用异步方式)</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> (channel.WaitForConfirms(TimeSpan.FromSeconds(<span style="color: rgba(128, 0, 128, 1)">5</span><span style="color: rgba(0, 0, 0, 1)">)))
{
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✓] Message {i} confirmed</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
{
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✗] Message {i} confirmation timeout</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 实现重试逻辑</span>
<span style="color: rgba(0, 0, 0, 1)"> }
Thread.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">1000</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟消息间隔</span>
<span style="color: rgba(0, 0, 0, 1)"> }
}
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Press to exit.</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
Console.ReadLine();</span></pre>
</div>
<h3>第3步:配置死信交换机和队列</h3>
<p class="ds-markdown-paragraph">在实际应用中,我们通常在生产者和消费者中都声明所需的交换机和队列。这里我们在消费者中配置完整的死信机制。</p>
<h3>第4步:实现可靠消费者(ReliableConsumer.cs)</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Events;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory()
{
HostName </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">localhost</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
UserName </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">myuser</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
Password </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">mypassword</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
};
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> connection =<span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection())
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> connection.CreateModel())
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 1. 声明死信交换机</span>
channel.ExchangeDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dlx</span><span style="color: rgba(128, 0, 0, 1)">"</span>, ExchangeType.Direct, durable: <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 2. 声明死信队列</span>
channel.QueueDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dead_letter_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
durable: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">,
exclusive: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
autoDelete: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
arguments: </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 3. 绑定死信队列到死信交换机</span>
channel.QueueBind(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dead_letter_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dlx</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dead_letter</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 4. 声明主队列,并配置死信参数</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> arguments = <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">
{
{ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dlx</span><span style="color: rgba(128, 0, 0, 1)">"</span> }, <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 指定死信交换机</span>
{ <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-routing-key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dead_letter</span><span style="color: rgba(128, 0, 0, 1)">"</span> } <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 死信路由键</span>
<span style="color: rgba(0, 0, 0, 1)"> };
channel.QueueDeclare(queue: </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reliable_orders</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
durable: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">,
exclusive: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
autoDelete: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
arguments: arguments);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 设置公平分发</span>
channel.BasicQos(prefetchSize: <span style="color: rgba(128, 0, 128, 1)">0</span>, prefetchCount: <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">global</span>: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [*] Waiting for orders. To exit press CTRL+C</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> EventingBasicConsumer(channel);
consumer.Received </span>+= (model, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> ea.Body.ToArray();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> message =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetString(body);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Received {message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟业务处理</span>
<span style="color: rgba(0, 0, 0, 1)"> ProcessOrder(message, ea.DeliveryTag);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 处理成功,手动确认</span>
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✓] Order processed successfully: {ea.DeliveryTag}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex)
{
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✗] Failed to process order {ea.DeliveryTag}: {ex.Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 处理失败,拒绝消息并不重新入队(发送到死信队列)</span>
<span style="color: rgba(0, 0, 0, 1)"> channel.BasicNack(deliveryTag: ea.DeliveryTag,
multiple: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
requeue: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
}
};
channel.BasicConsume(queue: </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reliable_orders</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
autoAck: </span><span style="color: rgba(0, 0, 255, 1)">false</span>,<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 手动确认模式</span>
<span style="color: rgba(0, 0, 0, 1)"> consumer: consumer);
Console.ReadLine();
}
</span><span style="color: rgba(0, 0, 255, 1)">void</span> ProcessOrder(<span style="color: rgba(0, 0, 255, 1)">string</span> message, <span style="color: rgba(0, 0, 255, 1)">ulong</span><span style="color: rgba(0, 0, 0, 1)"> deliveryTag)
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟业务逻辑 - 随机失败以测试可靠性机制</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> random = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Random();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟10%的失败率</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> (random.Next(<span style="color: rgba(128, 0, 128, 1)">0</span>, <span style="color: rgba(128, 0, 128, 1)">10</span>) == <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Exception(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Simulated processing failure</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟处理时间</span>
Thread.Sleep(<span style="color: rgba(128, 0, 128, 1)">2000</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Processing order {deliveryTag}: {message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<h3>第5步:实现死信处理器(DeadLetterProcessor.cs)</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Events;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory()
{
HostName </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">localhost</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
UserName </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">myuser</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
Password </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">mypassword</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
};
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> connection =<span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection())
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> connection.CreateModel())
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 声明死信队列(确保存在)</span>
channel.QueueDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dead_letter_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
durable: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">,
exclusive: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
autoDelete: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
arguments: </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [*] Waiting for dead letters. To exit press CTRL+C</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> EventingBasicConsumer(channel);
consumer.Received </span>+= (model, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> ea.Body.ToArray();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> message =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetString(body);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> originalQueue = ea.BasicProperties.Headers?[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-first-death-queue</span><span style="color: rgba(128, 0, 0, 1)">"</span>]?<span style="color: rgba(0, 0, 0, 1)">.ToString();
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Received failed message:</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Original Queue: {originalQueue}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Message: {message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Routing Key: {ea.RoutingKey}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Delivery Tag: {ea.DeliveryTag}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 在实际应用中,这里可以实现:
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 1. 发送告警通知
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 2. 记录到错误日志
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 3. 人工干预
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 4. 重试机制</span>
<span style="color: rgba(0, 0, 0, 1)">
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> -> Sending alert to administrator...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> -> Logging to error system...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 确认死信消息</span>
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
};
channel.BasicConsume(queue: </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">dead_letter_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
autoAck: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
consumer: consumer);
Console.ReadLine();
}</span></pre>
</div>
<h3>第6步:高级特性 - 带重试机制的消费者</h3>
<p class="ds-markdown-paragraph">创建<code>RetryConsumer.cs</code>,实现更复杂的重试逻辑:</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Events;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory()
{
HostName </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">localhost</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
UserName </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">myuser</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
Password </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">mypassword</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
};
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> connection =<span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection())
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> connection.CreateModel())
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 配置重试队列(带TTL)</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> retryArguments = <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">
{
{ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-exchange</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">""</span><span style="color: rgba(0, 0, 0, 1)"> },
{ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-dead-letter-routing-key</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reliable_orders</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> },
{ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">x-message-ttl</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 128, 1)">10000</span> } <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 10秒后重试</span>
<span style="color: rgba(0, 0, 0, 1)"> };
channel.QueueDeclare(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retry_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span>, durable: <span style="color: rgba(0, 0, 255, 1)">true</span>, exclusive: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
autoDelete: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">, arguments: retryArguments);
channel.QueueDeclare(queue: </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reliable_orders</span><span style="color: rgba(128, 0, 0, 1)">"</span>, durable: <span style="color: rgba(0, 0, 255, 1)">true</span>, exclusive: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
autoDelete: </span><span style="color: rgba(0, 0, 255, 1)">false</span>, arguments: <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);
channel.BasicQos(prefetchSize: </span><span style="color: rgba(128, 0, 128, 1)">0</span>, prefetchCount: <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">global</span>: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [*] Waiting for messages with retry support.</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> EventingBasicConsumer(channel);
consumer.Received </span>+= (model, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> ea.Body.ToArray();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> message =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetString(body);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 检查重试次数</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> retryCount =<span style="color: rgba(0, 0, 0, 1)"> GetRetryCount(ea.BasicProperties);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Received (attempt {retryCount + 1}): {message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
ProcessOrderWithRetry(message, retryCount);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✓] Successfully processed</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex)
{
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✗] Processing failed: {ex.Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (retryCount < <span style="color: rgba(128, 0, 128, 1)">3</span>) <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 最多重试3次</span>
<span style="color: rgba(0, 0, 0, 1)"> {
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [↻] Scheduling retry {retryCount + 1}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 发布到重试队列</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> channel.CreateBasicProperties();
properties.Persistent </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
properties.Headers </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, <span style="color: rgba(0, 0, 255, 1)">object</span>><span style="color: rgba(0, 0, 0, 1)">
{
{ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retry-count</span><span style="color: rgba(128, 0, 0, 1)">"</span>, retryCount + <span style="color: rgba(128, 0, 128, 1)">1</span><span style="color: rgba(0, 0, 0, 1)"> }
};
channel.BasicPublish(</span><span style="color: rgba(128, 0, 0, 1)">""</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retry_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, properties, body);
channel.BasicAck(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 确认原消息</span>
<span style="color: rgba(0, 0, 0, 1)"> }
</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
{
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [✗] Max retries exceeded, sending to DLQ</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
channel.BasicNack(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
}
}
};
channel.BasicConsume(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reliable_orders</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">, consumer);
Console.ReadLine();
}
</span><span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> GetRetryCount(IBasicProperties properties)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (properties.Headers?.ContainsKey(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retry-count</span><span style="color: rgba(128, 0, 0, 1)">"</span>) == <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> retryCountBytes = (<span style="color: rgba(0, 0, 255, 1)">byte</span>[])properties.Headers[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retry-count</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">];
</span><span style="color: rgba(0, 0, 255, 1)">return</span> BitConverter.ToInt32(retryCountBytes, <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">;
}
</span><span style="color: rgba(0, 0, 255, 1)">void</span> ProcessOrderWithRetry(<span style="color: rgba(0, 0, 255, 1)">string</span> message, <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> retryCount)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> random = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Random();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟处理,重试次数越多成功率越高(模拟系统恢复)</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> failureChance = Math.Max(<span style="color: rgba(128, 0, 128, 1)">10</span> - retryCount * <span style="color: rgba(128, 0, 128, 1)">3</span>, <span style="color: rgba(128, 0, 128, 1)">1</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 降低失败率</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> (random.Next(<span style="color: rgba(128, 0, 128, 1)">0</span>, failureChance) == <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Exception($<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Simulated failure on attempt {retryCount + 1}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
Thread.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">1000</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Processed successfully on attempt {retryCount + 1}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<h3>第7步:运行与测试</h3>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">启动所有服务</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)"># 终端1:启动死信处理器
dotnet run </span>--<span style="color: rgba(0, 0, 0, 1)">project DeadLetterProcessor
# 终端2:启动主消费者
dotnet run </span>--<span style="color: rgba(0, 0, 0, 1)">project ReliableConsumer
# 终端3:启动生产者
dotnet run </span>--project ReliableProducer</pre>
</div>
</div>
</div>
</div>
</div>
</div>
</li>
<li>
<p class="ds-markdown-paragraph">测试场景1:正常流程</p>
<ul>
<li>
<p class="ds-markdown-paragraph">观察生产者确认日志</p>
</li>
<li>
<p class="ds-markdown-paragraph">观察消费者处理成功的日志</p>
</li>
</ul>
</li>
<li>
<p class="ds-markdown-paragraph">测试场景2:消费者处理失败</p>
<ul>
<li>
<p class="ds-markdown-paragraph">在消费者处理时强制关闭消费者进程</p>
</li>
<li>
<p class="ds-markdown-paragraph">观察消息重新投递到其他消费者</p>
</li>
<li>
<p class="ds-markdown-paragraph">或者观察消息进入死信队列</p>
</li>
</ul>
</li>
<li>
<p class="ds-markdown-paragraph">测试场景3:死信处理</p>
<ul>
<li>
<p class="ds-markdown-paragraph">让消费者处理失败,消息进入死信队列</p>
</li>
<li>
<p class="ds-markdown-paragraph">观察死信处理器的告警和日志记录</p>
</li>
</ul>
</li>
<li>
<p class="ds-markdown-paragraph">测试场景4:重试机制</p>
<ul>
<li>
<p class="ds-markdown-paragraph">使用<code>RetryConsumer</code>测试重试逻辑</p>
</li>
<li>
<p class="ds-markdown-paragraph">观察消息在重试队列中的行为</p>
</li>
</ul>
</li>
</ol>
<h3>第8步:监控与管理</h3>
<p class="ds-markdown-paragraph">在RabbitMQ管理界面(http://localhost:15672)监控:</p>
<ul>
<li>
<p class="ds-markdown-paragraph">队列深度和消息状态</p>
</li>
<li>
<p class="ds-markdown-paragraph">确认率和投递率</p>
</li>
<li>
<p class="ds-markdown-paragraph">死信队列中的消息数量</p>
</li>
</ul>
<hr>
<h2>本章总结</h2>
<p class="ds-markdown-paragraph">在这一章中,我们构建了一个完整的消息可靠性保障体系:</p>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">生产者确认:使用<code>ConfirmSelect</code>和确认事件确保消息到达Broker。</p>
</li>
<li>
<p class="ds-markdown-paragraph">消息持久化:队列持久化 + 消息持久化,应对服务器重启。</p>
</li>
<li>
<p class="ds-markdown-paragraph">消费者确认:手动确认模式,确保消息被成功处理。</p>
</li>
<li>
<p class="ds-markdown-paragraph">死信队列:处理无法正常消费的消息,防止消息丢失。</p>
</li>
<li>
<p class="ds-markdown-paragraph">重试机制:实现带延迟的重试逻辑,提高系统韧性。</p>
</li>
<li>
<p class="ds-markdown-paragraph">监控告警:通过死信处理器实现错误通知。</p>
</li>
</ol>
<p class="ds-markdown-paragraph">这些机制组合使用,可以构建出生产级的可靠消息系统。在下一章,我们将学习如何将RabbitMQ与ASP.NET Core集成,构建现代化的微服务应用。</p>
</div>
</div>
</div>
</div><br><br>
来源:https://www.cnblogs.com/jixingsuiyuan/p/19117336
頁:
[1]