详解PHP实现生产者与消费者(Kafka应用)

详解PHP实现生产者与消费者(Kafka应用),第1张

概述详解PHP实现生产者消费者(Kafka应用)

本篇文章给大家介绍PHP实现生产者与消费者,希望对需要的朋友有所帮助!

前言

PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。

生产者(测试)

创建消费者需要步骤:

生产者配置参数创建生产者实例创建主题实例(依赖生产者)生产主题消息推送消息

具体代码如下:

        $conf = new \RdKafka\Conf();        // 绑定服务节点        $conf->set('Metadata.broker.List', '127.0.0.1:32772');        // 创建生产者        $kafka = new \RdKafka\Producer($conf);        // 创建主题实例        $topic = $kafka->newtopic('p1r1');        // 生产主题数据,此时消息在缓冲区中,并没有真正被推送        $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');        // 阻塞时间(毫秒), 0为非阻塞        $kafka->poll(0);         // 推送消息,如果不调用此函数,消息不会被发送且会丢失        $result = $kafka->flush(5000);        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {            throw new \RuntimeException('Was unable to flush, messages might be lost!');        }
消费者

创建一个消费者需要几个步骤:

消费者配置参数应用配置参数创建消费者实例订阅对应主题拉取数据提交位移

具体代码如下:

        $conf = new \RdKafka\Conf();        // 绑定消费者组        $conf->set('group.ID', 'ceshi');        // 绑定服务节点,多个用,分隔        $conf->set('Metadata.broker.List', '127.0.0.1:32787');        // 设置自动提交为false        $conf->set('enable.auto.commit', 'false');        // 设置当前消费者拉取数据时的偏移量, 可选参数:        // earlIEst: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。        // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。        $conf->set('auto.offset.reset', 'earlIEst');        // 创建消费者实例        $consumer = new \RdKafka\KafkaConsumer($conf);        // 消费者订阅主题,数组形式        $consumer->subscribe(['topic1','topic2']);        while (true) {            // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环)            $message = $consumer->consume(5000);            switch ($message->err) {                case RD_KAFKA_RESP_ERR_NO_ERROR:                    // 业务逻辑                    var_dump($message);                    // 提交位移                    $consumer->commit($message);                    break;                case RD_KAFKA_RESP_ERR__PARTITION_EOF:                    echo "No more messages; will wait for more\n";                    break;                case RD_KAFKA_RESP_ERR__TIMED_OUT:                    echo "Timed out\n";                    break;                default:                    throw new \Exception($message->errstr(), $message->err);                    break;            }        }        // 关闭消费者(一般用在脚本中,不需要关闭)        $conumser->close();

只消费指定分区中的数据:

    // 对消费者指定分区,注意此方式不能与subscribe一同使用    $consumer->assign([        new RdKafka\topicPartition("topic", 0),        new RdKafka\topicPartition("topic", 1),    ]);
总结

以上是内存溢出为你收集整理的详解PHP实现生产者与消费者(Kafka应用)全部内容,希望文章能够帮你解决详解PHP实现生产者与消费者(Kafka应用)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1012689.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-22
下一篇 2022-05-22

发表评论

登录后才能评论

评论列表(0条)

保存