风声话林语 發表於 2021-8-5 16:27:00

PHP实现RabbitMQ

<p><strong>介绍</strong></p>
<p>RabbitMQ是一个在AMQP基础上实现的企业级消息系统。何谓消息系统,就是消息队列系统,消息队列是“”消费-生产者模型“”的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。</p>
<p>在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步操作,而这种异步处理的方式大大的节省了服务器的请求时间,从而提高了系统的吞吐量。而且不影响服务器做其他相应,不独占服务器资源。</p>
<p>如:注册用户这种服务,它可能解耦成好几种独立的服务(账号验证,邮箱验证码,手机短信码等)。它们作为消费者,等待用户输入数据,在前台数据提交之后会经过分解并发送到各个服务所在的url,分发的那个角色就相当于生产者。消费者在获取数据时候有可能一次不能处理完,那么它们各自有一个请求队列,那就是内存缓冲区了。做这项工作的框架叫做消息队列。</p>
<p>又比如:电商系统中的订单处理系统,传统处理模式是:下订单的时候,订单系统可能会调用库存系统的接口,这样两个系统之间存在一个严重依赖关系,如果库存系统宕机,那么整个流程都会受到影响。现在大多公司的处理方法是:引入消息队列,下完订单,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。</p>
<p>对库存系统来说,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。这样实现了两个系统间的解耦。</p>
<p>即使在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。</p>
<p>&nbsp;结构图如下:</p>
<p><img src="https://images2017.cnblogs.com/blog/489086/201708/489086-20170829135051968-316242301.png"></p>
<div class="para">几个概念说明:</div>
<div class="para">Broker:简单来说就是消息队列服务器实体。<br>  Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。<br>  Queue:消息队列载体,每个消息都会被投入到一个或多个队列。<br>  Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。<br>  Routing Key:路由关键字,exchange根据这个关键字进行消息投递。<br>  vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。<br>  producer:消息生产者,就是投递消息的程序。<br>  consumer:消息消费者,就是接受消息的程序。<br>  channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。</div>
<div class="para">消息队列的使用过程大概如下:</div>
<div class="para">(1)客户端连接到消息队列服务器,打开一个channel。<br>  (2)客户端声明一个exchange,并设置相关属性。<br>  (3)客户端声明一个queue,并设置相关属性。<br>  (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。<br>  (5)客户端投递消息到exchange。</div>
<div class="para">exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。</div>
<div class="para">exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。</div>
<div class="para">RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:<br>  (1)exchange持久化,在声明时指定durable =&gt; 1<br>  (2)queue持久化,在声明时指定durable =&gt; 1<br>  (3)消息持久化,在投递时指定delivery_mode =&gt; 2(1是非持久化)</div>
<div class="para">如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。</div>
<div class="para">&nbsp;</div>
<p>好了,讲了这么多基本讲清楚了RabbitMQ的应用场景和好处,下面我们在windows平台上练一把手,更直观的来看看RabbitMQ到底是什么?</p>
<p>安装就省了,网上有很多教程,在此不做过多赘述。</p>
<p><strong>类封装</strong></p>
<p>定义一个名为&nbsp;rabbitmq.php的类文件</p>
<div class="cnblogs_code">
<pre>&lt;?<span style="color: rgba(0, 0, 0, 1)">php
</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ {
      </span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$connect</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">连接对象</span>
      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$channel</span>;<span style="color: rgba(0, 128, 0, 1)">//
</span>      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$exchange</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">交换机对象</span>
      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$exchange_name</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">交换机名称</span>
      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$exchange_type</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">交换机类型</span>
      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$queue</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">对列对象</span>
      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$queue_name</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">队列名称</span>
      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$queue_type</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">对列类型</span>
      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$call_back_fnc</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">回调函数</span>
      <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(128, 0, 128, 1)">$is_ack</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">是否确认收到消息</span>

      <span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">function</span> __construct(<span style="color: rgba(128, 0, 128, 1)">$params</span>=[], <span style="color: rgba(128, 0, 128, 1)">$persistent_flag</span>=<span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">) {
            </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
                </span><span style="color: rgba(128, 0, 128, 1)">$con</span>['host'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['host'] ?? '127.0.0.1'<span style="color: rgba(0, 0, 0, 1)">;
                </span><span style="color: rgba(128, 0, 128, 1)">$con</span>['vhost'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['vhost'] ?? '/'<span style="color: rgba(0, 0, 0, 1)">;
                </span><span style="color: rgba(128, 0, 128, 1)">$con</span>['port'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['port'] ?? 5672<span style="color: rgba(0, 0, 0, 1)">;
                </span><span style="color: rgba(128, 0, 128, 1)">$con</span>['login'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['login'] ?? 'guest'<span style="color: rgba(0, 0, 0, 1)">;
                </span><span style="color: rgba(128, 0, 128, 1)">$con</span>['password'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['password'] ?? 'guest'<span style="color: rgba(0, 0, 0, 1)">;
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; exchange_name = <span style="color: rgba(128, 0, 128, 1)">$params</span>['exchange_name'] ?? ''<span style="color: rgba(0, 0, 0, 1)">;
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; exchange_type = <span style="color: rgba(128, 0, 128, 1)">$params</span>['exchange_type'] ??<span style="color: rgba(0, 0, 0, 1)"> AMQP_EX_TYPE_DIRECT;
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; queue_name = <span style="color: rgba(128, 0, 128, 1)">$params</span>['queue_name'] ?? ''<span style="color: rgba(0, 0, 0, 1)">;
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; queue_type = <span style="color: rgba(128, 0, 128, 1)">$params</span>['queue_type'] ??<span style="color: rgba(0, 0, 0, 1)"> AMQP_EX_TYPE_DIRECT;
                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">连接初始化</span>
                <span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; connect = <span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; connectInit(<span style="color: rgba(128, 0, 128, 1)">$con</span>, <span style="color: rgba(128, 0, 128, 1)">$persistent_flag</span><span style="color: rgba(0, 0, 0, 1)">);
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(!<span style="color: rgba(128, 0, 128, 1)">$con</span><span style="color: rgba(0, 0, 0, 1)">) {
                  </span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> <span style="color: rgba(0, 0, 255, 1)">Exception</span>('Cannot connect to the broker!'<span style="color: rgba(0, 0, 0, 1)">);
                }
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; channel =<span style="color: rgba(0, 0, 255, 1)">new</span> AMQPChannel(<span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt;<span style="color: rgba(0, 0, 0, 1)"> connect);
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span>(<span style="color: rgba(0, 0, 255, 1)">Exception</span> <span style="color: rgba(128, 0, 128, 1)">$ex</span><span style="color: rgba(0, 0, 0, 1)">) {
                </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = "Exception error connect:{<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getMessage()},\r\nline: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getLine()}\r\n"<span style="color: rgba(0, 0, 0, 1)">;
                writeLog(</span>'msg:'. <span style="color: rgba(128, 0, 128, 1)">$error_msg</span><span style="color: rgba(0, 0, 0, 1)">);
                </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;   
            }
         
      }

      </span><span style="color: rgba(0, 128, 0, 1)">/*</span><span style="color: rgba(0, 128, 0, 1)">*
         * 连接初始化
         * @param $config array 配置信息
         * @param persistent_flag bool 持久化
         * @return object
         </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
      <span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">function</span> connectInit(<span style="color: rgba(128, 0, 128, 1)">$config</span>, <span style="color: rgba(128, 0, 128, 1)">$persistent_flag</span><span style="color: rgba(0, 0, 0, 1)">) {
            </span><span style="color: rgba(128, 0, 128, 1)">$connect</span> = <span style="color: rgba(0, 0, 255, 1)">new</span> AMQPConnection(<span style="color: rgba(128, 0, 128, 1)">$config</span><span style="color: rgba(0, 0, 0, 1)">);
            </span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(128, 0, 128, 1)">$connect</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">pconnect()) {
                </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$persistent_flag</span><span style="color: rgba(0, 0, 0, 1)">) {
                </span><span style="color: rgba(128, 0, 128, 1)">$connect</span>-&gt;isPersistent(1<span style="color: rgba(0, 0, 0, 1)">);
            }
            </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(128, 0, 128, 1)">$connect</span><span style="color: rgba(0, 0, 0, 1)">;
      }

      </span><span style="color: rgba(0, 128, 0, 1)">/*</span><span style="color: rgba(0, 128, 0, 1)">*
         * 设置交换机
         * @param $params['exchange_name'] string 交换机名称
         * @param $params['exchange_type'] string 交换机类型
         * @param $params['flag'] int 交换机标志
         * @return object | bool
         </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
      <span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">function</span> setExchange(<span style="color: rgba(128, 0, 128, 1)">$params</span><span style="color: rgba(0, 0, 0, 1)">) {
            </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = ''<span style="color: rgba(0, 0, 0, 1)">;
            </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">{
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(!<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">channel){
                  </span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> \AMQPQueueException("Error channel on method setExchange", 1<span style="color: rgba(0, 0, 0, 1)">);
                }
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange = <span style="color: rgba(0, 0, 255, 1)">new</span> \AMQPExchange(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">channel);
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['exchange_name'<span style="color: rgba(0, 0, 0, 1)">]){
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange_name = <span style="color: rgba(128, 0, 128, 1)">$params</span>['exchange_name']; <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 交换机名称</span>
<span style="color: rgba(0, 0, 0, 1)">                }
                </span><span style="color: rgba(0, 0, 255, 1)">if</span> (<span style="color: rgba(0, 0, 255, 1)">empty</span>(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange_name)) <span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange-&gt;setName(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange_name); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 设置名称</span>
                <span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['exchange_type'<span style="color: rgba(0, 0, 0, 1)">]){
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange_type = <span style="color: rgba(128, 0, 128, 1)">$params</span>['exchange_type'<span style="color: rgba(0, 0, 0, 1)">];
                }
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange-&gt;<span style="color: rgba(0, 128, 128, 1)">settype</span>(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange_type);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 设置交换机类型</span>
                <span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['flag'<span style="color: rgba(0, 0, 0, 1)">]){
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange-&gt;setFlags(<span style="color: rgba(128, 0, 128, 1)">$params</span>['flag']);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">交换机标志</span>
<span style="color: rgba(0, 0, 0, 1)">                }
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; exchange_type &amp;&amp; <span style="color: rgba(128, 0, 128, 1)">$params</span>['flag'<span style="color: rgba(0, 0, 0, 1)">]){
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange-&gt;declareExchange();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 创建</span>
<span style="color: rgba(0, 0, 0, 1)">                }
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span>(AMQPQueueException <span style="color: rgba(128, 0, 128, 1)">$ex</span><span style="color: rgba(0, 0, 0, 1)">) {
                </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = "AMQPQueueException error exchange: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getMessage()},\r\nline: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getLine()}\r\n"<span style="color: rgba(0, 0, 0, 1)">;
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span>(<span style="color: rgba(0, 0, 255, 1)">Exception</span> <span style="color: rgba(128, 0, 128, 1)">$ex</span><span style="color: rgba(0, 0, 0, 1)">) {
                </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = "Exception error exchange:{<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getMessage()},\r\nline: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getLine()}\r\n"<span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$error_msg</span><span style="color: rgba(0, 0, 0, 1)">){
                writeLog(</span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span><span style="color: rgba(0, 0, 0, 1)">);
                </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(128, 0, 128, 1)">$this</span><span style="color: rgba(0, 0, 0, 1)">;
      }

      </span><span style="color: rgba(0, 128, 0, 1)">/*</span><span style="color: rgba(0, 128, 0, 1)">*
         * 设置队列
         * @param $params['queue_name'] string 对列名称
         * @param $params['flag'] int 对列标记
         * @param $params['arguments'] array 参数配置
         * @param $params['routing_key'] string 要绑定的模式或路由键
         * @return object | bool
         </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
      <span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">function</span> setQueue(<span style="color: rgba(128, 0, 128, 1)">$params</span><span style="color: rgba(0, 0, 0, 1)">){
            </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = ''<span style="color: rgba(0, 0, 0, 1)">;
            </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">{
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;queue = <span style="color: rgba(0, 0, 255, 1)">new</span> \AMQPQueue(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">channel);
                </span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 255, 1)">empty</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['exchange_name'<span style="color: rgba(0, 0, 0, 1)">])) {
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; exchange_name = <span style="color: rgba(128, 0, 128, 1)">$params</span>['exchange_name'<span style="color: rgba(0, 0, 0, 1)">];
                }
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(!<span style="color: rgba(0, 0, 255, 1)">empty</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['queue_name'<span style="color: rgba(0, 0, 0, 1)">])) {
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;queue_name = <span style="color: rgba(128, 0, 128, 1)">$params</span>['queue_name'];<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 队列名称</span>
<span style="color: rgba(0, 0, 0, 1)">                }
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;queue-&gt;setName(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">queue_name);
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['flag'<span style="color: rgba(0, 0, 0, 1)">]){
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;queue-&gt;setFlags(<span style="color: rgba(128, 0, 128, 1)">$params</span>['flag']);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 队列标志。与消息持久化有关。 这篇文字不涉及这一块的说明</span>
<span style="color: rgba(0, 0, 0, 1)">                }
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(0, 128, 128, 1)">is_array</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['arguments']) &amp;&amp; !<span style="color: rgba(0, 0, 255, 1)">empty</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['arguments'<span style="color: rgba(0, 0, 0, 1)">])){
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;queue-&gt;setArguments(<span style="color: rgba(128, 0, 128, 1)">$params</span>['arguments']);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 参数配置</span>
<span style="color: rgba(0, 0, 0, 1)">                }
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;queue-&gt;declareQueue();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 创建一个队列</span>
                <span style="color: rgba(128, 0, 128, 1)">$routing_key</span> = !<span style="color: rgba(0, 0, 255, 1)">empty</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['routing_key']) ? <span style="color: rgba(128, 0, 128, 1)">$params</span>['routing_key'] : <span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">queue_name;
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt; exchange_name &amp;&amp; <span style="color: rgba(128, 0, 128, 1)">$routing_key</span><span style="color: rgba(0, 0, 0, 1)">){
                  </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;queue-&gt;bind(<span style="color: rgba(128, 0, 128, 1)">$this</span> -&gt;exchange_name, <span style="color: rgba(128, 0, 128, 1)">$routing_key</span>);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 交换机和队列的绑定操作</span>
<span style="color: rgba(0, 0, 0, 1)">                }

            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span>(AMQPQueueException <span style="color: rgba(128, 0, 128, 1)">$ex</span><span style="color: rgba(0, 0, 0, 1)">) {
                </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = "AMQPQueueException error queue: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getMessage()},\r\nline: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getLine()}\r\n"<span style="color: rgba(0, 0, 0, 1)">;
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span>(<span style="color: rgba(0, 0, 255, 1)">Exception</span> <span style="color: rgba(128, 0, 128, 1)">$ex</span><span style="color: rgba(0, 0, 0, 1)">) {
                </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = "Exception error queue:{<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getMessage()},\r\nline: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getLine()}\r\n"<span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$error_msg</span><span style="color: rgba(0, 0, 0, 1)">){
                writeLog(</span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span><span style="color: rgba(0, 0, 0, 1)">);
                </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(128, 0, 128, 1)">$this</span><span style="color: rgba(0, 0, 0, 1)">;
      }

      </span><span style="color: rgba(0, 128, 0, 1)">/*</span><span style="color: rgba(0, 128, 0, 1)">*
         * 发布消息
         * @param $params['message'] string 消息
         * @param $params['routing_key'] string 要绑定的模式或路由键
         * @param $params['flag'] int 参数配置 AMQP_NOPARAM | AMQP_IMMEDIATE
         * @param $params['attributes'] array
         * @return bool
         </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
      <span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">function</span> publishMessage(<span style="color: rgba(128, 0, 128, 1)">$params</span>=<span style="color: rgba(0, 0, 0, 1)">[]){
            </span><span style="color: rgba(0, 0, 255, 1)">if</span>(!<span style="color: rgba(128, 0, 128, 1)">$params</span>['message'])<span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
            </span><span style="color: rgba(128, 0, 128, 1)">$routing_key</span> = <span style="color: rgba(128, 0, 128, 1)">$params</span>['routing_key'] ? <span style="color: rgba(128, 0, 128, 1)">$params</span>['routing_key'] : <span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">queue_name;
            </span><span style="color: rgba(128, 0, 128, 1)">$params</span>['attributes'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['attributes'] ??<span style="color: rgba(0, 0, 0, 1)"> [];
            </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 发布消息,带有路由key。如果需要,则会用于关联。</span>
            <span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;exchange-&gt;publish(<span style="color: rgba(128, 0, 128, 1)">$params</span>['message'], <span style="color: rgba(128, 0, 128, 1)">$routing_key</span>, <span style="color: rgba(128, 0, 128, 1)">$params</span>['flag'], <span style="color: rgba(128, 0, 128, 1)">$params</span>['attributes'<span style="color: rgba(0, 0, 0, 1)">]);
            </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
      }

      </span><span style="color: rgba(0, 128, 0, 1)">/*</span><span style="color: rgba(0, 128, 0, 1)">*
         * 消费
         * @param $params['callback'] arguments 回调函数
         * @param $params['qos'] int 要预取的消息数
         * @param $params['is_ack'] bool 是否确认收到消息
         </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
      <span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">function</span> consumeMessage(<span style="color: rgba(128, 0, 128, 1)">$params</span><span style="color: rgba(0, 0, 0, 1)">){
            </span><span style="color: rgba(128, 0, 128, 1)">$params</span>['callback'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['callback'] ?? <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
            </span><span style="color: rgba(128, 0, 128, 1)">$params</span>['qos'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['qos'] ?? 0<span style="color: rgba(0, 0, 0, 1)">;
            </span><span style="color: rgba(128, 0, 128, 1)">$params</span>['is_ack'] = <span style="color: rgba(128, 0, 128, 1)">$params</span>['is_ack'] ?? <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
            </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$params</span>['qos'<span style="color: rgba(0, 0, 0, 1)">]){
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;channel-&gt;qos(0, <span style="color: rgba(128, 0, 128, 1)">$params</span>['qos'<span style="color: rgba(0, 0, 0, 1)">]);
            }
            </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = ''<span style="color: rgba(0, 0, 0, 1)">;
            </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">{
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>(!<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">queue){
                  </span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> \AMQPQueueException("Error queue on method consume", 1<span style="color: rgba(0, 0, 0, 1)">);
                }
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;call_back_fnc = <span style="color: rgba(128, 0, 128, 1)">$params</span>['callback'<span style="color: rgba(0, 0, 0, 1)">];
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;is_ack = <span style="color: rgba(128, 0, 128, 1)">$params</span>['is_ack'<span style="color: rgba(0, 0, 0, 1)">];
                </span><span style="color: rgba(128, 0, 128, 1)">$params</span>['callback'] = <span style="color: rgba(0, 0, 255, 1)">function</span>(<span style="color: rgba(128, 0, 128, 1)">$envelope</span>, <span style="color: rgba(128, 0, 128, 1)">$queue</span><span style="color: rgba(0, 0, 0, 1)">){
                              </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(0, 128, 128, 1)">is_callable</span>(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">call_back_fnc)){
                                    </span><span style="color: rgba(0, 128, 128, 1)">call_user_func</span>(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;call_back_fnc, <span style="color: rgba(128, 0, 128, 1)">$envelope</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">getBody());
                                    </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">is_ack){
                                        </span><span style="color: rgba(128, 0, 128, 1)">$queue</span>-&gt;ack(<span style="color: rgba(128, 0, 128, 1)">$envelope</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">getDeliveryTag());
                                    }</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">{
                                        </span><span style="color: rgba(128, 0, 128, 1)">$queue</span>-&gt;nack(<span style="color: rgba(128, 0, 128, 1)">$envelope</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">getDeliveryTag());
                                    }
                              }
                            };
                </span><span style="color: rgba(128, 0, 128, 1)">$this</span>-&gt;queue-&gt;consume(<span style="color: rgba(128, 0, 128, 1)">$params</span>['callback'<span style="color: rgba(0, 0, 0, 1)">]);
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span>(AMQPQueueException <span style="color: rgba(128, 0, 128, 1)">$ex</span><span style="color: rgba(0, 0, 0, 1)">) {
                </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = "AMQPQueueException error queue: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getMessage()},\r\nline: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getLine()}\r\n"<span style="color: rgba(0, 0, 0, 1)">;
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span>(<span style="color: rgba(0, 0, 255, 1)">Exception</span> <span style="color: rgba(128, 0, 128, 1)">$ex</span><span style="color: rgba(0, 0, 0, 1)">) {
                </span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span> = "Exception error queue:{<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getMessage()},\r\nline: {<span style="color: rgba(128, 0, 128, 1)">$ex</span>-&gt;getLine()}\r\n"<span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">if</span>(<span style="color: rgba(128, 0, 128, 1)">$error_msg</span><span style="color: rgba(0, 0, 0, 1)">){
                writeLog(</span><span style="color: rgba(128, 0, 128, 1)">$error_msg</span><span style="color: rgba(0, 0, 0, 1)">);
            }
      }
}

</span><span style="color: rgba(0, 0, 255, 1)">function</span> writeLog(<span style="color: rgba(128, 0, 128, 1)">$msg</span><span style="color: rgba(0, 0, 0, 1)">){
    </span><span style="color: rgba(128, 0, 128, 1)">$logFile</span> = './rabbitmq_w_' . <span style="color: rgba(0, 128, 128, 1)">date</span>('N') . '.log'<span style="color: rgba(0, 0, 0, 1)">;
    </span><span style="color: rgba(128, 0, 128, 1)">$fp</span> = <span style="color: rgba(0, 128, 128, 1)">fopen</span>(<span style="color: rgba(128, 0, 128, 1)">$logFile</span>,"a"<span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(128, 0, 128, 1)">$requestdir</span> = <span style="color: rgba(0, 128, 128, 1)">dirname</span>(<span style="color: rgba(128, 0, 128, 1)">$logFile</span><span style="color: rgba(0, 0, 0, 1)">);

    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 128, 128, 1)">is_dir</span>(<span style="color: rgba(128, 0, 128, 1)">$requestdir</span><span style="color: rgba(0, 0, 0, 1)">)) {
      </span><span style="color: rgba(0, 128, 128, 1)">mkdir</span>(<span style="color: rgba(128, 0, 128, 1)">$requestdir</span>, 0777, 1<span style="color: rgba(0, 0, 0, 1)">);
    }
    </span><span style="color: rgba(0, 128, 128, 1)">flock</span>(<span style="color: rgba(128, 0, 128, 1)">$fp</span>,<span style="color: rgba(0, 0, 0, 1)"> LOCK_EX);
    </span><span style="color: rgba(0, 128, 128, 1)">fwrite</span>(<span style="color: rgba(128, 0, 128, 1)">$fp</span>,<span style="color: rgba(0, 128, 128, 1)">date</span>("Y-m-d H:i:s",<span style="color: rgba(0, 128, 128, 1)">time</span>()).": ".<span style="color: rgba(128, 0, 128, 1)">$msg</span>."\n"<span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(0, 128, 128, 1)">flock</span>(<span style="color: rgba(128, 0, 128, 1)">$fp</span>,<span style="color: rgba(0, 0, 0, 1)"> LOCK_UN);
    </span><span style="color: rgba(0, 128, 128, 1)">fclose</span>(<span style="color: rgba(128, 0, 128, 1)">$fp</span><span style="color: rgba(0, 0, 0, 1)">);
} </span></pre>
</div>
<p>&nbsp;</p>
<p><strong>生产消息</strong></p>
<p>定义一个名称为sender.php的文件</p>
<div class="cnblogs_code">
<pre>&lt;?<span style="color: rgba(0, 0, 0, 1)">php
</span><span style="color: rgba(0, 0, 255, 1)">require_once</span> './rabbitmq.php'<span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(128, 0, 128, 1)">$config</span> =<span style="color: rgba(0, 0, 0, 1)"> [
    </span>'host' =&gt; '127.0.0.1',      <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">IP地址</span>
    'vhost' =&gt; '/',             <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">主机</span>
    'port' =&gt; 5672,             <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">端口号</span>
    'login' =&gt; 'guest',         <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">用户名</span>
    'password' =&gt; 'guest',       <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">密码</span>
<span style="color: rgba(0, 0, 0, 1)">];
</span><span style="color: rgba(128, 0, 128, 1)">$mq</span> = <span style="color: rgba(0, 0, 255, 1)">new</span> RabbitMQ(<span style="color: rgba(128, 0, 128, 1)">$config</span>);   <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 初始化(rmq连接操作);</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> (<span style="color: rgba(128, 0, 128, 1)">$mq</span><span style="color: rgba(0, 0, 0, 1)">) {
    </span><span style="color: rgba(128, 0, 128, 1)">$mq_route</span> = 'push_data_to_routing';<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 路由</span>
    <span style="color: rgba(128, 0, 128, 1)">$mq_exchange</span> = 'push_data_to_exchange';<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 交换机</span>
    <span style="color: rgba(128, 0, 128, 1)">$mq_queue</span> = 'push_data_to_queue';<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 队列
    // 建立连接,设置交换机,设置队列</span>
    <span style="color: rgba(128, 0, 128, 1)">$exchange_params</span> =<span style="color: rgba(0, 0, 0, 1)"> [
      </span>'exchange_name' =&gt; <span style="color: rgba(128, 0, 128, 1)">$mq_exchange</span>,
      'exchange_type' =&gt; AMQP_EX_TYPE_DIRECT,
      'flag' =&gt; AMQP_DURABLE,<span style="color: rgba(0, 0, 0, 1)">
    ];
    </span><span style="color: rgba(128, 0, 128, 1)">$queue_params</span> =<span style="color: rgba(0, 0, 0, 1)"> [
      </span>'queue_name' =&gt; <span style="color: rgba(128, 0, 128, 1)">$mq_queue</span>,
      'exchange_name' =&gt; <span style="color: rgba(128, 0, 128, 1)">$mq_exchange</span>,
      'routing_key' =&gt; <span style="color: rgba(128, 0, 128, 1)">$mq_route</span>,
      'flag' =&gt; AMQP_DURABLE,<span style="color: rgba(0, 0, 0, 1)">
    ];
    </span><span style="color: rgba(128, 0, 128, 1)">$publish_params</span> =<span style="color: rgba(0, 0, 0, 1)"> [
      </span>'routing_key' =&gt; <span style="color: rgba(128, 0, 128, 1)">$mq_route</span>,<span style="color: rgba(0, 0, 0, 1)">
    ];
    </span><span style="color: rgba(128, 0, 128, 1)">$data_list</span> = ['iphone','honor','vivo','xiaomi','oppo'<span style="color: rgba(0, 0, 0, 1)">];
    </span><span style="color: rgba(128, 0, 128, 1)">$mq</span>-&gt;setExchange(<span style="color: rgba(128, 0, 128, 1)">$exchange_params</span>)-&gt;setQueue(<span style="color: rgba(128, 0, 128, 1)">$queue_params</span><span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(0, 0, 255, 1)">foreach</span> (<span style="color: rgba(128, 0, 128, 1)">$data_list</span> <span style="color: rgba(0, 0, 255, 1)">as</span> <span style="color: rgba(128, 0, 128, 1)">$item</span><span style="color: rgba(0, 0, 0, 1)">){
      </span><span style="color: rgba(128, 0, 128, 1)">$publish_params</span>['message'] = <span style="color: rgba(128, 0, 128, 1)">$item</span><span style="color: rgba(0, 0, 0, 1)">;
      </span><span style="color: rgba(128, 0, 128, 1)">$mq</span>-&gt;publishMessage(<span style="color: rgba(128, 0, 128, 1)">$publish_params</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">推送</span>
<span style="color: rgba(0, 0, 0, 1)">    }
}</span></pre>
</div>
<p><strong>接收消息</strong></p>
<p>定义一个名称为receiver.php的文件</p>
<div class="cnblogs_code">
<pre>&lt;?<span style="color: rgba(0, 0, 0, 1)">php
</span><span style="color: rgba(0, 0, 255, 1)">require_once</span> './rabbitmq.php'<span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(128, 0, 128, 1)">$config</span> =<span style="color: rgba(0, 0, 0, 1)"> [
    </span>'host' =&gt; '127.0.0.1',      <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">IP地址</span>
    'vhost' =&gt; '/',             <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">主机</span>
    'port' =&gt; 5672,             <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">端口号</span>
    'login' =&gt; 'guest',         <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">用户名</span>
    'password' =&gt; 'guest',       <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">密码</span>
<span style="color: rgba(0, 0, 0, 1)">];
</span><span style="color: rgba(128, 0, 128, 1)">$mq</span> = <span style="color: rgba(0, 0, 255, 1)">new</span> RabbitMQ(<span style="color: rgba(128, 0, 128, 1)">$config</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(128, 0, 128, 1)">$mq_route</span> = 'push_data_to_routing'<span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(128, 0, 128, 1)">$mq_exchange</span> = 'push_data_to_exchange'<span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(128, 0, 128, 1)">$mq_queue</span> = 'push_data_to_queue'<span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(128, 0, 128, 1)">$exchange_params</span> =<span style="color: rgba(0, 0, 0, 1)"> [
    </span>'exchange_name' =&gt; <span style="color: rgba(128, 0, 128, 1)">$mq_exchange</span>,
    'exchange_type' =&gt; '',
    'flag' =&gt; AMQP_PASSIVE,<span style="color: rgba(0, 0, 0, 1)">
];
</span><span style="color: rgba(128, 0, 128, 1)">$queue_params</span> =<span style="color: rgba(0, 0, 0, 1)"> [
    </span>'queue_name' =&gt; <span style="color: rgba(128, 0, 128, 1)">$mq_queue</span>,
    'flag' =&gt; AMQP_PASSIVE,<span style="color: rgba(0, 0, 0, 1)">
];
</span><span style="color: rgba(128, 0, 128, 1)">$mq</span>-&gt;setExchange(<span style="color: rgba(128, 0, 128, 1)">$exchange_params</span>)-&gt;setQueue(<span style="color: rgba(128, 0, 128, 1)">$queue_params</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(128, 0, 128, 1)">$mq</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">consumeMessage([
    </span>'callback'=&gt;<span style="color: rgba(0, 0, 255, 1)">function</span>(<span style="color: rgba(128, 0, 128, 1)">$msg</span><span style="color: rgba(0, 0, 0, 1)">){
      </span><span style="color: rgba(0, 128, 128, 1)">sleep</span>(2<span style="color: rgba(0, 0, 0, 1)">);
      </span><span style="color: rgba(0, 128, 128, 1)">var_dump</span>(<span style="color: rgba(128, 0, 128, 1)">$msg</span><span style="color: rgba(0, 0, 0, 1)">);
      </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
    }
]);</span></pre>
</div>
<p>执行</p>
<p>win+R 调出cmd,切换至代码所在目录,一个窗口执行sender.php 一个窗口执行receiver.php</p>
<p>结果如下:</p>
<p><img src="https://img2020.cnblogs.com/blog/1362155/202108/1362155-20210805162224626-645888710.png"></p>
<p>&nbsp;<img src="https://img2020.cnblogs.com/blog/1362155/202108/1362155-20210805162549154-563286502.png"></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<pre><span style="color: rgba(237, 148, 255, 1); font-style: italic">&nbsp;</span></pre><br><br>
来源:https://www.cnblogs.com/soaring-sun/p/15103928.html
頁: [1]
查看完整版本: PHP实现RabbitMQ