【RabbitMQ】工作队列(Work Queues)与消息确认(Ack)
<h2>本章目标</h2><ul>
<li>
<p class="ds-markdown-paragraph">理解工作队列(竞争消费者模式)的概念和适用场景。</p>
</li>
<li>
<p class="ds-markdown-paragraph">掌握消息确认(Acknowledgment)机制,实现可靠的消息处理。</p>
</li>
<li>
<p class="ds-markdown-paragraph">学习消息持久化(Durability),防止服务器重启导致消息丢失。</p>
</li>
<li>
<p class="ds-markdown-paragraph">使用公平分发(Fair Dispatch)来优化多个消费者的工作效率。</p>
</li>
</ul>
<hr>
<h2>一、理论部分</h2>
<h3>1. 工作队列(Work Queues / Task Queues)</h3>
<p class="ds-markdown-paragraph">在上一章的"Hello World"示例中,我们每发送一条消息,就会被一个消费者立即接收。但在实际应用中,我们往往需要处理一些耗时任务(如发送邮件、处理图片、生成报告等)。</p>
<p class="ds-markdown-paragraph">工作队列(又称任务队列)的核心思想是避免立即执行资源密集型任务并等待其完成,而是将任务封装为消息并发送到队列中。在后台运行的多个工作进程(消费者)会从队列中取出消息并进行处理。</p>
<p class="ds-markdown-paragraph">这种多个消费者从一个队列中获取消息的模式称为竞争消费者模式(Competing Consumers Pattern),它能很容易地实现并行处理,从而横向扩展系统。</p>
<h3>2. 消息确认(Message Acknowledgment)</h3>
<p class="ds-markdown-paragraph">在默认的自动确认(<code>autoAck: true</code>)模式下,消息一旦被RabbitMQ传递给消费者,就会立即从队列中删除。这有一个严重的问题:如果消费者在处理消息过程中崩溃或断开连接,这条正在处理的消息就会永久丢失,而且无法被其他消费者重新处理。</p>
<p class="ds-markdown-paragraph">为了解决这个问题,AMQP提供了消息确认机制:</p>
<ul>
<li>
<p class="ds-markdown-paragraph">消费者在创建时设置 <code>autoAck: false</code>(手动确认模式)。</p>
</li>
<li>
<p class="ds-markdown-paragraph">当消费者成功处理完一条消息后,它会显式地向RabbitMQ发送一个确认(ACK)。</p>
</li>
<li>
<p class="ds-markdown-paragraph">只有在收到ACK后,RabbitMQ才会安全地从队列中删除该消息。</p>
</li>
<li>
<p class="ds-markdown-paragraph">如果消费者在处理过程中断开连接(没有发送ACK),RabbitMQ会认为该消息未被成功处理,并将其重新入队,然后传递给另一个消费者(如果存在)。</p>
</li>
</ul>
<p class="ds-markdown-paragraph">这种机制确保了即使消费者偶尔死亡,消息也不会丢失。</p>
<h3>3. 消息持久化(Message Durability)</h3>
<p class="ds-markdown-paragraph">消息确认机制保护了消息在消费者处理时不丢失。但如果RabbitMQ服务器本身停止或崩溃了呢?默认情况下,RabbitMQ退出或崩溃时,它会忘记所有的队列和消息。</p>
<p class="ds-markdown-paragraph">为了确保消息在服务器重启后仍然存在,我们需要做两件事:</p>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">将队列声明为持久的(Durable):这样队列本身会在服务器重启后继续存在。</p>
</li>
<li>
<p class="ds-markdown-paragraph">将消息标记为持久的(Persistent):在发布消息时,设置 <code>IBasicProperties.Persistent = true</code>。</p>
</li>
</ol>
<blockquote>
<p class="ds-markdown-paragraph">注意:将消息标记为<code>Persistent</code>并不能完全保证消息永不丢失。虽然RabbitMQ会将消息保存到磁盘,但在它接收到消息和保存到磁盘之间仍然有一个很短的时间窗口。对于更强的保证,需要使用发布者确认(Publisher Confirms),这将在后续章节介绍。</p>
</blockquote>
<h3>4. 公平分发(Fair Dispatch)</h3>
<p class="ds-markdown-paragraph">默认情况下,RabbitMQ会使用轮询(Round-robin) 的方式将消息平均分发给所有消费者,而不考虑每个消费者当前未确认的消息数量。这可能导致一个问题:某些消息处理起来很耗时,而某些很快。如果一个繁忙的消费者前面堆积了很多未确认的消息,而空闲的消费者却得不到新任务,就会造成处理能力浪费。</p>
<p class="ds-markdown-paragraph">为了解决这个问题,我们可以使用 <code>basicQos</code> 方法并设置 <code>prefetchCount = 1</code>。这告诉RabbitMQ不要一次向一个消费者发送超过一条消息。或者换句话说,在消费者处理并确认上一条消息之前,不要向其发送新消息。这样,RabbitMQ会将新消息分发给下一个空闲的消费者。</p>
<hr>
<h2>二、实操部分:构建可靠的工作队列</h2>
<p class="ds-markdown-paragraph">我们将创建一个任务发布者(<code>NewTask</code>)和多个工作者(<code>Worker</code>)。任务消息中的点号<code>.</code>数量代表其处理复杂度(每个点号耗时1秒)。</p>
<h3>第1步:创建项目</h3>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">创建一个新的解决方案。</p>
</li>
<li>
<p class="ds-markdown-paragraph">添加两个控制台应用程序项目:<code>NewTask</code> (生产者) 和 <code>Worker</code> (消费者)。</p>
</li>
<li>
<p class="ds-markdown-paragraph">为两个项目添加 <code>RabbitMQ.Client</code> NuGet包。</p>
</li>
</ol>
<h3>第2步:编写可靠的任务生产者(NewTask.cs)</h3>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 示例: dotnet run "Message."
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> dotnet run "Message.."
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> dotnet run "Message..." ...</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span> ConnectionFactory() { HostName = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">localhost</span><span style="color: rgba(128, 0, 0, 1)">"</span>, UserName = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">myuser</span><span style="color: rgba(128, 0, 0, 1)">"</span>, Password = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">mypassword</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> };
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> connection =<span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection())
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> connection.CreateModel())
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 1. 声明一个持久化的队列</span>
channel.QueueDeclare(queue: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">task_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
durable: </span><span style="color: rgba(0, 0, 255, 1)">true</span>, <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 队列持久化</span>
exclusive: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
autoDelete: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
arguments: </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 2. 准备消息</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> message =<span style="color: rgba(0, 0, 0, 1)"> GetMessage(args);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetBytes(message);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 3. 设置消息属性为持久化</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> channel.CreateBasicProperties();
properties.Persistent </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 4. 发布消息</span>
channel.BasicPublish(exchange: <span style="color: rgba(128, 0, 0, 1)">""</span><span style="color: rgba(0, 0, 0, 1)">,
routingKey: </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">task_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
basicProperties: properties, </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 传入持久化属性</span>
<span style="color: rgba(0, 0, 0, 1)"> body: body);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Sent {message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Press to exit.</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
Console.ReadLine();
</span><span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">string</span> GetMessage(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">[] args)
{
</span><span style="color: rgba(0, 0, 255, 1)">return</span> args.Length > <span style="color: rgba(128, 0, 128, 1)">0</span> ? <span style="color: rgba(0, 0, 255, 1)">string</span>.Join(<span style="color: rgba(128, 0, 0, 1)">"</span> <span style="color: rgba(128, 0, 0, 1)">"</span>, args) : <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Hello World!</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">;
}</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">关键更改:</p>
<ul>
<li>
<p class="ds-markdown-paragraph"><code>QueueDeclare</code> 中的 <code>durable: true</code> 确保队列在服务器重启后依然存在。</p>
</li>
<li>
<p class="ds-markdown-paragraph">创建了 <code>IBasicProperties</code> 对象并设置 <code>Persistent = true</code>,使消息本身也被标记为持久化。</p>
</li>
</ul>
<h3>第3步:编写可靠的工作者(Worker.cs)</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Events;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> factory = <span style="color: rgba(0, 0, 255, 1)">new</span> ConnectionFactory() { HostName = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">localhost</span><span style="color: rgba(128, 0, 0, 1)">"</span>, UserName = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">myuser</span><span style="color: rgba(128, 0, 0, 1)">"</span>, Password = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">mypassword</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> };
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> connection =<span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection())
</span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> connection.CreateModel())
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 声明持久化队列(必须与生产者声明参数一致)</span>
channel.QueueDeclare(queue: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">task_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
durable: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">,
exclusive: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
autoDelete: </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">,
arguments: </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> !!! 关键设置:公平分发 !!!
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 告诉RabbitMQ,在当前工作者处理并确认上一条消息之前,不要向其发送新消息</span>
channel.BasicQos(prefetchSize: <span style="color: rgba(128, 0, 128, 1)">0</span>, prefetchCount: <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">global</span>: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> [*] Waiting for messages.</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> EventingBasicConsumer(channel);
consumer.Received </span>+= (model, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> ea.Body.ToArray();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> message =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetString(body);
Console.WriteLine($</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Received {message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟耗时任务,消息中的每个点号'.'代表1秒工作</span>
<span style="color: rgba(0, 0, 255, 1)">int</span> dots = message.Split(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">.</span><span style="color: rgba(128, 0, 0, 1)">'</span>).Length - <span style="color: rgba(128, 0, 128, 1)">1</span><span style="color: rgba(0, 0, 0, 1)">;
Thread.Sleep(dots </span>* <span style="color: rgba(128, 0, 128, 1)">1000</span><span style="color: rgba(0, 0, 0, 1)">);
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Done</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> !!! 手动发送消息确认(ACK) !!!
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 只有在任务处理完成后,才发送ACK,告知RabbitMQ可以安全删除消息</span>
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
};
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 启动消费者,设置 autoAck: false (手动确认模式)</span>
channel.BasicConsume(queue: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">task_queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
autoAck: </span><span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 关闭自动确认!</span>
<span style="color: rgba(0, 0, 0, 1)"> consumer: consumer);
Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)"> Press to exit.</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
Console.ReadLine();
}</span></pre>
</div>
<p class="ds-markdown-paragraph">关键更改:</p>
<ul>
<li>
<p class="ds-markdown-paragraph"><code>channel.BasicQos(0, 1, false)</code>: 设置公平分发,每个消费者一次只预取一条消息。</p>
</li>
<li>
<p class="ds-markdown-paragraph"><code>channel.BasicConsume(autoAck: false)</code>: 切换到手动确认模式。</p>
</li>
<li>
<p class="ds-markdown-paragraph">在 <code>Received</code> 事件处理程序的最后,调用 <code>channel.BasicAck(...)</code> 来显式确认消息处理完成。<code>ea.DeliveryTag</code> 是消息的唯一标识符。</p>
</li>
<li>
<p class="ds-markdown-paragraph">使用 <code>Thread.Sleep</code> 模拟耗时任务。</p>
</li>
</ul>
<h3>第4步:运行与演示</h3>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">启动两个(或多个)工作者(Worker)<br>打开两个终端窗口,分别运行 <code>Worker</code> 项目。</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">cd Worker
dotnet run</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">两个窗口都会显示 <code>[*] Waiting for messages.</code>。</p>
</li>
<li>
<p class="ds-markdown-paragraph">发送任务<br>运行 <code>NewTask</code> 项目来发送一些耗时不同的任务。</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">cd NewTask
dotnet run </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">First message.</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> # 耗时约1秒
dotnet run </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Second message..</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> # 耗时约2秒
dotnet run </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Third message...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> # 耗时约3秒
dotnet run </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Fourth message....</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> # 耗时约4秒
dotnet run </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Fifth message.....</span><span style="color: rgba(128, 0, 0, 1)">"</span> # 耗时约5秒</pre>
</div>
</div>
</div>
</div>
</div>
</div>
</li>
<li>
<p class="ds-markdown-paragraph">观察现象</p>
<ul>
<li>
<p class="ds-markdown-paragraph">你会看到任务被轮流分配给两个工作者(轮询分发)。</p>
</li>
<li>
<p class="ds-markdown-paragraph">但是,由于我们设置了 <code>prefetchCount=1</code>,当一个工作者正在处理一个长任务(例如5秒)时,RabbitMQ不会再给它发送新消息,而是会将新消息分发给另一个空闲的工作者。这就是公平分发的效果。</p>
</li>
<li>
<p class="ds-markdown-paragraph">查看管理后台(Queues),你会看到 "Unacked"(未确认)消息的数量。只有当工作者调用 <code>BasicAck</code> 后,这个消息才会消失。</p>
</li>
</ul>
</li>
<li>
<p class="ds-markdown-paragraph">演示消息确认的重要性</p>
<ul>
<li>
<p class="ds-markdown-paragraph">让一个工作者正在处理一个长任务(比如5秒的任务)。</p>
</li>
<li>
<p class="ds-markdown-paragraph">在它处理过程中,强制关闭这个工作者的终端窗口(模拟消费者崩溃)。</p>
</li>
<li>
<p class="ds-markdown-paragraph">观察另一个工作者窗口和管理后台:刚才那条被中断处理的消息(状态为Unacked)会重新变为Ready,并被自动传递给另一个仍在运行的工作者进行处理。这样就保证了消息绝不会因为消费者崩溃而丢失。</p>
</li>
</ul>
</li>
</ol><hr>
<h2>本章总结</h2>
<p class="ds-markdown-paragraph">在这一章中,我们构建了一个可靠的工作队列系统,并深入学习了RabbitMQ的核心可靠性机制:</p>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">工作队列模式:使用多个消费者并行处理耗时任务。</p>
</li>
<li>
<p class="ds-markdown-paragraph">消息确认(ACK):通过手动确认(<code>autoAck: false</code>)和 <code>BasicAck</code>,确保消息只有在被成功处理后才会被删除,防止消费者崩溃导致消息丢失。</p>
</li>
<li>
<p class="ds-markdown-paragraph">消息与队列持久化:通过 <code>durable: true</code> 和 <code>properties.Persistent = true</code>,防止RabbitMQ服务器重启导致消息丢失。</p>
</li>
<li>
<p class="ds-markdown-paragraph">公平分发(QoS):通过 <code>BasicQos</code> 和 <code>prefetchCount: 1</code>,优化任务分发,使空闲的消费者能优先获得新任务,提高整体处理效率。</p>
</li>
</ol>
<p class="ds-markdown-paragraph">现在,你已经能够构建一个健壮的、用于处理后台任务的分布式系统了。在下一章,我们将离开简单的队列模型,探索RabbitMQ更强大的功能——交换机(Exchange),学习如何实现发布/订阅模式,将一条消息投递给多个消费者。</p><br><br>
来源:https://www.cnblogs.com/jixingsuiyuan/p/19104405
頁:
[1]