PHP实现php-amqplib/php-amqplib实例RabbitMq

PHP实现php-amqplib/php-amqplib实例RabbitMq,第1张

PHP实现php-amqplib/php-amqplib实例RabbitMq

项目代码

https://gitee.com/owenzhang24/tp5

其他笔记:

1: 列出队列(Listing queues)

如果你想查看Rabbitmq队列,并且想知道有多少消息存在其中,你(作为特权用户)可以使用rabbitmqctl 工具:

sudo rabbitmqctl list_queues

在Windows中,省略sudo:

rabbitmqctl.bat list_queues

2: 工作队列

我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉之后,所有没有响应的消息都会重新发送。

一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。

为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows系统中运行时,去掉sudo:

$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

3: rabbitmqctl能够列出服务器上所有的交换器:

$ sudo rabbitmqctl list_exchanges

这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用它们。

4:列出所有现存的绑定 rabbitmqctl list_bindings

5: 如果你想把日志保存到文件中,只需要打开控制台输入: (receive_logs.php 源代码)

$ php receive_logs.php > logs_from_rabbit.log

如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:

$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C

如果要触发一个error级别的日志,只需要输入:

$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

第一:安装RabbitMq环境

windows环境的rabbitmq安装与启动

https://my.oschina.net/owenzhang24/blog/5051652

第二:composer require php-amqplib/php-amqplib

