RabbitMQ Node.js Example
<table class="d-block"><tbody class="d-block">
<tr class="d-block">
<td class="d-block comment-body markdown-bodyjs-comment-body">
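<p>RabbitMQ is a message broker: it accepts, stores, and forwards messages. Think of it as a post office: your program writes a message and names the recipient, and RabbitMQ takes care of the rest, making sure the message reaches the recipient.</p>
<p>Any program that sends messages is a <code>producer</code>. A queue is like a mailbox where new messages pile up. All pending messages are stored in queues, which together act as one large message buffer whose capacity is bounded by the configured memory and the host's hardware. Any application can add messages to a queue, and multiple consumers can read from the same queue.</p>
<p>A <code>consumer</code> is an application that processes messages from a queue; it mostly waits to receive messages delivered by RabbitMQ.</p>
<p>The producer, the consumer, and the RabbitMQ broker do not have to run on the same machine, and in practice they usually do not: for example, dedicated machines host RabbitMQ while the applications are deployed on a separate cluster. A single application can also be both a producer and a consumer.</p>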
<p align="center"><img alt="Diagram of a RabbitMQ message queue, from the official documentation" src="https://camo.githubusercontent.com/45330b63e7453cfb9017628fc29745c89073b097/68747470733a2f2f7777772e7261626269746d712e636f6d2f696d672f7475746f7269616c732f707974686f6e2d6f6e652e706e67" data-canonical-src="https://www.rabbitmq.com/img/tutorials/python-one.png" style="max-width: 100%"></p>
<p align="center">Diagram of a RabbitMQ message queue, from the official documentation</p>
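<p>The producer, queue, and consumer roles described above can be sketched with a toy in-memory queue. This is only a mental model under simplifying assumptions: <code>ToyQueue</code> and its methods are made-up names, nothing here talks to RabbitMQ, and the round-robin hand-off merely imitates how a broker typically spreads messages across several consumers.</p>

```javascript
// A toy in-memory stand-in for a broker queue. It only illustrates the
// producer -> queue -> consumer flow; it is not how RabbitMQ is implemented.
class ToyQueue {
  constructor() {
    this.messages = [];  // messages waiting for a consumer
    this.consumers = []; // registered consumer callbacks
    this.next = 0;       // counter used to pick the next consumer
  }

  // Producer side: add a message, delivering it if a consumer is waiting.
  publish(message) {
    this.messages.push(message);
    this.drain();
  }

  // Consumer side: register a callback, then deliver any backlog.
  subscribe(callback) {
    this.consumers.push(callback);
    this.drain();
  }

  // Hand queued messages to consumers in round-robin order.
  drain() {
    while (this.messages.length > 0 && this.consumers.length > 0) {
      const consumer = this.consumers[this.next % this.consumers.length];
      this.next += 1;
      consumer(this.messages.shift());
    }
  }
}

const queue = new ToyQueue();
const received = { a: [], b: [] };

queue.publish('letter 1');                  // buffered: nobody is listening yet
queue.subscribe((m) => received.a.push(m)); // consumer A receives the backlog
queue.subscribe((m) => received.b.push(m)); // consumer B joins
queue.publish('letter 2');
queue.publish('letter 3');

console.log(received);
```

<p>The first message waits in the queue until a consumer appears, which is the property that lets producers and consumers start in any order.</p>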
<h2>Installation</h2>
<p>Download the installer for your platform from the official website. On macOS you can install RabbitMQ with Homebrew:</p>
<div class="highlight highlight-source-shell"><pre>$ brew update <span class="pl-k">&&</span> brew install rabbitmq</pre></div>
<h2>Starting the server</h2>
<p>If you installed with Homebrew, start the RabbitMQ service with <code>brew services start rabbitmq</code>:</p>
<div class="highlight highlight-source-shell"><pre>$ brew services start rabbitmq
==<span class="pl-k">></span> Successfully started <span class="pl-s"><span class="pl-pds">`</span>rabbitmq<span class="pl-pds">`</span></span> (label: homebrew.mxcl.rabbitmq)</pre></div>
<p>Alternatively, run <code>/usr/local/sbin/rabbitmq-server</code> directly.</p>
<p>Once the server is running, a web management UI is available at http://localhost:15672/; the username and password are both <code>guest</code>.</p>
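<h2>A Hello World example in Node.js</h2>
<p>This example uses amqp.node to show how RabbitMQ can be used from Node.js.</p>
<p>RabbitMQ can communicate over several protocols. amqp.node uses AMQP 0-9-1, an open protocol designed specifically for messaging. The client in this example is the amqp.node module, although RabbitMQ itself supports many different clients.</p>
<p>Initialize a Node.js project and install the amqp.node module with the following commands:</p>
<div class="highlight highlight-source-shell"><pre>$ mkdir rabbitmq-demo <span class="pl-k">&&</span> <span class="pl-c1">cd</span> rabbitmq-demo <span class="pl-k">&&</span> yarn init -y
$ yarn add amqplib</pre></div>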
<h3>Sending a message</h3>
<p>Create a file named <code>send.js</code> to hold the sending logic: it connects to RabbitMQ, sends one message, and exits.</p>
<p>First, open a connection to the RabbitMQ server:</p>
<div class="highlight highlight-source-js"><pre><span class="pl-c">#!/usr/bin/env node</span>

<span class="pl-k">var</span> amqp <span class="pl-k">=</span> <span class="pl-c1">require</span>(<span class="pl-s"><span class="pl-pds">'</span>amqplib/callback_api<span class="pl-pds">'</span></span>);

<span class="pl-smi">amqp</span>.<span class="pl-en">connect</span>(<span class="pl-s"><span class="pl-pds">'</span>amqp://localhost<span class="pl-pds">'</span></span>, <span class="pl-k">function</span>(<span class="pl-smi">error0</span>, <span class="pl-smi">connection</span>) {});</pre></div>
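<p>The connection string <code>amqp://localhost</code> leaves most parts of the URI implicit. The sketch below uses Node's built-in <code>URL</code> class to make those parts visible. The helper <code>describeAmqpUrl</code> is a hypothetical name, it is not amqplib's own parser, and the fallbacks it applies (port 5672, user and password <code>guest</code>, virtual host <code>/</code>) are the conventional defaults, stated here as assumptions.</p>

```javascript
// Hypothetical helper: split an AMQP URI into its parts, filling in the
// conventional defaults for anything the URI leaves out.
function describeAmqpUrl(uri) {
  const url = new URL(uri);
  return {
    protocol: url.protocol.replace(':', ''),
    hostname: url.hostname,
    port: url.port ? Number(url.port) : 5672,
    username: decodeURIComponent(url.username) || 'guest',
    password: decodeURIComponent(url.password) || 'guest',
    // No path segment: fall back to the default virtual host "/".
    vhost: url.pathname.length > 1 ? decodeURIComponent(url.pathname.slice(1)) : '/',
  };
}

console.log(describeAmqpUrl('amqp://localhost'));
// The host, credentials, and vhost below are made up for illustration.
console.log(describeAmqpUrl('amqp://app:secret@mq.example.com:5673/staging'));
```

<p>Spelling the parts out like this is mainly useful when the broker runs on another machine, where the defaults for <code>localhost</code> no longer apply.</p>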
<p>Once the connection is established, create a channel; the actual sending happens over this channel.</p>
<div class="highlight highlight-source-js"><pre><span class="pl-smi">amqp</span>.<span class="pl-en">connect</span>(<span class="pl-s"><span class="pl-pds">'</span>amqp://localhost<span class="pl-pds">'</span></span>, <span class="pl-k">function</span>(<span class="pl-smi">error0</span>, <span class="pl-smi">connection</span>) {
  <span class="pl-k">if</span> (error0) {
    <span class="pl-k">throw</span> error0;
  }
  <span class="pl-smi">connection</span>.<span class="pl-en">createChannel</span>(<span class="pl-k">function</span>(<span class="pl-smi">error1</span>, <span class="pl-smi">channel</span>) {});
});</pre></div>
<p>Before sending, declare a queue, then publish the message to that queue:</p>
<div class="highlight highlight-source-js"><pre><span class="pl-smi">amqp</span>.<span class="pl-en">connect</span>(<span class="pl-s"><span class="pl-pds">'</span>amqp://localhost<span class="pl-pds">'</span></span>, <span class="pl-k">function</span>(<span class="pl-smi">error0</span>, <span class="pl-smi">connection</span>) {
  <span class="pl-k">if</span> (error0) {
    <span class="pl-k">throw</span> error0;
  }
  <span class="pl-smi">connection</span>.<span class="pl-en">createChannel</span>(<span class="pl-k">function</span>(<span class="pl-smi">error1</span>, <span class="pl-smi">channel</span>) {
    <span class="pl-k">if</span> (error1) {
      <span class="pl-k">throw</span> error1;
    }
    <span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;
    <span class="pl-k">var</span> msg <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>Hello World!<span class="pl-pds">'</span></span>;

    <span class="pl-c">// Declare the queue; it is created only if it does not exist yet.</span>
    <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
      durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
    });
    <span class="pl-c">// The message body is sent as bytes, so wrap the string in a Buffer.</span>
    <span class="pl-smi">channel</span>.<span class="pl-en">sendToQueue</span>(queue, <span class="pl-smi">Buffer</span>.<span class="pl-en">from</span>(msg));
    <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> Sent %s<span class="pl-pds">"</span></span>, msg);
  });
});</pre></div>
<p>Declaring a queue is idempotent: the queue is created only if it does not already exist.</p>
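<p>To make the idempotency concrete, here is a toy model of queue declaration. <code>ToyBroker</code> is an illustrative stand-in, not RabbitMQ or amqplib code. It also models one caveat worth knowing: redeclaring an existing queue with different properties (for example a different <code>durable</code> flag) is rejected by RabbitMQ with a <code>PRECONDITION_FAILED</code> error instead of being silently accepted. The error text below is only an approximation.</p>

```javascript
// Toy model of queue declaration; the names here are illustrative only.
class ToyBroker {
  constructor() {
    this.queues = new Map(); // queue name -> { durable, messages }
  }

  // Returns true if the queue was created, false if it already existed.
  assertQueue(name, options = {}) {
    const durable = Boolean(options.durable);
    const existing = this.queues.get(name);
    if (!existing) {
      this.queues.set(name, { durable, messages: [] });
      return true;
    }
    if (existing.durable !== durable) {
      // A redeclaration with different properties is refused rather than
      // changing the existing queue.
      throw new Error(`PRECONDITION_FAILED: inequivalent arg 'durable' for queue '${name}'`);
    }
    return false;
  }
}

const broker = new ToyBroker();
console.log(broker.assertQueue('hello', { durable: false })); // true: created
console.log(broker.assertQueue('hello', { durable: false })); // false: already there
try {
  broker.assertQueue('hello', { durable: true });
} catch (err) {
  console.log(err.message);
}
```

<p>In practice this means the sender and the receiver should declare the queue with the same options.</p>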
<p>Finally, close the connection and exit:</p>
<div class="highlight highlight-source-js"><pre><span class="pl-c">// Give the buffered message a moment to be written before closing.</span>
<span class="pl-c1">setTimeout</span>(<span class="pl-k">function</span>() {
  <span class="pl-smi">connection</span>.<span class="pl-c1">close</span>();
  <span class="pl-c1">process</span>.<span class="pl-en">exit</span>(<span class="pl-c1">0</span>);
}, <span class="pl-c1">500</span>);</pre></div>
<details>
<summary>
Complete send.js
</summary>
<div class="highlight highlight-source-js"><pre><span class="pl-c">#!/usr/bin/env node</span>

<span class="pl-k">var</span> amqp <span class="pl-k">=</span> <span class="pl-c1">require</span>(<span class="pl-s"><span class="pl-pds">'</span>amqplib/callback_api<span class="pl-pds">'</span></span>);

<span class="pl-smi">amqp</span>.<span class="pl-en">connect</span>(<span class="pl-s"><span class="pl-pds">'</span>amqp://localhost<span class="pl-pds">'</span></span>, <span class="pl-k">function</span>(<span class="pl-smi">error0</span>, <span class="pl-smi">connection</span>) {
  <span class="pl-k">if</span> (error0) {
    <span class="pl-k">throw</span> error0;
  }
  <span class="pl-smi">connection</span>.<span class="pl-en">createChannel</span>(<span class="pl-k">function</span>(<span class="pl-smi">error1</span>, <span class="pl-smi">channel</span>) {
    <span class="pl-k">if</span> (error1) {
      <span class="pl-k">throw</span> error1;
    }

    <span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;
    <span class="pl-k">var</span> msg <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>Hello World!<span class="pl-pds">'</span></span>;

    <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
      durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
    });
    <span class="pl-smi">channel</span>.<span class="pl-en">sendToQueue</span>(queue, <span class="pl-smi">Buffer</span>.<span class="pl-en">from</span>(msg));
    <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> Sent %s<span class="pl-pds">"</span></span>, msg);
  });

  <span class="pl-c1">setTimeout</span>(<span class="pl-k">function</span>() {
    <span class="pl-smi">connection</span>.<span class="pl-c1">close</span>();
    <span class="pl-c1">process</span>.<span class="pl-en">exit</span>(<span class="pl-c1">0</span>);
  }, <span class="pl-c1">500</span>);
});</pre></div>
</details>
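<p>The 500 ms timer in <code>send.js</code> is a fixed delay rather than a signal that the message has actually left the process. One possible alternative, sketched here with amqplib's promise API (<code>require('amqplib')</code> instead of <code>amqplib/callback_api</code>), is to close the channel and await that before closing the connection. The function name <code>sendOnce</code> and the stub client are assumptions made for this sketch: the client is passed in as a parameter so the flow can be run without a broker, and closing the channel is not a delivery guarantee.</p>

```javascript
// Sketch of send.js using amqplib's promise API. The client is passed in as
// `amqp` (normally require('amqplib')) so the flow can be tried with a stub.
async function sendOnce(amqp, queue, text) {
  const connection = await amqp.connect('amqp://localhost');
  try {
    const channel = await connection.createChannel();
    await channel.assertQueue(queue, { durable: false });
    channel.sendToQueue(queue, Buffer.from(text));
    console.log(' Sent %s', text);
    // Closing the channel is a round trip to the broker, so awaiting it gives
    // the earlier publish a chance to be written before the connection closes.
    await channel.close();
  } finally {
    await connection.close();
  }
}

// Stub client that records calls, so the sketch runs without a broker.
function makeStubClient(log) {
  const channel = {
    assertQueue: async (q) => { log.push(`assertQueue ${q}`); },
    sendToQueue: (q, body) => { log.push(`send ${q} ${body.toString()}`); return true; },
    close: async () => { log.push('channel.close'); },
  };
  const connection = {
    createChannel: async () => channel,
    close: async () => { log.push('connection.close'); },
  };
  return { connect: async (url) => { log.push(`connect ${url}`); return connection; } };
}

const calls = [];
const done = sendOnce(makeStubClient(calls), 'hello', 'Hello World!')
  .then(() => console.log(calls));
```

<p>Against a real broker the call would be <code>sendOnce(require('amqplib'), 'hello', 'Hello World!')</code>; the process then exits on its own once the connection is closed, so no <code>process.exit</code> is needed.</p>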
<h3>Receiving messages</h3>
<p>Next comes the consumer, whose job is to listen for messages from RabbitMQ and process them.</p>
<p>Create <code>receive.js</code> and require the amqp.node module. The flow is the same as for the sender: open a connection, create a channel, and declare on that channel the queue to listen to:</p>
<div class="highlight highlight-source-js"><pre><span class="pl-c">#!/usr/bin/env node</span>

<span class="pl-k">var</span> amqp <span class="pl-k">=</span> <span class="pl-c1">require</span>(<span class="pl-s"><span class="pl-pds">'</span>amqplib/callback_api<span class="pl-pds">'</span></span>);

<span class="pl-smi">amqp</span>.<span class="pl-en">connect</span>(<span class="pl-s"><span class="pl-pds">'</span>amqp://localhost<span class="pl-pds">'</span></span>, <span class="pl-k">function</span>(<span class="pl-smi">error0</span>, <span class="pl-smi">connection</span>) {
  <span class="pl-k">if</span> (error0) {
    <span class="pl-k">throw</span> error0;
  }
  <span class="pl-smi">connection</span>.<span class="pl-en">createChannel</span>(<span class="pl-k">function</span>(<span class="pl-smi">error1</span>, <span class="pl-smi">channel</span>) {
    <span class="pl-k">if</span> (error1) {
      <span class="pl-k">throw</span> error1;
    }
    <span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;

    <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
      durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
    });
  });
});</pre></div>
<p>This declaration does not conflict with the one in the sender because, as noted above, a queue is created only if it does not already exist. Declaring it again here guarantees that the queue exists before the consumer starts listening, which matters because in practice the consumer may start before the sender.</p>
<p>Then add the listening logic:</p>
<div class="highlight highlight-source-js"><pre><span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [*] Waiting for messages in %s. To exit press CTRL+C<span class="pl-pds">"</span></span>, queue);

<span class="pl-smi">channel</span>.<span class="pl-en">consume</span>(queue, <span class="pl-k">function</span>(<span class="pl-smi">msg</span>) {
  <span class="pl-c">// msg.content is a Buffer; convert it back to a string.</span>
  <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> Received %s<span class="pl-pds">"</span></span>, <span class="pl-smi">msg</span>.<span class="pl-c1">content</span>.<span class="pl-c1">toString</span>());
}, {
  noAck<span class="pl-k">:</span> <span class="pl-c1">true</span>
});</pre></div>
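<p>Because the sender wraps the text in a <code>Buffer</code> and the receiver calls <code>msg.content.toString()</code>, the message body is just bytes, and structured data needs an explicit encoding. The following is a minimal sketch assuming JSON as that encoding; <code>encodeMessage</code> and <code>decodeMessage</code> are hypothetical helper names, not part of amqplib.</p>

```javascript
// Hypothetical helpers for sending structured data through a queue.
function encodeMessage(payload) {
  // Serialize to JSON text, then to the bytes that sendToQueue expects.
  return Buffer.from(JSON.stringify(payload), 'utf8');
}

function decodeMessage(content) {
  // Reverse the steps: bytes -> JSON text -> object.
  return JSON.parse(content.toString('utf8'));
}

const body = encodeMessage({ to: 'alice', text: 'Hello World!' });
console.log(Buffer.isBuffer(body));    // true
console.log(decodeMessage(body).text); // Hello World!
```

<p>With these helpers the sender would call <code>channel.sendToQueue(queue, encodeMessage(payload))</code> and the consumer callback would call <code>decodeMessage(msg.content)</code>.</p>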
<details>
<summary>
Complete receive.js
</summary>
<div class="highlight highlight-source-js"><pre><span class="pl-c">#!/usr/bin/env node</span>

<span class="pl-k">var</span> amqp <span class="pl-k">=</span> <span class="pl-c1">require</span>(<span class="pl-s"><span class="pl-pds">'</span>amqplib/callback_api<span class="pl-pds">'</span></span>);

<span class="pl-smi">amqp</span>.<span class="pl-en">connect</span>(<span class="pl-s"><span class="pl-pds">'</span>amqp://localhost<span class="pl-pds">'</span></span>, <span class="pl-k">function</span>(<span class="pl-smi">error0</span>, <span class="pl-smi">connection</span>) {
  <span class="pl-k">if</span> (error0) {
    <span class="pl-k">throw</span> error0;
  }
  <span class="pl-smi">connection</span>.<span class="pl-en">createChannel</span>(<span class="pl-k">function</span>(<span class="pl-smi">error1</span>, <span class="pl-smi">channel</span>) {
    <span class="pl-k">if</span> (error1) {
      <span class="pl-k">throw</span> error1;
    }

    <span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;

    <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
      durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
    });

    <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [*] Waiting for messages in %s. To exit press CTRL+C<span class="pl-pds">"</span></span>, queue);

    <span class="pl-smi">channel</span>.<span class="pl-en">consume</span>(queue, <span class="pl-k">function</span>(<span class="pl-smi">msg</span>) {
      <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> Received %s<span class="pl-pds">"</span></span>, <span class="pl-smi">msg</span>.<span class="pl-c1">content</span>.<span class="pl-c1">toString</span>());
    }, {
      noAck<span class="pl-k">:</span> <span class="pl-c1">true</span>
    });
  });
});</pre></div>
</details>
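<p>With <code>noAck: true</code>, as used in <code>receive.js</code>, the broker treats a message as handled as soon as it is delivered, so a consumer that crashes mid-task loses it. A hedged sketch of the alternative, manual acknowledgement, follows. <code>makeAckingHandler</code> and <code>work</code> are names invented for this example, and the stub channel stands in for a real amqplib channel so that the snippet runs without a broker; <code>channel.ack</code> and <code>channel.nack</code> are the amqplib calls it imitates.</p>

```javascript
// Build a consume callback that acknowledges each message after `work`
// succeeds. `makeAckingHandler` and `work` are illustrative names.
function makeAckingHandler(channel, work) {
  return function onMessage(msg) {
    if (msg === null) {
      return; // amqplib passes null when the broker cancels the consumer
    }
    try {
      work(msg.content.toString());
      channel.ack(msg);                // processed: the broker may discard it
    } catch (err) {
      channel.nack(msg, false, false); // failed: reject without requeueing
    }
  };
}

// Stub channel that records acknowledgements, so this runs without a broker.
const outcomes = [];
const stubChannel = {
  ack: (msg) => outcomes.push(`ack ${msg.content.toString()}`),
  nack: (msg) => outcomes.push(`nack ${msg.content.toString()}`),
};

const handler = makeAckingHandler(stubChannel, (text) => {
  if (text === 'bad') throw new Error('cannot process');
});

handler({ content: Buffer.from('good') });
handler({ content: Buffer.from('bad') });
handler(null);
console.log(outcomes); // [ 'ack good', 'nack bad' ]
```

<p>On a real channel the handler would be registered with <code>channel.consume(queue, makeAckingHandler(channel, work), { noAck: false })</code>. Whether a failed message should be requeued or dropped is an application decision; this sketch drops it.</p>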
<h3>Running the example</h3>
<p>Start the two programs from the command line and check the output:</p>
<div class="highlight highlight-source-shell"><pre>$ node send.js
 Sent Hello World!

$ node receive.js
 [*] Waiting for messages in hello. To exit press CTRL+C
 Received Hello World!</pre></div>
<p>You can also run <code>sudo rabbitmqctl list_queues</code> to list the queues in RabbitMQ and the number of messages each one holds.</p>
<div class="highlight highlight-source-shell"><pre>$ /usr/local/sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues <span class="pl-k">for</span> vhost / ...
name messages
hello 0</pre></div>
<p>If the <code>rabbitmqctl</code> command is not found, add <code>/usr/local/sbin</code> to your <code>PATH</code>:</p>
<div class="highlight highlight-source-shell"><pre><span class="pl-k">export</span> PATH=/usr/local/sbin:<span class="pl-smi">$PATH</span></pre></div>
<p>For the fish shell, add the following line to your fish configuration file instead:</p>
<div class="highlight highlight-source-shell"><pre><span class="pl-c1">set</span> -gx PATH /usr/local/sbin <span class="pl-smi">$PATH</span></pre></div>
<h2>Resources</h2>
<ul>
<li>RabbitMQ Introduction</li>
<li>Node.js code for RabbitMQ tutorials</li>
<li>squaremo/amqp.node</li>
</ul>
</td>
</tr>
</tbody>
</table>
</div>
<div id="MySignature" role="contentinfo">
<div>
<img src="https://licensebuttons.net/l/by-nc-sa/3.0/88x31.png" style="vertical-align: middle">
<strong>CC BY-NC-SA (Attribution-NonCommercial-ShareAlike)</strong>
</div><br><br>
Source: https://www.cnblogs.com/Wayou/p/rabbitmq_nodejs_example.html