青春厦门 發表於 2019-8-1 09:44:00

php使用rdkafka进行消费

<ul>
<li>
<p>如仅作为消费者或生产者,直接使用下面消费者或生产者的代码,并安装扩展即可。</p>
</li>
<li>
<p>PHP要安装rdkafka扩展,而rdkafka又依赖librdkafka,因此你需要安装rdkafka和librdkafka,之后就可以与kafka服务器交互了。</p>
</li>
<li>
<p>如搭建kafka服务,需要jdk环境和zookeeper,以及kafka远程访问的配置,请参考</p>
<ul>
<li>linux : https://www.cnblogs.com/lz0925/p/11189440.html</li>
<li>windows :https://www.cnblogs.com/lz0925/p/11227128.html</li>
</ul>
</li>
</ul>
<h3 id="消费者">消费者</h3>
<pre><code>&lt;?php
/**
* 代码中的输出注释都可以打开供调试使用
* 对 中台生产的用户信息 进行消费
* Date: 2019/7/31
*/
// 设置将要消费消息的主题
$topic = 'alikafka-jl-yz-zt-updata-test';
$host = '172.168.50.233';
$group_id = 'CID_alikafka_jl_lz';

$conf = new \RdKafka\Conf();
// 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发
$conf-&gt;setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
      case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
//            echo "Assign: ";
//            var_dump($partitions);
            $kafka-&gt;assign($partitions);
            break;
      case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
//            echo "Revoke: ";
//            var_dump($partitions);
            $kafka-&gt;assign(NULL);
            break;
      default:
            throw new \Exception($err);
    }
});
// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,
// 所以同一个组内的消费者数量如果订阅了一个topic,
// 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
$conf-&gt;set('group.id', $group_id);

// 添加 kafka集群服务器地址
$conf-&gt;set('metadata.broker.list', $host); //'localhost:9092,localhost:9093,localhost:9094,localhost:9095'

// 针对低延迟进行了优化的配置。这允许PHP进程/请求尽快发送消息并快速终止
$conf-&gt;set('socket.timeout.ms', 50);
//多进程和信号
if (function_exists('pcntl_sigprocmask')) {
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf-&gt;set('internal.termination.signal', SIGIO);
} else {
    $conf-&gt;set('queue.buffering.max.ms', 1);
}

$topicConf = new \RdKafka\TopicConf();
// 在interval.ms的时间内自动提交确认、建议不要启动, 1是启动,0是未启动
$topicConf-&gt;set('auto.commit.enable', 1);
$topicConf-&gt;set('auto.commit.interval.ms', 100);
//smallest:简单理解为从头开始消费,largest:简单理解为从最新的开始消费
$topicConf-&gt;set('auto.offset.reset', 'smallest');
// 设置offset的存储为broker
//$topicConf-&gt;set('offset.store.method', 'broker');
// 设置offset的存储为file
//$topicConf-&gt;set('offset.store.method', 'file');
// 设置offset的存储路径
$topicConf-&gt;set('offset.store.path', 'kafka_offset.log');
//$topicConf-&gt;set('offset.store.path', __DIR__);

$conf-&gt;setDefaultTopicConf($topicConf);

$consumer = new \RdKafka\KafkaConsumer($conf);

// 更新订阅集(自动分配partitions )
$consumer-&gt;subscribe([$topic]);

//      指定topic分配partitions使用那个分区
//      $consumer-&gt;assign([
//            new \RdKafka\TopicPartition("zzy8", 0),
//            new \RdKafka\TopicPartition("zzy8", 1),
//            ]);

while (true) {
//            设置120s为超时
    $message = $consumer-&gt;consume(3 * 1000);
    if (!empty($message)) {
      switch ($message-&gt;err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump('New message received :', $message);// 打印消息
//            拆解对象为数组,并根据业务需求处理数据
                $payload = json_decode($message-&gt;payload,true);
                $key = $message-&gt;key;
//            根据kafka中不同key,调用对应方法传递处理数据*(如果有必要的话)
                //对该条message进行处理,比如用户数据同步, 记录日志。
//                var_dump("asasasasasasasasasasasas");
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                var_dump("##################");
                break;
            default:
                var_dump("nothing");
                throw new \Exception($message-&gt;errstr(), $message-&gt;err);
                break;
      }

    } else {
      var_dump('this is empty obj!!!');
    }
}
</code></pre>
<h3 id="生产者">生产者</h3>
<pre><code>&lt;?php
/**
* Date: 2019/8/1
*/
$conf = new RdKafka\Conf();

$conf-&gt;setDrmSgCb(function ($kafka, $message){
    file_put_contents("d:/dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf-&gt;setErrorCb(function ($kafka, $err, $reason){
    file_put_contents("d:/err_cb.log",sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});
$rk = new RdKafka\Producer($conf);
$rk-&gt;setLogLevel(LOG_DEBUG);

$rk-&gt;addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
// -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$cf-&gt;set('request.required.acks', 0);
$topic = $rk-&gt;newTopic("test", $cf);

$option = 'qkl';
for ($i = 0; $i &lt; 10; $i++) {
    //RD_KAFKA_PARTITION_UA自动选择分区
    //$option可选
    $topic-&gt;produce(RD_KAFKA_PARTITION_UA, 0, "Message . $i", $option);
}

$len = $rk-&gt;getOutQLen();
while ($len &gt; 0) {
    $len = $rk-&gt;getOutQLen();
//            var_dump($len);
    $rk-&gt;poll(10);
}
var_dump("finish");exit;
</code></pre>
<ul>
<li>列如:将消费者保存为consumer.php文件后,使用php命令行运行</li>
</ul>


</div>
<div id="MySignature" role="contentinfo">
    知止而后有定,定而后能静,静而后能安,安而后能虑,虑而后能得。
所谓诚其意者,毋自欺也。<br><br>
来源:https://www.cnblogs.com/lz0925/p/11280654.html
頁: [1]
查看完整版本: php使用rdkafka进行消费