第三:代码类
  1. rabbitMq实现的基础类:application/common/lib/classes/rabbitmq/RabbitMq.php
  2. 供外部调用的rabbitMq类:application/common/lib/classes/RabbitMqWork.php
  3. 测试发送消息到rabbitMq中的方法:application/index/controller/Index.php
  4. 添加php think命令实现接收rabbitMq中的消息:application/common/command

        /**
         * Opens the broker connection and channel and, when an exchange type is
         * given, declares the exchange mapped to that type.
         *
         * @param string $exchangeType Key into self::$exchangeNames; empty to skip the exchange declaration.
         */
        private function __construct($exchangeType)
        {
            self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
            self::$channel = self::$connection->channel();
            if (!empty($exchangeType)) {
                self::$exchangeName = self::$exchangeNames[$exchangeType];
                self::$channel->exchange_declare(
                    self::$exchangeName, // exchange name
                    $exchangeType,       // exchange type
                    false,               // passive: false, so the exchange is created when missing
                    true,                // durable: true, so the exchange survives a broker restart
                    false                // auto_delete: false, so the exchange is kept when unused
                );
            }
        }

        /**
         * Returns the shared instance, creating it on first use.
         *
         * The exchange type is only honoured by the call that creates the
         * instance; later calls return the existing instance unchanged.
         *
         * @param string $exchangeType Exchange type forwarded to the constructor.
         * @return self
         */
        public static function instance($exchangeType = '')
        {
            if (!self::$instance instanceof self) {
                self::$instance = new self($exchangeType);
            }
            return self::$instance;
        }

        /**
         * Private so the shared instance cannot be cloned.
         */
        private function __clone()
        {
        }

        /**
         * Publishes 'Hello World!' to the 'hello' queue via the default exchange.
         */
        public function send()
        {
            self::$channel->queue_declare('hello', false, false, false);
            $msg = new AMQPMessage('Hello World!');
            self::$channel->basic_publish($msg, '', 'hello');
            echo "[X] Sent 'Hello World!'\n";
        }

        /**
         * Consumes the 'hello' queue and blocks while the channel has callbacks.
         *
         * The consumer is registered with no_ack = true, so the callback must
         * not acknowledge deliveries itself.
         *
         * @param callable $callback Invoked with each delivered message.
         */
        public function receive($callback)
        {
            self::$channel->queue_declare('hello', false, false, false, true);
            echo "[*] Waiting for messages. To exit press CTRL+C\n";
            self::$channel->basic_consume('hello', '', false, true, false, false, $callback);
            while (count(self::$channel->callbacks)) {
                self::$channel->wait();
            }
        }

        /**
         * Publishes a persistent message to the durable 'task_queue' queue.
         *
         * @param string $data Message body; 'Hello World!' is used when empty.
         */
        public function addTask($data = '')
        {
            self::$channel->queue_declare('task_queue', false, true, false, true);
            if (empty($data)) {
                $data = 'Hello World!';
            }
            $msg = new AMQPMessage(
                $data,
                array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
            );
            self::$channel->basic_publish($msg, '', 'task_queue');
            echo "[x] Sent $data \n";
        }

        /**
         * Consumes 'task_queue' with manual acknowledgements and blocks while
         * the channel has callbacks.
         *
         * @param callable $callback Invoked with each delivered message; it is expected to ack the delivery.
         */
        public function workTask($callback)
        {
            self::$channel->queue_declare('task_queue', false, true, false, true);
            echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
            // Prefetch count of 1: at most one unacknowledged message per consumer.
            self::$channel->basic_qos(null, 1, null);
            self::$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
            while (count(self::$channel->callbacks)) {
                self::$channel->wait();
            }
        }

        /**
         * Publishes a message to the declared exchange without a routing key.
         *
         * @param string $data Message body; 'info:Hello World!' is used when empty.
         */
        public function sendQueue($data = '')
        {
            if (empty($data)) {
                $data = 'info:Hello World!';
            }
            $msg = new AMQPMessage($data);
            self::$channel->basic_publish($msg, self::$exchangeName);
            echo "[x] Sent $data \n";
        }

        /**
         * Binds a server-named queue to the declared exchange and consumes it,
         * blocking while the channel has callbacks.
         *
         * @param callable $callback Invoked with each delivered message.
         */
        public function subscribeQueue($callback)
        {
            list($queueName, ,) = self::$channel->queue_declare(
                "",    // queue name: empty, so the broker generates one
                false, // passive: false, so the queue is created when missing
                true,  // durable
                true,  // exclusive: only this connection may use the queue
                false  // auto_delete
            );
            self::$channel->queue_bind($queueName, self::$exchangeName);
            echo "[*] Waiting for logs. To exit press CTRL+C \n";
            self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
            while (count(self::$channel->callbacks)) {
                self::$channel->wait();
            }
        }

        /**
         * Publishes a message to the declared exchange with a routing key.
         *
         * @param string $routingKey Routing key for the message.
         * @param string $data       Message body; 'Hello World!' is used when empty.
         */
        public function sendDirect($routingKey, $data = '')
        {
            if (empty($data)) {
                $data = "Hello World!";
            }
            $msg = new AMQPMessage($data);
            self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
            echo "[x] Sent $routingKey:$data \n";
        }

        /**
         * Binds a server-named queue to the declared exchange once per binding
         * key and consumes it, blocking while the channel has callbacks.
         *
         * @param Closure $callback    Invoked with each delivered message.
         * @param array   $bindingKeys Binding keys to subscribe to.
         */
        public function receiveDirect(Closure $callback, array $bindingKeys)
        {
            list($queueName, ,) = self::$channel->queue_declare('', false, true, true, false);
            foreach ($bindingKeys as $bindingKey) {
                self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
            }
            echo "[x] Waiting for logs. To exit press CTRL+C \n";
            self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
            while (count(self::$channel->callbacks)) {
                self::$channel->wait();
            }
        }

        /**
         * Publishes a message to the declared exchange with a routing key.
         *
         * @param string $routingKey Routing key for the message.
         * @param string $data       Message body; 'Hello World!' is used when empty.
         */
        public function sendTopic($routingKey, $data = '')
        {
            if (empty($data)) {
                $data = "Hello World!";
            }
            $msg = new AMQPMessage($data);
            self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
            echo " [x] Sent ", $routingKey, ':', $data, " \n";
        }

        /**
         * Binds a server-named queue to the declared exchange once per binding
         * key and consumes it, blocking while the channel has callbacks.
         *
         * @param Closure $callback    Invoked with each delivered message.
         * @param array   $bindingKeys Binding keys (patterns) to subscribe to.
         */
        public function receiveTopic(Closure $callback, array $bindingKeys)
        {
            list($queueName, ,) = self::$channel->queue_declare("", false, true, true, false);
            foreach ($bindingKeys as $bindingKey) {
                self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
            }
            echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
            self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
            while (count(self::$channel->callbacks)) {
                self::$channel->wait();
            }
        }

        /**
         * Closes the channel and then the connection.
         */
        public function __destruct()
        {
            self::$channel->close();
            self::$connection->close();
        }
    }

    application/common/lib/classes/RabbitMqWork.php

    RabbitMq = RabbitMq::instance($exchageType);
        }
    
        
        /**
         * Delegates to send() on the wrapped RabbitMq instance.
         */
        public function send()
        {
            $broker = $this->RabbitMq;
            $broker->send();
        }
    
        
        /**
         * Delegates to receive() on the wrapped RabbitMq instance.
         *
         * @param callable $callback Handler passed through unchanged.
         */
        public function receive($callback)
        {
            $broker = $this->RabbitMq;
            $broker->receive($callback);
        }
    
        
        /**
         * Delegates to addTask() on the wrapped RabbitMq instance.
         *
         * @param string $data Message body. Optional, like the RabbitMq method
         *                     it forwards to, which substitutes its own default
         *                     text for an empty value.
         */
        public function addTask($data = '')
        {
            $this->RabbitMq->addTask($data);
        }
    
        
        /**
         * Delegates to workTask() on the wrapped RabbitMq instance.
         *
         * @param callable $callback Handler passed through unchanged.
         */
        public function workTask($callback)
        {
            $broker = $this->RabbitMq;
            $broker->workTask($callback);
        }
    
        
        /**
         * Delegates to sendQueue() on the wrapped RabbitMq instance.
         *
         * @param string $data Message body. Optional, like the RabbitMq method
         *                     it forwards to, which substitutes its own default
         *                     text for an empty value.
         */
        public function sendQueue($data = '')
        {
            $this->RabbitMq->sendQueue($data);
        }
    
        
        /**
         * Delegates to subscribeQueue() on the wrapped RabbitMq instance.
         *
         * @param callable $callback Handler passed through unchanged.
         */
        public function subscribeQueue($callback)
        {
            $broker = $this->RabbitMq;
            $broker->subscribeQueue($callback);
        }
    
        
        /**
         * Delegates to sendDirect() on the wrapped RabbitMq instance.
         *
         * @param string $routingKey Routing key for the message.
         * @param string $data       Message body. Optional, like the RabbitMq
         *                           method it forwards to, which substitutes its
         *                           own default text for an empty value.
         */
        public function sendDirect($routingKey, $data = '')
        {
            $this->RabbitMq->sendDirect($routingKey, $data);
        }
    
        
        /**
         * Delegates to receiveDirect() on the wrapped RabbitMq instance.
         *
         * @param Closure $callback    Handler passed through unchanged.
         * @param array   $bindingKeys Binding keys passed through unchanged.
         */
        public function receiveDirect(Closure $callback, array $bindingKeys)
        {
            $broker = $this->RabbitMq;
            $broker->receiveDirect($callback, $bindingKeys);
        }
    
        
        /**
         * Delegates to sendTopic() on the wrapped RabbitMq instance.
         *
         * @param string $routingKey Routing key for the message.
         * @param string $data       Message body. Optional, like the RabbitMq
         *                           method it forwards to, which substitutes its
         *                           own default text for an empty value.
         */
        public function sendTopic($routingKey, $data = '')
        {
            $this->RabbitMq->sendTopic($routingKey, $data);
        }
    
        
        /**
         * Delegates to receiveTopic() on the wrapped RabbitMq instance.
         *
         * @param Closure $callback    Handler passed through unchanged.
         * @param array   $bindingKeys Binding keys passed through unchanged.
         */
        public function receiveTopic(Closure $callback, array $bindingKeys)
        {
            $broker = $this->RabbitMq;
            $broker->receiveTopic($callback, $bindingKeys);
        }
    }

    application/index/controller/Index.php

    send();
    //        $this->addTask();
    //        $this->sendQueue();
    //        $this->sendDirect();
            $this->sendTopic();
            var_dump(11);
            die();
        }
        /**
         * Debug action: runs a fixed blog search, dumps the result and stops
         * the script.
         */
        public function searchBlog()
        {
    //        $id=1;
    //        $res = SyncBlog::getInstance()->syncBlog($id);
            $search = '11';
            $res = SearchBlog::getInstance()->searchBlog($search, 1, 100);
            var_dump($res);
            die();
        }
    
        
        /**
         * Sends the demo message through a RabbitMqWork created without an
         * exchange type.
         */
        public function send()
        {
            (new RabbitMqWork())->send();
        }
    
        
        /**
         * Queues a work task whose body is the 'data' request input, falling
         * back to a fixed default text.
         */
        public function addTask()
        {
            $payload = input('data', 'This is work task!');
            (new RabbitMqWork())->addTask($payload);
        }
    
        
        /**
         * Publishes the 'data' request input (or a fixed default text) through
         * a RabbitMqWork created with the FANOUT exchange type.
         */
        public function sendQueue()
        {
            $payload = input('data', 'This is send queue1');
            (new RabbitMqWork(RabbitMq::FANOUT))->sendQueue($payload);
        }
    
        
        /**
         * Publishes the 'data' request input under the 'routingKey' request
         * input through a RabbitMqWork created with the DIRECT exchange type.
         * Defaults: 'Hello World!' and 'info'.
         */
        public function sendDirect()
        {
            $payload = input('data', 'Hello World!');
            $key = input('routingKey', 'info');
            (new RabbitMqWork(RabbitMq::DIRECT))->sendDirect($key, $payload);
        }
    
        
        /**
         * Publishes the 'data' request input under the 'routingKey' request
         * input through a RabbitMqWork created with the TOPIC exchange type.
         * Defaults: 'Hello World!' and 'lazy.boy'.
         */
        public function sendTopic()
        {
            $payload = input('data', 'Hello World!');
            $key = input('routingKey', 'lazy.boy');
            (new RabbitMqWork(RabbitMq::TOPIC))->sendTopic($key, $payload);
        }
    }

    application/command.php

    
    // +----------------------------------------------------------------------
    
    // Maps each console command name to the class that implements it.
    // NOTE(review): namespace separators restored; confirm the `application`
    // prefix matches the namespace the command classes actually declare.
    return [
        'simpleMq' => 'application\command\SimpleWork',
        'workQueue' => 'application\command\WorkQueue',
        'sendQueue' => 'application\command\SendQueue',
        'directQueue' => 'application\command\DirectQueue',
        'topicQueue' => 'application\command\TopicQueue',
    ];

    application/common/command

    namespace app\command;

    use app\common\lib\classes\rabbitmq\RabbitMq;
    use app\common\lib\classes\RabbitMqWork;
    use think\console\Command;
    use think\console\Input;
    use think\console\Output;

    /**
     * Console command 'directQueue': prints messages received through the
     * DIRECT exchange for the binding keys in RabbitMq::SEVERITYS.
     */
    class DirectQueue extends Command
    {
        /**
         * Registers the command name.
         */
        protected function configure()
        {
            parent::configure();
            $this->setName('directQueue');
        }

        /**
         * Starts consuming and echoes each message as "[x] <routing key>:<body>".
         *
         * @param Input  $input  Console input (unused).
         * @param Output $output Console output (unused; messages are echoed).
         */
        protected function execute(Input $input, Output $output)
        {
            $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
            $callback = function ($msg) {
                echo "[x] " . $msg->delivery_info['routing_key'] . ":$msg->body \n";
            };
            $RabbitMqWork->receiveDirect($callback, RabbitMq::SEVERITYS);
        }
    }

    application/command/SendQueue.php

    setName('sendQueue');
        }
    
        /**
         * Subscribes through the FANOUT exchange; each message is echoed and
         * also written to the log.
         *
         * @param Input  $input  Console input (unused).
         * @param Output $output Console output (unused; messages are echoed).
         */
        protected function execute(Input $input, Output $output)
        {
            $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
            $callback = function ($msg) {
                echo 'Receive:';
                echo "Msg:$msg->body \n";
                Log::error("Msg:$msg->body");
            };
            $RabbitMqWork->subscribeQueue($callback);
        }
    }

    application/command/SimpleWork.php

    setName('simpleMq');
        }
    
        /**
         * Consumes the simple queue and echoes each message body together with
         * the routing key it was delivered with.
         *
         * @param Input  $input  Console input (unused).
         * @param Output $output Console output (unused; messages are echoed).
         */
        protected function execute(Input $input, Output $output)
        {
            $RabbitMqWork = new RabbitMqWork();
            $callback = function ($msg) {
                echo 'Receive:';
                $queueName = $msg->delivery_info['routing_key'];
                $msgData = $msg->body;
                echo 'Msg:' . $msgData . "\n";
                echo 'QueueName:' . $queueName . "\n";
                // No basic_ack here: RabbitMq::receive() registers the consumer
                // with no_ack = true, so the broker does not expect an
                // acknowledgement and treats one as a channel error.
            };
            $RabbitMqWork->receive($callback);
        }
    }

    application/command/TopicQueue.php

    setName('topicQueue');
        }
    
        /**
         * Subscribes through the TOPIC exchange for three binding patterns and
         * echoes each message as " [x] <routing key>:<body>".
         *
         * @param Input  $input  Console input (unused).
         * @param Output $output Console output (unused; messages are echoed).
         */
        protected function execute(Input $input, Output $output)
        {
            $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
            $callback = function ($msg) {
                echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
            };
            $bindingKeys = [
                '*.orange.*',
                '*.*.rabbit',
                'lazy.#'
            ];
            $RabbitMqWork->receiveTopic($callback, $bindingKeys);
        }
    }

    application/command/WorkQueue.php

    setName('workQueue');
        }
    
        /**
         * Consumes work tasks: echoes the body, sleeps one second per '.' in
         * the body, then acknowledges the delivery.
         *
         * @param Input  $input  Console input (unused).
         * @param Output $output Console output (unused; messages are echoed).
         */
        protected function execute(Input $input, Output $output)
        {
            $RabbitMqWork = new RabbitMqWork();
            $callback = function ($msg) {
                echo " [x] Received ", $msg->body, "\n";
                sleep(substr_count($msg->body, '.'));
                echo " [x] Done", "\n";
                // Ack only after the work is done, so an interrupted task stays unacknowledged.
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            };
            $RabbitMqWork->workTask($callback);
        }
    }

    Buy me a cup of coffee :)

    欢迎分享,转载请注明来源:内存溢出

    原文地址: https://outofmemory.cn/zaji/5688454.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存