composer require php-amqplib/php-amqplib推送消息到队列
$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin'); //创建通道 $chan = $connection->channel(); $chan->/confirm/i_select(); // 发布确认模式 //推送成功 $chan->set_ack_handler( function (AMQPMessage $message) { var_dump($message); } ); //推送失败 $chan->set_nack_handler( function (AMQPMessage $message) { var_dump($message); } ); //创建交换机 $exchangeName = 'test_exchange';//交换机名称 $chan->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false); //创建队列 $queueName = 'test_queue'; $chan->queue_declare($queueName, false, true, false, false); //绑定队列与交换机 $routeKey = 'test_route';//路由名称 $chan->queue_bind($queueName,$exchangeName,$routeKey); //发布消息 $data = [ 'msg_id'=>1, 'data'=>'测试入队' ]; $msg = new AMQPMessage(json_encode($data),['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]); $chan->basic_publish($msg, 'test_exchange_topic', 'test_route'); $chan->/confirm/i_select_ok(); // 监听成功或失败返回结束 成功/失败 => set_ack_handler/set_nack_handler $chan->close(); $connection->close();消费消息
$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin'); //创建通道 $chan = $connection->channel(); //创建交换机 $exchangeName = 'test_exchange';//交换机名称 $chan->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false); //创建队列 $queueName = 'test_queue'; $chan->queue_declare($queueName, false, true, false, false); //绑定队列与交换机 $routeKey = 'test_route'; $chan->queue_bind($queueName,$exchangeName,$routeKey); //获取队列中的消息 $chan->basic_consume($queueName, 'test', false, true, false, false, function($message){ var_dump($message->body); }); while($chan->is_consuming()){ $chan->wait(); } 命令行运行该脚本,就会阻塞获取rabbitmq中的消息。 # PUSH延时队列 $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin'); $channel = $connection->channel(); $channel->/confirm/i_select(); // 发布确认模式 //推送成功 $channel->set_ack_handler( function (AMQPMessage $message) { var_dump($message); } ); //推送失败 $channel->set_nack_handler( function (AMQPMessage $message) { var_dump($message); } ); //申明两个路由 $delayExchangeRoute = 'delay_exchange_route';//需要延迟交换机绑定路由 $workExchangeRoute = 'work_exchange_route';//消费交换机绑定路由 //给delay_exchange交换机push 延时消息,时间到后转发到work_exchange交换机处理, //声明两个交换机 $delayExchange = 'delay_exchange'; $workExchange = 'work_exchange'; $channel->exchange_declare($delayExchange, 'direct',false,false,false); $channel->exchange_declare($workExchange, 'direct',false,false,false); $tale = new AMQPTable(); $tale->set('x-dead-letter-exchange', $workExchange);//很关键 表示过期后由哪个work_exchange处理 $tale->set('x-dead-letter-routing-key',$delayExchangeRoute);//很关键 表示过期后根据什么路由策略转发到work_exchange // $tale->set('x-message-ttl',16000); //存活时长 下面的过期时间不能超过 //绑定需要延迟的队列与交换机 $delayQueue = 'delay_queue'; $channel->queue_declare($delayQueue,false,true,false,false,false,$tale); $channel->queue_bind($delayQueue, $delayExchange,$delayExchangeRoute); //绑定真正消费的对列与交换机 $workQueue = 'work_queue'; $channel->queue_declare($workQueue,false,true,false,false,false,$tale); $channel->queue_bind($workQueue, $workExchange,$workExchangeRoute); $data = [ 'msg_id'=>2, 'data'=>'ceshi延迟队列' ]; $msg = new AMQPMessage(json_encode($data,256), array( 'expiration' => 50000,//延迟50秒 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT )); $channel->basic_publish($msg,$delayExchange,$delayExchangeRoute); $channel->/confirm/i_select_ok(); $channel->close(); $connection->close();
执行脚本,就会将消息push到delay_queue 队列,50秒后就会转发到work_queue队列,监听了work_queue的进程就会立马消费。
创建交换机参数详解- exchange: 交换机名称。
- type: 交换机类型。
1、direct:完整的路由匹配。
2、topic:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
3、 fanout:不处理路由,消息发送到交换机,交换机会转发到所有绑定到该交换机的队列。
4、headers:不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue
与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑
定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,
topic 的路由键都需要要字符串形式的。 - passive: 设为false,交换机不存在就创建,并相信成功;设为true,交换机不存酒就会报错。
- durable:是否持久化消息。
- auto_delete:没有队列绑定到该交换机后,自动删除该exchange。
- internal:当前Exchange是否用于RabbitMQ内部使用,默认为false,即不允许使用客户端推送消息。
- nowait:如果为true则表示不等待服务器回执信息,函数将返回null,可以提高访问速度。
- arguments: 扩展参数,定制化的需求,比如优先级等。
- ticket:
- queue:队列名称。
- passive:如果为true,如果存在这个队列,则会返回队列的信息,如果不存在这个队列,则会抛异常,设为fasle,不存在就会自动创建。
- durable:持久化。
- exclusive:设为true,该队列只有当前这个进程可以消费,其他客户端不能读取到这个队列。如果当进程断开连接,这个队列也会被销毁,不管是否设置了持久化或者自动删除。
- auto_delete:没有消息时候,自动删除该队列。
- nowait:不等待处理结果。
- arguments:额外参数。
- ticket:
上面的延迟队列是通过 DLX + TTL 方式实现的,他的原理是先将消息路由到一个正常的队列,当消息过期时间到后,交换机会自动路由到死信队列消费。这样需要两个队列来实现,并且由于队列先进先出原理,会阻塞后面推送的非延迟消息或者延迟时间更短的消息。因此rabbit官方提供了延迟插件解决类似问题。
- 安装插件
下载地址:https://www.rabbitmq.com/community-plugins.html
复制 rabbitmq_delayed_message_exchange-3.8.0.ez 到rabbitmq的plugins目录 - 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
查看已安装的插件
可以通过rabbitmq-plugins list 查看已安装的插件
- 实现过程
$channel->exchange_declare('delayed_message_exchange', 'x-delayed-message',false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct'])); $channel->queue_declare('delay_message_queue',false,true,false,false,false); $channel->queue_bind('delay_message_queue', 'delayed_message_exchange','delayed_message_exchange_route'); $data = [ 'msg_id'=>18, 'time'=>time(), 'desc'=>'测试延迟18s' ]; $msg = new AMQPMessage(json_encode($data,256), array( 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT )); $msg->set('application_headers', new AMQPTable([ 'x-delay' => 18000 // 延迟时间,单位毫秒 ])); $channel->basic_publish($msg,'delayed_message_exchange','delayed_message_exchange_route');消息的可靠投递
事务机制
$chan->tx_select(); $chan->tx_commit(); $chan->tx_rollback();
事务机制能保证消息的可靠投递,但会带来性能的损耗,rabbitmq官方提供了更轻量级的实现方案:发送、确认机制。
发送/确认机制
//推送成功 $chan->set_ack_handler( function (AMQPMessage $message) { echo('投递成功:'.$message->body . PHP_EOL); } ); //推送失败 $chan->set_nack_handler( function (AMQPMessage $message) { echo('投递失败:'.$message->body . PHP_EOL); } ); $chan->/confirm/i_select(); // 发布确认模式 //消息push之后调用 wait_for_pending_acks 方法 $chan->wait_for_pending_acks();
注:消息只能保证被成功推送到交换机,如果交换机不能正确路由到queue,消息也会丢失。因为交换机投递消息到queue是异步的。
其他补充方案配合使用
复杂情况下,我们需要记录消息的整个历程,消息push之前,将消息入库,记录状态0,投递成功记录状态1,消息被消费之后记录状态2。业务场景根据需要,可以通过定时检查入库的消息,没有消费或者没有投递成功的重新投递,同时为了保证消费者不重复消费,需要做好消费端的幂等性。
消息的消费确认机制
rabbitmq的消息消费确认机制是通过消费者发送给MQ服务端的,分为自动发送和手动发送。即basic_consume方法的 no_ack 参数。该参数为ture,表示自动发送确认消费机制,即消费者接收到消息,就告诉rabbitmq,我已经消费消息了。设为false:即手动发送确认消费机制,需要在消费逻辑处理完成后加上以下代码:
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
演示可靠消费
这里假设有两个消费者(consumer_01和consumer_02)监听同一个队列,通过模拟consumer_01在处理消息时候出现异常退出,来确认rabbitmq会自动将消息发送给consumer_02消费。
生产者代码:
$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin'); //创建通道 $chan = $connection->channel(); //推送成功 $chan->set_ack_handler( function (AMQPMessage $message) { echo('投递成功:'.$message->body . PHP_EOL); } ); //推送失败 $chan->set_nack_handler( function (AMQPMessage $message) { echo('投递失败:'.$message->body . PHP_EOL); } ); $chan->/confirm/i_select(); // 发布确认模式 //创建交换机 $exchangeName = 'test_exchange';//交换机名称 AMQPExchangeType::DIRECT $res = $chan->exchange_declare($exchangeName,AMQPExchangeType::DIRECT , false, true, true, false, false); //创建队列 $queueName = 'test_queue'; $chan->queue_declare($queueName, false, true, false, false); //绑定队列与交换机 $routeKey = 'test_route';//路由名称 $chan->queue_bind($queueName,$exchangeName,$routeKey); //发布消息 $data = [ 'msg_id'=>1, 'time'=>date('Y-m-d H:i:s'), 'data'=>'测试入队' ]; $msg = new AMQPMessage(json_encode($data,256),['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]); $chan->basic_publish($msg, 'test_exchange', 'test_route'); $chan->wait_for_pending_acks(); $chan->close(); $connection->close();
消息投递后在rabbitmq 可视化后台可以看到如下信息:
注意图中红框参数,Ready值为1,Total值为1。下面模拟消费者:
consumer_01代码:
$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin'); $chan = $connection->channel(); $chan->basic_consume('test_queue', 'consumer_01', false, false, false, false, function($message){ echo('consumer_01接收到消息:' . $message->body . '==时间' . date('Y-m-d H:i:s') . PHP_EOL); echo('consumer_01消息处理中:' . PHP_EOL); sleep(60);//睡眠60秒模拟消息处理逻辑 echo('consumer_01消息处理完成:' . '==时间' . date('Y-m-d H:i:s') . PHP_EOL); $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); echo('------------------------------'.PHP_EOL); }); while($chan->is_consuming()){ $chan->wait(); }
consumer_02代码:
$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin'); $chan = $connection->channel(); $queueName = 'test_queue'; $chan->basic_consume($queueName, 'test', false, false, false, false, function($message){ echo('consumer_02接收到消息:' . $message->body . '==时间' . date('Y-m-d H:i:s') . PHP_EOL); echo('consumer_02消息处理中:' . PHP_EOL); sleep(5);//睡眠5秒模拟消息处理 echo('consumer_02消息处理完成:' . '==时间' . date('Y-m-d H:i:s') . PHP_EOL); $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); echo('------------------------------'.PHP_EOL); }); while($chan->is_consuming()){ $chan->wait(); }
启动consumer_01:如下图:
此时将该进程杀死,然后rabbitmq后台消息状态如图:出现Unacked 值为1,即未确认消息数量为1。
然后启动consumer_02,rabbitmq就会自动将消息发送给consumer_02消费。如下图所示:
当多个消费者监听同一个队列的时候,MQ会采用简单轮询机制发送消息给消费者的,即队列中的第一条消息发给consumer_01,第二条消息发给consumer_02,然后第三条发给consumer_01,这样依次反复。
但真实场景可能consumer_01处理消息的能力比consumer_02慢很多,比如consumer_01处理一条消息要10秒,consumer_02处理一条消息只需要5秒。这样就可能导致consumer_01的消息堆积,因此rabbitmq提供了basic_qos 方法,将prefetch_count参数设为1,即告诉rabbitmq,如果我有消息正在处理,还没有返回消费确认信息,则不要给我发消息。MQ就会将消息发送给其他监听了该队列并且空闲的消费者。
$chan->basic_qos(0, 1, false);
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)